@dataclass
class ArchiveLeaf:
    """
    A leaf corresponds to a single image from one of the "Single Page Processed
    JP2 Zip" files from an `ArchiveItem`. Not all leaves become part of the
    final processed PDF displayed to the user, as some contain metadata or
    superfluous information scanned off of the microfiche cards and retained for
    posterity. To identify whether a leaf is pertinent or not, refer to the page
    number metadata pulled as JSON from the archive.org API.

    Attributes:

        image       PIL image, converted to greyscale and shrunk with
                    `.thumbnail()` so that its long edge is no more than 3200
                    pixels.

        page_number `None` if the leaf is not included in the processed PDF
                    presented to users, otherwise a (potentially empty)
                    string with the inferred page number as defined by the
                    document being scanned.
    """

    # Quoted so that the annotation names the image class rather than the
    # `PIL.Image` module that the file imports under the name `Image`.
    image: "Image.Image"
    page_number: Optional[str]


@dataclass
class ArchiveDoc:
    """
    Information pertaining to a single set of processed pages, of which there
    may be multiple for any given ArchiveItem. For example, one SCOTUS case may
    contain several briefs/petitions/etc., each presented as a distinct PDF but
    all held within the parent `ArchiveItem`.

    Note that this is a slightly different concept than the literal "files"
    available via the archive.org API: an `ArchiveDoc` may combine information
    from, say, both a `_page_numbers.json` file and a `_jp2.zip` file to store
    image data and page number data conveniently within the same Python object.

    Attributes:

        identifier  archive.org identifier string, for example
                    `"micro_IA40386007_0012"`.

        name        File name stem shared by the document's `_jp2.zip` and
                    `_page_numbers.json` files, i.e. the file name with that
                    suffix removed. `fetch_leaves()` appends the suffixes to
                    this value to locate both files.

        title       Optional title of the document, usually indicating that
                    this file represents a subset of the parent item's
                    content, for example a specific brief or opinion from a
                    larger SCOTUS case document.

                    For QA intents and purposes, it's usually easiest to skip
                    over any documents where `title is not None`, assuming that
                    the item has at least one processed `_jp2.zip` file for
                    which `title is None`.
    """

    identifier: str
    name: str
    title: Optional[str]

    @staticmethod
    def _page_numbers_by_leaf(page_nums: list[dict]) -> dict[int, str]:
        """
        Index page number metadata by leaf number.

        Returns a dict mapping each entry's `leafNum` to its page number
        string. When several entries share a `leafNum`, the first one wins.
        """

        lookup: dict[int, str] = {}
        for entry in page_nums:
            # NOTE(review): assumes each entry stores its page number under
            # the "pageNumber" key, as in archive.org `_page_numbers.json`
            # files -- confirm against a real file. An entry without the key
            # is treated as a numbered leaf with an empty page number.
            lookup.setdefault(entry["leafNum"], entry.get("pageNumber", ""))
        return lookup

    def fetch_leaves(self, numbered_only=True, use_cache=False) -> list[ArchiveLeaf]:
        """
        Fetch images and page number data for this document from archive.org,
        over the Internet (or from the local cache).

        Params:

            numbered_only   If `True`, discards any leaves with no corresponding
                            page number entries. Leaves for which the page
                            number is an empty string are retained.
            use_cache       If `True`, locally cached zip files under the
                            `./archive_cache` directory (relative to the working
                            directory) will be used instead of fetching over
                            HTTPS.

        Returns the leaves in sorted order of their file names within the zip
        archive; a leaf's number is its position in that ordering.
        """

        # Cached file names are derived from the percent-encoded version of
        # `self.name`, so that there's no need to worry about directory
        # separators or other disallowed characters in the file names
        # defined by archive.org.
        cache_stem = f"{CACHE_DIR}/{_url_encode(self.name)}"

        if use_cache:
            with open(f"{cache_stem}_page_numbers.json", "r") as f:
                page_nums = json.load(f)["pages"]
        else:
            page_nums = _fetch_page_nums(self.identifier, self.name)["pages"]

        # Build the lookup once, and before the zip file is opened, so that
        # malformed metadata cannot leave a file handle dangling.
        page_numbers = self._page_numbers_by_leaf(page_nums)

        if use_cache:
            zip_reader_ctx = open(f"{cache_stem}_jp2.zip", "rb")
        else:
            # Wrap in a context manager so that the reader can be used in a
            # `with` block in the same way as a file accessed with `open()`.
            zip_reader_ctx = nullcontext(
                BytesIO(_fetch_jp2_zip(self.identifier, self.name))
            )

        leaves = []

        with zip_reader_ctx as zip_reader, ZipFile(zip_reader) as jp_zip:
            for leaf_num, file_name in enumerate(sorted(jp_zip.namelist())):
                # `None` indicates that the leaf has no page number entry.
                page_number = page_numbers.get(leaf_num)
                if numbered_only and page_number is None:
                    # Skip before decoding, which is the expensive part.
                    continue

                with jp_zip.open(file_name) as jp_file:
                    # Convert to single-channel greyscale ("L").
                    image = Image.open(jp_file).convert("L")
                    # Rescale long edge to no more than 3200 px.
                    image.thumbnail((3200, 3200))
                leaves.append(ArchiveLeaf(image=image, page_number=page_number))

        return leaves


