redis.tcl 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # Tcl client library - used by the Redis test
  2. # Copyright (C) 2009-2014 Salvatore Sanfilippo
  3. # Released under the BSD license like Redis itself
  4. #
  5. # Example usage:
  6. #
  7. # set r [redis 127.0.0.1 6379]
  8. # $r lpush mylist foo
  9. # $r lpush mylist bar
  10. # $r lrange mylist 0 -1
  11. # $r close
  12. #
  13. # Non blocking usage example:
  14. #
  15. # proc handlePong {r type reply} {
  16. # puts "PONG $type '$reply'"
  17. # if {$reply ne "PONG"} {
  18. # $r ping [list handlePong]
  19. # }
  20. # }
  21. #
  22. # set r [redis]
  23. # $r blocking 0
  24. # $r get fo [list handlePong]
  25. #
  26. # vwait forever
  27. package require Tcl 8.5
  28. package provide redis 0.1
  29. namespace eval redis {}
  30. set ::redis::id 0
  31. array set ::redis::fd {}
  32. array set ::redis::addr {}
  33. array set ::redis::blocking {}
  34. array set ::redis::deferred {}
  35. array set ::redis::reconnect {}
  36. array set ::redis::callback {}
  37. array set ::redis::state {} ;# State in non-blocking reply reading
  38. array set ::redis::statestack {} ;# Stack of states, for nested mbulks
  39. proc redis {{server 127.0.0.1} {port 6379} {defer 0}} {
  40. set fd [socket $server $port]
  41. fconfigure $fd -translation binary
  42. set id [incr ::redis::id]
  43. set ::redis::fd($id) $fd
  44. set ::redis::addr($id) [list $server $port]
  45. set ::redis::blocking($id) 1
  46. set ::redis::deferred($id) $defer
  47. set ::redis::reconnect($id) 0
  48. ::redis::redis_reset_state $id
  49. interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
  50. }
  51. # This is a wrapper to the actual dispatching procedure that handles
  52. # reconnection if needed.
  53. proc ::redis::__dispatch__ {id method args} {
  54. set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
  55. if {$errorcode && $::redis::reconnect($id) && $::redis::fd($id) eq {}} {
  56. # Try again if the connection was lost.
  57. # FIXME: we don't re-select the previously selected DB, nor we check
  58. # if we are inside a transaction that needs to be re-issued from
  59. # scratch.
  60. set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
  61. }
  62. return -code $errorcode $retval
  63. }
  64. proc ::redis::__dispatch__raw__ {id method argv} {
  65. set fd $::redis::fd($id)
  66. # Reconnect the link if needed.
  67. if {$fd eq {}} {
  68. lassign $::redis::addr($id) host port
  69. set ::redis::fd($id) [socket $host $port]
  70. fconfigure $::redis::fd($id) -translation binary
  71. set fd $::redis::fd($id)
  72. }
  73. set blocking $::redis::blocking($id)
  74. set deferred $::redis::deferred($id)
  75. if {$blocking == 0} {
  76. if {[llength $argv] == 0} {
  77. error "Please provide a callback in non-blocking mode"
  78. }
  79. set callback [lindex $argv end]
  80. set argv [lrange $argv 0 end-1]
  81. }
  82. if {[info command ::redis::__method__$method] eq {}} {
  83. set cmd "*[expr {[llength $argv]+1}]\r\n"
  84. append cmd "$[string length $method]\r\n$method\r\n"
  85. foreach a $argv {
  86. append cmd "$[string length $a]\r\n$a\r\n"
  87. }
  88. ::redis::redis_write $fd $cmd
  89. if {[catch {flush $fd}]} {
  90. set ::redis::fd($id) {}
  91. return -code error "I/O error reading reply"
  92. }
  93. if {!$deferred} {
  94. if {$blocking} {
  95. ::redis::redis_read_reply $id $fd
  96. } else {
  97. # Every well formed reply read will pop an element from this
  98. # list and use it as a callback. So pipelining is supported
  99. # in non blocking mode.
  100. lappend ::redis::callback($id) $callback
  101. fileevent $fd readable [list ::redis::redis_readable $fd $id]
  102. }
  103. }
  104. } else {
  105. uplevel 1 [list ::redis::__method__$method $id $fd] $argv
  106. }
  107. }
  108. proc ::redis::__method__blocking {id fd val} {
  109. set ::redis::blocking($id) $val
  110. fconfigure $fd -blocking $val
  111. }
  112. proc ::redis::__method__reconnect {id fd val} {
  113. set ::redis::reconnect($id) $val
  114. }
  115. proc ::redis::__method__read {id fd} {
  116. ::redis::redis_read_reply $id $fd
  117. }
  118. proc ::redis::__method__write {id fd buf} {
  119. ::redis::redis_write $fd $buf
  120. }
  121. proc ::redis::__method__flush {id fd} {
  122. flush $fd
  123. }
  124. proc ::redis::__method__close {id fd} {
  125. catch {close $fd}
  126. catch {unset ::redis::fd($id)}
  127. catch {unset ::redis::addr($id)}
  128. catch {unset ::redis::blocking($id)}
  129. catch {unset ::redis::deferred($id)}
  130. catch {unset ::redis::reconnect($id)}
  131. catch {unset ::redis::state($id)}
  132. catch {unset ::redis::statestack($id)}
  133. catch {unset ::redis::callback($id)}
  134. catch {interp alias {} ::redis::redisHandle$id {}}
  135. }
  136. proc ::redis::__method__channel {id fd} {
  137. return $fd
  138. }
  139. proc ::redis::__method__deferred {id fd val} {
  140. set ::redis::deferred($id) $val
  141. }
  142. proc ::redis::redis_write {fd buf} {
  143. puts -nonewline $fd $buf
  144. }
  145. proc ::redis::redis_writenl {fd buf} {
  146. redis_write $fd $buf
  147. redis_write $fd "\r\n"
  148. flush $fd
  149. }
  150. proc ::redis::redis_readnl {fd len} {
  151. set buf [read $fd $len]
  152. read $fd 2 ; # discard CR LF
  153. return $buf
  154. }
  155. proc ::redis::redis_bulk_read {fd} {
  156. set count [redis_read_line $fd]
  157. if {$count == -1} return {}
  158. set buf [redis_readnl $fd $count]
  159. return $buf
  160. }
  161. proc ::redis::redis_multi_bulk_read {id fd} {
  162. set count [redis_read_line $fd]
  163. if {$count == -1} return {}
  164. set l {}
  165. set err {}
  166. for {set i 0} {$i < $count} {incr i} {
  167. if {[catch {
  168. lappend l [redis_read_reply $id $fd]
  169. } e] && $err eq {}} {
  170. set err $e
  171. }
  172. }
  173. if {$err ne {}} {return -code error $err}
  174. return $l
  175. }
  176. proc ::redis::redis_read_line fd {
  177. string trim [gets $fd]
  178. }
  179. proc ::redis::redis_read_reply {id fd} {
  180. set type [read $fd 1]
  181. switch -exact -- $type {
  182. : -
  183. + {redis_read_line $fd}
  184. - {return -code error [redis_read_line $fd]}
  185. $ {redis_bulk_read $fd}
  186. * {redis_multi_bulk_read $id $fd}
  187. default {
  188. if {$type eq {}} {
  189. set ::redis::fd($id) {}
  190. return -code error "I/O error reading reply"
  191. }
  192. return -code error "Bad protocol, '$type' as reply type byte"
  193. }
  194. }
  195. }
  196. proc ::redis::redis_reset_state id {
  197. set ::redis::state($id) [dict create buf {} mbulk -1 bulk -1 reply {}]
  198. set ::redis::statestack($id) {}
  199. }
  200. proc ::redis::redis_call_callback {id type reply} {
  201. set cb [lindex $::redis::callback($id) 0]
  202. set ::redis::callback($id) [lrange $::redis::callback($id) 1 end]
  203. uplevel #0 $cb [list ::redis::redisHandle$id $type $reply]
  204. ::redis::redis_reset_state $id
  205. }
  206. # Read a reply in non-blocking mode.
  207. proc ::redis::redis_readable {fd id} {
  208. if {[eof $fd]} {
  209. redis_call_callback $id eof {}
  210. ::redis::__method__close $id $fd
  211. return
  212. }
  213. if {[dict get $::redis::state($id) bulk] == -1} {
  214. set line [gets $fd]
  215. if {$line eq {}} return ;# No complete line available, return
  216. switch -exact -- [string index $line 0] {
  217. : -
  218. + {redis_call_callback $id reply [string range $line 1 end-1]}
  219. - {redis_call_callback $id err [string range $line 1 end-1]}
  220. $ {
  221. dict set ::redis::state($id) bulk \
  222. [expr [string range $line 1 end-1]+2]
  223. if {[dict get $::redis::state($id) bulk] == 1} {
  224. # We got a $-1, hack the state to play well with this.
  225. dict set ::redis::state($id) bulk 2
  226. dict set ::redis::state($id) buf "\r\n"
  227. ::redis::redis_readable $fd $id
  228. }
  229. }
  230. * {
  231. dict set ::redis::state($id) mbulk [string range $line 1 end-1]
  232. # Handle *-1
  233. if {[dict get $::redis::state($id) mbulk] == -1} {
  234. redis_call_callback $id reply {}
  235. }
  236. }
  237. default {
  238. redis_call_callback $id err \
  239. "Bad protocol, $type as reply type byte"
  240. }
  241. }
  242. } else {
  243. set totlen [dict get $::redis::state($id) bulk]
  244. set buflen [string length [dict get $::redis::state($id) buf]]
  245. set toread [expr {$totlen-$buflen}]
  246. set data [read $fd $toread]
  247. set nread [string length $data]
  248. dict append ::redis::state($id) buf $data
  249. # Check if we read a complete bulk reply
  250. if {[string length [dict get $::redis::state($id) buf]] ==
  251. [dict get $::redis::state($id) bulk]} {
  252. if {[dict get $::redis::state($id) mbulk] == -1} {
  253. redis_call_callback $id reply \
  254. [string range [dict get $::redis::state($id) buf] 0 end-2]
  255. } else {
  256. dict with ::redis::state($id) {
  257. lappend reply [string range $buf 0 end-2]
  258. incr mbulk -1
  259. set bulk -1
  260. }
  261. if {[dict get $::redis::state($id) mbulk] == 0} {
  262. redis_call_callback $id reply \
  263. [dict get $::redis::state($id) reply]
  264. }
  265. }
  266. }
  267. }
  268. }