libatomic: Fix up libat_{,un}lock_n [PR119796]
Checks

Context | Check | Description
linaro-tcwg-bot/tcwg_gcc_build--master-aarch64 | success | Build passed
linaro-tcwg-bot/tcwg_simplebootstrap_build--master-arm-bootstrap | success | Build passed
linaro-tcwg-bot/tcwg_simplebootstrap_build--master-aarch64-bootstrap | pending | Patch applied
linaro-tcwg-bot/tcwg_gcc_check--master-aarch64 | success | Test passed
linaro-tcwg-bot/tcwg_gcc_build--master-arm | success | Build passed
Commit Message
Hi!
As mentioned in the PR (and I think in PR101075 too), we can run into
deadlock with libat_lock_n calls with larger n.
As mentioned in PR66842, we use multiple locks (normally 64 mutexes,
one for each 64-byte cache line in a 4 KiB page) and currently can lock
more than one lock, in particular a single lock for n in [0, 64], 2 locks
for n in [65, 128], 3 locks for n in [129, 192], etc.
There are two problems with this:
1) we can deadlock if there is some wrap-around, because the locks are
always acquired in order from addr_hash (ptr) up to
locks[NLOCKS-1].mutex and then, if needed, from locks[0].mutex onwards;
so if e.g. 2 threads perform libat_lock_n with n = 2048+64, one at a
pointer starting at a page boundary and the other at page boundary +
2048 bytes, the first thread can lock the first 32 mutexes, the second
thread can lock the last 32 mutexes, and then the first thread waits
for lock 32 held by the second thread while the second thread waits
for lock 0 held by the first thread;
fixed below by always locking the locks in order of increasing index;
if there is a wrap-around, that is done in 2 loops, first locking some
locks at the start of the array and then the rest at the end of it
(see the sketch right after this list)
2) the number of locks seems to be determined solely from the n value,
which I think is wrong; we don't know the structure alignment on the
libatomic side, it could very well be a 1-byte-aligned struct, and so
how many cachelines are actually (partly or fully) occupied by the
atomic access depends not just on the size, but also on
ptr % WATCH_SIZE, e.g. a 2-byte structure at address page_boundary+63
should IMHO lock 2 locks because it occupies the first and second
cachelines (a second sketch further below works this example through)
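Here is a minimal sketch of that new acquisition order (lock_range is an
illustrative name, not a libatomic function; NLOCKS is assumed to be 64 as
in the example above, and the real code locks locks[i].mutex, see the patch
further below):

/* Minimal sketch of the deadlock-free acquisition order; mutex
   initialization is omitted in this sketch.  */
#include <pthread.h>
#include <stdint.h>
#include <stddef.h>

#define NLOCKS 64

static pthread_mutex_t locks[NLOCKS];

static void
lock_range (uintptr_t h, size_t nlocks)
{
  size_t i = 0;

  if (h + nlocks > NLOCKS)
    /* The needed range wraps past the end of the array: take the
       wrapped-around low indices first.  */
    for (size_t j = h + nlocks - NLOCKS; i < j; ++i)
      pthread_mutex_lock (&locks[i]);

  for (; i < nlocks; ++i)
    pthread_mutex_lock (&locks[h++]);	/* then h upwards, still increasing */
}

/* For the 2-thread example above (WATCH_SIZE 64, so nlocks == 33 each):
   thread A: lock_range (0, 33)  locks indices 0, 1, ..., 32;
   thread B: lock_range (32, 33) locks index 0 first, then 32, 33, ..., 63.
   Both acquire the mutexes in increasing index order, so the cyclic wait
   between lock 0 and lock 32 can no longer happen.  */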
Note, before this patch it locked exactly one lock for n = 0, while
with this patch it can lock either no locks at all (if ptr is at a cacheline
boundary) or 1 lock (otherwise).
Dunno if the libatomic APIs can be called for zero sizes and whether
we actually care that much how many mutexes are locked in that case,
because one can't actually read/write anything into zero-sized memory.
If you think it is important, I could add else if (nlocks == 0) nlocks = 1;
in both spots.
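A second minimal sketch works through the lock-count cases above, including
the n == 0 ones (nlocks_needed is an illustrative name, not libatomic's; it
just applies the formula the patch adds, with WATCH_SIZE assumed to be 64):

/* Illustrative only: how many per-cacheline locks an access of n bytes at
   ptr needs once the offset within the cacheline is taken into account.  */
#include <stdint.h>
#include <stdio.h>

#define WATCH_SIZE 64

static size_t
nlocks_needed (uintptr_t ptr, size_t n)
{
  /* Round up over every cacheline the access touches, including the
     partial one implied by ptr's offset within its cacheline.  */
  return (n + (ptr % WATCH_SIZE) + WATCH_SIZE - 1) / WATCH_SIZE;
}

int
main (void)
{
  printf ("%zu\n", nlocks_needed (63, 2));   /* page_boundary+63, 2 bytes -> 2 */
  printf ("%zu\n", nlocks_needed (0, 64));   /* aligned 64-byte access    -> 1 */
  printf ("%zu\n", nlocks_needed (0, 0));    /* n == 0 at a boundary      -> 0 */
  printf ("%zu\n", nlocks_needed (1, 0));    /* n == 0, misaligned        -> 1 */
  return 0;
}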
Bootstrapped/regtested on x86_64-linux and i686-linux, ok for trunk?
2025-04-14 Jakub Jelinek <jakub@redhat.com>
PR libstdc++/119796
* config/posix/lock.c (libat_lock_n, libat_unlock_n): Start with
computing how many locks will be needed and take into account
((uintptr_t)ptr % WATCH_SIZE). If some locks from the end of the
locks array and others from the start of it will be needed, first
lock the ones from the start followed by ones from the end.
Jakub
Comments
On Mon, 14 Apr 2025, Jakub Jelinek wrote:
> Bootstrapped/regtested on x86_64-linux and i686-linux, ok for trunk?
OK.
Richard.
--- libatomic/config/posix/lock.c.jj	2025-04-08 14:09:40.988589457 +0200
+++ libatomic/config/posix/lock.c	2025-04-14 14:27:11.933210898 +0200
@@ -81,19 +81,22 @@ libat_lock_n (void *ptr, size_t n)
 {
   uintptr_t h = addr_hash (ptr);
   size_t i = 0;
+  size_t nlocks
+    = (n + ((uintptr_t)ptr % WATCH_SIZE) + WATCH_SIZE - 1) / WATCH_SIZE;
 
   /* Don't lock more than all the locks we have. */
-  if (n > PAGE_SIZE)
-    n = PAGE_SIZE;
+  if (nlocks > NLOCKS)
+    nlocks = NLOCKS;
 
-  do
+  if (__builtin_expect (h + nlocks > NLOCKS, 0))
     {
-      pthread_mutex_lock (&locks[h].mutex);
-      if (++h == NLOCKS)
-	h = 0;
-      i += WATCH_SIZE;
+      size_t j = h + nlocks - NLOCKS;
+      for (; i < j; ++i)
+	pthread_mutex_lock (&locks[i].mutex);
     }
-  while (i < n);
+
+  for (; i < nlocks; ++i)
+    pthread_mutex_lock (&locks[h++].mutex);
 }
 
 void
@@ -101,16 +104,20 @@ libat_unlock_n (void *ptr, size_t n)
 {
   uintptr_t h = addr_hash (ptr);
   size_t i = 0;
+  size_t nlocks
+    = (n + ((uintptr_t)ptr % WATCH_SIZE) + WATCH_SIZE - 1) / WATCH_SIZE;
 
-  if (n > PAGE_SIZE)
-    n = PAGE_SIZE;
+  /* Don't lock more than all the locks we have. */
+  if (nlocks > NLOCKS)
+    nlocks = NLOCKS;
 
-  do
+  if (__builtin_expect (h + nlocks > NLOCKS, 0))
     {
-      pthread_mutex_unlock (&locks[h].mutex);
-      if (++h == NLOCKS)
-	h = 0;
-      i += WATCH_SIZE;
+      size_t j = h + nlocks - NLOCKS;
+      for (; i < j; ++i)
+	pthread_mutex_unlock (&locks[i].mutex);
     }
-  while (i < n);
+
+  for (; i < nlocks; ++i)
+    pthread_mutex_unlock (&locks[h++].mutex);
 }
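For context, these helpers are only reached when an atomic operation has no
lock-free implementation for its size. A hedged example of a call path that
ends up in the patched libat_lock_n/libat_unlock_n (assuming a typical target
where a 96-byte object is too large for lock-free atomics, so the program has
to be linked with -latomic):

/* Compile with: gcc big_atomic.c -latomic
   The 96-byte load below has no lock-free implementation on typical
   targets, so GCC emits a call into libatomic, which serializes the
   access with the per-cacheline mutexes taken by libat_lock_n.  */
struct big { char bytes[96]; };

static struct big shared;

int
main (void)
{
  struct big snapshot;
  __atomic_load (&shared, &snapshot, __ATOMIC_SEQ_CST);
  return snapshot.bytes[0];
}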