lazyfree.c 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. #include "server.h"
  2. #include "bio.h"
  3. #include "atomicvar.h"
  4. #include "cluster.h"
  /* Number of objects handed over for lazy freeing and not yet released.
   * Within this file the counter is only touched through the atomicGet(),
   * atomicIncr() and atomicDecr() helpers. */
  static size_t lazyfree_objects = 0;

  /* Never referenced directly in this file; presumably the lock the
   * atomicvar.h helpers fall back to for 'lazyfree_objects' -- confirm
   * against atomicvar.h. */
  pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;

  /* Report how many objects are still waiting to be freed. */
  size_t lazyfreeGetPendingObjectsCount(void) {
      size_t pending;

      atomicGet(lazyfree_objects,pending);
      return pending;
  }
  13. /* Return the amount of work needed in order to free an object.
  14. * The return value is not always the actual number of allocations the
  15. * object is compoesd of, but a number proportional to it.
  16. *
  17. * For strings the function always returns 1.
  18. *
  19. * For aggregated objects represented by hash tables or other data structures
  20. * the function just returns the number of elements the object is composed of.
  21. *
  22. * Objects composed of single allocations are always reported as having a
  23. * single item even if they are actually logical composed of multiple
  24. * elements.
  25. *
  26. * For lists the function returns the number of elements in the quicklist
  27. * representing the list. */
  28. size_t lazyfreeGetFreeEffort(robj *obj) {
  29. if (obj->type == OBJ_LIST) {
  30. quicklist *ql = obj->ptr;
  31. return ql->len;
  32. } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
  33. dict *ht = obj->ptr;
  34. return dictSize(ht);
  35. } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
  36. zset *zs = obj->ptr;
  37. return zs->zsl->length;
  38. } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
  39. dict *ht = obj->ptr;
  40. return dictSize(ht);
  41. } else {
  42. return 1; /* Everything else is a single allocation. */
  43. }
  44. }
  45. /* Delete a key, value, and associated expiration entry if any, from the DB.
  46. * If there are enough allocations to free the value object may be put into
  47. * a lazy free list instead of being freed synchronously. The lazy free list
  48. * will be reclaimed in a different bio.c thread. */
  49. #define LAZYFREE_THRESHOLD 64
  50. int dbAsyncDelete(redisDb *db, robj *key) {
  51. /* Deleting an entry from the expires dict will not free the sds of
  52. * the key, because it is shared with the main dictionary. */
  53. if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
  54. /* If the value is composed of a few allocations, to free in a lazy way
  55. * is actually just slower... So under a certain limit we just free
  56. * the object synchronously. */
  57. dictEntry *de = dictUnlink(db->dict,key->ptr);
  58. if (de) {
  59. robj *val = dictGetVal(de);
  60. size_t free_effort = lazyfreeGetFreeEffort(val);
  61. /* If releasing the object is too much work, do it in the background
  62. * by adding the object to the lazy free list.
  63. * Note that if the object is shared, to reclaim it now it is not
  64. * possible. This rarely happens, however sometimes the implementation
  65. * of parts of the Redis core may call incrRefCount() to protect
  66. * objects, and then call dbDelete(). In this case we'll fall
  67. * through and reach the dictFreeUnlinkedEntry() call, that will be
  68. * equivalent to just calling decrRefCount(). */
  69. if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
  70. atomicIncr(lazyfree_objects,1);
  71. bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
  72. dictSetVal(db->dict,de,NULL);
  73. }
  74. }
  75. /* Release the key-val pair, or just the key if we set the val
  76. * field to NULL in order to lazy free it later. */
  77. if (de) {
  78. dictFreeUnlinkedEntry(db->dict,de);
  79. if (server.cluster_enabled) slotToKeyDel(key);
  80. return 1;
  81. } else {
  82. return 0;
  83. }
  84. }
  85. /* Free an object, if the object is huge enough, free it in async way. */
  86. void freeObjAsync(robj *o) {
  87. size_t free_effort = lazyfreeGetFreeEffort(o);
  88. if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
  89. atomicIncr(lazyfree_objects,1);
  90. bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
  91. } else {
  92. decrRefCount(o);
  93. }
  94. }
  95. /* Empty a Redis DB asynchronously. What the function does actually is to
  96. * create a new empty set of hash tables and scheduling the old ones for
  97. * lazy freeing. */
  98. void emptyDbAsync(redisDb *db) {
  99. dict *oldht1 = db->dict, *oldht2 = db->expires;
  100. db->dict = dictCreate(&dbDictType,NULL);
  101. db->expires = dictCreate(&keyptrDictType,NULL);
  102. atomicIncr(lazyfree_objects,dictSize(oldht1));
  103. bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
  104. }
  105. /* Empty the slots-keys map of Redis CLuster by creating a new empty one
  106. * and scheduiling the old for lazy freeing. */
  107. void slotToKeyFlushAsync(void) {
  108. rax *old = server.cluster->slots_to_keys;
  109. server.cluster->slots_to_keys = raxNew();
  110. memset(server.cluster->slots_keys_count,0,
  111. sizeof(server.cluster->slots_keys_count));
  112. atomicIncr(lazyfree_objects,old->numele);
  113. bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old);
  114. }
  115. /* Release objects from the lazyfree thread. It's just decrRefCount()
  116. * updating the count of objects to release. */
  117. void lazyfreeFreeObjectFromBioThread(robj *o) {
  118. decrRefCount(o);
  119. atomicDecr(lazyfree_objects,1);
  120. }
  121. /* Release a database from the lazyfree thread. The 'db' pointer is the
  122. * database which was substitutied with a fresh one in the main thread
  123. * when the database was logically deleted. 'sl' is a skiplist used by
  124. * Redis Cluster in order to take the hash slots -> keys mapping. This
  125. * may be NULL if Redis Cluster is disabled. */
  126. void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
  127. size_t numkeys = dictSize(ht1);
  128. dictRelease(ht1);
  129. dictRelease(ht2);
  130. atomicDecr(lazyfree_objects,numkeys);
  131. }
  132. /* Release the skiplist mapping Redis Cluster keys to slots in the
  133. * lazyfree thread. */
  134. void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
  135. size_t len = rt->numele;
  136. raxFree(rt);
  137. atomicDecr(lazyfree_objects,len);
  138. }