datachannel.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "errors"
  8. "fmt"
  9. "io"
  10. "math"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/pion/datachannel"
  15. "github.com/pion/logging"
  16. "github.com/pion/webrtc/v3/pkg/rtcerr"
  17. )
  18. const dataChannelBufferSize = math.MaxUint16 // message size limit for Chromium
  19. var errSCTPNotEstablished = errors.New("SCTP not established")
  20. // DataChannel represents a WebRTC DataChannel
  21. // The DataChannel interface represents a network channel
  22. // which can be used for bidirectional peer-to-peer transfers of arbitrary data
  23. type DataChannel struct {
  24. mu sync.RWMutex
  25. statsID string
  26. label string
  27. ordered bool
  28. maxPacketLifeTime *uint16
  29. maxRetransmits *uint16
  30. protocol string
  31. negotiated bool
  32. id *uint16
  33. readyState atomic.Value // DataChannelState
  34. bufferedAmountLowThreshold uint64
  35. detachCalled bool
  36. // The binaryType represents attribute MUST, on getting, return the value to
  37. // which it was last set. On setting, if the new value is either the string
  38. // "blob" or the string "arraybuffer", then set the IDL attribute to this
  39. // new value. Otherwise, throw a SyntaxError. When an DataChannel object
  40. // is created, the binaryType attribute MUST be initialized to the string
  41. // "blob". This attribute controls how binary data is exposed to scripts.
  42. // binaryType string
  43. onMessageHandler func(DataChannelMessage)
  44. openHandlerOnce sync.Once
  45. onOpenHandler func()
  46. dialHandlerOnce sync.Once
  47. onDialHandler func()
  48. onCloseHandler func()
  49. onBufferedAmountLow func()
  50. onErrorHandler func(error)
  51. sctpTransport *SCTPTransport
  52. dataChannel *datachannel.DataChannel
  53. // A reference to the associated api object used by this datachannel
  54. api *API
  55. log logging.LeveledLogger
  56. }
  57. // NewDataChannel creates a new DataChannel.
  58. // This constructor is part of the ORTC API. It is not
  59. // meant to be used together with the basic WebRTC API.
  60. func (api *API) NewDataChannel(transport *SCTPTransport, params *DataChannelParameters) (*DataChannel, error) {
  61. d, err := api.newDataChannel(params, nil, api.settingEngine.LoggerFactory.NewLogger("ortc"))
  62. if err != nil {
  63. return nil, err
  64. }
  65. err = d.open(transport)
  66. if err != nil {
  67. return nil, err
  68. }
  69. return d, nil
  70. }
  71. // newDataChannel is an internal constructor for the data channel used to
  72. // create the DataChannel object before the networking is set up.
  73. func (api *API) newDataChannel(params *DataChannelParameters, sctpTransport *SCTPTransport, log logging.LeveledLogger) (*DataChannel, error) {
  74. // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #5)
  75. if len(params.Label) > 65535 {
  76. return nil, &rtcerr.TypeError{Err: ErrStringSizeLimit}
  77. }
  78. d := &DataChannel{
  79. sctpTransport: sctpTransport,
  80. statsID: fmt.Sprintf("DataChannel-%d", time.Now().UnixNano()),
  81. label: params.Label,
  82. protocol: params.Protocol,
  83. negotiated: params.Negotiated,
  84. id: params.ID,
  85. ordered: params.Ordered,
  86. maxPacketLifeTime: params.MaxPacketLifeTime,
  87. maxRetransmits: params.MaxRetransmits,
  88. api: api,
  89. log: log,
  90. }
  91. d.setReadyState(DataChannelStateConnecting)
  92. return d, nil
  93. }
  94. // open opens the datachannel over the sctp transport
  95. func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
  96. association := sctpTransport.association()
  97. if association == nil {
  98. return errSCTPNotEstablished
  99. }
  100. d.mu.Lock()
  101. if d.sctpTransport != nil { // already open
  102. d.mu.Unlock()
  103. return nil
  104. }
  105. d.sctpTransport = sctpTransport
  106. var channelType datachannel.ChannelType
  107. var reliabilityParameter uint32
  108. switch {
  109. case d.maxPacketLifeTime == nil && d.maxRetransmits == nil:
  110. if d.ordered {
  111. channelType = datachannel.ChannelTypeReliable
  112. } else {
  113. channelType = datachannel.ChannelTypeReliableUnordered
  114. }
  115. case d.maxRetransmits != nil:
  116. reliabilityParameter = uint32(*d.maxRetransmits)
  117. if d.ordered {
  118. channelType = datachannel.ChannelTypePartialReliableRexmit
  119. } else {
  120. channelType = datachannel.ChannelTypePartialReliableRexmitUnordered
  121. }
  122. default:
  123. reliabilityParameter = uint32(*d.maxPacketLifeTime)
  124. if d.ordered {
  125. channelType = datachannel.ChannelTypePartialReliableTimed
  126. } else {
  127. channelType = datachannel.ChannelTypePartialReliableTimedUnordered
  128. }
  129. }
  130. cfg := &datachannel.Config{
  131. ChannelType: channelType,
  132. Priority: datachannel.ChannelPriorityNormal,
  133. ReliabilityParameter: reliabilityParameter,
  134. Label: d.label,
  135. Protocol: d.protocol,
  136. Negotiated: d.negotiated,
  137. LoggerFactory: d.api.settingEngine.LoggerFactory,
  138. }
  139. if d.id == nil {
  140. // avoid holding lock when generating ID, since id generation locks
  141. d.mu.Unlock()
  142. var dcID *uint16
  143. err := d.sctpTransport.generateAndSetDataChannelID(d.sctpTransport.dtlsTransport.role(), &dcID)
  144. if err != nil {
  145. return err
  146. }
  147. d.mu.Lock()
  148. d.id = dcID
  149. }
  150. dc, err := datachannel.Dial(association, *d.id, cfg)
  151. if err != nil {
  152. d.mu.Unlock()
  153. return err
  154. }
  155. // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
  156. dc.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
  157. dc.OnBufferedAmountLow(d.onBufferedAmountLow)
  158. d.mu.Unlock()
  159. d.onDial()
  160. d.handleOpen(dc, false, d.negotiated)
  161. return nil
  162. }
  163. // Transport returns the SCTPTransport instance the DataChannel is sending over.
  164. func (d *DataChannel) Transport() *SCTPTransport {
  165. d.mu.RLock()
  166. defer d.mu.RUnlock()
  167. return d.sctpTransport
  168. }
  169. // After onOpen is complete check that the user called detach
  170. // and provide an error message if the call was missed
  171. func (d *DataChannel) checkDetachAfterOpen() {
  172. d.mu.RLock()
  173. defer d.mu.RUnlock()
  174. if d.api.settingEngine.detach.DataChannels && !d.detachCalled {
  175. d.log.Warn("webrtc.DetachDataChannels() enabled but didn't Detach, call Detach from OnOpen")
  176. }
  177. }
  178. // OnOpen sets an event handler which is invoked when
  179. // the underlying data transport has been established (or re-established).
  180. func (d *DataChannel) OnOpen(f func()) {
  181. d.mu.Lock()
  182. d.openHandlerOnce = sync.Once{}
  183. d.onOpenHandler = f
  184. d.mu.Unlock()
  185. if d.ReadyState() == DataChannelStateOpen {
  186. // If the data channel is already open, call the handler immediately.
  187. go d.openHandlerOnce.Do(func() {
  188. f()
  189. d.checkDetachAfterOpen()
  190. })
  191. }
  192. }
  193. func (d *DataChannel) onOpen() {
  194. d.mu.RLock()
  195. handler := d.onOpenHandler
  196. d.mu.RUnlock()
  197. if handler != nil {
  198. go d.openHandlerOnce.Do(func() {
  199. handler()
  200. d.checkDetachAfterOpen()
  201. })
  202. }
  203. }
  204. // OnDial sets an event handler which is invoked when the
  205. // peer has been dialed, but before said peer has responsed
  206. func (d *DataChannel) OnDial(f func()) {
  207. d.mu.Lock()
  208. d.dialHandlerOnce = sync.Once{}
  209. d.onDialHandler = f
  210. d.mu.Unlock()
  211. if d.ReadyState() == DataChannelStateOpen {
  212. // If the data channel is already open, call the handler immediately.
  213. go d.dialHandlerOnce.Do(f)
  214. }
  215. }
  216. func (d *DataChannel) onDial() {
  217. d.mu.RLock()
  218. handler := d.onDialHandler
  219. d.mu.RUnlock()
  220. if handler != nil {
  221. go d.dialHandlerOnce.Do(handler)
  222. }
  223. }
  224. // OnClose sets an event handler which is invoked when
  225. // the underlying data transport has been closed.
  226. func (d *DataChannel) OnClose(f func()) {
  227. d.mu.Lock()
  228. defer d.mu.Unlock()
  229. d.onCloseHandler = f
  230. }
  231. func (d *DataChannel) onClose() {
  232. d.mu.RLock()
  233. handler := d.onCloseHandler
  234. d.mu.RUnlock()
  235. if handler != nil {
  236. go handler()
  237. }
  238. }
  239. // OnMessage sets an event handler which is invoked on a binary
  240. // message arrival over the sctp transport from a remote peer.
  241. // OnMessage can currently receive messages up to 16384 bytes
  242. // in size. Check out the detach API if you want to use larger
  243. // message sizes. Note that browser support for larger messages
  244. // is also limited.
  245. func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
  246. d.mu.Lock()
  247. defer d.mu.Unlock()
  248. d.onMessageHandler = f
  249. }
  250. func (d *DataChannel) onMessage(msg DataChannelMessage) {
  251. d.mu.RLock()
  252. handler := d.onMessageHandler
  253. d.mu.RUnlock()
  254. if handler == nil {
  255. return
  256. }
  257. handler(msg)
  258. }
  259. func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
  260. d.mu.Lock()
  261. d.dataChannel = dc
  262. d.mu.Unlock()
  263. d.setReadyState(DataChannelStateOpen)
  264. // Fire the OnOpen handler immediately not using pion/datachannel
  265. // * detached datachannels have no read loop, the user needs to read and query themselves
  266. // * remote datachannels should fire OnOpened. This isn't spec compliant, but we can't break behavior yet
  267. // * already negotiated datachannels should fire OnOpened
  268. if d.api.settingEngine.detach.DataChannels || isRemote || isAlreadyNegotiated {
  269. // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
  270. d.dataChannel.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
  271. d.dataChannel.OnBufferedAmountLow(d.onBufferedAmountLow)
  272. d.onOpen()
  273. } else {
  274. dc.OnOpen(func() {
  275. d.onOpen()
  276. })
  277. }
  278. d.mu.Lock()
  279. defer d.mu.Unlock()
  280. if !d.api.settingEngine.detach.DataChannels {
  281. go d.readLoop()
  282. }
  283. }
  284. // OnError sets an event handler which is invoked when
  285. // the underlying data transport cannot be read.
  286. func (d *DataChannel) OnError(f func(err error)) {
  287. d.mu.Lock()
  288. defer d.mu.Unlock()
  289. d.onErrorHandler = f
  290. }
  291. func (d *DataChannel) onError(err error) {
  292. d.mu.RLock()
  293. handler := d.onErrorHandler
  294. d.mu.RUnlock()
  295. if handler != nil {
  296. go handler(err)
  297. }
  298. }
  299. // See https://github.com/pion/webrtc/issues/1516
  300. // nolint:gochecknoglobals
  301. var rlBufPool = sync.Pool{New: func() interface{} {
  302. return make([]byte, dataChannelBufferSize)
  303. }}
  304. func (d *DataChannel) readLoop() {
  305. for {
  306. buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
  307. n, isString, err := d.dataChannel.ReadDataChannel(buffer)
  308. if err != nil {
  309. rlBufPool.Put(buffer) // nolint:staticcheck
  310. d.setReadyState(DataChannelStateClosed)
  311. if !errors.Is(err, io.EOF) {
  312. d.onError(err)
  313. }
  314. d.onClose()
  315. return
  316. }
  317. m := DataChannelMessage{Data: make([]byte, n), IsString: isString}
  318. copy(m.Data, buffer[:n])
  319. // The 'staticcheck' pragma is a false positive on the part of the CI linter.
  320. rlBufPool.Put(buffer) // nolint:staticcheck
  321. // NB: Why was DataChannelMessage not passed as a pointer value?
  322. d.onMessage(m) // nolint:staticcheck
  323. }
  324. }
  325. // Send sends the binary message to the DataChannel peer
  326. func (d *DataChannel) Send(data []byte) error {
  327. err := d.ensureOpen()
  328. if err != nil {
  329. return err
  330. }
  331. _, err = d.dataChannel.WriteDataChannel(data, false)
  332. return err
  333. }
  334. // SendText sends the text message to the DataChannel peer
  335. func (d *DataChannel) SendText(s string) error {
  336. err := d.ensureOpen()
  337. if err != nil {
  338. return err
  339. }
  340. _, err = d.dataChannel.WriteDataChannel([]byte(s), true)
  341. return err
  342. }
  343. func (d *DataChannel) ensureOpen() error {
  344. d.mu.RLock()
  345. defer d.mu.RUnlock()
  346. if d.ReadyState() != DataChannelStateOpen {
  347. return io.ErrClosedPipe
  348. }
  349. return nil
  350. }
  351. // Detach allows you to detach the underlying datachannel. This provides
  352. // an idiomatic API to work with, however it disables the OnMessage callback.
  353. // Before calling Detach you have to enable this behavior by calling
  354. // webrtc.DetachDataChannels(). Combining detached and normal data channels
  355. // is not supported.
  356. // Please refer to the data-channels-detach example and the
  357. // pion/datachannel documentation for the correct way to handle the
  358. // resulting DataChannel object.
  359. func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
  360. d.mu.Lock()
  361. defer d.mu.Unlock()
  362. if !d.api.settingEngine.detach.DataChannels {
  363. return nil, errDetachNotEnabled
  364. }
  365. if d.dataChannel == nil {
  366. return nil, errDetachBeforeOpened
  367. }
  368. d.detachCalled = true
  369. return d.dataChannel, nil
  370. }
  371. // Close Closes the DataChannel. It may be called regardless of whether
  372. // the DataChannel object was created by this peer or the remote peer.
  373. func (d *DataChannel) Close() error {
  374. d.mu.Lock()
  375. haveSctpTransport := d.dataChannel != nil
  376. d.mu.Unlock()
  377. if d.ReadyState() == DataChannelStateClosed {
  378. return nil
  379. }
  380. d.setReadyState(DataChannelStateClosing)
  381. if !haveSctpTransport {
  382. return nil
  383. }
  384. return d.dataChannel.Close()
  385. }
  386. // Label represents a label that can be used to distinguish this
  387. // DataChannel object from other DataChannel objects. Scripts are
  388. // allowed to create multiple DataChannel objects with the same label.
  389. func (d *DataChannel) Label() string {
  390. d.mu.RLock()
  391. defer d.mu.RUnlock()
  392. return d.label
  393. }
  394. // Ordered returns true if the DataChannel is ordered, and false if
  395. // out-of-order delivery is allowed.
  396. func (d *DataChannel) Ordered() bool {
  397. d.mu.RLock()
  398. defer d.mu.RUnlock()
  399. return d.ordered
  400. }
  401. // MaxPacketLifeTime represents the length of the time window (msec) during
  402. // which transmissions and retransmissions may occur in unreliable mode.
  403. func (d *DataChannel) MaxPacketLifeTime() *uint16 {
  404. d.mu.RLock()
  405. defer d.mu.RUnlock()
  406. return d.maxPacketLifeTime
  407. }
  408. // MaxRetransmits represents the maximum number of retransmissions that are
  409. // attempted in unreliable mode.
  410. func (d *DataChannel) MaxRetransmits() *uint16 {
  411. d.mu.RLock()
  412. defer d.mu.RUnlock()
  413. return d.maxRetransmits
  414. }
  415. // Protocol represents the name of the sub-protocol used with this
  416. // DataChannel.
  417. func (d *DataChannel) Protocol() string {
  418. d.mu.RLock()
  419. defer d.mu.RUnlock()
  420. return d.protocol
  421. }
  422. // Negotiated represents whether this DataChannel was negotiated by the
  423. // application (true), or not (false).
  424. func (d *DataChannel) Negotiated() bool {
  425. d.mu.RLock()
  426. defer d.mu.RUnlock()
  427. return d.negotiated
  428. }
  429. // ID represents the ID for this DataChannel. The value is initially
  430. // null, which is what will be returned if the ID was not provided at
  431. // channel creation time, and the DTLS role of the SCTP transport has not
  432. // yet been negotiated. Otherwise, it will return the ID that was either
  433. // selected by the script or generated. After the ID is set to a non-null
  434. // value, it will not change.
  435. func (d *DataChannel) ID() *uint16 {
  436. d.mu.RLock()
  437. defer d.mu.RUnlock()
  438. return d.id
  439. }
  440. // ReadyState represents the state of the DataChannel object.
  441. func (d *DataChannel) ReadyState() DataChannelState {
  442. if v, ok := d.readyState.Load().(DataChannelState); ok {
  443. return v
  444. }
  445. return DataChannelState(0)
  446. }
  447. // BufferedAmount represents the number of bytes of application data
  448. // (UTF-8 text and binary data) that have been queued using send(). Even
  449. // though the data transmission can occur in parallel, the returned value
  450. // MUST NOT be decreased before the current task yielded back to the event
  451. // loop to prevent race conditions. The value does not include framing
  452. // overhead incurred by the protocol, or buffering done by the operating
  453. // system or network hardware. The value of BufferedAmount slot will only
  454. // increase with each call to the send() method as long as the ReadyState is
  455. // open; however, BufferedAmount does not reset to zero once the channel
  456. // closes.
  457. func (d *DataChannel) BufferedAmount() uint64 {
  458. d.mu.RLock()
  459. defer d.mu.RUnlock()
  460. if d.dataChannel == nil {
  461. return 0
  462. }
  463. return d.dataChannel.BufferedAmount()
  464. }
  465. // BufferedAmountLowThreshold represents the threshold at which the
  466. // bufferedAmount is considered to be low. When the bufferedAmount decreases
  467. // from above this threshold to equal or below it, the bufferedamountlow
  468. // event fires. BufferedAmountLowThreshold is initially zero on each new
  469. // DataChannel, but the application may change its value at any time.
  470. // The threshold is set to 0 by default.
  471. func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
  472. d.mu.RLock()
  473. defer d.mu.RUnlock()
  474. if d.dataChannel == nil {
  475. return d.bufferedAmountLowThreshold
  476. }
  477. return d.dataChannel.BufferedAmountLowThreshold()
  478. }
  479. // SetBufferedAmountLowThreshold is used to update the threshold.
  480. // See BufferedAmountLowThreshold().
  481. func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
  482. d.mu.Lock()
  483. defer d.mu.Unlock()
  484. d.bufferedAmountLowThreshold = th
  485. if d.dataChannel != nil {
  486. d.dataChannel.SetBufferedAmountLowThreshold(th)
  487. }
  488. }
  489. // OnBufferedAmountLow sets an event handler which is invoked when
  490. // the number of bytes of outgoing data becomes lower than the
  491. // BufferedAmountLowThreshold.
  492. func (d *DataChannel) OnBufferedAmountLow(f func()) {
  493. d.mu.Lock()
  494. defer d.mu.Unlock()
  495. d.onBufferedAmountLow = f
  496. if d.dataChannel != nil {
  497. d.dataChannel.OnBufferedAmountLow(f)
  498. }
  499. }
  500. func (d *DataChannel) getStatsID() string {
  501. d.mu.Lock()
  502. defer d.mu.Unlock()
  503. return d.statsID
  504. }
  505. func (d *DataChannel) collectStats(collector *statsReportCollector) {
  506. collector.Collecting()
  507. d.mu.Lock()
  508. defer d.mu.Unlock()
  509. stats := DataChannelStats{
  510. Timestamp: statsTimestampNow(),
  511. Type: StatsTypeDataChannel,
  512. ID: d.statsID,
  513. Label: d.label,
  514. Protocol: d.protocol,
  515. // TransportID string `json:"transportId"`
  516. State: d.ReadyState(),
  517. }
  518. if d.id != nil {
  519. stats.DataChannelIdentifier = int32(*d.id)
  520. }
  521. if d.dataChannel != nil {
  522. stats.MessagesSent = d.dataChannel.MessagesSent()
  523. stats.BytesSent = d.dataChannel.BytesSent()
  524. stats.MessagesReceived = d.dataChannel.MessagesReceived()
  525. stats.BytesReceived = d.dataChannel.BytesReceived()
  526. }
  527. collector.Collect(stats.ID, stats)
  528. }
  529. func (d *DataChannel) setReadyState(r DataChannelState) {
  530. d.readyState.Store(r)
  531. }