[v2,1/4] Avoid submitting empty tasks in parallel_for_each

Message ID 20230110183338.2623088-2-tromey@adacore.com
State Committed
Headers
Series Fix regression in new DWARF reader |

Commit Message

Tom Tromey Jan. 10, 2023, 6:33 p.m. UTC
  I found that parallel_for_each would submit empty tasks to the thread
pool.  For example, this can happen if the number of tasks is smaller
than the number of available threads.  In the DWARF reader, this
resulted in the cooked index containing empty sub-indices.  This patch
arranges to instead shrink the result vector and process the trailing
entries in the calling thread.
---
 gdb/unittests/parallel-for-selftests.c | 39 ++++++++++++++++++++++++++
 gdbsupport/parallel-for.h              | 30 ++++++++++++++++++++
 2 files changed, 69 insertions(+)
  

Comments

Joel Brobecker Jan. 14, 2023, 6:03 a.m. UTC | #1
Hi Tom,

On Tue, Jan 10, 2023 at 11:33:35AM -0700, Tom Tromey via Gdb-patches wrote:
> I found that parallel_for_each would submit empty tasks to the thread
> pool.  For example, this can happen if the number of tasks is smaller
> than the number of available threads.  In the DWARF reader, this
> resulted in the cooked index containing empty sub-indices.  This patch
> arranges to instead shrink the result vector and process the trailing
> entries in the calling thread.


Thanks for the updated patch, and the added test.

This patch looks OK to me.

> ---
>  gdb/unittests/parallel-for-selftests.c | 39 ++++++++++++++++++++++++++
>  gdbsupport/parallel-for.h              | 30 ++++++++++++++++++++
>  2 files changed, 69 insertions(+)
> 
> diff --git a/gdb/unittests/parallel-for-selftests.c b/gdb/unittests/parallel-for-selftests.c
> index 3162db18df1..15a095ae62b 100644
> --- a/gdb/unittests/parallel-for-selftests.c
> +++ b/gdb/unittests/parallel-for-selftests.c
> @@ -149,6 +149,45 @@ TEST (int n_threads)
>    SELF_CHECK (counter == NUMBER);
>  
>  #undef NUMBER
> +
> +  /* Check that if there are fewer tasks than threads, then we won't
> +     end up with a null result.  */
> +  std::vector<std::unique_ptr<int>> intresults;
> +  std::atomic<bool> any_empty_tasks (false);
> +
> +  FOR_EACH (1, 0, 1,
> +	    [&] (int start, int end)
> +	      {
> +		if (start == end)
> +		  any_empty_tasks = true;
> +		return std::unique_ptr<int> (new int (end - start));
> +	      });
> +  SELF_CHECK (!any_empty_tasks);
> +  SELF_CHECK (std::all_of (intresults.begin (),
> +			   intresults.end (),
> +			   [] (const std::unique_ptr<int> &entry)
> +			     {
> +			       return entry != nullptr;
> +			     }));
> +
> +  /* The same but using the task size parameter.  */
> +  intresults.clear ();
> +  any_empty_tasks = false;
> +  FOR_EACH (1, 0, 1,
> +	    [&] (int start, int end)
> +	      {
> +		if (start == end)
> +		  any_empty_tasks = true;
> +		return std::unique_ptr<int> (new int (end - start));
> +	      },
> +	    task_size_one);
> +  SELF_CHECK (!any_empty_tasks);
> +  SELF_CHECK (std::all_of (intresults.begin (),
> +			   intresults.end (),
> +			   [] (const std::unique_ptr<int> &entry)
> +			     {
> +			       return entry != nullptr;
> +			     }));
>  }
>  
>  #endif /* FOR_EACH */
> diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h
> index b565676a0d0..de9ebb15746 100644
> --- a/gdbsupport/parallel-for.h
> +++ b/gdbsupport/parallel-for.h
> @@ -70,6 +70,12 @@ struct par_for_accumulator
>      return result;
>    }
>  
> +  /* Resize the results to N.  */
> +  void resize (size_t n)
> +  {
> +    m_futures.resize (n);
> +  }
> +
>  private:
>    
>    /* A vector of futures coming from the tasks run in the
> @@ -108,6 +114,12 @@ struct par_for_accumulator<void>
>        }
>    }
>  
> +  /* Resize the results to N.  */
> +  void resize (size_t n)
> +  {
> +    m_futures.resize (n);
> +  }
> +
>  private:
>  
>    std::vector<gdb::future<void>> m_futures;
> @@ -232,6 +244,24 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
>  	  end = j;
>  	  remaining_size -= chunk_size;
>  	}
> +
> +      /* This case means we don't have enough elements to really
> +	 distribute them.  Rather than ever submit a task that does
> +	 nothing, we short-circuit here.  */
> +      if (first == end)
> +	end = last;
> +
> +      if (end == last)
> +	{
> +	  /* We're about to dispatch the last batch of elements, which
> +	     we normally process in the main thread.  So just truncate
> +	     the result list here.  This avoids submitting empty tasks
> +	     to the thread pool.  */
> +	  count = i;
> +	  results.resize (count);
> +	  break;
> +	}
> +
>        if (parallel_for_each_debug)
>  	{
>  	  debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
> -- 
> 2.38.1
>
  

Patch

diff --git a/gdb/unittests/parallel-for-selftests.c b/gdb/unittests/parallel-for-selftests.c
index 3162db18df1..15a095ae62b 100644
--- a/gdb/unittests/parallel-for-selftests.c
+++ b/gdb/unittests/parallel-for-selftests.c
@@ -149,6 +149,45 @@  TEST (int n_threads)
   SELF_CHECK (counter == NUMBER);
 
 #undef NUMBER
+
+  /* Check that if there are fewer tasks than threads, then we won't
+     end up with a null result.  */
+  std::vector<std::unique_ptr<int>> intresults;
+  std::atomic<bool> any_empty_tasks (false);
+
+  FOR_EACH (1, 0, 1,
+	    [&] (int start, int end)
+	      {
+		if (start == end)
+		  any_empty_tasks = true;
+		return std::unique_ptr<int> (new int (end - start));
+	      });
+  SELF_CHECK (!any_empty_tasks);
+  SELF_CHECK (std::all_of (intresults.begin (),
+			   intresults.end (),
+			   [] (const std::unique_ptr<int> &entry)
+			     {
+			       return entry != nullptr;
+			     }));
+
+  /* The same but using the task size parameter.  */
+  intresults.clear ();
+  any_empty_tasks = false;
+  FOR_EACH (1, 0, 1,
+	    [&] (int start, int end)
+	      {
+		if (start == end)
+		  any_empty_tasks = true;
+		return std::unique_ptr<int> (new int (end - start));
+	      },
+	    task_size_one);
+  SELF_CHECK (!any_empty_tasks);
+  SELF_CHECK (std::all_of (intresults.begin (),
+			   intresults.end (),
+			   [] (const std::unique_ptr<int> &entry)
+			     {
+			       return entry != nullptr;
+			     }));
 }
 
 #endif /* FOR_EACH */
diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h
index b565676a0d0..de9ebb15746 100644
--- a/gdbsupport/parallel-for.h
+++ b/gdbsupport/parallel-for.h
@@ -70,6 +70,12 @@  struct par_for_accumulator
     return result;
   }
 
+  /* Resize the results to N.  */
+  void resize (size_t n)
+  {
+    m_futures.resize (n);
+  }
+
 private:
   
   /* A vector of futures coming from the tasks run in the
@@ -108,6 +114,12 @@  struct par_for_accumulator<void>
       }
   }
 
+  /* Resize the results to N.  */
+  void resize (size_t n)
+  {
+    m_futures.resize (n);
+  }
+
 private:
 
   std::vector<gdb::future<void>> m_futures;
@@ -232,6 +244,24 @@  parallel_for_each (unsigned n, RandomIt first, RandomIt last,
 	  end = j;
 	  remaining_size -= chunk_size;
 	}
+
+      /* This case means we don't have enough elements to really
+	 distribute them.  Rather than ever submit a task that does
+	 nothing, we short-circuit here.  */
+      if (first == end)
+	end = last;
+
+      if (end == last)
+	{
+	  /* We're about to dispatch the last batch of elements, which
+	     we normally process in the main thread.  So just truncate
+	     the result list here.  This avoids submitting empty tasks
+	     to the thread pool.  */
+	  count = i;
+	  results.resize (count);
+	  break;
+	}
+
       if (parallel_for_each_debug)
 	{
 	  debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),