diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java index af944fce750..99c4b9c2ecb 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java @@ -55,16 +55,16 @@ public class CompressionSettings { /** * The sampling ratio used when choosing ColGroups. Note that, default behavior is to use exact estimator if the * number of elements is below 1000. - * + * * DEPRECATED */ public final double samplingRatio; /** * The sampling ratio power to use when choosing sample size. This is used in accordance to the function: - * + * * sampleSize += nRows^samplePower; - * + * * The value is bounded to be in the range of 0 to 1, 1 giving a sample size of everything, and 0 adding 1. */ public final double samplePower; @@ -114,8 +114,9 @@ public class CompressionSettings { /** * Transpose input matrix, to optimize access when extracting bitmaps. This setting is changed inside the script * based on the transposeInput setting. - * - * This is intentionally left as a mutable value, since the transposition of the input matrix is decided in phase 3. + * + * This is intentionally left as a mutable value, since the transposition of the input matrix is decided in phase + * 3. 
*/ public boolean transposed = false; @@ -135,6 +136,19 @@ public class CompressionSettings { public final boolean preferDeltaEncoding; + // Handling Targetloss for piecewise linear Kompression + + private double piecewiseTargetLoss = Double.NaN; + + public void setPiecewiseTargetLoss(double piecewiseTargetLoss) { + this.piecewiseTargetLoss = piecewiseTargetLoss; + + } + + public double getPiecewiseTargetLoss() { + return piecewiseTargetLoss; + } + protected CompressionSettings(double samplingRatio, double samplePower, boolean allowSharedDictionary, String transposeInput, int seed, boolean lossy, EnumSet validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, @@ -161,7 +175,7 @@ protected CompressionSettings(double samplingRatio, double samplePower, boolean this.sdcSortType = sdcSortType; this.scaleFactors = scaleFactors; this.preferDeltaEncoding = preferDeltaEncoding; - + if(!printedStatus && LOG.isDebugEnabled()) { printedStatus = true; LOG.debug(this.toString()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java index 003703f86a4..995837a6ad8 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java @@ -55,7 +55,7 @@ /** * Abstract Class that is the lowest class type for the Compression framework. - * + * * AColGroup store information about a number of columns. 
* */ @@ -65,7 +65,8 @@ public abstract class AColGroup implements Serializable { /** Public super types of compression ColGroups supported */ public static enum CompressionType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, LinearFunctional; + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, LinearFunctional, PiecewiseLinear, + PiecewiseLinearSuccessive; public boolean isDense() { return this == DDC || this == CONST || this == DDCFOR || this == DDCFOR; @@ -82,12 +83,12 @@ public boolean isSDC() { /** * Concrete ColGroupType - * + * * Protected such that outside the ColGroup package it should be unknown which specific subtype is used. */ protected static enum ColGroupType { UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DeltaDDC, - LinearFunctional; + LinearFunctional, PiecewiseLinear; } /** The ColGroup indexes contained in the ColGroup */ @@ -95,7 +96,7 @@ protected static enum ColGroupType { /** * Main constructor. - * + * * @param colIndices offsets of the columns in the matrix block that make up the group */ protected AColGroup(IColIndex colIndices) { @@ -104,7 +105,7 @@ protected AColGroup(IColIndex colIndices) { /** * Obtain the offsets of the columns in the matrix block that make up the group - * + * * @return offsets of the columns in the matrix block that make up the group */ public final IColIndex getColIndices() { @@ -113,7 +114,7 @@ public final IColIndex getColIndices() { /** * Obtain the number of columns in this column group. - * + * * @return number of columns in this column group */ public final int getNumCols() { @@ -124,9 +125,9 @@ public final int getNumCols() { * Shift all column indexes contained by an offset. * * This is used for rbind to combine compressed matrices. 
- * + * * Since column indexes are reused between operations, we allocate a new list here to be safe - * + * * @param offset The offset to move all columns * @return A new column group object with the shifted columns */ @@ -138,7 +139,7 @@ public final AColGroup shiftColIndices(int offset) { * Copy the content of the column group with pointers to the previous content but with new column given Note this * method does not verify if the colIndexes specified are valid and correct dimensions for the underlying column * groups. - * + * * @param colIndexes the new indexes to use in the copy * @return a new object with pointers to underlying data. */ @@ -146,7 +147,7 @@ public final AColGroup shiftColIndices(int offset) { /** * Get the upper bound estimate of in memory allocation for the column group. - * + * * @return an upper bound on the number of bytes used to store this ColGroup in memory. */ public long estimateInMemorySize() { @@ -157,9 +158,9 @@ public long estimateInMemorySize() { /** * Decompress a range of rows into a sparse block - * + * * Note that this is using append, so the sparse column indexes need to be sorted afterwards. - * + * * @param sb Sparse Target block * @param rl Row to start at * @param ru Row to end at @@ -170,7 +171,7 @@ public final void decompressToSparseBlock(SparseBlock sb, int rl, int ru) { /** * Decompress a range of rows into a dense block - * + * * @param db Dense target block * @param rl Row to start at * @param ru Row to end at @@ -181,7 +182,7 @@ public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) { /** * Decompress a range of rows into a dense transposed block. - * + * * @param db Dense target block * @param rl Row in this column group to start at. * @param ru Row in this column group to end at. @@ -191,7 +192,7 @@ public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) { /** * Decompress the column group to the sparse transposed block. 
Note that the column groups would only need to * decompress into specific sub rows of the Sparse block - * + * * @param sb Sparse target block * @param nColOut The number of columns in the sb. */ @@ -199,7 +200,7 @@ public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) { /** * Serializes column group to data output. - * + * * @param out data output * @throws IOException if IOException occurs */ @@ -212,7 +213,7 @@ protected void write(DataOutput out) throws IOException { /** * Returns the exact serialized size of column group. This can be used for example for buffer preallocation. - * + * * @return exact serialized size for column group */ public long getExactSizeOnDisk() { @@ -225,11 +226,11 @@ public long getExactSizeOnDisk() { /** * Slice out the columns within the range of cl and cu to remove the dictionary values related to these columns. If * the ColGroup slicing from does not contain any columns within the range null is returned. - * + * * @param cl The lower bound of the columns to select * @param cu The upper bound of the columns to select (not inclusive). * @return A cloned Column Group, with a copied pointer to the old column groups index structure, but reduced - * dictionary and _columnIndexes correctly aligned with the expected sliced compressed matrix. + * dictionary and _columnIndexes correctly aligned with the expected sliced compressed matrix. */ public final AColGroup sliceColumns(int cl, int cu) { if(cl <= _colIndexes.get(0) && cu > _colIndexes.get(_colIndexes.size() - 1)) { @@ -247,10 +248,10 @@ else if(cu - cl == 1) /** * Slice out a single column from the column group. - * + * * @param col The column to slice, the column could potentially not be inside the column group * @return A new column group that is a single column, if the column requested is not in this column group null is - * returned. + * returned. 
*/ public final AColGroup sliceColumn(int col) { int idx = _colIndexes.findIndex(col); @@ -262,11 +263,11 @@ public final AColGroup sliceColumn(int col) { /** * Slice out multiple columns within the interval between the given indexes. - * + * * @param cl The lower column index to slice from * @param cu The upper column index to slice to, (not included) * @return A column group of this containing the columns specified, returns null if the columns specified is not - * contained in the column group + * contained in the column group */ protected final AColGroup sliceMultiColumns(int cl, int cu) { SliceResult sr = _colIndexes.slice(cl, cu); @@ -278,7 +279,7 @@ protected final AColGroup sliceMultiColumns(int cl, int cu) { /** * Compute the column sum of the given list of groups - * + * * @param groups The Groups to sum * @param res The result to put the values into * @param nRows The number of rows in the groups @@ -292,9 +293,9 @@ public static double[] colSum(Collection groups, double[] res, int nR /** * Get the value at a global row/column position. - * + * * In general this performs since a binary search of colIndexes is performed for each lookup. - * + * * @param r row * @param c column * @return value at the row/column position @@ -309,7 +310,7 @@ public double get(int r, int c) { /** * Get the value at a colGroup specific row/column index position. - * + * * @param r row * @param colIdx column index in the _colIndexes. * @return value at the row/column index position @@ -318,16 +319,16 @@ public double get(int r, int c) { /** * Obtain number of distinct tuples in contained sets of values associated with this column group. - * + * * If the column group is uncompressed the number or rows is returned. - * + * * @return the number of distinct sets of values associated with the bitmaps in this column group */ public abstract int getNumValues(); /** * Obtain the compression type. - * + * * @return How the elements of the column group are compressed. 
*/ public abstract CompressionType getCompType(); @@ -335,14 +336,14 @@ public double get(int r, int c) { /** * Internally get the specific type of ColGroup, this could be extracted from the object but that does not allow for * nice switches in the code. - * + * * @return ColGroupType of the object. */ protected abstract ColGroupType getColGroupType(); /** * Decompress into the DenseBlock. (no NNZ handling) - * + * * @param db Target DenseBlock * @param rl Row to start decompression from * @param ru Row to end decompression at (not inclusive) @@ -353,10 +354,10 @@ public double get(int r, int c) { /** * Decompress into the SparseBlock. (no NNZ handling) - * + * * Note this method is allowing to calls to append since it is assumed that the sparse column indexes are sorted * afterwards - * + * * @param sb Target SparseBlock * @param rl Row to start decompression from * @param ru Row to end decompression at (not inclusive) @@ -367,9 +368,9 @@ public double get(int r, int c) { /** * Right matrix multiplication with this column group. - * + * * This method can return null, meaning that the output overlapping group would have been empty. - * + * * @param right The MatrixBlock on the right of this matrix multiplication * @return The new Column Group or null that is the result of the matrix multiplication. */ @@ -379,9 +380,9 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) { /** * Right matrix multiplication with this column group. - * + * * This method can return null, meaning that the output overlapping group would have been empty. - * + * * @param right The MatrixBlock on the right of this matrix multiplication * @param allCols A pre-materialized list of all col indexes, that can be shared across all column groups if use * full, can be set to null. 
@@ -392,7 +393,7 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) { /** * Right side Matrix multiplication, iterating though this column group and adding to the ret - * + * * @param right Right side matrix to multiply with. * @param ret The return matrix to add results to * @param rl The row of this column group to multiply from @@ -401,18 +402,20 @@ public final AColGroup rightMultByMatrix(MatrixBlock right) { * @param cru The right hand side column upper * @param nRows The number of rows in this column group */ - public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, int cru){ - throw new NotImplementedException("not supporting right Decompressing Multiply on class: " + this.getClass().getSimpleName()); + public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, int ru, int nRows, int crl, + int cru) { + throw new NotImplementedException( + "not supporting right Decompressing Multiply on class: " + this.getClass().getSimpleName()); } /** * Do a transposed self matrix multiplication on the left side t(x) %*% x. but only with this column group. - * + * * This gives better performance since there is no need to iterate through all the rows of the matrix, but the * execution can be limited to its number of distinct values. - * + * * Note it only calculate the upper triangle - * + * * @param ret The return matrix block [numColumns x numColumns] * @param nRows The number of rows in the column group */ @@ -420,7 +423,7 @@ public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, i /** * Left multiply with this column group. 
- * + * * @param matrix The matrix to multiply with on the left * @param result The result to output the values into, always dense for the purpose of the column groups * parallelizing @@ -434,7 +437,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Left side matrix multiplication with a column group that is transposed. - * + * * @param lhs The left hand side Column group to multiply with, the left hand side should be considered * transposed. Also it should be guaranteed that this column group is not empty. * @param result The result matrix to insert the result of the multiplication into @@ -444,16 +447,16 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Matrix multiply with this other column group, but: - * + * * 1. Only output upper triangle values. - * + * * 2. Multiply both ways with "this" being on the left and on the right. - * + * * It should be guaranteed that the input is not the same as the caller of the method. - * + * * The second step is achievable by treating the initial multiplied matrix, and adding its values to the correct * locations in the output. - * + * * @param other The other Column group to multiply with * @param result The result matrix to put the results into */ @@ -462,7 +465,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Perform the specified scalar operation directly on the compressed column group, without decompressing individual * cells if possible. - * + * * @param op operation to perform * @return version of this column group with the operation applied */ @@ -470,7 +473,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Perform a binary row operation. 
- * + * * @param op The operation to execute * @param v The vector of values to apply the values contained should be at least the length of the highest * value in the column index @@ -481,7 +484,7 @@ public abstract void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock re /** * Short hand add operator call on column group to add a row vector to the column group - * + * * @param v The vector to add * @return A new column group where the vector is added. */ @@ -491,7 +494,7 @@ public AColGroup addVector(double[] v) { /** * Perform a binary row operation. - * + * * @param op The operation to execute * @param v The vector of values to apply the values contained should be at least the length of the highest * value in the column index @@ -503,9 +506,9 @@ public AColGroup addVector(double[] v) { /** * Unary Aggregate operator, since aggregate operators require new object output, the output becomes an uncompressed * matrix. - * + * * The range of rl to ru only applies to row aggregates. (ReduceCol) - * + * * @param op The operator used * @param c The output matrix block * @param nRows The total number of rows in the Column Group @@ -516,9 +519,9 @@ public AColGroup addVector(double[] v) { /** * Slice out column at specific index of this column group. - * + * * It is guaranteed that the column to slice is contained in this columnGroup. - * + * * @param idx The column index to slice out. * @return A new column group containing the columns inside. (never null) */ @@ -526,9 +529,9 @@ public AColGroup addVector(double[] v) { /** * Slice range of columns inside this column group. - * + * * It is guaranteed that the columns to slice is contained in this columnGroup. 
- * + * * @param idStart The column index to start at * @param idEnd The column index to end at (not included) * @param outputCols The output columns to extract materialized for ease of implementation @@ -538,9 +541,10 @@ public AColGroup addVector(double[] v) { /** * Slice range of rows out of the column group and return a new column group only containing the row segment. - * - * Note that this slice should maintain pointers back to the original dictionaries and only modify index structures. - * + * + * Note that this slice should maintain pointers back to the original dictionaries and only modify index + * structures. + * * @param rl The row to start at * @param ru The row to end at (not included) * @return A new column group containing the specified row range. @@ -549,21 +553,21 @@ public AColGroup addVector(double[] v) { /** * Short hand method for getting minimum value contained in this column group. - * + * * @return The minimum value contained in this ColumnGroup */ public abstract double getMin(); /** * Short hand method for getting maximum value contained in this column group. - * + * * @return The maximum value contained in this ColumnGroup */ public abstract double getMax(); /** * Short hand method for getting the sum of this column group - * + * * @param nRows The number of rows in the column group * @return The sum of this column group */ @@ -571,7 +575,7 @@ public AColGroup addVector(double[] v) { /** * Detect if the column group contains a specific value. - * + * * @param pattern The value to look for. * @return boolean saying true if the value is contained. */ @@ -579,7 +583,7 @@ public AColGroup addVector(double[] v) { /** * Get the number of nonZeros contained in this column group. - * + * * @param nRows The number of rows in the column group, this is used for groups that does not contain information * about how many rows they have. * @return The nnz. 
@@ -588,7 +592,7 @@ public AColGroup addVector(double[] v) { /** * Make a copy of the column group values, and replace all values that match pattern with replacement value. - * + * * @param pattern The value to look for * @param replace The value to replace the other value with * @return A new Column Group, reusing the index structure but with new values. @@ -597,7 +601,7 @@ public AColGroup addVector(double[] v) { /** * Compute the column sum - * + * * @param c The array to add the column sum to. * @param nRows The number of rows in the column group. */ @@ -605,7 +609,7 @@ public AColGroup addVector(double[] v) { /** * Central Moment instruction executed on a column group. - * + * * @param op The Operator to use. * @param nRows The number of rows contained in the ColumnGroup. * @return A Central Moment object. @@ -614,7 +618,7 @@ public AColGroup addVector(double[] v) { /** * Expand the column group to multiple columns. (one hot encode the column group) - * + * * @param max The number of columns to expand to and cutoff values at. * @param ignore If zero and negative values should be ignored. * @param cast If the double values contained should be cast to whole numbers. @@ -625,7 +629,7 @@ public AColGroup addVector(double[] v) { /** * Get the computation cost associated with this column group. - * + * * @param e The computation cost estimator * @param nRows the number of rows in the column group * @return The cost of this column group @@ -634,7 +638,7 @@ public AColGroup addVector(double[] v) { /** * Perform unary operation on the column group and return a new column group - * + * * @param op The operation to perform * @return The new column group */ @@ -642,19 +646,19 @@ public AColGroup addVector(double[] v) { /** * Get if the group is only containing zero - * + * * @return true if empty */ public abstract boolean isEmpty(); /** - * Append the other column group to this column group. 
This method tries to combine them to return a new column group - * containing both. In some cases it is possible in reasonable time, in others it is not. - * + * Append the other column group to this column group. This method tries to combine them to return a new column + * group containing both. In some cases it is possible in reasonable time, in others it is not. + * * The result is first this column group followed by the other column group in higher row values. - * + * * If it is not possible or very inefficient null is returned. - * + * * @param g The other column group * @return A combined column group or null */ @@ -662,9 +666,9 @@ public AColGroup addVector(double[] v) { /** * Append all column groups in the list provided together in one go allocating the output once. - * + * * If it is not possible or very inefficient null is returned. - * + * * @param groups The groups to combine. * @param blen The normal number of rows in the groups * @param rlen The total number of rows of all combined. @@ -676,11 +680,11 @@ public static AColGroup appendN(AColGroup[] groups, int blen, int rlen) { /** * Append all column groups in the list provided together with this. - * + * * A Important detail is the first entry in the group == this, and should not be appended twice. - * + * * If it is not possible or very inefficient null is returned. - * + * * @param groups The groups to combine. * @param blen The normal number of rows in the groups * @param rlen The total number of rows of all combined. @@ -690,7 +694,7 @@ public static AColGroup appendN(AColGroup[] groups, int blen, int rlen) { /** * Get the compression scheme for this column group to enable compression of other data. - * + * * @return The compression scheme of this column group */ public abstract ICLAScheme getCompressionScheme(); @@ -704,14 +708,14 @@ public void clear() { /** * Recompress this column group into a new column group. - * + * * @return A new or the same column group depending on optimization goal. 
*/ public abstract AColGroup recompress(); /** * Recompress this column group into a new column group of the given type. - * + * * @param ct The compressionType that the column group should morph into * @param nRow The number of rows in this columngroup. * @return A new column group @@ -741,7 +745,7 @@ else if(ct == CompressionType.UNCOMPRESSED) { /** * Get the compression info for this column group. - * + * * @param nRow The number of rows in this column group. * @return The compression info for this group. */ @@ -749,7 +753,7 @@ else if(ct == CompressionType.UNCOMPRESSED) { /** * Combine this column group with another - * + * * @param other The other column group to combine with. * @param nRow The number of rows in both column groups. * @return A combined representation as a column group. @@ -760,7 +764,7 @@ public AColGroup combine(AColGroup other, int nRow) { /** * Get encoding of this column group. - * + * * @return The encoding of the index structure. */ public IEncode getEncoding() { @@ -781,19 +785,19 @@ public AColGroup sortColumnIndexes() { /** * Perform row sum on the internal dictionaries, and return the same index structure. - * + * * This method returns null on empty column groups. - * + * * Note this method does not guarantee correct behavior if the given group is AMorphingGroup, instead it should be * morphed to a valid columngroup via extractCommon first. - * + * * @return The reduced colgroup. */ public abstract AColGroup reduceCols(); /** * Selection (left matrix multiply) - * + * * @param selection A sparse matrix with "max" a single one in each row all other values are zero. * @param points The coordinates in the selection matrix to extract. 
* @param ret The MatrixBlock to decompress the selected rows into @@ -806,17 +810,17 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo else denseSelection(selection, points, ret, rl, ru); } - + /** * Get an approximate sparsity of this column group - * + * * @return the approximate sparsity of this columngroup */ public abstract double getSparsity(); /** * Sparse selection (left matrix multiply) - * + * * @param selection A sparse matrix with "max" a single one in each row all other values are zero. * @param points The coordinates in the selection matrix to extract. * @param ret The Sparse MatrixBlock to decompress the selected rows into @@ -827,7 +831,7 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo /** * Dense selection (left matrix multiply) - * + * * @param selection A sparse matrix with "max" a single one in each row all other values are zero. * @param points The coordinates in the selection matrix to extract. * @param ret The Dense MatrixBlock to decompress the selected rows into @@ -839,7 +843,7 @@ public final void selectionMultiply(MatrixBlock selection, P[] points, MatrixBlo /** * Method to determine if the columnGroup have the same index structure as another. Note that the column indexes and * dictionaries are allowed to be different. - * + * * @param that the other column group * @return if the index is the same. */ @@ -850,7 +854,7 @@ public boolean sameIndexStructure(AColGroup that) { /** * C bind the list of column groups with this column group. the list of elements provided in the index of each list * is guaranteed to have the same index structures - * + * * @param nRow The number of rows contained in all right and this column group. 
* @param nCol The number of columns to shift the right hand side column groups over when combining, this should * only effect the column indexes @@ -888,7 +892,7 @@ public AColGroup combineWithSameIndex(int nRow, int nCol, List right) /** * C bind the given column group to this. - * + * * @param nRow The number of rows contained in the right and this column group. * @param nCol The number of columns in this. * @param right The column group to c-bind. @@ -928,16 +932,16 @@ protected IColIndex combineColIndexes(final int nCol, List right) { /** * This method returns a list of column groups that are naive splits of this column group as if it is reshaped. - * + * * This means the column groups rows are split into x number of other column groups where x is the multiplier. - * + * * The indexes are assigned round robbin to each of the output groups, meaning the first index is assigned. - * + * * If for instance the 4. column group is split by a 2 multiplier and there was 5 columns in total originally. The * output becomes 2 column groups at column index 4 and one at 9. - * + * * If possible the split column groups should reuse pointers back to the original dictionaries! - * + * * @param multiplier The number of column groups to split into * @param nRow The number of rows in this column group in case the underlying column group does not know * @param nColOrg The number of overall columns in the host CompressedMatrixBlock. @@ -947,25 +951,25 @@ protected IColIndex combineColIndexes(final int nCol, List right) { /** * This method returns a list of column groups that are naive splits of this column group as if it is reshaped. - * + * * This means the column groups rows are split into x number of other column groups where x is the multiplier. - * + * * The indexes are assigned round robbin to each of the output groups, meaning the first index is assigned. - * + * * If for instance the 4. column group is split by a 2 multiplier and there was 5 columns in total originally. 
The * output becomes 2 column groups at column index 4 and one at 9. - * + * * If possible the split column groups should reuse pointers back to the original dictionaries! - * + * * This specific variation is pushing down the parallelization given via the executor service provided. If not * overwritten the default is to call the normal split reshape - * + * * @param multiplier The number of column groups to split into * @param nRow The number of rows in this column group in case the underlying column group does not know * @param nColOrg The number of overall columns in the host CompressedMatrixBlock * @param pool The executor service to submit parallel tasks to - * @throws Exception In case there is an error we throw the exception out instead of handling it * @return a list of split column groups + * @throws Exception In case there is an error we throw the exception out instead of handling it */ public AColGroup[] splitReshapePushDown(final int multiplier, final int nRow, final int nColOrg, final ExecutorService pool) throws Exception { diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java index 273df9ff26f..76b6d04ecf2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java @@ -43,6 +43,7 @@ import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary; import org.apache.sysds.runtime.compress.colgroup.functional.LinearRegression; +import org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils; import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; import org.apache.sysds.runtime.compress.colgroup.insertionsort.AInsertionSorter; @@ 
-106,7 +107,7 @@ private ColGroupFactory(MatrixBlock in, CompressedSizeInfo csi, CompressionSetti /** * The actual compression method, that handles the logic of compressing multiple columns together. - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -120,7 +121,7 @@ public static List compressColGroups(MatrixBlock in, CompressedSizeIn /** * The actual compression method, that handles the logic of compressing multiple columns together. - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -135,7 +136,7 @@ public static List compressColGroups(MatrixBlock in, CompressedSizeIn } /** - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. 
* @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -232,8 +233,9 @@ private void logEstVsActual(double time, AColGroup act, CompressedSizeInfoColGro time, retType, estC, actC, act.getNumValues(), cols, wanted, warning)); } else { - LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", - time, retType, estC, actC, act.getNumValues(), cols, wanted)); + LOG.debug( + String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", time, + retType, estC, actC, act.getNumValues(), cols, wanted)); } } @@ -303,6 +305,12 @@ else if(ct == CompressionType.LinearFunctional) { return compressLinearFunctional(colIndexes, in, cs); } } + else if(ct == CompressionType.PiecewiseLinear) { + return compressPiecewiseLinearFunctional(colIndexes, in, cs); + } + else if(ct == CompressionType.PiecewiseLinearSuccessive) { + return compressPiecewiseLinearFunctionalSuccessive(colIndexes, in, cs); + } else if(ct == CompressionType.DDCFOR) { AColGroup g = directCompressDDC(colIndexes, cg); if(g instanceof ColGroupDDC) @@ -698,7 +706,7 @@ private AColGroup directCompressDeltaDDC(IColIndex colIndexes, CompressedSizeInf if(cs.scaleFactors != null) { throw new NotImplementedException("Delta encoding with quantization not yet implemented"); } - + if(colIndexes.size() > 1) { return directCompressDeltaDDCMultiCol(colIndexes, cg); } @@ -730,7 +738,7 @@ private AColGroup directCompressDeltaDDCSingleCol(IColIndex colIndexes, Compress if(map.size() == 0) return new ColGroupEmpty(colIndexes); - + final double[] dictValues = map.getDictionary(); IDictionary dict = new DeltaDictionary(dictValues, 1); @@ -739,7 +747,8 @@ private AColGroup directCompressDeltaDDCSingleCol(IColIndex colIndexes, Compress return ColGroupDeltaDDC.create(colIndexes, dict, resData, null); } - private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup 
cg) throws Exception { + private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) + throws Exception { final AMapToData d = MapToFactory.create(nRow, Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126)); final int fill = d.getUpperBoundValue(); d.fill(fill); @@ -818,8 +827,8 @@ private boolean readToMapDDC(IColIndex colIndexes, DblArrayCountHashMap map, AMa int fill) { ReaderColumnSelection reader = (cs.scaleFactors == null) ? ReaderColumnSelection.createReader(in, colIndexes, - cs.transposed, rl, - ru) : ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, ru, cs.scaleFactors); + cs.transposed, rl, ru) : ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, ru, + cs.scaleFactors); DblArray cellVals = reader.nextRow(); boolean extra = false; @@ -1066,6 +1075,64 @@ private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBl return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows); } + /** + * This method is the entry point to compress a matrix with piecewise linear compression The first method uses a + * segmented least squares with dynamic programming to compress the columns The second method uses a successive + * compression method, which compares each values in linear time and checks if the targetloss exceeded + * + * @param colIndexes the column indices to compress + * @param in the input Matrixblock containing the data + * @param cs compression settings to define the target loss, which should be considered + * @return a piecewise linear compressed column group + */ + + public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in, + CompressionSettings cs) { + + final int numRows = in.getNumRows(); + final int numCols = colIndexes.size(); + int[][] breakpointsPerCol = new int[numCols][]; + double[][] slopesPerCol = new double[numCols][]; + double[][] interceptsPerCol = new double[numCols][]; + + 
for(int col = 0; col < numCols; col++) { + final int colIdx = colIndexes.get(col); + double[] column = PiecewiseLinearUtils.getColumn(in, colIdx); + PiecewiseLinearUtils.SegmentedRegression fit = PiecewiseLinearUtils.compressSegmentedLeastSquares(column, + cs); + breakpointsPerCol[col] = fit.getBreakpoints(); + interceptsPerCol[col] = fit.getIntercepts(); + slopesPerCol[col] = fit.getSlopes(); + + } + return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol, interceptsPerCol, + numRows); + + } + + public static AColGroup compressPiecewiseLinearFunctionalSuccessive(IColIndex colIndexes, MatrixBlock in, + CompressionSettings cs) { + final int numRows = in.getNumRows(); + final int numCols = colIndexes.size(); + int[][] breakpointsPerCol = new int[numCols][]; + double[][] slopesPerCol = new double[numCols][]; + double[][] interceptsPerCol = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int colIdx = colIndexes.get(col); + double[] column = PiecewiseLinearUtils.getColumn(in, colIdx); + PiecewiseLinearUtils.SegmentedRegression fit = PiecewiseLinearUtils.compressSuccessivePiecewiseLinear( + column, cs); + breakpointsPerCol[col] = fit.getBreakpoints(); + interceptsPerCol[col] = fit.getIntercepts(); + slopesPerCol[col] = fit.getSlopes(); + + } + return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol, interceptsPerCol, + numRows); + + } + private AColGroup compressSDCFromSparseTransposedBlock(IColIndex cols, int nrUniqueEstimate, double tupleSparsity) { if(cols.size() > 1) return compressMultiColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate, tupleSparsity); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java new file mode 100644 index 00000000000..f05a5d46e79 --- /dev/null +++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java @@ -0,0 +1,678 @@ +package org.apache.sysds.runtime.compress.colgroup; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.functionobjects.*; +import org.apache.sysds.runtime.instructions.cp.CmCovObject; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.CMOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import org.apache.sysds.utils.MemoryEstimates; + +import java.util.Arrays; + +/** + * This class represents a new ColGroup which is compresses column into segments (piecewise linear) to represent the + * original Data each column is approximate by a set of linear segments defined by breakpoints, slopes and intercepts + */ + +public class ColGroupPiecewiseLinearCompressed extends AColGroupCompressed { + /** + * breakpoints indices per column to define the segment boundaries slopes of the regression line per segment per + * column intercepts of the regression line per segment per column + */ + int[][] breakpointsPerCol; + double[][] slopesPerCol; + double[][] interceptsPerCol; + int numRows; + + protected ColGroupPiecewiseLinearCompressed(IColIndex colIndices) { + super(colIndices); + } + + public ColGroupPiecewiseLinearCompressed(IColIndex colIndices, int[][] breakpoints, 
double[][] slopes, + double[][] intercepts, int numRows) { + super(colIndices); + this.breakpointsPerCol = breakpoints; + this.slopesPerCol = slopes.clone(); + this.interceptsPerCol = intercepts.clone(); + this.numRows = numRows; + } + + /** + * creates a new piecewise linear compress column group validates inputs and copies all arrays before storing + * + * @param colIndices the column indices this group represents + * @param breakpointsPerCol breakpoint indices per column + * @param slopesPerCol slope of each segment per column + * @param interceptsPerCol intercept of each segment per column + * @param numRows number of rows in the original matrix + * @return a new ColGroupPiecewiseLinearCompressed instance + * @throws IllegalArgumentException if breakpoints are invalid or arrays are inconsistent + */ + + public static AColGroup create(IColIndex colIndices, int[][] breakpointsPerCol, double[][] slopesPerCol, + double[][] interceptsPerCol, int numRows) { + final int numCols = colIndices.size(); + if(breakpointsPerCol.length != numCols) + throw new IllegalArgumentException( + "bp.length=" + breakpointsPerCol.length + " != colIndices.size()=" + numCols); + + for(int c = 0; c < numCols; c++) { + if(breakpointsPerCol[c].length < 1 || breakpointsPerCol[c][0] != 0 || + breakpointsPerCol[c][breakpointsPerCol[c].length - 1] != numRows) + throw new IllegalArgumentException( + "Invalid breakpoints for col " + c + ": must start=0, end=numRows, >=1 pts"); + + if(slopesPerCol[c].length != interceptsPerCol[c].length || + slopesPerCol[c].length != breakpointsPerCol[c].length - 1) + throw new IllegalArgumentException("Inconsistent array lengths col " + c); + } + + int[][] bpCopy = new int[numCols][]; + double[][] slopeCopy = new double[numCols][]; + double[][] interceptCopy = new double[numCols][]; + // defensive copy to prevent external modification + for(int c = 0; c < numCols; c++) { + bpCopy[c] = Arrays.copyOf(breakpointsPerCol[c], breakpointsPerCol[c].length); + slopeCopy[c] 
= Arrays.copyOf(slopesPerCol[c], slopesPerCol[c].length); + interceptCopy[c] = Arrays.copyOf(interceptsPerCol[c], interceptsPerCol[c].length); + } + + return new ColGroupPiecewiseLinearCompressed(colIndices, bpCopy, slopeCopy, interceptCopy, numRows); + + } + + /** + * Decompresses a ColGroupPiecewiseLinearCompress into a DenseBlock Each value is reconstructed via slopes[seg]*row + * + intercept[seg] + * + * @param db Target DenseBlock + * @param rl Row to start decompression from + * @param ru Row to end decompression at (not inclusive) + * @param offR Row offset into the target to decompress + * @param offC Column offset into the target to decompress + */ + @Override + public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) { + if(db == null || _colIndexes == null || _colIndexes.size() == 0 || breakpointsPerCol == null || + slopesPerCol == null || interceptsPerCol == null) { + return; + } + for(int col = 0; col < _colIndexes.size(); col++) { + final int colIndex = _colIndexes.get(col); + int[] breakpoints = breakpointsPerCol[col]; + double[] slopes = slopesPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + // per segment in this column + for(int seg = 0; seg + 1 < breakpoints.length; seg++) { + int segStart = breakpoints[seg]; + int segEnd = breakpoints[seg + 1]; + if(segStart >= segEnd) + continue; + + double currentSlopeInSegment = slopes[seg]; + double currentInterceptInSegment = intercepts[seg]; + // intersect segment with requested row range [rl, ru) + + int rowStart = Math.max(segStart, rl); + int rowEnd = Math.min(segEnd, ru); + if(rowStart >= rowEnd) + continue; + + //Fill DenseBlock für this column and Segment + for(int row = rowStart; row < rowEnd; row++) { + double yhat = currentSlopeInSegment * row + currentInterceptInSegment; + int dbRow = offR + row; + int dbCol = offC + colIndex; + + if(dbRow >= 0 && dbRow < db.numRows() && dbCol >= 0 && dbCol < db.numCols()) { + db.set(dbRow, dbCol, yhat); + } + } + + } + + 
} + } + + public int[][] getBreakpointsPerCol() { + return breakpointsPerCol; + } + + public double[][] getSlopesPerCol() { + return slopesPerCol; + } + + public double[][] getInterceptsPerCol() { + return interceptsPerCol; + } + + /** + * Return a decompressed value at row r and column colIdx uses binary search to find the correct segment + * + * @param r row + * @param colIdx column index in the _colIndexes. + * @return reconstructed value with slope[segment]*r+intercepts[segment] + */ + @Override + public double getIdx(int r, int colIdx) { + //safety check + if(r < 0 || r >= numRows || colIdx < 0 || colIdx >= _colIndexes.size()) { + return 0.0; + } + int[] breakpoints = breakpointsPerCol[colIdx]; + double[] slopes = slopesPerCol[colIdx]; + double[] intercepts = interceptsPerCol[colIdx]; + // binary search for the segment containing row r + int lowerBound = 0; + int higherBound = breakpoints.length - 2; + while(lowerBound <= higherBound) { + int mid = (lowerBound + higherBound) / 2; + if(r < breakpoints[mid + 1]) { + higherBound = mid - 1; + } + else + lowerBound = mid + 1; + } + int segment = Math.min(lowerBound, breakpoints.length - 2); + return slopes[segment] * (double) r + intercepts[segment]; + } + + /** + * Returns a total number of stored values remaining all columns counting breakpoints, slopes and intercepts per + * column + * + * @return total number of stored compression values + */ + @Override + public int getNumValues() { + int total = 0; + for(int c = 0; c < _colIndexes.size(); c++) { + total += breakpointsPerCol[c].length + slopesPerCol[c].length + interceptsPerCol[c].length; + } + return total; + } + + /** + * Returns the exact size on disk in bytes includes per column arrays for breakpoints, slopes, intercepts + * + * @return size in bytes + */ + @Override + public long getExactSizeOnDisk() { + long ret = super.getExactSizeOnDisk(); + int numCols = _colIndexes.size(); + ret += 8L * numCols * 3; //array reference pointers + ret += 24L * 3; // 
outer array headers + ret += 4L; //numRows field + + for(int c = 0; c < numCols; c++) { + ret += (long) MemoryEstimates.intArrayCost(breakpointsPerCol[c].length); + ret += (long) MemoryEstimates.doubleArrayCost(slopesPerCol[c].length); + ret += (long) MemoryEstimates.doubleArrayCost(interceptsPerCol[c].length); + } + + return ret; + + } + + /** + * Computes the column sums of the decompressed matrix using sum of arithmetic series Where sumX = len * (2*start + + * len - 1) / 2 + * + * @param c output array to accumulate column sums into + * @param nRows number of rows, which is used because it is covered by the breakpoints + */ + @Override + public void computeSum(double[] c, int nRows) { + for(int col = 0; col < _colIndexes.size(); col++) { + double sum = 0.0; + int[] breakpoints = breakpointsPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + double[] slopes = slopesPerCol[col]; + + for(int seg = 0; seg < slopes.length; seg++) { + int start = breakpoints[seg]; + int end = breakpoints[seg + 1]; + int len = end - start; + if(len <= 0) + continue; + + double sumX = (double) len * (2.0 * start + (len - 1)) / 2.0; + sum += slopes[seg] * sumX + intercepts[seg] * len; + } + c[col] += sum; + } + } + + /** + * Computes column sums by delegating to computeSum Methods are identical because every ColGroup just knows its own + * column + * + * @param c The array to add the column sum to. + * @param nRows The number of rows in the column group. 
+ */ + + @Override + public void computeColSums(double[] c, int nRows) { + computeSum(c, nRows); + } + + @Override + public CompressionType getCompType() { + return CompressionType.PiecewiseLinear; + } + + @Override + protected ColGroupType getColGroupType() { + return ColGroupType.PiecewiseLinear; + } + + /** + * Applies a scalar operation to all segments of this column group For plus/minus operation are only the intercepts + * modified For Multiply/Divide slopes and intercepts are scaled + * + * @param op operation to perform + * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients + * @throws NotImplementedException if the operator is not plus, minus, multiply or divide + */ + @Override + public AColGroup scalarOperation(ScalarOperator op) { + final int numCols = _colIndexes.size(); + + if(!(op.fn instanceof Plus || op.fn instanceof Minus || op.fn instanceof Multiply || op.fn instanceof Divide)) { + throw new NotImplementedException("Unsupported scalar op: " + op.fn.getClass().getSimpleName()); + } + + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int numSegments = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSegments]; + newSlopes[col] = new double[numSegments]; + + for(int seg = 0; seg < numSegments; seg++) { + if(op.fn instanceof Plus || op.fn instanceof Minus) { + // only intercepts changes + newSlopes[col][seg] = slopesPerCol[col][seg]; + newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]); + } + else { // Multiply/Divide + newSlopes[col][seg] = op.executeScalar(slopesPerCol[col][seg]); + newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]); + } + } + } + + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + /** + * Applies a row vector operation from the left For plus/minus are the intercepts shifted For 
multiply/divide slopes + * and intercepts are scaled + * + * @param op The operation to execute + * @param v The vector of values to apply the values contained should be at least the length of the highest + * value in the column index + * @param isRowSafe True if the binary op is applied to an entire zero row and all results are zero + * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients + */ + + @Override + public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) { + final int numCols = _colIndexes.size(); + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + final boolean isAddSub = op.fn instanceof Plus || op.fn instanceof Minus; + + if(!isAddSub && !(op.fn instanceof Multiply || op.fn instanceof Divide)) + throw new NotImplementedException("Unsupported binary op: " + op.fn.getClass().getSimpleName()); + + for(int col = 0; col < numCols; col++) { + double rowValue = v[_colIndexes.get(col)]; + int numSegs = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSegs]; + + // Plus/Minus: slope is translation-invariant, only intercept shifts + newSlopes[col] = isAddSub ? 
slopesPerCol[col].clone() : new double[numSegs]; + + for(int seg = 0; seg < numSegs; seg++) { + newIntercepts[col][seg] = op.fn.execute(rowValue, interceptsPerCol[col][seg]); + if(!isAddSub) + newSlopes[col][seg] = op.fn.execute(rowValue, slopesPerCol[col][seg]); + } + } + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + /** + * Applies a row vector operation from the right For plus/minus are the intercepts shifted For multiply/divide + * slopes and intercepts are scaled + * + * @param op The operation to execute + * @param v The vector of values to apply the values contained should be at least the length of the highest + * value in the column index + * @param isRowSafe True if the binary op is applied to an entire zero row and all results are zero + * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients + */ + @Override + public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) { + final int numCols = _colIndexes.size(); + final boolean isAddSub = op.fn instanceof Plus || op.fn instanceof Minus; + + if(!isAddSub && !(op.fn instanceof Multiply || op.fn instanceof Divide)) + throw new NotImplementedException("Unsupported scalar op: " + op.fn.getClass().getSimpleName()); + + double[][] newSlopes = new double[numCols][]; + double[][] newIntercepts = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + double val = v[_colIndexes.get(col)]; + int numSegs = interceptsPerCol[col].length; + // Plus/Minus shifts intercept only, slopes are unchanged + newSlopes[col] = isAddSub ? 
slopesPerCol[col].clone() : new double[numSegs]; + newIntercepts[col] = new double[numSegs]; + + for(int seg = 0; seg < numSegs; seg++) { + newIntercepts[col][seg] = op.fn.execute(interceptsPerCol[col][seg], val); + if(!isAddSub) + newSlopes[col][seg] = op.fn.execute(slopesPerCol[col][seg], val); + } + } + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + /** + * Returns true if any decompressed value in this column group equals the given pattern + * + * @param pattern The value to look for. + * @return true if pattern is found, else false + */ + @Override + public boolean containsValue(double pattern) { + for(int col = 0; col < _colIndexes.size(); col++) { + if(colContainsValue(col, pattern)) + return true; + } + return false; + } + + /** + * checks if any reconstructed value in column col equals the pattern for each segment, solves the m * x + b = + * pattern instead of scanning all rows + * + * @param col column index + * @param pattern the value to search for + * @return true if the pattern is found + */ + + private boolean colContainsValue(int col, double pattern) { + int[] breakpoints = breakpointsPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + double[] slopes = slopesPerCol[col]; + for(int seg = 0; seg < breakpoints.length - 1; seg++) { + int start = breakpoints[seg]; + int len = breakpoints[seg + 1] - start; + if(len <= 0) + continue; + + double b = intercepts[seg]; + double m = slopes[seg]; + + if(m == 0.0) { + // constant segment: all values equal b + if(Double.compare(b, pattern) == 0) + return true; + continue; + } + + // check if pattern lies on the line: solve m*x + b = pattern for x + double x = (pattern - b) / m; + int xi = (int) x; + if(xi >= start && xi < start + len && Double.compare(m * xi + b, pattern) == 0) + return true; + } + return false; + } + + @Override + public AColGroup unaryOperation(UnaryOperator op) { + throw new NotImplementedException(); + } + + 
@Override + public AColGroup replace(double pattern, double replace) { + throw new NotImplementedException(); + } + + @Override + protected double computeMxx(double c, Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + protected void computeColMxx(double[] c, Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + protected void computeSumSq(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeColSumsSq(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeProduct(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeColProduct(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected double[] preAggSumRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggSumSqRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggProductRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggBuiltinRows(Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + public boolean sameIndexStructure(AColGroupCompressed that) { + throw new NotImplementedException(); + } + + @Override + protected void tsmm(double[] result, int numColumns, int nRows) { + throw new NotImplementedException(); + + } + + @Override + public AColGroup copyAndSet(IColIndex colIndexes) { + throw new 
NotImplementedException(); + } + + @Override + public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru) { + throw new NotImplementedException(); + + } + + @Override + public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut) { + throw new NotImplementedException(); + + } + + @Override + public void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int offR, int offC) { + throw new NotImplementedException(); + + } + + @Override + public AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols, int k) { + throw new NotImplementedException(); + } + + @Override + public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) { + throw new NotImplementedException(); + + } + + @Override + public void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int nRows) { + throw new NotImplementedException(); + + } + + @Override + public void tsmmAColGroup(AColGroup other, MatrixBlock result) { + throw new NotImplementedException(); + + } + + @Override + protected AColGroup sliceSingleColumn(int idx) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex outputCols) { + throw new NotImplementedException(); + } + + @Override + public AColGroup sliceRows(int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + public long getNumberNonZeros(int nRows) { + throw new NotImplementedException(); + } + + @Override + public CmCovObject centralMoment(CMOperator op, int nRows) { + throw new NotImplementedException(); + } + + @Override + public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows) { + throw new NotImplementedException(); + } + + @Override + public double getCost(ComputationCostEstimator e, int nRows) { + throw new NotImplementedException(); + } + + @Override + public AColGroup append(AColGroup g) { + throw new NotImplementedException(); + } + + @Override + 
protected AColGroup appendNInternal(AColGroup[] groups, int blen, int rlen) { + throw new NotImplementedException(); + } + + @Override + public ICLAScheme getCompressionScheme() { + throw new NotImplementedException(); + } + + @Override + public AColGroup recompress() { + throw new NotImplementedException(); + } + + @Override + public CompressedSizeInfoColGroup getCompressionInfo(int nRow) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) { + throw new NotImplementedException(); + } + + @Override + public AColGroup reduceCols() { + throw new NotImplementedException(); + } + + @Override + public double getSparsity() { + throw new NotImplementedException(); + } + + @Override + protected void sparseSelection(MatrixBlock selection, ColGroupUtils.P[] points, MatrixBlock ret, int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + protected void denseSelection(MatrixBlock selection, ColGroupUtils.P[] points, MatrixBlock ret, int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) { + throw new NotImplementedException(); + } + +} + diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java new file mode 100644 index 00000000000..7b0b4bfa960 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java @@ -0,0 +1,306 @@ +package org.apache.sysds.runtime.compress.colgroup.functional; +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class PiecewiseLinearUtils { + /** + * Utility methods for 
piecewise linear compression of matric columns + * supports compression used the segmented least squares algorithm which is implemented with dynamic programming + * and a successive method, which puts all values in a segment till the target loss is exceeded + */ + + private PiecewiseLinearUtils() { + + } + + public static final class SegmentedRegression { + private final int[] breakpoints; + private final double[] slopes; + private final double[] intercepts; + + public SegmentedRegression(int[] breakpoints, double[] slopes, double[] intercepts) { + this.breakpoints = breakpoints; + this.slopes = slopes; + this.intercepts = intercepts; + } + + public int[] getBreakpoints() { + return breakpoints; + } + + public double[] getSlopes() { + return slopes; + } + + public double[] getIntercepts() { + return intercepts; + } + } + + public static double[] getColumn(MatrixBlock in, int colIndex) { + final int numRows = in.getNumRows(); + final double[] column = new double[numRows]; + + for(int row = 0; row < numRows; row++) { + column[row] = in.get(row, colIndex); + } + return column; + } + + public static SegmentedRegression compressSegmentedLeastSquares(double[] column, CompressionSettings cs) { + //compute Breakpoints for a Column with dynamic Programming + final List breakpointsList = computeBreakpoints(cs, column); + final int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray(); + + //get values for Regression + final int numSeg = breakpoints.length - 1; + final double[] slopes = new double[numSeg]; + final double[] intercepts = new double[numSeg]; + + // Regress per Segment + for(int seg = 0; seg < numSeg; seg++) { + final int SegStart = breakpoints[seg]; + final int SegEnd = breakpoints[seg + 1]; + + final double[] line = regressSegment(column, SegStart, SegEnd); + slopes[seg] = line[0]; //slope regession line + intercepts[seg] = line[1]; //intercept regression line + } + + return new SegmentedRegression(breakpoints, slopes, intercepts); + } 
+ + public static SegmentedRegression compressSuccessivePiecewiseLinear(double[] column, CompressionSettings cs) { + //compute Breakpoints for a Column with a sukzessive breakpoints algorithm + + final List breakpointsList = computeBreakpointSuccessive(column, cs); + final int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray(); + + //get values for Regression + final int numSeg = breakpoints.length - 1; + final double[] slopes = new double[numSeg]; + final double[] intercepts = new double[numSeg]; + + // Regress per Segment + for(int seg = 0; seg < numSeg; seg++) { + final int segstart = breakpoints[seg]; + final int segEnd = breakpoints[seg + 1]; + final double[] line = regressSegment(column, segstart, segEnd); + slopes[seg] = line[0]; + intercepts[seg] = line[1]; + } + return new SegmentedRegression(breakpoints, slopes, intercepts); + } + + /** + * Computes breakpoints for a column using segmented least squares with dynamic programming + * Iteratively reduces lambda to increase the number of segments until the target MSE is met. 
+ * + * @param cs compression settings containing the target loss + * @param column the column values to segment + * @return list of breakpoint indices, starting with 0 + */ + public static List computeBreakpoints(CompressionSettings cs, double[] column) { + final int numElements = column.length; + final double targetMSE = cs.getPiecewiseTargetLoss(); + final double sseMax = numElements * targetMSE; // max allowed total SSE + + //start with high lambda an reduce iteratively + double lambda = Math.max(10.0, sseMax * 2.0); + List bestBreaks = Arrays.asList(0, numElements); + double bestSSE = computeTotalSSE(column, bestBreaks); + + for (int iter = 0; iter < 50; iter++) { + List breaks = computeBreakpointsLambda(column, lambda); + double totalSSE = computeTotalSSE(column, breaks); + int numSegs = breaks.size() - 1; + + if (totalSSE < bestSSE) { + bestSSE = totalSSE; + bestBreaks = new ArrayList<>(breaks); + } + //target loss reached + if (bestSSE <= sseMax) { + return bestBreaks; + } + + // only one segment left, break condition + if (numSegs <= 1) { + break; + } + // reducing lambda to allow more segments in next iteration + lambda *= 0.8; + } + + return bestBreaks; + } + + /** + * Computes optimal breakpoints, each segment has a SEE plus a + + */ + + public static List computeBreakpointsLambda(double[] column, double lambda) { + final int n = column.length; + final double[] costs = new double[n + 1]; // min cost to reach i + final int[] prev = new int[n + 1]; + + Arrays.fill(costs, Double.POSITIVE_INFINITY); + costs[0] = 0.0; + // precompute all segment costs to avoid recomputation in dynamic programming + double[][] segCosts = new double[n+1][n+1]; + for(int i = 0; i < n; i++) { + for(int j = i+1; j <= n; j++) { + segCosts[i][j] = computeSegmentCost(column, i, j); + } + } + // for each point j, find the cheapest previous breakpoint i + for(int j = 1; j <= n; j++) { + for(int i = 0; i < j; i++) { + // cost equals the SSE of segment [i,j] plus penalty plus best costs 
+ double cost = costs[i] + segCosts[i][j] + lambda; + if(cost < costs[j]) { + costs[j] = cost; + prev[j] = i; + } + } + } + + // Backtrack to previous points to recover the breakpoints + List breaks = new ArrayList<>(); + int j = n; + while(j > 0) { + breaks.add(j); + j = prev[j]; + } + breaks.add(0); + Collections.reverse(breaks); + return breaks; + } + + /** + * computes the segment cost + * @param column column values + * @param start start index + * @param end end index + * @return SSE of the regression line over the segment + */ + public static double computeSegmentCost(double[] column, int start, int end) { + final int segSize = end - start; + if(segSize <= 1) + return 0.0; + + final double[] ab = regressSegment(column, start, end); + final double slope = ab[0]; + final double intercept = ab[1]; + + double sse = 0.0; + for(int i = start; i < end; i++) { + double err = column[i] - (slope * i + intercept); + sse += err * err; + } + return sse; + } + + /** + * computes the total SSE over all segments defined by the given breakpoints + * @param column + * @param breaks + * @return sum of the total SSE + */ + public static double computeTotalSSE(double[] column, List breaks) { + double total = 0.0; + for(int s = 0; s < breaks.size() - 1; s++) { + final int start = breaks.get(s); + final int end = breaks.get(s + 1); + total += computeSegmentCost(column, start, end); + } + return total; + } + + public static double[] regressSegment(double[] column, int start, int end) { + final int numElements = end - start; + if(numElements <= 0) + return new double[] {0.0, 0.0}; + + double sumOfRowIndices = 0, sumOfColumnValues = 0, sumOfRowIndicesSquared = 0, productRowIndexTimesColumnValue = 0; + for(int i = start; i < end; i++) { + sumOfRowIndices += i; + sumOfColumnValues += column[i]; + sumOfRowIndicesSquared += i * i; + productRowIndexTimesColumnValue += i * column[i]; + } + + + final double denominatorForSlope = + numElements * sumOfRowIndicesSquared - sumOfRowIndices * 
sumOfRowIndices; + final double slope; + final double intercept; + if(denominatorForSlope == 0) { + slope = 0.0; + intercept = sumOfColumnValues / numElements; + } + else { + slope = (numElements * productRowIndexTimesColumnValue - sumOfRowIndices * sumOfColumnValues) / + denominatorForSlope; + intercept = (sumOfColumnValues - slope * sumOfRowIndices) / numElements; + } + return new double[] {slope, intercept}; + } + + /** + * computes breakpoints for a column using a successive algorithm + * extends each segment until the SSE reaches the target loss, then starts a new segment + * @param column column values + * @param cs compression setting for setting the target loss + * @return list of breakpoint indices + */ + public static List computeBreakpointSuccessive(double[] column, CompressionSettings cs) { + final int numElements = column.length; + final double targetMSE = cs.getPiecewiseTargetLoss(); + if (Double.isNaN(targetMSE) || targetMSE <= 0) { + return Arrays.asList(0, numElements); // fallback single segment + } + + List breakpoints = new ArrayList<>(); + breakpoints.add(0); + int currentStart = 0; + + while (currentStart < numElements) { + int bestEnd = -1; // no end found + + for (int end = currentStart + 1; end <= numElements; end++) { + double sse = computeSegmentCost(column, currentStart, end); + if(sse > (end - currentStart) * targetMSE) { + // end-1 is last valid end; if end == currentStart+1 force min segment of length 1 + bestEnd = (end == currentStart + 1) ? 
end : end - 1; + break; + } + } + + if (bestEnd == -1) { + bestEnd = numElements;// all remaining points fit within the budget + } + + // safety guard: do not allow zero-length segments + if (bestEnd <= currentStart) { + bestEnd = Math.min(currentStart + 1, numElements); + } + + breakpoints.add(bestEnd); + currentStart = bestEnd; + } + + // make sure that the last breakpoint equals numElements + int last = breakpoints.get(breakpoints.size() - 1); + if (last != numElements) { + breakpoints.add(numElements); + } + + return breakpoints; + } +} diff --git a/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java b/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java new file mode 100644 index 00000000000..6046bdfb20b --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java @@ -0,0 +1,168 @@ +package org.apache.sysds.performance; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.utils.stats.Timing; +import java.util.Random; + +/** + * Performance benchmark for piecewise linear compression. + * Successive is benchmarked across large matrices to show scalability. 
 + * DP is only used as a quality reference on small matrices due to quadratic complexity + + */ +public class PiecewiseLinearCompressionPerformanceTest { + + //different target losses : loose, avg, strict + private static final double[] LOSSES = {1e-1, 1e-2, 1e-4}; + // number of repetitions of the compression + private static final int REPS = 3; + + /** + * generates a time series matrix to provide a realistic test setup + * @param nr number of rows + * @param nc number of columns + * @return matrix with randomly generated data + */ + private static MatrixBlock generateTestMatrix(int nr, int nc) { + MatrixBlock mb = new MatrixBlock(nr, nc, true); + mb.allocateDenseBlock(); + Random rng = new Random(42); + for(int c = 0; c < nc; c++) { + double trend = 0.001 * c; + double level = rng.nextDouble() * 5.0; + double volatility = 0.1 + 0.01 * c; + double residual = 0.0; + + for(int row = 0; row < nr; row++) { + // random level shift every 75-150 rows + if(row % (75 + (int)(75 * rng.nextDouble())) == 0) { + level += (rng.nextDouble() - 0.5) * 2.0; + trend += (rng.nextDouble() - 0.5) * 0.0005; + } + // noise: residual = 0.7 * prev + random + residual = 0.7 * residual + rng.nextGaussian() * volatility; + mb.set(row, c, Math.max(0, trend * row + level + residual)); + } + } + return mb; + } + /// returns the average number of segments per column + private static double avgSegments(AColGroup cg) { + int[][] breakpoints = ((ColGroupPiecewiseLinearCompressed) cg).getBreakpointsPerCol(); + int total = 0; + for(int[] bp : breakpoints) total += bp.length - 1; + return total / (double) breakpoints.length; + } + + /** + * computes the MSE between the original data and the decompressed reconstruction + * @param orig original matrix + * @param cg piecewise linear compressed column group + * @return MSE + */ + private static double reconstructionMSE(MatrixBlock orig, AColGroup cg) { + int nr = orig.getNumRows(), nc = orig.getNumColumns(); + MatrixBlock recon = new MatrixBlock(nr, nc, false); + 
recon.allocateDenseBlock(); + cg.decompressToDenseBlock(recon.getDenseBlock(), 0, nr, 0, 0); + double sse = 0; + for(int r = 0; r < nr; r++) + for(int c = 0; c < nc; c++) { + double diff = orig.get(r, c) - recon.get(r, c); + sse += diff * diff; + } + return sse / (nr * nc); + } + + /** + * benchmarks successive compression for a given matrix and target loss + * reports segments, compressed data size, runtime and reconstruction + * @param mb original matrix to compress + * @param loss target loss param + */ + private static void benchmarkSuccessive(MatrixBlock mb, double loss) { + long origSize = mb.getInMemorySize(); + int numRows = mb.getNumRows(), numCol = mb.getNumColumns(); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(loss); + IColIndex colIndexes = ColIndexFactory.create(numCol); + + ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, mb, cs); + + Timing t = new Timing(); + AColGroup cg = null; + t.start(); + for(int i = 0; i < REPS; i++) + cg = ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, mb, cs); + double time = t.stop() / REPS; + + long size = cg.getExactSizeOnDisk(); + String saving = size < origSize + ? 
String.format("saved %3.0f%%", 100.0 - 100.0 * size / origSize) + : String.format("larger +%.0f%%", 100.0 * size / origSize - 100); + + System.out.printf(" successive loss=%.0e %5.1f segs %6.2f MB (%s) %6.1f ms MSE=%.2e%n", + loss, avgSegments(cg), size / 1e6, saving, time, reconstructionMSE(mb, cg)); + } + + /** + * benchmarks dynamic programming compression for a given matrix and target loss + * no repetition, because DP is too slow due complexity + * reports segments, compressed data size, runtime and reconstruction + * @param mb original matrix to compress + * @param loss target loss param + */ + private static void benchmarkDP(MatrixBlock mb, double loss) { + long origSize = mb.getInMemorySize(); + int numColumns = mb.getNumColumns(); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(loss); + IColIndex colIndexes = ColIndexFactory.create(numColumns); + + Timing t = new Timing(); + t.start(); + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, mb, cs); + double time = t.stop(); + + long size = cg.getExactSizeOnDisk(); + String saving = size < origSize + ? 
String.format("saved %3.0f%%", 100.0 - 100.0 * size / origSize) + : String.format("LARGER +%.0f%%", 100.0 * size / origSize - 100); + + System.out.printf(" DP loss=%.0e %5.1f segs %6.2f MB (%s) %6.1f ms MSE=%.2e%n", + loss, avgSegments(cg), size / 1e6, saving, time, reconstructionMSE(mb, cg)); + } + + public static void main(String[] args) { + System.out.println("=== Piecewise Linear Compression Benchmark ===\n"); + + // Successive scalability across large matrices + System.out.println("=== Successive: scalability ==="); + int[][] configs = {{1000, 10}, {1000, 100}, {1000, 500}, + {5000, 10}, {5000, 100}, {5000, 500}, + {10000, 10}, {10000, 100}, {10000, 500}}; + + for(int[] cfg : configs) { + int nr = cfg[0], nc = cfg[1]; + MatrixBlock mb = generateTestMatrix(nr, nc); + System.out.printf("%nnrows=%d ncols=%d original=%.2f MB%n", + nr, nc, mb.getInMemorySize() / 1e6); + for(double loss : LOSSES) + benchmarkSuccessive(mb, loss); + } + + // DP quality reference on small matrix + System.out.println("\n=== DP: quality reference (nrows=1000, ncols=10) ==="); + MatrixBlock mbSmall = generateTestMatrix(1000, 10); + System.out.printf("original=%.2f MB%n", mbSmall.getInMemorySize() / 1e6); + for(double loss : LOSSES) + benchmarkDP(mbSmall, loss); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java new file mode 100644 index 00000000000..53ae3a1277c --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java @@ -0,0 +1,308 @@ +package org.apache.sysds.test.component.compress.colgroup; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import 
org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.functionobjects.Divide; +import org.apache.sysds.runtime.functionobjects.Minus; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Plus; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.test.AutomatedTestBase; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for ColGroupPiecewiseLinearCompressed operations containing: scalarOperation, binaryRowOps, computeSum, + * containsValue, getIdx, getExactSizeOnDisk. 
+ */ +public class ColGroupPiecewiseLinearCompressedOperationsTest extends AutomatedTestBase { + + private static final long SEED = 42L; + private static final int NROWS = 50; + private static final int NCOLS = 3; + private static final double TARGET_LOSS = 1e-8; + private static final double DELTA = 1e-9; + + private ColGroupPiecewiseLinearCompressed piecewiseLinearColGroup; + private MatrixBlock orignalMB; + private MatrixBlock decompressedMB; + private IColIndex colIndexes; + private int numRows; + private int numCols; + + @Before + public void setUp() { + numRows = NROWS; + numCols = NCOLS; + + /// generate random matrix + double[][] data = getRandomMatrix(numRows, numCols, -3, 3, 1.0, SEED); + orignalMB = DataConverter.convertToMatrixBlock(data); + orignalMB.allocateDenseBlock(); + + colIndexes = ColIndexFactory.create(buildColArray(numCols)); + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(TARGET_LOSS); + + /// create ColGroupPiecewiseLinearCompressed instance + AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, orignalMB, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + piecewiseLinearColGroup = (ColGroupPiecewiseLinearCompressed) result; + + /// decompress again + decompressedMB = decompress(piecewiseLinearColGroup); + } + + private MatrixBlock decompress(AColGroup cg) { + MatrixBlock mb = new MatrixBlock(numRows, numCols, false); + mb.allocateDenseBlock(); + cg.decompressToDenseBlock(mb.getDenseBlock(), 0, numRows, 0, 0); + return mb; + } + + /// check elementwise to compare results from compressed and decompressed matrixblock + private void checkMatrixEquals(String msg, MatrixBlock mb1, MatrixBlock mb2) { + if(mb1.getNumRows() != mb2.getNumRows() || mb1.getNumColumns() != mb2.getNumColumns()) + fail(msg + " dimension mismatch"); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + assertEquals(msg + "[" + r + "," + c + "]", 
mb1.get(r, c), mb2.get(r, c), DELTA); + } + + /// compute column sum to validate + private double[] computeSums(MatrixBlock mb) { + double[] sums = new double[numCols]; + for(int c = 0; c < numCols; c++) + for(int r = 0; r < numRows; r++) + sums[c] += mb.get(r, c); + return sums; + } + + /// create row vector + private double[] buildRowVector() { + double[] v = new double[numCols]; + for(int i = 0; i < numCols; i++) + v[i] = 0.5 * (i + 1); + return v; + } + + private int[] buildColArray(int n) { + int[] cols = new int[n]; + for(int i = 0; i < n; i++) + cols[i] = i; + return cols; + } + + private MatrixBlock applyBinaryRowOpLeft(MatrixBlock mb, BinaryOperator op, double[] v) { + MatrixBlock result = new MatrixBlock(numRows, numCols, false); + result.allocateDenseBlock(); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + result.getDenseBlock().set(r, c, op.fn.execute(v[c], mb.get(r, c))); + return result; + } + + private MatrixBlock applyBinaryRowOpRight(MatrixBlock mb, BinaryOperator op, double[] v) { + MatrixBlock result = new MatrixBlock(numRows, numCols, false); + result.allocateDenseBlock(); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + result.getDenseBlock().set(r, c, op.fn.execute(mb.get(r, c), v[c])); + return result; + } + + @Test + public void testComputeSum() { + double[] sumsComp = new double[numCols]; + piecewiseLinearColGroup.computeSum(sumsComp, numRows); + assertArrayEquals(sumsComp, computeSums(decompressedMB), DELTA); + } + + @Test + public void testComputeColSums() { + double[] sumsComp = new double[numCols]; + piecewiseLinearColGroup.computeColSums(sumsComp, numRows); + assertArrayEquals(sumsComp, computeSums(decompressedMB), DELTA); + } + + private void testScalarOp(ScalarOperator op, double scalar) { + MatrixBlock expected = new MatrixBlock(numRows, numCols, false); + expected.allocateDenseBlock(); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + 
expected.getDenseBlock().set(r, c, op.fn.execute(decompressedMB.get(r, c), scalar)); + + checkMatrixEquals("scalarOp " + op.fn.getClass().getSimpleName(), expected, + decompress(piecewiseLinearColGroup.scalarOperation(op))); + } + + @Test + public void testScalarPlus() { + testScalarOp(new RightScalarOperator(Plus.getPlusFnObject(), 3.7), 3.7); + } + + @Test + public void testScalarMinus() { + testScalarOp(new RightScalarOperator(Minus.getMinusFnObject(), 1.5), 1.5); + } + + @Test + public void testScalarMultiply() { + testScalarOp(new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0), 2.0); + } + + @Test + public void testScalarDivide() { + testScalarOp(new RightScalarOperator(Divide.getDivideFnObject(), 4.0), 4.0); + } + + @Test + public void testBinaryRowOpLeftPlus() { + BinaryOperator op = new BinaryOperator(Plus.getPlusFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpLeft Plus", applyBinaryRowOpLeft(decompressedMB, op, v), + decompress(piecewiseLinearColGroup.binaryRowOpLeft(op, v, false))); + } + + @Test + public void testBinaryRowOpLeftMultiply() { + BinaryOperator op = new BinaryOperator(Multiply.getMultiplyFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpLeft Multiply", applyBinaryRowOpLeft(decompressedMB, op, v), + decompress(piecewiseLinearColGroup.binaryRowOpLeft(op, v, false))); + } + + @Test + public void testBinaryRowOpRightMinus() { + BinaryOperator op = new BinaryOperator(Minus.getMinusFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpRight Minus", applyBinaryRowOpRight(decompressedMB, op, v), + decompress(piecewiseLinearColGroup.binaryRowOpRight(op, v, false))); + } + + @Test + public void testBinaryRowOpRightDivide() { + BinaryOperator op = new BinaryOperator(Divide.getDivideFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpRight Divide", applyBinaryRowOpRight(decompressedMB, op, v), + 
decompress(piecewiseLinearColGroup.binaryRowOpRight(op, v, false))); + } + + @Test + public void testContainsValueIntercept() { + double pattern = piecewiseLinearColGroup.getInterceptsPerCol()[0][0]; + assertTrue("intercept of col 0 seg 0 should exist", piecewiseLinearColGroup.containsValue(pattern)); + } + + @Test + public void testContainsValueEndpoint() { + int[] breakpoints = piecewiseLinearColGroup.getBreakpointsPerCol()[0]; + double[] intercepts = piecewiseLinearColGroup.getInterceptsPerCol()[0]; + double[] slopes = piecewiseLinearColGroup.getSlopesPerCol()[0]; + if(breakpoints.length > 1) { + double pattern = intercepts[0] + slopes[0] * (breakpoints[1] - breakpoints[0] - 1); + assertTrue("endpoint of col 0 seg 0 should exist", piecewiseLinearColGroup.containsValue(pattern)); + } + } + + @Test + public void testContainsValueConstantSegment() { + ColGroupPiecewiseLinearCompressed cg = (ColGroupPiecewiseLinearCompressed) ColGroupPiecewiseLinearCompressed.create( + ColIndexFactory.create(new int[] {0}), new int[][] {{0, numRows}}, new double[][] {{0.0}}, + new double[][] {{1.23}}, numRows); + + assertTrue("constant value 1.23 should exist", cg.containsValue(1.23)); + assertFalse("value 2.0 should not exist", cg.containsValue(2.0)); + } + + @Test + public void testContainsValueOutsideRange() { + assertFalse("value -10 outside data range", piecewiseLinearColGroup.containsValue(-10.0)); + assertFalse("value +10 outside data range", piecewiseLinearColGroup.containsValue(10.0)); + } + + @Test + public void testGetIdxMatchesDecompress() { + for(int c = 0; c < numCols; c++) + for(int r = 0; r < numRows; r++) + assertEquals("getIdx(" + r + "," + c + ")", decompressedMB.get(r, c), + piecewiseLinearColGroup.getIdx(r, c), 1e-10); + } + + @Test + public void testGetIdxInvalidBounds() { + assertEquals("row < 0", 0.0, piecewiseLinearColGroup.getIdx(-1, 0), DELTA); + assertEquals("row >= numRows", 0.0, piecewiseLinearColGroup.getIdx(numRows, 0), DELTA); + assertEquals("col < 
0", 0.0, piecewiseLinearColGroup.getIdx(0, -1), DELTA); + assertEquals("col >= ncols", 0.0, piecewiseLinearColGroup.getIdx(0, numCols), DELTA); + } + + @Test + public void testGetNumValues() { + int expected = 0; + for(int c = 0; c < numCols; c++) { + int breakpointsLen = piecewiseLinearColGroup.getBreakpointsPerCol()[c].length; + int slopesLen = piecewiseLinearColGroup.getSlopesPerCol()[c].length; + int interceptsLen = piecewiseLinearColGroup.getInterceptsPerCol()[c].length; + assertEquals("breakpoints != slopes+1 for col " + c, breakpointsLen, slopesLen + 1); + assertEquals("slopes != intercepts for col " + c, slopesLen, interceptsLen); + expected += breakpointsLen + slopesLen + interceptsLen; + } + assertEquals("getNumValues() mismatch", expected, piecewiseLinearColGroup.getNumValues()); + } + + @Test + public void testGetExactSizeOnDisk() { + Random rng = new Random(SEED); + int rows = 80 + rng.nextInt(40); + int numSegs = 1 + rng.nextInt(3); + + int[] breakpoints = new int[numSegs + 1]; + breakpoints[0] = 0; + breakpoints[numSegs] = rows; + for(int s = 1; s < numSegs; s++) + breakpoints[s] = rng.nextInt(rows * 2 / 3) + rows / 10; + + double[] slopes = new double[numSegs]; + double[] intercepts = new double[numSegs]; + for(int s = 0; s < numSegs; s++) { + slopes[s] = rng.nextDouble() * 4 - 2; + intercepts[s] = rng.nextDouble() * 4 - 2; + } + /// PLC Piecewise Linear Compressed + AColGroup colGroupPLC = ColGroupPiecewiseLinearCompressed.create( + ColIndexFactory.create(new int[] {rng.nextInt(20)}), new int[][] {breakpoints}, new double[][] {slopes}, + new double[][] {intercepts}, rows); + + assertTrue("disk size should be positive", colGroupPLC.getExactSizeOnDisk() > 0); + assertTrue("num values should be positive", colGroupPLC.getNumValues() > 0); + } + + @Override + public double[][] getRandomMatrix(int rows, int cols, double min, double max, double sparsity, long seed) { + Random rng = new Random(seed); + double[][] data = new double[rows][cols]; + for(int r 
= 0; r < rows; r++) + for(int c = 0; c < cols; c++) + data[r][c] = min + rng.nextDouble() * (max - min); + return data; + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java new file mode 100644 index 00000000000..e05745bc97d --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java @@ -0,0 +1,455 @@ +package org.apache.sysds.test.component.compress.colgroup; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.compress.estim.EstimationFactors; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.test.AutomatedTestBase; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * Unit tests of ColGroupPiecewiseLinearCompression Covers Validation, Compression and decompression + */ +public class ColGroupPiecewiseLinearCompressedTest extends AutomatedTestBase { + + private static final long SEED = 42L; + + 
@Override + public void setUp() { + + } + + @Test(expected = NullPointerException.class) + public void testCreateNullBreakpoints() { + IColIndex cols = ColIndexFactory.create(new int[] {0}); + int[][] nullBp = {null}; + ColGroupPiecewiseLinearCompressed.create(cols, nullBp, new double[][] {{1.0}}, new double[][] {{0.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateTooFewBreakpoints() { + int[][] singleBp = {new int[] {0}}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), singleBp, + new double[][] {new double[] {1.0}}, new double[][] {new double[] {0.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateInconsistentSlopes() { + int[] bp = {0, 5, 10}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), new int[][] {bp}, + new double[][] {new double[] {1.0, 2.0, 3.0}}, new double[][] {new double[] {0.0, 1.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateInconsistentIntercepts() { + int[] bp = {0, 5, 10}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), new int[][] {bp}, + new double[][] {new double[] {1.0, 2.0}}, new double[][] {new double[] {0.0}}, 10); + } + + @Test + public void testCompressAndDecompressDP() { + + // create random matrix + final int nrows = 50, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, SEED); + MatrixBlock in = DataConverter.convertToMatrixBlock(data); + in.allocateDenseBlock(); + + IColIndex colIndexes = ColIndexFactory.create(new int[] {0, 1, 2}); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-8); + + AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, in, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + 
// check the structure + int[][] breakpoints = plGroup.getBreakpointsPerCol(); + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + + assertEquals("wrong number of columns in breakpoints", ncols, breakpoints.length); + for(int c = 0; c < ncols; c++) { + assertTrue("breakpoints[" + c + "] needs at least 2 entries", breakpoints[c].length >= 2); + assertEquals("breakpoints[" + c + "] must start at 0", 0, breakpoints[c][0]); + assertEquals("breakpoints[" + c + "] must end at nrows", nrows, breakpoints[c][breakpoints[c].length - 1]); + int numSegs = breakpoints[c].length - 1; + assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length); + assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length); + } + + // decompress and check reconstruction of column group + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + DenseBlock db = recon.getDenseBlock(); + + for(int r = 0; r < nrows; r++) { + for(int c = 0; c < ncols; c++) { + double val = db.get(r, c); + assertFalse("NaN at [" + r + "," + c + "]", Double.isNaN(val)); + assertFalse("Infinite at [" + r + "," + c + "]", Double.isInfinite(val)); + assertEquals("reconstruction error too large at [" + r + "," + c + "]", data[r][c], val, 1e-6); + } + } + } + + @Test + public void testCompressAndDecompressSuccessive() { + + //create random matrix + final int nrows = 50, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, SEED); + MatrixBlock in = DataConverter.convertToMatrixBlock(data); + in.allocateDenseBlock(); + + IColIndex colIndexes = ColIndexFactory.create(new int[] {0, 1, 2}); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-8); + + // create ColGroupPiecewiseLinearCompressed with successive compression + AColGroup result = 
ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, in, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + // structure checks + int[][] bp = plGroup.getBreakpointsPerCol(); + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + + assertEquals("wrong number of columns in bp", ncols, bp.length); + for(int c = 0; c < ncols; c++) { + assertTrue("bp[" + c + "] needs at least 2 entries", bp[c].length >= 2); + assertEquals("bp[" + c + "] must start at 0", 0, bp[c][0]); + assertEquals("bp[" + c + "] must end at nrows", nrows, bp[c][bp[c].length - 1]); + int numSegs = bp[c].length - 1; + assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length); + assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length); + } + + // validate decompression + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + DenseBlock db = recon.getDenseBlock(); + + for(int r = 0; r < nrows; r++) { + for(int c = 0; c < ncols; c++) { + double val = db.get(r, c); + assertFalse("NaN at [" + r + "," + c + "]", Double.isNaN(val)); + assertFalse("Infinite at [" + r + "," + c + "]", Double.isInfinite(val)); + assertEquals("reconstruction error too large at [" + r + "," + c + "]", data[r][c], val, 1e-6); + } + } + } + + /// Wrapper-Classes: Test setup for DP and successive compression + + private void testRoundtripDP(double[][] data, int nrows, int ncols, double targetLoss, double tolerance, + int maxFailures) { + testRoundtrip(data, nrows, ncols, targetLoss, tolerance, maxFailures, false); + } + + private void testRoundtripSuccessive(double[][] data, int nrows, int ncols, double targetLoss, double tolerance, + int maxFailures) { + testRoundtrip(data, nrows, ncols, 
targetLoss, tolerance, maxFailures, true); + } + + /** + * Set test setup: converting data in matrix block, set compression setting does compression, decompression, + * validation + */ + private void testRoundtrip(double[][] data, int nrows, int ncols, double targetLoss, double tolerance, + int maxFailures, boolean successive) { + + ///create a matrix + MatrixBlock orig = DataConverter.convertToMatrixBlock(data); + orig.allocateDenseBlock(); + + IColIndex colIndexes = ColIndexFactory.create(buildColArray(ncols)); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(targetLoss); + + /// choose compression + AColGroup result = successive ? ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, orig, + cs) : ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, orig, cs); + + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + /// structure checks + checkStructure(plGroup, nrows, ncols); + + /// decompression check + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + DenseBlock db = recon.getDenseBlock(); + + int failures = 0; + for(int r = 0; r < nrows; r++) { + for(int c = 0; c < ncols; c++) { + double val = db.get(r, c); + assertFalse("NaN at [" + r + "," + c + "]", Double.isNaN(val)); + assertFalse("Infinite at [" + r + "," + c + "]", Double.isInfinite(val)); + if(Math.abs(data[r][c] - val) > tolerance) + failures++; + } + } + assertTrue("too many reconstruction failures: " + failures, failures <= maxFailures); + } + + private void checkStructure(ColGroupPiecewiseLinearCompressed plGroup, int nrows, int ncols) { + int[][] breakpoints = plGroup.getBreakpointsPerCol(); + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + + 
assertEquals("wrong number of columns in breakpoints", ncols, breakpoints.length); + assertEquals("wrong number of col indices", ncols, plGroup.getColIndices().size()); + + for(int c = 0; c < ncols; c++) { + assertTrue("breakpoints[" + c + "] needs at least 2 entries", breakpoints[c].length >= 2); + assertEquals("breakpoints[" + c + "] must start at 0", 0, breakpoints[c][0]); + assertEquals("breakpoints[" + c + "] must end at nrows", nrows, breakpoints[c][breakpoints[c].length - 1]); + int numSegs = breakpoints[c].length - 1; + assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length); + assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length); + } + } + + private double[][] buildMultiSegmentData(int nrows, int ncols) { + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + int[] segStarts = {0, 15, 30, 45, 60}; + double[] slopes = {0.5, -1.2, 2.0, -0.8}; + + for(int c = 0; c < ncols; c++) { + double offset = c; + for(int r = 0; r < nrows; r++) { + int seg = 0; + while(seg < segStarts.length - 1 && r >= segStarts[seg + 1]) + seg++; + data[r][c] = slopes[seg] * (r - segStarts[seg]) + offset + rng.nextGaussian() * 0.8; + offset += 0.01; + } + } + return data; + } + + private int[] buildColArray(int ncols) { + int[] cols = new int[ncols]; + for(int i = 0; i < ncols; i++) + cols[i] = i; + return cols; + } + + @Test + public void testTrendWithNoise() { + final int nrows = 100, ncols = 2; + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + for(int r = 0; r < nrows; r++) { + double trend = 0.05 * r; + for(int c = 0; c < ncols; c++) + data[r][c] = trend + rng.nextGaussian() * 1.5 + c * 2.0; + } + testRoundtripDP(data, nrows, ncols, 1.0, 4.0, 45); + testRoundtripSuccessive(data, nrows, ncols, 1.0, 4.0, 45); + } + + @Test + public void testAbruptJumps() { + final int nrows = 80, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -2, 2, 1.0, SEED); + for(int c = 0; 
c < ncols; c++) { + for(int r = 25; r < 55; r++) + data[r][c] += 8.0; + for(int r = 55; r < nrows; r++) + data[r][c] += 15.0; + } + // successive needs looser tolerance on jumps + testRoundtripDP(data, nrows, ncols, 5.0, 10.0, 50); + testRoundtripSuccessive(data, nrows, ncols, 25.0, 18.0, 55); + } + + @Test + public void testHighFrequency() { + final int nrows = 100, ncols = 50; + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + for(int r = 0; r < nrows; r++) { + double sine = Math.sin(r * 0.4) * 4.0; + for(int c = 0; c < ncols; c++) + data[r][c] = sine + rng.nextGaussian() * 0.8 + Math.sin(r * 0.2 + c) * 2.0; + } + // both struggle with high frequency; successive slightly worse + testRoundtripDP(data, nrows, ncols, 2.0, 2.0, 3500); + testRoundtripSuccessive(data, nrows, ncols, 2.0, 2.5, 2500); + } + + @Test + public void testLowVarianceSingleColumn() { + double[][] data = getRandomMatrix(50, 1, -1, 1, 0.3, SEED); + testRoundtripDP(data, 50, 1, 0.1, 0.5, 5); + testRoundtripSuccessive(data, 50, 1, 0.05, 0.4, 3); + } + + @Test + public void testSingleColumn() { + double[][] data = getRandomMatrix(50, 1, -1, 1, 1.0, SEED); + testRoundtripDP(data, 50, 1, 0.5, 1.0, 8); + testRoundtripSuccessive(data, 50, 1, 0.5, 1.0, 8); + } + + @Test + public void testKnownSegmentBoundaries() { + final int nrows = 60, ncols = 2; + double[][] data = buildMultiSegmentData(nrows, ncols); + // successive needs slightly higher targetLoss for same data + testRoundtripDP(data, nrows, ncols, 0.8, 5.0, 35); + testRoundtripSuccessive(data, nrows, ncols, 1.0, 5.0, 35); + } + + @Test + public void testMultipleColumns() { + double[][] data = getRandomMatrix(80, 5, -5, 5, 1.5, SEED); + testRoundtripDP(data, 80, 5, 3.0, 4.0, 120); + testRoundtripSuccessive(data, 80, 5, 3.0, 4.0, 120); + } + + private CompressedSizeInfo createTestCompressedSizeInfo() { + IColIndex cols = ColIndexFactory.create(new int[] {0}); + EstimationFactors facts = new EstimationFactors(2, 10); + + 
CompressedSizeInfoColGroup info = new CompressedSizeInfoColGroup(cols, facts, + AColGroup.CompressionType.PiecewiseLinear); + + List infos = Arrays.asList(info); + CompressedSizeInfo csi = new CompressedSizeInfo(infos); + + return csi; + } + + @Test + public void testCompressPiecewiseLinearViaRealAPI() { + + MatrixBlock in = new MatrixBlock(10, 1, false); + in.allocateDenseBlock(); + for(int r = 0; r < 10; r++) { + in.set(r, 0, r * 0.5); + } + + CompressionSettings cs = new CompressionSettingsBuilder().addValidCompression( + AColGroup.CompressionType.PiecewiseLinear).create(); + + CompressedSizeInfo csi = createTestCompressedSizeInfo(); + + List colGroups = ColGroupFactory.compressColGroups(in, csi, cs); + + boolean hasPiecewise = colGroups.stream().anyMatch(cg -> cg instanceof ColGroupPiecewiseLinearCompressed); + assertTrue(hasPiecewise); + } + + @Test + public void testSuccessiveLinearColumnSingleSegment() { + double[] col = {1.0, 2.0, 3.0, 4.0, 5.0}; + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-6); + + List breaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs); + assertEquals("[0, 5]", breaks.toString()); + } + + @Test + public void testSuccessiveNoisyColumnMultipleSegments() { + double[] col = {1.1, 1.9, 2.2, 10.1, 10.8, 11.3}; + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.0); + + List breaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs); + assertTrue("expected at least 3 breakpoints", breaks.size() >= 3); + } + + @Test + public void testSuccessiveStrictLossProducesMoreSegments() { + double[] col = {1, 2, 3, 10, 11, 12, 20, 21, 22}; + + CompressionSettings strict = new CompressionSettingsBuilder().create(); + strict.setPiecewiseTargetLoss(0.01); + + CompressionSettings loose = new CompressionSettingsBuilder().create(); + loose.setPiecewiseTargetLoss(10.0); + + List strictBreaks = 
PiecewiseLinearUtils.computeBreakpointSuccessive(col, strict); + List looseBreaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, loose); + + assertTrue("strict loss should produce more segments", strictBreaks.size() > looseBreaks.size()); + } + + @Test + public void testSuccessiveBreakpointDetectedAtJump() { + double[] col = getRandomColumn(30, SEED); + for(int r = 10; r < 20; r++) + col[r] += 8.0; + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(2.0); + + int[] bps = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs).stream().mapToInt(Integer::intValue) + .toArray(); + + assertTrue("expected at least 3 segments", bps.length >= 3); + assertTrue("expected breakpoint near jump [10,20]", hasBreakInRange(bps, 8, 22)); + } + + @Test + public void testSuccessiveGlobalMSEWithinTarget() { + double[] col = getRandomColumn(40, SEED + 1); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.5); + + List bps = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs); + double sse = 0.0; + for(int i = 0; i < bps.size() - 1; i++) + sse += PiecewiseLinearUtils.computeSegmentCost(col, bps.get(i), bps.get(i + 1)); + + double mse = sse / col.length; + assertTrue("global MSE=" + mse + " exceeds target=" + cs.getPiecewiseTargetLoss(), + mse <= cs.getPiecewiseTargetLoss() + 1e-10); + } + + private boolean hasBreakInRange(int[] bps, int min, int max) { + for(int i = 1; i < bps.length - 1; i++) + if(bps[i] >= min && bps[i] <= max) + return true; + return false; + } + + private double[] getRandomColumn(int len, long seed) { + Random rng = new Random(seed); + double[] col = new double[len]; + for(int i = 0; i < len; i++) + col[i] = rng.nextGaussian() * 2 + i * 0.01; + return col; + } + +} + +