
googleapis/java-pubsublite-spark

Java idiomatic client for Pub/Sub Lite Spark Connector.


Quickstart

If you are using Maven, add this to your pom.xml file:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-spark-sql-streaming</artifactId>
  <version>1.0.0</version>
</dependency>

If you are using Gradle without BOM, add this to your dependencies:

implementation 'com.google.cloud:pubsublite-spark-sql-streaming:1.0.0'

If you are using SBT, add this to your dependencies:

libraryDependencies += "com.google.cloud" % "pubsublite-spark-sql-streaming" % "1.0.0"

Authentication

See the Authentication section in the base directory's README.

Authorization

The client application making API calls must be granted authorization scopes required for the desired Pub/Sub Lite Spark Connector APIs, and the authenticated principal must have the IAM role(s) required to access GCP resources using the Pub/Sub Lite Spark Connector API calls.

Getting Started

Prerequisites

You will need a Google Cloud Platform Console project with the Pub/Sub Lite Spark Connector API enabled. You will need to enable billing to use Google Pub/Sub Lite Spark Connector. Follow these instructions to get your project set up. You will also need to set up the local development environment by installing the Google Cloud SDK and running the following commands on the command line:

gcloud auth login
gcloud config set project [YOUR PROJECT ID]

Installation and setup

You'll need to obtain the pubsublite-spark-sql-streaming library. See the Quickstart section to add pubsublite-spark-sql-streaming as a dependency in your code.

About Pub/Sub Lite Spark Connector

Google Cloud Pub/Sub Lite is a zonal, real-time messaging service that lets you send and receive messages between independent applications. You can manually configure the throughput and storage capacity for Pub/Sub Lite systems.

The Pub/Sub Lite Spark connector supports Pub/Sub Lite as an input source to Apache Spark Structured Streaming in both the default micro-batch processing mode and the experimental continuous processing mode. The connector works in all Apache Spark distributions, including Google Cloud Dataproc and manual Spark installations.

Requirements

Creating a new subscription or using an existing subscription

Follow the instructions to create a new subscription or use an existing subscription. If using an existing subscription, the connector will read from the oldest unacknowledged message in the subscription.

Creating a Google Cloud Dataproc cluster (Optional)

If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use spark-submit on any cluster.

MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"

Downloading and Using the Connector

The latest version of the connector is publicly available from the Maven Central repository. You can download it and pass it in the --jars option when using the spark-submit command.

Compatibility

Connector version | Spark version
≤0.3.4            | 2.4.X
Current           | 3.X.X

Usage

Samples

There are 3 Java samples (word count, simple write, simple read) under samples/ that show how to use the connector inside Dataproc.

Reading data from Pub/Sub Lite

Here is an example in Python:

df = spark.readStream \
  .format("pubsublite") \
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
  .load()

Here is an example in Java:

Dataset<Row> df = spark
  .readStream()
  .format("pubsublite")
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
  .load();

Note that the connector supports both MicroBatch Processing and Continuous Processing.
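For example, to opt into continuous processing instead of the default micro-batch mode, you can pass a continuous trigger when starting the query. This is a minimal sketch, assuming an existing SparkSession named spark as in the examples above; the console sink and the one-second checkpoint interval are illustrative choices, not requirements:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Read from Pub/Sub Lite as shown above.
Dataset<Row> df = spark
  .readStream()
  .format("pubsublite")
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
  .load();

// Use the experimental continuous trigger instead of the default
// micro-batch trigger; checkpointing is still required.
df.writeStream()
  .format("console")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .trigger(Trigger.Continuous("1 second"))
  .start();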

Writing data to Pub/Sub Lite

Here is an example in Python:

df.writeStream \
  .format("pubsublite") \
  .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
  .option("checkpointLocation", "path/to/HDFS/dir") \
  .outputMode("complete") \
  .trigger(processingTime="2 seconds") \
  .start()

Here is an example in Java:

import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

df.writeStream()
  .format("pubsublite")
  .option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .outputMode(OutputMode.Complete())
  .trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
  .start();

Properties

When reading from Pub/Sub Lite, the connector supports a number of configuration options:

Option | Type | Required | Default Value | Meaning
pubsublite.subscription | String | Y | | Full subscription path that the connector will read from.
pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | 50_000_000 | Max number of bytes per partition that will be cached in workers before Spark processes the messages.
pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Long.MAX | Max number of messages per partition that will be cached in workers before Spark processes the messages.
pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in a micro batch.
gcp.credentials.key | String | N | Application Default Credentials | Service account JSON in base64.
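As an illustrative sketch of how these read options are set, the flow-control settings are passed the same way as the subscription path; the numeric values below are hypothetical tuning choices, not recommendations:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical tuning: cap the per-partition cache at 10 MB and
// limit each micro batch to 100,000 messages.
Dataset<Row> df = spark
  .readStream()
  .format("pubsublite")
  .option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
  .option("pubsublite.flowcontrol.byteoutstandingperpartition", 10_000_000L)
  .option("pubsublite.flowcontrol.maxmessagesperbatch", 100_000L)
  .load();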

When writing to Pub/Sub Lite, the connector supports a number of configuration options:

Option | Type | Required | Default Value | Meaning
pubsublite.topic | String | Y | | Full topic path that the connector will write to.
gcp.credentials.key | String | N | Application Default Credentials | Service account JSON in base64.

Data Schema

When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:

Data Field | Spark Data Type | Notes
subscription | StringType | Full subscription path
partition | LongType |
offset | LongType |
key | BinaryType |
data | BinaryType |
attributes | MapType[StringType, ArrayType[BinaryType]] |
publish_timestamp | TimestampType |
event_timestamp | TimestampType | Nullable
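Because key and data arrive as BinaryType, a common first step is to cast them before further processing. A minimal sketch, assuming the df read above carries UTF-8 payloads:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Cast the binary key/data columns to strings and keep some metadata columns.
Dataset<Row> decoded = df.select(
    col("partition"),
    col("offset"),
    col("key").cast("string"),
    col("data").cast("string"),
    col("publish_timestamp"));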

When writing to Pub/Sub Lite, the connector matches the following data fields and data types:

Data Field | Spark Data Type | Required
key | BinaryType | N
data | BinaryType | N
attributes | MapType[StringType, ArrayType[BinaryType]] | N
event_timestamp | TimestampType | N

Note that when a data field is present in the table but its data type mismatches, the connector will throw an IllegalArgumentException that terminates the query.
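One way to avoid the type mismatch is to cast columns to the expected types before writing. A hedged sketch, assuming a hypothetical counts DataFrame with a string word column and a long count column (neither is part of the connector API):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// key and data must be BinaryType; leaving them as strings would
// trigger the IllegalArgumentException described above.
Dataset<Row> toWrite = counts.select(
    col("word").cast("binary").alias("key"),
    col("count").cast("string").cast("binary").alias("data"));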

Building the Connector

The connector is built using Maven. The following command creates a JAR file with shaded dependencies:

mvn package

FAQ

What is the cost of Pub/Sub Lite?

See the Pub/Sub Lite pricing documentation.

Can I configure the number of Spark partitions?

No, the number of Spark partitions is set to be the number of Pub/Sub Lite partitions of the topic that the subscription is attached to.

How do I authenticate outside Cloud Compute Engine / Cloud Dataproc?

Use a service account JSON key and GOOGLE_APPLICATION_CREDENTIALS as described here.

Credentials can be provided with the gcp.credentials.key option; the value must be passed in as a base64-encoded string.

Example:

spark.readStream.format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
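If you prefer to compute the value in code, one option is to base64-encode the service account JSON file with the JDK. A minimal sketch; the file path is a placeholder:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

// Encode the service account JSON key for the gcp.credentials.key option.
byte[] keyJson = Files.readAllBytes(Paths.get("/path/to/service-account.json"));
String keyBase64 = Base64.getEncoder().encodeToString(keyJson);

spark.readStream()
  .format("pubsublite")
  .option("gcp.credentials.key", keyBase64)
  // ... remaining read options as shown above ...
  .load();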

Samples

Samples are in the samples/ directory.

Sample | Source Code | Try it
Admin Utils | source code | Open in Cloud Shell
Common Utils | source code | Open in Cloud Shell
Publish Words | source code | Open in Cloud Shell
Read Results | source code | Open in Cloud Shell
Simple Read | source code | Open in Cloud Shell
Simple Write | source code | Open in Cloud Shell
Word Count | source code | Open in Cloud Shell

Troubleshooting

To get help, follow the instructions in the shared Troubleshooting document.

Transport

Pub/Sub Lite Spark Connector uses gRPC for the transport layer.

Supported Java Versions

Java 8 or above is required for using this client.

Google's Java client libraries, Google Cloud Client Libraries and Google Cloud API Libraries, follow the Oracle Java SE support roadmap (see the Oracle Java SE Product Releases section).

For new development

In general, new feature development occurs with support for the lowest Java LTS version covered by Oracle's Premier Support (which typically lasts 5 years from initial General Availability). If the minimum required JVM for a given library is changed, it is accompanied by a semver major release.

Java 11 and (in September 2021) Java 17 are the best choices for new development.

Keeping production systems current

Google tests its client libraries with all current LTS versions covered by Oracle's Extended Support (which typically lasts 8 years from initial General Availability).

Legacy support

Google's client libraries support legacy versions of Java runtimes with long-term stable libraries that don't receive feature updates, on a best-efforts basis, as it may not be possible to backport all patches.

Google provides updates on a best-efforts basis to apps that continue to use Java 7, though apps might need to upgrade to current versions of the library that support their JVM.

Where to find specific information

The latest versions and the supported Java versions are identified on the individual GitHub repository github.com/GoogleAPIs/java-SERVICENAME and on google-cloud-java.

Versioning

This library follows Semantic Versioning.

Contributing

Contributions to this library are always welcome and highly encouraged.

See CONTRIBUTING for more information on how to get started.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.

License

Apache 2.0 - See LICENSE for more information.

CI Status

Java Version | Status
Java 8 | Kokoro CI
Java 8 OSX | Kokoro CI
Java 8 Windows | Kokoro CI
Java 11 | Kokoro CI

Java is a registered trademark of Oracle and/or its affiliates.



[8]ページ先頭

©2009-2025 Movatter.jp