sync_posix.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. /*
  2. * SRT - Secure, Reliable, Transport
  3. * Copyright (c) 2019 Haivision Systems Inc.
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  8. *
  9. */
  10. #include "platform_sys.h"
  11. #include <iomanip>
  12. #include <math.h>
  13. #include <stdexcept>
  14. #include "sync.h"
  15. #include "utilities.h"
  16. #include "udt.h"
  17. #include "srt.h"
  18. #include "srt_compat.h"
  19. #include "logging.h"
  20. #include "common.h"
  21. #if defined(_WIN32)
  22. #include "win/wintime.h"
  23. #include <sys/timeb.h>
  24. #elif TARGET_OS_MAC
  25. #include <mach/mach_time.h>
  26. #endif
  27. namespace srt_logging
  28. {
  29. extern Logger inlog;
  30. }
  31. using namespace srt_logging;
  32. namespace srt
  33. {
  34. namespace sync
  35. {
// Reads the current value of the clock source selected at build time through
// SRT_SYNC_CLOCK and stores it in `x`.
//
// The unit of the stored value depends on the selected source:
//  - IA32/AMD64 RDTSC, IA64 ITC, Windows QPC, Mach absolute time: raw counter
//    ticks as delivered by the instruction / system call.
//  - CLOCK_MONOTONIC and gettimeofday(): microseconds.
//
// None of the system calls used below has its return value checked.
static void rdtsc(uint64_t& x)
{
#if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_IA32_RDTSC
    // RDTSC delivers the low half in EAX and the high half in EDX;
    // both halves are merged into one 64-bit value.
    uint32_t lval, hval;
    // asm volatile ("push %eax; push %ebx; push %ecx; push %edx");
    // asm volatile ("xor %eax, %eax; cpuid");
    asm volatile("rdtsc" : "=a"(lval), "=d"(hval));
    // asm volatile ("pop %edx; pop %ecx; pop %ebx; pop %eax");
    x = hval;
    x = (x << 32) | lval;
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_IA64_ITC
    // Read the ar.itc application register directly into x.
    asm("mov %0=ar.itc" : "=r"(x)::"memory");
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_AMD64_RDTSC
    // Same EAX/EDX merge as in the IA32 branch.
    uint32_t lval, hval;
    asm volatile("rdtsc" : "=a"(lval), "=d"(hval));
    x = hval;
    x = (x << 32) | lval;
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_WINQPC
    // This function should not fail, because we checked the QPC
    // when calling to QueryPerformanceFrequency. If it failed,
    // the m_bUseMicroSecond was set to true.
    // NOTE(review): m_bUseMicroSecond is not referenced anywhere in the
    // visible part of this file - the remark above may be stale.
    QueryPerformanceCounter((LARGE_INTEGER*)&x);
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_MACH_ABSTIME
    x = mach_absolute_time();
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
    // get_cpu_frequency() returns 1 us accuracy in this case
    // Seconds and nanoseconds are folded into a single microsecond count.
    timespec tm;
    clock_gettime(CLOCK_MONOTONIC, &tm);
    x = tm.tv_sec * uint64_t(1000000) + (tm.tv_nsec / 1000);
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_POSIX_GETTIMEOFDAY
    // use system call to read time clock for other archs
    // The result is a microsecond count built from tv_sec and tv_usec.
    timeval t;
    gettimeofday(&t, 0);
    x = t.tv_sec * uint64_t(1000000) + t.tv_usec;
#else
#error Wrong SRT_SYNC_CLOCK
#endif
}
  74. static int64_t get_cpu_frequency()
  75. {
  76. int64_t frequency = 1; // 1 tick per microsecond.
  77. #if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_WINQPC
  78. LARGE_INTEGER ccf; // in counts per second
  79. if (QueryPerformanceFrequency(&ccf))
  80. {
  81. frequency = ccf.QuadPart / 1000000; // counts per microsecond
  82. if (frequency == 0)
  83. {
  84. LOGC(inlog.Warn, log << "Win QPC frequency of " << ccf.QuadPart
  85. << " counts/s is below the required 1 us accuracy. Please consider using C++11 timing (-DENABLE_STDCXX_SYNC=ON) instead.");
  86. frequency = 1; // set back to 1 to avoid division by zero.
  87. }
  88. }
  89. else
  90. {
  91. // Can't throw an exception, it won't be handled.
  92. LOGC(inlog.Error, log << "IPE: QueryPerformanceFrequency failed with " << GetLastError());
  93. }
  94. #elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_MACH_ABSTIME
  95. mach_timebase_info_data_t info;
  96. mach_timebase_info(&info);
  97. frequency = info.denom * int64_t(1000) / info.numer;
  98. #elif SRT_SYNC_CLOCK >= SRT_SYNC_CLOCK_AMD64_RDTSC && SRT_SYNC_CLOCK <= SRT_SYNC_CLOCK_IA64_ITC
  99. // SRT_SYNC_CLOCK_AMD64_RDTSC or SRT_SYNC_CLOCK_IA32_RDTSC or SRT_SYNC_CLOCK_IA64_ITC
  100. uint64_t t1, t2;
  101. rdtsc(t1);
  102. timespec ts;
  103. ts.tv_sec = 0;
  104. ts.tv_nsec = 100000000;
  105. nanosleep(&ts, NULL);
  106. rdtsc(t2);
  107. // CPU clocks per microsecond
  108. frequency = int64_t(t2 - t1) / 100000;
  109. #endif
  110. return frequency;
  111. }
  112. static int count_subsecond_precision(int64_t ticks_per_us)
  113. {
  114. int signs = 6; // starting from 1 us
  115. while (ticks_per_us /= 10) ++signs;
  116. return signs;
  117. }
// Ticks of the rdtsc() clock per microsecond; obtained once from
// get_cpu_frequency() when this object is initialized.
const int64_t s_clock_ticks_per_us = get_cpu_frequency();
// Number of sub-second decimal digits derived from the tick rate above.
const int s_clock_subsecond_precision = count_subsecond_precision(s_clock_ticks_per_us);
// Accessor returning the precomputed sub-second digit count.
int clockSubsecondPrecision() { return s_clock_subsecond_precision; }
  121. } // namespace sync
  122. } // namespace srt
  123. ////////////////////////////////////////////////////////////////////////////////
  124. //
  125. // Sync utilities section
  126. //
  127. ////////////////////////////////////////////////////////////////////////////////
  128. static timespec us_to_timespec(const uint64_t time_us)
  129. {
  130. timespec timeout;
  131. timeout.tv_sec = time_us / 1000000;
  132. timeout.tv_nsec = (time_us % 1000000) * 1000;
  133. return timeout;
  134. }
  135. ////////////////////////////////////////////////////////////////////////////////
  136. //
  137. // TimePoint section
  138. //
  139. ////////////////////////////////////////////////////////////////////////////////
  140. template <>
  141. srt::sync::Duration<srt::sync::steady_clock> srt::sync::TimePoint<srt::sync::steady_clock>::time_since_epoch() const
  142. {
  143. return srt::sync::Duration<srt::sync::steady_clock>(m_timestamp);
  144. }
  145. srt::sync::TimePoint<srt::sync::steady_clock> srt::sync::steady_clock::now()
  146. {
  147. uint64_t x = 0;
  148. rdtsc(x);
  149. return TimePoint<steady_clock>(x);
  150. }
  151. int64_t srt::sync::count_microseconds(const steady_clock::duration& t)
  152. {
  153. return t.count() / s_clock_ticks_per_us;
  154. }
  155. int64_t srt::sync::count_milliseconds(const steady_clock::duration& t)
  156. {
  157. return t.count() / s_clock_ticks_per_us / 1000;
  158. }
  159. int64_t srt::sync::count_seconds(const steady_clock::duration& t)
  160. {
  161. return t.count() / s_clock_ticks_per_us / 1000000;
  162. }
  163. srt::sync::steady_clock::duration srt::sync::microseconds_from(int64_t t_us)
  164. {
  165. return steady_clock::duration(t_us * s_clock_ticks_per_us);
  166. }
  167. srt::sync::steady_clock::duration srt::sync::milliseconds_from(int64_t t_ms)
  168. {
  169. return steady_clock::duration((1000 * t_ms) * s_clock_ticks_per_us);
  170. }
  171. srt::sync::steady_clock::duration srt::sync::seconds_from(int64_t t_s)
  172. {
  173. return steady_clock::duration((1000000 * t_s) * s_clock_ticks_per_us);
  174. }
  175. srt::sync::Mutex::Mutex()
  176. {
  177. const int err = pthread_mutex_init(&m_mutex, 0);
  178. if (err)
  179. {
  180. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
  181. }
  182. }
  183. srt::sync::Mutex::~Mutex()
  184. {
  185. pthread_mutex_destroy(&m_mutex);
  186. }
  187. int srt::sync::Mutex::lock()
  188. {
  189. return pthread_mutex_lock(&m_mutex);
  190. }
  191. int srt::sync::Mutex::unlock()
  192. {
  193. return pthread_mutex_unlock(&m_mutex);
  194. }
  195. bool srt::sync::Mutex::try_lock()
  196. {
  197. return (pthread_mutex_trylock(&m_mutex) == 0);
  198. }
  199. srt::sync::ScopedLock::ScopedLock(Mutex& m)
  200. : m_mutex(m)
  201. {
  202. m_mutex.lock();
  203. }
  204. srt::sync::ScopedLock::~ScopedLock()
  205. {
  206. m_mutex.unlock();
  207. }
  208. srt::sync::UniqueLock::UniqueLock(Mutex& m)
  209. : m_Mutex(m)
  210. {
  211. m_iLocked = m_Mutex.lock();
  212. }
  213. srt::sync::UniqueLock::~UniqueLock()
  214. {
  215. if (m_iLocked == 0)
  216. {
  217. unlock();
  218. }
  219. }
  220. void srt::sync::UniqueLock::lock()
  221. {
  222. if (m_iLocked != -1)
  223. throw CThreadException(MJ_SYSTEMRES, MN_THREAD, 0);
  224. m_iLocked = m_Mutex.lock();
  225. }
  226. void srt::sync::UniqueLock::unlock()
  227. {
  228. if (m_iLocked != 0)
  229. throw CThreadException(MJ_SYSTEMRES, MN_THREAD, 0);
  230. m_Mutex.unlock();
  231. m_iLocked = -1;
  232. }
  233. srt::sync::Mutex* srt::sync::UniqueLock::mutex()
  234. {
  235. return &m_Mutex;
  236. }
  237. ////////////////////////////////////////////////////////////////////////////////
  238. //
  239. // Condition section (based on pthreads)
  240. //
  241. ////////////////////////////////////////////////////////////////////////////////
  242. namespace srt
  243. {
  244. namespace sync
  245. {
// Constructs the condition object without performing the pthread
// initialization; that is done separately in init().
// On _WIN32 builds m_cv is initialized from PTHREAD_COND_INITIALIZER,
// on other builds no member initializer is given here.
// NOTE(review): the reason for the Windows-only initializer is not visible
// in this file - presumably required by the pthread port used there.
Condition::Condition()
#ifdef _WIN32
    : m_cv(PTHREAD_COND_INITIALIZER)
#endif
{}
  251. Condition::~Condition() {}
  252. void Condition::init()
  253. {
  254. pthread_condattr_t* attr = NULL;
  255. #if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
  256. pthread_condattr_t CondAttribs;
  257. pthread_condattr_init(&CondAttribs);
  258. pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC);
  259. attr = &CondAttribs;
  260. #endif
  261. const int res = pthread_cond_init(&m_cv, attr);
  262. if (res != 0)
  263. throw std::runtime_error("pthread_cond_init monotonic failed");
  264. }
  265. void Condition::destroy()
  266. {
  267. pthread_cond_destroy(&m_cv);
  268. }
  269. void Condition::wait(UniqueLock& lock)
  270. {
  271. pthread_cond_wait(&m_cv, &lock.mutex()->ref());
  272. }
  273. bool Condition::wait_for(UniqueLock& lock, const steady_clock::duration& rel_time)
  274. {
  275. timespec timeout;
  276. #if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
  277. clock_gettime(CLOCK_MONOTONIC, &timeout);
  278. const uint64_t now_us = timeout.tv_sec * uint64_t(1000000) + (timeout.tv_nsec / 1000);
  279. #else
  280. timeval now;
  281. gettimeofday(&now, 0);
  282. const uint64_t now_us = now.tv_sec * uint64_t(1000000) + now.tv_usec;
  283. #endif
  284. timeout = us_to_timespec(now_us + count_microseconds(rel_time));
  285. return pthread_cond_timedwait(&m_cv, &lock.mutex()->ref(), &timeout) != ETIMEDOUT;
  286. }
  287. bool Condition::wait_until(UniqueLock& lock, const steady_clock::time_point& timeout_time)
  288. {
  289. // This will work regardless as to which clock is in use. The time
  290. // should be specified as steady_clock::time_point, so there's no
  291. // question of the timer base.
  292. const steady_clock::time_point now = steady_clock::now();
  293. if (now >= timeout_time)
  294. return false; // timeout
  295. // wait_for() is used because it will be converted to pthread-frienly timeout_time inside.
  296. return wait_for(lock, timeout_time - now);
  297. }
  298. void Condition::notify_one()
  299. {
  300. pthread_cond_signal(&m_cv);
  301. }
  302. void Condition::notify_all()
  303. {
  304. pthread_cond_broadcast(&m_cv);
  305. }
  306. }; // namespace sync
  307. }; // namespace srt
  308. ////////////////////////////////////////////////////////////////////////////////
  309. //
  310. // CThread class
  311. //
  312. ////////////////////////////////////////////////////////////////////////////////
  313. srt::sync::CThread::CThread()
  314. {
  315. m_thread = pthread_t();
  316. }
  317. srt::sync::CThread::CThread(void *(*start_routine) (void *), void *arg)
  318. {
  319. create(start_routine, arg);
  320. }
