gather.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. package ice
  4. import (
  5. "context"
  6. "crypto/tls"
  7. "fmt"
  8. "io"
  9. "net"
  10. "reflect"
  11. "sync"
  12. "time"
  13. "github.com/pion/dtls/v2"
  14. "github.com/pion/ice/v2/internal/fakenet"
  15. stunx "github.com/pion/ice/v2/internal/stun"
  16. "github.com/pion/logging"
  17. "github.com/pion/stun"
  18. "github.com/pion/turn/v2"
  19. )
  20. const (
  21. stunGatherTimeout = time.Second * 5
  22. )
  23. // Close a net.Conn and log if we have a failure
  24. func closeConnAndLog(c io.Closer, log logging.LeveledLogger, msg string, args ...interface{}) {
  25. if c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
  26. log.Warnf("Connection is not allocated: "+msg, args...)
  27. return
  28. }
  29. log.Warnf(msg)
  30. if err := c.Close(); err != nil {
  31. log.Warnf("Failed to close connection: %v", err)
  32. }
  33. }
  34. // GatherCandidates initiates the trickle based gathering process.
  35. func (a *Agent) GatherCandidates() error {
  36. var gatherErr error
  37. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  38. if a.gatheringState != GatheringStateNew {
  39. gatherErr = ErrMultipleGatherAttempted
  40. return
  41. } else if a.onCandidateHdlr.Load() == nil {
  42. gatherErr = ErrNoOnCandidateHandler
  43. return
  44. }
  45. a.gatherCandidateCancel() // Cancel previous gathering routine
  46. ctx, cancel := context.WithCancel(ctx)
  47. a.gatherCandidateCancel = cancel
  48. done := make(chan struct{})
  49. a.gatherCandidateDone = done
  50. go a.gatherCandidates(ctx, done)
  51. }); runErr != nil {
  52. return runErr
  53. }
  54. return gatherErr
  55. }
  56. func (a *Agent) gatherCandidates(ctx context.Context, done chan struct{}) {
  57. defer close(done)
  58. if err := a.setGatheringState(GatheringStateGathering); err != nil { //nolint:contextcheck
  59. a.log.Warnf("Failed to set gatheringState to GatheringStateGathering: %v", err)
  60. return
  61. }
  62. var wg sync.WaitGroup
  63. for _, t := range a.candidateTypes {
  64. switch t {
  65. case CandidateTypeHost:
  66. wg.Add(1)
  67. go func() {
  68. a.gatherCandidatesLocal(ctx, a.networkTypes)
  69. wg.Done()
  70. }()
  71. case CandidateTypeServerReflexive:
  72. wg.Add(1)
  73. go func() {
  74. if a.udpMuxSrflx != nil {
  75. a.gatherCandidatesSrflxUDPMux(ctx, a.urls, a.networkTypes)
  76. } else {
  77. a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
  78. }
  79. wg.Done()
  80. }()
  81. if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeServerReflexive {
  82. wg.Add(1)
  83. go func() {
  84. a.gatherCandidatesSrflxMapped(ctx, a.networkTypes)
  85. wg.Done()
  86. }()
  87. }
  88. case CandidateTypeRelay:
  89. wg.Add(1)
  90. go func() {
  91. a.gatherCandidatesRelay(ctx, a.urls)
  92. wg.Done()
  93. }()
  94. case CandidateTypePeerReflexive, CandidateTypeUnspecified:
  95. }
  96. }
  97. // Block until all STUN and TURN URLs have been gathered (or timed out)
  98. wg.Wait()
  99. if err := a.setGatheringState(GatheringStateComplete); err != nil { //nolint:contextcheck
  100. a.log.Warnf("Failed to set gatheringState to GatheringStateComplete: %v", err)
  101. }
  102. }
  103. func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []NetworkType) { //nolint:gocognit
  104. networks := map[string]struct{}{}
  105. for _, networkType := range networkTypes {
  106. if networkType.IsTCP() {
  107. networks[tcp] = struct{}{}
  108. } else {
  109. networks[udp] = struct{}{}
  110. }
  111. }
  112. // When UDPMux is enabled, skip other UDP candidates
  113. if a.udpMux != nil {
  114. if err := a.gatherCandidatesLocalUDPMux(ctx); err != nil {
  115. a.log.Warnf("Failed to create host candidate for UDPMux: %s", err)
  116. }
  117. delete(networks, udp)
  118. }
  119. localIPs, err := localInterfaces(a.net, a.interfaceFilter, a.ipFilter, networkTypes, a.includeLoopback)
  120. if err != nil {
  121. a.log.Warnf("Failed to iterate local interfaces, host candidates will not be gathered %s", err)
  122. return
  123. }
  124. for _, ip := range localIPs {
  125. mappedIP := ip
  126. if a.mDNSMode != MulticastDNSModeQueryAndGather && a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeHost {
  127. if _mappedIP, innerErr := a.extIPMapper.findExternalIP(ip.String()); innerErr == nil {
  128. mappedIP = _mappedIP
  129. } else {
  130. a.log.Warnf("1:1 NAT mapping is enabled but no external IP is found for %s", ip.String())
  131. }
  132. }
  133. address := mappedIP.String()
  134. if a.mDNSMode == MulticastDNSModeQueryAndGather {
  135. address = a.mDNSName
  136. }
  137. for network := range networks {
  138. type connAndPort struct {
  139. conn net.PacketConn
  140. port int
  141. }
  142. var (
  143. conns []connAndPort
  144. tcpType TCPType
  145. )
  146. switch network {
  147. case tcp:
  148. if a.tcpMux == nil {
  149. continue
  150. }
  151. // Handle ICE TCP passive mode
  152. var muxConns []net.PacketConn
  153. if multi, ok := a.tcpMux.(AllConnsGetter); ok {
  154. a.log.Debugf("GetAllConns by ufrag: %s", a.localUfrag)
  155. muxConns, err = multi.GetAllConns(a.localUfrag, mappedIP.To4() == nil, ip)
  156. if err != nil {
  157. a.log.Warnf("Failed to get all TCP connections by ufrag: %s %s %s", network, ip, a.localUfrag)
  158. continue
  159. }
  160. } else {
  161. a.log.Debugf("GetConn by ufrag: %s", a.localUfrag)
  162. conn, err := a.tcpMux.GetConnByUfrag(a.localUfrag, mappedIP.To4() == nil, ip)
  163. if err != nil {
  164. a.log.Warnf("Failed to get TCP connections by ufrag: %s %s %s", network, ip, a.localUfrag)
  165. continue
  166. }
  167. muxConns = []net.PacketConn{conn}
  168. }
  169. // Extract the port for each PacketConn we got.
  170. for _, conn := range muxConns {
  171. if tcpConn, ok := conn.LocalAddr().(*net.TCPAddr); ok {
  172. conns = append(conns, connAndPort{conn, tcpConn.Port})
  173. } else {
  174. a.log.Warnf("Failed to get port of connection from TCPMux: %s %s %s", network, ip, a.localUfrag)
  175. }
  176. }
  177. if len(conns) == 0 {
  178. // Didn't succeed with any, try the next network.
  179. continue
  180. }
  181. tcpType = TCPTypePassive
  182. // Is there a way to verify that the listen address is even
  183. // accessible from the current interface.
  184. case udp:
  185. conn, err := listenUDPInPortRange(a.net, a.log, int(a.portMax), int(a.portMin), network, &net.UDPAddr{IP: ip, Port: 0})
  186. if err != nil {
  187. a.log.Warnf("Failed to listen %s %s", network, ip)
  188. continue
  189. }
  190. if udpConn, ok := conn.LocalAddr().(*net.UDPAddr); ok {
  191. conns = append(conns, connAndPort{conn, udpConn.Port})
  192. } else {
  193. a.log.Warnf("Failed to get port of UDPAddr from ListenUDPInPortRange: %s %s %s", network, ip, a.localUfrag)
  194. continue
  195. }
  196. }
  197. for _, connAndPort := range conns {
  198. hostConfig := CandidateHostConfig{
  199. Network: network,
  200. Address: address,
  201. Port: connAndPort.port,
  202. Component: ComponentRTP,
  203. TCPType: tcpType,
  204. }
  205. c, err := NewCandidateHost(&hostConfig)
  206. if err != nil {
  207. closeConnAndLog(connAndPort.conn, a.log, "failed to create host candidate: %s %s %d: %v", network, mappedIP, connAndPort.port, err)
  208. continue
  209. }
  210. if a.mDNSMode == MulticastDNSModeQueryAndGather {
  211. if err = c.setIP(ip); err != nil {
  212. closeConnAndLog(connAndPort.conn, a.log, "failed to create host candidate: %s %s %d: %v", network, mappedIP, connAndPort.port, err)
  213. continue
  214. }
  215. }
  216. if err := a.addCandidate(ctx, c, connAndPort.conn); err != nil {
  217. if closeErr := c.close(); closeErr != nil {
  218. a.log.Warnf("Failed to close candidate: %v", closeErr)
  219. }
  220. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err)
  221. }
  222. }
  223. }
  224. }
  225. }
  226. func (a *Agent) gatherCandidatesLocalUDPMux(ctx context.Context) error { //nolint:gocognit
  227. if a.udpMux == nil {
  228. return errUDPMuxDisabled
  229. }
  230. localAddresses := a.udpMux.GetListenAddresses()
  231. existingConfigs := make(map[CandidateHostConfig]struct{})
  232. for _, addr := range localAddresses {
  233. udpAddr, ok := addr.(*net.UDPAddr)
  234. if !ok {
  235. return errInvalidAddress
  236. }
  237. candidateIP := udpAddr.IP
  238. if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeHost {
  239. mappedIP, err := a.extIPMapper.findExternalIP(candidateIP.String())
  240. if err != nil {
  241. a.log.Warnf("1:1 NAT mapping is enabled but no external IP is found for %s", candidateIP.String())
  242. continue
  243. }
  244. candidateIP = mappedIP
  245. }
  246. hostConfig := CandidateHostConfig{
  247. Network: udp,
  248. Address: candidateIP.String(),
  249. Port: udpAddr.Port,
  250. Component: ComponentRTP,
  251. }
  252. // Detect a duplicate candidate before calling addCandidate().
  253. // otherwise, addCandidate() detects the duplicate candidate
  254. // and close its connection, invalidating all candidates
  255. // that share the same connection.
  256. if _, ok := existingConfigs[hostConfig]; ok {
  257. continue
  258. }
  259. conn, err := a.udpMux.GetConn(a.localUfrag, udpAddr)
  260. if err != nil {
  261. return err
  262. }
  263. c, err := NewCandidateHost(&hostConfig)
  264. if err != nil {
  265. closeConnAndLog(conn, a.log, "failed to create host mux candidate: %s %d: %v", candidateIP, udpAddr.Port, err)
  266. continue
  267. }
  268. if err := a.addCandidate(ctx, c, conn); err != nil {
  269. if closeErr := c.close(); closeErr != nil {
  270. a.log.Warnf("Failed to close candidate: %v", closeErr)
  271. }
  272. closeConnAndLog(conn, a.log, "failed to add candidate: %s %d: %v", candidateIP, udpAddr.Port, err)
  273. continue
  274. }
  275. existingConfigs[hostConfig] = struct{}{}
  276. }
  277. return nil
  278. }
  279. func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []NetworkType) {
  280. var wg sync.WaitGroup
  281. defer wg.Wait()
  282. for _, networkType := range networkTypes {
  283. if networkType.IsTCP() {
  284. continue
  285. }
  286. network := networkType.String()
  287. wg.Add(1)
  288. go func() {
  289. defer wg.Done()
  290. conn, err := listenUDPInPortRange(a.net, a.log, int(a.portMax), int(a.portMin), network, &net.UDPAddr{IP: nil, Port: 0})
  291. if err != nil {
  292. a.log.Warnf("Failed to listen %s: %v", network, err)
  293. return
  294. }
  295. lAddr, ok := conn.LocalAddr().(*net.UDPAddr)
  296. if !ok {
  297. closeConnAndLog(conn, a.log, "1:1 NAT mapping is enabled but LocalAddr is not a UDPAddr")
  298. return
  299. }
  300. mappedIP, err := a.extIPMapper.findExternalIP(lAddr.IP.String())
  301. if err != nil {
  302. closeConnAndLog(conn, a.log, "1:1 NAT mapping is enabled but no external IP is found for %s", lAddr.IP.String())
  303. return
  304. }
  305. srflxConfig := CandidateServerReflexiveConfig{
  306. Network: network,
  307. Address: mappedIP.String(),
  308. Port: lAddr.Port,
  309. Component: ComponentRTP,
  310. RelAddr: lAddr.IP.String(),
  311. RelPort: lAddr.Port,
  312. }
  313. c, err := NewCandidateServerReflexive(&srflxConfig)
  314. if err != nil {
  315. closeConnAndLog(conn, a.log, "failed to create server reflexive candidate: %s %s %d: %v",
  316. network,
  317. mappedIP.String(),
  318. lAddr.Port,
  319. err)
  320. return
  321. }
  322. if err := a.addCandidate(ctx, c, conn); err != nil {
  323. if closeErr := c.close(); closeErr != nil {
  324. a.log.Warnf("Failed to close candidate: %v", closeErr)
  325. }
  326. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err)
  327. }
  328. }()
  329. }
  330. }
  331. func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*stun.URI, networkTypes []NetworkType) { //nolint:gocognit
  332. var wg sync.WaitGroup
  333. defer wg.Wait()
  334. for _, networkType := range networkTypes {
  335. if networkType.IsTCP() {
  336. continue
  337. }
  338. for i := range urls {
  339. for _, listenAddr := range a.udpMuxSrflx.GetListenAddresses() {
  340. udpAddr, ok := listenAddr.(*net.UDPAddr)
  341. if !ok {
  342. a.log.Warn("Failed to cast udpMuxSrflx listen address to UDPAddr")
  343. continue
  344. }
  345. wg.Add(1)
  346. go func(url stun.URI, network string, localAddr *net.UDPAddr) {
  347. defer wg.Done()
  348. hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
  349. serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
  350. if err != nil {
  351. a.log.Warnf("Failed to resolve STUN host: %s: %v", hostPort, err)
  352. return
  353. }
  354. xorAddr, err := a.udpMuxSrflx.GetXORMappedAddr(serverAddr, stunGatherTimeout)
  355. if err != nil {
  356. a.log.Warnf("Failed get server reflexive address %s %s: %v", network, url, err)
  357. return
  358. }
  359. conn, err := a.udpMuxSrflx.GetConnForURL(a.localUfrag, url.String(), localAddr)
  360. if err != nil {
  361. a.log.Warnf("Failed to find connection in UDPMuxSrflx %s %s: %v", network, url, err)
  362. return
  363. }
  364. ip := xorAddr.IP
  365. port := xorAddr.Port
  366. srflxConfig := CandidateServerReflexiveConfig{
  367. Network: network,
  368. Address: ip.String(),
  369. Port: port,
  370. Component: ComponentRTP,
  371. RelAddr: localAddr.IP.String(),
  372. RelPort: localAddr.Port,
  373. }
  374. c, err := NewCandidateServerReflexive(&srflxConfig)
  375. if err != nil {
  376. closeConnAndLog(conn, a.log, "failed to create server reflexive candidate: %s %s %d: %v", network, ip, port, err)
  377. return
  378. }
  379. if err := a.addCandidate(ctx, c, conn); err != nil {
  380. if closeErr := c.close(); closeErr != nil {
  381. a.log.Warnf("Failed to close candidate: %v", closeErr)
  382. }
  383. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err)
  384. }
  385. }(*urls[i], networkType.String(), udpAddr)
  386. }
  387. }
  388. }
  389. }
  390. func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*stun.URI, networkTypes []NetworkType) { //nolint:gocognit
  391. var wg sync.WaitGroup
  392. defer wg.Wait()
  393. for _, networkType := range networkTypes {
  394. if networkType.IsTCP() {
  395. continue
  396. }
  397. for i := range urls {
  398. wg.Add(1)
  399. go func(url stun.URI, network string) {
  400. defer wg.Done()
  401. hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
  402. serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
  403. if err != nil {
  404. a.log.Warnf("Failed to resolve STUN host: %s: %v", hostPort, err)
  405. return
  406. }
  407. conn, err := listenUDPInPortRange(a.net, a.log, int(a.portMax), int(a.portMin), network, &net.UDPAddr{IP: nil, Port: 0})
  408. if err != nil {
  409. closeConnAndLog(conn, a.log, "failed to listen for %s: %v", serverAddr.String(), err)
  410. return
  411. }
  412. // If the agent closes midway through the connection
  413. // we end it early to prevent close delay.
  414. cancelCtx, cancelFunc := context.WithCancel(ctx)
  415. defer cancelFunc()
  416. go func() {
  417. select {
  418. case <-cancelCtx.Done():
  419. return
  420. case <-a.done:
  421. _ = conn.Close()
  422. }
  423. }()
  424. xorAddr, err := stunx.GetXORMappedAddr(conn, serverAddr, stunGatherTimeout)
  425. if err != nil {
  426. closeConnAndLog(conn, a.log, "failed to get server reflexive address %s %s: %v", network, url, err)
  427. return
  428. }
  429. ip := xorAddr.IP
  430. port := xorAddr.Port
  431. lAddr := conn.LocalAddr().(*net.UDPAddr) //nolint:forcetypeassert
  432. srflxConfig := CandidateServerReflexiveConfig{
  433. Network: network,
  434. Address: ip.String(),
  435. Port: port,
  436. Component: ComponentRTP,
  437. RelAddr: lAddr.IP.String(),
  438. RelPort: lAddr.Port,
  439. }
  440. c, err := NewCandidateServerReflexive(&srflxConfig)
  441. if err != nil {
  442. closeConnAndLog(conn, a.log, "failed to create server reflexive candidate: %s %s %d: %v", network, ip, port, err)
  443. return
  444. }
  445. if err := a.addCandidate(ctx, c, conn); err != nil {
  446. if closeErr := c.close(); closeErr != nil {
  447. a.log.Warnf("Failed to close candidate: %v", closeErr)
  448. }
  449. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err)
  450. }
  451. }(*urls[i], networkType.String())
  452. }
  453. }
  454. }
  455. func (a *Agent) gatherCandidatesRelay(ctx context.Context, urls []*stun.URI) { //nolint:gocognit
  456. var wg sync.WaitGroup
  457. defer wg.Wait()
  458. network := NetworkTypeUDP4.String()
  459. for i := range urls {
  460. switch {
  461. case urls[i].Scheme != stun.SchemeTypeTURN && urls[i].Scheme != stun.SchemeTypeTURNS:
  462. continue
  463. case urls[i].Username == "":
  464. a.log.Errorf("Failed to gather relay candidates: %v", ErrUsernameEmpty)
  465. return
  466. case urls[i].Password == "":
  467. a.log.Errorf("Failed to gather relay candidates: %v", ErrPasswordEmpty)
  468. return
  469. }
  470. wg.Add(1)
  471. go func(url stun.URI) {
  472. defer wg.Done()
  473. turnServerAddr := fmt.Sprintf("%s:%d", url.Host, url.Port)
  474. var (
  475. locConn net.PacketConn
  476. err error
  477. relAddr string
  478. relPort int
  479. relayProtocol string
  480. )
  481. switch {
  482. case url.Proto == stun.ProtoTypeUDP && url.Scheme == stun.SchemeTypeTURN:
  483. if locConn, err = a.net.ListenPacket(network, "0.0.0.0:0"); err != nil {
  484. a.log.Warnf("Failed to listen %s: %v", network, err)
  485. return
  486. }
  487. relAddr = locConn.LocalAddr().(*net.UDPAddr).IP.String() //nolint:forcetypeassert
  488. relPort = locConn.LocalAddr().(*net.UDPAddr).Port //nolint:forcetypeassert
  489. relayProtocol = udp
  490. case a.proxyDialer != nil && url.Proto == stun.ProtoTypeTCP &&
  491. (url.Scheme == stun.SchemeTypeTURN || url.Scheme == stun.SchemeTypeTURNS):
  492. conn, connectErr := a.proxyDialer.Dial(NetworkTypeTCP4.String(), turnServerAddr)
  493. if connectErr != nil {
  494. a.log.Warnf("Failed to dial TCP address %s via proxy dialer: %v", turnServerAddr, connectErr)
  495. return
  496. }
  497. relAddr = conn.LocalAddr().(*net.TCPAddr).IP.String() //nolint:forcetypeassert
  498. relPort = conn.LocalAddr().(*net.TCPAddr).Port //nolint:forcetypeassert
  499. if url.Scheme == stun.SchemeTypeTURN {
  500. relayProtocol = tcp
  501. } else if url.Scheme == stun.SchemeTypeTURNS {
  502. relayProtocol = "tls"
  503. }
  504. locConn = turn.NewSTUNConn(conn)
  505. case url.Proto == stun.ProtoTypeTCP && url.Scheme == stun.SchemeTypeTURN:
  506. tcpAddr, connectErr := a.net.ResolveTCPAddr(NetworkTypeTCP4.String(), turnServerAddr)
  507. if connectErr != nil {
  508. a.log.Warnf("Failed to resolve TCP address %s: %v", turnServerAddr, connectErr)
  509. return
  510. }
  511. conn, connectErr := a.net.DialTCP(NetworkTypeTCP4.String(), nil, tcpAddr)
  512. if connectErr != nil {
  513. a.log.Warnf("Failed to dial TCP address %s: %v", turnServerAddr, connectErr)
  514. return
  515. }
  516. relAddr = conn.LocalAddr().(*net.TCPAddr).IP.String() //nolint:forcetypeassert
  517. relPort = conn.LocalAddr().(*net.TCPAddr).Port //nolint:forcetypeassert
  518. relayProtocol = tcp
  519. locConn = turn.NewSTUNConn(conn)
  520. case url.Proto == stun.ProtoTypeUDP && url.Scheme == stun.SchemeTypeTURNS:
  521. udpAddr, connectErr := a.net.ResolveUDPAddr(network, turnServerAddr)
  522. if connectErr != nil {
  523. a.log.Warnf("Failed to resolve UDP address %s: %v", turnServerAddr, connectErr)
  524. return
  525. }
  526. udpConn, dialErr := a.net.DialUDP("udp", nil, udpAddr)
  527. if dialErr != nil {
  528. a.log.Warnf("Failed to dial DTLS address %s: %v", turnServerAddr, dialErr)
  529. return
  530. }
  531. conn, connectErr := dtls.ClientWithContext(ctx, udpConn, &dtls.Config{
  532. ServerName: url.Host,
  533. InsecureSkipVerify: a.insecureSkipVerify, //nolint:gosec
  534. })
  535. if connectErr != nil {
  536. a.log.Warnf("Failed to create DTLS client: %v", turnServerAddr, connectErr)
  537. return
  538. }
  539. relAddr = conn.LocalAddr().(*net.UDPAddr).IP.String() //nolint:forcetypeassert
  540. relPort = conn.LocalAddr().(*net.UDPAddr).Port //nolint:forcetypeassert
  541. relayProtocol = "dtls"
  542. locConn = &fakenet.PacketConn{Conn: conn}
  543. case url.Proto == stun.ProtoTypeTCP && url.Scheme == stun.SchemeTypeTURNS:
  544. tcpAddr, resolvErr := a.net.ResolveTCPAddr(NetworkTypeTCP4.String(), turnServerAddr)
  545. if resolvErr != nil {
  546. a.log.Warnf("Failed to resolve relay address %s: %v", turnServerAddr, resolvErr)
  547. return
  548. }
  549. tcpConn, dialErr := a.net.DialTCP(NetworkTypeTCP4.String(), nil, tcpAddr)
  550. if dialErr != nil {
  551. a.log.Warnf("Failed to connect to relay: %v", dialErr)
  552. return
  553. }
  554. conn := tls.Client(tcpConn, &tls.Config{
  555. ServerName: url.Host,
  556. InsecureSkipVerify: a.insecureSkipVerify, //nolint:gosec
  557. })
  558. if hsErr := conn.HandshakeContext(ctx); hsErr != nil {
  559. if closeErr := tcpConn.Close(); closeErr != nil {
  560. a.log.Errorf("Failed to close relay connection: %v", closeErr)
  561. }
  562. a.log.Warnf("Failed to connect to relay: %v", hsErr)
  563. return
  564. }
  565. relAddr = conn.LocalAddr().(*net.TCPAddr).IP.String() //nolint:forcetypeassert
  566. relPort = conn.LocalAddr().(*net.TCPAddr).Port //nolint:forcetypeassert
  567. relayProtocol = "tls"
  568. locConn = turn.NewSTUNConn(conn)
  569. default:
  570. a.log.Warnf("Unable to handle URL in gatherCandidatesRelay %v", url)
  571. return
  572. }
  573. client, err := turn.NewClient(&turn.ClientConfig{
  574. TURNServerAddr: turnServerAddr,
  575. Conn: locConn,
  576. Username: url.Username,
  577. Password: url.Password,
  578. LoggerFactory: a.loggerFactory,
  579. Net: a.net,
  580. })
  581. if err != nil {
  582. closeConnAndLog(locConn, a.log, "failed to create new TURN client %s %s", turnServerAddr, err)
  583. return
  584. }
  585. if err = client.Listen(); err != nil {
  586. client.Close()
  587. closeConnAndLog(locConn, a.log, "failed to listen on TURN client %s %s", turnServerAddr, err)
  588. return
  589. }
  590. relayConn, err := client.Allocate()
  591. if err != nil {
  592. client.Close()
  593. closeConnAndLog(locConn, a.log, "failed to allocate on TURN client %s %s", turnServerAddr, err)
  594. return
  595. }
  596. rAddr := relayConn.LocalAddr().(*net.UDPAddr) //nolint:forcetypeassert
  597. relayConfig := CandidateRelayConfig{
  598. Network: network,
  599. Component: ComponentRTP,
  600. Address: rAddr.IP.String(),
  601. Port: rAddr.Port,
  602. RelAddr: relAddr,
  603. RelPort: relPort,
  604. RelayProtocol: relayProtocol,
  605. OnClose: func() error {
  606. client.Close()
  607. return locConn.Close()
  608. },
  609. }
  610. relayConnClose := func() {
  611. if relayConErr := relayConn.Close(); relayConErr != nil {
  612. a.log.Warnf("Failed to close relay %v", relayConErr)
  613. }
  614. }
  615. candidate, err := NewCandidateRelay(&relayConfig)
  616. if err != nil {
  617. relayConnClose()
  618. client.Close()
  619. closeConnAndLog(locConn, a.log, "failed to create relay candidate: %s %s: %v", network, rAddr.String(), err)
  620. return
  621. }
  622. if err := a.addCandidate(ctx, candidate, relayConn); err != nil {
  623. relayConnClose()
  624. if closeErr := candidate.close(); closeErr != nil {
  625. a.log.Warnf("Failed to close candidate: %v", closeErr)
  626. }
  627. a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err)
  628. }
  629. }(*urls[i])
  630. }
  631. }