Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 120 additions & 69 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.PresenceSerializer;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.PublishResult;
import io.ably.lib.util.Listeners;
import io.ably.lib.util.Log;
import io.ably.lib.util.StringUtils;

Expand Down Expand Up @@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws
return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId));
}

void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) {
void addPendingPresence(PresenceMessage presenceMessage, Callback<PublishResult> listener) {
synchronized(channel) {
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener);
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener));
pendingPresence.add(queuedPresence);
}
}
Expand Down Expand Up @@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
message.presence = new PresenceMessage[] { msg };
ConnectionManager connectionManager = ably.connection.connectionManager;
connectionManager.send(message, ably.options.queueMessages, listener);
connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
break;
default:
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001));
Expand Down Expand Up @@ -892,7 +894,7 @@ private void sendQueuedMessages() {
pendingPresence.clear();

try {
connectionManager.send(message, queueMessages, listener);
connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener));
} catch(AblyException e) {
Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e);
if(listener != null)
Expand Down
26 changes: 16 additions & 10 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
import io.ably.lib.transport.ITransport.TransportParams;
import io.ably.lib.transport.NetworkConnectivity.NetworkConnectivityListener;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.Callback;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ConnectionDetails;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ProtocolSerializer;
import io.ably.lib.types.PublishResult;
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.ReconnectionStrategy;
import org.jetbrains.annotations.Nullable;

