# MicroQA/main.py
# Standard library.
import json
import re
import traceback
from argparse import ArgumentParser
from datetime import datetime
from sys import stderr
from time import sleep

# Third party.
import requests
import psycopg

# Project-local.
from microqa.items import fetch_item, url_encode
from microqa.engine import analyze_doc

# Hard-coded Phonograph URLs are included for convenience and are relevant only
# to the official deployment.
GUI_DOCS_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16583/p/019b6375173c76139afa91a356f97583"
GUI_PAGES_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16604/p/019b6379b1487b1e8791bd6486804452"
2025-08-10 12:27:39 -07:00
def _parse_args():
    """Build and parse the command-line interface for the analysis worker."""
    parser = ArgumentParser()
    parser.add_argument(
        "--database",
        help="path to PostgreSQL database for analysis output",
        required=True,
    )
    # NOTE(review): --cpus is accepted but not read anywhere in this script —
    # confirm whether it is still used or should be removed.
    parser.add_argument(
        "--cpus",
        type=int,
        help="number of concurrent subprocesses to use; higher is generally faster but consumes more resources",
        default=6,
    )
    parser.add_argument(
        "--skip-items-pull",
        action="store_true",
        help="skip checking Archive.org for newly reviewed items",
        default=False,
    )
    parser.add_argument(
        "--earliest-update-date",
        help="script will attempt to analyze all items with an oai_updatedate greater than or equal to this value (YYYY-MM-DD)",
        type=lambda s: datetime.strptime(s, "%Y-%m-%d"),
        default=datetime(2025, 7, 1),
    )
    parser.add_argument(
        "--ocr-backend",
        help="which local OCR backend to use when available text in archived PDF files is insufficient; one of 'tesseract' or 'paddleocr'",
        # `choices` rejects bad values up front; previously an unknown backend
        # silently left `ocr_engine` unbound and caused a NameError only once
        # item processing began.
        choices=("tesseract", "paddleocr"),
        default="tesseract",
    )
    return parser.parse_args()


def _make_ocr_engine(backend):
    """Instantiate the requested local OCR backend.

    OCR engine modules are imported only as needed, to avoid unnecessary slow
    startups and/or missing dependency errors.
    """
    if backend == "tesseract":
        from microqa.ocr.tesseract import TesseractOcrEngine

        return TesseractOcrEngine(languages=["eng", "fra"])
    if backend == "paddleocr":
        from microqa.ocr.paddleocr import PaddleOcrEngine

        return PaddleOcrEngine(languages=["eng", "fra"])
    # Unreachable when argparse `choices` is enforced, but fail loudly rather
    # than letting callers trip over an undefined engine later.
    raise ValueError(f"unknown OCR backend: {backend!r}")


def _pages_subfilter_parts():
    """Return the (prefix, suffix) halves of the URL-encoded pages subfilter.

    The GUI subfilter selects pages whose `doc` field equals a UUID known only
    at SQL time, so a placeholder is encoded in and then split back out; the
    doc `_id` is concatenated between the halves inside the UPDATE statement.
    """
    template = url_encode(
        json.dumps(
            {
                "t": "Comparison",
                "c": {
                    "t": "Infix",
                    "c": {
                        "operator": "Eq",
                        "lhs": {
                            "t": "Identifier",
                            "c": {"parts_raw": ["doc"]},
                        },
                        "rhs": {
                            "t": "Literal",
                            "c": {
                                "t": "Uuid",
                                "c": "__ID_PLACEHOLDER__",
                            },
                        },
                    },
                },
            },
            separators=(",", ":"),
        )
    )
    parts = template.split("__ID_PLACEHOLDER__")
    assert len(parts) == 2
    return parts


