Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 196 additions & 56 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.PresenceSerializer;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.PublishResult;
import io.ably.lib.util.Listeners;
import io.ably.lib.util.Log;
import io.ably.lib.util.StringUtils;

Expand Down Expand Up @@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws
return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId));
}

void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) {
void addPendingPresence(PresenceMessage presenceMessage, Callback<PublishResult> listener) {
synchronized(channel) {
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener);
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener));
pendingPresence.add(queuedPresence);
}
}
Expand Down Expand Up @@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
message.presence = new PresenceMessage[] { msg };
ConnectionManager connectionManager = ably.connection.connectionManager;
connectionManager.send(message, ably.options.queueMessages, listener);
connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
break;
default:
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001));
Expand Down Expand Up @@ -892,7 +894,7 @@ private void sendQueuedMessages() {
pendingPresence.clear();

try {
connectionManager.send(message, queueMessages, listener);
connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener));
} catch(AblyException e) {
Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e);
if(listener != null)
Expand Down
181 changes: 157 additions & 24 deletions lib/src/main/java/io/ably/lib/rest/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.ably.lib.types.Param;
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.PresenceSerializer;
import io.ably.lib.types.PublishResult;
import io.ably.lib.types.UpdateDeleteResult;
import io.ably.lib.util.Crypto;

/**
Expand Down Expand Up @@ -65,6 +67,23 @@ void publish(Http http, String name, Object data) throws AblyException {
publishImpl(http, name, data).sync();
}

/**
* Publish a message on this channel using the REST API and return the result.
* Since the REST API is stateless, this request is made independently
* of any other request on this or any other channel.
* @param name the event name
* @param data the message payload;
* @return A {@link PublishResult} containing the message serial(s)
* @throws AblyException
*/
public PublishResult publishWithResult(String name, Object data) throws AblyException {
return publishWithResult(ably.http, name, data);
}

PublishResult publishWithResult(Http http, String name, Object data) throws AblyException {
return publishWithResultImpl(http, name, data).sync();
}

/**
* Publish a message on this channel using the REST API.
* Since the REST API is stateless, this request is made independently
Expand All @@ -75,7 +94,9 @@ void publish(Http http, String name, Object data) throws AblyException {
* @param listener a listener to be notified of the outcome of this message.
* <p>
* This listener is invoked on a background thread.
* @deprecated Use {@link #publishAsync(String, Object, Callback)} instead.
*/
@Deprecated
public void publishAsync(String name, Object data, CompletionListener listener) {
publishAsync(ably.http, name, data, listener);
}
Expand All @@ -84,10 +105,33 @@ void publishAsync(Http http, String name, Object data, CompletionListener listen
publishImpl(http, name, data).async(new CompletionListener.ToCallback(listener));
}

/**
* Asynchronously publish a message on this channel using the REST API.
* Since the REST API is stateless, this request is made independently
* of any other request on this or any other channel.
*
* @param name the event name
* @param data the message payload;
* @param callback a callback to be notified of the outcome of this message with the {@link PublishResult}.
* <p>
* This callback is invoked on a background thread.
*/
public void publishAsync(String name, Object data, Callback<PublishResult> callback) {
publishAsync(ably.http, name, data, callback);
}

void publishAsync(Http http, String name, Object data, Callback<PublishResult> callback) {
publishWithResultImpl(http, name, data).async(callback);
}

private Http.Request<Void> publishImpl(Http http, String name, Object data) {
return publishImpl(http, new Message[] {new Message(name, data)});
}

private Http.Request<PublishResult> publishWithResultImpl(Http http, String name, Object data) {
return publishWithResultImpl(http, new Message[] {new Message(name, data)});
}

/**
* Publish an array of messages on this channel. When there are
* multiple messages to be sent, it is more efficient to use this
Expand Down Expand Up @@ -149,6 +193,43 @@ public void execute(HttpScheduler http, final Callback<Void> callback) throws Ab
});
}

private Http.Request<PublishResult> publishWithResultImpl(Http http, final Message[] messages) {
return http.request(new Http.Execute<PublishResult>() {
@Override
public void execute(HttpScheduler http, final Callback<PublishResult> callback) throws AblyException {
/* handle message ids */
boolean hasClientSuppliedId = false;
for(Message message : messages) {
/* RSL1k2 */
hasClientSuppliedId |= (message.id != null);
/* RTL6g3 */
ably.auth.checkClientId(message, true, false);
message.encode(options);
}
if(!hasClientSuppliedId && ably.options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomId();
for (int i = 0; i < messages.length; i++) {
messages[i].id = messageId + ':' + i;
}
}

HttpCore.RequestBody requestBody = ably.options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(messages) : MessageSerializer.asJsonRequest(messages);
final Param[] params = ably.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c

