"""ZUGFeRD/Factur-X extractor. Extracts structured invoice data from PDF files using the factur-x library. Supports ZUGFeRD 2.x profiles: MINIMUM, BASIC, BASIC WL, EN16931, EXTENDED. """ import io import time from typing import Any from facturx import get_flavor, get_level, get_xml_from_pdf from lxml import etree from pypdf import PdfReader from pypdf.errors import PdfReadError, PyPdfError from src.models import ( Buyer, ExtractionMeta, ExtractResponse, LineItem, PaymentTerms, Supplier, Totals, VatBreakdown, XmlData, ) NAMESPACES = { "rsm": "urn:un:unece:uncefact:data:standard:CrossIndustryInvoice:100", "ram": "urn:un:unece:uncefact:data:standard:ReusableAggregateBusinessInformationEntity:100", "udt": "urn:un:unece:uncefact:data:standard:UnqualifiedDataType:100", } class ExtractionError(Exception): """Error during PDF extraction.""" def __init__(self, error_code: str, message: str, details: str = ""): self.error_code = error_code self.message = message self.details = details super().__init__(message) def extract_text_from_pdf(pdf_bytes: bytes) -> str: """Extract text from PDF using pypdf. Args: pdf_bytes: Raw PDF file content Returns: Extracted text from all pages """ try: pdf_stream = io.BytesIO(pdf_bytes) reader = PdfReader(pdf_stream) text_parts = [] for page in reader.pages: text = page.extract_text() if text: text_parts.append(text) return "\n".join(text_parts) except (PdfReadError, PyPdfError) as e: raise ExtractionError( error_code="corrupt_pdf", message="Failed to read PDF", details=str(e) ) def parse_supplier(xml_root: etree._Element) -> Supplier: """Parse supplier information from XML. Args: xml_root: XML root element Returns: Supplier model """ name = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/ram:Name/text()", namespaces=NAMESPACES, ) street = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/" "ram:PostalTradeAddress/ram:LineOne/text()", namespaces=NAMESPACES, ) postal_code = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/" "ram:PostalTradeAddress/ram:PostcodeCode/text()", namespaces=NAMESPACES, ) city = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/" "ram:PostalTradeAddress/ram:CityName/text()", namespaces=NAMESPACES, ) country = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/" "ram:PostalTradeAddress/ram:CountryID/text()", namespaces=NAMESPACES, ) vat_id = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/ram:SpecifiedTaxRegistration/" "ram:ID[@schemeID='VA']/text()", namespaces=NAMESPACES, ) email = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:SellerTradeParty/" "ram:URIUniversalCommunication/ram:URIID/text()", namespaces=NAMESPACES, ) return Supplier( name=name[0] if name else "", street=street[0] if street else None, postal_code=postal_code[0] if postal_code else None, city=city[0] if city else None, country=country[0] if country else None, vat_id=vat_id[0] if vat_id else None, email=email[0] if email else None, ) def parse_buyer(xml_root: etree._Element) -> Buyer: """Parse buyer information from XML. Args: xml_root: XML root element Returns: Buyer model """ name = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/ram:Name/text()", namespaces=NAMESPACES, ) street = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/" "ram:PostalTradeAddress/ram:LineOne/text()", namespaces=NAMESPACES, ) postal_code = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/" "ram:PostalTradeAddress/ram:PostcodeCode/text()", namespaces=NAMESPACES, ) city = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/" "ram:PostalTradeAddress/ram:CityName/text()", namespaces=NAMESPACES, ) country = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/" "ram:PostalTradeAddress/ram:CountryID/text()", namespaces=NAMESPACES, ) vat_id = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:BuyerTradeParty/ram:SpecifiedTaxRegistration/" "ram:ID[@schemeID='VA']/text()", namespaces=NAMESPACES, ) return Buyer( name=name[0] if name else "", street=street[0] if street else None, postal_code=postal_code[0] if postal_code else None, city=city[0] if city else None, country=country[0] if country else None, vat_id=vat_id[0] if vat_id else None, ) def parse_line_items(xml_root: etree._Element) -> list[LineItem]: """Parse line items from XML. Args: xml_root: XML root element Returns: List of LineItem models """ line_items_nodes = xml_root.xpath( "//ram:IncludedSupplyChainTradeLineItem", namespaces=NAMESPACES ) items = [] for idx, item_node in enumerate(line_items_nodes, start=1): position = idx article_number = item_node.xpath( "./ram:SpecifiedTradeProduct/ram:SellerAssignedID/text()", namespaces=NAMESPACES, ) article_number_buyer = item_node.xpath( "./ram:SpecifiedTradeProduct/ram:BuyerAssignedID/text()", namespaces=NAMESPACES, ) description = item_node.xpath( "./ram:SpecifiedTradeProduct/ram:Name/text()", namespaces=NAMESPACES ) quantity = item_node.xpath( "./ram:SpecifiedLineTradeDelivery/ram:BilledQuantity/text()", namespaces=NAMESPACES, ) unit_code = item_node.xpath( "./ram:SpecifiedLineTradeDelivery/ram:BilledQuantity/@unitCode", namespaces=NAMESPACES, ) unit_price = item_node.xpath( "./ram:SpecifiedLineTradeAgreement/ram:NetPriceProductTradePrice/ram:ChargeAmount/text()", namespaces=NAMESPACES, ) line_total = item_node.xpath( "./ram:SpecifiedLineTradeSettlement/ram:SpecifiedTradeSettlementLineMonetarySummation/" "ram:LineTotalAmount/text()", namespaces=NAMESPACES, ) vat_rate = item_node.xpath( "./ram:SpecifiedLineTradeSettlement/ram:ApplicableTradeTax/ram:RateApplicablePercent/text()", namespaces=NAMESPACES, ) vat_amount = item_node.xpath( "./ram:SpecifiedLineTradeSettlement/ram:ApplicableTradeTax/ram:CalculatedAmount/text()", namespaces=NAMESPACES, ) unit = unit_code[0] if unit_code else "Stück" items.append( LineItem( position=position, article_number=article_number[0] if article_number else None, article_number_buyer=article_number_buyer[0] if article_number_buyer else None, description=description[0] if description else "", quantity=float(quantity[0]) if quantity else 0.0, unit=unit, unit_price=float(unit_price[0]) if unit_price else 0.0, line_total=float(line_total[0]) if line_total else 0.0, vat_rate=float(vat_rate[0]) if vat_rate else None, vat_amount=float(vat_amount[0]) if vat_amount else None, ) ) return items def parse_totals(xml_root: etree._Element) -> Totals: """Parse invoice totals from XML. Args: xml_root: XML root element Returns: Totals model """ line_total_sum = xml_root.xpath( "//ram:SpecifiedTradeSettlementHeaderMonetarySummation/ram:LineTotalAmount/text()", namespaces=NAMESPACES, ) net = xml_root.xpath( "//ram:SpecifiedTradeSettlementHeaderMonetarySummation/ram:TaxBasisTotalAmount/text()", namespaces=NAMESPACES, ) vat_total = xml_root.xpath( "//ram:SpecifiedTradeSettlementHeaderMonetarySummation/ram:TaxTotalAmount/text()", namespaces=NAMESPACES, ) gross = xml_root.xpath( "//ram:SpecifiedTradeSettlementHeaderMonetarySummation/ram:GrandTotalAmount/text()", namespaces=NAMESPACES, ) vat_breakdown_nodes = xml_root.xpath( "//ram:ApplicableHeaderTradeSettlement/ram:ApplicableTradeTax", namespaces=NAMESPACES, ) vat_breakdown = [] for vat_node in vat_breakdown_nodes: rate = vat_node.xpath( "./ram:RateApplicablePercent/text()", namespaces=NAMESPACES ) base = vat_node.xpath("./ram:BasisAmount/text()", namespaces=NAMESPACES) amount = vat_node.xpath("./ram:CalculatedAmount/text()", namespaces=NAMESPACES) if rate and base and amount: vat_breakdown.append( VatBreakdown( rate=float(rate[0]), base=float(base[0]), amount=float(amount[0]), ) ) return Totals( line_total_sum=float(line_total_sum[0]) if line_total_sum else 0.0, net=float(net[0]) if net else 0.0, vat_total=float(vat_total[0]) if vat_total else 0.0, gross=float(gross[0]) if gross else 0.0, vat_breakdown=vat_breakdown, ) def parse_payment_terms(xml_root: etree._Element) -> PaymentTerms | None: """Parse payment terms from XML. Args: xml_root: XML root element Returns: PaymentTerms model or None """ iban = xml_root.xpath( "//ram:SpecifiedTradeSettlementPaymentMeans/ram:PayeePartyCreditorFinancialAccount/" "ram:IBANID/text()", namespaces=NAMESPACES, ) bic = xml_root.xpath( "//ram:SpecifiedTradeSettlementPaymentMeans/ram:PayeePartyCreditorFinancialInstitution/" "ram:BICID/text()", namespaces=NAMESPACES, ) account_holder = xml_root.xpath( "//ram:SpecifiedTradeSettlementPaymentMeans/ram:PayeePartyCreditorFinancialAccount/" "ram:ProprietaryAccountName/text()", namespaces=NAMESPACES, ) if not (iban or bic or account_holder): return None return PaymentTerms( iban=iban[0] if iban else None, bic=bic[0] if bic else None, account_holder=account_holder[0] if account_holder else None, ) def parse_zugferd_xml(xml_bytes: bytes) -> XmlData: """Parse ZUGFeRD XML bytes to structured XmlData. Args: xml_bytes: Raw XML bytes from PDF Returns: XmlData model with all invoice fields """ xml_root = etree.fromstring(xml_bytes) invoice_number = xml_root.xpath( "//rsm:ExchangedDocument/ram:ID/text()", namespaces=NAMESPACES ) invoice_date = xml_root.xpath( "//rsm:ExchangedDocument/ram:IssueDateTime/udt:DateTimeString[@format='102']/text()", namespaces=NAMESPACES, ) due_date = xml_root.xpath( "//ram:ApplicableHeaderTradeAgreement/ram:ApplicableTradeDeliveryTerms/" "ram:Description/text()", namespaces=NAMESPACES, ) notes = xml_root.xpath( "//rsm:ExchangedDocument/ram:IncludedNote/ram:Content/text()", namespaces=NAMESPACES, ) currency = xml_root.xpath( "//ram:ApplicableHeaderTradeSettlement/ram:InvoiceCurrencyCode/text()", namespaces=NAMESPACES, ) return XmlData( invoice_number=invoice_number[0] if invoice_number else "", invoice_date=invoice_date[0] if invoice_date else "", due_date=due_date[0] if due_date else None, supplier=parse_supplier(xml_root), buyer=parse_buyer(xml_root), line_items=parse_line_items(xml_root), totals=parse_totals(xml_root), currency=currency[0] if currency else "EUR", payment_terms=parse_payment_terms(xml_root), notes=notes[0] if notes else None, ) def get_pdf_page_count(pdf_bytes: bytes) -> int: """Get number of pages in PDF. Args: pdf_bytes: Raw PDF file content Returns: Number of pages """ try: pdf_stream = io.BytesIO(pdf_bytes) reader = PdfReader(pdf_stream) return len(reader.pages) except (PdfReadError, PyPdfError): return 0 def _profile_from_urn(urn: str) -> str: """Extract a short profile name from a Factur-X/ZUGFeRD URN. Falls back to the last segment of the URN after '#', or 'unknown'. Args: urn: The full profile URN (e.g. 'urn:cen.eu:en16931:2017#compliant#urn:xeinkauf.de:kosit:xrechnung_3.0') Returns: Short profile name (e.g. 'xrechnung_3.0') """ if not urn: return "unknown" return urn.rsplit("#", maxsplit=1)[-1].rsplit(":", maxsplit=1)[-1] def extract_zugferd(pdf_bytes: bytes) -> ExtractResponse: """Extract ZUGFeRD data from PDF bytes. Args: pdf_bytes: Raw PDF file content as bytes Returns: ExtractResponse with is_zugferd, profile, xml_data, pdf_text Raises: ExtractionError: For PDF processing errors """ start_time = time.time() if len(pdf_bytes) > 10 * 1024 * 1024: raise ExtractionError( error_code="file_too_large", message="File exceeds 10MB limit", details=f"Size: {len(pdf_bytes)} bytes", ) try: xml_filename, xml_bytes = get_xml_from_pdf(pdf_bytes, check_xsd=False) except Exception as e: error_msg = str(e).lower() if "password" in error_msg or "encrypted" in error_msg: raise ExtractionError( error_code="password_protected_pdf", message="PDF is password protected", details=str(e), ) if "pdf" in error_msg or "trailer" in error_msg or "xref" in error_msg: raise ExtractionError( error_code="invalid_pdf", message="Invalid PDF file", details=str(e) ) raise ExtractionError( error_code="corrupt_pdf", message="Failed to extract XML from PDF", details=str(e), ) if not xml_bytes: pdf_text = extract_text_from_pdf(pdf_bytes) pages = get_pdf_page_count(pdf_bytes) extraction_time_ms = int((time.time() - start_time) * 1000) return ExtractResponse( is_zugferd=False, pdf_text=pdf_text, extraction_meta=ExtractionMeta( pages=pages, xml_attachment_name=None, extraction_time_ms=extraction_time_ms, ), ) xml_root = etree.fromstring(xml_bytes) flavor = get_flavor(xml_root) try: level = get_level(xml_root, flavor) except ValueError: urn_nodes = xml_root.xpath( "//rsm:ExchangedDocumentContext/" "rsm:GuidelineSpecifiedDocumentContextParameter/ram:ID/text()", namespaces=NAMESPACES, ) urn = urn_nodes[0] if urn_nodes else "" level = _profile_from_urn(urn) xml_data = parse_zugferd_xml(xml_bytes) pdf_text = extract_text_from_pdf(pdf_bytes) pages = get_pdf_page_count(pdf_bytes) extraction_time_ms = int((time.time() - start_time) * 1000) return ExtractResponse( is_zugferd=True, zugferd_profil=level.upper(), xml_raw=xml_bytes.decode("utf-8"), xml_data=xml_data, pdf_text=pdf_text, extraction_meta=ExtractionMeta( pages=pages, xml_attachment_name=xml_filename or "factur-x.xml", extraction_time_ms=extraction_time_ms, ), )