Bring Python telemetry event model consistent with JDBC #701

Open

nikhilsuri-db wants to merge 4 commits into main from PECOBLR-1048
src/databricks/sql/client.py (35 changes: 34 additions & 1 deletion)
@@ -9,6 +9,7 @@
import json
import os
import decimal
from urllib.parse import urlparse
from uuid import UUID

from databricks.sql import __version__
@@ -322,6 +323,20 @@ def read(self) -> Optional[OAuthToken]:
    session_id_hex=self.get_session_id_hex()
)

# Determine proxy usage
use_proxy = self.http_client.using_proxy()
proxy_host_info = None
if (
    use_proxy
    and self.http_client.proxy_uri
    and isinstance(self.http_client.proxy_uri, str)
):
    parsed = urlparse(self.http_client.proxy_uri)
    proxy_host_info = HostDetails(
        host_url=parsed.hostname or self.http_client.proxy_uri,
        port=parsed.port or 8080,
    )
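As a standalone sketch of the parsing just above (the URIs here are hypothetical), urlparse yields no port for a bare host, which is why the code falls back to 8080:

from urllib.parse import urlparse

# Hypothetical proxy URIs; in the driver the value comes from
# self.http_client.proxy_uri.
for uri in ("http://proxy.internal:3128", "http://proxy.internal"):
    parsed = urlparse(uri)
    host = parsed.hostname or uri  # fall back to the raw URI if no host is parsed
    port = parsed.port or 8080     # default when the URI carries no port
    print(host, port)              # proxy.internal 3128, then proxy.internal 8080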

driver_connection_params = DriverConnectionParameters(
    http_path=http_path,
    mode=DatabricksClientType.SEA

@@ -331,13 +346,31 @@ def read(self) -> Optional[OAuthToken]:

    auth_mech=TelemetryHelper.get_auth_mechanism(self.session.auth_provider),
    auth_flow=TelemetryHelper.get_auth_flow(self.session.auth_provider),
    socket_timeout=kwargs.get("_socket_timeout", None),
    azure_workspace_resource_id=kwargs.get("azure_workspace_resource_id", None),
    azure_tenant_id=kwargs.get("azure_tenant_id", None),
    use_proxy=use_proxy,
    use_system_proxy=use_proxy,
    proxy_host_info=proxy_host_info,
    use_cf_proxy=False,  # CloudFlare proxy not yet supported in Python
    cf_proxy_host_info=None,  # CloudFlare proxy not yet supported in Python
    non_proxy_hosts=None,
    allow_self_signed_support=kwargs.get("_tls_no_verify", False),
    use_system_trust_store=True,  # Python uses system SSL by default
    enable_arrow=pyarrow is not None,
    enable_direct_results=True,  # Always enabled in Python
    enable_sea_hybrid_results=kwargs.get("use_hybrid_disposition", False),
    http_connection_pool_size=kwargs.get("pool_maxsize", None),
    rows_fetched_per_block=DEFAULT_ARRAY_SIZE,
    async_poll_interval_millis=2000,  # Default polling interval
    support_many_parameters=True,  # Native parameters supported
    enable_complex_datatype_support=_use_arrow_native_complex_types,
    allowed_volume_ingestion_paths=self.staging_allowed_local_path,
)

self._telemetry_client.export_initial_telemetry_log(
    driver_connection_params=driver_connection_params,
    user_agent=self.session.useragent_header,
)
self.staging_allowed_local_path = kwargs.get("staging_allowed_local_path", None)

def _set_use_inline_params_with_warning(self, value: Union[bool, str]):
    """Valid values are True, False, and "silent"
src/databricks/sql/common/unified_http_client.py (5 changes: 5 additions & 0 deletions)
@@ -301,6 +301,11 @@ def using_proxy(self) -> bool:
"""Check if proxy support is available (not whether it's being used for a specific request)."""
return self._proxy_pool_manager is not None

@property
def proxy_uri(self) -> Optional[str]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Are there any security implications of exposing the proxy URI?

"""Get the configured proxy URI, if any."""
return self._proxy_uri

def close(self):
"""Close the underlying connection pools."""
if self._direct_pool_manager:
Expand Down
src/databricks/sql/telemetry/models/event.py (109 changes: 108 additions & 1 deletion)
@@ -38,6 +38,25 @@ class DriverConnectionParameters(JsonSerializableMixin):
        auth_mech (AuthMech): The authentication mechanism used
        auth_flow (AuthFlow): The authentication flow type
        socket_timeout (int): Connection timeout in milliseconds
        azure_workspace_resource_id (str): Azure workspace resource ID
        azure_tenant_id (str): Azure tenant ID
        use_proxy (bool): Whether proxy is being used
        use_system_proxy (bool): Whether system proxy is being used
        proxy_host_info (HostDetails): Proxy host details if configured
        use_cf_proxy (bool): Whether CloudFlare proxy is being used
        cf_proxy_host_info (HostDetails): CloudFlare proxy host details if configured
        non_proxy_hosts (list): List of hosts that bypass proxy
        allow_self_signed_support (bool): Whether self-signed certificates are allowed
        use_system_trust_store (bool): Whether system trust store is used
        enable_arrow (bool): Whether Arrow format is enabled
        enable_direct_results (bool): Whether direct results are enabled
        enable_sea_hybrid_results (bool): Whether SEA hybrid results are enabled
        http_connection_pool_size (int): HTTP connection pool size
        rows_fetched_per_block (int): Number of rows fetched per block
        async_poll_interval_millis (int): Async polling interval in milliseconds
        support_many_parameters (bool): Whether many parameters are supported
        enable_complex_datatype_support (bool): Whether complex datatypes are supported
        allowed_volume_ingestion_paths (str): Allowed paths for volume ingestion
    """

    http_path: str
@@ -46,6 +65,25 @@ class DriverConnectionParameters(JsonSerializableMixin):
    auth_mech: Optional[AuthMech] = None
    auth_flow: Optional[AuthFlow] = None
    socket_timeout: Optional[int] = None
    azure_workspace_resource_id: Optional[str] = None
    azure_tenant_id: Optional[str] = None
    use_proxy: Optional[bool] = None
    use_system_proxy: Optional[bool] = None
    proxy_host_info: Optional[HostDetails] = None
    use_cf_proxy: Optional[bool] = None
    cf_proxy_host_info: Optional[HostDetails] = None
    non_proxy_hosts: Optional[list] = None
    allow_self_signed_support: Optional[bool] = None
    use_system_trust_store: Optional[bool] = None
    enable_arrow: Optional[bool] = None
    enable_direct_results: Optional[bool] = None
    enable_sea_hybrid_results: Optional[bool] = None
    http_connection_pool_size: Optional[int] = None
    rows_fetched_per_block: Optional[int] = None
    async_poll_interval_millis: Optional[int] = None
    support_many_parameters: Optional[bool] = None
    enable_complex_datatype_support: Optional[bool] = None
    allowed_volume_ingestion_paths: Optional[str] = None
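Because every field added here defaults to None, existing construction sites that do not pass the new parameters keep working. A minimal sketch (assuming only the module path shown in this diff) that lists the defaulted fields:

from dataclasses import fields

from databricks.sql.telemetry.models.event import DriverConnectionParameters

# Fields with a None default are exactly the optional telemetry parameters;
# required fields like http_path have no default and are excluded.
optional = [f.name for f in fields(DriverConnectionParameters) if f.default is None]
print(optional)  # includes use_proxy, proxy_host_info, enable_arrow, ...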


@dataclass
@@ -111,6 +149,69 @@ class DriverErrorInfo(JsonSerializableMixin):
    stack_trace: str


@dataclass
class ChunkDetails(JsonSerializableMixin):
    """
    Contains detailed metrics about chunk downloads during result fetching.

    These metrics are accumulated across all chunk downloads for a single statement.

    Attributes:
        initial_chunk_latency_millis (int): Latency of the first chunk download
        slowest_chunk_latency_millis (int): Latency of the slowest chunk download
        total_chunks_present (int): Total number of chunks available
        total_chunks_iterated (int): Number of chunks actually downloaded
        sum_chunks_download_time_millis (int): Total time spent downloading all chunks
    """

    initial_chunk_latency_millis: Optional[int] = None
    slowest_chunk_latency_millis: Optional[int] = None
    total_chunks_present: Optional[int] = None
    total_chunks_iterated: Optional[int] = None
    sum_chunks_download_time_millis: Optional[int] = None
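To make the accumulation semantics concrete, here is a hedged sketch of folding per-chunk timings into a single ChunkDetails; record_chunk and the latency values are illustrative, not part of this PR:

from databricks.sql.telemetry.models.event import ChunkDetails

def record_chunk(details: ChunkDetails, latency_millis: int) -> None:
    # The first download sets the initial latency; every download updates the rest.
    if details.initial_chunk_latency_millis is None:
        details.initial_chunk_latency_millis = latency_millis
    details.slowest_chunk_latency_millis = max(
        details.slowest_chunk_latency_millis or 0, latency_millis
    )
    details.total_chunks_iterated = (details.total_chunks_iterated or 0) + 1
    details.sum_chunks_download_time_millis = (
        details.sum_chunks_download_time_millis or 0
    ) + latency_millis

details = ChunkDetails(total_chunks_present=3)
for latency in (120, 45, 210):  # hypothetical per-chunk download times in ms
    record_chunk(details, latency)
# details: initial=120, slowest=210, iterated=3, sum=375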


@dataclass
class ResultLatency(JsonSerializableMixin):
    """
    Contains latency metrics for different phases of query execution.

    This tracks two distinct phases:
    1. result_set_ready_latency_millis: time from query submission until results
       are available (execute phase); set when execute() completes.
    2. result_set_consumption_latency_millis: time spent iterating/fetching
       results (fetch phase); measured from the first fetch call until no more
       rows are available. In Java this is tracked via the
       markResultSetConsumption(hasNext) method, which records the start time on
       the first fetch and computes the total on the last.

    Attributes:
        result_set_ready_latency_millis (int): Time until query results are ready (execution phase)
        result_set_consumption_latency_millis (int): Time spent fetching/consuming results (fetch phase)
    """

    result_set_ready_latency_millis: Optional[int] = None
    result_set_consumption_latency_millis: Optional[int] = None
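A minimal sketch of the two phases described above, using plain wall-clock timing; the cursor calls are placeholders, and this PR does not prescribe this exact instrumentation:

import time

from databricks.sql.telemetry.models.event import ResultLatency

latency = ResultLatency()

# Execute phase: measure until results are ready.
start = time.monotonic()
# ... cursor.execute(query) would run here ...
latency.result_set_ready_latency_millis = int((time.monotonic() - start) * 1000)

# Fetch phase: measure from the first fetch until rows are exhausted.
fetch_start = time.monotonic()
# ... iterating the result set would happen here ...
latency.result_set_consumption_latency_millis = int(
    (time.monotonic() - fetch_start) * 1000
)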


@dataclass
class OperationDetail(JsonSerializableMixin):
    """
    Contains detailed information about the operation being performed.

    Attributes:
        n_operation_status_calls (int): Number of status polling calls made
        operation_status_latency_millis (int): Total latency of all status calls
        operation_type (str): Specific operation type (e.g., EXECUTE_STATEMENT, LIST_TABLES, CANCEL_STATEMENT)
        is_internal_call (bool): Whether this is an internal driver operation
    """

    n_operation_status_calls: Optional[int] = None
    operation_status_latency_millis: Optional[int] = None
    operation_type: Optional[str] = None
    is_internal_call: Optional[bool] = None
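For illustration, a sketch of status-poll accounting; the loop and latencies are hypothetical, and EXECUTE_STATEMENT is one of the operation types named in the docstring:

from databricks.sql.telemetry.models.event import OperationDetail

detail = OperationDetail(operation_type="EXECUTE_STATEMENT", is_internal_call=False)
for poll_latency_millis in (15, 12, 18):  # hypothetical poll round trips in ms
    detail.n_operation_status_calls = (detail.n_operation_status_calls or 0) + 1
    detail.operation_status_latency_millis = (
        detail.operation_status_latency_millis or 0
    ) + poll_latency_millis
# detail now records 3 polls totalling 45 ms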


@dataclass
class SqlExecutionEvent(JsonSerializableMixin):
    """

@@ -122,14 +223,20 @@ class SqlExecutionEvent(JsonSerializableMixin):

        is_compressed (bool): Whether the result is compressed
        execution_result (ExecutionResultFormat): Format of the execution result
        retry_count (int): Number of retry attempts made
-       chunk_id (int): ID of the chunk if applicable
+       chunk_id (int): ID of the chunk if applicable (used for error tracking)
        chunk_details (ChunkDetails): Aggregated chunk download metrics
        result_latency (ResultLatency): Latency breakdown by execution phase
        operation_detail (OperationDetail): Detailed operation information
    """

    statement_type: StatementType
    is_compressed: bool
    execution_result: ExecutionResultFormat
    retry_count: Optional[int]
    chunk_id: Optional[int]
    chunk_details: Optional[ChunkDetails] = None
    result_latency: Optional[ResultLatency] = None
    operation_detail: Optional[OperationDetail] = None


@dataclass
src/databricks/sql/telemetry/telemetry_client.py (2 changes: 1 addition & 1 deletion)
@@ -380,7 +380,7 @@ class TelemetryClientFactory:
    # Shared flush thread for all clients
    _flush_thread = None
    _flush_event = threading.Event()
-   _flush_interval_seconds = 90
+   _flush_interval_seconds = 300  # 5 minutes

Review comment: This aligns with the JDBC configuration.


    DEFAULT_BATCH_SIZE = 100
