stream-cgroups.tcl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. start_server {
  2. tags {"stream"}
  3. } {
  test {XGROUP CREATE: creation and duplicate group name detection} {
      # Begin with a fresh stream holding a single entry, then create
      # the consumer group once.
      r DEL mystream
      r XADD mystream * foo bar
      r XGROUP CREATE mystream mygroup $

      # Creating a group with the same name again must raise an error.
      # The captured message is the value of the test body and is checked
      # against the expected pattern that follows it.
      catch {r XGROUP CREATE mystream mygroup $} dup_error
      set dup_error
  } {BUSYGROUP*}
  test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} {
      # Make sure the key does not exist before trying to create a group.
      r DEL mystream

      # Without MKSTREAM the command is expected to fail; the error
      # message is the value of the test body.
      catch {r XGROUP CREATE mystream mygroup $} create_error
      set create_error
  } {ERR*}
  test {XGROUP CREATE: automatic stream creation works with MKSTREAM} {
      # Remove the key so that group creation has to create the stream.
      r DEL mystream

      # The status reply of the command is the value of the test body.
      set status [r XGROUP CREATE mystream mygroup $ MKSTREAM]
      set status
  } {OK}
  test {XREADGROUP will return only new elements} {
      r XADD mystream * a 1
      r XADD mystream * b 2

      # Reading with the special ">" ID as consumer client-1 should
      # deliver exactly the two entries added above, "a 1" and "b 2".
      set reply [
          r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
      ]

      # The reply is indexed as: stream 0 -> entry list -> entry ->
      # field/value list.
      assert {[llength [lindex $reply 0 1]] == 2}
      assert {[lindex $reply 0 1 1 1] eq {b 2}}

      # The first delivered entry is the value of the test body.
      lindex $reply 0 1 0 1
  } {a 1}
  test {XREADGROUP can read the history of the elements we own} {
      # Append two more entries to the stream.
      r XADD mystream * c 3
      r XADD mystream * d 4

      # Deliver the new entries to a second consumer, client-2.
      set fresh [
          r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
      ]
      set fresh_entries [lindex $fresh 0 1]
      assert {[llength $fresh_entries] == 2}
      assert {[lindex $fresh_entries 0 1] eq {c 3}}

      # Reading from ID 0 asks each consumer for its own history: the
      # first entry seen by client-1 must be "a 1", the first entry seen
      # by client-2 must be "c 3".
      set history1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
      set history2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
      assert {[lindex $history1 0 1 0 1] eq {a 1}}
      assert {[lindex $history2 0 1 0 1] eq {c 3}}
  }
  test {XPENDING is able to return pending items} {
      # Ask for up to 10 pending entries of the whole group: four are
      # expected at this point.
      set pending [r XPENDING mystream mygroup - + 10]
      assert {[llength $pending] == 4}

      for {set j 0} {$j < 4} {incr j} {
          set item [lindex $pending $j]

          # The first two pending entries are expected to belong to
          # client-1 and the last two to client-2.
          if {$j < 2} {
              set owner client-1
          } else {
              set owner client-2
          }

          # Each entry of the extended XPENDING reply carries four
          # fields; the second one is the owning consumer name.
          assert {[llength $item] == 4}
          assert {[lindex $item 1] eq $owner}
      }
  }
  test {XPENDING can return single consumer items} {
      # Restrict the pending list to consumer client-1: two entries are
      # expected.
      set client1_pending [r XPENDING mystream mygroup - + 10 client-1]
      set count [llength $client1_pending]
      assert {$count == 2}
  }
  test {XACK is able to remove items from the client/group PEL} {
      # Remember the IDs of the two entries pending for client-1.
      # NOTE: id1 and id2 are read again by the XACK tests that follow,
      # so these variable names must not change.
      set before [r XPENDING mystream mygroup - + 10 client-1]
      set id1 [lindex $before 0 0]
      set id2 [lindex $before 1 0]

      # Acknowledge the first one: exactly one entry must be acked.
      assert {[r XACK mystream mygroup $id1] eq 1}

      # Only the second ID must be left pending for client-1.
      set after [r XPENDING mystream mygroup - + 10 client-1]
      assert {[llength $after] == 1}
      set remaining_id [lindex $after 0 0]
      assert {$remaining_id eq $id2}

      # The pending list of the whole group shrinks to three entries.
      set group_pending [r XPENDING mystream mygroup - + 10]
      assert {[llength $group_pending] == 3}
  }
  test {XACK can't remove the same item multiple times} {
      # id1 was set and acknowledged by the preceding XACK test, so
      # acknowledging it again must report zero acked entries.
      set acked [r XACK mystream mygroup $id1]
      assert {$acked eq 0}
  }
  test {XACK is able to accept multiple arguments} {
      # id1 and id2 come from an earlier XACK test. id1 has already been
      # acknowledged, so passing both IDs must acknowledge only id2.
      set acked [r XACK mystream mygroup $id1 $id2]
      assert {$acked eq 1}
  }
  test {PEL NACK reassignment after XGROUP SETID event} {
      # Build a stream with four entries, create group g1 positioned at
      # the end of the stream, then add a fifth entry.
      r del events
      for {set i 0} {$i < 4} {incr i} {
          r xadd events * f1 v1
      }
      r xgroup create events g1 $
      r xadd events * f1 v1

      # Consumer c1 must receive only the entry added after the group
      # was created.
      set reply [r xreadgroup group g1 c1 streams events >]
      set delivered [llength [lindex $reply 0 1]]
      assert {$delivered == 1}

      # After moving the group ID back with SETID, consumer c2 must
      # receive all five entries.
      r xgroup setid events g1 -
      set reply [r xreadgroup group g1 c2 streams events >]
      set delivered [llength [lindex $reply 0 1]]
      assert {$delivered == 5}
  }
  test {XREADGROUP will not report data on empty history. Bug #5577} {
      # Recreate the stream with three entries and a group starting at 0.
      r del events
      foreach {field value} {a 1 b 2 c 3} {
          r xadd events * $field $value
      }
      r xgroup create events mygroup 0

      # Nothing has been delivered yet, so the group has no pending
      # entries.
      set pel [r xpending events mygroup - + 10]
      assert {[llength $pel] == 0}

      # For the same reason the consumer history must be empty too.
      set history [r xreadgroup group mygroup myconsumer count 3 streams events 0]
      assert {[llength [lindex $history 0 1]] == 0}

      # Asking for > must deliver every entry in the stream.
      set fresh [r xreadgroup group mygroup myconsumer count 3 streams events >]
      assert {[llength [lindex $fresh 0 1]] == 3}

      # The history now holds the three entries that were not acked.
      set history [r xreadgroup group mygroup myconsumer count 3 streams events 0]
      assert {[llength [lindex $history 0 1]] == 3}
  }
  test {XREADGROUP history reporting of deleted entries. Bug #5570} {
      # Start from an empty stream created together with its group.
      r del mystream
      r XGROUP CREATE mystream mygroup $ MKSTREAM

      # Deliver entry 1, then add entry 2 with MAXLEN 1 and deliver it
      # as well.
      r XADD mystream 1 field1 A
      r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
      r XADD mystream MAXLEN 1 2 field1 B
      r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >

      # The history must list both IDs: 1-0 with an empty field list
      # and 2-0 with its "field1 B" payload.
      set history [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
      set first_entry [lindex $history 0 1 0]
      set second_entry [lindex $history 0 1 1]
      assert {$first_entry == {1-0 {}}}
      assert {$second_entry == {2-0 {field1 B}}}
  }
  test {XCLAIM can claim PEL items from another consumer} {
      # Recreate the stream with three entries and a group starting at 0.
      r del mystream
      set id1 [r XADD mystream * a 1]
      set id2 [r XADD mystream * b 2]
      set id3 [r XADD mystream * c 3]
      r XGROUP CREATE mystream mygroup 0

      # client1 reads entry 1 and does not acknowledge it.
      set delivered [
          r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
      ]
      set first_fields [lindex $delivered 0 1 0 1]
      assert {[llength $first_fields] == 2}
      assert {$first_fields eq {a 1}}

      # After a pause longer than the 10 ms minimum idle time, client2
      # claims entry 1 and must get its field/value pair back.
      r debug sleep 0.2
      set claimed [
          r XCLAIM mystream mygroup client2 10 $id1
      ]
      set claimed_fields [lindex $claimed 0 1]
      assert {[llength $claimed_fields] == 2}
      assert {$claimed_fields eq {a 1}}

      # client1 reads the remaining two entries, again without acking.
      r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
      r debug sleep 0.2

      # Delete entry 2 from the stream, then let client2 try to claim
      # it: the reply must hold a single empty element.
      r XDEL mystream $id2
      set claimed [
          r XCLAIM mystream mygroup client2 10 $id2
      ]
      assert {[llength $claimed] == 1}
      assert_equal "" [lindex $claimed 0]

      # Same check for entry 3: once deleted, claiming it must also
      # yield a single empty element.
      r debug sleep 0.2
      r XDEL mystream $id3
      set claimed [
          r XCLAIM mystream mygroup client2 10 $id3
      ]
      assert {[llength $claimed] == 1}
      assert_equal "" [lindex $claimed 0]
  }
  test {XCLAIM without JUSTID increments delivery count} {
      # Recreate the stream with three entries and a group starting at 0.
      r del mystream
      set id1 [r XADD mystream * a 1]
      set id2 [r XADD mystream * b 2]
      set id3 [r XADD mystream * c 3]
      r XGROUP CREATE mystream mygroup 0

      # client1 reads entry 1 and does not acknowledge it.
      set delivered [
          r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
      ]
      set first_fields [lindex $delivered 0 1 0 1]
      assert {[llength $first_fields] == 2}
      assert {$first_fields eq {a 1}}

      # client2 claims entry 1 with a plain XCLAIM and gets the entry
      # content back.
      r debug sleep 0.2
      set claimed [
          r XCLAIM mystream mygroup client2 10 $id1
      ]
      set claimed_fields [lindex $claimed 0 1]
      assert {[llength $claimed_fields] == 2}
      assert {$claimed_fields eq {a 1}}

      # The pending entry has four fields and its last field (the
      # delivery counter, going by the test title) must now be 2.
      set pel [
          r XPENDING mystream mygroup - + 10
      ]
      set pel_entry [lindex $pel 0]
      assert {[llength $pel_entry] == 4}
      assert {[lindex $pel_entry 3] == 2}

      # client3 claims the same entry with JUSTID: only the ID is
      # returned.
      r debug sleep 0.2
      set claimed [
          r XCLAIM mystream mygroup client3 10 $id1 JUSTID
      ]
      assert {[llength $claimed] == 1}
      assert {[lindex $claimed 0] eq $id1}

      # The last field of the pending entry must still be 2 after the
      # JUSTID claim.
      set pel [
          r XPENDING mystream mygroup - + 10
      ]
      set pel_entry [lindex $pel 0]
      assert {[llength $pel_entry] == 4}
      assert {[lindex $pel_entry 3] == 2}
  }
  start_server {} {
      # Server -1 acts as the master and server 0 as the slave.
      set master [srv -1 client]
      set master_host [srv -1 host]
      set master_port [srv -1 port]
      set slave [srv 0 client]

      # Run the same scenario without and with the NOACK option.
      foreach noack {0 1} {
          test "Consumer group last ID propagation to slave (NOACK=$noack)" {
              # Attach the slave and wait for the replication link.
              $slave slaveof $master_host $master_port
              wait_for_condition 50 100 {
                  [s 0 master_link_status] eq {up}
              } else {
                  fail "Replication not started."
              }

              # Build a three-entry stream with a group on the master.
              $master del stream
              foreach value {1 2 3} {
                  $master xadd stream * a $value
              }
              $master xgroup create stream mygroup 0

              # Consume the first two entries on the master, one at a
              # time. Without NOACK each entry is acknowledged explicitly.
              foreach round {1 2} {
                  if {!$noack} {
                      set item [$master xreadgroup group mygroup \
                          myconsumer COUNT 1 STREAMS stream >]
                      set id [lindex $item 0 1 0 0]
                      assert {[$master xack stream mygroup $id] eq "1"}
                  } else {
                      set item [$master xreadgroup group mygroup \
                          myconsumer COUNT 1 NOACK STREAMS stream >]
                      set id [lindex $item 0 1 0 0]
                  }
              }

              wait_for_ofs_sync $master $slave

              # Promote the slave to master and read from the group on it.
              $slave slaveof no one
              set item [$slave xreadgroup group mygroup myconsumer \
                  COUNT 1 STREAMS stream >]

              # The entry delivered by the promoted server must be the
              # third one.
              set myentry [lindex $item 0 1 0 1]
              assert {$myentry eq {a 3}}
          }
      }
  }
  255. }