blockonkeys.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. #define REDISMODULE_EXPERIMENTAL_API
  2. #include "redismodule.h"
  3. #include <string.h>
  4. #include <assert.h>
  5. #include <unistd.h>
  6. #define LIST_SIZE 1024
  7. typedef struct {
  8. long long list[LIST_SIZE];
  9. long long length;
  10. } fsl_t; /* Fixed-size list */
  11. static RedisModuleType *fsltype = NULL;
  12. fsl_t *fsl_type_create() {
  13. fsl_t *o;
  14. o = RedisModule_Alloc(sizeof(*o));
  15. o->length = 0;
  16. return o;
  17. }
  18. void fsl_type_free(fsl_t *o) {
  19. RedisModule_Free(o);
  20. }
  21. /* ========================== "fsltype" type methods ======================= */
  22. void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
  23. if (encver != 0) {
  24. return NULL;
  25. }
  26. fsl_t *fsl = fsl_type_create();
  27. fsl->length = RedisModule_LoadUnsigned(rdb);
  28. for (long long i = 0; i < fsl->length; i++)
  29. fsl->list[i] = RedisModule_LoadSigned(rdb);
  30. return fsl;
  31. }
  32. void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
  33. fsl_t *fsl = value;
  34. RedisModule_SaveUnsigned(rdb,fsl->length);
  35. for (long long i = 0; i < fsl->length; i++)
  36. RedisModule_SaveSigned(rdb, fsl->list[i]);
  37. }
  38. void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
  39. fsl_t *fsl = value;
  40. for (long long i = 0; i < fsl->length; i++)
  41. RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
  42. }
  43. void fsl_free(void *value) {
  44. fsl_type_free(value);
  45. }
  46. /* ========================== helper methods ======================= */
  47. int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
  48. RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
  49. int type = RedisModule_KeyType(key);
  50. if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
  51. RedisModule_CloseKey(key);
  52. if (reply_on_failure)
  53. RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
  54. return 0;
  55. }
  56. /* Create an empty value object if the key is currently empty. */
  57. if (type == REDISMODULE_KEYTYPE_EMPTY) {
  58. if (!create) {
  59. /* Key is empty but we cannot create */
  60. RedisModule_CloseKey(key);
  61. *fsl = NULL;
  62. return 1;
  63. }
  64. *fsl = fsl_type_create();
  65. RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
  66. } else {
  67. *fsl = RedisModule_ModuleTypeGetValue(key);
  68. }
  69. RedisModule_CloseKey(key);
  70. return 1;
  71. }
  72. /* ========================== commands ======================= */
  73. /* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
  74. * It must be greater than the element in the head of the list. */
  75. int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  76. if (argc != 3)
  77. return RedisModule_WrongArity(ctx);
  78. long long ele;
  79. if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
  80. return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
  81. fsl_t *fsl;
  82. if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
  83. return REDISMODULE_OK;
  84. if (fsl->length == LIST_SIZE)
  85. return RedisModule_ReplyWithError(ctx,"ERR list is full");
  86. if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
  87. return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
  88. fsl->list[fsl->length++] = ele;
  89. RedisModule_SignalKeyAsReady(ctx, argv[1]);
  90. return RedisModule_ReplyWithSimpleString(ctx, "OK");
  91. }
  92. int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  93. REDISMODULE_NOT_USED(argv);
  94. REDISMODULE_NOT_USED(argc);
  95. RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  96. fsl_t *fsl;
  97. if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
  98. return REDISMODULE_ERR;
  99. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  100. return REDISMODULE_OK;
  101. }
  102. int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  103. REDISMODULE_NOT_USED(argv);
  104. REDISMODULE_NOT_USED(argc);
  105. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  106. }
  107. /* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
  108. * When that happens, unblock client and pop the last two elements (from the right). */
  109. int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  110. if (argc != 3)
  111. return RedisModule_WrongArity(ctx);
  112. long long timeout;
  113. if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
  114. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  115. fsl_t *fsl;
  116. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  117. return REDISMODULE_OK;
  118. if (!fsl) {
  119. RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
  120. NULL, timeout, &argv[1], 1, NULL);
  121. } else {
  122. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  123. }
  124. return REDISMODULE_OK;
  125. }
  126. int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  127. REDISMODULE_NOT_USED(argv);
  128. REDISMODULE_NOT_USED(argc);
  129. RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  130. long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
  131. fsl_t *fsl;
  132. if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
  133. return REDISMODULE_ERR;
  134. if (fsl->list[fsl->length-1] <= *pgt)
  135. return REDISMODULE_ERR;
  136. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  137. return REDISMODULE_OK;
  138. }
  139. int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  140. REDISMODULE_NOT_USED(argv);
  141. REDISMODULE_NOT_USED(argc);
  142. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  143. }
  144. void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
  145. REDISMODULE_NOT_USED(ctx);
  146. RedisModule_Free(privdata);
  147. }
  148. /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
  149. * When that happens, unblock client and pop the last element (from the right). */
  150. int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  151. if (argc != 4)
  152. return RedisModule_WrongArity(ctx);
  153. long long gt;
  154. if (RedisModule_StringToLongLong(argv[2],&gt) != REDISMODULE_OK)
  155. return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
  156. long long timeout;
  157. if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
  158. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  159. fsl_t *fsl;
  160. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  161. return REDISMODULE_OK;
  162. if (!fsl || fsl->list[fsl->length-1] <= gt) {
  163. /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
  164. long long *pgt = RedisModule_Alloc(sizeof(long long));
  165. *pgt = gt;
  166. RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
  167. bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
  168. } else {
  169. RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
  170. }
  171. return REDISMODULE_OK;
  172. }
  173. int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  174. REDISMODULE_NOT_USED(argv);
  175. REDISMODULE_NOT_USED(argc);
  176. RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
  177. RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
  178. fsl_t *src;
  179. if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
  180. return REDISMODULE_ERR;
  181. fsl_t *dst;
  182. if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
  183. return REDISMODULE_ERR;
  184. long long ele = src->list[--src->length];
  185. dst->list[dst->length++] = ele;
  186. RedisModule_SignalKeyAsReady(ctx, dst_keyname);
  187. return RedisModule_ReplyWithLongLong(ctx, ele);
  188. }
  189. int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  190. REDISMODULE_NOT_USED(argv);
  191. REDISMODULE_NOT_USED(argc);
  192. return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
  193. }
  194. void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
  195. RedisModule_FreeString(ctx, privdata);
  196. }
  197. /* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
  198. * When that happens, unblock client, pop the last element from <src> and push it to <dst>
  199. * (from the right). */
  200. int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  201. if (argc != 4)
  202. return RedisModule_WrongArity(ctx);
  203. long long timeout;
  204. if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
  205. return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
  206. fsl_t *src;
  207. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
  208. return REDISMODULE_OK;
  209. if (!src) {
  210. /* Retain string for reply callback */
  211. RedisModule_RetainString(ctx, argv[2]);
  212. /* Key is empty, we must block */
  213. RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
  214. bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
  215. } else {
  216. fsl_t *dst;
  217. if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
  218. return REDISMODULE_OK;
  219. long long ele = src->list[--src->length];
  220. dst->list[dst->length++] = ele;
  221. RedisModule_SignalKeyAsReady(ctx, argv[2]);
  222. RedisModule_ReplyWithLongLong(ctx, ele);
  223. }
  224. return REDISMODULE_OK;
  225. }
  226. /* FSL.GETALL <key> - Reply with an array containing all elements. */
  227. int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  228. if (argc != 2)
  229. return RedisModule_WrongArity(ctx);
  230. fsl_t *fsl;
  231. if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
  232. return REDISMODULE_OK;
  233. if (!fsl)
  234. return RedisModule_ReplyWithArray(ctx, 0);
  235. RedisModule_ReplyWithArray(ctx, fsl->length);
  236. for (int i = 0; i < fsl->length; i++)
  237. RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
  238. return REDISMODULE_OK;
  239. }
  240. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  241. REDISMODULE_NOT_USED(argv);
  242. REDISMODULE_NOT_USED(argc);
  243. if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
  244. return REDISMODULE_ERR;
  245. RedisModuleTypeMethods tm = {
  246. .version = REDISMODULE_TYPE_METHOD_VERSION,
  247. .rdb_load = fsl_rdb_load,
  248. .rdb_save = fsl_rdb_save,
  249. .aof_rewrite = fsl_aofrw,
  250. .mem_usage = NULL,
  251. .free = fsl_free,
  252. .digest = NULL
  253. };
  254. fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
  255. if (fsltype == NULL)
  256. return REDISMODULE_ERR;
  257. if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
  258. return REDISMODULE_ERR;
  259. if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
  260. return REDISMODULE_ERR;
  261. if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
  262. return REDISMODULE_ERR;
  263. if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
  264. return REDISMODULE_ERR;
  265. if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
  266. return REDISMODULE_ERR;
  267. return REDISMODULE_OK;
  268. }