123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package sctp
- import (
- "math"
- "sync"
- "time"
- )
- const (
- rtoInitial float64 = 3.0 * 1000 // msec
- rtoMin float64 = 1.0 * 1000 // msec
- rtoMax float64 = 60.0 * 1000 // msec
- rtoAlpha float64 = 0.125
- rtoBeta float64 = 0.25
- maxInitRetrans uint = 8
- pathMaxRetrans uint = 5
- noMaxRetrans uint = 0
- )
- // rtoManager manages Rtx timeout values.
- // This is an implementation of RFC 4960 sec 6.3.1.
- type rtoManager struct {
- srtt float64
- rttvar float64
- rto float64
- noUpdate bool
- mutex sync.RWMutex
- }
- // newRTOManager creates a new rtoManager.
- func newRTOManager() *rtoManager {
- return &rtoManager{
- rto: rtoInitial,
- }
- }
- // setNewRTT takes a newly measured RTT then adjust the RTO in msec.
- func (m *rtoManager) setNewRTT(rtt float64) float64 {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- if m.noUpdate {
- return m.srtt
- }
- if m.srtt == 0 {
- // First measurement
- m.srtt = rtt
- m.rttvar = rtt / 2
- } else {
- // Subsequent rtt measurement
- m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
- m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
- }
- m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax)
- return m.srtt
- }
- // getRTO simply returns the current RTO in msec.
- func (m *rtoManager) getRTO() float64 {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- return m.rto
- }
- // reset resets the RTO variables to the initial values.
- func (m *rtoManager) reset() {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- if m.noUpdate {
- return
- }
- m.srtt = 0
- m.rttvar = 0
- m.rto = rtoInitial
- }
- // set RTO value for testing
- func (m *rtoManager) setRTO(rto float64, noUpdate bool) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.rto = rto
- m.noUpdate = noUpdate
- }
- // rtxTimerObserver is the inteface to a timer observer.
- // NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
- // from within these callbacks.
- type rtxTimerObserver interface {
- onRetransmissionTimeout(timerID int, n uint)
- onRetransmissionFailure(timerID int)
- }
- // rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
- type rtxTimer struct {
- id int
- observer rtxTimerObserver
- maxRetrans uint
- stopFunc stopTimerLoop
- closed bool
- mutex sync.RWMutex
- }
- type stopTimerLoop func()
- // newRTXTimer creates a new retransmission timer.
- // if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
- // (it will never make onRetransmissionFailure() callback.
- func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer {
- return &rtxTimer{
- id: id,
- observer: observer,
- maxRetrans: maxRetrans,
- }
- }
- // start starts the timer.
- func (t *rtxTimer) start(rto float64) bool {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- // this timer is already closed
- if t.closed {
- return false
- }
- // this is a noop if the timer is always running
- if t.stopFunc != nil {
- return false
- }
- // Note: rto value is intentionally not capped by RTO.Min to allow
- // fast timeout for the tests. Non-test code should pass in the
- // rto generated by rtoManager getRTO() method which caps the
- // value at RTO.Min or at RTO.Max.
- var nRtos uint
- cancelCh := make(chan struct{})
- go func() {
- canceling := false
- for !canceling {
- timeout := calculateNextTimeout(rto, nRtos)
- timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)
- select {
- case <-timer.C:
- nRtos++
- if t.maxRetrans == 0 || nRtos <= t.maxRetrans {
- t.observer.onRetransmissionTimeout(t.id, nRtos)
- } else {
- t.stop()
- t.observer.onRetransmissionFailure(t.id)
- }
- case <-cancelCh:
- canceling = true
- timer.Stop()
- }
- }
- }()
- t.stopFunc = func() {
- close(cancelCh)
- }
- return true
- }
- // stop stops the timer.
- func (t *rtxTimer) stop() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if t.stopFunc != nil {
- t.stopFunc()
- t.stopFunc = nil
- }
- }
- // closes the timer. this is similar to stop() but subsequent start() call
- // will fail (the timer is no longer usable)
- func (t *rtxTimer) close() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if t.stopFunc != nil {
- t.stopFunc()
- t.stopFunc = nil
- }
- t.closed = true
- }
- // isRunning tests if the timer is running.
- // Debug purpose only
- func (t *rtxTimer) isRunning() bool {
- t.mutex.RLock()
- defer t.mutex.RUnlock()
- return (t.stopFunc != nil)
- }
- func calculateNextTimeout(rto float64, nRtos uint) float64 {
- // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
- // E2) For the destination address for which the timer expires, set RTO
- // <- RTO * 2 ("back off the timer"). The maximum value discussed
- // in rule C7 above (RTO.max) may be used to provide an upper bound
- // to this doubling operation.
- if nRtos < 31 {
- m := 1 << nRtos
- return math.Min(rto*float64(m), rtoMax)
- }
- return rtoMax
- }
|