MicroQA/engine.py
2025-10-04 15:10:10 -07:00

239 lines
8.2 KiB
Python

import urllib.parse
import re
from dataclasses import dataclass
from io import BytesIO
from multiprocessing import Pool
from sys import stderr
from zipfile import ZipFile
import numpy as np
import pytesseract
import requests
from PIL import Image, ImageFilter
def analyze_item(item_id, ocr_langs="eng+fra", parallel=1, verbose=False):
    """
    Download the page images of an archive.org item and analyze every leaf
    that appears in the item's page-number metadata.

    Args:
        item_id    Item identifier; it is URL-escaped before being placed in
                   request paths.
        ocr_langs  Tesseract language codes, passed through to each
                   PageAnalysisTask.
        parallel   Number of worker processes. Values of 1 or less run the
                   analysis serially in the current process.
        verbose    When true, progress messages are written to stderr.

    Returns:
        A dict of the form {"pages": [...]}, holding one analyze_page()
        result per analyzed leaf, ordered by sorted file name within the
        JP2 zip.

    Raises:
        requests.HTTPError if either download returns an error status.
    """
    escaped_item_id = urllib.parse.quote(item_id, safe="")

    if verbose:
        print("Downloading...", file=stderr, flush=True)
    page_nums_resp = requests.get(
        f"https://archive.org/metadata/{escaped_item_id}/page_numbers/pages"
    )
    page_nums_resp.raise_for_status()
    page_nums = page_nums_resp.json()["result"]

    zip_resp = requests.get(
        f"https://archive.org/download/{escaped_item_id}/{escaped_item_id}_jp2.zip"
    )
    zip_resp.raise_for_status()

    if verbose:
        print("Decompressing...", file=stderr, flush=True)

    # Only leaves listed in the page-number metadata are analyzed. Collect
    # their leaf numbers once so each membership test is O(1) instead of a
    # scan over the whole metadata list per zip entry.
    numbered_leaves = {info["leafNum"] for info in page_nums}

    tasks = []
    with ZipFile(BytesIO(zip_resp.content)) as jp_zip:
        # A leaf's number is its position in the sorted list of zip entries.
        for leaf_num, file_name in enumerate(sorted(jp_zip.namelist())):
            if leaf_num not in numbered_leaves:
                continue
            with jp_zip.open(file_name) as jp_file:
                # convert() decodes the pixel data, so the image remains
                # usable after the zip member is closed.
                im = Image.open(jp_file).convert("L")
            im.thumbnail((3200, 3200))
            tasks.append(PageAnalysisTask(im=im, ocr_langs=ocr_langs))

    if verbose:
        # Report the pages actually queued, which can differ from the number
        # of metadata entries when the zip has no file for some leaf.
        print(f"Processing {len(tasks)} pages...", file=stderr, flush=True)

    if parallel > 1:
        # Parallelize image processing and OCR of pages across up to n cores.
        with Pool(parallel) as pool:
            return {"pages": pool.map(analyze_page, tasks)}
    return {"pages": [analyze_page(task) for task in tasks]}
@dataclass
class PageAnalysisTask:
    """
    One page's worth of input for analyze_page().

    Attributes:
        im         The page as a PIL Image. It is expected to have been
                   shrunk already with .thumbnail() so that its long edge
                   fits within 3200 px.
        ocr_langs  Languages Tesseract should use: 3-letter codes joined
                   with "+".
    """

    im: Image.Image
    ocr_langs: str = "eng+fra"
def _max_tile_sharpness(im):
    """
    Split the image into a 2x3 grid of tiles (3x2 when it is not taller than
    wide) and return the highest analyze_sharpness() score among them.
    """
    width, height = im.size
    if width < height:
        # Page is in portrait orientation.
        segments_x, segments_y = 2, 3
    else:
        # Page is in landscape orientation.
        segments_x, segments_y = 3, 2
    tile_w = width / segments_x
    tile_h = height / segments_y

    best = 0.0
    for i in range(segments_x):
        for j in range(segments_y):
            tile = im.crop(
                (tile_w * i, tile_h * j, tile_w * (i + 1), tile_h * (j + 1))
            )
            # A NaN score never compares greater, so it cannot replace `best`.
            best = max(best, analyze_sharpness(tile))
    return best


def _best_ocr_orientation(im, ocr_langs, ocr_scale):
    """
    Run Tesseract on the image rotated by 0, 90, 180 and 270 degrees and
    return (orientation, words) for the rotation that produced the most
    accepted words. Ties go to the earliest rotation tried.
    """
    # The scaled image is the same for every rotation, so build it once.
    im_scaled = im.resize(np.int_(np.array(im.size) * ocr_scale))

    best_score = -1
    best_words = None
    best_orientation = -1
    for orientation in range(4):
        im_rotated = im_scaled.rotate(90 * orientation, expand=True)
        ocr = pytesseract.image_to_data(
            im_rotated,
            lang=ocr_langs,
            config=f"--oem 1 --dpi {int(300 * ocr_scale)} --tessdata-dir ./data/tessdata_fast-4.1.0",
            output_type=pytesseract.Output.DATAFRAME,
        ).fillna({"text": ""})
        # Keep only words that Tesseract is confident in, and which are
        # oriented horizontally.
        words = ocr[(ocr["conf"] > 90) & (ocr["width"] > ocr["height"])]
        # Keep only alphabetical words of 4 or more characters. The
        # vectorized string match also yields a proper (empty) boolean mask
        # when no rows survived the previous filter.
        words = words[words["text"].astype(str).str.fullmatch(r"[a-zA-Z]{4,}")]
        if words.shape[0] > best_score:
            best_score = words.shape[0]
            best_orientation = orientation
            best_words = words
        if best_score > 50:
            # Unlikely that another orientation will have more words, so
            # stop eating up CPU.
            break
    return best_orientation, best_words


