2025-11-07 05:41:18 +00:00
|
|
|
from dataclasses import dataclass, field
|
2025-10-04 15:09:16 -07:00
|
|
|
from multiprocessing import Pool
|
2025-10-04 18:03:03 -07:00
|
|
|
from sys import stdout
|
2025-10-04 15:09:16 -07:00
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
from PIL import Image, ImageFilter
|
|
|
|
|
|
2025-11-07 05:41:18 +00:00
|
|
|
from .items import ArchiveDoc
|
|
|
|
|
from .ocr.tesseract import OcrEngine
|
2025-10-04 15:09:16 -07:00
|
|
|
|
|
|
|
|
|
2025-11-07 05:41:18 +00:00
|
|
|
def analyze_doc(doc: ArchiveDoc, parallel=1, use_cache=False, verbose=False):
    """
    Run page-level analysis (blankness, sharpness, OCR orientation, margins)
    over every leaf of an archive document.

    Args:
        doc: The document whose leaves (page images) will be analyzed.
        parallel: Number of worker processes; values > 1 fan pages out
            across a multiprocessing Pool.
        use_cache: Passed through to ``doc.fetch_leaves`` to reuse
            previously fetched page images.
        verbose: When True, print progress to stdout.

    Returns:
        dict with a single key ``"pages"``: a list of per-page result dicts
        as produced by ``analyze_page``, in leaf order.
    """
    if verbose:
        print(f"Loading {doc.name}...")
        # Flush so progress is visible immediately even when stdout is piped.
        stdout.flush()

    # One task per page image. (Was mis-annotated as a bare PageAnalysisTask;
    # it is a list of them.)
    tasks: list[PageAnalysisTask] = [
        PageAnalysisTask(im=leaf.image)
        for leaf in doc.fetch_leaves(use_cache=use_cache)
    ]

    if verbose:
        print(f"Processing {len(tasks)} pages...")
        stdout.flush()

    if parallel > 1:
        # Parallelize image processing and OCR of pages across up to n cores.
        with Pool(parallel) as pool:
            return {"pages": pool.map(analyze_page, tasks)}

    return {"pages": [analyze_page(task) for task in tasks]}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class PageAnalysisTask:
    """A single page-analysis work unit consumed by ``analyze_page``.

    Attributes:
        im: PIL Image, pre-scaled using ``.thumbnail()`` to fit the long
            edge to 3200 px.
        ocr_langs: Tesseract language codes (3 letters each, in a
            "+"-separated list).
    """

    # The page image to analyze.
    im: Image.Image
    # Languages to attempt OCR in; defaults to English only.
    ocr_langs: list[str] = field(default_factory=lambda: ["eng"])
