Create a target campaign pipeline

Learn how to use Cloud Data Fusion to clean, transform, and process customer data to select candidates for a target campaign.


To follow step-by-step guidance for this task directly in the Google Cloud console, click Guide me:

Guide me


Scenario

You want to create custom marketing materials for an ongoing campaign promotion, and you'd like to distribute the materials directly to the home mailboxes of your customers.

Your campaign has two constraints:

  • Location: You only deliver to customers in California, Washington, and Oregon.
  • Cost: To save on fuel, you deliver to quickly accessible customer homes. You deliver only to customers who live on avenues.

This tutorial shows you how to generate the list of customer addresses for the campaign. In this tutorial, you do the following:

  1. Clean the customer data: filter customers that live on an avenue in California, Washington, or Oregon.
  2. Create a pipeline that does the following:

    • Joins the filtered customer data with a public dataset that contains state abbreviations.
    • Stores the cleaned and joined data in a BigQuery table that you can query (by using the BigQuery web interface) or analyze (by using Looker Studio).

Objectives

  • Connect Cloud Data Fusion to two data sources
  • Apply basic transformations
  • Join the two data sources
  • Write the output data to a sink

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Data Fusion, BigQuery, Cloud Storage, and Dataproc APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Create a Cloud Data Fusion instance.
    This tutorial assumes that you use the default Compute Engine service account.
Note: Ensure that you check the estimated cost before proceeding. To generate a cost estimate based on your projected usage, use the pricing calculator.

Manage permissions

Create and assign the required custom roles and permissions.

Create a custom role and add permissions

  1. In the Google Cloud console, go to the Roles page:

    Go to the Roles page

  2. Click Create role.

  3. In the Title field, enter Custom Role-Tutorial.

  4. Click Add permissions.

  5. In the Add permissions window, select the following permissions and click Add:

    • bigquery.datasets.create
    • bigquery.jobs.create
    • storage.buckets.create
  6. Click Create.

Assign custom role to the default Compute Engine service account

  1. Go to the Cloud Data Fusion Instances page:

    Go to Instances

  2. Click the name of your instance.

  3. Make a note of the default Dataproc Service Account. The instance details page contains this information.

    The following is the format of the Dataproc service account name:

    CUSTOMER_PROJECT_NUMBER-compute@developer.gserviceaccount.com.

    Learn more about Dataproc service accounts.

  4. Go to the IAM page:

    Go to IAM

  5. In the Filter bar, enter the name of your default Dataproc service account.

  6. For your default Compute Engine service account, click Edit.

  7. Click Add another role.

  8. In the Select a role field, select Custom Role-Tutorial.

  9. Click Save.

  10. Ensure that the service account is already assigned the Cloud Data Fusion Runner role.

Prepare the customer data

This tutorial requires the following two input datasets, both of which are provided with your Cloud Data Fusion instance:

  • Sample customer data: A CSV file named customers.csv.
  • State abbreviations: A BigQuery table named state_abbreviations.

Load the customer data

Note: The customer data is in a Cloud Storage bucket. The Cloud Storage bucket is publicly available through the Sample Buckets connection, provided by default with your Cloud Data Fusion instance.
  1. Go to the Cloud Data Fusion Instances page:

    Go to Instances

  2. For the Cloud Data Fusion instance you are using, click View instance. The Cloud Data Fusion web interface opens in a new tab.

  3. Click Wrangler. The Wrangler page opens.

  4. In the Connections pane, click GCS > Sample Buckets.

  5. Click campaign-tutorial.

  6. Click customers.csv.

  7. In the Parsing options window, specify the following:

    • Format: csv
    • Enable quoted value: False
    • Use first row as header: False
    • File-encoding: UTF-8
  8. Click Confirm. Customer data is loaded in a new tab in Wrangler.

    Loaded customer data

Clean the customer data

Cleaning the customer data involves two sub-tasks:

  • Setting the schema
  • Filtering the customer data to present only the target audience you need

Set the schema

Set the schema of the data by assigning appropriate names to the table columns. To give the columns, such as body_1 and body_2, more informative names, follow these steps (a SQL sketch of the equivalent renaming appears after the list):

  1. In the right pane, click theColumns tab.
  2. Click the Column names drop-down and select Set all.
  3. In the Bulk set column names dialog, enter the following comma-separated column names:

    Name,StreetAddress,City,State,Country
  4. Click Apply.
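
The Set all directive simply renames the generated columns. If it helps to think of the step in SQL terms, the renaming corresponds to aliasing the raw columns, as in the following illustrative BigQuery-style sketch (the raw_customers name and the body_1 through body_5 column names are assumptions for illustration, not part of the tutorial):

    -- Illustration only: the bulk rename maps each generated column to a
    -- descriptive name. `raw_customers` is a hypothetical stand-in for the
    -- parsed CSV data.
    SELECT
      body_1 AS Name,
      body_2 AS StreetAddress,
      body_3 AS City,
      body_4 AS State,
      body_5 AS Country
    FROM raw_customers;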

Filter the data

Filter the data to display only customers that live in California, Oregon, or Washington.

Remove all rows that contain values other than those states:

  1. Click the State column drop-down and select Filter.
  2. In the filter window, do the following:

    1. Click Keep rows.
    2. Click the If drop-down, and select value matches regex.
    3. Enter the following regular expression:

      ^(California|Oregon|Washington)$
    4. Click Apply.

    The values in the State column are California, Oregon, or Washington.

Filter the data to display only customers that live on avenues. Keep only the addresses that contain the string Avenue:

  1. Click the StreetAddress column drop-down, and select Filter.
  2. In the filter window, do the following:
    1. Click Keep rows.
    2. Click the If drop-down, select value contains, and enter Avenue.
    3. Select Ignore case.
    4. Click Apply.

Before performing parallel-processing jobs on your entire dataset, Wrangler displays only the first 1000 values of your dataset. Because you filtered some data, only a few customers remain in the Wrangler display.
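
Taken together, the two filters keep a row only when the state is California, Oregon, or Washington and the street address contains the string Avenue, ignoring case. A rough BigQuery-style SQL equivalent of the combined filters is shown below (illustration only; cleaned_customers is a hypothetical name for the renamed data, and the actual filtering is performed by the Wrangler directives you applied):

    -- Illustration only: the combined effect of the two Wrangler filters.
    SELECT *
    FROM cleaned_customers   -- hypothetical stand-in for the renamed data
    WHERE REGEXP_CONTAINS(State, r'^(California|Oregon|Washington)$')
      AND LOWER(StreetAddress) LIKE '%avenue%';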

Create a batch pipeline

You've cleaned your data and you've run transformations on a subset of your data. You can now create a batch pipeline to run transformations on your entire dataset.

Cloud Data Fusion translates the pipeline that you build in the Studio into an Apache Spark program that executes transformations in parallel on an ephemeral Dataproc cluster. This process lets you execute complex transformations over vast quantities of data in a scalable, reliable manner, without having to handle the infrastructure.

  1. On the Wrangler page, click Create a pipeline.
  2. Select Batch pipeline. The Studio page opens.
  3. On the Studio page, a GCSFile source node is connected to a Wrangler node.

    GCSFile node connected to Wrangler node

    The transformations you applied on the Wrangler page appear in the Wrangler node on the Studio page.

  4. To view the transformations that you applied, hold the pointer over the Wrangler node and click Properties.

    The transformations you applied appear in the Directives field.

    View applied transformations

  5. Click Validate.

  6. Click Close.

You can apply more transformations by clicking Wrangle, which takes you back to the Wrangler page. Any transformation that you add appears on the Studio page.

For example, you realize the Country column isn't needed because the value is always USA. Delete the column by following these steps (a SQL sketch of the equivalent operation appears after the list):

  1. ClickWrangle.
  2. Click the down arrow next to Country and select Delete Column.
  3. Click Apply. The Wrangler page closes and the Wrangler Properties window opens on the Studio page. In the Directives field, drop Country appears.
  4. Click Close.
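
In SQL terms, the drop Country directive is comparable to excluding the column from the select list, for example with BigQuery's EXCEPT modifier (illustration only; cleaned_customers is a hypothetical name for the wrangled data):

    -- Illustration only: dropping the Country column resembles excluding it
    -- from the select list.
    SELECT * EXCEPT (Country)
    FROM cleaned_customers;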

Abbreviate the state names

The navigation system in your delivery vehicle only recognizes addresses that contain abbreviated state names (CA, not California), and your customer data contains full state names.

The public BigQuery state_abbreviations table contains two columns: one with the full state names and one with the abbreviated state names. You can use this table to update the state names in your customer data.

View the state names data in BigQuery

  1. In a separate tab, go to the BigQuery Studio page:

    Go to BigQuery

  2. Click Create SQL query and enter the following query in the query editor:

    SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`
  3. Click Run.

    BigQuery displays a list of state names and their abbreviations.

Access the BigQuery table

Add a source in your pipeline that accesses the BigQuery state_abbreviations table.

  1. Go to the Cloud Data Fusion Studio page and expand the Source menu.
  2. Click BigQuery.

    A BigQuery source node appears on the canvas, along with the other two nodes.

  3. Hold the pointer over the BigQuery source node and click Properties.

    1. In the Dataset Project ID field, enter dis-user-guide.
    2. In the Reference Name field, enter state_abbreviations.
    3. In the Dataset field, enter campaign_tutorial.
    4. In the Table field, enter state_abbreviations.
  4. Populate the schema of the table from BigQuery by clicking Get Schema.

  5. Click Close.

Join the two data sources

To generate output that contains customer data with abbreviated state names, join the two data sources: the customer data and the state abbreviations. (A SQL sketch of the equivalent join appears after these steps.)

  1. Go to the Cloud Data Fusion Studio page and expand the Analytics menu.
  2. Click Joiner.

    A Joiner node, representing an action similar to a SQL join, appears on the canvas.

  3. Connect the Wrangler node and the BigQuery node to the Joiner node: drag a connection arrow from the right edge of the source node and drop it onto the destination node.

    Join Wrangler and BigQuery nodes to Joiner node

  4. Hold the pointer over the Joiner node and click Properties.

    1. In the Fields section, expand Wrangler and BigQuery.

      1. Clear the Wrangler State checkbox.
      2. Clear the BigQuery name checkbox because you want only the abbreviated state name and not the full state name.
      3. Keep the BigQuery abbreviation checkbox selected, and change the alias to State.

        Joiner node properties

    2. In the Join Type field, leave the value as Outer. For Required inputs, select the Wrangler checkbox.

    3. In the Join condition section, for Wrangler, select State. For BigQuery, select name.

    4. To generate the schema of the resultant join, click Get Schema.

    5. Click Validate.

    6. Click Close.
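
Because the join type is Outer and only the Wrangler input is required, the Joiner keeps every filtered customer row and attaches the matching abbreviation, much like a LEFT OUTER JOIN in SQL. The following BigQuery-style sketch illustrates that behavior and is not the pipeline itself (cleaned_customers is a hypothetical name for the output of the Wrangler node; the state_abbreviations table and its name and abbreviation columns are the ones used in this tutorial):

    -- Illustration only: the Joiner node behaves like a left outer join that
    -- keeps every customer row and swaps the full state name for its
    -- abbreviation.
    SELECT
      c.Name,
      c.StreetAddress,
      c.City,
      s.abbreviation AS State
    FROM cleaned_customers AS c   -- hypothetical: output of the Wrangler node
    LEFT OUTER JOIN `dis-user-guide.campaign_tutorial.state_abbreviations` AS s
      ON c.State = s.name;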

Store the output to BigQuery

Store the result of your pipeline in a BigQuery table. Where you store your data is called a sink. (A short SQL sketch of the sink's effect appears after these steps.)

  1. Go to the Cloud Data Fusion Studio page and expand Sink.
  2. Click BigQuery.
  3. Connect the Joiner node to the BigQuery node.

    Connect Joiner node and BigQuery node

  4. Hold the pointer over the BigQuery node and click Properties.

    1. In the Dataset field, enter dis_user_guide.
    2. In the Table field, enter customer_data_abbreviated_states.
    3. Click Close.
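
The BigQuery sink creates the dis_user_guide.customer_data_abbreviated_states table when the pipeline runs and writes the joined records into it. Conceptually, the end result resembles materializing the joined data into a table, as in this illustrative sketch (joined_customers is a hypothetical name for the output of the Joiner node; the pipeline, not this statement, creates the table):

    -- Illustration only: the sink's effect resembles materializing the joined
    -- result into a BigQuery table.
    CREATE OR REPLACE TABLE dis_user_guide.customer_data_abbreviated_states AS
    SELECT * FROM joined_customers;   -- hypothetical intermediate result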

Deploy and run the pipeline

  1. On the Studio page, click Name your pipeline and enter CampaignPipeline.
  2. Click Save.
  3. In the upper-right corner, click Deploy.
  4. After deployment completes, click Run.

Running your pipeline can take a few minutes. While you wait, you can observe the Status of the pipeline transition from Provisioning > Starting > Running > Deprovisioning > Succeeded.

View the results

  1. In the Google Cloud console, go to the BigQuery page:

    Go to BigQuery

  2. Click Create SQL query.

  3. Query the customer_data_abbreviated_states table:

    SELECT * FROM dis_user_guide.customer_data_abbreviated_states LIMIT 1000

    View the results

    Note: For further analysis, you can connect this table to Looker Studio.

You have successfully created a data pipeline.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

Delete the BigQuery dataset

To delete the BigQuery dataset that you created in this tutorial, do the following:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. Select the dis_user_guide dataset.
  3. Click Delete dataset.

Delete the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

    Caution: Deleting a project has the following effects:
    • Everything in the project is deleted. If you used an existing project for the tasks in this document, when you delete it, you also delete any other work you've done in the project.
    • Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an appspot.com URL, delete selected resources inside the project instead of deleting the whole project.

    If you plan to explore multiple architectures, tutorials, or quickstarts, reusing projects can help you avoid exceeding project quota limits.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next
