rtmp.go 51 KB


  1. // Copyright (c) 2024 Winlin
  2. //
  3. // SPDX-License-Identifier: MIT
  4. package rtmp
  5. import (
  6. "bufio"
  7. "bytes"
  8. "context"
  9. "encoding"
  10. "encoding/binary"
  11. "fmt"
  12. "io"
  13. "math/rand"
  14. "sync"
  15. "srs-proxy/errors"
  16. )
// Handshake implements the RTMP handshake protocol, it writes and reads the
// C0/S0, C1/S1 and C2/S2 packets as raw bytes.
type Handshake struct {
	// The random number generator, used to fill the C1/S1 packet to write.
	r *rand.Rand
	// The c1s1 cache, which is the last packet got by ReadC1S1.
	c1s1 []byte
}
  24. func NewHandshake(r *rand.Rand) *Handshake {
  25. return &Handshake{r: r}
  26. }
// C1S1 returns the cached C1/S1 packet, which is set by ReadC1S1 and is nil
// before any packet is read.
func (v *Handshake) C1S1() []byte {
	return v.c1s1
}
  30. func (v *Handshake) WriteC0S0(w io.Writer) (err error) {
  31. r := bytes.NewReader([]byte{0x03})
  32. if _, err = io.Copy(w, r); err != nil {
  33. return errors.Wrap(err, "write c0s0")
  34. }
  35. return
  36. }
  37. func (v *Handshake) ReadC0S0(r io.Reader) (c0 []byte, err error) {
  38. b := &bytes.Buffer{}
  39. if _, err = io.CopyN(b, r, 1); err != nil {
  40. return nil, errors.Wrap(err, "read c0s0")
  41. }
  42. c0 = b.Bytes()
  43. return
  44. }
  45. func (v *Handshake) WriteC1S1(w io.Writer) (err error) {
  46. p := make([]byte, 1536)
  47. for i := 8; i < len(p); i++ {
  48. p[i] = byte(v.r.Int())
  49. }
  50. r := bytes.NewReader(p)
  51. if _, err = io.Copy(w, r); err != nil {
  52. return errors.Wrap(err, "write c0s1")
  53. }
  54. return
  55. }
  56. func (v *Handshake) ReadC1S1(r io.Reader) (c1s1 []byte, err error) {
  57. b := &bytes.Buffer{}
  58. if _, err = io.CopyN(b, r, 1536); err != nil {
  59. return nil, errors.Wrap(err, "read c1s1")
  60. }
  61. c1s1 = b.Bytes()
  62. v.c1s1 = c1s1
  63. return
  64. }
  65. func (v *Handshake) WriteC2S2(w io.Writer, s1c1 []byte) (err error) {
  66. r := bytes.NewReader(s1c1[:])
  67. if _, err = io.Copy(w, r); err != nil {
  68. return errors.Wrap(err, "write c2s2")
  69. }
  70. return
  71. }
  72. func (v *Handshake) ReadC2S2(r io.Reader) (c2 []byte, err error) {
  73. b := &bytes.Buffer{}
  74. if _, err = io.CopyN(b, r, 1536); err != nil {
  75. return nil, errors.Wrap(err, "read c2s2")
  76. }
  77. c2 = b.Bytes()
  78. return
  79. }
  80. // Please read @doc rtmp_specification_1.0.pdf, @page 16, @section 6.1. Chunk Format
  81. // Extended timestamp: 0 or 4 bytes
  82. // This field MUST be sent when the normal timsestamp is set to
  83. // 0xffffff, it MUST NOT be sent if the normal timestamp is set to
  84. // anything else. So for values less than 0xffffff the normal
  85. // timestamp field SHOULD be used in which case the extended timestamp
  86. // MUST NOT be present. For values greater than or equal to 0xffffff
  87. // the normal timestamp field MUST NOT be used and MUST be set to
  88. // 0xffffff and the extended timestamp MUST be sent.
  89. const extendedTimestamp = uint64(0xffffff)
  90. // The default chunk size of RTMP is 128 bytes.
  91. const defaultChunkSize = 128
// The input or output settings for RTMP protocol.
type settings struct {
	// The max size in bytes of payload in each chunk.
	chunkSize uint32
}
  96. func newSettings() *settings {
  97. return &settings{
  98. chunkSize: defaultChunkSize,
  99. }
  100. }
// The chunk stream which transport a message once.
type chunkStream struct {
	// The fmt of chunk.
	// NOTE(review): not read or written by the code reviewed here, please confirm the usage.
	format formatType
	// The chunk stream id, which is checked when validating the fmt of a
	// fresh chunk stream.
	cid chunkID
	// The last parsed message header, whose fields are inherited by the
	// following chunks which omit them.
	header messageHeader
	// The partial message in cache, nil if there is no message being read.
	message *Message
	// The number of chunk message headers parsed, zero means the chunk
	// stream is fresh.
	count uint64
	// Whether the last parsed 24bits timestamp is 0xffffff, which means the
	// 4 bytes extended timestamp is present.
	extendedTimestamp bool
}
  110. func newChunkStream() *chunkStream {
  111. return &chunkStream{}
  112. }
// The protocol implements the RTMP command and chunk stack.
type Protocol struct {
	// The buffered reader and writer over the underlying io.
	r *bufio.Reader
	w *bufio.Writer
	// The state for messages read from peer.
	input struct {
		// The input settings, for example, the chunk size to read chunks.
		opt *settings
		// The chunk streams, the key is the chunk stream id.
		chunks map[chunkID]*chunkStream
		// The command name of each written request, the key is the
		// transaction id, used to discover the packet of response.
		transactions map[amf0Number]amf0String
		// The lock for transactions.
		ltransactions sync.Mutex
	}
	// The state for messages written to peer.
	output struct {
		// The output settings, for example, the chunk size to write chunks.
		opt *settings
	}
}
  127. func NewProtocol(rw io.ReadWriter) *Protocol {
  128. v := &Protocol{
  129. r: bufio.NewReader(rw),
  130. w: bufio.NewWriter(rw),
  131. }
  132. v.input.opt = newSettings()
  133. v.input.chunks = map[chunkID]*chunkStream{}
  134. v.input.transactions = map[amf0Number]amf0String{}
  135. v.output.opt = newSettings()
  136. return v
  137. }
  138. func ExpectPacket[T Packet](ctx context.Context, v *Protocol, ppkt *T) (m *Message, err error) {
  139. for {
  140. if m, err = v.ReadMessage(ctx); err != nil {
  141. return nil, errors.WithMessage(err, "read message")
  142. }
  143. var pkt Packet
  144. if pkt, err = v.DecodeMessage(m); err != nil {
  145. return nil, errors.WithMessage(err, "decode message")
  146. }
  147. if p, ok := pkt.(T); ok {
  148. *ppkt = p
  149. break
  150. }
  151. }
  152. return
  153. }
