stream.tcl 15 KB


  1. # return value is like strcmp() and similar.
  2. proc streamCompareID {a b} {
  3. if {$a eq $b} {return 0}
  4. lassign [split $a -] a_ms a_seq
  5. lassign [split $b -] b_ms b_seq
  6. if {$a_ms > $b_ms} {return 1}
  7. if {$a_ms < $b_ms} {return -1}
  8. # Same ms case, compare seq.
  9. if {$a_seq > $b_seq} {return 1}
  10. if {$a_seq < $b_seq} {return -1}
  11. }
  12. # return the ID immediately greater than the specified one.
  13. # Note that this function does not care to handle 'seq' overflow
  14. # since it's a 64 bit value.
  15. proc streamNextID {id} {
  16. lassign [split $id -] ms seq
  17. incr seq
  18. join [list $ms $seq] -
  19. }
  20. # Generate a random stream entry ID with the ms part between min and max
  21. # and a low sequence number (0 - 999 range), in order to stress test
  22. # XRANGE against a Tcl implementation implementing the same concept
  23. # with Tcl-only code in a linear array.
  24. proc streamRandomID {min_id max_id} {
  25. lassign [split $min_id -] min_ms min_seq
  26. lassign [split $max_id -] max_ms max_seq
  27. set delta [expr {$max_ms-$min_ms+1}]
  28. set ms [expr {$min_ms+[randomInt $delta]}]
  29. set seq [randomInt 1000]
  30. return $ms-$seq
  31. }
  32. # Tcl-side implementation of XRANGE to perform fuzz testing in the Redis
  33. # XRANGE implementation.
  34. proc streamSimulateXRANGE {items start end} {
  35. set res {}
  36. foreach i $items {
  37. set this_id [lindex $i 0]
  38. if {[streamCompareID $this_id $start] >= 0} {
  39. if {[streamCompareID $this_id $end] <= 0} {
  40. lappend res $i
  41. }
  42. }
  43. }
  44. return $res
  45. }
  46. set content {} ;# Will be populated with Tcl side copy of the stream content.
  47. start_server {
  48. tags {"stream"}
  49. } {
  50. test {XADD can add entries into a stream that XRANGE can fetch} {
  51. r XADD mystream * item 1 value a
  52. r XADD mystream * item 2 value b
  53. assert_equal 2 [r XLEN mystream]
  54. set items [r XRANGE mystream - +]
  55. assert_equal [lindex $items 0 1] {item 1 value a}
  56. assert_equal [lindex $items 1 1] {item 2 value b}
  57. }
  58. test {XADD IDs are incremental} {
  59. set id1 [r XADD mystream * item 1 value a]
  60. set id2 [r XADD mystream * item 2 value b]
  61. set id3 [r XADD mystream * item 3 value c]
  62. assert {[streamCompareID $id1 $id2] == -1}
  63. assert {[streamCompareID $id2 $id3] == -1}
  64. }
  65. test {XADD IDs are incremental when ms is the same as well} {
  66. r multi
  67. r XADD mystream * item 1 value a
  68. r XADD mystream * item 2 value b
  69. r XADD mystream * item 3 value c
  70. lassign [r exec] id1 id2 id3
  71. assert {[streamCompareID $id1 $id2] == -1}
  72. assert {[streamCompareID $id2 $id3] == -1}
  73. }
  74. test {XADD IDs correctly report an error when overflowing} {
  75. r DEL mystream
  76. r xadd mystream 18446744073709551615-18446744073709551615 a b
  77. assert_error ERR* {r xadd mystream * c d}
  78. }
  79. test {XADD with MAXLEN option} {
  80. r DEL mystream
  81. for {set j 0} {$j < 1000} {incr j} {
  82. if {rand() < 0.9} {
  83. r XADD mystream MAXLEN 5 * xitem $j
  84. } else {
  85. r XADD mystream MAXLEN 5 * yitem $j
  86. }
  87. }
  88. set res [r xrange mystream - +]
  89. set expected 995
  90. foreach r $res {
  91. assert {[lindex $r 1 1] == $expected}
  92. incr expected
  93. }
  94. }
  95. test {XADD mass insertion and XLEN} {
  96. r DEL mystream
  97. r multi
  98. for {set j 0} {$j < 10000} {incr j} {
  99. # From time to time insert a field with a different set
  100. # of fields in order to stress the stream compression code.
  101. if {rand() < 0.9} {
  102. r XADD mystream * item $j
  103. } else {
  104. r XADD mystream * item $j otherfield foo
  105. }
  106. }
  107. r exec
  108. set items [r XRANGE mystream - +]
  109. for {set j 0} {$j < 10000} {incr j} {
  110. assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]}
  111. }
  112. assert {[r xlen mystream] == $j}
  113. }
  114. test {XADD with ID 0-0} {
  115. r DEL otherstream
  116. catch {r XADD otherstream 0-0 k v} err
  117. assert {[r EXISTS otherstream] == 0}
  118. }
  119. test {XRANGE COUNT works as expected} {
  120. assert {[llength [r xrange mystream - + COUNT 10]] == 10}
  121. }
  122. test {XREVRANGE COUNT works as expected} {
  123. assert {[llength [r xrevrange mystream + - COUNT 10]] == 10}
  124. }
  125. test {XRANGE can be used to iterate the whole stream} {
  126. set last_id "-"
  127. set j 0
  128. while 1 {
  129. set elements [r xrange mystream $last_id + COUNT 100]
  130. if {[llength $elements] == 0} break
  131. foreach e $elements {
  132. assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
  133. incr j;
  134. }
  135. set last_id [streamNextID [lindex $elements end 0]]
  136. }
  137. assert {$j == 10000}
  138. }
  139. test {XREVRANGE returns the reverse of XRANGE} {
  140. assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]}
  141. }
  142. test {XREAD with non empty stream} {
  143. set res [r XREAD COUNT 1 STREAMS mystream 0-0]
  144. assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
  145. }
  146. test {Non blocking XREAD with empty streams} {
  147. set res [r XREAD STREAMS s1 s2 0-0 0-0]
  148. assert {$res eq {}}
  149. }
  150. test {XREAD with non empty second stream} {
  151. set res [r XREAD COUNT 1 STREAMS nostream mystream 0-0 0-0]
  152. assert {[lindex $res 0 0] eq {mystream}}
  153. assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
  154. }
  155. test {Blocking XREAD waiting new data} {
  156. r XADD s2 * old abcd1234
  157. set rd [redis_deferring_client]
  158. $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
  159. r XADD s2 * new abcd1234
  160. set res [$rd read]
  161. assert {[lindex $res 0 0] eq {s2}}
  162. assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  163. }
  164. test {Blocking XREAD waiting old data} {
  165. set rd [redis_deferring_client]
  166. $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $
  167. r XADD s2 * foo abcd1234
  168. set res [$rd read]
  169. assert {[lindex $res 0 0] eq {s2}}
  170. assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
  171. }
  172. test {Blocking XREAD will not reply with an empty array} {
  173. r del s1
  174. r XADD s1 666 f v
  175. r XADD s1 667 f2 v2
  176. r XDEL s1 667
  177. set rd [redis_deferring_client]
  178. $rd XREAD BLOCK 10 STREAMS s1 666
  179. after 20
  180. assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}}
  181. }
  182. test "XREAD: XADD + DEL should not awake client" {
  183. set rd [redis_deferring_client]
  184. r del s1
  185. $rd XREAD BLOCK 20000 STREAMS s1 $
  186. r multi
  187. r XADD s1 * old abcd1234
  188. r DEL s1
  189. r exec
  190. r XADD s1 * new abcd1234
  191. set res [$rd read]
  192. assert {[lindex $res 0 0] eq {s1}}
  193. assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  194. }
  195. test "XREAD: XADD + DEL + LPUSH should not awake client" {
  196. set rd [redis_deferring_client]
  197. r del s1
  198. $rd XREAD BLOCK 20000 STREAMS s1 $
  199. r multi
  200. r XADD s1 * old abcd1234
  201. r DEL s1
  202. r LPUSH s1 foo bar
  203. r exec
  204. r DEL s1
  205. r XADD s1 * new abcd1234
  206. set res [$rd read]
  207. assert {[lindex $res 0 0] eq {s1}}
  208. assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  209. }
  210. test {XREAD with same stream name multiple times should work} {
  211. r XADD s2 * old abcd1234
  212. set rd [redis_deferring_client]
  213. $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
  214. r XADD s2 * new abcd1234
  215. set res [$rd read]
  216. assert {[lindex $res 0 0] eq {s2}}
  217. assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  218. }
  219. test {XREAD + multiple XADD inside transaction} {
  220. r XADD s2 * old abcd1234
  221. set rd [redis_deferring_client]
  222. $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
  223. r MULTI
  224. r XADD s2 * field one
  225. r XADD s2 * field two
  226. r XADD s2 * field three
  227. r EXEC
  228. set res [$rd read]
  229. assert {[lindex $res 0 0] eq {s2}}
  230. assert {[lindex $res 0 1 0 1] eq {field one}}
  231. assert {[lindex $res 0 1 1 1] eq {field two}}
  232. }
  233. test {XDEL basic test} {
  234. r del somestream
  235. r xadd somestream * foo value0
  236. set id [r xadd somestream * foo value1]
  237. r xadd somestream * foo value2
  238. r xdel somestream $id
  239. assert {[r xlen somestream] == 2}
  240. set result [r xrange somestream - +]
  241. assert {[lindex $result 0 1 1] eq {value0}}
  242. assert {[lindex $result 1 1 1] eq {value2}}
  243. }
  244. # Here the idea is to check the consistency of the stream data structure
  245. # as we remove all the elements down to zero elements.
  246. test {XDEL fuzz test} {
  247. r del somestream
  248. set ids {}
  249. set x 0; # Length of the stream
  250. while 1 {
  251. lappend ids [r xadd somestream * item $x]
  252. incr x
  253. # Add enough elements to have a few radix tree nodes inside the stream.
  254. if {[dict get [r xinfo stream somestream] radix-tree-keys] > 20} break
  255. }
  256. # Now remove all the elements till we reach an empty stream
  257. # and after every deletion, check that the stream is sane enough
  258. # to report the right number of elements with XRANGE: this will also
  259. # force accessing the whole data structure to check sanity.
  260. assert {[r xlen somestream] == $x}
  261. # We want to remove elements in random order to really test the
  262. # implementation in a better way.
  263. set ids [lshuffle $ids]
  264. foreach id $ids {
  265. assert {[r xdel somestream $id] == 1}
  266. incr x -1
  267. assert {[r xlen somestream] == $x}
  268. # The test would be too slow calling XRANGE for every iteration.
  269. # Do it every 100 removal.
  270. if {$x % 100 == 0} {
  271. set res [r xrange somestream - +]
  272. assert {[llength $res] == $x}
  273. }
  274. }
  275. }
  276. test {XRANGE fuzzing} {
  277. set low_id [lindex $items 0 0]
  278. set high_id [lindex $items end 0]
  279. for {set j 0} {$j < 100} {incr j} {
  280. set start [streamRandomID $low_id $high_id]
  281. set end [streamRandomID $low_id $high_id]
  282. set range [r xrange mystream $start $end]
  283. set tcl_range [streamSimulateXRANGE $items $start $end]
  284. if {$range ne $tcl_range} {
  285. puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
  286. puts "---"
  287. puts "XRANGE: '$range'"
  288. puts "---"
  289. puts "TCL: '$tcl_range'"
  290. puts "---"
  291. fail "XRANGE fuzzing failed, check logs for details"
  292. }
  293. }
  294. }
  295. test {XREVRANGE regression test for issue #5006} {
  296. # Add non compressed entries
  297. r xadd teststream 1234567891230 key1 value1
  298. r xadd teststream 1234567891240 key2 value2
  299. r xadd teststream 1234567891250 key3 value3
  300. # Add SAMEFIELD compressed entries
  301. r xadd teststream2 1234567891230 key1 value1
  302. r xadd teststream2 1234567891240 key1 value2
  303. r xadd teststream2 1234567891250 key1 value3
  304. assert_equal [r xrevrange teststream 1234567891245 -] {{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}}
  305. assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
  306. }
  307. test {XREAD streamID edge (no-blocking)} {
  308. r del x
  309. r XADD x 1-1 f v
  310. r XADD x 1-18446744073709551615 f v
  311. r XADD x 2-1 f v
  312. set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615]
  313. assert {[lindex $res 0 1 0] == {2-1 {f v}}}
  314. }
  315. test {XREAD streamID edge (blocking)} {
  316. r del x
  317. set rd [redis_deferring_client]
  318. $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
  319. r XADD x 1-1 f v
  320. r XADD x 1-18446744073709551615 f v
  321. r XADD x 2-1 f v
  322. set res [$rd read]
  323. assert {[lindex $res 0 1 0] == {2-1 {f v}}}
  324. }
  325. test {XADD streamID edge} {
  326. r del x
  327. r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future
  328. r XADD x * f2 v2
  329. assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}}
  330. }
  331. }
  332. start_server {tags {"stream"} overrides {appendonly yes}} {
  333. test {XADD with MAXLEN > xlen can propagate correctly} {
  334. for {set j 0} {$j < 100} {incr j} {
  335. r XADD mystream * xitem v
  336. }
  337. r XADD mystream MAXLEN 200 * xitem v
  338. incr j
  339. assert {[r xlen mystream] == $j}
  340. r debug loadaof
  341. r XADD mystream * xitem v
  342. incr j
  343. assert {[r xlen mystream] == $j}
  344. }
  345. }
  346. start_server {tags {"stream"} overrides {appendonly yes}} {
  347. test {XADD with ~ MAXLEN can propagate correctly} {
  348. for {set j 0} {$j < 100} {incr j} {
  349. r XADD mystream * xitem v
  350. }
  351. r XADD mystream MAXLEN ~ $j * xitem v
  352. incr j
  353. assert {[r xlen mystream] == $j}
  354. r config set stream-node-max-entries 1
  355. r debug loadaof
  356. r XADD mystream * xitem v
  357. incr j
  358. assert {[r xlen mystream] == $j}
  359. }
  360. }
  361. start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
  362. test {XTRIM with ~ MAXLEN can propagate correctly} {
  363. for {set j 0} {$j < 100} {incr j} {
  364. r XADD mystream * xitem v
  365. }
  366. r XTRIM mystream MAXLEN ~ 85
  367. assert {[r xlen mystream] == 90}
  368. r config set stream-node-max-entries 1
  369. r debug loadaof
  370. r XADD mystream * xitem v
  371. incr j
  372. assert {[r xlen mystream] == 91}
  373. }
  374. }
  375. start_server {tags {"stream xsetid"}} {
  376. test {XADD can CREATE an empty stream} {
  377. r XADD mystream MAXLEN 0 * a b
  378. assert {[dict get [r xinfo stream mystream] length] == 0}
  379. }
  380. test {XSETID can set a specific ID} {
  381. r XSETID mystream "200-0"
  382. assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
  383. }
  384. test {XSETID cannot SETID with smaller ID} {
  385. r XADD mystream * a b
  386. catch {r XSETID mystream "1-1"} err
  387. r XADD mystream MAXLEN 0 * a b
  388. set err
  389. } {ERR*smaller*}
  390. test {XSETID cannot SETID on non-existent key} {
  391. catch {r XSETID stream 1-1} err
  392. set _ $err
  393. } {ERR no such key}
  394. }
  395. start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
  396. test {Empty stream can be rewrite into AOF correctly} {
  397. r XADD mystream MAXLEN 0 * a b
  398. assert {[dict get [r xinfo stream mystream] length] == 0}
  399. r bgrewriteaof
  400. waitForBgrewriteaof r
  401. r debug loadaof
  402. assert {[dict get [r xinfo stream mystream] length] == 0}
  403. }
  404. test {Stream can be rewrite into AOF correctly after XDEL lastid} {
  405. r XSETID mystream 0-0
  406. r XADD mystream 1-1 a b
  407. r XADD mystream 2-2 a b
  408. assert {[dict get [r xinfo stream mystream] length] == 2}
  409. r XDEL mystream 2-2
  410. r bgrewriteaof
  411. waitForBgrewriteaof r
  412. r debug loadaof
  413. assert {[dict get [r xinfo stream mystream] length] == 1}
  414. assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
  415. }
  416. }