pyaccuwage/pyaccuwage/parser.py
Binh Nguyen 1c7533973a Parsing all the way through the pdf appears to work. Next we need
to track the beginning/ending points for each record and append
continuation records onto the previous. There's some issue in
the pyaccuwage-pdfparse script causing it to have problems reading
the last record field in a record group. Maybe the record extractor
needs to dump the last failed ColumnCollector rather than return it
if it's determined to hold junk data?

The record builder seems to handle everything just fine.

Added a function to the field name parsing to replace ampersands
with an "and" string so as not to cause problems with variable names.
2012-11-13 15:53:41 -06:00

449 lines
13 KiB
Python

#!/usr/bin/python
# coding=UTF-8
"""
Parser utility to read data from Publication 1220 and
convert it into python classes.
"""
import re
class SimpleDefParser(object):
    """Parse comma-separated field definitions, one definition per row.

    Every row is split on commas and each item is normalised by
    ``_tokenize``; ``load`` yields the resulting items as one tuple per row.
    """

    def __init__(self):
        pass

    def load(self, infile):
        """Yield a tuple of tokens for each row of ``infile``.

        ``infile`` is any iterable of strings (an open text file, a list of
        lines, ...).
        """
        for row in infile:
            yield tuple(self._tokenize(row))

    def _intify(self, x):
        """Return ``x`` as an int when it parses as one, else ``x`` unchanged."""
        try:
            return int(x.strip())
        except ValueError:
            return x

    def _tokenize(self, row):
        """Yield the normalised items of one comma-separated row.

        * Items containing a space have spaces replaced by underscores;
          all other items are upper-cased.
        * A hyphenated item whose parts are all integers (``"1-5"``) is
          folded left-to-right with ``later - earlier`` (``"1-5"`` -> 4).
        * Anything else is converted to an int when possible and otherwise
          yielded as a string.
        """
        for item in row.split(','):
            item = item.strip()
            if ' ' in item:
                item = item.replace(' ', '_')
            else:
                item = item.upper()

            if '-' in item:
                parts = [self._intify(part) for part in item.split('-')]
                try:
                    span = parts[0]
                    for part in parts[1:]:
                        span = part - span
                except TypeError:
                    # At least one part is not numeric (e.g. "SSN-TIN" or a
                    # negative number such as "-7"): this is not a range, so
                    # fall through and treat the item as a plain token.
                    pass
                else:
                    yield span
                    continue

            yield self._intify(item)
class LengthExpression(object):
    """Evaluate simple length constraints such as ``'=2'`` or ``'>=10'``.

    An expression is an operator (one of the keys of ``OPS``) followed by
    an integer.  Calling the instance with a value and a list of
    expressions returns True only when every expression holds.
    """
    import operator

    # Group 1: operator text (anything that is not a digit); group 2: number.
    REG = re.compile(r'([^\d]*)(\d+)')
    OPS = {
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
        '=': operator.eq,
    }

    def __init__(self):
        # Maps the expression text to its compiled ``(operator, int)`` pair.
        self.exp_cache = {}

    def __call__(self, value, exps):
        """Return True when ``value`` satisfies every expression in ``exps``.

        An empty ``exps`` is trivially satisfied.  Every expression is
        evaluated (no short-circuit) so a malformed one is always reported.
        """
        return all([self.check(value, exp) for exp in exps])

    def compile_exp(self, exp):
        """Compile ``exp`` into an ``(operator_function, int)`` pair.

        Whitespace around the expression and around the operator is
        ignored.

        Raises:
            ValueError: if ``exp`` has no number or uses an operator that is
                not listed in ``OPS``.
        """
        match = self.REG.match(exp.strip())
        if match is None:
            raise ValueError("invalid length expression: %r" % (exp,))
        symbol, number = match.groups()
        op = self.OPS.get(symbol.strip())
        if op is None:
            raise ValueError(
                "unknown operator %r in length expression %r" % (symbol, exp))
        return (op, int(number))

    def check(self, value, exp):
        """Return the result of applying the single expression ``exp`` to ``value``."""
        if exp not in self.exp_cache:
            self.exp_cache[exp] = self.compile_exp(exp)
        (op, opval) = self.exp_cache[exp]
        return op(value, opval)
