[LUAU] pthreads signaling question

Jim Thompson jim at netgate.com
Fri Mar 11 00:26:55 PST 2005


On Mar 10, 2005, at 9:05 PM, Charles Lockhart wrote:

> Jim Thompson wrote:
>
>> pthread_cond_broadcast() will (attempt) to waken *ALL* threads 
>> waiting on that condition variable.   If there is more than one, 
>> they'll
>> race, and the first one that gets past the mutex 'wins'.   
>> pthread_cond_signal() will attempt to waken *one* thread (typically 
>> the first
>> thread waiting on the condition variable.)
>
> What do you mean 'wins'?  Maybe I don't understand correctly, but if I 
> have three threads waiting on a conditional, and I do a broadcast on 
> that conditional, they should ALL wake up, right?

Yes, but then two of them won't get the (outer) mutex, which 
essentially forced my former code to execute single-threaded.   The one 
that has or gets the mutex 'wins' (and gets to continue running).

Of course, before you said:

>> One of the threads sleeps on a conditional wait ( 
>> pthread_cond_wait(&my_cond, &my_mutex)), and when a different thread 
>> calls pthread_cond_broadcast(&my_cond), I kind of expect the first 
>> thread to wake it's shiny smiling face and do some work.  But it 
>> doesn't.  Shouldn't it?  It used to.

And I'll have to ask, "Are you 100% sure that the first thread is 
sleeping on the CV?"  The pthread_cond_broadcast() and 
pthread_cond_signal() functions have no effect if there are no threads 
currently blocked on the CV.

If all (worker?) threads are busy handling previous requests when a 
new one arrives, signaling the condition variable will do nothing 
(since all threads are busy doing other things, NOT waiting on the 
condition variable).  After the worker (?) threads finish handling 
their current requests, they come back to wait on the variable, which 
won't necessarily be signaled again (for example, if no new requests 
arrive).  Thus there is at least one request pending while all 
handling threads are blocked, waiting for a signal that will never 
arrive.

I can't read your code, but this is one interpretation of the problem 
you may be having, given what you've said.  It does contradict your 
assumption that the thread is, indeed, sleeping on the CV.

The typical response to this problem is to set an integer variable to 
denote the number of pending requests, and have each thread check the 
value of this variable (while holding the mutex) before waiting on the 
CV.  If the value is positive, some request is pending, and the thread 
should go and handle it instead of going to sleep on the CV. 
Additionally, a thread that handles a request should reduce the value 
of this variable by one, to keep the count correct.

I've attached a more complete example, though the code here is still 
imperfect.

>  I mean, I don't see anything to indicate that there's any particular 
> order, but they should all wake up.  right?  Wait, uh, yeah, that's 
> even what it says in the man page.  Whereas pthread_cond_signal wakes 
> up one thread, but if there are multiple threads waiting, there's no 
> provision for determining which one.

Typically you get the thread at the top of the list associated with 
that CV.   Now, please pardon me while I get pedantic.

The first version of the pthreads standard (IEEE POSIX 1003.1c standard 
(1995)) says pthread_cond_signal() will wake "one" thread, but it 
doesn't say which one, as you allow.

The "Single UNIX Specification" (IEEE Std 1003.1-2001, and the 2004 
edition) says "at least one", which would allow a conforming 
implementation to wake *all* threads waiting on a given CV, just like 
pthread_cond_broadcast().   To see why this was changed, read this 
little tidbit from the 2001 (and 2004) versions of 1003.1:

------
On a multi-processor, it may be impossible for an implementation of 
pthread_cond_signal() to avoid the unblocking of more than one thread 
blocked on a condition variable. For example, consider the following 
partial implementation of pthread_cond_wait() and 
pthread_cond_signal(), executed by two threads in the order given. One 
thread is trying to wait on the condition variable, another is 
concurrently executing pthread_cond_signal(), while a third thread is 
already waiting.

pthread_cond_wait(mutex, cond):
     value = cond->value; /* 1 */
     pthread_mutex_unlock(mutex); /* 2 */
     pthread_mutex_lock(cond->mutex); /* 10 */
     if (value == cond->value) { /* 11 */
         me->next_cond = cond->waiter;
         cond->waiter = me;
         pthread_mutex_unlock(cond->mutex);
         unable_to_run(me);
     } else
         pthread_mutex_unlock(cond->mutex); /* 12 */
     pthread_mutex_lock(mutex); /* 13 */


pthread_cond_signal(cond):
     pthread_mutex_lock(cond->mutex); /* 3 */
     cond->value++; /* 4 */
     if (cond->waiter) { /* 5 */
         sleeper = cond->waiter; /* 6 */
         cond->waiter = sleeper->next_cond; /* 7 */
         able_to_run(sleeper); /* 8 */
     }
     pthread_mutex_unlock(cond->mutex); /* 9 */


The effect is that more than one thread can return from its call to 
pthread_cond_wait() or pthread_cond_timedwait() as a result of one call 
to pthread_cond_signal(). This effect is called "spurious wakeup". Note 
that the situation is self-correcting in that the number of threads 
that are so awakened is finite; for example, the next thread to call 
pthread_cond_wait() after the sequence of events above blocks.

While this problem could be resolved, the loss of efficiency for a 
fringe condition that occurs only rarely is unacceptable, especially 
given that one has to check the predicate associated with a condition 
variable anyway. Correcting this problem would unnecessarily reduce the 
degree of concurrency in this basic building block for all higher-level 
synchronization operations.

An added benefit of allowing spurious wakeups is that applications are 
forced to code a predicate-testing-loop around the condition wait. This 
also makes the application tolerate superfluous condition broadcasts or 
signals on the same condition variable that may be coded in some other 
part of the application. The resulting applications are thus more 
robust. Therefore, IEEE Std 1003.1-2001 explicitly documents that 
spurious wakeups may occur.
--------

Heh.

>> Perhaps you're just running into side-effects of the scheduler.  It 
>> seems to work here (gentoo, 2.6.10 kernel).  See the code and output 
>> below.
>> (I'm not bragging on the code, it was quick-n-dirty.)
>
> Ah, man, you're code worked on my machine too.  So I must be doing 
> something screwy.  I guess I should be happy, because that means I can 
> fix the bug.  On the other hand, that means I have to fix the bug.

In my experience, fixing bugs in your code is more fun than finding 
workarounds for bugs in other people's code.

>> The Fedora Project officially ended support for Fedora Core 1 (FC1) 
>> on September 20th, 2004.  FC3 was released November 8, 2004.
>> Maybe you should run FC3 (which is current) .vs a release that is now 
>> known as "Fedora Legacy".  (http://fedora.redhat.com/)
>
> Yeah, that would be nice.  But some of the hardware we have in "the 
> machine" currently only has drivers for the 2.4 kernel, and it's kind 
> of late in the game to start porting and testing, and I am way too 
> busy (and LAZY).  Probably on the next instrument.
>
> Jim, you The Man, thank you very much for your time and feedback.

No problem.  Here's the more complete code.  (The previous thing was a 
hack.)  This could actually be used to do something, though there are 
still changes that would need to be made if this were part of a 
long-running program.  (I wouldn't accept an exit on an out-of-memory 
condition if I were in the code review.   Still, this is a mail list, 
and it's unlikely that I'll be working for anyone soon, so perfection 
isn't required.)

Note that the scheduling behavior is still quite different if you use 
pthread_cond_signal() vs. pthread_cond_broadcast().

/home/jim> cat thread-pool-server.c
#define _GNU_SOURCE

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>     /* nanosleep(), struct timespec */
#include <unistd.h>   /* sleep() */

/* number of threads used to service requests */
#define NUM_HANDLER_THREADS 3

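/* global mutex for our program. note that we use a recursive mutex,
  * since a handler thread locks it a second time inside get_request().
  * PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP is a GNU extension, hence
  * the _GNU_SOURCE define above.
  */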

pthread_mutex_t request_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/* global condition variable for our program.*/
pthread_cond_t  got_request   = PTHREAD_COND_INITIALIZER;

int num_requests = 0;

/* a single request. */
struct request {
     struct request* next;   /* pointer to next request, NULL if none */
     int number;
};

struct request* requests = NULL;     /* head of linked list of requests */
struct request* last_request = NULL; /* pointer to last request */

/*
  * add a request to the requests list
  *
  * creates a request structure, adds to the list, and
  * increases number of pending requests by one.
  */
void
add_request(int request_num, pthread_mutex_t* p_mutex,
            pthread_cond_t* p_cond_var)
{
     int rc;                         /* return code of pthreads functions, UNCHECKED */
     struct request* a_request;      /* pointer to newly added request */

     /* create structure with new request */
     a_request = (struct request*)malloc(sizeof(struct request));
     if (!a_request) { /* malloc failed?? */
         fprintf(stderr, "add_request: out of memory\n");
         exit(1);
     }
     a_request->number = request_num;
     a_request->next = NULL;

     /* lock the mutex, to assure exclusive access to the list */
     rc = pthread_mutex_lock(p_mutex);

     /* add new request to the end of the list, updating list pointers
      * as required */
     if (num_requests == 0) { /* special case - list is empty */
         requests = a_request;
         last_request = a_request;
     }  else {
         last_request->next = a_request;
         last_request = a_request;
     }

     /* increase total number of pending requests by one. */
     num_requests++;

#ifdef DEBUG
     printf("add_request: added request with id '%d'\n",
            a_request->number);
     fflush(stdout);
#endif /* DEBUG */

     /* unlock mutex */
     rc = pthread_mutex_unlock(p_mutex);

     /* signal the condition variable - there's a new request to handle */
#ifdef COND_SIGNAL
     rc = pthread_cond_signal(p_cond_var);
#else
     rc = pthread_cond_broadcast(p_cond_var);
#endif
}

/*
  * gets the first pending request from the requests list, removing it
  * from the list.
  * the returned request needs to be freed by the caller.
  */
struct request*
get_request(pthread_mutex_t* p_mutex)
{
     int rc;                         /* return code of pthreads functions, UNCHECKED */
     struct request* a_request;      /* pointer to request */

     /* lock the mutex, to assure exclusive access to the list */
     rc = pthread_mutex_lock(p_mutex);

     if (num_requests > 0) {
         a_request = requests;
         requests = a_request->next;
         if (requests == NULL) { /* this was the last request on the list */
             last_request = NULL;
         }
         /* decrease the total number of pending requests */
         num_requests--;
     }  else { /* requests list is empty */
         a_request = NULL;
     }

     /* unlock mutex */
     rc = pthread_mutex_unlock(p_mutex);

     /* return the request to the caller. */
     return a_request;
}

void
handle_request(struct request* a_request, int thread_id)
{
     if (a_request) {
         printf("Thread '%d' handled request '%d'\n", thread_id,
                a_request->number);
         fflush(stdout);
     }
}

/*
  * infinite loop of request handling
  */
void*
handle_requests_loop(void* data)
{
     int rc;                         /* return code UNCHECKED */
     struct request* a_request;      /* pointer to a request */
     int thread_id = *((int*)data);  /* thread id */

#ifdef DEBUG
     printf("Starting thread '%d'\n", thread_id);
     fflush(stdout);
#endif /* DEBUG */

     /* access the requests list exclusively */
     rc = pthread_mutex_lock(&request_mutex);

#ifdef DEBUG
     printf("thread '%d' after pthread_mutex_lock\n", thread_id);
     fflush(stdout);
#endif /* DEBUG */

     while (1) {
#ifdef DEBUG
         printf("thread '%d', num_requests =  %d\n", thread_id,
                num_requests);
         fflush(stdout);
#endif /* DEBUG */
         if (num_requests > 0) { /* a request is pending */
             a_request = get_request(&request_mutex);
             if (a_request) { /* got a request - handle and free it */
                 /* unlock mutex - so other threads are able to handle
                  * other requests waiting in the queue in parallel
                  */
                 rc = pthread_mutex_unlock(&request_mutex);
                 handle_request(a_request, thread_id);
                 free(a_request);
                 /* and lock the mutex again */
                 rc = pthread_mutex_lock(&request_mutex);
             }
         } else {
             /* wait for a request to arrive. note the mutex will be
              * unlocked here, thus allowing other threads access to
              * requests list
              */
#ifdef DEBUG
             printf("thread '%d' before pthread_cond_wait\n", thread_id);
             fflush(stdout);
#endif /* DEBUG */
             rc = pthread_cond_wait(&got_request, &request_mutex);
             /* after we return from pthread_cond_wait, the mutex
              * is locked again, so we don't need to lock it ourselves
              */
#ifdef DEBUG
             printf("thread '%d' after pthread_cond_wait\n", thread_id);
             fflush(stdout);
#endif /* DEBUG */
         }
     }
}

int
main(int argc, char* argv[])
{
     int i;                                       /* loop control */
     int thr_id[NUM_HANDLER_THREADS];             /* thread IDs */
     pthread_t  p_threads[NUM_HANDLER_THREADS];   /* thread structures */
     struct timespec delay;                       /* used for wasting time */

     /* create the request-handling threads */
     for (i=0; i<NUM_HANDLER_THREADS; i++) {
         /* this should actually store the value returned from
          * pthread_create() for a future pthread_join() */
         thr_id[i] = i;
         pthread_create(&p_threads[i], NULL, handle_requests_loop,
                        (void*)&thr_id[i]);
     }

     /* generate requests */
     for (i=0; i<600; i++) {
         add_request(i, &request_mutex, &got_request);
         /* pause execution for a little bit, to allow
          * other threads to run and handle some requests.
          */
         if (rand() > 3*(RAND_MAX/4)) { /* approx 25% of the time */
             delay.tv_sec = 0;
             delay.tv_nsec = 10;
             nanosleep(&delay, NULL);
         }
     }

     /* now wait till there are no more requests to process */
     sleep(5);

     printf("Check ya latter, bra\n");

     return 0;
}



