Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb646aae

Browse files
committed
Exercises
0 parents  commitb646aae

File tree

8 files changed

+260
-0
lines changed

8 files changed

+260
-0
lines changed

‎.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
*.pyc
2+
ccloud-venv/
3+
.idea
4+
venv

‎Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
FROM python:3.7-slim
2+
3+
COPY requirements.txt /tmp/requirements.txt
4+
RUN pip3 install -U -r /tmp/requirements.txt
5+
6+
COPY *.py ./

‎README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#Overview
2+
3+
Produce messages to and consume messages from a Kafka cluster using[Confluent Python Client for Apache Kafka](https://github.com/confluentinc/confluent-kafka-python).
4+
5+
##Install requirements
6+
7+
`pip3 install -r requirements.txt`
8+
9+
##Run the file
10+
11+
`python3 <file>.py`
12+
13+
##Exercises
14+
15+
Fill in the commented out code in the producer and consumer files. Answers are in the`answers` branch

‎avroconsumer.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
fromconfluent_kafka.avroimportAvroConsumer
2+
fromconfluent_kafka.avro.serializerimportSerializerError
3+
4+
5+
c=AvroConsumer({
6+
'bootstrap.servers':'localhost:32772,localhost:32773,localhost:32774',
7+
'group.id':'avro-consumer',
8+
'schema.registry.url':'http://localhost:8081',
9+
'auto.offset.reset':'earliest'
10+
})
11+
12+
c.subscribe(['my_topic'])
13+
14+
whileTrue:
15+
try:
16+
msg=c.poll(1.0)
17+
18+
exceptSerializerErrorase:
19+
print("Message deserialization failed for {}: {}".format(msg,e))
20+
break
21+
22+
ifmsgisNone:
23+
continue
24+
25+
ifmsg.error():
26+
print("AvroConsumer error: {}".format(msg.error()))
27+
continue
28+
29+
print(msg.value())
30+
31+
c.close()

‎avroproducer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
fromconfluent_kafkaimportavro
2+
fromconfluent_kafka.avroimportAvroProducer
3+
4+
5+
value_schema_str="""
6+
{
7+
"namespace": "my.test",
8+
"name": "value",
9+
"type": "record",
10+
"fields" : [
11+
{
12+
"name" : "name",
13+
"type" : "string"
14+
}
15+
]
16+
}
17+
"""
18+
19+
key_schema_str="""
20+
{
21+
"namespace": "my.test",
22+
"name": "key",
23+
"type": "record",
24+
"fields" : [
25+
{
26+
"name" : "name",
27+
"type" : "string"
28+
}
29+
]
30+
}
31+
"""
32+
33+
value_schema=avro.loads(value_schema_str)
34+
key_schema=avro.loads(key_schema_str)
35+
value= {"name":"Value"}
36+
key= {"name":"Key"}
37+
38+
39+
defdelivery_report(err,msg):
40+
""" Called once for each message produced to indicate delivery result.
41+
Triggered by poll() or flush(). """
42+
iferrisnotNone:
43+
print('Message delivery failed: {}'.format(err))
44+
else:
45+
print('Message delivered to {} [{}]'.format(msg.topic(),msg.partition()))
46+
47+
48+
avroProducer=AvroProducer({
49+
'bootstrap.servers':'localhost:32772,localhost:32773,localhost:32774',
50+
'on_delivery':delivery_report,
51+
'schema.registry.url':'http://localhost:8081'
52+
},default_key_schema=key_schema,default_value_schema=value_schema)
53+
54+
avroProducer.produce(topic='my_topic',value=value,key=key)
55+
avroProducer.flush()

‎consumer.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2020 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# =============================================================================
19+
#
20+
# Consume messages from Confluent Cloud
21+
# Using Confluent Python Client for Apache Kafka
22+
#
23+
# =============================================================================
24+
25+
fromconfluent_kafkaimportConsumer
26+
importjson
27+
28+
if__name__=='__main__':
29+
30+
# Read arguments and configurations and initialize
31+
topic='temperature'
32+
33+
# Create Consumer instance
34+
# 'auto.offset.reset=earliest' to start reading from the beginning of the
35+
# topic if no committed offsets exist
36+
consumer=Consumer({
37+
# Exercise: Add the consumer setting
38+
# Make sure you add a consumer group and choose to reset offset to earliest
39+
})
40+
41+
# Subscribe to topic
42+
consumer.subscribe([topic])
43+
44+
# Process messages
45+
total_count=0
46+
temps= []
47+
try:
48+
whileTrue:
49+
msg=consumer.poll(1.0)
50+
ifmsgisNone:
51+
# No message available within timeout.
52+
# Initial message consumption may take up to
53+
# `session.timeout.ms` for the consumer group to
54+
# rebalance and start consuming
55+
print("Waiting for message or event/error in poll()")
56+
continue
57+
elifmsg.error():
58+
print('error: {}'.format(msg.error()))
59+
else:
60+
61+
# Exercise: Read and print the key and value
62+
pass
63+
64+
exceptKeyboardInterrupt:
65+
pass
66+
finally:
67+
# Leave group and commit final offsets
68+
consumer.close()

‎producer.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2020 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# =============================================================================
19+
#
20+
# Produce messages to Confluent Cloud
21+
# Using Confluent Python Client for Apache Kafka
22+
#
23+
# =============================================================================
24+
25+
fromconfluent_kafkaimportProducer,KafkaError
26+
importjson
27+
importrandom
28+
29+
fromconfluent_kafka.adminimportAdminClient
30+
fromconfluent_kafka.cimplimportNewTopic
31+
32+
if__name__=='__main__':
33+
34+
topic='temperature'
35+
36+
admin_client=AdminClient({'bootstrap.servers':'localhost:32768,localhost:32769,localhost:32770'})
37+
38+
admin_client.create_topics([NewTopic(topic,num_partitions=3,replication_factor=1)])
39+
40+
# Create Producer instance
41+
producer=Producer({
42+
# Exercise: Add your producer configuration here
43+
})
44+
45+
delivered_records=0
46+
47+
# Optional per-message on_delivery handler (triggered by poll() or flush())
48+
# when a message has been successfully delivered or
49+
# permanently failed delivery (after retries).
50+
defacked(err,msg):
51+
globaldelivered_records
52+
"""Delivery report handler called on
53+
successful or failed delivery of message
54+
"""
55+
iferrisnotNone:
56+
print("Failed to deliver message: {}".format(err))
57+
else:
58+
delivered_records+=1
59+
print("Produced record to topic {} partition [{}] @ offset {}"
60+
.format(msg.topic(),msg.partition(),msg.offset()))
61+
62+
cities= ['London','New York','Madrid','Paris']
63+
temps= [10,20,30]
64+
65+
forninrange(10):
66+
city=random.choice(cities)
67+
temp=random.choice(temps)
68+
69+
# Exercise: Create a Json with the city and temp
70+
# Send a message to kafka with the city as key and the json as value
71+
72+
# p.poll() serves delivery reports (on_delivery)
73+
# from previous produce() calls.
74+
producer.poll(0)
75+
76+
producer.flush()
77+
78+
print("{} messages were produced to topic {}!".format(delivered_records,topic))

‎requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
requests
2+
certifi
3+
confluent-kafka[avro,json,protobuf]>=1.4.2

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp