[v2] rwlock: Fix explicit hand-over.

Message ID 1490482860.26906.391.camel@redhat.com
State New, archived
Headers

Commit Message

Torvald Riegel March 25, 2017, 11:01 p.m. UTC
  On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
> * Torvald Riegel:
> 
> > +  bool registered_while_in_write_phase = false;
> >    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
> >      return 0;
> > +  else
> > +    registered_while_in_write_phase = true;
> 
> Sorry, this doesn't look quite right.  Isn't
> registered_while_in_write_phase always true?

Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
increase is due to reformatting, but I also adapted some of the
comments.
I get two failures, but I guess these are either due to the bad internet
connectivity I currently have, or something at the resolver.
FAIL: resolv/mtrace-tst-leaks
FAIL: resolv/tst-leaks
  

Comments

Waiman Long March 27, 2017, 4:09 p.m. UTC | #1
On 03/25/2017 07:01 PM, Torvald Riegel wrote:
> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
>> * Torvald Riegel:
>>
>>> +  bool registered_while_in_write_phase = false;
>>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
>>>      return 0;
>>> +  else
>>> +    registered_while_in_write_phase = true;
>> Sorry, this doesn't look quite right.  Isn't
>> registered_while_in_write_phase always true?
> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
> increase is due to reformatting, but I also adapted some of the
> comments.
> I get two failures, but I guess these are either due to the bad internet
> connectivity I currently have, or something at the resolver.
> FAIL: resolv/mtrace-tst-leaks
> FAIL: resolv/tst-leaks
>
>
I have verified that the v2 patch did fix the hang that I saw with my
microbenchmark. I also observed an increase in performance in the new
rwlock code compared with the old one before the major rewrite. On a
4-socket 40-core 80-thread system, 80 parallel locking threads had an
average per-thread throughput of 32,584 ops/s. The old rwlock code had a
throughput of 13,411 only. So there is a more than 1.4X increase in
performance.

Tested-by: Waiman Long <longman@redhat.com>

Cheers,
Longman
  
Torvald Riegel March 27, 2017, 5:53 p.m. UTC | #2
On Mon, 2017-03-27 at 12:09 -0400, Waiman Long wrote:
> On 03/25/2017 07:01 PM, Torvald Riegel wrote:
> > On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
> >> * Torvald Riegel:
> >>
> >>> +  bool registered_while_in_write_phase = false;
> >>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
> >>>      return 0;
> >>> +  else
> >>> +    registered_while_in_write_phase = true;
> >> Sorry, this doesn't look quite right.  Isn't
> >> registered_while_in_write_phase always true?
> > Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
> > increase is due to reformatting, but I also adapted some of the
> > comments.
> > I get two failures, but I guess these are either due to the bad internet
> > connectivity I currently have, or something at the resolver.
> > FAIL: resolv/mtrace-tst-leaks
> > FAIL: resolv/tst-leaks
> >
> >
> I have verified that the v2 patch did fix the hang that I saw with my
> microbenchmark. I also observed an increase in performance in the new
> rwlock code compared with the old one before the major rewrite.

Thanks!

> On a
> 4-socket 40-core 80-thread system, 80 parallel locking threads had an
> average per-thread throughput of 32,584 ops/s. The old rwlock code had a
> throughput of 13,411 only. So there is a more than 1.4X increase in
> performance.

Is that with the 50% reads / 50% writes workload (per thread), empty
critical sections, and no delay between critical sections?
  
Waiman Long March 27, 2017, 6:16 p.m. UTC | #3
On 03/27/2017 01:53 PM, Torvald Riegel wrote:
> On Mon, 2017-03-27 at 12:09 -0400, Waiman Long wrote:
>> On 03/25/2017 07:01 PM, Torvald Riegel wrote:
>>> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
>>>> * Torvald Riegel:
>>>>
>>>>> +  bool registered_while_in_write_phase = false;
>>>>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
>>>>>      return 0;
>>>>> +  else
>>>>> +    registered_while_in_write_phase = true;
>>>> Sorry, this doesn't look quite right.  Isn't
>>>> registered_while_in_write_phase always true?
>>> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
>>> increase is due to reformatting, but I also adapted some of the
>>> comments.
>>> I get two failures, but I guess these are either due to the bad internet
>>> connectivity I currently have, or something at the resolver.
>>> FAIL: resolv/mtrace-tst-leaks
>>> FAIL: resolv/tst-leaks
>>>
>>>
>> I have verified that the v2 patch did fix the hang that I saw with my
>> microbenchmark. I also observed an increase in performance in the new
>> rwlock code compared with the old one before the major rewrite.
> Thanks!
>
>> On a
>> 4-socket 40-core 80-thread system, 80 parallel locking threads had an
>> average per-thread throughput of 32,584 ops/s. The old rwlock code had a
>> throughput of 13,411 only. So there is a more than 1.4X increase in
>> performance.
> Is that with the 50% reads / 50% writes workload (per thread), empty
> critical sections, and no delay between critical sections?
>
Yes, I used the default configuration of 1:1 read/write ratio. The
critical section isn't exactly empty as I used 1 pause instruction for
both in the critical section and between critical section.

Regards,
Longman
  
Waiman Long March 27, 2017, 6:59 p.m. UTC | #4
On 03/27/2017 02:16 PM, Waiman Long wrote:
> On 03/27/2017 01:53 PM, Torvald Riegel wrote:
>> On Mon, 2017-03-27 at 12:09 -0400, Waiman Long wrote:
>>> On 03/25/2017 07:01 PM, Torvald Riegel wrote:
>>>> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
>>>>> * Torvald Riegel:
>>>>>
>>>>>> +  bool registered_while_in_write_phase = false;
>>>>>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
>>>>>>      return 0;
>>>>>> +  else
>>>>>> +    registered_while_in_write_phase = true;
>>>>> Sorry, this doesn't look quite right.  Isn't
>>>>> registered_while_in_write_phase always true?
>>>> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
>>>> increase is due to reformatting, but I also adapted some of the
>>>> comments.
>>>> I get two failures, but I guess these are either due to the bad internet
>>>> connectivity I currently have, or something at the resolver.
>>>> FAIL: resolv/mtrace-tst-leaks
>>>> FAIL: resolv/tst-leaks
>>>>
>>>>
>>> I have verified that the v2 patch did fix the hang that I saw with my
>>> microbenchmark. I also observed an increase in performance in the new
>>> rwlock code compared with the old one before the major rewrite.
>> Thanks!
>>
>>> On a
>>> 4-socket 40-core 80-thread system, 80 parallel locking threads had an
>>> average per-thread throughput of 32,584 ops/s. The old rwlock code had a
>>> throughput of 13,411 only. So there is a more than 1.4X increase in
>>> performance.
>> Is that with the 50% reads / 50% writes workload (per thread), empty
>> critical sections, and no delay between critical sections?
>>
> Yes, I used the default configuration of 1:1 read/write ratio. The
> critical section isn't exactly empty as I used 1 pause instruction for
> both in the critical section and between critical section.
>
> Regards,
> Longman
>
Just found out that there is a regression in performance when in writer
preferring mode. The average per-thread throughput was 4,733 ops/s with
the old glibc, but 2,300 ops/s with the new implementation vs 32,584
ops/s for the reader-preferring mode. It was said in the code that
writer-preferring mode isn't the focus in the rewrite. So I am not
saying that it is bad, but it is something to keep in mind about.

Regards,
Longman
  
Waiman Long March 27, 2017, 7:10 p.m. UTC | #5
On 03/27/2017 02:59 PM, Waiman Long wrote:
> On 03/27/2017 02:16 PM, Waiman Long wrote:
>> On 03/27/2017 01:53 PM, Torvald Riegel wrote:
>>> On Mon, 2017-03-27 at 12:09 -0400, Waiman Long wrote:
>>>> On 03/25/2017 07:01 PM, Torvald Riegel wrote:
>>>>> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
>>>>>> * Torvald Riegel:
>>>>>>
>>>>>>> +  bool registered_while_in_write_phase = false;
>>>>>>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
>>>>>>>      return 0;
>>>>>>> +  else
>>>>>>> +    registered_while_in_write_phase = true;
>>>>>> Sorry, this doesn't look quite right.  Isn't
>>>>>> registered_while_in_write_phase always true?
>>>>> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
>>>>> increase is due to reformatting, but I also adapted some of the
>>>>> comments.
>>>>> I get two failures, but I guess these are either due to the bad internet
>>>>> connectivity I currently have, or something at the resolver.
>>>>> FAIL: resolv/mtrace-tst-leaks
>>>>> FAIL: resolv/tst-leaks
>>>>>
>>>>>
>>>> I have verified that the v2 patch did fix the hang that I saw with my
>>>> microbenchmark. I also observed an increase in performance in the new
>>>> rwlock code compared with the old one before the major rewrite.
>>> Thanks!
>>>
>>>> On a
>>>> 4-socket 40-core 80-thread system, 80 parallel locking threads had an
>>>> average per-thread throughput of 32,584 ops/s. The old rwlock code had a
>>>> throughput of 13,411 only. So there is a more than 1.4X increase in
>>>> performance.
>>> Is that with the 50% reads / 50% writes workload (per thread), empty
>>> critical sections, and no delay between critical sections?
>>>
>> Yes, I used the default configuration of 1:1 read/write ratio. The
>> critical section isn't exactly empty as I used 1 pause instruction for
>> both in the critical section and between critical section.
>>
>> Regards,
>> Longman
>>
> Just found out that there is a regression in performance when in writer
> preferring mode. The average per-thread throughput was 4,733 ops/s with
> the old glibc, but 2,300 ops/s with the new implementation vs 32,584
> ops/s for the reader-preferring mode. It was said in the code that
> writer-preferring mode isn't the focus in the rewrite. So I am not
> saying that it is bad, but it is something to keep in mind about.
>
> Regards,
> Longman

