In this quickstart, you learn how to send events to and receive events from an event hub using the azure-eventhub Python package.
If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.
To complete this quickstart, ensure you have the following prerequisites:
To install the Python packages for Event Hubs, open a command prompt that has Python in its path. Change the directory to the folder where you want to keep your samples.
```shell
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
```

This quickstart shows you two ways of connecting to Azure Event Hubs:
We recommend using the passwordless option in real-world applications and production environments. For more information, see Event Hubs authentication and authorization and Passwordless connections for Azure services.
When you develop locally, make sure that the user account that connects to Azure Event Hubs has the correct permissions. You need the Azure Event Hubs Data Owner role to send and receive messages. To assign yourself this role, you need the User Access Administrator role, or another role that includes the Microsoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. For more information, see the Understand scope for Azure RBAC page.
The following example assigns the Azure Event Hubs Data Owner role to your user account, which provides full access to Azure Event Hubs resources. In a real scenario, follow the Principle of Least Privilege to give users only the minimum permissions needed for a more secure production environment.
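As an alternative to the portal steps that follow, the assignment can be made with the Azure CLI. This is a minimal sketch, assuming you're already signed in with az login; the resource group, namespace name, and sign-in name are placeholders you must replace:

```shell
# Look up the resource ID of the Event Hubs namespace (names are placeholders).
EH_ID=$(az eventhubs namespace show \
    --resource-group <your-resource-group> \
    --name <your-namespace> \
    --query id --output tsv)

# Assign the Azure Event Hubs Data Owner role to your user account,
# scoped to the namespace.
az role assignment create \
    --assignee "<user@domain>" \
    --role "Azure Event Hubs Data Owner" \
    --scope "$EH_ID"
```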
For Azure Event Hubs, the management of namespaces and all related resources through the Azure portal and the Azure resource management API is already protected using the Azure RBAC model. Azure provides the following built-in roles for authorizing access to an Event Hubs namespace:
If you want to create a custom role, see Rights required for Event Hubs operations.
Important
In most cases, it takes a minute or two for the role assignment to propagate in Azure. In rare cases, it might take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.
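If you want your script to tolerate this propagation delay automatically rather than rerunning it by hand, a simple wait-and-retry wrapper is enough. This is an illustrative sketch only: PermissionError stands in for whatever auth error the SDK raises, and the retry parameters are arbitrary.

```python
import time

def retry_on_auth_error(operation, attempts=5, delay_seconds=2.0):
    """Call operation(); on an auth failure, wait and retry a few times
    to ride out role-assignment propagation. PermissionError is used here
    purely as a stand-in for the SDK's authentication errors."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except PermissionError:
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)

# Demo: an operation that fails twice, then succeeds on the third call.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise PermissionError("role assignment not yet propagated")
    return "ok"

print(retry_on_auth_error(flaky, attempts=5, delay_seconds=0.01))  # → ok
```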
In the Azure portal, locate your Event Hubs namespace using the main search bar or left navigation.
On the overview page, select Access control (IAM) from the left-hand menu.
On the Access control (IAM) page, select the Role assignments tab.
Select + Add from the top menu. Then select Add role assignment.
Use the search box to filter the results to the desired role. For this example, search for Azure Event Hubs Data Owner and select the matching result. Then choose Next.
Under Assign access to, select User, group, or service principal. Then choose + Select members.
In the dialog, search for your Microsoft Entra username (usually your user@domain email address). Choose Select at the bottom of the dialog.
Select Review + assign to go to the final page. Select Review + assign again to complete the process.
In this section, create a Python script to send events to the event hub that you created earlier.
Open your favorite Python editor, such as Visual Studio Code.
Create a script called send.py. This script sends a batch of events to the event hub that you created earlier.
Paste the following code into send.py:
In the code, use real values to replace the following placeholders:
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - You see the fully qualified name on the Overview page of the namespace. It should be in the format: <NAMESPACENAME>.servicebus.windows.net.
EVENT_HUB_NAME - Name of the event hub.

```python
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential

EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
EVENT_HUB_NAME = "EVENT_HUB_NAME"

credential = DefaultAzureCredential()

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a credential that has correct role assigned to access
    # event hubs namespace and the event hub name.
    producer = EventHubProducerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        credential=credential,
    )
    print("Producer client created successfully.")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData("First event"))
        event_data_batch.add(EventData("Second event"))
        event_data_batch.add(EventData("Third event"))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

    # Close credential when no longer needed.
    await credential.close()

asyncio.run(run())
```

Note
For examples of other options for sending events to an event hub asynchronously using a connection string, see the GitHub send_async.py page. The patterns shown there also apply to sending events passwordless.
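One detail the sample glosses over: EventDataBatch.add raises ValueError once a batch reaches its size limit, so longer event streams are usually chunked into several batches. The following sketch shows that pattern using a stand-in batch class so it runs without an Event Hubs connection; comments note the real client calls it stands in for.

```python
class FakeBatch:
    """Stand-in for EventDataBatch: accepts payloads up to max_bytes,
    then raises ValueError, as the real class does when full."""
    def __init__(self, max_bytes=64):
        self.max_bytes = max_bytes
        self.events = []
        self.size = 0

    def add(self, payload: str):
        if self.size + len(payload) > self.max_bytes:
            raise ValueError("batch is full")
        self.events.append(payload)
        self.size += len(payload)


def send_all(payloads, create_batch, send_batch):
    """Chunk payloads into batches, starting a new batch when add() raises."""
    batch = create_batch()          # real client: await producer.create_batch()
    for payload in payloads:
        try:
            batch.add(payload)
        except ValueError:
            send_batch(batch)       # real client: await producer.send_batch(batch)
            batch = create_batch()
            batch.add(payload)
    if batch.events:
        send_batch(batch)


sent = []
send_all([f"event {i}" for i in range(10)], FakeBatch, sent.append)
print(len(sent), sum(len(b.events) for b in sent))  # → 2 10
```

With the real async client, send_batch and create_batch become awaited calls on the producer, but the overflow-handling shape is the same.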
This quickstart uses Azure Blob storage as a checkpoint store. The checkpoint store is used to persist checkpoints (that is, the last read positions).
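Conceptually, a checkpoint store is a small key-value map from a consumer identity and partition to the last processed position. This in-memory stand-in is purely illustrative (the real BlobCheckpointStore persists the same kind of record as blob metadata, and its actual API differs):

```python
class InMemoryCheckpointStore:
    """Illustrative stand-in: maps (namespace, event hub, consumer group,
    partition) to the position last checkpointed."""
    def __init__(self):
        self._checkpoints = {}

    def update_checkpoint(self, namespace, eventhub, consumer_group,
                          partition_id, offset, sequence_number):
        key = (namespace, eventhub, consumer_group, partition_id)
        self._checkpoints[key] = {"offset": offset,
                                  "sequence_number": sequence_number}

    def get_checkpoint(self, namespace, eventhub, consumer_group, partition_id):
        # Returns None for a partition that was never checkpointed,
        # which is why a fresh consumer needs a starting_position.
        return self._checkpoints.get(
            (namespace, eventhub, consumer_group, partition_id))


store = InMemoryCheckpointStore()
store.update_checkpoint("myns", "myhub", "$Default", "0",
                        offset="1024", sequence_number=42)
print(store.get_checkpoint("myns", "myhub", "$Default", "0"))
```

On restart, the consumer reads these records and resumes each partition just after the stored position instead of from the beginning.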
Follow these recommendations when you use Azure Blob Storage as a checkpoint store:
On the Storage account page in the Azure portal, in the Blob service section, ensure that the following settings are disabled.
Create an Azure storage account and a blob container in it by doing the following steps:
Be sure to record the storage account's blob service URL and the container name for later use in the receive code.
When you develop locally, make sure that the user account that accesses blob data has the correct permissions. You need the Storage Blob Data Contributor role to read and write blob data. To assign yourself this role, you need to be assigned the User Access Administrator role, or another role that includes the Microsoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. For more information, see Understand scope for Azure RBAC.
In this scenario, you assign permissions to your user account, scoped to the storage account, to follow the Principle of Least Privilege. This practice gives users only the minimum permissions needed and creates more secure production environments.
The following example assigns the Storage Blob Data Contributor role to your user account, which provides both read and write access to blob data in your storage account.
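As with the Event Hubs role, this assignment can also be made from the Azure CLI instead of the portal. A minimal sketch, assuming you're signed in with az login; the resource group, storage account name, and sign-in name are placeholders:

```shell
# Look up the resource ID of the storage account (names are placeholders).
STG_ID=$(az storage account show \
    --resource-group <your-resource-group> \
    --name <yourstorageaccount> \
    --query id --output tsv)

# Assign the Storage Blob Data Contributor role to your user account,
# scoped to the storage account.
az role assignment create \
    --assignee "<user@domain>" \
    --role "Storage Blob Data Contributor" \
    --scope "$STG_ID"
```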
Important
In most cases, it takes a minute or two for the role assignment to propagate in Azure. In rare cases, it might take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.
In the Azure portal, locate your storage account using the main search bar or left navigation.
On the storage account page, select Access control (IAM) from the left-hand menu.
On the Access control (IAM) page, select the Role assignments tab.
Select + Add from the top menu. Then select Add role assignment.
Use the search box to filter the results to the desired role. For this example, search for Storage Blob Data Contributor. Select the matching result and then choose Next.
Under Assign access to, select User, group, or service principal, and then choose + Select members.
In the dialog, search for your Microsoft Entra username (usually your user@domain email address) and then choose Select at the bottom of the dialog.
Select Review + assign to go to the final page. Select Review + assign again to complete the process.
For the receiving side, you need to install one or more packages. In this quickstart, you use Azure Blob storage to persist checkpoints so that the program doesn't reread events it has already processed. It records checkpoint metadata for received messages in a blob at regular intervals, which makes it easy to continue receiving messages later from where you left off.
```shell
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
```

In this section, you create a Python script to receive events from your event hub:
Open your favorite Python editor, such as Visual Studio Code.
Create a script called recv.py.
Paste the following code into recv.py:
In the code, use real values to replace the following placeholders:
BLOB_STORAGE_ACCOUNT_URL - This value should be in the format: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
BLOB_CONTAINER_NAME - Name of the blob container in the Azure storage account.
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - You see the fully qualified name on the Overview page of the namespace. It should be in the format: <NAMESPACENAME>.servicebus.windows.net.
EVENT_HUB_NAME - Name of the event hub.

```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
    BlobCheckpointStore,
)
from azure.identity.aio import DefaultAzureCredential

BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
EVENT_HUB_NAME = "EVENT_HUB_NAME"

credential = DefaultAzureCredential()

async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=credential,
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
        credential=credential,
    )

    async with client:
        # Call the receive method. Read from the beginning of the partition
        # (starting_position: "-1").
        await client.receive(on_event=on_event, starting_position="-1")

    # Close credential when no longer needed.
    await credential.close()

if __name__ == "__main__":
    # Run the main method.
    asyncio.run(main())
```

Note
For examples of other options for receiving events from an event hub asynchronously using a connection string, see the GitHub recv_with_checkpoint_store_async.py page. The patterns shown there also apply to receiving events passwordless.
Launch a command prompt.
Run the following command and sign in using the account that was added to the Azure Event Hubs Data Owner role on the Event Hubs namespace and the Storage Blob Data Contributor role on the Azure storage account.
```shell
az login
```

Switch to the folder that has the recv.py file, and run the following command:

```shell
python recv.py
```

Launch a command prompt.
Run the following command and sign in using the account that was added to the Azure Event Hubs Data Owner role on the Event Hubs namespace and the Storage Blob Data Contributor role on the Azure storage account.

```shell
az login
```

Switch to the folder that has the send.py file, and then run this command:

```shell
python send.py
```

The receiver window should display the messages that were sent to the event hub.
If you don't see events in the receiver window or the code reports an error, try the following troubleshooting tips:
If you don't see results from recv.py, run send.py several times.
If you see errors about "coroutine" when using the passwordless code (with credentials), make sure you're importing from azure.identity.aio.
If you see "Unclosed client session" with passwordless code (with credentials), make sure you close the credential when finished. For more information, see Async credentials.
If you see authorization errors with recv.py when accessing storage, make sure you followed the steps in Create an Azure storage account and a blob container and assigned the Storage Blob Data Contributor role to the service principal.
If you receive events with different partition IDs, this result is expected. Partitions are a data organization mechanism that relates to the downstream parallelism required in consuming applications. The number of partitions in an event hub directly relates to the number of concurrent readers you expect to have. For more information, see Learn more about partitions.
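This partition behavior is also why sending with a partition key matters: events with the same key always land on the same partition, preserving their relative order. The service's actual hashing is internal to Event Hubs, so the function below is a hypothetical hash-mod illustration of the idea, not the real algorithm:

```python
import hashlib

def pick_partition(partition_key: str, partition_count: int) -> int:
    """Illustrative only: a stable hash of the key, modulo the
    partition count, so equal keys always map to the same partition."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count

# Same key -> same partition; different keys may spread across partitions.
p1 = pick_partition("device-42", 4)
p2 = pick_partition("device-42", 4)
print(p1 == p2)  # → True
```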
In this quickstart, you sent and received events asynchronously. To learn how to send and receive events synchronously, go to the GitHub sync_samples page.
Explore more examples and advanced scenarios in the Azure Event Hubs client library for Python samples.