2
0

util.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301
  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2023 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package blackbox
  22. import (
  23. "bytes"
  24. "context"
  25. "encoding/json"
  26. "flag"
  27. "fmt"
  28. "github.com/ossrs/go-oryx-lib/errors"
  29. ohttp "github.com/ossrs/go-oryx-lib/http"
  30. "github.com/ossrs/go-oryx-lib/logger"
  31. "io/ioutil"
  32. "math/rand"
  33. "net/http"
  34. "net/url"
  35. "os"
  36. "os/exec"
  37. "path"
  38. "strconv"
  39. "strings"
  40. "sync"
  41. "syscall"
  42. "time"
  43. )
  44. var srsLog *bool
  45. var srsStdout *bool
  46. var srsFFmpegStderr *bool
  47. var srsDVRStderr *bool
  48. var srsFFprobeStdout *bool
  49. var srsTimeout *int
  50. var srsFFprobeDuration *int
  51. var srsFFprobeTimeout *int
  52. var srsFFprobeHEVCTimeout *int
  53. var srsBinary *string
  54. var srsFFmpeg *string
  55. var srsFFprobe *string
  56. var srsPublishAvatar *string
  57. func prepareTest() (err error) {
  58. srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
  59. srsStdout = flag.Bool("srs-stdout", false, "Whether enable the SRS stdout log")
  60. srsFFmpegStderr = flag.Bool("srs-ffmpeg-stderr", false, "Whether enable the FFmpeg stderr log")
  61. srsDVRStderr = flag.Bool("srs-dvr-stderr", false, "Whether enable the DVR stderr log")
  62. srsFFprobeStdout = flag.Bool("srs-ffprobe-stdout", false, "Whether enable the FFprobe stdout log")
  63. srsTimeout = flag.Int("srs-timeout", 64000, "For each case, the timeout in ms")
  64. srsFFprobeDuration = flag.Int("srs-ffprobe-duration", 16000, "For each case, the duration for ffprobe in ms")
  65. srsFFprobeTimeout = flag.Int("srs-ffprobe-timeout", 21000, "For each case, the timeout for ffprobe in ms")
  66. srsBinary = flag.String("srs-binary", "../../objs/srs", "The binary to start SRS server")
  67. srsFFmpeg = flag.String("srs-ffmpeg", "ffmpeg", "The FFmpeg tool")
  68. srsFFprobe = flag.String("srs-ffprobe", "ffprobe", "The FFprobe tool")
  69. srsPublishAvatar = flag.String("srs-publish-avatar", "avatar.flv", "The avatar file for publisher.")
  70. srsFFprobeHEVCTimeout = flag.Int("srs-ffprobe-hevc-timeout", 30000, "For each case, the timeout for ffprobe in ms")
  71. // Parse user options.
  72. flag.Parse()
  73. // Try to locate file.
  74. tryOpenFile := func(filename string) (string, error) {
  75. // Match if file exists.
  76. if _, err := os.Stat(filename); err == nil {
  77. return filename, nil
  78. }
  79. // If we run in GoLand, the current directory is in blackbox, so we use parent directory.
  80. nFilename := path.Join("../", filename)
  81. if _, err := os.Stat(nFilename); err == nil {
  82. return nFilename, nil
  83. }
  84. // Try to find file by which if it's a command like ffmpeg.
  85. cmd := exec.Command("which", filename)
  86. cmd.Env = []string{"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}
  87. if v, err := cmd.Output(); err == nil {
  88. return strings.TrimSpace(string(v)), nil
  89. }
  90. return filename, errors.Errorf("file %v not found", filename)
  91. }
  92. // Check and relocate path of tools.
  93. if *srsBinary, err = tryOpenFile(*srsBinary); err != nil {
  94. return err
  95. }
  96. if *srsFFmpeg, err = tryOpenFile(*srsFFmpeg); err != nil {
  97. return err
  98. }
  99. if *srsFFprobe, err = tryOpenFile(*srsFFprobe); err != nil {
  100. return err
  101. }
  102. if *srsPublishAvatar, err = tryOpenFile(*srsPublishAvatar); err != nil {
  103. return err
  104. }
  105. return nil
  106. }
  107. // Filter the test error, ignore context.Canceled
  108. func filterTestError(errs ...error) error {
  109. var filteredErrors []error
  110. for _, err := range errs {
  111. if err == nil || errors.Cause(err) == context.Canceled {
  112. continue
  113. }
  114. // If url error, server maybe error, do not print the detail log.
  115. if r0 := errors.Cause(err); r0 != nil {
  116. if r1, ok := r0.(*url.Error); ok {
  117. err = r1
  118. }
  119. }
  120. filteredErrors = append(filteredErrors, err)
  121. }
  122. if len(filteredErrors) == 0 {
  123. return nil
  124. }
  125. if len(filteredErrors) == 1 {
  126. return filteredErrors[0]
  127. }
  128. var descs []string
  129. for i, err := range filteredErrors[1:] {
  130. descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
  131. }
  132. return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
  133. }
  134. // The SRSPortAllocator is SRS port manager.
  135. type SRSPortAllocator struct {
  136. ports sync.Map
  137. }
  138. func NewSRSPortAllocator() *SRSPortAllocator {
  139. return &SRSPortAllocator{}
  140. }
  141. func (v *SRSPortAllocator) Allocate() int {
  142. for i := 0; i < 1024; i++ {
  143. port := 10000 + rand.Int()%50000
  144. if _, ok := v.ports.LoadOrStore(port, true); !ok {
  145. return port
  146. }
  147. time.Sleep(time.Duration(rand.Int()%1000) * time.Microsecond)
  148. }
  149. panic("Allocate port failed")
  150. }
  151. func (v *SRSPortAllocator) Free(port int) {
  152. v.ports.Delete(port)
  153. }
  154. var allocator *SRSPortAllocator
  155. func init() {
  156. allocator = NewSRSPortAllocator()
  157. }
  158. type backendService struct {
  159. // The context for case.
  160. caseCtx context.Context
  161. caseCtxCancel context.CancelFunc
  162. // When SRS process started.
  163. readyCtx context.Context
  164. readyCtxCancel context.CancelFunc
  165. // Whether already closed.
  166. closedCtx context.Context
  167. closedCtxCancel context.CancelFunc
  168. // All goroutines
  169. wg sync.WaitGroup
  170. // The name, args and env for cmd.
  171. name string
  172. args []string
  173. env []string
  174. // If timeout, kill the process.
  175. duration time.Duration
  176. // The process stdout and stderr.
  177. stdout bytes.Buffer
  178. stderr bytes.Buffer
  179. // The process error.
  180. r0 error
  181. // The process pid.
  182. pid int
  183. // Whether ignore process exit status error.
  184. ignoreExitStatusError bool
  185. // Hooks for owner.
  186. // Before start the process.
  187. onBeforeStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
  188. // After started the process.
  189. onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
  190. // Before kill the process, when case is done.
  191. onBeforeKill func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
  192. // After stopped the process. Always callback when run is called.
  193. onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error
  194. // When dispose the process. Always callback when run is called.
  195. onDispose func(ctx context.Context, bs *backendService) error
  196. }
  197. func newBackendService(opts ...func(v *backendService)) *backendService {
  198. v := &backendService{}
  199. v.readyCtx, v.readyCtxCancel = context.WithCancel(context.Background())
  200. v.closedCtx, v.closedCtxCancel = context.WithCancel(context.Background())
  201. for _, opt := range opts {
  202. opt(v)
  203. }
  204. return v
  205. }
  206. func (v *backendService) Close() error {
  207. if v.closedCtx.Err() != nil {
  208. return v.r0
  209. }
  210. v.closedCtxCancel()
  211. if v.caseCtxCancel != nil {
  212. v.caseCtxCancel()
  213. }
  214. if v.readyCtxCancel != nil {
  215. v.readyCtxCancel()
  216. }
  217. v.wg.Wait()
  218. if v.onDispose != nil {
  219. v.onDispose(v.caseCtx, v)
  220. }
  221. logger.Tf(v.caseCtx, "Process is closed, pid=%v, r0=%v", v.pid, v.r0)
  222. return nil
  223. }
  224. func (v *backendService) ReadyCtx() context.Context {
  225. return v.readyCtx
  226. }
  227. func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) error {
  228. // Always dispose resource of process.
  229. defer v.Close()
  230. // Start SRS with -e, which only use environment variables.
  231. cmd := exec.Command(v.name, v.args...)
  232. // If not started, we also need to callback the onStop.
  233. var processStarted bool
  234. defer func() {
  235. if v.onStop != nil && !processStarted {
  236. v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr)
  237. }
  238. }()
  239. // Ignore if already error.
  240. if ctx.Err() != nil {
  241. return ctx.Err()
  242. }
  243. // Save the context of case.
  244. v.caseCtx, v.caseCtxCancel = ctx, cancel
  245. // Setup stdout and stderr.
  246. cmd.Stdout = &v.stdout
  247. cmd.Stderr = &v.stderr
  248. cmd.Env = v.env
  249. if v.onBeforeStart != nil {
  250. if err := v.onBeforeStart(ctx, v, cmd); err != nil {
  251. return errors.Wrapf(err, "onBeforeStart failed")
  252. }
  253. }
  254. // Try to start the SRS server.
  255. if err := cmd.Start(); err != nil {
  256. return err
  257. }
  258. // Now process started, query the pid.
  259. v.pid = cmd.Process.Pid
  260. v.readyCtxCancel()
  261. processStarted = true
  262. if v.onAfterStart != nil {
  263. if err := v.onAfterStart(ctx, v, cmd); err != nil {
  264. return errors.Wrapf(err, "onAfterStart failed")
  265. }
  266. }
  267. // The context for SRS process.
  268. processDone, processDoneCancel := context.WithCancel(context.Background())
  269. // If exceed timeout, kill the process.
  270. v.wg.Add(1)
  271. go func() {
  272. defer v.wg.Done()
  273. if v.duration <= 0 {
  274. return
  275. }
  276. select {
  277. case <-ctx.Done():
  278. case <-time.After(v.duration):
  279. logger.Tf(ctx, "Process killed duration=%v, pid=%v, name=%v, args=%v", v.duration, v.pid, v.name, v.args)
  280. cmd.Process.Kill()
  281. }
  282. }()
  283. // If SRS process terminated, notify case to stop.
  284. v.wg.Add(1)
  285. go func() {
  286. defer v.wg.Done()
  287. // When SRS quit, also terminate the case.
  288. defer cancel()
  289. // Notify other goroutine, SRS already done.
  290. defer processDoneCancel()
  291. if err := cmd.Wait(); err != nil && !v.ignoreExitStatusError {
  292. v.r0 = errors.Wrapf(err, "Process wait err, pid=%v, name=%v, args=%v", v.pid, v.name, v.args)
  293. }
  294. if v.onStop != nil {
  295. if err := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); err != nil {
  296. if v.r0 == nil {
  297. v.r0 = errors.Wrapf(err, "Process onStop err, pid=%v, name=%v, args=%v", v.pid, v.name, v.args)
  298. } else {
  299. logger.Ef(ctx, "Process onStop err %v", err)
  300. }
  301. }
  302. }
  303. }()
  304. // If case terminated, notify SRS process to stop.
  305. v.wg.Add(1)
  306. go func() {
  307. defer v.wg.Done()
  308. select {
  309. case <-ctx.Done():
  310. // Notify owner that we're going to kill the process.
  311. if v.onBeforeKill != nil {
  312. v.onBeforeKill(ctx, v, cmd)
  313. }
  314. // When case terminated, also terminate the SRS process.
  315. cmd.Process.Signal(syscall.SIGINT)
  316. case <-processDone.Done():
  317. // Ignore if already done.
  318. return
  319. }
  320. // Start a goroutine to ensure process killed.
  321. go func() {
  322. time.Sleep(3 * time.Second)
  323. if processDone.Err() == nil { // Ignore if already done.
  324. cmd.Process.Signal(syscall.SIGKILL)
  325. }
  326. }()
  327. }()
  328. // Wait for SRS or case done.
  329. select {
  330. case <-ctx.Done():
  331. case <-processDone.Done():
  332. }
  333. return v.r0
  334. }
  335. // ServiceRunner is an interface to run backend service.
  336. type ServiceRunner interface {
  337. Run(ctx context.Context, cancel context.CancelFunc) error
  338. }
  339. // ServiceReadyQuerier is an interface to detect whether service is ready.
  340. type ServiceReadyQuerier interface {
  341. ReadyCtx() context.Context
  342. }
  343. // SRSServer is the interface for SRS server.
  344. type SRSServer interface {
  345. ServiceRunner
  346. ServiceReadyQuerier
  347. // WorkDir is the current working directory for SRS.
  348. WorkDir() string
  349. // RTMPPort is the RTMP stream port.
  350. RTMPPort() int
  351. // HTTPPort is the HTTP stream port.
  352. HTTPPort() int
  353. // APIPort is the HTTP API port.
  354. APIPort() int
  355. // SRTPort is the SRT UDP port.
  356. SRTPort() int
  357. }
  358. // srsServer is a SRS server instance.
  359. type srsServer struct {
  360. // The backend service process.
  361. process *backendService
  362. // When SRS process started.
  363. readyCtx context.Context
  364. readyCtxCancel context.CancelFunc
  365. // SRS server ID.
  366. srsID string
  367. // SRS workdir.
  368. workDir string
  369. // SRS PID file, relative to the workdir.
  370. srsRelativePidFile string
  371. // SRS server ID cache file, relative to the workdir.
  372. srsRelativeIDFile string
  373. // SRS RTMP server listen port.
  374. rtmpListen int
  375. // HTTP API listen port.
  376. apiListen int
  377. // HTTP server listen port.
  378. httpListen int
  379. // SRT UDP server listen port.
  380. srtListen int
  381. // The envs from user.
  382. envs []string
  383. }
  384. func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
  385. rid := fmt.Sprintf("%v-%v", os.Getpid(), rand.Int())
  386. v := &srsServer{
  387. workDir: path.Join("objs", fmt.Sprintf("%v", rand.Int())),
  388. srsID: fmt.Sprintf("srs-id-%v", rid),
  389. process: newBackendService(),
  390. }
  391. v.readyCtx, v.readyCtxCancel = context.WithCancel(context.Background())
  392. // If we run in GoLand, the current directory is in blackbox, so we use parent directory.
  393. if _, err := os.Stat("objs"); err != nil {
  394. v.workDir = path.Join("..", "objs", fmt.Sprintf("%v", rand.Int()))
  395. }
  396. // Do allocate resource.
  397. v.srsRelativePidFile = path.Join("objs", fmt.Sprintf("srs-%v.pid", rid))
  398. v.srsRelativeIDFile = path.Join("objs", fmt.Sprintf("srs-%v.id", rid))
  399. v.rtmpListen = allocator.Allocate()
  400. v.apiListen = allocator.Allocate()
  401. v.httpListen = allocator.Allocate()
  402. v.srtListen = allocator.Allocate()
  403. // Do cleanup.
  404. v.process.onDispose = func(ctx context.Context, bs *backendService) error {
  405. allocator.Free(v.rtmpListen)
  406. allocator.Free(v.apiListen)
  407. allocator.Free(v.httpListen)
  408. allocator.Free(v.srtListen)
  409. if _, err := os.Stat(v.workDir); err == nil {
  410. os.RemoveAll(v.workDir)
  411. }
  412. logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, cleanup=%v r0=%v",
  413. v.srsID, bs.pid, v.workDir, bs.r0)
  414. return nil
  415. }
  416. for _, opt := range opts {
  417. opt(v)
  418. }
  419. return v
  420. }
  421. func (v *srsServer) ReadyCtx() context.Context {
  422. return v.readyCtx
  423. }
  424. func (v *srsServer) RTMPPort() int {
  425. return v.rtmpListen
  426. }
  427. func (v *srsServer) HTTPPort() int {
  428. return v.httpListen
  429. }
  430. func (v *srsServer) APIPort() int {
  431. return v.apiListen
  432. }
  433. func (v *srsServer) SRTPort() int {
  434. return v.srtListen
  435. }
  436. func (v *srsServer) WorkDir() string {
  437. return v.workDir
  438. }
  439. func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
  440. logger.Tf(ctx, "Starting SRS server, dir=%v, binary=%v, id=%v, pid=%v, rtmp=%v",
  441. v.workDir, *srsBinary, v.srsID, v.srsRelativePidFile, v.rtmpListen,
  442. )
  443. // Create directories.
  444. if err := os.MkdirAll(path.Join(v.workDir, "./objs/nginx/html"), os.FileMode(0755)|os.ModeDir); err != nil {
  445. return errors.Wrapf(err, "SRS create directory %v", path.Join(v.workDir, "./objs/nginx/html"))
  446. }
  447. // Setup the name and args of process.
  448. v.process.name = *srsBinary
  449. v.process.args = []string{"-e"}
  450. // Setup the constant values.
  451. v.process.env = []string{
  452. // Run in frontend.
  453. "SRS_DAEMON=off",
  454. // Write logs to stdout and stderr.
  455. "SRS_SRS_LOG_FILE=console",
  456. // Disable warning for asan.
  457. "MallocNanoZone=0",
  458. // Avoid error for macOS, which ulimit to 256.
  459. "SRS_MAX_CONNECTIONS=100",
  460. }
  461. // For directories.
  462. v.process.env = append(v.process.env, []string{
  463. // SRS working directory.
  464. fmt.Sprintf("SRS_WORK_DIR=%v", v.workDir),
  465. // Setup the default directory for HTTP server.
  466. "SRS_HTTP_SERVER_DIR=./objs/nginx/html",
  467. // Setup the default directory for HLS stream.
  468. "SRS_VHOST_HLS_HLS_PATH=./objs/nginx/html",
  469. "SRS_VHOST_HLS_HLS_M3U8_FILE=[app]/[stream].m3u8",
  470. "SRS_VHOST_HLS_HLS_TS_FILE=[app]/[stream]-[seq].ts",
  471. }...)
  472. // For variables.
  473. v.process.env = append(v.process.env, []string{
  474. // SRS PID file.
  475. fmt.Sprintf("SRS_PID=%v", v.srsRelativePidFile),
  476. // SRS ID file.
  477. fmt.Sprintf("SRS_SERVER_ID=%v", v.srsID),
  478. // HTTP API to detect the service.
  479. fmt.Sprintf("SRS_HTTP_API_ENABLED=on"),
  480. fmt.Sprintf("SRS_HTTP_API_LISTEN=%v", v.apiListen),
  481. // Setup the RTMP listen port.
  482. fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
  483. // Setup the HTTP sever listen port.
  484. fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen),
  485. // Setup the SRT server listen port.
  486. fmt.Sprintf("SRS_SRT_SERVER_LISTEN=%v", v.srtListen),
  487. }...)
  488. // Rewrite envs by case.
  489. if v.envs != nil {
  490. v.process.env = append(v.process.env, v.envs...)
  491. }
  492. // Allow user to rewrite them.
  493. for _, env := range os.Environ() {
  494. if strings.HasPrefix(env, "SRS") || strings.HasPrefix(env, "PATH") {
  495. v.process.env = append(v.process.env, env)
  496. }
  497. }
  498. // Wait for all goroutine to done.
  499. var wg sync.WaitGroup
  500. defer wg.Wait()
  501. // Start a task to detect the HTTP API.
  502. wg.Add(1)
  503. go func() {
  504. defer wg.Done()
  505. for ctx.Err() == nil {
  506. time.Sleep(100 * time.Millisecond)
  507. r := fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen)
  508. res, err := http.Get(r)
  509. if err != nil {
  510. continue
  511. }
  512. defer res.Body.Close()
  513. b, err := ioutil.ReadAll(res.Body)
  514. if err != nil {
  515. continue
  516. }
  517. logger.Tf(ctx, "SRS API is ready, %v %v", r, string(b))
  518. v.readyCtxCancel()
  519. return
  520. }
  521. }()
  522. // Hooks for process.
  523. v.process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
  524. logger.Tf(ctx, "SRS id=%v, env %v %v %v",
  525. v.srsID, strings.Join(cmd.Env, " "), bs.name, strings.Join(bs.args, " "))
  526. return nil
  527. }
  528. v.process.onAfterStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
  529. logger.Tf(ctx, "SRS id=%v, pid=%v", v.srsID, bs.pid)
  530. return nil
  531. }
  532. v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
  533. // Should be ready when process stop.
  534. defer v.readyCtxCancel()
  535. logger.Tf(ctx, "SRS process pid=%v exit, r0=%v", bs.pid, r0)
  536. if *srsStdout == true {
  537. logger.Tf(ctx, "SRS process pid=%v, stdout is \n%v", bs.pid, stdout.String())
  538. }
  539. if stderr.Len() > 0 {
  540. logger.Tf(ctx, "SRS process pid=%v, stderr is \n%v", bs.pid, stderr.String())
  541. }
  542. return nil
  543. }
  544. // Run the process util quit.
  545. return v.process.Run(ctx, cancel)
  546. }
  547. type FFmpegClient interface {
  548. ServiceRunner
  549. ServiceReadyQuerier
  550. }
  551. type ffmpegClient struct {
  552. // The backend service process.
  553. process *backendService
  554. // FFmpeg cli args, without ffmpeg binary.
  555. args []string
  556. // Let the process quit, do not cancel the case.
  557. cancelCaseWhenQuit bool
  558. // When timeout, stop FFmpeg, sometimes the '-t' does not work.
  559. ffmpegDuration time.Duration
  560. }
  561. func NewFFmpeg(opts ...func(v *ffmpegClient)) FFmpegClient {
  562. v := &ffmpegClient{
  563. process: newBackendService(),
  564. cancelCaseWhenQuit: true,
  565. }
  566. // Do cleanup.
  567. v.process.onDispose = func(ctx context.Context, bs *backendService) error {
  568. return nil
  569. }
  570. // We ignore any exit error, because FFmpeg might exit with error even publish ok.
  571. v.process.ignoreExitStatusError = true
  572. for _, opt := range opts {
  573. opt(v)
  574. }
  575. return v
  576. }
  577. func (v *ffmpegClient) ReadyCtx() context.Context {
  578. return v.process.ReadyCtx()
  579. }
  580. func (v *ffmpegClient) Run(ctx context.Context, cancel context.CancelFunc) error {
  581. logger.Tf(ctx, "Starting FFmpeg by %v", strings.Join(v.args, " "))
  582. v.process.name = *srsFFmpeg
  583. v.process.args = v.args
  584. v.process.env = os.Environ()
  585. v.process.duration = v.ffmpegDuration
  586. v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
  587. logger.Tf(ctx, "FFmpeg process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
  588. if *srsFFmpegStderr && stderr.Len() > 0 {
  589. logger.Tf(ctx, "FFmpeg process pid=%v, stderr is \n%v", bs.pid, stderr.String())
  590. }
  591. return nil
  592. }
  593. // We might not want to cancel the case, for example, when check DVR by session, we just let the FFmpeg process to
  594. // quit and we should check the callback and DVR file.
  595. ffCtx, ffCancel := context.WithCancel(ctx)
  596. go func() {
  597. select {
  598. case <-ctx.Done():
  599. case <-ffCtx.Done():
  600. if v.cancelCaseWhenQuit {
  601. cancel()
  602. }
  603. }
  604. }()
  605. return v.process.Run(ffCtx, ffCancel)
  606. }
  607. type FFprobeClient interface {
  608. ServiceRunner
  609. // ProbeDoneCtx indicates the probe is done.
  610. ProbeDoneCtx() context.Context
  611. // Result return the raw string and metadata.
  612. Result() (string, *ffprobeObject)
  613. }
  614. type ffprobeClient struct {
  615. // The DVR file for ffprobe. Stream should be DVR to file, then use ffprobe to detect it. If DVR by FFmpeg, we will
  616. // start a FFmpeg process to do the DVR, or the DVR should be done by other tools.
  617. dvrFile string
  618. // The timeout to wait for task to done.
  619. timeout time.Duration
  620. // Whether do DVR by FFmpeg, if using SRS DVR, please set to false.
  621. dvrByFFmpeg bool
  622. // The stream to DVR for probing. Ignore if not DVR by ffmpeg
  623. streamURL string
  624. // The duration of video file for DVR and probing.
  625. duration time.Duration
  626. // When probe stream metadata object.
  627. doneCtx context.Context
  628. doneCancel context.CancelFunc
  629. // The metadata object.
  630. metadata *ffprobeObject
  631. // The raw string of ffprobe.
  632. rawString string
  633. }
  634. func NewFFprobe(opts ...func(v *ffprobeClient)) FFprobeClient {
  635. v := &ffprobeClient{
  636. metadata: &ffprobeObject{},
  637. dvrByFFmpeg: true,
  638. }
  639. v.doneCtx, v.doneCancel = context.WithCancel(context.Background())
  640. for _, opt := range opts {
  641. opt(v)
  642. }
  643. return v
  644. }
  645. func (v *ffprobeClient) ProbeDoneCtx() context.Context {
  646. return v.doneCtx
  647. }
  648. func (v *ffprobeClient) Result() (string, *ffprobeObject) {
  649. return v.rawString, v.metadata
  650. }
  651. func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFunc) error {
  652. if true {
  653. ctx, cancel := context.WithTimeout(ctxCase, v.timeout)
  654. defer cancel()
  655. logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v",
  656. v.streamURL, v.dvrFile, v.duration, v.timeout)
  657. // Try to start a DVR process.
  658. for ctx.Err() == nil {
  659. // If not DVR by FFmpeg, we just wait the DVR file to be ready, and it should be done by SRS or other tools.
  660. if v.dvrByFFmpeg {
  661. // If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
  662. // might need to wait for a duration of segment, 10s as such.
  663. _ = v.doDVR(ctx)
  664. }
  665. // Check whether DVR file is ok.
  666. if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 {
  667. logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size())
  668. break
  669. }
  670. // If not DVR by FFmpeg, must be by other tools, only need to wait.
  671. if !v.dvrByFFmpeg {
  672. logger.Tf(ctx, "Waiting stream=%v to be DVR", v.streamURL)
  673. }
  674. // Wait for a while and retry. Use larger timeout for HLS.
  675. retryTimeout := 1 * time.Second
  676. if strings.Contains(v.streamURL, ".m3u8") || v.dvrFile == "" {
  677. retryTimeout = 3 * time.Second
  678. }
  679. select {
  680. case <-ctx.Done():
  681. case <-time.After(retryTimeout):
  682. }
  683. }
  684. }
  685. // Ignore if case terminated.
  686. if ctxCase.Err() != nil {
  687. return nil
  688. }
  689. // Start a probe process for the DVR file.
  690. return v.doProbe(ctxCase, cancelCase)
  691. }
  692. func (v *ffprobeClient) doDVR(ctx context.Context) error {
  693. ctx, cancel := context.WithCancel(ctx)
  694. if !v.dvrByFFmpeg {
  695. return nil
  696. }
  697. process := newBackendService()
  698. process.name = *srsFFmpeg
  699. process.args = []string{
  700. "-t", fmt.Sprintf("%v", int64(v.duration/time.Second)),
  701. "-i", v.streamURL, "-c", "copy", "-y", v.dvrFile,
  702. }
  703. process.env = os.Environ()
  704. process.onDispose = func(ctx context.Context, bs *backendService) error {
  705. return nil
  706. }
  707. process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
  708. logger.Tf(ctx, "DVR start %v %v", bs.name, strings.Join(bs.args, " "))
  709. return nil
  710. }
  711. process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
  712. logger.Tf(ctx, "DVR process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
  713. if *srsDVRStderr && stderr.Len() > 0 {
  714. logger.Tf(ctx, "DVR process pid=%v, stderr is \n%v", bs.pid, stderr.String())
  715. }
  716. return nil
  717. }
  718. return process.Run(ctx, cancel)
  719. }
  720. func (v *ffprobeClient) doProbe(ctx context.Context, cancel context.CancelFunc) error {
  721. process := newBackendService()
  722. process.name = *srsFFprobe
  723. process.args = []string{
  724. "-show_error", "-show_private_data", "-v", "quiet", "-find_stream_info",
  725. "-analyzeduration", fmt.Sprintf("%v", int64(v.duration/time.Microsecond)),
  726. "-print_format", "json", "-show_format", "-show_streams", v.dvrFile,
  727. }
  728. process.env = os.Environ()
  729. process.onDispose = func(ctx context.Context, bs *backendService) error {
  730. if _, err := os.Stat(v.dvrFile); !os.IsNotExist(err) {
  731. os.Remove(v.dvrFile)
  732. }
  733. return nil
  734. }
  735. process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
  736. logger.Tf(ctx, "FFprobe start %v %v", bs.name, strings.Join(bs.args, " "))
  737. return nil
  738. }
  739. process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
  740. logger.Tf(ctx, "FFprobe process pid=%v exit, r0=%v, stderr=%v", bs.pid, r0, stderr.String())
  741. if *srsFFprobeStdout && stdout.Len() > 0 {
  742. logger.Tf(ctx, "FFprobe process pid=%v, stdout is \n%v", bs.pid, stdout.String())
  743. }
  744. str := stdout.String()
  745. v.rawString = str
  746. if err := json.Unmarshal([]byte(str), v.metadata); err != nil {
  747. return err
  748. }
  749. m := v.metadata
  750. logger.Tf(ctx, "FFprobe done pid=%v, %v", bs.pid, m.String())
  751. v.doneCancel()
  752. return nil
  753. }
  754. return process.Run(ctx, cancel)
  755. }
  756. /*
  757. "index": 0,
  758. "codec_name": "h264",
  759. "codec_long_name": "H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10",
  760. "profile": "High",
  761. "codec_type": "video",
  762. "codec_tag_string": "avc1",
  763. "codec_tag": "0x31637661",
  764. "width": 768,
  765. "height": 320,
  766. "coded_width": 768,
  767. "coded_height": 320,
  768. "closed_captions": 0,
  769. "film_grain": 0,
  770. "has_b_frames": 2,
  771. "sample_aspect_ratio": "1:1",
  772. "display_aspect_ratio": "12:5",
  773. "pix_fmt": "yuv420p",
  774. "level": 32,
  775. "chroma_location": "left",
  776. "field_order": "progressive",
  777. "refs": 1,
  778. "is_avc": "true",
  779. "nal_length_size": "4",
  780. "id": "0x1",
  781. "r_frame_rate": "25/1",
  782. "avg_frame_rate": "25/1",
  783. "time_base": "1/16000",
  784. "start_pts": 1280,
  785. "start_time": "0.080000",
  786. "duration_ts": 160000,
  787. "duration": "10.000000",
  788. "bit_rate": "196916",
  789. "bits_per_raw_sample": "8",
  790. "nb_frames": "250",
  791. "extradata_size": 41,
  792. "disposition": {
  793. "default": 1,
  794. "dub": 0,
  795. "original": 0,
  796. "comment": 0,
  797. "lyrics": 0,
  798. "karaoke": 0,
  799. "forced": 0,
  800. "hearing_impaired": 0,
  801. "visual_impaired": 0,
  802. "clean_effects": 0,
  803. "attached_pic": 0,
  804. "timed_thumbnails": 0,
  805. "captions": 0,
  806. "descriptions": 0,
  807. "metadata": 0,
  808. "dependent": 0,
  809. "still_image": 0
  810. },
  811. "tags": {
  812. "language": "und",
  813. "handler_name": "VideoHandler",
  814. "vendor_id": "[0][0][0][0]"
  815. }
  816. */
  817. /*
  818. "index": 1,
  819. "codec_name": "aac",
  820. "codec_long_name": "AAC (Advanced Audio Coding)",
  821. "profile": "LC",
  822. "codec_type": "audio",
  823. "codec_tag_string": "mp4a",
  824. "codec_tag": "0x6134706d",
  825. "sample_fmt": "fltp",
  826. "sample_rate": "44100",
  827. "channels": 2,
  828. "channel_layout": "stereo",
  829. "bits_per_sample": 0,
  830. "id": "0x2",
  831. "r_frame_rate": "0/0",
  832. "avg_frame_rate": "0/0",
  833. "time_base": "1/44100",
  834. "start_pts": 132,
  835. "start_time": "0.002993",
  836. "duration_ts": 441314,
  837. "duration": "10.007120",
  838. "bit_rate": "29827",
  839. "nb_frames": "431",
  840. "extradata_size": 2,
  841. "disposition": {
  842. "default": 1,
  843. "dub": 0,
  844. "original": 0,
  845. "comment": 0,
  846. "lyrics": 0,
  847. "karaoke": 0,
  848. "forced": 0,
  849. "hearing_impaired": 0,
  850. "visual_impaired": 0,
  851. "clean_effects": 0,
  852. "attached_pic": 0,
  853. "timed_thumbnails": 0,
  854. "captions": 0,
  855. "descriptions": 0,
  856. "metadata": 0,
  857. "dependent": 0,
  858. "still_image": 0
  859. },
  860. "tags": {
  861. "language": "und",
  862. "handler_name": "SoundHandler",
  863. "vendor_id": "[0][0][0][0]"
  864. }
  865. */
  866. type ffprobeObjectMedia struct {
  867. Index int `json:"index"`
  868. CodecName string `json:"codec_name"`
  869. CodecType string `json:"codec_type"`
  870. Timebase string `json:"time_base"`
  871. Bitrate string `json:"bit_rate"`
  872. Profile string `json:"profile"`
  873. Duration string `json:"duration"`
  874. CodecTagString string `json:"codec_tag_string"`
  875. // For video codec.
  876. Width int `json:"width"`
  877. Height int `json:"height"`
  878. CodedWidth int `json:"coded_width"`
  879. CodedHeight int `json:"coded_height"`
  880. RFramerate string `json:"r_frame_rate"`
  881. AvgFramerate string `json:"avg_frame_rate"`
  882. PixFmt string `json:"pix_fmt"`
  883. Level int `json:"level"`
  884. // For audio codec.
  885. Channels int `json:"channels"`
  886. ChannelLayout string `json:"channel_layout"`
  887. SampleFmt string `json:"sample_fmt"`
  888. SampleRate string `json:"sample_rate"`
  889. }
  890. func (v *ffprobeObjectMedia) String() string {
  891. sb := strings.Builder{}
  892. sb.WriteString(fmt.Sprintf("index=%v, codec=%v, type=%v, tb=%v, bitrate=%v, profile=%v, duration=%v",
  893. v.Index, v.CodecName, v.CodecType, v.Timebase, v.Bitrate, v.Profile, v.Duration))
  894. sb.WriteString(fmt.Sprintf(", codects=%v", v.CodecTagString))
  895. if v.CodecType == "video" {
  896. sb.WriteString(fmt.Sprintf(", size=%vx%v, csize=%vx%v, rfr=%v, afr=%v, pix=%v, level=%v",
  897. v.Width, v.Height, v.CodedWidth, v.CodedHeight, v.RFramerate, v.AvgFramerate, v.PixFmt, v.Level))
  898. } else if v.CodecType == "audio" {
  899. sb.WriteString(fmt.Sprintf(", channels=%v, layout=%v, fmt=%v, srate=%v",
  900. v.Channels, v.ChannelLayout, v.SampleFmt, v.SampleRate))
  901. }
  902. return sb.String()
  903. }
  904. /*
  905. "filename": "../objs/srs-ffprobe-stream-84487-8369019999559815097.mp4",
  906. "nb_streams": 2,
  907. "nb_programs": 0,
  908. "format_name": "mov,mp4,m4a,3gp,3g2,mj2",
  909. "format_long_name": "QuickTime / MOV",
  910. "start_time": "0.002993",
  911. "duration": "10.080000",
  912. "size": "292725",
  913. "bit_rate": "232321",
  914. "probe_score": 100,
  915. "tags": {
  916. "major_brand": "isom",
  917. "minor_version": "512",
  918. "compatible_brands": "isomiso2avc1mp41",
  919. "encoder": "Lavf59.27.100"
  920. }
  921. */
  922. type ffprobeObjectFormat struct {
  923. Filename string `json:"filename"`
  924. Duration string `json:"duration"`
  925. NBStream int16 `json:"nb_streams"`
  926. Size string `json:"size"`
  927. Bitrate string `json:"bit_rate"`
  928. ProbeScore int `json:"probe_score"`
  929. }
  930. func (v *ffprobeObjectFormat) String() string {
  931. return fmt.Sprintf("file=%v, duration=%v, score=%v, size=%v, bitrate=%v, streams=%v",
  932. v.Filename, v.Duration, v.ProbeScore, v.Size, v.Bitrate, v.NBStream)
  933. }
  934. /*
  935. {
  936. "streams": [{ffprobeObjectMedia}, {ffprobeObjectMedia}],
  937. "format": {ffprobeObjectFormat}
  938. }
  939. */
  940. type ffprobeObject struct {
  941. Format ffprobeObjectFormat `json:"format"`
  942. Streams []ffprobeObjectMedia `json:"streams"`
  943. }
  944. func (v *ffprobeObject) String() string {
  945. sb := strings.Builder{}
  946. sb.WriteString(v.Format.String())
  947. sb.WriteString(", [")
  948. for _, stream := range v.Streams {
  949. sb.WriteString("{")
  950. sb.WriteString(stream.String())
  951. sb.WriteString("}")
  952. }
  953. sb.WriteString("]")
  954. return sb.String()
  955. }
  956. func (v *ffprobeObject) Duration() time.Duration {
  957. dv, err := strconv.ParseFloat(v.Format.Duration, 10)
  958. if err != nil {
  959. return time.Duration(0)
  960. }
  961. return time.Duration(dv*1000) * time.Millisecond
  962. }
  963. func (v *ffprobeObject) Video() *ffprobeObjectMedia {
  964. for _, media := range v.Streams {
  965. if media.CodecType == "video" {
  966. return &media
  967. }
  968. }
  969. return nil
  970. }
  971. func (v *ffprobeObject) Audio() *ffprobeObjectMedia {
  972. for _, media := range v.Streams {
  973. if media.CodecType == "audio" {
  974. return &media
  975. }
  976. }
  977. return nil
  978. }
  979. type HooksEvent interface {
  980. HookAction() string
  981. }
  982. type HooksEventBase struct {
  983. Action string `json:"action"`
  984. }
  985. func (v *HooksEventBase) HookAction() string {
  986. return v.Action
  987. }
  988. type HooksEventOnDvr struct {
  989. HooksEventBase
  990. Stream string `json:"stream"`
  991. StreamUrl string `json:"stream_url"`
  992. StreamID string `json:"stream_id"`
  993. CWD string `json:"cwd"`
  994. File string `json:"file"`
  995. TcUrl string `json:"tcUrl"`
  996. App string `json:"app"`
  997. Vhost string `json:"vhost"`
  998. IP string `json:"ip"`
  999. ClientIP string `json:"client_id"`
  1000. ServerID string `json:"server_id"`
  1001. }
  1002. type HooksService interface {
  1003. ServiceRunner
  1004. ServiceReadyQuerier
  1005. HooksAPI() int
  1006. HooksEvents() <-chan HooksEvent
  1007. }
  1008. type hooksService struct {
  1009. readyCtx context.Context
  1010. readyCancel context.CancelFunc
  1011. httpPort int
  1012. dispose func()
  1013. r0 error
  1014. hooksOnDvr chan HooksEvent
  1015. }
  1016. func NewHooksService(opts ...func(v *hooksService)) HooksService {
  1017. v := &hooksService{}
  1018. v.httpPort = allocator.Allocate()
  1019. v.dispose = func() {
  1020. allocator.Free(v.httpPort)
  1021. close(v.hooksOnDvr)
  1022. }
  1023. v.hooksOnDvr = make(chan HooksEvent, 64)
  1024. v.readyCtx, v.readyCancel = context.WithCancel(context.Background())
  1025. for _, opt := range opts {
  1026. opt(v)
  1027. }
  1028. return v
  1029. }
  1030. func (v *hooksService) ReadyCtx() context.Context {
  1031. return v.readyCtx
  1032. }
  1033. func (v *hooksService) HooksAPI() int {
  1034. return v.httpPort
  1035. }
  1036. func (v *hooksService) HooksEvents() <-chan HooksEvent {
  1037. return v.hooksOnDvr
  1038. }
  1039. func (v *hooksService) Run(ctx context.Context, cancel context.CancelFunc) error {
  1040. defer func() {
  1041. v.readyCancel()
  1042. v.dispose()
  1043. }()
  1044. handler := http.ServeMux{}
  1045. handler.HandleFunc("/api/v1/ping", func(w http.ResponseWriter, r *http.Request) {
  1046. ohttp.WriteData(ctx, w, r, "pong")
  1047. })
  1048. handler.HandleFunc("/api/v1/dvrs", func(w http.ResponseWriter, r *http.Request) {
  1049. b, err := ioutil.ReadAll(r.Body)
  1050. if err != nil {
  1051. ohttp.WriteError(ctx, w, r, err)
  1052. return
  1053. }
  1054. evt := HooksEventOnDvr{}
  1055. if err := json.Unmarshal(b, &evt); err != nil {
  1056. ohttp.WriteError(ctx, w, r, err)
  1057. return
  1058. }
  1059. select {
  1060. case <-ctx.Done():
  1061. case v.hooksOnDvr <- &evt:
  1062. }
  1063. logger.Tf(ctx, "Callback: Got on_dvr request %v", string(b))
  1064. ohttp.WriteData(ctx, w, r, nil)
  1065. })
  1066. server := &http.Server{Addr: fmt.Sprintf(":%v", v.httpPort), Handler: &handler}
  1067. var wg sync.WaitGroup
  1068. defer wg.Wait()
  1069. wg.Add(1)
  1070. go func() {
  1071. defer wg.Done()
  1072. logger.Tf(ctx, "Callback: Start hooks server, listen=%v", v.httpPort)
  1073. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  1074. logger.Wf(ctx, "Callback: Service listen=%v, err %v", v.httpPort, err)
  1075. v.r0 = errors.Wrapf(err, "server listen=%v", v.httpPort)
  1076. cancel()
  1077. return
  1078. }
  1079. logger.Tf(ctx, "Callback: Hooks done, listen=%v", v.httpPort)
  1080. }()
  1081. wg.Add(1)
  1082. go func() {
  1083. defer wg.Done()
  1084. <-ctx.Done()
  1085. go server.Shutdown(context.Background())
  1086. }()
  1087. wg.Add(1)
  1088. go func() {
  1089. defer wg.Done()
  1090. for ctx.Err() == nil {
  1091. time.Sleep(100 * time.Millisecond)
  1092. r := fmt.Sprintf("http://localhost:%v/api/v1/ping", v.httpPort)
  1093. res, err := http.Get(r)
  1094. if err != nil {
  1095. continue
  1096. }
  1097. defer res.Body.Close()
  1098. b, err := ioutil.ReadAll(res.Body)
  1099. if err != nil {
  1100. continue
  1101. }
  1102. logger.Tf(ctx, "Callback: API is ready, %v %v", r, string(b))
  1103. v.readyCancel()
  1104. return
  1105. }
  1106. }()
  1107. wg.Wait()
  1108. return v.r0
  1109. }