Stream a Kafka topic to Hive
Apache Kafka is an open source distributed streaming platform for real-time data pipelines and data integration. It provides an efficient and scalable streaming system for use in a variety of applications, including:
- Real-time analytics
- Stream processing
- Log aggregation
- Distributed messaging
- Event streaming
Objectives
Install Kafka on a Dataproc HA cluster with ZooKeeper (referred to in this tutorial as a "Dataproc Kafka cluster").
Create fictitious customer data, then publish the data to a Kafka topic.
Create Hive Parquet and ORC tables in Cloud Storage to receive streamed Kafka topic data.
Submit a PySpark job to subscribe to and stream the Kafka topic into Cloud Storage in Parquet and ORC format.
Run a query on the streamed Hive table data to count the streamed Kafka messages.
Costs
In this document, you use the following billable components of Google Cloud:
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
If you haven't already done so, create a Google Cloud project.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role. You can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
- In the Get started section, do the following:
- Enter a globally unique name that meets the bucket naming requirements.
- To add a bucket label, expand the Labels section, click Add label, and specify a key and a value for your label.
- In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
- In the Replication settings section, click Configure to configure settings for the replication job. The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
- In the Choose how to store your data section, do the following:
- Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket. Note: You cannot enable hierarchical namespace in existing buckets.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects. Note: You cannot change the Prevent public access setting if this setting is enforced by an organization policy.
- In the Choose how to protect object data section, do the following:
- Select any of the options under Data protection that you want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section, and select a Data encryption method.
- Click Create.
Tutorial steps
Perform the following steps to create a Dataproc Kafka cluster to read a Kafka topic into Cloud Storage in Parquet or ORC format.
Note: In a production environment, some of the tasks listed in the following sections are performed on separate clusters. For purposes of this tutorial, all tasks are performed on a single Dataproc Kafka cluster.
Copy the Kafka installation script to Cloud Storage
The kafka.sh initialization action script installs Kafka on a Dataproc cluster.
Browse the code.
#!/bin/bash
# Copyright 2015 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
# Dataproc cluster.

set -euxo pipefail

readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
readonly KAFKA_HOME=/usr/lib/kafka
readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"

# The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
ZOOKEEPER_ADDRESS=''
# Integer broker ID of this node, e.g., 0
BROKER_ID=''

function retry_apt_command() {
  cmd="$1"
  for ((i = 0; i < 10; i++)); do
    if eval "$cmd"; then
      return 0
    fi
    sleep 5
  done
  return 1
}

function recv_keys() {
  if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} >= 3.0" | bc -l) == 1 ]]; then
    retry_apt_command "apt-get update && apt-get install -y gnupg"
    export GNUPGHOME="$(mktemp -d)"
    trap 'rm -rf "${GNUPGHOME}"' EXIT
    gpg --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C
    mkdir -p /etc/apt/trusted.gpg.d
    gpg --export B7B3B788A8D3785C > /etc/apt/trusted.gpg.d/mysql-repo.gpg
  else
    retry_apt_command "apt-get install -y gnupg2 && \
      apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
  fi
}

function update_apt_get() {
  retry_apt_command "apt-get update"
}

function install_apt_get() {
  pkgs="$@"
  retry_apt_command "apt-get install -y $pkgs"
}

function err() {
  echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
  return 1
}

# Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
function get_broker_list() {
  ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
    <<<"ls /brokers/ids" |
    grep '\[.*\]' |
    sed 's/\[/ /' |
    sed 's/\]/,/'
}

# Waits for zookeeper to be up or time out.
function wait_for_zookeeper() {
  for i in {1..20}; do
    if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
      return 0
    else
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
      sleep 5
    fi
  done
  echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
  exit 1
}

# Wait until the current broker is registered or time out.
function wait_for_kafka() {
  for i in {1..20}; do
    local broker_list=$(get_broker_list || true)
    if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
      return 0
    else
      echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
      sleep 5
    fi
  done
  echo "Failed to start Kafka broker ${BROKER_ID}." >&2
  exit 1
}

function install_and_configure_kafka_server() {
  # Find zookeeper list first, before attempting any installation.
  local zookeeper_client_port
  zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
    tail -n 1 |
    cut -d '=' -f 2)

  local zookeeper_list
  zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
    cut -d '=' -f 2 |
    cut -d ':' -f 1 |
    sort |
    uniq |
    sed "s/$/:${zookeeper_client_port}/" |
    xargs echo |
    sed "s/ /,/g")

  if [[ -z "${zookeeper_list}" ]]; then
    # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
    # bother to populate it. Check if YARN HA is configured.
    zookeeper_list=$(bdconfig get_property_value --configuration_file \
      /etc/hadoop/conf/yarn-site.xml \
      --name yarn.resourcemanager.zk-address 2>/dev/null)
  fi

  # If all attempts failed, error out.
  if [[ -z "${zookeeper_list}" ]]; then
    err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
  fi

  ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"

  # Install Kafka from Dataproc distro.
  install_apt_get kafka-server || dpkg -l kafka-server ||
    err 'Unable to install and find kafka-server.'

  mkdir -p /var/lib/kafka-logs
  chown kafka:kafka -R /var/lib/kafka-logs

  if [[ "${ROLE}" == "Master" ]]; then
    # For master nodes, broker ID starts from 10,000.
    if [[ "$(hostname)" == *-m ]]; then
      # non-HA
      BROKER_ID=10000
    else
      # HA
      BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
    fi
  else
    # For worker nodes, broker ID is a random number generated less than 10000.
    # 10000 is chosen since the max broker ID allowed being set is 10000.
    BROKER_ID=$((RANDOM % 10000))
  fi
  sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
    "${KAFKA_PROP_FILE}"
  sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
    "${KAFKA_PROP_FILE}"
  sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
    "${KAFKA_PROP_FILE}"
  echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
  echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"

  if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
    sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
    sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
  fi

  wait_for_zookeeper

  # Start Kafka.
  service kafka-server restart

  wait_for_kafka
}

function install_kafka_python_package() {
  KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
  if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
    return
  fi

  if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
    /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
  else
    OS=$(. /etc/os-release && echo "${ID}")
    if [[ "${OS}" == "rocky" ]]; then
      yum install -y python2-pip
    else
      apt-get install -y python-pip
    fi
    pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
  fi
}

function remove_old_backports {
  # This script uses 'apt-get update' and is therefore potentially dependent on
  # backports repositories which have been archived. In order to mitigate this
  # problem, we will remove any reference to backports repos older than oldstable

  # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
  oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
  stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');

  matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
  if [[ -n "$matched_files" ]]; then
    for filename in "$matched_files"; do
      grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
        sed -i -e 's/^.*-backports.*$//' "$filename"
    done
  fi
}

function main() {
  OS=$(. /etc/os-release && echo "${ID}")
  if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
    remove_old_backports
  fi
  recv_keys || err 'Unable to receive keys.'
  update_apt_get || err 'Unable to update packages lists.'
  install_kafka_python_package

  # Only run the installation on workers; verify zookeeper on master(s).
  if [[ "${ROLE}" == 'Master' ]]; then
    service zookeeper-server status ||
      err 'Required zookeeper-server not running on master!'
    if [[ "${RUN_ON_MASTER}" == "true" ]]; then
      # Run installation on masters.
      install_and_configure_kafka_server
    else
      # On master nodes, just install kafka command-line tools and libs but not
      # kafka-server.
      install_apt_get kafka ||
        err 'Unable to install kafka libraries on master!'
    fi
  else
    # Run installation on workers.
    install_and_configure_kafka_server
  fi
}

main

Copy the kafka.sh initialization action script to your Cloud Storage bucket.
Open Cloud Shell, then run the following command:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Make the following replacements:
- REGION: kafka.sh is stored in public, regionally tagged buckets in Cloud Storage. Specify a geographically close Compute Engine region (for example, us-central1).
- BUCKET_NAME: The name of your Cloud Storage bucket.
The kafka.sh script is also available from the GitHub initialization-actions/kafka/ repository. You can clone or download the script onto your local machine, then modify the source specified in the previous gcloud storage cp example to copy the local script to gs://BUCKET_NAME/scripts/.
Create a Dataproc Kafka cluster
Open Cloud Shell, then run the following gcloud dataproc clusters create command to create a Dataproc HA cluster that installs the Kafka and ZooKeeper components:
gcloud dataproc clusters create KAFKA_CLUSTER \
    --project=PROJECT_ID \
    --region=REGION \
    --image-version=2.1-debian11 \
    --num-masters=3 \
    --enable-component-gateway \
    --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Notes:
- KAFKA_CLUSTER: The cluster name, which must be unique within a project. The name must start with a lowercase letter, and can contain up to 51 lowercase letters, numbers, and hyphens. It cannot end with a hyphen. The name of a deleted cluster can be reused.
- PROJECT_ID: The project to associate with this cluster.
- REGION: The Compute Engine region where the cluster will be located, such as us-central1.
- You can add the optional --zone=ZONE flag to specify a zone within the specified region, such as us-central1-a. If you do not specify a zone, the Dataproc autozone placement feature selects a zone within the specified region.
- --image-version: Dataproc image version 2.1-debian11 is recommended for this tutorial. Note: Each image version contains a set of pre-installed components, including the Hive component used in this tutorial (see Supported Dataproc image versions).
- --num-masters: 3 master nodes create an HA cluster. The ZooKeeper component, which is required by Kafka, is pre-installed on an HA cluster. In future Kafka releases, ZooKeeper may be deprecated and removed.
- --enable-component-gateway: Enables the Dataproc Component Gateway.
- BUCKET_NAME: The name of your Cloud Storage bucket that contains the /scripts/kafka.sh initialization script (see Copy the Kafka installation script to Cloud Storage).
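The naming rules for KAFKA_CLUSTER can be checked before running the create command. The following is a minimal sketch, not an official validator: the function name is_valid_cluster_name is hypothetical, and it assumes the 51-character limit applies to the whole name, which the wording above leaves ambiguous.

```python
import re

# Assumption: the 51-character limit covers the whole name, first letter included.
MAX_NAME_LENGTH = 51

# First character: a lowercase letter. Rest: lowercase letters, digits, hyphens.
_NAME_PATTERN = re.compile(r"[a-z][a-z0-9-]*")


def is_valid_cluster_name(name: str) -> bool:
    """Return True if name follows the cluster naming rules listed in the notes."""
    if not 1 <= len(name) <= MAX_NAME_LENGTH:
        return False
    if name.endswith("-"):
        return False
    return _NAME_PATTERN.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ["kafka-cluster", "Kafka", "1kafka", "kafka-"]:
        print(candidate, is_valid_cluster_name(candidate))
```

Running the check locally only catches naming mistakes early; the gcloud command remains the authority on what it accepts.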
Create a Kafka custdata topic
To create a Kafka topic on the Dataproc Kafka cluster:
Use the SSH utility to open a terminal window on the cluster master VM.
Create a Kafka custdata topic.
/usr/lib/kafka/bin/kafka-topics.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --create --topic custdata
Notes:
- KAFKA_CLUSTER: Insert the name of your Kafka cluster.
- -w-0:9092 signifies the Kafka broker running on port 9092 on the worker-0 node.
You can run the following commands after creating the custdata topic:
# List all topics.
/usr/lib/kafka/bin/kafka-topics.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --list

# Consume then display topic data.
/usr/lib/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --topic custdata

# Count the number of messages in the topic.
/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata

# Delete topic.
/usr/lib/kafka/bin/kafka-topics.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --delete --topic custdata
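The commands above address a broker on a worker node because kafka.sh installs kafka-server on workers by default and on masters only when the run-on-master metadata attribute is true. The following sketch restates that decision and the broker ID choice from the script in Python. The function names are hypothetical, and raising an error for an unexpected master hostname is an addition of this sketch, not behavior of the script.

```python
import random
import re


def runs_broker(role: str, run_on_master: bool = False) -> bool:
    """Workers always run a broker; masters only when run-on-master is set."""
    return role != "Master" or run_on_master


def broker_id(role: str, hostname: str) -> int:
    """Restate the broker ID choice that kafka.sh makes for one node."""
    if role == "Master":
        if hostname.endswith("-m"):
            return 10000  # single-master (non-HA) cluster
        match = re.search(r"-m-([0-9]+)$", hostname)
        if match is None:
            # Assumption of this sketch: fail loudly on an unexpected hostname.
            raise ValueError(f"unexpected master hostname: {hostname}")
        return 10000 + int(match.group(1))  # HA masters: 10000, 10001, 10002
    # Workers pick a random ID below 10000, like RANDOM % 10000 in the script.
    return random.randrange(10000)


def bootstrap_server(cluster: str, port: int = 9092) -> str:
    """Address of the broker on the first worker, as used in this tutorial."""
    return f"{cluster}-w-0:{port}"
```

For example, bootstrap_server("my-cluster") yields my-cluster-w-0:9092, where my-cluster is a placeholder cluster name.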
Publish content to the Kafka custdata topic
The following script uses the kafka-console-producer.sh Kafka tool to generate fictitious customer data in CSV format.
Copy, then paste the script in the SSH terminal on the master node of your Kafka cluster. Press <return> to run the script.
for i in {1..10000}; do
  custname="cust name${i}"
  uuid=$(dbus-uuidgen)
  age=$((45 + $RANDOM % 45))
  amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
  message="${uuid}:${custname},${age},${amount}"
  echo ${message}
done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
Notes:
- KAFKA_CLUSTER: The name of your Kafka cluster.
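The record layout produced by the loop, a key followed by a colon and three comma-separated fields, can be reproduced offline to inspect it. The following is a rough Python equivalent for illustration only: make_message and split_message are hypothetical names, uuid.uuid4().hex stands in for dbus-uuidgen, and nothing here talks to Kafka.

```python
import random
import uuid


def make_message(i: int, rng: random.Random) -> str:
    """Build one record in the uuid:custname,age,amount layout of the shell loop."""
    custname = f"cust name{i}"
    key = uuid.uuid4().hex          # stand-in for dbus-uuidgen
    age = 45 + rng.randrange(45)    # 45..89, like 45 + RANDOM % 45
    amount = f"{rng.randrange(99999)}.{rng.randrange(99)}"
    return f"{key}:{custname},{age},{amount}"


def split_message(message: str, key_separator: str = ":"):
    """Split one record into key and value at the first key separator."""
    key, value = message.split(key_separator, 1)
    return key, value


if __name__ == "__main__":
    rng = random.Random()
    for i in range(1, 4):
        key, value = split_message(make_message(i, rng))
        print(key, "->", value)
```

The key becomes the uuid column later in the tutorial, and the value carries the custname, age, and amount fields.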
Run the following Kafka command to confirm the custdata topic contains 10,000 messages.
/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
Notes:
- KAFKA_CLUSTER: The name of your Kafka cluster.
Expected output:
custdata:0:10000
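Each output line has the form topic:partition:offset, so a topic with several partitions reports one line per partition. The following sketch sums the offsets for one topic. The function name count_messages is hypothetical, and the sum equals the message count only under the assumption that no messages have been removed from the topic, as is the case right after the publish step.

```python
def count_messages(offset_output: str, topic: str) -> int:
    """Sum the per-partition offsets reported for a topic."""
    total = 0
    for line in offset_output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right so the last two fields are partition and offset.
        name, _partition, offset = line.rsplit(":", 2)
        if name == topic:
            total += int(offset)
    return total


if __name__ == "__main__":
    print(count_messages("custdata:0:10000", "custdata"))
```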
Create Hive tables in Cloud Storage
Create Hive tables to receive streamed Kafka topic data. Perform the following steps to create cust_parquet (Parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket.
Insert your BUCKET_NAME in the following script, then copy and paste the script into the SSH terminal on your Kafka cluster master node, then press <return> to create a ~/hivetables.hql (Hive Query Language) script.
You will run the ~/hivetables.hql script in the next step to create Parquet and ORC Hive tables in your Cloud Storage bucket.
cat > ~/hivetables.hql <<EOF
drop table if exists cust_parquet;
create external table if not exists cust_parquet
(uuid string, custname string, age string, amount string)
row format delimited fields terminated by ','
stored as parquet
location "gs://BUCKET_NAME/tables/cust_parquet";

drop table if exists cust_orc;
create external table if not exists cust_orc
(uuid string, custname string, age string, amount string)
row format delimited fields terminated by ','
stored as orc
location "gs://BUCKET_NAME/tables/cust_orc";
EOF
In the SSH terminal on the master node of your Kafka cluster, submit the ~/hivetables.hql Hive job to create cust_parquet (Parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket.
gcloud dataproc jobs submit hive \
    --cluster=KAFKA_CLUSTER \
    --region=REGION \
    -f ~/hivetables.hql
Notes:
- The Hive component is pre-installed on the Dataproc Kafka cluster. See 2.1.x release versions for a list of the Hive component versions included in recently released 2.1 images.
- KAFKA_CLUSTER: The name of your Kafka cluster.
- REGION: The region where your Kafka cluster is located.
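The two table definitions differ only in the storage format and the table location. As an illustration of that pattern, the following sketch renders the same statements from a single template. The names HIVE_DDL_TEMPLATE and render_hive_ddl are hypothetical, and my-bucket is a placeholder bucket name.

```python
# One template covers both tables; only the format name and bucket vary.
HIVE_DDL_TEMPLATE = """\
drop table if exists cust_{fmt};
create external table if not exists cust_{fmt}
(uuid string, custname string, age string, amount string)
row format delimited fields terminated by ','
stored as {fmt}
location "gs://{bucket}/tables/cust_{fmt}";
"""


def render_hive_ddl(bucket: str, formats=("parquet", "orc")) -> str:
    """Return the HQL for one external table per output format."""
    return "\n".join(
        HIVE_DDL_TEMPLATE.format(fmt=fmt, bucket=bucket) for fmt in formats
    )


if __name__ == "__main__":
    print(render_hive_ddl("my-bucket"))
```

Deriving the table location from the format name keeps each table pointed at its own gs://BUCKET_NAME/tables/cust_FORMAT path.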
Stream Kafka custdata to Hive tables
- Run the following command in the SSH terminal on the master node of your Kafka cluster to install the kafka-python library. A Kafka client is needed to stream Kafka topic data to Cloud Storage. Typically, the Kafka client runs on a separate machine, but this tutorial uses one cluster for all processes for simplicity and to save costs.
pip install kafka-python
Insert your BUCKET_NAME, then copy and paste the following PySpark code into the SSH terminal on your Kafka cluster master node, and then press <return> to create a streamdata.py file.
The script subscribes to the Kafka custdata topic, then streams the data to your Hive tables in Cloud Storage. The output format, which can be parquet or ORC, is passed into the script as a parameter.
Note: The Kafka topic data streamed to Cloud Storage is in compressed binary format.
cat > streamdata.py <<EOF
#!/bin/python

import sys
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from kafka import KafkaConsumer

# Each Kafka message value is a CSV string: custname,age,amount.
def getNameFn(data): return data.split(",")[0]
def getAgeFn(data): return data.split(",")[1]
def getAmtFn(data): return data.split(",")[2]

def main(cluster, outputfmt):
    spark = SparkSession.builder.appName("APP").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    Logger = spark._jvm.org.apache.log4j.Logger
    logger = Logger.getLogger(__name__)

    # Subscribe to the custdata topic from the earliest offset.
    rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster + "-w-0:9092") \
        .option("subscribe", "custdata") \
        .option("startingOffsets", "earliest") \
        .load()

    getNameUDF = udf(getNameFn, StringType())
    getAgeUDF = udf(getAgeFn, StringType())
    getAmtUDF = udf(getAmtFn, StringType())

    logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt)

    # The message key becomes the uuid column; the value is split into three columns.
    query = rows.select(col("key").cast("string").alias("uuid"), \
        getNameUDF(col("value").cast("string")).alias("custname"), \
        getAgeUDF(col("value").cast("string")).alias("age"), \
        getAmtUDF(col("value").cast("string")).alias("amount"))

    writer = query.writeStream.format(outputfmt) \
        .option("path", "gs://BUCKET_NAME/tables/cust_" + outputfmt) \
        .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/" + outputfmt + "wr") \
        .outputMode("append") \
        .start()

    writer.awaitTermination()

if __name__ == "__main__":
    # Both arguments (cluster name and output format) are required.
    if len(sys.argv) != 3:
        print("Invalid number of arguments passed ", len(sys.argv))
        print("Usage: ", sys.argv[0], " cluster format")
        print("e.g.: ", sys.argv[0], " <cluster_name> orc")
        print("e.g.: ", sys.argv[0], " <cluster_name> parquet")
        sys.exit(1)

    main(sys.argv[1], sys.argv[2])
EOF
In the SSH terminal on the master node of your Kafka cluster, run spark-submit to stream data to your Hive tables in Cloud Storage.
Insert the name of your KAFKA_CLUSTER and the output FORMAT, then copy and paste the following code into the SSH terminal on the master node of your Kafka cluster, and then press <return> to run the code and stream the Kafka custdata data in the specified format to your Hive tables in Cloud Storage.
spark-submit --packages \
org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
    --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
    --conf spark.driver.memory=4096m \
    --conf spark.executor.cores=2 \
    --conf spark.executor.instances=2 \
    --conf spark.executor.memory=6144m \
    streamdata.py KAFKA_CLUSTER FORMAT
Notes:
- KAFKA_CLUSTER: Insert the name of your Kafka cluster.
- FORMAT: Specify either parquet or orc as the output format. You can run the command successively to stream both formats to the Hive tables: for example, in the first invocation, specify parquet to stream the Kafka custdata topic to the Hive parquet table; then, in the second invocation, specify orc format to stream custdata to the Hive ORC table.
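The --packages value in the spark-submit command is a comma-separated list of Maven coordinates of the form group:artifact_SCALA-VERSION:SPARK-VERSION. The following sketch assembles that string from its parts. The function name kafka_packages is hypothetical, and the defaults simply repeat the versions shown in the command; whether other versions suit a given cluster is outside what this sketch checks.

```python
def kafka_packages(scala_version: str = "2.12", spark_version: str = "3.1.3") -> str:
    """Build the --packages value for the two Spark Kafka connector artifacts."""
    artifacts = ["spark-streaming-kafka-0-10", "spark-sql-kafka-0-10"]
    return ",".join(
        f"org.apache.spark:{artifact}_{scala_version}:{spark_version}"
        for artifact in artifacts
    )


if __name__ == "__main__":
    print(kafka_packages())
```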
After standard output halts in the SSH terminal, which signifies that all of the custdata has been streamed, press <control-c> in the SSH terminal to stop the process.
List the Hive tables in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Notes:
- BUCKET_NAME: Insert the name of the Cloud Storage bucket that contains your Hive tables (see Create Hive tables).
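The column extraction that streamdata.py performs can be tried without Spark or Kafka. The following sketch applies the same split-by-comma logic to one raw key and value pair, assuming both arrive as UTF-8 bytes. The to_row helper is hypothetical and exists only for this illustration.

```python
def get_name(data: str) -> str:
    return data.split(",")[0]


def get_age(data: str) -> str:
    return data.split(",")[1]


def get_amt(data: str) -> str:
    return data.split(",")[2]


def to_row(key: bytes, value: bytes) -> dict:
    """Turn one Kafka key/value pair into the four string columns of the Hive tables."""
    text = value.decode("utf-8")
    return {
        "uuid": key.decode("utf-8"),
        "custname": get_name(text),
        "age": get_age(text),
        "amount": get_amt(text),
    }


if __name__ == "__main__":
    print(to_row(b"0123abcd", b"cust name1,52,1234.5"))
```

Every column stays a string, which matches the string-typed columns declared for cust_parquet and cust_orc.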
Query streamed data
In the SSH terminal on the master node of your Kafka cluster, run the following hive command to count the streamed Kafka custdata messages in the Hive tables in Cloud Storage.
hive -e "select count(1) from TABLE_NAME"
Notes:
- TABLE_NAME: Specify either cust_parquet or cust_orc as the Hive table name.
Expected output snippet:
...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
Clean up
Delete the project
Delete resources
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
- Delete your Kafka cluster:
gcloud dataproc clusters delete KAFKA_CLUSTER \
    --region=${REGION}
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-12-15 UTC.