@dataclass
class ArchiveItem:
    """
    Information pertaining to an archive.org item. Documents are held as
    lightweight `ArchiveDoc` records that carry no page content, so that content
    downloads for individual `ArchiveDoc`s may be skipped, staggered, or
    performed in parallel if desired, rather than in one chunk per item.

    Attributes:

        identifier  archive.org identifier string, for example
                    `"micro_IA40386007_0012"`.

        docs        List of the `ArchiveDoc`s belonging to this item.
    """

    identifier: str
    docs: list[ArchiveDoc]
def _select_doc_names(identifier: str, file_names: list[str]) -> list[str]:
    """
    Pick the processed-scan documents out of an item's file listing.

    Returns the names of all `_jp2.zip` files (suffix stripped, original casing
    preserved) other than the unprocessed `<identifier>_micro_jp2.zip`. Raises
    `FileNotFoundError` if any of them lacks a `_page_numbers.json` companion.
    Matching is case-insensitive throughout.
    """

    suffix = "_jp2.zip"
    unprocessed = f"{identifier.lower()}_micro_jp2.zip"
    # Lower-cased once, rather than once per document.
    known = {name.lower() for name in file_names}

    doc_names = [
        # Strip suffix, to just leave the identifier, and title if present.
        name[: -len(suffix)]
        for name in file_names
        if name.lower().endswith(suffix)
        # Exclude unprocessed scans, which are also named `..._jp2.zip`.
        and name.lower() != unprocessed
    ]

    # Assert that all files we expect to find are actually present. The names
    # in `file_names` are not percent-encoded, so neither is the expectation.
    for doc_name in doc_names:
        expected = f"{doc_name}_page_numbers.json"
        if expected.lower() not in known:
            raise FileNotFoundError(f"expected file not found: {expected}")

    return doc_names


def fetch_item(identifier: str, use_cache=False) -> "ArchiveItem":
    """
    Fetch the relevant top-level information for an `ArchiveItem` from
    archive.org. This assumes a specific naming convention for the item's files:
    - `<identifier>[ Title]_jp2.zip` for processed scans
    - `<identifier>[ Title]_page_numbers.json` for page number metadata
    - `<identifier>_micro_jp2.zip` for unprocessed scans

    This function treats file names as case-insensitive, but preserves casing in
    its output.

    Params:

        identifier  archive.org identifier string, for example
                    `"micro_IA40386007_0012"`.
        use_cache   If `True`, the file listing is read from the
                    `./archive_cache` directory (relative to the working
                    directory) instead of being fetched over HTTPS.

    Raises `FileNotFoundError` (a subclass of the `Exception` raised
    previously) if a processed scan has no page number file alongside it, and
    `requests.HTTPError` if the metadata request fails.
    """

    if use_cache:
        # File names should be treated as case-insensitive, in case the file
        # system is case-insensitive. As I understand it, this applies to FAT
        # and APFS in their default configurations. Both are case-preserving, so
        # this shouldn't usually be an issue, but if/when it is, it can be very
        # frustrating to troubleshoot user-side.
        prefix = identifier.lower()
        decoded_names = (_url_decode(name) for name in os.listdir(CACHE_DIR))
        file_names = [
            name for name in decoded_names if name.lower().startswith(prefix)
        ]
    else:
        files_resp = requests.get(
            f"https://archive.org/metadata/{_url_encode(identifier)}/files"
        )
        files_resp.raise_for_status()
        file_names = [item["name"] for item in files_resp.json()["result"]]

    return ArchiveItem(
        identifier=identifier,
        docs=[
            ArchiveDoc(
                identifier=identifier,
                name=name,
                # Whatever follows the identifier is the title, if anything.
                title=name[len(identifier) :].strip() or None,
            )
            for name in _select_doc_names(identifier, file_names)
        ],
    )


