MicroQA/main.py

318 lines
10 KiB
Python
Raw Normal View History

2025-08-10 12:27:39 -07:00
import json
import re
import urllib.parse
from argparse import ArgumentParser
from csv import QUOTE_NONE
from dataclasses import dataclass
from io import BytesIO, StringIO
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from sys import stderr, stdin, stdout
from zipfile import ZipFile
import numpy as np
import pandas as pd
import pytesseract
import requests
from PIL import Image, ImageFilter
def main():
    """
    Command-line entry point.

    Reads item IDs from STDIN and dispatches one task per ID to a thread
    pool. With --summarize, only a summary of flagged pages is printed for
    each item; otherwise the full per-page analysis is printed.
    """
    parser = ArgumentParser()
    parser.add_argument("--summarize", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    parser.add_argument("-w", "--workers", type=int, default=1)
    args = parser.parse_args()

    # Both handlers take a single ItemTask, so choose one up front instead of
    # repeating the pool.map call in each branch.
    handler = (
        _summarize_item_to_stdout if args.summarize else _analyze_item_to_stdout
    )

    # Each STDIN line holds one or more item IDs separated by whitespace or
    # commas; empty tokens (e.g. from the trailing newline) are dropped.
    for line in stdin:
        tasks = [
            ItemTask(item_id=token, verbose=args.verbose)
            for token in re.split(r",|\s", line)
            if token
        ]
        # A fresh pool is created for every input line.
        with ThreadPool(args.workers) as pool:
            if args.verbose:
                print(f"Running with {args.workers} workers.", file=stderr)
                stderr.flush()
            pool.map(handler, tasks)
@dataclass
class ItemTask:
    """Unit of work handed to a pool worker: one item to analyze or summarize."""

    # Identifier of the item; analyze_item() URL-escapes it and interpolates it
    # into archive.org metadata and download URLs.
    item_id: str
    # When true, the worker prints progress messages to stderr.
    verbose: bool
def _find_run_starts(flags, min_length):
    """
    Return the 1-indexed start position of every run of at least
    `min_length` consecutive truthy values in `flags`.

    Each run is reported once, no matter how far it extends past
    `min_length`. Runs that begin at the very first element are included.
    """
    starts = []
    run_length = 0
    for index, flag in enumerate(flags):
        if not flag:
            run_length = 0
            continue
        run_length += 1
        # Report the run at the moment it reaches the minimum length, so a
        # longer run does not produce additional entries.
        if run_length == min_length:
            # (index - min_length + 1) is the 0-indexed start; +1 to 1-index.
            starts.append(index - min_length + 2)
    return starts


def _summarize_item_to_stdout(task):
    """
    Analyze one item and print a one-line JSON summary of suspicious pages.

    `task` is an ItemTask. The summary is printed to stdout only when at
    least one check fires, and contains:

    - "check_orientation": 1-indexed positions of pages whose OCR orientation
      check failed.
    - "consecutive_blanks": 1-indexed start positions of runs of 3 or more
      blank pages.
    - "consecutive_blurry": 1-indexed start positions of runs of 3 or more
      pages whose sharpness is below SHARPNESS_THRESHOLD.

    Positions refer to the order of entries in the analysis "pages" list.
    """
    item_id = task.item_id
    verbose = task.verbose
    if verbose:
        print(f"Summarizing item {item_id}...", file=stderr)
        stderr.flush()

    # `verbose` must be passed by keyword: the second positional parameter of
    # analyze_item() is `parallel`, not `verbose`.
    analysis = analyze_item(item_id, verbose=verbose)
    pages = analysis["pages"]

    # 3 or more blank pages in a row is a flag.
    CONSECUTIVE_BLANKS_THRESHOLD = 3
    consecutive_blanks = _find_run_starts(
        [page["blank"] for page in pages], CONSECUTIVE_BLANKS_THRESHOLD
    )

    # 3 or more blurry pages in a row is a flag.
    CONSECUTIVE_BLURRY_THRESHOLD = 3
    SHARPNESS_THRESHOLD = 0.08
    consecutive_blurry = _find_run_starts(
        [page["sharpness"] < SHARPNESS_THRESHOLD for page in pages],
        CONSECUTIVE_BLURRY_THRESHOLD,
    )

    check_orientation = [
        i + 1 for i, page in enumerate(pages) if not page["ocr_orientation_match"]
    ]

    if check_orientation or consecutive_blanks or consecutive_blurry:
        print(
            json.dumps(
                {
                    "item_id": item_id,
                    "check_orientation": check_orientation,
                    "consecutive_blanks": consecutive_blanks,
                    "consecutive_blurry": consecutive_blurry,
                }
            )
        )
        stdout.flush()

    if verbose:
        print(f"Done summarizing item {item_id}.", file=stderr)
        stderr.flush()
def _analyze_item_to_stdout(task):
    """
    Analyze one item and print the complete analysis to stdout as a single
    line of JSON.

    `task` is an ItemTask. Progress messages go to stderr when the task is
    verbose.
    """
    item_id = task.item_id
    verbose = task.verbose
    if verbose:
        print(f"Analyzing item {item_id}...", file=stderr)
        stderr.flush()

    # `verbose` must be passed by keyword: the second positional parameter of
    # analyze_item() is `parallel`, not `verbose`.
    print(json.dumps(analyze_item(item_id, verbose=verbose)))
    stdout.flush()

    if verbose:
        print(f"Done analyzing item {item_id}.", file=stderr)
        stderr.flush()
@dataclass
class PageAnalysisTask:
    """Input for _analyze_page(): one page image plus identifying metadata."""

    # Page image; analyze_item() converts it to single-band "L" mode before
    # building the task.
    im: Image.Image
    # Index of the page's entry in the item's page-number list.
    page_index: int
    # Name of the image file inside the downloaded zip archive.
    file_name: str
def _max_segment_sharpness(im):
    """
    Split `im` into a 2x3 grid (portrait) or 3x2 grid (landscape) and return
    the highest analyze_sharpness() score among the six segments.
    """
    width, height = im.size
    if width < height:
        # Page is in portrait orientation.
        segments_x, segments_y = 2, 3
    else:
        # Page is in landscape orientation.
        segments_x, segments_y = 3, 2

    max_sharpness = 0.0
    for i in range(segments_x):
        for j in range(segments_y):
            segment = im.crop(
                (
                    width / segments_x * i,
                    height / segments_y * j,
                    width / segments_x * (i + 1),
                    height / segments_y * (j + 1),
                )
            )
            # Keep the running maximum as the first argument: max() only
            # replaces it when the new score compares greater, so a NaN score
            # can never displace it.
            max_sharpness = max(max_sharpness, analyze_sharpness(segment))
    return max_sharpness


def _ocr_orientation_matches(im):
    """
    Return True when OCR finds at least as many confident words in `im` as
    given as in any other 90-degree rotation that was tried.
    """
    n_words = []
    for orientation in range(4):
        # NOTE(review): rotate() is called without expand=True, so the rotated
        # copy keeps the original canvas size; for non-square pages part of
        # the content may fall outside it. Confirm this is intended.
        im_rotated = im.rotate(90 * orientation)
        ocr = pd.read_csv(
            StringIO(pytesseract.image_to_data(im_rotated)),
            sep="\t",
            quoting=QUOTE_NONE,
            dtype={"text": str},
        ).fillna({"text": ""})
        # Count only high-confidence boxes that are wider than tall and whose
        # text is purely alphabetic with at least 4 letters.
        words = ocr[(ocr["conf"] > 90) & (ocr["width"] > ocr["height"])]
        words = words[words["text"].str.fullmatch(r"[a-zA-Z]{4,}")]
        n_words.append(len(words))
        if len(words) > 50:
            # Unlikely that another orientation will have more words, so
            # stop eating up CPU unnecessarily.
            break
    # Built-in max() on Python ints yields a plain bool, which json.dumps can
    # serialize (a numpy.bool_ cannot be).
    return max(n_words) == n_words[0]


def _analyze_page(task):
    """
    Analyze a single page image.

    `task` is a PageAnalysisTask. Returns a dict with keys:

    - "blank": True when every pixel of the center crop is brighter than 80%
      of full scale.
    - "file_name", "page_index": copied from the task.
    - "ocr_orientation_match": result of the OCR orientation check (always
      True for blank pages, which skip OCR).
    - "size": size of the original image.
    - "sharpness": best segment sharpness score (1 for blank pages, which
      skip the sharpness measurement).
    """
    im_original = task.im

    # Discard the outer 10% on every side before measuring blankness and
    # sharpness.
    width, height = im_original.size
    im_cropped = im_original.crop(
        (width * 0.1, height * 0.1, width * 0.9, height * 0.9)
    )

    # For a single-band image, getextrema() returns (min, max); the page is
    # blank when even its darkest pixel is bright.
    is_blank = im_cropped.getextrema()[0] > 255 * 0.8

    if is_blank:
        max_sharpness = 1
        ocr_orientation_match = True
    else:
        max_sharpness = _max_segment_sharpness(im_cropped)
        ocr_orientation_match = _ocr_orientation_matches(im_original)

    return {
        "blank": is_blank,
        "file_name": task.file_name,
        "ocr_orientation_match": ocr_orientation_match,
        "page_index": task.page_index,
        "size": im_original.size,
        "sharpness": max_sharpness,
    }
def analyze_item(item_id, parallel=False, verbose=False):
    """
    Download an item's page images from archive.org and analyze each page.

    Parameters:
        item_id: item identifier; it is URL-escaped before being placed in
            request URLs.
        parallel: when true, pages are analyzed in a pool of worker
            processes instead of sequentially.
        verbose: when true, progress messages are printed to stderr.

    Returns:
        {"pages": [...]} with one _analyze_page() result per image whose leaf
        number appears in the item's page-number list.

    Raises:
        requests.HTTPError: if either download returns an error status.
    """
    escaped_item_id = urllib.parse.quote(item_id, safe="")

    if verbose:
        print("Downloading...", file=stderr)
        stderr.flush()

    page_nums_resp = requests.get(
        f"https://archive.org/metadata/{escaped_item_id}/page_numbers/pages"
    )
    page_nums_resp.raise_for_status()
    page_nums = page_nums_resp.json()["result"]

    zip_resp = requests.get(
        f"https://archive.org/download/{escaped_item_id}/{escaped_item_id}_jp2.zip"
    )
    zip_resp.raise_for_status()

    if verbose:
        print("Decompressing...", file=stderr)
        stderr.flush()

    # Map each leaf number to the index of its entry in the page-number list
    # so every leaf is resolved with one lookup instead of a scan of the
    # whole list. setdefault() keeps the first entry if a leaf number repeats.
    page_index_by_leaf = {}
    for page_index, page_num_info in enumerate(page_nums):
        page_index_by_leaf.setdefault(page_num_info["leafNum"], page_index)

    tasks = []
    with ZipFile(BytesIO(zip_resp.content)) as jp_zip:
        # A file's leaf number is taken to be its position in the sorted list
        # of archive member names.
        for leaf_num, file_name in enumerate(sorted(jp_zip.namelist())):
            page_index = page_index_by_leaf.get(leaf_num)
            if page_index is None:
                # Leaf is not present in the page-number list; skip it.
                continue
            with jp_zip.open(file_name) as jp_file:
                # convert() produces a new single-band image, so the pixel
                # data is read while the archive member is still open.
                im = Image.open(jp_file).convert("L")
            tasks.append(
                PageAnalysisTask(im=im, page_index=page_index, file_name=file_name)
            )

    if verbose:
        print(f"Processing {len(page_nums)} pages...", file=stderr)
        stderr.flush()

    if parallel:
        # Parallelize image processing and OCR of pages across up to n cores.
        N_PROCESSES = 8
        with Pool(N_PROCESSES) as pool:
            return {"pages": pool.map(_analyze_page, tasks)}

    return {"pages": [_analyze_page(task) for task in tasks]}
def analyze_sharpness(im):
    """
    Crudely quantifies the "sharpness" of edges in an image, on a scale of 0 to
    1. The scale is not linear with respect to scan quality: anything above 0.1
    is usually fine.

    Returns 0.0 when the edge filter finds no pixel above the edge threshold.
    """
    arr = np.asarray(im)

    # Normalize contrast based on brightest and darkest pixels. For example,
    # NORM_QUANTILE=0.1 will attempt to transform pixel values so that 80% fall
    # between 10% brightness and 90% brightness. In practice, a value around
    # 0.02 seems to work fairly well.
    # NOTE(review): the comment above mentions 0.02 but the constant is 0.03;
    # confirm which value is intended.
    NORM_QUANTILE = 0.03
    pixel_range = np.quantile(arr, 1.0 - NORM_QUANTILE) - np.quantile(
        arr, NORM_QUANTILE
    )
    if pixel_range == 0:
        # Both quantiles coincide; skip scaling to avoid dividing by zero.
        arr_normalized = arr
    else:
        arr_normalized = arr * (1.0 - NORM_QUANTILE * 2) / pixel_range
    # Shift so that the lower quantile lands on NORM_QUANTILE, then clip to
    # [0, 1] and rescale to 8-bit.
    arr_normalized = (
        arr_normalized - np.quantile(arr_normalized, NORM_QUANTILE) + NORM_QUANTILE
    )
    arr_normalized = np.uint8(np.clip(arr_normalized, 0, 1) * 255)

    # "Sharpness" is determined by measuring the median intensity of pixels
    # near edges, after an edge detection filter has been applied to the image.
    edges_arr = np.asarray(
        Image.fromarray(arr_normalized).filter(ImageFilter.FIND_EDGES)
    )
    EDGE_THRESHOLD = 8
    edge_pixels = edges_arr[edges_arr > EDGE_THRESHOLD]
    if edge_pixels.size == 0:
        # np.median of an empty selection would return NaN and emit a
        # RuntimeWarning; no detectable edges is reported as zero sharpness.
        return 0.0
    return np.median(edge_pixels) / 255
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()