Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 37 additions & 18 deletions parquet/encryption_read_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
// Read all rows in column
i = 0
for int96reader.HasNext() {
var (
val [1]parquet.Int96
)
var val [1]parquet.Int96

// read one value at a time. the number of rows read is returned. values
// read contains the number of non-null rows
Expand Down Expand Up @@ -553,15 +551,34 @@ func (d *TestDecryptionSuite) checkResults(fileName string, decryptionConfig, en
// once the file is read and the second exists in parquet-testing/data folder
func (d *TestDecryptionSuite) TestDecryption() {
tests := []struct {
file string
config uint
file string
config uint
isInDataStorage bool
}{
{"uniform_encryption.parquet.encrypted", 1},
{"encrypt_columns_and_footer.parquet.encrypted", 2},
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3},
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5},
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6},
{"uniform_encryption.parquet.encrypted", 1, true},
{"uniform_encryption.parquet.uncompressed.encrypted", 1, false},
{"uniform_encryption.parquet.v2.encrypted", 1, false},
{"uniform_encryption.parquet.v2.uncompressed.encrypted", 1, false},
{"encrypt_columns_and_footer.parquet.encrypted", 2, true},
{"encrypt_columns_and_footer.parquet.uncompressed.encrypted", 2, false},
{"encrypt_columns_and_footer.parquet.v2.encrypted", 2, false},
{"encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", 2, false},
{"encrypt_columns_plaintext_footer.parquet.encrypted", 3, true},
{"encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", 3, false},
{"encrypt_columns_plaintext_footer.parquet.v2.encrypted", 3, false},
{"encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", 3, false},
{"encrypt_columns_and_footer_aad.parquet.encrypted", 4, true},
{"encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", 4, false},
{"encrypt_columns_and_footer_aad.parquet.v2.encrypted", 4, false},
{"encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", 4, false},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5, true},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", 5, false},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", 5, false},
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", 5, false},
{"encrypt_columns_and_footer_ctr.parquet.encrypted", 6, true},
{"encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", 6, false},
{"encrypt_columns_and_footer_ctr.parquet.v2.encrypted", 6, false},
{"encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", 6, false},
}
for _, tt := range tests {
d.Run(tt.file, func() {
Expand All @@ -576,14 +593,16 @@ func (d *TestDecryptionSuite) TestDecryption() {
}
os.Remove(tmpFile)

file := path.Join(getDataDir(), tt.file)
d.Require().FileExists(file)
if tt.isInDataStorage {
file := path.Join(getDataDir(), tt.file)
d.Require().FileExists(file)

for idx := range d.decryptionConfigs {
decConfig := idx + 1
d.Run(fmt.Sprintf("config %d", decConfig), func() {
d.checkResults(file, uint(decConfig), tt.config)
})
for idx := range d.decryptionConfigs {
decConfig := idx + 1
d.Run(fmt.Sprintf("config %d", decConfig), func() {
d.checkResults(file, uint(decConfig), tt.config)
})
}
}
})
}
Expand Down
69 changes: 46 additions & 23 deletions parquet/encryption_write_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ import (
* keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
*/

var (
tempdir string
)
var tempdir string

type EncryptionConfigTestSuite struct {
suite.Suite
Expand All @@ -79,13 +77,16 @@ type EncryptionConfigTestSuite struct {
columnEncryptionKey2 string
}

func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string) {
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string, writerOpts ...parquet.WriterProperty) {
filename = filepath.Join(tempdir, filename)

props := parquet.NewWriterProperties(
opts := []parquet.WriterProperty{
parquet.WithPageIndexEnabled(true),
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithEncryptionProperties(configs))
parquet.WithEncryptionProperties(configs),
}
opts = append(opts, writerOpts...)
props := parquet.NewWriterProperties(opts...)
outFile, err := os.Create(filename)
en.Require().NoError(err)
en.Require().NotNil(outFile)
Expand Down Expand Up @@ -135,20 +136,18 @@ func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryption

// write the int64 column, each row repeats twice
int64Writer := nextColumn().(*file.Int64ColumnChunkWriter)
for i := 0; i < 2*en.rowsPerRG; i++ {
for i := 0; i < en.rowsPerRG; i++ {
var (
defLevel = [1]int16{1}
repLevel = [1]int16{0}
value = int64(i) * 1000 * 1000 * 1000 * 1000
defLevels = []int16{1, 1}
repLevels = []int16{0, 1}
values = []int64{
int64(i*2) * 1000 * 1000 * 1000 * 1000,
int64(i*2+1) * 1000 * 1000 * 1000 * 1000,
}
)
if i%2 == 0 {
repLevel[0] = 0
} else {
repLevel[0] = 1
}

n, err := int64Writer.WriteBatch([]int64{value}, defLevel[:], repLevel[:])
en.EqualValues(1, n)
n, err := int64Writer.WriteBatch(values, defLevels, repLevels)
en.EqualValues(2, n)
en.Require().NoError(err)
}

Expand Down Expand Up @@ -263,7 +262,11 @@ func (en *EncryptionConfigTestSuite) SetupSuite() {
// (uniform encryption)
func (en *EncryptionConfigTestSuite) TestUniformEncryption() {
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"))
en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_uniform_encryption.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption config 2: Encrypt Two Columns and the Footer, with different keys
Expand All @@ -273,7 +276,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooter() {
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols))
en.encryptFile(props, "tmp_encrypt_columns_and_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 3: encrypt two columns, with different keys.
Expand All @@ -285,7 +292,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsPlaintextFooter() {
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithPlaintextFooter())
en.encryptFile(props, "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 4: Encrypt two columns and the footer, with different keys
Expand All @@ -296,7 +307,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAadPrefix(en.fileName))
en.encryptFile(props, "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 5: Encrypt Two columns and the footer, with different keys
Expand All @@ -307,7 +322,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName), parquet.DisableAadPrefixStorage())
en.encryptFile(props, "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

// Encryption Config 6: Encrypt two columns and the footer, with different keys.
Expand All @@ -318,7 +337,11 @@ func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterAesGcmCtr() {
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))

props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))
en.encryptFile(props, "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.encrypted", parquet.WithDataPageVersion(parquet.DataPageV2))
en.encryptFile(props.Clone(""), "tmp_encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", parquet.WithCompression(compress.Codecs.Uncompressed),
parquet.WithDataPageVersion(parquet.DataPageV2))
}

func TestFileEncryption(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions parquet/file/column_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,9 +551,11 @@ func (p *PrimitiveReaderSuite) TestRepetitionLvlBytesWithMaxRepZero() {
// bytes: the page header reports 1 byte for repetition levels even
// though the max rep level is 0. If that byte isn't skipped then
// we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0].
pageData := [...]byte{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
pageData := [...]byte{
0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}

p.pages = append(p.pages, file.NewDataPageV2(memory.NewBufferBytes(pageData[:]), batchSize, 1, batchSize,
parquet.Encodings.DeltaBinaryPacked, 2, 1, int32(len(pageData)), false))
Expand Down Expand Up @@ -733,7 +735,6 @@ func TestFullSeekRow(t *testing.T) {

for _, dataPageVersion := range []parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), func(t *testing.T) {

props := parquet.NewWriterProperties(parquet.WithAllocator(mem),
parquet.WithDataPageVersion(dataPageVersion), parquet.WithDataPageSize(1),
parquet.WithPageIndexEnabled(true))
Expand Down
5 changes: 3 additions & 2 deletions parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() {

p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
p.dataPageHdrV2.NumValues = nrows
p.WriteDataPageHeaderV2(1024, 20, 10)
p.WriteDataPageHeaderV2(1024, 0, 0)
p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
p.True(p.pageReader.Next())
p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
Expand Down Expand Up @@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() {

func TestWithEOFReader(t *testing.T) {
root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1)}, -1)
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
}, -1)
props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))

var buf bytes.Buffer
Expand Down
58 changes: 51 additions & 7 deletions parquet/file/page_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [
return p.codec.Decode(buf, data), nil
}

// readV2Encrypted loads an encrypted V2 data page from rd into buf.
//
// Exactly lenCompressed bytes are copied from rd into the reader's scratch
// decompressBuffer and passed through the data decryptor. What happens next
// depends on compressed:
//
//   - false: the decrypted bytes are copied into buf as-is.
//   - true: the leading levelsBytelen bytes (repetition + definition levels,
//     which are always stored uncompressed) are copied to the front of buf and
//     the remainder is decoded by the page codec into buf[levelsBytelen:].
//
// The only error returned is one produced while reading from rd.
func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
	// Decryption has to happen before decompression, so stage the raw page
	// bytes in the scratch buffer first.
	p.decompressBuffer.ResizeNoShrink(lenCompressed)
	staging := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
	_, err := io.CopyN(staging, rd, int64(lenCompressed))
	if err != nil {
		return err
	}

	plain := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())

	switch {
	case !compressed:
		// Encrypted but uncompressed: the plaintext is the final page payload.
		copy(buf, plain)
	default:
		// Levels sit uncompressed at the front; only the values are encoded.
		if levelsBytelen > 0 {
			levels, rest := plain[:levelsBytelen], plain[levelsBytelen:]
			copy(buf, levels)
			plain = rest
		}
		p.codec.Decode(buf[levelsBytelen:], plain)
	}
	return nil
}

