redis.tcl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. # Tcl client library - used by the Redis test
  2. # Copyright (C) 2009-2014 Salvatore Sanfilippo
  3. # Released under the BSD license like Redis itself
  4. #
  5. # Example usage:
  6. #
  7. # set r [redis 127.0.0.1 6379]
  8. # $r lpush mylist foo
  9. # $r lpush mylist bar
  10. # $r lrange mylist 0 -1
  11. # $r close
  12. #
  13. # Non blocking usage example:
  14. #
  15. # proc handlePong {r type reply} {
  16. # puts "PONG $type '$reply'"
  17. # if {$reply ne "PONG"} {
  18. # $r ping [list handlePong]
  19. # }
  20. # }
  21. #
  22. # set r [redis]
  23. # $r blocking 0
  24. # $r get fo [list handlePong]
  25. #
  26. # vwait forever
  # Package bookkeeping: the library relies on Tcl 8.5 features (dict,
  # lassign, argument expansion) and registers itself as package "redis" 0.1.
  package require Tcl 8.5
  package provide redis 0.1

  # All per-connection state lives in arrays inside the ::redis namespace,
  # each one indexed by the numeric connection id.
  namespace eval redis {}

  # Last connection id handed out; incremented for every new connection.
  set ::redis::id 0

  # Create every state array empty. Two of them are only used by the
  # non-blocking reader:
  #   state      - State in non-blocking reply reading
  #   statestack - Stack of states, for nested mbulks
  # The loop runs inside a lambda so that no helper variable is left behind
  # in the global namespace.
  apply {{} {
      foreach name {
          fd addr blocking deferred readraw reconnect tls callback
          state statestack
      } {
          array set ::redis::$name {}
      }
  }}
  # Open a connection to a Redis server and return the name of the command
  # (an interp alias) used to talk to it.
  #
  # Arguments:
  #   server, port - address to connect to
  #   defer        - initial value of the handle's "deferred" flag; when
  #                  true, commands are sent without reading the reply
  #   tls          - when true, connect with ::tls::socket using the
  #                  certificates found under $::tlsdir
  #   tlsoptions   - extra options appended to the ::tls::init call
  #   readraw      - initial value of the handle's "readraw" flag
  #
  # Returns the alias name ::redis::redisHandle<id>; invoking it dispatches
  # to ::redis::__dispatch__ with the connection id prepended.
  proc redis {{server 127.0.0.1} {port 6379} {defer 0} {tls 0} {tlsoptions {}} {readraw 0}} {
      if {$tls} {
          package require tls
          ::tls::init \
              -cafile "$::tlsdir/ca.crt" \
              -certfile "$::tlsdir/client.crt" \
              -keyfile "$::tlsdir/client.key" \
              {*}$tlsoptions
          set fd [::tls::socket $server $port]
      } else {
          set fd [socket $server $port]
      }
      fconfigure $fd -translation binary

      # Record the per-connection state under a freshly allocated id.
      set id [incr ::redis::id]
      set initial [list \
          fd        $fd \
          addr      [list $server $port] \
          blocking  1 \
          deferred  $defer \
          readraw   $readraw \
          reconnect 0 \
          tls       $tls]
      foreach {slot value} $initial {
          set ::redis::${slot}($id) $value
      }
      ::redis::redis_reset_state $id

      # The alias is the handle: its name is the result of this procedure.
      interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
  }
  # Entry point behind every handle command. It forwards the call to
  # ::redis::__dispatch__raw__ and, when the call failed because the link
  # was lost (fd cleared) and the handle has reconnection enabled, issues
  # the same call exactly one more time.
  #
  # FIXME: we don't re-select the previously selected DB, nor do we check
  # if we are inside a transaction that needs to be re-issued from scratch.
  #
  # The completion code and result of the last attempt are handed back to
  # the caller unchanged.
  proc ::redis::__dispatch__ {id method args} {
      set call [list ::redis::__dispatch__raw__ $id $method $args]
      set status [catch $call result]

      # The conjunction is evaluated lazily on purpose: after a successful
      # "close" the per-connection arrays no longer have an entry for $id.
      if {$status && $::redis::reconnect($id) && $::redis::fd($id) eq {}} {
          set status [catch $call result]
      }
      return -code $status $result
  }
  # Execute one handle invocation.
  #
  # Arguments:
  #   id     - connection id
  #   method - either the name of a local ::redis::__method__<name>
  #            procedure or a Redis command name
  #   argv   - list of arguments; in non-blocking mode the last element is
  #            the callback and is removed before the command is built
  #
  # If a ::redis::__method__<method> procedure exists it is called in the
  # caller's frame with the id, the channel and the arguments. Otherwise
  # the command is serialised as a multi-bulk request and written to the
  # channel; then, unless the handle is deferred, the reply is either read
  # synchronously (blocking mode) or the callback is queued and a readable
  # event handler is installed (non-blocking mode).
  #
  # Raises an error when no callback is supplied in non-blocking mode and
  # when writing the request fails; in the latter case the channel is
  # closed and ::redis::fd($id) is cleared so that the caller can notice
  # the lost link and reconnect.
  proc ::redis::__dispatch__raw__ {id method argv} {
      set fd $::redis::fd($id)

      # Reconnect the link if needed.
      if {$fd eq {} && $method ne {close}} {
          lassign $::redis::addr($id) host port
          if {$::redis::tls($id)} {
              set ::redis::fd($id) [::tls::socket $host $port]
          } else {
              set ::redis::fd($id) [socket $host $port]
          }
          set fd $::redis::fd($id)
          # A new socket starts out blocking: put it back into the mode the
          # handle was configured for, so the channel and the recorded
          # blocking flag cannot disagree after a reconnection.
          fconfigure $fd -translation binary \
              -blocking $::redis::blocking($id)
      }

      set blocking $::redis::blocking($id)
      set deferred $::redis::deferred($id)
      if {$blocking == 0} {
          if {[llength $argv] == 0} {
              error "Please provide a callback in non-blocking mode"
          }
          set callback [lindex $argv end]
          set argv [lrange $argv 0 end-1]
      }

      if {[info command ::redis::__method__$method] eq {}} {
          # Build the request: argument count, then one length-prefixed
          # bulk per word, the command name being the first word.
          set cmd "*[expr {[llength $argv]+1}]\r\n"
          append cmd "$[string length $method]\r\n$method\r\n"
          foreach a $argv {
              append cmd "$[string length $a]\r\n$a\r\n"
          }

          # The write itself can fail too: a request larger than the
          # channel buffer is pushed to the socket by puts, before flush
          # is reached. Guard both so that a lost link is always recorded.
          # The error text is kept as it always was for callers matching it.
          if {[catch {
              ::redis::redis_write $fd $cmd
              flush $fd
          }]} {
              catch {close $fd}
              set ::redis::fd($id) {}
              return -code error "I/O error reading reply"
          }

          if {!$deferred} {
              if {$blocking} {
                  ::redis::redis_read_reply $id $fd
              } else {
                  # Every well formed reply read will pop an element from
                  # this list and use it as a callback. So pipelining is
                  # supported in non blocking mode.
                  lappend ::redis::callback($id) $callback
                  fileevent $fd readable [list ::redis::redis_readable $fd $id]
              }
          }
      } else {
          uplevel 1 [list ::redis::__method__$method $id $fd] $argv
      }
  }
  # Handle method "blocking": record the requested mode for connection
  # $id and apply it to the channel. A value of 0 selects the non-blocking,
  # callback-based interface.
  proc ::redis::__method__blocking {id fd val} {
      variable blocking
      set blocking($id) $val
      chan configure $fd -blocking $val
  }
  # Handle method "reconnect": store the reconnect flag for connection
  # $id. The dispatcher consults it to decide whether a call that failed
  # on a lost link is retried. The channel argument is not used.
  proc ::redis::__method__reconnect {id fd val} {
      variable reconnect
      set reconnect($id) $val
  }
  # Handle method "read": read one reply from the channel and return it.
  # Errors raised by the reply reader propagate to the caller.
  proc ::redis::__method__read {id fd} {
      return [redis_read_reply $id $fd]
  }
  # Handle method "write": send the raw buffer $buf on the channel as is,
  # without any protocol framing and without flushing.
  proc ::redis::__method__write {id fd buf} {
      redis_write $fd $buf
  }
  # Handle method "flush": push any buffered output of the channel out.
  proc ::redis::__method__flush {id fd} {
      chan flush $fd
  }
  # Handle method "close": close the channel, forget every piece of state
  # recorded for connection $id and delete the handle command.
  #
  # Each step is best effort: a failure (for instance a channel that is
  # already closed, or state that was never created) is ignored so that
  # the remaining steps still run.
  proc ::redis::__method__close {id fd} {
      catch {close $fd}
      foreach slot {
          fd addr blocking deferred readraw reconnect tls
          state statestack callback
      } {
          catch {unset ::redis::${slot}($id)}
      }
      catch {interp alias {} ::redis::redisHandle$id {}}
  }
  # Handle method "channel": return the Tcl channel that backs the
  # connection, for callers that need direct access to it.
  proc ::redis::__method__channel {id fd} {
      return $fd
  }
  # Handle method "deferred": store the deferred flag for connection $id.
  # While it is true the dispatcher sends commands without reading their
  # replies. The channel argument is not used.
  proc ::redis::__method__deferred {id fd val} {
      variable deferred
      set deferred($id) $val
  }
  # Handle method "readraw": store the readraw flag for connection $id.
  # While it is true the reply reader returns plain protocol lines instead
  # of decoded replies. The channel argument is not used.
  proc ::redis::__method__readraw {id fd val} {
      variable readraw
      set readraw($id) $val
  }
  # Write $buf to the channel exactly as given: no trailing newline is
  # added and the channel is not flushed.
  proc ::redis::redis_write {fd buf} {
      chan puts -nonewline $fd $buf
  }
  # Write $buf followed by the protocol line terminator (CR LF) and flush
  # the channel so that the line is sent right away.
  proc ::redis::redis_writenl {fd buf} {
      redis_write $fd "$buf\r\n"
      flush $fd
  }
  # Read a payload of $len characters from the channel, then consume the
  # two terminator characters (CR LF) that follow it. Returns the payload
  # only.
  proc ::redis::redis_readnl {fd len} {
      set payload [chan read $fd $len]
      chan read $fd 2
      return $payload
  }
  # Read a bulk reply body: a length line followed by that many characters
  # of payload and a CR LF terminator. A length of -1 denotes a null bulk
  # and yields the empty string without reading anything further.
  proc ::redis::redis_bulk_read {fd} {
      set length [redis_read_line $fd]
      if {$length == -1} {
          return {}
      }
      return [redis_readnl $fd $length]
  }
  # Read a multi-bulk (array-like) reply body: a count line followed by
  # that many nested replies. Returns the list of decoded elements; a count
  # of -1 yields the empty string.
  #
  # When an element fails to decode, the remaining elements are still read
  # so that the stream stays in sync, and the first failure is raised once
  # the whole reply has been consumed.
  proc ::redis::redis_multi_bulk_read {id fd} {
      set total [redis_read_line $fd]
      if {$total == -1} {
          return {}
      }
      set items {}
      set first_error {}
      for {set n 0} {$n < $total} {incr n} {
          set failed [catch {redis_read_reply $id $fd} item]
          if {!$failed} {
              lappend items $item
          } elseif {$first_error eq {}} {
              set first_error $item
          }
      }
      if {$first_error ne {}} {
          return -code error $first_error
      }
      return $items
  }
  # Read a map reply body: a count line followed by that many key/value
  # pairs, each key and each value being a nested reply. Returns a dict; a
  # count of -1 yields the empty string.
  #
  # A pair that fails to decode is skipped and reading goes on with the
  # next pair; the first failure is raised after the last pair.
  proc ::redis::redis_read_map {id fd} {
      set total [redis_read_line $fd]
      if {$total == -1} {
          return {}
      }
      set pairs {}
      set first_error {}
      for {set n 0} {$n < $total} {incr n} {
          set failed [catch {
              set key [redis_read_reply $id $fd]
              set val [redis_read_reply $id $fd]
              dict set pairs $key $val
          } msg]
          if {$failed && $first_error eq {}} {
              set first_error $msg
          }
      }
      if {$first_error ne {}} {
          return -code error $first_error
      }
      return $pairs
  }
  # Read one line from the channel and return it with leading and trailing
  # whitespace removed, which also drops the carriage return left at the
  # end of a protocol line.
  proc ::redis::redis_read_line {fd} {
      set line [chan gets $fd]
      return [string trim $line]
  }
  # Consume the rest of the line of a null reply and return the empty
  # string, which is how a null is represented to callers.
  proc ::redis::redis_read_null {fd} {
      chan gets $fd
      return {}
  }
  # Read the body of a boolean reply: the line "t" decodes to 1 and "f"
  # decodes to 0. Anything else is reported as a protocol error.
  proc ::redis::redis_read_bool {fd} {
      set flag [redis_read_line $fd]
      switch -exact -- $flag {
          t {return 1}
          f {return 0}
      }
      return -code error "Bad protocol, '$flag' as bool type"
  }
  # Read a verbatim string reply: a bulk whose payload starts with a four
  # character format marker such as "txt:". The marker is dropped and the
  # rest of the payload is returned.
  proc ::redis::redis_read_verbatim_str {fd} {
      set payload [redis_bulk_read $fd]
      set marker_length 4
      return [string range $payload $marker_length end]
  }
  # Read and decode one reply in blocking mode.
  #
  # When the readraw flag of the connection is set, a single trimmed
  # protocol line is returned without any decoding. Otherwise the first
  # character selects the reply type and the matching reader is invoked.
  # An attribute reply is read and discarded, and decoding starts over
  # with the reply that follows it.
  #
  # Raises an error for error replies, for an unknown type character and
  # when nothing can be read from the channel; in the last case the
  # channel is closed and ::redis::fd($id) is cleared.
  #
  # Note for maintainers: the switch below is written in list form, where
  # a line starting with a hash sign is a pattern, not a comment.
  proc ::redis::redis_read_reply {id fd} {
      if {$::redis::readraw($id)} {
          return [redis_read_line $fd]
      }
      while {1} {
          set marker [read $fd 1]
          if {$marker eq {}} {
              catch {close $fd}
              set ::redis::fd($id) {}
              return -code error "I/O error reading reply"
          }
          switch -exact -- $marker {
              _ {return [redis_read_null $fd]}
              : -
              ( -
              + {return [redis_read_line $fd]}
              , {return [expr {double([redis_read_line $fd])}]}
              # {return [redis_read_bool $fd]}
              = {return [redis_read_verbatim_str $fd]}
              - {return -code error [redis_read_line $fd]}
              $ {return [redis_bulk_read $fd]}
              > -
              ~ -
              * {return [redis_multi_bulk_read $id $fd]}
              % {return [redis_read_map $id $fd]}
              | {
                  # ignore attributes for now (nowhere to store them);
                  # the enclosing loop then reads the actual reply
                  redis_read_map $id $fd
              }
              default {
                  return -code error "Bad protocol, '$marker' as reply type byte"
              }
          }
      }
  }
  # Put the non-blocking reader of connection $id back into its initial
  # state: empty buffer, no bulk or multi-bulk in progress (both -1), no
  # partial reply collected, and an empty state stack.
  proc ::redis::redis_reset_state {id} {
      variable state
      variable statestack
      set state($id) [dict create buf {} mbulk -1 bulk -1 reply {}]
      set statestack($id) {}
  }
  # Deliver one event of the non-blocking reader to the oldest pending
  # callback of connection $id, then reset the reader state.
  #
  # Arguments:
  #   id    - connection id
  #   type  - kind of event, as chosen by the reader ("reply", "err", "eof")
  #   reply - payload of the event
  #
  # The callback is removed from the queue before it runs and is evaluated
  # at global level with three words appended: the handle command name, the
  # type and the reply.
  #
  # When no callback is pending (for example an end-of-file seen on an idle
  # connection) nothing is evaluated. Without this guard the appended words
  # alone would be evaluated as a script, that is, the handle would be
  # invoked with the event type as if it were a command.
  proc ::redis::redis_call_callback {id type reply} {
      set pending {}
      if {[info exists ::redis::callback($id)]} {
          set pending $::redis::callback($id)
      }
      if {[llength $pending] > 0} {
          set cb [lindex $pending 0]
          set ::redis::callback($id) [lrange $pending 1 end]
          uplevel #0 $cb [list ::redis::redisHandle$id $type $reply]
      }
      ::redis::redis_reset_state $id
  }
  # Read a reply in non-blocking mode.
  #
  # This is the readable event handler installed on the channel. Each
  # invocation makes one step of progress, using the state dict kept in
  # ::redis::state($id):
  #   bulk  - total number of characters (payload plus CR LF) of the bulk
  #           being read, or -1 when the next thing to read is a line
  #   buf   - characters of the current bulk collected so far
  #   mbulk - number of elements still expected for the multi-bulk reply
  #           in progress, or -1 when none is in progress
  #   reply - elements of the multi-bulk reply collected so far
  #
  # A complete reply is handed to redis_call_callback as "reply", an error
  # line or an unknown type character as "err", and end of file as "eof",
  # after which the connection is closed.
  #
  # Limitation: inside a multi-bulk reply only bulk elements are collected;
  # nested multi-bulks and non-bulk elements are not supported here.
  proc ::redis::redis_readable {fd id} {
      if {[eof $fd]} {
          redis_call_callback $id eof {}
          ::redis::__method__close $id $fd
          return
      }
      if {[dict get $::redis::state($id) bulk] == -1} {
          set line [gets $fd]
          if {$line eq {}} return ;# No complete line available, return

          # First character is the reply type; the rest, minus the
          # trailing carriage return, is the payload of the line.
          set type [string index $line 0]
          set payload [string range $line 1 end-1]
          switch -exact -- $type {
              : -
              + {redis_call_callback $id reply $payload}
              - {redis_call_callback $id err $payload}
              ( {redis_call_callback $id reply $payload}
              $ {
                  # The expression is braced so that text received from
                  # the peer is treated as a value, never as expression
                  # syntax.
                  dict set ::redis::state($id) bulk [expr {$payload + 2}]
                  if {[dict get $::redis::state($id) bulk] == 1} {
                      # We got a $-1, hack the state to play well with this.
                      dict set ::redis::state($id) bulk 2
                      dict set ::redis::state($id) buf "\r\n"
                      ::redis::redis_readable $fd $id
                  }
              }
              * {
                  dict set ::redis::state($id) mbulk $payload
                  # Handle *-1 and *0: no element will follow, so the
                  # reply is complete right now. Waiting for elements
                  # would leave the callback queue out of step with the
                  # replies.
                  if {$payload == -1 || $payload == 0} {
                      redis_call_callback $id reply {}
                  }
              }
              default {
                  redis_call_callback $id err \
                      "Bad protocol, $type as reply type byte"
              }
          }
      } else {
          set totlen [dict get $::redis::state($id) bulk]
          set buflen [string length [dict get $::redis::state($id) buf]]
          set toread [expr {$totlen-$buflen}]
          set data [read $fd $toread]
          dict append ::redis::state($id) buf $data
          # Check if we read a complete bulk reply
          if {[string length [dict get $::redis::state($id) buf]] ==
              [dict get $::redis::state($id) bulk]} {
              if {[dict get $::redis::state($id) mbulk] == -1} {
                  redis_call_callback $id reply \
                      [string range [dict get $::redis::state($id) buf] 0 end-2]
              } else {
                  dict with ::redis::state($id) {
                      lappend reply [string range $buf 0 end-2]
                      incr mbulk -1
                      set bulk -1
                      # Start the next element with an empty buffer,
                      # otherwise its length accounting and its content
                      # would include the element just stored.
                      set buf {}
                  }
                  if {[dict get $::redis::state($id) mbulk] == 0} {
                      redis_call_callback $id reply \
                          [dict get $::redis::state($id) reply]
                  }
              }
          }
      }
  }