APIs
Functions for managing downloads.
def read_sql(query: str, billing_project_id: Optional[str] = None, from_file: bool = False, reauth: bool = False, use_bqstorage_api: bool = False) -> pd.DataFrame

Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq.

Args

- query: Valid SQL Standard Query to basedosdados.
- billing_project_id: Project that will be billed. Find your Project ID here.
- from_file: Uses the credentials from file, located in ~/.basedosdados/credentials/.
- reauth: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
- use_bqstorage_api: Use the BigQuery Storage API to download query results quickly, but at an increased cost. More info. To use this API, first enable it in the Cloud Console: Enable API. You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

Returns

The query result as a pandas DataFrame.
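A minimal usage sketch, assuming the package is imported as basedosdados; the billing project is a placeholder and the query reuses the example table shown for download below:

```python
import basedosdados as bd

# Billing project is a placeholder; replace it with your own GCP project id.
df = bd.read_sql(
    query="SELECT * FROM `basedosdados.br_suporte.diretorio_municipios` LIMIT 10",
    billing_project_id="<YOUR_PROJECT_ID>",
)
print(df.head())
```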
def read_table(dataset_id: str, table_id: str, billing_project_id: Optional[str] = None, query_project_id: str = "basedosdados", limit: Optional[int] = None, from_file: bool = False, reauth: bool = False, use_bqstorage_api: bool = False) -> pd.DataFrame

Load data from BigQuery using dataset_id and table_id.

Args

- dataset_id: Dataset id available in basedosdados. It should always come with table_id.
- table_id: Table id available in basedosdados.dataset_id. It should always come with dataset_id.
- billing_project_id: Project that will be billed. Find your Project ID here.
- query_project_id: Which project the table lives in. You can change this if you want to query different projects.
- limit: Number of rows to read from the table.
- from_file: Uses the credentials from file, located in ~/.basedosdados/credentials/.
- reauth: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
- use_bqstorage_api: Use the BigQuery Storage API to download query results quickly, but at an increased cost. More info. To use this API, first enable it in the Cloud Console: Enable API. You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

Returns

The requested table as a pandas DataFrame.
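A minimal usage sketch of read_table; the dataset and table ids reuse the download example below, and the billing project is a placeholder:

```python
import basedosdados as bd

# Reads at most 10 rows from the public table; billing project is a placeholder.
df = bd.read_table(
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_PROJECT_ID>",
    limit=10,
)
```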
def download(savepath: Union[str, Path], query: Optional[str] = None, dataset_id: Optional[str] = None, table_id: Optional[str] = None, billing_project_id: Optional[str] = None, query_project_id: str = "basedosdados", limit: Optional[int] = None, from_file: bool = False, reauth: bool = False, compression: str = "GZIP") -> None

Download table or query result from basedosdados BigQuery (or other).

Using a query:

download('select * from basedosdados.br_suporte.diretorio_municipios limit 10')

Using dataset_id & table_id:

download(dataset_id='br_suporte', table_id='diretorio_municipios')

You can also add arguments to modify save parameters:

download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')
Args

- savepath: File path to save the result. Only supports .csv.
- query: Valid SQL Standard Query to basedosdados. If query is available, dataset_id and table_id are not required.
- dataset_id: Dataset id available in basedosdados. It should always come with table_id.
- table_id: Table id available in basedosdados.dataset_id. It should always come with dataset_id.
- billing_project_id: Project that will be billed. Find your Project ID here.
- query_project_id: Which project the table lives in. You can change this if you want to query different projects.
- limit: Number of rows to read.
- from_file: Uses the credentials from file, located in ~/.basedosdados/credentials/.
- reauth: Re-authorize Google Cloud Project in case you need to change user or reset configurations.
- compression: Compression type. Only GZIP is available for now.

Raises

- Exception: If table_id, dataset_id, or query is empty.
Functions to get metadata from BD's API.

def get_datasets(dataset_id: Optional[str] = None, dataset_name: Optional[str] = None, page: int = 1, page_size: int = 10, backend: Optional[Backend] = None) -> list[dict]

Get a list of available datasets, either by dataset_id or dataset_name.

Args

- dataset_id: Dataset slug in Google BigQuery (GBQ).
- dataset_name: Dataset name in Base dos Dados metadata.
- page: Page for pagination.
- page_size: Page size for pagination.
- backend: Backend instance, injected automatically.

Returns

List of dicts with the matching datasets' metadata.
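A usage sketch, assuming get_datasets is exposed at the package top level; the name filter is an illustrative term:

```python
import basedosdados as bd

# Paginate through datasets whose name matches an illustrative term.
datasets = bd.get_datasets(dataset_name="municipios", page=1, page_size=5)
for dataset in datasets:
    print(dataset)  # each entry is a dict of dataset metadata
```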
def get_tables(dataset_id: Optional[str] = None, table_id: Optional[str] = None, table_name: Optional[str] = None, page: int = 1, page_size: int = 10, backend: Optional[Backend] = None) -> list[dict]

Get a list of available tables, either by dataset_id, table_id or table_name.

Args

- dataset_id: Dataset slug in Google BigQuery (GBQ).
- table_id: Table slug in Google BigQuery (GBQ).
- table_name: Table name in Base dos Dados metadata.
- page: Page for pagination.
- page_size: Page size for pagination.
- backend: Backend instance, injected automatically.

Returns

List of dicts with the matching tables' metadata.
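A usage sketch under the same assumption; the dataset slug is illustrative:

```python
import basedosdados as bd

# List tables registered under an illustrative dataset slug.
tables = bd.get_tables(dataset_id="br_suporte", page_size=10)
for table in tables:
    print(table)  # each entry is a dict of table metadata
```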
def get_columns(table_id: Optional[str] = None, column_id: Optional[str] = None, columns_name: Optional[str] = None, page: int = 1, page_size: int = 10, backend: Optional[Backend] = None) -> list[dict]

Get a list of available columns, either by table_id, column_id or column_name.

Args

- table_id: Table slug in Google BigQuery (GBQ).
- column_id: Column slug in Google BigQuery (GBQ).
- column_name: Column name in Base dos Dados metadata.
- page: Page for pagination.
- page_size: Page size for pagination.
- backend: Backend instance, injected automatically.

Returns

List of dicts with the matching columns' metadata.
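A usage sketch, assuming the same top-level export; the table slug is illustrative:

```python
import basedosdados as bd

# Inspect column metadata for an illustrative table slug.
columns = bd.get_columns(table_id="diretorio_municipios", page_size=20)
for column in columns:
    print(column)  # each entry is a dict of column metadata
```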
def search(q: Optional[str] = None, page: int = 1, page_size: int = 10, backend: Optional[Backend] = None) -> list[dict]

Search for datasets, querying all available metadata for the term q.

Args

- q: Search term.
- page: Page for pagination.
- page_size: Page size for pagination.
- backend: Backend instance, injected automatically.

Returns

List of dicts with the search results.
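A usage sketch of the free-text search; the term is illustrative:

```python
import basedosdados as bd

# Search all available metadata for an illustrative term.
results = bd.search(q="municipios", page_size=5)
for result in results:
    print(result)  # each entry is a dict describing a matching dataset
```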
Module for managing datasets on the server.

class Dataset(Base)

Manage datasets in BigQuery.

def __init__(dataset_id: str, **kwargs)

Initializes a new instance of the class with the specified dataset ID.

Args

- dataset_id: The identifier of the dataset. Hyphens in the ID will be replaced with underscores.
- **kwargs: Additional keyword arguments to be passed to the superclass initializer.

@property
@lru_cache
def dataset_config() -> dict[str, Any]

Dataset config file.
def publicize(mode: str = "all", dataset_is_public: bool = True) -> None

Changes the IAM configuration to make the BigQuery dataset public.

Args

- mode: Which dataset to create [prod|staging|all].
- dataset_is_public: Control if the prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.

def exists(mode: str = "staging") -> bool

Check if dataset exists.
def create(mode: str = "all", if_exists: str = "raise", dataset_is_public: bool = True, location: Optional[str] = None) -> None

Creates BigQuery datasets given dataset_id.

It can create two datasets:

- <dataset_id> (mode = prod)
- <dataset_id>_staging (mode = staging)

If mode is all, it creates both.

Args

- mode: Which dataset to create [prod|staging|all].
- if_exists: What to do if the dataset exists:
  - raise: Raises Conflict exception
  - replace: Drop all tables and replace dataset
  - update: Update dataset description
  - pass: Do nothing
- dataset_is_public: Control if the prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
- location: Location of dataset data. List of possible region names: BigQuery locations

Raises

- Warning: Dataset already exists and if_exists is set to raise.

def delete(mode: str = "all") -> None

Deletes the dataset in BigQuery. Toggle mode to choose which dataset to delete.
Args

- mode: Which dataset to delete [prod|staging|all].

def update(mode: str = "all", location: Optional[str] = None) -> None

Updates the dataset description. Toggle mode to choose which dataset to update.

Args

- mode: Which dataset to update [prod|staging|all].
- location: Location of dataset data. List of possible region names: BigQuery locations
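A sketch of a typical Dataset workflow, assuming the class is importable from the package top level and credentials are already configured; the dataset_id is an illustrative placeholder:

```python
from basedosdados import Dataset

# Illustrative dataset_id; hyphens would be replaced with underscores.
dataset = Dataset(dataset_id="br_exemplo_dados")

# Create both prod and staging datasets, doing nothing if they already exist.
dataset.create(mode="all", if_exists="pass")

# Refresh the description, or drop everything when no longer needed.
dataset.update(mode="all")
# dataset.delete(mode="all")
```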
Class for managing tables in Storage and BigQuery.

class Table(Base)

Manage tables in Google Cloud Storage and BigQuery.

def __init__(dataset_id: str, table_id: str, **kwargs)

Initializes a new instance of the class with the specified dataset and table identifiers.

Args

- dataset_id: The identifier of the dataset. Hyphens will be replaced with underscores.
- table_id: The identifier of the table. Hyphens will be replaced with underscores.
- **kwargs: Additional keyword arguments to be passed to the superclass initializer.

Attributes

- table_id: The sanitized table identifier (hyphens replaced with underscores).
- dataset_id: The sanitized dataset identifier (hyphens replaced with underscores).
- table_full_name: Dictionary containing fully qualified table names for different environments (e.g. prod and staging).

@property
@lru_cache(256)
def table_config() -> dict[str, Any]

Load table config.
def table_exists(mode: str) -> bool

Check if table exists in BigQuery.

Args

- mode: Which dataset to check [prod|staging].

def create(path: Optional[Union[str, Path]] = None, source_format: str = "csv", csv_delimiter: str = ",", csv_skip_leading_rows: int = 1, csv_allow_jagged_rows: bool = False, if_table_exists: str = "raise", if_storage_data_exists: str = "raise", if_dataset_exists: str = "pass", dataset_is_public: bool = True, location: Optional[str] = None, chunk_size: Optional[int] = None, biglake_table: bool = False, set_biglake_connection_permissions: bool = True) -> None

Creates a BigQuery table in the staging dataset.
If a path is provided, data is automatically saved in storage, and a datasets folder and BigQuery location are created, in addition to creating the table and its configuration files.

The new table is located at <dataset_id>_staging.<table_id> in BigQuery.

Data can be found in Storage at <bucket_name>/staging/<dataset_id>/<table_id>/* and is used to build the table.

The following data types are supported: csv, avro and parquet (see source_format below).

Data can also be partitioned following the Hive partitioning scheme <key1>=<value1>/<key2>=<value2>. For example, year=2012/country=BR. The partition is automatically detected by searching for partitions in the table_config.yaml file.
Args

- path: The path to the file to be uploaded to create the table.
- source_format: The format of the data source. Only 'csv', 'avro', and 'parquet' are supported. Defaults to 'csv'.
- csv_delimiter: The separator for fields in a CSV file. The separator can be any ISO-8859-1 single-byte character. Defaults to ','.
- csv_skip_leading_rows: The number of rows at the top of a CSV file that BigQuery will skip when loading the data. Defaults to 1.
- csv_allow_jagged_rows: Indicates if BigQuery should allow extra values that are not represented in the table schema. Defaults to False.
- if_table_exists: Determines what to do if the table already exists:
  - raise: Raises a Conflict exception
  - replace: Replaces the table
  - pass: Does nothing
- if_storage_data_exists: Determines what to do if the data already exists on your bucket:
  - raise: Raises a Conflict exception
  - replace: Replaces the table
  - pass: Does nothing
- if_dataset_exists: Determines what to do if the dataset already exists:
  - raise: Raises a Conflict exception
  - replace: Replaces the dataset
  - pass: Does nothing
- dataset_is_public: Controls if the prod dataset is public or not. By default, staging datasets like dataset_id_staging are not public.
- location: The location of the dataset data. List of possible region names: BigQuery locations
- chunk_size: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
- biglake_table: Sets this as a BigLake table. BigLake tables allow end users to query from external data (such as GCS) even if they don't have access to the source data. IAM is managed like any other BigQuery native table. See BigLake intro for more on BigLake.
- set_biglake_connection_permissions: If set to True, attempts to grant the BigLake connection service account access to the table's data in GCS.
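A sketch of a staging-table creation flow, assuming Table is importable from the package top level; the ids and the local CSV path are illustrative placeholders:

```python
from basedosdados import Table

# Illustrative ids and path; the CSV is expected to match the intended schema.
table = Table(dataset_id="br_exemplo_dados", table_id="municipios")

table.create(
    path="data/municipios.csv",        # a single file or a Hive-partitioned folder
    source_format="csv",
    if_table_exists="replace",
    if_storage_data_exists="replace",
    if_dataset_exists="pass",
)
```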
def update(mode: str = "prod", custom_schema: Optional[list[dict[str, str]]] = None) -> None

Updates the BigQuery schema and description.

Args

- mode: Which table to update [prod].
- not_found_ok: What to do if the table is not found.

def publish(if_exists: str = "raise", custom_publish_sql: Optional[str] = None, custom_schema: Optional[list[dict[str, str]]] = None) -> None

Creates the BigQuery table in the production dataset.
The table should be located at <dataset_id>.<table_id>.

It creates a view that uses the query from <metadata_path>/<dataset_id>/<table_id>/publish.sql.

Make sure that all columns from the query also exist in <metadata_path>/<dataset_id>/<table_id>/table_config.yaml, including the partitions.
Args

- if_exists: What to do if the table exists:
  - raise: Raises Conflict exception
  - replace: Replace table
  - pass: Do nothing

def delete(mode: str = "all") -> None

Deletes the table in BigQuery.
Args

- mode: Which table to delete [prod|staging].

def append(filepath: Union[str, Path], partitions: Optional[Union[str, dict[str, str]]] = None, if_exists: str = "replace", chunk_size: Optional[int] = None, **upload_args) -> None

Appends new data to an existing BigQuery table.
As long as the data has the same schema, it appends the data in the filepath to the existing table.

Args

- filepath: Where to find the file that you want to upload to create a table with.
- partitions: Hive structured partition as a string or dict.
  - <key>=<value>/<key2>=<value2>
  - dict(key=value, key2=value2)
- if_exists: What to do if data with the same name exists in storage:
  - raise: Raises Conflict exception
  - replace: Replace table
  - pass: Do nothing
- chunk_size: The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
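A sketch of appending one new partition's file to an existing table; the ids, path and partition value are illustrative placeholders:

```python
from basedosdados import Table

# The appended file must share the existing table schema.
table = Table(dataset_id="br_exemplo_dados", table_id="municipios")

table.append(
    filepath="data/municipios_2023.csv",
    partitions="year=2023",            # or dict(year=2023)
    if_exists="replace",
)
```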
Class for managing the files in Google Cloud Storage.

class Storage(Base)

Manage files on Google Cloud Storage.

def __init__(dataset_id: str, table_id: str, **kwargs)

Initializes the storage upload class with the specified dataset and table identifiers.

Args

- dataset_id: The identifier of the dataset. Hyphens will be replaced with underscores.
- table_id: The identifier of the table. Hyphens will be replaced with underscores.
- **kwargs: Additional keyword arguments to pass to the superclass initializer.

Attributes

- bucket: The storage bucket object used for staging.
- dataset_id: The normalized dataset identifier.
- table_id: The normalized table identifier.

def init(replace: bool = False, very_sure: bool = False) -> None

Initialize bucket and folders.
The folder structure is:

- raw: contains raw data
- staging: preprocessed data ready to upload to BigQuery

Args

- replace: Whether to replace if the bucket already exists.
- very_sure: Are you aware that everything will be erased if you replace the bucket?

Raises

- Warning: very_sure argument is still False.

def upload(path: Union[str, Path], mode: str = "all", partitions: Optional[Union[str, dict[str, str]]] = None, if_exists: str = "raise", chunk_size: Optional[int] = None, **upload_args) -> None

Upload to storage at <bucket_name>/<mode>/<dataset_id>/<table_id>.
You can use:

- path = <file_path> to upload a single file.
- path = <folder_path>. The folder should only contain files, not folders.
- path = <folder_path>. This folder must follow the Hive partitioning scheme, e.g. <table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv

Remember that all files must follow a single schema. Otherwise, things might fail in the future.

Modes:

- raw: raw files from the data source
- staging: pre-treated files ready to upload to BigQuery
- header: header of the tables
- auxiliary_files: auxiliary files from each table
- architecture: architecture sheet of the tables
- all: if no treatment is needed, use all.

Args

- path: Where to find the file or folder to upload to storage.
- mode: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
- partitions: If adding a single file, use this to add it to a specific partition. Can be a string or dict.
- if_exists: What to do if data exists:
  - raise: Raises Conflict exception
  - replace: Replace table
  - pass: Do nothing
- chunk_size: The size of a chunk of data when iterating (in bytes).
- upload_args: Extra arguments accepted by google.cloud.storage.blob.Blob.upload_from_file
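A sketch of uploading a local file to the staging folder, assuming Storage is importable from the package top level; ids and path are illustrative placeholders:

```python
from basedosdados import Storage

# Uploads to <bucket_name>/staging/<dataset_id>/<table_id>/.
storage = Storage(dataset_id="br_exemplo_dados", table_id="municipios")

storage.upload(
    path="data/municipios.csv",
    mode="staging",
    if_exists="replace",
)
```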
def download(filename: str = "*", savepath: Union[Path, str] = Path("."), partitions: Optional[Union[str, dict[str, str]]] = None, mode: str = "staging", if_not_exists: str = "raise") -> None

Download files from Google Storage from the path mode/dataset_id/table_id/partitions/filename and replicate the folder hierarchy on save.

Modes:

- raw: raw files from the data source
- staging: pre-treated files ready to upload to BigQuery
- header: header of the tables
- auxiliary_files: auxiliary files from each table
- architecture: architecture sheet of the tables

You can use the partitions argument to choose files from a partition.

Args

- filename: Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*".
- savepath: Where to save the data on your computer. Must be a path to a directory.
- partitions: If downloading a single file, use this to specify the partition path from which to download. Can be a string <key>=<value>/<key2>=<value2> or dict dict(key=value, key2=value2).
- mode: Folder of which dataset to update. [raw|staging|header|auxiliary_files|architecture]
- if_not_exists: What to do if data is not found:
  - raise: Raises FileNotFoundError.
  - pass: Do nothing and exit the function.

Raises

- FileNotFoundError: If the given path <mode>/<dataset_id>/<table_id>/<partitions>/<filename> could not be found or there are no files to download.
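A sketch of downloading every staging file for a table into a local directory; ids and savepath are illustrative placeholders:

```python
from basedosdados import Storage

# Replicates the bucket folder hierarchy under ./dump.
storage = Storage(dataset_id="br_exemplo_dados", table_id="municipios")

storage.download(
    filename="*",
    savepath="./dump",
    mode="staging",
    if_not_exists="pass",
)
```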
def delete_file(filename: str, mode: str, partitions: Optional[Union[str, dict[str, str]]] = None, not_found_ok: bool = False) -> None

Delete a file from the path <bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>.

Args

- filename: Name of the file to be deleted.
- mode: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
- partitions: Hive structured partition as a string <key>=<value>/<key2>=<value2> or dict dict(key=value, key2=value2).
- not_found_ok: What to do if the file is not found.

def delete_table(mode: str = "staging", bucket_name: Optional[str] = None, not_found_ok: bool = False) -> None

Deletes a table from storage; sends the request in batches.
Args

- mode: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
- bucket_name: The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
- not_found_ok: What to do if the table is not found.

Raises

- FileNotFoundError: If the requested table could not be found.

def copy_table(source_bucket_name: str = "basedosdados", destination_bucket_name: Optional[str] = None, mode: str = "staging", new_table_id: Optional[str] = None) -> None

Copies a table from a source bucket to your bucket; sends the request in batches.
Args

- source_bucket_name: The bucket name from which to copy data. You can change it to copy from another external bucket.
- destination_bucket_name: The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object. You can check it with the Storage().bucket property.
- mode: Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
- new_table_id: New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.