diff mbox

[1/2] Optimize generic spinlock code and use C11 like atomic macros.

Message ID 628f6311-239c-5eea-572c-c2acae6fcbee@linux.vnet.ibm.com
State Superseded
Headers show

Commit Message

Stefan Liebler Feb. 8, 2017, 2:49 p.m. UTC
On 12/19/2016 01:14 PM, Szabolcs Nagy wrote:
> On 16/12/16 16:31, Stefan Liebler wrote:
>> Passing a volatile pointer to the atomic macros can lead to extra stores
>> and loads to stack if such a macro creates a temporary variable by using
>> __typeof (*(mem)). Thus the patch passes a int pointer to the atomic macros.
>
> the resolution of
> http://www.open-std.org/jtc1/sc22/wg14/www/docs/dr_423.htm
> says that the type of a cast expression is unqualified, so
>
>   __typeof ((__typeof (*(mem)))*(mem)) tmp = *(mem);
>
> would have the right type.. seems to work since gcc-5
>
> https://godbolt.org/g/eS0X5b
>
> (looks beautiful.)
>

Thanks for this.

I've adjusted the needed atomic-macros in include/atomic.h and use the 
typeof-construct "__typeof ((__typeof (*(mem)))*(mem)) tmp;".
Now we can pass the volatile int pointers to the atomic-macros.

See the updated patch.

Okay to commit?

Bye.
Stefan

ChangeLog:

	* include/atomic.h:
	(__atomic_val_bysize): Cast type to omit volatile qualifier.
	(atomic_exchange_acq): Likewise.
	(atomic_load_relaxed): Likewise.
	* nptl/pthread_spin_init.c (pthread_spin_init):
	Use atomic_store_relaxed.
	* nptl/pthread_spin_lock.c (pthread_spin_lock):
	Use C11-like atomic macros.
	* nptl/pthread_spin_trylock.c
	(pthread_spin_trylock): Likewise.  Use an explicit test if lock
	is free, if new SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG macro
	is set to one.
	(SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG): New define.
	* nptl/pthread_spin_unlock.c (pthread_spin_unlock):
	Use atomic_store_release.

Comments

Torvald Riegel Feb. 13, 2017, 8:29 p.m. UTC | #1
Thanks for working on this.  Detailed comments below.

Generally, I think we need to keep working on optimizing this (this can
happen in follow-up patches).  For example, we should have
microbenchmarks for the exchange vs. CAS choice.

Also, some of the tuning knobs such as
SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG apply to atomics in general and
not just the spinlock.  I'd prefer if these where in a state in which we
could add them as property that's part of the atomics interface.