def _process_doc(cur, ocr_engine, item_id, ia_id, doc):
    """Upsert one doc row, set its GUI pages link, analyze it, upsert pages."""
    # Insert the doc if it is new, otherwise fetch the existing row's _id —
    # an application-side approximation of an upsert returning the key.
    cur.execute(
        """
        with
        new_data (name, item) as (values (%s, %s)),
        existing_data as (
            select docs._id from phono.docs as docs inner join new_data on docs.name = new_data.name and docs.item = new_data.item
        ),
        inserted_data as (
            insert into phono.docs (name, item)
            select name, item from new_data
            where not exists (select 1 from existing_data)
            returning _id
        )
        select _id from existing_data
        union all select _id from inserted_data
        """,
        [doc.name, item_id],
    )
    [doc_id] = cur.fetchone()
    subfilter_prefix, subfilter_suffix = _pages_subfilter_parts()
    cur.execute(
        """
        update phono.docs
        set pages_link = %s || '?subfilter=' || %s || _id::text || %s
        where _id = %s
        """,
        [GUI_PAGES_PORTAL_URL, subfilter_prefix, subfilter_suffix, doc_id],
    )
    analysis = analyze_doc(doc=doc, ocr_engine=ocr_engine, verbose=True)
    # Update existing page rows in place; insert only when no row matched.
    cur.executemany(
        """
        with
        new_data (doc, page, page_angle, sharpness, is_blank, text_margin_px, url) as (
            values (%s, %s, %s, %s::numeric, %s, %s::numeric, %s)
        ),
        updated_data as (
            update phono.pages as pages set
                page_angle = new_data.page_angle,
                sharpness = new_data.sharpness,
                is_blank = new_data.is_blank,
                text_margin_px = new_data.text_margin_px,
                url = new_data.url
            from new_data where pages.doc = new_data.doc and pages.page = new_data.page
            returning 1
        )
        insert into phono.pages (
            doc,
            page,
            page_angle,
            sharpness,
            is_blank,
            text_margin_px,
            url
        ) select
            doc,
            page,
            page_angle,
            sharpness,
            is_blank,
            text_margin_px,
            url
        from new_data
        where not exists (select 1 from updated_data)
        """,
        [
            [
                doc_id,
                # Pages are 1-based in the DB but 0-based in the archive.org
                # viewer URL below.
                i + 1,
                page["page_angle"],
                page["sharpness"],
                page["is_blank"],
                page["text_margin_px"],
                f"https://archive.org/details/{ia_id}{f'/{url_encode(doc.name)}' if doc.name != ia_id else ''}/page/n{i}",
            ]
            for i, page in enumerate(analysis["pages"])
        ],
    )


def main():
    """Worker entry point: pull newly reviewed items, then drain the queue.

    Each iteration atomically claims one pending item from phono.items,
    fetches it from Archive.org, analyzes its docs/pages, and records the
    results; exits when the queue is empty.
    """
    args = _parse_args()
    ocr_engine = _make_ocr_engine(args.ocr_backend)
    with psycopg.connect(args.database, autocommit=True) as conn:
        cur = conn.cursor()
        if not args.skip_items_pull:
            print("Pulling item IDs")
            pull_new_item_ids(conn, args.earliest_update_date)
            print("Done.")
        while True:
            # Claim the next unfinished item. A started_date more than 3 hours
            # old is treated as an abandoned claim and taken over, so crashed
            # workers do not strand items forever.
            cur.execute(
                """
                update phono.items
                set started_date = now()
                where _id = (
                    select _id
                    from phono.items
                    where (started_date is null or started_date < now() - interval '3 hours')
                    and completed_date is null
                    order by oai_updatedate
                    limit 1
                )
                returning _id, ia_id
                """
            )
            row_item = cur.fetchone()
            if row_item is None:
                print("No items in queue.")
                return
            [item_id, ia_id] = row_item
            N_ATTEMPTS = 2
            for _ in range(N_ATTEMPTS):
                try:
                    print(f"Processing {item_id}")
                    item = fetch_item(ia_id)
                    # When an item has several docs, drop the unnamed entry;
                    # a single-doc item is used as-is.
                    minimal_docs = (
                        [doc for doc in item.docs if doc.name != ""]
                        if len(item.docs) > 1
                        else item.docs
                    )
                    for doc in minimal_docs:
                        _process_doc(cur, ocr_engine, item_id, ia_id, doc)
                    cur.execute(
                        "update phono.items set completed_date = now() where _id = %s",
                        [item_id],
                    )
                    break
                except Exception as err:
                    # Best-effort retry: log, back off briefly, try again.
                    # After N_ATTEMPTS failures the item's stale started_date
                    # makes it eligible again in 3 hours.
                    print(err, file=stderr)
                    traceback.print_tb(err.__traceback__, file=stderr)
                    sleep(15)
