// ts-muxer.go (8.0 KB)
  1. package mpeg2
  2. import (
  3. "errors"
  4. "github.com/yapingcat/gomedia/codec"
  5. )
  6. type pes_stream struct {
  7. pid uint16
  8. cc uint8
  9. streamtype TS_STREAM_TYPE
  10. }
  11. func NewPESStream(pid uint16, cid TS_STREAM_TYPE) *pes_stream {
  12. return &pes_stream{
  13. pid: pid,
  14. cc: 0,
  15. streamtype: cid,
  16. }
  17. }
  18. type table_pmt struct {
  19. pid uint16
  20. cc uint8
  21. pcr_pid uint16
  22. version_number uint8
  23. pm uint16
  24. streams []*pes_stream
  25. }
  26. func NewTablePmt() *table_pmt {
  27. return &table_pmt{
  28. pid: 0,
  29. cc: 0,
  30. pcr_pid: 0,
  31. version_number: 0,
  32. pm: 0,
  33. streams: make([]*pes_stream, 0, 2),
  34. }
  35. }
  36. type table_pat struct {
  37. cc uint8
  38. version_number uint8
  39. pmts []*table_pmt
  40. }
  41. func NewTablePat() *table_pat {
  42. return &table_pat{
  43. cc: 0,
  44. version_number: 0,
  45. pmts: make([]*table_pmt, 0, 8),
  46. }
  47. }
  48. type TSMuxer struct {
  49. pat *table_pat
  50. stream_pid uint16
  51. pmt_pid uint16
  52. pat_period uint64
  53. OnPacket func(pkg []byte)
  54. }
  55. func NewTSMuxer() *TSMuxer {
  56. return &TSMuxer{
  57. pat: NewTablePat(),
  58. stream_pid: 0x100,
  59. pmt_pid: 0x200,
  60. pat_period: 0,
  61. OnPacket: nil,
  62. }
  63. }
  64. func (mux *TSMuxer) AddStream(cid TS_STREAM_TYPE) uint16 {
  65. if mux.pat == nil {
  66. mux.pat = NewTablePat()
  67. }
  68. if len(mux.pat.pmts) == 0 {
  69. tmppmt := NewTablePmt()
  70. tmppmt.pid = mux.pmt_pid
  71. tmppmt.pm = 1
  72. mux.pmt_pid++
  73. mux.pat.pmts = append(mux.pat.pmts, tmppmt)
  74. }
  75. sid := mux.stream_pid
  76. tmpstream := NewPESStream(sid, cid)
  77. mux.stream_pid++
  78. mux.pat.pmts[0].streams = append(mux.pat.pmts[0].streams, tmpstream)
  79. return sid
  80. }
  81. /// Muxer audio/video stream data
  82. /// pid: stream id by AddStream
  83. /// pts: audio/video stream timestamp in ms
  84. /// dts: audio/video stream timestamp in ms
  85. func (mux *TSMuxer) Write(pid uint16, data []byte, pts uint64, dts uint64) error {
  86. var whichpmt *table_pmt = nil
  87. var whichstream *pes_stream = nil
  88. for _, pmt := range mux.pat.pmts {
  89. for _, stream := range pmt.streams {
  90. if stream.pid == pid {
  91. whichpmt = pmt
  92. whichstream = stream
  93. break
  94. }
  95. }
  96. }
  97. if whichpmt == nil || whichstream == nil {
  98. return errors.New("not Found pid stream")
  99. }
  100. if whichpmt.pcr_pid == 0 || (findPESIDByStreamType(whichstream.streamtype) == PES_STREAM_VIDEO && whichpmt.pcr_pid != pid) {
  101. whichpmt.pcr_pid = pid
  102. }
  103. var withaud bool = false
  104. if whichstream.streamtype == TS_STREAM_H264 || whichstream.streamtype == TS_STREAM_H265 {
  105. codec.SplitFrame(data, func(nalu []byte) bool {
  106. if whichstream.streamtype == TS_STREAM_H264 {
  107. nalu_type := codec.H264NaluTypeWithoutStartCode(nalu)
  108. if nalu_type == codec.H264_NAL_AUD {
  109. withaud = true
  110. return false
  111. } else if codec.IsH264VCLNaluType(nalu_type) {
  112. return false
  113. }
  114. return true
  115. } else {
  116. nalu_type := codec.H265NaluTypeWithoutStartCode(nalu)
  117. if nalu_type == codec.H265_NAL_AUD {
  118. withaud = true
  119. return false
  120. } else if codec.IsH265VCLNaluType(nalu_type) {
  121. return false
  122. }
  123. return true
  124. }
  125. })
  126. }
  127. if mux.pat_period == 0 || mux.pat_period+400 < dts {
  128. mux.pat_period = dts
  129. if mux.pat_period == 0 {
  130. mux.pat_period = 1 //avoid write pat twice
  131. }
  132. tmppat := NewPat()
  133. tmppat.Version_number = mux.pat.version_number
  134. for _, pmt := range mux.pat.pmts {
  135. tmppm := PmtPair{
  136. Program_number: pmt.pm,
  137. PID: pmt.pid,
  138. }
  139. tmppat.Pmts = append(tmppat.Pmts, tmppm)
  140. }
  141. mux.writePat(tmppat)
  142. for _, pmt := range mux.pat.pmts {
  143. tmppmt := NewPmt()
  144. tmppmt.Program_number = pmt.pm
  145. tmppmt.Version_number = pmt.version_number
  146. tmppmt.PCR_PID = pmt.pcr_pid
  147. for _, stream := range pmt.streams {
  148. var sp StreamPair
  149. sp.StreamType = uint8(stream.streamtype)
  150. sp.Elementary_PID = stream.pid
  151. sp.ES_Info_Length = 0
  152. tmppmt.Streams = append(tmppmt.Streams, sp)
  153. }
  154. mux.writePmt(tmppmt, pmt)
  155. }
  156. }
  157. flag := false
  158. switch whichstream.streamtype {
  159. case TS_STREAM_H264:
  160. flag = codec.IsH264IDRFrame(data)
  161. case TS_STREAM_H265:
  162. flag = codec.IsH265IDRFrame(data)
  163. }
  164. mux.writePES(whichstream, whichpmt, data, pts*90, dts*90, flag, withaud)
  165. return nil
  166. }
  167. func (mux *TSMuxer) writePat(pat *Pat) {
  168. var tshdr TSPacket
  169. tshdr.Payload_unit_start_indicator = 1
  170. tshdr.PID = 0
  171. tshdr.Adaptation_field_control = 0x01
  172. tshdr.Continuity_counter = mux.pat.cc
  173. mux.pat.cc++
  174. mux.pat.cc = (mux.pat.cc + 1) % 16
  175. bsw := codec.NewBitStreamWriter(TS_PAKCET_SIZE)
  176. tshdr.EncodeHeader(bsw)
  177. bsw.PutByte(0x00) //pointer
  178. pat.Encode(bsw)
  179. bsw.FillRemainData(0xff)
  180. if mux.OnPacket != nil {
  181. mux.OnPacket(bsw.Bits())
  182. }
  183. }
  184. func (mux *TSMuxer) writePmt(pmt *Pmt, t_pmt *table_pmt) {
  185. var tshdr TSPacket
  186. tshdr.Payload_unit_start_indicator = 1
  187. tshdr.PID = t_pmt.pid
  188. tshdr.Adaptation_field_control = 0x01
  189. tshdr.Continuity_counter = t_pmt.cc
  190. t_pmt.cc = (t_pmt.cc + 1) % 16
  191. bsw := codec.NewBitStreamWriter(TS_PAKCET_SIZE)
  192. tshdr.EncodeHeader(bsw)
  193. bsw.PutByte(0x00) //pointer
  194. pmt.Encode(bsw)
  195. bsw.FillRemainData(0xff)
  196. if mux.OnPacket != nil {
  197. mux.OnPacket(bsw.Bits())
  198. }
  199. }
  200. func (mux *TSMuxer) writePES(pes *pes_stream, pmt *table_pmt, data []byte, pts uint64, dts uint64, idr_flag bool, withaud bool) {
  201. var firstPesPacket bool = true
  202. bsw := codec.NewBitStreamWriter(TS_PAKCET_SIZE)
  203. for {
  204. bsw.Reset()
  205. var tshdr TSPacket
  206. if firstPesPacket {
  207. tshdr.Payload_unit_start_indicator = 1
  208. }
  209. tshdr.PID = pes.pid
  210. tshdr.Adaptation_field_control = 0x01
  211. tshdr.Continuity_counter = pes.cc
  212. headlen := 4
  213. pes.cc = (pes.cc + 1) % 16
  214. var adaptation *Adaptation_field = nil
  215. if firstPesPacket && idr_flag {
  216. adaptation = new(Adaptation_field)
  217. tshdr.Adaptation_field_control = tshdr.Adaptation_field_control | 0x20
  218. adaptation.Random_access_indicator = 1
  219. headlen += 2
  220. }
  221. if firstPesPacket && pes.pid == pmt.pcr_pid {
  222. if adaptation == nil {
  223. adaptation = new(Adaptation_field)
  224. headlen += 2
  225. }
  226. tshdr.Adaptation_field_control = tshdr.Adaptation_field_control | 0x20
  227. adaptation.PCR_flag = 1
  228. var pcr_base uint64 = 0
  229. var pcr_ext uint16 = 0
  230. if dts == 0 {
  231. pcr_base = pts * 300 / 300
  232. pcr_ext = uint16(pts * 300 % 300)
  233. } else {
  234. pcr_base = dts * 300 / 300
  235. pcr_ext = uint16(dts * 300 % 300)
  236. }
  237. adaptation.Program_clock_reference_base = pcr_base
  238. adaptation.Program_clock_reference_extension = pcr_ext
  239. headlen += 6
  240. }
  241. var payload []byte
  242. var pespkg *PesPacket = nil
  243. if firstPesPacket {
  244. oldheadlen := headlen
  245. headlen += 19
  246. if !withaud && pes.streamtype == TS_STREAM_H264 {
  247. headlen += 6
  248. payload = append(payload, H264_AUD_NALU...)
  249. } else if !withaud && pes.streamtype == TS_STREAM_H265 {
  250. payload = append(payload, H265_AUD_NALU...)
  251. headlen += 7
  252. }
  253. pespkg = NewPesPacket()
  254. pespkg.PTS_DTS_flags = 0x03
  255. pespkg.PES_header_data_length = 10
  256. pespkg.Pts = pts
  257. pespkg.Dts = dts
  258. pespkg.Stream_id = uint8(findPESIDByStreamType(pes.streamtype))
  259. if idr_flag {
  260. pespkg.Data_alignment_indicator = 1
  261. }
  262. if headlen-oldheadlen-6+len(data) > 0xFFFF {
  263. pespkg.PES_packet_length = 0
  264. } else {
  265. pespkg.PES_packet_length = uint16(len(data) + headlen - oldheadlen - 6)
  266. }
  267. }
  268. if len(data)+headlen < TS_PAKCET_SIZE {
  269. if adaptation == nil {
  270. adaptation = new(Adaptation_field)
  271. headlen += 1
  272. if TS_PAKCET_SIZE-len(data)-headlen >= 1 {
  273. headlen += 1
  274. } else {
  275. adaptation.SingleStuffingByte = true
  276. }
  277. }
  278. adaptation.Stuffing_byte = uint8(TS_PAKCET_SIZE - len(data) - headlen)
  279. payload = append(payload, data...)
  280. data = data[:0]
  281. } else {
  282. payload = append(payload, data[0:TS_PAKCET_SIZE-headlen]...)
  283. data = data[TS_PAKCET_SIZE-headlen:]
  284. }
  285. if adaptation != nil {
  286. tshdr.Field = adaptation
  287. tshdr.Adaptation_field_control |= 0x02
  288. }
  289. tshdr.EncodeHeader(bsw)
  290. if pespkg != nil {
  291. pespkg.Pes_payload = payload
  292. pespkg.Encode(bsw)
  293. } else {
  294. bsw.PutBytes(payload)
  295. }
  296. firstPesPacket = false
  297. if mux.OnPacket != nil {
  298. if len(bsw.Bits()) != TS_PAKCET_SIZE {
  299. panic("packet ts packet failed")
  300. }
  301. mux.OnPacket(bsw.Bits())
  302. }
  303. if len(data) == 0 {
  304. break
  305. }
  306. }
  307. }