import json
import re
import traceback
from argparse import ArgumentParser
from datetime import datetime
from sys import stderr
from time import sleep

import psycopg
import requests

from microqa.engine import analyze_doc
from microqa.items import fetch_item, url_encode

# Review-GUI portal pages that the generated docs_link / pages_link URLs
# deep-link into (with a subfilter query appended per row).
GUI_DOCS_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16583/p/019b6375173c76139afa91a356f97583"
GUI_PAGES_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16604/p/019b6379b1487b1e8791bd6486804452"


def main():
    """Drain the phono.items queue.

    For each claimed item: fetch it from Archive.org, upsert its documents,
    run page-level analysis (angle, sharpness, blank detection, text margin)
    with the selected OCR backend, and record the results plus GUI deep
    links in PostgreSQL. Runs until the queue is empty.
    """
    parser = ArgumentParser()
    parser.add_argument(
        "--database",
        help="path to PostgreSQL database for analysis output",
        required=True,
    )
    # NOTE(review): --cpus is accepted but not read anywhere in this file;
    # presumably consumed by analyze_doc or a future change — confirm.
    parser.add_argument(
        "--cpus",
        type=int,
        help="number of concurrent subprocesses to use; higher is generally faster but consumes more resources",
        default=6,
    )
    parser.add_argument(
        "--skip-items-pull",
        action="store_true",
        help="skip checking Archive.org for newly reviewed items",
        default=False,
    )
    parser.add_argument(
        "--earliest-update-date",
        help="script will attempt to analyze all items with an oai_updatedate greater than or equal to this value (YYYY-MM-DD)",
        type=lambda s: datetime.strptime(s, "%Y-%m-%d"),
        default=datetime(2025, 7, 1),
    )
    parser.add_argument(
        "--ocr-backend",
        help="which local OCR backend to use when available text in archived PDF files is insufficient; one of 'tesseract' or 'paddleocr'",
        # FIX: restrict to the supported backends so an unknown value fails
        # at argument parsing instead of crashing later with an
        # UnboundLocalError when `ocr_engine` is never assigned.
        choices=["tesseract", "paddleocr"],
        default="tesseract",
    )
    args = parser.parse_args()

    # Import OCR engine modules only as needed, to avoid unnecessary slow
    # startups and/or missing dependency errors.
    if args.ocr_backend == "tesseract":
        from microqa.ocr.tesseract import TesseractOcrEngine

        ocr_engine = TesseractOcrEngine(languages=["eng", "fra"])
    elif args.ocr_backend == "paddleocr":
        from microqa.ocr.paddleocr import PaddleOcrEngine

        ocr_engine = PaddleOcrEngine(languages=["eng", "fra"])

    with psycopg.connect(args.database, autocommit=True) as conn:
        cur = conn.cursor()
        if not args.skip_items_pull:
            print("Pulling item IDs")
            pull_new_item_ids(conn, args.earliest_update_date)
            print("Done.")
        while True:
            # Atomically claim the oldest unfinished item. Claims older than
            # 3 hours are treated as abandoned (crashed worker) and may be
            # re-claimed, so the queue cannot wedge permanently.
            cur.execute("""
                update phono.items
                set started_date = now()
                where _id = (
                    select _id
                    from phono.items
                    where (started_date is null or started_date < now() - interval '3 hours')
                        and completed_date is null
                    order by oai_updatedate
                    limit 1
                )
                returning _id, ia_id
            """)
            row_item = cur.fetchone()
            if row_item is None:
                print("No items in queue.")
                return
            [item_id, ia_id] = row_item
            N_ATTEMPTS = 2
            for _ in range(N_ATTEMPTS):
                try:
                    print(f"Processing {item_id}")
                    item = fetch_item(ia_id)
                    # Multi-doc items carry a nameless "container" doc entry;
                    # drop it. A single-doc item keeps its only doc even if
                    # the name is empty.
                    minimal_docs = (
                        [doc for doc in item.docs if doc.name != ""]
                        if len(item.docs) > 1
                        else item.docs
                    )
                    for doc in minimal_docs:
                        # Insert-or-fetch the doc row: application-side upsert
                        # keyed on (name, item), returning _id either way.
                        cur.execute(
                            """
                            with new_data (name, item) as (values (%s, %s)),
                            existing_data as (
                                select docs._id
                                from phono.docs as docs
                                inner join new_data
                                    on docs.name = new_data.name
                                    and docs.item = new_data.item
                            ),
                            inserted_data as (
                                insert into phono.docs (name, item)
                                select name, item
                                from new_data
                                where not exists (select 1 from existing_data)
                                returning _id
                            )
                            select _id from existing_data
                            union all
                            select _id from inserted_data
                            """,
                            [doc.name, item_id],
                        )
                        [doc_id] = cur.fetchone()
                        # Build the GUI subfilter (doc == <uuid>) as a
                        # URL-encoded JSON template, then splice the doc's
                        # UUID in via SQL so the link is computed in one
                        # UPDATE.
                        pages_subfilter_template = url_encode(
                            json.dumps(
                                {
                                    "t": "Comparison",
                                    "c": {
                                        "t": "Infix",
                                        "c": {
                                            "operator": "Eq",
                                            "lhs": {
                                                "t": "Identifier",
                                                "c": {"parts_raw": ["doc"]},
                                            },
                                            "rhs": {
                                                "t": "Literal",
                                                "c": {
                                                    "t": "Uuid",
                                                    "c": "__ID_PLACEHOLDER__",
                                                },
                                            },
                                        },
                                    },
                                },
                                separators=(",", ":"),
                            )
                        )
                        pages_subfilter_parts = pages_subfilter_template.split(
                            "__ID_PLACEHOLDER__"
                        )
                        assert len(pages_subfilter_parts) == 2
                        cur.execute(
                            """
                            update phono.docs
                            set pages_link = %s || '?subfilter=' || %s || _id::text || %s
                            where _id = %s
                            """,
                            [GUI_PAGES_PORTAL_URL, *pages_subfilter_parts, doc_id],
                        )
                        analysis = analyze_doc(
                            doc=doc, ocr_engine=ocr_engine, verbose=True
                        )
                        # Upsert one row per analyzed page, keyed on
                        # (doc, page); 1-based page numbers, 0-based
                        # Archive.org viewer URLs.
                        cur.executemany(
                            """
                            with new_data (doc, page, page_angle, sharpness, is_blank, text_margin_px, url) as (
                                values (%s, %s, %s, %s, %s, %s, %s)
                            ),
                            updated_data as (
                                update phono.pages as pages
                                set page_angle = new_data.page_angle,
                                    sharpness = new_data.sharpness,
                                    is_blank = new_data.is_blank,
                                    text_margin_px = new_data.text_margin_px,
                                    url = new_data.url
                                from new_data
                                where pages.doc = new_data.doc
                                    and pages.page = new_data.page
                                returning 1
                            )
                            insert into phono.pages (
                                doc, page, page_angle, sharpness, is_blank, text_margin_px, url
                            )
                            select doc, page, page_angle, sharpness, is_blank, text_margin_px, url
                            from new_data
                            where not exists (select 1 from updated_data)
                            """,
                            [
                                [
                                    doc_id,
                                    i + 1,
                                    page["page_angle"],
                                    page["sharpness"],
                                    page["is_blank"],
                                    page["text_margin_px"],
                                    f"https://archive.org/details/{ia_id}{f'/{url_encode(doc.name)}' if doc.name != ia_id else ''}/page/n{i}",
                                ]
                                for i, page in enumerate(analysis["pages"])
                            ],
                        )
                    cur.execute(
                        "update phono.items set completed_date = now() where _id = %s",
                        [item_id],
                    )
                    break
                except Exception as err:
                    # Best-effort retry: log and back off. If all attempts
                    # fail, the item stays claimed and becomes re-claimable
                    # after the 3-hour staleness window above.
                    print(err, file=stderr)
                    traceback.print_tb(err.__traceback__, file=stderr)
                    sleep(15)


