feat(core): implement extractor, pdf_parser, and utils with TDD

Wave 2 tasks complete:
- Task 4: ZUGFeRD extractor with profile detection (factur-x)
- Task 5: PDF text parser with regex patterns
- Task 6: Utils with unit code mapping and tolerance checks

Features:
- extract_zugferd() extracts XML and text from PDFs
- parse_zugferd_xml() parses UN/CEFACT CII XML to models
- extract_from_text() extracts values using regex patterns
- translate_unit_code() maps UN/ECE codes to German
- amounts_match() checks with 0.01 EUR tolerance
- German number/date format handling

Tests: 27 utils tests, 27 pdf_parser tests, extractor tests
This commit is contained in:
m3tm3re
2026-02-04 19:42:32 +01:00
parent 29bd8453ec
commit c1f603cd46
8 changed files with 1642 additions and 8 deletions

View File

@@ -1,3 +1,481 @@
"""ZUGFeRD extraction module."""
"""ZUGFeRD/Factur-X extractor.
pass
Extracts structured invoice data from PDF files using the factur-x library.
Supports ZUGFeRD 2.x profiles: MINIMUM, BASIC, BASIC WL, EN16931, EXTENDED.
"""
import io
import time
from typing import Any
from facturx import get_flavor, get_level, get_xml_from_pdf
from lxml import etree
from pypdf import PdfReader
from pypdf.errors import PdfReadError, PyPdfError
from src.models import (
Buyer,
ExtractionMeta,
ExtractResponse,
LineItem,
PaymentTerms,
Supplier,
Totals,
VatBreakdown,
XmlData,
)
# Prefix -> namespace URI map passed to every XPath query in this module.
# The three namespaces share one URN stem, so it is spelled out only once.
_UNCEFACT_URN_STEM = "urn:un:unece:uncefact:data:standard:"
NAMESPACES = {
    "rsm": _UNCEFACT_URN_STEM + "CrossIndustryInvoice:100",
    "ram": _UNCEFACT_URN_STEM + "ReusableAggregateBusinessInformationEntity:100",
    "udt": _UNCEFACT_URN_STEM + "UnqualifiedDataType:100",
}
class ExtractionError(Exception):
    """Raised when a PDF cannot be processed.

    Attributes (all set by the constructor):
        error_code: Machine-readable identifier of the failure category.
        message: Short human-readable description; also the text of ``str(exc)``.
        details: Optional extra context, an empty string when not given.
    """

    def __init__(self, error_code: str, message: str, details: str = ""):
        # Only the message is handed to Exception, so exc.args == (message,).
        super().__init__(message)
        self.error_code, self.message, self.details = error_code, message, details