def _text_margin_px(words, im_size, orientation, ocr_scale, skip_words=2):
    """
    Return the distance in original-image pixels between the page edge and
    the (skip_words + 1)-th closest word edge, or -1 if there are too few
    measurements.
    """
    # Word boxes are in the rotated image's coordinates; odd orientations
    # swap width and height.
    if orientation % 2 == 0:
        dims = ocr_scale * np.array(im_size)
    else:
        dims = ocr_scale * np.array((im_size[1], im_size[0]))

    left = words["left"].to_numpy()
    top = words["top"].to_numpy()
    right = (words["left"] + words["width"]).to_numpy()
    bottom = (words["top"] + words["height"]).to_numpy()

    # np.concatenate is used instead of np.concat, which only exists in
    # NumPy 2.0 and later.
    margins = np.sort(
        np.int_(
            np.concatenate((left, top, dims[0] - right, dims[1] - bottom))
            # Transform back into original image pixel density
            / ocr_scale
        )
    )
    # Skip the n closest words to the edge, to help ignore stray OCR artifacts.
    if margins.shape[0] > skip_words:
        return int(margins[skip_words])
    return -1


def analyze_page(task):
    """
    Analyze one page image for blankness, sharpness, text orientation and
    text margin.

    Args:
        task  A PageAnalysisTask (any object exposing `im` and `ocr_langs`).
              `im` is assumed to be a single-band image, since the first
              value of getextrema() is compared against a brightness
              threshold.

    Returns:
        A dict with keys:
            "blank"                  True when the darkest pixel of the
                                     central 80% of the page is brighter
                                     than 80% of full scale.
            "ocr_orientation_match"  True when the unrotated image gave the
                                     best OCR result, or when the page is
                                     blank or no words were recognized.
            "size_analyzed"          Size of task.im.
            "sharpness"              Best tile sharpness (1 for blank pages).
            "text_margin_px"         Margin between text and page edge, or
                                     -1 when it could not be measured.
    """
    OCR_SCALE = 1

    width, height = task.im.size
    # Ignore the outer 10% on every side when judging blankness/sharpness.
    im_cropped = task.im.crop(
        (width * 0.1, height * 0.1, width * 0.9, height * 0.9)
    )

    is_blank = im_cropped.getextrema()[0] > 255 * 0.8
    if is_blank:
        max_sharpness = 1
        ocr_orientation_match = True
        text_margin_px = -1
    else:
        max_sharpness = _max_tile_sharpness(im_cropped)
        best_orientation, best_words = _best_ocr_orientation(
            task.im, task.ocr_langs, OCR_SCALE
        )
        if best_words.empty:
            ocr_orientation_match = True
            text_margin_px = -1
        else:
            ocr_orientation_match = best_orientation == 0
            text_margin_px = _text_margin_px(
                best_words, task.im.size, best_orientation, OCR_SCALE
            )

    return {
        "blank": is_blank,
        "ocr_orientation_match": ocr_orientation_match,
        "size_analyzed": task.im.size,
        "sharpness": max_sharpness,
        "text_margin_px": text_margin_px,
    }
def analyze_sharpness(im):
    """
    Crudely quantifies the "sharpness" of edges in an image, on a scale of 0 to
    1. The scale is not linear with respect to scan quality: anything above 0.1
    is usually fine.

    Args:
        im  Image (or array-like) convertible with np.asarray(). The
            normalized result is turned back into an image via
            Image.fromarray().

    Returns:
        The median intensity of above-threshold pixels in the edge-filtered
        image, divided by 255. Returns 0.0 when the input is empty or no
        pixel exceeds the edge threshold, instead of NaN.
    """
    arr = np.asarray(im)
    if arr.size == 0:
        # Nothing to measure; quantiles of an empty array are undefined.
        return 0.0

    # Normalize contrast based on brightest and darkest pixels. For example,
    # NORM_QUANTILE=0.1 will attempt to transform pixel values so that 80% fall
    # between 10% brightness and 90% brightness. In practice, a value around
    # 0.02 seems to work fairly well; the current setting is 0.03.
    NORM_QUANTILE = 0.03
    pixel_range = np.quantile(arr, 1.0 - NORM_QUANTILE) - np.quantile(
        arr, NORM_QUANTILE
    )
    if pixel_range == 0:
        # The two quantiles coincide, so there is no range to rescale by.
        arr_normalized = arr
    else:
        arr_normalized = arr * (1.0 - NORM_QUANTILE * 2) / pixel_range
    # Shift so the lower quantile lands at NORM_QUANTILE on a 0..1 scale.
    arr_normalized = (
        arr_normalized - np.quantile(arr_normalized, NORM_QUANTILE) + NORM_QUANTILE
    )
    arr_normalized = np.uint8(np.clip(arr_normalized, 0, 1) * 255)

    # "Sharpness" is determined by measuring the median intensity of pixels
    # near edges, after an edge detection filter has been applied to the image.
    edges_arr = np.asarray(
        Image.fromarray(arr_normalized).filter(ImageFilter.FIND_EDGES)
    )
    EDGE_THRESHOLD = 8
    edge_pixels = edges_arr[edges_arr > EDGE_THRESHOLD]
    if edge_pixels.size == 0:
        # np.median of an empty selection is NaN (with a RuntimeWarning);
        # an image without detectable edges scores zero instead.
        return 0.0
    return np.median(edge_pixels) / 255