// Move-style assignment: takes over the thread handle of `other` and resets
// `other` to the "no thread" state. With HAVE_FULL_CXX11 the argument is an
// rvalue reference, otherwise a non-const lvalue reference.
//
// If this object still refers to a joinable thread, that is logged as an
// internal error and handled depending on the build:
//  - DEBUG builds: the old thread is joined.
//  - non-DEBUG builds, not Android: the old thread is cancelled.
//  - non-DEBUG builds on Android: nothing is done with the old thread.
#if HAVE_FULL_CXX11
srt::sync::CThread& srt::sync::CThread::operator=(CThread&& other)
#else
srt::sync::CThread& srt::sync::CThread::operator=(CThread& other)
#endif
{
    if (joinable())
    {
        // If the thread has already terminated, then
        // pthread_join() returns immediately.
        // But we have to check it has terminated before replacing it.
        LOGC(inlog.Error, log << "IPE: Assigning to a thread that is not terminated!");
#ifndef DEBUG
#ifndef __ANDROID__
        // In case of production build the hanging thread should be terminated
        // to avoid hang ups and align with C++11 implementation.
        // There is no pthread_cancel on Android. See #1476. This error should not normally
        // happen, but if it happen, then detaching the thread.
        // NOTE(review): no pthread_detach() call is visible in this function;
        // the cancelled thread is neither joined nor detached here - confirm
        // whether that is intended.
        pthread_cancel(m_thread);
#endif // __ANDROID__
#else
        join();
#endif
    }
    // Move thread handler from other
    m_thread = other.m_thread;
    other.m_thread = pthread_t();
    return *this;
}
  350. #if !HAVE_FULL_CXX11
  351. void srt::sync::CThread::create_thread(void *(*start_routine) (void *), void *arg)
  352. {
  353. SRT_ASSERT(!joinable());
  354. create(start_routine, arg);
  355. }
  356. #endif
  357. bool srt::sync::CThread::joinable() const
  358. {
  359. return !pthread_equal(m_thread, pthread_t());
  360. }
  361. void srt::sync::CThread::join()
  362. {
  363. void *retval;
  364. const int ret SRT_ATR_UNUSED = pthread_join(m_thread, &retval);
  365. if (ret != 0)
  366. {
  367. LOGC(inlog.Error, log << "pthread_join failed with " << ret);
  368. }
  369. #ifdef HEAVY_LOGGING
  370. else
  371. {
  372. HLOGC(inlog.Debug, log << "pthread_join SUCCEEDED");
  373. }
  374. #endif
  375. // After joining, joinable should be false
  376. m_thread = pthread_t();
  377. return;
  378. }
  379. void srt::sync::CThread::create(void *(*start_routine) (void *), void *arg)
  380. {
  381. const int st = pthread_create(&m_thread, NULL, start_routine, arg);
  382. if (st != 0)
  383. {
  384. LOGC(inlog.Error, log << "pthread_create failed with " << st);
  385. throw CThreadException(MJ_SYSTEMRES, MN_THREAD, 0);
  386. }
  387. }
  388. ////////////////////////////////////////////////////////////////////////////////
  389. //
  390. // CThreadError class - thread local storage error wrapper
  391. //
  392. ////////////////////////////////////////////////////////////////////////////////
  393. namespace srt {
  394. namespace sync {
  395. class CThreadError
  396. {
  397. public:
  398. CThreadError()
  399. {
  400. pthread_key_create(&m_ThreadSpecKey, ThreadSpecKeyDestroy);
  401. // This is a global object and as such it should be called in the
  402. // main application thread or at worst in the thread that has first
  403. // run `srt_startup()` function and so requested the SRT library to
  404. // be dynamically linked. Most probably in this very thread the API
  405. // errors will be reported, so preallocate the ThreadLocalSpecific
  406. // object for this error description.
  407. // This allows std::bac_alloc to crash the program during
  408. // the initialization of the SRT library (likely it would be
  409. // during the DL constructor, still way before any chance of
  410. // doing any operations here). This will prevent SRT from running
  411. // into trouble while trying to operate.
  412. CUDTException* ne = new CUDTException();
  413. pthread_setspecific(m_ThreadSpecKey, ne);
  414. }
  415. ~CThreadError()
  416. {
  417. // Likely all objects should be deleted in all
  418. // threads that have exited, but std::this_thread didn't exit
  419. // yet :).
  420. ThreadSpecKeyDestroy(pthread_getspecific(m_ThreadSpecKey));
  421. pthread_key_delete(m_ThreadSpecKey);
  422. }
  423. void set(const CUDTException& e)
  424. {
  425. CUDTException* cur = get();
  426. // If this returns NULL, it means that there was an unexpected
  427. // memory allocation error. Simply ignore this request if so
  428. // happened, and then when trying to get the error description
  429. // the application will always get the memory allocation error.
  430. // There's no point in doing anything else here; lack of memory
  431. // must be prepared for prematurely, and that was already done.
  432. if (!cur)
  433. return;
  434. *cur = e;
  435. }
  436. /*[[nullable]]*/ CUDTException* get()
  437. {
  438. if (!pthread_getspecific(m_ThreadSpecKey))
  439. {
  440. // This time if this can't be done due to memory allocation
  441. // problems, just allow this value to be NULL, which during
  442. // getting the error description will redirect to a memory
  443. // allocation error.
  444. // It would be nice to somehow ensure that this object is
  445. // created in every thread of the application using SRT, but
  446. // POSIX thread API doesn't contain any possibility to have
  447. // a creation callback that would apply to every thread in
  448. // the application (as it is for C++11 thread_local storage).
  449. CUDTException* ne = new(std::nothrow) CUDTException();
  450. pthread_setspecific(m_ThreadSpecKey, ne);
  451. return ne;
  452. }
  453. return (CUDTException*)pthread_getspecific(m_ThreadSpecKey);
  454. }
  455. static void ThreadSpecKeyDestroy(void* e)
  456. {
  457. delete (CUDTException*)e;
  458. }
  459. private:
  460. pthread_key_t m_ThreadSpecKey;
  461. };
// Thread local error will be used by CUDTUnited
// that has a static scope

// This static makes this object file-private access so that
// the access is granted only for the accessor functions.
static CThreadError s_thErr;
  467. void SetThreadLocalError(const CUDTException& e)
  468. {
  469. s_thErr.set(e);
  470. }
  471. CUDTException& GetThreadLocalError()
  472. {
  473. // In POSIX version we take into account the possibility
  474. // of having an allocation error here. Therefore we need to
  475. // allow thie value to return NULL and have some fallback
  476. // for that case. The dynamic memory allocation failure should
  477. // be the only case as to why it is unable to get the pointer
  478. // to the error description.
  479. static CUDTException resident_alloc_error (MJ_SYSTEMRES, MN_MEMORY);
  480. CUDTException* curx = s_thErr.get();
  481. if (!curx)
  482. return resident_alloc_error;
  483. return *curx;
  484. }
  485. } // namespace sync
  486. } // namespace srt