client.go 15 KB


  1. package turn
  2. import (
  3. b64 "encoding/base64"
  4. "fmt"
  5. "math"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/pion/logging"
  10. "github.com/pion/stun"
  11. "github.com/pion/transport/v2"
  12. "github.com/pion/transport/v2/stdnet"
  13. "github.com/pion/transport/v2/vnet"
  14. "github.com/pion/turn/v2/internal/client"
  15. "github.com/pion/turn/v2/internal/proto"
  16. )
  17. const (
  18. defaultRTO = 200 * time.Millisecond
  19. maxRtxCount = 7 // total 7 requests (Rc)
  20. maxDataBufferSize = math.MaxUint16 // message size limit for Chromium
  21. )
  22. // interval [msec]
  23. // 0: 0 ms +500
  24. // 1: 500 ms +1000
  25. // 2: 1500 ms +2000
  26. // 3: 3500 ms +4000
  27. // 4: 7500 ms +8000
  28. // 5: 15500 ms +16000
  29. // 6: 31500 ms +32000
  30. // -: 63500 ms failed
  31. // ClientConfig is a bag of config parameters for Client.
  32. type ClientConfig struct {
  33. STUNServerAddr string // STUN server address (e.g. "stun.abc.com:3478")
  34. TURNServerAddr string // TURN server address (e.g. "turn.abc.com:3478")
  35. Username string
  36. Password string
  37. Realm string
  38. Software string
  39. RTO time.Duration
  40. Conn net.PacketConn // Listening socket (net.PacketConn)
  41. LoggerFactory logging.LoggerFactory
  42. Net transport.Net
  43. }
  44. // Client is a STUN server client
  45. type Client struct {
  46. conn net.PacketConn // read-only
  47. stunServ net.Addr // read-only
  48. turnServ net.Addr // read-only
  49. stunServStr string // read-only, used for de-multiplexing
  50. turnServStr string // read-only, used for de-multiplexing
  51. username stun.Username // read-only
  52. password string // read-only
  53. realm stun.Realm // read-only
  54. integrity stun.MessageIntegrity // read-only
  55. software stun.Software // read-only
  56. trMap *client.TransactionMap // thread-safe
  57. rto time.Duration // read-only
  58. relayedConn *client.UDPConn // protected by mutex ***
  59. allocTryLock client.TryLock // thread-safe
  60. listenTryLock client.TryLock // thread-safe
  61. net transport.Net // read-only
  62. mutex sync.RWMutex // thread-safe
  63. mutexTrMap sync.Mutex // thread-safe
  64. log logging.LeveledLogger // read-only
  65. }
  66. // NewClient returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0"
  67. func NewClient(config *ClientConfig) (*Client, error) {
  68. loggerFactory := config.LoggerFactory
  69. if loggerFactory == nil {
  70. loggerFactory = logging.NewDefaultLoggerFactory()
  71. }
  72. log := loggerFactory.NewLogger("turnc")
  73. if config.Conn == nil {
  74. return nil, errNilConn
  75. }
  76. var err error
  77. if config.Net == nil {
  78. config.Net, err = stdnet.NewNet() // defaults to native operation
  79. if err != nil {
  80. return nil, err
  81. }
  82. } else if _, ok := config.Net.(*vnet.Net); ok {
  83. log.Warn("Virtual network is enabled")
  84. }
  85. var stunServ, turnServ net.Addr
  86. var stunServStr, turnServStr string
  87. if len(config.STUNServerAddr) > 0 {
  88. log.Debugf("resolving %s", config.STUNServerAddr)
  89. stunServ, err = config.Net.ResolveUDPAddr("udp4", config.STUNServerAddr)
  90. if err != nil {
  91. return nil, err
  92. }
  93. stunServStr = stunServ.String()
  94. log.Debugf("stunServ: %s", stunServStr)
  95. }
  96. if len(config.TURNServerAddr) > 0 {
  97. log.Debugf("resolving %s", config.TURNServerAddr)
  98. turnServ, err = config.Net.ResolveUDPAddr("udp4", config.TURNServerAddr)
  99. if err != nil {
  100. return nil, err
  101. }
  102. turnServStr = turnServ.String()
  103. log.Debugf("turnServ: %s", turnServStr)
  104. }
  105. rto := defaultRTO
  106. if config.RTO > 0 {
  107. rto = config.RTO
  108. }
  109. c := &Client{
  110. conn: config.Conn,
  111. stunServ: stunServ,
  112. turnServ: turnServ,
  113. stunServStr: stunServStr,
  114. turnServStr: turnServStr,
  115. username: stun.NewUsername(config.Username),
  116. password: config.Password,
  117. realm: stun.NewRealm(config.Realm),
  118. software: stun.NewSoftware(config.Software),
  119. net: config.Net,
  120. trMap: client.NewTransactionMap(),
  121. rto: rto,
  122. log: log,
  123. }
  124. return c, nil
  125. }
  126. // TURNServerAddr return the TURN server address
  127. func (c *Client) TURNServerAddr() net.Addr {
  128. return c.turnServ
  129. }
  130. // STUNServerAddr return the STUN server address
  131. func (c *Client) STUNServerAddr() net.Addr {
  132. return c.stunServ
  133. }
  134. // Username returns username
  135. func (c *Client) Username() stun.Username {
  136. return c.username
  137. }
  138. // Realm return realm
  139. func (c *Client) Realm() stun.Realm {
  140. return c.realm
  141. }
  142. // WriteTo sends data to the specified destination using the base socket.
  143. func (c *Client) WriteTo(data []byte, to net.Addr) (int, error) {
  144. return c.conn.WriteTo(data, to)
  145. }
  146. // Listen will have this client start listening on the conn provided via the config.
  147. // This is optional. If not used, you will need to call HandleInbound method
  148. // to supply incoming data, instead.
  149. func (c *Client) Listen() error {
  150. if err := c.listenTryLock.Lock(); err != nil {
  151. return fmt.Errorf("%w: %s", errAlreadyListening, err.Error())
  152. }
  153. go func() {
  154. buf := make([]byte, maxDataBufferSize)
  155. for {
  156. n, from, err := c.conn.ReadFrom(buf)
  157. if err != nil {
  158. c.log.Debugf("exiting read loop: %s", err.Error())
  159. break
  160. }
  161. _, err = c.HandleInbound(buf[:n], from)
  162. if err != nil {
  163. c.log.Debugf("exiting read loop: %s", err.Error())
  164. break
  165. }
  166. }
  167. c.listenTryLock.Unlock()
  168. }()
  169. return nil
  170. }
  171. // Close closes this client
  172. func (c *Client) Close() {
  173. c.mutexTrMap.Lock()
  174. defer c.mutexTrMap.Unlock()
  175. c.trMap.CloseAndDeleteAll()
  176. }
  177. // TransactionID & Base64: https://play.golang.org/p/EEgmJDI971P
  178. // SendBindingRequestTo sends a new STUN request to the given transport address
  179. func (c *Client) SendBindingRequestTo(to net.Addr) (net.Addr, error) {
  180. attrs := []stun.Setter{stun.TransactionID, stun.BindingRequest}
  181. if len(c.software) > 0 {
  182. attrs = append(attrs, c.software)
  183. }
  184. msg, err := stun.Build(attrs...)
  185. if err != nil {
  186. return nil, err
  187. }
  188. trRes, err := c.PerformTransaction(msg, to, false)
  189. if err != nil {
  190. return nil, err
  191. }
  192. var reflAddr stun.XORMappedAddress
  193. if err := reflAddr.GetFrom(trRes.Msg); err != nil {
  194. return nil, err
  195. }
  196. return &net.UDPAddr{
  197. IP: reflAddr.IP,
  198. Port: reflAddr.Port,
  199. }, nil
  200. }
  201. // SendBindingRequest sends a new STUN request to the STUN server
  202. func (c *Client) SendBindingRequest() (net.Addr, error) {
  203. if c.stunServ == nil {
  204. return nil, errSTUNServerAddressNotSet
  205. }
  206. return c.SendBindingRequestTo(c.stunServ)
  207. }
  208. // Allocate sends a TURN allocation request to the given transport address
  209. func (c *Client) Allocate() (net.PacketConn, error) {
  210. if err := c.allocTryLock.Lock(); err != nil {
  211. return nil, fmt.Errorf("%w: %s", errOneAllocateOnly, err.Error())
  212. }
  213. defer c.allocTryLock.Unlock()
  214. relayedConn := c.relayedUDPConn()
  215. if relayedConn != nil {
  216. return nil, fmt.Errorf("%w: %s", errAlreadyAllocated, relayedConn.LocalAddr().String())
  217. }
  218. msg, err := stun.Build(
  219. stun.TransactionID,
  220. stun.NewType(stun.MethodAllocate, stun.ClassRequest),
  221. proto.RequestedTransport{Protocol: proto.ProtoUDP},
  222. stun.Fingerprint,
  223. )
  224. if err != nil {
  225. return nil, err
  226. }
  227. trRes, err := c.PerformTransaction(msg, c.turnServ, false)
  228. if err != nil {
  229. return nil, err
  230. }
  231. res := trRes.Msg
  232. // Anonymous allocate failed, trying to authenticate.
  233. var nonce stun.Nonce
  234. if err = nonce.GetFrom(res); err != nil {
  235. return nil, err
  236. }
  237. if err = c.realm.GetFrom(res); err != nil {
  238. return nil, err
  239. }
  240. c.realm = append([]byte(nil), c.realm...)
  241. c.integrity = stun.NewLongTermIntegrity(
  242. c.username.String(), c.realm.String(), c.password,
  243. )
  244. // Trying to authorize.
  245. msg, err = stun.Build(
  246. stun.TransactionID,
  247. stun.NewType(stun.MethodAllocate, stun.ClassRequest),
  248. proto.RequestedTransport{Protocol: proto.ProtoUDP},
  249. &c.username,
  250. &c.realm,
  251. &nonce,
  252. &c.integrity,
  253. stun.Fingerprint,
  254. )
  255. if err != nil {
  256. return nil, err
  257. }
  258. trRes, err = c.PerformTransaction(msg, c.turnServ, false)
  259. if err != nil {
  260. return nil, err
  261. }
  262. res = trRes.Msg
  263. if res.Type.Class == stun.ClassErrorResponse {
  264. var code stun.ErrorCodeAttribute
  265. if err = code.GetFrom(res); err == nil {
  266. return nil, fmt.Errorf("%s (error %s)", res.Type, code) //nolint:goerr113
  267. }
  268. return nil, fmt.Errorf("%s", res.Type) //nolint:goerr113
  269. }
  270. // Getting relayed addresses from response.
  271. var relayed proto.RelayedAddress
  272. if err := relayed.GetFrom(res); err != nil {
  273. return nil, err
  274. }
  275. relayedAddr := &net.UDPAddr{
  276. IP: relayed.IP,
  277. Port: relayed.Port,
  278. }
  279. // Getting lifetime from response
  280. var lifetime proto.Lifetime
  281. if err := lifetime.GetFrom(res); err != nil {
  282. return nil, err
  283. }
  284. relayedConn = client.NewUDPConn(&client.UDPConnConfig{
  285. Observer: c,
  286. RelayedAddr: relayedAddr,
  287. Integrity: c.integrity,
  288. Nonce: nonce,
  289. Lifetime: lifetime.Duration,
  290. Log: c.log,
  291. })
  292. c.setRelayedUDPConn(relayedConn)
  293. return relayedConn, nil
  294. }
  295. // CreatePermission Issues a CreatePermission request for the supplied addresses
  296. // as described in https://datatracker.ietf.org/doc/html/rfc5766#section-9
  297. func (c *Client) CreatePermission(addrs ...net.Addr) error {
  298. return c.relayedUDPConn().CreatePermissions(addrs...)
  299. }
  300. // PerformTransaction performs STUN transaction
  301. func (c *Client) PerformTransaction(msg *stun.Message, to net.Addr, ignoreResult bool) (client.TransactionResult,
  302. error,
  303. ) {
  304. trKey := b64.StdEncoding.EncodeToString(msg.TransactionID[:])
  305. raw := make([]byte, len(msg.Raw))
  306. copy(raw, msg.Raw)
  307. tr := client.NewTransaction(&client.TransactionConfig{
  308. Key: trKey,
  309. Raw: raw,
  310. To: to,
  311. Interval: c.rto,
  312. IgnoreResult: ignoreResult,
  313. })
  314. c.trMap.Insert(trKey, tr)
  315. c.log.Tracef("start %s transaction %s to %s", msg.Type, trKey, tr.To.String())
  316. _, err := c.conn.WriteTo(tr.Raw, to)
  317. if err != nil {
  318. return client.TransactionResult{}, err
  319. }
  320. tr.StartRtxTimer(c.onRtxTimeout)
  321. // If dontWait is true, get the transaction going and return immediately
  322. if ignoreResult {
  323. return client.TransactionResult{}, nil
  324. }
  325. res := tr.WaitForResult()
  326. if res.Err != nil {
  327. return res, res.Err
  328. }
  329. return res, nil
  330. }
  331. // OnDeallocated is called when de-allocation of relay address has been complete.
  332. // (Called by UDPConn)
  333. func (c *Client) OnDeallocated(relayedAddr net.Addr) {
  334. c.setRelayedUDPConn(nil)
  335. }
  336. // HandleInbound handles data received.
  337. // This method handles incoming packet de-multiplex it by the source address
  338. // and the types of the message.
  339. // This return a boolean (handled or not) and if there was an error.
  340. // Caller should check if the packet was handled by this client or not.
  341. // If not handled, it is assumed that the packet is application data.
  342. // If an error is returned, the caller should discard the packet regardless.
  343. func (c *Client) HandleInbound(data []byte, from net.Addr) (bool, error) {
  344. // +-------------------+-------------------------------+
  345. // | Return Values | |
  346. // +-------------------+ Meaning / Action |
  347. // | handled | error | |
  348. // |=========+=========+===============================+
  349. // | false | nil | Handle the packet as app data |
  350. // |---------+---------+-------------------------------+
  351. // | true | nil | Nothing to do |
  352. // |---------+---------+-------------------------------+
  353. // | false | error | (shouldn't happen) |
  354. // |---------+---------+-------------------------------+
  355. // | true | error | Error occurred while handling |
  356. // +---------+---------+-------------------------------+
  357. // Possible causes of the error:
  358. // - Malformed packet (parse error)
  359. // - STUN message was a request
  360. // - Non-STUN message from the STUN server
  361. switch {
  362. case stun.IsMessage(data):
  363. return true, c.handleSTUNMessage(data, from)
  364. case proto.IsChannelData(data):
  365. return true, c.handleChannelData(data)
  366. case len(c.stunServStr) != 0 && from.String() == c.stunServStr:
  367. // received from STUN server but it is not a STUN message
  368. return true, errNonSTUNMessage
  369. default:
  370. // assume, this is an application data
  371. c.log.Tracef("non-STUN/TURN packet, unhandled")
  372. }
  373. return false, nil
  374. }
  375. func (c *Client) handleSTUNMessage(data []byte, from net.Addr) error {
  376. raw := make([]byte, len(data))
  377. copy(raw, data)
  378. msg := &stun.Message{Raw: raw}
  379. if err := msg.Decode(); err != nil {
  380. return fmt.Errorf("%w: %s", errFailedToDecodeSTUN, err.Error())
  381. }
  382. if msg.Type.Class == stun.ClassRequest {
  383. return fmt.Errorf("%w : %s", errUnexpectedSTUNRequestMessage, msg.String())
  384. }
  385. if msg.Type.Class == stun.ClassIndication {
  386. if msg.Type.Method == stun.MethodData {
  387. var peerAddr proto.PeerAddress
  388. if err := peerAddr.GetFrom(msg); err != nil {
  389. return err
  390. }
  391. from = &net.UDPAddr{
  392. IP: peerAddr.IP,
  393. Port: peerAddr.Port,
  394. }
  395. var data proto.Data
  396. if err := data.GetFrom(msg); err != nil {
  397. return err
  398. }
  399. c.log.Debugf("data indication received from %s", from.String())
  400. relayedConn := c.relayedUDPConn()
  401. if relayedConn == nil {
  402. c.log.Debug("no relayed conn allocated")
  403. return nil // silently discard
  404. }
  405. relayedConn.HandleInbound(data, from)
  406. }
  407. return nil
  408. }
  409. // This is a STUN response message (transactional)
  410. // The type is either:
  411. // - stun.ClassSuccessResponse
  412. // - stun.ClassErrorResponse
  413. trKey := b64.StdEncoding.EncodeToString(msg.TransactionID[:])
  414. c.mutexTrMap.Lock()
  415. tr, ok := c.trMap.Find(trKey)
  416. if !ok {
  417. c.mutexTrMap.Unlock()
  418. // silently discard
  419. c.log.Debugf("no transaction for %s", msg.String())
  420. return nil
  421. }
  422. // End the transaction
  423. tr.StopRtxTimer()
  424. c.trMap.Delete(trKey)
  425. c.mutexTrMap.Unlock()
  426. if !tr.WriteResult(client.TransactionResult{
  427. Msg: msg,
  428. From: from,
  429. Retries: tr.Retries(),
  430. }) {
  431. c.log.Debugf("no listener for %s", msg.String())
  432. }
  433. return nil
  434. }
  435. func (c *Client) handleChannelData(data []byte) error {
  436. chData := &proto.ChannelData{
  437. Raw: make([]byte, len(data)),
  438. }
  439. copy(chData.Raw, data)
  440. if err := chData.Decode(); err != nil {
  441. return err
  442. }
  443. relayedConn := c.relayedUDPConn()
  444. if relayedConn == nil {
  445. c.log.Debug("no relayed conn allocated")
  446. return nil // silently discard
  447. }
  448. addr, ok := relayedConn.FindAddrByChannelNumber(uint16(chData.Number))
  449. if !ok {
  450. return fmt.Errorf("%w: %d", errChannelBindNotFound, int(chData.Number))
  451. }
  452. c.log.Tracef("channel data received from %s (ch=%d)", addr.String(), int(chData.Number))
  453. relayedConn.HandleInbound(chData.Data, addr)
  454. return nil
  455. }
  456. func (c *Client) onRtxTimeout(trKey string, nRtx int) {
  457. c.mutexTrMap.Lock()
  458. defer c.mutexTrMap.Unlock()
  459. tr, ok := c.trMap.Find(trKey)
  460. if !ok {
  461. return // already gone
  462. }
  463. if nRtx == maxRtxCount {
  464. // all retransmissions failed
  465. c.trMap.Delete(trKey)
  466. if !tr.WriteResult(client.TransactionResult{
  467. Err: fmt.Errorf("%w %s", errAllRetransmissionsFailed, trKey),
  468. }) {
  469. c.log.Debug("no listener for transaction")
  470. }
  471. return
  472. }
  473. c.log.Tracef("retransmitting transaction %s to %s (nRtx=%d)",
  474. trKey, tr.To.String(), nRtx)
  475. _, err := c.conn.WriteTo(tr.Raw, tr.To)
  476. if err != nil {
  477. c.trMap.Delete(trKey)
  478. if !tr.WriteResult(client.TransactionResult{
  479. Err: fmt.Errorf("%w %s", errFailedToRetransmitTransaction, trKey),
  480. }) {
  481. c.log.Debug("no listener for transaction")
  482. }
  483. return
  484. }
  485. tr.StartRtxTimer(c.onRtxTimeout)
  486. }
  487. func (c *Client) setRelayedUDPConn(conn *client.UDPConn) {
  488. c.mutex.Lock()
  489. defer c.mutex.Unlock()
  490. c.relayedConn = conn
  491. }
  492. func (c *Client) relayedUDPConn() *client.UDPConn {
  493. c.mutex.RLock()
  494. defer c.mutex.RUnlock()
  495. return c.relayedConn
  496. }