rtc.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. // Copyright (c) 2024 Winlin
  2. //
  3. // SPDX-License-Identifier: MIT
  4. package main
  5. import (
  6. "context"
  7. "encoding/binary"
  8. "fmt"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "strconv"
  13. "strings"
  14. stdSync "sync"
  15. "srs-proxy/errors"
  16. "srs-proxy/logger"
  17. "srs-proxy/sync"
  18. )
  19. // srsWebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out
  20. // which backend server to proxy to. It will also replace the UDP port to the proxy server's in the
  21. // SDP answer.
  22. type srsWebRTCServer struct {
  23. // The UDP listener for WebRTC server.
  24. listener *net.UDPConn
  25. // Fast cache for the username to identify the connection.
  26. // The key is username, the value is the UDP address.
  27. usernames sync.Map[string, *RTCConnection]
  28. // Fast cache for the udp address to identify the connection.
  29. // The key is UDP address, the value is the username.
  30. // TODO: Support fast earch by uint64 address.
  31. addresses sync.Map[string, *RTCConnection]
  32. // The wait group for server.
  33. wg stdSync.WaitGroup
  34. }
  35. func NewSRSWebRTCServer(opts ...func(*srsWebRTCServer)) *srsWebRTCServer {
  36. v := &srsWebRTCServer{}
  37. for _, opt := range opts {
  38. opt(v)
  39. }
  40. return v
  41. }
  42. func (v *srsWebRTCServer) Close() error {
  43. if v.listener != nil {
  44. _ = v.listener.Close()
  45. }
  46. v.wg.Wait()
  47. return nil
  48. }
  49. func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
  50. defer r.Body.Close()
  51. ctx = logger.WithContext(ctx)
  52. // Always allow CORS for all requests.
  53. if ok := apiCORS(ctx, w, r); ok {
  54. return nil
  55. }
  56. // Read remote SDP offer from body.
  57. remoteSDPOffer, err := ioutil.ReadAll(r.Body)
  58. if err != nil {
  59. return errors.Wrapf(err, "read remote sdp offer")
  60. }
  61. // Build the stream URL in vhost/app/stream schema.
  62. unifiedURL, fullURL := convertURLToStreamURL(r)
  63. logger.Df(ctx, "Got WebRTC WHIP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL)
  64. streamURL, err := buildStreamURL(unifiedURL)
  65. if err != nil {
  66. return errors.Wrapf(err, "build stream url %v", unifiedURL)
  67. }
  68. // Pick a backend SRS server to proxy the RTMP stream.
  69. backend, err := srsLoadBalancer.Pick(ctx, streamURL)
  70. if err != nil {
  71. return errors.Wrapf(err, "pick backend for %v", streamURL)
  72. }
  73. if err = v.proxyApiToBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil {
  74. return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend)
  75. }
  76. return nil
  77. }
  78. func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
  79. defer r.Body.Close()
  80. ctx = logger.WithContext(ctx)
  81. // Always allow CORS for all requests.
  82. if ok := apiCORS(ctx, w, r); ok {
  83. return nil
  84. }
  85. // Read remote SDP offer from body.
  86. remoteSDPOffer, err := ioutil.ReadAll(r.Body)
  87. if err != nil {
  88. return errors.Wrapf(err, "read remote sdp offer")
  89. }
  90. // Build the stream URL in vhost/app/stream schema.
  91. unifiedURL, fullURL := convertURLToStreamURL(r)
  92. logger.Df(ctx, "Got WebRTC WHEP from %v with %vB offer for %v", r.RemoteAddr, len(remoteSDPOffer), fullURL)
  93. streamURL, err := buildStreamURL(unifiedURL)
  94. if err != nil {
  95. return errors.Wrapf(err, "build stream url %v", unifiedURL)
  96. }
  97. // Pick a backend SRS server to proxy the RTMP stream.
  98. backend, err := srsLoadBalancer.Pick(ctx, streamURL)
  99. if err != nil {
  100. return errors.Wrapf(err, "pick backend for %v", streamURL)
  101. }
  102. if err = v.proxyApiToBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil {
  103. return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend)
  104. }
  105. return nil
  106. }
  107. func (v *srsWebRTCServer) proxyApiToBackend(
  108. ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer,
  109. remoteSDPOffer string, streamURL string,
  110. ) error {
  111. // Parse HTTP port from backend.
  112. if len(backend.API) == 0 {
  113. return errors.Errorf("no http api server")
  114. }
  115. var apiPort int
  116. if iv, err := strconv.ParseInt(backend.API[0], 10, 64); err != nil {
  117. return errors.Wrapf(err, "parse http port %v", backend.API[0])
  118. } else {
  119. apiPort = int(iv)
  120. }
  121. // Connect to backend SRS server via HTTP client.
  122. backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, apiPort, r.URL.Path)
  123. if r.URL.RawQuery != "" {
  124. backendURL += "?" + r.URL.RawQuery
  125. }
  126. req, err := http.NewRequestWithContext(ctx, r.Method, backendURL, strings.NewReader(remoteSDPOffer))
  127. if err != nil {
  128. return errors.Wrapf(err, "create request to %v", backendURL)
  129. }
  130. resp, err := http.DefaultClient.Do(req)
  131. if err != nil {
  132. return errors.Errorf("do request to %v EOF", backendURL)
  133. }
  134. defer resp.Body.Close()
  135. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
  136. return errors.Errorf("proxy api to %v failed, status=%v", backendURL, resp.Status)
  137. }
  138. // Copy all headers from backend to client.
  139. w.WriteHeader(resp.StatusCode)
  140. for k, v := range resp.Header {
  141. for _, vv := range v {
  142. w.Header().Add(k, vv)
  143. }
  144. }
  145. // Parse the local SDP answer from backend.
  146. b, err := ioutil.ReadAll(resp.Body)
  147. if err != nil {
  148. return errors.Wrapf(err, "read stream from %v", backendURL)
  149. }
  150. // Replace the WebRTC UDP port in answer.
  151. localSDPAnswer := string(b)
  152. for _, endpoint := range backend.RTC {
  153. _, _, port, err := parseListenEndpoint(endpoint)
  154. if err != nil {
  155. return errors.Wrapf(err, "parse endpoint %v", endpoint)
  156. }
  157. from := fmt.Sprintf(" %v typ host", port)
  158. to := fmt.Sprintf(" %v typ host", envWebRTCServer())
  159. localSDPAnswer = strings.Replace(localSDPAnswer, from, to, -1)
  160. }
  161. // Fetch the ice-ufrag and ice-pwd from local SDP answer.
  162. remoteICEUfrag, remoteICEPwd, err := parseIceUfragPwd(remoteSDPOffer)
  163. if err != nil {
  164. return errors.Wrapf(err, "parse remote sdp offer")
  165. }
  166. localICEUfrag, localICEPwd, err := parseIceUfragPwd(localSDPAnswer)
  167. if err != nil {
  168. return errors.Wrapf(err, "parse local sdp answer")
  169. }
  170. // Save the new WebRTC connection to LB.
  171. icePair := &RTCICEPair{
  172. RemoteICEUfrag: remoteICEUfrag, RemoteICEPwd: remoteICEPwd,
  173. LocalICEUfrag: localICEUfrag, LocalICEPwd: localICEPwd,
  174. }
  175. if err := srsLoadBalancer.StoreWebRTC(ctx, streamURL, NewRTCConnection(func(c *RTCConnection) {
  176. c.StreamURL, c.Ufrag = streamURL, icePair.Ufrag()
  177. c.Initialize(ctx, v.listener)
  178. // Cache the connection for fast search by username.
  179. v.usernames.Store(c.Ufrag, c)
  180. })); err != nil {
  181. return errors.Wrapf(err, "load or store webrtc %v", streamURL)
  182. }
  183. // Response client with local answer.
  184. if _, err = w.Write([]byte(localSDPAnswer)); err != nil {
  185. return errors.Wrapf(err, "write local sdp answer %v", localSDPAnswer)
  186. }
  187. logger.Df(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB",
  188. len(localSDPAnswer), localICEUfrag, len(localICEPwd))
  189. return nil
  190. }
  191. func (v *srsWebRTCServer) Run(ctx context.Context) error {
  192. // Parse address to listen.
  193. endpoint := envWebRTCServer()
  194. if !strings.Contains(endpoint, ":") {
  195. endpoint = fmt.Sprintf(":%v", endpoint)
  196. }
  197. saddr, err := net.ResolveUDPAddr("udp", endpoint)
  198. if err != nil {
  199. return errors.Wrapf(err, "resolve udp addr %v", endpoint)
  200. }
  201. listener, err := net.ListenUDP("udp", saddr)
  202. if err != nil {
  203. return errors.Wrapf(err, "listen udp %v", saddr)
  204. }
  205. v.listener = listener
  206. logger.Df(ctx, "WebRTC server listen at %v", saddr)
  207. // Consume all messages from UDP media transport.
  208. v.wg.Add(1)
  209. go func() {
  210. defer v.wg.Done()
  211. for ctx.Err() == nil {
  212. buf := make([]byte, 4096)
  213. n, caddr, err := listener.ReadFromUDP(buf)
  214. if err != nil {
  215. // TODO: If WebRTC server closed unexpectedly, we should notice the main loop to quit.
  216. logger.Wf(ctx, "read from udp failed, err=%+v", err)
  217. continue
  218. }
  219. if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil {
  220. logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
  221. }
  222. }
  223. }()
  224. return nil
  225. }
  226. func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
  227. var connection *RTCConnection
  228. // If STUN binding request, parse the ufrag and identify the connection.
  229. if err := func() error {
  230. if rtcIsRTPOrRTCP(data) || !rtcIsSTUN(data) {
  231. return nil
  232. }
  233. var pkt RTCStunPacket
  234. if err := pkt.UnmarshalBinary(data); err != nil {
  235. return errors.Wrapf(err, "unmarshal stun packet")
  236. }
  237. // Search the connection in fast cache.
  238. if s, ok := v.usernames.Load(pkt.Username); ok {
  239. connection = s
  240. return nil
  241. }
  242. // Load connection by username.
  243. if s, err := srsLoadBalancer.LoadWebRTCByUfrag(ctx, pkt.Username); err != nil {
  244. return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username)
  245. } else {
  246. connection = s.Initialize(ctx, v.listener)
  247. logger.Df(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL)
  248. }
  249. // Cache connection for fast search.
  250. if connection != nil {
  251. v.usernames.Store(pkt.Username, connection)
  252. }
  253. return nil
  254. }(); err != nil {
  255. return err
  256. }
  257. // Search the connection by addr.
  258. if s, ok := v.addresses.Load(addr.String()); ok {
  259. connection = s
  260. } else if connection != nil {
  261. // Cache the address for fast search.
  262. v.addresses.Store(addr.String(), connection)
  263. }
  264. // If connection is not found, ignore the packet.
  265. if connection == nil {
  266. // TODO: Should logging the dropped packet, only logging the first one for each address.
  267. return nil
  268. }
  269. // Proxy the packet to backend.
  270. if err := connection.HandlePacket(addr, data); err != nil {
  271. return errors.Wrapf(err, "proxy %vB for %v", len(data), connection.StreamURL)
  272. }
  273. return nil
  274. }
  275. // RTCConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC
  276. // connection, identify by the ufrag in sdp offer/answer and ICE binding request.
  277. //
  278. // It's not like RTMP or HTTP FLV/TS proxy connection, which are stateless and all state is
  279. // in the client request. The RTCConnection is stateful, and need to sync the ufrag between
  280. // proxy servers.
  281. //
  282. // The media transport is UDP, which is also a special thing for WebRTC. So if the client switch
  283. // to another UDP address, it may connect to another WebRTC proxy, then we should discover the
  284. // RTCConnection by the ufrag from the ICE binding request.
  285. type RTCConnection struct {
  286. // The stream context for WebRTC streaming.
  287. ctx context.Context
  288. // The stream URL in vhost/app/stream schema.
  289. StreamURL string `json:"stream_url"`
  290. // The ufrag for this WebRTC connection.
  291. Ufrag string `json:"ufrag"`
  292. // The UDP connection proxy to backend.
  293. backendUDP *net.UDPConn
  294. // The client UDP address. Note that it may change.
  295. clientUDP *net.UDPAddr
  296. // The listener UDP connection, used to send messages to client.
  297. listenerUDP *net.UDPConn
  298. }
  299. func NewRTCConnection(opts ...func(*RTCConnection)) *RTCConnection {
  300. v := &RTCConnection{}
  301. for _, opt := range opts {
  302. opt(v)
  303. }
  304. return v
  305. }
  306. func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *RTCConnection {
  307. if v.ctx == nil {
  308. v.ctx = logger.WithContext(ctx)
  309. }
  310. if listener != nil {
  311. v.listenerUDP = listener
  312. }
  313. return v
  314. }
  315. func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error {
  316. ctx := v.ctx
  317. // Update the current UDP address.
  318. v.clientUDP = addr
  319. // Start the UDP proxy to backend.
  320. if err := v.connectBackend(ctx); err != nil {
  321. return errors.Wrapf(err, "connect backend for %v", v.StreamURL)
  322. }
  323. // Proxy client message to backend.
  324. if v.backendUDP == nil {
  325. return nil
  326. }
  327. // Proxy all messages from backend to client.
  328. go func() {
  329. for ctx.Err() == nil {
  330. buf := make([]byte, 4096)
  331. n, _, err := v.backendUDP.ReadFromUDP(buf)
  332. if err != nil {
  333. // TODO: If backend server closed unexpectedly, we should notice the stream to quit.
  334. logger.Wf(ctx, "read from backend failed, err=%v", err)
  335. break
  336. }
  337. if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil {
  338. // TODO: If backend server closed unexpectedly, we should notice the stream to quit.
  339. logger.Wf(ctx, "write to client failed, err=%v", err)
  340. break
  341. }
  342. }
  343. }()
  344. if _, err := v.backendUDP.Write(data); err != nil {
  345. return errors.Wrapf(err, "write to backend %v", v.StreamURL)
  346. }
  347. return nil
  348. }
  349. func (v *RTCConnection) connectBackend(ctx context.Context) error {
  350. if v.backendUDP != nil {
  351. return nil
  352. }
  353. // Pick a backend SRS server to proxy the RTC stream.
  354. backend, err := srsLoadBalancer.Pick(ctx, v.StreamURL)
  355. if err != nil {
  356. return errors.Wrapf(err, "pick backend")
  357. }
  358. // Parse UDP port from backend.
  359. if len(backend.RTC) == 0 {
  360. return errors.Errorf("no udp server")
  361. }
  362. _, _, udpPort, err := parseListenEndpoint(backend.RTC[0])
  363. if err != nil {
  364. return errors.Wrapf(err, "parse udp port %v of %v for %v", backend.RTC[0], backend, v.StreamURL)
  365. }
  366. // Connect to backend SRS server via UDP client.
  367. // TODO: FIXME: Support close the connection when timeout or DTLS alert.
  368. backendAddr := net.UDPAddr{IP: net.ParseIP(backend.IP), Port: int(udpPort)}
  369. if backendUDP, err := net.DialUDP("udp", nil, &backendAddr); err != nil {
  370. return errors.Wrapf(err, "dial udp to %v", backendAddr)
  371. } else {
  372. v.backendUDP = backendUDP
  373. }
  374. return nil
  375. }
  376. type RTCICEPair struct {
  377. // The remote ufrag, used for ICE username and session id.
  378. RemoteICEUfrag string `json:"remote_ufrag"`
  379. // The remote pwd, used for ICE password.
  380. RemoteICEPwd string `json:"remote_pwd"`
  381. // The local ufrag, used for ICE username and session id.
  382. LocalICEUfrag string `json:"local_ufrag"`
  383. // The local pwd, used for ICE password.
  384. LocalICEPwd string `json:"local_pwd"`
  385. }
  386. // Generate the ICE ufrag for the WebRTC streaming, format is remote-ufrag:local-ufrag.
  387. func (v *RTCICEPair) Ufrag() string {
  388. return fmt.Sprintf("%v:%v", v.LocalICEUfrag, v.RemoteICEUfrag)
  389. }
  390. type RTCStunPacket struct {
  391. // The stun message type.
  392. MessageType uint16
  393. // The stun username, or ufrag.
  394. Username string
  395. }
  396. func (v *RTCStunPacket) UnmarshalBinary(data []byte) error {
  397. if len(data) < 20 {
  398. return errors.Errorf("stun packet too short %v", len(data))
  399. }
  400. p := data
  401. v.MessageType = binary.BigEndian.Uint16(p)
  402. messageLen := binary.BigEndian.Uint16(p[2:])
  403. //magicCookie := p[:8]
  404. //transactionID := p[:20]
  405. p = p[20:]
  406. if len(p) != int(messageLen) {
  407. return errors.Errorf("stun packet length invalid %v != %v", len(data), messageLen)
  408. }
  409. for len(p) > 0 {
  410. typ := binary.BigEndian.Uint16(p)
  411. length := binary.BigEndian.Uint16(p[2:])
  412. p = p[4:]
  413. if len(p) < int(length) {
  414. return errors.Errorf("stun attribute length invalid %v < %v", len(p), length)
  415. }
  416. value := p[:length]
  417. p = p[length:]
  418. if length%4 != 0 {
  419. p = p[4-length%4:]
  420. }
  421. switch typ {
  422. case 0x0006:
  423. v.Username = string(value)
  424. }
  425. }
  426. return nil
  427. }