pyaccuwage/pyaccuwage/pdfextract.py
Binh Nguyen 31ff97db8a Almost have things working. It seems like some of the record results
are overlapping. I'm assuming this is due to missing a continue
or something inside the ColumnCollector. I added a couple new IsNextRecord
exceptions in response to blank rows, but this may be causing more problems
than expected. Next step is probably to check the records returned, and verify
that nothing is being duplicated. Some of the duplicates may be filtered out
by the RecordBuilder class, or during the fields filtering in the pyaccuwage-pdfparse
script (see: fields).
2012-11-20 16:05:36 -06:00

258 lines
7.5 KiB
Python

#!/usr/bin/python
# coding=UTF-8
import subprocess
import re
import pdb
""" pdftotext -layout -nopgbrk p1220.pdf - """
class PDFRecordFinder(object):
    """Find record tables in the text layout of a PDF document.

    The PDF is converted to text with ``pdftotext -layout``.  Rows matching
    ``heading_exp`` mark the start of a record; the rows between two headings
    are split into columns and grouped into fields with ``ColumnCollector``.

    The class is written to run on both Python 2.7 and Python 3.
    """

    # Two or more consecutive whitespace characters are treated as a column
    # separator.  Compiled once here instead of once per row.
    _MULTI_WHITESPACE = re.compile(r'\s{2,}')

    def __init__(self, src, heading_exp=None):
        """Run ``pdftotext`` on *src* and keep its output rows.

        :param src: path of the PDF file handed to ``pdftotext``.
        :param heading_exp: compiled regular expression whose groups, joined
            together, give the record name.  Defaults to a pattern matching
            ``Record Name: <name>`` after leading whitespace.
        :raises subprocess.CalledProcessError: if ``pdftotext`` exits with a
            non-zero status.
        """
        if not heading_exp:
            heading_exp = re.compile(r'\s+Record Name: (.*)')

        opts = ["pdftotext", "-layout", "-nopgbrk", "-eol", "unix", src, '-']
        pdftext = subprocess.check_output(opts)
        # check_output returns bytes on Python 3; keep the rows as the native
        # ``str`` type so they can be matched against ``str`` patterns.
        if not isinstance(pdftext, str):
            pdftext = pdftext.decode('UTF-8')
        self.textrows = pdftext.split('\n')
        self.heading_exp = heading_exp

    @staticmethod
    def _to_text(value):
        """Return *value* as text, decoding it as UTF-8 if it is a byte string."""
        if isinstance(value, bytes):
            return value.decode('UTF-8')
        return value

    @staticmethod
    def _ascii_text(value):
        """Return *value* as text with every non-ASCII character dropped."""
        if isinstance(value, bytes):
            return value.decode('ascii', 'ignore')
        return value.encode('ascii', 'ignore').decode('ascii')

    @staticmethod
    def _ascii_native(text):
        """Return *text* as a native ``str`` with non-ASCII characters dropped."""
        encoded = text.encode('ascii', 'ignore')
        if isinstance(encoded, str):
            # Python 2: the encoded byte string already is the native type.
            return encoded
        return encoded.decode('ascii')

    def records(self):
        """Yield ``(name, fields)`` for every heading found in the text.

        ``name`` is the heading text reduced to ASCII and ``fields`` is the
        list of ``ColumnCollector`` objects produced by :meth:`find_fields`
        for the rows after the heading row, up to the section's end index.
        """
        for start, end, name in self.locate_heading_rows():
            section = self.textrows[start + 1:end]
            yield (self._ascii_text(name), list(self.find_fields(iter(section))))

    def locate_heading_rows(self):
        """Return ``(heading_index, end_index, name)`` for each heading row.

        ``end_index`` is the index of the next heading row minus one; for the
        last heading the total number of rows is used in place of a next
        heading.
        """
        headings = []
        for index, row in enumerate(self.textrows):
            match = self.heading_exp.match(row)
            if match:
                headings.append((index, ''.join(match.groups())))

        # Pair every heading with its successor; a sentinel positioned just
        # past the last row closes the final section.
        successors = headings[1:] + [(len(self.textrows), None)]
        return [
            (current[0], following[0] - 1, current[1])
            for current, following in zip(headings, successors)
        ]

    def find_fields(self, row_iter):
        """Yield one ``ColumnCollector`` per field found in *row_iter*.

        Rows that yield no columns are reported to the collector as empty
        rows.  When the collector raises ``IsNextField`` it is yielded and the
        row starts a fresh collector.  ``UnknownColumn`` ends the iteration
        without yielding the collector that is currently being filled.
        """
        cc = ColumnCollector()

        for r in row_iter:
            row = self.extract_columns_from_row(self._to_text(r))

            if not row:
                cc.empty_row()
                continue

            try:
                cc.add(row)
            except IsNextField:
                yield cc
                cc = ColumnCollector()
                cc.add(row)
            except UnknownColumn:
                # ``raise StopIteration`` inside a generator is turned into a
                # RuntimeError by PEP 479; a plain return ends it cleanly.
                return

        yield cc

    def find_fields_old(self, row_iter):
        """Earlier variant of :meth:`find_fields`, kept for compatibility.

        It asks the collector whether a row starts the next field before
        adding it and skips rows without columns instead of counting them.
        """
        cc = ColumnCollector()

        for r in row_iter:
            row = self.extract_columns_from_row(self._to_text(r))

            if not row:
                continue

            if cc.is_next_field(row):
                yield cc
                cc = ColumnCollector()

            try:
                cc.add(row)
            except UnknownColumn:
                # See find_fields: return instead of raising StopIteration.
                return

        yield cc

    def extract_columns_from_row(self, row):
        """Split a text row into ``(start_offset, text)`` column tuples.

        Columns are the stretches of *row* between runs of two or more
        whitespace characters.  Non-ASCII characters are dropped from the
        column text.

        :param row: the row as text.
        :returns: ``None`` when the row contains no run of two or more
            whitespace characters (it is likely not a table row), otherwise a
            list of ``(start_offset, text)`` tuples, possibly empty.
        """
        separator = self._MULTI_WHITESPACE

        # IF LINE DOESN'T CONTAIN MULTIPLE WHITESPACES, IT'S LIKELY NOT A TABLE
        if not separator.search(row):
            return None

        columns = []
        start = 0
        for gap in separator.finditer(row):
            # A gap at the current start means there is no text before it.
            if gap.start() != start:
                columns.append(
                    (start, self._ascii_native(row[start:gap.start()]))
                )
            start = gap.end()

        # Text after the last separator, if any.
        if start != len(row):
            columns.append((start, self._ascii_native(row[start:])))

        return columns