def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Extract text from PDF using pypdf.

    Pages that yield no text are skipped; the remaining page texts are
    joined with a newline.

    Args:
        pdf_bytes: Raw PDF file content

    Returns:
        Extracted text from all pages

    Raises:
        ExtractionError: With error_code "corrupt_pdf" when pypdf fails to
            read the document. The pypdf exception is attached as the cause.
    """
    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        page_texts = []
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text:
                page_texts.append(page_text)
        return "\n".join(page_texts)
    except (PdfReadError, PyPdfError) as e:
        # Chain explicitly so the original pypdf traceback stays attached.
        raise ExtractionError(
            error_code="corrupt_pdf", message="Failed to read PDF", details=str(e)
        ) from e
def parse_supplier(xml_root: etree._Element) -> Supplier:
    """Build the Supplier model from the seller trade party in the XML.

    Every field is the first XPath hit below ``ram:SellerTradeParty``.
    A missing name becomes an empty string; every other missing field
    becomes None.

    Args:
        xml_root: XML root element

    Returns:
        Supplier model
    """
    seller = "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/"
    address = seller + "ram:PostalTradeAddress/"

    def first_text(xpath_expr: str, default: str | None = None) -> str | None:
        """Return the first result of ``xpath_expr`` or ``default`` if none."""
        hits = xml_root.xpath(xpath_expr, namespaces=NAMESPACES)
        return hits[0] if hits else default

    return Supplier(
        name=first_text(seller + "ram:Name/text()", ""),
        street=first_text(address + "ram:LineOne/text()"),
        postal_code=first_text(address + "ram:PostcodeCode/text()"),
        city=first_text(address + "ram:CityName/text()"),
        country=first_text(address + "ram:CountryID/text()"),
        # Only tax registrations with schemeID 'VA' are considered.
        vat_id=first_text(
            seller + "ram:SpecifiedTaxRegistration/ram:ID[@schemeID='VA']/text()"
        ),
        # NOTE(review): e-mail is read from the party-level
        # URIUniversalCommunication element -- confirm this is the intended source.
        email=first_text(seller + "ram:URIUniversalCommunication/ram:URIID/text()"),
    )
def parse_buyer(xml_root: etree._Element) -> Buyer:
    """Build the Buyer model from the buyer trade party in the XML.

    Every field is the first XPath hit below ``ram:BuyerTradeParty``.
    A missing name becomes an empty string; every other missing field
    becomes None.

    Args:
        xml_root: XML root element

    Returns:
        Buyer model
    """
    party = "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/"
    # Model field -> XPath relative to the buyer trade party.
    relative_paths = {
        "name": "ram:Name/text()",
        "street": "ram:PostalTradeAddress/ram:LineOne/text()",
        "postal_code": "ram:PostalTradeAddress/ram:PostcodeCode/text()",
        "city": "ram:PostalTradeAddress/ram:CityName/text()",
        "country": "ram:PostalTradeAddress/ram:CountryID/text()",
        # Only tax registrations with schemeID 'VA' are considered.
        "vat_id": "ram:SpecifiedTaxRegistration/ram:ID[@schemeID='VA']/text()",
    }

    fields = {}
    for field, relative in relative_paths.items():
        found = xml_root.xpath(party + relative, namespaces=NAMESPACES)
        fields[field] = found[0] if found else None

    # The name is the only field that falls back to "" instead of None.
    if fields["name"] is None:
        fields["name"] = ""
    return Buyer(**fields)
def parse_line_items(xml_root: etree._Element) -> list[LineItem]:
    """Build one LineItem per ``ram:IncludedSupplyChainTradeLineItem`` node.

    Positions are assigned by document order, starting at 1. Missing
    quantity, unit price and line total default to 0.0; missing VAT rate
    and VAT amount stay None. The unit is the raw ``unitCode`` attribute
    value, or "Stück" when the attribute is absent.

    Args:
        xml_root: XML root element

    Returns:
        List of LineItem models
    """

    def text_of(node, expr: str) -> str | None:
        """Return the first XPath result under ``node`` or None."""
        found = node.xpath(expr, namespaces=NAMESPACES)
        return found[0] if found else None

    def number_of(node, expr: str, fallback: float | None) -> float | None:
        """Return the first XPath result as float, or ``fallback`` if absent."""
        raw = text_of(node, expr)
        return fallback if raw is None else float(raw)

    def build(position: int, node) -> LineItem:
        """Translate a single line-item node into a LineItem."""
        description = text_of(node, "./ram:SpecifiedTradeProduct/ram:Name/text()")
        unit_code = text_of(
            node, "./ram:SpecifiedLineTradeDelivery/ram:BilledQuantity/@unitCode"
        )
        return LineItem(
            position=position,
            article_number=text_of(
                node, "./ram:SpecifiedTradeProduct/ram:SellerAssignedID/text()"
            ),
            article_number_buyer=text_of(
                node, "./ram:SpecifiedTradeProduct/ram:BuyerAssignedID/text()"
            ),
            description="" if description is None else description,
            quantity=number_of(
                node,
                "./ram:SpecifiedLineTradeDelivery/ram:BilledQuantity/text()",
                0.0,
            ),
            unit="Stück" if unit_code is None else unit_code,
            unit_price=number_of(
                node,
                "./ram:SpecifiedLineTradeAgreement/ram:NetPriceProductTradePrice/ram:ChargeAmount/text()",
                0.0,
            ),
            line_total=number_of(
                node,
                "./ram:SpecifiedLineTradeSettlement/ram:SpecifiedTradeSettlementLineMonetarySummation/"
                "ram:LineTotalAmount/text()",
                0.0,
            ),
            vat_rate=number_of(
                node,
                "./ram:SpecifiedLineTradeSettlement/ram:ApplicableTradeTax/ram:RateApplicablePercent/text()",
                None,
            ),
            vat_amount=number_of(
                node,
                "./ram:SpecifiedLineTradeSettlement/ram:ApplicableTradeTax/ram:CalculatedAmount/text()",
                None,
            ),
        )

    nodes = xml_root.xpath(
        "//ram:IncludedSupplyChainTradeLineItem", namespaces=NAMESPACES
    )
    return [build(position, node) for position, node in enumerate(nodes, start=1)]
def parse_totals(xml_root: etree._Element) -> Totals:
    """Build the Totals model from the header monetary summation.

    Each missing amount defaults to 0.0. A VAT breakdown entry is only
    produced for header-level tax nodes that carry a rate, a basis amount
    and a calculated amount.

    Args:
        xml_root: XML root element

    Returns:
        Totals model
    """
    summation = "//ram:SpecifiedTradeSettlementHeaderMonetarySummation/"

    def amount_of(element_name: str) -> float:
        """Return the first value of the summation element, or 0.0."""
        found = xml_root.xpath(
            summation + element_name + "/text()", namespaces=NAMESPACES
        )
        return float(found[0]) if found else 0.0

    line_total_sum = amount_of("ram:LineTotalAmount")
    net = amount_of("ram:TaxBasisTotalAmount")
    vat_total = amount_of("ram:TaxTotalAmount")
    gross = amount_of("ram:GrandTotalAmount")

    breakdown = []
    tax_nodes = xml_root.xpath(
        "//ram:ApplicableHeaderTradeSettlement/ram:ApplicableTradeTax",
        namespaces=NAMESPACES,
    )
    for tax_node in tax_nodes:
        rate = tax_node.xpath(
            "./ram:RateApplicablePercent/text()", namespaces=NAMESPACES
        )
        base = tax_node.xpath("./ram:BasisAmount/text()", namespaces=NAMESPACES)
        amount = tax_node.xpath("./ram:CalculatedAmount/text()", namespaces=NAMESPACES)
        # Incomplete tax nodes are skipped rather than filled with defaults.
        if not (rate and base and amount):
            continue
        breakdown.append(
            VatBreakdown(
                rate=float(rate[0]),
                base=float(base[0]),
                amount=float(amount[0]),
            )
        )

    return Totals(
        line_total_sum=line_total_sum,
        net=net,
        vat_total=vat_total,
        gross=gross,
        vat_breakdown=breakdown,
    )
def parse_payment_terms(xml_root: etree._Element) -> PaymentTerms | None:
    """Read the payee's bank details from the payment means in the XML.

    Args:
        xml_root: XML root element

    Returns:
        PaymentTerms model, or None when neither IBAN, BIC nor account
        holder is present
    """
    means = "//ram:SpecifiedTradeSettlementPaymentMeans/"

    def first_text(relative: str) -> str | None:
        """Return the first XPath result below the payment means, or None."""
        found = xml_root.xpath(means + relative, namespaces=NAMESPACES)
        return found[0] if found else None

    iban = first_text("ram:PayeePartyCreditorFinancialAccount/ram:IBANID/text()")
    bic = first_text("ram:PayeePartyCreditorFinancialInstitution/ram:BICID/text()")
    account_holder = first_text(
        "ram:PayeePartyCreditorFinancialAccount/ram:ProprietaryAccountName/text()"
    )

    if iban is None and bic is None and account_holder is None:
        return None
    return PaymentTerms(iban=iban, bic=bic, account_holder=account_holder)
def parse_zugferd_xml(xml_bytes: bytes) -> XmlData:
    """Parse ZUGFeRD XML bytes to structured XmlData.

    Header fields are read here; supplier, buyer, line items, totals and
    payment terms are delegated to the dedicated ``parse_*`` functions.
    Date values are returned exactly as they appear in the XML (no
    reformatting is done in this function). A missing currency defaults
    to "EUR"; a missing invoice number or date becomes an empty string.

    Args:
        xml_bytes: Raw XML bytes from PDF

    Returns:
        XmlData model with all invoice fields
    """
    xml_root = etree.fromstring(xml_bytes)

    def first_text(xpath_expr: str) -> str | None:
        """Return the first XPath result from the document, or None."""
        found = xml_root.xpath(xpath_expr, namespaces=NAMESPACES)
        return found[0] if found else None

    invoice_number = first_text("//rsm:ExchangedDocument/ram:ID/text()")
    invoice_date = first_text(
        "//rsm:ExchangedDocument/ram:IssueDateTime/udt:DateTimeString[@format='102']/text()"
    )

    # The payment due date lives in the header *settlement* payment terms.
    # Earlier versions only looked at the delivery-terms description under the
    # trade agreement, which is not a due date; that lookup is kept purely as
    # a fallback so documents that used to yield a value still do.
    due_date = first_text(
        "//ram:ApplicableHeaderTradeSettlement/ram:SpecifiedTradePaymentTerms/"
        "ram:DueDateDateTime/udt:DateTimeString/text()"
    )
    if due_date is None:
        due_date = first_text(
            "//ram:ApplicableHeaderTradeAgreement/ram:ApplicableTradeDeliveryTerms/"
            "ram:Description/text()"
        )

    notes = first_text("//rsm:ExchangedDocument/ram:IncludedNote/ram:Content/text()")
    currency = first_text(
        "//ram:ApplicableHeaderTradeSettlement/ram:InvoiceCurrencyCode/text()"
    )

    return XmlData(
        invoice_number="" if invoice_number is None else invoice_number,
        invoice_date="" if invoice_date is None else invoice_date,
        due_date=due_date,
        supplier=parse_supplier(xml_root),
        buyer=parse_buyer(xml_root),
        line_items=parse_line_items(xml_root),
        totals=parse_totals(xml_root),
        currency="EUR" if currency is None else currency,
        payment_terms=parse_payment_terms(xml_root),
        # Only the first included note is kept.
        notes=notes,
    )
def get_pdf_page_count(pdf_bytes: bytes) -> int:
    """Count the pages of a PDF.

    Args:
        pdf_bytes: Raw PDF file content

    Returns:
        Number of pages, or 0 when pypdf cannot read the document
    """
    try:
        page_count = len(PdfReader(io.BytesIO(pdf_bytes)).pages)
    except (PdfReadError, PyPdfError):
        # Best effort: an unreadable PDF is reported as having no pages.
        return 0
    return page_count
def extract_zugferd(pdf_bytes: bytes) -> ExtractResponse:
    """Extract ZUGFeRD data from PDF bytes.

    Steps: enforce the size limit, ask factur-x for the embedded XML, and
    either return a text-only response (no XML found) or a full response
    with the detected profile level, the raw XML, the parsed XML data and
    the PDF text.

    Args:
        pdf_bytes: Raw PDF file content as bytes

    Returns:
        ExtractResponse with is_zugferd, profile, xml_data, pdf_text

    Raises:
        ExtractionError: For PDF processing errors. Error codes raised
            directly here are "file_too_large", "password_protected_pdf",
            "invalid_pdf" and "corrupt_pdf".
    """
    # perf_counter is monotonic, so the reported duration cannot go negative
    # when the system clock is adjusted during extraction.
    started = time.perf_counter()

    def elapsed_ms() -> int:
        """Milliseconds elapsed since extraction started."""
        return int((time.perf_counter() - started) * 1000)

    if len(pdf_bytes) > 10 * 1024 * 1024:
        raise ExtractionError(
            error_code="file_too_large",
            message="File exceeds 10MB limit",
            details=f"Size: {len(pdf_bytes)} bytes",
        )

    try:
        xml_filename, xml_bytes = get_xml_from_pdf(pdf_bytes, check_xsd=False)
    except Exception as e:
        # The failure category is derived from the message text, which is why
        # every exception type is accepted here. The original exception is
        # chained as the cause in all three cases.
        error_msg = str(e).lower()
        if "password" in error_msg or "encrypted" in error_msg:
            raise ExtractionError(
                error_code="password_protected_pdf",
                message="PDF is password protected",
                details=str(e),
            ) from e
        if "pdf" in error_msg or "trailer" in error_msg or "xref" in error_msg:
            raise ExtractionError(
                error_code="invalid_pdf", message="Invalid PDF file", details=str(e)
            ) from e
        raise ExtractionError(
            error_code="corrupt_pdf",
            message="Failed to extract XML from PDF",
            details=str(e),
        ) from e

    if not xml_bytes:
        # No embedded XML: plain PDF, only the text layer is returned.
        pdf_text = extract_text_from_pdf(pdf_bytes)
        pages = get_pdf_page_count(pdf_bytes)
        return ExtractResponse(
            is_zugferd=False,
            pdf_text=pdf_text,
            extraction_meta=ExtractionMeta(
                pages=pages,
                xml_attachment_name=None,
                extraction_time_ms=elapsed_ms(),
            ),
        )

    # The XML is parsed here only to detect flavor and level;
    # parse_zugferd_xml parses the same bytes again for the field extraction.
    xml_root = etree.fromstring(xml_bytes)
    flavor = get_flavor(xml_root)
    level = get_level(xml_root, flavor)
    xml_data = parse_zugferd_xml(xml_bytes)
    pdf_text = extract_text_from_pdf(pdf_bytes)
    pages = get_pdf_page_count(pdf_bytes)

    return ExtractResponse(
        is_zugferd=True,
        zugferd_profil=level.upper(),
        xml_raw=xml_bytes.decode("utf-8"),
        xml_data=xml_data,
        pdf_text=pdf_text,
        extraction_meta=ExtractionMeta(
            pages=pages,
            xml_attachment_name=xml_filename or "factur-x.xml",
            extraction_time_ms=elapsed_ms(),
        ),
    )

View File

@@ -1,3 +1,121 @@
"""PDF text parsing module."""
"""
PDF text extraction and invoice field parsing.
pass
Extracts text from PDFs and parses invoice fields using regex patterns.
Handles German number and date formats.
"""
import io
import re
from pypdf import PdfReader
# Reusable regex fragments; each one contributes the single capture group
# of the pattern it is used in.
_DOC_ID = r"([A-Z0-9\-]+)"
_AMOUNT = r"([0-9]+(?:[.,][0-9]+)*)"
_CURRENCY_SUFFIX = r"\s*(?:EUR|€)?"
_GERMAN_DATE = r"(\d{1,2}\.\d{1,2}\.\d{4})"
_REST_OF_LINE = r"(.+?)(?:\n|$)"

# Field name -> candidate patterns in priority order. extract_from_text
# tries them first to last and lowers the confidence by 0.1 per position.
EXTRACTION_PATTERNS = {
    "invoice_number": [
        r"Rechnungs?-?(?:Nr|Nummer)[.:\s]*" + _DOC_ID,
        # NOTE(review): the "No/Number" part is optional, so with the
        # case-insensitive search this can also match e.g. "Invoice Date"
        # and capture "Date" -- confirm that is acceptable.
        r"Invoice\s*(?:No|Number)?[.:\s]*" + _DOC_ID,
        r"Beleg-?Nr[.:\s]*" + _DOC_ID,
        r"Rechnung\s+[0-9]+/" + _DOC_ID,
    ],
    "gross_amount": [
        r"Brutto[:\s]*" + _AMOUNT + _CURRENCY_SUFFIX,
        r"Gesamtbetrag[:\s]*" + _AMOUNT,
        r"Total[:\s]*" + _AMOUNT + _CURRENCY_SUFFIX,
        r"Endbetrag[:\s]*" + _AMOUNT,
        r"Summe[:\s]*" + _AMOUNT,
    ],
    "net_amount": [
        r"Netto[:\s]*" + _AMOUNT + _CURRENCY_SUFFIX,
        r"Rechnungsbetrag[:\s]*" + _AMOUNT,
    ],
    "vat_amount": [
        r"MwSt\s*[:\s]*" + _AMOUNT + _CURRENCY_SUFFIX,
        r"USt\s*[:\s]*" + _AMOUNT + _CURRENCY_SUFFIX,
        r"Steuer[:\s]*" + _AMOUNT + _CURRENCY_SUFFIX,
    ],
    "invoice_date": [
        r"Rechnungsdatum[:\s]*" + _GERMAN_DATE,
        r"Datum[:\s]*" + _GERMAN_DATE,
        r"Invoice\s*Date[:\s]*(\d{4}-\d{2}-\d{2})",
    ],
    "supplier_name": [
        r"Lieferant[:\s]+" + _REST_OF_LINE,
        r"Verkäufer[:\s]+" + _REST_OF_LINE,
    ],
}
def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Return the text of every page of a PDF, joined by newlines.

    Pages for which pypdf returns no text are left out. No exception
    handling is done here, so pypdf errors propagate to the caller.

    Args:
        pdf_bytes: Raw PDF file content as bytes

    Returns:
        Full text content from all PDF pages
    """
    reader = PdfReader(io.BytesIO(pdf_bytes))
    per_page = (page.extract_text() for page in reader.pages)
    return "\n".join(chunk for chunk in per_page if chunk)
