queue.cpp 58 KB


  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2018 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. /*****************************************************************************
  11. Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
  12. All rights reserved.
  13. Redistribution and use in source and binary forms, with or without
  14. modification, are permitted provided that the following conditions are
  15. met:
  16. * Redistributions of source code must retain the above
  17. copyright notice, this list of conditions and the
  18. following disclaimer.
  19. * Redistributions in binary form must reproduce the
  20. above copyright notice, this list of conditions
  21. and the following disclaimer in the documentation
  22. and/or other materials provided with the distribution.
  23. * Neither the name of the University of Illinois
  24. nor the names of its contributors may be used to
  25. endorse or promote products derived from this
  26. software without specific prior written permission.
  27. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
  28. IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
  29. THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  30. PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  31. CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  32. EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  33. PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  34. PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  35. LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  36. NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  37. SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38. *****************************************************************************/
  39. /*****************************************************************************
  40. written by
  41. Yunhong Gu, last updated 05/05/2011
  42. modified by
  43. Haivision Systems Inc.
  44. *****************************************************************************/
  45. #include "platform_sys.h"
  46. #include <cstring>
  47. #include "common.h"
  48. #include "api.h"
  49. #include "netinet_any.h"
  50. #include "threadname.h"
  51. #include "logging.h"
  52. #include "queue.h"
  53. using namespace std;
  54. using namespace srt::sync;
  55. using namespace srt_logging;
  56. srt::CUnitQueue::CUnitQueue(int initNumUnits, int mss)
  57. : m_iNumTaken(0)
  58. , m_iMSS(mss)
  59. , m_iBlockSize(initNumUnits)
  60. {
  61. CQEntry* tempq = allocateEntry(m_iBlockSize, m_iMSS);
  62. if (tempq == NULL)
  63. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY);
  64. m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
  65. m_pQEntry->m_pNext = m_pQEntry;
  66. m_pAvailUnit = m_pCurrQueue->m_pUnit;
  67. m_iSize = m_iBlockSize;
  68. }
  69. srt::CUnitQueue::~CUnitQueue()
  70. {
  71. CQEntry* p = m_pQEntry;
  72. while (p != NULL)
  73. {
  74. delete[] p->m_pUnit;
  75. delete[] p->m_pBuffer;
  76. CQEntry* q = p;
  77. if (p == m_pLastQueue)
  78. p = NULL;
  79. else
  80. p = p->m_pNext;
  81. delete q;
  82. }
  83. }
  84. srt::CUnitQueue::CQEntry* srt::CUnitQueue::allocateEntry(const int iNumUnits, const int mss)
  85. {
  86. CQEntry* tempq = NULL;
  87. CUnit* tempu = NULL;
  88. char* tempb = NULL;
  89. try
  90. {
  91. tempq = new CQEntry;
  92. tempu = new CUnit[iNumUnits];
  93. tempb = new char[iNumUnits * mss];
  94. }
  95. catch (...)
  96. {
  97. delete tempq;
  98. delete[] tempu;
  99. delete[] tempb;
  100. LOGC(rslog.Error, log << "CUnitQueue: failed to allocate " << iNumUnits << " units.");
  101. return NULL;
  102. }
  103. for (int i = 0; i < iNumUnits; ++i)
  104. {
  105. tempu[i].m_bTaken = false;
  106. tempu[i].m_Packet.m_pcData = tempb + i * mss;
  107. }
  108. tempq->m_pUnit = tempu;
  109. tempq->m_pBuffer = tempb;
  110. tempq->m_iSize = iNumUnits;
  111. return tempq;
  112. }
  113. int srt::CUnitQueue::increase_()
  114. {
  115. const int numUnits = m_iBlockSize;
  116. HLOGC(qrlog.Debug, log << "CUnitQueue::increase: Capacity" << capacity() << " + " << numUnits << " new units, " << m_iNumTaken << " in use.");
  117. CQEntry* tempq = allocateEntry(numUnits, m_iMSS);
  118. if (tempq == NULL)
  119. return -1;
  120. m_pLastQueue->m_pNext = tempq;
  121. m_pLastQueue = tempq;
  122. m_pLastQueue->m_pNext = m_pQEntry;
  123. m_iSize += numUnits;
  124. return 0;
  125. }
  126. srt::CUnit* srt::CUnitQueue::getNextAvailUnit()
  127. {
  128. const int iNumUnitsTotal = capacity();
  129. if (m_iNumTaken * 10 > iNumUnitsTotal * 9) // 90% or more are in use.
  130. increase_();
  131. if (m_iNumTaken >= capacity())
  132. {
  133. LOGC(qrlog.Error, log << "CUnitQueue: No free units to take. Capacity" << capacity() << ".");
  134. return NULL;
  135. }
  136. int units_checked = 0;
  137. do
  138. {
  139. const CUnit* end = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize;
  140. for (; m_pAvailUnit != end; ++m_pAvailUnit, ++units_checked)
  141. {
  142. if (!m_pAvailUnit->m_bTaken)
  143. {
  144. return m_pAvailUnit;
  145. }
  146. }
  147. m_pCurrQueue = m_pCurrQueue->m_pNext;
  148. m_pAvailUnit = m_pCurrQueue->m_pUnit;
  149. } while (units_checked < m_iSize);
  150. return NULL;
  151. }
  152. void srt::CUnitQueue::makeUnitFree(CUnit* unit)
  153. {
  154. SRT_ASSERT(unit != NULL);
  155. SRT_ASSERT(unit->m_bTaken);
  156. unit->m_bTaken.store(false);
  157. --m_iNumTaken;
  158. }
  159. void srt::CUnitQueue::makeUnitTaken(CUnit* unit)
  160. {
  161. ++m_iNumTaken;
  162. SRT_ASSERT(unit != NULL);
  163. SRT_ASSERT(!unit->m_bTaken);
  164. unit->m_bTaken.store(true);
  165. }
  166. srt::CSndUList::CSndUList(sync::CTimer* pTimer)
  167. : m_pHeap(NULL)
  168. , m_iArrayLength(512)
  169. , m_iLastEntry(-1)
  170. , m_ListLock()
  171. , m_pTimer(pTimer)
  172. {
  173. setupCond(m_ListCond, "CSndUListCond");
  174. m_pHeap = new CSNode*[m_iArrayLength];
  175. }
  176. srt::CSndUList::~CSndUList()
  177. {
  178. releaseCond(m_ListCond);
  179. delete[] m_pHeap;
  180. }
  181. void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts)
  182. {
  183. ScopedLock listguard(m_ListLock);
  184. CSNode* n = u->m_pSNode;
  185. if (n->m_iHeapLoc >= 0)
  186. {
  187. if (reschedule == DONT_RESCHEDULE)
  188. return;
  189. if (n->m_tsTimeStamp <= ts)
  190. return;
  191. if (n->m_iHeapLoc == 0)
  192. {
  193. n->m_tsTimeStamp = ts;
  194. m_pTimer->interrupt();
  195. return;
  196. }
  197. remove_(u);
  198. insert_norealloc_(ts, u);
  199. return;
  200. }
  201. insert_(ts, u);
  202. }
  203. srt::CUDT* srt::CSndUList::pop()
  204. {
  205. ScopedLock listguard(m_ListLock);
  206. if (-1 == m_iLastEntry)
  207. return NULL;
  208. // no pop until the next scheduled time
  209. if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
  210. return NULL;
  211. CUDT* u = m_pHeap[0]->m_pUDT;
  212. remove_(u);
  213. return u;
  214. }
  215. void srt::CSndUList::remove(const CUDT* u)
  216. {
  217. ScopedLock listguard(m_ListLock);
  218. remove_(u);
  219. }
  220. steady_clock::time_point srt::CSndUList::getNextProcTime()
  221. {
  222. ScopedLock listguard(m_ListLock);
  223. if (-1 == m_iLastEntry)
  224. return steady_clock::time_point();
  225. return m_pHeap[0]->m_tsTimeStamp;
  226. }
  227. void srt::CSndUList::waitNonEmpty() const
  228. {
  229. UniqueLock listguard(m_ListLock);
  230. if (m_iLastEntry >= 0)
  231. return;
  232. m_ListCond.wait(listguard);
  233. }
  234. void srt::CSndUList::signalInterrupt() const
  235. {
  236. ScopedLock listguard(m_ListLock);
  237. m_ListCond.notify_one();
  238. }
  239. void srt::CSndUList::realloc_()
  240. {
  241. CSNode** temp = NULL;
  242. try
  243. {
  244. temp = new CSNode*[2 * m_iArrayLength];
  245. }
  246. catch (...)
  247. {
  248. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
  249. }
  250. memcpy((temp), m_pHeap, sizeof(CSNode*) * m_iArrayLength);
  251. m_iArrayLength *= 2;
  252. delete[] m_pHeap;
  253. m_pHeap = temp;
  254. }
  255. void srt::CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u)
  256. {
  257. // increase the heap array size if necessary
  258. if (m_iLastEntry == m_iArrayLength - 1)
  259. realloc_();
  260. insert_norealloc_(ts, u);
  261. }
  262. void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT* u)
  263. {
  264. CSNode* n = u->m_pSNode;
  265. // do not insert repeated node
  266. if (n->m_iHeapLoc >= 0)
  267. return;
  268. SRT_ASSERT(m_iLastEntry < m_iArrayLength);
  269. m_iLastEntry++;
  270. m_pHeap[m_iLastEntry] = n;
  271. n->m_tsTimeStamp = ts;
  272. int q = m_iLastEntry;
  273. int p = q;
  274. while (p != 0)
  275. {
  276. p = (q - 1) >> 1;
  277. if (m_pHeap[p]->m_tsTimeStamp <= m_pHeap[q]->m_tsTimeStamp)
  278. break;
  279. swap(m_pHeap[p], m_pHeap[q]);
  280. m_pHeap[q]->m_iHeapLoc = q;
  281. q = p;
  282. }
  283. n->m_iHeapLoc = q;
  284. // an earlier event has been inserted, wake up sending worker
  285. if (n->m_iHeapLoc == 0)
  286. m_pTimer->interrupt();
  287. // first entry, activate the sending queue
  288. if (0 == m_iLastEntry)
  289. {
  290. // m_ListLock is assumed to be locked.
  291. m_ListCond.notify_one();
  292. }
  293. }
  294. void srt::CSndUList::remove_(const CUDT* u)
  295. {
  296. CSNode* n = u->m_pSNode;
  297. if (n->m_iHeapLoc >= 0)
  298. {
  299. // remove the node from heap
  300. m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
  301. m_iLastEntry--;
  302. m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc.load();
  303. int q = n->m_iHeapLoc;
  304. int p = q * 2 + 1;
  305. while (p <= m_iLastEntry)
  306. {
  307. if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_tsTimeStamp > m_pHeap[p + 1]->m_tsTimeStamp))
  308. p++;
  309. if (m_pHeap[q]->m_tsTimeStamp > m_pHeap[p]->m_tsTimeStamp)
  310. {
  311. swap(m_pHeap[p], m_pHeap[q]);
  312. m_pHeap[p]->m_iHeapLoc = p;
  313. m_pHeap[q]->m_iHeapLoc = q;
  314. q = p;
  315. p = q * 2 + 1;
  316. }
  317. else
  318. break;
  319. }
  320. n->m_iHeapLoc = -1;
  321. }
  322. // the only event has been deleted, wake up immediately
  323. if (0 == m_iLastEntry)
  324. m_pTimer->interrupt();
  325. }
  326. //
  327. srt::CSndQueue::CSndQueue()
  328. : m_pSndUList(NULL)
  329. , m_pChannel(NULL)
  330. , m_pTimer(NULL)
  331. , m_bClosing(false)
  332. {
  333. }
  334. srt::CSndQueue::~CSndQueue()
  335. {
  336. m_bClosing = true;
  337. if (m_pTimer != NULL)
  338. {
  339. m_pTimer->interrupt();
  340. }
  341. // Unblock CSndQueue worker thread if it is waiting.
  342. m_pSndUList->signalInterrupt();
  343. if (m_WorkerThread.joinable())
  344. {
  345. HLOGC(rslog.Debug, log << "SndQueue: EXIT");
  346. m_WorkerThread.join();
  347. }
  348. delete m_pSndUList;
  349. }
  350. int srt::CSndQueue::ioctlQuery(int type) const
  351. {
  352. return m_pChannel->ioctlQuery(type);
  353. }
  354. int srt::CSndQueue::sockoptQuery(int level, int type) const
  355. {
  356. return m_pChannel->sockoptQuery(level, type);
  357. }
  #if ENABLE_LOGGING
  // Incremented by every init() call and appended to the worker thread name.
  int srt::CSndQueue::m_counter = 0;
  #endif // ENABLE_LOGGING
  361. void srt::CSndQueue::init(CChannel* c, CTimer* t)
  362. {
  363. m_pChannel = c;
  364. m_pTimer = t;
  365. m_pSndUList = new CSndUList(t);
  366. #if ENABLE_LOGGING
  367. ++m_counter;
  368. const std::string thrname = "SRT:SndQ:w" + Sprint(m_counter);
  369. const char* thname = thrname.c_str();
  370. #else
  371. const char* thname = "SRT:SndQ";
  372. #endif
  373. if (!StartThread(m_WorkerThread, CSndQueue::worker, this, thname))
  374. throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
  375. }
  376. int srt::CSndQueue::getIpTTL() const
  377. {
  378. return m_pChannel ? m_pChannel->getIpTTL() : -1;
  379. }
  380. int srt::CSndQueue::getIpToS() const
  381. {
  382. return m_pChannel ? m_pChannel->getIpToS() : -1;
  383. }
  #ifdef SRT_ENABLE_BINDTODEVICE
  // Forwards to the channel's getBind(dst, len); returns false when no channel is attached.
  bool srt::CSndQueue::getBind(char* dst, size_t len) const
  {
      if (!m_pChannel)
          return false;

      return m_pChannel->getBind(dst, len);
  }
  #endif
  #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
  // Debug helper: once `currtime` reaches self->m_DbgTime, prints the worker statistics
  // to stdout, zeroes them and schedules the next report one m_DbgPeriod later.
  static void CSndQueueDebugHighratePrint(const srt::CSndQueue* self, const steady_clock::time_point currtime)
  {
      // Not yet time for the next report.
      if (self->m_DbgTime > currtime)
          return;

      fprintf(stdout,
              "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
              self->m_WorkerStats.lIteration,
              self->m_WorkerStats.lSleepTo,
              self->m_WorkerStats.lNotReadyPop,
              self->m_WorkerStats.lSendTo,
              self->m_WorkerStats.lNotReadyTs,
              self->m_WorkerStats.lCondWait);

      memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
      self->m_DbgTime = currtime + self->m_DbgPeriod;
  }
  #endif
  408. void* srt::CSndQueue::worker(void* param)
  409. {
  410. CSndQueue* self = (CSndQueue*)param;
  411. #if ENABLE_LOGGING
  412. THREAD_STATE_INIT(("SRT:SndQ:w" + Sprint(m_counter)).c_str());
  413. #else
  414. THREAD_STATE_INIT("SRT:SndQ:worker");
  415. #endif
  416. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
  417. #define IF_DEBUG_HIGHRATE(statement) statement
  418. self->m_DbgTime = sync::steady_clock::now();
  419. self->m_DbgPeriod = sync::microseconds_from(5000000);
  420. self->m_DbgTime += self->m_DbgPeriod;
  421. #else
  422. #define IF_DEBUG_HIGHRATE(statement) (void)0
  423. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
  424. while (!self->m_bClosing)
  425. {
  426. const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime();
  427. INCREMENT_THREAD_ITERATIONS();
  428. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lIteration++);
  429. if (is_zero(next_time))
  430. {
  431. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyTs++);
  432. // wait here if there is no sockets with data to be sent
  433. THREAD_PAUSED();
  434. if (!self->m_bClosing)
  435. {
  436. self->m_pSndUList->waitNonEmpty();
  437. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lCondWait++);
  438. }
  439. THREAD_RESUMED();
  440. continue;
  441. }
  442. // wait until next processing time of the first socket on the list
  443. const steady_clock::time_point currtime = steady_clock::now();
  444. IF_DEBUG_HIGHRATE(CSndQueueDebugHighratePrint(self, currtime));
  445. if (currtime < next_time)
  446. {
  447. THREAD_PAUSED();
  448. self->m_pTimer->sleep_until(next_time);
  449. THREAD_RESUMED();
  450. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lSleepTo++);
  451. }
  452. // Get a socket with a send request if any.
  453. CUDT* u = self->m_pSndUList->pop();
  454. if (u == NULL)
  455. {
  456. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
  457. continue;
  458. }
  459. #define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
  460. HLOGC(qslog.Debug,
  461. log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
  462. << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
  463. << UST(Opened));
  464. #undef UST
  465. if (!u->m_bConnected || u->m_bBroken)
  466. {
  467. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
  468. continue;
  469. }
  470. // pack a packet from the socket
  471. CPacket pkt;
  472. steady_clock::time_point next_send_time;
  473. sockaddr_any source_addr;
  474. const bool res = u->packData((pkt), (next_send_time), (source_addr));
  475. // Check if extracted anything to send
  476. if (res == false)
  477. {
  478. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
  479. continue;
  480. }
  481. const sockaddr_any addr = u->m_PeerAddr;
  482. if (!is_zero(next_send_time))
  483. self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
  484. HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
  485. self->m_pChannel->sendto(addr, pkt, source_addr);
  486. IF_DEBUG_HIGHRATE(self->m_WorkerStats.lSendTo++);
  487. }
  488. THREAD_EXIT();
  489. return NULL;
  490. }
  491. int srt::CSndQueue::sendto(const sockaddr_any& addr, CPacket& w_packet, const sockaddr_any& src)
  492. {
  493. // send out the packet immediately (high priority), this is a control packet
  494. // NOTE: w_packet is passed by mutable reference because this function will do
  495. // a modification in place and then it will revert it. After returning this object
  496. // should look unmodified, hence it is here passed without a reference marker.
  497. m_pChannel->sendto(addr, w_packet, src);
  498. return (int)w_packet.getLength();
  499. }
  500. //
  501. srt::CRcvUList::CRcvUList()
  502. : m_pUList(NULL)
  503. , m_pLast(NULL)
  504. {
  505. }
  506. srt::CRcvUList::~CRcvUList() {}
  507. void srt::CRcvUList::insert(const CUDT* u)
  508. {
  509. CRNode* n = u->m_pRNode;
  510. n->m_tsTimeStamp = steady_clock::now();
  511. if (NULL == m_pUList)
  512. {
  513. // empty list, insert as the single node
  514. n->m_pPrev = n->m_pNext = NULL;
  515. m_pLast = m_pUList = n;
  516. return;
  517. }
  518. // always insert at the end for RcvUList
  519. n->m_pPrev = m_pLast;
  520. n->m_pNext = NULL;
  521. m_pLast->m_pNext = n;
  522. m_pLast = n;
  523. }
  524. void srt::CRcvUList::remove(const CUDT* u)
  525. {
  526. CRNode* n = u->m_pRNode;
  527. if (!n->m_bOnList)
  528. return;
  529. if (NULL == n->m_pPrev)
  530. {
  531. // n is the first node
  532. m_pUList = n->m_pNext;
  533. if (NULL == m_pUList)
  534. m_pLast = NULL;
  535. else
  536. m_pUList->m_pPrev = NULL;
  537. }
  538. else
  539. {
  540. n->m_pPrev->m_pNext = n->m_pNext;
  541. if (NULL == n->m_pNext)
  542. {
  543. // n is the last node
  544. m_pLast = n->m_pPrev;
  545. }
  546. else
  547. n->m_pNext->m_pPrev = n->m_pPrev;
  548. }
  549. n->m_pNext = n->m_pPrev = NULL;
  550. }
  551. void srt::CRcvUList::update(const CUDT* u)
  552. {
  553. CRNode* n = u->m_pRNode;
  554. if (!n->m_bOnList)
  555. return;
  556. n->m_tsTimeStamp = steady_clock::now();
  557. // if n is the last node, do not need to change
  558. if (NULL == n->m_pNext)
  559. return;
  560. if (NULL == n->m_pPrev)
  561. {
  562. m_pUList = n->m_pNext;
  563. m_pUList->m_pPrev = NULL;
  564. }
  565. else
  566. {
  567. n->m_pPrev->m_pNext = n->m_pNext;
  568. n->m_pNext->m_pPrev = n->m_pPrev;
  569. }
  570. n->m_pPrev = m_pLast;
  571. n->m_pNext = NULL;
  572. m_pLast->m_pNext = n;
  573. m_pLast = n;
  574. }
  575. //
  576. srt::CHash::CHash()
  577. : m_pBucket(NULL)
  578. , m_iHashSize(0)
  579. {
  580. }
  581. srt::CHash::~CHash()
  582. {
  583. for (int i = 0; i < m_iHashSize; ++i)
  584. {
  585. CBucket* b = m_pBucket[i];
  586. while (NULL != b)
  587. {
  588. CBucket* n = b->m_pNext;
  589. delete b;
  590. b = n;
  591. }
  592. }
  593. delete[] m_pBucket;
  594. }
  595. void srt::CHash::init(int size)
  596. {
  597. m_pBucket = new CBucket*[size];
  598. for (int i = 0; i < size; ++i)
  599. m_pBucket[i] = NULL;
  600. m_iHashSize = size;
  601. }
  602. srt::CUDT* srt::CHash::lookup(int32_t id)
  603. {
  604. // simple hash function (% hash table size); suitable for socket descriptors
  605. CBucket* b = m_pBucket[id % m_iHashSize];
  606. while (NULL != b)
  607. {
  608. if (id == b->m_iID)
  609. return b->m_pUDT;
  610. b = b->m_pNext;
  611. }
  612. return NULL;
  613. }
  614. void srt::CHash::insert(int32_t id, CUDT* u)
  615. {
  616. CBucket* b = m_pBucket[id % m_iHashSize];
  617. CBucket* n = new CBucket;
  618. n->m_iID = id;
  619. n->m_pUDT = u;
  620. n->m_pNext = b;
  621. m_pBucket[id % m_iHashSize] = n;
  622. }
  623. void srt::CHash::remove(int32_t id)
  624. {
  625. CBucket* b = m_pBucket[id % m_iHashSize];
  626. CBucket* p = NULL;
  627. while (NULL != b)
  628. {
  629. if (id == b->m_iID)
  630. {
  631. if (NULL == p)
  632. m_pBucket[id % m_iHashSize] = b->m_pNext;
  633. else
  634. p->m_pNext = b->m_pNext;
  635. delete b;
  636. return;
  637. }
  638. p = b;
  639. b = b->m_pNext;
  640. }
  641. }
  642. //
  643. srt::CRendezvousQueue::CRendezvousQueue()
  644. : m_lRendezvousID()
  645. , m_RIDListLock()
  646. {
  647. }
  648. srt::CRendezvousQueue::~CRendezvousQueue()
  649. {
  650. m_lRendezvousID.clear();
  651. }
  652. void srt::CRendezvousQueue::insert(const SRTSOCKET& id,
  653. CUDT* u,
  654. const sockaddr_any& addr,
  655. const steady_clock::time_point& ttl)
  656. {
  657. ScopedLock vg(m_RIDListLock);
  658. CRL r;
  659. r.m_iID = id;
  660. r.m_pUDT = u;
  661. r.m_PeerAddr = addr;
  662. r.m_tsTTL = ttl;
  663. m_lRendezvousID.push_back(r);
  664. HLOGC(cnlog.Debug,
  665. log << "RID: adding socket @" << id << " for address: " << addr.str() << " expires: " << FormatTime(ttl)
  666. << " (total connectors: " << m_lRendezvousID.size() << ")");
  667. }
  668. void srt::CRendezvousQueue::remove(const SRTSOCKET& id)
  669. {
  670. ScopedLock lkv(m_RIDListLock);
  671. for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
  672. {
  673. if (i->m_iID == id)
  674. {
  675. m_lRendezvousID.erase(i);
  676. break;
  677. }
  678. }
  679. }
  680. srt::CUDT* srt::CRendezvousQueue::retrieve(const sockaddr_any& addr, SRTSOCKET& w_id) const
  681. {
  682. ScopedLock vg(m_RIDListLock);
  683. IF_HEAVY_LOGGING(const char* const id_type = w_id ? "THIS ID" : "A NEW CONNECTION");
  684. // TODO: optimize search
  685. for (list<CRL>::const_iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
  686. {
  687. if (i->m_PeerAddr == addr && ((w_id == 0) || (w_id == i->m_iID)))
  688. {
  689. // This procedure doesn't exactly respond to the original UDT idea.
  690. // As the "rendezvous queue" is used for both handling rendezvous and
  691. // the caller sockets in the non-blocking mode (for blocking mode the
  692. // entire handshake procedure is handled in a loop-style in CUDT::startConnect),
  693. // the RID list should give up a socket entity in the following cases:
  694. // 1. For THE SAME id as passed in w_id, respond always, as per a caller
  695. // socket that is currently trying to connect and is managed with
  696. // HS roundtrips in an event-style. Same for rendezvous.
  697. // 2. For the "connection request" ID=0 the found socket should be given up
  698. // ONLY IF it is rendezvous. Normally ID=0 is only for listener as a
  699. // connection request. But if there was a listener, then this function
  700. // wouldn't even be called, as this case would be handled before trying
  701. // to call this function.
  702. //
  703. // This means: if an incoming ID is 0, then this search should succeed ONLY
  704. // IF THE FOUND SOCKET WAS RENDEZVOUS.
  705. if (!w_id && !i->m_pUDT->m_config.bRendezvous)
  706. {
  707. HLOGC(cnlog.Debug,
  708. log << "RID: found id @" << i->m_iID << " while looking for "
  709. << id_type << " FROM " << i->m_PeerAddr.str()
  710. << ", but it's NOT RENDEZVOUS, skipping");
  711. continue;
  712. }
  713. HLOGC(cnlog.Debug,
  714. log << "RID: found id @" << i->m_iID << " while looking for "
  715. << id_type << " FROM " << i->m_PeerAddr.str());
  716. w_id = i->m_iID;
  717. return i->m_pUDT;
  718. }
  719. }
  720. #if ENABLE_HEAVY_LOGGING
  721. std::ostringstream spec;
  722. if (w_id == 0)
  723. spec << "A NEW CONNECTION REQUEST";
  724. else
  725. spec << " AGENT @" << w_id;
  726. HLOGC(cnlog.Debug,
  727. log << "RID: NO CONNECTOR FOR ADR:" << addr.str() << " while looking for " << spec.str() << " ("
  728. << m_lRendezvousID.size() << " connectors total)");
  729. #endif
  730. return NULL;
  731. }
  732. void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, CUnit* unit)
  733. {
  734. vector<LinkStatusInfo> toRemove, toProcess;
  735. const CPacket* pkt = unit ? &unit->m_Packet : NULL;
  736. // Need a stub value for a case when there's no unit provided ("storage depleted" case).
  737. // It should be normally NOT IN USE because in case of "storage depleted", rst != RST_OK.
  738. const SRTSOCKET dest_id = pkt ? pkt->m_iID : 0;
  739. // If no socket were qualified for further handling, finish here.
  740. // Otherwise toRemove and toProcess contain items to handle.
  741. if (!qualifyToHandle(rst, cst, dest_id, (toRemove), (toProcess)))
  742. return;
  743. HLOGC(cnlog.Debug,
  744. log << "updateConnStatus: collected " << toProcess.size() << " for processing, " << toRemove.size()
  745. << " to close");
  746. // Repeat (resend) connection request.
  747. for (vector<LinkStatusInfo>::iterator i = toProcess.begin(); i != toProcess.end(); ++i)
  748. {
  749. // IMPORTANT INFORMATION concerning changes towards UDT legacy.
  750. // In the UDT code there was no attempt to interpret any incoming data.
  751. // All data from the incoming packet were considered to be already deployed into
  752. // m_ConnRes field, and m_ConnReq field was considered at this time accordingly updated.
  753. // Therefore this procedure did only one thing: craft a new handshake packet and send it.
  754. // In SRT this may also interpret extra data (extensions in case when Agent is Responder)
  755. // and the `pktIn` packet may sometimes contain no data. Therefore the passed `rst`
  756. // must be checked to distinguish the call by periodic update (RST_AGAIN) from a call
  757. // due to have received the packet (RST_OK).
  758. //
  759. // In the below call, only the underlying `processRendezvous` function will be attempting
  760. // to interpret these data (for caller-listener this was already done by `processConnectRequest`
  761. // before calling this function), and it checks for the data presence.
  762. EReadStatus read_st = rst;
  763. EConnectStatus conn_st = cst;
  764. if (cst != CONN_RENDEZVOUS && dest_id != 0)
  765. {
  766. if (i->id != dest_id)
  767. {
  768. HLOGC(cnlog.Debug, log << "updateConnStatus: cst=" << ConnectStatusStr(cst) << " but for RID @" << i->id
  769. << " dest_id=@" << dest_id << " - resetting to AGAIN");
  770. read_st = RST_AGAIN;
  771. conn_st = CONN_AGAIN;
  772. }
  773. else
  774. {
  775. HLOGC(cnlog.Debug, log << "updateConnStatus: cst=" << ConnectStatusStr(cst) << " for @"
  776. << i->id);
  777. }
  778. }
  779. else
  780. {
  781. HLOGC(cnlog.Debug, log << "updateConnStatus: cst=" << ConnectStatusStr(cst) << " and dest_id=@" << dest_id
  782. << " - NOT checking against RID @" << i->id);
  783. }
  784. HLOGC(cnlog.Debug,
  785. log << "updateConnStatus: processing async conn for @" << i->id << " FROM " << i->peeraddr.str());
  786. if (!i->u->processAsyncConnectRequest(read_st, conn_st, pkt, i->peeraddr))
  787. {
  788. // cst == CONN_REJECT can only be result of worker_ProcessAddressedPacket and
  789. // its already set in this case.
  790. LinkStatusInfo fi = *i;
  791. fi.errorcode = SRT_ECONNREJ;
  792. toRemove.push_back(fi);
  793. i->u->sendCtrl(UMSG_SHUTDOWN);
  794. }
  795. }
  796. // NOTE: it is "believed" here that all CUDT objects will not be
  797. // deleted in the meantime. This is based on a statement that at worst
  798. // they have been "just" declared failed and it will pass at least 1s until
  799. // they are moved to ClosedSockets and it is believed that this function will
  800. // not be held on mutexes that long.
  801. for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
  802. {
  803. HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
  804. //
  805. // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
  806. // because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
  807. // and may crash on next pass.
  808. //
  809. // TODO: maybe lock i->u->m_ConnectionLock?
  810. i->u->m_bConnecting = false;
  811. remove(i->u->m_SocketID);
  812. // DO NOT close the socket here because in this case it might be
  813. // unable to get status from at the right moment. Also only member
  814. // sockets should be taken care of internally - single sockets should
  815. // be normally closed by the application, after it is done with them.
  816. // app can call any UDT API to learn the connection_broken error
  817. CUDT::uglobal().m_EPoll.update_events(
  818. i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);
  819. i->u->completeBrokenConnectionDependencies(i->errorcode);
  820. }
  821. {
  822. // Now, additionally for every failed link reset the TTL so that
  823. // they are set expired right now.
  824. ScopedLock vg(m_RIDListLock);
  825. for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
  826. {
  827. if (find_if(toRemove.begin(), toRemove.end(), LinkStatusInfo::HasID(i->m_iID)) != toRemove.end())
  828. {
  829. LOGC(cnlog.Error,
  830. log << "updateConnStatus: processAsyncConnectRequest FAILED on @" << i->m_iID
  831. << ". Setting TTL as EXPIRED.");
  832. i->m_tsTTL =
  833. steady_clock::time_point(); // Make it expire right now, will be picked up at the next iteration
  834. }
  835. }
  836. }
  837. }
  838. bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus rst,
  839. EConnectStatus cst SRT_ATR_UNUSED,
  840. int iDstSockID,
  841. vector<LinkStatusInfo>& toRemove,
  842. vector<LinkStatusInfo>& toProcess)
  843. {
  844. ScopedLock vg(m_RIDListLock);
  845. if (m_lRendezvousID.empty())
  846. return false; // nothing to process.
  847. HLOGC(cnlog.Debug,
  848. log << "updateConnStatus: updating after getting pkt with DST socket ID @" << iDstSockID
  849. << " status: " << ConnectStatusStr(cst));
  850. for (list<CRL>::iterator i = m_lRendezvousID.begin(), i_next = i; i != m_lRendezvousID.end(); i = i_next)
  851. {
  852. // Safe iterator to the next element. If the current element is erased, the iterator is updated again.
  853. ++i_next;
  854. const steady_clock::time_point tsNow = steady_clock::now();
  855. if (tsNow >= i->m_tsTTL)
  856. {
  857. HLOGC(cnlog.Debug,
  858. log << "RID: socket @" << i->m_iID
  859. << " removed - EXPIRED ("
  860. // The "enforced on FAILURE" is below when processAsyncConnectRequest failed.
  861. << (is_zero(i->m_tsTTL) ? "enforced on FAILURE" : "passed TTL") << "). WILL REMOVE from queue.");
  862. // Set appropriate error information, but do not update yet.
  863. // Exit the lock first. Collect objects to update them later.
  864. int ccerror = SRT_ECONNREJ;
  865. if (i->m_pUDT->m_RejectReason == SRT_REJ_UNKNOWN)
  866. {
  867. if (!is_zero(i->m_tsTTL))
  868. {
  869. // Timer expired, set TIMEOUT forcefully
  870. i->m_pUDT->m_RejectReason = SRT_REJ_TIMEOUT;
  871. ccerror = SRT_ENOSERVER;
  872. }
  873. else
  874. {
  875. // In case of unknown reason, rejection should at least
  876. // suggest error on the peer
  877. i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
  878. }
  879. }
  880. // The call to completeBrokenConnectionDependencies() cannot happen here
  881. // under the lock of m_RIDListLock as it risks a deadlock.
  882. // Collect in 'toRemove' to update later.
  883. LinkStatusInfo fi = {i->m_pUDT, i->m_iID, ccerror, i->m_PeerAddr, -1};
  884. toRemove.push_back(fi);
  885. // i_next was preincremented, but this is guaranteed to point to
  886. // the element next to erased one.
  887. i_next = m_lRendezvousID.erase(i);
  888. continue;
  889. }
  890. else
  891. {
  892. HLOGC(cnlog.Debug,
  893. log << "RID: socket @" << i->m_iID << " still active (remaining " << std::fixed
  894. << (count_microseconds(i->m_tsTTL - tsNow) / 1000000.0) << "s of TTL)...");
  895. }
  896. const steady_clock::time_point tsLastReq = i->m_pUDT->m_tsLastReqTime;
  897. const steady_clock::time_point tsRepeat =
  898. tsLastReq + milliseconds_from(250); // Repeat connection request (send HS).
  899. // A connection request is repeated every 250 ms if there was no response from the peer:
  900. // - RST_AGAIN means no packet was received over UDP.
  901. // - a packet was received, but not for THIS socket.
  902. if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat)
  903. {
  904. HLOGC(cnlog.Debug,
  905. log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0
  906. << " ms passed since last connection request.");
  907. continue;
  908. }
  909. HLOGC(cnlog.Debug,
  910. log << "RID:@" << i->m_iID << " cst=" << ConnectStatusStr(cst) << " -- repeating connection request.");
  911. // This queue is used only in case of Async mode (rendezvous or caller-listener).
  912. // Synchronous connection requests are handled in startConnect() completely.
  913. if (!i->m_pUDT->m_config.bSynRecving)
  914. {
  915. // Collect them so that they can be updated out of m_RIDListLock.
  916. LinkStatusInfo fi = {i->m_pUDT, i->m_iID, SRT_SUCCESS, i->m_PeerAddr, -1};
  917. toProcess.push_back(fi);
  918. }
  919. else
  920. {
  921. HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " is SYNCHRONOUS, NOT UPDATING");
  922. }
  923. }
  924. return !toRemove.empty() || !toProcess.empty();
  925. }
  926. //
  927. srt::CRcvQueue::CRcvQueue()
  928. : m_WorkerThread()
  929. , m_pUnitQueue(NULL)
  930. , m_pRcvUList(NULL)
  931. , m_pHash(NULL)
  932. , m_pChannel(NULL)
  933. , m_pTimer(NULL)
  934. , m_iIPversion()
  935. , m_szPayloadSize()
  936. , m_bClosing(false)
  937. , m_LSLock()
  938. , m_pListener(NULL)
  939. , m_pRendezvousQueue(NULL)
  940. , m_vNewEntry()
  941. , m_IDLock()
  942. , m_mBuffer()
  943. , m_BufferCond()
  944. {
  945. setupCond(m_BufferCond, "QueueBuffer");
  946. }
  947. srt::CRcvQueue::~CRcvQueue()
  948. {
  949. m_bClosing = true;
  950. if (m_WorkerThread.joinable())
  951. {
  952. HLOGC(rslog.Debug, log << "RcvQueue: EXIT");
  953. m_WorkerThread.join();
  954. }
  955. releaseCond(m_BufferCond);
  956. delete m_pUnitQueue;
  957. delete m_pRcvUList;
  958. delete m_pHash;
  959. delete m_pRendezvousQueue;
  960. // remove all queued messages
  961. for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++i)
  962. {
  963. while (!i->second.empty())
  964. {
  965. CPacket* pkt = i->second.front();
  966. delete pkt;
  967. i->second.pop();
  968. }
  969. }
  970. }
  971. #if ENABLE_LOGGING
  972. srt::sync::atomic<int> srt::CRcvQueue::m_counter(0);
  973. #endif
  974. void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CChannel* cc, CTimer* t)
  975. {
  976. m_iIPversion = version;
  977. m_szPayloadSize = payload;
  978. SRT_ASSERT(m_pUnitQueue == NULL);
  979. m_pUnitQueue = new CUnitQueue(qsize, (int)payload);
  980. m_pHash = new CHash;
  981. m_pHash->init(hsize);
  982. m_pChannel = cc;
  983. m_pTimer = t;
  984. m_pRcvUList = new CRcvUList;
  985. m_pRendezvousQueue = new CRendezvousQueue;
  986. #if ENABLE_LOGGING
  987. const int cnt = ++m_counter;
  988. const std::string thrname = "SRT:RcvQ:w" + Sprint(cnt);
  989. #else
  990. const std::string thrname = "SRT:RcvQ:w";
  991. #endif
  992. if (!StartThread(m_WorkerThread, CRcvQueue::worker, this, thrname.c_str()))
  993. {
  994. throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
  995. }
  996. }
  997. void* srt::CRcvQueue::worker(void* param)
  998. {
  999. CRcvQueue* self = (CRcvQueue*)param;
  1000. sockaddr_any sa(self->getIPversion());
  1001. int32_t id = 0;
  1002. #if ENABLE_LOGGING
  1003. THREAD_STATE_INIT(("SRT:RcvQ:w" + Sprint(m_counter)).c_str());
  1004. #else
  1005. THREAD_STATE_INIT("SRT:RcvQ:worker");
  1006. #endif
  1007. CUnit* unit = 0;
  1008. EConnectStatus cst = CONN_AGAIN;
  1009. while (!self->m_bClosing)
  1010. {
  1011. bool have_received = false;
  1012. EReadStatus rst = self->worker_RetrieveUnit((id), (unit), (sa));
  1013. INCREMENT_THREAD_ITERATIONS();
  1014. if (rst == RST_OK)
  1015. {
  1016. if (id < 0)
  1017. {
  1018. // User error on peer. May log something, but generally can only ignore it.
  1019. // XXX Think maybe about sending some "connection rejection response".
  1020. HLOGC(qrlog.Debug,
  1021. log << self->CONID() << "RECEIVED negative socket id '" << id
  1022. << "', rejecting (POSSIBLE ATTACK)");
  1023. continue;
  1024. }
  1025. // NOTE: cst state is being changed here.
  1026. // This state should be maintained through any next failed calls to worker_RetrieveUnit.
  1027. // Any error switches this to rejection, just for a case.
  1028. // Note to rendezvous connection. This can accept:
  1029. // - ID == 0 - take the first waiting rendezvous socket
  1030. // - ID > 0 - find the rendezvous socket that has this ID.
  1031. if (id == 0)
  1032. {
  1033. // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
  1034. cst = self->worker_ProcessConnectionRequest(unit, sa);
  1035. }
  1036. else
  1037. {
  1038. // Otherwise ID is expected to be associated with:
  1039. // - an enqueued rendezvous socket
  1040. // - a socket connected to a peer
  1041. cst = self->worker_ProcessAddressedPacket(id, unit, sa);
  1042. // CAN RETURN CONN_REJECT, but m_RejectReason is already set
  1043. }
  1044. HLOGC(qrlog.Debug, log << self->CONID() << "worker: result for the unit: " << ConnectStatusStr(cst));
  1045. if (cst == CONN_AGAIN)
  1046. {
  1047. HLOGC(qrlog.Debug, log << self->CONID() << "worker: packet not dispatched, continuing reading.");
  1048. continue;
  1049. }
  1050. have_received = true;
  1051. }
  1052. else if (rst == RST_ERROR)
  1053. {
  1054. // According to the description by CChannel::recvfrom, this can be either of:
  1055. // - IPE: all errors except EBADF
  1056. // - socket was closed in the meantime by another thread: EBADF
  1057. // If EBADF, then it's expected that the "closing" state is also set.
  1058. // Check that just to report possible errors, but interrupt the loop anyway.
  1059. if (self->m_bClosing)
  1060. {
  1061. HLOGC(qrlog.Debug,
  1062. log << self->CONID() << "CChannel reported error, but Queue is closing - INTERRUPTING worker.");
  1063. }
  1064. else
  1065. {
  1066. LOGC(qrlog.Fatal,
  1067. log << self->CONID()
  1068. << "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway.");
  1069. }
  1070. cst = CONN_REJECT;
  1071. break;
  1072. }
  1073. // OTHERWISE: this is an "AGAIN" situation. No data was read, but the process should continue.
  1074. // take care of the timing event for all UDT sockets
  1075. const steady_clock::time_point curtime_minus_syn =
  1076. steady_clock::now() - microseconds_from(CUDT::COMM_SYN_INTERVAL_US);
  1077. CRNode* ul = self->m_pRcvUList->m_pUList;
  1078. while ((NULL != ul) && (ul->m_tsTimeStamp < curtime_minus_syn))
  1079. {
  1080. CUDT* u = ul->m_pUDT;
  1081. if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
  1082. {
  1083. u->checkTimers();
  1084. self->m_pRcvUList->update(u);
  1085. }
  1086. else
  1087. {
  1088. HLOGC(qrlog.Debug,
  1089. log << CUDTUnited::CONID(u->m_SocketID) << " SOCKET broken, REMOVING FROM RCV QUEUE/MAP.");
  1090. // the socket must be removed from Hash table first, then RcvUList
  1091. self->m_pHash->remove(u->m_SocketID);
  1092. self->m_pRcvUList->remove(u);
  1093. u->m_pRNode->m_bOnList = false;
  1094. }
  1095. ul = self->m_pRcvUList->m_pUList;
  1096. }
  1097. if (have_received)
  1098. {
  1099. HLOGC(qrlog.Debug,
  1100. log << "worker: RECEIVED PACKET --> updateConnStatus. cst=" << ConnectStatusStr(cst) << " id=" << id
  1101. << " pkt-payload-size=" << unit->m_Packet.getLength());
  1102. }
  1103. // Check connection requests status for all sockets in the RendezvousQueue.
  1104. // Pass the connection status from the last call of:
  1105. // worker_ProcessAddressedPacket --->
  1106. // worker_TryAsyncRend_OrStore --->
  1107. // CUDT::processAsyncConnectResponse --->
  1108. // CUDT::processConnectResponse
  1109. self->m_pRendezvousQueue->updateConnStatus(rst, cst, unit);
  1110. // XXX updateConnStatus may have removed the connector from the list,
  1111. // however there's still m_mBuffer in CRcvQueue for that socket to care about.
  1112. }
  1113. HLOGC(qrlog.Debug, log << "worker: EXIT");
  1114. THREAD_EXIT();
  1115. return NULL;
  1116. }
  1117. srt::EReadStatus srt::CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_unit, sockaddr_any& w_addr)
  1118. {
  1119. #if !USE_BUSY_WAITING
  1120. // This might be not really necessary, and probably
  1121. // not good for extensive bidirectional communication.
  1122. m_pTimer->tick();
  1123. #endif
  1124. // check waiting list, if new socket, insert it to the list
  1125. while (ifNewEntry())
  1126. {
  1127. CUDT* ne = getNewEntry();
  1128. if (ne)
  1129. {
  1130. HLOGC(qrlog.Debug,
  1131. log << CUDTUnited::CONID(ne->m_SocketID)
  1132. << " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
  1133. m_pRcvUList->insert(ne);
  1134. m_pHash->insert(ne->m_SocketID, ne);
  1135. }
  1136. }
  1137. // find next available slot for incoming packet
  1138. w_unit = m_pUnitQueue->getNextAvailUnit();
  1139. if (!w_unit)
  1140. {
  1141. // no space, skip this packet
  1142. CPacket temp;
  1143. temp.allocate(m_szPayloadSize);
  1144. THREAD_PAUSED();
  1145. EReadStatus rst = m_pChannel->recvfrom((w_addr), (temp));
  1146. THREAD_RESUMED();
  1147. // Note: this will print nothing about the packet details unless heavy logging is on.
  1148. LOGC(qrlog.Error, log << CONID() << "LOCAL STORAGE DEPLETED. Dropping 1 packet: " << temp.Info());
  1149. // Be transparent for RST_ERROR, but ignore the correct
  1150. // data read and fake that the packet was dropped.
  1151. return rst == RST_ERROR ? RST_ERROR : RST_AGAIN;
  1152. }
  1153. w_unit->m_Packet.setLength(m_szPayloadSize);
  1154. // reading next incoming packet, recvfrom returns -1 is nothing has been received
  1155. THREAD_PAUSED();
  1156. EReadStatus rst = m_pChannel->recvfrom((w_addr), (w_unit->m_Packet));
  1157. THREAD_RESUMED();
  1158. if (rst == RST_OK)
  1159. {
  1160. w_id = w_unit->m_Packet.m_iID;
  1161. HLOGC(qrlog.Debug,
  1162. log << "INCOMING PACKET: FROM=" << w_addr.str() << " BOUND=" << m_pChannel->bindAddressAny().str() << " "
  1163. << w_unit->m_Packet.Info());
  1164. }
  1165. return rst;
  1166. }
  1167. srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& addr)
  1168. {
  1169. HLOGC(cnlog.Debug,
  1170. log << "Got sockID=0 from " << addr.str() << " - trying to resolve it as a connection request...");
  1171. // Introduced protection because it may potentially happen
  1172. // that another thread could have closed the socket at
  1173. // the same time and inject a bug between checking the
  1174. // pointer for NULL and using it.
  1175. int listener_ret = SRT_REJ_UNKNOWN;
  1176. bool have_listener = false;
  1177. {
  1178. ScopedLock cg(m_LSLock);
  1179. if (m_pListener)
  1180. {
  1181. LOGC(cnlog.Note, log << "PASSING request from: " << addr.str() << " to agent:" << m_pListener->socketID());
  1182. listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet);
  1183. // This function does return a code, but it's hard to say as to whether
  1184. // anything can be done about it. In case when it's stated possible, the
  1185. // listener will try to send some rejection response to the caller, but
  1186. // that's already done inside this function. So it's only used for
  1187. // displaying the error in logs.
  1188. have_listener = true;
  1189. }
  1190. }
  1191. // NOTE: Rendezvous sockets do bind(), but not listen(). It means that the socket is
  1192. // ready to accept connection requests, but they are not being redirected to the listener
  1193. // socket, as this is not a listener socket at all. This goes then HERE.
  1194. if (have_listener) // That is, the above block with m_pListener->processConnectRequest was executed
  1195. {
  1196. LOGC(cnlog.Note,
  1197. log << CONID() << "Listener managed the connection request from: " << addr.str()
  1198. << " result:" << RequestTypeStr(UDTRequestType(listener_ret)));
  1199. return listener_ret == SRT_REJ_UNKNOWN ? CONN_CONTINUE : CONN_REJECT;
  1200. }
  1201. // If there's no listener waiting for the packet, just store it into the queue.
  1202. return worker_TryAsyncRend_OrStore(0, unit, addr); // 0 id because the packet came in with that very ID.
  1203. }
  1204. srt::EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& addr)
  1205. {
  1206. CUDT* u = m_pHash->lookup(id);
  1207. if (!u)
  1208. {
  1209. // Pass this to either async rendezvous connection,
  1210. // or store the packet in the queue.
  1211. HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id);
  1212. return worker_TryAsyncRend_OrStore(id, unit, addr);
  1213. }
  1214. // Found associated CUDT - process this as control or data packet
  1215. // addressed to an associated socket.
  1216. if (addr != u->m_PeerAddr)
  1217. {
  1218. HLOGC(cnlog.Debug,
  1219. log << CONID() << "Packet for SID=" << id << " asoc with " << u->m_PeerAddr.str() << " received from "
  1220. << addr.str() << " (CONSIDERED ATTACK ATTEMPT)");
  1221. // This came not from the address that is the peer associated
  1222. // with the socket. Ignore it.
  1223. return CONN_AGAIN;
  1224. }
  1225. if (!u->m_bConnected || u->m_bBroken || u->m_bClosing)
  1226. {
  1227. u->m_RejectReason = SRT_REJ_CLOSE;
  1228. // The socket is currently in the process of being disconnected
  1229. // or destroyed. Ignore.
  1230. // XXX send UMSG_SHUTDOWN in this case?
  1231. // XXX May it require mutex protection?
  1232. return CONN_REJECT;
  1233. }
  1234. if (unit->m_Packet.isControl())
  1235. u->processCtrl(unit->m_Packet);
  1236. else
  1237. u->processData(unit);
  1238. u->checkTimers();
  1239. m_pRcvUList->update(u);
  1240. return CONN_RUNNING;
  1241. }
  1242. // This function responds to the fact that a packet has come
  1243. // for a socket that does not expect to receive a normal connection
  1244. // request. This can be then:
  1245. // - a normal packet of whatever kind, just to be processed by the message loop
  1246. // - a rendezvous connection
  1247. // This function then tries to manage the packet as a rendezvous connection
  1248. // request in ASYNC mode; when this is not applicable, it stores the packet
  1249. // in the "receiving queue" so that it will be picked up in the "main" thread.
  1250. srt::EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& addr)
  1251. {
  1252. // This 'retrieve' requires that 'id' be either one of those
  1253. // stored in the rendezvous queue (see CRcvQueue::registerConnector)
  1254. // or simply 0, but then at least the address must match one of these.
  1255. // If the id was 0, it will be set to the actual socket ID of the returned CUDT.
  1256. CUDT* u = m_pRendezvousQueue->retrieve(addr, (id));
  1257. if (!u)
  1258. {
  1259. // this socket is then completely unknown to the system.
  1260. // Note that this situation may also happen at a very unfortunate
  1261. // coincidence that the socket is already bound, but the registerConnector()
  1262. // has not yet started. In case of rendezvous this may mean that the other
  1263. // side just started sending its handshake packets, the local side has already
  1264. // run the CRcvQueue::worker thread, and this worker thread is trying to dispatch
  1265. // the handshake packet too early, before the dispatcher has a chance to see
  1266. // this socket registerred in the RendezvousQueue, which causes the packet unable
  1267. // to be dispatched. Therefore simply treat every "out of band" packet (with socket
  1268. // not belonging to the connection and not registered as rendezvous) as "possible
  1269. // attack" and ignore it. This also should better protect the rendezvous socket
  1270. // against a rogue connector.
  1271. if (id == 0)
  1272. {
  1273. HLOGC(cnlog.Debug,
  1274. log << CONID() << "AsyncOrRND: no sockets expect connection from " << addr.str()
  1275. << " - POSSIBLE ATTACK, ignore packet");
  1276. }
  1277. else
  1278. {
  1279. HLOGC(cnlog.Debug,
  1280. log << CONID() << "AsyncOrRND: no sockets expect socket " << id << " from " << addr.str()
  1281. << " - POSSIBLE ATTACK, ignore packet");
  1282. }
  1283. return CONN_AGAIN; // This means that the packet should be ignored.
  1284. }
  1285. // asynchronous connect: call connect here
  1286. // otherwise wait for the UDT socket to retrieve this packet
  1287. if (!u->m_config.bSynRecving)
  1288. {
  1289. HLOGC(cnlog.Debug, log << "AsyncOrRND: packet RESOLVED TO @" << id << " -- continuing as ASYNC CONNECT");
  1290. // This is practically same as processConnectResponse, just this applies
  1291. // appropriate mutex lock - which can't be done here because it's intentionally private.
  1292. // OTOH it can't be applied to processConnectResponse because the synchronous
  1293. // call to this method applies the lock by itself, and same-thread-double-locking is nonportable (crashable).
  1294. EConnectStatus cst = u->processAsyncConnectResponse(unit->m_Packet);
  1295. if (cst == CONN_CONFUSED)
  1296. {
  1297. LOGC(cnlog.Warn, log << "AsyncOrRND: PACKET NOT HANDSHAKE - re-requesting handshake from peer");
  1298. storePktClone(id, unit->m_Packet);
  1299. if (!u->processAsyncConnectRequest(RST_AGAIN, CONN_CONTINUE, &unit->m_Packet, u->m_PeerAddr))
  1300. {
  1301. // Reuse previous behavior to reject a packet
  1302. cst = CONN_REJECT;
  1303. }
  1304. else
  1305. {
  1306. cst = CONN_CONTINUE;
  1307. }
  1308. }
  1309. // It might be that this is a data packet, which has turned the connection
  1310. // into "connected" state, removed the connector (so since now every next packet
  1311. // will land directly in the queue), but this data packet shall still be delivered.
  1312. if (cst == CONN_ACCEPT && !unit->m_Packet.isControl())
  1313. {
  1314. // The process as called through processAsyncConnectResponse() should have put the
  1315. // socket into the pending queue for pending connection (don't ask me, this is so).
  1316. // This pending queue is being purged every time in the beginning of this loop, so
  1317. // currently the socket is in the pending queue, but not yet in the connection queue.
  1318. // It will be done at the next iteration of the reading loop, but it will be too late,
  1319. // we have a pending data packet now and we must either dispatch it to an already connected
  1320. // socket or disregard it, and rather prefer the former. So do this transformation now
  1321. // that we KNOW (by the cst == CONN_ACCEPT result) that the socket should be inserted
  1322. // into the pending anteroom.
  1323. CUDT* ne = getNewEntry(); // This function actuall removes the entry and returns it.
  1324. // This **should** now always return a non-null value, but check it first
  1325. // because if this accidentally isn't true, the call to worker_ProcessAddressedPacket will
  1326. // result in redirecting it to here and so on until the call stack overflow. In case of
  1327. // this "accident" simply disregard the packet from any further processing, it will be later
  1328. // loss-recovered.
  1329. // XXX (Probably the old contents of UDT's CRcvQueue::worker should be shaped a little bit
  1330. // differently throughout the functions).
  1331. if (ne)
  1332. {
  1333. HLOGC(cnlog.Debug,
  1334. log << CUDTUnited::CONID(ne->m_SocketID)
  1335. << " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
  1336. m_pRcvUList->insert(ne);
  1337. m_pHash->insert(ne->m_SocketID, ne);
  1338. // The current situation is that this has passed processAsyncConnectResponse, but actually
  1339. // this packet *SHOULD HAVE BEEN* handled by worker_ProcessAddressedPacket, however the
  1340. // connection state wasn't completed at the moment when dispatching this packet. This has
  1341. // been now completed inside the call to processAsyncConnectResponse, but this is still a
  1342. // data packet that should have expected the connection to be already established. Therefore
  1343. // redirect it once again into worker_ProcessAddressedPacket here.
  1344. HLOGC(cnlog.Debug,
  1345. log << "AsyncOrRND: packet SWITCHED TO CONNECTED with ID=" << id
  1346. << " -- passing to worker_ProcessAddressedPacket");
  1347. // Theoretically we should check if m_pHash->lookup(ne->m_SocketID) returns 'ne', but this
  1348. // has been just added to m_pHash, so the check would be extremely paranoid here.
  1349. cst = worker_ProcessAddressedPacket(id, unit, addr);
  1350. if (cst == CONN_REJECT)
  1351. return cst;
  1352. return CONN_ACCEPT; // this function usually will return CONN_CONTINUE, which doesn't represent current
  1353. // situation.
  1354. }
  1355. else
  1356. {
  1357. LOGC(cnlog.Error,
  1358. log << "IPE: AsyncOrRND: packet SWITCHED TO CONNECTED, but ID=" << id
  1359. << " is still not present in the socket ID dispatch hash - DISREGARDING");
  1360. }
  1361. }
  1362. return cst;
  1363. }
  1364. HLOGC(cnlog.Debug,
  1365. log << "AsyncOrRND: packet RESOLVED TO ID=" << id << " -- continuing through CENTRAL PACKET QUEUE");
  1366. // This is where also the packets for rendezvous connection will be landing,
  1367. // in case of a synchronous connection.
  1368. storePktClone(id, unit->m_Packet);
  1369. return CONN_CONTINUE;
  1370. }
  1371. void srt::CRcvQueue::stopWorker()
  1372. {
  1373. // We use the decent way, so we say to the thread "please exit".
  1374. m_bClosing = true;
  1375. // Sanity check of the function's affinity.
  1376. if (srt::sync::this_thread::get_id() == m_WorkerThread.get_id())
  1377. {
  1378. LOGC(rslog.Error, log << "IPE: RcvQ:WORKER TRIES TO CLOSE ITSELF!");
  1379. return; // do nothing else, this would cause a hangup or crash.
  1380. }
  1381. HLOGC(rslog.Debug, log << "RcvQueue: EXIT (forced)");
  1382. // And we trust the thread that it does.
  1383. m_WorkerThread.join();
  1384. }
  1385. int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
  1386. {
  1387. CUniqueSync buffercond(m_BufferLock, m_BufferCond);
  1388. map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
  1389. if (i == m_mBuffer.end())
  1390. {
  1391. THREAD_PAUSED();
  1392. buffercond.wait_for(seconds_from(1));
  1393. THREAD_RESUMED();
  1394. i = m_mBuffer.find(id);
  1395. if (i == m_mBuffer.end())
  1396. {
  1397. w_packet.setLength(-1);
  1398. return -1;
  1399. }
  1400. }
  1401. // retrieve the earliest packet
  1402. CPacket* newpkt = i->second.front();
  1403. if (w_packet.getLength() < newpkt->getLength())
  1404. {
  1405. w_packet.setLength(-1);
  1406. return -1;
  1407. }
  1408. // copy packet content
  1409. // XXX Check if this wouldn't be better done by providing
  1410. // copy constructor for DynamicStruct.
  1411. // XXX Another thing: this looks wasteful. This expects an already
  1412. // allocated memory on the packet, this thing gets the packet,
  1413. // copies it into the passed packet and then the source packet
  1414. // gets deleted. Why not simply return the originally stored packet,
  1415. // without copying, allocation and deallocation?
  1416. memcpy((w_packet.m_nHeader), newpkt->m_nHeader, CPacket::HDR_SIZE);
  1417. memcpy((w_packet.m_pcData), newpkt->m_pcData, newpkt->getLength());
  1418. w_packet.setLength(newpkt->getLength());
  1419. w_packet.m_DestAddr = newpkt->m_DestAddr;
  1420. delete newpkt;
  1421. // remove this message from queue,
  1422. // if no more messages left for this socket, release its data structure
  1423. i->second.pop();
  1424. if (i->second.empty())
  1425. m_mBuffer.erase(i);
  1426. return (int)w_packet.getLength();
  1427. }
  1428. int srt::CRcvQueue::setListener(CUDT* u)
  1429. {
  1430. ScopedLock lslock(m_LSLock);
  1431. if (NULL != m_pListener)
  1432. return -1;
  1433. m_pListener = u;
  1434. return 0;
  1435. }
  1436. void srt::CRcvQueue::removeListener(const CUDT* u)
  1437. {
  1438. ScopedLock lslock(m_LSLock);
  1439. if (u == m_pListener)
  1440. m_pListener = NULL;
  1441. }
  1442. void srt::CRcvQueue::registerConnector(const SRTSOCKET& id,
  1443. CUDT* u,
  1444. const sockaddr_any& addr,
  1445. const steady_clock::time_point& ttl)
  1446. {
  1447. HLOGC(cnlog.Debug,
  1448. log << "registerConnector: adding @" << id << " addr=" << addr.str() << " TTL=" << FormatTime(ttl));
  1449. m_pRendezvousQueue->insert(id, u, addr, ttl);
  1450. }
  1451. void srt::CRcvQueue::removeConnector(const SRTSOCKET& id)
  1452. {
  1453. HLOGC(cnlog.Debug, log << "removeConnector: removing @" << id);
  1454. m_pRendezvousQueue->remove(id);
  1455. ScopedLock bufferlock(m_BufferLock);
  1456. map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
  1457. if (i != m_mBuffer.end())
  1458. {
  1459. HLOGC(cnlog.Debug,
  1460. log << "removeConnector: ... and its packet queue with " << i->second.size() << " packets collected");
  1461. while (!i->second.empty())
  1462. {
  1463. delete i->second.front();
  1464. i->second.pop();
  1465. }
  1466. m_mBuffer.erase(i);
  1467. }
  1468. }
  1469. void srt::CRcvQueue::setNewEntry(CUDT* u)
  1470. {
  1471. HLOGC(cnlog.Debug, log << CUDTUnited::CONID(u->m_SocketID) << "setting socket PENDING FOR CONNECTION");
  1472. ScopedLock listguard(m_IDLock);
  1473. m_vNewEntry.push_back(u);
  1474. }
  1475. bool srt::CRcvQueue::ifNewEntry()
  1476. {
  1477. return !(m_vNewEntry.empty());
  1478. }
  1479. srt::CUDT* srt::CRcvQueue::getNewEntry()
  1480. {
  1481. ScopedLock listguard(m_IDLock);
  1482. if (m_vNewEntry.empty())
  1483. return NULL;
  1484. CUDT* u = (CUDT*)*(m_vNewEntry.begin());
  1485. m_vNewEntry.erase(m_vNewEntry.begin());
  1486. return u;
  1487. }
  1488. void srt::CRcvQueue::storePktClone(int32_t id, const CPacket& pkt)
  1489. {
  1490. CUniqueSync passcond(m_BufferLock, m_BufferCond);
  1491. map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
  1492. if (i == m_mBuffer.end())
  1493. {
  1494. m_mBuffer[id].push(pkt.clone());
  1495. passcond.notify_one();
  1496. }
  1497. else
  1498. {
  1499. // Avoid storing too many packets, in case of malfunction or attack.
  1500. if (i->second.size() > 16)
  1501. return;
  1502. i->second.push(pkt.clone());
  1503. }
  1504. }
  1505. void srt::CMultiplexer::destroy()
  1506. {
  1507. // Reverse order of the assigned.
  1508. delete m_pRcvQueue;
  1509. delete m_pSndQueue;
  1510. delete m_pTimer;
  1511. if (m_pChannel)
  1512. {
  1513. m_pChannel->close();
  1514. delete m_pChannel;
  1515. }
  1516. }