util.go 30 KB


  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2021 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package janus
  22. import (
  23. "bytes"
  24. "context"
  25. "encoding/json"
  26. "flag"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "net/http"
  32. "net/url"
  33. "os"
  34. "path"
  35. "strconv"
  36. "strings"
  37. "sync"
  38. "time"
  39. "github.com/ossrs/go-oryx-lib/errors"
  40. "github.com/ossrs/go-oryx-lib/logger"
  41. vnet_proxy "github.com/ossrs/srs-bench/vnet"
  42. "github.com/pion/interceptor"
  43. "github.com/pion/logging"
  44. "github.com/pion/rtcp"
  45. "github.com/pion/transport/v2/vnet"
  46. "github.com/pion/webrtc/v3"
  47. "github.com/pion/webrtc/v3/pkg/media/h264reader"
  48. )
  49. var srsHttps *bool
  50. var srsLog *bool
  51. var srsTimeout *int
  52. var srsPlayPLI *int
  53. var srsPlayOKPackets *int
  54. var srsPublishOKPackets *int
  55. var srsPublishVideoFps *int
  56. var srsDTLSDropPackets *int
  57. var srsSchema string
  58. var srsServer *string
  59. var srsStream *string
  60. var srsPublishAudio *string
  61. var srsPublishVideo *string
  62. var srsVnetClientIP *string
  63. func prepareTest() error {
  64. var err error
  65. srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API")
  66. srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to")
  67. srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play")
  68. srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
  69. srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms")
  70. srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.")
  71. srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If recv N RTP packets, it's ok, or fail")
  72. srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 3, "If send N RTP, recv N RTCP packets, it's ok, or fail")
  73. srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
  74. srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
  75. srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
  76. srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.")
  77. srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail")
  78. // Should parse it first.
  79. flag.Parse()
  80. // The stream should starts with /, for example, /rtc/regression
  81. if !strings.HasPrefix(*srsStream, "/") {
  82. *srsStream = "/" + *srsStream
  83. }
  84. // Generate srs protocol from whether use HTTPS.
  85. srsSchema = "http"
  86. if *srsHttps {
  87. srsSchema = "https"
  88. }
  89. // Check file.
  90. tryOpenFile := func(filename string) (string, error) {
  91. if filename == "" {
  92. return filename, nil
  93. }
  94. f, err := os.Open(filename)
  95. if err != nil {
  96. nfilename := path.Join("../", filename)
  97. f2, err := os.Open(nfilename)
  98. if err != nil {
  99. return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename)
  100. }
  101. defer f2.Close()
  102. return nfilename, nil
  103. }
  104. defer f.Close()
  105. return filename, nil
  106. }
  107. if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil {
  108. return err
  109. }
  110. if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
  111. return err
  112. }
  113. return nil
  114. }
  115. func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) {
  116. u, err := url.Parse(r)
  117. if err != nil {
  118. return "", errors.Wrapf(err, "Parse url %v", r)
  119. }
  120. // Build api url.
  121. host := u.Host
  122. if !strings.Contains(host, ":") {
  123. host += ":1985"
  124. }
  125. api := fmt.Sprintf("http://%v", host)
  126. if !strings.HasPrefix(apiPath, "/") {
  127. api += "/"
  128. }
  129. api += apiPath
  130. if !strings.HasSuffix(apiPath, "/") {
  131. api += "/"
  132. }
  133. if u.RawQuery != "" {
  134. api += "?" + u.RawQuery
  135. }
  136. // Build JSON body.
  137. reqBody := struct {
  138. Api string `json:"api"`
  139. ClientIP string `json:"clientip"`
  140. SDP string `json:"sdp"`
  141. StreamURL string `json:"streamurl"`
  142. }{
  143. api, "", offer, r,
  144. }
  145. b, err := json.Marshal(reqBody)
  146. if err != nil {
  147. return "", errors.Wrapf(err, "Marshal body %v", reqBody)
  148. }
  149. logger.If(ctx, "Request url api=%v with %v", api, string(b))
  150. logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b))
  151. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  152. if err != nil {
  153. return "", errors.Wrapf(err, "HTTP request %v", string(b))
  154. }
  155. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  156. if err != nil {
  157. return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
  158. }
  159. b2, err := ioutil.ReadAll(res.Body)
  160. if err != nil {
  161. return "", errors.Wrapf(err, "Read response for %v", string(b))
  162. }
  163. logger.If(ctx, "Response from %v is %v", api, string(b2))
  164. logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2))
  165. resBody := struct {
  166. Code int `json:"code"`
  167. Session string `json:"sessionid"`
  168. SDP string `json:"sdp"`
  169. }{}
  170. if err := json.Unmarshal(b2, &resBody); err != nil {
  171. return "", errors.Wrapf(err, "Marshal %v", string(b2))
  172. }
  173. if resBody.Code != 0 {
  174. return "", errors.Errorf("Server fail code=%v %v", resBody.Code, string(b2))
  175. }
  176. logger.If(ctx, "Parse response to code=%v, session=%v, sdp=%v",
  177. resBody.Code, resBody.Session, escapeSDP(resBody.SDP))
  178. logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes",
  179. resBody.Code, resBody.Session, len(resBody.SDP))
  180. return resBody.SDP, nil
  181. }
  182. func escapeSDP(sdp string) string {
  183. return strings.ReplaceAll(strings.ReplaceAll(sdp, "\r", "\\r"), "\n", "\\n")
  184. }
  185. func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL {
  186. first := frames[0]
  187. buf := bytes.Buffer{}
  188. buf.WriteByte(
  189. first.RefIdc<<5&0x60 | byte(24), // STAP-A
  190. )
  191. for _, frame := range frames {
  192. buf.WriteByte(byte(len(frame.Data) >> 8))
  193. buf.WriteByte(byte(len(frame.Data)))
  194. buf.Write(frame.Data)
  195. }
  196. return &h264reader.NAL{
  197. PictureOrderCount: first.PictureOrderCount,
  198. ForbiddenZeroBit: false,
  199. RefIdc: first.RefIdc,
  200. UnitType: h264reader.NalUnitType(24), // STAP-A
  201. Data: buf.Bytes(),
  202. }
  203. }
  204. type wallClock struct {
  205. start time.Time
  206. duration time.Duration
  207. }
  208. func newWallClock() *wallClock {
  209. return &wallClock{start: time.Now()}
  210. }
  211. func (v *wallClock) Tick(d time.Duration) time.Duration {
  212. v.duration += d
  213. wc := time.Now().Sub(v.start)
  214. re := v.duration - wc
  215. if re > 30*time.Millisecond {
  216. return re
  217. }
  218. return 0
  219. }
  220. // Set to active, as DTLS client, to start ClientHello.
  221. func testUtilSetupActive(s *webrtc.SessionDescription) error {
  222. if strings.Contains(s.SDP, "setup:passive") {
  223. return errors.New("set to active")
  224. }
  225. s.SDP = strings.ReplaceAll(s.SDP, "setup:actpass", "setup:active")
  226. return nil
  227. }
  228. // Set to passive, as DTLS client, to start ClientHello.
  229. func testUtilSetupPassive(s *webrtc.SessionDescription) error {
  230. if strings.Contains(s.SDP, "setup:active") {
  231. return errors.New("set to passive")
  232. }
  233. s.SDP = strings.ReplaceAll(s.SDP, "setup:actpass", "setup:passive")
  234. return nil
  235. }
  236. // Parse address from SDP.
  237. // candidate:0 1 udp 2130706431 192.168.3.8 8000 typ host generation 0
  238. func parseAddressOfCandidate(answerSDP string) (*net.UDPAddr, error) {
  239. answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: answerSDP}
  240. answerObject, err := answer.Unmarshal()
  241. if err != nil {
  242. return nil, errors.Wrapf(err, "unmarshal answer %v", answerSDP)
  243. }
  244. if len(answerObject.MediaDescriptions) == 0 {
  245. return nil, errors.New("no media")
  246. }
  247. candidate, ok := answerObject.MediaDescriptions[0].Attribute("candidate")
  248. if !ok {
  249. return nil, errors.New("no candidate")
  250. }
  251. // candidate:0 1 udp 2130706431 192.168.3.8 8000 typ host generation 0
  252. attrs := strings.Split(candidate, " ")
  253. if len(attrs) <= 6 {
  254. return nil, errors.Errorf("no address in %v", candidate)
  255. }
  256. // Parse ip and port from answer.
  257. ip := attrs[4]
  258. port, err := strconv.Atoi(attrs[5])
  259. if err != nil {
  260. return nil, errors.Wrapf(err, "invalid port %v", candidate)
  261. }
  262. address := fmt.Sprintf("%v:%v", ip, port)
  263. addr, err := net.ResolveUDPAddr("udp4", address)
  264. if err != nil {
  265. return nil, errors.Wrapf(err, "parse %v", address)
  266. }
  267. return addr, nil
  268. }
  269. // Filter the test error, ignore context.Canceled
  270. func filterTestError(errs ...error) error {
  271. var filteredErrors []error
  272. for _, err := range errs {
  273. if err == nil || errors.Cause(err) == context.Canceled {
  274. continue
  275. }
  276. // If url error, server maybe error, do not print the detail log.
  277. if r0 := errors.Cause(err); r0 != nil {
  278. if r1, ok := r0.(*url.Error); ok {
  279. err = r1
  280. }
  281. }
  282. filteredErrors = append(filteredErrors, err)
  283. }
  284. if len(filteredErrors) == 0 {
  285. return nil
  286. }
  287. if len(filteredErrors) == 1 {
  288. return filteredErrors[0]
  289. }
  290. var descs []string
  291. for i, err := range filteredErrors[1:] {
  292. descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
  293. }
  294. return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
  295. }
  296. // For STUN packet, 0x00 is binding request, 0x01 is binding success response.
  297. // @see srs_is_stun of https://github.com/ossrs/srs
  298. func srsIsStun(b []byte) bool {
  299. return len(b) > 0 && (b[0] == 0 || b[0] == 1)
  300. }
  301. // change_cipher_spec(20), alert(21), handshake(22), application_data(23)
  302. // @see https://tools.ietf.org/html/rfc2246#section-6.2.1
  303. // @see srs_is_dtls of https://github.com/ossrs/srs
  304. func srsIsDTLS(b []byte) bool {
  305. return len(b) >= 13 && (b[0] > 19 && b[0] < 64)
  306. }
  307. // For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000)
  308. // @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs
  309. func srsIsRTPOrRTCP(b []byte) bool {
  310. return len(b) >= 12 && (b[0]&0xC0) == 0x80
  311. }
  312. // For RTCP, PT is [128, 223] (or without marker [0, 95]).
  313. // Literally, RTCP starts from 64 not 0, so PT is [192, 223] (or without marker [64, 95]).
  314. // @note For RTP, the PT is [96, 127], or [224, 255] with marker.
  315. // @see srs_is_rtcp of https://github.com/ossrs/srs
  316. func srsIsRTCP(b []byte) bool {
  317. return (len(b) >= 12) && (b[0]&0x80) != 0 && (b[1] >= 192 && b[1] <= 223)
  318. }
  319. type chunkType int
  320. const (
  321. chunkTypeICE chunkType = iota + 1
  322. chunkTypeDTLS
  323. chunkTypeRTP
  324. chunkTypeRTCP
  325. )
  326. func (v chunkType) String() string {
  327. switch v {
  328. case chunkTypeICE:
  329. return "ICE"
  330. case chunkTypeDTLS:
  331. return "DTLS"
  332. case chunkTypeRTP:
  333. return "RTP"
  334. case chunkTypeRTCP:
  335. return "RTCP"
  336. default:
  337. return "Unknown"
  338. }
  339. }
  340. type dtlsContentType int
  341. const (
  342. dtlsContentTypeHandshake dtlsContentType = 22
  343. dtlsContentTypeChangeCipherSpec dtlsContentType = 20
  344. dtlsContentTypeAlert dtlsContentType = 21
  345. )
  346. func (v dtlsContentType) String() string {
  347. switch v {
  348. case dtlsContentTypeHandshake:
  349. return "Handshake"
  350. case dtlsContentTypeChangeCipherSpec:
  351. return "ChangeCipherSpec"
  352. default:
  353. return "Unknown"
  354. }
  355. }
  356. type dtlsHandshakeType int
  357. const (
  358. dtlsHandshakeTypeClientHello dtlsHandshakeType = 1
  359. dtlsHandshakeTypeServerHello dtlsHandshakeType = 2
  360. dtlsHandshakeTypeCertificate dtlsHandshakeType = 11
  361. dtlsHandshakeTypeServerKeyExchange dtlsHandshakeType = 12
  362. dtlsHandshakeTypeCertificateRequest dtlsHandshakeType = 13
  363. dtlsHandshakeTypeServerDone dtlsHandshakeType = 14
  364. dtlsHandshakeTypeCertificateVerify dtlsHandshakeType = 15
  365. dtlsHandshakeTypeClientKeyExchange dtlsHandshakeType = 16
  366. dtlsHandshakeTypeFinished dtlsHandshakeType = 20
  367. )
  368. func (v dtlsHandshakeType) String() string {
  369. switch v {
  370. case dtlsHandshakeTypeClientHello:
  371. return "ClientHello"
  372. case dtlsHandshakeTypeServerHello:
  373. return "ServerHello"
  374. case dtlsHandshakeTypeCertificate:
  375. return "Certificate"
  376. case dtlsHandshakeTypeServerKeyExchange:
  377. return "ServerKeyExchange"
  378. case dtlsHandshakeTypeCertificateRequest:
  379. return "CertificateRequest"
  380. case dtlsHandshakeTypeServerDone:
  381. return "ServerDone"
  382. case dtlsHandshakeTypeCertificateVerify:
  383. return "CertificateVerify"
  384. case dtlsHandshakeTypeClientKeyExchange:
  385. return "ClientKeyExchange"
  386. case dtlsHandshakeTypeFinished:
  387. return "Finished"
  388. default:
  389. return "Unknown"
  390. }
  391. }
  392. type chunkMessageType struct {
  393. chunk chunkType
  394. content dtlsContentType
  395. handshake dtlsHandshakeType
  396. }
  397. func (v *chunkMessageType) String() string {
  398. if v.chunk == chunkTypeDTLS {
  399. if v.content == dtlsContentTypeHandshake {
  400. return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake)
  401. } else {
  402. return fmt.Sprintf("%v-%v", v.chunk, v.content)
  403. }
  404. }
  405. return fmt.Sprintf("%v", v.chunk)
  406. }
  407. func newChunkMessageType(c vnet.Chunk) (*chunkMessageType, bool) {
  408. b := c.UserData()
  409. if len(b) == 0 {
  410. return nil, false
  411. }
  412. v := &chunkMessageType{}
  413. if srsIsRTPOrRTCP(b) {
  414. if srsIsRTCP(b) {
  415. v.chunk = chunkTypeRTCP
  416. } else {
  417. v.chunk = chunkTypeRTP
  418. }
  419. return v, true
  420. }
  421. if srsIsStun(b) {
  422. v.chunk = chunkTypeICE
  423. return v, true
  424. }
  425. if !srsIsDTLS(b) {
  426. return nil, false
  427. }
  428. v.chunk, v.content = chunkTypeDTLS, dtlsContentType(b[0])
  429. if v.content != dtlsContentTypeHandshake {
  430. return v, true
  431. }
  432. if len(b) < 14 {
  433. return v, false
  434. }
  435. v.handshake = dtlsHandshakeType(b[13])
  436. return v, true
  437. }
  438. func (v *chunkMessageType) IsHandshake() bool {
  439. return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake
  440. }
  441. func (v *chunkMessageType) IsClientHello() bool {
  442. return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeClientHello
  443. }
  444. func (v *chunkMessageType) IsServerHello() bool {
  445. return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeServerHello
  446. }
  447. func (v *chunkMessageType) IsCertificate() bool {
  448. return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeCertificate
  449. }
  450. func (v *chunkMessageType) IsChangeCipherSpec() bool {
  451. return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeChangeCipherSpec
  452. }
  453. type dtlsRecord struct {
  454. ContentType dtlsContentType
  455. Version uint16
  456. Epoch uint16
  457. SequenceNumber uint64
  458. Length uint16
  459. Data []byte
  460. }
  461. func newDTLSRecord(b []byte) (*dtlsRecord, error) {
  462. v := &dtlsRecord{}
  463. return v, v.Unmarshal(b)
  464. }
  465. func (v *dtlsRecord) String() string {
  466. return fmt.Sprintf("epoch=%v, sequence=%v", v.Epoch, v.SequenceNumber)
  467. }
  468. func (v *dtlsRecord) Equals(p *dtlsRecord) bool {
  469. return v.Epoch == p.Epoch && v.SequenceNumber == p.SequenceNumber
  470. }
  471. func (v *dtlsRecord) Unmarshal(b []byte) error {
  472. if len(b) < 13 {
  473. return errors.Errorf("requires 13B only %v", len(b))
  474. }
  475. v.ContentType = dtlsContentType(b[0])
  476. v.Version = uint16(b[1])<<8 | uint16(b[2])
  477. v.Epoch = uint16(b[3])<<8 | uint16(b[4])
  478. v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10])
  479. v.Length = uint16(b[11])<<8 | uint16(b[12])
  480. v.Data = b[13:]
  481. return nil
  482. }
  483. type testWebRTCAPIOptionFunc func(api *testWebRTCAPI)
  484. type testWebRTCAPI struct {
  485. // The options to setup the api.
  486. options []testWebRTCAPIOptionFunc
  487. // The api and settings.
  488. api *webrtc.API
  489. mediaEngine *webrtc.MediaEngine
  490. registry *interceptor.Registry
  491. settingEngine *webrtc.SettingEngine
  492. // The vnet router, can be shared by different apis, but we do not share it.
  493. router *vnet.Router
  494. // The network for api.
  495. network *vnet.Net
  496. // The vnet UDP proxy bind to the router.
  497. proxy *vnet_proxy.UDPProxy
  498. }
  499. func newTestWebRTCAPI(options ...testWebRTCAPIOptionFunc) (*testWebRTCAPI, error) {
  500. v := &testWebRTCAPI{}
  501. v.mediaEngine = &webrtc.MediaEngine{}
  502. if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil {
  503. return nil, err
  504. }
  505. v.registry = &interceptor.Registry{}
  506. if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil {
  507. return nil, err
  508. }
  509. for _, setup := range options {
  510. setup(v)
  511. }
  512. v.settingEngine = &webrtc.SettingEngine{}
  513. return v, nil
  514. }
  515. func (v *testWebRTCAPI) Close() error {
  516. if v.proxy != nil {
  517. _ = v.proxy.Close()
  518. }
  519. if v.router != nil {
  520. _ = v.router.Stop()
  521. }
  522. return nil
  523. }
  524. func (v *testWebRTCAPI) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
  525. // Setting engine for https://github.com/pion/transport/tree/master/vnet
  526. setupVnet := func(vnetClientIP string) (err error) {
  527. // We create a private router for a api, however, it's possible to share the
  528. // same router between apis.
  529. if v.router, err = vnet.NewRouter(&vnet.RouterConfig{
  530. CIDR: "0.0.0.0/0", // Accept all ip, no sub router.
  531. LoggerFactory: logging.NewDefaultLoggerFactory(),
  532. }); err != nil {
  533. return errors.Wrapf(err, "create router for api")
  534. }
  535. // Each api should bind to a network, however, it's possible to share it
  536. // for different apis.
  537. v.network, err = vnet.NewNet(&vnet.NetConfig{
  538. StaticIP: vnetClientIP,
  539. })
  540. if err != nil {
  541. return errors.Wrapf(err, "create network for api")
  542. }
  543. if err = v.router.AddNet(v.network); err != nil {
  544. return errors.Wrapf(err, "create network for api")
  545. }
  546. v.settingEngine.SetVNet(v.network)
  547. // Create a proxy bind to the router.
  548. if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil {
  549. return errors.Wrapf(err, "create proxy for router")
  550. }
  551. return v.router.Start()
  552. }
  553. if err := setupVnet(vnetClientIP); err != nil {
  554. return err
  555. }
  556. for _, setup := range options {
  557. setup(v)
  558. }
  559. for _, setup := range v.options {
  560. setup(v)
  561. }
  562. v.api = webrtc.NewAPI(
  563. webrtc.WithMediaEngine(v.mediaEngine),
  564. webrtc.WithInterceptorRegistry(v.registry),
  565. webrtc.WithSettingEngine(*v.settingEngine),
  566. )
  567. return nil
  568. }
  569. func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
  570. return v.api.NewPeerConnection(configuration)
  571. }
  572. type testPlayerOptionFunc func(p *testPlayer) error
  573. type testPlayer struct {
  574. pc *webrtc.PeerConnection
  575. receivers []*webrtc.RTPReceiver
  576. // We should dispose it.
  577. api *testWebRTCAPI
  578. // Optional suffix for stream url.
  579. streamSuffix string
  580. }
  581. func createApiForPlayer(play *testPlayer) error {
  582. api, err := newTestWebRTCAPI()
  583. if err != nil {
  584. return err
  585. }
  586. play.api = api
  587. return nil
  588. }
  589. func newTestPlayer(options ...testPlayerOptionFunc) (*testPlayer, error) {
  590. v := &testPlayer{}
  591. for _, opt := range options {
  592. if err := opt(v); err != nil {
  593. return nil, err
  594. }
  595. }
  596. return v, nil
  597. }
  598. func (v *testPlayer) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
  599. return v.api.Setup(vnetClientIP, options...)
  600. }
  601. func (v *testPlayer) Close() error {
  602. if v.pc != nil {
  603. _ = v.pc.Close()
  604. }
  605. for _, receiver := range v.receivers {
  606. _ = receiver.Stop()
  607. }
  608. if v.api != nil {
  609. _ = v.api.Close()
  610. }
  611. return nil
  612. }
  613. func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
  614. r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
  615. if v.streamSuffix != "" {
  616. r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
  617. }
  618. pli := time.Duration(*srsPlayPLI) * time.Millisecond
  619. logger.Tf(ctx, "Run play url=%v", r)
  620. pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
  621. if err != nil {
  622. return errors.Wrapf(err, "Create PC")
  623. }
  624. v.pc = pc
  625. if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
  626. Direction: webrtc.RTPTransceiverDirectionRecvonly,
  627. }); err != nil {
  628. return errors.Wrapf(err, "add track")
  629. }
  630. if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
  631. Direction: webrtc.RTPTransceiverDirectionRecvonly,
  632. }); err != nil {
  633. return errors.Wrapf(err, "add track")
  634. }
  635. offer, err := pc.CreateOffer(nil)
  636. if err != nil {
  637. return errors.Wrapf(err, "Create Offer")
  638. }
  639. if err := pc.SetLocalDescription(offer); err != nil {
  640. return errors.Wrapf(err, "Set offer %v", offer)
  641. }
  642. answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
  643. if err != nil {
  644. return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
  645. }
  646. // Run a proxy for real server and vnet.
  647. if address, err := parseAddressOfCandidate(answer); err != nil {
  648. return errors.Wrapf(err, "parse address of %v", answer)
  649. } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
  650. return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
  651. }
  652. if err := pc.SetRemoteDescription(webrtc.SessionDescription{
  653. Type: webrtc.SDPTypeAnswer, SDP: answer,
  654. }); err != nil {
  655. return errors.Wrapf(err, "Set answer %v", answer)
  656. }
  657. handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
  658. // Send a PLI on an interval so that the publisher is pushing a keyframe
  659. go func() {
  660. if track.Kind() == webrtc.RTPCodecTypeAudio {
  661. return
  662. }
  663. for {
  664. select {
  665. case <-ctx.Done():
  666. return
  667. case <-time.After(pli):
  668. _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
  669. MediaSSRC: uint32(track.SSRC()),
  670. }})
  671. }
  672. }
  673. }()
  674. v.receivers = append(v.receivers, receiver)
  675. for ctx.Err() == nil {
  676. _, _, err := track.ReadRTP()
  677. if err != nil {
  678. return errors.Wrapf(err, "Read RTP")
  679. }
  680. }
  681. return nil
  682. }
  683. pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
  684. err = handleTrack(ctx, track, receiver)
  685. if err != nil {
  686. codec := track.Codec()
  687. err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
  688. cancel()
  689. }
  690. })
  691. pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
  692. if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
  693. err = errors.Errorf("Close for ICE state %v", state)
  694. cancel()
  695. }
  696. })
  697. <-ctx.Done()
  698. return err
  699. }
  700. type testPublisherOptionFunc func(p *testPublisher) error
  701. type testPublisher struct {
  702. onOffer func(s *webrtc.SessionDescription) error
  703. onAnswer func(s *webrtc.SessionDescription) error
  704. iceReadyCancel context.CancelFunc
  705. // internal objects
  706. aIngester *audioIngester
  707. vIngester *videoIngester
  708. pc *webrtc.PeerConnection
  709. // We should dispose it.
  710. api *testWebRTCAPI
  711. // Optional suffix for stream url.
  712. streamSuffix string
  713. // To cancel the publisher, pass by Run.
  714. cancel context.CancelFunc
  715. }
  716. func createApiForPublisher(pub *testPublisher) error {
  717. api, err := newTestWebRTCAPI()
  718. if err != nil {
  719. return err
  720. }
  721. pub.api = api
  722. return nil
  723. }
  724. func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error) {
  725. sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
  726. v := &testPublisher{}
  727. for _, opt := range options {
  728. if err := opt(v); err != nil {
  729. return nil, err
  730. }
  731. }
  732. // Create ingesters.
  733. if sourceAudio != "" {
  734. v.aIngester = newAudioIngester(sourceAudio)
  735. }
  736. if sourceVideo != "" {
  737. v.vIngester = newVideoIngester(sourceVideo)
  738. }
  739. // Setup the interceptors for packets.
  740. api := v.api
  741. api.options = append(api.options, func(api *testWebRTCAPI) {
  742. // Filter for RTCP packets.
  743. rtcpInterceptor := &rtcpInterceptor{}
  744. rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
  745. return rtcpInterceptor.nextRTCPReader.Read(buf, attributes)
  746. }
  747. rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
  748. return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes)
  749. }
  750. api.registry.Add(&rtcpInteceptorFactory{rtcpInterceptor})
  751. // Filter for ingesters.
  752. if sourceAudio != "" {
  753. api.registry.Add(&rtpInteceptorFactory{v.aIngester.audioLevelInterceptor})
  754. }
  755. if sourceVideo != "" {
  756. api.registry.Add(&rtpInteceptorFactory{v.vIngester.markerInterceptor})
  757. }
  758. })
  759. return v, nil
  760. }
  761. func (v *testPublisher) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
  762. return v.api.Setup(vnetClientIP, options...)
  763. }
  764. func (v *testPublisher) Close() error {
  765. if v.vIngester != nil {
  766. _ = v.vIngester.Close()
  767. }
  768. if v.aIngester != nil {
  769. _ = v.aIngester.Close()
  770. }
  771. if v.pc != nil {
  772. _ = v.pc.Close()
  773. }
  774. if v.api != nil {
  775. _ = v.api.Close()
  776. }
  777. return nil
  778. }
  779. func (v *testPublisher) SetStreamSuffix(suffix string) *testPublisher {
  780. v.streamSuffix = suffix
  781. return v
  782. }
  783. func (v *testPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
  784. // Save the cancel.
  785. v.cancel = cancel
  786. r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
  787. if v.streamSuffix != "" {
  788. r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
  789. }
  790. sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps
  791. logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v",
  792. r, sourceAudio, sourceVideo, fps)
  793. pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
  794. if err != nil {
  795. return errors.Wrapf(err, "Create PC")
  796. }
  797. v.pc = pc
  798. if v.vIngester != nil {
  799. if err := v.vIngester.AddTrack(pc, fps); err != nil {
  800. return errors.Wrapf(err, "Add track")
  801. }
  802. }
  803. if v.aIngester != nil {
  804. if err := v.aIngester.AddTrack(pc); err != nil {
  805. return errors.Wrapf(err, "Add track")
  806. }
  807. }
  808. offer, err := pc.CreateOffer(nil)
  809. if err != nil {
  810. return errors.Wrapf(err, "Create Offer")
  811. }
  812. if err := pc.SetLocalDescription(offer); err != nil {
  813. return errors.Wrapf(err, "Set offer %v", offer)
  814. }
  815. if v.onOffer != nil {
  816. if err := v.onOffer(&offer); err != nil {
  817. return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP)
  818. }
  819. }
  820. answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP)
  821. if err != nil {
  822. return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
  823. }
  824. // Run a proxy for real server and vnet.
  825. if address, err := parseAddressOfCandidate(answerSDP); err != nil {
  826. return errors.Wrapf(err, "parse address of %v", answerSDP)
  827. } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
  828. return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
  829. }
  830. answer := &webrtc.SessionDescription{
  831. Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
  832. }
  833. if v.onAnswer != nil {
  834. if err := v.onAnswer(answer); err != nil {
  835. return errors.Wrapf(err, "on answerSDP")
  836. }
  837. }
  838. if err := pc.SetRemoteDescription(*answer); err != nil {
  839. return errors.Wrapf(err, "Set answerSDP %v", answerSDP)
  840. }
  841. logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
  842. // ICE state management.
  843. pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
  844. logger.Tf(ctx, "ICE gather state %v", state)
  845. })
  846. pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
  847. logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port)
  848. })
  849. pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
  850. logger.Tf(ctx, "ICE state %v", state)
  851. })
  852. pc.OnSignalingStateChange(func(state webrtc.SignalingState) {
  853. logger.Tf(ctx, "Signaling state %v", state)
  854. })
  855. if v.aIngester != nil {
  856. v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
  857. logger.Tf(ctx, "DTLS state %v", state)
  858. })
  859. }
  860. pcDone, pcDoneCancel := context.WithCancel(context.Background())
  861. pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
  862. logger.Tf(ctx, "PC state %v", state)
  863. if state == webrtc.PeerConnectionStateConnected {
  864. pcDoneCancel()
  865. if v.iceReadyCancel != nil {
  866. v.iceReadyCancel()
  867. }
  868. }
  869. if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
  870. err = errors.Errorf("Close for PC state %v", state)
  871. cancel()
  872. }
  873. })
  874. // Wait for event from context or tracks.
  875. var wg sync.WaitGroup
  876. var finalErr error
  877. wg.Add(1)
  878. go func() {
  879. defer wg.Done()
  880. defer logger.Tf(ctx, "ingest notify done")
  881. <-ctx.Done()
  882. if v.aIngester != nil && v.aIngester.sAudioSender != nil {
  883. // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed.
  884. <-v.aIngester.ready.Done()
  885. _ = v.aIngester.Close()
  886. }
  887. if v.vIngester != nil && v.vIngester.sVideoSender != nil {
  888. // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed.
  889. <-v.vIngester.ready.Done()
  890. _ = v.vIngester.Close()
  891. }
  892. }()
  893. wg.Add(1)
  894. go func() {
  895. defer wg.Done()
  896. defer cancel()
  897. if v.aIngester == nil {
  898. return
  899. }
  900. defer v.aIngester.readyCancel()
  901. select {
  902. case <-ctx.Done():
  903. return
  904. case <-pcDone.Done():
  905. }
  906. wg.Add(1)
  907. go func() {
  908. defer wg.Done()
  909. defer logger.Tf(ctx, "aingester sender read done")
  910. buf := make([]byte, 1500)
  911. for ctx.Err() == nil {
  912. if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil {
  913. return
  914. }
  915. }
  916. }()
  917. for {
  918. if err := v.aIngester.Ingest(ctx); err != nil {
  919. if err == io.EOF {
  920. logger.Tf(ctx, "aingester retry for %v", err)
  921. continue
  922. }
  923. if err != context.Canceled {
  924. finalErr = errors.Wrapf(err, "audio")
  925. }
  926. logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr)
  927. return
  928. }
  929. }
  930. }()
  931. wg.Add(1)
  932. go func() {
  933. defer wg.Done()
  934. defer cancel()
  935. if v.vIngester == nil {
  936. return
  937. }
  938. defer v.vIngester.readyCancel()
  939. select {
  940. case <-ctx.Done():
  941. return
  942. case <-pcDone.Done():
  943. logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
  944. }
  945. wg.Add(1)
  946. go func() {
  947. defer wg.Done()
  948. defer logger.Tf(ctx, "vingester sender read done")
  949. buf := make([]byte, 1500)
  950. for ctx.Err() == nil {
  951. // The Read() might block in r.rtcpInterceptor.Read(b, a),
  952. // so that the Stop() can not stop it.
  953. if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil {
  954. return
  955. }
  956. }
  957. }()
  958. for {
  959. if err := v.vIngester.Ingest(ctx); err != nil {
  960. if err == io.EOF {
  961. logger.Tf(ctx, "vingester retry for %v", err)
  962. continue
  963. }
  964. if err != context.Canceled {
  965. finalErr = errors.Wrapf(err, "video")
  966. }
  967. logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr)
  968. return
  969. }
  970. }
  971. }()
  972. wg.Wait()
  973. logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr)
  974. if finalErr != nil {
  975. return finalErr
  976. }
  977. return ctx.Err()
  978. }