pyaccuwage/pyaccuwage/parser.py

485 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# coding=UTF-8
import re
class SimpleDefParser(object):
    """Parse comma-separated field definition rows into tuples of tokens.

    Every row is split on commas and each item is normalised by
    ``_tokenize``; ``load`` yields one tuple per input row.
    """

    def __init__(self):
        pass

    def load(self, infile):
        """Lazily yield a tuple of tokens for each row of *infile*.

        *infile* is any iterable of strings (an open file, a list of
        lines, ...).  Nothing is parsed until the generator is iterated.
        """
        for row in infile:
            yield tuple(self._tokenize(row))

    def _intify(self, x):
        """Return *x* as an ``int`` when it is integer text, else *x* unchanged."""
        try:
            return int(x.strip())
        except ValueError:
            # Best effort on purpose: non-numeric items stay as text.
            return x

    def _tokenize(self, row):
        """Yield the normalised items of one comma-separated *row*.

        * surrounding whitespace is stripped;
        * items containing a space get the spaces replaced by ``_``,
          every other item is upper-cased;
        * a hyphenated item whose parts are all integers is folded into a
          single integer (``"2-5"`` -> ``3``);
        * any other item is converted to ``int`` when possible.

        A hyphenated item that is not purely numeric (``"tax-exempt"``) is
        yielded as text instead of raising ``TypeError``.
        """
        for item in row.split(','):
            item = item.strip()
            if ' ' in item:
                item = item.replace(' ', '_')
            else:
                item = item.upper()

            if '-' in item:
                parts = [self._intify(part) for part in item.split('-')]
                # Same left fold the former ``reduce(lambda x, y: y - x, parts)``
                # performed, written as a loop so that it does not rely on the
                # Python 2 only ``reduce`` builtin.
                # NOTE(review): for "2-5" this gives 3 (end - start) whereas
                # RangeToken.value adds 1 for an inclusive length -- confirm
                # which of the two is intended here.
                try:
                    folded = parts[0]
                    for part in parts[1:]:
                        folded = part - folded
                except TypeError:
                    # At least one part is not a number: keep the text item.
                    pass
                else:
                    item = folded
            else:
                item = self._intify(item)
            yield item
class BaseToken(object):
    """Base class for tokens recognised in pasted definition text.

    Subclasses override ``regexp`` with the pattern that identifies their
    kind of token; the raw matched text is kept in ``_value``.
    """

    # Catch-all pattern; subclasses narrow it.
    regexp = re.compile(r'(.*)')

    def __init__(self, value):
        self._value = value

    def match(self, value):
        """Return the match object of ``regexp`` against *value*, or ``None``.

        The pattern is applied with ``re.match`` semantics, i.e. anchored at
        the start of *value*.
        """
        # Previously returned the compiled pattern itself, which is always
        # truthy and therefore useless as a test.
        return self.regexp.match(value)

    def __repr__(self):
        # "%s" formatting keeps this working when the stored value is not a
        # string (str.join would raise TypeError on it).
        return ",".join([str(self.__class__), "%s" % (self._value,)])
class StringToken(BaseToken):
    """Catch-all token for any text that is not a range or a number."""

    regexp = re.compile(r'(.*)')

    @property
    def value(self):
        """Return the token text with every non-ASCII character dropped.

        Works for both byte strings and text strings: the former
        ``self._value.decode('ascii', 'ignore')`` only worked on Python 2
        byte strings (Python 3 ``str`` has no ``decode``; Python 2
        ``unicode`` with non-ASCII characters raised UnicodeEncodeError).
        """
        raw = self._value
        if isinstance(raw, bytes):
            return raw.decode('ascii', 'ignore')
        # Text input: round-trip through ASCII to drop the same characters.
        return raw.encode('ascii', 'ignore').decode('ascii')
class RangeToken(BaseToken):
    """Token for an inclusive position range written as ``START-END``."""

    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    regexp = re.compile(r'(\d+)-(\d+)')

    def _bounds(self):
        """Return ``(start, end)`` parsed from the raw ``START-END`` text.

        Raises ``ValueError`` when the text has no hyphen or a bound is not
        an integer.
        """
        start, end = self._value.split('-')[:2]
        return int(start), int(end)

    @property
    def value(self):
        """Number of positions covered by the range (``end - start + 1``)."""
        # Explicit arithmetic instead of ``reduce``, which is not a builtin
        # on Python 3.
        start, end = self._bounds()
        return end - start + 1

    @property
    def end_position(self):
        """Last position of the range, as an ``int``."""
        return self._bounds()[1]
