-
Notifications
You must be signed in to change notification settings - Fork 27
[AIT-196] feat: Rest mutable message support plus protocol v5 changes #659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,10 +10,21 @@ | |
|
|
||
| from ably.http.paginatedresult import PaginatedResult, format_params | ||
| from ably.types.channeldetails import ChannelDetails | ||
| from ably.types.message import Message, make_message_response_handler | ||
| from ably.types.message import ( | ||
| Message, | ||
| MessageAction, | ||
| MessageVersion, | ||
| make_message_response_handler, | ||
| make_single_message_response_handler, | ||
| ) | ||
| from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult | ||
| from ably.types.presence import Presence | ||
| from ably.util.crypto import get_cipher | ||
| from ably.util.exceptions import IncompatibleClientIdException, catch_all | ||
| from ably.util.exceptions import ( | ||
| AblyException, | ||
| IncompatibleClientIdException, | ||
| catch_all, | ||
| ) | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -99,7 +110,17 @@ async def publish_messages(self, messages, params=None, timeout=None): | |
| if params: | ||
| params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()} | ||
| path += '?' + parse.urlencode(params) | ||
| return await self.ably.http.post(path, body=request_body, timeout=timeout) | ||
| response = await self.ably.http.post(path, body=request_body, timeout=timeout) | ||
|
|
||
| # Parse response to extract serials | ||
| try: | ||
| result_data = response.to_native() | ||
| if result_data and isinstance(result_data, dict): | ||
| return PublishResult.from_dict(result_data) | ||
| return PublishResult() | ||
| except Exception: | ||
| # If response parsing fails, return empty PublishResult for backwards compatibility | ||
| return PublishResult() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like quite a significant breaking change - the code previously returns an HTTP I think we need to consider either (1) make this change part of a python 2.0 release, or (2) make |
||
|
|
||
| async def publish_name_data(self, name, data, timeout=None): | ||
| messages = [Message(name, data)] | ||
|
|
@@ -141,6 +162,190 @@ async def status(self): | |
| obj = response.to_native() | ||
| return ChannelDetails.from_dict(obj) | ||
|
|
||
| async def _send_update( | ||
| self, | ||
| message: Message, | ||
| action: MessageAction, | ||
| operation: MessageOperation = None, | ||
| params: dict = None, | ||
| ): | ||
| """Internal method to send update/delete/append operations.""" | ||
| if not message.serial: | ||
| raise AblyException( | ||
| "Message serial is required for update/delete/append operations", | ||
| 400, | ||
| 40000 | ||
| ) | ||
|
|
||
| if not operation: | ||
| version = None | ||
| else: | ||
| version = MessageVersion( | ||
| client_id=operation.client_id, | ||
| description=operation.description, | ||
| metadata=operation.metadata | ||
| ) | ||
|
|
||
| # Create a new message with the operation fields | ||
| update_message = Message( | ||
| name=message.name, | ||
| data=message.data, | ||
| client_id=message.client_id, | ||
| serial=message.serial, | ||
| action=action, | ||
| version=version, | ||
| ) | ||
|
|
||
| # Encrypt if needed | ||
| if self.cipher: | ||
| update_message.encrypt(self.__cipher) | ||
|
|
||
| # Serialize the message | ||
| request_body = update_message.as_dict(binary=self.ably.options.use_binary_protocol) | ||
|
|
||
| if not self.ably.options.use_binary_protocol: | ||
| request_body = json.dumps(request_body, separators=(',', ':')) | ||
| else: | ||
| request_body = msgpack.packb(request_body, use_bin_type=True) | ||
|
|
||
| # Build path with params | ||
| path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':')) | ||
| if params: | ||
| params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()} | ||
| path += '?' + parse.urlencode(params) | ||
|
|
||
| # Send request | ||
| response = await self.ably.http.patch(path, body=request_body) | ||
|
|
||
| # Parse response | ||
| try: | ||
| result_data = response.to_native() | ||
| if result_data and isinstance(result_data, dict): | ||
| return UpdateDeleteResult.from_dict(result_data) | ||
| return UpdateDeleteResult() | ||
| except Exception: | ||
| return UpdateDeleteResult() | ||
|
|
||
| async def update_message(self, message: Message, operation: MessageOperation = None, params: dict = None): | ||
| """Updates an existing message on this channel. | ||
|
|
||
| Parameters: | ||
| - message: Message object to update. Must have a serial field. | ||
| - operation: Optional MessageOperation containing description and metadata for the update. | ||
| - params: Optional dict of query parameters. | ||
|
|
||
| Returns: | ||
| - UpdateDeleteResult containing the version serial of the updated message. | ||
| """ | ||
| return await self._send_update(message, MessageAction.MESSAGE_UPDATE, operation, params) | ||
|
|
||
| async def delete_message(self, message: Message, operation: MessageOperation = None, params: dict = None): | ||
| """Deletes a message on this channel. | ||
|
|
||
| Parameters: | ||
| - message: Message object to delete. Must have a serial field. | ||
| - operation: Optional MessageOperation containing description and metadata for the delete. | ||
| - params: Optional dict of query parameters. | ||
|
|
||
| Returns: | ||
| - UpdateDeleteResult containing the version serial of the deleted message. | ||
| """ | ||
| return await self._send_update(message, MessageAction.MESSAGE_DELETE, operation, params) | ||
|
|
||
| async def append_message(self, message: Message, operation: MessageOperation = None, params: dict = None): | ||
| """Appends data to an existing message on this channel. | ||
|
|
||
| Parameters: | ||
| - message: Message object with data to append. Must have a serial field. | ||
| - operation: Optional MessageOperation containing description and metadata for the append. | ||
| - params: Optional dict of query parameters. | ||
|
|
||
| Returns: | ||
| - UpdateDeleteResult containing the version serial of the appended message. | ||
| """ | ||
| return await self._send_update(message, MessageAction.MESSAGE_APPEND, operation, params) | ||
|
|
||
| async def get_message(self, serial_or_message, timeout=None): | ||
| """Retrieves a single message by its serial. | ||
|
|
||
| Parameters: | ||
| - serial_or_message: Either a string serial or a Message object with a serial field. | ||
|
|
||
| Returns: | ||
| - Message object for the requested serial. | ||
|
|
||
| Raises: | ||
| - AblyException: If the serial is missing or the message cannot be retrieved. | ||
| """ | ||
| # Extract serial from string or Message object | ||
| if isinstance(serial_or_message, str): | ||
| serial = serial_or_message | ||
| elif isinstance(serial_or_message, Message): | ||
| serial = serial_or_message.serial | ||
| else: | ||
| serial = None | ||
|
|
||
| if not serial: | ||
| raise AblyException( | ||
| 'This message lacks a serial. Make sure you have enabled "Message annotations, ' | ||
| 'updates, and deletes" in channel settings on your dashboard.', | ||
| 400, | ||
| 40003 | ||
| ) | ||
|
|
||
| # Build the path | ||
| path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') | ||
|
|
||
| # Make the request | ||
| response = await self.ably.http.get(path, timeout=timeout) | ||
|
|
||
| # Create Message from the response | ||
| message_handler = make_single_message_response_handler(self.__cipher) | ||
| return message_handler(response) | ||
|
|
||
| async def get_message_versions(self, serial_or_message, params=None): | ||
| """Retrieves version history for a message. | ||
|
|
||
| Parameters: | ||
| - serial_or_message: Either a string serial or a Message object with a serial field. | ||
| - params: Optional dict of query parameters for pagination (e.g., limit, start, end, direction). | ||
|
|
||
| Returns: | ||
| - PaginatedResult containing Message objects representing each version. | ||
|
|
||
| Raises: | ||
| - AblyException: If the serial is missing or versions cannot be retrieved. | ||
| """ | ||
| # Extract serial from string or Message object | ||
| if isinstance(serial_or_message, str): | ||
| serial = serial_or_message | ||
| elif isinstance(serial_or_message, Message): | ||
| serial = serial_or_message.serial | ||
| else: | ||
| serial = None | ||
|
|
||
| if not serial: | ||
| raise AblyException( | ||
| 'This message lacks a serial. Make sure you have enabled "Message annotations, ' | ||
| 'updates, and deletes" in channel settings on your dashboard.', | ||
| 400, | ||
| 40003 | ||
| ) | ||
|
|
||
| # Build the path | ||
| params_str = format_params({}, **params) if params else '' | ||
| path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/versions' + params_str | ||
|
|
||
| # Create message handler for decoding | ||
| message_handler = make_message_response_handler(self.__cipher) | ||
|
|
||
| # Return paginated result | ||
| return await PaginatedResult.paginated_query( | ||
| self.ably.http, | ||
| url=path, | ||
| response_processor=message_handler | ||
| ) | ||
|
|
||
| @property | ||
| def ably(self): | ||
| return self.__ably | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems a bit unsafe to catch all exceptions and hide them completely. at the very least we should log the exception somewhere, but it would be better to narrow which exceptions we're catching