// Create ResponseHandler from BodyHandler
HttpCore.BodyHandler<String> bodyHandler = PublishResult.getBodyHandler();
HttpCore.ResponseHandler<PublishResult> responseHandler = (response, error) -> {
if (error != null) throw AblyException.fromErrorInfo(error);
String[] serials = bodyHandler.handleResponseBody(response.contentType, response.body);
return new PublishResult(serials);
};

http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, requestBody, responseHandler, true, callback);
}
});
}

/**
* Obtain recent history for this channel using the REST API.
* The history provided relqtes to all clients of this application,
Expand Down Expand Up @@ -352,9 +433,10 @@ public void getMessageAsync(String serial, Callback<Message> callback) {
* Only non-null fields will be applied to the existing message.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @throws AblyException If the update operation fails.
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
*/
public void updateMessage(Message message, MessageOperation operation) throws AblyException {
messageEditsMixin.updateMessage(ably.http, message, operation);
public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException {
return messageEditsMixin.updateMessage(ably.http, message, operation);
}

/**
Expand All @@ -366,34 +448,35 @@ public void updateMessage(Message message, MessageOperation operation) throws Ab
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* Only non-null fields will be applied to the existing message.
* @throws AblyException If the update operation fails.
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
*/
public void updateMessage(Message message) throws AblyException {
updateMessage(message, null);
public UpdateDeleteResult updateMessage(Message message) throws AblyException {
return updateMessage(message, null);
}

/**
* Asynchronously updates an existing message.
*
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @param listener A listener to be notified of the outcome of this operation.
* @param callback A callback to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
* This callback is invoked on a background thread.
*/
public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) {
messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener);
public void updateMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback);
}

/**
* Asynchronously updates an existing message.
*
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* @param listener A listener to be notified of the outcome of this operation.
* @param callback A callback to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
* This callback is invoked on a background thread.
*/
public void updateMessageAsync(Message message, CompletionListener listener) {
updateMessageAsync(message, null, listener);
public void updateMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
updateMessageAsync(message, null, callback);
}

/**
Expand All @@ -406,9 +489,10 @@ public void updateMessageAsync(Message message, CompletionListener listener) {
* @param message A {@link Message} message containing the serial identifier.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @throws AblyException If the delete operation fails.
* @return A {@link UpdateDeleteResult} containing the deleted message version serial.
*/
public void deleteMessage(Message message, MessageOperation operation) throws AblyException {
messageEditsMixin.deleteMessage(ably.http, message, operation);
public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException {
return messageEditsMixin.deleteMessage(ably.http, message, operation);
}

/**
Expand All @@ -420,34 +504,83 @@ public void deleteMessage(Message message, MessageOperation operation) throws Ab
*
* @param message A {@link Message} message containing the serial identifier.
* @throws AblyException If the delete operation fails.
* @return A {@link UpdateDeleteResult} containing the deleted message version serial.
*/
public void deleteMessage(Message message) throws AblyException {
deleteMessage(message, null);
public UpdateDeleteResult deleteMessage(Message message) throws AblyException {
return deleteMessage(message, null);
}

/**
* Asynchronously marks a message as deleted.
*
* @param message A {@link Message} object containing the serial identifier and operation metadata.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @param listener A listener to be notified of the outcome of this operation.
* @param callback A callback to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
* This callback is invoked on a background thread.
*/
public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) {
messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener);
public void deleteMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback);
}

/**
* Asynchronously marks a message as deleted.
*
* @param message A {@link Message} object containing the serial identifier and operation metadata.
* @param listener A listener to be notified of the outcome of this operation.
* @param callback A callback to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
* This callback is invoked on a background thread.
*/
public void deleteMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
deleteMessageAsync(message, null, callback);
}

/**
* Appends message text to the end of the message.
*
* @param message A {@link Message} object containing the serial identifier and data to append.
* @param operation operation details such as clientId, description, or metadata
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
* @throws AblyException If the append operation fails.
*/
public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException {
return messageEditsMixin.appendMessage(ably.http, message, operation);
}

/**
* Appends message text to the end of the message.
*
* @param message A {@link Message} object containing the serial identifier and data to append.
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
* @throws AblyException If the append operation fails.
*/
public UpdateDeleteResult appendMessage(Message message) throws AblyException {
return appendMessage(message, null);
}

/**
* Asynchronously appends message text to the end of the message.
*
* @param message A {@link Message} object containing the serial identifier and data to append.
* @param operation operation details such as clientId, description, or metadata
* @param callback A callback to be notified of the outcome of this operation.
* <p>
* This callback is invoked on a background thread.
*/
public void appendMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback);
}

/**
* Asynchronously appends message text to the end of the message.
*
* @param message A {@link Message} object containing the serial identifier and data to append.
* @param callback A callback to be notified of the outcome of this operation.
* <p>
* This callback is invoked on a background thread.
*/
public void deleteMessageAsync(Message message, CompletionListener listener) {
deleteMessageAsync(message, null, listener);
public void appendMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
appendMessageAsync(message, null, callback);
}

/**
Expand Down
Loading
Loading