// readV2Unencrypted loads an unencrypted V2 data page from rd into buf.
//
// When compressed is false the page is read verbatim: len(buf) bytes are read
// from rd directly into buf. When compressed is true, the first levelsBytelen
// bytes (repetition + definition levels, which are always stored
// uncompressed) are read straight into the front of buf, and the remaining
// lenCompressed-levelsBytelen bytes are passed to p.decompress with
// buf[levelsBytelen:] as the destination.
//
// All reads go through rd, so the levels and the compressed values are
// guaranteed to come from the same stream. Any read or decompression error is
// returned unchanged.
func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error {
	if !compressed {
		// uncompressed, just read into the buffer
		_, err := io.ReadFull(rd, buf)
		return err
	}

	// definition + repetition levels are always uncompressed
	if levelsBytelen > 0 {
		if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil {
			return err
		}
	}

	// Continue from the same reader the levels were taken from rather than
	// reaching for p.r, so the function honours the reader it was handed.
	_, err := p.decompress(rd, lenCompressed-levelsBytelen, buf[levelsBytelen:])
	return err
}

type dataheader interface {
IsSetStatistics() bool
GetStatistics() *format.Statistics
Expand Down Expand Up @@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *fo
}
continue
}

rd.Discard(len(view) - int(remaining) + extra)
break
}
Expand Down Expand Up @@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool {
return false
}

if compressed {
if levelsBytelen > 0 {
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
}
if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
if p.cryptoCtx.DataDecryptor != nil {
if err := p.readV2Encrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
p.err = err
return false
}
} else {
io.ReadFull(p.r, buf.Bytes())
if err := p.readV2Unencrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
p.err = err
return false
}
}

if buf.Len() != lenUncompressed {
Expand Down