main.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. // The MIT License (MIT)
  2. //
  3. // Copyright (c) 2021 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package main
  22. import (
  23. "context"
  24. "encoding/json"
  25. "flag"
  26. "fmt"
  27. "net/http"
  28. "os"
  29. "path"
  30. "strings"
  31. "sync"
  32. "github.com/ossrs/go-oryx-lib/errors"
  33. "github.com/ossrs/go-oryx-lib/logger"
  34. "golang.org/x/net/websocket"
  35. )
  36. type Participant struct {
  37. Room *Room `json:"-"`
  38. Display string `json:"display"`
  39. Publishing bool `json:"publishing"`
  40. Out chan []byte `json:"-"`
  41. }
  42. func (v *Participant) String() string {
  43. return fmt.Sprintf("display=%v, room=%v", v.Display, v.Room.Name)
  44. }
  45. type Room struct {
  46. Name string `json:"room"`
  47. Participants []*Participant `json:"participants"`
  48. lock sync.RWMutex `json:"-"`
  49. }
  50. func (v *Room) String() string {
  51. return fmt.Sprintf("room=%v, participants=%v", v.Name, len(v.Participants))
  52. }
  53. func (v *Room) Add(p *Participant) error {
  54. v.lock.Lock()
  55. defer v.lock.Unlock()
  56. for _, r := range v.Participants {
  57. if r.Display == p.Display {
  58. return errors.Errorf("Participant %v exists in room %v", p.Display, v.Name)
  59. }
  60. }
  61. v.Participants = append(v.Participants, p)
  62. return nil
  63. }
  64. func (v *Room) Get(display string) *Participant {
  65. v.lock.RLock()
  66. defer v.lock.RUnlock()
  67. for _, r := range v.Participants {
  68. if r.Display == display {
  69. return r
  70. }
  71. }
  72. return nil
  73. }
  74. func (v *Room) Remove(p *Participant) {
  75. v.lock.Lock()
  76. defer v.lock.Unlock()
  77. for i, r := range v.Participants {
  78. if p == r {
  79. v.Participants = append(v.Participants[:i], v.Participants[i+1:]...)
  80. return
  81. }
  82. }
  83. }
  84. func (v *Room) Notify(ctx context.Context, peer *Participant, event, param, data string) {
  85. var participants []*Participant
  86. func() {
  87. v.lock.RLock()
  88. defer v.lock.RUnlock()
  89. participants = append(participants, v.Participants...)
  90. }()
  91. for _, r := range participants {
  92. if r == peer {
  93. continue
  94. }
  95. res := struct {
  96. Action string `json:"action"`
  97. Event string `json:"event"`
  98. Param string `json:"param,omitempty"`
  99. Data string `json:"data,omitempty"`
  100. Room string `json:"room"`
  101. Self *Participant `json:"self"`
  102. Peer *Participant `json:"peer"`
  103. Participants []*Participant `json:"participants"`
  104. }{
  105. "notify", event, param, data,
  106. v.Name, r, peer, participants,
  107. }
  108. b, err := json.Marshal(struct {
  109. Message interface{} `json:"msg"`
  110. }{
  111. res,
  112. })
  113. if err != nil {
  114. return
  115. }
  116. select {
  117. case <-ctx.Done():
  118. return
  119. case r.Out <- b:
  120. }
  121. logger.Tf(ctx, "Notify %v about %v %v", r, peer, event)
  122. }
  123. }
  124. func main() {
  125. var listen string
  126. flag.StringVar(&listen, "listen", "1989", "The TCP listen port")
  127. var html string
  128. flag.StringVar(&html, "root", "./www", "The www web root")
  129. flag.Usage = func() {
  130. fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
  131. fmt.Println(fmt.Sprintf("Options:"))
  132. fmt.Println(fmt.Sprintf(" -listen The TCP listen port. Default: %v", listen))
  133. fmt.Println(fmt.Sprintf(" -root The www web root. Default: %v", html))
  134. fmt.Println(fmt.Sprintf("For example:"))
  135. fmt.Println(fmt.Sprintf(" %v -listen %v -html %v", os.Args[0], listen, html))
  136. }
  137. flag.Parse()
  138. if !strings.Contains(listen, ":") {
  139. listen = ":" + listen
  140. }
  141. ctx := context.Background()
  142. home := listen
  143. if strings.HasPrefix(home, ":") {
  144. home = "http://localhost" + listen
  145. }
  146. if !path.IsAbs(html) && path.IsAbs(os.Args[0]) {
  147. html = path.Join(path.Dir(os.Args[0]), html)
  148. }
  149. logger.Tf(ctx, "Signaling ok, root=%v, home page is %v", html, home)
  150. http.Handle("/", http.FileServer(http.Dir(html)))
  151. http.Handle("/sig/v1/versions", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  152. w.Write([]byte("1.0"))
  153. }))
  154. // Key is name of room, value is Room
  155. var rooms sync.Map
  156. http.Handle("/sig/v1/rtc", websocket.Handler(func(c *websocket.Conn) {
  157. ctx, cancel := context.WithCancel(logger.WithContext(ctx))
  158. defer cancel()
  159. r := c.Request()
  160. logger.Tf(ctx, "Serve client %v at %v", r.RemoteAddr, r.RequestURI)
  161. defer c.Close()
  162. var self *Participant
  163. go func() {
  164. <-ctx.Done()
  165. if self == nil {
  166. return
  167. }
  168. // Notify other peers that we're quiting.
  169. // @remark The ctx(of self) is done, so we must use a new context.
  170. go self.Room.Notify(context.Background(), self, "leave", "", "")
  171. self.Room.Remove(self)
  172. logger.Tf(ctx, "Remove client %v", self)
  173. }()
  174. inMessages := make(chan []byte, 0)
  175. go func() {
  176. defer cancel()
  177. buf := make([]byte, 16384)
  178. for {
  179. n, err := c.Read(buf)
  180. if err != nil {
  181. logger.Wf(ctx, "Ignore err %v for %v", err, r.RemoteAddr)
  182. break
  183. }
  184. select {
  185. case <-ctx.Done():
  186. case inMessages <- buf[:n]:
  187. }
  188. }
  189. }()
  190. outMessages := make(chan []byte, 0)
  191. go func() {
  192. defer cancel()
  193. handleMessage := func(m []byte) error {
  194. action := struct {
  195. TID string `json:"tid"`
  196. Message struct {
  197. Action string `json:"action"`
  198. } `json:"msg"`
  199. }{}
  200. if err := json.Unmarshal(m, &action); err != nil {
  201. return errors.Wrapf(err, "Unmarshal %s", m)
  202. }
  203. var res interface{}
  204. if action.Message.Action == "join" {
  205. obj := struct {
  206. Message struct {
  207. Room string `json:"room"`
  208. Display string `json:"display"`
  209. } `json:"msg"`
  210. }{}
  211. if err := json.Unmarshal(m, &obj); err != nil {
  212. return errors.Wrapf(err, "Unmarshal %s", m)
  213. }
  214. r, _ := rooms.LoadOrStore(obj.Message.Room, &Room{Name: obj.Message.Room})
  215. p := &Participant{Room: r.(*Room), Display: obj.Message.Display, Out: outMessages}
  216. if err := r.(*Room).Add(p); err != nil {
  217. return errors.Wrapf(err, "join")
  218. }
  219. self = p
  220. logger.Tf(ctx, "Join %v ok", self)
  221. res = struct {
  222. Action string `json:"action"`
  223. Room string `json:"room"`
  224. Self *Participant `json:"self"`
  225. Participants []*Participant `json:"participants"`
  226. }{
  227. action.Message.Action, obj.Message.Room, p, r.(*Room).Participants,
  228. }
  229. go r.(*Room).Notify(ctx, p, action.Message.Action, "", "")
  230. } else if action.Message.Action == "publish" {
  231. obj := struct {
  232. Message struct {
  233. Room string `json:"room"`
  234. Display string `json:"display"`
  235. } `json:"msg"`
  236. }{}
  237. if err := json.Unmarshal(m, &obj); err != nil {
  238. return errors.Wrapf(err, "Unmarshal %s", m)
  239. }
  240. r, _ := rooms.LoadOrStore(obj.Message.Room, &Room{Name: obj.Message.Room})
  241. p := r.(*Room).Get(obj.Message.Display)
  242. // Now, the peer is publishing.
  243. p.Publishing = true
  244. go r.(*Room).Notify(ctx, p, action.Message.Action, "", "")
  245. } else if action.Message.Action == "control" {
  246. obj := struct {
  247. Message struct {
  248. Room string `json:"room"`
  249. Display string `json:"display"`
  250. Call string `json:"call"`
  251. Data string `json:"data"`
  252. } `json:"msg"`
  253. }{}
  254. if err := json.Unmarshal(m, &obj); err != nil {
  255. return errors.Wrapf(err, "Unmarshal %s", m)
  256. }
  257. r, _ := rooms.LoadOrStore(obj.Message.Room, &Room{Name: obj.Message.Room})
  258. p := r.(*Room).Get(obj.Message.Display)
  259. go r.(*Room).Notify(ctx, p, action.Message.Action, obj.Message.Call, obj.Message.Data)
  260. } else {
  261. return errors.Errorf("Invalid message %s", m)
  262. }
  263. if b, err := json.Marshal(struct {
  264. TID string `json:"tid"`
  265. Message interface{} `json:"msg"`
  266. }{
  267. action.TID, res,
  268. }); err != nil {
  269. return errors.Wrapf(err, "marshal")
  270. } else {
  271. select {
  272. case <-ctx.Done():
  273. return ctx.Err()
  274. case outMessages <- b:
  275. }
  276. }
  277. return nil
  278. }
  279. for m := range inMessages {
  280. if err := handleMessage(m); err != nil {
  281. logger.Wf(ctx, "Handle %s err %v", m, err)
  282. break
  283. }
  284. }
  285. }()
  286. for m := range outMessages {
  287. if _, err := c.Write(m); err != nil {
  288. logger.Wf(ctx, "Ignore err %v for %v", err, r.RemoteAddr)
  289. break
  290. }
  291. }
  292. }))
  293. http.ListenAndServe(listen, nil)
  294. }