ingester.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2021 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package janus
  22. import (
  23. "context"
  24. "io"
  25. "os"
  26. "strings"
  27. "time"
  28. "github.com/ossrs/go-oryx-lib/errors"
  29. "github.com/ossrs/go-oryx-lib/logger"
  30. "github.com/pion/interceptor"
  31. "github.com/pion/rtp"
  32. "github.com/pion/sdp/v3"
  33. "github.com/pion/webrtc/v3"
  34. "github.com/pion/webrtc/v3/pkg/media"
  35. "github.com/pion/webrtc/v3/pkg/media/h264reader"
  36. "github.com/pion/webrtc/v3/pkg/media/oggreader"
  37. )
  38. type videoIngester struct {
  39. sourceVideo string
  40. fps int
  41. markerInterceptor *rtpInterceptor
  42. sVideoTrack *webrtc.TrackLocalStaticSample
  43. sVideoSender *webrtc.RTPSender
  44. ready context.Context
  45. readyCancel context.CancelFunc
  46. }
  47. func newVideoIngester(sourceVideo string) *videoIngester {
  48. v := &videoIngester{markerInterceptor: &rtpInterceptor{}, sourceVideo: sourceVideo}
  49. v.ready, v.readyCancel = context.WithCancel(context.Background())
  50. return v
  51. }
  52. func (v *videoIngester) Close() error {
  53. v.readyCancel()
  54. if v.sVideoSender != nil {
  55. _ = v.sVideoSender.Stop()
  56. }
  57. return nil
  58. }
  59. func (v *videoIngester) AddTrack(pc *webrtc.PeerConnection, fps int) error {
  60. v.fps = fps
  61. mimeType, trackID := "video/H264", "video"
  62. if strings.HasSuffix(v.sourceVideo, ".ivf") {
  63. mimeType = "video/VP8"
  64. }
  65. var err error
  66. v.sVideoTrack, err = webrtc.NewTrackLocalStaticSample(
  67. webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion",
  68. )
  69. if err != nil {
  70. return errors.Wrapf(err, "Create video track")
  71. }
  72. v.sVideoSender, err = pc.AddTrack(v.sVideoTrack)
  73. if err != nil {
  74. return errors.Wrapf(err, "Add video track")
  75. }
  76. return err
  77. }
  78. func (v *videoIngester) Ingest(ctx context.Context) error {
  79. source, sender, track, fps := v.sourceVideo, v.sVideoSender, v.sVideoTrack, v.fps
  80. f, err := os.Open(source)
  81. if err != nil {
  82. return errors.Wrapf(err, "Open file %v", source)
  83. }
  84. defer f.Close()
  85. // TODO: FIXME: Support ivf for vp8.
  86. h264, err := h264reader.NewReader(f)
  87. if err != nil {
  88. return errors.Wrapf(err, "Open h264 %v", source)
  89. }
  90. enc := sender.GetParameters().Encodings[0]
  91. codec := sender.GetParameters().Codecs[0]
  92. headers := sender.GetParameters().HeaderExtensions
  93. logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v",
  94. codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers)
  95. // OK, we are ready.
  96. v.readyCancel()
  97. clock := newWallClock()
  98. sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps))
  99. for ctx.Err() == nil {
  100. var sps, pps *h264reader.NAL
  101. var oFrames []*h264reader.NAL
  102. for ctx.Err() == nil {
  103. frame, err := h264.NextNAL()
  104. if err == io.EOF {
  105. return io.EOF
  106. }
  107. if err != nil {
  108. return errors.Wrapf(err, "Read h264")
  109. }
  110. oFrames = append(oFrames, frame)
  111. logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes",
  112. frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data))
  113. if frame.UnitType == h264reader.NalUnitTypeSPS {
  114. sps = frame
  115. } else if frame.UnitType == h264reader.NalUnitTypePPS {
  116. pps = frame
  117. } else {
  118. break
  119. }
  120. }
  121. var frames []*h264reader.NAL
  122. // Package SPS/PPS to STAP-A
  123. if sps != nil && pps != nil {
  124. stapA := packageAsSTAPA(sps, pps)
  125. frames = append(frames, stapA)
  126. }
  127. // Append other original frames.
  128. for _, frame := range oFrames {
  129. if frame.UnitType != h264reader.NalUnitTypeSPS && frame.UnitType != h264reader.NalUnitTypePPS {
  130. frames = append(frames, frame)
  131. }
  132. }
  133. // Covert frames to sample(buffers).
  134. for i, frame := range frames {
  135. sample := media.Sample{Data: frame.Data, Duration: sampleDuration}
  136. // Use the sample timestamp for frames.
  137. if i != len(frames)-1 {
  138. sample.Duration = 0
  139. }
  140. // For STAP-A, set marker to false, to make Chrome happy.
  141. if ri := v.markerInterceptor; ri.rtpWriter == nil {
  142. ri.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
  143. // TODO: Should we decode to check whether SPS/PPS?
  144. if len(payload) > 0 && payload[0]&0x1f == 24 {
  145. header.Marker = false // 24, STAP-A
  146. }
  147. return ri.nextRTPWriter.Write(header, payload, attributes)
  148. }
  149. }
  150. if err = track.WriteSample(sample); err != nil {
  151. return errors.Wrapf(err, "Write sample")
  152. }
  153. }
  154. if d := clock.Tick(sampleDuration); d > 0 {
  155. time.Sleep(d)
  156. }
  157. }
  158. return ctx.Err()
  159. }
  160. type audioIngester struct {
  161. sourceAudio string
  162. audioLevelInterceptor *rtpInterceptor
  163. sAudioTrack *webrtc.TrackLocalStaticSample
  164. sAudioSender *webrtc.RTPSender
  165. ready context.Context
  166. readyCancel context.CancelFunc
  167. }
  168. func newAudioIngester(sourceAudio string) *audioIngester {
  169. v := &audioIngester{audioLevelInterceptor: &rtpInterceptor{}, sourceAudio: sourceAudio}
  170. v.ready, v.readyCancel = context.WithCancel(context.Background())
  171. return v
  172. }
  173. func (v *audioIngester) Close() error {
  174. v.readyCancel() // OK we are closed, also ready.
  175. if v.sAudioSender != nil {
  176. _ = v.sAudioSender.Stop()
  177. }
  178. return nil
  179. }
  180. func (v *audioIngester) AddTrack(pc *webrtc.PeerConnection) error {
  181. var err error
  182. mimeType, trackID := "audio/opus", "audio"
  183. v.sAudioTrack, err = webrtc.NewTrackLocalStaticSample(
  184. webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion",
  185. )
  186. if err != nil {
  187. return errors.Wrapf(err, "Create audio track")
  188. }
  189. v.sAudioSender, err = pc.AddTrack(v.sAudioTrack)
  190. if err != nil {
  191. return errors.Wrapf(err, "Add audio track")
  192. }
  193. return nil
  194. }
  195. func (v *audioIngester) Ingest(ctx context.Context) error {
  196. source, sender, track := v.sourceAudio, v.sAudioSender, v.sAudioTrack
  197. f, err := os.Open(source)
  198. if err != nil {
  199. return errors.Wrapf(err, "Open file %v", source)
  200. }
  201. defer f.Close()
  202. ogg, _, err := oggreader.NewWith(f)
  203. if err != nil {
  204. return errors.Wrapf(err, "Open ogg %v", source)
  205. }
  206. enc := sender.GetParameters().Encodings[0]
  207. codec := sender.GetParameters().Codecs[0]
  208. headers := sender.GetParameters().HeaderExtensions
  209. logger.Tf(ctx, "Audio %v, tbn=%v, channels=%v, ssrc=%v, pt=%v, header=%v",
  210. codec.MimeType, codec.ClockRate, codec.Channels, enc.SSRC, codec.PayloadType, headers)
  211. // Whether should encode the audio-level in RTP header.
  212. var audioLevel *webrtc.RTPHeaderExtensionParameter
  213. for _, h := range headers {
  214. if h.URI == sdp.AudioLevelURI {
  215. audioLevel = &h
  216. }
  217. }
  218. // OK, we are ready.
  219. v.readyCancel()
  220. clock := newWallClock()
  221. var lastGranule uint64
  222. for ctx.Err() == nil {
  223. pageData, pageHeader, err := ogg.ParseNextPage()
  224. if err == io.EOF {
  225. return io.EOF
  226. }
  227. if err != nil {
  228. return errors.Wrapf(err, "Read ogg")
  229. }
  230. // The amount of samples is the difference between the last and current timestamp
  231. sampleCount := pageHeader.GranulePosition - lastGranule
  232. lastGranule = pageHeader.GranulePosition
  233. sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate))
  234. // For audio-level, set the extensions if negotiated.
  235. if ri := v.audioLevelInterceptor; ri.rtpWriter == nil {
  236. ri.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
  237. if audioLevel != nil {
  238. audioLevelPayload, err := new(rtp.AudioLevelExtension).Marshal()
  239. if err != nil {
  240. return 0, err
  241. }
  242. _ = header.SetExtension(uint8(audioLevel.ID), audioLevelPayload)
  243. }
  244. return ri.nextRTPWriter.Write(header, payload, attributes)
  245. }
  246. }
  247. if err = track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil {
  248. return errors.Wrapf(err, "Write sample")
  249. }
  250. if d := clock.Tick(sampleDuration); d > 0 {
  251. time.Sleep(d)
  252. }
  253. }
  254. return ctx.Err()
  255. }