def cache_item(identifier: str, overwrite=True) -> None:
    """
    Load the relevant files for an `ArchiveItem` and its component `ArchiveDoc`s
    and store them within the `archive_cache` directory (relative to the working
    directory). The `archive_cache` directory will be created if it does not
    exist.

    Params:

        identifier  archive.org identifier string, for example
                    `"micro_IA40386007_0012"`.
        overwrite   If set to `False` and any file names in the cache already
                    match the item, fetching the item is skipped. If `True`,
                    the item is always fetched and cached files are replaced.
    """

    os.makedirs(CACHE_DIR, exist_ok=True)

    if not overwrite:
        prefix = identifier.lower()
        # NOTE(review): this is a prefix match, so an identifier that is a
        # prefix of another cached identifier also counts as cached.
        if any(
            _url_decode(name).lower().startswith(prefix)
            for name in os.listdir(CACHE_DIR)
        ):
            return

    item = fetch_item(identifier)
    for doc in item.docs:
        # Download both files before writing either, so that a failed download
        # does not leave a document half-cached.
        page_nums = _fetch_page_nums(identifier, doc.name)
        zip_file = _fetch_jp2_zip(identifier, doc.name)
        cache_stem = f"{CACHE_DIR}/{_url_encode(doc.name)}"
        with open(f"{cache_stem}_page_numbers.json", "w") as f:
            json.dump(page_nums, f)
        with open(f"{cache_stem}_jp2.zip", "wb") as f:
            f.write(zip_file)


def _url_encode(string: str) -> str:
    """
    Helper to encode to a URL-encoded (in other words, percent-encoded) string.
    Spaces, periods, and underscores are left as they are.
    """

    # Imported here because the file's `import urllib` does not by itself
    # guarantee that the `urllib.parse` submodule has been loaded.
    from urllib.parse import quote

    return quote(string, safe=" ._")


def _url_decode(string: str) -> str:
    """
    Helper to decode from a URL-encoded (in other words, percent-encoded)
    string.
    """

    # See `_url_encode()` for why this is imported locally.
    from urllib.parse import unquote

    return unquote(string)
