udpproxy.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package vnet
  2. import (
  3. "context"
  4. "net"
  5. "sync"
  6. "time"
  7. )
  8. // UDPProxy is a proxy between real server(net.UDPConn) and vnet.UDPConn.
  9. //
  10. // High level design:
  11. //
  12. // ..............................................
  13. // : Virtual Network (vnet) :
  14. // : :
  15. // +-------+ * 1 +----+ +--------+ :
  16. // | :App |------------>|:Net|--o<-----|:Router | .............................
  17. // +-------+ +----+ | | : UDPProxy :
  18. // : | | +----+ +---------+ +---------+ +--------+
  19. // : | |--->o--|:Net|-->o-| vnet. |-->o-| net. |--->-| :Real |
  20. // : | | +----+ | UDPConn | | UDPConn | | Server |
  21. // : | | : +---------+ +---------+ +--------+
  22. // : | | ............................:
  23. // : +--------+ :
  24. // ...............................................
  25. type UDPProxy struct {
  26. // The router bind to.
  27. router *Router
  28. // Each vnet source, bind to a real socket to server.
  29. // key is real server addr, which is net.Addr
  30. // value is *aUDPProxyWorker
  31. workers sync.Map
  32. // For each endpoint, we never know when to start and stop proxy,
  33. // so we stop the endpoint when timeout.
  34. timeout time.Duration
  35. // For utest, to mock the target real server.
  36. // Optional, use the address of received client packet.
  37. mockRealServerAddr *net.UDPAddr
  38. }
  39. // NewProxy create a proxy, the router for this proxy belongs/bind to. If need to proxy for
  40. // please create a new proxy for each router. For all addresses we proxy, we will create a
  41. // vnet.Net in this router and proxy all packets.
  42. func NewProxy(router *Router) (*UDPProxy, error) {
  43. v := &UDPProxy{router: router, timeout: 2 * time.Minute}
  44. return v, nil
  45. }
  46. // Close the proxy, stop all workers.
  47. func (v *UDPProxy) Close() error {
  48. v.workers.Range(func(key, value interface{}) bool {
  49. _ = value.(*aUDPProxyWorker).Close()
  50. return true
  51. })
  52. return nil
  53. }
  54. // Proxy starts a worker for server, ignore if already started.
  55. func (v *UDPProxy) Proxy(client *Net, server *net.UDPAddr) error {
  56. // Note that even if the worker exists, it's also ok to create a same worker,
  57. // because the router will use the last one, and the real server will see a address
  58. // change event after we switch to the next worker.
  59. if _, ok := v.workers.Load(server.String()); ok {
  60. // nolint:godox // TODO: Need to restart the stopped worker?
  61. return nil
  62. }
  63. // Not exists, create a new one.
  64. worker := &aUDPProxyWorker{
  65. router: v.router, mockRealServerAddr: v.mockRealServerAddr,
  66. }
  67. // Create context for cleanup.
  68. var ctx context.Context
  69. ctx, worker.ctxDisposeCancel = context.WithCancel(context.Background())
  70. v.workers.Store(server.String(), worker)
  71. return worker.Proxy(ctx, client, server)
  72. }
  73. // A proxy worker for a specified proxy server.
  74. type aUDPProxyWorker struct {
  75. router *Router
  76. mockRealServerAddr *net.UDPAddr
  77. // Each vnet source, bind to a real socket to server.
  78. // key is vnet client addr, which is net.Addr
  79. // value is *net.UDPConn
  80. endpoints sync.Map
  81. // For cleanup.
  82. ctxDisposeCancel context.CancelFunc
  83. wg sync.WaitGroup
  84. }
  85. func (v *aUDPProxyWorker) Close() error {
  86. // Notify all goroutines to dispose.
  87. v.ctxDisposeCancel()
  88. // Wait for all goroutines quit.
  89. v.wg.Wait()
  90. return nil
  91. }
  92. func (v *aUDPProxyWorker) Proxy(ctx context.Context, client *Net, serverAddr *net.UDPAddr) error { // nolint:gocognit
  93. // Create vnet for real server by serverAddr.
  94. nw, err := NewNet(&NetConfig{
  95. StaticIP: serverAddr.IP.String(),
  96. })
  97. if err != nil {
  98. return err
  99. }
  100. if err := v.router.AddNet(nw); err != nil {
  101. return err
  102. }
  103. // We must create a "same" vnet.UDPConn as the net.UDPConn,
  104. // which has the same ip:port, to copy packets between them.
  105. vnetSocket, err := nw.ListenUDP("udp4", serverAddr)
  106. if err != nil {
  107. return err
  108. }
  109. // User stop proxy, we should close the socket.
  110. go func() {
  111. <-ctx.Done()
  112. _ = vnetSocket.Close()
  113. }()
  114. // Got new vnet client, start a new endpoint.
  115. findEndpointBy := func(addr net.Addr) (*net.UDPConn, error) {
  116. // Exists binding.
  117. if value, ok := v.endpoints.Load(addr.String()); ok {
  118. // Exists endpoint, reuse it.
  119. return value.(*net.UDPConn), nil
  120. }
  121. // The real server we proxy to, for utest to mock it.
  122. realAddr := serverAddr
  123. if v.mockRealServerAddr != nil {
  124. realAddr = v.mockRealServerAddr
  125. }
  126. // Got new vnet client, create new endpoint.
  127. realSocket, err := net.DialUDP("udp4", nil, realAddr)
  128. if err != nil {
  129. return nil, err
  130. }
  131. // User stop proxy, we should close the socket.
  132. go func() {
  133. <-ctx.Done()
  134. _ = realSocket.Close()
  135. }()
  136. // Bind address.
  137. v.endpoints.Store(addr.String(), realSocket)
  138. // Got packet from real serverAddr, we should proxy it to vnet.
  139. v.wg.Add(1)
  140. go func(vnetClientAddr net.Addr) {
  141. defer v.wg.Done()
  142. buf := make([]byte, 1500)
  143. for {
  144. n, _, err := realSocket.ReadFrom(buf)
  145. if err != nil {
  146. return
  147. }
  148. if n <= 0 {
  149. continue // Drop packet
  150. }
  151. if _, err := vnetSocket.WriteTo(buf[:n], vnetClientAddr); err != nil {
  152. return
  153. }
  154. }
  155. }(addr)
  156. return realSocket, nil
  157. }
  158. // Start a proxy goroutine.
  159. v.wg.Add(1)
  160. go func() {
  161. defer v.wg.Done()
  162. buf := make([]byte, 1500)
  163. for {
  164. n, addr, err := vnetSocket.ReadFrom(buf)
  165. if err != nil {
  166. return
  167. }
  168. if n <= 0 || addr == nil {
  169. continue // Drop packet
  170. }
  171. realSocket, err := findEndpointBy(addr)
  172. if err != nil {
  173. continue // Drop packet.
  174. }
  175. if _, err := realSocket.Write(buf[:n]); err != nil {
  176. return
  177. }
  178. }
  179. }()
  180. return nil
  181. }