class BaseToken(object):
    """Base class for tokens: wraps the raw text of one token.

    Subclasses override ``regexp`` with the pattern that recognises their
    kind of token and expose the interpreted content through ``value``.
    """

    # Matches anything; subclasses narrow this down.
    regexp = re.compile('(.*)')

    def __init__(self, value):
        self._value = value

    def match(self, value):
        """Match ``value`` against this token's pattern.

        Returns the match object, or None when ``value`` does not match.
        """
        return self.regexp.match(value)

    def __repr__(self):
        # %-formatting (rather than str.join) so a non-string raw value
        # cannot make repr() itself raise.
        return "%s,%s" % (self.__class__, self._value)
class StringToken(BaseToken):
    """Catch-all token for arbitrary text."""

    regexp = re.compile('(.*)')

    @property
    def value(self):
        """Return the token text with every non-ASCII character removed."""
        raw = self._value
        if isinstance(raw, bytes):
            return raw.decode('ascii', 'ignore')
        # Already text: calling .decode() directly would first encode
        # strictly (Python 2) and fail on non-ASCII input, or not exist at
        # all (Python 3), so strip the non-ASCII characters explicitly.
        return raw.encode('ascii', 'ignore').decode('ascii')
class RangeToken(BaseToken):
    """Token for a position range written as ``start-end`` (e.g. ``1-5``).

    A value without a hyphen is treated as a single position.
    """

    regexp = re.compile(r'(\d+)-(\d+)')

    @property
    def value(self):
        """Return the number of positions covered (``end - start + 1``).

        A value without a hyphen has length 1.

        Raises:
            ValueError: if a part of the range is not an integer.
        """
        if '-' not in self._value:
            return 1
        bounds = [int(part) for part in self._value.split('-')]
        # Left fold with "later - earlier"; for the usual two-part range
        # this is simply end - start.
        span = bounds[0]
        for bound in bounds[1:]:
            span = bound - span
        return span + 1

    @property
    def end_position(self):
        """Return the last position of the range.

        For a value without a hyphen this equals ``start_position``,
        consistent with ``value`` treating it as a single position.
        """
        parts = self._value.split('-')
        return int(parts[1] if len(parts) > 1 else parts[0])

    @property
    def start_position(self):
        """Return the first position of the range."""
        return int(self._value.split('-')[0])
class NumericToken(BaseToken):
    """Token consisting solely of decimal digits."""

    # Raw string so ``\d`` reaches the regex engine verbatim instead of
    # being an invalid string escape.
    regexp = re.compile(r'^(\d+)$')

    @property
    def value(self):
        """Return the token text converted to an int."""
        return int(self._value)
