123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622 |
- /*
- * This file is part of the Sofia-SIP package
- *
- * Copyright (C) 2005 Nokia Corporation.
- *
- * Contact: Pekka Pessi <pekka.pessi@nokia.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA
- *
- */
- /**@ingroup su_wait
- * @CFILE su_pthread_port.c
- *
- * OS-Independent Syncronization Interface with pthreads
- *
- * This implements #su_msg_t message passing functionality using pthreads.
- *
- * @author Pekka Pessi <Pekka.Pessi@nokia.com>
- * @author Kai Vehmanen <kai.vehmanen@nokia.com>
- *
- * @date Created: Tue Sep 14 15:51:04 1999 ppessi
- */
- #include "config.h"
- #include <stdlib.h>
- #include <assert.h>
- #include <stdarg.h>
- #include <stdio.h>
- #include <string.h>
- #include <limits.h>
- #include <errno.h>
- #define su_pthread_port_s su_port_s
- #define SU_CLONE_T su_msg_t
- #include "sofia-sip/su.h"
- #include "su_port.h"
- #include "sofia-sip/su_alloc.h"
- #if 1
- #define PORT_LOCK_DEBUG(x) ((void)0)
- #else
- #define PORT_LOCK_DEBUG(x) printf x
- #endif
- #define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \
- (s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0)
- /**@internal
- *
- * Initializes a message port. It creates a mailbox used to wake up the
- * thread waiting on the port if needed. Currently, the mailbox is a
- * socketpair or an UDP socket connected to itself.
- */
- int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
- {
- SU_DEBUG_9(("su_pthread_port_init(%p, %p) called\n",
- (void *)self, (void *)vtable));
- pthread_mutex_init(self->sup_obtained, NULL);
- return su_base_port_init(self, vtable);
- }
- /** @internal Deinit a base implementation of port. */
- void su_pthread_port_deinit(su_port_t *self)
- {
- assert(self);
- su_base_port_deinit(self);
- #if 0
- pthread_mutex_destroy(self->sup_runlock);
- pthread_cond_destroy(self->sup_resume);
- #endif
- pthread_mutex_destroy(self->sup_obtained);
- }
- void su_pthread_port_lock(su_port_t *self, char const *who)
- {
- PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
- (void *)pthread_self(), who, self));
- su_home_lock(self->sup_base->sup_home);
- PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
- (void *)pthread_self(), who, self));
- }
- void su_pthread_port_unlock(su_port_t *self, char const *who)
- {
- su_home_unlock(self->sup_base->sup_home);
- PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
- (void *)pthread_self(), who, self));
- }
- /** @internal
- *
- * Change or query ownership of the port object.
- *
- * @param self pointer to a port object
- * @param op operation
- *
- * @ERRORS
- * @ERROR EALREADY port already has an owner (or has no owner)
- */
- int su_pthread_port_thread(su_port_t *self, enum su_port_thread_op op)
- {
- pthread_t me = pthread_self();
- switch (op) {
- case su_port_thread_op_is_obtained:
- if (self->sup_thread == 0)
- return 0; /* No thread has obtained the port */
- else if (pthread_equal(self->sup_tid, me))
- return 2; /* Current thread has obtained the port */
- else
- return 1; /* A thread has obtained the port */
- case su_port_thread_op_release:
- if (!self->sup_thread || !pthread_equal(self->sup_tid, me))
- return errno = EALREADY, -1;
- self->sup_thread = 0;
- pthread_mutex_unlock(self->sup_obtained);
- return 0;
- case su_port_thread_op_obtain:
- su_home_threadsafe(su_port_home(self));
- pthread_mutex_lock(self->sup_obtained);
- self->sup_tid = me;
- self->sup_thread = 1;
- return 0;
- default:
- return errno = ENOSYS, -1;
- }
- }
- /* -- Clones ------------------------------------------------------------ */
- struct clone_args
- {
- su_port_create_f*create;
- su_root_t *parent;
- su_root_magic_t *magic;
- su_root_init_f init;
- su_root_deinit_f deinit;
- pthread_mutex_t mutex[1];
- pthread_cond_t cv[1];
- int retval;
- su_msg_r clone;
- };
- static void *su_pthread_port_clone_main(void *varg);
- static void su_pthread_port_return_to_parent(struct clone_args *arg,
- int retval);
- static su_msg_function su_pthread_port_clone_break;
- /* Structure used to synchronize parent and clone in su_clone_wait() */
- struct su_pthread_port_waiting_parent {
- pthread_mutex_t deinit[1];
- pthread_mutex_t mutex[1];
- pthread_cond_t cv[1];
- int waiting;
- };
- /** Start a clone task running under a pthread.
- *
- * @internal
- *
- * Allocates and initializes a sub-task with its own pthread. The sub-task is
- * represented by clone handle to the rest of the application. The function
- * su_clone_start() returns the clone handle in @a return_clone. The clone
- * handle is used to communicate with the newly created clone task using
- * messages.
- *
- * A new #su_root_t object is created for the sub-task with the @a magic as
- * the root context pointer. Because the sub-task may or may not have its
- * own thread, all its activity must be scheduled via this root object. In
- * other words, the sub-task can be schedule
- * -# I/O events with su_root_register()
- * -# timers with su_timer_set(), su_timer_set_at() or su_timer_run()
- * -# messages with su_msg_send().
- *
- * Messages can also be used to pass information between tasks or threads.
- *
- * After the new thread has been launched, the initialization routine is
- * executed by the newly created thread. The calling thread blocks until
- * the initialization routine completes. If the initialization routine
- * returns #su_success (0), the sub-task is considered to be created
- * successfully. After the successful initialization, the sub-task continues
- * to execeute the function su_root_run().
- *
- * If the initalization function @a init fails, the sub-task (either the
- * newly created thread or the current thread executing the su_clone_start()
- * function) calls the deinitialization function, and su_clone_start()
- * returns NULL.
- *
- * @param parent root to be cloned (may be NULL if multi-threaded)
- * @param return_clone reference to a clone [OUT]
- * @param magic pointer to user data
- * @param init initialization function
- * @param deinit deinitialization function
- *
- * @return 0 if successfull, -1 upon an error.
- *
- * @sa su_root_threading(), su_clone_task(), su_clone_stop(), su_clone_wait(),
- * su_clone_forget().
- *
- */
- int su_pthreaded_port_start(su_port_create_f *create,
- su_root_t *parent,
- su_clone_r return_clone,
- su_root_magic_t *magic,
- su_root_init_f init,
- su_root_deinit_f deinit)
- {
- struct clone_args arg = {
- /* create: */ NULL,
- /* parent: */ NULL,
- /* magic: */ NULL,
- /* init: */ NULL,
- /* deinit: */ NULL,
- /* mutex: */ { PTHREAD_MUTEX_INITIALIZER },
- #if HAVE_OPEN_C
- /* cv: */ { _ENeedsNormalInit, NULL },
- #else
- /* cv: */ { PTHREAD_COND_INITIALIZER },
- #endif
- /* retval: */ -1,
- /* clone: */ SU_MSG_R_INIT,
- };
- int thread_created = 0;
- pthread_t tid;
- pthread_attr_t attr;
- struct sched_param param;
- arg.create = create;
- arg.parent = parent;
- arg.magic = magic;
- arg.init = init;
- arg.deinit = deinit;
- pthread_attr_init(&attr);
- pthread_attr_setstacksize(&attr, 244);
- pthread_attr_getschedparam(&attr, ¶m);
- param.sched_priority = 99;
- pthread_attr_setschedparam(&attr, ¶m);
- pthread_mutex_lock(arg.mutex);
- if (pthread_create(&tid, &attr, su_pthread_port_clone_main, &arg) == 0) {
- #ifdef HAVE_PTHREAD_SETSCHEDPARAM
- int policy;
- struct sched_param param;
- pthread_getschedparam(tid, &policy, ¶m);
- param.sched_priority = 99;
- pthread_setschedparam(tid, policy, ¶m);
- #endif
- pthread_cond_wait(arg.cv, arg.mutex);
- thread_created = 1;
- }
- pthread_attr_destroy(&attr);
- pthread_mutex_unlock(arg.mutex);
- pthread_mutex_destroy(arg.mutex);
- pthread_cond_destroy(arg.cv);
- if (arg.retval != 0) {
- if (thread_created)
- pthread_join(tid, NULL);
- return -1;
- }
- *return_clone = *arg.clone;
- return 0;
- }
- /** Main function for clone thread.
- *
- * @internal
- */
- static void *su_pthread_port_clone_main(void *varg)
- {
- struct clone_args *arg = (struct clone_args *)varg;
- su_task_r task;
- int zap = 1;
- #if SU_HAVE_WINSOCK
- su_init();
- #endif
- task->sut_port = arg->create();
- if (task->sut_port) {
- task->sut_root = su_salloc(su_port_home(task->sut_port),
- sizeof *task->sut_root);
- if (task->sut_root) {
- task->sut_root->sur_threading = 1; /* By default */
- SU_TASK_COPY(task->sut_root->sur_parent, su_root_task(arg->parent),
- su_pthread_port_clone_main);
- SU_TASK_COPY(task->sut_root->sur_task, task,
- su_pthread_port_clone_main);
- if (su_msg_create(arg->clone,
- task,
- su_root_task(arg->parent),
- su_pthread_port_clone_break,
- 0) == 0) {
- task->sut_root->sur_magic = arg->magic;
- task->sut_root->sur_deinit = arg->deinit;
- su_root_set_max_defer(task->sut_root,
- su_root_get_max_defer(arg->parent));
- if (arg->init(task->sut_root, arg->magic) == 0) {
- su_pthread_port_return_to_parent(arg, 0), arg = NULL;
- su_root_run(task->sut_root); /* Do the work */
- /* Cleanup */
- if (task->sut_port->sup_waiting_parent) {
- struct su_pthread_port_waiting_parent *mom;
- mom = task->sut_port->sup_waiting_parent;
- pthread_mutex_lock(mom->mutex);
- mom->waiting = 0;
- pthread_cond_signal(mom->cv);
- pthread_mutex_unlock(mom->mutex);
- pthread_mutex_lock(mom->deinit);
- su_port_getmsgs(task->sut_port);
- pthread_mutex_unlock(mom->deinit);
- }
- else
- zap = 0;
- }
- else
- su_msg_destroy(arg->clone);
- su_root_destroy(task->sut_root);
- }
- }
- task->sut_port->sup_base->sup_vtable->
- su_port_decref(task->sut_port, zap,
- "su_pthread_port_clone_main");
- }
- #if SU_HAVE_WINSOCK
- su_deinit();
- #endif
- if (arg)
- su_pthread_port_return_to_parent(arg, -1);
- return NULL; /* Exit from thread */
- }
- /* Signal that parent can resume execution */
- static void su_pthread_port_return_to_parent(struct clone_args *arg,
- int retval)
- {
- arg->retval = retval;
- pthread_mutex_lock(arg->mutex);
- pthread_cond_signal(arg->cv);
- pthread_mutex_unlock(arg->mutex);
- }
- /** "Stop" message function for pthread clone.
- *
- * @sa su_clone_wait()
- * @internal
- */
- static void su_pthread_port_clone_break(su_root_magic_t *m,
- su_msg_r msg,
- su_msg_arg_t *a)
- {
- su_root_t *root = su_msg_to(msg)->sut_root;
- root->sur_deiniting = 1;
- su_root_break(root);
- }
- /** Wait for the pthread clone to exit.
- * @internal
- *
- * Called by su_port_wait() and su_clone_wait().
- */
- void su_pthread_port_wait(su_clone_r rclone)
- {
- su_port_t *clone, *parent;
- struct su_pthread_port_waiting_parent mom[1];
- pthread_t tid;
- assert(*rclone);
- clone = su_msg_to(rclone)->sut_port;
- parent = su_msg_from(rclone)->sut_port;
- if (clone == parent) {
- su_base_port_wait(rclone);
- return;
- }
- assert(parent); assert(clone);
- assert(rclone[0]->sum_func == su_pthread_port_clone_break);
- #if 0
- assert(!clone->sup_paused);
- #endif
- tid = clone->sup_tid;
- if (!clone->sup_thread) { /* Already died */
- su_msg_destroy(rclone);
- pthread_join(tid, NULL);
- return;
- }
- pthread_mutex_init(mom->deinit, NULL);
- pthread_mutex_lock(mom->deinit);
- pthread_cond_init(mom->cv, NULL);
- pthread_mutex_init(mom->mutex, NULL);
- pthread_mutex_lock(mom->mutex);
- mom->waiting = 1;
- clone->sup_waiting_parent = mom;
- su_msg_send(rclone);
- while (mom->waiting)
- pthread_cond_wait(mom->cv, mom->mutex);
- /* Run all messages from clone */
- while (su_port_getmsgs_from(parent, clone))
- ;
- /* Allow clone thread to exit */
- pthread_mutex_unlock(mom->deinit);
- pthread_join(tid, NULL);
- pthread_mutex_destroy(mom->deinit);
- pthread_mutex_unlock(mom->mutex);
- pthread_mutex_destroy(mom->mutex);
- pthread_cond_destroy(mom->cv);
- }
- struct su_pthread_port_execute
- {
- pthread_mutex_t mutex[1];
- pthread_cond_t cond[1];
- int (*function)(void *);
- void *arg;
- int value;
- };
- static su_msg_function _su_pthread_port_execute;
- /** Execute the @a function by a pthread @a task.
- *
- * @retval 0 if successful
- * @retval -1 upon an error
- *
- * @sa su_task_execute()
- *
- * @internal
- */
- int su_pthread_port_execute(su_task_r const task,
- int (*function)(void *), void *arg,
- int *return_value)
- {
- int success;
- su_msg_r m = SU_MSG_R_INIT;
- #if HAVE_OPEN_C
- struct su_pthread_port_execute frame = {
- { PTHREAD_MUTEX_INITIALIZER },
- { _ENeedsNormalInit, NULL },
- NULL, NULL, 0
- };
- frame.function = function;
- frame.arg = arg;
- #else
- struct su_pthread_port_execute frame = {
- { PTHREAD_MUTEX_INITIALIZER },
- { PTHREAD_COND_INITIALIZER },
- function, arg, 0
- };
- #endif
- if (su_msg_create(m, task, su_task_null,
- _su_pthread_port_execute, (sizeof &frame)) < 0)
- return -1;
- *(struct su_pthread_port_execute **)su_msg_data(m) = &frame;
- pthread_mutex_lock(frame.mutex);
- success = su_msg_send(m);
- if (success == 0)
- while (frame.function)
- pthread_cond_wait(frame.cond, frame.mutex);
- else
- su_msg_destroy(m);
- pthread_mutex_unlock(frame.mutex);
- pthread_mutex_destroy(frame.mutex);
- pthread_cond_destroy(frame.cond);
- if (return_value)
- *return_value = frame.value;
- return success;
- }
- static void _su_pthread_port_execute(su_root_magic_t *m,
- su_msg_r msg,
- su_msg_arg_t *a)
- {
- struct su_pthread_port_execute *frame;
- frame = *(struct su_pthread_port_execute **)a;
- pthread_mutex_lock(frame->mutex);
- frame->value = frame->function(frame->arg);
- frame->function = NULL; /* Mark as completed */
- pthread_cond_signal(frame->cond);
- pthread_mutex_unlock(frame->mutex);
- }
- #if 0 /* pausing and resuming are not used */
- /** Pause the pthread port.
- *
- * This is a message function invoked by su_pthread_port_pause() and called
- * from the message dispatcher. It releases the lock sup_runlock and waits
- * until the condition variable sup_resume is signaled and sup_paused is
- * cleared by su_pthread_port_resume().
- */
- static
- void su_pthread_port_paused(su_root_magic_t *magic,
- su_msg_r msg,
- su_msg_arg_t *arg)
- {
- su_port_t *self = su_msg_to(msg)->sut_port;
- self->sup_paused = 1;
- while (self->sup_paused)
- pthread_cond_wait(self->sup_resume, self->sup_runlock);
- }
- /** Pause a port.
- *
- * Obtain an exclusive lock on port's private data.
- *
- * @retval 0 if successful (and clone is paused)
- * @retval -1 upon an error
- */
- int su_pthread_port_pause(su_port_t *self)
- {
- su_msg_r m = SU_MSG_R_INIT;
- _su_task_t task[1] = {{ self, NULL }};
- if (su_msg_create(m, task, su_task_null, su_pthread_port_paused, 0) < 0)
- return -1;
- if (su_msg_send(m) < 0)
- return -1;
- if (pthread_mutex_lock(self->sup_runlock) < 0)
- return -1;
- return 0;
- }
- /** Resume a port.
- *
- * Give up an exclusive lock on port's private data.
- *
- * @retval 0 if successful (and clone is resumed)
- * @retval -1 upon an error
- */
- int su_pthread_port_resume(su_port_t *self)
- {
- assert(self && self->sup_paused);
- self->sup_paused = 0;
- if (pthread_cond_signal(self->sup_resume) < 0 ||
- pthread_mutex_unlock(self->sup_runlock) < 0)
- return -1;
- return 0;
- }
- #endif
|