api.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077
  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2021 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package janus
  22. import (
  23. "context"
  24. "encoding/json"
  25. "fmt"
  26. "github.com/ossrs/go-oryx-lib/errors"
  27. "github.com/ossrs/go-oryx-lib/logger"
  28. "io/ioutil"
  29. "net/http"
  30. "strings"
  31. "sync"
  32. "time"
  33. )
  34. type publisherInfo struct {
  35. AudioCodec string `json:"audio_codec"`
  36. Display string `json:"display"`
  37. ID uint64 `json:"id"`
  38. Talking bool `json:"talking"`
  39. VideoCodec string `json:"video_codec"`
  40. }
  41. func (v publisherInfo) String() string {
  42. return fmt.Sprintf("%v(codec:%v/%v,id:%v,talk:%v)",
  43. v.Display, v.VideoCodec, v.AudioCodec, v.ID, v.Talking)
  44. }
  45. type janusReply struct {
  46. transactionID string
  47. replies chan []byte
  48. }
  49. func newJanusReply(tid string) *janusReply {
  50. return &janusReply{
  51. transactionID: tid,
  52. replies: make(chan []byte, 1),
  53. }
  54. }
  55. type janusHandle struct {
  56. api *janusAPI
  57. // The ID created by API.
  58. handleID uint64
  59. publisherID uint64
  60. }
  61. type janusAPI struct {
  62. // For example, http://localhost:8088/janus
  63. r string
  64. // The ID created by API.
  65. sessionID uint64 // By Create().
  66. privateID uint64 // By JoinAsPublisher().
  67. // The handles, key is handleID, value is *janusHandle
  68. handles sync.Map
  69. // The callbacks.
  70. onDetached func(sender, sessionID uint64)
  71. onWebrtcUp func(sender, sessionID uint64)
  72. onMedia func(sender, sessionID uint64, mtype string, receiving bool)
  73. onSlowLink func(sender, sessionID uint64, media string, lost uint64, uplink bool)
  74. onPublisher func(sender, sessionID uint64, publishers []publisherInfo)
  75. onUnPublished func(sender, sessionID, id uint64)
  76. onLeave func(sender, sessionID, id uint64)
  77. // The context for polling.
  78. pollingCtx context.Context
  79. pollingCancel context.CancelFunc
  80. wg sync.WaitGroup
  81. // The replies of polling key is transactionID, value is janusReply.
  82. replies sync.Map
  83. }
  84. func newJanusAPI(r string) *janusAPI {
  85. v := &janusAPI{r: r}
  86. if !strings.HasSuffix(r, "/") {
  87. v.r += "/"
  88. }
  89. v.onDetached = func(sender, sessionID uint64) {
  90. }
  91. v.onWebrtcUp = func(sender, sessionID uint64) {
  92. }
  93. v.onMedia = func(sender, sessionID uint64, mtype string, receiving bool) {
  94. }
  95. v.onSlowLink = func(sender, sessionID uint64, media string, lost uint64, uplink bool) {
  96. }
  97. v.onPublisher = func(sender, sessionID uint64, publishers []publisherInfo) {
  98. }
  99. v.onUnPublished = func(sender, sessionID, id uint64) {
  100. }
  101. v.onLeave = func(sender, sessionID, id uint64) {
  102. }
  103. return v
  104. }
  105. func (v *janusAPI) Close() error {
  106. v.pollingCancel()
  107. v.wg.Wait()
  108. return nil
  109. }
  110. func (v *janusAPI) Create(ctx context.Context) error {
  111. v.pollingCtx, v.pollingCancel = context.WithCancel(ctx)
  112. api := v.r
  113. reqBody := struct {
  114. Janus string `json:"janus"`
  115. Transaction string `json:"transaction"`
  116. }{
  117. "create", newTransactionID(),
  118. }
  119. b, err := json.Marshal(reqBody)
  120. if err != nil {
  121. return errors.Wrapf(err, "Marshal body %v", reqBody)
  122. }
  123. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  124. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  125. if err != nil {
  126. return errors.Wrapf(err, "HTTP request %v", string(b))
  127. }
  128. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  129. if err != nil {
  130. return errors.Wrapf(err, "Do HTTP request %v", string(b))
  131. }
  132. b2, err := ioutil.ReadAll(res.Body)
  133. if err != nil {
  134. return errors.Wrapf(err, "Read response for %v", string(b))
  135. }
  136. s2 := escapeJSON(string(b2))
  137. logger.Tf(ctx, "Response from %v is %v", api, s2)
  138. resBody := struct {
  139. Janus string `json:"janus"`
  140. Transaction string `json:"transaction"`
  141. Data struct {
  142. ID uint64 `json:"id"`
  143. } `json:"data"`
  144. }{}
  145. if err := json.Unmarshal([]byte(s2), &resBody); err != nil {
  146. return errors.Wrapf(err, "Marshal %v", s2)
  147. }
  148. if resBody.Janus != "success" {
  149. return errors.Errorf("Server fail code=%v %v", resBody.Janus, s2)
  150. }
  151. v.sessionID = resBody.Data.ID
  152. logger.Tf(ctx, "Parse create sessionID=%v", v.sessionID)
  153. v.wg.Add(1)
  154. go func() {
  155. defer v.wg.Done()
  156. defer v.pollingCancel()
  157. for v.pollingCtx.Err() == nil {
  158. if err := v.polling(v.pollingCtx); err != nil {
  159. if v.pollingCtx.Err() != context.Canceled {
  160. logger.Wf(ctx, "polling err %+v", err)
  161. }
  162. break
  163. }
  164. }
  165. }()
  166. return nil
  167. }
  168. func (v *janusAPI) AttachPlugin(ctx context.Context) (handleID uint64, err error) {
  169. api := fmt.Sprintf("%v%v", v.r, v.sessionID)
  170. reqBody := struct {
  171. Janus string `json:"janus"`
  172. OpaqueID string `json:"opaque_id"`
  173. Plugin string `json:"plugin"`
  174. Transaction string `json:"transaction"`
  175. }{
  176. "attach", newTransactionID(),
  177. "janus.plugin.videoroom", newTransactionID(),
  178. }
  179. b, err := json.Marshal(reqBody)
  180. if err != nil {
  181. return 0, errors.Wrapf(err, "Marshal body %v", reqBody)
  182. }
  183. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  184. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  185. if err != nil {
  186. return 0, errors.Wrapf(err, "HTTP request %v", string(b))
  187. }
  188. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  189. if err != nil {
  190. return 0, errors.Wrapf(err, "Do HTTP request %v", string(b))
  191. }
  192. b2, err := ioutil.ReadAll(res.Body)
  193. if err != nil {
  194. return 0, errors.Wrapf(err, "Read response for %v", string(b))
  195. }
  196. s2 := escapeJSON(string(b2))
  197. logger.Tf(ctx, "Response from %v is %v", api, s2)
  198. resBody := struct {
  199. Janus string `json:"janus"`
  200. SessionID uint64 `json:"session_id"`
  201. Transaction string `json:"transaction"`
  202. Data struct {
  203. ID uint64 `json:"id"`
  204. } `json:"data"`
  205. }{}
  206. if err := json.Unmarshal([]byte(s2), &resBody); err != nil {
  207. return 0, errors.Wrapf(err, "Marshal %v", s2)
  208. }
  209. if resBody.Janus != "success" {
  210. return 0, errors.Errorf("Server fail code=%v %v", resBody.Janus, s2)
  211. }
  212. h := &janusHandle{}
  213. h.handleID = resBody.Data.ID
  214. h.api = v
  215. v.handles.Store(h.handleID, h)
  216. logger.Tf(ctx, "Parse create handleID=%v", h.handleID)
  217. return h.handleID, nil
  218. }
  219. func (v *janusAPI) DetachPlugin(ctx context.Context, handleID uint64) error {
  220. handler := v.loadHandler(handleID)
  221. api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
  222. reqBody := struct {
  223. Janus string `json:"janus"`
  224. Transaction string `json:"transaction"`
  225. }{
  226. "detach", newTransactionID(),
  227. }
  228. b, err := json.Marshal(reqBody)
  229. if err != nil {
  230. return errors.Wrapf(err, "Marshal body %v", reqBody)
  231. }
  232. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  233. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  234. if err != nil {
  235. return errors.Wrapf(err, "HTTP request %v", string(b))
  236. }
  237. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  238. if err != nil {
  239. return errors.Wrapf(err, "Do HTTP request %v", string(b))
  240. }
  241. b2, err := ioutil.ReadAll(res.Body)
  242. if err != nil {
  243. return errors.Wrapf(err, "Read response for %v", string(b))
  244. }
  245. s2 := escapeJSON(string(b2))
  246. logger.Tf(ctx, "Response from %v is %v", api, s2)
  247. ackBody := struct {
  248. Janus string `json:"janus"`
  249. SessionID uint64 `json:"session_id"`
  250. Transaction string `json:"transaction"`
  251. }{}
  252. if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
  253. return errors.Wrapf(err, "Marshal %v", s2)
  254. }
  255. if ackBody.Janus != "success" {
  256. return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
  257. }
  258. logger.Tf(ctx, "Detach tid=%v done", reqBody.Transaction)
  259. return nil
  260. }
  261. func (v *janusAPI) loadHandler(handleID uint64) *janusHandle {
  262. if h, ok := v.handles.Load(handleID); !ok {
  263. return nil
  264. } else {
  265. return h.(*janusHandle)
  266. }
  267. }
  268. func (v *janusAPI) JoinAsPublisher(ctx context.Context, handleID uint64, room int, display string) error {
  269. handler := v.loadHandler(handleID)
  270. api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
  271. reqBodyBody := struct {
  272. Request string `json:"request"`
  273. PType string `json:"ptype"`
  274. Room int `json:"room"`
  275. Display string `json:"display"`
  276. }{
  277. "join", "publisher", room, display,
  278. }
  279. reqBody := struct {
  280. Janus string `json:"janus"`
  281. Transaction string `json:"transaction"`
  282. Body interface{} `json:"body"`
  283. }{
  284. "message", newTransactionID(), reqBodyBody,
  285. }
  286. reply := newJanusReply(reqBody.Transaction)
  287. v.replies.Store(reqBody.Transaction, reply)
  288. b, err := json.Marshal(reqBody)
  289. if err != nil {
  290. return errors.Wrapf(err, "Marshal body %v", reqBody)
  291. }
  292. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  293. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  294. if err != nil {
  295. return errors.Wrapf(err, "HTTP request %v", string(b))
  296. }
  297. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  298. if err != nil {
  299. return errors.Wrapf(err, "Do HTTP request %v", string(b))
  300. }
  301. b2, err := ioutil.ReadAll(res.Body)
  302. if err != nil {
  303. return errors.Wrapf(err, "Read response for %v", string(b))
  304. }
  305. s2 := escapeJSON(string(b2))
  306. logger.Tf(ctx, "Response from %v is %v", api, s2)
  307. ackBody := struct {
  308. Janus string `json:"janus"`
  309. SessionID uint64 `json:"session_id"`
  310. Transaction string `json:"transaction"`
  311. }{}
  312. if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
  313. return errors.Wrapf(err, "Marshal %v", s2)
  314. }
  315. if ackBody.Janus != "ack" {
  316. return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
  317. }
  318. logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
  319. // Reply from polling.
  320. var s3 string
  321. select {
  322. case <-ctx.Done():
  323. return ctx.Err()
  324. case b3 := <-reply.replies:
  325. s3 = escapeJSON(string(b3))
  326. logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
  327. }
  328. resBody := struct {
  329. Janus string `json:"janus"`
  330. Session uint64 `json:"session_id"`
  331. Transaction string `json:"transaction"`
  332. Sender uint64 `json:"sender"`
  333. PluginData struct {
  334. Plugin string `json:"plugin"`
  335. Data struct {
  336. VideoRoom string `json:"videoroom"`
  337. Room int `json:"room"`
  338. Description string `json:"description"`
  339. ID uint64 `json:"id"`
  340. PrivateID uint64 `json:"private_id"`
  341. Publishers []publisherInfo `json:"publishers"`
  342. } `json:"data"`
  343. } `json:"plugindata"`
  344. }{}
  345. if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
  346. return errors.Wrapf(err, "Marshal %v", s3)
  347. }
  348. plugin := resBody.PluginData.Data
  349. if resBody.Janus != "event" || plugin.VideoRoom != "joined" {
  350. return errors.Errorf("Server fail janus=%v, plugin=%v %v", resBody.Janus, plugin.VideoRoom, s3)
  351. }
  352. handler.publisherID = plugin.ID
  353. v.privateID = plugin.PrivateID
  354. logger.Tf(ctx, "Join as publisher room=%v, display=%v, tid=%v ok, event=%v, plugin=%v, id=%v, private=%v, publishers=%v",
  355. room, display, reply.transactionID, resBody.Janus, plugin.VideoRoom, handler.publisherID, plugin.PrivateID, len(plugin.Publishers))
  356. if len(plugin.Publishers) > 0 {
  357. v.onPublisher(resBody.Sender, resBody.Session, plugin.Publishers)
  358. }
  359. return nil
  360. }
  361. func (v *janusAPI) UnPublish(ctx context.Context, handleID uint64) error {
  362. handler := v.loadHandler(handleID)
  363. api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
  364. reqBodyBody := struct {
  365. Request string `json:"request"`
  366. }{
  367. "unpublish",
  368. }
  369. reqBody := struct {
  370. Janus string `json:"janus"`
  371. Transaction string `json:"transaction"`
  372. Body interface{} `json:"body"`
  373. }{
  374. "message", newTransactionID(), reqBodyBody,
  375. }
  376. b, err := json.Marshal(reqBody)
  377. if err != nil {
  378. return errors.Wrapf(err, "Marshal body %v", reqBody)
  379. }
  380. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  381. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  382. if err != nil {
  383. return errors.Wrapf(err, "HTTP request %v", string(b))
  384. }
  385. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  386. if err != nil {
  387. return errors.Wrapf(err, "Do HTTP request %v", string(b))
  388. }
  389. b2, err := ioutil.ReadAll(res.Body)
  390. if err != nil {
  391. return errors.Wrapf(err, "Read response for %v", string(b))
  392. }
  393. s2 := escapeJSON(string(b2))
  394. logger.Tf(ctx, "Response from %v is %v", api, s2)
  395. ackBody := struct {
  396. Janus string `json:"janus"`
  397. SessionID uint64 `json:"session_id"`
  398. Transaction string `json:"transaction"`
  399. }{}
  400. if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
  401. return errors.Wrapf(err, "Marshal %v", s2)
  402. }
  403. if ackBody.Janus != "ack" {
  404. return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
  405. }
  406. logger.Tf(ctx, "UnPublish tid=%v done", reqBody.Transaction)
  407. return nil
  408. }
  409. func (v *janusAPI) Publish(ctx context.Context, handleID uint64, offer string) (answer string, err error) {
  410. handler := v.loadHandler(handleID)
  411. api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
  412. reqBodyBody := struct {
  413. Request string `json:"request"`
  414. Video bool `json:"video"`
  415. Audio bool `json:"audio"`
  416. }{
  417. "configure", true, true,
  418. }
  419. jsepBody := struct {
  420. Type string `json:"type"`
  421. SDP string `json:"sdp"`
  422. }{
  423. "offer", offer,
  424. }
  425. reqBody := struct {
  426. Janus string `json:"janus"`
  427. Transaction string `json:"transaction"`
  428. Body interface{} `json:"body"`
  429. JSEP interface{} `json:"jsep"`
  430. }{
  431. "message", newTransactionID(), reqBodyBody, jsepBody,
  432. }
  433. reply := newJanusReply(reqBody.Transaction)
  434. v.replies.Store(reqBody.Transaction, reply)
  435. b, err := json.Marshal(reqBody)
  436. if err != nil {
  437. return "", errors.Wrapf(err, "Marshal body %v", reqBody)
  438. }
  439. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  440. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  441. if err != nil {
  442. return "", errors.Wrapf(err, "HTTP request %v", string(b))
  443. }
  444. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  445. if err != nil {
  446. return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
  447. }
  448. b2, err := ioutil.ReadAll(res.Body)
  449. if err != nil {
  450. return "", errors.Wrapf(err, "Read response for %v", string(b))
  451. }
  452. s2 := escapeJSON(string(b2))
  453. logger.Tf(ctx, "Response from %v is %v", api, s2)
  454. ackBody := struct {
  455. Janus string `json:"janus"`
  456. SessionID uint64 `json:"session_id"`
  457. Transaction string `json:"transaction"`
  458. }{}
  459. if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
  460. return "", errors.Wrapf(err, "Marshal %v", s2)
  461. }
  462. if ackBody.Janus != "ack" {
  463. return "", errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
  464. }
  465. logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
  466. // Reply from polling.
  467. var s3 string
  468. select {
  469. case <-ctx.Done():
  470. return "", ctx.Err()
  471. case b3 := <-reply.replies:
  472. s3 = escapeJSON(string(b3))
  473. logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
  474. }
  475. resBody := struct {
  476. Janus string `json:"janus"`
  477. Session uint64 `json:"session_id"`
  478. Transaction string `json:"transaction"`
  479. Sender uint64 `json:"sender"`
  480. PluginData struct {
  481. Plugin string `json:"plugin"`
  482. Data struct {
  483. VideoRoom string `json:"videoroom"`
  484. Room int `json:"room"`
  485. Configured string `json:"configured"`
  486. AudioCodec string `json:"audio_codec"`
  487. VideoCodec string `json:"video_codec"`
  488. } `json:"data"`
  489. } `json:"plugindata"`
  490. JSEP struct {
  491. Type string `json:"type"`
  492. SDP string `json:"sdp"`
  493. } `json:"jsep"`
  494. }{}
  495. if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
  496. return "", errors.Wrapf(err, "Marshal %v", s3)
  497. }
  498. plugin := resBody.PluginData.Data
  499. jsep := resBody.JSEP
  500. if resBody.Janus != "event" || plugin.VideoRoom != "event" {
  501. return "", errors.Errorf("Server fail janus=%v, plugin=%v %v", resBody.Janus, plugin.VideoRoom, s3)
  502. }
  503. logger.Tf(ctx, "Configure publisher offer=%vB, tid=%v ok, event=%v, plugin=%v, answer=%vB",
  504. len(offer), reply.transactionID, resBody.Janus, plugin.VideoRoom, len(jsep.SDP))
  505. return jsep.SDP, nil
  506. }
  507. func (v *janusAPI) JoinAsSubscribe(ctx context.Context, handleID uint64, room int, publisher *publisherInfo) (offer string, err error) {
  508. handler := v.loadHandler(handleID)
  509. api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
  510. reqBodyBody := struct {
  511. Request string `json:"request"`
  512. PType string `json:"ptype"`
  513. Room int `json:"room"`
  514. Feed uint64 `json:"feed"`
  515. PrivateID uint64 `json:"private_id"`
  516. }{
  517. "join", "subscriber", room, publisher.ID, v.privateID,
  518. }
  519. reqBody := struct {
  520. Janus string `json:"janus"`
  521. Transaction string `json:"transaction"`
  522. Body interface{} `json:"body"`
  523. }{
  524. "message", newTransactionID(), reqBodyBody,
  525. }
  526. reply := newJanusReply(reqBody.Transaction)
  527. v.replies.Store(reqBody.Transaction, reply)
  528. b, err := json.Marshal(reqBody)
  529. if err != nil {
  530. return "", errors.Wrapf(err, "Marshal body %v", reqBody)
  531. }
  532. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  533. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  534. if err != nil {
  535. return "", errors.Wrapf(err, "HTTP request %v", string(b))
  536. }
  537. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  538. if err != nil {
  539. return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
  540. }
  541. b2, err := ioutil.ReadAll(res.Body)
  542. if err != nil {
  543. return "", errors.Wrapf(err, "Read response for %v", string(b))
  544. }
  545. s2 := escapeJSON(string(b2))
  546. logger.Tf(ctx, "Response from %v is %v", api, s2)
  547. ackBody := struct {
  548. Janus string `json:"janus"`
  549. SessionID uint64 `json:"session_id"`
  550. Transaction string `json:"transaction"`
  551. }{}
  552. if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
  553. return "", errors.Wrapf(err, "Marshal %v", s2)
  554. }
  555. if ackBody.Janus != "ack" {
  556. return "", errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
  557. }
  558. logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
  559. // Reply from polling.
  560. var s3 string
  561. select {
  562. case <-ctx.Done():
  563. return "", ctx.Err()
  564. case b3 := <-reply.replies:
  565. s3 = escapeJSON(string(b3))
  566. logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
  567. }
  568. resBody := struct {
  569. Janus string `json:"janus"`
  570. Session uint64 `json:"session_id"`
  571. Transaction string `json:"transaction"`
  572. Sender uint64 `json:"sender"`
  573. PluginData struct {
  574. Plugin string `json:"plugin"`
  575. Data struct {
  576. VideoRoom string `json:"videoroom"`
  577. Room int `json:"room"`
  578. ID uint64 `json:"id"`
  579. Display string `json:"display"`
  580. } `json:"data"`
  581. } `json:"plugindata"`
  582. JSEP struct {
  583. Type string `json:"type"`
  584. SDP string `json:"sdp"`
  585. } `json:"jsep"`
  586. }{}
  587. if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
  588. return "", errors.Wrapf(err, "Marshal %v", s3)
  589. }
  590. plugin := resBody.PluginData.Data
  591. jsep := resBody.JSEP
  592. if resBody.Janus != "event" || plugin.VideoRoom != "attached" {
  593. return "", errors.Errorf("Server fail janus=%v, plugin=%v %v", resBody.Janus, plugin.VideoRoom, s3)
  594. }
  595. logger.Tf(ctx, "Join as subscriber room=%v, tid=%v ok, event=%v, plugin=%v, offer=%vB",
  596. room, reply.transactionID, resBody.Janus, plugin.VideoRoom, len(jsep.SDP))
  597. return jsep.SDP, nil
  598. }
  599. func (v *janusAPI) Subscribe(ctx context.Context, handleID uint64, room int, answer string) error {
  600. handler := v.loadHandler(handleID)
  601. api := fmt.Sprintf("%v%v/%v", v.r, v.sessionID, handler.handleID)
  602. reqBodyBody := struct {
  603. Request string `json:"request"`
  604. Room int `json:"room"`
  605. }{
  606. "start", room,
  607. }
  608. jsepBody := struct {
  609. Type string `json:"type"`
  610. SDP string `json:"sdp"`
  611. }{
  612. "answer", answer,
  613. }
  614. reqBody := struct {
  615. Janus string `json:"janus"`
  616. Transaction string `json:"transaction"`
  617. Body interface{} `json:"body"`
  618. JSEP interface{} `json:"jsep"`
  619. }{
  620. "message", newTransactionID(), reqBodyBody, jsepBody,
  621. }
  622. reply := newJanusReply(reqBody.Transaction)
  623. v.replies.Store(reqBody.Transaction, reply)
  624. b, err := json.Marshal(reqBody)
  625. if err != nil {
  626. return errors.Wrapf(err, "Marshal body %v", reqBody)
  627. }
  628. logger.Tf(ctx, "Request url api=%v with %v", api, string(b))
  629. req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
  630. if err != nil {
  631. return errors.Wrapf(err, "HTTP request %v", string(b))
  632. }
  633. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  634. if err != nil {
  635. return errors.Wrapf(err, "Do HTTP request %v", string(b))
  636. }
  637. b2, err := ioutil.ReadAll(res.Body)
  638. if err != nil {
  639. return errors.Wrapf(err, "Read response for %v", string(b))
  640. }
  641. s2 := escapeJSON(string(b2))
  642. logger.Tf(ctx, "Response from %v is %v", api, s2)
  643. ackBody := struct {
  644. Janus string `json:"janus"`
  645. SessionID uint64 `json:"session_id"`
  646. Transaction string `json:"transaction"`
  647. }{}
  648. if err := json.Unmarshal([]byte(s2), &ackBody); err != nil {
  649. return errors.Wrapf(err, "Marshal %v", s2)
  650. }
  651. if ackBody.Janus != "ack" {
  652. return errors.Errorf("Server fail code=%v %v", ackBody.Janus, s2)
  653. }
  654. logger.Tf(ctx, "Response tid=%v ack", reply.transactionID)
  655. // Reply from polling.
  656. var s3 string
  657. select {
  658. case <-ctx.Done():
  659. return ctx.Err()
  660. case b3 := <-reply.replies:
  661. s3 = escapeJSON(string(b3))
  662. logger.Tf(ctx, "Async response tid=%v, reply=%v", reply.transactionID, s3)
  663. }
  664. resBody := struct {
  665. Janus string `json:"janus"`
  666. Session uint64 `json:"session_id"`
  667. Transaction string `json:"transaction"`
  668. Sender uint64 `json:"sender"`
  669. PluginData struct {
  670. Plugin string `json:"plugin"`
  671. Data struct {
  672. VideoRoom string `json:"videoroom"`
  673. Room int `json:"room"`
  674. Started string `json:"started"`
  675. } `json:"data"`
  676. } `json:"plugindata"`
  677. }{}
  678. if err := json.Unmarshal([]byte(s3), &resBody); err != nil {
  679. return errors.Wrapf(err, "Marshal %v", s3)
  680. }
  681. plugin := resBody.PluginData.Data
  682. if resBody.Janus != "event" || plugin.VideoRoom != "event" || plugin.Started != "ok" {
  683. return errors.Errorf("Server fail janus=%v, plugin=%v, started=%v %v", resBody.Janus, plugin.VideoRoom, plugin.Started, s3)
  684. }
  685. logger.Tf(ctx, "Start subscribe answer=%vB, tid=%v ok, event=%v, plugin=%v, started=%v",
  686. len(answer), reply.transactionID, resBody.Janus, plugin.VideoRoom, plugin.Started)
  687. return nil
  688. }
  689. func (v *janusAPI) polling(ctx context.Context) error {
  690. api := fmt.Sprintf("%v%v?rid=%v&maxev=1", v.r, v.sessionID,
  691. uint64(time.Duration(time.Now().UnixNano())/time.Millisecond))
  692. logger.Tf(ctx, "Polling: Request url api=%v", api)
  693. req, err := http.NewRequest("GET", api, nil)
  694. if err != nil {
  695. return errors.Wrapf(err, "HTTP request %v", api)
  696. }
  697. res, err := http.DefaultClient.Do(req.WithContext(ctx))
  698. if err != nil {
  699. return errors.Wrapf(err, "Do HTTP request %v", api)
  700. }
  701. b2, err := ioutil.ReadAll(res.Body)
  702. if err != nil {
  703. return errors.Wrapf(err, "Read response for %v", api)
  704. }
  705. s2 := escapeJSON(string(b2))
  706. logger.Tf(ctx, "Polling: Response from %v is %v", api, s2)
  707. if len(b2) == 0 {
  708. return nil
  709. }
  710. replyID := struct {
  711. Janus string `json:"janus"`
  712. Transaction string `json:"transaction"`
  713. }{}
  714. if err := json.Unmarshal([]byte(s2), &replyID); err != nil {
  715. return errors.Wrapf(err, "Marshal %v", s2)
  716. }
  717. switch replyID.Janus {
  718. case "event":
  719. if r, ok := v.replies.Load(replyID.Transaction); !ok {
  720. if err := v.handleCall(replyID.Janus, s2); err != nil {
  721. logger.Wf(ctx, "Polling: Handle call %v fail %v, err %+v", replyID.Janus, s2, err)
  722. }
  723. } else if r2, ok := r.(*janusReply); !ok {
  724. logger.Wf(ctx, "Polling: Ignore tid=%v reply %v", replyID.Transaction, s2)
  725. } else {
  726. select {
  727. case <-ctx.Done():
  728. return ctx.Err()
  729. case r2.replies <- b2:
  730. logger.Tf(ctx, "Polling: Reply tid=%v ok, %v", replyID.Transaction, s2)
  731. }
  732. }
  733. case "keepalive":
  734. return nil
  735. case "webrtcup", "media", "slowlink", "detached":
  736. if err := v.handleCall(replyID.Janus, s2); err != nil {
  737. logger.Wf(ctx, "Polling: Handle call %v fail %v, err %+v", replyID.Janus, s2, err)
  738. }
  739. default:
  740. logger.Wf(ctx, "Polling: Unknown janus=%v %v", replyID.Janus, s2)
  741. }
  742. return nil
  743. }
  744. func (v *janusAPI) handleCall(janus string, s string) error {
  745. type callHeader struct {
  746. Sender uint64 `json:"sender"`
  747. SessionID uint64 `json:"session_id"`
  748. }
  749. switch janus {
  750. case "detached":
  751. /*{
  752. "janus": "detached",
  753. "sender": 4201795482244652,
  754. "session_id": 373403124722380
  755. }*/
  756. r := callHeader{}
  757. if err := json.Unmarshal([]byte(s), &r); err != nil {
  758. return err
  759. }
  760. v.onDetached(r.Sender, r.SessionID)
  761. case "webrtcup":
  762. /*{
  763. "janus": "webrtcup",
  764. "sender": 7698695982180732,
  765. "session_id": 2403223275773854
  766. }*/
  767. r := callHeader{}
  768. if err := json.Unmarshal([]byte(s), &r); err != nil {
  769. return err
  770. }
  771. v.onWebrtcUp(r.Sender, r.SessionID)
  772. case "media":
  773. /*{
  774. "janus": "media",
  775. "receiving": true,
  776. "sender": 7698695982180732,
  777. "session_id": 2403223275773854,
  778. "type": "audio"
  779. }*/
  780. r := struct {
  781. callHeader
  782. Type string `json:"type"`
  783. Receiving bool `json:"receiving"`
  784. }{}
  785. if err := json.Unmarshal([]byte(s), &r); err != nil {
  786. return err
  787. }
  788. v.onMedia(r.Sender, r.SessionID, r.Type, r.Receiving)
  789. case "slowlink":
  790. /*{
  791. "janus": "slowlink",
  792. "lost": 4294902988,
  793. "media": "video",
  794. "sender": 562229074390269,
  795. "session_id": 156116325213625,
  796. "uplink": false
  797. }*/
  798. r := struct {
  799. callHeader
  800. Lost uint64 `json:"lost"`
  801. Media string `json:"media"`
  802. Uplink bool `json:"uplink"`
  803. }{}
  804. if err := json.Unmarshal([]byte(s), &r); err != nil {
  805. return err
  806. }
  807. v.onSlowLink(r.Sender, r.SessionID, r.Media, r.Lost, r.Uplink)
  808. case "event":
  809. if strings.Contains(s, "publishers") {
  810. /*{
  811. "janus": "event",
  812. "plugindata": {
  813. "data": {
  814. "publishers": [{
  815. "audio_codec": "opus",
  816. "display": "test",
  817. "id": 2805536617160145,
  818. "talking": false,
  819. "video_codec": "h264"
  820. }],
  821. "room": 2345,
  822. "videoroom": "event"
  823. },
  824. "plugin": "janus.plugin.videoroom"
  825. },
  826. "sender": 2156044968631669,
  827. "session_id": 6696376606446844
  828. }*/
  829. r := struct {
  830. callHeader
  831. PluginData struct {
  832. Data struct {
  833. Publishers []publisherInfo `json:"publishers"`
  834. Room int `json:"room"`
  835. VideoRoom string `json:"videoroom"`
  836. } `json:"data"`
  837. Plugin string `json:"plugin"`
  838. } `json:"plugindata"`
  839. }{}
  840. if err := json.Unmarshal([]byte(s), &r); err != nil {
  841. return err
  842. }
  843. v.onPublisher(r.Sender, r.SessionID, r.PluginData.Data.Publishers)
  844. } else if strings.Contains(s, "unpublished") {
  845. /*{
  846. "janus": "event",
  847. "plugindata": {
  848. "data": {
  849. "room": 2345,
  850. "unpublished": 2805536617160145,
  851. "videoroom": "event"
  852. },
  853. "plugin": "janus.plugin.videoroom"
  854. },
  855. "sender": 2156044968631669,
  856. "session_id": 6696376606446844
  857. }*/
  858. r := struct {
  859. callHeader
  860. PluginData struct {
  861. Data struct {
  862. Room int `json:"room"`
  863. UnPublished uint64 `json:"unpublished"`
  864. VideoRoom string `json:"videoroom"`
  865. } `json:"data"`
  866. Plugin string `json:"plugin"`
  867. } `json:"plugindata"`
  868. }{}
  869. if err := json.Unmarshal([]byte(s), &r); err != nil {
  870. return err
  871. }
  872. v.onUnPublished(r.Sender, r.SessionID, r.PluginData.Data.UnPublished)
  873. } else if strings.Contains(s, "leaving") {
  874. /*{
  875. "janus": "event",
  876. "plugindata": {
  877. "data": {
  878. "leaving": 2805536617160145,
  879. "room": 2345,
  880. "videoroom": "event"
  881. },
  882. "plugin": "janus.plugin.videoroom"
  883. },
  884. "sender": 2156044968631669,
  885. "session_id": 6696376606446844
  886. }*/
  887. r := struct {
  888. callHeader
  889. PluginData struct {
  890. Data struct {
  891. Leaving uint64 `json:"leaving"`
  892. Room int `json:"room"`
  893. VideoRoom string `json:"videoroom"`
  894. } `json:"data"`
  895. Plugin string `json:"plugin"`
  896. } `json:"plugindata"`
  897. }{}
  898. if err := json.Unmarshal([]byte(s), &r); err != nil {
  899. return err
  900. }
  901. v.onLeave(r.Sender, r.SessionID, r.PluginData.Data.Leaving)
  902. }
  903. }
  904. return nil
  905. }
  906. func (v *janusAPI) DiscoverPublisher(ctx context.Context, room int, display string, timeout time.Duration) (*publisherInfo, error) {
  907. var publisher *publisherInfo
  908. discoverCtx, discoverCancel := context.WithCancel(context.Background())
  909. ov := v.onPublisher
  910. defer func() {
  911. v.onPublisher = ov
  912. }()
  913. v.onPublisher = func(sender, sessionID uint64, publishers []publisherInfo) {
  914. for _, p := range publishers {
  915. if p.Display == display {
  916. publisher = &p
  917. discoverCancel()
  918. logger.Tf(ctx, "Publisher discovered %v", p)
  919. return
  920. }
  921. }
  922. }
  923. go func() {
  924. if err := func() error {
  925. publishHandleID, err := v.AttachPlugin(ctx)
  926. if err != nil {
  927. return err
  928. }
  929. defer v.DetachPlugin(ctx, publishHandleID)
  930. if err := v.JoinAsPublisher(ctx, publishHandleID, room, fmt.Sprintf("sub-%v", display)); err != nil {
  931. return err
  932. }
  933. <-discoverCtx.Done()
  934. return nil
  935. }(); err != nil {
  936. logger.Ef(ctx, "join err %+v", err)
  937. }
  938. }()
  939. select {
  940. case <-ctx.Done():
  941. return nil, ctx.Err()
  942. case <-discoverCtx.Done():
  943. case <-time.After(timeout):
  944. discoverCancel()
  945. }
  946. if publisher == nil {
  947. return nil, errors.Errorf("no publisher for room=%v, display=%v, session=%v",
  948. room, display, v.sessionID)
  949. }
  950. return publisher, nil
  951. }