: PR30316: debuginfod wal checkpointing

Message ID ZFlWrb0NJcBtlOUQ@elastic.org
State Committed
Headers
Series : PR30316: debuginfod wal checkpointing |

Commit Message

Frank Ch. Eigler May 8, 2023, 8:08 p.m. UTC
  Hi -

This is patch 2/2 of this series.


commit 9b70bce69268eaae7621791ba7c72639469d4a69
Author: Frank Ch. Eigler <fche@redhat.com>
Date:   Mon May 8 11:05:48 2023 -0400

    PR30316: debuginfod wal checkpointing
    
    Add a "--scan-checkpoint=NUM" option to debuginfod to control forced
    synchronization & sqlite -wal checkpointing for the multithreaded
    scanning process.  In absence of this, a server that's busy with other
    read & write operations can accumulate potentially large SQLITE WAL
    temporary files.  This option causes the server to take intermittent
    quiescent breaks during scanning, during which the -wal file can be
    processed and truncated.
    
    Signed-off-by: Frank Ch. Eigler <fche@redhat.com>
  

Patch

diff --git a/NEWS b/NEWS
index 3c63a66026d7..53c717eba546 100644
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,12 @@  Version 0.190
 
 readelf: Support readelf -Ds, --use-dynamic --symbol.
 
+debuginfod: Schema change (reindexing required, sorry!) for a 60%
+            compression in filename representation, which was a large
+            part of the sqlite index; also, more deliberate sqlite
+            -wal management during scanning using the
+            --scan-checkpoint option.
+
 Version 0.189 "Don't deflate!"
 
 configure: eu-nm, eu-addr2line and eu-stack can provide demangled symbols
diff --git a/debuginfod/debuginfod.cxx b/debuginfod/debuginfod.cxx
index 7b19bb3a5d72..d72d2ad16960 100644
--- a/debuginfod/debuginfod.cxx
+++ b/debuginfod/debuginfod.cxx
@@ -429,6 +429,8 @@  static const struct argp_option options[] =
    { "passive", ARGP_KEY_PASSIVE, NULL, 0, "Do not scan or groom, read-only database.", 0 },
 #define ARGP_KEY_DISABLE_SOURCE_SCAN 0x1009
    { "disable-source-scan", ARGP_KEY_DISABLE_SOURCE_SCAN, NULL, 0, "Do not scan dwarf source info.", 0 },
+#define ARGP_SCAN_CHECKPOINT 0x100A
+   { "scan-checkpoint", ARGP_SCAN_CHECKPOINT, "NUM", 0, "Number of files scanned before a WAL checkpoint.", 0 },
    { NULL, 0, NULL, 0, NULL, 0 },
   };
 
@@ -483,6 +485,7 @@  static unsigned forwarded_ttl_limit = 8;
 static bool scan_source_info = true;
 static string tmpdir;
 static bool passive_p = false;
+static long scan_checkpoint = 256;
 
 static void set_metric(const string& key, double value);
 // static void inc_metric(const string& key);
@@ -684,6 +687,11 @@  parse_opt (int key, char *arg,
     case ARGP_KEY_DISABLE_SOURCE_SCAN:
       scan_source_info = false;
       break;
+    case ARGP_SCAN_CHECKPOINT:
+      scan_checkpoint = atol (arg);
+      if (scan_checkpoint < 0)
+        argp_failure(state, 1, EINVAL, "scan checkpoint");        
+      break;
       // case 'h': argp_state_help (state, stderr, ARGP_HELP_LONG|ARGP_HELP_EXIT_OK);
     default: return ARGP_ERR_UNKNOWN;
     }
@@ -929,6 +937,72 @@  class unique_set_reserver
 };
 
 
+////////////////////////////////////////////////////////////////////////
+
+// periodic_barrier is a concurrency control object that lets N threads
+// periodically (based on counter value) agree to wait at a barrier,
+// let one of them carry out some work, then be set free
+
+class periodic_barrier
+{
+private:
+  unsigned period; // number of count() reports to trigger barrier activation
+  unsigned threads; // number of threads participating
+  mutex mtx; // protects all the following fields
+  unsigned counter; // count of count() reports in the current generation
+  unsigned generation; // barrier activation generation
+  unsigned waiting; // number of threads waiting for barrier
+  bool dead; // bring out your
+  condition_variable cv;
+public:
+  periodic_barrier(unsigned t, unsigned p):
+    period(p), threads(t), counter(0), generation(0), waiting(0), dead(false) { }
+  virtual ~periodic_barrier() {}
+
+  virtual void periodic_barrier_work() noexcept = 0;
+  void nuke() {
+    unique_lock<mutex> lock(mtx);
+    dead = true;
+    cv.notify_all();
+  }
+  
+  void count()
+  {
+    unique_lock<mutex> lock(mtx);
+    unsigned prev_generation = this->generation;
+    if (counter < period-1) // normal case: counter just freely running
+      {
+        counter ++;
+        return;
+      }
+    else if (counter == period-1) // we're the doer
+      {
+        counter = period; // entering barrier holding phase
+        cv.notify_all();
+        while (waiting < threads-1 && !dead)
+          cv.wait(lock);
+        // all other threads are now stuck in the barrier
+        this->periodic_barrier_work(); // NB: we're holding the mutex the whole time
+        // reset for next barrier, releasing other waiters
+        counter = 0;
+        generation ++;
+        cv.notify_all();
+        return;
+      }
+    else if (counter == period) // we're a waiter, in holding phase
+      {
+        waiting ++;
+        cv.notify_all();
+        while (counter == period && generation == prev_generation && !dead)
+          cv.wait(lock);
+        waiting --;
+        return;
+      }
+  }
+};
+
+
+
 ////////////////////////////////////////////////////////////////////////
 
 
