From b46ac5e11469de7ebd5b8c287a38f84b38a61c5c Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 23 Jan 2026 15:12:42 -0800 Subject: [PATCH 1/3] chore: Add fallback and recovery support for FDv2. --- .../sdk/server/FDv2DataSource.java | 370 +++++++++++++-- .../sdk/server/FDv2DataSystem.java | 5 +- .../com/launchdarkly/sdk/server/Loggers.java | 3 + .../sdk/server/PollingInitializerImpl.java | 2 +- .../sdk/server/PollingSynchronizerImpl.java | 2 +- .../sdk/server/StreamingSynchronizerImpl.java | 2 +- .../FDv2DataSourceFallbackConditionTest.java | 438 ++++++++++++++++++ 7 files changed, 790 insertions(+), 32 deletions(-) create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index 906c4847..edaf130a 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -1,6 +1,7 @@ package com.launchdarkly.sdk.server; import com.google.common.collect.ImmutableList; +import com.launchdarkly.logging.LDLogger; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.Initializer; import com.launchdarkly.sdk.server.datasources.Synchronizer; @@ -10,18 +11,29 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; class FDv2DataSource implements DataSource { + /** + * Default fallback timeout of 2 minutes. The timeout is only configurable for testing. + */ + private static final int defaultFallbackTimeout = 2 * 60; + + /** + * Default recovery timeout of 5 minutes. The timeout is only configurable for testing. + */ + private static final long defaultRecoveryTimeout = 5 * 60; + private final List> initializers; private final List synchronizers; + private final List conditionFactories; + private final DataSourceUpdateSinkV2 dataSourceUpdates; private final CompletableFuture startFuture = new CompletableFuture<>(); @@ -34,6 +46,168 @@ class FDv2DataSource implements DataSource { private Closeable activeSource; private boolean isShutdown = false; + private final int threadPriority; + + private final LDLogger logger; + + /** + * Package-private for testing. + */ + interface Condition { + enum ConditionType { + FALLBACK, + RECOVERY, + } + CompletableFuture execute(); + + void inform(FDv2SourceResult sourceResult); + + void close() throws IOException; + + ConditionType getType(); + } + + interface ConditionFactory { + Condition build(); + + Condition.ConditionType getType(); + } + + + static abstract class TimedCondition implements Condition { + protected final CompletableFuture resultFuture = new CompletableFuture<>(); + + protected final ScheduledExecutorService sharedExecutor; + + /** + * Future for the timeout task, if any. Will be null when no timeout is active. + */ + protected ScheduledFuture timerFuture; + + /** + * Timeout duration for the fallback operation. + */ + protected final long timeoutSeconds; + + public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + this.sharedExecutor = sharedExecutor; + this.timeoutSeconds = timeoutSeconds; + } + + @Override + public CompletableFuture execute() { + return resultFuture; + } + + @Override + public void close() throws IOException { + if (timerFuture != null) { + timerFuture.cancel(false); + timerFuture = null; + } + } + + static abstract class Factory implements ConditionFactory { + protected final ScheduledExecutorService sharedExecutor; + protected final long timeoutSeconds; + + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + this.sharedExecutor = sharedExecutor; + this.timeoutSeconds = timeout; + } + } + } + + /** + * This condition is used to determine if a fallback should be performed. It is informed of each data source result + * via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback + * condition is met, then the {@link Future} returned by {@link #execute()} will complete. + *

+ * This is package-private, instead of private, for ease of testing. + */ + static class FallbackCondition extends TimedCondition { + static class Factory extends TimedCondition.Factory { + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + super(sharedExecutor, timeout); + } + @Override + public Condition build() { + return new FallbackCondition(sharedExecutor, timeoutSeconds); + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } + + public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + super(sharedExecutor, timeoutSeconds); + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + if(sourceResult == null) { + return; + } + if(sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { + if(timerFuture != null) { + timerFuture.cancel(false); + timerFuture = null; + } + } + if(sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { + if (timerFuture == null) { + timerFuture = sharedExecutor.schedule(() -> { + resultFuture.complete(this); + return null; + }, timeoutSeconds, TimeUnit.SECONDS); + } + } + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } + + static class RecoveryCondition extends TimedCondition { + + static class Factory extends TimedCondition.Factory { + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + super(sharedExecutor, timeout); + } + @Override + public Condition build() { + return new RecoveryCondition(sharedExecutor, timeoutSeconds); + } + + @Override + public ConditionType getType() { + return ConditionType.RECOVERY; + } + } + + public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + super(sharedExecutor, timeoutSeconds); + timerFuture = sharedExecutor.schedule(() -> { + resultFuture.complete(this); + return null; + }, timeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + // Time-based recovery. + } + + @Override + public ConditionType getType() { + return ConditionType.RECOVERY; + } + } + private static class SynchronizerFactoryWithState { public enum State { /** @@ -73,18 +247,39 @@ public interface DataSourceFactory { T build(); } + public FDv2DataSource( + ImmutableList> initializers, + ImmutableList> synchronizers, + DataSourceUpdateSinkV2 dataSourceUpdates, + int threadPriority, + LDLogger logger, + ScheduledExecutorService sharedExecutor + ) { + this(initializers, synchronizers, dataSourceUpdates, threadPriority, logger, sharedExecutor, defaultFallbackTimeout, defaultRecoveryTimeout); + } + public FDv2DataSource( - ImmutableList> initializers, - ImmutableList> synchronizers, - DataSourceUpdateSinkV2 dataSourceUpdates + ImmutableList> initializers, + ImmutableList> synchronizers, + DataSourceUpdateSinkV2 dataSourceUpdates, + int threadPriority, + LDLogger logger, + ScheduledExecutorService sharedExecutor, + long fallbackTimeout, + long recoveryTimeout ) { this.initializers = initializers; this.synchronizers = synchronizers - .stream() - .map(SynchronizerFactoryWithState::new) - .collect(Collectors.toList()); + .stream() + .map(SynchronizerFactoryWithState::new) + .collect(Collectors.toList()); this.dataSourceUpdates = dataSourceUpdates; + this.threadPriority = threadPriority; + this.logger = logger; + this.conditionFactories = new ArrayList<>(); + this.conditionFactories.add(new FallbackCondition.Factory(sharedExecutor, fallbackTimeout)); + this.conditionFactories.add(new RecoveryCondition.Factory(sharedExecutor, recoveryTimeout)); } private void run() { @@ -92,24 +287,64 @@ private void run() { if (!initializers.isEmpty()) { runInitializers(); } - runSynchronizers(); + boolean fdv1Fallback = runSynchronizers(); + if (fdv1Fallback) { + // TODO: Run FDv1 fallback. + } // TODO: Handle. We have ran out of sources or we are shutting down. }); runThread.setDaemon(true); - // TODO: Thread priority. - //thread.setPriority(threadPriority); + runThread.setPriority(threadPriority); runThread.start(); } - private SynchronizerFactoryWithState getFirstAvailableSynchronizer() { + /** + * We start at -1, so finding the next synchronizer can non-conditionally increment the index. + */ + private int sourceIndex = -1; + + /** + * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for + * the next one to use. This is used when recovering from a non-primary synchronizer. + */ + private void resetSynchronizerSourceIndex() { + synchronized (activeSourceLock) { + sourceIndex = -1; + } + } + + /** + * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer, + * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers + * the source index will be reset, and we start at the beginning. + *

+ * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again. + * Synchronizers that are not blocked are available, and this function will only return available synchronizers. + * @return the next synchronizer factory to use, or null if there are no more available synchronizers. + */ + private SynchronizerFactoryWithState getNextAvailableSynchronizer() { synchronized (synchronizers) { - for (SynchronizerFactoryWithState synchronizer : synchronizers) { - if (synchronizer.getState() == SynchronizerFactoryWithState.State.Available) { - return synchronizer; + SynchronizerFactoryWithState factory = null; + + // There is at least one available factory. + if(synchronizers.stream().anyMatch(s -> s.getState() == SynchronizerFactoryWithState.State.Available)) { + // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.) + while(factory == null) { + sourceIndex++; + // We aren't using module here because we want to keep the stored index within range instead + // of increasing indefinitely. + if(sourceIndex >= synchronizers.size()) { + sourceIndex = 0; + } + SynchronizerFactoryWithState candidate = synchronizers.get(sourceIndex); + if (candidate.getState() == SynchronizerFactoryWithState.State.Available) { + factory = candidate; + } + } } - return null; + return factory; } } @@ -136,7 +371,9 @@ private void runInitializers() { break; } } catch (ExecutionException | InterruptedException | CancellationException e) { - // TODO: Log. + // TODO: Better messaging? + // TODO: Data source status? + logger.warn("Error running initializer: {}", e.toString()); } } // We received data without a selector, and we have exhausted initializers, so we are going to @@ -147,18 +384,88 @@ private void runInitializers() { } } - private void runSynchronizers() { - SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer(); - // TODO: Add recovery handling. If there are no available synchronizers, but there are - // recovering ones, then we likely will want to wait for them to be available (or bypass recovery). + /** + * Determine conditions for the current synchronizer. Synchronizers require different conditions depending on if + * they are the 'prime' synchronizer or if there are other available synchronizers to use. + * @return a list of conditions to apply to the synchronizer + */ + private List getConditions() { + boolean isPrimeSynchronizer = false; + int availableSynchronizers = 0; + boolean firstAvailableSynchronizer = true; + + synchronized (activeSourceLock) { + for (int index = 0; index < synchronizers.size(); index++) { + + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + if (firstAvailableSynchronizer && sourceIndex == index) { + // This is the first synchronizer that is available, and it also is the current one. + isPrimeSynchronizer = true; + } + // Subsequently encountered synchronizers that are available are not the first one. + firstAvailableSynchronizer = false; + availableSynchronizers++; + } + } + } + if(availableSynchronizers == 1) { + // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions. + return Collections.emptyList(); + } + if(isPrimeSynchronizer) { + // If there isn't a synchronizer to recover to, then don't add and recovery conditions. + return conditionFactories.stream() + .filter((ConditionFactory factory) -> factory.getType() != Condition.ConditionType.RECOVERY) + .map(ConditionFactory::build).collect(Collectors.toList()); + } + // The synchronizer can both fall back and recover. + return conditionFactories.stream().map(ConditionFactory::build).collect(Collectors.toList()); + } + + private boolean runSynchronizers() { + SynchronizerFactoryWithState availableSynchronizer = getNextAvailableSynchronizer(); while (availableSynchronizer != null) { Synchronizer synchronizer = availableSynchronizer.build(); + // Returns true if shutdown. - if (setActiveSource(synchronizer)) return; + if (setActiveSource(synchronizer)) return false; + try { boolean running = true; + // Conditions run once for the life of the synchronizer. + List conditions = getConditions(); + CompletableFuture conditionFutures = CompletableFuture.anyOf( + conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new)); + while (running) { - FDv2SourceResult result = synchronizer.next().get(); + CompletableFuture nextResultFuture = synchronizer.next(); + + Object res = CompletableFuture.anyOf(conditionFutures, nextResultFuture).get(); + + if(res instanceof Condition) { + Condition c = (Condition) res; + switch (c.getType()) { + case FALLBACK: + // For fallback, we will move to the next available synchronizer, which may loop. + // This is the default behavior of exiting the run loop, so we don't need to take + // any action. + break; + case RECOVERY: + // For recovery, we will start at the first available synchronizer. + // So we reset the source index, and finding the source will start at the beginning. + resetSynchronizerSourceIndex(); + break; + } + // A running synchronizer will only have fallback and recovery conditions that it can act on. + // So, if there are no synchronizers to recover to or fallback to, then we will not have + // those conditions. + break; + } + + + FDv2SourceResult result = (FDv2SourceResult) res; + conditions.forEach(c -> c.inform(result)); + switch (result.getResultType()) { case CHANGE_SET: dataSourceUpdates.apply(result.getChangeSet()); @@ -175,7 +482,7 @@ private void runSynchronizers() { case SHUTDOWN: // We should be overall shutting down. // TODO: We may need logging or to do a little more. - return; + return false; case TERMINAL_ERROR: availableSynchronizer.block(); running = false; @@ -186,13 +493,20 @@ private void runSynchronizers() { } break; } + // We have been requested to fall back to FDv1. We handle whatever message was associated, + // close the synchronizer, and then fallback. + if(result.isFdv1Fallback()) { + safeClose(synchronizer); + return true; + } } } catch (ExecutionException | InterruptedException | CancellationException e) { // TODO: Log. // Move to next synchronizer. } - availableSynchronizer = getFirstAvailableSynchronizer(); + availableSynchronizer = getNextAvailableSynchronizer(); } + return false; } private void safeClose(Closeable synchronizer) { @@ -239,7 +553,7 @@ public void close() throws IOException { // If there is an active source, we will shut it down, and that will result in the loop handling that source // exiting. // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When - // it detects shutdown it will exit the loop. + // it detects shutdown, it will exit the loop. synchronized (activeSourceLock) { isShutdown = true; if (activeSource != null) { diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java index 70fc0048..c23e7168 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java @@ -138,7 +138,10 @@ static FDv2DataSystem create( DataSource dataSource = new FDv2DataSource( initializerFactories, synchronizerFactories, - dataSourceUpdates + dataSourceUpdates, + config.threadPriority, + clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME), + clientContext.sharedExecutor ); DataSourceStatusProvider dataSourceStatusProvider = new DataSourceStatusProviderImpl( dataSourceStatusBroadcaster, diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java index 823aa6a7..588ba3f3 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java @@ -24,4 +24,7 @@ private Loggers() {} static final String EVALUATION_LOGGER_NAME = "Evaluation"; static final String EVENTS_LOGGER_NAME = "Events"; static final String HOOKS_LOGGER_NAME = "Hooks"; + static final String STREAMING_SYNCHRONIZER = "StreamingSynchronizer"; + static final String POLLING_SYNCHRONIZER = "PollingSynchronizer"; + static final String POLLING_INITIALIZER = "PollingInitializer"; } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java index 856a118b..2e3b368f 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java @@ -12,7 +12,7 @@ class PollingInitializerImpl extends PollingBase implements Initializer { private final SelectorSource selectorSource; public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) { - super(requestor, logger); + super(requestor, logger.subLogger(Loggers.POLLING_INITIALIZER)); this.selectorSource = selectorSource; } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java index 8bbc6a4e..43c95ee0 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java @@ -23,7 +23,7 @@ public PollingSynchronizerImpl( ScheduledExecutorService sharedExecutor, Duration pollInterval ) { - super(requestor, logger); + super(requestor, logger.subLogger(Loggers.POLLING_SYNCHRONIZER)); this.selectorSource = selectorSource; synchronized (this) { diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index 6fa4487c..e5099e5c 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -71,7 +71,7 @@ public StreamingSynchronizerImpl( ) { this.httpProperties = httpProperties; this.selectorSource = selectorSource; - this.logger = logger; + this.logger = logger.subLogger(Loggers.STREAMING_SYNCHRONIZER); this.payloadFilter = payloadFilter; this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); this.initialReconnectDelay = initialReconnectDelaySeconds; diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java new file mode 100644 index 00000000..b4577e1a --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java @@ -0,0 +1,438 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.FDv2DataSource.Condition; +import com.launchdarkly.sdk.server.FDv2DataSource.FallbackCondition; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; + +import org.junit.After; +import org.junit.Test; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class FDv2DataSourceFallbackConditionTest extends BaseTest { + + private ScheduledExecutorService executor; + + @After + public void tearDown() { + if (executor != null && !executor.isShutdown()) { + executor.shutdownNow(); + } + } + + private DataStoreTypes.ChangeSet makeChangeSet() { + return new DataStoreTypes.ChangeSet<>( + DataStoreTypes.ChangeSetType.None, + Selector.EMPTY, + null, + null + ); + } + + @Test + public void executeReturnsCompletableFuture() { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + CompletableFuture result = condition.execute(); + + assertFalse(result.isDone()); + } + + @Test + public void getTypeReturnsFallback() { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + assertEquals(Condition.ConditionType.FALLBACK, condition.getType()); + } + + @Test + public void interruptedStatusStartsTimerThatCompletesResultFuture() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + assertFalse(resultFuture.isDone()); + + // Inform with INTERRUPTED status + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Future should still not be done immediately + assertFalse(resultFuture.isDone()); + + // Wait for timeout to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + + // Now it should be done and return the condition instance + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void changeSetCancelsActiveTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer with INTERRUPTED + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Cancel timer with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait longer than the timeout period + Thread.sleep(1500); + + // Future should still not be complete (timer was cancelled) + assertFalse(resultFuture.isDone()); + } + + @Test + public void changeSetWithoutActiveTimerDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with CHANGE_SET without starting a timer first + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait to ensure nothing happens + Thread.sleep(100); + + // Future should still not be complete + assertFalse(resultFuture.isDone()); + } + + @Test + public void multipleInterruptedStatusesDoNotStartMultipleTimers() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 2); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with INTERRUPTED multiple times + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + Thread.sleep(100); + + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + Thread.sleep(100); + + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait for the timer (should only fire once) + Condition result = resultFuture.get(3, TimeUnit.SECONDS); + + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void terminalErrorStatusDoesNotStartTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with TERMINAL_ERROR status + condition.inform( + FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, 401, null, Instant.now()), + false + ) + ); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete (no timer started) + assertFalse(resultFuture.isDone()); + } + + @Test + public void shutdownStatusDoesNotStartTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with SHUTDOWN status + condition.inform(FDv2SourceResult.shutdown()); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete (no timer started) + assertFalse(resultFuture.isDone()); + } + + @Test + public void goodbyeStatusDoesNotStartTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with GOODBYE status + condition.inform(FDv2SourceResult.goodbye("server-requested", false)); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete (no timer started) + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeCancelsActiveTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer with INTERRUPTED + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Close the condition + condition.close(); + + // Wait longer than the timeout period + Thread.sleep(1500); + + // Future should still not be complete (timer was cancelled) + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeWithoutActiveTimerDoesNotFail() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + // Close without starting a timer + condition.close(); + + // Should not throw exception + } + + @Test + public void timerCanBeStartedAfterBeingCancelled() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Cancel timer with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Start timer again + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait for second timer to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void changeSetAfterTimerFiresDoesNotAffectCompletedFuture() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait for timer to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + + // Inform with CHANGE_SET after timer has fired + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Future should remain complete + assertTrue(resultFuture.isDone()); + } + + @Test + public void factoryCreatesFallbackCondition() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition.Factory factory = new FallbackCondition.Factory(executor, 1); + + FallbackCondition condition = (FallbackCondition) factory.build(); + + // Verify it works by using it + CompletableFuture resultFuture = condition.execute(); + assertFalse(resultFuture.isDone()); + + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void executeReturnsTheSameFutureOnMultipleCalls() { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + CompletableFuture first = condition.execute(); + CompletableFuture second = condition.execute(); + + assertSame(first, second); + } + + @Test + public void changeSetDuringTimerExecutionCancelsTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait partway through timeout period + Thread.sleep(500); + + // Cancel with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait past the original timeout + Thread.sleep(1000); + + // Future should still not be complete + assertFalse(resultFuture.isDone()); + } + + @Test + public void multipleChangeSetCallsWithActiveTimerAreHandled() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Cancel with multiple CHANGE_SETs + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeCanBeCalledMultipleTimes() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Close multiple times + condition.close(); + condition.close(); + condition.close(); + + // Should not throw exception + } +} \ No newline at end of file From fba5aa29b7a264d38fd1f65ed52c82bc10caed17 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 23 Jan 2026 15:27:34 -0800 Subject: [PATCH 2/3] Extract conditions from FDv2DataSource. --- .../sdk/server/FDv2DataSource.java | 198 +----------------- .../sdk/server/FDv2DataSourceConditions.java | 184 ++++++++++++++++ .../server/SynchronizerFactoryWithState.java | 38 ++++ .../FDv2DataSourceFallbackConditionTest.java | 4 +- 4 files changed, 229 insertions(+), 195 deletions(-) create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index edaf130a..4ae0956b 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -18,6 +18,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.ConditionFactory; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition; + class FDv2DataSource implements DataSource { /** * Default fallback timeout of 2 minutes. The timeout is only configurable for testing. @@ -50,199 +55,6 @@ class FDv2DataSource implements DataSource { private final LDLogger logger; - /** - * Package-private for testing. - */ - interface Condition { - enum ConditionType { - FALLBACK, - RECOVERY, - } - CompletableFuture execute(); - - void inform(FDv2SourceResult sourceResult); - - void close() throws IOException; - - ConditionType getType(); - } - - interface ConditionFactory { - Condition build(); - - Condition.ConditionType getType(); - } - - - static abstract class TimedCondition implements Condition { - protected final CompletableFuture resultFuture = new CompletableFuture<>(); - - protected final ScheduledExecutorService sharedExecutor; - - /** - * Future for the timeout task, if any. Will be null when no timeout is active. - */ - protected ScheduledFuture timerFuture; - - /** - * Timeout duration for the fallback operation. - */ - protected final long timeoutSeconds; - - public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { - this.sharedExecutor = sharedExecutor; - this.timeoutSeconds = timeoutSeconds; - } - - @Override - public CompletableFuture execute() { - return resultFuture; - } - - @Override - public void close() throws IOException { - if (timerFuture != null) { - timerFuture.cancel(false); - timerFuture = null; - } - } - - static abstract class Factory implements ConditionFactory { - protected final ScheduledExecutorService sharedExecutor; - protected final long timeoutSeconds; - - public Factory(ScheduledExecutorService sharedExecutor, long timeout) { - this.sharedExecutor = sharedExecutor; - this.timeoutSeconds = timeout; - } - } - } - - /** - * This condition is used to determine if a fallback should be performed. It is informed of each data source result - * via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback - * condition is met, then the {@link Future} returned by {@link #execute()} will complete. - *

- * This is package-private, instead of private, for ease of testing. - */ - static class FallbackCondition extends TimedCondition { - static class Factory extends TimedCondition.Factory { - public Factory(ScheduledExecutorService sharedExecutor, long timeout) { - super(sharedExecutor, timeout); - } - @Override - public Condition build() { - return new FallbackCondition(sharedExecutor, timeoutSeconds); - } - - @Override - public ConditionType getType() { - return ConditionType.FALLBACK; - } - } - - public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { - super(sharedExecutor, timeoutSeconds); - } - - @Override - public void inform(FDv2SourceResult sourceResult) { - if(sourceResult == null) { - return; - } - if(sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { - if(timerFuture != null) { - timerFuture.cancel(false); - timerFuture = null; - } - } - if(sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { - if (timerFuture == null) { - timerFuture = sharedExecutor.schedule(() -> { - resultFuture.complete(this); - return null; - }, timeoutSeconds, TimeUnit.SECONDS); - } - } - } - - @Override - public ConditionType getType() { - return ConditionType.FALLBACK; - } - } - - static class RecoveryCondition extends TimedCondition { - - static class Factory extends TimedCondition.Factory { - public Factory(ScheduledExecutorService sharedExecutor, long timeout) { - super(sharedExecutor, timeout); - } - @Override - public Condition build() { - return new RecoveryCondition(sharedExecutor, timeoutSeconds); - } - - @Override - public ConditionType getType() { - return ConditionType.RECOVERY; - } - } - - public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { - super(sharedExecutor, timeoutSeconds); - timerFuture = sharedExecutor.schedule(() -> { - resultFuture.complete(this); - return null; - }, timeoutSeconds, TimeUnit.SECONDS); - } - - @Override - public void inform(FDv2SourceResult sourceResult) { - // Time-based recovery. - } - - @Override - public ConditionType getType() { - return ConditionType.RECOVERY; - } - } - - private static class SynchronizerFactoryWithState { - public enum State { - /** - * This synchronizer is available to use. - */ - Available, - - /** - * This synchronizer is no longer available to use. - */ - Blocked - } - - private final DataSourceFactory factory; - - private State state = State.Available; - - - public SynchronizerFactoryWithState(DataSourceFactory factory) { - this.factory = factory; - } - - public State getState() { - return state; - } - - public void block() { - state = State.Blocked; - } - - public Synchronizer build() { - return factory.build(); - } - } - public interface DataSourceFactory { T build(); } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java new file mode 100644 index 00000000..714e5869 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java @@ -0,0 +1,184 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Container class for FDv2 data source conditions and related types. + *

+ * This class is non-constructable and serves only as a namespace for condition-related types. + * Package-private for internal use and testing. + */ +class FDv2DataSourceConditions { + /** + * Private constructor to prevent instantiation. + */ + private FDv2DataSourceConditions() { + } + + /** + * Package-private for testing. + */ + interface Condition extends Closeable { + enum ConditionType { + FALLBACK, + RECOVERY, + } + + CompletableFuture execute(); + + void inform(FDv2SourceResult sourceResult); + + void close() throws IOException; + + ConditionType getType(); + } + + interface ConditionFactory { + Condition build(); + + Condition.ConditionType getType(); + } + + static abstract class TimedCondition implements Condition { + protected final CompletableFuture resultFuture = new CompletableFuture<>(); + + protected final ScheduledExecutorService sharedExecutor; + + /** + * Future for the timeout task, if any. Will be null when no timeout is active. + */ + protected ScheduledFuture timerFuture; + + /** + * Timeout duration for the fallback operation. + */ + protected final long timeoutSeconds; + + public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + this.sharedExecutor = sharedExecutor; + this.timeoutSeconds = timeoutSeconds; + } + + @Override + public CompletableFuture execute() { + return resultFuture; + } + + @Override + public void close() throws IOException { + if (timerFuture != null) { + timerFuture.cancel(false); + timerFuture = null; + } + } + + static abstract class Factory implements ConditionFactory { + protected final ScheduledExecutorService sharedExecutor; + protected final long timeoutSeconds; + + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + this.sharedExecutor = sharedExecutor; + this.timeoutSeconds = timeout; + } + } + } + + /** + * This condition is used to determine if a fallback should be performed. It is informed of each data source result + * via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback + * condition is met, then the {@link java.util.concurrent.Future} returned by {@link #execute()} will complete. + *

+ * This is package-private, instead of private, for ease of testing. + */ + static class FallbackCondition extends TimedCondition { + static class Factory extends TimedCondition.Factory { + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + super(sharedExecutor, timeout); + } + + @Override + public Condition build() { + return new FallbackCondition(sharedExecutor, timeoutSeconds); + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } + + public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + super(sharedExecutor, timeoutSeconds); + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + if (sourceResult == null) { + return; + } + if (sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { + if (timerFuture != null) { + timerFuture.cancel(false); + timerFuture = null; + } + } + if (sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { + if (timerFuture == null) { + timerFuture = sharedExecutor.schedule(() -> { + resultFuture.complete(this); + return null; + }, timeoutSeconds, TimeUnit.SECONDS); + } + } + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } + + static class RecoveryCondition extends TimedCondition { + + static class Factory extends TimedCondition.Factory { + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + super(sharedExecutor, timeout); + } + + @Override + public Condition build() { + return new RecoveryCondition(sharedExecutor, timeoutSeconds); + } + + @Override + public ConditionType getType() { + return ConditionType.RECOVERY; + } + } + + public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + super(sharedExecutor, timeoutSeconds); + timerFuture = sharedExecutor.schedule(() -> { + resultFuture.complete(this); + return null; + }, timeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + // Time-based recovery. + } + + @Override + public ConditionType getType() { + return ConditionType.RECOVERY; + } + } +} \ No newline at end of file diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java new file mode 100644 index 00000000..c0afa642 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java @@ -0,0 +1,38 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.Synchronizer; + +class SynchronizerFactoryWithState { + public enum State { + /** + * This synchronizer is available to use. + */ + Available, + + /** + * This synchronizer is no longer available to use. + */ + Blocked + } + + private final FDv2DataSource.DataSourceFactory factory; + + private State state = State.Available; + + + public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory factory) { + this.factory = factory; + } + + public State getState() { + return state; + } + + public void block() { + state = State.Blocked; + } + + public Synchronizer build() { + return factory.build(); + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java index b4577e1a..ca3fb5fd 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java @@ -1,8 +1,8 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; -import com.launchdarkly.sdk.server.FDv2DataSource.Condition; -import com.launchdarkly.sdk.server.FDv2DataSource.FallbackCondition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; From 472a1d2b5847f8cc185e55f691e47e7e1ffd1dba Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 23 Jan 2026 15:40:03 -0800 Subject: [PATCH 3/3] Extract synchronizer state management from the FDv2DataSource. --- .../sdk/server/FDv2DataSource.java | 142 +++------------ .../sdk/server/SynchronizerStateManager.java | 164 ++++++++++++++++++ 2 files changed, 192 insertions(+), 114 deletions(-) create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index 4ae0956b..1d2c75e4 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -9,7 +9,6 @@ import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; -import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -35,7 +34,7 @@ class FDv2DataSource implements DataSource { private static final long defaultRecoveryTimeout = 5 * 60; private final List> initializers; - private final List synchronizers; + private final SynchronizerStateManager synchronizerStateManager; private final List conditionFactories; @@ -44,13 +43,6 @@ class FDv2DataSource implements DataSource { private final CompletableFuture startFuture = new CompletableFuture<>(); private final AtomicBoolean started = new AtomicBoolean(false); - /** - * Lock for active sources and shutdown state. - */ - private final Object activeSourceLock = new Object(); - private Closeable activeSource; - private boolean isShutdown = false; - private final int threadPriority; private final LDLogger logger; @@ -67,7 +59,15 @@ public FDv2DataSource( LDLogger logger, ScheduledExecutorService sharedExecutor ) { - this(initializers, synchronizers, dataSourceUpdates, threadPriority, logger, sharedExecutor, defaultFallbackTimeout, defaultRecoveryTimeout); + this(initializers, + synchronizers, + dataSourceUpdates, + threadPriority, + logger, + sharedExecutor, + defaultFallbackTimeout, + defaultRecoveryTimeout + ); } @@ -82,10 +82,11 @@ public FDv2DataSource( long recoveryTimeout ) { this.initializers = initializers; - this.synchronizers = synchronizers + List synchronizerFactories = synchronizers .stream() .map(SynchronizerFactoryWithState::new) .collect(Collectors.toList()); + this.synchronizerStateManager = new SynchronizerStateManager(synchronizerFactories); this.dataSourceUpdates = dataSourceUpdates; this.threadPriority = threadPriority; this.logger = logger; @@ -110,62 +111,13 @@ private void run() { runThread.start(); } - /** - * We start at -1, so finding the next synchronizer can non-conditionally increment the index. - */ - private int sourceIndex = -1; - - /** - * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for - * the next one to use. This is used when recovering from a non-primary synchronizer. - */ - private void resetSynchronizerSourceIndex() { - synchronized (activeSourceLock) { - sourceIndex = -1; - } - } - - /** - * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer, - * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers - * the source index will be reset, and we start at the beginning. - *

- * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again. - * Synchronizers that are not blocked are available, and this function will only return available synchronizers. - * @return the next synchronizer factory to use, or null if there are no more available synchronizers. - */ - private SynchronizerFactoryWithState getNextAvailableSynchronizer() { - synchronized (synchronizers) { - SynchronizerFactoryWithState factory = null; - - // There is at least one available factory. - if(synchronizers.stream().anyMatch(s -> s.getState() == SynchronizerFactoryWithState.State.Available)) { - // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.) - while(factory == null) { - sourceIndex++; - // We aren't using module here because we want to keep the stored index within range instead - // of increasing indefinitely. - if(sourceIndex >= synchronizers.size()) { - sourceIndex = 0; - } - SynchronizerFactoryWithState candidate = synchronizers.get(sourceIndex); - if (candidate.getState() == SynchronizerFactoryWithState.State.Available) { - factory = candidate; - } - - } - } - - return factory; - } - } private void runInitializers() { boolean anyDataReceived = false; for (DataSourceFactory factory : initializers) { try { Initializer initializer = factory.build(); - if (setActiveSource(initializer)) return; + if (synchronizerStateManager.setActiveSource(initializer)) return; FDv2SourceResult result = initializer.run().get(); switch (result.getResultType()) { case CHANGE_SET: @@ -202,24 +154,9 @@ private void runInitializers() { * @return a list of conditions to apply to the synchronizer */ private List getConditions() { - boolean isPrimeSynchronizer = false; - int availableSynchronizers = 0; - boolean firstAvailableSynchronizer = true; - - synchronized (activeSourceLock) { - for (int index = 0; index < synchronizers.size(); index++) { + int availableSynchronizers = synchronizerStateManager.getAvailableSynchronizerCount(); + boolean isPrimeSynchronizer = synchronizerStateManager.isPrimeSynchronizer(); - if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { - if (firstAvailableSynchronizer && sourceIndex == index) { - // This is the first synchronizer that is available, and it also is the current one. - isPrimeSynchronizer = true; - } - // Subsequently encountered synchronizers that are available are not the first one. - firstAvailableSynchronizer = false; - availableSynchronizers++; - } - } - } if(availableSynchronizers == 1) { // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions. return Collections.emptyList(); @@ -235,24 +172,27 @@ private List getConditions() { } private boolean runSynchronizers() { - SynchronizerFactoryWithState availableSynchronizer = getNextAvailableSynchronizer(); + SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer(); while (availableSynchronizer != null) { Synchronizer synchronizer = availableSynchronizer.build(); // Returns true if shutdown. - if (setActiveSource(synchronizer)) return false; + if (synchronizerStateManager.setActiveSource(synchronizer)) return false; try { boolean running = true; // Conditions run once for the life of the synchronizer. List conditions = getConditions(); - CompletableFuture conditionFutures = CompletableFuture.anyOf( + + // The conditionsFuture will complete if any condition is met. Meeting any condition means we will + // switch to a different synchronizer. + CompletableFuture conditionsFuture = CompletableFuture.anyOf( conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new)); while (running) { CompletableFuture nextResultFuture = synchronizer.next(); - Object res = CompletableFuture.anyOf(conditionFutures, nextResultFuture).get(); + Object res = CompletableFuture.anyOf(conditionsFuture, nextResultFuture).get(); if(res instanceof Condition) { Condition c = (Condition) res; @@ -265,7 +205,7 @@ private boolean runSynchronizers() { case RECOVERY: // For recovery, we will start at the first available synchronizer. // So we reset the source index, and finding the source will start at the beginning. - resetSynchronizerSourceIndex(); + synchronizerStateManager.resetSourceIndex(); break; } // A running synchronizer will only have fallback and recovery conditions that it can act on. @@ -308,7 +248,8 @@ private boolean runSynchronizers() { // We have been requested to fall back to FDv1. We handle whatever message was associated, // close the synchronizer, and then fallback. if(result.isFdv1Fallback()) { - safeClose(synchronizer); + // When falling back to FDv1, we are done with any FDv2 synchronizers. + synchronizerStateManager.shutdown(); return true; } } @@ -316,29 +257,7 @@ private boolean runSynchronizers() { // TODO: Log. // Move to next synchronizer. } - availableSynchronizer = getNextAvailableSynchronizer(); - } - return false; - } - - private void safeClose(Closeable synchronizer) { - try { - synchronizer.close(); - } catch (IOException e) { - // Ignore close exceptions. - } - } - - private boolean setActiveSource(Closeable synchronizer) { - synchronized (activeSourceLock) { - if (activeSource != null) { - safeClose(activeSource); - } - if (isShutdown) { - safeClose(synchronizer); - return true; - } - activeSource = synchronizer; + availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer(); } return false; } @@ -361,17 +280,12 @@ public boolean isInitialized() { } @Override - public void close() throws IOException { + public void close() { // If there is an active source, we will shut it down, and that will result in the loop handling that source // exiting. // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When // it detects shutdown, it will exit the loop. - synchronized (activeSourceLock) { - isShutdown = true; - if (activeSource != null) { - activeSource.close(); - } - } + synchronizerStateManager.shutdown(); // If this is already set, then this has no impact. startFuture.complete(false); diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java new file mode 100644 index 00000000..a1a143c9 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java @@ -0,0 +1,164 @@ +package com.launchdarkly.sdk.server; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Manages the state of synchronizers including tracking which synchronizer is active, + * managing the list of available synchronizers, and handling source transitions. + *

+ * Package-private for internal use. + */ +class SynchronizerStateManager { + private final List synchronizers; + + /** + * Lock for active sources and shutdown state. + */ + private final Object activeSourceLock = new Object(); + private Closeable activeSource; + private boolean isShutdown = false; + + /** + * We start at -1, so finding the next synchronizer can non-conditionally increment the index. + */ + private int sourceIndex = -1; + + public SynchronizerStateManager(List synchronizers) { + this.synchronizers = synchronizers; + } + + /** + * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for + * the next one to use. This is used when recovering from a non-primary synchronizer. + */ + public void resetSourceIndex() { + synchronized (activeSourceLock) { + sourceIndex = -1; + } + } + + /** + * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer, + * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers + * the source index will be reset, and we start at the beginning. + *

+ * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again. + * Synchronizers that are not blocked are available, and this function will only return available synchronizers. + * @return the next synchronizer factory to use, or null if there are no more available synchronizers. + */ + public SynchronizerFactoryWithState getNextAvailableSynchronizer() { + synchronized (synchronizers) { + SynchronizerFactoryWithState factory = null; + + // There is at least one available factory. + if(synchronizers.stream().anyMatch(s -> s.getState() == SynchronizerFactoryWithState.State.Available)) { + // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.) + while(factory == null) { + sourceIndex++; + // We aren't using module here because we want to keep the stored index within range instead + // of increasing indefinitely. + if(sourceIndex >= synchronizers.size()) { + sourceIndex = 0; + } + SynchronizerFactoryWithState candidate = synchronizers.get(sourceIndex); + if (candidate.getState() == SynchronizerFactoryWithState.State.Available) { + factory = candidate; + } + + } + } + + return factory; + } + } + + /** + * Determine if the currently active synchronizer is the prime (first available) synchronizer. + * @return true if the current synchronizer is the prime synchronizer, false otherwise + */ + public boolean isPrimeSynchronizer() { + synchronized (activeSourceLock) { + boolean firstAvailableSynchronizer = true; + for (int index = 0; index < synchronizers.size(); index++) { + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + if (firstAvailableSynchronizer && sourceIndex == index) { + // This is the first synchronizer that is available, and it also is the current one. + return true; + } + // Subsequently encountered synchronizers that are available are not the first one. + firstAvailableSynchronizer = false; + } + } + } + return false; + } + + /** + * Get the count of available synchronizers. + * @return the number of available synchronizers + */ + public int getAvailableSynchronizerCount() { + synchronized (activeSourceLock) { + int count = 0; + for (int index = 0; index < synchronizers.size(); index++) { + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + count++; + } + } + return count; + } + } + + /** + * Set the active source. If shutdown has been initiated, the source will be closed immediately. + * Any previously active source will be closed. + * @param source the source to set as active + * @return true if shutdown has been initiated, false otherwise + */ + public boolean setActiveSource(Closeable source) { + synchronized (activeSourceLock) { + if (activeSource != null) { + safeClose(activeSource); + } + if (isShutdown) { + safeClose(source); + return true; + } + activeSource = source; + } + return false; + } + + /** + * Initiate shutdown of the state manager. This will close any active source. + * @throws IOException if an error occurs closing the active source + */ + public void shutdown() { + synchronized (activeSourceLock) { + isShutdown = true; + if (activeSource != null) { + try { + activeSource.close(); + } catch (IOException e) { + // We are done with this synchronizer, so we don't care if it encounters + // an error condition. + } + activeSource = null; + } + } + } + + /** + * Safely close a closeable, ignoring any exceptions. + * @param closeable the closeable to close + */ + private void safeClose(Closeable closeable) { + try { + closeable.close(); + } catch (IOException e) { + // Ignore close exceptions. + } + } +} \ No newline at end of file