for all required records in a set. Added a controller class but decided to put stuff in __init__ instead, at least for now. Added a DateField which converts datetime.date into the proper string format for EFW2 files (hopefully), this should still be tested next week.
143 lines
4.3 KiB
Python
143 lines
4.3 KiB
Python
from record import *
|
|
|
|
RECORD_TYPES = [
|
|
'SubmitterRecord',
|
|
'EmployerRecord',
|
|
'EmployeeWageRecord',
|
|
'OptionalEmployeeWageRecord',
|
|
'TotalRecord',
|
|
'StateWageRecord',
|
|
'OptionalTotalRecord',
|
|
'StateTotalRecord',
|
|
'FinalRecord'
|
|
]
|
|
|
|
def test():
|
|
import record, model
|
|
for rname in RECORD_TYPES:
|
|
inst = record.__dict__[rname]()
|
|
print type(inst), inst.record_identifier, len(inst.output())
|
|
|
|
|
|
def test_dump():
|
|
import record, StringIO
|
|
records = [
|
|
record.SubmitterRecord(),
|
|
record.EmployerRecord(),
|
|
record.EmployeeWageRecord(),
|
|
]
|
|
out = StringIO.StringIO()
|
|
dump(records, out)
|
|
return out
|
|
|
|
|
|
def test_record_order():
|
|
import record
|
|
records = [
|
|
record.SubmitterRecord(),
|
|
record.EmployerRecord(),
|
|
record.EmployeeWageRecord(),
|
|
record.TotalRecord(),
|
|
record.FinalRecord(),
|
|
]
|
|
verify_record_order(records)
|
|
|
|
|
|
def test_load(fp):
|
|
return load(fp)
|
|
|
|
def load(fp):
|
|
# BUILD LIST OF RECORD TYPES
|
|
import record
|
|
types = {}
|
|
for r in RECORD_TYPES:
|
|
klass = record.__dict__[r]
|
|
types[klass.record_identifier] = klass
|
|
|
|
# PARSE DATA INTO RECORDS AND YIELD THEM
|
|
while fp.tell() < fp.len:
|
|
record_ident = fp.read(2)
|
|
if record_ident in types:
|
|
record = types[record_ident]()
|
|
record.read(fp)
|
|
yield record
|
|
|
|
def loads(s):
|
|
import StringIO
|
|
fp = StringIO.StringIO(s)
|
|
return load(fp)
|
|
|
|
|
|
def dump(records, fp):
|
|
for r in records:
|
|
fp.write(r.output())
|
|
|
|
def dumps(records):
|
|
import StringIO
|
|
fp = StringIO.StringIO()
|
|
dump(records, fp)
|
|
fp.seek(0)
|
|
return fp.read()
|
|
|
|
|
|
# THIS WAS IN CONTROLLER, BUT UNLESS WE
|
|
# REALLY NEED A CONTROLLER CLASS, IT'S SIMPLER
|
|
# TO JUST KEEP IT IN HERE.
|
|
def verify_required_records(records):
|
|
types = [rec.__class__.__name__ for rec in records]
|
|
req_types = []
|
|
|
|
for r in record.RECORD_TYPES:
|
|
klass = record.__dict__[r]
|
|
if klass.required:
|
|
req_types.append(klass.__name__)
|
|
|
|
while req_types:
|
|
req = req_types[0]
|
|
if req not in types:
|
|
raise ValidationError("Record set missing required record: %s" % req)
|
|
else:
|
|
req_types.remove(req)
|
|
|
|
def verify_record_order(records):
|
|
import record
|
|
from fields import ValidationError
|
|
|
|
# 1st record must be SubmitterRecord
|
|
if not isinstance(records[0], record.SubmitterRecord):
|
|
raise ValidationError("First record must be SubmitterRecord")
|
|
|
|
# 2nd record must be EmployeeRecord
|
|
if not isinstance(records[1], record.EmployerRecord):
|
|
raise ValidationError("The first record after SubmitterRecord must be an EmployeeRecord")
|
|
|
|
# FinalRecord - Must be the last record on the file
|
|
if not isinstance(records[-1], record.FinalRecord):
|
|
raise ValidationError("Last record must be a FinalRecord")
|
|
|
|
# an EmployerRecord *must* come after each EmployeeWageREcord
|
|
for i in range(len(records)):
|
|
if isinstance(records[i], record.EmployerRecord):
|
|
if not isinstance(records[i+1], record.EmployeeWageRecord):
|
|
raise ValidationError("All EmployerRecords must be followed by an EmployeeWageRecord")
|
|
|
|
num_ro_records = len(filter(lambda x:isinstance(x, record.OptionalEmployeeWageRecord), records))
|
|
num_ru_records = len(filter(lambda x:isinstance(x, record.OptionalTotalRecord), records))
|
|
num_employer_records = len(filter(lambda x:isinstance(x, record.EmployerRecord), records))
|
|
num_total_records = len(filter(lambda x: isinstance(x, record.TotalRecord), records))
|
|
|
|
# a TotalRecord is required for each instance of an EmployeeRecord
|
|
if num_total_records != num_employer_records:
|
|
raise ValidationError("Number of TotalRecords (%d) does not match number of EmployeeRecords (%d)" % (
|
|
num_total_records, num_employer_records))
|
|
|
|
# an OptionalTotalRecord is required for each OptionalEmployeeWageRecord
|
|
if num_ro_records != num_ru_records:
|
|
raise ValidationError("Number of OptionalEmployeeWageRecords (%d) does not match number OptionalTotalRecords (%d)" % (
|
|
num_ro_records, num_ru_records))
|
|
|
|
# FinalRecord - Must appear only once on each file.
|
|
if len(filter(lambda x:isinstance(x, record.FinalRecord), records)) != 1:
|
|
raise ValidationError("Incorrect number of FinalRecords")
|
|
|
|
|