payload_queue.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package sctp
  2. import (
  3. "fmt"
  4. "sort"
  5. )
  6. type payloadQueue struct {
  7. chunkMap map[uint32]*chunkPayloadData
  8. sorted []uint32
  9. dupTSN []uint32
  10. nBytes int
  11. }
  12. func newPayloadQueue() *payloadQueue {
  13. return &payloadQueue{chunkMap: map[uint32]*chunkPayloadData{}}
  14. }
  15. func (q *payloadQueue) updateSortedKeys() {
  16. if q.sorted != nil {
  17. return
  18. }
  19. q.sorted = make([]uint32, len(q.chunkMap))
  20. i := 0
  21. for k := range q.chunkMap {
  22. q.sorted[i] = k
  23. i++
  24. }
  25. sort.Slice(q.sorted, func(i, j int) bool {
  26. return sna32LT(q.sorted[i], q.sorted[j])
  27. })
  28. }
  29. func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool {
  30. _, ok := q.chunkMap[p.tsn]
  31. if ok || sna32LTE(p.tsn, cumulativeTSN) {
  32. return false
  33. }
  34. return true
  35. }
  36. func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) {
  37. q.chunkMap[p.tsn] = p
  38. q.nBytes += len(p.userData)
  39. q.sorted = nil
  40. }
  41. // push pushes a payload data. If the payload data is already in our queue or
  42. // older than our cumulativeTSN marker, it will be recored as duplications,
  43. // which can later be retrieved using popDuplicates.
  44. func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool {
  45. _, ok := q.chunkMap[p.tsn]
  46. if ok || sna32LTE(p.tsn, cumulativeTSN) {
  47. // Found the packet, log in dups
  48. q.dupTSN = append(q.dupTSN, p.tsn)
  49. return false
  50. }
  51. q.chunkMap[p.tsn] = p
  52. q.nBytes += len(p.userData)
  53. q.sorted = nil
  54. return true
  55. }
  56. // pop pops only if the oldest chunk's TSN matches the given TSN.
  57. func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) {
  58. q.updateSortedKeys()
  59. if len(q.chunkMap) > 0 && tsn == q.sorted[0] {
  60. q.sorted = q.sorted[1:]
  61. if c, ok := q.chunkMap[tsn]; ok {
  62. delete(q.chunkMap, tsn)
  63. q.nBytes -= len(c.userData)
  64. return c, true
  65. }
  66. }
  67. return nil, false
  68. }
  69. // get returns reference to chunkPayloadData with the given TSN value.
  70. func (q *payloadQueue) get(tsn uint32) (*chunkPayloadData, bool) {
  71. c, ok := q.chunkMap[tsn]
  72. return c, ok
  73. }
  74. // popDuplicates returns an array of TSN values that were found duplicate.
  75. func (q *payloadQueue) popDuplicates() []uint32 {
  76. dups := q.dupTSN
  77. q.dupTSN = []uint32{}
  78. return dups
  79. }
  80. func (q *payloadQueue) getGapAckBlocks(cumulativeTSN uint32) (gapAckBlocks []gapAckBlock) {
  81. var b gapAckBlock
  82. if len(q.chunkMap) == 0 {
  83. return []gapAckBlock{}
  84. }
  85. q.updateSortedKeys()
  86. for i, tsn := range q.sorted {
  87. if i == 0 {
  88. b.start = uint16(tsn - cumulativeTSN)
  89. b.end = b.start
  90. continue
  91. }
  92. diff := uint16(tsn - cumulativeTSN)
  93. if b.end+1 == diff {
  94. b.end++
  95. } else {
  96. gapAckBlocks = append(gapAckBlocks, gapAckBlock{
  97. start: b.start,
  98. end: b.end,
  99. })
  100. b.start = diff
  101. b.end = diff
  102. }
  103. }
  104. gapAckBlocks = append(gapAckBlocks, gapAckBlock{
  105. start: b.start,
  106. end: b.end,
  107. })
  108. return gapAckBlocks
  109. }
  110. func (q *payloadQueue) getGapAckBlocksString(cumulativeTSN uint32) string {
  111. gapAckBlocks := q.getGapAckBlocks(cumulativeTSN)
  112. str := fmt.Sprintf("cumTSN=%d", cumulativeTSN)
  113. for _, b := range gapAckBlocks {
  114. str += fmt.Sprintf(",%d-%d", b.start, b.end)
  115. }
  116. return str
  117. }
  118. func (q *payloadQueue) markAsAcked(tsn uint32) int {
  119. var nBytesAcked int
  120. if c, ok := q.chunkMap[tsn]; ok {
  121. c.acked = true
  122. c.retransmit = false
  123. nBytesAcked = len(c.userData)
  124. q.nBytes -= nBytesAcked
  125. c.userData = []byte{}
  126. }
  127. return nBytesAcked
  128. }
  129. func (q *payloadQueue) getLastTSNReceived() (uint32, bool) {
  130. q.updateSortedKeys()
  131. qlen := len(q.sorted)
  132. if qlen == 0 {
  133. return 0, false
  134. }
  135. return q.sorted[qlen-1], true
  136. }
  137. func (q *payloadQueue) markAllToRetrasmit() {
  138. for _, c := range q.chunkMap {
  139. if c.acked || c.abandoned() {
  140. continue
  141. }
  142. c.retransmit = true
  143. }
  144. }
  145. func (q *payloadQueue) getNumBytes() int {
  146. return q.nBytes
  147. }
  148. func (q *payloadQueue) size() int {
  149. return len(q.chunkMap)
  150. }