Movatterモバイル変換


[0]ホーム

URL:


Skip to main content

This browser is no longer supported.

Upgrade to Microsoft Edge to take advantage of the latest features, security updates, and technical support.

Download Microsoft EdgeMore info about Internet Explorer and Microsoft Edge
Table of contentsExit editor mode

Quickstart: Send events to or receive events from event hubs by using Python

Feedback

In this article

In this Quickstart, you learn how to send events to and receive events from an event hub using theazure-eventhub Python package.

Prerequisites

If you're new to Azure Event Hubs, seeEvent Hubs overview before you do this quickstart.

To complete this quickstart, ensure you have the following prerequisites:

  • Microsoft Azure subscription: Sign up for afree trial if you don't have one.
  • Python 3.8 or later: Ensure pip is installed and updated.
  • Visual Studio Code (recommended): Or use any other IDE of your choice.
  • Event Hubs namespace and event hub: Followthis guide to create them in theAzure portal.

Install the packages to send events

To install the Python packages for Event Hubs, open a command prompt that has Python in its path. Change the directory to the folder where you want to keep your samples.

pip install azure-eventhubpip install azure-identitypip install aiohttp

Authenticate the app to Azure

This quickstart shows you two ways of connecting to Azure Event Hubs:

  • Passwordless. Use your security principal in Microsoft Entra ID and role-based access control (RBAC) to connect to an Event Hubs namespace. You don't need to worry about having hard-coded connection strings in your code, in a configuration file, or in secure storage like Azure Key Vault.
  • Connection string. Use a connection string to connect to an Event Hubs namespace. If you're new to Azure, you might find the connection string option easier to follow.

We recommend using the passwordless option in real-world applications and production environments. For more information, seeService Bus authentication and authorization andPasswordless connections for Azure services.

Assign roles to your Microsoft Entra user

When you develop locally, make sure that the user account that connects to Azure Event Hubs has the correct permissions. You need theAzure Event Hubs Data Owner role to send and receive messages. To assign yourself this role, you need the User Access Administrator role, or another role that includes theMicrosoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. For more information, seeUnderstand scope for Azure RBAC page.

The following example assigns theAzure Event Hubs Data Owner role to your user account, which provides full access to Azure Event Hubs resources. In a real scenario, follow thePrinciple of Least Privilege to give users only the minimum permissions needed for a more secure production environment.

Azure built-in roles for Azure Event Hubs

For Azure Event Hubs, the management of namespaces and all related resources through the Azure portal and the Azure resource management API is already protected using the Azure RBAC model. Azure provides the following built-in roles for authorizing access to an Event Hubs namespace:

If you want to create a custom role, seeRights required for Event Hubs operations.

Important

In most cases, it takes a minute or two for the role assignment to propagate in Azure. In rare cases, it might take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.

  1. In the Azure portal, locate your Event Hubs namespace using the main search bar or left navigation.

  2. On the overview page, selectAccess control (IAM) from the left-hand menu.

  3. On theAccess control (IAM) page, select theRole assignments tab.

  4. Select+ Add from the top menu. Then selectAdd role assignment.

    Screenshot showing how to assign a role.

  5. Use the search box to filter the results to the desired role. For this example, search forAzure Event Hubs Data Owner and select the matching result. Then chooseNext.

  6. UnderAssign access to, selectUser, group, or service principal. Then choose+ Select members.

  7. In the dialog, search for your Microsoft Entra username (usually youruser@domain email address). ChooseSelect at the bottom of the dialog.

  8. SelectReview + assign to go to the final page. SelectReview + assign again to complete the process.

Send events