On Wed, 2017-02-08 at 15:49 +0100, Stefan Liebler wrote:
> diff --git a/include/atomic.h b/include/atomic.h
> index 7f32640..770db4a 100644
> --- a/include/atomic.h
> 
> +++ b/include/atomic.h
> 
> @@ -54,7 +54,7 @@
>     and following args.  */
>  #define __atomic_val_bysize(pre, post, mem, ...)			      \
>    ({									      \
> -    __typeof (*mem) __atg1_result;					      \
> 
> +    __typeof ((__typeof (*(mem))) *(mem)) __atg1_result;		      \
> 
>      if (sizeof (*mem) == 1)						      \
>        __atg1_result = pre##_8_##post (mem, __VA_ARGS__);		      \
>      else if (sizeof (*mem) == 2)					      \
> @@ -162,9 +162,9 @@
>  /* Store NEWVALUE in *MEM and return the old value.  */
>  #ifndef atomic_exchange_acq
>  # define atomic_exchange_acq(mem, newvalue) \
> -  ({ __typeof (*(mem)) __atg5_oldval;					      \
> 
> +  ({ __typeof ((__typeof (*(mem))) *(mem)) __atg5_oldval;		      \
> 
>       __typeof (mem) __atg5_memp = (mem);				      \
> -     __typeof (*(mem)) __atg5_value = (newvalue);			      \
> 
> +     __typeof ((__typeof (*(mem))) *(mem)) __atg5_value = (newvalue);	      \
> 
>  									      \
>       do									      \
>         __atg5_oldval = *__atg5_memp;					      \
> @@ -668,7 +668,7 @@ void __atomic_link_error (void);
>  
>  # ifndef atomic_load_relaxed
>  #  define atomic_load_relaxed(mem) \
> -   ({ __typeof (*(mem)) __atg100_val;					      \
> 
> +   ({ __typeof ((__typeof (*(mem))) *(mem)) __atg100_val;		      \
> 
>     __asm ("" : "=r" (__atg100_val) : "0" (*(mem)));			      \
>     __atg100_val; })
>  # endif

You could keep these changes, but it's a bit odd because you only apply
them for the functions you needed them for.  I think it would be better
to just remove the volatile qualification in the caller (ie, cast lock
to nonvolatile in pthread_spin_lock etc.

> diff --git a/nptl/pthread_spin_init.c b/nptl/pthread_spin_init.c
> index 01dec5e..bca3590 100644
> --- a/nptl/pthread_spin_init.c
> 
> +++ b/nptl/pthread_spin_init.c
> 
> @@ -22,6 +22,8 @@
>  int
>  pthread_spin_init (pthread_spinlock_t *lock, int pshared)
>  {
> -  *lock = 0;
> 
> +  /* The atomic_store_relaxed is enough as we only initialize the spinlock here
> 
> +     and we are not in a critical region.  */

I would change the comment to:
/* Relaxed MO is fine because this is an initializing store.  */

> 
> +  atomic_store_relaxed (lock, 0);
> 
>    return 0;
>  }
> diff --git a/nptl/pthread_spin_lock.c b/nptl/pthread_spin_lock.c
> index 4d03b78..4107b5e 100644
> --- a/nptl/pthread_spin_lock.c
> 
> +++ b/nptl/pthread_spin_lock.c
> 
> @@ -21,7 +21,7 @@
>  
>  /* A machine-specific version can define SPIN_LOCK_READS_BETWEEN_CMPXCHG
>    to the number of plain reads that it's optimal to spin on between uses
> -  of atomic_compare_and_exchange_val_acq.  If spinning forever is optimal
> 
> +  of atomic_compare_exchange_weak_acquire.  If spinning forever is optimal

, at the end of this line.

>    then use -1.  If no plain reads here would ever be optimal, use 0.  */
>  #ifndef SPIN_LOCK_READS_BETWEEN_CMPXCHG
>  # warning machine-dependent file should define SPIN_LOCK_READS_BETWEEN_CMPXCHG
> @@ -37,10 +37,13 @@ pthread_spin_lock (pthread_spinlock_t *lock)
>       when the lock is locked.
>       We assume that the first try mostly will be successful, and we use
>       atomic_exchange.  For the subsequent tries we use
> -     atomic_compare_and_exchange.  */
> 
> -  if (atomic_exchange_acq (lock, 1) == 0)
> 
> +     atomic_compare_and_exchange.
> 
> +     We need acquire memory order here as we need to see if another thread has
> 
> +     locked / unlocked this spinlock.  */

Please change to:
We use acquire MO to synchronize-with the release MO store in
pthread_spin_unlock, and thus ensure that prior critical sections
happen-before this critical section.

> 
> +  if (__glibc_likely (atomic_exchange_acquire (lock, 1) == 0))
> 
>      return 0;
>  
> +  int val;
> 
>    do
>      {
>        /* The lock is contended and we need to wait.  Going straight back
> @@ -50,20 +53,39 @@ pthread_spin_lock (pthread_spinlock_t *lock)
>  	 On the other hand, we do want to update memory state on the local core
>  	 once in a while to avoid spinning indefinitely until some event that
>  	 will happen to update local memory as a side-effect.  */
> -      if (SPIN_LOCK_READS_BETWEEN_CMPXCHG >= 0)
> 
> +
> 
> +#if SPIN_LOCK_READS_BETWEEN_CMPXCHG >= 0
> 
> +      /* Use at most SPIN_LOCK_READS_BETWEEN_CMPXCHG plain reads between the
> 
> +	 atomic compare and exchanges.  */
> 
> +      int wait;
> 
> +      for (wait = 0; wait < SPIN_LOCK_READS_BETWEEN_CMPXCHG;  wait ++)
> 
>  	{
> -	  int wait = SPIN_LOCK_READS_BETWEEN_CMPXCHG;
> 
> +	  atomic_spin_nop ();
> 
>  
> -	  while (*lock != 0 && wait > 0)
> 
> -	    --wait;
> 
> +	  val = atomic_load_relaxed (lock);
> 
> +	  if (val == 0)
> 
> +	    break;
> 
>  	}
> -      else
> 
> +
> 
> +      /* Set expected value to zero for the next compare and exchange.  */
> 
> +      val = 0;
> 
> +
> 
> +#else /* SPIN_LOCK_READS_BETWEEN_CMPXCHG < 0  */
> 
> +      /* Use plain reads until spinlock is free and then try a further atomic

> +	 compare and exchange the next time.  */


Please change to:
Use relaxed-MO reads until we observe the lock to not be acquired
anymore. 


> +      do
> 
>  	{
> -	  while (*lock != 0)
> 
> -	    ;
> 
> +	  atomic_spin_nop ();
> 
> +
> 
> +	  val = atomic_load_relaxed (lock);
> 
>  	}
> +      while (val != 0);
> 
> +
> 
> +#endif
> 
> +      /* We need acquire memory order here for the same reason as mentioned
> 
> +	 for the first try to lock the spinlock.  */
> 
>      }
> -  while (atomic_compare_and_exchange_val_acq (lock, 1, 0) != 0);
> 
> +  while (!atomic_compare_exchange_weak_acquire (lock, &val, 1));
> 
>  
>    return 0;
>  }
> diff --git a/nptl/pthread_spin_trylock.c b/nptl/pthread_spin_trylock.c
> index 593bba3..942a38f 100644
> --- a/nptl/pthread_spin_trylock.c
> 
> +++ b/nptl/pthread_spin_trylock.c
> 
> @@ -20,8 +20,29 @@
>  #include <atomic.h>
>  #include "pthreadP.h"
>  
> +/* A machine-specific version can define
> 
> +   SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG to 1 if an explicit test if
> 
> +   lock is free is optimal.  */
> 
> +#ifndef SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG
> 
> +# define SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG 0
> 
> +#endif

A while ago we tried hard to remove all code that would fail silently
when a macro had a typo in the name, for example.  We still have a few
of them left (e.g., in the atomics), but I think this here doesn't
warrant an exception.

Also, the name does not match the description.  What are you really
trying to parametrize here?  Do you want to do test-and-test-and-set vs.
test-and-set? Or do you want to make some statement about how CAS vs.
exchange behave in terms of runtime costs / effect on caches?  Or do you
want to have a flag that says that exchange is implemented through a CAS
loop anyway, and thus it's not worth trying an exchange if we actually
want to bail out when the value is not equal to the expected value?

> 
> +
> 
>  int
>  pthread_spin_trylock (pthread_spinlock_t *lock)
>  {
> -  return atomic_exchange_acq (lock, 1) ? EBUSY : 0;
> 
> +  /* We need acquire memory order here as we need to see if another
> 
> +     thread has locked / unlocked this spinlock.  */

See above.  Please fix the comment in a similar way.

> 
> +#if SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG == 1
> 
> +  /* Load and test the spinlock and only try to lock the spinlock if it is
> 
> +     free.  */
> 
> +  int val = atomic_load_relaxed (lock);
> 
> +  if (__glibc_likely (val == 0 && atomic_exchange_acquire (lock, 1) == 0))

I think that's not quite the same as a choice between CAS and exchange.
Doing a load first could be helpful for *both* CAS and exchange.  OTOH,
if we assume that trylock will succeed most of the time, then the load
is unnecessary and might be more costly (eg, if it causes the state of
the cache line to change twice).

> 
> +    return 0;
> 
> +#else
> 
> +  /* Set spinlock to locked and test if we have locked it.  */

Just say that we try to acquire the lock, which succeeds if the lock had
not been acquired.

> +  if (__glibc_likely (atomic_exchange_acquire (lock, 1) == 0))
> 
> +    return 0;
> 
> +#endif
> 
> +
> 
> +  return EBUSY;
> 
>  }
> diff --git a/nptl/pthread_spin_unlock.c b/nptl/pthread_spin_unlock.c
> index 5fd73e5..f83b696 100644
> --- a/nptl/pthread_spin_unlock.c
> 
> +++ b/nptl/pthread_spin_unlock.c
> 
> @@ -23,7 +23,9 @@
>  int
>  pthread_spin_unlock (pthread_spinlock_t *lock)
>  {
> -  atomic_full_barrier ();
> 
> -  *lock = 0;
> 
> +  /* The atomic_store_release synchronizes-with the atomic_exchange_acquire
> 
> +     or atomic_compare_exchange_weak_acquire in pthread_spin_lock /
> 
> +     pthread_spin_trylock.  */
> 
> +  atomic_store_release (lock, 0);
> 
>    return 0;
>  }

I agree with this change.  However, depending on how one interprets
POSIX' memory model, one may expect lock releases to be sequentially
consistent.  Nonetheless, IMO, glibc should use only release MO.

But we need to announce this change.  Some of us have been considering
an additional section in the manual were we specify where we deviate
from POSIX; this change might be the first we're listing there (though
to be fair, this is a deviation only on some architectures because on
others such as powerpc I think, we've been using release MO forever).
diff mbox

Patch

commit cc80b947c9c355e83f464fa7135e3ce9db045be8
Author: Stefan Liebler <stli@linux.vnet.ibm.com>
Date:   Wed Feb 8 15:24:14 2017 +0100

    Optimize generic spinlock code and use C11 like atomic macros.
    
    This patch optimizes the generic spinlock code.
    
    The type pthread_spinlock_t is a typedef to volatile int on all archs.
    Passing a volatile pointer to the atomic macros can lead to extra stores
    and loads to stack if such a macro creates a temporary variable by using
    "__typeof (*(mem)) tmp;".  Thus, those macros which are used by spinlock code -
    atomic_exchange_acquire, atomic_load_relaxed,
    atomic_compare_exchange_weak_acquire - have to be adjusted.
    According to the comment from  Szabolcs Nagy, the type of a cast expression is
    unqualified (see http://www.open-std.org/jtc1/sc22/wg14/www/docs/dr_423.htm):
    __typeof ((__typeof (*(mem)) *(mem)) tmp;
    This patch adjusts those macros in include/atomic.h.
    
    The atomic macros are replaced by the C11 like atomic macros and thus
    the code is aligned to it.  The issue with passed volatile int pointers
    applies to the C11 like atomic macros as well as the ones used before.
    
    I've added a glibc_likely hint to the first atomic exchange in
    pthread_spin_lock in order to return immediately to the caller if the lock is
    free.  Without the hint, there is an additional jump if the lock is free.
    
    I've added the atomic_spin_nop macro within the loop of plain reads.
    The plain reads are also realized by C11 like atomic_load_relaxed macro.
    
    For pthread_spin_trylock, a machine-specific version can define
    SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG to 1 if an explicit test if
    lock is free is optimal.
    
    ChangeLog:
    
    	* include/atomic.h:
    	(__atomic_val_bysize): Cast type to omit volatile qualifier.
    	(atomic_exchange_acq): Likewise.
    	(atomic_load_relaxed): Likewise.
    	* nptl/pthread_spin_init.c (pthread_spin_init):
    	Use atomic_store_relaxed.
    	* nptl/pthread_spin_lock.c (pthread_spin_lock):
    	Use C11-like atomic macros.
    	* nptl/pthread_spin_trylock.c
    	(pthread_spin_trylock): Likewise.  Use an explicit test if lock
    	is free, if new SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG macro
    	is set to one.
    	(SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG): New define.
    	* nptl/pthread_spin_unlock.c (pthread_spin_unlock):
    	Use atomic_store_release.

diff --git a/include/atomic.h b/include/atomic.h
index 7f32640..770db4a 100644
--- a/include/atomic.h
+++ b/include/atomic.h
@@ -54,7 +54,7 @@ 
    and following args.  */
 #define __atomic_val_bysize(pre, post, mem, ...)			      \
   ({									      \
-    __typeof (*mem) __atg1_result;					      \
+    __typeof ((__typeof (*(mem))) *(mem)) __atg1_result;		      \
     if (sizeof (*mem) == 1)						      \
       __atg1_result = pre##_8_##post (mem, __VA_ARGS__);		      \
     else if (sizeof (*mem) == 2)					      \
@@ -162,9 +162,9 @@ 
 /* Store NEWVALUE in *MEM and return the old value.  */
 #ifndef atomic_exchange_acq
 # define atomic_exchange_acq(mem, newvalue) \
-  ({ __typeof (*(mem)) __atg5_oldval;					      \
+  ({ __typeof ((__typeof (*(mem))) *(mem)) __atg5_oldval;		      \
      __typeof (mem) __atg5_memp = (mem);				      \
-     __typeof (*(mem)) __atg5_value = (newvalue);			      \
+     __typeof ((__typeof (*(mem))) *(mem)) __atg5_value = (newvalue);	      \
 									      \
      do									      \
        __atg5_oldval = *__atg5_memp;					      \
@@ -668,7 +668,7 @@  void __atomic_link_error (void);
 
 # ifndef atomic_load_relaxed
 #  define atomic_load_relaxed(mem) \
-   ({ __typeof (*(mem)) __atg100_val;					      \
+   ({ __typeof ((__typeof (*(mem))) *(mem)) __atg100_val;		      \
    __asm ("" : "=r" (__atg100_val) : "0" (*(mem)));			      \
    __atg100_val; })
 # endif
diff --git a/nptl/pthread_spin_init.c b/nptl/pthread_spin_init.c
index 01dec5e..bca3590 100644
--- a/nptl/pthread_spin_init.c
+++ b/nptl/pthread_spin_init.c
@@ -22,6 +22,8 @@ 
 int
 pthread_spin_init (pthread_spinlock_t *lock, int pshared)
 {
-  *lock = 0;
+  /* The atomic_store_relaxed is enough as we only initialize the spinlock here
+     and we are not in a critical region.  */
+  atomic_store_relaxed (lock, 0);
   return 0;
 }
diff --git a/nptl/pthread_spin_lock.c b/nptl/pthread_spin_lock.c
index 4d03b78..4107b5e 100644
--- a/nptl/pthread_spin_lock.c
+++ b/nptl/pthread_spin_lock.c
@@ -21,7 +21,7 @@ 
 
 /* A machine-specific version can define SPIN_LOCK_READS_BETWEEN_CMPXCHG
   to the number of plain reads that it's optimal to spin on between uses
-  of atomic_compare_and_exchange_val_acq.  If spinning forever is optimal
+  of atomic_compare_exchange_weak_acquire.  If spinning forever is optimal
   then use -1.  If no plain reads here would ever be optimal, use 0.  */
 #ifndef SPIN_LOCK_READS_BETWEEN_CMPXCHG
 # warning machine-dependent file should define SPIN_LOCK_READS_BETWEEN_CMPXCHG
@@ -37,10 +37,13 @@  pthread_spin_lock (pthread_spinlock_t *lock)
      when the lock is locked.
      We assume that the first try mostly will be successful, and we use
      atomic_exchange.  For the subsequent tries we use
-     atomic_compare_and_exchange.  */
-  if (atomic_exchange_acq (lock, 1) == 0)
+     atomic_compare_and_exchange.
+     We need acquire memory order here as we need to see if another thread has
+     locked / unlocked this spinlock.  */
+  if (__glibc_likely (atomic_exchange_acquire (lock, 1) == 0))
     return 0;
 
+  int val;
   do
     {
       /* The lock is contended and we need to wait.  Going straight back
@@ -50,20 +53,39 @@  pthread_spin_lock (pthread_spinlock_t *lock)
 	 On the other hand, we do want to update memory state on the local core
 	 once in a while to avoid spinning indefinitely until some event that
 	 will happen to update local memory as a side-effect.  */
-      if (SPIN_LOCK_READS_BETWEEN_CMPXCHG >= 0)
+
+#if SPIN_LOCK_READS_BETWEEN_CMPXCHG >= 0
+      /* Use at most SPIN_LOCK_READS_BETWEEN_CMPXCHG plain reads between the
+	 atomic compare and exchanges.  */
+      int wait;
+      for (wait = 0; wait < SPIN_LOCK_READS_BETWEEN_CMPXCHG;  wait ++)
 	{
-	  int wait = SPIN_LOCK_READS_BETWEEN_CMPXCHG;
+	  atomic_spin_nop ();
 
-	  while (*lock != 0 && wait > 0)
-	    --wait;
+	  val = atomic_load_relaxed (lock);
+	  if (val == 0)
+	    break;
 	}
-      else
+
+      /* Set expected value to zero for the next compare and exchange.  */
+      val = 0;
+
+#else /* SPIN_LOCK_READS_BETWEEN_CMPXCHG < 0  */
+      /* Use plain reads until spinlock is free and then try a further atomic
+	 compare and exchange the next time.  */
+      do
 	{
-	  while (*lock != 0)
-	    ;
+	  atomic_spin_nop ();
+
+	  val = atomic_load_relaxed (lock);
 	}
+      while (val != 0);
+
+#endif
+      /* We need acquire memory order here for the same reason as mentioned
+	 for the first try to lock the spinlock.  */
     }
-  while (atomic_compare_and_exchange_val_acq (lock, 1, 0) != 0);
+  while (!atomic_compare_exchange_weak_acquire (lock, &val, 1));
 
   return 0;
 }
diff --git a/nptl/pthread_spin_trylock.c b/nptl/pthread_spin_trylock.c
index 593bba3..942a38f 100644
--- a/nptl/pthread_spin_trylock.c
+++ b/nptl/pthread_spin_trylock.c
@@ -20,8 +20,29 @@ 
 #include <atomic.h>
 #include "pthreadP.h"
 
+/* A machine-specific version can define
+   SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG to 1 if an explicit test if
+   lock is free is optimal.  */
+#ifndef SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG
+# define SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG 0
+#endif
+
 int
 pthread_spin_trylock (pthread_spinlock_t *lock)
 {
-  return atomic_exchange_acq (lock, 1) ? EBUSY : 0;
+  /* We need acquire memory order here as we need to see if another
+     thread has locked / unlocked this spinlock.  */
+#if SPIN_TRYLOCK_USE_CMPXCHG_INSTEAD_OF_XCHG == 1
+  /* Load and test the spinlock and only try to lock the spinlock if it is
+     free.  */
+  int val = atomic_load_relaxed (lock);
+  if (__glibc_likely (val == 0 && atomic_exchange_acquire (lock, 1) == 0))
+    return 0;
+#else
+  /* Set spinlock to locked and test if we have locked it.  */
+  if (__glibc_likely (atomic_exchange_acquire (lock, 1) == 0))
+    return 0;
+#endif
+
+  return EBUSY;
 }
diff --git a/nptl/pthread_spin_unlock.c b/nptl/pthread_spin_unlock.c
index 5fd73e5..f83b696 100644
--- a/nptl/pthread_spin_unlock.c
+++ b/nptl/pthread_spin_unlock.c
@@ -23,7 +23,9 @@ 
 int
 pthread_spin_unlock (pthread_spinlock_t *lock)
 {
-  atomic_full_barrier ();
-  *lock = 0;
+  /* The atomic_store_release synchronizes-with the atomic_exchange_acquire
+     or atomic_compare_exchange_weak_acquire in pthread_spin_lock /
+     pthread_spin_trylock.  */
+  atomic_store_release (lock, 0);
   return 0;
 }