agent.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. // Package ice implements the Interactive Connectivity Establishment (ICE)
  4. // protocol defined in rfc5245.
  5. package ice
  6. import (
  7. "context"
  8. "fmt"
  9. "net"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. atomicx "github.com/pion/ice/v2/internal/atomic"
  15. stunx "github.com/pion/ice/v2/internal/stun"
  16. "github.com/pion/logging"
  17. "github.com/pion/mdns"
  18. "github.com/pion/stun"
  19. "github.com/pion/transport/v2"
  20. "github.com/pion/transport/v2/packetio"
  21. "github.com/pion/transport/v2/stdnet"
  22. "github.com/pion/transport/v2/vnet"
  23. "golang.org/x/net/proxy"
  24. )
// bindingRequest records an outbound STUN Binding request so that an inbound
// success response can later be matched back to it by transaction ID
// (see handleInboundBindingSuccess).
type bindingRequest struct {
	// Time the request was recorded; used to expire stale entries.
	timestamp time.Time
	// STUN transaction ID copied from the outgoing message.
	transactionID [stun.TransactionIDSize]byte
	// Address of the remote candidate the request was sent to.
	destination net.Addr
	// Whether the outgoing request carried the USE-CANDIDATE attribute.
	isUseCandidate bool
}
// Agent represents the ICE agent.
//
// Work that touches the agent's internal state is submitted with run and
// executed serially by taskLoop.
// NOTE(review): only a "State owned by the taskLoop" marker exists below;
// confirm exactly which fields are loop-owned before accessing them elsewhere.
type Agent struct {
	// Tasks submitted through run, executed one at a time by taskLoop.
	chanTask chan task
	// Callbacks queued by afterRun; drained by taskLoop after each task and on shutdown.
	afterRunFn []func(ctx context.Context)
	muAfterRun sync.Mutex

	onConnectionStateChangeHdlr atomic.Value // func(ConnectionState)
	onSelectedCandidatePairChangeHdlr atomic.Value // func(Candidate, Candidate)
	onCandidateHdlr atomic.Value // func(Candidate)

	// State owned by the taskLoop

	// Closed exactly once (guarded by onConnectedOnce) when a pair is first selected.
	onConnected chan struct{}
	onConnectedOnce sync.Once

	// Force candidate to be contacted immediately (instead of waiting for task ticker).
	// Created with capacity 1 in NewAgent, so concurrent requests coalesce.
	forceCandidateContact chan bool

	tieBreaker uint64
	lite bool

	connectionState ConnectionState
	gatheringState GatheringState

	mDNSMode MulticastDNSMode
	mDNSName string
	mDNSConn *mdns.Conn

	// Serializes startConnectivityChecks.
	muHaveStarted sync.Mutex
	// startedCh is closed by startedFn, which is called when connectivity
	// checks start and again during taskLoop shutdown.
	startedCh <-chan struct{}
	startedFn func()
	isControlling bool

	// Pairs that exceed this many binding requests are marked failed.
	maxBindingRequests uint16

	hostAcceptanceMinWait time.Duration
	srflxAcceptanceMinWait time.Duration
	prflxAcceptanceMinWait time.Duration
	relayAcceptanceMinWait time.Duration

	portMin uint16
	portMax uint16

	candidateTypes []CandidateType

	// How long connectivity checks can fail before the ICE Agent
	// goes to disconnected
	disconnectedTimeout time.Duration

	// How long connectivity checks can fail before the ICE Agent
	// goes to failed
	failedTimeout time.Duration

	// How often should we send keepalive packets?
	// 0 means never
	keepaliveInterval time.Duration

	// How often should we run our internal taskLoop to check for state changes when connecting
	checkInterval time.Duration

	localUfrag string
	localPwd string
	localCandidates map[NetworkType][]Candidate

	remoteUfrag string
	remotePwd string
	remoteCandidates map[NetworkType][]Candidate

	checklist []*CandidatePair
	selector pairCandidateSelector

	selectedPair atomic.Value // *CandidatePair

	urls []*stun.URI
	networkTypes []NetworkType

	buf *packetio.Buffer

	// Outbound Binding requests still awaiting a response. Entries that are
	// maxBindingRequestTimeout old are pruned by invalidatePendingBindingRequests.
	pendingBindingRequests []bindingRequest

	// 1:1 D-NAT IP address mapping
	extIPMapper *externalIPMapper

	// State for closing

	// Closed by Close to stop taskLoop and the connectivity-check loop.
	done chan struct{}
	// Closed by taskLoop once its shutdown sequence has finished.
	taskLoopDone chan struct{}
	// Error reported by ok/getErr once the agent is closed.
	err atomicx.Error

	// Used by Close to cancel, and wait for, in-flight candidate gathering.
	gatherCandidateCancel func()
	gatherCandidateDone chan struct{}

	// Notification channels read by the candidate, pair and state routines
	// started in NewAgent; closed by taskLoop on exit.
	chanCandidate chan Candidate
	chanCandidatePair chan *CandidatePair
	chanState chan ConnectionState

	loggerFactory logging.LoggerFactory
	log logging.LeveledLogger

	net transport.Net
	tcpMux TCPMux
	udpMux UDPMux
	udpMuxSrflx UniversalUDPMux

	interfaceFilter func(string) bool
	ipFilter func(net.IP) bool
	includeLoopback bool

	insecureSkipVerify bool

	proxyDialer proxy.Dialer
}
// task is one unit of work for taskLoop. The loop calls fn and then closes
// done, which is what run blocks on. Constructed positionally as
// task{fn, done}, so field order matters.
type task struct {
	fn func(context.Context, *Agent)
	done chan struct{}
}
  115. // afterRun registers function to be run after the task.
  116. func (a *Agent) afterRun(f func(context.Context)) {
  117. a.muAfterRun.Lock()
  118. a.afterRunFn = append(a.afterRunFn, f)
  119. a.muAfterRun.Unlock()
  120. }
  121. func (a *Agent) getAfterRunFn() []func(context.Context) {
  122. a.muAfterRun.Lock()
  123. defer a.muAfterRun.Unlock()
  124. fns := a.afterRunFn
  125. a.afterRunFn = nil
  126. return fns
  127. }
  128. func (a *Agent) ok() error {
  129. select {
  130. case <-a.done:
  131. return a.getErr()
  132. default:
  133. }
  134. return nil
  135. }
  136. func (a *Agent) getErr() error {
  137. if err := a.err.Load(); err != nil {
  138. return err
  139. }
  140. return ErrClosed
  141. }
  142. // Run task in serial. Blocking tasks must be cancelable by context.
  143. func (a *Agent) run(ctx context.Context, t func(context.Context, *Agent)) error {
  144. if err := a.ok(); err != nil {
  145. return err
  146. }
  147. done := make(chan struct{})
  148. select {
  149. case <-ctx.Done():
  150. return ctx.Err()
  151. case a.chanTask <- task{t, done}:
  152. <-done
  153. return nil
  154. }
  155. }
