Hey everyone,
Sorry for opening another issue about this, but my question differs slightly from the other reports I found.
I have a middleware that uses ZeroMQ as its communication layer, and in some specific cases I need to receive every message. To achieve this I use the monitor socket, and I only publish the first message once N connections have been established (N is a magic number whose value I know in the cases where I need this). However, the first message still seems to get lost sometimes. I isolated the core logic in the following minimal example (it does not reproduce the issue when run locally, only when run across a distributed set of machines):
```python
import zmq
import time
import multiprocessing
from zmq import Flag, PollEvent, Event
from zmq.utils.monitor import recv_monitor_message


def pub_process():
    # Create a context and a PUB socket
    context = zmq.Context()
    pub_socket = context.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5555")

    # Create monitor socket
    monitor_socket = pub_socket.get_monitor_socket()

    print("Waiting for subscriber to connect...")
    # Wait for the subscriber to connect
    while True:
        result = monitor_socket.poll(10)
        if result != PollEvent.POLLIN:
            continue
        event = recv_monitor_message(monitor_socket, Flag.DONTWAIT)
        if event["event"] == Event.ACCEPTED:
            print("Subscriber connected!")
            monitor_socket.close()
            break

    # Now send messages after the subscriber is connected
    for i in range(5):
        message = f"Message {i}"
        print(f"Publishing: {message}")
        pub_socket.send_string(message)
        time.sleep(1)

    # Send a stop message to the subscriber, then close the socket
    pub_socket.send_string("STOP")
    pub_socket.close()
    context.term()
    print("Publisher process finished")


def sub_process():
    # Create a context and a SUB socket
    context = zmq.Context()
    sub_socket = context.socket(zmq.SUB)
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
    sub_socket.connect("tcp://localhost:5555")

    # Poller setup to check for messages
    poller = zmq.Poller()
    poller.register(sub_socket, zmq.POLLIN)

    print("Waiting for messages...")
    # Wait for messages
    while True:
        socks = dict(poller.poll())
        if sub_socket in socks:
            message = sub_socket.recv_string()
            if message == "STOP":
                break
            print(f"Received: {message}")

    # Close the socket and terminate the context
    sub_socket.close()
    context.term()
    print("Subscriber process finished")


def main():
    pub_proc = multiprocessing.Process(target=pub_process, daemon=True)
    sub_proc = multiprocessing.Process(target=sub_process, daemon=True)
    pub_proc.start()
    sub_proc.start()
    pub_proc.join()
    sub_proc.join()


if __name__ == "__main__":
    main()
```
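The description mentions waiting for N connections, while the example waits for a single ACCEPTED event. Below is a minimal sketch of the N-connection variant; `wait_for_accepted` and `start_accept_monitor` are hypothetical helper names, not pyzmq API. It assumes the monitor is attached right after `bind()` and before any subscriber can connect, since events that happen before monitoring starts are not reported. Keep in mind that ACCEPTED is emitted when the TCP connection is accepted, before the ZMTP handshake and before the SUB's subscription arrives, so this check alone cannot confirm that the subscription has reached the PUB.

```python
import time

import zmq
from zmq.utils.monitor import recv_monitor_message


def start_accept_monitor(pub_socket):
    """Hypothetical helper: attach a monitor that only reports ACCEPTED events.

    Call this right after bind() and before subscribers can connect,
    because events that happen before monitoring starts are not reported.
    """
    return pub_socket.get_monitor_socket(zmq.EVENT_ACCEPTED)


def wait_for_accepted(monitor, n, timeout_s=10.0):
    """Hypothetical helper: wait until `n` ACCEPTED events arrive on `monitor`.

    `monitor` is the PAIR socket returned by get_monitor_socket().
    Returns True once `n` connections were accepted, False on overall timeout.
    """
    deadline = time.monotonic() + timeout_s
    accepted = 0
    while accepted < n:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            return False  # overall timeout reached
        if not monitor.poll(remaining_ms):
            continue  # nothing arrived; the loop re-checks the deadline
        event = recv_monitor_message(monitor)
        if event["event"] == zmq.EVENT_ACCEPTED:
            accepted += 1
    return True
```

The helpers leave the monitor open; once done, call `pub_socket.disable_monitor()` and close the monitor socket.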
Issue #4746 suggests using monitor sockets, which I am already doing.
Issue #2267 suggests that the problem occurs when the PUB connects to the SUB, which I am not doing (the PUB binds and the SUB connects).
According to the guide (https://zguide.zeromq.org/docs/chapter1/#Getting-the-Message-Out), if I understand it correctly, this can happen when the PUB is already sending messages at the moment the SUB connects. That should not be the case here, because the PUB only starts publishing after the connections are established.
So, does the monitor socket guarantee that the SUB is going to receive the message, or do I need to implement logic with another socket to guarantee that the PUB/SUB connection is fully established?
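For comparison, one alternative that avoids the monitor is to make the publisher an XPUB socket. XPUB hands each subscription to the application as a message (a `\x01` byte followed by the topic) after registering it, so receiving N such messages should indicate that N subscriptions have reached the publisher. This is a swapped-in technique, not something the monitor API provides, and `make_verbose_xpub` and `wait_for_subscribers` are hypothetical helper names. `XPUB_VERBOSE` is set because by default XPUB only forwards the first subscription to a given topic, so N subscribers to `""` would otherwise yield a single message. The zguide's chapter 2 ("Node Coordination") shows a different approach using a separate REQ/REP socket pair.

```python
import zmq


def make_verbose_xpub(ctx, endpoint):
    """Hypothetical helper: create and bind an XPUB that reports every subscription."""
    xpub = ctx.socket(zmq.XPUB)
    # Forward all subscriptions, including duplicates from several SUBs
    xpub.setsockopt(zmq.XPUB_VERBOSE, 1)
    xpub.bind(endpoint)
    return xpub


def wait_for_subscribers(xpub, n, timeout_ms=10_000):
    """Hypothetical helper: block until `n` subscribe messages arrive.

    Waits up to `timeout_ms` for each subscription; returns False if one
    of those waits times out, True once `n` subscriptions were seen.
    """
    seen = 0
    while seen < n:
        if not xpub.poll(timeout_ms):
            return False  # no subscription arrived within timeout_ms
        msg = xpub.recv()
        # Subscribe messages start with byte 1, unsubscribe messages with byte 0
        if msg and msg[0] == 1:
            seen += 1
    return True
```

With this, the publisher would call `wait_for_subscribers(xpub, N)` before sending the first message, instead of counting ACCEPTED events.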