Write Pub/Sub Lite messages by using Apache Spark
- Current customers: Pub/Sub Lite remains functional until March 18, 2026. If you have not used Pub/Sub Lite within the 90-day period preceding July 15, 2025 (April 15, 2025 - July 15, 2025), you won't be able to access Pub/Sub Lite starting on July 15, 2025.
- New customers: Pub/Sub Lite is no longer available for new customers after September 24, 2024.
You can migrate your Pub/Sub Lite service to Google Cloud Managed Service for Apache Kafka or Pub/Sub.
The Pub/Sub Lite Spark Connector is an open-source Java client library that supports the use of Pub/Sub Lite as an input and output source for Apache Spark Structured Streaming. The connector works in all Apache Spark distributions, including Dataproc.
This quickstart shows you how to:
- read messages from Pub/Sub Lite
- write messages to Pub/Sub Lite
using PySpark from a Dataproc Spark cluster.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
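For example, an organization administrator could grant the Project Creator role with a command along the following lines. This is only a sketch: ORGANIZATION_ID and USER_EMAIL are placeholders, and your organization might manage project creation differently.
# Grant the Project Creator role at the organization level (sketch with placeholders).
gcloud organizations add-iam-policy-binding ORGANIZATION_ID \
  --member="user:USER_EMAIL" \
  --role="roles/resourcemanager.projectCreator"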
Verify that billing is enabled for your Google Cloud project.
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, and Logging APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
Install the Google Cloud CLI.
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
To initialize the gcloud CLI, run the following command:
gcloud init
Set up
Create variables for your project.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \
  --filter="projectId:$PROJECT_ID" \
  --format="value(PROJECT_NUMBER)")
Create a Cloud Storage bucket. Cloud Storage bucket names must be globally unique.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Create a Pub/Sub Lite topic and subscription in a supported location. See Create a topic if you use a Pub/Sub Lite reservation.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \
  --location=$PUBSUBLITE_LOCATION \
  --partitions=2 \
  --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
  --location=$PUBSUBLITE_LOCATION \
  --topic=$TOPIC
Create a Dataproc cluster.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \
  --region $DATAPROC_REGION \
  --image-version 2.1 \
  --scopes 'https://www.googleapis.com/auth/cloud-platform' \
  --enable-component-gateway \
  --bucket $BUCKET
- --region: a supported Dataproc region where your Pub/Sub Lite topic and subscription reside.
- --image-version: the cluster's image version, which determines the Apache Spark version installed on the cluster. Choose 2.x.x image release versions because the Pub/Sub Lite Spark Connector currently supports Apache Spark 3.x.x.
- --scopes: enable API access to Google Cloud services in the same project.
- --enable-component-gateway: enable access to the Apache Spark web UI.
- --bucket: a staging Cloud Storage bucket used to store cluster job dependencies, driver output, and cluster config files.
Clone the quickstart repository and navigate to the sample code directory:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Writing to Pub/Sub Lite
The following example will:
- create a rate source that generates consecutive numbers and timestamps formatted as spark.sql.Row
- transform the data to match the table schema required by the Pub/Sub Lite Spark Connector's writeStream API
- write the data to an existing Pub/Sub Lite topic
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()
To submit the write job to Dataproc:
Console
- Upload the PySpark script to your Cloud Storage bucket.
- Go to the Cloud Storage console.
- Select your bucket.
- Use Upload files to upload the PySpark script that you intend to use.
- Submit the job to your Dataproc cluster:
- Go to the Dataproc console.
- Navigate to jobs.
- ClickSubmit job.
- Fill in the job details.
- Under Cluster, choose your cluster.
- Under Job, enter an ID for the job.
- For Job type, choose PySpark.
- For Main python file, provide the Cloud Storage URI of the uploaded PySpark script that starts with gs://.
- For Jar files, choose the latest Spark connector version from Maven, look for the jar with dependencies in the download options, and copy its link.
- For Arguments, if you use the full PySpark script from GitHub, enter --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID; if you copy the PySpark script above with the to-do's completed, leave it blank.
- Under Properties, enter key spark.master and value yarn.
- Click Submit.
gcloud
Use the gcloud dataproc jobs submit pyspark command to submit the job to Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
  --region=$DATAPROC_REGION \
  --cluster=$CLUSTER_ID \
  --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
  --driver-log-levels=root=INFO \
  --properties=spark.master=yarn \
  -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
- --region: the pre-selected Dataproc region.
- --cluster: the Dataproc cluster name.
- --jars: the Pub/Sub Lite Spark Connector's uber jar with dependencies in a public Cloud Storage bucket. You can also visit this link to download the uber jar with dependencies from Maven.
- --driver-log-levels: set the logging level to INFO at the root level.
- --properties: use the YARN resource manager for the Spark master.
- --: provide the arguments required by the script.
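If you want to check on the submitted job from the command line instead of the console, something like the following should work. This is only a sketch: JOB_ID is a placeholder for the ID shown by the list command.
# List jobs submitted to the cluster to find the job ID.
gcloud dataproc jobs list --region=$DATAPROC_REGION --cluster=$CLUSTER_ID
# Re-attach to a job and stream its driver output (replace JOB_ID).
gcloud dataproc jobs wait JOB_ID --region=$DATAPROC_REGION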
If the writeStream operation succeeds, you should see log messages like the following locally as well as in the job details page in the Google Cloud console:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Reading from Pub/Sub Lite
The following example reads messages from an existing Pub/Sub Lite subscription using the readStream API. The connector outputs messages that conform to the fixed table schema, formatted as spark.sql.Row.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
To submit the read job to Dataproc:
Console
- Upload the PySpark script to your Cloud Storage bucket.
- Go to the Cloud Storage console.
- Select your bucket.
- Use Upload files to upload the PySpark script that you intend to use.
- Submit the job to your Dataproc cluster:
- Go to the Dataproc console.
- Navigate to jobs.
- ClickSubmit job.
- Fill in the job details.
- Under Cluster, choose your cluster.
- Under Job, enter an ID for the job.
- For Job type, choose PySpark.
- For Main python file, provide the Cloud Storage URI of the uploaded PySpark script that starts with gs://.
- For Jar files, choose the latest Spark connector version from Maven, look for the jar with dependencies in the download options, and copy its link.
- For Arguments, if you use the full PySpark script from GitHub, enter --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID; if you copy the PySpark script above with the to-do's completed, leave it blank.
- Under Properties, enter key spark.master and value yarn.
- Click Submit.
gcloud
Use the gcloud dataproc jobs submit pyspark command again to submit the job to Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_from_pubsublite_example.py \
  --region=$DATAPROC_REGION \
  --cluster=$CLUSTER_ID \
  --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
  --driver-log-levels=root=INFO \
  --properties=spark.master=yarn \
  -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
- --region: the pre-selected Dataproc region.
- --cluster: the Dataproc cluster name.
- --jars: the Pub/Sub Lite Spark Connector's uber jar with dependencies in a public Cloud Storage bucket. You can also visit this link to download the uber jar with dependencies from Maven.
- --driver-log-levels: set the logging level to INFO at the root level.
- --properties: use the YARN resource manager for the Spark master.
- --: provide the arguments required by the script.
If the readStream operation succeeds, you should see log messages like the following locally as well as in the job details page in the Google Cloud console:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
Replay and purge messages from Pub/Sub Lite
Seek operations don't work when reading from Pub/Sub Lite using the Pub/Sub Lite Spark Connector because Apache Spark systems perform their own tracking of offsets within partitions. The workaround is to drain, seek, and restart the workflows.
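For example, after draining the workflow, the seek step might look like the following with the gcloud CLI. This is a sketch rather than part of the original sample: it assumes the --publish-time flag of gcloud pubsub lite-subscriptions seek, reuses the variables from the Set up section, and seeks to the Unix epoch as recommended in the note that follows.
# Seek the Lite subscription to the publish time of the Unix epoch to replay the backlog.
gcloud pubsub lite-subscriptions seek $SUBSCRIPTION \
  --location=$PUBSUBLITE_LOCATION \
  --publish-time=1970-01-01T00:00:00Z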
Note: Due to limitations in Apache Spark's processing model, don't attempt to seek to, or create a subscription at, the beginning of the backlog when using the Pub/Sub Lite Spark Connector to read from Pub/Sub Lite. Instead, seek to the publish timestamp of the Unix epoch to replay all messages from the backlog.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
Delete the topic and subscription.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Delete the Dataproc cluster.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Remove the Cloud Storage bucket.
gcloud storage rm gs://$BUCKET
What's next
Check out the word count example in Java for the Pub/Sub Lite Spark Connector.
Learn how to access the Dataproc job driver output.
Other Spark connectors for Google Cloud products: BigQuery connector, Bigtable connector, Cloud Storage connector.