Create a Amazon Managed Streaming for Apache Kafka import topic Stay organized with collections Save and categorize content based on your preferences.
An Amazon Managed Streaming for Apache Kafka (Amazon MSK) importtopic lets you continuously ingest data fromAmazon MSK as anexternal source and into Pub/Sub. Then you can stream the datainto any of the destinations that Pub/Sub supports.
Note: OnlyAmazon MSK provisionedclusterscan be imported into Pub/Sub. Your clusters also must bepublic. Clusters behind private VPCs aren't supported.This document shows you how to create and manage Amazon MSKimport topics. To create a standard topic, seeCreate a standardtopic.
For more information about import topics, seeAbout import topics.
Before you begin
Know more about thePub/Sub publishprocess.
Configure therequired roles and permissions to manage Amazon MSKimport topics including the following:
Set upworkload identity federation so thatGoogle Cloud can access the external streaming service.
Required roles and permissions
To get the permissions that you need to create and manage Amazon MSK import topics, ask your administrator to grant you thePub/Sub Editor (roles/pubsub.editor) IAM role on your topic or project. For more information about granting roles, seeManage access to projects, folders, and organizations.
This predefined role contains the permissions required to create and manage Amazon MSK import topics. To see the exact permissions that are required, expand theRequired permissions section:
Required permissions
The following permissions are required to create and manage Amazon MSK import topics:
- Create an import topic:
pubsub.topics.create - Delete an import topic:
pubsub.topics.delete - Get an import topic:
pubsub.topics.get - List an import topic:
pubsub.topics.list - Publish to an import topic:
pubsub.topics.publish and pubsub.serviceAgent - Update an import topic:
pubsub.topics.update - Get the IAM policy for an import topic:
pubsub.topics.getIamPolicy - Configure theIAM policy for an import topic:
pubsub.topics.setIamPolicy
You might also be able to get these permissions withcustom roles or otherpredefined roles.
You can configure access control at the project level and theindividual resource level.
Set up federated identity to access Amazon MSK
Workload Identity Federation lets Google Cloud services access workloadsrunning outside of Google Cloud. With identity federation, you don't needto maintain or pass credentials to Google Cloud to access your resourcesin other clouds. Instead, you can use the identities of the workloads themselvesto authenticate to Google Cloud and access resources.
Create a service account in Google Cloud
This is an optional step. If you already have a service account, you can useit in this procedure instead of creating a new service account.If you are using an existing service account, go toRecord the service account unique ID for thenext step.
For Amazon MSK import topics, Pub/Sub uses theservice account as the identity to access resources from AWS.
For more information about creating a service account, including prerequisites,required roles and permissions, and naming guidelines, seeCreate service accounts. After you createa service account, you might need to wait for 60 seconds or more before youuse the service account. This behavior occurs because read operations areeventually consistent; it can take time for the new service account tobecome visible.
Record the service account unique ID
You need a service account unique ID to set up a role in the AWS console.
In the Google Cloud console, go to theService account details page.
Click the service account that you just created or the one that you areplanning to use.
From theService account details page, record the Unique ID number.
You need the ID as part of the workflow to set upa role in the AWS console.
Add the service account token creator role to the Pub/Sub service account
Note: Pub/Sub creates and maintains a service account for eachproject. The service account has the following format:service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com. Perform this procedure only if thePub/Sub service account does not have the Pub/SubService Agent role (roles/pubsub.serviceAgent).TheService account token creator role (roles/iam.serviceAccountTokenCreator)lets principalscreate short-lived credentialsfor a service account. These tokens or credentials are used to impersonatethe service account.
For more information about service account impersonation, seeService account impersonation.
You can also add thePub/Sub publisher role (roles/pubsub.publisher)during this procedure. For more information about the role and why you are adding it,seeAdd the Pub/Sub publisher role to the Pub/Sub service account.
In the Google Cloud console, go to theIAM page.
Click theInclude Google-provided role grants checkbox.
Look for the service account that has the format
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.For this service account, click theEdit Principal button.
If required, clickAdd another role.
Search and click theService account token creator role (
roles/iam.serviceAccountTokenCreator).ClickSave.
Create a policy in AWS
You need a policy in AWS to let Pub/Sub authenticate to AWS sothat Pub/Sub can ingest data from Amazon MSK.
- For more methods and information about how to create a policy in AWS, seeCreating IAM policies.
To create a policy in AWS, perform the following steps:
Sign in to the AWS Management Console and open theIAM console.
In the navigation pane of the console forIAM,clickAccess Management >Policies.
ClickCreate policy.
ForClick a service, clickMSK.
ForAction allowed, clickRead >GetBootstrapBrokers.
This action grants permission to get the bootstrap brokers that Pub/Subuses to connect to the MSK cluster.
ClickAdd more permissions.
ForSelect a service, clickApache Kafka APIs for MSK.
ForAction allowed, select the following:
List >DescribeTopic
This action grants permission to allow the Pub/Sub ingestiontopic to get details about the Amazon MSK Kafka topic.
Read >ReadData
This action grants permission to read data from the Amazon MSK Kafka topic.
Write >Connect
This action grants permission to connect and authenticate to theAmazon MSK Kafka cluster.
ForResources, specify thecluster ARN (if you wantto restrict the policy to specific clusters, which is recommended).
ClickAdd more permissions.
ForSelect a service, clickSTS.
ForAction allowed, clickWrite >AssumeRoleWithWebIdentity.
This action grants permission to obtain a set of temporary securitycredentials for Pub/Sub to authenticate to Amazon MSKby using identity federation.
ClickNext.
Enter a policy name and description.
ClickCreate policy.
Create a role in AWS using a custom trust policy
You must create a role in AWS so that Pub/Sub can authenticateto AWS to ingest data from Amazon MSK.
Sign in to the AWS Management Console and open theIAM console.
In the navigation pane of the console forIAM, clickRoles.
ClickCreate role.
ForSelect trusted entity, clickCustom trust policy.
In theCustom trust policy section, enter or paste the following:
{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Federated":"accounts.google.com"},"Action":"sts:AssumeRoleWithWebIdentity","Condition":{"StringEquals":{"accounts.google.com:sub":"<SERVICE_ACCOUNT_UNIQUE_ID>"}}}]}Replace
<SERVICE_ACCOUNT_UNIQUE_ID>with the unique ID ofthe service account that you recorded inRecord the service account unique ID.ClickNext.
ForAdd permissions, search and click the custom policy thatyou just created.
ClickNext.
Enter a role name and description.
ClickCreate role.
Add the Pub/Sub publisher role to the Pub/Sub principal
To enable publishing, you must assign a publisher role to thePub/Sub service account so that Pub/Sub is able topublish to the Amazon MSK import topic.
Add the Pub/Sub service agent role to the Pub/Sub service account
Note: The Pub/Sub service agent role is granted by default for all projects after April 9, 2021. If your project has been created after this date and the Pub/Sub service agent role has not been removed, your Pub/Sub service account will already have this role.To allow Pub/Sub to use your import topic project's publish quota, the Pub/Sub service agent requires theserviceusage.services.use permission on your import topic's project.
To provide this permission, we recommend you add the Pub/Sub service agent role to the Pub/Sub service account.
If the Pub/Sub service account does not have the Pub/Sub service agent role, it can be granted as follows:
In the Google Cloud console, go to theIAM page.
Click theInclude Google-provided role grants checkbox.
Look for the service account that has the format
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.For this service account, click theEdit principal button.
If required, clickAdd another role.
Search and click thePub/Sub Service Agent role (
roles/pubsub.serviceAgent).ClickSave.
Enable publishing from all topics
Use this method if you have not created any Amazon MSK importtopics.
In the Google Cloud console, go to theIAM page.
Click theInclude Google-provided role grants checkbox.
Look for the service account that has the format
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.For this service account, click theEdit principal button.
If required, clickAdd another role.
Search and click thePub/Sub publisher role (
roles/pubsub.publisher).ClickSave.
Enable publishing from a single topic
Use this method only if the Amazon MSK import topic already exists.
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, aCloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Run the
gcloud pubsub topics add-iam-policy-bindingcommand:gcloudpubsubtopicsadd-iam-policy-bindingTOPIC_ID\--member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\--role="roles/pubsub.publisher"
Replace the following:
TOPIC_ID: the topic ID of the Amazon MSK import topic.PROJECT_NUMBER: the project number. To view the projectnumber, seeIdentifying projects.
Add the service account user role to the service account
TheService Account User role (roles/iam.serviceAccountUser) includes thepermissioniam.serviceAccounts.actAs that lets a principal attach a serviceaccount to the Amazon MSK import topic's ingestion settings anduse that service account for federated identity.
In the Google Cloud console, go to theIAM page.
For the principal that's issuing the create or update topic calls, click theEdit principal button.
If required, clickAdd another role.
Search and click theService account user role(
roles/iam.serviceAccountUser).ClickSave.
Use Amazon MSK import topics
You can create a new import topic or edit an existing topic.
Considerations
Caution: If you already have data in Amazon MSK, firstcreate astandard topic and a default subscription without enabling the ingestionsettings. Then,convert the topic to an import topic. This method ensuresthat all messages are received by the subscription.Creating the topic and subscription separately, even if done in rapidsuccession, can lead to data loss. There's a short window where the topicexists without a subscription. If any data is sent to the topic during thistime, it is lost. By creating the topic first, creating the subscription,and then converting the topic to an import topic, you guarantee that nomessages are missed during the import process.
If you need to re-create the Kafka topic of an existing import topic withthe same name, you can'tjust delete the Kafka topic and re-create it.This action can invalidate Pub/Sub's offset management, which can lead todata loss. To mitigate this, follow these steps:
- Delete the Pub/Sub import topic.
- Delete the Kafka topic.
- Create the Kafka topic.
- Create the Pub/Sub import topic.
Data from an Amazon MSK Kafka topic is always read from theearliestoffset.
Create Amazon MSK import topics
To know more about properties associated with a topic, seeProperties of atopic.
Ensure that you have completed the following procedures:
To create an Amazon MSK import topic, follow these steps:
Console
In the Google Cloud console, go to theTopics page.
ClickCreate topic.
In theTopic ID field, enter an ID for your Amazon MSKimport topic. For more information about naming topics, see thenamingguidelines.
SelectAdd a default subscription.
SelectEnable ingestion.
For ingestion source, selectAmazon MSK.
Enter the following details:
- Cluster ARN: The ARN of the Amazon MSK that you are ingestinginto Pub/Sub. The ARN format is as follows:
arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}. - Topic: The name of the Amazon MSK Kafka topicthat you are ingesting into Pub/Sub.
- AWS Role ARN: The ARN of the AWS role. The ARN format of the role isas follows:
arn:aws:iam::${Account}:role/${RoleName}. - Service account: The service account that you created inCreate aservice account in Google Cloud.
- Cluster ARN: The ARN of the Amazon MSK that you are ingestinginto Pub/Sub. The ARN format is as follows:
ClickCreate topic.
gcloud
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, aCloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Run the
gcloud pubsub topics createcommand:gcloudpubsubtopicscreateTOPIC_ID\--aws-msk-ingestion-cluster-arnMSK_CLUSTER_ARN\--aws-msk-ingestion-topicMSK_TOPIC\--aws-msk-ingestion-aws-role-arnMSK_ROLE_ARN\--aws-msk-ingestion-service-accountPUBSUB_SERVICE_ACCOUNTReplace the following:
TOPIC_ID: the name or ID of your Pub/Subtopic.MSK_CLUSTER_ARN: the ARN for the Amazon MSKcluster that you are ingesting into Pub/Sub. The ARN formatis as follows:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}.MSK_TOPIC: the name of the Amazon MSKKafka topic that you are ingesting into Pub/Sub.MSK_ROLE_ARN: the ARN of the AWS role. The ARNformat of the role is as follows:arn:aws:iam::${Account}:role/${RoleName}.PUBSUB_SERVICE_ACCOUNT: the service account thatyou created inCreate a service account in Google Cloud.
C++
Before trying this sample, follow the C++ setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub C++ API reference documentation.
namespacepubsub=::google::cloud::pubsub;namespacepubsub_admin=::google::cloud::pubsub_admin;[](pubsub_admin::TopicAdminClientclient,std::stringproject_id,std::stringtopic_id,std::stringconst&cluster_arn,std::stringconst&msk_topic,std::stringconst&aws_role_arn,std::stringconst&gcp_service_account){google::pubsub::v1::Topicrequest;request.set_name(pubsub::Topic(std::move(project_id),std::move(topic_id)).FullName());auto*aws_msk=request.mutable_ingestion_data_source_settings()->mutable_aws_msk();aws_msk->set_cluster_arn(cluster_arn);aws_msk->set_topic(msk_topic);aws_msk->set_aws_role_arn(aws_role_arn);aws_msk->set_gcp_service_account(gcp_service_account);autotopic=client.CreateTopic(request);// Note that kAlreadyExists is a possible error when the library retries.if(topic.status().code()==google::cloud::StatusCode::kAlreadyExists){std::cout <<"The topic already exists\n";return;}if(!topic)throwstd::move(topic).status();std::cout <<"The topic was successfully created: " <<topic->DebugString() <<"\n";}Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, seethe migration guide to v2.To see a list of v1 code samples, seethe deprecated code samples.
Before trying this sample, follow the Go setup instructions inQuickstart: Using Client Libraries.For more information, see thePub/Sub Go API reference documentation.
import("context""fmt""io""cloud.google.com/go/pubsub/v2""cloud.google.com/go/pubsub/v2/apiv1/pubsubpb")funccreateTopicWithAWSMSKIngestion(wio.Writer,projectID,topicID,clusterARN,mskTopic,awsRoleARN,gcpSAstring)error{// projectID := "my-project-id"// topicID := "my-topic"// // AWS MSK ingestion settings.// clusterARN := "cluster-arn"// mskTopic := "msk-topic"// awsRoleARN := "aws-role-arn"// gcpSA := "gcp-service-account"ctx:=context.Background()client,err:=pubsub.NewClient(ctx,projectID)iferr!=nil{returnfmt.Errorf("pubsub.NewClient: %w",err)}deferclient.Close()topicpb:=&pubsubpb.Topic{Name:fmt.Sprintf("projects/%s/topics/%s",projectID,topicID),IngestionDataSourceSettings:&pubsubpb.IngestionDataSourceSettings{Source:&pubsubpb.IngestionDataSourceSettings_AwsMsk_{AwsMsk:&pubsubpb.IngestionDataSourceSettings_AwsMsk{ClusterArn:clusterARN,Topic:mskTopic,AwsRoleArn:awsRoleARN,GcpServiceAccount:gcpSA,},},},}topic,err:=client.TopicAdminClient.CreateTopic(ctx,topicpb)iferr!=nil{returnfmt.Errorf("CreateTopic: %w",err)}fmt.Fprintf(w,"Created topic with AWS MSK ingestion settings: %v\n",topic)returnnil}Java
Before trying this sample, follow the Java setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Java API reference documentation.
importcom.google.cloud.pubsub.v1.TopicAdminClient;importcom.google.pubsub.v1.IngestionDataSourceSettings;importcom.google.pubsub.v1.Topic;importcom.google.pubsub.v1.TopicName;importjava.io.IOException;publicclassCreateTopicWithAwsMskIngestionExample{publicstaticvoidmain(String...args)throwsException{// TODO(developer): Replace these variables before running the sample.StringprojectId="your-project-id";StringtopicId="your-topic-id";// AWS MSK ingestion settings.StringclusterArn="cluster-arn";StringmskTopic="msk-topic";StringawsRoleArn="aws-role-arn";StringgcpServiceAccount="gcp-service-account";createTopicWithAwsMskIngestionExample(projectId,topicId,clusterArn,mskTopic,awsRoleArn,gcpServiceAccount);}publicstaticvoidcreateTopicWithAwsMskIngestionExample(StringprojectId,StringtopicId,StringclusterArn,StringmskTopic,StringawsRoleArn,StringgcpServiceAccount)throwsIOException{try(TopicAdminClienttopicAdminClient=TopicAdminClient.create()){TopicNametopicName=TopicName.of(projectId,topicId);IngestionDataSourceSettings.AwsMskawsMsk=IngestionDataSourceSettings.AwsMsk.newBuilder().setClusterArn(clusterArn).setTopic(mskTopic).setAwsRoleArn(awsRoleArn).setGcpServiceAccount(gcpServiceAccount).build();IngestionDataSourceSettingsingestionDataSourceSettings=IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();Topictopic=topicAdminClient.createTopic(Topic.newBuilder().setName(topicName.toString()).setIngestionDataSourceSettings(ingestionDataSourceSettings).build());System.out.println("Created topic with AWS MSK ingestion settings: "+topic.getAllFields());}}}Node.js
Before trying this sample, follow the Node.js setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Node.js API reference documentation.
/***TODO(developer):Uncommentthesevariablesbeforerunningthesample.*///consttopicNameOrId='YOUR_TOPIC_NAME_OR_ID';//constclusterArn='arn:aws:kafka:...';//constmskTopic='YOUR_MSK_TOPIC';//constroleArn='arn:aws:iam:...';//constgcpServiceAccount='ingestion-account@...';//ImportstheGoogleCloudclientlibraryconst{PubSub}=require('@google-cloud/pubsub');//Createsaclient;cachethisforfurtheruseconstpubSubClient=newPubSub();asyncfunctioncreateTopicWithAwsMskIngestion(topicNameOrId,clusterArn,mskTopic,awsRoleArn,gcpServiceAccount,){//CreatesanewtopicwithAWSMSKingestion.awaitpubSubClient.createTopic({name:topicNameOrId,ingestionDataSourceSettings:{awsMsk:{clusterArn,topic:mskTopic,awsRoleArn,gcpServiceAccount,},},});console.log(`Topic${topicNameOrId}createdwithAWSMSKingestion.`);}Node.ts
Before trying this sample, follow the Node.js setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Node.js API reference documentation.
/***TODO(developer):Uncommentthesevariablesbeforerunningthesample.*///consttopicNameOrId='YOUR_TOPIC_NAME_OR_ID';//constclusterArn='arn:aws:kafka:...';//constmskTopic='YOUR_MSK_TOPIC';//constroleArn='arn:aws:iam:...';//constgcpServiceAccount='ingestion-account@...';//ImportstheGoogleCloudclientlibraryimport{PubSub}from'@google-cloud/pubsub';//Createsaclient;cachethisforfurtheruseconstpubSubClient=newPubSub();asyncfunctioncreateTopicWithAwsMskIngestion(topicNameOrId:string,clusterArn:string,mskTopic:string,awsRoleArn:string,gcpServiceAccount:string,){//CreatesanewtopicwithAWSMSKingestion.awaitpubSubClient.createTopic({name:topicNameOrId,ingestionDataSourceSettings:{awsMsk:{clusterArn,topic:mskTopic,awsRoleArn,gcpServiceAccount,},},});console.log(`Topic${topicNameOrId}createdwithAWSMSKingestion.`);}Python
Before trying this sample, follow the Python setup instructions inQuickstart: Using Client Libraries. For more information, see thePub/Sub Python API reference documentation.
fromgoogle.cloudimportpubsub_v1fromgoogle.pubsub_v1.typesimportTopicfromgoogle.pubsub_v1.typesimportIngestionDataSourceSettings# TODO(developer)# project_id = "your-project-id"# topic_id = "your-topic-id"# cluster_arn = "your-cluster-arn"# msk_topic = "your-msk-topic"# aws_role_arn = "your-aws-role-arn"# gcp_service_account = "your-gcp-service-account"publisher=pubsub_v1.PublisherClient()topic_path=publisher.topic_path(project_id,topic_id)request=Topic(name=topic_path,ingestion_data_source_settings=IngestionDataSourceSettings(aws_msk=IngestionDataSourceSettings.AwsMsk(cluster_arn=cluster_arn,topic=msk_topic,aws_role_arn=aws_role_arn,gcp_service_account=gcp_service_account,)),)topic=publisher.create_topic(request=request)print(f"Created topic:{topic.name} with AWS MSK Ingestion Settings")For more information about ARNs, seeAmazon Resource Names (ARNs) andIAM Identifiers.
If you run into issues, seeTroubleshooting an Amazon MSKimport topic.
Edit Amazon MSK import topics
To edit the ingestion data source settings of an Amazon MSKimport topic, follow these steps:
Console
In the Google Cloud console, go to theTopics page.
Click the Amazon MSK import topic.
In the topic details page, clickEdit.
Update the fields that you want to change.
ClickUpdate.
gcloud
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, aCloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Run the
gcloud pubsub topics updatecommand with all theflags mentioned in the following sample:gcloudpubsubtopicsupdateTOPIC_ID\--aws-msk-ingestion-cluster-arnMSK_CLUSTER_ARN\--aws-msk-ingestion-topicMSK_TOPIC\--aws-msk-ingestion-aws-role-arnMSK_ROLE_ARN\--aws-msk-ingestion-service-accountPUBSUB_SERVICE_ACCOUNT
Replace the following:
- TOPIC_ID: the name or ID of your Pub/Sub topic.
- MSK_CLUSTER_ARN: the ARN for the Amazon MSKcluster that you are ingesting into Pub/Sub. The ARN formatis as follows:
arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}. - MSK_TOPIC: the name of the Amazon MSK Kafka topic that you are ingesting into Pub/Sub.
- MSK_ROLE_ARN: the ARN of the AWS role. The ARN formatof the role is as follows:
arn:aws:iam::${Account}:role/${RoleName}. - PUBSUB_SERVICE_ACCOUNT: the service account that you created inCreate a service account in Google Cloud.
Quotas and limits
The publisher throughput for import topics is bound by the publish quotaof the topic. For more information, seePub/Sub quotas and limits.
What's next
Choose thetype of subscription for your topic.
Learn how topublish a message to a topic.
Create or modify a topic withgcloud CLI,REST APIs, orClient libraries.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.