core.h 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2018 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. /*****************************************************************************
  11. Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
  12. All rights reserved.
  13. Redistribution and use in source and binary forms, with or without
  14. modification, are permitted provided that the following conditions are
  15. met:
  16. * Redistributions of source code must retain the above
  17. copyright notice, this list of conditions and the
  18. following disclaimer.
  19. * Redistributions in binary form must reproduce the
  20. above copyright notice, this list of conditions
  21. and the following disclaimer in the documentation
  22. and/or other materials provided with the distribution.
  23. * Neither the name of the University of Illinois
  24. nor the names of its contributors may be used to
  25. endorse or promote products derived from this
  26. software without specific prior written permission.
  27. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
  28. IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
  29. THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  30. PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  31. CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  32. EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  33. PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  34. PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  35. LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  36. NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  37. SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38. *****************************************************************************/
  39. /*****************************************************************************
  40. written by
  41. Yunhong Gu, last updated 02/28/2012
  42. modified by
  43. Haivision Systems Inc.
  44. *****************************************************************************/
  45. #ifndef INC_SRT_CORE_H
  46. #define INC_SRT_CORE_H
  47. #include <deque>
  48. #include <sstream>
  49. #include "srt.h"
  50. #include "common.h"
  51. #include "list.h"
  52. #include "buffer_snd.h"
  53. #include "buffer_rcv.h"
  54. #include "window.h"
  55. #include "packet.h"
  56. #include "channel.h"
  57. #include "cache.h"
  58. #include "queue.h"
  59. #include "handshake.h"
  60. #include "congctl.h"
  61. #include "packetfilter.h"
  62. #include "socketconfig.h"
  63. #include "utilities.h"
  64. #include "logger_defs.h"
  65. #include "stats.h"
  66. #include <haicrypt.h>
  67. // TODO: Utility function - to be moved to utilities.h?
  /// Moves @a base towards @a newval by the fraction @a factor of the
  /// distance between them.
  ///
  /// @param base    current value; a value equal to zero means "not set yet"
  /// @param newval  new sample
  /// @param factor  fraction of the distance (newval - base) to apply
  ///
  /// @returns @a newval when @a base is zero, otherwise @a base shifted
  ///          towards @a newval by the scaled distance (the scaled distance
  ///          is converted to T before being applied).
  template <class T>
  inline T CountIIR(T base, T newval, double factor)
  {
      if (base == 0.0)
          return newval;

      // Always scale a non-negative distance. With an unsigned T the
      // expression (newval - base) wraps around when newval < base, which
      // would turn a small step down into a huge step up. For signed and
      // floating-point T both branches give the same result as
      // base + T((newval - base) * factor).
      if (newval < base)
          return base - T((base - newval) * factor);

      return base + T((newval - base) * factor);
  }
  76. // TODO: Probably a better rework for that can be done - this can be
  77. // turned into a serializable structure, just like it's done for CHandShake.
  // Indices of the fields carried by an ACK packet. The ACKD_TOTAL_SIZE_*
  // entries are not fields: each one is the number of fields of one ACK
  // variant, so it shares its value with the first field added by the next,
  // larger variant.
  enum AckDataItem
  {
      // Fields of the Small ACK.
      ACKD_RCVLASTACK = 0,
      ACKD_RTT        = 1,
      ACKD_RTTVAR     = 2,
      ACKD_BUFFERLEFT = 3,
      ACKD_TOTAL_SIZE_SMALL = 4, // Small ACK, packet length = 16.

      // Fields added by the Full ACK.
      ACKD_RCVSPEED  = ACKD_TOTAL_SIZE_SMALL,
      ACKD_BANDWIDTH = ACKD_RCVSPEED + 1,
      ACKD_TOTAL_SIZE_UDTBASE = ACKD_BANDWIDTH + 1, // Packet length = 24.

      // Statistics added since SRT v1.0.1.
      ACKD_RCVRATE = ACKD_TOTAL_SIZE_UDTBASE,
      ACKD_TOTAL_SIZE_VER101 = ACKD_RCVRATE + 1, // Packet length = 28.

      // Present only in SRT v1.0.2.
      ACKD_XMRATE_VER102_ONLY = ACKD_TOTAL_SIZE_VER101,
      ACKD_TOTAL_SIZE_VER102_ONLY = ACKD_XMRATE_VER102_ONLY + 1, // Packet length = 32.

      // The maximum known ACK length is 32 bytes.
      ACKD_TOTAL_SIZE = ACKD_TOTAL_SIZE_VER102_ONLY
  };
  97. const size_t ACKD_FIELD_SIZE = sizeof(int32_t);
  98. static const size_t SRT_SOCKOPT_NPOST = 12;
  99. extern const SRT_SOCKOPT srt_post_opt_list [];
  // Indices of the items in the group data block; GRPD_E_SIZE is the
  // number of items, not an item itself.
  enum GroupDataItem
  {
      GRPD_GROUPID   = 0,
      GRPD_GROUPDATA = 1,

      GRPD_E_SIZE    = 2
  };
  106. const size_t GRPD_MIN_SIZE = 2; // ID and GROUPTYPE as backward compat
  107. const size_t GRPD_FIELD_SIZE = sizeof(int32_t);
  108. // For HSv4 legacy handshake
  109. #define SRT_MAX_HSRETRY 10 /* Maximum SRT handshake retry */
  // Indices into a two-element sequence range; SEQ_SIZE is the number
  // of elements of such a range.
  enum SeqPairItems
  {
      SEQ_BEGIN = 0,
      SEQ_END   = 1,
      SEQ_SIZE  = 2
  };
  114. // Extended SRT Congestion control class - only an incomplete definition required
  115. class CCryptoControl;
  116. namespace srt {
  117. class CUDTUnited;
  118. class CUDTSocket;
  119. #if ENABLE_BONDING
  120. class CUDTGroup;
  121. #endif
  122. // XXX REFACTOR: The 'CUDT' class is to be merged with 'CUDTSocket'.
  123. // There's no reason for separating them, there's no case of having them
  124. // anyhow managed separately. After this is done, with a small help with
  125. // separating the internal abnormal path management (exceptions) from the
  126. // API (return values), through CUDTUnited, this class may become in future
  127. // an officially exposed C++ API.
  128. class CUDT
  129. {
  130. friend class CUDTSocket;
  131. friend class CUDTUnited;
  132. friend class CCC;
  133. friend struct CUDTComp;
  134. friend class CCache<CInfoBlock>;
  135. friend class CRendezvousQueue;
  136. friend class CSndQueue;
  137. friend class CRcvQueue;
  138. friend class CSndUList;
  139. friend class CRcvUList;
  140. friend class PacketFilter;
  141. friend class CUDTGroup;
  142. friend class TestMockCUDT; // unit tests
  143. typedef sync::steady_clock::time_point time_point;
  144. typedef sync::steady_clock::duration duration;
  145. typedef sync::AtomicClock<sync::steady_clock> atomic_time_point;
  146. typedef sync::AtomicDuration<sync::steady_clock> atomic_duration;
  147. private: // constructor and destructor
  148. void construct();
  149. void clearData();
  150. CUDT(CUDTSocket* parent);
  151. CUDT(CUDTSocket* parent, const CUDT& ancestor);
  152. const CUDT& operator=(const CUDT&) {return *this;} // = delete ?
  153. ~CUDT();
  154. public: //API
  155. static int startup();
  156. static int cleanup();
  157. static SRTSOCKET socket();
  158. #if ENABLE_BONDING
  159. static SRTSOCKET createGroup(SRT_GROUP_TYPE);
  160. static SRTSOCKET getGroupOfSocket(SRTSOCKET socket);
  161. static int getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psize);
  162. static bool isgroup(SRTSOCKET sock) { return (sock & SRTGROUP_MASK) != 0; }
  163. #endif
  164. static int bind(SRTSOCKET u, const sockaddr* name, int namelen);
  165. static int bind(SRTSOCKET u, UDPSOCKET udpsock);
  166. static int listen(SRTSOCKET u, int backlog);
  167. static SRTSOCKET accept(SRTSOCKET u, sockaddr* addr, int* addrlen);
  168. static SRTSOCKET accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut);
  169. static int connect(SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn);
  170. static int connect(SRTSOCKET u, const sockaddr* name, const sockaddr* tname, int namelen);
  171. #if ENABLE_BONDING
  172. static int connectLinks(SRTSOCKET grp, SRT_SOCKGROUPCONFIG links [], int arraysize);
  173. #endif
  174. static int close(SRTSOCKET u);
  175. static int getpeername(SRTSOCKET u, sockaddr* name, int* namelen);
  176. static int getsockname(SRTSOCKET u, sockaddr* name, int* namelen);
  177. static int getsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, void* optval, int* optlen);
  178. static int setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen);
  179. static int send(SRTSOCKET u, const char* buf, int len, int flags);
  180. static int recv(SRTSOCKET u, char* buf, int len, int flags);
  181. static int sendmsg(SRTSOCKET u, const char* buf, int len, int ttl = SRT_MSGTTL_INF, bool inorder = false, int64_t srctime = 0);
  182. static int recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime);
  183. static int sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& mctrl);
  184. static int recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_mctrl);
  185. static int64_t sendfile(SRTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = SRT_DEFAULT_SENDFILE_BLOCK);
  186. static int64_t recvfile(SRTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = SRT_DEFAULT_RECVFILE_BLOCK);
  187. static int select(int nfds, UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout);
  188. static int selectEx(const std::vector<SRTSOCKET>& fds, std::vector<SRTSOCKET>* readfds, std::vector<SRTSOCKET>* writefds, std::vector<SRTSOCKET>* exceptfds, int64_t msTimeOut);
  189. static int epoll_create();
  190. static int epoll_clear_usocks(int eid);
  191. static int epoll_add_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
  192. static int epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
  193. static int epoll_remove_usock(const int eid, const SRTSOCKET u);
  194. static int epoll_remove_ssock(const int eid, const SYSSOCKET s);
  195. static int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
  196. static int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
  197. static int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds,
  198. int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
  199. static int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
  200. static int32_t epoll_set(const int eid, int32_t flags);
  201. static int epoll_release(const int eid);
  202. static CUDTException& getlasterror();
  203. static int bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear = true, bool instantaneous = false);
  204. #if ENABLE_BONDING
  205. static int groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear = true);
  206. #endif
  207. static SRT_SOCKSTATUS getsockstate(SRTSOCKET u);
  208. static bool setstreamid(SRTSOCKET u, const std::string& sid);
  209. static std::string getstreamid(SRTSOCKET u);
  210. static int getsndbuffer(SRTSOCKET u, size_t* blocks, size_t* bytes);
  211. static int rejectReason(SRTSOCKET s);
  212. static int rejectReason(SRTSOCKET s, int value);
  213. static int64_t socketStartTime(SRTSOCKET s);
  214. public: // internal API
  215. // This is public so that it can be used directly in API implementation functions.
  216. struct APIError
  217. {
  218. APIError(const CUDTException&);
  219. APIError(CodeMajor, CodeMinor, int = 0);
  220. operator int() const
  221. {
  222. return SRT_ERROR;
  223. }
  224. };
  225. static const SRTSOCKET INVALID_SOCK = -1; // Invalid socket descriptor
  226. static const int ERROR = -1; // Socket api error returned value
  227. static const int HS_VERSION_UDT4 = 4;
  228. static const int HS_VERSION_SRT1 = 5;
  229. // Parameters
  230. //
  231. // NOTE: Use notation with X*1000*1000*... instead of
  232. // million zeros in a row.
  233. static const int COMM_RESPONSE_MAX_EXP = 16;
  234. static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
  235. static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
  236. static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
  237. static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
  238. static const uint16_t MAX_WEIGHT = 32767;
  239. static const size_t ACK_WND_SIZE = 1024;
  240. static const int INITIAL_RTT = 10 * COMM_SYN_INTERVAL_US;
  241. static const int INITIAL_RTTVAR = INITIAL_RTT / 2;
  242. int handshakeVersion()
  243. {
  244. return m_ConnRes.m_iVersion;
  245. }
  246. std::string CONID() const
  247. {
  248. #if ENABLE_LOGGING
  249. std::ostringstream os;
  250. os << "@" << m_SocketID << ": ";
  251. return os.str();
  252. #else
  253. return "";
  254. #endif
  255. }
  256. SRTSOCKET socketID() const { return m_SocketID; }
  257. static CUDT* getUDTHandle(SRTSOCKET u);
  258. static std::vector<SRTSOCKET> existingSockets();
  259. void addressAndSend(CPacket& pkt);
  260. SRT_ATTR_REQUIRES(m_ConnectionLock)
  261. void sendSrtMsg(int cmd, uint32_t *srtdata_in = NULL, size_t srtlen_in = 0);
  262. bool isOPT_TsbPd() const { return m_config.bTSBPD; }
  263. int SRTT() const { return m_iSRTT; }
  264. int RTTVar() const { return m_iRTTVar; }
  265. int32_t sndSeqNo() const { return m_iSndCurrSeqNo; }
  266. int32_t schedSeqNo() const { return m_iSndNextSeqNo; }
  267. bool overrideSndSeqNo(int32_t seq);
  268. #if ENABLE_BONDING
  269. sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime.load(); }
  270. sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; }
  271. #endif
  272. int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
  273. int flowWindowSize() const { return m_iFlowWindowSize; }
  274. int32_t deliveryRate() const { return m_iDeliveryRate; }
  275. int bandwidth() const { return m_iBandwidth; }
  276. int64_t maxBandwidth() const { return m_config.llMaxBW; }
  277. int MSS() const { return m_config.iMSS; }
  278. uint32_t peerLatency_us() const { return m_iPeerTsbPdDelay_ms * 1000; }
  279. int peerIdleTimeout_ms() const { return m_config.iPeerIdleTimeout_ms; }
  280. size_t maxPayloadSize() const { return m_iMaxSRTPayloadSize; }
  281. size_t OPT_PayloadSize() const { return m_config.zExpPayloadSize; }
  282. int sndLossLength() { return m_pSndLossList->getLossLength(); }
  283. int32_t ISN() const { return m_iISN; }
  284. int32_t peerISN() const { return m_iPeerISN; }
  285. duration minNAKInterval() const { return m_tdMinNakInterval; }
  286. sockaddr_any peerAddr() const { return m_PeerAddr; }
  287. /// Returns the number of packets in flight (sent, but not yet acknowledged).
  288. /// @param lastack is the sequence number of the first unacknowledged packet.
  289. /// @param curseq is the sequence number of the latest original packet sent
  290. ///
  291. /// @note When there are no packets in flight, lastack = incseq(curseq).
  292. ///
  293. /// @returns The number of packets in flight belonging to the interval [0; ...)
  294. static int32_t getFlightSpan(int32_t lastack, int32_t curseq)
  295. {
  296. // Packets sent:
  297. // | 1 | 2 | 3 | 4 | 5 |
  298. // ^ ^
  299. // | |
  300. // lastack |
  301. // curseq
  302. //
  303. // In Flight: [lastack; curseq]
  304. //
  305. // Normally 'lastack' should be PAST the 'curseq',
  306. // however in a case when the sending stopped and all packets were
  307. // ACKed, the 'lastack' is one sequence ahead of 'curseq'.
  308. // Therefore we increase 'curseq' by 1 forward and then
  309. // get the distance towards the last ACK. This way this value may
  310. // be only positive as seqlen() includes endpoints.
  311. // Finally, we subtract 1 to exclude the increment added earlier.
  312. return CSeqNo::seqlen(lastack, CSeqNo::incseq(curseq)) - 1;
  313. }
  314. /// Returns the number of packets in flight (sent, but not yet acknowledged).
  315. /// @returns The number of packets in flight belonging to the interval [0; ...)
  316. int32_t getFlightSpan() const
  317. {
  318. return getFlightSpan(m_iSndLastAck, m_iSndCurrSeqNo);
  319. }
  320. int minSndSize(int len = 0) const
  321. {
  322. const int ps = (int) maxPayloadSize();
  323. if (len == 0) // weird, can't use non-static data member as default argument!
  324. len = ps;
  325. return m_config.bMessageAPI ? (len+ps-1)/ps : 1;
  326. }
  327. static int32_t makeTS(const time_point& from_time, const time_point& tsStartTime)
  328. {
  329. // NOTE:
  330. // - This calculates first the time difference towards start time.
  331. // - This difference value is also CUT OFF THE SEGMENT information
  332. // (a multiple of MAX_TIMESTAMP+1)
  333. // So, this can be simply defined as: TS = (RTS - STS) % (MAX_TIMESTAMP+1)
  334. SRT_ASSERT(from_time >= tsStartTime);
  335. return (int32_t) sync::count_microseconds(from_time - tsStartTime);
  336. }
  337. /// @brief Set the timestamp field of the packet using the provided value (no check)
  338. /// @param p the packet structure to set the timestamp on.
  339. /// @param ts timestamp to use as a source for packet timestamp.
  340. SRT_ATTR_EXCLUDES(m_StatsLock)
  341. void setPacketTS(CPacket& p, const time_point& ts);
  342. /// @brief Set the timestamp field of the packet according the TSBPD mode.
  343. /// Also checks the connection start time (m_tsStartTime).
  344. /// @param p the packet structure to set the timestamp on.
  345. /// @param ts timestamp to use as a source for packet timestamp. Ignored if m_bPeerTsbPd is false.
  346. SRT_ATTR_EXCLUDES(m_StatsLock)
  347. void setDataPacketTS(CPacket& p, const time_point& ts);
  348. // Utility used for closing a listening socket
  349. // immediately to free the socket
  350. void notListening()
  351. {
  352. sync::ScopedLock cg(m_ConnectionLock);
  353. m_bListening = false;
  354. m_pRcvQueue->removeListener(this);
  355. }
  356. static int32_t generateISN()
  357. {
  358. using namespace sync;
  359. return genRandomInt(0, CSeqNo::m_iMaxSeqNo);
  360. }
  361. static CUDTUnited& uglobal(); // UDT global management base
  362. std::set<int>& pollset() { return m_sPollID; }
  363. CSrtConfig m_config;
  364. SRTU_PROPERTY_RO(SRTSOCKET, id, m_SocketID);
  365. SRTU_PROPERTY_RO(bool, isClosing, m_bClosing);
  366. SRTU_PROPERTY_RO(srt::CRcvBuffer*, rcvBuffer, m_pRcvBuffer);
  367. SRTU_PROPERTY_RO(bool, isTLPktDrop, m_bTLPktDrop);
  368. SRTU_PROPERTY_RO(bool, isSynReceiving, m_config.bSynRecving);
  369. SRTU_PROPERTY_RR(sync::Condition*, recvDataCond, &m_RecvDataCond);
  370. SRTU_PROPERTY_RR(sync::Condition*, recvTsbPdCond, &m_RcvTsbPdCond);
  371. /// @brief Request a socket to be broken due to too long instability (normally by a group).
  372. void breakAsUnstable() { m_bBreakAsUnstable = true; }
  373. void ConnectSignal(ETransmissionEvent tev, EventSlot sl);
  374. void DisconnectSignal(ETransmissionEvent tev);
  375. // This is in public section so prospective overriding it can be
  376. // done by directly assigning to a field.
  377. typedef std::vector< std::pair<int32_t, int32_t> > loss_seqs_t;
  378. typedef loss_seqs_t packetArrival_cb(void*, CPacket&);
  379. CallbackHolder<packetArrival_cb> m_cbPacketArrival;
  380. private:
  381. /// initialize a UDT entity and bind to a local address.
  382. void open();
  383. /// Start listening to any connection request.
  384. void setListenState();
  385. /// Connect to a UDT entity listening at address "peer".
  386. /// @param peer [in] The address of the listening UDT entity.
  387. void startConnect(const sockaddr_any& peer, int32_t forced_isn);
  388. /// Process the response handshake packet. Failure reasons can be:
  389. /// * Socket is not in connecting state
  390. /// * Response @a pkt is not a handshake control message
  391. /// * Rendezvous socket has once processed a regular handshake
  392. /// @param pkt [in] handshake packet.
  393. /// @retval 0 Connection successful
  394. /// @retval 1 Connection in progress (m_ConnReq turned into RESPONSE)
  395. /// @retval -1 Connection failed
  396. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  397. EConnectStatus processConnectResponse(const CPacket& pkt, CUDTException* eout) ATR_NOEXCEPT;
  398. // This function works in case of HSv5 rendezvous. It changes the state
  399. // according to the present state and received message type, as well as the
  400. // INITIATOR/RESPONDER side resolved through cookieContest().
  401. // The resulting data are:
  402. // - rsptype: handshake message type that should be sent back to the peer (nothing if URQ_DONE)
  403. // - needs_extension: the HSREQ/KMREQ or HSRSP/KMRSP extensions should be attached to the handshake message.
  404. // - needs_hsrsp: if true, it means a URQ_CONCLUSION message was received with HSRSP/KMRSP extensions and needs HSRSP/KMRSP.
  405. void rendezvousSwitchState(UDTRequestType& rsptype, bool& needs_extension, bool& needs_hsrsp);
  406. void cookieContest();
  407. /// Interpret the incoming handshake packet in order to perform appropriate
  408. /// rendezvous FSM state transition if needed, and craft the response, serialized
  409. /// into the packet to be next sent.
  410. /// @param reqpkt Packet to be written with handshake data
  411. /// @param response incoming handshake response packet to be interpreted
  412. /// @param serv_addr incoming packet's address
  413. /// @param rst Current read status to know if the HS packet was freshly received from the peer, or this is only a periodic update (RST_AGAIN)
  414. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  415. EConnectStatus processRendezvous(const CPacket* response, const sockaddr_any& serv_addr, EReadStatus, CPacket& reqpkt);
  416. void sendRendezvousRejection(const sockaddr_any& serv_addr, CPacket& request);
  417. /// Create the CryptoControl object based on the HS packet.
  418. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  419. bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException* eout);
  420. /// Allocates sender and receiver buffers and loss lists.
  421. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  422. bool prepareBuffers(CUDTException* eout);
  423. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  424. EConnectStatus postConnect(const CPacket* response, bool rendezvous, CUDTException* eout) ATR_NOEXCEPT;
  425. SRT_ATR_NODISCARD bool applyResponseSettings(const CPacket* hspkt /*[[nullable]]*/) ATR_NOEXCEPT;
  426. SRT_ATR_NODISCARD EConnectStatus processAsyncConnectResponse(const CPacket& pkt) ATR_NOEXCEPT;
  427. SRT_ATR_NODISCARD bool processAsyncConnectRequest(EReadStatus rst, EConnectStatus cst, const CPacket* response, const sockaddr_any& serv_addr);
  428. SRT_ATR_NODISCARD EConnectStatus craftKmResponse(uint32_t* aw_kmdata, size_t& w_kmdatasize);
  429. void checkUpdateCryptoKeyLen(const char* loghdr, int32_t typefield);
  430. SRT_ATR_NODISCARD size_t fillSrtHandshake_HSREQ(uint32_t* srtdata, size_t srtlen, int hs_version);
  431. SRT_ATR_NODISCARD size_t fillSrtHandshake_HSRSP(uint32_t* srtdata, size_t srtlen, int hs_version);
  432. SRT_ATR_NODISCARD size_t fillSrtHandshake(uint32_t* srtdata, size_t srtlen, int msgtype, int hs_version);
  433. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  434. bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen,
  435. CPacket& w_reqpkt, CHandShake& w_hs);
  436. SRT_ATR_NODISCARD size_t fillHsExtConfigString(uint32_t *pcmdspec, int cmd, const std::string &str);
  437. #if ENABLE_BONDING
  438. SRT_ATR_NODISCARD size_t fillHsExtGroup(uint32_t *pcmdspec);
  439. #endif
  440. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  441. size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki);
  442. SRT_ATR_NODISCARD size_t fillHsExtKMRSP(uint32_t *pcmdspec, const uint32_t *kmdata, size_t kmdata_wordsize);
  443. SRT_ATR_NODISCARD size_t prepareSrtHsMsg(int cmd, uint32_t* srtdata, size_t size);
  444. SRT_ATR_NODISCARD bool processSrtMsg(const CPacket *ctrlpkt);
  445. SRT_ATR_NODISCARD int processSrtMsg_HSREQ(const uint32_t* srtdata, size_t bytelen, uint32_t ts, int hsv);
  446. SRT_ATR_NODISCARD int processSrtMsg_HSRSP(const uint32_t* srtdata, size_t bytelen, uint32_t ts, int hsv);
  447. SRT_ATR_NODISCARD bool interpretSrtHandshake(const CHandShake& hs, const CPacket& hspkt, uint32_t* out_data, size_t* out_len);
  448. SRT_ATR_NODISCARD bool checkApplyFilterConfig(const std::string& cs);
  449. #if ENABLE_BONDING
  450. static CUDTGroup& newGroup(const int); // defined EXCEPTIONALLY in api.cpp for convenience reasons
  451. // Note: This is an "interpret" function, which should treat the tp as
  452. // "possibly group type" that might be out of the existing values.
  453. SRT_ATR_NODISCARD bool interpretGroup(const int32_t grpdata[], size_t data_size, int hsreq_type_cmd);
  454. SRT_ATR_NODISCARD SRTSOCKET makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE tp, uint32_t link_flags);
  455. void synchronizeWithGroup(CUDTGroup* grp);
  456. #endif
  457. void updateAfterSrtHandshake(int hsv);
  458. void updateSrtRcvSettings();
  459. void updateSrtSndSettings();
  460. void updateIdleLinkFrom(CUDT* source);
  461. /// @brief Drop packets too late to be delivered if any.
  462. /// @returns the number of packets actually dropped.
  463. SRT_ATTR_REQUIRES2(m_RecvAckLock, m_StatsLock)
  464. int sndDropTooLate();
  465. /// @brief Allow packet retransmission.
  466. /// Depending on the configuration mode (live / file), retransmission
  467. /// can be blocked if e.g. there are original packets pending to be sent.
  468. /// @return true if retransmission is allowed; false otherwise.
  469. bool isRetransmissionAllowed(const time_point& tnow);
  470. /// Connect to a UDT entity as per hs request. This will update
  471. /// required data in the entity, then update them also in the hs structure,
  472. /// and then send the response back to the caller.
  473. /// @param agent [in] The address to which the UDT entity is bound.
  474. /// @param peer [in] The address of the listening UDT entity.
  475. /// @param hspkt [in] The original packet that brought the handshake.
  476. /// @param hs [in/out] The handshake information sent by the peer side (in), negotiated value (out).
  477. void acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, const CPacket& hspkt, CHandShake& hs);
  478. /// Write back to the hs structure the data after they have been
  479. /// negotiated by acceptAndRespond.
  480. void rewriteHandshakeData(const sockaddr_any& peer, CHandShake& w_hs);
  481. bool runAcceptHook(CUDT* acore, const sockaddr* peer, const CHandShake& hs, const CPacket& hspkt);
  482. /// Close the opened UDT entity.
  483. bool closeInternal();
  484. void updateBrokenConnection();
  485. void completeBrokenConnectionDependencies(int errorcode);
  486. /// Request UDT to send out a data block "data" with size of "len".
  487. /// @param data [in] The address of the application data to be sent.
  488. /// @param len [in] The size of the data block.
  489. /// @return Actual size of data sent.
  490. SRT_ATR_NODISCARD int send(const char* data, int len)
  491. {
  492. return sendmsg(data, len, SRT_MSGTTL_INF, false, 0);
  493. }
  494. /// Request UDT to receive data to a memory block "data" with size of "len".
  495. /// @param data [out] data received.
  496. /// @param len [in] The desired size of data to be received.
  497. /// @return Actual size of data received.
  498. SRT_ATR_NODISCARD int recv(char* data, int len);
  499. /// Send a message of a memory block "data" with size of "len".
  500. /// @param data [in] The address of the data to be sent.
  501. /// @param len [in] The size of the data to be sent.
  502. /// @param ttl [in] the time-to-live of the message.
  503. /// @param inorder [in] if the message should be delivered in order.
  504. /// @param srctime [in] Time when the data were ready to send.
  505. /// @return Actual size of data sent.
  506. SRT_ATR_NODISCARD int sendmsg(const char* data, int len, int ttl, bool inorder, int64_t srctime);
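  507. /// Message-based send/receive variants declared below.
  508. /// @param data [in] for sendmsg2: the data to be sent; [out] for recvmsg/recvmsg2: data received.
  509. /// @param len [in] size of the buffer.
  510. /// @return Actual size of data sent (sendmsg2) or received (recvmsg, recvmsg2).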
  511. SRT_ATR_NODISCARD int sendmsg2(const char* data, int len, SRT_MSGCTRL& w_m);
  512. SRT_ATR_NODISCARD int recvmsg(char* data, int len, int64_t& srctime);
  /// Variant of recvmsg() taking a message control structure "w_m".
  /// NOTE(review): exact semantics are not visible in this header - confirm in core.cpp.
  513. SRT_ATR_NODISCARD int recvmsg2(char* data, int len, SRT_MSGCTRL& w_m);
  /// @param erh [in] error handling mode; the default 1 is annotated in the declaration as "throw exception".
  514. SRT_ATR_NODISCARD int receiveMessage(char* data, int len, SRT_MSGCTRL& w_m, int erh = 1 /*throw exception*/);
  515. SRT_ATR_NODISCARD int receiveBuffer(char* data, int len);
  /// @param seqtoskip [in] NOTE(review): presumably the sequence number identifying what to drop - confirm in core.cpp.
  516. size_t dropMessage(int32_t seqtoskip);
  517. /// Request UDT to send out a file described as "ifs", starting from "offset", with size of "size".
  518. /// @param ifs [in] The input file stream.
  519. /// @param offset [in, out] From where to read and send data; output is the new offset when the call returns.
  520. /// @param size [in] How much data is to be sent.
  521. /// @param block [in] size of block per read from disk
  522. /// @return Actual size of data sent.
  523. SRT_ATR_NODISCARD int64_t sendfile(std::fstream& ifs, int64_t& offset, int64_t size, int block = 366000);
  524. /// Request UDT to receive data into a file described as "ofs", starting from "offset", with expected size of "size".
  525. /// @param ofs [out] The output file stream.
  526. /// @param offset [in, out] From where to write data; output is the new offset when the call returns.
  527. /// @param size [in] How much data is to be received.
  528. /// @param block [in] size of block per write to disk
  529. /// @return Actual size of data received.
  530. SRT_ATR_NODISCARD int64_t recvfile(std::fstream& ofs, int64_t& offset, int64_t size, int block = 7320000);
  531. /// Configure UDT options.
  532. /// @param optName [in] The enum name of a UDT option.
  533. /// @param optval [in] The value to be set.
  534. /// @param optlen [in] size of "optval".
  535. void setOpt(SRT_SOCKOPT optName, const void* optval, int optlen);
  536. /// Read UDT options.
  537. /// @param optName [in] The enum name of a UDT option.
  538. /// @param optval [out] The value to be returned.
  539. /// @param w_optlen [out] size of "optval".
  540. void getOpt(SRT_SOCKOPT optName, void* optval, int& w_optlen);
  541. #if ENABLE_BONDING
  542. /// Applies the configuration set on the socket.
  543. /// Any errors in this process are reported by exception.
  /// NOTE(review): the declaration also returns an SRT_ERRNO; confirm in core.cpp
  /// whether errors are thrown, returned, or both.
  /// @param opt [in] the option object holding the configuration to apply.
  544. SRT_ERRNO applyMemberConfigObject(const SRT_SocketOptionObject& opt);
  545. #endif
  546. /// Read the performance data, including byte counters.
  547. ///
  548. /// @param perf [in, out] pointer to a CBytePerfMon structure to record the performance data.
  549. /// @param clear [in] flag to decide if the local performance trace should be cleared.
  550. /// @param instantaneous [in] flag to request instantaneous data
  551. /// instead of moving averages.
  552. void bstats(CBytePerfMon* perf, bool clear = true, bool instantaneous = false);
  553. /// Mark the sequence contained in the given packet as not lost. This
  554. /// removes the loss record from both the current receiver loss list and
  555. /// the receiver fresh loss list.
  556. void unlose(const CPacket& oldpacket);
  557. void dropFromLossLists(int32_t from, int32_t to);
  558. bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason);
  /// NOTE(review): annotated SRT_ATTR_EXCLUDES(m_ConnectionLock), i.e. presumably must be
  /// called without m_ConnectionLock held - confirm against the macro definition.
  559. SRT_ATTR_EXCLUDES(m_ConnectionLock)
  560. void checkSndTimers();
  561. /// @brief Check and perform KM refresh if needed.
  562. void checkSndKMRefresh();
  563. void handshakeDone()
  564. {
  565. m_iSndHsRetryCnt = 0;
  566. }
  567. int64_t withOverhead(int64_t basebw)
  568. {
  569. return (basebw * (100 + m_config.iOverheadBW))/100;
  570. }
  /// Convert a rate given in bytes per second into megabits per second.
  /// @param basebw [in] rate in bytes per second.
  /// @return the same rate in Mbps (bytes * 8 / 1000000).
  static double Bps2Mbps(int64_t basebw)
  {
      const double bits_per_sec = double(basebw) * 8.0;
      return bits_per_sec / 1000000.0;
  }
  575. bool stillConnected()
  576. {
  577. // Still connected is when:
  578. // - no "broken" condition appeared (security, protocol error, response timeout)
  579. return !m_bBroken
  580. // - still connected (no one called srt_close())
  581. && m_bConnected
  582. // - isn't currently closing (srt_close() called, response timeout, shutdown)
  583. && !m_bClosing;
  584. }
  585. int sndSpaceLeft()
  586. {
  587. return static_cast<int>(sndBuffersLeft() * maxPayloadSize());
  588. }
  589. int sndBuffersLeft()
  590. {
  591. return m_config.iSndBufSize - m_pSndBuffer->getCurrBufSize();
  592. }
  593. time_point socketStartTime()
  594. {
  595. return m_stats.tsStartTime;
  596. }
  597. SRT_ATTR_EXCLUDES(m_RcvBufferLock)
  598. bool isRcvBufferReady() const;
  599. SRT_ATTR_REQUIRES(m_RcvBufferLock)
  600. bool isRcvBufferReadyNoLock() const;
  601. // TSBPD thread main function.
  602. static void* tsbpd(void* param);
  603. /// Drop too late packets (receiver side). Update loss lists and ACK positions.
  604. /// The @a seqno packet itself is not dropped.
  605. /// @param seqno [in] The sequence number of the first packet following those to be dropped.
  606. /// @return The number of packets dropped.
  607. int rcvDropTooLateUpTo(int seqno);
  608. static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
  609. static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt);
  610. CRateEstimator getRateEstimator() const
  611. {
  612. if (!m_pSndBuffer)
  613. return CRateEstimator();
  614. return m_pSndBuffer->getRateEstimator();
  615. }
  616. void setRateEstimator(const CRateEstimator& rate)
  617. {
  618. if (!m_pSndBuffer)
  619. return;
  620. m_pSndBuffer->setRateEstimator(rate);
  621. updateCC(TEV_SYNC, EventVariant(0));
  622. }
  623. private: // Identification
  624. CUDTSocket* const m_parent; // Temporary, until the CUDTSocket class is merged with CUDT
  625. SRTSOCKET m_SocketID; // UDT socket number
  626. SRTSOCKET m_PeerID; // Peer ID, for multiplexer
  627. // HSv4 (legacy handshake) support
  628. time_point m_tsSndHsLastTime; // Last SRT handshake request time
  629. int m_iSndHsRetryCnt; // SRT handshake retries left
  630. #if ENABLE_BONDING
  631. SRT_GROUP_TYPE m_HSGroupType; // Group type about-to-be-set in the handshake
  632. #endif
  633. private:
  634. int m_iMaxSRTPayloadSize; // Maximum/regular payload size, in bytes
  635. int m_iTsbPdDelay_ms; // Rx delay to absorb burst, in milliseconds
  636. int m_iPeerTsbPdDelay_ms; // Tx delay that the peer uses to absorb burst, in milliseconds
  637. bool m_bTLPktDrop; // Enable Too-late Packet Drop
  638. SRT_ATTR_PT_GUARDED_BY(m_ConnectionLock)
  639. UniquePtr<CCryptoControl> m_pCryptoControl; // Crypto control module
  640. CCache<CInfoBlock>* m_pCache; // Network information cache
  641. // Congestion control
  642. std::vector<EventSlot> m_Slots[TEV_E_SIZE]; // One list of event slots per array entry (TEV_E_SIZE entries); presumably indexed by ETransmissionEvent - confirm
  643. SrtCongestion m_CongCtl;
  644. // Packet filtering
  645. PacketFilter m_PacketFilter;
  646. SRT_ARQLevel m_PktFilterRexmitLevel;
  647. std::string m_sPeerPktFilterConfigString;
  648. // Attached tool function
  649. void EmitSignal(ETransmissionEvent tev, EventVariant var);
  650. // Internal state
  651. sync::atomic<bool> m_bListening; // If the UDT entity is listening to connection
  652. sync::atomic<bool> m_bConnecting; // The short phase when connect() is called but not yet completed
  653. sync::atomic<bool> m_bConnected; // Whether the connection is on or off
  654. sync::atomic<bool> m_bClosing; // If the UDT entity is closing
  655. sync::atomic<bool> m_bShutdown; // If the peer side has shutdown the connection
  656. sync::atomic<bool> m_bBroken; // If the connection has been broken
  657. sync::atomic<bool> m_bBreakAsUnstable; // A flag indicating that the socket should become broken because it has been unstable for too long.
  658. sync::atomic<bool> m_bPeerHealth; // If the peer status is normal
  659. sync::atomic<int> m_RejectReason; // NOTE(review): presumably holds an SRT_REJECT_REASON value - confirm in core.cpp
  660. bool m_bOpened; // If the UDT entity has been opened
  661. // A counter (number of GC checks happening every 1s) to let the GC tag this socket as closed.
  662. sync::atomic<int> m_iBrokenCounter; // If a broken socket still has data in the receiver buffer, it is not marked closed until the counter is 0.
  663. int m_iEXPCount; // Expiration counter
  664. sync::atomic<int> m_iBandwidth; // Estimated bandwidth, number of packets per second
  665. sync::atomic<int> m_iSRTT; // Smoothed RTT (an exponentially-weighted moving average (EWMA)
  666. // of an endpoint's RTT samples), in microseconds
  667. sync::atomic<int> m_iRTTVar; // The variation in the RTT samples (RTT variance), in microseconds
  668. sync::atomic<bool> m_bIsFirstRTTReceived; // True if the first RTT sample was obtained from the ACK/ACKACK pair
  669. // at the receiver side or received by the sender from an ACK packet.
  670. // It's used to reset the initial value of smoothed RTT (m_iSRTT)
  671. // at the beginning of transmission (including the one taken from
  672. // cache). False by default.
  673. sync::atomic<int> m_iDeliveryRate; // Packet arrival rate at the receiver side
  674. sync::atomic<int> m_iByteDeliveryRate; // Byte arrival rate at the receiver side
  675. CHandShake m_ConnReq; // Connection request
  676. CHandShake m_ConnRes; // Connection response
  677. CHandShake::RendezvousState m_RdvState; // HSv5 rendezvous state
  678. HandshakeSide m_SrtHsSide; // HSv5 rendezvous handshake side resolved from cookie contest (DRAW if not yet resolved)
  679. private: // Sending related data
  680. CSndBuffer* m_pSndBuffer; // Sender buffer
  681. CSndLossList* m_pSndLossList; // Sender loss list
  682. CPktTimeWindow<16, 16> m_SndTimeWindow; // Packet sending time window
  683. #ifdef ENABLE_MAXREXMITBW
  684. CSndRateEstimator m_SndRexmitRate; // Retransmission rate estimation.
  685. #endif
  // NOTE(review): the "CPU clock cycles" unit below may be stale, as the field is a duration type - confirm.
  686. atomic_duration m_tdSendInterval; // Inter-packet time, in CPU clock cycles
  687. atomic_duration m_tdSendTimeDiff; // Aggregate difference in inter-packet sending time
  688. SRT_ATTR_GUARDED_BY(m_RecvAckLock)
  689. sync::atomic<int> m_iFlowWindowSize; // Flow control window size
  690. double m_dCongestionWindow; // Congestion window size
  691. private: // Timers
  692. atomic_time_point m_tsNextACKTime; // Next ACK time, in CPU clock cycles, same below
  693. atomic_time_point m_tsNextNAKTime; // Next NAK time
  694. duration m_tdACKInterval; // ACK interval
  695. duration m_tdNAKInterval; // NAK interval
  696. SRT_ATTR_GUARDED_BY(m_RecvAckLock)
  697. atomic_time_point m_tsLastRspTime; // Timestamp of last response from the peer
  698. time_point m_tsLastRspAckTime; // (SND) Timestamp of last ACK from the peer
  699. atomic_time_point m_tsLastSndTime; // Timestamp of last data/ctrl sent (in system ticks)
  700. time_point m_tsLastWarningTime; // Last time that a warning message was sent
  701. atomic_time_point m_tsLastReqTime; // Last time when a connection request was sent
  702. time_point m_tsRcvPeerStartTime;
  703. time_point m_tsLingerExpiration; // Linger expiration time (for GC to close a socket with data in sending buffer)
  704. time_point m_tsLastAckTime; // (RCV) Timestamp of last ACK
  705. duration m_tdMinNakInterval; // NAK timeout lower bound; too small a value can cause unnecessary retransmission
  706. duration m_tdMinExpInterval; // Timeout lower bound threshold: too small a timeout can cause problems
  707. int m_iPktCount; // Packet counter for ACK
  708. int m_iLightACKCount; // Light ACK counter
  709. time_point m_tsNextSendTime; // Scheduled time of next packet sending
  710. sync::atomic<int32_t> m_iSndLastFullAck; // Last full ACK received
  711. SRT_ATTR_GUARDED_BY(m_RecvAckLock)
  712. sync::atomic<int32_t> m_iSndLastAck; // Last ACK received
  713. // NOTE: m_iSndLastDataAck is the value strictly bound to the CSndBuffer object (m_pSndBuffer)
  714. // and this is the sequence number that refers to the block at position [0]. Upon acknowledgement,
  715. // this value is shifted to the acknowledged position, and the blocks are removed from the
  716. // m_pSndBuffer buffer up to, but excluding, this sequence number.
  717. // XXX CONSIDER removing this field and giving up the maintenance of this sequence number
  718. // to the sending buffer. This way, extraction of an old packet for retransmission should
  719. // require only the lost sequence number, and how to find the packet with this sequence
  720. // will be up to the sending buffer.
  721. sync::atomic<int32_t> m_iSndLastDataAck; // The real last ACK that updates the sender buffer and loss list
  722. sync::atomic<int32_t> m_iSndCurrSeqNo; // The largest sequence number that HAS BEEN SENT
  723. sync::atomic<int32_t> m_iSndNextSeqNo; // The sequence number predicted to be placed at the currently scheduled packet
  724. // Note important differences between Curr and Next fields:
  725. // - m_iSndCurrSeqNo: this is used by SRT:SndQ:worker thread and it's operated from CUDT::packData
  726. // function only. This value represents the sequence number that has been stamped on a packet directly
  727. // before it is sent over the network.
  728. // - m_iSndNextSeqNo: this is used by the user's thread and it's operated from CUDT::sendmsg2
  729. // function only. This value represents the sequence number that is PREDICTED to be stamped on the
  730. // first block out of the block series that will be scheduled for later sending over the network
  731. // out of the data passed in this function. For a special case when the length of the data is
  732. // short enough to be passed in one UDP packet (always the case for live mode), this value is
  733. // always increased by one in this call, otherwise it will be increased by the number of blocks
  734. // scheduled for sending.
  735. int32_t m_iSndLastAck2; // Last ACK2 sent back
  736. time_point m_SndLastAck2Time; // The time when last ACK2 was sent back
  737. void setInitialSndSeq(int32_t isn)
  738. {
  739. m_iSndLastAck = isn;
  740. m_iSndLastDataAck = isn;
  741. m_iSndLastFullAck = isn;
  742. m_iSndCurrSeqNo = CSeqNo::decseq(isn);
  743. m_iSndNextSeqNo = isn;
  744. m_iSndLastAck2 = isn;
  745. }
  /// Receiver-side counterpart of setInitialSndSeq(); defined outside this header.
  /// @param isn [in] initial sequence number.
  746. void setInitialRcvSeq(int32_t isn);
  747. int32_t m_iISN; // Initial Sequence Number
  748. bool m_bPeerTsbPd; // Peer accepts TimeStamp-Based Rx mode
  749. bool m_bPeerTLPktDrop; // Enable sender late packet dropping
  750. bool m_bPeerNakReport; // Sender's peer (receiver) issues Periodic NAK Reports
  751. bool m_bPeerRexmitFlag; // Receiver supports rexmit flag in payload packets
  752. SRT_ATTR_GUARDED_BY(m_RecvAckLock)
  753. int32_t m_iReXmitCount; // Re-Transmit Count since last ACK
  754. time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown.
  755. // The "slow down" group of logs are those that can be printed too often otherwise, but can't be turned off (warnings and errors).
  756. // Currently only used by the decryption failure message, therefore no mutex protection is needed.
  757. /// @brief Check if a frequent log can be shown.
  758. /// @param tnow current time
  759. /// @return true if it is ok to print a frequent log message.
  760. bool frequentLogAllowed(const time_point& tnow) const;
  761. private: // Receiving related data
  762. CRcvBuffer* m_pRcvBuffer; //< Receiver buffer
  763. SRT_ATTR_GUARDED_BY(m_RcvLossLock)
  764. CRcvLossList* m_pRcvLossList; //< Receiver loss list
  765. SRT_ATTR_GUARDED_BY(m_RcvLossLock)
  766. std::deque<CRcvFreshLoss> m_FreshLoss; //< Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for.
  767. int m_iReorderTolerance; //< Current value of dynamic reorder tolerance
  768. int m_iConsecEarlyDelivery; //< Increases with every OOO packet that came <TTL-2 time, resets with every increased reorder tolerance
  769. int m_iConsecOrderedDelivery; //< Increases with every packet coming in order or retransmitted, resets with every out-of-order packet
  770. CACKWindow<ACK_WND_SIZE> m_ACKWindow; // ACK history window
  771. CPktTimeWindow<16, 64> m_RcvTimeWindow; // Packet arrival time window
  772. int32_t m_iRcvLastAck; // First unacknowledged packet seqno sent in the latest ACK.
  773. #ifdef ENABLE_LOGGING
  774. int32_t m_iDebugPrevLastAck;
  775. #endif
  776. int32_t m_iRcvLastAckAck; // (RCV) Latest packet seqno in a sent ACK acknowledged by ACKACK. RcvQTh (sendCtrlAck {r}, processCtrlAckAck {r}, processCtrlAck {r}, connection {w}).
  777. int32_t m_iAckSeqNo; // Last ACK sequence number
  778. sync::atomic<int32_t> m_iRcvCurrSeqNo; // (RCV) Largest received sequence number. RcvQTh, TSBPDTh.
  779. int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter)
  780. bool m_bBufferWasFull; // Indicates that the RX buffer was full the last time an ACK was sent
  781. int32_t m_iPeerISN; // Initial Sequence Number of the peer side
  782. uint32_t m_uPeerSrtVersion;
  783. uint32_t m_uPeerSrtFlags;
  784. bool m_bTsbPd; // Peer sends TimeStamp-Based Packet Delivery Packets
  785. bool m_bGroupTsbPd; // TSBPD should be used for GROUP RECEIVER instead
  786. sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
  787. sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock
  788. bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent
  789. sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining
  790. CallbackHolder<srt_listen_callback_fn> m_cbAcceptHook;
  791. CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
  792. // FORWARDER
  // Static entry points taking a socket ID. NOTE(review): presumably they look up the
  // socket and forward to the private per-instance overloads - confirm in core.cpp.
  793. public:
  794. static int installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq);
  795. static int installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq);
  796. private:
  797. void installAcceptHook(srt_listen_callback_fn* hook, void* opaq)
  798. {
  799. m_cbAcceptHook.set(opaq, hook);
  800. }
  801. void installConnectHook(srt_connect_callback_fn* hook, void* opaq)
  802. {
  803. m_cbConnectHook.set(opaq, hook);
  804. }
  805. private: // synchronization: mutexes and conditions
  806. sync::Mutex m_ConnectionLock; // used to synchronize connection operation
  807. sync::Condition m_SendBlockCond; // used to block "send" call
  808. sync::Mutex m_SendBlockLock; // lock associated with m_SendBlockCond
  809. mutable sync::Mutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer
  810. // Protects access to m_iSndCurrSeqNo, m_iSndLastAck
  811. sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT)
  812. sync::Condition m_RecvDataCond; // used to block "srt_recv*" when there is no data. Use together with m_RecvLock
  813. sync::Mutex m_RecvLock; // used to synchronize "srt_recv*" call, protects TSBPD drift updates (CRcvBuffer::isRcvDataReady())
  814. sync::Mutex m_SendLock; // used to synchronize "send" call
  815. sync::Mutex m_RcvLossLock; // Protects the receiver loss list (access: CRcvQueue::worker, CUDT::tsbpd)
  816. mutable sync::Mutex m_StatsLock; // used to synchronize access to trace statistics
  // NOTE(review): presumably set up, tear down and release the synchronization
  // objects declared above - confirm in core.cpp.
  817. void initSynch();
  818. void destroySynch();
  819. void releaseSynch();
  820. private: // Common connection Congestion Control setup
  821. // This can fail only when it failed to create a congctl,
  822. // which may only happen when the congctl list is extended
  823. // with user-supplied congctl modules, not a case so far.
  824. SRT_ATR_NODISCARD
  825. SRT_REJECT_REASON setupCC();
  826. // For updateCC it's ok to discard the value. This returns false only if
  827. // the congctl isn't created, and this can be prevented.
  828. bool updateCC(ETransmissionEvent, const EventVariant arg);
  829. // Failure to create the crypter means that an encrypted
  830. // connection should be rejected if ENFORCEDENCRYPTION is on.
  831. SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
  832. bool createCrypter(HandshakeSide side, bool bidi);
  833. private: // Generation and processing of packets
  834. void sendCtrl(UDTMessageType pkttype, const int32_t* lparam = NULL, void* rparam = NULL, int size = 0);
  835. /// Forms and sends ACK packet
  836. /// @note Assumes @a ctrlpkt already has a timestamp.
  837. ///
  838. /// @param ctrlpkt A control packet structure to fill. It must have a timestamp already set.
  839. /// @param size Sends lite ACK if size is SEND_LITE_ACK, Full ACK otherwise
  840. ///
  841. /// @returns the number of packets sent.
  842. int sendCtrlAck(CPacket& ctrlpkt, int size);
  843. void sendLossReport(const std::vector< std::pair<int32_t, int32_t> >& losslist);
  844. void processCtrl(const CPacket& ctrlpkt);
  845. /// @brief Process incoming control ACK packet.
  846. /// @param ctrlpkt incoming ACK packet
  847. /// @param currtime current clock time
  848. void processCtrlAck(const CPacket& ctrlpkt, const time_point& currtime);
  849. /// @brief Process incoming control ACKACK packet.
  850. /// @param ctrlpkt incoming ACKACK packet
  851. /// @param tsArrival time when packet has arrived (used to calculate RTT)
  852. void processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival);
  853. /// @brief Process incoming loss report (NAK) packet.
  854. /// @param ctrlpkt incoming NAK packet
  855. void processCtrlLossReport(const CPacket& ctrlpkt);
  856. /// @brief Process incoming handshake control packet
  857. /// @param ctrlpkt incoming HS packet
  858. void processCtrlHS(const CPacket& ctrlpkt);
  859. /// @brief Process incoming drop request control packet
  860. /// @param ctrlpkt incoming drop request packet
  861. void processCtrlDropReq(const CPacket& ctrlpkt);
  862. /// @brief Process incoming shutdown control packet
  863. void processCtrlShutdown();
  864. /// @brief Process incoming user defined control packet
  865. /// @param ctrlpkt incoming user defined packet
  866. void processCtrlUserDefined(const CPacket& ctrlpkt);
  867. /// @brief Update sender's loss list on an incoming acknowledgement.
  868. /// @param ackdata_seqno sequence number of a data packet being acknowledged
  869. void updateSndLossListOnACK(int32_t ackdata_seqno);
  870. /// Pack a packet from a list of lost packets.
  871. /// @param packet [in, out] a packet structure to fill
  872. /// @return payload size on success, <=0 on failure
  873. int packLostData(CPacket &packet);
  874. /// Pack a unique data packet (never sent so far) in CPacket for sending.
  875. /// @param packet [in, out] a CPacket structure to fill.
  876. ///
  877. /// @return true if a packet has been packed; false otherwise.
  878. bool packUniqueData(CPacket& packet);
  879. /// Pack in CPacket the next data to be sent.
  880. ///
  881. /// @param packet [out] a CPacket structure to fill
  882. /// @param nexttime [out] Time when this socket should be next time picked up for processing.
  883. /// @param src_addr [out] Source address to pass to channel's sendto
  884. ///
  885. /// @retval true A packet was extracted for sending, the socket should be rechecked at @a nexttime
  886. /// @retval false Nothing was extracted for sending, @a nexttime should be ignored
  887. bool packData(CPacket& packet, time_point& nexttime, sockaddr_any& src_addr);
  888. int processData(CUnit* unit);
  889. /// This function passes the incoming packets to the initial processing
  890. /// (like packet filter) and is about to store them effectively to the
  891. /// receiver buffer and do some postprocessing (decryption) if necessary
  892. /// and report the status thereof.
  893. ///
  894. /// @param incoming [in] The packets coming from the network medium
  895. /// @param w_new_inserted [out] Set false, if the packet already exists, otherwise true (packet added)
  896. /// @param w_was_sent_in_order [out] Set false, if the packet was belated, but had no R flag set.
  897. /// @param w_srt_loss_seqs [out] Gets inserted a loss, if this function has detected it.
  898. ///
  899. /// @return 0 The call was successful (regardless if the packet was accepted or not).
  900. /// @return -1 The call has failed: no space left in the buffer.
  901. /// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irreparable discrepancy).
  902. int handleSocketPacketReception(const std::vector<CUnit*>& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs);
  903. /// Get the packet's TSBPD time.
  904. /// The @a grp passed by void* is not used yet
  905. /// and shall not be used when ENABLE_BONDING=0.
  906. time_point getPktTsbPdTime(void* grp, const CPacket& packet);
  907. /// Checks and spawns the TSBPD thread if required.
  /// @return NOTE(review): the meaning of the int result is not visible in this header - confirm in core.cpp.
  908. int checkLazySpawnTsbPdThread();
  909. void processClose();
  910. /// Process the request after receiving the handshake from caller.
  911. /// The @a packet param is passed here as non-const because this function
  912. /// will need to make a temporary back-and-forth endian swap; it doesn't intend to
  913. /// modify the object permanently.
  914. /// @param addr source address from where the request came
  915. /// @param packet contents of the packet
  916. /// @return URQ code, possibly containing reject reason
  917. int processConnectRequest(const sockaddr_any& addr, CPacket& packet);
  918. static void addLossRecord(std::vector<int32_t>& lossrecord, int32_t lo, int32_t hi);
  919. int32_t bake(const sockaddr_any& addr, int32_t previous_cookie = 0, int correction = 0);
  920. #if ENABLE_BONDING
  921. /// @brief Drop packets in the recv buffer behind group_recv_base.
  922. /// Updates m_iRcvLastSkipAck if it's behind group_recv_base.
  923. void dropToGroupRecvBase();
  924. #endif
  925. void processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival);
  926. /// Retrieves the available size of the receiver buffer.
  927. /// Expects that m_RcvBufferLock is locked.
  928. SRT_ATTR_REQUIRES(m_RcvBufferLock)
  929. size_t getAvailRcvBufferSizeNoLock() const;
  930. private: // Trace
  // Statistics and trace data of this entity, held in the single member m_stats.
  931. struct CoreStats
  932. {
  933. time_point tsStartTime; // timestamp when the UDT entity is started
  934. stats::Sender sndr; // sender statistics
  935. stats::Receiver rcvr; // receiver statistics
  936. int64_t m_sndDurationTotal; // total real time for sending
  937. time_point tsLastSampleTime; // last performance sample time
  938. int traceReorderDistance; // NOTE(review): undocumented; unit and update rule are not visible here
  939. double traceBelatedTime; // NOTE(review): undocumented; unit and update rule are not visible here
  940. int64_t sndDuration; // real time for sending
  941. time_point sndDurationCounter; // timers to record the sending Duration
  942. } m_stats;
  943. public:
  944. static const int SELF_CLOCK_INTERVAL = 64; // ACK interval for self-clocking
  945. static const int SEND_LITE_ACK = sizeof(int32_t); // special size for ack containing only ack seq
  946. static const int PACKETPAIR_MASK = 0xF; // low 4 bits; NOTE(review): presumably applied to sequence numbers for packet-pair probing - confirm
  947. private: // Timers functions
  948. #if ENABLE_BONDING
  949. time_point m_tsFreshActivation; // GROUPS: time of fresh activation of the link, or 0 if past the activation phase or idle
  950. time_point m_tsUnstableSince; // GROUPS: time since unexpected ACK delay experienced, or 0 if link seems healthy
  951. time_point m_tsWarySince; // GROUPS: time since an unstable link has first some response
  952. #endif
  // Bit flags, one bit per reason. NOTE(review): presumably combined into the
  // check_reason argument of checkExpTimer() - confirm in core.cpp.
  953. static const int BECAUSE_NO_REASON = 0, // NO BITS
  954. BECAUSE_ACK = 1 << 0,
  955. BECAUSE_LITEACK = 1 << 1,
  956. BECAUSE_NAKREPORT = 1 << 2,
  957. LAST_BECAUSE_BIT = 3; // number of reason bits defined above (bits 0..2)
  958. void checkTimers();
  959. void considerLegacySrtHandshake(const time_point &timebase);
  960. int checkACKTimer (const time_point& currtime);
  961. int checkNAKTimer(const time_point& currtime);
  962. bool checkExpTimer (const time_point& currtime, int check_reason); // returns true if the connection is expired
  963. void checkRexmitTimer(const time_point& currtime);
  964. private: // for UDP multiplexer
  965. CSndQueue* m_pSndQueue; // packet sending queue
  966. CRcvQueue* m_pRcvQueue; // packet receiving queue
  967. sockaddr_any m_PeerAddr; // peer address
  968. sockaddr_any m_SourceAddr; // override UDP source address with this one when sending
  969. uint32_t m_piSelfIP[4]; // local UDP IP address (array of four 32-bit words)
  970. CSNode* m_pSNode; // node information for UDT list used in snd queue
  971. CRNode* m_pRNode; // node information for UDT list used in rcv queue
  972. public: // For SrtCongestion
  973. const CSndQueue* sndQueue() { return m_pSndQueue; }
  974. const CRcvQueue* rcvQueue() { return m_pRcvQueue; }
  975. private: // for epoll
  976. std::set<int> m_sPollID; // set of epoll IDs to trigger
  // @param eid epoll ID. NOTE(review): presumably these keep m_sPollID in sync
  // with the epoll subscriptions - confirm in core.cpp.
  977. void addEPoll(const int eid);
  978. void removeEPollEvents(const int eid);
  979. void removeEPollID(const int eid);
  980. };
  981. } // namespace srt
  982. #endif