- Notifications
You must be signed in to change notification settings - Fork469
Description
Description
I am doing a POC with Watermill and am trying to put something into the watermill message context during the unmarshal from kafka to watermill using a custom unmarshaller. However, the value I put in during that step seems to get lost when the message is handled during with a ConsumerHandler. I couldn't find information on this and was wondering if someone could help me look and see if I am doing something wrong, if this is expected, or if this is a bug.
Steps to reproduce
Trimmed down version of code that should show the issue:
- Run provided docker compose
docker-compose up -d(or have a kafka broker available) - Run the provided code (changing the broker value if needed)
docker-compose.yaml
version:'3.8'services:# Copied from https://github.com/confluentinc/cp-all-in-one/blob/6b81eae4be858c2c2cbcbe1dc37439c076148e22/cp-all-in-one-kraft/docker-compose.ymlbroker:image:confluentinc/cp-server:7.5.0hostname:brokercontainer_name:brokerports: -"9092:9092" -"9101:9101"environment:KAFKA_NODE_ID:1KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'KAFKA_ADVERTISED_LISTENERS:'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:0KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_JMX_PORT:9101KAFKA_JMX_HOSTNAME:localhostKAFKA_METRIC_REPORTERS:io.confluent.telemetry.reporter.TelemetryReporterCONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS:broker:29092CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS:1KAFKA_PROCESS_ROLES:'broker,controller'KAFKA_CONTROLLER_QUORUM_VOTERS:'1@broker:29093'KAFKA_LISTENERS:'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'KAFKA_INTER_BROKER_LISTENER_NAME:'PLAINTEXT'KAFKA_CONTROLLER_LISTENER_NAMES:'CONTROLLER'KAFKA_LOG_DIRS:'/tmp/kraft-combined-logs'CONFLUENT_METRICS_ENABLE:'true'CONFLUENT_SUPPORT_CUSTOMER_ID:'anonymous'CLUSTER_ID:'MkU3OEVBNTcwNTJENDM2Qk'KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR:1KAFKA_CLUSTER_LINK_METADATA_TOPIC_REPLICATION_FACTOR:1KAFKA_CONFLUENT_CLUSTER_LINK_METADATA_TOPIC_REPLICATION_FACTOR:1KAFKA_CONFLUENT_DURABILITY_TOPIC_REPLICATION_FACTOR:1KAFKA_CONFLUENT_TIER_METADATA_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR:1
package mainimport ("context""fmt""log/slog""os""time""github.com/IBM/sarama""github.com/ThreeDotsLabs/watermill"wmkafka"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/plugin")var (publisher message.PublisherglobalLogger*slog.Logger)typeTestMarshallerstruct {defaultMarshaller wmkafka.DefaultMarshaler}funcNewTestMarshaller()*TestMarshaller {return&TestMarshaller{defaultMarshaller: wmkafka.DefaultMarshaler{},}}func (m*TestMarshaller)Marshal(topicstring,msg*message.Message) (*sarama.ProducerMessage,error) {returnm.defaultMarshaller.Marshal(topic,msg)}func (m*TestMarshaller)Unmarshal(msg*sarama.ConsumerMessage) (*message.Message,error) {wmMsg,err:=m.defaultMarshaller.Unmarshal(msg)iferr!=nil {returnnil,err}wmMsg.SetContext(context.WithValue(wmMsg.Context(),"foo","bar"))globalLogger.Info("-----------------Unmarshalling message----------------")globalLogger.Info(fmt.Sprintf("%v",wmMsg.Context()))globalLogger.Info("------------------------------------------------------")returnwmMsg,nil}funcproduce() {for {time.Sleep(10*time.Second)globalLogger.Info("Producing Message")publisher.Publish("testTopic",message.NewMessage(watermill.NewUUID(), []byte("hello")))}}funcconsumeTestTopic(msg*message.Message)error {globalLogger.Info("-------------Received message on testTopic-------------")globalLogger.Info(fmt.Sprintf("%v",msg.Context()))globalLogger.Info("------------------------------------------------------")returnnil}funcmain() {globalLogger=slog.New(slog.NewTextHandler(os.Stdout,&slog.HandlerOptions{}))// Create my kafka subscriberkafkaConsumer,consErr:=wmkafka.NewSubscriber(wmkafka.SubscriberConfig{Brokers: []string{"localhost:9092"},Unmarshaler:NewTestMarshaller(),ConsumerGroup:"topicGroup",},watermill.NewSlogLogger(globalLogger))ifconsErr!=nil {globalLogger.Error("failed to create consumer","error",consErr)panic(consErr)}router,err:=message.NewRouter(message.RouterConfig{},watermill.NewSlogLogger(globalLogger))iferr!=nil {globalLogger.Error("failed to create router","error",err)panic(err)}deferrouter.Close()router.AddPlugin(plugin.SignalsHandler)router.AddConsumerHandler("consumeTestTopic","testTopic",kafkaConsumer,consumeTestTopic)varprodErrerror// Create my kafka publisherpublisher,prodErr=wmkafka.NewPublisher(wmkafka.PublisherConfig{Brokers: []string{"localhost:9092"},Marshaler:NewTestMarshaller(),},watermill.NewSlogLogger(globalLogger))ifprodErr!=nil {globalLogger.Error("failed to create publisher","error",prodErr)panic(prodErr)}goproduce()router.Run(context.Background())}
Expected behavior
My context value should be present when the message is handled.
Actual behavior
My context value is not there - however there is a context with values.
Possible solution
I tried looking to see where the context could be overwritten - but the only thing I found seems to be additive. However, I am not as familiar with the watermill library source.