Create continuous queries
This document describes how to run a continuous query in BigQuery.
BigQuery continuous queries are SQL statements that run continuously. Continuous queries let you analyze incoming data in BigQuery in real time, and then either export the results to Bigtable, Pub/Sub, or Spanner, or write the results to a BigQuery table.
Choose an account type
You can create and run a continuous query job by using a user account, or you can create a continuous query job by using a user account and then run it by using a service account. You must use a service account to run a continuous query that exports results to a Pub/Sub topic.
When you use a user account, a continuous query runs for up to two days. When you use a service account, a continuous query runs for up to 150 days. For more information, see Authorization.
Required permissions
This section describes the permissions that you need to create and run a continuous query. As an alternative to the Identity and Access Management (IAM) roles mentioned, you could get the required permissions through custom roles.
Permissions when using a user account
This section provides information about the roles and permissions required to create and run a continuous query by using a user account.
To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:
- BigQuery User (roles/bigquery.user)
- BigQuery Job User (roles/bigquery.jobUser)
- BigQuery Admin (roles/bigquery.admin)
To export data from a BigQuery table, the user account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:
- BigQuery Data Viewer (roles/bigquery.dataViewer)
- BigQuery Data Editor (roles/bigquery.dataEditor)
- BigQuery Data Owner (roles/bigquery.dataOwner)
- BigQuery Admin (roles/bigquery.admin)
To update data in a BigQuery table, the user account must have the bigquery.tables.updateData IAM permission. Each of the following IAM roles grants the bigquery.tables.updateData permission:
- BigQuery Data Editor (roles/bigquery.dataEditor)
- BigQuery Data Owner (roles/bigquery.dataOwner)
- BigQuery Admin (roles/bigquery.admin)
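For example, you could grant one of these roles with the Google Cloud CLI. This is a minimal sketch; the project ID and user email are illustrative placeholders:

# Grant the BigQuery Job User role to a user account on a project.
gcloud projects add-iam-policy-binding my-project \
    --member="user:user@example.com" \
    --role="roles/bigquery.jobUser"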
If the user account must enable the APIs required for your continuous query use case, the user account must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.
Permissions when using a service account
This section provides information about the roles and permissions required by the user account that creates the continuous query, and the service account that runs the continuous query.
User account permissions
To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:
- BigQuery User (roles/bigquery.user)
- BigQuery Job User (roles/bigquery.jobUser)
- BigQuery Admin (roles/bigquery.admin)
To submit a job that runs using a service account, the user account must have the Service Account User (roles/iam.serviceAccountUser) role. If you are using the same user account to create the service account, then the user account must have the Service Account Admin (roles/iam.serviceAccountAdmin) role. For information on how to limit a user's access to a single service account, rather than to all service accounts within a project, see Grant a single role.
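For example, the Service Account User role can be granted on a single service account with the Google Cloud CLI. This is a sketch; the service account and user emails are illustrative placeholders:

# Grant roles/iam.serviceAccountUser on one service account only,
# rather than on every service account in the project.
gcloud iam service-accounts add-iam-policy-binding \
    continuous-query-sa@my-project.iam.gserviceaccount.com \
    --member="user:user@example.com" \
    --role="roles/iam.serviceAccountUser"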
If the user account must enable the APIs required for your continuous query use case, the user account must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.
Service account permissions
To export data from a BigQuery table, the service account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:
- BigQuery Data Viewer (roles/bigquery.dataViewer)
- BigQuery Data Editor (roles/bigquery.dataEditor)
- BigQuery Data Owner (roles/bigquery.dataOwner)
- BigQuery Admin (roles/bigquery.admin)
To update data in a BigQuery table, the service account must have the bigquery.tables.updateData IAM permission. Each of the following IAM roles grants the bigquery.tables.updateData permission:
- BigQuery Data Editor (roles/bigquery.dataEditor)
- BigQuery Data Owner (roles/bigquery.dataOwner)
- BigQuery Admin (roles/bigquery.admin)
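For example, you could grant the BigQuery Data Editor role to the service account with the Google Cloud CLI. This is a sketch; the project ID and service account email are illustrative placeholders:

# Grant the BigQuery Data Editor role to the service account that
# runs the continuous query.
gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:continuous-query-sa@my-project.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"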
Before you begin
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
Roles required to select or create a project:
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
Verify that billing is enabled for your Google Cloud project.
Enable the BigQuery API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
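For example, you can enable the BigQuery API from the command line. This is a minimal sketch using the Google Cloud CLI:

# Enable the BigQuery API in the current project.
gcloud services enable bigquery.googleapis.com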
Create a reservation
Create an Enterprise or Enterprise Plus edition reservation, and then create a reservation assignment with a CONTINUOUS job type. This reservation can use autoscaling and idle slot sharing. There are reservation limitations that apply to reservation assignments for continuous queries.
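As a sketch, a reservation and a CONTINUOUS assignment can also be created with SQL DDL statements; the project, location, and resource names below are illustrative placeholders:

-- Create an Enterprise edition reservation with a baseline of 100 slots.
CREATE RESERVATION `my-admin-project.region-us.continuous-queries`
OPTIONS (
  edition = 'ENTERPRISE',
  slot_capacity = 100);

-- Assign a project to the reservation for continuous query jobs.
CREATE ASSIGNMENT `my-admin-project.region-us.continuous-queries.my-assignment`
OPTIONS (
  assignee = 'projects/my-project',
  job_type = 'CONTINUOUS');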
Export to Pub/Sub
Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Pub/Sub. For more information, see Export to Pub/Sub.
Embed custom attributes as metadata in Pub/Sub messages
You can use Pub/Sub attributes to provide additional information about the message, such as its priority, origin, destination, or additional metadata. You can also use attributes to filter messages on the subscription, as shown in the sketch that follows.
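For example, a subscriber could filter on an attribute when creating a subscription. This is a sketch with the Google Cloud CLI; the subscription name, topic, and attribute are illustrative placeholders:

# Deliver only messages whose passenger_count attribute equals "1".
gcloud pubsub subscriptions create taxi-solo-rides-sub \
    --topic=taxi-real-time-rides \
    --message-filter='attributes.passenger_count = "1"'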
Within a continuous query result, if a column is named _ATTRIBUTES, then its values are copied to the Pub/Sub message attributes. The provided fields within _ATTRIBUTES are used as attribute keys.
The _ATTRIBUTES column must be of JSON type, in the format ARRAY<STRUCT<STRING, STRING>> or STRUCT<STRING>.
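For example, the following SELECT list sketch (the table and column names are illustrative) publishes each row as a message and attaches passenger_count as a message attribute:

SELECT
  TO_JSON_STRING(STRUCT(ride_id, timestamp)) AS message,
  -- Each key-value pair in _ATTRIBUTES becomes a Pub/Sub message
  -- attribute; attribute values must be strings.
  TO_JSON(STRUCT(CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
FROM mytable;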
For a full example, see Export data to a Pub/Sub topic.
Export to Bigtable
Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Bigtable. For more information, see Export to Bigtable.
Export to Spanner
Preview
This feature is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the Service Specific Terms. Pre-GA features are available "as is" and might have limited support. For more information, see the launch stage descriptions.
Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Spanner. For more information, see Export to Spanner (reverse ETL).
Write data to a BigQuery table
You can write data to a BigQuery table by using an INSERT statement. For a full example, see Write data to a BigQuery table.
Use AI functions
Additional APIs, IAM permissions, and Google Cloud resources are required to use a supported AI function in a continuous query. For more information, see one of the following topics, based on your use case:
- Generate text by using the AI.GENERATE_TEXT function
- Generate text embeddings by using the AI.GENERATE_EMBEDDING function
- Understand text with the ML.UNDERSTAND_TEXT function
- Translate text with the ML.TRANSLATE function
When you use an AI function in a continuous query, consider whether the query output will remain within the quota for the function. If you exceed the quota, you might have to separately handle the records that don't get processed.
Specify a starting point
You must use the APPENDS function in the FROM clause of a continuous query to specify the earliest data to process. For example, APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) tells BigQuery to process data that was added to the table my_table at most 10 minutes before the start of the continuous query. Data that's added to my_table is processed as it comes in. There is no imposed delay on data processing. Don't provide an end_timestamp argument to the APPENDS function when you use it in a continuous query.
The APPENDS function is considered to be at the General Availability (GA) launch stage. The following example shows how to start a continuous query from a particular point in time by using the APPENDS function, when querying a BigQuery table that is receiving streaming taxi ride information:
EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);
Specify a starting point earlier than the current time
If you want to process data from before the current point in time, you can use the APPENDS function to specify an earlier starting point for the query. The starting point that you specify must fall within the time travel window for the table that you are selecting from. The time travel window covers the past seven days by default.
To include data that is outside of the time travel window, use a standard query to insert or export data up to a particular point in time, and then start a continuous query from that point in time.
Example
The following example shows how to load older data from a BigQuery table that is receiving streaming taxi ride information up to a particular point in time into a table, and then start a continuous query from the cutoff point for the older data.
Run a standard query to backfill data up to a particular point in time:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
  SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
  -- Include all data inserted into the table up to this point in time.
  -- This timestamp must be within the time travel window.
  FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
WHERE ride_status = 'dropoff';

Run a continuous query from the point in time at which the query stopped:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
  SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to start processing
    -- data right where the batch query left off.
    -- This timestamp must be within the time travel window.
    TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
WHERE ride_status = 'dropoff';
Run a continuous query by using a user account
This section describes how to run a continuous query by using a user account. After the continuous query is running, you can close the Google Cloud console, terminal window, or application without interrupting query execution. A continuous query run by a user account runs for a maximum of two days and then automatically stops. To continue processing new incoming data, start a new continuous query and specify a starting point. To automate this process, see retry failed queries.
Follow these steps to run a continuous query:
Console
In the Google Cloud console, go to the BigQuery page.
In the query editor, click More.
- In the Choose query mode section, choose Continuous query.
- Click Confirm.
- Optional: To control how long the query runs, click Query settings and set the Job timeout in milliseconds.
In the query editor, type in the SQL statement for the continuous query. The SQL statement must only contain supported operations.
Click Run.
bq
In the Google Cloud console, activate Cloud Shell.
In Cloud Shell, run the continuous query by using the bq query command with the --continuous flag:

bq query --use_legacy_sql=false --continuous=true 'QUERY'

Replace QUERY with the SQL statement for the continuous query. The SQL statement must only contain supported operations. You can control how long the query runs by using the --job_timeout_ms flag.
API
Run the continuous query by calling the jobs.insert method. You must set the continuous field to true in the JobConfigurationQuery of the Job resource that you pass in. You can optionally control how long the query runs by setting the jobTimeoutMs field.

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
  --compressed
Replace the following:
- PROJECT_ID: your project ID.
- QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
Run a continuous query by using a service account
This section describes how to run a continuous query by using a service account. After the continuous query is running, you can close the Google Cloud console, terminal window, or application without interrupting query execution. A continuous query run by using a service account can run for up to 150 days and then automatically stops. To continue processing new incoming data, start a new continuous query and specify a starting point. To automate this process, see retry failed queries.
Follow these steps to use a service account to run a continuous query:
Console
- Create a service account.
- Grant the required permissions to the service account.
In the Google Cloud console, go to the BigQuery page.
In the query editor, click More.
In the Choose query mode section, choose Continuous query.
Click Confirm.
In the query editor, click More > Query settings.
In the Continuous query section, use the Service account box to select the service account that you created.
Optional: To control how long the query runs, set the Job timeout in milliseconds.
Click Save.
In the query editor, type in the SQL statement for the continuous query. The SQL statement must only contain supported operations.
Click Run.
bq
- Create a service account.
- Grant the required permissions to the service account.
In the Google Cloud console, activate Cloud Shell.
On the command line, run the continuous query by using the bq query command with the following flags:
- Set the --continuous flag to true to make the query continuous.
- Use the --connection_property flag to specify a service account to use.
- Optional: Set the --job_timeout_ms flag to limit the query runtime.

bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

Replace the following:
- PROJECT_ID: your project ID.
- SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email from the Service accounts page of the Google Cloud console.
- QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
API
- Create a service account.
- Grant the required permissions to the service account.
Run the continuous query by calling the jobs.insert method. Set the following fields in the JobConfigurationQuery resource of the Job resource that you pass in:
- Set the continuous field to true to make the query continuous.
- Use the connectionProperties field to specify a service account to use.

You can optionally control how long the query runs by setting the jobTimeoutMs field in the JobConfiguration resource.

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
  --compressed

Replace the following:
- PROJECT_ID: your project ID.
- QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
- SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email on the Service accounts page of the Google Cloud console.
Create a custom job ID
Every query job is assigned a job ID that you can use to search for and manage the job. By default, job IDs are randomly generated. To make it easier to search for the job ID of a continuous query using job history or the jobs explorer, you can assign a custom job ID prefix:
In the Google Cloud console, go to the BigQuery page.
In the query editor, click More.
In the Choose query mode section, choose Continuous query.
Click Confirm.
In the query editor, click More > Query settings.
In the Custom job ID prefix section, enter a custom name prefix.
Click Save.
Examples
The following SQL examples show common use cases for continuous queries.
Export data to a Pub/Sub topic
The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and publishes the data to a Pub/Sub topic in real time with message attributes:
EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message,
    TO_JSON(STRUCT(CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);
Export data to a Bigtable table
The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and exports the data into a Bigtable table in real time:
EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(timestamp, latitude, longitude, meter_reading, ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);
Export data to a Spanner table
The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and then exports the data into a Spanner table in real time:
EXPORT DATA
  OPTIONS (
    format = 'CLOUD_SPANNER',
    uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
    spanner_options = """{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
    }""")
AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count,
    _CHANGE_TIMESTAMP AS change_timestamp
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);
Write data to a BigQuery table
The following example shows a continuous query that filters and transforms data from a BigQuery table that is receiving streaming taxi ride information, and then writes the data to another BigQuery table in real time. This makes the data available for further downstream analysis.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
  SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE ride_status = 'dropoff';
Process data by using a Vertex AI model
The following example shows a continuous query that uses a Vertex AI model to generate an advertisement for taxi riders based on their current latitude and longitude, and then exports the results into a Pub/Sub topic in real time:
EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude, prompt,
      result)) AS message
  FROM
    AI.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT('Generate an ad based on the current latitude of ', latitude,
            ' and longitude of ', longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p)) AS ml_output
);
Modify the SQL of a continuous query
You can't update the SQL used in a continuous query while the continuous query job is running. You must cancel the continuous query job, modify the SQL, and then start a new continuous query job from the point where you stopped the original continuous query job.
Follow these steps to modify the SQL used in a continuous query:
- View the job details for the continuous query job that you want to update, and note the job ID.
- If possible, pause collection of upstream data. If you can't do this, you might get some data duplication when the continuous query is restarted.
- Cancel the continuous query that you want to modify.
Get the end_time value for the original continuous query job by using the INFORMATION_SCHEMA JOBS view:

SELECT end_time
FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
  EXTRACT(DATE FROM creation_time) = CURRENT_DATE()
  AND error_result.reason = 'stopped'
  AND job_id = 'JOB_ID';
Replace the following:
- PROJECT_ID: your project ID.
- REGION: the region used by your project.
- JOB_ID: the continuous query job ID that you identified in Step 1.
Modify the continuous query SQL statement to start the continuous query from a particular point in time, using the end_time value that you retrieved in Step 4 as the starting point, as shown in the sketch after these steps.
Modify the continuous query SQL statement to reflect your needed changes.
Run the modified continuous query.
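For example, if the retrieved end_time was 2025-01-01 00:00:00 UTC, the modified query's FROM clause would start one microsecond later. This is a sketch based on the earlier backfill example; the table name is illustrative:

SELECT *
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Start processing just after the point where the canceled job
    -- stopped. This timestamp must be within the time travel window.
    TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND);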
Cancel a continuous query
You can cancel a continuous query job just like any other job. It might take up to a minute for the query to stop running after the job is canceled.
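For example, you can cancel the job from the command line with the bq CLI. This is a sketch; replace the location and JOB_ID with the values from your job history:

# Cancel the continuous query job.
bq --location=us cancel JOB_ID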
If you cancel and then restart a query, the restarted query behaves like a new, independent query. The restarted query doesn't start processing data where the previous job stopped and can't reference the previous query's results. See Specify a starting point.
Monitor queries and handle errors
A continuous query might be interrupted due to factors such as data inconsistencies, schema changes, temporary service disruptions, or maintenance. Although BigQuery handles some transient errors, best practices for improving job resiliency include the following:
What's next