-
Notifications
You must be signed in to change notification settings - Fork 89
Open
Labels
Type: bug (Something isn't working)
Description
Describe the bug
When `compute.Take` is applied to a record batch that contains a nested struct field (a struct column whose children include another struct), the nested struct column of the resulting record is corrupted, and accessing it causes a nil pointer dereference.
Flat fields (primitives, strings) are reordered correctly, but the nested struct field becomes invalid.
Version
- arrow-go: v18.5.0
- Go: 1.25.5 darwin/arm64
Error message
runtime error: invalid memory address or nil pointer dereference
Minimal reproduction
package main
import (
"context"
"log"
"sort"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
)
// Impression is one ad impression row. Its fields are written, in order, to
// the columns described by ImpressionArrowType (Spend goes to the
// spend_micros column). Bid is optional; ImpressionBuilder.Append hands it
// to BidBuilder.Append, which records a nil Bid as a null nested struct.
type Impression struct {
	AdGroupID, ChannelID int32
	Spend                int64
	Exchange             string
	Bid                  *Bid
}
// ImpressionBuilder appends Impression values to an arrow StructBuilder
// created from ImpressionArrowType. Each exported field is the typed child
// builder for the column at the same position; Bid wraps the nested "bid"
// struct column.
type ImpressionBuilder struct {
	_builder             *array.StructBuilder
	AdGroupID, ChannelID *array.Int32Builder
	SpendMicros          *array.Int64Builder
	Exchange             *array.StringBuilder
	Bid                  *BidBuilder
}
// ImpressionArrowType returns the arrow struct type for an Impression row:
// ad_group_id (int32), channel_id (int32), spend_micros (int64), exchange
// (string) and the nested bid struct from BidArrowType. Every field is
// nullable.
func ImpressionArrowType() *arrow.StructType {
	nullable := func(name string, dt arrow.DataType) arrow.Field {
		return arrow.Field{Name: name, Type: dt, Nullable: true}
	}
	return arrow.StructOf(
		nullable("ad_group_id", arrow.PrimitiveTypes.Int32),
		nullable("channel_id", arrow.PrimitiveTypes.Int32),
		nullable("spend_micros", arrow.PrimitiveTypes.Int64),
		nullable("exchange", arrow.BinaryTypes.String),
		nullable("bid", BidArrowType()),
	)
}
// NewImpressionBuilder wraps sb, which is expected to have been created from
// ImpressionArrowType, and exposes its child builders as typed fields. The
// unchecked type assertions panic if sb's children have other builder types.
func NewImpressionBuilder(sb *array.StructBuilder) *ImpressionBuilder {
	return &ImpressionBuilder{
		_builder:    sb,
		AdGroupID:   sb.FieldBuilder(0).(*array.Int32Builder),
		ChannelID:   sb.FieldBuilder(1).(*array.Int32Builder),
		SpendMicros: sb.FieldBuilder(2).(*array.Int64Builder),
		Exchange:    sb.FieldBuilder(3).(*array.StringBuilder),
		Bid:         NewBidBuilder(sb.FieldBuilder(4).(*array.StructBuilder)),
	}
}
// Append writes imp as the next struct row. For a non-nil imp every child
// column receives its value (a nil imp.Bid is forwarded to BidBuilder.Append)
// and the row is marked valid; a nil imp is recorded via AppendNull.
func (b *ImpressionBuilder) Append(imp *Impression) {
	if imp != nil {
		b.AdGroupID.Append(imp.AdGroupID)
		b.ChannelID.Append(imp.ChannelID)
		b.SpendMicros.Append(imp.Spend)
		b.Exchange.Append(imp.Exchange)
		b.Bid.Append(imp.Bid)
		// Children are filled above; this only sets the row's validity bit.
		b._builder.Append(true)
		return
	}
	b._builder.AppendNull()
}
// Bid holds the bid attributes nested inside an Impression. Its fields are
// written, in order, to the columns described by BidArrowType.
type Bid struct {
	AppPlatform                       string
	CustomerID, CampaignID, AdGroupID int32
}
// BidBuilder appends Bid values to an arrow StructBuilder created from
// BidArrowType. Each exported field is the typed child builder for the
// column at the same position.
type BidBuilder struct {
	_builder                          *array.StructBuilder
	AppPlatform                       *array.StringBuilder
	CustomerID, CampaignID, AdGroupID *array.Int32Builder
}
// BidArrowType returns the arrow struct type for a Bid: app_platform
// (string), customer_id, campaign_id and ad_group_id (all int32). Every field
// is nullable.
func BidArrowType() *arrow.StructType {
	nullable := func(name string, dt arrow.DataType) arrow.Field {
		return arrow.Field{Name: name, Type: dt, Nullable: true}
	}
	return arrow.StructOf(
		nullable("app_platform", arrow.BinaryTypes.String),
		nullable("customer_id", arrow.PrimitiveTypes.Int32),
		nullable("campaign_id", arrow.PrimitiveTypes.Int32),
		nullable("ad_group_id", arrow.PrimitiveTypes.Int32),
	)
}
// NewBidBuilder wraps sb, which is expected to have been created from
// BidArrowType, and exposes its child builders as typed fields. The unchecked
// type assertions panic if sb's children have other builder types.
func NewBidBuilder(sb *array.StructBuilder) *BidBuilder {
	return &BidBuilder{
		_builder:    sb,
		AppPlatform: sb.FieldBuilder(0).(*array.StringBuilder),
		CustomerID:  sb.FieldBuilder(1).(*array.Int32Builder),
		CampaignID:  sb.FieldBuilder(2).(*array.Int32Builder),
		AdGroupID:   sb.FieldBuilder(3).(*array.Int32Builder),
	}
}
// Append writes bid as the next struct row. A non-nil bid fills every child
// column and marks the row valid; a nil bid is recorded via AppendNull.
func (b *BidBuilder) Append(bid *Bid) {
	row := b._builder
	if bid != nil {
		b.AppPlatform.Append(bid.AppPlatform)
		b.CustomerID.Append(bid.CustomerID)
		b.CampaignID.Append(bid.CampaignID)
		b.AdGroupID.Append(bid.AdGroupID)
		row.Append(true)
		return
	}
	row.AppendNull()
}
// data returns three fixture impressions, each with a populated Bid whose
// AdGroupID matches the impression's. Rows are in exchange order
// DOUBLECLICK, APPLOVIN, VUNGLE, i.e. not sorted by exchange name.
func data() []*Impression {
	mk := func(adGroup, channel int32, spend int64, exchange, platform string, customer, campaign int32) *Impression {
		return &Impression{
			AdGroupID: adGroup,
			ChannelID: channel,
			Spend:     spend,
			Exchange:  exchange,
			Bid: &Bid{
				AppPlatform: platform,
				CustomerID:  customer,
				CampaignID:  campaign,
				AdGroupID:   adGroup,
			},
		}
	}
	return []*Impression{
		mk(51, 21, 100, "DOUBLECLICK", "IOS", 31, 41),
		mk(52, 22, 200, "APPLOVIN", "ANDROID", 32, 42),
		mk(53, 23, 300, "VUNGLE", "ANDROID", 33, 43),
	}
}
// func writeToParquet(record arrow.RecordBatch) {
// out, err := os.Create("output.parquet")
// if err != nil {
// log.Fatalln("error creating file: ", err)
// }
// defer out.Close()
// fw, err := pqarrow.NewFileWriter(record.Schema(), out, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
// if err != nil {
// log.Fatalln("error creating new file writer: ", err)
// }
// defer fw.Close()
// if err := fw.Write(record); err != nil {
// log.Println("error writing to parquet: ", err)
// }
// }
// Indices is a permutation of row positions over a string column. Its Len,
// Less and Swap methods let sort.Sort reorder the positions by the column's
// values without touching the column itself.
type Indices struct {
	indices []int         // row positions into data, permuted by sorting
	data    *array.String // column being ordered; referenced, not retained
}
// NewIndices returns an Indices holding the identity permutation
// 0..data.Len()-1 over data. data is stored without being retained, so it
// must outlive the returned value.
func NewIndices(data *array.String) *Indices {
	n := data.Len()
	positions := make([]int, 0, n)
	for pos := 0; pos < n; pos++ {
		positions = append(positions, pos)
	}
	return &Indices{indices: positions, data: data}
}
// Len reports the number of row positions (sort.Interface).
func (in *Indices) Len() int {
	return len(in.indices)
}

// Less orders positions by the string value they point to in data
// (sort.Interface). Null slots are compared using whatever Value returns for
// them.
func (in *Indices) Less(i, j int) bool {
	return in.data.Value(in.indices[i]) < in.data.Value(in.indices[j])
}

// Swap exchanges two row positions (sort.Interface).
func (in *Indices) Swap(i, j int) {
	in.indices[i], in.indices[j] = in.indices[j], in.indices[i]
}
// Array materializes the current permutation as a new uint32 arrow array
// allocated from memory.DefaultAllocator, in the form compute.Take accepts as
// indices. The caller is responsible for releasing the result. Positions are
// narrowed with uint32(...), so values above math.MaxUint32 would wrap.
func (in *Indices) Array() arrow.Array {
	bld := array.NewUint32Builder(memory.DefaultAllocator)
	defer bld.Release()

	bld.Reserve(len(in.indices))
	for k := range in.indices {
		bld.Append(uint32(in.indices[k]))
	}
	return bld.NewArray()
}
// main builds a three-row Impression struct array, wraps it in a record
// batch, sorts the rows by the "exchange" column via compute.Take, and logs
// the reordered "exchange" and nested "bid" columns.
//
// Reference-counting notes:
//   - sa.Field(3) hands back a column still owned by sa; it is not released
//     here, because sa (and the record built from it) release it already.
//   - compute.NewDatum retains what it wraps, so the indices array returned
//     by indices.Array is released on its own, in addition to its datum.
//   - The record inside Take's result datum is owned by that datum and is
//     released only through the datum, never directly.
func main() {
	builder := array.NewStructBuilder(memory.DefaultAllocator, ImpressionArrowType())
	defer builder.Release()

	impBuilder := NewImpressionBuilder(builder)
	for _, imp := range data() {
		impBuilder.Append(imp)
	}
	sa := builder.NewStructArray()
	defer sa.Release()

	record := array.RecordFromStructArray(sa, nil)
	defer record.Release()
	recordDatum := compute.NewDatum(record)
	defer recordDatum.Release()

	// Borrowed from sa: do not Release.
	exchange := sa.Field(3).(*array.String)
	indices := NewIndices(exchange)
	sort.Sort(indices)

	indicesArr := indices.Array()
	defer indicesArr.Release()
	indicesDatum := compute.NewDatum(indicesArr)
	defer indicesDatum.Release()

	recordDatumSorted, err := compute.Take(context.Background(), *compute.DefaultTakeOptions(), recordDatum, indicesDatum)
	if err != nil {
		log.Fatalln("error sorting record: ", err)
	}
	defer recordDatumSorted.Release()

	sortedDatum, ok := recordDatumSorted.(*compute.RecordDatum)
	if !ok {
		log.Fatalf("unexpected datum type from Take: %T", recordDatumSorted)
	}
	// Owned by recordDatumSorted; released together with it.
	recordSorted := sortedDatum.Value
	log.Println("recordSorted.Column(3): ", recordSorted.Column(3))
	log.Println("recordSorted.Column(4): ", recordSorted.Column(4))
}
Component(s)
Other (compute)
Metadata
Assignees
Labels
Type: bug (Something isn't working)