Documentation

Reference manual

The ØMQ distribution comes with a detailed reference manual in both UNIX manual page and HTML format.

The reference manual for the latest release can also be viewed on-line.

Sample code

A simple client/server application

This example implements a simple "SQL server" following the client/server model. The server listens for requests over unicast TCP. Each request is a NUL-terminated ASCII string. On receiving a request, the server sends back a canned response of OK, also as a NUL-terminated string.

The server application, using C bindings:

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>
 
int main () 
{
    int rc;
    void *ctx, *s;
    zmq_msg_t query, resultset;
    const char *query_string, *resultset_string = "OK";
 
    /* Initialise 0MQ context, requesting a single application thread
       and a single I/O thread */
    ctx = zmq_init (1, 1, 0);
    assert (ctx);
    /* Create a ZMQ_REP socket to receive requests and send replies */
    s = zmq_socket (ctx, ZMQ_REP);
    assert (s);
    /* Bind to the TCP transport and port 5555 on the 'lo' interface */
    rc = zmq_bind (s, "tcp://lo:5555");
    assert (rc == 0);
 
    while (1) {
        /* Allocate an empty message to receive a query into */
        rc = zmq_msg_init (&query);
        assert (rc == 0);
 
        /* Receive a message, blocks until one is available */
        rc = zmq_recv (s, &query, 0);
        assert (rc == 0);
 
        /* Process the query */
        query_string = (const char *)zmq_msg_data (&query);
        printf ("Received query: '%s'\n", query_string);
        zmq_msg_close (&query);
 
        /* Allocate a response message and fill in an example response */
        rc = zmq_msg_init_size (&resultset, strlen (resultset_string) + 1);
        assert (rc == 0);
        memcpy (zmq_msg_data (&resultset), 
            resultset_string, strlen (resultset_string) + 1);
 
        /* Send back our canned response */
        rc = zmq_send (s, &resultset, 0);
        assert (rc == 0);
        zmq_msg_close (&resultset);
    }
}

The client application, using C++ bindings:

#include <stdio.h>
#include <string.h>
#include <exception>
#include <zmq.hpp>
 
int main ()
{
    try {
        // Initialise 0MQ context with one application and one I/O thread
        zmq::context_t ctx (1, 1);
        // Create a ZMQ_REQ socket to send requests and receive replies
        zmq::socket_t s (ctx, ZMQ_REQ);
        // Connect it to port 5555 on localhost using the TCP transport
        s.connect ("tcp://localhost:5555");
 
        // Construct an example zmq::message_t with our query
        const char *query_string = "SELECT * FROM mytable";
        zmq::message_t query (strlen (query_string) + 1);
        memcpy (query.data (), query_string, strlen (query_string) + 1);
        // Send the query
        s.send (query);
 
        // Receive and display the result
        zmq::message_t resultset;
        s.recv (&resultset);
        const char *resultset_string = (const char *)resultset.data ();
        printf ("Received response: '%s'\n", resultset_string);
    }
    catch (std::exception &e) {
        // 0MQ throws standard exceptions just like any other C++ API
        printf ("An error occurred: %s\n", e.what());
        return 1;
    }
 
    return 0;
}

To build this example, ensure that you have ØMQ built with --with-c and --with-cpp to include the C and C++ bindings, and run the following:

gcc -o server server.c `pkg-config --libs --cflags libzmq`
g++ -o client client.cpp `pkg-config --libs --cflags libzmq`

You can then run ./server and ./client.
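
The request framing used by this example can be checked in isolation, without a running server. The following Python 3 sketch shows how a query becomes a NUL-terminated ASCII payload and how it is decoded again. The helper names encode_request and decode_request are hypothetical and are not part of any ØMQ binding; the sketch assumes the payload carries exactly one trailing NUL byte, as the C and C++ code above does.

```python
def encode_request(text):
    """Return the ASCII bytes of text followed by a single NUL terminator."""
    if "\x00" in text:
        raise ValueError("request text must not contain NUL")
    return text.encode("ascii") + b"\x00"


def decode_request(payload):
    """Check for the trailing NUL terminator, strip it, and decode the text."""
    if not payload.endswith(b"\x00"):
        raise ValueError("payload is not NUL-terminated")
    return payload[:-1].decode("ascii")


if __name__ == "__main__":
    wire = encode_request("SELECT * FROM mytable")
    # The payload is one byte longer than the text, matching strlen () + 1.
    print(len(wire), decode_request(wire))
```

A receiver that prints the payload with "%s", as the C server does, relies on the terminator being present, so checking for it before use avoids reading past the end of the message.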

Publish/subscribe example

This example implements a publisher that publishes a continuous stream of 8-byte messages, each containing a message ID as a 64-bit integer (a long long) in big-endian byte order. Each program uses its language's native facilities to serialize the value into a byte buffer.

The publisher application, using Java bindings:

import java.nio.*;
import org.zmq.*;
 
class publisherApp
{
    public static void main (String [] args)
    {
        // Initialise 0MQ with a single application and I/O thread
        org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);
        // Create a PUB socket for port 5555 on the lo interface
        org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB);
        s.bind ("tcp://lo:5555");
 
        for (long msg_id = 1; ; msg_id++) {
            // Create a new, empty, 8 byte message
            byte msg [] = new byte [8];
 
            // Fill it with the current message ID
            ByteBuffer bb = ByteBuffer.wrap (msg);
            bb.putLong (msg_id);
 
            // Publish our message   
            s.send (msg, 0);
        }
    }
}

The subscriber application, using Python bindings:

import libpyzmq
from struct import unpack
 
def main ():
 
    # Initialise 0MQ with a single application and I/O thread
    ctx = libpyzmq.Context (1, 1)
    # Create a SUB socket ...
    s = libpyzmq.Socket (ctx, libpyzmq.SUB)
    # ... ask to subscribe to all messages ...
    s.setsockopt (libpyzmq.SUBSCRIBE , "")
    # ... and connect to the publisher at localhost:5555 using the tcp transport
    s.connect ("tcp://localhost:5555")
 
    while True:
        # Receive a message
        msg = s.recv ()
 
        # Unpack the msg_id from it
        msg_id = unpack ('>q', msg)[0]
 
        # Show progress every 10k messages
        if (msg_id % 10000) == 0:
            print msg_id
 
if __name__ == "__main__":
    main ()

To build this example, ensure that you have ØMQ built with --with-java and --with-python to include the Java and Python bindings, and run the following to build the Java publisher:

javac -cp .../zeromq/bindings/java/Zmq.jar ./publisherApp.java

Replace …/zeromq with the path to your ØMQ source tree.

You can then run the Java publisher as follows (again, replacing …/zeromq as appropriate):

java -Djava.library.path=/usr/local/lib -cp .../zeromq/bindings/java/Zmq.jar:. publisherApp

And the Python subscriber as follows:

python ./subscriber.py
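
The wire format shared by the publisher and the subscriber can be verified without either program running. The Python 3 sketch below packs a message ID into the same 8-byte big-endian layout that ByteBuffer.putLong produces with its default byte order, and unpacks it with the '>q' format used by the subscriber. The names pack_msg_id and unpack_msg_id are hypothetical helpers introduced for illustration only.

```python
import struct

# '>q' is a big-endian signed 64-bit integer, the layout produced by
# Java's ByteBuffer.putLong with its default (big-endian) byte order.
MSG_ID_FORMAT = ">q"


def pack_msg_id(msg_id):
    """Serialize a message ID into the 8-byte wire format."""
    return struct.pack(MSG_ID_FORMAT, msg_id)


def unpack_msg_id(payload):
    """Deserialize an 8-byte message back into its integer ID."""
    if len(payload) != struct.calcsize(MSG_ID_FORMAT):
        raise ValueError("expected an 8-byte message")
    return struct.unpack(MSG_ID_FORMAT, payload)[0]


if __name__ == "__main__":
    wire = pack_msg_id(10000)
    print(wire.hex(), unpack_msg_id(wire))
```

Checking the length before unpacking turns a truncated or oversized message into a clear error.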

Chat example

A simple instant messaging application, intended for newcomers. Get the code here.

Camera example

This example demonstrates a more complex ØMQ application using multiple threads and transports. To get the code, see the git tree.

Cookbook

Message subscriptions

Any message that receivers can subscribe to must start with a NUL-terminated string (the topic). To send the message ABCDE with the topic xy.z, do the following:

zmq::message_t msg (10);
memcpy (msg.data (), "xy.z\x00" "ABCDE", 10);
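
The length of 10 above is the 4-byte topic, its terminating zero byte, and the 5-byte body. As an illustration of that layout only, the Python 3 sketch below builds such a message and splits it again; build_message and split_message are hypothetical helpers, not functions of any ØMQ binding.

```python
def build_message(topic, body):
    """Concatenate the topic, a NUL terminator, and the body."""
    if b"\x00" in topic:
        raise ValueError("topic must not contain NUL")
    return topic + b"\x00" + body


def split_message(message):
    """Split a message at the first NUL byte into (topic, body)."""
    topic, sep, body = message.partition(b"\x00")
    if not sep:
        raise ValueError("message has no NUL-terminated topic")
    return topic, body


if __name__ == "__main__":
    msg = build_message(b"xy.z", b"ABCDE")
    print(len(msg), split_message(msg))
```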

Subscribing to all messages:

s.setsockopt (ZMQ_SUBSCRIBE, "", 0);

Subscribing to a particular topic:

s.setsockopt (ZMQ_SUBSCRIBE, "xy.z\x00", 5);

Subscribing to topics with a specified prefix:

s.setsockopt (ZMQ_SUBSCRIBE, "xy.", 3);

Multiple subscriptions can be active at the same time. In this case, any message matching at least one subscription will be passed to the user:

s.setsockopt (ZMQ_SUBSCRIBE, "x.", 2);
s.setsockopt (ZMQ_SUBSCRIBE, "y.", 2);
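
The matching rule described in this section, where a subscription is a byte prefix and a message is delivered if it starts with at least one active subscription, can be modelled in a few lines of Python 3. This is a sketch of the documented behaviour, not ØMQ's implementation, and is_delivered is a hypothetical name.

```python
def is_delivered(message, subscriptions):
    """Return True if message starts with at least one subscription prefix.

    An empty prefix matches every message, which is why subscribing
    with "" receives everything.
    """
    return any(message.startswith(prefix) for prefix in subscriptions)


if __name__ == "__main__":
    msg = b"xy.z\x00ABCDE"
    print(is_delivered(msg, [b""]))            # subscribe to all messages
    print(is_delivered(msg, [b"xy.z\x00"]))    # one particular topic
    print(is_delivered(msg, [b"xy."]))         # topic prefix
    print(is_delivered(msg, [b"x.", b"y."]))   # neither prefix matches
```

Under this rule, including the terminator in the subscription, as in "xy.z\x00", is what separates the exact topic xy.z from longer topics such as xy.zz that share the same leading bytes.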

Sending a message using multicast

// Initialise 0MQ context with a single application and single I/O thread
zmq::context_t ctx (1, 1);
// Create a ZMQ_PUB socket
zmq::socket_t s (ctx, ZMQ_PUB);
// Limit the throughput rate to 10Mb/s
int64_t rate = 10000;
s.setsockopt (ZMQ_RATE, &rate, sizeof (rate));
// Connect our socket to the multicast group 224.0.0.1:5555 on the eth0 interface
s.connect ("udp://eth0;224.0.0.1:5555");
// Send a message
zmq::message_t msg (10);
memcpy (msg.data (), "xy.z\x00" "ABCDE", 10);
s.send (msg);

Receiving a message using multicast

// Initialise 0MQ context with a single application and single I/O thread
zmq::context_t ctx (1, 1);
// Create a ZMQ_SUB socket and connect it to the multicast group 224.0.0.1:5555 on eth0
zmq::socket_t s (ctx, ZMQ_SUB);
s.connect ("udp://eth0;224.0.0.1:5555");
// Subscribe to all messages
s.setsockopt (ZMQ_SUBSCRIBE, "", 0);
// Receive a single message
zmq::message_t msg;
s.recv (&msg);

Using zmq_forwarder to set up a forwarding device

Create the following configuration file, myconfig.cfg. The forwarder will connect to IP address 192.168.0.1, port 5555, to get the message feed. It will republish the feed to all applications that connect to port 5556 on its eth0 interface:

<forwarder>
    <in>
        <connect addr = "tcp://192.168.0.1:5555"/>
    </in>
    <out>
        <bind addr = "tcp://eth0:5556"/>
    </out>
</forwarder>

To start the forwarder:

zmq_forwarder ./myconfig.cfg

Establishing inter-thread communication

The following example shows how to establish a connection between a worker thread and the main thread using the inproc transport. The messaging pattern used (publish/subscribe) is arbitrary and can be replaced by a messaging pattern of your choice.

#include <assert.h>
#include <pthread.h>
#include <zmq.hpp>
 
void *worker_routine (void *arg)
{
    //  Connect to the inproc endpoint created by the main thread.
    zmq::context_t *ctx = (zmq::context_t*) arg;
    zmq::socket_t sw (*ctx, ZMQ_PUB);
    sw.connect ("inproc://pipe");
    ...
}
 
int main ()
{
    zmq::context_t ctx (2, 1);
 
    //  Create inproc endpoint called 'pipe'.
    zmq::socket_t sm (ctx, ZMQ_SUB);
    sm.setsockopt (ZMQ_SUBSCRIBE, "", 0);
    sm.bind ("inproc://pipe");
 
    //  Start the worker thread.
    pthread_t worker;
    int rc = pthread_create (&worker, NULL, worker_routine, (void*) &ctx);
    assert (rc == 0);
 
    ...
}