rtpsender.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. "github.com/pion/interceptor"
  12. "github.com/pion/randutil"
  13. "github.com/pion/rtcp"
  14. "github.com/pion/rtp"
  15. "github.com/pion/webrtc/v3/internal/util"
  16. )
  17. type trackEncoding struct {
  18. track TrackLocal
  19. srtpStream *srtpWriterFuture
  20. rtcpInterceptor interceptor.RTCPReader
  21. streamInfo interceptor.StreamInfo
  22. context TrackLocalContext
  23. ssrc SSRC
  24. }
  25. // RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
  26. type RTPSender struct {
  27. trackEncodings []*trackEncoding
  28. transport *DTLSTransport
  29. payloadType PayloadType
  30. kind RTPCodecType
  31. // nolint:godox
  32. // TODO(sgotti) remove this when in future we'll avoid replacing
  33. // a transceiver sender since we can just check the
  34. // transceiver negotiation status
  35. negotiated bool
  36. // A reference to the associated api object
  37. api *API
  38. id string
  39. rtpTransceiver *RTPTransceiver
  40. mu sync.RWMutex
  41. sendCalled, stopCalled chan struct{}
  42. }
  43. // NewRTPSender constructs a new RTPSender
  44. func (api *API) NewRTPSender(track TrackLocal, transport *DTLSTransport) (*RTPSender, error) {
  45. if track == nil {
  46. return nil, errRTPSenderTrackNil
  47. } else if transport == nil {
  48. return nil, errRTPSenderDTLSTransportNil
  49. }
  50. id, err := randutil.GenerateCryptoRandomString(32, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
  51. if err != nil {
  52. return nil, err
  53. }
  54. r := &RTPSender{
  55. transport: transport,
  56. api: api,
  57. sendCalled: make(chan struct{}),
  58. stopCalled: make(chan struct{}),
  59. id: id,
  60. kind: track.Kind(),
  61. }
  62. r.addEncoding(track)
  63. return r, nil
  64. }
  65. func (r *RTPSender) isNegotiated() bool {
  66. r.mu.RLock()
  67. defer r.mu.RUnlock()
  68. return r.negotiated
  69. }
  70. func (r *RTPSender) setNegotiated() {
  71. r.mu.Lock()
  72. defer r.mu.Unlock()
  73. r.negotiated = true
  74. }
  75. func (r *RTPSender) setRTPTransceiver(rtpTransceiver *RTPTransceiver) {
  76. r.mu.Lock()
  77. defer r.mu.Unlock()
  78. r.rtpTransceiver = rtpTransceiver
  79. }
  80. // Transport returns the currently-configured *DTLSTransport or nil
  81. // if one has not yet been configured
  82. func (r *RTPSender) Transport() *DTLSTransport {
  83. r.mu.RLock()
  84. defer r.mu.RUnlock()
  85. return r.transport
  86. }
  87. func (r *RTPSender) getParameters() RTPSendParameters {
  88. var encodings []RTPEncodingParameters
  89. for _, trackEncoding := range r.trackEncodings {
  90. var rid string
  91. if trackEncoding.track != nil {
  92. rid = trackEncoding.track.RID()
  93. }
  94. encodings = append(encodings, RTPEncodingParameters{
  95. RTPCodingParameters: RTPCodingParameters{
  96. RID: rid,
  97. SSRC: trackEncoding.ssrc,
  98. PayloadType: r.payloadType,
  99. },
  100. })
  101. }
  102. sendParameters := RTPSendParameters{
  103. RTPParameters: r.api.mediaEngine.getRTPParametersByKind(
  104. r.kind,
  105. []RTPTransceiverDirection{RTPTransceiverDirectionSendonly},
  106. ),
  107. Encodings: encodings,
  108. }
  109. if r.rtpTransceiver != nil {
  110. sendParameters.Codecs = r.rtpTransceiver.getCodecs()
  111. } else {
  112. sendParameters.Codecs = r.api.mediaEngine.getCodecsByKind(r.kind)
  113. }
  114. return sendParameters
  115. }
  116. // GetParameters describes the current configuration for the encoding and
  117. // transmission of media on the sender's track.
  118. func (r *RTPSender) GetParameters() RTPSendParameters {
  119. r.mu.RLock()
  120. defer r.mu.RUnlock()
  121. return r.getParameters()
  122. }
  123. // AddEncoding adds an encoding to RTPSender. Used by simulcast senders.
  124. func (r *RTPSender) AddEncoding(track TrackLocal) error {
  125. r.mu.Lock()
  126. defer r.mu.Unlock()
  127. if track == nil {
  128. return errRTPSenderTrackNil
  129. }
  130. if track.RID() == "" {
  131. return errRTPSenderRidNil
  132. }
  133. if r.hasStopped() {
  134. return errRTPSenderStopped
  135. }
  136. if r.hasSent() {
  137. return errRTPSenderSendAlreadyCalled
  138. }
  139. var refTrack TrackLocal
  140. if len(r.trackEncodings) != 0 {
  141. refTrack = r.trackEncodings[0].track
  142. }
  143. if refTrack == nil || refTrack.RID() == "" {
  144. return errRTPSenderNoBaseEncoding
  145. }
  146. if refTrack.ID() != track.ID() || refTrack.StreamID() != track.StreamID() || refTrack.Kind() != track.Kind() {
  147. return errRTPSenderBaseEncodingMismatch
  148. }
  149. for _, encoding := range r.trackEncodings {
  150. if encoding.track == nil {
  151. continue
  152. }
  153. if encoding.track.RID() == track.RID() {
  154. return errRTPSenderRIDCollision
  155. }
  156. }
  157. r.addEncoding(track)
  158. return nil
  159. }
  160. func (r *RTPSender) addEncoding(track TrackLocal) {
  161. ssrc := SSRC(randutil.NewMathRandomGenerator().Uint32())
  162. trackEncoding := &trackEncoding{
  163. track: track,
  164. srtpStream: &srtpWriterFuture{ssrc: ssrc},
  165. ssrc: ssrc,
  166. }
  167. trackEncoding.srtpStream.rtpSender = r
  168. trackEncoding.rtcpInterceptor = r.api.interceptor.BindRTCPReader(
  169. interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
  170. n, err = trackEncoding.srtpStream.Read(in)
  171. return n, a, err
  172. }),
  173. )
  174. r.trackEncodings = append(r.trackEncodings, trackEncoding)
  175. }
  176. // Track returns the RTCRtpTransceiver track, or nil
  177. func (r *RTPSender) Track() TrackLocal {
  178. r.mu.RLock()
  179. defer r.mu.RUnlock()
  180. if len(r.trackEncodings) == 0 {
  181. return nil
  182. }
  183. return r.trackEncodings[0].track
  184. }
  185. // ReplaceTrack replaces the track currently being used as the sender's source with a new TrackLocal.
  186. // The new track must be of the same media kind (audio, video, etc) and switching the track should not
  187. // require negotiation.
  188. func (r *RTPSender) ReplaceTrack(track TrackLocal) error {
  189. r.mu.Lock()
  190. defer r.mu.Unlock()
  191. if track != nil && r.kind != track.Kind() {
  192. return ErrRTPSenderNewTrackHasIncorrectKind
  193. }
  194. // cannot replace simulcast envelope
  195. if track != nil && len(r.trackEncodings) > 1 {
  196. return ErrRTPSenderNewTrackHasIncorrectEnvelope
  197. }
  198. var replacedTrack TrackLocal
  199. var context *TrackLocalContext
  200. if len(r.trackEncodings) != 0 {
  201. replacedTrack = r.trackEncodings[0].track
  202. context = &r.trackEncodings[0].context
  203. }
  204. if r.hasSent() && replacedTrack != nil {
  205. if err := replacedTrack.Unbind(*context); err != nil {
  206. return err
  207. }
  208. }
  209. if !r.hasSent() || track == nil {
  210. r.trackEncodings[0].track = track
  211. return nil
  212. }
  213. codec, err := track.Bind(TrackLocalContext{
  214. id: context.id,
  215. params: r.api.mediaEngine.getRTPParametersByKind(track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
  216. ssrc: context.ssrc,
  217. writeStream: context.writeStream,
  218. rtcpInterceptor: context.rtcpInterceptor,
  219. })
  220. if err != nil {
  221. // Re-bind the original track
  222. if _, reBindErr := replacedTrack.Bind(*context); reBindErr != nil {
  223. return reBindErr
  224. }
  225. return err
  226. }
  227. // Codec has changed
  228. if r.payloadType != codec.PayloadType {
  229. context.params.Codecs = []RTPCodecParameters{codec}
  230. }
  231. r.trackEncodings[0].track = track
  232. return nil
  233. }
  234. // Send Attempts to set the parameters controlling the sending of media.
  235. func (r *RTPSender) Send(parameters RTPSendParameters) error {
  236. r.mu.Lock()
  237. defer r.mu.Unlock()
  238. switch {
  239. case r.hasSent():
  240. return errRTPSenderSendAlreadyCalled
  241. case r.trackEncodings[0].track == nil:
  242. return errRTPSenderTrackRemoved
  243. }
  244. for idx, trackEncoding := range r.trackEncodings {
  245. writeStream := &interceptorToTrackLocalWriter{}
  246. trackEncoding.context = TrackLocalContext{
  247. id: r.id,
  248. params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
  249. ssrc: parameters.Encodings[idx].SSRC,
  250. writeStream: writeStream,
  251. rtcpInterceptor: trackEncoding.rtcpInterceptor,
  252. }
  253. codec, err := trackEncoding.track.Bind(trackEncoding.context)
  254. if err != nil {
  255. return err
  256. }
  257. trackEncoding.context.params.Codecs = []RTPCodecParameters{codec}
  258. trackEncoding.streamInfo = *createStreamInfo(
  259. r.id,
  260. parameters.Encodings[idx].SSRC,
  261. codec.PayloadType,
  262. codec.RTPCodecCapability,
  263. parameters.HeaderExtensions,
  264. )
  265. srtpStream := trackEncoding.srtpStream
  266. rtpInterceptor := r.api.interceptor.BindLocalStream(
  267. &trackEncoding.streamInfo,
  268. interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
  269. return srtpStream.WriteRTP(header, payload)
  270. }),
  271. )
  272. writeStream.interceptor.Store(rtpInterceptor)
  273. }
  274. close(r.sendCalled)
  275. return nil
  276. }
  277. // Stop irreversibly stops the RTPSender
  278. func (r *RTPSender) Stop() error {
  279. r.mu.Lock()
  280. if stopped := r.hasStopped(); stopped {
  281. r.mu.Unlock()
  282. return nil
  283. }
  284. close(r.stopCalled)
  285. r.mu.Unlock()
  286. if !r.hasSent() {
  287. return nil
  288. }
  289. if err := r.ReplaceTrack(nil); err != nil {
  290. return err
  291. }
  292. errs := []error{}
  293. for _, trackEncoding := range r.trackEncodings {
  294. r.api.interceptor.UnbindLocalStream(&trackEncoding.streamInfo)
  295. errs = append(errs, trackEncoding.srtpStream.Close())
  296. }
  297. return util.FlattenErrs(errs)
  298. }
  299. // Read reads incoming RTCP for this RTPSender
  300. func (r *RTPSender) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  301. select {
  302. case <-r.sendCalled:
  303. return r.trackEncodings[0].rtcpInterceptor.Read(b, a)
  304. case <-r.stopCalled:
  305. return 0, nil, io.ErrClosedPipe
  306. }
  307. }
  308. // ReadRTCP is a convenience method that wraps Read and unmarshals for you.
  309. func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  310. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  311. i, attributes, err := r.Read(b)
  312. if err != nil {
  313. return nil, nil, err
  314. }
  315. pkts, err := rtcp.Unmarshal(b[:i])
  316. if err != nil {
  317. return nil, nil, err
  318. }
  319. return pkts, attributes, nil
  320. }
  321. // ReadSimulcast reads incoming RTCP for this RTPSender for given rid
  322. func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  323. select {
  324. case <-r.sendCalled:
  325. for _, t := range r.trackEncodings {
  326. if t.track != nil && t.track.RID() == rid {
  327. return t.rtcpInterceptor.Read(b, a)
  328. }
  329. }
  330. return 0, nil, fmt.Errorf("%w: %s", errRTPSenderNoTrackForRID, rid)
  331. case <-r.stopCalled:
  332. return 0, nil, io.ErrClosedPipe
  333. }
  334. }
  335. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  336. func (r *RTPSender) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  337. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  338. i, attributes, err := r.ReadSimulcast(b, rid)
  339. if err != nil {
  340. return nil, nil, err
  341. }
  342. pkts, err := rtcp.Unmarshal(b[:i])
  343. return pkts, attributes, err
  344. }
  345. // SetReadDeadline sets the deadline for the Read operation.
  346. // Setting to zero means no deadline.
  347. func (r *RTPSender) SetReadDeadline(t time.Time) error {
  348. return r.trackEncodings[0].srtpStream.SetReadDeadline(t)
  349. }
  350. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  351. func (r *RTPSender) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  352. r.mu.RLock()
  353. defer r.mu.RUnlock()
  354. for _, t := range r.trackEncodings {
  355. if t.track != nil && t.track.RID() == rid {
  356. return t.srtpStream.SetReadDeadline(deadline)
  357. }
  358. }
  359. return fmt.Errorf("%w: %s", errRTPSenderNoTrackForRID, rid)
  360. }
  361. // hasSent tells if data has been ever sent for this instance
  362. func (r *RTPSender) hasSent() bool {
  363. select {
  364. case <-r.sendCalled:
  365. return true
  366. default:
  367. return false
  368. }
  369. }
  370. // hasStopped tells if stop has been called
  371. func (r *RTPSender) hasStopped() bool {
  372. select {
  373. case <-r.stopCalled:
  374. return true
  375. default:
  376. return false
  377. }
  378. }