Commit 806e5f5

SEA: Decouple Link Fetching (#632)

* test getting the list of allowed configurations
* reduce diff
* reduce diff
* house constants in enums for readability and immutability
* add note on hybrid disposition
* [squashed from cloudfetch-sea] introduce external links + arrow functionality
* reduce responsibility of Queue
* reduce repetition in arrow table creation
* reduce redundant code in CloudFetchQueue
* move chunk link progression to separate func
* remove redundant log
* improve logging
* remove reliance on schema_bytes in SEA
* remove redundant note on arrow_schema_bytes
* use more fetch methods
* remove redundant schema_bytes from parent constructor
* only call get_chunk_link with non null chunk index
* align SeaResultSet structure with ThriftResultSet
* remove _fill_result_buffer from SeaResultSet
* reduce code repetition
* align SeaResultSet with ext-links-sea
* remove redundant methods
* update unit tests
* remove accidental venv changes
* pre-fetch next chunk link on processing current
* reduce nesting
* line break after multi line pydoc
* re-introduce schema_bytes for better abstraction (likely temporary)
* add fetchmany_arrow and fetchall_arrow
* remove accidental changes in sea backend tests
* remove irrelevant changes
* remove un-necessary test changes
* remove un-necessary changes in thrift backend tests
* remove unimplemented methods test
* remove unimplemented method tests
* modify example scripts to include fetch calls
* add GetChunksResponse
* remove changes to sea test
* re-introduce accidentally removed description extraction method
* fix type errors (ssl_options, CHUNK_PATH_WITH_ID..., etc.)
* access ssl_options through connection
* DEBUG level
* remove explicit multi chunk test
* move cloud fetch queues back into utils.py
* remove excess docstrings
* move ThriftCloudFetchQueue above SeaCloudFetchQueue
* fix sea connector tests
* correct patch module path in cloud fetch queue tests
* remove unimplemented methods test
* correct add_link docstring
* remove invalid import
* better align queries with JDBC impl
* line breaks after multi-line PRs
* remove unused imports
* fix: introduce ExecuteResponse import
* remove unimplemented metadata methods test, un-necessary imports
* introduce unit tests for metadata methods
* remove verbosity in ResultSetFilter docstring (Co-authored-by: jayant <167047871+jayantsing-db@users.noreply.github.com>)
* remove un-necessary info in ResultSetFilter docstring
* remove explicit type checking, string literals around forward annotations
* house SQL commands in constants
* convert complex types to string if not _use_arrow_native_complex_types
* introduce unit tests for altered functionality
* Revert "Merge branch 'fetch-json-inline' into ext-links-sea" (reverts commit dabba55, reversing changes made to dd7dc6a)
* reduce verbosity of ResultSetFilter docstring
* remove unused imports
* Revert "Merge branch 'fetch-json-inline' into ext-links-sea" (reverts commit 3a999c0, reversing changes made to a1f9b9c)
* Revert "reduce verbosity of ResultSetFilter docstring" (reverts commit a1f9b9c)
* Reapply "Merge branch 'fetch-json-inline' into ext-links-sea" (reverts commit 48ad7b3)
* Revert "Merge branch 'fetch-json-inline' into ext-links-sea" (reverts commit dabba55, reversing changes made to dd7dc6a)
* remove un-necessary filters changes
* remove un-necessary backend changes
* remove constants changes
* remove changes in filters tests
* remove unit test backend and JSON queue changes
* remove changes in sea result set testing
* Revert "remove changes in sea result set testing" (reverts commit d210ccd)
* Revert "remove unit test backend and JSON queue changes" (reverts commit f6c5950)
* Revert "remove changes in filters tests" (reverts commit f3f795a)
* Revert "remove constants changes" (reverts commit 802d045)
* Revert "remove un-necessary backend changes" (reverts commit 20822e4)
* Revert "remove un-necessary filters changes" (reverts commit 5e75fb5)
* remove unused imports
* working version
* adopt _wait_until_command_done
* introduce metadata commands
* use new backend structure
* constrain backend diff
* remove changes to filters
* make _parse methods in models internal
* reduce changes in unit tests
* run small queries with SEA during integration tests
* run some tests for sea
* allow empty schema bytes for alignment with SEA
* pass is_vl_op to Sea backend ExecuteResponse
* remove catalog requirement in get_tables
* move filters.py to SEA utils
* ensure SeaResultSet
* prevent circular imports
* remove unused imports
* remove cast, throw error if not SeaResultSet
* pass param as TSparkParameterValue
* remove failing test (temp)
* remove SeaResultSet type assertion
* change errors to align with spec, instead of arbitrary ValueError
* make SEA backend methods return SeaResultSet
* use spec-aligned Exceptions in SEA backend
* remove defensive row type check
* raise ProgrammingError for invalid id
* make is_volume_operation strict bool
* remove complex types code
* Revert "remove complex types code" (reverts commit 138359d)
* introduce type conversion for primitive types for JSON + INLINE
* remove SEA running on metadata queries (known failures)
* remove un-necessary docstrings
* align expected types with databricks sdk
* link rest api reference to validate types
* remove test_catalogs_returns_arrow_table test (metadata commands not expected to pass)
* fix fetchall_arrow and fetchmany_arrow
* remove thrift aligned test_cancel_during_execute from SEA tests
* remove un-necessary changes in example scripts
* remove un-necessary changes in example scripts
* _convert_json_table -> _create_json_table
* remove accidentally removed test
* remove new unit tests (to be re-added based on new arch)
* remove changes in sea_result_set functionality (to be re-added)
* introduce more integration tests
* remove SEA tests in parameterized queries
* remove partial parameter fix changes
* remove un-necessary timestamp tests (pass with minor disparity)
* slightly stronger typing of _convert_json_types
* stronger typing of json utility funcs
* stronger typing of fetch*_json
* remove unused helper methods in SqlType
* line breaks after multi line pydocs, remove excess logs
* line breaks after multi line pydocs, reduce diff of redundant changes
* reduce diff of redundant changes
* mandate ResultData in SeaResultSet constructor
* remove complex type conversion
* correct fetch*_arrow
* recover old sea tests
* move queue and result set into SEA specific dir
* pass ssl_options into CloudFetchQueue
* reduce diff
* remove redundant conversion.py
* fix type issues
* ValueError not ProgrammingError
* reduce diff
* introduce SEA cloudfetch e2e tests
* allow empty cloudfetch result
* add unit tests for CloudFetchQueue and SeaResultSet
* skip pyarrow dependent tests
* simplify download process: no pre-fetching
* correct class name in logs
* align with old impl
* align next_n_rows with prev impl
* align remaining_rows with prev impl
* remove un-necessary Optional params
* remove un-necessary changes in thrift field if tests
* remove unused imports
* init hybrid
* run large queries
* hybrid disposition
* remove un-necessary log
* formatting (black)
* remove redundant tests
* multi frame decompression of lz4
* ensure no compression (temp)
* introduce separate link fetcher
* log time to create table
* add chunk index to table creation time log
* remove custom multi-frame decompressor for lz4
* remove excess logs
* remove redundant tests (temp)
* add link to download manager before notifying consumer
* move link fetching immediately before table creation so link expiry is not an issue
* resolve merge artifacts
* remove redundant methods
* formatting (black)
* introduce callback to handle link expiry
* fix types
* fix param type in unit tests
* formatting + minor type fixes
* Revert "introduce callback to handle link expiry" (reverts commit bd51b1c)
* remove unused callback (to be introduced later)
* correct param extraction
* remove common constructor for databricks client abc
* make SEA Http Client instance a private member
* make GetChunksResponse model more robust
* add link to doc of GetChunk response model
* pass result_data instead of "initial links" into SeaCloudFetchQueue
* move download_manager init into parent CloudFetchQueue
* raise ServerOperationError for no 0th chunk
* unused imports
* return None in case of empty response
* ensure table is empty on no initial links
* account for total chunk count
* iterate by chunk index instead of link
* make LinkFetcher convert link static
* add helper for link addition, check for edge case to prevent inf wait
* add unit tests for LinkFetcher
* remove un-necessary download manager check
* remove un-necessary string literals around param type
* remove duplicate download_manager init
* account for empty response in LinkFetcher init
* make get_chunk_link return mandatory ExternalLink
* set shutdown_event instead of breaking on completion so get_chunk_link is informed
* docstrings, logging, pydoc
* use total_chunk_count > 0
* clarify that link has already been submitted on getting row_offset
* return None for out of range
* default link_fetcher to None

---------

Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
Co-authored-by: jayant <167047871+jayantsing-db@users.noreply.github.com>
1 parent 8fbca9d · commit 806e5f5
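
In short, this commit moves chunk-link retrieval out of SeaCloudFetchQueue and into a new LinkFetcher that runs on its own thread, caching ExternalLink objects ahead of table creation. The sketch below shows the resulting call pattern using the names introduced in the diff; the consume_chunks wrapper and its parameters are illustrative, standing in for values that, per the diff, come from the SEA execute response.

from databricks.sql.backend.sea.queue import LinkFetcher

def consume_chunks(download_manager, backend, statement_id, initial_links, total_chunk_count):
    """Drive a LinkFetcher the way SeaCloudFetchQueue does after this commit."""
    fetcher = LinkFetcher(
        download_manager=download_manager,
        backend=backend,
        statement_id=statement_id,
        initial_links=initial_links,
        total_chunk_count=total_chunk_count,
    )
    fetcher.start()  # background worker begins walking next_chunk_index
    try:
        chunk_index = 0
        while True:
            link = fetcher.get_chunk_link(chunk_index)  # blocks until cached
            if link is None:  # past the last chunk
                break
            # The link is already registered with the download manager;
            # the queue builds its arrow table at link.row_offset here.
            chunk_index += 1
    finally:
        fetcher.stop()  # set the shutdown event and join the worker

This is the same sequence SeaCloudFetchQueue now performs internally: it constructs and starts a LinkFetcher when total_chunk_count > 0, asks for each successive chunk index in _create_next_table, and stops the fetcher in close().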

2 files changed: +371 -89 lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 201 additions & 58 deletions
@@ -1,7 +1,8 @@
 from __future__ import annotations
 
 from abc import ABC
-from typing import List, Optional, Tuple, Union, TYPE_CHECKING
+import threading
+from typing import Dict, List, Optional, Tuple, Union, TYPE_CHECKING
 
 from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager
 
@@ -121,6 +122,179 @@ def close(self):
         return
 
 
+class LinkFetcher:
+    """
+    Background helper that incrementally retrieves *external links* for a
+    result set produced by the SEA backend and feeds them to a
+    :class:`databricks.sql.cloudfetch.download_manager.ResultFileDownloadManager`.
+
+    The SEA backend splits large result sets into *chunks*. Each chunk is
+    stored remotely (e.g., in object storage) and exposed via a signed URL
+    encapsulated by an :class:`ExternalLink`. Only the first batch of links is
+    returned with the initial query response. The remaining links must be
+    pulled on demand using the *next-chunk* token embedded in each
+    :pyattr:`ExternalLink.next_chunk_index`.
+
+    LinkFetcher takes care of this choreography so callers (primarily
+    ``SeaCloudFetchQueue``) can simply ask for the link of a specific
+    ``chunk_index`` and block until it becomes available.
+
+    Key responsibilities:
+
+    • Maintain an in-memory mapping from ``chunk_index`` → ``ExternalLink``.
+    • Launch a background worker thread that continuously requests the next
+      batch of links from the backend until all chunks have been discovered or
+      an unrecoverable error occurs.
+    • Bridge SEA link objects to the Thrift representation expected by the
+      existing download manager.
+    • Provide a synchronous API (`get_chunk_link`) that blocks until the desired
+      link is present in the cache.
+    """
+
+    def __init__(
+        self,
+        download_manager: ResultFileDownloadManager,
+        backend: SeaDatabricksClient,
+        statement_id: str,
+        initial_links: List[ExternalLink],
+        total_chunk_count: int,
+    ):
+        self.download_manager = download_manager
+        self.backend = backend
+        self._statement_id = statement_id
+
+        self._shutdown_event = threading.Event()
+
+        self._link_data_update = threading.Condition()
+        self._error: Optional[Exception] = None
+        self.chunk_index_to_link: Dict[int, ExternalLink] = {}
+
+        self._add_links(initial_links)
+        self.total_chunk_count = total_chunk_count
+
+        # DEBUG: capture initial state for observability
+        logger.debug(
+            "LinkFetcher[%s]: initialized with %d initial link(s); expecting %d total chunk(s)",
+            statement_id,
+            len(initial_links),
+            total_chunk_count,
+        )
+
+    def _add_links(self, links: List[ExternalLink]):
+        """Cache *links* locally and enqueue them with the download manager."""
+        logger.debug(
+            "LinkFetcher[%s]: caching %d link(s) – chunks %s",
+            self._statement_id,
+            len(links),
+            ", ".join(str(l.chunk_index) for l in links) if links else "<none>",
+        )
+        for link in links:
+            self.chunk_index_to_link[link.chunk_index] = link
+            self.download_manager.add_link(LinkFetcher._convert_to_thrift_link(link))
+
+    def _get_next_chunk_index(self) -> Optional[int]:
+        """Return the next *chunk_index* that should be requested from the backend, or ``None`` if we have them all."""
+        with self._link_data_update:
+            max_chunk_index = max(self.chunk_index_to_link.keys(), default=None)
+            if max_chunk_index is None:
+                return 0
+            max_link = self.chunk_index_to_link[max_chunk_index]
+            return max_link.next_chunk_index
+
+    def _trigger_next_batch_download(self) -> bool:
+        """Fetch the next batch of links from the backend and return *True* on success."""
+        logger.debug(
+            "LinkFetcher[%s]: requesting next batch of links", self._statement_id
+        )
+        next_chunk_index = self._get_next_chunk_index()
+        if next_chunk_index is None:
+            return False
+
+        try:
+            links = self.backend.get_chunk_links(self._statement_id, next_chunk_index)
+            with self._link_data_update:
+                self._add_links(links)
+                self._link_data_update.notify_all()
+        except Exception as e:
+            logger.error(
+                f"LinkFetcher: Error fetching links for chunk {next_chunk_index}: {e}"
+            )
+            with self._link_data_update:
+                self._error = e
+                self._link_data_update.notify_all()
+            return False
+
+        logger.debug(
+            "LinkFetcher[%s]: received %d new link(s)",
+            self._statement_id,
+            len(links),
+        )
+        return True
+
+    def get_chunk_link(self, chunk_index: int) -> Optional[ExternalLink]:
+        """Return (blocking) the :class:`ExternalLink` associated with *chunk_index*."""
+        logger.debug(
+            "LinkFetcher[%s]: waiting for link of chunk %d",
+            self._statement_id,
+            chunk_index,
+        )
+        if chunk_index >= self.total_chunk_count:
+            return None
+
+        with self._link_data_update:
+            while chunk_index not in self.chunk_index_to_link:
+                if self._error:
+                    raise self._error
+                if self._shutdown_event.is_set():
+                    raise ProgrammingError(
+                        "LinkFetcher is shutting down without providing link for chunk index {}".format(
+                            chunk_index
+                        )
+                    )
+                self._link_data_update.wait()
+
+            return self.chunk_index_to_link[chunk_index]
+
+    @staticmethod
+    def _convert_to_thrift_link(link: ExternalLink) -> TSparkArrowResultLink:
+        """Convert SEA external links to Thrift format for compatibility with existing download manager."""
+        # Parse the ISO format expiration time
+        expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
+        return TSparkArrowResultLink(
+            fileLink=link.external_link,
+            expiryTime=expiry_time,
+            rowCount=link.row_count,
+            bytesNum=link.byte_count,
+            startRowOffset=link.row_offset,
+            httpHeaders=link.http_headers or {},
+        )
+
+    def _worker_loop(self):
+        """Entry point for the background thread."""
+        logger.debug("LinkFetcher[%s]: worker thread started", self._statement_id)
+        while not self._shutdown_event.is_set():
+            links_downloaded = self._trigger_next_batch_download()
+            if not links_downloaded:
+                self._shutdown_event.set()
+        logger.debug("LinkFetcher[%s]: worker thread exiting", self._statement_id)
+        self._link_data_update.notify_all()
+
+    def start(self):
+        """Spawn the worker thread."""
+        logger.debug("LinkFetcher[%s]: starting worker thread", self._statement_id)
+        self._worker_thread = threading.Thread(
+            target=self._worker_loop, name=f"LinkFetcher-{self._statement_id}"
+        )
+        self._worker_thread.start()
+
+    def stop(self):
+        """Signal the worker thread to stop and wait for its termination."""
+        logger.debug("LinkFetcher[%s]: stopping worker thread", self._statement_id)
+        self._shutdown_event.set()
+        self._worker_thread.join()
+        logger.debug("LinkFetcher[%s]: worker thread stopped", self._statement_id)
+
+
 class SeaCloudFetchQueue(CloudFetchQueue):
     """Queue implementation for EXTERNAL_LINKS disposition with ARROW format for SEA backend."""
 
@@ -158,80 +332,49 @@ def __init__(
             description=description,
         )
 
-        self._sea_client = sea_client
-        self._statement_id = statement_id
-        self._total_chunk_count = total_chunk_count
-
         logger.debug(
             "SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
                 statement_id, total_chunk_count
            )
        )
 
         initial_links = result_data.external_links or []
-        self._chunk_index_to_link = {link.chunk_index: link for link in initial_links}
 
         # Track the current chunk we're processing
         self._current_chunk_index = 0
-        first_link = self._chunk_index_to_link.get(self._current_chunk_index, None)
-        if not first_link:
-            # possibly an empty response
-            return None
 
-        # Track the current chunk we're processing
-        self._current_chunk_index = 0
-        # Initialize table and position
-        self.table = self._create_table_from_link(first_link)
+        self.link_fetcher = None  # for empty responses, we do not need a link fetcher
+        if total_chunk_count > 0:
+            self.link_fetcher = LinkFetcher(
+                download_manager=self.download_manager,
+                backend=sea_client,
+                statement_id=statement_id,
+                initial_links=initial_links,
+                total_chunk_count=total_chunk_count,
+            )
+            self.link_fetcher.start()
 
-    def _convert_to_thrift_link(self, link: ExternalLink) -> TSparkArrowResultLink:
-        """Convert SEA external links to Thrift format for compatibility with existing download manager."""
-        # Parse the ISO format expiration time
-        expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
-        return TSparkArrowResultLink(
-            fileLink=link.external_link,
-            expiryTime=expiry_time,
-            rowCount=link.row_count,
-            bytesNum=link.byte_count,
-            startRowOffset=link.row_offset,
-            httpHeaders=link.http_headers or {},
-        )
+        # Initialize table and position
+        self.table = self._create_next_table()
 
-    def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
-        if chunk_index >= self._total_chunk_count:
+    def _create_next_table(self) -> Union["pyarrow.Table", None]:
+        """Create next table by retrieving the logical next downloaded file."""
+        if self.link_fetcher is None:
             return None
 
-        if chunk_index not in self._chunk_index_to_link:
-            links = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
-            self._chunk_index_to_link.update({l.chunk_index: l for l in links})
-
-        link = self._chunk_index_to_link.get(chunk_index, None)
-        if not link:
-            raise ServerOperationError(
-                f"Error fetching link for chunk {chunk_index}",
-                {
-                    "operation-id": self._statement_id,
-                    "diagnostic-info": None,
-                },
-            )
-        return link
-
-    def _create_table_from_link(
-        self, link: ExternalLink
-    ) -> Union["pyarrow.Table", None]:
-        """Create a table from a link."""
-
-        thrift_link = self._convert_to_thrift_link(link)
-        self.download_manager.add_link(thrift_link)
+        chunk_link = self.link_fetcher.get_chunk_link(self._current_chunk_index)
+        if chunk_link is None:
+            return None
 
-        row_offset = link.row_offset
+        row_offset = chunk_link.row_offset
+        # NOTE: link has already been submitted to download manager at this point
         arrow_table = self._create_table_at_offset(row_offset)
 
+        self._current_chunk_index += 1
+
         return arrow_table
 
-    def _create_next_table(self) -> Union["pyarrow.Table", None]:
-        """Create next table by retrieving the logical next downloaded file."""
-        self._current_chunk_index += 1
-        next_chunk_link = self._get_chunk_link(self._current_chunk_index)
-        if not next_chunk_link:
-            return None
-        return self._create_table_from_link(next_chunk_link)
+    def close(self):
+        super().close()
+        if self.link_fetcher:
+            self.link_fetcher.stop()