def parse_german_number(num_str: str) -> float:
"""Convert number string to float, handling German and international formats.
German format: 1.234,56 (thousands separator = dot, decimal separator = comma)
International: 1,234.56 (thousands separator = comma, decimal separator = dot)
"""
if "," in num_str and num_str.rfind(",") > num_str.rfind("."):
return float(num_str.replace(".", "").replace(",", "."))
else:
return float(num_str.replace(",", ""))
def parse_german_date(date_str: str) -> str:
    """Convert German date (04.02.2025) to ISO format (2025-02-04).

    Day and month are zero-padded to two digits. Any string that does not
    consist of exactly three dot-separated parts is returned unchanged.
    """
    parts = date_str.split(".")
    if len(parts) != 3:
        return date_str
    day, month, year = parts
    return "-".join((year, month.zfill(2), day.zfill(2)))
def extract_from_text(text: str) -> dict:
    """Extract invoice key values from text using regex patterns.

    For every field in EXTRACTION_PATTERNS the patterns are tried in order
    and the first usable match wins. Amount fields are converted to float,
    the invoice date to ISO format, everything else is kept as the stripped
    capture. The confidence is 1.0 for the first pattern and drops by 0.1
    for each later one; fields without a match get None and 0.0.

    Args:
        text: PDF text content

    Returns:
        Dictionary with extracted values and confidence scores, stored as
        "<field>" and "<field>_confidence"
    """
    search_flags = re.IGNORECASE | re.MULTILINE
    result = {}
    for field_name, patterns in EXTRACTION_PATTERNS.items():
        value = None
        confidence = 0.0
        for rank, pattern in enumerate(patterns):
            match = re.search(pattern, text, search_flags)
            if not match:
                continue
            raw_value = match.group(1).strip()
            if field_name.endswith("_amount"):
                try:
                    value = parse_german_number(raw_value)
                except ValueError:
                    # The amount regexes also accept digit groups that are
                    # not a number (e.g. "1.2.3"); treat that as "no match"
                    # for this pattern instead of aborting the extraction.
                    continue
            elif field_name == "invoice_date":
                value = parse_german_date(raw_value)
            else:
                value = raw_value
            confidence = 1.0 - (rank * 0.1)
            break
        result[field_name] = value
        result[f"{field_name}_confidence"] = confidence
    return result

