buffer_rcv.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2020 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. #ifndef INC_SRT_BUFFER_RCV_H
  11. #define INC_SRT_BUFFER_RCV_H
  12. #include "buffer_tools.h" // AvgBufSize
  13. #include "common.h"
  14. #include "queue.h"
  15. #include "tsbpd_time.h"
  16. namespace srt
  17. {
  18. /*
  19. * Circular receiver buffer.
  20. *
  21. * |<------------------- m_szSize ---------------------------->|
  22. * | |<------------ m_iMaxPosOff ----------->| |
  23. * | | | |
  24. * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
  25. * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[]
  26. * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+
  27. * | |
  28. * | \__last pkt received
  29. * |
  30. * \___ m_iStartPos: first message to read
  31. *
  32. * m_pUnit[i]->status_: 0: free, 1: good, 2: read, 3: dropped (can be combined with read?)
  33. *
  34. * thread safety:
  35. * start_pos_: CUDT::m_RecvLock
  36. * first_unack_pos_: CUDT::m_AckLock
  37. * max_pos_inc_: none? (modified on add and ack
  38. * first_nonread_pos_:
  39. */
  40. class CRcvBuffer
  41. {
  42. typedef sync::steady_clock::time_point time_point;
  43. typedef sync::steady_clock::duration duration;
  44. public:
  45. CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool bMessageAPI);
  46. ~CRcvBuffer();
  47. public:
  48. /// Insert a unit into the buffer.
  49. /// Similar to CRcvBuffer::addData(CUnit* unit, int offset)
  50. ///
  51. /// @param [in] unit pointer to a data unit containing new packet
  52. /// @param [in] offset offset from last ACK point.
  53. ///
  54. /// @return 0 on success, -1 if packet is already in buffer, -2 if packet is before m_iStartSeqNo.
  55. /// -3 if a packet is offset is ahead the buffer capacity.
  56. // TODO: Previously '-2' also meant 'already acknowledged'. Check usage of this value.
  57. int insert(CUnit* unit);
  58. /// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno).
  59. /// @param [in] seqno drop units up to this sequence number
  60. /// @return number of dropped packets.
  61. int dropUpTo(int32_t seqno);
  62. /// @brief Drop all the packets in the receiver buffer.
  63. /// The starting position and seqno are shifted right after the last packet in the buffer.
  64. /// @return the number of dropped packets.
  65. int dropAll();
  66. enum DropActionIfExists {
  67. DROP_EXISTING = 0,
  68. KEEP_EXISTING = 1
  69. };
  70. /// @brief Drop a sequence of packets from the buffer.
  71. /// If @a msgno is valid, sender has requested to drop the whole message by TTL. In this case it has to also provide a pkt seqno range.
  72. /// However, if a message has been partially acknowledged and already removed from the SND buffer,
  73. /// the @a seqnolo might specify some position in the middle of the message, not the very first packet.
  74. /// If those packets have been acknowledged, they must exist in the receiver buffer unless already read.
  75. /// In this case the @a msgno should be used to determine starting packets of the message.
  76. /// Some packets of the message can be missing on the receiver, therefore the actual drop should still be performed by pkt seqno range.
  77. /// If message number is 0 or SRT_MSGNO_NONE, then use sequence numbers to locate sequence range to drop [seqnolo, seqnohi].
  78. /// A SOLO message packet can be kept depending on @a actionOnExisting value.
  79. /// TODO: A message in general can be kept if all of its packets are in the buffer, depending on @a actionOnExisting value.
  80. /// This is done to avoid dropping existing packet when the sender was asked to re-transmit a packet from an outdated loss report,
  81. /// which is already not available in the SND buffer.
  82. /// @param seqnolo sequence number of the first packet in the dropping range.
  83. /// @param seqnohi sequence number of the last packet in the dropping range.
  84. /// @param msgno message number to drop (0 if unknown)
  85. /// @param actionOnExisting Should an exising SOLO packet be dropped from the buffer or preserved?
  86. /// @return the number of packets actually dropped.
  87. int dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting);
  88. /// Read the whole message from one or several packets.
  89. ///
  90. /// @param [in,out] data buffer to write the message into.
  91. /// @param [in] len size of the buffer.
  92. /// @param [in,out] message control data
  93. ///
  94. /// @return actual number of bytes extracted from the buffer.
  95. /// 0 if nothing to read.
  96. /// -1 on failure.
  97. int readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl = NULL);
  98. /// Read acknowledged data into a user buffer.
  99. /// @param [in, out] dst pointer to the target user buffer.
  100. /// @param [in] len length of user buffer.
  101. /// @return size of data read. -1 on error.
  102. int readBuffer(char* dst, int len);
  103. /// Read acknowledged data directly into file.
  104. /// @param [in] ofs C++ file stream.
  105. /// @param [in] len expected length of data to write into the file.
  106. /// @return size of data read. -1 on error.
  107. int readBufferToFile(std::fstream& ofs, int len);
  108. public:
  109. /// Get the starting position of the buffer as a packet sequence number.
  110. int getStartSeqNo() const { return m_iStartSeqNo; }
  111. /// Sets the start seqno of the buffer.
  112. /// Must be used with caution and only when the buffer is empty.
  113. void setStartSeqNo(int seqno) { m_iStartSeqNo = seqno; }
  114. /// Given the sequence number of the first unacknowledged packet
  115. /// tells the size of the buffer available for packets.
  116. /// Effective returns capacity of the buffer minus acknowledged packet still kept in it.
  117. // TODO: Maybe does not need to return minus one slot now to distinguish full and empty buffer.
  118. size_t getAvailSize(int iFirstUnackSeqNo) const
  119. {
  120. // Receiver buffer allows reading unacknowledged packets.
  121. // Therefore if the first packet in the buffer is ahead of the iFirstUnackSeqNo
  122. // then it does not have acknowledged packets and its full capacity is available.
  123. // Otherwise subtract the number of acknowledged but not yet read packets from its capacity.
  124. const int iRBufSeqNo = getStartSeqNo();
  125. if (CSeqNo::seqcmp(iRBufSeqNo, iFirstUnackSeqNo) >= 0) // iRBufSeqNo >= iFirstUnackSeqNo
  126. {
  127. // Full capacity is available.
  128. return capacity();
  129. }
  130. // Note: CSeqNo::seqlen(n, n) returns 1.
  131. return capacity() - CSeqNo::seqlen(iRBufSeqNo, iFirstUnackSeqNo) + 1;
  132. }
  133. /// @brief Checks if the buffer has packets available for reading regardless of the TSBPD.
  134. /// A message is available for reading only if all of its packets are present in the buffer.
  135. /// @return true if there are packets available for reading, false otherwise.
  136. bool hasAvailablePackets() const;
  137. /// Query how many data has been continuously received (for reading) and available for reading out
  138. /// regardless of the TSBPD.
  139. /// TODO: Rename to countAvailablePackets().
  140. /// @return size of valid (continuous) data for reading.
  141. int getRcvDataSize() const;
  142. /// Get the number of packets, bytes and buffer timespan.
  143. /// Differs from getRcvDataSize() that it counts all packets in the buffer, not only continious.
  144. int getRcvDataSize(int& bytes, int& timespan) const;
  145. struct PacketInfo
  146. {
  147. int seqno;
  148. bool seq_gap; //< true if there are missing packets in the buffer, preceding current packet
  149. time_point tsbpd_time;
  150. };
  151. /// Get information on the 1st message in queue.
  152. /// Similar to CRcvBuffer::getRcvFirstMsg
  153. /// Parameters (of the 1st packet queue, ready to play or not):
  154. /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 if
  155. /// none
  156. /// @param [out] passack true if 1st ready packet is not yet acknowledged (allowed to be delivered to the app)
  157. /// @param [out] skipseqno -1 or sequence number of 1st unacknowledged packet (after one or more missing packets) that is ready to play.
  158. /// @retval true 1st packet ready to play (tsbpdtime <= now). Not yet acknowledged if passack == true
  159. /// @retval false IF tsbpdtime = 0: rcv buffer empty; ELSE:
  160. /// IF skipseqno != -1, packet ready to play preceded by missing packets.;
  161. /// IF skipseqno == -1, no missing packet but 1st not ready to play.
  162. PacketInfo getFirstValidPacketInfo() const;
  163. PacketInfo getFirstReadablePacketInfo(time_point time_now) const;
  164. /// Get information on packets available to be read.
  165. /// @returns a pair of sequence numbers (first available; first unavailable).
  166. ///
  167. /// @note CSeqNo::seqoff(first, second) is 0 if nothing to read.
  168. std::pair<int, int> getAvailablePacketsRange() const;
  169. size_t countReadable() const;
  170. bool empty() const
  171. {
  172. return (m_iMaxPosOff == 0);
  173. }
  174. /// Return buffer capacity.
  175. /// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer".
  176. /// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously.
  177. /// TODO: Old receiver buffer capacity returns the actual size. Check for conflicts.
  178. size_t capacity() const
  179. {
  180. return m_szSize - 1;
  181. }
  182. int64_t getDrift() const { return m_tsbpd.drift(); }
  183. // TODO: make thread safe?
  184. int debugGetSize() const
  185. {
  186. return getRcvDataSize();
  187. }
  188. /// Zero time to include all available packets.
  189. /// TODO: Rename to 'canRead`.
  190. bool isRcvDataReady(time_point time_now = time_point()) const;
  191. int getRcvAvgDataSize(int& bytes, int& timespan);
  192. void updRcvAvgDataSize(const time_point& now);
  193. unsigned getRcvAvgPayloadSize() const { return m_uAvgPayloadSz; }
  194. void getInternalTimeBase(time_point& w_timebase, bool& w_wrp, duration& w_udrift)
  195. {
  196. return m_tsbpd.getInternalTimeBase(w_timebase, w_wrp, w_udrift);
  197. }
  198. public: // Used for testing
  199. /// Peek unit in position of seqno
  200. const CUnit* peek(int32_t seqno);
  201. private:
  202. inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; }
  203. inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); }
  204. inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : int(m_szSize + pos2 - pos1); }
  205. inline int cmpPos(int pos2, int pos1) const
  206. {
  207. // XXX maybe not the best implementation, but this keeps up to the rule
  208. const int off1 = pos1 >= m_iStartPos ? pos1 - m_iStartPos : pos1 + (int)m_szSize - m_iStartPos;
  209. const int off2 = pos2 >= m_iStartPos ? pos2 - m_iStartPos : pos2 + (int)m_szSize - m_iStartPos;
  210. return off2 - off1;
  211. }
  212. // NOTE: Assumes that pUnit != NULL
  213. CPacket& packetAt(int pos) { return m_entries[pos].pUnit->m_Packet; }
  214. const CPacket& packetAt(int pos) const { return m_entries[pos].pUnit->m_Packet; }
  215. private:
  216. void countBytes(int pkts, int bytes);
  217. void updateNonreadPos();
  218. void releaseUnitInPos(int pos);
  219. /// @brief Drop a unit from the buffer.
  220. /// @param pos position in the m_entries of the unit to drop.
  221. /// @return false if nothing to drop, true if the unit was dropped successfully.
  222. bool dropUnitInPos(int pos);
  223. /// Release entries following the current buffer position if they were already
  224. /// read out of order (EntryState_Read) or dropped (EntryState_Drop).
  225. void releaseNextFillerEntries();
  226. bool hasReadableInorderPkts() const { return (m_iFirstNonreadPos != m_iStartPos); }
  227. /// Find position of the last packet of the message.
  228. int findLastMessagePkt();
  229. /// Scan for availability of out of order packets.
  230. void onInsertNotInOrderPacket(int insertpos);
  231. // Check if m_iFirstReadableOutOfOrder is still readable.
  232. bool checkFirstReadableOutOfOrder();
  233. void updateFirstReadableOutOfOrder();
  234. int scanNotInOrderMessageRight(int startPos, int msgNo) const;
  235. int scanNotInOrderMessageLeft(int startPos, int msgNo) const;
  236. typedef bool copy_to_dst_f(char* data, int len, int dst_offset, void* arg);
  237. /// Read acknowledged data directly into file.
  238. /// @param [in] ofs C++ file stream.
  239. /// @param [in] len expected length of data to write into the file.
  240. /// @return size of data read.
  241. int readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg);
  242. /// @brief Estimate timespan of the stored packets (acknowledged and unacknowledged).
  243. /// @return timespan in milliseconds
  244. int getTimespan_ms() const;
  245. private:
  246. // TODO: Call makeUnitTaken upon assignment, and makeUnitFree upon clearing.
  247. // TODO: CUnitPtr is not in use at the moment, but may be a smart pointer.
  248. // class CUnitPtr
  249. // {
  250. // public:
  251. // void operator=(CUnit* pUnit)
  252. // {
  253. // if (m_pUnit != NULL)
  254. // {
  255. // // m_pUnitQueue->makeUnitFree(m_entries[i].pUnit);
  256. // }
  257. // m_pUnit = pUnit;
  258. // }
  259. // private:
  260. // CUnit* m_pUnit;
  261. // };
  262. enum EntryStatus
  263. {
  264. EntryState_Empty, //< No CUnit record.
  265. EntryState_Avail, //< Entry is available for reading.
  266. EntryState_Read, //< Entry has already been read (out of order).
  267. EntryState_Drop //< Entry has been dropped.
  268. };
  269. struct Entry
  270. {
  271. Entry()
  272. : pUnit(NULL)
  273. , status(EntryState_Empty)
  274. {}
  275. CUnit* pUnit;
  276. EntryStatus status;
  277. };
  278. //static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; }
  279. FixedArray<Entry> m_entries;
  280. const size_t m_szSize; // size of the array of units (buffer)
  281. CUnitQueue* m_pUnitQueue; // the shared unit queue
  282. int m_iStartSeqNo;
  283. int m_iStartPos; // the head position for I/O (inclusive)
  284. int m_iFirstNonreadPos; // First position that can't be read (<= m_iLastAckPos)
  285. int m_iMaxPosOff; // the furthest data position
  286. int m_iNotch; // the starting read point of the first unit
  287. size_t m_numOutOfOrderPackets; // The number of stored packets with "inorder" flag set to false
  288. int m_iFirstReadableOutOfOrder; // In case of out ouf order packet, points to a position of the first such packet to
  289. // read
  290. bool m_bPeerRexmitFlag; // Needed to read message number correctly
  291. const bool m_bMessageAPI; // Operation mode flag: message or stream.
  292. public: // TSBPD public functions
  293. /// Set TimeStamp-Based Packet Delivery Rx Mode
  294. /// @param [in] timebase localtime base (uSec) of packet time stamps including buffering delay
  295. /// @param [in] wrap Is in wrapping period
  296. /// @param [in] delay agreed TsbPD delay
  297. ///
  298. /// @return 0
  299. void setTsbPdMode(const time_point& timebase, bool wrap, duration delay);
  300. void setPeerRexmitFlag(bool flag) { m_bPeerRexmitFlag = flag; }
  301. void applyGroupTime(const time_point& timebase, bool wrp, uint32_t delay, const duration& udrift);
  302. void applyGroupDrift(const time_point& timebase, bool wrp, const duration& udrift);
  303. bool addRcvTsbPdDriftSample(uint32_t usTimestamp, const time_point& tsPktArrival, int usRTTSample);
  304. time_point getPktTsbPdTime(uint32_t usPktTimestamp) const;
  305. time_point getTsbPdTimeBase(uint32_t usPktTimestamp) const;
  306. void updateTsbPdTimeBase(uint32_t usPktTimestamp);
  307. bool isTsbPd() const { return m_tsbpd.isEnabled(); }
  308. /// Form a string of the current buffer fullness state.
  309. /// number of packets acknowledged, TSBPD readiness, etc.
  310. std::string strFullnessState(int iFirstUnackSeqNo, const time_point& tsNow) const;
  311. private:
  312. CTsbpdTime m_tsbpd;
  313. private: // Statistics
  314. AvgBufSize m_mavg;
  315. // TODO: m_BytesCountLock is probably not needed as the buffer has to be protected from simultaneous access.
  316. mutable sync::Mutex m_BytesCountLock; // used to protect counters operations
  317. int m_iBytesCount; // Number of payload bytes in the buffer
  318. int m_iPktsCount; // Number of payload bytes in the buffer
  319. unsigned m_uAvgPayloadSz; // Average payload size for dropped bytes estimation
  320. };
  321. } // namespace srt
  322. #endif // INC_SRT_BUFFER_RCV_H