// taskLoop handles registered tasks and agent close.
//
// It is the agent's single worker goroutine. It executes tasks submitted
// through run one at a time and, after each task, runs the callbacks queued
// with afterRun. When a.done is closed it returns, and the deferred block
// tears the agent down.
func (a *Agent) taskLoop() {
	// after runs every queued afterRun callback. It keeps draining until the
	// queue is observed empty, so callbacks queued while draining also run.
	after := func() {
		for {
			// Get and run func registered by afterRun().
			fns := a.getAfterRunFn()
			if len(fns) == 0 {
				break
			}
			for _, fn := range fns {
				fn(a.context())
			}
		}
	}

	defer func() {
		a.deleteAllCandidates()
		a.startedFn()

		if err := a.buf.Close(); err != nil {
			a.log.Warnf("Failed to close buffer: %v", err)
		}

		a.closeMulticastConn()

		// updateConnectionState queues a send on chanState via afterRun, so
		// after() must run before the notification channels are closed below.
		a.updateConnectionState(ConnectionStateClosed)

		after()

		close(a.chanState)
		close(a.chanCandidate)
		close(a.chanCandidatePair)
		close(a.taskLoopDone)
	}()

	for {
		select {
		case <-a.done:
			return
		case t := <-a.chanTask:
			t.fn(a.context(), a)
			// Unblock the submitter in run before running follow-up callbacks.
			close(t.done)
			after()
		}
	}
}
  195. // NewAgent creates a new Agent
  196. func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
  197. var err error
  198. if config.PortMax < config.PortMin {
  199. return nil, ErrPort
  200. }
  201. mDNSName := config.MulticastDNSHostName
  202. if mDNSName == "" {
  203. if mDNSName, err = generateMulticastDNSName(); err != nil {
  204. return nil, err
  205. }
  206. }
  207. if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
  208. return nil, ErrInvalidMulticastDNSHostName
  209. }
  210. mDNSMode := config.MulticastDNSMode
  211. if mDNSMode == 0 {
  212. mDNSMode = MulticastDNSModeQueryOnly
  213. }
  214. loggerFactory := config.LoggerFactory
  215. if loggerFactory == nil {
  216. loggerFactory = logging.NewDefaultLoggerFactory()
  217. }
  218. log := loggerFactory.NewLogger("ice")
  219. startedCtx, startedFn := context.WithCancel(context.Background())
  220. a := &Agent{
  221. chanTask: make(chan task),
  222. chanState: make(chan ConnectionState),
  223. chanCandidate: make(chan Candidate),
  224. chanCandidatePair: make(chan *CandidatePair),
  225. tieBreaker: globalMathRandomGenerator.Uint64(),
  226. lite: config.Lite,
  227. gatheringState: GatheringStateNew,
  228. connectionState: ConnectionStateNew,
  229. localCandidates: make(map[NetworkType][]Candidate),
  230. remoteCandidates: make(map[NetworkType][]Candidate),
  231. urls: config.Urls,
  232. networkTypes: config.NetworkTypes,
  233. onConnected: make(chan struct{}),
  234. buf: packetio.NewBuffer(),
  235. done: make(chan struct{}),
  236. taskLoopDone: make(chan struct{}),
  237. startedCh: startedCtx.Done(),
  238. startedFn: startedFn,
  239. portMin: config.PortMin,
  240. portMax: config.PortMax,
  241. loggerFactory: loggerFactory,
  242. log: log,
  243. net: config.Net,
  244. proxyDialer: config.ProxyDialer,
  245. tcpMux: config.TCPMux,
  246. udpMux: config.UDPMux,
  247. udpMuxSrflx: config.UDPMuxSrflx,
  248. mDNSMode: mDNSMode,
  249. mDNSName: mDNSName,
  250. gatherCandidateCancel: func() {},
  251. forceCandidateContact: make(chan bool, 1),
  252. interfaceFilter: config.InterfaceFilter,
  253. ipFilter: config.IPFilter,
  254. insecureSkipVerify: config.InsecureSkipVerify,
  255. includeLoopback: config.IncludeLoopback,
  256. }
  257. if a.net == nil {
  258. a.net, err = stdnet.NewNet()
  259. if err != nil {
  260. return nil, fmt.Errorf("failed to create network: %w", err)
  261. }
  262. } else if _, isVirtual := a.net.(*vnet.Net); isVirtual {
  263. a.log.Warn("Virtual network is enabled")
  264. if a.mDNSMode != MulticastDNSModeDisabled {
  265. a.log.Warn("Virtual network does not support mDNS yet")
  266. }
  267. }
  268. // Opportunistic mDNS: If we can't open the connection, that's ok: we
  269. // can continue without it.
  270. if a.mDNSConn, a.mDNSMode, err = createMulticastDNS(a.net, mDNSMode, mDNSName, log); err != nil {
  271. log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
  272. }
  273. config.initWithDefaults(a)
  274. // Make sure the buffer doesn't grow indefinitely.
  275. // NOTE: We actually won't get anywhere close to this limit.
  276. // SRTP will constantly read from the endpoint and drop packets if it's full.
  277. a.buf.SetLimitSize(maxBufferSize)
  278. if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
  279. a.closeMulticastConn()
  280. return nil, ErrLiteUsingNonHostCandidates
  281. }
  282. if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
  283. a.closeMulticastConn()
  284. return nil, ErrUselessUrlsProvided
  285. }
  286. if err = config.initExtIPMapping(a); err != nil {
  287. a.closeMulticastConn()
  288. return nil, err
  289. }
  290. go a.taskLoop()
  291. // CandidatePair and ConnectionState are usually changed at once.
  292. // Blocking one by the other one causes deadlock.
  293. // Hence, we call handlers from independent Goroutines.
  294. go a.candidatePairRoutine()
  295. go a.connectionStateRoutine()
  296. go a.candidateRoutine()
  297. // Restart is also used to initialize the agent for the first time
  298. if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
  299. a.closeMulticastConn()
  300. _ = a.Close()
  301. return nil, err
  302. }
  303. return a, nil
  304. }
  305. func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
  306. a.muHaveStarted.Lock()
  307. defer a.muHaveStarted.Unlock()
  308. select {
  309. case <-a.startedCh:
  310. return ErrMultipleStart
  311. default:
  312. }
  313. if err := a.SetRemoteCredentials(remoteUfrag, remotePwd); err != nil { //nolint:contextcheck
  314. return err
  315. }
  316. a.log.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
  317. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  318. agent.isControlling = isControlling
  319. agent.remoteUfrag = remoteUfrag
  320. agent.remotePwd = remotePwd
  321. if isControlling {
  322. a.selector = &controllingSelector{agent: a, log: a.log}
  323. } else {
  324. a.selector = &controlledSelector{agent: a, log: a.log}
  325. }
  326. if a.lite {
  327. a.selector = &liteSelector{pairCandidateSelector: a.selector}
  328. }
  329. a.selector.Start()
  330. a.startedFn()
  331. agent.updateConnectionState(ConnectionStateChecking)
  332. a.requestConnectivityCheck()
  333. go a.connectivityChecks() //nolint:contextcheck
  334. })
  335. }
