feat(sdk): add client side compression for high-level rust API #2529
base: master
Conversation
#[test]
fn test_emtpy_input_compression_decompression() {
typo: emtpy → empty
pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
Please use MAX_PAYLOAD_SIZE for the maximum payload size; we don't want to unpack 10 GB from a couple of KBs :)
good point, done
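
For context, a minimal sketch of that kind of guard (not the PR's implementation), using flate2's gzip decoder; decompress_bounded and the constant's value are illustrative, while the real MAX_PAYLOAD_SIZE comes from the iggy crate:

use std::io::Read;

use flate2::read::GzDecoder;

// Placeholder value for this sketch; the real MAX_PAYLOAD_SIZE constant lives in the iggy crates.
const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024;

// Decompress gzip data, but refuse outputs larger than MAX_PAYLOAD_SIZE
// so a few KBs of input cannot expand into gigabytes (decompression bomb).
fn decompress_bounded(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let decoder = GzDecoder::new(data);
    let mut out = Vec::new();
    // Read at most MAX_PAYLOAD_SIZE + 1 bytes so an oversized stream is detected
    // instead of being silently truncated.
    decoder
        .take(MAX_PAYLOAD_SIZE as u64 + 1)
        .read_to_end(&mut out)?;
    if out.len() > MAX_PAYLOAD_SIZE {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "decompressed payload exceeds MAX_PAYLOAD_SIZE",
        ));
    }
    Ok(out)
}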
hubcio left a comment:
I don't have a hard opinion on it, but maybe you could move the compression/decompression logic into a separate codec layer, for example:
// core/common/src/types/compression/codec.rs
pub struct MessageCodec {
    config: ClientCompressionConfig,
}

impl MessageCodec {
    pub fn maybe_compress(&self, message: &mut IggyMessage) -> Result<(), IggyError> { ... }
    pub fn maybe_decompress(&self, message: &mut IggyMessage) -> Result<(), IggyError> { ... }
}
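
For illustration only, a hypothetical call site on the producer path; the new() constructor and the surrounding variables are assumptions, not part of the suggestion above:

// Sketch: build the codec once from the client's compression config,
// then run every outgoing message through it before sending.
let codec = MessageCodec::new(client_compression_config);
for message in messages.iter_mut() {
    codec.maybe_compress(message)?;
}
// The consumer path would symmetrically call maybe_decompress on received messages.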
I also ran some benchmarks using iggy-bench, which I add here:

iggy-bench -k -H --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
iggy-bench -k -H -c gzip --compression-min-size 200 --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
iggy-bench -k -H -c zstd --compression-min-size 200 --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
iggy-bench -k -H -c lz4 --compression-min-size 200 --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
iggy-bench -k -H -c snappy --compression-min-size 200 --message-size 4000 --message-batches 500 pinned-producer --streams 10 --producers 10 tcp
hubcio left a comment:
Code looks better. However, I'm not a fan of the unwraps in consumer.rs and producer.rs.
Could you please add helper methods in compression_algorithm.rs, something like:
impl CompressionAlgorithm {
    pub fn header_key() -> HeaderKey {
        HeaderKey::new(COMPRESSION_HEADER_KEY).expect("COMPRESSION_HEADER_KEY is valid")
    }

    /// Convert algorithm to header value
    pub fn to_header_value(&self) -> HeaderValue {
        HeaderValue::from_str(&self.to_string()).expect("algorithm name is valid")
    }

    /// Parse from header value
    pub fn from_header_value(value: &HeaderValue) -> Result<Self, IggyError> {
        let name = value.as_str()
            .ok_or_else(|| IggyError::DecompressionError("compression header is not a string".into()))?;
        Self::from_str(name)
            .map_err(|_| IggyError::DecompressionError(format!("unknown compression algorithm: {}", name)))
    }
}

(or introduce a new IggyError type, up to you)
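
For illustration, a hypothetical call site showing how these helpers would replace the unwraps, assuming user headers behave like a map from HeaderKey to HeaderValue:

// Producer side (sketch): attach the compression header without unwrap().
user_headers.insert(
    CompressionAlgorithm::header_key(),
    algorithm.to_header_value(),
);

// Consumer side (sketch): resolve the algorithm, propagating the error instead of unwrapping.
let algorithm = match user_headers.get(&CompressionAlgorithm::header_key()) {
    Some(value) => Some(CompressionAlgorithm::from_header_value(value)?),
    None => None,
};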
Also, it seems like this bench is broken.
Implementing Client Side (De-)compression
Closes #30
- ClientCompressionConfig based on CompressionAlgorithm with min_size
- compressor aka ClientCompressionConfig
- the compression algorithm is carried in user-headers in IggyMessage
- consumer checks for the iggy-compression user header and decompresses if present

@hubcio Opened as a Draft PR. If the implementation is fine, I would extend iggy-bench accordingly and run benchmarks.
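
As a rough illustration of the intended usage (everything beyond ClientCompressionConfig, CompressionAlgorithm, min_size, and the iggy-compression header is a guess, not the PR's actual API):

// Hypothetical configuration: compress payloads larger than 200 bytes with gzip.
let compression = ClientCompressionConfig {
    algorithm: CompressionAlgorithm::Gzip,
    min_size: 200,
};
// The producer compresses qualifying payloads and marks them with the
// iggy-compression user header; the consumer decompresses when that header is present.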