(root)/
glibc-2.38/
nptl/
sem_waitcommon.c
       1  /* sem_waitcommon -- wait on a semaphore, shared code.
       2     Copyright (C) 2003-2023 Free Software Foundation, Inc.
       3     This file is part of the GNU C Library.
       4  
       5     The GNU C Library is free software; you can redistribute it and/or
       6     modify it under the terms of the GNU Lesser General Public
       7     License as published by the Free Software Foundation; either
       8     version 2.1 of the License, or (at your option) any later version.
       9  
      10     The GNU C Library is distributed in the hope that it will be useful,
      11     but WITHOUT ANY WARRANTY; without even the implied warranty of
      12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
      13     Lesser General Public License for more details.
      14  
      15     You should have received a copy of the GNU Lesser General Public
      16     License along with the GNU C Library; if not, see
      17     <https://www.gnu.org/licenses/>.  */
      18  
      19  #include <kernel-features.h>
      20  #include <errno.h>
      21  #include <sysdep.h>
      22  #include <futex-internal.h>
      23  #include <internaltypes.h>
      24  #include <semaphore.h>
      25  #include <sys/time.h>
      26  
      27  #include <pthreadP.h>
      28  #include <shlib-compat.h>
      29  #include <atomic.h>
      30  
      31  
      32  /* The semaphore provides two main operations: sem_post adds a token to the
      33     semaphore; sem_wait grabs a token from the semaphore, potentially waiting
      34     until there is a token available.  A sem_wait needs to synchronize with
      35     the sem_post that provided the token, so that whatever led to the sem_post
      36     happens before the code after sem_wait.
      37  
      38     Conceptually, available tokens can simply be counted; let's call that the
      39     value of the semaphore.  However, we also want to know whether there might
      40     be a sem_wait that is blocked on the value because it was zero (using a
      41     futex with the value being the futex variable); if there is no blocked
      42     sem_wait, sem_post does not need to execute a futex_wake call.  Therefore,
      43     we also need to count the number of potentially blocked sem_wait calls
      44     (which we call nwaiters).
      45  
      46     What makes this tricky is that POSIX requires that a semaphore can be
      47     destroyed as soon as the last remaining sem_wait has returned, and no
      48     other sem_wait or sem_post calls are executing concurrently.  However, the
      49     sem_post call whose token was consumed by the last sem_wait is considered
      50     to have finished once it provided the token to the sem_wait.
      51     Thus, sem_post must not access the semaphore struct anymore after it has
      52     made a token available; IOW, it needs to be able to atomically provide
      53     a token and check whether any blocked sem_wait calls might exist.
      54  
      55     This is straightforward to do if the architecture provides 64b atomics
      56     because we can just put both the value and nwaiters into one variable that
      57     we access atomically: This is the data field, the value is in the
      58     least-significant 32 bits, and nwaiters in the other bits.  When sem_post
      59     makes a value available, it can atomically check nwaiters.
      60  
      61     If we have only 32b atomics available, we cannot put both nwaiters and
      62     value into one 32b value because then we might have too few bits for both
      63     of those counters.  Therefore, we need to use two distinct fields.
      64  
      65     To allow sem_post to atomically make a token available and check for
      66     blocked sem_wait calls, we use one bit in value to indicate whether
      67     nwaiters is nonzero.  That allows sem_post to use basically the same
      68     algorithm as with 64b atomics, but requires sem_wait to update the bit; it
      69     can't do this atomically with another access to nwaiters, but it can compute
      70     a conservative value for the bit because it's benign if the bit is set
      71     even if nwaiters is zero (all we get is an unnecessary futex wake call by
      72     sem_post).
      73     Specifically, sem_wait will unset the bit speculatively if it believes that
      74     there is no other concurrently executing sem_wait.  If it misspeculated,
      75     it will have to clean up by waking any other sem_wait call (i.e., what
      76     sem_post would do otherwise).  This does not conflict with the destruction
      77     requirement because the semaphore must not be destructed while any sem_wait
      78     is still executing.  */
      79  
      80  #if !__HAVE_64B_ATOMICS
      81  static void
      82  __sem_wait_32_finish (struct new_sem *sem);
      83  #endif
      84  
      85  static void
      86  __sem_wait_cleanup (void *arg)
      87  {
      88    struct new_sem *sem = (struct new_sem *) arg;
      89  
      90  #if __HAVE_64B_ATOMICS
      91    /* Stop being registered as a waiter.  See below for MO.  */
      92    atomic_fetch_add_relaxed (&sem->data, -((uint64_t) 1 << SEM_NWAITERS_SHIFT));
      93  #else
      94    __sem_wait_32_finish (sem);
      95  #endif
      96  }
      97  
      98  /* Wait until at least one token is available, possibly with a timeout.
      99     This is in a separate function in order to make sure gcc
     100     puts the call site into an exception region, and thus the
     101     cleanups get properly run.  TODO still necessary?  Other futex_wait
     102     users don't seem to need it.  */
     103  static int
     104  __attribute__ ((noinline))
     105  do_futex_wait (struct new_sem *sem, clockid_t clockid,
     106  	       const struct __timespec64 *abstime)
     107  {
     108    int err;
     109  
     110  #if __HAVE_64B_ATOMICS
     111    err = __futex_abstimed_wait_cancelable64 (
     112        (unsigned int *) &sem->data + SEM_VALUE_OFFSET, 0,
     113        clockid, abstime,
     114        sem->private);
     115  #else
     116    err = __futex_abstimed_wait_cancelable64 (&sem->value, SEM_NWAITERS_MASK,
     117  					    clockid, abstime, sem->private);
     118  #endif
     119  
     120    return err;
     121  }
     122  
     123  /* Fast path: Try to grab a token without blocking.  */
     124  static int
     125  __new_sem_wait_fast (struct new_sem *sem, int definitive_result)
     126  {
     127    /* We need acquire MO if we actually grab a token, so that this
     128       synchronizes with all token providers (i.e., the RMW operation we read
     129       from or all those before it in modification order; also see sem_post).
     130       We do not need to guarantee any ordering if we observed that there is
     131       no token (POSIX leaves it unspecified whether functions that fail
     132       synchronize memory); thus, relaxed MO is sufficient for the initial load
     133       and the failure path of the CAS.  If the weak CAS fails and we need a
     134       definitive result, retry.  */
     135  #if __HAVE_64B_ATOMICS
     136    uint64_t d = atomic_load_relaxed (&sem->data);
     137    do
     138      {
     139        if ((d & SEM_VALUE_MASK) == 0)
     140  	break;
     141        if (atomic_compare_exchange_weak_acquire (&sem->data, &d, d - 1))
     142  	return 0;
     143      }
     144    while (definitive_result);
     145    return -1;
     146  #else
     147    unsigned int v = atomic_load_relaxed (&sem->value);
     148    do
     149      {
     150        if ((v >> SEM_VALUE_SHIFT) == 0)
     151  	break;
     152        if (atomic_compare_exchange_weak_acquire (&sem->value,
     153  	  &v, v - (1 << SEM_VALUE_SHIFT)))
     154  	return 0;
     155      }
     156    while (definitive_result);
     157    return -1;
     158  #endif
     159  }
     160  
     161  /* Slow path that blocks.  */
     162  static int
     163  __attribute__ ((noinline))
     164  __new_sem_wait_slow64 (struct new_sem *sem, clockid_t clockid,
     165  		       const struct __timespec64 *abstime)
     166  {
     167    int err = 0;
     168  
     169  #if __HAVE_64B_ATOMICS
     170    /* Add a waiter.  Relaxed MO is sufficient because we can rely on the
     171       ordering provided by the RMW operations we use.  */
     172    uint64_t d = atomic_fetch_add_relaxed (&sem->data,
     173        (uint64_t) 1 << SEM_NWAITERS_SHIFT);
     174  
     175    pthread_cleanup_push (__sem_wait_cleanup, sem);
     176  
     177    /* Wait for a token to be available.  Retry until we can grab one.  */
     178    for (;;)
     179      {
     180        /* If there is no token available, sleep until there is.  */
     181        if ((d & SEM_VALUE_MASK) == 0)
     182  	{
     183  	  err = do_futex_wait (sem, clockid, abstime);
     184  	  /* A futex return value of 0 or EAGAIN is due to a real or spurious
     185  	     wake-up, or due to a change in the number of tokens.  We retry in
     186  	     these cases.
     187  	     If we timed out, forward this to the caller.
     188  	     EINTR is returned if we are interrupted by a signal; we
     189  	     forward this to the caller.  (See futex_wait and related
     190  	     documentation.  Before Linux 2.6.22, EINTR was also returned on
     191  	     spurious wake-ups; we only support more recent Linux versions,
     192  	     so do not need to consider this here.)  */
     193  	  if (err == ETIMEDOUT || err == EINTR || err == EOVERFLOW)
     194  	    {
     195  	      __set_errno (err);
     196  	      err = -1;
     197  	      /* Stop being registered as a waiter.  */
     198  	      atomic_fetch_add_relaxed (&sem->data,
     199  		  -((uint64_t) 1 << SEM_NWAITERS_SHIFT));
     200  	      break;
     201  	    }
     202  	  /* Relaxed MO is sufficient; see below.  */
     203  	  d = atomic_load_relaxed (&sem->data);
     204  	}
     205        else
     206  	{
     207  	  /* Try to grab both a token and stop being a waiter.  We need
     208  	     acquire MO so this synchronizes with all token providers (i.e.,
     209  	     the RMW operation we read from or all those before it in
     210  	     modification order; also see sem_post).  On the failure path,
     211  	     relaxed MO is sufficient because we only eventually need the
     212  	     up-to-date value; the futex_wait or the CAS perform the real
     213  	     work.  */
     214  	  if (atomic_compare_exchange_weak_acquire (&sem->data,
     215  	      &d, d - 1 - ((uint64_t) 1 << SEM_NWAITERS_SHIFT)))
     216  	    {
     217  	      err = 0;
     218  	      break;
     219  	    }
     220  	}
     221      }
     222  
     223    pthread_cleanup_pop (0);
     224  #else
     225    /* The main difference to the 64b-atomics implementation is that we need to
     226       access value and nwaiters in separate steps, and that the nwaiters bit
     227       in the value can temporarily not be set even if nwaiters is nonzero.
     228       We work around incorrectly unsetting the nwaiters bit by letting sem_wait
     229       set the bit again and waking the number of waiters that could grab a
     230       token.  There are two additional properties we need to ensure:
     231       (1) We make sure that whenever unsetting the bit, we see the increment of
     232       nwaiters by the other thread that set the bit.  IOW, we will notice if
     233       we make a mistake.
     234       (2) When setting the nwaiters bit, we make sure that we see the unsetting
     235       of the bit by another waiter that happened before us.  This avoids having
     236       to blindly set the bit whenever we need to block on it.  We set/unset
     237       the bit while having incremented nwaiters (i.e., are a registered
     238       waiter), and the problematic case only happens when one waiter indeed
     239       followed another (i.e., nwaiters was never larger than 1); thus, this
     240       works similarly as with a critical section using nwaiters (see the MOs
     241       and related comments below).
     242  
     243       An alternative approach would be to unset the bit after decrementing
     244       nwaiters; however, that would result in needing Dekker-like
     245       synchronization and thus full memory barriers.  We also would not be able
     246       to prevent misspeculation, so this alternative scheme does not seem
     247       beneficial.  */
     248    unsigned int v;
     249  
     250    /* Add a waiter.  We need acquire MO so this synchronizes with the release
     251       MO we use when decrementing nwaiters below; it ensures that if another
     252       waiter unset the bit before us, we see that and set it again.  Also see
     253       property (2) above.  */
     254    atomic_fetch_add_acquire (&sem->nwaiters, 1);
     255  
     256    pthread_cleanup_push (__sem_wait_cleanup, sem);
     257  
     258    /* Wait for a token to be available.  Retry until we can grab one.  */
     259    /* We do not need any ordering wrt. to this load's reads-from, so relaxed
     260       MO is sufficient.  The acquire MO above ensures that in the problematic
     261       case, we do see the unsetting of the bit by another waiter.  */
     262    v = atomic_load_relaxed (&sem->value);
     263    do
     264      {
     265        do
     266  	{
     267  	  /* We are about to block, so make sure that the nwaiters bit is
     268  	     set.  We need release MO on the CAS to ensure that when another
     269  	     waiter unsets the nwaiters bit, it will also observe that we
     270  	     incremented nwaiters in the meantime (also see the unsetting of
     271  	     the bit below).  Relaxed MO on CAS failure is sufficient (see
     272  	     above).  */
     273  	  do
     274  	    {
     275  	      if ((v & SEM_NWAITERS_MASK) != 0)
     276  		break;
     277  	    }
     278  	  while (!atomic_compare_exchange_weak_release (&sem->value,
     279  	      &v, v | SEM_NWAITERS_MASK));
     280  	  /* If there is no token, wait.  */
     281  	  if ((v >> SEM_VALUE_SHIFT) == 0)
     282  	    {
     283  	      /* See __HAVE_64B_ATOMICS variant.  */
     284  	      err = do_futex_wait (sem, clockid, abstime);
     285  	      if (err == ETIMEDOUT || err == EINTR)
     286  		{
     287  		  __set_errno (err);
     288  		  err = -1;
     289  		  goto error;
     290  		}
     291  	      err = 0;
     292  	      /* We blocked, so there might be a token now.  Relaxed MO is
     293  		 sufficient (see above).  */
     294  	      v = atomic_load_relaxed (&sem->value);
     295  	    }
     296  	}
     297        /* If there is no token, we must not try to grab one.  */
     298        while ((v >> SEM_VALUE_SHIFT) == 0);
     299      }
     300    /* Try to grab a token.  We need acquire MO so this synchronizes with
     301       all token providers (i.e., the RMW operation we read from or all those
     302       before it in modification order; also see sem_post).  */
     303    while (!atomic_compare_exchange_weak_acquire (&sem->value,
     304        &v, v - (1 << SEM_VALUE_SHIFT)));
     305  
     306  error:
     307    pthread_cleanup_pop (0);
     308  
     309    __sem_wait_32_finish (sem);
     310  #endif
     311  
     312    return err;
     313  }
     314  
     315  /* Stop being a registered waiter (non-64b-atomics code only).  */
     316  #if !__HAVE_64B_ATOMICS
     317  static void
     318  __sem_wait_32_finish (struct new_sem *sem)
     319  {
     320    /* The nwaiters bit is still set, try to unset it now if this seems
     321       necessary.  We do this before decrementing nwaiters so that the unsetting
     322       is visible to other waiters entering after us.  Relaxed MO is sufficient
     323       because we are just speculating here; a stronger MO would not prevent
     324       misspeculation.  */
     325    unsigned int wguess = atomic_load_relaxed (&sem->nwaiters);
     326    if (wguess == 1)
     327      /* We might be the last waiter, so unset.  This needs acquire MO so that
     328         it synchronizes with the release MO when setting the bit above; if we
     329         overwrite someone else that set the bit, we'll read in the following
     330         decrement of nwaiters at least from that release sequence, so we'll
     331         see if the other waiter is still active or if another writer entered
     332         in the meantime (i.e., using the check below).  */
     333      atomic_fetch_and_acquire (&sem->value, ~SEM_NWAITERS_MASK);
     334  
     335    /* Now stop being a waiter, and see whether our guess was correct.
     336       This needs release MO so that it synchronizes with the acquire MO when
     337       a waiter increments nwaiters; this makes sure that newer writers see that
     338       we reset the waiters_present bit.  */
     339    unsigned int wfinal = atomic_fetch_add_release (&sem->nwaiters, -1);
     340    if (wfinal > 1 && wguess == 1)
     341      {
     342        /* We guessed wrong, and so need to clean up after the mistake and
     343           unblock any waiters that could have not been woken.  There is no
     344           additional ordering that we need to set up, so relaxed MO is
     345           sufficient.  */
     346        unsigned int v = atomic_fetch_or_relaxed (&sem->value,
     347  						SEM_NWAITERS_MASK);
     348        /* If there are available tokens, then wake as many waiters.  If there
     349           aren't any, then there is no need to wake anyone because there is
     350           none to grab for another waiter.  If tokens become available
     351           subsequently, then the respective sem_post calls will do the wake-up
     352           due to us having set the nwaiters bit again.  */
     353        v >>= SEM_VALUE_SHIFT;
     354        if (v > 0)
     355  	futex_wake (&sem->value, v, sem->private);
     356      }
     357  }
     358  #endif