[3/3] debuginfod: optimize extraction from seekable xz archives

Message ID 54b1cd82e066537ce232940175e2e1b485b648dd.1720644134.git.osandov@fb.com
State Superseded
Headers
Series debuginfod: speed up extraction from kernel debuginfo packages by 200x |

Commit Message

Omar Sandoval July 10, 2024, 8:47 p.m. UTC
  From: Omar Sandoval <osandov@fb.com>

The kernel debuginfo packages on Fedora, Debian, and Ubuntu, and many of
their downstreams, are all compressed with xz in multi-threaded mode,
which allows random access.  We can use this to bypass the full archive
extraction and dramatically speed up kernel debuginfo requests (from ~50
seconds in the worst case to < 0.25 seconds).

This works because multi-threaded xz compression splits up the stream
into many independently compressed blocks.  The stream ends with an
index of blocks.  So, to seek to an offset, we find the block containing
that offset in the index and then decompress and throw away data until
we reach the offset within the block.  We can then decompress the
desired amount of data, possibly from subsequent blocks.  There's no
high-level API in liblzma to do this, but we can do it by stitching
together a few low-level APIs.

Since we now save the uncompressed offset and size of each archive file,
pass that information down so we can do the optimized extraction when
applicable.

Signed-off-by: Omar Sandoval <osandov@fb.com>
---
 debuginfod/debuginfod.cxx | 431 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 417 insertions(+), 14 deletions(-)
  

Patch

diff --git a/debuginfod/debuginfod.cxx b/debuginfod/debuginfod.cxx
index 95a7d941..c3822be3 100644
--- a/debuginfod/debuginfod.cxx
+++ b/debuginfod/debuginfod.cxx
@@ -1971,6 +1971,16 @@  handle_buildid_f_match (bool internal_req_t,
 
 
 #ifdef USE_LZMA
+// Exception for failures reported by liblzma calls.  The message names the
+// failing operation (MSG) and the raw return code (RC); constructing one also
+// bumps the "error_count" metric labelled "lzma" with that code.
+struct lzma_exception: public reportable_exception
+{
+  // There is no liblzma helper that turns an lzma_ret into text, so the
+  // numeric value is all we can show.
+  lzma_exception(int rc, const string& msg):
+    reportable_exception("lzma error: " + msg + ": error " + to_string(rc))
+  {
+    inc_metric("error_count", "lzma", to_string(rc));
+  }
+};
+
 // Neither RPM nor deb files support seeking to a specific file in the package.
 // Instead, to extract a specific file, we normally need to read the archive
 // sequentially until we find the file.  This is very slow for files at the end
@@ -2094,12 +2104,352 @@  is_seekable_archive (const string& rps, struct archive* a)
   // The file is only seekable if it has more than one Block.
   return num_records > 1;
 }