// ExpectPacket is kept for compatibility only, it always panics whatever the
// arguments are.
//
// Deprecated: Please use rtmp.ExpectPacket instead.
func (v *Protocol) ExpectPacket(ctx context.Context, ppkt any) (m *Message, err error) {
	panic("Please use rtmp.ExpectPacket instead")
}
  158. func (v *Protocol) ExpectMessage(ctx context.Context, types ...MessageType) (m *Message, err error) {
  159. for {
  160. if m, err = v.ReadMessage(ctx); err != nil {
  161. return nil, errors.WithMessage(err, "read message")
  162. }
  163. if len(types) == 0 {
  164. return
  165. }
  166. for _, t := range types {
  167. if m.MessageType == t {
  168. return
  169. }
  170. }
  171. }
  172. return
  173. }
  174. func (v *Protocol) parseAMFObject(p []byte) (pkt Packet, err error) {
  175. var commandName amf0String
  176. if err = commandName.UnmarshalBinary(p); err != nil {
  177. return nil, errors.WithMessage(err, "unmarshal command name")
  178. }
  179. switch commandName {
  180. case commandResult, commandError:
  181. var transactionID amf0Number
  182. if err = transactionID.UnmarshalBinary(p[commandName.Size():]); err != nil {
  183. return nil, errors.WithMessage(err, "unmarshal tid")
  184. }
  185. var requestName amf0String
  186. if err = func() error {
  187. v.input.ltransactions.Lock()
  188. defer v.input.ltransactions.Unlock()
  189. var ok bool
  190. if requestName, ok = v.input.transactions[transactionID]; !ok {
  191. return errors.Errorf("No matched request for tid=%v", transactionID)
  192. }
  193. delete(v.input.transactions, transactionID)
  194. return nil
  195. }(); err != nil {
  196. return nil, errors.WithMessage(err, "discovery request name")
  197. }
  198. switch requestName {
  199. case commandConnect:
  200. return NewConnectAppResPacket(transactionID), nil
  201. case commandCreateStream:
  202. return NewCreateStreamResPacket(transactionID), nil
  203. case commandReleaseStream, commandFCPublish, commandFCUnpublish:
  204. call := NewCallPacket()
  205. call.TransactionID = transactionID
  206. return call, nil
  207. default:
  208. return nil, errors.Errorf("No request for %v", string(requestName))
  209. }
  210. case commandConnect:
  211. return NewConnectAppPacket(), nil
  212. case commandPublish:
  213. return NewPublishPacket(), nil
  214. case commandPlay:
  215. return NewPlayPacket(), nil
  216. default:
  217. return NewCallPacket(), nil
  218. }
  219. }
  220. func (v *Protocol) DecodeMessage(m *Message) (pkt Packet, err error) {
  221. p := m.Payload[:]
  222. if len(p) == 0 {
  223. return nil, errors.New("Empty packet")
  224. }
  225. switch m.MessageType {
  226. case MessageTypeAMF3Command, MessageTypeAMF3Data:
  227. p = p[1:]
  228. }
  229. switch m.MessageType {
  230. case MessageTypeSetChunkSize:
  231. pkt = NewSetChunkSize()
  232. case MessageTypeWindowAcknowledgementSize:
  233. pkt = NewWindowAcknowledgementSize()
  234. case MessageTypeSetPeerBandwidth:
  235. pkt = NewSetPeerBandwidth()
  236. case MessageTypeAMF0Command, MessageTypeAMF3Command, MessageTypeAMF0Data, MessageTypeAMF3Data:
  237. if pkt, err = v.parseAMFObject(p); err != nil {
  238. return nil, errors.WithMessage(err, fmt.Sprintf("Parse AMF %v", m.MessageType))
  239. }
  240. case MessageTypeUserControl:
  241. pkt = NewUserControl()
  242. default:
  243. return nil, errors.Errorf("Unknown message %v", m.MessageType)
  244. }
  245. if err = pkt.UnmarshalBinary(p); err != nil {
  246. return nil, errors.WithMessage(err, fmt.Sprintf("Unmarshal %v", m.MessageType))
  247. }
  248. return
  249. }
  250. func (v *Protocol) ReadMessage(ctx context.Context) (m *Message, err error) {
  251. for m == nil {
  252. // TODO: We should convert buffered io to async io, because we will be stuck in block io here,
  253. // TODO: but the risk is acceptable because we literally will set the underlay io timeout.
  254. if ctx.Err() != nil {
  255. return nil, ctx.Err()
  256. }
  257. var cid chunkID
  258. var format formatType
  259. if format, cid, err = v.readBasicHeader(ctx); err != nil {
  260. return nil, errors.WithMessage(err, "read basic header")
  261. }
  262. var ok bool
  263. var chunk *chunkStream
  264. if chunk, ok = v.input.chunks[cid]; !ok {
  265. chunk = newChunkStream()
  266. v.input.chunks[cid] = chunk
  267. chunk.header.betterCid = cid
  268. }
  269. if err = v.readMessageHeader(ctx, chunk, format); err != nil {
  270. return nil, errors.WithMessage(err, "read message header")
  271. }
  272. if m, err = v.readMessagePayload(ctx, chunk); err != nil {
  273. return nil, errors.WithMessage(err, "read message payload")
  274. }
  275. if err = v.onMessageArrivated(m); err != nil {
  276. return nil, errors.WithMessage(err, "on message")
  277. }
  278. }
  279. return
  280. }
  281. func (v *Protocol) readMessagePayload(ctx context.Context, chunk *chunkStream) (m *Message, err error) {
  282. // Empty payload message.
  283. if chunk.message.payloadLength == 0 {
  284. m = chunk.message
  285. chunk.message = nil
  286. return
  287. }
  288. // Calculate the chunk payload size.
  289. chunkedPayloadSize := int(chunk.message.payloadLength) - len(chunk.message.Payload)
  290. if chunkedPayloadSize > int(v.input.opt.chunkSize) {
  291. chunkedPayloadSize = int(v.input.opt.chunkSize)
  292. }
  293. b := make([]byte, chunkedPayloadSize)
  294. if _, err = io.ReadFull(v.r, b); err != nil {
  295. return nil, errors.Wrapf(err, "read chunk %vB", chunkedPayloadSize)
  296. }
  297. chunk.message.Payload = append(chunk.message.Payload, b...)
  298. // Got entire RTMP message?
  299. if int(chunk.message.payloadLength) == len(chunk.message.Payload) {
  300. m = chunk.message
  301. chunk.message = nil
  302. }
  303. return
  304. }
  305. // Please read @doc rtmp_specification_1.0.pdf, @page 18, @section 6.1.2. Chunk Message Header
  306. // There are four different formats for the chunk message header,
  307. // selected by the "fmt" field in the chunk basic header.
  308. type formatType uint8
  309. const (
  310. // 6.1.2.1. Type 0
  311. // Chunks of Type 0 are 11 bytes long. This type MUST be used at the
  312. // start of a chunk stream, and whenever the stream timestamp goes
  313. // backward (e.g., because of a backward seek).
  314. formatType0 formatType = iota
  315. // 6.1.2.2. Type 1
  316. // Chunks of Type 1 are 7 bytes long. The message stream ID is not
  317. // included; this chunk takes the same stream ID as the preceding chunk.
  318. // Streams with variable-sized messages (for example, many video
  319. // formats) SHOULD use this format for the first chunk of each new
  320. // message after the first.
  321. formatType1
  322. // 6.1.2.3. Type 2
  323. // Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the
  324. // message length is included; this chunk has the same stream ID and
  325. // message length as the preceding chunk. Streams with constant-sized
  326. // messages (for example, some audio and data formats) SHOULD use this
  327. // format for the first chunk of each message after the first.
  328. formatType2
  329. // 6.1.2.4. Type 3
  330. // Chunks of Type 3 have no header. Stream ID, message length and
  331. // timestamp delta are not present; chunks of this type take values from
  332. // the preceding chunk. When a single message is split into chunks, all
  333. // chunks of a message except the first one, SHOULD use this type. Refer
  334. // to example 2 in section 6.2.2. Stream consisting of messages of
  335. // exactly the same size, stream ID and spacing in time SHOULD use this
  336. // type for all chunks after chunk of Type 2. Refer to example 1 in
  337. // section 6.2.1. If the delta between the first message and the second
  338. // message is same as the time stamp of first message, then chunk of
  339. // type 3 would immediately follow the chunk of type 0 as there is no
  340. // need for a chunk of type 2 to register the delta. If Type 3 chunk
  341. // follows a Type 0 chunk, then timestamp delta for this Type 3 chunk is
  342. // the same as the timestamp of Type 0 chunk.
  343. formatType3
  344. )
  345. // The message header size, index is format.
  346. var messageHeaderSizes = []int{11, 7, 3, 0}
