Use the Bigtable Spark connector

The Bigtable Spark connector lets you read and write data to and from Bigtable. You can read data from your Spark application using Spark SQL and DataFrames. The following Bigtable operations are supported using the Bigtable Spark connector:

  • Write data
  • Read data
  • Create a new table

This document shows you how to convert a Spark SQL DataFrames table to a Bigtable table, and then compile and create a JAR file to submit a Spark job.

Spark and Scala support status

The Bigtable Spark connector supports the following Scala versions: 2.12 and 2.13.

The Bigtable Spark connector supports the following Spark versions:

The Bigtable Spark connector supports the following Dataproc versions:

Calculate costs

If you decide to use any of the following billable components of Google Cloud, you are billed for the resources that you use:

  • Bigtable (You are not charged for using the Bigtable emulator)
  • Dataproc
  • Cloud Storage

Dataproc pricing applies to the use of Dataproc on Compute Engine clusters. Dataproc Serverless pricing applies to workloads and sessions run on Dataproc Serverless for Spark.

To generate a cost estimate based on your projected usage, use the pricing calculator.

Before you begin

Complete the following prerequisites before using the Bigtable Spark connector.

Required roles

To get the permissions that you need to use the Bigtable Spark connector, ask your administrator to grant you the following IAM roles on your project:

  • Bigtable Administrator (roles/bigtable.admin) (Optional): lets you read or write data and create a new table.
  • Bigtable User (roles/bigtable.user): lets you read or write data, but doesn't let you create a new table.

For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

Note: The permissions associated with the bigtable.admin role are the highest level of permissions. Use other Bigtable predefined roles based on the level of access you want to grant.

If you are using Dataproc or Cloud Storage, additional permissions might be required. For more information, see Dataproc permissions and Cloud Storage permissions.

Set up Spark

Apart from creating a Bigtable instance, you also need to set up your Spark instance. You can do so locally or select either of these options to use Spark with Dataproc:

  • Dataproc cluster
  • Dataproc Serverless

For more information about choosing between a Dataproc cluster or serverless option, see the Dataproc Serverless for Spark compared to Dataproc on Compute Engine documentation.

Download the connector JAR file

You can find the Bigtable Spark connector source code with examples in the Bigtable Spark connector GitHub repository.

Based on your Spark setup, you can access the JAR file as follows:

  • If you are running PySpark locally, download the connector's JAR file from the gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage location (see the sketch after this list).

    Replace SCALA_VERSION with 2.12 or 2.13, which are the only supported Scala versions, and replace CONNECTOR_VERSION with the connector version that you want to use.

  • For the Dataproc cluster or serverless option, use the latest JAR file as an artifact that can be added to your Scala or Java Spark applications. For more information about using the JAR file as an artifact, see Manage dependencies.

  • If you are submitting your PySpark job to Dataproc, use the gcloud dataproc jobs submit pyspark --jars flag to set the URI to the JAR file location in Cloud Storage, for example, gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.
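As a minimal sketch of the local PySpark setup described in the first item of this list, the following example creates a Spark session that loads the downloaded connector JAR. The JAR path and the Scala and connector versions shown are placeholders, not values defined in this guide; replace them with your own download location.

from pyspark.sql import SparkSession

# Hypothetical local path to the downloaded connector JAR; replace with your own.
connector_jar = '/path/to/spark-bigtable_2.12-0.1.0.jar'

spark = SparkSession.builder \
    .appName('bigtable-local-example') \
    .config('spark.jars', connector_jar) \
    .getOrCreate()

# The session can now read and write with .format('bigtable').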

Determine compute type

For read-only jobs, you can use Data Boost serverless compute, which lets you avoid impacting your application-serving clusters. Your Spark application must use version 1.1.0 or later of the Spark connector to use Data Boost.

To use Data Boost, you must create a Data Boost app profile and then provide the app profile ID for the spark.bigtable.app_profile.id Spark option when you add your Bigtable configuration to your Spark application. If you've already created an app profile for your Spark read jobs and you want to continue using it without changing your application code, you can convert the app profile to a Data Boost app profile. For more information, see Convert an app profile.

For more information, see the Bigtable Data Boost overview.

For jobs that involve reads and writes, you can use your instance's cluster nodes for compute by specifying a standard app profile with your request.

Identify or create an app profile to use

If you don't specify an app profile ID, the connector uses the default app profile.

We recommend that you use a unique app profile for each application that you run, including your Spark application. For more information about app profile types and settings, see the App profiles overview. For instructions, see Create and configure app profiles.
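For illustration, the following PySpark sketch passes an app profile ID, such as a Data Boost app profile for a read-only job, through the spark.bigtable.app_profile.id option. It assumes that a spark session and a catalog string already exist, and the profile, project, and instance IDs shown are hypothetical placeholders.

# Minimal sketch: read with a specific app profile, for example a Data Boost profile.
# All IDs below are hypothetical placeholders.
records = spark.read \
    .format('bigtable') \
    .option('catalog', catalog) \
    .option('spark.bigtable.project.id', 'my-project') \
    .option('spark.bigtable.instance.id', 'my-instance') \
    .option('spark.bigtable.app_profile.id', 'my-data-boost-profile') \
    .load()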

Add Bigtable configuration to your Spark application

In your Spark application, add the Spark options that let you interact with Bigtable.

Supported Spark options

Use the Spark options that are available as part of the com.google.cloud.spark.bigtable package.

  • spark.bigtable.project.id (required): Set the Bigtable project ID.
  • spark.bigtable.instance.id (required): Set the Bigtable instance ID.
  • catalog (required): Set the JSON format that specifies the conversion format between the DataFrame's SQL-like schema and the Bigtable table's schema. See Create table metadata in JSON format for more information.
  • spark.bigtable.app_profile.id (optional; default: default): Set the Bigtable app profile ID.
  • spark.bigtable.write.timestamp.milliseconds (optional; default: current system time): Set the timestamp in milliseconds to use when writing a DataFrame to Bigtable. Because all rows in the DataFrame use the same timestamp, rows with the same row key column in the DataFrame persist as a single version in Bigtable, since they share the same timestamp (see the sketch after this list).
  • spark.bigtable.create.new.table (optional; default: false): Set to true to create a new table before writing to Bigtable.
  • spark.bigtable.read.timerange.start.milliseconds and spark.bigtable.read.timerange.end.milliseconds (optional; no default): Set timestamps (in milliseconds since epoch time) to filter cells with a specific start and end date, respectively.
  • spark.bigtable.push.down.row.key.filters (optional; default: true): Set to true to allow simple row key filtering on the server side. Filtering on compound row keys is implemented on the client side. See Read a specific DataFrame row using a filter for more information.
  • spark.bigtable.read.rows.attempt.timeout.milliseconds (optional; default: 30m): Set the timeout duration for a read rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.read.rows.total.timeout.milliseconds (optional; default: 12h): Set the total timeout duration for a read rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.mutate.rows.attempt.timeout.milliseconds (optional; default: 1m): Set the timeout duration for a mutate rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.mutate.rows.total.timeout.milliseconds (optional; default: 10m): Set the total timeout duration for a mutate rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.batch.mutate.size (optional; default: 100): Set the number of mutations in each batch. The maximum value you can set is 100000.
  • spark.bigtable.enable.batch_mutate.flow_control (optional; default: false): Set to true to enable flow control for batch mutations.
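The following sketch shows how a few of these options can be combined. It is a minimal example, assuming that spark, catalog, and input_data are defined as in the write and read examples later in this document, and that the project and instance IDs are placeholders you replace with your own.

import time

# Hypothetical write: pin all cells to one explicit timestamp (in milliseconds).
write_ts = int(time.time() * 1000)
input_data.write \
    .format('bigtable') \
    .option('catalog', catalog) \
    .option('spark.bigtable.project.id', 'my-project') \
    .option('spark.bigtable.instance.id', 'my-instance') \
    .option('spark.bigtable.write.timestamp.milliseconds', str(write_ts)) \
    .option('spark.bigtable.create.new.table', 'true') \
    .save()

# Hypothetical read: only return cells whose timestamps fall inside the given range.
records = spark.read \
    .format('bigtable') \
    .option('catalog', catalog) \
    .option('spark.bigtable.project.id', 'my-project') \
    .option('spark.bigtable.instance.id', 'my-instance') \
    .option('spark.bigtable.read.timerange.start.milliseconds', str(write_ts)) \
    .option('spark.bigtable.read.timerange.end.milliseconds', str(write_ts + 1)) \
    .load()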

Create table metadata in JSON format

The Spark SQL DataFrames table format must be converted into a Bigtable table using a string in JSON format. This JSON string makes the data format compatible with Bigtable. You can pass the JSON format in your application code using the .option("catalog", catalog_json_string) option.

As an example, consider the following DataFrame table and the corresponding Bigtable table.

In this example, the name and birthYear columns in the DataFrame are grouped together under the info column family and renamed to name and birth_year, respectively. Similarly, the address column is stored under the location column family with the same column name. The id column from the DataFrame is converted to the Bigtable row key.

Row keys don't have a dedicated column name in Bigtable. In this example, id_rowkey is used only to indicate to the connector that this is the row key column. You can use any name for the row key column; make sure that you use the same name when you declare the "rowkey":"column_name" field in the JSON format.

DataFrame columns: id, name, birthYear, address

Bigtable table t1:

  • Row key: id_rowkey
  • Column family info: columns name and birth_year
  • Column family location: column address

The JSON format for the catalog is as follows:

"""    {      "table": {"name": "t1"},      "rowkey": "id_rowkey",      "columns": {        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},        "name": {"cf": "info", "col": "name", "type": "string"},        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},        "address": {"cf": "location", "col": "address", "type": "string"}      }    }    """

The keys and values used in the JSON format are the following:

  • table: The name of the Bigtable table. If the table does not exist, use .option("spark.bigtable.create.new.table", "true") to create it. JSON format: "table":{"name":"t1"}
  • rowkey: The name of the column that is used as the Bigtable row key. Ensure that the column name of the DataFrame column is used as the row key, for example, id_rowkey. Compound keys are also accepted as row keys, for example, "rowkey":"name:address" (see the sketch after this list). This approach might result in row keys that require a full table scan for all read requests. JSON format: "rowkey":"id_rowkey"
  • columns: The mapping of each DataFrame column to the corresponding Bigtable column family ("cf") and column name ("col"). The column name can be different from the column name in the DataFrame table. Supported data types include string, long, and binary. JSON format: "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf": "info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type": "string"}}. In this example, id_rowkey is the row key, and info and location are the column families.
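The following sketch, referenced from the rowkey entry in the list above, shows one way a catalog with a compound row key might look. The table name t2 and the choice of key columns are hypothetical, and the exact compound-key mapping is an assumption based on the conventions described in this document; reads that don't constrain the full key can trigger a full table scan.

# Hypothetical catalog with a compound row key built from the name and address columns.
compound_key_catalog = """{
  "table": {"name": "t2"},
  "rowkey": "name:address",
  "columns": {
    "name": {"cf": "rowkey", "col": "name", "type": "string"},
    "address": {"cf": "rowkey", "col": "address", "type": "string"},
    "birthYear": {"cf": "info", "col": "birth_year", "type": "long"}
  }
}"""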

Supported data types

The connector supports using string, long, and binary (byte array) types in the catalog. Until support for other types such as int and float is added, you can manually convert such data types to byte arrays (Spark SQL's BinaryType) before using the connector to write them to Bigtable.
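For example, one possible workaround, sketched below, is to pack integer values into byte arrays with a UDF before writing. The DataFrame df and its score column are hypothetical; the resulting column can then be declared with "type": "binary" in the catalog.

import struct

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

# Hypothetical helper: pack a 32-bit int into a big-endian byte array.
int_to_bytes = udf(
    lambda v: None if v is None else bytearray(struct.pack('>i', v)),
    BinaryType())

# 'df' and its 'score' column are placeholders for your own DataFrame.
df_with_binary = df.withColumn('score_binary', int_to_bytes('score'))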

In addition, you can use Avro to serialize complex types, such as ArrayType. For more information, see Serialize complex data types using Apache Avro.

Write to Bigtable

Use the .write() function and the supported options to write your data to Bigtable.

Java

The following code from the GitHub repository uses Java and Maven to write to Bigtable.

String catalog =
    "{"
        + "\"table\":{\"name\":\"" + tableName + "\","
        + "\"tableCoder\":\"PrimitiveType\"},"
        + "\"rowkey\":\"wordCol\","
        + "\"columns\":{"
        + "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"},"
        + "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}"
        + "}}".replaceAll("\\s+", "");

private static void writeDataframeToBigtable(
    Dataset<Row> dataframe, String catalog, String createNewTable) {
  dataframe
      .write()
      .format("bigtable")
      .option("catalog", catalog)
      .option("spark.bigtable.project.id", projectId)
      .option("spark.bigtable.instance.id", instanceId)
      .option("spark.bigtable.create.new.table", createNewTable)
      .save();
}

Python

The following code from the GitHub repository uses Python to write to Bigtable.

catalog = ''.join(("""{
      "table":{"name":" """ + bigtable_table_name + """
      ", "tableCoder":"PrimitiveType"},
      "rowkey":"wordCol",
      "columns":{
        "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
        "count":{"cf":"example_family", "col":"countCol", "type":"long"}
      }
      }""").split())

input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()

input_data.write \
    .format('bigtable') \
    .options(catalog=catalog) \
    .option('spark.bigtable.project.id', bigtable_project_id) \
    .option('spark.bigtable.instance.id', bigtable_instance_id) \
    .option('spark.bigtable.create.new.table', create_new_table) \
    .save()
print('DataFrame was written to Bigtable.')

Read from Bigtable

Use the .read() function to check whether the table was successfully imported into Bigtable.

Java

private static Dataset<Row> readDataframeFromBigtable(String catalog) {
  Dataset<Row> dataframe = spark
      .read()
      .format("bigtable")
      .option("catalog", catalog)
      .option("spark.bigtable.project.id", projectId)
      .option("spark.bigtable.instance.id", instanceId)
      .load();
  return dataframe;
}

Python

records = spark.read \
    .format('bigtable') \
    .option('spark.bigtable.project.id', bigtable_project_id) \
    .option('spark.bigtable.instance.id', bigtable_instance_id) \
    .options(catalog=catalog) \
    .load()

print('Reading the DataFrame from Bigtable:')
records.show()

Compile your project

Generate the JAR file that is used to run a job in a Dataproc cluster, Dataproc Serverless, or a local Spark instance. You can compile the JAR file locally and then use it to submit a job. The path to the compiled JAR is set as the PATH_TO_COMPILED_JAR environment variable when you submit a job.

This step doesn't apply to PySpark applications.

Manage dependencies

The Bigtable Spark connector supports the following dependency management tools: Maven, sbt, and Gradle.

Compile the JAR file

Maven

  1. Add the spark-bigtable dependency to your pom.xml file:

    <dependencies>
      <dependency>
        <groupId>com.google.cloud.spark.bigtable</groupId>
        <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
        <version>0.1.0</version>
      </dependency>
    </dependencies>
  2. Add the Maven Shade plugin to your pom.xml file to create an uber JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  3. Run the mvn clean install command to generate a JAR file.

sbt

  1. Add the spark-bigtable dependency to your build.sbt file:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0"
  2. Add the sbt-assembly plugin to your project/plugins.sbt or project/assembly.sbt file to create an uber JAR file:

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. Run the sbt clean assembly command to generate the JAR file.

Gradle

  1. Add the spark-bigtable dependency to your build.gradle file:

    dependencies {
      implementation group: 'com.google.cloud.spark.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. Add the Shadow plugin to your build.gradle file to create an uber JAR file:

    plugins {
      id 'com.github.johnrengelman.shadow' version '8.1.1'
      id 'java'
    }
  3. See the Shadow plugin's documentation for more configuration and JAR compilation information.

Note: See the Gradle documentation for detailed information about creating an uber JAR file with Gradle.

Submit a job

Submit a Spark job using either Dataproc, Dataproc Serverless, or a local Spark instance to launch your application.

Set runtime environment

Set the following environment variables.

      # Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      # Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      # Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      # PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Replace the following:

  • PROJECT_ID: The permanent identifier for the Bigtable project.
  • INSTANCE_ID: The permanent identifier for the Bigtable instance.
  • TABLE_NAME: The permanent identifier for the table.
  • DATAPROC_CLUSTER: The permanent identifier for the Dataproc cluster.
  • DATAPROC_REGION: The Dataproc region that contains one of the clusters in your Dataproc instance, for example, northamerica-northeast2.
  • DATAPROC_ZONE: The zone where the Dataproc cluster runs.
  • SUBNET: The full resource path of the subnet.
  • GCS_BUCKET_NAME: The Cloud Storage bucket to upload Spark workload dependencies.
  • PATH_TO_COMPILED_JAR: The full or relative path to the compiled JAR, for example, /path/to/project/root/target/<compiled_JAR_name> for Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: The gs://spark-lib/bigtable Cloud Storage bucket, where the spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar file is located.
  • PATH_TO_PYTHON_FILE: For PySpark applications, the path to the Python file that will be used to write data to and read data from Bigtable.
  • LOCAL_PATH_TO_CONNECTOR_JAR: For PySpark applications, the path to the downloaded Bigtable Spark connector JAR file.

Submit a Spark job

For Dataproc instances or your local Spark setup, run a Spark job to upload data to Bigtable.

Note: The following examples use environment variables that you configure for your environment, such as $BIGTABLE_SPARK_PROJECT_ID in Java or Scala, or --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID in PySpark. For more information, see the Set runtime environment section of this document.

Dataproc cluster

Use the compiled JAR file and create a Dataproc cluster job that reads and writes data from and to Bigtable.

  1. Create a Dataproc cluster. The following example shows a sample command to create a Dataproc v2.0 cluster with Debian 10, two worker nodes, and default configurations.

    gcloud dataproc clusters create $BIGTABLE_SPARK_DATAPROC_CLUSTER \
      --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    Note: Make sure that you include a logging framework compatible with the SLF4J logging API. For PySpark applications, use the --packages option to specify the logging package, such as reload4j, when you submit the Spark job. For Scala and Java applications, include the logging package in your dependency management tool. If you don't include this package, then the java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder error occurs.
  2. Submit a job.

    Scala/Java

    The following example shows the spark.bigtable.example.WordCount class that includes the logic to create a test table in DataFrame, write the table to Bigtable, and then count the number of words in the table.

        gcloud dataproc jobs submit spark \
          --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
          --region=$BIGTABLE_SPARK_DATAPROC_REGION \
          --class=spark.bigtable.example.WordCount \
          --jar=$PATH_TO_COMPILED_JAR \
          -- \
          $BIGTABLE_SPARK_PROJECT_ID \
          $BIGTABLE_SPARK_INSTANCE_ID \
          $BIGTABLE_SPARK_TABLE_NAME

    PySpark

        gcloud dataproc jobs submit pyspark \
          --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
          --region=$BIGTABLE_SPARK_DATAPROC_REGION \
          --jars=$GCS_PATH_TO_CONNECTOR_JAR \
          --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
          $PATH_TO_PYTHON_FILE \
          -- \
          --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
          --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
          --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Dataproc Serverless

Use the compiled JAR file and create a Dataproc job that reads and writes data from and to Bigtable with a Dataproc Serverless instance.

Note: If you encounter a Specify a network which contains subnetworks to enable 'internal_ip_only' error while submitting a job to Dataproc Serverless, try creating a new subnet. For more information about creating a new subnet, see Create and manage VPC networks.

Local Spark

Use the downloaded JAR file and create a Spark job that reads and writes data from and to Bigtable with a local Spark instance. You can also use the Bigtable emulator to submit the Spark job.

Use the Bigtable emulator

If you decide to use the Bigtable emulator, then follow these steps:

  1. Run the following command to start the emulator:

    gcloud beta emulators bigtable start

    By default, the emulator chooses localhost:8086.

  2. Set the BIGTABLE_EMULATOR_HOST environment variable:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
  3. Submit the Spark job.

For more information about using the Bigtable emulator, seeTest using the emulator.

Submit a Spark job

Use the spark-submit command to submit a Spark job regardless of whether you are using a local Bigtable emulator.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
    $BIGTABLE_SPARK_PROJECT_ID \
    $BIGTABLE_SPARK_INSTANCE_ID \
    $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
    --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
    --packages=org.slf4j:slf4j-reload4j:1.7.36 \
    $PATH_TO_PYTHON_FILE \
    --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
    --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
    --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Verify the table data

Run the following cbt CLI command to verify that the data is written to Bigtable. The cbt CLI is a component of the Google Cloud CLI. For more information, see the cbt CLI overview.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
      read $BIGTABLE_SPARK_TABLE_NAME

Additional solutions

Use the Bigtable Spark connector for specific solutions, such as serializing complex Spark SQL types, reading specific rows, and generating client-side metrics.

Read a specific DataFrame row using a filter

When using DataFrames to read from Bigtable, you can specify a filter to read only specific rows. Simple filters such as ==, <=, and startsWith on the row key column are applied on the server side to avoid a full-table scan. Filters on compound row keys or complex filters such as the LIKE filter on the row key column are applied on the client side.

If you are reading large tables, we recommend using simple row key filters to avoid performing a full-table scan. The following sample statement shows how to read using a simple filter. Make sure that in your Spark filter, you use the name of the DataFrame column that is converted to the row key:

dataframe.filter("id == 'some_id'").show()

When applying a filter, use the DataFrame column name instead of the Bigtable table column name.
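To make the distinction concrete, the following sketch contrasts filters that the connector can push down to the server with one that is evaluated on the client. It assumes a DataFrame read through the connector whose id column maps to the row key, as in the earlier catalog example.

# Pushed down to the server: simple comparisons on the row key column.
dataframe.filter("id >= 'id100' AND id <= 'id200'").show()

# startsWith on the row key column can also be applied on the server side.
dataframe.filter(dataframe.id.startswith('id1')).show()

# Evaluated on the client: LIKE on the row key column is a complex filter,
# so the rows are scanned and filtered in Spark instead.
dataframe.filter("id LIKE '%9'").show()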

Serialize complex data types using Apache Avro

The Bigtable Spark connector provides support for using Apache Avro to serialize complex Spark SQL types, such as ArrayType, MapType, or StructType. Apache Avro provides data serialization for record data that is commonly used for processing and storing complex data structures.

Use syntax such as "avro":"avroSchema" to specify that a column in Bigtable should be encoded using Avro. You can then use .option("avroSchema", avroSchemaString) when reading from or writing to Bigtable to specify the Avro schema that corresponds to that column in string format. You can use different option names, for example, "anotherAvroSchema" for different columns, and pass Avro schemas for multiple columns.

def catalogWithAvroColumn = s"""{
    |"table":{"name":"ExampleAvroTable"},
    |"rowkey":"key",
    |"columns":{
    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
    |}
    |}""".stripMargin
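As a sketch of how this might look from PySpark, the following example supplies a hypothetical Avro record schema through the avroSchema option name used in the catalog above. The schema, the shape of the serialized column, and the catalog_with_avro_column, input_data, bigtable_project_id, and bigtable_instance_id names are assumptions for illustration, reusing the conventions of the earlier Python examples.

# Hypothetical Avro record schema for a StructType column that holds an array of strings.
avro_schema_string = """{
  "type": "record",
  "name": "FavoriteFoods",
  "fields": [
    {"name": "items", "type": {"type": "array", "items": "string"}}
  ]
}"""

# Write: the column mapped with "avro":"avroSchema" in the catalog is encoded
# using the schema supplied here.
input_data.write \
    .format('bigtable') \
    .option('catalog', catalog_with_avro_column) \
    .option('avroSchema', avro_schema_string) \
    .option('spark.bigtable.project.id', bigtable_project_id) \
    .option('spark.bigtable.instance.id', bigtable_instance_id) \
    .option('spark.bigtable.create.new.table', 'true') \
    .save()

# Read back with the same schema so the connector can decode the Avro cells.
records = spark.read \
    .format('bigtable') \
    .option('catalog', catalog_with_avro_column) \
    .option('avroSchema', avro_schema_string) \
    .option('spark.bigtable.project.id', bigtable_project_id) \
    .option('spark.bigtable.instance.id', bigtable_instance_id) \
    .load()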

Use client-side metrics

Since the Bigtable Spark connector is based on the Bigtable client for Java, client-side metrics are enabled inside the connector by default. You can refer to the client-side metrics documentation to find more details on accessing and interpreting these metrics.

Use the Bigtable client for Java with low-level RDD functions

Since the Bigtable Spark connector is based on the Bigtable client for Java, you can directly use the client in your Spark applications and perform distributed read or write requests within the low-level RDD functions such as mapPartitions and foreachPartition.

To use the Bigtable client for Java classes, append the com.google.cloud.spark.bigtable.repackaged prefix to the package names. For example, instead of using the class name com.google.cloud.bigtable.data.v2.BigtableDataClient, use com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.

For more information, see the Bigtable client for Java documentation.