@@ -1073,6 +1147,33 @@  struct sqlite_ps
 };
 
 
+////////////////////////////////////////////////////////////////////////
+
+
+struct sqlite_checkpoint_pb: public periodic_barrier
+{
+  sqlite_ps ckpt;
+
+  sqlite_checkpoint_pb(unsigned t, unsigned p):
+    periodic_barrier(t, p), ckpt(db, "periodic wal checkpoint",
+                                 "pragma wal_checkpoint(truncate);") {}
+  
+  void periodic_barrier_work() noexcept
+  {
+    try
+      {
+        ckpt.reset().step_ok_done();
+      }
+    catch (const reportable_exception& e)
+      {
+        e.report(clog);        
+      }
+  }
+};
+  
+static periodic_barrier* scan_barrier = 0; // initialized in main()
+
+
 ////////////////////////////////////////////////////////////////////////
 
 // RAII style templated autocloser
@@ -3136,6 +3237,7 @@  register_file_name(sqlite_ps& ps_upsert_fileparts,
   if (rc != SQLITE_ROW) throw sqlite_exception(rc, "step");
   
   int64_t id = sqlite3_column_int64 (ps_lookup_file, 0);
+  ps_lookup_file.reset();
   return id;
 }
 
@@ -3682,6 +3784,9 @@  thread_main_scanner (void* arg)
       scan_payload p;
 
       add_metric("thread_busy", "role", "scan", -1);
+      // NB: threads may be blocked within either of these two waiting
+      // states, if the work queue happens to run dry.  That's OK.
+      if (scan_barrier) scan_barrier->count();
       bool gotone = scanq.wait_front(p);
       add_metric("thread_busy", "role", "scan", 1);
 
@@ -4482,8 +4587,10 @@  main (int argc, char *argv[])
     obatched(clog) << "search concurrency " << concurrency << endl;
   obatched(clog) << "webapi connection pool " << connection_pool
                  << (connection_pool ? "" : " (unlimited)") << endl;
-  if (! passive_p)
+  if (! passive_p) {
     obatched(clog) << "rescan time " << rescan_s << endl;
+    obatched(clog) << "scan checkpoint " << scan_checkpoint << endl;
+  }
   obatched(clog) << "fdcache fds " << fdcache_fds << endl;
   obatched(clog) << "fdcache mbs " << fdcache_mbs << endl;
   obatched(clog) << "fdcache prefetch " << fdcache_prefetch << endl;
@@ -4525,6 +4632,9 @@  main (int argc, char *argv[])
 
       if (scan_files || scan_archives.size() > 0)
         {
+          if (scan_checkpoint > 0)
+            scan_barrier = new sqlite_checkpoint_pb(concurrency, (unsigned) scan_checkpoint);
+
           rc = pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
           if (rc)
             error (EXIT_FAILURE, rc, "cannot spawn thread to traverse source paths\n");
@@ -4551,6 +4661,7 @@  main (int argc, char *argv[])
   while (! interrupted)
     pause ();
   scanq.nuke(); // wake up any remaining scanq-related threads, let them die
+  if (scan_barrier) scan_barrier->nuke(); // ... in case they're stuck in a barrier
   set_metric("ready", 0);
 
   if (verbose)
@@ -4576,6 +4687,7 @@  main (int argc, char *argv[])
     }
 
   debuginfod_pool_groom ();
+  delete scan_barrier;
 
   // NB: no problem with unconditional free here - an earlier failed regcomp would exit program
   (void) regfree (& file_include_regex);
diff --git a/doc/debuginfod.8 b/doc/debuginfod.8
index 5358aaba42c9..d4316bec8175 100644
--- a/doc/debuginfod.8
+++ b/doc/debuginfod.8
@@ -282,6 +282,13 @@  Disable scan of the dwarf source info of debuginfo sections.
 If a setup has no access to source code, the source info is not
 required.
 
+.TP
+.B "\-\-scan\-checkpoint=NUM"
+Run a synchronized SQLITE WAL checkpoint operation after every NUM
+completed archive or file scans.  This may slow down parallel scanning
+phase somewhat, but generate much smaller "-wal" temporary files on
+busy servers.  The default is 256.  Disabled if 0.
+
 .TP
 .B "\-v"
 Increase verbosity of logging to the standard error file descriptor.
diff --git a/tests/run-debuginfod-extraction.sh b/tests/run-debuginfod-extraction.sh
index 78014c868062..da6b25cf89dc 100755
--- a/tests/run-debuginfod-extraction.sh
+++ b/tests/run-debuginfod-extraction.sh
@@ -32,7 +32,7 @@  DB=${PWD}/.debuginfod_tmp.sqlite
 tempfiles $DB
 export DEBUGINFOD_CACHE_PATH=${PWD}/.client_cache
 
-env LD_LIBRARY_PATH=$ldpath ${abs_builddir}/../debuginfod/debuginfod $VERBOSE -d $DB -F -R -Z .tar.xz -Z .tar.bz2=bzcat -p $PORT1 -t0 -g0 -v R Z > vlog$PORT1 2>&1 &
+env LD_LIBRARY_PATH=$ldpath ${abs_builddir}/../debuginfod/debuginfod $VERBOSE -d $DB -F -R -Z .tar.xz -Z .tar.bz2=bzcat -p $PORT1 --scan-checkpoint=1 -t0 -g0 -v R Z > vlog$PORT1 2>&1 &
 PID1=$!
 tempfiles vlog$PORT1
 errfiles vlog$PORT1