pyaccuwage/__init__.py
Binh Van Nguyen 5781cbf335 Finished up most of the record order validation and also checking
for all required records in a set. Added a controller class but
decided to put stuff in __init__ instead, at least for now.
Added a DateField which converts datetime.date into the proper
string format for EFW2 files (hopefully), this should still be
tested next week.
2011-05-07 15:19:48 -05:00

143 lines
4.3 KiB
Python

from record import *
RECORD_TYPES = [
'SubmitterRecord',
'EmployerRecord',
'EmployeeWageRecord',
'OptionalEmployeeWageRecord',
'TotalRecord',
'StateWageRecord',
'OptionalTotalRecord',
'StateTotalRecord',
'FinalRecord'
]
def test():
import record, model
for rname in RECORD_TYPES:
inst = record.__dict__[rname]()
print type(inst), inst.record_identifier, len(inst.output())
def test_dump():
import record, StringIO
records = [
record.SubmitterRecord(),
record.EmployerRecord(),
record.EmployeeWageRecord(),
]
out = StringIO.StringIO()
dump(records, out)
return out
def test_record_order():
import record
records = [
record.SubmitterRecord(),
record.EmployerRecord(),
record.EmployeeWageRecord(),
record.TotalRecord(),
record.FinalRecord(),
]
verify_record_order(records)
def test_load(fp):
return load(fp)
def load(fp):
# BUILD LIST OF RECORD TYPES
import record
types = {}
for r in RECORD_TYPES:
klass = record.__dict__[r]
types[klass.record_identifier] = klass
# PARSE DATA INTO RECORDS AND YIELD THEM
while fp.tell() < fp.len:
record_ident = fp.read(2)
if record_ident in types:
record = types[record_ident]()
record.read(fp)
yield record
def loads(s):
import StringIO
fp = StringIO.StringIO(s)
return load(fp)
def dump(records, fp):
for r in records:
fp.write(r.output())
def dumps(records):
import StringIO
fp = StringIO.StringIO()
dump(records, fp)
fp.seek(0)
return fp.read()
# THIS WAS IN CONTROLLER, BUT UNLESS WE
# REALLY NEED A CONTROLLER CLASS, IT'S SIMPLER
# TO JUST KEEP IT IN HERE.
def verify_required_records(records):
types = [rec.__class__.__name__ for rec in records]
req_types = []
for r in record.RECORD_TYPES:
klass = record.__dict__[r]
if klass.required:
req_types.append(klass.__name__)
while req_types:
req = req_types[0]
if req not in types:
raise ValidationError("Record set missing required record: %s" % req)
else:
req_types.remove(req)
def verify_record_order(records):
import record
from fields import ValidationError
# 1st record must be SubmitterRecord
if not isinstance(records[0], record.SubmitterRecord):
raise ValidationError("First record must be SubmitterRecord")
# 2nd record must be EmployeeRecord
if not isinstance(records[1], record.EmployerRecord):
raise ValidationError("The first record after SubmitterRecord must be an EmployeeRecord")
# FinalRecord - Must be the last record on the file
if not isinstance(records[-1], record.FinalRecord):
raise ValidationError("Last record must be a FinalRecord")
# an EmployerRecord *must* come after each EmployeeWageREcord
for i in range(len(records)):
if isinstance(records[i], record.EmployerRecord):
if not isinstance(records[i+1], record.EmployeeWageRecord):
raise ValidationError("All EmployerRecords must be followed by an EmployeeWageRecord")
num_ro_records = len(filter(lambda x:isinstance(x, record.OptionalEmployeeWageRecord), records))
num_ru_records = len(filter(lambda x:isinstance(x, record.OptionalTotalRecord), records))
num_employer_records = len(filter(lambda x:isinstance(x, record.EmployerRecord), records))
num_total_records = len(filter(lambda x: isinstance(x, record.TotalRecord), records))
# a TotalRecord is required for each instance of an EmployeeRecord
if num_total_records != num_employer_records:
raise ValidationError("Number of TotalRecords (%d) does not match number of EmployeeRecords (%d)" % (
num_total_records, num_employer_records))
# an OptionalTotalRecord is required for each OptionalEmployeeWageRecord
if num_ro_records != num_ru_records:
raise ValidationError("Number of OptionalEmployeeWageRecords (%d) does not match number OptionalTotalRecords (%d)" % (
num_ro_records, num_ru_records))
# FinalRecord - Must appear only once on each file.
if len(filter(lambda x:isinstance(x, record.FinalRecord), records)) != 1:
raise ValidationError("Incorrect number of FinalRecords")