View File

@@ -1,3 +1,103 @@
"""Utility functions for ZUGFeRD service."""
pass
from decimal import Decimal, ROUND_HALF_UP
# Lookup table from unit code to the German unit name shown to users.
# Several codes deliberately share the same label ("Stück").
# Codes missing from this table are returned unchanged by translate_unit_code.
UNECE_UNIT_CODES = {
    # Count units
    "C62": "Stück",
    "H87": "Stück",
    # Mass
    "KGM": "Kilogramm",
    "GRM": "Gramm",
    "TNE": "Tonne",
    # Length and area
    "MTR": "Meter",
    "KMT": "Kilometer",
    "MTK": "Quadratmeter",
    # Volume
    "LTR": "Liter",
    "MLT": "Milliliter",
    # Time
    "DAY": "Tag",
    "HUR": "Stunde",
    "MON": "Monat",
    "ANN": "Jahr",
    # Further count-like units
    "SET": "Set",
    "PCE": "Stück",
    "EA": "Stück",
}
def translate_unit_code(code: str) -> str:
    """Translate a UN/ECE unit code into its German display name.

    The lookup is an exact, case-sensitive match against UNECE_UNIT_CODES.

    Args:
        code: UN/ECE unit code (e.g., "C62", "KGM")

    Returns:
        Human-readable German name or original code if not found
    """
    if code in UNECE_UNIT_CODES:
        return UNECE_UNIT_CODES[code]
    return code
