Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/sdk/server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ext.versions = [
"launchdarklyJavaSdkInternal": "1.6.1",
"launchdarklyLogging": "1.1.0",
"okhttp": "4.12.0", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource
"okhttpEventsource": "4.1.0",
"okhttpEventsource": "4.2.0",
"reactorCore":"3.3.22.RELEASE",
"slf4j": "1.7.36",
"snakeyaml": "2.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.sdk.internal.http.HttpHelpers;
import com.launchdarkly.sdk.internal.http.HttpProperties;
import com.launchdarkly.sdk.json.SerializationException;

import okhttp3.Call;
import okhttp3.Callback;
Expand Down Expand Up @@ -116,14 +114,12 @@ public void onResponse(@Nonnull Call call, @Nonnull Response response) {
// Handle 304 Not Modified - no new data
if (response.code() == 304) {
logger.debug("FDv2 polling request returned 304: not modified");
future.complete(null);
future.complete(FDv2PayloadResponse.none(response.code()));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having null indicate no transfer I made an explicit none.

return;
}

if (!response.isSuccessful()) {
future.completeExceptionally(
new HttpErrors.HttpErrorException(response.code())
);
future.complete(FDv2PayloadResponse.failure(response.code(), response.headers()));
return;
}

Expand All @@ -145,7 +141,7 @@ public void onResponse(@Nonnull Call call, @Nonnull Response response) {
List<FDv2Event> events = FDv2Event.parseEventsArray(responseBody);

// Create and return the response
FDv2PayloadResponse pollingResponse = new FDv2PayloadResponse(events, response.headers());
FDv2PayloadResponse pollingResponse = FDv2PayloadResponse.success(events, response.headers(), response.code());
future.complete(pollingResponse);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,31 @@ interface FDv2Requestor {
* to get from one payload version to another.
* This isn't intended for use for implementations which may require multiple executions to get an entire payload.
*/
public static class FDv2PayloadResponse {
public static class FDv2PayloadResponse {
private final List<FDv2Event> events;
private final Headers headers;

public FDv2PayloadResponse(List<FDv2Event> events, Headers headers) {
private final boolean successful;

private final int statusCode;

private FDv2PayloadResponse(List<FDv2Event> events, Headers headers, boolean success, int statusCode) {
this.events = events;
this.headers = headers;
this.successful = success;
this.statusCode = statusCode;
}

public static FDv2PayloadResponse failure(int statusCode, Headers headers) {
return new FDv2PayloadResponse(null, headers, false, statusCode);
}

public static FDv2PayloadResponse success(List<FDv2Event> events, Headers headers, int statusCode) {
return new FDv2PayloadResponse(events, headers, true, statusCode);
}

public static FDv2PayloadResponse none(int statusCode) {
return new FDv2PayloadResponse(null, null, true, statusCode);
}

public List<FDv2Event> getEvents() {
Expand All @@ -34,6 +52,14 @@ public List<FDv2Event> getEvents() {
public Headers getHeaders() {
return headers;
}

public boolean isSuccess() {
return successful;
}

public int getStatusCode() {
return statusCode;
}
}
CompletableFuture<FDv2PayloadResponse> Poll(Selector selector);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.launchdarkly.sdk.server;

enum HeaderConstants {
ENVIRONMENT_ID("x-ld-envid"),
FDV1_FALLBACK("x-ld-fd-fallback");

private final String headerName;

HeaderConstants(String headerName) {
this.headerName = headerName;
}

public String getHeaderName() {
return headerName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
Expand All @@ -29,23 +28,28 @@ protected void internalShutdown() {
requestor.close();
}

private static boolean getFallback(FDv2Requestor.FDv2PayloadResponse response) {
if (response != null && response.getHeaders() != null) {
String headerValue = response.getHeaders().get(HeaderConstants.FDV1_FALLBACK.getHeaderName());
return headerValue != null && headerValue.equalsIgnoreCase("true");
}

return false;
}

private static String getEnvironmentId(FDv2Requestor.FDv2PayloadResponse response) {
if (response != null && response.getHeaders() != null) {
return response.getHeaders().get(HeaderConstants.ENVIRONMENT_ID.getHeaderName());
}
return null;
}

protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean oneShot) {
return requestor.Poll(selector).handle(((pollingResponse, ex) -> {
boolean fdv1Fallback = getFallback(pollingResponse);
String environmentId = getEnvironmentId(pollingResponse);
if (ex != null) {
if (ex instanceof HttpErrors.HttpErrorException) {
HttpErrors.HttpErrorException e = (HttpErrors.HttpErrorException) ex;
DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(e.getStatus());
// Errors without an HTTP status are recoverable. If there is a status, then we check if the error
// is recoverable.
boolean recoverable = e.getStatus() <= 0 || isHttpErrorRecoverable(e.getStatus());
logger.error("Polling request failed with HTTP error: {}", e.getStatus());
// For a one-shot request all errors are terminal.
if (oneShot) {
return FDv2SourceResult.terminalError(errorInfo);
} else {
return recoverable ? FDv2SourceResult.interrupted(errorInfo) : FDv2SourceResult.terminalError(errorInfo);
}
} else if (ex instanceof IOException) {
if (ex instanceof IOException) {
IOException e = (IOException) ex;
logger.error("Polling request failed with network error: {}", e.toString());
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
Expand All @@ -54,7 +58,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
} else if (ex instanceof SerializationException) {
SerializationException e = (SerializationException) ex;
logger.error("Polling request received malformed data: {}", e.toString());
Expand All @@ -64,7 +68,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
String msg = ex.toString();
logger.error("Polling request failed with an unknown error: {}", msg);
Expand All @@ -74,17 +78,30 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
msg,
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
// A null polling response indicates that we received a 304, which means nothing has changed.
if (pollingResponse == null) {
// If we get a 304, then that means nothing has changed.
if (pollingResponse.getStatusCode() == 304) {
return FDv2SourceResult.changeSet(
new DataStoreTypes.ChangeSet<>(DataStoreTypes.ChangeSetType.None,
Selector.EMPTY,
null,
// TODO: Implement environment ID support.
null
));
null // Header derived values will have been handled on initial response.
),
// Headers would have been processed from the initial response.
false);
}
if(!pollingResponse.isSuccess()) {
int statusCode = pollingResponse.getStatusCode();
boolean recoverable = statusCode <= 0 || isHttpErrorRecoverable(statusCode);
DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(statusCode);
logger.error("Polling request failed with HTTP error: {}", statusCode);
// For a one-shot request all errors are terminal.
if (oneShot) {
return FDv2SourceResult.terminalError(errorInfo, fdv1Fallback);
} else {
return recoverable ? FDv2SourceResult.interrupted(errorInfo, fdv1Fallback) : FDv2SourceResult.terminalError(errorInfo, fdv1Fallback);
}
}
FDv2ProtocolHandler handler = new FDv2ProtocolHandler();
for (FDv2Event event : pollingResponse.getEvents()) {
Expand All @@ -96,10 +113,9 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted = FDv2ChangeSetTranslator.toChangeSet(
((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(),
logger,
// TODO: Implement environment ID support.
null
environmentId
);
return FDv2SourceResult.changeSet(converted);
return FDv2SourceResult.changeSet(converted, fdv1Fallback);
} catch (Exception e) {
// TODO: Do we need to be more specific about the exception type here?
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
Expand All @@ -108,7 +124,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
case ERROR: {
FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res);
Expand All @@ -117,10 +133,10 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
0,
error.getReason(),
new Date().toInstant());
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
case GOODBYE:
return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason());
return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason(), fdv1Fallback);
case NONE:
break;
case INTERNAL_ERROR: {
Expand All @@ -141,7 +157,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
0,
"Internal error occurred during polling",
new Date().toInstant());
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
}
}
Expand All @@ -152,7 +168,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
"Unexpected end of polling response",
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.SelectorSource;
Expand Down
Loading