From f84e3c4b5a535e89f27c85a10a15d1fcaa4e72cc Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 7 Feb 2026 19:54:01 +0800 Subject: [PATCH 1/5] src: release memory for zstd contexts in `Close()` This aligns zstd streams with other compression libraries in this regard, and enables releasing memory early when the stream ends in JS instead of waiting for GC to clean up the wrapper object (which is a problem that is exacerbated in the zstd context because we do not track memory and report memory pressure to V8 yet). --- src/node_zlib.cc | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index d0077c282ba193..88fc36cee22747 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -298,7 +298,6 @@ class ZstdContext : public MemoryRetainer { ZstdContext() = default; // Streaming-related, should be available for all compression libraries: - void Close(); void SetBuffers(const char* in, uint32_t in_len, char* out, uint32_t out_len); void SetFlush(int flush); void GetAfterWriteOffsets(uint32_t* avail_in, uint32_t* avail_out) const; @@ -323,6 +322,7 @@ class ZstdCompressContext final : public ZstdContext { ZstdCompressContext() = default; // Streaming-related, should be available for all compression libraries: + void Close(); void DoThreadPoolWork(); CompressionError ResetStream(); @@ -349,6 +349,7 @@ class ZstdDecompressContext final : public ZstdContext { ZstdDecompressContext() = default; // Streaming-related, should be available for all compression libraries: + void Close(); void DoThreadPoolWork(); CompressionError ResetStream(); @@ -1488,8 +1489,6 @@ CompressionError BrotliDecoderContext::GetErrorInfo() const { } } -void ZstdContext::Close() {} - void ZstdContext::SetBuffers(const char* in, uint32_t in_len, char* out, @@ -1533,6 +1532,10 @@ CompressionError ZstdCompressContext::SetParameter(int key, int value) { return {}; } +void ZstdCompressContext::Close() { + cctx_.reset(); +} + CompressionError ZstdCompressContext::Init(uint64_t pledged_src_size, std::string_view dictionary) { pledged_src_size_ = pledged_src_size; @@ -1585,6 +1588,10 @@ CompressionError ZstdDecompressContext::SetParameter(int key, int value) { return {}; } +void ZstdDecompressContext::Close() { + dctx_.reset(); +} + CompressionError ZstdDecompressContext::Init(uint64_t pledged_src_size, std::string_view dictionary) { dctx_.reset(ZSTD_createDCtx()); From 50b23389473c4e836cfb6951b7227aa9e04acd8d Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 7 Feb 2026 18:48:12 +0800 Subject: [PATCH 2/5] src: extract zlib allocation tracking into its own class This makes it a bit easier to separate concerns, and results in reduced code duplication when compiling since this code does not depend on template parameters. --- src/node_zlib.cc | 134 ++++++++++++++++++++++++++++------------------- 1 file changed, 80 insertions(+), 54 deletions(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 88fc36cee22747..30aeb24d77159a 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -370,8 +370,68 @@ class ZstdDecompressContext final : public ZstdContext { DeleteFnPtr dctx_; }; +class CompressionStreamMemoryOwner { + public: + // Allocation functions provided to zlib itself. We store the real size of + // the allocated memory chunk just before the "payload" memory we return + // to zlib. + // Because we use zlib off the thread pool, we can not report memory directly + // to V8; rather, we first store it as "unreported" memory in a separate + // field and later report it back from the main thread. + static void* AllocForZlib(void* data, uInt items, uInt size) { + size_t real_size = MultiplyWithOverflowCheck(static_cast(items), + static_cast(size)); + return AllocForBrotli(data, real_size); + } + + static constexpr size_t reserveSizeAndAlign = + std::max(sizeof(size_t), alignof(max_align_t)); + + static void* AllocForBrotli(void* data, size_t size) { + size += reserveSizeAndAlign; + CompressionStreamMemoryOwner* ctx = + static_cast(data); + char* memory = UncheckedMalloc(size); + if (memory == nullptr) [[unlikely]] { + return nullptr; + } + *reinterpret_cast(memory) = size; + ctx->unreported_allocations_.fetch_add(size, std::memory_order_relaxed); + return memory + reserveSizeAndAlign; + } + + static void FreeForZlib(void* data, void* pointer) { + if (pointer == nullptr) [[unlikely]] { + return; + } + CompressionStreamMemoryOwner* ctx = + static_cast(data); + char* real_pointer = static_cast(pointer) - reserveSizeAndAlign; + size_t real_size = *reinterpret_cast(real_pointer); + ctx->unreported_allocations_.fetch_sub(real_size, + std::memory_order_relaxed); + free(real_pointer); + } + + void* as_allocator_opaque_value() { return static_cast(this); } + + protected: + ssize_t ComputeAdjustmentToExternalAllocatedMemory() { + ssize_t report = + unreported_allocations_.exchange(0, std::memory_order_relaxed); + CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast(-report)); + zlib_memory_ += report; + return report; + } + + std::atomic unreported_allocations_{0}; + size_t zlib_memory_ = 0; +}; + template -class CompressionStream : public AsyncWrap, public ThreadPoolWork { +class CompressionStream : public AsyncWrap, + public ThreadPoolWork, + protected CompressionStreamMemoryOwner { public: enum InternalFields { kCompressionStreamBaseField = AsyncWrap::kInternalFieldCount, @@ -591,6 +651,19 @@ class CompressionStream : public AsyncWrap, public ThreadPoolWork { zlib_memory_ + unreported_allocations_); } + static void* AllocatorOpaquePointerForContext(CompressionContext* ctx) { + CompressionStream* self = ContainerOf(&CompressionStream::ctx_, ctx); + // There is nothing at the type level stopping someone from using this + // method with `ctx` being an argument that is not part of a + // CompressionStream. This check catches that potential discrepancy in debug + // builds. std::launder is necessary to keep the compiler from optimizing + // away the check in the (common) case that `CompressionContext` is a final + // class. + DCHECK_EQ(std::launder(&self->ctx_)->MemoryInfoName(), + CompressionContext{}.MemoryInfoName()); + return self->as_allocator_opaque_value(); + } + protected: CompressionContext* context() { return &ctx_; } @@ -600,55 +673,11 @@ class CompressionStream : public AsyncWrap, public ThreadPoolWork { init_done_ = true; } - // Allocation functions provided to zlib itself. We store the real size of - // the allocated memory chunk just before the "payload" memory we return - // to zlib. - // Because we use zlib off the thread pool, we can not report memory directly - // to V8; rather, we first store it as "unreported" memory in a separate - // field and later report it back from the main thread. - static void* AllocForZlib(void* data, uInt items, uInt size) { - size_t real_size = - MultiplyWithOverflowCheck(static_cast(items), - static_cast(size)); - return AllocForBrotli(data, real_size); - } - - static constexpr size_t reserveSizeAndAlign = - std::max(sizeof(size_t), alignof(max_align_t)); - - static void* AllocForBrotli(void* data, size_t size) { - size += reserveSizeAndAlign; - CompressionStream* ctx = static_cast(data); - char* memory = UncheckedMalloc(size); - if (memory == nullptr) [[unlikely]] { - return nullptr; - } - *reinterpret_cast(memory) = size; - ctx->unreported_allocations_.fetch_add(size, - std::memory_order_relaxed); - return memory + reserveSizeAndAlign; - } - - static void FreeForZlib(void* data, void* pointer) { - if (pointer == nullptr) [[unlikely]] { - return; - } - CompressionStream* ctx = static_cast(data); - char* real_pointer = static_cast(pointer) - reserveSizeAndAlign; - size_t real_size = *reinterpret_cast(real_pointer); - ctx->unreported_allocations_.fetch_sub(real_size, - std::memory_order_relaxed); - free(real_pointer); - } - // This is called on the main thread after zlib may have allocated something // in order to report it back to V8. void AdjustAmountOfExternalAllocatedMemory() { - ssize_t report = - unreported_allocations_.exchange(0, std::memory_order_relaxed); + ssize_t report = ComputeAdjustmentToExternalAllocatedMemory(); if (report == 0) return; - CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast(-report)); - zlib_memory_ += report; AsyncWrap::env()->external_memory_accounter()->Update( AsyncWrap::env()->isolate(), report); } @@ -679,8 +708,6 @@ class CompressionStream : public AsyncWrap, public ThreadPoolWork { bool closed_ = false; unsigned int refs_ = 0; uint32_t* write_result_ = nullptr; - std::atomic unreported_allocations_{0}; - size_t zlib_memory_ = 0; CompressionContext ctx_; }; @@ -757,7 +784,7 @@ class ZlibStream final : public CompressionStream { AllocScope alloc_scope(wrap); wrap->context()->SetAllocationFunctions( - AllocForZlib, FreeForZlib, static_cast(wrap)); + AllocForZlib, FreeForZlib, wrap->as_allocator_opaque_value()); wrap->context()->Init(level, window_bits, mem_level, strategy, std::move(dictionary)); } @@ -819,11 +846,10 @@ class BrotliCompressionStream final : wrap->InitStream(write_result, write_js_callback); AllocScope alloc_scope(wrap); - CompressionError err = - wrap->context()->Init( - CompressionStream::AllocForBrotli, - CompressionStream::FreeForZlib, - static_cast*>(wrap)); + CompressionError err = wrap->context()->Init( + CompressionStream::AllocForBrotli, + CompressionStream::FreeForZlib, + wrap->as_allocator_opaque_value()); if (err.IsError()) { wrap->EmitError(err); // TODO(addaleax): Sometimes we generate better error codes in C++ land, From ae69cf687875884324c42e7c3a4cff6983b52d90 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 7 Feb 2026 19:01:39 +0800 Subject: [PATCH 3/5] src: do not store compression methods on Brotli classes This addresses a long-standing TODO comment, referencing the fact that these values are either known at compile time or can be inferred from the `this` value in the context class. --- src/node_zlib.cc | 70 +++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 43 deletions(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 30aeb24d77159a..8a5615b69507d5 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -187,9 +187,11 @@ class ZlibContext final : public MemoryRetainer { CompressionError ResetStream(); // Zlib-specific: - void Init(int level, int window_bits, int mem_level, int strategy, + void Init(int level, + int window_bits, + int mem_level, + int strategy, std::vector&& dictionary); - void SetAllocationFunctions(alloc_func alloc, free_func free, void* opaque); CompressionError SetParams(int level, int strategy); SET_MEMORY_INFO_NAME(ZlibContext) @@ -243,11 +245,6 @@ class BrotliContext : public MemoryRetainer { size_t avail_in_ = 0; size_t avail_out_ = 0; BrotliEncoderOperation flush_ = BROTLI_OPERATION_PROCESS; - // TODO(addaleax): These should not need to be stored here. - // This is currently only done this way to make implementing ResetStream() - // easier. - brotli_alloc_func alloc_ = nullptr; - brotli_free_func free_ = nullptr; void* alloc_opaque_ = nullptr; }; @@ -255,9 +252,7 @@ class BrotliEncoderContext final : public BrotliContext { public: void Close(); void DoThreadPoolWork(); - CompressionError Init(brotli_alloc_func alloc, - brotli_free_func free, - void* opaque); + CompressionError Init(); CompressionError ResetStream(); CompressionError SetParams(int key, uint32_t value); CompressionError GetErrorInfo() const; @@ -275,9 +270,7 @@ class BrotliDecoderContext final : public BrotliContext { public: void Close(); void DoThreadPoolWork(); - CompressionError Init(brotli_alloc_func alloc, - brotli_free_func free, - void* opaque); + CompressionError Init(); CompressionError ResetStream(); CompressionError SetParams(int key, uint32_t value); CompressionError GetErrorInfo() const; @@ -783,8 +776,6 @@ class ZlibStream final : public CompressionStream { wrap->InitStream(write_result, write_js_callback); AllocScope alloc_scope(wrap); - wrap->context()->SetAllocationFunctions( - AllocForZlib, FreeForZlib, wrap->as_allocator_opaque_value()); wrap->context()->Init(level, window_bits, mem_level, strategy, std::move(dictionary)); } @@ -846,10 +837,7 @@ class BrotliCompressionStream final : wrap->InitStream(write_result, write_js_callback); AllocScope alloc_scope(wrap); - CompressionError err = wrap->context()->Init( - CompressionStream::AllocForBrotli, - CompressionStream::FreeForZlib, - wrap->as_allocator_opaque_value()); + CompressionError err = wrap->context()->Init(); if (err.IsError()) { wrap->EmitError(err); // TODO(addaleax): Sometimes we generate better error codes in C++ land, @@ -1208,19 +1196,15 @@ CompressionError ZlibContext::ResetStream() { return SetDictionary(); } - -void ZlibContext::SetAllocationFunctions(alloc_func alloc, - free_func free, - void* opaque) { - strm_.zalloc = alloc; - strm_.zfree = free; - strm_.opaque = opaque; -} - - void ZlibContext::Init( int level, int window_bits, int mem_level, int strategy, std::vector&& dictionary) { + // Set allocation functions + strm_.zalloc = CompressionStreamMemoryOwner::AllocForZlib; + strm_.zfree = CompressionStreamMemoryOwner::FreeForZlib; + strm_.opaque = + CompressionStream::AllocatorOpaquePointerForContext(this); + if (!((window_bits == 0) && (mode_ == INFLATE || mode_ == GUNZIP || @@ -1402,12 +1386,12 @@ void BrotliEncoderContext::Close() { mode_ = NONE; } -CompressionError BrotliEncoderContext::Init(brotli_alloc_func alloc, - brotli_free_func free, - void* opaque) { - alloc_ = alloc; - free_ = free; - alloc_opaque_ = opaque; +CompressionError BrotliEncoderContext::Init() { + brotli_alloc_func alloc = CompressionStreamMemoryOwner::AllocForBrotli; + brotli_free_func free = CompressionStreamMemoryOwner::FreeForZlib; + void* opaque = + CompressionStream::AllocatorOpaquePointerForContext( + this); state_.reset(BrotliEncoderCreateInstance(alloc, free, opaque)); if (!state_) { return CompressionError("Could not initialize Brotli instance", @@ -1419,7 +1403,7 @@ CompressionError BrotliEncoderContext::Init(brotli_alloc_func alloc, } CompressionError BrotliEncoderContext::ResetStream() { - return Init(alloc_, free_, alloc_opaque_); + return Init(); } CompressionError BrotliEncoderContext::SetParams(int key, uint32_t value) { @@ -1467,12 +1451,12 @@ void BrotliDecoderContext::DoThreadPoolWork() { } } -CompressionError BrotliDecoderContext::Init(brotli_alloc_func alloc, - brotli_free_func free, - void* opaque) { - alloc_ = alloc; - free_ = free; - alloc_opaque_ = opaque; +CompressionError BrotliDecoderContext::Init() { + brotli_alloc_func alloc = CompressionStreamMemoryOwner::AllocForBrotli; + brotli_free_func free = CompressionStreamMemoryOwner::FreeForZlib; + void* opaque = + CompressionStream::AllocatorOpaquePointerForContext( + this); state_.reset(BrotliDecoderCreateInstance(alloc, free, opaque)); if (!state_) { return CompressionError("Could not initialize Brotli instance", @@ -1484,7 +1468,7 @@ CompressionError BrotliDecoderContext::Init(brotli_alloc_func alloc, } CompressionError BrotliDecoderContext::ResetStream() { - return Init(alloc_, free_, alloc_opaque_); + return Init(); } CompressionError BrotliDecoderContext::SetParams(int key, uint32_t value) { From 5110ea404674a9b32dcd18f949db11331121e923 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 7 Feb 2026 20:05:29 +0800 Subject: [PATCH 4/5] src: track allocations made by zstd streams This is both valuable as a diagnostic tool and as a way to inform the JS runtime about external allocations. Currently, this is a feature only enabled in the statically linked builds of zstd, so with `--shared-zstd`, we fall back to the non-tracking variant. --- node.gypi | 1 + src/node_zlib.cc | 22 ++++++++++++++ test/pummel/test-heapdump-zstd.js | 49 +++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+) create mode 100644 test/pummel/test-heapdump-zstd.js diff --git a/node.gypi b/node.gypi index ecf118efc2cad2..6587270c9e52f3 100644 --- a/node.gypi +++ b/node.gypi @@ -248,6 +248,7 @@ [ 'node_shared_zstd=="false"', { 'dependencies': [ 'deps/zstd/zstd.gyp:zstd' ], + 'defines': [ 'NODE_BUNDLED_ZSTD' ], }], [ 'OS=="mac"', { diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 8a5615b69507d5..8a88b400128cd2 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -19,6 +19,10 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +#ifdef NODE_BUNDLED_ZLIB +#define ZSTD_STATIC_LINKING_ONLY +#endif + #include "memory_tracker-inl.h" #include "node.h" #include "node_buffer.h" @@ -1549,7 +1553,16 @@ void ZstdCompressContext::Close() { CompressionError ZstdCompressContext::Init(uint64_t pledged_src_size, std::string_view dictionary) { pledged_src_size_ = pledged_src_size; +#ifdef NODE_BUNDLED_ZSTD + ZSTD_customMem custom_mem = { + CompressionStreamMemoryOwner::AllocForBrotli, + CompressionStreamMemoryOwner::FreeForZlib, + CompressionStream::AllocatorOpaquePointerForContext( + this)}; + cctx_.reset(ZSTD_createCCtx_advanced(custom_mem)); +#else cctx_.reset(ZSTD_createCCtx()); +#endif if (!cctx_) { return CompressionError("Could not initialize zstd instance", "ERR_ZLIB_INITIALIZATION_FAILED", @@ -1604,7 +1617,16 @@ void ZstdDecompressContext::Close() { CompressionError ZstdDecompressContext::Init(uint64_t pledged_src_size, std::string_view dictionary) { +#ifdef NODE_BUNDLED_ZSTD + ZSTD_customMem custom_mem = { + CompressionStreamMemoryOwner::AllocForBrotli, + CompressionStreamMemoryOwner::FreeForZlib, + CompressionStream< + ZstdDecompressContext>::AllocatorOpaquePointerForContext(this)}; + dctx_.reset(ZSTD_createDCtx_advanced(custom_mem)); +#else dctx_.reset(ZSTD_createDCtx()); +#endif if (!dctx_) { return CompressionError("Could not initialize zstd instance", "ERR_ZLIB_INITIALIZATION_FAILED", diff --git a/test/pummel/test-heapdump-zstd.js b/test/pummel/test-heapdump-zstd.js new file mode 100644 index 00000000000000..5f642202e3fb88 --- /dev/null +++ b/test/pummel/test-heapdump-zstd.js @@ -0,0 +1,49 @@ +'use strict'; +// This tests heap snapshot integration of zlib stream. + +const common = require('../common'); +const assert = require('assert'); +const { validateByRetainingPath, validateByRetainingPathFromNodes } = require('../common/heap'); +const zlib = require('zlib'); + +// Before zstd stream is created, no ZstdStream should be created. +{ + const nodes = validateByRetainingPath('Node / ZstdStream', []); + assert.strictEqual(nodes.length, 0); +} + +const compress = zlib.createZstdCompress(); + +// After zstd stream is created, a ZstdStream should be created. +{ + const streams = validateByRetainingPath('Node / ZstdStream', []); + validateByRetainingPathFromNodes(streams, 'Node / ZstdStream', [ + { node_name: 'ZstdCompress', edge_name: 'native_to_javascript' }, + ]); + const withMemory = validateByRetainingPathFromNodes(streams, 'Node / ZstdStream', [ + { node_name: 'Node / zlib_memory', edge_name: 'zlib_memory' }, + ], true); + if (process.config.variables.node_shared_zstd === true) { + assert.strictEqual(withMemory.length, 0); + } else { + assert.strictEqual(withMemory.length, 1); + // Between 1KB and 1MB (measured value was around ~5kB) + assert.ok(withMemory[0].self_size > 1024); + assert.ok(withMemory[0].self_size < 1024 * 1024); + } +} + +// After zstd stream is written, zlib_memory is significantly larger. +compress.write('hello world', common.mustCall(() => { + const streams = validateByRetainingPath('Node / ZstdStream', []); + const withMemory = validateByRetainingPathFromNodes(streams, 'Node / ZstdStream', [ + { node_name: 'Node / zlib_memory', edge_name: 'zlib_memory' }, + ], true); + if (process.config.variables.node_shared_zstd === true) { + assert.strictEqual(withMemory.length, 0); + } else { + assert.strictEqual(withMemory.length, 1); + // More than 1MB + assert.ok(withMemory[0].self_size > 1024 * 1024); + } +})); From 26450c0bdad63564fe3676c0ff9980a5eef5cc2d Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 9 Feb 2026 10:33:40 +0100 Subject: [PATCH 5/5] fixup! src: track allocations made by zstd streams --- src/node_zlib.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 8a88b400128cd2..d3bd0f6f6540b4 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -19,7 +19,7 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. -#ifdef NODE_BUNDLED_ZLIB +#ifdef NODE_BUNDLED_ZSTD #define ZSTD_STATIC_LINKING_ONLY #endif