blockedclient.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #define REDISMODULE_EXPERIMENTAL_API
  2. #include "redismodule.h"
  3. #include <assert.h>
  4. #include <stdio.h>
  5. #include <pthread.h>
  /* Explicitly discards a value (typically an unused parameter).
   * The argument is parenthesized so that any expression, not only a plain
   * identifier, is cast to void as a whole. */
  #define UNUSED(V) ((void)(V))
  7. void *sub_worker(void *arg) {
  8. // Get Redis module context
  9. RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
  10. // Try acquiring GIL
  11. int res = RedisModule_ThreadSafeContextTryLock(ctx);
  12. // GIL is already taken by the calling thread expecting to fail.
  13. assert(res != REDISMODULE_OK);
  14. return NULL;
  15. }
  16. void *worker(void *arg) {
  17. // Retrieve blocked client
  18. RedisModuleBlockedClient *bc = (RedisModuleBlockedClient *)arg;
  19. // Get Redis module context
  20. RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
  21. // Acquire GIL
  22. RedisModule_ThreadSafeContextLock(ctx);
  23. // Create another thread which will try to acquire the GIL
  24. pthread_t tid;
  25. int res = pthread_create(&tid, NULL, sub_worker, ctx);
  26. assert(res == 0);
  27. // Wait for thread
  28. pthread_join(tid, NULL);
  29. // Release GIL
  30. RedisModule_ThreadSafeContextUnlock(ctx);
  31. // Reply to client
  32. RedisModule_ReplyWithSimpleString(ctx, "OK");
  33. // Unblock client
  34. RedisModule_UnblockClient(bc, NULL);
  35. // Free the Redis module context
  36. RedisModule_FreeThreadSafeContext(ctx);
  37. return NULL;
  38. }
  39. int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
  40. {
  41. UNUSED(argv);
  42. UNUSED(argc);
  43. int flags = RedisModule_GetContextFlags(ctx);
  44. int allFlags = RedisModule_GetContextFlagsAll();
  45. if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
  46. (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
  47. RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
  48. return REDISMODULE_OK;
  49. }
  50. if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
  51. (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
  52. RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
  53. return REDISMODULE_OK;
  54. }
  55. /* This command handler tries to acquire the GIL twice
  56. * once in the worker thread using "RedisModule_ThreadSafeContextLock"
  57. * second in the sub-worker thread
  58. * using "RedisModule_ThreadSafeContextTryLock"
  59. * as the GIL is already locked. */
  60. RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
  61. pthread_t tid;
  62. int res = pthread_create(&tid, NULL, worker, bc);
  63. assert(res == 0);
  64. return REDISMODULE_OK;
  65. }
  66. typedef struct {
  67. RedisModuleString **argv;
  68. int argc;
  69. RedisModuleBlockedClient *bc;
  70. } bg_call_data;
  71. void *bg_call_worker(void *arg) {
  72. bg_call_data *bg = arg;
  73. // Get Redis module context
  74. RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
  75. // Acquire GIL
  76. RedisModule_ThreadSafeContextLock(ctx);
  77. // Call the command
  78. const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
  79. RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
  80. // Release GIL
  81. RedisModule_ThreadSafeContextUnlock(ctx);
  82. // Reply to client
  83. if (!rep) {
  84. RedisModule_ReplyWithError(ctx, "NULL reply returned");
  85. } else {
  86. RedisModule_ReplyWithCallReply(ctx, rep);
  87. RedisModule_FreeCallReply(rep);
  88. }
  89. // Unblock client
  90. RedisModule_UnblockClient(bg->bc, NULL);
  91. /* Free the arguments */
  92. for (int i=0; i<bg->argc; i++)
  93. RedisModule_FreeString(ctx, bg->argv[i]);
  94. RedisModule_Free(bg->argv);
  95. RedisModule_Free(bg);
  96. // Free the Redis module context
  97. RedisModule_FreeThreadSafeContext(ctx);
  98. return NULL;
  99. }
  100. int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
  101. {
  102. UNUSED(argv);
  103. UNUSED(argc);
  104. /* Make sure we're not trying to block a client when we shouldn't */
  105. int flags = RedisModule_GetContextFlags(ctx);
  106. int allFlags = RedisModule_GetContextFlagsAll();
  107. if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
  108. (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
  109. RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
  110. return REDISMODULE_OK;
  111. }
  112. if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
  113. (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
  114. RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
  115. return REDISMODULE_OK;
  116. }
  117. /* Make a copy of the arguments and pass them to the thread. */
  118. bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
  119. bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
  120. bg->argc = argc;
  121. for (int i=0; i<argc; i++)
  122. bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
  123. /* Block the client */
  124. bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
  125. /* Start a thread to handle the request */
  126. pthread_t tid;
  127. int res = pthread_create(&tid, NULL, bg_call_worker, bg);
  128. assert(res == 0);
  129. return REDISMODULE_OK;
  130. }
  131. int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
  132. UNUSED(argv);
  133. UNUSED(argc);
  134. if(argc < 2){
  135. return RedisModule_WrongArity(ctx);
  136. }
  137. const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
  138. RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);
  139. if(!rep){
  140. RedisModule_ReplyWithError(ctx, "NULL reply returned");
  141. }else{
  142. RedisModule_ReplyWithCallReply(ctx, rep);
  143. RedisModule_FreeCallReply(rep);
  144. }
  145. return REDISMODULE_OK;
  146. }
  147. /* simulate a blocked client replying to a thread safe context without creating a thread */
  148. int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  149. UNUSED(argv);
  150. UNUSED(argc);
  151. RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
  152. RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(bc);
  153. RedisModule_ReplyWithBool(bctx, 1);
  154. RedisModule_FreeThreadSafeContext(bctx);
  155. RedisModule_UnblockClient(bc, NULL);
  156. return REDISMODULE_OK;
  157. }
  158. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  159. REDISMODULE_NOT_USED(argv);
  160. REDISMODULE_NOT_USED(argc);
  161. if (RedisModule_Init(ctx, "blockedclient", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
  162. return REDISMODULE_ERR;
  163. if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
  164. return REDISMODULE_ERR;
  165. if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
  166. return REDISMODULE_ERR;
  167. if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
  168. return REDISMODULE_ERR;
  169. if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR)
  170. return REDISMODULE_ERR;
  171. return REDISMODULE_OK;
  172. }