Skip to content

compute.Take on Record/Array with nested struct field causes nil pointer dereference #644

@sriharshaj

Description

@sriharshaj

Describe the bug

When using compute.Take on a Record that contains a nested struct field (a struct containing another struct), the resulting Record's nested struct column is corrupted and causes a nil pointer dereference when accessed.

Flat fields (primitives, strings) are correctly reordered, but the nested struct field becomes invalid.

Version

  • arrow-go: v18.5.0
  • Go: 1.25.5 darwin/arm64

Error message

 runtime error: invalid memory address or nil pointer dereference

Minimal reproduction

package main

import (
	"context"
	"log"
	"sort"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/compute"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// Impression is the plain Go form of one row that ImpressionBuilder.Append
// writes into the struct array.
type Impression struct {
	AdGroupID, ChannelID int32
	Spend                int64
	Exchange             string

	// Bid is the nested record; BidBuilder.Append writes a null entry when
	// it is nil.
	Bid *Bid
}

// ImpressionBuilder bundles a struct builder with typed handles to each of
// its child field builders, so an Impression can be appended field by field.
type ImpressionBuilder struct {
	_builder *array.StructBuilder

	AdGroupID, ChannelID *array.Int32Builder
	SpendMicros          *array.Int64Builder
	Exchange             *array.StringBuilder
	Bid                  *BidBuilder
}

// ImpressionArrowType returns the Arrow struct type for an impression row:
// four nullable scalar columns followed by the nullable nested "bid" struct.
// The field order here is the order NewImpressionBuilder indexes into.
func ImpressionArrowType() *arrow.StructType {
	return arrow.StructOf(
		arrow.Field{Name: "ad_group_id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
		arrow.Field{Name: "channel_id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
		arrow.Field{Name: "spend_micros", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
		arrow.Field{Name: "exchange", Type: arrow.BinaryTypes.String, Nullable: true},
		arrow.Field{Name: "bid", Type: BidArrowType(), Nullable: true},
	)
}

// NewImpressionBuilder wraps sb and resolves its child builders by position
// (0..3 scalars, 4 the nested bid struct). The type assertions panic if sb
// was not created with the layout returned by ImpressionArrowType.
func NewImpressionBuilder(sb *array.StructBuilder) *ImpressionBuilder {
	return &ImpressionBuilder{
		_builder:    sb,
		AdGroupID:   sb.FieldBuilder(0).(*array.Int32Builder),
		ChannelID:   sb.FieldBuilder(1).(*array.Int32Builder),
		SpendMicros: sb.FieldBuilder(2).(*array.Int64Builder),
		Exchange:    sb.FieldBuilder(3).(*array.StringBuilder),
		Bid:         NewBidBuilder(sb.FieldBuilder(4).(*array.StructBuilder)),
	}
}

// Append adds one row to the wrapped struct builder. A non-nil imp has each
// of its fields appended to the matching child builder (the nested bid goes
// through BidBuilder.Append) and the row is then marked valid; a nil imp is
// recorded as a null row.
func (b *ImpressionBuilder) Append(imp *Impression) {
	if imp != nil {
		b.AdGroupID.Append(imp.AdGroupID)
		b.ChannelID.Append(imp.ChannelID)
		b.SpendMicros.Append(imp.Spend)
		b.Exchange.Append(imp.Exchange)
		b.Bid.Append(imp.Bid)

		// Children first, then the parent's validity bit for this row.
		b._builder.Append(true)
		return
	}

	b._builder.AppendNull()
}

// Bid is the plain Go form of the nested "bid" struct that BidBuilder.Append
// writes for each impression.
type Bid struct {
	AppPlatform                       string
	CustomerID, CampaignID, AdGroupID int32
}

// BidBuilder bundles the nested bid struct builder with typed handles to its
// child field builders.
type BidBuilder struct {
	_builder *array.StructBuilder

	AppPlatform                       *array.StringBuilder
	CustomerID, CampaignID, AdGroupID *array.Int32Builder
}

// BidArrowType returns the Arrow struct type of the nested bid record: one
// nullable string column followed by three nullable int32 columns. The field
// order here is the order NewBidBuilder indexes into.
func BidArrowType() *arrow.StructType {
	// Every bid column is nullable, so only name and type vary.
	nullable := func(name string, dt arrow.DataType) arrow.Field {
		return arrow.Field{Name: name, Type: dt, Nullable: true}
	}

	return arrow.StructOf(
		nullable("app_platform", arrow.BinaryTypes.String),
		nullable("customer_id", arrow.PrimitiveTypes.Int32),
		nullable("campaign_id", arrow.PrimitiveTypes.Int32),
		nullable("ad_group_id", arrow.PrimitiveTypes.Int32),
	)
}

// NewBidBuilder wraps sb and resolves its four child builders by position.
// The type assertions panic if sb was not created with the layout returned
// by BidArrowType.
func NewBidBuilder(sb *array.StructBuilder) *BidBuilder {
	return &BidBuilder{
		_builder:    sb,
		AppPlatform: sb.FieldBuilder(0).(*array.StringBuilder),
		CustomerID:  sb.FieldBuilder(1).(*array.Int32Builder),
		CampaignID:  sb.FieldBuilder(2).(*array.Int32Builder),
		AdGroupID:   sb.FieldBuilder(3).(*array.Int32Builder),
	}
}

// Append adds one bid to the wrapped struct builder. A non-nil bid has its
// fields appended to the matching child builders and is then marked valid;
// a nil bid is recorded as a null entry.
func (b *BidBuilder) Append(bid *Bid) {
	if bid != nil {
		b.AppPlatform.Append(bid.AppPlatform)
		b.CustomerID.Append(bid.CustomerID)
		b.CampaignID.Append(bid.CampaignID)
		b.AdGroupID.Append(bid.AdGroupID)

		// Children first, then the parent's validity bit for this entry.
		b._builder.Append(true)
		return
	}

	b._builder.AppendNull()
}


// data returns the three fixture impressions used by the reproduction. Each
// call builds fresh values. In every row the nested bid carries the same ad
// group ID as the impression itself.
func data() []*Impression {
	// row assembles one impression together with its nested bid; adGroupID
	// is written to both the impression and the bid.
	row := func(adGroupID, channelID int32, spend int64, exchange, platform string, customerID, campaignID int32) *Impression {
		return &Impression{
			AdGroupID: adGroupID,
			ChannelID: channelID,
			Spend:     spend,
			Exchange:  exchange,
			Bid: &Bid{
				AppPlatform: platform,
				CustomerID:  customerID,
				CampaignID:  campaignID,
				AdGroupID:   adGroupID,
			},
		}
	}

	return []*Impression{
		row(51, 21, 100, "DOUBLECLICK", "IOS", 31, 41),
		row(52, 22, 200, "APPLOVIN", "ANDROID", 32, 42),
		row(53, 23, 300, "VUNGLE", "ANDROID", 33, 43),
	}
}

// func writeToParquet(record arrow.RecordBatch) {
// 	out, err := os.Create("output.parquet")
// 	if err != nil {
// 		log.Fatalln("error creating file: ", err)
// 	}
// 	defer out.Close()

// 	fw, err := pqarrow.NewFileWriter(record.Schema(), out, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
// 	if err != nil {
// 		log.Fatalln("error creating new file writer: ", err)
// 	}
// 	defer fw.Close()

// 	if err := fw.Write(record); err != nil {
// 		log.Println("error writing to parquet: ", err)
// 	}
// }

// Indices pairs a list of row positions with the string column they index.
// Its Len, Less and Swap methods satisfy sort.Interface, so sorting an
// Indices reorders the positions by the column's values without touching
// the column itself.
type Indices struct {
	// indices holds row positions into data; Swap permutes this slice.
	indices []int
	// data is the column whose values Less compares.
	data    *array.String
}

// NewIndices returns an Indices over data whose positions start out in
// identity order (0, 1, ..., data.Len()-1).
func NewIndices(data *array.String) *Indices {
	order := make([]int, data.Len())
	for pos := range order {
		order[pos] = pos
	}

	return &Indices{indices: order, data: data}
}

// Len reports the number of tracked row positions (sort.Interface).
func (in *Indices) Len() int { return len(in.indices) }

// Less reports whether the string at the i-th tracked position sorts before
// the string at the j-th tracked position (sort.Interface).
func (in *Indices) Less(i, j int) bool {
	lhs := in.data.Value(in.indices[i])
	rhs := in.data.Value(in.indices[j])
	return lhs < rhs
}

// Swap exchanges the i-th and j-th tracked positions (sort.Interface). Only
// the position slice changes; the underlying column is left as is.
func (in *Indices) Swap(i, j int) {
	in.indices[i], in.indices[j] = in.indices[j], in.indices[i]
}

// Array copies the current position order into a newly built uint32 Arrow
// array, suitable as the indices argument of a take operation. Each position
// is converted from int to uint32.
func (in *Indices) Array() arrow.Array {
	bldr := array.NewUint32Builder(memory.DefaultAllocator)
	defer bldr.Release()

	n := len(in.indices)
	bldr.Reserve(n)
	for k := 0; k < n; k++ {
		bldr.Append(uint32(in.indices[k]))
	}

	return bldr.NewArray()
}

// main reproduces the reported failure: it builds a struct array of
// impressions (including the nested bid struct), converts it to a record,
// sorts the row positions by the "exchange" column, reorders the record with
// compute.Take, and logs columns 3 and 4 of the result.
//
// Reference counting follows arrow-go's ownership rules so that cleanup
// noise cannot be mistaken for the bug under investigation:
//   - every builder/array/record/datum created here is released exactly once;
//   - values borrowed from an owner (a struct array's field, a datum's
//     Value) are never released separately.
func main() {
	builder := array.NewStructBuilder(memory.DefaultAllocator, ImpressionArrowType())
	defer builder.Release()

	impBuilder := NewImpressionBuilder(builder)
	for _, imp := range data() {
		impBuilder.Append(imp)
	}

	sa := builder.NewStructArray()
	defer sa.Release()

	record := array.RecordFromStructArray(sa, nil)
	defer record.Release()

	// NewDatum takes its own reference on the record, so both the record
	// and the datum are released.
	recordDatum := compute.NewDatum(record)
	defer recordDatum.Release()

	// Field returns a reference owned by sa; it must not be released here.
	exchange := sa.Field(3).(*array.String)

	indices := NewIndices(exchange)
	sort.Sort(indices)

	// Array hands back a freshly built array that this function owns;
	// NewDatum retains it again, so the local reference is released too.
	indicesArr := indices.Array()
	defer indicesArr.Release()

	indicesDatum := compute.NewDatum(indicesArr)
	defer indicesDatum.Release()

	recordDatumSorted, err := compute.Take(context.Background(), *compute.DefaultTakeOptions(), recordDatum, indicesDatum)
	if err != nil {
		log.Fatalln("error sorting record: ", err)
	}
	defer recordDatumSorted.Release()

	// The sorted record is owned by recordDatumSorted and released with it.
	recordSorted := recordDatumSorted.(*compute.RecordDatum).Value

	log.Println("recordSorted.Column(3): ", recordSorted.Column(3))
	log.Println("recordSorted.Column(4): ", recordSorted.Column(4))
}

Component(s)

Other (compute)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type: bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions