MicroQA/main.py
2026-01-15 21:00:06 -08:00

314 lines
11 KiB
Python

import json
import re
import traceback
from argparse import ArgumentParser
from datetime import datetime
from sys import stderr
from time import sleep
import requests
import psycopg
from microqa.items import fetch_item, url_encode
from microqa.engine import analyze_doc
# Hard-coded Phonograph URLs are included for convenience and are relevant only
# to the official deployment.
# These are the base GUI portal URLs for the docs/pages views; the code below
# appends a '?subfilter=' query string containing a row's UUID to produce a
# deep link to a single item's docs or a single doc's pages.
GUI_DOCS_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16583/p/019b6375173c76139afa91a356f97583"
GUI_PAGES_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16604/p/019b6379b1487b1e8791bd6486804452"
def main():
    """Claim queued Archive.org items from the database and analyze them.

    Parses CLI arguments, optionally pulls newly reviewed item IDs from
    Archive.org into ``phono.items``, then loops: atomically claim one pending
    item, run ``analyze_doc`` over each of its docs, and upsert per-page
    results into ``phono.pages``. Returns once the queue is empty.
    """
    parser = ArgumentParser()
    parser.add_argument(
        "--database",
        help="path to PostgreSQL database for analysis output",
        required=True,
    )
    parser.add_argument(
        "--cpus",
        type=int,
        help="number of concurrent subprocesses to use; higher is generally faster but consumes more resources",
        default=6,
    )
    parser.add_argument(
        "--skip-items-pull",
        action="store_true",
        help="skip checking Archive.org for newly reviewed items",
        default=False,
    )
    parser.add_argument(
        "--earliest-update-date",
        help="script will attempt to analyze all items with an oai_updatedate greater than or equal to this value (YYYY-MM-DD)",
        type=lambda s: datetime.strptime(s, "%Y-%m-%d"),
        default=datetime(2025, 7, 1),
    )
    parser.add_argument(
        "--ocr-backend",
        help="which local OCR backend to use when available text in archived PDF files is insufficient; one of 'tesseract' or 'paddleocr'",
        # Restrict to the known backends so an invalid value fails up front
        # with a clear argparse error instead of a NameError on `ocr_engine`
        # further down (previously neither branch bound it for other values).
        choices=("tesseract", "paddleocr"),
        default="tesseract",
    )
    args = parser.parse_args()
    # Import OCR engine modules only as needed, to avoid unnecessary slow
    # startups and/or missing dependency errors.
    if args.ocr_backend == "tesseract":
        from microqa.ocr.tesseract import TesseractOcrEngine

        ocr_engine = TesseractOcrEngine(languages=["eng", "fra"])
    else:
        from microqa.ocr.paddleocr import PaddleOcrEngine

        ocr_engine = PaddleOcrEngine(languages=["eng", "fra"])
    # The per-doc "pages" GUI link embeds a URL-encoded JSON subfilter with
    # the doc's UUID spliced into the middle. The encoded template is
    # invariant across docs, so build and split it once up front; the SQL
    # below concatenates each doc's _id between the two halves.
    pages_subfilter_parts = url_encode(
        json.dumps(
            {
                "t": "Comparison",
                "c": {
                    "t": "Infix",
                    "c": {
                        "operator": "Eq",
                        "lhs": {
                            "t": "Identifier",
                            "c": {"parts_raw": ["doc"]},
                        },
                        "rhs": {
                            "t": "Literal",
                            "c": {
                                "t": "Uuid",
                                "c": "__ID_PLACEHOLDER__",
                            },
                        },
                    },
                },
            },
            separators=(",", ":"),
        )
    ).split("__ID_PLACEHOLDER__")
    assert len(pages_subfilter_parts) == 2
    with psycopg.connect(args.database, autocommit=True) as conn:
        cur = conn.cursor()
        if not args.skip_items_pull:
            print("Pulling item IDs")
            pull_new_item_ids(conn, args.earliest_update_date)
            print("Done.")
        while True:
            # Atomically claim the next pending item. `started_date` acts as
            # a lease: items started more than 3 hours ago are assumed to
            # have crashed and become claimable again.
            cur.execute("""
                update phono.items
                set started_date = now()
                where _id = (
                    select _id
                    from phono.items
                    where (started_date is null or started_date < now() - interval '3 hours')
                    and completed_date is null
                    order by oai_updatedate
                    limit 1
                )
                returning _id, ia_id
            """)
            row_item = cur.fetchone()
            if row_item is None:
                print("No items in queue.")
                return
            [item_id, ia_id] = row_item
            N_ATTEMPTS = 2
            for attempt in range(1, N_ATTEMPTS + 1):
                try:
                    print(f"Processing {item_id}")
                    item = fetch_item(ia_id)
                    # When an item has several docs, skip any nameless doc
                    # entry (presumably a whole-item pseudo-doc — behavior of
                    # fetch_item not visible here; confirm against
                    # microqa.items).
                    minimal_docs = (
                        [doc for doc in item.docs if doc.name != ""]
                        if len(item.docs) > 1
                        else item.docs
                    )
                    for doc in minimal_docs:
                        # Find-or-create the doc row; the CTE always yields
                        # exactly one _id (existing or freshly inserted).
                        cur.execute(
                            """
                            with
                            new_data (name, item) as (values (%s, %s)),
                            existing_data as (
                                select docs._id from phono.docs as docs inner join new_data on docs.name = new_data.name and docs.item = new_data.item
                            ),
                            inserted_data as (
                                insert into phono.docs (name, item)
                                select name, item from new_data
                                where not exists (select 1 from existing_data)
                                returning _id
                            )
                            select _id from existing_data
                            union all select _id from inserted_data
                            """,
                            [doc.name, item_id],
                        )
                        [doc_id] = cur.fetchone()
                        # Record the deep link into the pages GUI for this doc.
                        cur.execute(
                            """
                            update phono.docs
                            set pages_link = %s || '?subfilter=' || %s || _id::text || %s
                            where _id = %s
                            """,
                            [GUI_PAGES_PORTAL_URL, *pages_subfilter_parts, doc_id],
                        )
                        analysis = analyze_doc(
                            doc=doc, ocr_engine=ocr_engine, verbose=True
                        )
                        # Upsert per-page analysis results: update rows that
                        # already exist for (doc, page), insert otherwise.
                        cur.executemany(
                            """
                            with
                            new_data (doc, page, page_angle, sharpness, is_blank, text_margin_px, url) as (
                                values (%s, %s, %s, %s::numeric, %s, %s::numeric, %s)
                            ),
                            updated_data as (
                                update phono.pages as pages set
                                    page_angle = new_data.page_angle,
                                    sharpness = new_data.sharpness,
                                    is_blank = new_data.is_blank,
                                    text_margin_px = new_data.text_margin_px,
                                    url = new_data.url
                                from new_data where pages.doc = new_data.doc and pages.page = new_data.page
                                returning 1
                            )
                            insert into phono.pages (
                                doc,
                                page,
                                page_angle,
                                sharpness,
                                is_blank,
                                text_margin_px,
                                url
                            ) select
                                doc,
                                page,
                                page_angle,
                                sharpness,
                                is_blank,
                                text_margin_px,
                                url
                            from new_data
                            where not exists (select 1 from updated_data)
                            """,
                            [
                                [
                                    doc_id,
                                    # Pages are stored 1-based; the
                                    # archive.org viewer URL is 0-based.
                                    i + 1,
                                    page["page_angle"],
                                    page["sharpness"],
                                    page["is_blank"],
                                    page["text_margin_px"],
                                    f"https://archive.org/details/{ia_id}{f'/{url_encode(doc.name)}' if doc.name != ia_id else ''}/page/n{i}",
                                ]
                                for i, page in enumerate(analysis["pages"])
                            ],
                        )
                    cur.execute(
                        "update phono.items set completed_date = now() where _id = %s",
                        [item_id],
                    )
                    break
                except Exception as err:
                    print(err, file=stderr)
                    traceback.print_tb(err.__traceback__, file=stderr)
                    # Back off before retrying; after the final attempt just
                    # move on — the lease expiry above makes the item
                    # claimable again later. (Previously slept even after the
                    # last attempt.)
                    if attempt < N_ATTEMPTS:
                        sleep(15)
