redis.tcl 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. # Tcl client library - used by the Redis test
  2. # Copyright (C) 2009-2014 Salvatore Sanfilippo
  3. # Released under the BSD license like Redis itself
  4. #
  5. # Example usage:
  6. #
  7. # set r [redis 127.0.0.1 6379]
  8. # $r lpush mylist foo
  9. # $r lpush mylist bar
  10. # $r lrange mylist 0 -1
  11. # $r close
  12. #
  13. # Non blocking usage example:
  14. #
  15. # proc handlePong {r type reply} {
  16. # puts "PONG $type '$reply'"
  17. # if {$reply ne "PONG"} {
  18. # $r ping [list handlePong]
  19. # }
  20. # }
  21. #
  22. # set r [redis]
  23. # $r blocking 0
  24. # $r get fo [list handlePong]
  25. #
  26. # vwait forever
  27. package require Tcl 8.5
  28. package provide redis 0.1
  29. namespace eval redis {}
  30. set ::redis::id 0
  31. array set ::redis::fd {}
  32. array set ::redis::addr {}
  33. array set ::redis::blocking {}
  34. array set ::redis::deferred {}
  35. array set ::redis::reconnect {}
  36. array set ::redis::callback {}
  37. array set ::redis::state {} ;# State in non-blocking reply reading
  38. array set ::redis::statestack {} ;# Stack of states, for nested mbulks
  39. proc redis {{server 127.0.0.1} {port 6379} {defer 0} {tls 0} {tlsoptions {}}} {
  40. if {$tls} {
  41. package require tls
  42. ::tls::init \
  43. -cafile "$::tlsdir/ca.crt" \
  44. -certfile "$::tlsdir/redis.crt" \
  45. -keyfile "$::tlsdir/redis.key" \
  46. {*}$tlsoptions
  47. set fd [::tls::socket $server $port]
  48. } else {
  49. set fd [socket $server $port]
  50. }
  51. fconfigure $fd -translation binary
  52. set id [incr ::redis::id]
  53. set ::redis::fd($id) $fd
  54. set ::redis::addr($id) [list $server $port]
  55. set ::redis::blocking($id) 1
  56. set ::redis::deferred($id) $defer
  57. set ::redis::reconnect($id) 0
  58. set ::redis::tls $tls
  59. ::redis::redis_reset_state $id
  60. interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
  61. }
  62. # This is a wrapper to the actual dispatching procedure that handles
  63. # reconnection if needed.
  64. proc ::redis::__dispatch__ {id method args} {
  65. set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
  66. if {$errorcode && $::redis::reconnect($id) && $::redis::fd($id) eq {}} {
  67. # Try again if the connection was lost.
  68. # FIXME: we don't re-select the previously selected DB, nor we check
  69. # if we are inside a transaction that needs to be re-issued from
  70. # scratch.
  71. set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
  72. }
  73. return -code $errorcode $retval
  74. }
  75. proc ::redis::__dispatch__raw__ {id method argv} {
  76. set fd $::redis::fd($id)
  77. # Reconnect the link if needed.
  78. if {$fd eq {}} {
  79. lassign $::redis::addr($id) host port
  80. if {$::redis::tls} {
  81. set ::redis::fd($id) [::tls::socket $host $port]
  82. } else {
  83. set ::redis::fd($id) [socket $host $port]
  84. }
  85. fconfigure $::redis::fd($id) -translation binary
  86. set fd $::redis::fd($id)
  87. }
  88. set blocking $::redis::blocking($id)
  89. set deferred $::redis::deferred($id)
  90. if {$blocking == 0} {
  91. if {[llength $argv] == 0} {
  92. error "Please provide a callback in non-blocking mode"
  93. }
  94. set callback [lindex $argv end]
  95. set argv [lrange $argv 0 end-1]
  96. }
  97. if {[info command ::redis::__method__$method] eq {}} {
  98. set cmd "*[expr {[llength $argv]+1}]\r\n"
  99. append cmd "$[string length $method]\r\n$method\r\n"
  100. foreach a $argv {
  101. append cmd "$[string length $a]\r\n$a\r\n"
  102. }
  103. ::redis::redis_write $fd $cmd
  104. if {[catch {flush $fd}]} {
  105. set ::redis::fd($id) {}
  106. return -code error "I/O error reading reply"
  107. }
  108. if {!$deferred} {
  109. if {$blocking} {
  110. ::redis::redis_read_reply $id $fd
  111. } else {
  112. # Every well formed reply read will pop an element from this
  113. # list and use it as a callback. So pipelining is supported
  114. # in non blocking mode.
  115. lappend ::redis::callback($id) $callback
  116. fileevent $fd readable [list ::redis::redis_readable $fd $id]
  117. }
  118. }
  119. } else {
  120. uplevel 1 [list ::redis::__method__$method $id $fd] $argv
  121. }
  122. }
  123. proc ::redis::__method__blocking {id fd val} {
  124. set ::redis::blocking($id) $val
  125. fconfigure $fd -blocking $val
  126. }
  127. proc ::redis::__method__reconnect {id fd val} {
  128. set ::redis::reconnect($id) $val
  129. }
  130. proc ::redis::__method__read {id fd} {
  131. ::redis::redis_read_reply $id $fd
  132. }
  133. proc ::redis::__method__write {id fd buf} {
  134. ::redis::redis_write $fd $buf
  135. }
  136. proc ::redis::__method__flush {id fd} {
  137. flush $fd
  138. }
  139. proc ::redis::__method__close {id fd} {
  140. catch {close $fd}
  141. catch {unset ::redis::fd($id)}
  142. catch {unset ::redis::addr($id)}
  143. catch {unset ::redis::blocking($id)}
  144. catch {unset ::redis::deferred($id)}
  145. catch {unset ::redis::reconnect($id)}
  146. catch {unset ::redis::state($id)}
  147. catch {unset ::redis::statestack($id)}
  148. catch {unset ::redis::callback($id)}
  149. catch {interp alias {} ::redis::redisHandle$id {}}
  150. }
  151. proc ::redis::__method__channel {id fd} {
  152. return $fd
  153. }
  154. proc ::redis::__method__deferred {id fd val} {
  155. set ::redis::deferred($id) $val
  156. }
  157. proc ::redis::redis_write {fd buf} {
  158. puts -nonewline $fd $buf
  159. }
  160. proc ::redis::redis_writenl {fd buf} {
  161. redis_write $fd $buf
  162. redis_write $fd "\r\n"
  163. flush $fd
  164. }
  165. proc ::redis::redis_readnl {fd len} {
  166. set buf [read $fd $len]
  167. read $fd 2 ; # discard CR LF
  168. return $buf
  169. }
  170. proc ::redis::redis_bulk_read {fd} {
  171. set count [redis_read_line $fd]
  172. if {$count == -1} return {}
  173. set buf [redis_readnl $fd $count]
  174. return $buf
  175. }
  176. proc ::redis::redis_multi_bulk_read {id fd} {
  177. set count [redis_read_line $fd]
  178. if {$count == -1} return {}
  179. set l {}
  180. set err {}
  181. for {set i 0} {$i < $count} {incr i} {
  182. if {[catch {
  183. lappend l [redis_read_reply $id $fd]
  184. } e] && $err eq {}} {
  185. set err $e
  186. }
  187. }
  188. if {$err ne {}} {return -code error $err}
  189. return $l
  190. }
  191. proc ::redis::redis_read_line fd {
  192. string trim [gets $fd]
  193. }
  194. proc ::redis::redis_read_reply {id fd} {
  195. set type [read $fd 1]
  196. switch -exact -- $type {
  197. : -
  198. + {redis_read_line $fd}
  199. - {return -code error [redis_read_line $fd]}
  200. $ {redis_bulk_read $fd}
  201. * {redis_multi_bulk_read $id $fd}
  202. default {
  203. if {$type eq {}} {
  204. set ::redis::fd($id) {}
  205. return -code error "I/O error reading reply"
  206. }
  207. return -code error "Bad protocol, '$type' as reply type byte"
  208. }
  209. }
  210. }
  211. proc ::redis::redis_reset_state id {
  212. set ::redis::state($id) [dict create buf {} mbulk -1 bulk -1 reply {}]
  213. set ::redis::statestack($id) {}
  214. }
  215. proc ::redis::redis_call_callback {id type reply} {
  216. set cb [lindex $::redis::callback($id) 0]
  217. set ::redis::callback($id) [lrange $::redis::callback($id) 1 end]
  218. uplevel #0 $cb [list ::redis::redisHandle$id $type $reply]
  219. ::redis::redis_reset_state $id
  220. }
  221. # Read a reply in non-blocking mode.
  222. proc ::redis::redis_readable {fd id} {
  223. if {[eof $fd]} {
  224. redis_call_callback $id eof {}
  225. ::redis::__method__close $id $fd
  226. return
  227. }
  228. if {[dict get $::redis::state($id) bulk] == -1} {
  229. set line [gets $fd]
  230. if {$line eq {}} return ;# No complete line available, return
  231. switch -exact -- [string index $line 0] {
  232. : -
  233. + {redis_call_callback $id reply [string range $line 1 end-1]}
  234. - {redis_call_callback $id err [string range $line 1 end-1]}
  235. $ {
  236. dict set ::redis::state($id) bulk \
  237. [expr [string range $line 1 end-1]+2]
  238. if {[dict get $::redis::state($id) bulk] == 1} {
  239. # We got a $-1, hack the state to play well with this.
  240. dict set ::redis::state($id) bulk 2
  241. dict set ::redis::state($id) buf "\r\n"
  242. ::redis::redis_readable $fd $id
  243. }
  244. }
  245. * {
  246. dict set ::redis::state($id) mbulk [string range $line 1 end-1]
  247. # Handle *-1
  248. if {[dict get $::redis::state($id) mbulk] == -1} {
  249. redis_call_callback $id reply {}
  250. }
  251. }
  252. default {
  253. redis_call_callback $id err \
  254. "Bad protocol, $type as reply type byte"
  255. }
  256. }
  257. } else {
  258. set totlen [dict get $::redis::state($id) bulk]
  259. set buflen [string length [dict get $::redis::state($id) buf]]
  260. set toread [expr {$totlen-$buflen}]
  261. set data [read $fd $toread]
  262. set nread [string length $data]
  263. dict append ::redis::state($id) buf $data
  264. # Check if we read a complete bulk reply
  265. if {[string length [dict get $::redis::state($id) buf]] ==
  266. [dict get $::redis::state($id) bulk]} {
  267. if {[dict get $::redis::state($id) mbulk] == -1} {
  268. redis_call_callback $id reply \
  269. [string range [dict get $::redis::state($id) buf] 0 end-2]
  270. } else {
  271. dict with ::redis::state($id) {
  272. lappend reply [string range $buf 0 end-2]
  273. incr mbulk -1
  274. set bulk -1
  275. }
  276. if {[dict get $::redis::state($id) mbulk] == 0} {
  277. redis_call_callback $id reply \
  278. [dict get $::redis::state($id) reply]
  279. }
  280. }
  281. }
  282. }
  283. }