start adding parquet support

This commit is contained in:
Mario Fink 2020-07-30 18:34:53 +02:00
parent 194990c62f
commit 441745d894
4 changed files with 554 additions and 2 deletions

View File

@ -5,7 +5,7 @@ The _raw_eater_ package is used to parse files with extension `*.raw`, which
are usually binary files produced by the labsoftware _Famos_ to dump measurement are usually binary files produced by the labsoftware _Famos_ to dump measurement
time series. time series.
## File Structure ## .raw-file format structure
The binary `*.raw` file features a series of markers that indicate the starting The binary `*.raw` file features a series of markers that indicate the starting
point of various blocks of information. Every marker is introduced by a character point of various blocks of information. Every marker is introduced by a character
@ -101,13 +101,24 @@ section in the file). The markers have the following meaning:
provide the number of values and the actual data, provide the number of values and the actual data,
e.g. `|CS,1, 341299, 1, ...data... ;` e.g. `|CS,1, 341299, 1, ...data... ;`
## Open Issues and question? ### Open issues and questions
- which parameter indicate(s) little vs. big endian? - which parameter indicate(s) little vs. big endian?
## .parquet-file writer
The extracted and converted data originating from the *.raw file format may be efficiently grouped and
written as .parquet files.
[parquet file writer example](https://github.com/apache/parquet-cpp/blob/master/examples/low-level-api/reader-writer.cc)
## References ## References
- https://ch.mathworks.com/matlabcentral/fileexchange/30187-sequnce-to-read-famos-data-into-matlab-workspace - https://ch.mathworks.com/matlabcentral/fileexchange/30187-sequnce-to-read-famos-data-into-matlab-workspace
- https://community.ptc.com/t5/PTC-Mathcad/FAMOS-IMC-raw-data-in-MathCAD/td-p/130378 - https://community.ptc.com/t5/PTC-Mathcad/FAMOS-IMC-raw-data-in-MathCAD/td-p/130378
- http://marmatek.com/wp-content/uploads/2014/04/imc_STUDIO_Manual.pdf - http://marmatek.com/wp-content/uploads/2014/04/imc_STUDIO_Manual.pdf
### Parquet
- https://github.com/apache/parquet-cpp
- https://github.com/apache/parquet-cpp/tree/master/examples

61
lib/parquet/makefile Normal file
View File

@ -0,0 +1,61 @@
# Paths to local checkouts of the parquet-cpp and arrow source trees.
PARQUETDIR := /home/mario/Desktop/Record_Evolution/parquet-cpp
ARROWDIR := /home/mario/Desktop/Record_Evolution/arrow/cpp/src

CPP := g++ -std=c++14
OPT :=
#-Wall -Woverflow -Wpedantic -Wextra -Waddress -Waligned-new -Walloc-zero

# Every target here is a command, not a file: mark them phony so stale
# files with the same names can never mask a rule.
.PHONY: prepare collect_parquet modify_parquet collect_arrow modify_arrow \
        collect_test subst test clean build example

# Copy the vendored sources into the working tree and patch them.
prepare : collect_parquet modify_parquet collect_arrow modify_arrow

collect_parquet :
	cp -r $(PARQUETDIR)/src/parquet ./
	cp $(PARQUETDIR)/examples/low-level-api/reader_writer.h ./
	cp $(PARQUETDIR)/examples/low-level-api/reader-writer.cc ./

# Rename/patch identifiers that differ between the parquet-cpp snapshot
# and the arrow headers it is compiled against.
modify_parquet :
	cp parquet/parquet_version.h.in parquet/parquet_version.h
	sed -i 's/ReadableFileInterface/ReadWriteFileInterface/g' parquet/util/memory.h
	sed -i 's/ReadableFileInterface/ReadWriteFileInterface/g' parquet/file_reader.h
	sed -i 's/arrow::Codec/arrow::util::Codec/g' parquet/util/memory.h
	sed -i 's/valid_bits_writer/valid_bits_offset/g' parquet/column_reader.h

collect_arrow :
	cp -r $(ARROWDIR)/arrow ./

modify_arrow :
	cp arrow/util/bit_util.h arrow/util/bit-util.h

collect_test :
	cp $(PARQUETDIR)/examples/low-level-api/reader-writer.cc ./

# Comment out the arrow includes in parquet/properties.h.
subst :
	sed -i 's/#include \"arrow\//\/\/#include \"arrow/g' parquet/properties.h

test :
	$(CPP) $(OPT) -I$(PWD) reader-writer.cc

# -f/-rf: succeed even when a previous clean already removed the files.
clean :
	rm -rf parquet/ arrow/
	rm -f reader-writer.cc reader_writer.h

#------------------------------------------------------------------------------------#
# only use more recent and up to date repository arrow.git

# build arrow shared/static libraries.
# NOTE: each recipe line runs in its own shell, so `cd` must be chained
# with `&&` onto the commands that depend on it.
build :
	cd arrow/cpp && \
	cmake . -D ARROW_PARQUET=ON && \
	make
# cmake -LA shows all options; #ARROW_ARMV8_ARCH=armv8-a for armv8 targets

example :
	cd arrow/cpp/examples/parquet/low-level-api/ && \
	g++ reader-writer.cc -I. -I../../../src/ -L../../../../cpp/build/release/ -larrow -lparquet
	# set environment variable LD_LIBRARY_PATH=../../../../cpp/build/release/ before launching executable

View File

@ -0,0 +1,409 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cassert>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>

#include <reader_writer.h>
/*
* This example describes writing and reading Parquet Files in C++ and serves as a
* reference to the API.
* The file contains all the physical data types supported by Parquet.
* This example uses the RowGroupWriter API that supports writing RowGroups optimized for
*memory consumption
**/
/* Parquet is a structured columnar file format
* Parquet File = "Parquet data" + "Parquet Metadata"
* "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
* columnar layout
* "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
* Columns
* "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
* complex (nested) type (internal nodes)
* For specific details, please refer the format here:
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
**/
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
int main(int argc, char** argv) {
/**********************************************************************************
PARQUET WRITER EXAMPLE
**********************************************************************************/
// parquet::REQUIRED fields do not need definition and repetition level values
// parquet::OPTIONAL fields require only definition level values
// parquet::REPEATED fields require both definition and repetition level values
try {
// Create a local file output stream instance.
using FileClass = ::arrow::io::FileOutputStream;
std::shared_ptr<FileClass> out_file;
PARQUET_THROW_NOT_OK(FileClass::Open(PARQUET_FILENAME, &out_file));
// Setup the parquet schema
std::shared_ptr<GroupNode> schema = SetupSchema();
// Add writer properties
parquet::WriterProperties::Builder builder;
builder.compression(parquet::Compression::SNAPPY);
std::shared_ptr<parquet::WriterProperties> props = builder.build();
// Create a ParquetFileWriter instance
std::shared_ptr<parquet::ParquetFileWriter> file_writer =
parquet::ParquetFileWriter::Open(out_file, schema, props);
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
// Write the Bool column
parquet::BoolWriter* bool_writer =
static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
bool value = ((i % 2) == 0) ? true : false;
bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
parquet::Int32Writer* int32_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
int32_t value = i;
int32_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int64 column. Each row has repeats twice.
parquet::Int64Writer* int64_writer =
static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
int64_t value = i * 1000 * 1000;
value *= 1000 * 1000;
int16_t definition_level = 1;
int16_t repetition_level = 0;
if ((i % 2) == 0) {
repetition_level = 1; // start of a new record
}
int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
}
// Write the INT96 column.
parquet::Int96Writer* int96_writer =
static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::Int96 value;
value.value[0] = i;
value.value[1] = i + 1;
value.value[2] = i + 2;
int96_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Float column
parquet::FloatWriter* float_writer =
static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
float value = static_cast<float>(i) * 1.1f;
float_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Double column
parquet::DoubleWriter* double_writer =
static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
double value = i * 1.1111111;
double_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the ByteArray column. Make every alternate values NULL
parquet::ByteArrayWriter* ba_writer =
static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::ByteArray value;
char hello[FIXED_LENGTH] = "parquet";
hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
if (i % 2 == 0) {
int16_t definition_level = 1;
value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
value.len = FIXED_LENGTH;
ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
} else {
int16_t definition_level = 0;
ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
}
}
// Write the FixedLengthByteArray column
parquet::FixedLenByteArrayWriter* flba_writer =
static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::FixedLenByteArray value;
char v = static_cast<char>(i);
char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
flba_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Close the ParquetFileWriter
file_writer->Close();
// Write the bytes to file
DCHECK(out_file->Close().ok());
} catch (const std::exception& e) {
std::cerr << "Parquet write error: " << e.what() << std::endl;
return -1;
}
/**********************************************************************************
PARQUET READER EXAMPLE
**********************************************************************************/
try {
// Create a ParquetReader instance
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
// Get the File MetaData
std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
// Get the number of RowGroups
int num_row_groups = file_metadata->num_row_groups();
assert(num_row_groups == 1);
// Get the number of Columns
int num_columns = file_metadata->num_columns();
assert(num_columns == 8);
// Iterate over all the RowGroups in the file
for (int r = 0; r < num_row_groups; ++r) {
// Get the RowGroup Reader
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);
int64_t values_read = 0;
int64_t rows_read = 0;
int16_t definition_level;
int16_t repetition_level;
int i;
std::shared_ptr<parquet::ColumnReader> column_reader;
// Get the Column Reader for the boolean column
column_reader = row_group_reader->Column(0);
parquet::BoolReader* bool_reader =
static_cast<parquet::BoolReader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (bool_reader->HasNext()) {
bool value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
bool expected_value = ((i % 2) == 0) ? true : false;
assert(value == expected_value);
i++;
}
// Get the Column Reader for the Int32 column
column_reader = row_group_reader->Column(1);
parquet::Int32Reader* int32_reader =
static_cast<parquet::Int32Reader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (int32_reader->HasNext()) {
int32_t value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
assert(value == i);
i++;
}
// Get the Column Reader for the Int64 column
column_reader = row_group_reader->Column(2);
parquet::Int64Reader* int64_reader =
static_cast<parquet::Int64Reader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (int64_reader->HasNext()) {
int64_t value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
&value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
int64_t expected_value = i * 1000 * 1000;
expected_value *= 1000 * 1000;
assert(value == expected_value);
if ((i % 2) == 0) {
assert(repetition_level == 1);
} else {
assert(repetition_level == 0);
}
i++;
}
// Get the Column Reader for the Int96 column
column_reader = row_group_reader->Column(3);
parquet::Int96Reader* int96_reader =
static_cast<parquet::Int96Reader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (int96_reader->HasNext()) {
parquet::Int96 value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
parquet::Int96 expected_value;
expected_value.value[0] = i;
expected_value.value[1] = i + 1;
expected_value.value[2] = i + 2;
for (int j = 0; j < 3; j++) {
assert(value.value[j] == expected_value.value[j]);
}
i++;
}
// Get the Column Reader for the Float column
column_reader = row_group_reader->Column(4);
parquet::FloatReader* float_reader =
static_cast<parquet::FloatReader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (float_reader->HasNext()) {
float value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
float expected_value = static_cast<float>(i) * 1.1f;
assert(value == expected_value);
i++;
}
// Get the Column Reader for the Double column
column_reader = row_group_reader->Column(5);
parquet::DoubleReader* double_reader =
static_cast<parquet::DoubleReader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (double_reader->HasNext()) {
double value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
double expected_value = i * 1.1111111;
assert(value == expected_value);
i++;
}
// Get the Column Reader for the ByteArray column
column_reader = row_group_reader->Column(6);
parquet::ByteArrayReader* ba_reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (ba_reader->HasNext()) {
parquet::ByteArray value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read =
ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// Verify the value written
char expected_value[FIXED_LENGTH] = "parquet";
expected_value[7] = static_cast<char>('0' + i / 100);
expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
expected_value[9] = static_cast<char>('0' + i % 10);
if (i % 2 == 0) { // only alternate values exist
// There are no NULL values in the rows written
assert(values_read == 1);
assert(value.len == FIXED_LENGTH);
assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
assert(definition_level == 1);
} else {
// There are NULL values in the rows written
assert(values_read == 0);
assert(definition_level == 0);
}
i++;
}
// Get the Column Reader for the FixedLengthByteArray column
column_reader = row_group_reader->Column(7);
parquet::FixedLenByteArrayReader* flba_reader =
static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
// Read all the rows in the column
i = 0;
while (flba_reader->HasNext()) {
parquet::FixedLenByteArray value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
char v = static_cast<char>(i);
char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
i++;
}
}
} catch (const std::exception& e) {
std::cerr << "Parquet read error: " << e.what() << std::endl;
return -1;
}
std::cout << "Parquet Writing and Reading Complete" << std::endl;
return 0;
}

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <arrow/io/file.h>
#include <arrow/util/logging.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
using parquet::LogicalType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;
constexpr int FIXED_LENGTH = 10;
static std::shared_ptr<GroupNode> SetupSchema() {
parquet::schema::NodeVector fields;
// Create a primitive node named 'boolean_field' with type:BOOLEAN,
// repetition:REQUIRED
fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
Type::BOOLEAN, LogicalType::NONE));
// Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
// logical type:TIME_MILLIS
fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
LogicalType::TIME_MILLIS));
// Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
LogicalType::NONE));
fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
LogicalType::NONE));
fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
LogicalType::NONE));
fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
LogicalType::NONE));
// Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
LogicalType::NONE));
// Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
// repetition:REQUIRED, field_length = FIXED_LENGTH
fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
FIXED_LENGTH));
// Create a GroupNode named 'schema' using the primitive nodes defined above
// This GroupNode is the root node of the schema tree
return std::static_pointer_cast<GroupNode>(
GroupNode::Make("schema", Repetition::REQUIRED, fields));
}