|
16 | 16 | # [START documentai_batch_process_document] |
17 | 17 | import re |
18 | 18 |
|
| 19 | +from google.api_core.client_options import ClientOptions |
19 | 20 | from google.cloud import documentai_v1 as documentai |
20 | 21 | from google.cloud import storage |
21 | 22 |
|
22 | 23 | # TODO(developer): Uncomment these variables before running the sample. |
23 | | -# project_id= 'YOUR_PROJECT_ID' |
24 | | -# location = 'YOUR_PROJECT_LOCATION' # Format is 'us' or 'eu' |
| 24 | +# project_id = 'YOUR_PROJECT_ID' |
| 25 | +# location = 'YOUR_PROCESSOR_LOCATION' # Format is 'us' or 'eu' |
25 | 26 | # processor_id = 'YOUR_PROCESSOR_ID' # Create processor in Cloud Console |
26 | | -# gcs_input_uri = "YOUR_INPUT_URI" |
27 | | -# gcs_output_uri = "YOUR_OUTPUT_BUCKET_URI" |
28 | | -# gcs_output_uri_prefix = "YOUR_OUTPUT_URI_PREFIX" |
| 27 | +# gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf |
| 28 | +# input_mime_type = "application/pdf" |
| 29 | +# gcs_output_bucket = "YOUR_OUTPUT_BUCKET_NAME" # Format: gs://bucket |
| 30 | +# gcs_output_uri_prefix = "YOUR_OUTPUT_URI_PREFIX" # Format: directory/subdirectory/ |
29 | 31 |
|
30 | 32 |
|
31 | 33 | def batch_process_documents( |
32 | | - project_id, |
33 | | - location, |
34 | | - processor_id, |
35 | | - gcs_input_uri, |
36 | | - gcs_output_uri, |
37 | | - gcs_output_uri_prefix, |
| 34 | + project_id: str, |
| 35 | + location: str, |
| 36 | + processor_id: str, |
| 37 | + gcs_input_uri: str, |
| 38 | + input_mime_type: str, |
| 39 | + gcs_output_bucket: str, |
| 40 | + gcs_output_uri_prefix: str, |
38 | 41 | timeout: int = 300, |
39 | 42 | ): |
40 | 43 |
|
41 | 44 | # You must set the api_endpoint if you use a location other than 'us', e.g.: |
42 | | - opts = {} |
43 | | - if location == "eu": |
44 | | - opts = {"api_endpoint": "eu-documentai.googleapis.com"} |
| 45 | + opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com") |
45 | 46 |
|
46 | 47 | client = documentai.DocumentProcessorServiceClient(client_options=opts) |
47 | 48 |
|
48 | | - destination_uri = f"{gcs_output_uri}/{gcs_output_uri_prefix}/" |
49 | | - |
50 | | - gcs_documents = documentai.GcsDocuments( |
51 | | - documents=[{"gcs_uri": gcs_input_uri, "mime_type": "application/pdf"}] |
| 49 | + gcs_document = documentai.GcsDocument( |
| 50 | + gcs_uri=gcs_input_uri, mime_type=input_mime_type |
52 | 51 | ) |
53 | 52 |
|
54 | | - # 'mime_type' can be 'application/pdf', 'image/tiff', |
55 | | - # and 'image/gif', or 'application/json' |
| 53 | + # Load GCS Input URI into a List of document files |
| 54 | + gcs_documents = documentai.GcsDocuments(documents=[gcs_document]) |
56 | 55 | input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents) |
57 | 56 |
|
58 | | - # Where to write results |
59 | | - output_config = documentai.DocumentOutputConfig( |
60 | | - gcs_output_config={"gcs_uri": destination_uri} |
| 57 | + # NOTE: Alternatively, specify a GCS URI Prefix to process an entire directory |
| 58 | + # |
| 59 | + # gcs_input_uri = "gs://bucket/directory/" |
| 60 | + # gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri) |
| 61 | + # input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix) |
| 62 | + # |
| 63 | + |
| 64 | + # Cloud Storage URI for the Output Directory |
| 65 | + destination_uri = f"{gcs_output_bucket}/{gcs_output_uri_prefix}/" |
| 66 | + |
| 67 | + gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig( |
| 68 | + gcs_uri=destination_uri |
61 | 69 | ) |
62 | 70 |
|
63 | | - # Location can be 'us' or 'eu' |
64 | | - name = f"projects/{project_id}/locations/{location}/processors/{processor_id}" |
65 | | - request = documentai.types.document_processor_service.BatchProcessRequest( |
| 71 | + # Where to write results |
| 72 | + output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config) |
| 73 | + |
| 74 | + # The full resource name of the processor, e.g.: |
| 75 | + # projects/project_id/locations/location/processor/processor_id |
| 76 | + # You must create new processors in the Cloud Console first |
| 77 | + name = client.processor_path(project_id, location, processor_id) |
| 78 | + |
| 79 | + request = documentai.BatchProcessRequest( |
66 | 80 | name=name, |
67 | 81 | input_documents=input_config, |
68 | 82 | document_output_config=output_config, |
69 | 83 | ) |
70 | 84 |
|
| 85 | + # BatchProcess returns a Long Running Operation (LRO) |
71 | 86 | operation = client.batch_process_documents(request) |
72 | 87 |
|
73 | | - # Wait for the operation to finish |
| 88 | + # Continually polls the operation until it is complete. |
| 89 | + # This could take some time for larger files |
| 90 | + # Format: projects/PROJECT_NUMBER/locations/LOCATION/operations/OPERATION_ID |
| 91 | + print(f"Waiting for operation {operation.operation.name} to complete...") |
74 | 92 | operation.result(timeout=timeout) |
75 | 93 |
|
76 | | - # Results are written to GCS. Use a regex to find |
77 | | - # output files |
78 | | - match = re.match(r"gs://([^/]+)/(.+)", destination_uri) |
79 | | - output_bucket = match.group(1) |
80 | | - prefix = match.group(2) |
| 94 | + # NOTE: Can also use callbacks for asynchronous processing |
| 95 | + # |
| 96 | + # def my_callback(future): |
| 97 | + # result = future.result() |
| 98 | + # |
| 99 | + # operation.add_done_callback(my_callback) |
81 | 100 |
|
82 | | - storage_client = storage.Client() |
83 | | - bucket = storage_client.get_bucket(output_bucket) |
84 | | - blob_list = list(bucket.list_blobs(prefix=prefix)) |
85 | | - print("Output files:") |
| 101 | + # Once the operation is complete, |
| 102 | + # get output document information from operation metadata |
| 103 | + metadata = documentai.BatchProcessMetadata(operation.metadata) |
86 | 104 |
|
87 | | - for i, blob in enumerate(blob_list): |
88 | | - # If JSON file, download the contents of this blob as a bytes object. |
89 | | - if ".json" in blob.name: |
90 | | - blob_as_bytes = blob.download_as_bytes() |
| 105 | + if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED: |
| 106 | + raise ValueError(f"Batch Process Failed: {metadata.state_message}") |
91 | 107 |
|
92 | | - document = documentai.types.Document.from_json(blob_as_bytes) |
93 | | - print(f"Fetched file {i + 1}") |
| 108 | + storage_client = storage.Client() |
| 109 | + |
| 110 | + print("Output files:") |
| 111 | + # One process per Input Document |
| 112 | + for process in metadata.individual_process_statuses: |
| 113 | + # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/ |
| 114 | + # The Cloud Storage API requires the bucket name and URI prefix separately |
| 115 | + matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination) |
| 116 | + if not matches: |
| 117 | + print( |
| 118 | + "Could not parse output GCS destination:", |
| 119 | + process.output_gcs_destination, |
| 120 | + ) |
| 121 | + continue |
| 122 | + |
| 123 | + output_bucket, output_prefix = matches.groups() |
| 124 | + |
| 125 | + # Get List of Document Objects from the Output Bucket |
| 126 | + output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix) |
| 127 | + |
| 128 | + # Document AI may output multiple JSON files per source file |
| 129 | + for blob in output_blobs: |
| 130 | + # Document AI should only output JSON files to GCS |
| 131 | + if ".json" not in blob.name: |
| 132 | + print( |
| 133 | + f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}" |
| 134 | + ) |
| 135 | + continue |
| 136 | + |
| 137 | + # Download JSON File as bytes object and convert to Document Object |
| 138 | + print(f"Fetching {blob.name}") |
| 139 | + document = documentai.Document.from_json( |
| 140 | + blob.download_as_bytes(), ignore_unknown_fields=True |
| 141 | + ) |
94 | 142 |
|
95 | 143 | # For a full list of Document object attributes, please reference this page: |
96 | | - # https://cloud.google.com/document-ai/docs/reference/rpc/google.cloud.documentai.v1beta3#document |
| 144 | + # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document |
97 | 145 |
|
98 | 146 | # Read the text recognition output from the processor |
99 | | - for page in document.pages: |
100 | | - for form_field in page.form_fields: |
101 | | - field_name = get_text(form_field.field_name, document) |
102 | | - field_value = get_text(form_field.field_value, document) |
103 | | - print("Extracted key value pair:") |
104 | | - print(f"\t{field_name}, {field_value}") |
105 | | - for paragraph in page.paragraphs: |
106 | | - paragraph_text = get_text(paragraph.layout, document) |
107 | | - print(f"Paragraph text:\n{paragraph_text}") |
108 | | - else: |
109 | | - print(f"Skipping non-supported file type {blob.name}") |
110 | | - |
111 | | - |
112 | | -# Extract shards from the text field |
113 | | -def get_text(doc_element: dict, document: dict): |
114 | | - """ |
115 | | - Document AI identifies form fields by their offsets |
116 | | - in document text. This function converts offsets |
117 | | - to text snippets. |
118 | | - """ |
119 | | - response = "" |
120 | | - # If a text segment spans several lines, it will |
121 | | - # be stored in different text segments. |
122 | | - for segment in doc_element.text_anchor.text_segments: |
123 | | - start_index = ( |
124 | | - int(segment.start_index) |
125 | | - if segment in doc_element.text_anchor.text_segments |
126 | | - else 0 |
127 | | - ) |
128 | | - end_index = int(segment.end_index) |
129 | | - response += document.text[start_index:end_index] |
130 | | - return response |
| 147 | + print("The document contains the following text:") |
| 148 | + print(document.text) |
131 | 149 |
|
132 | 150 |
|
133 | 151 | # [END documentai_batch_process_document] |
0 commit comments