api.cpp 151 KB


  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2018 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. /*****************************************************************************
  11. Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
  12. All rights reserved.
  13. Redistribution and use in source and binary forms, with or without
  14. modification, are permitted provided that the following conditions are
  15. met:
  16. * Redistributions of source code must retain the above
  17. copyright notice, this list of conditions and the
  18. following disclaimer.
  19. * Redistributions in binary form must reproduce the
  20. above copyright notice, this list of conditions
  21. and the following disclaimer in the documentation
  22. and/or other materials provided with the distribution.
  23. * Neither the name of the University of Illinois
  24. nor the names of its contributors may be used to
  25. endorse or promote products derived from this
  26. software without specific prior written permission.
  27. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
  28. IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
  29. THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  30. PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  31. CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  32. EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  33. PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  34. PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  35. LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  36. NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  37. SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38. *****************************************************************************/
  39. /*****************************************************************************
  40. written by
  41. Yunhong Gu, last updated 07/09/2011
  42. modified by
  43. Haivision Systems Inc.
  44. *****************************************************************************/
  45. #include "platform_sys.h"
  46. #include <exception>
  47. #include <stdexcept>
  48. #include <typeinfo>
  49. #include <iterator>
  50. #include <vector>
  51. #include <cstring>
  52. #include "utilities.h"
  53. #include "netinet_any.h"
  54. #include "api.h"
  55. #include "core.h"
  56. #include "epoll.h"
  57. #include "logging.h"
  58. #include "threadname.h"
  59. #include "srt.h"
  60. #include "udt.h"
  61. #ifdef _WIN32
  62. #include <win/wintime.h>
  63. #endif
  64. #ifdef _MSC_VER
  65. #pragma warning(error : 4530)
  66. #endif
  67. using namespace std;
  68. using namespace srt_logging;
  69. using namespace srt::sync;
  70. void srt::CUDTSocket::construct()
  71. {
  72. #if ENABLE_BONDING
  73. m_GroupOf = NULL;
  74. m_GroupMemberData = NULL;
  75. #endif
  76. setupMutex(m_AcceptLock, "Accept");
  77. setupCond(m_AcceptCond, "Accept");
  78. setupMutex(m_ControlLock, "Control");
  79. }
  80. srt::CUDTSocket::~CUDTSocket()
  81. {
  82. releaseMutex(m_AcceptLock);
  83. releaseCond(m_AcceptCond);
  84. releaseMutex(m_ControlLock);
  85. }
  86. SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
  87. {
  88. // TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
  89. // Although m_Status is still SRTS_CONNECTING, the connection is in fact to be closed due to TTL expiry.
  90. // In this case m_bConnected is also false. Both checks are required to avoid hitting
  91. // a regular state transition from CONNECTING to CONNECTED.
  92. if (m_UDT.m_bBroken)
  93. return SRTS_BROKEN;
  94. // Connecting timed out
  95. if ((m_Status == SRTS_CONNECTING) && !m_UDT.m_bConnecting && !m_UDT.m_bConnected)
  96. return SRTS_BROKEN;
  97. return m_Status;
  98. }
  99. // [[using locked(m_GlobControlLock)]]
  100. void srt::CUDTSocket::breakSocket_LOCKED()
  101. {
  102. // This function is intended to be called from GC,
  103. // under a lock of m_GlobControlLock.
  104. m_UDT.m_bBroken = true;
  105. m_UDT.m_iBrokenCounter = 0;
  106. HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
  107. m_UDT.closeInternal();
  108. setClosed();
  109. }
  110. void srt::CUDTSocket::setClosed()
  111. {
  112. m_Status = SRTS_CLOSED;
  113. // a socket will not be immediately removed when it is closed
  114. // in order to prevent other methods from accessing invalid address
  115. // a timer is started and the socket will be removed after approximately
  116. // 1 second
  117. m_tsClosureTimeStamp = steady_clock::now();
  118. }
  119. void srt::CUDTSocket::setBrokenClosed()
  120. {
  121. m_UDT.m_iBrokenCounter = 60;
  122. m_UDT.m_bBroken = true;
  123. setClosed();
  124. }
  125. bool srt::CUDTSocket::readReady()
  126. {
  127. // TODO: Use m_RcvBufferLock here (CUDT::isRcvReadReady())?
  128. if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady())
  129. return true;
  130. if (m_UDT.m_bListening)
  131. return !m_QueuedSockets.empty();
  132. return broken();
  133. }
  134. bool srt::CUDTSocket::writeReady() const
  135. {
  136. return (m_UDT.m_bConnected && (m_UDT.m_pSndBuffer->getCurrBufSize() < m_UDT.m_config.iSndBufSize)) || broken();
  137. }
  138. bool srt::CUDTSocket::broken() const
  139. {
  140. return m_UDT.m_bBroken || !m_UDT.m_bConnected;
  141. }
  142. ////////////////////////////////////////////////////////////////////////////////
  143. srt::CUDTUnited::CUDTUnited()
  144. : m_Sockets()
  145. , m_GlobControlLock()
  146. , m_IDLock()
  147. , m_mMultiplexer()
  148. , m_MultiplexerLock()
  149. , m_pCache(NULL)
  150. , m_bClosing(false)
  151. , m_GCStopCond()
  152. , m_InitLock()
  153. , m_iInstanceCount(0)
  154. , m_bGCStatus(false)
  155. , m_ClosedSockets()
  156. {
  157. // Socket ID MUST start from a random value
  158. m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL);
  159. m_SocketIDGenerator_init = m_SocketIDGenerator;
  160. // XXX An unlikely exception thrown from the below calls
  161. // might destroy the application before `main`. This shouldn't
  162. // be a problem in general.
  163. setupMutex(m_GCStopLock, "GCStop");
  164. setupCond(m_GCStopCond, "GCStop");
  165. setupMutex(m_GlobControlLock, "GlobControl");
  166. setupMutex(m_IDLock, "ID");
  167. setupMutex(m_InitLock, "Init");
  168. m_pCache = new CCache<CInfoBlock>;
  169. }
  170. srt::CUDTUnited::~CUDTUnited()
  171. {
  172. // Call it if it wasn't called already.
  173. // This will happen at the end of main() of the application,
  174. // when the user didn't call srt_cleanup().
  175. if (m_bGCStatus)
  176. {
  177. cleanup();
  178. }
  179. releaseMutex(m_GlobControlLock);
  180. releaseMutex(m_IDLock);
  181. releaseMutex(m_InitLock);
  182. // XXX There's some weird bug here causing this
  183. // to hangup on Windows. This might be either something
  184. // bigger, or some problem in pthread-win32. As this is
  185. // the application cleanup section, this can be temporarily
  186. // tolerated with simply exit the application without cleanup,
  187. // counting on that the system will take care of it anyway.
  188. #ifndef _WIN32
  189. releaseCond(m_GCStopCond);
  190. #endif
  191. releaseMutex(m_GCStopLock);
  192. delete m_pCache;
  193. }
  194. string srt::CUDTUnited::CONID(SRTSOCKET sock)
  195. {
  196. if (sock == 0)
  197. return "";
  198. std::ostringstream os;
  199. os << "@" << sock << ":";
  200. return os.str();
  201. }
  202. int srt::CUDTUnited::startup()
  203. {
  204. ScopedLock gcinit(m_InitLock);
  205. if (m_iInstanceCount++ > 0)
  206. return 1;
  207. // Global initialization code
  208. #ifdef _WIN32
  209. WORD wVersionRequested;
  210. WSADATA wsaData;
  211. wVersionRequested = MAKEWORD(2, 2);
  212. if (0 != WSAStartup(wVersionRequested, &wsaData))
  213. throw CUDTException(MJ_SETUP, MN_NONE, WSAGetLastError());
  214. #endif
  215. CCryptoControl::globalInit();
  216. PacketFilter::globalInit();
  217. if (m_bGCStatus)
  218. return 1;
  219. m_bClosing = false;
  220. if (!StartThread(m_GCThread, garbageCollect, this, "SRT:GC"))
  221. return -1;
  222. m_bGCStatus = true;
  223. HLOGC(inlog.Debug, log << "SRT Clock Type: " << SRT_SYNC_CLOCK_STR);
  224. return 0;
  225. }
  226. int srt::CUDTUnited::cleanup()
  227. {
  228. // IMPORTANT!!!
  229. // In this function there must be NO LOGGING AT ALL. This function may
  230. // potentially be called from within the global program destructor, and
  231. // therefore some of the facilities used by the logging system - including
  232. // the default std::cerr object bound to it by default, but also a different
  233. // stream that the user's app has bound to it, and which got destroyed
  234. // together with already exited main() - may be already deleted when
  235. // executing this procedure.
  236. ScopedLock gcinit(m_InitLock);
  237. if (--m_iInstanceCount > 0)
  238. return 0;
  239. if (!m_bGCStatus)
  240. return 0;
  241. {
  242. UniqueLock gclock(m_GCStopLock);
  243. m_bClosing = true;
  244. }
  245. // NOTE: we can do relaxed signaling here because
  246. // waiting on m_GCStopCond has a 1-second timeout,
  247. // after which the m_bClosing flag is cheched, which
  248. // is set here above. Worst case secenario, this
  249. // pthread_join() call will block for 1 second.
  250. CSync::notify_one_relaxed(m_GCStopCond);
  251. m_GCThread.join();
  252. m_bGCStatus = false;
  253. // Global destruction code
  254. #ifdef _WIN32
  255. WSACleanup();
  256. #endif
  257. return 0;
  258. }
  259. SRTSOCKET srt::CUDTUnited::generateSocketID(bool for_group)
  260. {
  261. ScopedLock guard(m_IDLock);
  262. int sockval = m_SocketIDGenerator - 1;
  263. // First problem: zero-value should be avoided by various reasons.
  264. if (sockval <= 0)
  265. {
  266. // We have a rollover on the socket value, so
  267. // definitely we haven't made the Columbus mistake yet.
  268. m_SocketIDGenerator = MAX_SOCKET_VAL;
  269. sockval = MAX_SOCKET_VAL;
  270. }
  271. // Check all sockets if any of them has this value.
  272. // Socket IDs are begin created this way:
  273. //
  274. // Initial random
  275. // |
  276. // |
  277. // |
  278. // |
  279. // ...
  280. // The only problem might be if the number rolls over
  281. // and reaches the same value from the opposite side.
  282. // This is still a valid socket value, but this time
  283. // we have to check, which sockets have been used already.
  284. if (sockval == m_SocketIDGenerator_init)
  285. {
  286. // Mark that since this point on the checks for
  287. // whether the socket ID is in use must be done.
  288. m_SocketIDGenerator_init = 0;
  289. }
  290. // This is when all socket numbers have been already used once.
  291. // This may happen after many years of running an application
  292. // constantly when the connection breaks and gets restored often.
  293. if (m_SocketIDGenerator_init == 0)
  294. {
  295. int startval = sockval;
  296. for (;;) // Roll until an unused value is found
  297. {
  298. enterCS(m_GlobControlLock);
  299. const bool exists =
  300. #if ENABLE_BONDING
  301. for_group
  302. ? m_Groups.count(sockval | SRTGROUP_MASK)
  303. :
  304. #endif
  305. m_Sockets.count(sockval);
  306. leaveCS(m_GlobControlLock);
  307. if (exists)
  308. {
  309. // The socket value is in use.
  310. --sockval;
  311. if (sockval <= 0)
  312. sockval = MAX_SOCKET_VAL;
  313. // Before continuing, check if we haven't rolled back to start again
  314. // This is virtually impossible, so just make an RTI error.
  315. if (sockval == startval)
  316. {
  317. // Of course, we don't lack memory, but actually this is so impossible
  318. // that a complete memory extinction is much more possible than this.
  319. // So treat this rather as a formal fallback for something that "should
  320. // never happen". This should make the socket creation functions, from
  321. // socket_create and accept, return this error.
  322. m_SocketIDGenerator = sockval + 1; // so that any next call will cause the same error
  323. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
  324. }
  325. // try again, if this is a free socket
  326. continue;
  327. }
  328. // No socket found, this ID is free to use
  329. m_SocketIDGenerator = sockval;
  330. break;
  331. }
  332. }
  333. else
  334. {
  335. m_SocketIDGenerator = sockval;
  336. }
  337. // The socket value counter remains with the value rolled
  338. // without the group bit set; only the returned value may have
  339. // the group bit set.
  340. if (for_group)
  341. sockval = m_SocketIDGenerator | SRTGROUP_MASK;
  342. else
  343. sockval = m_SocketIDGenerator;
  344. LOGC(smlog.Debug, log << "generateSocketID: " << (for_group ? "(group)" : "") << ": @" << sockval);
  345. return sockval;
  346. }
  347. SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
  348. {
  349. // XXX consider using some replacement of std::unique_ptr
  350. // so that exceptions will clean up the object without the
  351. // need for a dedicated code.
  352. CUDTSocket* ns = NULL;
  353. try
  354. {
  355. ns = new CUDTSocket;
  356. }
  357. catch (...)
  358. {
  359. delete ns;
  360. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
  361. }
  362. try
  363. {
  364. ns->m_SocketID = generateSocketID();
  365. }
  366. catch (...)
  367. {
  368. delete ns;
  369. throw;
  370. }
  371. ns->m_Status = SRTS_INIT;
  372. ns->m_ListenSocket = 0;
  373. ns->core().m_SocketID = ns->m_SocketID;
  374. ns->core().m_pCache = m_pCache;
  375. try
  376. {
  377. HLOGC(smlog.Debug, log << CONID(ns->m_SocketID) << "newSocket: mapping socket " << ns->m_SocketID);
  378. // protect the m_Sockets structure.
  379. ScopedLock cs(m_GlobControlLock);
  380. m_Sockets[ns->m_SocketID] = ns;
  381. }
  382. catch (...)
  383. {
  384. // failure and rollback
  385. delete ns;
  386. ns = NULL;
  387. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
  388. }
  389. if (pps)
  390. *pps = ns;
  391. return ns->m_SocketID;
  392. }
  393. int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
  394. const sockaddr_any& peer,
  395. const CPacket& hspkt,
  396. CHandShake& w_hs,
  397. int& w_error,
  398. CUDT*& w_acpu)
  399. {
  400. CUDTSocket* ns = NULL;
  401. w_acpu = NULL;
  402. w_error = SRT_REJ_IPE;
  403. // Can't manage this error through an exception because this is
  404. // running in the listener loop.
  405. CUDTSocket* ls = locateSocket(listen);
  406. if (!ls)
  407. {
  408. LOGC(cnlog.Error, log << "IPE: newConnection by listener socket id=" << listen << " which DOES NOT EXIST.");
  409. return -1;
  410. }
  411. HLOGC(cnlog.Debug,
  412. log << "newConnection: creating new socket after listener @" << listen
  413. << " contacted with backlog=" << ls->m_uiBackLog);
  414. // if this connection has already been processed
  415. if ((ns = locatePeer(peer, w_hs.m_iID, w_hs.m_iISN)) != NULL)
  416. {
  417. if (ns->core().m_bBroken)
  418. {
  419. // last connection from the "peer" address has been broken
  420. ns->setClosed();
  421. ScopedLock acceptcg(ls->m_AcceptLock);
  422. ls->m_QueuedSockets.erase(ns->m_SocketID);
  423. }
  424. else
  425. {
  426. // connection already exist, this is a repeated connection request
  427. // respond with existing HS information
  428. HLOGC(cnlog.Debug, log << "newConnection: located a WORKING peer @" << w_hs.m_iID << " - ADAPTING.");
  429. w_hs.m_iISN = ns->core().m_iISN;
  430. w_hs.m_iMSS = ns->core().MSS();
  431. w_hs.m_iFlightFlagSize = ns->core().m_config.iFlightFlagSize;
  432. w_hs.m_iReqType = URQ_CONCLUSION;
  433. w_hs.m_iID = ns->m_SocketID;
  434. // Report the original UDT because it will be
  435. // required to complete the HS data for conclusion response.
  436. w_acpu = &ns->core();
  437. return 0;
  438. // except for this situation a new connection should be started
  439. }
  440. }
  441. else
  442. {
  443. HLOGC(cnlog.Debug,
  444. log << "newConnection: NOT located any peer @" << w_hs.m_iID << " - resuming with initial connection.");
  445. }
  446. // exceeding backlog, refuse the connection request
  447. if (ls->m_QueuedSockets.size() >= ls->m_uiBackLog)
  448. {
  449. w_error = SRT_REJ_BACKLOG;
  450. LOGC(cnlog.Note, log << "newConnection: listen backlog=" << ls->m_uiBackLog << " EXCEEDED");
  451. return -1;
  452. }
  453. try
  454. {
  455. ns = new CUDTSocket(*ls);
  456. // No need to check the peer, this is the address from which the request has come.
  457. ns->m_PeerAddr = peer;
  458. }
  459. catch (...)
  460. {
  461. w_error = SRT_REJ_RESOURCE;
  462. delete ns;
  463. LOGC(cnlog.Error, log << "IPE: newConnection: unexpected exception (probably std::bad_alloc)");
  464. return -1;
  465. }
  466. ns->core().m_RejectReason = SRT_REJ_UNKNOWN; // pre-set a universal value
  467. try
  468. {
  469. ns->m_SocketID = generateSocketID();
  470. }
  471. catch (const CUDTException&)
  472. {
  473. LOGC(cnlog.Fatal, log << "newConnection: IPE: all sockets occupied? Last gen=" << m_SocketIDGenerator);
  474. // generateSocketID throws exception, which can be naturally handled
  475. // when the call is derived from the API call, but here it's called
  476. // internally in response to receiving a handshake. It must be handled
  477. // here and turned into an erroneous return value.
  478. delete ns;
  479. return -1;
  480. }
  481. ns->m_ListenSocket = listen;
  482. ns->core().m_SocketID = ns->m_SocketID;
  483. ns->m_PeerID = w_hs.m_iID;
  484. ns->m_iISN = w_hs.m_iISN;
  485. HLOGC(cnlog.Debug,
  486. log << "newConnection: DATA: lsnid=" << listen << " id=" << ns->core().m_SocketID
  487. << " peerid=" << ns->core().m_PeerID << " ISN=" << ns->m_iISN);
  488. int error = 0;
  489. bool should_submit_to_accept = true;
  490. // Set the error code for all prospective problems below.
  491. // It won't be interpreted when result was successful.
  492. w_error = SRT_REJ_RESOURCE;
  493. // These can throw exception only when the memory allocation failed.
  494. // CUDT::connect() translates exception into CUDTException.
  495. // CUDT::open() may only throw original std::bad_alloc from new.
  496. // This is only to make the library extra safe (when your machine lacks
  497. // memory, it will continue to work, but fail to accept connection).
  498. try
  499. {
  500. // This assignment must happen b4 the call to CUDT::connect() because
  501. // this call causes sending the SRT Handshake through this socket.
  502. // Without this mapping the socket cannot be found and therefore
  503. // the SRT Handshake message would fail.
  504. HLOGC(cnlog.Debug, log <<
  505. "newConnection: incoming " << peer.str() << ", mapping socket " << ns->m_SocketID);
  506. {
  507. ScopedLock cg(m_GlobControlLock);
  508. m_Sockets[ns->m_SocketID] = ns;
  509. }
  510. if (ls->core().m_cbAcceptHook)
  511. {
  512. if (!ls->core().runAcceptHook(&ns->core(), peer.get(), w_hs, hspkt))
  513. {
  514. w_error = ns->core().m_RejectReason;
  515. error = 1;
  516. goto ERR_ROLLBACK;
  517. }
  518. }
  519. // bind to the same addr of listening socket
  520. ns->core().open();
  521. if (!updateListenerMux(ns, ls))
  522. {
  523. // This is highly unlikely if not impossible, but there's
  524. // a theoretical runtime chance of failure so it should be
  525. // handled
  526. ns->core().m_RejectReason = SRT_REJ_IPE;
  527. throw false; // let it jump directly into the omni exception handler
  528. }
  529. ns->core().acceptAndRespond(ls->m_SelfAddr, peer, hspkt, (w_hs));
  530. }
  531. catch (...)
  532. {
  533. // Extract the error that was set in this new failed entity.
  534. w_error = ns->core().m_RejectReason;
  535. error = 1;
  536. goto ERR_ROLLBACK;
  537. }
  538. ns->m_Status = SRTS_CONNECTED;
  539. // copy address information of local node
  540. // Precisely, what happens here is:
  541. // - Get the IP address and port from the system database
  542. ns->core().m_pSndQueue->m_pChannel->getSockAddr((ns->m_SelfAddr));
  543. // - OVERWRITE just the IP address itself by a value taken from piSelfIP
  544. // (the family is used exactly as the one taken from what has been returned
  545. // by getsockaddr)
  546. CIPAddress::pton((ns->m_SelfAddr), ns->core().m_piSelfIP, peer);
  547. {
  548. // protect the m_PeerRec structure (and group existence)
  549. ScopedLock glock(m_GlobControlLock);
  550. try
  551. {
  552. HLOGC(cnlog.Debug, log << "newConnection: mapping peer " << ns->m_PeerID
  553. << " to that socket (" << ns->m_SocketID << ")");
  554. m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID);
  555. }
  556. catch (...)
  557. {
  558. LOGC(cnlog.Error, log << "newConnection: error when mapping peer!");
  559. error = 2;
  560. }
  561. // The access to m_GroupOf should be also protected, as the group
  562. // could be requested deletion in the meantime. This will hold any possible
  563. // removal from group and resetting m_GroupOf field.
  564. #if ENABLE_BONDING
  565. if (ns->m_GroupOf)
  566. {
  567. // XXX this might require another check of group type.
  568. // For redundancy group, at least, update the status in the group
  569. CUDTGroup* g = ns->m_GroupOf;
  570. ScopedLock grlock(g->m_GroupLock);
  571. if (g->m_bClosing)
  572. {
  573. error = 1; // "INTERNAL REJECTION"
  574. goto ERR_ROLLBACK;
  575. }
  576. // Check if this is the first socket in the group.
  577. // If so, give it up to accept, otherwise just do nothing
  578. // The client will be informed about the newly added connection at the
  579. // first moment when attempting to get the group status.
  580. for (CUDTGroup::gli_t gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
  581. {
  582. if (gi->laststatus == SRTS_CONNECTED)
  583. {
  584. HLOGC(cnlog.Debug,
  585. log << "Found another connected socket in the group: $" << gi->id
  586. << " - socket will be NOT given up for accepting");
  587. should_submit_to_accept = false;
  588. break;
  589. }
  590. }
  591. // Update the status in the group so that the next
  592. // operation can include the socket in the group operation.
  593. CUDTGroup::SocketData* gm = ns->m_GroupMemberData;
  594. HLOGC(cnlog.Debug,
  595. log << "newConnection(GROUP): Socket @" << ns->m_SocketID << " BELONGS TO $" << g->id() << " - will "
  596. << (should_submit_to_accept ? "" : "NOT ") << "report in accept");
  597. gm->sndstate = SRT_GST_IDLE;
  598. gm->rcvstate = SRT_GST_IDLE;
  599. gm->laststatus = SRTS_CONNECTED;
  600. if (!g->m_bConnected)
  601. {
  602. HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED");
  603. g->m_bConnected = true;
  604. }
  605. // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily,
  606. // but groupwise connections could be accepted from multiple listeners for the same group!
  607. // m_listener MUST BE A CONTAINER, NOT POINTER!!!
  608. // ALSO: Maybe checking "the same listener" is not necessary as subscruption may be done
  609. // multiple times anyway?
  610. if (!g->m_listener)
  611. {
  612. // Newly created group from the listener, which hasn't yet
  613. // the listener set.
  614. g->m_listener = ls;
  615. // Listen on both first connected socket and continued sockets.
  616. // This might help with jump-over situations, and in regular continued
  617. // sockets the IN event won't be reported anyway.
  618. int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
  619. epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes);
  620. // This listening should be done always when a first connected socket
  621. // appears as accepted off the listener. This is for the sake of swait() calls
  622. // inside the group receiving and sending functions so that they get
  623. // interrupted when a new socket is connected.
  624. }
  625. // Add also per-direction subscription for the about-to-be-accepted socket.
  626. // Both first accepted socket that makes the group-accept and every next
  627. // socket that adds a new link.
  628. int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
  629. int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
  630. epoll_add_usock_INTERNAL(g->m_RcvEID, ns, &read_modes);
  631. epoll_add_usock_INTERNAL(g->m_SndEID, ns, &write_modes);
  632. // With app reader, do not set groupPacketArrival (block the
  633. // provider array feature completely for now).
  634. /* SETUP HERE IF NEEDED
  635. ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
  636. */
  637. }
  638. else
  639. {
  640. HLOGC(cnlog.Debug, log << "newConnection: Socket @" << ns->m_SocketID << " is not in a group");
  641. }
  642. #endif
  643. }
  644. if (should_submit_to_accept)
  645. {
  646. enterCS(ls->m_AcceptLock);
  647. try
  648. {
  649. ls->m_QueuedSockets.insert(ns->m_SocketID);
  650. }
  651. catch (...)
  652. {
  653. LOGC(cnlog.Error, log << "newConnection: error when queuing socket!");
  654. error = 3;
  655. }
  656. leaveCS(ls->m_AcceptLock);
  657. HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID << " submitted for acceptance");
  658. // acknowledge users waiting for new connections on the listening socket
  659. m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, true);
  660. CGlobEvent::triggerEvent();
  661. // XXX the exact value of 'error' is ignored
  662. if (error > 0)
  663. {
  664. goto ERR_ROLLBACK;
  665. }
  666. // wake up a waiting accept() call
  667. CSync::lock_notify_one(ls->m_AcceptCond, ls->m_AcceptLock);
  668. }
  669. else
  670. {
  671. HLOGC(cnlog.Debug,
  672. log << "ACCEPT: new socket @" << ns->m_SocketID
  673. << " NOT submitted to acceptance, another socket in the group is already connected");
  674. // acknowledge INTERNAL users waiting for new connections on the listening socket
  675. // that are reported when a new socket is connected within an already connected group.
  676. m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true);
  677. CGlobEvent::triggerEvent();
  678. }
  679. ERR_ROLLBACK:
  680. // XXX the exact value of 'error' is ignored
  681. if (error > 0)
  682. {
  683. #if ENABLE_LOGGING
  684. static const char* why[] = {
  685. "UNKNOWN ERROR", "INTERNAL REJECTION", "IPE when mapping a socket", "IPE when inserting a socket"};
  686. LOGC(cnlog.Warn,
  687. log << CONID(ns->m_SocketID) << "newConnection: connection rejected due to: " << why[error] << " - "
  688. << RequestTypeStr(URQFailure(w_error)));
  689. #endif
  690. SRTSOCKET id = ns->m_SocketID;
  691. ns->core().closeInternal();
  692. ns->setClosed();
  693. // The mapped socket should be now unmapped to preserve the situation that
  694. // was in the original UDT code.
  695. // In SRT additionally the acceptAndRespond() function (it was called probably
  696. // connect() in UDT code) may fail, in which case this socket should not be
  697. // further processed and should be removed.
  698. {
  699. ScopedLock cg(m_GlobControlLock);
  700. #if ENABLE_BONDING
  701. if (ns->m_GroupOf)
  702. {
  703. HLOGC(smlog.Debug,
  704. log << "@" << ns->m_SocketID << " IS MEMBER OF $" << ns->m_GroupOf->id()
  705. << " - REMOVING FROM GROUP");
  706. ns->removeFromGroup(true);
  707. }
  708. #endif
  709. m_Sockets.erase(id);
  710. m_ClosedSockets[id] = ns;
  711. }
  712. return -1;
  713. }
  714. return 1;
  715. }
  716. // static forwarder
  717. int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
  718. {
  719. return uglobal().installAcceptHook(lsn, hook, opaq);
  720. }
  721. int srt::CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
  722. {
  723. try
  724. {
  725. CUDTSocket* s = locateSocket(lsn, ERH_THROW);
  726. s->core().installAcceptHook(hook, opaq);
  727. }
  728. catch (CUDTException& e)
  729. {
  730. SetThreadLocalError(e);
  731. return SRT_ERROR;
  732. }
  733. return 0;
  734. }
  735. int srt::CUDT::installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq)
  736. {
  737. return uglobal().installConnectHook(lsn, hook, opaq);
  738. }
  739. int srt::CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_fn* hook, void* opaq)
  740. {
  741. try
  742. {
  743. #if ENABLE_BONDING
  744. if (u & SRTGROUP_MASK)
  745. {
  746. GroupKeeper k(*this, u, ERH_THROW);
  747. k.group->installConnectHook(hook, opaq);
  748. return 0;
  749. }
  750. #endif
  751. CUDTSocket* s = locateSocket(u, ERH_THROW);
  752. s->core().installConnectHook(hook, opaq);
  753. }
  754. catch (CUDTException& e)
  755. {
  756. SetThreadLocalError(e);
  757. return SRT_ERROR;
  758. }
  759. return 0;
  760. }
  761. SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u)
  762. {
  763. // protects the m_Sockets structure
  764. ScopedLock cg(m_GlobControlLock);
  765. sockets_t::const_iterator i = m_Sockets.find(u);
  766. if (i == m_Sockets.end())
  767. {
  768. if (m_ClosedSockets.find(u) != m_ClosedSockets.end())
  769. return SRTS_CLOSED;
  770. return SRTS_NONEXIST;
  771. }
  772. return i->second->getStatus();
  773. }
  774. int srt::CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
  775. {
  776. ScopedLock cg(s->m_ControlLock);
  777. // cannot bind a socket more than once
  778. if (s->m_Status != SRTS_INIT)
  779. throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
  780. if (s->core().m_config.iIpV6Only == -1 && name.family() == AF_INET6 && name.isany())
  781. {
  782. // V6ONLY option must be set explicitly if you want to bind to a wildcard address in IPv6
  783. HLOGP(smlog.Error,
  784. "bind: when binding to :: (IPv6 wildcard), SRTO_IPV6ONLY option must be set explicitly to 0 or 1");
  785. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  786. }
  787. s->core().open();
  788. updateMux(s, name);
  789. s->m_Status = SRTS_OPENED;
  790. // copy address information of local node
  791. s->core().m_pSndQueue->m_pChannel->getSockAddr((s->m_SelfAddr));
  792. return 0;
  793. }
  794. int srt::CUDTUnited::bind(CUDTSocket* s, UDPSOCKET udpsock)
  795. {
  796. ScopedLock cg(s->m_ControlLock);
  797. // cannot bind a socket more than once
  798. if (s->m_Status != SRTS_INIT)
  799. throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
  800. sockaddr_any name;
  801. socklen_t namelen = sizeof name; // max of inet and inet6
  802. // This will preset the sa_family as well; the namelen is given simply large
  803. // enough for any family here.
  804. if (::getsockname(udpsock, &name.sa, &namelen) == -1)
  805. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  806. // Successfully extracted, so update the size
  807. name.len = namelen;
  808. s->core().open();
  809. updateMux(s, name, &udpsock);
  810. s->m_Status = SRTS_OPENED;
  811. // copy address information of local node
  812. s->core().m_pSndQueue->m_pChannel->getSockAddr(s->m_SelfAddr);
  813. return 0;
  814. }
  815. int srt::CUDTUnited::listen(const SRTSOCKET u, int backlog)
  816. {
  817. if (backlog <= 0)
  818. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  819. // Don't search for the socket if it's already -1;
  820. // this never is a valid socket.
  821. if (u == UDT::INVALID_SOCK)
  822. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  823. CUDTSocket* s = locateSocket(u);
  824. if (!s)
  825. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  826. ScopedLock cg(s->m_ControlLock);
  827. // NOTE: since now the socket is protected against simultaneous access.
  828. // In the meantime the socket might have been closed, which means that
  829. // it could have changed the state. It could be also set listen in another
  830. // thread, so check it out.
  831. // do nothing if the socket is already listening
  832. if (s->m_Status == SRTS_LISTENING)
  833. return 0;
  834. // a socket can listen only if is in OPENED status
  835. if (s->m_Status != SRTS_OPENED)
  836. throw CUDTException(MJ_NOTSUP, MN_ISUNBOUND, 0);
  837. // [[using assert(s->m_Status == OPENED)]];
  838. // listen is not supported in rendezvous connection setup
  839. if (s->core().m_config.bRendezvous)
  840. throw CUDTException(MJ_NOTSUP, MN_ISRENDEZVOUS, 0);
  841. s->m_uiBackLog = backlog;
  842. // [[using assert(s->m_Status == OPENED)]]; // (still, unchanged)
  843. s->core().setListenState(); // propagates CUDTException,
  844. // if thrown, remains in OPENED state if so.
  845. s->m_Status = SRTS_LISTENING;
  846. return 0;
  847. }
  848. SRTSOCKET srt::CUDTUnited::accept_bond(const SRTSOCKET listeners[], int lsize, int64_t msTimeOut)
  849. {
  850. CEPollDesc* ed = 0;
  851. int eid = m_EPoll.create(&ed);
  852. // Destroy it at return - this function can be interrupted
  853. // by an exception.
  854. struct AtReturn
  855. {
  856. int eid;
  857. CUDTUnited* that;
  858. AtReturn(CUDTUnited* t, int e)
  859. : eid(e)
  860. , that(t)
  861. {
  862. }
  863. ~AtReturn() { that->m_EPoll.release(eid); }
  864. } l_ar(this, eid);
  865. // Subscribe all of listeners for accept
  866. int events = SRT_EPOLL_ACCEPT;
  867. for (int i = 0; i < lsize; ++i)
  868. {
  869. srt_epoll_add_usock(eid, listeners[i], &events);
  870. }
  871. CEPoll::fmap_t st;
  872. m_EPoll.swait(*ed, (st), msTimeOut, true);
  873. if (st.empty())
  874. {
  875. // Sanity check
  876. throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
  877. }
  878. // Theoretically we can have a situation that more than one
  879. // listener is ready for accept. In this case simply get
  880. // only the first found.
  881. int lsn = st.begin()->first;
  882. sockaddr_storage dummy;
  883. int outlen = sizeof dummy;
  884. return accept(lsn, ((sockaddr*)&dummy), (&outlen));
  885. }
  886. SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_addrlen)
  887. {
  888. if (pw_addr && !pw_addrlen)
  889. {
  890. LOGC(cnlog.Error, log << "srt_accept: provided address, but address length parameter is missing");
  891. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  892. }
  893. CUDTSocket* ls = locateSocket(listen);
  894. if (ls == NULL)
  895. {
  896. LOGC(cnlog.Error, log << "srt_accept: invalid listener socket ID value: " << listen);
  897. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  898. }
  899. // the "listen" socket must be in LISTENING status
  900. if (ls->m_Status != SRTS_LISTENING)
  901. {
  902. LOGC(cnlog.Error, log << "srt_accept: socket @" << listen << " is not in listening state (forgot srt_listen?)");
  903. throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
  904. }
  905. // no "accept" in rendezvous connection setup
  906. if (ls->core().m_config.bRendezvous)
  907. {
  908. LOGC(cnlog.Fatal,
  909. log << "CUDTUnited::accept: RENDEZVOUS flag passed through check in srt_listen when it set listen state");
  910. // This problem should never happen because `srt_listen` function should have
  911. // checked this situation before and not set listen state in result.
  912. // Inform the user about the invalid state in the universal way.
  913. throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
  914. }
  915. SRTSOCKET u = CUDT::INVALID_SOCK;
  916. bool accepted = false;
  917. // !!only one connection can be set up each time!!
  918. while (!accepted)
  919. {
  920. UniqueLock accept_lock(ls->m_AcceptLock);
  921. CSync accept_sync(ls->m_AcceptCond, accept_lock);
  922. if ((ls->m_Status != SRTS_LISTENING) || ls->core().m_bBroken)
  923. {
  924. // This socket has been closed.
  925. accepted = true;
  926. }
  927. else if (ls->m_QueuedSockets.size() > 0)
  928. {
  929. set<SRTSOCKET>::iterator b = ls->m_QueuedSockets.begin();
  930. u = *b;
  931. ls->m_QueuedSockets.erase(b);
  932. accepted = true;
  933. }
  934. else if (!ls->core().m_config.bSynRecving)
  935. {
  936. accepted = true;
  937. }
  938. if (!accepted && (ls->m_Status == SRTS_LISTENING))
  939. accept_sync.wait();
  940. if (ls->m_QueuedSockets.empty())
  941. m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, false);
  942. }
  943. if (u == CUDT::INVALID_SOCK)
  944. {
  945. // non-blocking receiving, no connection available
  946. if (!ls->core().m_config.bSynRecving)
  947. {
  948. LOGC(cnlog.Error, log << "srt_accept: no pending connection available at the moment");
  949. throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
  950. }
  951. LOGC(cnlog.Error, log << "srt_accept: listener socket @" << listen << " is already closed");
  952. // listening socket is closed
  953. throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
  954. }
  955. CUDTSocket* s = locateSocket(u);
  956. if (s == NULL)
  957. {
  958. LOGC(cnlog.Error, log << "srt_accept: pending connection has unexpectedly closed");
  959. throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
  960. }
  961. // Set properly the SRTO_GROUPCONNECT flag
  962. s->core().m_config.iGroupConnect = 0;
  963. // Check if LISTENER has the SRTO_GROUPCONNECT flag set,
  964. // and the already accepted socket has successfully joined
  965. // the mirror group. If so, RETURN THE GROUP ID, not the socket ID.
  966. #if ENABLE_BONDING
  967. if (ls->core().m_config.iGroupConnect == 1 && s->m_GroupOf)
  968. {
  969. // Put a lock to protect the group against accidental deletion
  970. // in the meantime.
  971. ScopedLock glock(m_GlobControlLock);
  972. // Check again; it's unlikely to happen, but
  973. // it's a theoretically possible scenario
  974. if (s->m_GroupOf)
  975. {
  976. u = s->m_GroupOf->m_GroupID;
  977. s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure
  978. // Mark the beginning of the connection at the moment
  979. // when the group ID is returned to the app caller
  980. s->m_GroupOf->m_stats.tsLastSampleTime = steady_clock::now();
  981. }
  982. else
  983. {
  984. LOGC(smlog.Error, log << "accept: IPE: socket's group deleted in the meantime of accept process???");
  985. }
  986. }
  987. #endif
  988. ScopedLock cg(s->m_ControlLock);
  989. if (pw_addr != NULL && pw_addrlen != NULL)
  990. {
  991. // Check if the length of the buffer to fill the name in
  992. // was large enough.
  993. const int len = s->m_PeerAddr.size();
  994. if (*pw_addrlen < len)
  995. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  996. memcpy((pw_addr), &s->m_PeerAddr, len);
  997. *pw_addrlen = len;
  998. }
  999. return u;
  1000. }
  1001. int srt::CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen)
  1002. {
  1003. // Here both srcname and tarname must be specified
  1004. if (!srcname || !tarname || namelen < int(sizeof(sockaddr_in)))
  1005. {
  1006. LOGC(aclog.Error,
  1007. log << "connect(with source): invalid call: srcname=" << srcname << " tarname=" << tarname
  1008. << " namelen=" << namelen);
  1009. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  1010. }
  1011. sockaddr_any source_addr(srcname, namelen);
  1012. if (source_addr.len == 0)
  1013. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1014. sockaddr_any target_addr(tarname, namelen);
  1015. if (target_addr.len == 0)
  1016. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1017. #if ENABLE_BONDING
  1018. // Check affiliation of the socket. It's now allowed for it to be
  1019. // a group or socket. For a group, add automatically a socket to
  1020. // the group.
  1021. if (u & SRTGROUP_MASK)
  1022. {
  1023. GroupKeeper k(*this, u, ERH_THROW);
  1024. // Note: forced_isn is ignored when connecting a group.
  1025. // The group manages the ISN by itself ALWAYS, that is,
  1026. // it's generated anew for the very first socket, and then
  1027. // derived by all sockets in the group.
  1028. SRT_SOCKGROUPCONFIG gd[1] = {srt_prepare_endpoint(srcname, tarname, namelen)};
  1029. // When connecting to exactly one target, only this very target
  1030. // can be returned as a socket, so rewritten back array can be ignored.
  1031. return singleMemberConnect(k.group, gd);
  1032. }
  1033. #endif
  1034. CUDTSocket* s = locateSocket(u);
  1035. if (s == NULL)
  1036. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1037. // For a single socket, just do bind, then connect
  1038. bind(s, source_addr);
  1039. return connectIn(s, target_addr, SRT_SEQNO_NONE);
  1040. }
  1041. int srt::CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
  1042. {
  1043. if (!name || namelen < int(sizeof(sockaddr_in)))
  1044. {
  1045. LOGC(aclog.Error, log << "connect(): invalid call: name=" << name << " namelen=" << namelen);
  1046. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  1047. }
  1048. sockaddr_any target_addr(name, namelen);
  1049. if (target_addr.len == 0)
  1050. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1051. #if ENABLE_BONDING
  1052. // Check affiliation of the socket. It's now allowed for it to be
  1053. // a group or socket. For a group, add automatically a socket to
  1054. // the group.
  1055. if (u & SRTGROUP_MASK)
  1056. {
  1057. GroupKeeper k(*this, u, ERH_THROW);
  1058. // Note: forced_isn is ignored when connecting a group.
  1059. // The group manages the ISN by itself ALWAYS, that is,
  1060. // it's generated anew for the very first socket, and then
  1061. // derived by all sockets in the group.
  1062. SRT_SOCKGROUPCONFIG gd[1] = {srt_prepare_endpoint(NULL, name, namelen)};
  1063. return singleMemberConnect(k.group, gd);
  1064. }
  1065. #endif
  1066. CUDTSocket* s = locateSocket(u);
  1067. if (!s)
  1068. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1069. return connectIn(s, target_addr, forced_isn);
  1070. }
  1071. #if ENABLE_BONDING
  1072. int srt::CUDTUnited::singleMemberConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* gd)
  1073. {
  1074. int gstat = groupConnect(pg, gd, 1);
  1075. if (gstat == -1)
  1076. {
  1077. // We have only one element here, so refer to it.
  1078. // Sanity check
  1079. if (gd->errorcode == SRT_SUCCESS)
  1080. gd->errorcode = SRT_EINVPARAM;
  1081. CodeMajor mj = CodeMajor(gd->errorcode / 1000);
  1082. CodeMinor mn = CodeMinor(gd->errorcode % 1000);
  1083. return CUDT::APIError(mj, mn);
  1084. }
  1085. return gstat;
  1086. }
