thread.cpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /*
  2. * Copyright (c) 2018-2023 SignalWire, Inc
  3. *
  4. * Permission is hereby granted, free of charge, to any person obtaining a copy
  5. * of this software and associated documentation files (the "Software"), to deal
  6. * in the Software without restriction, including without limitation the rights
  7. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. * copies of the Software, and to permit persons to whom the Software is
  9. * furnished to do so, subject to the following conditions:
  10. *
  11. * The above copyright notice and this permission notice shall be included in all
  12. * copies or substantial portions of the Software.
  13. *
  14. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. * SOFTWARE.
  21. */
  22. #include "KSTest.hpp"
  23. #include "libks/internal/ks_thread.h"
  24. #include "catch/catch.hpp"
  25. using namespace signalwire::pal;
  26. using namespace signalwire::pal::async;
  27. /**
  28. * thread_join tests the request_stop feature and how it relates to
  29. * the thread state management from stop->join->destroy flow.
  30. */
  31. TEST_CASE("thread_request_stop")
  32. {
  33. ks_thread_t *thread = nullptr;
  34. struct State {
  35. volatile ks_bool_t ran = KS_FALSE;
  36. ks_spinlock_t block_stop = {0};
  37. };
  38. State state;
  39. auto runLoop = [](ks_thread_t *thread, void *data)->void * {
  40. auto state = static_cast<State *>(data);
  41. state->ran = KS_TRUE;
  42. while (ks_thread_stop_requested(thread) == KS_FALSE) {
  43. sleep(time::seconds(1));
  44. }
  45. // Let caller control our exit here by blocking on their spin lock
  46. ks_spinlock_acquire(&state->block_stop);
  47. return nullptr;
  48. };
  49. uint32_t active_attached, active_detached;
  50. ks_thread_stats(&active_attached, &active_detached);
  51. REQUIRE(active_attached == 0);
  52. KCHECK(ks_thread_create_ex(&thread, runLoop, &state, 0, 1024 * 1024, KS_PRI_REALTIME, nullptr));
  53. // Keep the block stop locked so we can force the thread to sit in a mid transition state
  54. ks_spinlock_acquire(&state.block_stop);
  55. ks_thread_stats(&active_attached, &active_detached);
  56. REQUIRE(active_attached == 1);
  57. // First call should succeed
  58. KCHECK(ks_thread_request_stop(thread));
  59. // Second should fail indicating it was already requested
  60. REQUIRE(ks_thread_request_stop(thread) == KS_STATUS_THREAD_ALREADY_STOPPED);
  61. // Now even though we requested it stop, we still
  62. // should be allowed to block on a join
  63. auto firstJoiner = task::start("First joiner", ks_thread_join, thread);
  64. // But a second thread should error (first one is hung now)
  65. auto secondJoiner = task::start("Check block on second", ks_thread_join, thread);
  66. REQUIRE(secondJoiner->getResult() == KS_STATUS_THREAD_ALREADY_JOINED);
  67. // Great now let the thread die
  68. ks_spinlock_release(&state.block_stop);
  69. // And join on the first joiner
  70. REQUIRE(firstJoiner->getResult() == KS_STATUS_SUCCESS);
  71. // Finally should be able to delete the darn thing
  72. ks_thread_destroy(&thread);
  73. // And verify the resource numbers line up
  74. ks_thread_stats(&active_attached, &active_detached);
  75. REQUIRE(active_attached == 0);
  76. REQUIRE(state.ran == KS_TRUE);
  77. }
  78. /**
  79. * thread_destroy tests the synchronization of a single call to destroy
  80. * properly joining and deleting an attached thread.
  81. */
  82. TEST_CASE("thread_destroy")
  83. {
  84. ks_thread_t *thread = nullptr;
  85. ks_bool_t ran = KS_FALSE;
  86. auto runLoop = [](ks_thread_t *thread, void *data)->void * {
  87. *static_cast<ks_bool_t *>(data) = KS_TRUE;
  88. while (ks_thread_stop_requested(thread) == KS_FALSE) {
  89. sleep(time::seconds(1));
  90. }
  91. // Wait extra to force a block on join to ensure that is working
  92. sleep(time::seconds(3));
  93. return nullptr;
  94. };
  95. uint32_t active_attached, active_detached;
  96. ks_thread_stats(&active_attached, &active_detached);
  97. REQUIRE(active_attached == 0);
  98. // Spawn an attached thread in the global pool
  99. KCHECK(ks_thread_create_ex(&thread, runLoop, &ran, 0, 1024 * 1024, KS_PRI_REALTIME, nullptr));
  100. // Ensure tracking is correct
  101. ks_thread_stats(&active_attached, &active_detached);
  102. REQUIRE(active_attached == 1);
  103. // Destroy this thread should properly block and not crash
  104. ks_thread_destroy(&thread);
  105. // Stats should be zero
  106. ks_thread_stats(&active_attached, &active_detached);
  107. REQUIRE(active_attached == 0);
  108. // And it should've ran
  109. REQUIRE(ran == KS_TRUE);
  110. }
  111. class KSThread {
  112. public:
  113. KSThread() = default;
  114. ~KSThread()
  115. {
  116. LOG(TEST, "Destructing");
  117. stop();
  118. destroy();
  119. }
  120. void start()
  121. {
  122. auto guard = m_stateLock.lock();
  123. destroy();
  124. LOG(TEST, "Starting");
  125. auto result = ks_thread_create_ex(&m_thread, KSThread::__bootstrap, this, 0, 1024 * 1024, KS_PRI_REALTIME, nullptr);
  126. REQUIRE(result == KS_STATUS_SUCCESS);
  127. REQUIRE(m_thread != nullptr);
  128. LOG(TEST, "Started:", ks_thread_self_id());
  129. }
  130. void join()
  131. {
  132. auto guard = m_stateLock.lock();
  133. if (m_joined)
  134. return;
  135. auto result = ks_thread_join(m_thread);
  136. if (result && result != KS_STATUS_THREAD_ALREADY_JOINED) {
  137. LOG(TEST, "Unexpected ks_status:", result);
  138. }
  139. m_joined = true;
  140. if (m_exception) {
  141. std::rethrow_exception(m_exception);
  142. }
  143. }
  144. void stop()
  145. {
  146. auto guard = m_stateLock.lock();
  147. auto result = ks_thread_request_stop(m_thread);
  148. if (result && result != KS_STATUS_THREAD_ALREADY_STOPPED) {
  149. PAL_THROW(RuntimeError, "Expected success result from ks_thread_request_stop, got:", result);
  150. }
  151. guard.release();
  152. join();
  153. }
  154. bool stopRequested() const {
  155. auto guard = m_stateLock.lock();
  156. return static_cast<bool>(ks_thread_stop_requested(m_thread));
  157. }
  158. protected:
  159. void destroy() {
  160. if (m_thread) {
  161. ks_thread_destroy(&m_thread);
  162. }
  163. }
  164. static void *__bootstrap(ks_thread_t *thread, void *data) {
  165. while (!ks_thread_stop_requested(thread)) {
  166. char buffer[256 * 1024];
  167. for (char num = 0; num < 255 && !ks_thread_stop_requested(thread); num++) {
  168. memset(buffer, num, sizeof(buffer));
  169. }
  170. }
  171. return nullptr;
  172. }
  173. ks_thread_t *m_thread = nullptr;
  174. std::exception_ptr m_exception;
  175. mutable SpinLock m_stateLock;
  176. bool m_joined = false;
  177. };
  178. TEST_CASE("thread_stress")
  179. {
  180. return;
  181. const auto THREAD_COUNT = 5ul;
  182. const auto THREAD_LOOPS = 10ul;
  183. const auto THREAD_ITERATIONS = 10ul;
  184. auto threadBatch = [&](uint32_t iter_count, uint32_t thr_count) {
  185. for (uint32_t iter = 0; iter < iter_count; iter++) {
  186. LOG(TEST, "Iteration:", iter);
  187. std::vector<std::shared_ptr<KSThread>> threads;
  188. for (unsigned thr = 0; thr < thr_count; thr++) {
  189. auto thread = std::make_shared<KSThread>();
  190. thread->start();
  191. threads.push_back(std::move(thread));
  192. }
  193. sleep(time::seconds(1));
  194. for (auto &thread : threads)
  195. thread->stop();
  196. }
  197. };
  198. std::vector<task::TaskPtr> loopers;
  199. for (auto loop_count = 0; loop_count < THREAD_LOOPS; loop_count++) {
  200. loopers.push_back(
  201. task::start(
  202. "Thread starter/stopper",
  203. threadBatch, // Closure to execute
  204. THREAD_ITERATIONS,
  205. THREAD_COUNT
  206. )
  207. );
  208. }
  209. for (auto &looper : loopers) {
  210. uint32_t active_attached, active_detached;
  211. // Check our counts should not be greater then the sum of all loopers with all threads
  212. ks_thread_stats(&active_attached, &active_detached);
  213. REQUIRE(active_detached == 0);
  214. REQUIRE(active_attached <= (THREAD_COUNT * THREAD_LOOPS));
  215. looper->join();
  216. }
  217. }