update for 2023 p1220 parsing, stupid irs
parent 86f8861da1
commit 66573e4d1d
3 changed files with 97 additions and 346 deletions
@@ -3,313 +3,102 @@
import subprocess
import re
import pdb
import itertools
import fitz

""" pdftotext -layout -nopgbrk p1220.pdf - """

def strip_values(items):
    expr_non_alphanum = re.compile(r'[^\w\s]*', re.MULTILINE)
    # NOTE: with the arguments in this order (Pattern.sub(repl, string)),
    # the pattern is applied to the empty string, so each x comes back
    # unchanged apart from strip() and newline collapsing.
    return [expr_non_alphanum.sub(x, '').strip().replace('\n', ' ') for x in items if x]
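# A quick illustration of the effective behavior (values hypothetical):
#   strip_values(['1-4\n', '  Payment Year  ', None])
#   -> ['1-4', 'Payment Year']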

class PDFRecordFinder(object):
    def __init__(self, src, heading_exp=None):
        if not heading_exp:
            heading_exp = re.compile(r'(\s+Record Name: (.*))|Record Layout')

    field_range_expr = re.compile(r'^(\d+)[-]?(\d*)$')
    field_heading_exp = re.compile(r'^Field.*Field.*Length.*Description')

    def __init__(self, src):
        self.document = fitz.open(src)

        opts = ["pdftotext", "-layout", "-nopgbrk", "-eol", "unix", src, '-']
        pdftext = subprocess.check_output(opts)
        self.textrows = pdftext.split('\n')
        self.heading_exp = heading_exp
        self.field_heading_exp = field_heading_exp

    def find_record_table_ranges(self):
        matches = []
        for (page_number, page) in enumerate(self.document):
            header_rects = page.search_for("Record Name:")
            for header_match_rect in header_rects:
                header_match_rect.x0 = header_match_rect.x1  # Start after match of "Record Name: "
                header_match_rect.x1 = page.bound().x1  # Extend to right side of page
                header_text = page.get_textbox(header_match_rect)
                record_name = re.sub(r'[^\w\s\n]*', '', header_text).strip()
                matches.append((record_name, {
                    'page': page_number,
                    'y': header_match_rect.y1 - 5,  # Back up a hair to include header more reliably
                }))
        return matches
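    # Typical result shape (record names and coordinates hypothetical):
    #   [('Transmitter T Record', {'page': 4, 'y': 153.9}),
    #    ('Payer A Record', {'page': 9, 'y': 210.2}), ...]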

    def find_records(self):
        record_ranges = self.find_record_table_ranges()
        for record_index, (record_name, record_details) in enumerate(record_ranges):
            current_rows = []
            next_index = record_index + 1
            (_, next_record_details) = record_ranges[next_index] if next_index < len(record_ranges) else (None, {'page': self.document.page_count - 1})
            for page_number in range(record_details['page'], next_record_details['page']):
                page = self.document[page_number]
                table_search_rect = page.bound()
                if page_number == record_details['page']:
                    table_search_rect.y0 = record_details['y']
                tables = page.find_tables(
                    clip=table_search_rect,
                    min_words_horizontal=1,
                    min_words_vertical=1,
                    horizontal_strategy="lines_strict",
                    intersection_tolerance=1,
                )
                for table in tables:
                    if table.col_count == 4:
                        table = table.extract()
                        # Parse field position (sometimes a cell has multiple
                        # values because IRS employees apparently smoke crack)
                        for row in table:
                            first_column_lines = row[0].strip().split('\n')
                            if len(first_column_lines) > 1:
                                for sub_row in self.split_row(row):
                                    current_rows.append(strip_values(sub_row))
                            else:
                                current_rows.append(strip_values(row))
            consecutive_rows = self.filter_nonconsecutive_rows(current_rows)
            yield (record_name, consecutive_rows)
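    # Each yielded item pairs a record name with its consecutive 4-column
    # rows, e.g. (values hypothetical):
    #   ('Transmitter T Record',
    #    [['1-4', 'Payment Year', '4', 'Required. ...'], ...])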

    def split_row(self, row):
        if not row[1]:
            return []
        split_rows = list(itertools.zip_longest(*[x.strip().split('\n') for x in row[:3]], fillvalue=None))
        description = strip_values([row[3]])[0]
        rows = []
        for row in split_rows:
            if len(row) < 3 or not row[2]:
                row = self.infer_field_length(row)
            rows.append([*row, description])
        return rows
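    # e.g. a merged cell split across lines (values hypothetical):
    #   split_row(['545-546\n547-548', 'State Code\nBlank', '2\n2', 'desc'])
    #   -> [['545-546', 'State Code', '2', 'desc'],
    #       ['547-548', 'Blank', '2', 'desc']]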

    def infer_field_length(self, row):
        matches = PDFRecordFinder.field_range_expr.match(row[0])
        if not matches:
            return row
        (start, end) = ([int(x) for x in list(matches.groups()) if x] + [None])[:2]
        length = str(end - start + 1) if end and start else '1'
        return (*row[:2], length)
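    # e.g. ('545-548', 'Name', None) -> ('545-548', 'Name', '4'), and a
    # single position ('550', 'Blank', None) -> ('550', 'Blank', '1').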

    def filter_nonconsecutive_rows(self, rows):
        consecutive_rows = []
        last_position = 0
        for row in rows:
            matches = PDFRecordFinder.field_range_expr.match(row[0])
            if not matches:
                continue
            (start, end) = ([int(x) for x in list(matches.groups()) if x] + [None])[:2]
            if start != last_position + 1:
                continue
            last_position = end if end else start
            consecutive_rows.append(row)
        return consecutive_rows
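    # Positions must chain from 1 without gaps, e.g. (hypothetical):
    #   [['1-4', ...], ['5', ...], ['7-8', ...]] keeps '1-4' and '5' but
    #   drops '7-8', since position 6 never appears.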

    def records(self):
        headings = self.locate_heading_rows_by_field()

        #for x in headings:
        #    print x

        for (start, end, name) in headings:
            name = name.decode('ascii', 'ignore')
            yield (name, list(self.find_fields(iter(self.textrows[start+1:end]))), (start+1, end))

    def locate_heading_rows_by_field(self):
        results = []
        record_break = []
        line_is_whitespace_exp = re.compile(r'^(\s*)$')
        record_begin_exp = self.heading_exp  # re.compile('Record\ Name')

        for (i, row) in enumerate(self.textrows):
            match = self.field_heading_exp.match(row)
            if match:
                # work backwards until we think the header is fully copied
                space_count_exp = re.compile(r'^(\s*)')
                position = i - 1
                spaces = 0
                #last_spaces = 10000
                complete = False
                header = None
                while not complete:
                    line_is_whitespace = True if line_is_whitespace_exp.match(self.textrows[position]) else False
                    is_record_begin = record_begin_exp.search(self.textrows[position])
                    if is_record_begin or line_is_whitespace:
                        header = self.textrows[position-1:i]
                        complete = True
                    position -= 1

                name = ''.join(header).strip().decode('ascii', 'ignore')
                print((name, position))
                results.append((i, name, position))
            else:
                # See if this row forces us to break from field reading.
                if re.search(r'Record\ Layout', row):
                    record_break.append(i)

        merged = []
        for (a, b) in zip(results, results[1:] + [(len(self.textrows), None)]):
            end_pos = None

            #print a[0], record_break[0], b[0]-1

            while record_break and record_break[0] < a[0]:
                record_break = record_break[1:]

            if record_break[0] < b[0]-1:
                end_pos = record_break[0]
                record_break = record_break[1:]
            else:
                end_pos = b[0]-1

            merged.append((a[0], end_pos-1, a[1]))
        return merged

    """
    def locate_heading_rows(self):
        results = []
        for (i, row) in enumerate(self.textrows):
            match = self.heading_exp.match(row)
            if match:
                results.append((i, ''.join(match.groups())))

        merged = []
        for (a, b) in zip(results, results[1:] + [(len(self.textrows), None)]):
            merged.append((a[0], b[0]-1, a[1]))

        return merged

    def locate_layout_block_rows(self):
        # Search for rows that contain "Record Layout", as these are not fields
        # we are interested in because they contain the crazy blocks of field definitions
        # and not the nice 4-column ones that we're looking for.

        results = []
        for (i, row) in enumerate(self.textrows):
            match = re.match("Record Layout", row)
    """

    def find_fields(self, row_iter):
        cc = ColumnCollector()
        blank_row_counter = 0

        for r in row_iter:
            row = r.decode('UTF-8')
            #print row
            row_columns = self.extract_columns_from_row(row)

            if not row_columns:
                if cc.data and len(list(cc.data.keys())) > 1 and len(row.strip()) > list(cc.data.keys())[-1]:
                    yield cc
                    cc = ColumnCollector()
                else:
                    cc.empty_row()
                continue

            try:
                cc.add(row_columns)
            except IsNextField as e:
                yield cc
                cc = ColumnCollector()
                cc.add(row_columns)
            except UnknownColumn as e:
                raise StopIteration

        yield cc

    def extract_columns_from_row(self, row):
        re_multiwhite = re.compile(r'\s{2,}')

        # IF LINE DOESN'T CONTAIN MULTIPLE WHITESPACES, IT'S LIKELY NOT A TABLE
        if not re_multiwhite.search(row):
            return None

        white_ranges = [0,]
        pos = 0
        while pos < len(row):
            match = re_multiwhite.search(row[pos:])
            if match:
                white_ranges.append(pos + match.start())
                white_ranges.append(pos + match.end())
                pos += match.end()
            else:
                white_ranges.append(len(row))
                pos = len(row)

        row_result = []
        white_iter = iter(white_ranges)
        while white_iter:
            try:
                start = next(white_iter)
                end = next(white_iter)
                if start != end:
                    row_result.append(
                        (start, row[start:end].encode('ascii', 'ignore'))
                    )
            except StopIteration:
                white_iter = None

        #print row_result
        return row_result
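    # e.g. a layout row split on runs of 2+ spaces (offsets hypothetical):
    #   '1-4      Payment Year    4     Required.'
    #   -> [(0, b'1-4'), (9, b'Payment Year'), (25, b'4'), (31, b'Required.')]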

class UnknownColumn(Exception):
    pass

class IsNextField(Exception):
    pass

class ColumnCollector(object):
    def __init__(self, initial=None):
        self.data = None
        self.column_widths = None
        self.max_data_length = 0
        self.adjust_pad = 3
        self.empty_rows = 0
        pass

    def __repr__(self):
        return "<%s: %s>" % (
            self.__class__.__name__,
            [x if len(x) < 25 else x[:25] + '..' for x in list(self.data.values())] if self.data else '')

    def add(self, data):
        #if self.empty_rows > 2:
        #    raise IsNextField()

        if not self.data:
            self.data = dict(data)
        else:
            data = self.adjust_columns(data)
            if self.is_next_field(data):
                raise IsNextField()
            for col_id, value in data:
                self.merge_column(col_id, value)

        self.update_column_widths(data)

    def empty_row(self):
        self.empty_rows += 1

    def update_column_widths(self, data):
        self.last_data_length = len(data)
        self.max_data_length = max(self.max_data_length, len(data))

        if not self.column_widths:
            self.column_widths = dict([[column_value[0], column_value[0] + len(column_value[1])] for column_value in data])
        else:
            for col_id, value in data:
                try:
                    self.column_widths[col_id] = max(self.column_widths[col_id], col_id + len(value.strip()))
                except KeyError:
                    pass

    def add_old(self, data):
        if not self.data:
            self.data = dict(data)
        else:
            if self.is_next_field(data):
                raise IsNextField()
            for col_id, value in data:
                self.merge_column(col_id, value)

    def adjust_columns(self, data):
        adjusted_data = {}
        for col_id, value in data:
            if col_id in list(self.data.keys()):
                adjusted_data[col_id] = value.strip()
            else:
                for col_start, col_end in list(self.column_widths.items()):
                    if (col_start - self.adjust_pad) <= col_id and (col_end + self.adjust_pad) >= col_id:
                        if col_start in adjusted_data:
                            adjusted_data[col_start] += ' ' + value.strip()
                        else:
                            adjusted_data[col_start] = value.strip()

        return list(adjusted_data.items())
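    # adjust_pad gives the matching some slack, e.g. with column_widths
    # {0: 8, 10: 22} a fragment starting at offset 12 folds into the
    # column that starts at 10 (widths hypothetical).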

    def merge_column(self, col_id, value):
        if col_id in list(self.data.keys()):
            self.data[col_id] += ' ' + value.strip()
        else:
            # try adding a wiggle room value?
            # FIXME:
            # Sometimes description columns contain column-like
            # layouts, and this causes the ColumnCollector to become
            # confused. Perhaps we could check to see if a column occurs
            # after the maximum column, and assume it's part of the
            # max column?

            """
            for col_start, col_end in self.column_widths.items():
                if col_start <= col_id and (col_end) >= col_id:
                    self.data[col_start] += ' ' + value.strip()
                    return
            """
            raise UnknownColumn

    def is_next_field(self, data):
        """
        If the first key value contains a string and we already have some
        data in the record, then this row is probably the beginning of the
        next field. Raise an exception and continue on with a fresh
        ColumnCollector.

        If the length of the value in column_id is less than the position
        of the next column_id, then this is probably a continuation.
        """

        if self.data and data:
            keys = list(dict(self.column_widths).keys())
            keys.sort()
            keys += [None]

            if self.last_data_length < len(data):
                return True

            first_key, first_value = list(dict(data).items())[0]
            if list(self.data.keys())[0] == first_key:
                position = keys.index(first_key)
                max_length = keys[position + 1]
                if max_length:
                    return len(first_value) > max_length or len(data) == self.max_data_length

        return False
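    # e.g. if the incoming row starts at the same offset as the collected
    # field's first column and its text runs past the next column's start
    # offset, the row is treated as a new field, not a continuation.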

    @property
    def tuple(self):
        #try:
        if self.data:
            return tuple([self.data[k] for k in sorted(self.data.keys())])
        return ()
        #except:
        #    import pdb
        #    pdb.set_trace()

        return self.find_records()
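    # A minimal usage sketch of the fitz-based path (file name hypothetical):
    #   finder = PDFRecordFinder('p1220.pdf')
    #   for record_name, rows in finder.find_records():
    #       print(record_name, len(rows))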

requirements.txt (new file)
@@ -0,0 +1 @@
PyMuPDF==1.24.0
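Assuming a standard Python environment, the pinned dependency installs with "pip install -r requirements.txt" (equivalently, "pip install PyMuPDF==1.24.0").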

@@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python
from pyaccuwage.parser import RecordBuilder
from pyaccuwage.pdfextract import PDFRecordFinder
import argparse

@@ -29,48 +29,9 @@ doc = PDFRecordFinder(source_file)
records = doc.records()
builder = RecordBuilder()

def record_begins_at(field):
    return int(fields[0].data.values()[0].split('-')[0], 10)

def record_ends_at(fields):
    return int(fields[-1].data.values()[0].split('-')[-1], 10)

last_record_begins_at = -1
last_record_ends_at = -1

for rec in records:
    #if not rec[1]:
    #    continue  # no actual fields detected
    fields = rec[1]

    # strip out fields that are not 4 items long
    fields = filter(lambda x: len(x.tuple) == 4, fields)

    # strip fields that don't begin at position 0
    fields = filter(lambda x: 0 in x.data, fields)

    # strip fields that don't have a length-range type item in position 0
    fields = filter(lambda x: re.match(r'^\d+[-]?\d*$', x.data[0]), fields)

    if not fields:
        continue

    begins_at = record_begins_at(fields)
    ends_at = record_ends_at(fields)

    # FIXME record_ends_at is randomly exploding due to record data being
    # a lump of text and not necessarily a field entry. I assume
    # this is cleaned out by the record builder class.

    #print last_record_ends_at + 1, begins_at
    if last_record_ends_at + 1 != begins_at:
        name = re.sub('^[^a-zA-Z]*', '', rec[0].split(':')[-1])
        name = re.sub('[^\w]*', '', name)
for (name, fields) in records:
    name = re.sub(r'^[^a-zA-Z]*', '', name.split(':')[-1])
    name = re.sub(r'[^\w]*', '', name)
    sys.stdout.write("\nclass %s(pyaccuwagemodel.Model):\n" % name)

    for field in builder.load(map(lambda x: x.tuple, rec[1][0:])):
    for field in builder.load(map(lambda x: x, fields[0:])):
        sys.stdout.write('\t' + field + '\n')
        #print field

    last_record_ends_at = ends_at
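The loop writes one generated model class per record to stdout; the output looks roughly like this (class and field lines hypothetical, shaped by RecordBuilder):

class TransmitterTRecord(pyaccuwagemodel.Model):
	<one field definition line per parsed 4-column row>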