[1/3] Add a fast path for C rd/wrlock v2

Message ID 1399412209-28245-2-git-send-email-andi@firstfloor.org
State Superseded
Delegated to: Carlos O'Donell
Headers

Commit Message

Andi Kleen May 6, 2014, 9:36 p.m. UTC
  From: Andi Kleen <ak@linux.intel.com>

One difference between the C versions and the assembler versions of
wr/rdlock is that the C compiler saves some registers in the function
prologue that are unnecessary for the fast path. Split the uncontended
fast path out into a separate function. Only when contention is
detected is the full-featured function called. This makes
the fast path code (nearly) identical to the assembler version,
and brings uncontended performance to within a few cycles of it.

nptl/:
2014-05-06  Andi Kleen  <ak@linux.intel.com>

	* pthread_rwlock_rdlock.c (__pthread_rwlock_rdlock):
	Split into __pthread_rwlock_rdlock_slow and __pthread_rwlock_rdlock.
	* pthread_rwlock_wrlock.c (__pthread_rwlock_wrlock):
	Split into __pthread_rwlock_wrlock_slow and __pthread_rwlock_wrlock.

v2: Rename some functions and add space.
---
 nptl/pthread_rwlock_rdlock.c | 88 ++++++++++++++++++++++++++++++--------------
 nptl/pthread_rwlock_wrlock.c | 59 ++++++++++++++++++++---------
 2 files changed, 103 insertions(+), 44 deletions(-)
  

Comments

Carlos O'Donell June 9, 2014, 7:13 p.m. UTC | #1
On 05/06/2014 05:36 PM, Andi Kleen wrote:
> From: Andi Kleen <ak@linux.intel.com>
> 
> One difference of the C versions to the assembler wr/rdlock
> is that the C compiler saves some registers which are unnecessary
> for the fast path in the prologue of the functions. Split the
> uncontended fast path out into a separate function. Only when contention is
> detected is the full featured function called. This makes
> the fast path code (nearly) identical to the assembler version,
> and gives uncontended performance within a few cycles.

Sounds like the best of both worlds. Thanks for the cleanup.

> nptl/:
> 2014-05-06  Andi Kleen  <ak@linux.intel.com>
> 
> 	* pthread_rwlock_rdlock.c (__pthread_rwlock_rdlock):
> 	Split into __pthread_rwlock_rdlock_slow and __pthread_rwlock_rdlock.
> 	* pthread_rwlock_wrlock.c (__pthread_rwlock_wrlock):
> 	Split into __pthread_rwlock_wrlock_slow and __pthread_rwlock_wrlock.

Looks good to me.

> v2: Rename some functions and add space.
> ---
>  nptl/pthread_rwlock_rdlock.c | 88 ++++++++++++++++++++++++++++++--------------
>  nptl/pthread_rwlock_wrlock.c | 59 ++++++++++++++++++++---------
>  2 files changed, 103 insertions(+), 44 deletions(-)
> 
> diff --git a/nptl/pthread_rwlock_rdlock.c b/nptl/pthread_rwlock_rdlock.c
> index 3773f7d..1df0327 100644
> --- a/nptl/pthread_rwlock_rdlock.c
> +++ b/nptl/pthread_rwlock_rdlock.c
> @@ -24,39 +24,16 @@
>  #include <stap-probe.h>
>  
>  
> -/* Acquire read lock for RWLOCK.  */
> -int
> -__pthread_rwlock_rdlock (rwlock)
> -     pthread_rwlock_t *rwlock;
> +/* Acquire read lock for RWLOCK.  Slow path.  */
> +static int __attribute__((noinline))
> +__pthread_rwlock_rdlock_slow (pthread_rwlock_t *rwlock)

OK.

