Receive and parse Pub/Sub messages about data profiles

This document provides examples that demonstrate how to receive and parsenotifications about changes to your data profiles. Sensitive Data Protection sendsthese updates in the form ofPub/Sub messages.

Overview

You can configure Sensitive Data Protection to automatically generate profiles about data across anorganization, folder, or project.Data profiles containmetrics and metadata about your data and help you determine wheresensitive and high-risk data reside.Sensitive Data Protection reports these metrics at various levels of detail. For information about thetypes of data you can profile, seeSupportedresources.

When configuring the data profiler, you can turn on the option to publishPub/Sub messages whenever significant changes in your dataprofiles occur. The messages help you take immediate action in response tothose changes. The following are the events that you can listen for:

  • A data asset is profiled for the first time.
  • A profile is updated.
  • The risk or sensitivity score of a profile increases.
  • There is a new error related to your data profiles.

The Pub/Sub messages that the data profiler publishes contain aDataProfilePubSubMessage object. These messages are always sent in binaryformat, so you need to write code that receives and parses them.

Pricing

When you use Pub/Sub, you are billed according toPub/Sub pricing.

Before you begin

This page assumes the following:

Before you start working on the examples, follow these steps:

  1. Create a Pub/Sub topic and add a subscriptionfor it. Don't assign a schema to the topic.

    For simplicity, the examples on this page listen to only one subscription.However, in practice, you can create a topic and subscription for each eventthat Sensitive Data Protection supports.

  2. If you haven't already done so, configure the data profiler to publishPub/Sub messages:

    1. Edit your scan configuration.

    2. On theEdit scan configuration page, turn on thePublish toPub/Sub option and select the events that you want to listen for. Then,configure the settings for each event.

    3. Save the scan configuration.

  3. Grant the Sensitive Data Protection service agent publishingaccess on the Pub/Sub topic. An example ofa role that has publishing access is the Pub/Sub Publisher role(roles/pubsub.publisher). The Sensitive Data Protection service agent is anemail address in the format:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com

    If you're working with an organization- or folder-level scan configuration,thePROJECT_NUMBER is the numerical identifier of theserviceagent container. If you're working with a project-level scan configuration,thePROJECT_NUMBER is the numerical identifier of your project.

    Note: If there are configuration or permission issues with the Pub/Sub topic,Sensitive Data Protection retries sending the Pub/Sub notification for up totwo weeks. After two weeks, the notification is discarded.
  4. Install and set up theSensitive Data Protection client library forJava or Python.

Examples

The following examples demonstrate how to receive and parsePub/Sub messages that the data profiler publishes. You canrepurpose these examples and deploy them as Cloud Run functionsthat are triggered by Pub/Sub events. For more information, seePub/Sub tutorial (2nd gen).

In the following examples, replace the following:

  • PROJECT_ID: the ID of the project that contains the Pub/Sub subscription.
  • SUBSCRIPTION_ID: the ID of the Pub/Sub subscription.

Java

importcom.google.api.core.ApiService;importcom.google.cloud.pubsub.v1.AckReplyConsumer;importcom.google.cloud.pubsub.v1.MessageReceiver;importcom.google.cloud.pubsub.v1.Subscriber;importcom.google.privacy.dlp.v2.DataProfilePubSubMessage;importcom.google.protobuf.InvalidProtocolBufferException;importcom.google.pubsub.v1.ProjectSubscriptionName;importcom.google.pubsub.v1.PubsubMessage;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;publicclassDataProfilePubSubMessageParser{publicstaticvoidmain(String...args)throwsException{StringprojectId="PROJECT_ID";StringsubscriptionId="SUBSCRIPTION_ID";inttimeoutSeconds=5;// The `ProjectSubscriptionName.of` method creates a fully qualified identifier// in the form `projects/{projectId}/subscriptions/{subscriptionId}`.ProjectSubscriptionNamesubscriptionName=ProjectSubscriptionName.of(projectId,subscriptionId);MessageReceiverreceiver=(PubsubMessagepubsubMessage,AckReplyConsumerconsumer)->{try{DataProfilePubSubMessagemessage=DataProfilePubSubMessage.parseFrom(pubsubMessage.getData());System.out.println("PubsubMessage with ID: "+pubsubMessage.getMessageId()+"; message size: "+pubsubMessage.getData().size()+"; event: "+message.getEvent()+"; profile name: "+message.getProfile().getName()+"; full resource: "+message.getProfile().getFullResource());consumer.ack();}catch(InvalidProtocolBufferExceptione){e.printStackTrace();}};// Create subscriber client.Subscribersubscriber=Subscriber.newBuilder(subscriptionName,receiver).build();try{ApiServiceapiService=subscriber.startAsync();apiService.awaitRunning();System.out.printf("Listening for messages on %s for %d seconds.%n",subscriptionName,timeoutSeconds);subscriber.awaitTerminated(timeoutSeconds,TimeUnit.SECONDS);}catch(TimeoutExceptionignored){}finally{subscriber.stopAsync();}}}

Python

fromgoogle.cloudimportpubsub_v1fromconcurrent.futuresimportTimeoutErrorfromgoogle.cloudimportdlp_v2project_id="PROJECT_ID"subscription_id="SUBSCRIPTION_ID"timeout=5.0subscriber=pubsub_v1.SubscriberClient()# The `subscription_path` method creates a fully qualified identifier# in the form `projects/{project_id}/subscriptions/{subscription_id}`subscription_path=subscriber.subscription_path(project_id,subscription_id)defcallback(message:pubsub_v1.subscriber.message.Message)->None:print(f"Received{message.data}.")dlp_msg=dlp_v2.DataProfilePubSubMessage()dlp_msg._pb.ParseFromString(message.data)print("Parsed message: ",dlp_msg)print("--------")message.ack()streaming_pull_future=subscriber.subscribe(subscription_path,callback=callback)print(f"Listening for messages on{subscription_path} for{timeout} seconds...")# Wrap subscriber in a 'with' block to automatically call close() when done.withsubscriber:try:# When `timeout` is not set, result() will block indefinitely,# unless an exception is encountered first.streaming_pull_future.result(timeout=timeout)exceptTimeoutError:streaming_pull_future.cancel()# Trigger the shutdown.streaming_pull_future.result()# Block until the shutdown is complete.print("Done waiting.")

What's next

Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2025-12-17 UTC.