stream-cgroups.tcl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. start_server {
  2. tags {"stream"}
  3. } {
  4. test {XGROUP CREATE: creation and duplicate group name detection} {
  5. r DEL mystream
  6. r XADD mystream * foo bar
  7. r XGROUP CREATE mystream mygroup $
  8. catch {r XGROUP CREATE mystream mygroup $} err
  9. set err
  10. } {BUSYGROUP*}
  11. test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} {
  12. r DEL mystream
  13. catch {r XGROUP CREATE mystream mygroup $} err
  14. set err
  15. } {ERR*}
  16. test {XGROUP CREATE: automatic stream creation works with MKSTREAM} {
  17. r DEL mystream
  18. r XGROUP CREATE mystream mygroup $ MKSTREAM
  19. } {OK}
  20. test {XREADGROUP will return only new elements} {
  21. r XADD mystream * a 1
  22. r XADD mystream * b 2
  23. # XREADGROUP should return only the new elements "a 1" "b 1"
  24. # and not the element "foo bar" which was pre existing in the
  25. # stream (see previous test)
  26. set reply [
  27. r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
  28. ]
  29. assert {[llength [lindex $reply 0 1]] == 2}
  30. lindex $reply 0 1 0 1
  31. } {a 1}
  32. test {XREADGROUP can read the history of the elements we own} {
  33. # Add a few more elements
  34. r XADD mystream * c 3
  35. r XADD mystream * d 4
  36. # Read a few elements using a different consumer name
  37. set reply [
  38. r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
  39. ]
  40. assert {[llength [lindex $reply 0 1]] == 2}
  41. assert {[lindex $reply 0 1 0 1] eq {c 3}}
  42. set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
  43. set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
  44. assert {[lindex $r1 0 1 0 1] eq {a 1}}
  45. assert {[lindex $r2 0 1 0 1] eq {c 3}}
  46. }
  47. test {XPENDING is able to return pending items} {
  48. set pending [r XPENDING mystream mygroup - + 10]
  49. assert {[llength $pending] == 4}
  50. for {set j 0} {$j < 4} {incr j} {
  51. set item [lindex $pending $j]
  52. if {$j < 2} {
  53. set owner client-1
  54. } else {
  55. set owner client-2
  56. }
  57. assert {[lindex $item 1] eq $owner}
  58. assert {[lindex $item 1] eq $owner}
  59. }
  60. }
  61. test {XPENDING can return single consumer items} {
  62. set pending [r XPENDING mystream mygroup - + 10 client-1]
  63. assert {[llength $pending] == 2}
  64. }
  65. test {XACK is able to remove items from the client/group PEL} {
  66. set pending [r XPENDING mystream mygroup - + 10 client-1]
  67. set id1 [lindex $pending 0 0]
  68. set id2 [lindex $pending 1 0]
  69. assert {[r XACK mystream mygroup $id1] eq 1}
  70. set pending [r XPENDING mystream mygroup - + 10 client-1]
  71. assert {[llength $pending] == 1}
  72. set id [lindex $pending 0 0]
  73. assert {$id eq $id2}
  74. set global_pel [r XPENDING mystream mygroup - + 10]
  75. assert {[llength $global_pel] == 3}
  76. }
  77. test {XACK can't remove the same item multiple times} {
  78. assert {[r XACK mystream mygroup $id1] eq 0}
  79. }
  80. test {XACK is able to accept multiple arguments} {
  81. # One of the IDs was already removed, so it should ack
  82. # just ID2.
  83. assert {[r XACK mystream mygroup $id1 $id2] eq 1}
  84. }
  85. test {XACK should fail if got at least one invalid ID} {
  86. r del mystream
  87. r xgroup create s g $ MKSTREAM
  88. r xadd s * f1 v1
  89. set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]]
  90. assert {$c == 1}
  91. set pending [r xpending s g - + 10 c]
  92. set id1 [lindex $pending 0 0]
  93. assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id}
  94. assert {[r xack s g $id1] eq 1}
  95. }
  96. test {PEL NACK reassignment after XGROUP SETID event} {
  97. r del events
  98. r xadd events * f1 v1
  99. r xadd events * f1 v1
  100. r xadd events * f1 v1
  101. r xadd events * f1 v1
  102. r xgroup create events g1 $
  103. r xadd events * f1 v1
  104. set c [llength [lindex [r xreadgroup group g1 c1 streams events >] 0 1]]
  105. assert {$c == 1}
  106. r xgroup setid events g1 -
  107. set c [llength [lindex [r xreadgroup group g1 c2 streams events >] 0 1]]
  108. assert {$c == 5}
  109. }
  110. test {XREADGROUP will not report data on empty history. Bug #5577} {
  111. r del events
  112. r xadd events * a 1
  113. r xadd events * b 2
  114. r xadd events * c 3
  115. r xgroup create events mygroup 0
  116. # Current local PEL should be empty
  117. set res [r xpending events mygroup - + 10]
  118. assert {[llength $res] == 0}
  119. # So XREADGROUP should read an empty history as well
  120. set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
  121. assert {[llength [lindex $res 0 1]] == 0}
  122. # We should fetch all the elements in the stream asking for >
  123. set res [r xreadgroup group mygroup myconsumer count 3 streams events >]
  124. assert {[llength [lindex $res 0 1]] == 3}
  125. # Now the history is populated with three not acked entries
  126. set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
  127. assert {[llength [lindex $res 0 1]] == 3}
  128. }
  129. test {XREADGROUP history reporting of deleted entries. Bug #5570} {
  130. r del mystream
  131. r XGROUP CREATE mystream mygroup $ MKSTREAM
  132. r XADD mystream 1 field1 A
  133. r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
  134. r XADD mystream MAXLEN 1 2 field1 B
  135. r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
  136. # Now we have two pending entries, however one should be deleted
  137. # and one should be ok (we should only see "B")
  138. set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
  139. assert {[lindex $res 0 1 0] == {1-0 {}}}
  140. assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
  141. }
  142. test {Blocking XREADGROUP will not reply with an empty array} {
  143. r del mystream
  144. r XGROUP CREATE mystream mygroup $ MKSTREAM
  145. r XADD mystream 666 f v
  146. set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"]
  147. assert {[lindex $res 0 1 0] == {666-0 {f v}}}
  148. r XADD mystream 667 f2 v2
  149. r XDEL mystream 667
  150. set rd [redis_deferring_client]
  151. $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"
  152. after 20
  153. assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
  154. }
  155. test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
  156. r del mystream
  157. r XGROUP CREATE mystream mygroup $ MKSTREAM
  158. set rd [redis_deferring_client]
  159. $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">"
  160. r XGROUP DESTROY mystream mygroup
  161. assert_error "*NOGROUP*" {$rd read}
  162. }
  163. test {RENAME can unblock XREADGROUP with data} {
  164. r del mystream
  165. r XGROUP CREATE mystream mygroup $ MKSTREAM
  166. set rd [redis_deferring_client]
  167. $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
  168. r XGROUP CREATE mystream2 mygroup $ MKSTREAM
  169. r XADD mystream2 100 f1 v1
  170. r RENAME mystream2 mystream
  171. assert_equal "{mystream {{100-0 {f1 v1}}}}" [$rd read] ;# mystream2 had mygroup before RENAME
  172. }
  173. test {RENAME can unblock XREADGROUP with -NOGROUP} {
  174. r del mystream
  175. r XGROUP CREATE mystream mygroup $ MKSTREAM
  176. set rd [redis_deferring_client]
  177. $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
  178. r XADD mystream2 100 f1 v1
  179. r RENAME mystream2 mystream
  180. assert_error "*NOGROUP*" {$rd read} ;# mystream2 didn't have mygroup before RENAME
  181. }
  182. test {XCLAIM can claim PEL items from another consumer} {
  183. # Add 3 items into the stream, and create a consumer group
  184. r del mystream
  185. set id1 [r XADD mystream * a 1]
  186. set id2 [r XADD mystream * b 2]
  187. set id3 [r XADD mystream * c 3]
  188. r XGROUP CREATE mystream mygroup 0
  189. # Client 1 reads item 1 from the stream without acknowledgements.
  190. # Client 2 then claims pending item 1 from the PEL of client 1
  191. set reply [
  192. r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
  193. ]
  194. assert {[llength [lindex $reply 0 1 0 1]] == 2}
  195. assert {[lindex $reply 0 1 0 1] eq {a 1}}
  196. r debug sleep 0.2
  197. set reply [
  198. r XCLAIM mystream mygroup client2 10 $id1
  199. ]
  200. assert {[llength [lindex $reply 0 1]] == 2}
  201. assert {[lindex $reply 0 1] eq {a 1}}
  202. # Client 1 reads another 2 items from stream
  203. r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
  204. r debug sleep 0.2
  205. # Delete item 2 from the stream. Now client 1 has PEL that contains
  206. # only item 3. Try to use client 2 to claim the deleted item 2
  207. # from the PEL of client 1, this should return nil
  208. r XDEL mystream $id2
  209. set reply [
  210. r XCLAIM mystream mygroup client2 10 $id2
  211. ]
  212. assert {[llength $reply] == 1}
  213. assert_equal "" [lindex $reply 0]
  214. # Delete item 3 from the stream. Now client 1 has PEL that is empty.
  215. # Try to use client 2 to claim the deleted item 3 from the PEL
  216. # of client 1, this should return nil
  217. r debug sleep 0.2
  218. r XDEL mystream $id3
  219. set reply [
  220. r XCLAIM mystream mygroup client2 10 $id3
  221. ]
  222. assert {[llength $reply] == 1}
  223. assert_equal "" [lindex $reply 0]
  224. }
  225. test {XCLAIM without JUSTID increments delivery count} {
  226. # Add 3 items into the stream, and create a consumer group
  227. r del mystream
  228. set id1 [r XADD mystream * a 1]
  229. set id2 [r XADD mystream * b 2]
  230. set id3 [r XADD mystream * c 3]
  231. r XGROUP CREATE mystream mygroup 0
  232. # Client 1 reads item 1 from the stream without acknowledgements.
  233. # Client 2 then claims pending item 1 from the PEL of client 1
  234. set reply [
  235. r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
  236. ]
  237. assert {[llength [lindex $reply 0 1 0 1]] == 2}
  238. assert {[lindex $reply 0 1 0 1] eq {a 1}}
  239. r debug sleep 0.2
  240. set reply [
  241. r XCLAIM mystream mygroup client2 10 $id1
  242. ]
  243. assert {[llength [lindex $reply 0 1]] == 2}
  244. assert {[lindex $reply 0 1] eq {a 1}}
  245. set reply [
  246. r XPENDING mystream mygroup - + 10
  247. ]
  248. assert {[llength [lindex $reply 0]] == 4}
  249. assert {[lindex $reply 0 3] == 2}
  250. # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
  251. r debug sleep 0.2
  252. set reply [
  253. r XCLAIM mystream mygroup client3 10 $id1 JUSTID
  254. ]
  255. assert {[llength $reply] == 1}
  256. assert {[lindex $reply 0] eq $id1}
  257. set reply [
  258. r XPENDING mystream mygroup - + 10
  259. ]
  260. assert {[llength [lindex $reply 0]] == 4}
  261. assert {[lindex $reply 0 3] == 2}
  262. }
  263. test {XINFO FULL output} {
  264. r del x
  265. r XADD x 100 a 1
  266. r XADD x 101 b 1
  267. r XADD x 102 c 1
  268. r XADD x 103 e 1
  269. r XADD x 104 f 1
  270. r XGROUP CREATE x g1 0
  271. r XGROUP CREATE x g2 0
  272. r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x >
  273. r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x >
  274. r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x >
  275. r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x >
  276. r XDEL x 103
  277. set reply [r XINFO STREAM x FULL]
  278. assert_equal [llength $reply] 12
  279. assert_equal [lindex $reply 1] 4 ;# stream length
  280. assert_equal [lindex $reply 9] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" ;# entries
  281. assert_equal [lindex $reply 11 0 1] "g1" ;# first group name
  282. assert_equal [lindex $reply 11 0 7 0 0] "100-0" ;# first entry in group's PEL
  283. assert_equal [lindex $reply 11 0 9 0 1] "Alice" ;# first consumer
  284. assert_equal [lindex $reply 11 0 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
  285. assert_equal [lindex $reply 11 1 1] "g2" ;# second group name
  286. assert_equal [lindex $reply 11 1 9 0 1] "Charlie" ;# first consumer
  287. assert_equal [lindex $reply 11 1 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
  288. assert_equal [lindex $reply 11 1 9 0 7 1 0] "101-0" ;# second entry in first consumer's PEL
  289. set reply [r XINFO STREAM x FULL COUNT 1]
  290. assert_equal [llength $reply] 12
  291. assert_equal [lindex $reply 1] 4
  292. assert_equal [lindex $reply 9] "{100-0 {a 1}}"
  293. }
  294. start_server {} {
  295. set master [srv -1 client]
  296. set master_host [srv -1 host]
  297. set master_port [srv -1 port]
  298. set slave [srv 0 client]
  299. foreach noack {0 1} {
  300. test "Consumer group last ID propagation to slave (NOACK=$noack)" {
  301. $slave slaveof $master_host $master_port
  302. wait_for_condition 50 100 {
  303. [s 0 master_link_status] eq {up}
  304. } else {
  305. fail "Replication not started."
  306. }
  307. $master del stream
  308. $master xadd stream * a 1
  309. $master xadd stream * a 2
  310. $master xadd stream * a 3
  311. $master xgroup create stream mygroup 0
  312. # Consume the first two items on the master
  313. for {set j 0} {$j < 2} {incr j} {
  314. if {$noack} {
  315. set item [$master xreadgroup group mygroup \
  316. myconsumer COUNT 1 NOACK STREAMS stream >]
  317. } else {
  318. set item [$master xreadgroup group mygroup \
  319. myconsumer COUNT 1 STREAMS stream >]
  320. }
  321. set id [lindex $item 0 1 0 0]
  322. if {$noack == 0} {
  323. assert {[$master xack stream mygroup $id] eq "1"}
  324. }
  325. }
  326. wait_for_ofs_sync $master $slave
  327. # Turn slave into master
  328. $slave slaveof no one
  329. set item [$slave xreadgroup group mygroup myconsumer \
  330. COUNT 1 STREAMS stream >]
  331. # The consumed enty should be the third
  332. set myentry [lindex $item 0 1 0 1]
  333. assert {$myentry eq {a 3}}
  334. }
  335. }
  336. }
  337. start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
  338. test {Empty stream with no lastid can be rewrite into AOF correctly} {
  339. r XGROUP CREATE mystream group-name $ MKSTREAM
  340. assert {[dict get [r xinfo stream mystream] length] == 0}
  341. set grpinfo [r xinfo groups mystream]
  342. r bgrewriteaof
  343. waitForBgrewriteaof r
  344. r debug loadaof
  345. assert {[dict get [r xinfo stream mystream] length] == 0}
  346. assert {[r xinfo groups mystream] == $grpinfo}
  347. }
  348. }
  349. }