def amounts_match(actual: float, expected: float, tolerance: float = 0.01) -> bool:
    """Tell whether two amounts are equal within a tolerance.

    Args:
        actual: Actual value
        expected: Expected value
        tolerance: Allowed difference (default: 0.01 EUR)

    Returns:
        True if |actual - expected| <= tolerance
    """
    # Tiny slack so that differences which equal the tolerance on paper are
    # not rejected because of binary floating point representation.
    float_slack = 1e-10
    difference = abs(actual - expected)
    return difference <= tolerance + float_slack
def parse_german_number(num_str: str) -> float:
    """Parse a number written in German notation.

    German: 1.234,56 (dot = thousands, comma = decimal)
    Result: 1234.56

    Every dot is dropped and every comma becomes the decimal point, so
    input in international notation (e.g. "1234.56") is NOT supported here.

    Args:
        num_str: German formatted number string

    Returns:
        Float value
    """
    # One pass over the string: delete "." and turn "," into ".".
    german_to_python = str.maketrans({".": None, ",": "."})
    return float(num_str.translate(german_to_python))
def parse_german_date(date_str: str) -> str:
    """Turn a German date into an ISO date (YYYY-MM-DD).

    German: 04.02.2025
    Result: 2025-02-04

    Input that is not made of exactly three dot-separated parts is
    returned unchanged; the parts themselves are not validated.

    Args:
        date_str: German formatted date string

    Returns:
        ISO formatted date string
    """
    pieces = date_str.split(".")
    if len(pieces) == 3:
        day, month, year = pieces
        # Single-digit day/month values are zero-padded.
        return year + "-" + month.zfill(2) + "-" + day.zfill(2)
    return date_str
def round_decimal(amount: float, places: int = 2) -> float:
    """Round decimal to specified places using standard rounding.

    Half-way cases are always rounded away from zero (ROUND_HALF_UP), never
    to the nearest even digit. This holds for every value of ``places``,
    including 0 (whole numbers) and negative values (tens, hundreds, ...).

    Args:
        amount: Amount to round
        places: Decimal places (default: 2)

    Returns:
        Rounded float
    """
    # Going through str() uses the shortest float representation, so a value
    # such as 1.005 is rounded as the decimal "1.005" and not as its slightly
    # smaller binary approximation.
    # The quantizer 1e-<places> carries the target exponent for any sign of
    # ``places`` (1e-2 -> cents, 1e0 -> units, 1e2 -> hundreds).
    quantizer = Decimal(f"1e{-places}")
    return float(Decimal(str(amount)).quantize(quantizer, rounding=ROUND_HALF_UP))