class RecordBuilder(object):
    """Build record-definition source lines from extracted field entries.

    Each entry is a sequence of up to ``entry_max_length`` strings in the
    order ``(range, name, length, description)``, for example
    ``('1-5', 'Field Name', '5', 'Description of field.')``.  Entries are
    validated, assigned a guessed field class and rendered as text lines
    of the form ``name = FieldClass(max_length=N[, required=...])  # range``.
    """

    # Project-local module holding the field classes referenced below.
    # Imported in the class body so it is also reachable as ``self.fields``.
    import fields

    # Entries with more items than this are discarded by ``_compile``.
    entry_max_length = 4

    # Token classes in matching priority order.
    TOKEN_TYPES = [
        RangeToken,
        NumericToken,
        StringToken,
    ]

    # (field class, criteria) pairs used by ``_guess_field_types``.
    #   'regexp': maps an entry key ('name' or 'desc') to patterns; every
    #             pattern found in that entry value scores one point.
    #   'length': LengthExpression strings that must all hold, otherwise
    #             the field class is not scored for the entry at all.
    FIELD_TYPES = [
        (fields.BlankField, {
            'regexp': {
                'name': [
                    re.compile(r'^blank$'),
                ],
            },
        }),
        (fields.MoneyField, {
            'regexp': {
                'desc': [
                    re.compile(r'right\-justified'),
                    re.compile(r'amount'),
                    re.compile(r'zero\-filled'),
                ],
            },
        }),
        (fields.TextField, {
            'regexp': {
                'desc': [
                    re.compile(r'enter blanks'),
                ],
            },
        }),
        (fields.StateField, {
            'regexp': {
                'desc': [
                    re.compile(r'state'),
                    re.compile(r'postal'),
                ],
            },
            'length': ['=2'],
        })
    ]

    def load(self, entries):
        """Return a generator of record source lines for ``entries``.

        ``entries`` is expected in the form
        ``[('1-5', 'Field Name', '5', 'Description of field.'), ...]``.
        """
        entries = self._compile(entries)
        entries = self._guess_field_types(entries)
        return self._convert_to_records(entries)

    def _compile(self, entries):
        """Validate raw entries and yield them as normalised dicts.

        An entry is silently skipped when it has too many items, when its
        length or range is not numeric, or when the stated length does not
        equal the length implied by the range.

        Yields dicts with the keys ``name``, ``range``, ``desc``,
        ``length`` and ``required`` (True, False or None when unknown).
        """
        for entry in entries:
            if len(entry) > self.entry_max_length:
                continue

            # Pad short entries so the unpacking below always succeeds.
            padding = [''] * (self.entry_max_length - len(entry))
            (f_range, f_name, f_length, f_desc) = list(entry) + padding

            try:
                f_length = int(f_length)
                range_length = RangeToken(f_range).value
            except ValueError:
                # bad result, skip
                continue

            # Explicit comparison instead of ``assert``: asserts are
            # stripped under ``python -O``, which would disable this check.
            if f_length != range_length:
                continue

            name_parts = f_name.split(' ')
            if name_parts[-1].lower() == '(optional)':
                name_parts = name_parts[0:-1]
                required = False
            elif re.search('required', f_desc, re.IGNORECASE):
                required = True
            else:
                required = None

            f_name = u'_'.join(part.lower() for part in name_parts)
            # Spell out ampersands before stripping non-word characters so
            # that "a & b" becomes "a_and_b" rather than "a__b".
            f_name = f_name.replace('&', 'and')
            f_name = re.sub(r'[^\w]', '', f_name)

            yield {
                'name': f_name,
                'range': f_range,
                'desc': f_desc,
                'length': f_length,
                'required': required,
            }

    def _guess_field_types(self, entries):
        """Add a ``guessed_type`` field class to each entry and yield it.

        Every class in ``FIELD_TYPES`` scores one point per matching
        pattern.  The highest score wins; on a tie the class declared later
        in ``FIELD_TYPES`` wins, so the result does not depend on dict
        ordering.  ``fields.TextField`` is used when nothing matches.
        """
        lengthexp = LengthExpression()

        for entry in entries:
            scores = {}
            declared = []  # field classes in first-declaration order

            for (classtype, criteria) in self.FIELD_TYPES:
                if classtype not in scores:
                    scores[classtype] = 0
                    declared.append(classtype)

                if 'length' in criteria:
                    if not lengthexp(int(entry['length']), criteria['length']):
                        continue

                for crit_key, patterns in criteria.get('regexp', {}).items():
                    for pattern in patterns:
                        if pattern.search(entry[crit_key]):
                            scores[classtype] += 1

            guessed, best_score = self.fields.TextField, 0
            for classtype in declared:
                score = scores[classtype]
                if score > 0 and score >= best_score:
                    guessed, best_score = classtype, score

            entry['guessed_type'] = guessed
            yield entry

    def _convert_to_records(self, entries):
        """Render each entry as one line of field-definition source text.

        Fields named ``blank`` are numbered (``blank1``, ``blank2``, ...)
        so the generated names stay unique.  The entry's range is appended
        as a trailing comment starting at column 85.
        """
        blank_count = 1
        for entry in entries:
            result = []
            add = result.append

            # FIELD NAME
            if entry['name'] == 'blank':
                add((u'blank%d' % blank_count).ljust(40))
                blank_count += 1
            else:
                add(entry['name'].ljust(40))

            add(' = ')

            if entry['guessed_type']:
                add(entry['guessed_type'].__name__)

            args = ["max_length=%d" % entry['length']]
            if entry['required'] is not None:
                args.append("required=%s" % ('True' if entry['required'] else 'False'))

            add("(" + ", ".join(args) + ")")

            yield "".join(result).ljust(85) + "# %s" % entry['range']
