123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843 |
- package main
- import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "os/exec"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- type SrsCommonResponse struct {
- Code int `json:"code"`
- Data interface{} `json:"data"`
- }
- func SrsWriteErrorResponse(w http.ResponseWriter, err error) {
- w.WriteHeader(http.StatusInternalServerError)
- w.Write([]byte(err.Error()))
- }
- func SrsWriteDataResponse(w http.ResponseWriter, data interface{}) {
- j, err := json.Marshal(data)
- if err != nil {
- SrsWriteErrorResponse(w, fmt.Errorf("marshal %v, err %v", err))
- return
- }
- w.Header().Set("Content-Type", "application/json")
- w.Write(j)
- }
- var StaticDir string
- var sw *SnapshotWorker
- type SrsCommonRequest struct {
- Action string `json:"action"`
- ClientId string `json:"client_id"`
- Ip string `json:"ip"`
- Vhost string `json:"vhost"`
- App string `json:"app"`
- }
- func (v *SrsCommonRequest) String() string {
- return fmt.Sprintf("action=%v, client_id=%v, ip=%v, vhost=%v", v.Action, v.ClientId, v.Ip, v.Vhost)
- }
- type SrsClientRequest struct {
- SrsCommonRequest
-
- TcUrl string `json:"tcUrl"`
- PageUrl string `json:"pageUrl"`
-
- SendBytes int64 `json:"send_bytes"`
- RecvBytes int64 `json:"recv_bytes"`
- }
- func (v *SrsClientRequest) IsOnConnect() bool {
- return v.Action == "on_connect"
- }
- func (v *SrsClientRequest) IsOnClose() bool {
- return v.Action == "on_close"
- }
- func (v *SrsClientRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnConnect() {
- sb.WriteString(fmt.Sprintf(", tcUrl=%v, pageUrl=%v", v.TcUrl, v.PageUrl))
- } else if v.IsOnClose() {
- sb.WriteString(fmt.Sprintf(", send_bytes=%v, recv_bytes=%v", v.SendBytes, v.RecvBytes))
- }
- return sb.String()
- }
- type SrsStreamRequest struct {
- SrsCommonRequest
- Stream string `json:"stream"`
- Param string `json:"param"`
- }
- func (v *SrsStreamRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnPublish() || v.IsOnUnPublish() {
- sb.WriteString(fmt.Sprintf(", stream=%v, param=%v", v.Stream, v.Param))
- }
- return sb.String()
- }
- func (v *SrsStreamRequest) IsOnPublish() bool {
- return v.Action == "on_publish"
- }
- func (v *SrsStreamRequest) IsOnUnPublish() bool {
- return v.Action == "on_unpublish"
- }
- type SrsSessionRequest struct {
- SrsCommonRequest
- Stream string `json:"stream"`
- Param string `json:"param"`
-
- PageUrl string `json:"pageUrl"`
- }
- func (v *SrsSessionRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnPlay() || v.IsOnStop() {
- sb.WriteString(fmt.Sprintf(", stream=%v, param=%v", v.Stream, v.Param))
- }
- if v.IsOnPlay() {
- sb.WriteString(fmt.Sprintf(", pageUrl=%v", v.PageUrl))
- }
- return sb.String()
- }
- func (v *SrsSessionRequest) IsOnPlay() bool {
- return v.Action == "on_play"
- }
- func (v *SrsSessionRequest) IsOnStop() bool {
- return v.Action == "on_stop"
- }
- type SrsDvrRequest struct {
- SrsCommonRequest
- Stream string `json:"stream"`
- Param string `json:"param"`
- Cwd string `json:"cwd"`
- File string `json:"file"`
- }
- func (v *SrsDvrRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnDvr() {
- sb.WriteString(fmt.Sprintf(", stream=%v, param=%v, cwd=%v, file=%v", v.Stream, v.Param, v.Cwd, v.File))
- }
- return sb.String()
- }
- func (v *SrsDvrRequest) IsOnDvr() bool {
- return v.Action == "on_dvr"
- }
- type SrsHlsRequest struct {
- SrsCommonRequest
- Stream string `json:"stream"`
- Param string `json:"param"`
- Duration float64 `json:"duration"`
- Cwd string `json:"cwd"`
- File string `json:"file"`
- SeqNo int `json:"seq_no"`
- }
- func (v *SrsHlsRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnHls() {
- sb.WriteString(fmt.Sprintf(", stream=%v, param=%v, cwd=%v, file=%v, duration=%v, seq_no=%v", v.Stream, v.Param, v.Cwd, v.File, v.Duration, v.SeqNo))
- }
- return sb.String()
- }
- func (v *SrsHlsRequest) IsOnHls() bool {
- return v.Action == "on_hls"
- }
- type SrsSnapShotRequest struct {
- SrsCommonRequest
- Stream string `json:"stream"`
- }
- func (v *SrsSnapShotRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnPublish() || v.IsOnUnPublish() {
- sb.WriteString(fmt.Sprintf(", stream=%v", v.Stream))
- }
- return sb.String()
- }
- func (v *SrsSnapShotRequest) IsOnPublish() bool {
- return v.Action == "on_publish"
- }
- func (v *SrsSnapShotRequest) IsOnUnPublish() bool {
- return v.Action == "on_unpublish"
- }
- type SnapshotJob struct {
- SrsSnapShotRequest
- updatedAt time.Time
- cancelCtx context.Context
- cancelFunc context.CancelFunc
- vframes int
- timeout time.Duration
- }
- func NewSnapshotJob() *SnapshotJob {
- v := &SnapshotJob{
- vframes: 5,
- timeout: time.Duration(30) * time.Second,
- }
- v.cancelCtx, v.cancelFunc = context.WithCancel(context.Background())
- return v
- }
- func (v *SnapshotJob) Tag() string {
- return fmt.Sprintf("%v/%v/%v", v.Vhost, v.App, v.Stream)
- }
- func (v *SnapshotJob) Abort() {
- v.cancelFunc()
- log.Println(fmt.Sprintf("cancel snapshot job %v", v.Tag()))
- }
- func (v *SnapshotJob) do(ffmpegPath, inputUrl string) (err error) {
- outputPicDir := path.Join(StaticDir, v.App)
- if err = os.MkdirAll(outputPicDir, 0777); err != nil {
- log.Println(fmt.Sprintf("create snapshot image dir:%v failed, err is %v", outputPicDir, err))
- return
- }
- normalPicPath := path.Join(outputPicDir, fmt.Sprintf("%v", v.Stream)+"-%03d.png")
- bestPng := path.Join(outputPicDir, fmt.Sprintf("%v-best.png", v.Stream))
- params := []string{
- "-i", inputUrl,
- "-vf", "fps=1",
- "-vcodec", "png",
- "-f", "image2",
- "-an",
- "-vframes", strconv.Itoa(v.vframes),
- "-y", normalPicPath,
- }
- log.Println(fmt.Sprintf("start snapshot, cmd param=%v %v", ffmpegPath, strings.Join(params, " ")))
- timeoutCtx, _ := context.WithTimeout(v.cancelCtx, v.timeout)
- cmd := exec.CommandContext(timeoutCtx, ffmpegPath, params...)
- if err = cmd.Run(); err != nil {
- log.Println(fmt.Sprintf("run snapshot %v cmd failed, err is %v", v.Tag(), err))
- return
- }
- bestFileSize := int64(0)
- for i := 1; i <= v.vframes; i++ {
- pic := path.Join(outputPicDir, fmt.Sprintf("%v-%03d.png", v.Stream, i))
- fi, err := os.Stat(pic)
- if err != nil {
- log.Println(fmt.Sprintf("stat pic:%v failed, err is %v", pic, err))
- continue
- }
- if bestFileSize == 0 {
- bestFileSize = fi.Size()
- } else if fi.Size() > bestFileSize {
- os.Remove(bestPng)
- os.Symlink(pic, bestPng)
- bestFileSize = fi.Size()
- }
- }
- log.Println(fmt.Sprintf("%v the best thumbnail is %v", v.Tag(), bestPng))
- return
- }
- func (v *SnapshotJob) Serve(ffmpegPath, inputUrl string) {
- sleep := time.Duration(1) * time.Second
- for {
- v.do(ffmpegPath, inputUrl)
- select {
- case <-time.After(sleep):
- log.Println(fmt.Sprintf("%v sleep %v to redo snapshot", v.Tag(), sleep))
- break
- case <-v.cancelCtx.Done():
- log.Println(fmt.Sprintf("snapshot job %v cancelled", v.Tag()))
- return
- }
- }
- }
- type SnapshotWorker struct {
- snapshots *sync.Map
- ffmpegPath string
- }
- func NewSnapshotWorker(ffmpegPath string) *SnapshotWorker {
- sw := &SnapshotWorker{
- snapshots: new(sync.Map),
- ffmpegPath: ffmpegPath,
- }
- return sw
- }
- func (v *SnapshotWorker) Create(sm *SrsSnapShotRequest) {
- streamUrl := fmt.Sprintf("rtmp://127.0.0.1/%v/%v?vhost=%v", sm.App, sm.Stream, sm.Vhost)
- if _, ok := v.snapshots.Load(streamUrl); ok {
- return
- }
- sj := NewSnapshotJob()
- sj.SrsSnapShotRequest = *sm
- sj.updatedAt = time.Now()
- go sj.Serve(v.ffmpegPath, streamUrl)
- v.snapshots.Store(streamUrl, sj)
- }
- func (v *SnapshotWorker) Destroy(sm *SrsSnapShotRequest) {
- streamUrl := fmt.Sprintf("rtmp://127.0.0.1/%v/%v?vhost=%v", sm.App, sm.Stream, sm.Vhost)
- value, ok := v.snapshots.Load(streamUrl)
- if ok {
- sj := value.(*SnapshotJob)
- sj.Abort()
- v.snapshots.Delete(streamUrl)
- log.Println(fmt.Sprintf("set stream:%v to destroy, update abort", sm.Stream))
- } else {
- log.Println(fmt.Sprintf("cannot find stream:%v in snapshot worker", streamUrl))
- }
- return
- }
- type SrsForwardRequest struct {
- SrsCommonRequest
- TcUrl string `json:"tc_url"`
- Stream string `json:"stream"`
- Param string `json:"param"`
- }
- func (v *SrsForwardRequest) String() string {
- var sb strings.Builder
- sb.WriteString(v.SrsCommonRequest.String())
- if v.IsOnForward() {
- sb.WriteString(fmt.Sprintf(", tcUrl=%v, stream=%v, param=%v", v.TcUrl, v.Stream, v.Param))
- }
- return sb.String()
- }
- func (v *SrsForwardRequest) IsOnForward() bool {
- return v.Action == "on_forward"
- }
- func main() {
- srsBin := os.Args[0]
- if strings.HasPrefix(srsBin, "/var") {
- srsBin = "go run ."
- }
- var port int
- var ffmpegPath string
- flag.IntVar(&port, "p", 8085, "HTTP listen port. Default is 8085")
- flag.StringVar(&StaticDir, "s", "./static-dir", "HTML home for snapshot. Default is ./static-dir")
- flag.StringVar(&ffmpegPath, "ffmpeg", "/usr/local/bin/ffmpeg", "FFmpeg for snapshot. Default is /usr/local/bin/ffmpeg")
- flag.Usage = func() {
- fmt.Println("A demo api-server for SRS\n")
- fmt.Println(fmt.Sprintf("Usage: %v [flags]", srsBin))
- flag.PrintDefaults()
- fmt.Println(fmt.Sprintf("For example:"))
- fmt.Println(fmt.Sprintf(" %v -p 8085", srsBin))
- fmt.Println(fmt.Sprintf(" %v 8085", srsBin))
- }
- flag.Parse()
- log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime | log.Lmicroseconds)
-
- if len(os.Args[1:]) == 1 {
- portArg := os.Args[1]
- var err error
- if port, err = strconv.Atoi(portArg); err != nil {
- log.Println(fmt.Sprintf("parse port arg:%v to int failed, err %v", portArg, err))
- flag.Usage()
- os.Exit(1)
- }
- }
- sw = NewSnapshotWorker(ffmpegPath)
- StaticDir, err := filepath.Abs(StaticDir)
- if err != nil {
- panic(err)
- }
- log.Println(fmt.Sprintf("api server listen at port:%v, static_dir:%v", port, StaticDir))
- http.Handle("/", http.FileServer(http.Dir(StaticDir)))
- http.HandleFunc("/api/v1", func(writer http.ResponseWriter, request *http.Request) {
- res := &struct {
- Code int `json:"code"`
- Urls struct {
- Clients string `json:"clients"`
- Streams string `json:"streams"`
- Sessions string `json:"sessions"`
- Dvrs string `json:"dvrs"`
- Chats string `json:"chats"`
- Servers struct {
- Summary string `json:"summary"`
- Get string `json:"GET"`
- Post string `json:"POST ip=node_ip&device_id=device_id"`
- }
- } `json:"urls"`
- }{
- Code: 0,
- }
- res.Urls.Clients = "for srs http callback, to handle the clients requests: connect/disconnect vhost/app."
- res.Urls.Streams = "for srs http callback, to handle the streams requests: publish/unpublish stream."
- res.Urls.Sessions = "for srs http callback, to handle the sessions requests: client play/stop stream."
- res.Urls.Dvrs = "for srs http callback, to handle the dvr requests: dvr stream."
-
- res.Urls.Servers.Summary = "for srs raspberry-pi and meeting demo."
- res.Urls.Servers.Get = "get the current raspberry-pi servers info."
- res.Urls.Servers.Post = "the new raspberry-pi server info."
-
- body, _ := json.Marshal(res)
- writer.Write(body)
- })
-
- http.HandleFunc("/api/v1/clients", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to clients, req=%v", string(body)))
- msg := &SrsClientRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if !msg.IsOnConnect() && !msg.IsOnClose() {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
-
- http.HandleFunc("/api/v1/streams", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to streams, req=%v", string(body)))
- msg := &SrsStreamRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if !msg.IsOnPublish() && !msg.IsOnUnPublish() {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
-
- http.HandleFunc("/api/v1/sessions", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to sessions, req=%v", string(body)))
- msg := &SrsSessionRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if !msg.IsOnPlay() && !msg.IsOnStop() {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
-
- http.HandleFunc("/api/v1/dvrs", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to dvrs, req=%v", string(body)))
- msg := &SrsDvrRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if !msg.IsOnDvr() {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
-
- http.HandleFunc("/api/v1/hls", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to hls, req=%v", string(body)))
- msg := &SrsHlsRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if !msg.IsOnHls() {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
-
- http.HandleFunc("/api/v1/chat", func(w http.ResponseWriter, r *http.Request) {
- SrsWriteErrorResponse(w, fmt.Errorf("not implemented"))
- })
- http.HandleFunc("/api/v1/snapshots", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to snapshots, req=%v", string(body)))
- msg := &SrsSnapShotRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if msg.IsOnPublish() {
- sw.Create(msg)
- } else if msg.IsOnUnPublish() {
- sw.Destroy(msg)
- } else {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
-
- http.HandleFunc("/api/v1/forward", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- SrsWriteDataResponse(w, struct{}{})
- return
- }
- if err := func() error {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return fmt.Errorf("read request body, err %v", err)
- }
- log.Println(fmt.Sprintf("post to forward, req=%v", string(body)))
- msg := &SrsForwardRequest{}
- if err := json.Unmarshal(body, msg); err != nil {
- return fmt.Errorf("parse message from %v, err %v", string(body), err)
- }
- log.Println(fmt.Sprintf("Got %v", msg.String()))
- if !msg.IsOnForward() {
- return fmt.Errorf("invalid message %v", msg.String())
- }
- SrsWriteDataResponse(w, &SrsCommonResponse{Code: 0, Data: &struct {
- Urls []string `json:"urls"`
- }{
- Urls: []string{"rtmp://127.0.0.1:19350/test/teststream"},
- }})
- return nil
- }(); err != nil {
- SrsWriteErrorResponse(w, err)
- }
- })
- addr := fmt.Sprintf(":%v", port)
- log.Println(fmt.Sprintf("start listen on:%v", addr))
- if err := http.ListenAndServe(addr, nil); err != nil {
- log.Println(fmt.Sprintf("listen on addr:%v failed, err is %v", addr, err))
- }
- }
|