switch from sqlite to phonograph

This commit is contained in:
Brent Schroeter 2026-01-14 23:26:46 +00:00
parent d48b672e1b
commit 1ca2238c5d
2 changed files with 237 additions and 115 deletions

332
main.py
View file

@@ -1,5 +1,5 @@
import json
import re
import sqlite3
import traceback
from argparse import ArgumentParser
from datetime import datetime
@@ -7,17 +7,22 @@ from sys import stderr
from time import sleep
import requests
import psycopg
from microqa.items import fetch_item
from microqa.items import fetch_item, url_encode
from microqa.engine import analyze_doc
GUI_DOCS_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16583/p/019b6375173c76139afa91a356f97583"
GUI_PAGES_PORTAL_URL = "https://app.phono.dev/w/019b0a7dd865788e83b8cde7fcc99c9e/r/16604/p/019b6379b1487b1e8791bd6486804452"
def main():
parser = ArgumentParser()
parser.add_argument(
"--database",
help="path to sqlite database for analysis output",
default="./microqa.db",
help="path to PostgreSQL database for analysis output",
required=True,
)
parser.add_argument(
"--cpus",
@@ -26,9 +31,16 @@ def main():
default=6,
)
parser.add_argument(
"--earliest-review-date",
help="script will attempt to analyze all items with a review date greater than or equal to this value (YYYYMMDD)",
default="20250701",
"--skip-items-pull",
action="store_true",
help="skip checking Archive.org for newly reviewed items",
default=False,
)
parser.add_argument(
"--earliest-update-date",
help="script will attempt to analyze all items with an oai_updatedate greater than or equal to this value (YYYY-MM-DD)",
type=lambda s: datetime.strptime(s, "%Y-%m-%d"),
default=datetime(2025, 7, 1),
)
parser.add_argument(
"--ocr-backend",
@@ -48,119 +60,184 @@ def main():
ocr_engine = PaddleOcrEngine(languages=["eng", "fra"])
with sqlite3.connect(args.database) as conn:
with psycopg.connect(args.database, autocommit=True) as conn:
cur = conn.cursor()
cur.execute("""
create table if not exists items (
id text primary key not null,
review_date text not null,
skip_analysis bool not null,
analyzed_date text
)""")
cur.execute("""
create table if not exists docs (
name text primary key not null,
item text not null
)""")
cur.execute("""
create table if not exists pages (
id int primary key,
doc text not null,
page int not null,
page_angle float not null,
sharpness real not null,
is_blank boolean not null,
text_margin_px int not null
)""")
cur.execute("create index if not exists review_date_idx on items (review_date)")
cur.execute(
"create index if not exists analyzed_date_idx on items (analyzed_date)"
)
cur.execute("create index if not exists item_idx on docs (item)")
cur.execute("create index if not exists doc_idx on pages (doc)")
cur.execute(
"create unique index if not exists doc_page_idx on pages (doc, page)"
)
conn.commit()
if not args.skip_items_pull:
print("Pulling item IDs")
pull_new_item_ids(conn, args.earliest_update_date)
print("Done.")
while True:
print("Pulling item IDs")
pull_new_item_ids(conn, args.earliest_review_date)
print("Done.")
res = cur.execute("""
select id
from items
where analyzed_date is null
and skip_analysis = false
order by review_date
cur.execute("""
update phono.items
set started_date = now()
where _id = (
select _id
from phono.items
where (started_date is null or started_date < now() - interval '3 hours')
and completed_date is null
order by oai_updatedate
limit 1
)
returning _id, ia_id
""")
for (item_id,) in res.fetchall():
N_ATTEMPTS = 3
for _ in range(N_ATTEMPTS):
try:
print(f"Processing {item_id}")
item = fetch_item(item_id)
minimal_docs = (
[doc for doc in item.docs if doc.name != ""]
if len(item.docs) > 1
else item.docs
row_item = cur.fetchone()
if row_item is None:
print("No items in queue.")
return
[item_id, ia_id] = row_item
N_ATTEMPTS = 2
for _ in range(N_ATTEMPTS):
try:
print(f"Processing {item_id}")
item = fetch_item(ia_id)
minimal_docs = (
[doc for doc in item.docs if doc.name != ""]
if len(item.docs) > 1
else item.docs
)
for doc in minimal_docs:
cur.execute(
"""
with
new_data (name, item) as (values (%s, %s)),
existing_data as (
select docs._id from phono.docs as docs inner join new_data on docs.name = new_data.name and docs.item = new_data.item
),
inserted_data as (
insert into phono.docs (name, item)
select name, item from new_data
where not exists (select 1 from existing_data)
returning _id
)
select _id from existing_data
union all select _id from inserted_data
""",
[doc.name, item_id],
)
for doc in minimal_docs:
cur.execute(
"insert into docs (name, item) values (?, ?) on conflict do nothing",
[doc.name, item_id],
[doc_id] = cur.fetchone()
pages_subfilter_template = url_encode(
json.dumps(
{
"t": "Comparison",
"c": {
"t": "Infix",
"c": {
"operator": "Eq",
"lhs": {
"t": "Identifier",
"c": {"parts_raw": ["doc"]},
},
"rhs": {
"t": "Literal",
"c": {
"t": "Uuid",
"c": "__ID_PLACEHOLDER__",
},
},
},
},
},
separators=(",", ":"),
)
analysis = analyze_doc(
doc=doc, ocr_engine=ocr_engine, verbose=True
)
for i, page in enumerate(analysis["pages"]):
cur.execute(
"""
insert into pages (
)
pages_subfilter_parts = pages_subfilter_template.split(
"__ID_PLACEHOLDER__"
)
assert len(pages_subfilter_parts) == 2
cur.execute(
"""
update phono.docs
set pages_link = %s || '?subfilter=' || %s || _id::text || %s
where _id = %s
""",
[GUI_PAGES_PORTAL_URL, *pages_subfilter_parts, doc_id],
)
analysis = analyze_doc(
doc=doc, ocr_engine=ocr_engine, verbose=True
)
cur.executemany(
"""
with
new_data (doc, page, page_angle, sharpness, is_blank, text_margin_px, url) as (
values (%s, %s, %s, %s, %s, %s, %s)
),
updated_data as (
update phono.pages as pages set
page_angle = new_data.page_angle,
sharpness = new_data.sharpness,
is_blank = new_data.is_blank,
text_margin_px = new_data.text_margin_px,
url = new_data.url
from new_data where pages.doc = new_data.doc and pages.page = new_data.page
returning 1
)
insert into phono.pages (
doc,
page,
page_angle,
sharpness,
is_blank,
text_margin_px
) values (?, ?, ?, ?, ?, ?)""",
[
doc.name,
i + 1,
page["page_angle"],
page["sharpness"],
page["blank"],
page["text_margin_px"],
],
)
cur.execute(
"update items set analyzed_date = ? where id = ?",
[datetime.utcnow().strftime("%Y%m%d%H%M%S"), item_id],
text_margin_px,
url
) select
doc,
page,
page_angle,
sharpness,
is_blank,
text_margin_px,
url
from new_data
where not exists (select 1 from updated_data)
""",
[
[
doc_id,
i + 1,
page["page_angle"],
page["sharpness"],
page["is_blank"],
page["text_margin_px"],
f"https://archive.org/details/{ia_id}{f'/{url_encode(doc.name)}' if doc.name != ia_id else ''}/page/n{i}",
]
for i, page in enumerate(analysis["pages"])
],
)
conn.commit()
print("Done")
break
except Exception as err:
print(err, file=stderr)
traceback.print_tb(err.__traceback__, file=stderr)
sleep(15)
break
sleep(3600)
cur.execute(
"update phono.items set completed_date = now() where _id = %s",
[item_id],
)
break
except Exception as err:
print(err, file=stderr)
traceback.print_tb(err.__traceback__, file=stderr)
sleep(15)
def pull_new_item_ids(conn, earliest_review_date):
def pull_new_item_ids(conn, earliest_update_date: datetime):
cur = conn.cursor()
res = cur.execute("select review_date from items order by review_date desc limit 1")
(latest_review_date,) = res.fetchone() or (earliest_review_date,)
print(latest_review_date)
cur.execute(
"select oai_updatedate from phono.items order by oai_updatedate desc limit 1"
)
(latest_update_date,) = cur.fetchone() or (earliest_update_date,)
# There are a couple of "review date" fields, but it's unclear precisely how
# they relate to each other or to the Cebu microfiche review process. Best I
# can tell, `updatedate`/`oai_updatedate` are a more straightforward way to
# paginate.
query = f"""
collection:(microfiche)
AND contributor:(Internet Archive)
AND micro_review:(done)
AND review_date:[{latest_review_date} TO null]
AND oai_updatedate:[{latest_update_date.replace(tzinfo=None).isoformat()}Z TO null]
"""
sort = "reviewdate asc"
sort = "updatedate asc"
print(f"Querying:{query}")
# Format for API.
query = re.sub(r"\s+", "+", query.strip())
@@ -170,20 +247,65 @@ def pull_new_item_ids(conn, earliest_review_date):
# be broken in mysterious ways and more or less impossible to use for our
# purposes.
resp = requests.get(
f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&rows=250000&output=json",
f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&fl[]=oai_updatedate&rows=25000&output=json",
)
resp.raise_for_status()
try:
body = resp.json()
if "error" in body:
raise Exception("API error")
except Exception as err:
print("Body:", resp.text, file=stderr)
raise err
for doc in body["response"]["docs"]:
cur.execute(
"insert into items (id, review_date, skip_analysis) values (?, ?, false) on conflict do nothing",
(doc["identifier"], doc["review_date"]),
BATCH_SIZE = 250
docs = body["response"]["docs"]
for i in range(0, len(docs), BATCH_SIZE):
batch = docs[i : min(len(docs), i + BATCH_SIZE)]
# Approximate a unique constraint on the application side.
cur.executemany(
"""
with new_data (ia_id, review_date, oai_updatedate) as (values (%s, %s, %s))
insert into phono.items (ia_id, review_date, oai_updatedate, url)
select ia_id, review_date, oai_updatedate, 'https://archive.org/details/' || ia_id from new_data
where not exists (
select 1 from phono.items where ia_id = new_data.ia_id
)
""",
[
[
doc["identifier"],
doc.get("review_date"),
max(*[datetime.fromisoformat(t) for t in doc["oai_updatedate"]]),
]
for doc in batch
],
)
conn.commit()
docs_subfilter_template = url_encode(
json.dumps(
{
"t": "Comparison",
"c": {
"t": "Infix",
"c": {
"operator": "Eq",
"lhs": {"t": "Identifier", "c": {"parts_raw": ["item"]}},
"rhs": {
"t": "Literal",
"c": {"t": "Uuid", "c": "__ID_PLACEHOLDER__"},
},
},
},
},
separators=(",", ":"),
)
)
docs_subfilter_parts = docs_subfilter_template.split("__ID_PLACEHOLDER__")
assert len(docs_subfilter_parts) == 2
cur.execute(
"update phono.items set docs_link = %s || '?subfilter=' || %s || _id::text || %s",
[GUI_DOCS_PORTAL_URL, *docs_subfilter_parts],
)
if __name__ == "__main__":

View file

@@ -97,7 +97,7 @@ class ArchiveDoc:
"""
if use_cache:
with open(f"{CACHE_DIR}/{_url_encode(self.name)}.pdf", "rb") as f:
with open(f"{CACHE_DIR}/{url_encode(self.name)}.pdf", "rb") as f:
pdf_data = f.read()
else:
pdf_data = _fetch_pdf(self.identifier, self.name)
@@ -201,13 +201,13 @@ def fetch_item(identifier: str, use_cache=False) -> ArchiveItem:
# this shouldn't usually be an issue, but if/when it is, it can be very
# frustrating to troubleshoot user-side.
file_names = [
_url_decode(name)
url_decode(name)
for name in os.listdir(CACHE_DIR)
if name.lower().startswith(identifier.lower())
]
else:
files_resp = requests.get(
f"https://archive.org/metadata/{_url_encode(identifier)}/files"
f"https://archive.org/metadata/{url_encode(identifier)}/files"
)
files_resp.raise_for_status()
file_names = [item["name"] for item in files_resp.json()["result"]]
@@ -221,11 +221,11 @@ def fetch_item(identifier: str, use_cache=False) -> ArchiveItem:
# Assert that all files we expect to find are actually present.
for doc_name in doc_names:
if f"{_url_encode(doc_name.lower())}.pdf" not in [
if f"{url_encode(doc_name.lower())}.pdf" not in [
name.lower() for name in file_names
]:
raise Exception(
f"expected file not found: {_url_encode(doc_name.lower())}.pdf"
f"expected file not found: {url_encode(doc_name.lower())}.pdf"
)
return ArchiveItem(
@@ -259,18 +259,18 @@ def cache_item(identifier: str, overwrite=True):
os.makedirs(CACHE_DIR, exist_ok=True)
for name in os.listdir(CACHE_DIR):
if _url_decode(name.lower()).startswith(identifier.lower()):
if url_decode(name.lower()).startswith(identifier.lower()):
if not overwrite:
return
item = fetch_item(identifier)
for doc in item.docs:
pdf_data = _fetch_pdf(identifier, doc.name)
with open(f"{CACHE_DIR}/{_url_encode(doc.name)}.pdf", "wb") as f:
with open(f"{CACHE_DIR}/{url_encode(doc.name)}.pdf", "wb") as f:
f.write(pdf_data)
def _url_encode(string: str) -> str:
def url_encode(string: str) -> str:
"""
Helper to encode to a URL-encoded (in other words, percent-encoded) string.
"""
@@ -278,7 +278,7 @@ def _url_encode(string: str) -> str:
return urllib.parse.quote(string, safe=" ._")
def _url_decode(string: str) -> str:
def url_decode(string: str) -> str:
"""
Helper to decode from a URL-encoded (in other words, percent-encoded)
string.
@@ -296,7 +296,7 @@ def _fetch_pdf(identifier: str, doc_name: str) -> bytes:
# file path itself as defined by archive.org. Percent-encoding it further
# may result in a 404 error.
resp = requests.get(
f"https://archive.org/download/{_url_encode(identifier)}/{doc_name}.pdf"
f"https://archive.org/download/{url_encode(identifier)}/{doc_name}.pdf"
)
resp.raise_for_status()
return resp.content