2
0

ingester.go 13 KB


  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2022-2024 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package gb28181
  22. import (
  23. "context"
  24. "github.com/ghettovoice/gosip/sip"
  25. "github.com/ossrs/go-oryx-lib/errors"
  26. "github.com/ossrs/go-oryx-lib/logger"
  27. "github.com/pion/webrtc/v3/pkg/media/h264reader"
  28. "github.com/yapingcat/gomedia/mpeg2"
  29. "io"
  30. "os"
  31. "path"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "time"
  36. )
  37. type GBSessionConfig struct {
  38. regTimeout time.Duration
  39. inviteTimeout time.Duration
  40. }
  41. type GBSessionOutput struct {
  42. ssrc int64
  43. mediaPort int64
  44. clockRate uint64
  45. payloadType uint8
  46. }
  47. type GBSession struct {
  48. // GB config.
  49. conf *GBSessionConfig
  50. // The output of session.
  51. out *GBSessionOutput
  52. // The SIP session object.
  53. sip *SIPSession
  54. // Callback when REGISTER done.
  55. onRegisterDone func(req, res sip.Message) error
  56. // Callback when got INVITE request.
  57. onInviteRequest func(req sip.Message) error
  58. // Callback when got INVITE 200 OK ACK request.
  59. onInviteOkAck func(req, res sip.Message) error
  60. // Callback when got MESSAGE response.
  61. onMessageHeartbeat func(req, res sip.Message) error
  62. // For heartbeat coroutines.
  63. heartbeatInterval time.Duration
  64. heartbeatCtx context.Context
  65. cancel context.CancelFunc
  66. // WaitGroup for coroutines.
  67. wg sync.WaitGroup
  68. }
  69. func NewGBSession(c *GBSessionConfig, sc *SIPConfig) *GBSession {
  70. return &GBSession{
  71. sip: NewSIPSession(sc),
  72. conf: c,
  73. out: &GBSessionOutput{
  74. clockRate: uint64(90000),
  75. payloadType: uint8(96),
  76. },
  77. heartbeatInterval: 1 * time.Second,
  78. }
  79. }
  80. func (v *GBSession) Close() error {
  81. if v.cancel != nil {
  82. v.cancel()
  83. }
  84. v.sip.Close()
  85. v.wg.Wait()
  86. return nil
  87. }
  88. func (v *GBSession) Connect(ctx context.Context) error {
  89. client := v.sip
  90. if err := client.Connect(ctx); err != nil {
  91. return errors.Wrap(err, "connect")
  92. }
  93. return ctx.Err()
  94. }
  95. func (v *GBSession) Register(ctx context.Context) error {
  96. client := v.sip
  97. for ctx.Err() == nil {
  98. ctx, regCancel := context.WithTimeout(ctx, v.conf.regTimeout)
  99. defer regCancel()
  100. regReq, regRes, err := client.Register(ctx)
  101. if err != nil {
  102. return errors.Wrap(err, "register")
  103. }
  104. logger.Tf(ctx, "Register id=%v, response=%v", regReq.MessageID(), regRes.MessageID())
  105. if v.onRegisterDone != nil {
  106. if err = v.onRegisterDone(regReq, regRes); err != nil {
  107. return errors.Wrap(err, "callback")
  108. }
  109. }
  110. break
  111. }
  112. return ctx.Err()
  113. }
  114. func (v *GBSession) Invite(ctx context.Context) error {
  115. client := v.sip
  116. for ctx.Err() == nil {
  117. ctx, inviteCancel := context.WithTimeout(ctx, v.conf.inviteTimeout)
  118. defer inviteCancel()
  119. inviteReq, err := client.Wait(ctx, sip.INVITE)
  120. if err != nil {
  121. return errors.Wrap(err, "wait")
  122. }
  123. logger.Tf(ctx, "Got INVITE request, Call-ID=%v", sipGetCallID(inviteReq))
  124. if v.onInviteRequest != nil {
  125. if err = v.onInviteRequest(inviteReq); err != nil {
  126. return errors.Wrap(err, "callback")
  127. }
  128. }
  129. if err = client.Trying(ctx, inviteReq); err != nil {
  130. return errors.Wrapf(err, "trying invite is %v", inviteReq.String())
  131. }
  132. time.Sleep(100 * time.Millisecond)
  133. inviteRes, err := client.InviteResponse(ctx, inviteReq)
  134. if err != nil {
  135. return errors.Wrapf(err, "response invite is %v", inviteReq.String())
  136. }
  137. offer := inviteReq.Body()
  138. ssrcStr := strings.Split(strings.Split(offer, "y=")[1], "\r\n")[0]
  139. if v.out.ssrc, err = strconv.ParseInt(ssrcStr, 10, 64); err != nil {
  140. return errors.Wrapf(err, "parse ssrc=%v, sdp %v", ssrcStr, offer)
  141. }
  142. mediaPortStr := strings.Split(strings.Split(offer, "m=video")[1], " ")[1]
  143. if v.out.mediaPort, err = strconv.ParseInt(mediaPortStr, 10, 64); err != nil {
  144. return errors.Wrapf(err, "parse media port=%v, sdp %v", mediaPortStr, offer)
  145. }
  146. logger.Tf(ctx, "Invite id=%v, response=%v, y=%v, ssrc=%v, mediaPort=%v",
  147. inviteReq.MessageID(), inviteRes.MessageID(), ssrcStr, v.out.ssrc, v.out.mediaPort,
  148. )
  149. if v.onInviteOkAck != nil {
  150. if err = v.onInviteOkAck(inviteReq, inviteRes); err != nil {
  151. return errors.Wrap(err, "callback")
  152. }
  153. }
  154. break
  155. }
  156. // Start goroutine for heartbeat every 1s.
  157. v.heartbeatCtx, v.cancel = context.WithCancel(ctx)
  158. go func(ctx context.Context) {
  159. v.wg.Add(1)
  160. defer v.wg.Done()
  161. for ctx.Err() == nil {
  162. req, res, err := client.Message(ctx)
  163. if err != nil {
  164. v.cancel()
  165. logger.Ef(ctx, "heartbeat err %+v", err)
  166. return
  167. }
  168. if v.onMessageHeartbeat != nil {
  169. if err = v.onMessageHeartbeat(req, res); err != nil {
  170. v.cancel()
  171. logger.Ef(ctx, "callback err %+v", err)
  172. return
  173. }
  174. }
  175. select {
  176. case <-ctx.Done():
  177. return
  178. case <-time.After(v.heartbeatInterval):
  179. }
  180. }
  181. }(v.heartbeatCtx)
  182. return ctx.Err()
  183. }
  184. func (v *GBSession) Bye(ctx context.Context) error {
  185. client := v.sip
  186. for ctx.Err() == nil {
  187. ctx, regCancel := context.WithTimeout(ctx, v.conf.regTimeout)
  188. defer regCancel()
  189. regReq, regRes, err := client.Bye(ctx)
  190. if err != nil {
  191. return errors.Wrap(err, "bye")
  192. }
  193. logger.Tf(ctx, "Bye id=%v, response=%v", regReq.MessageID(), regRes.MessageID())
  194. break
  195. }
  196. return ctx.Err()
  197. }
  198. func (v *GBSession) UnRegister(ctx context.Context) error {
  199. client := v.sip
  200. for ctx.Err() == nil {
  201. ctx, regCancel := context.WithTimeout(ctx, v.conf.regTimeout)
  202. defer regCancel()
  203. regReq, regRes, err := client.UnRegister(ctx)
  204. if err != nil {
  205. return errors.Wrap(err, "UnRegister")
  206. }
  207. logger.Tf(ctx, "UnRegister id=%v, response=%v", regReq.MessageID(), regRes.MessageID())
  208. break
  209. }
  210. return ctx.Err()
  211. }
