Computing numerical and categorical statistics

You can use Sensitive Data Protection to compute numerical and categorical statistics for individual columns in BigQuery tables. Sensitive Data Protection can calculate the following:

  • The column's minimum value
  • The column's maximum value
  • Quantile values for the column
  • A histogram of value frequencies in the column

Compute numerical statistics

You can determine minimum, maximum, and quantile values for an individual BigQuery column. To calculate these values, you configure a DlpJob, setting the NumericalStatsConfig privacy metric to the name of the column to scan. When you run the job, Sensitive Data Protection computes statistics for the given column, returning its results in the NumericalStatsResult object. Sensitive Data Protection can compute statistics for the following number types:

  • integer
  • float
  • date
  • datetime
  • timestamp
  • time

The statistics that a scan run returns include the minimum value, the maximum value, and 99 quantile values that partition the set of field values into 100 equal-sized buckets.
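Conceptually, the risk-job configuration described above is just a privacy metric naming the column plus the BigQuery table that contains it. Here is a minimal sketch of that request body in Python dictionary form; the project, dataset, table, and column names are placeholder assumptions, not values from this page:

```python
# Hypothetical numeric column to analyze.
column_name = "age"

# Minimal risk-analysis job request body for numerical statistics.
risk_job = {
    # NumericalStatsConfig names the column whose minimum, maximum,
    # and quantile values should be computed.
    "privacy_metric": {
        "numerical_stats_config": {"field": {"name": column_name}}
    },
    # The BigQuery table that holds the column (placeholder names).
    "source_table": {
        "project_id": "my-data-project",
        "dataset_id": "my_dataset",
        "table_id": "my_table",
    },
}

# A client would then start the job with something like:
# dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})
```

The full samples below add the remaining pieces: a Pub/Sub action for job-completion notification, and code that fetches and prints the NumericalStatsResult.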

Code examples

Following is sample code in several languages that demonstrates how to use Sensitive Data Protection to calculate numerical statistics.

Important: The code on this page requires that you first set up a Sensitive Data Protection client. For more information about installing and creating a Sensitive Data Protection client, see Sensitive Data Protection client libraries. (Sending JSON to Sensitive Data Protection REST endpoints does not require a client library.)

C#

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.Action.Types;
using static Google.Cloud.Dlp.V2.PrivacyMetric.Types;

public class RiskAnalysisCreateNumericalStats
{
    public static AnalyzeDataSourceRiskDetails.Types.NumericalStatsResult NumericalStats(
        string callingProjectId,
        string tableProjectId,
        string datasetId,
        string tableId,
        string topicId,
        string subscriptionId,
        string columnName)
    {
        var dlp = DlpServiceClient.Create();

        // Construct + submit the job
        var config = new RiskAnalysisJobConfig
        {
            PrivacyMetric = new PrivacyMetric
            {
                NumericalStatsConfig = new NumericalStatsConfig
                {
                    Field = new FieldId { Name = columnName }
                }
            },
            SourceTable = new BigQueryTable
            {
                ProjectId = tableProjectId,
                DatasetId = datasetId,
                TableId = tableId
            },
            Actions =
            {
                new Google.Cloud.Dlp.V2.Action
                {
                    PubSub = new PublishToPubSub
                    {
                        Topic = $"projects/{callingProjectId}/topics/{topicId}"
                    }
                }
            }
        };

        var submittedJob = dlp.CreateDlpJob(new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

        // Listen to pub/sub for the job
        var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
        var subscriber = SubscriberClient.CreateAsync(subscriptionName).Result;

        // SimpleSubscriber runs your message handle function on multiple
        // threads to maximize throughput.
        var done = new ManualResetEventSlim(false);
        subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            if (message.Attributes["DlpJobName"] == submittedJob.Name)
            {
                Thread.Sleep(500); // Wait for DLP API results to become consistent
                done.Set();
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }
            else
            {
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }
        });

        done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
        subscriber.StopAsync(CancellationToken.None).Wait();

        // Process results
        var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });
        var result = resultJob.RiskDetails.NumericalStatsResult;

        // 'UnpackValue(x)' is a prettier version of 'x.toString()'
        Console.WriteLine($"Value Range: [{UnpackValue(result.MinValue)}, {UnpackValue(result.MaxValue)}]");
        var lastValue = string.Empty;
        for (var quantile = 0; quantile < result.QuantileValues.Count; quantile++)
        {
            var currentValue = UnpackValue(result.QuantileValues[quantile]);
            if (lastValue != currentValue)
            {
                Console.WriteLine($"Value at {quantile + 1}% quantile: {currentValue}");
            }
            lastValue = currentValue;
        }

        return result;
    }

    public static string UnpackValue(Value protoValue)
    {
        var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
        return jsonValue.Values.ElementAt(0).ToString();
    }
}

Go

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/dlp/apiv2/dlppb"
	"cloud.google.com/go/pubsub"
)

// riskNumerical computes the numerical risk of the given column.
func riskNumerical(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) error {
	// projectID := "my-project-id"
	// dataProject := "bigquery-public-data"
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// datasetID := "nhtsa_traffic_fatalities"
	// tableID := "accident_2015"
	// columnName := "state_number"
	ctx := context.Background()
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %w", err)
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	// Create the Topic if it doesn't exist.
	t := pubsubClient.Topic(pubSubTopic)
	topicExists, err := t.Exists(ctx)
	if err != nil {
		return err
	}
	if !topicExists {
		if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
			return err
		}
	}

	// Create the Subscription if it doesn't exist.
	s := pubsubClient.Subscription(pubSubSub)
	subExists, err := s.Exists(ctx)
	if err != nil {
		return err
	}
	if !subExists {
		if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
			return err
		}
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_NumericalStatsConfig_{
						NumericalStatsConfig: &dlppb.PrivacyMetric_NumericalStatsConfig{
							Field: &dlppb.FieldId{
								Name: columnName,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}

	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %w", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "GetDlpJob: %v", err)
			return
		}
		n := resp.GetRiskDetails().GetNumericalStatsResult()
		fmt.Fprintf(w, "Value range: [%v, %v]\n", n.GetMinValue(), n.GetMaxValue())
		var tmp string
		for p, v := range n.GetQuantileValues() {
			if v.String() != tmp {
				fmt.Fprintf(w, "Value at %v quantile: %v\n", p, v)
				tmp = v.String()
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}

Java

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

importcom.google.api.core.SettableApiFuture;importcom.google.cloud.dlp.v2.DlpServiceClient;importcom.google.cloud.pubsub.v1.AckReplyConsumer;importcom.google.cloud.pubsub.v1.MessageReceiver;importcom.google.cloud.pubsub.v1.Subscriber;importcom.google.privacy.dlp.v2.Action;importcom.google.privacy.dlp.v2.Action.PublishToPubSub;importcom.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.NumericalStatsResult;importcom.google.privacy.dlp.v2.BigQueryTable;importcom.google.privacy.dlp.v2.CreateDlpJobRequest;importcom.google.privacy.dlp.v2.DlpJob;importcom.google.privacy.dlp.v2.FieldId;importcom.google.privacy.dlp.v2.GetDlpJobRequest;importcom.google.privacy.dlp.v2.LocationName;importcom.google.privacy.dlp.v2.PrivacyMetric;importcom.google.privacy.dlp.v2.PrivacyMetric.NumericalStatsConfig;importcom.google.privacy.dlp.v2.RiskAnalysisJobConfig;importcom.google.privacy.dlp.v2.Value;importcom.google.pubsub.v1.ProjectSubscriptionName;importcom.google.pubsub.v1.ProjectTopicName;importcom.google.pubsub.v1.PubsubMessage;importjava.io.IOException;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;classRiskAnalysisNumericalStats{publicstaticvoidmain(String[]args)throwsException{// TODO(developer): Replace these variables before running the sample.StringprojectId="your-project-id";StringdatasetId="your-bigquery-dataset-id";StringtableId="your-bigquery-table-id";StringtopicId="pub-sub-topic";StringsubscriptionId="pub-sub-subscription";numericalStatsAnalysis(projectId,datasetId,tableId,topicId,subscriptionId);}publicstaticvoidnumericalStatsAnalysis(StringprojectId,StringdatasetId,StringtableId,StringtopicId,StringsubscriptionId)throwsExecutionException,InterruptedException,IOException{// Initialize client that will be used to send requests. This client only needs to be created// once, and can be reused for multiple requests. 
After completing all of your requests, call// the "close" method on the client to safely clean up any remaining background resources.try(DlpServiceClientdlpServiceClient=DlpServiceClient.create()){// Specify the BigQuery table to analyzeBigQueryTablebigQueryTable=BigQueryTable.newBuilder().setTableId(tableId).setDatasetId(datasetId).setProjectId(projectId).build();// This represents the name of the column to analyze, which must contain numerical dataStringcolumnName="Age";// Configure the privacy metric for the jobFieldIdfieldId=FieldId.newBuilder().setName(columnName).build();NumericalStatsConfignumericalStatsConfig=NumericalStatsConfig.newBuilder().setField(fieldId).build();PrivacyMetricprivacyMetric=PrivacyMetric.newBuilder().setNumericalStatsConfig(numericalStatsConfig).build();// Create action to publish job status notifications over Google Cloud Pub/SubProjectTopicNametopicName=ProjectTopicName.of(projectId,topicId);PublishToPubSubpublishToPubSub=PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();Actionaction=Action.newBuilder().setPubSub(publishToPubSub).build();// Configure the risk analysis job to performRiskAnalysisJobConfigriskAnalysisJobConfig=RiskAnalysisJobConfig.newBuilder().setSourceTable(bigQueryTable).setPrivacyMetric(privacyMetric).addActions(action).build();CreateDlpJobRequestcreateDlpJobRequest=CreateDlpJobRequest.newBuilder().setParent(LocationName.of(projectId,"global").toString()).setRiskJob(riskAnalysisJobConfig).build();// Send the request to the API using the clientDlpJobdlpJob=dlpServiceClient.createDlpJob(createDlpJobRequest);// Set up a Pub/Sub subscriber to listen on the job completion 
statusfinalSettableApiFuture<Boolean>done=SettableApiFuture.create();ProjectSubscriptionNamesubscriptionName=ProjectSubscriptionName.of(projectId,subscriptionId);MessageReceivermessageHandler=(PubsubMessagepubsubMessage,AckReplyConsumerackReplyConsumer)->{handleMessage(dlpJob,done,pubsubMessage,ackReplyConsumer);};Subscribersubscriber=Subscriber.newBuilder(subscriptionName,messageHandler).build();subscriber.startAsync();// Wait for job completion semi-synchronously// For long jobs, consider using a truly asynchronous execution model such as Cloud Functionstry{done.get(15,TimeUnit.MINUTES);}catch(TimeoutExceptione){System.out.println("Job was not completed after 15 minutes.");return;}finally{subscriber.stopAsync();subscriber.awaitTerminated();}// Build a request to get the completed jobGetDlpJobRequestgetDlpJobRequest=GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();// Retrieve completed job statusDlpJobcompletedJob=dlpServiceClient.getDlpJob(getDlpJobRequest);System.out.println("Job status: "+completedJob.getState());System.out.println("Job name: "+dlpJob.getName());// Get the result and parse through and process the informationNumericalStatsResultresult=completedJob.getRiskDetails().getNumericalStatsResult();System.out.printf("Value range : [%.3f, %.3f]\n",result.getMinValue().getFloatValue(),result.getMaxValue().getFloatValue());intpercent=1;DoublelastValue=null;for(ValuequantileValue:result.getQuantileValuesList()){DoublecurrentValue=quantileValue.getFloatValue();if(lastValue==null||!lastValue.equals(currentValue)){System.out.printf("Value at %s %% quantile : %.3f",percent,currentValue);}lastValue=currentValue;}}}// handleMessage injects the job and settableFuture into the message reciever 
interfaceprivatestaticvoidhandleMessage(DlpJobjob,SettableApiFuture<Boolean>done,PubsubMessagepubsubMessage,AckReplyConsumerackReplyConsumer){StringmessageAttribute=pubsubMessage.getAttributesMap().get("DlpJobName");if(job.getName().equals(messageAttribute)){done.set(true);ackReplyConsumer.ack();}else{ackReplyConsumer.nack();}}}

Node.js

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the column to compute risk metrics for, e.g. 'age'
// Note that this column must be a numeric data type
// const columnName = 'firstName';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

async function numericalRiskAnalysis() {
  const sourceTable = {
    projectId: tableProjectId,
    datasetId: datasetId,
    tableId: tableId,
  };

  // Construct request for creating a risk analysis job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    riskJob: {
      privacyMetric: {
        numericalStatsConfig: {
          field: {
            name: columnName,
          },
        },
      },
      sourceTable: sourceTable,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };

  // Create helper function for unpacking values
  const getValue = obj => obj[Object.keys(obj)[0]];

  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  console.log(`Job created. Job name: ${jobName}`);

  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });

  setTimeout(() => {
    console.log(' Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const results = job.riskDetails.numericalStatsResult;

  console.log(
    `Value Range: [${getValue(results.minValue)}, ${getValue(results.maxValue)}]`
  );

  // Print unique quantile values
  let tempValue = null;
  results.quantileValues.forEach((result, percent) => {
    const value = getValue(result);

    // Only print new values
    if (
      tempValue !== value &&
      !(tempValue && tempValue.equals && tempValue.equals(value))
    ) {
      console.log(`Value at ${percent}% quantile: ${value}`);
      tempValue = value;
    }
  });
}

await numericalRiskAnalysis();

PHP

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

use Google\Cloud\Dlp\V2\Action;use Google\Cloud\Dlp\V2\Action\PublishToPubSub;use Google\Cloud\Dlp\V2\BigQueryTable;use Google\Cloud\Dlp\V2\Client\DlpServiceClient;use Google\Cloud\Dlp\V2\CreateDlpJobRequest;use Google\Cloud\Dlp\V2\DlpJob\JobState;use Google\Cloud\Dlp\V2\FieldId;use Google\Cloud\Dlp\V2\GetDlpJobRequest;use Google\Cloud\Dlp\V2\PrivacyMetric;use Google\Cloud\Dlp\V2\PrivacyMetric\NumericalStatsConfig;use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;use Google\Cloud\PubSub\PubSubClient;/** * Computes risk metrics of a column of numbers in a Google BigQuery table. * * @param string $callingProjectId  The project ID to run the API call under * @param string $dataProjectId     The project ID containing the target Datastore * @param string $topicId           The name of the Pub/Sub topic to notify once the job completes * @param string $subscriptionId    The name of the Pub/Sub subscription to use when listening for job * @param string $datasetId         The ID of the BigQuery dataset to inspect * @param string $tableId           The ID of the BigQuery table to inspect * @param string $columnName        The name of the column to compute risk metrics for, e.g. "age" */function numerical_stats(    string $callingProjectId,    string $dataProjectId,    string $topicId,    string $subscriptionId,    string $datasetId,    string $tableId,    string $columnName): void {    // Instantiate a client.    
$dlp = new DlpServiceClient();    $pubsub = new PubSubClient();    $topic = $pubsub->topic($topicId);    // Construct risk analysis config    $columnField = (new FieldId())        ->setName($columnName);    $statsConfig = (new NumericalStatsConfig())        ->setField($columnField);    $privacyMetric = (new PrivacyMetric())        ->setNumericalStatsConfig($statsConfig);    // Construct items to be analyzed    $bigqueryTable = (new BigQueryTable())        ->setProjectId($dataProjectId)        ->setDatasetId($datasetId)        ->setTableId($tableId);    // Construct the action to run when job completes    $pubSubAction = (new PublishToPubSub())        ->setTopic($topic->name());    $action = (new Action())        ->setPubSub($pubSubAction);    // Construct risk analysis job config to run    $riskJob = (new RiskAnalysisJobConfig())        ->setPrivacyMetric($privacyMetric)        ->setSourceTable($bigqueryTable)        ->setActions([$action]);    // Listen for job notifications via an existing topic/subscription.    $subscription = $topic->subscription($subscriptionId);    // Submit request    $parent = "projects/$callingProjectId/locations/global";    $createDlpJobRequest = (new CreateDlpJobRequest())        ->setParent($parent)        ->setRiskJob($riskJob);    $job = $dlp->createDlpJob($createDlpJobRequest);    // Poll Pub/Sub using exponential backoff until job finishes    // Consider using an asynchronous execution model such as Cloud Functions    $attempt = 1;    $startTime = time();    do {        foreach ($subscription->pull() as $message) {            if (                isset($message->attributes()['DlpJobName'])&&                $message->attributes()['DlpJobName'] === $job->getName()            ) {                $subscription->acknowledge($message);                // Get the updated job. Loop to avoid race condition with DLP API.                
do {                    $getDlpJobRequest = (new GetDlpJobRequest())                        ->setName($job->getName());                    $job = $dlp->getDlpJob($getDlpJobRequest);                } while ($job->getState() == JobState::RUNNING);                break 2; // break from parent do while            }        }        print('Waiting for job to complete' . PHP_EOL);        // Exponential backoff with max delay of 60 seconds        sleep(min(60, pow(2, ++$attempt)));    } while (time() - $startTime < 600); // 10 minute timeout    // Helper function to convert Protobuf values to strings    $valueToString = function ($value) {        $json = json_decode($value->serializeToJsonString(), true);        return array_shift($json);    };    // Print finding counts    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));    switch ($job->getState()) {        case JobState::DONE:            $results = $job->getRiskDetails()->getNumericalStatsResult();            printf(                'Value range: [%s, %s]' . PHP_EOL,                $valueToString($results->getMinValue()),                $valueToString($results->getMaxValue())            );            // Only print unique values            $lastValue = null;            foreach ($results->getQuantileValues() as $percent => $quantileValue) {                $value = $valueToString($quantileValue);                if ($value != $lastValue) {                    printf('Value at %s quantile: %s' . PHP_EOL, $percent, $value);                    $lastValue = $value;                }            }            break;        case JobState::FAILED:            printf('Job %s had errors:' . PHP_EOL, $job->getName());            $errors = $job->getErrors();            foreach ($errors as $error) {                var_dump($error->getDetails());            }            break;        case JobState::PENDING:            print('Job has not completed. 
Consider a longer timeout or an asynchronous execution model' . PHP_EOL);            break;        default:            print('Unexpected job state. Most likely, the job is either running or has not yet started.');    }}

Python

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import concurrent.futures

import google.cloud.dlp
import google.cloud.pubsub


def numerical_risk_analysis(
    project: str,
    table_project_id: str,
    dataset_id: str,
    table_id: str,
    column_name: str,
    topic_id: str,
    subscription_id: str,
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to compute risk metrics of a column
       of numerical data in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """
    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        "privacy_metric": {
            "numerical_stats_config": {"field": {"name": column_name}}
        },
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(request={"name": operation.name})
            print(f"Job name: {job.name}")
            results = job.risk_details.numerical_stats_result
            print(
                "Value Range: [{}, {}]".format(
                    results.min_value.integer_value,
                    results.max_value.integer_value,
                )
            )
            prev_value = None
            for percent, result in enumerate(results.quantile_values):
                value = result.integer_value
                if prev_value != value:
                    print(f"Value at {percent}% quantile: {value}")
                    prev_value = value
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
    subscription.close()

Compute categorical statistics

You can compute categorical statistics for the individual histogram buckets within a BigQuery column, including:

  • Upper bound on value frequency within a given bucket
  • Lower bound on value frequency within a given bucket
  • Size of a given bucket
  • A sample of value frequencies within a given bucket (maximum 20)

To calculate these values, you configure a DlpJob, setting the CategoricalStatsConfig privacy metric to the name of the column to scan. When you run the job, Sensitive Data Protection computes statistics for the given column, returning its results in the CategoricalStatsResult object.

Note: You can apply the CategoricalStatsConfig to all column types, except for arrays and structs. If your column types consist solely of the types specified in Compute numerical statistics, it may be more informative to scan for that metric.
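The only difference from the numerical configuration is the privacy metric: the request names the column under categorical_stats_config instead of numerical_stats_config. Here is a minimal sketch of the request body in Python dictionary form; the project, dataset, table, and column names are placeholder assumptions:

```python
# Hypothetical column; any type except arrays and structs works here.
column_name = "state"

# Minimal risk-analysis job request body for categorical statistics.
risk_job = {
    # CategoricalStatsConfig names the column whose value-frequency
    # histogram buckets should be computed.
    "privacy_metric": {
        "categorical_stats_config": {"field": {"name": column_name}}
    },
    # The BigQuery table that holds the column (placeholder names).
    "source_table": {
        "project_id": "my-data-project",
        "dataset_id": "my_dataset",
        "table_id": "my_table",
    },
}
```

The full samples below add a Pub/Sub notification action and code that reads the resulting histogram buckets from CategoricalStatsResult.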

Code examples

Following is sample code in several languages that demonstrates how to use Sensitive Data Protection to calculate categorical statistics.

Important: The code on this page requires that you first set up a Sensitive Data Protection client. For more information about installing and creating a Sensitive Data Protection client, see Sensitive Data Protection client libraries. (Sending JSON to Sensitive Data Protection REST endpoints does not require a client library.)

C#

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

using Google.Api.Gax.ResourceNames;
using Google.Cloud.Dlp.V2;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.Dlp.V2.Action.Types;
using static Google.Cloud.Dlp.V2.PrivacyMetric.Types;

public class RiskAnalysisCreateCategoricalStats
{
    public static AnalyzeDataSourceRiskDetails.Types.CategoricalStatsResult CategoricalStats(
        string callingProjectId,
        string tableProjectId,
        string datasetId,
        string tableId,
        string topicId,
        string subscriptionId,
        string columnName)
    {
        var dlp = DlpServiceClient.Create();

        // Construct + submit the job
        var config = new RiskAnalysisJobConfig
        {
            PrivacyMetric = new PrivacyMetric
            {
                CategoricalStatsConfig = new CategoricalStatsConfig()
                {
                    Field = new FieldId { Name = columnName }
                }
            },
            SourceTable = new BigQueryTable
            {
                ProjectId = tableProjectId,
                DatasetId = datasetId,
                TableId = tableId
            },
            Actions =
            {
                new Google.Cloud.Dlp.V2.Action
                {
                    PubSub = new PublishToPubSub
                    {
                        Topic = $"projects/{callingProjectId}/topics/{topicId}"
                    }
                }
            }
        };

        var submittedJob = dlp.CreateDlpJob(new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

        // Listen to pub/sub for the job
        var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
        var subscriber = SubscriberClient.CreateAsync(subscriptionName).Result;

        // SimpleSubscriber runs your message handle function on multiple
        // threads to maximize throughput.
        var done = new ManualResetEventSlim(false);
        subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            if (message.Attributes["DlpJobName"] == submittedJob.Name)
            {
                Thread.Sleep(500); // Wait for DLP API results to become consistent
                done.Set();
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }
            else
            {
                return Task.FromResult(SubscriberClient.Reply.Nack);
            }
        });

        done.Wait(TimeSpan.FromMinutes(10)); // 10 minute timeout; may not work for large jobs
        subscriber.StopAsync(CancellationToken.None).Wait();

        // Process results
        var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });
        var result = resultJob.RiskDetails.CategoricalStatsResult;

        for (var bucketIdx = 0; bucketIdx < result.ValueFrequencyHistogramBuckets.Count; bucketIdx++)
        {
            var bucket = result.ValueFrequencyHistogramBuckets[bucketIdx];
            Console.WriteLine($"Bucket {bucketIdx}");
            Console.WriteLine($"  Most common value occurs {bucket.ValueFrequencyUpperBound} time(s).");
            Console.WriteLine($"  Least common value occurs {bucket.ValueFrequencyLowerBound} time(s).");
            Console.WriteLine($"  {bucket.BucketSize} unique value(s) total.");

            foreach (var bucketValue in bucket.BucketValues)
            {
                // 'UnpackValue(x)' is a prettier version of 'x.toString()'
                Console.WriteLine($"  Value {UnpackValue(bucketValue.Value)} occurs {bucketValue.Count} time(s).");
            }
        }

        return result;
    }

    public static string UnpackValue(Value protoValue)
    {
        var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
        return jsonValue.Values.ElementAt(0).ToString();
    }
}

Go

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"time"

	dlp "cloud.google.com/go/dlp/apiv2"
	"cloud.google.com/go/dlp/apiv2/dlppb"
	"cloud.google.com/go/pubsub"
)

// riskCategorical computes the categorical risk of the given data.
func riskCategorical(w io.Writer, projectID, dataProject, pubSubTopic, pubSubSub, datasetID, tableID, columnName string) error {
	// projectID := "my-project-id"
	// dataProject := "bigquery-public-data"
	// pubSubTopic := "dlp-risk-sample-topic"
	// pubSubSub := "dlp-risk-sample-sub"
	// datasetID := "nhtsa_traffic_fatalities"
	// tableID := "accident_2015"
	// columnName := "state_number"
	ctx := context.Background()
	client, err := dlp.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("dlp.NewClient: %w", err)
	}

	// Create a PubSub Client used to listen for when the inspect job finishes.
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer pubsubClient.Close()

	// Create a PubSub subscription we can use to listen for messages.
	// Create the Topic if it doesn't exist.
	t := pubsubClient.Topic(pubSubTopic)
	topicExists, err := t.Exists(ctx)
	if err != nil {
		return err
	}
	if !topicExists {
		if t, err = pubsubClient.CreateTopic(ctx, pubSubTopic); err != nil {
			return err
		}
	}

	// Create the Subscription if it doesn't exist.
	s := pubsubClient.Subscription(pubSubSub)
	subExists, err := s.Exists(ctx)
	if err != nil {
		return err
	}
	if !subExists {
		if s, err = pubsubClient.CreateSubscription(ctx, pubSubSub, pubsub.SubscriptionConfig{Topic: t}); err != nil {
			return err
		}
	}

	// topic is the PubSub topic string where messages should be sent.
	topic := "projects/" + projectID + "/topics/" + pubSubTopic

	// Create a configured request.
	req := &dlppb.CreateDlpJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
		Job: &dlppb.CreateDlpJobRequest_RiskJob{
			RiskJob: &dlppb.RiskAnalysisJobConfig{
				// PrivacyMetric configures what to compute.
				PrivacyMetric: &dlppb.PrivacyMetric{
					Type: &dlppb.PrivacyMetric_CategoricalStatsConfig_{
						CategoricalStatsConfig: &dlppb.PrivacyMetric_CategoricalStatsConfig{
							Field: &dlppb.FieldId{
								Name: columnName,
							},
						},
					},
				},
				// SourceTable describes where to find the data.
				SourceTable: &dlppb.BigQueryTable{
					ProjectId: dataProject,
					DatasetId: datasetID,
					TableId:   tableID,
				},
				// Send a message to PubSub using Actions.
				Actions: []*dlppb.Action{
					{
						Action: &dlppb.Action_PubSub{
							PubSub: &dlppb.Action_PublishToPubSub{
								Topic: topic,
							},
						},
					},
				},
			},
		},
	}

	// Create the risk job.
	j, err := client.CreateDlpJob(ctx, req)
	if err != nil {
		return fmt.Errorf("CreateDlpJob: %w", err)
	}
	fmt.Fprintf(w, "Created job: %v\n", j.GetName())

	// Wait for the risk job to finish by waiting for a PubSub message.
	// This only waits for 10 minutes. For long jobs, consider using a truly
	// asynchronous execution model such as Cloud Functions.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// If this is the wrong job, do not process the result.
		if msg.Attributes["DlpJobName"] != j.GetName() {
			msg.Nack()
			return
		}
		msg.Ack()
		time.Sleep(500 * time.Millisecond)
		resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
			Name: j.GetName(),
		})
		if err != nil {
			fmt.Fprintf(w, "GetDlpJob: %v", err)
			return
		}
		h := resp.GetRiskDetails().GetCategoricalStatsResult().GetValueFrequencyHistogramBuckets()
		for i, b := range h {
			fmt.Fprintf(w, "Histogram bucket %v\n", i)
			fmt.Fprintf(w, "  Most common value occurs %v times\n", b.GetValueFrequencyUpperBound())
			fmt.Fprintf(w, "  Least common value occurs %v times\n", b.GetValueFrequencyLowerBound())
			fmt.Fprintf(w, "  %v unique values total\n", b.GetBucketSize())
			for _, v := range b.GetBucketValues() {
				fmt.Fprintf(w, "    Value %v occurs %v times\n", v.GetValue(), v.GetCount())
			}
		}
		// Stop listening for more messages.
		cancel()
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}

Java

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.core.SettableApiFuture;
import com.google.cloud.dlp.v2.DlpServiceClient;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.Action;
import com.google.privacy.dlp.v2.Action.PublishToPubSub;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.CategoricalStatsResult;
import com.google.privacy.dlp.v2.AnalyzeDataSourceRiskDetails.CategoricalStatsResult.CategoricalStatsHistogramBucket;
import com.google.privacy.dlp.v2.BigQueryTable;
import com.google.privacy.dlp.v2.CreateDlpJobRequest;
import com.google.privacy.dlp.v2.DlpJob;
import com.google.privacy.dlp.v2.FieldId;
import com.google.privacy.dlp.v2.GetDlpJobRequest;
import com.google.privacy.dlp.v2.LocationName;
import com.google.privacy.dlp.v2.PrivacyMetric;
import com.google.privacy.dlp.v2.PrivacyMetric.CategoricalStatsConfig;
import com.google.privacy.dlp.v2.RiskAnalysisJobConfig;
import com.google.privacy.dlp.v2.ValueFrequency;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RiskAnalysisCategoricalStats {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String datasetId = "your-bigquery-dataset-id";
    String tableId = "your-bigquery-table-id";
    String topicId = "pub-sub-topic";
    String subscriptionId = "pub-sub-subscription";
    categoricalStatsAnalysis(projectId, datasetId, tableId, topicId, subscriptionId);
  }

  public static void categoricalStatsAnalysis(
      String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
      throws ExecutionException, InterruptedException, IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
      // Specify the BigQuery table to analyze
      BigQueryTable bigQueryTable =
          BigQueryTable.newBuilder()
              .setProjectId(projectId)
              .setDatasetId(datasetId)
              .setTableId(tableId)
              .build();

      // The name of the column to analyze, which doesn't need to contain numerical data
      String columnName = "Mystery";

      // Configure the privacy metric for the job
      FieldId fieldId = FieldId.newBuilder().setName(columnName).build();
      CategoricalStatsConfig categoricalStatsConfig =
          CategoricalStatsConfig.newBuilder().setField(fieldId).build();
      PrivacyMetric privacyMetric =
          PrivacyMetric.newBuilder().setCategoricalStatsConfig(categoricalStatsConfig).build();

      // Create action to publish job status notifications over Google Cloud Pub/Sub
      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      PublishToPubSub publishToPubSub =
          PublishToPubSub.newBuilder().setTopic(topicName.toString()).build();
      Action action = Action.newBuilder().setPubSub(publishToPubSub).build();

      // Configure the risk analysis job to perform
      RiskAnalysisJobConfig riskAnalysisJobConfig =
          RiskAnalysisJobConfig.newBuilder()
              .setSourceTable(bigQueryTable)
              .setPrivacyMetric(privacyMetric)
              .addActions(action)
              .build();

      // Build the job creation request to be sent by the client
      CreateDlpJobRequest createDlpJobRequest =
          CreateDlpJobRequest.newBuilder()
              .setParent(LocationName.of(projectId, "global").toString())
              .setRiskJob(riskAnalysisJobConfig)
              .build();

      // Send the request to the API using the client
      DlpJob dlpJob = dlpServiceClient.createDlpJob(createDlpJobRequest);

      // Set up a Pub/Sub subscriber to listen on the job completion status
      final SettableApiFuture<Boolean> done = SettableApiFuture.create();
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      MessageReceiver messageHandler =
          (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> {
            handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer);
          };
      Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build();
      subscriber.startAsync();

      // Wait for job completion semi-synchronously
      // For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
      try {
        done.get(15, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        System.out.println("Job was not completed after 15 minutes.");
        return;
      } finally {
        subscriber.stopAsync();
        subscriber.awaitTerminated();
      }

      // Build a request to get the completed job
      GetDlpJobRequest getDlpJobRequest =
          GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build();

      // Retrieve completed job status
      DlpJob completedJob = dlpServiceClient.getDlpJob(getDlpJobRequest);
      System.out.println("Job status: " + completedJob.getState());
      System.out.println("Job name: " + dlpJob.getName());

      // Get the result and parse through and process the information
      CategoricalStatsResult result = completedJob.getRiskDetails().getCategoricalStatsResult();
      List<CategoricalStatsHistogramBucket> histogramBucketList =
          result.getValueFrequencyHistogramBucketsList();
      for (CategoricalStatsHistogramBucket bucket : histogramBucketList) {
        long mostCommonFrequency = bucket.getValueFrequencyUpperBound();
        System.out.printf("Most common value occurs %d time(s).\n", mostCommonFrequency);
        long leastCommonFrequency = bucket.getValueFrequencyLowerBound();
        System.out.printf("Least common value occurs %d time(s).\n", leastCommonFrequency);
        for (ValueFrequency valueFrequency : bucket.getBucketValuesList()) {
          System.out.printf(
              "Value %s occurs %d time(s).\n",
              valueFrequency.getValue().toString(), valueFrequency.getCount());
        }
      }
    }
  }

  // handleMessage injects the job and settableFuture into the message receiver interface
  private static void handleMessage(
      DlpJob job,
      SettableApiFuture<Boolean> done,
      PubsubMessage pubsubMessage,
      AckReplyConsumer ackReplyConsumer) {
    String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName");
    if (job.getName().equals(messageAttribute)) {
      done.set(true);
      ackReplyConsumer.ack();
    } else {
      ackReplyConsumer.nack();
    }
  }
}

Node.js

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

// Import the Google Cloud client libraries
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// The name of the column to compute risk metrics for, e.g. 'firstName'
// const columnName = 'firstName';

async function categoricalRiskAnalysis() {
  const sourceTable = {
    projectId: tableProjectId,
    datasetId: datasetId,
    tableId: tableId,
  };

  // Construct request for creating a risk analysis job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    riskJob: {
      privacyMetric: {
        categoricalStatsConfig: {
          field: {
            name: columnName,
          },
        },
      },
      sourceTable: sourceTable,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };

  // Create helper function for unpacking values
  const getValue = obj => obj[Object.keys(obj)[0]];

  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  console.log(`Job created. Job name: ${jobName}`);

  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });

  setTimeout(() => {
    console.log(' Waiting for DLP job to fully complete');
  }, 500);
  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.categoricalStatsResult.valueFrequencyHistogramBuckets;

  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);

    // Print bucket stats
    console.log(
      `  Most common value occurs ${histogramBucket.valueFrequencyUpperBound} time(s)`
    );
    console.log(
      `  Least common value occurs ${histogramBucket.valueFrequencyLowerBound} time(s)`
    );

    // Print bucket values
    console.log(`${histogramBucket.bucketSize} unique values total.`);
    histogramBucket.bucketValues.forEach(valueBucket => {
      console.log(
        `  Value ${getValue(valueBucket.value)} occurs ${valueBucket.count} time(s).`
      );
    });
  });
}

await categoricalRiskAnalysis();

PHP

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\PrivacyMetric\CategoricalStatsConfig;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Computes risk metrics of a column of data in a Google BigQuery table.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId    The project ID containing the target Datastore
 * @param string $topicId          The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId   The name of the Pub/Sub subscription to use when listening for job
 * @param string $datasetId        The ID of the dataset to inspect
 * @param string $tableId          The ID of the table to inspect
 * @param string $columnName       The name of the column to compute risk metrics for, e.g. "age"
 */
function categorical_stats(
    string $callingProjectId,
    string $dataProjectId,
    string $topicId,
    string $subscriptionId,
    string $datasetId,
    string $tableId,
    string $columnName
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $columnField = (new FieldId())
        ->setName($columnName);
    $statsConfig = (new CategoricalStatsConfig())
        ->setField($columnField);
    $privacyMetric = (new PrivacyMetric())
        ->setCategoricalStatsConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());
    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $createDlpJobRequest = (new CreateDlpJobRequest())
        ->setParent($parent)
        ->setRiskJob($riskJob);
    $job = $dlp->createDlpJob($createDlpJobRequest);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName']) &&
                $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Get the updated job. Loop to avoid race condition with DLP API.
                do {
                    $getDlpJobRequest = (new GetDlpJobRequest())
                        ->setName($job->getName());
                    $job = $dlp->getDlpJob($getDlpJobRequest);
                } while ($job->getState() == JobState::RUNNING);
                break 2; // break from parent do while
            }
        }
        print('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getCategoricalStatsResult()->getValueFrequencyHistogramBuckets();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf('  Most common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyUpperBound());
                printf('  Least common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyLowerBound());
                printf('  %s unique value(s) total.', $histBucket->getBucketSize());

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $percent => $quantile) {
                    printf(
                        '  Value %s occurs %s time(s).' . PHP_EOL,
                        $quantile->getValue()->serializeToJsonString(),
                        $quantile->getCount()
                    );
                }
            }
            break;
        case JobState::FAILED:
            $errors = $job->getErrors();
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            print('Unexpected job state.');
    }
}

Python

To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.

To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import concurrent.futures

import google.cloud.dlp
import google.cloud.pubsub


def categorical_risk_analysis(
    project: str,
    table_project_id: str,
    dataset_id: str,
    table_id: str,
    column_name: str,
    topic_id: str,
    subscription_id: str,
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to compute risk metrics of a column
       of categorical data in a Google BigQuery table.
    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """
    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Configure risk analysis job
    # Give the name of the column to compute risk metrics for
    risk_job = {
        "privacy_metric": {
            "categorical_stats_config": {"field": {"name": column_name}}
        },
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(request={"name": operation.name})
            print(f"Job name: {job.name}")
            histogram_buckets = (
                job.risk_details.categorical_stats_result.value_frequency_histogram_buckets  # noqa: E501
            )
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print(f"Bucket {i}:")
                print(
                    "   Most common value occurs {} time(s)".format(
                        bucket.value_frequency_upper_bound
                    )
                )
                print(
                    "   Least common value occurs {} time(s)".format(
                        bucket.value_frequency_lower_bound
                    )
                )
                print(f"   {bucket.bucket_size} unique values total.")
                for value in bucket.bucket_values:
                    print(
                        "   Value {} occurs {} time(s)".format(
                            value.value.integer_value, value.count
                        )
                    )
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
        subscription.close()
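Conceptually, each value-frequency histogram bucket in a categorical stats result groups distinct column values whose occurrence counts fall between the bucket's lower and upper bounds. As a rough local illustration of that aggregation (plain Python with made-up column data, not an API call), the per-value frequencies behind such a result can be sketched with a `Counter`:

```python
from collections import Counter

# Hypothetical categorical column values (a stand-in for a BigQuery column).
column_values = ["CA", "TX", "CA", "NY", "TX", "CA", "WA"]

# Frequency of each distinct value, analogous to the ValueFrequency entries
# returned inside a histogram bucket.
freqs = Counter(column_values)

# Bounds analogous to value_frequency_upper_bound / value_frequency_lower_bound,
# and the unique-value count analogous to bucket_size.
most_common_count = max(freqs.values())
least_common_count = min(freqs.values())
unique_values = len(freqs)

print(f"Most common value occurs {most_common_count} time(s)")
print(f"Least common value occurs {least_common_count} time(s)")
print(f"{unique_values} unique values total.")
```

This is only a mental model of what the service reports; the actual computation runs inside the DLP job over the BigQuery table, and the API may bucket values rather than return exact counts for every distinct value.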

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.