@@ -210,6 +210,9 @@ class Client(ClientWithProject):
         default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
             Default ``QueryJobConfig``.
             Will be merged into job configs passed into the ``query`` method.
+        default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
+            Default ``LoadJobConfig``.
+            Will be merged into job configs passed into the ``load_table_*`` methods.
         client_info (Optional[google.api_core.client_info.ClientInfo]):
             The client info used to send a user-agent string along with API
             requests. If ``None``, then default info will be used. Generally,
@@ -235,6 +238,7 @@ def __init__(
         _http=None,
         location=None,
         default_query_job_config=None,
+        default_load_job_config=None,
         client_info=None,
         client_options=None,
     ) -> None:
@@ -260,6 +264,7 @@ def __init__(
         self._connection = Connection(self, **kw_args)
         self._location = location
         self._default_query_job_config = copy.deepcopy(default_query_job_config)
+        self._default_load_job_config = copy.deepcopy(default_load_job_config)
 
     @property
     def location(self):
@@ -277,6 +282,17 @@ def default_query_job_config(self):
     def default_query_job_config(self, value: QueryJobConfig):
         self._default_query_job_config = copy.deepcopy(value)
 
+    @property
+    def default_load_job_config(self):
+        """Default ``LoadJobConfig``.
+        Will be merged into job configs passed into the ``load_table_*`` methods.
+        """
+        return self._default_load_job_config
+
+    @default_load_job_config.setter
+    def default_load_job_config(self, value: LoadJobConfig):
+        self._default_load_job_config = copy.deepcopy(value)
+
     def close(self):
         """Close the underlying transport objects, releasing system resources.
@@ -2330,8 +2346,8 @@ def load_table_from_uri(
         Raises:
             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
@@ -2348,11 +2364,14 @@ def load_table_from_uri(
         destination = _table_arg_to_table_ref(destination, default_project=self.project)
 
-        if job_config:
-            job_config = copy.deepcopy(job_config)
-            _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
+        if job_config is not None:
+            _verify_job_config_type(job_config, LoadJobConfig)
+        else:
+            job_config = job.LoadJobConfig()
 
-        load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
+        new_job_config = job_config._fill_from_default(self._default_load_job_config)
+
+        load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
         load_job._begin(retry=retry, timeout=timeout)
 
         return load_job
@@ -2424,8 +2443,8 @@ def load_table_from_file(
             mode.
             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
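Taken together, the constructor argument, the stored copy, and the property/setter give the client a load-job counterpart to `default_query_job_config`. A minimal usage sketch of the new behavior (the project, dataset, table, and bucket names below are placeholders):

```python
from google.cloud import bigquery

# Client-wide defaults applied to all load_table_* calls.
default_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)
client = bigquery.Client(default_load_job_config=default_config)

# The per-call config only sets write_disposition; source_format and
# skip_leading_rows are merged in from the client-level default.
load_job = client.load_table_from_uri(
    "gs://example-bucket/data.csv",
    "example-project.example_dataset.example_table",
    job_config=bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
    ),
)
load_job.result()  # Block until the load job completes.
```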
""" job_id = _make_job_id(job_id, job_id_prefix) Expand All @@ -2437,10 +2456,15 @@ def load_table_from_file( destination = _table_arg_to_table_ref(destination, default_project=self.project) job_ref = job._JobReference(job_id, project=project, location=location) if job_config: job_config = copy.deepcopy(job_config) _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) load_job = job.LoadJob(job_ref, None, destination, self, job_config) if job_config is not None: _verify_job_config_type(job_config, LoadJobConfig) else: job_config = job.LoadJobConfig() new_job_config = job_config._fill_from_default(self._default_load_job_config) load_job = job.LoadJob(job_ref, None, destination, self, new_job_config) job_resource = load_job.to_api_repr() if rewind: Expand Down Expand Up @@ -2564,43 +2588,40 @@ def load_table_from_dataframe( If a usable parquet engine cannot be found. This method requires :mod:`pyarrow` to be installed. TypeError: If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ job_id = _make_job_id(job_id, job_id_prefix) if job_config: _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) # Make a copy so that the job config isn't modified in-place. job_config_properties = copy.deepcopy(job_config._properties) job_config = job.LoadJobConfig() job_config._properties = job_config_properties if job_config is not None: _verify_job_config_type(job_config, LoadJobConfig) else: job_config = job.LoadJobConfig() new_job_config = job_config._fill_from_default(self._default_load_job_config) supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET} ifjob_config .source_format is None: ifnew_job_config .source_format is None: # default value job_config .source_format = job.SourceFormat.PARQUETnew_job_config .source_format = job.SourceFormat.PARQUET if ( job_config .source_format == job.SourceFormat.PARQUET andjob_config .parquet_options is None new_job_config .source_format == job.SourceFormat.PARQUET andnew_job_config .parquet_options is None ): parquet_options = ParquetOptions() # default value parquet_options.enable_list_inference = True job_config .parquet_options = parquet_optionsnew_job_config .parquet_options = parquet_options ifjob_config .source_format not in supported_formats: ifnew_job_config .source_format not in supported_formats: raise ValueError( "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format( job_config .source_formatnew_job_config .source_format ) ) if pyarrow is None andjob_config .source_format == job.SourceFormat.PARQUET: if pyarrow is None andnew_job_config .source_format == job.SourceFormat.PARQUET: # pyarrow is now the only supported parquet engine. raise ValueError("This method requires pyarrow to be installed") Expand All @@ -2611,8 +2632,8 @@ def load_table_from_dataframe( # schema, and check if dataframe schema is compatible with it - except # for WRITE_TRUNCATE jobs, the existing schema does not matter then. 
@@ -2611,8 +2632,8 @@ def load_table_from_dataframe(
         # schema, and check if dataframe schema is compatible with it - except
         # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
         if (
-            not job_config.schema
-            and job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
+            not new_job_config.schema
+            and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
         ):
             try:
                 table = self.get_table(destination)
@@ -2623,7 +2644,7 @@ def load_table_from_dataframe(
                     name
                     for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
                 )
-                job_config.schema = [
+                new_job_config.schema = [
                     # Field description and policy tags are not needed to
                     # serialize a data frame.
                     SchemaField(
@@ -2637,11 +2658,11 @@ def load_table_from_dataframe(
                     if field.name in columns_and_indexes
                 ]
 
-        job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
-            dataframe, job_config.schema
+        new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
+            dataframe, new_job_config.schema
         )
 
-        if not job_config.schema:
+        if not new_job_config.schema:
             # the schema could not be fully detected
             warnings.warn(
                 "Schema could not be detected for all columns. Loading from a "
@@ -2652,13 +2673,13 @@ def load_table_from_dataframe(
         )
 
         tmpfd, tmppath = tempfile.mkstemp(
-            suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower())
+            suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
         )
         os.close(tmpfd)
 
         try:
 
-            if job_config.source_format == job.SourceFormat.PARQUET:
+            if new_job_config.source_format == job.SourceFormat.PARQUET:
                 if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
                     msg = (
                         "Loading dataframe data in PARQUET format with pyarrow "
@@ -2669,13 +2690,13 @@ def load_table_from_dataframe(
                     )
                     warnings.warn(msg, category=RuntimeWarning)
 
-                if job_config.schema:
+                if new_job_config.schema:
                     if parquet_compression == "snappy":  # adjust the default value
                         parquet_compression = parquet_compression.upper()
 
                     _pandas_helpers.dataframe_to_parquet(
                         dataframe,
-                        job_config.schema,
+                        new_job_config.schema,
                         tmppath,
                         parquet_compression=parquet_compression,
                         parquet_use_compliant_nested_type=True,
@@ -2715,7 +2736,7 @@ def load_table_from_dataframe(
                 job_id_prefix=job_id_prefix,
                 location=location,
                 project=project,
-                job_config=job_config,
+                job_config=new_job_config,
                 timeout=timeout,
             )
@@ -2791,22 +2812,22 @@ def load_table_from_json(
         Raises:
             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
 
-        if job_config:
-            _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
-            # Make a copy so that the job config isn't modified in-place.
-            job_config = copy.deepcopy(job_config)
+        if job_config is not None:
+            _verify_job_config_type(job_config, LoadJobConfig)
         else:
             job_config = job.LoadJobConfig()
-        job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
 
-        if job_config.schema is None:
-            job_config.autodetect = True
+        new_job_config = job_config._fill_from_default(self._default_load_job_config)
+        new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
+
+        if new_job_config.schema is None:
+            new_job_config.autodetect = True
 
         if project is None:
             project = self.project
@@ -2828,7 +2849,7 @@ def load_table_from_json(
             job_id_prefix=job_id_prefix,
            location=location,
            project=project,
-            job_config=job_config,
+            job_config=new_job_config,
            timeout=timeout,
        )
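For `load_table_from_json`, the merged config still has its source format forced to NEWLINE_DELIMITED_JSON, and schema autodetection is enabled only when neither the per-call nor the default config supplies a schema. A short sketch of that path (the table ID is a placeholder):

```python
from google.cloud import bigquery

client = bigquery.Client(
    default_load_job_config=bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
    )
)

rows = [
    {"name": "Ada", "langs": 2},
    {"name": "Grace", "langs": 3},
]

# No per-call job_config: the client default is merged in, the source
# format is forced to NEWLINE_DELIMITED_JSON, and autodetect is enabled
# because no schema was provided on either config.
load_job = client.load_table_from_json(
    rows, "example-project.example_dataset.people"
)
load_job.result()
```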