Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[watermill-kafka] Context value added during unmarshal missing #633

Open
@mike-gurney

Description

@mike-gurney

Description

I am doing a POC with Watermill and am trying to put something into the watermill message context during the unmarshal from kafka to watermill using a custom unmarshaller. However, the value I put in during that step seems to get lost when the message is handled during with a ConsumerHandler. I couldn't find information on this and was wondering if someone could help me look and see if I am doing something wrong, if this is expected, or if this is a bug.

Steps to reproduce

Trimmed down version of code that should show the issue:

  1. Run provided docker composedocker-compose up -d (or have a kafka broker available)
  2. Run the provided code (changing the broker value if needed)

docker-compose.yaml

version:'3.8'services:# Copied from https://github.com/confluentinc/cp-all-in-one/blob/6b81eae4be858c2c2cbcbe1dc37439c076148e22/cp-all-in-one-kraft/docker-compose.ymlbroker:image:confluentinc/cp-server:7.5.0hostname:brokercontainer_name:brokerports:      -"9092:9092"      -"9101:9101"environment:KAFKA_NODE_ID:1KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'KAFKA_ADVERTISED_LISTENERS:'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:0KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_JMX_PORT:9101KAFKA_JMX_HOSTNAME:localhostKAFKA_METRIC_REPORTERS:io.confluent.telemetry.reporter.TelemetryReporterCONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS:broker:29092CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS:1KAFKA_PROCESS_ROLES:'broker,controller'KAFKA_CONTROLLER_QUORUM_VOTERS:'1@broker:29093'KAFKA_LISTENERS:'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'KAFKA_INTER_BROKER_LISTENER_NAME:'PLAINTEXT'KAFKA_CONTROLLER_LISTENER_NAMES:'CONTROLLER'KAFKA_LOG_DIRS:'/tmp/kraft-combined-logs'CONFLUENT_METRICS_ENABLE:'true'CONFLUENT_SUPPORT_CUSTOMER_ID:'anonymous'CLUSTER_ID:'MkU3OEVBNTcwNTJENDM2Qk'KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR:1KAFKA_CLUSTER_LINK_METADATA_TOPIC_REPLICATION_FACTOR:1KAFKA_CONFLUENT_CLUSTER_LINK_METADATA_TOPIC_REPLICATION_FACTOR:1KAFKA_CONFLUENT_DURABILITY_TOPIC_REPLICATION_FACTOR:1KAFKA_CONFLUENT_TIER_METADATA_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR:1
package mainimport ("context""fmt""log/slog""os""time""github.com/IBM/sarama""github.com/ThreeDotsLabs/watermill"wmkafka"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/plugin")var (publisher    message.PublisherglobalLogger*slog.Logger)typeTestMarshallerstruct {defaultMarshaller wmkafka.DefaultMarshaler}funcNewTestMarshaller()*TestMarshaller {return&TestMarshaller{defaultMarshaller: wmkafka.DefaultMarshaler{},}}func (m*TestMarshaller)Marshal(topicstring,msg*message.Message) (*sarama.ProducerMessage,error) {returnm.defaultMarshaller.Marshal(topic,msg)}func (m*TestMarshaller)Unmarshal(msg*sarama.ConsumerMessage) (*message.Message,error) {wmMsg,err:=m.defaultMarshaller.Unmarshal(msg)iferr!=nil {returnnil,err}wmMsg.SetContext(context.WithValue(wmMsg.Context(),"foo","bar"))globalLogger.Info("-----------------Unmarshalling message----------------")globalLogger.Info(fmt.Sprintf("%v",wmMsg.Context()))globalLogger.Info("------------------------------------------------------")returnwmMsg,nil}funcproduce() {for {time.Sleep(10*time.Second)globalLogger.Info("Producing Message")publisher.Publish("testTopic",message.NewMessage(watermill.NewUUID(), []byte("hello")))}}funcconsumeTestTopic(msg*message.Message)error {globalLogger.Info("-------------Received message on testTopic-------------")globalLogger.Info(fmt.Sprintf("%v",msg.Context()))globalLogger.Info("------------------------------------------------------")returnnil}funcmain() {globalLogger=slog.New(slog.NewTextHandler(os.Stdout,&slog.HandlerOptions{}))// Create my kafka subscriberkafkaConsumer,consErr:=wmkafka.NewSubscriber(wmkafka.SubscriberConfig{Brokers:       []string{"localhost:9092"},Unmarshaler:NewTestMarshaller(),ConsumerGroup:"topicGroup",},watermill.NewSlogLogger(globalLogger))ifconsErr!=nil {globalLogger.Error("failed to create consumer","error",consErr)panic(consErr)}router,err:=message.NewRouter(message.RouterConfig{},watermill.NewSlogLogger(globalLogger))iferr!=nil {globalLogger.Error("failed to create router","error",err)panic(err)}deferrouter.Close()router.AddPlugin(plugin.SignalsHandler)router.AddConsumerHandler("consumeTestTopic","testTopic",kafkaConsumer,consumeTestTopic)varprodErrerror// Create my kafka publisherpublisher,prodErr=wmkafka.NewPublisher(wmkafka.PublisherConfig{Brokers:   []string{"localhost:9092"},Marshaler:NewTestMarshaller(),},watermill.NewSlogLogger(globalLogger))ifprodErr!=nil {globalLogger.Error("failed to create publisher","error",prodErr)panic(prodErr)}goproduce()router.Run(context.Background())}

Expected behavior

My context value should be present when the message is handled.

Actual behavior

My context value is not there - however there is a context with values.

Possible solution

I tried looking to see where the context could be overwritten - but the only thing I found seems to be additive. However, I am not as familiar with the watermill library source.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions


      [8]ページ先頭

      ©2009-2025 Movatter.jp