class NumericToken(BaseToken):
    """Token made only of decimal digits."""

    # Raw string: "\d" in a plain literal is an invalid escape sequence and
    # triggers a warning on recent Python versions.  The compiled pattern is
    # unchanged.
    regexp = re.compile(r'^(\d+)$')

    @property
    def value(self):
        """The token text converted to ``int``."""
        return int(self._value)
class PastedDefParser(object):
    """Turn pasted record-layout text into field definition source lines.

    The text is expected to be a sequence of whitespace-separated entries,
    each made of a byte range (``545-546`` or a single position), the field
    name, the field length and a free-text description.  ``load`` runs the
    whole pipeline::

        _tokenize -> _parse -> _compile -> _guess_field_types
                  -> _convert_to_records

    and yields strings such as ``blank1 = BlankField(max_length=2)``.
    """

    # Kept in the class body because ``_guess_field_types`` reaches the
    # module through ``self.fields``.
    # NOTE(review): this is a Python 2 style implicit sibling import; on
    # Python 3 it only resolves when this directory is on sys.path -- confirm
    # how the module is meant to be run before changing it.
    import fields

    # Tried in order; the first class whose ``regexp`` matches wins, so the
    # catch-all StringToken must stay last.
    TOKEN_TYPES = [
        RangeToken,
        NumericToken,
        StringToken,
    ]

    # (field class, {entry key: [patterns]}): every pattern found in the
    # given entry key counts one point for the field class.
    FIELD_TYPES = [
        (fields.BlankField, {'name': [
            re.compile(r'^blank$'),
        ]}),
        (fields.MoneyField, {'desc': [
            re.compile(r'right\-justified'),
            re.compile(r'amount'),
            re.compile(r'zero\-filled'),
        ]}),
        (fields.TextField, {'desc': [
            re.compile(r'enter blanks')
        ]})
    ]

    def load(self, infile):
        """Return a generator of field definition lines parsed from *infile*.

        *infile* is the pasted text as a single string.  Every stage is a
        generator except ``_parse``, so no work happens until the result is
        iterated.
        """
        tokens = self._tokenize(infile)
        entries = self._parse(tokens)
        entries = self._compile(entries)
        entries = self._guess_field_types(entries)
        entries = self._convert_to_records(entries)
        return entries

    def _tokenize(self, data):
        """Yield a token object per word of *data*, then a final ``None``.

        Words are separated by spaces and newlines.  The trailing ``None``
        is a sentinel that tells ``_parse`` to flush the last group.
        """
        for item in data.replace('\n', ' ').split(' '):
            item = item.strip()
            if not item:
                continue
            for tclass in self.TOKEN_TYPES:
                if tclass.regexp.match(item):
                    yield tclass(item)
                    break
        yield None

    def _parse(self, tokens):
        """Group *tokens* into one dict per field and return the list.

        Each dict has the keys ``byterange`` (RangeToken), ``name`` (list of
        tokens), ``length`` (NumericToken or ``None``) and ``desc`` (list of
        tokens).  Tokens seen before the first range are ignored.
        """
        # TODO group things based on strides between RangeTokens, probably
        # starting with range token, then the following BaseTokens are likely
        # the field name, followed by a NumericToken, then Base/Numeric tokens
        # for the field's description, until then next RangeToken is found.
        groups = []
        current_range = None
        current_name = []
        current_length = None
        current_desc = []
        state = 'range'
        byte_pos = None

        # COLLECT TOKENS INTO GROUPS
        for token in tokens:
            if isinstance(token, NumericToken):
                # THIS MAY BE A RANGE, IF THE VALUE MATCHES THE CURRENT byte_pos
                # THEN WE ASSUME THIS TO BE A SINGLE BYTE RANGE DEFINITION,
                # AND WE WILL TREAT IT AS A NEW RECORD.
                if byte_pos is None or token.value == byte_pos:
                    # UPDATE RANGE POSITION
                    byte_pos = token.value + 1
                    # CONVERT TOKEN INTO RangeToken
                    token = RangeToken("%d-%d" % (token.value, token.value))

            if isinstance(token, RangeToken) or token is None:
                # A new range (or the end-of-input sentinel) closes the
                # group that was being collected.
                if current_range:
                    groups.append({
                        'byterange': current_range,
                        'name': current_name,
                        'length': current_length,
                        'desc': current_desc,
                    })
                # UPDATE RANGE POSITION
                if token:
                    byte_pos = token.end_position + 1
                current_range = token
                current_name = []
                current_length = None
                current_desc = []
                state = 'name'
            elif state == 'name':
                # The name ends at the first string that follows a number:
                # that number is the field length and the string starts the
                # description.
                if (isinstance(token, StringToken) and current_name
                        and isinstance(current_name[-1], NumericToken)):
                    current_length = current_name.pop()
                    state = 'desc'
                else:
                    current_name.append(token)

            # Deliberately a separate ``if``: the string that switched the
            # state to 'desc' just above is itself part of the description.
            if state == 'desc':
                current_desc.append(token)

        return groups

    def _compile(self, groups):
        """Yield a plain dict (name, desc, length, required) per parsed group.

        ``required`` is ``False`` when the name ends with ``(Optional)``,
        ``True`` when the description mentions "required" and ``None``
        otherwise.

        Raises ``ValueError`` when a group has no length, no name, or a
        length that disagrees with its byte range.
        """
        # ``unicode`` on Python 2, ``str`` on Python 3 -- the ``unicode``
        # builtin does not exist on Python 3.
        text_type = type(u'')

        for g in groups:
            byterange = g['byterange']
            length = g['length']
            # Explicit checks instead of ``assert`` (stripped under -O), and
            # a readable error instead of AttributeError on a missing length.
            if length is None:
                raise ValueError(
                    "no field length found for byte range %r" % (byterange,))
            if byterange.value != length.value:
                raise ValueError(
                    "byte range %r covers %d positions but the field length "
                    "is %d" % (byterange, byterange.value, length.value))
            if not g['name']:
                raise ValueError(
                    "no field name found for byte range %r" % (byterange,))

            desc = u' '.join(text_type(x.value) for x in g['desc'])

            # text_type() because a name may contain NumericTokens, whose
            # value is an int and has no ``lower``.
            if text_type(g['name'][-1].value).lower() == '(optional)':
                g['name'] = g['name'][0:-1]
                required = False
            elif re.search('required', desc, re.IGNORECASE):
                required = True
            else:
                required = None

            name = u'_'.join(text_type(x.value).lower() for x in g['name'])
            name = re.sub(r'[^\w]', '', name)

            yield {
                'name': name,
                'desc': desc,
                'length': byterange.value,
                'required': required,
            }

    def _guess_field_types(self, entries):
        """Add a ``guessed_type`` key to every entry and yield it.

        Each field class of ``FIELD_TYPES`` scores one point per pattern
        found in the entry; the highest score wins and, on a tie, the class
        listed last wins.  ``fields.TextField`` is used when nothing matches.
        """
        for entry in entries:
            # Scores are kept in FIELD_TYPES order so that the tie-break does
            # not depend on dict ordering (arbitrary on old interpreters).
            order = []
            scores = {}
            for classtype, criteria in self.FIELD_TYPES:
                if classtype not in scores:
                    scores[classtype] = 0
                    order.append(classtype)
                for crit_key, crit_values in criteria.items():
                    for crit_re in crit_values:
                        if crit_re.search(entry[crit_key]):
                            scores[classtype] += 1

            # sorted() is stable: equal scores keep their FIELD_TYPES order.
            ranked = sorted(order, key=scores.get)
            if ranked and scores[ranked[-1]] > 0:
                entry['guessed_type'] = ranked[-1]
            else:
                entry['guessed_type'] = self.fields.TextField
            yield entry

    def _convert_to_records(self, entries):
        """Yield one ``name = FieldClass(args)`` source line per entry.

        Fields named ``blank`` are numbered (``blank1``, ``blank2``, ...) so
        that the generated names stay unique.
        """
        blank_count = 1
        for entry in entries:
            # FIELD NAME
            if entry['name'] == 'blank':
                name = 'blank%d' % blank_count
                blank_count += 1
            else:
                name = entry['name']

            guessed = entry['guessed_type']
            type_name = guessed.__name__ if guessed else ''

            args = ["max_length=%d" % entry['length']]
            if entry['required'] is not None:
                args.append(
                    "required=%s" % ('True' if entry['required'] else 'False'))

            yield "".join([name, ' = ', type_name, "(", ", ".join(args), ")"])