Another issue that I saw was lock starvation. IOW, a continuous stream
of readers will block writers from acquiring the lock in
reader-preferring mode. In writer-preferring mode, a continuous stream
of writers will block readers from the acquiring the lock. You can see
that by using the -x option in my rwlock benchmark which will force the
separation of reader and writer threads instead of every thread doing
both reader and writer locks.

Again, this may not be an issue as most workloads won't have this kind
of extreme locking pattern.

Cheers,
Longman
  
Torvald Riegel April 6, 2017, 10:46 a.m. UTC | #6
On Mon, 2017-03-27 at 14:59 -0400, Waiman Long wrote:
> Just found out that there is a regression in performance when in writer
> preferring mode. The average per-thread throughput was 4,733 ops/s with
> the old glibc, but 2,300 ops/s with the new implementation vs 32,584
> ops/s for the reader-preferring mode.

Thanks for the note.  How large is the performance difference for
different numbers of threads?  Is the old rwlock always faster in
writer-preferring mode?
  
Torvald Riegel April 6, 2017, 10:50 a.m. UTC | #7
On Mon, 2017-03-27 at 15:10 -0400, Waiman Long wrote:
> On 03/27/2017 02:59 PM, Waiman Long wrote:
> > On 03/27/2017 02:16 PM, Waiman Long wrote:
> >> On 03/27/2017 01:53 PM, Torvald Riegel wrote:
> >>> On Mon, 2017-03-27 at 12:09 -0400, Waiman Long wrote:
> >>>> On 03/25/2017 07:01 PM, Torvald Riegel wrote:
> >>>>> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
> >>>>>> * Torvald Riegel:
> >>>>>>
> >>>>>>> +  bool registered_while_in_write_phase = false;
> >>>>>>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
> >>>>>>>      return 0;
> >>>>>>> +  else
> >>>>>>> +    registered_while_in_write_phase = true;
> >>>>>> Sorry, this doesn't look quite right.  Isn't
> >>>>>> registered_while_in_write_phase always true?
> >>>>> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
> >>>>> increase is due to reformatting, but I also adapted some of the
> >>>>> comments.
> >>>>> I get two failures, but I guess these are either due to the bad internet
> >>>>> connectivity I currently have, or something at the resolver.
> >>>>> FAIL: resolv/mtrace-tst-leaks
> >>>>> FAIL: resolv/tst-leaks
> >>>>>
> >>>>>
> >>>> I have verified that the v2 patch did fix the hang that I saw with my
> >>>> microbenchmark. I also observed an increase in performance in the new
> >>>> rwlock code compared with the old one before the major rewrite.
> >>> Thanks!
> >>>
> >>>> On a
> >>>> 4-socket 40-core 80-thread system, 80 parallel locking threads had an
> >>>> average per-thread throughput of 32,584 ops/s. The old rwlock code had a
> >>>> throughput of 13,411 only. So there is a more than 1.4X increase in
> >>>> performance.
> >>> Is that with the 50% reads / 50% writes workload (per thread), empty
> >>> critical sections, and no delay between critical sections?
> >>>
> >> Yes, I used the default configuration of 1:1 read/write ratio. The
> >> critical section isn't exactly empty as I used 1 pause instruction for
> >> both in the critical section and between critical section.
> >>
> >> Regards,
> >> Longman
> >>
> > Just found out that there is a regression in performance when in writer
> > preferring mode. The average per-thread throughput was 4,733 ops/s with
> > the old glibc, but 2,300 ops/s with the new implementation vs 32,584
> > ops/s for the reader-preferring mode. It was said in the code that
> > writer-preferring mode isn't the focus in the rewrite. So I am not
> > saying that it is bad, but it is something to keep in mind about.
> >
> > Regards,
> > Longman
> 
> Another issue that I saw was lock starvation. IOW, a continuous stream
> of readers will block writers from acquiring the lock in
> reader-preferring mode. In writer-preferring mode, a continuous stream
> of writers will block readers from the acquiring the lock. You can see
> that by using the -x option in my rwlock benchmark which will force the
> separation of reader and writer threads instead of every thread doing
> both reader and writer locks.

Yes, this is expected.  Whether that's a good thing or a bad thing
depends on what the user wants (e.g., if a rwlock is used to separate a
garbage collection process (writer) and normal operations (readers),
then maybe the user really wants to run GC just when there's nothing
else going on).  The modes we have currently strictly prefer readers or
writers; we have no modes for mostly preferring readers/writers while
also preventing starvation (for which we'd need to choose some latency
number).  OTOH, one could also argue that preferring readers/writers
should rather be the latter.
  
Torvald Riegel April 6, 2017, 10:51 a.m. UTC | #8
On Sun, 2017-03-26 at 00:01 +0100, Torvald Riegel wrote:
> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
> > * Torvald Riegel:
> > 
> > > +  bool registered_while_in_write_phase = false;
> > >    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
> > >      return 0;
> > > +  else
> > > +    registered_while_in_write_phase = true;
> > 
> > Sorry, this doesn't look quite right.  Isn't
> > registered_while_in_write_phase always true?
> 
> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
> increase is due to reformatting, but I also adapted some of the
> comments.
> I get two failures, but I guess these are either due to the bad internet
> connectivity I currently have, or something at the resolver.
> FAIL: resolv/mtrace-tst-leaks
> FAIL: resolv/tst-leaks
> 

Ping.
  
Carlos O'Donell July 28, 2017, 4:53 a.m. UTC | #9
On 03/25/2017 07:01 PM, Torvald Riegel wrote:
> On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
>> * Torvald Riegel:
>>
>>> +  bool registered_while_in_write_phase = false;
>>>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
>>>      return 0;
>>> +  else
>>> +    registered_while_in_write_phase = true;
>> Sorry, this doesn't look quite right.  Isn't
>> registered_while_in_write_phase always true?
> Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
> increase is due to reformatting, but I also adapted some of the
> comments.

I'm going to commit a v3.

v3
- Added new rwlock APIs for support/ framework.
- Ported test case to support/ framework.

This took me almost as long as the actual rwlock review to re-review because
I had to page everything back in about the read/write phase transitions and
convince myself of several assertions you make in the comments were actually
true.

The patch looks good and I have detailed testing notes below.
 
