// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
  3. package stun
  4. import (
  5. "errors"
  6. "sync"
  7. "time"
  8. )
  9. // NoopHandler just discards any event.
  10. func NoopHandler() Handler {
  11. return func(e Event) {}
  12. }
  13. // NewAgent initializes and returns new Agent with provided handler.
  14. // If h is nil, the NoopHandler will be used.
  15. func NewAgent(h Handler) *Agent {
  16. if h == nil {
  17. h = NoopHandler()
  18. }
  19. a := &Agent{
  20. transactions: make(map[transactionID]agentTransaction),
  21. handler: h,
  22. }
  23. return a
  24. }
// Agent is low-level abstraction over transaction list that
// handles concurrency (every method takes mux before touching state) and
// time outs (via Collect call).
type Agent struct {
	// transactions is map of transactions that are currently
	// in progress, keyed by transaction ID. StopWithError, Collect and
	// Process remove an entry while holding mux and only then invoke the
	// handler, after releasing mux. This keeps the critical section
	// short and means a removed agentTransaction is no longer reachable
	// through the map while its event is being handled.
	transactions map[transactionID]agentTransaction
	closed       bool       // set by Close; methods return ErrAgentClosed once true
	mux          sync.Mutex // guards transactions, closed and handler
	handler      Handler    // receives an Event for every transaction state change
}
// Handler handles state changes of transaction.
//
// The agent calls Handler synchronously whenever a transaction changes
// state: a message is processed, the transaction is stopped or timed out,
// or the agent is closed. The Event e is only valid for the duration of the
// call; a handler that needs any of its fields afterwards must copy them
// explicitly.
type Handler func(e Event)
// Event is passed to Handler describing the transaction event.
// Do not reuse outside Handler.
type Event struct {
	// TransactionID identifies the transaction the event belongs to.
	TransactionID [TransactionIDSize]byte
	// Message is the incoming message for events produced by Process.
	// StopWithError, Collect and Close leave it nil.
	Message *Message
	// Error is the reason the transaction ended: the error given to
	// StopWithError, ErrTransactionTimeOut from Collect, or ErrAgentClosed
	// from Close. Process leaves it nil.
	Error error
}
// agentTransaction represents transaction in progress.
// Concurrent access is invalid.
type agentTransaction struct {
	// id is the transaction ID; Start stores the entry under the same value
	// as its map key.
	id transactionID
	// deadline is the instant compared against the time passed to Collect;
	// a transaction whose deadline is before that time is timed out.
	deadline time.Time
}
var (
	// ErrTransactionStopped indicates that transaction was manually stopped.
	// Stop passes it to the handler as the event error.
	ErrTransactionStopped = errors.New("transaction is stopped")
	// ErrTransactionNotExists indicates that agent failed to find transaction.
	// StopWithError returns it when the given id is not registered.
	ErrTransactionNotExists = errors.New("transaction not exists")
	// ErrTransactionExists indicates that transaction with same id is already
	// registered. Start returns it for a duplicate id.
	ErrTransactionExists = errors.New("transaction exists with same id")
)
  67. // StopWithError removes transaction from list and calls handler with
  68. // provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
  69. func (a *Agent) StopWithError(id [TransactionIDSize]byte, err error) error {
  70. a.mux.Lock()
  71. if a.closed {
  72. a.mux.Unlock()
  73. return ErrAgentClosed
  74. }
  75. t, exists := a.transactions[id]
  76. delete(a.transactions, id)
  77. h := a.handler
  78. a.mux.Unlock()
  79. if !exists {
  80. return ErrTransactionNotExists
  81. }
  82. h(Event{
  83. TransactionID: t.id,
  84. Error: err,
  85. })
  86. return nil
  87. }