// Parse the chunk message header.
// 3bytes: timestamp delta, fmt=0,1,2
// 3bytes: payload length, fmt=0,1
// 1bytes: message type, fmt=0,1
// 4bytes: stream id, fmt=0
// where:
// fmt=0, 0x0X
// fmt=1, 0x4X
// fmt=2, 0x8X
// fmt=3, 0xCX
//
// The parsed fields are saved to chunk.header, the fields which are not present
// for the format keep the values of previous header. Then the header is copied
// to chunk.message, which is created if there is no message in cache. The 4
// bytes extended timestamp is also read, if chunk.extendedTimestamp is set by
// this header or by the previous one for fmt=3.
//
// It fails if the fmt is not allowed for the state of chunk stream, the payload
// length changed in the middle of a message, or the header is not readable.
func (v *Protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, format formatType) (err error) {
	// We should not assert anything about fmt, for the first packet.
	// (when first packet, the chunk.message is nil).
	// the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
	// the previous packet is:
	// 04 // fmt=0, cid=4
	// 00 00 1a // timestamp=26
	// 00 00 9d // payload_length=157
	// 08 // message_type=8(audio)
	// 01 00 00 00 // stream_id=1
	// the current packet maybe:
	// c4 // fmt=3, cid=4
	// it's ok, for the packet is audio, and timestamp delta is 26.
	// the current packet must be parsed as:
	// fmt=0, cid=4
	// timestamp=26+26=52
	// payload_length=157
	// message_type=8(audio)
	// stream_id=1
	// so we must update the timestamp even fmt=3 for first packet.
	//
	// The fresh packet used to update the timestamp even fmt=3 for first packet.
	// fresh packet always means the chunk is the first one of message.
	var isFirstChunkOfMsg bool
	if chunk.message == nil {
		isFirstChunkOfMsg = true
	}

	// But, we can ensure that when a chunk stream is fresh,
	// the fmt must be 0, a new stream.
	if chunk.count == 0 && format != formatType0 {
		// For librtmp, if ping, it will send a fresh stream with fmt=1,
		// 0x42 where: fmt=1, cid=2, protocol control user-control message
		// 0x00 0x00 0x00 where: timestamp=0
		// 0x00 0x00 0x06 where: payload_length=6
		// 0x04 where: message_type=4(protocol control user-control message)
		// 0x00 0x06 where: event Ping(0x06)
		// 0x00 0x00 0x0d 0x0f where: event data 4bytes ping timestamp.
		// @see: https://github.com/ossrs/srs/issues/98
		if chunk.cid == chunkIDProtocolControl && format == formatType1 {
			// We accept cid=2, fmt=1 to make librtmp happy.
		} else {
			return errors.Errorf("For fresh chunk, fmt %v != %v(required), cid is %v", format, formatType0, chunk.cid)
		}
	}

	// When exists cache msg, means got an partial message,
	// the fmt must not be type0 which means new message.
	if chunk.message != nil && format == formatType0 {
		return errors.Errorf("For exists chunk, fmt is %v, cid is %v", format, chunk.cid)
	}

	// Create msg when new chunk stream start
	if chunk.message == nil {
		chunk.message = NewMessage()
	}

	// Read the message header, whose size is determined by the format, it
	// is zero bytes for fmt=3.
	p := make([]byte, messageHeaderSizes[format])
	if _, err = io.ReadFull(v.r, p); err != nil {
		return errors.Wrapf(err, "read %vB message header", len(p))
	}

	// Parse the message header.
	// 3bytes: timestamp delta, fmt=0,1,2
	// 3bytes: payload length, fmt=0,1
	// 1bytes: message type, fmt=0,1
	// 4bytes: stream id, fmt=0
	// where:
	// fmt=0, 0x0X
	// fmt=1, 0x4X
	// fmt=2, 0x8X
	// fmt=3, 0xCX
	if format <= formatType2 {
		// The 3 bytes timestamp or delta is big-endian.
		chunk.header.timestampDelta = uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
		p = p[3:]

		// fmt: 0
		// timestamp: 3 bytes
		// If the timestamp is greater than or equal to 16777215
		// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
		// 'extended timestamp header' MUST be present. Otherwise, this value
		// SHOULD be the entire timestamp.
		//
		// fmt: 1 or 2
		// timestamp delta: 3 bytes
		// If the delta is greater than or equal to 16777215 (hexadecimal
		// 0x00ffffff), this value MUST be 16777215, and the 'extended
		// timestamp header' MUST be present. Otherwise, this value SHOULD be
		// the entire delta.
		chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
		if !chunk.extendedTimestamp {
			// Extended timestamp: 0 or 4 bytes
			// This field MUST be sent when the normal timestamp is set to
			// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
			// anything else. So for values less than 0xffffff the normal
			// timestamp field SHOULD be used in which case the extended timestamp
			// MUST NOT be present. For values greater than or equal to 0xffffff
			// the normal timestamp field MUST NOT be used and MUST be set to
			// 0xffffff and the extended timestamp MUST be sent.
			if format == formatType0 {
				// 6.1.2.1. Type 0
				// For a type-0 chunk, the absolute timestamp of the message is sent
				// here.
				chunk.header.Timestamp = uint64(chunk.header.timestampDelta)
			} else {
				// 6.1.2.2. Type 1
				// 6.1.2.3. Type 2
				// For a type-1 or type-2 chunk, the difference between the previous
				// chunk's timestamp and the current chunk's timestamp is sent here.
				chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
			}
		}

		if format <= formatType1 {
			// The 3 bytes payload length is big-endian.
			payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
			p = p[3:]

			// For a message, if msg exists in cache, the size must not changed.
			// always use the actual msg size to compare, for the cache payload length can changed,
			// for the fmt type1(stream_id not changed), user can change the payload
			// length(it's not allowed in the continue chunks).
			if !isFirstChunkOfMsg && chunk.header.payloadLength != payloadLength {
				return errors.Errorf("Chunk message size %v != %v(required)", payloadLength, chunk.header.payloadLength)
			}
			chunk.header.payloadLength = payloadLength

			chunk.header.MessageType = MessageType(p[0])
			p = p[1:]

			if format == formatType0 {
				// The 4 bytes stream id is little-endian.
				chunk.header.streamID = uint32(p[0]) | uint32(p[1])<<8 | uint32(p[2])<<16 | uint32(p[3])<<24
				p = p[4:]
			}
		}
	} else {
		// Update the timestamp even fmt=3 for first chunk packet
		if isFirstChunkOfMsg && !chunk.extendedTimestamp {
			chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
		}
	}

	// Read extended-timestamp, which overwrites the timestamp of header
	// whatever the format is.
	if chunk.extendedTimestamp {
		var timestamp uint32
		if err = binary.Read(v.r, binary.BigEndian, &timestamp); err != nil {
			return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp)
		}

		// We always use 31bits timestamp, for some server may use 32bits extended timestamp.
		// @see https://github.com/ossrs/srs/issues/111
		timestamp &= 0x7fffffff

		// TODO: FIXME: Support detect the extended timestamp.
		// @see http://blog.csdn.net/win_lin/article/details/13363699
		chunk.header.Timestamp = uint64(timestamp)
	}

	// The extended-timestamp must be unsigned-int,
	// 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h
	// 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d
	// because the rtmp protocol says the 32bits timestamp is about "50 days":
	// 3. Byte Order, Alignment, and Time Format
	// Because timestamps are generally only 32 bits long, they will roll
	// over after fewer than 50 days.
	//
	// but, its sample says the timestamp is 31bits:
	// An application could assume, for example, that all
	// adjacent timestamps are within 2^31 milliseconds of each other, so
	// 10000 comes after 4000000000, while 3000000000 comes before
	// 4000000000.
	// and flv specification says timestamp is 31bits:
	// Extension of the Timestamp field to form a SI32 value. This
	// field represents the upper 8 bits, while the previous
	// Timestamp field represents the lower 24 bits of the time in
	// milliseconds.
	// in a word, 31bits timestamp is ok.
	// convert extended timestamp to 31bits.
	chunk.header.Timestamp &= 0x7fffffff

	// Copy header to msg
	chunk.message.messageHeader = chunk.header

	// Increase the count of parsed headers, the chunk stream can accept fmt=1/2/3 message now.
	chunk.count++

	return
}
  528. // Please read @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header
  529. // The Chunk Basic Header encodes the chunk stream ID and the chunk
  530. // type(represented by fmt field in the figure below). Chunk type
  531. // determines the format of the encoded message header. Chunk Basic
  532. // Header field may be 1, 2, or 3 bytes, depending on the chunk stream
  533. // ID.
  534. //
  535. // The bits 0-5 (least significant) in the chunk basic header represent
  536. // the chunk stream ID.
  537. //
  538. // Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
  539. // field.
  540. // 0 1 2 3 4 5 6 7
  541. // +-+-+-+-+-+-+-+-+
  542. // |fmt| cs id |
  543. // +-+-+-+-+-+-+-+-+
  544. // Figure 6 Chunk basic header 1
  545. //
  546. // Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
  547. // field. ID is computed as (the second byte + 64).
  548. // 0 1
  549. // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
  550. // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  551. // |fmt| 0 | cs id - 64 |
  552. // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  553. // Figure 7 Chunk basic header 2
  554. //
  555. // Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
  556. // this field. ID is computed as ((the third byte)*256 + the second byte
  557. // + 64).
  558. // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
  559. // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  560. // |fmt| 1 | cs id - 64 |
  561. // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  562. // Figure 8 Chunk basic header 3
  563. //
  564. // cs id: 6 bits
  565. // fmt: 2 bits
  566. // cs id - 64: 8 or 16 bits
  567. //
  568. // Chunk stream IDs with values 64-319 could be represented by both 2-
  569. // byte version and 3-byte version of this field.
  570. func (v *Protocol) readBasicHeader(ctx context.Context) (format formatType, cid chunkID, err error) {
  571. // 2-63, 1B chunk header
  572. var t uint8
  573. if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
  574. return format, cid, errors.Wrap(err, "read basic header")
  575. }
  576. cid = chunkID(t & 0x3f)
  577. format = formatType((t >> 6) & 0x03)
  578. if cid > 1 {
  579. return
  580. }
  581. // 64-319, 2B chunk header
  582. if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
  583. return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
  584. }
  585. cid = chunkID(64 + uint32(t))
  586. // 64-65599, 3B chunk header
  587. if cid == 1 {
  588. if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
  589. return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
  590. }
  591. cid += chunkID(uint32(t) * 256)
  592. }
  593. return
  594. }
  595. func (v *Protocol) WritePacket(ctx context.Context, pkt Packet, streamID int) (err error) {
  596. m := NewMessage()
  597. if m.Payload, err = pkt.MarshalBinary(); err != nil {
  598. return errors.WithMessage(err, "marshal payload")
  599. }
  600. m.MessageType = pkt.Type()
  601. m.streamID = uint32(streamID)
  602. m.betterCid = pkt.BetterCid()
  603. if err = v.WriteMessage(ctx, m); err != nil {
  604. return errors.WithMessage(err, "write message")
  605. }
  606. if err = v.onPacketWriten(m, pkt); err != nil {
  607. return errors.WithMessage(err, "on write packet")
  608. }
  609. return
  610. }
  611. func (v *Protocol) onPacketWriten(m *Message, pkt Packet) (err error) {
  612. var tid amf0Number
  613. var name amf0String
  614. switch pkt := pkt.(type) {
  615. case *ConnectAppPacket:
  616. tid, name = pkt.TransactionID, pkt.CommandName
  617. case *CreateStreamPacket:
  618. tid, name = pkt.TransactionID, pkt.CommandName
  619. case *CallPacket:
  620. tid, name = pkt.TransactionID, pkt.CommandName
  621. }
  622. if tid > 0 && len(name) > 0 {
  623. v.input.ltransactions.Lock()
  624. defer v.input.ltransactions.Unlock()
  625. v.input.transactions[tid] = name
  626. }
  627. return
  628. }
  629. func (v *Protocol) onMessageArrivated(m *Message) (err error) {
  630. if m == nil {
  631. return
  632. }
  633. var pkt Packet
  634. switch m.MessageType {
  635. case MessageTypeSetChunkSize, MessageTypeUserControl, MessageTypeWindowAcknowledgementSize:
  636. if pkt, err = v.DecodeMessage(m); err != nil {
  637. return errors.Errorf("decode message %v", m.MessageType)
  638. }
  639. }
  640. switch pkt := pkt.(type) {
  641. case *SetChunkSize:
  642. v.input.opt.chunkSize = pkt.ChunkSize
  643. }
  644. return
  645. }
  646. func (v *Protocol) WriteMessage(ctx context.Context, m *Message) (err error) {
  647. m.payloadLength = uint32(len(m.Payload))
  648. var c0h, c3h []byte
  649. if c0h, err = m.generateC0Header(); err != nil {
  650. return errors.WithMessage(err, "generate c0 header")
  651. }
  652. if c3h, err = m.generateC3Header(); err != nil {
  653. return errors.WithMessage(err, "generate c3 header")
  654. }
  655. var h []byte
  656. p := m.Payload
  657. for len(p) > 0 {
  658. // TODO: We should convert buffered io to async io, because we will be stuck in block io here,
  659. // TODO: but the risk is acceptable because we literally will set the underlay io timeout.
  660. if ctx.Err() != nil {
  661. return ctx.Err()
  662. }
  663. if h == nil {
  664. h = c0h
  665. } else {
  666. h = c3h
  667. }
  668. if _, err = io.Copy(v.w, bytes.NewReader(h)); err != nil {
  669. return errors.Wrapf(err, "write c0c3 header %x", h)
  670. }
  671. size := len(p)
  672. if size > int(v.output.opt.chunkSize) {
  673. size = int(v.output.opt.chunkSize)
  674. }
  675. if _, err = io.Copy(v.w, bytes.NewReader(p[:size])); err != nil {
  676. return errors.Wrapf(err, "write chunk payload %vB", size)
  677. }
  678. p = p[size:]
  679. }
  680. // TODO: We should convert buffered io to async io, because we will be stuck in block io here,
  681. // TODO: but the risk is acceptable because we literally will set the underlay io timeout.
  682. if ctx.Err() != nil {
  683. return ctx.Err()
  684. }
  685. // TODO: FIXME: Use writev to write for high performance.
  686. if err = v.w.Flush(); err != nil {
  687. return errors.Wrapf(err, "flush writer")
  688. }
  689. return
  690. }
  691. // Please read @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header
  692. // 1byte. One byte field to represent the message type. A range of type IDs
  693. // (1-7) are reserved for protocol control messages.
  694. type MessageType uint8
  695. const (
  696. // Please read @doc rtmp_specification_1.0.pdf, @page 30, @section 5. Protocol Control Messages
  697. // RTMP reserves message type IDs 1-7 for protocol control messages.
  698. // These messages contain information needed by the RTM Chunk Stream
  699. // protocol or RTMP itself. Protocol messages with IDs 1 & 2 are
  700. // reserved for usage with RTM Chunk Stream protocol. Protocol messages
  701. // with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID
  702. // 7 is used between edge server and origin server.
  703. MessageTypeSetChunkSize MessageType = 0x01
  704. MessageTypeAbort MessageType = 0x02 // 0x02
  705. MessageTypeAcknowledgement MessageType = 0x03 // 0x03
  706. MessageTypeUserControl MessageType = 0x04 // 0x04
  707. MessageTypeWindowAcknowledgementSize MessageType = 0x05 // 0x05
  708. MessageTypeSetPeerBandwidth MessageType = 0x06 // 0x06
  709. MessageTypeEdgeAndOriginServerCommand MessageType = 0x07 // 0x07
  710. // Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3. Types of messages
  711. // The server and the client send messages over the network to
  712. // communicate with each other. The messages can be of any type which
  713. // includes audio messages, video messages, command messages, shared
  714. // object messages, data messages, and user control messages.
  715. //
  716. // Please read @doc rtmp_specification_1.0.pdf, @page 41, @section 3.4. Audio message
  717. // The client or the server sends this message to send audio data to the
  718. // peer. The message type value of 8 is reserved for audio messages.
  719. MessageTypeAudio MessageType = 0x08
  720. // Please read @doc rtmp_specification_1.0.pdf, @page 41, @section 3.5. Video message
  721. // The client or the server sends this message to send video data to the
  722. // peer. The message type value of 9 is reserved for video messages.
  723. // These messages are large and can delay the sending of other type of
  724. // messages. To avoid such a situation, the video message is assigned
  725. // the lowest priority.
  726. MessageTypeVideo MessageType = 0x09 // 0x09
  727. // Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3.1. Command message
  728. // Command messages carry the AMF-encoded commands between the client
  729. // and the server. These messages have been assigned message type value
  730. // of 20 for AMF0 encoding and message type value of 17 for AMF3
  731. // encoding. These messages are sent to perform some operations like
  732. // connect, createStream, publish, play, pause on the peer. Command
  733. // messages like onstatus, result etc. are used to inform the sender
  734. // about the status of the requested commands. A command message
  735. // consists of command name, transaction ID, and command object that
  736. // contains related parameters. A client or a server can request Remote
  737. // Procedure Calls (RPC) over streams that are communicated using the
  738. // command messages to the peer.
  739. MessageTypeAMF3Command MessageType = 17 // 0x11
  740. MessageTypeAMF0Command MessageType = 20 // 0x14
  741. // Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3.2. Data message
  742. // The client or the server sends this message to send Metadata or any
  743. // user data to the peer. Metadata includes details about the
  744. // data(audio, video etc.) like creation time, duration, theme and so
  745. // on. These messages have been assigned message type value of 18 for
  746. // AMF0 and message type value of 15 for AMF3.
  747. MessageTypeAMF0Data MessageType = 18 // 0x12
  748. MessageTypeAMF3Data MessageType = 15 // 0x0f
  749. )