> commit 68cb32e0202eb8f3c647e56e0a2eb88a4e903e23
> Author: Torvald Riegel <triegel@redhat.com>
> Date:   Sat Mar 25 00:36:46 2017 +0100
> 
>     rwlock: Fix explicit hand-over.
>     
>     Without this patch, the rwlock can fail to execute the explicit hand-over
>     in certain cases (e.g., empty critical sections that switch quickly between
>     read and write phases).  This can then lead to errors in how __wrphase_futex
>     is accessed, which in turn can lead to deadlocks.
>     
>     	[BZ #21298]
>     	* nptl/pthread_rwlock_common.c (__pthread_rwlock_rdlock_full): Fix
>     	explicit hand-over.
>     	(__pthread_rwlock_wrlock_full): Likewise.
>     	* nptl/tst-rwlock20.c: New file.
>     	* nptl/Makefile: Add new test.
> 
> diff --git a/nptl/Makefile b/nptl/Makefile
> index 6d48c0c..0cc2873 100644
> --- a/nptl/Makefile
> +++ b/nptl/Makefile
> @@ -241,7 +241,7 @@ tests = tst-typesizes \
>  	tst-rwlock4 tst-rwlock5 tst-rwlock6 tst-rwlock7 tst-rwlock8 \
>  	tst-rwlock9 tst-rwlock10 tst-rwlock11 tst-rwlock12 tst-rwlock13 \
>  	tst-rwlock14 tst-rwlock15 tst-rwlock16 tst-rwlock17 tst-rwlock18 \
> -	tst-rwlock19 \
> +	tst-rwlock19 tst-rwlock20 \

OK.

This test only reliably failed for me when I used hardware with 8 or
more cores, at 16 cores it fails reliably 100% of the time. I expect this
has to do with having two to three cores completely unloaded for the test
to use those cores.

As examples a ppc64 box with 16 cores reproduces the issue with the
test case 100% of the time, similarly a 16 core x86_64 box, but at 8
cores the reproducer rate falls to ~80-95%, and at 4 cores it never
reproduces i.e. 0%. Just an interesting anecdote for anyone trying to
write such test cases and not finding that they reproduce. I'm sure
that with 4 cores it reproduces at some point in the fare future, but
I don't know what influences that.

Don't even think about reproducing this reliably on qemu/kvm guests,
even with 8 cores, the reproducer rate drops to ~10-16% on such hosts
given the timing variance.

Either way I'm happy to report that I can use the reproducer to successfully
test on the following hardware configurations and verified with thousands
of runs of the reproducer:

- ppc64, verified fixed with reproducer
- ppc64le, verified fixed with reproducer (qemu/kvm guest also tested)
- x86_64, verified fixed with reproducer
- i686, verified fixed with reproducer (x86_64 hardware)
- s390x, verified fixed with reproducer, and despite having only a 4 core
         system, the reproducer rate was ~1.6% (roughly 1 in 100 runs failed).
- armv7hl, verified fixed with reproducer, and despite having only a 4 core
         system, the reproducer rate was ~0.5% (roughtly 1 in 200 runs failed).
         Note: I had to try two different hardware boxes to find one with the
	       ~0.5% failure rate, the other box never failed the reproducer,
	       so there is some aspect of hardware timing there.

I could not get the reproder to verify the fix on this hardware, but also it
means no regressions were seen:

- aarch64, could not verify fixed (used 32 core box)

This looks very good to me indicating that we probably have caught the bug
for the reader/writer phase transition.

>  	tst-once1 tst-once2 tst-once3 tst-once4 tst-once5 \
>  	tst-key1 tst-key2 tst-key3 tst-key4 \
>  	tst-sem1 tst-sem2 tst-sem3 tst-sem4 tst-sem5 tst-sem6 tst-sem7 \
> diff --git a/nptl/pthread_rwlock_common.c b/nptl/pthread_rwlock_common.c
> index 256508c..c07eae4 100644
> --- a/nptl/pthread_rwlock_common.c
> +++ b/nptl/pthread_rwlock_common.c
> @@ -70,15 +70,16 @@
>     ---------------------------
>     #1    0   0   0   0   Lock is idle (and in a read phase).
>     #2    0   0   >0  0   Readers have acquired the lock.
> -   #3    0   1   0   0   Lock is not acquired; a writer is waiting for a write
> -			 phase to start or will try to start one.
> +   #3    0   1   0   0   Lock is not acquired; a writer will try to start a
> +			 write phase.

OK.

>     #4    0   1   >0  0   Readers have acquired the lock; a writer is waiting
>  			 and explicit hand-over to the writer is required.
>     #4a   0   1   >0  1   Same as #4 except that there are further readers
>  			 waiting because the writer is to be preferred.
>     #5    1   0   0   0   Lock is idle (and in a write phase).
> -   #6    1   0   >0  0   Write phase; readers are waiting for a read phase to
> -			 start or will try to start one.
> +   #6    1   0   >0  0   Write phase; readers will try to start a read phase
> +			 (requires explicit hand-over to all readers that
> +			 do not start the read phase).

OK.

>     #7    1   1   0   0   Lock is acquired by a writer.
>     #8    1   1   >0  0   Lock acquired by a writer and readers are waiting;
>  			 explicit hand-over to the readers is required.
> @@ -375,9 +376,9 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
>       complexity.  */
>    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
>      return 0;
> -
> -  /* If there is no primary writer but we are in a write phase, we can try
> -     to install a read phase ourself.  */
> +  /* Otherwise, if we were in a write phase (states #6 or #8), we must wait
> +     for explicit hand-over of the read phase; the only exception is if we
> +     can start a read phase if there is no primary writer currently.  */



>    while (((r & PTHREAD_RWLOCK_WRPHASE) != 0)
>        && ((r & PTHREAD_RWLOCK_WRLOCKED) == 0))
>      {
> @@ -390,15 +391,18 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
>  	{
>  	  /* We started the read phase, so we are also responsible for
>  	     updating the write-phase futex.  Relaxed MO is sufficient.
> -	     Note that there can be no other reader that we have to wake
> -	     because all other readers will see the read phase started by us
> -	     (or they will try to start it themselves); if a writer started
> -	     the read phase, we cannot have started it.  Furthermore, we
> -	     cannot discard a PTHREAD_RWLOCK_FUTEX_USED flag because we will
> -	     overwrite the value set by the most recent writer (or the readers
> -	     before it in case of explicit hand-over) and we know that there
> -	     are no waiting readers.  */
> -	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 0);
> +	     We have to do the same steps as a writer would when handing
> +	     over the read phase to us because other readers cannot
> +	     distinguish between us and the writer; this includes
> +	     explicit hand-over and potentially having to wake other readers
> +	     (but we can pretend to do the setting and unsetting of WRLOCKED
> +	     atomically, and thus can skip this step).  */
> +	  if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
> +	      & PTHREAD_RWLOCK_FUTEX_USED) != 0)
> +	    {
> +	      int private = __pthread_rwlock_get_private (rwlock);
> +	      futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
> +	    }

OK.

>  	  return 0;
>  	}
>        else
> @@ -407,102 +411,98 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
>  	}
>      }
>  
> -  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
> +  /* We were in a write phase but did not install the read phase.  We cannot
> +     distinguish between a writer and another reader starting the read phase,
> +     so we must wait for explicit hand-over via __wrphase_futex.
> +     However, __wrphase_futex might not have been set to 1 yet (either
> +     because explicit hand-over to the writer is still ongoing, or because
> +     the writer has started the write phase but does not yet have updated

s/does not yet have/has not yet/g

> +     __wrphase_futex).  The least recent value of __wrphase_futex we can
> +     read from here is the modification of the last read phase (because
> +     we synchronize with the last reader in this read phase through
> +     __readers; see the use of acquire MO on the fetch_add above).
> +     Therefore, if we observe a value of 0 for __wrphase_futex, we need
> +     to subsequently check that __readers now indicates a read phase; we
> +     need to use acquire MO for this so that if we observe a read phase,
> +     we will also see the modification of __wrphase_futex by the previous
> +     writer.  We then need to load __wrphase_futex again and continue to
> +     wait if it is not 0, so that we do not skip explicit hand-over.
> +     Relaxed MO is sufficient for the load from __wrphase_futex because
> +     we just use it as an indicator for when we can proceed; we use
> +     __readers and the acquire MO accesses to it to eventually read from
> +     the proper stores to __wrphase_futex.  */

OK.

> +  unsigned int wpf;
> +  bool ready = false;

OK.

> +  for (;;)
>      {
> -      /* We are in a write phase, and there must be a primary writer because
> -	 of the previous loop.  Block until the primary writer gives up the
> -	 write phase.  This case requires explicit hand-over using
> -	 __wrphase_futex.
> -	 However, __wrphase_futex might not have been set to 1 yet (either
> -	 because explicit hand-over to the writer is still ongoing, or because
> -	 the writer has started the write phase but does not yet have updated
> -	 __wrphase_futex).  The least recent value of __wrphase_futex we can
> -	 read from here is the modification of the last read phase (because
> -	 we synchronize with the last reader in this read phase through
> -	 __readers; see the use of acquire MO on the fetch_add above).
> -	 Therefore, if we observe a value of 0 for __wrphase_futex, we need
> -	 to subsequently check that __readers now indicates a read phase; we
> -	 need to use acquire MO for this so that if we observe a read phase,
> -	 we will also see the modification of __wrphase_futex by the previous
> -	 writer.  We then need to load __wrphase_futex again and continue to
> -	 wait if it is not 0, so that we do not skip explicit hand-over.
> -	 Relaxed MO is sufficient for the load from __wrphase_futex because
> -	 we just use it as an indicator for when we can proceed; we use
> -	 __readers and the acquire MO accesses to it to eventually read from
> -	 the proper stores to __wrphase_futex.  */
> -      unsigned int wpf;
> -      bool ready = false;
> -      for (;;)
> +      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
> +	  | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
>  	{
> -	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
> -	      | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
> +	  int private = __pthread_rwlock_get_private (rwlock);
> +	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
> +	      && !atomic_compare_exchange_weak_relaxed
> +		  (&rwlock->__data.__wrphase_futex,
> +		   &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
> +	    continue;
> +	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
> +	      1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
> +	  if (err == ETIMEDOUT)
>  	    {
> -	      int private = __pthread_rwlock_get_private (rwlock);
> -	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
> -		  && !atomic_compare_exchange_weak_relaxed
> -		      (&rwlock->__data.__wrphase_futex,
> -		       &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
> -		continue;
> -	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
> -		  1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
> -	      if (err == ETIMEDOUT)
> +	      /* If we timed out, we need to unregister.  If no read phase
> +		 has been installed while we waited, we can just decrement
> +		 the number of readers.  Otherwise, we just acquire the
> +		 lock, which is allowed because we give no precise timing
> +		 guarantees, and because the timeout is only required to
> +		 be in effect if we would have had to wait for other
> +		 threads (e.g., if futex_wait would time-out immediately
> +		 because the given absolute time is in the past).  */
> +	      r = atomic_load_relaxed (&rwlock->__data.__readers);
> +	      while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
>  		{
> -		  /* If we timed out, we need to unregister.  If no read phase
> -		     has been installed while we waited, we can just decrement
> -		     the number of readers.  Otherwise, we just acquire the
> -		     lock, which is allowed because we give no precise timing
> -		     guarantees, and because the timeout is only required to
> -		     be in effect if we would have had to wait for other
> -		     threads (e.g., if futex_wait would time-out immediately
> -		     because the given absolute time is in the past).  */
> -		  r = atomic_load_relaxed (&rwlock->__data.__readers);
> -		  while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
> -		    {
> -		      /* We don't need to make anything else visible to
> -			 others besides unregistering, so relaxed MO is
> -			 sufficient.  */
> -		      if (atomic_compare_exchange_weak_relaxed
> -			  (&rwlock->__data.__readers, &r,
> -			   r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
> -			return ETIMEDOUT;
> -		      /* TODO Back-off.  */
> -		    }
> -		  /* Use the acquire MO fence to mirror the steps taken in the
> -		     non-timeout case.  Note that the read can happen both
> -		     in the atomic_load above as well as in the failure case
> -		     of the CAS operation.  */
> -		  atomic_thread_fence_acquire ();
> -		  /* We still need to wait for explicit hand-over, but we must
> -		     not use futex_wait anymore because we would just time out
> -		     in this case and thus make the spin-waiting we need
> -		     unnecessarily expensive.  */
> -		  while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
> -		      | PTHREAD_RWLOCK_FUTEX_USED)
> -		      == (1 | PTHREAD_RWLOCK_FUTEX_USED))
> -		    {
> -		      /* TODO Back-off?  */
> -		    }
> -		  ready = true;
> -		  break;
> +		  /* We don't need to make anything else visible to
> +		     others besides unregistering, so relaxed MO is
> +		     sufficient.  */
> +		  if (atomic_compare_exchange_weak_relaxed
> +		      (&rwlock->__data.__readers, &r,
> +		       r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
> +		    return ETIMEDOUT;
> +		  /* TODO Back-off.  */
> +		}
> +	      /* Use the acquire MO fence to mirror the steps taken in the
> +		 non-timeout case.  Note that the read can happen both
> +		 in the atomic_load above as well as in the failure case
> +		 of the CAS operation.  */
> +	      atomic_thread_fence_acquire ();
> +	      /* We still need to wait for explicit hand-over, but we must
> +		 not use futex_wait anymore because we would just time out
> +		 in this case and thus make the spin-waiting we need
> +		 unnecessarily expensive.  */
> +	      while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
> +		  | PTHREAD_RWLOCK_FUTEX_USED)
> +		  == (1 | PTHREAD_RWLOCK_FUTEX_USED))
> +		{
> +		  /* TODO Back-off?  */
>  		}
> -	      /* If we got interrupted (EINTR) or the futex word does not have the
> -		 expected value (EAGAIN), retry.  */
> +	      ready = true;
> +	      break;

OK. Though the refactoring makes this final patch harder to read.

>  	    }
> -	  if (ready)
> -	    /* See below.  */
> -	    break;
> -	  /* We need acquire MO here so that we synchronize with the lock
> -	     release of the writer, and so that we observe a recent value of
> -	     __wrphase_futex (see below).  */
> -	  if ((atomic_load_acquire (&rwlock->__data.__readers)
> -	      & PTHREAD_RWLOCK_WRPHASE) == 0)
> -	    /* We are in a read phase now, so the least recent modification of
> -	       __wrphase_futex we can read from is the store by the writer
> -	       with value 1.  Thus, only now we can assume that if we observe
> -	       a value of 0, explicit hand-over is finished. Retry the loop
> -	       above one more time.  */
> -	    ready = true;
> +	  /* If we got interrupted (EINTR) or the futex word does not have the
> +	     expected value (EAGAIN), retry.  */

OK.

>  	}
> +      if (ready)
> +	/* See below.  */
> +	break;
> +      /* We need acquire MO here so that we synchronize with the lock
> +	 release of the writer, and so that we observe a recent value of
> +	 __wrphase_futex (see below).  */
> +      if ((atomic_load_acquire (&rwlock->__data.__readers)
> +	  & PTHREAD_RWLOCK_WRPHASE) == 0)
> +	/* We are in a read phase now, so the least recent modification of
> +	   __wrphase_futex we can read from is the store by the writer
> +	   with value 1.  Thus, only now we can assume that if we observe
> +	   a value of 0, explicit hand-over is finished. Retry the loop
> +	   above one more time.  */
> +	ready = true;

OK.

>      }
>  
>    return 0;
> @@ -741,10 +741,23 @@ __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
>  	  r = atomic_load_relaxed (&rwlock->__data.__readers);
>  	}
>        /* Our snapshot of __readers is up-to-date at this point because we
> -	 either set WRLOCKED using a CAS or were handed over WRLOCKED from
> +	 either set WRLOCKED using a CAS (and update r accordingly below,
> +	 which was used as expected value for the CAS) or got WRLOCKED from
>  	 another writer whose snapshot of __readers we inherit.  */
> +      r |= PTHREAD_RWLOCK_WRLOCKED;
>      }
>  
> +  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
> +     MO is sufficient for futex words; acquire MO on the previous
> +     modifications of __readers ensures that this store happens after the
> +     store of value 0 by the previous primary writer.  */
> +  atomic_store_relaxed (&rwlock->__data.__writers_futex,
> +      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));

