# stream-cgroups.tcl
  # Consumer-group tests for Redis streams: XGROUP, XREADGROUP, XPENDING,
  # XACK and XCLAIM, followed by a replication check and an AOF rewrite check.
  #
  # The tests that run against the first server are order-dependent: they
  # share one scope, so later tests reuse the stream, the group and even
  # variables (for example id1 and id2) left behind by earlier tests.
  start_server {
      tags {"stream"}
  } {
      test {XGROUP CREATE: creation and duplicate group name detection} {
          r DEL mystream
          r XADD mystream * foo bar
          r XGROUP CREATE mystream mygroup $
          # Creating the same group a second time must be rejected.
          catch {r XGROUP CREATE mystream mygroup $} err
          set err
      } {BUSYGROUP*}

      test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} {
          r DEL mystream
          catch {r XGROUP CREATE mystream mygroup $} err
          set err
      } {ERR*}

      test {XGROUP CREATE: automatic stream creation works with MKSTREAM} {
          r DEL mystream
          r XGROUP CREATE mystream mygroup $ MKSTREAM
      } {OK}

      test {XREADGROUP will return only new elements} {
          r XADD mystream * a 1
          r XADD mystream * b 2
          # The group was created with $ on a freshly created (empty) stream
          # in the previous test, so XREADGROUP with ">" should return
          # exactly the two new elements "a 1" and "b 2".
          set reply [
              r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
          ]
          assert {[llength [lindex $reply 0 1]] == 2}
          lindex $reply 0 1 0 1
      } {a 1}

      test {XREADGROUP can read the history of the elements we own} {
          # Add a few more elements
          r XADD mystream * c 3
          r XADD mystream * d 4
          # Read a few elements using a different consumer name
          set reply [
              r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
          ]
          assert {[llength [lindex $reply 0 1]] == 2}
          assert {[lindex $reply 0 1 0 1] eq {c 3}}

          # Reading with an explicit ID (0) instead of ">" returns each
          # consumer's own pending history rather than new entries.
          set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
          set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
          assert {[lindex $r1 0 1 0 1] eq {a 1}}
          assert {[lindex $r2 0 1 0 1] eq {c 3}}
      }

      test {XPENDING is able to return pending items} {
          set pending [r XPENDING mystream mygroup - + 10]
          assert {[llength $pending] == 4}
          # The first two pending entries were delivered to client-1 and
          # the last two to client-2 by the previous tests. Element 1 of
          # every XPENDING item is the owning consumer.
          for {set j 0} {$j < 4} {incr j} {
              set item [lindex $pending $j]
              if {$j < 2} {
                  set owner client-1
              } else {
                  set owner client-2
              }
              assert {[lindex $item 1] eq $owner}
          }
      }

      test {XPENDING can return single consumer items} {
          set pending [r XPENDING mystream mygroup - + 10 client-1]
          assert {[llength $pending] == 2}
      }

      test {XACK is able to remove items from the client/group PEL} {
          # id1 and id2 are deliberately left in scope: the next two tests
          # use them.
          set pending [r XPENDING mystream mygroup - + 10 client-1]
          set id1 [lindex $pending 0 0]
          set id2 [lindex $pending 1 0]
          assert {[r XACK mystream mygroup $id1] eq 1}
          # Only id2 should be left in the PEL of client-1 ...
          set pending [r XPENDING mystream mygroup - + 10 client-1]
          assert {[llength $pending] == 1}
          set id [lindex $pending 0 0]
          assert {$id eq $id2}
          # ... and the group-wide PEL shrinks from four entries to three.
          set global_pel [r XPENDING mystream mygroup - + 10]
          assert {[llength $global_pel] == 3}
      }

      test {XACK can't remove the same item multiple times} {
          # id1 was already acknowledged by the previous test.
          assert {[r XACK mystream mygroup $id1] eq 0}
      }

      test {XACK is able to accept multiple arguments} {
          # One of the IDs was already removed, so it should ack
          # just ID2.
          assert {[r XACK mystream mygroup $id1 $id2] eq 1}
      }

      test {PEL NACK reassignment after XGROUP SETID event} {
          r del events
          r xadd events * f1 v1
          r xadd events * f1 v1
          r xadd events * f1 v1
          r xadd events * f1 v1
          r xgroup create events g1 $
          r xadd events * f1 v1
          # Only the entry added after the group was created is new for c1.
          set c [llength [lindex [r xreadgroup group g1 c1 streams events >] 0 1]]
          assert {$c == 1}
          # After the group ID is reset, c2 gets all five entries,
          # including the one that is still pending for c1.
          r xgroup setid events g1 -
          set c [llength [lindex [r xreadgroup group g1 c2 streams events >] 0 1]]
          assert {$c == 5}
      }

      test {XREADGROUP will not report data on empty history. Bug #5577} {
          r del events
          r xadd events * a 1
          r xadd events * b 2
          r xadd events * c 3
          r xgroup create events mygroup 0

          # Current local PEL should be empty
          set res [r xpending events mygroup - + 10]
          assert {[llength $res] == 0}

          # So XREADGROUP should read an empty history as well
          set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
          assert {[llength [lindex $res 0 1]] == 0}

          # We should fetch all the elements in the stream asking for >
          set res [r xreadgroup group mygroup myconsumer count 3 streams events >]
          assert {[llength [lindex $res 0 1]] == 3}

          # Now the history is populated with three not acked entries
          set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
          assert {[llength [lindex $res 0 1]] == 3}
      }

      test {XREADGROUP history reporting of deleted entries. Bug #5570} {
          r del mystream
          r XGROUP CREATE mystream mygroup $ MKSTREAM
          r XADD mystream 1 field1 A
          r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
          # MAXLEN 1 trims entry 1-0 from the stream while it is still
          # pending for myconsumer.
          r XADD mystream MAXLEN 1 2 field1 B
          r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >

          # Now we have two pending entries, however one should be deleted
          # and one should be ok (we should only see "B")
          set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
          assert {[lindex $res 0 1 0] == {1-0 {}}}
          assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
      }

      test {Blocking XREADGROUP will not reply with an empty array} {
          r del mystream
          r XGROUP CREATE mystream mygroup $ MKSTREAM
          r XADD mystream 666 f v
          set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"]
          assert {[lindex $res 0 1 0] == {666-0 {f v}}}
          # Add an entry and delete it right away: there is nothing to
          # deliver, so the blocking read below has to time out.
          r XADD mystream 667 f2 v2
          r XDEL mystream 667
          set rd [redis_deferring_client]
          $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"
          # Wait longer than the 10 ms BLOCK timeout before reading the reply.
          after 20
          assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
          # Release the extra connection so it does not leak into later tests.
          $rd close
      }

      test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
          r del mystream
          r XGROUP CREATE mystream mygroup $ MKSTREAM
          set rd [redis_deferring_client]
          $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">"
          r XGROUP DESTROY mystream mygroup
          assert_error "*NOGROUP*" {$rd read}
          # Release the extra connection so it does not leak into later tests.
          $rd close
      }

      test {XCLAIM can claim PEL items from another consumer} {
          # Add 3 items into the stream, and create a consumer group
          r del mystream
          set id1 [r XADD mystream * a 1]
          set id2 [r XADD mystream * b 2]
          set id3 [r XADD mystream * c 3]
          r XGROUP CREATE mystream mygroup 0

          # Client 1 reads item 1 from the stream without acknowledgements.
          # Client 2 then claims pending item 1 from the PEL of client 1
          set reply [
              r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
          ]
          assert {[llength [lindex $reply 0 1 0 1]] == 2}
          assert {[lindex $reply 0 1 0 1] eq {a 1}}
          # Let the entry become idle for longer than the 10 ms
          # min-idle-time passed to XCLAIM below.
          r debug sleep 0.2
          set reply [
              r XCLAIM mystream mygroup client2 10 $id1
          ]
          assert {[llength [lindex $reply 0 1]] == 2}
          assert {[lindex $reply 0 1] eq {a 1}}

          # Client 1 reads another 2 items from stream
          r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
          r debug sleep 0.2

          # Delete item 2 from the stream. Now client 1 has PEL that contains
          # only item 3. Try to use client 2 to claim the deleted item 2
          # from the PEL of client 1, this should return nil
          r XDEL mystream $id2
          set reply [
              r XCLAIM mystream mygroup client2 10 $id2
          ]
          assert {[llength $reply] == 1}
          assert_equal "" [lindex $reply 0]

          # Delete item 3 from the stream. Now client 1 has PEL that is empty.
          # Try to use client 2 to claim the deleted item 3 from the PEL
          # of client 1, this should return nil
          r debug sleep 0.2
          r XDEL mystream $id3
          set reply [
              r XCLAIM mystream mygroup client2 10 $id3
          ]
          assert {[llength $reply] == 1}
          assert_equal "" [lindex $reply 0]
      }

      test {XCLAIM without JUSTID increments delivery count} {
          # Add 3 items into the stream, and create a consumer group
          r del mystream
          set id1 [r XADD mystream * a 1]
          set id2 [r XADD mystream * b 2]
          set id3 [r XADD mystream * c 3]
          r XGROUP CREATE mystream mygroup 0

          # Client 1 reads item 1 from the stream without acknowledgements.
          # Client 2 then claims pending item 1 from the PEL of client 1
          set reply [
              r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
          ]
          assert {[llength [lindex $reply 0 1 0 1]] == 2}
          assert {[lindex $reply 0 1 0 1] eq {a 1}}
          r debug sleep 0.2
          set reply [
              r XCLAIM mystream mygroup client2 10 $id1
          ]
          assert {[llength [lindex $reply 0 1]] == 2}
          assert {[lindex $reply 0 1] eq {a 1}}

          # Each XPENDING item has four fields; the last one (index 3) is
          # the delivery counter: one delivery by XREADGROUP plus one by
          # the plain XCLAIM above.
          set reply [
              r XPENDING mystream mygroup - + 10
          ]
          assert {[llength [lindex $reply 0]] == 4}
          assert {[lindex $reply 0 3] == 2}

          # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
          r debug sleep 0.2
          set reply [
              r XCLAIM mystream mygroup client3 10 $id1 JUSTID
          ]
          assert {[llength $reply] == 1}
          assert {[lindex $reply 0] eq $id1}

          # The JUSTID claim must leave the delivery counter untouched.
          set reply [
              r XPENDING mystream mygroup - + 10
          ]
          assert {[llength [lindex $reply 0]] == 4}
          assert {[lindex $reply 0 3] == 2}
      }

      # Replication check: a second server is started and attached to the
      # first one. srv -1 addresses the outer server (the master) and
      # srv 0 the server started here (the slave).
      start_server {} {
          set master [srv -1 client]
          set master_host [srv -1 host]
          set master_port [srv -1 port]
          set slave [srv 0 client]

          # Run the same scenario without and with the NOACK option.
          foreach noack {0 1} {
              test "Consumer group last ID propagation to slave (NOACK=$noack)" {
                  $slave slaveof $master_host $master_port
                  wait_for_condition 50 100 {
                      [s 0 master_link_status] eq {up}
                  } else {
                      fail "Replication not started."
                  }

                  $master del stream
                  $master xadd stream * a 1
                  $master xadd stream * a 2
                  $master xadd stream * a 3
                  $master xgroup create stream mygroup 0

                  # Consume the first two items on the master
                  for {set j 0} {$j < 2} {incr j} {
                      if {$noack} {
                          set item [$master xreadgroup group mygroup \
                                    myconsumer COUNT 1 NOACK STREAMS stream >]
                      } else {
                          set item [$master xreadgroup group mygroup \
                                    myconsumer COUNT 1 STREAMS stream >]
                      }
                      set id [lindex $item 0 1 0 0]
                      # Entries read without NOACK are acknowledged explicitly.
                      if {$noack == 0} {
                          assert {[$master xack stream mygroup $id] eq "1"}
                      }
                  }

                  wait_for_ofs_sync $master $slave

                  # Turn slave into master
                  $slave slaveof no one

                  set item [$slave xreadgroup group mygroup myconsumer \
                            COUNT 1 STREAMS stream >]

                  # The consumed entry should be the third
                  set myentry [lindex $item 0 1 0 1]
                  assert {$myentry eq {a 3}}
              }
          }
      }

      # AOF check: this server runs with appendonly enabled and the RDB
      # preamble disabled.
      start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
          test {Empty stream with no lastid can be rewrite into AOF correctly} {
              r XGROUP CREATE mystream group-name $ MKSTREAM
              assert {[dict get [r xinfo stream mystream] length] == 0}
              # Snapshot the group info, rewrite and reload the AOF, then
              # check that the stream is still empty and the group info
              # is unchanged.
              set grpinfo [r xinfo groups mystream]
              r bgrewriteaof
              waitForBgrewriteaof r
              r debug loadaof
              assert {[dict get [r xinfo stream mystream] length] == 0}
              assert {[r xinfo groups mystream] == $grpinfo}
          }
      }
  }