 except ImportError:  # Python 2.7
     import collections as collections_abc
 
+import logging
+
 import six
 
+import google.api_core.exceptions
+
 from google.cloud.bigquery import job
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
 import google.cloud.exceptions
 
+
+_LOGGER = logging.getLogger(__name__)
+
 # Per PEP 249: A 7-item sequence containing information describing one result
 # column. The first two items (name and type_code) are mandatory, the other
 # five are optional and are set to None if no meaningful values can be
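The cursor-side changes below read the storage client off the connection (`self.connection._bqstorage_client`), so the fast path only activates when the connection was created with one. A minimal sketch of that opt-in wiring, assuming the companion `connect()` change in this patch accepts a `bqstorage_client` argument (shown here for illustration, not as the definitive API):

```python
# Sketch of the opt-in wiring this hunk assumes: the connection object
# carries an optional BigQuery Storage client in `_bqstorage_client`.
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Cursors created from this connection try the Storage API first and
# fall back to tabledata.list as implemented in _try_fetch() below.
connection = dbapi.connect(client=client, bqstorage_client=bqstorage_client)
```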
@@ -212,13 +217,74 @@ def _try_fetch(self, size=None): |
 
         if self._query_data is None:
             client = self.connection._client
+            bqstorage_client = self.connection._bqstorage_client
+
+            if bqstorage_client:
+                try:
+                    rows_iterable = self._bqstorage_fetch(bqstorage_client)
+                    self._query_data = _helpers.to_bq_table_rows(rows_iterable)
+                    return
+                except google.api_core.exceptions.Forbidden:
+                    # Don't hide errors such as insufficient permissions to
+                    # create a read session, or the API not being enabled.
+                    # Both of those are clearly problems if the developer has
+                    # explicitly asked for BigQuery Storage API support.
+                    raise
+                except google.api_core.exceptions.GoogleAPICallError:
+                    # There is an issue with reading from small anonymous
+                    # query results tables. If such an error occurs, we silence
+                    # it in order to try again with the tabledata.list API.
+                    _LOGGER.debug(
+                        "Error fetching data with BigQuery Storage API, "
+                        "falling back to tabledata.list API."
+                    )
+
             rows_iter = client.list_rows(
                 self._query_job.destination,
                 selected_fields=self._query_job._query_results.schema,
                 page_size=self.arraysize,
             )
             self._query_data = iter(rows_iter)
 
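A subtlety in the ordering above: `Forbidden` is a subclass of `GoogleAPICallError` in `google.api_core.exceptions`, so the re-raise clause only fires because the more specific exception is listed first. The same pattern in isolation, as a minimal sketch:

```python
import google.api_core.exceptions


def fetch_with_fallback(fast_path, slow_path):
    # Mirrors _try_fetch() above: Forbidden must be handled before its
    # base class GoogleAPICallError, otherwise it would never match and
    # permission problems would be silently swallowed by the fallback.
    try:
        return fast_path()
    except google.api_core.exceptions.Forbidden:
        raise  # misconfiguration; surface it to the developer
    except google.api_core.exceptions.GoogleAPICallError:
        return slow_path()
```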
+    def _bqstorage_fetch(self, bqstorage_client):
+        """Start fetching data with the BigQuery Storage API.
+
+        The method assumes that the data about the relevant query job already
+        exists internally.
+
+        Args:
+            bqstorage_client (\
+                google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
+            ):
+                A client that knows how to talk to the BigQuery Storage API.
+
+        Returns:
+            Iterable[Mapping]:
+                A sequence of rows, represented as dictionaries.
+        """
+        # NOTE: Given that a BQ storage client instance was passed in, the
+        # bigquery_storage_v1beta1 library must be available (no ImportError).
+        from google.cloud import bigquery_storage_v1beta1
+
+        table_reference = self._query_job.destination
+
+        read_session = bqstorage_client.create_read_session(
+            table_reference.to_bqstorage(),
+            "projects/{}".format(table_reference.project),
+            # a single stream only, as the DB API is not well-suited for multithreading
+            requested_streams=1,
+        )
+
+        if not read_session.streams:
+            return iter([])  # empty table, nothing to read
+
+        read_position = bigquery_storage_v1beta1.types.StreamPosition(
+            stream=read_session.streams[0],
+        )
+        read_rows_stream = bqstorage_client.read_rows(read_position)
+        rows_iterable = read_rows_stream.rows(read_session)
+        return rows_iterable
+
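End to end, the fast path stays invisible to callers: the standard PEP 249 calls behave the same whether rows arrive through the Storage API or through tabledata.list. A short usage sketch, continuing the connection example above (the public dataset query is illustrative):

```python
# The cursor tries the Storage API inside _try_fetch(); on a
# GoogleAPICallError it logs a debug message and retries with
# tabledata.list, so callers see identical results either way.
cursor = connection.cursor()
cursor.execute(
    "SELECT word, word_count "
    "FROM `bigquery-public-data.samples.shakespeare` LIMIT 5"
)
for row in cursor.fetchall():
    print(row[0], row[1])  # values accessible by position
```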
     def fetchone(self):
         """Fetch a single row from the results of the last ``execute*()`` call.
 