// Connects the group `pg` to every endpoint listed in `targets`.
//
// For each of the `arraysize` entries a new socket is created, the options
// stored on the group and the optional per-member config are applied, the
// socket is bound if a source address was given, it is added to the group,
// subscribed to the group's m_SndEID/m_RcvEID and connected with the group's
// current scheduling sequence as ISN.
//
// Per-entry results are written back into `targets`: `token` is generated
// if it was -1, `errorcode` gets the error of a failed step, and `id` is set
// to the new socket ID or to CUDT::INVALID_SOCK when the member was deleted
// here after a failure.
//
// If the group was not opened yet and has m_bSynRecving set, the call then
// waits on a temporary epoll container until one of the spawned sockets
// reaches SRTS_CONNECTED or all of them are gone.
//
// Returns the ID of a member socket (the one found connected when waiting,
// otherwise the last one left pending).
// Throws CUDTException(MJ_NOTSUP, MN_INVAL) on invalid target data,
// CUDTException(MJ_SYSTEMRES, MN_OBJECT) on an unknown exception from
// connectIn(), and CUDTException(MJ_CONNECTION, MN_CONNLOST) when no
// member could be returned.
//
// [[using assert(pg->m_iBusy > 0)]]
int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int arraysize)
{
    CUDTGroup& g = *pg;
    SRT_ASSERT(g.m_iBusy > 0);

    // Check and report errors on data brought in by srt_prepare_endpoint,
    // as the latter function has no possibility to report errors.
    for (int tii = 0; tii < arraysize; ++tii)
    {
        if (targets[tii].srcaddr.ss_family != targets[tii].peeraddr.ss_family)
        {
            LOGC(aclog.Error, log << "srt_connect/group: family differs on source and target address");
            throw CUDTException(MJ_NOTSUP, MN_INVAL);
        }

        if (targets[tii].weight > CUDT::MAX_WEIGHT)
        {
            LOGC(aclog.Error, log << "srt_connect/group: weight value must be between 0 and " << (+CUDT::MAX_WEIGHT));
            throw CUDTException(MJ_NOTSUP, MN_INVAL);
        }
    }

    // If the open state switched to OPENED, the blocking mode
    // must make it wait for connecting it. Doing connect when the
    // group is already OPENED returns immediately, regardless if the
    // connection is going to later succeed or fail (this will be
    // known in the group state information).
    bool block_new_opened = !g.m_bOpened && g.m_bSynRecving;
    const bool was_empty = g.groupEmpty();

    // In case the group was retried connection, clear first all epoll readiness.
    const int ncleared = m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_ERR, false);
    if (was_empty || ncleared)
    {
        HLOGC(aclog.Debug,
              log << "srt_connect/group: clearing IN/OUT because was_empty=" << was_empty
                  << " || ncleared=" << ncleared);
        // IN/OUT only in case when the group is empty, otherwise it would
        // clear out correct readiness resulting from earlier calls.
        // This also should happen if ERR flag was set, as IN and OUT could be set, too.
        m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT, false);
    }

    // retval stays -1 until some member socket is left pending or found connected.
    SRTSOCKET retval = -1;

    int eid = -1;
    int connect_modes = SRT_EPOLL_CONNECT | SRT_EPOLL_ERR;
    if (block_new_opened)
    {
        // Create this eid only to block-wait for the first
        // connection.
        eid = srt_epoll_create();
    }

    // Use private map to avoid searching in the
    // overall map.
    map<SRTSOCKET, CUDTSocket*> spawned;

    HLOGC(aclog.Debug,
          log << "groupConnect: will connect " << arraysize << " links and "
              << (block_new_opened ? "BLOCK until any is ready" : "leave the process in background"));

    for (int tii = 0; tii < arraysize; ++tii)
    {
        sockaddr_any target_addr(targets[tii].peeraddr);
        sockaddr_any source_addr(targets[tii].srcaddr);
        // References to the output fields of the current target entry.
        SRTSOCKET& sid_rloc = targets[tii].id;
        int& erc_rloc = targets[tii].errorcode;
        erc_rloc = SRT_SUCCESS; // preinitialized
        HLOGC(aclog.Debug, log << "groupConnect: taking on " << sockaddr_any(targets[tii].peeraddr).str());

        CUDTSocket* ns = 0;

        // NOTE: After calling newSocket, the socket is mapped into m_Sockets.
        // It must be MANUALLY removed from this list in case we need it deleted.
        SRTSOCKET sid = newSocket(&ns);

        if (pg->m_cbConnectHook)
        {
            // Derive the connect hook by the socket, if set on the group
            ns->core().m_cbConnectHook = pg->m_cbConnectHook;
        }

        SRT_SocketOptionObject* config = targets[tii].config;

        // XXX Support non-blocking mode:
        // If the group has nonblocking set for connect (SNDSYN),
        // then it must set so on the socket. Then, the connection
        // process is asynchronous. The socket appears first as
        // GST_PENDING state, and only after the socket becomes
        // connected does its status in the group turn into GST_IDLE.

        // Set all options that were requested by the options set on a group
        // prior to connecting.
        // error_reason names the step in progress; it is used only in the
        // log message of the CUDTException handler below.
        string error_reason SRT_ATR_UNUSED;
        try
        {
            for (size_t i = 0; i < g.m_config.size(); ++i)
            {
                HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
                error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
                ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], (int)g.m_config[i].value.size());
            }

            // Do not try to set a user option if failed already.
            if (config)
            {
                error_reason = "user option";
                ns->core().applyMemberConfigObject(*config);
            }

            error_reason = "bound address";
            // We got it. Bind the socket, if the source address was set
            if (!source_addr.empty())
                bind(ns, source_addr);
        }
        catch (CUDTException& e)
        {
            // Just notify the problem, but the loop must continue.
            // Set the original error as reported.
            targets[tii].errorcode = e.getErrorCode();
            LOGC(aclog.Error, log << "srt_connect_group: failed to set " << error_reason);
        }
        catch (...)
        {
            // Set the general EINVPARAM - this error should never happen
            LOGC(aclog.Error, log << "IPE: CUDT::setOpt reported unknown exception");
            targets[tii].errorcode = SRT_EINVPARAM;
        }

        // Add socket to the group.
        // Do it after setting all stored options, as some of them may
        // influence some group data.

        srt::groups::SocketData data = srt::groups::prepareSocketData(ns);
        if (targets[tii].token != -1)
        {
            // Reuse the token, if specified by the caller
            data.token = targets[tii].token;
        }
        else
        {
            // Otherwise generate and write back the token
            data.token = CUDTGroup::genToken();
            targets[tii].token = data.token;
        }

        {
            ScopedLock cs(m_GlobControlLock);
            if (m_Sockets.count(sid) == 0)
            {
                HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " deleted in process");
                // Someone deleted the socket in the meantime?
                // Unlikely, but possible in theory.
                // Don't delete anything - it's already done.
                continue;
            }

            // There's nothing wrong with preparing the data first
            // even if this happens for nothing. But now, under the lock
            // and after checking that the socket still exists, check now
            // if this succeeded, and then also if the group is still usable.
            // The group will surely exist because it's set busy, until the
            // end of this function. But it might be simultaneously requested closed.
            bool proceed = true;

            if (targets[tii].errorcode != SRT_SUCCESS)
            {
                HLOGC(aclog.Debug,
                      log << "srt_connect_group: not processing @" << sid << " due to error in setting options");
                proceed = false;
            }

            if (g.m_bClosing)
            {
                HLOGC(aclog.Debug,
                      log << "srt_connect_group: not processing @" << sid << " due to CLOSED GROUP $" << g.m_GroupID);
                proceed = false;
            }

            if (proceed)
            {
                CUDTGroup::SocketData* f = g.add(data);
                ns->m_GroupMemberData = f;
                ns->m_GroupOf = &g;
                f->weight = targets[tii].weight;
                HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " added to group $" << g.m_GroupID);
            }
            else
            {
                // The socket was never added to the group; drop it and
                // report this entry as having no socket.
                targets[tii].id = CUDT::INVALID_SOCK;
                delete ns;
                m_Sockets.erase(sid);

                // If failed to set options, then do not continue
                // neither with binding, nor with connecting.
                continue;
            }
        }

        // XXX This should be reenabled later, this should
        // be probably still in use to exchange information about
        // packets asymmetrically lost. But for no other purpose.
        /*
        ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
        */

        int isn = g.currentSchedSequence();

        // Set it the groupconnect option, as all in-group sockets should have.
        ns->core().m_config.iGroupConnect = 1;

        // Every group member will have always nonblocking
        // (this implies also non-blocking connect/accept).
        // The group facility functions will block when necessary
        // using epoll_wait.
        ns->core().m_config.bSynRecving = false;
        ns->core().m_config.bSynSending = false;

        HLOGC(aclog.Debug, log << "groupConnect: NOTIFIED AS PENDING @" << sid << " both read and write");
        // If this socket is not to block the current connect process,
        // it may still be needed for the further check if the redundant
        // connection succeeded or failed and whether the new socket is
        // ready to use or needs to be closed.
        epoll_add_usock_INTERNAL(g.m_SndEID, ns, &connect_modes);
        epoll_add_usock_INTERNAL(g.m_RcvEID, ns, &connect_modes);

        // Adding a socket on which we need to block to BOTH these tracking EIDs
        // and the blocker EID. We'll simply remove from them later all sockets that
        // got connected state or were broken.

        if (block_new_opened)
        {
            HLOGC(aclog.Debug, log << "groupConnect: WILL BLOCK on @" << sid << " until connected");
            epoll_add_usock_INTERNAL(eid, ns, &connect_modes);
        }

        // And connect
        try
        {
            HLOGC(aclog.Debug, log << "groupConnect: connecting a new socket with ISN=" << isn);
            connectIn(ns, target_addr, isn);
        }
        catch (const CUDTException& e)
        {
            LOGC(aclog.Error,
                 log << "groupConnect: socket @" << sid << " in group " << pg->id() << " failed to connect");
            // We know it does belong to a group.
            // Remove it first because this involves a mutex, and we want
            // to avoid locking more than one mutex at a time.
            // (erc_rloc refers to targets[tii].errorcode, so the two
            // assignments below store the same value in the same field.)
            erc_rloc = e.getErrorCode();
            targets[tii].errorcode = e.getErrorCode();
            targets[tii].id = CUDT::INVALID_SOCK;

            ScopedLock cl(m_GlobControlLock);
            ns->removeFromGroup(false);
            m_Sockets.erase(ns->m_SocketID);
            // Intercept to delete the socket on failure.
            delete ns;
            continue;
        }
        catch (...)
        {
            LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
            targets[tii].errorcode = SRT_ESYSOBJ;
            targets[tii].id = CUDT::INVALID_SOCK;
            ScopedLock cl(m_GlobControlLock);
            ns->removeFromGroup(false);
            m_Sockets.erase(ns->m_SocketID);
            // Intercept to delete the socket on failure.
            delete ns;

            // Do not use original exception, it may crash off a C API.
            // NOTE(review): this leaves the function without passing through
            // the srt_epoll_release(eid) call near the end - looks like the
            // temporary eid is not released on this path; confirm.
            throw CUDTException(MJ_SYSTEMRES, MN_OBJECT);
        }

        // Read the socket status under the socket's own control lock.
        SRT_SOCKSTATUS st;
        {
            ScopedLock grd(ns->m_ControlLock);
            st = ns->getStatus();
        }

        {
            // NOTE: Not applying m_GlobControlLock because the group is now
            // set busy, so it won't be deleted, even if it was requested to be closed.
            ScopedLock grd(g.m_GroupLock);

            if (!ns->m_GroupOf)
            {
                // The situation could get changed between the unlock and lock of m_GroupLock.
                // This must be checked again.
                // If a socket has been removed from group, it means that some other thread is
                // currently trying to delete the socket. Therefore it doesn't have, and even shouldn't,
                // be deleted here. Just exit with error report.
                LOGC(aclog.Error, log << "groupConnect: self-created member socket deleted during process, SKIPPING.");

                // Do not report the error from here, just ignore this socket.
                continue;
            }

            // If m_GroupOf is not NULL, the member data pointer (m_GroupMemberData) is still valid.
            CUDTGroup::SocketData* f = ns->m_GroupMemberData;

            // Now under a group lock, we need to make sure the group isn't being closed
            // in order not to add a socket to a dead group.
            if (g.m_bClosing)
            {
                LOGC(aclog.Error, log << "groupConnect: group deleted while connecting; breaking the process");

                // Set the status as pending so that the socket is taken care of later.
                // Note that all earlier sockets that were processed in this loop were either
                // set BROKEN or PENDING.
                f->sndstate = SRT_GST_PENDING;
                f->rcvstate = SRT_GST_PENDING;
                retval = -1;
                break;
            }

            HLOGC(aclog.Debug,
                  log << "groupConnect: @" << sid << " connection successful, setting group OPEN (was "
                      << (g.m_bOpened ? "ALREADY" : "NOT") << "), will " << (block_new_opened ? "" : "NOT ")
                      << "block the connect call, status:" << SockStatusStr(st));

            // XXX OPEN OR CONNECTED?
            // BLOCK IF NOT OPEN OR BLOCK IF NOT CONNECTED?
            //
            // What happens to blocking when there are 2 connections
            // pending, about to be broken, and srt_connect() is called again?
            // SHOULD BLOCK the latter, even though is OPEN.
            // Or, OPEN should be removed from here and srt_connect(_group)
            // should block always if the group doesn't have at least 1 connected link
            g.m_bOpened = true;

            g.m_stats.tsLastSampleTime = steady_clock::now();

            f->laststatus = st;
            // Check the socket status and update it.
            // Turn the group state of the socket to IDLE only if
            // connection is established or in progress
            f->agent = source_addr;
            f->peer = target_addr;

            if (st >= SRTS_BROKEN)
            {
                // Already broken: mark it so in the group and stop tracking it.
                f->sndstate = SRT_GST_BROKEN;
                f->rcvstate = SRT_GST_BROKEN;
                epoll_remove_socket_INTERNAL(g.m_SndEID, ns);
                epoll_remove_socket_INTERNAL(g.m_RcvEID, ns);
            }
            else
            {
                // Connection in progress or established: keep it as a candidate
                // and report its ID back in the target entry.
                f->sndstate = SRT_GST_PENDING;
                f->rcvstate = SRT_GST_PENDING;
                spawned[sid] = ns;

                sid_rloc = sid;
                erc_rloc = 0;
                retval = sid;
            }
        }
    }

    if (retval == -1)
    {
        HLOGC(aclog.Debug, log << "groupConnect: none succeeded as background-spawn, exit with error");
        block_new_opened = false; // Avoid executing further while loop
    }

    // IDs of sockets found broken while waiting; they are closed after the wait loop.
    vector<SRTSOCKET> broken;

    while (block_new_opened)
    {
        if (spawned.empty())
        {
            // All were removed due to errors.
            retval = -1;
            break;
        }
        HLOGC(aclog.Debug, log << "groupConnect: first connection, applying EPOLL WAITING.");
        int len = (int)spawned.size();
        vector<SRTSOCKET> ready(spawned.size());
        const int estat = srt_epoll_wait(eid,
                                         NULL,
                                         NULL, // IN/ACCEPT
                                         &ready[0],
                                         &len, // OUT/CONNECT
                                         -1, // indefinitely (FIXME Check if it needs to REGARD CONNECTION TIMEOUT!)
                                         NULL,
                                         NULL,
                                         NULL,
                                         NULL);

        // Sanity check. Shouldn't happen if subs are in sync with spawned.
        if (estat == -1)
        {
#if ENABLE_LOGGING
            CUDTException& x = CUDT::getlasterror();
            if (x.getErrorCode() != SRT_EPOLLEMPTY)
            {
                LOGC(aclog.Error,
                     log << "groupConnect: srt_epoll_wait failed not because empty, unexpected IPE:"
                         << x.getErrorMessage());
            }
#endif
            HLOGC(aclog.Debug, log << "groupConnect: srt_epoll_wait failed - breaking the wait loop");
            retval = -1;
            break;
        }

        // At the moment when you are going to work with real sockets,
        // lock the groups so that no one messes up with something here
        // in the meantime.

        ScopedLock lock(*g.exp_groupLock());

        // NOTE: UNDER m_GroupLock, NO API FUNCTION CALLS DARE TO HAPPEN BELOW!

        // Check first if a socket wasn't closed in the meantime. It will be
        // automatically removed from all EIDs, but there's no sense in keeping
        // them in 'spawned' map.
        for (map<SRTSOCKET, CUDTSocket*>::iterator y = spawned.begin(); y != spawned.end(); ++y)
        {
            SRTSOCKET sid = y->first;
            if (y->second->getStatus() >= SRTS_BROKEN)
            {
                HLOGC(aclog.Debug,
                      log << "groupConnect: Socket @" << sid
                          << " got BROKEN in the meantine during the check, remove from candidates");

                // Remove from spawned and try again
                broken.push_back(sid);

                epoll_remove_socket_INTERNAL(eid, y->second);
                epoll_remove_socket_INTERNAL(g.m_SndEID, y->second);
                epoll_remove_socket_INTERNAL(g.m_RcvEID, y->second);
            }
        }

        // Remove them outside the loop because this can't be done
        // while iterating over the same container.
        for (size_t i = 0; i < broken.size(); ++i)
            spawned.erase(broken[i]);

        // Check the sockets if they were reported due
        // to have connected or due to have failed.
        // Distill successful ones. If distilled nothing, return -1.
        // If not all sockets were reported in this instance, repeat
        // the call until you get information about all of them.
        for (int i = 0; i < len; ++i)
        {
            map<SRTSOCKET, CUDTSocket*>::iterator x = spawned.find(ready[i]);
            if (x == spawned.end())
            {
                // Might be removed above - ignore it.
                continue;
            }

            SRTSOCKET sid = x->first;
            CUDTSocket* s = x->second;

            // Check status. If failed, remove from spawned
            // and try again.
            SRT_SOCKSTATUS st = s->getStatus();
            if (st >= SRTS_BROKEN)
            {
                HLOGC(aclog.Debug,
                      log << "groupConnect: Socket @" << sid
                          << " got BROKEN during background connect, remove & TRY AGAIN");
                // Remove from spawned and try again
                if (spawned.erase(sid))
                    broken.push_back(sid);

                epoll_remove_socket_INTERNAL(eid, s);
                epoll_remove_socket_INTERNAL(g.m_SndEID, s);
                epoll_remove_socket_INTERNAL(g.m_RcvEID, s);

                continue;
            }

            if (st == SRTS_CONNECTED)
            {
                HLOGC(aclog.Debug,
                      log << "groupConnect: Socket @" << sid << " got CONNECTED as first in the group - reporting");
                retval = sid;
                g.m_bConnected = true;
                block_new_opened = false; // Interrupt also rolling epoll (outer loop)

                // Remove this socket from SND EID because it doesn't need to
                // be connection-tracked anymore. Don't remove from the RCV EID
                // however because RCV procedure relies on epoll also for reading
                // and when found this socket connected it will "upgrade" it to
                // read-ready tracking only.
                epoll_remove_socket_INTERNAL(g.m_SndEID, s);
                break;
            }

            // Spurious?
            HLOGC(aclog.Debug,
                  log << "groupConnect: Socket @" << sid << " got spurious wakeup in " << SockStatusStr(st)
                      << " TRY AGAIN");
        }
        // END of m_GroupLock CS - you can safely use API functions now.
    }

    // Finished, delete epoll.
    if (eid != -1)
    {
        HLOGC(aclog.Debug, log << "connect FIRST IN THE GROUP finished, removing E" << eid);
        srt_epoll_release(eid);
    }

    // Close every socket that was found broken during the wait.
    for (vector<SRTSOCKET>::iterator b = broken.begin(); b != broken.end(); ++b)
    {
        CUDTSocket* s = locateSocket(*b, ERH_RETURN);
        if (!s)
            continue;

        // This will also automatically remove it from the group and all eids
        close(s);
    }

    // There's no possibility to report a problem on every connection
    // separately in case when every single connection has failed. What
    // is more interesting, it's only a matter of luck that all connections
    // fail at exactly the same time. OTOH if all are to fail, this
    // function will still be polling sockets to determine the last man
    // standing. Each one could, however, break by a different reason,
    // for example, one by timeout, another by wrong passphrase. Check
    // the `errorcode` field to determine the reason for particular link.
    if (retval == -1)
        throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

    return retval;
}
  1550. #endif
  1551. int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
  1552. {
  1553. ScopedLock cg(s->m_ControlLock);
  1554. // a socket can "connect" only if it is in the following states:
  1555. // - OPENED: assume the socket binding parameters are configured
  1556. // - INIT: configure binding parameters here
  1557. // - any other (meaning, already connected): report error
  1558. if (s->m_Status == SRTS_INIT)
  1559. {
  1560. if (s->core().m_config.bRendezvous)
  1561. throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);
  1562. // If bind() was done first on this socket, then the
  1563. // socket will not perform this step. This actually does the
  1564. // same thing as bind() does, just with empty address so that
  1565. // the binding parameters are autoselected.
  1566. s->core().open();
  1567. sockaddr_any autoselect_sa(target_addr.family());
  1568. // This will create such a sockaddr_any that
  1569. // will return true from empty().
  1570. updateMux(s, autoselect_sa); // <<---- updateMux
  1571. // -> C(Snd|Rcv)Queue::init
  1572. // -> pthread_create(...C(Snd|Rcv)Queue::worker...)
  1573. s->m_Status = SRTS_OPENED;
  1574. }
  1575. else
  1576. {
  1577. if (s->m_Status != SRTS_OPENED)
  1578. throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);
  1579. // status = SRTS_OPENED, so family should be known already.
  1580. if (target_addr.family() != s->m_SelfAddr.family())
  1581. {
  1582. LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
  1583. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1584. }
  1585. }
  1586. // connect_complete() may be called before connect() returns.
  1587. // So we need to update the status before connect() is called,
  1588. // otherwise the status may be overwritten with wrong value
  1589. // (CONNECTED vs. CONNECTING).
  1590. s->m_Status = SRTS_CONNECTING;
  1591. /*
  1592. * In blocking mode, connect can block for up to 30 seconds for
  1593. * rendez-vous mode. Holding the s->m_ControlLock prevent close
  1594. * from cancelling the connect
  1595. */
  1596. try
  1597. {
  1598. // record peer address
  1599. s->m_PeerAddr = target_addr;
  1600. s->core().startConnect(target_addr, forced_isn);
  1601. }
  1602. catch (const CUDTException&) // Interceptor, just to change the state.
  1603. {
  1604. s->m_Status = SRTS_OPENED;
  1605. throw;
  1606. }
  1607. return 0;
  1608. }
  1609. int srt::CUDTUnited::close(const SRTSOCKET u)
  1610. {
  1611. #if ENABLE_BONDING
  1612. if (u & SRTGROUP_MASK)
  1613. {
  1614. GroupKeeper k(*this, u, ERH_THROW);
  1615. k.group->close();
  1616. deleteGroup(k.group);
  1617. return 0;
  1618. }
  1619. #endif
  1620. CUDTSocket* s = locateSocket(u);
  1621. if (!s)
  1622. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1623. return close(s);
  1624. }
  1625. #if ENABLE_BONDING
  1626. void srt::CUDTUnited::deleteGroup(CUDTGroup* g)
  1627. {
  1628. using srt_logging::gmlog;
  1629. srt::sync::ScopedLock cg(m_GlobControlLock);
  1630. return deleteGroup_LOCKED(g);
  1631. }
  1632. // [[using locked(m_GlobControlLock)]]
  1633. void srt::CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
  1634. {
  1635. SRT_ASSERT(g->groupEmpty());
  1636. // After that the group is no longer findable by GroupKeeper
  1637. m_Groups.erase(g->m_GroupID);
  1638. m_ClosedGroups[g->m_GroupID] = g;
  1639. // Paranoid check: since the group is in m_ClosedGroups
  1640. // it may potentially be deleted. Make sure no socket points
  1641. // to it. Actually all sockets should have been already removed
  1642. // from the group container, so if any does, it's invalid.
  1643. for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i)
  1644. {
  1645. CUDTSocket* s = i->second;
  1646. if (s->m_GroupOf == g)
  1647. {
  1648. HLOGC(smlog.Debug, log << "deleteGroup: IPE: existing @" << s->m_SocketID << " points to a dead group!");
  1649. s->m_GroupOf = NULL;
  1650. s->m_GroupMemberData = NULL;
  1651. }
  1652. }
  1653. // Just in case, do it in closed sockets, too, although this should be
  1654. // always done before moving to it.
  1655. for (sockets_t::iterator i = m_ClosedSockets.begin(); i != m_ClosedSockets.end(); ++i)
  1656. {
  1657. CUDTSocket* s = i->second;
  1658. if (s->m_GroupOf == g)
  1659. {
  1660. HLOGC(smlog.Debug, log << "deleteGroup: IPE: closed @" << s->m_SocketID << " points to a dead group!");
  1661. s->m_GroupOf = NULL;
  1662. s->m_GroupMemberData = NULL;
  1663. }
  1664. }
  1665. }
  1666. #endif
// Closes the given socket.
//
// The socket's m_ControlLock is held for the whole call.
//
// - A LISTENING socket is only marked broken and stamped with the closure
//   time, removed from listening immediately (notListening()) and switched
//   to SRTS_CLOSING; waiters on the accept condition are woken up. If the
//   listener was already marked broken, nothing is done.
// - Any other socket is switched to SRTS_CLOSING, closed internally
//   (closeInternal()), and then - under m_GlobControlLock - marked closed,
//   removed from its group (if bonding is enabled) and moved from m_Sockets
//   to m_ClosedSockets. A global event is triggered afterwards.
//
// If the socket was configured with bSynSending and SRT_ENABLE_CLOSE_SYNCH
// is enabled, the function additionally waits until the sender buffer is
// disconnected or empty, or the socket is gone.
//
// @param s socket to close
// @return always 0
int srt::CUDTUnited::close(CUDTSocket* s)
{
    HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSE. Acquiring control lock");
    ScopedLock socket_cg(s->m_ControlLock);
    HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSING (removing from listening, closing CUDT)");

    // Both values are captured up front and used later without reading
    // them from `s` again (`u` in the global container lookups and logs).
    const bool synch_close_snd = s->core().m_config.bSynSending;
    SRTSOCKET u = s->m_SocketID;

    if (s->m_Status == SRTS_LISTENING)
    {
        // Already marked as broken: nothing more to do here.
        if (s->core().m_bBroken)
            return 0;

        s->m_tsClosureTimeStamp = steady_clock::now();
        s->core().m_bBroken = true;

        // Change towards original UDT:
        // Leave all the closing activities for garbageCollect to happen,
        // however remove the listener from the RcvQueue IMMEDIATELY.
        // Even though garbageCollect would eventually remove the listener
        // as well, there would be some time interval between now and the
        // moment when it's done, and during this time the application will
        // be unable to bind to this port that the about-to-delete listener
        // is currently occupying (due to blocked slot in the RcvQueue).
        HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSING (removing listener immediately)");
        s->core().notListening();
        s->m_Status = SRTS_CLOSING;

        // broadcast all "accept" waiting
        CSync::lock_notify_all(s->m_AcceptCond, s->m_AcceptLock);
    }
    else
    {
        s->m_Status = SRTS_CLOSING;
        // Note: this call may be done on a socket that hasn't finished
        // sending all packets scheduled for sending, which means, this call
        // may block INDEFINITELY. As long as it's acceptable to block the
        // call to srt_close(), and all functions in all threads where this
        // very socket is used, this shall not block the central database.
        s->core().closeInternal();

        // synchronize with garbage collection.
        HLOGC(smlog.Debug,
              log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->core().CONID()
                  << "Acquiring GLOBAL control lock");
        // This lock is released at the end of this `else` block.
        ScopedLock manager_cg(m_GlobControlLock);
        // since "s" is located before m_GlobControlLock, locate it again in case
        // it became invalid
        // XXX This is very weird; if we state that the CUDTSocket object
        // could not be deleted between locks, then definitely it couldn't
        // also change the pointer value. There's no other reason for getting
        // this iterator but to obtain the 's' pointer, which is impossible to
        // be different than previous 's' (m_Sockets is a map that stores pointers
        // transparently). This iterator isn't even later used to delete the socket
        // from the container, though it would be more efficient.
        // FURTHER RESEARCH REQUIRED.
        sockets_t::iterator i = m_Sockets.find(u);
        if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
        {
            HLOGC(smlog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
            return 0;
        }
        s = i->second;
        s->setClosed();

#if ENABLE_BONDING
        if (s->m_GroupOf)
        {
            HLOGC(smlog.Debug,
                  log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
            s->removeFromGroup(true);
        }
#endif

        // Move the socket from the active container to the closed one.
        m_Sockets.erase(s->m_SocketID);
        m_ClosedSockets[s->m_SocketID] = s;
        HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");

        CGlobEvent::triggerEvent();
    }

    HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE");

    // Check if the ID is still in closed sockets before you access it
    // (the last triggerEvent could have deleted it).
    // NOTE(review): the loop below reads s->core().m_pSndBuffer BEFORE it
    // checks whether `u` is still present in m_ClosedSockets, and reads
    // s->core().m_bOpened after releasing m_GlobControlLock. If the socket
    // object can really be deleted in the meantime, these accesses look
    // unsafe - to be confirmed against the GC logic.
    if (synch_close_snd)
    {
#if SRT_ENABLE_CLOSE_SYNCH

        HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
        for (;;)
        {
            CSndBuffer* sb = s->core().m_pSndBuffer;

            // Disconnected from buffer - nothing more to check.
            if (!sb)
            {
                HLOGC(smlog.Debug,
                      log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
                break;
            }

            // Sender buffer empty
            if (sb->getCurrBufSize() == 0)
            {
                HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
                break;
            }

            // Ok, now you are keeping GC thread hands off the internal data.
            // You can check then if it has already deleted the socket or not.
            // The socket is either in m_ClosedSockets or is already gone.

            // Done the other way, but still done. You can stop waiting.
            bool isgone = false;
            {
                ScopedLock manager_cg(m_GlobControlLock);
                isgone = m_ClosedSockets.count(u) == 0;
            }
            // Still registered as closed: treat a no longer opened core as gone, too.
            if (!isgone)
            {
                isgone = !s->core().m_bOpened;
            }
            if (isgone)
            {
                HLOGC(smlog.Debug,
                      log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
                break;
            }

            HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
            // How to handle a possible error here?
            CGlobEvent::waitForEvent();

            // Continue waiting in case when an event happened or 1s waiting time passed for checkpoint.
        }
#endif
    }

    /*
       This code is PUT ASIDE for now.
       Most likely this will be never required.
       It had to hold the closing activity until the time when the receiver buffer is depleted.
       However the closing of the socket should only happen when the receiver has received
       an information about that the reading is no longer possible (error report from recv/recvfile).
       When this happens, the receiver buffer is definitely depleted already and there's no need to check
       anything.

       Should there appear any other conditions in future under which the closing process should be
       delayed until the receiver buffer is empty, this code can be filled here.

    if ( synch_close_rcv )
    {
    ...
    }
    */

    // Notify the m_GCStopCond condition (relaxed, i.e. without locking here).
    CSync::notify_one_relaxed(m_GCStopCond);

    return 0;
}
  1806. void srt::CUDTUnited::getpeername(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
  1807. {
  1808. if (!pw_name || !pw_namelen)
  1809. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1810. if (getStatus(u) != SRTS_CONNECTED)
  1811. throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
  1812. CUDTSocket* s = locateSocket(u);
  1813. if (!s)
  1814. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1815. if (!s->core().m_bConnected || s->core().m_bBroken)
  1816. throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
  1817. const int len = s->m_PeerAddr.size();
  1818. if (*pw_namelen < len)
  1819. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1820. memcpy((pw_name), &s->m_PeerAddr.sa, len);
  1821. *pw_namelen = len;
  1822. }
  1823. void srt::CUDTUnited::getsockname(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
  1824. {
  1825. if (!pw_name || !pw_namelen)
  1826. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1827. CUDTSocket* s = locateSocket(u);
  1828. if (!s)
  1829. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1830. if (s->core().m_bBroken)
  1831. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1832. if (s->m_Status == SRTS_INIT)
  1833. throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
  1834. const int len = s->m_SelfAddr.size();
  1835. if (*pw_namelen < len)
  1836. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  1837. memcpy((pw_name), &s->m_SelfAddr.sa, len);
  1838. *pw_namelen = len;
  1839. }
  1840. int srt::CUDTUnited::select(UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
  1841. {
  1842. const steady_clock::time_point entertime = steady_clock::now();
  1843. const int64_t timeo_us = timeout ? static_cast<int64_t>(timeout->tv_sec) * 1000000 + timeout->tv_usec : -1;
  1844. const steady_clock::duration timeo(microseconds_from(timeo_us));
  1845. // initialize results
  1846. int count = 0;
  1847. set<SRTSOCKET> rs, ws, es;
  1848. // retrieve related UDT sockets
  1849. vector<CUDTSocket*> ru, wu, eu;
  1850. CUDTSocket* s;
  1851. if (readfds)
  1852. for (set<SRTSOCKET>::iterator i1 = readfds->begin(); i1 != readfds->end(); ++i1)
  1853. {
  1854. if (getStatus(*i1) == SRTS_BROKEN)
  1855. {
  1856. rs.insert(*i1);
  1857. ++count;
  1858. }
  1859. else if (!(s = locateSocket(*i1)))
  1860. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1861. else
  1862. ru.push_back(s);
  1863. }
  1864. if (writefds)
  1865. for (set<SRTSOCKET>::iterator i2 = writefds->begin(); i2 != writefds->end(); ++i2)
  1866. {
  1867. if (getStatus(*i2) == SRTS_BROKEN)
  1868. {
  1869. ws.insert(*i2);
  1870. ++count;
  1871. }
  1872. else if (!(s = locateSocket(*i2)))
  1873. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1874. else
  1875. wu.push_back(s);
  1876. }
  1877. if (exceptfds)
  1878. for (set<SRTSOCKET>::iterator i3 = exceptfds->begin(); i3 != exceptfds->end(); ++i3)
  1879. {
  1880. if (getStatus(*i3) == SRTS_BROKEN)
  1881. {
  1882. es.insert(*i3);
  1883. ++count;
  1884. }
  1885. else if (!(s = locateSocket(*i3)))
  1886. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  1887. else
  1888. eu.push_back(s);
  1889. }
  1890. do
  1891. {
  1892. // query read sockets
  1893. for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++j1)
  1894. {
  1895. s = *j1;
  1896. if (s->readReady() || s->m_Status == SRTS_CLOSED)
  1897. {
  1898. rs.insert(s->m_SocketID);
  1899. ++count;
  1900. }
  1901. }
  1902. // query write sockets
  1903. for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++j2)
  1904. {
  1905. s = *j2;
  1906. if (s->writeReady() || s->m_Status == SRTS_CLOSED)
  1907. {
  1908. ws.insert(s->m_SocketID);
  1909. ++count;
  1910. }
  1911. }
  1912. // query exceptions on sockets
  1913. for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++j3)
  1914. {
  1915. // check connection request status, not supported now
  1916. }
  1917. if (0 < count)
  1918. break;
  1919. CGlobEvent::waitForEvent();
  1920. } while (timeo > steady_clock::now() - entertime);
  1921. if (readfds)
  1922. *readfds = rs;
  1923. if (writefds)
  1924. *writefds = ws;
  1925. if (exceptfds)
  1926. *exceptfds = es;
  1927. return count;
  1928. }
  1929. int srt::CUDTUnited::selectEx(const vector<SRTSOCKET>& fds,
  1930. vector<SRTSOCKET>* readfds,
  1931. vector<SRTSOCKET>* writefds,
  1932. vector<SRTSOCKET>* exceptfds,
  1933. int64_t msTimeOut)
  1934. {
  1935. const steady_clock::time_point entertime = steady_clock::now();
  1936. const int64_t timeo_us = msTimeOut >= 0 ? msTimeOut * 1000 : -1;
  1937. const steady_clock::duration timeo(microseconds_from(timeo_us));
  1938. // initialize results
  1939. int count = 0;
  1940. if (readfds)
  1941. readfds->clear();
  1942. if (writefds)
  1943. writefds->clear();
  1944. if (exceptfds)
  1945. exceptfds->clear();
  1946. do
  1947. {
  1948. for (vector<SRTSOCKET>::const_iterator i = fds.begin(); i != fds.end(); ++i)
  1949. {
  1950. CUDTSocket* s = locateSocket(*i);
  1951. if ((!s) || s->core().m_bBroken || (s->m_Status == SRTS_CLOSED))
  1952. {
  1953. if (exceptfds)
  1954. {
  1955. exceptfds->push_back(*i);
  1956. ++count;
  1957. }
  1958. continue;
  1959. }
  1960. if (readfds)
  1961. {
  1962. if ((s->core().m_bConnected && s->core().m_pRcvBuffer->isRcvDataReady()) ||
  1963. (s->core().m_bListening && (s->m_QueuedSockets.size() > 0)))
  1964. {
  1965. readfds->push_back(s->m_SocketID);
  1966. ++count;
  1967. }
  1968. }
  1969. if (writefds)
  1970. {
  1971. if (s->core().m_bConnected &&
  1972. (s->core().m_pSndBuffer->getCurrBufSize() < s->core().m_config.iSndBufSize))
  1973. {
  1974. writefds->push_back(s->m_SocketID);
  1975. ++count;
  1976. }
  1977. }
  1978. }
  1979. if (count > 0)
  1980. break;
  1981. CGlobEvent::waitForEvent();
  1982. } while (timeo > steady_clock::now() - entertime);
  1983. return count;
  1984. }
  1985. int srt::CUDTUnited::epoll_create()
  1986. {
  1987. return m_EPoll.create();
  1988. }
  1989. int srt::CUDTUnited::epoll_clear_usocks(int eid)
  1990. {
  1991. return m_EPoll.clear_usocks(eid);
  1992. }
  1993. int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
  1994. {
  1995. int ret = -1;
  1996. #if ENABLE_BONDING
  1997. if (u & SRTGROUP_MASK)
  1998. {
  1999. GroupKeeper k(*this, u, ERH_THROW);
  2000. ret = m_EPoll.update_usock(eid, u, events);
  2001. k.group->addEPoll(eid);
  2002. return 0;
  2003. }
  2004. #endif
  2005. CUDTSocket* s = locateSocket(u);
  2006. if (s)
  2007. {
  2008. ret = epoll_add_usock_INTERNAL(eid, s, events);
  2009. }
  2010. else
  2011. {
  2012. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);
  2013. }
  2014. return ret;
  2015. }
  2016. // NOTE: WILL LOCK (serially):
  2017. // - CEPoll::m_EPollLock
  2018. // - CUDT::m_RecvLock
  2019. int srt::CUDTUnited::epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events)
  2020. {
  2021. int ret = m_EPoll.update_usock(eid, s->m_SocketID, events);
  2022. s->core().addEPoll(eid);
  2023. return ret;
  2024. }
  2025. int srt::CUDTUnited::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
  2026. {
  2027. return m_EPoll.add_ssock(eid, s, events);
  2028. }
  2029. int srt::CUDTUnited::epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events)
  2030. {
  2031. return m_EPoll.update_ssock(eid, s, events);
  2032. }
  2033. template <class EntityType>
  2034. int srt::CUDTUnited::epoll_remove_entity(const int eid, EntityType* ent)
  2035. {
  2036. // XXX Not sure if this is anyhow necessary because setting readiness
  2037. // to false doesn't actually trigger any action. Further research needed.
  2038. HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING readiness on E" << eid << " of @" << ent->id());
  2039. ent->removeEPollEvents(eid);
  2040. // First remove the EID from the subscribed in the socket so that
  2041. // a possible call to update_events:
  2042. // - if happens before this call, can find the epoll bit update possible
  2043. // - if happens after this call, will not strike this EID
  2044. HLOGC(ealog.Debug, log << "epoll_remove_usock: REMOVING E" << eid << " from back-subscirbers in @" << ent->id());
  2045. ent->removeEPollID(eid);
  2046. HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING subscription on E" << eid << " of @" << ent->id());
  2047. int no_events = 0;
  2048. int ret = m_EPoll.update_usock(eid, ent->id(), &no_events);
  2049. return ret;
  2050. }
  2051. // Needed internal access!
  2052. int srt::CUDTUnited::epoll_remove_socket_INTERNAL(const int eid, CUDTSocket* s)
  2053. {
  2054. return epoll_remove_entity(eid, &s->core());
  2055. }
  2056. #if ENABLE_BONDING
  2057. int srt::CUDTUnited::epoll_remove_group_INTERNAL(const int eid, CUDTGroup* g)
  2058. {
  2059. return epoll_remove_entity(eid, g);
  2060. }
  2061. #endif
  2062. int srt::CUDTUnited::epoll_remove_usock(const int eid, const SRTSOCKET u)
  2063. {
  2064. CUDTSocket* s = 0;
  2065. #if ENABLE_BONDING
  2066. CUDTGroup* g = 0;
  2067. if (u & SRTGROUP_MASK)
  2068. {
  2069. GroupKeeper k(*this, u, ERH_THROW);
  2070. g = k.group;
  2071. return epoll_remove_entity(eid, g);
  2072. }
  2073. else
  2074. #endif
  2075. {
  2076. s = locateSocket(u);
  2077. if (s)
  2078. return epoll_remove_entity(eid, &s->core());
  2079. }
  2080. LOGC(ealog.Error,
  2081. log << "remove_usock: @" << u << " not found as either socket or group. Removing only from epoll system.");
  2082. int no_events = 0;
  2083. return m_EPoll.update_usock(eid, u, &no_events);
  2084. }
  2085. int srt::CUDTUnited::epoll_remove_ssock(const int eid, const SYSSOCKET s)
  2086. {
  2087. return m_EPoll.remove_ssock(eid, s);
  2088. }
  2089. int srt::CUDTUnited::epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
  2090. {
  2091. return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
  2092. }
  2093. int32_t srt::CUDTUnited::epoll_set(int eid, int32_t flags)
  2094. {
  2095. return m_EPoll.setflags(eid, flags);
  2096. }
  2097. int srt::CUDTUnited::epoll_release(const int eid)
  2098. {
  2099. return m_EPoll.release(eid);
  2100. }
  2101. srt::CUDTSocket* srt::CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
  2102. {
  2103. ScopedLock cg(m_GlobControlLock);
  2104. CUDTSocket* s = locateSocket_LOCKED(u);
  2105. if (!s)
  2106. {
  2107. if (erh == ERH_RETURN)
  2108. return NULL;
  2109. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  2110. }
  2111. return s;
  2112. }
  2113. // [[using locked(m_GlobControlLock)]];
  2114. srt::CUDTSocket* srt::CUDTUnited::locateSocket_LOCKED(SRTSOCKET u)
  2115. {
  2116. sockets_t::iterator i = m_Sockets.find(u);
  2117. if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
  2118. {
  2119. return NULL;
  2120. }
  2121. return i->second;
  2122. }
  2123. #if ENABLE_BONDING
  2124. srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh)
  2125. {
  2126. ScopedLock cg(m_GlobControlLock);
  2127. const groups_t::iterator i = m_Groups.find(u);
  2128. if (i == m_Groups.end())
  2129. {
  2130. if (erh == ERH_THROW)
  2131. throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
  2132. return NULL;
  2133. }
  2134. ScopedLock cgroup(*i->second->exp_groupLock());
  2135. i->second->apiAcquire();
  2136. return i->second;
  2137. }
  2138. srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
  2139. {
  2140. ScopedLock cg(m_GlobControlLock);
  2141. CUDTGroup* g = s->m_GroupOf;
  2142. if (!g)
  2143. return NULL;
  2144. // With m_GlobControlLock locked, we are sure the group
  2145. // still exists, if it wasn't removed from this socket.
  2146. g->apiAcquire();
  2147. return g;
  2148. }
  2149. #endif
  2150. srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
  2151. {
  2152. ScopedLock cg(m_GlobControlLock);
  2153. map<int64_t, set<SRTSOCKET> >::iterator i = m_PeerRec.find(CUDTSocket::getPeerSpec(id, isn));
  2154. if (i == m_PeerRec.end())
  2155. return NULL;
  2156. for (set<SRTSOCKET>::iterator j = i->second.begin(); j != i->second.end(); ++j)
  2157. {
  2158. sockets_t::iterator k = m_Sockets.find(*j);
  2159. // this socket might have been closed and moved m_ClosedSockets
  2160. if (k == m_Sockets.end())
  2161. continue;
  2162. if (k->second->m_PeerAddr == peer)
  2163. {
  2164. return k->second;
  2165. }
  2166. }
  2167. return NULL;
  2168. }
// Garbage-collection pass over groups and sockets, done entirely under
// m_GlobControlLock:
// 1. (bonding only) deletes closed groups that are no longer busy;
// 2. moves broken sockets from m_Sockets to m_ClosedSockets (a broken
//    listener only after its timeout passed; a socket with data still
//    waiting in the receiver buffer only after its broken counter ran out)
//    and removes them from their listener's queue;
// 3. for closed sockets, finalizes the asynchronous (lingering) close and
//    collects those closed for more than 1 second and no longer present
//    on the receiver list;
// 4. erases the moved sockets from m_Sockets and calls removeSocket()
//    for the collected ones.
void srt::CUDTUnited::checkBrokenSockets()
{
    ScopedLock cg(m_GlobControlLock);

#if ENABLE_BONDING
    // IDs of deleted groups; they are erased from the container
    // only after the iteration is finished.
    vector<SRTSOCKET> delgids;

    for (groups_t::iterator i = m_ClosedGroups.begin(); i != m_ClosedGroups.end(); ++i)
    {
        // isStillBusy requires lock on the group, so only after an API
        // function that uses it returns, and so clears the busy flag,
        // a new API function won't be called anyway until it can acquire
        // GlobControlLock, and all functions that have already seen this
        // group as closing will not continue with the API and return.
        // If we caught some API function still using the closed group,
        // it's not going to wait, will be checked next time.
        if (i->second->isStillBusy())
            continue;

        delgids.push_back(i->first);
        delete i->second;
        i->second = NULL; // just for a case, avoid a dangling pointer
    }

    for (vector<SRTSOCKET>::iterator di = delgids.begin(); di != delgids.end(); ++di)
    {
        m_ClosedGroups.erase(*di);
    }
#endif

    // set of sockets To Be Closed and To Be Removed
    vector<SRTSOCKET> tbc;
    vector<SRTSOCKET> tbr;

    for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i)
    {
        CUDTSocket* s = i->second;
        // Only broken sockets are of interest in this loop.
        if (!s->core().m_bBroken)
            continue;

        if (s->m_Status == SRTS_LISTENING)
        {
            const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
            // A listening socket should wait an extra 3 seconds
            // in case a client is connecting.
            if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
                continue;
        }
        else if ((s->core().m_pRcvBuffer != NULL)
                 // FIXED: calling isRcvDataAvailable() just to get the information
                 // whether there are any data waiting in the buffer,
                 // NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
                 // this function is called (isRcvDataReady also checks if the
                 // available data is "ready to play").
                 // NOTE(review): the call actually made below is
                 // hasAvailablePackets(); presumably it is the current name
                 // of the function mentioned above - to be confirmed.
                 && s->core().m_pRcvBuffer->hasAvailablePackets())
        {
            const int bc = s->core().m_iBrokenCounter.load();
            if (bc > 0)
            {
                // if there is still data in the receiver buffer, wait longer
                s->core().m_iBrokenCounter.store(bc - 1);
                continue;
            }
        }

#if ENABLE_BONDING
        if (s->m_GroupOf)
        {
            HLOGC(smlog.Debug,
                  log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
            s->removeFromGroup(true);
        }
#endif

        HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);

        // close broken connections and start removal timer
        s->setClosed();
        // The socket is registered as closed right now, but erased from
        // m_Sockets only after this loop (see the tbc processing below).
        tbc.push_back(i->first);
        m_ClosedSockets[i->first] = s;

        // remove from listener's queue
        // (the listener is searched among the active sockets first,
        // then among the closed ones)
        sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
        if (ls == m_Sockets.end())
        {
            ls = m_ClosedSockets.find(s->m_ListenSocket);
            if (ls == m_ClosedSockets.end())
                continue;
        }

        enterCS(ls->second->m_AcceptLock);
        ls->second->m_QueuedSockets.erase(s->m_SocketID);
        leaveCS(ls->second->m_AcceptLock);
    }

    for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
    {
        // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
        // A nonzero linger expiration time marks a pending asynchronous close.
        if (!is_zero(j->second->core().m_tsLingerExpiration))
        {
            // asynchronous close:
            // finished when there is no sender buffer, the buffer is empty,
            // or the linger time has expired
            if ((!j->second->core().m_pSndBuffer) || (0 == j->second->core().m_pSndBuffer->getCurrBufSize()) ||
                (j->second->core().m_tsLingerExpiration <= steady_clock::now()))
            {
                HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID);
                j->second->core().m_tsLingerExpiration = steady_clock::time_point();
                j->second->core().m_bClosing           = true;
                j->second->m_tsClosureTimeStamp        = steady_clock::now();
            }
        }

        // timeout 1 second to destroy a socket AND it has been removed from
        // RcvUList
        const steady_clock::time_point now        = steady_clock::now();
        const steady_clock::duration   closed_ago = now - j->second->m_tsClosureTimeStamp;
        if (closed_ago > seconds_from(1))
        {
            CRNode* rnode = j->second->core().m_pRNode;
            if (!rnode || !rnode->m_bOnList)
            {
                HLOGC(smlog.Debug,
                      log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
                          << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");

                // HLOGC(smlog.Debug, log << "will unref socket: " << j->first);
                tbr.push_back(j->first);
            }
        }
    }

    // move closed sockets to the ClosedSockets structure
    // (they were added to m_ClosedSockets above; here they are only
    // erased from m_Sockets)
    for (vector<SRTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++k)
        m_Sockets.erase(*k);

    // remove those timeout sockets
    for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++l)
        removeSocket(*l);

    HLOGC(smlog.Debug, log << "checkBrokenSockets: after removal: m_ClosedSockets.size()=" << m_ClosedSockets.size());
}
// [[using locked(m_GlobControlLock)]]
//
// Finally removes and deletes the closed socket `u`.
//
// Does nothing if `u` isn't found in m_ClosedSockets, or if the socket is
// still referenced through its sender node (m_iHeapLoc != -1) or receiver
// node (m_bOnList) - in that case it is left for the next call.
// Otherwise the socket is removed from its group (bonding only), its queued
// (not yet accepted) sockets are broken and moved to m_ClosedSockets, its
// peer record and epoll events are cleared, and the object is deleted.
// Finally the reference counter of its multiplexer is decreased, and the
// multiplexer is destroyed when the counter drops to zero.
void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
{
    sockets_t::iterator i = m_ClosedSockets.find(u);

    // invalid socket ID
    if (i == m_ClosedSockets.end())
        return;

    CUDTSocket* const s = i->second;

    // The socket may be in the trashcan now, but could
    // still be under processing in the sender/receiver worker
    // threads. If that's the case, SKIP IT THIS TIME. The
    // socket will be checked next time the GC rollover starts.
    CSNode* sn = s->core().m_pSNode;
    if (sn && sn->m_iHeapLoc != -1)
        return;

    CRNode* rn = s->core().m_pRNode;
    if (rn && rn->m_bOnList)
        return;

#if ENABLE_BONDING
    if (s->m_GroupOf)
    {
        HLOGC(smlog.Debug,
              log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
        s->removeFromGroup(true);
    }
#endif
    // decrease multiplexer reference count, and remove it if necessary
    // (the ID is remembered here, because it is needed after `s` is deleted)
    const int mid = s->m_iMuxID;

    {
        ScopedLock cg(s->m_AcceptLock);

        // if it is a listener, close all un-accepted sockets in its queue
        // and remove them later
        for (set<SRTSOCKET>::iterator q = s->m_QueuedSockets.begin(); q != s->m_QueuedSockets.end(); ++q)
        {
            sockets_t::iterator si = m_Sockets.find(*q);
            if (si == m_Sockets.end())
            {
                // gone in the meantime
                LOGC(smlog.Error,
                     log << "removeSocket: IPE? socket @" << (*q) << " being queued for listener socket @"
                         << s->m_SocketID << " is GONE in the meantime ???");
                continue;
            }

            // Break the queued socket and move it to the closed ones.
            CUDTSocket* as = si->second;

            as->breakSocket_LOCKED();
            m_ClosedSockets[*q] = as;
            m_Sockets.erase(*q);
        }
    }

    // remove from peer rec
    // (the whole record is erased when this was its last socket)
    map<int64_t, set<SRTSOCKET> >::iterator j = m_PeerRec.find(s->getPeerSpec());
    if (j != m_PeerRec.end())
    {
        j->second.erase(u);
        if (j->second.empty())
            m_PeerRec.erase(j);
    }

    /*
     * Socket may be deleted while still having ePoll events set that would
     * remains forever causing epoll_wait to unblock continuously for inexistent
     * sockets. Get rid of all events for this socket.
     */
    m_EPoll.update_events(u, s->core().m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, false);

    // delete this one
    m_ClosedSockets.erase(i);

    HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
    s->core().closeInternal();
    HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
    delete s;
    // `s` must not be used below this point; only `u` and `mid` are.
    HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");

    // -1 means that the socket had no multiplexer assigned.
    if (mid == -1)
    {
        HLOGC(smlog.Debug, log << "GC/removeSocket: no muxer found, finishing.");
        return;
    }

    map<int, CMultiplexer>::iterator m;
    m = m_mMultiplexer.find(mid);
    if (m == m_mMultiplexer.end())
    {
        LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
        return;
    }

    CMultiplexer& mx = m->second;

    mx.m_iRefCount--;
    HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount);
    // This was the last socket using this multiplexer: shut it down.
    if (0 == mx.m_iRefCount)
    {
        HLOGC(smlog.Debug,
              log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port "
                  << mx.m_pChannel->bindAddressAny().hport());
        // The channel has no access to the queues and
        // it looks like the multiplexer is the master of all of them.
        // The queues must be silenced before closing the channel
        // because this will cause error to be returned in any operation
        // being currently done in the queues, if any.
        mx.m_pSndQueue->setClosing();
        mx.m_pRcvQueue->setClosing();
        mx.destroy();
        m_mMultiplexer.erase(m);
    }
}
  2392. void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
  2393. {
  2394. w_m.m_mcfg = s->core().m_config;
  2395. w_m.m_iIPversion = af;
  2396. w_m.m_iRefCount = 1;
  2397. w_m.m_iID = s->m_SocketID;
  2398. }
  2399. uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm)
  2400. {
  2401. w_s->core().m_pSndQueue = fw_sm.m_pSndQueue;
  2402. w_s->core().m_pRcvQueue = fw_sm.m_pRcvQueue;
  2403. w_s->m_iMuxID = fw_sm.m_iID;
  2404. sockaddr_any sa;
  2405. fw_sm.m_pChannel->getSockAddr((sa));
  2406. w_s->m_SelfAddr = sa; // Will be also completed later, but here it's needed for later checks
  2407. return sa.hport();
  2408. }
  2409. bool srt::CUDTUnited::inet6SettingsCompat(const sockaddr_any& muxaddr, const CSrtMuxerConfig& cfgMuxer,
  2410. const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket)
  2411. {
  2412. if (muxaddr.family() != AF_INET6)
  2413. return true; // Don't check - the family has been checked already
  2414. if (reqaddr.isany())
  2415. {
  2416. if (cfgSocket.iIpV6Only == -1) // Treat as "adaptive"
  2417. return true;
  2418. // If set explicitly, then it must be equal to the one of found muxer.
  2419. return cfgSocket.iIpV6Only == cfgMuxer.iIpV6Only;
  2420. }
  2421. // If binding to the certain IPv6 address, then this setting doesn't matter.
  2422. return true;
  2423. }
  2424. bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, const CSrtConfig& cfgSocket)
  2425. {
  2426. if (!cfgMuxer.bReuseAddr)
  2427. {
  2428. HLOGP(smlog.Debug, "channelSettingsMatch: fail: the multiplexer is not reusable");
  2429. return false;
  2430. }
  2431. if (cfgMuxer.isCompatWith(cfgSocket))
  2432. return true;
  2433. HLOGP(smlog.Debug, "channelSettingsMatch: fail: some options have different values");
  2434. return false;
  2435. }
  2436. void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, const UDPSOCKET* udpsock /*[[nullable]]*/)
  2437. {
  2438. ScopedLock cg(m_GlobControlLock);
  2439. // If udpsock is provided, then this socket will be simply
  2440. // taken for binding as a good deal. It would be nice to make
  2441. // a sanity check to see if this UDP socket isn't already installed
  2442. // in some multiplexer, but we state this UDP socket isn't accessible
  2443. // anyway so this wouldn't be possible.
  2444. if (!udpsock)
  2445. {
  2446. // If not, we need to see if there exist already a multiplexer bound
  2447. // to the same endpoint.
  2448. const int port = reqaddr.hport();
  2449. const CSrtConfig& cfgSocket = s->core().m_config;
  2450. // This loop is going to check the attempted binding of
  2451. // address:port and socket settings against every existing
  2452. // multiplexer. Possible results of the check are:
  2453. // 1. MATCH: identical address - reuse it and quit.
  2454. // 2. CONFLICT: report error: the binding partially overlaps
  2455. // so it neither can be reused nor is free to bind.
  2456. // 3. PASS: different and not overlapping - continue searching.
  2457. // In this function the convention is:
  2458. // MATCH: do nothing and proceed with binding reusage, THEN break.
  2459. // CONFLICT: throw an exception.
  2460. // PASS: use 'continue' to pass to the next element.
  2461. bool reuse_attempt = false;
  2462. for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i)
  2463. {
  2464. CMultiplexer& m = i->second;
  2465. // First, we need to find a multiplexer with the same port.
  2466. if (m.m_iPort != port)
  2467. {
  2468. HLOGC(smlog.Debug,
  2469. log << "bind: muxer @" << m.m_iID << " found, but for port " << m.m_iPort
  2470. << " (requested port: " << port << ")");
  2471. continue;
  2472. }
  2473. // If this is bound to the wildcard address, it can be reused if:
  2474. // - reqaddr is also a wildcard
  2475. // - channel settings match
  2476. // Otherwise it's a conflict.
  2477. sockaddr_any mux_addr;
  2478. m.m_pChannel->getSockAddr((mux_addr));
  2479. HLOGC(smlog.Debug,
  2480. log << "bind: Found existing muxer @" << m.m_iID << " : " << mux_addr.str() << " - check against "
  2481. << reqaddr.str());
  2482. if (mux_addr.isany())
  2483. {
  2484. if (mux_addr.family() == AF_INET6)
  2485. {
  2486. // With IPv6 we need to research two possibilities:
  2487. // iIpV6Only == 1 -> This means that it binds only :: wildcard, but not 0.0.0.0
  2488. // iIpV6Only == 0 -> This means that it binds both :: and 0.0.0.0.
  2489. // iIpV6Only == -1 -> Hard to say what to do, but treat it as a potential conflict in any doubtful case.
  2490. if (m.m_mcfg.iIpV6Only == 1)
  2491. {
  2492. // PASS IF: candidate is IPv4, no matter the address
  2493. // MATCH IF: candidate is IPv6 with only=1
  2494. // CONFLICT IF: candidate is IPv6 with only != 1 or IPv6 non-wildcard.
  2495. if (reqaddr.family() == AF_INET)
  2496. {
  2497. HLOGC(smlog.Debug, log << "bind: muxer @" << m.m_iID
  2498. << " is :: v6only - requested IPv4 ANY is NOT IN THE WAY. Searching on.");
  2499. continue;
  2500. }
  2501. // Candidate is AF_INET6
  2502. if (cfgSocket.iIpV6Only != 1 || !reqaddr.isany())
  2503. {
  2504. // CONFLICT:
  2505. // 1. attempting to make a wildcard IPv4 + IPv6
  2506. // while the multiplexer for wildcard IPv6 exists.
  2507. // 2. If binding to a given address, it conflicts with the wildcard
  2508. LOGC(smlog.Error,
  2509. log << "bind: Address: " << reqaddr.str()
  2510. << " conflicts with existing IPv6 wildcard binding: " << mux_addr.str());
  2511. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2512. }
  2513. // Otherwise, MATCH.
  2514. }
  2515. else if (m.m_mcfg.iIpV6Only == 0)
  2516. {
  2517. // Muxer's address is a wildcard for :: and 0.0.0.0 at once.
  2518. // This way only IPv6 wildcard with v6only=0 is a perfect match and everything
  2519. // else is a conflict.
  2520. if (reqaddr.family() == AF_INET6 && reqaddr.isany() && cfgSocket.iIpV6Only == 0)
  2521. {
  2522. // MATCH
  2523. }
  2524. else
  2525. {
  2526. // CONFLICT: attempting to make a wildcard IPv4 + IPv6 while
  2527. // the multiplexer for wildcard IPv6 exists.
  2528. LOGC(smlog.Error,
  2529. log << "bind: Address: " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
  2530. << " conflicts with existing IPv6 + IPv4 wildcard binding: " << mux_addr.str());
  2531. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2532. }
  2533. }
  2534. else // Case -1, by unknown reason. Accept only with -1 setting, others are conflict.
  2535. {
  2536. if (reqaddr.family() == AF_INET6 && reqaddr.isany() && cfgSocket.iIpV6Only == -1)
  2537. {
  2538. // MATCH
  2539. }
  2540. else
  2541. {
  2542. LOGC(smlog.Error,
  2543. log << "bind: Address: " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
  2544. << " conflicts with existing IPv6 v6only=unknown wildcard binding: " << mux_addr.str());
  2545. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2546. }
  2547. }
  2548. }
  2549. else // muxer is IPv4 wildcard
  2550. {
  2551. // Then only IPv4 wildcard is a match and:
  2552. // - IPv6 with only=true is PASS (not a conflict)
  2553. // - IPv6 with only=false is CONFLICT
  2554. // - IPv6 with only=undefined is CONFLICT
  2555. // REASON: we need to make a potential conflict a conflict as there will be
  2556. // no bind() call to check if this wouldn't be a conflict in result. If you want
  2557. // to have a binding to IPv6 that should avoid conflict with IPv4 wildcard binding,
  2558. // then SRTO_IPV6ONLY option must be explicitly set before binding.
  2559. // Also:
  2560. if (reqaddr.family() == AF_INET)
  2561. {
  2562. if (reqaddr.isany())
  2563. {
  2564. // MATCH
  2565. }
  2566. else
  2567. {
  2568. LOGC(smlog.Error,
  2569. log << "bind: Address: " << reqaddr.str()
  2570. << " conflicts with existing IPv4 wildcard binding: " << mux_addr.str());
  2571. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2572. }
  2573. }
  2574. else // AF_INET6
  2575. {
  2576. if (cfgSocket.iIpV6Only == 1 || !reqaddr.isany())
  2577. {
  2578. // PASS
  2579. HLOGC(smlog.Debug, log << "bind: muxer @" << m.m_iID
  2580. << " is IPv4 wildcard - requested " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
  2581. << " is NOT IN THE WAY. Searching on.");
  2582. continue;
  2583. }
  2584. else
  2585. {
  2586. LOGC(smlog.Error,
  2587. log << "bind: Address: " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
  2588. << " conflicts with existing IPv4 wildcard binding: " << mux_addr.str());
  2589. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2590. }
  2591. }
  2592. }
  2593. reuse_attempt = true;
  2594. HLOGC(smlog.Debug, log << "bind: wildcard address - multiplexer reusable");
  2595. }
  2596. // Muxer address is NOT a wildcard, so conflicts only with WILDCARD of the same type
  2597. else if (reqaddr.isany() && reqaddr.family() == mux_addr.family())
  2598. {
  2599. LOGC(smlog.Error,
  2600. log << "bind: Wildcard address: " << reqaddr.str()
  2601. << " conflicts with existting IP binding: " << mux_addr.str());
  2602. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2603. }
  2604. // If this is bound to a certain address, AND:
  2605. else if (mux_addr.equal_address(reqaddr))
  2606. {
  2607. // - the address is the same as reqaddr
  2608. reuse_attempt = true;
  2609. HLOGC(smlog.Debug, log << "bind: same IP address - multiplexer reusable");
  2610. }
  2611. else
  2612. {
  2613. HLOGC(smlog.Debug, log << "bind: IP addresses differ - ALLOWED to create a new multiplexer");
  2614. continue;
  2615. }
  2616. // Otherwise:
  2617. // - the address is different than reqaddr
  2618. // - the address can't be reused, but this can go on with new one.
  2619. // If this is a reusage attempt:
  2620. if (reuse_attempt)
  2621. {
  2622. // - if the channel settings match, it can be reused
  2623. if (channelSettingsMatch(m.m_mcfg, cfgSocket) && inet6SettingsCompat(mux_addr, m.m_mcfg, reqaddr, cfgSocket))
  2624. {
  2625. HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
  2626. // reuse the existing multiplexer
  2627. ++i->second.m_iRefCount;
  2628. installMuxer((s), (i->second));
  2629. return;
  2630. }
  2631. else
  2632. {
  2633. // - if not, it's a conflict
  2634. LOGC(smlog.Error,
  2635. log << "bind: Address: " << reqaddr.str() << " conflicts with binding: " << mux_addr.str()
  2636. << " due to channel settings");
  2637. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
  2638. }
  2639. }
  2640. // If not, proceed to the next one, and when there are no reusage
  2641. // candidates, proceed with creating a new multiplexer.
  2642. // Note that a binding to a different IP address is not treated
  2643. // as a candidate for either reusage or conflict.
  2644. LOGC(smlog.Fatal, log << "SHOULD NOT GET HERE!!!");
  2645. SRT_ASSERT(false);
  2646. }
  2647. }
  2648. // a new multiplexer is needed
  2649. CMultiplexer m;
  2650. configureMuxer((m), s, reqaddr.family());
  2651. try
  2652. {
  2653. m.m_pChannel = new CChannel();
  2654. m.m_pChannel->setConfig(m.m_mcfg);
  2655. if (udpsock)
  2656. {
  2657. // In this case, reqaddr contains the address
  2658. // that has been extracted already from the
  2659. // given socket
  2660. m.m_pChannel->attach(*udpsock, reqaddr);
  2661. }
  2662. else if (reqaddr.empty())
  2663. {
  2664. // The case of previously used case of a NULL address.
  2665. // This here is used to pass family only, in this case
  2666. // just automatically bind to the "0" address to autoselect
  2667. // everything.
  2668. m.m_pChannel->open(reqaddr.family());
  2669. }
  2670. else
  2671. {
  2672. // If at least the IP address is specified, then bind to that
  2673. // address, but still possibly autoselect the outgoing port, if the
  2674. // port was specified as 0.
  2675. m.m_pChannel->open(reqaddr);
  2676. }
  2677. // AFTER OPENING, check the matter of IPV6_V6ONLY option,
  2678. // as it decides about the fact that the occupied binding address
  2679. // in case of wildcard is both :: and 0.0.0.0, or only ::.
  2680. if (reqaddr.family() == AF_INET6 && m.m_mcfg.iIpV6Only == -1)
  2681. {
  2682. // XXX We don't know how probable it is to get the error here
  2683. // and resulting -1 value. As a fallback for that case, the value -1
  2684. // is honored here, just all side-bindings for other sockes will be
  2685. // rejected as a potential conflict, even if binding would be accepted
  2686. // in these circumstances. Only a perfect match in case of potential
  2687. // overlapping will be accepted on the same port.
  2688. m.m_mcfg.iIpV6Only = m.m_pChannel->sockopt(IPPROTO_IPV6, IPV6_V6ONLY, -1);
  2689. }
  2690. m.m_pTimer = new CTimer;
  2691. m.m_pSndQueue = new CSndQueue;
  2692. m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
  2693. m.m_pRcvQueue = new CRcvQueue;
  2694. m.m_pRcvQueue->init(128, s->core().maxPayloadSize(), m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
  2695. // Rewrite the port here, as it might be only known upon return
  2696. // from CChannel::open.
  2697. m.m_iPort = installMuxer((s), m);
  2698. m_mMultiplexer[m.m_iID] = m;
  2699. }
  2700. catch (const CUDTException&)
  2701. {
  2702. m.destroy();
  2703. throw;
  2704. }
  2705. catch (...)
  2706. {
  2707. m.destroy();
  2708. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
  2709. }
  2710. HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
  2711. }
  2712. // This function is going to find a multiplexer for the port contained
  2713. // in the 'ls' listening socket. The multiplexer must exist when the listener
  2714. // exists, otherwise the dispatching procedure wouldn't even call this
  2715. // function. By historical reasons there's also a fallback for a case when the
  2716. // multiplexer wasn't found by id, the search by port number continues.
  2717. bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
  2718. {
  2719. ScopedLock cg(m_GlobControlLock);
  2720. const int port = ls->m_SelfAddr.hport();
  2721. HLOGC(smlog.Debug,
  2722. log << "updateListenerMux: finding muxer of listener socket @" << ls->m_SocketID << " muxid=" << ls->m_iMuxID
  2723. << " bound=" << ls->m_SelfAddr.str() << " FOR @" << s->m_SocketID << " addr=" << s->m_SelfAddr.str()
  2724. << "_->_" << s->m_PeerAddr.str());
  2725. // First thing that should be certain here is that there should exist
  2726. // a muxer with the ID written in the listener socket's mux ID.
  2727. CMultiplexer* mux = map_getp(m_mMultiplexer, ls->m_iMuxID);
  2728. // NOTE:
  2729. // THIS BELOW CODE is only for a highly unlikely situation when the listener
  2730. // socket has been closed in the meantime when the accepted socket is being
  2731. // processed. This procedure is different than updateMux because this time we
  2732. // only want to have a multiplexer socket to be assigned to the accepted socket.
  2733. // It is also unlikely that the listener socket is garbage-collected so fast, so
  2734. // this procedure will most likely find the multiplexer of the zombie listener socket,
  2735. // which no longer accepts new connections (the listener is withdrawn immediately from
  2736. // the port) that wasn't yet completely deleted.
  2737. CMultiplexer* fallback = NULL;
  2738. if (!mux)
  2739. {
  2740. LOGC(smlog.Error, log << "updateListenerMux: IPE? listener muxer not found by ID, trying by port");
  2741. // To be used as first found with different IP version
  2742. // find the listener's address
  2743. for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i)
  2744. {
  2745. CMultiplexer& m = i->second;
  2746. #if ENABLE_HEAVY_LOGGING
  2747. ostringstream that_muxer;
  2748. that_muxer << "id=" << m.m_iID << " port=" << m.m_iPort
  2749. << " ip=" << (m.m_iIPversion == AF_INET ? "v4" : "v6");
  2750. #endif
  2751. if (m.m_iPort == port)
  2752. {
  2753. HLOGC(smlog.Debug, log << "updateListenerMux: reusing muxer: " << that_muxer.str());
  2754. if (m.m_iIPversion == s->m_PeerAddr.family())
  2755. {
  2756. mux = &m; // best match
  2757. break;
  2758. }
  2759. else if (m.m_iIPversion == AF_INET6)
  2760. {
  2761. // Allowed fallback case when we only need an accepted socket.
  2762. fallback = &m;
  2763. }
  2764. }
  2765. else
  2766. {
  2767. HLOGC(smlog.Debug, log << "updateListenerMux: SKIPPING muxer: " << that_muxer.str());
  2768. }
  2769. }
  2770. if (!mux && fallback)
  2771. {
  2772. // It is allowed to reuse this multiplexer, but the socket must allow both IPv4 and IPv6
  2773. if (fallback->m_mcfg.iIpV6Only == 0)
  2774. {
  2775. HLOGC(smlog.Warn, log << "updateListenerMux: reusing multiplexer from different family");
  2776. mux = fallback;
  2777. }
  2778. }
  2779. }
  2780. // Checking again because the above procedure could have set it
  2781. if (mux)
  2782. {
  2783. // reuse the existing multiplexer
  2784. ++mux->m_iRefCount;
  2785. s->core().m_pSndQueue = mux->m_pSndQueue;
  2786. s->core().m_pRcvQueue = mux->m_pRcvQueue;
  2787. s->m_iMuxID = mux->m_iID;
  2788. return true;
  2789. }
  2790. return false;
  2791. }
  2792. void* srt::CUDTUnited::garbageCollect(void* p)
  2793. {
  2794. CUDTUnited* self = (CUDTUnited*)p;
  2795. THREAD_STATE_INIT("SRT:GC");
  2796. UniqueLock gclock(self->m_GCStopLock);
  2797. while (!self->m_bClosing)
  2798. {
  2799. INCREMENT_THREAD_ITERATIONS();
  2800. self->checkBrokenSockets();
  2801. HLOGC(inlog.Debug, log << "GC: sleep 1 s");
  2802. self->m_GCStopCond.wait_for(gclock, seconds_from(1));
  2803. }
  2804. // remove all sockets and multiplexers
  2805. HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
  2806. {
  2807. ScopedLock glock(self->m_GlobControlLock);
  2808. for (sockets_t::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++i)
  2809. {
  2810. CUDTSocket* s = i->second;
  2811. s->breakSocket_LOCKED();
  2812. #if ENABLE_BONDING
  2813. if (s->m_GroupOf)
  2814. {
  2815. HLOGC(smlog.Debug,
  2816. log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id()
  2817. << " (IPE?) - REMOVING FROM GROUP");
  2818. s->removeFromGroup(false);
  2819. }
  2820. #endif
  2821. self->m_ClosedSockets[i->first] = s;
  2822. // remove from listener's queue
  2823. sockets_t::iterator ls = self->m_Sockets.find(s->m_ListenSocket);
  2824. if (ls == self->m_Sockets.end())
  2825. {
  2826. ls = self->m_ClosedSockets.find(s->m_ListenSocket);
  2827. if (ls == self->m_ClosedSockets.end())
  2828. continue;
  2829. }
  2830. enterCS(ls->second->m_AcceptLock);
  2831. ls->second->m_QueuedSockets.erase(s->m_SocketID);
  2832. leaveCS(ls->second->m_AcceptLock);
  2833. }
  2834. self->m_Sockets.clear();
  2835. for (sockets_t::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++j)
  2836. {
  2837. j->second->m_tsClosureTimeStamp = steady_clock::time_point();
  2838. }
  2839. }
  2840. HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
  2841. while (true)
  2842. {
  2843. self->checkBrokenSockets();
  2844. enterCS(self->m_GlobControlLock);
  2845. bool empty = self->m_ClosedSockets.empty();
  2846. leaveCS(self->m_GlobControlLock);
  2847. if (empty)
  2848. break;
  2849. HLOGC(inlog.Debug, log << "GC: checkBrokenSockets didn't wipe all sockets, repeating after 1s sleep");
  2850. srt::sync::this_thread::sleep_for(milliseconds_from(1));
  2851. }
  2852. THREAD_EXIT();
  2853. return NULL;
  2854. }
  2855. ////////////////////////////////////////////////////////////////////////////////
  2856. int srt::CUDT::startup()
  2857. {
  2858. return uglobal().startup();
  2859. }
  2860. int srt::CUDT::cleanup()
  2861. {
  2862. return uglobal().cleanup();
  2863. }
  2864. SRTSOCKET srt::CUDT::socket()
  2865. {
  2866. if (!uglobal().m_bGCStatus)
  2867. uglobal().startup();
  2868. try
  2869. {
  2870. return uglobal().newSocket();
  2871. }
  2872. catch (const CUDTException& e)
  2873. {
  2874. SetThreadLocalError(e);
  2875. return INVALID_SOCK;
  2876. }
  2877. catch (const bad_alloc&)
  2878. {
  2879. SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
  2880. return INVALID_SOCK;
  2881. }
  2882. catch (const std::exception& ee)
  2883. {
  2884. LOGC(aclog.Fatal, log << "socket: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  2885. SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
  2886. return INVALID_SOCK;
  2887. }
  2888. }
  2889. srt::CUDT::APIError::APIError(const CUDTException& e)
  2890. {
  2891. SetThreadLocalError(e);
  2892. }
  2893. srt::CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr)
  2894. {
  2895. SetThreadLocalError(CUDTException(mj, mn, syserr));
  2896. }
  2897. #if ENABLE_BONDING
  2898. // This is an internal function; 'type' should be pre-checked if it has a correct value.
  2899. // This doesn't have argument of GroupType due to header file conflicts.
  2900. // [[using locked(s_UDTUnited.m_GlobControlLock)]]
  2901. srt::CUDTGroup& srt::CUDT::newGroup(const int type)
  2902. {
  2903. const SRTSOCKET id = uglobal().generateSocketID(true);
  2904. // Now map the group
  2905. return uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
  2906. }
  2907. SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
  2908. {
  2909. // Doing the same lazy-startup as with srt_create_socket()
  2910. if (!uglobal().m_bGCStatus)
  2911. uglobal().startup();
  2912. try
  2913. {
  2914. srt::sync::ScopedLock globlock(uglobal().m_GlobControlLock);
  2915. return newGroup(gt).id();
  2916. // Note: potentially, after this function exits, the group
  2917. // could be deleted, immediately, from a separate thread (tho
  2918. // unlikely because the other thread would need some handle to
  2919. // keep it). But then, the first call to any API function would
  2920. // return invalid ID error.
  2921. }
  2922. catch (const CUDTException& e)
  2923. {
  2924. return APIError(e);
  2925. }
  2926. catch (...)
  2927. {
  2928. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  2929. }
  2930. return SRT_INVALID_SOCK;
  2931. }
  2932. // [[using locked(m_ControlLock)]]
  2933. // [[using locked(CUDT::s_UDTUnited.m_GlobControlLock)]]
  2934. void srt::CUDTSocket::removeFromGroup(bool broken)
  2935. {
  2936. CUDTGroup* g = m_GroupOf;
  2937. if (g)
  2938. {
  2939. // Reset group-related fields immediately. They won't be accessed
  2940. // in the below calls, while the iterator will be invalidated for
  2941. // a short moment between removal from the group container and the end,
  2942. // while the GroupLock would be already taken out. It is safer to reset
  2943. // it to a NULL iterator before removal.
  2944. m_GroupOf = NULL;
  2945. m_GroupMemberData = NULL;
  2946. bool still_have = g->remove(m_SocketID);
  2947. if (broken)
  2948. {
  2949. // Activate the SRT_EPOLL_UPDATE event on the group
  2950. // if it was because of a socket that was earlier connected
  2951. // and became broken. This is not to be sent in case when
  2952. // it is a failure during connection, or the socket was
  2953. // explicitly removed from the group.
  2954. g->activateUpdateEvent(still_have);
  2955. }
  2956. HLOGC(smlog.Debug,
  2957. log << "removeFromGroup: socket @" << m_SocketID << " NO LONGER A MEMBER of $" << g->id() << "; group is "
  2958. << (still_have ? "still ACTIVE" : "now EMPTY"));
  2959. }
  2960. }
  2961. SRTSOCKET srt::CUDT::getGroupOfSocket(SRTSOCKET socket)
  2962. {
  2963. // Lock this for the whole function as we need the group
  2964. // to persist the call.
  2965. ScopedLock glock(uglobal().m_GlobControlLock);
  2966. CUDTSocket* s = uglobal().locateSocket_LOCKED(socket);
  2967. if (!s || !s->m_GroupOf)
  2968. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  2969. return s->m_GroupOf->id();
  2970. }
  2971. int srt::CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psize)
  2972. {
  2973. if ((groupid & SRTGROUP_MASK) == 0 || !psize)
  2974. {
  2975. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  2976. }
  2977. CUDTUnited::GroupKeeper k(uglobal(), groupid, CUDTUnited::ERH_RETURN);
  2978. if (!k.group)
  2979. {
  2980. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  2981. }
  2982. // To get only the size of the group pdata=NULL can be used
  2983. return k.group->getGroupData(pdata, psize);
  2984. }
  2985. #endif
  2986. int srt::CUDT::bind(SRTSOCKET u, const sockaddr* name, int namelen)
  2987. {
  2988. try
  2989. {
  2990. sockaddr_any sa(name, namelen);
  2991. if (sa.len == 0)
  2992. {
  2993. // This happens if the namelen check proved it to be
  2994. // too small for particular family, or that family is
  2995. // not recognized (is none of AF_INET, AF_INET6).
  2996. // This is a user error.
  2997. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  2998. }
  2999. CUDTSocket* s = uglobal().locateSocket(u);
  3000. if (!s)
  3001. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3002. return uglobal().bind(s, sa);
  3003. }
  3004. catch (const CUDTException& e)
  3005. {
  3006. return APIError(e);
  3007. }
  3008. catch (bad_alloc&)
  3009. {
  3010. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3011. }
  3012. catch (const std::exception& ee)
  3013. {
  3014. LOGC(aclog.Fatal, log << "bind: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3015. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3016. }
  3017. }
  3018. int srt::CUDT::bind(SRTSOCKET u, UDPSOCKET udpsock)
  3019. {
  3020. try
  3021. {
  3022. CUDTSocket* s = uglobal().locateSocket(u);
  3023. if (!s)
  3024. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3025. return uglobal().bind(s, udpsock);
  3026. }
  3027. catch (const CUDTException& e)
  3028. {
  3029. return APIError(e);
  3030. }
  3031. catch (bad_alloc&)
  3032. {
  3033. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3034. }
  3035. catch (const std::exception& ee)
  3036. {
  3037. LOGC(aclog.Fatal, log << "bind/udp: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3038. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3039. }
  3040. }
  3041. int srt::CUDT::listen(SRTSOCKET u, int backlog)
  3042. {
  3043. try
  3044. {
  3045. return uglobal().listen(u, backlog);
  3046. }
  3047. catch (const CUDTException& e)
  3048. {
  3049. return APIError(e);
  3050. }
  3051. catch (bad_alloc&)
  3052. {
  3053. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3054. }
  3055. catch (const std::exception& ee)
  3056. {
  3057. LOGC(aclog.Fatal, log << "listen: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3058. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3059. }
  3060. }
  3061. SRTSOCKET srt::CUDT::accept_bond(const SRTSOCKET listeners[], int lsize, int64_t msTimeOut)
  3062. {
  3063. try
  3064. {
  3065. return uglobal().accept_bond(listeners, lsize, msTimeOut);
  3066. }
  3067. catch (const CUDTException& e)
  3068. {
  3069. SetThreadLocalError(e);
  3070. return INVALID_SOCK;
  3071. }
  3072. catch (bad_alloc&)
  3073. {
  3074. SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
  3075. return INVALID_SOCK;
  3076. }
  3077. catch (const std::exception& ee)
  3078. {
  3079. LOGC(aclog.Fatal, log << "accept_bond: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3080. SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
  3081. return INVALID_SOCK;
  3082. }
  3083. }
  3084. SRTSOCKET srt::CUDT::accept(SRTSOCKET u, sockaddr* addr, int* addrlen)
  3085. {
  3086. try
  3087. {
  3088. return uglobal().accept(u, addr, addrlen);
  3089. }
  3090. catch (const CUDTException& e)
  3091. {
  3092. SetThreadLocalError(e);
  3093. return INVALID_SOCK;
  3094. }
  3095. catch (const bad_alloc&)
  3096. {
  3097. SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
  3098. return INVALID_SOCK;
  3099. }
  3100. catch (const std::exception& ee)
  3101. {
  3102. LOGC(aclog.Fatal, log << "accept: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3103. SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
  3104. return INVALID_SOCK;
  3105. }
  3106. }
  3107. int srt::CUDT::connect(SRTSOCKET u, const sockaddr* name, const sockaddr* tname, int namelen)
  3108. {
  3109. try
  3110. {
  3111. return uglobal().connect(u, name, tname, namelen);
  3112. }
  3113. catch (const CUDTException& e)
  3114. {
  3115. return APIError(e);
  3116. }
  3117. catch (bad_alloc&)
  3118. {
  3119. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3120. }
  3121. catch (std::exception& ee)
  3122. {
  3123. LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3124. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3125. }
  3126. }
  3127. #if ENABLE_BONDING
  3128. int srt::CUDT::connectLinks(SRTSOCKET grp, SRT_SOCKGROUPCONFIG targets[], int arraysize)
  3129. {
  3130. if (arraysize <= 0)
  3131. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3132. if ((grp & SRTGROUP_MASK) == 0)
  3133. {
  3134. // connectLinks accepts only GROUP id, not socket id.
  3135. return APIError(MJ_NOTSUP, MN_SIDINVAL, 0);
  3136. }
  3137. try
  3138. {
  3139. CUDTUnited::GroupKeeper k(uglobal(), grp, CUDTUnited::ERH_THROW);
  3140. return uglobal().groupConnect(k.group, targets, arraysize);
  3141. }
  3142. catch (CUDTException& e)
  3143. {
  3144. return APIError(e);
  3145. }
  3146. catch (bad_alloc&)
  3147. {
  3148. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3149. }
  3150. catch (std::exception& ee)
  3151. {
  3152. LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3153. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3154. }
  3155. }
  3156. #endif
  3157. int srt::CUDT::connect(SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
  3158. {
  3159. try
  3160. {
  3161. return uglobal().connect(u, name, namelen, forced_isn);
  3162. }
  3163. catch (const CUDTException& e)
  3164. {
  3165. return APIError(e);
  3166. }
  3167. catch (bad_alloc&)
  3168. {
  3169. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3170. }
  3171. catch (const std::exception& ee)
  3172. {
  3173. LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3174. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3175. }
  3176. }
  3177. int srt::CUDT::close(SRTSOCKET u)
  3178. {
  3179. try
  3180. {
  3181. return uglobal().close(u);
  3182. }
  3183. catch (const CUDTException& e)
  3184. {
  3185. return APIError(e);
  3186. }
  3187. catch (const std::exception& ee)
  3188. {
  3189. LOGC(aclog.Fatal, log << "close: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3190. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3191. }
  3192. }
  3193. int srt::CUDT::getpeername(SRTSOCKET u, sockaddr* name, int* namelen)
  3194. {
  3195. try
  3196. {
  3197. uglobal().getpeername(u, name, namelen);
  3198. return 0;
  3199. }
  3200. catch (const CUDTException& e)
  3201. {
  3202. return APIError(e);
  3203. }
  3204. catch (const std::exception& ee)
  3205. {
  3206. LOGC(aclog.Fatal, log << "getpeername: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3207. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3208. }
  3209. }
  3210. int srt::CUDT::getsockname(SRTSOCKET u, sockaddr* name, int* namelen)
  3211. {
  3212. try
  3213. {
  3214. uglobal().getsockname(u, name, namelen);
  3215. return 0;
  3216. }
  3217. catch (const CUDTException& e)
  3218. {
  3219. return APIError(e);
  3220. }
  3221. catch (const std::exception& ee)
  3222. {
  3223. LOGC(aclog.Fatal, log << "getsockname: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3224. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3225. }
  3226. }
  3227. int srt::CUDT::getsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optval, int* pw_optlen)
  3228. {
  3229. if (!pw_optval || !pw_optlen)
  3230. {
  3231. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3232. }
  3233. try
  3234. {
  3235. #if ENABLE_BONDING
  3236. if (u & SRTGROUP_MASK)
  3237. {
  3238. CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
  3239. k.group->getOpt(optname, (pw_optval), (*pw_optlen));
  3240. return 0;
  3241. }
  3242. #endif
  3243. CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
  3244. udt.getOpt(optname, (pw_optval), (*pw_optlen));
  3245. return 0;
  3246. }
  3247. catch (const CUDTException& e)
  3248. {
  3249. return APIError(e);
  3250. }
  3251. catch (const std::exception& ee)
  3252. {
  3253. LOGC(aclog.Fatal, log << "getsockopt: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3254. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3255. }
  3256. }
  3257. int srt::CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* optval, int optlen)
  3258. {
  3259. if (!optval)
  3260. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3261. try
  3262. {
  3263. #if ENABLE_BONDING
  3264. if (u & SRTGROUP_MASK)
  3265. {
  3266. CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
  3267. k.group->setOpt(optname, optval, optlen);
  3268. return 0;
  3269. }
  3270. #endif
  3271. CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
  3272. udt.setOpt(optname, optval, optlen);
  3273. return 0;
  3274. }
  3275. catch (const CUDTException& e)
  3276. {
  3277. return APIError(e);
  3278. }
  3279. catch (const std::exception& ee)
  3280. {
  3281. LOGC(aclog.Fatal, log << "setsockopt: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3282. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3283. }
  3284. }
  3285. int srt::CUDT::send(SRTSOCKET u, const char* buf, int len, int)
  3286. {
  3287. SRT_MSGCTRL mctrl = srt_msgctrl_default;
  3288. return sendmsg2(u, buf, len, (mctrl));
  3289. }
  3290. // --> CUDT::recv moved down
  3291. int srt::CUDT::sendmsg(SRTSOCKET u, const char* buf, int len, int ttl, bool inorder, int64_t srctime)
  3292. {
  3293. SRT_MSGCTRL mctrl = srt_msgctrl_default;
  3294. mctrl.msgttl = ttl;
  3295. mctrl.inorder = inorder;
  3296. mctrl.srctime = srctime;
  3297. return sendmsg2(u, buf, len, (mctrl));
  3298. }
  3299. int srt::CUDT::sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
  3300. {
  3301. try
  3302. {
  3303. #if ENABLE_BONDING
  3304. if (u & SRTGROUP_MASK)
  3305. {
  3306. CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
  3307. return k.group->send(buf, len, (w_m));
  3308. }
  3309. #endif
  3310. return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m));
  3311. }
  3312. catch (const CUDTException& e)
  3313. {
  3314. return APIError(e);
  3315. }
  3316. catch (bad_alloc&)
  3317. {
  3318. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3319. }
  3320. catch (const std::exception& ee)
  3321. {
  3322. LOGC(aclog.Fatal, log << "sendmsg: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3323. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3324. }
  3325. }
  3326. int srt::CUDT::recv(SRTSOCKET u, char* buf, int len, int)
  3327. {
  3328. SRT_MSGCTRL mctrl = srt_msgctrl_default;
  3329. int ret = recvmsg2(u, buf, len, (mctrl));
  3330. return ret;
  3331. }
  3332. int srt::CUDT::recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
  3333. {
  3334. SRT_MSGCTRL mctrl = srt_msgctrl_default;
  3335. int ret = recvmsg2(u, buf, len, (mctrl));
  3336. srctime = mctrl.srctime;
  3337. return ret;
  3338. }
  3339. int srt::CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
  3340. {
  3341. try
  3342. {
  3343. #if ENABLE_BONDING
  3344. if (u & SRTGROUP_MASK)
  3345. {
  3346. CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
  3347. return k.group->recv(buf, len, (w_m));
  3348. }
  3349. #endif
  3350. return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m));
  3351. }
  3352. catch (const CUDTException& e)
  3353. {
  3354. return APIError(e);
  3355. }
  3356. catch (const std::exception& ee)
  3357. {
  3358. LOGC(aclog.Fatal, log << "recvmsg: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3359. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3360. }
  3361. }
  3362. int64_t srt::CUDT::sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
  3363. {
  3364. try
  3365. {
  3366. CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
  3367. return udt.sendfile(ifs, offset, size, block);
  3368. }
  3369. catch (const CUDTException& e)
  3370. {
  3371. return APIError(e);
  3372. }
  3373. catch (bad_alloc&)
  3374. {
  3375. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3376. }
  3377. catch (const std::exception& ee)
  3378. {
  3379. LOGC(aclog.Fatal, log << "sendfile: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3380. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3381. }
  3382. }
  3383. int64_t srt::CUDT::recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
  3384. {
  3385. try
  3386. {
  3387. return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block);
  3388. }
  3389. catch (const CUDTException& e)
  3390. {
  3391. return APIError(e);
  3392. }
  3393. catch (const std::exception& ee)
  3394. {
  3395. LOGC(aclog.Fatal, log << "recvfile: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3396. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3397. }
  3398. }
  3399. int srt::CUDT::select(int, UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
  3400. {
  3401. if ((!readfds) && (!writefds) && (!exceptfds))
  3402. {
  3403. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3404. }
  3405. try
  3406. {
  3407. return uglobal().select(readfds, writefds, exceptfds, timeout);
  3408. }
  3409. catch (const CUDTException& e)
  3410. {
  3411. return APIError(e);
  3412. }
  3413. catch (bad_alloc&)
  3414. {
  3415. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3416. }
  3417. catch (const std::exception& ee)
  3418. {
  3419. LOGC(aclog.Fatal, log << "select: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3420. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3421. }
  3422. }
  3423. int srt::CUDT::selectEx(const vector<SRTSOCKET>& fds,
  3424. vector<SRTSOCKET>* readfds,
  3425. vector<SRTSOCKET>* writefds,
  3426. vector<SRTSOCKET>* exceptfds,
  3427. int64_t msTimeOut)
  3428. {
  3429. if ((!readfds) && (!writefds) && (!exceptfds))
  3430. {
  3431. return APIError(MJ_NOTSUP, MN_INVAL, 0);
  3432. }
  3433. try
  3434. {
  3435. return uglobal().selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
  3436. }
  3437. catch (const CUDTException& e)
  3438. {
  3439. return APIError(e);
  3440. }
  3441. catch (bad_alloc&)
  3442. {
  3443. return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
  3444. }
  3445. catch (const std::exception& ee)
  3446. {
  3447. LOGC(aclog.Fatal, log << "selectEx: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3448. return APIError(MJ_UNKNOWN);
  3449. }
  3450. }
  3451. int srt::CUDT::epoll_create()
  3452. {
  3453. try
  3454. {
  3455. return uglobal().epoll_create();
  3456. }
  3457. catch (const CUDTException& e)
  3458. {
  3459. return APIError(e);
  3460. }
  3461. catch (const std::exception& ee)
  3462. {
  3463. LOGC(aclog.Fatal, log << "epoll_create: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3464. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3465. }
  3466. }
  3467. int srt::CUDT::epoll_clear_usocks(int eid)
  3468. {
  3469. try
  3470. {
  3471. return uglobal().epoll_clear_usocks(eid);
  3472. }
  3473. catch (const CUDTException& e)
  3474. {
  3475. return APIError(e);
  3476. }
  3477. catch (std::exception& ee)
  3478. {
  3479. LOGC(aclog.Fatal,
  3480. log << "epoll_clear_usocks: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3481. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3482. }
  3483. }
  3484. int srt::CUDT::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
  3485. {
  3486. try
  3487. {
  3488. return uglobal().epoll_add_usock(eid, u, events);
  3489. }
  3490. catch (const CUDTException& e)
  3491. {
  3492. return APIError(e);
  3493. }
  3494. catch (const std::exception& ee)
  3495. {
  3496. LOGC(aclog.Fatal, log << "epoll_add_usock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3497. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3498. }
  3499. }
  3500. int srt::CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
  3501. {
  3502. try
  3503. {
  3504. return uglobal().epoll_add_ssock(eid, s, events);
  3505. }
  3506. catch (const CUDTException& e)
  3507. {
  3508. return APIError(e);
  3509. }
  3510. catch (const std::exception& ee)
  3511. {
  3512. LOGC(aclog.Fatal, log << "epoll_add_ssock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3513. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3514. }
  3515. }
  3516. int srt::CUDT::epoll_update_usock(const int eid, const SRTSOCKET u, const int* events)
  3517. {
  3518. try
  3519. {
  3520. return uglobal().epoll_add_usock(eid, u, events);
  3521. }
  3522. catch (const CUDTException& e)
  3523. {
  3524. return APIError(e);
  3525. }
  3526. catch (const std::exception& ee)
  3527. {
  3528. LOGC(aclog.Fatal,
  3529. log << "epoll_update_usock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3530. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3531. }
  3532. }
  3533. int srt::CUDT::epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events)
  3534. {
  3535. try
  3536. {
  3537. return uglobal().epoll_update_ssock(eid, s, events);
  3538. }
  3539. catch (const CUDTException& e)
  3540. {
  3541. return APIError(e);
  3542. }
  3543. catch (const std::exception& ee)
  3544. {
  3545. LOGC(aclog.Fatal,
  3546. log << "epoll_update_ssock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3547. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3548. }
  3549. }
  3550. int srt::CUDT::epoll_remove_usock(const int eid, const SRTSOCKET u)
  3551. {
  3552. try
  3553. {
  3554. return uglobal().epoll_remove_usock(eid, u);
  3555. }
  3556. catch (const CUDTException& e)
  3557. {
  3558. return APIError(e);
  3559. }
  3560. catch (const std::exception& ee)
  3561. {
  3562. LOGC(aclog.Fatal,
  3563. log << "epoll_remove_usock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3564. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3565. }
  3566. }
  3567. int srt::CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s)
  3568. {
  3569. try
  3570. {
  3571. return uglobal().epoll_remove_ssock(eid, s);
  3572. }
  3573. catch (const CUDTException& e)
  3574. {
  3575. return APIError(e);
  3576. }
  3577. catch (const std::exception& ee)
  3578. {
  3579. LOGC(aclog.Fatal,
  3580. log << "epoll_remove_ssock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3581. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3582. }
  3583. }
  3584. int srt::CUDT::epoll_wait(const int eid,
  3585. set<SRTSOCKET>* readfds,
  3586. set<SRTSOCKET>* writefds,
  3587. int64_t msTimeOut,
  3588. set<SYSSOCKET>* lrfds,
  3589. set<SYSSOCKET>* lwfds)
  3590. {
  3591. try
  3592. {
  3593. return uglobal().epoll_ref().wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
  3594. }
  3595. catch (const CUDTException& e)
  3596. {
  3597. return APIError(e);
  3598. }
  3599. catch (const std::exception& ee)
  3600. {
  3601. LOGC(aclog.Fatal, log << "epoll_wait: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3602. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3603. }
  3604. }
  3605. int srt::CUDT::epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
  3606. {
  3607. try
  3608. {
  3609. return uglobal().epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
  3610. }
  3611. catch (const CUDTException& e)
  3612. {
  3613. return APIError(e);
  3614. }
  3615. catch (const std::exception& ee)
  3616. {
  3617. LOGC(aclog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3618. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3619. }
  3620. }
  3621. int32_t srt::CUDT::epoll_set(const int eid, int32_t flags)
  3622. {
  3623. try
  3624. {
  3625. return uglobal().epoll_set(eid, flags);
  3626. }
  3627. catch (const CUDTException& e)
  3628. {
  3629. return APIError(e);
  3630. }
  3631. catch (const std::exception& ee)
  3632. {
  3633. LOGC(aclog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3634. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3635. }
  3636. }
  3637. int srt::CUDT::epoll_release(const int eid)
  3638. {
  3639. try
  3640. {
  3641. return uglobal().epoll_release(eid);
  3642. }
  3643. catch (const CUDTException& e)
  3644. {
  3645. return APIError(e);
  3646. }
  3647. catch (const std::exception& ee)
  3648. {
  3649. LOGC(aclog.Fatal, log << "epoll_release: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3650. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3651. }
  3652. }
  3653. srt::CUDTException& srt::CUDT::getlasterror()
  3654. {
  3655. return GetThreadLocalError();
  3656. }
  3657. int srt::CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instantaneous)
  3658. {
  3659. #if ENABLE_BONDING
  3660. if (u & SRTGROUP_MASK)
  3661. return groupsockbstats(u, perf, clear);
  3662. #endif
  3663. try
  3664. {
  3665. CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
  3666. udt.bstats(perf, clear, instantaneous);
  3667. return 0;
  3668. }
  3669. catch (const CUDTException& e)
  3670. {
  3671. return APIError(e);
  3672. }
  3673. catch (const std::exception& ee)
  3674. {
  3675. LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3676. return APIError(MJ_UNKNOWN, MN_NONE, 0);
  3677. }
  3678. }
  3679. #if ENABLE_BONDING
  3680. int srt::CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
  3681. {
  3682. try
  3683. {
  3684. CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
  3685. k.group->bstatsSocket(perf, clear);
  3686. return 0;
  3687. }
  3688. catch (const CUDTException& e)
  3689. {
  3690. SetThreadLocalError(e);
  3691. return ERROR;
  3692. }
  3693. catch (const std::exception& ee)
  3694. {
  3695. LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3696. SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
  3697. return ERROR;
  3698. }
  3699. }
  3700. #endif
  3701. srt::CUDT* srt::CUDT::getUDTHandle(SRTSOCKET u)
  3702. {
  3703. try
  3704. {
  3705. return &uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
  3706. }
  3707. catch (const CUDTException& e)
  3708. {
  3709. SetThreadLocalError(e);
  3710. return NULL;
  3711. }
  3712. catch (const std::exception& ee)
  3713. {
  3714. LOGC(aclog.Fatal, log << "getUDTHandle: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3715. SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
  3716. return NULL;
  3717. }
  3718. }
  3719. vector<SRTSOCKET> srt::CUDT::existingSockets()
  3720. {
  3721. vector<SRTSOCKET> out;
  3722. for (CUDTUnited::sockets_t::iterator i = uglobal().m_Sockets.begin(); i != uglobal().m_Sockets.end(); ++i)
  3723. {
  3724. out.push_back(i->first);
  3725. }
  3726. return out;
  3727. }
  3728. SRT_SOCKSTATUS srt::CUDT::getsockstate(SRTSOCKET u)
  3729. {
  3730. try
  3731. {
  3732. #if ENABLE_BONDING
  3733. if (isgroup(u))
  3734. {
  3735. CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
  3736. return k.group->getStatus();
  3737. }
  3738. #endif
  3739. return uglobal().getStatus(u);
  3740. }
  3741. catch (const CUDTException& e)
  3742. {
  3743. SetThreadLocalError(e);
  3744. return SRTS_NONEXIST;
  3745. }
  3746. catch (const std::exception& ee)
  3747. {
  3748. LOGC(aclog.Fatal, log << "getsockstate: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
  3749. SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
  3750. return SRTS_NONEXIST;
  3751. }
  3752. }
  3753. ////////////////////////////////////////////////////////////////////////////////
  3754. namespace UDT
  3755. {
  3756. int startup()
  3757. {
  3758. return srt::CUDT::startup();
  3759. }
  3760. int cleanup()
  3761. {
  3762. return srt::CUDT::cleanup();
  3763. }
  3764. int bind(SRTSOCKET u, const struct sockaddr* name, int namelen)
  3765. {
  3766. return srt::CUDT::bind(u, name, namelen);
  3767. }
  3768. int bind2(SRTSOCKET u, UDPSOCKET udpsock)
  3769. {
  3770. return srt::CUDT::bind(u, udpsock);
  3771. }
  3772. int listen(SRTSOCKET u, int backlog)
  3773. {
  3774. return srt::CUDT::listen(u, backlog);
  3775. }
  3776. SRTSOCKET accept(SRTSOCKET u, struct sockaddr* addr, int* addrlen)
  3777. {
  3778. return srt::CUDT::accept(u, addr, addrlen);
  3779. }
  3780. int connect(SRTSOCKET u, const struct sockaddr* name, int namelen)
  3781. {
  3782. return srt::CUDT::connect(u, name, namelen, SRT_SEQNO_NONE);
  3783. }
  3784. int close(SRTSOCKET u)
  3785. {
  3786. return srt::CUDT::close(u);
  3787. }
  3788. int getpeername(SRTSOCKET u, struct sockaddr* name, int* namelen)
  3789. {
  3790. return srt::CUDT::getpeername(u, name, namelen);
  3791. }
  3792. int getsockname(SRTSOCKET u, struct sockaddr* name, int* namelen)
  3793. {
  3794. return srt::CUDT::getsockname(u, name, namelen);
  3795. }
  3796. int getsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, void* optval, int* optlen)
  3797. {
  3798. return srt::CUDT::getsockopt(u, level, optname, optval, optlen);
  3799. }
  3800. int setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen)
  3801. {
  3802. return srt::CUDT::setsockopt(u, level, optname, optval, optlen);
  3803. }
  3804. // DEVELOPER API
  3805. int connect_debug(SRTSOCKET u, const struct sockaddr* name, int namelen, int32_t forced_isn)
  3806. {
  3807. return srt::CUDT::connect(u, name, namelen, forced_isn);
  3808. }
  3809. int send(SRTSOCKET u, const char* buf, int len, int flags)
  3810. {
  3811. return srt::CUDT::send(u, buf, len, flags);
  3812. }
  3813. int recv(SRTSOCKET u, char* buf, int len, int flags)
  3814. {
  3815. return srt::CUDT::recv(u, buf, len, flags);
  3816. }
  3817. int sendmsg(SRTSOCKET u, const char* buf, int len, int ttl, bool inorder, int64_t srctime)
  3818. {
  3819. return srt::CUDT::sendmsg(u, buf, len, ttl, inorder, srctime);
  3820. }
  3821. int recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
  3822. {
  3823. return srt::CUDT::recvmsg(u, buf, len, srctime);
  3824. }
  3825. int recvmsg(SRTSOCKET u, char* buf, int len)
  3826. {
  3827. int64_t srctime;
  3828. return srt::CUDT::recvmsg(u, buf, len, srctime);
  3829. }
  3830. int64_t sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
  3831. {
  3832. return srt::CUDT::sendfile(u, ifs, offset, size, block);
  3833. }
  3834. int64_t recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
  3835. {
  3836. return srt::CUDT::recvfile(u, ofs, offset, size, block);
  3837. }
  3838. int64_t sendfile2(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block)
  3839. {
  3840. fstream ifs(path, ios::binary | ios::in);
  3841. int64_t ret = srt::CUDT::sendfile(u, ifs, *offset, size, block);
  3842. ifs.close();
  3843. return ret;
  3844. }
  3845. int64_t recvfile2(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block)
  3846. {
  3847. fstream ofs(path, ios::binary | ios::out);
  3848. int64_t ret = srt::CUDT::recvfile(u, ofs, *offset, size, block);
  3849. ofs.close();
  3850. return ret;
  3851. }
  3852. int select(int nfds, UDSET* readfds, UDSET* writefds, UDSET* exceptfds, const struct timeval* timeout)
  3853. {
  3854. return srt::CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
  3855. }
  3856. int selectEx(const vector<SRTSOCKET>& fds,
  3857. vector<SRTSOCKET>* readfds,
  3858. vector<SRTSOCKET>* writefds,
  3859. vector<SRTSOCKET>* exceptfds,
  3860. int64_t msTimeOut)
  3861. {
  3862. return srt::CUDT::selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
  3863. }
  3864. int epoll_create()
  3865. {
  3866. return srt::CUDT::epoll_create();
  3867. }
  3868. int epoll_clear_usocks(int eid)
  3869. {
  3870. return srt::CUDT::epoll_clear_usocks(eid);
  3871. }
  3872. int epoll_add_usock(int eid, SRTSOCKET u, const int* events)
  3873. {
  3874. return srt::CUDT::epoll_add_usock(eid, u, events);
  3875. }
  3876. int epoll_add_ssock(int eid, SYSSOCKET s, const int* events)
  3877. {
  3878. return srt::CUDT::epoll_add_ssock(eid, s, events);
  3879. }
  3880. int epoll_update_usock(int eid, SRTSOCKET u, const int* events)
  3881. {
  3882. return srt::CUDT::epoll_update_usock(eid, u, events);
  3883. }
  3884. int epoll_update_ssock(int eid, SYSSOCKET s, const int* events)
  3885. {
  3886. return srt::CUDT::epoll_update_ssock(eid, s, events);
  3887. }
  3888. int epoll_remove_usock(int eid, SRTSOCKET u)
  3889. {
  3890. return srt::CUDT::epoll_remove_usock(eid, u);
  3891. }
  3892. int epoll_remove_ssock(int eid, SYSSOCKET s)
  3893. {
  3894. return srt::CUDT::epoll_remove_ssock(eid, s);
  3895. }
  3896. int epoll_wait(int eid,
  3897. set<SRTSOCKET>* readfds,
  3898. set<SRTSOCKET>* writefds,
  3899. int64_t msTimeOut,
  3900. set<SYSSOCKET>* lrfds,
  3901. set<SYSSOCKET>* lwfds)
  3902. {
  3903. return srt::CUDT::epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
  3904. }
  3905. template <class SOCKTYPE>
  3906. inline void set_result(set<SOCKTYPE>* val, int* num, SOCKTYPE* fds)
  3907. {
  3908. if (!val || !num || !fds)
  3909. return;
  3910. if (*num > int(val->size()))
  3911. *num = int(val->size()); // will get 0 if val->empty()
  3912. int count = 0;
  3913. // This loop will run 0 times if val->empty()
  3914. for (typename set<SOCKTYPE>::const_iterator it = val->begin(); it != val->end(); ++it)
  3915. {
  3916. if (count >= *num)
  3917. break;
  3918. fds[count++] = *it;
  3919. }
  3920. }
  3921. int epoll_wait2(int eid,
  3922. SRTSOCKET* readfds,
  3923. int* rnum,
  3924. SRTSOCKET* writefds,
  3925. int* wnum,
  3926. int64_t msTimeOut,
  3927. SYSSOCKET* lrfds,
  3928. int* lrnum,
  3929. SYSSOCKET* lwfds,
  3930. int* lwnum)
  3931. {
  3932. // This API is an alternative format for epoll_wait, created for
  3933. // compatibility with other languages. Users need to pass in an array
  3934. // for holding the returned sockets, with the maximum array length
  3935. // stored in *rnum, etc., which will be updated with returned number
  3936. // of sockets.
  3937. set<SRTSOCKET> readset;
  3938. set<SRTSOCKET> writeset;
  3939. set<SYSSOCKET> lrset;
  3940. set<SYSSOCKET> lwset;
  3941. set<SRTSOCKET>* rval = NULL;
  3942. set<SRTSOCKET>* wval = NULL;
  3943. set<SYSSOCKET>* lrval = NULL;
  3944. set<SYSSOCKET>* lwval = NULL;
  3945. if ((readfds != NULL) && (rnum != NULL))
  3946. rval = &readset;
  3947. if ((writefds != NULL) && (wnum != NULL))
  3948. wval = &writeset;
  3949. if ((lrfds != NULL) && (lrnum != NULL))
  3950. lrval = &lrset;
  3951. if ((lwfds != NULL) && (lwnum != NULL))
  3952. lwval = &lwset;
  3953. int ret = srt::CUDT::epoll_wait(eid, rval, wval, msTimeOut, lrval, lwval);
  3954. if (ret > 0)
  3955. {
  3956. // set<SRTSOCKET>::const_iterator i;
  3957. // SET_RESULT(rval, rnum, readfds, i);
  3958. set_result(rval, rnum, readfds);
  3959. // SET_RESULT(wval, wnum, writefds, i);
  3960. set_result(wval, wnum, writefds);
  3961. // set<SYSSOCKET>::const_iterator j;
  3962. // SET_RESULT(lrval, lrnum, lrfds, j);
  3963. set_result(lrval, lrnum, lrfds);
  3964. // SET_RESULT(lwval, lwnum, lwfds, j);
  3965. set_result(lwval, lwnum, lwfds);
  3966. }
  3967. return ret;
  3968. }
  3969. int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
  3970. {
  3971. return srt::CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
  3972. }
  3973. int epoll_release(int eid)
  3974. {
  3975. return srt::CUDT::epoll_release(eid);
  3976. }
  3977. ERRORINFO& getlasterror()
  3978. {
  3979. return srt::CUDT::getlasterror();
  3980. }
  3981. int getlasterror_code()
  3982. {
  3983. return srt::CUDT::getlasterror().getErrorCode();
  3984. }
  3985. const char* getlasterror_desc()
  3986. {
  3987. return srt::CUDT::getlasterror().getErrorMessage();
  3988. }
  3989. int getlasterror_errno()
  3990. {
  3991. return srt::CUDT::getlasterror().getErrno();
  3992. }
  3993. // Get error string of a given error code
  3994. const char* geterror_desc(int code, int err)
  3995. {
  3996. srt::CUDTException e(CodeMajor(code / 1000), CodeMinor(code % 1000), err);
  3997. return (e.getErrorMessage());
  3998. }
  3999. int bstats(SRTSOCKET u, SRT_TRACEBSTATS* perf, bool clear)
  4000. {
  4001. return srt::CUDT::bstats(u, perf, clear);
  4002. }
  4003. SRT_SOCKSTATUS getsockstate(SRTSOCKET u)
  4004. {
  4005. return srt::CUDT::getsockstate(u);
  4006. }
  4007. } // namespace UDT
  4008. namespace srt
  4009. {
  4010. void setloglevel(LogLevel::type ll)
  4011. {
  4012. ScopedLock gg(srt_logger_config.mutex);
  4013. srt_logger_config.max_level = ll;
  4014. }
  4015. void addlogfa(LogFA fa)
  4016. {
  4017. ScopedLock gg(srt_logger_config.mutex);
  4018. srt_logger_config.enabled_fa.set(fa, true);
  4019. }
  4020. void dellogfa(LogFA fa)
  4021. {
  4022. ScopedLock gg(srt_logger_config.mutex);
  4023. srt_logger_config.enabled_fa.set(fa, false);
  4024. }
  4025. void resetlogfa(set<LogFA> fas)
  4026. {
  4027. ScopedLock gg(srt_logger_config.mutex);
  4028. for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i)
  4029. srt_logger_config.enabled_fa.set(i, fas.count(i));
  4030. }
  4031. void resetlogfa(const int* fara, size_t fara_size)
  4032. {
  4033. ScopedLock gg(srt_logger_config.mutex);
  4034. srt_logger_config.enabled_fa.reset();
  4035. for (const int* i = fara; i != fara + fara_size; ++i)
  4036. srt_logger_config.enabled_fa.set(*i, true);
  4037. }
  4038. void setlogstream(std::ostream& stream)
  4039. {
  4040. ScopedLock gg(srt_logger_config.mutex);
  4041. srt_logger_config.log_stream = &stream;
  4042. }
  4043. void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
  4044. {
  4045. ScopedLock gg(srt_logger_config.mutex);
  4046. srt_logger_config.loghandler_opaque = opaque;
  4047. srt_logger_config.loghandler_fn = handler;
  4048. }
  4049. void setlogflags(int flags)
  4050. {
  4051. ScopedLock gg(srt_logger_config.mutex);
  4052. srt_logger_config.flags = flags;
  4053. }
  4054. SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid)
  4055. {
  4056. return CUDT::setstreamid(u, sid);
  4057. }
  4058. SRT_API std::string getstreamid(SRTSOCKET u)
  4059. {
  4060. return CUDT::getstreamid(u);
  4061. }
  4062. int getrejectreason(SRTSOCKET u)
  4063. {
  4064. return CUDT::rejectReason(u);
  4065. }
  4066. int setrejectreason(SRTSOCKET u, int value)
  4067. {
  4068. return CUDT::rejectReason(u, value);
  4069. }
  4070. } // namespace srt