SphinxBase 0.6
|
00001 /* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */ 00002 /* ==================================================================== 00003 * Copyright (c) 2008 Carnegie Mellon University. All rights 00004 * reserved. 00005 * 00006 * Redistribution and use in source and binary forms, with or without 00007 * modification, are permitted provided that the following conditions 00008 * are met: 00009 * 00010 * 1. Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * 00013 * 2. Redistributions in binary form must reproduce the above copyright 00014 * notice, this list of conditions and the following disclaimer in 00015 * the documentation and/or other materials provided with the 00016 * distribution. 00017 * 00018 * This work was supported in part by funding from the Defense Advanced 00019 * Research Projects Agency and the National Science Foundation of the 00020 * United States of America, and the CMU Sphinx Speech Consortium. 00021 * 00022 * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND 00023 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 00024 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 00025 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY 00026 * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 00027 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 00028 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 00029 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 00030 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00031 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 00032 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00033 * 00034 * ==================================================================== 00035 * 00036 */ 00037 00044 #include <string.h> 00045 00046 #include "sphinxbase/sbthread.h" 00047 #include "sphinxbase/ckd_alloc.h" 00048 #include "sphinxbase/err.h" 00049 00050 /* 00051 * Platform-specific parts: threads, mutexes, and signals. 00052 */ 00053 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__) 00054 #define _WIN32_WINNT 0x0400 00055 #include <windows.h> 00056 00057 struct sbthread_s { 00058 cmd_ln_t *config; 00059 sbmsgq_t *msgq; 00060 sbthread_main func; 00061 void *arg; 00062 HANDLE th; 00063 DWORD tid; 00064 }; 00065 00066 struct sbmsgq_s { 00067 /* Ringbuffer for passing messages. */ 00068 char *data; 00069 size_t depth; 00070 size_t out; 00071 size_t nbytes; 00072 00073 /* Current message is stored here. */ 00074 char *msg; 00075 size_t msglen; 00076 CRITICAL_SECTION mtx; 00077 HANDLE evt; 00078 }; 00079 00080 struct sbevent_s { 00081 HANDLE evt; 00082 }; 00083 00084 struct sbmtx_s { 00085 CRITICAL_SECTION mtx; 00086 }; 00087 00088 DWORD WINAPI 00089 sbthread_internal_main(LPVOID arg) 00090 { 00091 sbthread_t *th = (sbthread_t *)arg; 00092 int rv; 00093 00094 rv = (*th->func)(th); 00095 return (DWORD)rv; 00096 } 00097 00098 sbthread_t * 00099 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg) 00100 { 00101 sbthread_t *th; 00102 00103 th = ckd_calloc(1, sizeof(*th)); 00104 th->config = config; 00105 th->func = func; 00106 th->arg = arg; 00107 th->msgq = sbmsgq_init(256); 00108 th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid); 00109 if (th->th == NULL) { 00110 sbthread_free(th); 00111 return NULL; 00112 } 00113 return th; 00114 } 00115 00116 int 00117 sbthread_wait(sbthread_t *th) 00118 { 00119 DWORD rv, exit; 00120 00121 /* It has already been joined. */ 00122 if (th->th == NULL) 00123 return -1; 00124 00125 rv = WaitForSingleObject(th->th, INFINITE); 00126 if (rv == WAIT_FAILED) { 00127 E_ERROR("Failed to join thread: WAIT_FAILED\n"); 00128 return -1; 00129 } 00130 GetExitCodeThread(th->th, &exit); 00131 CloseHandle(th->th); 00132 th->th = NULL; 00133 return (int)exit; 00134 } 00135 00136 static DWORD 00137 cond_timed_wait(HANDLE cond, int sec, int nsec) 00138 { 00139 DWORD rv; 00140 if (sec == -1) { 00141 rv = WaitForSingleObject(cond, INFINITE); 00142 } 00143 else { 00144 DWORD ms; 00145 00146 ms = sec * 1000 + nsec / (1000*1000); 00147 rv = WaitForSingleObject(cond, ms); 00148 } 00149 return rv; 00150 } 00151 00152 /* Silvio Moioli: updated to use Unicode */ 00153 sbevent_t * 00154 sbevent_init(void) 00155 { 00156 sbevent_t *evt; 00157 00158 evt = ckd_calloc(1, sizeof(*evt)); 00159 evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL); 00160 if (evt->evt == NULL) { 00161 ckd_free(evt); 00162 return NULL; 00163 } 00164 return evt; 00165 } 00166 00167 void 00168 sbevent_free(sbevent_t *evt) 00169 { 00170 CloseHandle(evt->evt); 00171 ckd_free(evt); 00172 } 00173 00174 int 00175 sbevent_signal(sbevent_t *evt) 00176 { 00177 return SetEvent(evt->evt) ? 0 : -1; 00178 } 00179 00180 int 00181 sbevent_wait(sbevent_t *evt, int sec, int nsec) 00182 { 00183 DWORD rv; 00184 00185 rv = cond_timed_wait(evt->evt, sec, nsec); 00186 return rv; 00187 } 00188 00189 sbmtx_t * 00190 sbmtx_init(void) 00191 { 00192 sbmtx_t *mtx; 00193 00194 mtx = ckd_calloc(1, sizeof(*mtx)); 00195 InitializeCriticalSection(&mtx->mtx); 00196 return mtx; 00197 } 00198 00199 int 00200 sbmtx_trylock(sbmtx_t *mtx) 00201 { 00202 return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1; 00203 } 00204 00205 int 00206 sbmtx_lock(sbmtx_t *mtx) 00207 { 00208 EnterCriticalSection(&mtx->mtx); 00209 return 0; 00210 } 00211 00212 int 00213 sbmtx_unlock(sbmtx_t *mtx) 00214 { 00215 LeaveCriticalSection(&mtx->mtx); 00216 return 0; 00217 } 00218 00219 void 00220 sbmtx_free(sbmtx_t *mtx) 00221 { 00222 DeleteCriticalSection(&mtx->mtx); 00223 ckd_free(mtx); 00224 } 00225 00226 sbmsgq_t * 00227 sbmsgq_init(size_t depth) 00228 { 00229 sbmsgq_t *msgq; 00230 00231 msgq = ckd_calloc(1, sizeof(*msgq)); 00232 msgq->depth = depth; 00233 msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL); 00234 if (msgq->evt == NULL) { 00235 ckd_free(msgq); 00236 return NULL; 00237 } 00238 InitializeCriticalSection(&msgq->mtx); 00239 msgq->data = ckd_calloc(depth, 1); 00240 msgq->msg = ckd_calloc(depth, 1); 00241 return msgq; 00242 } 00243 00244 void 00245 sbmsgq_free(sbmsgq_t *msgq) 00246 { 00247 CloseHandle(msgq->evt); 00248 ckd_free(msgq->data); 00249 ckd_free(msgq->msg); 00250 ckd_free(msgq); 00251 } 00252 00253 int 00254 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data) 00255 { 00256 char const *cdata = (char const *)data; 00257 size_t in; 00258 00259 /* Don't allow things bigger than depth to be sent! */ 00260 if (len + sizeof(len) > q->depth) 00261 return -1; 00262 00263 if (q->nbytes + len + sizeof(len) > q->depth) 00264 WaitForSingleObject(q->evt, INFINITE); 00265 00266 /* Lock things while we manipulate the buffer (FIXME: this 00267 actually should have been atomic with the wait above ...) */ 00268 EnterCriticalSection(&q->mtx); 00269 in = (q->out + q->nbytes) % q->depth; 00270 /* First write the size of the message. */ 00271 if (in + sizeof(len) > q->depth) { 00272 /* Handle the annoying case where the size field gets wrapped around. */ 00273 size_t len1 = q->depth - in; 00274 memcpy(q->data + in, &len, len1); 00275 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1); 00276 q->nbytes += sizeof(len); 00277 in = sizeof(len) - len1; 00278 } 00279 else { 00280 memcpy(q->data + in, &len, sizeof(len)); 00281 q->nbytes += sizeof(len); 00282 in += sizeof(len); 00283 } 00284 00285 /* Now write the message body. */ 00286 if (in + len > q->depth) { 00287 /* Handle wraparound. */ 00288 size_t len1 = q->depth - in; 00289 memcpy(q->data + in, cdata, len1); 00290 q->nbytes += len1; 00291 cdata += len1; 00292 len -= len1; 00293 in = 0; 00294 } 00295 memcpy(q->data + in, cdata, len); 00296 q->nbytes += len; 00297 00298 /* Signal the condition variable. */ 00299 SetEvent(q->evt); 00300 /* Unlock. */ 00301 LeaveCriticalSection(&q->mtx); 00302 00303 return 0; 00304 } 00305 00306 void * 00307 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec) 00308 { 00309 char *outptr; 00310 size_t len; 00311 00312 /* Wait for data to be available. */ 00313 if (q->nbytes == 0) { 00314 if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED) 00315 /* Timed out or something... */ 00316 return NULL; 00317 } 00318 /* Lock to manipulate the queue (FIXME) */ 00319 EnterCriticalSection(&q->mtx); 00320 /* Get the message size. */ 00321 if (q->out + sizeof(q->msglen) > q->depth) { 00322 /* Handle annoying wraparound case. */ 00323 size_t len1 = q->depth - q->out; 00324 memcpy(&q->msglen, q->data + q->out, len1); 00325 memcpy(((char *)&q->msglen) + len1, q->data, 00326 sizeof(q->msglen) - len1); 00327 q->out = sizeof(q->msglen) - len1; 00328 } 00329 else { 00330 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen)); 00331 q->out += sizeof(q->msglen); 00332 } 00333 q->nbytes -= sizeof(q->msglen); 00334 /* Get the message body. */ 00335 outptr = q->msg; 00336 len = q->msglen; 00337 if (q->out + q->msglen > q->depth) { 00338 /* Handle wraparound. */ 00339 size_t len1 = q->depth - q->out; 00340 memcpy(outptr, q->data + q->out, len1); 00341 outptr += len1; 00342 len -= len1; 00343 q->nbytes -= len1; 00344 q->out = 0; 00345 } 00346 memcpy(outptr, q->data + q->out, len); 00347 q->nbytes -= len; 00348 q->out += len; 00349 00350 /* Signal the condition variable. */ 00351 SetEvent(q->evt); 00352 /* Unlock. */ 00353 LeaveCriticalSection(&q->mtx); 00354 if (out_len) 00355 *out_len = q->msglen; 00356 return q->msg; 00357 } 00358 00359 #else /* POSIX */ 00360 #include <pthread.h> 00361 #include <sys/time.h> 00362 00363 struct sbthread_s { 00364 cmd_ln_t *config; 00365 sbmsgq_t *msgq; 00366 sbthread_main func; 00367 void *arg; 00368 pthread_t th; 00369 }; 00370 00371 struct sbmsgq_s { 00372 /* Ringbuffer for passing messages. */ 00373 char *data; 00374 size_t depth; 00375 size_t out; 00376 size_t nbytes; 00377 00378 /* Current message is stored here. */ 00379 char *msg; 00380 size_t msglen; 00381 pthread_mutex_t mtx; 00382 pthread_cond_t cond; 00383 }; 00384 00385 struct sbevent_s { 00386 pthread_mutex_t mtx; 00387 pthread_cond_t cond; 00388 int signalled; 00389 }; 00390 00391 struct sbmtx_s { 00392 pthread_mutex_t mtx; 00393 }; 00394 00395 static void * 00396 sbthread_internal_main(void *arg) 00397 { 00398 sbthread_t *th = (sbthread_t *)arg; 00399 int rv; 00400 00401 rv = (*th->func)(th); 00402 return (void *)(long)rv; 00403 } 00404 00405 sbthread_t * 00406 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg) 00407 { 00408 sbthread_t *th; 00409 int rv; 00410 00411 th = ckd_calloc(1, sizeof(*th)); 00412 th->config = config; 00413 th->func = func; 00414 th->arg = arg; 00415 th->msgq = sbmsgq_init(1024); 00416 if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) { 00417 E_ERROR("Failed to create thread: %d\n", rv); 00418 sbthread_free(th); 00419 return NULL; 00420 } 00421 return th; 00422 } 00423 00424 int 00425 sbthread_wait(sbthread_t *th) 00426 { 00427 void *exit; 00428 int rv; 00429 00430 /* It has already been joined. */ 00431 if (th->th == (pthread_t)-1) 00432 return -1; 00433 00434 rv = pthread_join(th->th, &exit); 00435 if (rv != 0) { 00436 E_ERROR("Failed to join thread: %d\n", rv); 00437 return -1; 00438 } 00439 th->th = (pthread_t)-1; 00440 return (int)(long)exit; 00441 } 00442 00443 sbmsgq_t * 00444 sbmsgq_init(size_t depth) 00445 { 00446 sbmsgq_t *msgq; 00447 00448 msgq = ckd_calloc(1, sizeof(*msgq)); 00449 msgq->depth = depth; 00450 if (pthread_cond_init(&msgq->cond, NULL) != 0) { 00451 ckd_free(msgq); 00452 return NULL; 00453 } 00454 if (pthread_mutex_init(&msgq->mtx, NULL) != 0) { 00455 pthread_cond_destroy(&msgq->cond); 00456 ckd_free(msgq); 00457 return NULL; 00458 } 00459 msgq->data = ckd_calloc(depth, 1); 00460 msgq->msg = ckd_calloc(depth, 1); 00461 return msgq; 00462 } 00463 00464 void 00465 sbmsgq_free(sbmsgq_t *msgq) 00466 { 00467 pthread_mutex_destroy(&msgq->mtx); 00468 pthread_cond_destroy(&msgq->cond); 00469 ckd_free(msgq->data); 00470 ckd_free(msgq->msg); 00471 ckd_free(msgq); 00472 } 00473 00474 int 00475 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data) 00476 { 00477 size_t in; 00478 00479 /* Don't allow things bigger than depth to be sent! */ 00480 if (len + sizeof(len) > q->depth) 00481 return -1; 00482 00483 /* Lock the condition variable while we manipulate the buffer. */ 00484 pthread_mutex_lock(&q->mtx); 00485 if (q->nbytes + len + sizeof(len) > q->depth) { 00486 /* Unlock and wait for space to be available. */ 00487 if (pthread_cond_wait(&q->cond, &q->mtx) != 0) { 00488 /* Timed out, don't send anything. */ 00489 pthread_mutex_unlock(&q->mtx); 00490 return -1; 00491 } 00492 /* Condition is now locked again. */ 00493 } 00494 in = (q->out + q->nbytes) % q->depth; 00495 00496 /* First write the size of the message. */ 00497 if (in + sizeof(len) > q->depth) { 00498 /* Handle the annoying case where the size field gets wrapped around. */ 00499 size_t len1 = q->depth - in; 00500 memcpy(q->data + in, &len, len1); 00501 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1); 00502 q->nbytes += sizeof(len); 00503 in = sizeof(len) - len1; 00504 } 00505 else { 00506 memcpy(q->data + in, &len, sizeof(len)); 00507 q->nbytes += sizeof(len); 00508 in += sizeof(len); 00509 } 00510 00511 /* Now write the message body. */ 00512 if (in + len > q->depth) { 00513 /* Handle wraparound. */ 00514 size_t len1 = q->depth - in; 00515 memcpy(q->data + in, data, len1); 00516 q->nbytes += len1; 00517 data = (char const *)data + len1; 00518 len -= len1; 00519 in = 0; 00520 } 00521 memcpy(q->data + in, data, len); 00522 q->nbytes += len; 00523 00524 /* Signal the condition variable. */ 00525 pthread_cond_signal(&q->cond); 00526 /* Unlock it, we have nothing else to do. */ 00527 pthread_mutex_unlock(&q->mtx); 00528 return 0; 00529 } 00530 00531 static int 00532 cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec) 00533 { 00534 int rv; 00535 if (sec == -1) { 00536 rv = pthread_cond_wait(cond, mtx); 00537 } 00538 else { 00539 struct timeval now; 00540 struct timespec end; 00541 00542 gettimeofday(&now, NULL); 00543 end.tv_sec = now.tv_sec + sec; 00544 end.tv_nsec = now.tv_usec * 1000 + nsec; 00545 if (end.tv_nsec > (1000*1000*1000)) { 00546 sec += end.tv_nsec / (1000*1000*1000); 00547 end.tv_nsec = end.tv_nsec % (1000*1000*1000); 00548 } 00549 rv = pthread_cond_timedwait(cond, mtx, &end); 00550 } 00551 return rv; 00552 } 00553 00554 void * 00555 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec) 00556 { 00557 char *outptr; 00558 size_t len; 00559 00560 /* Lock the condition variable while we manipulate nmsg. */ 00561 pthread_mutex_lock(&q->mtx); 00562 if (q->nbytes == 0) { 00563 /* Unlock the condition variable and wait for a signal. */ 00564 if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) { 00565 /* Timed out or something... */ 00566 pthread_mutex_unlock(&q->mtx); 00567 return NULL; 00568 } 00569 /* Condition variable is now locked again. */ 00570 } 00571 /* Get the message size. */ 00572 if (q->out + sizeof(q->msglen) > q->depth) { 00573 /* Handle annoying wraparound case. */ 00574 size_t len1 = q->depth - q->out; 00575 memcpy(&q->msglen, q->data + q->out, len1); 00576 memcpy(((char *)&q->msglen) + len1, q->data, 00577 sizeof(q->msglen) - len1); 00578 q->out = sizeof(q->msglen) - len1; 00579 } 00580 else { 00581 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen)); 00582 q->out += sizeof(q->msglen); 00583 } 00584 q->nbytes -= sizeof(q->msglen); 00585 /* Get the message body. */ 00586 outptr = q->msg; 00587 len = q->msglen; 00588 if (q->out + q->msglen > q->depth) { 00589 /* Handle wraparound. */ 00590 size_t len1 = q->depth - q->out; 00591 memcpy(outptr, q->data + q->out, len1); 00592 outptr += len1; 00593 len -= len1; 00594 q->nbytes -= len1; 00595 q->out = 0; 00596 } 00597 memcpy(outptr, q->data + q->out, len); 00598 q->nbytes -= len; 00599 q->out += len; 00600 00601 /* Signal the condition variable. */ 00602 pthread_cond_signal(&q->cond); 00603 /* Unlock the condition variable, we are done. */ 00604 pthread_mutex_unlock(&q->mtx); 00605 if (out_len) 00606 *out_len = q->msglen; 00607 return q->msg; 00608 } 00609 00610 sbevent_t * 00611 sbevent_init(void) 00612 { 00613 sbevent_t *evt; 00614 int rv; 00615 00616 evt = ckd_calloc(1, sizeof(*evt)); 00617 if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) { 00618 E_ERROR("Failed to initialize mutex: %d\n", rv); 00619 ckd_free(evt); 00620 return NULL; 00621 } 00622 if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) { 00623 E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv); 00624 pthread_mutex_destroy(&evt->mtx); 00625 ckd_free(evt); 00626 return NULL; 00627 } 00628 return evt; 00629 } 00630 00631 void 00632 sbevent_free(sbevent_t *evt) 00633 { 00634 pthread_mutex_destroy(&evt->mtx); 00635 pthread_cond_destroy(&evt->cond); 00636 ckd_free(evt); 00637 } 00638 00639 int 00640 sbevent_signal(sbevent_t *evt) 00641 { 00642 int rv; 00643 00644 pthread_mutex_lock(&evt->mtx); 00645 evt->signalled = TRUE; 00646 rv = pthread_cond_signal(&evt->cond); 00647 pthread_mutex_unlock(&evt->mtx); 00648 return rv; 00649 } 00650 00651 int 00652 sbevent_wait(sbevent_t *evt, int sec, int nsec) 00653 { 00654 int rv = 0; 00655 00656 /* Lock the mutex before we check its signalled state. */ 00657 pthread_mutex_lock(&evt->mtx); 00658 /* If it's not signalled, then wait until it is. */ 00659 if (!evt->signalled) 00660 rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec); 00661 /* Set its state to unsignalled if we were successful. */ 00662 if (rv == 0) 00663 evt->signalled = FALSE; 00664 /* And unlock its mutex. */ 00665 pthread_mutex_unlock(&evt->mtx); 00666 00667 return rv; 00668 } 00669 00670 sbmtx_t * 00671 sbmtx_init(void) 00672 { 00673 sbmtx_t *mtx; 00674 00675 mtx = ckd_calloc(1, sizeof(*mtx)); 00676 if (pthread_mutex_init(&mtx->mtx, NULL) != 0) { 00677 ckd_free(mtx); 00678 return NULL; 00679 } 00680 return mtx; 00681 } 00682 00683 int 00684 sbmtx_trylock(sbmtx_t *mtx) 00685 { 00686 return pthread_mutex_trylock(&mtx->mtx); 00687 } 00688 00689 int 00690 sbmtx_lock(sbmtx_t *mtx) 00691 { 00692 return pthread_mutex_lock(&mtx->mtx); 00693 } 00694 00695 int 00696 sbmtx_unlock(sbmtx_t *mtx) 00697 { 00698 return pthread_mutex_unlock(&mtx->mtx); 00699 } 00700 00701 void 00702 sbmtx_free(sbmtx_t *mtx) 00703 { 00704 pthread_mutex_destroy(&mtx->mtx); 00705 ckd_free(mtx); 00706 } 00707 #endif /* not WIN32 */ 00708 00709 cmd_ln_t * 00710 sbthread_config(sbthread_t *th) 00711 { 00712 return th->config; 00713 } 00714 00715 void * 00716 sbthread_arg(sbthread_t *th) 00717 { 00718 return th->arg; 00719 } 00720 00721 sbmsgq_t * 00722 sbthread_msgq(sbthread_t *th) 00723 { 00724 return th->msgq; 00725 } 00726 00727 int 00728 sbthread_send(sbthread_t *th, size_t len, void const *data) 00729 { 00730 return sbmsgq_send(th->msgq, len, data); 00731 } 00732 00733 void 00734 sbthread_free(sbthread_t *th) 00735 { 00736 sbthread_wait(th); 00737 sbmsgq_free(th->msgq); 00738 ckd_free(th); 00739 }