123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179 |
- // The MIT License (MIT)
- //
- // # Copyright (c) 2021 Winlin
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy of
- // this software and associated documentation files (the "Software"), to deal in
- // the Software without restriction, including without limitation the rights to
- // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- // the Software, and to permit persons to whom the Software is furnished to do so,
- // subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in all
- // copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- package janus
- import (
- "bytes"
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "os"
- "path"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/ossrs/go-oryx-lib/errors"
- "github.com/ossrs/go-oryx-lib/logger"
- vnet_proxy "github.com/ossrs/srs-bench/vnet"
- "github.com/pion/interceptor"
- "github.com/pion/logging"
- "github.com/pion/rtcp"
- "github.com/pion/transport/v2/vnet"
- "github.com/pion/webrtc/v3"
- "github.com/pion/webrtc/v3/pkg/media/h264reader"
- )
- var srsHttps *bool
- var srsLog *bool
- var srsTimeout *int
- var srsPlayPLI *int
- var srsPlayOKPackets *int
- var srsPublishOKPackets *int
- var srsPublishVideoFps *int
- var srsDTLSDropPackets *int
- var srsSchema string
- var srsServer *string
- var srsStream *string
- var srsPublishAudio *string
- var srsPublishVideo *string
- var srsVnetClientIP *string
- func prepareTest() error {
- var err error
- srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API")
- srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to")
- srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play")
- srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
- srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms")
- srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.")
- srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If recv N RTP packets, it's ok, or fail")
- srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 3, "If send N RTP, recv N RTCP packets, it's ok, or fail")
- srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
- srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
- srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
- srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.")
- srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail")
- // Should parse it first.
- flag.Parse()
- // The stream should starts with /, for example, /rtc/regression
- if !strings.HasPrefix(*srsStream, "/") {
- *srsStream = "/" + *srsStream
- }
- // Generate srs protocol from whether use HTTPS.
- srsSchema = "http"
- if *srsHttps {
- srsSchema = "https"
- }
- // Check file.
- tryOpenFile := func(filename string) (string, error) {
- if filename == "" {
- return filename, nil
- }
- f, err := os.Open(filename)
- if err != nil {
- nfilename := path.Join("../", filename)
- f2, err := os.Open(nfilename)
- if err != nil {
- return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename)
- }
- defer f2.Close()
- return nfilename, nil
- }
- defer f.Close()
- return filename, nil
- }
- if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil {
- return err
- }
- if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
- return err
- }
- return nil
- }
- func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) {
- u, err := url.Parse(r)
- if err != nil {
- return "", errors.Wrapf(err, "Parse url %v", r)
- }
- // Build api url.
- host := u.Host
- if !strings.Contains(host, ":") {
- host += ":1985"
- }
- api := fmt.Sprintf("http://%v", host)
- if !strings.HasPrefix(apiPath, "/") {
- api += "/"
- }
- api += apiPath
- if !strings.HasSuffix(apiPath, "/") {
- api += "/"
- }
- if u.RawQuery != "" {
- api += "?" + u.RawQuery
- }
- // Build JSON body.
- reqBody := struct {
- Api string `json:"api"`
- ClientIP string `json:"clientip"`
- SDP string `json:"sdp"`
- StreamURL string `json:"streamurl"`
- }{
- api, "", offer, r,
- }
- b, err := json.Marshal(reqBody)
- if err != nil {
- return "", errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.If(ctx, "Request url api=%v with %v", api, string(b))
- logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return "", errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return "", errors.Wrapf(err, "Read response for %v", string(b))
- }
- logger.If(ctx, "Response from %v is %v", api, string(b2))
- logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2))
- resBody := struct {
- Code int `json:"code"`
- Session string `json:"sessionid"`
- SDP string `json:"sdp"`
- }{}
- if err := json.Unmarshal(b2, &resBody); err != nil {
- return "", errors.Wrapf(err, "Marshal %v", string(b2))
- }
- if resBody.Code != 0 {
- return "", errors.Errorf("Server fail code=%v %v", resBody.Code, string(b2))
- }
- logger.If(ctx, "Parse response to code=%v, session=%v, sdp=%v",
- resBody.Code, resBody.Session, escapeSDP(resBody.SDP))
- logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes",
- resBody.Code, resBody.Session, len(resBody.SDP))
- return resBody.SDP, nil
- }
- func escapeSDP(sdp string) string {
- return strings.ReplaceAll(strings.ReplaceAll(sdp, "\r", "\\r"), "\n", "\\n")
- }
- func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL {
- first := frames[0]
- buf := bytes.Buffer{}
- buf.WriteByte(
- first.RefIdc<<5&0x60 | byte(24), // STAP-A
- )
- for _, frame := range frames {
- buf.WriteByte(byte(len(frame.Data) >> 8))
- buf.WriteByte(byte(len(frame.Data)))
- buf.Write(frame.Data)
- }
- return &h264reader.NAL{
- PictureOrderCount: first.PictureOrderCount,
- ForbiddenZeroBit: false,
- RefIdc: first.RefIdc,
- UnitType: h264reader.NalUnitType(24), // STAP-A
- Data: buf.Bytes(),
- }
- }
- type wallClock struct {
- start time.Time
- duration time.Duration
- }
- func newWallClock() *wallClock {
- return &wallClock{start: time.Now()}
- }
- func (v *wallClock) Tick(d time.Duration) time.Duration {
- v.duration += d
- wc := time.Now().Sub(v.start)
- re := v.duration - wc
- if re > 30*time.Millisecond {
- return re
- }
- return 0
- }
- // Set to active, as DTLS client, to start ClientHello.
- func testUtilSetupActive(s *webrtc.SessionDescription) error {
- if strings.Contains(s.SDP, "setup:passive") {
- return errors.New("set to active")
- }
- s.SDP = strings.ReplaceAll(s.SDP, "setup:actpass", "setup:active")
- return nil
- }
- // Set to passive, as DTLS client, to start ClientHello.
- func testUtilSetupPassive(s *webrtc.SessionDescription) error {
- if strings.Contains(s.SDP, "setup:active") {
- return errors.New("set to passive")
- }
- s.SDP = strings.ReplaceAll(s.SDP, "setup:actpass", "setup:passive")
- return nil
- }
- // Parse address from SDP.
- // candidate:0 1 udp 2130706431 192.168.3.8 8000 typ host generation 0
- func parseAddressOfCandidate(answerSDP string) (*net.UDPAddr, error) {
- answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: answerSDP}
- answerObject, err := answer.Unmarshal()
- if err != nil {
- return nil, errors.Wrapf(err, "unmarshal answer %v", answerSDP)
- }
- if len(answerObject.MediaDescriptions) == 0 {
- return nil, errors.New("no media")
- }
- candidate, ok := answerObject.MediaDescriptions[0].Attribute("candidate")
- if !ok {
- return nil, errors.New("no candidate")
- }
- // candidate:0 1 udp 2130706431 192.168.3.8 8000 typ host generation 0
- attrs := strings.Split(candidate, " ")
- if len(attrs) <= 6 {
- return nil, errors.Errorf("no address in %v", candidate)
- }
- // Parse ip and port from answer.
- ip := attrs[4]
- port, err := strconv.Atoi(attrs[5])
- if err != nil {
- return nil, errors.Wrapf(err, "invalid port %v", candidate)
- }
- address := fmt.Sprintf("%v:%v", ip, port)
- addr, err := net.ResolveUDPAddr("udp4", address)
- if err != nil {
- return nil, errors.Wrapf(err, "parse %v", address)
- }
- return addr, nil
- }
- // Filter the test error, ignore context.Canceled
- func filterTestError(errs ...error) error {
- var filteredErrors []error
- for _, err := range errs {
- if err == nil || errors.Cause(err) == context.Canceled {
- continue
- }
- // If url error, server maybe error, do not print the detail log.
- if r0 := errors.Cause(err); r0 != nil {
- if r1, ok := r0.(*url.Error); ok {
- err = r1
- }
- }
- filteredErrors = append(filteredErrors, err)
- }
- if len(filteredErrors) == 0 {
- return nil
- }
- if len(filteredErrors) == 1 {
- return filteredErrors[0]
- }
- var descs []string
- for i, err := range filteredErrors[1:] {
- descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
- }
- return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
- }
- // For STUN packet, 0x00 is binding request, 0x01 is binding success response.
- // @see srs_is_stun of https://github.com/ossrs/srs
- func srsIsStun(b []byte) bool {
- return len(b) > 0 && (b[0] == 0 || b[0] == 1)
- }
- // change_cipher_spec(20), alert(21), handshake(22), application_data(23)
- // @see https://tools.ietf.org/html/rfc2246#section-6.2.1
- // @see srs_is_dtls of https://github.com/ossrs/srs
- func srsIsDTLS(b []byte) bool {
- return len(b) >= 13 && (b[0] > 19 && b[0] < 64)
- }
- // For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000)
- // @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs
- func srsIsRTPOrRTCP(b []byte) bool {
- return len(b) >= 12 && (b[0]&0xC0) == 0x80
- }
- // For RTCP, PT is [128, 223] (or without marker [0, 95]).
- // Literally, RTCP starts from 64 not 0, so PT is [192, 223] (or without marker [64, 95]).
- // @note For RTP, the PT is [96, 127], or [224, 255] with marker.
- // @see srs_is_rtcp of https://github.com/ossrs/srs
- func srsIsRTCP(b []byte) bool {
- return (len(b) >= 12) && (b[0]&0x80) != 0 && (b[1] >= 192 && b[1] <= 223)
- }
- type chunkType int
- const (
- chunkTypeICE chunkType = iota + 1
- chunkTypeDTLS
- chunkTypeRTP
- chunkTypeRTCP
- )
- func (v chunkType) String() string {
- switch v {
- case chunkTypeICE:
- return "ICE"
- case chunkTypeDTLS:
- return "DTLS"
- case chunkTypeRTP:
- return "RTP"
- case chunkTypeRTCP:
- return "RTCP"
- default:
- return "Unknown"
- }
- }
- type dtlsContentType int
- const (
- dtlsContentTypeHandshake dtlsContentType = 22
- dtlsContentTypeChangeCipherSpec dtlsContentType = 20
- dtlsContentTypeAlert dtlsContentType = 21
- )
- func (v dtlsContentType) String() string {
- switch v {
- case dtlsContentTypeHandshake:
- return "Handshake"
- case dtlsContentTypeChangeCipherSpec:
- return "ChangeCipherSpec"
- default:
- return "Unknown"
- }
- }
- type dtlsHandshakeType int
- const (
- dtlsHandshakeTypeClientHello dtlsHandshakeType = 1
- dtlsHandshakeTypeServerHello dtlsHandshakeType = 2
- dtlsHandshakeTypeCertificate dtlsHandshakeType = 11
- dtlsHandshakeTypeServerKeyExchange dtlsHandshakeType = 12
- dtlsHandshakeTypeCertificateRequest dtlsHandshakeType = 13
- dtlsHandshakeTypeServerDone dtlsHandshakeType = 14
- dtlsHandshakeTypeCertificateVerify dtlsHandshakeType = 15
- dtlsHandshakeTypeClientKeyExchange dtlsHandshakeType = 16
- dtlsHandshakeTypeFinished dtlsHandshakeType = 20
- )
- func (v dtlsHandshakeType) String() string {
- switch v {
- case dtlsHandshakeTypeClientHello:
- return "ClientHello"
- case dtlsHandshakeTypeServerHello:
- return "ServerHello"
- case dtlsHandshakeTypeCertificate:
- return "Certificate"
- case dtlsHandshakeTypeServerKeyExchange:
- return "ServerKeyExchange"
- case dtlsHandshakeTypeCertificateRequest:
- return "CertificateRequest"
- case dtlsHandshakeTypeServerDone:
- return "ServerDone"
- case dtlsHandshakeTypeCertificateVerify:
- return "CertificateVerify"
- case dtlsHandshakeTypeClientKeyExchange:
- return "ClientKeyExchange"
- case dtlsHandshakeTypeFinished:
- return "Finished"
- default:
- return "Unknown"
- }
- }
- type chunkMessageType struct {
- chunk chunkType
- content dtlsContentType
- handshake dtlsHandshakeType
- }
- func (v *chunkMessageType) String() string {
- if v.chunk == chunkTypeDTLS {
- if v.content == dtlsContentTypeHandshake {
- return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake)
- } else {
- return fmt.Sprintf("%v-%v", v.chunk, v.content)
- }
- }
- return fmt.Sprintf("%v", v.chunk)
- }
- func newChunkMessageType(c vnet.Chunk) (*chunkMessageType, bool) {
- b := c.UserData()
- if len(b) == 0 {
- return nil, false
- }
- v := &chunkMessageType{}
- if srsIsRTPOrRTCP(b) {
- if srsIsRTCP(b) {
- v.chunk = chunkTypeRTCP
- } else {
- v.chunk = chunkTypeRTP
- }
- return v, true
- }
- if srsIsStun(b) {
- v.chunk = chunkTypeICE
- return v, true
- }
- if !srsIsDTLS(b) {
- return nil, false
- }
- v.chunk, v.content = chunkTypeDTLS, dtlsContentType(b[0])
- if v.content != dtlsContentTypeHandshake {
- return v, true
- }
- if len(b) < 14 {
- return v, false
- }
- v.handshake = dtlsHandshakeType(b[13])
- return v, true
- }
- func (v *chunkMessageType) IsHandshake() bool {
- return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake
- }
- func (v *chunkMessageType) IsClientHello() bool {
- return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeClientHello
- }
- func (v *chunkMessageType) IsServerHello() bool {
- return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeServerHello
- }
- func (v *chunkMessageType) IsCertificate() bool {
- return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeCertificate
- }
- func (v *chunkMessageType) IsChangeCipherSpec() bool {
- return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeChangeCipherSpec
- }
- type dtlsRecord struct {
- ContentType dtlsContentType
- Version uint16
- Epoch uint16
- SequenceNumber uint64
- Length uint16
- Data []byte
- }
- func newDTLSRecord(b []byte) (*dtlsRecord, error) {
- v := &dtlsRecord{}
- return v, v.Unmarshal(b)
- }
- func (v *dtlsRecord) String() string {
- return fmt.Sprintf("epoch=%v, sequence=%v", v.Epoch, v.SequenceNumber)
- }
- func (v *dtlsRecord) Equals(p *dtlsRecord) bool {
- return v.Epoch == p.Epoch && v.SequenceNumber == p.SequenceNumber
- }
- func (v *dtlsRecord) Unmarshal(b []byte) error {
- if len(b) < 13 {
- return errors.Errorf("requires 13B only %v", len(b))
- }
- v.ContentType = dtlsContentType(b[0])
- v.Version = uint16(b[1])<<8 | uint16(b[2])
- v.Epoch = uint16(b[3])<<8 | uint16(b[4])
- v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10])
- v.Length = uint16(b[11])<<8 | uint16(b[12])
- v.Data = b[13:]
- return nil
- }
- type testWebRTCAPIOptionFunc func(api *testWebRTCAPI)
- type testWebRTCAPI struct {
- // The options to setup the api.
- options []testWebRTCAPIOptionFunc
- // The api and settings.
- api *webrtc.API
- mediaEngine *webrtc.MediaEngine
- registry *interceptor.Registry
- settingEngine *webrtc.SettingEngine
- // The vnet router, can be shared by different apis, but we do not share it.
- router *vnet.Router
- // The network for api.
- network *vnet.Net
- // The vnet UDP proxy bind to the router.
- proxy *vnet_proxy.UDPProxy
- }
- func newTestWebRTCAPI(options ...testWebRTCAPIOptionFunc) (*testWebRTCAPI, error) {
- v := &testWebRTCAPI{}
- v.mediaEngine = &webrtc.MediaEngine{}
- if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil {
- return nil, err
- }
- v.registry = &interceptor.Registry{}
- if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil {
- return nil, err
- }
- for _, setup := range options {
- setup(v)
- }
- v.settingEngine = &webrtc.SettingEngine{}
- return v, nil
- }
- func (v *testWebRTCAPI) Close() error {
- if v.proxy != nil {
- _ = v.proxy.Close()
- }
- if v.router != nil {
- _ = v.router.Stop()
- }
- return nil
- }
- func (v *testWebRTCAPI) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
- // Setting engine for https://github.com/pion/transport/tree/master/vnet
- setupVnet := func(vnetClientIP string) (err error) {
- // We create a private router for a api, however, it's possible to share the
- // same router between apis.
- if v.router, err = vnet.NewRouter(&vnet.RouterConfig{
- CIDR: "0.0.0.0/0", // Accept all ip, no sub router.
- LoggerFactory: logging.NewDefaultLoggerFactory(),
- }); err != nil {
- return errors.Wrapf(err, "create router for api")
- }
- // Each api should bind to a network, however, it's possible to share it
- // for different apis.
- v.network, err = vnet.NewNet(&vnet.NetConfig{
- StaticIP: vnetClientIP,
- })
- if err != nil {
- return errors.Wrapf(err, "create network for api")
- }
- if err = v.router.AddNet(v.network); err != nil {
- return errors.Wrapf(err, "create network for api")
- }
- v.settingEngine.SetVNet(v.network)
- // Create a proxy bind to the router.
- if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil {
- return errors.Wrapf(err, "create proxy for router")
- }
- return v.router.Start()
- }
- if err := setupVnet(vnetClientIP); err != nil {
- return err
- }
- for _, setup := range options {
- setup(v)
- }
- for _, setup := range v.options {
- setup(v)
- }
- v.api = webrtc.NewAPI(
- webrtc.WithMediaEngine(v.mediaEngine),
- webrtc.WithInterceptorRegistry(v.registry),
- webrtc.WithSettingEngine(*v.settingEngine),
- )
- return nil
- }
- func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
- return v.api.NewPeerConnection(configuration)
- }
- type testPlayerOptionFunc func(p *testPlayer) error
- type testPlayer struct {
- pc *webrtc.PeerConnection
- receivers []*webrtc.RTPReceiver
- // We should dispose it.
- api *testWebRTCAPI
- // Optional suffix for stream url.
- streamSuffix string
- }
- func createApiForPlayer(play *testPlayer) error {
- api, err := newTestWebRTCAPI()
- if err != nil {
- return err
- }
- play.api = api
- return nil
- }
- func newTestPlayer(options ...testPlayerOptionFunc) (*testPlayer, error) {
- v := &testPlayer{}
- for _, opt := range options {
- if err := opt(v); err != nil {
- return nil, err
- }
- }
- return v, nil
- }
- func (v *testPlayer) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
- return v.api.Setup(vnetClientIP, options...)
- }
- func (v *testPlayer) Close() error {
- if v.pc != nil {
- _ = v.pc.Close()
- }
- for _, receiver := range v.receivers {
- _ = receiver.Stop()
- }
- if v.api != nil {
- _ = v.api.Close()
- }
- return nil
- }
- func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
- r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
- if v.streamSuffix != "" {
- r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
- }
- pli := time.Duration(*srsPlayPLI) * time.Millisecond
- logger.Tf(ctx, "Run play url=%v", r)
- pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
- if err != nil {
- return errors.Wrapf(err, "Create PC")
- }
- v.pc = pc
- if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
- Direction: webrtc.RTPTransceiverDirectionRecvonly,
- }); err != nil {
- return errors.Wrapf(err, "add track")
- }
- if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
- Direction: webrtc.RTPTransceiverDirectionRecvonly,
- }); err != nil {
- return errors.Wrapf(err, "add track")
- }
- offer, err := pc.CreateOffer(nil)
- if err != nil {
- return errors.Wrapf(err, "Create Offer")
- }
- if err := pc.SetLocalDescription(offer); err != nil {
- return errors.Wrapf(err, "Set offer %v", offer)
- }
- answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
- if err != nil {
- return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
- }
- // Run a proxy for real server and vnet.
- if address, err := parseAddressOfCandidate(answer); err != nil {
- return errors.Wrapf(err, "parse address of %v", answer)
- } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
- return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
- }
- if err := pc.SetRemoteDescription(webrtc.SessionDescription{
- Type: webrtc.SDPTypeAnswer, SDP: answer,
- }); err != nil {
- return errors.Wrapf(err, "Set answer %v", answer)
- }
- handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
- // Send a PLI on an interval so that the publisher is pushing a keyframe
- go func() {
- if track.Kind() == webrtc.RTPCodecTypeAudio {
- return
- }
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(pli):
- _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
- MediaSSRC: uint32(track.SSRC()),
- }})
- }
- }
- }()
- v.receivers = append(v.receivers, receiver)
- for ctx.Err() == nil {
- _, _, err := track.ReadRTP()
- if err != nil {
- return errors.Wrapf(err, "Read RTP")
- }
- }
- return nil
- }
- pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
- err = handleTrack(ctx, track, receiver)
- if err != nil {
- codec := track.Codec()
- err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
- cancel()
- }
- })
- pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
- if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
- err = errors.Errorf("Close for ICE state %v", state)
- cancel()
- }
- })
- <-ctx.Done()
- return err
- }
- type testPublisherOptionFunc func(p *testPublisher) error
- type testPublisher struct {
- onOffer func(s *webrtc.SessionDescription) error
- onAnswer func(s *webrtc.SessionDescription) error
- iceReadyCancel context.CancelFunc
- // internal objects
- aIngester *audioIngester
- vIngester *videoIngester
- pc *webrtc.PeerConnection
- // We should dispose it.
- api *testWebRTCAPI
- // Optional suffix for stream url.
- streamSuffix string
- // To cancel the publisher, pass by Run.
- cancel context.CancelFunc
- }
- func createApiForPublisher(pub *testPublisher) error {
- api, err := newTestWebRTCAPI()
- if err != nil {
- return err
- }
- pub.api = api
- return nil
- }
- func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error) {
- sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
- v := &testPublisher{}
- for _, opt := range options {
- if err := opt(v); err != nil {
- return nil, err
- }
- }
- // Create ingesters.
- if sourceAudio != "" {
- v.aIngester = newAudioIngester(sourceAudio)
- }
- if sourceVideo != "" {
- v.vIngester = newVideoIngester(sourceVideo)
- }
- // Setup the interceptors for packets.
- api := v.api
- api.options = append(api.options, func(api *testWebRTCAPI) {
- // Filter for RTCP packets.
- rtcpInterceptor := &rtcpInterceptor{}
- rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
- return rtcpInterceptor.nextRTCPReader.Read(buf, attributes)
- }
- rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
- return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes)
- }
- api.registry.Add(&rtcpInteceptorFactory{rtcpInterceptor})
- // Filter for ingesters.
- if sourceAudio != "" {
- api.registry.Add(&rtpInteceptorFactory{v.aIngester.audioLevelInterceptor})
- }
- if sourceVideo != "" {
- api.registry.Add(&rtpInteceptorFactory{v.vIngester.markerInterceptor})
- }
- })
- return v, nil
- }
- func (v *testPublisher) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
- return v.api.Setup(vnetClientIP, options...)
- }
- func (v *testPublisher) Close() error {
- if v.vIngester != nil {
- _ = v.vIngester.Close()
- }
- if v.aIngester != nil {
- _ = v.aIngester.Close()
- }
- if v.pc != nil {
- _ = v.pc.Close()
- }
- if v.api != nil {
- _ = v.api.Close()
- }
- return nil
- }
- func (v *testPublisher) SetStreamSuffix(suffix string) *testPublisher {
- v.streamSuffix = suffix
- return v
- }
- func (v *testPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
- // Save the cancel.
- v.cancel = cancel
- r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
- if v.streamSuffix != "" {
- r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
- }
- sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps
- logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v",
- r, sourceAudio, sourceVideo, fps)
- pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
- if err != nil {
- return errors.Wrapf(err, "Create PC")
- }
- v.pc = pc
- if v.vIngester != nil {
- if err := v.vIngester.AddTrack(pc, fps); err != nil {
- return errors.Wrapf(err, "Add track")
- }
- }
- if v.aIngester != nil {
- if err := v.aIngester.AddTrack(pc); err != nil {
- return errors.Wrapf(err, "Add track")
- }
- }
- offer, err := pc.CreateOffer(nil)
- if err != nil {
- return errors.Wrapf(err, "Create Offer")
- }
- if err := pc.SetLocalDescription(offer); err != nil {
- return errors.Wrapf(err, "Set offer %v", offer)
- }
- if v.onOffer != nil {
- if err := v.onOffer(&offer); err != nil {
- return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP)
- }
- }
- answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP)
- if err != nil {
- return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
- }
- // Run a proxy for real server and vnet.
- if address, err := parseAddressOfCandidate(answerSDP); err != nil {
- return errors.Wrapf(err, "parse address of %v", answerSDP)
- } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
- return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
- }
- answer := &webrtc.SessionDescription{
- Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
- }
- if v.onAnswer != nil {
- if err := v.onAnswer(answer); err != nil {
- return errors.Wrapf(err, "on answerSDP")
- }
- }
- if err := pc.SetRemoteDescription(*answer); err != nil {
- return errors.Wrapf(err, "Set answerSDP %v", answerSDP)
- }
- logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
- // ICE state management.
- pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
- logger.Tf(ctx, "ICE gather state %v", state)
- })
- pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
- logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port)
- })
- pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
- logger.Tf(ctx, "ICE state %v", state)
- })
- pc.OnSignalingStateChange(func(state webrtc.SignalingState) {
- logger.Tf(ctx, "Signaling state %v", state)
- })
- if v.aIngester != nil {
- v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
- logger.Tf(ctx, "DTLS state %v", state)
- })
- }
- pcDone, pcDoneCancel := context.WithCancel(context.Background())
- pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
- logger.Tf(ctx, "PC state %v", state)
- if state == webrtc.PeerConnectionStateConnected {
- pcDoneCancel()
- if v.iceReadyCancel != nil {
- v.iceReadyCancel()
- }
- }
- if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
- err = errors.Errorf("Close for PC state %v", state)
- cancel()
- }
- })
- // Wait for event from context or tracks.
- var wg sync.WaitGroup
- var finalErr error
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer logger.Tf(ctx, "ingest notify done")
- <-ctx.Done()
- if v.aIngester != nil && v.aIngester.sAudioSender != nil {
- // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed.
- <-v.aIngester.ready.Done()
- _ = v.aIngester.Close()
- }
- if v.vIngester != nil && v.vIngester.sVideoSender != nil {
- // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed.
- <-v.vIngester.ready.Done()
- _ = v.vIngester.Close()
- }
- }()
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer cancel()
- if v.aIngester == nil {
- return
- }
- defer v.aIngester.readyCancel()
- select {
- case <-ctx.Done():
- return
- case <-pcDone.Done():
- }
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer logger.Tf(ctx, "aingester sender read done")
- buf := make([]byte, 1500)
- for ctx.Err() == nil {
- if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil {
- return
- }
- }
- }()
- for {
- if err := v.aIngester.Ingest(ctx); err != nil {
- if err == io.EOF {
- logger.Tf(ctx, "aingester retry for %v", err)
- continue
- }
- if err != context.Canceled {
- finalErr = errors.Wrapf(err, "audio")
- }
- logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr)
- return
- }
- }
- }()
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer cancel()
- if v.vIngester == nil {
- return
- }
- defer v.vIngester.readyCancel()
- select {
- case <-ctx.Done():
- return
- case <-pcDone.Done():
- logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
- }
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer logger.Tf(ctx, "vingester sender read done")
- buf := make([]byte, 1500)
- for ctx.Err() == nil {
- // The Read() might block in r.rtcpInterceptor.Read(b, a),
- // so that the Stop() can not stop it.
- if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil {
- return
- }
- }
- }()
- for {
- if err := v.vIngester.Ingest(ctx); err != nil {
- if err == io.EOF {
- logger.Tf(ctx, "vingester retry for %v", err)
- continue
- }
- if err != context.Canceled {
- finalErr = errors.Wrapf(err, "video")
- }
- logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr)
- return
- }
- }
- }()
- wg.Wait()
- logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr)
- if finalErr != nil {
- return finalErr
- }
- return ctx.Err()
- }
|