+
+// Read and decode the Index stored at the end of the xz stream in FD.
+//
+// The Stream Footer is read from the last LZMA_STREAM_HEADER_SIZE bytes of
+// the file, so the xz stream is expected to end exactly at end-of-file.  The
+// footer's backward_size then tells us how far before the footer the Index
+// begins.
+//
+// Returns the decoded lzma_index; the caller owns it and must release it with
+// lzma_index_end().  Throws libc_exception if lseek/read fails,
+// lzma_exception if liblzma rejects the data, and reportable_exception if the
+// file ends early.  The file offset of FD is changed.
+static lzma_index*
+read_xz_index (int fd)
+{
+  off_t footer_pos = -LZMA_STREAM_HEADER_SIZE;
+  if (lseek (fd, footer_pos, SEEK_END) == -1)
+    throw libc_exception (errno, "lseek");
+
+  // Read the whole footer, tolerating short reads and EINTR.
+  uint8_t footer[LZMA_STREAM_HEADER_SIZE];
+  size_t footer_read = 0;
+  while (footer_read < sizeof (footer))
+    {
+      ssize_t bytes_read = read (fd, footer + footer_read,
+                                 sizeof (footer) - footer_read);
+      if (bytes_read < 0)
+        {
+          if (errno == EINTR)
+            continue;
+          throw libc_exception (errno, "read");
+        }
+      if (bytes_read == 0)
+        throw reportable_exception ("truncated file");
+      footer_read += bytes_read;
+    }
+
+  lzma_stream_flags stream_flags;
+  lzma_ret ret = lzma_stream_footer_decode (&stream_flags, footer);
+  if (ret != LZMA_OK)
+    throw lzma_exception (ret, "lzma_stream_footer_decode");
+
+  // The Index sits immediately before the footer.  backward_size is an
+  // unsigned 64-bit value; cast it so the subtraction is done in signed
+  // off_t arithmetic rather than relying on unsigned wraparound.
+  if (lseek (fd, footer_pos - (off_t) stream_flags.backward_size,
+             SEEK_END) == -1)
+    throw libc_exception (errno, "lseek");
+
+  lzma_stream strm = LZMA_STREAM_INIT;
+  lzma_index* index = NULL;
+  ret = lzma_index_decoder (&strm, &index, UINT64_MAX);
+  if (ret != LZMA_OK)
+    throw lzma_exception (ret, "lzma_index_decoder");
+  // From here on the decoder is torn down on every exit path.
+  defer_dtor<lzma_stream*,void> strm_ender (&strm, lzma_end);
+
+  uint8_t in_buf[4096];
+  while (true)
+    {
+      // Refill the input buffer only once the decoder has consumed it all.
+      if (strm.avail_in == 0)
+        {
+          ssize_t bytes_read = read (fd, in_buf, sizeof (in_buf));
+          if (bytes_read < 0)
+            {
+              if (errno == EINTR)
+                continue;
+              throw libc_exception (errno, "read");
+            }
+          if (bytes_read == 0)
+            throw reportable_exception ("truncated file");
+          strm.avail_in = bytes_read;
+          strm.next_in = in_buf;
+        }
+
+      ret = lzma_code (&strm, LZMA_RUN);
+      if (ret == LZMA_STREAM_END)
+        break;
+      else if (ret != LZMA_OK)
+        throw lzma_exception (ret, "lzma_code index");
+    }
+
+  // Attach the Stream Flags to the index so that users of an iterator over
+  // it can read them (extract_xz_blocks_into_fd needs the check type).
+  ret = lzma_index_stream_flags (index, &stream_flags);
+  if (ret != LZMA_OK)
+    {
+      lzma_index_end (index, NULL);
+      throw lzma_exception (ret, "lzma_index_stream_flags");
+    }
+  return index;
+}
+
+// One-argument wrapper around lzma_index_end() so it can be handed to
+// defer_dtor as a cleanup function; the allocator argument is always NULL.
+static void
+my_lzma_index_end (lzma_index* index)
+{
+  const lzma_allocator* no_allocator = NULL;
+  lzma_index_end (index, no_allocator);
+}
+
+// Free the options attached to each of the first LZMA_FILTERS_MAX entries of
+// BLOCK->filters and reset the pointers to NULL, leaving the filters array
+// itself allocated.  BLOCK->filters must not be NULL.
+static void
+free_lzma_block_filter_options (lzma_block* block)
+{
+  lzma_filter* const filters_end = block->filters + LZMA_FILTERS_MAX;
+  for (lzma_filter* f = block->filters; f != filters_end; ++f)
+    {
+      free (f->options);
+      f->options = NULL;
+    }
+}
+
+// Free BLOCK->filters together with any filter options it still holds.
+// Does nothing when no filters array was allocated.
+static void
+free_lzma_block_filters (lzma_block* block)
+{
+  if (block->filters == NULL)
+    return;
+
+  free_lzma_block_filter_options (block);
+  free (block->filters);
+}
+
+// Decompress SIZE bytes starting at uncompressed offset OFFSET of the xz
+// stream in SRC and write them to DST.
+//
+// ITER must already be positioned on the Block that contains OFFSET; it is
+// advanced as extraction continues into the following Blocks.  SRCPATH is
+// used only for the verbose log message.
+//
+// Throws libc_exception on lseek/read/write/allocation failure,
+// lzma_exception when liblzma reports an error, and reportable_exception if
+// the file is truncated or the Index has no more Blocks before SIZE bytes
+// have been written.
+static void
+extract_xz_blocks_into_fd (const string& srcpath,
+                           int src,
+                           int dst,
+                           lzma_index_iter* iter,
+                           uint64_t offset,
+                           uint64_t size)
+{
+  // Seek to the Block.  Seeking from the end using the compressed size from the
+  // footer means we don't need to know where the xz stream starts in the
+  // archive.
+  if (lseek (src,
+             (off_t) iter->block.compressed_stream_offset
+             - (off_t) iter->stream.compressed_size,
+             SEEK_END) == -1)
+    throw libc_exception (errno, "lseek");
+
+  // From here on OFFSET is relative to the start of the current Block.
+  offset -= iter->block.uncompressed_file_offset;
+
+  // One filters array is allocated up front and reused for every Block; only
+  // the per-filter options are released between Blocks.
+  lzma_block block{};
+  block.filters = (lzma_filter*) calloc (LZMA_FILTERS_MAX + 1,
+                                         sizeof (lzma_filter));
+  if (block.filters == NULL)
+    throw libc_exception (ENOMEM, "cannot allocate lzma_block filters");
+  defer_dtor<lzma_block*,void> filters_freer (&block, free_lzma_block_filters);
+
+  uint8_t in_buf[4096];
+  uint8_t out_buf[4096];
+  // Number of bytes currently buffered at the start of in_buf that belong to
+  // the next Block Header (and possibly the data following it).
+  size_t header_read = 0;
+  bool need_log_extracting = verbose > 3;
+  while (true)
+    {
+      // The first byte of the Block is the encoded Block Header Size.  Read the
+      // first byte and whatever extra fits in the buffer.
+      while (header_read == 0)
+        {
+          ssize_t bytes_read = read (src, in_buf, sizeof (in_buf));
+          if (bytes_read < 0)
+            {
+              if (errno == EINTR)
+                continue;
+              throw libc_exception (errno, "read");
+            }
+          if (bytes_read == 0)
+            throw reportable_exception ("truncated file");
+          header_read += bytes_read;
+        }
+
+      block.header_size = lzma_block_header_size_decode (in_buf[0]);
+
+      // If we didn't buffer the whole Block Header earlier, get the rest.
+      // The assertion guarantees that even the largest header fits in in_buf.
+      eu_static_assert (sizeof (in_buf)
+                        >= lzma_block_header_size_decode (UINT8_MAX));
+      while (header_read < block.header_size)
+        {
+          ssize_t bytes_read = read (src, in_buf + header_read,
+                                     sizeof (in_buf) - header_read);
+          if (bytes_read < 0)
+            {
+              if (errno == EINTR)
+                continue;
+              throw libc_exception (errno, "read");
+            }
+          if (bytes_read == 0)
+            throw reportable_exception ("truncated file");
+          header_read += bytes_read;
+        }
+
+      // Decode the Block Header.  The integrity check type comes from the
+      // Stream Flags stored in the index.
+      block.check = iter->stream.flags->check;
+      lzma_ret ret = lzma_block_header_decode (&block, NULL, in_buf);
+      if (ret != LZMA_OK)
+        throw lzma_exception (ret, "lzma_block_header_decode");
+      // Let liblzma derive the Block's compressed size from the Unpadded Size
+      // recorded in the Index.
+      ret = lzma_block_compressed_size (&block, iter->block.unpadded_size);
+      if (ret != LZMA_OK)
+        throw lzma_exception (ret, "lzma_block_compressed_size");
+
+      // Start decoding the Block data.
+      lzma_stream strm = LZMA_STREAM_INIT;
+      ret = lzma_block_decoder (&strm, &block);
+      if (ret != LZMA_OK)
+        throw lzma_exception (ret, "lzma_block_decoder");
+      defer_dtor<lzma_stream*,void> strm_ender (&strm, lzma_end);
+
+      // We might still have some input buffered from when we read the header.
+      strm.avail_in = header_read - block.header_size;
+      strm.next_in = in_buf + block.header_size;
+      strm.avail_out = sizeof (out_buf);
+      strm.next_out = out_buf;
+      while (true)
+        {
+          if (strm.avail_in == 0)
+            {
+              ssize_t bytes_read = read (src, in_buf, sizeof (in_buf));
+              if (bytes_read < 0)
+                {
+                  if (errno == EINTR)
+                    continue;
+                  throw libc_exception (errno, "read");
+                }
+              if (bytes_read == 0)
+                throw reportable_exception ("truncated file");
+              strm.avail_in = bytes_read;
+              strm.next_in = in_buf;
+            }
+
+          ret = lzma_code (&strm, LZMA_RUN);
+          if (ret != LZMA_OK && ret != LZMA_STREAM_END)
+            throw lzma_exception (ret, "lzma_code block");
+
+          // Throw away anything we decode until we reach the offset, then
+          // start writing to the destination.  out_buf is reset after every
+          // lzma_code call, so it only ever holds the output of this call.
+          if (strm.total_out > offset)
+            {
+              size_t bytes_to_write = strm.next_out - out_buf;
+              uint8_t* buf_to_write = out_buf;
+
+              // Ignore anything in the buffer before the offset.
+              if (bytes_to_write > strm.total_out - offset)
+                {
+                  buf_to_write += bytes_to_write - (strm.total_out - offset);
+                  bytes_to_write = strm.total_out - offset;
+                }
+
+              // Ignore anything after the size.
+              if (strm.total_out - offset >= size)
+                bytes_to_write -= strm.total_out - offset - size;
+
+              if (need_log_extracting)
+                {
+                  obatched(clog) << "extracting from xz archive " << srcpath
+                                 << " size=" << size << endl;
+                  need_log_extracting = false;
+                }
+
+              while (bytes_to_write > 0)
+                {
+                  ssize_t written = write (dst, buf_to_write, bytes_to_write);
+                  if (written < 0)
+                    {
+                      // Retry when interrupted by a signal, as the read
+                      // loops above do; EAGAIN is retried as well.
+                      if (errno == EINTR || errno == EAGAIN)
+                        continue;
+                      throw libc_exception (errno, "write");
+                    }
+                  bytes_to_write -= written;
+                  buf_to_write += written;
+                }
+
+              // If we reached the size, we're done.
+              if (strm.total_out - offset >= size)
+                return;
+            }
+
+          strm.avail_out = sizeof (out_buf);
+          strm.next_out = out_buf;
+
+          if (ret == LZMA_STREAM_END)
+            break;
+        }
+
+      // This Block didn't have enough data.  Go to the next one.
+      if (lzma_index_iter_next (iter, LZMA_INDEX_ITER_BLOCK))
+        throw reportable_exception ("no more blocks");
+      // Account for what this Block contributed; the next Block is consumed
+      // from its very beginning.
+      if (strm.total_out > offset)
+        size -= strm.total_out - offset;
+      offset = 0;
+      // If we had any buffered input left, move it to the beginning of the
+      // buffer to decode the next Block Header.
+      if (strm.avail_in > 0)
+        {
+          memmove (in_buf, strm.next_in, strm.avail_in);
+          header_read = strm.avail_in;
+        }
+      else
+        header_read = 0;
+      // Drop the finished Block's filter options before the next header is
+      // decoded into the same lzma_block.
+      free_lzma_block_filter_options (&block);
+    }
+}
+
+// Extract SIZE bytes at uncompressed offset OFFSET from the xz-compressed
+// archive SRCPATH into a fresh temporary file.
+//
+// TMPPATH is a mkstemp() template and is overwritten with the name of the
+// file that was created.  Returns an open descriptor for that file, or -1 if
+// a reportable_exception occurred while reading the Index or extracting (any
+// temporary file created is unlinked and closed in that case).  Failure to
+// open SRCPATH is not absorbed: it throws libc_exception to the caller.
+static int
+extract_from_seekable_archive (const string& srcpath,
+                               char* tmppath,
+                               uint64_t offset,
+                               uint64_t size)
+{
+  int archive_fd = open (srcpath.c_str(), O_RDONLY);
+  if (archive_fd < 0)
+    throw libc_exception (errno, string("open ") + srcpath);
+  defer_dtor<int,int> archive_closer (archive_fd, close);
+
+  int out_fd = -1;
+  try
+    {
+      lzma_index* xz_index = read_xz_index (archive_fd);
+      defer_dtor<lzma_index*,void> xz_index_ender (xz_index, my_lzma_index_end);
+
+      // Position an iterator on the Block that holds the requested offset.
+      lzma_index_iter where;
+      lzma_index_iter_init (&where, xz_index);
+      bool not_located = lzma_index_iter_locate (&where, offset);
+      if (not_located)
+        throw reportable_exception ("offset not found");
+
+      if (verbose > 3)
+        obatched(clog) << "seeking in xz archive " << srcpath
+                       << " offset=" << offset << " block_offset="
+                       << where.block.uncompressed_file_offset << endl;
+
+      out_fd = mkstemp (tmppath);
+      if (out_fd < 0)
+        throw libc_exception (errno, "cannot create temporary file");
+
+      try
+        {
+          extract_xz_blocks_into_fd (srcpath, archive_fd, out_fd, &where,
+                                     offset, size);
+        }
+      catch (...)
+        {
+          // Don't leave a partial file or a dangling descriptor behind.
+          unlink (tmppath);
+          close (out_fd);
+          throw;
+        }
+    }
+  catch (const reportable_exception &e)
+    {
+      if (verbose)
+        obatched(clog) << "failed to extract from seekable archive " << srcpath
+                       << ": " << e.message << endl;
+      return -1;
+    }
+
+  return out_fd;
+}
 #else
 static bool
 is_seekable_archive (const string& rps, struct archive* a)
 {
   return false;
 }
