First message lost between pub/sub sockets #4766

Open
@jmspereira

Description

Hey everyone,
Sorry for opening another issue about this, but my question differs slightly from the other reports I found.

I have a middleware that uses ZeroMQ as its communication layer, and in some specific cases I need to receive every message. To achieve this, I use a monitor socket and only publish the first message once N connections have been established (N is a magic number whose value I know in the cases where I need this). However, the first message still sometimes gets lost. I isolated the core logic in the following minimal example (which does not reproduce the issue when run locally, only when run on a distributed set of machines):

```python
import zmq
import time
import multiprocessing
from zmq import Flag, PollEvent, Event
from zmq.utils.monitor import recv_monitor_message


def pub_process():
    # Create a context and a PUB socket
    context = zmq.Context()
    pub_socket = context.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5555")

    # Create monitor socket
    monitor_socket = pub_socket.get_monitor_socket()

    print("Waiting for subscriber to connect...")

    # Wait for the subscriber to connect
    while True:
        result = monitor_socket.poll(10)
        if result != PollEvent.POLLIN:
            continue
        event = recv_monitor_message(monitor_socket, Flag.DONTWAIT)
        if event["event"] == Event.ACCEPTED:
            print("Subscriber connected!")
            monitor_socket.close()
            break

    # Now send messages after the subscriber is connected
    for i in range(5):
        message = f"Message {i}"
        print(f"Publishing: {message}")
        pub_socket.send_string(message)
        time.sleep(1)

    # Send a stop message to the subscriber, then close the socket
    pub_socket.send_string("STOP")
    pub_socket.close()
    context.term()
    print("Publisher process finished")


def sub_process():
    # Create a context and a SUB socket
    context = zmq.Context()
    sub_socket = context.socket(zmq.SUB)
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
    sub_socket.connect("tcp://localhost:5555")

    # Poller setup to check for messages
    poller = zmq.Poller()
    poller.register(sub_socket, zmq.POLLIN)

    print("Waiting for messages...")

    # Wait for messages
    while True:
        socks = dict(poller.poll())
        if sub_socket in socks:
            message = sub_socket.recv_string()
            if message == "STOP":
                break
            print(f"Received: {message}")

    # Close the socket and terminate the context
    sub_socket.close()
    context.term()
    print("Subscriber process finished")


def main():
    pub_proc = multiprocessing.Process(target=pub_process, daemon=True)
    sub_proc = multiprocessing.Process(target=sub_process, daemon=True)
    pub_proc.start()
    sub_proc.start()
    pub_proc.join()
    sub_proc.join()


if __name__ == "__main__":
    main()
```

Issue #4746 suggests using monitor sockets, which I am doing.
Issue #2267 suggests that the problem occurs when the PUB connects to the SUB, which I am not doing.
According to the documentation at https://zguide.zeromq.org/docs/chapter1/#Getting-the-Message-Out, if I understand it correctly, this could happen if the PUB is already sending messages when the SUB connects. That is not the case here, because the PUB only publishes once the connections are established.

So, does the monitor socket guarantee that the SUB is going to receive the message, or do I need to implement logic with another socket to guarantee that the PUB/SUB connection is fully established?
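
For reference, the "another socket" variant I have in mind would look roughly like the sketch below. As far as I understand, `Event.ACCEPTED` only reports that the TCP connection was accepted, so the subscription itself may not have reached the PUB yet. In this sketch the publisher keeps sending a `PING` on the PUB socket, and each subscriber acknowledges over a separate REQ/REP pair only after it has actually received a `PING` through its SUB socket. The addresses, the `PING`/`STOP` control strings, and the function names are all placeholders I made up for illustration, and threads stand in for the separate machines.

```python
import threading
import zmq

PUB_ADDR = "tcp://127.0.0.1:5561"   # hypothetical data endpoint
SYNC_ADDR = "tcp://127.0.0.1:5562"  # hypothetical handshake endpoint


def publisher(ctx, n_subscribers, payloads):
    pub = ctx.socket(zmq.PUB)
    pub.bind(PUB_ADDR)
    sync = ctx.socket(zmq.REP)
    sync.bind(SYNC_ADDR)

    # Keep pinging until every expected subscriber has confirmed that a
    # PING arrived through its SUB socket (so its subscription is live).
    ready = 0
    while ready < n_subscribers:
        pub.send_string("PING")
        if sync.poll(50):       # wait up to 50 ms for an acknowledgement
            sync.recv()
            sync.send(b"")
            ready += 1

    # Only now publish the real messages.
    for payload in payloads:
        pub.send_string(payload)
    pub.send_string("STOP")
    pub.close()
    sync.close()


def subscriber(ctx, received):
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    sub.connect(PUB_ADDR)
    sync = ctx.socket(zmq.REQ)
    sync.connect(SYNC_ADDR)

    acked = False
    while True:
        message = sub.recv_string()
        if message == "PING":
            if not acked:
                # First PING seen: tell the publisher this subscriber is ready.
                sync.send(b"")
                sync.recv()
                acked = True
            continue
        if message == "STOP":
            break
        received.append(message)
    sub.close()
    sync.close()


def run_demo(n_subscribers=2):
    ctx = zmq.Context()
    payloads = [f"Message {i}" for i in range(5)]
    results = [[] for _ in range(n_subscribers)]
    threads = [threading.Thread(target=subscriber, args=(ctx, r)) for r in results]
    threads.append(
        threading.Thread(target=publisher, args=(ctx, n_subscribers, payloads))
    )
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    ctx.term()
    return payloads, results
```

Calling `run_demo(2)` returns the list of published payloads and one list of received messages per subscriber, which can then be compared. The cost of this approach is an extra socket pair and the `PING` traffic during startup, which is what I would like to avoid if the monitor socket alone is enough.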

Best,
Jorge
