2
0

blockonkeys.c 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. #define REDISMODULE_EXPERIMENTAL_API
  2. #include "redismodule.h"
  3. #include <string.h>
  4. #include <assert.h>
  5. #include <unistd.h>
  6. #define LIST_SIZE 1024
  7. typedef struct {
  8. long long list[LIST_SIZE];
  9. long long length;
  10. } fsl_t; /* Fixed-size list */
  11. static RedisModuleType *fsltype = NULL;
  12. fsl_t *fsl_type_create() {
  13. fsl_t *o;
  14. o = RedisModule_Alloc(sizeof(*o));
  15. o->length = 0;
  16. return o;
  17. }
  18. void fsl_type_free(fsl_t *o) {
  19. RedisModule_Free(o);
  20. }
  21. /* ========================== "fsltype" type methods ======================= */
  22. void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
  23. if (encver != 0) {
  24. return NULL;
  25. }
  26. fsl_t *fsl = fsl_type_create();
  27. fsl->length = RedisModule_LoadUnsigned(rdb);
  28. for (long long i = 0; i < fsl->length; i++)
  29. fsl->list[i] = RedisModule_LoadSigned(rdb);
  30. return fsl;
  31. }
  32. void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
  33. fsl_t *fsl = value;
  34. RedisModule_SaveUnsigned(rdb,fsl->length);
  35. for (long long i = 0; i < fsl->length; i++)
  36. RedisModule_SaveSigned(rdb, fsl->list[i]);
  37. }
  38. void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
  39. fsl_t *fsl = value;
  40. for (long long i = 0; i < fsl->length; i++)
  41. RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
  42. }
  43. void fsl_free(void *value) {
  44. fsl_type_free(value);
  45. }
  46. /* ========================== helper methods ======================= */
  47. int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
  48. RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
  49. int type = RedisModule_KeyType(key);
  50. if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
  51. RedisModule_CloseKey(key);
  52. if (reply_on_failure)
  53. RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
  54. return 0;
  55. }
  56. /* Create an empty value object if the key is currently empty. */
  57. if (type == REDISMODULE_KEYTYPE_EMPTY) {
  58. if (!create) {
  59. /* Key is empty but we cannot create */
  60. RedisModule_CloseKey(key);
  61. *fsl = NULL;
  62. return 1;
  63. }
  64. *fsl = fsl_type_create();
  65. RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
  66. } else {
  67. *fsl = RedisModule_ModuleTypeGetValue(key);
  68. }
  69. RedisModule_CloseKey(key);
  70. return 1;
  71. }
  72. /* ========================== commands ======================= */
  73. /* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
  74. * It must be greater than the element in the head of the list. */
  75. int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  76. if (argc != 3)
  77. return RedisModule_WrongArity(ctx);
  78. long long ele;
  79. if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
  80. return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
  81. fsl_t *fsl;
  82. if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
  83. return REDISMODULE_OK;
  84. if (fsl->length == LIST_SIZE)
  85. return RedisModule_ReplyWithError(ctx,"ERR list is full");
  86. if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
  87. return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
  88. fsl->list[fsl->length++] = ele;
  89. if (fsl->length >= 2)
  90. RedisModule_SignalKeyAsReady(ctx, argv[1]);
  91. return RedisModule_ReplyWithSimpleString(ctx, "OK");
  92. }
  93. int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  94. REDISMODULE_NOT_USED(argv);
  95. REDISMODULE_NOT_USED(argc);
  96. RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  97. fsl_t *fsl;
  98. if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
  99. return REDISMODULE_ERR;
  100. if (!fsl || fsl->length < 2)
  101. return REDISMODULE_ERR;
  102. RedisModule_ReplyWithArray(ctx, 2);
  103. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  104. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  105. return REDISMODULE_OK;
  106. }
  107. int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  108. REDISMODULE_NOT_USED(argv);
  109. REDISMODULE_NOT_USED(argc);
  110. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  111. }
  112. /* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements.
  113. * When that happens, unblock client and pop the last two elements (from the right). */
  114. int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  115. if (argc != 3)
  116. return RedisModule_WrongArity(ctx);
  117. long long timeout;
  118. if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
  119. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  120. fsl_t *fsl;
  121. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  122. return REDISMODULE_OK;
  123. if (!fsl || fsl->length < 2) {
  124. /* Key is empty or has <2 elements, we must block */
  125. RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback,
  126. NULL, timeout, &argv[1], 1, NULL);
  127. } else {
  128. RedisModule_ReplyWithArray(ctx, 2);
  129. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  130. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  131. }
  132. return REDISMODULE_OK;
  133. }
  134. int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  135. REDISMODULE_NOT_USED(argv);
  136. REDISMODULE_NOT_USED(argc);
  137. RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  138. long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
  139. fsl_t *fsl;
  140. if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
  141. return REDISMODULE_ERR;
  142. if (!fsl || fsl->list[fsl->length-1] <= *pgt)
  143. return REDISMODULE_ERR;
  144. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  145. return REDISMODULE_OK;
  146. }
  147. int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  148. REDISMODULE_NOT_USED(argv);
  149. REDISMODULE_NOT_USED(argc);
  150. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  151. }
  152. void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
  153. REDISMODULE_NOT_USED(ctx);
  154. RedisModule_Free(privdata);
  155. }
  156. /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
  157. * When that happens, unblock client and pop the last element (from the right). */
  158. int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  159. if (argc != 4)
  160. return RedisModule_WrongArity(ctx);
  161. long long gt;
  162. if (RedisModule_StringToLongLong(argv[2],&gt) != REDISMODULE_OK)
  163. return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
  164. long long timeout;
  165. if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
  166. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  167. fsl_t *fsl;
  168. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  169. return REDISMODULE_OK;
  170. if (!fsl || fsl->list[fsl->length-1] <= gt) {
  171. /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
  172. long long *pgt = RedisModule_Alloc(sizeof(long long));
  173. *pgt = gt;
  174. /* Key is empty or has <2 elements, we must block */
  175. RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
  176. bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
  177. } else {
  178. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  179. }
  180. return REDISMODULE_OK;
  181. }
  182. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  183. REDISMODULE_NOT_USED(argv);
  184. REDISMODULE_NOT_USED(argc);
  185. if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
  186. return REDISMODULE_ERR;
  187. RedisModuleTypeMethods tm = {
  188. .version = REDISMODULE_TYPE_METHOD_VERSION,
  189. .rdb_load = fsl_rdb_load,
  190. .rdb_save = fsl_rdb_save,
  191. .aof_rewrite = fsl_aofrw,
  192. .mem_usage = NULL,
  193. .free = fsl_free,
  194. .digest = NULL
  195. };
  196. fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
  197. if (fsltype == NULL)
  198. return REDISMODULE_ERR;
  199. if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
  200. return REDISMODULE_ERR;
  201. if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR)
  202. return REDISMODULE_ERR;
  203. if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
  204. return REDISMODULE_ERR;
  205. return REDISMODULE_OK;
  206. }