class PastedDefParser(RecordBuilder):
    """Build record definitions from free text pasted as one string.

    The text is split into tokens, the tokens are grouped into fields by
    following the byte ranges, and the groups then go through the
    type-guessing and rendering steps inherited from ``RecordBuilder``.
    """

    def load(self, infile):
        """Return a generator of record source lines for the pasted text.

        ``infile`` must be a string: ``_tokenize`` calls ``.replace`` on it.

        Raises (while iterating the result):
            AssertionError: if a field's stated length is missing or does
                not equal the length implied by its byte range.
        """
        tokens = self._tokenize(infile)
        groups = self._parse(tokens)
        entries = self._compile(groups)
        entries = self._guess_field_types(entries)
        return self._convert_to_records(entries)

    def _tokenize(self, data):
        """Yield a token object per space/newline separated item of ``data``.

        Each item becomes an instance of the first class in ``TOKEN_TYPES``
        whose pattern matches it.  A final ``None`` is yielded as an
        end-of-input sentinel so ``_parse`` flushes its last group.
        """
        for item in data.replace('\n', ' ').split(' '):
            item = item.strip()
            if not item:
                continue
            for tclass in self.TOKEN_TYPES:
                if tclass.regexp.match(item):
                    yield tclass(item)
                    break
        yield None

    def _parse(self, tokens):
        """Group tokens into fields and return them as a list of dicts.

        A field starts at a range token whose start equals the position
        right after the previous field (``byte_pos``), or at the very first
        range seen.  The tokens after it form the name until a numeric
        token followed by a string token is found: that number is the
        length and everything after it is the description.

        Each dict has the keys ``byterange`` (RangeToken), ``name`` (list
        of tokens), ``length`` (NumericToken or None) and ``desc`` (list of
        tokens).
        """
        groups = []
        current_range = None
        current_name = []
        current_length = None
        current_desc = []
        state = 'range'
        byte_pos = None

        # COLLECT TOKENS INTO GROUPS
        for token in tokens:
            if isinstance(token, NumericToken):
                # THIS MAY BE A RANGE: IF THE VALUE MATCHES THE CURRENT
                # byte_pos WE ASSUME A SINGLE BYTE RANGE DEFINITION AND
                # TREAT IT AS A NEW RECORD.
                if byte_pos is None or token.value == byte_pos:
                    # byte_pos is deliberately NOT advanced here: the range
                    # branch below compares the start position against it
                    # and advances it itself.  Advancing it first made the
                    # comparison always fail, so single-byte fields were
                    # appended to the previous description instead.
                    token = RangeToken("%d-%d" % (token.value, token.value))

            if isinstance(token, RangeToken) or token is None:
                # A RANGE THAT DOES NOT START AT byte_pos IS NOT THE
                # BEGINNING OF A NEW FIELD AND IS ASSUMED TO BE PART OF
                # THE DESCRIPTION.
                if token and byte_pos and token.start_position != byte_pos:
                    state = 'desc'
                else:
                    # New field (or end-of-input sentinel): flush the
                    # field collected so far.
                    if current_range:
                        groups.append({
                            'byterange': current_range,
                            'name': current_name,
                            'length': current_length,
                            'desc': current_desc,
                        })

                    # UPDATE RANGE POSITION
                    if token:
                        byte_pos = token.end_position + 1

                    current_range = token
                    current_name = []
                    current_length = None
                    current_desc = []
                    state = 'name'

            elif state == 'name':
                if (isinstance(token, StringToken) and current_name
                        and isinstance(current_name[-1], NumericToken)):
                    # The number just before the first description word is
                    # the field length, not part of the name.
                    current_length = current_name.pop()
                    state = 'desc'
                else:
                    current_name.append(token)

            if state == 'desc':
                current_desc.append(token)

        return groups

    def _compile(self, groups):
        """Turn parsed groups into the entry dicts used by the base class.

        Yields dicts with the keys ``name``, ``range``, ``desc``,
        ``length`` and ``required`` (True, False or None when unknown).

        Raises:
            AssertionError: if a group has no length token or its length
                differs from the length implied by its byte range.  Raised
                explicitly (not via ``assert``) so the check survives
                ``python -O``.
        """
        for g in groups:
            byterange = g['byterange']
            length = g['length']
            if length is None or byterange.value != length.value:
                raise AssertionError(
                    "length %r does not match byte range %r"
                    % (length, byterange))

            desc = u' '.join(u'%s' % token.value for token in g['desc'])

            # Name tokens may be numeric, so coerce to text before using
            # string methods on them.
            name_words = [(u'%s' % token.value).lower() for token in g['name']]
            if name_words and name_words[-1] == '(optional)':
                name_words = name_words[0:-1]
                required = False
            elif re.search('required', desc, re.IGNORECASE):
                required = True
            else:
                required = None

            name = u'_'.join(name_words)
            # Same normalisation as RecordBuilder._compile: spell out
            # ampersands, then drop every non-word character.
            name = name.replace('&', 'and')
            name = re.sub(r'[^\w]', '', name)

            yield {
                'name': name,
                # Required by the inherited _convert_to_records, which
                # appends the range as a trailing comment.
                'range': "%d-%d" % (byterange.start_position,
                                    byterange.end_position),
                'desc': desc,
                'length': byterange.value,
                'required': required,
            }

    # _guess_field_types and _convert_to_records are inherited from
    # RecordBuilder; the disabled (string-quoted) copies that used to
    # follow here have been removed.