2020import com .telkomdev .producer .serializer .ProductAvroSerializer ;
2121import com .telkomdev .producer .serializer .ProductJsonSerializer ;
2222import com .telkomdev .producer .serializer .ProductProtobufSerializer ;
23- import org .apache .kafka .clients .producer .KafkaProducer ;
24- import org .apache .kafka .clients .producer .Producer ;
25- import org .apache .kafka .clients .producer .ProducerConfig ;
26- import org .apache .kafka .clients .producer .ProducerRecord ;
2723
2824import java .util .ArrayList ;
2925import java .util .List ;
30- import java .util .Properties ;
3126import java .util .Scanner ;
3227
3328public class App {
@@ -48,34 +43,27 @@ public static void main(String[] args) {
4843System .exit (0 );
4944 }
5045
51- Properties producerConfig =new Properties ();
52-
53- producerConfig .put (ProducerConfig .CLIENT_ID_CONFIG ,"client-1" );
54- producerConfig .put (ProducerConfig .RETRIES_CONFIG ,0 );
55- producerConfig .put (ProducerConfig .ACKS_CONFIG ,"all" );
56-
57- // kafka brokers
58- producerConfig .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ,brokers );
59- producerConfig .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG ,org .apache .kafka .common .serialization .ByteArraySerializer .class .getName ());
60-
6146// send String data
62- //producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName() );
47+ //String valueSerializer = org.apache.kafka.common.serialization.StringSerializer.class.getName();
6348
6449// send Protocol Buffer data
65- producerConfig . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , ProductProtobufSerializer .class .getName () );
50+ String valueSerializer = ProductProtobufSerializer .class .getName ();
6651
6752// send JSON data
68- //producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductJsonSerializer.class.getName());
53+ // String valueSerializer = ProductJsonSerializer.class.getName());
6954
7055// send AVRO data
71- //producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductAvroSerializer.class.getName());
56+ // String valueSerializer = producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductAvroSerializer.class.getName();
57+
58+ final MyKafkaProducer <String ,Product >producer =new MyKafkaProducer ("client-1" ,
59+ org .apache .kafka .common .serialization .StringSerializer .class .getName (),
60+ valueSerializer ,brokers );
7261
73- Producer producer =new KafkaProducer <String ,String >(producerConfig );
7462
7563// read input
7664Scanner in =new Scanner (System .in );
7765
78- System .out .println ("Type Message (type 'exit' to quit)" );
66+ System .out .println ("Click Enter to send message (type 'exit' to quit)" );
7967String input =in .nextLine ();
8068
8169while (!input .equals ("exit" )) {
@@ -91,12 +79,11 @@ public static void main(String[] args) {
9179images .add ("wuriyanto.com/img2" );
9280p .setImages (images );
9381
94- ProducerRecord <String ,Product >record =new ProducerRecord <String ,Product >(topic ,p );
9582
9683try {
9784System .out .println (input );
9885
99- producer .send (record );
86+ producer .send (topic , p );
10087input =in .nextLine ();
10188 }catch (Exception ex ) {
10289System .out .println ("error send data to kafka: " +ex .getMessage ());
@@ -105,6 +92,6 @@ public static void main(String[] args) {
10592 }
10693
10794in .close ();
108- producer .close ();
95+ producer .getProducer (). close ();
10996 }
11097}