publisher.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2021 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package live
  22. import (
  23. "context"
  24. "fmt"
  25. "math/rand"
  26. "net"
  27. "net/url"
  28. "strconv"
  29. "strings"
  30. "time"
  31. "github.com/haivision/srtgo"
  32. "github.com/ossrs/go-oryx-lib/amf0"
  33. "github.com/ossrs/go-oryx-lib/errors"
  34. "github.com/ossrs/go-oryx-lib/logger"
  35. "github.com/ossrs/go-oryx-lib/rtmp"
  36. )
  37. func startPublish(ctx context.Context, r string, closeAfterPublished bool) error {
  38. ctx = logger.WithContext(ctx)
  39. logger.Tf(ctx, "Run publish url=%v, cap=%v", r, closeAfterPublished)
  40. u, err := url.Parse(r)
  41. if err != nil {
  42. return errors.Wrapf(err, "parse %v", r)
  43. }
  44. if u.Scheme == "rtmp" {
  45. return startPublishRTMP(ctx, u, closeAfterPublished)
  46. } else if u.Scheme == "srt" {
  47. return startPublishSRT(ctx, u, closeAfterPublished)
  48. }
  49. return fmt.Errorf("invalid schema %v of %v", u.Scheme, r)
  50. }
  51. func startPublishSRT(ctx context.Context, u *url.URL, closeAfterPublished bool) (err error) {
  52. // Parse host and port.
  53. port := 1935
  54. if u.Port() != "" {
  55. if port, err = strconv.Atoi(u.Port()); err != nil {
  56. return errors.Wrapf(err, "parse port %v", u.Port())
  57. }
  58. }
  59. ips, err := net.LookupIP(u.Hostname())
  60. if err != nil {
  61. return errors.Wrapf(err, "lookup %v", u.Hostname())
  62. }
  63. if len(ips) == 0 {
  64. return errors.Errorf("no ips for %v", u.Hostname())
  65. }
  66. logger.Tf(ctx, "Parse url %v to host=%v, ip=%v, port=%v",
  67. u.String(), u.Hostname(), ips[0], port)
  68. // Setup libsrt.
  69. client := srtgo.NewSrtSocket(ips[0].To4().String(), uint16(port),
  70. map[string]string{
  71. "transtype": "live",
  72. "tsbpdmode": "false",
  73. "tlpktdrop": "false",
  74. "latency": "0",
  75. "streamid": fmt.Sprintf("#%v", u.Fragment),
  76. },
  77. )
  78. defer client.Close()
  79. if err := client.Connect(); err != nil {
  80. return errors.Wrapf(err, "SRT connect to %v:%v", u.Hostname(), port)
  81. }
  82. logger.Tf(ctx, "Connect to SRT server %v:%v success", u.Hostname(), port)
  83. // We should wait for a while after connected to SRT server before quit. Because SRT server use timeout
  84. // to detect UDP connection status, so we should never reconnect very fast.
  85. select {
  86. case <-ctx.Done():
  87. case <-time.After(3 * time.Second):
  88. logger.Tf(ctx, "SRT publish stream success, stream=%v", u.Fragment)
  89. }
  90. if closeAfterPublished {
  91. logger.Tf(ctx, "Close connection after published")
  92. return nil
  93. }
  94. return nil
  95. }
  96. func startPublishRTMP(ctx context.Context, u *url.URL, closeAfterPublished bool) (err error) {
  97. parts := strings.Split(u.Path, "/")
  98. if len(parts) == 0 {
  99. return errors.Errorf("invalid path %v", u.Path)
  100. }
  101. app, stream := strings.Join(parts[:len(parts)-1], "/"), parts[len(parts)-1]
  102. // Parse host and port.
  103. port := 1935
  104. if u.Port() != "" {
  105. if port, err = strconv.Atoi(u.Port()); err != nil {
  106. return errors.Wrapf(err, "parse port %v", u.Port())
  107. }
  108. }
  109. ips, err := net.LookupIP(u.Hostname())
  110. if err != nil {
  111. return errors.Wrapf(err, "lookup %v", u.Hostname())
  112. }
  113. if len(ips) == 0 {
  114. return errors.Errorf("no ips for %v", u.Hostname())
  115. }
  116. logger.Tf(ctx, "Parse url %v to host=%v, ip=%v, port=%v, app=%v, stream=%v",
  117. u.String(), u.Hostname(), ips[0], port, app, stream)
  118. // Connect via TCP client.
  119. c, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: ips[0], Port: port})
  120. if err != nil {
  121. return errors.Wrapf(err, "dial %v %v", u.Hostname(), u.Port())
  122. }
  123. defer c.Close()
  124. logger.Tf(ctx, "Connect to RTMP server %v:%v success", u.Hostname(), port)
  125. // RTMP Handshake.
  126. rd := rand.New(rand.NewSource(time.Now().UnixNano()))
  127. hs := rtmp.NewHandshake(rd)
  128. if err := hs.WriteC0S0(c); err != nil {
  129. return errors.Wrap(err, "write c0")
  130. }
  131. if err := hs.WriteC1S1(c); err != nil {
  132. return errors.Wrap(err, "write c1")
  133. }
  134. if _, err = hs.ReadC0S0(c); err != nil {
  135. return errors.Wrap(err, "read s1")
  136. }
  137. s1, err := hs.ReadC1S1(c)
  138. if err != nil {
  139. return errors.Wrap(err, "read s1")
  140. }
  141. if _, err = hs.ReadC2S2(c); err != nil {
  142. return errors.Wrap(err, "read s2")
  143. }
  144. if err := hs.WriteC2S2(c, s1); err != nil {
  145. return errors.Wrap(err, "write c2")
  146. }
  147. logger.Tf(ctx, "RTMP handshake with %v:%v success", ips[0], port)
  148. // Do connect and publish.
  149. client := rtmp.NewProtocol(c)
  150. connectApp := rtmp.NewConnectAppPacket()
  151. tcURL := fmt.Sprintf("rtmp://%v%v", u.Hostname(), app)
  152. connectApp.CommandObject.Set("tcUrl", amf0.NewString(tcURL))
  153. if err = client.WritePacket(connectApp, 1); err != nil {
  154. return errors.Wrap(err, "write connect app")
  155. }
  156. var connectAppRes *rtmp.ConnectAppResPacket
  157. if _, err = client.ExpectPacket(&connectAppRes); err != nil {
  158. return errors.Wrap(err, "expect connect app res")
  159. }
  160. logger.Tf(ctx, "RTMP connect app success, tcUrl=%v", tcURL)
  161. createStream := rtmp.NewCreateStreamPacket()
  162. if err = client.WritePacket(createStream, 1); err != nil {
  163. return errors.Wrap(err, "write create stream")
  164. }
  165. var createStreamRes *rtmp.CreateStreamResPacket
  166. if _, err = client.ExpectPacket(&createStreamRes); err != nil {
  167. return errors.Wrap(err, "expect create stream res")
  168. }
  169. logger.Tf(ctx, "RTMP create stream success")
  170. publish := rtmp.NewPublishPacket()
  171. publish.StreamName = *amf0.NewString(stream)
  172. if err = client.WritePacket(publish, 1); err != nil {
  173. return errors.Wrap(err, "write publish")
  174. }
  175. logger.Tf(ctx, "RTMP publish stream success, stream=%v", stream)
  176. if closeAfterPublished {
  177. logger.Tf(ctx, "Close connection after published")
  178. return nil
  179. }
  180. return nil
  181. }