123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505 |
- #define REDISMODULE_EXPERIMENTAL_API
- #include "redismodule.h"
- #include <string.h>
- #include <strings.h>
- #include <assert.h>
- #include <unistd.h>
- #define LIST_SIZE 1024
- typedef struct {
- long long list[LIST_SIZE];
- long long length;
- } fsl_t; /* Fixed-size list */
- static RedisModuleType *fsltype = NULL;
- fsl_t *fsl_type_create() {
- fsl_t *o;
- o = RedisModule_Alloc(sizeof(*o));
- o->length = 0;
- return o;
- }
- void fsl_type_free(fsl_t *o) {
- RedisModule_Free(o);
- }
- /* ========================== "fsltype" type methods ======================= */
- void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
- if (encver != 0) {
- return NULL;
- }
- fsl_t *fsl = fsl_type_create();
- fsl->length = RedisModule_LoadUnsigned(rdb);
- for (long long i = 0; i < fsl->length; i++)
- fsl->list[i] = RedisModule_LoadSigned(rdb);
- return fsl;
- }
- void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
- fsl_t *fsl = value;
- RedisModule_SaveUnsigned(rdb,fsl->length);
- for (long long i = 0; i < fsl->length; i++)
- RedisModule_SaveSigned(rdb, fsl->list[i]);
- }
- void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
- fsl_t *fsl = value;
- for (long long i = 0; i < fsl->length; i++)
- RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
- }
- void fsl_free(void *value) {
- fsl_type_free(value);
- }
- /* ========================== helper methods ======================= */
- int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
- RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
- int type = RedisModule_KeyType(key);
- if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
- RedisModule_CloseKey(key);
- if (reply_on_failure)
- RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
- RedisModuleCallReply *reply = RedisModule_Call(ctx, "INCR", "c", "fsl_wrong_type");
- RedisModule_FreeCallReply(reply);
- return 0;
- }
- /* Create an empty value object if the key is currently empty. */
- if (type == REDISMODULE_KEYTYPE_EMPTY) {
- if (!create) {
- /* Key is empty but we cannot create */
- RedisModule_CloseKey(key);
- *fsl = NULL;
- return 1;
- }
- *fsl = fsl_type_create();
- RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
- } else {
- *fsl = RedisModule_ModuleTypeGetValue(key);
- }
- RedisModule_CloseKey(key);
- return 1;
- }
- /* ========================== commands ======================= */
- /* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
- * It must be greater than the element in the head of the list. */
- int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 3)
- return RedisModule_WrongArity(ctx);
- long long ele;
- if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
- return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
- fsl_t *fsl;
- if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
- return REDISMODULE_OK;
- if (fsl->length == LIST_SIZE)
- return RedisModule_ReplyWithError(ctx,"ERR list is full");
- if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
- return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
- fsl->list[fsl->length++] = ele;
- RedisModule_SignalKeyAsReady(ctx, argv[1]);
- return RedisModule_ReplyWithSimpleString(ctx, "OK");
- }
- int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
- fsl_t *fsl;
- if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
- return REDISMODULE_ERR;
- RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
- return REDISMODULE_OK;
- }
- int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
- }
- /* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
- * When that happens, unblock client and pop the last two elements (from the right). */
- int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 3)
- return RedisModule_WrongArity(ctx);
- long long timeout;
- if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
- return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
- fsl_t *fsl;
- if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
- return REDISMODULE_OK;
- if (!fsl) {
- RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
- NULL, timeout, &argv[1], 1, NULL);
- } else {
- RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
- }
- return REDISMODULE_OK;
- }
- int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
- long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
- fsl_t *fsl;
- if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
- return REDISMODULE_ERR;
- if (fsl->list[fsl->length-1] <= *pgt)
- return REDISMODULE_ERR;
- RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
- return REDISMODULE_OK;
- }
- int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
- }
- void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
- REDISMODULE_NOT_USED(ctx);
- RedisModule_Free(privdata);
- }
- /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
- * When that happens, unblock client and pop the last element (from the right). */
- int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 4)
- return RedisModule_WrongArity(ctx);
- long long gt;
- if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK)
- return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
- long long timeout;
- if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
- return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
- fsl_t *fsl;
- if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
- return REDISMODULE_OK;
- if (!fsl || fsl->list[fsl->length-1] <= gt) {
- /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
- long long *pgt = RedisModule_Alloc(sizeof(long long));
- *pgt = gt;
- RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
- bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
- } else {
- RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
- }
- return REDISMODULE_OK;
- }
- int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
- RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
- fsl_t *src;
- if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
- return REDISMODULE_ERR;
- fsl_t *dst;
- if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
- return REDISMODULE_ERR;
- long long ele = src->list[--src->length];
- dst->list[dst->length++] = ele;
- RedisModule_SignalKeyAsReady(ctx, dst_keyname);
- return RedisModule_ReplyWithLongLong(ctx, ele);
- }
- int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
- }
- void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
- RedisModule_FreeString(ctx, privdata);
- }
- /* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
- * When that happens, unblock client, pop the last element from <src> and push it to <dst>
- * (from the right). */
- int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 4)
- return RedisModule_WrongArity(ctx);
- long long timeout;
- if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
- return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
- fsl_t *src;
- if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
- return REDISMODULE_OK;
- if (!src) {
- /* Retain string for reply callback */
- RedisModule_RetainString(ctx, argv[2]);
- /* Key is empty, we must block */
- RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
- bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
- } else {
- fsl_t *dst;
- if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
- return REDISMODULE_OK;
- long long ele = src->list[--src->length];
- dst->list[dst->length++] = ele;
- RedisModule_SignalKeyAsReady(ctx, argv[2]);
- RedisModule_ReplyWithLongLong(ctx, ele);
- }
- return REDISMODULE_OK;
- }
- /* FSL.GETALL <key> - Reply with an array containing all elements. */
- int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 2)
- return RedisModule_WrongArity(ctx);
- fsl_t *fsl;
- if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
- return REDISMODULE_OK;
- if (!fsl)
- return RedisModule_ReplyWithArray(ctx, 0);
- RedisModule_ReplyWithArray(ctx, fsl->length);
- for (int i = 0; i < fsl->length; i++)
- RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
- return REDISMODULE_OK;
- }
- /* Callback for blockonkeys_popall */
- int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argc);
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST) {
- RedisModuleString *elem;
- long len = 0;
- RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
- while ((elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD)) != NULL) {
- len++;
- RedisModule_ReplyWithString(ctx, elem);
- RedisModule_FreeString(ctx, elem);
- }
- RedisModule_ReplySetArrayLength(ctx, len);
- } else {
- RedisModule_ReplyWithError(ctx, "ERR Not a list");
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- int blockonkeys_popall_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- return RedisModule_ReplyWithError(ctx, "ERR Timeout");
- }
- /* BLOCKONKEYS.POPALL key
- *
- * Blocks on an empty key for up to 3 seconds. When unblocked by a list
- * operation like LPUSH, all the elements are popped and returned. Fails with an
- * error on timeout. */
- int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 2)
- return RedisModule_WrongArity(ctx);
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
- if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
- RedisModule_BlockClientOnKeys(ctx, blockonkeys_popall_reply_callback,
- blockonkeys_popall_timeout_callback,
- NULL, 3000, &argv[1], 1, NULL);
- } else {
- RedisModule_ReplyWithError(ctx, "ERR Key not empty");
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- /* BLOCKONKEYS.LPUSH key val [val ..]
- * BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..]
- *
- * A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used,
- * RM_SignalKeyAsReady() is also called. */
- int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc < 3)
- return RedisModule_WrongArity(ctx);
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY &&
- RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_LIST) {
- RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
- } else {
- for (int i = 2; i < argc; i++) {
- if (RedisModule_ListPush(key, REDISMODULE_LIST_HEAD,
- argv[i]) != REDISMODULE_OK) {
- RedisModule_CloseKey(key);
- return RedisModule_ReplyWithError(ctx, "ERR Push failed");
- }
- }
- }
- RedisModule_CloseKey(key);
- /* signal key as ready if the command is lpush_unblock */
- size_t len;
- const char *str = RedisModule_StringPtrLen(argv[0], &len);
- if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) {
- RedisModule_SignalKeyAsReady(ctx, argv[1]);
- }
- return RedisModule_ReplyWithSimpleString(ctx, "OK");
- }
- /* Callback for the BLOCKONKEYS.BLPOPN command */
- int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argc);
- long long n;
- RedisModule_StringToLongLong(argv[2], &n);
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- int result;
- if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST &&
- RedisModule_ValueLength(key) >= (size_t)n) {
- RedisModule_ReplyWithArray(ctx, n);
- for (long i = 0; i < n; i++) {
- RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
- RedisModule_ReplyWithString(ctx, elem);
- RedisModule_FreeString(ctx, elem);
- }
- result = REDISMODULE_OK;
- } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
- RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
- /* continue blocking */
- result = REDISMODULE_ERR;
- } else {
- result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
- }
- RedisModule_CloseKey(key);
- return result;
- }
- int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- return RedisModule_ReplyWithError(ctx, "ERR Timeout");
- }
- /* BLOCKONKEYS.BLPOPN key N
- *
- * Blocks until key has N elements and then pops them or fails after 3 seconds.
- */
- int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc < 3) return RedisModule_WrongArity(ctx);
- long long n;
- if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) {
- return RedisModule_ReplyWithError(ctx, "ERR Invalid N");
- }
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- int keytype = RedisModule_KeyType(key);
- if (keytype != REDISMODULE_KEYTYPE_EMPTY &&
- keytype != REDISMODULE_KEYTYPE_LIST) {
- RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
- } else if (keytype == REDISMODULE_KEYTYPE_LIST &&
- RedisModule_ValueLength(key) >= (size_t)n) {
- RedisModule_ReplyWithArray(ctx, n);
- for (long i = 0; i < n; i++) {
- RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
- RedisModule_ReplyWithString(ctx, elem);
- RedisModule_FreeString(ctx, elem);
- }
- } else {
- RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
- blockonkeys_blpopn_timeout_callback,
- NULL, 3000, &argv[1], 1, NULL);
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
- int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
- return REDISMODULE_ERR;
- RedisModuleTypeMethods tm = {
- .version = REDISMODULE_TYPE_METHOD_VERSION,
- .rdb_load = fsl_rdb_load,
- .rdb_save = fsl_rdb_save,
- .aof_rewrite = fsl_aofrw,
- .mem_usage = NULL,
- .free = fsl_free,
- .digest = NULL
- };
- fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
- if (fsltype == NULL)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",1,1,1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",1,1,1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",1,1,1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",1,2,1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",1,1,1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall,
- "", 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush", blockonkeys_lpush,
- "", 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush,
- "", 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn,
- "", 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- return REDISMODULE_OK;
- }
|