Instant Messaging (Tutorial)

WARNING: This text is deprecated and refers to an old version of ØMQ. It remains here for historical interest. DO NOT USE THIS TO LEARN ØMQ.

Introduction

Messaging without a broker is somewhat unfamiliar to developers accustomed to traditional messaging products such as IBM's WebSphere MQ. Although ØMQ does its best to make the difference between the broker-based and broker-less models more of a technical detail than something the developer has to bother with, there are still some aspects to take into account. Most importantly, if the message sender and the message receiver are not running at the same time, there is no place to store the messages in the meantime. In a broker-based architecture they are stored in the broker. As there is no broker in our case, the developer has to write the message-storing application himself. This example shows how to use this design pattern to implement a traditionally broker-dependent application (a chatroom) over ØMQ.

To see a genuine broker-less application, have a look at the videoconferencing example packaged with ØMQ. There is no broker (or a substitute like the "message-storing application" above) in the middle and thus the latency is kept very low. However, with such a design you cannot pass messages when the sender and the receiver are not alive at the same time.

If you are eager to see an even more real-world-like broker-less application, check the stock exchange example. It simulates the internal workings of a stock exchange - stock trading being the primary business focus of the ØMQ lightweight messaging kernel.

Design

Components

If you were implementing instant messaging (IM) on top of a classic broker-based "star" messaging architecture (one broker centralising the discussions between many clients), the broker would hold the messages and all IM clients would connect to the broker to access chatrooms.

With a broker-less architecture we'll write the application that holds the messages ourselves, so in effect we'll reimplement the "star" architecture by hand. The resulting design is a distributed application with three types of component:

  1. The "chatroom" receives and redistributes the messages from users;
  2. The "prompt" gets user input and sends it to the chatroom;
  3. The "display" connects to the chatroom and displays the messages from it.
im1.png

Wiring

To send messages between the applications, you first have to create the wiring. The term "wiring" refers to the static part of the application layout. Wiring is usually established on application startup and afterwards it is either not changed at all or changed very infrequently. Wiring consists of two types of entities and one type of relationship:

  1. Exchange is an entity your application can send messages to.
  2. Queue is an entity your application can receive messages from.
  3. Binding is a relationship between exchange and queue, meaning that the messages sent to the exchange will be forwarded to the queue. If there are several queues bound to a single exchange, message published to the exchange is delivered to all of the queues. If a queue is bound to several exchanges, it gets a copy of each message published to any of the exchanges.
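
The fan-out rule in item 3 can be illustrated with a toy, single-threaded model. This is only a sketch of the concept: toy_wiring_t and all of its methods are hypothetical names invented for this illustration, they are not part of the ØMQ API, and scopes and networking are left out entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <set>
#include <string>

//  Toy in-process model of exchanges, queues and bindings.
//  Hypothetical names; this is not the ØMQ API.
class toy_wiring_t
{
public:

    void create_exchange (const std::string &name) { bindings [name]; }
    void create_queue (const std::string &name) { queues [name]; }

    //  Messages sent to 'exchange' will be forwarded to 'queue'.
    void bind (const std::string &exchange, const std::string &queue)
    {
        assert (bindings.count (exchange) && queues.count (queue));
        bindings [exchange].insert (queue);
    }

    //  Deliver one copy of the message to every queue bound to the exchange.
    void send (const std::string &exchange, const std::string &message)
    {
        assert (bindings.count (exchange));
        for (const std::string &queue : bindings [exchange])
            queues [queue].push_back (message);
    }

    //  Number of messages waiting in the queue.
    std::size_t pending (const std::string &queue) const
    {
        return queues.at (queue).size ();
    }

    //  Remove and return the oldest message. The queue must not be empty.
    std::string receive (const std::string &queue)
    {
        std::deque <std::string> &q = queues.at (queue);
        assert (!q.empty ());
        std::string message = q.front ();
        q.pop_front ();
        return message;
    }

private:

    std::map <std::string, std::set <std::string> > bindings;
    std::map <std::string, std::deque <std::string> > queues;
};
```

Binding two queues to one exchange and sending a single message leaves one copy in each queue. Binding one queue to two exchanges makes it collect the messages sent to either of them.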

Each exchange or queue can have a different visibility scope:

  • visible only to the thread that created it, also known as "local";
  • visible within the process it resides in, also known as "process", or
  • visible all over the network, also known as "global".

For example, if I create a local exchange E, I am able to bind it to global queue Q created on a different box. However, the application on the other box is not able to establish binding between E and Q because E is not visible to it.

First of all, let's consider the chatroom component. It is going to receive messages from users. Thus it needs one queue that will store the messages until the chatroom program processes them. It is going to send messages to the users so that they know what is being said in the chatroom. Thus it needs one exchange it'll send messages to. User applications (display) will bind to this exchange to get the messages.

Note that the chatroom is not going to connect to the user applications. It doesn't know what the user applications are, where they are running or how many of them are on the network. Instead, the user applications are going to connect to the chatroom. To allow them to do so, the chatroom must make its queue and its exchange public (global) so that they can bind to them.

Choosing to make the exchange and the queue public (global) means that we should choose unique names for them - there cannot be two global objects with the same name on the network. While choosing "E" and "Q" would do, it would prevent more than one chatroom from running at a time - the second one would experience a name collision. Given that each chatroom has a name and that the name is unique (e.g. there's only one chatroom dedicated to discussing cheese and it's called - surprisingly - "cheese") we can name the exchange E_<chatroom name> (E_cheese) and the queue Q_<chatroom name> (Q_cheese). That way we can run several chatrooms at the same time. On the other hand, if we try to run two "cheese" chatrooms in parallel, the second one will fail.
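
The naming scheme and the collision rule can be sketched as follows. Both global_name and toy_namespace_t are hypothetical helpers made up for this illustration. The real uniqueness check is performed by ØMQ's directory service, not by a local set, so treat this only as a model of the rule.

```cpp
#include <cassert>
#include <set>
#include <string>

//  Build the name of a global object from a prefix ("E" or "Q")
//  and the chatroom name, e.g. "E_cheese".
std::string global_name (const std::string &prefix, const std::string &chatroom)
{
    assert (!prefix.empty () && !chatroom.empty ());
    return prefix + "_" + chatroom;
}

//  Hypothetical stand-in for the network-wide namespace of global objects.
class toy_namespace_t
{
public:

    //  Returns false if the name is already taken.
    bool register_name (const std::string &name)
    {
        return names.insert (name).second;
    }

private:

    std::set <std::string> names;
};
```

Registering Q_cheese and Q_wine succeeds, while a second attempt to register Q_cheese is refused, which corresponds to the second "cheese" chatroom failing to start.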

As for the prompt application, it is used to send messages to the chatroom. Thus it needs an exchange to send messages to. It needs no queue as it's not going to receive any messages.

Nobody is going to connect to the prompt's exchange. On the contrary, the prompt is going to connect (bind) its exchange to the queue exposed by a particular chatroom, so that the messages typed by the user are sent to the chatroom and subsequently to all the discussion participants. Thus, the exchange doesn't have to be public. Actually, it should not be: if the exchange were public (global), we would have a hard time ensuring that its name is unique and doesn't collide with other prompt instances run by different users.

The display application receives messages from a chatroom in order to display them, so it needs a queue to receive them from. It doesn't need an exchange because it sends no messages. Following the same line of thought as with the prompt, we'll decide that the display's queue will be private (local) rather than public (global).

The following picture shows these three components and their exchanges and queues:

im2.png

Thread layout

One of the design principles behind ØMQ was to delegate tight control of resources to the developer. Other messaging systems tend to create threads, allocate excessive amounts of memory, choose network interfaces to use etc. all by themselves. That's nice as long as you are creating dummy applications to learn the product - you don't have to care about a lot of things. However, once you move to a production environment and want to fine-tune your application, it becomes a problem.

With ØMQ you are actually forced to allocate the resources yourself. With respect to threads you have to determine how many threads you'll use to do business logic (application threads), how many you'll use to interact with the network (I/O threads), which exchanges and/or queues are associated with a specific I/O thread etc.

As our instant messaging application is quite simple and won't be used in performance-critical environments, we'll choose the simplest possible model. For each component (chatroom, prompt, display) there'll be one application thread and one I/O thread. All the queues and exchanges will be handled by the I/O thread. The layout is shown in the following picture:

im3.png

Network interfaces

When you are creating a global object - whether exchange or queue - you have to specify which network interface it'll use to communicate with the world. This may seem annoying, however, it's the consequence of the principle of tight control of resources mentioned in the previous section. Actually, specifying the interface explicitly becomes a necessity in datacenter environments where individual boxes tend to have multiple network interfaces, often not even of the same kind (for example, administration may be handled by standard 1GbE, while high-volume connections may run on 10GbE, InfiniBand or SCI-based network interface cards).

As for our instant messaging application, the only component declaring global objects (exchanges/queues) is the chatroom. Prompt and display don't expose any global objects and thus they don't have to care about assigning network interfaces. The chatroom exposes one exchange and one queue. To keep the code simple we'll let the user pass the network interface for each of them as a command line parameter.

Implementation

In this section we'll gradually compose the code of the instant messaging application. However, we are not going to describe every single line of code, just those that are important for understanding the way to use ØMQ. For full source code of the application, see chat example bundled with the ØMQ package.

Chatroom: Initialising the infrastructure

Dispatcher is the object that interconnects all the ØMQ threads (both application threads and I/O threads). Its parameter is the overall number of threads you are going to use. If you want your program to be really efficient, this number should be less than the number of CPU cores on your box. That way the operating system can avoid excessive context switches. As for our application, we've already decided that we'll use one application thread and one I/O thread. Thus we initialise the dispatcher to handle 2 threads:

dispatcher_t dispatcher (2);

Locator is the directory service that allows your application to query for objects (exchanges and queues) on the network. To initialise the locator you have to tell it where zmq_server is running. We initialise it with 'host', the host name passed to the chatroom on the command line:

locator_t locator (host);

Chatroom: Implementing the thread layout

As already mentioned, we are going to use two threads. One of them will be the application thread, the other one will be the I/O thread. Let's first create the I/O thread - we'll pass it the dispatcher as a parameter so that it is able to communicate with other threads:

i_thread *pt = io_thread_t::create (&dispatcher);

At this point the thread is running and ready to process network traffic. Now, let's have a look at the application thread.

Application threads are created by the user, so there's no need to create them inside ØMQ. However, we still need to create the proxy that'll allow our application thread to speak to ØMQ. The proxy is called api_thread. Keep in mind that it's a placeholder for a particular application thread and that it shouldn't be used outside of this thread. That is, if you have three application threads, you should create three api_threads and use each one exclusively from one thread. The parameters for api_thread creation are the dispatcher - so that the thread knows how to communicate with other threads - and the locator - so that it is able to find objects (exchanges and queues) resident on the network:

api_thread_t *api = api_thread_t::create (&dispatcher, &locator);

Chatroom: Establishing the wiring

At this point we are ready to set up the wiring. First we'll create a global queue named Q_<chatroom name>. The chatroom_name variable is retrieved from the application command line, as is the in_interface variable, which specifies the network interface and port used to handle the network traffic associated with the queue. When creating a global object you also have to specify which I/O thread listens for incoming connections (connections created by other ØMQ applications when binding to the global object) and an array of I/O threads that handle the established connections. If there are multiple I/O threads in the array, connections are distributed among them in a round-robin manner. Our chatroom application has just a single I/O thread, so that thread both listens for incoming connections and processes data on established connections. We specify pt as the I/O thread to handle listening and a single-item array containing pt (1, &pt) as the I/O threads to handle the actual message traffic:

char tmp [16];
snprintf (tmp, 16, "Q_%s", chatroom_name);
api->create_queue (tmp, scope_global, in_interface, pt, 1, &pt);
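
To make the round-robin distribution mentioned above concrete, here is a minimal sketch of how new connections could be spread over an array of I/O threads. The round_robin_t class is a hypothetical helper written for this illustration. How ØMQ implements the distribution internally is not shown in this tutorial, so this only demonstrates the general technique.

```cpp
#include <cassert>
#include <cstddef>

//  Hypothetical helper: returns the index of the I/O thread that
//  should handle each newly established connection.
class round_robin_t
{
public:

    explicit round_robin_t (std::size_t thread_count) :
        count (thread_count),
        next (0)
    {
        assert (count > 0);
    }

    //  Pick the current thread and advance, wrapping around at the end.
    std::size_t assign ()
    {
        std::size_t chosen = next;
        next = (next + 1) % count;
        return chosen;
    }

private:

    std::size_t count;
    std::size_t next;
};
```

With three I/O threads, successive connections go to threads 0, 1, 2 and then 0 again. With a single I/O thread, as in the chatroom, every connection lands on thread 0.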

Almost everything said above applies to the global exchange we are going to create. We're using E_ prefix instead of Q_ prefix, out_interface instead of in_interface. The only significant difference is that we store the exchange ID of the newly created exchange for later use:

snprintf (tmp, 16, "E_%s", chatroom_name);
int eid = api->create_exchange (tmp, scope_global, out_interface, pt, 1, &pt);

Chatroom: Adding the business logic

Business logic of the chatroom component is pretty straightforward:

while (true) {

    //  Get a message
    message_t in_message;
    api->receive (&in_message);

    //  Get the current time. Replace the newline character at the end
    //  by space character.
    char timebuf [256];
    time_t current_time;
    time (&current_time);
    //  Pass the time string as an argument, not as the format string
    snprintf (timebuf, 256, "%s", ctime (&current_time));
    timebuf [strlen (timebuf) - 1] = ' ';

    //  Create and fill in the message
    message_t out_message (strlen (timebuf) + in_message.size ());
    char *data = (char*) out_message.data ();
    memcpy (data, timebuf, strlen (timebuf));
    data += strlen (timebuf);
    memcpy (data, in_message.data (), in_message.size ());

    //  Send the message
    api->send (eid, out_message);
}

Note that just the two lines at the top and the one line at the bottom have to do with ØMQ itself. The rest just prefixes the message with a chatroom-generated timestamp (so that individual messages in the chatroom are marked with the time at which they were processed).

The two lines on the top retrieve a message. If there is no message available, the call will block until there's a message to return. Once you have the message, you can access its binary payload using data and size methods.

The line at the bottom sends a message to a specific exchange (recall that we stored the exchange ID when we created the exchange). To create the message, just pass its size to the constructor and then use the data and size functions to access and fill in its binary buffer.
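
The byte layout produced by the loop above (timestamp first, original payload right after it) can be tried out without ØMQ. The following is a sketch under that assumption: format_timestamp and prefix_payload are hypothetical helpers, and a std::vector stands in for the message buffer. Note that ctime returns a pointer to a shared static buffer, so this version is only suitable for single-threaded use.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <ctime>
#include <string>
#include <vector>

//  Format a time the way ctime does, with the trailing newline
//  replaced by a space.
std::string format_timestamp (std::time_t when)
{
    const char *raw = std::ctime (&when);
    std::string stamp (raw ? raw : "");
    if (!stamp.empty () && stamp [stamp.size () - 1] == '\n')
        stamp [stamp.size () - 1] = ' ';
    return stamp;
}

//  Return a new buffer holding the timestamp bytes followed by the
//  original payload. The payload is copied verbatim, including any
//  terminating zero it carries.
std::vector <char> prefix_payload (const std::string &stamp,
    const void *data, std::size_t size)
{
    assert (data != NULL || size == 0);
    std::vector <char> out (stamp.size () + size);
    if (!stamp.empty ())
        std::memcpy (out.data (), stamp.data (), stamp.size ());
    if (size > 0)
        std::memcpy (out.data () + stamp.size (), data, size);
    return out;
}
```

Because the payload's terminating zero is copied along with the text, the resulting buffer can still be printed as a C string by the receiver.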

Prompt

The prompt application is actually very similar to the chatroom. There are two substantial differences though:

  1. The wiring is different. Instead of creating global objects, we'll create a single local exchange and bind it to the global queue exposed by the chatroom.
  2. Business logic is different. Prompt reads messages from the console and sends them to the chatroom.

Here's the code to establish the wiring. Note that when nothing but the exchange name is specified while creating the exchange, the exchange is created as private (local), i.e. visible only to the thread that created it. Also note that when binding the exchange to the queue, we specify two I/O threads: the first is used to connect to the exchange if it is remote, the second to connect to the queue if it is remote. In our case only the queue is remote, so only the second pt will actually be used. The exchange is running locally and doesn't need an I/O thread to be accessed:

int eid = api->create_exchange ("E");
char tmp [16];
snprintf (tmp, 16, "Q_%s", chatroom_name);
api->bind ("E", tmp, pt, pt);

The business logic of the prompt component follows. It reads a message from the console, prepends the user name to it and sends it to the chatroom:

while (true) {

    //  Allow user to input the message text. Prepend it by user name.
    char textbuf [1024];
    if (!fgets (textbuf, sizeof (textbuf), stdin))
        break;  //  Stop on end of input or read error
    string text (user_name);
    text = text + ": " + textbuf;

    //  Create the message (terminating zero is part of the message)
    message_t message (text.size () + 1);
    memcpy (message.data (), text.c_str (), text.size () + 1);

    //  Send the message
    api->send (eid, message);
}
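
The payload that the prompt sends can be examined in isolation. The sketch below uses a hypothetical helper, make_payload, with a std::vector standing in for the message buffer. It builds the same bytes as the loop above: the user name, a colon and a space, the typed line, and a terminating zero.

```cpp
#include <cassert>
#include <string>
#include <vector>

//  Build the bytes the prompt sends: "<user>: <line>" plus a terminating zero.
//  The line is used as read by fgets, so it normally ends with a newline.
std::vector <char> make_payload (const std::string &user_name,
    const std::string &line)
{
    assert (!user_name.empty ());
    std::string text = user_name + ": " + line;

    //  c_str () points at text.size () characters followed by a zero byte.
    return std::vector <char> (text.c_str (), text.c_str () + text.size () + 1);
}
```

Including the terminating zero in the payload is what later allows the display to print the received buffer directly as a C string.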

Display: Basic functionality

The guidelines for display are almost the same as for prompt. Firstly, the wiring consists of creating a local queue and binding it to the global exchange created by the chatroom. Secondly, the business logic is different - it gets the messages from the chatroom and prints them to the console.

Code for wiring setup:

api->create_queue ("Q");
char tmp [16];
snprintf (tmp, 16, "E_%s", chatroom_name);
api->bind (tmp, "Q", pt, pt);

And the business logic:

while (true) {

    //  Get a message and print it to the console.
    message_t message;
    api->receive (&message);
    cout << (char*) message.data ();
}

Display: Disconnection handling

We would like the user to be notified when a disconnection happens. A message should be printed to the console, notifying the user of the fact.

By default ØMQ applies an auto-reconnect strategy. If the connection is broken, a reconnect is silently performed without even notifying the application. Thus the first thing to do is to switch on the notifications. Add the following line immediately after creating the API thread:

api->mask (message_gap);

With this done, a call to the receive method can return either a valid message or a "gap message" - a notification that the connection was broken and that there may be messages missing in the queue. Let's modify the message receiving code like this:

message_t message;
api->receive (&message);
if (message.type () == message_gap)
    cout << "Problems connecting to the chatroom..." << endl;
else
    cout << (char*) message.data () << flush;

The code is pretty straightforward. If the message is actually a disconnect notification (gap in the message stream) we'll print the warning for the user. Otherwise we print the message content in standard way.
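
The branching logic can be exercised without a network by separating the decision from the printing. In the sketch below, toy_message_t, its type constants and render are hypothetical stand-ins invented for this illustration. They only mimic the distinction between data messages and gap notifications described above.

```cpp
#include <cassert>
#include <string>

//  Hypothetical stand-ins for the two kinds of message discussed in the text.
enum toy_message_type_t
{
    toy_message_data,
    toy_message_gap
};

struct toy_message_t
{
    toy_message_type_t type;
    std::string body;
};

//  Decide what the display should print for a received message.
std::string render (const toy_message_t &message)
{
    switch (message.type) {
    case toy_message_gap:
        return "Problems connecting to the chatroom...\n";
    case toy_message_data:
        return message.body;
    }
    assert (false);  //  Unknown message type
    return std::string ();
}
```

Keeping the decision in a function that returns a string makes it easy to check both branches, for example that a gap notification produces the warning and that a data message is passed through unchanged.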

Building it

To build the components, simply link them with pthread and libzmq libraries:

$ g++ -pthread -o chatroom chatroom.cpp libzmq.a

The same applies to prompt and display components.

To build the example code supplied in ØMQ package, use --with-chat option with configure:

$ ./configure --with-chat
$ make

Running it

First, you have to run ØMQ server. It is not a messaging broker in the sense that all the messages have to pass through it, rather it is a directory service that ØMQ applications use to let each other know on which network node they are running. However, from your point of view this is irrelevant. Just start the server before any other ØMQ application:

$ zmq_server

Normally, the server listens on all the network interfaces of the local machine on port 5682. Optionally you can specify which network port should be used:

$ zmq_server 3333

Once the server is running, let's start the chatroom on the same box. The chatroom is dedicated to discussing international affairs. However, keep in mind you can start as many different chatrooms as you wish. "chat.un.org" is the name of the box where the server is running, "eth0:5555" is network interface and port to receive messages on, "eth0:5556" is network interface and port to send messages through:

$ chatroom chat.un.org "international affairs" eth0:5555 eth0:5556

Caution: Although ØMQ allows you to specify the interface with a wildcard port ("eth0" - OS will pick a port number itself in that case) you should use fixed port numbers if you want auto-reconnect feature to work in the environment where applications are failing and being restarted.

If you don't want to specify the network interface by hand, you can use wildcard (asterisk) to specify that all the network interfaces should be used:

$ chatroom chat.un.org "international affairs" "*:5555" "*:5556"
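
The interface arguments used above ("eth0:5555", "eth0", "*:5555") follow an "interface[:port]" pattern. The sketch below shows one way such a string could be split. It is an assumption made for illustration: toy_endpoint_t, parse_endpoint and has_fixed_port are hypothetical names, and ØMQ's own parsing may differ.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

//  Hypothetical parsed form of an "interface[:port]" argument.
struct toy_endpoint_t
{
    std::string interface_name;  //  "*" stands for all interfaces
    int port;                    //  0 stands for "let the OS pick a port"
};

//  Split the argument at the last colon, if there is one.
toy_endpoint_t parse_endpoint (const std::string &spec)
{
    assert (!spec.empty ());
    toy_endpoint_t result;
    std::string::size_type colon = spec.rfind (':');
    if (colon == std::string::npos) {
        result.interface_name = spec;
        result.port = 0;
    }
    else {
        result.interface_name = spec.substr (0, colon);
        result.port = std::atoi (spec.substr (colon + 1).c_str ());
    }
    return result;
}

//  The text recommends fixed ports whenever auto-reconnect matters.
bool has_fixed_port (const toy_endpoint_t &endpoint)
{
    return endpoint.port != 0;
}
```

A startup check based on has_fixed_port could warn the user when a global object is declared with a wildcard port, since restarted applications would then come back on a different port.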

That's it. We have the infrastructure running. Now it's Mr. Annan's turn to connect to the chatroom:

$ display chat.un.org "international affairs"

And in a second terminal window:

$ prompt chat.un.org "international affairs" kofi

Type some text into the prompt window and see it displayed in the display window. Connect more buddies to the chatroom and check that all the messages are correctly delivered to all the participants.

im4.png

Conclusion

This tutorial explained how to build a simple distributed application using ØMQ. For more sophisticated applications, have a look at the other examples bundled with the ØMQ package - videoconferencing and stock exchange.