Conversation

@HanaHalitim

Implemented delta encoding compression for SystemDS. Added ColGroupDeltaDDC compression type that stores row differences instead of absolute values, improving compression for data with predictable patterns.

Created delta readers that compute row differences on-the-fly during compression, avoiding delta matrix materialization. Wired CUMSUM and ROWCUMSUM operations to automatically use delta encoding for their results.

Extended compression estimation with preferDeltaEncoding flag to evaluate delta encoding as a compression option. Fixed dictionary remapping bug where extractValues() reordered entries, breaking row-to-dictionary mappings.

All tests pass including ColGroupDeltaDDCTest, CLALibUnaryDeltaTest, ReadersDeltaTest, and related compression tests.
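
As background, here is a minimal self-contained sketch of the delta-encoding idea (the class below is purely illustrative and does not mirror the actual ColGroupDeltaDDC internals): each row stores its difference from the previous row, so data with predictable steps collapses to a small dictionary of distinct deltas plus a row-to-dictionary mapping.

// Illustrative delta encoding of one column: a dictionary of distinct deltas
// plus a row -> dictionary-id mapping (hypothetical sketch, not SystemDS code).
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeltaEncodingSketch {
    public static void main(String[] args) {
        double[] col = {10, 20, 30, 40, 41, 42, 43}; // predictable steps
        Map<Double, Integer> dict = new LinkedHashMap<>(); // distinct deltas
        List<Integer> mapping = new ArrayList<>();         // row -> dict id
        double prev = 0; // row 0 is encoded as its delta from an implicit 0
        for (double v : col) {
            double delta = v - prev;
            mapping.add(dict.computeIfAbsent(delta, d -> dict.size()));
            prev = v;
        }
        // Decode by running-summing the mapped deltas and verify the round trip.
        List<Double> deltas = new ArrayList<>(dict.keySet());
        double run = 0;
        for (int i = 0; i < col.length; i++) {
            run += deltas.get(mapping.get(i));
            if (run != col[i]) throw new AssertionError("row " + i);
        }
        // 7 rows with 7 distinct values need only 2 dictionary entries: {10.0, 1.0}.
        System.out.println("dictionary entries: " + dict.size());
    }
}

On data like this the dictionary stays tiny even though every absolute value is distinct, which is exactly the pattern the PR targets.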

@janniklinde (Contributor)

Thanks for the good first PR, @HanaHalitim, which also includes plenty of tests. There are a few things that need to be addressed; I will leave some comments in the code.


- Fixed incorrect decompression logic for rl > 0 (partial ranges).
- Removed unnecessary empty constructors.
- Overrode unsupported DDC methods in ColGroupDeltaDDC.
- Corrected ColGroupDeltaDDC.create for constant conversion.
- Fixed dictionary allocation size for extra flag in ColGroupFactory.
- Optimized CUMSUM/ROWCUMSUM to reinterpret DDC groups as DeltaDDC.
- Strengthened EncodeDeltaTest assertions and added combine() tests.
- Added new tests for partial range decompression and serialization.
- Removed unused imports.
- Implemented DeltaDDC conversion to DDC for unsupported scalar/unary ops (e.g., K-Means).
- Added comprehensive tests for relational and unary operations in ColGroupDeltaDDCTest.

@janniklinde (Contributor)

Thank you for the improvements and the fix for the failing test case, @HanaHalitim. I will leave a few comments in the code that I think should be addressed before merging.


double[] dictVals = null;
try {
    dictVals = _dict.getValues();

Why is this wrapped in a try ... catch? I don't think there is a scenario where this would fail.

            prevRow[j] = val;
        }
    }
} else {

Can this case actually happen? Otherwise, remove that null check.

else if(ct == CompressionType.DeltaDDC) {
    return directCompressDeltaDDC(colIndexes, cg);
}
else if(ct == CompressionType.CONST && cs.preferDeltaEncoding) {

Why would you encode CONST as DeltaDDC?

}
final IntArrayList[] of = ubm.getOffsetList();
if(of.length == 1 && of[0].size() == nRow) { // If this always constant
if(of.length == 1 && of[0].size() == nRow && ct != CompressionType.DeltaDDC) { // If this always constant

Why would you encode CONST as DeltaDDC?

final int r = m.getNumRows();
final int c = m.getNumColumns();

if(Builtin.isBuiltinCode(op.fn, BuiltinCode.CUMSUM, BuiltinCode.ROWCUMSUM)) {

Don't handle the ROWCUMSUM case; it would not be efficient (and the DDC reinterpretation would be wrong).


if(allDDC && !groups.isEmpty()) {
MatrixBlock uncompressed = m.getUncompressed("CUMSUM/ROWCUMSUM requires uncompression", op.getNumThreads());
MatrixBlock opResult = uncompressed.unaryOperations(op, null);

Don't uncompress to do this redundant operation. In the case of CUMSUM, the reinterpretation should always be correct, so there is no need to verify.
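
For context, the identity behind this suggestion, as a self-contained sketch (illustrative names, not the SystemDS API): for a column-wise CUMSUM, result[i] - result[i-1] equals input[i], so the input's DDC dictionary and row mapping, left completely unchanged, already form a valid delta encoding of the result. ROWCUMSUM accumulates within each row instead, so this row-difference identity does not hold there.

// Sketch: why reinterpreting a DDC group as DeltaDDC yields CUMSUM for free.
// All names here are illustrative; this is not SystemDS code.
public class CumsumReinterpretSketch {
    public static void main(String[] args) {
        double[] dict = {5, -2, 5};      // DDC dictionary of the INPUT column
        int[] mapping = {0, 1, 0, 2, 1}; // row -> dictionary id

        // Materialize the input and its column-wise cumulative sum.
        double[] cumsum = new double[mapping.length];
        double run = 0;
        for (int i = 0; i < mapping.length; i++) {
            run += dict[mapping[i]];
            cumsum[i] = run;
        }

        // Delta-decode the UNCHANGED dictionary and mapping: the running sum
        // of dictionary entries reproduces cumsum exactly, because
        // cumsum[i] - cumsum[i-1] == input[i].
        double acc = 0;
        for (int i = 0; i < mapping.length; i++) {
            acc += dict[mapping[i]];
            if (acc != cumsum[i]) throw new AssertionError("row " + i);
        }
        System.out.println("DDC(input) reinterpreted as DeltaDDC == CUMSUM(input)");
    }
}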

Comment on lines 89 to 113
MatrixBlock uncompressed = m.getUncompressed("CUMSUM/ROWCUMSUM requires uncompression", op.getNumThreads());
MatrixBlock opResult = uncompressed.unaryOperations(op, null);

CompressionSettingsBuilder csb = new CompressionSettingsBuilder();
csb.clearValidCompression();
csb.setPreferDeltaEncoding(true);
csb.addValidCompression(CompressionType.DeltaDDC);
csb.addValidCompression(CompressionType.UNCOMPRESSED);
csb.setTransposeInput("false");
Pair<MatrixBlock, CompressionStatistics> compressedPair = CompressedMatrixBlockFactory.compress(opResult, op.getNumThreads(), csb);
MatrixBlock compressedResult = compressedPair.getLeft();

if(compressedResult == null) {
    compressedResult = opResult;
}

CompressedMatrixBlock finalResult;
if(compressedResult instanceof CompressedMatrixBlock) {
    finalResult = (CompressedMatrixBlock) compressedResult;
}
else {
    finalResult = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(compressedResult);
}

return finalResult;

This part is unnecessary. Let it just fall through and let the case LibMatrixAgg.isSupportedUnaryOperator(op) handle it.


return finalResult;
}

@janniklinde Dec 12, 2025

It might make more sense to put the entire if branch below if (m.isEmpty())...


protected final DArrCounts create(DblArray key, int id) {
    return new DArrCounts(key, id);
    return new DArrCounts(new DblArray(key), id);

You don't need to create a copy of key because new DArrCounts(...) already takes care of that, so you can safely revert that change.
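
The pattern being described, as a tiny hypothetical sketch (not the actual DArrCounts source): when a constructor takes its own defensive copy of a mutable key, an extra copy at the call site is redundant.

// Hypothetical sketch of a constructor that defensively copies its key,
// making a caller-side copy redundant (not the actual DArrCounts source).
import java.util.Arrays;

public class DefensiveCopySketch {
    static final class Counts {
        final double[] key;
        final int id;
        Counts(double[] key, int id) {
            this.key = Arrays.copyOf(key, key.length); // the copy happens here
            this.id = id;
        }
    }

    public static void main(String[] args) {
        double[] k = {1, 2, 3};
        Counts c = new Counts(k, 7);  // no extra copy needed at the call site
        k[0] = 99;                    // mutating the original...
        System.out.println(c.key[0]); // ...does not affect the stored key: 1.0
    }
}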

github-project-automation bot moved this from In Progress to In Review in SystemDS PR Queue on Dec 12, 2025
Comment on lines +189 to +232
@Override
protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
    double[] values) {
    final int nCol = _colIndexes.size();
    final double[] prevRow = new double[nCol];

    // Seed prevRow with the accumulated deltas of rows 0..rl-1 (partial-range start).
    if(rl > 0) {
        final int dictIdx0 = _data.getIndex(0);
        final int rowIndex0 = dictIdx0 * nCol;
        for(int j = 0; j < nCol; j++) {
            prevRow[j] = values[rowIndex0 + j];
        }
        for(int i = 1; i < rl; i++) {
            final int dictIdx = _data.getIndex(i);
            final int rowIndex = dictIdx * nCol;
            for(int j = 0; j < nCol; j++) {
                prevRow[j] += values[rowIndex + j];
            }
        }
    }

    // Reconstruct absolute values row by row and append them to the sparse output.
    for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
        final int dictIdx = _data.getIndex(i);
        final int rowIndex = dictIdx * nCol;

        if(i == 0 && rl == 0) {
            for(int j = 0; j < nCol; j++) {
                final double value = values[rowIndex + j];
                final int colIdx = _colIndexes.get(j);
                ret.append(offT, colIdx + offC, value);
                prevRow[j] = value;
            }
        }
        else {
            for(int j = 0; j < nCol; j++) {
                final double delta = values[rowIndex + j];
                final double newValue = prevRow[j] + delta;
                final int colIdx = _colIndexes.get(j);
                ret.append(offT, colIdx + offC, newValue);
                prevRow[j] = newValue;
            }
        }
    }
}

I don't think that this method is covered by any test case.
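
One way to cover it, sketched as a self-contained property check (illustrative plain Java, not the actual SystemDS test harness): decode the full range once as a reference, then decode every window [rl, ru) and compare; this exercises exactly the rl > 0 seeding of prevRow shown above.

// Sketch of the property such a test should assert for partial ranges
// (plain Java; not the SystemDS test API).
import java.util.Arrays;

public class PartialRangeDecodeSketch {
    // Delta-decode rows [rl, ru): seed with the deltas of rows 0..rl-1,
    // mirroring the prevRow accumulation in the method above.
    static double[] decodeRange(double[] dict, int[] map, int rl, int ru) {
        double prev = 0;
        for (int i = 0; i < rl; i++)
            prev += dict[map[i]];
        double[] out = new double[ru - rl];
        for (int i = rl; i < ru; i++) {
            prev += dict[map[i]];
            out[i - rl] = prev;
        }
        return out;
    }

    public static void main(String[] args) {
        double[] dict = {10, 1, -3};    // deltas
        int[] map = {0, 0, 1, 2, 1, 0}; // row -> dictionary id
        double[] full = decodeRange(dict, map, 0, map.length);
        for (int rl = 0; rl < map.length; rl++)
            for (int ru = rl + 1; ru <= map.length; ru++)
                if (!Arrays.equals(decodeRange(dict, map, rl, ru), Arrays.copyOfRange(full, rl, ru)))
                    throw new AssertionError("range [" + rl + "," + ru + ")");
        System.out.println("every partial range matches the full decompression");
    }
}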

@Override
public AColGroup scalarOperation(ScalarOperator op) {
    if(op.fn instanceof Multiply || op.fn instanceof Divide) {
        return super.scalarOperation(op);

Untested

        return super.scalarOperation(op);
    }
    else if(op.fn instanceof Plus || op.fn instanceof Minus) {
        return scalarOperationShift(op);

Untested


if(nCol == 1) {
    DoubleCountHashMap map = new DoubleCountHashMap(16);
    AMapToData mapData = MapToFactory.create(nRow, 256);

What if number of unique items > 256?

}
else {
    DblArrayCountHashMap map = new DblArrayCountHashMap(16);
    AMapToData mapData = MapToFactory.create(nRow, 256);

What if number of unique items > 256?
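
A sketch of one way to handle this, assuming a grow-on-overflow strategy (illustrative; the actual MapToFactory/AMapToData handling may differ): encode with a narrow map and widen it the first time a dictionary id no longer fits. The sliceRows hunk below applies the same idea through getUpperBoundValue() and resize(newId + 1).

// Sketch: coping with more unique items than the initial 256-id capacity by
// widening the row mapping on overflow (illustrative; not the actual
// MapToFactory/AMapToData implementation).
import java.util.HashMap;
import java.util.Map;

public class GrowableMappingSketch {
    public static void main(String[] args) {
        int nRow = 1000;
        double[] col = new double[nRow];
        for (int i = 0; i < nRow; i++)
            col[i] = i * 0.5; // 1000 distinct values -> 1000 dictionary ids

        Map<Double, Integer> dict = new HashMap<>();
        byte[] small = new byte[nRow]; // holds ids 0..255 only
        int[] wide = null;             // allocated lazily on overflow

        for (int i = 0; i < nRow; i++) {
            int id = dict.computeIfAbsent(col[i], k -> dict.size());
            if (wide == null && id > 255) { // first id that no longer fits
                wide = new int[nRow];
                for (int j = 0; j < i; j++)
                    wide[j] = small[j] & 0xFF; // re-read stored bytes unsigned
            }
            if (wide != null)
                wide[i] = id;
            else
                small[i] = (byte) id;
        }
        System.out.println("unique ids: " + dict.size() + ", widened: " + (wide != null));
    }
}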

Comment on lines 451 to 498
@Override
public AColGroup sliceRows(int rl, int ru) {
    AMapToData slicedData = _data.slice(rl, ru);
    final int nCol = _colIndexes.size();
    double[] firstRowValues = new double[nCol];
    double[] dictVals = ((DeltaDictionary) _dict).getValues();

    // Accumulate deltas of rows 0..rl to obtain the absolute first row of the slice.
    for(int i = 0; i <= rl; i++) {
        int dictIdx = _data.getIndex(i);
        int dictOffset = dictIdx * nCol;
        if(i == 0) {
            for(int j = 0; j < nCol; j++)
                firstRowValues[j] = dictVals[dictOffset + j];
        }
        else {
            for(int j = 0; j < nCol; j++)
                firstRowValues[j] += dictVals[dictOffset + j];
        }
    }

    // Reuse an existing dictionary entry if it matches the accumulated first row.
    int nEntries = dictVals.length / nCol;
    int newId = -1;
    for(int k = 0; k < nEntries; k++) {
        boolean match = true;
        for(int j = 0; j < nCol; j++) {
            if(dictVals[k * nCol + j] != firstRowValues[j]) {
                match = false;
                break;
            }
        }
        if(match) {
            newId = k;
            break;
        }
    }

    // Otherwise append the first row as a new dictionary entry, resizing the map if needed.
    IDictionary newDict = _dict;
    if(newId == -1) {
        double[] newDictVals = Arrays.copyOf(dictVals, dictVals.length + nCol);
        System.arraycopy(firstRowValues, 0, newDictVals, dictVals.length, nCol);
        newDict = new DeltaDictionary(newDictVals, nCol);
        newId = nEntries;

        if(newId >= slicedData.getUpperBoundValue()) {
            slicedData = slicedData.resize(newId + 1);
        }
    }

    slicedData.set(0, newId);
    return ColGroupDeltaDDC.create(_colIndexes, newDict, slicedData, null);
}
@janniklinde Dec 12, 2025

Is this method covered by tests?

- ColGroupDDC: Reverted defensive try-catch around getValues() to match project convention.

- ColGroupFactory: Removed redundant check preventing CONST groups when DeltaDDC is requested.

- CLALibUnary: Removed flawed CUMSUM optimization and ROWCUMSUM support; rely on robust recompression fallback.

- ColGroupDeltaDDC: Implemented dynamic resizing for map construction to handle unknown unique counts (>256).

- ColGroupDeltaDDC: Fixed and verified scalar shift logic with map handling.

- DblArrayCountHashMap: Removed redundant object creation.

- Tests: Added comprehensive tests for scalar ops in ColGroupDeltaDDCTest; adjusted CLALibUnaryDeltaTest to reflect the removed ROWCUMSUM support.

- Corrected scalar Multiply and Divide for DeltaDDC by scaling the dictionary values instead of falling back to the default DDC logic, which was incorrect for deltas (see the sketch after this list).

- Added unit tests for scalar operations (Plus, Minus, Multiply, Divide) in ColGroupDeltaDDCTest.

- Implemented and tested sliceRows support in ColGroupDeltaDDCTest, verifying that slicing DeltaDDC column groups preserves the delta encoding structure.

- Refined CLALibUnary structure by moving CUMSUM optimization check after isEmpty() check.
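
A worked sketch of the Multiply/Divide versus Plus/Minus distinction, as mentioned in the bullets above (illustrative code, not the ColGroupDeltaDDC implementation): since each value is a running sum of deltas, scaling every delta by c scales every reconstructed value by c, while an additive shift must land on the first row's delta only; adding c to every delta would add (i+1)*c to row i.

// Sketch: scalar operations on a delta-encoded column (illustrative; not the
// ColGroupDeltaDDC implementation).
public class DeltaScalarOpsSketch {
    // Reconstruct absolute values by running-summing the per-row deltas.
    static double[] decode(double[] deltas) {
        double[] out = new double[deltas.length];
        double run = 0;
        for (int i = 0; i < deltas.length; i++)
            out[i] = run += deltas[i];
        return out;
    }

    public static void main(String[] args) {
        double[] deltas = {10, 10, 1, 1}; // decodes to 10, 20, 21, 22
        double c = 3;

        // Multiply: scaling every delta scales every value, since
        // c * (d_0 + ... + d_i) == c*d_0 + ... + c*d_i.
        double[] mul = deltas.clone();
        for (int i = 0; i < mul.length; i++)
            mul[i] *= c;

        // Plus: shift only the first delta; the running sum carries the offset
        // to all later rows. Adding c to EVERY delta would be wrong, since
        // row i would gain (i + 1) * c.
        double[] add = deltas.clone();
        add[0] += c;

        double[] base = decode(deltas);
        double[] mulV = decode(mul), addV = decode(add);
        for (int i = 0; i < base.length; i++) {
            if (mulV[i] != base[i] * c) throw new AssertionError("mul row " + i);
            if (addV[i] != base[i] + c) throw new AssertionError("add row " + i);
        }
        System.out.println("multiply scales all deltas; plus shifts only the first");
    }
}

In the actual column group the first row's delta lives in a dictionary entry that other rows may share, which is presumably why the scalar shift logic needed the extra map handling noted above.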
@HanaHalitim (Author)

Thank you so much for the review, Jannik! I have tried to make the adjustments as requested.

@codecov bot commented Dec 17, 2025

Codecov Report

❌ Patch coverage is 86.50000% with 81 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.38%. Comparing base (3779d50) to head (0d64bff).
⚠️ Report is 14 commits behind head on main.

Files with missing lines Patch % Lines
...ds/runtime/compress/colgroup/ColGroupDeltaDDC.java 84.67% 29 Missing and 13 partials ⚠️
...sds/runtime/compress/colgroup/ColGroupFactory.java 69.76% 11 Missing and 15 partials ⚠️
...g/apache/sysds/runtime/compress/estim/AComEst.java 58.33% 4 Missing and 1 partial ⚠️
...e/sysds/runtime/compress/colgroup/ColGroupDDC.java 94.59% 1 Missing and 1 partial ⚠️
.../compress/colgroup/dictionary/DeltaDictionary.java 92.00% 2 Missing ⚠️
...apache/sysds/runtime/compress/lib/CLALibUnary.java 90.47% 1 Missing and 1 partial ⚠️
...he/sysds/runtime/compress/colgroup/ColGroupIO.java 66.66% 1 Missing ⚠️
...ntime/compress/estim/encoding/EncodingFactory.java 97.72% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2361      +/-   ##
============================================
+ Coverage     72.33%   72.38%   +0.04%     
- Complexity    46911    47175     +264     
============================================
  Files          1513     1516       +3     
  Lines        178198   179309    +1111     
  Branches      34984    35212     +228     
============================================
+ Hits         128897   129790     +893     
- Misses        39556    39705     +149     
- Partials       9745     9814      +69     


@janniklinde (Contributor)

Thank you, @HanaHalitim, for the recent changes. Please have a look at the missing test coverage in the codecov report and cover the remaining relevant code parts (excluding NotImplementedException and other minor branches). Also check for test redundancy and relevance.
Once this is addressed, we can attempt to merge the PR and proceed with a final detailed code review. Please double-check that everything works as intended and meets the project’s code quality standards, rather than relying solely on review feedback. If any high-level questions remain, feel free to contact me.

@HanaHalitim (Author)

Hi @janniklinde,
Thanks a lot for your review. I checked the codecov report and expanded the test coverage accordingly, and also checked for redundancy and relevance. I ran the tests locally and they were successful; I'm just waiting for the workflow approval on GitHub.
