BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.


The connector supports reading Google BigQuery tables into Spark's DataFrames, and writing DataFrames back into BigQuery. This is done by using the Spark SQL Data Source API to communicate with BigQuery.

BigQuery Storage API

The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.

It has a number of advantages over using the previous export-based read flow that should generally lead to better read performance:

Direct Streaming

It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using the Arrow or Avro wire formats.

Filtering

The new API allows column and predicate filtering to only read the data you are interested in.

Column Filtering

Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.

Predicate Filtering

The Storage API supports arbitrary pushdown of predicate filters. Connector version 0.8.0-beta and above support pushdown of arbitrary filters to BigQuery.

There is a known issue in Spark that does not allow pushdown of filters on nested fields. For example, filters like address.city = "Sunnyvale" will not be pushed down to BigQuery.

Dynamic Sharding

The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.

See Configuring Partitioning for more details.

Requirements

Enable the BigQuery Storage API

Follow these instructions.

Create a Google Cloud Dataproc cluster (Optional)

If you do not have an Apache Spark environment you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use spark-submit on any cluster.

Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work by default e.g.

MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"

Downloading and Using the Connector

The latest version of the connector is publicly available in the following links:

Version | Link
Spark 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.2.jar (HTTP link)
Spark 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.42.2.jar (HTTP link)
Spark 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.42.2.jar (HTTP link)
Spark 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.42.2.jar (HTTP link)
Spark 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.42.2.jar (HTTP link)
Spark 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (HTTP link)
Scala 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.42.2.jar (HTTP link)
Scala 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.2.jar (HTTP link)
Scala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP link)

The first six are Java-based connectors targeting Spark 2.4/3.1/3.2/3.3/3.4/3.5 for all Scala versions, built on the new Data Source APIs (Data Source API v2) of Spark.

The final three are Scala-based connectors; please use the jar relevant to your Spark installation as outlined below.

Connector to Spark Compatibility Matrix

[Matrix mapping each connector (spark-3.5-bigquery, spark-3.4-bigquery, spark-3.3-bigquery, spark-3.2-bigquery, spark-3.1-bigquery, spark-2.4-bigquery, spark-bigquery-with-dependencies_2.13, spark-bigquery-with-dependencies_2.12, spark-bigquery-with-dependencies_2.11) to the Spark versions it supports: 2.3, 2.4, 3.0, 3.1, 3.2, 3.3, 3.4, 3.5.]

Connector to Dataproc Image Compatibility Matrix

[Matrix mapping the same connectors to the Dataproc images they support: 1.3, 1.4, 1.5, 2.0, 2.1, 2.2, and Serverless images 1.0, 2.0, 2.1, 2.2.]

Maven / Ivy Package Usage

The connector is also available from the Maven Central repository. It can be added using the --packages option or the spark.jars.packages configuration property. Use the following value:

Version | Connector Artifact
Spark 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.42.2
Spark 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.42.2
Spark 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.42.2
Spark 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.42.2
Spark 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.42.2
Spark 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0
Scala 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.42.2
Scala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.42.2
Scala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0
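For example, a minimal PySpark sketch (using the Spark 3.5 artifact; substitute the artifact matching your Spark and Scala versions) that pulls the connector through the spark.jars.packages property:

from pyspark.sql import SparkSession

# Pull the connector from Maven Central when the session starts.
spark = SparkSession.builder \
    .appName("bigquery-example") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-3.5-bigquery:0.42.2") \
    .getOrCreate()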

Specifying the Spark BigQuery connector version in a Dataproc cluster

Dataproc clusters created using image 2.1 and above, as well as batches using the Dataproc serverless service, come with a built-in Spark BigQuery connector. Using the standard --jars or --packages (or alternatively, the spark.jars/spark.jars.packages configuration) won't help in this case as the built-in connector takes precedence.

To use a version other than the built-in one, please do one of the following:

  • For Dataproc clusters using image 2.1 and above, add the following flag on cluster creation to upgrade the version: --metadata SPARK_BQ_CONNECTOR_VERSION=0.42.2, or --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.42.2.jar to create the cluster with a different jar. The URL can point to any valid connector JAR for the cluster's Spark version.
  • For Dataproc serverless batches, add the following property on batch creation to upgrade the version: --properties dataproc.sparkBqConnector.version=0.42.2, or --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.42.2.jar to create the batch with a different jar. The URL can point to any valid connector JAR for the runtime's Spark version.

Hello World Example

You can run a simple PySpark wordcount against the API without compilation by running

Dataproc image 1.5 and above

gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
  --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.2.jar \
  examples/python/shakespeare.py

Dataproc image 1.4 and below

gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
  --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar \
  examples/python/shakespeare.py

Example Codelab

https://codelabs.developers.google.com/codelabs/pyspark-bigquery

Usage

The connector uses the cross-language Spark SQL Data Source API:

Reading data from a BigQuery table

df = spark.read \
  .format("bigquery") \
  .load("bigquery-public-data.samples.shakespeare")

or the Scala only implicit API:

import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")

For more information, see additional code samples in Python, Scala and Java.

Reading data from a BigQuery query

The connector allows you to run any Standard SQL SELECT query on BigQuery and fetch its results directly to a Spark DataFrame. This is easily done as described in the following code sample:

spark.conf.set("viewsEnabled","true")sql = """  SELECT tag, COUNT(*) c  FROM (    SELECT SPLIT(tags, '|') tags    FROM `bigquery-public-data.stackoverflow.posts_questions` a    WHERE EXTRACT(YEAR FROM creation_date)>=2014  ), UNNEST(tags) tag  GROUP BY 1  ORDER BY 2 DESC  LIMIT 10  """df = spark.read.format("bigquery").load(sql)df.show()

Which yields the result

+----------+-------+
|       tag|      c|
+----------+-------+
|javascript|1643617|
|    python|1352904|
|      java|1218220|
|   android| 913638|
|       php| 911806|
|        c#| 905331|
|      html| 769499|
|    jquery| 608071|
|       css| 510343|
|       c++| 458938|
+----------+-------+

A second option is to use the query option like this:

df = spark.read.format("bigquery").option("query", sql).load()

Notice that the execution should be faster as only the result is transmitted over the wire. In a similar fashion the queries can include JOINs more efficiently than running joins on Spark, or use other BigQuery features such as subqueries, BigQuery user defined functions, wildcard tables, BigQuery ML and more.

In order to use this feature the viewsEnabled configuration MUST be set to true. This can also be done globally as shown in the example above.

Important: This feature is implemented by running the query on BigQuery and saving the result into a temporary table, from which Spark will read the results. This may add additional costs to your BigQuery account.

Reading From Parameterized Queries

The connector supports executing BigQuery parameterized queries using the standard spark.read.format('bigquery') API.

To use parameterized queries:

  1. Provide the SQL query containing parameters using the .option("query", "SQL_STRING") with named (@param) or positional (?) parameters.
  2. Specify the parameter values using dedicated options:
  • Named Parameters: Use options prefixed with NamedParameters.. The parameter name follows the prefix (case-insensitive).
    • Format: .option("NamedParameters.<parameter_name>", "TYPE:value")
    • Example: .option("NamedParameters.corpus", "STRING:romeoandjuliet")
  • Positional Parameters: Use options prefixed with PositionalParameters.. The 1-based index follows the prefix.
    • Format: .option("PositionalParameters.<parameter_index>", "TYPE:value")
    • Example: .option("PositionalParameters.1", "STRING:romeoandjuliet")

The TYPE in the TYPE:value string specifies the BigQuery Standard SQL data type. Supported types currently include: BOOL, INT64, FLOAT64, NUMERIC, STRING, DATE, DATETIME, JSON, TIME, GEOGRAPHY, TIMESTAMP.

ARRAY and STRUCT types are not supported as parameters at this time.
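For illustration, here is a minimal PySpark sketch of a named-parameter query against the public shakespeare sample table. It assumes an existing SparkSession named spark, and that viewsEnabled is set, as required for query reads described above:

spark.conf.set("viewsEnabled", "true")

sql = """
  SELECT word, SUM(word_count) AS total
  FROM `bigquery-public-data.samples.shakespeare`
  WHERE corpus = @corpus
  GROUP BY word
  ORDER BY total DESC
"""

# The named parameter @corpus is bound through a NamedParameters.* option.
df = spark.read.format("bigquery") \
    .option("query", sql) \
    .option("NamedParameters.corpus", "STRING:romeoandjuliet") \
    .load()
df.show()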

Reading From Views

The connector has preliminary support for reading from BigQuery views. Please note there are a few caveats:

  • BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. This process affects the read performance, even before running any collect() or count() action.
  • The materialization process can also incur additional costs to your BigQuery bill.
  • Reading from views is disabled by default. In order to enable it, either set the viewsEnabled option when reading the specific view (.option("viewsEnabled", "true")) or set it globally by calling spark.conf.set("viewsEnabled", "true").

Notice: Before version 0.42.1 of the connector, the following configurations are required:

  • By default, the materialized views are created in the same project and dataset. Those can be configured by the optional materializationProject and materializationDataset options, respectively. These options can also be globally set by calling spark.conf.set(...) before reading the views.
  • As mentioned in the BigQuery documentation, the materializationDataset should be in the same location as the view.

Starting with version 0.42.1 those configurations are redundant and are ignored. It is highly recommended to upgrade to this version or a later one to enjoy simpler configuration when using views or loading from queries.
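A minimal sketch of reading a single view (the view name here is hypothetical):

# Enable view materialization for this read only.
df = spark.read.format("bigquery") \
    .option("viewsEnabled", "true") \
    .load("my-project.my_dataset.my_view")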

Writing data to BigQuery

Writing DataFrames to BigQuery can be done using two methods: Direct and Indirect.

Direct write using the BigQuery Storage Write API

In this method the data is written directly to BigQuery using the BigQuery Storage Write API. In order to enable this option, please set the writeMethod option to direct, as shown below:

df.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .option("writeAtLeastOnce", "true") \
  .save("dataset.table")

Writing to existing partitioned tables (date partitioned, ingestion time partitioned and range partitioned) in APPEND save mode and OVERWRITE mode (only date and range partitioned) is fully supported by the connector and the BigQuery Storage Write API. The use of datePartition, partitionField, partitionType, partitionRangeStart, partitionRangeEnd, partitionRangeInterval described below is not supported at this moment by the direct write method.

Important: Please refer to the data ingestion pricing page regarding the BigQuery Storage Write API pricing.

Important: Please use version 0.24.2 and above for direct writes, as previous versions have a bug that may cause a table deletion in certain cases.

Indirect write

In this method the data is written first to GCS, and then it is loaded into BigQuery. A GCS bucket must be configured to indicate the temporary data location.

df.write \
  .format("bigquery") \
  .option("temporaryGcsBucket", "some-bucket") \
  .save("dataset.table")

The data is temporarily stored using the Apache Parquet, Apache ORC or Apache Avro formats.
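For example, a minimal sketch of an indirect write that stores the intermediate data as Avro (this assumes the spark-avro package is available at runtime, as noted for the intermediateFormat property below):

df.write \
  .format("bigquery") \
  .option("temporaryGcsBucket", "some-bucket") \
  .option("intermediateFormat", "avro") \
  .save("dataset.table")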

The GCS bucket and the format can also be set globally using Spark's RuntimeConfig like this:

spark.conf.set("temporaryGcsBucket","some-bucket")df.write \  .format("bigquery") \  .save("dataset.table")

When streaming a DataFrame to BigQuery, each batch is written in the same manner as a non-streaming DataFrame. Note that an HDFS-compatible checkpoint location (e.g. path/to/HDFS/dir or gs://checkpoint-bucket/checkpointDir) must be specified.

df.writeStream \
  .format("bigquery") \
  .option("temporaryGcsBucket", "some-bucket") \
  .option("checkpointLocation", "some-location") \
  .option("table", "dataset.table") \
  .start()

Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if one exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.

Properties

The API supports a number of options to configure the read:

Property | Meaning | Usage

table
The BigQuery table in the format [[project:]dataset.]table. It is recommended to use the path parameter of load()/save() instead. This option has been deprecated and will be removed in a future version.
(Deprecated)
Read/Write

dataset
The dataset containing the table. This option should be used with standard tables and views, but not when loading query results.
(Optional unless omitted in table)
Read/Write

project
The Google Cloud Project ID of the table. This option should be used with standard tables and views, but not when loading query results.
(Optional. Defaults to the project of the Service Account being used)
Read/Write

parentProject
The Google Cloud Project ID of the table to bill for the export.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
maxParallelism
The maximal number of partitions to split the data into. The actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty.
Important: The old parameter (parallelism) is still supported but in deprecated mode. It will be removed in version 1.0 of the connector.
(Optional. Defaults to the larger of preferredMinParallelism and 20,000.)
Read

preferredMinParallelism
The preferred minimal number of partitions to split the data into. The actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty.
(Optional. Defaults to the smaller of 3 times the application's default parallelism and maxParallelism.)
Read

viewsEnabled
Enables the connector to read from views and not only tables. Please read the relevant section before activating this option.
(Optional. Defaults to false)
Read
readDataFormat
Data format for reading from BigQuery. Options: ARROW, AVRO
(Optional. Defaults to ARROW)
Read

optimizedEmptyProjection
The connector uses an optimized empty projection (select without any columns) logic, used for count() execution. This logic takes the data directly from the table metadata or performs a much more efficient `SELECT COUNT(*) WHERE...` in case there is a filter. You can cancel the use of this logic by setting this option to false.
(Optional, defaults to true)
Read

pushAllFilters
If set to true, the connector pushes all the filters Spark can delegate to the BigQuery Storage API. This reduces the amount of data that needs to be sent from BigQuery Storage API servers to Spark clients. This option has been deprecated and will be removed in a future version.
(Optional, defaults to true)
(Deprecated)
Read

bigQueryJobLabel
Can be used to add labels to the connector-initiated query and load BigQuery jobs. Multiple labels can be set.
(Optional)
Read

bigQueryTableLabel
Can be used to add labels to the table while writing to a table. Multiple labels can be set.
(Optional)
Write
traceApplicationName
Application name used to trace BigQuery Storage read and write sessions. Setting the application name is required to set the trace ID on the sessions.
(Optional)
Read

traceJobId
Job ID used to trace BigQuery Storage read and write sessions.
(Optional, defaults to the Dataproc job ID if it exists, otherwise uses the Spark application ID)
Read

createDisposition
Specifies whether the job is allowed to create new tables. The permitted values are:
  • CREATE_IF_NEEDED - Configures the job to create the table if it does not exist.
  • CREATE_NEVER - Configures the job to fail if the table does not exist.
This option takes effect only when Spark has decided to write data to the table based on the SaveMode.
(Optional. Defaults to CREATE_IF_NEEDED).
Write
writeMethod
Controls the method in which the data is written to BigQuery. Available values are direct to use the BigQuery Storage Write API and indirect which writes the data first to GCS and then triggers a BigQuery load operation. See more here.
(Optional, defaults to indirect)
Write

writeAtLeastOnce
Guarantees that data is written to BigQuery at least once. This is a lesser guarantee than exactly once. This is suitable for streaming scenarios in which data is continuously being written in small batches.
(Optional. Defaults to false)
Supported only by the `DIRECT` write method and when the mode is NOT `Overwrite`.
Write

temporaryGcsBucket
The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (spark.conf.set(...)).
Starting with version 0.42.0, defaults to `fs.gs.system.bucket` if it exists, for example on Google Cloud Dataproc clusters.
Supported only by the `INDIRECT` write method.
Write

persistentGcsBucket
The GCS bucket that holds the data before it is loaded to BigQuery. If set, the data won't be deleted after it is written to BigQuery.
Supported only by the `INDIRECT` write method.
Write

persistentGcsPath
The GCS path that holds the data before it is loaded to BigQuery. Used only with persistentGcsBucket.
Not supported by the `DIRECT` write method.
Write
intermediateFormat
The format of the data before it is loaded to BigQuery; values can be either "parquet", "orc" or "avro". In order to use the Avro format, the spark-avro package must be added at runtime.
(Optional. Defaults to parquet). On write only. Supported only for the `INDIRECT` write method.
Write

useAvroLogicalTypes
When loading from Avro (`.option("intermediateFormat", "avro")`), BigQuery uses the underlying Avro types instead of the logical types by default (https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types). Supplying this option converts Avro logical types to their corresponding BigQuery data types.
(Optional. Defaults to false). On write only.
Write

datePartition
The date partition the data is going to be written to. Should be a date string given in the format YYYYMMDD. Can be used to overwrite the data of a single partition, like this:
df.write.format("bigquery")
  .option("datePartition", "20220331")
  .mode("overwrite")
  .save("table")

(Optional). On write only.
Can also be used with different partition types like:
HOUR: YYYYMMDDHH
MONTH: YYYYMM
YEAR: YYYY
Not supported by the `DIRECT` write method.
Write
partitionField
If this field is specified, the table is partitioned by this field.
For Time partitioning, specify together with the option `partitionType`.
For Integer-range partitioning, specify together with the 3 options: `partitionRangeStart`, `partitionRangeEnd`, `partitionRangeInterval`.
The field must be a top-level TIMESTAMP or DATE field for Time partitioning, or INT64 for Integer-range partitioning. Its mode must be NULLABLE or REQUIRED. If the option is not set for a Time partitioned table, then the table will be partitioned by a pseudo column, referenced via either '_PARTITIONTIME' as TIMESTAMP type, or '_PARTITIONDATE' as DATE type.
(Optional).
Not supported by the `DIRECT` write method.
Write

partitionExpirationMs
Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value.
(Optional).
Not supported by the `DIRECT` write method.
Write

partitionType
Used to specify Time partitioning.
Supported types are: HOUR, DAY, MONTH, YEAR
This option is mandatory for a target table to be Time partitioned.
(Optional. Defaults to DAY if partitionField is specified).
Not supported by the `DIRECT` write method.
Write

partitionRangeStart, partitionRangeEnd, partitionRangeInterval
Used to specify Integer-range partitioning.
These options are mandatory for a target table to be Integer-range partitioned.
All 3 options must be specified.
Not supported by the `DIRECT` write method.
Write
clusteredFields
A string of non-repeated, top-level columns separated by commas.
(Optional).
Write

allowFieldAddition
Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
(Optional. Defaults to false).
Supported only by the `INDIRECT` write method.
Write

allowFieldRelaxation
Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
(Optional. Defaults to false).
Supported only by the `INDIRECT` write method.
Write
proxyAddress
Address of the proxy server. The proxy must be an HTTP proxy and the address should be in the `host:port` format. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.address).
(Optional. Required only if connecting to GCP via proxy.)
Read/Write

proxyUsername
The userName used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.username).
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write

proxyPassword
The password used to connect to the proxy. Can be alternatively set in the Spark configuration (spark.conf.set(...)) or in the Hadoop configuration (fs.gs.proxy.password).
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write

httpMaxRetry
The maximum number of retries for the low-level HTTP requests to BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpMaxRetry", ...)) or in the Hadoop configuration (fs.gs.http.max.retry).
(Optional. Default is 10)
Read/Write

httpConnectTimeout
The timeout in milliseconds to establish a connection with BigQuery. Can be alternatively set in the Spark configuration (spark.conf.set("httpConnectTimeout", ...)) or in the Hadoop configuration (fs.gs.http.connect-timeout).
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read/Write

httpReadTimeout
The timeout in milliseconds to read data from an established connection. Can be alternatively set in the Spark configuration (spark.conf.set("httpReadTimeout", ...)) or in the Hadoop configuration (fs.gs.http.read-timeout).
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read
arrowCompressionCodec
Compression codec while reading from a BigQuery table when using the Arrow format. Options: ZSTD (Zstandard compression), LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md), COMPRESSION_UNSPECIFIED. The recommended compression codec is ZSTD while using Java.
(Optional. Defaults to COMPRESSION_UNSPECIFIED which means no compression will be used)
Read

responseCompressionCodec
Compression codec used to compress the ReadRowsResponse data. Options: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED, RESPONSE_COMPRESSION_CODEC_LZ4
(Optional. Defaults to RESPONSE_COMPRESSION_CODEC_UNSPECIFIED which means no compression will be used)
Read

cacheExpirationTimeInMinutes
The expiration time of the in-memory cache storing query information.
To disable caching, set the value to 0.
(Optional. Defaults to 15 minutes)
Read

enableModeCheckForSchemaFields
Checks that the mode of every field in the destination schema is equal to the mode of the corresponding source field schema, during DIRECT write.
Default value is true, i.e. the check is done by default. If set to false the mode check is ignored.
Write

enableListInference
Indicates whether to use schema inference specifically when the mode is Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions).
Defaults to false.
Write

bqChannelPoolSize
The (fixed) size of the gRPC channel pool created by the BigQueryReadClient.
For optimal performance, this should be set to at least the number of cores on the cluster executors.
Read

createReadSessionTimeoutInSeconds
The timeout in seconds to create a ReadSession when reading a table.
For extremely large tables this value should be increased.
(Optional. Defaults to 600 seconds)
Read
queryJobPriority
Priority level set for the job while reading data from a BigQuery query. The permitted values are:
  • BATCH - Query is queued and started as soon as idle resources are available, usually within a few minutes. If the query hasn't started within 3 hours, its priority is changed to INTERACTIVE.
  • INTERACTIVE - Query is executed as soon as possible and counts towards the concurrent rate limit and the daily rate limit.
For WRITE, this option is effective when DIRECT write is used with OVERWRITE mode, where the connector overwrites the destination table using a MERGE statement.
(Optional. Defaults to INTERACTIVE)
Read/Write

destinationTableKmsKeyName
Describes the Cloud KMS encryption key that will be used to protect the destination BigQuery table. The BigQuery Service Account associated with your project requires access to this encryption key. For further information about using CMEK with BigQuery see https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id.
Notice: The table will be encrypted by the key only if it was created by the connector. A pre-existing unencrypted table won't be encrypted just by setting this option.
(Optional)
Write

allowMapTypeConversion
Boolean config to disable conversion from BigQuery records to Spark MapType when the record has two subfields with field names key and value. Default value is true, which allows the conversion.
(Optional)
Read

spark.sql.sources.partitionOverwriteMode
Config to specify the overwrite mode on write when the table is range/time partitioned. Currently supported modes are STATIC and DYNAMIC. In STATIC mode, the entire table is overwritten. In DYNAMIC mode, the data is overwritten by partitions of the existing table. The default value is STATIC.
(Optional)
Write
enableReadSessionCaching
Boolean config to disable read session caching. Caches BigQuery read sessions to allow for faster Spark query planning. Default value is true.
(Optional)
Read

readSessionCacheDurationMins
Config to set the read session caching duration in minutes. Only works if enableReadSessionCaching is true (default). Allows specifying the duration to cache read sessions for. Maximum allowed value is 300. Default value is 5.
(Optional)
Read

bigQueryJobTimeoutInMinutes
Config to set the BigQuery job timeout in minutes. Default value is 360 minutes.
(Optional)
Read/Write

snapshotTimeMillis
A timestamp specified in milliseconds to use to read a table snapshot. By default this is not set and the latest version of a table is read.
(Optional)
Read

bigNumericDefaultPrecision
An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38. This default is used only when the field has an unparameterized BigNumeric type. Please note that there might be data loss if the actual data's precision is more than what is specified.
(Optional)
Read/Write

bigNumericDefaultScale
An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and less than bigNumericDefaultPrecision. This default is used only when the field has an unparameterized BigNumeric type. Please note that there might be data loss if the actual data's scale is more than what is specified.
(Optional)
Read/Write

Options can also be set outside of the code, using the --conf parameter of spark-submit or the --properties parameter of gcloud dataproc jobs submit spark. In order to use this, prepend the prefix spark.datasource.bigquery. to any of the options; for example, spark.conf.set("temporaryGcsBucket", "some-bucket") can also be set as --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket.

Data types

With the exception of DATETIME and TIME, all BigQuery data types map directly into the corresponding Spark SQL data type. Here are all of the mappings:

BigQuery Standard SQL Data Type | Spark SQL Data Type | Notes

BOOL | BooleanType
INT64 | LongType
FLOAT64 | DoubleType
NUMERIC | DecimalType | Please refer to Numeric and BigNumeric support
BIGNUMERIC | DecimalType | Please refer to Numeric and BigNumeric support
STRING | StringType
BYTES | BinaryType
STRUCT | StructType
ARRAY | ArrayType
TIMESTAMP | TimestampType
DATE | DateType
DATETIME | StringType, TimestampNTZType* | Spark has no DATETIME type. A Spark string can be written to an existing BQ DATETIME column provided it is in the format for BQ DATETIME literals. *For Spark 3.4+, BQ DATETIME is read as Spark's TimestampNTZ type, i.e. java LocalDateTime.

TIME | LongType, StringType* | Spark has no TIME type. The generated longs, which indicate microseconds since midnight, can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Thus times are left as longs and the user can cast if they like. When casting to Timestamp, TIME values have the same time zone issues as DATETIME. *A Spark string can be written to an existing BQ TIME column provided it is in the format for BQ TIME literals.

JSON | StringType | Spark has no JSON type. The values are read as String. In order to write JSON back to BigQuery, the following conditions are REQUIRED:
  • Use the INDIRECT write method
  • Use the AVRO intermediate format
  • The DataFrame field MUST be of type String and have an entry of sqlType=JSON in its metadata
ARRAY<STRUCT<key,value>> | MapType | BigQuery has no MAP type, therefore, similar to other conversions like Apache Avro and BigQuery Load jobs, the connector converts a Spark Map to a REPEATED STRUCT<key,value>. This means that while writing and reading of maps is available, running SQL on BigQuery that uses map semantics is not supported. To refer to the map's values using BigQuery SQL, please check the BigQuery documentation. Due to these incompatibilities, a few restrictions apply:
  • Keys can be Strings only
  • Values can be simple types (not structs)
  • For INDIRECT write, use the AVRO intermediate format. DIRECT write is supported as well
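As an illustration of the JSON conditions above, here is a PySpark sketch that tags a string column with sqlType=JSON metadata before an indirect Avro write (the DataFrame, column and table names are hypothetical):

from pyspark.sql.functions import col

# Tag the string column so the connector maps it to a BigQuery JSON column.
json_df = df.withColumn(
    "payload",
    col("payload").alias("payload", metadata={"sqlType": "JSON"})
)

json_df.write \
    .format("bigquery") \
    .option("writeMethod", "indirect") \
    .option("intermediateFormat", "avro") \
    .option("temporaryGcsBucket", "some-bucket") \
    .save("dataset.table")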

Spark ML Data Types Support

The Spark ML Vector and Matrix types are supported, including their dense and sparse versions. The data is saved as a BigQuery RECORD. Notice that a suffix is added to the field's description which includes the Spark type of the field.

In order to write those types to BigQuery, use the ORC or Avro intermediate format, and have them as columns of the Row (i.e. not a field in a struct).

Numeric and BigNumeric support

BigQuery's BigNumeric has a precision of 76.76 (the 77th digit is partial) and a scale of 38. Since this precision and scale is beyond Spark's DecimalType support (precision 38, scale 38), BigNumeric fields with precision larger than 38 cannot be used. Once this Spark limitation is updated, the connector will be updated accordingly.

The Spark Decimal/BigQuery Numeric conversion tries to preserve the parameterization of the type, i.e. NUMERIC(10,2) will be converted to Decimal(10,2) and vice versa. Notice however that there are cases where the parameters are lost. This means that the parameters will be reverted to the defaults: NUMERIC(38,9) and BIGNUMERIC(76,38). This means that at the moment, BigNumeric read is supported only from a standard table, but not from a BigQuery view or when reading data from a BigQuery query.
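Where unparameterized BigNumeric fields are involved, a sketch of overriding the defaults with the bigNumericDefaultPrecision and bigNumericDefaultScale properties described above (the values are illustrative; data loss is possible if the actual precision or scale is higher):

df = spark.read.format("bigquery") \
    .option("bigNumericDefaultPrecision", "38") \
    .option("bigNumericDefaultScale", "9") \
    .load("dataset.table")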

Filtering

The connector automatically computes column and pushdown filters from the DataFrame's SELECT statement, e.g.

spark.read.bigquery("bigquery-public-data:samples.shakespeare")  .select("word")  .where("word = 'Hamlet' or word = 'Claudius'")  .collect()

filters to the column word and pushes down the predicate filter word = 'Hamlet' or word = 'Claudius'.

If you do not wish to make multiple read requests to BigQuery, you can cache the DataFrame before filtering e.g.:

val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
  .where("word = 'Hamlet'")
  .collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
  .where("word = 'Romeo'")
  .collect()

You can also manually specify the filter option, which will override automatic pushdown, and Spark will do the rest of the filtering in the client.

Partitioned Tables

The pseudo columns _PARTITIONDATE and _PARTITIONTIME are not part of the table schema. Therefore, in order to query by the partitions of partitioned tables, do not use the where() method shown above. Instead, add a filter option in the following manner:

val df = spark.read.format("bigquery")  .option("filter", "_PARTITIONDATE > '2019-01-01'")  ...  .load(TABLE)

Configuring Partitioning

By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API. This can be configured explicitly with the maxParallelism property. BigQuery may limit the number of partitions based on server constraints.
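For example, a sketch of explicitly requesting a partition count (the value is illustrative; BigQuery may still provide fewer partitions):

df = spark.read.format("bigquery") \
    .option("maxParallelism", "400") \
    .load("bigquery-public-data.samples.shakespeare")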

Tagging BigQuery Resources

In order to support tracking the usage of BigQuery resources, the connector offers the following options to tag BigQuery resources:

Adding BigQuery Jobs Labels

The connector can launch BigQuery load and query jobs. Adding labels to the jobs is done in the following manner:

spark.conf.set("bigQueryJobLabel.cost_center", "analytics")spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")

This will create labels cost_center=analytics and usage=nightly_etl.

Adding BigQuery Storage Trace ID

Used to annotate the read and write sessions. The trace ID is of the format Spark:ApplicationName:JobID. This is an opt-in option, and to use it the user needs to set the traceApplicationName property. JobID is auto-generated from the Dataproc job ID, with a fallback to the Spark application ID (such as application_1648082975639_0001). The Job ID can be overridden by setting the traceJobId option. Notice that the total length of the trace ID cannot be over 256 characters.
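A minimal sketch of opting in to tracing (the application name and job ID are hypothetical):

spark.conf.set("traceApplicationName", "nightly-etl")
# Optional: override the auto-generated job ID.
spark.conf.set("traceJobId", "nightly-etl-run-42")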

Using in Jupyter Notebooks

The connector can be used in Jupyter notebooks even if it is not installed on the Spark cluster. It can be added as an external jar using the following code:

Python:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.42.2") \
  .getOrCreate()
df = spark.read.format("bigquery") \
  .load("dataset.table")

Scala:

val spark = SparkSession.builder
  .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.42.2")
  .getOrCreate()
val df = spark.read.format("bigquery").load("dataset.table")

In case the Spark cluster is using Scala 2.12 (it's optional for Spark 2.4.x, mandatory in 3.0.x), then the relevant package is com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.42.2. In order to know which Scala version is used, please run the following code:

Python:

spark.sparkContext._jvm.scala.util.Properties.versionString()

Scala:

scala.util.Properties.versionString

Compiling against the connector

Unless you wish to use the implicit Scala API spark.read.bigquery("TABLE_ID"), there is no need to compile against the connector.

To include the connector in your project:

Maven

<dependency>
  <groupId>com.google.cloud.spark</groupId>
  <artifactId>spark-bigquery-with-dependencies_${scala.version}</artifactId>
  <version>0.42.2</version>
</dependency>

SBT

libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.42.2"

Connector metrics and how to view them

Spark populates a lot of metrics which can be found by the end user in the Spark history page. These metrics are Spark related and are implicitly collected without any change from the connector. There are also a few metrics populated from BigQuery which are currently visible only in the application logs and can be read in the driver/executor logs.

From Spark 3.2 onwards, Spark provides an API to expose custom metrics in the Spark UI page: https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html

Currently, using this API, the connector exposes the following BigQuery metrics during read:

Metric Name | Description
bytes read | number of BigQuery bytes read
rows read | number of BigQuery rows read
scan time | the amount of time spent between the read rows response being requested and obtained, across all the executors, in milliseconds
parse time | the amount of time spent parsing the rows read, across all the executors, in milliseconds
spark time | the amount of time spent in Spark processing the queries (i.e., apart from scanning and parsing), across all the executors, in milliseconds

Note: To use the metrics in the Spark UI page, you need to make sure the spark-bigquery-metrics-0.42.2.jar is in the class path before starting the history server, and that the connector version is spark-3.2 or above.

FAQ

What is the Pricing for the Storage API?

See the BigQuery pricing documentation.

I have very few partitions

You can manually set the number of partitions with the maxParallelism property. BigQuery may provide fewer partitions than you ask for. See Configuring Partitioning.

You can also always repartition after reading in Spark.

I get quota exceeded errors while writing

If there are too many partitions the CreateWriteStream or Throughput quotas may be exceeded. This occurs because while the data within each partition is processed serially, independent partitions may be processed in parallel on different nodes within the Spark cluster. Generally, to ensure maximum sustained throughput you should file a quota increase request. However, you can also manually reduce the number of partitions being written by calling coalesce on the DataFrame to mitigate this problem.

desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write

A rule of thumb is to have a single partition handle at least 1GB of data.

Also note that a job running with the writeAtLeastOnce property turned on will not encounter CreateWriteStream quota errors.

How do I authenticate outside GCE / Dataproc?

The connector needs an instance of a GoogleCredentials in order to connect to the BigQuery APIs. There are multiple options to provide it:

  • The default is to load the JSON key from the GOOGLE_APPLICATION_CREDENTIALS environment variable, as described here.
  • In case the environment variable cannot be changed, the credentials file can be configured as a Spark option. The file should reside on the same path on all the nodes of the cluster.
// Globallyspark.conf.set("credentialsFile", "</path/to/key/file>")// Per read/Writespark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
  • Credentials can also be provided explicitly, either as a parameter or from Spark runtime configuration. They should be passed in as a base64-encoded string directly.
// Globallyspark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")// Per read/Writespark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
  • In cases where the user has an internal service providing the Google AccessToken, a custom implementation can be done, creating only the AccessToken and providing its TTL. Token refresh will re-generate a new token. In order to use this, implement the com.google.cloud.bigquery.connector.common.AccessTokenProvider interface. The fully qualified class name of the implementation should be provided in the gcpAccessTokenProvider option. AccessTokenProvider must be implemented in Java or another JVM language such as Scala or Kotlin. It must either have a no-arg constructor or a constructor accepting a single java.util.String argument. This configuration parameter can be supplied using the gcpAccessTokenProviderConfig option. If this is not provided then the no-arg constructor will be called. The jar containing the implementation should be on the cluster's classpath.
// Globallyspark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")// Per read/Writespark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
  • Service account impersonation can be configured for a specific username and a group name, or for all users by default, using the properties below (a configuration sketch follows at the end of this section):

    • gcpImpersonationServiceAccountForUser_<USER_NAME> (not set by default)

      The service account impersonation for a specific user.

    • gcpImpersonationServiceAccountForGroup_<GROUP_NAME> (not set by default)

      The service account impersonation for a specific group.

    • gcpImpersonationServiceAccount (not set by default)

      Default service account impersonation for all users.

    If any of the above properties are set then the service account specified will be impersonated by generating short-lived credentials when accessing BigQuery.

    If more than one property is set then the service account associated with the username will take precedence over the service account associated with the group name for a matching user and group, which in turn will take precedence over default service account impersonation.

  • For a simpler application, where access token refresh is not required, another alternative is to pass the access token as the gcpAccessToken configuration option. You can get the access token by running gcloud auth application-default print-access-token.

// Globallyspark.conf.set("gcpAccessToken", "<access-token>")// Per read/Writespark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")

Important: The CredentialsProvider and AccessTokenProvider need to be implemented in Java or another JVM language such as Scala or Kotlin. The jar containing the implementation should be on the cluster's classpath.

Notice: Only one of the above options should be provided.
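For illustration, a sketch of configuring service account impersonation globally (the service accounts are hypothetical):

# Impersonate a dedicated service account for all BigQuery access.
spark.conf.set("gcpImpersonationServiceAccount", "bq-reader@my-project.iam.gserviceaccount.com")
# Or per user, which takes precedence for that user as described above.
spark.conf.set("gcpImpersonationServiceAccountForUser_alice", "alice-bq@my-project.iam.gserviceaccount.com")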

How do I connect to GCP/BigQuery via Proxy?

To connect to a forward proxy and to authenticate the user credentials, configure the following options.

proxyAddress: Address of the proxy server. The proxy must be an HTTP proxy and the address should be in the host:port format.

proxyUsername: The userName used to connect to the proxy.

proxyPassword: The password used to connect to the proxy.

val df = spark.read.format("bigquery")  .option("proxyAddress", "http://my-proxy:1234")  .option("proxyUsername", "my-username")  .option("proxyPassword", "my-password")  .load("some-table")

The same proxy parameters can also be set globally using Spark's RuntimeConfig like this:

spark.conf.set("proxyAddress", "http://my-proxy:1234")spark.conf.set("proxyUsername", "my-username")spark.conf.set("proxyPassword", "my-password")val df = spark.read.format("bigquery")  .load("some-table")

You can set the following in the hadoop configuration as well.

fs.gs.proxy.address (similar to "proxyAddress"), fs.gs.proxy.username (similar to "proxyUsername") and fs.gs.proxy.password (similar to "proxyPassword").
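A sketch of setting these Hadoop keys from PySpark, going through the active SparkContext's underlying Hadoop configuration (this uses the internal _jsc handle, a common but unofficial approach):

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.proxy.address", "http://my-proxy:1234")
hadoop_conf.set("fs.gs.proxy.username", "my-username")
hadoop_conf.set("fs.gs.proxy.password", "my-password")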

If the same parameter is set at multiple places the order of priority is as follows:

option("key", "value") > spark.conf > hadoop configuration
