Bring your own ML model to Beam RunInference
Run in Google Colab | View source on GitHub |
This notebook demonstrates how to run inference on your custom framework using the ModelHandler class.
Named-entity recognition (NER) is one of the most common tasks in natural language processing (NLP). NER locates named entities in unstructured text and classifies them using predefined labels, such as person name, organization, and date.
This example illustrates how to use the popular spaCy package to load a machine learning (ML) model and perform inference in an Apache Beam pipeline using the RunInference PTransform. For more information about the RunInference API, see About Beam ML in the Apache Beam documentation.
Install package dependencies
The RunInference library is available in Apache Beam versions 2.40 and later.
For this example, you need to install spaCy and pandas. A small NER model, en_core_web_sm, is also installed, but you can use any valid spaCy model.
    # Uncomment the following lines to install the required packages.
    # %pip install spacy pandas
    # %pip install "apache-beam[gcp, dataframe, interactive]"
    # !python -m spacy download en_core_web_sm

Learn about spaCy
To learn more about spaCy, create a spaCy language object in memory using spaCy's trained models. You can install these models as Python packages. For more information, see spaCy's Models and Languages documentation.
    import spacy

    nlp = spacy.load("en_core_web_sm")

    # Add text strings.
    text_strings = [
        "The New York Times is an American daily newspaper based in New York City with a worldwide readership.",
        "It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company."
    ]

    # Check which entities spaCy can recognize.
    doc = nlp(text_strings[0])

    for ent in doc.ents:
        print(ent.text, ent.start_char, ent.end_char, ent.label_)

    The New York Times 0 18 ORG
    American 25 33 NORP
    daily 34 39 DATE
    New York City 59 72 GPE
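The four values printed for each entity are its text, its start character offset, its end character offset (exclusive), and its label. As a small sketch that does not need spaCy, the snippet below hard-codes the entities from the output above and checks that slicing the sentence with those offsets recovers each entity's text. The names `Entity`, `entities`, and `slice_entities` are illustrative and are not part of spaCy.

```python
from typing import List, Tuple

# Entity tuple layout: (text, start_char, end_char, label).
Entity = Tuple[str, int, int, str]

sentence = (
    "The New York Times is an American daily newspaper based in "
    "New York City with a worldwide readership."
)

# Values copied from the printed output; hard-coded here so the
# sketch runs without spaCy installed.
entities: List[Entity] = [
    ("The New York Times", 0, 18, "ORG"),
    ("American", 25, 33, "NORP"),
    ("daily", 34, 39, "DATE"),
    ("New York City", 59, 72, "GPE"),
]


def slice_entities(text: str, ents: List[Entity]) -> List[str]:
    """Recover each entity's surface text from its character offsets."""
    return [text[start:end] for _, start, end, _ in ents]


recovered = slice_entities(sentence, entities)
# True for every entity whose offsets slice back to its recorded text.
matches = [got == ent[0] for got, ent in zip(recovered, entities)]
print(matches)
```

Because the offsets index into the original string, downstream steps can highlight or redact entities without re-running the model.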
    # Visualize the results.
    from spacy import displacy

    displacy.render(doc, style="ent")

    # Visualize another example.
    displacy.render(nlp(text_strings[1]), style="ent")

Create a model handler
This section demonstrates how to create your own ModelHandler so that you can use spaCy for inference.
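Conceptually, a model handler splits the work in two: `load_model` creates the model, and `run_inference` receives a batch of examples together with the loaded model. The sketch below mimics that contract in plain Python, without Beam or spaCy, so the control flow is easy to follow. `UppercaseHandler`, `run_with_handler`, and the `batch_size` parameter are hypothetical stand-ins; Beam's RunInference transform handles batching and model sharing internally and differs in detail.

```python
from typing import Iterable, Iterator, List, Sequence, Tuple


class UppercaseHandler:
    """Hypothetical stand-in for a ModelHandler; the "model" is str.upper."""

    def __init__(self) -> None:
        self.load_count = 0  # Tracks how often the model is loaded.

    def load_model(self):
        self.load_count += 1
        return str.upper

    def run_inference(
        self, batch: Sequence[str], model
    ) -> Iterable[Tuple[str, str]]:
        # One (example, inference) pair per input, in input order.
        return [(text, model(text)) for text in batch]


def run_with_handler(
    handler, examples: List[str], batch_size: int = 2
) -> Iterator[Tuple[str, str]]:
    """Load the model once, then feed fixed-size batches to run_inference."""
    model = handler.load_model()
    for i in range(0, len(examples), batch_size):
        yield from handler.run_inference(examples[i:i + batch_size], model)


handler = UppercaseHandler()
results = list(run_with_handler(handler, ["a", "b", "c"]))
print(results)
print(handler.load_count)
```

Keeping model loading separate from inference means an expensive model is created once and reused across many batches.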
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    import warnings

    warnings.filterwarnings("ignore")

    pipeline = beam.Pipeline()

    # Print the results for verification.
    with pipeline as p:
        (p
         | "CreateSentences" >> beam.Create(text_strings)
         | beam.Map(print)
        )

    The New York Times is an American daily newspaper based in New York City with a worldwide readership.
    It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.
    # Define `SpacyModelHandler` to load the model and perform the inference.
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.base import ModelHandler
    from apache_beam.ml.inference.base import PredictionResult
    from spacy import Language
    from typing import Any
    from typing import Dict
    from typing import Iterable
    from typing import Optional
    from typing import Sequence

    class SpacyModelHandler(ModelHandler[str,
                                         PredictionResult,
                                         Language]):
        def __init__(
            self,
            model_name: str = "en_core_web_sm",
        ):
            """ Implementation of the ModelHandler interface for spaCy using text as input.

            Example Usage::

              pcoll | RunInference(SpacyModelHandler())

            Args:
              model_name: The spaCy model name. Default is en_core_web_sm.
            """
            self._model_name = model_name
            self._env_vars = {}

        def load_model(self) -> Language:
            """Loads and initializes a model for processing."""
            return spacy.load(self._model_name)

        def run_inference(
            self,
            batch: Sequence[str],
            model: Language,
            inference_args: Optional[Dict[str, Any]] = None
        ) -> Iterable[PredictionResult]:
            """Runs inferences on a batch of text strings.

            Args:
              batch: A sequence of examples as text strings.
              model: A spaCy language model.
              inference_args: Any additional arguments for an inference.

            Returns:
              An Iterable of type PredictionResult.
            """
            # Loop over each text string, and use a tuple to store the inference results.
            predictions = []
            for one_text in batch:
                doc = model(one_text)
                predictions.append(
                    [(ent.text, ent.start_char, ent.end_char, ent.label_) for ent in doc.ents])
            return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

    # Verify that the inference results are correct.
    with pipeline as p:
        (p
         | "CreateSentences" >> beam.Create(text_strings)
         | "RunInferenceSpacy" >> RunInference(SpacyModelHandler("en_core_web_sm"))
         | beam.Map(print)
        )

    The New York Times is an American daily newspaper based in New York City with a worldwide readership.
    It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.
    PredictionResult(example='The New York Times is an American daily newspaper based in New York City with a worldwide readership.', inference=[('The New York Times', 0, 18, 'ORG'), ('American', 25, 33, 'NORP'), ('daily', 34, 39, 'DATE'), ('New York City', 59, 72, 'GPE')])
    PredictionResult(example='It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.', inference=[('1851', 18, 22, 'DATE'), ('Henry Jarvis', 26, 38, 'PERSON'), ('Raymond', 39, 46, 'PERSON'), ('George Jones', 51, 63, 'PERSON'), ('Raymond, Jones & Company', 96, 120, 'ORG')])

Use KeyedModelHandler to handle keyed data
If you have keyed data, use KeyedModelHandler.
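The idea behind a keyed handler can be sketched in plain Python: split each `(key, example)` pair, run the wrapped inference function on the examples alone, and then reattach each key to its result. This is only a conceptual illustration under that assumption, not Beam's implementation; `keyed_inference` and `count_words` are hypothetical names.

```python
from typing import Callable, Hashable, List, Sequence, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
R = TypeVar("R")


def keyed_inference(
    batch: Sequence[Tuple[K, str]],
    infer: Callable[[Sequence[str]], List[R]],
) -> List[Tuple[K, R]]:
    """Run `infer` on the values only and pair each result with its key."""
    keys = [key for key, _ in batch]
    examples = [example for _, example in batch]
    results = infer(examples)  # Must return one result per example, in order.
    return list(zip(keys, results))


def count_words(texts: Sequence[str]) -> List[int]:
    """Toy stand-in for a model: number of whitespace-separated tokens."""
    return [len(text.split()) for text in texts]


keyed_batch = [
    ("example_0", "The New York Times"),
    ("example_1", "It was founded in 1851"),
]
keyed_results = keyed_inference(keyed_batch, count_words)
print(keyed_results)
```

The keys pass through untouched, so each result can be traced back to its input after inference. In the pipeline that follows, this is why the `ToRows` step reads the key from `row[0]` and the prediction from `row[1]`.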
    # You can use these text strings with keys to distinguish examples.
    text_strings_with_keys = [
        ("example_0", "The New York Times is an American daily newspaper based in New York City with a worldwide readership."),
        ("example_1", "It was founded in 1851 by Henry Jarvis Raymond and George Jones, and was initially published by Raymond, Jones & Company.")
    ]

    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    from apache_beam.ml.inference.base import KeyedModelHandler
    from apache_beam.dataframe.convert import to_dataframe

    pipeline = beam.Pipeline(InteractiveRunner())

    keyed_spacy_model_handler = KeyedModelHandler(SpacyModelHandler("en_core_web_sm"))

    # Verify that the inference results are correct.
    with pipeline as p:
        results = (p
            | "CreateSentences" >> beam.Create(text_strings_with_keys)
            | "RunInferenceSpacy" >> RunInference(keyed_spacy_model_handler)
            # Generate a schema suitable for conversion to a dataframe using Map to Row objects.
            | 'ToRows' >> beam.Map(lambda row: beam.Row(key=row[0], text=row[1][0], predictions=row[1][1]))
        )

    # Convert the results to a pandas dataframe.
    import apache_beam.runners.interactive.interactive_beam as ib

    beam_df = to_dataframe(results)
    df = ib.collect(beam_df)
    df

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-10-22 UTC.