123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077 |
- // The MIT License (MIT)
- //
- // # Copyright (c) 2021 Winlin
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy of
- // this software and associated documentation files (the "Software"), to deal in
- // the Software without restriction, including without limitation the rights to
- // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- // the Software, and to permit persons to whom the Software is furnished to do so,
- // subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in all
- // copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- package janus
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/ossrs/go-oryx-lib/errors"
- "github.com/ossrs/go-oryx-lib/logger"
- "io/ioutil"
- "net/http"
- "strings"
- "sync"
- "time"
- )
- type publisherInfo struct {
- AudioCodec string `json:"audio_codec"`
- Display string `json:"display"`
- ID uint64 `json:"id"`
- Talking bool `json:"talking"`
- VideoCodec string `json:"video_codec"`
- }
- func (v publisherInfo) String() string {
- return fmt.Sprintf("%v(codec:%v/%v,id:%v,talk:%v)",
- v.Display, v.VideoCodec, v.AudioCodec, v.ID, v.Talking)
- }
- type janusReply struct {
- transactionID string
- replies chan []byte
- }
- func newJanusReply(tid string) *janusReply {
- return &janusReply{
- transactionID: tid,
- replies: make(chan []byte, 1),
- }
- }
- type janusHandle struct {
- api *janusAPI
- // The ID created by API.
- handleID uint64
- publisherID uint64
- }
- type janusAPI struct {
- // For example, http://localhost:8088/janus
- r string
- // The ID created by API.
- sessionID uint64 // By Create().
- privateID uint64 // By JoinAsPublisher().
- // The handles, key is handleID, value is *janusHandle
- handles sync.Map
- // The callbacks.
- onDetached func(sender, sessionID uint64)
- onWebrtcUp func(sender, sessionID uint64)
- onMedia func(sender, sessionID uint64, mtype string, receiving bool)
- onSlowLink func(sender, sessionID uint64, media string, lost uint64, uplink bool)
- onPublisher func(sender, sessionID uint64, publishers []publisherInfo)
- onUnPublished func(sender, sessionID, id uint64)
- onLeave func(sender, sessionID, id uint64)
- // The context for polling.
- pollingCtx context.Context
- pollingCancel context.CancelFunc
- wg sync.WaitGroup
- // The replies of polling key is transactionID, value is janusReply.
- replies sync.Map
- }
- func newJanusAPI(r string) *janusAPI {
- v := &janusAPI{r: r}
- if !strings.HasSuffix(r, "/") {
- v.r += "/"
- }
- v.onDetached = func(sender, sessionID uint64) {
- }
- v.onWebrtcUp = func(sender, sessionID uint64) {
- }
- v.onMedia = func(sender, sessionID uint64, mtype string, receiving bool) {
- }
- v.onSlowLink = func(sender, sessionID uint64, media string, lost uint64, uplink bool) {
- }
- v.onPublisher = func(sender, sessionID uint64, publishers []publisherInfo) {
- }
- v.onUnPublished = func(sender, sessionID, id uint64) {
- }
- v.onLeave = func(sender, sessionID, id uint64) {
- }
- return v
- }
- func (v *janusAPI) Close() error {
- v.pollingCancel()
- v.wg.Wait()
- return nil
- }
- func (v *janusAPI) Create(ctx context.Context) error {
- v.pollingCtx, v.pollingCancel = context.WithCancel(ctx)
- api := v.r
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- }{
- "create", newTransactionID(),
- }
- b, err := json.Marshal(reqBody)
- if err != nil {
- return errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- resBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- Data struct {
- ID uint64 `json:"id"`
- } `json:"data"`
- }{}
- if err := json.Unmarshal([]byte(s2), &resBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s2)
- }
- if resBody.Janus != "success" {
- return errors.Errorf("Server fail code=%v %v", resBody.Janus, s2)
- }
- v.sessionID = resBody.Data.ID
- logger.Tf(ctx, "Parse create sessionID=%v", v.sessionID)
- v.wg.Add(1)
- go func() {
- defer v.wg.Done()
- defer v.pollingCancel()
- for v.pollingCtx.Err() == nil {
- if err := v.polling(v.pollingCtx); err != nil {
- if v.pollingCtx.Err() != context.Canceled {
- logger.Wf(ctx, "polling err %+v", err)
- }
- break
- }
- }
- }()
- return nil
- }
- func (v *janusAPI) AttachPlugin(ctx context.Context) (handleID uint64, err error) {
- api := fmt.Sprintf("%v%v", v.r, v.sessionID)
- reqBody := struct {
- Janus string `json:"janus"`
- OpaqueID string `json:"opaque_id"`
- Plugin string `json:"plugin"`
- Transaction string `json:"transaction"`
- }{
- "attach", newTransactionID(),
- "janus.plugin.videoroom", newTransactionID(),
- }
- b, err := json.Marshal(reqBody)
- if err != nil {
- return 0, errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return 0, errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return 0, errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return 0, errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- resBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- Data struct {
- ID uint64 `json:"id"`
- } `json:"data"`
- }{}
- if err := json.Unmarshal([]byte(s2), &resBody); err != nil {
- return 0, errors.Wrapf(err, "Marshal %v", s2)
- }
- if resBody.Janus != "success" {
- return 0, errors.Errorf("Server fail code=%v %v", resBody.Janus, s2)
- }
- h := &janusHandle{}
- h.handleID = resBody.Data.ID
- h.api = v
- v.handles.Store(h.handleID, h)
- logger.Tf(ctx, "Parse create handleID=%v", h.handleID)
- return h.handleID, nil
- }
- func (v *janusAPI) DetachPlugin(ctx context.Context, handleID uint64) error {
- handler := v.loadHandler(handleID)
- api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- }{
- "detach", newTransactionID(),
- }
- b, err := json.Marshal(reqBody)
- if err != nil {
- return errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- ackBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s2)
- }
- if ackBody.Janus != "success" {
- return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
- }
- logger.Tf(ctx, "Detach tid=%v done", reqBody.Transaction)
- return nil
- }
- func (v *janusAPI) loadHandler(handleID uint64) *janusHandle {
- if h, ok := v.handles.Load(handleID); !ok {
- return nil
- } else {
- return h.(*janusHandle)
- }
- }
- func (v *janusAPI) JoinAsPublisher(ctx context.Context, handleID uint64, room int, display string) error {
- handler := v.loadHandler(handleID)
- api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
- reqBodyBody := struct {
- Request string `json:"request"`
- PType string `json:"ptype"`
- Room int `json:"room"`
- Display string `json:"display"`
- }{
- "join", "publisher", room, display,
- }
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- Body interface{} `json:"body"`
- }{
- "message", newTransactionID(), reqBodyBody,
- }
- reply := newJanusReply(reqBody.Transaction)
- v.replies.Store(reqBody.Transaction, reply)
- b, err := json.Marshal(reqBody)
- if err != nil {
- return errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- ackBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s2)
- }
- if ackBody.Janus != "ack" {
- return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
- }
- logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
- // Reply from polling.
- var s3 string
- select {
- case <-ctx.Done():
- return ctx.Err()
- case b3 := <-reply.replies:
- s3 = escapeJSON(string(b3))
- logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
- }
- resBody := struct {
- Janus string `json:"janus"`
- Session uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- Sender uint64 `json:"sender"`
- PluginData struct {
- Plugin string `json:"plugin"`
- Data struct {
- VideoRoom string `json:"videoroom"`
- Room int `json:"room"`
- Description string `json:"description"`
- ID uint64 `json:"id"`
- PrivateID uint64 `json:"private_id"`
- Publishers []publisherInfo `json:"publishers"`
- } `json:"data"`
- } `json:"plugindata"`
- }{}
- if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s3)
- }
- plugin := resBody.PluginData.Data
- if resBody.Janus != "event" || plugin.VideoRoom != "joined" {
- return errors.Errorf("Server fail janus=%v, plugin=%v %v", resBody.Janus, plugin.VideoRoom, s3)
- }
- handler.publisherID = plugin.ID
- v.privateID = plugin.PrivateID
- logger.Tf(ctx, "Join as publisher room=%v, display=%v, tid=%v ok, event=%v, plugin=%v, id=%v, private=%v, publishers=%v",
- room, display, reply.transactionID, resBody.Janus, plugin.VideoRoom, handler.publisherID, plugin.PrivateID, len(plugin.Publishers))
- if len(plugin.Publishers) > 0 {
- v.onPublisher(resBody.Sender, resBody.Session, plugin.Publishers)
- }
- return nil
- }
- func (v *janusAPI) UnPublish(ctx context.Context, handleID uint64) error {
- handler := v.loadHandler(handleID)
- api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
- reqBodyBody := struct {
- Request string `json:"request"`
- }{
- "unpublish",
- }
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- Body interface{} `json:"body"`
- }{
- "message", newTransactionID(), reqBodyBody,
- }
- b, err := json.Marshal(reqBody)
- if err != nil {
- return errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- ackBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s2)
- }
- if ackBody.Janus != "ack" {
- return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
- }
- logger.Tf(ctx, "UnPublish tid=%v done", reqBody.Transaction)
- return nil
- }
- func (v *janusAPI) Publish(ctx context.Context, handleID uint64, offer string) (answer string, err error) {
- handler := v.loadHandler(handleID)
- api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
- reqBodyBody := struct {
- Request string `json:"request"`
- Video bool `json:"video"`
- Audio bool `json:"audio"`
- }{
- "configure", true, true,
- }
- jsepBody := struct {
- Type string `json:"type"`
- SDP string `json:"sdp"`
- }{
- "offer", offer,
- }
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- Body interface{} `json:"body"`
- JSEP interface{} `json:"jsep"`
- }{
- "message", newTransactionID(), reqBodyBody, jsepBody,
- }
- reply := newJanusReply(reqBody.Transaction)
- v.replies.Store(reqBody.Transaction, reply)
- b, err := json.Marshal(reqBody)
- if err != nil {
- return "", errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return "", errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return "", errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- ackBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
- return "", errors.Wrapf(err, "Marshal %v", s2)
- }
- if ackBody.Janus != "ack" {
- return "", errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
- }
- logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
- // Reply from polling.
- var s3 string
- select {
- case <-ctx.Done():
- return "", ctx.Err()
- case b3 := <-reply.replies:
- s3 = escapeJSON(string(b3))
- logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
- }
- resBody := struct {
- Janus string `json:"janus"`
- Session uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- Sender uint64 `json:"sender"`
- PluginData struct {
- Plugin string `json:"plugin"`
- Data struct {
- VideoRoom string `json:"videoroom"`
- Room int `json:"room"`
- Configured string `json:"configured"`
- AudioCodec string `json:"audio_codec"`
- VideoCodec string `json:"video_codec"`
- } `json:"data"`
- } `json:"plugindata"`
- JSEP struct {
- Type string `json:"type"`
- SDP string `json:"sdp"`
- } `json:"jsep"`
- }{}
- if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
- return "", errors.Wrapf(err, "Marshal %v", s3)
- }
- plugin := resBody.PluginData.Data
- jsep := resBody.JSEP
- if resBody.Janus != "event" || plugin.VideoRoom != "event" {
- return "", errors.Errorf("Server fail janus=%v, plugin=%v %v", resBody.Janus, plugin.VideoRoom, s3)
- }
- logger.Tf(ctx, "Configure publisher offer=%vB, tid=%v ok, event=%v, plugin=%v, answer=%vB",
- len(offer), reply.transactionID, resBody.Janus, plugin.VideoRoom, len(jsep.SDP))
- return jsep.SDP, nil
- }
- func (v *janusAPI) JoinAsSubscribe(ctx context.Context, handleID uint64, room int, publisher *publisherInfo) (offer string, err error) {
- handler := v.loadHandler(handleID)
- api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
- reqBodyBody := struct {
- Request string `json:"request"`
- PType string `json:"ptype"`
- Room int `json:"room"`
- Feed uint64 `json:"feed"`
- PrivateID uint64 `json:"private_id"`
- }{
- "join", "subscriber", room, publisher.ID, v.privateID,
- }
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- Body interface{} `json:"body"`
- }{
- "message", newTransactionID(), reqBodyBody,
- }
- reply := newJanusReply(reqBody.Transaction)
- v.replies.Store(reqBody.Transaction, reply)
- b, err := json.Marshal(reqBody)
- if err != nil {
- return "", errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return "", errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return "", errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- ackBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
- return "", errors.Wrapf(err, "Marshal %v", s2)
- }
- if ackBody.Janus != "ack" {
- return "", errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
- }
- logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
- // Reply from polling.
- var s3 string
- select {
- case <-ctx.Done():
- return "", ctx.Err()
- case b3 := <-reply.replies:
- s3 = escapeJSON(string(b3))
- logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
- }
- resBody := struct {
- Janus string `json:"janus"`
- Session uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- Sender uint64 `json:"sender"`
- PluginData struct {
- Plugin string `json:"plugin"`
- Data struct {
- VideoRoom string `json:"videoroom"`
- Room int `json:"room"`
- ID uint64 `json:"id"`
- Display string `json:"display"`
- } `json:"data"`
- } `json:"plugindata"`
- JSEP struct {
- Type string `json:"type"`
- SDP string `json:"sdp"`
- } `json:"jsep"`
- }{}
- if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
- return "", errors.Wrapf(err, "Marshal %v", s3)
- }
- plugin := resBody.PluginData.Data
- jsep := resBody.JSEP
- if resBody.Janus != "event" || plugin.VideoRoom != "attached" {
- return "", errors.Errorf("Server fail janus=%v, plugin=%v %v", resBody.Janus, plugin.VideoRoom, s3)
- }
- logger.Tf(ctx, "Join as subscriber room=%v, tid=%v ok, event=%v, plugin=%v, offer=%vB",
- room, reply.transactionID, resBody.Janus, plugin.VideoRoom, len(jsep.SDP))
- return jsep.SDP, nil
- }
- func (v *janusAPI) Subscribe(ctx context.Context, handleID uint64, room int, answer string) error {
- handler := v.loadHandler(handleID)
- api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
- reqBodyBody := struct {
- Request string `json:"request"`
- Room int `json:"room"`
- }{
- "start", room,
- }
- jsepBody := struct {
- Type string `json:"type"`
- SDP string `json:"sdp"`
- }{
- "answer", answer,
- }
- reqBody := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- Body interface{} `json:"body"`
- JSEP interface{} `json:"jsep"`
- }{
- "message", newTransactionID(), reqBodyBody, jsepBody,
- }
- reply := newJanusReply(reqBody.Transaction)
- v.replies.Store(reqBody.Transaction, reply)
- b, err := json.Marshal(reqBody)
- if err != nil {
- return errors.Wrapf(err, "Marshal body %v", reqBody)
- }
- logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
- req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
- if err != nil {
- return errors.Wrapf(err, "HTTP request %v", string(b))
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return errors.Wrapf(err, "Do HTTP request %v", string(b))
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return errors.Wrapf(err, "Read response for %v", string(b))
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Response from %v is %v", api, s2)
- ackBody := struct {
- Janus string `json:"janus"`
- SessionID uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s2)
- }
- if ackBody.Janus != "ack" {
- return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
- }
- logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
- // Reply from polling.
- var s3 string
- select {
- case <-ctx.Done():
- return ctx.Err()
- case b3 := <-reply.replies:
- s3 = escapeJSON(string(b3))
- logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
- }
- resBody := struct {
- Janus string `json:"janus"`
- Session uint64 `json:"session_id"`
- Transaction string `json:"transaction"`
- Sender uint64 `json:"sender"`
- PluginData struct {
- Plugin string `json:"plugin"`
- Data struct {
- VideoRoom string `json:"videoroom"`
- Room int `json:"room"`
- Started string `json:"started"`
- } `json:"data"`
- } `json:"plugindata"`
- }{}
- if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
- return errors.Wrapf(err, "Marshal %v", s3)
- }
- plugin := resBody.PluginData.Data
- if resBody.Janus != "event" || plugin.VideoRoom != "event" || plugin.Started != "ok" {
- return errors.Errorf("Server fail janus=%v, plugin=%v, started=%v %v", resBody.Janus, plugin.VideoRoom, plugin.Started, s3)
- }
- logger.Tf(ctx, "Start subscribe answer=%vB, tid=%v ok, event=%v, plugin=%v, started=%v",
- len(answer), reply.transactionID, resBody.Janus, plugin.VideoRoom, plugin.Started)
- return nil
- }
- func (v *janusAPI) polling(ctx context.Context) error {
- api := fmt.Sprintf("%v%v?rid=%v&maxev=1", v.r, v.sessionID,
- uint64(time.Duration(time.Now().UnixNano())/time.Millisecond))
- logger.Tf(ctx, "Polling: Request url api=%v", api)
- req, err := http.NewRequest("GET", api, nil)
- if err != nil {
- return errors.Wrapf(err, "HTTP request %v", api)
- }
- res, err := http.DefaultClient.Do(req.WithContext(ctx))
- if err != nil {
- return errors.Wrapf(err, "Do HTTP request %v", api)
- }
- b2, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return errors.Wrapf(err, "Read response for %v", api)
- }
- s2 := escapeJSON(string(b2))
- logger.Tf(ctx, "Polling: Response from %v is %v", api, s2)
- if len(b2) == 0 {
- return nil
- }
- replyID := struct {
- Janus string `json:"janus"`
- Transaction string `json:"transaction"`
- }{}
- if err := json.Unmarshal([]byte(s2), &replyID); err != nil {
- return errors.Wrapf(err, "Marshal %v", s2)
- }
- switch replyID.Janus {
- case "event":
- if r, ok := v.replies.Load(replyID.Transaction); !ok {
- if err := v.handleCall(replyID.Janus, s2); err != nil {
- logger.Wf(ctx, "Polling: Handle call %v fail %v, err %+v", replyID.Janus, s2, err)
- }
- } else if r2, ok := r.(*janusReply); !ok {
- logger.Wf(ctx, "Polling: Ignore tid=%v reply %v", replyID.Transaction, s2)
- } else {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case r2.replies <- b2:
- logger.Tf(ctx, "Polling: Reply tid=%v ok, %v", replyID.Transaction, s2)
- }
- }
- case "keepalive":
- return nil
- case "webrtcup", "media", "slowlink", "detached":
- if err := v.handleCall(replyID.Janus, s2); err != nil {
- logger.Wf(ctx, "Polling: Handle call %v fail %v, err %+v", replyID.Janus, s2, err)
- }
- default:
- logger.Wf(ctx, "Polling: Unknown janus=%v %v", replyID.Janus, s2)
- }
- return nil
- }
- func (v *janusAPI) handleCall(janus string, s string) error {
- type callHeader struct {
- Sender uint64 `json:"sender"`
- SessionID uint64 `json:"session_id"`
- }
- switch janus {
- case "detached":
- /*{
- "janus": "detached",
- "sender": 4201795482244652,
- "session_id": 373403124722380
- }*/
- r := callHeader{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onDetached(r.Sender, r.SessionID)
- case "webrtcup":
- /*{
- "janus": "webrtcup",
- "sender": 7698695982180732,
- "session_id": 2403223275773854
- }*/
- r := callHeader{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onWebrtcUp(r.Sender, r.SessionID)
- case "media":
- /*{
- "janus": "media",
- "receiving": true,
- "sender": 7698695982180732,
- "session_id": 2403223275773854,
- "type": "audio"
- }*/
- r := struct {
- callHeader
- Type string `json:"type"`
- Receiving bool `json:"receiving"`
- }{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onMedia(r.Sender, r.SessionID, r.Type, r.Receiving)
- case "slowlink":
- /*{
- "janus": "slowlink",
- "lost": 4294902988,
- "media": "video",
- "sender": 562229074390269,
- "session_id": 156116325213625,
- "uplink": false
- }*/
- r := struct {
- callHeader
- Lost uint64 `json:"lost"`
- Media string `json:"media"`
- Uplink bool `json:"uplink"`
- }{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onSlowLink(r.Sender, r.SessionID, r.Media, r.Lost, r.Uplink)
- case "event":
- if strings.Contains(s, "publishers") {
- /*{
- "janus": "event",
- "plugindata": {
- "data": {
- "publishers": [{
- "audio_codec": "opus",
- "display": "test",
- "id": 2805536617160145,
- "talking": false,
- "video_codec": "h264"
- }],
- "room": 2345,
- "videoroom": "event"
- },
- "plugin": "janus.plugin.videoroom"
- },
- "sender": 2156044968631669,
- "session_id": 6696376606446844
- }*/
- r := struct {
- callHeader
- PluginData struct {
- Data struct {
- Publishers []publisherInfo `json:"publishers"`
- Room int `json:"room"`
- VideoRoom string `json:"videoroom"`
- } `json:"data"`
- Plugin string `json:"plugin"`
- } `json:"plugindata"`
- }{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onPublisher(r.Sender, r.SessionID, r.PluginData.Data.Publishers)
- } else if strings.Contains(s, "unpublished") {
- /*{
- "janus": "event",
- "plugindata": {
- "data": {
- "room": 2345,
- "unpublished": 2805536617160145,
- "videoroom": "event"
- },
- "plugin": "janus.plugin.videoroom"
- },
- "sender": 2156044968631669,
- "session_id": 6696376606446844
- }*/
- r := struct {
- callHeader
- PluginData struct {
- Data struct {
- Room int `json:"room"`
- UnPublished uint64 `json:"unpublished"`
- VideoRoom string `json:"videoroom"`
- } `json:"data"`
- Plugin string `json:"plugin"`
- } `json:"plugindata"`
- }{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onUnPublished(r.Sender, r.SessionID, r.PluginData.Data.UnPublished)
- } else if strings.Contains(s, "leaving") {
- /*{
- "janus": "event",
- "plugindata": {
- "data": {
- "leaving": 2805536617160145,
- "room": 2345,
- "videoroom": "event"
- },
- "plugin": "janus.plugin.videoroom"
- },
- "sender": 2156044968631669,
- "session_id": 6696376606446844
- }*/
- r := struct {
- callHeader
- PluginData struct {
- Data struct {
- Leaving uint64 `json:"leaving"`
- Room int `json:"room"`
- VideoRoom string `json:"videoroom"`
- } `json:"data"`
- Plugin string `json:"plugin"`
- } `json:"plugindata"`
- }{}
- if err := json.Unmarshal([]byte(s), &r); err != nil {
- return err
- }
- v.onLeave(r.Sender, r.SessionID, r.PluginData.Data.Leaving)
- }
- }
- return nil
- }
- func (v *janusAPI) DiscoverPublisher(ctx context.Context, room int, display string, timeout time.Duration) (*publisherInfo, error) {
- var publisher *publisherInfo
- discoverCtx, discoverCancel := context.WithCancel(context.Background())
- ov := v.onPublisher
- defer func() {
- v.onPublisher = ov
- }()
- v.onPublisher = func(sender, sessionID uint64, publishers []publisherInfo) {
- for _, p := range publishers {
- if p.Display == display {
- publisher = &p
- discoverCancel()
- logger.Tf(ctx, "Publisher discovered %v", p)
- return
- }
- }
- }
- go func() {
- if err := func() error {
- publishHandleID, err := v.AttachPlugin(ctx)
- if err != nil {
- return err
- }
- defer v.DetachPlugin(ctx, publishHandleID)
- if err := v.JoinAsPublisher(ctx, publishHandleID, room, fmt.Sprintf("sub-%v", display)); err != nil {
- return err
- }
- <-discoverCtx.Done()
- return nil
- }(); err != nil {
- logger.Ef(ctx, "join err %+v", err)
- }
- }()
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-discoverCtx.Done():
- case <-time.After(timeout):
- discoverCancel()
- }
- if publisher == nil {
- return nil, errors.Errorf("no publisher for room=%v, display=%v, session=%v",
- room, display, v.sessionID)
- }
- return publisher, nil
- }
|