+// Fallback for builds where the liblzma-based implementation above is
+// compiled out: seekable extraction is never available, so always return -1.
+static int
+extract_from_seekable_archive (const string& srcpath,
+                               char* tmppath,
+                               uint64_t offset,
+                               uint64_t size)
+{
+  // None of the arguments are used; nothing is opened or created.
+  (void) srcpath;
+  (void) tmppath;
+  (void) offset;
+  (void) size;
+  return -1;
+}
 #endif
 
 
@@ -2199,9 +2549,12 @@  create_buildid_r_response (int64_t b_mtime0,
 
 static struct MHD_Response*
 handle_buildid_r_match (bool internal_req_p,
-                        int64_t b_mtime,
+                        int64_t b_mtime0,
                         const string& b_source0,
                         const string& b_source1,
+                        int64_t b_size,
+                        int64_t b_mtime1,
+                        int64_t b_uncompressed_offset,
                         const string& section,
                         int *result_fd)
 {
@@ -2213,7 +2566,7 @@  handle_buildid_r_match (bool internal_req_p,
   if (rc != 0)
     throw libc_exception (errno, string("stat ") + b_source0);
 
-  if ((int64_t) fs.st_mtime != b_mtime)
+  if ((int64_t) fs.st_mtime != b_mtime0)
     {
       if (verbose)
         obatched(clog) << "mtime mismatch for " << b_source0 << endl;
@@ -2373,7 +2726,7 @@  handle_buildid_r_match (bool internal_req_p,
           break; // branch out of if "loop", to try new libarchive fetch attempt
         }
 
-      struct MHD_Response* r = create_buildid_r_response (b_mtime, b_source0,
+      struct MHD_Response* r = create_buildid_r_response (b_mtime0, b_source0,
                                                           b_source1, section,
                                                           ima_sig, NULL, fd,
                                                           fs.st_size,
@@ -2388,7 +2741,41 @@  handle_buildid_r_match (bool internal_req_p,
       // NB: see, we never go around the 'loop' more than once
     }
 
-  // no match ... grumble, must process the archive
+  // no match ... if the archive is seekable, try that first
+  if (b_uncompressed_offset > 0)
+    {
+      char* tmppath = NULL;
+      if (asprintf (&tmppath, "%s/debuginfod-fdcache.XXXXXX", tmpdir.c_str()) < 0)
+        throw libc_exception (ENOMEM, "cannot allocate tmppath");
+      defer_dtor<void*,void> tmmpath_freer (tmppath, free);
+
+      fd = extract_from_seekable_archive (b_source0, tmppath,
+                                          b_uncompressed_offset, b_size);
+      if (fd >= 0)
+        {
+          // Set the mtime so the fdcache file mtimes propagate to future webapi
+          // clients.
+          struct timespec tvs[2];
+          tvs[0].tv_sec = 0;
+          tvs[0].tv_nsec = UTIME_OMIT;
+          tvs[1].tv_sec = b_mtime1;
+          tvs[1].tv_nsec = 0;
+          (void) futimens (fd, tvs);  /* best effort */
+          struct MHD_Response* r = create_buildid_r_response (b_mtime0,
+                                                              b_source0,
+                                                              b_source1,
+                                                              section, ima_sig,
+                                                              tmppath, fd,
+                                                              b_size, b_mtime1,
+                                                              "seekable archive",
+                                                              extract_begin);
+          if (r != 0 && result_fd)
+            *result_fd = fd;
+          return r;
+        }
+    }
+
+  // still nothing ...  process the archive
   string archive_decoder = "/dev/null";
   string archive_extension = "";
   for (auto&& arch : scan_archives)
@@ -2512,7 +2899,7 @@  handle_buildid_r_match (bool internal_req_p,
           continue;
         }
 
-      r = create_buildid_r_response (b_mtime, b_source0, b_source1, section,
+      r = create_buildid_r_response (b_mtime0, b_source0, b_source1, section,
                                      ima_sig, tmppath, fd,
                                      archive_entry_size(e),
                                      archive_entry_mtime(e),
@@ -2583,21 +2970,26 @@  and will not query the upstream servers");
 
 static struct MHD_Response*
 handle_buildid_match (bool internal_req_p,
-                      int64_t b_mtime,
+                      int64_t b_mtime0,
                       const string& b_stype,
                       const string& b_source0,
                       const string& b_source1,
+                      int64_t b_size,
+                      int64_t b_mtime1,
+                      int64_t b_uncompressed_offset,
                       const string& section,
                       int *result_fd)
 {
   try
     {
       if (b_stype == "F")
-        return handle_buildid_f_match(internal_req_p, b_mtime, b_source0,
+        return handle_buildid_f_match(internal_req_p, b_mtime0, b_source0,
 				      section, result_fd);
       else if (b_stype == "R")
-        return handle_buildid_r_match(internal_req_p, b_mtime, b_source0,
-				      b_source1, section, result_fd);
+        return handle_buildid_r_match(internal_req_p, b_mtime0, b_source0,
+                                      b_source1, b_size, b_mtime1,
+                                      b_uncompressed_offset, section,
+                                      result_fd);
     }
   catch (const reportable_exception &e)
     {
@@ -2713,7 +3105,7 @@  handle_buildid (MHD_Connection* conn,
   if (atype_code == "D")
     {
       pp = new sqlite_ps (thisdb, "mhd-query-d",
-                          "select mtime0, sourcetype, source0, source1 from " BUILDIDS "_query_d where buildid = ? "
+                          "select mtime0, sourcetype, source0, source1, size, mtime1, uncompressed_offset from " BUILDIDS "_query_d where buildid = ? "
                           "order by mtime0 desc");
       pp->reset();
       pp->bind(1, buildid);
@@ -2721,7 +3113,7 @@  handle_buildid (MHD_Connection* conn,
   else if (atype_code == "E")
     {
       pp = new sqlite_ps (thisdb, "mhd-query-e",
-                          "select mtime0, sourcetype, source0, source1 from " BUILDIDS "_query_e where buildid = ? "
+                          "select mtime0, sourcetype, source0, source1, size, mtime1, uncompressed_offset from " BUILDIDS "_query_e where buildid = ? "
                           "order by mtime0 desc");
       pp->reset();
       pp->bind(1, buildid);
@@ -2765,19 +3157,30 @@  handle_buildid (MHD_Connection* conn,
       if (rc != SQLITE_ROW)
         throw sqlite_exception(rc, "step");
 
-      int64_t b_mtime = sqlite3_column_int64 (*pp, 0);
+      int64_t b_mtime0 = sqlite3_column_int64 (*pp, 0);
       string b_stype = string((const char*) sqlite3_column_text (*pp, 1) ?: ""); /* by DDL may not be NULL */
       string b_source0 = string((const char*) sqlite3_column_text (*pp, 2) ?: ""); /* may be NULL */
       string b_source1 = string((const char*) sqlite3_column_text (*pp, 3) ?: ""); /* may be NULL */
+      int64_t b_size = 0;
+      int64_t b_mtime1 = 0;
+      int64_t b_uncompressed_offset = 0;
+      if (atype_code == "D" || atype_code == "E")
+        {
+          b_mtime1 = sqlite3_column_int64 (*pp, 5);
+          // These may be NULL, in which case sqlite3_column_int64() returns 0.
+          b_size = sqlite3_column_int64 (*pp, 4);
+          b_uncompressed_offset = sqlite3_column_int64 (*pp, 6);
+        }
 
       if (verbose > 1)
-        obatched(clog) << "found mtime=" << b_mtime << " stype=" << b_stype
+        obatched(clog) << "found mtime=" << b_mtime0 << " stype=" << b_stype
              << " source0=" << b_source0 << " source1=" << b_source1 << endl;
 
       // Try accessing the located match.
       // XXX: in case of multiple matches, attempt them in parallel?
       auto r = handle_buildid_match (conn ? false : true,
-                                     b_mtime, b_stype, b_source0, b_source1,
+                                     b_mtime0, b_stype, b_source0, b_source1,
+                                     b_size, b_mtime1, b_uncompressed_offset,
 				     section, result_fd);
       if (r)
         return r;