|
2025-10-04 15:09:16 -07:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_page(task):
    """
    Analyze one page image for blankness, sharpness, OCR orientation, and
    text margins.

    Args:
        task: PageAnalysisTask holding the page image and OCR languages.

    Returns:
        dict with keys:
            "blank": True when the page appears blank.
            "ocr_orientation_match": True when OCR found the page upright
                (trivially True for blank or word-free pages).
            "size_analyzed": (width, height) of the analyzed image.
            "sharpness": 0-1 edge-sharpness estimate (1 for blank pages).
            "text_margin_px": pixel distance from the edge to the
                SKIP_WORDS-th closest word, or -1 when unknown.
    """
    # Ignore a 10% border on all sides: scanner shadows and punch holes
    # would otherwise dominate blankness and sharpness measurements.
    im_cropped = task.im.crop(
        (
            task.im.size[0] * 0.1,
            task.im.size[1] * 0.1,
            task.im.size[0] * 0.9,
            task.im.size[1] * 0.9,
        )
    )

    # A page is "blank" if even its darkest pixel is above 80% brightness.
    # NOTE(review): getextrema()[0] is the global minimum only for
    # single-band images — this assumes grayscale input; confirm upstream.
    is_blank = im_cropped.getextrema()[0] > 255 * 0.8

    if is_blank:
        # Nothing to measure: report ideal sharpness and unknown margins.
        max_sharpness = 1
        ocr_orientation_match = True
        text_margin_px = -1
    else:
        max_sharpness = 0.0
        # Tile the crop into a 2x3 grid (long side gets 3 segments) and take
        # the sharpest tile, so one in-focus region is enough to pass.
        if im_cropped.size[0] < im_cropped.size[1]:
            # Page is in portrait orientation.
            segments_x = 2
            segments_y = 3
        else:
            # Page is in landscape orientation.
            segments_x = 3
            segments_y = 2
        for i in range(segments_x):
            for j in range(segments_y):
                max_sharpness = max(
                    max_sharpness,
                    analyze_sharpness(
                        im_cropped.crop(
                            (
                                im_cropped.size[0] / segments_x * i,
                                im_cropped.size[1] / segments_y * j,
                                im_cropped.size[0] / segments_x * (i + 1),
                                im_cropped.size[1] / segments_y * (j + 1),
                            )
                        )
                    ),
                )

        # Resize factor applied to the image before OCR (1 = no scaling).
        OCR_SCALE = 1
        # TODO: Refactor orientation detection logic into the OCR engine
        # modules.
        best_ocr_score = -1
        best_ocr_words = None
        best_ocr_orientation = -1
        # Try all four 90-degree rotations and keep the one that yields the
        # most recognized words.
        for orientation in range(4):
            im_rotated = task.im.resize(
                np.int_(np.array(task.im.size) * OCR_SCALE)
            ).rotate(90 * orientation, expand=True)
            ocr, ocr_meta = OcrEngine.process(im_rotated, languages=task.ocr_langs)

            if "page_angle" in ocr_meta:
                # OCR engine automatically accounts for page rotation.
                best_ocr_score = ocr.shape[0]
                # PaddleOCR counts rotation as degrees, in the opposite
                # direction as PIL's `Image.rotate()`
                best_ocr_orientation = (
                    4 - round(((ocr_meta["page_angle"] + 360) % 360) / 90)
                ) % 4
                best_ocr_words = ocr
                break

            if ocr.shape[0] > best_ocr_score:
                best_ocr_score = ocr.shape[0]
                best_ocr_orientation = orientation
                best_ocr_words = ocr
            if best_ocr_score > 50:
                # Unlikely that another orientation will have more words, so
                # stop eating up CPU.
                break

        if best_ocr_words.empty:
            # No words anywhere: orientation is vacuously fine, margin unknown.
            ocr_orientation_match = True
            text_margin_px = -1
        else:
            ocr_orientation_match = best_ocr_orientation == 0

            # Dimensions of the image OCR actually saw: swapped for the two
            # sideways orientations.
            best_ocr_dims = OCR_SCALE * np.array(
                task.im.size
                if best_ocr_orientation % 2 == 0
                else (task.im.size[1], task.im.size[0])
            )

            # Distance of every word box to each of the four page edges,
            # sorted ascending. (np.concatenate, not np.concat: the latter is
            # only an alias added in NumPy 2.0.)
            word_margins_all_directions = np.sort(
                np.int_(
                    np.concatenate(
                        (
                            best_ocr_words["x0"].to_numpy(),
                            best_ocr_words["y0"].to_numpy(),
                            best_ocr_dims[0] - best_ocr_words["x1"].to_numpy(),
                            best_ocr_dims[1] - best_ocr_words["y1"].to_numpy(),
                        )
                    )
                    # Transform back into original image pixel density
                    / OCR_SCALE
                )
            )
            # Skip the n closest words to the edge, to help ignore stray OCR artifacts.
            SKIP_WORDS = 2
            text_margin_px = int(
                word_margins_all_directions[SKIP_WORDS]
                if word_margins_all_directions.shape[0] > SKIP_WORDS
                else -1
            )

    return {
        "blank": is_blank,
        "ocr_orientation_match": ocr_orientation_match,
        "size_analyzed": task.im.size,
        "sharpness": max_sharpness,
        "text_margin_px": text_margin_px,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_sharpness(im):
    """
    Crudely quantifies the "sharpness" of edges in an image, on a scale of 0 to
    1. The scale is not linear with respect to scan quality: anything above 0.1
    is usually fine.

    Args:
        im: PIL image (or array-convertible) of the region to score.

    Returns:
        float in [0, 1]; 0.0 when the region has no detectable edges at all.
    """
    arr = np.asarray(im)

    # Normalize contrast based on brightest and darkest pixels. For example,
    # NORM_QUANTILE=0.1 will attempt to transform pixel values so that 80% fall
    # between 10% brightness and 90% brightness. In practice, a value around
    # 0.02 seems to work fairly well.
    NORM_QUANTILE = 0.03
    pixel_range = np.quantile(arr, 1.0 - NORM_QUANTILE) - np.quantile(
        arr, NORM_QUANTILE
    )
    if pixel_range == 0:
        # Degenerate (near-constant) region: skip the scale step.
        arr_normalized = arr
    else:
        arr_normalized = arr * (1.0 - NORM_QUANTILE * 2) / pixel_range
    # Shift so the NORM_QUANTILE-th quantile lands at NORM_QUANTILE brightness.
    arr_normalized = (
        arr_normalized - np.quantile(arr_normalized, NORM_QUANTILE) + NORM_QUANTILE
    )
    arr_normalized = np.uint8(np.clip(arr_normalized, 0, 1) * 255)

    # "Sharpness" is determined by measuring the median intensity of pixels
    # near edges, after an edge detection filter has been applied to the image.
    edges_arr = np.asarray(
        Image.fromarray(arr_normalized).filter(ImageFilter.FIND_EDGES)
    )
    EDGE_THRESHOLD = 8
    edge_pixels = edges_arr[edges_arr > EDGE_THRESHOLD]
    if edge_pixels.size == 0:
        # No edge exceeded the threshold (featureless region). np.median on
        # an empty selection would return NaN (with a RuntimeWarning), and
        # NaN propagates unpredictably through the caller's max(); report
        # zero sharpness instead.
        return 0.0
    return np.median(edge_pixels) / 255
|