Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5718d48

Browse files
committed
answers
1 parentb646aae commit5718d48

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed

‎consumer.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
# 'auto.offset.reset=earliest' to start reading from the beginning of the
3535
# topic if no committed offsets exist
3636
consumer=Consumer({
37-
# Exercise: Add the consumer setting
38-
# Make sure you add a consumer group and choose to reset offset to earliest
37+
'bootstrap.servers':'localhost:32768,localhost:32769,localhost:32770',
38+
'group.id':'python_example_group_1',
39+
'auto.offset.reset':'earliest',
3940
})
4041

4142
# Subscribe to topic
@@ -57,10 +58,14 @@
5758
elifmsg.error():
5859
print('error: {}'.format(msg.error()))
5960
else:
60-
61-
# Exercise: Read and print the key and value
62-
pass
63-
61+
# Check for Kafka message
62+
record_key=msg.key()
63+
record_value=msg.value()
64+
print(record_value)
65+
data=json.loads(record_value)
66+
print("Consumed record with key {} and value {},\
67+
and updated total count to {}"
68+
.format(record_key,record_value,0))
6469
exceptKeyboardInterrupt:
6570
pass
6671
finally:

‎producer.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
# Create Producer instance
4141
producer=Producer({
42-
# Exercise: Add your producer configuration here
42+
'bootstrap.servers':'localhost:32768,localhost:32769,localhost:32770'
4343
})
4444

4545
delivered_records=0
@@ -65,10 +65,11 @@ def acked(err, msg):
6565
forninrange(10):
6666
city=random.choice(cities)
6767
temp=random.choice(temps)
68-
69-
# Exercise: Create a Json with the city and temp
70-
# Send a message to kafka with the city as key and the json as value
71-
68+
j=json.dumps({'city':city,'temp':temp})
69+
record_key=city
70+
record_value=json.dumps(j)
71+
print("Producing record: {}\t{}".format(record_key,record_value))
72+
producer.produce(topic,key=record_key,value=record_value,on_delivery=acked)
7273
# p.poll() serves delivery reports (on_delivery)
7374
# from previous produce() calls.
7475
producer.poll(0)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp