SYSTEMDS-3539 Implement delta encoding (Parts 1, 2, and 3) #2361
Conversation
Thanks for the good first PR @HanaHalitim, which also includes plenty of tests. There are a few things that need to be addressed; I will leave some comments in the code.
// }
public static AColGroup create(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
	if(data.getUnique() == 1)
		return ColGroupConst.create(colIndexes, dict);
	for(int j = 0; j < nCol; j++) {
		prevRow[j] = prevRowData[prevOff + _colIndexes.get(j)];
	}
}
	oldIdToNewId[dac.id] = i;
	idx += colIndexes.size();
}
IDictionary dict = new DeltaDictionary(dictValues, colIndexes.size());
package org.apache.sysds.test.component.compress.estim.encoding;

import static org.junit.Assert.assertEquals;
- Fixed incorrect decompression logic for rl > 0 (partial ranges).
- Removed unnecessary empty constructors.
- Overrode unsupported DDC methods in ColGroupDeltaDDC.
- Corrected ColGroupDeltaDDC.create for constant conversion.
- Fixed dictionary allocation size for extra flag in ColGroupFactory.
- Optimized CUMSUM/ROWCUMSUM to reinterpret DDC groups as DeltaDDC.
- Strengthened EncodeDeltaTest assertions and added combine() tests.
- Added new tests for partial range decompression and serialization.
- Removed unused imports.

- Implemented DeltaDDC conversion to DDC for unsupported scalar/unary ops (e.g., K-Means).
- Added comprehensive tests for relational and unary operations in ColGroupDeltaDDCTest.
Thank you for the improvements and the fix for the failing test case @HanaHalitim. I will leave a few comments in the code that I think should be addressed before merging.
double[] dictVals = null;
try {
	dictVals = _dict.getValues();
Why is this wrapped in a try ... catch? I don't think that there is a scenario where this would fail
		prevRow[j] = val;
	}
}
} else {
Can this case actually happen? Otherwise remove that null check
else if(ct == CompressionType.DeltaDDC) {
	return directCompressDeltaDDC(colIndexes, cg);
}
else if(ct == CompressionType.CONST && cs.preferDeltaEncoding) {
Why would you encode CONST as DeltaDDC?
}
final IntArrayList[] of = ubm.getOffsetList();
// old: if(of.length == 1 && of[0].size() == nRow) { // If this always constant
if(of.length == 1 && of[0].size() == nRow && ct != CompressionType.DeltaDDC) { // If this always constant
Why would you encode CONST as DeltaDDC?
final int r = m.getNumRows();
final int c = m.getNumColumns();

if(Builtin.isBuiltinCode(op.fn, BuiltinCode.CUMSUM, BuiltinCode.ROWCUMSUM)) {
Don't handle the ROWCUMSUM case, it would not be efficient (and DDC reinterpretation would be wrong)
if(allDDC && !groups.isEmpty()) {
	MatrixBlock uncompressed = m.getUncompressed("CUMSUM/ROWCUMSUM requires uncompression", op.getNumThreads());
	MatrixBlock opResult = uncompressed.unaryOperations(op, null);
Don't uncompress to do this redundant operation. In case of CUMSUM the reinterpretation should always be correct, so no need to verify.
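To illustrate why the CUMSUM reinterpretation needs no verification, here is a minimal sketch using plain arrays in place of the real dictionary and mapping classes (an assumption; the actual code operates on `AMapToData`/`IDictionary`). Delta decoding is the running-sum recurrence, so decoding a DDC column's original values as deltas is exactly the column-wise cumulative sum.

```java
import java.util.Arrays;

public class CumsumReinterpret {
	// Column-wise cumulative sum, as cumsum(X) would compute it.
	static double[] cumsum(double[] col) {
		double[] out = new double[col.length];
		double run = 0;
		for(int i = 0; i < col.length; i++) {
			run += col[i];
			out[i] = run;
		}
		return out;
	}

	// Delta decoding: decoded row i is the running sum of stored rows 0..i.
	// Same recurrence, so decoding the original values as deltas yields
	// cumsum(col) without touching the dictionary contents.
	static double[] decodeAsDeltas(double[] stored) {
		return cumsum(stored);
	}

	public static void main(String[] args) {
		double[] col = {3, 1, 4, 1, 5};
		System.out.println(Arrays.equals(cumsum(col), decodeAsDeltas(col))); // true
	}
}
```

Note that this identity only holds along the row dimension of a column, which matches the reviewer's point above: ROWCUMSUM runs across columns, so the same reinterpretation would be wrong there.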
MatrixBlock uncompressed = m.getUncompressed("CUMSUM/ROWCUMSUM requires uncompression", op.getNumThreads());
MatrixBlock opResult = uncompressed.unaryOperations(op, null);

CompressionSettingsBuilder csb = new CompressionSettingsBuilder();
csb.clearValidCompression();
csb.setPreferDeltaEncoding(true);
csb.addValidCompression(CompressionType.DeltaDDC);
csb.addValidCompression(CompressionType.UNCOMPRESSED);
csb.setTransposeInput("false");
Pair<MatrixBlock, CompressionStatistics> compressedPair = CompressedMatrixBlockFactory.compress(opResult, op.getNumThreads(), csb);
MatrixBlock compressedResult = compressedPair.getLeft();

if(compressedResult == null) {
	compressedResult = opResult;
}

CompressedMatrixBlock finalResult;
if(compressedResult instanceof CompressedMatrixBlock) {
	finalResult = (CompressedMatrixBlock) compressedResult;
}
else {
	finalResult = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(compressedResult);
}

return finalResult;
This part is unnecessary. Let it just fall through and let the case LibMatrixAgg.isSupportedUnaryOperator(op) handle it.
	return finalResult;
}
It might make more sense to put the entire if branch below if (m.isEmpty())...
protected final DArrCounts create(DblArray key, int id) {
	// old: return new DArrCounts(key, id);
	return new DArrCounts(new DblArray(key), id);
You don't need to create a copy of key because new DArrCounts(...) already takes care of that. So you can safely revert that change
@Override
protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
	double[] values) {
	final int nCol = _colIndexes.size();
	final double[] prevRow = new double[nCol];

	if(rl > 0) {
		final int dictIdx0 = _data.getIndex(0);
		final int rowIndex0 = dictIdx0 * nCol;
		for(int j = 0; j < nCol; j++) {
			prevRow[j] = values[rowIndex0 + j];
		}
		for(int i = 1; i < rl; i++) {
			final int dictIdx = _data.getIndex(i);
			final int rowIndex = dictIdx * nCol;
			for(int j = 0; j < nCol; j++) {
				prevRow[j] += values[rowIndex + j];
			}
		}
	}

	for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
		final int dictIdx = _data.getIndex(i);
		final int rowIndex = dictIdx * nCol;

		if(i == 0 && rl == 0) {
			for(int j = 0; j < nCol; j++) {
				final double value = values[rowIndex + j];
				final int colIdx = _colIndexes.get(j);
				ret.append(offT, colIdx + offC, value);
				prevRow[j] = value;
			}
		}
		else {
			for(int j = 0; j < nCol; j++) {
				final double delta = values[rowIndex + j];
				final double newValue = prevRow[j] + delta;
				final int colIdx = _colIndexes.get(j);
				ret.append(offT, colIdx + offC, newValue);
				prevRow[j] = newValue;
			}
		}
	}
}
I don't think that this method is covered by any test case
@Override
public AColGroup scalarOperation(ScalarOperator op) {
	if(op.fn instanceof Multiply || op.fn instanceof Divide) {
		return super.scalarOperation(op);
Untested
		return super.scalarOperation(op);
	}
	else if(op.fn instanceof Plus || op.fn instanceof Minus) {
		return scalarOperationShift(op);
Untested
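For context on the shift path above, here is a hedged sketch of the idea behind a scalar-shift under delta encoding, with plain arrays standing in for the dictionary and mapping (an assumption; the real `scalarOperationShift` must additionally handle the case where the first row's dictionary entry is shared with later rows). Adding a constant to every row changes only the base value, because (x[i] + c) - (x[i-1] + c) == x[i] - x[i-1] leaves every delta untouched.

```java
import java.util.Arrays;

public class DeltaScalarShift {
	// Decode a delta-encoded column: entry 0 is the base value,
	// the remaining entries are row-to-row differences.
	static double[] decode(double[] enc) {
		double[] out = new double[enc.length];
		double run = 0;
		for(int i = 0; i < enc.length; i++) {
			run += enc[i];
			out[i] = run;
		}
		return out;
	}

	// Scalar plus: shift only the base; all deltas stay unchanged.
	static double[] plusScalar(double[] enc, double c) {
		double[] out = enc.clone();
		out[0] += c;
		return out;
	}

	public static void main(String[] args) {
		double[] enc = {10, 2, -1};               // decodes to {10, 12, 11}
		double[] res = decode(plusScalar(enc, 5));
		System.out.println(Arrays.toString(res)); // [15.0, 17.0, 16.0]
	}
}
```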
if(nCol == 1) {
	DoubleCountHashMap map = new DoubleCountHashMap(16);
	AMapToData mapData = MapToFactory.create(nRow, 256);
What if number of unique items > 256?
}
else {
	DblArrayCountHashMap map = new DblArrayCountHashMap(16);
	AMapToData mapData = MapToFactory.create(nRow, 256);
What if number of unique items > 256?
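One way to address the reviewer's concern is to grow the id capacity on demand instead of hard-coding 256. The sketch below is hypothetical (the class name and fields are illustrative, not the real `MapToFactory`/`AMapToData` API): it tracks the logical capacity of a row-to-dictionary-id mapping and doubles it whenever an id overflows, where the real code would re-allocate a wider map.

```java
public class GrowOnDemandMap {
	// Hypothetical stand-in for a resizable row -> dictionary-id mapping.
	private final int[] map;
	private int capacity; // max representable id + 1 (256 for a byte-wide map)

	GrowOnDemandMap(int nRow, int initialCapacity) {
		map = new int[nRow];
		capacity = initialCapacity;
	}

	void set(int row, int id) {
		while(id >= capacity)
			capacity *= 2; // real code would allocate a wider map here
		map[row] = id;
	}

	int getCapacity() {
		return capacity;
	}

	public static void main(String[] args) {
		GrowOnDemandMap m = new GrowOnDemandMap(1000, 256);
		m.set(0, 300); // id 300 no longer fits a byte-wide map
		System.out.println(m.getCapacity()); // 512
	}
}
```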
@Override
public AColGroup sliceRows(int rl, int ru) {
	AMapToData slicedData = _data.slice(rl, ru);
	final int nCol = _colIndexes.size();
	double[] firstRowValues = new double[nCol];
	double[] dictVals = ((DeltaDictionary) _dict).getValues();

	for(int i = 0; i <= rl; i++) {
		int dictIdx = _data.getIndex(i);
		int dictOffset = dictIdx * nCol;
		if(i == 0) {
			for(int j = 0; j < nCol; j++) firstRowValues[j] = dictVals[dictOffset + j];
		} else {
			for(int j = 0; j < nCol; j++) firstRowValues[j] += dictVals[dictOffset + j];
		}
	}

	int nEntries = dictVals.length / nCol;
	int newId = -1;
	for(int k = 0; k < nEntries; k++) {
		boolean match = true;
		for(int j = 0; j < nCol; j++) {
			if(dictVals[k * nCol + j] != firstRowValues[j]) {
				match = false;
				break;
			}
		}
		if(match) {
			newId = k;
			break;
		}
	}

	IDictionary newDict = _dict;
	if(newId == -1) {
		double[] newDictVals = Arrays.copyOf(dictVals, dictVals.length + nCol);
		System.arraycopy(firstRowValues, 0, newDictVals, dictVals.length, nCol);
		newDict = new DeltaDictionary(newDictVals, nCol);
		newId = nEntries;

		if(newId >= slicedData.getUpperBoundValue()) {
			slicedData = slicedData.resize(newId + 1);
		}
	}

	slicedData.set(0, newId);
	return ColGroupDeltaDDC.create(_colIndexes, newDict, slicedData, null);
}
Is this method covered by tests?
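The core of the sliceRows logic above can be sketched with plain arrays (an assumption; the real method works on the dictionary and `AMapToData`): the new base of the slice is the prefix sum of the stored entries up to and including rl, mirroring the i <= rl loop, while the remaining deltas carry over unchanged.

```java
import java.util.Arrays;

public class DeltaSliceSketch {
	// Decode a delta-encoded column via running sum.
	static double[] decode(double[] enc) {
		double[] out = new double[enc.length];
		double run = 0;
		for(int i = 0; i < enc.length; i++) {
			run += enc[i];
			out[i] = run;
		}
		return out;
	}

	// Slice rows [rl, ru): new base = prefix sum up to rl, deltas copied.
	static double[] slice(double[] enc, int rl, int ru) {
		double base = 0;
		for(int i = 0; i <= rl; i++)
			base += enc[i];
		double[] out = new double[ru - rl];
		out[0] = base;
		for(int i = 1; i < out.length; i++)
			out[i] = enc[rl + i];
		return out;
	}

	public static void main(String[] args) {
		double[] enc = {10, 2, -1, 3};                       // decodes to {10, 12, 11, 14}
		double[] sliced = slice(enc, 1, 4);                  // {12, -1, 3}
		System.out.println(Arrays.toString(decode(sliced))); // [12.0, 11.0, 14.0]
	}
}
```

In the real column group the freshly computed base row may not exist in the dictionary yet, which is why the code above searches for a matching entry and appends one (resizing the map) when none is found.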
- ColGroupDDC: Reverted defensive try-catch around getValues() to match project convention.
- ColGroupFactory: Removed redundant check preventing CONST groups when DeltaDDC is requested.
- CLALibUnary: Removed flawed CUMSUM optimization and ROWCUMSUM support; rely on robust recompression fallback.
- ColGroupDeltaDDC: Implemented dynamic resizing for map construction to handle unknown unique counts (>256).
- ColGroupDeltaDDC: Fixed and verified scalar shift logic with map handling.
- DblArrayCountHashMap: Removed redundant object creation.
- Tests: Added comprehensive tests for scalar ops in ColGroupDeltaDDCTest; adjusted CLALibUnaryDeltaTest to reflect removed ROWCUMSUM support.

- Corrected scalar Multiply and Divide for DeltaDDC by scaling the dictionary values instead of falling back to default DDC logic (which was incorrect for deltas).
- Added unit tests for scalar operations (Plus, Minus, Multiply, Divide) in ColGroupDeltaDDCTest.
- Implemented and tested sliceRows support in ColGroupDeltaDDCTest, verifying that slicing DeltaDDC column groups preserves the delta encoding structure.
- Refined CLALibUnary structure by moving CUMSUM optimization check after isEmpty() check.
Thank you so much for the review, Jannik! I have tried to make the adjustments as requested.
Codecov Report
❌ Patch coverage is

Additional details and impacted files:

@@ Coverage Diff @@
## main #2361 +/- ##
============================================
+ Coverage 72.33% 72.38% +0.04%
- Complexity 46911 47175 +264
============================================
Files 1513 1516 +3
Lines 178198 179309 +1111
Branches 34984 35212 +228
============================================
+ Hits 128897 129790 +893
- Misses 39556 39705 +149
- Partials 9745 9814 +69

View full report in Codecov by Sentry.
Thank you @HanaHalitim for the recent changes. Please have a look at the missing test coverage from the codecov report to cover the remaining relevant code parts (excluding
Hi @janniklinde |
Implemented delta encoding compression for SystemDS. Added ColGroupDeltaDDC compression type that stores row differences instead of absolute values, improving compression for data with predictable patterns.
Created delta readers that compute row differences on-the-fly during compression, avoiding delta matrix materialization. Wired CUMSUM and ROWCUMSUM operations to automatically use delta encoding for their results.
Extended compression estimation with preferDeltaEncoding flag to evaluate delta encoding as a compression option. Fixed dictionary remapping bug where extractValues() reordered entries, breaking row-to-dictionary mappings.
All tests pass including ColGroupDeltaDDCTest, CLALibUnaryDeltaTest, ReadersDeltaTest, and related compression tests.
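The core idea described above can be illustrated with a minimal, self-contained sketch (plain arrays standing in for the real dictionary and column-group classes): delta encoding keeps the first value and stores row-to-row differences, so data with a predictable trend collapses to a handful of distinct deltas and a tiny dictionary.

```java
import java.util.Arrays;

public class DeltaEncodingDemo {
	// Delta-encode one column: keep the first value, store differences.
	static double[] encode(double[] col) {
		double[] out = new double[col.length];
		out[0] = col[0];
		for(int i = 1; i < col.length; i++)
			out[i] = col[i] - col[i - 1];
		return out;
	}

	// Decode via running sum; this is what DeltaDDC decompression computes.
	static double[] decode(double[] enc) {
		double[] out = new double[enc.length];
		double run = 0;
		for(int i = 0; i < enc.length; i++) {
			run += enc[i];
			out[i] = run;
		}
		return out;
	}

	public static void main(String[] args) {
		double[] col = {100, 101, 102, 103, 104};
		double[] enc = encode(col); // {100, 1, 1, 1, 1}
		// Four identical deltas -> two distinct dictionary entries instead
		// of five, which is where DeltaDDC wins over plain DDC on such data.
		System.out.println(Arrays.toString(enc));
		System.out.println(Arrays.equals(decode(enc), col)); // round trip: true
	}
}
```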