Azure Event Hubs is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters. For a detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.
This quickstart describes how to write Go applications to send events to or receive events from an event hub.
Note
This quickstart is based on the samples on GitHub at https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. The section on sending events is based on the example_producing_events_test.go sample, and the section on receiving events is based on the example_processor_test.go sample. The code is simplified for the quickstart and the detailed comments are removed, so see the samples for more details and explanations.
To complete this quickstart, you need the following prerequisites:
This section shows you how to create a Go application to send events to an event hub.
Get the Go package for Event Hubs as shown in the following example.
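    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Here's the code to send events to an event hub. The main steps in the code are:

1. Create an Event Hubs producer client by using a connection string to the Event Hubs namespace and the event hub name.
2. Create a batch object and add sample events to the batch.
3. Send the batch of events to the event hub.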
Important
In the sample code, replace NAMESPACE CONNECTION STRING with the connection string to your Event Hubs namespace and EVENT HUB NAME with the name of your event hub.
    package main

    import (
    	"context"

    	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    )

    func main() {
    	// create an Event Hubs producer client using a connection string to the namespace and the event hub
    	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
    	if err != nil {
    		panic(err)
    	}
    	defer producerClient.Close(context.TODO())

    	// create sample events
    	events := createEventsForSample()

    	// create a batch object and add sample events to the batch
    	newBatchOptions := &azeventhubs.EventDataBatchOptions{}
    	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
    	if err != nil {
    		panic(err)
    	}

    	for i := 0; i < len(events); i++ {
    		err = batch.AddEventData(events[i], nil)
    		if err != nil {
    			panic(err)
    		}
    	}

    	// send the batch of events to the event hub
    	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
    	if err != nil {
    		panic(err)
    	}
    }

    func createEventsForSample() []*azeventhubs.EventData {
    	return []*azeventhubs.EventData{
    		{
    			Body: []byte("hello"),
    		},
    		{
    			Body: []byte("world"),
    		},
    	}
    }

Don't run the application yet. You first need to run the receiver app and then the sender app.
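The sender above places both sample events in a single batch. With larger volumes a batch eventually fills up, and a common pattern is to send the full batch and continue with a new one. The following is a minimal sketch of that grouping logic using only the standard library. The function splitIntoBatches, the error errTooLarge, and the 10-byte limit are hypothetical names and values chosen for illustration; they are not part of the azeventhubs package, and a real batch also counts protocol overhead, not just body bytes.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge is a hypothetical stand-in for the error a real batch would
// report when a single event cannot fit even in an empty batch.
var errTooLarge = errors.New("event is larger than the batch size limit")

// splitIntoBatches groups event bodies into batches whose total payload size
// stays at or below maxBytes. It only counts body bytes; a real Event Hubs
// batch also accounts for protocol overhead.
func splitIntoBatches(bodies [][]byte, maxBytes int) ([][][]byte, error) {
	var batches [][][]byte
	var current [][]byte
	size := 0

	for _, body := range bodies {
		if len(body) > maxBytes {
			return nil, errTooLarge
		}
		// The current batch cannot take this event: close it and start a new one.
		if size+len(body) > maxBytes {
			batches = append(batches, current)
			current = nil
			size = 0
		}
		current = append(current, body)
		size += len(body)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches, nil
}

func main() {
	bodies := [][]byte{[]byte("hello"), []byte("world"), []byte("again")}
	batches, err := splitIntoBatches(bodies, 10)
	if err != nil {
		panic(err)
	}
	for i, batch := range batches {
		fmt.Printf("batch %d holds %d event(s)\n", i, len(batch))
	}
}
```

In the sender, the equivalent step would be to react to the error returned by batch.AddEventData, send the current batch, and retry the event on a fresh batch. Check the SDK samples for the exact error value to test for.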
State, such as partition leases and checkpoints in the event stream, is shared between receivers through an Azure Storage container. You can create a storage account and container with the Go SDK, or you can create them by following the instructions in About Azure storage accounts.
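To make the idea of a checkpoint concrete, here is a minimal in-memory sketch. It is not the blob-backed store used in this quickstart: the type memoryCheckpointStore and its methods are hypothetical, and the sketch assumes that a checkpoint is simply the sequence number of the last processed event for each partition.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryCheckpointStore is a hypothetical in-memory stand-in for a shared
// checkpoint store. It maps a partition ID to the sequence number of the
// last event that was fully processed on that partition.
type memoryCheckpointStore struct {
	mu          sync.Mutex
	checkpoints map[string]int64
}

func newMemoryCheckpointStore() *memoryCheckpointStore {
	return &memoryCheckpointStore{checkpoints: make(map[string]int64)}
}

// update records progress for a partition. Older sequence numbers are ignored
// so that a checkpoint never moves backwards.
func (s *memoryCheckpointStore) update(partitionID string, sequenceNumber int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if last, ok := s.checkpoints[partitionID]; !ok || sequenceNumber > last {
		s.checkpoints[partitionID] = sequenceNumber
	}
}

// resumeFrom returns the sequence number a receiver should start reading at:
// one past the checkpoint, or 0 when the partition has no checkpoint yet.
func (s *memoryCheckpointStore) resumeFrom(partitionID string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if last, ok := s.checkpoints[partitionID]; ok {
		return last + 1
	}
	return 0
}

func main() {
	store := newMemoryCheckpointStore()
	store.update("0", 41)
	store.update("0", 42)
	fmt.Println(store.resumeFrom("0")) // partition with a checkpoint
	fmt.Println(store.resumeFrom("1")) // partition without a checkpoint
}
```

Because the real store lives in a storage container rather than in process memory, any receiver that takes over a partition can read the same checkpoint and continue from there. Where a real processor starts on a partition that has no checkpoint depends on its own options, not on the value 0 used in this sketch.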
Follow these recommendations when you use Azure Blob Storage as a checkpoint store:
On the Storage account page in the Azure portal, in the Blob service section, ensure that the following settings are disabled.
To receive the messages, get the Go packages for Event Hubs as shown in the following example.
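    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
    go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Here's the code to receive events from an event hub. The main steps in the code are:

1. Create a container client and a checkpoint store that uses the blob container.
2. Create an Event Hubs consumer client by using a connection string to the Event Hubs namespace and the event hub name.
3. Create a processor that uses the consumer client and the checkpoint store.
4. For each partition in the event hub, create a partition client with processEvents as the function that processes events.
5. Run the processor so that all partition clients receive and process events.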
Important
Replace the following placeholder values with actual values:
AZURE STORAGE CONNECTION STRING with the connection string for your Azure storage account
CONTAINER NAME with the name of the blob container you created in the storage account
NAMESPACE CONNECTION STRING with the connection string for your Event Hubs namespace
EVENT HUB NAME with the name of your event hub

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"

    	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
    	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
    )

    func main() {
    	// create a container client using a connection string and container name
    	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    	if err != nil {
    		panic(err)
    	}

    	// create a checkpoint store that will be used by the event hub
    	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
    	if err != nil {
    		panic(err)
    	}

    	// create a consumer client using a connection string to the namespace and the event hub
    	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
    	if err != nil {
    		panic(err)
    	}
    	defer consumerClient.Close(context.TODO())

    	// create a processor to receive and process events
    	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
    	if err != nil {
    		panic(err)
    	}

    	// for each partition in the event hub, create a partition client with processEvents as the function to process events
    	dispatchPartitionClients := func() {
    		for {
    			partitionClient := processor.NextPartitionClient(context.TODO())
    			if partitionClient == nil {
    				break
    			}

    			go func() {
    				if err := processEvents(partitionClient); err != nil {
    					panic(err)
    				}
    			}()
    		}
    	}

    	// run all partition clients
    	go dispatchPartitionClients()

    	processorCtx, processorCancel := context.WithCancel(context.TODO())
    	defer processorCancel()

    	if err := processor.Run(processorCtx); err != nil {
    		panic(err)
    	}
    }

    func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    	defer closePartitionResources(partitionClient)

    	for {
    		// wait up to one minute for up to 100 events
    		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
    		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
    		receiveCtxCancel()

    		// an expired deadline is expected when the partition is quiet, so it isn't treated as an error
    		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
    			return err
    		}

    		fmt.Printf("Processing %d event(s)\n", len(events))

    		for _, event := range events {
    			fmt.Printf("Event received with body %v\n", string(event.Body))
    		}

    		// record the last processed event so that receivers can resume from it
    		if len(events) != 0 {
    			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
    				return err
    			}
    		}
    	}
    }

    func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    	defer partitionClient.Close(context.TODO())
    }

Run the receiver app first.
Run the sender app.
Wait for a minute to see the following output in the receiver window.
    Processing 2 event(s)
    Event received with body hello
    Event received with body world

See samples on GitHub at https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.