2
0

rtmp.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. // Copyright (c) 2024 Winlin
  2. //
  3. // SPDX-License-Identifier: MIT
  4. package main
  5. import (
  6. "context"
  7. "fmt"
  8. "math/rand"
  9. "net"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "srs-proxy/errors"
  15. "srs-proxy/logger"
  16. "srs-proxy/rtmp"
  17. )
  18. // srsRTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS
  19. // server. It will figure out the backend server to proxy to. Unlike the edge server, it will
  20. // not cache the stream, but just proxy the stream to backend.
  21. type srsRTMPServer struct {
  22. // The TCP listener for RTMP server.
  23. listener *net.TCPListener
  24. // The random number generator.
  25. rd *rand.Rand
  26. // The wait group for all goroutines.
  27. wg sync.WaitGroup
  28. }
  29. func NewSRSRTMPServer(opts ...func(*srsRTMPServer)) *srsRTMPServer {
  30. v := &srsRTMPServer{
  31. rd: rand.New(rand.NewSource(time.Now().UnixNano())),
  32. }
  33. for _, opt := range opts {
  34. opt(v)
  35. }
  36. return v
  37. }
  38. func (v *srsRTMPServer) Close() error {
  39. if v.listener != nil {
  40. v.listener.Close()
  41. }
  42. v.wg.Wait()
  43. return nil
  44. }
  45. func (v *srsRTMPServer) Run(ctx context.Context) error {
  46. endpoint := envRtmpServer()
  47. if !strings.Contains(endpoint, ":") {
  48. endpoint = ":" + endpoint
  49. }
  50. addr, err := net.ResolveTCPAddr("tcp", endpoint)
  51. if err != nil {
  52. return errors.Wrapf(err, "resolve rtmp addr %v", endpoint)
  53. }
  54. listener, err := net.ListenTCP("tcp", addr)
  55. if err != nil {
  56. return errors.Wrapf(err, "listen rtmp addr %v", addr)
  57. }
  58. v.listener = listener
  59. logger.Df(ctx, "RTMP server listen at %v", addr)
  60. v.wg.Add(1)
  61. go func() {
  62. defer v.wg.Done()
  63. for {
  64. conn, err := v.listener.AcceptTCP()
  65. if err != nil {
  66. if ctx.Err() != context.Canceled {
  67. // TODO: If RTMP server closed unexpectedly, we should notice the main loop to quit.
  68. logger.Wf(ctx, "RTMP server accept err %+v", err)
  69. } else {
  70. logger.Df(ctx, "RTMP server done")
  71. }
  72. return
  73. }
  74. v.wg.Add(1)
  75. go func(ctx context.Context, conn *net.TCPConn) {
  76. defer v.wg.Done()
  77. defer conn.Close()
  78. handleErr := func(err error) {
  79. if isPeerClosedError(err) {
  80. logger.Df(ctx, "RTMP peer is closed")
  81. } else {
  82. logger.Wf(ctx, "RTMP serve err %+v", err)
  83. }
  84. }
  85. rc := NewRTMPConnection(func(client *RTMPConnection) {
  86. client.rd = v.rd
  87. })
  88. if err := rc.serve(ctx, conn); err != nil {
  89. handleErr(err)
  90. } else {
  91. logger.Df(ctx, "RTMP client done")
  92. }
  93. }(logger.WithContext(ctx), conn)
  94. }
  95. }()
  96. return nil
  97. }
  98. // RTMPConnection is an RTMP streaming connection. There is no state need to be sync between
  99. // proxy servers.
  100. //
  101. // When we got an RTMP request, we will parse the stream URL from the RTMP publish or play request,
  102. // then proxy to the corresponding backend server. All state is in the RTMP request, so this
  103. // connection is stateless.
  104. type RTMPConnection struct {
  105. // The random number generator.
  106. rd *rand.Rand
  107. }
  108. func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection {
  109. v := &RTMPConnection{}
  110. for _, opt := range opts {
  111. opt(v)
  112. }
  113. return v
  114. }
  115. func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
  116. logger.Df(ctx, "Got RTMP client from %v", conn.RemoteAddr())
  117. // If any goroutine quit, cancel another one.
  118. parentCtx := ctx
  119. ctx, cancel := context.WithCancel(ctx)
  120. defer cancel()
  121. var backend *RTMPClientToBackend
  122. if true {
  123. go func() {
  124. <-ctx.Done()
  125. conn.Close()
  126. if backend != nil {
  127. backend.Close()
  128. }
  129. }()
  130. }
  131. // Simple handshake with client.
  132. hs := rtmp.NewHandshake(v.rd)
  133. if _, err := hs.ReadC0S0(conn); err != nil {
  134. return errors.Wrapf(err, "read c0")
  135. }
  136. if _, err := hs.ReadC1S1(conn); err != nil {
  137. return errors.Wrapf(err, "read c1")
  138. }
  139. if err := hs.WriteC0S0(conn); err != nil {
  140. return errors.Wrapf(err, "write s1")
  141. }
  142. if err := hs.WriteC1S1(conn); err != nil {
  143. return errors.Wrapf(err, "write s1")
  144. }
  145. if err := hs.WriteC2S2(conn, hs.C1S1()); err != nil {
  146. return errors.Wrapf(err, "write s2")
  147. }
  148. if _, err := hs.ReadC2S2(conn); err != nil {
  149. return errors.Wrapf(err, "read c2")
  150. }
  151. client := rtmp.NewProtocol(conn)
  152. logger.Df(ctx, "RTMP simple handshake done")
  153. // Expect RTMP connect command with tcUrl.
  154. var connectReq *rtmp.ConnectAppPacket
  155. if _, err := rtmp.ExpectPacket(ctx, client, &connectReq); err != nil {
  156. return errors.Wrapf(err, "expect connect req")
  157. }
  158. if true {
  159. ack := rtmp.NewWindowAcknowledgementSize()
  160. ack.AckSize = 2500000
  161. if err := client.WritePacket(ctx, ack, 0); err != nil {
  162. return errors.Wrapf(err, "write set ack size")
  163. }
  164. }
  165. if true {
  166. chunk := rtmp.NewSetChunkSize()
  167. chunk.ChunkSize = 128
  168. if err := client.WritePacket(ctx, chunk, 0); err != nil {
  169. return errors.Wrapf(err, "write set chunk size")
  170. }
  171. }
  172. connectRes := rtmp.NewConnectAppResPacket(connectReq.TransactionID)
  173. connectRes.CommandObject.Set("fmsVer", rtmp.NewAmf0String("FMS/3,5,3,888"))
  174. connectRes.CommandObject.Set("capabilities", rtmp.NewAmf0Number(127))
  175. connectRes.CommandObject.Set("mode", rtmp.NewAmf0Number(1))
  176. connectRes.Args.Set("level", rtmp.NewAmf0String("status"))
  177. connectRes.Args.Set("code", rtmp.NewAmf0String("NetConnection.Connect.Success"))
  178. connectRes.Args.Set("description", rtmp.NewAmf0String("Connection succeeded"))
  179. connectRes.Args.Set("objectEncoding", rtmp.NewAmf0Number(0))
  180. connectResData := rtmp.NewAmf0EcmaArray()
  181. connectResData.Set("version", rtmp.NewAmf0String("3,5,3,888"))
  182. connectResData.Set("srs_version", rtmp.NewAmf0String(Version()))
  183. connectResData.Set("srs_id", rtmp.NewAmf0String(logger.ContextID(ctx)))
  184. connectRes.Args.Set("data", connectResData)
  185. if err := client.WritePacket(ctx, connectRes, 0); err != nil {
  186. return errors.Wrapf(err, "write connect res")
  187. }
  188. tcUrl := connectReq.TcUrl()
  189. logger.Df(ctx, "RTMP connect app %v", tcUrl)
  190. // Expect RTMP command to identify the client, a publisher or viewer.
  191. var currentStreamID, nextStreamID int
  192. var streamName string
  193. var clientType RTMPClientType
  194. for clientType == "" {
  195. var identifyReq rtmp.Packet
  196. if _, err := rtmp.ExpectPacket(ctx, client, &identifyReq); err != nil {
  197. return errors.Wrapf(err, "expect identify req")
  198. }
  199. var response rtmp.Packet
  200. switch pkt := identifyReq.(type) {
  201. case *rtmp.CallPacket:
  202. if pkt.CommandName == "createStream" {
  203. identifyRes := rtmp.NewCreateStreamResPacket(pkt.TransactionID)
  204. response = identifyRes
  205. nextStreamID = 1
  206. identifyRes.StreamID = *rtmp.NewAmf0Number(float64(nextStreamID))
  207. } else if pkt.CommandName == "getStreamLength" {
  208. // Ignore and do not reply these packets.
  209. } else {
  210. // For releaseStream, FCPublish, etc.
  211. identifyRes := rtmp.NewCallPacket()
  212. response = identifyRes
  213. identifyRes.TransactionID = pkt.TransactionID
  214. identifyRes.CommandName = "_result"
  215. identifyRes.CommandObject = rtmp.NewAmf0Null()
  216. identifyRes.Args = rtmp.NewAmf0Undefined()
  217. }
  218. case *rtmp.PublishPacket:
  219. streamName = string(pkt.StreamName)
  220. clientType = RTMPClientTypePublisher
  221. identifyRes := rtmp.NewCallPacket()
  222. response = identifyRes
  223. identifyRes.CommandName = "onFCPublish"
  224. identifyRes.CommandObject = rtmp.NewAmf0Null()
  225. data := rtmp.NewAmf0Object()
  226. data.Set("code", rtmp.NewAmf0String("NetStream.Publish.Start"))
  227. data.Set("description", rtmp.NewAmf0String("Started publishing stream."))
  228. identifyRes.Args = data
  229. case *rtmp.PlayPacket:
  230. streamName = string(pkt.StreamName)
  231. clientType = RTMPClientTypeViewer
  232. identifyRes := rtmp.NewCallPacket()
  233. response = identifyRes
  234. identifyRes.CommandName = "onStatus"
  235. identifyRes.CommandObject = rtmp.NewAmf0Null()
  236. data := rtmp.NewAmf0Object()
  237. data.Set("level", rtmp.NewAmf0String("status"))
  238. data.Set("code", rtmp.NewAmf0String("NetStream.Play.Reset"))
  239. data.Set("description", rtmp.NewAmf0String("Playing and resetting stream."))
  240. data.Set("details", rtmp.NewAmf0String("stream"))
  241. data.Set("clientid", rtmp.NewAmf0String("ASAICiss"))
  242. identifyRes.Args = data
  243. }
  244. if response != nil {
  245. if err := client.WritePacket(ctx, response, currentStreamID); err != nil {
  246. return errors.Wrapf(err, "write identify res for req=%v, stream=%v",
  247. identifyReq, currentStreamID)
  248. }
  249. }
  250. // Update the stream ID for next request.
  251. currentStreamID = nextStreamID
  252. }
  253. logger.Df(ctx, "RTMP identify tcUrl=%v, stream=%v, id=%v, type=%v",
  254. tcUrl, streamName, currentStreamID, clientType)
  255. // Find a backend SRS server to proxy the RTMP stream.
  256. backend = NewRTMPClientToBackend(func(client *RTMPClientToBackend) {
  257. client.rd, client.typ = v.rd, clientType
  258. })
  259. defer backend.Close()
  260. if err := backend.Connect(ctx, tcUrl, streamName); err != nil {
  261. return errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName)
  262. }
  263. // Start the streaming.
  264. if clientType == RTMPClientTypePublisher {
  265. identifyRes := rtmp.NewCallPacket()
  266. identifyRes.CommandName = "onStatus"
  267. identifyRes.CommandObject = rtmp.NewAmf0Null()
  268. data := rtmp.NewAmf0Object()
  269. data.Set("level", rtmp.NewAmf0String("status"))
  270. data.Set("code", rtmp.NewAmf0String("NetStream.Publish.Start"))
  271. data.Set("description", rtmp.NewAmf0String("Started publishing stream."))
  272. data.Set("clientid", rtmp.NewAmf0String("ASAICiss"))
  273. identifyRes.Args = data
  274. if err := client.WritePacket(ctx, identifyRes, currentStreamID); err != nil {
  275. return errors.Wrapf(err, "start publish")
  276. }
  277. } else if clientType == RTMPClientTypeViewer {
  278. identifyRes := rtmp.NewCallPacket()
  279. identifyRes.CommandName = "onStatus"
  280. identifyRes.CommandObject = rtmp.NewAmf0Null()
  281. data := rtmp.NewAmf0Object()
  282. data.Set("level", rtmp.NewAmf0String("status"))
  283. data.Set("code", rtmp.NewAmf0String("NetStream.Play.Start"))
  284. data.Set("description", rtmp.NewAmf0String("Started playing stream."))
  285. data.Set("details", rtmp.NewAmf0String("stream"))
  286. data.Set("clientid", rtmp.NewAmf0String("ASAICiss"))
  287. identifyRes.Args = data
  288. if err := client.WritePacket(ctx, identifyRes, currentStreamID); err != nil {
  289. return errors.Wrapf(err, "start play")
  290. }
  291. }
  292. logger.Df(ctx, "RTMP start streaming")
  293. // For all proxy goroutines.
  294. var wg sync.WaitGroup
  295. defer wg.Wait()
  296. // Proxy all message from backend to client.
  297. wg.Add(1)
  298. var r0 error
  299. go func() {
  300. defer wg.Done()
  301. defer cancel()
  302. r0 = func() error {
  303. for {
  304. m, err := backend.client.ReadMessage(ctx)
  305. if err != nil {
  306. return errors.Wrapf(err, "read message")
  307. }
  308. //logger.Df(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
  309. // TODO: Update the stream ID if not the same.
  310. if err := client.WriteMessage(ctx, m); err != nil {
  311. return errors.Wrapf(err, "write message")
  312. }
  313. }
  314. }()
  315. }()
  316. // Proxy all messages from client to backend.
  317. wg.Add(1)
  318. var r1 error
  319. go func() {
  320. defer wg.Done()
  321. defer cancel()
  322. r1 = func() error {
  323. for {
  324. m, err := client.ReadMessage(ctx)
  325. if err != nil {
  326. return errors.Wrapf(err, "read message")
  327. }
  328. //logger.Df(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
  329. // TODO: Update the stream ID if not the same.
  330. if err := backend.client.WriteMessage(ctx, m); err != nil {
  331. return errors.Wrapf(err, "write message")
  332. }
  333. }
  334. }()
  335. }()
  336. // Wait until all goroutine quit.
  337. wg.Wait()
  338. // Reset the error if caused by another goroutine.
  339. if r0 != nil {
  340. return errors.Wrapf(r0, "proxy backend->client")
  341. }
  342. if r1 != nil {
  343. return errors.Wrapf(r1, "proxy client->backend")
  344. }
  345. return parentCtx.Err()
  346. }
  347. type RTMPClientType string
  348. const (
  349. RTMPClientTypePublisher RTMPClientType = "publisher"
  350. RTMPClientTypeViewer RTMPClientType = "viewer"
  351. )
  352. // RTMPClientToBackend is a RTMP client to proxy the RTMP stream to backend.
  353. type RTMPClientToBackend struct {
  354. // The random number generator.
  355. rd *rand.Rand
  356. // The underlayer tcp client.
  357. tcpConn *net.TCPConn
  358. // The RTMP protocol client.
  359. client *rtmp.Protocol
  360. // The stream type.
  361. typ RTMPClientType
  362. }
  363. func NewRTMPClientToBackend(opts ...func(*RTMPClientToBackend)) *RTMPClientToBackend {
  364. v := &RTMPClientToBackend{}
  365. for _, opt := range opts {
  366. opt(v)
  367. }
  368. return v
  369. }
  370. func (v *RTMPClientToBackend) Close() error {
  371. if v.tcpConn != nil {
  372. v.tcpConn.Close()
  373. }
  374. return nil
  375. }
  376. func (v *RTMPClientToBackend) Connect(ctx context.Context, tcUrl, streamName string) error {
  377. // Build the stream URL in vhost/app/stream schema.
  378. streamURL, err := buildStreamURL(fmt.Sprintf("%v/%v", tcUrl, streamName))
  379. if err != nil {
  380. return errors.Wrapf(err, "build stream url %v/%v", tcUrl, streamName)
  381. }
  382. // Pick a backend SRS server to proxy the RTMP stream.
  383. backend, err := srsLoadBalancer.Pick(ctx, streamURL)
  384. if err != nil {
  385. return errors.Wrapf(err, "pick backend for %v", streamURL)
  386. }
  387. // Parse RTMP port from backend.
  388. if len(backend.RTMP) == 0 {
  389. return errors.Errorf("no rtmp server %+v for %v", backend, streamURL)
  390. }
  391. var rtmpPort int
  392. if iv, err := strconv.ParseInt(backend.RTMP[0], 10, 64); err != nil {
  393. return errors.Wrapf(err, "parse backend %+v rtmp port %v", backend, backend.RTMP[0])
  394. } else {
  395. rtmpPort = int(iv)
  396. }
  397. // Connect to backend SRS server via TCP client.
  398. addr := &net.TCPAddr{IP: net.ParseIP(backend.IP), Port: rtmpPort}
  399. c, err := net.DialTCP("tcp", nil, addr)
  400. if err != nil {
  401. return errors.Wrapf(err, "dial backend addr=%v, srs=%v", addr, backend)
  402. }
  403. v.tcpConn = c
  404. hs := rtmp.NewHandshake(v.rd)
  405. client := rtmp.NewProtocol(c)
  406. v.client = client
  407. // Simple RTMP handshake with server.
  408. if err := hs.WriteC0S0(c); err != nil {
  409. return errors.Wrapf(err, "write c0")
  410. }
  411. if err := hs.WriteC1S1(c); err != nil {
  412. return errors.Wrapf(err, "write c1")
  413. }
  414. if _, err = hs.ReadC0S0(c); err != nil {
  415. return errors.Wrapf(err, "read s0")
  416. }
  417. if _, err := hs.ReadC1S1(c); err != nil {
  418. return errors.Wrapf(err, "read s1")
  419. }
  420. if _, err = hs.ReadC2S2(c); err != nil {
  421. return errors.Wrapf(err, "read c2")
  422. }
  423. logger.Df(ctx, "backend simple handshake done, server=%v", addr)
  424. if err := hs.WriteC2S2(c, hs.C1S1()); err != nil {
  425. return errors.Wrapf(err, "write c2")
  426. }
  427. // Connect RTMP app on tcUrl with server.
  428. if true {
  429. connectApp := rtmp.NewConnectAppPacket()
  430. connectApp.CommandObject.Set("tcUrl", rtmp.NewAmf0String(tcUrl))
  431. if err := client.WritePacket(ctx, connectApp, 1); err != nil {
  432. return errors.Wrapf(err, "write connect app")
  433. }
  434. }
  435. if true {
  436. var connectAppRes *rtmp.ConnectAppResPacket
  437. if _, err := rtmp.ExpectPacket(ctx, client, &connectAppRes); err != nil {
  438. return errors.Wrapf(err, "expect connect app res")
  439. }
  440. logger.Df(ctx, "backend connect RTMP app, tcUrl=%v, id=%v", tcUrl, connectAppRes.SrsID())
  441. }
  442. // Play or view RTMP stream with server.
  443. if v.typ == RTMPClientTypeViewer {
  444. return v.play(ctx, client, streamName)
  445. }
  446. // Publish RTMP stream with server.
  447. return v.publish(ctx, client, streamName)
  448. }
  449. func (v *RTMPClientToBackend) publish(ctx context.Context, client *rtmp.Protocol, streamName string) error {
  450. if true {
  451. identifyReq := rtmp.NewCallPacket()
  452. identifyReq.CommandName = "releaseStream"
  453. identifyReq.TransactionID = 2
  454. identifyReq.CommandObject = rtmp.NewAmf0Null()
  455. identifyReq.Args = rtmp.NewAmf0String(streamName)
  456. if err := client.WritePacket(ctx, identifyReq, 0); err != nil {
  457. return errors.Wrapf(err, "releaseStream")
  458. }
  459. }
  460. for {
  461. var identifyRes *rtmp.CallPacket
  462. if _, err := rtmp.ExpectPacket(ctx, client, &identifyRes); err != nil {
  463. return errors.Wrapf(err, "expect releaseStream res")
  464. }
  465. if identifyRes.CommandName == "_result" {
  466. break
  467. }
  468. }
  469. if true {
  470. identifyReq := rtmp.NewCallPacket()
  471. identifyReq.CommandName = "FCPublish"
  472. identifyReq.TransactionID = 3
  473. identifyReq.CommandObject = rtmp.NewAmf0Null()
  474. identifyReq.Args = rtmp.NewAmf0String(streamName)
  475. if err := client.WritePacket(ctx, identifyReq, 0); err != nil {
  476. return errors.Wrapf(err, "FCPublish")
  477. }
  478. }
  479. for {
  480. var identifyRes *rtmp.CallPacket
  481. if _, err := rtmp.ExpectPacket(ctx, client, &identifyRes); err != nil {
  482. return errors.Wrapf(err, "expect FCPublish res")
  483. }
  484. if identifyRes.CommandName == "_result" {
  485. break
  486. }
  487. }
  488. var currentStreamID int
  489. if true {
  490. createStream := rtmp.NewCreateStreamPacket()
  491. createStream.TransactionID = 4
  492. createStream.CommandObject = rtmp.NewAmf0Null()
  493. if err := client.WritePacket(ctx, createStream, 0); err != nil {
  494. return errors.Wrapf(err, "createStream")
  495. }
  496. }
  497. for {
  498. var identifyRes *rtmp.CreateStreamResPacket
  499. if _, err := rtmp.ExpectPacket(ctx, client, &identifyRes); err != nil {
  500. return errors.Wrapf(err, "expect createStream res")
  501. }
  502. if sid := identifyRes.StreamID; sid != 0 {
  503. currentStreamID = int(sid)
  504. break
  505. }
  506. }
  507. if true {
  508. publishStream := rtmp.NewPublishPacket()
  509. publishStream.TransactionID = 5
  510. publishStream.CommandObject = rtmp.NewAmf0Null()
  511. publishStream.StreamName = *rtmp.NewAmf0String(streamName)
  512. publishStream.StreamType = *rtmp.NewAmf0String("live")
  513. if err := client.WritePacket(ctx, publishStream, currentStreamID); err != nil {
  514. return errors.Wrapf(err, "publish")
  515. }
  516. }
  517. for {
  518. var identifyRes *rtmp.CallPacket
  519. if _, err := rtmp.ExpectPacket(ctx, client, &identifyRes); err != nil {
  520. return errors.Wrapf(err, "expect publish res")
  521. }
  522. // Ignore onFCPublish, expect onStatus(NetStream.Publish.Start).
  523. if identifyRes.CommandName == "onStatus" {
  524. if data := rtmp.NewAmf0Converter(identifyRes.Args).ToObject(); data == nil {
  525. return errors.Errorf("onStatus args not object")
  526. } else if code := rtmp.NewAmf0Converter(data.Get("code")).ToString(); code == nil {
  527. return errors.Errorf("onStatus code not string")
  528. } else if *code != "NetStream.Publish.Start" {
  529. return errors.Errorf("onStatus code=%v not NetStream.Publish.Start", *code)
  530. }
  531. break
  532. }
  533. }
  534. logger.Df(ctx, "backend publish stream=%v, sid=%v", streamName, currentStreamID)
  535. return nil
  536. }
  537. func (v *RTMPClientToBackend) play(ctx context.Context, client *rtmp.Protocol, streamName string) error {
  538. var currentStreamID int
  539. if true {
  540. createStream := rtmp.NewCreateStreamPacket()
  541. createStream.TransactionID = 4
  542. createStream.CommandObject = rtmp.NewAmf0Null()
  543. if err := client.WritePacket(ctx, createStream, 0); err != nil {
  544. return errors.Wrapf(err, "createStream")
  545. }
  546. }
  547. for {
  548. var identifyRes *rtmp.CreateStreamResPacket
  549. if _, err := rtmp.ExpectPacket(ctx, client, &identifyRes); err != nil {
  550. return errors.Wrapf(err, "expect createStream res")
  551. }
  552. if sid := identifyRes.StreamID; sid != 0 {
  553. currentStreamID = int(sid)
  554. break
  555. }
  556. }
  557. playStream := rtmp.NewPlayPacket()
  558. playStream.StreamName = *rtmp.NewAmf0String(streamName)
  559. if err := client.WritePacket(ctx, playStream, currentStreamID); err != nil {
  560. return errors.Wrapf(err, "play")
  561. }
  562. for {
  563. var identifyRes *rtmp.CallPacket
  564. if _, err := rtmp.ExpectPacket(ctx, client, &identifyRes); err != nil {
  565. return errors.Wrapf(err, "expect releaseStream res")
  566. }
  567. if identifyRes.CommandName == "onStatus" && identifyRes.ArgsCode() == "NetStream.Play.Start" {
  568. break
  569. }
  570. }
  571. return nil
  572. }