2
0

server.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "os"
  11. "os/exec"
  12. "path"
  13. "path/filepath"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. type SrsCommonResponse struct {
  20. Code int `json:"code"`
  21. Data interface{} `json:"data"`
  22. }
  23. func SrsWriteErrorResponse(w http.ResponseWriter, err error) {
  24. w.WriteHeader(http.StatusInternalServerError)
  25. w.Write([]byte(err.Error()))
  26. }
  27. func SrsWriteDataResponse(w http.ResponseWriter, data interface{}) {
  28. j, err := json.Marshal(data)
  29. if err != nil {
  30. SrsWriteErrorResponse(w, fmt.Errorf("marshal %v, err %v", err))
  31. return
  32. }
  33. w.Header().Set("Content-Type", "application/json")
  34. w.Write(j)
  35. }
  36. var StaticDir string
  37. var sw *SnapshotWorker
  38. // SrsCommonRequest is the common fields of request messages from SRS HTTP callback.
  39. type SrsCommonRequest struct {
  40. Action string `json:"action"`
  41. ClientId string `json:"client_id"`
  42. Ip string `json:"ip"`
  43. Vhost string `json:"vhost"`
  44. App string `json:"app"`
  45. }
  46. func (v *SrsCommonRequest) String() string {
  47. return fmt.Sprintf("action=%v, client_id=%v, ip=%v, vhost=%v", v.Action, v.ClientId, v.Ip, v.Vhost)
  48. }
  49. /*
  50. handle the clients requests: connect/disconnect vhost/app.
  51. for SRS hook: on_connect/on_close
  52. on_connect:
  53. when client connect to vhost/app, call the hook,
  54. the request in the POST data string is a object encode by json:
  55. {
  56. "action": "on_connect",
  57. "client_id": "9308h583",
  58. "ip": "192.168.1.10",
  59. "vhost": "video.test.com",
  60. "app": "live",
  61. "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
  62. "pageUrl": "http://www.test.com/live.html"
  63. }
  64. on_close:
  65. when client close/disconnect to vhost/app/stream, call the hook,
  66. the request in the POST data string is a object encode by json:
  67. {
  68. "action": "on_close",
  69. "client_id": "9308h583",
  70. "ip": "192.168.1.10",
  71. "vhost": "video.test.com",
  72. "app": "live",
  73. "send_bytes": 10240,
  74. "recv_bytes": 10240
  75. }
  76. if valid, the hook must return HTTP code 200(Stauts OK) and response
  77. an int value specifies the error code(0 corresponding to success):
  78. 0
  79. */
  80. type SrsClientRequest struct {
  81. SrsCommonRequest
  82. // For on_connect message
  83. TcUrl string `json:"tcUrl"`
  84. PageUrl string `json:"pageUrl"`
  85. // For on_close message
  86. SendBytes int64 `json:"send_bytes"`
  87. RecvBytes int64 `json:"recv_bytes"`
  88. }
  89. func (v *SrsClientRequest) IsOnConnect() bool {
  90. return v.Action == "on_connect"
  91. }
  92. func (v *SrsClientRequest) IsOnClose() bool {
  93. return v.Action == "on_close"
  94. }
  95. func (v *SrsClientRequest) String() string {
  96. var sb strings.Builder
  97. sb.WriteString(v.SrsCommonRequest.String())
  98. if v.IsOnConnect() {
  99. sb.WriteString(fmt.Sprintf(", tcUrl=%v, pageUrl=%v", v.TcUrl, v.PageUrl))
  100. } else if v.IsOnClose() {
  101. sb.WriteString(fmt.Sprintf(", send_bytes=%v, recv_bytes=%v", v.SendBytes, v.RecvBytes))
  102. }
  103. return sb.String()
  104. }
  105. /*
  106. for SRS hook: on_publish/on_unpublish
  107. on_publish:
  108. when client(encoder) publish to vhost/app/stream, call the hook,
  109. the request in the POST data string is a object encode by json:
  110. {
  111. "action": "on_publish",
  112. "client_id": "9308h583",
  113. "ip": "192.168.1.10",
  114. "vhost": "video.test.com",
  115. "app": "live",
  116. "stream": "livestream",
  117. "param":"?token=xxx&salt=yyy"
  118. }
  119. on_unpublish:
  120. when client(encoder) stop publish to vhost/app/stream, call the hook,
  121. the request in the POST data string is a object encode by json:
  122. {
  123. "action": "on_unpublish",
  124. "client_id": "9308h583",
  125. "ip": "192.168.1.10",
  126. "vhost": "video.test.com",
  127. "app": "live",
  128. "stream": "livestream",
  129. "param":"?token=xxx&salt=yyy"
  130. }
  131. if valid, the hook must return HTTP code 200(Stauts OK) and response
  132. an int value specifies the error code(0 corresponding to success):
  133. 0
  134. */
  135. type SrsStreamRequest struct {
  136. SrsCommonRequest
  137. Stream string `json:"stream"`
  138. Param string `json:"param"`
  139. }
  140. func (v *SrsStreamRequest) String() string {
  141. var sb strings.Builder
  142. sb.WriteString(v.SrsCommonRequest.String())
  143. if v.IsOnPublish() || v.IsOnUnPublish() {
  144. sb.WriteString(fmt.Sprintf(", stream=%v, param=%v", v.Stream, v.Param))
  145. }
  146. return sb.String()
  147. }
  148. func (v *SrsStreamRequest) IsOnPublish() bool {
  149. return v.Action == "on_publish"
  150. }
  151. func (v *SrsStreamRequest) IsOnUnPublish() bool {
  152. return v.Action == "on_unpublish"
  153. }
  154. /*
  155. for SRS hook: on_play/on_stop
  156. on_play:
  157. when client(encoder) publish to vhost/app/stream, call the hook,
  158. the request in the POST data string is a object encode by json:
  159. {
  160. "action": "on_play",
  161. "client_id": "9308h583",
  162. "ip": "192.168.1.10",
  163. "vhost": "video.test.com",
  164. "app": "live",
  165. "stream": "livestream",
  166. "param":"?token=xxx&salt=yyy",
  167. "pageUrl": "http://www.test.com/live.html"
  168. }
  169. on_stop:
  170. when client(encoder) stop publish to vhost/app/stream, call the hook,
  171. the request in the POST data string is a object encode by json:
  172. {
  173. "action": "on_stop",
  174. "client_id": "9308h583",
  175. "ip": "192.168.1.10",
  176. "vhost": "video.test.com",
  177. "app": "live",
  178. "stream": "livestream",
  179. "param":"?token=xxx&salt=yyy"
  180. }
  181. if valid, the hook must return HTTP code 200(Stauts OK) and response
  182. an int value specifies the error code(0 corresponding to success):
  183. 0
  184. */
  185. type SrsSessionRequest struct {
  186. SrsCommonRequest
  187. Stream string `json:"stream"`
  188. Param string `json:"param"`
  189. // For on_play only.
  190. PageUrl string `json:"pageUrl"`
  191. }
  192. func (v *SrsSessionRequest) String() string {
  193. var sb strings.Builder
  194. sb.WriteString(v.SrsCommonRequest.String())
  195. if v.IsOnPlay() || v.IsOnStop() {
  196. sb.WriteString(fmt.Sprintf(", stream=%v, param=%v", v.Stream, v.Param))
  197. }
  198. if v.IsOnPlay() {
  199. sb.WriteString(fmt.Sprintf(", pageUrl=%v", v.PageUrl))
  200. }
  201. return sb.String()
  202. }
  203. func (v *SrsSessionRequest) IsOnPlay() bool {
  204. return v.Action == "on_play"
  205. }
  206. func (v *SrsSessionRequest) IsOnStop() bool {
  207. return v.Action == "on_stop"
  208. }
  209. /*
  210. for SRS hook: on_dvr
  211. on_dvr:
  212. when srs reap a dvr file, call the hook,
  213. the request in the POST data string is a object encode by json:
  214. {
  215. "action": "on_dvr",
  216. "client_id": "9308h583",
  217. "ip": "192.168.1.10",
  218. "vhost": "video.test.com",
  219. "app": "live",
  220. "stream": "livestream",
  221. "param":"?token=xxx&salt=yyy",
  222. "cwd": "/usr/local/srs",
  223. "file": "./objs/nginx/html/live/livestream.1420254068776.flv"
  224. }
  225. if valid, the hook must return HTTP code 200(Stauts OK) and response
  226. an int value specifies the error code(0 corresponding to success):
  227. 0
  228. */
  229. type SrsDvrRequest struct {
  230. SrsCommonRequest
  231. Stream string `json:"stream"`
  232. Param string `json:"param"`
  233. Cwd string `json:"cwd"`
  234. File string `json:"file"`
  235. }
  236. func (v *SrsDvrRequest) String() string {
  237. var sb strings.Builder
  238. sb.WriteString(v.SrsCommonRequest.String())
  239. if v.IsOnDvr() {
  240. sb.WriteString(fmt.Sprintf(", stream=%v, param=%v, cwd=%v, file=%v", v.Stream, v.Param, v.Cwd, v.File))
  241. }
  242. return sb.String()
  243. }
  244. func (v *SrsDvrRequest) IsOnDvr() bool {
  245. return v.Action == "on_dvr"
  246. }
  247. /*
  248. for SRS hook: on_hls_notify
  249. on_hls_notify:
  250. when srs reap a ts file of hls, call this hook,
  251. used to push file to cdn network, by get the ts file from cdn network.
  252. so we use HTTP GET and use the variable following:
  253. [app], replace with the app.
  254. [stream], replace with the stream.
  255. [param], replace with the param.
  256. [ts_url], replace with the ts url.
  257. ignore any return data of server.
  258. for SRS hook: on_hls
  259. on_hls:
  260. when srs reap a dvr file, call the hook,
  261. the request in the POST data string is a object encode by json:
  262. {
  263. "action": "on_hls",
  264. "client_id": "9308h583",
  265. "ip": "192.168.1.10",
  266. "vhost": "video.test.com",
  267. "app": "live",
  268. "stream": "livestream",
  269. "param":"?token=xxx&salt=yyy",
  270. "duration": 9.68, // in seconds
  271. "cwd": "/usr/local/srs",
  272. "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts",
  273. "seq_no": 100
  274. }
  275. if valid, the hook must return HTTP code 200(Stauts OK) and response
  276. an int value specifies the error code(0 corresponding to success):
  277. 0
  278. */
  279. type SrsHlsRequest struct {
  280. SrsCommonRequest
  281. Stream string `json:"stream"`
  282. Param string `json:"param"`
  283. Duration float64 `json:"duration"`
  284. Cwd string `json:"cwd"`
  285. File string `json:"file"`
  286. SeqNo int `json:"seq_no"`
  287. }
  288. func (v *SrsHlsRequest) String() string {
  289. var sb strings.Builder
  290. sb.WriteString(v.SrsCommonRequest.String())
  291. if v.IsOnHls() {
  292. sb.WriteString(fmt.Sprintf(", stream=%v, param=%v, cwd=%v, file=%v, duration=%v, seq_no=%v", v.Stream, v.Param, v.Cwd, v.File, v.Duration, v.SeqNo))
  293. }
  294. return sb.String()
  295. }
  296. func (v *SrsHlsRequest) IsOnHls() bool {
  297. return v.Action == "on_hls"
  298. }
  299. /*
  300. the snapshot api,
  301. to start a snapshot when encoder start publish stream,
  302. stop the snapshot worker when stream finished.
  303. {"action":"on_publish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
  304. {"action":"on_unpublish","client_id":108,"ip":"127.0.0.1","vhost":"__defaultVhost__","app":"live","stream":"livestream"}
  305. */
  306. type SrsSnapShotRequest struct {
  307. SrsCommonRequest
  308. Stream string `json:"stream"`
  309. }
  310. func (v *SrsSnapShotRequest) String() string {
  311. var sb strings.Builder
  312. sb.WriteString(v.SrsCommonRequest.String())
  313. if v.IsOnPublish() || v.IsOnUnPublish() {
  314. sb.WriteString(fmt.Sprintf(", stream=%v", v.Stream))
  315. }
  316. return sb.String()
  317. }
  318. func (v *SrsSnapShotRequest) IsOnPublish() bool {
  319. return v.Action == "on_publish"
  320. }
  321. func (v *SrsSnapShotRequest) IsOnUnPublish() bool {
  322. return v.Action == "on_unpublish"
  323. }
  324. type SnapshotJob struct {
  325. SrsSnapShotRequest
  326. updatedAt time.Time
  327. cancelCtx context.Context
  328. cancelFunc context.CancelFunc
  329. vframes int
  330. timeout time.Duration
  331. }
  332. func NewSnapshotJob() *SnapshotJob {
  333. v := &SnapshotJob{
  334. vframes: 5,
  335. timeout: time.Duration(30) * time.Second,
  336. }
  337. v.cancelCtx, v.cancelFunc = context.WithCancel(context.Background())
  338. return v
  339. }
  340. func (v *SnapshotJob) Tag() string {
  341. return fmt.Sprintf("%v/%v/%v", v.Vhost, v.App, v.Stream)
  342. }
  343. func (v *SnapshotJob) Abort() {
  344. v.cancelFunc()
  345. log.Println(fmt.Sprintf("cancel snapshot job %v", v.Tag()))
  346. }
  347. /*
  348. ./objs/ffmpeg/bin/ffmpeg -i rtmp://127.0.0.1/live/livestream \
  349. -vf fps=1 -vcodec png -f image2 -an -vframes 5 \
  350. -y static-dir/live/livestream-%03d.png
  351. */
  352. func (v *SnapshotJob) do(ffmpegPath, inputUrl string) (err error) {
  353. outputPicDir := path.Join(StaticDir, v.App)
  354. if err = os.MkdirAll(outputPicDir, 0777); err != nil {
  355. log.Println(fmt.Sprintf("create snapshot image dir:%v failed, err is %v", outputPicDir, err))
  356. return
  357. }
  358. normalPicPath := path.Join(outputPicDir, fmt.Sprintf("%v", v.Stream)+"-%03d.png")
  359. bestPng := path.Join(outputPicDir, fmt.Sprintf("%v-best.png", v.Stream))
  360. params := []string{
  361. "-i", inputUrl,
  362. "-vf", "fps=1",
  363. "-vcodec", "png",
  364. "-f", "image2",
  365. "-an",
  366. "-vframes", strconv.Itoa(v.vframes),
  367. "-y", normalPicPath,
  368. }
  369. log.Println(fmt.Sprintf("start snapshot, cmd param=%v %v", ffmpegPath, strings.Join(params, " ")))
  370. timeoutCtx, _ := context.WithTimeout(v.cancelCtx, v.timeout)
  371. cmd := exec.CommandContext(timeoutCtx, ffmpegPath, params...)
  372. if err = cmd.Run(); err != nil {
  373. log.Println(fmt.Sprintf("run snapshot %v cmd failed, err is %v", v.Tag(), err))
  374. return
  375. }
  376. bestFileSize := int64(0)
  377. for i := 1; i <= v.vframes; i++ {
  378. pic := path.Join(outputPicDir, fmt.Sprintf("%v-%03d.png", v.Stream, i))
  379. fi, err := os.Stat(pic)
  380. if err != nil {
  381. log.Println(fmt.Sprintf("stat pic:%v failed, err is %v", pic, err))
  382. continue
  383. }
  384. if bestFileSize == 0 {
  385. bestFileSize = fi.Size()
  386. } else if fi.Size() > bestFileSize {
  387. os.Remove(bestPng)
  388. os.Symlink(pic, bestPng)
  389. bestFileSize = fi.Size()
  390. }
  391. }
  392. log.Println(fmt.Sprintf("%v the best thumbnail is %v", v.Tag(), bestPng))
  393. return
  394. }
  395. func (v *SnapshotJob) Serve(ffmpegPath, inputUrl string) {
  396. sleep := time.Duration(1) * time.Second
  397. for {
  398. v.do(ffmpegPath, inputUrl)
  399. select {
  400. case <-time.After(sleep):
  401. log.Println(fmt.Sprintf("%v sleep %v to redo snapshot", v.Tag(), sleep))
  402. break
  403. case <-v.cancelCtx.Done():
  404. log.Println(fmt.Sprintf("snapshot job %v cancelled", v.Tag()))
  405. return
  406. }
  407. }
  408. }
  409. type SnapshotWorker struct {
  410. snapshots *sync.Map // key is stream url
  411. ffmpegPath string
  412. }
  413. func NewSnapshotWorker(ffmpegPath string) *SnapshotWorker {
  414. sw := &SnapshotWorker{
  415. snapshots: new(sync.Map),
  416. ffmpegPath: ffmpegPath,
  417. }
  418. return sw
  419. }
  420. func (v *SnapshotWorker) Create(sm *SrsSnapShotRequest) {
  421. streamUrl := fmt.Sprintf("rtmp://127.0.0.1/%v/%v?vhost=%v", sm.App, sm.Stream, sm.Vhost)
  422. if _, ok := v.snapshots.Load(streamUrl); ok {
  423. return
  424. }
  425. sj := NewSnapshotJob()
  426. sj.SrsSnapShotRequest = *sm
  427. sj.updatedAt = time.Now()
  428. go sj.Serve(v.ffmpegPath, streamUrl)
  429. v.snapshots.Store(streamUrl, sj)
  430. }
  431. func (v *SnapshotWorker) Destroy(sm *SrsSnapShotRequest) {
  432. streamUrl := fmt.Sprintf("rtmp://127.0.0.1/%v/%v?vhost=%v", sm.App, sm.Stream, sm.Vhost)
  433. value, ok := v.snapshots.Load(streamUrl)
  434. if ok {
  435. sj := value.(*SnapshotJob)
  436. sj.Abort()
  437. v.snapshots.Delete(streamUrl)
  438. log.Println(fmt.Sprintf("set stream:%v to destroy, update abort", sm.Stream))
  439. } else {
  440. log.Println(fmt.Sprintf("cannot find stream:%v in snapshot worker", streamUrl))
  441. }
  442. return
  443. }
  444. /*
  445. handle the forward requests: dynamic forward url.
  446. for SRS hook: on_forward
  447. on_forward:
  448. when srs reap a dvr file, call the hook,
  449. the request in the POST data string is a object encode by json:
  450. {
  451. "action": "on_forward",
  452. "server_id": "server_test",
  453. "client_id": 1985,
  454. "ip": "192.168.1.10",
  455. "vhost": "video.test.com",
  456. "app": "live",
  457. "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
  458. "stream": "livestream",
  459. "param":"?token=xxx&salt=yyy"
  460. }
  461. if valid, the hook must return HTTP code 200(Stauts OK) and response
  462. an int value specifies the error code(0 corresponding to success):
  463. 0
  464. */
  465. type SrsForwardRequest struct {
  466. SrsCommonRequest
  467. TcUrl string `json:"tc_url"`
  468. Stream string `json:"stream"`
  469. Param string `json:"param"`
  470. }
  471. func (v *SrsForwardRequest) String() string {
  472. var sb strings.Builder
  473. sb.WriteString(v.SrsCommonRequest.String())
  474. if v.IsOnForward() {
  475. sb.WriteString(fmt.Sprintf(", tcUrl=%v, stream=%v, param=%v", v.TcUrl, v.Stream, v.Param))
  476. }
  477. return sb.String()
  478. }
  479. func (v *SrsForwardRequest) IsOnForward() bool {
  480. return v.Action == "on_forward"
  481. }
  482. func main() {
  483. srsBin := os.Args[0]
  484. if strings.HasPrefix(srsBin, "/var") {
  485. srsBin = "go run ."
  486. }
  487. var port int
  488. var ffmpegPath string
  489. flag.IntVar(&port, "p", 8085, "HTTP listen port. Default is 8085")
  490. flag.StringVar(&StaticDir, "s", "./static-dir", "HTML home for snapshot. Default is ./static-dir")
  491. flag.StringVar(&ffmpegPath, "ffmpeg", "/usr/local/bin/ffmpeg", "FFmpeg for snapshot. Default is /usr/local/bin/ffmpeg")
  492. flag.Usage = func() {
  493. fmt.Println("A demo api-server for SRS\n")
  494. fmt.Println(fmt.Sprintf("Usage: %v [flags]", srsBin))
  495. flag.PrintDefaults()
  496. fmt.Println(fmt.Sprintf("For example:"))
  497. fmt.Println(fmt.Sprintf(" %v -p 8085", srsBin))
  498. fmt.Println(fmt.Sprintf(" %v 8085", srsBin))
  499. }
  500. flag.Parse()
  501. log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
  502. // check if only one number arg
  503. if len(os.Args[1:]) == 1 {
  504. portArg := os.Args[1]
  505. var err error
  506. if port, err = strconv.Atoi(portArg); err != nil {
  507. log.Println(fmt.Sprintf("parse port arg:%v to int failed, err %v", portArg, err))
  508. flag.Usage()
  509. os.Exit(1)
  510. }
  511. }
  512. sw = NewSnapshotWorker(ffmpegPath)
  513. StaticDir, err := filepath.Abs(StaticDir)
  514. if err != nil {
  515. panic(err)
  516. }
  517. log.Println(fmt.Sprintf("api server listen at port:%v, static_dir:%v", port, StaticDir))
  518. http.Handle("/", http.FileServer(http.Dir(StaticDir)))
  519. http.HandleFunc("/api/v1", func(writer http.ResponseWriter, request *http.Request) {
  520. res := &struct {
  521. Code int `json:"code"`
  522. Urls struct {
  523. Clients string `json:"clients"`
  524. Streams string `json:"streams"`
  525. Sessions string `json:"sessions"`
  526. Dvrs string `json:"dvrs"`
  527. Chats string `json:"chats"`
  528. Servers struct {
  529. Summary string `json:"summary"`
  530. Get string `json:"GET"`
  531. Post string `json:"POST ip=node_ip&device_id=device_id"`
  532. }
  533. } `json:"urls"`
  534. }{
  535. Code: 0,
  536. }
  537. res.Urls.Clients = "for srs http callback, to handle the clients requests: connect/disconnect vhost/app."
  538. res.Urls.Streams = "for srs http callback, to handle the streams requests: publish/unpublish stream."
  539. res.Urls.Sessions = "for srs http callback, to handle the sessions requests: client play/stop stream."
  540. res.Urls.Dvrs = "for srs http callback, to handle the dvr requests: dvr stream."
  541. //res.Urls.Chats = "for srs demo meeting, the chat streams, public chat room."
  542. res.Urls.Servers.Summary = "for srs raspberry-pi and meeting demo."
  543. res.Urls.Servers.Get = "get the current raspberry-pi servers info."
  544. res.Urls.Servers.Post = "the new raspberry-pi server info."
  545. // TODO: no snapshots
  546. body, _ := json.Marshal(res)
  547. writer.Write(body)
  548. })
  549. // handle the clients requests: connect/disconnect vhost/app.
  550. http.HandleFunc("/api/v1/clients", func(w http.ResponseWriter, r *http.Request) {
  551. if r.Method != "POST" {
  552. SrsWriteDataResponse(w, struct{}{})
  553. return
  554. }
  555. if err := func() error {
  556. body, err := ioutil.ReadAll(r.Body)
  557. if err != nil {
  558. return fmt.Errorf("read request body, err %v", err)
  559. }
  560. log.Println(fmt.Sprintf("post to clients, req=%v", string(body)))
  561. msg := &SrsClientRequest{}
  562. if err := json.Unmarshal(body, msg); err != nil {
  563. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  564. }
  565. log.Println(fmt.Sprintf("Got %v", msg.String()))
  566. if !msg.IsOnConnect() && !msg.IsOnClose() {
  567. return fmt.Errorf("invalid message %v", msg.String())
  568. }
  569. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
  570. return nil
  571. }(); err != nil {
  572. SrsWriteErrorResponse(w, err)
  573. }
  574. })
  575. // handle the streams requests: publish/unpublish stream.
  576. http.HandleFunc("/api/v1/streams", func(w http.ResponseWriter, r *http.Request) {
  577. if r.Method != "POST" {
  578. SrsWriteDataResponse(w, struct{}{})
  579. return
  580. }
  581. if err := func() error {
  582. body, err := ioutil.ReadAll(r.Body)
  583. if err != nil {
  584. return fmt.Errorf("read request body, err %v", err)
  585. }
  586. log.Println(fmt.Sprintf("post to streams, req=%v", string(body)))
  587. msg := &SrsStreamRequest{}
  588. if err := json.Unmarshal(body, msg); err != nil {
  589. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  590. }
  591. log.Println(fmt.Sprintf("Got %v", msg.String()))
  592. if !msg.IsOnPublish() && !msg.IsOnUnPublish() {
  593. return fmt.Errorf("invalid message %v", msg.String())
  594. }
  595. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
  596. return nil
  597. }(); err != nil {
  598. SrsWriteErrorResponse(w, err)
  599. }
  600. })
  601. // handle the sessions requests: client play/stop stream
  602. http.HandleFunc("/api/v1/sessions", func(w http.ResponseWriter, r *http.Request) {
  603. if r.Method != "POST" {
  604. SrsWriteDataResponse(w, struct{}{})
  605. return
  606. }
  607. if err := func() error {
  608. body, err := ioutil.ReadAll(r.Body)
  609. if err != nil {
  610. return fmt.Errorf("read request body, err %v", err)
  611. }
  612. log.Println(fmt.Sprintf("post to sessions, req=%v", string(body)))
  613. msg := &SrsSessionRequest{}
  614. if err := json.Unmarshal(body, msg); err != nil {
  615. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  616. }
  617. log.Println(fmt.Sprintf("Got %v", msg.String()))
  618. if !msg.IsOnPlay() && !msg.IsOnStop() {
  619. return fmt.Errorf("invalid message %v", msg.String())
  620. }
  621. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
  622. return nil
  623. }(); err != nil {
  624. SrsWriteErrorResponse(w, err)
  625. }
  626. })
  627. // handle the dvrs requests: dvr stream.
  628. http.HandleFunc("/api/v1/dvrs", func(w http.ResponseWriter, r *http.Request) {
  629. if r.Method != "POST" {
  630. SrsWriteDataResponse(w, struct{}{})
  631. return
  632. }
  633. if err := func() error {
  634. body, err := ioutil.ReadAll(r.Body)
  635. if err != nil {
  636. return fmt.Errorf("read request body, err %v", err)
  637. }
  638. log.Println(fmt.Sprintf("post to dvrs, req=%v", string(body)))
  639. msg := &SrsDvrRequest{}
  640. if err := json.Unmarshal(body, msg); err != nil {
  641. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  642. }
  643. log.Println(fmt.Sprintf("Got %v", msg.String()))
  644. if !msg.IsOnDvr() {
  645. return fmt.Errorf("invalid message %v", msg.String())
  646. }
  647. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
  648. return nil
  649. }(); err != nil {
  650. SrsWriteErrorResponse(w, err)
  651. }
  652. })
  653. // handle the dvrs requests: on_hls stream.
  654. http.HandleFunc("/api/v1/hls", func(w http.ResponseWriter, r *http.Request) {
  655. if r.Method != "POST" {
  656. SrsWriteDataResponse(w, struct{}{})
  657. return
  658. }
  659. if err := func() error {
  660. body, err := ioutil.ReadAll(r.Body)
  661. if err != nil {
  662. return fmt.Errorf("read request body, err %v", err)
  663. }
  664. log.Println(fmt.Sprintf("post to hls, req=%v", string(body)))
  665. msg := &SrsHlsRequest{}
  666. if err := json.Unmarshal(body, msg); err != nil {
  667. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  668. }
  669. log.Println(fmt.Sprintf("Got %v", msg.String()))
  670. if !msg.IsOnHls() {
  671. return fmt.Errorf("invalid message %v", msg.String())
  672. }
  673. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
  674. return nil
  675. }(); err != nil {
  676. SrsWriteErrorResponse(w, err)
  677. }
  678. })
  679. // not support yet
  680. http.HandleFunc("/api/v1/chat", func(w http.ResponseWriter, r *http.Request) {
  681. SrsWriteErrorResponse(w, fmt.Errorf("not implemented"))
  682. })
  683. http.HandleFunc("/api/v1/snapshots", func(w http.ResponseWriter, r *http.Request) {
  684. if r.Method != "POST" {
  685. SrsWriteDataResponse(w, struct{}{})
  686. return
  687. }
  688. if err := func() error {
  689. body, err := ioutil.ReadAll(r.Body)
  690. if err != nil {
  691. return fmt.Errorf("read request body, err %v", err)
  692. }
  693. log.Println(fmt.Sprintf("post to snapshots, req=%v", string(body)))
  694. msg := &SrsSnapShotRequest{}
  695. if err := json.Unmarshal(body, msg); err != nil {
  696. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  697. }
  698. log.Println(fmt.Sprintf("Got %v", msg.String()))
  699. if msg.IsOnPublish() {
  700. sw.Create(msg)
  701. } else if msg.IsOnUnPublish() {
  702. sw.Destroy(msg)
  703. } else {
  704. return fmt.Errorf("invalid message %v", msg.String())
  705. }
  706. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
  707. return nil
  708. }(); err != nil {
  709. SrsWriteErrorResponse(w, err)
  710. }
  711. })
  712. // handle the dynamic forward requests: on_forward stream.
  713. http.HandleFunc("/api/v1/forward", func(w http.ResponseWriter, r *http.Request) {
  714. if r.Method != "POST" {
  715. SrsWriteDataResponse(w, struct{}{})
  716. return
  717. }
  718. if err := func() error {
  719. body, err := ioutil.ReadAll(r.Body)
  720. if err != nil {
  721. return fmt.Errorf("read request body, err %v", err)
  722. }
  723. log.Println(fmt.Sprintf("post to forward, req=%v", string(body)))
  724. msg := &SrsForwardRequest{}
  725. if err := json.Unmarshal(body, msg); err != nil {
  726. return fmt.Errorf("parse message from %v, err %v", string(body), err)
  727. }
  728. log.Println(fmt.Sprintf("Got %v", msg.String()))
  729. if !msg.IsOnForward() {
  730. return fmt.Errorf("invalid message %v", msg.String())
  731. }
  732. SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0, Data: &struct {
  733. Urls []string `json:"urls"`
  734. }{
  735. Urls: []string{"rtmp://127.0.0.1:19350/test/teststream"},
  736. }})
  737. return nil
  738. }(); err != nil {
  739. SrsWriteErrorResponse(w, err)
  740. }
  741. })
  742. addr := fmt.Sprintf(":%v", port)
  743. log.Println(fmt.Sprintf("start listen on:%v", addr))
  744. if err := http.ListenAndServe(addr, nil); err != nil {
  745. log.Println(fmt.Sprintf("listen on addr:%v failed, err is %v", addr, err))
  746. }
  747. }