>  {
>    int result = 0;
>  
> -  LIBC_PROBE (rdlock_entry, 1, rwlock);
> -
> -  /* Make sure we are alone.  */
> -  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
> +  /* Lock is taken in caller.  */

OK.

>  
>    while (1)
>      {
> -      /* Get the rwlock if there is no writer...  */
> -      if (rwlock->__data.__writer == 0
> -	  /* ...and if either no writer is waiting or we prefer readers.  */
> -	  && (!rwlock->__data.__nr_writers_queued
> -	      || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
> -	{
> -	  /* Increment the reader counter.  Avoid overflow.  */
> -	  if (__glibc_unlikely (++rwlock->__data.__nr_readers == 0))
> -	    {
> -	      /* Overflow on number of readers.	 */
> -	      --rwlock->__data.__nr_readers;
> -	      result = EAGAIN;
> -	    }
> -	  else
> -	    LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
> -
> -	  break;
> -	}
> -
>        /* Make sure we are not holding the rwlock as a writer.  This is
>  	 a deadlock situation we recognize and report.  */
>        if (__builtin_expect (rwlock->__data.__writer
> @@ -88,6 +65,25 @@ __pthread_rwlock_rdlock (rwlock)
>        lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
>  
>        --rwlock->__data.__nr_readers_queued;
> +
> +      /* Get the rwlock if there is no writer...  */
> +      if (rwlock->__data.__writer == 0
> +	  /* ...and if either no writer is waiting or we prefer readers.  */
> +	  && (!rwlock->__data.__nr_writers_queued
> +	      || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
> +	{
> +	  /* Increment the reader counter.  Avoid overflow.  */
> +	  if (__glibc_unlikely (++rwlock->__data.__nr_readers == 0))
> +	    {
> +	      /* Overflow on number of readers.	 */
> +	      --rwlock->__data.__nr_readers;
> +	      result = EAGAIN;
> +	    }
> +	  else
> +	    LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
> +
> +	  break;
> +	}

OK, shuffle around the check because we do it once already
in the fast path, and we need to do it here *after* all the
other checks but before we loop.

>      }
>  
>    /* We are done, free the lock.  */
> @@ -96,5 +92,43 @@ __pthread_rwlock_rdlock (rwlock)
>    return result;
>  }
>  
> +
> +/* Fast path of acquiring read lock on RWLOCK.  */
> +
> +int
> +__pthread_rwlock_rdlock (pthread_rwlock_t *rwlock)
> +{
> +  int result = 0;
> +
> +  LIBC_PROBE (rdlock_entry, 1, rwlock);
> +
> +  /* Make sure we are alone.  */
> +  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
> +
> +  /* Get the rwlock if there is no writer...  */
> +  if (rwlock->__data.__writer == 0
> +      /* ...and if either no writer is waiting or we prefer readers.  */
> +      && (!rwlock->__data.__nr_writers_queued
> +	  || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
> +    {
> +      /* Increment the reader counter.  Avoid overflow.  */
> +      if (__glibc_unlikely (++rwlock->__data.__nr_readers == 0))
> +	{
> +	  /* Overflow on number of readers.	 */
> +	  --rwlock->__data.__nr_readers;
> +	  result = EAGAIN;
> +	}
> +      else
> +	LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
> +
> +      /* We are done, free the lock.  */
> +      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
> +
> +      return result;
> +    }
> +
> +  return __pthread_rwlock_rdlock_slow (rwlock);
> +}

OK.

> +
>  weak_alias (__pthread_rwlock_rdlock, pthread_rwlock_rdlock)
>  hidden_def (__pthread_rwlock_rdlock)
> diff --git a/nptl/pthread_rwlock_wrlock.c b/nptl/pthread_rwlock_wrlock.c
> index 1613d45..de54e51 100644
> --- a/nptl/pthread_rwlock_wrlock.c
> +++ b/nptl/pthread_rwlock_wrlock.c
> @@ -25,29 +25,15 @@
>  
>  
>  /* Acquire write lock for RWLOCK.  */
> -int
> -__pthread_rwlock_wrlock (rwlock)
> -     pthread_rwlock_t *rwlock;
> +static int __attribute__((noinline))
> +__pthread_rwlock_wrlock_slow (pthread_rwlock_t *rwlock)

OK.

>  {
>    int result = 0;
>  
> -  LIBC_PROBE (wrlock_entry, 1, rwlock);
> -
> -  /* Make sure we are alone.  */
> -  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
> +  /* Caller has taken the lock.  */

OK.

>  
>    while (1)
>      {
> -      /* Get the rwlock if there is no writer and no reader.  */
> -      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
> -	{
> -	  /* Mark self as writer.  */
> -	  rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
> -
> -	  LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
> -	  break;
> -	}
> -


OK.

>        /* Make sure we are not holding the rwlock as a writer.  This is
>  	 a deadlock situation we recognize and report.  */
>        if (__builtin_expect (rwlock->__data.__writer
> @@ -80,6 +66,16 @@ __pthread_rwlock_wrlock (rwlock)
>  
>        /* To start over again, remove the thread from the writer list.  */
>        --rwlock->__data.__nr_writers_queued;
> +
> +      /* Get the rwlock if there is no writer and no reader.  */
> +      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
> +	{
> +	  /* Mark self as writer.  */
> +	  rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
> +
> +	  LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
> +	  break;

OK.

> +	}
>      }
>  
>    /* We are done, free the lock.  */
> @@ -88,5 +84,34 @@ __pthread_rwlock_wrlock (rwlock)
>    return result;
>  }
>  
> +/* Fast path of acquiring write lock for RWLOCK.  */
> +
> +int
> +__pthread_rwlock_wrlock (pthread_rwlock_t *rwlock)
> +{
> +  LIBC_PROBE (wrlock_entry, 1, rwlock);
> +
> +  /* Make sure we are alone.  */
> +  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
> +
> +  /* Get the rwlock if there is no writer and no reader.  */
> +  if (__glibc_likely((rwlock->__data.__writer |
> +	rwlock->__data.__nr_readers) == 0))
> +    {
> +      /* Mark self as writer.  */
> +      rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
> +
> +      LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
> +
> +      /* We are done, free the lock.  */
> +      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
> +
> +      return 0;
> +    }
> +
> +  return __pthread_rwlock_wrlock_slow (rwlock);
> +}
> +
> +

OK.

>  weak_alias (__pthread_rwlock_wrlock, pthread_rwlock_wrlock)
>  hidden_def (__pthread_rwlock_wrlock)
> 

Looks good to me, and should go in right away.

Cheers,
Carlos.
  
Jeff Law June 9, 2014, 7:16 p.m. UTC | #2
On 06/09/14 13:13, Carlos O'Donell wrote:
> On 05/06/2014 05:36 PM, Andi Kleen wrote:
>> From: Andi Kleen <ak@linux.intel.com>
>>
>> One difference of the C versions to the assembler wr/rdlock
>> is that the C compiler saves some registers which are unnecessary
>> for the fast path in the prologue of the functions. Split the
>> uncontended fast path out into a separate function. Only when contention is
>> detected is the full featured function called. This makes
>> the fast path code (nearly) identical to the assembler version,
>> and gives uncontended performance within a few cycles.
>
> Sounds like the best of both worlds. Thanks for the cleanup.
Andi -- any chance you could file a gcc bug for that?  GCC knows how to
do a limited form of shrink-wrapping as well as function splitting to
optimize these kinds of situations.  If it's not working as it should,
obviously we'd like to know and try to get it fixed.

Thanks,
jeff
  

Patch

diff --git a/nptl/pthread_rwlock_rdlock.c b/nptl/pthread_rwlock_rdlock.c
index 3773f7d..1df0327 100644
--- a/nptl/pthread_rwlock_rdlock.c
+++ b/nptl/pthread_rwlock_rdlock.c
@@ -24,39 +24,16 @@ 
 #include <stap-probe.h>
 
 
-/* Acquire read lock for RWLOCK.  */
-int
-__pthread_rwlock_rdlock (rwlock)
-     pthread_rwlock_t *rwlock;
+/* Acquire read lock for RWLOCK.  Slow path.  */
+static int __attribute__((noinline))
+__pthread_rwlock_rdlock_slow (pthread_rwlock_t *rwlock)
 {
   int result = 0;
 
-  LIBC_PROBE (rdlock_entry, 1, rwlock);
-
-  /* Make sure we are alone.  */
-  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
+  /* Lock is taken in caller.  */
 
   while (1)
     {
-      /* Get the rwlock if there is no writer...  */
-      if (rwlock->__data.__writer == 0
-	  /* ...and if either no writer is waiting or we prefer readers.  */
-	  && (!rwlock->__data.__nr_writers_queued
-	      || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
-	{
-	  /* Increment the reader counter.  Avoid overflow.  */
-	  if (__glibc_unlikely (++rwlock->__data.__nr_readers == 0))
-	    {
-	      /* Overflow on number of readers.	 */
-	      --rwlock->__data.__nr_readers;
-	      result = EAGAIN;
-	    }
-	  else
-	    LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
-
-	  break;
-	}
-
       /* Make sure we are not holding the rwlock as a writer.  This is
 	 a deadlock situation we recognize and report.  */
       if (__builtin_expect (rwlock->__data.__writer
@@ -88,6 +65,25 @@  __pthread_rwlock_rdlock (rwlock)
       lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
 
       --rwlock->__data.__nr_readers_queued;
+
+      /* Get the rwlock if there is no writer...  */
+      if (rwlock->__data.__writer == 0
+	  /* ...and if either no writer is waiting or we prefer readers.  */
+	  && (!rwlock->__data.__nr_writers_queued
+	      || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
+	{
+	  /* Increment the reader counter.  Avoid overflow.  */
+	  if (__glibc_unlikely (++rwlock->__data.__nr_readers == 0))
+	    {
+	      /* Overflow on number of readers.	 */
+	      --rwlock->__data.__nr_readers;
+	      result = EAGAIN;
+	    }
+	  else
+	    LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
+
+	  break;
+	}
     }
 
   /* We are done, free the lock.  */
@@ -96,5 +92,43 @@  __pthread_rwlock_rdlock (rwlock)
   return result;
 }
 
+
+/* Fast path of acquiring read lock on RWLOCK.  */
+
+int
+__pthread_rwlock_rdlock (pthread_rwlock_t *rwlock)
+{
+  int result = 0;
+
+  LIBC_PROBE (rdlock_entry, 1, rwlock);
+
+  /* Make sure we are alone.  */
+  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
+
+  /* Get the rwlock if there is no writer...  */
+  if (rwlock->__data.__writer == 0
+      /* ...and if either no writer is waiting or we prefer readers.  */
+      && (!rwlock->__data.__nr_writers_queued
+	  || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
+    {
+      /* Increment the reader counter.  Avoid overflow.  */
+      if (__glibc_unlikely (++rwlock->__data.__nr_readers == 0))
+	{
+	  /* Overflow on number of readers.	 */
+	  --rwlock->__data.__nr_readers;
+	  result = EAGAIN;
+	}
+      else
+	LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
+
+      /* We are done, free the lock.  */
+      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
+
+      return result;
+    }
+
+  return __pthread_rwlock_rdlock_slow (rwlock);
+}
+
 weak_alias (__pthread_rwlock_rdlock, pthread_rwlock_rdlock)
 hidden_def (__pthread_rwlock_rdlock)
diff --git a/nptl/pthread_rwlock_wrlock.c b/nptl/pthread_rwlock_wrlock.c
index 1613d45..de54e51 100644
--- a/nptl/pthread_rwlock_wrlock.c
+++ b/nptl/pthread_rwlock_wrlock.c
@@ -25,29 +25,15 @@ 
 
 
 /* Acquire write lock for RWLOCK.  */
-int
-__pthread_rwlock_wrlock (rwlock)
-     pthread_rwlock_t *rwlock;
+static int __attribute__((noinline))
+__pthread_rwlock_wrlock_slow (pthread_rwlock_t *rwlock)
 {
   int result = 0;
 
-  LIBC_PROBE (wrlock_entry, 1, rwlock);
-
-  /* Make sure we are alone.  */
-  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
+  /* Caller has taken the lock.  */
 
   while (1)
     {
-      /* Get the rwlock if there is no writer and no reader.  */
-      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
-	{
-	  /* Mark self as writer.  */
-	  rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
-
-	  LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
-	  break;
-	}
-
       /* Make sure we are not holding the rwlock as a writer.  This is
 	 a deadlock situation we recognize and report.  */
       if (__builtin_expect (rwlock->__data.__writer
@@ -80,6 +66,16 @@  __pthread_rwlock_wrlock (rwlock)
 
       /* To start over again, remove the thread from the writer list.  */
       --rwlock->__data.__nr_writers_queued;
+
+      /* Get the rwlock if there is no writer and no reader.  */
+      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
+	{
+	  /* Mark self as writer.  */
+	  rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
+
+	  LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
+	  break;
+	}
     }
 
   /* We are done, free the lock.  */
@@ -88,5 +84,34 @@  __pthread_rwlock_wrlock (rwlock)
   return result;
 }
 
+/* Fast path of acquiring write lock for RWLOCK.  */
+
+int
+__pthread_rwlock_wrlock (pthread_rwlock_t *rwlock)
+{
+  LIBC_PROBE (wrlock_entry, 1, rwlock);
+
+  /* Make sure we are alone.  */
+  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
+
+  /* Get the rwlock if there is no writer and no reader.  */
+  if (__glibc_likely((rwlock->__data.__writer |
+	rwlock->__data.__nr_readers) == 0))
+    {
+      /* Mark self as writer.  */
+      rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
+
+      LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
+
+      /* We are done, free the lock.  */
+      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
+
+      return 0;
+    }
+
+  return __pthread_rwlock_wrlock_slow (rwlock);
+}
+
+
 weak_alias (__pthread_rwlock_wrlock, pthread_rwlock_wrlock)
 hidden_def (__pthread_rwlock_wrlock)