# MicroQA/engine.py
import re
from dataclasses import dataclass
from multiprocessing import Pool
from sys import stdout
import numpy as np
import pytesseract
from PIL import Image, ImageFilter
from archive_item import ArchiveDoc
def analyze_doc(
    doc: ArchiveDoc, ocr_langs="eng+fra", parallel=1, use_cache=False, verbose=False
):
    """
    Analyze every page (leaf) of an archive document.

    Args:
        doc: Document whose leaves are fetched and analyzed.
        ocr_langs: Tesseract language codes, "+"-separated (e.g. "eng+fra").
        parallel: Number of worker processes; 1 runs pages in-process.
        use_cache: Passed through to doc.fetch_leaves().
        verbose: If True, print progress to stdout.

    Returns:
        A dict with one key, "pages": a list of per-page analysis dicts in
        document order (see analyze_page for the per-page schema).
    """
    if verbose:
        print(f"Loading {doc.name}...")
        stdout.flush()

    # BUG FIX: annotation previously read `tasks: PageAnalysisTask`, but the
    # value is a *list* of tasks.
    tasks: list[PageAnalysisTask] = [
        PageAnalysisTask(im=leaf.image, ocr_langs=ocr_langs)
        for leaf in doc.fetch_leaves(use_cache=use_cache)
    ]

    if verbose:
        print(f"Processing {len(tasks)} pages...", file=stdout)
        stdout.flush()

    if parallel > 1:
        # Parallelize image processing and OCR of pages across up to n cores.
        with Pool(parallel) as pool:
            return {"pages": pool.map(analyze_page, tasks)}
    return {"pages": [analyze_page(task) for task in tasks]}
@dataclass
class PageAnalysisTask:
    """
    Work item describing a single page to analyze.

    Attributes:
        im: PIL Image for the page, pre-scaled with .thumbnail() so the
            long edge fits within 3200 px.
        ocr_langs: Tesseract language codes — three letters each, joined
            into a "+"-separated list (e.g. "eng+fra").
    """

    im: Image.Image
    ocr_langs: str = "eng+fra"
def analyze_page(task):
    """
    Analyze one page image: blankness, sharpness, OCR text orientation, and
    distance from the text to the page edge.

    Args:
        task: A PageAnalysisTask carrying the page image and OCR languages.

    Returns:
        A dict with keys "blank", "ocr_orientation_match", "size_analyzed",
        "sharpness", and "text_margin_px" (-1 when no margin was measured).
    """
    # Crop 10% off every side so scanner borders / page edges don't skew
    # the blank and sharpness checks.
    im_cropped = task.im.crop(
        (
            task.im.size[0] * 0.1,
            task.im.size[1] * 0.1,
            task.im.size[0] * 0.9,
            task.im.size[1] * 0.9,
        )
    )
    # getextrema()[0] is the darkest pixel value for a single-band image: the
    # page is "blank" when even its darkest pixel is above 80% brightness.
    # NOTE(review): assumes a grayscale ("L") image — for multi-band images
    # getextrema() returns per-band tuples and this comparison would break;
    # confirm what fetch_leaves() produces.
    is_blank = im_cropped.getextrema()[0] > 255 * 0.8
    if is_blank:
        # Blank pages skip OCR entirely and get placeholder metrics.
        max_sharpness = 1
        ocr_orientation_match = True
        text_margin_px = -1
    else:
        max_sharpness = 0.0
        # Grade sharpness per grid cell (2x3 portrait, 3x2 landscape) and keep
        # the best cell, so one sharp region is enough to score well.
        if im_cropped.size[0] < im_cropped.size[1]:
            # Page is in portrait orientation.
            segments_x = 2
            segments_y = 3
        else:
            # Page is in landscape orientation.
            segments_x = 3
            segments_y = 2
        for i in range(segments_x):
            for j in range(segments_y):
                max_sharpness = max(
                    max_sharpness,
                    analyze_sharpness(
                        im_cropped.crop(
                            (
                                im_cropped.size[0] / segments_x * i,
                                im_cropped.size[1] / segments_y * j,
                                im_cropped.size[0] / segments_x * (i + 1),
                                im_cropped.size[1] / segments_y * (j + 1),
                            )
                        )
                    ),
                )
        # Try all four 90-degree rotations of the full page; the orientation
        # that yields the most confident horizontal words wins.
        OCR_SCALE = 1
        best_ocr_score = -1
        best_ocr_words = None
        best_ocr_orientation = -1
        for orientation in range(4):
            im_rotated = task.im.resize(
                np.int_(np.array(task.im.size) * OCR_SCALE)
            ).rotate(90 * orientation, expand=True)
            # fillna: Tesseract emits NaN "text" for non-word rows; normalize
            # those to empty strings before filtering.
            ocr = pytesseract.image_to_data(
                im_rotated,
                lang=task.ocr_langs,
                config=f"--oem 1 --dpi {int(300 * OCR_SCALE)} --tessdata-dir ./data/tessdata_fast-4.1.0",
                output_type=pytesseract.Output.DATAFRAME,
            ).fillna({"text": ""})
            # Keep only words that Tesseract is confident in, and which are
            # oriented horizontally.
            words = ocr[(ocr["conf"] > 90) & (ocr["width"] > ocr["height"])]
            # Keep only alphabetical words of 4 or more characters.
            words = words[
                words.apply(
                    lambda row: re.fullmatch(r"[a-zA-Z]{4,}", str(row["text"]))
                    is not None,
                    axis=1,
                )
            ]
            if words.shape[0] > best_ocr_score:
                best_ocr_score = words.shape[0]
                best_ocr_orientation = orientation
                best_ocr_words = words
            if best_ocr_score > 50:
                # Unlikely that another orientation will have more words, so
                # stop eating up CPU.
                break
        if best_ocr_words.empty:
            # No usable words in any orientation: nothing to judge
            # orientation or margins by.
            ocr_orientation_match = True
            text_margin_px = -1
        else:
            # Orientation 0 (no rotation) means the scan was already upright.
            ocr_orientation_match = best_ocr_orientation == 0
            # Page dimensions in the winning orientation's coordinate frame
            # (width/height swap for the 90- and 270-degree rotations).
            best_ocr_dims = OCR_SCALE * np.array(
                task.im.size
                if best_ocr_orientation % 2 == 0
                else (task.im.size[1], task.im.size[0])
            )
            # Distance of every word box from each of the four page edges,
            # pooled into one ascending-sorted array.
            word_margins_all_directions = np.sort(
                np.int_(
                    np.concat(
                        (
                            best_ocr_words["left"].to_numpy(),
                            best_ocr_words["top"].to_numpy(),
                            best_ocr_dims[0]
                            - (
                                best_ocr_words["left"] + best_ocr_words["width"]
                            ).to_numpy(),
                            best_ocr_dims[1]
                            - (
                                best_ocr_words["top"] + best_ocr_words["height"]
                            ).to_numpy(),
                        )
                    )
                    # Transform back into original image pixel density
                    / OCR_SCALE
                )
            )
            # Skip the n closest words to the edge, to help ignore stray OCR artifacts.
            SKIP_WORDS = 2
            text_margin_px = int(
                word_margins_all_directions[SKIP_WORDS]
                if word_margins_all_directions.shape[0] > SKIP_WORDS
                else -1
            )
    return {
        "blank": is_blank,
        "ocr_orientation_match": ocr_orientation_match,
        "size_analyzed": task.im.size,
        "sharpness": max_sharpness,
        "text_margin_px": text_margin_px,
    }
