123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- package mpeg2
- import (
- "errors"
- "github.com/yapingcat/gomedia/codec"
- )
- type pes_stream struct {
- pid uint16
- cc uint8
- streamtype TS_STREAM_TYPE
- }
- func NewPESStream(pid uint16, cid TS_STREAM_TYPE) *pes_stream {
- return &pes_stream{
- pid: pid,
- cc: 0,
- streamtype: cid,
- }
- }
- type table_pmt struct {
- pid uint16
- cc uint8
- pcr_pid uint16
- version_number uint8
- pm uint16
- streams []*pes_stream
- }
- func NewTablePmt() *table_pmt {
- return &table_pmt{
- pid: 0,
- cc: 0,
- pcr_pid: 0,
- version_number: 0,
- pm: 0,
- streams: make([]*pes_stream, 0, 2),
- }
- }
- type table_pat struct {
- cc uint8
- version_number uint8
- pmts []*table_pmt
- }
- func NewTablePat() *table_pat {
- return &table_pat{
- cc: 0,
- version_number: 0,
- pmts: make([]*table_pmt, 0, 8),
- }
- }
- type TSMuxer struct {
- pat *table_pat
- stream_pid uint16
- pmt_pid uint16
- pat_period uint64
- OnPacket func(pkg []byte)
- }
- func NewTSMuxer() *TSMuxer {
- return &TSMuxer{
- pat: NewTablePat(),
- stream_pid: 0x100,
- pmt_pid: 0x200,
- pat_period: 0,
- OnPacket: nil,
- }
- }
- func (mux *TSMuxer) AddStream(cid TS_STREAM_TYPE) uint16 {
- if mux.pat == nil {
- mux.pat = NewTablePat()
- }
- if len(mux.pat.pmts) == 0 {
- tmppmt := NewTablePmt()
- tmppmt.pid = mux.pmt_pid
- tmppmt.pm = 1
- mux.pmt_pid++
- mux.pat.pmts = append(mux.pat.pmts, tmppmt)
- }
- sid := mux.stream_pid
- tmpstream := NewPESStream(sid, cid)
- mux.stream_pid++
- mux.pat.pmts[0].streams = append(mux.pat.pmts[0].streams, tmpstream)
- return sid
- }
- /// Muxer audio/video stream data
- /// pid: stream id by AddStream
- /// pts: audio/video stream timestamp in ms
- /// dts: audio/video stream timestamp in ms
- func (mux *TSMuxer) Write(pid uint16, data []byte, pts uint64, dts uint64) error {
- var whichpmt *table_pmt = nil
- var whichstream *pes_stream = nil
- for _, pmt := range mux.pat.pmts {
- for _, stream := range pmt.streams {
- if stream.pid == pid {
- whichpmt = pmt
- whichstream = stream
- break
- }
- }
- }
- if whichpmt == nil || whichstream == nil {
- return errors.New("not Found pid stream")
- }
- if whichpmt.pcr_pid == 0 || (findPESIDByStreamType(whichstream.streamtype) == PES_STREAM_VIDEO && whichpmt.pcr_pid != pid) {
- whichpmt.pcr_pid = pid
- }
- var withaud bool = false
- if whichstream.streamtype == TS_STREAM_H264 || whichstream.streamtype == TS_STREAM_H265 {
- codec.SplitFrame(data, func(nalu []byte) bool {
- if whichstream.streamtype == TS_STREAM_H264 {
- nalu_type := codec.H264NaluTypeWithoutStartCode(nalu)
- if nalu_type == codec.H264_NAL_AUD {
- withaud = true
- return false
- } else if codec.IsH264VCLNaluType(nalu_type) {
- return false
- }
- return true
- } else {
- nalu_type := codec.H265NaluTypeWithoutStartCode(nalu)
- if nalu_type == codec.H265_NAL_AUD {
- withaud = true
- return false
- } else if codec.IsH265VCLNaluType(nalu_type) {
- return false
- }
- return true
- }
- })
- }
- if mux.pat_period == 0 || mux.pat_period+400 < dts {
- mux.pat_period = dts
- if mux.pat_period == 0 {
- mux.pat_period = 1 //avoid write pat twice
- }
- tmppat := NewPat()
- tmppat.Version_number = mux.pat.version_number
- for _, pmt := range mux.pat.pmts {
- tmppm := PmtPair{
- Program_number: pmt.pm,
- PID: pmt.pid,
- }
- tmppat.Pmts = append(tmppat.Pmts, tmppm)
- }
- mux.writePat(tmppat)
- for _, pmt := range mux.pat.pmts {
- tmppmt := NewPmt()
- tmppmt.Program_number = pmt.pm
- tmppmt.Version_number = pmt.version_number
- tmppmt.PCR_PID = pmt.pcr_pid
- for _, stream := range pmt.streams {
- var sp StreamPair
- sp.StreamType = uint8(stream.streamtype)
- sp.Elementary_PID = stream.pid
- sp.ES_Info_Length = 0
- tmppmt.Streams = append(tmppmt.Streams, sp)
- }
- mux.writePmt(tmppmt, pmt)
- }
- }
- flag := false
- switch whichstream.streamtype {
- case TS_STREAM_H264:
- flag = codec.IsH264IDRFrame(data)
- case TS_STREAM_H265:
- flag = codec.IsH265IDRFrame(data)
- }
- mux.writePES(whichstream, whichpmt, data, pts*90, dts*90, flag, withaud)
- return nil
- }
- func (mux *TSMuxer) writePat(pat *Pat) {
- var tshdr TSPacket
- tshdr.Payload_unit_start_indicator = 1
- tshdr.PID = 0
- tshdr.Adaptation_field_control = 0x01
- tshdr.Continuity_counter = mux.pat.cc
- mux.pat.cc++
- mux.pat.cc = (mux.pat.cc + 1) % 16
- bsw := codec.NewBitStreamWriter(TS_PAKCET_SIZE)
- tshdr.EncodeHeader(bsw)
- bsw.PutByte(0x00) //pointer
- pat.Encode(bsw)
- bsw.FillRemainData(0xff)
- if mux.OnPacket != nil {
- mux.OnPacket(bsw.Bits())
- }
- }
- func (mux *TSMuxer) writePmt(pmt *Pmt, t_pmt *table_pmt) {
- var tshdr TSPacket
- tshdr.Payload_unit_start_indicator = 1
- tshdr.PID = t_pmt.pid
- tshdr.Adaptation_field_control = 0x01
- tshdr.Continuity_counter = t_pmt.cc
- t_pmt.cc = (t_pmt.cc + 1) % 16
- bsw := codec.NewBitStreamWriter(TS_PAKCET_SIZE)
- tshdr.EncodeHeader(bsw)
- bsw.PutByte(0x00) //pointer
- pmt.Encode(bsw)
- bsw.FillRemainData(0xff)
- if mux.OnPacket != nil {
- mux.OnPacket(bsw.Bits())
- }
- }
- func (mux *TSMuxer) writePES(pes *pes_stream, pmt *table_pmt, data []byte, pts uint64, dts uint64, idr_flag bool, withaud bool) {
- var firstPesPacket bool = true
- bsw := codec.NewBitStreamWriter(TS_PAKCET_SIZE)
- for {
- bsw.Reset()
- var tshdr TSPacket
- if firstPesPacket {
- tshdr.Payload_unit_start_indicator = 1
- }
- tshdr.PID = pes.pid
- tshdr.Adaptation_field_control = 0x01
- tshdr.Continuity_counter = pes.cc
- headlen := 4
- pes.cc = (pes.cc + 1) % 16
- var adaptation *Adaptation_field = nil
- if firstPesPacket && idr_flag {
- adaptation = new(Adaptation_field)
- tshdr.Adaptation_field_control = tshdr.Adaptation_field_control | 0x20
- adaptation.Random_access_indicator = 1
- headlen += 2
- }
- if firstPesPacket && pes.pid == pmt.pcr_pid {
- if adaptation == nil {
- adaptation = new(Adaptation_field)
- headlen += 2
- }
- tshdr.Adaptation_field_control = tshdr.Adaptation_field_control | 0x20
- adaptation.PCR_flag = 1
- var pcr_base uint64 = 0
- var pcr_ext uint16 = 0
- if dts == 0 {
- pcr_base = pts * 300 / 300
- pcr_ext = uint16(pts * 300 % 300)
- } else {
- pcr_base = dts * 300 / 300
- pcr_ext = uint16(dts * 300 % 300)
- }
- adaptation.Program_clock_reference_base = pcr_base
- adaptation.Program_clock_reference_extension = pcr_ext
- headlen += 6
- }
- var payload []byte
- var pespkg *PesPacket = nil
- if firstPesPacket {
- oldheadlen := headlen
- headlen += 19
- if !withaud && pes.streamtype == TS_STREAM_H264 {
- headlen += 6
- payload = append(payload, H264_AUD_NALU...)
- } else if !withaud && pes.streamtype == TS_STREAM_H265 {
- payload = append(payload, H265_AUD_NALU...)
- headlen += 7
- }
- pespkg = NewPesPacket()
- pespkg.PTS_DTS_flags = 0x03
- pespkg.PES_header_data_length = 10
- pespkg.Pts = pts
- pespkg.Dts = dts
- pespkg.Stream_id = uint8(findPESIDByStreamType(pes.streamtype))
- if idr_flag {
- pespkg.Data_alignment_indicator = 1
- }
- if headlen-oldheadlen-6+len(data) > 0xFFFF {
- pespkg.PES_packet_length = 0
- } else {
- pespkg.PES_packet_length = uint16(len(data) + headlen - oldheadlen - 6)
- }
- }
- if len(data)+headlen < TS_PAKCET_SIZE {
- if adaptation == nil {
- adaptation = new(Adaptation_field)
- headlen += 1
- if TS_PAKCET_SIZE-len(data)-headlen >= 1 {
- headlen += 1
- } else {
- adaptation.SingleStuffingByte = true
- }
- }
- adaptation.Stuffing_byte = uint8(TS_PAKCET_SIZE - len(data) - headlen)
- payload = append(payload, data...)
- data = data[:0]
- } else {
- payload = append(payload, data[0:TS_PAKCET_SIZE-headlen]...)
- data = data[TS_PAKCET_SIZE-headlen:]
- }
- if adaptation != nil {
- tshdr.Field = adaptation
- tshdr.Adaptation_field_control |= 0x02
- }
- tshdr.EncodeHeader(bsw)
- if pespkg != nil {
- pespkg.Pes_payload = payload
- pespkg.Encode(bsw)
- } else {
- bsw.PutBytes(payload)
- }
- firstPesPacket = false
- if mux.OnPacket != nil {
- if len(bsw.Bits()) != TS_PAKCET_SIZE {
- panic("packet ts packet failed")
- }
- mux.OnPacket(bsw.Bits())
- }
- if len(data) == 0 {
- break
- }
- }
- }
|