2
0

defragtest.c 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. /* A module that implements defrag callback mechanisms.
  2. */
  3. #define REDISMODULE_EXPERIMENTAL_API
  4. #include "redismodule.h"
  5. #include <stdlib.h>
  6. static RedisModuleType *FragType;
  7. struct FragObject {
  8. unsigned long len;
  9. void **values;
  10. int maxstep;
  11. };
  12. /* Make sure we get the expected cursor */
  13. unsigned long int last_set_cursor = 0;
  14. unsigned long int datatype_attempts = 0;
  15. unsigned long int datatype_defragged = 0;
  16. unsigned long int datatype_resumes = 0;
  17. unsigned long int datatype_wrong_cursor = 0;
  18. unsigned long int global_attempts = 0;
  19. unsigned long int global_defragged = 0;
  20. int global_strings_len = 0;
  21. RedisModuleString **global_strings = NULL;
  22. static void createGlobalStrings(RedisModuleCtx *ctx, int count)
  23. {
  24. global_strings_len = count;
  25. global_strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count);
  26. for (int i = 0; i < count; i++) {
  27. global_strings[i] = RedisModule_CreateStringFromLongLong(ctx, i);
  28. }
  29. }
  30. static int defragGlobalStrings(RedisModuleDefragCtx *ctx)
  31. {
  32. for (int i = 0; i < global_strings_len; i++) {
  33. RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[i]);
  34. global_attempts++;
  35. if (new != NULL) {
  36. global_strings[i] = new;
  37. global_defragged++;
  38. }
  39. }
  40. return 0;
  41. }
  42. static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
  43. REDISMODULE_NOT_USED(for_crash_report);
  44. RedisModule_InfoAddSection(ctx, "stats");
  45. RedisModule_InfoAddFieldLongLong(ctx, "datatype_attempts", datatype_attempts);
  46. RedisModule_InfoAddFieldLongLong(ctx, "datatype_defragged", datatype_defragged);
  47. RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes);
  48. RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor);
  49. RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts);
  50. RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged);
  51. }
  52. struct FragObject *createFragObject(unsigned long len, unsigned long size, int maxstep) {
  53. struct FragObject *o = RedisModule_Alloc(sizeof(*o));
  54. o->len = len;
  55. o->values = RedisModule_Alloc(sizeof(RedisModuleString*) * len);
  56. o->maxstep = maxstep;
  57. for (unsigned long i = 0; i < len; i++) {
  58. o->values[i] = RedisModule_Calloc(1, size);
  59. }
  60. return o;
  61. }
  62. /* FRAG.RESETSTATS */
  63. static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  64. REDISMODULE_NOT_USED(argv);
  65. REDISMODULE_NOT_USED(argc);
  66. datatype_attempts = 0;
  67. datatype_defragged = 0;
  68. datatype_resumes = 0;
  69. datatype_wrong_cursor = 0;
  70. global_attempts = 0;
  71. global_defragged = 0;
  72. RedisModule_ReplyWithSimpleString(ctx, "OK");
  73. return REDISMODULE_OK;
  74. }
  75. /* FRAG.CREATE key len size maxstep */
  76. static int fragCreateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  77. if (argc != 5)
  78. return RedisModule_WrongArity(ctx);
  79. RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
  80. REDISMODULE_READ|REDISMODULE_WRITE);
  81. int type = RedisModule_KeyType(key);
  82. if (type != REDISMODULE_KEYTYPE_EMPTY)
  83. {
  84. return RedisModule_ReplyWithError(ctx, "ERR key exists");
  85. }
  86. long long len;
  87. if ((RedisModule_StringToLongLong(argv[2], &len) != REDISMODULE_OK)) {
  88. return RedisModule_ReplyWithError(ctx, "ERR invalid len");
  89. }
  90. long long size;
  91. if ((RedisModule_StringToLongLong(argv[3], &size) != REDISMODULE_OK)) {
  92. return RedisModule_ReplyWithError(ctx, "ERR invalid size");
  93. }
  94. long long maxstep;
  95. if ((RedisModule_StringToLongLong(argv[4], &maxstep) != REDISMODULE_OK)) {
  96. return RedisModule_ReplyWithError(ctx, "ERR invalid maxstep");
  97. }
  98. struct FragObject *o = createFragObject(len, size, maxstep);
  99. RedisModule_ModuleTypeSetValue(key, FragType, o);
  100. RedisModule_ReplyWithSimpleString(ctx, "OK");
  101. RedisModule_CloseKey(key);
  102. return REDISMODULE_OK;
  103. }
  104. void FragFree(void *value) {
  105. struct FragObject *o = value;
  106. for (unsigned long i = 0; i < o->len; i++)
  107. RedisModule_Free(o->values[i]);
  108. RedisModule_Free(o->values);
  109. RedisModule_Free(o);
  110. }
  111. size_t FragFreeEffort(RedisModuleString *key, const void *value) {
  112. REDISMODULE_NOT_USED(key);
  113. const struct FragObject *o = value;
  114. return o->len;
  115. }
  116. int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) {
  117. REDISMODULE_NOT_USED(key);
  118. unsigned long i = 0;
  119. int steps = 0;
  120. int dbid = RedisModule_GetDbIdFromDefragCtx(ctx);
  121. RedisModule_Assert(dbid != -1);
  122. /* Attempt to get cursor, validate it's what we're exepcting */
  123. if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) {
  124. if (i > 0) datatype_resumes++;
  125. /* Validate we're expecting this cursor */
  126. if (i != last_set_cursor) datatype_wrong_cursor++;
  127. } else {
  128. if (last_set_cursor != 0) datatype_wrong_cursor++;
  129. }
  130. /* Attempt to defrag the object itself */
  131. datatype_attempts++;
  132. struct FragObject *o = RedisModule_DefragAlloc(ctx, *value);
  133. if (o == NULL) {
  134. /* Not defragged */
  135. o = *value;
  136. } else {
  137. /* Defragged */
  138. *value = o;
  139. datatype_defragged++;
  140. }
  141. /* Deep defrag now */
  142. for (; i < o->len; i++) {
  143. datatype_attempts++;
  144. void *new = RedisModule_DefragAlloc(ctx, o->values[i]);
  145. if (new) {
  146. o->values[i] = new;
  147. datatype_defragged++;
  148. }
  149. if ((o->maxstep && ++steps > o->maxstep) ||
  150. ((i % 64 == 0) && RedisModule_DefragShouldStop(ctx)))
  151. {
  152. RedisModule_DefragCursorSet(ctx, i);
  153. last_set_cursor = i;
  154. return 1;
  155. }
  156. }
  157. last_set_cursor = 0;
  158. return 0;
  159. }
  160. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  161. REDISMODULE_NOT_USED(argv);
  162. REDISMODULE_NOT_USED(argc);
  163. if (RedisModule_Init(ctx, "defragtest", 1, REDISMODULE_APIVER_1)
  164. == REDISMODULE_ERR) return REDISMODULE_ERR;
  165. if (RedisModule_GetTypeMethodVersion() < REDISMODULE_TYPE_METHOD_VERSION) {
  166. return REDISMODULE_ERR;
  167. }
  168. long long glen;
  169. if (argc != 1 || RedisModule_StringToLongLong(argv[0], &glen) == REDISMODULE_ERR) {
  170. return REDISMODULE_ERR;
  171. }
  172. createGlobalStrings(ctx, glen);
  173. RedisModuleTypeMethods tm = {
  174. .version = REDISMODULE_TYPE_METHOD_VERSION,
  175. .free = FragFree,
  176. .free_effort = FragFreeEffort,
  177. .defrag = FragDefrag
  178. };
  179. FragType = RedisModule_CreateDataType(ctx, "frag_type", 0, &tm);
  180. if (FragType == NULL) return REDISMODULE_ERR;
  181. if (RedisModule_CreateCommand(ctx, "frag.create",
  182. fragCreateCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR)
  183. return REDISMODULE_ERR;
  184. if (RedisModule_CreateCommand(ctx, "frag.resetstats",
  185. fragResetStatsCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR)
  186. return REDISMODULE_ERR;
  187. RedisModule_RegisterInfoFunc(ctx, FragInfo);
  188. RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings);
  189. return REDISMODULE_OK;
  190. }