2
0

main.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  import (
  	"context"
  	"crypto/rand"
  	"crypto/sha256"
  	"encoding/hex"
  	"fmt"
  	"io"
  	"net"
  	"sync"
  	"time"
  )
  12. func main() {
  13. if err := doMain(); err != nil {
  14. panic(err)
  15. }
  16. }
  17. func doMain() error {
  18. hashID := buildHashID()
  19. listener, err := net.Listen("tcp", ":1935")
  20. if err != nil {
  21. return err
  22. }
  23. trace(hashID, "Listen at %v", listener.Addr())
  24. for {
  25. client, err := listener.Accept()
  26. if err != nil {
  27. return err
  28. }
  29. backend, err := net.Dial("tcp", "localhost:19350")
  30. if err != nil {
  31. return err
  32. }
  33. go serve(client, backend)
  34. }
  35. return nil
  36. }
  37. func serve(client, backend net.Conn) {
  38. defer client.Close()
  39. defer backend.Close()
  40. hashID := buildHashID()
  41. if err := doServe(hashID, client, backend); err != nil {
  42. trace(hashID, "Serve error %v", err)
  43. }
  44. }
  45. func doServe(hashID string, client, backend net.Conn) error {
  46. var wg sync.WaitGroup
  47. var r0 error
  48. ctx, cancel := context.WithCancel(context.Background())
  49. defer cancel()
  50. if c, ok := client.(*net.TCPConn); ok {
  51. c.SetNoDelay(true)
  52. }
  53. if c, ok := backend.(*net.TCPConn); ok {
  54. c.SetNoDelay(true)
  55. }
  56. wg.Add(1)
  57. go func() {
  58. defer wg.Done()
  59. defer cancel()
  60. for {
  61. buf := make([]byte, 128*1024)
  62. nn, err := client.Read(buf)
  63. if err != nil {
  64. trace(hashID, "Read from client error %v", err)
  65. r0 = err
  66. return
  67. }
  68. if nn == 0 {
  69. trace(hashID, "Read from client EOF")
  70. return
  71. }
  72. _, err = backend.Write(buf[:nn])
  73. if err != nil {
  74. trace(hashID, "Write to RTMP backend error %v", err)
  75. r0 = err
  76. return
  77. }
  78. trace(hashID, "Copy %v bytes to RTMP backend", nn)
  79. }
  80. }()
  81. wg.Add(1)
  82. go func() {
  83. defer wg.Done()
  84. defer cancel()
  85. for {
  86. buf := make([]byte, 128*1024)
  87. nn, err := backend.Read(buf)
  88. if err != nil {
  89. trace(hashID, "Read from RTMP backend error %v", err)
  90. r0 = err
  91. return
  92. }
  93. if nn == 0 {
  94. trace(hashID, "Read from RTMP backend EOF")
  95. return
  96. }
  97. _, err = client.Write(buf[:nn])
  98. if err != nil {
  99. trace(hashID, "Write to client error %v", err)
  100. r0 = err
  101. return
  102. }
  103. trace(hashID, "Copy %v bytes to RTMP client", nn)
  104. }
  105. }()
  106. wg.Add(1)
  107. go func() {
  108. defer wg.Done()
  109. defer client.Close()
  110. defer backend.Close()
  111. <-ctx.Done()
  112. trace(hashID, "Context is done, close the connections")
  113. }()
  114. trace(hashID, "Start proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr())
  115. wg.Wait()
  116. trace(hashID, "Finish proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr())
  117. return r0
  118. }
  // trace prints one log line to standard output in the form
  // "[timestamp][id] message", where the timestamp has millisecond precision
  // and the message is msg formatted with a, as in fmt.Sprintf.
  func trace(id, msg string, a ...interface{}) {
  	stamp := time.Now().Format("2006-01-02 15:04:05.000")
  	body := fmt.Sprintf(msg, a...)
  	line := fmt.Sprintf("[%v][%v] %v", stamp, id, body)
  	fmt.Println(line)
  }
  // buildHashID returns a random six-character lowercase hex tag for trace
  // lines: the first three bytes of the SHA-256 digest of 16 random bytes.
  // It returns the empty string if the random source cannot be read.
  func buildHashID() string {
  	var seed [16]byte
  	if _, err := rand.Read(seed[:]); err != nil {
  		return ""
  	}
  	digest := sha256.Sum256(seed[:])
  	// Three digest bytes encode to exactly six hex characters.
  	return hex.EncodeToString(digest[:3])
  }