OK.

> +
> +  /* If we are in a write phase, we have acquired the lock.  */
> +  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
> +    goto done;
> +
>    /* If we are in a read phase and there are no readers, try to start a write
>       phase.  */
>    while (((r & PTHREAD_RWLOCK_WRPHASE) == 0)
> @@ -758,166 +771,156 @@ __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
>  	  &r, r | PTHREAD_RWLOCK_WRPHASE))
>  	{
>  	  /* We have started a write phase, so need to enable readers to wait.
> -	     See the similar case in__pthread_rwlock_rdlock_full.  */
> +	     See the similar case in __pthread_rwlock_rdlock_full.  Unlike in
> +	     that similar case, we are the (only) primary writer and so do
> +	     not need to wake another writer.  */
>  	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 1);
> -	  /* Make sure we fall through to the end of the function.  */
> -	  r |= PTHREAD_RWLOCK_WRPHASE;
> -	  break;
> +
> +	  goto done;
>  	}
>        /* TODO Back-off.  */
>      }
>  
> -  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
> -     MO is sufficient for futex words; acquire MO on the previous
> -     modifications of __readers ensures that this store happens after the
> -     store of value 0 by the previous primary writer.  */
> -  atomic_store_relaxed (&rwlock->__data.__writers_futex,
> -      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
> -
> -  if (__glibc_unlikely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
> +  /* We became the primary writer in a read phase and there were readers when
> +     we did (because of the previous loop).  Thus, we have to wait for
> +     explicit hand-over from one of these readers.
> +     We basically do the same steps as for the similar case in
> +     __pthread_rwlock_rdlock_full, except that we additionally might try
> +     to directly hand over to another writer and need to wake up
> +     other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
> +  unsigned int wpf;
> +  bool ready = false;

OK.

> +  for (;;)
>      {
> -      /* We are not in a read phase and there are readers (because of the
> -	 previous loop).  Thus, we have to wait for explicit hand-over from
> -	 one of these readers.
> -	 We basically do the same steps as for the similar case in
> -	 __pthread_rwlock_rdlock_full, except that we additionally might try
> -	 to directly hand over to another writer and need to wake up
> -	 other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
> -      unsigned int wpf;
> -      bool ready = false;
> -      for (;;)
> +      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
> +	  | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
>  	{
> -	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
> -	      | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
> +	  int private = __pthread_rwlock_get_private (rwlock);
> +	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
> +	      && !atomic_compare_exchange_weak_relaxed
> +		  (&rwlock->__data.__wrphase_futex, &wpf,
> +		   PTHREAD_RWLOCK_FUTEX_USED))
> +	    continue;
> +	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
> +	      PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
> +	  if (err == ETIMEDOUT)
>  	    {
> -	      int private = __pthread_rwlock_get_private (rwlock);
> -	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
> -		  && !atomic_compare_exchange_weak_relaxed
> -		      (&rwlock->__data.__wrphase_futex, &wpf,
> -		       PTHREAD_RWLOCK_FUTEX_USED))
> -		continue;
> -	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
> -		  PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
> -	      if (err == ETIMEDOUT)
> +	      if (rwlock->__data.__flags
> +		  != PTHREAD_RWLOCK_PREFER_READER_NP)
>  		{
> -		  if (rwlock->__data.__flags
> -		      != PTHREAD_RWLOCK_PREFER_READER_NP)
> -		    {
> -		      /* We try writer--writer hand-over.  */
> -		      unsigned int w = atomic_load_relaxed
> -			  (&rwlock->__data.__writers);
> -		      if (w != 0)
> -			{
> -			  /* We are about to hand over WRLOCKED, so we must
> -			     release __writers_futex too; otherwise, we'd have
> -			     a pending store, which could at least prevent
> -			     other threads from waiting using the futex
> -			     because it could interleave with the stores
> -			     by subsequent writers.  In turn, this means that
> -			     we have to clean up when we do not hand over
> -			     WRLOCKED.
> -			     Release MO so that another writer that gets
> -			     WRLOCKED from us can take over our view of
> -			     __readers.  */
> -			  unsigned int wf = atomic_exchange_relaxed
> -			      (&rwlock->__data.__writers_futex, 0);
> -			  while (w != 0)
> -			    {
> -			      if (atomic_compare_exchange_weak_release
> -				  (&rwlock->__data.__writers, &w,
> -				      w | PTHREAD_RWLOCK_WRHANDOVER))
> -				{
> -				  /* Wake other writers.  */
> -				  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
> -				    futex_wake
> -					(&rwlock->__data.__writers_futex, 1,
> -					 private);
> -				  return ETIMEDOUT;
> -				}
> -			      /* TODO Back-off.  */
> -			    }
> -			  /* We still own WRLOCKED and someone else might set
> -			     a write phase concurrently, so enable waiting
> -			     again.  Make sure we don't loose the flag that
> -			     signals whether there are threads waiting on
> -			     this futex.  */
> -			  atomic_store_relaxed
> -			      (&rwlock->__data.__writers_futex, wf);
> -			}
> -		    }
> -		  /* If we timed out and we are not in a write phase, we can
> -		     just stop being a primary writer.  Otherwise, we just
> -		     acquire the lock.  */
> -		  r = atomic_load_relaxed (&rwlock->__data.__readers);
> -		  if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
> +		  /* We try writer--writer hand-over.  */
> +		  unsigned int w = atomic_load_relaxed
> +		      (&rwlock->__data.__writers);
> +		  if (w != 0)
>  		    {
> -		      /* We are about to release WRLOCKED, so we must release
> -			 __writers_futex too; see the handling of
> -			 writer--writer hand-over above.  */
> +		      /* We are about to hand over WRLOCKED, so we must
> +			 release __writers_futex too; otherwise, we'd have
> +			 a pending store, which could at least prevent
> +			 other threads from waiting using the futex
> +			 because it could interleave with the stores
> +			 by subsequent writers.  In turn, this means that
> +			 we have to clean up when we do not hand over
> +			 WRLOCKED.
> +			 Release MO so that another writer that gets
> +			 WRLOCKED from us can take over our view of
> +			 __readers.  */
>  		      unsigned int wf = atomic_exchange_relaxed
>  			  (&rwlock->__data.__writers_futex, 0);
> -		      while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
> +		      while (w != 0)
>  			{
> -			  /* While we don't need to make anything from a
> -			     caller's critical section visible to other
> -			     threads, we need to ensure that our changes to
> -			     __writers_futex are properly ordered.
> -			     Therefore, use release MO to synchronize with
> -			     subsequent primary writers.  Also wake up any
> -			     waiting readers as they are waiting because of
> -			     us.  */
>  			  if (atomic_compare_exchange_weak_release
> -			      (&rwlock->__data.__readers, &r,
> -			       (r ^ PTHREAD_RWLOCK_WRLOCKED)
> -			       & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
> +			      (&rwlock->__data.__writers, &w,
> +				  w | PTHREAD_RWLOCK_WRHANDOVER))
>  			    {
>  			      /* Wake other writers.  */
>  			      if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
>  				futex_wake (&rwlock->__data.__writers_futex,
> -				    1, private);
> -			      /* Wake waiting readers.  */
> -			      if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
> -				futex_wake (&rwlock->__data.__readers,
> -				    INT_MAX, private);
> +					    1, private);
>  			      return ETIMEDOUT;
>  			    }
> +			  /* TODO Back-off.  */
>  			}
> -		      /* We still own WRLOCKED and someone else might set a
> -			 write phase concurrently, so enable waiting again.
> -			 Make sure we don't loose the flag that signals
> -			 whether there are threads waiting on this futex.  */
> -		      atomic_store_relaxed (&rwlock->__data.__writers_futex,
> -			  wf);
> +		      /* We still own WRLOCKED and someone else might set
> +			 a write phase concurrently, so enable waiting
> +			 again.  Make sure we don't loose the flag that
> +			 signals whether there are threads waiting on
> +			 this futex.  */
> +		      atomic_store_relaxed
> +			  (&rwlock->__data.__writers_futex, wf);
>  		    }
> -		  /* Use the acquire MO fence to mirror the steps taken in the
> -		     non-timeout case.  Note that the read can happen both
> -		     in the atomic_load above as well as in the failure case
> -		     of the CAS operation.  */
> -		  atomic_thread_fence_acquire ();
> -		  /* We still need to wait for explicit hand-over, but we must
> -		     not use futex_wait anymore.  */
> -		  while ((atomic_load_relaxed
> -		      (&rwlock->__data.__wrphase_futex)
> -		       | PTHREAD_RWLOCK_FUTEX_USED)
> -		      == PTHREAD_RWLOCK_FUTEX_USED)
> +		}
> +	      /* If we timed out and we are not in a write phase, we can
> +		 just stop being a primary writer.  Otherwise, we just
> +		 acquire the lock.  */
> +	      r = atomic_load_relaxed (&rwlock->__data.__readers);
> +	      if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
> +		{
> +		  /* We are about to release WRLOCKED, so we must release
> +		     __writers_futex too; see the handling of
> +		     writer--writer hand-over above.  */
> +		  unsigned int wf = atomic_exchange_relaxed
> +		      (&rwlock->__data.__writers_futex, 0);
> +		  while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
>  		    {
> -		      /* TODO Back-off.  */
> +		      /* While we don't need to make anything from a
> +			 caller's critical section visible to other
> +			 threads, we need to ensure that our changes to
> +			 __writers_futex are properly ordered.
> +			 Therefore, use release MO to synchronize with
> +			 subsequent primary writers.  Also wake up any
> +			 waiting readers as they are waiting because of
> +			 us.  */
> +		      if (atomic_compare_exchange_weak_release
> +			  (&rwlock->__data.__readers, &r,
> +			   (r ^ PTHREAD_RWLOCK_WRLOCKED)
> +			   & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
> +			{
> +			  /* Wake other writers.  */
> +			  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
> +			    futex_wake (&rwlock->__data.__writers_futex,
> +				1, private);
> +			  /* Wake waiting readers.  */
> +			  if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
> +			    futex_wake (&rwlock->__data.__readers,
> +				INT_MAX, private);
> +			  return ETIMEDOUT;
> +			}
>  		    }
> -		  ready = true;
> -		  break;
> +		  /* We still own WRLOCKED and someone else might set a
> +		     write phase concurrently, so enable waiting again.
> +		     Make sure we don't loose the flag that signals
> +		     whether there are threads waiting on this futex.  */
> +		  atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);

OK.

>  		}
> -	      /* If we got interrupted (EINTR) or the futex word does not have
> -		 the expected value (EAGAIN), retry.  */
> +	      /* Use the acquire MO fence to mirror the steps taken in the
> +		 non-timeout case.  Note that the read can happen both
> +		 in the atomic_load above as well as in the failure case
> +		 of the CAS operation.  */
> +	      atomic_thread_fence_acquire ();
> +	      /* We still need to wait for explicit hand-over, but we must
> +		 not use futex_wait anymore.  */
> +	      while ((atomic_load_relaxed
> +		  (&rwlock->__data.__wrphase_futex)
> +		   | PTHREAD_RWLOCK_FUTEX_USED)
> +		  == PTHREAD_RWLOCK_FUTEX_USED)
> +		{
> +		  /* TODO Back-off.  */
> +		}
> +	      ready = true;
> +	      break;
>  	    }
> -	  /* See pthread_rwlock_rdlock_full.  */
> -	  if (ready)
> -	    break;
> -	  if ((atomic_load_acquire (&rwlock->__data.__readers)
> -	      & PTHREAD_RWLOCK_WRPHASE) != 0)
> -	    ready = true;
> +	  /* If we got interrupted (EINTR) or the futex word does not have
> +	     the expected value (EAGAIN), retry.  */
>  	}
> +      /* See pthread_rwlock_rdlock_full.  */
> +      if (ready)
> +	break;
> +      if ((atomic_load_acquire (&rwlock->__data.__readers)
> +	  & PTHREAD_RWLOCK_WRPHASE) != 0)
> +	ready = true;
>      }
>  
> + done:

