Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf9ad1b1

Browse files
committed
[hotfix][examples] Add Python examples on how to read binary data from Kafka
1 parent6e89776 commitf9ad1b1

File tree

4 files changed

+186
-39
lines changed

4 files changed

+186
-39
lines changed

‎flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py‎

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
importlogging
1919
importsys
2020

21-
frompyflink.commonimportTypes
21+
frompyflink.commonimportTypes,WatermarkStrategy
2222
frompyflink.datastreamimportStreamExecutionEnvironment
23-
frompyflink.datastream.connectors.kafkaimportFlinkKafkaProducer,FlinkKafkaConsumer
23+
frompyflink.datastream.connectors.kafkaimport (KafkaRecordSerializationSchema,KafkaSink,
24+
KafkaSource,KafkaOffsetsInitializer)
2425
frompyflink.datastream.formats.avroimportAvroRowSerializationSchema,AvroRowDeserializationSchema
2526

2627

@@ -43,14 +44,20 @@ def write_to_kafka(env):
4344
}"""
4445
)
4546

46-
kafka_producer=FlinkKafkaProducer(
47-
topic='test_avro_topic',
48-
serialization_schema=serialization_schema,
49-
producer_config={'bootstrap.servers':'localhost:9092','group.id':'test_group'}
47+
record_serializer=KafkaRecordSerializationSchema.builder() \
48+
.set_topic('test_avro_topic') \
49+
.set_value_serialization_schema(serialization_schema) \
50+
.build()
51+
kafka_sink= (
52+
KafkaSink.builder()
53+
.set_record_serializer(record_serializer)
54+
.set_bootstrap_servers('localhost:9092')
55+
.set_property("group.id","test_group")
56+
.build()
5057
)
5158

5259
# note that the output type of ds must be RowTypeInfo
53-
ds.add_sink(kafka_producer)
60+
ds.sink_to(kafka_sink)
5461
env.execute()
5562

5663

@@ -67,14 +74,22 @@ def read_from_kafka(env):
6774
}"""
6875
)
6976

