Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit23b604f

Browse files
committed
Debug log send/recv bytes from protocol parser
1 parent9016c02 commit23b604f

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

‎kafka/conn.py‎

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,7 @@ def __init__(self, host, port, afi, **configs):
274274
# can use a simple dictionary of correlation_id => request data
275275
self.in_flight_requests=dict()
276276

277-
self._protocol=KafkaProtocol(
278-
client_id=self.config['client_id'],
279-
api_version=self.config['api_version'])
277+
self._protocol=self._new_protocol_parser()
280278
self.state=ConnectionStates.DISCONNECTED
281279
self._reset_reconnect_backoff()
282280
self._sock=None
@@ -295,6 +293,12 @@ def __init__(self, host, port, afi, **configs):
295293
self.config['metric_group_prefix'],
296294
self.node_id)
297295

296+
def_new_protocol_parser(self):
297+
returnKafkaProtocol(
298+
ident='%s:%d'% (self.host,self.port),
299+
client_id=self.config['client_id'],
300+
api_version=self.config['api_version'])
301+
298302
def_init_sasl_mechanism(self):
299303
ifself.config['security_protocol']in ('SASL_PLAINTEXT','SASL_SSL'):
300304
self._sasl_mechanism=get_sasl_mechanism(self.config['sasl_mechanism'])(host=self.host,**self.config)
@@ -934,9 +938,7 @@ def close(self, error=None):
934938
self._api_versions_future=None
935939
self._sasl_auth_future=None
936940
self._init_sasl_mechanism()
937-
self._protocol=KafkaProtocol(
938-
client_id=self.config['client_id'],
939-
api_version=self.config['api_version'])
941+
self._protocol=self._new_protocol_parser()
940942
self._send_buffer=b''
941943
iferrorisNone:
942944
error=Errors.Cancelled(str(self))

‎kafka/protocol/parser.py‎

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class KafkaProtocol(object):
2222
Currently only used to check for 0.8.2 protocol quirks, but
2323
may be used for more in the future.
2424
"""
25-
def__init__(self,client_id=None,api_version=None):
25+
def__init__(self,client_id=None,api_version=None,ident=''):
26+
self._ident=ident
2627
ifclient_idisNone:
2728
client_id=self._gen_client_id()
2829
self._client_id=client_id
@@ -53,7 +54,7 @@ def send_request(self, request, correlation_id=None):
5354
Returns:
5455
correlation_id
5556
"""
56-
log.debug('Sending request %s',request)
57+
log.debug('Sending request %s',request.__class__.__name__)
5758
ifcorrelation_idisNone:
5859
correlation_id=self._next_correlation_id()
5960

@@ -71,6 +72,8 @@ def send_bytes(self):
7172
"""Retrieve all pending bytes to send on the network"""
7273
data=b''.join(self.bytes_to_send)
7374
self.bytes_to_send= []
75+
ifdata:
76+
log.debug('%s Send: %r',self._ident,data)
7477
returndata
7578

7679
defreceive_bytes(self,data):
@@ -92,6 +95,8 @@ def receive_bytes(self, data):
9295
i=0
9396
n=len(data)
9497
responses= []
98+
ifdata:
99+
log.debug('%s Recv: %r',self._ident,data)
95100
whilei<n:
96101

97102
# Not receiving is the state of reading the payload header

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp