epoll.cpp 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013
  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2018 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. /*****************************************************************************
  11. Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
  12. All rights reserved.
  13. Redistribution and use in source and binary forms, with or without
  14. modification, are permitted provided that the following conditions are
  15. met:
  16. * Redistributions of source code must retain the above
  17. copyright notice, this list of conditions and the
  18. following disclaimer.
  19. * Redistributions in binary form must reproduce the
  20. above copyright notice, this list of conditions
  21. and the following disclaimer in the documentation
  22. and/or other materials provided with the distribution.
  23. * Neither the name of the University of Illinois
  24. nor the names of its contributors may be used to
  25. endorse or promote products derived from this
  26. software without specific prior written permission.
  27. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
  28. IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
  29. THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  30. PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  31. CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  32. EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  33. PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  34. PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  35. LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  36. NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  37. SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38. *****************************************************************************/
  39. /*****************************************************************************
  40. written by
  41. Yunhong Gu, last updated 01/01/2011
  42. modified by
  43. Haivision Systems Inc.
  44. *****************************************************************************/
  45. #define SRT_IMPORT_EVENT
  46. #include "platform_sys.h"
  47. #include <algorithm>
  48. #include <cerrno>
  49. #include <cstring>
  50. #include <iterator>
  51. #if defined(__FreeBSD_kernel__)
  52. #include <sys/event.h>
  53. #endif
  54. #include "common.h"
  55. #include "epoll.h"
  56. #include "logging.h"
  57. #include "udt.h"
  58. #include "utilities.h"
  59. using namespace std;
  60. using namespace srt::sync;
  61. #if ENABLE_HEAVY_LOGGING
  62. namespace srt {
  63. static ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0);
  64. }
  65. #endif
  66. namespace srt_logging
  67. {
  68. extern Logger eilog, ealog;
  69. }
  70. using namespace srt_logging;
  71. #if ENABLE_HEAVY_LOGGING
  72. #define IF_DIRNAME(tested, flag, name) (tested & flag ? name : "")
  73. #endif
  74. srt::CEPoll::CEPoll():
  75. m_iIDSeed(0)
  76. {
  77. // Exception -> CUDTUnited ctor.
  78. setupMutex(m_EPollLock, "EPoll");
  79. }
  80. srt::CEPoll::~CEPoll()
  81. {
  82. releaseMutex(m_EPollLock);
  83. }
  84. int srt::CEPoll::create(CEPollDesc** pout)
  85. {
  86. ScopedLock pg(m_EPollLock);
  87. if (++ m_iIDSeed >= 0x7FFFFFFF)
  88. m_iIDSeed = 0;
  89. // Check if an item already exists. Should not ever happen.
  90. if (m_mPolls.find(m_iIDSeed) != m_mPolls.end())
  91. throw CUDTException(MJ_SETUP, MN_NONE);
  92. int localid = 0;
  93. #ifdef LINUX
  94. // NOTE: epoll_create1() and EPOLL_CLOEXEC were introduced in GLIBC-2.9.
  95. // So earlier versions of GLIBC, must use epoll_create() and set
  96. // FD_CLOEXEC on the file descriptor returned by it after the fact.
  97. #if defined(EPOLL_CLOEXEC)
  98. int flags = 0;
  99. #if ENABLE_SOCK_CLOEXEC
  100. flags |= EPOLL_CLOEXEC;
  101. #endif
  102. localid = epoll_create1(flags);
  103. #else
  104. localid = epoll_create(1);
  105. #if ENABLE_SOCK_CLOEXEC
  106. if (localid != -1)
  107. {
  108. int fdFlags = fcntl(localid, F_GETFD);
  109. if (fdFlags != -1)
  110. {
  111. fdFlags |= FD_CLOEXEC;
  112. fcntl(localid, F_SETFD, fdFlags);
  113. }
  114. }
  115. #endif
  116. #endif
  117. /* Possible reasons of -1 error:
  118. EMFILE: The per-user limit on the number of epoll instances imposed by /proc/sys/fs/epoll/max_user_instances was encountered.
  119. ENFILE: The system limit on the total number of open files has been reached.
  120. ENOMEM: There was insufficient memory to create the kernel object.
  121. */
  122. if (localid < 0)
  123. throw CUDTException(MJ_SETUP, MN_NONE, errno);
  124. #elif defined(BSD) || TARGET_OS_MAC
  125. localid = kqueue();
  126. if (localid < 0)
  127. throw CUDTException(MJ_SETUP, MN_NONE, errno);
  128. #else
  129. // TODO: Solaris, use port_getn()
  130. // https://docs.oracle.com/cd/E86824_01/html/E54766/port-get-3c.html
  131. // on Windows, select
  132. #endif
  133. pair<map<int, CEPollDesc>::iterator, bool> res = m_mPolls.insert(make_pair(m_iIDSeed, CEPollDesc(m_iIDSeed, localid)));
  134. if (!res.second) // Insertion failed (no memory?)
  135. throw CUDTException(MJ_SETUP, MN_NONE);
  136. if (pout)
  137. *pout = &res.first->second;
  138. return m_iIDSeed;
  139. }
  140. int srt::CEPoll::clear_usocks(int eid)
  141. {
  142. // This should remove all SRT sockets from given eid.
  143. ScopedLock pg (m_EPollLock);
  144. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  145. if (p == m_mPolls.end())
  146. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  147. CEPollDesc& d = p->second;
  148. d.clearAll();
  149. return 0;
  150. }
  151. void srt::CEPoll::clear_ready_usocks(CEPollDesc& d, int direction)
  152. {
  153. if ((direction & ~SRT_EPOLL_EVENTTYPES) != 0)
  154. {
  155. // This is internal function, so simply report an IPE on incorrect usage.
  156. LOGC(eilog.Error, log << "CEPoll::clear_ready_usocks: IPE, event flags exceed event types: " << direction);
  157. return;
  158. }
  159. ScopedLock pg (m_EPollLock);
  160. vector<SRTSOCKET> cleared;
  161. CEPollDesc::enotice_t::iterator i = d.enotice_begin();
  162. while (i != d.enotice_end())
  163. {
  164. IF_HEAVY_LOGGING(SRTSOCKET subsock = i->fd);
  165. SRTSOCKET rs = d.clearEventSub(i++, direction);
  166. // This function returns:
  167. // - a valid socket - if there are no other subscription after 'direction' was cleared
  168. // - SRT_INVALID_SOCK otherwise
  169. // Valid sockets should be collected as sockets that no longer
  170. // have a subscribed event should be deleted from subscriptions.
  171. if (rs != SRT_INVALID_SOCK)
  172. {
  173. HLOGC(eilog.Debug, log << "CEPoll::clear_ready_usocks: @" << rs << " got all subscription cleared");
  174. cleared.push_back(rs);
  175. }
  176. else
  177. {
  178. HLOGC(eilog.Debug, log << "CEPoll::clear_ready_usocks: @" << subsock << " is still subscribed");
  179. }
  180. }
  181. for (size_t j = 0; j < cleared.size(); ++j)
  182. d.removeSubscription(cleared[j]);
  183. }
  184. int srt::CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
  185. {
  186. ScopedLock pg(m_EPollLock);
  187. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  188. if (p == m_mPolls.end())
  189. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  190. #ifdef LINUX
  191. epoll_event ev;
  192. memset(&ev, 0, sizeof(epoll_event));
  193. if (NULL == events)
  194. ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
  195. else
  196. {
  197. ev.events = 0;
  198. if (*events & SRT_EPOLL_IN)
  199. ev.events |= EPOLLIN;
  200. if (*events & SRT_EPOLL_OUT)
  201. ev.events |= EPOLLOUT;
  202. if (*events & SRT_EPOLL_ERR)
  203. ev.events |= EPOLLERR;
  204. }
  205. ev.data.fd = s;
  206. if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_ADD, s, &ev) < 0)
  207. throw CUDTException();
  208. #elif defined(BSD) || TARGET_OS_MAC
  209. struct kevent ke[2];
  210. int num = 0;
  211. if (NULL == events)
  212. {
  213. EV_SET(&ke[num++], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
  214. EV_SET(&ke[num++], s, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
  215. }
  216. else
  217. {
  218. if (*events & SRT_EPOLL_IN)
  219. {
  220. EV_SET(&ke[num++], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
  221. }
  222. if (*events & SRT_EPOLL_OUT)
  223. {
  224. EV_SET(&ke[num++], s, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
  225. }
  226. }
  227. if (kevent(p->second.m_iLocalID, ke, num, NULL, 0, NULL) < 0)
  228. throw CUDTException();
  229. #else
  230. // fake use 'events' to prevent warning. Remove when implemented.
  231. (void)events;
  232. (void)s;
  233. #ifdef _MSC_VER
  234. // Microsoft Visual Studio doesn't support the #warning directive - nonstandard anyway.
  235. // Use #pragma message with the same text.
  236. // All other compilers should be ok :)
  237. #pragma message("WARNING: Unsupported system for epoll. The epoll_add_ssock() API call won't work on this platform.")
  238. #else
  239. #warning "Unsupported system for epoll. The epoll_add_ssock() API call won't work on this platform."
  240. #endif
  241. #endif
  242. p->second.m_sLocals.insert(s);
  243. return 0;
  244. }
  245. int srt::CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
  246. {
  247. ScopedLock pg(m_EPollLock);
  248. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  249. if (p == m_mPolls.end())
  250. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  251. #ifdef LINUX
  252. epoll_event ev; // ev is ignored, for compatibility with old Linux kernel only.
  253. if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_DEL, s, &ev) < 0)
  254. throw CUDTException();
  255. #elif defined(BSD) || TARGET_OS_MAC
  256. struct kevent ke;
  257. //
  258. // Since I don't know what was set before
  259. // Just clear out both read and write
  260. //
  261. EV_SET(&ke, s, EVFILT_READ, EV_DELETE, 0, 0, NULL);
  262. kevent(p->second.m_iLocalID, &ke, 1, NULL, 0, NULL);
  263. EV_SET(&ke, s, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
  264. kevent(p->second.m_iLocalID, &ke, 1, NULL, 0, NULL);
  265. #endif
  266. p->second.m_sLocals.erase(s);
  267. return 0;
  268. }
  269. // Need this to atomically modify polled events (ex: remove write/keep read)
  270. int srt::CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
  271. {
  272. ScopedLock pg(m_EPollLock);
  273. IF_HEAVY_LOGGING(ostringstream evd);
  274. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  275. if (p == m_mPolls.end())
  276. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  277. CEPollDesc& d = p->second;
  278. int32_t evts = events ? *events : uint32_t(SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR);
  279. bool edgeTriggered = evts & SRT_EPOLL_ET;
  280. evts &= ~SRT_EPOLL_ET;
  281. // et_evts = all events, if SRT_EPOLL_ET, or only those that are always ET otherwise.
  282. int32_t et_evts = edgeTriggered ? evts : evts & SRT_EPOLL_ETONLY;
  283. if (evts)
  284. {
  285. pair<CEPollDesc::ewatch_t::iterator, bool> iter_new = d.addWatch(u, evts, et_evts);
  286. CEPollDesc::Wait& wait = iter_new.first->second;
  287. if (!iter_new.second)
  288. {
  289. // The object exists. We only are certain about the `u`
  290. // parameter, but others are probably unchanged. Change them
  291. // forcefully and take out notices that are no longer valid.
  292. const int removable = wait.watch & ~evts;
  293. IF_HEAVY_LOGGING(PrintEpollEvent(evd, evts & (~wait.watch)));
  294. // Check if there are any events that would be removed.
  295. // If there are no removed events watched (for example, when
  296. // only new events are being added to existing socket),
  297. // there's nothing to remove, but might be something to update.
  298. if (removable)
  299. {
  300. d.removeExcessEvents(wait, evts);
  301. }
  302. // Update the watch configuration, including edge
  303. wait.watch = evts;
  304. wait.edge = et_evts;
  305. // Now it should look exactly like newly added
  306. // and the state is also updated
  307. HLOGC(ealog.Debug, log << "srt_epoll_update_usock: UPDATED E" << eid << " for @" << u << " +" << evd.str());
  308. }
  309. else
  310. {
  311. IF_HEAVY_LOGGING(PrintEpollEvent(evd, evts));
  312. HLOGC(ealog.Debug, log << "srt_epoll_update_usock: ADDED E" << eid << " for @" << u << " " << evd.str());
  313. }
  314. const int newstate = wait.watch & wait.state;
  315. if (newstate)
  316. {
  317. d.addEventNotice(wait, u, newstate);
  318. }
  319. }
  320. else if (edgeTriggered)
  321. {
  322. LOGC(ealog.Error, log << "srt_epoll_update_usock: Specified only SRT_EPOLL_ET flag, but no event flag. Error.");
  323. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  324. }
  325. else
  326. {
  327. // Update with no events means to remove subscription
  328. HLOGC(ealog.Debug, log << "srt_epoll_update_usock: REMOVED E" << eid << " socket @" << u);
  329. d.removeSubscription(u);
  330. }
  331. return 0;
  332. }
  333. int srt::CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
  334. {
  335. ScopedLock pg(m_EPollLock);
  336. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  337. if (p == m_mPolls.end())
  338. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  339. #ifdef LINUX
  340. epoll_event ev;
  341. memset(&ev, 0, sizeof(epoll_event));
  342. if (NULL == events)
  343. ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
  344. else
  345. {
  346. ev.events = 0;
  347. if (*events & SRT_EPOLL_IN)
  348. ev.events |= EPOLLIN;
  349. if (*events & SRT_EPOLL_OUT)
  350. ev.events |= EPOLLOUT;
  351. if (*events & SRT_EPOLL_ERR)
  352. ev.events |= EPOLLERR;
  353. }
  354. ev.data.fd = s;
  355. if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_MOD, s, &ev) < 0)
  356. throw CUDTException();
  357. #elif defined(BSD) || TARGET_OS_MAC
  358. struct kevent ke[2];
  359. int num = 0;
  360. //
  361. // Since I don't know what was set before
  362. // Just clear out both read and write
  363. //
  364. EV_SET(&ke[0], s, EVFILT_READ, EV_DELETE, 0, 0, NULL);
  365. kevent(p->second.m_iLocalID, ke, 1, NULL, 0, NULL);
  366. EV_SET(&ke[0], s, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
  367. kevent(p->second.m_iLocalID, ke, 1, NULL, 0, NULL);
  368. if (NULL == events)
  369. {
  370. EV_SET(&ke[num++], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
  371. EV_SET(&ke[num++], s, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
  372. }
  373. else
  374. {
  375. if (*events & SRT_EPOLL_IN)
  376. {
  377. EV_SET(&ke[num++], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
  378. }
  379. if (*events & SRT_EPOLL_OUT)
  380. {
  381. EV_SET(&ke[num++], s, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
  382. }
  383. }
  384. if (kevent(p->second.m_iLocalID, ke, num, NULL, 0, NULL) < 0)
  385. throw CUDTException();
  386. #else
  387. // fake use 'events' to prevent warning. Remove when implemented.
  388. (void)events;
  389. (void)s;
  390. #endif
  391. // Assuming add is used if not inserted
  392. // p->second.m_sLocals.insert(s);
  393. return 0;
  394. }
  395. int srt::CEPoll::setflags(const int eid, int32_t flags)
  396. {
  397. ScopedLock pg(m_EPollLock);
  398. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  399. if (p == m_mPolls.end())
  400. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  401. CEPollDesc& ed = p->second;
  402. int32_t oflags = ed.flags();
  403. if (flags == -1)
  404. return oflags;
  405. if (flags == 0)
  406. {
  407. ed.clr_flags(~int32_t());
  408. }
  409. else
  410. {
  411. ed.set_flags(flags);
  412. }
  413. return oflags;
  414. }
  415. int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
  416. {
  417. // It is allowed to call this function witn fdsSize == 0
  418. // and therefore also NULL fdsSet. This will then only report
  419. // the number of ready sockets, just without information which.
  420. if (fdsSize < 0 || (fdsSize > 0 && !fdsSet))
  421. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  422. steady_clock::time_point entertime = steady_clock::now();
  423. while (true)
  424. {
  425. {
  426. ScopedLock pg(m_EPollLock);
  427. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  428. if (p == m_mPolls.end())
  429. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  430. CEPollDesc& ed = p->second;
  431. if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty())
  432. {
  433. // Empty EID is not allowed, report error.
  434. throw CUDTException(MJ_NOTSUP, MN_EEMPTY);
  435. }
  436. if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0))
  437. {
  438. // Empty EID is not allowed, report error.
  439. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  440. }
  441. if (!ed.m_sLocals.empty())
  442. {
  443. // XXX Add error log
  444. // uwait should not be used with EIDs subscribed to system sockets
  445. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  446. }
  447. int total = 0; // This is a list, so count it during iteration
  448. CEPollDesc::enotice_t::iterator i = ed.enotice_begin();
  449. while (i != ed.enotice_end())
  450. {
  451. int pos = total; // previous past-the-end position
  452. ++total;
  453. if (total > fdsSize)
  454. break;
  455. fdsSet[pos] = *i;
  456. ed.checkEdge(i++); // NOTE: potentially deletes `i`
  457. }
  458. if (total)
  459. return total;
  460. }
  461. if ((msTimeOut >= 0) && (count_microseconds(srt::sync::steady_clock::now() - entertime) >= msTimeOut * int64_t(1000)))
  462. break; // official wait does: throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
  463. CGlobEvent::waitForEvent();
  464. }
  465. return 0;
  466. }
  467. int srt::CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
  468. {
  469. // if all fields is NULL and waiting time is infinite, then this would be a deadlock
  470. if (!readfds && !writefds && !lrfds && !lwfds && (msTimeOut < 0))
  471. throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
  472. // Clear these sets in case the app forget to do it.
  473. if (readfds) readfds->clear();
  474. if (writefds) writefds->clear();
  475. if (lrfds) lrfds->clear();
  476. if (lwfds) lwfds->clear();
  477. int total = 0;
  478. srt::sync::steady_clock::time_point entertime = srt::sync::steady_clock::now();
  479. while (true)
  480. {
  481. {
  482. ScopedLock epollock(m_EPollLock);
  483. map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
  484. if (p == m_mPolls.end())
  485. {
  486. LOGC(ealog.Error, log << "EID:" << eid << " INVALID.");
  487. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  488. }
  489. CEPollDesc& ed = p->second;
  490. if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty() && ed.m_sLocals.empty())
  491. {
  492. // Empty EID is not allowed, report error.
  493. //throw CUDTException(MJ_NOTSUP, MN_INVAL);
  494. LOGC(ealog.Error, log << "EID:" << eid << " no sockets to check, this would deadlock");
  495. throw CUDTException(MJ_NOTSUP, MN_EEMPTY, 0);
  496. }
  497. if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK))
  498. {
  499. // Empty report is not allowed, report error.
  500. if (!ed.m_sLocals.empty() && (!lrfds || !lwfds))
  501. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  502. if (!ed.watch_empty() && (!readfds || !writefds))
  503. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  504. }
  505. IF_HEAVY_LOGGING(int total_noticed = 0);
  506. IF_HEAVY_LOGGING(ostringstream debug_sockets);
  507. // Sockets with exceptions are returned to both read and write sets.
  508. for (CEPollDesc::enotice_t::iterator it = ed.enotice_begin(), it_next = it; it != ed.enotice_end(); it = it_next)
  509. {
  510. ++it_next;
  511. IF_HEAVY_LOGGING(++total_noticed);
  512. if (readfds && ((it->events & SRT_EPOLL_IN) || (it->events & SRT_EPOLL_ERR)))
  513. {
  514. if (readfds->insert(it->fd).second)
  515. ++total;
  516. }
  517. if (writefds && ((it->events & SRT_EPOLL_OUT) || (it->events & SRT_EPOLL_ERR)))
  518. {
  519. if (writefds->insert(it->fd).second)
  520. ++total;
  521. }
  522. IF_HEAVY_LOGGING(debug_sockets << " " << it->fd << ":"
  523. << IF_DIRNAME(it->events, SRT_EPOLL_IN, "R")
  524. << IF_DIRNAME(it->events, SRT_EPOLL_OUT, "W")
  525. << IF_DIRNAME(it->events, SRT_EPOLL_ERR, "E"));
  526. if (ed.checkEdge(it)) // NOTE: potentially erases 'it'.
  527. {
  528. IF_HEAVY_LOGGING(debug_sockets << "!");
  529. }
  530. }
  531. HLOGC(ealog.Debug, log << "CEPoll::wait: REPORTED " << total << "/" << total_noticed
  532. << debug_sockets.str());
  533. if ((lrfds || lwfds) && !ed.m_sLocals.empty())
  534. {
  535. #ifdef LINUX
  536. const int max_events = ed.m_sLocals.size();
  537. SRT_ASSERT(max_events > 0);
  538. srt::FixedArray<epoll_event> ev(max_events);
  539. int nfds = ::epoll_wait(ed.m_iLocalID, ev.data(), ev.size(), 0);
  540. IF_HEAVY_LOGGING(const int prev_total = total);
  541. for (int i = 0; i < nfds; ++ i)
  542. {
  543. if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
  544. {
  545. lrfds->insert(ev[i].data.fd);
  546. ++ total;
  547. }
  548. if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
  549. {
  550. lwfds->insert(ev[i].data.fd);
  551. ++ total;
  552. }
  553. }
  554. HLOGC(ealog.Debug, log << "CEPoll::wait: LINUX: picking up " << (total - prev_total) << " ready fds.");
  555. #elif defined(BSD) || TARGET_OS_MAC
  556. struct timespec tmout = {0, 0};
  557. const int max_events = (int)ed.m_sLocals.size();
  558. SRT_ASSERT(max_events > 0);
  559. srt::FixedArray<struct kevent> ke(max_events);
  560. int nfds = kevent(ed.m_iLocalID, NULL, 0, ke.data(), (int)ke.size(), &tmout);
  561. IF_HEAVY_LOGGING(const int prev_total = total);
  562. for (int i = 0; i < nfds; ++ i)
  563. {
  564. if ((NULL != lrfds) && (ke[i].filter == EVFILT_READ))
  565. {
  566. lrfds->insert((int)ke[i].ident);
  567. ++ total;
  568. }
  569. if ((NULL != lwfds) && (ke[i].filter == EVFILT_WRITE))
  570. {
  571. lwfds->insert((int)ke[i].ident);
  572. ++ total;
  573. }
  574. }
  575. HLOGC(ealog.Debug, log << "CEPoll::wait: Darwin/BSD: picking up " << (total - prev_total) << " ready fds.");
  576. #else
  577. //currently "select" is used for all non-Linux platforms.
  578. //faster approaches can be applied for specific systems in the future.
  579. //"select" has a limitation on the number of sockets
  580. int max_fd = 0;
  581. fd_set rqreadfds;
  582. fd_set rqwritefds;
  583. FD_ZERO(&rqreadfds);
  584. FD_ZERO(&rqwritefds);
  585. for (set<SYSSOCKET>::const_iterator i = ed.m_sLocals.begin(); i != ed.m_sLocals.end(); ++ i)
  586. {
  587. if (lrfds)
  588. FD_SET(*i, &rqreadfds);
  589. if (lwfds)
  590. FD_SET(*i, &rqwritefds);
  591. if ((int)*i > max_fd)
  592. max_fd = (int)*i;
  593. }
  594. IF_HEAVY_LOGGING(const int prev_total = total);
  595. timeval tv;
  596. tv.tv_sec = 0;
  597. tv.tv_usec = 0;
  598. if (::select(max_fd + 1, &rqreadfds, &rqwritefds, NULL, &tv) > 0)
  599. {
  600. for (set<SYSSOCKET>::const_iterator i = ed.m_sLocals.begin(); i != ed.m_sLocals.end(); ++ i)
  601. {
  602. if (lrfds && FD_ISSET(*i, &rqreadfds))
  603. {
  604. lrfds->insert(*i);
  605. ++ total;
  606. }
  607. if (lwfds && FD_ISSET(*i, &rqwritefds))
  608. {
  609. lwfds->insert(*i);
  610. ++ total;
  611. }
  612. }
  613. }
  614. HLOGC(ealog.Debug, log << "CEPoll::wait: select(otherSYS): picking up " << (total - prev_total) << " ready fds.");
  615. #endif
  616. }
  617. } // END-LOCK: m_EPollLock
  618. HLOGC(ealog.Debug, log << "CEPoll::wait: Total of " << total << " READY SOCKETS");
  619. if (total > 0)
  620. return total;
  621. if ((msTimeOut >= 0) && (count_microseconds(srt::sync::steady_clock::now() - entertime) >= msTimeOut * int64_t(1000)))
  622. {
  623. HLOGC(ealog.Debug, log << "EID:" << eid << ": TIMEOUT.");
  624. throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
  625. }
  626. const bool wait_signaled SRT_ATR_UNUSED = CGlobEvent::waitForEvent();
  627. HLOGC(ealog.Debug, log << "CEPoll::wait: EVENT WAITING: "
  628. << (wait_signaled ? "TRIGGERED" : "CHECKPOINT"));
  629. }
  630. return 0;
  631. }
  632. int srt::CEPoll::swait(CEPollDesc& d, map<SRTSOCKET, int>& st, int64_t msTimeOut, bool report_by_exception)
  633. {
  634. {
  635. ScopedLock lg (m_EPollLock);
  636. if (!d.flags(SRT_EPOLL_ENABLE_EMPTY) && d.watch_empty() && msTimeOut < 0)
  637. {
  638. // no socket is being monitored, this may be a deadlock
  639. LOGC(ealog.Error, log << "EID:" << d.m_iID << " no sockets to check, this would deadlock");
  640. if (report_by_exception)
  641. throw CUDTException(MJ_NOTSUP, MN_EEMPTY, 0);
  642. return -1;
  643. }
  644. }
  645. st.clear();
  646. steady_clock::time_point entertime = steady_clock::now();
  647. while (true)
  648. {
  649. {
  650. // Not extracting separately because this function is
  651. // for internal use only and we state that the eid could
  652. // not be deleted or changed the target CEPollDesc in the
  653. // meantime.
  654. // Here we only prevent the pollset be updated simultaneously
  655. // with unstable reading.
  656. ScopedLock lg (m_EPollLock);
  657. if (!d.flags(SRT_EPOLL_ENABLE_EMPTY) && d.watch_empty())
  658. {
  659. // Empty EID is not allowed, report error.
  660. throw CUDTException(MJ_NOTSUP, MN_EEMPTY);
  661. }
  662. if (!d.m_sLocals.empty())
  663. {
  664. // XXX Add error log
  665. // uwait should not be used with EIDs subscribed to system sockets
  666. throw CUDTException(MJ_NOTSUP, MN_INVAL);
  667. }
  668. bool empty = d.enotice_empty();
  669. if (!empty || msTimeOut == 0)
  670. {
  671. IF_HEAVY_LOGGING(ostringstream singles);
  672. // If msTimeOut == 0, it means that we need the information
  673. // immediately, we don't want to wait. Therefore in this case
  674. // report also when none is ready.
  675. int total = 0; // This is a list, so count it during iteration
  676. CEPollDesc::enotice_t::iterator i = d.enotice_begin();
  677. while (i != d.enotice_end())
  678. {
  679. ++total;
  680. st[i->fd] = i->events;
  681. IF_HEAVY_LOGGING(singles << "@" << i->fd << ":");
  682. IF_HEAVY_LOGGING(PrintEpollEvent(singles, i->events, i->parent->edgeOnly()));
  683. const bool edged SRT_ATR_UNUSED = d.checkEdge(i++); // NOTE: potentially deletes `i`
  684. IF_HEAVY_LOGGING(singles << (edged ? "<^> " : " "));
  685. }
  686. // Logging into 'singles' because it notifies as to whether
  687. // the edge-triggered event has been cleared
  688. HLOGC(ealog.Debug, log << "E" << d.m_iID << " rdy=" << total << ": "
  689. << singles.str()
  690. << " TRACKED: " << d.DisplayEpollWatch());
  691. return total;
  692. }
  693. // Don't report any updates because this check happens
  694. // extremely often.
  695. }
  696. if ((msTimeOut >= 0) && ((steady_clock::now() - entertime) >= microseconds_from(msTimeOut * int64_t(1000))))
  697. {
  698. HLOGC(ealog.Debug, log << "EID:" << d.m_iID << ": TIMEOUT.");
  699. if (report_by_exception)
  700. throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
  701. return 0; // meaning "none is ready"
  702. }
  703. CGlobEvent::waitForEvent();
  704. }
  705. return 0;
  706. }
  707. bool srt::CEPoll::empty(const CEPollDesc& d) const
  708. {
  709. ScopedLock lg (m_EPollLock);
  710. return d.watch_empty();
  711. }
  712. int srt::CEPoll::release(const int eid)
  713. {
  714. ScopedLock pg(m_EPollLock);
  715. map<int, CEPollDesc>::iterator i = m_mPolls.find(eid);
  716. if (i == m_mPolls.end())
  717. throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
  718. #ifdef LINUX
  719. // release local/system epoll descriptor
  720. ::close(i->second.m_iLocalID);
  721. #elif defined(BSD) || TARGET_OS_MAC
  722. ::close(i->second.m_iLocalID);
  723. #endif
  724. m_mPolls.erase(i);
  725. return 0;
  726. }
  727. int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int events, const bool enable)
  728. {
  729. // As event flags no longer contain only event types, check now.
  730. if ((events & ~SRT_EPOLL_EVENTTYPES) != 0)
  731. {
  732. LOGC(eilog.Fatal, log << "epoll/update: IPE: 'events' parameter shall not contain special flags!");
  733. return -1; // still, ignored.
  734. }
  735. int nupdated = 0;
  736. vector<int> lost;
  737. IF_HEAVY_LOGGING(ostringstream debug);
  738. IF_HEAVY_LOGGING(debug << "epoll/update: @" << uid << " " << (enable ? "+" : "-"));
  739. IF_HEAVY_LOGGING(PrintEpollEvent(debug, events));
  740. ScopedLock pg (m_EPollLock);
  741. for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
  742. {
  743. map<int, CEPollDesc>::iterator p = m_mPolls.find(*i);
  744. if (p == m_mPolls.end())
  745. {
  746. HLOGC(eilog.Note, log << "epoll/update: E" << *i << " was deleted in the meantime");
  747. // EID invalid, though still present in the socket's subscriber list
  748. // (dangling in the socket). Postpone to fix the subscruption and continue.
  749. lost.push_back(*i);
  750. continue;
  751. }
  752. CEPollDesc& ed = p->second;
  753. // Check if this EID is subscribed for this socket.
  754. CEPollDesc::Wait* pwait = ed.watch_find(uid);
  755. if (!pwait)
  756. {
  757. // As this is mapped in the socket's data, it should be impossible.
  758. LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
  759. << (*i) << " which is NOT SUBSCRIBED to @" << uid);
  760. continue;
  761. }
  762. IF_HEAVY_LOGGING(string tracking = " TRACKING: " + ed.DisplayEpollWatch());
  763. // compute new states
  764. // New state to be set into the permanent state
  765. const int newstate = enable ? pwait->state | events // SET event bits if enable
  766. : pwait->state & (~events); // CLEAR event bits
  767. // compute states changes!
  768. int changes = pwait->state ^ newstate; // oldState XOR newState
  769. if (!changes)
  770. {
  771. HLOGC(eilog.Debug, log << debug.str() << ": E" << (*i)
  772. << tracking << " NOT updated: no changes");
  773. continue; // no changes!
  774. }
  775. // assign new state
  776. pwait->state = newstate;
  777. // filter change relating what is watching
  778. changes &= pwait->watch;
  779. if (!changes)
  780. {
  781. HLOGC(eilog.Debug, log << debug.str() << ": E" << (*i)
  782. << tracking << " NOT updated: not subscribed");
  783. continue; // no change watching
  784. }
  785. // set events changes!
  786. // This function will update the notice object associated with
  787. // the given events, that is:
  788. // - if enable, it will set event flags, possibly in a new notice object
  789. // - if !enable, it will clear event flags, possibly remove notice if resulted in 0
  790. ed.updateEventNotice(*pwait, uid, events, enable);
  791. ++nupdated;
  792. HLOGC(eilog.Debug, log << debug.str() << ": E" << (*i)
  793. << " TRACKING: " << ed.DisplayEpollWatch());
  794. }
  795. for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
  796. eids.erase(*i);
  797. return nupdated;
  798. }
  // Debug use only.
  #if ENABLE_HEAVY_LOGGING
  namespace srt
  {

  // Writes one "[X]" token per event bit set in `events`, using R (IN),
  // W (OUT), E (ERR) and U (UPDATE). A '^' before the letter marks a bit
  // that is also set in `et_events`.
  static ostream& PrintEpollEvent(ostream& os, int events, int et_events)
  {
      static const int flag_of[] = { SRT_EPOLL_IN, SRT_EPOLL_OUT, SRT_EPOLL_ERR, SRT_EPOLL_UPDATE };
      static const char* const letter_of[] = { "R", "W", "E", "U" };
      const size_t count = sizeof(flag_of) / sizeof(flag_of[0]);

      for (size_t k = 0; k < count; ++k)
      {
          const int bit = flag_of[k];
          if ((events & bit) == 0)
              continue;

          os << "[";
          if (et_events & bit)
              os << "^";
          os << letter_of[k] << "]";
      }
      return os;
  }

  // Formats a socket -> events map as a sequence of "@<sock>:<events> " entries.
  string DisplayEpollResults(const std::map<SRTSOCKET, int>& sockset)
  {
      ostringstream out;
      for (map<SRTSOCKET, int>::const_iterator it = sockset.begin(); it != sockset.end(); ++it)
      {
          out << "@" << it->first << ":";
          PrintEpollEvent(out, it->second);
          out << " ";
      }
      return out.str();
  }

  // Formats every watched SRT socket with its watch mask, marking the
  // bits that are configured as edge-triggered.
  string CEPollDesc::DisplayEpollWatch()
  {
      ostringstream out;
      for (ewatch_t::const_iterator it = m_USockWatchState.begin(); it != m_USockWatchState.end(); ++it)
      {
          const Wait& w = it->second;
          out << "@" << it->first << ":";
          PrintEpollEvent(out, w.watch, w.edge);
          out << " ";
      }
      return out.str();
  }

  } // namespace srt
  #endif