123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
- # Compare two stream IDs of the form "<ms>-<seq>".
- #
- # The return value is like strcmp() and similar: -1 when 'a' sorts before
- # 'b', 1 when it sorts after, 0 when the two IDs are equal. The millisecond
- # part is compared first; the sequence part only breaks ties.
- proc streamCompareID {a b} {
-     if {$a eq $b} {return 0}
-     lassign [split $a -] a_ms a_seq
-     lassign [split $b -] b_ms b_seq
-     if {$a_ms > $b_ms} {return 1}
-     if {$a_ms < $b_ms} {return -1}
-     # Same ms case, compare seq.
-     if {$a_seq > $b_seq} {return 1}
-     if {$a_seq < $b_seq} {return -1}
-     # Both parts compare equal although the strings differ (for instance
-     # "1-0" and "1-00"): report equality explicitly instead of falling off
-     # the end of the proc and returning an empty string.
-     return 0
- }
- # Return the smallest ID that is strictly greater than 'id', obtained by
- # adding one to its sequence part and leaving the ms part untouched.
- # Overflow of the sequence part is deliberately not handled, since it is
- # a 64 bit value.
- proc streamNextID {id} {
-     set parts [split $id -]
-     set ms [lindex $parts 0]
-     set seq [lindex $parts 1]
-     incr seq
-     return "${ms}-${seq}"
- }
- # Build a random stream entry ID whose ms part is picked between the ms
- # parts of min_id and max_id, paired with a low sequence number (0 - 999
- # range). It is used to stress test XRANGE against a Tcl implementation
- # of the same concept written with Tcl-only code over a linear array.
- # Only the ms parts of the two bounds are looked at.
- proc streamRandomID {min_id max_id} {
-     set lo_ms [lindex [split $min_id -] 0]
-     set hi_ms [lindex [split $max_id -] 0]
-     set span [expr {$hi_ms - $lo_ms + 1}]
-     # Draw the ms part first and the sequence second, so the random
-     # generator is consumed in the same order as before.
-     set ms_part [expr {$lo_ms + [randomInt $span]}]
-     set seq_part [randomInt 1000]
-     return "${ms_part}-${seq_part}"
- }
- # Pure Tcl model of XRANGE, used to fuzz test the Redis implementation.
- #
- # 'items' is a list of entries whose first element is the entry ID. The
- # result is the sub-list of entries whose ID lies in the inclusive range
- # start..end, in the same order in which they appear in 'items'.
- proc streamSimulateXRANGE {items start end} {
-     set matched {}
-     foreach entry $items {
-         set entry_id [lindex $entry 0]
-         # Skip whatever falls before the lower bound or after the upper one.
-         if {[streamCompareID $entry_id $start] < 0} continue
-         if {[streamCompareID $entry_id $end] > 0} continue
-         lappend matched $entry
-     }
-     return $matched
- }
- # Placeholder for a Tcl-side copy of the stream content.
- # NOTE(review): nothing in the visible part of this file reads or updates
- # this variable; confirm it is still needed.
- set content {} ;# Will be populated with Tcl side copy of the stream content.
- start_server {
- tags {"stream"}
- } {
- test {XADD can add entries into a stream that XRANGE can fetch} {
-     # Append two entries with server-generated IDs.
-     r XADD mystream * item 1 value a
-     r XADD mystream * item 2 value b
-     assert_equal 2 [r XLEN mystream]
-     # A full range scan must return both field lists in insertion order.
-     set items [r XRANGE mystream - +]
-     lassign $items first_entry second_entry
-     assert_equal [lindex $first_entry 1] {item 1 value a}
-     assert_equal [lindex $second_entry 1] {item 2 value b}
- }
- test {XADD IDs are incremental} {
-     # Three consecutive auto-generated IDs must be strictly increasing.
-     set id1 [r XADD mystream * item 1 value a]
-     set id2 [r XADD mystream * item 2 value b]
-     set id3 [r XADD mystream * item 3 value c]
-     assert_equal -1 [streamCompareID $id1 $id2]
-     assert_equal -1 [streamCompareID $id2 $id3]
- }
- test {XADD IDs are incremental when ms is the same as well} {
-     # Run the three XADDs back to back inside one transaction; as the test
-     # name says, the IDs are expected to keep growing even when the ms
-     # part does not change.
-     r multi
-     r XADD mystream * item 1 value a
-     r XADD mystream * item 2 value b
-     r XADD mystream * item 3 value c
-     set replies [r exec]
-     lassign $replies id1 id2 id3
-     assert_equal -1 [streamCompareID $id1 $id2]
-     assert_equal -1 [streamCompareID $id2 $id3]
- }
- test {XADD IDs correctly report an error when overflowing} {
-     # Largest value each half of an ID can hold (unsigned 64 bit).
-     set max_u64 18446744073709551615
-     r DEL mystream
-     r xadd mystream $max_u64-$max_u64 a b
-     # No greater ID exists, so asking for an automatic one must fail.
-     assert_error ERR* {r xadd mystream * c d}
- }
- test {XADD with MAXLEN option} {
-     r DEL mystream
-     # Insert 1000 entries while capping the stream at 5 elements, mixing
-     # two different field names along the way.
-     for {set j 0} {$j < 1000} {incr j} {
-         if {rand() < 0.9} {
-             r XADD mystream MAXLEN 5 * xitem $j
-         } else {
-             r XADD mystream MAXLEN 5 * yitem $j
-         }
-     }
-     set res [r xrange mystream - +]
-     # Without this check an empty reply would let the loop below pass
-     # without verifying anything.
-     assert_equal 5 [llength $res]
-     # Only the last five insertions (995..999) must survive, in order.
-     set expected 995
-     foreach entry $res {
-         assert {[lindex $entry 1 1] == $expected}
-         incr expected
-     }
- }
- test {XADD mass insertion and XLEN} {
-     r DEL mystream
-     # Queue all the insertions inside a single transaction.
-     r multi
-     for {set j 0} {$j < 10000} {incr j} {
-         # From time to time add an entry carrying a different set of
-         # fields, in order to stress the stream compression code.
-         set fields [list item $j]
-         if {rand() >= 0.9} {
-             lappend fields otherfield foo
-         }
-         r XADD mystream * {*}$fields
-     }
-     r exec
-     # The listing is kept in 'items': the XRANGE fuzzing test reads it too.
-     set items [r XRANGE mystream - +]
-     for {set j 0} {$j < 10000} {incr j} {
-         set leading_pair [lrange [lindex $items $j 1] 0 1]
-         assert {$leading_pair eq [list item $j]}
-     }
-     assert_equal $j [r xlen mystream]
- }
- test {XADD with ID 0-0} {
-     r DEL otherstream
-     # Whatever the outcome of the command is, the key must not be created.
-     catch {r XADD otherstream 0-0 k v} err
-     assert_equal 0 [r EXISTS otherstream]
- }
- test {XRANGE COUNT works as expected} {
-     # COUNT must cap the number of returned entries.
-     set limited [r xrange mystream - + COUNT 10]
-     assert_equal 10 [llength $limited]
- }
- test {XREVRANGE COUNT works as expected} {
-     # Same check as for XRANGE, walking the stream backward.
-     set limited [r xrevrange mystream + - COUNT 10]
-     assert_equal 10 [llength $limited]
- }
- test {XRANGE can be used to iterate the whole stream} {
-     set j 0
-     set last_id "-"
-     # Fetch pages of 100 entries until an empty page is returned, resuming
-     # each time right after the last ID seen.
-     while {[llength [set elements [r xrange mystream $last_id + COUNT 100]]] > 0} {
-         foreach e $elements {
-             assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
-             incr j
-         }
-         set last_id [streamNextID [lindex $elements end 0]]
-     }
-     # Every entry must have been visited exactly once.
-     assert_equal 10000 $j
- }
- test {XREVRANGE returns the reverse of XRANGE} {
-     # Fetch the forward listing first, then the backward one.
-     set forward [r xrange mystream - +]
-     set backward [r xrevrange mystream + -]
-     assert {$forward == [lreverse $backward]}
- }
- test {XREAD with non empty stream} {
-     # Reading from 0-0 with COUNT 1 must return the first entry.
-     set res [r XREAD COUNT 1 STREAMS mystream 0-0]
-     set first_fields [lindex $res 0 1 0 1]
-     assert_equal {item 0} [lrange $first_fields 0 1]
- }
- test {Non blocking XREAD with empty streams} {
-     # Without BLOCK the command must return at once with an empty reply.
-     set res [r XREAD STREAMS s1 s2 0-0 0-0]
-     assert_equal {} $res
- }
- test {XREAD with non empty second stream} {
-     set res [r XREAD COUNT 1 STREAMS nostream mystream 0-0 0-0]
-     # The reply is expected to start with mystream, the second key asked.
-     set first_stream [lindex $res 0]
-     assert_equal {mystream} [lindex $first_stream 0]
-     assert_equal {item 0} [lrange [lindex $first_stream 1 0 1] 0 1]
- }
- test {Blocking XREAD waiting new data} {
-     # Make sure s2 already holds an entry before blocking on it.
-     r XADD s2 * old abcd1234
-     set rd [redis_deferring_client]
-     # Block on three streams using the "$" ID for each of them.
-     $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
-     r XADD s2 * new abcd1234
-     set res [$rd read]
-     # Only the entry added after blocking must be delivered.
-     assert {[lindex $res 0 0] eq {s2}}
-     assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test {Blocking XREAD waiting old data} {
-     set rd [redis_deferring_client]
-     # s2 is read starting from 0-0 while the other streams use "$".
-     $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $
-     r XADD s2 * foo abcd1234
-     set res [$rd read]
-     # The first entry returned is expected to be an existing one and not
-     # the "foo" entry added above.
-     assert {[lindex $res 0 0] eq {s2}}
-     assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test {Blocking XREAD will not reply with an empty array} {
-     # Leave s1 with one entry (666) after adding and deleting a later one
-     # (667), so that there is nothing to return after ID 666.
-     r del s1
-     r XADD s1 666 f v
-     r XADD s1 667 f2 v2
-     r XDEL s1 667
-     set rd [redis_deferring_client]
-     $rd XREAD BLOCK 10 STREAMS s1 666
-     # Wait longer than the 10 ms block timeout.
-     after 20
-     assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test "XREAD: XADD + DEL should not awake client" {
-     set rd [redis_deferring_client]
-     r del s1
-     $rd XREAD BLOCK 20000 STREAMS s1 $
-     # The key is created and removed inside the same transaction.
-     r multi
-     r XADD s1 * old abcd1234
-     r DEL s1
-     r exec
-     # The blocked client must only receive this later entry.
-     r XADD s1 * new abcd1234
-     set res [$rd read]
-     assert {[lindex $res 0 0] eq {s1}}
-     assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test "XREAD: XADD + DEL + LPUSH should not awake client" {
-     set rd [redis_deferring_client]
-     r del s1
-     $rd XREAD BLOCK 20000 STREAMS s1 $
-     # Inside one transaction the key is created as a stream, deleted, and
-     # created again as a list.
-     r multi
-     r XADD s1 * old abcd1234
-     r DEL s1
-     r LPUSH s1 foo bar
-     r exec
-     # Remove the list, then add the entry the blocked client must receive.
-     r DEL s1
-     r XADD s1 * new abcd1234
-     set res [$rd read]
-     assert {[lindex $res 0 0] eq {s1}}
-     assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test {XREAD with same stream name multiple times should work} {
-     r XADD s2 * old abcd1234
-     set rd [redis_deferring_client]
-     # The same key is listed three times, each time with the "$" ID.
-     $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
-     r XADD s2 * new abcd1234
-     set res [$rd read]
-     assert {[lindex $res 0 0] eq {s2}}
-     assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test {XREAD + multiple XADD inside transaction} {
-     r XADD s2 * old abcd1234
-     set rd [redis_deferring_client]
-     $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
-     # Add three entries atomically while the client is blocked.
-     r MULTI
-     r XADD s2 * field one
-     r XADD s2 * field two
-     r XADD s2 * field three
-     r EXEC
-     set res [$rd read]
-     # The first two entries of the reply are checked, in insertion order.
-     assert {[lindex $res 0 0] eq {s2}}
-     assert {[lindex $res 0 1 0 1] eq {field one}}
-     assert {[lindex $res 0 1 1 1] eq {field two}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test {XDEL basic test} {
-     r del somestream
-     # Add three entries, remembering the ID of the middle one only.
-     r xadd somestream * foo value0
-     set id [r xadd somestream * foo value1]
-     r xadd somestream * foo value2
-     r xdel somestream $id
-     assert_equal 2 [r xlen somestream]
-     # The two survivors must be the first and the last entry added.
-     set result [r xrange somestream - +]
-     lassign $result head_entry tail_entry
-     assert_equal {value0} [lindex $head_entry 1 1]
-     assert_equal {value2} [lindex $tail_entry 1 1]
- }
- # Here the idea is to check the consistency of the stream data structure
- # as we remove all the elements down to zero elements.
- test {XDEL fuzz test} {
- r del somestream
- # IDs of every entry added, in insertion order.
- set ids {}
- set x 0; # Length of the stream
- while 1 {
- lappend ids [r xadd somestream * item $x]
- incr x
- # Add enough elements to have a few radix tree nodes inside the stream.
- # The loop stops once XINFO STREAM reports more than 20 radix-tree-keys.
- if {[dict get [r xinfo stream somestream] radix-tree-keys] > 20} break
- }
- # Now remove all the elements till we reach an empty stream
- # and after every deletion, check that the stream is sane enough
- # to report the right number of elements with XRANGE: this will also
- # force accessing the whole data structure to check sanity.
- assert {[r xlen somestream] == $x}
- # We want to remove elements in random order to really test the
- # implementation in a better way.
- set ids [lshuffle $ids]
- foreach id $ids {
- # Each ID was added exactly once, so XDEL must report one removal.
- assert {[r xdel somestream $id] == 1}
- incr x -1
- assert {[r xlen somestream] == $x}
- # The test would be too slow calling XRANGE for every iteration.
- # Do it only when the remaining length is a multiple of 100, which
- # includes the final, empty stream.
- if {$x % 100 == 0} {
- set res [r xrange somestream - +]
- assert {[llength $res] == $x}
- }
- }
- }
- test {XRANGE fuzzing} {
-     # 'items' is the full listing of mystream captured by the mass
-     # insertion test; its first and last IDs bound the random ranges.
-     set low_id [lindex $items 0 0]
-     set high_id [lindex $items end 0]
-     for {set j 0} {$j < 100} {incr j} {
-         set start [streamRandomID $low_id $high_id]
-         set end [streamRandomID $low_id $high_id]
-         # Ask the server and the Tcl model the same question.
-         set range [r xrange mystream $start $end]
-         set tcl_range [streamSimulateXRANGE $items $start $end]
-         if {$range eq $tcl_range} continue
-         # Mismatch: dump both answers before failing the test.
-         puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
-         puts "---"
-         puts "XRANGE: '$range'"
-         puts "---"
-         puts "TCL: '$tcl_range'"
-         puts "---"
-         fail "XRANGE fuzzing failed, check logs for details"
-     }
- }
- test {XREVRANGE regression test for issue #5006} {
-     # teststream gets non compressed entries (a different field each time),
-     # teststream2 gets SAMEFIELD compressed entries (always key1).
-     foreach {key id field value} {
-         teststream  1234567891230 key1 value1
-         teststream  1234567891240 key2 value2
-         teststream  1234567891250 key3 value3
-         teststream2 1234567891230 key1 value1
-         teststream2 1234567891240 key1 value2
-         teststream2 1234567891250 key1 value3
-     } {
-         r xadd $key $id $field $value
-     }
-     # Start the reverse scan from an ID lying between the last two entries.
-     set from_id 1234567891245
-     assert_equal [r xrevrange teststream $from_id -] {{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}}
-     assert_equal [r xrevrange teststream2 $from_id -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
- }
- test {XREAD streamID edge (no-blocking)} {
-     # ID whose sequence part is the largest 64 bit unsigned value.
-     set edge_id 1-18446744073709551615
-     r del x
-     r XADD x 1-1 f v
-     r XADD x $edge_id f v
-     r XADD x 2-1 f v
-     # Reading after the edge ID must jump to the next ms value.
-     set res [r XREAD BLOCK 0 STREAMS x $edge_id]
-     assert_equal {2-1 {f v}} [lindex $res 0 1 0]
- }
- test {XREAD streamID edge (blocking)} {
-     r del x
-     set rd [redis_deferring_client]
-     # Block asking for entries after an ID whose sequence part is the
-     # largest 64 bit unsigned value.
-     $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
-     r XADD x 1-1 f v
-     r XADD x 1-18446744073709551615 f v
-     r XADD x 2-1 f v
-     set res [$rd read]
-     # Only the entry with the next ms value may be delivered.
-     assert {[lindex $res 0 1 0] == {2-1 {f v}}}
-     # Release the dedicated connection instead of leaking it.
-     $rd close
- }
- test {XADD streamID edge} {
-     r del x
-     r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future
-     r XADD x * f2 v2
-     # The automatic ID must move to the next ms with the sequence at zero.
-     set expected [list \
-         [list 2577343934890-18446744073709551615 {f v}] \
-         [list 2577343934891-0 {f2 v2}]]
-     assert_equal [r XRANGE x - +] $expected
- }
- }
- start_server {tags {"stream"} overrides {appendonly yes}} {
-     test {XADD with MAXLEN > xlen can propagate correctly} {
-         # j counts the entries the stream is expected to hold.
-         for {set j 0} {$j < 100} {incr j} {
-             r XADD mystream * xitem v
-         }
-         # The limit is above the current length: nothing must be trimmed.
-         r XADD mystream MAXLEN 200 * xitem v
-         incr j
-         assert_equal $j [r xlen mystream]
-         # Reload the dataset from the AOF and check the stream still grows
-         # by exactly one entry.
-         r debug loadaof
-         r XADD mystream * xitem v
-         incr j
-         assert_equal $j [r xlen mystream]
-     }
- }
- start_server {tags {"stream"} overrides {appendonly yes}} {
-     test {XADD with ~ MAXLEN can propagate correctly} {
-         # j counts the entries the stream is expected to hold.
-         for {set j 0} {$j < 100} {incr j} {
-             r XADD mystream * xitem v
-         }
-         # Approximated trimming with the limit set to the current length.
-         r XADD mystream MAXLEN ~ $j * xitem v
-         incr j
-         assert_equal $j [r xlen mystream]
-         # Shrink the node size before reloading the AOF: the length must
-         # still match once the commands are replayed.
-         r config set stream-node-max-entries 1
-         r debug loadaof
-         r XADD mystream * xitem v
-         incr j
-         assert_equal $j [r xlen mystream]
-     }
- }
- start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
-     test {XTRIM with ~ MAXLEN can propagate correctly} {
-         for {set j 0} {$j < 100} {incr j} {
-             r XADD mystream * xitem v
-         }
-         # Approximated trim to 85: the test expects 90 entries to be left
-         # with stream-node-max-entries set to 10.
-         r XTRIM mystream MAXLEN ~ 85
-         assert {[r xlen mystream] == 90}
-         # Shrink the node size before reloading the AOF: the length must
-         # not change once the commands are replayed.
-         r config set stream-node-max-entries 1
-         r debug loadaof
-         r XADD mystream * xitem v
-         # The expected lengths are literals here, so the loop counter is
-         # not updated any more after the insertion loop.
-         assert {[r xlen mystream] == 91}
-     }
- }
- start_server {tags {"stream xsetid"}} {
-     test {XADD can CREATE an empty stream} {
-         # MAXLEN 0 trims the entry that was just added.
-         r XADD mystream MAXLEN 0 * a b
-         assert_equal 0 [dict get [r xinfo stream mystream] length]
-     }
-     test {XSETID can set a specific ID} {
-         r XSETID mystream "200-0"
-         set info [r xinfo stream mystream]
-         assert_equal "200-0" [dict get $info last-generated-id]
-     }
-     test {XSETID cannot SETID with smaller ID} {
-         r XADD mystream * a b
-         catch {r XSETID mystream "1-1"} err
-         # Empty the stream again before handing the error to the matcher.
-         r XADD mystream MAXLEN 0 * a b
-         set err
-     } {ERR*smaller*}
-     test {XSETID cannot SETID on non-existent key} {
-         catch {r XSETID stream 1-1} err
-         # The error message is the value checked against the pattern.
-         set err
-     } {ERR no such key}
- }
- start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
-     test {Empty stream can be rewrite into AOF correctly} {
-         # Create the key and trim away its only entry.
-         r XADD mystream MAXLEN 0 * a b
-         assert_equal 0 [dict get [r xinfo stream mystream] length]
-         # Rewrite the AOF, reload it, and check the stream is still empty.
-         r bgrewriteaof
-         waitForBgrewriteaof r
-         r debug loadaof
-         assert_equal 0 [dict get [r xinfo stream mystream] length]
-     }
-     test {Stream can be rewrite into AOF correctly after XDEL lastid} {
-         r XSETID mystream 0-0
-         r XADD mystream 1-1 a b
-         r XADD mystream 2-2 a b
-         assert_equal 2 [dict get [r xinfo stream mystream] length]
-         # Delete the entry holding the last generated ID.
-         r XDEL mystream 2-2
-         r bgrewriteaof
-         waitForBgrewriteaof r
-         r debug loadaof
-         # One entry is left, yet the last generated ID must be preserved.
-         set info [r xinfo stream mystream]
-         assert_equal 1 [dict get $info length]
-         assert_equal "2-2" [dict get [r xinfo stream mystream] last-generated-id]
-     }
- }
|