Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,18 @@ func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stam
return nil, errInvalidPostageBatch
}

return s.newStampedPutterWithBatch(ctx, opts, stamp, storedBatch)
}

// newStampedPutterWithBatch creates a stamped putter using a pre-fetched batch
// This avoids the database lookup when batch info is already cached
func (s *Service) newStampedPutterWithBatch(ctx context.Context, opts putterOptions, stamp *postage.Stamp, storedBatch *postage.Batch) (storer.PutterSession, error) {
if !opts.Deferred && s.beeMode == DevMode {
return nil, errUnsupportedDevNodeOperation
}

var session storer.PutterSession
var err error
if opts.Deferred || opts.Pin {
session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID)
if err != nil {
Expand Down
172 changes: 143 additions & 29 deletions pkg/api/chunk_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
logger := s.logger.WithName("chunks_stream").Build()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"` // Optional: can be omitted for per-chunk stamping
SwarmTag uint64 `map:"Swarm-Tag"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
Expand All @@ -55,29 +55,34 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
}
}

// if tag not specified use direct upload
// Using context.Background here because the putter's lifetime extends beyond that of the HTTP request.
putter, err := s.newStamperPutter(context.Background(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: tag != 0,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
case errors.Is(err, postage.ErrNotFound):
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
// Create connection-level putter only if BatchID is provided
// If BatchID is not provided, per-chunk stamps must be used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I fully understand what per-chunk stamps are. Do you mean these are presigned stamps? If so, consider changing the comment to something like if ...., the API caller is expected to provide pre-signed stamps (and is also expected to keep track over stamp state over time)

var putter storer.PutterSession
if len(headers.BatchID) > 0 {
// if tag not specified use direct upload
// Using context.Background here because the putter's lifetime extends beyond that of the HTTP request.
putter, err = s.newStamperPutter(context.Background(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: tag != 0,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
case errors.Is(err, postage.ErrNotFound):
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
}
return
}
return
}

upgrader := websocket.Upgrader{
Expand All @@ -95,13 +100,46 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
}

s.wsWg.Add(1)
go s.handleUploadStream(logger, wsConn, putter)
var decode chunkDecoder
if len(headers.BatchID) > 0 {
decode = decodeChunkWithoutStamp
} else {
decode = decodeChunkWithStamp
}
go s.handleUploadStream(logger, wsConn, putter, tag, decode)
}

// chunkDecoder extracts chunk data and optionally a stamp from a websocket message.
// When BatchID is provided in headers, decodeChunkWithoutStamp is used (no stamp in message).
// When BatchID is not provided, decodeChunkWithStamp is used (stamp prepended to chunk data).
type chunkDecoder func(msg []byte) (chunkData []byte, stamp *postage.Stamp, err error)

// decodeChunkWithoutStamp returns the message as-is (used when BatchID provided in headers).
func decodeChunkWithoutStamp(msg []byte) ([]byte, *postage.Stamp, error) {
return msg, nil, nil
}

// decodeChunkWithStamp extracts a stamp from the first 113 bytes of the message.
// Returns an error if the message is too small or the stamp is invalid.
func decodeChunkWithStamp(msg []byte) ([]byte, *postage.Stamp, error) {
if len(msg) < postage.StampSize+swarm.SpanSize {
return nil, nil, errors.New("message too small for stamp + chunk")
}

stamp := &postage.Stamp{}
if err := stamp.UnmarshalBinary(msg[:postage.StampSize]); err != nil {
return nil, nil, errors.New("invalid stamp")
}

return msg[postage.StampSize:], stamp, nil
}

func (s *Service) handleUploadStream(
logger log.Logger,
conn *websocket.Conn,
putter storer.PutterSession,
tag uint64,
decode chunkDecoder,
) {
defer s.wsWg.Done()

Expand All @@ -111,11 +149,23 @@ func (s *Service) handleUploadStream(
gone = make(chan struct{})
err error
)

// Cache for batch validation to avoid database lookups for every chunk
// Key: batch ID hex string, Value: stored batch info
// This avoids the expensive batchStore.Get() call for each chunk
batchCache := make(map[string]*postage.Batch)

defer func() {
cancel()
_ = conn.Close()
if err = putter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: syncing chunks failed")

// No cleanup needed for batch cache - it's just metadata

// Only call Done on connection-level putter if it exists
if putter != nil {
if err = putter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: syncing chunks failed")
}
}
}()

Expand Down Expand Up @@ -190,14 +240,71 @@ func (s *Service) handleUploadStream(
return
}

chunk, err := cac.NewWithDataSpan(msg)
// Decode the message using the appropriate decoder
chunkData, stamp, err := decode(msg)
if err != nil {
logger.Debug("chunk upload stream: create chunk failed", "error", err)
logger.Debug("chunk upload stream: decode failed", "error", err)
logger.Error(nil, "chunk upload stream: "+err.Error())
sendErrorClose(websocket.CloseInternalServerErr, err.Error())
return
}

// Determine the putter to use
var (
chunk swarm.Chunk
chunkPutter = putter
)

// If stamp was extracted, create a per-chunk putter
if stamp != nil {
batchID := stamp.BatchID()
batchIDHex := string(batchID)

storedBatch, exists := batchCache[batchIDHex]
if !exists {
storedBatch, err = s.batchStore.Get(batchID)
if err != nil {
logger.Debug("chunk upload stream: batch validation failed", "error", err)
logger.Error(nil, "chunk upload stream: batch validation failed")
if errors.Is(err, storage.ErrNotFound) {
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
} else {
sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed")
}
return
}
batchCache[batchIDHex] = storedBatch
}

chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{
BatchID: batchID,
TagID: tag,
Deferred: tag != 0,
}, stamp, storedBatch)
if err != nil {
logger.Debug("chunk upload stream: failed to create stamped putter", "error", err)
logger.Error(nil, "chunk upload stream: failed to create stamped putter")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
sendErrorClose(websocket.CloseInternalServerErr, "batch not usable")
case errors.Is(err, postage.ErrNotFound):
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
default:
sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed")
}
return
}
}

chunk, err = cac.NewWithDataSpan(chunkData)
if err != nil {
logger.Debug("chunk upload stream: create chunk failed", "error", err, "chunk_size", len(chunkData))
logger.Error(nil, "chunk upload stream: create chunk failed")
sendErrorClose(websocket.CloseInternalServerErr, "invalid chunk data")
return
}

err = putter.Put(ctx, chunk)
err = chunkPutter.Put(ctx, chunk)
if err != nil {
logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err)
logger.Error(nil, "chunk upload stream: write chunk failed")
Expand All @@ -210,6 +317,13 @@ func (s *Service) handleUploadStream(
return
}

// Clean up per-chunk putter
if chunkPutter != putter {
if err := chunkPutter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter")
}
}

err = sendMsg(websocket.BinaryMessage, successWsMsg)
if err != nil {
s.logger.Debug("chunk upload stream: sending success message failed", "error", err)
Expand Down