OK.

>    atomic_store_relaxed (&rwlock->__data.__cur_writer,
>        THREAD_GETMEM (THREAD_SELF, tid));
>    return 0;
> diff --git a/nptl/tst-rwlock20.c b/nptl/tst-rwlock20.c
> new file mode 100644
> index 0000000..fc636e3
> --- /dev/null
> +++ b/nptl/tst-rwlock20.c
> @@ -0,0 +1,128 @@
> +/* Test program for a read-phase / write-phase explicit hand-over.

OK.

> +   Copyright (C) 2000-2017 Free Software Foundation, Inc.

Fixed, should only be 2017. This is a new test.

> +
> +   The GNU C Library is free software; you can redistribute it and/or
> +   modify it under the terms of the GNU Lesser General Public License as
> +   published by the Free Software Foundation; either version 2.1 of the
> +   License, or (at your option) any later version.
> +
> +   The GNU C Library is distributed in the hope that it will be useful,
> +   but WITHOUT ANY WARRANTY; without even the implied warranty of
> +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +   Lesser General Public License for more details.
> +
> +   You should have received a copy of the GNU Lesser General Public
> +   License along with the GNU C Library; see the file COPYING.LIB.  If
> +   not, see <http://www.gnu.org/licenses/>.  */
> +
> +#include <errno.h>
> +#include <error.h>
> +#include <pthread.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#include <stdint.h>
> +#include <time.h>
> +#include <atomic.h>
> +
> +#define THREADS 2

I incremented this to 3, and wrote up some detailed testing notes here
which I will commit with the changes.

> +
> +#define KIND PTHREAD_RWLOCK_PREFER_READER_NP
> +
> +static pthread_rwlock_t lock;
> +static int done = 0;
> +
> +static void*
> +tf (void* arg)
> +{
> +  while (atomic_load_relaxed (&done) == 0)
> +    {
> +      int rcnt = 0;
> +      int wcnt = 100;
> +      if ((uintptr_t) arg == 0)
> +	{
> +	  rcnt = 1;
> +	  wcnt = 1;
> +	}
> +
> +      do
> +	{
> +	  if (wcnt)
> +	    {
> +	      pthread_rwlock_wrlock (&lock);
> +	      pthread_rwlock_unlock (&lock);
> +	      wcnt--;
> +	  }
> +	  if (rcnt)
> +	    {
> +	      pthread_rwlock_rdlock (&lock);
> +	      pthread_rwlock_unlock (&lock);
> +	      rcnt--;
> +	  }
> +	}
> +      while ((atomic_load_relaxed (&done) == 0) && (rcnt + wcnt > 0));
> +
> +    }
> +    return NULL;
> +}
> +
> +
> +
> +static int
> +do_test (void)
> +{
> +  pthread_t thr[THREADS];
> +  int n;
> +  void *res;
> +  pthread_rwlockattr_t a;
> +
> +  if (pthread_rwlockattr_init (&a) != 0)
> +    {
> +      puts ("rwlockattr_t failed");
> +      exit (1);
> +    }
> +
> +  if (pthread_rwlockattr_setkind_np (&a, KIND) != 0)
> +    {
> +      puts ("rwlockattr_setkind failed");
> +      exit (1);
> +    }
> +
> +  if (pthread_rwlock_init (&lock, &a) != 0)
> +    {
> +      puts ("rwlock_init failed");
> +      exit (1);
> +    }
> +
> +  /* Make standard error the same as standard output.  */
> +  dup2 (1, 2);
> +
> +  /* Make sure we see all message, even those on stdout.  */
> +  setvbuf (stdout, NULL, _IONBF, 0);
> +
> +  for (n = 0; n < THREADS; ++n)
> +    if (pthread_create (&thr[n], NULL, tf, (void *) (uintptr_t) n) != 0)
> +      {
> +	puts ("thread create failed");
> +	exit (1);
> +      }
> +  struct timespec delay;
> +  delay.tv_sec = 10;
> +  delay.tv_nsec = 0;
> +  nanosleep (&delay, NULL);
> +  atomic_store_relaxed (&done, 1);
> +
> +  /* Wait for all the threads.  */
> +  for (n = 0; n < THREADS; ++n)
> +    if (pthread_join (thr[n], &res) != 0)
> +      {
> +	puts ("writer join failed");
> +	exit (1);
> +      }
> +
> +  return 0;
> +}
> +
> +#define TIMEOUT 15
> +#define TEST_FUNCTION do_test ()
> +#include "../test-skeleton.c"