// The header of message, which is embedded in Message, and is also used as
// the last parsed header of a chunk stream.
type messageHeader struct {
	// 3bytes.
	// Three-byte field that contains a timestamp delta of the message.
	// @remark, only used for decoding message from chunk stream.
	timestampDelta uint32
	// 3bytes.
	// Three-byte field that represents the size of the payload in bytes.
	// It is set in big-endian format.
	payloadLength uint32
	// 1byte.
	// One byte field to represent the message type. A range of type IDs
	// (1-7) are reserved for protocol control messages.
	MessageType MessageType
	// 4bytes.
	// Four-byte field that identifies the stream of the message. These
	// bytes are set in little-endian format.
	streamID uint32
	// The chunk stream id over which transport.
	betterCid chunkID
	// Four-byte field that contains a timestamp of the message.
	// The 4 bytes are packed in the big-endian order.
	// @remark, we use 64bits for large time for jitter detect and for large tbn like HLS.
	Timestamp uint64
}
  775. // The RTMP message, transport over chunk stream in RTMP.
  776. // Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header
  777. type Message struct {
  778. messageHeader
  779. // The payload which carries the RTMP packet.
  780. Payload []byte
  781. }
  782. func NewMessage() *Message {
  783. return &Message{}
  784. }
  785. func NewStreamMessage(streamID int) *Message {
  786. v := NewMessage()
  787. v.streamID = uint32(streamID)
  788. v.betterCid = chunkIDOverStream
  789. return v
  790. }
  791. func (v *Message) generateC3Header() ([]byte, error) {
  792. var c3h []byte
  793. if v.Timestamp < extendedTimestamp {
  794. c3h = make([]byte, 1)
  795. } else {
  796. c3h = make([]byte, 1+4)
  797. }
  798. p := c3h
  799. p[0] = 0xc0 | byte(v.betterCid&0x3f)
  800. p = p[1:]
  801. // In RTMP protocol, there must not any timestamp in C3 header,
  802. // but actually all products from adobe, such as FMS/AMS and Flash player and FMLE,
  803. // always carry a extended timestamp in C3 header.
  804. // @see: http://blog.csdn.net/win_lin/article/details/13363699
  805. if v.Timestamp >= extendedTimestamp {
  806. p[0] = byte(v.Timestamp >> 24)
  807. p[1] = byte(v.Timestamp >> 16)
  808. p[2] = byte(v.Timestamp >> 8)
  809. p[3] = byte(v.Timestamp)
  810. }
  811. return c3h, nil
  812. }
  813. func (v *Message) generateC0Header() ([]byte, error) {
  814. var c0h []byte
  815. if v.Timestamp < extendedTimestamp {
  816. c0h = make([]byte, 1+3+3+1+4)
  817. } else {
  818. c0h = make([]byte, 1+3+3+1+4+4)
  819. }
  820. p := c0h
  821. p[0] = byte(v.betterCid) & 0x3f
  822. p = p[1:]
  823. if v.Timestamp < extendedTimestamp {
  824. p[0] = byte(v.Timestamp >> 16)
  825. p[1] = byte(v.Timestamp >> 8)
  826. p[2] = byte(v.Timestamp)
  827. } else {
  828. p[0] = 0xff
  829. p[1] = 0xff
  830. p[2] = 0xff
  831. }
  832. p = p[3:]
  833. p[0] = byte(v.payloadLength >> 16)
  834. p[1] = byte(v.payloadLength >> 8)
  835. p[2] = byte(v.payloadLength)
  836. p = p[3:]
  837. p[0] = byte(v.MessageType)
  838. p = p[1:]
  839. p[0] = byte(v.streamID)
  840. p[1] = byte(v.streamID >> 8)
  841. p[2] = byte(v.streamID >> 16)
  842. p[3] = byte(v.streamID >> 24)
  843. p = p[4:]
  844. if v.Timestamp >= extendedTimestamp {
  845. p[0] = byte(v.Timestamp >> 24)
  846. p[1] = byte(v.Timestamp >> 16)
  847. p[2] = byte(v.Timestamp >> 8)
  848. p[3] = byte(v.Timestamp)
  849. }
  850. return c0h, nil
  851. }
  // chunkID is the chunk stream ID (cs id) carried in the chunk basic header.
  // Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header
  type chunkID uint32

  // The chunk stream IDs used by this package, numbered consecutively from 0x02.
  const (
  	chunkIDProtocolControl chunkID = iota + 0x02 // 0x02
  	chunkIDOverConnection                        // 0x03
  	chunkIDOverConnection2                       // 0x04
  	chunkIDOverStream                            // 0x05
  	chunkIDOverStream2                           // 0x06
  	chunkIDVideo                                 // 0x07
  	chunkIDAudio                                 // 0x08
  )
  863. // The Command Name of message.
  864. const (
  865. commandConnect amf0String = amf0String("connect")
  866. commandCreateStream amf0String = amf0String("createStream")
  867. commandCloseStream amf0String = amf0String("closeStream")
  868. commandPlay amf0String = amf0String("play")
  869. commandPause amf0String = amf0String("pause")
  870. commandOnBWDone amf0String = amf0String("onBWDone")
  871. commandOnStatus amf0String = amf0String("onStatus")
  872. commandResult amf0String = amf0String("_result")
  873. commandError amf0String = amf0String("_error")
  874. commandReleaseStream amf0String = amf0String("releaseStream")
  875. commandFCPublish amf0String = amf0String("FCPublish")
  876. commandFCUnpublish amf0String = amf0String("FCUnpublish")
  877. commandPublish amf0String = amf0String("publish")
  878. commandRtmpSampleAccess amf0String = amf0String("|RtmpSampleAccess")
  879. )
  880. // The RTMP packet, transport as payload of RTMP message.
  881. type Packet interface {
  882. // Marshaler and unmarshaler
  883. Size() int
  884. encoding.BinaryUnmarshaler
  885. encoding.BinaryMarshaler
  886. // RTMP protocol fields for each packet.
  887. BetterCid() chunkID
  888. Type() MessageType
  889. }
  890. // A Call packet, both object and args are AMF0 objects.
  891. type objectCallPacket struct {
  892. CommandName amf0String
  893. TransactionID amf0Number
  894. CommandObject *amf0Object
  895. Args *amf0Object
  896. }
  897. func (v *objectCallPacket) BetterCid() chunkID {
  898. return chunkIDOverConnection
  899. }
  900. func (v *objectCallPacket) Type() MessageType {
  901. return MessageTypeAMF0Command
  902. }
  903. func (v *objectCallPacket) Size() int {
  904. size := v.CommandName.Size() + v.TransactionID.Size() + v.CommandObject.Size()
  905. if v.Args != nil {
  906. size += v.Args.Size()
  907. }
  908. return size
  909. }
  910. func (v *objectCallPacket) UnmarshalBinary(data []byte) (err error) {
  911. p := data
  912. if err = v.CommandName.UnmarshalBinary(p); err != nil {
  913. return errors.WithMessage(err, "unmarshal command name")
  914. }
  915. p = p[v.CommandName.Size():]
  916. if err = v.TransactionID.UnmarshalBinary(p); err != nil {
  917. return errors.WithMessage(err, "unmarshal tid")
  918. }
  919. p = p[v.TransactionID.Size():]
  920. if err = v.CommandObject.UnmarshalBinary(p); err != nil {
  921. return errors.WithMessage(err, "unmarshal command")
  922. }
  923. p = p[v.CommandObject.Size():]
  924. if len(p) == 0 {
  925. return
  926. }
  927. v.Args = NewAmf0Object()
  928. if err = v.Args.UnmarshalBinary(p); err != nil {
  929. return errors.WithMessage(err, "unmarshal args")
  930. }
  931. return
  932. }
  933. func (v *objectCallPacket) MarshalBinary() (data []byte, err error) {
  934. var pb []byte
  935. if pb, err = v.CommandName.MarshalBinary(); err != nil {
  936. return nil, errors.WithMessage(err, "marshal command name")
  937. }
  938. data = append(data, pb...)
  939. if pb, err = v.TransactionID.MarshalBinary(); err != nil {
  940. return nil, errors.WithMessage(err, "marshal tid")
  941. }
  942. data = append(data, pb...)
  943. if pb, err = v.CommandObject.MarshalBinary(); err != nil {
  944. return nil, errors.WithMessage(err, "marshal command object")
  945. }
  946. data = append(data, pb...)
  947. if v.Args != nil {
  948. if pb, err = v.Args.MarshalBinary(); err != nil {
  949. return nil, errors.WithMessage(err, "marshal args")
  950. }
  951. data = append(data, pb...)
  952. }
  953. return
  954. }
  955. // Please read @doc rtmp_specification_1.0.pdf, @page 45, @section 4.1.1. connect
  956. // The client sends the connect command to the server to request
  957. // connection to a server application instance.
  958. type ConnectAppPacket struct {
  959. objectCallPacket
  960. }
  961. func NewConnectAppPacket() *ConnectAppPacket {
  962. v := &ConnectAppPacket{}
  963. v.CommandName = commandConnect
  964. v.CommandObject = NewAmf0Object()
  965. v.TransactionID = amf0Number(1.0)
  966. return v
  967. }
  968. func (v *ConnectAppPacket) UnmarshalBinary(data []byte) (err error) {
  969. if err = v.objectCallPacket.UnmarshalBinary(data); err != nil {
  970. return errors.WithMessage(err, "unmarshal call")
  971. }
  972. if v.CommandName != commandConnect {
  973. return errors.Errorf("Invalid command name %v", string(v.CommandName))
  974. }
  975. if v.TransactionID != 1.0 {
  976. return errors.Errorf("Invalid transaction ID %v", float64(v.TransactionID))
  977. }
  978. return
  979. }
  980. func (v *ConnectAppPacket) TcUrl() string {
  981. if v.CommandObject != nil {
  982. if v, ok := v.CommandObject.Get("tcUrl").(*amf0String); ok {
  983. return string(*v)
  984. }
  985. }
  986. return ""
  987. }
  988. // The response for ConnectAppPacket.
  989. type ConnectAppResPacket struct {
  990. objectCallPacket
  991. }
  992. func NewConnectAppResPacket(tid amf0Number) *ConnectAppResPacket {
  993. v := &ConnectAppResPacket{}
  994. v.CommandName = commandResult
  995. v.CommandObject = NewAmf0Object()
  996. v.Args = NewAmf0Object()
  997. v.TransactionID = tid
  998. return v
  999. }
  1000. func (v *ConnectAppResPacket) SrsID() string {
  1001. if v.Args != nil {
  1002. if v, ok := v.Args.Get("data").(*amf0EcmaArray); ok {
  1003. if v, ok := v.Get("srs_id").(*amf0String); ok {
  1004. return string(*v)
  1005. }
  1006. }
  1007. }
  1008. return ""
  1009. }
  1010. func (v *ConnectAppResPacket) UnmarshalBinary(data []byte) (err error) {
  1011. if err = v.objectCallPacket.UnmarshalBinary(data); err != nil {
  1012. return errors.WithMessage(err, "unmarshal call")
  1013. }
  1014. if v.CommandName != commandResult {
  1015. return errors.Errorf("Invalid command name %v", string(v.CommandName))
  1016. }
  1017. return
  1018. }
  1019. // A Call object, command object is variant.
  1020. type variantCallPacket struct {
  1021. CommandName amf0String
  1022. TransactionID amf0Number
  1023. CommandObject amf0Any // object or null
  1024. }
  1025. func (v *variantCallPacket) BetterCid() chunkID {
  1026. return chunkIDOverConnection
  1027. }
  1028. func (v *variantCallPacket) Type() MessageType {
  1029. return MessageTypeAMF0Command
  1030. }
  1031. func (v *variantCallPacket) Size() int {
  1032. size := v.CommandName.Size() + v.TransactionID.Size()
  1033. if v.CommandObject != nil {
  1034. size += v.CommandObject.Size()
  1035. }
  1036. return size
  1037. }
  1038. func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) {
  1039. p := data
  1040. if err = v.CommandName.UnmarshalBinary(p); err != nil {
  1041. return errors.WithMessage(err, "unmarshal command name")
  1042. }
  1043. p = p[v.CommandName.Size():]
  1044. if err = v.TransactionID.UnmarshalBinary(p); err != nil {
  1045. return errors.WithMessage(err, "unmarshal tid")
  1046. }
  1047. p = p[v.TransactionID.Size():]
  1048. if len(p) > 0 {
  1049. if v.CommandObject, err = Amf0Discovery(p); err != nil {
  1050. return errors.WithMessage(err, "discovery command object")
  1051. }
  1052. if err = v.CommandObject.UnmarshalBinary(p); err != nil {
  1053. return errors.WithMessage(err, "unmarshal command object")
  1054. }
  1055. p = p[v.CommandObject.Size():]
  1056. }
  1057. return
  1058. }
  1059. func (v *variantCallPacket) MarshalBinary() (data []byte, err error) {
  1060. var pb []byte
  1061. if pb, err = v.CommandName.MarshalBinary(); err != nil {
  1062. return nil, errors.WithMessage(err, "marshal command name")
  1063. }
  1064. data = append(data, pb...)
  1065. if pb, err = v.TransactionID.MarshalBinary(); err != nil {
  1066. return nil, errors.WithMessage(err, "marshal tid")
  1067. }
  1068. data = append(data, pb...)
  1069. if v.CommandObject != nil {
  1070. if pb, err = v.CommandObject.MarshalBinary(); err != nil {
  1071. return nil, errors.WithMessage(err, "marshal command object")
  1072. }
  1073. data = append(data, pb...)
  1074. }
  1075. return
  1076. }
  1077. // Please read @doc rtmp_specification_1.0.pdf, @page 51, @section 4.1.2. Call
  1078. // The call method of the NetConnection object runs remote procedure
  1079. // calls (RPC) at the receiving end. The called RPC name is passed as a
  1080. // parameter to the call command.
  1081. // @remark onStatus packet is a call packet.
  1082. type CallPacket struct {
  1083. variantCallPacket
  1084. Args amf0Any // optional or object or null
  1085. }
  1086. func NewCallPacket() *CallPacket {
  1087. return &CallPacket{}
  1088. }
  1089. func (v *CallPacket) ArgsCode() string {
  1090. if v.Args != nil {
  1091. if v, ok := v.Args.(*amf0Object); ok {
  1092. if code, ok := v.Get("code").(*amf0String); ok {
  1093. return string(*code)
  1094. }
  1095. }
  1096. }
  1097. return ""
  1098. }
  1099. func (v *CallPacket) Size() int {
  1100. size := v.variantCallPacket.Size()
  1101. if v.Args != nil {
  1102. size += v.Args.Size()
  1103. }
  1104. return size
  1105. }
  1106. func (v *CallPacket) UnmarshalBinary(data []byte) (err error) {
  1107. p := data
  1108. if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
  1109. return errors.WithMessage(err, "unmarshal call")
  1110. }
  1111. p = p[v.variantCallPacket.Size():]
  1112. if len(p) > 0 {
  1113. if v.Args, err = Amf0Discovery(p); err != nil {
  1114. return errors.WithMessage(err, "discovery args")
  1115. }
  1116. if err = v.Args.UnmarshalBinary(p); err != nil {
  1117. return errors.WithMessage(err, "unmarshal args")
  1118. }
  1119. }
  1120. return
  1121. }
  1122. func (v *CallPacket) MarshalBinary() (data []byte, err error) {
  1123. var pb []byte
  1124. if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
  1125. return nil, errors.WithMessage(err, "marshal call")
  1126. }
  1127. data = append(data, pb...)
  1128. if v.Args != nil {
  1129. if pb, err = v.Args.MarshalBinary(); err != nil {
  1130. return nil, errors.WithMessage(err, "marshal args")
  1131. }
  1132. data = append(data, pb...)
  1133. }
  1134. return
  1135. }
  1136. // Please read @doc rtmp_specification_1.0.pdf, @page 52, @section 4.1.3. createStream
  1137. // The client sends this command to the server to create a logical
  1138. // channel for message communication The publishing of audio, video, and
  1139. // metadata is carried out over stream channel created using the
  1140. // createStream command.
  1141. type CreateStreamPacket struct {
  1142. variantCallPacket
  1143. }
  1144. func NewCreateStreamPacket() *CreateStreamPacket {
  1145. v := &CreateStreamPacket{}
  1146. v.CommandName = commandCreateStream
  1147. v.TransactionID = amf0Number(2)
  1148. v.CommandObject = NewAmf0Null()
  1149. return v
  1150. }
  1151. // The response for create stream
  1152. type CreateStreamResPacket struct {
  1153. variantCallPacket
  1154. StreamID amf0Number
  1155. }
  1156. func NewCreateStreamResPacket(tid amf0Number) *CreateStreamResPacket {
  1157. v := &CreateStreamResPacket{}
  1158. v.CommandName = commandResult
  1159. v.TransactionID = tid
  1160. v.CommandObject = NewAmf0Null()
  1161. v.StreamID = 0
  1162. return v
  1163. }
  1164. func (v *CreateStreamResPacket) Size() int {
  1165. return v.variantCallPacket.Size() + v.StreamID.Size()
  1166. }
  1167. func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) {
  1168. p := data
  1169. if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
  1170. return errors.WithMessage(err, "unmarshal call")
  1171. }
  1172. p = p[v.variantCallPacket.Size():]
  1173. if err = v.StreamID.UnmarshalBinary(p); err != nil {
  1174. return errors.WithMessage(err, "unmarshal sid")
  1175. }
  1176. return
  1177. }
  1178. func (v *CreateStreamResPacket) MarshalBinary() (data []byte, err error) {
  1179. var pb []byte
  1180. if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
  1181. return nil, errors.WithMessage(err, "marshal call")
  1182. }
  1183. data = append(data, pb...)
  1184. if pb, err = v.StreamID.MarshalBinary(); err != nil {
  1185. return nil, errors.WithMessage(err, "marshal sid")
  1186. }
  1187. data = append(data, pb...)
  1188. return
  1189. }
  1190. // Please read @doc rtmp_specification_1.0.pdf, @page 64, @section 4.2.6. Publish
  1191. type PublishPacket struct {
  1192. variantCallPacket
  1193. StreamName amf0String
  1194. StreamType amf0String
  1195. }
  1196. func NewPublishPacket() *PublishPacket {
  1197. v := &PublishPacket{}
  1198. v.CommandName = commandPublish
  1199. v.CommandObject = NewAmf0Null()
  1200. v.StreamType = "live"
  1201. return v
  1202. }
  1203. func (v *PublishPacket) Size() int {
  1204. return v.variantCallPacket.Size() + v.StreamName.Size() + v.StreamType.Size()
  1205. }
  1206. func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) {
  1207. p := data
  1208. if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
  1209. return errors.WithMessage(err, "unmarshal call")
  1210. }
  1211. p = p[v.variantCallPacket.Size():]
  1212. if err = v.StreamName.UnmarshalBinary(p); err != nil {
  1213. return errors.WithMessage(err, "unmarshal stream name")
  1214. }
  1215. p = p[v.StreamName.Size():]
  1216. if err = v.StreamType.UnmarshalBinary(p); err != nil {
  1217. return errors.WithMessage(err, "unmarshal stream type")
  1218. }
  1219. return
  1220. }
  1221. func (v *PublishPacket) MarshalBinary() (data []byte, err error) {
  1222. var pb []byte
  1223. if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
  1224. return nil, errors.WithMessage(err, "marshal call")
  1225. }
  1226. data = append(data, pb...)
  1227. if pb, err = v.StreamName.MarshalBinary(); err != nil {
  1228. return nil, errors.WithMessage(err, "marshal stream name")
  1229. }
  1230. data = append(data, pb...)
  1231. if pb, err = v.StreamType.MarshalBinary(); err != nil {
  1232. return nil, errors.WithMessage(err, "marshal stream type")
  1233. }
  1234. data = append(data, pb...)
  1235. return
  1236. }
  1237. // Please read @doc rtmp_specification_1.0.pdf, @page 54, @section 4.2.1. play
  1238. type PlayPacket struct {
  1239. variantCallPacket
  1240. StreamName amf0String
  1241. }
  1242. func NewPlayPacket() *PlayPacket {
  1243. v := &PlayPacket{}
  1244. v.CommandName = commandPlay
  1245. v.CommandObject = NewAmf0Null()
  1246. return v
  1247. }
  1248. func (v *PlayPacket) Size() int {
  1249. return v.variantCallPacket.Size() + v.StreamName.Size()
  1250. }
  1251. func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) {
  1252. p := data
  1253. if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
  1254. return errors.WithMessage(err, "unmarshal call")
  1255. }
  1256. p = p[v.variantCallPacket.Size():]
  1257. if err = v.StreamName.UnmarshalBinary(p); err != nil {
  1258. return errors.WithMessage(err, "unmarshal stream name")
  1259. }
  1260. p = p[v.StreamName.Size():]
  1261. return
  1262. }
  1263. func (v *PlayPacket) MarshalBinary() (data []byte, err error) {
  1264. var pb []byte
  1265. if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
  1266. return nil, errors.WithMessage(err, "marshal call")
  1267. }
  1268. data = append(data, pb...)
  1269. if pb, err = v.StreamName.MarshalBinary(); err != nil {
  1270. return nil, errors.WithMessage(err, "marshal stream name")
  1271. }
  1272. data = append(data, pb...)
  1273. return
  1274. }
  // SetChunkSize is protocol control message 1, Set Chunk Size, which is used
  // to notify the peer about the new maximum chunk size.
  // Please read @doc rtmp_specification_1.0.pdf, @page 31, @section 5.1. Set Chunk Size
  type SetChunkSize struct {
  	// ChunkSize is encoded as four big-endian bytes.
  	ChunkSize uint32
  }
  1281. func NewSetChunkSize() *SetChunkSize {
  1282. return &SetChunkSize{
  1283. ChunkSize: defaultChunkSize,
  1284. }
  1285. }
  1286. func (v *SetChunkSize) BetterCid() chunkID {
  1287. return chunkIDProtocolControl
  1288. }
  1289. func (v *SetChunkSize) Type() MessageType {
  1290. return MessageTypeSetChunkSize
  1291. }
  1292. func (v *SetChunkSize) Size() int {
  1293. return 4
  1294. }
  1295. func (v *SetChunkSize) UnmarshalBinary(data []byte) (err error) {
  1296. if len(data) < 4 {
  1297. return errors.Errorf("requires 4 only %v bytes, %x", len(data), data)
  1298. }
  1299. v.ChunkSize = binary.BigEndian.Uint32(data)
  1300. return
  1301. }
  1302. func (v *SetChunkSize) MarshalBinary() (data []byte, err error) {
  1303. data = make([]byte, 4)
  1304. binary.BigEndian.PutUint32(data, v.ChunkSize)
  1305. return
  1306. }
  // WindowAcknowledgementSize is the message the client or the server sends to
  // inform the peer which window size to use when sending acknowledgment.
  // Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.5. Window Acknowledgement Size (5)
  type WindowAcknowledgementSize struct {
  	// AckSize is encoded as four big-endian bytes.
  	AckSize uint32
  }
  1313. func NewWindowAcknowledgementSize() *WindowAcknowledgementSize {
  1314. return &WindowAcknowledgementSize{}
  1315. }
  1316. func (v *WindowAcknowledgementSize) BetterCid() chunkID {
  1317. return chunkIDProtocolControl
  1318. }
  1319. func (v *WindowAcknowledgementSize) Type() MessageType {
  1320. return MessageTypeWindowAcknowledgementSize
  1321. }
  1322. func (v *WindowAcknowledgementSize) Size() int {
  1323. return 4
  1324. }
  1325. func (v *WindowAcknowledgementSize) UnmarshalBinary(data []byte) (err error) {
  1326. if len(data) < 4 {
  1327. return errors.Errorf("requires 4 only %v bytes, %x", len(data), data)
  1328. }
  1329. v.AckSize = binary.BigEndian.Uint32(data)
  1330. return
  1331. }
  1332. func (v *WindowAcknowledgementSize) MarshalBinary() (data []byte, err error) {
  1333. data = make([]byte, 4)
  1334. binary.BigEndian.PutUint32(data, v.AckSize)
  1335. return
  1336. }
  // LimitType is the Limit type field of the Set Peer Bandwidth message. The
  // sender can mark the message hard (0), soft (1), or dynamic (2).
  // Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6)
  type LimitType uint8

  // The values of the Limit type field.
  const (
  	LimitTypeHard    LimitType = 0
  	LimitTypeSoft    LimitType = 1
  	LimitTypeDynamic LimitType = 2
  )
  1346. // Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6)
  1347. // The client or the server sends this message to update the output
  1348. // bandwidth of the peer.
  1349. type SetPeerBandwidth struct {
  1350. Bandwidth uint32
  1351. LimitType LimitType
  1352. }
  1353. func NewSetPeerBandwidth() *SetPeerBandwidth {
  1354. return &SetPeerBandwidth{}
  1355. }
  1356. func (v *SetPeerBandwidth) BetterCid() chunkID {
  1357. return chunkIDProtocolControl
  1358. }
  1359. func (v *SetPeerBandwidth) Type() MessageType {
  1360. return MessageTypeSetPeerBandwidth
  1361. }
  1362. func (v *SetPeerBandwidth) Size() int {
  1363. return 4 + 1
  1364. }
  1365. func (v *SetPeerBandwidth) UnmarshalBinary(data []byte) (err error) {
  1366. if len(data) < 5 {
  1367. return errors.Errorf("requires 5 only %v bytes, %x", len(data), data)
  1368. }
  1369. v.Bandwidth = binary.BigEndian.Uint32(data)
  1370. v.LimitType = LimitType(data[4])
  1371. return
  1372. }
  1373. func (v *SetPeerBandwidth) MarshalBinary() (data []byte, err error) {
  1374. data = make([]byte, 5)
  1375. binary.BigEndian.PutUint32(data, v.Bandwidth)
  1376. data[4] = byte(v.LimitType)
  1377. return
  1378. }
  // EventType is the event type of a user control message, two bytes on the wire.
  type EventType uint16

  // The event types of the user control message. The constants are untyped, so
  // they can be compared with and assigned to an EventType directly. Unless
  // noted otherwise the event data is 4 bytes.
  const (
  	// Stream Begin: sent by the server to notify the client that a stream
  	// has become functional and can be used for communication. By default
  	// it is sent on ID 0 after the application connect command is
  	// successfully received from the client. The 4-byte event data is the
  	// ID of the stream that became functional.
  	EventTypeStreamBegin = 0x00

  	// Stream EOF: sent by the server to notify the client that the playback
  	// of data is over as requested on this stream. No more data is sent
  	// without issuing additional commands, and the client discards the
  	// messages received for the stream. The 4-byte event data is the ID of
  	// the stream on which playback has ended.
  	EventTypeStreamEOF = 0x01

  	// Stream Dry: sent by the server to notify the client that there is no
  	// more data on the stream. If the server does not detect any message
  	// for a time period, it can notify the subscribed clients that the
  	// stream is dry. The 4-byte event data is the ID of the dry stream.
  	EventTypeStreamDry = 0x02

  	// Set Buffer Length: sent by the client to inform the server of the
  	// buffer size (in milliseconds) that is used to buffer any data coming
  	// over a stream. It is sent before the server starts processing the
  	// stream. The event data is 8 bytes: the first 4 bytes are the stream
  	// ID and the next 4 bytes are the buffer length in milliseconds.
  	EventTypeSetBufferLength = 0x03

  	// Stream Is Recorded: sent by the server to notify the client that the
  	// stream is a recorded stream. The 4-byte event data is the ID of the
  	// recorded stream.
  	EventTypeStreamIsRecorded = 0x04

  	// Ping Request: sent by the server to test whether the client is
  	// reachable. The event data is a 4-byte timestamp, the local server
  	// time when the server dispatched the command. The client answers with
  	// a ping response on receiving the ping request.
  	EventTypePingRequest = 0x06

  	// Ping Response: sent by the client to the server in response to the
  	// ping request. The event data is the 4-byte timestamp that was
  	// received with the ping request.
  	EventTypePingResponse = 0x07

  	// FMS control event, seen when the user control payload is 3 bytes, for
  	// example "00 1A 01": the event type is 0x001a and the event data is
  	// 0x01. Please notice that the event data is only 1 byte for this event.
  	EventTypeFmsEvent0 = 0x1a
  )
  1437. // Please read @doc rtmp_specification_1.0.pdf, @page 32, @5.4. User Control Message (4)
  1438. // The client or the server sends this message to notify the peer about the user control events.
  1439. // This message carries Event type and Event data.
  1440. type UserControl struct {
  1441. // Event type is followed by Event data.
  1442. // @see: SrcPCUCEventType
  1443. EventType EventType
  1444. // The event data generally in 4bytes.
  1445. // @remark for event type is 0x001a, only 1bytes.
  1446. // @see SrsPCUCFmsEvent0
  1447. EventData int32
  1448. // 4bytes if event_type is SetBufferLength; otherwise 0.
  1449. ExtraData int32
  1450. }
  1451. func NewUserControl() *UserControl {
  1452. return &UserControl{}
  1453. }
  1454. func (v *UserControl) BetterCid() chunkID {
  1455. return chunkIDProtocolControl
  1456. }
  1457. func (v *UserControl) Type() MessageType {
  1458. return MessageTypeUserControl
  1459. }
  1460. func (v *UserControl) Size() int {
  1461. size := 2
  1462. if v.EventType == EventTypeFmsEvent0 {
  1463. size += 1
  1464. } else {
  1465. size += 4
  1466. }
  1467. if v.EventType == EventTypeSetBufferLength {
  1468. size += 4
  1469. }
  1470. return size
  1471. }
  1472. func (v *UserControl) UnmarshalBinary(data []byte) (err error) {
  1473. if len(data) < 3 {
  1474. return errors.Errorf("requires 5 only %v bytes, %x", len(data), data)
  1475. }
  1476. v.EventType = EventType(binary.BigEndian.Uint16(data))
  1477. if len(data) < v.Size() {
  1478. return errors.Errorf("requires %v only %v bytes, %x", v.Size(), len(data), data)
  1479. }
  1480. if v.EventType == EventTypeFmsEvent0 {
  1481. v.EventData = int32(uint8(data[2]))
  1482. } else {
  1483. v.EventData = int32(binary.BigEndian.Uint32(data[2:]))
  1484. }
  1485. if v.EventType == EventTypeSetBufferLength {
  1486. v.ExtraData = int32(binary.BigEndian.Uint32(data[6:]))
  1487. }
  1488. return
  1489. }
  1490. func (v *UserControl) MarshalBinary() (data []byte, err error) {
  1491. data = make([]byte, v.Size())
  1492. binary.BigEndian.PutUint16(data, uint16(v.EventType))
  1493. if v.EventType == EventTypeFmsEvent0 {
  1494. data[2] = uint8(v.EventData)
  1495. } else {
  1496. binary.BigEndian.PutUint32(data[2:], uint32(v.EventData))
  1497. }
  1498. if v.EventType == EventTypeSetBufferLength {
  1499. binary.BigEndian.PutUint32(data[6:], uint32(v.ExtraData))
  1500. }
  1501. return
  1502. }