Change topic type

You can convert an import topic to a standard topic or, conversely, a standard topic to an import topic.

Convert an import topic to a standard topic

To convert an import topic to a standard topic, clear the ingestion settings. Perform the following steps:

Console

  1. In the Google Cloud console, go to the Topics page.

    Go to Topics

  2. Click the import topic.

  3. In the topic details page, click Edit.

  4. Clear the option Enable ingestion.

  5. Click Update.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud pubsub topics update command:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Replace TOPIC_ID with the topic ID.
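
    For example, with a hypothetical topic ID of my-import-topic, the invocation would look like this:

    gcloud pubsub topics update my-import-topic \
        --clear-ingestion-data-source-settings

    You can then confirm that the ingestion settings are cleared by inspecting the topic with gcloud pubsub topics describe my-import-topic.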

Convert a standard topic to an Amazon Kinesis Data Streams import topic

To convert a standard topic to an Amazon Kinesis Data Streams import topic, first check that you meet all the prerequisites.

Console

  1. In the Google Cloud console, go to the Topics page.

    Go to Topics

  2. Click the topic that you want to convert to an import topic.

  3. In the topic details page, click Edit.

  4. Select the option Enable ingestion.

  5. For ingestion source, select Amazon Kinesis Data Streams.

  6. Enter the following details (hypothetical sample values are shown after these steps):

    • Kinesis Stream ARN: The ARN for the Kinesis Data Stream that you are planning to ingest into Pub/Sub. The ARN format is as follows: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • Kinesis Consumer ARN: The ARN of the consumer resource that is registered to the AWS Kinesis Data Stream. The ARN format is as follows: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

    • AWS Role ARN: The ARN of the AWS role. The ARN format of the role is as follows: arn:aws:iam::${Account}:role/${RoleName}.

    • Service account: The service account that you created.

  7. Click Update.
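
For illustration only, hypothetical values that match these ARN formats might look like the following. The account ID, region, and resource names are invented; substitute your own:

    # Kinesis Stream ARN (hypothetical)
    arn:aws:kinesis:us-east-1:111111111111:stream/my-stream
    # Kinesis Consumer ARN (hypothetical)
    arn:aws:kinesis:us-east-1:111111111111:stream/my-stream/consumer/my-consumer:1718000000
    # AWS Role ARN (hypothetical)
    arn:aws:iam::111111111111:role/pubsub-kinesis-ingestion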

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud pubsub topics update command with all the flags mentioned in the following sample:

    gcloud pubsub topics update TOPIC_ID \
        --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN \
        --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN \
        --kinesis-ingestion-role-arn KINESIS_ROLE_ARN \
        --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Replace the following:

    • TOPIC_ID is the topic ID or name. This field cannot be updated.

    • KINESIS_STREAM_ARN is the ARN for the Kinesis Data Stream that you are planning to ingest into Pub/Sub. The ARN format is as follows: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN is the ARN of the consumer resource that is registered to the AWS Kinesis Data Stream. The ARN format is as follows: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

    • KINESIS_ROLE_ARN is the ARN of the AWS role. The ARN format of the role is as follows: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT is the service account that you created.
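
    As a sketch only, an invocation with the hypothetical values from the console section might look like this (all AWS resource names and the service account are invented):

    gcloud pubsub topics update my-import-topic \
        --kinesis-ingestion-stream-arn arn:aws:kinesis:us-east-1:111111111111:stream/my-stream \
        --kinesis-ingestion-consumer-arn arn:aws:kinesis:us-east-1:111111111111:stream/my-stream/consumer/my-consumer:1718000000 \
        --kinesis-ingestion-role-arn arn:aws:iam::111111111111:role/pubsub-kinesis-ingestion \
        --kinesis-ingestion-service-account ingestion-sa@my-project.iam.gserviceaccount.com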

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. To see a list of v1 code samples, see the deprecated code samples.

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import("context""fmt""io""cloud.google.com/go/pubsub/v2""cloud.google.com/go/pubsub/v2/apiv1/pubsubpb""google.golang.org/protobuf/types/known/fieldmaskpb")funcupdateTopicType(wio.Writer,projectID,topicstring)error{// projectID := "my-project-id"// topic := "projects/my-project-id/topics/my-topic"streamARN:="stream-arn"consumerARN:="consumer-arn"awsRoleARN:="aws-role-arn"gcpServiceAccount:="gcp-service-account"ctx:=context.Background()client,err:=pubsub.NewClient(ctx,projectID)iferr!=nil{returnfmt.Errorf("pubsub.NewClient: %w",err)}deferclient.Close()pbTopic:=&pubsubpb.Topic{Name:topic,IngestionDataSourceSettings:&pubsubpb.IngestionDataSourceSettings{Source:&pubsubpb.IngestionDataSourceSettings_AwsKinesis_{AwsKinesis:&pubsubpb.IngestionDataSourceSettings_AwsKinesis{StreamArn:streamARN,ConsumerArn:consumerARN,AwsRoleArn:awsRoleARN,GcpServiceAccount:gcpServiceAccount,},},},}updateReq:=&pubsubpb.UpdateTopicRequest{Topic:pbTopic,UpdateMask:&fieldmaskpb.FieldMask{Paths:[]string{"ingestion_data_source_settings"},},}topicCfg,err:=client.TopicAdminClient.UpdateTopic(ctx,updateReq)iferr!=nil{returnfmt.Errorf("topic.Update: %w",err)}fmt.Fprintf(w,"Topic updated with kinesis source: %v\n",topicCfg)returnnil}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);
      System.out.println("Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;
  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() = "ingestion_data_source_settings";
  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();
  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

For more information about ARNs, see Amazon Resource Names (ARNs) and IAM Identifiers.

Convert a standard topic to a Cloud Storage import topic

To convert a standard topic to a Cloud Storage import topic, first check that you meet all the prerequisites.

Console

  1. In the Google Cloud console, go to the Topics page.

    Go to Topics

  2. Click the topic that you want to convert to a Cloud Storage import topic.

  3. In the topic details page, click Edit.

  4. Select the option Enable ingestion.

  5. For ingestion source, select Google Cloud Storage.

  6. For the Cloud Storage bucket, click Browse.

    The Select bucket page opens. Select one of the following options:

    • Select an existing bucket from any appropriate project.

    • Click the create icon and follow the instructions on the screen to create a new bucket. After you create the bucket, select the bucket for the Cloud Storage import topic.

  7. When you specify the bucket, Pub/Sub checks that the Pub/Sub service account has the appropriate permissions on the bucket. If there are permission issues, you see an error message related to the permissions.

    If you get permission issues, click Set permissions. For more information, see Grant Cloud Storage permissions to the Pub/Sub service account.

  8. For Object format, select Text, Avro, or Pub/Sub Avro.

    If you select Text, you can optionally specify a Delimiter with which to split objects into messages.

    For more information about these options, see Input format.

  9. Optional. You can specify a Minimum object creation time for your topic. If set, only objects created after the minimum object creation time are ingested.

    For more information, see Minimum object creation time.

  10. You must specify a Glob pattern. To ingest all objects in the bucket, use ** as the glob pattern. Only objects that match the given pattern are ingested.

    For more information, see Match a glob pattern.

  11. Retain the other default settings.
  12. Click Update topic.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. To avoid losing your settings for the import topic, make sure to include all of them every time you update the topic. If you leave something out, Pub/Sub resets the setting to its original default value.

    Run the gcloud pubsub topics update command with all the flags mentioned in the following sample:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME \
        --cloud-storage-ingestion-input-format=INPUT_FORMAT \
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Replace the following:

    • TOPIC_ID is the topic ID or name. This field cannot be updated.

    • BUCKET_NAME: Specifies the name of an existing bucket. For example, prod_bucket. The bucket name must not include the project ID. To create a bucket, see Create buckets.

    • INPUT_FORMAT: Specifies the format of the objects that are ingested. This can be text, avro, or pubsub_avro. For more information about these options, see Input format.

    • TEXT_DELIMITER: Specifies the delimiter with which to split text objects into Pub/Sub messages. This must be a single character and must only be set when INPUT_FORMAT is text. It defaults to the newline character (\n).

      When using the gcloud CLI to specify the delimiter, pay close attention to the handling of special characters like newline \n. Use the format '\n' to ensure the delimiter is correctly interpreted. Simply using \n without quotes or escaping results in a delimiter of "n".

    • MINIMUM_OBJECT_CREATE_TIME: Specifies the minimum time at which an object must have been created in order for it to be ingested. This should be in UTC in the format YYYY-MM-DDThh:mm:ssZ. For example, 2024-10-14T08:30:30Z.

      Any date, past or future, from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59Z inclusive, is valid.

    • MATCH_GLOB: Specifies the glob pattern that an object must match in order to be ingested. When you are using the gcloud CLI, a match glob with * characters must have the * character escaped in the form \*\*.txt, or the whole match glob must be in quotes: "**.txt" or '**.txt'. For information on supported syntax for glob patterns, see the Cloud Storage documentation.
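
    For illustration, a hypothetical invocation that ingests newline-delimited text objects matching **.txt might look like the following. The topic name is invented, and the example shows the quoting needed for both the delimiter and the glob:

    gcloud pubsub topics update my-import-topic \
        --cloud-storage-ingestion-bucket=prod_bucket \
        --cloud-storage-ingestion-input-format=text \
        --cloud-storage-ingestion-text-delimiter='\n' \
        --cloud-storage-ingestion-minimum-object-create-time=2024-10-14T08:30:30Z \
        --cloud-storage-ingestion-match-glob='**.txt'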