def pull_new_item_ids(conn, earliest_update_date: datetime):
    """Fetch newly reviewed Archive.org items and insert them into phono.items.

    Resumes from the most recent ``oai_updatedate`` already stored (falling
    back to *earliest_update_date* when the table is empty), queries the
    Archive.org advanced-search API, inserts any items not already present,
    and refreshes the ``docs_link`` GUI URL on every item row.

    Args:
        conn: An open psycopg connection (autocommit assumed by the caller).
        earliest_update_date: Lower bound for ``oai_updatedate`` on first run.
    """
    cur = conn.cursor()
    cur.execute(
        "select oai_updatedate from phono.items order by oai_updatedate desc limit 1"
    )
    (latest_update_date,) = cur.fetchone() or (earliest_update_date,)
    # There are a couple of "review date" fields, but it's unclear precisely how
    # they relate to each other or to the Cebu microfiche review process. Best I
    # can tell, `updatedate`/`oai_updatedate` are a more straightforward way to
    # paginate.
    query = f"""
    collection:(microfiche)
    AND contributor:(Internet Archive)
    AND micro_review:(done)
    AND oai_updatedate:[{latest_update_date.replace(tzinfo=None).isoformat()}Z TO null]
    """
    sort = "updatedate asc"
    print(f"Querying:{query}")
    # Format for API: the search endpoint expects '+' between query terms.
    query = re.sub(r"\s+", "+", query.strip())
    sort = re.sub(r"\s+", "+", sort.strip())
    # Archive.org has a paginated scraping API, but the query feature seems to
    # be broken in mysterious ways and more or less impossible to use for our
    # purposes.
    resp = requests.get(
        f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&fl[]=oai_updatedate&rows=25000&output=json",
    )
    resp.raise_for_status()
    try:
        body = resp.json()
        if "error" in body:
            raise Exception("API error")
    except Exception:
        # Dump the raw body for debugging (it may not even be JSON), then
        # re-raise the original exception with its traceback intact.
        print("Body:", resp.text, file=stderr)
        raise
    BATCH_SIZE = 250
    docs = body["response"]["docs"]
    for i in range(0, len(docs), BATCH_SIZE):
        # Slices clamp automatically, so no min() against len(docs) is needed.
        batch = docs[i : i + BATCH_SIZE]
        # Approximate a unique constraint on the application side.
        cur.executemany(
            """
            with new_data (ia_id, review_date, oai_updatedate) as (values (%s, %s, %s))
            insert into phono.items (ia_id, review_date, oai_updatedate, url)
            select ia_id, review_date, oai_updatedate, 'https://archive.org/details/' || ia_id from new_data
            where not exists (
                select 1 from phono.items where ia_id = new_data.ia_id
            )
            """,
            [
                [
                    doc["identifier"],
                    doc.get("review_date"),
                    # `oai_updatedate` is a list of timestamps; keep the latest.
                    max([datetime.fromisoformat(t) for t in doc["oai_updatedate"]]),
                ]
                for doc in batch
            ],
        )
    # The per-item "docs" GUI link embeds a URL-encoded JSON subfilter with
    # the item's UUID spliced into the middle; build the template once and
    # let SQL concatenate each row's _id between the two halves.
    docs_subfilter_template = url_encode(
        json.dumps(
            {
                "t": "Comparison",
                "c": {
                    "t": "Infix",
                    "c": {
                        "operator": "Eq",
                        "lhs": {"t": "Identifier", "c": {"parts_raw": ["item"]}},
                        "rhs": {
                            "t": "Literal",
                            "c": {"t": "Uuid", "c": "__ID_PLACEHOLDER__"},
                        },
                    },
                },
            },
            separators=(",", ":"),
        )
    )
    docs_subfilter_parts = docs_subfilter_template.split("__ID_PLACEHOLDER__")
    assert len(docs_subfilter_parts) == 2
    # Note: intentionally no WHERE clause — every item row's link is refreshed.
    cur.execute(
        "update phono.items set docs_link = %s || '?subfilter=' || %s || _id::text || %s",
        [GUI_DOCS_PORTAL_URL, *docs_subfilter_parts],
    )
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()