I converted this to use support/* testing.
  

Patch

commit 68cb32e0202eb8f3c647e56e0a2eb88a4e903e23
Author: Torvald Riegel <triegel@redhat.com>
Date:   Sat Mar 25 00:36:46 2017 +0100

    rwlock: Fix explicit hand-over.
    
    Without this patch, the rwlock can fail to execute the explicit hand-over
    in certain cases (e.g., empty critical sections that switch quickly between
    read and write phases).  This can then lead to errors in how __wrphase_futex
    is accessed, which in turn can lead to deadlocks.
    
    	[BZ #21298]
    	* nptl/pthread_rwlock_common.c (__pthread_rwlock_rdlock_full): Fix
    	explicit hand-over.
    	(__pthread_rwlock_wrlock_full): Likewise.
    	* nptl/tst-rwlock20.c: New file.
    	* nptl/Makefile: Add new test.

diff --git a/nptl/Makefile b/nptl/Makefile
index 6d48c0c..0cc2873 100644
--- a/nptl/Makefile
+++ b/nptl/Makefile
@@ -241,7 +241,7 @@  tests = tst-typesizes \
 	tst-rwlock4 tst-rwlock5 tst-rwlock6 tst-rwlock7 tst-rwlock8 \
 	tst-rwlock9 tst-rwlock10 tst-rwlock11 tst-rwlock12 tst-rwlock13 \
 	tst-rwlock14 tst-rwlock15 tst-rwlock16 tst-rwlock17 tst-rwlock18 \
-	tst-rwlock19 \
+	tst-rwlock19 tst-rwlock20 \
 	tst-once1 tst-once2 tst-once3 tst-once4 tst-once5 \
 	tst-key1 tst-key2 tst-key3 tst-key4 \
 	tst-sem1 tst-sem2 tst-sem3 tst-sem4 tst-sem5 tst-sem6 tst-sem7 \
diff --git a/nptl/pthread_rwlock_common.c b/nptl/pthread_rwlock_common.c
index 256508c..c07eae4 100644
--- a/nptl/pthread_rwlock_common.c
+++ b/nptl/pthread_rwlock_common.c
@@ -70,15 +70,16 @@ 
    ---------------------------
    #1    0   0   0   0   Lock is idle (and in a read phase).
    #2    0   0   >0  0   Readers have acquired the lock.
-   #3    0   1   0   0   Lock is not acquired; a writer is waiting for a write
-			 phase to start or will try to start one.
+   #3    0   1   0   0   Lock is not acquired; a writer will try to start a
+			 write phase.
    #4    0   1   >0  0   Readers have acquired the lock; a writer is waiting
 			 and explicit hand-over to the writer is required.
    #4a   0   1   >0  1   Same as #4 except that there are further readers
 			 waiting because the writer is to be preferred.
    #5    1   0   0   0   Lock is idle (and in a write phase).
-   #6    1   0   >0  0   Write phase; readers are waiting for a read phase to
-			 start or will try to start one.
+   #6    1   0   >0  0   Write phase; readers will try to start a read phase
+			 (requires explicit hand-over to all readers that
+			 do not start the read phase).
    #7    1   1   0   0   Lock is acquired by a writer.
    #8    1   1   >0  0   Lock acquired by a writer and readers are waiting;
 			 explicit hand-over to the readers is required.
@@ -375,9 +376,9 @@  __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
      complexity.  */
   if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
     return 0;
-
-  /* If there is no primary writer but we are in a write phase, we can try
-     to install a read phase ourself.  */
+  /* Otherwise, if we were in a write phase (states #6 or #8), we must wait
+     for explicit hand-over of the read phase; the only exception is if we
+     can start a read phase if there is no primary writer currently.  */
   while (((r & PTHREAD_RWLOCK_WRPHASE) != 0)
       && ((r & PTHREAD_RWLOCK_WRLOCKED) == 0))
     {
@@ -390,15 +391,18 @@  __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
 	{
 	  /* We started the read phase, so we are also responsible for
 	     updating the write-phase futex.  Relaxed MO is sufficient.
-	     Note that there can be no other reader that we have to wake
-	     because all other readers will see the read phase started by us
-	     (or they will try to start it themselves); if a writer started
-	     the read phase, we cannot have started it.  Furthermore, we
-	     cannot discard a PTHREAD_RWLOCK_FUTEX_USED flag because we will
-	     overwrite the value set by the most recent writer (or the readers
-	     before it in case of explicit hand-over) and we know that there
-	     are no waiting readers.  */
-	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 0);
+	     We have to do the same steps as a writer would when handing
+	     over the read phase to us because other readers cannot
+	     distinguish between us and the writer; this includes
+	     explicit hand-over and potentially having to wake other readers
+	     (but we can pretend to do the setting and unsetting of WRLOCKED
+	     atomically, and thus can skip this step).  */
+	  if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
+	      & PTHREAD_RWLOCK_FUTEX_USED) != 0)
+	    {
+	      int private = __pthread_rwlock_get_private (rwlock);
+	      futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
+	    }
 	  return 0;
 	}
       else
@@ -407,102 +411,98 @@  __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
 	}
     }
 