# Sample input for SimpleDefParser: one comma-separated definition per row.
# SimpleDefParser.load is a generator, so no row is actually tokenized until
# `tokens` is iterated.
sdp = SimpleDefParser()
tokens = sdp.load([
"record type,text,1",
"payment year, year,2-5",
"corrected return indicator, 6",
])
# Sample input for PastedDefParser: text pasted from a record-layout table
# (byte range, field name, length, description).  Note that the positions
# jump back from 663-722 to 103-114 and that the last description is cut off
# mid-word.  PastedDefParser.load returns a generator, so nothing is parsed
# until `tokens2` is iterated.
pdp = PastedDefParser()
tokens2 = pdp.load("""
544 Second TIN
Notice
(Optional)
1 Enter “2” (two) to indicate notification by IRS twice within
three calendar years that the payee provided an incorrect name
and/or TIN combination; otherwise, enter a blank.
545-546 Blank 2 Enter blanks.
547-586 Foreign Country
or U.S.
Possession
40 Enter the name of the foreign country or U.S. possession to
which the withheld foreign tax (Amount Code 6) applies.
Otherwise, enter blanks.
587-599 CUSIP Number 13 Enter CUSIP Number. If the tax-exempt interest is reported in
the aggregate for multiple bonds or accounts, enter: VARIOUS.
Right-justify information and fill unused positions with blanks l.
600-662 Blank 63 Enter blanks.
663-722 Special Data
Entries
60 This portion of the “B” Record may be used to record
information for state or local government reporting or for the
filer's own purposes. Payers should contact the state or local
revenue departments for filing requirements. You may enter
your routing and transit number (RTN) here. If this field is not
utilized, enter blanks.
103-114 Payment
Amount 5*
12 The amount reported in this field represents payments for
Amount Code 5 in the “A” Record.
115-126 Payment
Amount 6*
12 The amount reported in this field represents payments for
Amount Code 6 in the “A” Record.
127-138 Payment
Amount 7*
12 The amount reported in this field represents payments for
Amount Code 7 in the “A” Record.
139-150 Payment
Amount 8*
12 The amount reported in this field represents payments f
""")
# Second pasted sample, fed to the same `pdp` parser.  Several descriptions
# contain small "Indicator Usage" tables whose rows start with a number
# (e.g. "1 Per diem").  As with the other samples the result is a generator
# and nothing is parsed until `tokens3` is iterated.
tokens3 = pdp.load("""
544-546 Blank 3 Enter blanks.
547 Type of
Payment
Indicator
1 Enter the appropriate indicator from the following table;
otherwise, enter blanks.
Indicator Usage
1 Per diem
2 Reimbursed amount
548-556 Social Security
Number of
Insured
9 Required. Enter the Social Security Number of the insured.
557-596 Name of Insured 40 Required. Enter the name of the insured.
597-636 Address of
Insured
40 Required. Enter the address of the insured. The street address
should include number, street, apartment or suite number (or PO
Box if mail is not delivered to street address). Left-justify
information and fill unused positions with blanks. This field
must not contain any data other than the payees address.
637-676 City of Insured 40 Required. Enter the city, town, or post office. Left-justify and
fill unused positions with blanks. Enter APO or FPO, if
applicable. Do not enter state and ZIP Code information in this
field.
677-678 State of Insured 2 Required. Enter the valid U.S. Postal Service state
abbreviations for states or the appropriate postal identifier (AA,
AE, or AP) described in Part A, Sec. 12.
679-687 ZIP Code of
Insured
9 Required. Enter the valid nine-digit ZIP Code assigned by the
U.S. Postal Service. If only the first five-digits are known, leftjustify information and fill the unused positions with blanks.
For foreign countries, alpha characters are acceptable as long as
the filer has entered a “1” (one) in the Foreign Country
Indicator, located in position 247 of the “B” Record.
688 Status of Illness
Indicator
(Optional)
1 Enter the appropriate code from the table below to indicate the
status of the illness of the insured; otherwise, enter blank.
Indicator Usage
1 Chronically ill
2 Terminally ill
689-696 Date Certified
(Optional)
8 Enter the latest date of a doctor's certification of the status of the
insured's illness. The format of the date is YYYYMMDD (e.g.,
January 5, 2011, would be 20110105). Do not enter hyphens
or slashes.
697 Qualified
Contract
Indicator
(Optional)
1 Enter a “1” (one) if benefits were from a qualified long-term
care insurance contract; otherwise, enter a blank.
698-722 Blank 25 Enter blanks.
723-734 State Income
Tax Withheld
12 State income tax withheld is for the convenience of the filers.
This information does not need to be reported to IRS. Rightjustify information and fill unused positions with zeros.
735-746 Local Income
Tax Withheld
12 Local income tax withheld is for the convenience of the filers.
This information does not need to be reported to IRS. The
payment amount must be right-justify information and fill
unused positions with zeros.
747-748 Blank 2 Enter blanks.
749-750 Blank 2 Enter blanks or carriage return/line feed (CR/LF) characters.
""")