ZeroMQ

Table of Contents

1 Introduction

ZMQ希望用户在编写应用程序方面,更加关心消息通信而不是socket连接。在内部已经使用异步方式帮用户处理好了网络IO读写以及数据成帧,使得用户在处理的时候只需要关注消息本身。 ZMQ所支持的消息传输模型有很多种,比如push-pull,pub-sub,request-reply以及exclusive-pair,应该可以涵盖应用层面所有的消息通讯方式。同时ZMQ本身可以底层允许选择不同的通信协议, 比如tcp,ipc(进程间通信),inproc(线程间通信).

ZMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ZMQ is from iMatix and is LGPL open source.

首先阅读一下manual.然后看看guide里面包括很多设计细节和使用细节东西。最后可以阅读一下API reference.关于Background方面的内容也有一些whitepapers可以参考。

ZMQ总结了几种消息通信模型可以覆盖所有的应用程序,分别是下面这几种:

  • Exclusive-Pair
  • Publish-Subscribe
  • Push-Pull
  • Request-Reply

我们会在后面仔细讨论每一种消息通信模型。

使用ZMQ之后的话不像我们所认识一样的server必须在client之前启动。在ZMQ下面的话client完全可以在server之前启动。一旦连接顺序并不按照我们所想象的那样工作的话,连接顺序是无关的时候,那么客户端和服务端这两个概念就非常模糊了。我们必须重新考虑什么是server,什么是client.ZMQ给出一个非常实际的答案。我们将socket嵌入到网络拓扑的时候,server应该是网络结构中稳定的部分,而client应该是网络结构中比较易变的部分。

What this means is that you should always think in terms of "servers" as stable parts of your topology, with more-or-less fixed endpoint addresses, and "clients" as dynamic parts that come and go. Then, design your application around this model. The chances that it will "just work" are much better like that.

1.1 Example

首先我们看看ZMQ的使用情况。我们以简单的Request-Reply消息模型为例。所谓Request-Reply模型就是client发送一个消息然后等待回应,而server接收到消息然后立刻回复。 这是一个lockstep的过程,就是说中途不能够切换做另外的事情,比如client发送消息之后就不能够再发送必须等待回应,而server接收到消息之后必须立刻恢复而不能够再次接收。 咋一看这是一个同步过程,但是后面可以看到ZMQ允许完成异步的Request-Reply.使用这种简单的通信模型我们可以很快地写出一个echo服务。

//============================================================
//client.cc.send 'hello' always to server,which hosts on localhost:19870
//============================================================

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main() {
    void* ctx=zmq_init(1);
    printf("connect to server 'localhost:19870'\n");
    void* req=zmq_socket(ctx,ZMQ_REQ);
    zmq_connect(req,"tcp://localhost:19870");
    for(int i=0; i<10; i++) {
        zmq_msg_t request;
        zmq_msg_init_size(&request,6);
        //including trailing '\0'.
        memcpy(zmq_msg_data(&request),"hello",6);
        printf("[C]send 'hello'\n");
        // 0 means in block way.
        zmq_send(req,&request,0);
        zmq_msg_close(&request);
        zmq_msg_t reply;
        zmq_msg_init(&reply);
        // 0 means in block way.
        zmq_recv(req,&reply,0);
        printf("[C]recv '%s'\n",zmq_msg_data(&reply));
        zmq_msg_close(&reply);
    }
    zmq_close(req);
    zmq_term(ctx);
    return 0;
}

//============================================================
//server.cc which hosts on *:19870
//============================================================

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main() {
    void* ctx=zmq_init(1);
    printf("bind to '*:19870'\n");
    void* rep=zmq_socket(ctx,ZMQ_REP);
    zmq_bind(rep,"tcp://*:19870");
    while(1){
        zmq_msg_t request;
        zmq_msg_init(&request);
        // 0 means in block way
        zmq_recv(rep,&request,0);
        const char* s=(const char*)zmq_msg_data(&request);
        printf("[S]recv '%s'\n",s);
        zmq_msg_t reply;
        // include trailing '\0'.
        zmq_msg_init_size(&reply,strlen(s)+1);
        memcpy(zmq_msg_data(&reply),s,strlen(s)+1);
        zmq_msg_close(&request);
        // 0 means in block way.
        printf("[S]send '%s'\n",s);
        zmq_send(rep,&reply,0);
        zmq_msg_close(&reply);
    }
    zmq_close(rep);
    zmq_term(ctx);
    return 0;
}

从编写角度来看的话确实简化了不少

  • zmq_init创建一个context.这个context就可以认为是一个MQ实例。1表示IO线程数。
  • zmq_socket根据context来创建一个socket,后面类型指定了MQ通信类型。
  • zmq_bind/zmq_connect可以进行绑定进行监听或者是进行连接。
  • zmq_msg_init/zmq_msg_init_size可以用来初始化一个message
  • zmq_send/zmq_recv可以进行message的发送和接收。
  • zmq_msg_close销毁一个message
  • zmq_close关闭一个socket
  • zmq_term销毁一个context

ZMQ底层做好了poller机制,对于server来说的话将多个connection映射到一个socket上面来了。底层使用其他线程完成了IO读写。 这里可以看到如果使用TCP的话底层应该是字节流,而我们没有指定任何成帧策略就得到了一条条消息,可以看到ZMQ内置有一个字节流成帧策略。

1.2 Protocol

我们从上面的Example里面看到,在进行zmq_bind/zmq_connect的时候指定了通信地址,而通信地址上面还附带了通信协议"tcp".ZMQ本身是允许工作在多种通信协议上面的:

  • tcp // tcp
  • ipc // 进程间通信。猜想底层应该是unix domain socket实现的.因为运行完毕之后我们可以看到socket文件。
  • inproc // 线程间通信。对于这种通讯协议来说的话底层IO线程没用使用。
  • pgm // ???
  • epgm // ???

我们可以非常容易地切换到其他通信协议上,而不需要修改任何代码。

