2
0

testqueue.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. /* Copyright 2000-2005 The Apache Software Foundation or its licensors, as
  2. * applicable.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  #include <apr_thread_proc.h>
  #include <apr_errno.h>
  #include <apr_general.h>
  #include <apr_getopt.h>
  #include <apr_strings.h>
  #include <apr_time.h>
  #include <apr_portable.h>
  #include "apr_queue.h"
  #include "errno.h"
  #include <limits.h>
  #include <stdio.h>
  #include <stdlib.h>
  #if APR_HAVE_UNISTD_H
  #include <unistd.h>
  #endif
  30. #if !APR_HAS_THREADS
  /*
   * Fallback entry point compiled only when APR_HAS_THREADS is false:
   * reports on stderr that the test cannot run here and exits with status 0.
   */
  int main(void)
  {
      static const char no_threads_notice[] =
          "This program won't work on this platform because there is no "
          "support for threads.\n";

      fputs(no_threads_notice, stderr);
      return 0;
  }
  38. #else /* !APR_HAS_THREADS */
  39. apr_pool_t *context;
  40. int consumer_activity=400;
  41. int producer_activity=300;
  42. int verbose=0;
  43. static void * APR_THREAD_FUNC consumer(apr_thread_t *thd, void *data);
  44. static void * APR_THREAD_FUNC producer(apr_thread_t *thd, void *data);
  45. static void usage(void);
  46. static void * APR_THREAD_FUNC consumer(apr_thread_t *thd, void *data)
  47. {
  48. long sleeprate;
  49. apr_queue_t *q = (apr_queue_t*)data;
  50. apr_status_t rv;
  51. int val;
  52. void *v;
  53. char current_thread_str[30];
  54. apr_os_thread_t current_thread = apr_os_thread_current();
  55. apr_snprintf(current_thread_str, sizeof current_thread_str,
  56. "%pT", &current_thread);
  57. sleeprate = 1000000/consumer_activity;
  58. apr_sleep( (rand() % 4 ) * 1000000 ); /* sleep random seconds */
  59. while (1) {
  60. do {
  61. rv = apr_queue_pop(q, &v);
  62. if (rv == APR_EINTR) {
  63. fprintf(stderr, "%s\tconsumer intr\n", current_thread_str);
  64. }
  65. } while (rv == APR_EINTR) ;
  66. if (rv != APR_SUCCESS) {
  67. if (rv == APR_EOF) {
  68. fprintf(stderr, "%s\tconsumer:queue terminated APR_EOF\n", current_thread_str);
  69. rv=APR_SUCCESS;
  70. }
  71. else
  72. fprintf(stderr, "%s\tconsumer thread exit rv %d\n", current_thread_str, rv);
  73. apr_thread_exit(thd, rv);
  74. return NULL;
  75. }
  76. val = *(int*)v;
  77. if (verbose)
  78. fprintf(stderr, "%s\tpop %d\n", current_thread_str, val);
  79. apr_sleep( sleeprate ); /* sleep this long to acheive our rate */
  80. }
  81. /* not reached */
  82. return NULL;
  83. }
  84. static void * APR_THREAD_FUNC producer(apr_thread_t *thd, void *data)
  85. {
  86. int i=0;
  87. long sleeprate;
  88. apr_queue_t *q = (apr_queue_t*)data;
  89. apr_status_t rv;
  90. int *val;
  91. char current_thread_str[30];
  92. apr_os_thread_t current_thread = apr_os_thread_current();
  93. apr_snprintf(current_thread_str, sizeof current_thread_str,
  94. "%pT", &current_thread);
  95. sleeprate = 1000000/producer_activity;
  96. apr_sleep( (rand() % 4 ) * 1000000 ); /* sleep random seconds */
  97. while(1) {
  98. val = apr_palloc(context, sizeof(int));
  99. *val=i;
  100. if (verbose)
  101. fprintf(stderr, "%s\tpush %d\n", current_thread_str, *val);
  102. do {
  103. rv = apr_queue_push(q, val);
  104. if (rv == APR_EINTR)
  105. fprintf(stderr, "%s\tproducer intr\n", current_thread_str);
  106. } while (rv == APR_EINTR);
  107. if (rv != APR_SUCCESS) {
  108. if (rv == APR_EOF) {
  109. fprintf(stderr, "%s\tproducer: queue terminated APR_EOF\n", current_thread_str);
  110. rv = APR_SUCCESS;
  111. }
  112. else
  113. fprintf(stderr, "%s\tproducer thread exit rv %d\n", current_thread_str, rv);
  114. apr_thread_exit(thd, rv);
  115. return NULL;
  116. }
  117. i++;
  118. apr_sleep( sleeprate ); /* sleep this long to acheive our rate */
  119. }
  120. /* not reached */
  121. return NULL;
  122. }
  /*
   * Print the command-line help to stderr.
   *
   * -C and -P are described as rates because the worker threads derive their
   * per-item sleep from them (1000000 / rate microseconds); they are not a
   * count of items to process before exiting.
   */
  static void usage(void)
  {
      fprintf(stderr,"usage: testqueue [-p n] [-P n] [-c n] [-C n] [-q n] [-s n] [-v]\n");
      fprintf(stderr,"-c # of consumers\n");
      fprintf(stderr,"-C consumer rate (items per second, per consumer)\n");
      fprintf(stderr,"-p # of producers\n");
      fprintf(stderr,"-P producer rate (items per second, per producer)\n");
      fprintf(stderr,"-q queue size\n");
      fprintf(stderr,"-s # of seconds to run before terminating the queue\n");
      fprintf(stderr,"-v verbose\n");
  }
  134. int main(int argc, const char* const argv[])
  135. {
  136. apr_thread_t **t;
  137. apr_queue_t *queue;
  138. int i;
  139. apr_status_t rv;
  140. apr_getopt_t *opt;
  141. const char *optarg;
  142. char c;
  143. int numconsumers=3;
  144. int numproducers=4;
  145. int queuesize=100;
  146. int sleeptime=30;
  147. char errorbuf[200];
  148. apr_initialize();
  149. srand((unsigned int)apr_time_now());
  150. printf("APR Queue Test\n======================\n\n");
  151. printf("%-60s", "Initializing the context");
  152. if (apr_pool_create(&context, NULL) != APR_SUCCESS) {
  153. fflush(stdout);
  154. fprintf(stderr, "Failed.\nCould not initialize\n");
  155. exit(-1);
  156. }
  157. printf("OK\n");
  158. apr_getopt_init(&opt, context, argc, argv);
  159. while ((rv = apr_getopt(opt, "p:c:P:C:q:s:v", &c, &optarg))
  160. == APR_SUCCESS) {
  161. switch (c) {
  162. case 'c':
  163. numconsumers = atoi( optarg);
  164. break;
  165. case 'p':
  166. numproducers = atoi( optarg);
  167. break;
  168. case 'C':
  169. consumer_activity = atoi( optarg);
  170. break;
  171. case 'P':
  172. producer_activity = atoi( optarg);
  173. break;
  174. case 's':
  175. sleeptime= atoi(optarg);
  176. break;
  177. case 'q':
  178. queuesize = atoi(optarg);
  179. break;
  180. case 'v':
  181. verbose= 1;
  182. break;
  183. default:
  184. usage();
  185. exit(-1);
  186. }
  187. }
  188. /* bad cmdline option? then we die */
  189. if (rv != APR_EOF || opt->ind < opt->argc) {
  190. usage();
  191. exit(-1);
  192. }
  193. printf("test stats %d consumers (rate %d/sec) %d producers (rate %d/sec) queue size %d sleep %d\n",
  194. numconsumers,consumer_activity, numproducers, producer_activity, queuesize,sleeptime);
  195. printf("%-60s", "Initializing the queue");
  196. rv = apr_queue_create(&queue, queuesize, context);
  197. if (rv != APR_SUCCESS) {
  198. fflush(stdout);
  199. fprintf(stderr, "Failed\nCould not create queue %d\n",rv);
  200. apr_strerror(rv, errorbuf,200);
  201. fprintf(stderr,"%s\n",errorbuf);
  202. exit(-1);
  203. }
  204. printf("OK\n");
  205. t = apr_palloc( context, sizeof(apr_thread_t*) * (numconsumers+numproducers));
  206. printf("%-60s", "Starting consumers");
  207. for (i=0;i<numconsumers;i++) {
  208. rv = apr_thread_create(&t[i], NULL, consumer, queue, context);
  209. if (rv != APR_SUCCESS) {
  210. apr_strerror(rv, errorbuf,200);
  211. fprintf(stderr, "Failed\nError starting consumer thread (%d) rv=%d:%s\n",i, rv,errorbuf);
  212. exit(-1);
  213. }
  214. }
  215. for (i=numconsumers;i<(numconsumers+numproducers);i++) {
  216. rv = apr_thread_create(&t[i], NULL, producer, queue, context);
  217. if (rv != APR_SUCCESS) {
  218. apr_strerror(rv, errorbuf,200);
  219. fprintf(stderr, "Failed\nError starting producer thread (%d) rv=%d:%s\n",i, rv,errorbuf);
  220. exit(-1);
  221. }
  222. }
  223. printf("OK\n");
  224. printf("%-60s", "Sleeping\n");
  225. apr_sleep( sleeptime * 1000000 ); /* sleep 10 seconds */
  226. printf("OK\n");
  227. printf("%-60s", "Terminating queue");
  228. rv = apr_queue_term(queue);
  229. if (rv != APR_SUCCESS) {
  230. apr_strerror(rv, errorbuf,200);
  231. fprintf( stderr, "apr_queue_term failed %d:%s\n",rv,errorbuf);
  232. }
  233. printf("OK\n");
  234. printf("%-60s", "Waiting for threads to exit\n");
  235. fflush(stdout);
  236. for (i=0;i<numconsumers+numproducers;i++) {
  237. apr_thread_join(&rv, t[i]);
  238. if (rv != 0 ) {
  239. apr_strerror(rv, errorbuf,200);
  240. if (i<numconsumers)
  241. fprintf( stderr, "consumer thread %d failed rv %d:%s\n",i,rv,errorbuf);
  242. else
  243. fprintf( stderr, "producer thread %d failed rv %d:%s\n",i,rv,errorbuf);
  244. }
  245. }
  246. printf("OK\n");
  247. apr_terminate();
  248. return 0;
  249. }
  250. #endif /* !APR_HAS_THREADS */