// IngesterConfig is the configuration of PSIngester.
type IngesterConfig struct {
	// Media source settings; Ingest reads the video and audio file paths and the fps from it.
	psConfig PSConfig
	// SSRC passed to NewPSClient by Ingest.
	ssrc uint32
	// Media server address passed to NewPSClient by Ingest.
	serverAddr string
	// Clock rate used to convert accumulated sample counts to DTS values.
	clockRate uint64
	// Payload type passed to NewPSPackStream by Ingest.
	payloadType uint8
}
// PSIngester reads a video and an audio file and sends them as PS packs over RTP; see Ingest.
type PSIngester struct {
	// Ingester configuration.
	conf *IngesterConfig
	// Optional callback, invoked by Ingest after a pack has been written over RTP.
	onSendPacket func(pack *PSPackStream) error
	// Cancels the context created by Ingest; nil until Ingest has been called.
	cancel context.CancelFunc
}
  224. func NewPSIngester(c *IngesterConfig) *PSIngester {
  225. return &PSIngester{conf: c}
  226. }
  227. func (v *PSIngester) Close() error {
  228. if v.cancel != nil {
  229. v.cancel()
  230. }
  231. return nil
  232. }
  233. func (v *PSIngester) Ingest(ctx context.Context) error {
  234. ctx, v.cancel = context.WithCancel(ctx)
  235. ps := NewPSClient(uint32(v.conf.ssrc), v.conf.serverAddr)
  236. if err := ps.Connect(ctx); err != nil {
  237. return errors.Wrapf(err, "connect media=%v", v.conf.serverAddr)
  238. }
  239. defer ps.Close()
  240. videoFile, err := os.Open(v.conf.psConfig.video)
  241. if err != nil {
  242. return errors.Wrapf(err, "Open file %v", v.conf.psConfig.video)
  243. }
  244. defer videoFile.Close()
  245. f, err := os.Open(v.conf.psConfig.audio)
  246. if err != nil {
  247. return errors.Wrapf(err, "Open file %v", v.conf.psConfig.audio)
  248. }
  249. defer f.Close()
  250. fileSuffix := path.Ext(v.conf.psConfig.video)
  251. var h264 *h264reader.H264Reader
  252. var h265 *H265Reader
  253. if fileSuffix == ".h265" {
  254. h265, err = NewReader(videoFile)
  255. } else {
  256. h264, err = h264reader.NewReader(videoFile)
  257. }
  258. if err != nil {
  259. return errors.Wrapf(err, "Open %v", v.conf.psConfig.video)
  260. }
  261. audio, err := NewAACReader(f)
  262. if err != nil {
  263. return errors.Wrapf(err, "Open ogg %v", v.conf.psConfig.audio)
  264. }
  265. // Scale the video samples to 1024 according to AAC, that is 1 video frame means 1024 samples.
  266. audioSampleRate := audio.codec.ASC().SampleRate.ToHz()
  267. videoSampleRate := 1024 * 1000 / v.conf.psConfig.fps
  268. logger.Tf(ctx, "PS: Media stream, tbn=%v, ssrc=%v, pt=%v, Video(%v, fps=%v, rate=%v), Audio(%v, rate=%v, channels=%v)",
  269. v.conf.clockRate, v.conf.ssrc, v.conf.payloadType, v.conf.psConfig.video, v.conf.psConfig.fps, videoSampleRate,
  270. v.conf.psConfig.audio, audioSampleRate, audio.codec.ASC().Channels)
  271. lastPrint := time.Now()
  272. var aacSamples, avcSamples uint64
  273. var audioDTS, videoDTS uint64
  274. defer func() {
  275. logger.Tf(ctx, "Consume Video(samples=%v, dts=%v, ts=%.2f) and Audio(samples=%v, dts=%v, ts=%.2f)",
  276. avcSamples, videoDTS, float64(videoDTS)/90.0, aacSamples, audioDTS, float64(audioDTS)/90.0,
  277. )
  278. }()
  279. clock := newWallClock()
  280. var pack *PSPackStream
  281. for ctx.Err() == nil {
  282. if pack == nil {
  283. pack = NewPSPackStream(v.conf.payloadType)
  284. }
  285. // One pack should only contains one video frame.
  286. if !pack.hasVideo {
  287. if fileSuffix == ".h265" {
  288. err = v.writeH265(ctx, pack, h265, videoSampleRate, &avcSamples, &videoDTS)
  289. } else {
  290. err = v.writeH264(ctx, pack, h264, videoSampleRate, &avcSamples, &videoDTS)
  291. }
  292. if err != nil {
  293. return errors.Wrap(err, "WriteVideo")
  294. }
  295. }
  296. // Always read and consume one audio frame each time.
  297. if true {
  298. audioFrame, err := audio.NextADTSFrame()
  299. if err != nil {
  300. return errors.Wrap(err, "Read AAC")
  301. }
  302. // Each AAC frame contains 1024 samples, DTS = total-samples / sample-rate
  303. aacSamples += 1024
  304. audioDTS = uint64(v.conf.clockRate*aacSamples) / uint64(audioSampleRate)
  305. if time.Now().Sub(lastPrint) > 3*time.Second {
  306. lastPrint = time.Now()
  307. logger.Tf(ctx, "Consume Video(samples=%v, dts=%v, ts=%.2f) and Audio(samples=%v, dts=%v, ts=%.2f)",
  308. avcSamples, videoDTS, float64(videoDTS)/90.0, aacSamples, audioDTS, float64(audioDTS)/90.0,
  309. )
  310. }
  311. if err = pack.WriteAudio(audioFrame, audioDTS); err != nil {
  312. return errors.Wrapf(err, "write audio %v", len(audioFrame))
  313. }
  314. }
  315. // Send pack when got video and enough audio frames.
  316. if pack.hasVideo && videoDTS < audioDTS {
  317. if err := ps.WritePacksOverRTP(pack.packets); err != nil {
  318. return errors.Wrap(err, "write")
  319. }
  320. if v.onSendPacket != nil {
  321. if err := v.onSendPacket(pack); err != nil {
  322. return errors.Wrap(err, "callback")
  323. }
  324. }
  325. pack = nil // Reset pack.
  326. }
  327. // One audio frame(1024 samples), the duration is 1024/audioSampleRate in seconds.
  328. sampleDuration := time.Duration(uint64(time.Second) * 1024 / uint64(audioSampleRate))
  329. if d := clock.Tick(sampleDuration); d > 0 {
  330. time.Sleep(d)
  331. }
  332. }
  333. return nil
  334. }
  335. func (v *PSIngester) writeH264(ctx context.Context, pack *PSPackStream, h264 *h264reader.H264Reader,
  336. videoSampleRate int, avcSamples, videoDTS *uint64) error {
  337. var sps, pps *h264reader.NAL
  338. var videoFrames []*h264reader.NAL
  339. for ctx.Err() == nil {
  340. frame, err := h264.NextNAL()
  341. if err == io.EOF {
  342. return io.EOF
  343. }
  344. if err != nil {
  345. return errors.Wrapf(err, "Read h264")
  346. }
  347. videoFrames = append(videoFrames, frame)
  348. logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes",
  349. frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data))
  350. if frame.UnitType == h264reader.NalUnitTypeSPS {
  351. sps = frame
  352. } else if frame.UnitType == h264reader.NalUnitTypePPS {
  353. pps = frame
  354. } else {
  355. break
  356. }
  357. }
  358. // We convert the video sample rate to be based over 1024, that is 1024 samples means one video frame.
  359. *avcSamples += 1024
  360. *videoDTS = uint64(v.conf.clockRate*(*avcSamples)) / uint64(videoSampleRate)
  361. var err error
  362. if sps != nil || pps != nil {
  363. err = pack.WriteHeader(mpeg2.PS_STREAM_H264, *videoDTS)
  364. } else {
  365. err = pack.WritePackHeader(*videoDTS)
  366. }
  367. if err != nil {
  368. return errors.Wrap(err, "pack header")
  369. }
  370. for _, frame := range videoFrames {
  371. if err = pack.WriteVideo(frame.Data, *videoDTS); err != nil {
  372. return errors.Wrapf(err, "write video %v", len(frame.Data))
  373. }
  374. }
  375. return nil
  376. }
  377. func (v *PSIngester) writeH265(ctx context.Context, pack *PSPackStream, h265 *H265Reader,
  378. videoSampleRate int, avcSamples, videoDTS *uint64) error {
  379. var vps, sps, pps *NAL
  380. var videoFrames []*NAL
  381. for ctx.Err() == nil {
  382. frame, err := h265.NextNAL()
  383. if err == io.EOF {
  384. return io.EOF
  385. }
  386. if err != nil {
  387. return errors.Wrapf(err, "Read h265")
  388. }
  389. videoFrames = append(videoFrames, frame)
  390. logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, %v bytes",
  391. frame.UnitType, frame.PictureOrderCount, frame.ForbiddenZeroBit, len(frame.Data))
  392. if frame.UnitType == NaluTypeVps {
  393. vps = frame
  394. } else if frame.UnitType == NaluTypeSps {
  395. sps = frame
  396. } else if frame.UnitType == NaluTypePps {
  397. pps = frame
  398. } else {
  399. break
  400. }
  401. }
  402. // We convert the video sample rate to be based over 1024, that is 1024 samples means one video frame.
  403. *avcSamples += 1024
  404. *videoDTS = uint64(v.conf.clockRate*(*avcSamples)) / uint64(videoSampleRate)
  405. var err error
  406. if vps != nil || sps != nil || pps != nil {
  407. err = pack.WriteHeader(mpeg2.PS_STREAM_H265, *videoDTS)
  408. } else {
  409. err = pack.WritePackHeader(*videoDTS)
  410. }
  411. if err != nil {
  412. return errors.Wrap(err, "pack header")
  413. }
  414. for _, frame := range videoFrames {
  415. if err = pack.WriteVideo(frame.Data, *videoDTS); err != nil {
  416. return errors.Wrapf(err, "write video %v", len(frame.Data))
  417. }
  418. }
  419. return nil
  420. }