public class ConnectionManager implements ConnectListener {
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -1403,7 +1406,7 @@ private synchronized void onError(ProtocolMessage message) {
}

private void onAck(ProtocolMessage message) {
pendingMessages.ack(message.msgSerial, message.count, message.error);
pendingMessages.ack(message.msgSerial, message.count, message.res, message.error);
}

private void onNack(ProtocolMessage message) {
Expand Down Expand Up @@ -1724,14 +1727,14 @@ protected void setLastActivity(long lastActivityTime) {

public static class QueuedMessage {
public final ProtocolMessage msg;
public final CompletionListener listener;
public QueuedMessage(ProtocolMessage msg, CompletionListener listener) {
public final Callback<PublishResult> listener;
public QueuedMessage(ProtocolMessage msg, Callback<PublishResult> listener) {
this.msg = msg;
this.listener = listener;
}
}

public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException {
public void send(ProtocolMessage msg, boolean queueEvents, Callback<PublishResult> listener) throws AblyException {
State state;
synchronized(this) {
state = this.currentState;
Expand All @@ -1747,7 +1750,7 @@ public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener li
throw AblyException.fromErrorInfo(state.defaultErrorInfo);
}

private void sendImpl(ProtocolMessage message, CompletionListener listener) throws AblyException {
private void sendImpl(ProtocolMessage message, Callback<PublishResult> listener) throws AblyException {
if(transport == null) {
Log.v(TAG, "sendImpl(): Discarding message; transport unavailable");
return;
Expand Down Expand Up @@ -1825,7 +1828,7 @@ public synchronized void push(QueuedMessage msg) {
queue.add(msg);
}

public void ack(long msgSerial, int count, ErrorInfo reason) {
public void ack(long msgSerial, int count, @Nullable PublishResult[] results, ErrorInfo reason) {
QueuedMessage[] ackMessages = null, nackMessages = null;
synchronized(this) {
if (queue.isEmpty()) return;
Expand Down Expand Up @@ -1867,11 +1870,14 @@ public void ack(long msgSerial, int count, ErrorInfo reason) {
}
}
if(ackMessages != null) {
for(QueuedMessage msg : ackMessages) {
for (int i = 0; i < ackMessages.length; i++) {
QueuedMessage msg = ackMessages[i];
try {
if(msg.listener != null)
msg.listener.onSuccess();
} catch(Throwable t) {
if (msg.listener != null) {
PublishResult messageResult = results != null && results.length > i ? results[i] : null;
msg.listener.onSuccess(messageResult);
}
} catch (Throwable t) {
Log.e(TAG, "ack(): listener exception", t);
}
}
Expand Down
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public ProtocolMessage(Action action, String channel) {
@JsonAdapter(ObjectsJsonSerializer.class)
public Object[] state;

public @Nullable PublishResult[] res;

public boolean hasFlag(final Flag flag) {
return (flags & flag.getMask()) == flag.getMask();
}
Expand All @@ -161,6 +163,7 @@ void writeMsgpack(MessagePacker packer) throws IOException {
if(channelSerial != null) ++fieldCount;
if(annotations != null) ++fieldCount;
if(state != null && ObjectsHelper.getSerializer() != null) ++fieldCount;
if(res != null) ++fieldCount;
packer.packMapHeader(fieldCount);
packer.packString("action");
packer.packInt(action.getValue());
Expand Down Expand Up @@ -209,6 +212,10 @@ void writeMsgpack(MessagePacker packer) throws IOException {
Log.w(TAG, "Skipping 'state' field msgpack serialization because ObjectsSerializer not found");
}
}
if (res != null) {
packer.packString("res");
PublishResult.writeMsgpackArray(res, packer);
}
}

ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
Expand Down Expand Up @@ -280,6 +287,9 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
unpacker.skipValue();
}
break;
case "res":
res = PublishResult.readMsgpackArray(unpacker);
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
Expand Down
134 changes: 134 additions & 0 deletions lib/src/main/java/io/ably/lib/types/PublishResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.ably.lib.types;

import io.ably.lib.http.HttpCore;
import io.ably.lib.util.Serialisation;
import org.jetbrains.annotations.Nullable;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

import java.io.IOException;

/**
* Contains the result of a publish operation.
*/
public class PublishResult {

private static final String SERIALS = "serials";

/**
* An array of message serials corresponding 1:1 to the messages that were published.
* A serial may be null if the message was discarded due to a configured conflation rule.
*/
public final @Nullable String[] serials;

public PublishResult(@Nullable String[] serials) {
this.serials = serials;
}

private static PublishResult readFromJson(byte[] packed) throws MessageDecodeException {
return Serialisation.gson.fromJson(new String(packed), PublishResult.class);
}

private static PublishResult readMsgpack(byte[] packed) throws AblyException {
try {
MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed);
return readMsgpack(unpacker);
} catch (IOException ioe) {
throw AblyException.fromThrowable(ioe);
}
}

private static PublishResult readMsgpack(MessageUnpacker unpacker) throws IOException {
int fieldCount = unpacker.unpackMapHeader();
for (int i = 0; i < fieldCount; i++) {
String fieldName = unpacker.unpackString();
MessageFormat fieldFormat = unpacker.getNextFormat();
if (fieldFormat.equals(MessageFormat.NIL)) {
unpacker.unpackNil();
continue;
}

if (fieldName.equals(SERIALS)) {
int count = unpacker.unpackArrayHeader();
String[] serials = new String[count];
for (int j = 0; j < count; j++) {
if (unpacker.getNextFormat().equals(MessageFormat.NIL)) {
unpacker.unpackNil();
serials[j] = null;
} else {
serials[j] = unpacker.unpackString();
}
}
return new PublishResult(serials);
} else {
unpacker.skipValue();
}
}
return new PublishResult(new String[]{});
}

static void writeMsgpackArray(PublishResult[] results, MessagePacker packer) {
try {
int count = results.length;
packer.packArrayHeader(count);
for (PublishResult result : results) {
if (result != null) {
result.writeMsgpack(packer);
} else {
packer.packNil();
}
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}

static PublishResult[] readMsgpackArray(MessageUnpacker unpacker) throws IOException {
int count = unpacker.unpackArrayHeader();
PublishResult[] results = new PublishResult[count];
for (int i = 0; i < count; i++) {
results[i] = readMsgpack(unpacker);
}
return results;
}

public static HttpCore.BodyHandler<String> getBodyHandler() {
return new PublishResultBodyHandler();
}

private void writeMsgpack(MessagePacker packer) throws IOException {
int fieldCount = 0;
if (serials != null) ++fieldCount;
packer.packMapHeader(fieldCount);
if (serials != null) {
packer.packString(SERIALS);
packer.packArrayHeader(serials.length);
for (String serial : serials) {
if (serial == null) {
packer.packNil();
} else {
packer.packString(serial);
}
}
}
}

private static class PublishResultBodyHandler implements HttpCore.BodyHandler<String> {

@Override
public String[] handleResponseBody(String contentType, byte[] body) throws AblyException {
try {
PublishResult publishResult = null;
if ("application/json".equals(contentType))
publishResult = readFromJson(body);
else if ("application/x-msgpack".equals(contentType))
publishResult = readMsgpack(body);
return publishResult != null ? publishResult.serials : new String[]{};
} catch (MessageDecodeException e) {
throw AblyException.fromThrowable(e);
}
}
}
}

72 changes: 72 additions & 0 deletions lib/src/main/java/io/ably/lib/util/Listeners.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.ably.lib.util;

import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.types.Callback;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.PublishResult;
import io.ably.lib.types.UpdateDeleteResult;

public class Listeners {

public static <T> Callback<T> fromCompletionListener(CompletionListener listener) {
return new CompletionListenerWrapper<T>(listener);
}

public static Callback<PublishResult> toPublishResultListener(Callback<UpdateDeleteResult> listener) {
return new UpdateResultToPublishAdapter(listener);
}

public static <T> CompletionListener unwrap(Callback<T> listener) {
if (listener instanceof CompletionListenerWrapper) {
return ((CompletionListenerWrapper<T>)listener).listener;
} else {
return null;
}
}

private static class CompletionListenerWrapper<T> implements Callback<T> {
private final CompletionListener listener;

private CompletionListenerWrapper(CompletionListener listener) {
this.listener = listener;
}

@Override
public void onSuccess(T result) {
if (listener != null) {
listener.onSuccess();
}
}

@Override
public void onError(ErrorInfo reason) {
if (listener != null) {
listener.onError(reason);
}
}
}

private static class UpdateResultToPublishAdapter implements Callback<PublishResult> {
private final Callback<UpdateDeleteResult> listener;

private UpdateResultToPublishAdapter(Callback<UpdateDeleteResult> listener) {
this.listener = listener;
}

@Override
public void onSuccess(PublishResult result) {
if (listener != null) {
String serial = result != null && result.serials != null && result.serials.length > 0
? result.serials[0] : null;
listener.onSuccess(new UpdateDeleteResult(serial));
}
}

@Override
public void onError(ErrorInfo reason) {
if (listener != null) {
listener.onError(reason);
}
}
}
}
Loading
Loading