2
0

rtmp_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2023 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package blackbox
  22. import (
  23. "context"
  24. "fmt"
  25. "github.com/ossrs/go-oryx-lib/errors"
  26. "github.com/ossrs/go-oryx-lib/logger"
  27. "math/rand"
  28. "os"
  29. "path"
  30. "sync"
  31. "testing"
  32. "time"
  33. )
  34. func TestFast_RtmpPublish_RtmpPlay_Basic(t *testing.T) {
  35. // This case is run in parallel.
  36. t.Parallel()
  37. // Setup the max timeout for this case.
  38. ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
  39. defer cancel()
  40. // Check a set of errors.
  41. var r0, r1, r2, r3, r4, r5 error
  42. defer func(ctx context.Context) {
  43. if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
  44. t.Errorf("Fail for err %+v", err)
  45. } else {
  46. logger.Tf(ctx, "test done with err %+v", err)
  47. }
  48. }(ctx)
  49. var wg sync.WaitGroup
  50. defer wg.Wait()
  51. // Start SRS server and wait for it to be ready.
  52. svr := NewSRSServer()
  53. wg.Add(1)
  54. go func() {
  55. defer wg.Done()
  56. r0 = svr.Run(ctx, cancel)
  57. }()
  58. // Start FFmpeg to publish stream.
  59. streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
  60. streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
  61. ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
  62. v.args = []string{
  63. "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
  64. }
  65. })
  66. wg.Add(1)
  67. go func() {
  68. defer wg.Done()
  69. <-svr.ReadyCtx().Done()
  70. r1 = ffmpeg.Run(ctx, cancel)
  71. }()
  72. // Start FFprobe to detect and verify stream.
  73. duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
  74. ffprobe := NewFFprobe(func(v *ffprobeClient) {
  75. v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID))
  76. v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
  77. })
  78. wg.Add(1)
  79. go func() {
  80. defer wg.Done()
  81. <-svr.ReadyCtx().Done()
  82. r2 = ffprobe.Run(ctx, cancel)
  83. }()
  84. // Fast quit for probe done.
  85. select {
  86. case <-ctx.Done():
  87. case <-ffprobe.ProbeDoneCtx().Done():
  88. defer cancel()
  89. str, m := ffprobe.Result()
  90. if len(m.Streams) != 2 {
  91. r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
  92. }
  93. if ts := 90; m.Format.ProbeScore < ts {
  94. r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
  95. }
  96. if dv := m.Duration(); dv < duration {
  97. r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str)
  98. }
  99. }
  100. }
  101. func TestFast_RtmpPublish_HttpFlvPlay_Basic(t *testing.T) {
  102. // This case is run in parallel.
  103. t.Parallel()
  104. // Setup the max timeout for this case.
  105. ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
  106. defer cancel()
  107. // Check a set of errors.
  108. var r0, r1, r2, r3, r4, r5 error
  109. defer func(ctx context.Context) {
  110. if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
  111. t.Errorf("Fail for err %+v", err)
  112. } else {
  113. logger.Tf(ctx, "test done with err %+v", err)
  114. }
  115. }(ctx)
  116. var wg sync.WaitGroup
  117. defer wg.Wait()
  118. // Start SRS server and wait for it to be ready.
  119. svr := NewSRSServer(func(v *srsServer) {
  120. v.envs = []string{
  121. "SRS_HTTP_SERVER_ENABLED=on",
  122. "SRS_VHOST_HTTP_REMUX_ENABLED=on",
  123. }
  124. })
  125. wg.Add(1)
  126. go func() {
  127. defer wg.Done()
  128. r0 = svr.Run(ctx, cancel)
  129. }()
  130. // Start FFmpeg to publish stream.
  131. streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
  132. streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
  133. ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
  134. v.args = []string{
  135. "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
  136. }
  137. })
  138. wg.Add(1)
  139. go func() {
  140. defer wg.Done()
  141. <-svr.ReadyCtx().Done()
  142. r1 = ffmpeg.Run(ctx, cancel)
  143. }()
  144. // Start FFprobe to detect and verify stream.
  145. duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
  146. ffprobe := NewFFprobe(func(v *ffprobeClient) {
  147. v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID))
  148. v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.flv", svr.HTTPPort(), streamID)
  149. v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
  150. })
  151. wg.Add(1)
  152. go func() {
  153. defer wg.Done()
  154. <-svr.ReadyCtx().Done()
  155. r2 = ffprobe.Run(ctx, cancel)
  156. }()
  157. // Fast quit for probe done.
  158. select {
  159. case <-ctx.Done():
  160. case <-ffprobe.ProbeDoneCtx().Done():
  161. defer cancel()
  162. str, m := ffprobe.Result()
  163. if len(m.Streams) != 2 {
  164. r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
  165. }
  166. if ts := 90; m.Format.ProbeScore < ts {
  167. r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
  168. }
  169. if dv := m.Duration(); dv < duration {
  170. r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str)
  171. }
  172. }
  173. }
  174. func TestFast_RtmpPublish_RtmpPlay_ChunkSize128(t *testing.T) {
  175. // This case is run in parallel.
  176. t.Parallel()
  177. // Setup the max timeout for this case.
  178. ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
  179. defer cancel()
  180. // Check a set of errors.
  181. var r0, r1, r2, r3, r4, r5 error
  182. defer func(ctx context.Context) {
  183. if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
  184. t.Errorf("Fail for err %+v", err)
  185. } else {
  186. logger.Tf(ctx, "test done with err %+v", err)
  187. }
  188. }(ctx)
  189. var wg sync.WaitGroup
  190. defer wg.Wait()
  191. // Start SRS server and wait for it to be ready.
  192. svr := NewSRSServer(func(v *srsServer) {
  193. v.envs = []string{
  194. "SRS_CHUNK_SIZE=128",
  195. }
  196. })
  197. wg.Add(1)
  198. go func() {
  199. defer wg.Done()
  200. r0 = svr.Run(ctx, cancel)
  201. }()
  202. // Start FFmpeg to publish stream.
  203. streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
  204. streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
  205. ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
  206. v.args = []string{
  207. "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
  208. }
  209. })
  210. wg.Add(1)
  211. go func() {
  212. defer wg.Done()
  213. <-svr.ReadyCtx().Done()
  214. r1 = ffmpeg.Run(ctx, cancel)
  215. }()
  216. // Start FFprobe to detect and verify stream.
  217. duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
  218. ffprobe := NewFFprobe(func(v *ffprobeClient) {
  219. v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID))
  220. v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
  221. })
  222. wg.Add(1)
  223. go func() {
  224. defer wg.Done()
  225. <-svr.ReadyCtx().Done()
  226. r2 = ffprobe.Run(ctx, cancel)
  227. }()
  228. // Fast quit for probe done.
  229. select {
  230. case <-ctx.Done():
  231. case <-ffprobe.ProbeDoneCtx().Done():
  232. defer cancel()
  233. str, m := ffprobe.Result()
  234. if len(m.Streams) != 2 {
  235. r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
  236. }
  237. if ts := 90; m.Format.ProbeScore < ts {
  238. r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
  239. }
  240. if dv := m.Duration(); dv < duration {
  241. r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str)
  242. }
  243. }
  244. }
  245. func TestFast_RtmpPublish_RtmpPlay_EnableATC(t *testing.T) {
  246. // This case is run in parallel.
  247. t.Parallel()
  248. // Setup the max timeout for this case.
  249. ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
  250. defer cancel()
  251. // Check a set of errors.
  252. var r0, r1, r2, r3, r4, r5 error
  253. defer func(ctx context.Context) {
  254. if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
  255. t.Errorf("Fail for err %+v", err)
  256. } else {
  257. logger.Tf(ctx, "test done with err %+v", err)
  258. }
  259. }(ctx)
  260. var wg sync.WaitGroup
  261. defer wg.Wait()
  262. // Start SRS server and wait for it to be ready.
  263. svr := NewSRSServer(func(v *srsServer) {
  264. v.envs = []string{
  265. "SRS_VHOST_PLAY_ATC=on",
  266. }
  267. })
  268. wg.Add(1)
  269. go func() {
  270. defer wg.Done()
  271. r0 = svr.Run(ctx, cancel)
  272. }()
  273. // Start FFmpeg to publish stream.
  274. streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
  275. streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
  276. ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
  277. v.args = []string{
  278. "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
  279. }
  280. })
  281. wg.Add(1)
  282. go func() {
  283. defer wg.Done()
  284. <-svr.ReadyCtx().Done()
  285. r1 = ffmpeg.Run(ctx, cancel)
  286. }()
  287. // Start FFprobe to detect and verify stream.
  288. duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
  289. ffprobe := NewFFprobe(func(v *ffprobeClient) {
  290. v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID))
  291. v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
  292. })
  293. wg.Add(1)
  294. go func() {
  295. defer wg.Done()
  296. <-svr.ReadyCtx().Done()
  297. r2 = ffprobe.Run(ctx, cancel)
  298. }()
  299. // Fast quit for probe done.
  300. select {
  301. case <-ctx.Done():
  302. case <-ffprobe.ProbeDoneCtx().Done():
  303. defer cancel()
  304. str, m := ffprobe.Result()
  305. if len(m.Streams) != 2 {
  306. r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
  307. }
  308. if ts := 90; m.Format.ProbeScore < ts {
  309. r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
  310. }
  311. if dv := m.Duration(); dv < duration {
  312. r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str)
  313. }
  314. }
  315. }