This repository was archived by the owner on Oct 16, 2025. It is now read-only.

Multi Cloud Data Tokenization Solution By Using Dataflow and Cloud DLP


GoogleCloudPlatform/dlp-dataflow-deidentification


This repo contains a reference implementation for an end-to-end data tokenization solution. This solution does the following:

  • Passes data through Dataflow pipelines that perform inspection, de-identification, and re-identification through the Cloud Data Loss Prevention API (DLP API).
  • Migrates the processed data to BigQuery.

Architecture

This solution comprises the following pipelines. To view the job graphs of these pipelines, see Dataflow DAG.

  1. Inspection and de-identification
  2. Re-identification

Inspection and de-identification

Reference architecture

You can use this pipeline for Avro, CSV, JSONL, ORC, Parquet, and TSV files stored or ingested in Cloud Storage or an Amazon S3 bucket. This pipeline uses the State and Timer APIs to batch and process the files optimally. The results of the inspection and de-identification processes are written to a BigQuery table.
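The batching idea can be illustrated outside of Dataflow. The following is a minimal sketch in plain Python, not the pipeline's actual State and Timer code: it groups records into batches that stay within a size limit. The function name `batch_by_size` and the assumption that the limit is measured in bytes are illustrative only; the source does not state the unit of the batch size.

```python
from typing import Iterable, Iterator, List


def batch_by_size(records: Iterable[str], max_bytes: int) -> Iterator[List[str]]:
    """Group records into batches whose UTF-8 size stays within max_bytes.

    A single record larger than max_bytes is emitted alone rather than dropped.
    """
    batch: List[str] = []
    batch_bytes = 0
    for record in records:
        size = len(record.encode("utf-8"))
        # Flush the current batch if adding this record would exceed the limit.
        if batch and batch_bytes + size > max_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch


# Hypothetical sample rows, 12, 10, and 12 bytes long.
rows = ["1,Alice,4111", "2,Bob,4222", "3,Carol,4333"]
batches = list(batch_by_size(rows, max_bytes=24))
```

With a 24-byte limit, the first two rows fit in one batch and the third row starts a new one.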

Re-identification

Reference architecture

The re-identification pipeline is used to read data from a BigQuery table and publish the re-identified data to a secure Pub/Sub topic.

Concepts

The following resources describe key concepts related to the DLP API and Dataflow. The DLP API is part of Sensitive Data Protection.

  1. Using Sensitive Data Protection with BigQuery
  2. De-identification and re-identification of PII in large-scale datasets using Sensitive Data Protection
  3. Sensitive Data Protection templates
  4. Data masking/tokenization from Cloud Storage to BigQuery using a template
  5. Inspect Google Cloud storage and databases for sensitive data
  6. Dataflow pipeline options
  7. Sensitive Data Protection quotas and limits

Costs

This tutorial uses billable components of Google Cloud, including the following:

Use the pricing calculator to generate a cost estimate based on your projected usage.

Tutorial

Prerequisites

  1. Create a Google Cloud project.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Use the link below to open Cloud Shell.

    Open in Cloud Shell

  4. Run the following commands to set up the data tokenization solution in your Google Cloud project:

    gcloud config set project <project_id>
    sh setup-data-tokeninzation-solution-v2.sh

    The setup-data-tokenization-solution-v2.sh script performs the following tasks:

    • Creates a bucket ({project-id}-demo-data) in the us-central1 region and uploads a sample dataset with mock PII data.

    • Creates a BigQuery dataset (demo_dataset) in the US multi-region to store the tokenized data.

    • Creates a KMS-wrapped key (KEK) by creating an automatic TEK (token encryption key).

    • Creates inspection, de-identification, and re-identification templates with the KEK and crypto-based transformations.

    • Creates a service account (with a custom role) for running the DLP API pipeline.

    • Enables Pub/Sub and creates a Pub/Sub topic that will be notified whenever a new file is added in the Cloud Storage bucket.

    • Emits a set_env.sh script, which you can use to set temporary environment variables while triggering the DLP API pipelines.

  5. Run set_env.sh:

    source set_env.sh

Compile the code

./gradlew build

Run the samples

Inspection

You can run the inspection pipeline to do one or both of the following:

  • Inspect only existing files in the Cloud Storage bucket.
  • Inspect only new files added to the Cloud Storage bucket after the pipeline is triggered.

Inspect new and existing files

To trigger a streaming inspection Dataflow pipeline that processes all the CSV files in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"

Inspect only existing files

To trigger a batch inspection Dataflow pipeline that processes only the CSV files that are currently present in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Any files that are added to the bucket after the pipeline was triggered are not inspected.

Inspect only new files

To trigger a streaming inspection Dataflow pipeline that processes new CSV files that are added to the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC} \
--processExistingFiles=false"

Any files that were added to the bucket before the pipeline was triggered are not inspected.

Inspection results

You can find the inspection results in the BigQuery dataset specified in the dataset parameter.

If --processExistingFiles is set to false and --gcsNotificationTopic is not provided, then the pipeline fails with an error similar to the following:

Exception in thread "main" java.lang.IllegalArgumentException: Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.runInspectAndDeidPipeline(DLPTextToBigQueryStreamingV2.java:113)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.run(DLPTextToBigQueryStreamingV2.java:93)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.main(DLPTextToBigQueryStreamingV2.java:84)
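The rule behind this error can be written as a small check. The following Python sketch mirrors the condition described above; it is an illustration, not the pipeline's Java code, and the function name and the topic string are hypothetical.

```python
from typing import Optional


def validate_file_source_options(process_existing_files: bool,
                                 gcs_notification_topic: Optional[str]) -> None:
    """Raise ValueError when neither existing nor new files would be processed."""
    if not process_existing_files and not gcs_notification_topic:
        raise ValueError(
            "Either --processExistingFiles should be set to true "
            "or --gcsNotificationTopic should be provided"
        )


# Valid: new files only, with a notification topic (hypothetical topic name).
validate_file_source_options(False, "projects/my-project/topics/my-topic")

# Invalid: no existing files and no topic, so there is nothing to read.
try:
    validate_file_source_options(False, None)
    rejected = False
except ValueError:
    rejected = True
```

In other words, the pipeline needs at least one source of files: the files already in the bucket, or notifications about new ones.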

For information about the default values of the pipeline parameters, see Pipeline parameters on this page.

De-identification

You can run the de-identification pipeline to do one or both of the following:

  • De-identify only existing files in the Cloud Storage bucket.
  • De-identify only new files added to the Cloud Storage bucket after the pipeline is triggered.

De-identify new and existing files

To trigger a streaming de-identification Dataflow pipeline that processes all the CSV files in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=2 --maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"

De-identify only existing files

To trigger a batch de-identification Dataflow pipeline that processes only the CSV files that are currently present in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Any files that are added to the bucket after the pipeline was triggered are not de-identified.

De-identify only new files

To trigger a streaming de-identification Dataflow pipeline that processes new CSV files that are added to the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC} \
--processExistingFiles=false"

Any files that were added to the bucket before the pipeline was triggered are not de-identified.

De-identification results

You can find the de-identification findings in the BigQuery dataset specified in the dataset parameter, in tables with the same names as the respective input files.

If --processExistingFiles is set to false and --gcsNotificationTopic is not provided, then the pipeline fails with an error similar to the following:

Exception in thread "main" java.lang.IllegalArgumentException: Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.runInspectAndDeidPipeline(DLPTextToBigQueryStreamingV2.java:113)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.run(DLPTextToBigQueryStreamingV2.java:93)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.main(DLPTextToBigQueryStreamingV2.java:84)

For information about the default values of the pipeline parameters, see Pipeline parameters on this page.

Validation

You can run some quick validations for a BigQuery table to check the tokenized data. The following steps use a sample dataset to validate de-identified results:

  1. In Cloud Shell, display the header row of the CSV file that you used to create the schema:

    gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | head -1
  2. In the query editor of SQL workspace, run the following query to compare the schema with the header row of the CSV file:

    SELECT table_name, column_name
    FROM `<dataset_id>.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name="CCRecords_1564602825"

    There are no spaces in the column names because the pipeline ensures that the column and table names only contain valid characters according to the BigQuery naming standard.

  3. Validate that the number of rows in the input file and the output table are equal:

    Input records:

    echo $(($(gcloud storage cat gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv | wc -l) - 1))

    Output records:

    SELECT COUNT(*) as number_of_rows
    FROM `<dataset_id>.CCRecords_1564602825` LIMIT 1
  4. Validate that the bucketing transformation is successfully applied to the JobTitle column:

    SELECT JobTitle, COUNT(*) AS number_of_records_found
    FROM `<dataset_id>.CCRecords_1564602825`
    GROUP BY JobTitle

    In the output, the JobTitle values should be grouped into three generalized buckets: Executive, Engineer, and Manager.

  5. Validate that values in the Age column are grouped into six different buckets from 60 to 20:

    SELECT Age, COUNT(*) AS number_of_records_found
    FROM `<dataset_id>.CCRecords_1564602825`
    GROUP BY Age
    ORDER BY Age DESC
  6. Validate the masking transformation for the SSN:

    SELECT SSN
    FROM `<dataset_id>.CCRecords_*`
    WHERE REGEXP_CONTAINS(SSN, "@*")

    In the output, the first five digits for all SSN entries should be masked.

  7. Validate that the cryptographic transformation used deterministic encryption for the card_holders_name, card_number, and card_pin entries:

    SELECT Additional_Details
    FROM `<dataset_id>.CCRecords_*`
    WHERE REGEXP_CONTAINS(Additional_Details, r'(IBAN_CODE+)')
      OR REGEXP_CONTAINS(Additional_Details, r'(EMAIL_ADDRESS+)')
      OR REGEXP_CONTAINS(Additional_Details, r'(PHONE_NUMBER+)')
      OR REGEXP_CONTAINS(Additional_Details, r'(ONLINE_USER_ID+)')

    In the output, sensitive values should be replaced with placeholder values such as [IBAN_CODE], [EMAIL_ADDRESS], [PHONE_NUMBER], and [ONLINE_USER_ID].

  8. Query the de-identified copies of the dataset for the ID 76901:

    SELECT * FROM `<dataset_id>.CCRecords_1564602825` WHERE ID='76901'
  9. In Cloud Shell, compare the output from the previous step with the original dataset in the CSV file for the ID 76901:

    gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | awk -F "," '$1 == 76901'
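The header comparison in step 2 and the row count in step 3 can also be checked locally on a downloaded copy of the CSV file. The following Python sketch assumes a simple sanitizing rule (any character other than a letter, digit, or underscore becomes an underscore); the exact rule the pipeline applies is not shown in this document, and the sample rows are hypothetical.

```python
import csv
import io
import re
from typing import List, Tuple


def to_column_name(header: str) -> str:
    """Replace characters outside [A-Za-z0-9_] with underscores (assumed rule)."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", header.strip())
    # Column names must not start with a digit, so prefix an underscore if needed.
    return "_" + name if name[:1].isdigit() else name


def summarize_csv(text: str) -> Tuple[List[str], int]:
    """Return (sanitized column names, number of data rows) for CSV text."""
    rows = list(csv.reader(io.StringIO(text)))
    header, data = rows[0], rows[1:]
    return [to_column_name(h) for h in header], len(data)


# Hypothetical sample with the same shape as the demo file: a header plus data rows.
sample = "ID,Card Holders Name,Credit Limit\n76901,Jane Doe,5000\n76902,John Roe,7000\n"
columns, row_count = summarize_csv(sample)
```

Here `columns` holds the names to compare against the INFORMATION_SCHEMA query, and `row_count` excludes the header row, matching the `wc -l` minus one calculation in step 3.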

Re-identification from BigQuery

  1. Export a SQL query to read and re-identify data from BigQuery. The sample provided below selects 10 records that match the query.

    export QUERY="select ID,Card_Number,Card_Holders_Name from \`${PROJECT_ID}.${BQ_DATASET_NAME}.CCRecords_1564602828\` where safe_cast(Credit_Limit as int64)>100000 and safe_cast (Age as int64)>50 group by ID,Card_Number,Card_Holders_Name limit 10"
  2. Create a Cloud Storage file with the following query:

    export REID_QUERY_BUCKET=<name>
    cat << EOF | gsutil cp - gs://${REID_QUERY_BUCKET}/reid_query.sql
    ${QUERY}
    EOF
  3. Create a Pub/Sub topic. For more information, see create a topic.

  4. Run the pipeline by passing the required parameters:

    ./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs="--region=${REGION} --project=${PROJECT_ID} --tempLocation=gs://${DATA_STORAGE_BUCKET}/temp --numWorkers=1 --maxNumWorkers=2 --runner=DataflowRunner --tableRef=${PROJECT_ID}:${BQ_DATASET_NAME}.<table> --dataset=${BQ_DATASET_NAME} --topic=projects/${PROJECT_ID}/topics/<topic_name> --autoscalingAlgorithm=THROUGHPUT_BASED --workerMachineType=n1-highmem-4 --deidentifyTemplateName=${REID_TEMPLATE_NAME} --DLPMethod=REID --keyRange=1024 --queryPath=gs://${REID_QUERY_BUCKET}/reid_query.sql --serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

    This command triggers a batch re-identification Dataflow pipeline that processes all the records from the query stored in reid_query.sql. The re-identified results can be found in the BigQuery dataset (dataset parameter) with the name of the input table as the suffix.

Pipeline Parameters

| Pipeline option | Description | Used in operations |
|---|---|---|
| region | Specifies a regional endpoint for deploying your Dataflow jobs. | All |
| project | The project ID for your Google Cloud project. | All |
| streaming | true if streaming pipeline. | INSPECT/DEID |
| enableStreamingEngine | Specifies whether Dataflow Streaming Engine is enabled or disabled. | INSPECT/DEID |
| tempLocation | Cloud Storage path for temporary files. Must be a valid Cloud Storage URL. | All |
| numWorkers | (Optional) The initial number of Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. | All |
| maxNumWorkers | (Optional) The maximum number of Compute Engine instances to be made available to your pipeline during execution. This value can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. | All |
| runner | DataflowRunner | All |
| inspectTemplateName | DLP API inspect template name. | INSPECT/DEID |
| filePattern | The file pattern that will be used to scan the files to be processed by the job. | INSPECT/DEID |
| deidentifyTemplateName | DLP de-identify template name. | All |
| DLPMethod | Type of DLP operation to perform. Valid values are INSPECT, DEID, or REID. | All |
| processExistingFiles | Files existing in the Cloud Storage bucket before the pipeline is started will not be processed when the value is set to false. The default value is true. | INSPECT/DEID |
| gcsNotificationTopic | Pub/Sub topic for notifications of new files added to the Cloud Storage bucket in filePattern. Format: projects/<PROJECT_ID>/topics/<GCS_TOPIC_NAME>. | INSPECT/DEID |
| batchSize | (Optional) Batch size for the DLP API. The default is 500,000. | All |
| dataset | BigQuery dataset to write the inspection or de-identification results to or to read from in case of re-identification. | All |
| recordDelimiter | (Optional) Record delimiter. | INSPECT/DEID |
| columnDelimiter | Column delimiter. Only required in case of a custom delimiter. | INSPECT/DEID |
| tableRef | BigQuery table to export from in the form <project>:<dataset>.<table>. | REID |
| queryPath | Query file for re-identification. | REID |
| headers | DLP table headers. Required for the JSONL file type. | INSPECT/DEID |
| numShardsPerDLPRequestBatching | (Optional) Number of shards for DLP request batches. Can be used to control the parallelism of DLP requests. The default value is 100. | All |
| numberOfWorkerHarnessThreads | (Optional) The number of threads per each worker harness process. | All |
| dlpApiRetryCount | (Optional) Number of retries in case of transient errors in DLP API. The default value is 10. | All |
| initialBackoff | (Optional) Initial backoff (in seconds) for retries with exponential backoff. The default is 5s. | All |
| outputBucket | GCS path for storing the deidentified files. | DEID |
| DLPParent | (Optional) The resource location for DLP templates. Format: projects/<project-id>/locations/<region>. By default, the location will be global. | All |

For more details, see Dataflow Pipeline Options.
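When a command is assembled by a script rather than typed by hand, the options can be kept in a dictionary and rendered into the string passed to -Pargs. The following Python sketch shows one way to do that; the helper name `build_pargs` is hypothetical, and writing booleans as `--name=true` or `--name=false` is an assumption modeled on the `--processExistingFiles=false` usage in the sample commands.

```python
from typing import Mapping, Union

OptionValue = Union[str, int, bool]


def build_pargs(options: Mapping[str, OptionValue]) -> str:
    """Render pipeline options as a single space-separated argument string."""
    parts = []
    for name, value in options.items():
        if isinstance(value, bool):
            # Booleans are written as lowercase true/false.
            parts.append(f"--{name}={'true' if value else 'false'}")
        else:
            parts.append(f"--{name}={value}")
    return " ".join(parts)


# Example values only; substitute the values for your own project.
args = build_pargs({
    "region": "us-central1",
    "DLPMethod": "DEID",
    "batchSize": 200000,
    "processExistingFiles": False,
})
```

The resulting string can then be placed inside the quotes of `-Pargs="..."` in the `./gradlew run` command.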

Supported file formats

1. CSV

For sample commands for processing CSV files, see Run the samples.

2. TSV

The pipeline supports the TSV file format, which uses tabs as column delimiters. The pipeline options are similar to those for CSV files. The file name extension must be .tsv.

./gradlew run ... -Pargs="... --filePattern=gs://<bucket_name>/small_file.tsv"

3. JSONL

The pipeline supports the JSONL file format, where each line is a valid JSON object and newline characters separate JSON objects. For a sample file, see the test resources. To run the pipeline for JSONL files, you must also pass a comma-separated list of headers in the pipeline options.

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/CCRecords_sample.jsonl --headers=<comma_separated_list_of_headers>"

The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.
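To see why the headers are needed and what "simplifies the data" can mean in practice, consider turning JSONL lines into flat table rows. The following Python sketch is an illustration under stated assumptions, not the pipeline's conversion code: it assumes that values are stringified and that a missing key becomes an empty string. The function name and sample data are hypothetical.

```python
import json
from typing import List


def jsonl_to_rows(text: str, headers: List[str]) -> List[List[str]]:
    """Convert JSONL text to rows of strings ordered by the given headers."""
    rows = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        obj = json.loads(line)
        # Missing keys become empty strings; other values are stringified.
        rows.append(["" if obj.get(h) is None else str(obj[h]) for h in headers])
    return rows


# Hypothetical sample: the second record has no "age" key.
sample = '{"id": 1, "name": "Jane Doe", "age": 41}\n{"id": 2, "name": "John Roe"}\n'
rows = jsonl_to_rows(sample, ["id", "name", "age"])
```

Because each JSON object may carry different keys, the header list fixes the column order, and type information such as integer versus string is lost in the flattened rows.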

4. Avro

The pipeline handles Avro files similarly to how it handles CSV files. No additional changes are required to run the pipeline except updating the --filePattern parameter. For example:

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.avro"

The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.

5. CSV files with custom delimiters

The pipeline supports CSV files with a custom delimiter. Pass the delimiter in the --columnDelimiter pipeline option.

./gradlew run ... -Pargs="... --columnDelimiter=|"
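To preview how a file splits with a given delimiter before running the pipeline, you can parse a few lines locally. The following Python sketch uses the standard `csv` module with `|` as the delimiter; the sample content is hypothetical, and the pipeline's own parser may treat quoting differently.

```python
import csv
import io

# Hypothetical pipe-delimited sample; note the commas inside the last field.
sample = "ID|Name|Note\n1|Jane Doe|likes a,b,c\n"
rows = list(csv.reader(io.StringIO(sample), delimiter="|"))
```

With the pipe as the delimiter, the commas in the last field stay inside a single column instead of creating extra ones.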

6. Parquet

The inspection and de-identification pipelines support the Parquet file format. Data is read from a Cloud Storage bucket, and the results of the DLP Dataflow pipeline are written to BigQuery tables. For sample data in Parquet file format, refer to mock-data.

No additional changes are required to run the pipeline except updating the --filePattern parameter. For example:

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.parquet"

The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.

7. ORC

Inspection of ORC files

The inspection pipeline reads ORC files from an input Cloud Storage bucket and writes the results to BigQuery tables.

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc"

De-identification

The de-identification pipeline reads the data from an input Cloud Storage bucket. The de-identified results can be written to a BigQuery dataset as tables or to an output Cloud Storage bucket as ORC files.

  1. To write the de-identified files to BigQuery tables:
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc"
  2. To write the de-identified files to a Cloud Storage bucket:
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc \
--outputBucket=<output_storage_bucket> ..."

In the above command, replace output_storage_bucket with the URI of the Cloud Storage bucket where you want to store the de-identified ORC files. The de-identification pipeline allows input files with varying schemas to be processed in the same pipeline.

Currently, this solution can process only primitive data types available in ORC format when the results are stored in an output Cloud Storage bucket.

For sample data in ORC file format, refer to mock-data.

Amazon S3 Scanner

To use Amazon S3 as a source of input files, use AWS credentials as instructed below.

  1. Create an AWS access key.

  2. Export the AWS access key, secret key, and credentials provider to your environment variables.

    export AWS_ACCESS_KEY="<access_key>"
    export AWS_SECRET_KEY="<secret_key>"
    export AWS_CRED="{\"@type\":\"AWSStaticCredentialsProvider\",\"awsAccessKeyId\":\"${AWS_ACCESS_KEY}\",\"awsSecretKey\":\"${AWS_SECRET_KEY}\"}"
  3. Use Gradle to build and run the job to perform DLP operations on a CSV file stored in Amazon S3. The results will be written to BigQuery.

    Update the filePattern and awsRegion parameters with appropriate values in the following command.

    ./gradlew build

    # INSPECT is the default DLP method. For de-identification, add --DLPMethod=DEID.
    ./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
    -Pargs="--region=${REGION} \
    --project=${PROJECT_ID} \
    --streaming --enableStreamingEngine \
    --tempLocation=gs://${PROJECT_ID}-demo-data/temp \
    --numWorkers=1 --maxNumWorkers=2 \
    --runner=DataflowRunner \
    --filePattern=s3://<bucket>/file.csv \
    --dataset=${BQ_DATASET_NAME} \
    --inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
    --deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
    --awsRegion=<aws_region> \
    --awsCredentialsProvider=${AWS_CRED} \
    --serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Parameters

  • --awsRegion: The region where the AWS resources reside.

  • --awsCredentialsProvider: The AWS credentials provider.
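The value of --awsCredentialsProvider is a JSON object, and escaping it by hand in the shell is easy to get wrong. The following Python sketch builds the same JSON structure as the AWS_CRED export above; the helper name is hypothetical, and the placeholder keys must be replaced with real credentials.

```python
import json


def aws_credentials_provider(access_key: str, secret_key: str) -> str:
    """Return the JSON string used as the --awsCredentialsProvider value."""
    return json.dumps(
        {
            "@type": "AWSStaticCredentialsProvider",
            "awsAccessKeyId": access_key,
            "awsSecretKey": secret_key,
        },
        separators=(",", ":"),  # compact output without spaces
    )


# Placeholder values, as in the export commands above.
cred = aws_credentials_provider("<access_key>", "<secret_key>")
```

The compact separators keep the value free of spaces, which helps when it is embedded in a space-separated argument string.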

GCS as sink for DEID

The pipeline offers GCS as a sink for the DEID workflow. Currently, it supports the de-identification of CSV files and writes the output files in CSV format.

The following command de-identifies existing CSV files. Update the outputBucket parameter with the correct GCS path for storing the de-identified files.

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--outputBucket=gs://${DATA_STORAGE_BUCKET}/output \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Parameters

  • --outputBucket: The GCS path for storing the deidentified files

Adapt this pipeline for your use cases

The DLP templates used in this tutorial are tailored for inspecting, de-identifying, and re-identifying sample data containing simulated personally identifiable information (PII). When working with your own data, create custom DLP templates that match the characteristics of that data. For information about creating your templates, see the following:

  1. Create your inspection templates and run the inspection on sample data
  2. Create de-identification templates and run de-identification on sample data

Run this pipeline outside Cloud Shell

Cloud Shell comes with a pre-configured development environment that includes many of the most popular tools and libraries, such as the Google Cloud CLI (gcloud CLI), Gradle, Java, Git, Vim, and more. This means that you don't have to spend time setting up your own environment on your local machine. If you want to run this solution from your own machine, follow these steps:

  1. Create a new Google Cloud project.
  2. Set up required tools and libraries to run the DLP Dataflow pipelines.
  3. Set up Application Default Credentials.
  4. Set the project ID:
    gcloud config set project <project_id>
  5. Create your credential file:
    gcloud auth application-default login
  6. Set the project as a quota project:
    gcloud auth application-default set-quota-project <project_id>
  7. Run the shell script:
    sh setup-data-tokeninzation-solution-v2.sh
  8. Set the environment variables:
    source set_env.sh
  9. Run the INSPECT/DEID/REID pipeline commands as described in the tutorial.

Required tools and libraries

This section provides a minimal list of tools and libraries, along with their recommended versions, that you can use to run this pipeline outside Cloud Shell.

  1. Install Java using OpenJDK (recommended version: 19.0.1 and above).
  2. Use Gradle Wrapper to build the project:
    ./gradlew build
  3. Set up Git.
  4. Install the Google Cloud CLI.
  5. Download jq.

Troubleshooting

The following are issues you might encounter while running the pipeline, and the ways you can avoid or recover from them.

  • Duplicate rows: When writing data to a BigQuery table, Cloud DLP might write duplicate rows.

    The project uses the Streaming Inserts API of BigQuery, which, by default, enables a best-effort deduplication mechanism. For a possible solution, see High number of duplicates in Dataflow pipeline streaming inserts to BigQuery.

  • Errors in the transformation process. You can view the detailed errors in worker logs for the PTransform.

    1. INVALID_ARGUMENT: Too many findings to de-identify. Retry with a smaller request.

      DLP has a limit of 3,000 findings per request. Run the pipeline again with a smaller batch size.

    2. RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Number of requests' and limit 'Number of requests per minute' of service 'dlp.googleapis.com'

      This can happen if the Dataflow pipeline is being run with a small batch size. Re-run the pipeline with a larger batch size.

      If increasing the batch size is not possible or if the issue persists even after reaching the maximum batch size, consider trying one of the following options:

      • The pipeline incorporates a retry mechanism for DLP API calls that uses exponential backoff. To tune the retry behavior, you can adjust the value of the dlpApiRetryCount parameter. For more information, see the entries for dlpApiRetryCount and initialBackoff in Pipeline parameters.

      • The pipeline includes a parameter called numShardsPerDLPRequestBatching. Decreasing this value below the default (100) will result in a lower number of concurrent requests sent to DLP.

      • Verify if any other pipelines or clients are generating DLP API requests.

      • Consider submitting a request to increase the quota limit for Sensitive Data Protection.
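To get a feel for how the retry parameters interact, the following Python sketch computes a retry schedule from the documented defaults (10 retries, 5-second initial backoff). The doubling multiplier is an assumption: this document states that the backoff is exponential but not the growth factor, and the real implementation may also cap the delay or add jitter. The function name is hypothetical.

```python
from typing import List


def backoff_delays(retry_count: int = 10, initial_backoff_s: float = 5.0,
                   multiplier: float = 2.0) -> List[float]:
    """Return the wait time in seconds before each retry attempt."""
    # multiplier=2.0 is an assumed growth factor, not taken from the pipeline.
    return [initial_backoff_s * multiplier ** attempt for attempt in range(retry_count)]


delays = backoff_delays()
total_wait = sum(delays)
```

Under these assumptions the waits grow from 5 seconds to 2,560 seconds, and exhausting all retries adds up to 5,115 seconds of waiting, so raising dlpApiRetryCount can lengthen a failing job considerably.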

Dataflow DAG

Inspection and de-identification

[Figure: Dataflow job graph of the inspection and de-identification pipeline]

Re-identification

[Figure: Dataflow job graph of the re-identification pipeline]

Advanced topics

  • Dataflow templates allow you to package a Dataflow pipeline for deployment. Instead of having to build the pipeline every time, you can create Flex Templates and deploy the template by using the Google Cloud console, the Google Cloud CLI, or REST API calls.
  • For more details, see Dataflow templates and Flex Templates.

Some considerations

  • The behavior of the pipeline is dependent on factors such as the length of the record, the number of findings per record, the DLP API quota on the project, and other applications or pipelines generating DLP API traffic.
  • You may need to adjust the parameters mentioned above. If you encounter errors, see Troubleshooting on this page.
  • Most errors observed in the pipeline indicate that the parameters need to be adjusted. However, there may be error scenarios that the pipeline doesn't currently handle and these may require code changes.
  • Due to the reasons mentioned above, this solution should not be considered production-ready.