zmq_connect(req,"ipc://fuck"); // client.cc
zmq_bind(req,"ipc://fuck"); // server.cc
[zhangyan@tc-cm-et18.tc.baidu.com]$ stat fuck
  File: `fuck'
  Size: 0               Blocks: 0          IO Block: 4096   socket
Device: 803h/2051d      Inode: 133580916   Links: 1
Access: (0755/srwxr-xr-x)  Uid: (  521/zhangyan)   Gid: (  524/zhangyan)
Access: 2011-09-26 14:02:44.000000000 +0800
Modify: 2011-09-26 14:02:44.000000000 +0800
Change: 2011-09-26 14:02:44.000000000 +0800

底层协议本身在使用的时候还有一些特别需要注意的地方,但是差异并不是很大,所以可以认为ZMQ在这个问题上解决还是比较好的。

1.3 Message

我们从上面的Example可以看到,ZMQ内部有一个默认的成帧策略,也就是说我们使用zmq_recv/zmq_send这样写成的webserver是不能够正常工作的, 因为zmq_recv/zmq_send只能够处理内置的消息格式,而不能够处理http请求这种字节流,按照文档的说法"ZMQ is not a neutral carrier".

There is however a good answer to the question, "how can I make profitable use of ZMQ when making my new XYZ server?" You need to implement whatever protocol you want to speak in any case, but you can connect that protocol server (which can be extremely thin) to a ZMQ backend that does the real work. The beautiful part here is that you can then extend your backend with code in any language, running locally or remotely, as you wish. Zed Shaw's Mongrel2 web server is a great example of such an architecture.

ZMQ的消息格式是这样的

struct msg{
    msg_size_t size; // 但是为了效率的话会使用特殊的方法进行压缩
    msg_data_t data[0];
};

ZMQ允许一条message按照多个部分进行发送(multipart message),为了能够更好地描述这节的话我们重新定义一些名词。 后面我们可能会混用这两个名词,但是读者应该是可以区分的:

  • frame.single part message.
  • message.多个frame组成的一条完整message.

我们使用下面的例子来说明如何进行multipart message传输和接收的。multipart message对于理解后面的路由非常重要。

//  Convert C string to ZMQ string and send to socket
static int
s_send (void *socket, char *string) {
    int rc;
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    rc = zmq_send (socket, &message, 0);
    zmq_msg_close (&message);
    return (rc);
}

//  Sends string as ZMQ string, as multipart non-terminal
static int
s_sendmore (void *socket, char *string) {
    int rc;
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    rc = zmq_send (socket, &message, ZMQ_SNDMORE);
    zmq_msg_close (&message);
    return (rc);
}

//  Receives all message parts from socket, prints neatly
//
static void
s_dump (void *socket)
{
    puts ("----------------------------------------");
    while (1) {
        //  Process all parts of the message
        zmq_msg_t message;
        zmq_msg_init (&message);
        zmq_recv (socket, &message, 0);

        //  Dump the message as text or binary
        char *data = (char*) zmq_msg_data (&message);
        int size = zmq_msg_size (&message);
        int is_text = 1;
        int char_nbr;
        for (char_nbr = 0; char_nbr < size; char_nbr++)
            if ((unsigned char) data [char_nbr] < 32
            ||  (unsigned char) data [char_nbr] > 127)
                is_text = 0;

        printf ("[%03d] ", size);
        for (char_nbr = 0; char_nbr < size; char_nbr++) {
            if (is_text)
                printf ("%c", data [char_nbr]);
            else
                printf ("%02X", (unsigned char) data [char_nbr]);
        }
        printf ("\n");

        int64_t more;           //  Multipart detection
        size_t more_size = sizeof (more);
        zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
        zmq_msg_close (&message);
        if (!more)
            break;      //  Last message part
    }
}

如果使用ZMQ出现消息丢失的话,那么可以按照下面这个solver来查找原因 http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver .

1.4 Identity

Identity可以用来表示一个socket的身份,对于ZMQ是非常有用途的,现在能够总结到的作用有下面这些:

  • 持久化socket(durable socket).影响到Publish-Subscribe通信模型的可靠性。
  • 路由(routing).影响到ROUTER的路由选择。

关于Identity似乎如何影响到上面两个方面的,我们会在后面的各个小节仔细描述。我们看看如何设置Identity的。

zmq_setsockopt(socket,ZMQ_IDENTITY,"dirlt",5);

如果没有设置Identity的话,那么在pub-sub模型上的话就会出现消息丢失,而在路由的时候那么ROUTER会帮助用户生成UUID. Identity的实现非常简单,就是整个message开头加上一个特殊的frame来标记的。

1.5 Device

一旦通信节点超过一定数量的话,那么最好需要一个转发节点或者是中间节点,不然通信费用以及管理复杂度都会急剧上升。作为一个转发节点来说的话, 逻辑非常简单,从一个socket读取数据,然后向另外一个socket里面写数据,可以认为类似于pipe这样的机制。在ZMQ里面称这样的节点为Device. ZMQ里面内置的Device有下面三种:

使用device也非常简单.

#include "zhelpers.h"
int main (void)
{
    void *context = zmq_init (1);

    // Socket facing clients
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (frontend, "tcp://*:5559");

    // Socket facing services
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (backend, "tcp://*:5560");

    // Start built-in device
    zmq_device (ZMQ_QUEUE, frontend, backend);

    // We never get here…
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

ZMQ Guide里面提到了不要将不同Device和socket进行混用. If you're like most 01MQ users, at this stage your mind is starting to think, "what kind of evil stuff can I do if I plug random socket types into devices?" The short answer is: don't do it. You can mix socket types but the results are going to be weird. So stick to using ROUTER/DEALER for queue devices, SUB/PUB for forwarders and PULL/PUSH for streamers. 但是如果实际阅读代码的话,会发现这个部分的逻辑都是一样的,也就是事实上在现在ZMQ版本里面是可以混用的

int zmq_device (int device_, void *insocket_, void *outsocket_)
{
    if (!insocket_ || !outsocket_) {
        errno = EFAULT;
        return -1;
    }

    if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
          device_ != ZMQ_STREAMER) {
       errno = EINVAL;
       return -1;
    }

    return zmq::device ((zmq::socket_base_t*) insocket_,
        (zmq::socket_base_t*) outsocket_);
}

而zmq::device逻辑也非常简单,就是之前提到pipe工作机制。内部使用了ZMQ本身提供的zmq_poll机制来进行通知哪个socket上面有数据。

int zmq::device (class socket_base_t *insocket_,
        class socket_base_t *outsocket_)
{
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);

    if (rc != 0) {
        return -1;
    }

    int64_t more;
    size_t moresz;

    zmq_pollitem_t items [2];
    items [0].socket = insocket_;
    items [0].fd = 0;
    items [0].events = ZMQ_POLLIN;
    items [0].revents = 0;
    items [1].socket = outsocket_;
    items [1].fd = 0;
    items [1].events = ZMQ_POLLIN;
    items [1].revents = 0;

    while (true) {

        //  Wait while there are either requests or replies to process.
        rc = zmq_poll (&items [0], 2, -1);
        if (unlikely (rc < 0)) {
            return -1;
        }

        //  The algorithm below asumes ratio of request and replies processed
        //  under full load to be 1:1. Although processing requests replies
        //  first is tempting it is suspectible to DoS attacks (overloading
        //  the system with unsolicited replies).

        //  Process a request.
        if (items [0].revents & ZMQ_POLLIN) {
            while (true) {

                rc = insocket_->recv (&msg, 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                moresz = sizeof (more);
                rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                if (!more)
                    break;
            }
        }

        //  Process a reply.
        if (items [1].revents & ZMQ_POLLIN) {
            while (true) {

                rc = outsocket_->recv (&msg, 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                moresz = sizeof (more);
                rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                if (!more)
                    break;
            }
        }

    }

    return 0;
}

1.6 Congestion

ZMQ可以通过控制HWM(high-water mark)来控制拥塞。内部实现上每一个socket有关联了buffer,HWM可以控制buffer大小

  • PUB/PUSH有transmit buffers.
  • SUB/PULL/REQ/REP有receive buffers.
  • DEALER/ROUTER/PAIR有transmit buffers也有receive buffers.

一旦socket达到了high-water mark的话,那么会根据socket类型来决定是丢弃还是block.现在实现而言的话PUB会尝试丢弃数据,而其他类型的socket就会block住。 如果socket是线程之间进行通信的话,那么HWM是两者socket的HWM之和。因为默认HWM是ulimited的,所以只要一端没有设置的话那么容量就无限。

Some notes on using the HWM option:

  • This affects both the transmit and receive buffers of a single socket. Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.
  • When your socket reaches its high-water mark, it will either block or drop data depending on the socket type. PUB sockets will drop data if they reach their high-water mark, while other socket types will block.
  • Over the inproc transport, the sender and reciever share the same buffers, so the real HWM is the sum of the HWM set by both sides. This means in effect that if one side does not set a HWM, there is no limit to the buffer size.

如果我们的内存有限的话那么我们希望将内存swap到磁盘上面。ZMQ允许我们如果拥塞内存超过HWM的话,那么还可以将内存swap到磁盘上面去。 不过这个磁盘内容我们是不可见的,并且不能够进行持久化。如果进程一旦crash重启的话那么内容消失。仅仅是为了swap用的,而不是为了持久化用的。

2 Exclusive-Pair

Exclusive-Pair是最简单的1:1通信模式,你可以认为就是一个TCPConnection.我们依然需要写bind和connect,但是server只能够接受一个连接。 数据可以进行双向连接,没有类似于REQ-REP的lockstep这样的要求。例子中我们连续发送了两个message,然后使用了inproc协议的socket.

#include <zmq.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <pthread.h>

void* second(void* arg){
    void* ctx=arg;
    void* pair=zmq_socket(ctx,ZMQ_PAIR);
    zmq_connect(pair,"inproc://channel");
    for(int i=0;i<2;i++){
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        zmq_recv(pair,&msg,0);
        printf("[S]recv '%s'\n",zmq_msg_data(&msg));
        zmq_msg_close(&msg);
    }
    for(int i=0;i<2;i++){
        zmq_msg_t msg;
        zmq_msg_init_size(&msg,6);
        memcpy(zmq_msg_data(&msg),"world",6);
        printf("[S]send '%s'\n",zmq_msg_data(&msg));
        zmq_send(pair,&msg,0);
        zmq_msg_close(&msg);
    }
    zmq_close(pair);
}
int main(){
    void* ctx=zmq_init(2);
    void* pair=zmq_socket(ctx,ZMQ_PAIR);
    zmq_bind(pair,"inproc://channel");
    pthread_t id;
    pthread_create(&id,NULL,&second,ctx);
    for(int i=0;i<2;i++){
        zmq_msg_t msg;
        zmq_msg_init_size(&msg,6);
        memcpy(zmq_msg_data(&msg),"world",6);
        printf("[M]send '%s'\n",zmq_msg_data(&msg));
        zmq_send(pair,&msg,0);
        zmq_msg_close(&msg);
    }
    for(int i=0;i<2;i++){
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        zmq_recv(pair,&msg,0);
        printf("[M]recv '%s'\n",zmq_msg_data(&msg));
        zmq_msg_close(&msg);
    }
    pthread_join(id,NULL);
    zmq_close(pair);
    zmq_term(ctx);
    return 0;
}

3 Publish-Subscribe

Pub-Sub模式非常简单,Pub不断地发布消息而Sub那么就不断地接收消息。因为消息的流向是单向的,所以相对于来说比较简单。subscriber可以订阅多个publisher, 多个publisher的消息会交替地到达。关于例子的话可以参考 http://zguide.zeromq.org/page:all#Getting-the-Message-Out .

我们在使用的时候subscriber必须设置ZMQ_SUBSCRIBE内容,否则subscriber是接收不到数据的。对于这个内容在进行过滤的时候有用,subscriber会根据消息头进行过滤, 如果消息头不和ZMQ_SUBSCRIBE的内容匹配的话那么数据就会被丢弃。但是从现在的实现上来看的话,现在过滤过程并不是在publisher来完成的,而是在subscriber获得所有数据来进行过滤的。 如果不想进行过滤的话,那么可以将ZMQ_SUBSCRIBE内容设置为空

zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);

In the current versions of ZMQ, filtering happens at the subscriber side, not the publisher side. This means, over TCP, that a publisher will send all messages to all subscribers, which will then drop messages they don't want.

3.1 Missing Message

我们看下面一个例子.为了简单起见我们想让subscriber首先运行起来,然后让publisher运行起来。因为如果我们首先将publisher连接起来的话, 那么subscriber在进行连接的话就会丢失很多记录了。

//============================================================
// publisher.cc,faster speed.
//============================================================
#include "zhelpers.h"

int main(){
    void* ctx=zmq_init(1);
    void* pub=zmq_socket(ctx,ZMQ_PUB);
    zmq_bind(pub,"tcp://*:19870");
    const int header=10001;
    for(int i=0;i<10;i++){
        char message[20];
        snprintf(message,sizeof(message),"%d %d",header,i);
        printf("send '%s'\n",message);
        {
            zmq_msg_t msg;
            zmq_msg_init_size(&msg,strlen(message)+1);
            memcpy(zmq_msg_data(&msg),message,strlen(message)+1);
            zmq_send(pub,&msg,0);
            zmq_msg_close(&msg);
            sleep(1);
        }
    }
    zmq_close(pub);
    zmq_term(ctx);
    return 0;
}

//============================================================
// subscriber.cc,litte speed.
//============================================================
#include "zhelpers.h"

int main(){
    void* ctx=zmq_init(1);
    void* sub=zmq_socket(ctx,ZMQ_SUB);
    zmq_setsockopt(sub,ZMQ_SUBSCRIBE,"10001",5);
    zmq_connect(sub,"tcp://localhost:19870");
    for(int i=0;i<10;i++){
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        zmq_recv(sub,&msg,0);
        printf("recv '%s'\n",zmq_msg_data(&msg));
        zmq_msg_close(&msg);
        sleep(1);
    }
    zmq_close(sub);
    zmq_term(ctx);
    return 0;
}
[zhangyan@tc-cm-et18.tc.baidu.com]$ ./publisher
send '10001 0'
send '10001 1'
send '10001 2'
send '10001 3'
send '10001 4'
send '10001 5'
send '10001 6'
send '10001 7'
send '10001 8'
send '10001 9'

[zhangyan@tc-cm-et18.tc.baidu.com]$ ./subscriber
recv '10001 1'
recv '10001 2'
recv '10001 3'
recv '10001 4'
recv '10001 5'
recv '10001 6'
recv '10001 7'
recv '10001 8'
recv '10001 9'

我们看到的是subscriber丢失了一条消息。这个非常好解释,那就是说虽然subsriber首先启动的话,但是只有当publisher启动之后发送了一条信息之后才能够感知到对端启动, 这个时候subscriber再进行连接,那么就造成第一条数据的丢失。(这个过程是我猜测的,但是关于这个现象在ZMQ Guide上面是有解释的)

There is one important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

解决这个问题很简单,就是需要一个同步的机制。但是即使是 http://zguide.zeromq.org/page:all#Node-Coordination 这种同步机制也是不够的。robust的同步机制应该是 A more robust model could be:

  • Publisher opens PUB socket and starts sending "Hello" messages (not data).
  • Subscribers connect SUB socket and when they receive a Hello message they tell the publisher via a REQ/REP socket pair.
  • When the publisher has had all the necessary confirmations, it starts to send real data.

3.2 Congestion Control

之前我们提到拥塞控制,对于PUB来说的话如果达到了HWM的话那么会直接进行丢弃。我们简单地修改一下上面的代码,让subscriber连接上但是不进行处理,而publisher不断地发送消息。

//============================================================
// publisher.cc,faster speed.
//============================================================
#include "zhelpers.h"

int main(){
    void* ctx=zmq_init(1);
    void* pub=zmq_socket(ctx,ZMQ_PUB);
    zmq_bind(pub,"tcp://*:19870");
    const int header=10001;
    int i=0;
    while(1){
        i++;
        char message[20];
        snprintf(message,sizeof(message),"%d %d",header,i);
        printf("send '%s'\n",message);
        {
            zmq_msg_t msg;
            zmq_msg_init_size(&msg,strlen(message)+1);
            memcpy(zmq_msg_data(&msg),message,strlen(message)+1);
            zmq_send(pub,&msg,0);
            zmq_msg_close(&msg);
        }
    }
    zmq_close(pub);
    zmq_term(ctx);
    return 0;
}

//============================================================
// subscriber.cc,litte speed.
//============================================================
#include "zhelpers.h"

int main(){
    void* ctx=zmq_init(1);
    void* sub=zmq_socket(ctx,ZMQ_SUB);
    zmq_setsockopt(sub,ZMQ_SUBSCRIBE,"10001",5);
    zmq_connect(sub,"tcp://localhost:19870");
    sleep(100000);
    zmq_close(sub);
    zmq_term(ctx);
    return 0;
}

然后我们看看运行之后的效果是subscriber占用的内存越来越大,而publisher的内存稳定。这是因为subscriber一旦连接上之后,那么publisher的内容就可以推送给 subscriber在sub这端进行缓存。如果一旦disconnect掉subscriber的话,因为publisher没有订阅者,那么消息直接丢弃不会在pub这端缓存。

我们可以通过设置Identity来强迫publisher进行缓存,在subscriber.cc部分加上

zmq_setsockopt(sub,ZMQ_IDENTITY,"luck",4);

然后启动subscriber然后挂断,因为subscriber连接上之后告诉了publisher自己的identity,那么publisher就会尝试缓存所有没有发往这个subscriber的数据。 如果没有设置PUB的HWM的话,那么PUB的内存很快就会被耗光。如果我们设置了HWM的话,那么publisher仅仅会缓存部分数据。我们还可以通过设置SWAP大小, 将部分拥塞部分结果放在磁盘上面,如果拥塞结果消息数量超过HWM的话

uint64_t hwm = 2;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));

// Specify swap space in bytes
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));

4 Push-Pull

Push-Pull相对于Pub-Sub模式更加简单。Push-Pull模型工作方式是Divide-And-Conquer,会保证选择一个并且只有一个client来处理消息,而不像Pub-Sub一样会尝试让所有的client都获得消息。 关于例子的话可以直接参考链接 http://zguide.zeromq.org/page:all#Divide-and-Conquer . 对于ZMQ的Push-Pull实现的话,server端会不断地发现新增的client连接,然后再进行消息分发的时候, 也会将这些消息分发到新增加的client上面去,使用这个功能的话就可以非常方便地处理动态添加机器的行为。

5 Request-Reply

我们返回来再看Example.在Example里面的话虽然server可以维护很多连接,但是读写方式是同步的,但是ZMQ是提供了异步的Request-Reply的通信模型的。 这节我们主要看看异步的Request-Reply在ZMQ里面是如何做到的。

首先ZMQ还定义了两个socket类型分别是:

  • ROUTER(XREP)
  • DEALER(XREQ)

其中ROUTER的大致功能是进行路由转发的,不要求立刻进行reply.而DEALER功能类似于PULL+PUSH,如果进行PUSH操作的话能够将消息进行负载均衡,而如果是PULL的话那么能够进行fair-queue能够均匀地将多个后端数据收集过来,然后配合REQ,REP就可以构造出很多种通信模式了。ZMQ Guide总结了一下各个socket类型特点。里面提到了Envelope会在后面说明。

Here now is a more detailed explanation of the four socket types we use for request-reply patterns:

  • DEALER just load-balances (deals out) the messages you send to all connected peers, and fair-queues (deals in) the messages it receives. It is exactly like a PUSH and PULL socket combined.
  • REQ prepends an empty message part to every message you send, and removes the empty message part from each message you receive. It then works like DEALER (and in fact is built on DEALER) except it also imposes a strict send / receive cycle.
  • ROUTER prepends an envelope with reply address to each message it receives, before passing it to the application. It also chops off the envelope (the first message part) from each message it sends, and uses that reply address to decide which peer the message should go to.
  • REP stores all the message parts up to the first empty message part, when you receive a message and it passes the rest (the data) to your application. When you send a reply, REP prepends the saved envelopes to the message and sends it back using the same semantics as ROUTER (and in fact REP is built on top of ROUTER), but matching REQ, imposes a strict receive / send cycle.

我们需要深入了解Envelope的机制才能够充分利用ZMQ的灵活性。首先我们看看一个使用ROUTER/DEALER的例子 http://zguide.zeromq.org/page:all#Multithreading-with-MQ . 仔细阅读完成这个例子之后会有一个疑问,就是底层是怎么我们回复的消息应该是和哪一个链接绑定的呢?因为在worker_routine里面的话,我们只是往ZMQ_REP socket里面写信息, 这个信息最终会传回给DEALER,然后DEALER通过device交回给ROUTER,那么ROUTER需要将这个信息传回给client.所有的秘密就在于Message Envelope(信息包装).

关于Envelope可以仔细阅读这个章节 http://zguide.zeromq.org/page:all#Request-Reply-Envelopes . 但是为了方便我们理解,这里我们还是重述一遍。 从上节的介绍中我们看到了REQ/REP的Envelope就是一个empty message part.而对于DEALER来说的话没有处理任何Envelope的信息。ROUTER的Envelope是这样的:

  • 如果对端设置了identity的话,每发送一个消息的话ROUTER接收到,那么ROUTER在转发之前会在头部自动加上一个message part,内容是identity.
  • 如果对端没有设置identity的话,每发送一个消息的话ROUTER接收到,那么ROUTER在转发之前会生成一个UUID,同样自动加上一个message part,内容是UUID.

如果消息流经多个ROUTER的话,那么会自动加上多个这样的东西。不过下面的实验证明,并不是每个消息都会生成UUID的,而是针对每个连接生成UUID.

#include "zhelpers.h"

int main (void) {
    void *context = zmq_init (1);

    void *sink = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (sink, "inproc://example");

    // First allow 0MQ to set the identity
    void *anonymous = zmq_socket (context, ZMQ_DEALER);
    zmq_connect (anonymous, "inproc://example");
    s_send (anonymous, "ROUTER uses a generated UUID");
    s_dump (sink);
    s_send (anonymous, "ROUTER uses a generated UUID");
    s_dump (sink);

    zmq_close (sink);
    zmq_close (anonymous);
    zmq_term (context);
    return 0;
}
[zhangyan@tc-cm-et18.tc.baidu.com]$ ./env
----------------------------------------
[017] 0011A54BD30A5A4FA589A7C2C2860926BA
[028] ROUTER uses a generated UUID
----------------------------------------
[017] 0011A54BD30A5A4FA589A7C2C2860926BA
[028] ROUTER uses a generated UUID

最后不管是DEALER还是REP来进行处理的话,都需要解包。只不过DEALER没有自动处理,需要我们自己在应用层解开多个message part,然后保存起来。当需要回复消息的时候, 在头部重新加上这些message part.这种方式比较灵活可以用来做异步处理。而REP逻辑就非常简单,一直解包直到第一个empty message part将其保存起来,然后当send出去的时候在头部包装, 这就解释了为什么,REP必须是一个lockstep的过程,不然的话整个逻辑就会混乱。

如果理解了ROUTER/DEALER/REQ/REP的机制之后的话,就比较容易理解如何构建一个异步客户端和服务器模型了。http://zguide.zeromq.org/page:all#Asynchronous-Client-Server .

6 API

关于API这一节的话提供的都是从ZeroMQ的代码文档里面得到的非常详细。但是我想针对里面一些具体的函数说一些或者是记下一些自己的体会,因为里面有坑或者是有思想。

6.1 Description

0MQ是一个轻量级的消息传递内核,扩展了socket接口。同时内置了很多新的特性,比如异步队列,多消息,消息订阅和过滤,不同transport的兼容等。

The 0MQ lightweight messaging kernel is a library which extends the standard socket interfaces with features traditionally provided by specialised messaging middleware products. 0MQ sockets provide an abstraction of asynchronous message queues, multiple messaging patterns, message filtering (subscriptions), seamless access to multiple transport protocols and more.

对于transport的话从现在0MQ看实现了下面几种,这个会在transports里面细说:

  • tcp
  • ipc
  • inproc
  • 其他(没有用过也没听说过,可能是多播方面吧).

用户大致上只需要修改工作的uri底层就可以切换实现,非常方面。对于异步队列的话,就是使用inproc这个transports来完成的。

我在编写同步rpc方面使用了0MQ,在这里面多消息基本上一无是处,因为这个东西完全可以在上层将所有的消息(对象)打包称为一个消息然后发送,只要上层提供了足够方便的多个对象的序列化和反序列化接口即可,多消息模式完全没有意义。消息订阅和过滤从之前的文档来看是按照消息的头几个字节来判断的,但是因为pub-sub模式可能会丢消息,所以在我们项目里面没有使用。项目里面使用的就是这个链接里面提到的模型 http://zguide.zeromq.org/page:all#toc38

6.2 Context

我理解context对于0MQ来说就好比是epoll线程的句柄,然后之后所有的建立的socket都会在这个线程里面进行监听。当然整个app通常来说只需要创建一次就可以了,然后在app之前等待结束即可。

首先通过zmq_init来进行初始化

// The io_threads argument specifies the size of the 0MQ thread pool to handle I/O operations.
// If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.
void *zmq_init (int io_threads);

其中io_threads指定的就是线程个数。

  • EINVAL.io_threads传入参数非法。

然后在app销毁的时候调用zmq_term来销毁这个epoll线程句柄以及开辟的epoll线程。原型非常简单

int zmq_term (void *context);

但是语义以及返回值有点麻烦。

Context termination is performed in the following steps:

  • Any blocking operations currently in progress on sockets open within context shall return immediately with an error code of ETERM. With the exception of zmq_close(), any further operations on sockets open within context shall fail with an error code of ETERM.
  • After interrupting all blocking calls, zmq_term() shall block until the following conditions are satisfied:
    • All sockets open within context have been closed with zmq_close().
    • For each socket within context, all messages sent by the application with zmq_send() have either been physically transferred to a network peer, or the socket’s linger period set with the ZMQ_LINGER socket option has expired.

一旦zmq_term的话那么所有正在block operations的话都会返回ZMQ_TERM这个错误。但后一旦中断这个错误之后的话,会一直等待直到socket调用zmq_close,如果还设置了LINGER这个选项的话,那么会等待数据到达到了对端或者是linger超时位置。返回值的话可能会

  • EFAULT.context本身无效
  • EINTR.调用被信号处理中断,这个时候可以重新发起zmq_term这个操作。

虽然解释非常清楚,但是对于我们大部分用户来说,真的不会设置LINGER选项,并且都会等待所有线程执行完毕之后才会调用zmq_term来释放句柄。

6.3 Messages

对于Messages而言的话,就是ZMQ传输的消息单元体。通过message得到内容有两种方法

  • zmq_msg_data
  • zmq_msg_size

非常简单。另外还有两个操作方式

  • zmq_msg_copy // 返回EFAULT表示src是无效的message.对于底层的话如果share也只是采用引用计数方法所以不会出现ENOMEM的错误.
  • zmq_msg_move // 返回EFAULT表示src是无效的message.同样底层销毁一个东西将控制权转移到另外一个对象,然后src调用init.

上面这些接口都非常简单,不容易出错。容易出错的就是初始化和销毁部分.

初始化有三种方法分别是

  • zmq_msg_init // 初始化并且没有分配任何内存。比较适合在zmq_recv之前调用。始终成功
  • zmq_msg_init_size // 以某个size进行初始化分配内存。这个比较适合在zmq_send之前的话我们将自己的数据copy进去。ENOMEM表示内存分配失败。
  • zmq_msg_init_data // 以data,size进行初始化,msg里面持有内存指针。比较适合zmq_send我们自己的数据,但是我们需要提供send over销毁数据的回调函数。

我们这里仔细看看init_size以及init_data的实现.我们会发现msg里面还有一个content对象,然后content里面会包含一个头部以及data.并且对于content有引用计数。 这个非常好理解,尤其是这个message是以一种异步的方式进行发送的,所以必须有引用计数才能够搞定这件事情。

int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    if (size_ <= ZMQ_MAX_VSM_SIZE) {
        msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
        msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
        msg_->vsm_size = (uint8_t) size_;
    }
    else {
        msg_->content =
            (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
        if (!msg_->content) {
            errno = ENOMEM;
            return -1;
        }
        msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;

        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
        content->data = (void*) (content + 1);
        content->size = size_;
        content->ffn = NULL;
        content->hint = NULL;
        new (&content->refcnt) zmq::atomic_counter_t ();
    }
    return 0;
}

int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
    alloc_assert (msg_->content);
    msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
    content->data = data_;
    content->size = size_;
    content->ffn = ffn_;
    content->hint = hint_;
    new (&content->refcnt) zmq::atomic_counter_t ();
    return 0;
}

了解了初始化的逻辑之后,对于销毁逻辑就非常清楚了。但是我们最好看看这个实现

int zmq_msg_close (zmq_msg_t *msg_)
{
    // 这个地方是会检查标志的,这样可以放置多次释放造成错误的结果
    //  Check the validity tag.
    if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {
        errno = EFAULT;
        return -1;
    }
    // 如果里面需要进行释放的话
    //  For VSMs and delimiters there are no resources to free.
    if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
          msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {

        //  If the content is not shared, or if it is shared and the reference.
        //  count has dropped to zero, deallocate it.
        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
                // 那么会使用引用计数进行计算.
        if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {

            //  We used "placement new" operator to initialize the reference.
            //  counter so we call its destructor now.
            content->refcnt.~atomic_counter_t ();
                        // 释放自己内部的内存.
            if (content->ffn)
                content->ffn (content->data, content->hint);
            free (content);
        }
    }
        // 然后底层会将这个flags清空.以防多次释放.
    //  Remove the validity tag from the message.
    msg_->flags = 0;

    return 0;
}

然后这里看看message的WireFormat.所谓的wireformat就是指message的打包方式。zeromq对于底层打包方式非常简单,原理就是bodylen+data(包含flags固定1字节)

  • 如果body_len < 254.那么这个可以使用1个字节表示body_len. (body_len(1byte) + flags(1byte) + data)
  • 如果body_len >= 254的话,zeromq使用8字节表示body_len. (0xff(特殊标记) + body_len(network order,8bytes) + flags(1byte) + data)

在大部分情况消息非常短使用1个字节表示body_len就可以搞定:).

6.4 Sockets

使用zmq_socket和zmq_close就可以创建和销毁socket.对于socket具体的类型的话可以参看文档,写得非常的详细。

// 错误可能有下面这些
// 1.EINVAL type不合法
// 2.EFAULT context无效
// 3.EMFILE 文件句柄不够
// 4.ETERM context已经被zmq_term了.
void *zmq_socket (void *context, int type);

// 错误可能有下面这些
// 1.ENOTSOCk 这个socket不合法
int zmq_close (void *socket);

创建和销毁接口都非常简单不容易出现错误.

然后剩下要做的就是创建服务端(bind)或者是客户端(connect).必须注意到zmq这里提供了一个很方便的东西, 就是connect本身也是一个异步过程。如果本次没有完成连接的话,那么下次隔断时间又会重新尝试发起连接。

// 其中endpoint根据不同的transport表达方式不同.这个后面会提到
// 可能的错误有下面这些
// 1.EPROTONOSUPPORT endpoint的transport有问题
// 2.ENOCOMPATPROTO  endpoint的transport和socket不兼容
// 3.EADDRINUSE address already in use.
// 4.EADDRNOTAVAIL address not available
// 5.ENODEV address指定了一个不存在的device.
// 6.ETERM context正在被销毁
// 7.ENOTSOCK socket无效
// 8.EMTHREAD 没有epoll IO线程完成这个task
// 其实觉得大部分的错误我们是没有必要处理的,EADDRINUSE可能是错常见的错误了.
int zmq_bind (void *socket, const char *endpoint);

// 可能的错误有下面这些
// EPROTONOSUPPORT endpoint的transport有问题
// ENOCOMPATPROTO  endpoint的transport和socket不兼容
// ETERM  context正在被销毁
// ENOTSOCK  socket无效
// EMTHREAD  没有epoll IO线程完成这个task
// 所以这里的大部分错误都可以忽略的.
int zmq_connect (void *socket, const char *endpoint);

接下来的部分就是读写了。zeromq读写都是针对message来操作的,允许操作multipart messages.但是个人现在看起来, 觉得multipart messages似乎没有什么太大的用途:(.因为两个接口面向对象都是message,对于十分简单

// 其中flags可以为ZMQ_NOBLOCK与ZMQ_SNDMORE(表示发送多消息,0表示结束)
// 这里NOBLOCK的含义是,如果不能够理解丢入异步消息队列的话,那么就立刻返回
// 可能的错误有下面这些
// EAGAIN 异步队列已满
// ENOTSUP socket不支持zmq_send
// EFSM socket的状态机错误(因为每种类型的socket允许操作顺序是不同的)
// ETERM context正在被销毁
// ENOTSOCK 无效的socket
// EINTR 被信号中断
// EFAULT 无效的message
// 事实上我们关注的可以很少,如果不用noblock的话,我们只需要关心EINTR.通常来说处理逻辑就是重新发起
// 如果用noblock在关注EAGAIN这个错误.
int zmq_send (void *socket, zmq_msg_t *msg, int flags);

// 其中flags只允许是ZMQ_NOBLOCK.如果异步消息队列里面没有数据的话,那么就返回
// 对于多条消息的话需要通过判断socket选项来决定是否还有更多消息(所以说麻烦).
// 可能的错误有下面这些
// EAGAIN 异步队列空
// ENOTSUP socket不支持zmq_recv
// EFSM 状态机存在问题
// ETERM context正在被销毁
// ENOTSOCK 无效socket
// EINTR 被信号中断
// EFAULT 无效的message
// 和zmq_send一样,如果不用noblock的话,我们只需要关心EINTR。如果关注noblock的话就再处理EAGAIN这个错误。
int zmq_recv (void *socket, zmq_msg_t *msg, int flags);

socket还允许使用poll来观察socket是否可读可写.zmq_poll提供的接口和poll非常相似

typedef struct
{
    void *socket; // 可以使用zmq_socket
    int fd; // 也可以使用fd
    short events;
    short revents;
} zmq_pollitem_t;
int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); // -1表示没有超时

这里的events包括下面这些

  • ZMQ_POLLIN
  • ZMQ_POLLOUT
  • ZMQ_POLLERR // 这个仅仅对于fd有用.

如果纯粹使用zeromq发送的话,应该只需要关注POLLIN与POLLOUT即可。返回值的语义表示有多少个items ready了. 可能包含下面这些错误

  • ETERM context正在被销毁
  • EFAULT items本身参数无效(NULL)
  • EINTR 信号中断 // 通常我们最关心的错误

poll这个接口对于device需要.

6.5 Options

允许获取和设置的options并不是一样的,所以这里列出所有的options.获取的话标记R,设置的话标记W

int zmq_getsockopt (void *socket, int option_name, void *option_value, size_t *option_len);
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
  • ZMQ_TYPE(R) 获取socket type.
  • ZMQ_RCVMORE(R) socket是否还有更多的multipart message.
  • ZMQ_HWM(RW) HWM(high water mark).表示允许在内存异步队列里面存放多少消息.如果达到上限的话那么要不阻塞发送,要不丢弃消息(PUB).默认是没有限制.
  • ZMQ_SWAP(RW) 如果达到HWM的话,那么允许swap到磁盘。这个值表示允许swap的最大大小,默认为0就是不进行swap.
  • ZMQ_AFFINITY(RW) 获取socket和io_threads的亲和性(在哪些线程上工作).返回的值一个bitmap(uint64_t).这就意味这io_threads可以多达64个
  • ZMQ_IDENTITY(RW) 获取socket的id.如果自己设置id的话重新连接服务端的话,服务端还可以上次的消息续传。默认使用uuid分配每次启动不同。(不太清楚)
  • ZMQ_RATE(RW) 广播的收发速率.(不太清楚广播)
  • ZMQ_RECOVERY_IVL(RW) 广播恢复的间隔,单位是s.(不太清楚广播)
  • ZMQ_RECOVERY_IVL_MSEC(RW) 广播恢复的间隔,单位是ms.现在推荐使用这个选项.(不太清楚广播)
  • ZMQ_MCAST_LOOP(RW) 广播是否可以走回环.(不太清楚广播)
  • ZMQ_SNDBUF(RW) socket底层的send buffer大小
  • ZMQ_RCVBUF(RW) socket底层的recv buffer大小
  • ZMQ_LINGER(RW) socket底层进行linger的时间
  • ZMQ_RECONNECT_IVL(RW) 重连的时间间隔(默认100ms)
  • ZMQ_RECONNECT_IVL_MAX(RW) 重连的最大时间间隔(默认==IVL).原理是使用IVL开始然后每次*2来进行直到到达MAX来解决这个问题.但是默认的话不会出现指数退避.
  • ZMQ_BACKLOG(RW) listen backlog(默认100)
  • ZMQ_FD(R) 取出底层的fd
  • ZMQ_EVENTS(R) socket的可读写事件,包括ZMQ_POLLIN与ZMQ_POLLOUT
  • ZMQ_SUBSCRIBE(W) 设置subscribe过滤数据.因为subscribe是根据消息内容头部来过滤的.
  • ZMQ_UNSUBSCRIBE(W) 取消subscribe过滤数据.

对于获取和设置option的错误可能有下面这些

  • EINVAL 无效参数
  • ETERM context正在被销毁
  • ENOTSOCK socket无效
  • EINTR 调用被中断

可能我们唯一需要处理的就是EINTR把.对于选项的话我们可能最多设置一下SNDBUF与RCVBUF.可能reconnect时间需要调整一下.

6.6 Transports

transport是指zeromq的传输层在载体,一共有下面4种.

  • tcp
  • pgm
  • ipc
  • inproc

这里因为对于pgm不是很了解所以不做过多分析:)zeromq底层根据不同的传输层载体进行了封装,只需要在bind或者是connect替换endpoint即可。 所以这里只是看看endpoint的表示方法并且看看一些可能的潜在问题。endpoint的表示方法如下

transport://endpoint

对于tcp的bind来说的,最常用的两个方式就是

  • tcp://127.0.0.1:80
  • tcp://*:80

而对于connect来说的话,可以使用的方式就是

  • tcp://dirlt.com:80
  • tcp://127.0.0.1:80

注意这里一定需要提供端口号。事实上不指定端口也行,zeromq只需要修改很少代码即可。端口可以另外获取然后传输到zookeeeper等介质上面。 但是我猜想这样就破坏了zeromq的封装原则了,所以没有这么做:).

对于ipc来说的话,底层使用了unix domain socket来完成进程之间通信。unix domain socket走loopback接口,没有网卡限制也不会耗CPU中断. 如果了解unix domain socket的话,那么知道对应的ip地址在这里映射成为就是文本的一个特殊的unix domain socket文件,所以我们需要提供文件名。 方式就是

  • ipc://pathname

如果path不是绝对路径的话,那么就以当前目录开始。但是通常来说以绝对路径操作会更加方便,

所谓的inproc就是说在进程内部通信就是线程之间的通信,实现上就是异步消息队列。因为不占用任何线程,所以如果只是使用inproc的话, 那么zmq_init的话io_threads可以==0.而endpoint仅仅是为了标识这个异步队列,给定一个名称即可。比如使用方式可以使:

  • inproc://#0

另外需要注意的就是,inproc的server必须首先创建好,然后client才可以connect过来,这点是有顺序的。

6.7 Devices

关于Device的实现,虽然文档强调device必须配合frontend和backend,但是实际上代码只是使用一套。

// ZMQ_QUEUE starts a queue device
// ZMQ_FORWARDER starts a forwarder device
// ZMQ_STREAMER starts a streamer device
int zmq_device (int device, const void *frontend, const void *backend);

我们有必要研究一下device的写法,而且有时候我们可能不能用系统自带的zmq_device.好比zmq文档的multithreads的例子里面, 如果我们希望在每次轮询的时候可以检测一下外部的标记的话,那么就不能够使用自带的zmq_device。但是可以基于现在device简单修改即可.

int zmq::device (class socket_base_t *insocket_,
        class socket_base_t *outsocket_)
{
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg); // 仅仅初始化一次就可以,因为zmq_recv会自动帮我们close:).

    if (rc != 0) {
        return -1;
    }

    int64_t more;
    size_t moresz;

    zmq_pollitem_t items [2];
    items [0].socket = insocket_;
    items [0].fd = 0;
    items [0].events = ZMQ_POLLIN;
    items [0].revents = 0;
    items [1].socket = outsocket_;
    items [1].fd = 0;
    items [1].events = ZMQ_POLLIN;
    items [1].revents = 0;

    while (true) {

        //  Wait while there are either requests or replies to process.
        rc = zmq_poll (&items [0], 2, -1); // 这个地方我们可以设置一下poll的超时时间
                // 如果没有任何内容的话,我们可以每隔一段时间判断一下外部标记
                // 比如这里我们可以检测一下外部是否设置了exit的标记等。
        if (unlikely (rc < 0)) {
            return -1;
        }

        //  The algorithm below asumes ratio of request and replies processed
        //  under full load to be 1:1. Although processing requests replies
        //  first is tempting it is suspectible to DoS attacks (overloading
        //  the system with unsolicited replies).

        //  Process a request.
        if (items [0].revents & ZMQ_POLLIN) {
            while (true) {

                rc = insocket_->recv (&msg, 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                moresz = sizeof (more);
                rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                if (!more)
                    break;
            }
        }

        //  Process a reply.
        if (items [1].revents & ZMQ_POLLIN) {
            while (true) {

                rc = outsocket_->recv (&msg, 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                moresz = sizeof (more);
                rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
                if (unlikely (rc < 0)) {
                    return -1;
                }

                if (!more)
                    break;
            }
        }

    }

    return 0;
}

7 Philosophy

7.1 Fixing the World

Programming is a science dressed up as art, because most of us don't understand the physics of software, and it's rarely if ever taught. The physics of software is not algorithms, data structures, languages and abstractions. These are just tools we make, use, throw away. The real physics of software is the physics of people.

Specifically, our limitations when it comes to complexity, and our desire to work together to solve large problems in pieces. This is the science of programming: make building blocks that people can understand and use easily, and people will work together to solve the very largest problems.

We live in a connected world, and modern software has to navigate this world. So the building blocks for tomorrow's very largest solutions are connected and massively parallel. It's not enough for code to be "strong and silent" any more. Code has to talk to code. Code has to be chatty, sociable, well-connected. Code has to run like the human brain, trillions of individual neurons firing off messages to each other, a massively parallel network with no central control, no single point of failure, yet able to solve immensely difficult problems. And it's no accident that the future of code looks like the human brain, because the endpoints of every network are, at some level, human brains.

7.2 Programming with ZMQ

Having seen some examples, you're eager to start using ZMQ in some apps. Before you start that, take a deep breath, chillax, and reflect on some basic advice that will save you stress and confusion.

  • Learn ZMQ step by step. It's just one simple API but it hides a world of possibilities. Take the possibilities slowly, master each one.
  • Write nice code. Ugly code hides problems and makes it hard for others to help you. You might get used to meaningless variable names, but people reading your code won't. Use names that are real words, that say something other than "I'm too careless to tell you what this variable is really for". Use consistent indentation, clean layout. Write nice code and your world will be more comfortable.
  • Test what you make as you make it. When your program doesn't work, you should know what five lines are to blame. This is especially true when you do ZMQ magic, which just won't work the first times you try it.
  • When you find that things don't work as expected, break your code into pieces, test each one, see which one is not working. ZMQ lets you make essentially modular code, use that to your advantage.
  • Make abstractions (classes, methods, whatever) as you need them. If you copy/paste a lot of code you're going to copy/paste errors too.

In the ZMQ universe, sockets are clever multithreaded applications that manage a whole set of connections automagically for you. You can't see, work with, open, close, or attach state to these connections. Whether you use blocking send or receive, or poll, all you can talk to is the socket, not the connections it manages for you. The connections are private and invisible, and this is the key to ZMQ's scalability.

Because your code, talking to a socket, can then handle any number of connections across whatever network protocols are around, without change. A messaging pattern sitting in ZMQ can scale more cheaply than a messaging pattern sitting in your application code.

So the general assumption no longer applies. As you read the code examples, your brain will try to map them to what you know. You will read "socket" and think "ah, that represents a connection to another node". That is wrong. You will read "thread" and your brain will again think, "ah, a thread represents a connection to another node", and again your brain will be wrong.

If you're reading this Guide for the first time, realize that until you actually write ZMQ code for a day or two (and maybe three or four days), you may feel confused, especially by how simple ZMQ makes things for you, and you may try to impose that general assumption on ZMQ, and it won't work. And then you will experience your moment of enlightenment and trust, that zap-pow-kaboom satori paradigm-shift moment when it all becomes clear.

7.3 Why We Needed ZMQ

Many applications these days consist of components that stretch across some kind of network, either a LAN or the Internet. So many application developers end up doing some kind of messaging. Some developers use message queuing products, but most of the time they do it themselves, using TCP or UDP. These protocols are not hard to use, but there is a great difference between sending a few bytes from A to B, and doing messaging in any kind of reliable way.

Let's look at the typical problems we face when we start to connect pieces using raw TCP. Any reusable messaging layer would need to solve all or most these:

  • How do we handle I/O? Does our application block, or do we handle I/O in the background? This is a key design decision. Blocking I/O creates architectures that do not scale well. But background I/O can be very hard to do right.
  • How do we handle dynamic components, i.e. pieces that go away temporarily? Do we formally split components into "clients" and "servers" and mandate that servers cannot disappear? What then if we want to connect servers to servers? Do we try to reconnect every few seconds?
  • How do we represent a message on the wire? How do we frame data so it's easy to write and read, safe from buffer overflows, efficient for small messages, yet adequate for the very largest videos of dancing cats wearing party hats?
  • How do we handle messages that we can't deliver immediately? Particularly, if we're waiting for a component to come back on-line? Do we discard messages, put them into a database, or into a memory queue?
  • Where do we store message queues? What happens if the component reading from a queue is very slow, and causes our queues to build up? What's our strategy then?
  • How do we handle lost messages? Do we wait for fresh data, request a resend, or do we build some kind of reliability layer that ensures messages cannot be lost? What if that layer itself crashes?
  • What if we need to use a different network transport. Say, multicast instead of TCP unicast? Or IPv6? Do we need to rewrite the applications, or is the transport abstracted in some layer?
  • How do we route messages? Can we send the same message to multiple peers? Can we send replies back to an original requester?
  • How do we write an API for another language? Do we re-implement a wire-level protocol or do we repackage a library? If the former, how can we guarantee efficient and stable stacks? If the latter, how can we guarantee interoperability?
  • How do we represent data so that it can be read between different architectures? Do we enforce a particular encoding for data types? How far is this the job of the messaging system rather than a higher layer?
  • How do we handle network errors? Do we wait and retry, ignore them silently, or abort?

早期需要设计可靠消息系统比如AMQP.但是这种方式引入了single-point broker。对于需要这种可靠消息系统的应用来说,需要在broker上面做相当多的事情确保可靠性以及性能。但是这样对于中小应用陷入了尴尬,为了使用这种方便的消息系统他们需要引入broker这么东西是不能够忍受的。我们需要的一种简单方便的消息传输系统,没有任何附加代价(比如所有数据都流经broker),这就是ZeroMQ设计初衷。

It turns out that building reusable messaging systems is really difficult, which is why few FOSS projects ever tried, and why commercial messaging products are complex, expensive, inflexible, and brittle. In 2006 iMatix designed AMQP which started to give FOSS developers perhaps the first reusable recipe for a messaging system. AMQP works better than many other designs but remains relatively complex, expensive, and brittle. It takes weeks to learn to use, and months to create stable architectures that don't crash when things get hairy.

Most messaging projects, like AMQP, that try to solve this long list of problems in a reusable way do so by inventing a new concept, the "broker", that does addressing, routing, and queuing. This results in a client-server protocol or a set of APIs on top of some undocumented protocol, that let applications speak to this broker. Brokers are an excellent thing in reducing the complexity of large networks. But adding broker-based messaging to a product like Zookeeper would make it worse, not better. It would mean adding an additional big box, and a new single point of failure. A broker rapidly becomes a bottleneck and a new risk to manage. If the software supports it, we can add a second, third, fourth broker and make some fail-over scheme. People do this. It creates more moving pieces, more complexity, more things to break.

And a broker-centric set-up needs its own operations team. You literally need to watch the brokers day and night, and beat them with a stick when they start misbehaving. You need boxes, and you need backup boxes, and you need people to manage those boxes. It is only worth doing for large applications with many moving pieces, built by several teams of people, over several years.

So small to medium application developers are trapped. Either they avoid network programming, and make monolithic applications that do not scale. Or they jump into network programming and make brittle, complex applications that are hard to maintain. Or they bet on a messaging product, and end up with scalable applications that depend on expensive, easily broken technology. There has been no really good choice, which is maybe why messaging is largely stuck in the last century and stirs strong emotions. Negative ones for users, gleeful joy for those selling support and licenses.

What we need is something that does the job of messaging but does it in such a simple and cheap way that it can work in any application, with close to zero cost. It should be a library that you just link with, without any other dependencies. No additional moving pieces, so no additional risk. It should run on any OS and work with any programming language.

And this is ZMQ: an efficient, embeddable library that solves most of the problems an application needs to become nicely elastic across a network, without much cost. Specifically:

  • It handles I/O asynchronously, in background threads. These communicate with application threads using lock-free data structures, so ZMQ applications need no locks, semaphores, or other wait states.
  • Components can come and go dynamically and ZMQ will automatically reconnect. This means you can start components in any order. You can create "service-oriented architectures" (SOAs) where services can join and leave the network at any time.
  • It queues messages automatically when needed. It does this intelligently, pushing messages to as close as possible to the receiver before queuing them.
  • It has ways of dealing with over-full queues (called "high water mark"). When a queue is full, ZMQ automatically blocks senders, or throws away messages, depending on the kind of messaging you are doing (the so-called "pattern").
  • It lets your applications talk to each other over arbitrary transports: TCP, multicast, in-process, inter-process. You don't need to change your code to use a different transport.
  • It handles slow/blocked readers safely, using different strategies that depend on the messaging pattern.
  • It lets you route messages using a variety of patterns such as request-reply and publish-subscribe. These patterns are how you create the topology, the structure of your network.
  • It lets you place pattern-extending "devices" (small brokers) in the network when you need to reduce the complexity of interconnecting many pieces.
  • It delivers whole messages exactly as they were sent, using a simple framing on the wire. If you write a 10k message, you will receive a 10k message.
  • It does not impose any format on messages. They are blobs of zero to gigabytes large. When you want to represent data you choose some other product on top, such as Google's protocol buffers, XDR, and others.
  • It handles network errors intelligently. Sometimes it retries, sometimes it tells you an operation failed.
  • It reduces your carbon footprint. Doing more with less CPU means your boxes use less power, and you can keep your old boxes in use for longer. Al Gore would love ZMQ.

ZMQ为我们在编写网络应用程序上面带了颠覆性的效果。表面上看是一系列socket API但是当应用规模不断变大开始以消息传输为中心的时候,ZMQ的优势就会凸显出来。

Actually ZMQ does rather more than this. It has a subversive effect on how you develop network-capable applications. Superficially it's just a socket API on which you do zmq_recv(3) and zmq_send(3). But message processing rapidly becomes the central loop, and your application soon breaks down into a set of message processing tasks. It is elegant and natural. And it scales: each of these tasks maps to a node, and the nodes talk to each other across arbitrary transports. Two nodes in one process (node is a thread), two nodes on one box (node is a process), two boxes on one network (node is a box). With no application code changes.

7.4 The Zen of Zero

Originally the zero in ZMQ was meant as "zero broker" and (as close to) "zero latency" (as possible). In the meantime it has come to cover different goals: zero administration, zero cost, zero waste. More generally, "zero" refers to the culture of minimalism that permeates the project. We add power by removing complexity rather than exposing new functionality.

7.5 Handling Errors and ETERM

ZMQ's error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible. Assertions, which pepper the ZMQ code, are absolutely vital to robust code, they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw that needs to be fixed.

7.6 Multithreading with ZMQ

To make utterly perfect MT programs (and I mean that literally) we don't need mutexes, locks, or any other form of inter-thread communication except messages sent across ZMQ sockets.

By "perfect" MT programs I mean code that's easy to write and understand, that works with one technology in any language and on any operating system, and that scales across any number of CPUs with zero wait states and no point of diminishing returns.

如果有一件事情是我从30多年的并发编程经验所总结出来的话,那么就是永远不要共享状态。

If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming it is: just don't share state. It's like two drunkards trying to share a beer. It doesn't matter if they're good buddies. Sooner or later they're going to get into a fight. And the more drunkards you add to the pavement, the more they fight each other over the beer. The tragic majority of MT applications look like drunken bar fights.

The list of weird problems that you need to fight as you write classic shared-state MT code would be hillarious if it didn't translate directly into stress and risk, as code that seems to work suddenly fails under pressure. Here is a list of "11 Likely Problems In Your Multithreaded Code" from a large firm with world-beating experience in buggy code: forgotten synchronization, incorrect granularity, read and write tearing, lock-free reordering, lock convoys, two-step dance, and priority inversion.

Yeah, we also counted seven, not eleven. That's not the point though. The point is, do you really want that code running the power grid or stock market to start getting two-step lock convoys at 3pm on a busy Thursday? Who cares what the terms actually mean. This is not what turned us on to programming, fighting ever more complex side-effects with ever more complex hacks.

Some widely used metaphors, despite being the basis for billion-dollar industries, are fundamentally broken, and shared state concurrency is one of them. Code that wants to scale without limit does it like the Internet does, by sending messages and sharing nothing except a common contempt for broken programming metaphors.

comments powered by Disqus