From f569dc0ee2df4ed12553458a8a6f1995841f2dd0 Mon Sep 17 00:00:00 2001 From: e-strauss Date: Mon, 8 Dec 2025 11:42:48 +0100 Subject: [PATCH] Added new mode of data transfer replaced py4j with unix pipes --- .gitignore | 2 +- .../org/apache/sysds/api/PythonDMLScript.java | 99 ++- .../frame/data/columns/FloatArray.java | 2 +- ...turnParameterizedBuiltinCPInstruction.java | 4 +- .../sysds/runtime/util/UnixPipeUtils.java | 717 ++++++++++++--- .../systemds/context/systemds_context.py | 1 + src/main/python/systemds/utils/converters.py | 840 ++++++++++++++---- .../matrix/test_block_converter_unix_pipe.py | 104 --- .../python_java_data_transfer/__init__.py | 20 + .../test_dense_numpy_matrix.py | 246 +++++ .../test_pandas_frame.py | 265 ++++++ src/main/python/tests/test_utils.py | 58 ++ .../component/utils/UnixPipeUtilsTest.java | 313 ++++++- .../test/usertest/pythonapi/StartupTest.java | 98 +- 14 files changed, 2318 insertions(+), 451 deletions(-) delete mode 100644 src/main/python/tests/matrix/test_block_converter_unix_pipe.py create mode 100644 src/main/python/tests/python_java_data_transfer/__init__.py create mode 100644 src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py create mode 100644 src/main/python/tests/python_java_data_transfer/test_pandas_frame.py create mode 100644 src/main/python/tests/test_utils.py diff --git a/.gitignore b/.gitignore index d2fcdb9a4de..5de697a37e3 100644 --- a/.gitignore +++ b/.gitignore @@ -146,7 +146,7 @@ src/test/scripts/functions/pipelines/intermediates/classification/* venv venv/* - +.venv # resource optimization scripts/resource/output *.pem diff --git a/src/main/java/org/apache/sysds/api/PythonDMLScript.java b/src/main/java/org/apache/sysds/api/PythonDMLScript.java index 3b1864d71dd..1a74ba0ea49 100644 --- a/src/main/java/org/apache/sysds/api/PythonDMLScript.java +++ b/src/main/java/org/apache/sysds/api/PythonDMLScript.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -24,9 +24,11 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.sysds.api.jmlc.Connection; +import org.apache.sysds.common.Types.ValueType; -import org.apache.sysds.common.Types; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; import org.apache.sysds.runtime.util.UnixPipeUtils; @@ -79,7 +81,7 @@ public static void main(String[] args) throws Exception { * therefore use logging framework. 
and terminate program. */ LOG.info("failed startup", p4e); - System.exit(-1); + exitHandler.exit(-1); } catch(Exception e) { throw new DMLException("Failed startup and maintaining Python gateway", e); @@ -116,59 +118,59 @@ public void openPipes(String path, int num) throws IOException { } } - public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, Types.ValueType type) throws IOException { + public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, ValueType type) throws IOException { long limit = (long) rlen * clen; LOG.debug("trying to read matrix from "+id+" with "+rlen+" rows and "+clen+" columns. Total size: "+limit); if(limit > Integer.MAX_VALUE) throw new DMLRuntimeException("Dense NumPy array of size " + limit + " cannot be converted to MatrixBlock"); - MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1); + MatrixBlock mb; if(fromPython != null){ BufferedInputStream pipe = fromPython.get(id); double[] denseBlock = new double[(int) limit]; - UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0); - mb.init(denseBlock, rlen, clen); + long nnz = UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0); + mb = new MatrixBlock(rlen, clen, denseBlock); + mb.setNonZeros(nnz); } else { throw new DMLRuntimeException("FIFO Pipes are not initialized."); } - mb.recomputeNonZeros(); - mb.examSparsity(); LOG.debug("Reading from Python finished"); + mb.examSparsity(); return mb; } - public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, Types.ValueType type) throws ExecutionException, InterruptedException { + public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, ValueType type) throws ExecutionException, InterruptedException { long limit = (long) rlen * clen; if(limit > Integer.MAX_VALUE) throw new DMLRuntimeException("Dense NumPy array of size " + limit + " cannot be converted to MatrixBlock"); - MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1); + MatrixBlock mb = new MatrixBlock(rlen, clen, false, rlen*clen); if(fromPython != null){ ExecutorService pool = CommonThreadPool.get(); double[] denseBlock = new double[(int) limit]; int offsetOut = 0; - List> futures = new ArrayList<>(); + List> futures = new ArrayList<>(); for (int i = 0; i < blockSizes.length; i++) { BufferedInputStream pipe = fromPython.get(i); int id = i, blockSize = blockSizes[i], _offsetOut = offsetOut; - Callable task = () -> { - UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut); - return null; + Callable task = () -> { + return UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut); }; futures.add(pool.submit(task)); offsetOut += blockSize; } - // Wait for all tasks and propagate exceptions - for (Future f : futures) { - f.get(); + // Wait for all tasks and propagate exceptions, sum up nonzeros + long nnz = 0; + for (Future f : futures) { + nnz += f.get(); } - mb.init(denseBlock, rlen, clen); + mb = new MatrixBlock(rlen, clen, denseBlock); + mb.setNonZeros(nnz); } else { throw new DMLRuntimeException("FIFO Pipes are not initialized."); } - mb.recomputeNonZeros(); mb.examSparsity(); return mb; } @@ -181,7 +183,7 @@ public void startWritingMbToPipe(int id, MatrixBlock mb) throws IOException { LOG.debug("Trying to write matrix ["+baseDir + "-"+ id+"] with "+rlen+" rows and "+clen+" columns. 
Total size: "+numElem*8); BufferedOutputStream out = toPython.get(id); - long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, Types.ValueType.FP64, mb); + long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, ValueType.FP64, mb); LOG.debug("Writing of " + bytes +" Bytes to Python ["+baseDir + "-"+ id+"] finished"); } else { @@ -189,6 +191,43 @@ public void startWritingMbToPipe(int id, MatrixBlock mb) throws IOException { } } + public void startReadingColFromPipe(int id, FrameBlock fb, int rows, int totalBytes, int col, ValueType type, boolean any) throws IOException { + if (fromPython == null) { + throw new DMLRuntimeException("FIFO Pipes are not initialized."); + } + + BufferedInputStream pipe = fromPython.get(id); + LOG.debug("Start reading FrameBlock column from pipe #" + id + " with type " + type); + + // Delegate to UnixPipeUtils + Array arr = UnixPipeUtils.readFrameColumnFromPipe(pipe, id, rows, totalBytes, BATCH_SIZE, type); + // Set column into FrameBlock + fb.setColumn(col, arr); + ValueType[] schema = fb.getSchema(); + // inplace update the schema for cases: int8 -> int32 + schema[col] = arr.getValueType(); + + LOG.debug("Finished reading FrameBlock column from pipe #" + id); + } + + public void startWritingColToPipe(int id, FrameBlock fb, int col) throws IOException { + if (toPython == null) { + throw new DMLRuntimeException("FIFO Pipes are not initialized."); + } + + BufferedOutputStream pipe = toPython.get(id); + ValueType type = fb.getSchema()[col]; + int rows = fb.getNumRows(); + Array array = fb.getColumn(col); + + LOG.debug("Start writing FrameBlock column #" + col + " to pipe #" + id + " with type " + type + " and " + rows + " rows"); + + // Delegate to UnixPipeUtils + long bytes = UnixPipeUtils.writeFrameColumnToPipe(pipe, id, BATCH_SIZE, array, type); + + LOG.debug("Finished writing FrameBlock column #" + col + " to pipe #" + id + ". Total bytes: " + bytes); + } + public void closePipes() throws IOException { LOG.debug("Closing all pipes in Java"); for (BufferedInputStream pipe : fromPython.values()) @@ -198,6 +237,20 @@ public void closePipes() throws IOException { LOG.debug("Closed all pipes in Java"); } + @FunctionalInterface + public interface ExitHandler { + void exit(int status); + } + + private static volatile ExitHandler exitHandler = System::exit; + + public static void setExitHandler(ExitHandler handler) { + exitHandler = handler == null ? 
System::exit : handler; + } + + public static void resetExitHandler() { + exitHandler = System::exit; + } protected static class DMLGateWayListener extends DefaultGatewayServerListener { private static final Log LOG = LogFactory.getLog(DMLGateWayListener.class.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java index d0ab7a56305..f838eadc1d2 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java @@ -174,7 +174,7 @@ public void reset(int size) { @Override public byte[] getAsByteArray() { - ByteBuffer floatBuffer = ByteBuffer.allocate(8 * _size); + ByteBuffer floatBuffer = ByteBuffer.allocate(4 * _size); floatBuffer.order(ByteOrder.nativeOrder()); for(int i = 0; i < _size; i++) floatBuffer.putFloat(_data[i]); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java index 28bd01f08d2..98101348da0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java @@ -8,7 +8,7 @@ * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -91,7 +91,7 @@ public void processInstruction(ExecutionContext ec) { FrameBlock fin = ec.getFrameInput(input1.getName()); String spec = ec.getScalarInput(input2).getStringValue(); String[] colnames = fin.getColumnNames(); - + // execute block transform encode MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null); // TODO: Assign #threads in compiler and pass via the instruction string diff --git a/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java b/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java index 69014acc0f0..9ed1f73b7f4 100644 --- a/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java +++ b/src/main/java/org/apache/sysds/runtime/util/UnixPipeUtils.java @@ -1,29 +1,24 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
*/ package org.apache.sysds.runtime.util; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.sysds.common.Types; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.EOFException; @@ -37,10 +32,33 @@ import java.nio.DoubleBuffer; import java.nio.FloatBuffer; import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + public class UnixPipeUtils { private static final Log LOG = LogFactory.getLog(UnixPipeUtils.class.getName()); + public static int getElementSize(Types.ValueType type) { + return switch (type) { + case UINT8, BOOLEAN -> 1; + case INT32, FP32 -> 4; + case INT64, FP64 -> 8; + default -> throw new UnsupportedOperationException("Unsupported type: " + type); + }; + } + + private static ByteBuffer newLittleEndianBuffer(byte[] buffer, int length) { + return ByteBuffer.wrap(buffer, 0, length).order(ByteOrder.LITTLE_ENDIAN); + } + /** * Opens a named pipe for input, reads 4 bytes as an int, compares it to the expected ID. * If matched, returns the InputStream for further use. @@ -74,7 +92,10 @@ public static void readHandshake(int expectedId, BufferedInputStream bis) throws bis.close(); throw new IOException("Failed to read handshake integer from pipe"); } + compareHandshakeIds(expectedId, bis, buffer); + } + private static void compareHandshakeIds(int expectedId, BufferedInputStream bis, byte[] buffer) throws IOException { // Convert bytes to int (assuming little-endian to match typical Python struct.pack) int receivedId = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getInt(); expectedId += 1000; @@ -106,15 +127,65 @@ public static void writeHandshake(int expectedId, BufferedOutputStream bos) thro bos.flush(); } - public static void readNumpyArrayInBatches(BufferedInputStream in, int id, int batchSize, int numElem, - Types.ValueType type, double[] out, int offsetOut) - throws IOException { - int elemSize; - switch (type){ - case UINT8 -> elemSize = 1; - case INT32, FP32 -> elemSize = 4; - default -> elemSize = 8; + @FunctionalInterface + private interface BufferReader { + int readTo(Object dest, int offset, ByteBuffer bb); + } + + private static BufferReader getBufferReader(Types.ValueType type) { + return switch (type) { + case FP64 -> (dest, offset, bb) -> { + DoubleBuffer db = bb.asDoubleBuffer(); + double[] out = (double[]) dest; + int remaining = db.remaining(); + db.get(out, offset, remaining); + return offset + remaining; + }; + case FP32 -> (dest, offset, bb) -> { + FloatBuffer fb = bb.asFloatBuffer(); + double[] out = (double[]) dest; + int n = fb.remaining(); + for (int i = 0; i < n; i++) out[offset++] = fb.get(); + return offset; + }; + case INT64 -> (dest, offset, bb) -> { + LongBuffer lb = bb.asLongBuffer(); + double[] out = (double[]) dest; + int n = lb.remaining(); + for (int i = 0; i < n; i++) out[offset++] = lb.get(); + return offset; + }; + case INT32 -> (dest, offset, bb) -> { + IntBuffer ib = bb.asIntBuffer(); + double[] out = (double[]) dest; + int n = ib.remaining(); + for (int i = 0; i < n; i++) out[offset++] = ib.get(); + return offset; + }; + case UINT8 -> 
(dest, offset, bb) -> { + double[] out = (double[]) dest; + for (int i = 0; i < bb.limit(); i++) out[offset++] = bb.get(i) & 0xFF; + return offset; + }; + default -> throw new UnsupportedOperationException("Unsupported type: " + type); + }; + } + + private static void readFully(BufferedInputStream in, byte[] buffer, int len) throws IOException { + int total = 0; + while (total < len) { + int read = in.read(buffer, total, len - total); + if (read == -1) + throw new EOFException("Unexpected end of stream"); + total += read; } + } + + public static long readNumpyArrayInBatches(BufferedInputStream in, int id, int batchSize, int numElem, + Types.ValueType type, double[] out, int offsetOut) + throws IOException { + int elemSize = getElementSize(type); + long nonZeros = 0; try { // Read start header @@ -122,26 +193,26 @@ public static void readNumpyArrayInBatches(BufferedInputStream in, int id, int b long bytesRemaining = ((long) numElem) * elemSize; byte[] buffer = new byte[batchSize]; + BufferReader reader = getBufferReader(type); + int prevOffset = offsetOut; while (bytesRemaining > 0) { - int currentBatchSize = (int) Math.min(batchSize, bytesRemaining); - int totalRead = 0; - - while (totalRead < currentBatchSize) { - int bytesRead = in.read(buffer, totalRead, currentBatchSize - totalRead); - if (bytesRead == -1) { - throw new EOFException("Unexpected end of stream in pipe #" + id + - ": expected " + currentBatchSize + " bytes, got " + totalRead); + int chunk = (int) Math.min(batchSize, bytesRemaining); + readFully(in, buffer, chunk); + offsetOut = reader.readTo(out, offsetOut, newLittleEndianBuffer(buffer, chunk)); + + // Count nonzeros in the batch we just read (performant: single pass) + for (int i = prevOffset; i < offsetOut; i++) { + if (out[i] != 0.0) { + nonZeros++; } - totalRead += bytesRead; } - - // Interpret bytes with value type and fill the dense MB - offsetOut = fillDoubleArrayFromByteArray(type, out, offsetOut, buffer, currentBatchSize); - bytesRemaining -= currentBatchSize; + prevOffset = offsetOut; + bytesRemaining -= chunk; } // Read end header readHandshake(id, in); + return nonZeros; } catch (Exception e) { LOG.error("Error occurred while reading data from pipe #" + id, e); @@ -149,120 +220,540 @@ public static void readNumpyArrayInBatches(BufferedInputStream in, int id, int b } } - private static int fillDoubleArrayFromByteArray(Types.ValueType type, double[] out, int offsetOut, byte[] buffer, - int currentBatchSize) { - ByteBuffer bb = ByteBuffer.wrap(buffer, 0, currentBatchSize).order(ByteOrder.LITTLE_ENDIAN); - switch (type){ - default -> { - DoubleBuffer doubleBuffer = bb.asDoubleBuffer(); - int numDoubles = doubleBuffer.remaining(); - doubleBuffer.get(out, offsetOut, numDoubles); - offsetOut += numDoubles; + + @FunctionalInterface + private interface BufferWriter { + int writeFrom(Object src, int offset, ByteBuffer bb); + } + + private static BufferWriter getBufferWriter(Types.ValueType type) { + return switch (type) { + case FP64 -> (src, offset, bb) -> { + MatrixBlock mb = (MatrixBlock) src; + DoubleBuffer db = bb.asDoubleBuffer(); + int n = Math.min(db.remaining(), mb.getNumRows() * mb.getNumColumns() - offset); + for (int i = 0; i < n; i++) { + int r = (offset + i) / mb.getNumColumns(); + int c = (offset + i) % mb.getNumColumns(); + db.put(mb.getDouble(r, c)); + } + return n * 8; + }; + case FP32 -> (src, offset, bb) -> { + MatrixBlock mb = (MatrixBlock) src; + FloatBuffer fb = bb.asFloatBuffer(); + int n = Math.min(fb.remaining(), mb.getNumRows() * 
mb.getNumColumns() - offset); + for (int i = 0; i < n; i++) { + int r = (offset + i) / mb.getNumColumns(); + int c = (offset + i) % mb.getNumColumns(); + fb.put((float) mb.getDouble(r, c)); + } + return n * 4; + }; + case INT64 -> (src, offset, bb) -> { + MatrixBlock mb = (MatrixBlock) src; + LongBuffer lb = bb.asLongBuffer(); + int n = Math.min(lb.remaining(), mb.getNumRows() * mb.getNumColumns() - offset); + for (int i = 0; i < n; i++) { + int r = (offset + i) / mb.getNumColumns(); + int c = (offset + i) % mb.getNumColumns(); + lb.put((long) mb.getDouble(r, c)); + } + return n * 8; + }; + case INT32 -> (src, offset, bb) -> { + MatrixBlock mb = (MatrixBlock) src; + IntBuffer ib = bb.asIntBuffer(); + int n = Math.min(ib.remaining(), mb.getNumRows() * mb.getNumColumns() - offset); + for (int i = 0; i < n; i++) { + int r = (offset + i) / mb.getNumColumns(); + int c = (offset + i) % mb.getNumColumns(); + ib.put((int) mb.getDouble(r, c)); + } + return n * 4; + }; + case UINT8 -> (src, offset, bb) -> { + MatrixBlock mb = (MatrixBlock) src; + int n = Math.min(bb.limit(), mb.getNumRows() * mb.getNumColumns() - offset); + for (int i = 0; i < n; i++) { + int r = (offset + i) / mb.getNumColumns(); + int c = (offset + i) % mb.getNumColumns(); + bb.put(i, (byte) ((int) mb.getDouble(r, c) & 0xFF)); + } + return n; + }; + default -> throw new UnsupportedOperationException("Unsupported type: " + type); + }; + } + + /** + * Symmetric with readNumpyArrayInBatches — writes data in batches with handshake. + */ + public static long writeNumpyArrayInBatches(BufferedOutputStream out, int id, int batchSize, + int numElem, Types.ValueType type, MatrixBlock mb) + throws IOException { + int elemSize = getElementSize(type); + long totalBytesWritten = 0; + + try { + writeHandshake(id, out); + long bytesRemaining = ((long) numElem) * elemSize; + byte[] buffer = new byte[batchSize]; + BufferWriter writer = getBufferWriter(type); + + int offset = 0; + while (bytesRemaining > 0) { + int chunk = (int) Math.min(batchSize, bytesRemaining); + ByteBuffer bb = newLittleEndianBuffer(buffer, chunk); + int bytesFilled = writer.writeFrom(mb, offset, bb); + out.write(buffer, 0, bytesFilled); + totalBytesWritten += bytesFilled; + bytesRemaining -= bytesFilled; + offset += bytesFilled / elemSize; + } + + out.flush(); + writeHandshake(id, out); + return totalBytesWritten; + } catch (Exception e) { + LOG.error("Error occurred while writing data to pipe #" + id, e); + throw e; + } + } + + public static Array readFrameColumnFromPipe( + BufferedInputStream in, int id, int rows, int totalBytes, int batchSize, + Types.ValueType type) throws IOException { + + long tStart = System.nanoTime(); + long tIoStart, tIoTotal = 0; + long tDecodeTotal = 0; + int numStrings = 0; + + readHandshake(id, in); + Array array = ArrayFactory.allocate(type, rows); + byte[] buffer = new byte[batchSize]; + try { + if (type != Types.ValueType.STRING) { + tIoStart = System.nanoTime(); + readFixedTypeColumn(in, array, type, rows, totalBytes, buffer); + tIoTotal = System.nanoTime() - tIoStart; + readHandshake(id, in); + } else { + tIoStart = System.nanoTime(); + VarFillTiming timing = readVariableTypeColumn(in, id, array, type, rows, buffer); + tIoTotal = System.nanoTime() - tIoStart; + tDecodeTotal = timing.decodeTime; + numStrings = timing.numStrings; + } + } catch (Exception e) { + LOG.error("Error occurred while reading FrameBlock column from pipe #" + id, e); + throw e; + } + + long tTotal = System.nanoTime() - tStart; + if (type == Types.ValueType.STRING) { + 
LOG.debug(String.format( + "Java readFrameColumnFromPipe timing: total=%.3fs, I/O=%.3fs (%.1f%%), decode=%.3fs (%.1f%%), strings=%d", + tTotal / 1e9, tIoTotal / 1e9, 100.0 * tIoTotal / tTotal, + tDecodeTotal / 1e9, 100.0 * tDecodeTotal / tTotal, numStrings)); + } + return array; + } + + private static class VarFillTiming { + long decodeTime; + int numStrings; + VarFillTiming(long decodeTime, int numStrings) { + this.decodeTime = decodeTime; + this.numStrings = numStrings; + } + } + + private static void readFixedTypeColumn( + BufferedInputStream in, Array array, + Types.ValueType type, int rows, int totalBytes, byte[] buffer) throws IOException { + + int elemSize = getElementSize(type); + int expected = rows * elemSize; + if (totalBytes != expected) + throw new IOException("Expected " + expected + " bytes but got " + totalBytes); + + int offset = 0; + long bytesRemaining = totalBytes; + + while (bytesRemaining > 0) { + int chunk = (int) Math.min(buffer.length, bytesRemaining); + readFully(in, buffer, chunk); + offset = fillFixedArrayFromBytes(array, type, offset, buffer, chunk); + bytesRemaining -= chunk; + } + } + + private static int fillFixedArrayFromBytes( + Array array, Types.ValueType type, int offsetOut, + byte[] buffer, int currentBatchSize) { + + ByteBuffer bb = newLittleEndianBuffer(buffer, currentBatchSize); + + switch (type) { + case FP64 -> { + DoubleBuffer db = bb.asDoubleBuffer(); + while (db.hasRemaining()) + array.set(offsetOut++, db.get()); } case FP32 -> { - FloatBuffer floatBuffer = bb.asFloatBuffer(); - int numFloats = floatBuffer.remaining(); - for (int i = 0; i < numFloats; i++) { - out[offsetOut++] = floatBuffer.get(); - } + FloatBuffer fb = bb.asFloatBuffer(); + while (fb.hasRemaining()) + array.set(offsetOut++, fb.get()); + } + case INT64 -> { + LongBuffer lb = bb.asLongBuffer(); + while (lb.hasRemaining()) + array.set(offsetOut++, lb.get()); } case INT32 -> { - IntBuffer intBuffer = bb.asIntBuffer(); - int numInts = intBuffer.remaining(); - for (int i = 0; i < numInts; i++) { - out[offsetOut++] = intBuffer.get(); - } + IntBuffer ib = bb.asIntBuffer(); + while (ib.hasRemaining()) + array.set(offsetOut++, ib.get()); } case UINT8 -> { - for (int i = 0; i < currentBatchSize; i++) { - out[offsetOut++] = bb.get(i) & 0xFF; - } + for (int i = 0; i < currentBatchSize; i++) + array.set(offsetOut++, (int) (bb.get(i) & 0xFF)); } + case BOOLEAN -> { + for (int i = 0; i < currentBatchSize; i++) + array.set(offsetOut++, bb.get(i) != 0 ? 
1.0 : 0.0); + } + default -> throw new UnsupportedOperationException("Unsupported fixed type: " + type); } return offsetOut; } - public static long writeNumpyArrayInBatches(BufferedOutputStream out, int id, int batchSize, int numElem, - Types.ValueType type, MatrixBlock mb) throws IOException { - int elemSize; - switch (type) { - case UINT8 -> elemSize = 1; - case INT32, FP32 -> elemSize = 4; - default -> elemSize = 8; + private static VarFillTiming readVariableTypeColumn( + BufferedInputStream in, int id, Array array, + Types.ValueType type, int elems, byte[] buffer) throws IOException { + + long tDecodeTotal = 0; + int numStrings = 0; + + int offset = 0; + // Use a reusable growable byte array to avoid repeated toByteArray() allocations + byte[] combined = new byte[32 * 1024]; // Start with 32KB + int combinedLen = 0; + + // Keep reading until all expected elements are filled + while (offset < elems) { + int chunk = in.read(buffer); + + // Ensure combined array is large enough + if (combinedLen + chunk > combined.length) { + // Grow array (double size, but at least accommodate new data) + int newSize = Math.max(combined.length * 2, combinedLen + chunk); + byte[] newCombined = new byte[newSize]; + System.arraycopy(combined, 0, newCombined, 0, combinedLen); + combined = newCombined; + } + + // Append newly read bytes + System.arraycopy(buffer, 0, combined, combinedLen, chunk); + combinedLen += chunk; + + // Try decoding as many complete elements as possible + long tDecodeStart = System.nanoTime(); + VarFillResult res = fillVariableArrayFromBytes(array, offset, elems, combined, combinedLen, type); + tDecodeTotal += System.nanoTime() - tDecodeStart; + int stringsDecoded = res.offsetOut - offset; + numStrings += stringsDecoded; + offset = res.offsetOut; + + // Retain any incomplete trailing bytes by shifting them to the start + int remainingBytes = res.remainingBytes; + if (remainingBytes > 0) { + // Move remaining bytes to the start of the buffer + System.arraycopy(combined, combinedLen - remainingBytes, combined, 0, remainingBytes); + combinedLen = remainingBytes; + } else { + combinedLen = 0; + } } - long totalBytesWritten = 0; - // Write start header - writeHandshake(id, out); + // ---- handshake check ---- + if(combinedLen == 0) + readHandshake(id, in); + else if (combinedLen == 4) { + byte[] tail = new byte[4]; + System.arraycopy(combined, 0, tail, 0, 4); + compareHandshakeIds(id, in, tail); + } + else + throw new IOException("Expected 4-byte handshake after last element, found " + combinedLen + " bytes"); - int bytesRemaining = numElem * elemSize; - int offset = 0; + return new VarFillTiming(tDecodeTotal, numStrings); + } - byte[] buffer = new byte[batchSize]; + /** + * Result container for variable-length decoding. + * + * @param offsetOut number of elements written to the output array + * @param remainingBytes number of unconsumed tail bytes (partial element) + */ + private record VarFillResult(int offsetOut, int remainingBytes) { + } - while (bytesRemaining > 0) { - int currentBatchSize = Math.min(batchSize, bytesRemaining); + private static VarFillResult fillVariableArrayFromBytes( + Array array, int offsetOut, int maxOffset, byte[] buffer, + int currentBatchSize, Types.ValueType type) { + + ByteBuffer bb = newLittleEndianBuffer(buffer, currentBatchSize); + int bytesConsumed = 0; + + // Each variable-length element = [int32 length][payload...] 
+ while (bb.remaining() >= 4 && offsetOut < maxOffset) { + bb.mark(); + int len = bb.getInt(); + + if (len < 0) { + // null string + array.set(offsetOut++, (String) null); + bytesConsumed = bb.position(); + continue; + } + if (bb.remaining() < len) { + // Not enough bytes for full payload → rollback and stop + bb.reset(); + break; + } + + + switch (type) { + case STRING -> { + int stringStart = bb.position(); + + byte[] backingArray = bb.array(); + int arrayOffset = bb.arrayOffset() + stringStart; + String s = new String(backingArray, arrayOffset, len, StandardCharsets.UTF_8); + array.set(offsetOut++, s); + + bb.position(stringStart + len); + } - // Fill buffer from MatrixBlock into byte[] (typed) - int bytesWritten = fillByteArrayFromDoubleArray(type, mb, offset, buffer, currentBatchSize); - totalBytesWritten += bytesWritten; + default -> throw new UnsupportedOperationException( + "Unsupported variable-length type: " + type); + } - out.write(buffer, 0, currentBatchSize); - offset += currentBatchSize / elemSize; - bytesRemaining -= currentBatchSize; + bytesConsumed = bb.position(); } - out.flush(); + int remainingBytes = currentBatchSize - bytesConsumed; + return new VarFillResult(offsetOut, remainingBytes); + } + + /** + * Symmetric with readFrameColumnFromPipe — writes FrameBlock column data to pipe. + * Supports both fixed-size types and variable-length types (strings). + */ + public static long writeFrameColumnToPipe( + BufferedOutputStream out, int id, int batchSize, + Array array, Types.ValueType type) throws IOException { + + long tStart = System.nanoTime(); + long tIoStart, tIoTotal = 0; + long tEncodeTotal = 0; + int numStrings = 0; + long totalBytesWritten = 0; - // Write end header - writeHandshake(id, out); - return totalBytesWritten; + try { + writeHandshake(id, out); + + if (type != Types.ValueType.STRING) { + tIoStart = System.nanoTime(); + totalBytesWritten = writeFixedTypeColumn(out, array, type, batchSize); + tIoTotal = System.nanoTime() - tIoStart; + } else { + tIoStart = System.nanoTime(); + VarWriteTiming timing = writeVariableTypeColumn(out, array, type, batchSize); + tIoTotal = System.nanoTime() - tIoStart; + tEncodeTotal = timing.encodeTime; + numStrings = timing.numStrings; + totalBytesWritten = timing.totalBytes; + } + + out.flush(); + writeHandshake(id, out); + + long tTotal = System.nanoTime() - tStart; + if (type == Types.ValueType.STRING) { + LOG.debug(String.format( + "Java writeFrameColumnToPipe timing: total=%.3fs, I/O=%.3fs (%.1f%%), encode=%.3fs (%.1f%%), strings=%d", + tTotal / 1e9, tIoTotal / 1e9, 100.0 * tIoTotal / tTotal, + tEncodeTotal / 1e9, 100.0 * tEncodeTotal / tTotal, numStrings)); + } + + return totalBytesWritten; + } catch (Exception e) { + LOG.error("Error occurred while writing FrameBlock column to pipe #" + id, e); + throw e; + } } - private static int fillByteArrayFromDoubleArray(Types.ValueType type, MatrixBlock mb, int offsetIn, - byte[] buffer, int maxBytes) { - ByteBuffer bb = ByteBuffer.wrap(buffer, 0, maxBytes).order(ByteOrder.LITTLE_ENDIAN); - int r,c; - switch (type) { - default -> { // FP64 - DoubleBuffer doubleBuffer = bb.asDoubleBuffer(); - int count = Math.min(doubleBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn); - for (int i = 0; i < count; i++) { - r = (offsetIn + i) / mb.getNumColumns(); - c = (offsetIn + i) % mb.getNumColumns(); - doubleBuffer.put(mb.getDouble(r,c)); - } - return count * 8; + private static class VarWriteTiming { + long encodeTime; + int numStrings; + long totalBytes; + 
VarWriteTiming(long encodeTime, int numStrings, long totalBytes) { + this.encodeTime = encodeTime; + this.numStrings = numStrings; + this.totalBytes = totalBytes; + } + } + + private static long writeFixedTypeColumn( + BufferedOutputStream out, Array array, + Types.ValueType type, int batchSize) throws IOException { + + int elemSize = getElementSize(type); + int rows = array.size(); + long totalBytes = (long) rows * elemSize; + + byte[] buffer = new byte[batchSize]; + int arrayIndex = 0; + int bufferPos = 0; + + while (arrayIndex < rows) { + // Calculate how many elements can fit in the remaining buffer space + int remainingBufferSpace = batchSize - bufferPos; + int elementsToWrite = Math.min((remainingBufferSpace / elemSize), rows - arrayIndex); + + if (elementsToWrite == 0) { + // Buffer is full, flush it + out.write(buffer, 0, bufferPos); + bufferPos = 0; + continue; } - case FP32 -> { - FloatBuffer floatBuffer = bb.asFloatBuffer(); - int count = Math.min(floatBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn); - for (int i = 0; i < count; i++) { - r = (offsetIn + i) / mb.getNumColumns(); - c = (offsetIn + i) % mb.getNumColumns(); - floatBuffer.put((float) mb.getDouble(r,c)); + + // Convert elements to bytes directly into the buffer + ByteBuffer bb = ByteBuffer.wrap(buffer, bufferPos, elementsToWrite * elemSize) + .order(ByteOrder.LITTLE_ENDIAN); + + switch (type) { + case FP64 -> { + DoubleBuffer db = bb.asDoubleBuffer(); + for (int i = 0; i < elementsToWrite; i++) { + db.put(array.getAsDouble(arrayIndex++)); + } + bufferPos += elementsToWrite * 8; + } + case FP32 -> { + FloatBuffer fb = bb.asFloatBuffer(); + for (int i = 0; i < elementsToWrite; i++) { + fb.put((float) array.getAsDouble(arrayIndex++)); + } + bufferPos += elementsToWrite * 4; + } + case INT64 -> { + LongBuffer lb = bb.asLongBuffer(); + for (int i = 0; i < elementsToWrite; i++) { + lb.put((long) array.getAsDouble(arrayIndex++)); + } + bufferPos += elementsToWrite * 8; } - return count * 4; - } case INT32 -> { - IntBuffer intBuffer = bb.asIntBuffer(); - int count = Math.min(intBuffer.remaining(), mb.getNumRows() * mb.getNumColumns() - offsetIn); - for (int i = 0; i < count; i++) { - r = (offsetIn + i) / mb.getNumColumns(); - c = (offsetIn + i) % mb.getNumColumns(); - intBuffer.put((int) mb.getDouble(r,c)); + IntBuffer ib = bb.asIntBuffer(); + for (int i = 0; i < elementsToWrite; i++) { + ib.put((int) array.getAsDouble(arrayIndex++)); } - return count * 4; + bufferPos += elementsToWrite * 4; } - case UINT8 -> { - int count = Math.min(maxBytes, mb.getNumRows() * mb.getNumColumns() - offsetIn); - for (int i = 0; i < count; i++) { - r = (offsetIn + i) / mb.getNumColumns(); - c = (offsetIn + i) % mb.getNumColumns(); - buffer[i] = (byte) ((int) mb.getDouble(r,c) & 0xFF); + case BOOLEAN -> { + for (int i = 0; i < elementsToWrite; i++) { + buffer[bufferPos++] = (byte) (array.getAsDouble(arrayIndex++) != 0.0 ? 
1 : 0); } - return count; + } + default -> throw new UnsupportedOperationException("Unsupported type: " + type); } } + + out.write(buffer, 0, bufferPos); + return totalBytes; + } + + private static VarWriteTiming writeVariableTypeColumn( + BufferedOutputStream out, Array array, + Types.ValueType type, int batchSize) throws IOException { + + long tEncodeTotal = 0; + int numStrings = 0; + long totalBytesWritten = 0; + + byte[] buffer = new byte[batchSize]; // Use 2x batch size like Python side + int pos = 0; + + int rows = array.size(); + + for (int i = 0; i < rows; i++) { + numStrings++; + + // Get string value + Object value = array.get(i); + boolean isNull = (value == null); + + int length; + byte[] encoded; + + if (isNull) { + // Use -1 as marker for null values + length = -1; + encoded = new byte[0]; + } else { + // Encode to UTF-8 + long tEncodeStart = System.nanoTime(); + String str = value.toString(); + encoded = str.getBytes(StandardCharsets.UTF_8); + tEncodeTotal += System.nanoTime() - tEncodeStart; + length = encoded.length; + } + + int entrySize = 4 + (length >= 0 ? length : 0); // length prefix + data (or just prefix for null) + + // If next string doesn't fit comfortably, flush first half + if (pos + entrySize > batchSize) { + out.write(buffer, 0, pos); + totalBytesWritten += pos; + pos = 0; + } + + // Write length prefix (little-endian) - use -1 for null + ByteBuffer bb = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); + bb.putInt(length); + System.arraycopy(bb.array(), 0, buffer, pos, 4); + pos += 4; + + // Write the encoded bytes (skip for null) + if (length > 0) { + int remainingBytes = length; + int encodedOffset = 0; + while (remainingBytes > 0) { + int chunk = Math.min(remainingBytes, batchSize - pos); + System.arraycopy(encoded, encodedOffset, buffer, pos, chunk); + pos += chunk; + if (pos == batchSize) { + out.write(buffer, 0, pos); + totalBytesWritten += pos; + pos = 0; + } + encodedOffset += chunk; + remainingBytes -= chunk; + } + } + } + + // Flush the tail + if (pos > 0) { + out.write(buffer, 0, pos); + totalBytesWritten += pos; + } + + return new VarWriteTiming(tEncodeTotal, numStrings, totalBytesWritten); } } \ No newline at end of file diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index 41cfdfc698f..99a6cba57b8 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -126,6 +126,7 @@ def __setup_data_transfer(self, data_transfer_mode=0, multi_pipe_enabled=False): self._FIFO_PY2JAVA_PIPES = out_pipes self._FIFO_JAVA2PY_PIPES = in_pipes else: + self._log.info("Using py4j for data transfer") self._data_transfer_mode = 0 def __init_pipes(self, num_pipes): diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 93744a267e1..ab7b7ffd8d5 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -20,16 +20,21 @@ # ------------------------------------------------------------- import struct -import tempfile -import mmap -import time - +from time import time import numpy as np import pandas as pd import concurrent.futures from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView import os +# Constants +_HANDSHAKE_OFFSET = 1000 +_DEFAULT_BATCH_SIZE_BYTES = 32 * 1024 # 32 KB +_FRAME_BATCH_SIZE_BYTES = 16 * 1024 # 16 KB +_MIN_BYTES_PER_PIPE = 1024 * 1024 * 1024 # 1 GB +_STRING_LENGTH_PREFIX_SIZE = 4 # 
int32 +_MAX_ROWS_FOR_OPTIMIZED_CONVERSION = 4 + def format_bytes(size): for unit in ["Bytes", "KB", "MB", "GB", "TB", "PB"]: @@ -38,36 +43,39 @@ def format_bytes(size): size /= 1024.0 -def pipe_transfer_header(pipe, pipe_id): - handshake = struct.pack(" 0: + buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] + + # Read more data + t0 = time() + chunk = os.read(fd, batch_size) + t_io += time() - t0 + if not chunk: + raise IOError("Pipe read returned empty data unexpectedly") + + # Append new data to buffer + chunk_len = len(chunk) + if buf_remaining + chunk_len > len(buf): + # Grow buffer if needed + new_buf = bytearray(len(buf) * 2) + new_buf[:buf_remaining] = buf[:buf_remaining] + buf = new_buf + + buf[buf_remaining : buf_remaining + chunk_len] = chunk + buf_remaining += chunk_len + buf_pos = 0 + + # Read length prefix (little-endian int32) + # Note: length can be -1 (0xFFFFFFFF) to indicate null value + length = struct.unpack( + " 0: + buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] + buf_pos = 0 + + # Read more data until we have enough + bytes_needed = length - buf_remaining + while bytes_needed > 0: + t0 = time() + chunk = os.read(fd, min(batch_size, bytes_needed)) + t_io += time() - t0 + if not chunk: + raise IOError("Pipe read returned empty data unexpectedly") + + chunk_len = len(chunk) + if buf_remaining + chunk_len > len(buf): + # Grow buffer if needed + new_buf = bytearray(len(buf) * 2) + new_buf[:buf_remaining] = buf[:buf_remaining] + buf = new_buf + + buf[buf_remaining : buf_remaining + chunk_len] = chunk + buf_remaining += chunk_len + bytes_needed -= chunk_len + + # Decode the string + t0 = time() + if length == 0: + decoded_str = "" + else: + decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8") + t_decode += time() - t0 + + strings.append(decoded_str) + buf_pos += length + buf_remaining -= length + i += 1 + header_received = False + if buf_remaining == _STRING_LENGTH_PREFIX_SIZE: + # There is still data in the buffer, probably the handshake header + received = struct.unpack( + " _STRING_LENGTH_PREFIX_SIZE: + raise ValueError( + "Unexpected number of bytes in buffer: {}".format(buf_remaining) + ) + + t_total = time() - t_total_start + return (strings, t_total, t_decode, t_io, num_strings, header_received) + + +def _get_numpy_value_type(jvm, dtype): + """Maps numpy dtype to SystemDS ValueType.""" + if dtype is np.dtype(np.uint8): + return jvm.org.apache.sysds.common.Types.ValueType.UINT8 + elif dtype is np.dtype(np.int32): + return jvm.org.apache.sysds.common.Types.ValueType.INT32 + elif dtype is np.dtype(np.float32): + return jvm.org.apache.sysds.common.Types.ValueType.FP32 + else: + return jvm.org.apache.sysds.common.Types.ValueType.FP64 + + +def _transfer_matrix_block_single_pipe( + sds, pipe_id, pipe, mv, total_bytes, rows, cols, value_type, ep +): + """Transfers matrix block data using a single pipe.""" + sds._log.debug( + "Using single FIFO pipe for transferring {}".format(format_bytes(total_bytes)) + ) + fut = sds._executor_pool.submit( + ep.startReadingMbFromPipe, pipe_id, rows, cols, value_type + ) + + _pipe_transfer_header(pipe, pipe_id) # start + _pipe_transfer_bytes(pipe, 0, total_bytes, _DEFAULT_BATCH_SIZE_BYTES, mv) + _pipe_transfer_header(pipe, pipe_id) # end + + return fut.result() # Java returns MatrixBlock + + +def _transfer_matrix_block_multi_pipe( + sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm +): + """Transfers matrix block data using multiple pipes in parallel.""" + num_pipes = 
min(len(sds._FIFO_PY2JAVA_PIPES), total_bytes // _MIN_BYTES_PER_PIPE) + # Align blocks per element + num_elems = len(arr) + elem_size = np_arr.dtype.itemsize + min_elems_block = num_elems // num_pipes + left_over = num_elems % num_pipes + block_sizes = sds.java_gateway.new_array(jvm.int, num_pipes) + for i in range(num_pipes): + block_sizes[i] = min_elems_block + int(i < left_over) + + # Run java readers in parallel + fut_java = sds._executor_pool.submit( + ep.startReadingMbFromPipes, block_sizes, rows, cols, value_type + ) + + # Run writers in parallel + def _pipe_write_task(_pipe_id, _pipe, memview, start, end): + _pipe_transfer_header(_pipe, _pipe_id) + _pipe_transfer_bytes(_pipe, start, end, _DEFAULT_BATCH_SIZE_BYTES, memview) + _pipe_transfer_header(_pipe, _pipe_id) + + cur = 0 + futures = [] + for i, size in enumerate(block_sizes): + pipe = sds._FIFO_PY2JAVA_PIPES[i] + start_byte = cur * elem_size + cur += size + end_byte = cur * elem_size + + fut = sds._executor_pool.submit( + _pipe_write_task, i, pipe, mv, start_byte, end_byte + ) + futures.append(fut) + + return fut_java.result() # Java returns MatrixBlock + + def numpy_to_matrix_block(sds, np_arr: np.array): """Converts a given numpy array, to internal matrix block representation. @@ -89,7 +291,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array): cols = np_arr.shape[1] if np_arr.ndim == 2 else 1 if rows > 2147483647: - raise Exception("") + raise ValueError("Matrix rows exceed maximum value (2147483647)") # If not numpy array then convert to numpy array if not isinstance(np_arr, np.ndarray): @@ -98,90 +300,45 @@ def numpy_to_matrix_block(sds, np_arr: np.array): jvm: JVMView = sds.java_gateway.jvm ep = sds.java_gateway.entry_point - # flatten and set value type + # Flatten and set value type if np_arr.dtype is np.dtype(np.uint8): arr = np_arr.ravel() - value_type = jvm.org.apache.sysds.common.Types.ValueType.UINT8 elif np_arr.dtype is np.dtype(np.int32): arr = np_arr.ravel() - value_type = jvm.org.apache.sysds.common.Types.ValueType.INT32 elif np_arr.dtype is np.dtype(np.float32): arr = np_arr.ravel() - value_type = jvm.org.apache.sysds.common.Types.ValueType.FP32 else: arr = np_arr.ravel().astype(np.float64) - value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64 + + value_type = _get_numpy_value_type(jvm, np_arr.dtype) if sds._data_transfer_mode == 1: mv = memoryview(arr).cast("B") total_bytes = mv.nbytes - min_bytes_per_pipe = 1024 * 1024 * 1024 * 1 - batch_size_bytes = 32 * 1024 # pipe's ring buffer is 64KB # Using multiple pipes is disabled by default use_single_pipe = ( - not sds._multi_pipe_enabled or total_bytes < 2 * min_bytes_per_pipe + not sds._multi_pipe_enabled or total_bytes < 2 * _MIN_BYTES_PER_PIPE ) if use_single_pipe: - sds._log.debug( - "Using single FIFO pipe for reading {}".format( - format_bytes(total_bytes) - ) - ) - pipe_id = 0 - pipe = sds._FIFO_PY2JAVA_PIPES[pipe_id] - fut = sds._executor_pool.submit( - ep.startReadingMbFromPipe, pipe_id, rows, cols, value_type + return _transfer_matrix_block_single_pipe( + sds, + 0, + sds._FIFO_PY2JAVA_PIPES[0], + mv, + total_bytes, + rows, + cols, + value_type, + ep, ) - - pipe_transfer_header(pipe, pipe_id) # start - pipe_transfer_bytes(pipe, 0, total_bytes, batch_size_bytes, mv) - pipe_transfer_header(pipe, pipe_id) # end - - return fut.result() # Java returns MatrixBlock else: - num_pipes = min( - len(sds._FIFO_PY2JAVA_PIPES), total_bytes // min_bytes_per_pipe - ) - # align blocks per element - num_elems = len(arr) - elem_size = np_arr.dtype.itemsize - 
min_elems_block = num_elems // num_pipes - left_over = num_elems % num_pipes - block_sizes = sds.java_gateway.new_array(jvm.int, num_pipes) - for i in range(num_pipes): - block_sizes[i] = min_elems_block + int(i < left_over) - - # run java readers in parallel - fut_java = sds._executor_pool.submit( - ep.startReadingMbFromPipes, block_sizes, rows, cols, value_type + return _transfer_matrix_block_multi_pipe( + sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm ) - - # run writers in parallel - def _pipe_write_task(_pipe_id, _pipe, memview, start, end): - pipe_transfer_header(_pipe, _pipe_id) - pipe_transfer_bytes(_pipe, start, end, batch_size_bytes, memview) - pipe_transfer_header(_pipe, _pipe_id) - - cur = 0 - futures = [] - for i, size in enumerate(block_sizes): - pipe = sds._FIFO_PY2JAVA_PIPES[i] - start_byte = cur * elem_size - cur += size - end_byte = cur * elem_size - - fut = sds._executor_pool.submit( - _pipe_write_task, i, pipe, mv, start_byte, end_byte - ) - futures.append(fut) - - return fut_java.result() # Java returns MatrixBlock else: - # prepare byte buffer. + # Prepare byte buffer and send data to java via Py4J buf = arr.tobytes() - - # Send data to java. j_class: JavaClass = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type) @@ -213,7 +370,7 @@ def matrix_block_to_numpy(sds, mb: JavaObject): pipe = sds._FIFO_JAVA2PY_PIPES[pipe_id] sds._log.debug( - "Using single FIFO pipe for reading {}".format( + "Using single FIFO pipe for transferring {}".format( format_bytes(total_bytes) ) ) @@ -221,14 +378,9 @@ def matrix_block_to_numpy(sds, mb: JavaObject): # Java starts writing to pipe in background fut = sds._executor_pool.submit(ep.startWritingMbToPipe, pipe_id, mb) - pipe_receive_header(pipe, pipe_id, sds._log) - sds._log.debug( - "Py4j task for writing {} [{}] is: done=[{}], running=[{}]".format( - format_bytes(total_bytes), sds._FIFO_PATH, fut.done(), fut.running() - ) - ) - pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log) - pipe_receive_header(pipe, pipe_id, sds._log) + _pipe_receive_header(pipe, pipe_id, sds._log) + _pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log) + _pipe_receive_header(pipe, pipe_id, sds._log) fut.result() sds._log.debug("Reading is done for {}".format(format_bytes(total_bytes))) @@ -246,7 +398,9 @@ def matrix_block_to_numpy(sds, mb: JavaObject): return None -def convert(jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"): +def _convert_pandas_series_to_frameblock( + jvm, fb, idx, num_elements, value_type, pd_series, conversion="column" +): """Converts a given pandas column or row to a FrameBlock representation. :param jvm: The JVMView of the current SystemDS context. 
@@ -326,59 +480,411 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame): try: jc_String = jvm.java.lang.String jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock - # execution speed increases with optimized code when the number of rows exceeds 4 - if rows > 4: - # Row conversion if more columns than rows and all columns have the same type, otherwise column - conversion_type = ( - "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column" - ) - if conversion_type == "row": - pd_df = pd_df.transpose() - col_names = pd_df.columns.tolist() # re-calculate col names - fb = jc_FrameBlock( + if sds._data_transfer_mode == 1: + return pandas_to_frame_block_pipe( + col_names, + j_colNameArray, j_valueTypeArray, + jc_FrameBlock, + pd_df, + rows, + schema, + sds, + ) + else: + return pandas_to_frame_block_py4j( + col_names, j_colNameArray, - rows if conversion_type == "column" else None, + j_valueTypeArray, + jc_FrameBlock, + jc_String, + pd_df, + rows, + cols, + schema, + sds, + ) + + except Exception as e: + sds.exception_and_close(e) + + +def pandas_to_frame_block_py4j( + col_names: list, + j_colNameArray, + j_valueTypeArray, + jc_FrameBlock, + jc_String, + pd_df: pd.DataFrame, + rows: int, + cols: int, + schema: list, + sds, +): + java_gate = sds.java_gateway + jvm = java_gate.jvm + + # Execution speed increases with optimized code when the number of rows exceeds threshold + if rows > _MAX_ROWS_FOR_OPTIMIZED_CONVERSION: + # Row conversion if more columns than rows and all columns have the same type, otherwise column + conversion_type = ( + "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column" + ) + if conversion_type == "row": + pd_df = pd_df.transpose() + col_names = pd_df.columns.tolist() # re-calculate col names + + fb = jc_FrameBlock( + j_valueTypeArray, + j_colNameArray, + rows if conversion_type == "column" else None, + ) + if conversion_type == "row": + fb.ensureAllocatedColumns(rows) + + # We use .submit() with explicit .result() calling to properly propagate exceptions + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit( + _convert_pandas_series_to_frameblock, + jvm, + fb, + i, + rows if conversion_type == "column" else cols, + schema[i], + pd_df[col_name], + conversion_type, + ) + for i, col_name in enumerate(col_names) + ] + + for future in concurrent.futures.as_completed(futures): + future.result() + + return fb + else: + j_dataArray = java_gate.new_array(jc_String, rows, cols) + + for j, col_name in enumerate[str](col_names): + col_data = pd_df[col_name].fillna("").to_numpy(dtype=str) + + for i in range(col_data.shape[0]): + if col_data[i]: + j_dataArray[i][j] = col_data[i] + + fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray) + return fb + + +def _transfer_string_column_to_pipe( + sds, pipe, pipe_id, pd_series, col_name, rows, fb, col_idx, schema, ep +): + """Transfers a string column to FrameBlock via pipe.""" + t0 = time() + + # Start Java reader in background + fut = sds._executor_pool.submit( + ep.startReadingColFromPipe, pipe_id, fb, rows, -1, col_idx, schema, True + ) + + _pipe_transfer_header(pipe, pipe_id) # start + py_timing = _pipe_transfer_strings(pipe, pd_series, _FRAME_BATCH_SIZE_BYTES) + _pipe_transfer_header(pipe, pipe_id) # end + + fut.result() + + t1 = time() + + # Print aggregated timing breakdown + py_total, py_encoding, py_packing, py_io, num_strings = py_timing + total_time = t1 - t0 + + sds._log.debug( + f""" + === TO FrameBlock - Timing Breakdown (Strings) === + 
Column: {col_name} + Total time: {total_time:.3f}s + Python side (writing): + Total: {py_total:.3f}s + Encoding: {py_encoding:.3f}s ({100*py_encoding/py_total:.1f}%) + Struct packing: {py_packing:.3f}s ({100*py_packing/py_total:.1f}%) + I/O writes: {py_io:.3f}s ({100*py_io/py_total:.1f}%) + Other: {py_total - py_encoding - py_packing - py_io:.3f}s + Strings processed: {num_strings:,} + """ + ) + + +def _transfer_numeric_column_to_pipe( + sds, pipe, pipe_id, byte_data, col_name, rows, fb, col_idx, schema, ep +): + """Transfers a numeric column to FrameBlock via pipe.""" + mv = memoryview(byte_data).cast("B") + total_bytes = mv.nbytes + sds._log.debug( + "TO FrameBlock - Using single FIFO pipe for transferring {} | {} bytes | Column: {}".format( + format_bytes(total_bytes), total_bytes, col_name + ) + ) + + fut = sds._executor_pool.submit( + ep.startReadingColFromPipe, + pipe_id, + fb, + rows, + total_bytes, + col_idx, + schema, + True, + ) + + _pipe_transfer_header(pipe, pipe_id) # start + _pipe_transfer_bytes(pipe, 0, total_bytes, _FRAME_BATCH_SIZE_BYTES, mv) + _pipe_transfer_header(pipe, pipe_id) # end + + fut.result() + + +def pandas_to_frame_block_pipe( + col_names: list, + j_colNameArray, + j_valueTypeArray, + jc_FrameBlock, + pd_df: pd.DataFrame, + rows: int, + schema: list, + sds, +): + ep = sds.java_gateway.entry_point + fb = jc_FrameBlock( + j_valueTypeArray, + j_colNameArray, + rows, + ) + + pipe_id = 0 + pipe = sds._FIFO_PY2JAVA_PIPES[pipe_id] + + for i, col_name in enumerate(col_names): + pd_series = pd_df[col_name] + + if pd_series.dtype == "string" or pd_series.dtype == "object": + _transfer_string_column_to_pipe( + sds, pipe, pipe_id, pd_series, col_name, rows, fb, i, schema[i], ep ) - if conversion_type == "row": - fb.ensureAllocatedColumns(rows) - - # We use .submit() with explicit .result() calling to properly propagate exceptions - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit( - convert, - jvm, - fb, - i, - rows if conversion_type == "column" else cols, - schema[i], - pd_df[col_name], - conversion_type, - ) - for i, col_name in enumerate(col_names) - ] - - for future in concurrent.futures.as_completed(futures): - future.result() - - return fb + continue + + # Prepare numeric data + if pd_series.dtype == "bool": + # Convert boolean to uint8 (0/1) for proper byte representation + byte_data = pd_series.fillna(False).astype(np.uint8).to_numpy() else: - j_dataArray = java_gate.new_array(jc_String, rows, cols) + byte_data = pd_series.fillna("").to_numpy() - for j, col_name in enumerate(col_names): - col_data = pd_df[col_name].fillna("").to_numpy(dtype=str) + _transfer_numeric_column_to_pipe( + sds, pipe, pipe_id, byte_data, col_name, rows, fb, i, schema[i], ep + ) - for i in range(col_data.shape[0]): - if col_data[i]: - j_dataArray[i][j] = col_data[i] + return fb - fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray) - return fb - except Exception as e: - sds.exception_and_close(e) +def _pipe_transfer_strings(pipe, pd_series, batch_size=_DEFAULT_BATCH_SIZE_BYTES): + """ + Streams UTF-8 encoded strings to the pipe in batches without building the full bytearray first. + Uses a 2×batch_size buffer to accommodate long strings without frequent flushes. 
+ + Returns: tuple of (total_time, encoding_time, packing_time, io_time, num_strings) + """ + t_total_start = time() + t_encoding = 0.0 + t_packing = 0.0 + t_io = 0.0 + num_strings = 0 + + buf = bytearray(batch_size * 2) + view = memoryview(buf) + pos = 0 + fd = pipe.fileno() # Cache file descriptor to avoid repeated lookups + + # Convert pandas Series to list/array for faster iteration (avoids pandas overhead) + # Use .values for numpy array or .tolist() for Python list - tolist() is often faster for strings + values = pd_series.tolist() if hasattr(pd_series, "tolist") else list(pd_series) + + for value in values: + num_strings += 1 + + # Check for null values (None, pd.NA, np.nan) + is_null = value is None or pd.isna(value) + + if is_null: + # Use -1 as marker for null values (signed int32) + length = -1 + entry_size = _STRING_LENGTH_PREFIX_SIZE # Only length prefix, no data bytes + else: + # Encode and get length - len() on bytes is very fast (O(1) attribute access) + t0 = time() + encoded = value.encode("utf-8") + t_encoding += time() - t0 + + length = len(encoded) # Fast O(1) operation on bytes + entry_size = _STRING_LENGTH_PREFIX_SIZE + length # length prefix + data + + # if next string doesn't fit comfortably, flush first half + if pos + entry_size > batch_size: + # write everything up to 'pos' + t0 = time() + written = os.write(fd, view[:pos]) + t_io += time() - t0 + if written != pos: + raise IOError(f"Expected to write {pos} bytes, wrote {written}") + pos = 0 + + # Write length prefix (little-endian, signed int32 for -1 null marker) + t0 = time() + struct.pack_into(" 0: + t0 = time() + written = os.write(fd, view[:pos]) + t_io += time() - t0 + if written != pos: + raise IOError(f"Expected to write {pos} bytes, wrote {written}") + + t_total = time() - t_total_start + return (t_total, t_encoding, t_packing, t_io, num_strings) + + +def _get_elem_size_for_type(d_type): + """Returns the element size in bytes for a given SystemDS type.""" + return { + "INT32": 4, + "INT64": 8, + "FP64": 8, + "BOOLEAN": 1, + "FP32": 4, + "UINT8": 1, + "CHARACTER": 1, + }.get(d_type, 8) + + +def _get_numpy_dtype_for_type(d_type): + """Returns the numpy dtype for a given SystemDS type.""" + dtype_map = { + "INT32": np.int32, + "INT64": np.int64, + "FP64": np.float64, + "BOOLEAN": np.dtype("?"), + "FP32": np.float32, + "UINT8": np.uint8, + "CHARACTER": np.char, + } + return dtype_map.get(d_type, np.float64) + + +def _receive_string_column_from_pipe( + sds, pipe, pipe_id, num_rows, batch_size_bytes, col_name +): + """Receives a string column from FrameBlock via pipe.""" + py_strings, py_total, py_decode, py_io, num_strings, header_received = ( + _pipe_receive_strings(pipe, num_rows, batch_size_bytes, pipe_id, sds._log) + ) + + sds._log.debug( + f""" + === FROM FrameBlock - Timing Breakdown (Strings) === + Column: {col_name} + Total time: {py_total:.3f}s + Python side (reading): + Total: {py_total:.3f}s + Decoding: {py_decode:.3f}s ({100*py_decode/py_total:.1f}%) + I/O reads: {py_io:.3f}s ({100*py_io/py_total:.1f}%) + Other: {py_total - py_decode - py_io:.3f}s + Strings processed: {num_strings:,} + """ + ) + + if not header_received: + _pipe_receive_header(pipe, pipe_id, sds._log) + + return py_strings + + +def _receive_numeric_column_from_pipe( + sds, pipe, pipe_id, d_type, num_rows, batch_size_bytes, col_name +): + """Receives a numeric column from FrameBlock via pipe.""" + elem_size = _get_elem_size_for_type(d_type) + total_bytes = num_rows * elem_size + numpy_dtype = _get_numpy_dtype_for_type(d_type) + + 
sds._log.debug( + "FROM FrameBlock - Using single FIFO pipe for transferring {} | {} bytes | Column: {} | Type: {}".format( + format_bytes(total_bytes), + total_bytes, + col_name, + d_type, + ) + ) + + if d_type == "BOOLEAN": + # Read as uint8 first, then convert to boolean + # This ensures proper interpretation of 0/1 bytes + arr_uint8 = np.empty(num_rows, dtype=np.uint8) + mv = memoryview(arr_uint8).cast("B") + _pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log) + ret = arr_uint8.astype(bool) + else: + arr = np.empty(num_rows, dtype=numpy_dtype) + mv = memoryview(arr).cast("B") + _pipe_receive_bytes(pipe, mv, 0, total_bytes, batch_size_bytes, sds._log) + ret = arr + + _pipe_receive_header(pipe, pipe_id, sds._log) + return ret + + +def _receive_column_py4j(fb, col_array, c_index, d_type, num_rows): + """Receives a column from FrameBlock using Py4J (fallback method).""" + if d_type == "STRING": + ret = [] + for row in range(num_rows): + ent = col_array.getIndexAsBytes(row) + if ent: + ent = ent.decode() + ret.append(ent) + else: + ret.append(None) + elif d_type == "INT32": + byteArray = fb.getColumn(c_index).getAsByteArray() + ret = np.frombuffer(byteArray, dtype=np.int32) + elif d_type == "INT64": + byteArray = fb.getColumn(c_index).getAsByteArray() + ret = np.frombuffer(byteArray, dtype=np.int64) + elif d_type == "FP64": + byteArray = fb.getColumn(c_index).getAsByteArray() + ret = np.frombuffer(byteArray, dtype=np.float64) + elif d_type == "BOOLEAN": + # TODO maybe it is more efficient to bit pack the booleans. + # https://stackoverflow.com/questions/5602155/numpy-boolean-array-with-1-bit-entries + byteArray = fb.getColumn(c_index).getAsByteArray() + ret = np.frombuffer(byteArray, dtype=np.dtype("?")) + elif d_type == "CHARACTER": + byteArray = fb.getColumn(c_index).getAsByteArray() + ret = np.frombuffer(byteArray, dtype=np.char) + else: + raise NotImplementedError( + f"Not Implemented {d_type} for systemds to pandas parsing" + ) + return ret def frame_block_to_pandas(sds, fb: JavaObject): @@ -387,45 +893,55 @@ def frame_block_to_pandas(sds, fb: JavaObject): :param sds: The current systemds context. :param fb: A pointer to the JVM's FrameBlock object. """ - num_rows = fb.getNumRows() num_cols = fb.getNumColumns() df = pd.DataFrame() + ep = sds.java_gateway.entry_point + jvm = sds.java_gateway.jvm + for c_index in range(num_cols): col_array = fb.getColumn(c_index) - d_type = col_array.getValueType().toString() - if d_type == "STRING": - ret = [] - for row in range(num_rows): - ent = col_array.getIndexAsBytes(row) - if ent: - ent = ent.decode() - ret.append(ent) - else: - ret.append(None) - elif d_type == "INT32": - byteArray = fb.getColumn(c_index).getAsByteArray() - ret = np.frombuffer(byteArray, dtype=np.int32) - elif d_type == "INT64": - byteArray = fb.getColumn(c_index).getAsByteArray() - ret = np.frombuffer(byteArray, dtype=np.int64) - elif d_type == "FP64": - byteArray = fb.getColumn(c_index).getAsByteArray() - ret = np.frombuffer(byteArray, dtype=np.float64) - elif d_type == "BOOLEAN": - # TODO maybe it is more efficient to bit pack the booleans. 
- # https://stackoverflow.com/questions/5602155/numpy-boolean-array-with-1-bit-entries - byteArray = fb.getColumn(c_index).getAsByteArray() - ret = np.frombuffer(byteArray, dtype=np.dtype("?")) - elif d_type == "CHARACTER": - byteArray = fb.getColumn(c_index).getAsByteArray() - ret = np.frombuffer(byteArray, dtype=np.char) - else: - raise NotImplementedError( - f"Not Implemented {d_type} for systemds to pandas parsing" + + if sds._data_transfer_mode == 1: + # Use pipe transfer for faster data transfer + batch_size_bytes = _DEFAULT_BATCH_SIZE_BYTES + pipe_id = 0 + pipe = sds._FIFO_JAVA2PY_PIPES[pipe_id] + + # Java starts writing to pipe in background + fut = sds._executor_pool.submit( + ep.startWritingColToPipe, pipe_id, fb, c_index ) + + _pipe_receive_header(pipe, pipe_id, sds._log) + + if d_type == "STRING": + ret = _receive_string_column_from_pipe( + sds, + pipe, + pipe_id, + num_rows, + batch_size_bytes, + fb.getColumnName(c_index), + ) + else: + ret = _receive_numeric_column_from_pipe( + sds, + pipe, + pipe_id, + d_type, + num_rows, + batch_size_bytes, + fb.getColumnName(c_index), + ) + + fut.result() + else: + # Use Py4J transfer (original method) + ret = _receive_column_py4j(fb, col_array, c_index, d_type, num_rows) + df[fb.getColumnName(c_index)] = ret return df diff --git a/src/main/python/tests/matrix/test_block_converter_unix_pipe.py b/src/main/python/tests/matrix/test_block_converter_unix_pipe.py deleted file mode 100644 index c24a9357c84..00000000000 --- a/src/main/python/tests/matrix/test_block_converter_unix_pipe.py +++ /dev/null @@ -1,104 +0,0 @@ -# ------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -# ------------------------------------------------------------- - - -import os -import shutil -import unittest -import pandas as pd -import numpy as np -from systemds.context import SystemDSContext - - -class TestMatrixBlockConverterUnixPipe(unittest.TestCase): - - sds: SystemDSContext = None - temp_dir: str = "tests/iotests/temp_write_csv/" - - @classmethod - def setUpClass(cls): - cls.sds = SystemDSContext( - data_transfer_mode=1, logging_level=50, capture_stdout=True - ) - if not os.path.exists(cls.temp_dir): - os.makedirs(cls.temp_dir) - - @classmethod - def tearDownClass(cls): - cls.sds.close() - shutil.rmtree(cls.temp_dir, ignore_errors=True) - - def test_python_to_java(self): - combinations = [ # (n_rows, n_cols) - (5, 0), - (5, 1), - (10, 10), - ] - - for n_rows, n_cols in combinations: - matrix = ( - np.random.random((n_rows, n_cols)) - if n_cols != 0 - else np.random.random(n_rows) - ) - # Transfer into SystemDS and write to CSV - matrix_sds = self.sds.from_numpy(matrix) - matrix_sds.write( - self.temp_dir + "into_systemds_matrix.csv", format="csv", header=False - ).compute() - - # Read the CSV file using pandas - result_df = pd.read_csv( - self.temp_dir + "into_systemds_matrix.csv", header=None - ) - matrix_out = result_df.to_numpy() - if n_cols == 0: - matrix_out = matrix_out.flatten() - # Verify the data - self.assertTrue(np.allclose(matrix_out, matrix)) - - def test_java_to_python(self): - combinations = [ # (n_rows, n_cols) - (5, 1), - (10, 10), - ] - - for n_rows, n_cols in combinations: - matrix = np.random.random((n_rows, n_cols)) - - # Create a CSV file to read into SystemDS - pd.DataFrame(matrix).to_csv( - self.temp_dir + "out_of_systemds_matrix.csv", header=False, index=False - ) - - matrix_sds = self.sds.read( - self.temp_dir + "out_of_systemds_matrix.csv", - data_type="matrix", - format="csv", - ) - matrix_out = matrix_sds.compute() - - # Verify the data - self.assertTrue(np.allclose(matrix_out, matrix)) - - -if __name__ == "__main__": - unittest.main(exit=False) diff --git a/src/main/python/tests/python_java_data_transfer/__init__.py b/src/main/python/tests/python_java_data_transfer/__init__.py new file mode 100644 index 00000000000..e66abb4646f --- /dev/null +++ b/src/main/python/tests/python_java_data_transfer/__init__.py @@ -0,0 +1,20 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- diff --git a/src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py b/src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py new file mode 100644 index 00000000000..fcfe683dc7f --- /dev/null +++ b/src/main/python/tests/python_java_data_transfer/test_dense_numpy_matrix.py @@ -0,0 +1,246 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + + +import os +import shutil +import unittest +import pandas as pd +import numpy as np +from systemds.context import SystemDSContext +from tests.test_utils import timeout + + +class TestMatrixBlockConverterUnixPipe(unittest.TestCase): + + sds: SystemDSContext = None + temp_dir: str = "tests/iotests/temp_write_csv/" + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext( + data_transfer_mode=1, logging_level=10, capture_stdout=True + ) + if not os.path.exists(cls.temp_dir): + os.makedirs(cls.temp_dir) + + @classmethod + def tearDownClass(cls): + cls.sds.close() + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + @timeout(60) + def test_python_to_java(self): + combinations = [ # (n_rows, n_cols) + (5, 0), + (5, 1), + (10, 10), + ] + + for n_rows, n_cols in combinations: + matrix = ( + np.random.random((n_rows, n_cols)) + if n_cols != 0 + else np.random.random(n_rows) + ) + # Transfer into SystemDS and write to CSV + matrix_sds = self.sds.from_numpy(matrix) + matrix_sds.write( + self.temp_dir + "into_systemds_matrix.csv", format="csv", header=False + ).compute() + + # Read the CSV file using pandas + result_df = pd.read_csv( + self.temp_dir + "into_systemds_matrix.csv", header=None + ) + matrix_out = result_df.to_numpy() + if n_cols == 0: + matrix_out = matrix_out.flatten() + # Verify the data + self.assertTrue(np.allclose(matrix_out, matrix)) + + @timeout(60) + def test_java_to_python(self): + """Test reading matrices from SystemDS back to Python with various dtypes.""" + # (dtype, shapes, data_type, tolerance) + configs = [ + (np.float64, [(5, 1), (10, 10), (100, 5)], "random", 1e-9), + (np.float32, [(10, 10), (50, 3)], "random", 1e-6), + (np.int32, [(10, 10), (20, 5)], "randint", 0.0), + (np.uint8, [(10, 10), (15, 8)], "randuint8", 0.0), + ] + + def _gen_data(dtype, data_type): + if data_type == "random": + return lambda s: np.random.random(s).astype(dtype) + elif data_type == "randint": + return lambda s: np.random.randint(-10000, 10000, s).astype(dtype) + elif data_type == "randuint8": + return lambda s: np.random.randint(0, 255, s).astype(dtype) + + test_cases = [ + { + "dtype": dt, + "shape": sh, + "data": _gen_data(dt, data_type), + "tolerance": tol, + } + for dt, 
shapes, data_type, tol in configs + for sh in shapes + ] + [ + # Edge cases + { + "dtype": np.float64, + "shape": (1, 1), + "data": lambda s: np.random.random(s).astype(np.float64), + "tolerance": 1e-9, + }, + { + "dtype": np.float64, + "shape": (1, 10), + "data": lambda s: np.random.random(s).astype(np.float64), + "tolerance": 1e-9, + }, + { + "dtype": np.float64, + "shape": (10, 10), + "data": lambda s: np.zeros(s, dtype=np.float64), + "tolerance": 0.0, + }, + { + "dtype": np.float64, + "shape": (10, 5), + "data": lambda s: np.random.uniform(-100.0, 100.0, s).astype( + np.float64 + ), + "tolerance": 1e-9, + }, + ] + + for i, test_case in enumerate(test_cases): + with self.subTest(i=i, dtype=test_case["dtype"], shape=test_case["shape"]): + matrix = test_case["data"](test_case["shape"]) + + # Create a CSV file to read into SystemDS + csv_path = self.temp_dir + f"out_of_systemds_matrix_{i}.csv" + pd.DataFrame(matrix).to_csv(csv_path, header=False, index=False) + + matrix_sds = self.sds.read( + csv_path, + data_type="matrix", + format="csv", + ) + matrix_out = matrix_sds.compute() + + # Verify the data + # Note: SystemDS reads all matrices as FP64, so we compare accordingly + if test_case["tolerance"] == 0.0: + # Exact match for integer types + self.assertTrue( + np.array_equal( + matrix.astype(np.float64), matrix_out.astype(np.float64) + ), + f"Matrix with dtype {test_case['dtype']} and shape {test_case['shape']} doesn't match exactly", + ) + else: + # Approximate match for float types + self.assertTrue( + np.allclose( + matrix.astype(np.float64), + matrix_out.astype(np.float64), + atol=test_case["tolerance"], + ), + f"Matrix with dtype {test_case['dtype']} and shape {test_case['shape']} doesn't match within tolerance", + ) + + @timeout(60) + def test_java_to_python_unsupported_dtypes(self): + """Test that unsupported dtypes are handled gracefully or converted.""" + # Note: SystemDS will convert unsupported dtypes to FP64 when reading from CSV + # So these should still work, just with type conversion + + test_cases = [ + # INT64 - not directly supported for MatrixBlock, but CSV reads as FP64 + { + "dtype": np.int64, + "shape": (10, 5), + "data": lambda s: np.random.randint(-1000000, 1000000, s).astype( + np.int64 + ), + }, + # Complex types - not supported, should fail or be converted + { + "dtype": np.complex128, + "shape": (5, 5), + "data": lambda s: np.random.random(s) + 1j * np.random.random(s), + "should_fail": True, # Complex numbers not supported in matrices + }, + ] + + for i, test_case in enumerate(test_cases): + with self.subTest(i=i, dtype=test_case["dtype"], shape=test_case["shape"]): + if test_case.get("should_fail", False): + # Test that unsupported types fail gracefully + matrix = test_case["data"](test_case["shape"]) + csv_path = self.temp_dir + f"unsupported_matrix_{i}.csv" + + # Writing complex numbers to CSV might fail or convert to real part + try: + pd.DataFrame(matrix).to_csv(csv_path, header=False, index=False) + # If writing succeeds, reading might fail or behave unexpectedly + with self.assertRaises(Exception): + matrix_sds = self.sds.read( + csv_path, + data_type="matrix", + format="csv", + ) + matrix_sds.compute() + except Exception: + # Writing failed, which is expected + pass + else: + # Type should be converted to FP64 + matrix = test_case["data"](test_case["shape"]) + csv_path = self.temp_dir + f"converted_matrix_{i}.csv" + + # Write as the original dtype (pandas will handle conversion for CSV) + pd.DataFrame(matrix).to_csv(csv_path, header=False, index=False) 
+ + matrix_sds = self.sds.read( + csv_path, + data_type="matrix", + format="csv", + ) + matrix_out = matrix_sds.compute() + + # Should be converted to FP64 and match values + self.assertTrue( + np.allclose( + matrix.astype(np.float64), + matrix_out.astype(np.float64), + atol=1e-9, + ), + f"Converted matrix with dtype {test_case['dtype']} doesn't match", + ) + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/main/python/tests/python_java_data_transfer/test_pandas_frame.py b/src/main/python/tests/python_java_data_transfer/test_pandas_frame.py new file mode 100644 index 00000000000..a841795363a --- /dev/null +++ b/src/main/python/tests/python_java_data_transfer/test_pandas_frame.py @@ -0,0 +1,265 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + + +import os +import shutil +import unittest +import pandas as pd +import numpy as np +from systemds.context import SystemDSContext +from tests.test_utils import timeout + + +class TestFrameConverterUnixPipe(unittest.TestCase): + + sds: SystemDSContext = None + temp_dir: str = "tests/iotests/temp_write_csv/" + + @classmethod + def setUpClass(cls): + cls.sds = SystemDSContext( + data_transfer_mode=1, logging_level=10, capture_stdout=True + ) + if not os.path.exists(cls.temp_dir): + os.makedirs(cls.temp_dir) + + @classmethod + def tearDownClass(cls): + cls.sds.close() + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + @timeout(60) + def test_frame_python_to_java(self): + """Test converting pandas DataFrame to SystemDS FrameBlock and writing to CSV.""" + combinations = [ + # Float32 column + {"float32_col": np.random.random(50).astype(np.float32)}, + # Float64 column + {"float64_col": np.random.random(50).astype(np.float64)}, + # Int32 column + {"int32_col": np.random.randint(-1000, 1000, 50).astype(np.int32)}, + # Int64 column + {"int64_col": np.random.randint(-1000000, 1000000, 50).astype(np.int64)}, + # Uint8 column + {"uint8_col": np.random.randint(0, 255, 50).astype(np.uint8)}, + # All numeric types together + { + "float32_col": np.random.random(30).astype(np.float32), + "float64_col": np.random.random(30).astype(np.float64), + "int32_col": np.random.randint(-1000, 1000, 30).astype(np.int32), + "int64_col": np.random.randint(-1000000, 1000000, 30).astype(np.int64), + "uint8_col": np.random.randint(0, 255, 30).astype(np.uint8), + }, + # Mixed numeric types with strings + { + "float32_col": np.random.random(25).astype(np.float32), + "float64_col": np.random.random(25).astype(np.float64), + "int32_col": np.random.randint(-1000, 1000, 25).astype(np.int32), + "int64_col": np.random.randint(-1000000, 1000000, 25).astype(np.int64), + "uint8_col": 
np.random.randint(0, 255, 25).astype(np.uint8),
+                "string_col": [f"string_{i}" for i in range(25)],
+            },
+        ]
+
+        for frame_dict in combinations:
+            frame = pd.DataFrame(frame_dict)
+            # Transfer into SystemDS and write to CSV
+            frame_sds = self.sds.from_pandas(frame)
+            frame_sds.write(
+                self.temp_dir + "into_systemds_frame.csv", format="csv", header=False
+            ).compute()
+
+            # Read the CSV file using pandas
+            result_df = pd.read_csv(
+                self.temp_dir + "into_systemds_frame.csv", header=None
+            )
+
+            # For numeric columns, verify with allclose for floats, exact match for integers
+            # For string columns, verify exact match
+            for col_idx, col_name in enumerate(frame.columns):
+                original_col = frame[col_name]
+                result_col = result_df.iloc[:, col_idx]
+
+                if pd.api.types.is_numeric_dtype(original_col):
+                    original_dtype = original_col.dtype
+                    # For integer types (int32, int64, uint8), use exact equality
+                    if original_dtype in [np.int32, np.int64, np.uint8]:
+                        self.assertTrue(
+                            np.array_equal(
+                                original_col.values.astype(original_dtype),
+                                result_col.values.astype(original_dtype),
+                            ),
+                            f"Column {col_name} (dtype: {original_dtype}) integer values don't match exactly",
+                        )
+                    else:
+                        # For float types (float32, float64), use allclose
+                        self.assertTrue(
+                            np.allclose(
+                                original_col.values.astype(float),
+                                result_col.values.astype(float),
+                                equal_nan=True,
+                            ),
+                            f"Column {col_name} (dtype: {original_dtype}) float values don't match",
+                        )
+                else:
+                    # For string columns, compare as strings
+                    self.assertTrue(
+                        (
+                            original_col.astype(str).values
+                            == result_col.astype(str).values
+                        ).all(),
+                        f"Column {col_name} string values don't match",
+                    )
+
+    @timeout(60)
+    def test_frame_java_to_python(self):
+        """Test transferring a SystemDS FrameBlock back to a pandas DataFrame."""
+        combinations = [
+            {"float32_col": np.random.random(50).astype(np.float32)},
+            {"float64_col": np.random.random(50).astype(np.float64)},
+            {"int32_col": np.random.randint(-1000, 1000, 50).astype(np.int32)},
+            {"int64_col": np.random.randint(-1000000, 1000000, 50).astype(np.int64)},
+            {"uint8_col": np.random.randint(0, 255, 50).astype(np.uint8)},
+            # String column only
+            {"text_col": [f"text_value_{i}" for i in range(30)]},
+            # All numeric types together
+            {
+                "float32_col": np.random.random(30).astype(np.float32),
+                "float64_col": np.random.random(30).astype(np.float64),
+                "int32_col": np.random.randint(-1000, 1000, 30).astype(np.int32),
+                "int64_col": np.random.randint(-1000000, 1000000, 30).astype(np.int64),
+                "uint8_col": np.random.randint(0, 255, 30).astype(np.uint8),
+            },
+            # Mixed numeric types with strings
+            {
+                "float32_col": np.random.random(25).astype(np.float32),
+                "float64_col": np.random.random(25).astype(np.float64),
+                "int32_col": np.random.randint(-1000, 1000, 25).astype(np.int32),
+                "int64_col": np.random.randint(-1000000, 1000000, 25).astype(np.int64),
+                "uint8_col": np.random.randint(0, 255, 25).astype(np.uint8),
+                "string_col": [f"string_{i}" for i in range(25)],
+            },
+        ]
+        for frame_dict in combinations:
+            frame = pd.DataFrame(frame_dict)
+            # Round-trip through SystemDS: transfer, rbind with itself, and compute back
+            frame_sds = self.sds.from_pandas(frame)
+            frame_sds = frame_sds.rbind(frame_sds)
+            frame_out = frame_sds.compute()
+
+            frame = pd.concat([frame, frame], ignore_index=True)
+
+            # Verify it's a DataFrame
+            self.assertIsInstance(frame_out, pd.DataFrame)
+
+            # Verify shape matches
+            self.assertEqual(frame.shape, frame_out.shape, "Frame shapes don't match")
+
+            # Verify
column data + for col_name in frame.columns: + original_col = frame[col_name] + # FrameBlock to pandas may not preserve column names, so compare by position + col_idx = list(frame.columns).index(col_name) + result_col = frame_out.iloc[:, col_idx] + + if pd.api.types.is_numeric_dtype(original_col): + original_dtype = original_col.dtype + # For integer types (int32, int64, uint8), use exact equality + if original_dtype in [np.int32, np.int64, np.uint8]: + self.assertTrue( + np.array_equal( + original_col.values.astype(original_dtype), + result_col.values.astype(original_dtype), + ), + f"Column {col_name} (dtype: {original_dtype}) integer values don't match exactly", + ) + else: + # For float types (float32, float64), use allclose + # print difference in case of failure + if not np.allclose( + original_col.values.astype(float), + result_col.values.astype(float), + equal_nan=True, + atol=1e-6, + ): + print( + f"Column {col_name} (dtype: {original_dtype}) float values don't match: {np.abs(original_col.values.astype(float) - result_col.values.astype(float))}" + ) + self.assertTrue( + False, + f"Column {col_name} (dtype: {original_dtype}) float values don't match", + ) + + else: + # For string columns, compare as strings + original_str = original_col.astype(str).values + result_str = result_col.astype(str).values + self.assertTrue( + (original_str == result_str).all(), + f"Column {col_name} string values don't match", + ) + + @timeout(60) + def test_frame_string_with_nulls(self): + """Test converting pandas DataFrame with null string values.""" + # Create a simple DataFrame with 5 string values, 2 of them None + df = pd.DataFrame({"string_col": ["hello", None, "world", None, "test"]}) + + # Transfer into SystemDS and back + frame_sds = self.sds.from_pandas(df) + frame_sds = frame_sds.rbind(frame_sds) + frame_out = frame_sds.compute() + df = pd.concat([df, df], ignore_index=True) + + # Verify it's a DataFrame + self.assertIsInstance(frame_out, pd.DataFrame) + + # Verify shape matches + self.assertEqual(df.shape, frame_out.shape, "Frame shapes don't match") + + # Verify column data - check that None values are preserved + original_col = df["string_col"] + result_col = frame_out.iloc[:, 0] + + # Check each value + for i in range(len(original_col)): + original_val = original_col.iloc[i] + result_val = result_col.iloc[i] + + if pd.isna(original_val): + # Original is null, result should also be null + self.assertTrue( + pd.isna(result_val), + f"Row {i}: Expected null but got '{result_val}'", + ) + else: + # Original is not null, result should match + self.assertEqual( + str(original_val), + str(result_val), + f"Row {i}: Expected '{original_val}' but got '{result_val}'", + ) + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/src/main/python/tests/test_utils.py b/src/main/python/tests/test_utils.py new file mode 100644 index 00000000000..b4aec71d4b3 --- /dev/null +++ b/src/main/python/tests/test_utils.py @@ -0,0 +1,58 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import functools +import threading + + +def timeout(seconds): + """Decorator to add timeout to test methods.""" + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + result = [None] + exception = [None] + + def target(): + try: + result[0] = func(*args, **kwargs) + except Exception as e: + exception[0] = e + + thread = threading.Thread(target=target) + thread.daemon = True + thread.start() + thread.join(seconds) + + if thread.is_alive(): + raise TimeoutError( + f"Test {func.__name__} exceeded timeout of {seconds} seconds" + ) + + if exception[0]: + raise exception[0] + + return result[0] + + return wrapper + + return decorator diff --git a/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java b/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java index 650d6c1053f..424d513a99f 100644 --- a/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java +++ b/src/test/java/org/apache/sysds/test/component/utils/UnixPipeUtilsTest.java @@ -20,6 +20,8 @@ package org.apache.sysds.test.component.utils; import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.UnixPipeUtils; import org.junit.Rule; @@ -44,6 +46,8 @@ import static org.junit.Assert.assertArrayEquals; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; @RunWith(Enclosed.class) public class UnixPipeUtilsTest { @@ -59,6 +63,7 @@ public static Collection data() { {Types.ValueType.FP64, 6, 48, 99, new MatrixBlock(2, 3, new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0})}, {Types.ValueType.FP32, 6, 24, 88, new MatrixBlock(3, 2, new double[]{1.0, 2.0, 3.0, 4.0, 5.0, 6.0})}, {Types.ValueType.INT32, 4, 16, 77, new MatrixBlock(2, 2, new double[]{0, -1, 2, -3})}, + {Types.ValueType.INT64, 4, 64, 55, new MatrixBlock(2, 2, new double[]{0, 1, 2, 3})}, {Types.ValueType.UINT8, 4, 4, 66, new MatrixBlock(2, 2, new double[]{0, 1, 2, 3})} }); } @@ -81,6 +86,7 @@ public ParameterizedTest(Types.ValueType type, int numElem, int batchSize, int i @Test public void testReadWriteNumpyArrayBatch() throws IOException { File tempFile = folder.newFile("pipe_test_" + type.name()); + matrixBlock.recomputeNonZeros(); try (BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { UnixPipeUtils.writeNumpyArrayInBatches(out, id, batchSize, numElem, type, matrixBlock); @@ -88,35 +94,105 @@ public void testReadWriteNumpyArrayBatch() throws IOException { double[] output = new double[numElem]; try (BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { - UnixPipeUtils.readNumpyArrayInBatches(in, id, batchSize, numElem, type, output, 0); + long nonZeros = UnixPipeUtils.readNumpyArrayInBatches(in, id, batchSize, numElem, type, output, 0); + // Verify nonzero count matches MatrixBlock + 
org.junit.Assert.assertEquals(matrixBlock.getNonZeros(), nonZeros); } assertArrayEquals(matrixBlock.getDenseBlockValues(), output, 1e-9); } } + @RunWith(Parameterized.class) + public static class FrameColumnParameterizedTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{index}: frameType={0}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {Types.ValueType.FP64, new Object[]{1.0, -2.5, 3.25, 4.75}, 64, 201}, + {Types.ValueType.FP32, new Object[]{1.0f, -2.25f, 3.5f, -4.125f}, 48, 202}, + {Types.ValueType.INT32, new Object[]{0, -1, 5, 42}, 32, 203}, + {Types.ValueType.BOOLEAN, new Object[]{true, false, true, false}, 8, 205}, + {Types.ValueType.STRING, new Object[]{"alpha", "beta", "gamma",null, "delta"}, 64, 205}, + {Types.ValueType.STRING, new Object[]{"alphaalphaalphaalphaalpha", "beta", "gamma",null, "delta"}, 16, 205}, + {Types.ValueType.FP64, new Object[]{1.0, -2.5, 3.25, 4.75}, 8, 201}, + }); + } + + private final Types.ValueType type; + private final Object[] values; + private final int batchSize; + private final int id; + + public FrameColumnParameterizedTest(Types.ValueType type, Object[] values, int batchSize, int id) { + this.type = type; + this.values = values; + this.batchSize = batchSize; + this.id = id; + } + + @Test + public void testReadWriteFrameColumn() throws IOException { + File tempFile = folder.newFile("frame_pipe_" + type.name()); + Array column = createColumn(type, values); + + long bytesWritten; + try(BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + bytesWritten = UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column, type); + } + + int totalBytes = Math.toIntExact(bytesWritten); + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + Array read = UnixPipeUtils.readFrameColumnFromPipe(in, id, values.length, totalBytes, batchSize, type); + assertFrameColumnEquals(column, read, type); + } + } + + private static Array createColumn(Types.ValueType type, Object[] values) { + Array array = ArrayFactory.allocate(type, values.length); + for(int i = 0; i < values.length; i++) { + switch(type) { + case STRING -> array.set(i, (String) values[i]); + case BOOLEAN -> array.set(i, ((Boolean) values[i]) ? 
1.0 : 0.0); + default -> array.set(i, ((Number) values[i]).doubleValue()); + } + } + return array; + } + + private static void assertFrameColumnEquals(Array expected, Array actual, Types.ValueType type) { + org.junit.Assert.assertEquals(expected.size(), actual.size()); + for(int i = 0; i < expected.size(); i++) { + switch(type) { + case FP64 -> org.junit.Assert.assertEquals( + ((Number) expected.get(i)).doubleValue(), + ((Number) actual.get(i)).doubleValue(), 1e-9); + case FP32 -> org.junit.Assert.assertEquals( + ((Number) expected.get(i)).floatValue(), + ((Number) actual.get(i)).floatValue(), 1e-6f); + case STRING -> org.junit.Assert.assertEquals(expected.get(i), actual.get(i)); + default -> org.junit.Assert.assertEquals(expected.get(i), actual.get(i)); + } + } + } + } + public static class NonParameterizedTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); @Test(expected = FileNotFoundException.class) public void testOpenInputFileNotFound() throws IOException { - // instantiate class once for coverage new UnixPipeUtils(); - - // Create a path that does not exist File nonExistentFile = new File(folder.getRoot(), "nonexistent.pipe"); - - // This should throw FileNotFoundException UnixPipeUtils.openInput(nonExistentFile.getAbsolutePath(), 123); } @Test(expected = FileNotFoundException.class) public void testOpenOutputFileNotFound() throws IOException { - // Create a path that does not exist File nonExistentFile = new File(folder.getRoot(), "nonexistent.pipe"); - - // This should throw FileNotFoundException UnixPipeUtils.openOutput(nonExistentFile.getAbsolutePath(), 123); } @@ -125,14 +201,9 @@ public void testOpenOutputFileNotFound() throws IOException { public void testOpenInputAndOutputHandshakeMatch() throws IOException { File tempFile = folder.newFile("pipe_test1"); int id = 42; - - // Write expected handshake + try (BufferedOutputStream bos = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) {} - - // Read and validate handshake - try (BufferedInputStream bis = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { - // success: no exception = handshake passed - } + try (BufferedInputStream bis = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) {} } @Test(expected = IllegalStateException.class) @@ -142,8 +213,6 @@ public void testOpenInputHandshakeMismatch() throws IOException { int wrongReadId = 456; try (BufferedOutputStream bos = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), writeId)) {} - - // Will throw due to ID mismatch UnixPipeUtils.openInput(tempFile.getAbsolutePath(), wrongReadId); } @@ -159,6 +228,18 @@ public void testOpenInputIncompleteHandshake() throws IOException { UnixPipeUtils.openInput(tempFile.getAbsolutePath(), 100); } + @Test(expected = IOException.class) + public void testReadColumnFromPipeError() throws IOException { + File tempFile = folder.newFile("pipe_test3"); + int id = 42; + + BufferedOutputStream bos = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id); + BufferedInputStream bis = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id); + Array column = ArrayFactory.allocate(Types.ValueType.INT64, 4); + UnixPipeUtils.writeFrameColumnToPipe(bos, id, 16, column, Types.ValueType.INT64); + UnixPipeUtils.readFrameColumnFromPipe(bis, 42, 4, 12, 32 * 1024, Types.ValueType.INT32); + } + @Test(expected = EOFException.class) public void testReadNumpyArrayUnexpectedEOF() throws IOException { File tempFile = folder.newFile("pipe_test5"); @@ -187,5 +268,201 @@ public void testReadNumpyArrayUnexpectedEOF() throws 
IOException { UnixPipeUtils.readNumpyArrayInBatches(in, id, batchSize, numElem, type, outArr, 0); } } + + @Test(expected = UnsupportedOperationException.class) + public void testGetElementSizeUnsupportedType() { + UnixPipeUtils.getElementSize(Types.ValueType.STRING); + } + + @Test(expected = UnsupportedOperationException.class) + public void testReadNumpyArrayUnsupportedType() throws IOException { + File file = folder.newFile("unsupported_type.pipe"); + int id = 7; + try (BufferedOutputStream out = UnixPipeUtils.openOutput(file.getAbsolutePath(), id)) { + UnixPipeUtils.writeHandshake(id, out); // start handshake + out.flush(); + UnixPipeUtils.writeHandshake(id, out); // end handshake, no payload + } + double[] outArr = new double[0]; + try (BufferedInputStream in = UnixPipeUtils.openInput(file.getAbsolutePath(), id)) { + UnixPipeUtils.readNumpyArrayInBatches(in, id, 32, 0, Types.ValueType.STRING, outArr, 0); + } + } + + @Test(expected = NullPointerException.class) + public void testWriteNumpyArrayInBatchesError() throws IOException { + UnixPipeUtils.writeNumpyArrayInBatches(null, 0, 0, 0, Types.ValueType.INT32, null); + } + + @Test + public void testGetBufferReaderUnsupportedType() throws Exception { + Method m = UnixPipeUtils.class.getDeclaredMethod("getBufferReader", Types.ValueType.class); + m.setAccessible(true); + + try { + m.invoke(null, Types.ValueType.STRING); + org.junit.Assert.fail("Expected UnsupportedOperationException"); + } catch (InvocationTargetException e) { + org.junit.Assert.assertTrue(e.getCause() instanceof UnsupportedOperationException); + } + } + + @Test + public void testGetBufferWriterUnsupportedType() throws Exception { + Method m = UnixPipeUtils.class.getDeclaredMethod("getBufferWriter", Types.ValueType.class); + m.setAccessible(true); + + try { + m.invoke(null, Types.ValueType.STRING); + org.junit.Assert.fail("Expected UnsupportedOperationException"); + } catch (InvocationTargetException e) { + org.junit.Assert.assertTrue(e.getCause() instanceof UnsupportedOperationException); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testReadWriteFrameColumnUINT8() throws IOException { + File tempFile = folder.newFile("frame_pipe_UINT8"); + int id = 204; + + BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id); + UnixPipeUtils.writeHandshake(id, out); + out.write(new byte[]{(byte) 0x00, (byte) 0x01, (byte) 0x02, (byte) 0x03}); + UnixPipeUtils.writeHandshake(id, out); + out.flush(); + + Array read = null; + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + read = UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 4, 32 * 1024, Types.ValueType.UINT8); + for(int i = 0; i < 4; i++) { + org.junit.Assert.assertEquals(i, read.get(i)); + } + } + try(BufferedOutputStream out2 = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + UnixPipeUtils.writeFrameColumnToPipe(out2, id, 16, read, Types.ValueType.UINT8); + } + } + + @Test + public void testReadWriteFrameColumnINT64() throws IOException { + File tempFile = folder.newFile("frame_pipe_INT32"); + int id = 204; + + BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id); + UnixPipeUtils.writeHandshake(id, out); + // write 4 int64 values + ByteBuffer bb = ByteBuffer.allocate(8 * 4).order(ByteOrder.LITTLE_ENDIAN); + for(int i = 0; i < 4; i++) { + bb.putLong(i); + } + out.write(bb.array()); + UnixPipeUtils.writeHandshake(id, out); + out.flush(); + + Array read = null; + try(BufferedInputStream in = 
UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + read = UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 32, 32 * 1024, Types.ValueType.INT64); + for(int i = 0; i < 4; i++) { + org.junit.Assert.assertEquals(i, ((Number) read.get(i)).longValue()); + } + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testReadWriteFrameColumnUnsupportedType() throws IOException { + File tempFile = folder.newFile("frame_pipe_HASH64"); + int id = 204; + + BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id); + UnixPipeUtils.writeHandshake(id, out); + out.flush(); + + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 32, 4, Types.ValueType.HASH64); + } + } + + @Test + public void testReadWriteFrameColumnLongString1() throws IOException { + File tempFile = folder.newFile("frame_pipe_long_string"); + Array column = FrameColumnParameterizedTest.createColumn(Types.ValueType.STRING, + new Object[]{"alphaalphaalphaalphaalphaa", "beta", "gamma",null, "delta"}); + int id = 205; + int batchSize = 16; + + long bytesWritten; + try(BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + bytesWritten = UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column, Types.ValueType.STRING); + } + + int totalBytes = Math.toIntExact(bytesWritten); + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + Array read = UnixPipeUtils.readFrameColumnFromPipe(in, id, column.size(), totalBytes, batchSize, Types.ValueType.STRING); + FrameColumnParameterizedTest.assertFrameColumnEquals(column, read, Types.ValueType.STRING); + } + } + + @Test + public void testReadWriteFrameColumnLongString2() throws IOException { + File tempFile = folder.newFile("frame_pipe_long_string"); + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < 35*1024; i++) { + sb.append("a"); + } + Array column = FrameColumnParameterizedTest.createColumn(Types.ValueType.STRING, + new Object[]{sb.toString()}); + int id = 205; + int batchSize = 16*1024; + + long bytesWritten; + try(BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + bytesWritten = UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column, Types.ValueType.STRING); + } + + int totalBytes = Math.toIntExact(bytesWritten); + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + Array read = UnixPipeUtils.readFrameColumnFromPipe(in, id, column.size(), totalBytes, batchSize, Types.ValueType.STRING); + FrameColumnParameterizedTest.assertFrameColumnEquals(column, read, Types.ValueType.STRING); + } + } + + @Test + public void testReadWriteFrameColumnString() throws IOException { + File tempFile = folder.newFile("frame_pipe_long_string"); + Array column = FrameColumnParameterizedTest.createColumn(Types.ValueType.STRING, + new Object[]{"alphabet"}); + int id = 205; + int batchSize = 12; + + long bytesWritten; + try(BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + bytesWritten = UnixPipeUtils.writeFrameColumnToPipe(out, id, batchSize, column, Types.ValueType.STRING); + } + + int totalBytes = Math.toIntExact(bytesWritten); + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + Array read = UnixPipeUtils.readFrameColumnFromPipe(in, id, column.size(), totalBytes, batchSize, Types.ValueType.STRING); + 
FrameColumnParameterizedTest.assertFrameColumnEquals(column, read, Types.ValueType.STRING); + } + } + + @Test + public void testWriteFrameColumnINT32() throws IOException { + File tempFile = folder.newFile("frame_pip2_INT32"); + int id = 204; + Array column = FrameColumnParameterizedTest.createColumn(Types.ValueType.INT32, + new Object[]{0, 1, 2, 3}); + + try(BufferedOutputStream out = UnixPipeUtils.openOutput(tempFile.getAbsolutePath(), id)) { + UnixPipeUtils.writeFrameColumnToPipe(out, id, 4, column, Types.ValueType.INT32); + } + + Array read = null; + try(BufferedInputStream in = UnixPipeUtils.openInput(tempFile.getAbsolutePath(), id)) { + read = UnixPipeUtils.readFrameColumnFromPipe(in, id, 4, 16, 4, Types.ValueType.INT32); + FrameColumnParameterizedTest.assertFrameColumnEquals(column, read, Types.ValueType.INT32); + } + + } } } diff --git a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java index f08dd63c65a..4365dd629fa 100644 --- a/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java +++ b/src/test/java/org/apache/sysds/test/usertest/pythonapi/StartupTest.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
*/ @@ -24,7 +24,10 @@ import org.apache.log4j.spi.LoggingEvent; import org.apache.sysds.api.PythonDMLScript; import org.apache.sysds.common.Types; +import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.UnixPipeUtils; import org.apache.sysds.test.LoggingUtils; @@ -39,7 +42,6 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; -import java.security.Permission; import java.util.List; import static org.junit.Assert.assertArrayEquals; @@ -48,29 +50,19 @@ /** Simple tests to verify startup of Python Gateway server happens without crashes */ public class StartupTest { private LoggingUtils.TestAppender appender; - @SuppressWarnings("removal") - private SecurityManager sm; @Before - @SuppressWarnings("removal") public void setUp() { appender = LoggingUtils.overwrite(); - sm = System.getSecurityManager(); - System.setSecurityManager(new NoExitSecurityManager()); + PythonDMLScript.setExitHandler(new ExitCalled()); PythonDMLScript.setDMLGateWayListenerLoggerLevel(Level.ALL); Logger.getLogger(PythonDMLScript.class.getName()).setLevel(Level.ALL); } @After - @SuppressWarnings("removal") public void tearDown() { LoggingUtils.reinsert(appender); - System.setSecurityManager(sm); - } - - @SuppressWarnings("unused") - private void assertLogMessages(String... expectedMessages) { - assertLogMessages(true, expectedMessages); + PythonDMLScript.resetExitHandler(); } private void assertLogMessages(boolean strict, String... expectedMessages) { @@ -92,9 +84,9 @@ private void assertLogMessages(boolean strict, String... 
expectedMessages) { // order does not matter boolean found = false; - for (LoggingEvent loggingEvent : log) { - found |= loggingEvent.getMessage().toString().startsWith(message); - } + for (LoggingEvent loggingEvent : log) { + found |= loggingEvent.getMessage().toString().startsWith(message); + } Assert.assertTrue("Expected log message not found: " + message,found); } } @@ -137,7 +129,7 @@ public void testStartupIncorrect_6() throws Exception { Thread.sleep(200); PythonDMLScript.main(new String[]{"-python", "4001"}); Thread.sleep(200); - } catch (SecurityException e) { + } catch (ExitCalled e) { assertLogMessages(false, "GatewayServer started", "failed startup" @@ -185,8 +177,9 @@ public void testDataTransfer() throws Exception { MatrixBlock mb = new MatrixBlock(2, 3, data); script.startWritingMbToPipe(0, mb); double[] rcv_data = new double[data.length]; - UnixPipeUtils.readNumpyArrayInBatches(java2py, 0, 32, data.length, Types.ValueType.FP64, rcv_data, 0); + long nonZeros = UnixPipeUtils.readNumpyArrayInBatches(java2py, 0, 32, data.length, Types.ValueType.FP64, rcv_data, 0); assertArrayEquals(data, rcv_data, 1e-9); + Assert.assertEquals((long) data.length, nonZeros); // All values are non-zero // Read Test UnixPipeUtils.writeNumpyArrayInBatches(py2java, 0, 32, data.length, Types.ValueType.FP64, mb); @@ -230,6 +223,46 @@ public void testDataTransferMultiPipes() throws Exception { PythonDMLScript.GwS.shutdown(); Thread.sleep(200); } + + + @Test + public void testDataFrameTransfer() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4003"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + + File in = folder.newFile("py2java-0"); + File out = folder.newFile("java2py-0"); + + // Init Test + BufferedOutputStream py2java = UnixPipeUtils.openOutput(in.getAbsolutePath(), 0); + script.openPipes(folder.getRoot().getPath(), 1); + BufferedInputStream java2py = UnixPipeUtils.openInput(out.getAbsolutePath(), 0); + + // Write Test + String[][] data = new String[][]{{"1", "2", "3"}, {"4", "5", "6"}}; + ValueType[] schema = new ValueType[]{Types.ValueType.STRING, Types.ValueType.STRING, Types.ValueType.STRING}; + FrameBlock fb = new FrameBlock(schema, data); + + FrameBlock rcv_fb = new FrameBlock(schema, 2); + + for (int i = 0; i < 3; i++) { + script.startWritingColToPipe(0, fb, i); + Array rcv_arr = UnixPipeUtils.readFrameColumnFromPipe(java2py, 0, 2, -1, 32 * 1024, Types.ValueType.STRING); + rcv_fb.setColumn(i, rcv_arr); + } + + for (int i = 0; i < 3; i++) { + UnixPipeUtils.writeFrameColumnToPipe(py2java, 0, 32, fb.getColumn(i), Types.ValueType.STRING); + script.startReadingColFromPipe(0, rcv_fb, 2, -1, i, Types.ValueType.STRING, false); + } + + script.closePipes(); + + PythonDMLScript.GwS.shutdown(); + Thread.sleep(200); + } + @Test(expected = DMLRuntimeException.class) public void testDataTransferNotInit1() throws Exception { @@ -255,14 +288,25 @@ public void testDataTransferNotInit3() throws Exception { script.startReadingMbFromPipes(new int[]{3,3}, 2, 3, Types.ValueType.FP64); } - @SuppressWarnings("removal") - class NoExitSecurityManager extends SecurityManager { - @Override - public void checkPermission(Permission perm) { } + @Test(expected = Exception.class) + public void testDataTransferNotInit4() throws Exception { + PythonDMLScript.main(new String[]{"-python", "4007"}); + Thread.sleep(200); + PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint(); + 
script.startReadingColFromPipe(0, null, 2, -1, 0, Types.ValueType.STRING, false);
+    }
+    @Test(expected = Exception.class)
+    public void testDataTransferNotInit5() throws Exception {
+        PythonDMLScript.main(new String[]{"-python", "4008"});
+        Thread.sleep(200);
+        PythonDMLScript script = (PythonDMLScript) PythonDMLScript.GwS.getGateway().getEntryPoint();
+        script.startWritingColToPipe(0, null, 0);
+    }
+    private static class ExitCalled extends RuntimeException implements PythonDMLScript.ExitHandler {
         @Override
-        public void checkExit(int status) {
-            throw new SecurityException("Intercepted exit()");
+        public void exit(int status) {
+            throw this;
         }
     }
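
Note on the string wire format: the Python writer above and the Java `UnixPipeUtils.writeFrameColumnToPipe`/`readFrameColumnFromPipe` pair exchange string columns as length-prefixed entries, i.e. a little-endian signed int32 byte length followed by the UTF-8 payload, with -1 (and no payload) marking a null. The following standalone sketch only restates that framing outside the pipe machinery; `encode_column`/`decode_column` are illustrative names, not part of this patch.

    import struct
    from io import BytesIO
    from typing import List, Optional

    NULL_MARKER = -1  # length prefix used for None entries; a null carries no payload

    def encode_column(values: List[Optional[str]]) -> bytes:
        """Serialize strings as [int32 LE length][utf-8 bytes], -1 for null."""
        out = BytesIO()
        for v in values:
            if v is None:
                out.write(struct.pack("<i", NULL_MARKER))
            else:
                b = v.encode("utf-8")
                out.write(struct.pack("<i", len(b)))
                out.write(b)
        return out.getvalue()

    def decode_column(buf: bytes, num_rows: int) -> List[Optional[str]]:
        """Inverse of encode_column; reads exactly num_rows entries."""
        src = BytesIO(buf)
        result = []
        for _ in range(num_rows):
            (length,) = struct.unpack("<i", src.read(4))
            result.append(None if length == NULL_MARKER
                          else src.read(length).decode("utf-8"))
        return result

    assert decode_column(encode_column(["hello", None, "wörld"]), 3) == ["hello", None, "wörld"]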
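Note on the handshake: `openInput`/`openOutput` and `writeHandshake` (exercised by `testOpenInputHandshakeMismatch` and `testOpenInputIncompleteHandshake`) follow a common pattern where the writer emits a token derived from the pipe id before and after a payload, and the reader validates it before trusting the stream. The exact byte layout is internal to `UnixPipeUtils`; the sketch below only illustrates the pattern, assuming a hypothetical 4-byte little-endian id token.

    import struct
    from io import BytesIO

    def write_handshake(stream, pipe_id: int) -> None:
        # Hypothetical token layout: a single little-endian int32 pipe id.
        stream.write(struct.pack("<i", pipe_id))

    def read_handshake(stream, expected_id: int) -> None:
        data = stream.read(4)
        if len(data) != 4:
            raise EOFError("incomplete handshake")  # analogous to the EOFException test
        (got,) = struct.unpack("<i", data)
        if got != expected_id:
            # analogous to the IllegalStateException on id mismatch
            raise RuntimeError(f"handshake mismatch: expected {expected_id}, got {got}")

    buf = BytesIO()
    write_handshake(buf, 42)
    buf.seek(0)
    read_handshake(buf, 42)  # succeeds; validating with a different id would raise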
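Note on nonzero counting: `readNumpyArrayInBatches` now returns the number of nonzeros observed while copying, so the Java side can call `setNonZeros(nnz)` instead of a full `recomputeNonZeros()` pass over the dense block. Counting during the copy is essentially free. A numpy sketch of the idea, with hypothetical names:

    import numpy as np

    def copy_batches_counting_nnz(batches, dest: np.ndarray, offset: int = 0) -> int:
        """Copy incoming batches into dest at offset, counting nonzeros on the fly."""
        nnz = 0
        for batch in batches:
            n = len(batch)
            dest[offset:offset + n] = batch
            nnz += int(np.count_nonzero(batch))
            offset += n
        return nnz

    dest = np.zeros(6)
    nnz = copy_batches_counting_nnz(
        [np.array([1.0, 0.0, 3.0]), np.array([0.0, 5.0, 6.0])], dest)
    assert nnz == 4 and dest[4] == 5.0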