import json import re import urllib.parse from argparse import ArgumentParser from dataclasses import dataclass from io import BytesIO from multiprocessing import Pool from multiprocessing.pool import ThreadPool from sys import stderr, stdin, stdout from zipfile import ZipFile import numpy as np import pytesseract import requests from PIL import Image, ImageFilter OCR_LANGS = "eng+fra" N_OCR_PROCESSES = 4 def main(): parser = ArgumentParser() parser.add_argument("--summarize", action="store_true") parser.add_argument("-v", "--verbose", action="store_true") parser.add_argument("-w", "--workers", type=int, default=1) args = parser.parse_args() # Process STDIN line by line, where each line contains one or more item IDs # separated by whitespace. for line in stdin: item_ids = [value for value in re.split(r",|\s", line) if value] with ThreadPool(args.workers) as pool: if args.verbose: print(f"Running with {args.workers} workers.", file=stderr) stderr.flush() if args.summarize: pool.map( _summarize_item_to_stdout, [ ItemTask(item_id=item_id, verbose=args.verbose) for item_id in item_ids ], ) else: pool.map( _analyze_item_to_stdout, [ ItemTask(item_id=item_id, verbose=args.verbose) for item_id in item_ids ], ) @dataclass class ItemTask: item_id: str verbose: bool def _summarize_item_to_stdout(task): item_id = task.item_id verbose = task.verbose if verbose: print(f"Summarizing item {item_id}...", file=stderr) stderr.flush() analysis = analyze_item(item_id, True, verbose) # 3 or more blank pages in a row is a flag. CONSECUTIVE_BLANKS_THRESHOLD = 3 if len(analysis["pages"]) >= CONSECUTIVE_BLANKS_THRESHOLD: consecutive_blanks = [page["blank"] for page in analysis["pages"]] for _ in range(1, CONSECUTIVE_BLANKS_THRESHOLD): consecutive_blanks = [ value and consecutive_blanks[i] for i, value in enumerate(consecutive_blanks[1:]) ] consecutive_blanks = [ i + 2 # +1 to account for enumeration offset, and +1 to 1-index for i, value in enumerate(consecutive_blanks[1:]) if value and not consecutive_blanks[i] ] else: consecutive_blanks = [] # 3 or more blank pages in a row is a flag. CONSECUTIVE_BLURRY_THRESHOLD = 3 SHARPNESS_THRESHOLD = 0.1 if len(analysis["pages"]) >= CONSECUTIVE_BLURRY_THRESHOLD: consecutive_blurry = [ page["sharpness"] < SHARPNESS_THRESHOLD for page in analysis["pages"] ] for _ in range(1, CONSECUTIVE_BLURRY_THRESHOLD): consecutive_blurry = [ value and consecutive_blurry[i] for i, value in enumerate(consecutive_blurry[1:]) ] consecutive_blurry = [ i + 2 # +1 to account for enumeration offset, and +1 to 1-index for i, value in enumerate(consecutive_blurry[1:]) if value and not consecutive_blurry[i] ] else: consecutive_blurry = [] check_orientation = [ i + 1 for i, page in enumerate(analysis["pages"]) if not page["ocr_orientation_match"] ] check_crop = [ i + 1 for i, page in enumerate(analysis["pages"]) if page["words_near_edge"] > 2 ] if check_orientation or check_crop or consecutive_blanks or consecutive_blurry: print( json.dumps( { "item_id": item_id, "check_orientation": check_orientation, "check_crop": check_crop, "consecutive_blanks": consecutive_blanks, "consecutive_blurry": consecutive_blurry, } ) ) stdout.flush() if verbose: print(f"Done summarizing item {item_id}.", file=stderr) stderr.flush() def _analyze_item_to_stdout(task): item_id = task.item_id verbose = task.verbose if verbose: print(f"Analyzing item {item_id}...", file=stderr) stderr.flush() print(json.dumps(analyze_item(item_id, True, verbose))) stdout.flush() if verbose: print(f"Done analyzing item {item_id}.", file=stderr) stderr.flush() @dataclass class PageAnalysisTask: im: Image.Image page_index: int file_name: str def _analyze_page(task): im_original = task.im page_index = task.page_index file_name = task.file_name im_cropped = im_original.crop( ( im_original.size[0] * 0.1, im_original.size[1] * 0.1, im_original.size[0] * 0.9, im_original.size[1] * 0.9, ) ) is_blank = im_cropped.getextrema()[0] > 255 * 0.8 if is_blank: max_sharpness = 1 ocr_orientation_match = True words_near_edge = 0 else: max_sharpness = 0.0 if im_cropped.size[0] < im_cropped.size[1]: # Page is in portrait orientation. segments_x = 2 segments_y = 3 else: # Page is in landscape orientation. segments_x = 3 segments_y = 2 for i in range(segments_x): for j in range(segments_y): max_sharpness = max( max_sharpness, analyze_sharpness( im_cropped.crop( ( im_cropped.size[0] / segments_x * i, im_cropped.size[1] / segments_y * j, im_cropped.size[0] / segments_x * (i + 1), im_cropped.size[1] / segments_y * (j + 1), ) ) ), ) best_ocr_score = -1 best_ocr_words = None best_ocr_orientation = -1 for orientation in range(4): im_rotated = im_original.rotate(90 * orientation, expand=True) ocr = pytesseract.image_to_data( im_rotated, lang=OCR_LANGS, config="--oem 1 --dpi 300 --tessdata-dir ./tessdata_fast-4.1.0", output_type=pytesseract.Output.DATAFRAME, ).fillna({"text": ""}) words = ocr[(ocr["conf"] > 90) & (ocr["width"] > ocr["height"])] words = words[ words.apply( lambda row: re.fullmatch(r"[a-zA-Z]{4,}", row["text"]) is not None, axis=1, ) ] if words.shape[0] > best_ocr_score: best_ocr_score = words.shape[0] best_ocr_orientation = orientation best_ocr_words = words if best_ocr_score > 50: # Unlikely that another orientation will have more words, so # stop eating up CPU unnecessarily. break ocr_orientation_match = best_ocr_orientation == 0 best_ocr_dims = ( im_original.size if best_ocr_orientation % 2 == 0 else (im_original.size[1], im_original.size[0]) ) EDGE_TOLERANCE = 0.03 words_near_edge = best_ocr_words[ (best_ocr_words["left"] < best_ocr_dims[0] * EDGE_TOLERANCE) | (best_ocr_words["top"] < best_ocr_dims[1] * EDGE_TOLERANCE) | ( best_ocr_words["left"] + best_ocr_words["width"] > best_ocr_dims[0] * (1 - EDGE_TOLERANCE) ) | ( best_ocr_words["top"] + best_ocr_words["height"] > best_ocr_dims[1] * (1 - EDGE_TOLERANCE) ) ] words_near_edge = words_near_edge.shape[0] return { "blank": is_blank, "file_name": file_name, "ocr_orientation_match": ocr_orientation_match, "page_index": page_index, "size": im_original.size, "sharpness": max_sharpness, "words_near_edge": words_near_edge, } def analyze_item(item_id, parallel=False, verbose=False): escaped_item_id = urllib.parse.quote(item_id, safe="") if verbose: print("Downloading...", file=stderr) stderr.flush() page_nums_resp = requests.get( f"https://archive.org/metadata/{escaped_item_id}/page_numbers/pages" ) page_nums_resp.raise_for_status() page_nums = page_nums_resp.json()["result"] zip_resp = requests.get( f"https://archive.org/download/{escaped_item_id}/{escaped_item_id}_jp2.zip" ) zip_resp.raise_for_status() if verbose: print("Decompressing...", file=stderr) stderr.flush() tasks = [] with ZipFile(BytesIO(zip_resp.content)) as jp_zip: for leaf_num, file_name in enumerate(sorted(jp_zip.namelist())): for page_index, page_num_info in enumerate(page_nums): if page_num_info["leafNum"] == leaf_num: # Stop iterating and keep page_index set to the current item. break else: # Set to -1 to indicate that leaf was not found in page_num list. page_index = -1 if page_index != -1: with jp_zip.open(file_name) as jp_file: im = Image.open(jp_file).convert("L") im.thumbnail((3200, 3200)) tasks.append( PageAnalysisTask( im=im, page_index=page_index, file_name=file_name ) ) if verbose: print(f"Processing {len(page_nums)} pages...", file=stderr) stderr.flush() if parallel: # Parallelize image processing and OCR of pages across up to n cores. with Pool(N_OCR_PROCESSES) as pool: return {"pages": pool.map(_analyze_page, tasks)} return {"pages": [_analyze_page(task) for task in tasks]} def analyze_sharpness(im): """ Crudely quantifies the "sharpness" of edges in an image, on a scale of 0 to 1. The scale is not linear with respect to scan quality: anything above 0.1 is usually fine. """ arr = np.asarray(im) # Normalize contrast based on brightest and darkest pixels. For example, # NORM_QUANTILE=0.1 will attempt to transform pixel values so that 80% fall # between 10% brightness and 90% brightness. In practice, a value around # 0.02 seems to work fairly well. NORM_QUANTILE = 0.03 pixel_range = np.quantile(arr, 1.0 - NORM_QUANTILE) - np.quantile( arr, NORM_QUANTILE ) if pixel_range == 0: arr_normalized = arr else: arr_normalized = arr * (1.0 - NORM_QUANTILE * 2) / pixel_range arr_normalized = ( arr_normalized - np.quantile(arr_normalized, NORM_QUANTILE) + NORM_QUANTILE ) arr_normalized = np.uint8(np.clip(arr_normalized, 0, 1) * 255) # "Sharpness" is determined by measuring the median intensity of pixels # near edges, after an edge detection filter has been applied to the image. edges_arr = np.asarray( Image.fromarray(arr_normalized).filter(ImageFilter.FIND_EDGES) ) EDGE_THRESHOLD = 8 return np.median(edges_arr[edges_arr > EDGE_THRESHOLD]) / 255 if __name__ == "__main__": main()