def _download_doc_file(identifier: str, file_name: str, timeout: float):
    """
    Issue a GET request for one file of an archive.org item and return the
    response. Raises `requests.HTTPError` on a non-success status code.
    """

    # `file_name` does not get percent-encoded, because it is derived from the
    # file path itself as defined by archive.org. Percent-encoding it further
    # may result in a 404 error.
    resp = requests.get(
        f"https://archive.org/download/{_url_encode(identifier)}/{file_name}",
        # Without a timeout, `requests` waits forever on a stalled connection.
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp


def _fetch_page_nums(identifier: str, doc_name: str, timeout: float = 60.0) -> dict:
    """
    Fetch JSON file with page number metadata for an `ArchiveDoc`.

    Params:

        identifier  archive.org identifier string of the parent item.
        doc_name    Document name, to which `_page_numbers.json` is appended
                    to form the file name.
        timeout     Seconds to wait for the server to connect or to send data
                    before giving up.

    Returns the decoded JSON body of the response.
    """

    return _download_doc_file(
        identifier, f"{doc_name}_page_numbers.json", timeout
    ).json()


def _fetch_jp2_zip(identifier: str, doc_name: str, timeout: float = 60.0) -> bytes:
    """
    Fetch zip file with processed page scans for an `ArchiveDoc`.

    Params:

        identifier  archive.org identifier string of the parent item.
        doc_name    Document name, to which `_jp2.zip` is appended to form the
                    file name.
        timeout     Seconds to wait for the server to connect or to send data
                    before giving up.

    Returns the raw bytes of the zip file.
    """

    return _download_doc_file(identifier, f"{doc_name}_jp2.zip", timeout).content
def _sharpness_stats(analyses) -> dict:
    """
    Summarize page sharpness across all analyzed documents.

    Returns a dict with the keys `sharpness_max`, `sharpness_median`, and
    `sharpness_min`. All three are `None` if there are no pages at all.
    """

    values = [page["sharpness"] for doc in analyses for page in doc["pages"]]
    if not values:
        return {
            "sharpness_max": None,
            "sharpness_median": None,
            "sharpness_min": None,
        }
    return {
        # Pass the list itself: unpacking it with `*` breaks `max()`/`min()`
        # when there is exactly one page, since the lone argument is then
        # expected to be an iterable.
        "sharpness_max": max(values),
        "sharpness_median": np.median(values).tolist(),
        "sharpness_min": min(values),
    }


def main():
    """
    Analyze a single archive.org item and print the results, along with timing
    and summary statistics, to stdout as JSON. The item's files are cached
    locally first so that the measured duration excludes download time.

    Command line arguments:

        --item-id   archive.org identifier of the item to analyze (required).
        --cpus      Value passed to `analyze_doc()` as `parallel`. Defaults
                    to 4.
    """

    parser = ArgumentParser()
    # Required: the identifier is used unconditionally below.
    parser.add_argument("--item-id", required=True)
    parser.add_argument("--cpus", type=int, default=4)
    args = parser.parse_args()

    cache_item(
        args.item_id,
        # Will not refetch if value is already cached.
        overwrite=False,
    )
    item = fetch_item(args.item_id, use_cache=True)

    t_start = time()

    # NOTE(review): `fetch_item()` builds every `doc.name` from a file name, so
    # this filter looks like it can never exclude anything -- was `doc.title`
    # intended? Left unchanged pending confirmation.
    minimal_docs = (
        [doc for doc in item.docs if doc.name != ""]
        if len(item.docs) > 1
        else item.docs
    )
    analyses = [
        analyze_doc(doc, parallel=args.cpus, use_cache=True) for doc in minimal_docs
    ]

    t_end = time()

    print(
        json.dumps(
            {
                "analyses": analyses,
                "duration_secs": t_end - t_start,
                # One list per document, holding the indices of the pages for
                # which `ocr_orientation_match` is falsy.
                "disoriented_pages": [
                    [
                        i
                        for i, page in enumerate(doc["pages"])
                        if not page["ocr_orientation_match"]
                    ]
                    for doc in analyses
                ],
                **_sharpness_stats(analyses),
            }
        )
    )


if __name__ == "__main__":
    main()
page_nums_resp.raise_for_status() - page_nums = page_nums_resp.json()["result"] - - zip_resp = requests.get( - f"https://archive.org/download/{escaped_item_id}/{escaped_item_id}_jp2.zip" - ) - zip_resp.raise_for_status() - - if verbose: - print("Decompressing...", file=stderr) - stderr.flush() - tasks = [] - with ZipFile(BytesIO(zip_resp.content)) as jp_zip: - for leaf_num, file_name in enumerate(sorted(jp_zip.namelist())): - for page_index, page_num_info in enumerate(page_nums): - if page_num_info["leafNum"] == leaf_num: - # Stop iterating and keep page_index set to the current item. - break - else: - # Set to -1 to indicate that leaf was not found in page_num list. - page_index = -1 - - if page_index != -1: - with jp_zip.open(file_name) as jp_file: - im = Image.open(jp_file).convert("L") - im.thumbnail((3200, 3200)) - tasks.append( - PageAnalysisTask( - im=im, - ocr_langs=ocr_langs, - ) - ) - - if verbose: - print(f"Processing {len(page_nums)} pages...", file=stderr) - stderr.flush() + print(f"Processing {len(tasks)} pages...", file=stdout) + stdout.flush() if parallel > 1: # Parallelize image processing and OCR of pages across up to n cores. 
diff --git a/main.py b/main.py index 95e19f0..2e0ccdc 100644 --- a/main.py +++ b/main.py @@ -8,14 +8,28 @@ from time import sleep import requests -from engine import analyze_item +from archive_item import fetch_item +from engine import analyze_doc def main(): parser = ArgumentParser() - parser.add_argument("--database", default="./microqa.db") - parser.add_argument("--cpus", type=int, default=2) - parser.add_argument("--earliest-review-date", default="20250701") + parser.add_argument( + "--database", + help="path to sqlite database for analysis output", + default="./microqa.db", + ) + parser.add_argument( + "--cpus", + type=int, + help="number of concurrent subprocesses to use; higher is generally faster but consumes more resources", + default=2, + ) + parser.add_argument( + "--earliest-review-date", + help="script will attempt to analyze all items with a review date greater than or equal to this value (YYYYMMDD)", + default="20250701", + ) args = parser.parse_args() with sqlite3.connect(args.database) as conn: @@ -28,9 +42,14 @@ create table if not exists items ( analyzed_date text )""") cur.execute(""" +create table if not exists docs ( + name text primary key not null, + item text not null +)""") + cur.execute(""" create table if not exists pages ( id int primary key, - item text not null, + doc text not null, page int not null, orientation_match boolean not null, sharpness real not null, @@ -41,9 +60,10 @@ create table if not exists pages ( cur.execute( "create index if not exists analyzed_date_idx on items (analyzed_date)" ) - cur.execute("create index if not exists item_idx on pages (item)") + cur.execute("create index if not exists item_idx on docs (item)") + cur.execute("create index if not exists doc_idx on pages (doc)") cur.execute( - "create unique index if not exists item_page_idx on pages (item, page)" + "create unique index if not exists doc_page_idx on pages (doc, page)" ) conn.commit() @@ -63,36 +83,40 @@ order by review_date for _ in 
range(N_ATTEMPTS): try: print(f"Processing {item_id}") - analysis = analyze_item( - item_id, parallel=args.cpus, verbose=True + item = fetch_item(item_id) + minimal_docs = ( + [doc for doc in item.docs if doc.name != ""] + if len(item.docs) > 1 + else item.docs ) - for i, page in enumerate(analysis["pages"]): + for doc in minimal_docs: cur.execute( - """ + "insert into docs (name, item) values (?, ?) on conflict do nothing", + [doc.name, item_id], + ) + analysis = analyze_doc( + doc, parallel=args.cpus, verbose=True + ) + for i, page in enumerate(analysis["pages"]): + cur.execute( + """ insert into pages ( - item, + doc, page, orientation_match, sharpness, is_blank, text_margin_px -) values ( - ?, - ?, - ?, - ?, - ?, - ? - )""", - [ - item_id, - i + 1, - page["ocr_orientation_match"], - page["sharpness"], - page["blank"], - page["text_margin_px"], - ], - ) +) values (?, ?, ?, ?, ?, ?)""", + [ + doc.name, + i + 1, + page["ocr_orientation_match"], + page["sharpness"], + page["blank"], + page["text_margin_px"], + ], + ) cur.execute( "update items set analyzed_date = ? 
where id = ?", [datetime.utcnow().strftime("%Y%m%d%H%M%S"), item_id], @@ -126,35 +150,9 @@ def pull_new_item_ids(conn, earliest_review_date): query = re.sub(r"\s+", "+", query.strip()) sort = re.sub(r"\s+", "+", sort.strip()) - # params = { - # "q": query, - # "count": 100, - # "fields": "identifier,review_date", - # "sorts": sort, - # } - # for i in range(1, 999): - # resp = requests.get( - # "https://archive.org/services/search/v1/scrape", - # params=params, - # ) - # resp.raise_for_status() - # print(resp.text) - # try: - # body = resp.json() - # except Exception as err: - # print("Body:", resp.text, file=stderr) - # raise err - # for doc in body["items"]: - # cur.execute( - # "insert into items (id, review_date, skip_analysis) values (?, ?, false) on conflict do nothing", - # (doc["identifier"], doc["review_date"]), - # ) - # conn.commit() - # cursor = body.get("cursor", None) - # if cursor is None: - # break - # params = params.copy() - # params["cursor"] = cursor + # Archive.org has a paginated scraping API, but the query feature seems to + # be broken in mysterious ways and more or less impossible to use for our + # purposes. resp = requests.get( f"https://archive.org/advancedsearch.php?q={query}&sort[]={sort}&fl[]=identifier&fl[]=review_date&rows=250000&output=json", )