In this section, create a Python script to send events to the event hub that you created earlier.

  1. Open your favorite Python editor, such asVisual Studio Code.

  2. Create a script calledsend.py. This script sends a batch of events to the event hub that you created earlier.

  3. Paste the following code intosend.py:

    In the code, use real values to replace the following placeholders:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - You see the fully qualified name on theOverview page of the namespace. It should be in the format:<NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Name of the event hub.
    import asynciofrom azure.eventhub import EventDatafrom azure.eventhub.aio import EventHubProducerClientfrom azure.identity.aio import DefaultAzureCredentialEVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"EVENT_HUB_NAME = "EVENT_HUB_NAME"credential = DefaultAzureCredential()async def run():    # Create a producer client to send messages to the event hub.    # Specify a credential that has correct role assigned to access    # event hubs namespace and the event hub name.    producer = EventHubProducerClient(        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,        eventhub_name=EVENT_HUB_NAME,        credential=credential,    )    print("Producer client created successfully.")     async with producer:        # Create a batch.        event_data_batch = await producer.create_batch()        # Add events to the batch.        event_data_batch.add(EventData("First event "))        event_data_batch.add(EventData("Second event"))        event_data_batch.add(EventData("Third event"))        # Send the batch of events to the event hub.        await producer.send_batch(event_data_batch)        # Close credential when no longer needed.        await credential.close()asyncio.run(run())

    Note

    For examples of other options for sending events to an event hub asynchronously using a connection string, see theGitHub send_async.py page. The patterns shown there are also applicable to sending events passwordless.

Receive events

This quickstart uses Azure Blob storage as a checkpoint store. The checkpoint store is used to persist checkpoints (that is, the last read positions).

Follow these recommendations when you use Azure Blob Storage as a checkpoint store:

  • Use a separate container for each consumer group. You can use the same storage account, but use one container per each group.
  • Don't use the storage account for anything else.
  • Don't use the container for anything else.
  • Create the storage account in the same region as the deployed application. If the application is on-premises, try to choose the closest region possible.

On theStorage account page in the Azure portal, in theBlob service section, ensure that the following settings are disabled.

  • Hierarchical namespace
  • Blob soft delete
  • Versioning

Create an Azure storage account and a blob container

Create an Azure storage account and a blob container in it by doing the following steps:

  1. Create an Azure Storage account
  2. Create a blob container.
  3. Authenticate to the blob container.

Be sure to record the connection string and container name for later use in the receive code.

When you develop locally, make sure that the user account that accesses blob data has the correct permissions. You needStorage Blob Data Contributor to read and write blob data. To assign yourself this role, you need to be assigned theUser Access Administrator role, or another role that includes theMicrosoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. For more information, seeUnderstand scope for Azure RBAC.

In this scenario, you assign permissions to your user account, scoped to the storage account, to follow thePrinciple of Least Privilege. This practice gives users only the minimum permissions needed and creates more secure production environments.

The following example assigns theStorage Blob Data Contributor role to your user account, which provides both read and write access to blob data in your storage account.

Important

In most cases, it takes a minute or two for the role assignment to propagate in Azure. In rare cases, it might take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.

  1. In the Azure portal, locate your storage account using the main search bar or left navigation.

  2. On the storage account page, selectAccess control (IAM) from the left-hand menu.

  3. On theAccess control (IAM) page, select theRole assignments tab.

  4. Select+ Add from the top menu. Then selectAdd role assignment.

    Screenshot showing how to assign a storage account role.

  5. Use the search box to filter the results to the desired role. For this example, search forStorage Blob Data Contributor. Select the matching result and then chooseNext.

  6. UnderAssign access to, selectUser, group, or service principal, and then choose+ Select members.

  7. In the dialog, search for your Microsoft Entra username (usually youruser@domain email address) and then chooseSelect at the bottom of the dialog.

  8. SelectReview + assign to go to the final page. SelectReview + assign again to complete the process.

Install the packages to receive events

For the receiving side, you need to install one or more packages. In this quickstart, you use Azure Blob storage to persist checkpoints so that the program doesn't read the events that it already read. It performs metadata checkpoints on received messages at regular intervals in a blob. This approach makes it easy to continue receiving messages later from where you left off.

pip install azure-eventhub-checkpointstoreblob-aiopip install azure-identity

Create a Python script to receive events