// connectivityChecks is the background loop launched by
// startConnectivityChecks.
//
// Each iteration waits for one of three events:
//   - a forced contact request (requestConnectivityCheck), or
//   - the interval timer firing — in both of these cases it runs one round of
//     checks on the task loop;
//   - a.done being closed, in which case it returns.
func (a *Agent) connectivityChecks() {
	// Connection state as observed at the end of the previous contact task.
	// It is written inside the task and read here after run has returned.
	lastConnectionState := ConnectionState(0)
	// Despite its name, this holds the instant the agent was first seen in the
	// checking state.
	checkingDuration := time.Time{}

	// contact performs one round of connectivity checks on the task loop.
	contact := func() {
		// NOTE: the task's parameter intentionally shadows the outer a; it is
		// the same *Agent.
		if err := a.run(a.context(), func(ctx context.Context, a *Agent) {
			defer func() {
				lastConnectionState = a.connectionState
			}()

			switch a.connectionState {
			case ConnectionStateFailed:
				// The connection is currently failed so don't send any checks
				// In the future it may be restarted though
				return
			case ConnectionStateChecking:
				// We have just entered checking for the first time so update our checking timer
				if lastConnectionState != a.connectionState {
					checkingDuration = time.Now()
				}

				// We have been in checking longer than Disconnect+Failed timeout, set the connection to Failed
				if time.Since(checkingDuration) > a.disconnectedTimeout+a.failedTimeout {
					a.updateConnectionState(ConnectionStateFailed)
					return
				}
			default:
			}

			a.selector.ContactCandidates()
		}); err != nil {
			a.log.Warnf("Failed to start connectivity checks: %v", err)
		}
	}

	for {
		interval := defaultKeepaliveInterval

		// updateInterval lowers interval to x. A zero x is treated as "unset"
		// and never changes interval.
		updateInterval := func(x time.Duration) {
			if x != 0 && (interval == 0 || interval > x) {
				interval = x
			}
		}

		switch lastConnectionState {
		case ConnectionStateNew, ConnectionStateChecking: // While connecting, check candidates more frequently
			updateInterval(a.checkInterval)
		case ConnectionStateConnected, ConnectionStateDisconnected:
			updateInterval(a.keepaliveInterval)
		default:
		}
		// Ensure we run our task loop as quickly as the minimum of our various configured timeouts
		updateInterval(a.disconnectedTimeout)
		updateInterval(a.failedTimeout)

		t := time.NewTimer(interval)
		select {
		case <-a.forceCandidateContact:
			t.Stop()
			contact()
		case <-t.C:
			contact()
		case <-a.done:
			t.Stop()
			return
		}
	}
}
  396. func (a *Agent) updateConnectionState(newState ConnectionState) {
  397. if a.connectionState != newState {
  398. // Connection has gone to failed, release all gathered candidates
  399. if newState == ConnectionStateFailed {
  400. a.deleteAllCandidates()
  401. }
  402. a.log.Infof("Setting new connection state: %s", newState)
  403. a.connectionState = newState
  404. // Call handler after finishing current task since we may be holding the agent lock
  405. // and the handler may also require it
  406. a.afterRun(func(ctx context.Context) {
  407. a.chanState <- newState
  408. })
  409. }
  410. }
  411. func (a *Agent) setSelectedPair(p *CandidatePair) {
  412. if p == nil {
  413. var nilPair *CandidatePair
  414. a.selectedPair.Store(nilPair)
  415. a.log.Tracef("Unset selected candidate pair")
  416. return
  417. }
  418. p.nominated = true
  419. a.selectedPair.Store(p)
  420. a.log.Tracef("Set selected candidate pair: %s", p)
  421. a.updateConnectionState(ConnectionStateConnected)
  422. // Notify when the selected pair changes
  423. a.afterRun(func(ctx context.Context) {
  424. select {
  425. case a.chanCandidatePair <- p:
  426. case <-ctx.Done():
  427. }
  428. })
  429. // Signal connected
  430. a.onConnectedOnce.Do(func() { close(a.onConnected) })
  431. }
  432. func (a *Agent) pingAllCandidates() {
  433. a.log.Trace("pinging all candidates")
  434. if len(a.checklist) == 0 {
  435. a.log.Warn("Failed to ping without candidate pairs. Connection is not possible yet.")
  436. }
  437. for _, p := range a.checklist {
  438. if p.state == CandidatePairStateWaiting {
  439. p.state = CandidatePairStateInProgress
  440. } else if p.state != CandidatePairStateInProgress {
  441. continue
  442. }
  443. if p.bindingRequestCount > a.maxBindingRequests {
  444. a.log.Tracef("max requests reached for pair %s, marking it as failed", p)
  445. p.state = CandidatePairStateFailed
  446. } else {
  447. a.selector.PingCandidate(p.Local, p.Remote)
  448. p.bindingRequestCount++
  449. }
  450. }
  451. }
  452. func (a *Agent) getBestAvailableCandidatePair() *CandidatePair {
  453. var best *CandidatePair
  454. for _, p := range a.checklist {
  455. if p.state == CandidatePairStateFailed {
  456. continue
  457. }
  458. if best == nil {
  459. best = p
  460. } else if best.priority() < p.priority() {
  461. best = p
  462. }
  463. }
  464. return best
  465. }
  466. func (a *Agent) getBestValidCandidatePair() *CandidatePair {
  467. var best *CandidatePair
  468. for _, p := range a.checklist {
  469. if p.state != CandidatePairStateSucceeded {
  470. continue
  471. }
  472. if best == nil {
  473. best = p
  474. } else if best.priority() < p.priority() {
  475. best = p
  476. }
  477. }
  478. return best
  479. }
  480. func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
  481. p := newCandidatePair(local, remote, a.isControlling)
  482. a.checklist = append(a.checklist, p)
  483. return p
  484. }
  485. func (a *Agent) findPair(local, remote Candidate) *CandidatePair {
  486. for _, p := range a.checklist {
  487. if p.Local.Equal(local) && p.Remote.Equal(remote) {
  488. return p
  489. }
  490. }
  491. return nil
  492. }
  493. // validateSelectedPair checks if the selected pair is (still) valid
  494. // Note: the caller should hold the agent lock.
  495. func (a *Agent) validateSelectedPair() bool {
  496. selectedPair := a.getSelectedPair()
  497. if selectedPair == nil {
  498. return false
  499. }
  500. disconnectedTime := time.Since(selectedPair.Remote.LastReceived())
  501. // Only allow transitions to failed if a.failedTimeout is non-zero
  502. totalTimeToFailure := a.failedTimeout
  503. if totalTimeToFailure != 0 {
  504. totalTimeToFailure += a.disconnectedTimeout
  505. }
  506. switch {
  507. case totalTimeToFailure != 0 && disconnectedTime > totalTimeToFailure:
  508. a.updateConnectionState(ConnectionStateFailed)
  509. case a.disconnectedTimeout != 0 && disconnectedTime > a.disconnectedTimeout:
  510. a.updateConnectionState(ConnectionStateDisconnected)
  511. default:
  512. a.updateConnectionState(ConnectionStateConnected)
  513. }
  514. return true
  515. }
  516. // checkKeepalive sends STUN Binding Indications to the selected pair
  517. // if no packet has been sent on that pair in the last keepaliveInterval
  518. // Note: the caller should hold the agent lock.
  519. func (a *Agent) checkKeepalive() {
  520. selectedPair := a.getSelectedPair()
  521. if selectedPair == nil {
  522. return
  523. }
  524. if (a.keepaliveInterval != 0) &&
  525. ((time.Since(selectedPair.Local.LastSent()) > a.keepaliveInterval) ||
  526. (time.Since(selectedPair.Remote.LastReceived()) > a.keepaliveInterval)) {
  527. // We use binding request instead of indication to support refresh consent schemas
  528. // see https://tools.ietf.org/html/rfc7675
  529. a.selector.PingCandidate(selectedPair.Local, selectedPair.Remote)
  530. }
  531. }
  532. // AddRemoteCandidate adds a new remote candidate
  533. func (a *Agent) AddRemoteCandidate(c Candidate) error {
  534. if c == nil {
  535. return nil
  536. }
  537. // Cannot check for network yet because it might not be applied
  538. // when mDNS hostname is used.
  539. if c.TCPType() == TCPTypeActive {
  540. // TCP Candidates with TCP type active will probe server passive ones, so
  541. // no need to do anything with them.
  542. a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
  543. return nil
  544. }
  545. // If we have a mDNS Candidate lets fully resolve it before adding it locally
  546. if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
  547. if a.mDNSMode == MulticastDNSModeDisabled {
  548. a.log.Warnf("Remote mDNS candidate added, but mDNS is disabled: (%s)", c.Address())
  549. return nil
  550. }
  551. hostCandidate, ok := c.(*CandidateHost)
  552. if !ok {
  553. return ErrAddressParseFailed
  554. }
  555. go a.resolveAndAddMulticastCandidate(hostCandidate)
  556. return nil
  557. }
  558. go func() {
  559. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  560. agent.addRemoteCandidate(c)
  561. }); err != nil {
  562. a.log.Warnf("Failed to add remote candidate %s: %v", c.Address(), err)
  563. return
  564. }
  565. }()
  566. return nil
  567. }
  568. func (a *Agent) resolveAndAddMulticastCandidate(c *CandidateHost) {
  569. if a.mDNSConn == nil {
  570. return
  571. }
  572. _, src, err := a.mDNSConn.Query(c.context(), c.Address())
  573. if err != nil {
  574. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  575. return
  576. }
  577. ip, ipOk := parseMulticastAnswerAddr(src)
  578. if !ipOk {
  579. a.log.Warnf("Failed to discover mDNS candidate %s: failed to parse IP", c.Address())
  580. return
  581. }
  582. if err = c.setIP(ip); err != nil {
  583. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  584. return
  585. }
  586. if err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  587. agent.addRemoteCandidate(c)
  588. }); err != nil {
  589. a.log.Warnf("Failed to add mDNS candidate %s: %v", c.Address(), err)
  590. return
  591. }
  592. }
  593. func (a *Agent) requestConnectivityCheck() {
  594. select {
  595. case a.forceCandidateContact <- true:
  596. default:
  597. }
  598. }
  599. // addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
  600. func (a *Agent) addRemoteCandidate(c Candidate) {
  601. set := a.remoteCandidates[c.NetworkType()]
  602. for _, candidate := range set {
  603. if candidate.Equal(c) {
  604. return
  605. }
  606. }
  607. set = append(set, c)
  608. a.remoteCandidates[c.NetworkType()] = set
  609. if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok {
  610. for _, localCandidate := range localCandidates {
  611. a.addPair(localCandidate, c)
  612. }
  613. }
  614. a.requestConnectivityCheck()
  615. }
  616. func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net.PacketConn) error {
  617. return a.run(ctx, func(ctx context.Context, agent *Agent) {
  618. set := a.localCandidates[c.NetworkType()]
  619. for _, candidate := range set {
  620. if candidate.Equal(c) {
  621. a.log.Debugf("Ignore duplicate candidate: %s", c.String())
  622. if err := c.close(); err != nil {
  623. a.log.Warnf("Failed to close duplicate candidate: %v", err)
  624. }
  625. if err := candidateConn.Close(); err != nil {
  626. a.log.Warnf("Failed to close duplicate candidate connection: %v", err)
  627. }
  628. return
  629. }
  630. }
  631. c.start(a, candidateConn, a.startedCh)
  632. set = append(set, c)
  633. a.localCandidates[c.NetworkType()] = set
  634. if remoteCandidates, ok := a.remoteCandidates[c.NetworkType()]; ok {
  635. for _, remoteCandidate := range remoteCandidates {
  636. a.addPair(c, remoteCandidate)
  637. }
  638. }
  639. a.requestConnectivityCheck()
  640. a.chanCandidate <- c
  641. })
  642. }
  643. // GetLocalCandidates returns the local candidates
  644. func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
  645. var res []Candidate
  646. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  647. var candidates []Candidate
  648. for _, set := range agent.localCandidates {
  649. candidates = append(candidates, set...)
  650. }
  651. res = candidates
  652. })
  653. if err != nil {
  654. return nil, err
  655. }
  656. return res, nil
  657. }
  658. // GetLocalUserCredentials returns the local user credentials
  659. func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
  660. valSet := make(chan struct{})
  661. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  662. frag = agent.localUfrag
  663. pwd = agent.localPwd
  664. close(valSet)
  665. })
  666. if err == nil {
  667. <-valSet
  668. }
  669. return
  670. }
  671. // GetRemoteUserCredentials returns the remote user credentials
  672. func (a *Agent) GetRemoteUserCredentials() (frag string, pwd string, err error) {
  673. valSet := make(chan struct{})
  674. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  675. frag = agent.remoteUfrag
  676. pwd = agent.remotePwd
  677. close(valSet)
  678. })
  679. if err == nil {
  680. <-valSet
  681. }
  682. return
  683. }
  684. func (a *Agent) removeUfragFromMux() {
  685. if a.tcpMux != nil {
  686. a.tcpMux.RemoveConnByUfrag(a.localUfrag)
  687. }
  688. if a.udpMux != nil {
  689. a.udpMux.RemoveConnByUfrag(a.localUfrag)
  690. }
  691. if a.udpMuxSrflx != nil {
  692. a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
  693. }
  694. }
