su_pthread_port.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. /*
  2. * This file is part of the Sofia-SIP package
  3. *
  4. * Copyright (C) 2005 Nokia Corporation.
  5. *
  6. * Contact: Pekka Pessi <pekka.pessi@nokia.com>
  7. *
  8. * This library is free software; you can redistribute it and/or
  9. * modify it under the terms of the GNU Lesser General Public License
  10. * as published by the Free Software Foundation; either version 2.1 of
  11. * the License, or (at your option) any later version.
  12. *
  13. * This library is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. * Lesser General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU Lesser General Public
  19. * License along with this library; if not, write to the Free Software
  20. * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  21. * 02110-1301 USA
  22. *
  23. */
  24. /**@ingroup su_wait
  25. * @CFILE su_pthread_port.c
  26. *
  27. * OS-Independent Syncronization Interface with pthreads
  28. *
  29. * This implements #su_msg_t message passing functionality using pthreads.
  30. *
  31. * @author Pekka Pessi <Pekka.Pessi@nokia.com>
  32. * @author Kai Vehmanen <kai.vehmanen@nokia.com>
  33. *
  34. * @date Created: Tue Sep 14 15:51:04 1999 ppessi
  35. */
  36. #include "config.h"
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <stdarg.h>
  40. #include <stdio.h>
  41. #include <string.h>
  42. #include <limits.h>
  43. #include <errno.h>
  44. #define su_pthread_port_s su_port_s
  45. #define SU_CLONE_T su_msg_t
  46. #include "sofia-sip/su.h"
  47. #include "su_port.h"
  48. #include "sofia-sip/su_alloc.h"
  49. #if 1
  50. #define PORT_LOCK_DEBUG(x) ((void)0)
  51. #else
  52. #define PORT_LOCK_DEBUG(x) printf x
  53. #endif
  54. #define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \
  55. (s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0)
  56. /**@internal
  57. *
  58. * Initializes a message port. It creates a mailbox used to wake up the
  59. * thread waiting on the port if needed. Currently, the mailbox is a
  60. * socketpair or an UDP socket connected to itself.
  61. */
  62. int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
  63. {
  64. SU_DEBUG_9(("su_pthread_port_init(%p, %p) called\n",
  65. (void *)self, (void *)vtable));
  66. pthread_mutex_init(self->sup_obtained, NULL);
  67. return su_base_port_init(self, vtable);
  68. }
  69. /** @internal Deinit a base implementation of port. */
  70. void su_pthread_port_deinit(su_port_t *self)
  71. {
  72. assert(self);
  73. su_base_port_deinit(self);
  74. #if 0
  75. pthread_mutex_destroy(self->sup_runlock);
  76. pthread_cond_destroy(self->sup_resume);
  77. #endif
  78. pthread_mutex_destroy(self->sup_obtained);
  79. }
  80. void su_pthread_port_lock(su_port_t *self, char const *who)
  81. {
  82. PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
  83. (void *)pthread_self(), who, self));
  84. su_home_lock(self->sup_base->sup_home);
  85. PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
  86. (void *)pthread_self(), who, self));
  87. }
  88. void su_pthread_port_unlock(su_port_t *self, char const *who)
  89. {
  90. su_home_unlock(self->sup_base->sup_home);
  91. PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
  92. (void *)pthread_self(), who, self));
  93. }
  94. /** @internal
  95. *
  96. * Change or query ownership of the port object.
  97. *
  98. * @param self pointer to a port object
  99. * @param op operation
  100. *
  101. * @ERRORS
  102. * @ERROR EALREADY port already has an owner (or has no owner)
  103. */
  104. int su_pthread_port_thread(su_port_t *self, enum su_port_thread_op op)
  105. {
  106. pthread_t me = pthread_self();
  107. switch (op) {
  108. case su_port_thread_op_is_obtained:
  109. if (self->sup_thread == 0)
  110. return 0; /* No thread has obtained the port */
  111. else if (pthread_equal(self->sup_tid, me))
  112. return 2; /* Current thread has obtained the port */
  113. else
  114. return 1; /* A thread has obtained the port */
  115. case su_port_thread_op_release:
  116. if (!self->sup_thread || !pthread_equal(self->sup_tid, me))
  117. return errno = EALREADY, -1;
  118. self->sup_thread = 0;
  119. pthread_mutex_unlock(self->sup_obtained);
  120. return 0;
  121. case su_port_thread_op_obtain:
  122. su_home_threadsafe(su_port_home(self));
  123. pthread_mutex_lock(self->sup_obtained);
  124. self->sup_tid = me;
  125. self->sup_thread = 1;
  126. return 0;
  127. default:
  128. return errno = ENOSYS, -1;
  129. }
  130. }
  131. /* -- Clones ------------------------------------------------------------ */
  132. struct clone_args
  133. {
  134. su_port_create_f*create;
  135. su_root_t *parent;
  136. su_root_magic_t *magic;
  137. su_root_init_f init;
  138. su_root_deinit_f deinit;
  139. pthread_mutex_t mutex[1];
  140. pthread_cond_t cv[1];
  141. int retval;
  142. su_msg_r clone;
  143. };
  144. static void *su_pthread_port_clone_main(void *varg);
  145. static void su_pthread_port_return_to_parent(struct clone_args *arg,
  146. int retval);
  147. static su_msg_function su_pthread_port_clone_break;
  148. /* Structure used to synchronize parent and clone in su_clone_wait() */
  149. struct su_pthread_port_waiting_parent {
  150. pthread_mutex_t deinit[1];
  151. pthread_mutex_t mutex[1];
  152. pthread_cond_t cv[1];
  153. int waiting;
  154. };
  155. /** Start a clone task running under a pthread.
  156. *
  157. * @internal
  158. *
  159. * Allocates and initializes a sub-task with its own pthread. The sub-task is
  160. * represented by clone handle to the rest of the application. The function
  161. * su_clone_start() returns the clone handle in @a return_clone. The clone
  162. * handle is used to communicate with the newly created clone task using
  163. * messages.
  164. *
  165. * A new #su_root_t object is created for the sub-task with the @a magic as
  166. * the root context pointer. Because the sub-task may or may not have its
  167. * own thread, all its activity must be scheduled via this root object. In
  168. * other words, the sub-task can be schedule
  169. * -# I/O events with su_root_register()
  170. * -# timers with su_timer_set(), su_timer_set_at() or su_timer_run()
  171. * -# messages with su_msg_send().
  172. *
  173. * Messages can also be used to pass information between tasks or threads.
  174. *
  175. * After the new thread has been launched, the initialization routine is
  176. * executed by the newly created thread. The calling thread blocks until
  177. * the initialization routine completes. If the initialization routine
  178. * returns #su_success (0), the sub-task is considered to be created
  179. * successfully. After the successful initialization, the sub-task continues
  180. * to execeute the function su_root_run().
  181. *
  182. * If the initalization function @a init fails, the sub-task (either the
  183. * newly created thread or the current thread executing the su_clone_start()
  184. * function) calls the deinitialization function, and su_clone_start()
  185. * returns NULL.
  186. *
  187. * @param parent root to be cloned (may be NULL if multi-threaded)
  188. * @param return_clone reference to a clone [OUT]
  189. * @param magic pointer to user data
  190. * @param init initialization function
  191. * @param deinit deinitialization function
  192. *
  193. * @return 0 if successfull, -1 upon an error.
  194. *
  195. * @sa su_root_threading(), su_clone_task(), su_clone_stop(), su_clone_wait(),
  196. * su_clone_forget().
  197. *
  198. */
  199. int su_pthreaded_port_start(su_port_create_f *create,
  200. su_root_t *parent,
  201. su_clone_r return_clone,
  202. su_root_magic_t *magic,
  203. su_root_init_f init,
  204. su_root_deinit_f deinit)
  205. {
  206. struct clone_args arg = {
  207. /* create: */ NULL,
  208. /* parent: */ NULL,
  209. /* magic: */ NULL,
  210. /* init: */ NULL,
  211. /* deinit: */ NULL,
  212. /* mutex: */ { PTHREAD_MUTEX_INITIALIZER },
  213. #if HAVE_OPEN_C
  214. /* cv: */ { _ENeedsNormalInit, NULL },
  215. #else
  216. /* cv: */ { PTHREAD_COND_INITIALIZER },
  217. #endif
  218. /* retval: */ -1,
  219. /* clone: */ SU_MSG_R_INIT,
  220. };
  221. int thread_created = 0;
  222. pthread_t tid;
  223. pthread_attr_t attr;
  224. struct sched_param param;
  225. arg.create = create;
  226. arg.parent = parent;
  227. arg.magic = magic;
  228. arg.init = init;
  229. arg.deinit = deinit;
  230. pthread_attr_init(&attr);
  231. pthread_attr_setstacksize(&attr, 244);
  232. pthread_attr_getschedparam(&attr, &param);
  233. param.sched_priority = 99;
  234. pthread_attr_setschedparam(&attr, &param);
  235. pthread_mutex_lock(arg.mutex);
  236. if (pthread_create(&tid, &attr, su_pthread_port_clone_main, &arg) == 0) {
  237. #ifdef HAVE_PTHREAD_SETSCHEDPARAM
  238. int policy;
  239. struct sched_param param;
  240. pthread_getschedparam(tid, &policy, &param);
  241. param.sched_priority = 99;
  242. pthread_setschedparam(tid, policy, &param);
  243. #endif
  244. pthread_cond_wait(arg.cv, arg.mutex);
  245. thread_created = 1;
  246. }
  247. pthread_attr_destroy(&attr);
  248. pthread_mutex_unlock(arg.mutex);
  249. pthread_mutex_destroy(arg.mutex);
  250. pthread_cond_destroy(arg.cv);
  251. if (arg.retval != 0) {
  252. if (thread_created)
  253. pthread_join(tid, NULL);
  254. return -1;
  255. }
  256. *return_clone = *arg.clone;
  257. return 0;
  258. }
  259. /** Main function for clone thread.
  260. *
  261. * @internal
  262. */
  263. static void *su_pthread_port_clone_main(void *varg)
  264. {
  265. struct clone_args *arg = (struct clone_args *)varg;
  266. su_task_r task;
  267. int zap = 1;
  268. #if SU_HAVE_WINSOCK
  269. su_init();
  270. #endif
  271. task->sut_port = arg->create();
  272. if (task->sut_port) {
  273. task->sut_root = su_salloc(su_port_home(task->sut_port),
  274. sizeof *task->sut_root);
  275. if (task->sut_root) {
  276. task->sut_root->sur_threading = 1; /* By default */
  277. SU_TASK_COPY(task->sut_root->sur_parent, su_root_task(arg->parent),
  278. su_pthread_port_clone_main);
  279. SU_TASK_COPY(task->sut_root->sur_task, task,
  280. su_pthread_port_clone_main);
  281. if (su_msg_create(arg->clone,
  282. task,
  283. su_root_task(arg->parent),
  284. su_pthread_port_clone_break,
  285. 0) == 0) {
  286. task->sut_root->sur_magic = arg->magic;
  287. task->sut_root->sur_deinit = arg->deinit;
  288. su_root_set_max_defer(task->sut_root,
  289. su_root_get_max_defer(arg->parent));
  290. if (arg->init(task->sut_root, arg->magic) == 0) {
  291. su_pthread_port_return_to_parent(arg, 0), arg = NULL;
  292. su_root_run(task->sut_root); /* Do the work */
  293. /* Cleanup */
  294. if (task->sut_port->sup_waiting_parent) {
  295. struct su_pthread_port_waiting_parent *mom;
  296. mom = task->sut_port->sup_waiting_parent;
  297. pthread_mutex_lock(mom->mutex);
  298. mom->waiting = 0;
  299. pthread_cond_signal(mom->cv);
  300. pthread_mutex_unlock(mom->mutex);
  301. pthread_mutex_lock(mom->deinit);
  302. su_port_getmsgs(task->sut_port);
  303. pthread_mutex_unlock(mom->deinit);
  304. }
  305. else
  306. zap = 0;
  307. }
  308. else
  309. su_msg_destroy(arg->clone);
  310. su_root_destroy(task->sut_root);
  311. }
  312. }
  313. task->sut_port->sup_base->sup_vtable->
  314. su_port_decref(task->sut_port, zap,
  315. "su_pthread_port_clone_main");
  316. }
  317. #if SU_HAVE_WINSOCK
  318. su_deinit();
  319. #endif
  320. if (arg)
  321. su_pthread_port_return_to_parent(arg, -1);
  322. return NULL; /* Exit from thread */
  323. }
  324. /* Signal that parent can resume execution */
  325. static void su_pthread_port_return_to_parent(struct clone_args *arg,
  326. int retval)
  327. {
  328. arg->retval = retval;
  329. pthread_mutex_lock(arg->mutex);
  330. pthread_cond_signal(arg->cv);
  331. pthread_mutex_unlock(arg->mutex);
  332. }
  333. /** "Stop" message function for pthread clone.
  334. *
  335. * @sa su_clone_wait()
  336. * @internal
  337. */
  338. static void su_pthread_port_clone_break(su_root_magic_t *m,
  339. su_msg_r msg,
  340. su_msg_arg_t *a)
  341. {
  342. su_root_t *root = su_msg_to(msg)->sut_root;
  343. root->sur_deiniting = 1;
  344. su_root_break(root);
  345. }
  346. /** Wait for the pthread clone to exit.
  347. * @internal
  348. *
  349. * Called by su_port_wait() and su_clone_wait().
  350. */
  351. void su_pthread_port_wait(su_clone_r rclone)
  352. {
  353. su_port_t *clone, *parent;
  354. struct su_pthread_port_waiting_parent mom[1];
  355. pthread_t tid;
  356. assert(*rclone);
  357. clone = su_msg_to(rclone)->sut_port;
  358. parent = su_msg_from(rclone)->sut_port;
  359. if (clone == parent) {
  360. su_base_port_wait(rclone);
  361. return;
  362. }
  363. assert(parent); assert(clone);
  364. assert(rclone[0]->sum_func == su_pthread_port_clone_break);
  365. #if 0
  366. assert(!clone->sup_paused);
  367. #endif
  368. tid = clone->sup_tid;
  369. if (!clone->sup_thread) { /* Already died */
  370. su_msg_destroy(rclone);
  371. pthread_join(tid, NULL);
  372. return;
  373. }
  374. pthread_mutex_init(mom->deinit, NULL);
  375. pthread_mutex_lock(mom->deinit);
  376. pthread_cond_init(mom->cv, NULL);
  377. pthread_mutex_init(mom->mutex, NULL);
  378. pthread_mutex_lock(mom->mutex);
  379. mom->waiting = 1;
  380. clone->sup_waiting_parent = mom;
  381. su_msg_send(rclone);
  382. while (mom->waiting)
  383. pthread_cond_wait(mom->cv, mom->mutex);
  384. /* Run all messages from clone */
  385. while (su_port_getmsgs_from(parent, clone))
  386. ;
  387. /* Allow clone thread to exit */
  388. pthread_mutex_unlock(mom->deinit);
  389. pthread_join(tid, NULL);
  390. pthread_mutex_destroy(mom->deinit);
  391. pthread_mutex_unlock(mom->mutex);
  392. pthread_mutex_destroy(mom->mutex);
  393. pthread_cond_destroy(mom->cv);
  394. }
  395. struct su_pthread_port_execute
  396. {
  397. pthread_mutex_t mutex[1];
  398. pthread_cond_t cond[1];
  399. int (*function)(void *);
  400. void *arg;
  401. int value;
  402. };
  403. static su_msg_function _su_pthread_port_execute;
  404. /** Execute the @a function by a pthread @a task.
  405. *
  406. * @retval 0 if successful
  407. * @retval -1 upon an error
  408. *
  409. * @sa su_task_execute()
  410. *
  411. * @internal
  412. */
  413. int su_pthread_port_execute(su_task_r const task,
  414. int (*function)(void *), void *arg,
  415. int *return_value)
  416. {
  417. int success;
  418. su_msg_r m = SU_MSG_R_INIT;
  419. #if HAVE_OPEN_C
  420. struct su_pthread_port_execute frame = {
  421. { PTHREAD_MUTEX_INITIALIZER },
  422. { _ENeedsNormalInit, NULL },
  423. NULL, NULL, 0
  424. };
  425. frame.function = function;
  426. frame.arg = arg;
  427. #else
  428. struct su_pthread_port_execute frame = {
  429. { PTHREAD_MUTEX_INITIALIZER },
  430. { PTHREAD_COND_INITIALIZER },
  431. function, arg, 0
  432. };
  433. #endif
  434. if (su_msg_create(m, task, su_task_null,
  435. _su_pthread_port_execute, (sizeof &frame)) < 0)
  436. return -1;
  437. *(struct su_pthread_port_execute **)su_msg_data(m) = &frame;
  438. pthread_mutex_lock(frame.mutex);
  439. success = su_msg_send(m);
  440. if (success == 0)
  441. while (frame.function)
  442. pthread_cond_wait(frame.cond, frame.mutex);
  443. else
  444. su_msg_destroy(m);
  445. pthread_mutex_unlock(frame.mutex);
  446. pthread_mutex_destroy(frame.mutex);
  447. pthread_cond_destroy(frame.cond);
  448. if (return_value)
  449. *return_value = frame.value;
  450. return success;
  451. }
  452. static void _su_pthread_port_execute(su_root_magic_t *m,
  453. su_msg_r msg,
  454. su_msg_arg_t *a)
  455. {
  456. struct su_pthread_port_execute *frame;
  457. frame = *(struct su_pthread_port_execute **)a;
  458. pthread_mutex_lock(frame->mutex);
  459. frame->value = frame->function(frame->arg);
  460. frame->function = NULL; /* Mark as completed */
  461. pthread_cond_signal(frame->cond);
  462. pthread_mutex_unlock(frame->mutex);
  463. }
  464. #if 0 /* pausing and resuming are not used */
  465. /** Pause the pthread port.
  466. *
  467. * This is a message function invoked by su_pthread_port_pause() and called
  468. * from the message dispatcher. It releases the lock sup_runlock and waits
  469. * until the condition variable sup_resume is signaled and sup_paused is
  470. * cleared by su_pthread_port_resume().
  471. */
  472. static
  473. void su_pthread_port_paused(su_root_magic_t *magic,
  474. su_msg_r msg,
  475. su_msg_arg_t *arg)
  476. {
  477. su_port_t *self = su_msg_to(msg)->sut_port;
  478. self->sup_paused = 1;
  479. while (self->sup_paused)
  480. pthread_cond_wait(self->sup_resume, self->sup_runlock);
  481. }
  482. /** Pause a port.
  483. *
  484. * Obtain an exclusive lock on port's private data.
  485. *
  486. * @retval 0 if successful (and clone is paused)
  487. * @retval -1 upon an error
  488. */
  489. int su_pthread_port_pause(su_port_t *self)
  490. {
  491. su_msg_r m = SU_MSG_R_INIT;
  492. _su_task_t task[1] = {{ self, NULL }};
  493. if (su_msg_create(m, task, su_task_null, su_pthread_port_paused, 0) < 0)
  494. return -1;
  495. if (su_msg_send(m) < 0)
  496. return -1;
  497. if (pthread_mutex_lock(self->sup_runlock) < 0)
  498. return -1;
  499. return 0;
  500. }
  501. /** Resume a port.
  502. *
  503. * Give up an exclusive lock on port's private data.
  504. *
  505. * @retval 0 if successful (and clone is resumed)
  506. * @retval -1 upon an error
  507. */
  508. int su_pthread_port_resume(su_port_t *self)
  509. {
  510. assert(self && self->sup_paused);
  511. self->sup_paused = 0;
  512. if (pthread_cond_signal(self->sup_resume) < 0 ||
  513. pthread_mutex_unlock(self->sup_runlock) < 0)
  514. return -1;
  515. return 0;
  516. }
  517. #endif