def pull_new_item_ids(conn, earliest_update_date: datetime):
    """Sync newly reviewed Archive.org items into phono.items.

    Queries the advancedsearch API for reviewed microfiche items updated on
    or after the latest oai_updatedate already stored (falling back to
    *earliest_update_date* on an empty table), inserts any new identifiers,
    and refreshes every item's docs_link GUI deep link.
    """
    cur = conn.cursor()
    cur.execute(
        "select oai_updatedate from phono.items order by oai_updatedate desc limit 1"
    )
    (latest_update_date,) = cur.fetchone() or (earliest_update_date,)
    # There are a couple of "review date" fields, but it's unclear precisely how
    # they relate to each other or to the Cebu microfiche review process. Best I
    # can tell, `updatedate`/`oai_updatedate` are a more straightforward way to
    # paginate.
    query = f"""
        collection:(microfiche)
        AND contributor:(Internet Archive)
        AND micro_review:(done)
        AND oai_updatedate:[{latest_update_date.replace(tzinfo=None).isoformat()}Z TO null]
    """
    sort = "updatedate asc"
    print(f"Querying:{query}")
    # Format for API.
    query = re.sub(r"\s+", "+", query.strip())
    sort = re.sub(r"\s+", "+", sort.strip())
    # Archive.org has a paginated scraping API, but the query feature seems to
    # be broken in mysterious ways and more or less impossible to use for our
    # purposes.
    # NOTE(review): the URL is assembled by hand with only whitespace->'+'
    # substitution; brackets/colons are sent unescaped, which the API appears
    # to tolerate — confirm before adding new query syntax.
    resp = requests.get(
        f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&fl[]=oai_updatedate&rows=25000&output=json",
    )
    resp.raise_for_status()
    try:
        body = resp.json()
        if "error" in body:
            raise Exception("API error")
    except Exception as err:
        # Dump the raw body for diagnosis (the API sometimes returns
        # non-JSON error pages), then propagate.
        print("Body:", resp.text, file=stderr)
        raise
    BATCH_SIZE = 250
    docs = body["response"]["docs"]
    for i in range(0, len(docs), BATCH_SIZE):
        # Slicing already clamps at the end of the list.
        batch = docs[i : i + BATCH_SIZE]
        # Approximate a unique constraint on the application side.
        cur.executemany(
            """
            with new_data (ia_id, review_date, oai_updatedate) as (values (%s, %s, %s))
            insert into phono.items (ia_id, review_date, oai_updatedate, url)
            select ia_id, review_date, oai_updatedate, 'https://archive.org/details/' || ia_id
            from new_data
            where not exists (
                select 1 from phono.items where ia_id = new_data.ia_id
            )
            """,
            [
                [
                    doc["identifier"],
                    doc.get("review_date"),
                    # FIX: pass the generator to max() directly. The previous
                    # `max(*[...])` unpacked the list into positional args, so
                    # a single-element oai_updatedate list became
                    # max(<datetime>) and raised TypeError.
                    max(datetime.fromisoformat(t) for t in doc["oai_updatedate"]),
                ]
                for doc in batch
            ],
        )
    # Refresh docs_link (item == <uuid> subfilter) for every item row; the
    # UUID is spliced in per-row via SQL, so one UPDATE covers the table.
    docs_subfilter_template = url_encode(
        json.dumps(
            {
                "t": "Comparison",
                "c": {
                    "t": "Infix",
                    "c": {
                        "operator": "Eq",
                        "lhs": {"t": "Identifier", "c": {"parts_raw": ["item"]}},
                        "rhs": {
                            "t": "Literal",
                            "c": {"t": "Uuid", "c": "__ID_PLACEHOLDER__"},
                        },
                    },
                },
            },
            separators=(",", ":"),
        )
    )
    docs_subfilter_parts = docs_subfilter_template.split("__ID_PLACEHOLDER__")
    assert len(docs_subfilter_parts) == 2
    cur.execute(
        "update phono.items set docs_link = %s || '?subfilter=' || %s || _id::text || %s",
        [GUI_DOCS_PORTAL_URL, *docs_subfilter_parts],
    )


if __name__ == "__main__":
    main()