TheUnstructured Python SDK client allows you to send one file at a time for processing bytheUnstructured Partition Endpoint.
To use the Python SDK, you’ll first need to set an environment variable namedUNSTRUCTURED_API_KEY
,representing your Unstructured API key. To get your API key, do the following:
Sign in to your Unstructured account:
Get your Unstructured API key:
a. In the Unstructured UI, clickAPI Keys on the sidebar.
b. ClickGenerate API Key.
c. Follow the on-screen instructions to finish generating the key.
d. Click theCopy icon next to your new key to add the key to your system’s clipboard. If you lose this key, simply return and click theCopy icon again.
Before using the SDK to interact with Unstructured, install the library:
pip install unstructured-client
The SDK uses semantic versioning and major bumps could bring breaking changes. It is advised topin your installed version. See themigration guide, later on this page, for breaking change announcements.
Let’s start with a simple example in which you send a PDF document to the Unstructured Partition Endpoint to be partitioned by Unstructured.
import os, jsonimport unstructured_clientfrom unstructured_client.modelsimport operations, sharedclient= unstructured_client.UnstructuredClient( api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"))filename= "PATH_TO_INPUT_FILE"req= operations.PartitionRequest( partition_parameters=shared.PartitionParameters( files=shared.Files( content=open(filename,"rb"), file_name=filename, ), strategy=shared.Strategy.VLM, vlm_model="gpt-4o", vlm_model_provider="openai", languages=['eng'], split_pdf_page=True,# If True, splits the PDF file into smaller chunks of pages. split_pdf_allow_failed=True,# If True, the partitioning continues even if some pages fail. split_pdf_concurrency_level=15 # Set the number of concurrent request to the maximum value: 15. ),)try: res= client.general.partition( request=req ) element_dicts= [elementfor elementin res.elements] # Print the processed data's first element only. print(element_dicts[0]) # Write the processed data to a local file. json_elements= json.dumps(element_dicts,indent=2) with open("PATH_TO_OUTPUT_FILE","w")as file: file.write(json_elements)except Exception as e: print(e)
The Python SDK also has apartition_async
. This call is equivalent topartition
except that it can be used in a non blocking context. For instance,asyncio.gather
can be used to concurrently process multiple files inside of a directory hierarchy, as demonstrated here:
import asyncioimport osimport jsonimport unstructured_clientfrom unstructured_client.modelsimport sharedclient= unstructured_client.UnstructuredClient( api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"))async def call_api(filename,input_dir,output_dir): req= { "partition_parameters": { "files": { "content":open(filename,"rb"), "file_name": os.path.basename(filename), }, "strategy": shared.Strategy.VLM, "vlm_model":"gpt-4o", "vlm_model_provider":"openai", "languages": ['eng'], "split_pdf_page",True,# If True, splits the PDF file into smaller chunks of pages. "split_pdf_allow_failed":True,# If True, the partitioning continues even if some pages fail. "split_pdf_concurrency_level":15 # Set the number of concurrent request to the maximum value: 15. } } try: res= await client.general.partition_async( request=req ) element_dicts= [elementfor elementin res.elements] json_elements= json.dumps(element_dicts,indent=2) # Create the output directory structure. relative_path= os.path.relpath(os.path.dirname(filename), input_dir) output_subdir= os.path.join(output_dir, relative_path) os.makedirs(output_subdir,exist_ok=True) # Write the output file. output_filename= os.path.join(output_subdir, os.path.basename(filename)+ ".json") with open(output_filename,"w")as file: file.write(json_elements) except Exception as e: print(f"Error processing{filename}:{e}")async def process_files(input_directory,output_directory): tasks= [] for root, _, filesin os.walk(input_directory): for file in files: if not file.endswith('.json'): full_path= os.path.join(root,file) tasks.append(call_api(full_path, input_directory, output_directory)) await asyncio.gather(*tasks)if __name__ == "__main__": asyncio.run(process_files( input_directory=os.getenv("LOCAL_FILE_INPUT_DIR"), output_directory=os.getenv("LOCAL_FILE_OUTPUT_DIR") ))
In order to speed up processing of large PDF files, thesplit_pdf_page
* parameter isTrue
by default. Thiscauses the PDF to be split into small batches of pages before sending requests to the API. The clientawaits all parallel requests and combines the responses into a single response object. This is specific to PDF files and otherfiletypes are ignored.
The number of parallel requests is controlled bysplit_pdf_concurrency_level
*.The default is 8 and the max is set to 15 to avoid high resource usage and costs.
If at least one request is successful, the responses are combined into a single response object. Anerror is returned only if all requests failed or there was an error during splitting.
This feature may lead to unexpected results when chunking because the server does not see the entiredocument context at once. If you’d like to chunk across the whole document and still get the speedup fromparallel processing, you can:
split_pdf_page
set toTrue
, without any chunking parameters.results.json
.req= operations.PartitionRequest( partition_parameters=shared.PartitionParameters( files=shared.Files( content=file.read(), file_name=filename, ), strategy=shared.Strategy.VLM, vlm_model="gpt-4o", vlm_model_provider="openai", split_pdf_page=True,# If True, splits the PDF file into smaller chunks of pages. split_pdf_allow_failed=True,# If True, the partitioning continues even if some pages fail. split_pdf_concurrency_level=15 # Set the number of concurrent request to the maximum value: 15. ))res= client.general.partition( request=req)
You can also change the defaults for retries through theretry_config
*when initializing the client. If a request to the API fails, the client will retry therequest with an exponential backoff strategy up to a maximum interval of one minute. Thefunction keeps retrying until the total elapsed time exceedsmax_elapsed_time
*,which defaults to one hour:
import osclient= UnstructuredClient( api_key_auth=os.getenv("UNSTRUCTURED_API_KEY") retry_config=RetryConfig( strategy="backoff", retry_connection_errors=True, backoff=BackoffStrategy( # time intervals are defined in milliseconds initial_interval=500, max_interval=60000, exponent=1.5, max_elapsed_time=900000,# 15min*60sec*1000ms = 15 minutes ), ))
If you disable SSL validation, requests will accept any TLS certificatepresented by the server and ignore hostname mismatches and/or expired certificates,which will make your application vulnerable to man-in-the-middle (MitM) attacks.Only set this toFalse
for testing.
http_client= requests.Session()http_client.verify= Falseclient= UnstructuredClient( client=http_client, ...)
The partition response defaults to a dict format that can be converted to Unstructured elements withtheelements_from_dicts
utility function as seen below. Otherwise, the API response can be sent directlyto your vector store or another destination.
from unstructured.staging.baseimport elements_from_dicts# ...if res.elementsis not None: elements= elements_from_dicts(response.elements)
The parameter names used in this document are for the Python SDK, which follow snake_case convention. The JavaScript/TypeScript SDK follows camelCaseconvention. Other than this difference in naming convention,the names used in the SDKs are the same across all methods.
There are breaking changes beginning with Python SDK version 0.26.0. If you encounter any errors when upgrading, please find the solution below.
If you see the error:AttributeError: 'PartitionParameters' object has no attribute 'partition_parameters'
Before 0.26.0, the SDK accepted aPartitionParameters
object as input to thesdk.general.partition
function. Beginning with 0.26.0, this object must be wrapped in aPartitionRequest
object. The old behavior was deprecated in 0.23.0 and removed in 0.26.0.
# Instead of:from unstructured_client.modelsimport sharedreq= shared.PartitionParameters( files=files,)resp= s.general.partition( request=req)# Switch to:from unstructured_client.modelsimport shared, operationsreq= operations.PartitionRequest( partition_parameters=shared.PartitionParameters( files=files, ))resp= s.general.partition( request=req)
If you see the error:TypeError: BaseModel.__init__() takes 1 positional argument but 2 were given
Beginning with 0.26.0, thePartitionRequest
constructor no longer allows for positional arguments. You must specifypartition_parameters
by name.
# Instead of:req= operations.PartitionRequest( shared.PartitionParameters( files=files, ))# Switch to:req= operations.PartitionRequest( partition_parameters=shared.PartitionParameters( files=files, ))
If you see the error:TypeError: General.partition() takes 1 positional argument but 2 were given
Beginning with 0.26.0, thepartition
function no longer allows for positional arguments. You must specifyrequest
by name.
# Instead of:resp= s.general.partition(req)# Switch to:resp= s.general.partition( request=req)
Was this page helpful?