123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #define REDISMODULE_EXPERIMENTAL_API
- #include "redismodule.h"
- #include <assert.h>
- #include <stdio.h>
- #include <pthread.h>
- #define UNUSED(V) ((void) V)
- void *sub_worker(void *arg) {
- // Get Redis module context
- RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
- // Try acquiring GIL
- int res = RedisModule_ThreadSafeContextTryLock(ctx);
- // GIL is already taken by the calling thread expecting to fail.
- assert(res != REDISMODULE_OK);
- return NULL;
- }
- void *worker(void *arg) {
- // Retrieve blocked client
- RedisModuleBlockedClient *bc = (RedisModuleBlockedClient *)arg;
- // Get Redis module context
- RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
- // Acquire GIL
- RedisModule_ThreadSafeContextLock(ctx);
- // Create another thread which will try to acquire the GIL
- pthread_t tid;
- int res = pthread_create(&tid, NULL, sub_worker, ctx);
- assert(res == 0);
- // Wait for thread
- pthread_join(tid, NULL);
- // Release GIL
- RedisModule_ThreadSafeContextUnlock(ctx);
- // Reply to client
- RedisModule_ReplyWithSimpleString(ctx, "OK");
- // Unblock client
- RedisModule_UnblockClient(bc, NULL);
- // Free the Redis module context
- RedisModule_FreeThreadSafeContext(ctx);
- return NULL;
- }
- int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
- {
- UNUSED(argv);
- UNUSED(argc);
- int flags = RedisModule_GetContextFlags(ctx);
- int allFlags = RedisModule_GetContextFlagsAll();
- if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
- (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
- return REDISMODULE_OK;
- }
- if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
- (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
- return REDISMODULE_OK;
- }
- /* This command handler tries to acquire the GIL twice
- * once in the worker thread using "RedisModule_ThreadSafeContextLock"
- * second in the sub-worker thread
- * using "RedisModule_ThreadSafeContextTryLock"
- * as the GIL is already locked. */
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- pthread_t tid;
- int res = pthread_create(&tid, NULL, worker, bc);
- assert(res == 0);
- return REDISMODULE_OK;
- }
- typedef struct {
- RedisModuleString **argv;
- int argc;
- RedisModuleBlockedClient *bc;
- } bg_call_data;
- void *bg_call_worker(void *arg) {
- bg_call_data *bg = arg;
- // Get Redis module context
- RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
- // Acquire GIL
- RedisModule_ThreadSafeContextLock(ctx);
- // Call the command
- const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
- RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
- // Release GIL
- RedisModule_ThreadSafeContextUnlock(ctx);
- // Reply to client
- if (!rep) {
- RedisModule_ReplyWithError(ctx, "NULL reply returned");
- } else {
- RedisModule_ReplyWithCallReply(ctx, rep);
- RedisModule_FreeCallReply(rep);
- }
- // Unblock client
- RedisModule_UnblockClient(bg->bc, NULL);
- /* Free the arguments */
- for (int i=0; i<bg->argc; i++)
- RedisModule_FreeString(ctx, bg->argv[i]);
- RedisModule_Free(bg->argv);
- RedisModule_Free(bg);
- // Free the Redis module context
- RedisModule_FreeThreadSafeContext(ctx);
- return NULL;
- }
- int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
- {
- UNUSED(argv);
- UNUSED(argc);
- /* Make sure we're not trying to block a client when we shouldn't */
- int flags = RedisModule_GetContextFlags(ctx);
- int allFlags = RedisModule_GetContextFlagsAll();
- if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
- (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
- return REDISMODULE_OK;
- }
- if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
- (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
- return REDISMODULE_OK;
- }
- /* Make a copy of the arguments and pass them to the thread. */
- bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
- bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
- bg->argc = argc;
- for (int i=0; i<argc; i++)
- bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
- /* Block the client */
- bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- /* Start a thread to handle the request */
- pthread_t tid;
- int res = pthread_create(&tid, NULL, bg_call_worker, bg);
- assert(res == 0);
- return REDISMODULE_OK;
- }
- int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
- UNUSED(argv);
- UNUSED(argc);
- if(argc < 2){
- return RedisModule_WrongArity(ctx);
- }
- const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
- RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);
- if(!rep){
- RedisModule_ReplyWithError(ctx, "NULL reply returned");
- }else{
- RedisModule_ReplyWithCallReply(ctx, rep);
- RedisModule_FreeCallReply(rep);
- }
- return REDISMODULE_OK;
- }
- /* simulate a blocked client replying to a thread safe context without creating a thread */
- int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- UNUSED(argv);
- UNUSED(argc);
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(bc);
- RedisModule_ReplyWithBool(bctx, 1);
- RedisModule_FreeThreadSafeContext(bctx);
- RedisModule_UnblockClient(bc, NULL);
- return REDISMODULE_OK;
- }
- int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- if (RedisModule_Init(ctx, "blockedclient", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- return REDISMODULE_OK;
- }
|