def analyze_sharpness(im):
    """
    Crudely quantifies the "sharpness" of edges in an image, on a scale of 0 to
    1. The scale is not linear with respect to scan quality: anything above 0.1
    is usually fine.

    Args:
        im: Page image region (PIL Image; assumed single-band grayscale —
            Image.fromarray below requires a fromarray-compatible array).

    Returns:
        Median intensity (0.0-1.0) of the edge-filtered pixels that exceed
        the edge threshold; 0.0 when no pixel does (e.g. a flat image).
    """
    arr = np.asarray(im)
    # Normalize contrast based on brightest and darkest pixels. For example,
    # NORM_QUANTILE=0.1 will attempt to transform pixel values so that 80% fall
    # between 10% brightness and 90% brightness. In practice, a value around
    # 0.02 seems to work fairly well.
    NORM_QUANTILE = 0.03
    pixel_range = np.quantile(arr, 1.0 - NORM_QUANTILE) - np.quantile(
        arr, NORM_QUANTILE
    )
    if pixel_range == 0:
        # Flat image: nothing to normalize (and avoid dividing by zero).
        arr_normalized = arr
    else:
        arr_normalized = arr * (1.0 - NORM_QUANTILE * 2) / pixel_range
        arr_normalized = (
            arr_normalized - np.quantile(arr_normalized, NORM_QUANTILE) + NORM_QUANTILE
        )
    arr_normalized = np.uint8(np.clip(arr_normalized, 0, 1) * 255)
    # "Sharpness" is determined by measuring the median intensity of pixels
    # near edges, after an edge detection filter has been applied to the image.
    edges_arr = np.asarray(
        Image.fromarray(arr_normalized).filter(ImageFilter.FIND_EDGES)
    )
    EDGE_THRESHOLD = 8
    edge_pixels = edges_arr[edges_arr > EDGE_THRESHOLD]
    # BUG FIX: np.median of an empty selection is NaN (with a RuntimeWarning),
    # which would poison downstream max() comparisons. An image with no
    # detectable edges simply has zero sharpness.
    if edge_pixels.size == 0:
        return 0.0
    return np.median(edge_pixels) / 255