# stream-cgroups.tcl

  # Consumer-group tests for Redis streams, covering XGROUP CREATE / SETID,
  # XREADGROUP, XPENDING and XACK.
  #
  # The tests below are order-dependent: each one builds on the keys
  # ("mystream", "events") and on the pending entries left behind by the
  # tests that precede it.
  start_server {
      tags {"stream"}
  } {
      test {XGROUP CREATE: creation and duplicate group name detection} {
          r DEL mystream
          r XADD mystream * foo bar
          r XGROUP CREATE mystream mygroup $
          # Creating the same group a second time must fail; the error text
          # is captured and matched against the BUSYGROUP pattern.
          catch {r XGROUP CREATE mystream mygroup $} err
          set err
      } {BUSYGROUP*}

      test {XREADGROUP will return only new elements} {
          r XADD mystream * a 1
          r XADD mystream * b 2
          # XREADGROUP should return only the new elements "a 1" "b 2"
          # and not the element "foo bar" which was pre existing in the
          # stream (see previous test)
          set reply [
              r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
          ]
          assert {[llength [lindex $reply 0 1]] == 2}
          # Field/value list of the first returned entry.
          lindex $reply 0 1 0 1
      } {a 1}

      test {XREADGROUP can read the history of the elements we own} {
          # Add a few more elements
          r XADD mystream * c 3
          r XADD mystream * d 4
          # Read a few elements using a different consumer name
          set reply [
              r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
          ]
          assert {[llength [lindex $reply 0 1]] == 2}
          assert {[lindex $reply 0 1 0 1] eq {c 3}}

          # Reading with ID 0 instead of ">" asks for each consumer's own
          # history: client-1 must see "a 1" first, client-2 "c 3".
          set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
          set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
          assert {[lindex $r1 0 1 0 1] eq {a 1}}
          assert {[lindex $r2 0 1 0 1] eq {c 3}}
      }

      test {XPENDING is able to return pending items} {
          set pending [r XPENDING mystream mygroup - + 10]
          assert {[llength $pending] == 4}
          # The first two pending entries were delivered to client-1 and
          # the last two to client-2 (see the two XREADGROUP tests above).
          for {set j 0} {$j < 4} {incr j} {
              set item [lindex $pending $j]
              if {$j < 2} {
                  set owner client-1
              } else {
                  set owner client-2
              }
              # Element 1 of each pending entry is compared to the owner.
              assert {[lindex $item 1] eq $owner}
          }
      }

      test {XPENDING can return single consumer items} {
          set pending [r XPENDING mystream mygroup - + 10 client-1]
          assert {[llength $pending] == 2}
      }

      test {XACK is able to remove items from the client/group PEL} {
          set pending [r XPENDING mystream mygroup - + 10 client-1]
          set id1 [lindex $pending 0 0]
          set id2 [lindex $pending 1 0]
          assert {[r XACK mystream mygroup $id1] eq 1}
          # After the ack only id2 must remain pending for client-1 ...
          set pending [r XPENDING mystream mygroup - + 10 client-1]
          assert {[llength $pending] == 1}
          set id [lindex $pending 0 0]
          assert {$id eq $id2}
          # ... and the group-wide pending list shrinks from 4 to 3.
          set global_pel [r XPENDING mystream mygroup - + 10]
          assert {[llength $global_pel] == 3}
      }

      # NOTE(review): the next two tests reuse id1 / id2 set by the test
      # above. This presumably relies on the harness evaluating test bodies
      # in a shared scope -- confirm against the test helper.
      test {XACK can't remove the same item multiple times} {
          assert {[r XACK mystream mygroup $id1] eq 0}
      }

      test {XACK is able to accept multiple arguments} {
          # One of the IDs was already removed, so it should ack
          # just ID2.
          assert {[r XACK mystream mygroup $id1 $id2] eq 1}
      }

      test {PEL NACK reassignment after XGROUP SETID event} {
          r del events
          r xadd events * f1 v1
          r xadd events * f1 v1
          r xadd events * f1 v1
          r xadd events * f1 v1
          # The group is created at the end of the stream, so only the
          # entry added afterwards is new for consumer c1.
          r xgroup create events g1 $
          r xadd events * f1 v1
          set c [llength [lindex [r xreadgroup group g1 c1 streams events ">"] 0 1]]
          assert {$c == 1}
          # After the SETID below, consumer c2 is expected to receive all
          # five entries, including the one already delivered to c1.
          r xgroup setid events g1 -
          set c [llength [lindex [r xreadgroup group g1 c2 streams events ">"] 0 1]]
          assert {$c == 5}
      }
  }