diff --git a/main.py b/main.py index 1fb4553..d7853a0 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,5 @@ +import json import re -import sqlite3 import traceback from argparse import ArgumentParser from datetime import datetime @@ -7,17 +7,22 @@ from sys import stderr from time import sleep import requests +import psycopg -from microqa.items import fetch_item +from microqa.items import fetch_item, url_encode from microqa.engine import analyze_doc +GUI_DOCS_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16583/p/019b6375173c76139afa91a356f97583" +GUI_PAGES_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16604/p/019b6379b1487b1e8791bd6486804452" + + def main(): parser = ArgumentParser() parser.add_argument( "--database", - help="path to sqlite database for analysis output", - default="./microqa.db", + help="path to PostgreSQL database for analysis output", + required=True, ) parser.add_argument( "--cpus", @@ -26,9 +31,16 @@ def main(): default=6, ) parser.add_argument( - "--earliest-review-date", - help="script will attempt to analyze all items with a review date greater than or equal to this value (YYYYMMDD)", - default="20250701", + "--skip-items-pull", + action="store_true", + help="skip checking Archive.org for newly reviewed items", + default=False, + ) + parser.add_argument( + "--earliest-update-date", + help="script will attempt to analyze all items with an oai_updatedate greater than or equal to this value (YYYY-MM-DD)", + type=lambda s: datetime.strptime(s, "%Y-%m-%d"), + default=datetime(2025, 7, 1), ) parser.add_argument( "--ocr-backend", @@ -48,119 +60,184 @@ def main(): ocr_engine = PaddleOcrEngine(languages=["eng", "fra"]) - with sqlite3.connect(args.database) as conn: + with psycopg.connect(args.database, autocommit=True) as conn: cur = conn.cursor() - cur.execute(""" -create table if not exists items ( - id text primary key not null, - review_date text not null, - skip_analysis bool not null, - analyzed_date text -)""") - cur.execute(""" -create table if not exists docs ( - name text primary key not null, - item text not null -)""") - cur.execute(""" -create table if not exists pages ( - id int primary key, - doc text not null, - page int not null, - page_angle float not null, - sharpness real not null, - is_blank boolean not null, - text_margin_px int not null -)""") - cur.execute("create index if not exists review_date_idx on items (review_date)") - cur.execute( - "create index if not exists analyzed_date_idx on items (analyzed_date)" - ) - cur.execute("create index if not exists item_idx on docs (item)") - cur.execute("create index if not exists doc_idx on pages (doc)") - cur.execute( - "create unique index if not exists doc_page_idx on pages (doc, page)" - ) - conn.commit() + + if not args.skip_items_pull: + print("Pulling item IDs") + pull_new_item_ids(conn, args.earliest_update_date) + print("Done.") while True: - print("Pulling item IDs") - pull_new_item_ids(conn, args.earliest_review_date) - print("Done.") - res = cur.execute(""" -select id -from items -where analyzed_date is null - and skip_analysis = false -order by review_date + cur.execute(""" +update phono.items +set started_date = now() +where _id = ( + select _id + from phono.items + where (started_date is null or started_date < now() - interval '3 hours') + and completed_date is null + order by oai_updatedate + limit 1 +) +returning _id, ia_id """) - for (item_id,) in res.fetchall(): - N_ATTEMPTS = 3 - for _ in range(N_ATTEMPTS): - try: - print(f"Processing {item_id}") - item = fetch_item(item_id) - minimal_docs = ( - [doc for doc in item.docs if doc.name != ""] - if len(item.docs) > 1 - else item.docs + row_item = cur.fetchone() + if row_item is None: + print("No items in queue.") + return + [item_id, ia_id] = row_item + N_ATTEMPTS = 2 + for _ in range(N_ATTEMPTS): + try: + print(f"Processing {item_id}") + item = fetch_item(ia_id) + minimal_docs = ( + [doc for doc in item.docs if doc.name != ""] + if len(item.docs) > 1 + else item.docs + ) + for doc in minimal_docs: + cur.execute( + """ +with + new_data (name, item) as (values (%s, %s)), + existing_data as ( + select docs._id from phono.docs as docs inner join new_data on docs.name = new_data.name and docs.item = new_data.item + ), + inserted_data as ( + insert into phono.docs (name, item) + select name, item from new_data + where not exists (select 1 from existing_data) + returning _id + ) +select _id from existing_data + union all select _id from inserted_data +""", + [doc.name, item_id], ) - for doc in minimal_docs: - cur.execute( - "insert into docs (name, item) values (?, ?) on conflict do nothing", - [doc.name, item_id], + [doc_id] = cur.fetchone() + + pages_subfilter_template = url_encode( + json.dumps( + { + "t": "Comparison", + "c": { + "t": "Infix", + "c": { + "operator": "Eq", + "lhs": { + "t": "Identifier", + "c": {"parts_raw": ["doc"]}, + }, + "rhs": { + "t": "Literal", + "c": { + "t": "Uuid", + "c": "__ID_PLACEHOLDER__", + }, + }, + }, + }, + }, + separators=(",", ":"), ) - analysis = analyze_doc( - doc=doc, ocr_engine=ocr_engine, verbose=True - ) - for i, page in enumerate(analysis["pages"]): - cur.execute( - """ -insert into pages ( + ) + pages_subfilter_parts = pages_subfilter_template.split( + "__ID_PLACEHOLDER__" + ) + assert len(pages_subfilter_parts) == 2 + cur.execute( + """ +update phono.docs +set pages_link = %s || '?subfilter=' || %s || _id::text || %s +where _id = %s +""", + [GUI_PAGES_PORTAL_URL, *pages_subfilter_parts, doc_id], + ) + + analysis = analyze_doc( + doc=doc, ocr_engine=ocr_engine, verbose=True + ) + cur.executemany( + """ +with + new_data (doc, page, page_angle, sharpness, is_blank, text_margin_px, url) as ( + values (%s, %s, %s, %s, %s, %s, %s) + ), + updated_data as ( + update phono.pages as pages set + page_angle = new_data.page_angle, + sharpness = new_data.sharpness, + is_blank = new_data.is_blank, + text_margin_px = new_data.text_margin_px, + url = new_data.url + from new_data where pages.doc = new_data.doc and pages.page = new_data.page + returning 1 + ) +insert into phono.pages ( doc, page, page_angle, sharpness, is_blank, - text_margin_px -) values (?, ?, ?, ?, ?, ?)""", - [ - doc.name, - i + 1, - page["page_angle"], - page["sharpness"], - page["blank"], - page["text_margin_px"], - ], - ) - cur.execute( - "update items set analyzed_date = ? where id = ?", - [datetime.utcnow().strftime("%Y%m%d%H%M%S"), item_id], + text_margin_px, + url +) select + doc, + page, + page_angle, + sharpness, + is_blank, + text_margin_px, + url +from new_data +where not exists (select 1 from updated_data) +""", + [ + [ + doc_id, + i + 1, + page["page_angle"], + page["sharpness"], + page["is_blank"], + page["text_margin_px"], + f"https://archive.org/details/{ia_id}{f'/{url_encode(doc.name)}' if doc.name != ia_id else ''}/page/n{i}", + ] + for i, page in enumerate(analysis["pages"]) + ], ) - conn.commit() - print("Done") - break - except Exception as err: - print(err, file=stderr) - traceback.print_tb(err.__traceback__, file=stderr) - sleep(15) - break - sleep(3600) + cur.execute( + "update phono.items set completed_date = now() where _id = %s", + [item_id], + ) + break + except Exception as err: + print(err, file=stderr) + traceback.print_tb(err.__traceback__, file=stderr) + sleep(15) -def pull_new_item_ids(conn, earliest_review_date): +def pull_new_item_ids(conn, earliest_update_date: datetime): cur = conn.cursor() - res = cur.execute("select review_date from items order by review_date desc limit 1") - (latest_review_date,) = res.fetchone() or (earliest_review_date,) - print(latest_review_date) + cur.execute( + "select oai_updatedate from phono.items order by oai_updatedate desc limit 1" + ) + (latest_update_date,) = cur.fetchone() or (earliest_update_date,) + # There are a couple of "review date" fields, but it's unclear precisely how + # they relate to each other or to the Cebu microfiche review process. Best I + # can tell, `updatedate`/`oai_updatedate` are a more straightforward way to + # paginate. query = f""" collection:(microfiche) AND contributor:(Internet Archive) AND micro_review:(done) - AND review_date:[{latest_review_date} TO null] + AND oai_updatedate:[{latest_update_date.replace(tzinfo=None).isoformat()}Z TO null] """ - sort = "reviewdate asc" + sort = "updatedate asc" + + print(f"Querying:{query}") # Format for API. query = re.sub(r"\s+", "+", query.strip()) @@ -170,20 +247,65 @@ def pull_new_item_ids(conn, earliest_review_date): # be broken in mysterious ways and more or less impossible to use for our # purposes. resp = requests.get( - f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&rows=250000&output=json", + f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&fl[]=oai_updatedate&rows=25000&output=json", ) resp.raise_for_status() try: body = resp.json() + if "error" in body: + raise Exception("API error") except Exception as err: print("Body:", resp.text, file=stderr) raise err - for doc in body["response"]["docs"]: - cur.execute( - "insert into items (id, review_date, skip_analysis) values (?, ?, false) on conflict do nothing", - (doc["identifier"], doc["review_date"]), + BATCH_SIZE = 250 + docs = body["response"]["docs"] + for i in range(0, len(docs), BATCH_SIZE): + batch = docs[i : min(len(docs), i + BATCH_SIZE)] + # Approximate a unique constraint on the application side. + cur.executemany( + """ +with new_data (ia_id, review_date, oai_updatedate) as (values (%s, %s, %s)) +insert into phono.items (ia_id, review_date, oai_updatedate, url) + select ia_id, review_date, oai_updatedate, 'https://archive.org/details/' || ia_id from new_data + where not exists ( + select 1 from phono.items where ia_id = new_data.ia_id + ) +""", + [ + [ + doc["identifier"], + doc.get("review_date"), + max(*[datetime.fromisoformat(t) for t in doc["oai_updatedate"]]), + ] + for doc in batch + ], ) - conn.commit() + + docs_subfilter_template = url_encode( + json.dumps( + { + "t": "Comparison", + "c": { + "t": "Infix", + "c": { + "operator": "Eq", + "lhs": {"t": "Identifier", "c": {"parts_raw": ["item"]}}, + "rhs": { + "t": "Literal", + "c": {"t": "Uuid", "c": "__ID_PLACEHOLDER__"}, + }, + }, + }, + }, + separators=(",", ":"), + ) + ) + docs_subfilter_parts = docs_subfilter_template.split("__ID_PLACEHOLDER__") + assert len(docs_subfilter_parts) == 2 + cur.execute( + "update phono.items set docs_link = %s || '?subfilter=' || %s || _id::text || %s", + [GUI_DOCS_PORTAL_URL, *docs_subfilter_parts], + ) if __name__ == "__main__": diff --git a/microqa/items.py b/microqa/items.py index de8109f..5906381 100644 --- a/microqa/items.py +++ b/microqa/items.py @@ -97,7 +97,7 @@ class ArchiveDoc: """ if use_cache: - with open(f"{CACHE_DIR}/{_url_encode(self.name)}.pdf", "rb") as f: + with open(f"{CACHE_DIR}/{url_encode(self.name)}.pdf", "rb") as f: pdf_data = f.read() else: pdf_data = _fetch_pdf(self.identifier, self.name) @@ -201,13 +201,13 @@ def fetch_item(identifier: str, use_cache=False) -> ArchiveItem: # this shouldn't usually be an issue, but if/when it is, it can be very # frustrating to troubleshoot user-side. file_names = [ - _url_decode(name) + url_decode(name) for name in os.listdir(CACHE_DIR) if name.lower().startswith(identifier.lower()) ] else: files_resp = requests.get( - f"https://archive.org/metadata/{_url_encode(identifier)}/files" + f"https://archive.org/metadata/{url_encode(identifier)}/files" ) files_resp.raise_for_status() file_names = [item["name"] for item in files_resp.json()["result"]] @@ -221,11 +221,11 @@ def fetch_item(identifier: str, use_cache=False) -> ArchiveItem: # Assert that all files we expect to find are actually present. for doc_name in doc_names: - if f"{_url_encode(doc_name.lower())}.pdf" not in [ + if f"{url_encode(doc_name.lower())}.pdf" not in [ name.lower() for name in file_names ]: raise Exception( - f"expected file not found: {_url_encode(doc_name.lower())}.pdf" + f"expected file not found: {url_encode(doc_name.lower())}.pdf" ) return ArchiveItem( @@ -259,18 +259,18 @@ def cache_item(identifier: str, overwrite=True): os.makedirs(CACHE_DIR, exist_ok=True) for name in os.listdir(CACHE_DIR): - if _url_decode(name.lower()).startswith(identifier.lower()): + if url_decode(name.lower()).startswith(identifier.lower()): if not overwrite: return item = fetch_item(identifier) for doc in item.docs: pdf_data = _fetch_pdf(identifier, doc.name) - with open(f"{CACHE_DIR}/{_url_encode(doc.name)}.pdf", "wb") as f: + with open(f"{CACHE_DIR}/{url_encode(doc.name)}.pdf", "wb") as f: f.write(pdf_data) -def _url_encode(string: str) -> str: +def url_encode(string: str) -> str: """ Helper to encode to a URL-encoded (in other words, percent-encoded) string. """ @@ -278,7 +278,7 @@ def _url_encode(string: str) -> str: return urllib.parse.quote(string, safe=" ._") -def _url_decode(string: str) -> str: +def url_decode(string: str) -> str: """ Helper to decode from a URL-encoded (in other words, percent-encoded) string. @@ -296,7 +296,7 @@ def _fetch_pdf(identifier: str, doc_name: str) -> bytes: # file path itself as defined by archive.org. Percent-encoding it further # may result in a 404 error. resp = requests.get( - f"https://archive.org/download/{_url_encode(identifier)}/{doc_name}.pdf" + f"https://archive.org/download/{url_encode(identifier)}/{doc_name}.pdf" ) resp.raise_for_status() return resp.content