Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ set(PARQUET_SRCS
encryption/internal_file_encryptor.cc
exception.cc
file_reader.cc
file_rewriter.cc
file_writer.cc
geospatial/statistics.cc
geospatial/util_internal.cc
Expand Down Expand Up @@ -406,6 +407,8 @@ add_parquet_test(arrow-reader-writer-test

add_parquet_test(arrow-index-test SOURCES arrow/index_test.cc)

add_parquet_test(arrow-rewriter-test SOURCES arrow/arrow_rewriter_test.cc)

add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
arrow/reconstruct_internal_test.cc)

Expand Down
390 changes: 390 additions & 0 deletions cpp/src/parquet/arrow/arrow_rewriter_test.cc

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions cpp/src/parquet/arrow/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,24 @@
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_decimal.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"
#include "arrow/util/float16.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/column_reader.h"
#include "parquet/file_writer.h"
#include "parquet/test_util.h"

namespace parquet {

using internal::RecordReader;
using schema::GroupNode;

namespace arrow {

Expand Down Expand Up @@ -482,6 +488,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
EXPECT_TRUE(result->Equals(*expected_array));
}

::arrow::Result<std::shared_ptr<Buffer>> WriteFile(
const std::shared_ptr<WriterProperties>& writer_properties,
const std::shared_ptr<::arrow::Table>& table) {
// Get schema from table.
auto schema = table->schema();
std::shared_ptr<SchemaDescriptor> parquet_schema;
auto arrow_writer_properties = default_arrow_writer_properties();
RETURN_NOT_OK(ToParquetSchema(schema.get(), *writer_properties,
*arrow_writer_properties, &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

// Write table to buffer.
auto sink = CreateOutputStream();
auto pool = ::arrow::default_memory_pool();
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
std::unique_ptr<FileWriter> arrow_writer;
RETURN_NOT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
&arrow_writer));
RETURN_NOT_OK(arrow_writer->WriteTable(*table));
RETURN_NOT_OK(arrow_writer->Close());
return sink->Finish();
}

} // namespace arrow

} // namespace parquet
4 changes: 2 additions & 2 deletions cpp/src/parquet/bloom_filter_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
}

auto col_chunk = row_group_metadata_->ColumnChunk(i);
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata();
if (crypto_metadata != nullptr) {

if (col_chunk->is_encrypted()) {
ParquetException::NYI("BloomFilter decryption is not yet supported");
}

Expand Down
23 changes: 23 additions & 0 deletions cpp/src/parquet/bloom_filter_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {

BloomFilter* CreateBloomFilter(int32_t column_ordinal) override;

void InsertBloomFilter(int32_t column_ordinal,
std::unique_ptr<BloomFilter> bloom_filter) override;

IndexLocations WriteTo(::arrow::io::OutputStream* sink) override;

private:
Expand Down Expand Up @@ -219,6 +222,26 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
return curr_rg_bfs.emplace(column_ordinal, std::move(bf)).first->second.get();
}

void BloomFilterBuilderImpl::InsertBloomFilter(
int32_t column_ordinal, std::unique_ptr<BloomFilter> bloom_filter) {
auto opts = properties_->bloom_filter_options(schema_->Column(column_ordinal)->path());
if (!opts.has_value() || bloom_filter == nullptr) {
return;
}

CheckState(column_ordinal);

auto& curr_rg_bfs = *bloom_filters_.rbegin();
if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) {
std::stringstream ss;
ss << "Bloom filter already exists for column: " << column_ordinal
<< ", row group: " << (bloom_filters_.size() - 1);
throw ParquetException(ss.str());
}

curr_rg_bfs.emplace(column_ordinal, std::move(bloom_filter));
}

IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) {
if (finished_) {
throw ParquetException("Cannot write a finished BloomFilterBuilder");
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/bloom_filter_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ class PARQUET_EXPORT BloomFilterBuilder {
/// - `WriteTo()` has been called
virtual BloomFilter* CreateBloomFilter(int32_t column_ordinal) = 0;

/// \brief Insert a BloomFilter of the column ordinal of the current row group.
///
/// \param column_ordinal Column ordinal for the bloom filter.
/// \param bloom_filter The bloom filter to insert.
/// \throws ParquetException if any condition is violated:
/// - `AppendRowGroup()` has not been called yet
/// - The column ordinal is out of bound
/// - Bloom filter already exists for the column
/// - `WriteTo()` has been called
virtual void InsertBloomFilter(int32_t column_ordinal,
std::unique_ptr<BloomFilter> bloom_filter) = 0;

/// \brief Write all bloom filters to sink.
///
/// The bloom filters cannot be modified after this method is called.
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,18 @@ class SerializedRowGroup : public RowGroupReader::Contents {
stream = properties_.GetStream(source_, col_range.offset, col_range.length);
}

std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col->crypto_metadata();

// Prior to Arrow 3.0.0, is_compressed was always set to false in column headers,
// even if compression was used. See ARROW-17100.
bool always_compressed = file_metadata_->writer_version().VersionLt(
ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION());

// Column is encrypted only if crypto_metadata exists.
if (!crypto_metadata) {
if (!col->is_encrypted()) {
return PageReader::Open(stream, col->num_values(), col->compression(), properties_,
always_compressed);
}

// The column is encrypted
auto crypto_metadata = col->crypto_metadata();
auto* file_decryptor = file_metadata_->file_decryptor().get();
auto meta_decryptor_factory = InternalFileDecryptor::GetColumnMetaDecryptorFactory(
file_decryptor, crypto_metadata.get());
Expand Down
Loading
Loading