Create continuous queries

This document describes how to run a continuous query in BigQuery.

BigQuery continuous queries are SQL statements that run continuously. Continuous queries let you analyze incoming data in BigQuery in real time, and then either export the results to Bigtable, Pub/Sub, or Spanner, or write the results to a BigQuery table.

Choose an account type

You can create and run a continuous query job by using a user account, or you can create a continuous query job by using a user account and then run it by using a service account. You must use a service account to run a continuous query that exports results to a Pub/Sub topic.

When you use a user account, a continuous query runs for up to two days. When you use a service account, a continuous query runs for up to 150 days. For more information, see Authorization.

Required permissions

This section describes the permissions that you need to create and run a continuous query. As an alternative to the Identity and Access Management (IAM) roles mentioned, you could get the required permissions through custom roles.

Permissions when using a user account

This section provides information about the roles and permissions required to create and run a continuous query by using a user account.

To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:

  • BigQuery Admin (roles/bigquery.admin)
  • BigQuery Job User (roles/bigquery.jobUser)
  • BigQuery User (roles/bigquery.user)

To export data from a BigQuery table, the user account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:

  • BigQuery Admin (roles/bigquery.admin)
  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • BigQuery Data Owner (roles/bigquery.dataOwner)
  • BigQuery Data Viewer (roles/bigquery.dataViewer)

To update data in a BigQuery table, the user account must have the bigquery.tables.updateData IAM permission. Each of the following IAM roles grants the bigquery.tables.updateData permission:

  • BigQuery Admin (roles/bigquery.admin)
  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • BigQuery Data Owner (roles/bigquery.dataOwner)

If you need to enable the APIs required for your continuous query use case, the user account must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.

Permissions when using a service account

This section provides information about the roles and permissions required by the user account that creates the continuous query, and the service account that runs the continuous query.

User account permissions

To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:

  • BigQuery Admin (roles/bigquery.admin)
  • BigQuery Job User (roles/bigquery.jobUser)
  • BigQuery User (roles/bigquery.user)

To submit a job that runs using a service account, the user account must have the Service Account User (roles/iam.serviceAccountUser) role. If you are using the same user account to create the service account, then the user account must have the Service Account Admin (roles/iam.serviceAccountAdmin) role. For information on how to limit a user's access to a single service account, rather than to all service accounts within a project, see Grant a single role.

If you need to enable the APIs required for your continuous query use case, the user account must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.

Service account permissions

To export data from a BigQuery table, the service account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:

  • BigQuery Admin (roles/bigquery.admin)
  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • BigQuery Data Owner (roles/bigquery.dataOwner)
  • BigQuery Data Viewer (roles/bigquery.dataViewer)

To update data in a BigQuery table, the service account must have the bigquery.tables.updateData IAM permission. Each of the following IAM roles grants the bigquery.tables.updateData permission:

  • BigQuery Admin (roles/bigquery.admin)
  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • BigQuery Data Owner (roles/bigquery.dataOwner)

Before you begin

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role; you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

Create a reservation

Create an Enterprise or Enterprise Plus edition reservation, and then create a reservation assignment with a CONTINUOUS job type. This reservation can use autoscaling and idle slot sharing. There are reservation limitations that apply to reservation assignments for continuous queries.
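
If you prefer to manage reservations with SQL instead of the console, a minimal sketch using BigQuery's reservation DDL might look like the following. The administration project, location, reservation name, and slot count are hypothetical placeholders:

-- Create an Enterprise edition reservation in a hypothetical admin project.
CREATE RESERVATION `admin-project.region-us.continuous-queries-reservation`
OPTIONS (
  edition = 'ENTERPRISE',
  slot_capacity = 100);

-- Assign the reservation to a project for continuous query jobs.
CREATE ASSIGNMENT `admin-project.region-us.continuous-queries-reservation.continuous-assignment`
OPTIONS (
  assignee = 'projects/myproject',
  job_type = 'CONTINUOUS');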

Export to Pub/Sub

Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Pub/Sub. For more information, see Export to Pub/Sub.

Embed custom attributes as metadata in Pub/Sub messages

You can use Pub/Sub attributes to provide additional information about the message, such as its priority, origin, destination, or additional metadata. You can also use attributes to filter messages on the subscription.

Within a continuous query result, if a column is named _ATTRIBUTES, then its values are copied to the Pub/Sub message attributes. The provided fields within _ATTRIBUTES are used as attribute keys.

The _ATTRIBUTES column must be of JSON type, in the format ARRAY<STRUCT<STRING, STRING>> or STRUCT<STRING>.

For an example, see Export data to a Pub/Sub topic.

Export to Bigtable

Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Bigtable. For more information, see Export to Bigtable.

Export to Spanner

Preview

This feature is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the Service Specific Terms. Pre-GA features are available "as is" and might have limited support. For more information, see the launch stage descriptions.

Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Spanner. For more information, see Export to Spanner (reverse ETL).

Write data to a BigQuery table

You can write data to a BigQuery table by using an INSERT statement.
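
For instance, a minimal sketch of such a query (with hypothetical dataset, table, and column names) pairs INSERT with the APPENDS function described later in this document; a complete example appears in Write data to a BigQuery table:

INSERT INTO `myproject.mydataset.events_copy` (event_id, event_time)
SELECT
  event_id,
  event_time
FROM
  APPENDS(TABLE `myproject.mydataset.events`,
    -- Process data added to the source table in the last 10 minutes,
    -- and all new data as it arrives.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE);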

Use AI functions

Additional APIs, IAM permissions, and Google Cloud resources are required to use a supported AI function in a continuous query. For more information, see the documentation for the AI function that applies to your use case.

When you use an AI function in a continuous query, consider whether the query output will remain within the quota for the function. If you exceed the quota, you might have to separately handle the records that don't get processed.

Specify a starting point

You must use the APPENDS function in the FROM clause of a continuous query to specify the earliest data to process. For example, APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) tells BigQuery to process data that was added to the table my_table at most 10 minutes before the start of the continuous query. Data that's added to my_table is processed as it comes in. There is no imposed delay on data processing. Don't provide an end_timestamp argument to the APPENDS function when you use it in a continuous query.

Note: When used in a continuous query, the APPENDS function is considered to be at the General Availability (GA) launch stage.

The following example shows how to start a continuous query from a particular point in time by using the APPENDS function, when querying a BigQuery table that is receiving streaming taxi ride information:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Specify a starting point earlier than the current time

If you want to process data from before the current point in time, you can use the APPENDS function to specify an earlier starting point for the query. The starting point that you specify must fall within the time travel window for the table that you are selecting from. The time travel window covers the past seven days by default.

To include data that is outside of the time travel window, use a standard query to insert or export data up to a particular point in time, and then start a continuous query from that point in time.

Example

The following example shows how to load older data, up to a particular point in time, from a BigQuery table that is receiving streaming taxi ride information into a second table, and then start a continuous query from the cutoff point for the older data.

  1. Run a standard query to backfill data up to a particular point in time:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
      SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE ride_status = 'dropoff';
  2. Run a continuous query from the point in time at which the query stopped:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
      SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE ride_status = 'dropoff';

Run a continuous query by using a user account

This section describes how to run a continuous query by using a user account. After the continuous query is running, you can close the Google Cloud console, terminal window, or application without interrupting query execution. A continuous query run by a user account runs for a maximum of two days and then automatically stops. To continue processing new incoming data, start a new continuous query and specify a starting point. To automate this process, see retry failed queries.

Follow these steps to run a continuous query:

Console

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, click More.

    1. In the Choose query mode section, choose Continuous query.
    2. Click Confirm.
    3. Optional: To control how long the query runs, click Query settings and set the Job timeout in milliseconds.
  3. In the query editor, type the SQL statement for the continuous query. The SQL statement must only contain supported operations.

  4. Click Run.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. In Cloud Shell, run the continuous query by using the bq query command with the --continuous flag:

    bq query --use_legacy_sql=false --continuous=true 'QUERY'

    Replace QUERY with the SQL statement for the continuous query. The SQL statement must only contain supported operations. You can control how long the query runs by using the --job_timeout_ms flag.

API

Run the continuous query by calling the jobs.insert method. You must set the continuous field to true in the JobConfigurationQuery of the Job resource that you pass in. You can optionally control how long the query runs by setting the jobTimeoutMs field.

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
  --compressed

Replace the following:

  • PROJECT_ID: your project ID.
  • QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.

Run a continuous query by using a service account

This section describes how to run a continuous query by using a service account. After the continuous query is running, you can close the Google Cloud console, terminal window, or application without interrupting query execution. A continuous query run by using a service account can run for up to 150 days and then automatically stops. To continue processing new incoming data, start a new continuous query and specify a starting point. To automate this process, see retry failed queries.

Follow these steps to use a service account to run a continuous query:

Console

  1. Create a service account.
  2. Grant the required permissions to the service account.
  3. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  4. In the query editor, click More.

  5. In the Choose query mode section, choose Continuous query.

  6. Click Confirm.

  7. In the query editor, click More > Query settings.

  8. In the Continuous query section, use the Service account box to select the service account that you created.

  9. Optional: To control how long the query runs, set the Job timeout in milliseconds.

  10. Click Save.

  11. In the query editor, type the SQL statement for the continuous query. The SQL statement must only contain supported operations.

  12. Click Run.

bq

  1. Create a service account.
  2. Grant the required permissions to the service account.
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. On the command line, run the continuous query by using the bq query command with the following flags:

    • Set the --continuous flag to true to make the query continuous.
    • Use the --connection_property flag to specify a service account to use.
    • Optional: Set the --job_timeout_ms flag to limit the query runtime.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
      --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
      'QUERY'

    Replace the following:

    • PROJECT_ID: your project ID.
    • SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email from the Service accounts page of the Google Cloud console.
    • QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.

API

  1. Create a service account.
  2. Grant the required permissions to the service account.
  3. Run the continuous query by calling the jobs.insert method. Set the following fields in the JobConfigurationQuery resource of the Job resource that you pass in:

    • Set the continuous field to true to make the query continuous.
    • Use the connectionProperties field to specify a service account to use.

    You can optionally control how long the query runs by setting the jobTimeoutMs field in the JobConfiguration resource.

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
      --compressed

    Replace the following:

    • PROJECT_ID: your project ID.
    • QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
    • SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email on the Service accounts page of the Google Cloud console.

Create a custom job ID

Every query job is assigned a job ID that you can use to search for and manage the job. By default, job IDs are randomly generated. To make it easier to search for the job ID of a continuous query by using job history or the jobs explorer, you can assign a custom job ID prefix:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, click More.

  3. In the Choose query mode section, choose Continuous query.

  4. Click Confirm.

  5. In the query editor, click More > Query settings.

  6. In the Custom job ID prefix section, enter a custom name prefix.

  7. Click Save.
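
After you set a prefix, you can find matching jobs by filtering on the job ID. For example, the following sketch (assuming a hypothetical prefix my-continuous-query-) uses the INFORMATION_SCHEMA JOBS_BY_PROJECT view, which is also used later in this document:

SELECT job_id, creation_time, state
FROM `myproject.region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
-- Job IDs begin with the custom prefix, followed by generated characters.
WHERE job_id LIKE 'my-continuous-query-%'
ORDER BY creation_time DESC;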

Examples

The following SQL examples show common use cases for continuous queries.

Export data to a Pub/Sub topic

The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and publishes the data to a Pub/Sub topic in real time with message attributes:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message,
    TO_JSON(STRUCT(CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Export data to a Bigtable table

The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and exports the data into a Bigtable table in real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Export data to a Spanner table

The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and then exports the data into a Spanner table in real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_SPANNER',
    uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
    spanner_options = """{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
    }""")
AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count,
    _CHANGE_TIMESTAMP AS change_timestamp
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Write data to a BigQuery table

The following example shows a continuous query that filters and transforms data from a BigQuery table that is receiving streaming taxi ride information, and then writes the data to another BigQuery table in real time. This makes the data available for further downstream analysis.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
  SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE ride_status = 'dropoff';

Process data by using a Vertex AI model

The following example shows a continuous query that uses a Vertex AI model to generate an advertisement for taxi riders based on their current latitude and longitude, and then exports the results into a Pub/Sub topic in real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude, prompt, result)) AS message
  FROM
    AI.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT('Generate an ad based on the current latitude of ', latitude,
            ' and longitude of ', longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p)) AS ml_output
);

Modify the SQL of a continuous query

You can't update the SQL used in a continuous query while the continuous query job is running. You must cancel the continuous query job, modify the SQL, and then start a new continuous query job from the point where you stopped the original continuous query job.

Follow these steps to modify the SQL used in a continuous query:

  1. View the job details for the continuous query job that you want to update, and note the job ID.
  2. If possible, pause collection of upstream data. If you can't do this, you might get some data duplication when the continuous query is restarted.
  3. Cancel the continuous query that you want to modify.
  4. Get the end_time value for the original continuous query job by using the INFORMATION_SCHEMA JOBS view:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = CURRENT_DATE()
      AND error_result.reason = 'stopped'
      AND job_id = 'JOB_ID';

    Replace the following:

    • PROJECT_ID: your project ID.
    • REGION: the region used by your project.
    • JOB_ID: the continuous query job ID that you identified in Step 1.
  5. Modify the continuous query SQL statement to start the continuous query from a particular point in time, using the end_time value that you retrieved in Step 4 as the starting point (see the sketch after these steps).

  6. Modify the continuous query SQL statement to reflect your needed changes.

  7. Run the modified continuous query.
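
For example, if the end_time value retrieved in Step 4 was 2025-01-01 00:00:00 UTC, the modified query for the earlier taxi-ride example might resume one microsecond after the original job stopped, following the same pattern as the backfill example in this document:

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Resume processing one microsecond after the end_time of the
    -- canceled continuous query job.
    TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
WHERE ride_status = 'dropoff';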

Cancel a continuous query

You can cancel a continuous query job just like any other job. It might take up to a minute for the query to stop running after the job is canceled.
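
For example, one way to cancel a job from SQL is the BQ.JOBS.CANCEL system procedure; the job ID below is a hypothetical placeholder:

-- Cancel a job by its job ID (a minimal sketch; the ID is hypothetical).
CALL BQ.JOBS.CANCEL('myproject.bquxjob_1234abcd_5678efgh');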

If you cancel and then restart a query, the restarted query behaves like a new, independent query. The restarted query doesn't start processing data where the previous job stopped and can't reference the previous query's results. See Specify a starting point.

Monitor queries and handle errors

A continuous query might be interrupted due to factors such as data inconsistencies, schema changes, temporary service disruptions, or maintenance. Although BigQuery handles some transient errors, best practices for improving job resiliency include monitoring your continuous query jobs and restarting failed queries from the point where they stopped.
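
For instance, a monitoring sketch that follows the INFORMATION_SCHEMA pattern used earlier in this document can surface the state and any error of your continuous query jobs (the job ID prefix is a hypothetical placeholder):

SELECT
  job_id,
  state,
  error_result.reason AS error_reason,
  error_result.message AS error_message
FROM `myproject.region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE EXTRACT(DATE FROM creation_time) = CURRENT_DATE()
  -- Filter on a custom job ID prefix, as described earlier in this document.
  AND job_id LIKE 'my-continuous-query-%'
ORDER BY creation_time DESC;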

What's next
