diff --git a/pkg/api/api.go b/pkg/api/api.go index 5f747237bc4..38f7cff11ee 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -836,7 +836,18 @@ func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stam return nil, errInvalidPostageBatch } + return s.newStampedPutterWithBatch(ctx, opts, stamp, storedBatch) +} + +// newStampedPutterWithBatch creates a stamped putter using a pre-fetched batch +// This avoids the database lookup when batch info is already cached +func (s *Service) newStampedPutterWithBatch(ctx context.Context, opts putterOptions, stamp *postage.Stamp, storedBatch *postage.Batch) (storer.PutterSession, error) { + if !opts.Deferred && s.beeMode == DevMode { + return nil, errUnsupportedDevNodeOperation + } + var session storer.PutterSession + var err error if opts.Deferred || opts.Pin { session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID) if err != nil { diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 2f91939f21a..fb083a19f88 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -28,7 +28,7 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques logger := s.logger.WithName("chunks_stream").Build() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` // Optional: can be omitted for per-chunk stamping SwarmTag uint64 `map:"Swarm-Tag"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { @@ -55,29 +55,34 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques } } - // if tag not specified use direct upload - // Using context.Background here because the putter's lifetime extends beyond that of the HTTP request. - putter, err := s.newStamperPutter(context.Background(), putterOptions{ - BatchID: headers.BatchID, - TagID: tag, - Deferred: tag != 0, - }) - if err != nil { - logger.Debug("get putter failed", "error", err) - logger.Error(nil, "get putter failed") - switch { - case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): - jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist") - case errors.Is(err, postage.ErrNotFound): - jsonhttp.NotFound(w, "batch with id not found") - case errors.Is(err, errInvalidPostageBatch): - jsonhttp.BadRequest(w, "invalid batch id") - case errors.Is(err, errUnsupportedDevNodeOperation): - jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) - default: - jsonhttp.BadRequest(w, nil) + // Create connection-level putter only if BatchID is provided + // If BatchID is not provided, per-chunk stamps must be used + var putter storer.PutterSession + if len(headers.BatchID) > 0 { + // if tag not specified use direct upload + // Using context.Background here because the putter's lifetime extends beyond that of the HTTP request. + putter, err = s.newStamperPutter(context.Background(), putterOptions{ + BatchID: headers.BatchID, + TagID: tag, + Deferred: tag != 0, + }) + if err != nil { + logger.Debug("get putter failed", "error", err) + logger.Error(nil, "get putter failed") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist") + case errors.Is(err, postage.ErrNotFound): + jsonhttp.NotFound(w, "batch with id not found") + case errors.Is(err, errInvalidPostageBatch): + jsonhttp.BadRequest(w, "invalid batch id") + case errors.Is(err, errUnsupportedDevNodeOperation): + jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) + default: + jsonhttp.BadRequest(w, nil) + } + return } - return } upgrader := websocket.Upgrader{ @@ -95,13 +100,46 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques } s.wsWg.Add(1) - go s.handleUploadStream(logger, wsConn, putter) + var decode chunkDecoder + if len(headers.BatchID) > 0 { + decode = decodeChunkWithoutStamp + } else { + decode = decodeChunkWithStamp + } + go s.handleUploadStream(logger, wsConn, putter, tag, decode) +} + +// chunkDecoder extracts chunk data and optionally a stamp from a websocket message. +// When BatchID is provided in headers, decodeChunkWithoutStamp is used (no stamp in message). +// When BatchID is not provided, decodeChunkWithStamp is used (stamp prepended to chunk data). +type chunkDecoder func(msg []byte) (chunkData []byte, stamp *postage.Stamp, err error) + +// decodeChunkWithoutStamp returns the message as-is (used when BatchID provided in headers). +func decodeChunkWithoutStamp(msg []byte) ([]byte, *postage.Stamp, error) { + return msg, nil, nil +} + +// decodeChunkWithStamp extracts a stamp from the first 113 bytes of the message. +// Returns an error if the message is too small or the stamp is invalid. +func decodeChunkWithStamp(msg []byte) ([]byte, *postage.Stamp, error) { + if len(msg) < postage.StampSize+swarm.SpanSize { + return nil, nil, errors.New("message too small for stamp + chunk") + } + + stamp := &postage.Stamp{} + if err := stamp.UnmarshalBinary(msg[:postage.StampSize]); err != nil { + return nil, nil, errors.New("invalid stamp") + } + + return msg[postage.StampSize:], stamp, nil } func (s *Service) handleUploadStream( logger log.Logger, conn *websocket.Conn, putter storer.PutterSession, + tag uint64, + decode chunkDecoder, ) { defer s.wsWg.Done() @@ -111,11 +149,23 @@ func (s *Service) handleUploadStream( gone = make(chan struct{}) err error ) + + // Cache for batch validation to avoid database lookups for every chunk + // Key: batch ID hex string, Value: stored batch info + // This avoids the expensive batchStore.Get() call for each chunk + batchCache := make(map[string]*postage.Batch) + defer func() { cancel() _ = conn.Close() - if err = putter.Done(swarm.ZeroAddress); err != nil { - logger.Error(err, "chunk upload stream: syncing chunks failed") + + // No cleanup needed for batch cache - it's just metadata + + // Only call Done on connection-level putter if it exists + if putter != nil { + if err = putter.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: syncing chunks failed") + } } }() @@ -190,14 +240,71 @@ func (s *Service) handleUploadStream( return } - chunk, err := cac.NewWithDataSpan(msg) + // Decode the message using the appropriate decoder + chunkData, stamp, err := decode(msg) if err != nil { - logger.Debug("chunk upload stream: create chunk failed", "error", err) + logger.Debug("chunk upload stream: decode failed", "error", err) + logger.Error(nil, "chunk upload stream: "+err.Error()) + sendErrorClose(websocket.CloseInternalServerErr, err.Error()) + return + } + + // Determine the putter to use + var ( + chunk swarm.Chunk + chunkPutter = putter + ) + + // If stamp was extracted, create a per-chunk putter + if stamp != nil { + batchID := stamp.BatchID() + batchIDHex := string(batchID) + + storedBatch, exists := batchCache[batchIDHex] + if !exists { + storedBatch, err = s.batchStore.Get(batchID) + if err != nil { + logger.Debug("chunk upload stream: batch validation failed", "error", err) + logger.Error(nil, "chunk upload stream: batch validation failed") + if errors.Is(err, storage.ErrNotFound) { + sendErrorClose(websocket.CloseInternalServerErr, "batch not found") + } else { + sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed") + } + return + } + batchCache[batchIDHex] = storedBatch + } + + chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{ + BatchID: batchID, + TagID: tag, + Deferred: tag != 0, + }, stamp, storedBatch) + if err != nil { + logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) + logger.Error(nil, "chunk upload stream: failed to create stamped putter") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") + case errors.Is(err, postage.ErrNotFound): + sendErrorClose(websocket.CloseInternalServerErr, "batch not found") + default: + sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + } + return + } + } + + chunk, err = cac.NewWithDataSpan(chunkData) + if err != nil { + logger.Debug("chunk upload stream: create chunk failed", "error", err, "chunk_size", len(chunkData)) logger.Error(nil, "chunk upload stream: create chunk failed") + sendErrorClose(websocket.CloseInternalServerErr, "invalid chunk data") return } - err = putter.Put(ctx, chunk) + err = chunkPutter.Put(ctx, chunk) if err != nil { logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err) logger.Error(nil, "chunk upload stream: write chunk failed") @@ -210,6 +317,13 @@ func (s *Service) handleUploadStream( return } + // Clean up per-chunk putter + if chunkPutter != putter { + if err := chunkPutter.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter") + } + } + err = sendMsg(websocket.BinaryMessage, successWsMsg) if err != nil { s.logger.Debug("chunk upload stream: sending success message failed", "error", err)