70-
kafka_consumer=FlinkKafkaConsumer(
71-
topics='test_avro_topic',
72-
deserialization_schema=deserialization_schema,
73-
properties={'bootstrap.servers':'localhost:9092','group.id':'test_group_1'}
77+
kafka_source= (
78+
KafkaSource.builder()
79+
.set_topics('test_avro_topic')
80+
.set_value_only_deserializer(deserialization_schema)
81+
.set_properties({'bootstrap.servers':'localhost:9092','group.id':'test_group_1'})
82+
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
83+
.build()
7484
)
75-
kafka_consumer.set_start_from_earliest()
7685

77-
env.add_source(kafka_consumer).print()
86+
ds=env.from_source(
87+
kafka_source,
88+
watermark_strategy=WatermarkStrategy.no_watermarks(),
89+
source_name="kafka source"
90+
)
91+
92+
ds.print()
7893
env.execute()
7994

8095

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
importjson
19+
importlogging
20+
importsys
21+
22+
frompyflink.commonimportTypes,ByteArraySchema,WatermarkStrategy
23+
frompyflink.datastreamimportStreamExecutionEnvironment
24+
frompyflink.datastream.connectors.kafkaimportKafkaSource, \
25+
KafkaOffsetsInitializer,KafkaSink,KafkaRecordSerializationSchema
26+
27+
28+
# This example works since Flink 2.0 since ByteArraySchema was introduced in Flink 2.0
29+
30+
# Make sure that the Kafka cluster is started and the topic 'test_json_topic' is
31+
# created before executing this job.
32+
defwrite_to_kafka(env):
33+
data= [
34+
(json.dumps({
35+
"id":1,
36+
"country":"USA"
37+
}).encode("utf-8"),),
38+
(json.dumps({
39+
"id":2,
40+
"country":"Canada"
41+
}).encode("utf-8"),),
42+
(json.dumps({
43+
"id":3,
44+
"country":"Germany"
45+
}).encode("utf-8"),)
46+
]
47+
type_info=Types.ROW([Types.PRIMITIVE_ARRAY(Types.BYTE())])
48+
ds=env.from_collection(data,type_info=type_info)
49+
50+
# declare the output type as Types.PRIMITIVE_ARRAY(Types.BYTE()),
51+
# otherwise, Types.PICKLED_BYTE_ARRAY() will be used by default, it will
52+
# use pickler to serialize the result byte array which is unnecessary
53+
ds=ds.map(lambdax:x[0],output_type=Types.PRIMITIVE_ARRAY(Types.BYTE()))
54+
55+
record_serializer=KafkaRecordSerializationSchema.builder() \
56+
.set_topic('test_bytearray_topic') \
57+
.set_value_serialization_schema(ByteArraySchema()) \
58+
.build()
59+
kafka_sink= (
60+
KafkaSink.builder()
61+
.set_record_serializer(record_serializer)
62+
.set_bootstrap_servers('localhost:9092')
63+
.set_property("group.id","test_group")
64+
.build()
65+
)
66+
67+
ds.sink_to(kafka_sink)
68+
env.execute()
69+
70+
71+
defread_from_kafka(env):
72+
kafka_source= (
73+
KafkaSource.builder()
74+
.set_topics('test_bytearray_topic')
75+
.set_value_only_deserializer(ByteArraySchema())
76+
.set_properties({'bootstrap.servers':'localhost:9092','group.id':'test_group_1'})
77+
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
78+
.build()
79+
)
80+
81+
ds=env.from_source(
82+
kafka_source,
83+
watermark_strategy=WatermarkStrategy.no_watermarks(),
84+
source_name="kafka source"
85+
)
86+
87+
# the data read out from the source is byte array, decode it as a string
88+
ds.map(lambdadata:data.decode("utf-8")).print()
89+
env.execute()
90+
91+
92+
if__name__=='__main__':
93+
logging.basicConfig(stream=sys.stdout,level=logging.INFO,format="%(message)s")
94+
95+
env=StreamExecutionEnvironment.get_execution_environment()
96+
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")
97+
98+
print("start writing data to kafka")
99+
write_to_kafka(env)
100+
101+
print("start reading data from kafka")
102+
read_from_kafka(env)

‎flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py‎

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
importlogging
1919
importsys
2020

21-
frompyflink.commonimportTypes
21+
frompyflink.commonimportTypes,WatermarkStrategy
2222
frompyflink.datastreamimportStreamExecutionEnvironment
23-
frompyflink.datastream.connectors.kafkaimportFlinkKafkaProducer,FlinkKafkaConsumer
23+
frompyflink.datastream.connectors.kafkaimport (KafkaRecordSerializationSchema,KafkaSink,
24+
KafkaSource,KafkaOffsetsInitializer)
2425
frompyflink.datastream.formats.csvimportCsvRowSerializationSchema,CsvRowDeserializationSchema
2526

2627

@@ -33,29 +34,43 @@ def write_to_kafka(env):
3334
type_info=type_info)
3435