// Close cleans up the Agent.
//
// Shutdown sequence:
//  1. Queue cancellation of in-flight candidate gathering, including a wait
//     for it to finish.
//  2. Record ErrClosed as the agent's error.
//  3. Remove the local ufrag from the muxes.
//  4. Signal the task loop to stop and wait until its teardown completes.
//
// Calling Close on an already-closed agent returns that agent's close error.
//
// NOTE(review): two concurrent Close calls can both pass a.ok() and both
// reach close(a.done), which would panic. removeUfragFromMux also reads
// a.localUfrag outside the task loop. Presumably callers serialize Close —
// confirm.
func (a *Agent) Close() error {
	if err := a.ok(); err != nil {
		return err
	}

	// Runs on the task loop, either after the task currently executing or in
	// taskLoop's deferred teardown (which drains afterRun callbacks).
	a.afterRun(func(context.Context) {
		a.gatherCandidateCancel()
		if a.gatherCandidateDone != nil {
			<-a.gatherCandidateDone
		}
	})

	a.err.Store(ErrClosed)

	a.removeUfragFromMux()

	close(a.done)
	<-a.taskLoopDone
	return nil
}
  712. // Remove all candidates. This closes any listening sockets
  713. // and removes both the local and remote candidate lists.
  714. //
  715. // This is used for restarts, failures and on close
  716. func (a *Agent) deleteAllCandidates() {
  717. for net, cs := range a.localCandidates {
  718. for _, c := range cs {
  719. if err := c.close(); err != nil {
  720. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  721. }
  722. }
  723. delete(a.localCandidates, net)
  724. }
  725. for net, cs := range a.remoteCandidates {
  726. for _, c := range cs {
  727. if err := c.close(); err != nil {
  728. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  729. }
  730. }
  731. delete(a.remoteCandidates, net)
  732. }
  733. }
  734. func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Candidate {
  735. ip, port, _, ok := parseAddr(addr)
  736. if !ok {
  737. a.log.Warnf("Failed to parse address: %s", addr)
  738. return nil
  739. }
  740. set := a.remoteCandidates[networkType]
  741. for _, c := range set {
  742. if c.Address() == ip.String() && c.Port() == port {
  743. return c
  744. }
  745. }
  746. return nil
  747. }
  748. func (a *Agent) sendBindingRequest(m *stun.Message, local, remote Candidate) {
  749. a.log.Tracef("ping STUN from %s to %s", local.String(), remote.String())
  750. a.invalidatePendingBindingRequests(time.Now())
  751. a.pendingBindingRequests = append(a.pendingBindingRequests, bindingRequest{
  752. timestamp: time.Now(),
  753. transactionID: m.TransactionID,
  754. destination: remote.addr(),
  755. isUseCandidate: m.Contains(stun.AttrUseCandidate),
  756. })
  757. a.sendSTUN(m, local, remote)
  758. }
  759. func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) {
  760. base := remote
  761. ip, port, _, ok := parseAddr(base.addr())
  762. if !ok {
  763. a.log.Warnf("Failed to parse address: %s", base.addr())
  764. return
  765. }
  766. if out, err := stun.Build(m, stun.BindingSuccess,
  767. &stun.XORMappedAddress{
  768. IP: ip,
  769. Port: port,
  770. },
  771. stun.NewShortTermIntegrity(a.localPwd),
  772. stun.Fingerprint,
  773. ); err != nil {
  774. a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
  775. } else {
  776. a.sendSTUN(out, local, remote)
  777. }
  778. }
  779. // Removes pending binding requests that are over maxBindingRequestTimeout old
  780. //
  781. // Let HTO be the transaction timeout, which SHOULD be 2*RTT if
  782. // RTT is known or 500 ms otherwise.
  783. // https://tools.ietf.org/html/rfc8445#appendix-B.1
  784. func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) {
  785. initialSize := len(a.pendingBindingRequests)
  786. temp := a.pendingBindingRequests[:0]
  787. for _, bindingRequest := range a.pendingBindingRequests {
  788. if filterTime.Sub(bindingRequest.timestamp) < maxBindingRequestTimeout {
  789. temp = append(temp, bindingRequest)
  790. }
  791. }
  792. a.pendingBindingRequests = temp
  793. if bindRequestsRemoved := initialSize - len(a.pendingBindingRequests); bindRequestsRemoved > 0 {
  794. a.log.Tracef("Discarded %d binding requests because they expired", bindRequestsRemoved)
  795. }
  796. }
  797. // Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination
  798. // If the bindingRequest was valid remove it from our pending cache
  799. func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) {
  800. a.invalidatePendingBindingRequests(time.Now())
  801. for i := range a.pendingBindingRequests {
  802. if a.pendingBindingRequests[i].transactionID == id {
  803. validBindingRequest := a.pendingBindingRequests[i]
  804. a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...)
  805. return true, &validBindingRequest
  806. }
  807. }
  808. return false, nil
  809. }
  810. // handleInbound processes STUN traffic from a remote candidate
  811. func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) { //nolint:gocognit
  812. var err error
  813. if m == nil || local == nil {
  814. return
  815. }
  816. if m.Type.Method != stun.MethodBinding ||
  817. !(m.Type.Class == stun.ClassSuccessResponse ||
  818. m.Type.Class == stun.ClassRequest ||
  819. m.Type.Class == stun.ClassIndication) {
  820. a.log.Tracef("unhandled STUN from %s to %s class(%s) method(%s)", remote, local, m.Type.Class, m.Type.Method)
  821. return
  822. }
  823. if a.isControlling {
  824. if m.Contains(stun.AttrICEControlling) {
  825. a.log.Debug("Inbound STUN message: isControlling && a.isControlling == true")
  826. return
  827. } else if m.Contains(stun.AttrUseCandidate) {
  828. a.log.Debug("Inbound STUN message: useCandidate && a.isControlling == true")
  829. return
  830. }
  831. } else {
  832. if m.Contains(stun.AttrICEControlled) {
  833. a.log.Debug("Inbound STUN message: isControlled && a.isControlling == false")
  834. return
  835. }
  836. }
  837. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  838. if m.Type.Class == stun.ClassSuccessResponse {
  839. if err = stun.MessageIntegrity([]byte(a.remotePwd)).Check(m); err != nil {
  840. a.log.Warnf("Discard message from (%s), %v", remote, err)
  841. return
  842. }
  843. if remoteCandidate == nil {
  844. a.log.Warnf("Discard success message from (%s), no such remote", remote)
  845. return
  846. }
  847. a.selector.HandleSuccessResponse(m, local, remoteCandidate, remote)
  848. } else if m.Type.Class == stun.ClassRequest {
  849. if err = stunx.AssertUsername(m, a.localUfrag+":"+a.remoteUfrag); err != nil {
  850. a.log.Warnf("Discard message from (%s), %v", remote, err)
  851. return
  852. } else if err = stun.MessageIntegrity([]byte(a.localPwd)).Check(m); err != nil {
  853. a.log.Warnf("Discard message from (%s), %v", remote, err)
  854. return
  855. }
  856. if remoteCandidate == nil {
  857. ip, port, networkType, ok := parseAddr(remote)
  858. if !ok {
  859. a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
  860. return
  861. }
  862. prflxCandidateConfig := CandidatePeerReflexiveConfig{
  863. Network: networkType.String(),
  864. Address: ip.String(),
  865. Port: port,
  866. Component: local.Component(),
  867. RelAddr: "",
  868. RelPort: 0,
  869. }
  870. prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
  871. if err != nil {
  872. a.log.Errorf("Failed to create new remote prflx candidate (%s)", err)
  873. return
  874. }
  875. remoteCandidate = prflxCandidate
  876. a.log.Debugf("Adding a new peer-reflexive candidate: %s ", remote)
  877. a.addRemoteCandidate(remoteCandidate)
  878. }
  879. a.log.Tracef("inbound STUN (Request) from %s to %s", remote.String(), local.String())
  880. a.selector.HandleBindingRequest(m, local, remoteCandidate)
  881. }
  882. if remoteCandidate != nil {
  883. remoteCandidate.seen(false)
  884. }
  885. }
  886. // validateNonSTUNTraffic processes non STUN traffic from a remote candidate,
  887. // and returns true if it is an actual remote candidate
  888. func (a *Agent) validateNonSTUNTraffic(local Candidate, remote net.Addr) (Candidate, bool) {
  889. var remoteCandidate Candidate
  890. if err := a.run(local.context(), func(ctx context.Context, agent *Agent) {
  891. remoteCandidate = a.findRemoteCandidate(local.NetworkType(), remote)
  892. if remoteCandidate != nil {
  893. remoteCandidate.seen(false)
  894. }
  895. }); err != nil {
  896. a.log.Warnf("Failed to validate remote candidate: %v", err)
  897. }
  898. return remoteCandidate, remoteCandidate != nil
  899. }
  900. // GetSelectedCandidatePair returns the selected pair or nil if there is none
  901. func (a *Agent) GetSelectedCandidatePair() (*CandidatePair, error) {
  902. selectedPair := a.getSelectedPair()
  903. if selectedPair == nil {
  904. return nil, nil //nolint:nilnil
  905. }
  906. local, err := selectedPair.Local.copy()
  907. if err != nil {
  908. return nil, err
  909. }
  910. remote, err := selectedPair.Remote.copy()
  911. if err != nil {
  912. return nil, err
  913. }
  914. return &CandidatePair{Local: local, Remote: remote}, nil
  915. }
  916. func (a *Agent) getSelectedPair() *CandidatePair {
  917. if selectedPair, ok := a.selectedPair.Load().(*CandidatePair); ok {
  918. return selectedPair
  919. }
  920. return nil
  921. }
  922. func (a *Agent) closeMulticastConn() {
  923. if a.mDNSConn != nil {
  924. if err := a.mDNSConn.Close(); err != nil {
  925. a.log.Warnf("Failed to close mDNS Conn: %v", err)
  926. }
  927. }
  928. }
  929. // SetRemoteCredentials sets the credentials of the remote agent
  930. func (a *Agent) SetRemoteCredentials(remoteUfrag, remotePwd string) error {
  931. switch {
  932. case remoteUfrag == "":
  933. return ErrRemoteUfragEmpty
  934. case remotePwd == "":
  935. return ErrRemotePwdEmpty
  936. }
  937. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  938. agent.remoteUfrag = remoteUfrag
  939. agent.remotePwd = remotePwd
  940. })
  941. }
  942. // Restart restarts the ICE Agent with the provided ufrag/pwd
  943. // If no ufrag/pwd is provided the Agent will generate one itself
  944. //
  945. // If there is a gatherer routine currently running, Restart will
  946. // cancel it.
  947. // After a Restart, the user must then call GatherCandidates explicitly
  948. // to start generating new ones.
  949. func (a *Agent) Restart(ufrag, pwd string) error {
  950. if ufrag == "" {
  951. var err error
  952. ufrag, err = generateUFrag()
  953. if err != nil {
  954. return err
  955. }
  956. }
  957. if pwd == "" {
  958. var err error
  959. pwd, err = generatePwd()
  960. if err != nil {
  961. return err
  962. }
  963. }
  964. if len([]rune(ufrag))*8 < 24 {
  965. return ErrLocalUfragInsufficientBits
  966. }
  967. if len([]rune(pwd))*8 < 128 {
  968. return ErrLocalPwdInsufficientBits
  969. }
  970. var err error
  971. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  972. if agent.gatheringState == GatheringStateGathering {
  973. agent.gatherCandidateCancel()
  974. }
  975. // Clear all agent needed to take back to fresh state
  976. a.removeUfragFromMux()
  977. agent.localUfrag = ufrag
  978. agent.localPwd = pwd
  979. agent.remoteUfrag = ""
  980. agent.remotePwd = ""
  981. a.gatheringState = GatheringStateNew
  982. a.checklist = make([]*CandidatePair, 0)
  983. a.pendingBindingRequests = make([]bindingRequest, 0)
  984. a.setSelectedPair(nil)
  985. a.deleteAllCandidates()
  986. if a.selector != nil {
  987. a.selector.Start()
  988. }
  989. // Restart is used by NewAgent. Accept/Connect should be used to move to checking
  990. // for new Agents
  991. if a.connectionState != ConnectionStateNew {
  992. a.updateConnectionState(ConnectionStateChecking)
  993. }
  994. }); runErr != nil {
  995. return runErr
  996. }
  997. return err
  998. }
  999. func (a *Agent) setGatheringState(newState GatheringState) error {
  1000. done := make(chan struct{})
  1001. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1002. if a.gatheringState != newState && newState == GatheringStateComplete {
  1003. a.chanCandidate <- nil
  1004. }
  1005. a.gatheringState = newState
  1006. close(done)
  1007. }); err != nil {
  1008. return err
  1009. }
  1010. <-done
  1011. return nil
  1012. }