From 08abc7ebc25eca8b415aee4921aab1188303269f Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 21 Jan 2026 14:12:13 -0800 Subject: [PATCH 1/2] chore: Pipe headers through data sources. --- lib/sdk/server/build.gradle | 2 +- .../sdk/server/DefaultFDv2Requestor.java | 12 +- .../sdk/server/FDv2Requestor.java | 30 ++- .../sdk/server/HeaderConstants.java | 16 ++ .../launchdarkly/sdk/server/PollingBase.java | 82 ++++--- .../sdk/server/PollingInitializerImpl.java | 1 - .../sdk/server/StreamingSynchronizerImpl.java | 44 +++- .../server/datasources/FDv2SourceResult.java | 43 +++- .../sdk/server/DefaultFDv2RequestorTest.java | 49 ++-- .../server/PollingInitializerImplTest.java | 220 +++++++++++++++-- .../server/PollingSynchronizerImplTest.java | 227 +++++++++++++++++- .../server/StreamingSynchronizerImplTest.java | 178 ++++++++++++++ 12 files changed, 785 insertions(+), 119 deletions(-) create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/HeaderConstants.java diff --git a/lib/sdk/server/build.gradle b/lib/sdk/server/build.gradle index a27e9c7..b91fe42 100644 --- a/lib/sdk/server/build.gradle +++ b/lib/sdk/server/build.gradle @@ -74,7 +74,7 @@ ext.versions = [ "launchdarklyJavaSdkInternal": "1.6.1", "launchdarklyLogging": "1.1.0", "okhttp": "4.12.0", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource - "okhttpEventsource": "4.1.0", + "okhttpEventsource": "4.2.0", "reactorCore":"3.3.22.RELEASE", "slf4j": "1.7.36", "snakeyaml": "2.4", diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java index 133a56a..8d55059 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java @@ -3,10 +3,8 @@ import com.launchdarkly.logging.LDLogger; import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; -import com.launchdarkly.sdk.internal.http.HttpErrors; import com.launchdarkly.sdk.internal.http.HttpHelpers; import com.launchdarkly.sdk.internal.http.HttpProperties; -import com.launchdarkly.sdk.json.SerializationException; import okhttp3.Call; import okhttp3.Callback; @@ -104,17 +102,15 @@ public void onFailure(@Nonnull Call call, @Nonnull IOException e) { @Override public void onResponse(@Nonnull Call call, @Nonnull Response response) { try { - // Handle 304 Not Modified - no new data + // Handle 304 Not Modified - no new data, but return response with headers if (response.code() == 304) { logger.debug("FDv2 polling request returned 304: not modified"); - future.complete(null); + future.complete(FDv2PayloadResponse.none(response.code())); return; } if (!response.isSuccessful()) { - future.completeExceptionally( - new HttpErrors.HttpErrorException(response.code()) - ); + future.complete(FDv2PayloadResponse.failure(response.code(), response.headers())); return; } @@ -136,7 +132,7 @@ public void onResponse(@Nonnull Call call, @Nonnull Response response) { List events = FDv2Event.parseEventsArray(responseBody); // Create and return the response - FDv2PayloadResponse pollingResponse = new FDv2PayloadResponse(events, response.headers()); + FDv2PayloadResponse pollingResponse = FDv2PayloadResponse.success(events, response.headers(), response.code()); future.complete(pollingResponse); } catch (Exception e) { diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java index 8a2297e..dbebd9d 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2Requestor.java @@ -18,13 +18,31 @@ interface FDv2Requestor { * to get from one payload version to another. * This isn't intended for use for implementations which may require multiple executions to get an entire payload. */ - public static class FDv2PayloadResponse { +public static class FDv2PayloadResponse { private final List events; private final Headers headers; - public FDv2PayloadResponse(List events, Headers headers) { + private final boolean successful; + + private final int statusCode; + + private FDv2PayloadResponse(List events, Headers headers, boolean success, int statusCode) { this.events = events; this.headers = headers; + this.successful = success; + this.statusCode = statusCode; + } + + public static FDv2PayloadResponse failure(int statusCode, Headers headers) { + return new FDv2PayloadResponse(null, headers, false, statusCode); + } + + public static FDv2PayloadResponse success(List events, Headers headers, int statusCode) { + return new FDv2PayloadResponse(events, headers, true, statusCode); + } + + public static FDv2PayloadResponse none(int statusCode) { + return new FDv2PayloadResponse(null, null, true, statusCode); } public List getEvents() { @@ -34,6 +52,14 @@ public List getEvents() { public Headers getHeaders() { return headers; } + + public boolean isSuccess() { + return successful; + } + + public int getStatusCode() { + return statusCode; + } } CompletableFuture Poll(Selector selector); diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/HeaderConstants.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/HeaderConstants.java new file mode 100644 index 0000000..66ad149 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/HeaderConstants.java @@ -0,0 +1,16 @@ +package com.launchdarkly.sdk.server; + +enum HeaderConstants { + ENVIRONMENT_ID("x-ld-envid"), + FDV1_FALLBACK("x-ld-fd-fallback"); + + private final String headerName; + + HeaderConstants(String headerName) { + this.headerName = headerName; + } + + public String getHeaderName() { + return headerName; + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java index 20a5a1f..f18309d 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java @@ -29,23 +29,33 @@ protected void internalShutdown() { requestor.close(); } + private static boolean getFallback(FDv2Requestor.FDv2PayloadResponse response) { + if (response != null && response.getHeaders() != null) { + String headerValue = response.getHeaders().get(HeaderConstants.FDV1_FALLBACK.getHeaderName()); + return headerValue != null && headerValue.equalsIgnoreCase("true"); + } +// if(ex != null) { +// if(ex instanceof HttpErrorException) { +// ((HttpErrors.HttpErrorException) ex). +// } +// } + + return false; + } + + private static String getEnvironmentId(FDv2Requestor.FDv2PayloadResponse response) { + if (response != null && response.getHeaders() != null) { + return response.getHeaders().get(HeaderConstants.ENVIRONMENT_ID.getHeaderName()); + } + return null; + } + protected CompletableFuture poll(Selector selector, boolean oneShot) { return requestor.Poll(selector).handle(((pollingResponse, ex) -> { + boolean fdv1Fallback = getFallback(pollingResponse); + String environmentId = getEnvironmentId(pollingResponse); if (ex != null) { - if (ex instanceof HttpErrors.HttpErrorException) { - HttpErrors.HttpErrorException e = (HttpErrors.HttpErrorException) ex; - DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(e.getStatus()); - // Errors without an HTTP status are recoverable. If there is a status, then we check if the error - // is recoverable. - boolean recoverable = e.getStatus() <= 0 || isHttpErrorRecoverable(e.getStatus()); - logger.error("Polling request failed with HTTP error: {}", e.getStatus()); - // For a one-shot request all errors are terminal. - if (oneShot) { - return FDv2SourceResult.terminalError(errorInfo); - } else { - return recoverable ? FDv2SourceResult.interrupted(errorInfo) : FDv2SourceResult.terminalError(errorInfo); - } - } else if (ex instanceof IOException) { + if (ex instanceof IOException) { IOException e = (IOException) ex; logger.error("Polling request failed with network error: {}", e.toString()); DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( @@ -54,7 +64,7 @@ protected CompletableFuture poll(Selector selector, boolean on e.toString(), new Date().toInstant() ); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); } else if (ex instanceof SerializationException) { SerializationException e = (SerializationException) ex; logger.error("Polling request received malformed data: {}", e.toString()); @@ -64,7 +74,7 @@ protected CompletableFuture poll(Selector selector, boolean on e.toString(), new Date().toInstant() ); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); } String msg = ex.toString(); logger.error("Polling request failed with an unknown error: {}", msg); @@ -74,17 +84,30 @@ protected CompletableFuture poll(Selector selector, boolean on msg, new Date().toInstant() ); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); } - // A null polling response indicates that we received a 304, which means nothing has changed. - if (pollingResponse == null) { + // If we get a 304, then that means nothing has changed. + if (pollingResponse.getStatusCode() == 304) { return FDv2SourceResult.changeSet( new DataStoreTypes.ChangeSet<>(DataStoreTypes.ChangeSetType.None, Selector.EMPTY, null, - // TODO: Implement environment ID support. - null - )); + null // Header derived values will have been handled on initial response. + ), + // Headers would have been processed from the initial response. + false); + } + if(!pollingResponse.isSuccess()) { + int statusCode = pollingResponse.getStatusCode(); + boolean recoverable = statusCode <= 0 || isHttpErrorRecoverable(statusCode); + DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(statusCode); + logger.error("Polling request failed with HTTP error: {}", statusCode); + // For a one-shot request all errors are terminal. + if (oneShot) { + return FDv2SourceResult.terminalError(errorInfo, fdv1Fallback); + } else { + return recoverable ? FDv2SourceResult.interrupted(errorInfo, fdv1Fallback) : FDv2SourceResult.terminalError(errorInfo, fdv1Fallback); + } } FDv2ProtocolHandler handler = new FDv2ProtocolHandler(); for (FDv2Event event : pollingResponse.getEvents()) { @@ -96,10 +119,9 @@ protected CompletableFuture poll(Selector selector, boolean on DataStoreTypes.ChangeSet converted = FDv2ChangeSetTranslator.toChangeSet( ((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(), logger, - // TODO: Implement environment ID support. - null + environmentId ); - return FDv2SourceResult.changeSet(converted); + return FDv2SourceResult.changeSet(converted, fdv1Fallback); } catch (Exception e) { // TODO: Do we need to be more specific about the exception type here? DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( @@ -108,7 +130,7 @@ protected CompletableFuture poll(Selector selector, boolean on e.toString(), new Date().toInstant() ); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); } case ERROR: { FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res); @@ -117,10 +139,10 @@ protected CompletableFuture poll(Selector selector, boolean on 0, error.getReason(), new Date().toInstant()); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); } case GOODBYE: - return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason()); + return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason(), fdv1Fallback); case NONE: break; case INTERNAL_ERROR: { @@ -141,7 +163,7 @@ protected CompletableFuture poll(Selector selector, boolean on 0, "Internal error occurred during polling", new Date().toInstant()); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); } } } @@ -152,7 +174,7 @@ protected CompletableFuture poll(Selector selector, boolean on "Unexpected end of polling response", new Date().toInstant() ); - return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); + return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback); })); } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java index 9bc5165..856a118 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java @@ -1,7 +1,6 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.logging.LDLogger; -import com.launchdarkly.sdk.internal.fdv2.sources.Selector; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.Initializer; import com.launchdarkly.sdk.server.datasources.SelectorSource; diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index c5d52f3..6fa4487 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -166,7 +166,7 @@ private Thread getRunThread() { ); // We aren't restarting the event source here. We aren't going to automatically recover, so the // data system will move to the next source when it determined this source is unhealthy. - resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e))); } finally { eventSource.close(); } @@ -249,10 +249,12 @@ private void handleMessage(MessageEvent event) { case CHANGESET: FDv2ProtocolHandler.FDv2ActionChangeset changeset = (FDv2ProtocolHandler.FDv2ActionChangeset) action; try { - // TODO: Environment ID. DataStoreTypes.ChangeSet converted = - FDv2ChangeSetTranslator.toChangeSet(changeset.getChangeset(), logger, null); - result = FDv2SourceResult.changeSet(converted); + FDv2ChangeSetTranslator.toChangeSet( + changeset.getChangeset(), + logger, + event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName())); + result = FDv2SourceResult.changeSet(converted, getFallback(event)); } catch (Exception e) { logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e)); logger.debug(LogValues.exceptionTrace(e)); @@ -262,7 +264,7 @@ private void handleMessage(MessageEvent event) { e.toString(), Instant.now() ); - result = FDv2SourceResult.interrupted(conversionError); + result = FDv2SourceResult.interrupted(conversionError, getFallback(event)); restartStream(); } break; @@ -276,7 +278,7 @@ private void handleMessage(MessageEvent event) { case GOODBYE: FDv2ProtocolHandler.FDv2ActionGoodbye goodbye = (FDv2ProtocolHandler.FDv2ActionGoodbye) action; - result = FDv2SourceResult.goodbye(goodbye.getReason()); + result = FDv2SourceResult.goodbye(goodbye.getReason(), getFallback(event)); // We drop this current connection and attempt to restart the stream. restartStream(); break; @@ -300,7 +302,7 @@ private void handleMessage(MessageEvent event) { "Internal error during FDv2 event processing", Instant.now() ); - result = FDv2SourceResult.interrupted(internalError); + result = FDv2SourceResult.interrupted(internalError, getFallback(event)); restartStream(); break; @@ -322,7 +324,7 @@ private void interruptedWithException(Exception e, DataSourceStatusProvider.Erro e.toString(), Instant.now() ); - resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e))); restartStream(); } @@ -343,11 +345,11 @@ private boolean handleError(StreamException e) { "will retry"); if (!recoverable) { - shutdownFuture.complete(FDv2SourceResult.terminalError(errorInfo)); + shutdownFuture.complete(FDv2SourceResult.terminalError(errorInfo, getFallback(e))); return false; } else { // Queue as INTERRUPTED to indicate temporary failure - resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e))); return true; // allow reconnect } } @@ -361,7 +363,7 @@ private boolean handleError(StreamException e) { e.toString(), Instant.now() ); - resultQueue.put(FDv2SourceResult.interrupted(errorInfo)); + resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e))); return true; // allow reconnect } @@ -385,4 +387,24 @@ private FDv2Event parseFDv2Event(String eventName, Reader eventDataReader) throw throw new SerializationException(e); } } + + private static boolean getFallback(Exception ex) { + if(ex instanceof StreamHttpErrorException) { + String headerValue = ((StreamHttpErrorException) ex).getHeaders() + .value(HeaderConstants.FDV1_FALLBACK.getHeaderName()); + return headerValue != null && headerValue.equalsIgnoreCase("true"); + } + return false; + } + + private static boolean getFallback(StreamEvent event) { + String headerName = HeaderConstants.FDV1_FALLBACK.getHeaderName(); + String headerValue = null; + if(event instanceof FaultEvent) { + headerValue = ((FaultEvent) event).getHeaders().value(headerName); + } else if (event instanceof MessageEvent) { + headerValue = ((MessageEvent) event).getHeaders().value(headerName); + } + return headerValue != null && headerValue.equalsIgnoreCase("true"); + } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java index 3f7ad16..6f440c6 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java @@ -12,7 +12,7 @@ public class FDv2SourceResult { public enum State { /** * The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used - * with an initializer, and instead TERMINAL_ERROR should be used. When this status is used with an initializer + * with an initializer, and instead TERMINAL_ERROR should be used. When this status is used with an initializer, * it will still be a terminal state. */ INTERRUPTED, @@ -67,32 +67,49 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) { private final Status status; private final ResultType resultType; + + private final boolean fdv1Fallback; - private FDv2SourceResult(DataStoreTypes.ChangeSet changeSet, Status status, ResultType resultType) { + private FDv2SourceResult(DataStoreTypes.ChangeSet changeSet, Status status, ResultType resultType, boolean fdv1Fallback) { this.changeSet = changeSet; this.status = status; this.resultType = resultType; + this.fdv1Fallback = fdv1Fallback; } - public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo) { - return new FDv2SourceResult(null, new Status(State.INTERRUPTED, errorInfo), ResultType.STATUS); + public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) { + return new FDv2SourceResult( + null, + new Status(State.INTERRUPTED, errorInfo), + ResultType.STATUS, + fdv1Fallback); } public static FDv2SourceResult shutdown() { - return new FDv2SourceResult(null, new Status(State.SHUTDOWN, null), ResultType.STATUS); + return new FDv2SourceResult(null, + new Status(State.SHUTDOWN, null), + ResultType.STATUS, + false); } - public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo) { - return new FDv2SourceResult(null, new Status(State.TERMINAL_ERROR, errorInfo), ResultType.STATUS); + public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) { + return new FDv2SourceResult(null, + new Status(State.TERMINAL_ERROR, errorInfo), + ResultType.STATUS, + fdv1Fallback); } - public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet) { - return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET); + public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet, boolean fdv1Fallback) { + return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET, fdv1Fallback); } - public static FDv2SourceResult goodbye(String reason) { + public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback) { // TODO: Goodbye reason. - return new FDv2SourceResult(null, new Status(State.GOODBYE, null), ResultType.STATUS); + return new FDv2SourceResult( + null, + new Status(State.GOODBYE, null), + ResultType.STATUS, + fdv1Fallback); } public ResultType getResultType() { @@ -106,4 +123,8 @@ public Status getStatus() { public DataStoreTypes.ChangeSet getChangeSet() { return changeSet; } + + public boolean isFdv1Fallback() { + return fdv1Fallback; + } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java index 5ce321b..dd5ec9d 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java @@ -2,6 +2,7 @@ import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.internal.http.HttpErrors; import com.launchdarkly.sdk.internal.http.HttpProperties; import com.launchdarkly.sdk.server.subsystems.ClientContext; import com.launchdarkly.testhelpers.httptest.Handler; @@ -18,12 +19,8 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; @SuppressWarnings("javadoc") public class DefaultFDv2RequestorTest extends BaseTest { @@ -216,14 +213,14 @@ public void etagCachingWith304NotModified() throws Exception { RequestInfo req1 = server.getRecorder().requireRequest(); assertEquals(REQUEST_PATH, req1.getPath()); - assertEquals(null, req1.getHeader("If-None-Match")); + assertNull(req1.getHeader("If-None-Match")); // Second request should send If-None-Match and receive 304 CompletableFuture future2 = requestor.Poll(Selector.EMPTY); FDv2Requestor.FDv2PayloadResponse response2 = future2.get(5, TimeUnit.SECONDS); - assertEquals(null, response2); + assertEquals(304, response2.getStatusCode()); RequestInfo req2 = server.getRecorder().requireRequest(); assertEquals(REQUEST_PATH, req2.getPath()); @@ -250,7 +247,7 @@ public void etagUpdatedOnNewResponse() throws Exception { // First request requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); RequestInfo req1 = server.getRecorder().requireRequest(); - assertEquals(null, req1.getHeader("If-None-Match")); + assertNull(req1.getHeader("If-None-Match")); // Second request should use etag-1 requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); @@ -289,13 +286,13 @@ public void etagRemovedWhenNotInResponse() throws Exception { // Third request should not send ETag (was removed) requestor.Poll(Selector.EMPTY).get(5, TimeUnit.SECONDS); RequestInfo req3 = server.getRecorder().requireRequest(); - assertEquals(null, req3.getHeader("If-None-Match")); + assertNull(req3.getHeader("If-None-Match")); } } } @Test - public void httpErrorCodeThrowsException() throws Exception { + public void httpErrorCodeReturnsFailureResponse() throws Exception { Handler resp = Handlers.status(500); try (HttpServer server = HttpServer.start(resp)) { @@ -303,19 +300,17 @@ public void httpErrorCodeThrowsException() throws Exception { CompletableFuture future = requestor.Poll(Selector.EMPTY); - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch (ExecutionException e) { - assertThat(e.getCause(), notNullValue()); - assertThat(e.getCause().getMessage(), containsString("500")); - } + FDv2Requestor.FDv2PayloadResponse response = future.get(5, TimeUnit.SECONDS); + + assertNotNull(response); + assertEquals(500, response.getStatusCode()); + assertFalse(response.isSuccess()); } } } @Test - public void http404ThrowsException() throws Exception { + public void http404ReturnsFailureResponse() throws Exception { Handler resp = Handlers.status(404); try (HttpServer server = HttpServer.start(resp)) { @@ -323,13 +318,11 @@ public void http404ThrowsException() throws Exception { CompletableFuture future = requestor.Poll(Selector.EMPTY); - try { - future.get(5, TimeUnit.SECONDS); - fail("Expected ExecutionException"); - } catch (ExecutionException e) { - assertThat(e.getCause(), notNullValue()); - assertThat(e.getCause().getMessage(), containsString("404")); - } + FDv2Requestor.FDv2PayloadResponse response = future.get(5, TimeUnit.SECONDS); + + assertNotNull(response); + assertEquals(404, response.getStatusCode()); + assertFalse(response.isSuccess()); } } } @@ -408,7 +401,7 @@ public void differentSelectorsUseDifferentEtags() throws Exception { // First request with selector1 requestor.Poll(selector1).get(5, TimeUnit.SECONDS); RequestInfo req1 = server.getRecorder().requireRequest(); - assertEquals(null, req1.getHeader("If-None-Match")); + assertNull(req1.getHeader("If-None-Match")); // Second request with selector1 should use cached ETag requestor.Poll(selector1).get(5, TimeUnit.SECONDS); @@ -418,7 +411,7 @@ public void differentSelectorsUseDifferentEtags() throws Exception { // Request with selector2 should not have ETag (different URI) requestor.Poll(selector2).get(5, TimeUnit.SECONDS); RequestInfo req3 = server.getRecorder().requireRequest(); - assertEquals(null, req3.getHeader("If-None-Match")); + assertNull(req3.getHeader("If-None-Match")); } } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java index c97194f..c22e6f7 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingInitializerImplTest.java @@ -67,9 +67,10 @@ private FDv2Requestor.FDv2PayloadResponse makeSuccessResponse() { "}"; try { - return new FDv2Requestor.FDv2PayloadResponse( + return FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(json), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); } catch (Exception e) { throw new RuntimeException(e); @@ -102,8 +103,10 @@ public void httpRecoverableError() throws Exception { FDv2Requestor requestor = mockRequestor(); SelectorSource selectorSource = mockSelectorSource(); + FDv2Requestor.FDv2PayloadResponse errorResponse = + FDv2Requestor.FDv2PayloadResponse.failure(503, okhttp3.Headers.of()); when(requestor.Poll(any(Selector.class))) - .thenReturn(failedFuture(new HttpErrors.HttpErrorException(503))); + .thenReturn(CompletableFuture.completedFuture(errorResponse)); PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); @@ -116,7 +119,7 @@ public void httpRecoverableError() throws Exception { assertNotNull(result.getStatus().getErrorInfo()); assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind()); - + } @Test @@ -124,8 +127,10 @@ public void httpNonRecoverableError() throws Exception { FDv2Requestor requestor = mockRequestor(); SelectorSource selectorSource = mockSelectorSource(); + FDv2Requestor.FDv2PayloadResponse errorResponse = + FDv2Requestor.FDv2PayloadResponse.failure(401, okhttp3.Headers.of()); when(requestor.Poll(any(Selector.class))) - .thenReturn(failedFuture(new HttpErrors.HttpErrorException(401))); + .thenReturn(CompletableFuture.completedFuture(errorResponse)); PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); @@ -137,7 +142,7 @@ public void httpNonRecoverableError() throws Exception { assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); assertEquals(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, result.getStatus().getErrorInfo().getKind()); - + } @Test @@ -248,9 +253,10 @@ public void errorEventInResponse() throws Exception { " ]\n" + "}"; - FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(errorJson), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); when(requestor.Poll(any(Selector.class))) @@ -284,9 +290,10 @@ public void goodbyeEventInResponse() throws Exception { " ]\n" + "}"; - FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(goodbyeJson), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); when(requestor.Poll(any(Selector.class))) @@ -311,9 +318,10 @@ public void emptyEventsArray() throws Exception { String emptyJson = "{\"events\": []}"; - FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(emptyJson), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); when(requestor.Poll(any(Selector.class))) @@ -366,9 +374,10 @@ public void internalErrorWithInvalidDataKind() throws Exception { " ]\n" + "}"; - FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPayloadTransferred), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); when(requestor.Poll(any(Selector.class))) @@ -403,9 +412,10 @@ public void internalErrorWithUnknownKind() throws Exception { " ]\n" + "}"; - FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse( + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(unknownEventJson), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); when(requestor.Poll(any(Selector.class))) @@ -423,4 +433,182 @@ public void internalErrorWithUnknownKind() throws Exception { } + + @Test + public void fdv1FallbackFlagSetToTrueInSuccessResponse() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-fd-fallback", "true") + .build(); + + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray( + "{\"events\": [{\"event\": \"server-intent\", \"data\": {\"payloads\": [{\"id\": \"payload-1\", \"target\": 100, \"intentCode\": \"xfer-full\", \"reason\": \"payload-missing\"}]}}, {\"event\": \"payload-transferred\", \"data\": {\"state\": \"(p:payload-1:100)\", \"version\": 100}}]}" + ), + headers, + 200 + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(true, result.isFdv1Fallback()); + } + + @Test + public void fdv1FallbackFlagSetToFalseWhenHeaderNotPresent() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(false, result.isFdv1Fallback()); + } + + @Test + public void fdv1FallbackFlagSetToTrueInErrorResponse() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-fd-fallback", "true") + .build(); + + FDv2Requestor.FDv2PayloadResponse errorResponse = + FDv2Requestor.FDv2PayloadResponse.failure(503, headers); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(errorResponse)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + assertEquals(true, result.isFdv1Fallback()); + } + + @Test + public void fdv1FallbackFlagSetToFalseInNetworkError() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(failedFuture(new IOException("Connection refused"))); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState()); + // Network errors don't have headers, so fallback should be false + assertEquals(false, result.isFdv1Fallback()); + } + + @Test + public void environmentIdExtractedFromHeaders() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-envid", "test-env-123") + .build(); + + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray( + "{\"events\": [{\"event\": \"server-intent\", \"data\": {\"payloads\": [{\"id\": \"payload-1\", \"target\": 100, \"intentCode\": \"xfer-full\", \"reason\": \"payload-missing\"}]}}, {\"event\": \"payload-transferred\", \"data\": {\"state\": \"(p:payload-1:100)\", \"version\": 100}}]}" + ), + headers, + 200 + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + assertEquals("test-env-123", result.getChangeSet().getEnvironmentId()); + } + + @Test + public void environmentIdNullWhenHeaderNotPresent() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + FDv2Requestor.FDv2PayloadResponse response = makeSuccessResponse(); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + assertNull(result.getChangeSet().getEnvironmentId()); + } + + @Test + public void bothFdv1FallbackAndEnvironmentIdExtractedFromHeaders() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-fd-fallback", "true") + .add("x-ld-envid", "test-env-456") + .build(); + + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray( + "{\"events\": [{\"event\": \"server-intent\", \"data\": {\"payloads\": [{\"id\": \"payload-1\", \"target\": 100, \"intentCode\": \"xfer-full\", \"reason\": \"payload-missing\"}]}}, {\"event\": \"payload-transferred\", \"data\": {\"state\": \"(p:payload-1:100)\", \"version\": 100}}]}" + ), + headers, + 200 + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource); + + CompletableFuture resultFuture = initializer.run(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(true, result.isFdv1Fallback()); + assertNotNull(result.getChangeSet()); + assertEquals("test-env-456", result.getChangeSet().getEnvironmentId()); + } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java index 4dbccd0..da6a9fd 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PollingSynchronizerImplTest.java @@ -67,9 +67,10 @@ private FDv2Requestor.FDv2PayloadResponse makeSuccessResponse() { "}"; try { - return new FDv2Requestor.FDv2PayloadResponse( + return FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(json), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 ); } catch (Exception e) { throw new RuntimeException(e); @@ -535,7 +536,8 @@ public void nonRecoverableHttpErrorStopsPolling() throws Exception { int count = callCount.incrementAndGet(); // First call returns 401 (non-recoverable) if (count == 1) { - return failedFuture(new com.launchdarkly.sdk.internal.http.HttpErrors.HttpErrorException(401)); + return CompletableFuture.completedFuture( + FDv2Requestor.FDv2PayloadResponse.failure(401, okhttp3.Headers.of())); } else { // Subsequent calls should not happen, but return success if they do return CompletableFuture.completedFuture(makeSuccessResponse()); @@ -583,7 +585,8 @@ public void recoverableHttpErrorContinuesPolling() throws Exception { int count = callCount.incrementAndGet(); // First call returns 429 (recoverable - too many requests) if (count == 1) { - return failedFuture(new com.launchdarkly.sdk.internal.http.HttpErrors.HttpErrorException(429)); + return CompletableFuture.completedFuture( + FDv2Requestor.FDv2PayloadResponse.failure(429, okhttp3.Headers.of())); } else { // Subsequent calls succeed successCount.incrementAndGet(); @@ -635,9 +638,11 @@ public void multipleRecoverableErrorsContinuePolling() throws Exception { int count = callCount.incrementAndGet(); // Multiple recoverable errors: 408, 429, network error, success pattern if (count == 1) { - return failedFuture(new com.launchdarkly.sdk.internal.http.HttpErrors.HttpErrorException(408)); + return CompletableFuture.completedFuture( + FDv2Requestor.FDv2PayloadResponse.failure(408, okhttp3.Headers.of())); } else if (count == 2) { - return failedFuture(new com.launchdarkly.sdk.internal.http.HttpErrors.HttpErrorException(429)); + return CompletableFuture.completedFuture( + FDv2Requestor.FDv2PayloadResponse.failure(429, okhttp3.Headers.of())); } else if (count == 3) { return failedFuture(new IOException("Connection timeout")); } else { @@ -691,7 +696,8 @@ public void nonRecoverableThenRecoverableErrorStopsPolling() throws Exception { int count = callCount.incrementAndGet(); // First call returns 403 (non-recoverable) if (count == 1) { - return failedFuture(new com.launchdarkly.sdk.internal.http.HttpErrors.HttpErrorException(403)); + return CompletableFuture.completedFuture( + FDv2Requestor.FDv2PayloadResponse.failure(403, okhttp3.Headers.of())); } else { // Any subsequent calls should not happen return failedFuture(new IOException("Network error")); @@ -761,9 +767,10 @@ public void internalErrorWithInvalidDataKindContinuesPolling() throws Exception " ]\n" + "}"; - return CompletableFuture.completedFuture(new FDv2Requestor.FDv2PayloadResponse( + return CompletableFuture.completedFuture(FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPayloadTransferred), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 )); } else { // Subsequent calls succeed @@ -824,9 +831,10 @@ public void internalErrorWithUnknownKindContinuesPolling() throws Exception { " ]\n" + "}"; - return CompletableFuture.completedFuture(new FDv2Requestor.FDv2PayloadResponse( + return CompletableFuture.completedFuture(FDv2Requestor.FDv2PayloadResponse.success( com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(unknownEventJson), - okhttp3.Headers.of() + okhttp3.Headers.of(), + 200 )); } else { // Subsequent calls succeed @@ -866,4 +874,201 @@ public void internalErrorWithUnknownKindContinuesPolling() throws Exception { executor.shutdown(); } } + + @Test + public void fdv1FallbackFlagSetToTrueInSuccessResponse() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-fd-fallback", "true") + .build(); + + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray( + "{\"events\": [{\"event\": \"server-intent\", \"data\": {\"payloads\": [{\"id\": \"payload-1\", \"target\": 100, \"intentCode\": \"xfer-full\", \"reason\": \"payload-missing\"}]}}, {\"event\": \"payload-transferred\", \"data\": {\"state\": \"(p:payload-1:100)\", \"version\": 100}}]}" + ), + headers, + 200 + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(true, result.isFdv1Fallback()); + + synchronizer.close(); + } finally { + executor.shutdown(); + } + } + + @Test + public void fdv1FallbackFlagSetToFalseWhenHeaderNotPresent() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(makeSuccessResponse())); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(false, result.isFdv1Fallback()); + + synchronizer.close(); + } finally { + executor.shutdown(); + } + } + + @Test + public void fdv1FallbackFlagSetToTrueInErrorResponse() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-fd-fallback", "true") + .build(); + + FDv2Requestor.FDv2PayloadResponse errorResponse = + FDv2Requestor.FDv2PayloadResponse.failure(503, headers); + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(errorResponse)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState()); + assertEquals(true, result.isFdv1Fallback()); + + synchronizer.close(); + } finally { + executor.shutdown(); + } + } + + @Test + public void environmentIdExtractedFromHeaders() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-envid", "test-env-789") + .build(); + + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray( + "{\"events\": [{\"event\": \"server-intent\", \"data\": {\"payloads\": [{\"id\": \"payload-1\", \"target\": 100, \"intentCode\": \"xfer-full\", \"reason\": \"payload-missing\"}]}}, {\"event\": \"payload-transferred\", \"data\": {\"state\": \"(p:payload-1:100)\", \"version\": 100}}]}" + ), + headers, + 200 + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + assertEquals("test-env-789", result.getChangeSet().getEnvironmentId()); + + synchronizer.close(); + } finally { + executor.shutdown(); + } + } + + @Test + public void bothFdv1FallbackAndEnvironmentIdExtractedFromHeaders() throws Exception { + FDv2Requestor requestor = mockRequestor(); + SelectorSource selectorSource = mockSelectorSource(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + okhttp3.Headers headers = new okhttp3.Headers.Builder() + .add("x-ld-fd-fallback", "true") + .add("x-ld-envid", "test-env-combined") + .build(); + + FDv2Requestor.FDv2PayloadResponse response = FDv2Requestor.FDv2PayloadResponse.success( + com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray( + "{\"events\": [{\"event\": \"server-intent\", \"data\": {\"payloads\": [{\"id\": \"payload-1\", \"target\": 100, \"intentCode\": \"xfer-full\", \"reason\": \"payload-missing\"}]}}, {\"event\": \"payload-transferred\", \"data\": {\"state\": \"(p:payload-1:100)\", \"version\": 100}}]}" + ), + headers, + 200 + ); + + when(requestor.Poll(any(Selector.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + + try { + PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl( + requestor, + testLogger, + selectorSource, + executor, + Duration.ofMillis(100) + ); + + FDv2SourceResult result = synchronizer.next().get(1, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(true, result.isFdv1Fallback()); + assertNotNull(result.getChangeSet()); + assertEquals("test-env-combined", result.getChangeSet().getEnvironmentId()); + + synchronizer.close(); + } finally { + executor.shutdown(); + } + } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java index eaf305c..48194f3 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java @@ -823,4 +823,182 @@ public void nullPayloadFilterNotAddedToRequest() throws Exception { synchronizer.close(); } } + + @Test + public void fdv1FallbackFlagSetToTrueInSuccessResponse() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.header("x-ld-fd-fallback", "true"), + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100) + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(true, result.isFdv1Fallback()); + + synchronizer.close(); + } + } + + @Test + public void fdv1FallbackFlagSetToFalseWhenHeaderNotPresent() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100) + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(false, result.isFdv1Fallback()); + + synchronizer.close(); + } + } + + @Test + public void fdv1FallbackFlagSetToTrueInErrorResponse() throws Exception { + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.status(503), + Handlers.header("x-ld-fd-fallback", "true")))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100) + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result.getStatus().getState()); + assertEquals(true, result.isFdv1Fallback()); + + synchronizer.close(); + } + } + + @Test + public void environmentIdExtractedFromHeaders() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.header("x-ld-envid", "test-env-streaming"), + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100) + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + assertEquals("test-env-streaming", result.getChangeSet().getEnvironmentId()); + + synchronizer.close(); + } + } + + @Test + public void bothFdv1FallbackAndEnvironmentIdExtractedFromHeaders() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.header("x-ld-fd-fallback", "true"), + Handlers.header("x-ld-envid", "test-env-combined-streaming"), + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100) + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertEquals(true, result.isFdv1Fallback()); + assertNotNull(result.getChangeSet()); + assertEquals("test-env-combined-streaming", result.getChangeSet().getEnvironmentId()); + + synchronizer.close(); + } + } } From b20c62102e1f3bd23468a9bc10f917d92187c2f4 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 21 Jan 2026 15:31:49 -0800 Subject: [PATCH 2/2] Cleanup --- .../com/launchdarkly/sdk/server/DefaultFDv2Requestor.java | 2 +- .../main/java/com/launchdarkly/sdk/server/PollingBase.java | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java index 1eb75b1..d41be27 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java @@ -111,7 +111,7 @@ public void onFailure(@Nonnull Call call, @Nonnull IOException e) { @Override public void onResponse(@Nonnull Call call, @Nonnull Response response) { try { - // Handle 304 Not Modified - no new data, but return response with headers + // Handle 304 Not Modified - no new data if (response.code() == 304) { logger.debug("FDv2 polling request returned 304: not modified"); future.complete(FDv2PayloadResponse.none(response.code())); diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java index f18309d..c1a4235 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java @@ -4,7 +4,6 @@ import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; -import com.launchdarkly.sdk.internal.http.HttpErrors; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; @@ -34,11 +33,6 @@ private static boolean getFallback(FDv2Requestor.FDv2PayloadResponse response) { String headerValue = response.getHeaders().get(HeaderConstants.FDV1_FALLBACK.getHeaderName()); return headerValue != null && headerValue.equalsIgnoreCase("true"); } -// if(ex != null) { -// if(ex instanceof HttpErrorException) { -// ((HttpErrors.HttpErrorException) ex). -// } -// } return false; }