redis의 pub/sub 이란?
반응형

안녕하세요 😉

유유자적한 개발자 유로띠 입니다 😀

 

 

👏👏👏👏

 

 

 

 

이번 포스팅에서는

✅ redis의 메시지 pub/sub

✅  redis pub/sub functions

에 대해서 알아보겠습니다

 

 

🎉 REDIS - PUB/SUB


📢 redis pub / sub

 

redis의 기능 중 메시지 기능이 있습니다.

통로인 채널(Channel)을 이용하여 Publish 명령으로 메시지를 보내고, Subscribe 명령으로 메시지를 받습니다.

 

 

📍 특징

메시지를 보관하지 않습니다.

publish 하는 시점에 이미 실행 중인 subscribe한 클라이언트만 메시지를 받을 수 있습니다.

 

메시지의 수신을 보장하지 않습니다.

메시지 전송 시 subscribe한 클라이언트가 없다고 해서 문제가 되지 않습니다.

 

수신자 모두에게 메시지를 전송합니다.

채널을 수신하는 모든 subscribe한 클라이언트에 대해서 모두 메시지를 전송합니다.

해당 특징은 메시지 큐잉(Point-to-Point Channel)과의 차이점 입니다.

 

 

6개의 기본적인 명령어를 알아보도록 하겠습니다.

 

 

✅  SUBSCRIBE : [ subscribe channel ]

 

지정한 채널로 보내진 메시지를 받습니다.

subscribe fc:0:ping

 

 

✅  UNSUBSCRIBE : [ unsubscribe channel ]

 

등록한 채널을 삭제하여 더 이상 메시지를 받지 않습니다.

unsubscribe fc:0:ping

 

✅  PUBLISH : [ publish channel message ]

 

지정한 채널로 메시지를 전송합니다.

publish fc:0:ping "ping!!"

 

 

✅  PUBSUB : [ pubsub channels ]

 

해당 redis 서버에 등록된 모든 채널명을 보여줍니다. 

pubsub channels

 

 

💡 참고

채널을 패턴으로 등록하거나 삭제하여 관리합니다.

 

✅  PSUBSCRIBE : [ psubscribe pattern [pattern ...] ]

 

채널 이름을 패턴으로 등록합니다.

psubscribe fc*

 

 

✅  PUNSUBSCRIBE : [ punsubscribe pattern  [pattern ...] ]

 

등록한 패턴 채넣의 구독을 해제합니다.

punsubscribe fc*

 

 


 

📢 redis pub/sub functions

pub/sub의 내부 구조에 대해서 알아봅시다.

 

✅  SUBSCRIBE

 

채널을 등록하고 메시지를 받는 명령어입니다.

void subscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= REDIS_PUBSUB;
}

int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

 

서버(redisServer)와 클라이언트(redisClient)에 채널을 등록합니다.

 

클라이언트

dictAdd(c->pubsub_channels,channel,NULL)에서 클라이언트에 채널을 등록하며, 값은 null을 저장합니다.

 

채널을 저장하는 클라이언트 Dict 데이터 구조

 

 

 

서버

dictFind(server.pubsub_channels,channel)에서 서버에 채널을 검색합니다.

만약 채널이 없다면  clients = listCreate()하여 리스트를 생성하고  dictAdd(server.pubsub_channels,channel,clients)하여

채널과 리스트를 등록합니다.

채널이 있다면 기존 리스트에 클라이언트를 등록합니다.

 

채널을 저장하는 서버 Dict 데이터 구조

 

 

📍 subscribe

create:external:employee 란 채널을 구독하였습니다.

채널을 조회하면 해당 채널이 생성된 것을 확인할 수 있습니다.

 

 

✅  PUBLISH

 

메시지를 보내는 명령어입니다.

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;

            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

 

 

서버에서 해당 채널을 조회합니다.

 dictFind(server.pubsub_channels,channel)

노드가 없을 때까지 while loop문을 반복합니다.

 while ((ln = listNext(&li)) != NULL)

 

메시지를 보냅니다.

 

 addReplyBulk(c,channel);
 addReplyBulk(c,message);

반응형