# stream.tcl - Tcl helpers and tests for the Redis stream commands (XADD, XRANGE, XREVRANGE, XREAD).

  # Compare two stream entry IDs of the form "<ms>-<seq>".
  #
  # Arguments:
  #   a, b - the stream IDs to compare.
  # Returns:
  #   1 if a is greater than b, -1 if it is smaller and 0 when they are
  #   equal, like strcmp() and similar. The millisecond part is compared
  #   first; the sequence part only breaks ties.
  proc streamCompareID {a b} {
      # Fast path: textually identical IDs.
      if {$a eq $b} {return 0}
      lassign [split $a -] a_ms a_seq
      lassign [split $b -] b_ms b_seq
      if {$a_ms > $b_ms} {return 1}
      if {$a_ms < $b_ms} {return -1}
      # Same ms case, compare seq.
      if {$a_seq > $b_seq} {return 1}
      if {$a_seq < $b_seq} {return -1}
      # Both parts compare equal although the strings differ (for example
      # "1-1" and "1-01"): report equality explicitly instead of falling
      # off the end of the proc, which would return an empty string.
      return 0
  }
  # Compute the ID that immediately follows the given one by bumping its
  # sequence part by one; the millisecond part is left untouched.
  # Overflow of 'seq' is deliberately not handled since it is a 64 bit
  # value.
  #
  # Arguments:
  #   id - a stream ID of the form "<ms>-<seq>".
  # Returns:
  #   The string "<ms>-<seq+1>".
  proc streamNextID {id} {
      set parts [split $id -]
      set ms_part [lindex $parts 0]
      set seq_part [lindex $parts 1]
      incr seq_part
      return "${ms_part}-${seq_part}"
  }
  # Build a random stream entry ID whose ms part lies between the ms parts
  # of min_id and max_id and whose sequence number is low (0 - 999 range).
  # It is used to stress test XRANGE against a Tcl implementation of the
  # same concept written with Tcl-only code over a linear array.
  #
  # Arguments:
  #   min_id, max_id - stream IDs of the form "<ms>-<seq>"; only their ms
  #                    parts are used.
  # Returns:
  #   A string "<ms>-<seq>".
  #
  # NOTE(review): randomInt is defined outside this file; it is assumed to
  # return an integer in [0, max) - confirm against its definition.
  proc streamRandomID {min_id max_id} {
      set lo_ms [lindex [split $min_id -] 0]
      set hi_ms [lindex [split $max_id -] 0]
      set span [expr {$hi_ms - $lo_ms + 1}]
      # Draw the ms offset first and the sequence second so the random
      # generator is consumed in a fixed order.
      set ms_offset [randomInt $span]
      set seq_part [randomInt 1000]
      return "[expr {$lo_ms + $ms_offset}]-$seq_part"
  }
  # Pure Tcl model of XRANGE, used to fuzz test the Redis XRANGE
  # implementation.
  #
  # Arguments:
  #   items - list of entries; element 0 of every entry is its stream ID.
  #   start - lowest ID to include (inclusive).
  #   end   - highest ID to include (inclusive).
  # Returns:
  #   The entries of 'items', in their original order, whose ID compares
  #   >= start and <= end according to streamCompareID.
  proc streamSimulateXRANGE {items start end} {
      set matching [list]
      foreach entry $items {
          set entry_id [lindex $entry 0]
          # Skip anything that falls before the start of the range...
          if {[streamCompareID $entry_id $start] < 0} continue
          # ...or after its end.
          if {[streamCompareID $entry_id $end] > 0} continue
          lappend matching $entry
      }
      return $matching
  }
  set content [list] ;# Starts empty; meant to hold the Tcl side copy of the stream content.
  47. start_server {
  48. tags {"stream"}
  49. } {
  test {XADD can add entries into a stream that XRANGE can fetch} {
      # Append two entries, letting the server pick the IDs ("*").
      foreach {n v} {1 a 2 b} {
          r XADD mystream * item $n value $v
      }
      assert_equal 2 [r XLEN mystream]
      # Fetch the whole stream; element 1 of each returned entry is its
      # field/value list.
      set items [r XRANGE mystream - +]
      assert_equal [lindex [lindex $items 0] 1] {item 1 value a}
      assert_equal [lindex [lindex $items 1] 1] {item 2 value b}
  }
  test {XADD IDs are incremental} {
      # Collect the IDs returned by three consecutive XADD calls.
      set ids [list]
      foreach {n v} {1 a 2 b 3 c} {
          lappend ids [r XADD mystream * item $n value $v]
      }
      lassign $ids id1 id2 id3
      # Every ID must compare strictly smaller than the one that follows.
      assert {[streamCompareID $id1 $id2] == -1}
      assert {[streamCompareID $id2 $id3] == -1}
  }
  test {XADD IDs are incremental when ms is the same as well} {
      # Queue the three XADD calls inside one MULTI/EXEC; the EXEC reply
      # carries the three generated IDs in order.
      # NOTE(review): presumably the transaction makes the entries land in
      # the same millisecond - the test itself does not verify that.
      r multi
      foreach {n v} {1 a 2 b 3 c} {
          r XADD mystream * item $n value $v
      }
      set replies [r exec]
      lassign $replies id1 id2 id3
      assert {[streamCompareID $id1 $id2] == -1}
      assert {[streamCompareID $id2 $id3] == -1}
  }
  test {XADD with MAXLEN option} {
      # Insert 1000 entries while capping the stream at 5 elements. The
      # field name varies at random; the value is always the counter.
      r DEL mystream
      for {set j 0} {$j < 1000} {incr j} {
          if {rand() < 0.9} {
              r XADD mystream MAXLEN 5 * xitem $j
          } else {
              r XADD mystream MAXLEN 5 * yitem $j
          }
      }
      set res [r xrange mystream - +]
      # Check the size explicitly: without this the loop below would pass
      # vacuously on an empty reply and would never notice a stream that
      # was trimmed to the wrong length.
      assert_equal 5 [llength $res]
      # Only the last five values (995..999) must survive, in order.
      set expected 995
      # The loop variable is named 'entry' rather than 'r' so it is not
      # confused with the 'r' client command.
      foreach entry $res {
          assert {[lindex $entry 1 1] == $expected}
          incr expected
      }
  }
  test {XADD mass insertion and XLEN} {
      r DEL mystream
      # Queue all the insertions in a single transaction.
      r multi
      set j 0
      while {$j < 10000} {
          # From time to time add an entry with a different set of fields
          # in order to stress the stream compression code.
          if {rand() >= 0.9} {
              r XADD mystream * item $j otherfield foo
          } else {
              r XADD mystream * item $j
          }
          incr j
      }
      r exec
      # 'items' is kept around: the XRANGE fuzzing test reads it later.
      set items [r XRANGE mystream - +]
      set j 0
      while {$j < 10000} {
          # The first field/value pair of entry j must be "item j".
          set fields [lindex $items $j 1]
          assert {[lrange $fields 0 1] eq [list item $j]}
          incr j
      }
      # j now equals the number of inserted entries.
      assert {[r xlen mystream] == $j}
  }
  test {XRANGE COUNT works as expected} {
      # A COUNT of 10 must produce a reply of exactly 10 entries.
      set page [r xrange mystream - + COUNT 10]
      assert {[llength $page] == 10}
  }
  test {XREVRANGE COUNT works as expected} {
      # Same check as for XRANGE, walking the stream from the end.
      set page [r xrevrange mystream + - COUNT 10]
      assert {[llength $page] == 10}
  }
  test {XRANGE can be used to iterate the whole stream} {
      # Walk the stream in pages of 100 entries. Each new page starts at
      # the ID right after the last one seen, and the walk stops on the
      # first empty page.
      set cursor "-"
      set j 0
      set page [r xrange mystream $cursor + COUNT 100]
      while {[llength $page] != 0} {
          foreach entry $page {
              set fields [lindex $entry 1]
              assert {[lrange $fields 0 1] eq [list item $j]}
              incr j
          }
          set cursor [streamNextID [lindex [lindex $page end] 0]]
          set page [r xrange mystream $cursor + COUNT 100]
      }
      # Every one of the 10000 entries must have been visited once.
      assert {$j == 10000}
  }
  test {XREVRANGE returns the reverse of XRANGE} {
      set forward [r xrange mystream - +]
      set backward [r xrevrange mystream + -]
      # The replies are lists, not numbers: compare them with the string
      # operator "eq" instead of "==", which first attempts a numeric
      # comparison of its operands.
      assert {$forward eq [lreverse $backward]}
  }
  test {XREAD with non empty stream} {
      set reply [r XREAD COUNT 1 STREAMS mystream 0.0]
      # reply -> first stream -> its entries -> first entry -> field list.
      set fields [lindex $reply 0 1 0 1]
      assert {[lrange $fields 0 1] eq {item 0}}
  }
  test {Non blocking XREAD with empty streams} {
      # With no BLOCK option, reading streams that have no data must
      # produce an empty reply.
      set reply [r XREAD STREAMS s1 s2 0.0 0.0]
      assert {$reply eq {}}
  }
  test {XREAD with non empty second stream} {
      set reply [r XREAD COUNT 1 STREAMS nostream mystream 0.0 0.0]
      # The first element of the reply must be for mystream: its name comes
      # first, followed by its entries.
      set first_stream [lindex $reply 0]
      assert {[lindex $first_stream 0] eq {mystream}}
      set fields [lindex $first_stream 1 0 1]
      assert {[lrange $fields 0 1] eq {item 0}}
  }
  test {Blocking XREAD waiting new data} {
      # Seed s2 with an entry that the "$" ID must NOT return.
      r XADD s2 * old abcd1234
      set rd [redis_deferring_client]
      $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
      # NOTE(review): nothing here waits for the server to actually block
      # the reader before this XADD is issued on the other connection -
      # confirm the ordering is reliable.
      r XADD s2 * new abcd1234
      set res [$rd read]
      # Close the extra connection as soon as the reply is in, so it is
      # not leaked even when an assertion below fails.
      $rd close
      assert {[lindex $res 0 0] eq {s2}}
      assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  }
  test {Blocking XREAD waiting old data} {
      set rd [redis_deferring_client]
      # s2 is read from ID 0.0 while s1 and s3 use "$". The assertions
      # below expect the first s2 entry in the reply to be the
      # "old abcd1234" one added by an earlier test.
      $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0.0 $
      r XADD s2 * foo abcd1234
      set res [$rd read]
      # Close the extra connection as soon as the reply is in, so it is
      # not leaked even when an assertion below fails.
      $rd close
      assert {[lindex $res 0 0] eq {s2}}
      assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
  }
  test "XREAD: XADD + DEL should not awake client" {
      set rd [redis_deferring_client]
      r del s1
      $rd XREAD BLOCK 20000 STREAMS s1 $
      # Add and delete the stream inside one transaction: by the time EXEC
      # completes the key is gone again, so the reader must stay blocked.
      r multi
      r XADD s1 * old abcd1234
      r DEL s1
      r exec
      # Only this entry is expected to reach the blocked reader.
      r XADD s1 * new abcd1234
      set res [$rd read]
      # Close the extra connection as soon as the reply is in, so it is
      # not leaked even when an assertion below fails.
      $rd close
      assert {[lindex $res 0 0] eq {s1}}
      assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  }
  test "XREAD: XADD + DEL + LPUSH should not awake client" {
      set rd [redis_deferring_client]
      r del s1
      $rd XREAD BLOCK 20000 STREAMS s1 $
      # Inside one transaction the key is created as a stream, deleted and
      # re-created as a list: the reader must stay blocked.
      r multi
      r XADD s1 * old abcd1234
      r DEL s1
      r LPUSH s1 foo bar
      r exec
      # Drop the list so that s1 can be created as a stream again.
      r DEL s1
      # Only this entry is expected to reach the blocked reader.
      r XADD s1 * new abcd1234
      set res [$rd read]
      # Close the extra connection as soon as the reply is in, so it is
      # not leaked even when an assertion below fails.
      $rd close
      assert {[lindex $res 0 0] eq {s1}}
      assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  }
  test {XREAD with same stream name multiple times should work} {
      r XADD s2 * old abcd1234
      set rd [redis_deferring_client]
      # The same key is listed three times, each with the "$" ID.
      $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
      r XADD s2 * new abcd1234
      set res [$rd read]
      # Close the extra connection as soon as the reply is in, so it is
      # not leaked even when an assertion below fails.
      $rd close
      assert {[lindex $res 0 0] eq {s2}}
      assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  }
  test {XREAD + multiple XADD inside transaction} {
      r XADD s2 * old abcd1234
      set rd [redis_deferring_client]
      $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
      # Add three entries in a single transaction; the assertions below
      # check that the reply carries the first two, in insertion order.
      r MULTI
      r XADD s2 * field one
      r XADD s2 * field two
      r XADD s2 * field three
      r EXEC
      set res [$rd read]
      # Close the extra connection as soon as the reply is in, so it is
      # not leaked even when an assertion below fails.
      $rd close
      assert {[lindex $res 0 0] eq {s2}}
      assert {[lindex $res 0 1 0 1] eq {field one}}
      assert {[lindex $res 0 1 1 1] eq {field two}}
  }
  test {XRANGE fuzzing} {
      # 'items' is the XRANGE snapshot of mystream stored by an earlier
      # test; its first and last IDs bound the random ranges tried here.
      set min_id [lindex [lindex $items 0] 0]
      set max_id [lindex [lindex $items end] 0]
      for {set j 0} {$j < 100} {incr j} {
          set start [streamRandomID $min_id $max_id]
          set end [streamRandomID $min_id $max_id]
          # Ask the server and the Tcl model for the same range.
          set range [r xrange mystream $start $end]
          set tcl_range [streamSimulateXRANGE $items $start $end]
          if {$range eq $tcl_range} continue
          # Mismatch: dump both results before failing the test.
          puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
          puts "---"
          puts "XRANGE: '$range'"
          puts "---"
          puts "TCL: '$tcl_range'"
          puts "---"
          fail "XRANGE fuzzing failed, check logs for details"
      }
  }
  232. }