// Stop stops transaction by id with ErrTransactionStopped, blocking
// until handler returns. It delegates to StopWithError and returns
// whatever that call returns.
func (a *Agent) Stop(id [TransactionIDSize]byte) error {
	return a.StopWithError(id, ErrTransactionStopped)
}
// ErrAgentClosed indicates that agent is in closed state and is unable
// to handle transactions. Agent methods return it once Close has run, and
// Close passes it to the handler for every transaction it terminates.
var ErrAgentClosed = errors.New("agent is closed")
  96. // Start registers transaction with provided id and deadline.
  97. // Could return ErrAgentClosed, ErrTransactionExists.
  98. //
  99. // Agent handler is guaranteed to be eventually called.
  100. func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time) error {
  101. a.mux.Lock()
  102. defer a.mux.Unlock()
  103. if a.closed {
  104. return ErrAgentClosed
  105. }
  106. _, exists := a.transactions[id]
  107. if exists {
  108. return ErrTransactionExists
  109. }
  110. a.transactions[id] = agentTransaction{
  111. id: id,
  112. deadline: deadline,
  113. }
  114. return nil
  115. }
// agentCollectCap is the initial capacity of the slice Agent.Collect uses
// to gather timed-out transaction IDs. Collect only has to grow that slice
// when more than agentCollectCap transactions time out in a single call.
const agentCollectCap = 100
// ErrTransactionTimeOut indicates that transaction has reached deadline.
// Collect passes it to the handler for every transaction it times out.
var ErrTransactionTimeOut = errors.New("transaction is timed out")
  121. // Collect terminates all transactions that have deadline before provided
  122. // time, blocking until all handlers will process ErrTransactionTimeOut.
  123. // Will return ErrAgentClosed if agent is already closed.
  124. //
  125. // It is safe to call Collect concurrently but makes no sense.
  126. func (a *Agent) Collect(gcTime time.Time) error {
  127. toRemove := make([]transactionID, 0, agentCollectCap)
  128. a.mux.Lock()
  129. if a.closed {
  130. // Doing nothing if agent is closed.
  131. // All transactions should be already closed
  132. // during Close() call.
  133. a.mux.Unlock()
  134. return ErrAgentClosed
  135. }
  136. // Adding all transactions with deadline before gcTime
  137. // to toCall and toRemove slices.
  138. // No allocs if there are less than agentCollectCap
  139. // timed out transactions.
  140. for id, t := range a.transactions {
  141. if t.deadline.Before(gcTime) {
  142. toRemove = append(toRemove, id)
  143. }
  144. }
  145. // Un-registering timed out transactions.
  146. for _, id := range toRemove {
  147. delete(a.transactions, id)
  148. }
  149. // Calling handler does not require locked mutex,
  150. // reducing lock time.
  151. h := a.handler
  152. a.mux.Unlock()
  153. // Sending ErrTransactionTimeOut to handler for all transactions,
  154. // blocking until last one.
  155. event := Event{
  156. Error: ErrTransactionTimeOut,
  157. }
  158. for _, id := range toRemove {
  159. event.TransactionID = id
  160. h(event)
  161. }
  162. return nil
  163. }
  164. // Process incoming message, synchronously passing it to handler.
  165. func (a *Agent) Process(m *Message) error {
  166. e := Event{
  167. TransactionID: m.TransactionID,
  168. Message: m,
  169. }
  170. a.mux.Lock()
  171. if a.closed {
  172. a.mux.Unlock()
  173. return ErrAgentClosed
  174. }
  175. h := a.handler
  176. delete(a.transactions, m.TransactionID)
  177. a.mux.Unlock()
  178. h(e)
  179. return nil
  180. }
  181. // SetHandler sets agent handler to h.
  182. func (a *Agent) SetHandler(h Handler) error {
  183. a.mux.Lock()
  184. if a.closed {
  185. a.mux.Unlock()
  186. return ErrAgentClosed
  187. }
  188. a.handler = h
  189. a.mux.Unlock()
  190. return nil
  191. }
  192. // Close terminates all transactions with ErrAgentClosed and renders Agent to
  193. // closed state.
  194. func (a *Agent) Close() error {
  195. e := Event{
  196. Error: ErrAgentClosed,
  197. }
  198. a.mux.Lock()
  199. if a.closed {
  200. a.mux.Unlock()
  201. return ErrAgentClosed
  202. }
  203. for _, t := range a.transactions {
  204. e.TransactionID = t.id
  205. a.handler(e)
  206. }
  207. a.transactions = nil
  208. a.closed = true
  209. a.handler = nil
  210. a.mux.Unlock()
  211. return nil
  212. }
// transactionID is the fixed-size byte array that identifies a transaction;
// it is the key type of Agent.transactions.
type transactionID [TransactionIDSize]byte