class UnknownColumn(Exception):
    """Raised when a value's column offset is not one the collector holds."""
class IsNextField(Exception):
    """Raised when a row is judged to start the next field."""
class ColumnCollector(object):
    """Accumulate the rows that make up one table field.

    A row is a sequence of ``(column_offset, text)`` pairs.  The first row
    added defines the columns; later rows are merged into those columns until
    a row is judged to start the next field, at which point ``IsNextField``
    is raised.

    The class is written to run on both Python 2.7 and Python 3.
    """

    def __init__(self, initial=None):
        # ``initial`` is accepted but not used.
        self.data = None            # column offset -> accumulated text
        self.column_widths = None   # column offset -> rightmost end offset seen
        self.max_data_length = 0    # largest number of columns in any row
        self.last_data_length = 0   # number of columns in the latest row
        self.adjust_pad = 3         # tolerance used by adjust_columns
        self.empty_rows = 0         # count of empty rows reported so far

    def add(self, data):
        """Add a row to the collector.

        :param data: sequence of ``(column_offset, text)`` pairs.
        :raises IsNextField: if more than two empty rows were reported, or if
            :meth:`is_next_field` judges the row to start a new field.
        :raises UnknownColumn: if a value cannot be merged into a column.
        """
        if self.empty_rows > 2:
            raise IsNextField()

        if not self.data:
            self.data = dict(data)
        else:
            data = self.adjust_columns(data)
            if self.is_next_field(data):
                raise IsNextField()
            for col_id, value in data:
                self.merge_column(col_id, value)

        self.update_column_widths(data)

    def empty_row(self):
        """Record that an empty row was seen."""
        self.empty_rows += 1

    def update_column_widths(self, data):
        """Update row-length statistics and known column extents from *data*.

        The first row establishes the columns; afterwards only the end offset
        of already known columns can grow, and unknown offsets are ignored.
        """
        row_length = len(data)
        self.last_data_length = row_length
        self.max_data_length = max(self.max_data_length, row_length)

        if not self.column_widths:
            self.column_widths = {
                column: column + len(value) for column, value in data
            }
            return

        for col_id, value in data:
            if col_id in self.column_widths:
                self.column_widths[col_id] = max(
                    self.column_widths[col_id], col_id + len(value.strip())
                )

    def add_old(self, data):
        """Earlier variant of :meth:`add`, kept for compatibility.

        NOTE(review): this method never updates ``column_widths``, which
        :meth:`is_next_field` reads, so it looks unusable for a second row
        unless ``column_widths`` was set some other way -- confirm before use.
        """
        if not self.data:
            self.data = dict(data)
        else:
            if self.is_next_field(data):
                raise IsNextField()
            for col_id, value in data:
                self.merge_column(col_id, value)

    def adjust_columns(self, data):
        """Map the offsets in *data* onto the known columns.

        A value whose offset is a known column keeps that offset.  Any other
        value is assigned to every column whose extent, widened by
        ``adjust_pad`` on both sides, contains the offset; values matching no
        column are dropped.

        :returns: list of ``(column_offset, stripped_text)`` pairs.
        """
        pad = self.adjust_pad
        adjusted_data = {}
        for col_id, value in data:
            text = value.strip()
            if col_id in self.data:
                adjusted_data[col_id] = text
                continue

            for col_start, col_end in self.column_widths.items():
                if col_start - pad <= col_id <= col_end + pad:
                    if col_start in adjusted_data:
                        adjusted_data[col_start] += ' ' + text
                    else:
                        adjusted_data[col_start] = text

        # A real list (not a dict view) so callers can rely on len/iteration
        # behaving the same on Python 2 and Python 3.
        return list(adjusted_data.items())

    def merge_column(self, col_id, value):
        """Append *value* to the text already collected for column *col_id*.

        :raises UnknownColumn: if *col_id* is not a known column.
        """
        if col_id not in self.data:
            # try adding a wiggle room value?
            # FIXME:
            # Sometimes description columns contain column-like
            # layouts, and this causes the ColumnCollector to become
            # confused. Perhaps we could check to see if a column occurs
            # after the maximum column, and assume it's part of the
            # max column?
            #
            # Disabled idea: append the value to the column whose
            # [col_start, col_end] extent contains col_id and return.
            raise UnknownColumn

        self.data[col_id] += ' ' + value.strip()

    def is_next_field(self, data):
        """Return True if the row *data* probably starts the next field.

        If the first key value contains a string and we already have some
        data in the record, then this row is probably the beginning of the
        next field.  If the length of the value in the first column is less
        than the position of the next column, then this is probably a
        continuation.

        The checks, in order:

        * a row with more columns than the previously added row starts a new
          field;
        * otherwise, if the row has a value in the collector's leftmost
          column and that column is not the last one, the row starts a new
          field when that value is longer than the offset of the following
          column or the row has as many columns as the widest row seen.
        """
        if not (self.data and data):
            return False

        if self.last_data_length < len(data):
            return True

        # The original code took "the first" dictionary entry, which depends
        # on arbitrary dict ordering on Python 2 and is not indexable on
        # Python 3.  Use the leftmost column (smallest offset) explicitly.
        row = dict(data)
        first_key = min(row)
        if min(self.data) != first_key:
            return False

        # Offset of the column that follows first_key; None for the last one.
        column_starts = sorted(self.column_widths) + [None]
        next_start = column_starts[column_starts.index(first_key) + 1]
        if next_start:
            return (
                len(row[first_key]) > next_start
                or len(data) == self.max_data_length
            )

        return False

    @property
    def tuple(self):
        """The collected column texts as a tuple ordered by column offset.

        An empty tuple is returned when no row has been added.
        """
        if not self.data:
            return ()
        return tuple(self.data[key] for key in sorted(self.data))