-  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
+  /* We were in a write phase but did not install the read phase.  We cannot
+     distinguish between a writer and another reader starting the read phase,
+     so we must wait for explicit hand-over via __wrphase_futex.
+     However, __wrphase_futex might not have been set to 1 yet (either
+     because explicit hand-over to the writer is still ongoing, or because
+     the writer has started the write phase but does not yet have updated
+     __wrphase_futex).  The least recent value of __wrphase_futex we can
+     read from here is the modification of the last read phase (because
+     we synchronize with the last reader in this read phase through
+     __readers; see the use of acquire MO on the fetch_add above).
+     Therefore, if we observe a value of 0 for __wrphase_futex, we need
+     to subsequently check that __readers now indicates a read phase; we
+     need to use acquire MO for this so that if we observe a read phase,
+     we will also see the modification of __wrphase_futex by the previous
+     writer.  We then need to load __wrphase_futex again and continue to
+     wait if it is not 0, so that we do not skip explicit hand-over.
+     Relaxed MO is sufficient for the load from __wrphase_futex because
+     we just use it as an indicator for when we can proceed; we use
+     __readers and the acquire MO accesses to it to eventually read from
+     the proper stores to __wrphase_futex.  */
+  unsigned int wpf;
+  bool ready = false;
+  for (;;)
     {
-      /* We are in a write phase, and there must be a primary writer because
-	 of the previous loop.  Block until the primary writer gives up the
-	 write phase.  This case requires explicit hand-over using
-	 __wrphase_futex.
-	 However, __wrphase_futex might not have been set to 1 yet (either
-	 because explicit hand-over to the writer is still ongoing, or because
-	 the writer has started the write phase but does not yet have updated
-	 __wrphase_futex).  The least recent value of __wrphase_futex we can
-	 read from here is the modification of the last read phase (because
-	 we synchronize with the last reader in this read phase through
-	 __readers; see the use of acquire MO on the fetch_add above).
-	 Therefore, if we observe a value of 0 for __wrphase_futex, we need
-	 to subsequently check that __readers now indicates a read phase; we
-	 need to use acquire MO for this so that if we observe a read phase,
-	 we will also see the modification of __wrphase_futex by the previous
-	 writer.  We then need to load __wrphase_futex again and continue to
-	 wait if it is not 0, so that we do not skip explicit hand-over.
-	 Relaxed MO is sufficient for the load from __wrphase_futex because
-	 we just use it as an indicator for when we can proceed; we use
-	 __readers and the acquire MO accesses to it to eventually read from
-	 the proper stores to __wrphase_futex.  */
-      unsigned int wpf;
-      bool ready = false;
-      for (;;)
+      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
+	  | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
 	{
-	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
-	      | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
+	  int private = __pthread_rwlock_get_private (rwlock);
+	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
+	      && !atomic_compare_exchange_weak_relaxed
+		  (&rwlock->__data.__wrphase_futex,
+		   &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
+	    continue;
+	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
+	      1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
+	  if (err == ETIMEDOUT)
 	    {
-	      int private = __pthread_rwlock_get_private (rwlock);
-	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
-		  && !atomic_compare_exchange_weak_relaxed
-		      (&rwlock->__data.__wrphase_futex,
-		       &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
-		continue;
-	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
-		  1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
-	      if (err == ETIMEDOUT)
+	      /* If we timed out, we need to unregister.  If no read phase
+		 has been installed while we waited, we can just decrement
+		 the number of readers.  Otherwise, we just acquire the
+		 lock, which is allowed because we give no precise timing
+		 guarantees, and because the timeout is only required to
+		 be in effect if we would have had to wait for other
+		 threads (e.g., if futex_wait would time-out immediately
+		 because the given absolute time is in the past).  */
+	      r = atomic_load_relaxed (&rwlock->__data.__readers);
+	      while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
 		{
-		  /* If we timed out, we need to unregister.  If no read phase
-		     has been installed while we waited, we can just decrement
-		     the number of readers.  Otherwise, we just acquire the
-		     lock, which is allowed because we give no precise timing
-		     guarantees, and because the timeout is only required to
-		     be in effect if we would have had to wait for other
-		     threads (e.g., if futex_wait would time-out immediately
-		     because the given absolute time is in the past).  */
-		  r = atomic_load_relaxed (&rwlock->__data.__readers);
-		  while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
-		    {
-		      /* We don't need to make anything else visible to
-			 others besides unregistering, so relaxed MO is
-			 sufficient.  */
-		      if (atomic_compare_exchange_weak_relaxed
-			  (&rwlock->__data.__readers, &r,
-			   r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
-			return ETIMEDOUT;
-		      /* TODO Back-off.  */
-		    }
-		  /* Use the acquire MO fence to mirror the steps taken in the
-		     non-timeout case.  Note that the read can happen both
-		     in the atomic_load above as well as in the failure case
-		     of the CAS operation.  */
-		  atomic_thread_fence_acquire ();
-		  /* We still need to wait for explicit hand-over, but we must
-		     not use futex_wait anymore because we would just time out
-		     in this case and thus make the spin-waiting we need
-		     unnecessarily expensive.  */
-		  while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
-		      | PTHREAD_RWLOCK_FUTEX_USED)
-		      == (1 | PTHREAD_RWLOCK_FUTEX_USED))
-		    {
-		      /* TODO Back-off?  */
-		    }
-		  ready = true;
-		  break;
+		  /* We don't need to make anything else visible to
+		     others besides unregistering, so relaxed MO is
+		     sufficient.  */
+		  if (atomic_compare_exchange_weak_relaxed
+		      (&rwlock->__data.__readers, &r,
+		       r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
+		    return ETIMEDOUT;
+		  /* TODO Back-off.  */
+		}
+	      /* Use the acquire MO fence to mirror the steps taken in the
+		 non-timeout case.  Note that the read can happen both
+		 in the atomic_load above as well as in the failure case
+		 of the CAS operation.  */
+	      atomic_thread_fence_acquire ();
+	      /* We still need to wait for explicit hand-over, but we must
+		 not use futex_wait anymore because we would just time out
+		 in this case and thus make the spin-waiting we need
+		 unnecessarily expensive.  */
+	      while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
+		  | PTHREAD_RWLOCK_FUTEX_USED)
+		  == (1 | PTHREAD_RWLOCK_FUTEX_USED))
+		{
+		  /* TODO Back-off?  */
 		}
-	      /* If we got interrupted (EINTR) or the futex word does not have the
-		 expected value (EAGAIN), retry.  */
+	      ready = true;
+	      break;
 	    }
-	  if (ready)
-	    /* See below.  */
-	    break;
-	  /* We need acquire MO here so that we synchronize with the lock
-	     release of the writer, and so that we observe a recent value of
-	     __wrphase_futex (see below).  */
-	  if ((atomic_load_acquire (&rwlock->__data.__readers)
-	      & PTHREAD_RWLOCK_WRPHASE) == 0)
-	    /* We are in a read phase now, so the least recent modification of
-	       __wrphase_futex we can read from is the store by the writer
-	       with value 1.  Thus, only now we can assume that if we observe
-	       a value of 0, explicit hand-over is finished. Retry the loop
-	       above one more time.  */
-	    ready = true;
+	  /* If we got interrupted (EINTR) or the futex word does not have the
+	     expected value (EAGAIN), retry.  */
 	}
+      if (ready)
+	/* See below.  */
+	break;
+      /* We need acquire MO here so that we synchronize with the lock
+	 release of the writer, and so that we observe a recent value of
+	 __wrphase_futex (see below).  */
+      if ((atomic_load_acquire (&rwlock->__data.__readers)
+	  & PTHREAD_RWLOCK_WRPHASE) == 0)
+	/* We are in a read phase now, so the least recent modification of
+	   __wrphase_futex we can read from is the store by the writer
+	   with value 1.  Thus, only now we can assume that if we observe
+	   a value of 0, explicit hand-over is finished. Retry the loop
+	   above one more time.  */
+	ready = true;
     }
 
   return 0;
@@ -741,10 +741,23 @@  __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
 	  r = atomic_load_relaxed (&rwlock->__data.__readers);
 	}
       /* Our snapshot of __readers is up-to-date at this point because we
-	 either set WRLOCKED using a CAS or were handed over WRLOCKED from
+	 either set WRLOCKED using a CAS (and update r accordingly below,
+	 which was used as expected value for the CAS) or got WRLOCKED from
 	 another writer whose snapshot of __readers we inherit.  */
+      r |= PTHREAD_RWLOCK_WRLOCKED;
     }
 
+  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
+     MO is sufficient for futex words; acquire MO on the previous
+     modifications of __readers ensures that this store happens after the
+     store of value 0 by the previous primary writer.  */
+  atomic_store_relaxed (&rwlock->__data.__writers_futex,
+      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
+
+  /* If we are in a write phase, we have acquired the lock.  */
+  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
+    goto done;
+
   /* If we are in a read phase and there are no readers, try to start a write
      phase.  */
   while (((r & PTHREAD_RWLOCK_WRPHASE) == 0)
@@ -758,166 +771,156 @@  __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
 	  &r, r | PTHREAD_RWLOCK_WRPHASE))
 	{
 	  /* We have started a write phase, so need to enable readers to wait.
-	     See the similar case in__pthread_rwlock_rdlock_full.  */
+	     See the similar case in __pthread_rwlock_rdlock_full.  Unlike in
+	     that similar case, we are the (only) primary writer and so do
+	     not need to wake another writer.  */
 	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 1);
-	  /* Make sure we fall through to the end of the function.  */
-	  r |= PTHREAD_RWLOCK_WRPHASE;
-	  break;
+
+	  goto done;
 	}
       /* TODO Back-off.  */
     }
 
