icegatherer.go 13 KB


  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "sync"
  9. "sync/atomic"
  10. "github.com/pion/ice/v2"
  11. "github.com/pion/logging"
  12. "github.com/pion/stun"
  13. )
  14. // ICEGatherer gathers local host, server reflexive and relay
  15. // candidates, as well as enabling the retrieval of local Interactive
  16. // Connectivity Establishment (ICE) parameters which can be
  17. // exchanged in signaling.
  18. type ICEGatherer struct {
  19. lock sync.RWMutex
  20. log logging.LeveledLogger
  21. state ICEGathererState
  22. validatedServers []*stun.URI
  23. gatherPolicy ICETransportPolicy
  24. agent *ice.Agent
  25. onLocalCandidateHandler atomic.Value // func(candidate *ICECandidate)
  26. onStateChangeHandler atomic.Value // func(state ICEGathererState)
  27. // Used for GatheringCompletePromise
  28. onGatheringCompleteHandler atomic.Value // func()
  29. api *API
  30. }
  31. // NewICEGatherer creates a new NewICEGatherer.
  32. // This constructor is part of the ORTC API. It is not
  33. // meant to be used together with the basic WebRTC API.
  34. func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) {
  35. var validatedServers []*stun.URI
  36. if len(opts.ICEServers) > 0 {
  37. for _, server := range opts.ICEServers {
  38. url, err := server.urls()
  39. if err != nil {
  40. return nil, err
  41. }
  42. validatedServers = append(validatedServers, url...)
  43. }
  44. }
  45. return &ICEGatherer{
  46. state: ICEGathererStateNew,
  47. gatherPolicy: opts.ICEGatherPolicy,
  48. validatedServers: validatedServers,
  49. api: api,
  50. log: api.settingEngine.LoggerFactory.NewLogger("ice"),
  51. }, nil
  52. }
  53. func (g *ICEGatherer) createAgent() error {
  54. g.lock.Lock()
  55. defer g.lock.Unlock()
  56. if g.agent != nil || g.State() != ICEGathererStateNew {
  57. return nil
  58. }
  59. candidateTypes := []ice.CandidateType{}
  60. if g.api.settingEngine.candidates.ICELite {
  61. candidateTypes = append(candidateTypes, ice.CandidateTypeHost)
  62. } else if g.gatherPolicy == ICETransportPolicyRelay {
  63. candidateTypes = append(candidateTypes, ice.CandidateTypeRelay)
  64. }
  65. var nat1To1CandiTyp ice.CandidateType
  66. switch g.api.settingEngine.candidates.NAT1To1IPCandidateType {
  67. case ICECandidateTypeHost:
  68. nat1To1CandiTyp = ice.CandidateTypeHost
  69. case ICECandidateTypeSrflx:
  70. nat1To1CandiTyp = ice.CandidateTypeServerReflexive
  71. default:
  72. nat1To1CandiTyp = ice.CandidateTypeUnspecified
  73. }
  74. mDNSMode := g.api.settingEngine.candidates.MulticastDNSMode
  75. if mDNSMode != ice.MulticastDNSModeDisabled && mDNSMode != ice.MulticastDNSModeQueryAndGather {
  76. // If enum is in state we don't recognized default to MulticastDNSModeQueryOnly
  77. mDNSMode = ice.MulticastDNSModeQueryOnly
  78. }
  79. config := &ice.AgentConfig{
  80. Lite: g.api.settingEngine.candidates.ICELite,
  81. Urls: g.validatedServers,
  82. PortMin: g.api.settingEngine.ephemeralUDP.PortMin,
  83. PortMax: g.api.settingEngine.ephemeralUDP.PortMax,
  84. DisconnectedTimeout: g.api.settingEngine.timeout.ICEDisconnectedTimeout,
  85. FailedTimeout: g.api.settingEngine.timeout.ICEFailedTimeout,
  86. KeepaliveInterval: g.api.settingEngine.timeout.ICEKeepaliveInterval,
  87. LoggerFactory: g.api.settingEngine.LoggerFactory,
  88. CandidateTypes: candidateTypes,
  89. HostAcceptanceMinWait: g.api.settingEngine.timeout.ICEHostAcceptanceMinWait,
  90. SrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICESrflxAcceptanceMinWait,
  91. PrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICEPrflxAcceptanceMinWait,
  92. RelayAcceptanceMinWait: g.api.settingEngine.timeout.ICERelayAcceptanceMinWait,
  93. InterfaceFilter: g.api.settingEngine.candidates.InterfaceFilter,
  94. IPFilter: g.api.settingEngine.candidates.IPFilter,
  95. NAT1To1IPs: g.api.settingEngine.candidates.NAT1To1IPs,
  96. NAT1To1IPCandidateType: nat1To1CandiTyp,
  97. IncludeLoopback: g.api.settingEngine.candidates.IncludeLoopbackCandidate,
  98. Net: g.api.settingEngine.net,
  99. MulticastDNSMode: mDNSMode,
  100. MulticastDNSHostName: g.api.settingEngine.candidates.MulticastDNSHostName,
  101. LocalUfrag: g.api.settingEngine.candidates.UsernameFragment,
  102. LocalPwd: g.api.settingEngine.candidates.Password,
  103. TCPMux: g.api.settingEngine.iceTCPMux,
  104. UDPMux: g.api.settingEngine.iceUDPMux,
  105. ProxyDialer: g.api.settingEngine.iceProxyDialer,
  106. }
  107. requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes
  108. if len(requestedNetworkTypes) == 0 {
  109. requestedNetworkTypes = supportedNetworkTypes()
  110. }
  111. for _, typ := range requestedNetworkTypes {
  112. config.NetworkTypes = append(config.NetworkTypes, ice.NetworkType(typ))
  113. }
  114. agent, err := ice.NewAgent(config)
  115. if err != nil {
  116. return err
  117. }
  118. g.agent = agent
  119. return nil
  120. }
  121. // Gather ICE candidates.
  122. func (g *ICEGatherer) Gather() error {
  123. if err := g.createAgent(); err != nil {
  124. return err
  125. }
  126. agent := g.getAgent()
  127. // it is possible agent had just been closed
  128. if agent == nil {
  129. return fmt.Errorf("%w: unable to gather", errICEAgentNotExist)
  130. }
  131. g.setState(ICEGathererStateGathering)
  132. if err := agent.OnCandidate(func(candidate ice.Candidate) {
  133. onLocalCandidateHandler := func(*ICECandidate) {}
  134. if handler, ok := g.onLocalCandidateHandler.Load().(func(candidate *ICECandidate)); ok && handler != nil {
  135. onLocalCandidateHandler = handler
  136. }
  137. onGatheringCompleteHandler := func() {}
  138. if handler, ok := g.onGatheringCompleteHandler.Load().(func()); ok && handler != nil {
  139. onGatheringCompleteHandler = handler
  140. }
  141. if candidate != nil {
  142. c, err := newICECandidateFromICE(candidate)
  143. if err != nil {
  144. g.log.Warnf("Failed to convert ice.Candidate: %s", err)
  145. return
  146. }
  147. onLocalCandidateHandler(&c)
  148. } else {
  149. g.setState(ICEGathererStateComplete)
  150. onGatheringCompleteHandler()
  151. onLocalCandidateHandler(nil)
  152. }
  153. }); err != nil {
  154. return err
  155. }
  156. return agent.GatherCandidates()
  157. }
  158. // Close prunes all local candidates, and closes the ports.
  159. func (g *ICEGatherer) Close() error {
  160. g.lock.Lock()
  161. defer g.lock.Unlock()
  162. if g.agent == nil {
  163. return nil
  164. } else if err := g.agent.Close(); err != nil {
  165. return err
  166. }
  167. g.agent = nil
  168. g.setState(ICEGathererStateClosed)
  169. return nil
  170. }
  171. // GetLocalParameters returns the ICE parameters of the ICEGatherer.
  172. func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) {
  173. if err := g.createAgent(); err != nil {
  174. return ICEParameters{}, err
  175. }
  176. agent := g.getAgent()
  177. // it is possible agent had just been closed
  178. if agent == nil {
  179. return ICEParameters{}, fmt.Errorf("%w: unable to get local parameters", errICEAgentNotExist)
  180. }
  181. frag, pwd, err := agent.GetLocalUserCredentials()
  182. if err != nil {
  183. return ICEParameters{}, err
  184. }
  185. return ICEParameters{
  186. UsernameFragment: frag,
  187. Password: pwd,
  188. ICELite: false,
  189. }, nil
  190. }
  191. // GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer.
  192. func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) {
  193. if err := g.createAgent(); err != nil {
  194. return nil, err
  195. }
  196. agent := g.getAgent()
  197. // it is possible agent had just been closed
  198. if agent == nil {
  199. return nil, fmt.Errorf("%w: unable to get local candidates", errICEAgentNotExist)
  200. }
  201. iceCandidates, err := agent.GetLocalCandidates()
  202. if err != nil {
  203. return nil, err
  204. }
  205. return newICECandidatesFromICE(iceCandidates)
  206. }
  207. // OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available
  208. // Take note that the handler will be called with a nil pointer when gathering is finished.
  209. func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) {
  210. g.onLocalCandidateHandler.Store(f)
  211. }
  212. // OnStateChange fires any time the ICEGatherer changes
  213. func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) {
  214. g.onStateChangeHandler.Store(f)
  215. }
  216. // State indicates the current state of the ICE gatherer.
  217. func (g *ICEGatherer) State() ICEGathererState {
  218. return atomicLoadICEGathererState(&g.state)
  219. }
  220. func (g *ICEGatherer) setState(s ICEGathererState) {
  221. atomicStoreICEGathererState(&g.state, s)
  222. if handler, ok := g.onStateChangeHandler.Load().(func(state ICEGathererState)); ok && handler != nil {
  223. handler(s)
  224. }
  225. }
  226. func (g *ICEGatherer) getAgent() *ice.Agent {
  227. g.lock.RLock()
  228. defer g.lock.RUnlock()
  229. return g.agent
  230. }
  231. func (g *ICEGatherer) collectStats(collector *statsReportCollector) {
  232. agent := g.getAgent()
  233. if agent == nil {
  234. return
  235. }
  236. collector.Collecting()
  237. go func(collector *statsReportCollector, agent *ice.Agent) {
  238. for _, candidatePairStats := range agent.GetCandidatePairsStats() {
  239. collector.Collecting()
  240. state, err := toStatsICECandidatePairState(candidatePairStats.State)
  241. if err != nil {
  242. g.log.Error(err.Error())
  243. }
  244. pairID := newICECandidatePairStatsID(candidatePairStats.LocalCandidateID,
  245. candidatePairStats.RemoteCandidateID)
  246. stats := ICECandidatePairStats{
  247. Timestamp: statsTimestampFrom(candidatePairStats.Timestamp),
  248. Type: StatsTypeCandidatePair,
  249. ID: pairID,
  250. // TransportID:
  251. LocalCandidateID: candidatePairStats.LocalCandidateID,
  252. RemoteCandidateID: candidatePairStats.RemoteCandidateID,
  253. State: state,
  254. Nominated: candidatePairStats.Nominated,
  255. PacketsSent: candidatePairStats.PacketsSent,
  256. PacketsReceived: candidatePairStats.PacketsReceived,
  257. BytesSent: candidatePairStats.BytesSent,
  258. BytesReceived: candidatePairStats.BytesReceived,
  259. LastPacketSentTimestamp: statsTimestampFrom(candidatePairStats.LastPacketSentTimestamp),
  260. LastPacketReceivedTimestamp: statsTimestampFrom(candidatePairStats.LastPacketReceivedTimestamp),
  261. FirstRequestTimestamp: statsTimestampFrom(candidatePairStats.FirstRequestTimestamp),
  262. LastRequestTimestamp: statsTimestampFrom(candidatePairStats.LastRequestTimestamp),
  263. LastResponseTimestamp: statsTimestampFrom(candidatePairStats.LastResponseTimestamp),
  264. TotalRoundTripTime: candidatePairStats.TotalRoundTripTime,
  265. CurrentRoundTripTime: candidatePairStats.CurrentRoundTripTime,
  266. AvailableOutgoingBitrate: candidatePairStats.AvailableOutgoingBitrate,
  267. AvailableIncomingBitrate: candidatePairStats.AvailableIncomingBitrate,
  268. CircuitBreakerTriggerCount: candidatePairStats.CircuitBreakerTriggerCount,
  269. RequestsReceived: candidatePairStats.RequestsReceived,
  270. RequestsSent: candidatePairStats.RequestsSent,
  271. ResponsesReceived: candidatePairStats.ResponsesReceived,
  272. ResponsesSent: candidatePairStats.ResponsesSent,
  273. RetransmissionsReceived: candidatePairStats.RetransmissionsReceived,
  274. RetransmissionsSent: candidatePairStats.RetransmissionsSent,
  275. ConsentRequestsSent: candidatePairStats.ConsentRequestsSent,
  276. ConsentExpiredTimestamp: statsTimestampFrom(candidatePairStats.ConsentExpiredTimestamp),
  277. }
  278. collector.Collect(stats.ID, stats)
  279. }
  280. for _, candidateStats := range agent.GetLocalCandidatesStats() {
  281. collector.Collecting()
  282. networkType, err := getNetworkType(candidateStats.NetworkType)
  283. if err != nil {
  284. g.log.Error(err.Error())
  285. }
  286. candidateType, err := getCandidateType(candidateStats.CandidateType)
  287. if err != nil {
  288. g.log.Error(err.Error())
  289. }
  290. stats := ICECandidateStats{
  291. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  292. ID: candidateStats.ID,
  293. Type: StatsTypeLocalCandidate,
  294. NetworkType: networkType,
  295. IP: candidateStats.IP,
  296. Port: int32(candidateStats.Port),
  297. Protocol: networkType.Protocol(),
  298. CandidateType: candidateType,
  299. Priority: int32(candidateStats.Priority),
  300. URL: candidateStats.URL,
  301. RelayProtocol: candidateStats.RelayProtocol,
  302. Deleted: candidateStats.Deleted,
  303. }
  304. collector.Collect(stats.ID, stats)
  305. }
  306. for _, candidateStats := range agent.GetRemoteCandidatesStats() {
  307. collector.Collecting()
  308. networkType, err := getNetworkType(candidateStats.NetworkType)
  309. if err != nil {
  310. g.log.Error(err.Error())
  311. }
  312. candidateType, err := getCandidateType(candidateStats.CandidateType)
  313. if err != nil {
  314. g.log.Error(err.Error())
  315. }
  316. stats := ICECandidateStats{
  317. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  318. ID: candidateStats.ID,
  319. Type: StatsTypeRemoteCandidate,
  320. NetworkType: networkType,
  321. IP: candidateStats.IP,
  322. Port: int32(candidateStats.Port),
  323. Protocol: networkType.Protocol(),
  324. CandidateType: candidateType,
  325. Priority: int32(candidateStats.Priority),
  326. URL: candidateStats.URL,
  327. RelayProtocol: candidateStats.RelayProtocol,
  328. }
  329. collector.Collect(stats.ID, stats)
  330. }
  331. collector.Done()
  332. }(collector, agent)
  333. }