
Conversation

@mw5h mw5h commented Dec 18, 2025

bulkmerge: implement SST merging in bulk merge processor

Previously, the bulkMergeProcessor was a stub that returned placeholder
output URIs without performing any actual SST merging. This made the
distributed merge infrastructure non-functional for real workloads.

This commit implements the core merging logic in the processor by:

  1. Adding a mergeSSTs method that identifies overlapping SSTs for each
    task's key range, creates an iterator over the external SST files,
    and writes merged output while respecting configurable size limits
    (the overlap step is sketched below).

  2. Introducing a new Merge function that creates and executes the
    DistSQL merge flow, collects results from all processors, and sorts
    the output SSTs by their start key for ingestion.

  3. Adding comprehensive tests for both single-node and multi-node
    distributed merge scenarios, including test infrastructure for
    creating and verifying SST merge results.

The implementation now produces correctly merged, non-overlapping SST
files that can be ingested into ranges, making the distributed bulk
merge functionality operational.
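
As a rough illustration of the first step, here is a minimal, self-contained
sketch of the overlap check mergeSSTs performs before building its iterator.
All types and names here (sstMeta, overlapping) are hypothetical stand-ins,
not the actual processor code:

```go
package main

import (
	"bytes"
	"fmt"
)

// sstMeta is a hypothetical stand-in for the processor's SST metadata:
// an output URI plus the [Start, End) key span the file covers.
type sstMeta struct {
	URI        string
	Start, End []byte
}

// overlapping returns the subset of ssts whose span intersects the
// task's key range [start, end).
func overlapping(ssts []sstMeta, start, end []byte) []sstMeta {
	var out []sstMeta
	for _, s := range ssts {
		// Two half-open spans intersect iff each starts before the other ends.
		if bytes.Compare(s.Start, end) < 0 && bytes.Compare(start, s.End) < 0 {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	ssts := []sstMeta{
		{URI: "external://a.sst", Start: []byte("a"), End: []byte("f")},
		{URI: "external://b.sst", Start: []byte("g"), End: []byte("m")},
	}
	// A task covering [e, h) must read both files.
	for _, s := range overlapping(ssts, []byte("e"), []byte("h")) {
		fmt.Println("task reads:", s.URI)
	}
}
```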

Fixes: #156658
Release note: None

Co-Authored-By: Annie Pompa <annie@cockroachlabs.com>

sql: integrate distributed merge into DSC index backfill

Previously, the declarative schema changer (DSC) index backfiller only
partially supported the new distributed merge pipeline: it would generate
SSTs, but the rest of the pipeline was not yet in place. Now that all
components of the pipeline are available, this change completes the entire
flow. Note that this applies to the DSC only; a separate change will do the
same for the legacy schema changer.

This change completes the integration by:

  • Introducing a registration bridge (bulk_bridge.go) to break import cycles
    between pkg/sql, pkg/sql/bulkmerge, and pkg/sql/bulkingest (the pattern
    is sketched after this list)
  • Implementing runDistributedMerge and runDistributedIngest to coordinate
    the merge and ingest phases
  • Registering the Merge and IngestFiles implementations via init() functions
  • Updating logic tests to validate the new distributed merge flow
  • Removing placeholder tests that previously expected failures; the part
    that verified mixed-version behavior was moved into a mixed-version-only
    test.
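
To make the registration-bridge bullet concrete, here is a minimal sketch of
the pattern collapsed into a single file. The names (mergeFn, the placeholder
body) are illustrative only, not the actual identifiers in bulk_bridge.go; in
the real code the hook variable and the init() would live in different
packages so that neither imports the other:

```go
package main

import (
	"context"
	"fmt"
)

// mergeFn plays the role of the bridge: a package-level hook that the
// implementing package assigns at init() time. Callers invoke the
// functionality through this variable instead of importing the
// implementing package, which is what breaks the import cycle.
var mergeFn func(ctx context.Context, inputs []string) ([]string, error)

// init stands in for the init() in the implementing package
// (e.g. bulkmerge registering its Merge implementation).
func init() {
	mergeFn = func(ctx context.Context, inputs []string) ([]string, error) {
		return append([]string(nil), inputs...), nil // placeholder body
	}
}

func main() {
	out, err := mergeFn(context.Background(), []string{"task-1"})
	fmt.Println(out, err)
}
```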

Informs: #158378
Epic: CRDB-48845

Release note: none

bulkmerge: prevent loopback processor hang on task failure

Prior to this change, the loopback processor in the distributed merge
pipeline could hang if a worker failed during execution. This commit
ensures that merge_coordinator closes the loopback as soon as it sees
meta.Err, preventing such hangs.

Also adds testing hooks via BulkMergeTestingKnobs, enabling injection of
errors for validation purposes.
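
The shape of the fix can be shown with a toy model (hypothetical types and
channel plumbing; not the actual merge_coordinator code): as soon as a
metadata record carries an error, the loopback is closed so downstream
readers observe EOF instead of blocking forever.

```go
package main

import (
	"errors"
	"fmt"
)

// meta is a hypothetical stand-in for DistSQL producer metadata; only the
// error field matters for this sketch.
type meta struct {
	Err error
}

// drain models the coordinator loop. The deferred close guarantees the
// loopback is released on every exit path, including the early return on
// meta.Err, which is what prevents the hang.
func drain(in <-chan meta, loopback chan<- struct{}) error {
	defer close(loopback)
	for m := range in {
		if m.Err != nil {
			return m.Err
		}
	}
	return nil
}

func main() {
	in := make(chan meta, 1)
	loopback := make(chan struct{})
	in <- meta{Err: errors.New("worker failed")}
	close(in)
	fmt.Println(drain(in, loopback)) // worker failed
	_, open := <-loopback
	fmt.Println("loopback open:", open) // false: readers are unblocked
}
```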

Informs #156658
Epic: CRDB-48845
Release note: none

bulkmerge: fix SST boundary handling for column families

Previously, the merge processor would split SSTs at arbitrary byte
positions when reaching the target SST size, which could result in
splits within column families. Additionally, the split picker was
validating that SST boundaries were already at safe split points and
rejecting SSTs that weren't, which was both overly strict (rejecting
valid splits) and ineffective (the check itself was flawed).

This commit fixes both issues:

  1. In the merge processor, SST endpoints are now selected using
    EnsureSafeSplitKey() to find the next safe split point (row
    boundary) after reaching the target size, then using PrefixEnd()
    to properly bound the SST. This ensures we never split within
    a column family (see the sketch after this list).

  2. In the split picker, the incorrect validation check has been
    removed. The merge processor now ensures SST boundaries are
    correct at write time, making validation at split-picking time
    redundant.

  3. In combine_file_info, merge span boundaries are now validated
    using EnsureSafeSplitKey() to ensure they start at safe split
    points.
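
A minimal sketch of the endpoint selection in item 1, assuming CockroachDB's
pkg/keys and pkg/roachpb APIs (the function name safeEndKey is illustrative,
and this compiles only inside the cockroach repo):

```go
package sketch // illustrative package, not part of the PR

import (
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// safeEndKey trims a candidate end key (wherever the target SST size was
// reached) back to a row boundary, then returns the exclusive end key that
// keeps every column family of that row in the current SST.
func safeEndKey(candidate roachpb.Key) (roachpb.Key, error) {
	rowKey, err := keys.EnsureSafeSplitKey(candidate)
	if err != nil {
		return nil, err // candidate does not decode as a SQL row key
	}
	// PrefixEnd is the first key strictly after every key prefixed by
	// rowKey, i.e. after all of the row's column families.
	return rowKey.PrefixEnd(), nil
}
```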

Informs: #156658

Release note: none

Epic: CRDB-48845

bulksst: add utility to append one sst file set to another

Previously, there was no way to combine multiple SSTFiles structures,
which is needed when aggregating SST manifests from multiple processors
in distributed operations.

This commit adds an Append method to SSTFiles that combines SST slices,
row samples, and sums the total sizes. This utility will be used by the
distributed merge pipeline for IMPORT operations to collect SST metadata
from multiple import processors.
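
The shape of the new method is roughly the following sketch (the field names
here are hypothetical; the real SSTFiles structure has its own):

```go
package main

import "fmt"

// sstFiles is a hypothetical mirror of the bulksst SSTFiles structure,
// showing only the shape of the Append operation the commit describes.
type sstFiles struct {
	SSTs       []string
	RowSamples []string
	TotalSize  int64
}

// Append merges other into f: the slices are concatenated and the sizes
// summed, which is all that aggregating per-processor manifests requires.
func (f *sstFiles) Append(other sstFiles) {
	f.SSTs = append(f.SSTs, other.SSTs...)
	f.RowSamples = append(f.RowSamples, other.RowSamples...)
	f.TotalSize += other.TotalSize
}

func main() {
	a := sstFiles{SSTs: []string{"1.sst"}, TotalSize: 100}
	b := sstFiles{SSTs: []string{"2.sst"}, TotalSize: 50}
	a.Append(b)
	fmt.Println(a.SSTs, a.TotalSize) // [1.sst 2.sst] 150
}
```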

Informs: #156660
Release note: None

importer: optionally use distributed merge for IMPORT jobs

Previously, IMPORT jobs always used the legacy direct-ingest approach
where each processor directly ingested SSTs into the storage layer. This
approach doesn't work well for large imports as it can create excessive
L0 files and doesn't allow for merge-time optimizations.

This commit adds support for the distributed merge pipeline to IMPORT
operations, following the same pattern used by index backfills. The
implementation:

  1. Adds a new cluster setting bulkio.import.distributed_merge.mode to
    control whether distributed merge is used for imports (defaults to
    false for backward compatibility); a sketch of this setting and the
    planning-time check follows this list.

  2. Determines the merge mode at IMPORT planning time based on the
    cluster setting value and stores it in the job payload
    (ImportDetails.use_distributed_merge). This ensures the decision
    persists across job resumptions and isn't affected by setting changes
    during execution.

  3. Adds a version gate at V26_1 to ensure all nodes in the cluster
    support the new proto fields before allowing distributed merge to be
    enabled.

  4. When distributed merge is enabled, import processors write SSTs to
    node-local storage and emit file metadata instead of directly
    ingesting. The coordinator collects these SSTs and uses the bulkmerge
    package to merge and ingest them.

This harmonizes the IMPORT implementation with the index backfill
approach and provides a common foundation for future optimizations.
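
A sketch of items 1–3 under stated assumptions: the commit says the setting
defaults to false, so a bool setting is shown here, though the ".mode" suffix
may mean the real setting is an enum; the helper useDistributedMerge is a
hypothetical name for the planning-time check:

```go
package sketch // illustrative only; not the actual importer code

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// importDistributedMerge models the new cluster setting.
var importDistributedMerge = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"bulkio.import.distributed_merge.mode",
	"use the distributed merge pipeline for IMPORT",
	false,
)

// useDistributedMerge models the planning-time decision: consult the
// setting once, gated on the cluster version; the result would then be
// persisted in the job payload so later resumptions ignore any subsequent
// setting changes.
func useDistributedMerge(ctx context.Context, st *cluster.Settings) bool {
	if !st.Version.IsActive(ctx, clusterversion.V26_1) {
		return false // some nodes may not understand the new proto fields
	}
	return importDistributedMerge.Get(&st.SV)
}
```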

Fixes: #156660

Co-Authored-By: Jeff Swenson <jeffswenson@betterthannull.com>
Co-Authored-By: Faizan Qazi <faizan@cockroachlabs.com>

Release note (sql change): Added a new cluster setting
bulkio.import.distributed_merge.mode to enable distributed merge
support for IMPORT operations. When enabled (default: false), IMPORT
jobs will use a two-phase approach where import processors first write
SST files to local storage, then a coordinator merges and ingests them.
This can improve performance for large imports by reducing L0 file
counts and enabling merge-time optimizations. Note: This feature
requires all nodes to be running version 26.1 or later.

@mw5h mw5h requested review from a team as code owners December 18, 2025 23:54
@mw5h mw5h requested review from kev-cao and yuzefovich and removed request for a team December 18, 2025 23:54

blathers-crl bot commented Dec 18, 2025

Thanks for opening a backport.

Before merging, please confirm that the change does not break backwards compatibility and otherwise complies with the backport policy. Include a brief release justification in the PR description explaining why the backport is appropriate. All backports must be reviewed by the TL for the owning area. While the stricter LTS policy does not yet apply, please exercise judgment and consider gating non-critical changes behind a disabled-by-default feature flag when appropriate.

@blathers-crl blathers-crl bot added the backport (backports to older release branches) and T-sql-queries (SQL Queries Team) labels Dec 18, 2025

blathers-crl bot commented Dec 18, 2025

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity

This change is Reviewable

kyle-a-wong and others added 10 commits December 18, 2025 16:55
There have been a couple of incidents recently where workloads running
multi-day, million-statement txns caused SQL stats to have unbounded
memory growth. This happens because SQL stats are buffered by sessionID
until a transaction commits, at which point the buffer is flushed and the
statement stats are recorded in the SQL stats subsystem. When a
long-running transaction contains many statements, the buffer is never
flushed and continues to grow.

To fix this, we have introduced a new "flush" event that forces the SQL
stats ingester to flush the stats in the current session once a certain
threshold is met. This threshold is currently set to the value of a new
cluster setting: `sql.metrics.transaction_details.max_statement_stats`.

If this threshold is met, the stats are automatically flushed. The side
effect of this is that these stats will not have an associated transaction
fingerprint id: the transaction fingerprint id isn't finalized until a
transaction is complete, so it isn't known at the time of the forced flush.
There is also no way to "backfill" or set the transaction fingerprint id
once the statement stat has been recorded to the SQL stats subsystem.

This commit also changes the `SQLStatsIngester.flushBuffer` method to
record transaction stats when there are no statement stats in the buffer,
which is the case when the buffer is force-flushed in the scenario
mentioned above.
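
A toy model of the forced flush (all names hypothetical; the real ingester
buffers richer stats objects keyed by sessionID):

```go
package main

import "fmt"

// statBuf is a toy model of the per-session statement-stats buffer.
type statBuf struct {
	stmts     []string
	threshold int
}

// add buffers a statement and force-flushes when the buffer reaches the
// threshold (modeling sql.metrics.transaction_details.max_statement_stats).
// A forced flush records stats without a transaction fingerprint id, since
// the transaction has not finished yet.
func (b *statBuf) add(stmt string) {
	b.stmts = append(b.stmts, stmt)
	if len(b.stmts) >= b.threshold {
		fmt.Printf("force flush of %d stmts (no txn fingerprint id)\n", len(b.stmts))
		b.stmts = b.stmts[:0]
	}
}

func main() {
	b := statBuf{threshold: 3}
	for i := 0; i < 7; i++ {
		b.add(fmt.Sprintf("stmt-%d", i))
	}
	// The remainder flushes normally at commit, with a fingerprint id.
	fmt.Println("buffered at commit:", len(b.stmts))
}
```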

Fixes: cockroachdb#158800
Epic: None
Release note (bug fix): Addresses a bug with unbounded memory
growth when long running transactions are contain many statements.
In this case, the SQL Stats system will automatically flush these
before the transaction commits. The side effect of this is that
these statement statistics will no longer have an associated
transaction fingerprint id.
PR cockroachdb#158527 introduced a fix to SQL stats for a bug that could
cause unbounded memory growth. The original implementation changed the
underlying `flushBuffer` function that is used in the normal happy path
of SQL stats ingestion. To make this change safer for backporting, the
fix is reworked so that the existing flush logic stays the same and only
the "forceFlush" path has new logic.

Epic: None
Release note: None
Fixes cockroachdb#159660

Previously, we did not reset `optionalMultiValFilterSuffixLen` when we found
a necessary multi-val filter for a column. This could cause the incorrect
removal of a non-optional filter.

For example,
- Index: (x, y, z)
- ON filters: x = 1, z IN (1, 2)
- Optional filters: y IN (3, 4)

Since the filters are processed in the order of the columns in the index,
they will be processed in the order `x = 1`, `y IN (3, 4)`, `z IN (1, 2)`.

When processing reaches `y IN (3, 4)`, which is an optional filter,
`optionalMultiValFilterSuffixLen` becomes 1. But `z IN (1, 2)` is actually a
necessary filter, and because `optionalMultiValFilterSuffixLen` is never
reset, it will eventually be wrongly removed.
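
A toy model of the fix (the filter representation here is hypothetical; only
the reset of the suffix counter mirrors the real change):

```go
package main

import "fmt"

// filter is a toy model of a per-index-column filter.
type filter struct {
	expr     string
	optional bool // came from the optional (derived) filter set
	multiVal bool // constrains the column to multiple values
}

func main() {
	// Index (x, y, z) with ON filters x = 1 and z IN (1, 2), plus the
	// optional filter y IN (3, 4), processed in index-column order.
	filters := []filter{
		{"x = 1", false, false},
		{"y IN (3, 4)", true, true},
		{"z IN (1, 2)", false, true},
	}
	suffixLen := 0 // optionalMultiValFilterSuffixLen in the real code
	for _, f := range filters {
		switch {
		case f.optional && f.multiVal:
			suffixLen++ // trailing optional multi-val filters are removable
		case f.multiVal:
			suffixLen = 0 // the fix: a necessary multi-val filter resets it
		}
	}
	// With the reset, nothing is trimmed; without it, suffixLen would end
	// at 1 and z IN (1, 2) would be wrongly dropped.
	fmt.Println("filters to remove from suffix:", suffixLen)
}
```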

Release note (bug fix): Fixed a bug causing a query predicate to be ignored
when the predicate is on a column following one or more ENUM columns in an
index, the predicate constrains the column to multiple values, and a lookup
join to the index is chosen for the query plan. This bug was introduced in
24.3.0 and has been present in all versions since.
This is done behind a testing knob which is only enabled in KVNemesis
for now. This should help us track the source of a sys bytes stats
mismatch, should one arise.

Informs cockroachdb#159425

Epic: none

Release note: None
This commit removes the now-unused testing knob BaseQueuePostEnqueueInterceptor,
following the removal of TestPriorityInversionRequeue.
mw5h added 2 commits December 18, 2025 16:55
@mw5h mw5h force-pushed the backportrelease-26.1-159208 branch from ee93bae to 057f914 Compare December 19, 2025 00:56
@mw5h mw5h requested review from a team as code owners December 19, 2025 00:56
@mw5h mw5h requested review from alyshanjahani-crl and removed request for a team December 19, 2025 00:56