Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;

import org.apache.hc.core5.annotation.Contract;
Expand Down Expand Up @@ -62,6 +65,14 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
private final ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
private final List<Header> trailers = Collections.synchronizedList(new ArrayList<>());
private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
private final CompletableFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseCompletableFuture;
private final CompletableFuture<Void> responseCompletionFuture;

/**
* Completes with {@code null} on success and with the terminal {@link Throwable} on failure.
* This future never completes exceptionally.
*/
private final CompletableFuture<Throwable> failureFuture;

private volatile BasicFuture<Void> responseCompletion;
private volatile HttpResponse informationResponse;
Expand All @@ -72,6 +83,9 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
*/
public ReactiveResponseConsumer() {
this.responseFuture = new BasicFuture<>(null);
this.responseCompletableFuture = new CompletableFuture<>();
this.responseCompletionFuture = new CompletableFuture<>();
this.failureFuture = new CompletableFuture<>();
}

/**
Expand All @@ -82,12 +96,65 @@ public ReactiveResponseConsumer() {
*/
public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
this.responseCompletableFuture = new CompletableFuture<>();
this.responseCompletionFuture = new CompletableFuture<>();
this.failureFuture = new CompletableFuture<>();
}

public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
return responseFuture;
}

/**
* Returns a {@link CompletableFuture} that completes when the response head and body {@link Publisher}
* are available.
*
* @since 5.5
*/
public CompletableFuture<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseCompletableFuture() {
return responseCompletableFuture;
}

/**
* Returns a {@link CompletableFuture} that completes when the response exchange is complete
* (end-of-stream reached and trailers processed, if any).
*
* @since 5.5
*/
public CompletableFuture<Void> getResponseCompletionFuture() {
return responseCompletionFuture;
}

/**
* Returns a {@link CompletionStage} that completes when the response head and body {@link Publisher}
* are available.
*
* @since 5.5
*/
public CompletionStage<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseStage() {
return responseCompletableFuture;
}

/**
* Returns a {@link CompletionStage} that completes when the response exchange is complete
* (end-of-stream reached and trailers processed, if any).
*
* @since 5.5
*/
public CompletionStage<Void> getResponseCompletionStage() {
return responseCompletionFuture;
}

/**
* Completes with {@code null} on success and with the terminal {@link Throwable} on failure.
* This stage never completes exceptionally.
*
* @since 5.5
*/
public CompletionStage<Throwable> getFailureStage() {
return failureFuture;
}

/**
* Returns the intermediate (1xx) HTTP response if one was received.
*
Expand Down Expand Up @@ -124,7 +191,11 @@ public void consumeResponse(
) {
this.entityDetails = entityDetails;
this.responseCompletion = new BasicFuture<>(resultCallback);
this.responseFuture.completed(new Message<>(response, reactiveDataConsumer));

final Message<HttpResponse, Publisher<ByteBuffer>> message = new Message<>(response, reactiveDataConsumer);
this.responseFuture.completed(message);
this.responseCompletableFuture.complete(message);

if (entityDetails == null) {
streamEnd(null);
}
Expand All @@ -139,8 +210,15 @@ public void informationResponse(final HttpResponse response, final HttpContext h
public void failed(final Exception cause) {
reactiveDataConsumer.failed(cause);
responseFuture.failed(cause);
if (responseCompletion != null) {
responseCompletion.failed(cause);
responseCompletableFuture.completeExceptionally(cause);
responseCompletionFuture.completeExceptionally(cause);

// Record failure as a normal completion value.
failureFuture.complete(cause);

final BasicFuture<Void> completion = responseCompletion;
if (completion != null) {
completion.failed(cause);
}
}

Expand All @@ -160,15 +238,39 @@ public void streamEnd(final List<? extends Header> trailers) {
this.trailers.addAll(trailers);
}
reactiveDataConsumer.streamEnd(trailers);
responseCompletion.completed(null);

// Complete CF before BasicFuture.completed(...) (it may trigger releaseResources()).
responseCompletionFuture.complete(null);

// Success => no failure.
failureFuture.complete(null);

final BasicFuture<Void> completion = responseCompletion;
if (completion != null) {
completion.completed(null);
}
}

@Override
public void releaseResources() {
reactiveDataConsumer.releaseResources();

responseFuture.cancel();
if (responseCompletion != null) {
responseCompletion.cancel();

if (!responseCompletableFuture.isDone()) {
responseCompletableFuture.cancel(true);
}
if (!responseCompletionFuture.isDone()) {
responseCompletionFuture.cancel(true);
}

if (!failureFuture.isDone()) {
failureFuture.complete(new CancellationException());
}

final BasicFuture<Void> completion = responseCompletion;
if (completion != null) {
completion.cancel();
}
}
}
Loading