2
0

sip.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. // The MIT License (MIT)
  2. //
  3. // # Copyright (c) 2022-2024 Winlin
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy of
  6. // this software and associated documentation files (the "Software"), to deal in
  7. // the Software without restriction, including without limitation the rights to
  8. // use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  9. // the Software, and to permit persons to whom the Software is furnished to do so,
  10. // subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  17. // FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  18. // COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  19. // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. package gb28181
  22. import (
  23. "context"
  24. "fmt"
  25. "github.com/ghettovoice/gosip/log"
  26. "github.com/ghettovoice/gosip/sip"
  27. "github.com/ghettovoice/gosip/transport"
  28. "github.com/ossrs/go-oryx-lib/errors"
  29. "github.com/ossrs/go-oryx-lib/logger"
  30. "math/rand"
  31. "net/url"
  32. "strings"
  33. "sync"
  34. "time"
  35. )
  36. type SIPConfig struct {
  37. // The server address, for example: tcp://127.0.0.1:5060k
  38. addr string
  39. // The SIP domain, for example: ossrs.io or 3402000000
  40. domain string
  41. // The SIP device ID, for example: camera or 34020000001320000001
  42. user string
  43. // The N number of random device ID, for example, 10 means 1320000001
  44. random int
  45. // The SIP server ID, for example: srs or 34020000002000000001
  46. server string
  47. // The cached device id.
  48. deviceID string
  49. }
  50. // The global cache to avoid conflict of deviceID.
  51. // Note that it's not coroutine safe, but it should be OK for utest.
  52. var deviceIDCache map[string]bool
  53. func init() {
  54. deviceIDCache = make(map[string]bool)
  55. }
  56. func (v *SIPConfig) DeviceID() string {
  57. for v.deviceID == "" {
  58. // Generate a random ID.
  59. var rid string
  60. for len(rid) < v.random {
  61. rid += fmt.Sprintf("%v", rand.Uint64())
  62. }
  63. deviceID := fmt.Sprintf("%v%v", v.user, rid[:v.random])
  64. // Ignore if exists.
  65. if _, ok := deviceIDCache[deviceID]; !ok {
  66. v.deviceID = deviceID
  67. deviceIDCache[deviceID] = true
  68. }
  69. }
  70. return v.deviceID
  71. }
  72. func (v *SIPConfig) String() string {
  73. sb := []string{}
  74. if v.addr != "" {
  75. sb = append(sb, fmt.Sprintf("addr=%v", v.addr))
  76. }
  77. if v.domain != "" {
  78. sb = append(sb, fmt.Sprintf("domain=%v", v.domain))
  79. }
  80. if v.user != "" {
  81. sb = append(sb, fmt.Sprintf("user=%v", v.user))
  82. sb = append(sb, fmt.Sprintf("deviceID=%v", v.DeviceID()))
  83. }
  84. if v.random > 0 {
  85. sb = append(sb, fmt.Sprintf("random=%v", v.random))
  86. }
  87. if v.server != "" {
  88. sb = append(sb, fmt.Sprintf("server=%v", v.server))
  89. }
  90. return strings.Join(sb, ",")
  91. }
  92. type SIPSession struct {
  93. conf *SIPConfig
  94. rb *sip.RequestBuilder
  95. requests chan sip.Request
  96. responses chan sip.Response
  97. wg sync.WaitGroup
  98. ctx context.Context
  99. cancel context.CancelFunc
  100. client *SIPClient
  101. seq uint
  102. }
  103. func NewSIPSession(c *SIPConfig) *SIPSession {
  104. return &SIPSession{
  105. conf: c, client: NewSIPClient(), rb: sip.NewRequestBuilder(),
  106. requests: make(chan sip.Request, 1024), responses: make(chan sip.Response, 1024),
  107. seq: 100,
  108. }
  109. }
  110. func (v *SIPSession) Close() error {
  111. if v.cancel != nil {
  112. v.cancel()
  113. }
  114. v.client.Close()
  115. v.wg.Wait()
  116. return nil
  117. }
  118. func (v *SIPSession) Connect(ctx context.Context) error {
  119. if ctx.Err() != nil {
  120. return ctx.Err()
  121. }
  122. ctx, cancel := context.WithCancel(ctx)
  123. v.ctx, v.cancel = ctx, cancel
  124. if err := v.client.Connect(ctx, v.conf.addr); err != nil {
  125. return errors.Wrapf(err, "connect with sipConfig %v", v.conf.String())
  126. }
  127. // Dispatch requests and responses.
  128. go func() {
  129. v.wg.Add(1)
  130. defer v.wg.Done()
  131. for {
  132. select {
  133. case <-v.ctx.Done():
  134. return
  135. case msg := <-v.client.incoming:
  136. if req, ok := msg.(sip.Request); ok {
  137. select {
  138. case v.requests <- req:
  139. case <-v.ctx.Done():
  140. return
  141. }
  142. } else if res, ok := msg.(sip.Response); ok {
  143. select {
  144. case v.responses <- res:
  145. case <-v.ctx.Done():
  146. return
  147. }
  148. } else {
  149. logger.Wf(ctx, "Drop message %v", msg.String())
  150. }
  151. }
  152. }
  153. }()
  154. return nil
  155. }
  156. func (v *SIPSession) Register(ctx context.Context) (sip.Message, sip.Message, error) {
  157. return v.doRegister(ctx, 3600)
  158. }
  159. func (v *SIPSession) UnRegister(ctx context.Context) (sip.Message, sip.Message, error) {
  160. return v.doRegister(ctx, 0)
  161. }
  162. func (v *SIPSession) doRegister(ctx context.Context, expires int) (sip.Message, sip.Message, error) {
  163. if ctx.Err() != nil {
  164. return nil, nil, ctx.Err()
  165. }
  166. sipPort := sip.Port(5060)
  167. sipCallID := sip.CallID(fmt.Sprintf("%v", rand.Uint64()))
  168. sipBranch := fmt.Sprintf("z9hG4bK_%v", rand.Uint32())
  169. sipTag := fmt.Sprintf("%v", rand.Uint32())
  170. sipMaxForwards := sip.MaxForwards(70)
  171. sipExpires := sip.Expires(uint32(expires))
  172. sipPIP := "192.168.3.99"
  173. v.seq++
  174. rb := v.rb
  175. rb.SetTransport("TCP")
  176. rb.SetMethod(sip.REGISTER)
  177. rb.AddVia(&sip.ViaHop{
  178. ProtocolName: "SIP", ProtocolVersion: "2.0", Transport: "TCP", Host: sipPIP, Port: &sipPort,
  179. Params: sip.NewParams().Add("branch", sip.String{Str: sipBranch}),
  180. })
  181. rb.SetFrom(&sip.Address{
  182. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: v.conf.domain},
  183. Params: sip.NewParams().Add("tag", sip.String{Str: sipTag}),
  184. })
  185. rb.SetTo(&sip.Address{
  186. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: v.conf.domain},
  187. })
  188. rb.SetCallID(&sipCallID)
  189. rb.SetSeqNo(v.seq)
  190. rb.SetRecipient(&sip.SipUri{FUser: sip.String{v.conf.server}, FHost: v.conf.domain})
  191. rb.SetContact(&sip.Address{
  192. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: sipPIP, FPort: &sipPort},
  193. })
  194. rb.SetMaxForwards(&sipMaxForwards)
  195. rb.SetExpires(&sipExpires)
  196. req, err := rb.Build()
  197. if err != nil {
  198. return req, nil, errors.Wrap(err, "build request")
  199. }
  200. if err = v.client.Send(req); err != nil {
  201. return req, nil, errors.Wrapf(err, "send request %v", req.String())
  202. }
  203. callID := sipGetCallID(req)
  204. if callID == "" {
  205. return req, nil, errors.Errorf("Invalid SIP Call-ID register %v", req.String())
  206. }
  207. logger.Tf(ctx, "Send REGISTER request, Call-ID=%v, Expires=%v", callID, expires)
  208. for {
  209. select {
  210. case <-ctx.Done():
  211. return nil, nil, ctx.Err()
  212. case <-v.ctx.Done():
  213. return nil, nil, v.ctx.Err()
  214. case msg := <-v.responses:
  215. if tv := sipGetCallID(msg); tv == callID {
  216. return req, msg, nil
  217. } else {
  218. logger.Wf(v.ctx, "Not callID=%v, msg=%v, drop message %v", callID, tv, msg.String())
  219. }
  220. }
  221. }
  222. }
  223. func (v *SIPSession) Trying(ctx context.Context, invite sip.Message) error {
  224. if ctx.Err() != nil {
  225. return ctx.Err()
  226. }
  227. req, ok := invite.(sip.Request)
  228. if !ok {
  229. return errors.Errorf("Invalid SIP request invite %v", invite.String())
  230. }
  231. res := sip.NewResponseFromRequest("", req, sip.StatusCode(100), "Trying", "")
  232. if err := v.client.Send(res); err != nil {
  233. return errors.Wrapf(err, "send response %v", res.String())
  234. }
  235. return nil
  236. }
  237. func (v *SIPSession) InviteResponse(ctx context.Context, invite sip.Message) (sip.Message, error) {
  238. if ctx.Err() != nil {
  239. return nil, ctx.Err()
  240. }
  241. req, ok := invite.(sip.Request)
  242. if !ok {
  243. return nil, errors.Errorf("Invalid SIP request invite %v", invite.String())
  244. }
  245. callID := sipGetCallID(invite)
  246. if callID == "" {
  247. return nil, errors.Errorf("Invalid SIP Call-ID invite %v", invite.String())
  248. }
  249. res := sip.NewResponseFromRequest("", req, sip.StatusCode(200), "OK", "")
  250. if err := v.client.Send(res); err != nil {
  251. return nil, errors.Wrapf(err, "send response %v", res.String())
  252. }
  253. logger.Tf(ctx, "Send INVITE response, Call-ID=%v", callID)
  254. for {
  255. select {
  256. case <-ctx.Done():
  257. return nil, ctx.Err()
  258. case <-v.ctx.Done():
  259. return nil, v.ctx.Err()
  260. case msg := <-v.requests:
  261. // Must be an ACK message.
  262. if !msg.IsAck() {
  263. return msg, errors.Errorf("invalid ACK message %v", msg.String())
  264. }
  265. // Check CALL-ID of ACK, should be equal to 200 OK.
  266. if tv := sipGetCallID(msg); tv == callID {
  267. return msg, nil
  268. } else {
  269. logger.Wf(v.ctx, "Not callID=%v, msg=%v, drop message %v", callID, tv, msg.String())
  270. }
  271. }
  272. }
  273. }
  274. func (v *SIPSession) Message(ctx context.Context) (sip.Message, sip.Message, error) {
  275. if ctx.Err() != nil {
  276. return nil, nil, ctx.Err()
  277. }
  278. sipPort := sip.Port(5060)
  279. sipCallID := sip.CallID(fmt.Sprintf("%v", rand.Uint64()))
  280. sipBranch := fmt.Sprintf("z9hG4bK_%v", rand.Uint32())
  281. sipTag := fmt.Sprintf("%v", rand.Uint32())
  282. sipMaxForwards := sip.MaxForwards(70)
  283. sipExpires := sip.Expires(3600)
  284. sipPIP := "192.168.3.99"
  285. v.seq++
  286. rb := v.rb
  287. rb.SetTransport("TCP")
  288. rb.SetMethod(sip.MESSAGE)
  289. rb.AddVia(&sip.ViaHop{
  290. ProtocolName: "SIP", ProtocolVersion: "2.0", Transport: "TCP", Host: sipPIP, Port: &sipPort,
  291. Params: sip.NewParams().Add("branch", sip.String{Str: sipBranch}),
  292. })
  293. rb.SetFrom(&sip.Address{
  294. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: v.conf.domain},
  295. Params: sip.NewParams().Add("tag", sip.String{Str: sipTag}),
  296. })
  297. rb.SetTo(&sip.Address{
  298. Uri: &sip.SipUri{FUser: sip.String{v.conf.server}, FHost: v.conf.domain},
  299. })
  300. rb.SetCallID(&sipCallID)
  301. rb.SetSeqNo(v.seq)
  302. rb.SetRecipient(&sip.SipUri{FUser: sip.String{v.conf.server}, FHost: v.conf.domain})
  303. rb.SetContact(&sip.Address{
  304. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: sipPIP, FPort: &sipPort},
  305. })
  306. rb.SetMaxForwards(&sipMaxForwards)
  307. rb.SetExpires(&sipExpires)
  308. v.seq++
  309. rb.SetBody(strings.Join([]string{
  310. `<?xml version="1.0" encoding="GB2312"?>`,
  311. "<Notify>",
  312. "<CmdType>Keepalive</CmdType>",
  313. fmt.Sprintf("<SN>%v</SN>", v.seq),
  314. fmt.Sprintf("<DeviceID>%v</DeviceID>", v.conf.DeviceID()),
  315. "<Status>OK</Status>",
  316. "</Notify>\n",
  317. }, "\n"))
  318. req, err := rb.Build()
  319. if err != nil {
  320. return req, nil, errors.Wrap(err, "build request")
  321. }
  322. if err = v.client.Send(req); err != nil {
  323. return req, nil, errors.Wrapf(err, "send request %v", req.String())
  324. }
  325. callID := sipGetCallID(req)
  326. if callID == "" {
  327. return req, nil, errors.Errorf("Invalid SIP Call-ID message %v", req.String())
  328. }
  329. logger.Tf(ctx, "Send MESSAGE request, Call-ID=%v", callID)
  330. for {
  331. select {
  332. case <-ctx.Done():
  333. return nil, nil, ctx.Err()
  334. case <-v.ctx.Done():
  335. return nil, nil, v.ctx.Err()
  336. case msg := <-v.responses:
  337. if tv := sipGetCallID(msg); tv == callID {
  338. return req, msg, nil
  339. } else {
  340. logger.Wf(v.ctx, "Not callID=%v, msg=%v, drop message %v", callID, tv, msg.String())
  341. }
  342. }
  343. }
  344. }
  345. func (v *SIPSession) Bye(ctx context.Context) (sip.Message, sip.Message, error) {
  346. if ctx.Err() != nil {
  347. return nil, nil, ctx.Err()
  348. }
  349. sipPort := sip.Port(5060)
  350. sipCallID := sip.CallID(fmt.Sprintf("%v", rand.Uint64()))
  351. sipBranch := fmt.Sprintf("z9hG4bK_%v", rand.Uint32())
  352. sipTag := fmt.Sprintf("%v", rand.Uint32())
  353. sipMaxForwards := sip.MaxForwards(70)
  354. sipExpires := sip.Expires(3600)
  355. sipPIP := "192.168.3.99"
  356. v.seq++
  357. rb := v.rb
  358. rb.SetTransport("TCP")
  359. rb.SetMethod(sip.BYE)
  360. rb.AddVia(&sip.ViaHop{
  361. ProtocolName: "SIP", ProtocolVersion: "2.0", Transport: "TCP", Host: sipPIP, Port: &sipPort,
  362. Params: sip.NewParams().Add("branch", sip.String{Str: sipBranch}),
  363. })
  364. rb.SetFrom(&sip.Address{
  365. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: v.conf.domain},
  366. Params: sip.NewParams().Add("tag", sip.String{Str: sipTag}),
  367. })
  368. rb.SetTo(&sip.Address{
  369. Uri: &sip.SipUri{FUser: sip.String{v.conf.server}, FHost: v.conf.domain},
  370. })
  371. rb.SetCallID(&sipCallID)
  372. rb.SetSeqNo(v.seq)
  373. rb.SetRecipient(&sip.SipUri{FUser: sip.String{v.conf.server}, FHost: v.conf.domain})
  374. rb.SetContact(&sip.Address{
  375. Uri: &sip.SipUri{FUser: sip.String{v.conf.DeviceID()}, FHost: sipPIP, FPort: &sipPort},
  376. })
  377. rb.SetMaxForwards(&sipMaxForwards)
  378. rb.SetExpires(&sipExpires)
  379. req, err := rb.Build()
  380. if err != nil {
  381. return req, nil, errors.Wrap(err, "build request")
  382. }
  383. if err = v.client.Send(req); err != nil {
  384. return req, nil, errors.Wrapf(err, "send request %v", req.String())
  385. }
  386. callID := sipGetCallID(req)
  387. if callID == "" {
  388. return req, nil, errors.Errorf("Invalid SIP Call-ID bye %v", req.String())
  389. }
  390. logger.Tf(ctx, "Send BYE request, Call-ID=%v", callID)
  391. for {
  392. select {
  393. case <-ctx.Done():
  394. return nil, nil, ctx.Err()
  395. case <-v.ctx.Done():
  396. return nil, nil, v.ctx.Err()
  397. case msg := <-v.responses:
  398. if tv := sipGetCallID(msg); tv == callID {
  399. return req, msg, nil
  400. } else {
  401. logger.Wf(v.ctx, "Not callID=%v, msg=%v, drop message %v", callID, tv, msg.String())
  402. }
  403. }
  404. }
  405. }
  406. func (v *SIPSession) Wait(ctx context.Context, method sip.RequestMethod) (sip.Message, error) {
  407. if ctx.Err() != nil {
  408. return nil, ctx.Err()
  409. }
  410. for {
  411. select {
  412. case <-ctx.Done():
  413. return nil, ctx.Err()
  414. case <-v.ctx.Done():
  415. return nil, v.ctx.Err()
  416. case msg := <-v.requests:
  417. if r, ok := msg.(sip.Request); ok && r.Method() == method {
  418. return msg, nil
  419. } else {
  420. logger.Wf(v.ctx, "Not method=%v, drop message %v", method, msg.String())
  421. }
  422. }
  423. }
  424. }
  425. type SIPClient struct {
  426. ctx context.Context
  427. cancel context.CancelFunc
  428. incoming chan sip.Message
  429. target *transport.Target
  430. protocol transport.Protocol
  431. cleanupTimeout time.Duration
  432. }
  433. func NewSIPClient() *SIPClient {
  434. return &SIPClient{
  435. cleanupTimeout: 5 * time.Second,
  436. }
  437. }
  438. func (v *SIPClient) Close() error {
  439. if v.cancel != nil {
  440. v.cancel()
  441. }
  442. // Wait for protocol stack to cleanup.
  443. if v.protocol != nil {
  444. select {
  445. case <-time.After(v.cleanupTimeout):
  446. logger.E(v.ctx, "Wait for protocol cleanup timeout")
  447. case <-v.protocol.Done():
  448. logger.T(v.ctx, "SIP protocol stack done")
  449. }
  450. }
  451. return nil
  452. }
  453. func (v *SIPClient) Connect(ctx context.Context, addr string) error {
  454. prURL, err := url.Parse(addr)
  455. if err != nil {
  456. return errors.Wrapf(err, "parse addr=%v", addr)
  457. }
  458. if prURL.Scheme != "tcp" && prURL.Scheme != "tcp4" {
  459. return errors.Errorf("invalid scheme=%v of addr=%v", prURL.Scheme, addr)
  460. }
  461. target, err := transport.NewTargetFromAddr(prURL.Host)
  462. if err != nil {
  463. return errors.Wrapf(err, "create target to %v", prURL.Host)
  464. }
  465. v.target = target
  466. incoming := make(chan sip.Message, 1024)
  467. errs := make(chan error, 1)
  468. cancels := make(chan struct{}, 1)
  469. protocol := transport.NewTcpProtocol(incoming, errs, cancels, nil, log.NewDefaultLogrusLogger())
  470. v.protocol = protocol
  471. v.incoming = incoming
  472. // Convert protocol stack errs to context signal.
  473. ctx, cancel := context.WithCancel(ctx)
  474. v.cancel = cancel
  475. v.ctx = ctx
  476. go func() {
  477. select {
  478. case <-ctx.Done():
  479. return
  480. case r0 := <-errs:
  481. logger.Ef(ctx, "SIP stack err %+v", r0)
  482. cancel()
  483. }
  484. }()
  485. // Covert context signal to cancels for protocol stack.
  486. go func() {
  487. <-ctx.Done()
  488. close(cancels)
  489. logger.Tf(ctx, "Notify SIP stack to cancel")
  490. }()
  491. return nil
  492. }
  493. func (v *SIPClient) Send(msg sip.Message) error {
  494. logger.Tf(v.ctx, "Send msg %v", msg.String())
  495. return v.protocol.Send(v.target, msg)
  496. }