ØMQ Lightweight Messaging Kernel (v0.2)

WARNING: This text is deprecated and refers to an old version of ØMQ. It remains here for historical interest. DO NOT USE THIS TO LEARN ØMQ.

HISTORICAL WHITEPAPER

Introduction

Where ØMQ/0.1 was more of a proof-of-concept than a full-blown solution - all the code necessary to do the basic job, but no elaborate internal design - ØMQ/0.2 has a flexible internal architecture, designed to support a wide range of scenarios and use cases.

Our idea is that ØMQ would be a vanilla kernel for messaging software, with multiple packagings, each exposing different facets of the kernel in different ways and adding different utilities and components to the core codebase. So, rather than being a monolithic application, ØMQ is a collection of building blocks you can put together in various ways to get different types of applications. You can build things like an AMQP broker, an AMQP client, a point-to-point messaging library, an in-process event distribution library, an inter-protocol bridge, a multicast enabler for AMQP-based systems, a logger application that writes the entire message feed to a file, and so on. Implementing each of those requires just a few lines of code to glue the components together.

This document describes the architecture of the ØMQ lightweight messaging kernel (version 0.2) and explains the rationale behind the architectural decisions.

What's new

  • Scaling on multicore systems;
  • Possibility to use multiple sockets;
  • Possibility to use ØMQ from several threads;
  • Pluggable wire-level protocols;
  • Pluggable APIs;

What's missing

  • Run-time management of I/O connections - at the moment you have to open all the network connections in advance;
  • Message routing - currently, messages follow predefined routes rather than being examined and passed on depending on their properties.

Goals

To get a better understanding of the rationale behind ØMQ architecture, here's the list of goals we've tried to accommodate:

  • ØMQ should be able to interact with several network connections at the same time.
  • ØMQ should be able to interact with several client threads at the same time.
  • ØMQ must process (send & receive) messages even when the client application is busy.
  • ØMQ must be able to use a predefined number of CPU cores in parallel.
  • It should be possible to minimise the number of threads used so that there are no superfluous context switches even on a single-core machine.
  • ØMQ should be able to use several types of network connections at the same time (say TCP, SCTP or PGM) and convert messages between them.
  • Specifically, ØMQ should be able to convert messages passed over one-to-one connection (like TCP) to messages on one-to-many connection (like PGM). The reason is that better performance can be gained on TCP for a low number of recipients. It would be redundant to multicast to only one receiver.
  • ØMQ should be able to use several models of I/O interaction at the same time (say standard I/O vs. asynchronous I/O).
  • ØMQ should be able to use several wire-level messaging protocols at the same time (say AMQP, native backend protocol, STOMP or XMPP).
  • ØMQ should be able to provide several APIs at the same time (native API, WireAPI, JMS-like API etc.)
  • ØMQ should allow for different models of notification of client application (blocking calls, callbacks, pollable file descriptors, real-time signals etc.)
  • ØMQ should be flexible enough to allow unidirectional connections (such as PGM) alongside bidirectional ones.
  • Where messages are passed within a single process (from one thread to another) the networking stack (e.g. local sockets) shouldn't be involved in the process; also, no copying of message data should occur.
  • Where messages are passed within a single thread, no synchronisation (locks, atomic operations) should be involved.
  • ØMQ should support message batching in all components to keep the throughput high; the size of batching should be consumer driven, so that the latency is kept low at the same time.
  • ØMQ should prevent head-of-line blocking: a message that has no immediate receiver should not block other messages from being received.
  • ØMQ should avoid busy-waiting and other CPU-intensive operations so that there is as much CPU power left for the client application as possible.
  • The design should be flexible enough to support at least the following scenarios: AMQP broker, AMQP client, AMQP broker federation, protocol bridge, peer-to-peer messaging, multicast and in-process messaging.

Design

Dispatcher and Engines

Inter-thread synchronisation is slow. If the code is local to a thread (and doesn't use slow devices like the network or persistent storage), the execution time of most functions is tens of nanoseconds. However, when inter-thread synchronisation - even non-blocking synchronisation - kicks in, execution time grows by hundreds of nanoseconds, or even surpasses one microsecond. All kinds of time-expensive hardware-level work have to be done: synchronisation of CPU caches, memory barriers, etc.

The best-of-breed solution would run in a single thread and omit any inter-thread synchronisation altogether. That seems simple enough to implement, except that a single-threaded solution wouldn't be able to use more than one CPU core, i.e. it wouldn't scale on multicore boxes.

A good multicore solution would be to run as many instances of ØMQ as there are cores on the host, treat them as separate network nodes in the same way that two instances running on two separate boxes would be treated, and use local sockets to pass messages between the instances.

This design is basically correct; however, sockets are not the best way to pass messages within a single box. Firstly, they are slow compared to simple inter-thread communication mechanisms and secondly, data passed via a socket to a different process has to be physically copied rather than passed by reference.

Therefore, ØMQ allows you to create a fixed number of threads at startup to handle the work. The "fixed" part is a deliberate and integral part of the design. There is a fixed number of cores on any box and there's no point in having more threads than there are cores. In fact, more threads than cores can be harmful to performance as they can introduce excessive OS context switching.

Each thread acts as if it were running a single-threaded instance of ØMQ. The threads are therefore mutually independent and can do without any inter-thread synchronisation whatsoever. The only point where synchronisation occurs is when one ØMQ 'instance' (i.e. thread) sends a message to another 'instance' (thread). When that happens, the two instances communicate using an efficient message passing mechanism (y-suite) rather than slow and expensive sockets.

The component that passes messages between threads is called the dispatcher.

When ØMQ starts, we create a single instance of the dispatcher and as many threads as needed, each thread being attached to the dispatcher. Each thread runs as fast as possible with no synchronisation involved, using the dispatcher to send messages to other threads.

From an implementation point of view, each thread is encapsulated in an object called an engine. (Actually, there are both engine and thread objects; however, they map 1:1 in version ØMQ/0.2 so we won't worry too much about distinguishing them.) There are three engine types at the moment:

  • bp_engine encapsulates a socket that it uses to send/receive messages via ØMQ backend protocol.
  • amqp09_engine encapsulates a socket that it uses to send/receive messages via AMQP protocol.
  • api_engine is a proxy that connects a thread belonging to the client application to the dispatcher.

The picture below shows the dispatcher set up to interconnect 3 engines (note that it is a 3 x 3 matrix) and two engine instances. There should be one more engine there; however, it would make the picture messy. How the third engine should be connected to the dispatcher is left as an exercise for the reader:

zmq02-2.png

Each of the nine slots in the dispatcher is a pipe. Engine number 1 sends messages to the pipes in the first row of the dispatcher and receives messages from the pipes in the first column. Engine number 2 sends messages to the second row, receives messages from the second column, etc.

This means that each pair of engines has a pair of pipes to exchange messages. One pipe is used to pass messages from engine 1 to engine 2, the other one to pass them in the opposite direction. To understand how message passing via a pipe is implemented, have a look here.
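
As an illustration of this addressing scheme, here is a minimal, self-contained sketch of an N x N pipe matrix. It is not the actual ØMQ code: the real dispatcher uses the efficient y-suite pipes mentioned above, whereas this sketch falls back on ordinary mutex-protected queues. The point is only to show how engine i writes into row i and reads from column i (indices are zero-based here, whereas the text above numbers engines from 1).

    #include <cstddef>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <vector>

    //  Illustrative stand-in for a y-suite pipe: a mutex-protected queue.
    //  The real ØMQ pipes are considerably more efficient.
    struct pipe_t
    {
        std::mutex sync;
        std::deque<std::string> messages;
    };

    //  N x N matrix of pipes; slot (i, j) carries messages from engine i
    //  to engine j.
    class dispatcher_t
    {
    public:
        explicit dispatcher_t (std::size_t engine_count) :
            n (engine_count), pipes (engine_count * engine_count) {}

        //  Engine 'from' sends to engine 'to': row 'from', column 'to'.
        void write (std::size_t from, std::size_t to, const std::string &msg)
        {
            pipe_t &pipe = pipes [from * n + to];
            std::lock_guard<std::mutex> guard (pipe.sync);
            pipe.messages.push_back (msg);
        }

        //  Engine 'to' reads what engine 'from' sent: row 'from', column 'to'.
        bool read (std::size_t from, std::size_t to, std::string &msg)
        {
            pipe_t &pipe = pipes [from * n + to];
            std::lock_guard<std::mutex> guard (pipe.sync);
            if (pipe.messages.empty ())
                return false;
            msg = pipe.messages.front ();
            pipe.messages.pop_front ();
            return true;
        }

    private:
        std::size_t n;
        std::vector<pipe_t> pipes;
    };

With three engines this yields the 3 x 3 matrix shown in the picture above.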

Engines and Threads

At the moment there is a one-to-one relationship between threads and engines. The consequence is that for each socket there has to be a separate thread to take care of it. Obviously, in scenarios like an AMQP broker, where there are hundreds of open sockets, this solution won't scale. That many threads would exhaust the system through excessive context switching.

In subsequent versions of the ØMQ kernel we would like to address the issue by allowing several sockets to be handled by the same thread. Therefore, in anticipation of this change, we've separated the thread and engine objects to allow a one-to-many relationship:

zmq02-3.png

Note that the thread is called poll_thread. That's the only thread type at the moment (not taking client application threads into account). It's called poll because it uses the POSIX poll function to wait for events from the engines. In the future we would like to supply various thread implementations such as epoll_thread (using epoll to get events from the engines) and signal_thread (using POSIX real-time signals to get events from the engines - this one is necessary so that we can use POSIX asynchronous I/O). Also, once we port ØMQ to Windows, some kind of wait_for_multiple_objects_thread will be needed.
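
The core of poll_thread can be illustrated with a short, self-contained sketch: it waits on the engines' file descriptors with the POSIX poll function and invokes an engine whenever its socket becomes readable or writable. The i_pollable interface used here is hypothetical; it merely mirrors the "passive engine, active thread" split described in this document.

    #include <poll.h>

    #include <cstddef>
    #include <vector>

    //  Hypothetical passive-engine interface: the thread calls these methods
    //  when the engine's file descriptor becomes ready.
    struct i_pollable
    {
        virtual ~i_pollable () {}
        virtual int get_fd () = 0;
        virtual bool want_out () = 0;    //  Does the engine have data to send?
        virtual void in_event () = 0;    //  Socket ready for reading.
        virtual void out_event () = 0;   //  Socket ready for writing.
    };

    //  Simplified core of a poll-based worker thread.
    void poll_loop (std::vector<i_pollable*> &engines)
    {
        if (engines.empty ())
            return;

        while (true) {

            //  Build the pollset. POLLOUT is requested only when the engine
            //  actually has something to send, to avoid spinning on a
            //  permanently writable socket.
            std::vector<pollfd> pollset (engines.size ());
            for (std::size_t i = 0; i != engines.size (); i++) {
                pollset [i].fd = engines [i]->get_fd ();
                pollset [i].events = POLLIN;
                if (engines [i]->want_out ())
                    pollset [i].events |= POLLOUT;
            }

            //  Wait until at least one engine's socket becomes ready.
            if (poll (&pollset [0], pollset.size (), -1) == -1)
                break;

            //  Hand control to the engines whose sockets are ready.
            for (std::size_t i = 0; i != engines.size (); i++) {
                if (pollset [i].revents & POLLIN)
                    engines [i]->in_event ();
                if (pollset [i].revents & POLLOUT)
                    engines [i]->out_event ();
            }
        }
    }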

One of the consequences of the one-to-one relationship between threads and engines in ØMQ/0.2 is that the thread object has to do almost no scheduling - there is only one engine to schedule. Scheduling is therefore reduced to choosing between socket read and socket write operations.

In the future, when several engines are allowed per thread, we will use cooperative multitasking. The thread object will schedule individual engines to run; however, each engine will be responsible for returning control to the thread object. Therefore, engines should split the work into manageable pieces rather than doing a large amount of processing in a single invocation. If one engine spends 1 second doing some processing, all other engines using the same thread will be blocked for 1 second, meaning the latency will increase by 1 second at that moment. Therefore, engines will be designed specifically with cooperative multitasking in mind.
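
The cooperative contract can be sketched as follows (illustrative code, not the actual ØMQ interfaces): the thread object offers the CPU to each engine in turn, and an engine processes at most a bounded batch of work per invocation before handing control back.

    #include <cstddef>
    #include <deque>
    #include <vector>

    //  Hypothetical cooperative engine: process () must return promptly,
    //  handling at most a bounded batch of work per invocation.
    class engine_t
    {
    public:
        //  Returns true if there may be more work left to do.
        bool process ()
        {
            const std::size_t batch_size = 100;
            std::size_t done = 0;
            while (!work.empty () && done != batch_size) {
                work.pop_front ();    //  Stand-in for real message processing.
                done++;
            }
            return !work.empty ();
        }

        std::deque<int> work;
    };

    //  The thread object schedules the engines in a round-robin fashion;
    //  each engine is trusted to return control quickly.
    void run (std::vector<engine_t> &engines)
    {
        bool busy = true;
        while (busy) {
            busy = false;
            for (std::size_t i = 0; i != engines.size (); i++)
                if (engines [i].process ())
                    busy = true;
        }
    }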

API engine

The API engine does not have a thread of its own. It is a proxy that allows a client application thread to access the dispatcher. To prevent excessive synchronisation, api_engine is completely synchronisation-free, i.e. not thread-safe. Therefore, create a separate instance of the api_engine object for each of the threads in your application.

api_engine exposes simple send and receive functions that can be used by the application to send or receive messages. If messages are being received from several threads at the same time, api_engine uses John Nagle's simple "fair queueing" scheduling algorithm (RFC 970) to balance the load.
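
The scheduling itself is plain round-robin over the inbound pipes: each call to receive resumes the scan one pipe past the last one served, so no single sender can starve the others. A minimal sketch of the idea (not the actual api_engine code) follows:

    #include <cstddef>
    #include <deque>
    #include <string>
    #include <vector>

    //  Fair queueing over several inbound pipes: each call to receive ()
    //  resumes the scan one pipe past the last one served, so a single busy
    //  sender cannot starve the others.
    class fair_receiver_t
    {
    public:
        explicit fair_receiver_t (std::size_t pipe_count) :
            pipes (pipe_count), current (0) {}

        bool receive (std::string &msg)
        {
            for (std::size_t i = 0; i != pipes.size (); i++) {
                std::deque<std::string> &pipe =
                    pipes [(current + i) % pipes.size ()];
                if (!pipe.empty ()) {
                    msg = pipe.front ();
                    pipe.pop_front ();
                    current = (current + i + 1) % pipes.size ();
                    return true;
                }
            }
            return false;    //  No message available in any pipe.
        }

        std::vector<std::deque<std::string> > pipes;

    private:
        std::size_t current;
    };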

The following picture shows the internal architecture of api_engine:

zmq02-4.png

The client application thread uses the send and receive functions to send/receive messages. The messages are load-balanced using a fair scheduler and sent to the dispatcher via a helper object called dispatcher_proxy. dispatcher_proxy shields api_engine from the details of interaction with the dispatcher object. One function of dispatcher_proxy worth noting is that it doesn't forward messages to the dispatcher that would only be delivered back to the same engine in a loopback fashion; rather, it routes them locally, thus avoiding the cost of inter-thread synchronisation inherent in the dispatcher implementation.
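
The loopback short-circuit can be sketched in a few lines (illustrative only, not the actual dispatcher_proxy code): if the destination is the proxy's own engine, the message goes straight into a local queue and never touches the dispatcher.

    #include <cstddef>
    #include <deque>
    #include <string>

    //  Simplified dispatcher_proxy: messages addressed to the proxy's own
    //  engine are kept in a local queue instead of crossing the dispatcher,
    //  avoiding the synchronisation cost of the inter-thread pipes.
    class dispatcher_proxy_t
    {
    public:
        explicit dispatcher_proxy_t (std::size_t self_) : self (self_) {}

        void send (std::size_t destination, const std::string &msg)
        {
            if (destination == self) {
                //  Loopback: local, synchronisation-free path.
                loopback.push_back (msg);
                return;
            }
            //  Cross-thread path: the message would be written into the
            //  dispatcher's pipe at row 'self', column 'destination'
            //  (see the dispatcher sketch earlier in this document).
        }

        //  Loopback messages are served from here on the next receive.
        std::deque<std::string> loopback;

    private:
        std::size_t self;
    };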

Backend protocol engine

The ØMQ backend protocol is an extremely simple protocol designed specifically for high-performance scenarios. bp_engine hosts a TCP socket and uses it to send/receive messages in the backend protocol format.

Have a look at the internal architecture of the engine:

zmq02-5.png

First of all, note that engines are passive objects. They have no threads of their own. Just as api_engine contains no thread and depends on the client application thread to invoke its methods, bp_engine has no thread of its own and depends on poll_thread to invoke it.

Now, when do we invoke the engine? With api_engine it is simple: the engine is invoked on client application request (when either the send or receive method is called). With bp_engine, the engine passes the file descriptor of its socket to poll_thread, and poll_thread in its turn uses it to wait for an event to occur (either a 'socket ready for reading' or 'socket ready for writing' event). Once the event occurs, poll_thread invokes the appropriate method of bp_engine.

When the engine is invoked it reads/writes as much data as possible (up to the size of its internal buffer) from/to the socket. Incoming data is passed to bp_decoder, the object used to convert binary data into messages. bp_decoder passes the messages to dispatcher_proxy (described above), which in its turn passes them to the dispatcher. If data is to be written and there is no data in the engine's internal buffer, bp_encoder is asked to provide more data. bp_encoder asks dispatcher_proxy for newly arrived messages, which may subsequently ask the dispatcher to provide them. bp_encoder turns the messages into a binary stream.
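
The decoder side of this pipeline can be illustrated with a small, self-contained sketch. The framing used here - a one-byte length prefix followed by the message body - is an assumption made for the sake of the example and is not necessarily the real backend protocol format; the point is that data arrives from the socket in arbitrarily sized chunks and the decoder reassembles complete messages before handing them on (in the real engine, to dispatcher_proxy).

    #include <cstddef>
    #include <string>
    #include <vector>

    //  Illustrative stream decoder. The framing (one length byte followed by
    //  the message body) is an assumption made for this example only. Data
    //  arrives from the socket in arbitrarily sized chunks; the decoder
    //  reassembles complete messages before passing them on.
    class decoder_t
    {
    public:
        decoder_t () : remaining (0) {}

        //  Feed a chunk of raw bytes read from the socket (at most the size
        //  of the engine's internal buffer). Completed messages are appended
        //  to 'messages'; in the real engine they would go to dispatcher_proxy.
        void write (const unsigned char *data, std::size_t size,
            std::vector<std::string> &messages)
        {
            for (std::size_t i = 0; i != size; i++) {
                if (remaining == 0) {
                    //  Start of a new message: this byte is the body length.
                    remaining = data [i];
                    body.clear ();
                    if (remaining == 0)
                        messages.push_back (body);     //  Empty message.
                }
                else {
                    body.push_back ((char) data [i]);
                    if (--remaining == 0)
                        messages.push_back (body);     //  Message complete.
                }
            }
        }

    private:
        std::size_t remaining;   //  Bytes still missing from the current body.
        std::string body;        //  Partially assembled message body.
    };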

The size of the engine's internal buffer is an important performance tuning parameter. A larger buffer means that messages will be sent in large batches, improving throughput but also raising latency. A smaller buffer will result in a large number of small data chunks being sent to/from the OS, which will improve latency but lower throughput.

The AMQP engine

The life cycle of AMQP messages closely resembles the life cycle of backend protocol messages. However, as messages are not the only thing passed via AMQP, we had to extend the design to handle protocol commands:

zmq02-6.png

As can be seen, the amqp09_decoder object is able to distinguish between commands and messages. It passes messages to the dispatcher (via dispatcher_proxy) and commands to amqp09_unmarshaller. The unmarshaller is a component that converts a command from its binary representation into an actual method call (think of RPC). The method call invokes a method on the amqp09_fsm object. This is the AMQP protocol state machine. As AMQP defines two different peers (client & broker) with different state machines, amqp09_fsm is just a generalisation, not a real object. In the real world you'll use either amqp09_client_fsm or amqp09_server_fsm in the place occupied by amqp09_fsm in the picture.
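
The unmarshalling step can be sketched as follows. The class and method numbers follow the AMQP numbering scheme, but the payload handling is grossly simplified and the interface is invented for this example; the real unmarshaller is generated from the protocol specification (see below).

    #include <cstdint>
    #include <string>

    //  Hypothetical slice of the state machine interface; the real amqp09_fsm
    //  has one method per AMQP command.
    struct i_fsm
    {
        virtual ~i_fsm () {}
        virtual void queue_declare (const std::string &arguments) = 0;
        virtual void basic_consume (const std::string &arguments) = 0;
    };

    //  The unmarshaller turns a binary command into a method call (RPC-style).
    //  The class/method numbers follow the AMQP numbering, but the payload
    //  handling is grossly simplified; the real code is generated from the
    //  protocol specification.
    void unmarshal (uint16_t class_id, uint16_t method_id,
        const std::string &payload, i_fsm &fsm)
    {
        if (class_id == 50 && method_id == 10)          //  queue.declare
            fsm.queue_declare (payload);
        else if (class_id == 60 && method_id == 20)     //  basic.consume
            fsm.basic_consume (payload);
        //  ... one branch per command defined in the specification.
    }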

The state machine processes the command and may optionally produce a reply (say queue.declare-ok as a reply to the queue.declare command). To do so it calls the appropriate method on amqp09_marshaller. The marshaller converts the call into the AMQP wire-level binary representation (think of RPC once again) and stores it.

Once the socket is ready for writing, amqp09_encoder is asked to provide data to send. First it asks the marshaller, which provides any commands it may have stored. Once there are no more commands to send, the encoder retrieves messages from the dispatcher in the same way as the backend protocol engine would.
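
That priority rule - commands stored by the marshaller go out first, the message stream only afterwards - can be sketched as follows (illustrative only, not the actual amqp09_encoder code):

    #include <deque>
    #include <string>

    //  Simplified encoder: replies (commands) stored by the marshaller take
    //  precedence over ordinary messages when filling the outgoing buffer.
    class encoder_t
    {
    public:
        //  Called when the socket is ready for writing; returns false when
        //  there is nothing to send.
        bool get_data (std::string &out)
        {
            if (!pending_commands.empty ()) {
                out = pending_commands.front ();      //  Commands go out first...
                pending_commands.pop_front ();
                return true;
            }
            if (!pending_messages.empty ()) {
                out = pending_messages.front ();      //  ...then the message stream.
                pending_messages.pop_front ();
                return true;
            }
            return false;
        }

        std::deque<std::string> pending_commands;   //  Filled by the marshaller.
        std::deque<std::string> pending_messages;   //  Retrieved from the dispatcher.
    };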

The idea of this design is that commands should get out of the critical path as soon as they are recognised (in the decoder) and replies should join the critical path as late as possible (in the encoder). That way, message processing is not hindered by any peculiarities of command processing. Using this design we are able to transfer up to 700,000 AMQP messages a second on pretty standard hardware, even though AMQP is not particularly well suited for high-performance messaging.

It should be noted that amqp09_marshaller and amqp09_unmarshaller are fully generated from the AMQP specification XML rather than hand-written. We do the code generation using iMatix GSL (a code generation language that is rather more sophisticated than XSLT) and distribute the generated code in the ØMQ package.

Also note that the AMQP engine is in its initial version and has very limited functionality. It handles connection establishment handshaking and can send messages to and receive messages from a ØMQ broker and/or OpenAMQ.

What happened to x-mode?

You may recall that ØMQ/0.1 implemented an extreme low-latency mode called x-mode. X-mode is an optimisation capable of getting ØMQ's latency overhead over the underlying transport (TCP) down to under one microsecond. Actually, as x-mode was implemented only on the sending side of the connection in ØMQ/0.1, we believe that implementing it on both sides - receiving as well as sending - would cut the sub-microsecond latency overhead to virtually zero.

In version 0.2 we've removed the x-mode functionality as it poses various constraints on the nature of the connection (only one client thread per x-mode socket etc.). We plan to reintroduce x-mode using dedicated ultra-low latency components in future releases.

Examples

Peer-to-peer communication

The following diagram shows ØMQ components plugged together to form an ultra-fast peer-to-peer connection. Note that this setup corresponds to how ØMQ/0.1 used to work. Check the corresponding document for the details.

zmq02-7.png

The diagram shows two boxes, each hosting a single application. The application code is single-threaded and uses the API engine component to speak to the dispatcher. The dispatcher transfers the messages to/from the working thread (the one used to run the backend protocol engine), which in its turn sends them to/receives them from the network. As the ØMQ backend protocol is symmetric, the other peer looks exactly the same.

Using ØMQ from multiple threads

Every ØMQ engine is strictly single-threaded to avoid overhead associated with inter-thread synchronisation. Therefore, if your application has several threads, each with a need for messaging functionality, you should create a separate API engine to be used by each of them. The following diagram shows the setup:

zmq02-8.png

Using multiple network connections

If you need to open several network connections - say you want to send messages to two different boxes - you should create a protocol engine (a backend protocol engine in this case) for each of them. Recall that ØMQ/0.2 has no dynamic connection creation: connections are created during engine creation and cannot be added or removed later on. This constraint is obviously inconvenient for real-world usage and we plan to remove it in version 0.3.

zmq02-9.png

Obviously, multiple API engines can be combined with multiple protocol engines if you need several network connections and several client threads at the same time.

The AMQP broker & AMQP client

With the AMQP engines provided in the package you are able to compose an AMQP broker and an AMQP client quite easily. The following diagram shows how this should be done. Note that the AMQP broker has no API engine - i.e. there's no one sending or receiving messages directly in the broker process.

zmq02-10.png

ØMQ's flexible internal design allows for much more than an AMQP broker and an AMQP client. Two examples of more complex patterns (protocol bridge and broker federation) follow.

A protocol bridge

A protocol bridge converts one protocol to another. In the following picture, the bridge is converting AMQP messages to ØMQ backend protocol messages and vice versa:

zmq02-11.png

This kind of bridge may be useful if you want to add ØMQ ultra-low-latency components to your existing AMQP network. Also think of a bridge converting AMQP messages to PGM messages and vice versa (once a PGM engine is available) - that would allow you to introduce multicast into your messaging network without actually changing your application.

AMQP broker federation

Messaging broker federation is a pattern where two messaging brokers communicate to share a subset of messages. AMQP broker federation is a key feature of the iMatix OpenAMQ product. For a detailed discussion of the pattern have a look here.

zmq02-12.png

As can be seen in the diagram, our implementation of broker federation is simple. One broker plays the role of an AMQP client when speaking to the other broker.

In-process communication

This pattern is almost trivial - one client thread speaks to another client thread - however, it may prove useful when you need to transfer a large volume of messages between two threads within a single process. The actual message throughput would be much higher than what can be achieved with standard synchronisation mechanisms such as critical sections, or with local sockets.
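
A self-contained illustration of the pattern follows. It uses a plain mutex-and-condition-variable queue, i.e. exactly the kind of standard synchronisation the real ØMQ pipes are designed to outperform, so it only demonstrates the shape of in-process messaging, not its speed: two threads in one process exchange messages with no sockets involved.

    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <thread>

    //  Minimal inter-thread message queue. The real ØMQ pipes avoid most of
    //  this synchronisation; the sketch only shows the pattern itself:
    //  no sockets, no crossing of process boundaries.
    class queue_t
    {
    public:
        void send (const std::string &msg)
        {
            std::lock_guard<std::mutex> guard (sync);
            messages.push_back (msg);
            ready.notify_one ();
        }

        std::string receive ()
        {
            std::unique_lock<std::mutex> lock (sync);
            ready.wait (lock, [this] { return !messages.empty (); });
            std::string msg = messages.front ();
            messages.pop_front ();
            return msg;
        }

    private:
        std::mutex sync;
        std::condition_variable ready;
        std::deque<std::string> messages;
    };

    int main ()
    {
        queue_t queue;

        //  One client thread produces messages, the other consumes them.
        std::thread producer ([&queue] {
            for (int i = 0; i != 10; i++)
                queue.send ("hello");
        });
        std::thread consumer ([&queue] {
            for (int i = 0; i != 10; i++)
                std::cout << queue.receive () << std::endl;
        });

        producer.join ();
        consumer.join ();
        return 0;
    }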

zmq02-13.png

How can you help?

  • ØMQ is open for adding different wire-level messaging protocols (XMPP, SOAP-based protocols, STOMP, BEEP etc.)
  • ØMQ is open for adding different APIs - APIs used by existing middleware would be extremely helpful to make the transition of existing applications to ØMQ easy
  • ØMQ is open for adding different transport mechanisms (SCTP, PGM, SDP/InfiniBand etc.)