-  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
-     MO is sufficient for futex words; acquire MO on the previous
-     modifications of __readers ensures that this store happens after the
-     store of value 0 by the previous primary writer.  */
-  atomic_store_relaxed (&rwlock->__data.__writers_futex,
-      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
-
-  if (__glibc_unlikely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
+  /* We became the primary writer in a read phase and there were readers when
+     we did (because of the previous loop).  Thus, we have to wait for
+     explicit hand-over from one of these readers.
+     We basically do the same steps as for the similar case in
+     __pthread_rwlock_rdlock_full, except that we additionally might try
+     to directly hand over to another writer and need to wake up
+     other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
+  unsigned int wpf;
+  bool ready = false;
+  for (;;)
     {
-      /* We are not in a read phase and there are readers (because of the
-	 previous loop).  Thus, we have to wait for explicit hand-over from
-	 one of these readers.
-	 We basically do the same steps as for the similar case in
-	 __pthread_rwlock_rdlock_full, except that we additionally might try
-	 to directly hand over to another writer and need to wake up
-	 other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
-      unsigned int wpf;
-      bool ready = false;
-      for (;;)
+      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
+	  | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
 	{
-	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
-	      | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
+	  int private = __pthread_rwlock_get_private (rwlock);
+	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
+	      && !atomic_compare_exchange_weak_relaxed
+		  (&rwlock->__data.__wrphase_futex, &wpf,
+		   PTHREAD_RWLOCK_FUTEX_USED))
+	    continue;
+	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
+	      PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
+	  if (err == ETIMEDOUT)
 	    {
-	      int private = __pthread_rwlock_get_private (rwlock);
-	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
-		  && !atomic_compare_exchange_weak_relaxed
-		      (&rwlock->__data.__wrphase_futex, &wpf,
-		       PTHREAD_RWLOCK_FUTEX_USED))
-		continue;
-	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
-		  PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
-	      if (err == ETIMEDOUT)
+	      if (rwlock->__data.__flags
+		  != PTHREAD_RWLOCK_PREFER_READER_NP)
 		{
-		  if (rwlock->__data.__flags
-		      != PTHREAD_RWLOCK_PREFER_READER_NP)
-		    {
-		      /* We try writer--writer hand-over.  */
-		      unsigned int w = atomic_load_relaxed
-			  (&rwlock->__data.__writers);
-		      if (w != 0)
-			{
-			  /* We are about to hand over WRLOCKED, so we must
-			     release __writers_futex too; otherwise, we'd have
-			     a pending store, which could at least prevent
-			     other threads from waiting using the futex
-			     because it could interleave with the stores
-			     by subsequent writers.  In turn, this means that
-			     we have to clean up when we do not hand over
-			     WRLOCKED.
-			     Release MO so that another writer that gets
-			     WRLOCKED from us can take over our view of
-			     __readers.  */
-			  unsigned int wf = atomic_exchange_relaxed
-			      (&rwlock->__data.__writers_futex, 0);
-			  while (w != 0)
-			    {
-			      if (atomic_compare_exchange_weak_release
-				  (&rwlock->__data.__writers, &w,
-				      w | PTHREAD_RWLOCK_WRHANDOVER))
-				{
-				  /* Wake other writers.  */
-				  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
-				    futex_wake
-					(&rwlock->__data.__writers_futex, 1,
-					 private);
-				  return ETIMEDOUT;
-				}
-			      /* TODO Back-off.  */
-			    }
-			  /* We still own WRLOCKED and someone else might set
-			     a write phase concurrently, so enable waiting
-			     again.  Make sure we don't loose the flag that
-			     signals whether there are threads waiting on
-			     this futex.  */
-			  atomic_store_relaxed
-			      (&rwlock->__data.__writers_futex, wf);
-			}
-		    }
-		  /* If we timed out and we are not in a write phase, we can
-		     just stop being a primary writer.  Otherwise, we just
-		     acquire the lock.  */
-		  r = atomic_load_relaxed (&rwlock->__data.__readers);
-		  if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		  /* We try writer--writer hand-over.  */
+		  unsigned int w = atomic_load_relaxed
+		      (&rwlock->__data.__writers);
+		  if (w != 0)
 		    {
-		      /* We are about to release WRLOCKED, so we must release
-			 __writers_futex too; see the handling of
-			 writer--writer hand-over above.  */
+		      /* We are about to hand over WRLOCKED, so we must
+			 release __writers_futex too; otherwise, we'd have
+			 a pending store, which could at least prevent
+			 other threads from waiting using the futex
+			 because it could interleave with the stores
+			 by subsequent writers.  In turn, this means that
+			 we have to clean up when we do not hand over
+			 WRLOCKED.
+			 Release MO so that another writer that gets
+			 WRLOCKED from us can take over our view of
+			 __readers.  */
 		      unsigned int wf = atomic_exchange_relaxed
 			  (&rwlock->__data.__writers_futex, 0);
-		      while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		      while (w != 0)
 			{
-			  /* While we don't need to make anything from a
-			     caller's critical section visible to other
-			     threads, we need to ensure that our changes to
-			     __writers_futex are properly ordered.
-			     Therefore, use release MO to synchronize with
-			     subsequent primary writers.  Also wake up any
-			     waiting readers as they are waiting because of
-			     us.  */
 			  if (atomic_compare_exchange_weak_release
-			      (&rwlock->__data.__readers, &r,
-			       (r ^ PTHREAD_RWLOCK_WRLOCKED)
-			       & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
+			      (&rwlock->__data.__writers, &w,
+				  w | PTHREAD_RWLOCK_WRHANDOVER))
 			    {
 			      /* Wake other writers.  */
 			      if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
 				futex_wake (&rwlock->__data.__writers_futex,
-				    1, private);
-			      /* Wake waiting readers.  */
-			      if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
-				futex_wake (&rwlock->__data.__readers,
-				    INT_MAX, private);
+					    1, private);
 			      return ETIMEDOUT;
 			    }
+			  /* TODO Back-off.  */
 			}
-		      /* We still own WRLOCKED and someone else might set a
-			 write phase concurrently, so enable waiting again.
-			 Make sure we don't loose the flag that signals
-			 whether there are threads waiting on this futex.  */
-		      atomic_store_relaxed (&rwlock->__data.__writers_futex,
-			  wf);
+		      /* We still own WRLOCKED and someone else might set
+			 a write phase concurrently, so enable waiting
+			 again.  Make sure we don't loose the flag that
+			 signals whether there are threads waiting on
+			 this futex.  */
+		      atomic_store_relaxed
+			  (&rwlock->__data.__writers_futex, wf);
 		    }
-		  /* Use the acquire MO fence to mirror the steps taken in the
-		     non-timeout case.  Note that the read can happen both
-		     in the atomic_load above as well as in the failure case
-		     of the CAS operation.  */
-		  atomic_thread_fence_acquire ();
-		  /* We still need to wait for explicit hand-over, but we must
-		     not use futex_wait anymore.  */
-		  while ((atomic_load_relaxed
-		      (&rwlock->__data.__wrphase_futex)
-		       | PTHREAD_RWLOCK_FUTEX_USED)
-		      == PTHREAD_RWLOCK_FUTEX_USED)
+		}
+	      /* If we timed out and we are not in a write phase, we can
+		 just stop being a primary writer.  Otherwise, we just
+		 acquire the lock.  */
+	      r = atomic_load_relaxed (&rwlock->__data.__readers);
+	      if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		{
+		  /* We are about to release WRLOCKED, so we must release
+		     __writers_futex too; see the handling of
+		     writer--writer hand-over above.  */
+		  unsigned int wf = atomic_exchange_relaxed
+		      (&rwlock->__data.__writers_futex, 0);
+		  while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
 		    {
-		      /* TODO Back-off.  */
+		      /* While we don't need to make anything from a
+			 caller's critical section visible to other
+			 threads, we need to ensure that our changes to
+			 __writers_futex are properly ordered.
+			 Therefore, use release MO to synchronize with
+			 subsequent primary writers.  Also wake up any
+			 waiting readers as they are waiting because of
+			 us.  */
+		      if (atomic_compare_exchange_weak_release
+			  (&rwlock->__data.__readers, &r,
+			   (r ^ PTHREAD_RWLOCK_WRLOCKED)
+			   & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
+			{
+			  /* Wake other writers.  */
+			  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
+			    futex_wake (&rwlock->__data.__writers_futex,
+				1, private);
+			  /* Wake waiting readers.  */
+			  if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
+			    futex_wake (&rwlock->__data.__readers,
+				INT_MAX, private);
+			  return ETIMEDOUT;
+			}
 		    }
-		  ready = true;
-		  break;
+		  /* We still own WRLOCKED and someone else might set a
+		     write phase concurrently, so enable waiting again.
+		     Make sure we don't loose the flag that signals
+		     whether there are threads waiting on this futex.  */
+		  atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);
 		}
-	      /* If we got interrupted (EINTR) or the futex word does not have
-		 the expected value (EAGAIN), retry.  */
+	      /* Use the acquire MO fence to mirror the steps taken in the
+		 non-timeout case.  Note that the read can happen both
+		 in the atomic_load above as well as in the failure case
+		 of the CAS operation.  */
+	      atomic_thread_fence_acquire ();
+	      /* We still need to wait for explicit hand-over, but we must
+		 not use futex_wait anymore.  */
+	      while ((atomic_load_relaxed
+		  (&rwlock->__data.__wrphase_futex)
+		   | PTHREAD_RWLOCK_FUTEX_USED)
+		  == PTHREAD_RWLOCK_FUTEX_USED)
+		{
+		  /* TODO Back-off.  */
+		}
+	      ready = true;
+	      break;
 	    }
-	  /* See pthread_rwlock_rdlock_full.  */
-	  if (ready)
-	    break;
-	  if ((atomic_load_acquire (&rwlock->__data.__readers)
-	      & PTHREAD_RWLOCK_WRPHASE) != 0)
-	    ready = true;
+	  /* If we got interrupted (EINTR) or the futex word does not have
+	     the expected value (EAGAIN), retry.  */
 	}
+      /* See pthread_rwlock_rdlock_full.  */
+      if (ready)
+	break;
+      if ((atomic_load_acquire (&rwlock->__data.__readers)
+	  & PTHREAD_RWLOCK_WRPHASE) != 0)
+	ready = true;
     }
 
+ done:
   atomic_store_relaxed (&rwlock->__data.__cur_writer,
       THREAD_GETMEM (THREAD_SELF, tid));
   return 0;
diff --git a/nptl/tst-rwlock20.c b/nptl/tst-rwlock20.c
new file mode 100644
index 0000000..fc636e3
--- /dev/null
+++ b/nptl/tst-rwlock20.c
@@ -0,0 +1,128 @@ 
+/* Test program for a read-phase / write-phase explicit hand-over.
+   Copyright (C) 2000-2017 Free Software Foundation, Inc.
+
+   The GNU C Library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public License as
+   published by the Free Software Foundation; either version 2.1 of the
+   License, or (at your option) any later version.
+
+   The GNU C Library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with the GNU C Library; see the file COPYING.LIB.  If
+   not, see <http://www.gnu.org/licenses/>.  */
+
+#include <errno.h>
+#include <error.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <time.h>
+#include <atomic.h>
+
+#define THREADS 2
+
+#define KIND PTHREAD_RWLOCK_PREFER_READER_NP
+
+static pthread_rwlock_t lock;
+static int done = 0;
+
+static void*
+tf (void* arg)
+{
+  while (atomic_load_relaxed (&done) == 0)
+    {
+      int rcnt = 0;
+      int wcnt = 100;
+      if ((uintptr_t) arg == 0)
+	{
+	  rcnt = 1;
+	  wcnt = 1;
+	}
+
+      do
+	{
+	  if (wcnt)
+	    {
+	      pthread_rwlock_wrlock (&lock);
+	      pthread_rwlock_unlock (&lock);
+	      wcnt--;
+	  }
+	  if (rcnt)
+	    {
+	      pthread_rwlock_rdlock (&lock);
+	      pthread_rwlock_unlock (&lock);
+	      rcnt--;
+	  }
+	}
+      while ((atomic_load_relaxed (&done) == 0) && (rcnt + wcnt > 0));
+
+    }
+    return NULL;
+}
+
+
+
+static int
+do_test (void)
+{
+  pthread_t thr[THREADS];
+  int n;
+  void *res;
+  pthread_rwlockattr_t a;
+
+  if (pthread_rwlockattr_init (&a) != 0)
+    {
+      puts ("rwlockattr_t failed");
+      exit (1);
+    }
+
+  if (pthread_rwlockattr_setkind_np (&a, KIND) != 0)
+    {
+      puts ("rwlockattr_setkind failed");
+      exit (1);
+    }
+
+  if (pthread_rwlock_init (&lock, &a) != 0)
+    {
+      puts ("rwlock_init failed");
+      exit (1);
+    }
+
+  /* Make standard error the same as standard output.  */
+  dup2 (1, 2);
+
+  /* Make sure we see all message, even those on stdout.  */
+  setvbuf (stdout, NULL, _IONBF, 0);
+
+  for (n = 0; n < THREADS; ++n)
+    if (pthread_create (&thr[n], NULL, tf, (void *) (uintptr_t) n) != 0)
+      {
+	puts ("thread create failed");
+	exit (1);
+      }
+  struct timespec delay;
+  delay.tv_sec = 10;
+  delay.tv_nsec = 0;
+  nanosleep (&delay, NULL);
+  atomic_store_relaxed (&done, 1);
+
+  /* Wait for all the threads.  */
+  for (n = 0; n < THREADS; ++n)
+    if (pthread_join (thr[n], &res) != 0)
+      {
+	puts ("writer join failed");
+	exit (1);
+      }
+
+  return 0;
+}
+
+#define TIMEOUT 15
+#define TEST_FUNCTION do_test ()
+#include "../test-skeleton.c"