In this section, you create a Python script to receive events from your event hub:

  1. Open your favorite Python editor, such asVisual Studio Code.

  2. Create a script calledrecv.py.

  3. Paste the following code intorecv.py:

    In the code, use real values to replace the following placeholders:

    • BLOB_STORAGE_ACCOUNT_URL - This value should be in the format:https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Name of the blob container in the Azure storage account.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - You see the fully qualified name on theOverview page of the namespace. It should be in the format:<NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Name of the event hub.
    import asynciofrom azure.eventhub.aio import EventHubConsumerClientfrom azure.eventhub.extensions.checkpointstoreblobaio import (    BlobCheckpointStore,)from azure.identity.aio import DefaultAzureCredentialBLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"EVENT_HUB_NAME = "EVENT_HUB_NAME"credential = DefaultAzureCredential()async def on_event(partition_context, event):    # Print the event data.    print(        'Received the event: "{}" from the partition with ID: "{}"'.format(            event.body_as_str(encoding="UTF-8"), partition_context.partition_id        )    )    # Update the checkpoint so that the program doesn't read the events    # that it has already read when you run it next time.    await partition_context.update_checkpoint(event)async def main():    # Create an Azure blob checkpoint store to store the checkpoints.    checkpoint_store = BlobCheckpointStore(        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,        container_name=BLOB_CONTAINER_NAME,        credential=credential,    )    # Create a consumer client for the event hub.    client = EventHubConsumerClient(        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,        eventhub_name=EVENT_HUB_NAME,        consumer_group="$Default",        checkpoint_store=checkpoint_store,        credential=credential,    )    async with client:        # Call the receive method. Read from the beginning of the partition        # (starting_position: "-1")        await client.receive(on_event=on_event, starting_position="-1")    # Close credential when no longer needed.    await credential.close()if __name__ == "__main__":    # Run the main method.    asyncio.run(main())

    Note

    For examples of other options for receiving events from an event hub asynchronously using a connection string, see theGitHub recv_with_checkpoint_store_async.pypage. The patterns shown there are also applicable to receiving events passwordless.

Run the receiver app

  1. Launch a command prompt.

  2. Run the following command and sign in using account that was added to theAzure Event Hubs Data Owner role on the Event Hubs namespace andStorage Blob Data Contributor role on the Azure storage account.

    az login
  3. Switch to the folder that has the receive.py file, and run the following command:

    python recv.py

Run the sender app

  1. Launch a command prompt.

  2. Run the following command and sign in using account that was added to theAzure Event Hubs Data Owner role on the Event Hubs namespace andStorage Blob Data Contributor role on the Azure storage account.

    az login
  3. Switch to the folder that has the send.py, and then run this command:

    python send.py

The receiver window should display the messages that were sent to the event hub.

Troubleshooting

If you don't see events in the receiver window or the code reports an error, try the following troubleshooting tips:

  • If you don't see results fromrecy.py, runsend.py several times.

  • If you see errors about "coroutine" when using the passwordless code (with credentials), make sure you're using importing fromazure.identity.aio.

  • If you see "Unclosed client session" with passwordless code (with credentials), make sure you close the credential when finished. For more information, seeAsync credentials.

  • If you see authorization errors withrecv.py when accessing storage, make sure you followed the steps inCreate an Azure storage account and a blob container and assigned theStorage Blob Data Contributor role to the service principal.

  • If you receive events with different partition IDs, this result is expected. Partitions are a data organization mechanism that relates to the downstream parallelism required in consuming applications. The number of partitions in an event hub directly relates to the number of concurrent readers you expect to have. For more information, seeLearn more about partitions.

Next steps

In this quickstart, you sent and received events asynchronously. To learn how to send and receive events synchronously, go to theGitHub sync_samples page.

Explore more examples and advanced scenarios in theAzure Event Hubs client library for Python samples.


Feedback

Was this page helpful?

YesNoNo

Need help with this topic?

Want to try using Ask Learn to clarify or guide you through this topic?

Suggest a fix?

  • Last updated on

In this article

Was this page helpful?

YesNo
NoNeed help with this topic?

Want to try using Ask Learn to clarify or guide you through this topic?

Suggest a fix?