- Notifications
You must be signed in to change notification settings - Fork126
Separate Session related functionality from Connection class#571
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
f97c81dfe0af8718f8f67fd8decb1a92b771b9a50a0bf2794ff351650df486a63b10c3f2b3fd553f16aba0267510d6995c923bbb68df8c33bcf5994500dd0b510b454634faa9a32862b88b728ded04584ff842d79190a33File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -45,6 +45,7 @@ | ||
| from databricks.sql.types import Row, SSLOptions | ||
| from databricks.sql.auth.auth import get_python_sql_connector_auth_provider | ||
| from databricks.sql.experimental.oauth_persistence import OAuthPersistence | ||
| from databricks.sql.session import Session | ||
| from databricks.sql.thrift_api.TCLIService.ttypes import ( | ||
| TSparkParameter, | ||
| @@ -224,66 +225,28 @@ def read(self) -> Optional[OAuthToken]: | ||
| access_token_kv = {"access_token": access_token} | ||
| kwargs = {**kwargs, **access_token_kv} | ||
| self.disable_pandas = kwargs.get("_disable_pandas", False) | ||
| self.lz4_compression = kwargs.get("enable_query_result_lz4_compression", True) | ||
| self.use_cloud_fetch = kwargs.get("use_cloud_fetch", True) | ||
| self._cursors = [] # type: List[Cursor] | ||
| # Create the session | ||
| self.session = Session( | ||
| server_hostname, | ||
| http_path, | ||
| http_headers, | ||
| session_configuration, | ||
| catalog, | ||
| schema, | ||
| _use_arrow_native_complex_types, | ||
| **kwargs, | ||
| ) | ||
| self.session.open() | ||
| logger.info( | ||
| "Successfully opened connection with session " | ||
| + str(self.get_session_id_hex()) | ||
| ) | ||
| self.use_inline_params = self._set_use_inline_params_with_warning( | ||
| kwargs.get("use_inline_params", False) | ||
| @@ -342,34 +305,32 @@ def __del__(self): | ||
| logger.debug("Couldn't close unclosed connection: {}".format(e.message)) | ||
def get_session_id(self):
    """Return the session ID reported by this connection's ``Session``.

    Pure delegation: the value is whatever ``self.session.get_id()`` returns.
    """
    session = self.session
    return session.get_id()
def get_session_id_hex(self):
    """Return the hex-formatted session ID from this connection's ``Session``.

    Pure delegation: the value is whatever ``self.session.get_id_hex()``
    returns.
    """
    session = self.session
    return session.get_id_hex()
@staticmethod
def server_parameterized_queries_enabled(protocolVersion):
    """Forward to ``Session.server_parameterized_queries_enabled``.

    Kept on the connection class so existing callers of this static method
    keep working; the check itself lives on ``Session``.
    """
    enabled = Session.server_parameterized_queries_enabled(protocolVersion)
    return enabled
@property
def protocol_version(self):
    """Protocol version currently stored on this connection's ``Session``."""
    session = self.session
    return session.protocol_version
@staticmethod
def get_protocol_version(openSessionResp):
    """Forward to ``Session.get_protocol_version``.

    Kept on the connection class so existing callers of this static method
    keep working; the lookup itself lives on ``Session``.
    """
    version = Session.get_protocol_version(openSessionResp)
    return version
@property
def open(self) -> bool:
    """Whether the connection is open, i.e. the session's ``is_open`` flag."""
    session = self.session
    return session.is_open
Comment on lines +320 to +333 Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: should we group together property and staticmethod? @jprakash-db any coding/lint guidelines the OSS Python driver follows? Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is no specific standard in Python, because marking something as private etc. has no meaning; we can access anything anytime. There are some general standards but nothing concrete. | ||
| def cursor( | ||
| self, | ||
| @@ -386,7 +347,7 @@ def cursor( | ||
| cursor = Cursor( | ||
| self, | ||
| self.session.thrift_backend, | ||
| arraysize=arraysize, | ||
| result_buffer_size_bytes=buffer_size_bytes, | ||
| ) | ||
| @@ -402,28 +363,10 @@ def _close(self, close_cursors=True) -> None: | ||
| for cursor in self._cursors: | ||
| cursor.close() | ||
| try: | ||
| self.session.close() | ||
| except Exception as e: | ||
| logger.error(f"Attempt to close session raised an exception: {e}") | ||
| def commit(self): | ||
| """No-op because Databricks does not support transactions""" | ||
| @@ -833,7 +776,7 @@ def execute( | ||
| self._close_and_clear_active_result_set() | ||
| execute_response = self.thrift_backend.execute_command( | ||
| operation=prepared_operation, | ||
| session_handle=self.connection.session.get_handle(), | ||
| max_rows=self.arraysize, | ||
| max_bytes=self.buffer_size_bytes, | ||
| lz4_compression=self.connection.lz4_compression, | ||
| @@ -896,7 +839,7 @@ def execute_async( | ||
| self._close_and_clear_active_result_set() | ||
| self.thrift_backend.execute_command( | ||
| operation=prepared_operation, | ||
| session_handle=self.connection.session.get_handle(), | ||
| max_rows=self.arraysize, | ||
| max_bytes=self.buffer_size_bytes, | ||
| lz4_compression=self.connection.lz4_compression, | ||
| @@ -992,7 +935,7 @@ def catalogs(self) -> "Cursor": | ||
| self._check_not_closed() | ||
| self._close_and_clear_active_result_set() | ||
| execute_response = self.thrift_backend.get_catalogs( | ||
| session_handle=self.connection.session.get_handle(), | ||
| max_rows=self.arraysize, | ||
| max_bytes=self.buffer_size_bytes, | ||
| cursor=self, | ||
| @@ -1018,7 +961,7 @@ def schemas( | ||
| self._check_not_closed() | ||
| self._close_and_clear_active_result_set() | ||
| execute_response = self.thrift_backend.get_schemas( | ||
| session_handle=self.connection.session.get_handle(), | ||
| max_rows=self.arraysize, | ||
| max_bytes=self.buffer_size_bytes, | ||
| cursor=self, | ||
| @@ -1051,7 +994,7 @@ def tables( | ||
| self._close_and_clear_active_result_set() | ||
| execute_response = self.thrift_backend.get_tables( | ||
| session_handle=self.connection.session.get_handle(), | ||
| max_rows=self.arraysize, | ||
| max_bytes=self.buffer_size_bytes, | ||
| cursor=self, | ||
| @@ -1086,7 +1029,7 @@ def columns( | ||
| self._close_and_clear_active_result_set() | ||
| execute_response = self.thrift_backend.get_columns( | ||
| session_handle=self.connection.session.get_handle(), | ||
| max_rows=self.arraysize, | ||
| max_bytes=self.buffer_size_bytes, | ||
| cursor=self, | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,160 @@ | ||||||||||||
| importlogging | ||||||||||||
| fromtypingimportDict,Tuple,List,Optional,Any | ||||||||||||
| fromdatabricks.sql.thrift_api.TCLIServiceimportttypes | ||||||||||||
| fromdatabricks.sql.typesimportSSLOptions | ||||||||||||
| fromdatabricks.sql.auth.authimportget_python_sql_connector_auth_provider | ||||||||||||
| fromdatabricks.sql.excimportSessionAlreadyClosedError,DatabaseError,RequestError | ||||||||||||
| fromdatabricks.sqlimport__version__ | ||||||||||||
| fromdatabricks.sqlimportUSER_AGENT_NAME | ||||||||||||
| fromdatabricks.sql.thrift_backendimportThriftBackend | ||||||||||||
| logger=logging.getLogger(__name__) | ||||||||||||
class Session:
    """Own the backend session that sits behind a Databricks SQL connection.

    The constructor builds the ``ThriftBackend`` client from the connection
    parameters; ``open()`` opens the server-side session and records its
    handle and protocol version; ``close()`` closes it again.
    """

    # NOTE(review): reviewers proposed bundling the constructor parameters
    # into a single parameters object (the name ``ConnectionParams`` was
    # suggested, since the values originate from the connection). That was
    # deferred to a separate change because it could break many callers.
    def __init__(
        self,
        server_hostname: str,
        http_path: str,
        http_headers: Optional[List[Tuple[str, str]]] = None,
        session_configuration: Optional[Dict[str, Any]] = None,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        _use_arrow_native_complex_types: Optional[bool] = True,
        **kwargs,
    ) -> None:
        """
        Create a session to a Databricks SQL endpoint or a Databricks cluster.

        This class handles all session-related behavior and communication with the backend.
        The constructor does not open the session; call ``open()`` for that.

        Args:
            server_hostname: Host of the endpoint; also passed to the auth
                provider factory.
            http_path: HTTP path forwarded to the backend client.
            http_headers: Extra ``(name, value)`` headers; the generated
                ``User-Agent`` header is appended after them.
            session_configuration: Passed to ``open_session`` by ``open()``.
            catalog: Passed to ``open_session`` by ``open()``.
            schema: Passed to ``open_session`` by ``open()``.
            _use_arrow_native_complex_types: Forwarded to the backend client.
            **kwargs: Forwarded unchanged to the auth provider factory and the
                backend client. Keys read here: ``_port`` (default 443),
                ``user_agent_entry`` (``_user_agent_entry`` is the deprecated
                spelling) and the ``_tls_*`` options used to build
                ``SSLOptions``.
        """
        self.is_open = False
        self.host = server_hostname
        self.port = kwargs.get("_port", 443)

        self.session_configuration = session_configuration
        self.catalog = catalog
        self.schema = schema

        auth_provider = get_python_sql_connector_auth_provider(
            server_hostname, **kwargs
        )

        # Prefer the public key; fall back to the deprecated underscore form
        # and warn only when that fallback actually supplied a value.
        user_agent_entry = kwargs.get("user_agent_entry")
        if user_agent_entry is None:
            user_agent_entry = kwargs.get("_user_agent_entry")
            if user_agent_entry is not None:
                logger.warning(
                    "[WARN] Parameter '_user_agent_entry' is deprecated; use 'user_agent_entry' instead. "
                    "This parameter will be removed in the upcoming releases."
                )

        if user_agent_entry:
            useragent_header = "{}/{} ({})".format(
                USER_AGENT_NAME, __version__, user_agent_entry
            )
        else:
            useragent_header = "{}/{}".format(USER_AGENT_NAME, __version__)

        base_headers = [("User-Agent", useragent_header)]

        self._ssl_options = SSLOptions(
            # Double negation is generally a bad thing, but we have to keep backward compatibility
            tls_verify=not kwargs.get(
                "_tls_no_verify", False
            ),  # by default - verify cert and host
            tls_verify_hostname=kwargs.get("_tls_verify_hostname", True),
            tls_trusted_ca_file=kwargs.get("_tls_trusted_ca_file"),
            tls_client_cert_file=kwargs.get("_tls_client_cert_file"),
            tls_client_cert_key_file=kwargs.get("_tls_client_cert_key_file"),
            tls_client_cert_key_password=kwargs.get("_tls_client_cert_key_password"),
        )

        self.thrift_backend = ThriftBackend(
            self.host,
            self.port,
            http_path,
            (http_headers or []) + base_headers,
            auth_provider,
            ssl_options=self._ssl_options,
            _use_arrow_native_complex_types=_use_arrow_native_complex_types,
            **kwargs,
        )

        # Populated by open(); initialised here so that reading them on a
        # never-opened session yields None instead of AttributeError.
        self._open_session_resp = None
        self._handle = None
        self.protocol_version = None

    def open(self) -> None:
        """Open the backend session and record its handle and protocol version.

        ``is_open`` is set only after the backend call and the bookkeeping
        have succeeded.
        """
        self._open_session_resp = self.thrift_backend.open_session(
            self.session_configuration, self.catalog, self.schema
        )
        self._handle = self._open_session_resp.sessionHandle
        self.protocol_version = self.get_protocol_version(self._open_session_resp)
        self.is_open = True
        logger.info("Successfully opened session " + str(self.get_id_hex()))

    # NOTE(review): per the review thread, the project formats with black and
    # follows PEP 257 for docstrings.
    @staticmethod
    def get_protocol_version(openSessionResp):
        """Return the server protocol version for an open-session response.

        Since the sessionHandle will sometimes have a serverProtocolVersion, it takes
        precedence over the serverProtocolVersion defined in the OpenSessionResponse.
        """
        handle = openSessionResp.sessionHandle
        # A missing handle, a handle without the attribute, and a falsy
        # attribute value all fall through to the response-level version.
        handle_version = (
            getattr(handle, "serverProtocolVersion", None) if handle else None
        )
        if handle_version:
            return handle_version
        return openSessionResp.serverProtocolVersion

    @staticmethod
    def server_parameterized_queries_enabled(protocolVersion):
        """Return True when ``protocolVersion`` is at least protocol V8.

        A falsy ``protocolVersion`` (``None`` or ``0``) yields False.
        """
        return bool(
            protocolVersion
            and protocolVersion
            >= ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8
        )

    def get_handle(self):
        """Return the session handle, or None before ``open()`` has run."""
        return self._handle

    def get_id(self):
        """Return the backend's ID for the session handle, or None if no handle."""
        handle = self.get_handle()
        if handle is None:
            return None
        return self.thrift_backend.handle_to_id(handle)

    def get_id_hex(self):
        """Return the backend's hex ID for the session handle, or None if no handle."""
        handle = self.get_handle()
        if handle is None:
            return None
        return self.thrift_backend.handle_to_hex_id(handle)

    def close(self) -> None:
        """Close the underlying session.

        Best effort: any ``Exception`` raised by the backend call is logged
        and not re-raised. Does nothing (beyond logging) when the session is
        not marked open.
        """
        logger.info(f"Closing session {self.get_id_hex()}")
        if not self.is_open:
            logger.debug("Session appears to have been closed already")
            return

        try:
            self.thrift_backend.close_session(self.get_handle())
        except RequestError as e:
            # The underlying cause is expected as the second positional
            # argument; guard the index so a RequestError built with fewer
            # arguments cannot raise IndexError from inside this handler.
            cause = e.args[1] if len(e.args) > 1 else None
            if isinstance(cause, SessionAlreadyClosedError):
                logger.info("Session was closed by a prior request")
            else:
                # Previously swallowed without a trace; log it instead.
                logger.warning(
                    f"Attempt to close session raised a request error: {e}"
                )
        except DatabaseError as e:
            if "Invalid SessionHandle" in str(e):
                logger.warning(
                    f"Attempted to close session that was already closed: {e}"
                )
            else:
                logger.warning(
                    f"Attempt to close session raised an exception at the server: {e}"
                )
        except Exception as e:
            logger.error(f"Attempt to close session raised a local exception: {e}")

        # The flag is cleared after the attempt whether or not the backend
        # call succeeded, because every Exception above is logged rather than
        # re-raised. NOTE(review): reviewers discussed clearing it before the
        # attempt instead; the original ordering is kept here.
        self.is_open = False
Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When opening a session the flag is set at the end, which makes sense. For the closing session call, should we be eager to unset the flag at the very beginning? Contributor Author There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What if the session close fails? Shouldn't the session remain open? Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If the client code has called on the Session class to close the session, then the client assumes that method will close the session. I think unsetting the flag right away makes more sense then. However, an interesting question is whether we use this flag internally in the Session class to make unsetting meaningful (i.e., when the flag is false, do we give null or throw an exception when getting the session handle?) Contributor Author There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Currently there does not seem to exist such a dependency, but I'm still not clear on this. If the close() call raises an exception, isn't the client expected to retry? Contributor Author
| ||||||||||||
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.