
Conversation

@haubur commented Dec 30, 2025

Implementing client-side (de-)compression

Closes #30

  • adds ClientCompressionConfig, based on CompressionAlgorithm, with a min_size threshold
  • the client producer can be configured with a compressor via ClientCompressionConfig
  • the compression type is added to the user headers of IggyMessage
  • the consumer checks for the iggy-compression user header and decompresses the payload if it is present
  • adds unit tests for the compression algorithms
  • adds an integration test for compression

@hubcio Opened as a draft PR. If the implementation is fine, I would extend iggy-bench accordingly and run benchmarks.
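
To make the shape of the feature concrete, here is a minimal standalone sketch of the configuration described in the bullet points above. The type and field names mirror the description, but they are written as self-contained illustrations here, not the PR's actual SDK definitions.

  // Standalone illustration only; not the PR's actual definitions.
  #[derive(Clone, Copy, Debug, PartialEq, Eq)]
  pub enum CompressionAlgorithm {
      None,
      Gzip,
      Zstd,
      Lz4,
      Snappy,
  }

  // Client-side compression settings: an algorithm plus a minimum payload
  // size (in bytes) below which messages are sent uncompressed.
  pub struct ClientCompressionConfig {
      pub algorithm: CompressionAlgorithm,
      pub min_size: usize,
  }

  impl ClientCompressionConfig {
      // The producer would consult this before compressing a payload.
      pub fn should_compress(&self, payload_len: usize) -> bool {
          self.algorithm != CompressionAlgorithm::None && payload_len >= self.min_size
      }
  }

  fn main() {
      let config = ClientCompressionConfig {
          algorithm: CompressionAlgorithm::Lz4,
          min_size: 200,
      };
      assert!(!config.should_compress(64));  // small payloads bypass compression
      assert!(config.should_compress(4000)); // large payloads get compressed
  }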

}

#[test]
fn test_emtpy_input_compression_decompression() {
Contributor:

typo

}
}

pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
Contributor:

Please use MAX_PAYLOAD_SIZE as the maximum payload size here, we don't want to unpack 10 GB from a couple of KBs :)
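
For context, a minimal sketch of the kind of guard being asked for, using flate2's gzip decoder as one possible example. The constant value and function name are placeholders, not the PR's actual code; the real limit would come from the SDK's MAX_PAYLOAD_SIZE.

  use std::io::Read;

  // Placeholder value; the real limit is the SDK's MAX_PAYLOAD_SIZE.
  const MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024;

  // Bounding the reader with `take` caps how many decompressed bytes can ever
  // be produced, so a few malicious KBs cannot expand into gigabytes.
  fn decompress_gzip_bounded(data: &[u8]) -> std::io::Result<Vec<u8>> {
      let mut decoder = flate2::read::GzDecoder::new(data).take(MAX_PAYLOAD_SIZE + 1);
      let mut out = Vec::new();
      decoder.read_to_end(&mut out)?;
      if out.len() as u64 > MAX_PAYLOAD_SIZE {
          return Err(std::io::Error::new(
              std::io::ErrorKind::InvalidData,
              "decompressed payload exceeds MAX_PAYLOAD_SIZE",
          ));
      }
      Ok(out)
  }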

Author:

good point, done

@hubcio left a comment

I don't have a hard opinion on it, but maybe you could move compression/decompression into a separate layer, like a codec:

  // core/common/src/types/compression/codec.rs
  pub struct MessageCodec {
      config: ClientCompressionConfig,
  }

  impl MessageCodec {
      pub fn maybe_compress(&self, message: &mut IggyMessage) -> Result<(), IggyError> { ... }
      pub fn maybe_decompress(&self, message: &mut IggyMessage) -> Result<(), IggyError> { ... }
  }
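
If that layout were adopted, the producer send path might look roughly like this. This is illustrative only, reusing the stub types from the suggestion above; it is not actual SDK code.

  // Illustrative: compress each outgoing message in place before sending.
  fn prepare_batch(
      codec: &MessageCodec,
      messages: &mut [IggyMessage],
  ) -> Result<(), IggyError> {
      for message in messages.iter_mut() {
          // No-op for payloads below min_size or when compression is disabled.
          codec.maybe_compress(message)?;
      }
      Ok(())
  }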

@haubur commented Jan 8, 2026

I have also run some benchmarks using iggy-bench, which I add here.

iggy-bench -k -H --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
Producers Results: Total throughput: 527.52 MB/s, 131879 messages/s, average throughput per Producer: 52.75 MB/s, p50 latency: 58.02 ms, p90 latency: 87.00 ms, p95 latency: 101.18 ms, p99 latency: 528.91 ms, p999 latency: 1119.48 ms, p9999 latency: 1228.77 ms, average latency: 77.08 ms, median latency: 58.02 ms, min: 32.34 ms, max: 1765.82 ms, std dev: 47.53 ms, total time: 41.72 s

iggy-bench -k -H -c gzip --compression-min-size 200 --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
Producers Results: Total throughput: 1866.32 MB/s, 466577 messages/s, average throughput per Producer: 186.63 MB/s, p50 latency: 20.77 ms, p90 latency: 25.86 ms, p95 latency: 27.82 ms, p99 latency: 32.08 ms, p999 latency: 37.21 ms, p9999 latency: 38.78 ms, average latency: 21.38 ms, median latency: 20.77 ms, min: 14.31 ms, max: 48.67 ms, std dev: 1.19 ms, total time: 10.95 s

iggy-bench -k -H -c zstd --compression-min-size 200 message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
Producers Results: Total throughput: 1154.30 MB/s, 288630 messages/s, average throughput per Producer: 115.43 MB/s, p50 latency: 33.61 ms, p90 latency: 44.58 ms, p95 latency: 47.28 ms, p99 latency: 51.46 ms, p999 latency: 62.09 ms, p9999 latency: 63.95 ms, average latency: 34.57 ms, median latency: 33.61 ms, min: 14.44 ms, max: 70.95 ms, std dev: 2.45 ms, total time: 17.66 s

iggy-bench -k -H -c lz4 --compression-min-size 200 message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
Producers Results: Total throughput: 3124.63 MB/s, 781157 messages/s, average throughput per Producer: 312.46 MB/s, p50 latency: 11.90 ms, p90 latency: 17.73 ms, p95 latency: 18.67 ms, p99 latency: 21.08 ms, p999 latency: 31.94 ms, p9999 latency: 39.10 ms, average latency: 12.73 ms, median latency: 11.90 ms, min: 6.24 ms, max: 97.69 ms, std dev: 2.12 ms, total time: 6.66 s

iggy-bench -k -H -c snappy --compression-min-size 200 message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
Producers Results: Total throughput: 2671.81 MB/s, 667961 messages/s, average throughput per Producer: 267.18 MB/s, p50 latency: 13.63 ms, p90 latency: 21.14 ms, p95 latency: 23.98 ms, p99 latency: 29.64 ms, p999 latency: 37.00 ms, p9999 latency: 38.89 ms, average latency: 14.94 ms, median latency: 13.63 ms, min: 7.28 ms, max: 58.25 ms, std dev: 3.21 ms, total time: 7.76 s

@haubur marked this pull request as ready for review on January 8, 2026 at 18:52
@hubcio left a comment

The code looks better. However, I'm not a fan of the unwraps in consumer.rs and producer.rs.
Could you please add helper methods in compression_algorithm.rs, something like:

impl CompressionAlgorithm {
    pub fn header_key() -> HeaderKey {
        HeaderKey::new(COMPRESSION_HEADER_KEY).expect("COMPRESSION_HEADER_KEY is valid")
    }

    /// Convert algorithm to header value
    pub fn to_header_value(&self) -> HeaderValue {
        HeaderValue::from_str(&self.to_string()).expect("algorithm name is valid")
    }

    /// Parse from header value
    pub fn from_header_value(value: &HeaderValue) -> Result<Self, IggyError> {
        let name = value.as_str()
            .ok_or_else(|| IggyError::DecompressionError("compression header is not a string".into()))?;
        Self::from_str(name)
            .map_err(|_| IggyError::DecompressionError(format!("unknown compression algorithm: {}", name)))
    }
}

(or introduce a new IggyError type, up to you)
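
For illustration, the consumer side could then look roughly like this, with a malformed header surfacing as an error instead of a panic. The header-access code below is a placeholder, not the actual consumer.rs.

  // Illustrative consumer-side check using the helpers suggested above.
  fn detect_compression(
      user_headers: &std::collections::HashMap<HeaderKey, HeaderValue>,
  ) -> Result<Option<CompressionAlgorithm>, IggyError> {
      match user_headers.get(&CompressionAlgorithm::header_key()) {
          // No header: the payload was sent uncompressed.
          None => Ok(None),
          // Unknown or malformed values become IggyError instead of unwrap panics.
          Some(value) => CompressionAlgorithm::from_header_value(value).map(Some),
      }
  }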

@hubcio commented Jan 10, 2026

Also, it seems like this bench is broken:
target/release/iggy-bench -k -H -c lz4 --compression-min-size 200 --message-size 4000 --message-batches 500 pinned-producer-and-consumer tcp
The consumers never finish.

@haubur changed the title from "(feat)/sdk client compression" to "feat(sdk): add client side compression for high-level rust API" on Jan 10, 2026