apr_queue.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /* Copyright 2000-2005 The Apache Software Foundation or its licensors, as
  2. * applicable.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "apr.h"
  17. #if APR_HAVE_STDIO_H
  18. #include <stdio.h>
  19. #endif
  20. #if APR_HAVE_STDLIB_H
  21. #include <stdlib.h>
  22. #endif
  23. #if APR_HAVE_UNISTD_H
  24. #include <unistd.h>
  25. #endif
  26. #include "apu.h"
  27. #include "apr_portable.h"
  28. #include "apr_thread_mutex.h"
  29. #include "apr_thread_cond.h"
  30. #include "apr_errno.h"
  31. #include "apr_queue.h"
  32. #if APR_HAS_THREADS
  33. /*
  34. * define this to get debug messages
  35. *
  36. #define QUEUE_DEBUG
  37. */
  /**
   * Bounded FIFO of void pointers, kept in a circular array. The push/pop
   * routines in this file lock one_big_mutex around every update of the
   * fields below.
   */
  struct apr_queue_t {
      void **data;                 /**< slot array; holds `bounds` pointers */
      unsigned int nelts;          /**< number of elements currently stored */
      unsigned int in;             /**< index of the next empty slot (write position) */
      unsigned int out;            /**< index of the next filled slot (read position) */
      unsigned int bounds;         /**< capacity: maximum number of elements */
      unsigned int full_waiters;   /**< count of pushers currently waiting on not_full */
      unsigned int empty_waiters;  /**< count of poppers currently waiting on not_empty */
      apr_thread_mutex_t *one_big_mutex; /**< mutex locked around queue updates */
      apr_thread_cond_t *not_empty; /**< waited on by poppers, signalled by pushers */
      apr_thread_cond_t *not_full;  /**< waited on by pushers, signalled by poppers */
      int terminated;              /**< set to 1 by apr_queue_term(); push/pop then return APR_EOF */
  };
  #ifdef QUEUE_DEBUG
  /**
   * Debug trace helper: writes the calling thread's id, the element count
   * and the in/out indices of `q`, followed by `msg`, to stderr.
   * Only compiled when QUEUE_DEBUG is defined; otherwise Q_DBG() expands
   * to nothing.
   *
   * @param msg text appended to the trace line (not modified)
   * @param q   queue whose counters are printed
   */
  static void Q_DBG(const char *msg, apr_queue_t *q) {
      /* nelts/in/out are unsigned int, so they are printed with %u. The
       * thread id is cast so the argument type really matches %ld.
       * NOTE(review): assumes apr_os_thread_t is a scalar type that can be
       * cast to long -- confirm for each supported platform. */
      fprintf(stderr, "%ld\t#%u in %u out %u\t%s\n",
              (long)apr_os_thread_current(),
              q->nelts, q->in, q->out,
              msg);
  }
  #else
  #define Q_DBG(x,y)
  #endif
  /**
   * Evaluates to non-zero when the queue already holds `bounds` elements.
   * This macro takes no lock itself: use it only while holding the queue
   * mutex, since it is not threadsafe on its own.
   */
  #define apr_queue_full(queue) ((queue)->bounds == (queue)->nelts)
  /**
   * Evaluates to non-zero when the queue holds no elements.
   * This macro takes no lock itself: use it only while holding the queue
   * mutex, since it is not threadsafe on its own.
   */
  #define apr_queue_empty(queue) (0 == (queue)->nelts)
  72. /**
  73. * Callback routine that is called to destroy this
  74. * apr_queue_t when its pool is destroyed.
  75. */
  76. static apr_status_t queue_destroy(void *data)
  77. {
  78. apr_queue_t *queue = data;
  79. /* Ignore errors here, we can't do anything about them anyway. */
  80. apr_thread_cond_destroy(queue->not_empty);
  81. apr_thread_cond_destroy(queue->not_full);
  82. apr_thread_mutex_destroy(queue->one_big_mutex);
  83. return APR_SUCCESS;
  84. }
  85. /**
  86. * Initialize the apr_queue_t.
  87. */
  88. APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
  89. unsigned int queue_capacity,
  90. apr_pool_t *a)
  91. {
  92. apr_status_t rv;
  93. apr_queue_t *queue;
  94. queue = apr_palloc(a, sizeof(apr_queue_t));
  95. *q = queue;
  96. /* nested doesn't work ;( */
  97. rv = apr_thread_mutex_create(&queue->one_big_mutex,
  98. APR_THREAD_MUTEX_UNNESTED,
  99. a);
  100. if (rv != APR_SUCCESS) {
  101. return rv;
  102. }
  103. rv = apr_thread_cond_create(&queue->not_empty, a);
  104. if (rv != APR_SUCCESS) {
  105. return rv;
  106. }
  107. rv = apr_thread_cond_create(&queue->not_full, a);
  108. if (rv != APR_SUCCESS) {
  109. return rv;
  110. }
  111. /* Set all the data in the queue to NULL */
  112. queue->data = apr_palloc(a, queue_capacity * sizeof(void*));
  113. if (!queue->data) return APR_ENOMEM;
  114. memset(queue->data, 0, queue_capacity * sizeof(void*));
  115. queue->bounds = queue_capacity;
  116. queue->nelts = 0;
  117. queue->in = 0;
  118. queue->out = 0;
  119. queue->terminated = 0;
  120. queue->full_waiters = 0;
  121. queue->empty_waiters = 0;
  122. apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
  123. return APR_SUCCESS;
  124. }
  125. /**
  126. * Push new data onto the queue. Blocks if the queue is full. Once
  127. * the push operation has completed, it signals other threads waiting
  128. * in apr_queue_pop() that they may continue consuming sockets.
  129. */
  130. APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
  131. {
  132. apr_status_t rv;
  133. if (queue->terminated) {
  134. return APR_EOF; /* no more elements ever again */
  135. }
  136. rv = apr_thread_mutex_lock(queue->one_big_mutex);
  137. if (rv != APR_SUCCESS) {
  138. return rv;
  139. }
  140. if (apr_queue_full(queue)) {
  141. if (!queue->terminated) {
  142. queue->full_waiters++;
  143. rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
  144. queue->full_waiters--;
  145. if (rv != APR_SUCCESS) {
  146. apr_thread_mutex_unlock(queue->one_big_mutex);
  147. return rv;
  148. }
  149. }
  150. /* If we wake up and it's still empty, then we were interrupted */
  151. if (apr_queue_full(queue)) {
  152. Q_DBG("queue full (intr)", queue);
  153. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  154. if (rv != APR_SUCCESS) {
  155. return rv;
  156. }
  157. if (queue->terminated) {
  158. return APR_EOF; /* no more elements ever again */
  159. }
  160. else {
  161. return APR_EINTR;
  162. }
  163. }
  164. }
  165. queue->data[queue->in] = data;
  166. queue->in = (queue->in + 1) % queue->bounds;
  167. queue->nelts++;
  168. if (queue->empty_waiters) {
  169. Q_DBG("sig !empty", queue);
  170. rv = apr_thread_cond_signal(queue->not_empty);
  171. if (rv != APR_SUCCESS) {
  172. apr_thread_mutex_unlock(queue->one_big_mutex);
  173. return rv;
  174. }
  175. }
  176. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  177. return rv;
  178. }
  179. /**
  180. * Push new data onto the queue. Blocks if the queue is full. Once
  181. * the push operation has completed, it signals other threads waiting
  182. * in apr_queue_pop() that they may continue consuming sockets.
  183. */
  184. APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
  185. {
  186. apr_status_t rv;
  187. if (queue->terminated) {
  188. return APR_EOF; /* no more elements ever again */
  189. }
  190. rv = apr_thread_mutex_lock(queue->one_big_mutex);
  191. if (rv != APR_SUCCESS) {
  192. return rv;
  193. }
  194. if (apr_queue_full(queue)) {
  195. apr_thread_mutex_unlock(queue->one_big_mutex);
  196. return APR_EAGAIN;
  197. }
  198. queue->data[queue->in] = data;
  199. queue->in = (queue->in + 1) % queue->bounds;
  200. queue->nelts++;
  201. if (queue->empty_waiters) {
  202. Q_DBG("sig !empty", queue);
  203. rv = apr_thread_cond_signal(queue->not_empty);
  204. if (rv != APR_SUCCESS) {
  205. apr_thread_mutex_unlock(queue->one_big_mutex);
  206. return rv;
  207. }
  208. }
  209. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  210. return rv;
  211. }
  212. /**
  213. * not thread safe
  214. */
  215. APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
  216. return queue->nelts;
  217. }
  218. /**
  219. * Retrieves the next item from the queue. If there are no
  220. * items available, it will block until one becomes available.
  221. * Once retrieved, the item is placed into the address specified by
  222. * 'data'.
  223. */
  224. APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
  225. {
  226. apr_status_t rv;
  227. if (queue->terminated) {
  228. return APR_EOF; /* no more elements ever again */
  229. }
  230. rv = apr_thread_mutex_lock(queue->one_big_mutex);
  231. if (rv != APR_SUCCESS) {
  232. return rv;
  233. }
  234. /* Keep waiting until we wake up and find that the queue is not empty. */
  235. if (apr_queue_empty(queue)) {
  236. if (!queue->terminated) {
  237. queue->empty_waiters++;
  238. rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
  239. queue->empty_waiters--;
  240. if (rv != APR_SUCCESS) {
  241. apr_thread_mutex_unlock(queue->one_big_mutex);
  242. return rv;
  243. }
  244. }
  245. /* If we wake up and it's still empty, then we were interrupted */
  246. if (apr_queue_empty(queue)) {
  247. Q_DBG("queue empty (intr)", queue);
  248. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  249. if (rv != APR_SUCCESS) {
  250. return rv;
  251. }
  252. if (queue->terminated) {
  253. return APR_EOF; /* no more elements ever again */
  254. }
  255. else {
  256. return APR_EINTR;
  257. }
  258. }
  259. }
  260. *data = queue->data[queue->out];
  261. queue->nelts--;
  262. queue->out = (queue->out + 1) % queue->bounds;
  263. if (queue->full_waiters) {
  264. Q_DBG("signal !full", queue);
  265. rv = apr_thread_cond_signal(queue->not_full);
  266. if (rv != APR_SUCCESS) {
  267. apr_thread_mutex_unlock(queue->one_big_mutex);
  268. return rv;
  269. }
  270. }
  271. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  272. return rv;
  273. }
  274. /**
  275. * Retrieves the next item from the queue. If there are no
  276. * items available, it will block until one becomes available, or
  277. * until timeout is elapsed. Once retrieved, the item is placed into
  278. * the address specified by'data'.
  279. */
  280. APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout)
  281. {
  282. apr_status_t rv;
  283. if (queue->terminated) {
  284. return APR_EOF; /* no more elements ever again */
  285. }
  286. rv = apr_thread_mutex_lock(queue->one_big_mutex);
  287. if (rv != APR_SUCCESS) {
  288. return rv;
  289. }
  290. /* Keep waiting until we wake up and find that the queue is not empty. */
  291. if (apr_queue_empty(queue)) {
  292. if (!queue->terminated) {
  293. queue->empty_waiters++;
  294. rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout);
  295. queue->empty_waiters--;
  296. /* In the event of a timemout, APR_TIMEUP will be returned */
  297. if (rv != APR_SUCCESS) {
  298. apr_thread_mutex_unlock(queue->one_big_mutex);
  299. return rv;
  300. }
  301. }
  302. /* If we wake up and it's still empty, then we were interrupted */
  303. if (apr_queue_empty(queue)) {
  304. Q_DBG("queue empty (intr)", queue);
  305. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  306. if (rv != APR_SUCCESS) {
  307. return rv;
  308. }
  309. if (queue->terminated) {
  310. return APR_EOF; /* no more elements ever again */
  311. }
  312. else {
  313. return APR_EINTR;
  314. }
  315. }
  316. }
  317. *data = queue->data[queue->out];
  318. queue->nelts--;
  319. queue->out = (queue->out + 1) % queue->bounds;
  320. if (queue->full_waiters) {
  321. Q_DBG("signal !full", queue);
  322. rv = apr_thread_cond_signal(queue->not_full);
  323. if (rv != APR_SUCCESS) {
  324. apr_thread_mutex_unlock(queue->one_big_mutex);
  325. return rv;
  326. }
  327. }
  328. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  329. return rv;
  330. }
  331. /**
  332. * Retrieves the next item from the queue. If there are no
  333. * items available, return APR_EAGAIN. Once retrieved,
  334. * the item is placed into the address specified by 'data'.
  335. */
  336. APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
  337. {
  338. apr_status_t rv;
  339. if (queue->terminated) {
  340. return APR_EOF; /* no more elements ever again */
  341. }
  342. rv = apr_thread_mutex_lock(queue->one_big_mutex);
  343. if (rv != APR_SUCCESS) {
  344. return rv;
  345. }
  346. if (apr_queue_empty(queue)) {
  347. apr_thread_mutex_unlock(queue->one_big_mutex);
  348. return APR_EAGAIN;
  349. }
  350. *data = queue->data[queue->out];
  351. queue->nelts--;
  352. queue->out = (queue->out + 1) % queue->bounds;
  353. if (queue->full_waiters) {
  354. Q_DBG("signal !full", queue);
  355. rv = apr_thread_cond_signal(queue->not_full);
  356. if (rv != APR_SUCCESS) {
  357. apr_thread_mutex_unlock(queue->one_big_mutex);
  358. return rv;
  359. }
  360. }
  361. rv = apr_thread_mutex_unlock(queue->one_big_mutex);
  362. return rv;
  363. }
  364. APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
  365. {
  366. apr_status_t rv;
  367. Q_DBG("intr all", queue);
  368. if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
  369. return rv;
  370. }
  371. apr_thread_cond_broadcast(queue->not_empty);
  372. apr_thread_cond_broadcast(queue->not_full);
  373. if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
  374. return rv;
  375. }
  376. return APR_SUCCESS;
  377. }
  378. APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
  379. {
  380. apr_status_t rv;
  381. if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
  382. return rv;
  383. }
  384. /* we must hold one_big_mutex when setting this... otherwise,
  385. * we could end up setting it and waking everybody up just after a
  386. * would-be popper checks it but right before they block
  387. */
  388. queue->terminated = 1;
  389. if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
  390. return rv;
  391. }
  392. return apr_queue_interrupt_all(queue);
  393. }
  394. #endif /* APR_HAS_THREADS */