Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ static VortexWriter create(String uri, DType dtype, Map<String, String> options)
*/
void writeBatch(byte[] arrowData) throws IOException;

/**
* Writes a batch of Arrow data directly from Arrow C Data Interface pointers.
* <p>
* This avoids the IPC serialization overhead by accepting raw memory addresses
* of ArrowArray and ArrowSchema structs.
*
* @param arrowArrayAddr memory address of the ArrowArray struct
* @param arrowSchemaAddr memory address of the ArrowSchema struct
* @throws IOException if writing fails
*/
void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException;

/**
* Closes the writer and finalizes the Vortex file.
* <p>
Expand Down
18 changes: 18 additions & 0 deletions java/vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ public void writeBatch(byte[] arrowData) throws IOException {
}
}

/**
* Writes a batch of Arrow data directly from Arrow C Data Interface pointers.
*
* @param arrowArrayAddr memory address of the ArrowArray struct
* @param arrowSchemaAddr memory address of the ArrowSchema struct
* @throws IOException if writing fails
*/
@Override
public void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException {
logger.trace("Writing batch via FFI (arrayAddr={}, schemaAddr={})", arrowArrayAddr, arrowSchemaAddr);

boolean success = NativeWriterMethods.writeBatchFfi(ptr.getAsLong(), arrowArrayAddr, arrowSchemaAddr);
if (!success) {
logger.error("Failed to write FFI batch to Vortex file");
throw new IOException("Failed to write FFI batch to Vortex file");
}
}

/**
* Closes the writer and finalizes the Vortex file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ private NativeWriterMethods() {}
*/
public static native boolean writeBatch(long writerPtr, byte[] arrowData);

/**
* Writes a batch of Arrow data to the Vortex file directly from Arrow C Data Interface pointers.
*
* @param writerPtr the native writer pointer
* @param arrowArrayAddr memory address of the ArrowArray struct
* @param arrowSchemaAddr memory address of the ArrowSchema struct
* @return true if successful, false otherwise
*/
public static native boolean writeBatchFfi(long writerPtr, long arrowArrayAddr, long arrowSchemaAddr);

/**
* Close and flush the writer, finalizing it to the storage system.
*
Expand Down
80 changes: 80 additions & 0 deletions java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,32 @@

package dev.vortex.jni;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import dev.vortex.api.DType;
import dev.vortex.api.ScanOptions;
import dev.vortex.api.VortexWriter;
import dev.vortex.arrow.ArrowAllocation;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -58,4 +74,68 @@ public void testCreateWriter() throws IOException {
assertTrue(Files.exists(outputPath), "Output file should exist");
System.err.println("File created at: " + outputPath);
}

@Test
public void testWriteBatchFfi() throws IOException {
Path outputPath = tempDir.resolve("test_ffi.vortex");
String writePath = outputPath.toAbsolutePath().toUri().toString();

var writeSchema = DType.newStruct(
new String[] {"name", "age"},
new DType[] {DType.newUtf8(false), DType.newInt(false)},
false);

BufferAllocator allocator = ArrowAllocation.rootAllocator();

Schema arrowSchema = new Schema(java.util.List.of(
new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null),
new Field("age", FieldType.notNullable(new ArrowType.Int(32, true)), null)));

try (VortexWriter writer = VortexWriter.create(writePath, writeSchema, new HashMap<>())) {
// Build a batch with Arrow Java
try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
VarCharVector nameVec = (VarCharVector) root.getVector("name");
IntVector ageVec = (IntVector) root.getVector("age");

nameVec.allocateNew(3);
ageVec.allocateNew(3);

nameVec.setSafe(0, "Alice".getBytes(UTF_8));
nameVec.setSafe(1, "Bob".getBytes(UTF_8));
nameVec.setSafe(2, "Carol".getBytes(UTF_8));
ageVec.setSafe(0, 30);
ageVec.setSafe(1, 25);
ageVec.setSafe(2, 40);

root.setRowCount(3);

// Export to C Data Interface
try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchemaFfi = ArrowSchema.allocateNew(allocator)) {
Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchemaFfi);

writer.writeBatchFfi(arrowArray.memoryAddress(), arrowSchemaFfi.memoryAddress());
}
}
}

assertTrue(Files.exists(outputPath), "Output file should exist");

// Read back and verify
try (var file = dev.vortex.api.Files.open(outputPath.toAbsolutePath().toString());
var scan = file.newScan(ScanOptions.of())) {
assertEquals(3, file.rowCount());

var batch = scan.next();
var nameField = batch.getField(0);
var ageField = batch.getField(1);

assertEquals("Alice", nameField.getUTF8(0));
assertEquals("Bob", nameField.getUTF8(1));
assertEquals("Carol", nameField.getUTF8(2));
assertEquals(30, ageField.getInt(0));
assertEquals(25, ageField.getInt(1));
assertEquals(40, ageField.getInt(2));
}
}
}
37 changes: 37 additions & 0 deletions vortex-jni/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
use std::io::Cursor;

use arrow_array::RecordBatch;
use arrow_array::StructArray;
use arrow_array::ffi::FFI_ArrowArray;
use arrow_array::ffi::FFI_ArrowSchema;
use arrow_ipc::reader::StreamReader;
use futures::SinkExt;
use futures::channel::mpsc;
Expand Down Expand Up @@ -226,6 +229,40 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_writeBatch<'local
})
}

/// Writes a batch to the Vortex file directly from Arrow C Data Interface pointers.
///
/// This avoids the IPC serialization/deserialization overhead of `writeBatch` by accepting
/// raw Arrow FFI pointers directly.
#[unsafe(no_mangle)]
pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_writeBatchFfi<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
writer_ptr: jlong,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
) -> jboolean {
if writer_ptr <= 0 {
return JNI_FALSE;
}

try_or_throw(&mut env, |_env| {
let writer = unsafe { NativeWriter::from_ptr(writer_ptr) };

// Reconstruct FFI structs from the raw pointers provided by Java.
let ffi_array =
unsafe { FFI_ArrowArray::from_raw(arrow_array_addr as *mut FFI_ArrowArray) };
let ffi_schema = unsafe { &*(arrow_schema_addr as *const FFI_ArrowSchema) };

let array_data = unsafe { arrow_array::ffi::from_ffi(ffi_array, ffi_schema) }
.map_err(|e| JNIError::Vortex(vortex_err!("Failed to import Arrow FFI data: {}", e)))?;

let batch = RecordBatch::from(StructArray::from(array_data));
writer.write_record_batch(batch)?;

Ok(JNI_TRUE)
})
}

/// Closes the writer
#[unsafe(no_mangle)]
pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_close<'local>(
Expand Down
Loading