def pull_new_item_ids(conn, earliest_update_date: datetime):
    """Sync newly reviewed Archive.org microfiche item IDs into phono.items.

    Queries the Archive.org advanced-search API for reviewed items with an
    `oai_updatedate` at or after the newest one already stored (falling back
    to `earliest_update_date` when the table is empty), inserts any unseen
    identifiers, and refreshes every item's GUI `docs_link`.

    :param conn: open psycopg connection (autocommit assumed by callers).
    :param earliest_update_date: lower bound used when the table is empty.
    """
    cur = conn.cursor()
    cur.execute(
        "select oai_updatedate from phono.items order by oai_updatedate desc limit 1"
    )
    # NOTE(review): assumes oai_updatedate is never NULL on stored rows —
    # inserts below always supply it, but a NULL here would crash .replace().
    (latest_update_date,) = cur.fetchone() or (earliest_update_date,)

    # There are a couple of "review date" fields, but it's unclear precisely how
    # they relate to each other or to the Cebu microfiche review process. Best I
    # can tell, `updatedate`/`oai_updatedate` are a more straightforward way to
    # paginate.
    query = f"""
    collection:(microfiche)
    AND contributor:(Internet Archive)
    AND micro_review:(done)
    AND oai_updatedate:[{latest_update_date.replace(tzinfo=None).isoformat()}Z TO null]
    """
    sort = "updatedate asc"
    print(f"Querying:{query}")
    # Format for API: collapse all runs of whitespace into URL-style '+'.
    query = re.sub(r"\s+", "+", query.strip())
    sort = re.sub(r"\s+", "+", sort.strip())
    # Archive.org has a paginated scraping API, but the query feature seems to
    # be broken in mysterious ways and more or less impossible to use for our
    # purposes.
    resp = requests.get(
        f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&fl[]=oai_updatedate&rows=25000&output=json",
    )
    resp.raise_for_status()
    try:
        body = resp.json()
        # The API can return HTTP 200 with an error payload in the body.
        if "error" in body:
            raise Exception("API error")
    except Exception:
        # Dump the raw body to aid diagnosis, then propagate unchanged.
        print("Body:", resp.text, file=stderr)
        raise
    BATCH_SIZE = 250
    docs = body["response"]["docs"]
    for i in range(0, len(docs), BATCH_SIZE):
        # Slicing already clamps at the end of the list, so no min() is needed.
        batch = docs[i : i + BATCH_SIZE]
        # Approximate a unique constraint on the application side.
        cur.executemany(
            """
            with new_data (ia_id, review_date, oai_updatedate) as (values (%s, %s, %s))
            insert into phono.items (ia_id, review_date, oai_updatedate, url)
            select ia_id, review_date, oai_updatedate, 'https://archive.org/details/' || ia_id from new_data
            where not exists (
                select 1 from phono.items where ia_id = new_data.ia_id
            )
            """,
            [
                [
                    doc["identifier"],
                    doc.get("review_date"),
                    # An item may carry several OAI update timestamps;
                    # keep the most recent one.
                    max(datetime.fromisoformat(t) for t in doc["oai_updatedate"]),
                ]
                for doc in batch
            ],
        )
    # Build the GUI docs-link subfilter: encode a placeholder UUID, then split
    # it out so each row's _id can be concatenated in between by SQL.
    docs_subfilter_template = url_encode(
        json.dumps(
            {
                "t": "Comparison",
                "c": {
                    "t": "Infix",
                    "c": {
                        "operator": "Eq",
                        "lhs": {"t": "Identifier", "c": {"parts_raw": ["item"]}},
                        "rhs": {
                            "t": "Literal",
                            "c": {"t": "Uuid", "c": "__ID_PLACEHOLDER__"},
                        },
                    },
                },
            },
            separators=(",", ":"),
        )
    )
    docs_subfilter_parts = docs_subfilter_template.split("__ID_PLACEHOLDER__")
    assert len(docs_subfilter_parts) == 2
    # Refreshes docs_link on every item (no WHERE clause) — cheap and keeps
    # links current if the portal URL constant changes.
    cur.execute(
        "update phono.items set docs_link = %s || '?subfilter=' || %s || _id::text || %s",
        [GUI_DOCS_PORTAL_URL, *docs_subfilter_parts],
    )
2025-08-10 12:27:39 -07:00
# Script entry point when run directly (not imported).
if __name__ == "__main__":
    main()