最新消息:欢迎来到 艺宵网,有任何问题或建议请联系!在线留言


设计常识 Iduer 0

最近接连有几位同学问我关于skynet 的消息队列算法中为什么引入了一个独立的 flags bool 数组的问题。由于时间太长了,我自己差点都忘记设计初衷了。今天在代码里加了点注释,防止以后忘记。其实当时我就写过一篇 blog 记录过,这篇 blog 下面的评论中也有许多讨论。

今天把里面一些细节再展开说一次,我用了一个循环队列来保存 skynet 的二级消息队列,代码是这样的:

#define GP(p) ((p) % MAX_GLOBAL_MQ)

static void 
skynet_globalmq_push(struct message_queue * queue) {
    struct global_queue *q= Q;

    uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));
    q->queue[tail] = queue;
    q->flag[tail] = true;

struct message_queue * 
skynet_globalmq_pop() {
    struct global_queue *q = Q;
    uint32_t head =  q->head;
    uint32_t head_ptr = GP(head);
    if (head_ptr == GP(q->tail)) {
        return NULL;

    if(!q->flag[head_ptr]) {
        return NULL;


    struct message_queue * mq = q->queue[head_ptr];
    if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
        return NULL;
    q->flag[head_ptr] = false;

    return mq;

有同学问我,为什么要用一个单独的 flag 数组。用指针数组里的指针是否为空来判断不是更简单吗?

见 skynet 的第 68 个 PR 。


#define GP(p) ((p) % MAX_GLOBAL_MQ)

static void 
skynet_globalmq_push(struct message_queue * queue) {
    struct global_queue *q= Q;

    uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));
    // 如果线程在这里挂起,q->queue[tail] 将不为空,
    // 却没有更新到新的值
    q->queue[tail] = queue;

struct message_queue * 
skynet_globalmq_pop() {
    struct global_queue *q = Q;
    uint32_t head =  q->head;
    uint32_t head_ptr = GP(head);
    if (head_ptr == GP(q->tail)) {
        return NULL;

    if (!q->queue[head_ptr]) {
        return NULL;
    struct message_queue * mq = q->queue[head_ptr];
    // 这里无法确保 mq 读到的是 push 进去的值。
    // 它有可能是队列用完一圈后,上一个版本的值。
    if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
        return NULL;

    q->queue[head_ptr] = NULL;

    return mq;


这种情况只有在 64K 的队列全部转过一圈,某个 push 线程一直挂起在递增 tail 指针,还来不及写入新的值的位置。



另外,以前的代码有一个限制:当活跃的(有消息的)服务总数超过 64K 的时候,这段代码就不能正常工作了。虽然一个 skynet 节点中的服务数量很难超过这个限制(因为无消息的服务不会在全局队列中),但理论上一个 skynet 节点支持的服务数量上限是远大于 64K 的。


static void 
skynet_globalmq_push(struct message_queue * queue) {
    struct global_queue *q= Q;

    if (q->flag[GP(q->tail)]) {
        // The queue may full seldom, save queue in list
        assert(queue->next == NULL);
        struct message_queue * last;
        do {
            last = q->list;
            queue->next = last;
        } while(!__sync_bool_compare_and_swap(&q->list, last, queue));


    uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));
    // The thread would suspend here, and the q->queue[tail] is last version ,
    // but the queue tail is increased.
    // So we set q->flag[tail] after changing q->queue[tail].
    q->queue[tail] = queue;
    q->flag[tail] = true;

struct message_queue * 
skynet_globalmq_pop() {
    struct global_queue *q = Q;
    uint32_t head =  q->head;

    if (head == q->tail) {
        // The queue is empty.
        return NULL;

    uint32_t head_ptr = GP(head);

    struct message_queue * list = q->list;
    if (list) {
        // If q->list is not empty, try to load it back to the queue
        struct message_queue *newhead = list->next;
        if (__sync_bool_compare_and_swap(&q->list, list, newhead)) {
            // try load list only once, if success , push it back to the queue.
            list->next = NULL;

    // Check the flag first, if the flag is false, the pushing may not complete.
    if(!q->flag[head_ptr]) {
        return NULL;


    struct message_queue * mq = q->queue[head_ptr];
    if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
        return NULL;
    q->flag[head_ptr] = false;

    return mq;


转载请注明:艺宵网 » skynet消息队列调度算法设计要点

昵称 (必填) 手机号 (必填)
