rtpreceiver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. "github.com/pion/interceptor"
  12. "github.com/pion/rtcp"
  13. "github.com/pion/srtp/v2"
  14. "github.com/pion/webrtc/v3/internal/util"
  15. )
  16. // trackStreams maintains a mapping of RTP/RTCP streams to a specific track
  17. // a RTPReceiver may contain multiple streams if we are dealing with Simulcast
  18. type trackStreams struct {
  19. track *TrackRemote
  20. streamInfo, repairStreamInfo *interceptor.StreamInfo
  21. rtpReadStream *srtp.ReadStreamSRTP
  22. rtpInterceptor interceptor.RTPReader
  23. rtcpReadStream *srtp.ReadStreamSRTCP
  24. rtcpInterceptor interceptor.RTCPReader
  25. repairReadStream *srtp.ReadStreamSRTP
  26. repairInterceptor interceptor.RTPReader
  27. repairRtcpReadStream *srtp.ReadStreamSRTCP
  28. repairRtcpInterceptor interceptor.RTCPReader
  29. }
  30. // RTPReceiver allows an application to inspect the receipt of a TrackRemote
  31. type RTPReceiver struct {
  32. kind RTPCodecType
  33. transport *DTLSTransport
  34. tracks []trackStreams
  35. closed, received chan interface{}
  36. mu sync.RWMutex
  37. tr *RTPTransceiver
  38. // A reference to the associated api object
  39. api *API
  40. }
  41. // NewRTPReceiver constructs a new RTPReceiver
  42. func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) {
  43. if transport == nil {
  44. return nil, errRTPReceiverDTLSTransportNil
  45. }
  46. r := &RTPReceiver{
  47. kind: kind,
  48. transport: transport,
  49. api: api,
  50. closed: make(chan interface{}),
  51. received: make(chan interface{}),
  52. tracks: []trackStreams{},
  53. }
  54. return r, nil
  55. }
  56. func (r *RTPReceiver) setRTPTransceiver(tr *RTPTransceiver) {
  57. r.mu.Lock()
  58. defer r.mu.Unlock()
  59. r.tr = tr
  60. }
  61. // Transport returns the currently-configured *DTLSTransport or nil
  62. // if one has not yet been configured
  63. func (r *RTPReceiver) Transport() *DTLSTransport {
  64. r.mu.RLock()
  65. defer r.mu.RUnlock()
  66. return r.transport
  67. }
  68. func (r *RTPReceiver) getParameters() RTPParameters {
  69. parameters := r.api.mediaEngine.getRTPParametersByKind(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionRecvonly})
  70. if r.tr != nil {
  71. parameters.Codecs = r.tr.getCodecs()
  72. }
  73. return parameters
  74. }
  75. // GetParameters describes the current configuration for the encoding and
  76. // transmission of media on the receiver's track.
  77. func (r *RTPReceiver) GetParameters() RTPParameters {
  78. r.mu.RLock()
  79. defer r.mu.RUnlock()
  80. return r.getParameters()
  81. }
  82. // Track returns the RtpTransceiver TrackRemote
  83. func (r *RTPReceiver) Track() *TrackRemote {
  84. r.mu.RLock()
  85. defer r.mu.RUnlock()
  86. if len(r.tracks) != 1 {
  87. return nil
  88. }
  89. return r.tracks[0].track
  90. }
  91. // Tracks returns the RtpTransceiver tracks
  92. // A RTPReceiver to support Simulcast may now have multiple tracks
  93. func (r *RTPReceiver) Tracks() []*TrackRemote {
  94. r.mu.RLock()
  95. defer r.mu.RUnlock()
  96. var tracks []*TrackRemote
  97. for i := range r.tracks {
  98. tracks = append(tracks, r.tracks[i].track)
  99. }
  100. return tracks
  101. }
  102. // configureReceive initialize the track
  103. func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
  104. r.mu.Lock()
  105. defer r.mu.Unlock()
  106. for i := range parameters.Encodings {
  107. t := trackStreams{
  108. track: newTrackRemote(
  109. r.kind,
  110. parameters.Encodings[i].SSRC,
  111. parameters.Encodings[i].RID,
  112. r,
  113. ),
  114. }
  115. r.tracks = append(r.tracks, t)
  116. }
  117. }
  118. // startReceive starts all the transports
  119. func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
  120. r.mu.Lock()
  121. defer r.mu.Unlock()
  122. select {
  123. case <-r.received:
  124. return errRTPReceiverReceiveAlreadyCalled
  125. default:
  126. }
  127. defer close(r.received)
  128. globalParams := r.getParameters()
  129. codec := RTPCodecCapability{}
  130. if len(globalParams.Codecs) != 0 {
  131. codec = globalParams.Codecs[0].RTPCodecCapability
  132. }
  133. for i := range parameters.Encodings {
  134. if parameters.Encodings[i].RID != "" {
  135. // RID based tracks will be set up in receiveForRid
  136. continue
  137. }
  138. var t *trackStreams
  139. for idx, ts := range r.tracks {
  140. if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC {
  141. t = &r.tracks[idx]
  142. break
  143. }
  144. }
  145. if t == nil {
  146. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
  147. }
  148. if parameters.Encodings[i].SSRC != 0 {
  149. t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
  150. var err error
  151. if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
  152. return err
  153. }
  154. }
  155. if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
  156. streamInfo := createStreamInfo("", rtxSsrc, 0, codec, globalParams.HeaderExtensions)
  157. rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(rtxSsrc, *streamInfo)
  158. if err != nil {
  159. return err
  160. }
  161. if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
  162. return err
  163. }
  164. }
  165. }
  166. return nil
  167. }
  168. // Receive initialize the track and starts all the transports
  169. func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
  170. r.configureReceive(parameters)
  171. return r.startReceive(parameters)
  172. }
  173. // Read reads incoming RTCP for this RTPReceiver
  174. func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  175. select {
  176. case <-r.received:
  177. return r.tracks[0].rtcpInterceptor.Read(b, a)
  178. case <-r.closed:
  179. return 0, nil, io.ErrClosedPipe
  180. }
  181. }
  182. // ReadSimulcast reads incoming RTCP for this RTPReceiver for given rid
  183. func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  184. select {
  185. case <-r.received:
  186. for _, t := range r.tracks {
  187. if t.track != nil && t.track.rid == rid {
  188. return t.rtcpInterceptor.Read(b, a)
  189. }
  190. }
  191. return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  192. case <-r.closed:
  193. return 0, nil, io.ErrClosedPipe
  194. }
  195. }
  196. // ReadRTCP is a convenience method that wraps Read and unmarshal for you.
  197. // It also runs any configured interceptors.
  198. func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  199. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  200. i, attributes, err := r.Read(b)
  201. if err != nil {
  202. return nil, nil, err
  203. }
  204. pkts, err := rtcp.Unmarshal(b[:i])
  205. if err != nil {
  206. return nil, nil, err
  207. }
  208. return pkts, attributes, nil
  209. }
  210. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  211. func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  212. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  213. i, attributes, err := r.ReadSimulcast(b, rid)
  214. if err != nil {
  215. return nil, nil, err
  216. }
  217. pkts, err := rtcp.Unmarshal(b[:i])
  218. return pkts, attributes, err
  219. }
  220. func (r *RTPReceiver) haveReceived() bool {
  221. select {
  222. case <-r.received:
  223. return true
  224. default:
  225. return false
  226. }
  227. }
  228. // Stop irreversibly stops the RTPReceiver
  229. func (r *RTPReceiver) Stop() error {
  230. r.mu.Lock()
  231. defer r.mu.Unlock()
  232. var err error
  233. select {
  234. case <-r.closed:
  235. return err
  236. default:
  237. }
  238. select {
  239. case <-r.received:
  240. for i := range r.tracks {
  241. errs := []error{}
  242. if r.tracks[i].rtcpReadStream != nil {
  243. errs = append(errs, r.tracks[i].rtcpReadStream.Close())
  244. }
  245. if r.tracks[i].rtpReadStream != nil {
  246. errs = append(errs, r.tracks[i].rtpReadStream.Close())
  247. }
  248. if r.tracks[i].repairReadStream != nil {
  249. errs = append(errs, r.tracks[i].repairReadStream.Close())
  250. }
  251. if r.tracks[i].repairRtcpReadStream != nil {
  252. errs = append(errs, r.tracks[i].repairRtcpReadStream.Close())
  253. }
  254. if r.tracks[i].streamInfo != nil {
  255. r.api.interceptor.UnbindRemoteStream(r.tracks[i].streamInfo)
  256. }
  257. if r.tracks[i].repairStreamInfo != nil {
  258. r.api.interceptor.UnbindRemoteStream(r.tracks[i].repairStreamInfo)
  259. }
  260. err = util.FlattenErrs(errs)
  261. }
  262. default:
  263. }
  264. close(r.closed)
  265. return err
  266. }
  267. func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
  268. for i := range r.tracks {
  269. if r.tracks[i].track == t {
  270. return &r.tracks[i]
  271. }
  272. }
  273. return nil
  274. }
  275. // readRTP should only be called by a track, this only exists so we can keep state in one place
  276. func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
  277. <-r.received
  278. if t := r.streamsForTrack(reader); t != nil {
  279. return t.rtpInterceptor.Read(b, a)
  280. }
  281. return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  282. }
  283. // receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs
  284. // It populates all the internal state for the given RID
  285. func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) (*TrackRemote, error) {
  286. r.mu.Lock()
  287. defer r.mu.Unlock()
  288. for i := range r.tracks {
  289. if r.tracks[i].track.RID() == rid {
  290. r.tracks[i].track.mu.Lock()
  291. r.tracks[i].track.kind = r.kind
  292. r.tracks[i].track.codec = params.Codecs[0]
  293. r.tracks[i].track.params = params
  294. r.tracks[i].track.ssrc = SSRC(streamInfo.SSRC)
  295. r.tracks[i].track.mu.Unlock()
  296. r.tracks[i].streamInfo = streamInfo
  297. r.tracks[i].rtpReadStream = rtpReadStream
  298. r.tracks[i].rtpInterceptor = rtpInterceptor
  299. r.tracks[i].rtcpReadStream = rtcpReadStream
  300. r.tracks[i].rtcpInterceptor = rtcpInterceptor
  301. return r.tracks[i].track, nil
  302. }
  303. }
  304. return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  305. }
  306. // receiveForRtx starts a routine that processes the repair stream
  307. // These packets aren't exposed to the user yet, but we need to process them for
  308. // TWCC
  309. func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
  310. var track *trackStreams
  311. if ssrc != 0 && len(r.tracks) == 1 {
  312. track = &r.tracks[0]
  313. } else {
  314. for i := range r.tracks {
  315. if r.tracks[i].track.RID() == rsid {
  316. track = &r.tracks[i]
  317. }
  318. }
  319. }
  320. if track == nil {
  321. return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid)
  322. }
  323. track.repairStreamInfo = streamInfo
  324. track.repairReadStream = rtpReadStream
  325. track.repairInterceptor = rtpInterceptor
  326. track.repairRtcpReadStream = rtcpReadStream
  327. track.repairRtcpInterceptor = rtcpInterceptor
  328. go func() {
  329. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  330. for {
  331. if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
  332. return
  333. }
  334. }
  335. }()
  336. return nil
  337. }
  338. // SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever.
  339. func (r *RTPReceiver) SetReadDeadline(t time.Time) error {
  340. r.mu.RLock()
  341. defer r.mu.RUnlock()
  342. return r.tracks[0].rtcpReadStream.SetReadDeadline(t)
  343. }
  344. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  345. func (r *RTPReceiver) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  346. r.mu.RLock()
  347. defer r.mu.RUnlock()
  348. for _, t := range r.tracks {
  349. if t.track != nil && t.track.rid == rid {
  350. return t.rtcpReadStream.SetReadDeadline(deadline)
  351. }
  352. }
  353. return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  354. }
  355. // setRTPReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever.
  356. // This should be fired by calling SetReadDeadline on the TrackRemote
  357. func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote) error {
  358. r.mu.RLock()
  359. defer r.mu.RUnlock()
  360. if t := r.streamsForTrack(reader); t != nil {
  361. return t.rtpReadStream.SetReadDeadline(deadline)
  362. }
  363. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  364. }