One of the main questions that AMQP users have when coming to 0MQ is how we do reliability.
So I'm collecting requirements for reliability on top of 0MQ. My idea is to build this as an application layer on top of 0MQ, using a different API and a separate protocol that sits on top of the 0MQ SPB framing.
Here is a sketch of what seems to be the simplest design for fire-and-forget reliability:
- It covers only one-to-one delivery of messages (e.g. for trade execution or request-response).
- The publisher delivers a message to a local dispatcher service (perhaps based on a 0MQ device).
- The dispatcher queues the message on persistent storage and the publisher continues.
- In a second thread, the dispatcher delivers messages to their destinations.
- Recipients reply to messages with an acknowledgement.
- Recipients store received messages and if they get a duplicate, they discard it but resend the acknowledgement.
- The dispatcher marks acknowledged messages, and ignores duplicate acknowledgements.
In the most brutal implementation, both sides use a lightweight database to hold messages, and store everything. There are more intelligent ways to hold persistent queues, but that's an implementation detail.
The above design covers crashes in the publisher, consumer, and network, as far as I can see. Throughput would depend mainly on message size, since messages can easily be batched and acknowledged in blocks.
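To make the dispatcher side concrete, here is a minimal sketch. It assumes Python's built-in sqlite3 as the "light database" and models the transport as a plain callable. `Dispatcher`, `publish`, `deliver_pending` and `on_ack` are hypothetical names, not a 0MQ API. Messages are persisted before `publish` returns, resent until acknowledged, and duplicate acks are harmless.

```python
import sqlite3


class Dispatcher:
    """Hypothetical dispatcher: persist first, deliver later, mark on ack."""

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS outbox ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " body BLOB NOT NULL,"
            " acked INTEGER NOT NULL DEFAULT 0)"
        )
        self.db.commit()

    def publish(self, body):
        # The publisher continues as soon as the row is committed.
        cur = self.db.execute("INSERT INTO outbox (body) VALUES (?)", (body,))
        self.db.commit()
        return cur.lastrowid

    def deliver_pending(self, send):
        # Delivery pass: (re)send everything that has not been acked yet.
        rows = self.db.execute(
            "SELECT id, body FROM outbox WHERE acked = 0 ORDER BY id"
        ).fetchall()
        for msg_id, body in rows:
            send(msg_id, body)
        return len(rows)

    def on_ack(self, msg_id):
        # Marking is idempotent, so duplicate acks are simply ignored.
        self.db.execute("UPDATE outbox SET acked = 1 WHERE id = ?", (msg_id,))
        self.db.commit()
```

Everything runs on one thread here for brevity. A real delivery thread would need its own sqlite3 connection, because connections are bound to the thread that created them by default. Since unacked rows are simply resent on the next pass, a crash between commit and ack causes redelivery rather than loss, which is why recipients must discard duplicates.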
Has anyone been implementing such reliability on top of 0MQ already?

I believe a good starting point for this discussion is the functionality offered by the JMS spec. It is well known, mostly understandable, and seems to satisfy many requirements in this space.
So, for delivery, these might be a start:
The above may pertain to more than one message at a time, as messages may be batched in brackets (transactions).
The client should be unaware of 'expected' problems in the infrastructure, such as the network being down, an overloaded server process, network congestion, and so forth. The client may be made aware of fatal errors, i.e., those from which no recovery is deemed possible, e.g., natural disasters. It might be feasible to include things such as full disks in this category, but that is perhaps up for debate, at least as to the moment when the error is returned to the client.
The same should apply to the server process when replying to messages.
I shall stop at this point, as I am not sure this is what is being asked for. Reliability is a rather loose, ill-defined term, so I do not want to waste people's time with unrelated content.
As another discussion point, you could look at the QoS models for MQTT (www.mqtt.org). QoS 2 = guaranteed once and only once, QoS 1 = guaranteed at least once, QoS 0 = fire and forget.
MQTT is a lightweight protocol, and simple to encode/decode. Having said that, there are gaping holes in the semantic parts of the spec. For example:
Anyway, I'm not proposing much be taken from this, but I think it's a useful comparison point.
I'm looking for "guaranteed at least once" delivery, and some sort of lightweight persistent queue which can be embedded in the 'client' or 'source' side. Writing message(s) into a queue gets them stored to disk and Ack(s) sent when the file is flushed. When message(s) have been forwarded and the Ack(s) received, then they are freed from disk.
I think the MQTT QoS 2 model implies that "guaranteed once and only once" can be implemented as a layer on top of "guaranteed at least once".
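One common way to get that layering is a receiver that remembers which message IDs it has already processed. This may not be how MQTT's own QoS 2 handshake works internally. The minimal sketch below assumes globally unique message IDs and uses an in-memory set in place of persistent storage. `DedupReceiver` is a hypothetical name.

```python
class DedupReceiver:
    """Hypothetical receiver: process each message ID once, always re-ack."""

    def __init__(self, handler):
        self.handler = handler
        # Stand-in for durable storage. In practice the "seen" record must be
        # stored atomically with the handler's result, or a crash between the
        # two would cause reprocessing.
        self.seen = set()

    def on_message(self, msg_id, body):
        if msg_id not in self.seen:
            self.handler(body)
            self.seen.add(msg_id)
        # Ack duplicates too: the sender's copy of the earlier ack may be lost.
        return ("ACK", msg_id)
```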
Here is a brief review of some protocols with guaranteed delivery:
RADIUS Accounting
(RFC2865, RFC2866)
RADIUS is not a general message delivery protocol, but sends encoded lists
of attribute-value pairs.
RADIUS accounting packets are sent from a Network Access Server (NAS) to a
RADIUS server to report session start, stop and statistics. They are sent
over UDP, and have a simple assured delivery mechanism.
The IDs do not need to be generated sequentially (although many NAS
implementations do so), and the Acct-Responses do not need to be received in
order. This allows for stateless proxying:
(Typically the proxy adds an attribute when forwarding the Acct-Request,
and when the response comes back to the proxy, it strips this attribute and
uses it to forward the response back to the NAS.)
Notes:
MQTT
http://www.mqtt.org/
MQTT is an IBM-defined lightweight messaging protocol over TCP, intended
for use in embedded applications.
MQTT defines three QoS levels: 0 (no assurance), 1 (at least once
delivery), 2 (exactly once delivery). We'll look at QoS 1 here.
Messages are sent with a 16-bit ID, and a PUBLISH is acknowledged by a
PUBACK with the same ID.
Apart from connection establishment, the protocol is symmetrical; if the
client has subscribed to a topic, then PUBLISH and PUBACK messages can go in
the reverse direction on the same TCP connection.
Similar to RADIUS, there is no requirement to use ID numbers in any
sequence. PUBACKs can arrive out of sequence, and there is no 'window' to
determine when it's safe to re-use any particular ID number.
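Without a window, the only safe rule is that an ID becomes reusable once its own PUBACK has arrived. Here is a sketch of one way to allocate 16-bit IDs under that rule. It is a hypothetical helper, not code from any MQTT library. It skips 0, assuming as in MQTT 3.1.1 that packet identifiers must be non-zero.

```python
class PacketIdAllocator:
    """Hypothetical allocator for 16-bit message IDs with no window.

    An ID is busy from PUBLISH until its PUBACK; acks may arrive in any order.
    """

    MAX_ID = 0xFFFF

    def __init__(self):
        self.in_flight = set()
        self.next_id = 1  # 0 is never used

    def allocate(self):
        if len(self.in_flight) >= self.MAX_ID:
            raise RuntimeError("all 65535 IDs are awaiting PUBACK")
        # Skip IDs whose PUBACK has not come back yet.
        while self.next_id in self.in_flight:
            self.next_id = self.next_id % self.MAX_ID + 1
        msg_id = self.next_id
        self.in_flight.add(msg_id)
        self.next_id = self.next_id % self.MAX_ID + 1  # wraps 65535 -> 1
        return msg_id

    def puback(self, msg_id):
        # Out-of-order acks are fine: each ID is released independently.
        self.in_flight.discard(msg_id)
```

Note the hard cap: with 16-bit IDs and no window, at most 65535 messages can be outstanding at once.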
The client is identified by a unique 1- to 23-character ID which it provides
when it connects to the server, so the (clientID, messageID) pair for each
in-transit message is unique. This allows the client to disconnect and
reconnect from a different IP/source port whilst still having outstanding
messages to deliver. But it prevents the same client from making multiple
connections to the broker using the same clientID.
The protocol specification is clear about the packet format on the wire,
but less clear about the semantics of assured delivery.
SMTP
(RFC2821)
The simplest of assured delivery mechanisms:
There's no ID because only one message is outstanding at a time in the
connection. If you want to send multiple messages in parallel, you have to
open multiple SMTP TCP connections.
If the 250 OK is lost, then the message may be retransmitted as a duplicate
(this does happen occasionally).
Sliding window protocols (TCP, X25)
These have the common characteristic of a wrapping sequence number (byte
pointer or packet number) and a window which says how much data can be
outstanding in transit before it is acknowledged.
A batch of bytes (or packets) can be acknowledged with a single ack, which
as an optimisation can be piggybacked on a data packet going in the opposite
direction.
These protocols are simple, well understood, and provide guarantees of
in-order delivery and non-duplication. However, they only work well for a
single connection between the two endpoints. This would be hard to map onto
0MQ if there were a variable number of connections between the same pair of
devices.
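For comparison, here is a toy sketch of the sliding-window idea. The sender may have at most `window` unacknowledged messages in flight. The receiver accepts only the next expected sequence number and drops duplicates and out-of-order frames. A single cumulative ack releases a whole batch. The sketch makes several simplifying assumptions: sequence numbers are unbounded Python integers rather than wrapping counters, retransmission timers are omitted, and all names are illustrative.

```python
from collections import deque


class WindowSender:
    def __init__(self, window):
        self.window = window
        self.next_seq = 0
        self.unacked = deque()  # (seq, payload) in send order

    def can_send(self):
        return len(self.unacked) < self.window

    def send(self, payload):
        if not self.can_send():
            raise RuntimeError("window full")
        seq = self.next_seq
        self.next_seq += 1
        self.unacked.append((seq, payload))
        return seq, payload

    def on_ack(self, ack):
        # Cumulative ack: everything with seq < ack has been received.
        while self.unacked and self.unacked[0][0] < ack:
            self.unacked.popleft()


class WindowReceiver:
    def __init__(self):
        self.expected = 0
        self.delivered = []

    def on_data(self, seq, payload):
        if seq == self.expected:
            self.delivered.append(payload)
            self.expected += 1
        # Duplicates and gaps are dropped; the ack names the next wanted seq.
        return self.expected
```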
Aside: IBM's reference C implementation of MQTT client ("IA93") has the ability to spool messages to disk. The default implementation is one message per file, but this is pluggable.
The problem is that the 16-bit sequence number is assigned *before* writing the message to disk, so you can only have 64K messages queued before it falls over. This means that any application which might have to collect hours or days of data while the broker is down has to have its own separate queuing mechanism anyway :-(
I'd suggest that 0MQ have an opaque binary tag as the message identifier (similar to the message topic prefix). The receiver of the data would just echo back this tag as the acknowledgement.
Not only is this extensible, it allows for some interesting applications. It allows a stateless broker which receives a message and routes it to one of N recipients. By prepending some state info to the ID before forwarding, when the Ack comes back it can remove the state info to know which source to return the Ack to.
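Here is a minimal sketch of the opaque-tag idea, assuming a hypothetical framing in which each message carries a byte-string tag that the receiver echoes back unchanged as its ack. The broker keeps no per-message state. It prepends a one-byte index for the source it received the message from, and strips that byte when the ack returns. The one-byte prefix and the function names are assumptions for illustration.

```python
def receiver_handle(tag, body):
    # The receiver never interprets the tag; echoing it back is the ack.
    return tag


def broker_forward(source_index, tag):
    # Stateless routing: the return path travels inside the tag itself.
    if not 0 <= source_index < 256:
        raise ValueError("this sketch encodes the source in one byte")
    return bytes([source_index]) + tag


def broker_return(ack_tag):
    # Strip our prefix to learn which source the ack belongs to.
    return ack_tag[0], ack_tag[1:]
```

Because each broker only touches its own prefix, chained brokers nest naturally: each hop peels off one byte as the ack travels back.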
The application I have in mind is a RADIUS server which receives RADIUS
accounting packets and forwards them to a central data warehouse for
processing.
The data warehouse and/or the intervening network connection may not be
available 100% of the time. So the RADIUS server will need to Ack the
messages back to the NAS (which can't hold them for very long), spool them
to persistent storage, and de-spool them later when available. It doesn't
need to be 100% bomb-proof but in the normal case it should survive a
process stop/start.
Another similar application might be forwarding syslogs over zeromq, where
you don't want to lose any syslog messages if the remote syslog server is
not reachable.
The following requirements come out of this.
In the API, I want to be able to send a message and then either recv() or
get a callback when the message has been successfully delivered or queued.
This is so that I can hold off sending the RADIUS Acct-Response until the
request has been safely processed or stored.
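One way such an API could look is sketched below with Python's `concurrent.futures.Future` as the completion handle. `send()` returns immediately, and the future resolves when the matching ack (keyed by message ID) comes back. At that point the Acct-Response can be released. `ReliableSender` and its methods are hypothetical; nothing like this exists in 0MQ itself.

```python
import itertools
from concurrent.futures import Future


class ReliableSender:
    def __init__(self, transmit):
        self.transmit = transmit  # hypothetical: puts (msg_id, body) on the wire
        self.ids = itertools.count(1)
        self.pending = {}  # msg_id -> Future awaiting its ack

    def send(self, body):
        msg_id = next(self.ids)
        fut = Future()
        self.pending[msg_id] = fut
        self.transmit(msg_id, body)
        return fut

    def on_ack(self, msg_id):
        fut = self.pending.pop(msg_id, None)
        if fut is not None:  # duplicate or unknown acks are ignored
            fut.set_result(msg_id)
```

A caller can either block on `fut.result()` (the recv() style) or register `fut.add_done_callback(...)` to send the Acct-Response when the ack arrives.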
(RADIUS Accounting runs over UDP, and has a very simple assured delivery
mechanism; the NAS sends an Accounting-Request packet with an 8-bit sequence
number, and resends periodically until it gets the corresponding
Accounting-Response. See RFC 2865/2866)
(A good way to build this is OpenRADIUS, which lets you plug in an external
process over stdin/stdout to receive and respond to RADIUS messages.)
I think it should be possible to plug in queues where needed, without
changing the code at either end. Consider a remote SQL writer process which
returns an ACK after committing to the database. In the simplest case,
these ACKs could be used to return the RADIUS Acct-Responses to the NAS:
Then I would like to be able to decouple them by inserting a persistent
queue process, with no code changes.
This ensures the NAS gets its Acct-Response quickly, even if the remote
SQL writer is down or unreachable.
I might also want to insert an inbound queue at the SQL writer side too -
for example, to be able to batch up a number of RADIUS packets into a single
SQL update.
So basically, we need the "DATA - ACK" pattern. Adding a queue means that
the protocol is unchanged but you get your ACKs back quicker.
ack(1) can be returned once the message is queued, and ack(2) will cause it
to be removed from the queue.
To get reasonable message throughput, there needs to be a number of
unacknowledged messages in transit at any one time. So the existing 0MQ
request-reply pattern may not be ideal, as it would require many concurrent
TCP connections to get that throughput.
I would like the queue process to be able to perform batching: appending
multiple messages to a file, fsyncing it after a configured time (say
100ms), and then returning acks after the fsync. This gives a potential
throughput of tens of thousands of messages per second even on a single
commodity hard drive (M10K), depending on what latency tradeoff you're
prepared to accept.
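Here is a rough sketch of that batching (often called group commit). It assumes an append-only spool file and takes the clock as a parameter so the timing is testable. Messages accumulate in memory. Once the oldest has waited `flush_interval` seconds, the whole batch is written and fsynced once, and only then are the ack(1)s released upstream. The class, method names and record layout are hypothetical; the 100 ms default comes from the text above. Removing messages once ack(2) arrives, e.g. by retiring spool segments, is left out.

```python
import os


class GroupCommitSpool:
    def __init__(self, path, flush_interval=0.1):
        self.f = open(path, "ab")
        self.flush_interval = flush_interval
        self.batch = []  # (msg_id, payload) waiting for the next fsync
        self.batch_started = None

    def append(self, msg_id, payload, now):
        if not self.batch:
            self.batch_started = now
        self.batch.append((msg_id, payload))

    def tick(self, now):
        """Return the IDs that may now be acked upstream (ack(1))."""
        if not self.batch or now - self.batch_started < self.flush_interval:
            return []
        for msg_id, payload in self.batch:
            # Length-prefixed records so the spool can be replayed after a crash.
            self.f.write(msg_id.to_bytes(8, "big"))
            self.f.write(len(payload).to_bytes(4, "big") + payload)
        self.f.flush()
        os.fsync(self.f.fileno())  # one fsync covers the whole batch
        acked = [msg_id for msg_id, _ in self.batch]
        self.batch = []
        return acked
```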
There is an even bigger potential benefit from this batching. If the
message is forwarded before it is written to disk, and the ack comes back
before the timeout expires, then there is no need to write it to disk at
all! You thus get the benefit of persistent storage without any performance
penalty in the usual case.
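Here is a sketch of this "forward first, write only if still unacked" idea. It makes the same assumptions (explicit clock, hypothetical names) and uses a dict in place of the disk. Each message is forwarded immediately and given a deadline. If ack(2) arrives first, the message is dropped from memory without ever being written. Only messages still outstanding at their deadline are spooled. In this sketch ack(1) goes upstream only once a message is either acked downstream or spooled. A crash before then therefore loses nothing, because the upstream sender still holds the message and will resend it.

```python
class LazySpool:
    def __init__(self, forward, timeout=0.1):
        self.forward = forward    # hypothetical: sends downstream right away
        self.timeout = timeout
        self.pending = {}         # msg_id -> (deadline, payload), memory only
        self.disk = {}            # stand-in for the persistent spool
        self.upstream_acks = []   # ack(1)s released to the sender

    def enqueue(self, msg_id, payload, now):
        self.pending[msg_id] = (now + self.timeout, payload)
        self.forward(msg_id, payload)

    def on_downstream_ack(self, msg_id):
        if self.pending.pop(msg_id, None) is not None:
            # Acked before its deadline: it never touches the disk.
            self.upstream_acks.append(msg_id)
        else:
            # Already spooled (and already acked upstream): free the copy.
            self.disk.pop(msg_id, None)

    def tick(self, now):
        for msg_id, (deadline, payload) in list(self.pending.items()):
            if now >= deadline:
                self.disk[msg_id] = payload  # a real spool would fsync here
                del self.pending[msg_id]
                self.upstream_acks.append(msg_id)
```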
Ideally I'd like to get this benefit if the queues are chained too. The
message should propagate as far as it can and only get written to disk at
the last point. This may be awkward to arrange if each queue has its own
independent timeout.
If you have a queue feeding multiple recipients in a round-robin way, you
want to have some decent flow control - or be able to configure a window
size of 1 message.
Consider that if you commit a large burst of messages to rx1, rx2 and rx3,
it might be that rx2 and rx3 become idle while rx1 is still working through
its backlog.
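Here is a small simulation of why a per-recipient window matters. It assumes rx1 is slow (5 time steps per message) while rx2 and rx3 take 1 step each. The dispatcher hands out a burst in round-robin order, skipping any recipient whose count of unacknowledged messages has reached `window`. All numbers and names are illustrative.

```python
import math


def simulate(n_messages, service_times, window=math.inf):
    """Return how many time steps it takes to finish a burst of messages."""
    n = len(service_times)
    outstanding = [0] * n  # sent to recipient i, not yet acked
    progress = [0] * n     # time spent on recipient i's current message
    backlog, done, t, rr = n_messages, 0, 0, 0
    while done < n_messages:
        # Round-robin dispatch that skips recipients whose window is full.
        skipped = 0
        while backlog and skipped < n:
            i = rr % n
            rr += 1
            if outstanding[i] < window:
                outstanding[i] += 1
                backlog -= 1
                skipped = 0
            else:
                skipped += 1
        # Each busy recipient works one time unit; a finished message is acked.
        for i in range(n):
            if outstanding[i]:
                progress[i] += 1
                if progress[i] == service_times[i]:
                    outstanding[i] -= 1
                    progress[i] = 0
                    done += 1
        t += 1
    return t


print(simulate(9, [5, 1, 1]))            # 15: rx1 is stuck with 3 messages
print(simulate(9, [5, 1, 1], window=1))  # 5: rx2/rx3 absorb the burst
```

With an unbounded window, the burst is split evenly and rx2 and rx3 sit idle while rx1 works through its share. With a window of 1, new work only goes to recipients that have acked their previous message.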
In this application I don't actually care about in-order delivery, but
it may be simplest to implement it as such. Then all you need is a TCP-like
message sequence number and ack number, and you get duplicate-prevention
too.
However, this becomes complicated if there are multiple concurrent
connections between the same client-server pair, or if routing different
messages to different destinations (see rx1,rx2,rx3 above) which may take
varying times to process each message.
So perhaps it is better to acknowledge each message individually, like
RADIUS and MQTT. What is the scope of the message ID? Should it be globally
unique, or unique for the particular (client IP, server IP, server port)?
Other thoughts.
Similarly, the output of a queue may be a publish stream.
Hi Brian,
Unfortunately your comment is so long it cannot be answered in detail using a forum…
Some of the comments you've made are right; some of them fail to take more general cases into account. Overall, if you thought of it yourself, you must be a pretty clever person.
However, messing with technical details inevitably leads to complex and convoluted design. I've been down that road several times.
So, to keep matters simple, let's instead answer the most basic question: what's the formal definition of reliability?
In other words, what does it mean to transfer a message "reliably" from A to B (via optional intermediate nodes X, Y and Z)?
I would go with the SMTP view of reliability: a confirmed undertaking from the next node in the chain that it will ensure the eventual delivery of the message at least once.
This is a simple and useful model in its own right, and I suspect (without proof) that other models like exactly-once-delivery can be layered on top of this.
Messages which are delayed forever, e.g. due to misconfiguration, are a special case. SMTP will 'bounce' these after a timeout. I don't think that's required or desirable here; rather, the administrator should have the opportunity to fix the problem and allow the flow to resume. Out-of-band notification of stale messages and full queues is desirable, as is graphing of queue depths, which opens a whole other area. SNMP over 0MQ, anyone? :-)
That won't work. In case of failure of a middle node (such as Y), the messages stored at that node will never be delivered. Additionally, the endpoints have no way to learn about the failure. It may be argued that resolving the problem is up to the administrator; however, the argument doesn't hold for the Internet: the network in the middle is out of your control, it's inherently unreliable, and the admin of the middle node may not even care about you losing some messages.
The obvious way to get an end-to-end delivery guarantee is for the ack to come all the way back from the final endpoint. That's one valid use case, and it also means all the intermediate nodes can be stateless switches.
Another use case is when you need to get the message queued quickly for later delivery. The NAS will only retransmit its RADIUS accounting packet a few times before discarding it; the user sitting at a web page will only wait so long. So getting the message into a persistent queue, which is not the final endpoint, is also a valid use case.
I think both cases can be supported by the same protocol. Origin endpoint -> queue can be one guaranteed delivery, and queue -> final endpoint can be another guaranteed delivery. That's the SMTP model. If you are worried about a catastrophic failure of the intermediate node you could deliver to N queues on N nodes.
Messages crossing different domains of control or untrusted networks are IMO a separate problem. I think it is rare that you would want to route messages via an untrusted third-party messaging node; if possible you would want to tunnel through their network. The exception is XMPP-style rendezvous, but such a server has to be trusted by both parties.
In the SMTP world, the queues are (1) on the sender's PC; (2) on the sender's ISP's trusted outbound SMTP relay; and (3) at the recipient's ISP's SMTP receiver. The sender is explicitly configured to send via the outbound SMTP relay, and that relay uses DNS information to locate the recipient's SMTP receiver.
I don't see why a similar model can't be used for general messaging. You may choose to send messages directly to the other party, or you may send to a 'nearby' trusted queuing system, and/or you may have a local queue on the originating system.
There are other problems to making 0MQ scale globally, such as endpoint naming and message routing. The 0MQ model of one-TCP-port-per-service doesn't scale as well as SMTP RCPT TO or HTTP Host: header - although pubsub at least has a topic.
I haven't done it for zeromq, but I have for quite a few others, to make them work at a reasonable speed without disk, e.g. for large grid calculation engines, which tend to be unreliable.
At the application node level, the queue isn't a queue in the normal sense; it is a historical state machine modelling a distributed journal of event notifications. Nothing gets popped; its status is just marked until some archival process comes along and sweeps away old stuff.
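Here is a minimal sketch of such a journal. It assumes an in-memory list of entries with a status field; the statuses `SENT`/`CONFIRMED` and the class name are illustrative. Delivery never pops anything: confirmation only changes an entry's status, and a separate archival sweep later removes confirmed entries older than a cutoff. Because unconfirmed history stays in place, a recovering node can replay from any point.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    seq: int
    payload: bytes
    timestamp: float
    status: str = "SENT"


class Journal:
    def __init__(self):
        self.entries = []
        self.next_seq = 0

    def append(self, payload, now):
        entry = Entry(self.next_seq, payload, now)
        self.entries.append(entry)
        self.next_seq += 1
        return entry.seq

    def confirm(self, seq):
        # Nothing is popped: confirmation just marks the entry.
        for entry in self.entries:
            if entry.seq == seq:
                entry.status = "CONFIRMED"

    def replay_from(self, seq):
        # Recovery: everything from seq onwards is still available.
        return [e for e in self.entries if e.seq >= seq]

    def sweep(self, older_than):
        # Archival: drop only confirmed entries older than the cutoff.
        self.entries = [
            e for e in self.entries
            if e.status != "CONFIRMED" or e.timestamp >= older_than
        ]
```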
Reliability is separated into confirmation (normal reliability) and recovery (node failure), much like clustered session state.
Both can be seen as detecting missed messages and querying upstream for them.
An important piece is the ability to send null messages (heartbeats) to confirm reachability both upstream and downstream.
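To illustrate detection and querying, here is a sketch of a downstream tracker. It assumes every message carries a sequence number and every heartbeat carries the last sequence number sent upstream; the post does not specify either detail. A gap in received numbers, or a heartbeat announcing a number we have not seen, yields a list of IDs to re-request. Silence for longer than `heartbeat_timeout` marks upstream as suspect. The names are hypothetical.

```python
class GapDetector:
    def __init__(self, heartbeat_timeout):
        self.heartbeat_timeout = heartbeat_timeout
        self.highest = -1   # highest sequence number seen so far
        self.missing = set()
        self.last_heard = 0.0

    def on_message(self, seq, now):
        self.last_heard = now
        if seq > self.highest:
            self.missing.update(range(self.highest + 1, seq))
            self.highest = seq
        else:
            self.missing.discard(seq)  # a resent message filled a gap
        return sorted(self.missing)    # IDs to request from upstream

    def on_heartbeat(self, last_sent_seq, now):
        # A null message: proves reachability and exposes trailing losses.
        self.last_heard = now
        if last_sent_seq > self.highest:
            self.missing.update(range(self.highest + 1, last_sent_seq + 1))
            self.highest = last_sent_seq
        return sorted(self.missing)

    def upstream_suspect(self, now):
        return now - self.last_heard > self.heartbeat_timeout
```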
This is all a lot of work for the messaging layer, and starts to turn it into something else: a fully distributed transaction manager.
Yup. Too complex for 0MQ IMO.
In the context of messaging systems one is usually concerned only about the case where the consumer stops consuming for a finite amount of time, and then restarts normal operation. Brokered messaging systems provide "reliable" messaging by persisting the data at the broker till it is consumed. These too could fail, but the messaging mode is still considered reliable.
It is the application architect's responsibility to build fault tolerance around this. Whether they do it by replicating the disk over Symmetrix disks or by having a hot standby is out of scope for the designer of the messaging service.
In other words the immediate problem can be framed in more relaxed terms than the problem the cluster manager level solution above seeks to solve.
thanks,
dh1r
Not sure I see it as a "cluster management" problem, more a problem of levels.
To clarify, I think reliability intrudes at multiple levels of abstraction, usually expressed at multiple scales.
The question for us then is to ask at what scale it is trying to be reliable, and under what challenges?
The pattern I observe is that there is a confirmed handoff between upstream and downstream participants, and with that handoff comes a direct responsibility to persist the appropriate data. What I notice is that the data and the associated transfer messages should be persisted together, in the strong sense: in a transactionally coherent form that is guaranteed under all contingencies. In practice that usually means in the same database/on the same disk (though distributed DBs like Mnesia are interesting).
So, if messaging is point to point, and extended responsibility rests with the producing and consuming application (as distinct from the messaging infrastructure), then the messaging infrastructure should not need to persist anything except what is needed to provide a safe transfer of data to the upper layers, which is basically the queue itself, baton-passing protocol, heartbeats, and hooks for the upper layers to participate/monitor.
Where messaging tries to take on this responsibility, things get very messy (and slow) very quickly. Particularly, under contingency (infrastructure sudden death/network partition/application historical recovery).
Doesn't mean we shouldn't design nice solutions, but I would draw a pretty hard line between the layers of protocol and maybe not even conceptually consider it part of messaging, but of the application.
May I assume that this topic is not about 0MQ functionality per se?
0MQ remains 'sockets on steroids' and provides a highly efficient, easy to program (sockets) API and is as 'reliable' as the transport method it uses, e.g., TCP/IP?
Anyone wishing additional reliability, be it durable messages or whatever, implements that on top of the 0MQ layer (SPB framing)?
Apologies for perhaps stating the obvious and partly repeating the introduction to the topic!
Is it possible to set up 0MQ such that the sender/publisher publishes to "multiple" 0MQ nodes which are fanned in for the receiver/subscriber? In other words, sender -> n x (0MQ nodes) -> fan in -> receiver. This way, if one of the nodes (actually up to n-1 nodes) dies, the message can still have guaranteed delivery. A simple replication-based scheme may be better in terms of performance for message durability than true persistence, right?
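Here is a toy sketch of that replication idea. It assumes each message carries a unique ID, so the receiver can fan in and drop the copies that arrive via the other nodes. The intermediate nodes are plain functions, and a failed node is modeled as one that delivers nothing. The names are hypothetical. This only survives failures of up to n-1 intermediate nodes while the message is in flight. It gives no durability if the sender or receiver itself fails.

```python
def make_node(alive=True):
    def node(msg_id, body):
        return [(msg_id, body)] if alive else []  # a dead node delivers nothing
    return node


def replicate(msg_id, body, nodes):
    # Fan out: send the same message through every intermediate node.
    arrivals = []
    for node in nodes:
        arrivals.extend(node(msg_id, body))
    return arrivals


class FanIn:
    def __init__(self):
        self.seen = set()
        self.delivered = []

    def receive(self, msg_id, body):
        if msg_id not in self.seen:  # first copy wins, later copies dropped
            self.seen.add(msg_id)
            self.delivered.append(body)
```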
Has there been any movement over the last year regarding reliability under 0MQ?
There has been a huge amount of work done with respect to reliability, see http://zguide.zeromq.org/ chapters 4 and 5.
Have a look at PZQ project here:
https://github.com/mkoppanen/pzq/wiki/An-Introduction-To-PZQ
It may provide what you are looking for.