3536
serialization_schema=CsvRowSerializationSchema.Builder(type_info).build()
36-
kafka_producer=FlinkKafkaProducer(
37-
topic='test_csv_topic',
38-
serialization_schema=serialization_schema,
39-
producer_config={'bootstrap.servers':'localhost:9092','group.id':'test_group'}
37+
record_serializer=KafkaRecordSerializationSchema.builder() \
38+
.set_topic('test_csv_topic') \
39+
.set_value_serialization_schema(serialization_schema) \
40+
.build()
41+
kafka_sink= (
42+
KafkaSink.builder()
43+
.set_record_serializer(record_serializer)
44+
.set_bootstrap_servers('localhost:9092')
45+
.set_property("group.id","test_group")
46+
.build()
4047
)
4148

4249
# note that the output type of ds must be RowTypeInfo
43-
ds.add_sink(kafka_producer)
50+
ds.sink_to(kafka_sink)
4451
env.execute()
4552

4653

4754
defread_from_kafka(env):
4855
type_info=Types.ROW([Types.INT(),Types.STRING()])
4956
deserialization_schema=CsvRowDeserializationSchema.Builder(type_info).build()
5057

51-
kafka_consumer=FlinkKafkaConsumer(
52-
topics='test_csv_topic',
53-
deserialization_schema=deserialization_schema,
54-
properties={'bootstrap.servers':'localhost:9092','group.id':'test_group_1'}
58+
kafka_source= (
59+
KafkaSource.builder()
60+
.set_topics('test_csv_topic')
61+
.set_value_only_deserializer(deserialization_schema)
62+
.set_properties({'bootstrap.servers':'localhost:9092','group.id':'test_group_1'})
63+
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
64+
.build()
5565
)
56-
kafka_consumer.set_start_from_earliest()
5766

58-
env.add_source(kafka_consumer).print()
67+
ds=env.from_source(
68+
kafka_source,
69+
watermark_strategy=WatermarkStrategy.no_watermarks(),
70+
source_name="kafka source"
71+
)
72+
73+
ds.print()
5974
env.execute()
6075

6176

‎flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py‎

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
importlogging
1919
importsys
2020

21-
frompyflink.commonimportTypes
21+
frompyflink.commonimportTypes,WatermarkStrategy
2222
frompyflink.datastreamimportStreamExecutionEnvironment
23-
frompyflink.datastream.connectors.kafkaimportFlinkKafkaProducer,FlinkKafkaConsumer
23+
frompyflink.datastream.connectors.kafkaimport (KafkaRecordSerializationSchema,KafkaSink,
24+
KafkaSource,KafkaOffsetsInitializer)
2425
frompyflink.datastream.formats.jsonimportJsonRowSerializationSchema,JsonRowDeserializationSchema
2526

2627

@@ -35,29 +36,43 @@ def write_to_kafka(env):
3536
serialization_schema=JsonRowSerializationSchema.Builder() \
3637
.with_type_info(type_info) \
3738
.build()
38-
kafka_producer=FlinkKafkaProducer(
39-
topic='test_json_topic',
40-
serialization_schema=serialization_schema,
41-
producer_config={'bootstrap.servers':'localhost:9092','group.id':'test_group'}
39+
record_serializer=KafkaRecordSerializationSchema.builder() \
40+
.set_topic('test_json_topic') \
41+
.set_value_serialization_schema(serialization_schema) \
42+
.build()
43+
kafka_sink= (
44+
KafkaSink.builder()
45+
.set_record_serializer(record_serializer)
46+
.set_bootstrap_servers('localhost:9092')
47+
.set_property("group.id","test_group")
48+
.build()
4249
)
4350

4451
# note that the output type of ds must be RowTypeInfo
45-
ds.add_sink(kafka_producer)
52+
ds.sink_to(kafka_sink)
4653
env.execute()
4754

4855

4956
defread_from_kafka(env):
5057
deserialization_schema=JsonRowDeserializationSchema.Builder() \
5158
.type_info(Types.ROW([Types.INT(),Types.STRING()])) \
5259
.build()
53-
kafka_consumer=FlinkKafkaConsumer(
54-
topics='test_json_topic',
55-
deserialization_schema=deserialization_schema,
56-
properties={'bootstrap.servers':'localhost:9092','group.id':'test_group_1'}
60+
kafka_source= (
61+
KafkaSource.builder()
62+
.set_topics('test_json_topic')
63+
.set_value_only_deserializer(deserialization_schema)
64+
.set_properties({'bootstrap.servers':'localhost:9092','group.id':'test_group_1'})
65+
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
66+
.build()
67+
)
68+
69+
ds=env.from_source(
70+
kafka_source,
71+
watermark_strategy=WatermarkStrategy.no_watermarks(),
72+
source_name="kafka source"
5773
)
58-
kafka_consumer.set_start_from_earliest()
5974

60-
env.add_source(kafka_consumer).print()
75+
ds.print()
6176
env.execute()
6277

6378

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp