안녕하세요 😉
유유자적한 개발자 유로띠 입니다 😀
👏👏👏👏
이번 포스팅에서는
✅ redis의 메시지 pub/sub
✅ redis pub/sub functions
에 대해서 알아보겠습니다
🎉 REDIS - PUB/SUB
📢 redis pub / sub
redis의 기능 중 메시지 기능이 있습니다.
통로인 채널(Channel)을 이용하여 Publish 명령으로 메시지를 보내고, Subscribe 명령으로 메시지를 받습니다.
📍 특징
메시지를 보관하지 않습니다.
publish 하는 시점에 이미 실행 중인 subscribe한 클라이언트만 메시지를 받을 수 있습니다.
메시지의 수신을 보장하지 않습니다.
메시지 전송 시 subscribe한 클라이언트가 없다고 해서 문제가 되지 않습니다.
수신자 모두에게 메시지를 전송합니다.
채널을 수신하는 모든 subscribe한 클라이언트에 대해서 모두 메시지를 전송합니다.
해당 특징은 메시지 큐잉(Point-to-Point Channel)과의 차이점 입니다.
6개의 기본적인 명령어를 알아보도록 하겠습니다.
✅ SUBSCRIBE : [ subscribe channel ]
지정한 채널로 보내진 메시지를 받습니다.
subscribe fc:0:ping
✅ UNSUBSCRIBE : [ unsubscribe channel ]
등록한 채널을 삭제하여 더 이상 메시지를 받지 않습니다.
unsubscribe fc:0:ping
✅ PUBLISH : [ publish channel message ]
지정한 채널로 메시지를 전송합니다.
publish fc:0:ping "ping!!"
✅ PUBSUB : [ pubsub channels ]
해당 redis 서버에 등록된 모든 채널명을 보여줍니다.
pubsub channels
💡 참고
채널을 패턴으로 등록하거나 삭제하여 관리합니다.
✅ PSUBSCRIBE : [ psubscribe pattern [pattern ...] ]
채널 이름을 패턴으로 등록합니다.
psubscribe fc*
✅ PUNSUBSCRIBE : [ punsubscribe pattern [pattern ...] ]
등록한 패턴 채넣의 구독을 해제합니다.
punsubscribe fc*
📢 redis pub/sub functions
pub/sub의 내부 구조에 대해서 알아봅시다.
✅ SUBSCRIBE
채널을 등록하고 메시지를 받는 명령어입니다.
void subscribeCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
}
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
서버(redisServer)와 클라이언트(redisClient)에 채널을 등록합니다.
클라이언트
dictAdd(c->pubsub_channels,channel,NULL)에서 클라이언트에 채널을 등록하며, 값은 null을 저장합니다.
채널을 저장하는 클라이언트 Dict 데이터 구조
서버
dictFind(server.pubsub_channels,channel)에서 서버에 채널을 검색합니다.
만약 채널이 없다면 clients = listCreate()하여 리스트를 생성하고 dictAdd(server.pubsub_channels,channel,clients)하여
채널과 리스트를 등록합니다.
채널이 있다면 기존 리스트에 클라이언트를 등록합니다.
채널을 저장하는 서버 Dict 데이터 구조
📍 subscribe
create:external:employee 란 채널을 구독하였습니다.
채널을 조회하면 해당 채널이 생성된 것을 확인할 수 있습니다.
✅ PUBLISH
메시지를 보내는 명령어입니다.
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
서버에서 해당 채널을 조회합니다.
dictFind(server.pubsub_channels,channel)
노드가 없을 때까지 while loop문을 반복합니다.
while ((ln = listNext(&li)) != NULL)
메시지를 보냅니다.
addReplyBulk(c,channel);
addReplyBulk(c,message);
'Programming > redis' 카테고리의 다른 글
docker로 redis 설치 및 redis 기본적인 명령어를 알아보자 (1) | 2022.02.10 |
---|