searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

redis发布与订阅机制介绍

2023-10-10 08:14:17
4
0

今天给大家简单介绍下redis中的发布与订阅机制,本文目录如下:

什么是发布订阅

redis中的发布订阅功能由publish、subscribe、psubscribe等命令组成,就是多个客户端可以订阅服务端一个或者多个频道,当向该频道发布消息时,订阅该频道的所有客户端就会收到消息,如下图所示:

客户端可以订阅一个或者多个频道,另外还可以进行模式的订阅,意思是当向某个频道publish消息时,该消息不仅会被发送给订阅了这个频道的所有客户端,还会被发送给订阅了所有与该频道相匹配的模式的客户端,有点绕,看下图:

当向news.it频道发送消息时,客户端A、客户端C、客户端D都会收到消息:

发布消息的命令是publish,语法是PUBLISH channel message

普通订阅命令是subscribe,语法是subcribe channel [channel ...],可以同时订阅多个频道

模式订阅命令是psubscribe,语法是psubscribe pattern [pattern ...],可以同时订阅多个模式

发布订阅的实现

频道的订阅与退订

首先讲一下频道的订阅与退订,相关的命令是subcribe和unsubscribe,大概原理是当执行subscribe命令订阅某个频道时,服务端会用一个字典来保存订阅的客户端,其中频道名作为字典key,客户端链表作为value,执行subscribe就往该链表插入一个客户端,执行unsubscribe就从该链表中删除该客户端,当执行publish向某个频道发布消息时,通过频道名找到对应的客户端链表,然后依次给客户端发送消息,这样就实现了频道的订阅和发布。

redis将订阅关系保存在服务器状态的pubsub_channel字典中,key是频道名,value是链表,保存了订阅了该频道的客户端:

struct redisServer {
    ...
	dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    ...
}

如下例子:

client-1、client-2、client-3订阅了news.it频道,client-4订阅了news.sport频道,client-5、client-6订阅了news.business频道,下面从代码层面看一下:

subscribe命令的处理函数是subscribeCommand函数,最后会调用到pubsubSubscribeChannel函数,如下:

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 订阅的频道保存到c->pubsub_channels字典中,key是channel,value是NULL
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);  // 查找该频道是否存在
        if (de == NULL) {
            clients = listCreate();
            // 添加该频道到server.pubsub_channels字典中
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        // 将该客户端插入到链表尾部
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel);
    return retval;
}

可以看出如果频道不存在则添加频道,然后添加客户端到链表中,如果该客户端已经订阅了该频道,则不会执行if中的代码。

unsubscribe命令的处理函数是unsubscribeCommand,最后会调用到pubsubUnsubscribeChannel函数:

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    /* Remove the channel from the client -> channels hash table */
    incrRefCount(channel); /* channel may be just a pointer to the same object
                            we have in the hash tables. Protect it... */
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        de = dictFind(server.pubsub_channels,channel);  // 查找该频道
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de); // 得到该频道列表
        ln = listSearchKey(clients,c); // 从列表中查找该客户端c
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);  // 删除该客户端节点
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            // 该频道没有订阅者了,从字典中删除该频道
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) addReplyPubsubUnsubscribed(c,channel);
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}

如下图例子,将client-10086从news.sport和news.movie频道删除,执行完成后,下图虚线节点将会被删除,另外由于news.movie没有其他订阅者了,该频道也会从字典中删除

模式的订阅与退订

模式的订阅命令是psubscribeCommand,退订是punsubscribeCommand函数,订阅时跟频道订阅类似,将订阅关系保存在服务器状态的pubsub_patterns字典中,模式作为key,客户端链表作为value

struct redisServer {
    ...
	dict *pubsub_patterns;  /* A dict of pubsub_patterns */
    ...
}

订阅最后会调用psubscribeCommand函数,逻辑比较简单,如下:

/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
    dictEntry *de;
    list *clients;
    int retval = 0;

    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        /* Add the client to the pattern -> list of clients hash table */
        de = dictFind(server.pubsub_patterns,pattern);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_patterns,pattern,clients);
            incrRefCount(pattern);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubPatSubscribed(c,pattern);
    return retval;
}

退订处理函数是punsubscribeCommand,最后会调用到pubsubUnsubscribePattern函数:

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    incrRefCount(pattern); /* Protect the object. May be the same we remove */
    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
        retval = 1;
        listDelNode(c->pubsub_patterns,ln);
        /* Remove the client from the pattern -> clients list hash table */
        de = dictFind(server.pubsub_patterns,pattern);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);  // 得到该模式的客户端链表
        ln = listSearchKey(clients,c);  // 在链表中查找客户端
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);  // 找到就删除该节点
        if (listLength(clients) == 0) {  
            /* Free the list and associated hash entry at all if this was
             * the latest client. */
            // 如果没有其他订阅者就从字典中删除该模式key
            dictDelete(server.pubsub_patterns,pattern);
        }
    }
    /* Notify the client */
    if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
    decrRefCount(pattern);
    return retval;
}

publish发布命令实现

publish命令发布消息时,对应的处理函数是publishCommand,最后会调用到pubsubPublishMessage函数,大概逻辑如下:

/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    dictIterator *di;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    // 从pubsub_channels字典中查找channel频道
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
    	// 找到对应的频道就遍历客户端列表,给每个客户端发送消息
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    // 处理模式订阅的情况
    di = dictGetIterator(server.pubsub_patterns);
    if (di) {
        channel = getDecodedObject(channel);
        while((de = dictNext(di)) != NULL) {
            robj *pattern = dictGetKey(de);
            list *clients = dictGetVal(de);
            // 如果模式匹配当前频道
            if (!stringmatchlen((char*)pattern->ptr,
                                sdslen(pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) continue;

            listRewind(clients,&li);
            // 遍历客户端链表,给客户端发送消息
            while ((ln = listNext(&li)) != NULL) {
                client *c = listNodeValue(ln);
                addReplyPubsubPatMessage(c,pattern,channel,message);
                receivers++;
            }
        }
        decrRefCount(channel);
        dictReleaseIterator(di);
    }
    return receivers;
}

0条评论
0 / 1000