def test_flatten_selected_pages_invalid_type():
    """A string entry is neither a page number nor a page range, so it must be rejected."""
    expected_message = "Page selection must either be an integer or a 2-element list"
    bad_selection = ["1"]
    with pytest.raises(ValueError, match=expected_message):
        flatten_selected_pages(bad_selection, 10)
def flatten_selected_pages(
    selected_pages: list[Union[int, list[int]]], page_count: int
) -> tuple[list[int], dict[int, int]]:
    """
    Expand a page selection into an explicit, ordered list of page numbers.

    A selection entry is either a single page (like ``11``) or an inclusive page range
    (like ``[25, 30]``); entries may be mixed (like ``[11, [25, 30]]``). Pages are 1-indexed and
    are emitted in the order given, which need not be ascending.

    Args:
        selected_pages: The pages and/or inclusive ``[start, end]`` ranges to select.
        page_count: The number of pages in the source document, i.e. the largest valid page number.

    Returns:
        A tuple ``(page_list, remapped_pages)``. ``page_list`` holds the selected original page
        numbers in output order. ``remapped_pages`` maps each new (1-indexed) page number to the
        original page number, only for pages whose number changed.

    Raises:
        ValueError: If an entry is neither an integer nor a 2-element list of integers, if a
            range has start > end, if a page is selected more than once, or if a page number
            falls outside ``1..page_count``.
    """
    malformed_msg = "Page selection must either be an integer or a 2-element list [integer, integer]"

    page_list: list[int] = []
    present_pages: set[int] = set()
    remapped_pages: dict[int, int] = {}
    for selection in selected_pages:
        if isinstance(selection, int):
            # Treat a single page as the degenerate range [page, page].
            selection = [selection, selection]
        # Validate the shape up front so malformed ranges such as [1, 2, 3] or ["a", "b"] get the
        # documented error instead of an opaque unpacking or range() failure.
        if (
            not isinstance(selection, list)
            or len(selection) != 2
            or not all(isinstance(bound, int) for bound in selection)
        ):
            raise ValueError(malformed_msg)

        subset_start, subset_end = selection
        if subset_end < subset_start:
            raise ValueError("For selected_pages like [a, b] it must be that a <= b.")
        for page_num in range(subset_start, subset_end + 1):
            if page_num in present_pages:
                raise ValueError("selected_pages may not include overlapping pages.")
            if page_num <= 0 or page_num > page_count:
                raise ValueError(
                    f"Invalid page number ({page_num}): for this document, "
                    f"page numbers must be at least 1 and at most {page_count}"
                )
            present_pages.add(page_num)
            page_list.append(page_num)

            # After the append, len(page_list) is this page's 1-indexed position in the output.
            new_page_num = len(page_list)
            if page_num != new_page_num:
                remapped_pages[new_page_num] = page_num
    return (page_list, remapped_pages)
def select_pages(page_selection: list[Union[int, list[int]]]) -> Callable[[Document], Document]:
    """
    Returns a function that selects pages from a PDF document based on a list of page selections.
    Each selection can be a single page number or a range of page numbers. Page numbers are 1-indexed.

    Examples:
       [1,2,3]          pages 1, 2, and 3
       [[1,3], 5]       pages 1, 2, 3, and 5
       [[1,3], [5,7]]   pages 1, 2, 3, and 5, 6, 7
       [2, 1, [4, 6]]   pages 2, 1, 4, 5, 6, in that order

    Args:
        page_selection: A list of page numbers or page ranges to select. Page numbers are 1-indexed.

    Returns:
        A function that takes a Document and returns that same Document, modified in place: its
        binary_representation is replaced by a PDF holding only the selected pages, its elements
        are filtered through filter_elements_by_page, and properties["remapped_pages"] records
        the new-to-original page number mapping. Documents without a binary_representation are
        returned unchanged.
    """

    def select_pages_fn(doc: Document) -> Document:
        if doc.binary_representation is None:
            # Use the module logger with lazy %-style arguments so the doc id is actually
            # interpolated into the message.
            logger.warning("No binary_representation found in doc %s. Skipping page selection.", doc.doc_id)
            return doc

        outstream = BytesIO()

        with PdfReader(BytesIO(doc.binary_representation)) as reader:
            # The page count is needed to validate the selection against this document.
            page_count = len(reader.pages)
            page_list, remapped_pages = flatten_selected_pages(page_selection, page_count)
            select_pdf_pages(reader, outstream, page_list=page_list)

        doc.binary_representation = outstream.getvalue()
        doc.properties["remapped_pages"] = remapped_pages
        # Keep only elements on the selected pages, passing the same ordered page list that
        # was used to build the new PDF.
        doc.elements = filter_elements_by_page(doc.elements, page_list)
        return doc

    return select_pages_fn