group.h 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2020 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. /*****************************************************************************
  11. Written by
  12. Haivision Systems Inc.
  13. *****************************************************************************/
  14. #ifndef INC_SRT_GROUP_H
  15. #define INC_SRT_GROUP_H
  16. #include "srt.h"
  17. #include "common.h"
  18. #include "packet.h"
  19. #include "group_common.h"
  20. #include "group_backup.h"
  21. namespace srt
  22. {
  23. #if ENABLE_HEAVY_LOGGING
  24. const char* const srt_log_grp_state[] = {"PENDING", "IDLE", "RUNNING", "BROKEN"};
  25. #endif
  26. class CUDTGroup
  27. {
  28. friend class CUDTUnited;
  29. typedef sync::steady_clock::time_point time_point;
  30. typedef sync::steady_clock::duration duration;
  31. typedef sync::steady_clock steady_clock;
  32. typedef groups::SocketData SocketData;
  33. typedef groups::SendBackupCtx SendBackupCtx;
  34. typedef groups::BackupMemberState BackupMemberState;
  35. public:
  36. typedef SRT_MEMBERSTATUS GroupState;
  37. // Note that the use of states may differ in particular group types:
  38. //
  39. // Broadcast: links that are freshly connected become PENDING and then IDLE only
  40. // for a short moment to be activated immediately at the nearest sending operation.
  41. //
  42. // Balancing: like with broadcast, just that the link activation gets its shared percentage
  43. // of traffic balancing
  44. //
  45. // Multicast: The link is never idle. The data are always sent over the UDP multicast link
  46. // and the receiver simply gets subscribed and reads packets once it's ready.
  47. //
  48. // Backup: The link stays idle until it's activated, and the activation can only happen
  49. // at the moment when the currently active link is "suspected of being likely broken"
  50. // (the current active link fails to receive ACK in a time when two ACKs should already
  51. // be received). After a while when the current active link is confirmed broken, it turns
  52. // into broken state.
  53. static const char* StateStr(GroupState);
  /// Counter backing genToken(); holds the most recently issued token.
  static int32_t s_tokenGen;

  /// Issues the next token.
  ///
  /// Tokens are non-negative: after the largest positive int32_t value has
  /// been issued (or if the counter somehow holds a negative value) the
  /// sequence restarts from 0.
  ///
  /// NOTE(review): the counter is a plain static with no synchronization
  /// here - confirm that callers serialize access.
  ///
  /// @return the newly issued token, always >= 0
  static int32_t genToken()
  {
      // Decide about the wrap BEFORE incrementing: incrementing a signed
      // integer past its maximum is undefined behavior, so the overflow
      // must never actually happen.
      const int32_t max_token = 0x7FFFFFFF;
      if (s_tokenGen < 0 || s_tokenGen >= max_token)
          s_tokenGen = 0;
      else
          ++s_tokenGen;
      return s_tokenGen;
  }
  56. struct ConfigItem
  57. {
  58. SRT_SOCKOPT so;
  59. std::vector<unsigned char> value;
  60. template <class T>
  61. bool get(T& refr)
  62. {
  63. if (sizeof(T) > value.size())
  64. return false;
  65. refr = *(T*)&value[0];
  66. return true;
  67. }
  68. ConfigItem(SRT_SOCKOPT o, const void* val, int size)
  69. : so(o)
  70. {
  71. value.resize(size);
  72. unsigned char* begin = (unsigned char*)val;
  73. std::copy(begin, begin + size, value.begin());
  74. }
  75. struct OfType
  76. {
  77. SRT_SOCKOPT so;
  78. OfType(SRT_SOCKOPT soso)
  79. : so(soso)
  80. {
  81. }
  82. bool operator()(ConfigItem& ci) { return ci.so == so; }
  83. };
  84. };
  85. typedef std::list<SocketData> group_t;
  86. typedef group_t::iterator gli_t;
  87. typedef std::vector< std::pair<SRTSOCKET, srt::CUDTSocket*> > sendable_t;
  88. struct Sendstate
  89. {
  90. SRTSOCKET id;
  91. SocketData* mb;
  92. int stat;
  93. int code;
  94. };
  95. CUDTGroup(SRT_GROUP_TYPE);
  96. ~CUDTGroup();
  97. SocketData* add(SocketData data);
  98. struct HaveID
  99. {
  100. SRTSOCKET id;
  101. HaveID(SRTSOCKET sid)
  102. : id(sid)
  103. {
  104. }
  105. bool operator()(const SocketData& s) { return s.id == id; }
  106. };
  107. bool contains(SRTSOCKET id, SocketData*& w_f)
  108. {
  109. srt::sync::ScopedLock g(m_GroupLock);
  110. gli_t f = std::find_if(m_Group.begin(), m_Group.end(), HaveID(id));
  111. if (f == m_Group.end())
  112. {
  113. w_f = NULL;
  114. return false;
  115. }
  116. w_f = &*f;
  117. return true;
  118. }
  119. // NEED LOCKING
  120. gli_t begin() { return m_Group.begin(); }
  121. gli_t end() { return m_Group.end(); }
  122. /// Remove the socket from the group container.
  123. /// REMEMBER: the group spec should be taken from the socket
  124. /// (set m_GroupOf and m_GroupMemberData to NULL
  125. /// PRIOR TO calling this function.
  126. /// @param id Socket ID to look for in the container to remove
  127. /// @return true if the container still contains any sockets after the operation
  128. bool remove(SRTSOCKET id)
  129. {
  130. using srt_logging::gmlog;
  131. srt::sync::ScopedLock g(m_GroupLock);
  132. bool empty = false;
  133. LOGC(gmlog.Note, log << "group/remove: removing member @" << id << " from group $" << m_GroupID);
  134. gli_t f = std::find_if(m_Group.begin(), m_Group.end(), HaveID(id));
  135. if (f != m_Group.end())
  136. {
  137. m_Group.erase(f);
  138. // Reset sequence numbers on a dead group so that they are
  139. // initialized anew with the new alive connection within
  140. // the group.
  141. // XXX The problem is that this should be done after the
  142. // socket is considered DISCONNECTED, not when it's being
  143. // closed. After being disconnected, the sequence numbers
  144. // are no longer valid, and will be reinitialized when the
  145. // socket is connected again. This may stay as is for now
  146. // as in SRT it's not predicted to do anything with the socket
  147. // that was disconnected other than immediately closing it.
  148. if (m_Group.empty())
  149. {
  150. // When the group is empty, there's no danger that this
  151. // number will collide with any ISN provided by a socket.
  152. // Also since now every socket will derive this ISN.
  153. m_iLastSchedSeqNo = generateISN();
  154. resetInitialRxSequence();
  155. empty = true;
  156. }
  157. }
  158. else
  159. {
  160. HLOGC(gmlog.Debug, log << "group/remove: IPE: id @" << id << " NOT FOUND");
  161. empty = true; // not exactly true, but this is to cause error on group in the APP
  162. }
  163. if (m_Group.empty())
  164. {
  165. m_bOpened = false;
  166. m_bConnected = false;
  167. }
  168. return !empty;
  169. }
  170. bool groupEmpty()
  171. {
  172. srt::sync::ScopedLock g(m_GroupLock);
  173. return m_Group.empty();
  174. }
  175. void setGroupConnected();
  176. int send(const char* buf, int len, SRT_MSGCTRL& w_mc);
  177. int sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc);
  178. int sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc);
  179. static int32_t generateISN();
  180. private:
  181. // For Backup, sending all previous packet
  182. int sendBackupRexmit(srt::CUDT& core, SRT_MSGCTRL& w_mc);
  183. // Support functions for sendBackup and sendBroadcast
  184. /// Check if group member is idle.
  185. /// @param d group member
  186. /// @param[in,out] w_wipeme array of sockets to remove from group
  187. /// @param[in,out] w_pendingLinks array of sockets pending for connection
  188. /// @returns true if d is idle (standby), false otherwise
  189. bool send_CheckIdle(const gli_t d, std::vector<SRTSOCKET>& w_wipeme, std::vector<SRTSOCKET>& w_pendingLinks);
  190. /// This function checks if the member has just become idle (i.e. its sender buffer is empty) in order to send a KEEPALIVE immediately.
  191. /// @todo Check whether this is abandoned logic.
  192. void sendBackup_CheckIdleTime(gli_t w_d);
  193. /// Qualify states of member links.
  194. /// [[using locked(this->m_GroupLock, m_pGlobal->m_GlobControlLock)]]
  195. /// @param[out] w_sendBackupCtx the context will be updated with state qualifications
  196. /// @param[in] currtime current timestamp
  197. void sendBackup_QualifyMemberStates(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
  198. void sendBackup_AssignBackupState(srt::CUDT& socket, BackupMemberState state, const steady_clock::time_point& currtime);
  199. /// Qualify the state of the active link: fresh, stable, unstable, wary.
  200. /// @retval active backup member state: fresh, stable, unstable, wary.
  201. BackupMemberState sendBackup_QualifyActiveState(const gli_t d, const time_point currtime);
  202. BackupMemberState sendBackup_QualifyIfStandBy(const gli_t d);
  203. /// Sends the same payload over all active members.
  204. /// @param[in] buf payload
  205. /// @param[in] len payload length in bytes
  206. /// @param[in,out] w_mc message control
  207. /// @param[in] currtime current time
  208. /// @param[in] currseq current packet sequence number
  209. /// @param[out] w_nsuccessful number of members with successful sending.
  210. /// @param[in,out] maxActiveWeight
  211. /// @param[in,out] sendBackupCtx context
  212. /// @param[in,out] w_cx error
  213. /// @return group send result: -1 if sending over all members has failed; number of bytes sent otherwise.
  214. int sendBackup_SendOverActive(const char* buf, int len, SRT_MSGCTRL& w_mc, const steady_clock::time_point& currtime, int32_t& w_curseq,
  215. size_t& w_nsuccessful, uint16_t& w_maxActiveWeight, SendBackupCtx& w_sendBackupCtx, CUDTException& w_cx);
  216. /// Check link sending status
  217. /// @param[in] currtime Current time (logging only)
  218. /// @param[in] send_status Result of sending over the socket
  219. /// @param[in] lastseq Last sent sequence number before the current sending operation
  220. /// @param[in] pktseq Packet sequence number currently tried to be sent
  221. /// @param[out] w_u CUDT unit of the current member (to allow calling overrideSndSeqNo)
  222. /// @param[out] w_curseq Group's current sequence number (either -1 or the value used already for other links)
  223. /// @param[out] w_final_stat w_final_stat = send_status if sending succeeded.
  224. ///
  225. /// @returns true if the sending operation result (submitted in stat) is a success, false otherwise.
  226. bool sendBackup_CheckSendStatus(const time_point& currtime,
  227. const int send_status,
  228. const int32_t lastseq,
  229. const int32_t pktseq,
  230. CUDT& w_u,
  231. int32_t& w_curseq,
  232. int& w_final_stat);
  233. void sendBackup_Buffering(const char* buf, const int len, int32_t& curseq, SRT_MSGCTRL& w_mc);
  234. size_t sendBackup_TryActivateStandbyIfNeeded(
  235. const char* buf,
  236. const int len,
  237. bool& w_none_succeeded,
  238. SRT_MSGCTRL& w_mc,
  239. int32_t& w_curseq,
  240. int32_t& w_final_stat,
  241. SendBackupCtx& w_sendBackupCtx,
  242. CUDTException& w_cx,
  243. const steady_clock::time_point& currtime);
  244. /// Check if pending sockets are to be qualified as broken.
  245. /// This qualification later results in removing the socket from a group and closing it.
  246. /// @param[in,out] w_sendBackupCtx a context with a list of member sockets; some pending ones might be qualified as broken
  247. void sendBackup_CheckPendingSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
  248. /// Check if unstable sockets are to be qualified as broken.
  249. /// The main reason for such qualification is if a socket is unstable for too long.
  250. /// This qualification later results in removing the socket from a group and closing it.
  251. /// @param[in,out] w_sendBackupCtx a context with a list of member sockets; some unstable ones might be qualified as broken
  252. void sendBackup_CheckUnstableSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
  253. /// @brief Marks broken sockets as closed. Used in broadcast sending.
  254. /// @param w_wipeme a list of sockets to close
  255. void send_CloseBrokenSockets(std::vector<SRTSOCKET>& w_wipeme);
  256. /// @brief Marks broken sockets as closed. Used in backup sending.
  257. /// @param w_sendBackupCtx the context with a list of broken sockets
  258. void sendBackup_CloseBrokenSockets(SendBackupCtx& w_sendBackupCtx);
  259. void sendBackup_RetryWaitBlocked(SendBackupCtx& w_sendBackupCtx,
  260. int& w_final_stat,
  261. bool& w_none_succeeded,
  262. SRT_MSGCTRL& w_mc,
  263. CUDTException& w_cx);
  264. void sendBackup_SilenceRedundantLinks(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime);
  265. void send_CheckValidSockets();
  266. public:
  267. int recv(char* buf, int len, SRT_MSGCTRL& w_mc);
  268. void close();
  269. void setOpt(SRT_SOCKOPT optname, const void* optval, int optlen);
  270. void getOpt(SRT_SOCKOPT optName, void* optval, int& w_optlen);
  271. void deriveSettings(srt::CUDT* source);
  272. bool applyFlags(uint32_t flags, HandshakeSide);
  273. SRT_SOCKSTATUS getStatus();
  274. void debugMasterData(SRTSOCKET slave);
  275. bool isGroupReceiver()
  276. {
  277. // XXX add here also other group types, which
  278. // predict group receiving.
  279. return m_type == SRT_GTYPE_BROADCAST;
  280. }
  281. sync::Mutex* exp_groupLock() { return &m_GroupLock; }
  282. void addEPoll(int eid);
  283. void removeEPollEvents(const int eid);
  284. void removeEPollID(const int eid);
  285. /// @brief Update read-ready state.
  286. /// @param sock member socket ID (unused)
  287. /// @param sequence the latest packet sequence number available for reading.
  288. void updateReadState(SRTSOCKET sock, int32_t sequence);
  289. void updateWriteState();
  290. void updateFailedLink();
  291. void activateUpdateEvent(bool still_have_items);
  292. int32_t getRcvBaseSeqNo();
  293. /// Update the in-group array of packet providers per sequence number.
  294. /// Also basing on the information already provided by possibly other sockets,
  295. /// report the real status of packet loss, including packets maybe lost
  296. /// by the caller provider, but already received from elsewhere. Note that
  297. /// these packets are not ready for extraction until ACK-ed.
  298. ///
  299. /// @param exp_sequence The previously received sequence at this socket
  300. /// @param sequence The sequence of this packet
  301. /// @param provider The core of the socket for which the packet was dispatched
  302. /// @param time TSBPD time of this packet
  303. /// @return The bitmap that marks by 'false' packets lost since next to exp_sequence
  304. std::vector<bool> providePacket(int32_t exp_sequence, int32_t sequence, srt::CUDT* provider, uint64_t time);
  305. /// This is called from the ACK action by particular socket, which
  306. /// actually signs off the packet for extraction.
  307. ///
  308. /// @param core The socket core for which the ACK was sent
  309. /// @param ack The past-the-last-received ACK sequence number
  310. void readyPackets(srt::CUDT* core, int32_t ack);
  311. void syncWithSocket(const srt::CUDT& core, const HandshakeSide side);
  312. int getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize);
  313. int getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize);
  314. /// Predicted to be called from the reading function to fill
  315. /// the group data array as requested.
  316. void fillGroupData(SRT_MSGCTRL& w_out, //< MSGCTRL to be written
  317. const SRT_MSGCTRL& in //< MSGCTRL read from the data-providing socket
  318. );
  319. void copyGroupData(const CUDTGroup::SocketData& source, SRT_SOCKGROUPDATA& w_target);
  320. #if ENABLE_HEAVY_LOGGING
  321. void debugGroup();
  322. #else
  323. void debugGroup() {}
  324. #endif
  325. void ackMessage(int32_t msgno);
  326. void processKeepalive(SocketData*);
  327. void internalKeepalive(SocketData*);
  328. private:
  329. // Check if there's at least one connected socket.
  330. // If so, grab the status of all member sockets.
  331. void getGroupCount(size_t& w_size, bool& w_still_alive);
  332. srt::CUDTUnited& m_Global;
  333. srt::sync::Mutex m_GroupLock;
  334. SRTSOCKET m_GroupID;
  335. SRTSOCKET m_PeerGroupID;
  336. struct GroupContainer
  337. {
  338. private:
  339. std::list<SocketData> m_List;
  340. sync::atomic<size_t> m_SizeCache;
  341. /// This field is used only by some types of groups that need
  342. /// to keep track as to which link was lately used. Note that
  343. /// by removal of a node from the m_List container, this link
  344. /// must be appropriately reset.
  345. gli_t m_LastActiveLink;
  346. public:
  347. GroupContainer()
  348. : m_SizeCache(0)
  349. , m_LastActiveLink(m_List.end())
  350. {
  351. }
  352. // Property<gli_t> active = { m_LastActiveLink; }
  353. SRTU_PROPERTY_RW(gli_t, active, m_LastActiveLink);
  354. gli_t begin() { return m_List.begin(); }
  355. gli_t end() { return m_List.end(); }
  356. bool empty() { return m_List.empty(); }
  357. void push_back(const SocketData& data) { m_List.push_back(data); ++m_SizeCache; }
  358. void clear()
  359. {
  360. m_LastActiveLink = end();
  361. m_List.clear();
  362. m_SizeCache = 0;
  363. }
  364. size_t size() { return m_SizeCache; }
  365. void erase(gli_t it);
  366. };
  367. GroupContainer m_Group;
  368. SRT_GROUP_TYPE m_type;
  369. CUDTSocket* m_listener; // A "group" can only have one listener.
  370. srt::sync::atomic<int> m_iBusy;
  371. CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
  372. void installConnectHook(srt_connect_callback_fn* hook, void* opaq)
  373. {
  374. m_cbConnectHook.set(opaq, hook);
  375. }
  376. public:
  377. void apiAcquire() { ++m_iBusy; }
  378. void apiRelease() { --m_iBusy; }
  379. // A normal cycle of the send/recv functions is the following:
  380. // - [Initial API call for a group]
  381. // - GroupKeeper - ctor
  382. // - LOCK: GlobControlLock
  383. // - Find the group ID in the group container (break if not found)
  384. // - LOCK: GroupLock of that group
  385. // - Set BUSY flag
  386. // - UNLOCK GroupLock
  387. // - UNLOCK GlobControlLock
  388. // - [Call the sending function (sendBroadcast/sendBackup)]
  389. // - LOCK GroupLock
  390. // - Preparation activities
  391. // - Loop over group members
  392. // - Send over a single socket
  393. // - Check send status and conditions
  394. // - Exit, if nothing else to be done
  395. // - Check links to send extra
  396. // - UNLOCK GroupLock
  397. // - Wait for first ready link
  398. // - LOCK GroupLock
  399. // - Check status and find sendable link
  400. // - Send over a single socket
  401. // - Check status and update data
  402. // - UNLOCK GroupLock, Exit
  403. // - GroupKeeper - dtor
  404. // - LOCK GroupLock
  405. // - Clear BUSY flag
  406. // - UNLOCK GroupLock
  407. // END.
  408. //
  409. // The possibility for isStillBusy to go on is only the following:
  410. // 1. Before calling the API function. As GlobControlLock is locked,
  411. // the nearest lock on GlobControlLock by GroupKeeper can happen:
  412. // - before the group is moved to ClosedGroups (this allows it to be found)
  413. // - after the group is moved to ClosedGroups (this makes the group not found)
  414. // - NOT after the group was deleted, as it could not be found and occupied.
  415. //
  416. // 2. Before release of GlobControlLock (acquired by GC), but before the
  417. // API function locks GroupLock:
  418. // - the GC call to isStillBusy locks GroupLock, but BUSY flag is already set
  419. // - GC then avoids deletion of the group
  420. //
  421. // 3. In any further place up to the exit of the API implementation function,
  422. // the BUSY flag is still set.
  423. //
  424. // 4. After exit of GroupKeeper destructor and unlock of GroupLock
  425. // - the group is no longer being accessed and can be freely deleted.
  426. // - the group also can no longer be found by ID.
  427. bool isStillBusy()
  428. {
  429. sync::ScopedLock glk(m_GroupLock);
  430. return m_iBusy || !m_Group.empty();
  431. }
  /// A simple pool of equally sized char blocks.
  ///
  /// get() hands out a cached block if one is available and allocates a
  /// new one otherwise; put() takes a block back and either caches it
  /// (up to @c maxstorage cached blocks) or frees it. Blocks still cached
  /// at destruction are freed by the destructor.
  struct BufferedMessageStorage
  {
      size_t             blocksize;  ///< size in bytes of every block allocated by get()
      size_t             maxstorage; ///< maximum number of spare blocks kept in the cache
      std::vector<char*> storage;    ///< spare blocks owned by the pool

      /// @param blk size in bytes of a single block
      /// @param max cache capacity; with the default 0 nothing is cached
      ///            and every returned block is freed immediately
      BufferedMessageStorage(size_t blk, size_t max = 0)
          : blocksize(blk)
          , maxstorage(max)
          , storage()
      {
      }

      /// @return a block of @c blocksize bytes; the caller is expected to
      ///         give it back through put()
      char* get()
      {
          if (storage.empty())
              return new char[blocksize];

          // Get the element from the end
          char* block = storage.back();
          storage.pop_back();
          return block;
      }

      /// Returns a block obtained from get() to the pool.
      /// @param block block to give back; NULL is ignored
      void put(char* block)
      {
          // Never cache a null pointer: a later get() would hand it out
          // as if it were a usable block.
          if (!block)
              return;

          if (storage.size() >= maxstorage)
          {
              // Cache is full (or disabled): simply delete
              delete[] block;
              return;
          }

          // Put the block into the spare buffer
          storage.push_back(block);
      }

      ~BufferedMessageStorage()
      {
          for (size_t i = 0; i < storage.size(); ++i)
              delete[] storage[i];
      }

  private:
      // The pool owns the cached blocks, so a member-wise copy would make
      // two destructors delete[] the same pointers. Copying is therefore
      // disabled (declared, intentionally not defined).
      BufferedMessageStorage(const BufferedMessageStorage&);
      BufferedMessageStorage& operator=(const BufferedMessageStorage&);
  };
  469. struct BufferedMessage
  470. {
  471. static BufferedMessageStorage storage;
  472. SRT_MSGCTRL mc;
  473. mutable char* data;
  474. size_t size;
  475. BufferedMessage()
  476. : data()
  477. , size()
  478. {
  479. }
  480. ~BufferedMessage()
  481. {
  482. if (data)
  483. storage.put(data);
  484. }
  485. // NOTE: size 's' must be checked against SRT_LIVE_MAX_PLSIZE
  486. // before calling
  487. void copy(const char* buf, size_t s)
  488. {
  489. size = s;
  490. data = storage.get();
  491. memcpy(data, buf, s);
  492. }
  493. BufferedMessage(const BufferedMessage& foreign)
  494. : mc(foreign.mc)
  495. , data(foreign.data)
  496. , size(foreign.size)
  497. {
  498. foreign.data = 0;
  499. }
  500. BufferedMessage& operator=(const BufferedMessage& foreign)
  501. {
  502. data = foreign.data;
  503. size = foreign.size;
  504. mc = foreign.mc;
  505. foreign.data = 0;
  506. return *this;
  507. }
  508. private:
  509. void swap_with(BufferedMessage& b)
  510. {
  511. std::swap(this->mc, b.mc);
  512. std::swap(this->data, b.data);
  513. std::swap(this->size, b.size);
  514. }
  515. };
  516. typedef std::deque<BufferedMessage> senderBuffer_t;
  517. // typedef StaticBuffer<BufferedMessage, 1000> senderBuffer_t;
  518. private:
  519. // Fields required for SRT_GTYPE_BACKUP groups.
  520. senderBuffer_t m_SenderBuffer;
  521. int32_t m_iSndOldestMsgNo; // oldest position in the sender buffer
  522. sync::atomic<int32_t> m_iSndAckedMsgNo;
  523. uint32_t m_uOPT_MinStabilityTimeout_us;
  524. // THIS function must be called only in a function for a group type
  525. // that does use sender buffer.
  526. int32_t addMessageToBuffer(const char* buf, size_t len, SRT_MSGCTRL& w_mc);
  527. std::set<int> m_sPollID; // set of epoll ID to trigger
  528. int m_iMaxPayloadSize;
  529. int m_iAvgPayloadSize;
  530. bool m_bSynRecving;
  531. bool m_bSynSending;
  532. bool m_bTsbPd;
  533. bool m_bTLPktDrop;
  534. int64_t m_iTsbPdDelay_us;
  535. int m_RcvEID;
  536. class CEPollDesc* m_RcvEpolld;
  537. int m_SndEID;
  538. class CEPollDesc* m_SndEpolld;
  539. int m_iSndTimeOut; // sending timeout in milliseconds
  540. int m_iRcvTimeOut; // receiving timeout in milliseconds
  541. // Start times for TsbPd. These times shall be synchronized
  542. // between all sockets in the group. The first connected one
  543. // defines it, others shall derive it. The value 0 decides if
  544. // this has been already set.
  545. time_point m_tsStartTime;
  546. time_point m_tsRcvPeerStartTime;
  547. void recv_CollectAliveAndBroken(std::vector<srt::CUDTSocket*>& w_alive, std::set<srt::CUDTSocket*>& w_broken);
  548. /// The function polls alive member sockets and retrieves a list of read-ready.
  549. /// [acquires lock for CUDT::uglobal()->m_GlobControlLock]
  550. /// [[using locked(m_GroupLock)]] temporally unlocks-locks internally
  551. ///
  552. /// @returns list of read-ready sockets
  553. /// @throws CUDTException(MJ_CONNECTION, MN_NOCONN, 0)
  554. /// @throws CUDTException(MJ_AGAIN, MN_RDAVAIL, 0)
  555. std::vector<srt::CUDTSocket*> recv_WaitForReadReady(const std::vector<srt::CUDTSocket*>& aliveMembers, std::set<srt::CUDTSocket*>& w_broken);
  556. // This is the sequence number of a packet that has been previously
  557. // delivered. Initially it should be set to SRT_SEQNO_NONE so that the sequence read
  558. // from the first delivering socket will be taken as a good deal.
  559. sync::atomic<int32_t> m_RcvBaseSeqNo;
  560. bool m_bOpened; // Set to true when at least one link is at least pending
  561. bool m_bConnected; // Set to true on first link confirmed connected
  562. bool m_bClosing;
  563. // There's no simple way of transforming config
  564. // items that are predicted to be used on socket.
  565. // Use some options for yourself, store the others
  566. // for setting later on a socket.
  567. std::vector<ConfigItem> m_config;
  568. // Signal for the blocking user thread that the packet
  569. // is ready to deliver.
  570. sync::Condition m_RcvDataCond;
  571. sync::Mutex m_RcvDataLock;
  572. sync::atomic<int32_t> m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket
  573. sync::atomic<int32_t> m_iLastSchedMsgNo;
  574. // Statistics
  575. struct Stats
  576. {
  577. // Stats state
  578. time_point tsActivateTime; // Time when this group sent or received the first data packet
  579. time_point tsLastSampleTime; // Time reset when clearing stats
  580. stats::Metric<stats::BytesPackets> sent; // number of packets sent from the application
  581. stats::Metric<stats::BytesPackets> recv; // number of packets delivered from the group to the application
  582. stats::Metric<stats::BytesPackets> recvDrop; // number of packets dropped by the group receiver (not received from any member)
  583. stats::Metric<stats::BytesPackets> recvDiscard; // number of packets discarded as already delivered
  584. void init()
  585. {
  586. tsActivateTime = srt::sync::steady_clock::time_point();
  587. tsLastSampleTime = srt::sync::steady_clock::now();
  588. sent.reset();
  589. recv.reset();
  590. recvDrop.reset();
  591. recvDiscard.reset();
  592. }
  593. void reset()
  594. {
  595. tsLastSampleTime = srt::sync::steady_clock::now();
  596. sent.resetTrace();
  597. recv.resetTrace();
  598. recvDrop.resetTrace();
  599. recvDiscard.resetTrace();
  600. }
  601. } m_stats;
  602. void updateAvgPayloadSize(int size)
  603. {
  604. if (m_iAvgPayloadSize == -1)
  605. m_iAvgPayloadSize = size;
  606. else
  607. m_iAvgPayloadSize = avg_iir<4>(m_iAvgPayloadSize, size);
  608. }
  609. int avgRcvPacketSize()
  610. {
  611. // In case when no packet has been received yet, but already notified
  612. // a dropped packet, its size will be SRT_LIVE_DEF_PLSIZE. It will be
  613. // the value most matching in the typical uses, although no matter what
  614. // value would be used here, each one would be wrong from some points
  615. // of view. This one is simply the best choice for typical uses of groups
  616. // provided that they are to be ued only for live mode.
  617. return m_iAvgPayloadSize == -1 ? SRT_LIVE_DEF_PLSIZE : m_iAvgPayloadSize;
  618. }
  619. public:
  620. void bstatsSocket(CBytePerfMon* perf, bool clear);
  621. // Required after the call on newGroup on the listener side.
  622. // On the listener side the group is lazily created just before
  623. // accepting a new socket and therefore always open.
  624. void setOpen() { m_bOpened = true; }
  625. std::string CONID() const
  626. {
  627. #if ENABLE_LOGGING
  628. std::ostringstream os;
  629. os << "@" << m_GroupID << ":";
  630. return os.str();
  631. #else
  632. return "";
  633. #endif
  634. }
  635. void resetInitialRxSequence()
  636. {
  637. // The app-reader doesn't care about the real sequence number.
  638. // The first provided one will be taken as a good deal; even if
  639. // this is going to be past the ISN, at worst it will be caused
  640. // by TLPKTDROP.
  641. m_RcvBaseSeqNo = SRT_SEQNO_NONE;
  642. }
  643. bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time)
  644. {
  645. using srt::sync::is_zero;
  646. using srt_logging::gmlog;
  647. if (is_zero(m_tsStartTime))
  648. {
  649. // The first socket, defines the group time for the whole group.
  650. m_tsStartTime = w_start_time;
  651. m_tsRcvPeerStartTime = w_peer_start_time;
  652. return true;
  653. }
  654. // Sanity check. This should never happen, fix the bug if found!
  655. if (is_zero(m_tsRcvPeerStartTime))
  656. {
  657. LOGC(gmlog.Error, log << "IPE: only StartTime is set, RcvPeerStartTime still 0!");
  658. // Kinda fallback, but that's not too safe.
  659. m_tsRcvPeerStartTime = w_peer_start_time;
  660. }
  661. // The redundant connection, derive the times
  662. w_start_time = m_tsStartTime;
  663. w_peer_start_time = m_tsRcvPeerStartTime;
  664. return false;
  665. }
  666. // Live state synchronization
  667. bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
  668. bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
  669. /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
  670. /// @param srcMember a reference for synchronization.
  671. void synchronizeDrift(const srt::CUDT* srcMember);
  672. void updateLatestRcv(srt::CUDTSocket*);
  673. // Property accessors
  674. SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, id, m_GroupID);
  675. SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID);
  676. SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
  677. SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
  678. SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
  679. SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
  680. SRTU_PROPERTY_RO(bool, closing, m_bClosing);
  681. };
  682. } // namespace srt
  683. #endif // INC_SRT_GROUP_H