rtx_timer.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package sctp
  2. import (
  3. "math"
  4. "sync"
  5. "time"
  6. )
  7. const (
  8. rtoInitial float64 = 3.0 * 1000 // msec
  9. rtoMin float64 = 1.0 * 1000 // msec
  10. rtoMax float64 = 60.0 * 1000 // msec
  11. rtoAlpha float64 = 0.125
  12. rtoBeta float64 = 0.25
  13. maxInitRetrans uint = 8
  14. pathMaxRetrans uint = 5
  15. noMaxRetrans uint = 0
  16. )
  17. // rtoManager manages Rtx timeout values.
  18. // This is an implementation of RFC 4960 sec 6.3.1.
  19. type rtoManager struct {
  20. srtt float64
  21. rttvar float64
  22. rto float64
  23. noUpdate bool
  24. mutex sync.RWMutex
  25. }
  26. // newRTOManager creates a new rtoManager.
  27. func newRTOManager() *rtoManager {
  28. return &rtoManager{
  29. rto: rtoInitial,
  30. }
  31. }
  32. // setNewRTT takes a newly measured RTT then adjust the RTO in msec.
  33. func (m *rtoManager) setNewRTT(rtt float64) float64 {
  34. m.mutex.Lock()
  35. defer m.mutex.Unlock()
  36. if m.noUpdate {
  37. return m.srtt
  38. }
  39. if m.srtt == 0 {
  40. // First measurement
  41. m.srtt = rtt
  42. m.rttvar = rtt / 2
  43. } else {
  44. // Subsequent rtt measurement
  45. m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
  46. m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
  47. }
  48. m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax)
  49. return m.srtt
  50. }
  51. // getRTO simply returns the current RTO in msec.
  52. func (m *rtoManager) getRTO() float64 {
  53. m.mutex.RLock()
  54. defer m.mutex.RUnlock()
  55. return m.rto
  56. }
  57. // reset resets the RTO variables to the initial values.
  58. func (m *rtoManager) reset() {
  59. m.mutex.Lock()
  60. defer m.mutex.Unlock()
  61. if m.noUpdate {
  62. return
  63. }
  64. m.srtt = 0
  65. m.rttvar = 0
  66. m.rto = rtoInitial
  67. }
  68. // set RTO value for testing
  69. func (m *rtoManager) setRTO(rto float64, noUpdate bool) {
  70. m.mutex.Lock()
  71. defer m.mutex.Unlock()
  72. m.rto = rto
  73. m.noUpdate = noUpdate
  74. }
  75. // rtxTimerObserver is the inteface to a timer observer.
  76. // NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
  77. // from within these callbacks.
  78. type rtxTimerObserver interface {
  79. onRetransmissionTimeout(timerID int, n uint)
  80. onRetransmissionFailure(timerID int)
  81. }
  82. // rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
  83. type rtxTimer struct {
  84. id int
  85. observer rtxTimerObserver
  86. maxRetrans uint
  87. stopFunc stopTimerLoop
  88. closed bool
  89. mutex sync.RWMutex
  90. }
  91. type stopTimerLoop func()
  92. // newRTXTimer creates a new retransmission timer.
  93. // if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
  94. // (it will never make onRetransmissionFailure() callback.
  95. func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer {
  96. return &rtxTimer{
  97. id: id,
  98. observer: observer,
  99. maxRetrans: maxRetrans,
  100. }
  101. }
  102. // start starts the timer.
  103. func (t *rtxTimer) start(rto float64) bool {
  104. t.mutex.Lock()
  105. defer t.mutex.Unlock()
  106. // this timer is already closed
  107. if t.closed {
  108. return false
  109. }
  110. // this is a noop if the timer is always running
  111. if t.stopFunc != nil {
  112. return false
  113. }
  114. // Note: rto value is intentionally not capped by RTO.Min to allow
  115. // fast timeout for the tests. Non-test code should pass in the
  116. // rto generated by rtoManager getRTO() method which caps the
  117. // value at RTO.Min or at RTO.Max.
  118. var nRtos uint
  119. cancelCh := make(chan struct{})
  120. go func() {
  121. canceling := false
  122. for !canceling {
  123. timeout := calculateNextTimeout(rto, nRtos)
  124. timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)
  125. select {
  126. case <-timer.C:
  127. nRtos++
  128. if t.maxRetrans == 0 || nRtos <= t.maxRetrans {
  129. t.observer.onRetransmissionTimeout(t.id, nRtos)
  130. } else {
  131. t.stop()
  132. t.observer.onRetransmissionFailure(t.id)
  133. }
  134. case <-cancelCh:
  135. canceling = true
  136. timer.Stop()
  137. }
  138. }
  139. }()
  140. t.stopFunc = func() {
  141. close(cancelCh)
  142. }
  143. return true
  144. }
  145. // stop stops the timer.
  146. func (t *rtxTimer) stop() {
  147. t.mutex.Lock()
  148. defer t.mutex.Unlock()
  149. if t.stopFunc != nil {
  150. t.stopFunc()
  151. t.stopFunc = nil
  152. }
  153. }
  154. // closes the timer. this is similar to stop() but subsequent start() call
  155. // will fail (the timer is no longer usable)
  156. func (t *rtxTimer) close() {
  157. t.mutex.Lock()
  158. defer t.mutex.Unlock()
  159. if t.stopFunc != nil {
  160. t.stopFunc()
  161. t.stopFunc = nil
  162. }
  163. t.closed = true
  164. }
  165. // isRunning tests if the timer is running.
  166. // Debug purpose only
  167. func (t *rtxTimer) isRunning() bool {
  168. t.mutex.RLock()
  169. defer t.mutex.RUnlock()
  170. return (t.stopFunc != nil)
  171. }
  172. func calculateNextTimeout(rto float64, nRtos uint) float64 {
  173. // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
  174. // E2) For the destination address for which the timer expires, set RTO
  175. // <- RTO * 2 ("back off the timer"). The maximum value discussed
  176. // in rule C7 above (RTO.max) may be used to provide an upper bound
  177. // to this doubling operation.
  178. if nRtos < 31 {
  179. m := 1 << nRtos
  180. return math.Min(rto*float64(m), rtoMax)
  181. }
  182. return rtoMax
  183. }