123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631 |
- // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
- // SPDX-License-Identifier: MIT
- //go:build !js
- // +build !js
- package webrtc
- import (
- "errors"
- "fmt"
- "io"
- "math"
- "sync"
- "sync/atomic"
- "time"
- "github.com/pion/datachannel"
- "github.com/pion/logging"
- "github.com/pion/webrtc/v3/pkg/rtcerr"
- )
- const dataChannelBufferSize = math.MaxUint16 // message size limit for Chromium
- var errSCTPNotEstablished = errors.New("SCTP not established")
- // DataChannel represents a WebRTC DataChannel
- // The DataChannel interface represents a network channel
- // which can be used for bidirectional peer-to-peer transfers of arbitrary data
- type DataChannel struct {
- mu sync.RWMutex
- statsID string
- label string
- ordered bool
- maxPacketLifeTime *uint16
- maxRetransmits *uint16
- protocol string
- negotiated bool
- id *uint16
- readyState atomic.Value // DataChannelState
- bufferedAmountLowThreshold uint64
- detachCalled bool
- // The binaryType represents attribute MUST, on getting, return the value to
- // which it was last set. On setting, if the new value is either the string
- // "blob" or the string "arraybuffer", then set the IDL attribute to this
- // new value. Otherwise, throw a SyntaxError. When an DataChannel object
- // is created, the binaryType attribute MUST be initialized to the string
- // "blob". This attribute controls how binary data is exposed to scripts.
- // binaryType string
- onMessageHandler func(DataChannelMessage)
- openHandlerOnce sync.Once
- onOpenHandler func()
- dialHandlerOnce sync.Once
- onDialHandler func()
- onCloseHandler func()
- onBufferedAmountLow func()
- onErrorHandler func(error)
- sctpTransport *SCTPTransport
- dataChannel *datachannel.DataChannel
- // A reference to the associated api object used by this datachannel
- api *API
- log logging.LeveledLogger
- }
- // NewDataChannel creates a new DataChannel.
- // This constructor is part of the ORTC API. It is not
- // meant to be used together with the basic WebRTC API.
- func (api *API) NewDataChannel(transport *SCTPTransport, params *DataChannelParameters) (*DataChannel, error) {
- d, err := api.newDataChannel(params, nil, api.settingEngine.LoggerFactory.NewLogger("ortc"))
- if err != nil {
- return nil, err
- }
- err = d.open(transport)
- if err != nil {
- return nil, err
- }
- return d, nil
- }
- // newDataChannel is an internal constructor for the data channel used to
- // create the DataChannel object before the networking is set up.
- func (api *API) newDataChannel(params *DataChannelParameters, sctpTransport *SCTPTransport, log logging.LeveledLogger) (*DataChannel, error) {
- // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #5)
- if len(params.Label) > 65535 {
- return nil, &rtcerr.TypeError{Err: ErrStringSizeLimit}
- }
- d := &DataChannel{
- sctpTransport: sctpTransport,
- statsID: fmt.Sprintf("DataChannel-%d", time.Now().UnixNano()),
- label: params.Label,
- protocol: params.Protocol,
- negotiated: params.Negotiated,
- id: params.ID,
- ordered: params.Ordered,
- maxPacketLifeTime: params.MaxPacketLifeTime,
- maxRetransmits: params.MaxRetransmits,
- api: api,
- log: log,
- }
- d.setReadyState(DataChannelStateConnecting)
- return d, nil
- }
- // open opens the datachannel over the sctp transport
- func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
- association := sctpTransport.association()
- if association == nil {
- return errSCTPNotEstablished
- }
- d.mu.Lock()
- if d.sctpTransport != nil { // already open
- d.mu.Unlock()
- return nil
- }
- d.sctpTransport = sctpTransport
- var channelType datachannel.ChannelType
- var reliabilityParameter uint32
- switch {
- case d.maxPacketLifeTime == nil && d.maxRetransmits == nil:
- if d.ordered {
- channelType = datachannel.ChannelTypeReliable
- } else {
- channelType = datachannel.ChannelTypeReliableUnordered
- }
- case d.maxRetransmits != nil:
- reliabilityParameter = uint32(*d.maxRetransmits)
- if d.ordered {
- channelType = datachannel.ChannelTypePartialReliableRexmit
- } else {
- channelType = datachannel.ChannelTypePartialReliableRexmitUnordered
- }
- default:
- reliabilityParameter = uint32(*d.maxPacketLifeTime)
- if d.ordered {
- channelType = datachannel.ChannelTypePartialReliableTimed
- } else {
- channelType = datachannel.ChannelTypePartialReliableTimedUnordered
- }
- }
- cfg := &datachannel.Config{
- ChannelType: channelType,
- Priority: datachannel.ChannelPriorityNormal,
- ReliabilityParameter: reliabilityParameter,
- Label: d.label,
- Protocol: d.protocol,
- Negotiated: d.negotiated,
- LoggerFactory: d.api.settingEngine.LoggerFactory,
- }
- if d.id == nil {
- // avoid holding lock when generating ID, since id generation locks
- d.mu.Unlock()
- var dcID *uint16
- err := d.sctpTransport.generateAndSetDataChannelID(d.sctpTransport.dtlsTransport.role(), &dcID)
- if err != nil {
- return err
- }
- d.mu.Lock()
- d.id = dcID
- }
- dc, err := datachannel.Dial(association, *d.id, cfg)
- if err != nil {
- d.mu.Unlock()
- return err
- }
- // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
- dc.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
- dc.OnBufferedAmountLow(d.onBufferedAmountLow)
- d.mu.Unlock()
- d.onDial()
- d.handleOpen(dc, false, d.negotiated)
- return nil
- }
- // Transport returns the SCTPTransport instance the DataChannel is sending over.
- func (d *DataChannel) Transport() *SCTPTransport {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.sctpTransport
- }
- // After onOpen is complete check that the user called detach
- // and provide an error message if the call was missed
- func (d *DataChannel) checkDetachAfterOpen() {
- d.mu.RLock()
- defer d.mu.RUnlock()
- if d.api.settingEngine.detach.DataChannels && !d.detachCalled {
- d.log.Warn("webrtc.DetachDataChannels() enabled but didn't Detach, call Detach from OnOpen")
- }
- }
- // OnOpen sets an event handler which is invoked when
- // the underlying data transport has been established (or re-established).
- func (d *DataChannel) OnOpen(f func()) {
- d.mu.Lock()
- d.openHandlerOnce = sync.Once{}
- d.onOpenHandler = f
- d.mu.Unlock()
- if d.ReadyState() == DataChannelStateOpen {
- // If the data channel is already open, call the handler immediately.
- go d.openHandlerOnce.Do(func() {
- f()
- d.checkDetachAfterOpen()
- })
- }
- }
- func (d *DataChannel) onOpen() {
- d.mu.RLock()
- handler := d.onOpenHandler
- d.mu.RUnlock()
- if handler != nil {
- go d.openHandlerOnce.Do(func() {
- handler()
- d.checkDetachAfterOpen()
- })
- }
- }
- // OnDial sets an event handler which is invoked when the
- // peer has been dialed, but before said peer has responsed
- func (d *DataChannel) OnDial(f func()) {
- d.mu.Lock()
- d.dialHandlerOnce = sync.Once{}
- d.onDialHandler = f
- d.mu.Unlock()
- if d.ReadyState() == DataChannelStateOpen {
- // If the data channel is already open, call the handler immediately.
- go d.dialHandlerOnce.Do(f)
- }
- }
- func (d *DataChannel) onDial() {
- d.mu.RLock()
- handler := d.onDialHandler
- d.mu.RUnlock()
- if handler != nil {
- go d.dialHandlerOnce.Do(handler)
- }
- }
- // OnClose sets an event handler which is invoked when
- // the underlying data transport has been closed.
- func (d *DataChannel) OnClose(f func()) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.onCloseHandler = f
- }
- func (d *DataChannel) onClose() {
- d.mu.RLock()
- handler := d.onCloseHandler
- d.mu.RUnlock()
- if handler != nil {
- go handler()
- }
- }
- // OnMessage sets an event handler which is invoked on a binary
- // message arrival over the sctp transport from a remote peer.
- // OnMessage can currently receive messages up to 16384 bytes
- // in size. Check out the detach API if you want to use larger
- // message sizes. Note that browser support for larger messages
- // is also limited.
- func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.onMessageHandler = f
- }
- func (d *DataChannel) onMessage(msg DataChannelMessage) {
- d.mu.RLock()
- handler := d.onMessageHandler
- d.mu.RUnlock()
- if handler == nil {
- return
- }
- handler(msg)
- }
- func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
- d.mu.Lock()
- d.dataChannel = dc
- d.mu.Unlock()
- d.setReadyState(DataChannelStateOpen)
- // Fire the OnOpen handler immediately not using pion/datachannel
- // * detached datachannels have no read loop, the user needs to read and query themselves
- // * remote datachannels should fire OnOpened. This isn't spec compliant, but we can't break behavior yet
- // * already negotiated datachannels should fire OnOpened
- if d.api.settingEngine.detach.DataChannels || isRemote || isAlreadyNegotiated {
- // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
- d.dataChannel.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
- d.dataChannel.OnBufferedAmountLow(d.onBufferedAmountLow)
- d.onOpen()
- } else {
- dc.OnOpen(func() {
- d.onOpen()
- })
- }
- d.mu.Lock()
- defer d.mu.Unlock()
- if !d.api.settingEngine.detach.DataChannels {
- go d.readLoop()
- }
- }
- // OnError sets an event handler which is invoked when
- // the underlying data transport cannot be read.
- func (d *DataChannel) OnError(f func(err error)) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.onErrorHandler = f
- }
- func (d *DataChannel) onError(err error) {
- d.mu.RLock()
- handler := d.onErrorHandler
- d.mu.RUnlock()
- if handler != nil {
- go handler(err)
- }
- }
- // See https://github.com/pion/webrtc/issues/1516
- // nolint:gochecknoglobals
- var rlBufPool = sync.Pool{New: func() interface{} {
- return make([]byte, dataChannelBufferSize)
- }}
- func (d *DataChannel) readLoop() {
- for {
- buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
- n, isString, err := d.dataChannel.ReadDataChannel(buffer)
- if err != nil {
- rlBufPool.Put(buffer) // nolint:staticcheck
- d.setReadyState(DataChannelStateClosed)
- if !errors.Is(err, io.EOF) {
- d.onError(err)
- }
- d.onClose()
- return
- }
- m := DataChannelMessage{Data: make([]byte, n), IsString: isString}
- copy(m.Data, buffer[:n])
- // The 'staticcheck' pragma is a false positive on the part of the CI linter.
- rlBufPool.Put(buffer) // nolint:staticcheck
- // NB: Why was DataChannelMessage not passed as a pointer value?
- d.onMessage(m) // nolint:staticcheck
- }
- }
- // Send sends the binary message to the DataChannel peer
- func (d *DataChannel) Send(data []byte) error {
- err := d.ensureOpen()
- if err != nil {
- return err
- }
- _, err = d.dataChannel.WriteDataChannel(data, false)
- return err
- }
- // SendText sends the text message to the DataChannel peer
- func (d *DataChannel) SendText(s string) error {
- err := d.ensureOpen()
- if err != nil {
- return err
- }
- _, err = d.dataChannel.WriteDataChannel([]byte(s), true)
- return err
- }
- func (d *DataChannel) ensureOpen() error {
- d.mu.RLock()
- defer d.mu.RUnlock()
- if d.ReadyState() != DataChannelStateOpen {
- return io.ErrClosedPipe
- }
- return nil
- }
- // Detach allows you to detach the underlying datachannel. This provides
- // an idiomatic API to work with, however it disables the OnMessage callback.
- // Before calling Detach you have to enable this behavior by calling
- // webrtc.DetachDataChannels(). Combining detached and normal data channels
- // is not supported.
- // Please refer to the data-channels-detach example and the
- // pion/datachannel documentation for the correct way to handle the
- // resulting DataChannel object.
- func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
- d.mu.Lock()
- defer d.mu.Unlock()
- if !d.api.settingEngine.detach.DataChannels {
- return nil, errDetachNotEnabled
- }
- if d.dataChannel == nil {
- return nil, errDetachBeforeOpened
- }
- d.detachCalled = true
- return d.dataChannel, nil
- }
- // Close Closes the DataChannel. It may be called regardless of whether
- // the DataChannel object was created by this peer or the remote peer.
- func (d *DataChannel) Close() error {
- d.mu.Lock()
- haveSctpTransport := d.dataChannel != nil
- d.mu.Unlock()
- if d.ReadyState() == DataChannelStateClosed {
- return nil
- }
- d.setReadyState(DataChannelStateClosing)
- if !haveSctpTransport {
- return nil
- }
- return d.dataChannel.Close()
- }
- // Label represents a label that can be used to distinguish this
- // DataChannel object from other DataChannel objects. Scripts are
- // allowed to create multiple DataChannel objects with the same label.
- func (d *DataChannel) Label() string {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.label
- }
- // Ordered returns true if the DataChannel is ordered, and false if
- // out-of-order delivery is allowed.
- func (d *DataChannel) Ordered() bool {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.ordered
- }
- // MaxPacketLifeTime represents the length of the time window (msec) during
- // which transmissions and retransmissions may occur in unreliable mode.
- func (d *DataChannel) MaxPacketLifeTime() *uint16 {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.maxPacketLifeTime
- }
- // MaxRetransmits represents the maximum number of retransmissions that are
- // attempted in unreliable mode.
- func (d *DataChannel) MaxRetransmits() *uint16 {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.maxRetransmits
- }
- // Protocol represents the name of the sub-protocol used with this
- // DataChannel.
- func (d *DataChannel) Protocol() string {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.protocol
- }
- // Negotiated represents whether this DataChannel was negotiated by the
- // application (true), or not (false).
- func (d *DataChannel) Negotiated() bool {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.negotiated
- }
- // ID represents the ID for this DataChannel. The value is initially
- // null, which is what will be returned if the ID was not provided at
- // channel creation time, and the DTLS role of the SCTP transport has not
- // yet been negotiated. Otherwise, it will return the ID that was either
- // selected by the script or generated. After the ID is set to a non-null
- // value, it will not change.
- func (d *DataChannel) ID() *uint16 {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.id
- }
- // ReadyState represents the state of the DataChannel object.
- func (d *DataChannel) ReadyState() DataChannelState {
- if v, ok := d.readyState.Load().(DataChannelState); ok {
- return v
- }
- return DataChannelState(0)
- }
- // BufferedAmount represents the number of bytes of application data
- // (UTF-8 text and binary data) that have been queued using send(). Even
- // though the data transmission can occur in parallel, the returned value
- // MUST NOT be decreased before the current task yielded back to the event
- // loop to prevent race conditions. The value does not include framing
- // overhead incurred by the protocol, or buffering done by the operating
- // system or network hardware. The value of BufferedAmount slot will only
- // increase with each call to the send() method as long as the ReadyState is
- // open; however, BufferedAmount does not reset to zero once the channel
- // closes.
- func (d *DataChannel) BufferedAmount() uint64 {
- d.mu.RLock()
- defer d.mu.RUnlock()
- if d.dataChannel == nil {
- return 0
- }
- return d.dataChannel.BufferedAmount()
- }
- // BufferedAmountLowThreshold represents the threshold at which the
- // bufferedAmount is considered to be low. When the bufferedAmount decreases
- // from above this threshold to equal or below it, the bufferedamountlow
- // event fires. BufferedAmountLowThreshold is initially zero on each new
- // DataChannel, but the application may change its value at any time.
- // The threshold is set to 0 by default.
- func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
- d.mu.RLock()
- defer d.mu.RUnlock()
- if d.dataChannel == nil {
- return d.bufferedAmountLowThreshold
- }
- return d.dataChannel.BufferedAmountLowThreshold()
- }
- // SetBufferedAmountLowThreshold is used to update the threshold.
- // See BufferedAmountLowThreshold().
- func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.bufferedAmountLowThreshold = th
- if d.dataChannel != nil {
- d.dataChannel.SetBufferedAmountLowThreshold(th)
- }
- }
- // OnBufferedAmountLow sets an event handler which is invoked when
- // the number of bytes of outgoing data becomes lower than the
- // BufferedAmountLowThreshold.
- func (d *DataChannel) OnBufferedAmountLow(f func()) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.onBufferedAmountLow = f
- if d.dataChannel != nil {
- d.dataChannel.OnBufferedAmountLow(f)
- }
- }
- func (d *DataChannel) getStatsID() string {
- d.mu.Lock()
- defer d.mu.Unlock()
- return d.statsID
- }
- func (d *DataChannel) collectStats(collector *statsReportCollector) {
- collector.Collecting()
- d.mu.Lock()
- defer d.mu.Unlock()
- stats := DataChannelStats{
- Timestamp: statsTimestampNow(),
- Type: StatsTypeDataChannel,
- ID: d.statsID,
- Label: d.label,
- Protocol: d.protocol,
- // TransportID string `json:"transportId"`
- State: d.ReadyState(),
- }
- if d.id != nil {
- stats.DataChannelIdentifier = int32(*d.id)
- }
- if d.dataChannel != nil {
- stats.MessagesSent = d.dataChannel.MessagesSent()
- stats.BytesSent = d.dataChannel.BytesSent()
- stats.MessagesReceived = d.dataChannel.MessagesReceived()
- stats.BytesReceived = d.dataChannel.BytesReceived()
- }
- collector.Collect(stats.ID, stats)
- }
- func (d *DataChannel) setReadyState(r DataChannelState) {
- d.readyState.Store(r)
- }
|