From b9f2df319cad0f1a07a72e4ccc08c7a62453f2c8 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Thu, 22 Jan 2026 15:08:05 -0500 Subject: [PATCH] chore: adds transactional persistent store and recovery --- .../sdk/server/FDv2DataSystem.java | 23 +- .../server/PersistentDataStoreConverter.java | 95 ++ .../server/PersistentDataStoreWrapper.java | 100 +- .../sdk/server/SettableCache.java | 28 + .../sdk/server/WriteThroughStore.java | 199 ++++ .../PersistentDataStoreConverterTest.java | 471 +++++++++ ...ersistentDataStoreWrapperRecoveryTest.java | 629 ++++++++++++ .../sdk/server/WriteThroughStoreTest.java | 930 ++++++++++++++++++ 8 files changed, 2441 insertions(+), 34 deletions(-) create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreConverter.java create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SettableCache.java create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/WriteThroughStore.java create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreConverterTest.java create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapperRecoveryTest.java create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/WriteThroughStoreTest.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java index 70fc004..0562ce6 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java @@ -15,6 +15,7 @@ import com.launchdarkly.sdk.server.subsystems.DataStore; import com.launchdarkly.sdk.server.subsystems.LoggingConfiguration; import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration; +import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer; import java.io.Closeable; import java.io.IOException; @@ -91,7 +92,26 @@ static FDv2DataSystem create( DataStoreUpdatesImpl dataStoreUpdates = new DataStoreUpdatesImpl( EventBroadcasterImpl.forDataStoreStatus(clientContext.sharedExecutor, logger)); - InMemoryDataStore store = new InMemoryDataStore(); + DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build(); + + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + DataStore persistentStore = null; + if (dataSystemConfiguration.getPersistentStore() != null) { + persistentStore = dataSystemConfiguration.getPersistentStore().build(clientContext.withDataStoreUpdateSink(dataStoreUpdates)); + + // Configure persistent store to sync from memory store during recovery (ReadWrite mode only) + if (persistentStore != null && dataSystemConfiguration.getPersistentDataStoreMode() == DataSystemConfiguration.DataStoreMode.READ_WRITE) { + if (persistentStore instanceof SettableCache) { + ((SettableCache) persistentStore).setCacheExporter(memoryStore); + } + } + } + + WriteThroughStore store = new WriteThroughStore( + memoryStore, + persistentStore, + dataSystemConfiguration.getPersistentDataStoreMode()); DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(store, dataStoreUpdates); @@ -113,7 +133,6 @@ static FDv2DataSystem create( logger ); - DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build(); SelectorSource selectorSource = new SelectorSourceFacade(store); DataSourceBuildInputs builderContext = new DataSourceBuildInputs( diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreConverter.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreConverter.java new file mode 100644 index 0000000..050a991 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreConverter.java @@ -0,0 +1,95 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.SerializedItemDescriptor; + +import java.util.AbstractMap; +import java.util.Map; + +/** + * Utility for converting between in-memory and serialized persistent data store formats. + */ +final class PersistentDataStoreConverter { + private PersistentDataStoreConverter() { + // Utility class - prevent instantiation + } + + /** + * Converts a FullDataSet of ItemDescriptor to SerializedItemDescriptor format. + * + * @param inMemoryData the in-memory data to convert + * @return a FullDataSet in serialized format suitable for persistent stores + */ + static FullDataSet toSerializedFormat( + FullDataSet inMemoryData) { + ImmutableList.Builder>> builder = + ImmutableList.builder(); + + for (Map.Entry> kindEntry : inMemoryData.getData()) { + DataKind kind = kindEntry.getKey(); + KeyedItems items = kindEntry.getValue(); + + builder.add(new AbstractMap.SimpleEntry<>( + kind, + serializeAll(kind, items) + )); + } + + return new FullDataSet<>(builder.build()); + } + + /** + * Serializes a single item descriptor. + * + * @param kind the data kind + * @param itemDesc the item descriptor to serialize + * @return a serialized item descriptor + */ + static SerializedItemDescriptor serialize(DataKind kind, ItemDescriptor itemDesc) { + boolean isDeleted = itemDesc.getItem() == null; + return new SerializedItemDescriptor(itemDesc.getVersion(), isDeleted, kind.serialize(itemDesc)); + } + + /** + * Serializes all items of a given DataKind from a KeyedItems collection. + * + * @param kind the data kind + * @param items the items to serialize + * @return keyed items in serialized format + */ + static KeyedItems serializeAll( + DataKind kind, + KeyedItems items) { + ImmutableList.Builder> itemsBuilder = + ImmutableList.builder(); + for (Map.Entry e : items.getItems()) { + itemsBuilder.add(new AbstractMap.SimpleEntry<>(e.getKey(), serialize(kind, e.getValue()))); + } + return new KeyedItems<>(itemsBuilder.build()); + } + + /** + * Deserializes a single item descriptor. + * + * @param kind the data kind + * @param serializedItemDesc the serialized item descriptor + * @return a deserialized item descriptor + */ + static ItemDescriptor deserialize(DataKind kind, SerializedItemDescriptor serializedItemDesc) { + if (serializedItemDesc.isDeleted() || serializedItemDesc.getSerializedItem() == null) { + return ItemDescriptor.deletedItem(serializedItemDesc.getVersion()); + } + ItemDescriptor deserializedItem = kind.deserialize(serializedItemDesc.getSerializedItem()); + if (serializedItemDesc.getVersion() == 0 || + serializedItemDesc.getVersion() == deserializedItem.getVersion() || + deserializedItem.getItem() == null) { + return deserializedItem; + } + // If the store gave us a version number that isn't what was encoded in the object, trust it + return new ItemDescriptor(serializedItemDesc.getVersion(), deserializedItem.getItem()); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapper.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapper.java index b58adf3..52af86a 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapper.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapper.java @@ -43,7 +43,7 @@ *

* This class is only constructed by {@link PersistentDataStoreBuilder}. */ -final class PersistentDataStoreWrapper implements DataStore { +final class PersistentDataStoreWrapper implements DataStore, SettableCache { private final PersistentDataStore core; private final LoadingCache> itemCache; private final LoadingCache> allCache; @@ -55,6 +55,9 @@ final class PersistentDataStoreWrapper implements DataStore { private final ListeningExecutorService cacheExecutor; private final LDLogger logger; + private final Object externalStoreLock = new Object(); + private volatile CacheExporter externalCache; + PersistentDataStoreWrapper( final PersistentDataStore core, Duration cacheTtl, @@ -180,7 +183,7 @@ public void init(FullDataSet allData) { ImmutableList.Builder>> allBuilder = ImmutableList.builder(); for (Map.Entry> e0: allData.getData()) { DataKind kind = e0.getKey(); - KeyedItems items = serializeAll(kind, e0.getValue()); + KeyedItems items = PersistentDataStoreConverter.serializeAll(kind, e0.getValue()); allBuilder.add(new AbstractMap.SimpleEntry<>(kind, items)); } RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build())); @@ -260,7 +263,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) { synchronized (cachedDataKinds) { cachedDataKinds.add(kind); } - SerializedItemDescriptor serializedItem = serialize(kind, item); + SerializedItemDescriptor serializedItem = PersistentDataStoreConverter.serialize(kind, item); boolean updated = false; RuntimeException failure = null; try { @@ -317,6 +320,24 @@ public boolean isStatusMonitoringEnabled() { return true; } + /** + * Sets an external data source for recovery synchronization. + *

+ * This should be called during initialization if the wrapper is being used + * in a write-through architecture where an external store maintains authoritative data. + *

+ * When we remove FDv1 support, we should remove this functionality and instead handle it at a higher + * layer. + * + * @param externalDataSource The external data source to sync from during recovery + */ + @Override + public void setCacheExporter(CacheExporter externalDataSource) { + synchronized (externalStoreLock) { + externalCache = externalDataSource; + } + } + @Override public CacheStats getCacheStats() { if (itemCache == null || allCache == null) { @@ -335,7 +356,7 @@ public CacheStats getCacheStats() { private ItemDescriptor getAndDeserializeItem(DataKind kind, String key) { SerializedItemDescriptor maybeSerializedItem = core.get(kind, key); - return maybeSerializedItem == null ? null : deserialize(kind, maybeSerializedItem); + return maybeSerializedItem == null ? null : PersistentDataStoreConverter.deserialize(kind, maybeSerializedItem); } private KeyedItems getAllAndDeserialize(DataKind kind) { @@ -345,36 +366,11 @@ private KeyedItems getAllAndDeserialize(DataKind kind) { } ImmutableList.Builder> b = ImmutableList.builder(); for (Map.Entry e: allItems.getItems()) { - b.add(new AbstractMap.SimpleEntry<>(e.getKey(), deserialize(kind, e.getValue()))); + b.add(new AbstractMap.SimpleEntry<>(e.getKey(), PersistentDataStoreConverter.deserialize(kind, e.getValue()))); } return new KeyedItems<>(b.build()); } - private SerializedItemDescriptor serialize(DataKind kind, ItemDescriptor itemDesc) { - boolean isDeleted = itemDesc.getItem() == null; - return new SerializedItemDescriptor(itemDesc.getVersion(), isDeleted, kind.serialize(itemDesc)); - } - - private KeyedItems serializeAll(DataKind kind, KeyedItems items) { - ImmutableList.Builder> itemsBuilder = ImmutableList.builder(); - for (Map.Entry e: items.getItems()) { - itemsBuilder.add(new AbstractMap.SimpleEntry<>(e.getKey(), serialize(kind, e.getValue()))); - } - return new KeyedItems<>(itemsBuilder.build()); - } - - private ItemDescriptor deserialize(DataKind kind, SerializedItemDescriptor serializedItemDesc) { - if (serializedItemDesc.isDeleted() || serializedItemDesc.getSerializedItem() == null) { - return ItemDescriptor.deletedItem(serializedItemDesc.getVersion()); - } - ItemDescriptor deserializedItem = kind.deserialize(serializedItemDesc.getSerializedItem()); - if (serializedItemDesc.getVersion() == 0 || serializedItemDesc.getVersion() == deserializedItem.getVersion() - || deserializedItem.getItem() == null) { - return deserializedItem; - } - // If the store gave us a version number that isn't what was encoded in the object, trust it - return new ItemDescriptor(serializedItemDesc.getVersion(), deserializedItem.getItem()); - } private KeyedItems updateSingleItem(KeyedItems items, String key, ItemDescriptor item) { // This is somewhat inefficient but it's preferable to use immutable data structures in the cache. @@ -401,7 +397,47 @@ private boolean pollAvailabilityAfterOutage() { if (!core.isStoreAvailable()) { return false; } - + + CacheExporter externalCacheSnapshot; + synchronized (externalStoreLock) { + externalCacheSnapshot = externalCache; + } + + // If we have an external data source (e.g., WriteThroughStore's memory store) that is initialized, + // use that as the authoritative source. Otherwise, fall back to our internal cache if it's configured + // to cache indefinitely. + if (externalCacheSnapshot != null) { + if (externalCacheSnapshot.isInitialized()) { + try { + FullDataSet externalData = externalCacheSnapshot.exportAll(); + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(externalData); + RuntimeException e = initCore(serializedData); + + if (e == null) { + logger.warn("Successfully updated persistent store from external data source"); + } else { + // We failed to write the data to the underlying store. In this case, we should not + // return to a recovered state, but just try this all again next time the poll task runs. + logger.error("Tried to write external data to persistent store after outage, but failed: {}", + LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); + return false; + } + } catch (Exception ex) { + // If we can't export from the external source, don't recover yet + logger.error("Failed to export data from external source during persistent store recovery: {}", + LogValues.exceptionSummary(ex)); + logger.debug(LogValues.exceptionTrace(ex)); + return false; + } + + return true; + } + } + + // Fall back to cache-based recovery if external store is not available/initialized + // and we're in infinite cache mode if (cacheIndefinitely && allCache != null) { // If we're in infinite cache mode, then we can assume the cache has a full set of current // flag data (since presumably the data source has still been running) and we can just @@ -414,7 +450,7 @@ private boolean pollAvailabilityAfterOutage() { for (DataKind kind: allKinds) { KeyedItems items = allCache.getIfPresent(kind); if (items != null) { - builder.add(new AbstractMap.SimpleEntry<>(kind, serializeAll(kind, items))); + builder.add(new AbstractMap.SimpleEntry<>(kind, PersistentDataStoreConverter.serializeAll(kind, items))); } } RuntimeException e = initCore(new FullDataSet<>(builder.build())); diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SettableCache.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SettableCache.java new file mode 100644 index 0000000..10ea80b --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SettableCache.java @@ -0,0 +1,28 @@ +package com.launchdarkly.sdk.server; + +/** + * Optional interface for data stores that can accept a cache exporter + * for recovery synchronization. + *

+ * This interface is used in write-through architectures where a persistent store + * may fail temporarily. When the persistent store recovers, it can sync data from + * an external authoritative source (like an in-memory store) rather than relying + * solely on its internal cache. + *

+ * In the long-term, internal caching should be removed from store implementations and managed centrally. + *

+ * This is currently for internal implementations only. + */ +interface SettableCache { + /** + * Sets an external cache exporter for recovery synchronization. + *

+ * This should be called during initialization if the data store is being used + * in a write-through architecture where an external store maintains authoritative data. + * When the persistent store recovers from an outage, it will export data from this + * external source and write it to the underlying persistent storage. + * + * @param externalDataSource an external cache to sync from during recovery + */ + void setCacheExporter(CacheExporter externalDataSource); +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/WriteThroughStore.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/WriteThroughStore.java new file mode 100644 index 0000000..70dbeba --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/WriteThroughStore.java @@ -0,0 +1,199 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider.CacheStats; +import com.launchdarkly.sdk.server.subsystems.DataStore; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; +import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration.DataStoreMode; +import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A data store that writes through to both an in-memory store and an optional persistent store. + *

+ * During initialization, reads happen from the persistent store (if present). Once an initializing + * payload is received, reads switch to the in-memory store. Writes always go to both stores when + * in READ_WRITE mode. + *

+ * This class is package-private and should not be used by application code. + */ +final class WriteThroughStore implements DataStore, TransactionalDataStore { + private final DataStore memoryStore; + private final TransactionalDataStore txMemoryStore; + private final DataStore persistentStore; + private final boolean hasPersistence; + private final DataStoreMode persistenceMode; + private final AtomicBoolean hasReceivedAnInitializingPayload = new AtomicBoolean(false); + + private final Object activeStoreLock = new Object(); + private volatile DataStore activeReadStore; + + /** + * Creates a new WriteThroughStore. + * + * @param memoryStore the in-memory store (must implement TransactionalDataStore) + * @param persistentStore the persistent store, or null if no persistence is configured + * @param persistenceMode the mode for the persistent store + */ + WriteThroughStore(DataStore memoryStore, DataStore persistentStore, DataStoreMode persistenceMode) { + this.memoryStore = memoryStore; + this.txMemoryStore = (TransactionalDataStore) memoryStore; + this.persistentStore = persistentStore; + this.hasPersistence = persistentStore != null; + // During initialization, reads will happen from the persistent store. + this.activeReadStore = hasPersistence ? persistentStore : memoryStore; + this.persistenceMode = persistenceMode; + } + + @Override + public void init(FullDataSet allData) { + memoryStore.init(allData); + maybeSwitchStore(); + + if (persistenceMode == DataStoreMode.READ_WRITE) { + if (persistentStore != null) { + persistentStore.init(allData); + } + } + } + + @Override + public ItemDescriptor get(DataKind kind, String key) { + return activeReadStore.get(kind, key); + } + + @Override + public KeyedItems getAll(DataKind kind) { + return activeReadStore.getAll(kind); + } + + @Override + public boolean upsert(DataKind kind, String key, ItemDescriptor item) { + boolean result = memoryStore.upsert(kind, key, item); + if (hasPersistence && persistenceMode == DataStoreMode.READ_WRITE) { + result = result && persistentStore.upsert(kind, key, item); + } + + // We aren't going to switch from persistence on an update. + // Currently, an upsert should not ever be the first operation on a store. + // If selector support for persistent stores was added, then they would use the apply path. + return result; + } + + @Override + public boolean isInitialized() { + return activeReadStore.isInitialized(); + } + + @Override + public boolean isStatusMonitoringEnabled() { + return persistentStore != null ? persistentStore.isStatusMonitoringEnabled() : false; + } + + @Override + public CacheStats getCacheStats() { + return persistentStore != null ? persistentStore.getCacheStats() : null; + } + + @Override + public void apply(ChangeSet changeSet) { + txMemoryStore.apply(changeSet); + maybeSwitchStore(); + + if (!hasPersistence || persistenceMode != DataStoreMode.READ_WRITE) { + return; + } + + if (persistentStore instanceof TransactionalDataStore) { + ((TransactionalDataStore) persistentStore).apply(changeSet); + } else { + // If an apply fails at init, that will throw on its own, but if it fails via an upsert, then + // we need to throw something to work with the current data source updates implementation. + if (!applyToLegacyPersistence(changeSet)) { + // The exception type doesn't matter here, as it will be converted to data store status. + throw new RuntimeException("Failure to apply data set to persistent store."); + } + } + } + + @Override + public Selector getSelector() { + return txMemoryStore.getSelector(); + } + + @Override + public void close() throws IOException { + memoryStore.close(); + if (persistentStore != null) { + persistentStore.close(); + } + } + + /** + * Switches the active read store from persistent to memory store once an initializing payload is received. + * This transition happens once, and then subsequently we only use the memory store. + */ + private void maybeSwitchStore() { + if (hasReceivedAnInitializingPayload.getAndSet(true)) { + return; + } + synchronized (activeStoreLock) { + activeReadStore = memoryStore; + } + } + + /** + * Applies a change set to a legacy persistent store that doesn't implement TransactionalDataStore. + * + * @param sortedChangeSet the change set to apply (data will have been sorted by data source updates) + * @return true if the operation succeeded, false otherwise + */ + private boolean applyToLegacyPersistence(ChangeSet sortedChangeSet) { + // Data will have been sorted by data source updates. + switch (sortedChangeSet.getType()) { + case Full: + applyFullChangeSetToLegacyStore(sortedChangeSet); + break; + case Partial: + return applyPartialChangeSetToLegacyStore(sortedChangeSet); + case None: + default: + break; + } + + return true; + } + + /** + * Applies a full change set to a legacy persistent store. + */ + private void applyFullChangeSetToLegacyStore(ChangeSet sortedChangeSet) { + persistentStore.init(new FullDataSet<>(sortedChangeSet.getData())); + } + + /** + * Applies a partial change set to a legacy persistent store. + * + * @param sortedChangeSet the change set to apply + * @return true if all operations succeeded, false otherwise + */ + private boolean applyPartialChangeSetToLegacyStore(ChangeSet sortedChangeSet) { + for (java.util.Map.Entry> kindItemsPair : sortedChangeSet.getData()) { + for (java.util.Map.Entry item : kindItemsPair.getValue().getItems()) { + boolean applySuccess = persistentStore.upsert(kindItemsPair.getKey(), item.getKey(), item.getValue()); + if (!applySuccess) { + return false; + } + } + } + + return true; + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreConverterTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreConverterTest.java new file mode 100644 index 0000000..069f9c2 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreConverterTest.java @@ -0,0 +1,471 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.SerializedItemDescriptor; + +import org.junit.Test; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@SuppressWarnings("javadoc") +public class PersistentDataStoreConverterTest { + + // Simple test DataKind that uses "item1:1" format like C# tests + private static final DataKind TEST_DATA_KIND = new DataKind("testdata", + PersistentDataStoreConverterTest::serializeTestItem, + PersistentDataStoreConverterTest::deserializeTestItem); + + private static final DataKind OTHER_DATA_KIND = new DataKind("otherdata", + PersistentDataStoreConverterTest::serializeTestItem, + PersistentDataStoreConverterTest::deserializeTestItem); + + private static String serializeTestItem(ItemDescriptor item) { + if (item.getItem() == null) { + return "DELETED:" + item.getVersion(); + } + TestItem testItem = (TestItem) item.getItem(); + return testItem.name + ":" + item.getVersion(); + } + + private static ItemDescriptor deserializeTestItem(String s) { + String[] parts = s.split(":"); + int version = Integer.parseInt(parts[1]); + if ("DELETED".equals(parts[0])) { + return ItemDescriptor.deletedItem(version); + } + return new ItemDescriptor(version, new TestItem(parts[0])); + } + + private static class TestItem { + final String name; + + TestItem(String name) { + this.name = name; + } + + @Override + public boolean equals(Object o) { + if (o instanceof TestItem) { + return name.equals(((TestItem) o).name); + } + return false; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public String toString() { + return "TestItem(" + name + ")"; + } + } + + private static class TestDataBuilder { + private final Map> data = new HashMap<>(); + + TestDataBuilder add(DataKind kind, String key, int version, Object item) { + data.computeIfAbsent(kind, k -> new HashMap<>()).put(key, new ItemDescriptor(version, item)); + return this; + } + + FullDataSet build() { + ImmutableList.Builder>> builder = + ImmutableList.builder(); + for (Map.Entry> e : data.entrySet()) { + ImmutableList.Builder> itemsBuilder = + ImmutableList.builder(); + for (Map.Entry item : e.getValue().entrySet()) { + itemsBuilder.add(new AbstractMap.SimpleEntry<>(item.getKey(), item.getValue())); + } + builder.add(new AbstractMap.SimpleEntry<>(e.getKey(), new KeyedItems<>(itemsBuilder.build()))); + } + return new FullDataSet<>(builder.build()); + } + } + + @Test + public void toSerializedFormatConvertsCorrectly() { + TestItem item1 = new TestItem("item1"); + TestItem item2 = new TestItem("item2"); + + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 1, item1) + .add(TEST_DATA_KIND, "key2", 2, item2) + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + // Should have one data kind + int count = 0; + for (@SuppressWarnings("unused") Map.Entry> ignored : + serializedData.getData()) { + count++; + } + assertEquals(1, count); + + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + + int itemCount = 0; + for (@SuppressWarnings("unused") Map.Entry ignored : + testKindData.getItems()) { + itemCount++; + } + assertEquals(2, itemCount); + + // Verify first item + SerializedItemDescriptor serializedItem1 = null; + for (Map.Entry e : testKindData.getItems()) { + if ("key1".equals(e.getKey())) { + serializedItem1 = e.getValue(); + break; + } + } + assertNotNull(serializedItem1); + assertEquals(1, serializedItem1.getVersion()); + assertFalse(serializedItem1.isDeleted()); + assertNotNull(serializedItem1.getSerializedItem()); + assertEquals("item1:1", serializedItem1.getSerializedItem()); + + // Verify second item + SerializedItemDescriptor serializedItem2 = null; + for (Map.Entry e : testKindData.getItems()) { + if ("key2".equals(e.getKey())) { + serializedItem2 = e.getValue(); + break; + } + } + assertNotNull(serializedItem2); + assertEquals(2, serializedItem2.getVersion()); + assertFalse(serializedItem2.isDeleted()); + assertNotNull(serializedItem2.getSerializedItem()); + assertEquals("item2:2", serializedItem2.getSerializedItem()); + } + + @Test + public void toSerializedFormatHandlesDeletedItems() { + TestItem item1 = new TestItem("item1"); + + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 1, item1) + .add(TEST_DATA_KIND, "key2", 2, null) // Deleted item + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + + int itemCount = 0; + for (@SuppressWarnings("unused") Map.Entry ignored : + testKindData.getItems()) { + itemCount++; + } + assertEquals(2, itemCount); + + // Regular item + SerializedItemDescriptor serializedItem1 = null; + for (Map.Entry e : testKindData.getItems()) { + if ("key1".equals(e.getKey())) { + serializedItem1 = e.getValue(); + break; + } + } + assertNotNull(serializedItem1); + assertEquals(1, serializedItem1.getVersion()); + assertFalse(serializedItem1.isDeleted()); + + // Deleted item + SerializedItemDescriptor serializedItem2 = null; + for (Map.Entry e : testKindData.getItems()) { + if ("key2".equals(e.getKey())) { + serializedItem2 = e.getValue(); + break; + } + } + assertNotNull(serializedItem2); + assertEquals(2, serializedItem2.getVersion()); + assertTrue(serializedItem2.isDeleted()); + // Serialized representation still contains the placeholder + assertEquals("DELETED:2", serializedItem2.getSerializedItem()); + } + + @Test + public void toSerializedFormatPreservesAllDataKinds() { + TestItem item1 = new TestItem("item1"); + TestItem item2 = new TestItem("item2"); + + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 1, item1) + .add(OTHER_DATA_KIND, "key2", 2, item2) + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + // Should have both data kinds + int kindCount = 0; + for (@SuppressWarnings("unused") Map.Entry> ignored : + serializedData.getData()) { + kindCount++; + } + assertEquals(2, kindCount); + + // Verify TestDataKind + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + int count = 0; + SerializedItemDescriptor firstItem = null; + for (Map.Entry e : testKindData.getItems()) { + count++; + if (firstItem == null) { + firstItem = e.getValue(); + } + } + assertEquals(1, count); + assertNotNull(firstItem); + assertEquals("item1:1", firstItem.getSerializedItem()); + + // Verify OtherDataKind + KeyedItems otherKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == OTHER_DATA_KIND) { + otherKindData = e.getValue(); + break; + } + } + assertNotNull(otherKindData); + count = 0; + firstItem = null; + for (Map.Entry e : otherKindData.getItems()) { + count++; + if (firstItem == null) { + firstItem = e.getValue(); + } + } + assertEquals(1, count); + assertNotNull(firstItem); + assertEquals("item2:2", firstItem.getSerializedItem()); + } + + @Test + public void toSerializedFormatWithEmptyDataReturnsEmptyDataSet() { + FullDataSet inMemoryData = new TestDataBuilder().build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + int count = 0; + for (@SuppressWarnings("unused") Map.Entry> ignored : + serializedData.getData()) { + count++; + } + assertEquals(0, count); + } + + @Test + public void toSerializedFormatWithEmptyKindIncludesEmptyKind() { + // Create a data set with a kind that has no items + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 1, new TestItem("item1")) + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + // Should have the kind with items + int count = 0; + for (@SuppressWarnings("unused") Map.Entry> ignored : + serializedData.getData()) { + count++; + } + assertEquals(1, count); + + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + int itemCount = 0; + for (@SuppressWarnings("unused") Map.Entry ignored : + testKindData.getItems()) { + itemCount++; + } + assertEquals(1, itemCount); + } + + @Test + public void toSerializedFormatPreservesVersionNumbers() { + TestItem item1 = new TestItem("item1"); + + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 100, item1) + .add(TEST_DATA_KIND, "key2", 999, item1) + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + + SerializedItemDescriptor item1Serialized = null; + for (Map.Entry e : testKindData.getItems()) { + if ("key1".equals(e.getKey())) { + item1Serialized = e.getValue(); + break; + } + } + assertNotNull(item1Serialized); + assertEquals(100, item1Serialized.getVersion()); + + SerializedItemDescriptor item2Serialized = null; + for (Map.Entry e : testKindData.getItems()) { + if ("key2".equals(e.getKey())) { + item2Serialized = e.getValue(); + break; + } + } + assertNotNull(item2Serialized); + assertEquals(999, item2Serialized.getVersion()); + } + + @Test + public void toSerializedFormatWithMultipleItemsInSameKind() { + TestItem item1 = new TestItem("item1"); + TestItem item2 = new TestItem("item2"); + TestItem item3 = new TestItem("item3"); + + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 1, item1) + .add(TEST_DATA_KIND, "key2", 2, item2) + .add(TEST_DATA_KIND, "key3", 3, item3) + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + + int count = 0; + for (@SuppressWarnings("unused") Map.Entry ignored : + testKindData.getItems()) { + count++; + } + assertEquals(3, count); + + // Verify all three items are present with correct serialization + boolean foundKey1 = false; + boolean foundKey2 = false; + boolean foundKey3 = false; + for (Map.Entry e : testKindData.getItems()) { + if ("key1".equals(e.getKey()) && "item1:1".equals(e.getValue().getSerializedItem())) { + foundKey1 = true; + } + if ("key2".equals(e.getKey()) && "item2:2".equals(e.getValue().getSerializedItem())) { + foundKey2 = true; + } + if ("key3".equals(e.getKey()) && "item3:3".equals(e.getValue().getSerializedItem())) { + foundKey3 = true; + } + } + assertTrue(foundKey1); + assertTrue(foundKey2); + assertTrue(foundKey3); + } + + @Test + public void toSerializedFormatWithMixedDeletedAndRegularItems() { + TestItem item1 = new TestItem("item1"); + TestItem item3 = new TestItem("item3"); + + FullDataSet inMemoryData = new TestDataBuilder() + .add(TEST_DATA_KIND, "key1", 1, item1) + .add(TEST_DATA_KIND, "key2", 2, null) // Deleted + .add(TEST_DATA_KIND, "key3", 3, item3) + .add(TEST_DATA_KIND, "key4", 4, null) // Deleted + .build(); + + FullDataSet serializedData = + PersistentDataStoreConverter.toSerializedFormat(inMemoryData); + + KeyedItems testKindData = null; + for (Map.Entry> e : serializedData.getData()) { + if (e.getKey() == TEST_DATA_KIND) { + testKindData = e.getValue(); + break; + } + } + assertNotNull(testKindData); + + int count = 0; + for (@SuppressWarnings("unused") Map.Entry ignored : + testKindData.getItems()) { + count++; + } + assertEquals(4, count); + + // Count deleted vs non-deleted + int deletedCount = 0; + int regularCount = 0; + for (Map.Entry e : testKindData.getItems()) { + if (e.getValue().isDeleted()) { + deletedCount++; + } else { + regularCount++; + } + } + + assertEquals(2, deletedCount); + assertEquals(2, regularCount); + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapperRecoveryTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapperRecoveryTest.java new file mode 100644 index 0000000..4a1608f --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapperRecoveryTest.java @@ -0,0 +1,629 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.launchdarkly.sdk.server.integrations.MockPersistentDataStore; +import com.launchdarkly.sdk.server.integrations.PersistentDataStoreBuilder; +import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.SerializedItemDescriptor; + +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static com.launchdarkly.sdk.server.DataStoreTestTypes.TEST_ITEMS; +import static com.launchdarkly.sdk.server.TestComponents.sharedExecutor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the external data source recovery behavior in PersistentDataStoreWrapper. + * These tests verify that when a persistent store recovers from an outage, it syncs + * data from an external data source (like InMemoryDataStore) rather than its internal cache. + */ +@SuppressWarnings("javadoc") +public class PersistentDataStoreWrapperRecoveryTest extends BaseTest { + private static final Duration TIMEOUT_FOR_RECOVERY = Duration.ofSeconds(2); + private static final RuntimeException FAKE_ERROR = new RuntimeException("test error"); + + private final MockPersistentDataStore core; + private final DataStoreUpdatesImpl dataStoreUpdates; + + public PersistentDataStoreWrapperRecoveryTest() { + this.core = new MockPersistentDataStore(); + EventBroadcasterImpl statusBroadcaster = + EventBroadcasterImpl.forDataStoreStatus(sharedExecutor, testLogger); + this.dataStoreUpdates = new DataStoreUpdatesImpl(statusBroadcaster); + } + + @After + public void tearDown() throws IOException { + // Cleanup if needed + } + + // Helper method to create FullDataSet with deleted items + private FullDataSet createDataSetWithDeletedItem(DataKind kind, String key, int version) { + Map> dataMap = new HashMap<>(); + dataMap.put(kind, new HashMap<>()); + dataMap.get(kind).put(key, ItemDescriptor.deletedItem(version)); + ImmutableList.Builder>> builder = ImmutableList.builder(); + for (Map.Entry> e : dataMap.entrySet()) { + ImmutableList.Builder> itemsBuilder = ImmutableList.builder(); + for (Map.Entry item : e.getValue().entrySet()) { + itemsBuilder.add(new AbstractMap.SimpleEntry<>(item.getKey(), item.getValue())); + } + builder.add(new AbstractMap.SimpleEntry<>(e.getKey(), new KeyedItems<>(itemsBuilder.build()))); + } + return new FullDataSet<>(builder.build()); + } + + // Helper method to merge two FullDataSets + private FullDataSet mergeDataSets(FullDataSet set1, FullDataSet set2) { + Map> merged = new HashMap<>(); + + // Add all from set1 + for (Map.Entry> e : set1.getData()) { + merged.put(e.getKey(), new HashMap<>()); + for (Map.Entry item : e.getValue().getItems()) { + merged.get(e.getKey()).put(item.getKey(), item.getValue()); + } + } + + // Add/overwrite with set2 + for (Map.Entry> e : set2.getData()) { + merged.computeIfAbsent(e.getKey(), k -> new HashMap<>()); + for (Map.Entry item : e.getValue().getItems()) { + merged.get(e.getKey()).put(item.getKey(), item.getValue()); + } + } + + ImmutableList.Builder>> builder = ImmutableList.builder(); + for (Map.Entry> e : merged.entrySet()) { + ImmutableList.Builder> itemsBuilder = ImmutableList.builder(); + for (Map.Entry item : e.getValue().entrySet()) { + itemsBuilder.add(new AbstractMap.SimpleEntry<>(item.getKey(), item.getValue())); + } + builder.add(new AbstractMap.SimpleEntry<>(e.getKey(), new KeyedItems<>(itemsBuilder.build()))); + } + return new FullDataSet<>(builder.build()); + } + + private PersistentDataStoreWrapper makeWrapperWithExternalSource(CacheExporter externalSource) { + PersistentDataStoreWrapper wrapper = new PersistentDataStoreWrapper( + core, + Duration.ofMillis(-1), // Infinite TTL + PersistentDataStoreBuilder.StaleValuesPolicy.EVICT, + false, + dataStoreUpdates::updateStatus, + sharedExecutor, + testLogger + ); + if (externalSource != null) { + wrapper.setCacheExporter(externalSource); + } + return wrapper; + } + + @Test + public void externalDataSourceSyncWhenStoreRecoversSyncsFromExternalSource() throws Exception { + // Create a mock external data source with some initial data + MockCacheExporter externalSource = new MockCacheExporter(); + DataStoreTestTypes.TestItem item1 = new DataStoreTestTypes.TestItem("key1", "item1", 1); + DataStoreTestTypes.TestItem item2 = new DataStoreTestTypes.TestItem("key2", "item2", 1); + + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1) + .add(TEST_ITEMS, item2) + .build()); + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + // Initialize the wrapper with some initial data + wrapper.init(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1) + .build()); + + // Cause a store error + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, item1)); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status status1 = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", status1); + assertFalse(status1.isAvailable()); + + // While the store is down, update the external data source with new data + DataStoreTestTypes.TestItem item3 = new DataStoreTestTypes.TestItem("key3", "item3", 1); + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key1", "item1", 2)) + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key2", "item2", 2)) + .add(TEST_ITEMS, item3) + .build()); + + // Make store available again + core.fakeError = null; + core.unavailable = false; + + // Wait for recovery + DataStoreStatusProvider.Status status2 = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected recovery status update", status2); + assertTrue(status2.isAvailable()); + assertFalse(status2.isRefreshNeeded()); // Should not need refresh in infinite cache mode + + // Verify that ALL data from external source was synced to persistent store + SerializedItemDescriptor syncedItem1 = core.data.get(TEST_ITEMS).get("key1"); + assertNotNull(syncedItem1); + assertEquals(2, syncedItem1.getVersion()); + + SerializedItemDescriptor syncedItem2 = core.data.get(TEST_ITEMS).get("key2"); + assertNotNull(syncedItem2); + assertEquals(2, syncedItem2.getVersion()); + + SerializedItemDescriptor syncedItem3 = core.data.get(TEST_ITEMS).get("key3"); + assertNotNull(syncedItem3); + assertEquals(1, syncedItem3.getVersion()); + + // Check log message + assertTrue(logCapture.getMessages().stream() + .anyMatch(m -> m.getLevel().name().equals("WARN") && + m.getText().contains("Successfully updated persistent store from external data source"))); + } finally { + wrapper.close(); + } + } + + @Test + public void externalDataSourceSyncWithMultipleKindsSyncsAllKinds() throws Exception { + MockCacheExporter externalSource = new MockCacheExporter(); + DataStoreTestTypes.TestItem item1 = new DataStoreTestTypes.TestItem("key1", "item1", 1); + DataStoreTestTypes.TestItem item2 = new DataStoreTestTypes.TestItem("key2", "item2", 1); + + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1) + .add(DataStoreTestTypes.OTHER_TEST_ITEMS, item2) + .build()); + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + wrapper.init(externalSource.exportAll()); + + // Cause error + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, item1)); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status unavailableStatus = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected unavailable status", unavailableStatus); + + // Update both kinds in external source + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key1", "item1", 3)) + .add(DataStoreTestTypes.OTHER_TEST_ITEMS, new DataStoreTestTypes.TestItem("key2", "item2", 3)) + .build()); + + // Recover + core.fakeError = null; + core.unavailable = false; + + DataStoreStatusProvider.Status recoveryStatus = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected recovery status", recoveryStatus); + + // Both kinds should be synced + SerializedItemDescriptor syncedItem1 = core.data.get(TEST_ITEMS).get("key1"); + assertNotNull(syncedItem1); + assertEquals(3, syncedItem1.getVersion()); + + SerializedItemDescriptor syncedItem2 = core.data.get(DataStoreTestTypes.OTHER_TEST_ITEMS).get("key2"); + assertNotNull(syncedItem2); + assertEquals(3, syncedItem2.getVersion()); + } finally { + wrapper.close(); + } + } + + @Test + public void externalDataSourceSyncWhenExportFailsDoesNotRecover() throws Exception { + MockCacheExporter externalSource = new MockCacheExporter(); + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key1", "item1", 1)) + .build()); + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + wrapper.init(externalSource.exportAll()); + + // Cause error + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, new DataStoreTestTypes.TestItem("key1", "item1", 2))); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status unavailableStatus = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected unavailable status", unavailableStatus); + + // Make external source throw an error during export + RuntimeException exportError = new RuntimeException("export failed"); + externalSource.exportError = exportError; + + // Make store available, but external source will fail + core.fakeError = null; + core.unavailable = false; + + // Wait a bit to ensure polling happens (but not full recovery timeout) + Thread.sleep(600); + + // Should NOT have recovered because export failed + // Try to get a status update with a short timeout - should fail + try { + DataStoreStatusProvider.Status status = statuses.poll(100, TimeUnit.MILLISECONDS); + if (status != null && status.isAvailable()) { + fail("Should not have received a recovery status update"); + } + } catch (Exception e) { + // Expected - no status update received + } + + // Check log message + assertTrue(logCapture.getMessages().stream() + .anyMatch(m -> m.getLevel().name().equals("ERROR") && + m.getText().contains("Failed to export data from external source"))); + } finally { + wrapper.close(); + } + } + + @Test + public void externalDataSourceSyncWhenInitCoreFailsDoesNotRecover() throws Exception { + MockCacheExporter externalSource = new MockCacheExporter(); + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key1", "item1", 1)) + .build()); + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + wrapper.init(externalSource.exportAll()); + + // Cause error + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, new DataStoreTestTypes.TestItem("key1", "item1", 2))); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status unavailableStatus = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected unavailable status", unavailableStatus); + + // Make store available but init will fail + core.unavailable = false; + core.fakeError = FAKE_ERROR; // Still throws error on operations + + // Wait a bit for polling + Thread.sleep(600); + + // Should NOT have recovered because init failed + // Try to get a status update with a short timeout - should fail + try { + DataStoreStatusProvider.Status status = statuses.poll(100, TimeUnit.MILLISECONDS); + if (status != null && status.isAvailable()) { + fail("Should not have received a recovery status update"); + } + } catch (Exception e) { + // Expected - no status update received + } + + // Check log message + assertTrue(logCapture.getMessages().stream() + .anyMatch(m -> m.getLevel().name().equals("ERROR") && + m.getText().contains("Tried to write external data to persistent store after outage, but failed"))); + } finally { + wrapper.close(); + } + } + + @Test + public void backwardCompatibilityWithoutExternalSourceUsesCacheSync() throws Exception { + // This test verifies that when no external source is provided, the wrapper + // falls back to the original cache-based recovery behavior + PersistentDataStoreWrapper wrapper = new PersistentDataStoreWrapper( + core, + Duration.ofMillis(-1), // Infinite TTL + PersistentDataStoreBuilder.StaleValuesPolicy.EVICT, + false, + dataStoreUpdates::updateStatus, + sharedExecutor, + testLogger + ); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + DataStoreTestTypes.TestItem item1 = new DataStoreTestTypes.TestItem("key1", "item1", 1); + wrapper.init(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1) + .build()); + + // Cause error and update cache + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, item1)); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status unavailableStatus = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected unavailable status", unavailableStatus); + + // The cache should have the update even though store failed + assertEquals(new ItemDescriptor(2, item1), wrapper.get(TEST_ITEMS, "key1")); + + // Recover + core.fakeError = null; + core.unavailable = false; + + DataStoreStatusProvider.Status status = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", status); + + // Should have synced from cache (original behavior) + SerializedItemDescriptor syncedItem = core.data.get(TEST_ITEMS).get("key1"); + assertNotNull(syncedItem); + assertEquals(2, syncedItem.getVersion()); + + // Check log message + assertTrue(logCapture.getMessages().stream() + .anyMatch(m -> m.getLevel().name().equals("WARN") && + m.getText().contains("Successfully updated persistent store from cached data"))); + } finally { + wrapper.close(); + } + } + + @Test + public void externalDataSourceSyncWithEmptyExternalSourceHandlesGracefully() throws Exception { + MockCacheExporter externalSource = new MockCacheExporter(); + // External source has no data + externalSource.setData(new DataStoreTestTypes.DataBuilder().build()); + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + wrapper.init(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key1", "item1", 1)) + .build()); + + // Cause error + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, new DataStoreTestTypes.TestItem("key1", "item1", 2))); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status status = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", status); + + // Recover with empty external source + core.fakeError = null; + core.unavailable = false; + + DataStoreStatusProvider.Status recoveryStatus3 = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", recoveryStatus3); + + // Should have cleared the persistent store (synced empty data) + assertFalse(core.data.containsKey(TEST_ITEMS) && core.data.get(TEST_ITEMS).containsKey("key1")); + } finally { + wrapper.close(); + } + } + + @Test + public void externalDataSourceSyncWithDeletedItemsSyncsCorrectly() throws Exception { + MockCacheExporter externalSource = new MockCacheExporter(); + DataStoreTestTypes.TestItem item1 = new DataStoreTestTypes.TestItem("key1", "item1", 1); + + DataStoreTestTypes.DataBuilder builder = new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1); + // Manually add deleted item + Map> dataMap = new java.util.HashMap<>(); + for (Map.Entry> e : builder.build().getData()) { + dataMap.put(e.getKey(), new java.util.HashMap<>()); + for (Map.Entry item : e.getValue().getItems()) { + dataMap.get(e.getKey()).put(item.getKey(), item.getValue()); + } + } + dataMap.computeIfAbsent(TEST_ITEMS, k -> new java.util.HashMap<>()) + .put("key2", ItemDescriptor.deletedItem(1)); + ImmutableList.Builder>> builder2 = ImmutableList.builder(); + for (Map.Entry> e : dataMap.entrySet()) { + ImmutableList.Builder> itemsBuilder = ImmutableList.builder(); + for (Map.Entry item : e.getValue().entrySet()) { + itemsBuilder.add(new AbstractMap.SimpleEntry<>(item.getKey(), item.getValue())); + } + builder2.add(new AbstractMap.SimpleEntry<>(e.getKey(), new KeyedItems<>(itemsBuilder.build()))); + } + externalSource.setData(new FullDataSet<>(builder2.build())); + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + wrapper.init(externalSource.exportAll()); + + // Cause error + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, item1)); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status status = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", status); + + // Update external source with deleted item at higher version + FullDataSet regularData2 = new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1) + .build(); + FullDataSet deletedData2 = createDataSetWithDeletedItem(TEST_ITEMS, "key2", 2); + externalSource.setData(mergeDataSets(regularData2, deletedData2)); + + // Recover + core.fakeError = null; + core.unavailable = false; + + DataStoreStatusProvider.Status recoveryStatus4 = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", recoveryStatus4); + + // Verify deleted item was synced correctly + assertTrue(core.data.get(TEST_ITEMS).containsKey("key2")); + SerializedItemDescriptor deletedItem = core.data.get(TEST_ITEMS).get("key2"); + assertTrue(deletedItem.isDeleted()); + assertEquals(2, deletedItem.getVersion()); + } finally { + wrapper.close(); + } + } + + @Test + public void externalDataSourceSyncWhenExternalStoreNotInitializedFallsBackToCache() throws Exception { + // Create an uninitialized external store + MockCacheExporter externalSource = new MockCacheExporter(); + externalSource.isInitialized = false; // Not initialized + + PersistentDataStoreWrapper wrapper = makeWrapperWithExternalSource(externalSource); + try { + DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(wrapper, dataStoreUpdates); + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataStoreStatusProvider.addStatusListener(statuses::add); + + DataStoreTestTypes.TestItem item1 = new DataStoreTestTypes.TestItem("key1", "item1", 1); + wrapper.init(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1) + .build()); + + // Cause error and update cache + core.unavailable = true; + core.fakeError = FAKE_ERROR; + try { + wrapper.upsert(TEST_ITEMS, "key1", new ItemDescriptor(2, item1)); + fail("Expected exception"); + } catch (RuntimeException e) { + assertEquals(FAKE_ERROR.getMessage(), e.getMessage()); + } + + DataStoreStatusProvider.Status unavailableStatus = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected unavailable status", unavailableStatus); + + // The cache should have the update even though store failed + assertEquals(new ItemDescriptor(2, item1), wrapper.get(TEST_ITEMS, "key1")); + + // Update external source with different data, but keep it uninitialized + externalSource.setData(new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new DataStoreTestTypes.TestItem("key1", "wrong-item", 99)) + .build()); + + // Recover + core.fakeError = null; + core.unavailable = false; + + DataStoreStatusProvider.Status status = statuses.poll(TIMEOUT_FOR_RECOVERY.toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Expected status update", status); + + // Should have synced from CACHE (not external source) because external store is not initialized + SerializedItemDescriptor syncedItem = core.data.get(TEST_ITEMS).get("key1"); + assertNotNull(syncedItem); + assertEquals(2, syncedItem.getVersion()); + + // Check log message + assertTrue(logCapture.getMessages().stream() + .anyMatch(m -> m.getLevel().name().equals("WARN") && + m.getText().contains("Successfully updated persistent store from cached data"))); + } finally { + wrapper.close(); + } + } + + /** + * Mock implementation of CacheExporter for testing. + */ + private static class MockCacheExporter implements CacheExporter { + private FullDataSet data = new DataStoreTestTypes.DataBuilder().build(); + public RuntimeException exportError; + public boolean isInitialized = true; + + public void setData(FullDataSet data) { + this.data = data; + } + + @Override + public FullDataSet exportAll() { + if (exportError != null) { + throw exportError; + } + return data; + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/WriteThroughStoreTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/WriteThroughStoreTest.java new file mode 100644 index 0000000..3f54a89 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/WriteThroughStoreTest.java @@ -0,0 +1,930 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider.CacheStats; +import com.launchdarkly.sdk.server.subsystems.DataStore; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; +import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration.DataStoreMode; +import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore; + +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static com.launchdarkly.sdk.server.DataStoreTestTypes.TEST_ITEMS; +import static com.launchdarkly.sdk.server.DataStoreTestTypes.TestItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings("javadoc") +public class WriteThroughStoreTest { + private final TestItem item1 = new TestItem("key1", "item1", 10); + private final TestItem item2 = new TestItem("key2", "item2", 11); + + private WriteThroughStore store; + + @After + public void teardown() throws Exception { + if (store != null) { + store.close(); + } + } + + private FullDataSet createTestDataSet() { + return new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, item1, item2) + .build(); + } + + private ChangeSet createFullChangeSet() { + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key1", new ItemDescriptor(10, item1)), + new AbstractMap.SimpleEntry<>("key2", new ItemDescriptor(11, item2)) + )) + ); + + return new ChangeSet<>( + ChangeSetType.Full, + Selector.make(1, "state1"), + changeSetData.entrySet(), + null + ); + } + + // Construction Tests + + @Test + public void constructorWithPersistenceSetsActiveReadStoreToPersistent() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + + ItemDescriptor result = store.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertEquals(10, result.getVersion()); + assertTrue(persistentStore.wasGetCalled); + } + + @Test + public void constructorWithoutPersistenceSetsActiveReadStoreToMemory() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + store = new WriteThroughStore(memoryStore, null, DataStoreMode.READ_WRITE); + + memoryStore.upsert(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + + ItemDescriptor result = store.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertEquals(10, result.getVersion()); + } + + // Init Tests + + @Test + public void initWithoutPersistenceInitializesMemoryStoreOnly() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + store = new WriteThroughStore(memoryStore, null, DataStoreMode.READ_WRITE); + + FullDataSet testData = createTestDataSet(); + store.init(testData); + + assertTrue(memoryStore.isInitialized()); + ItemDescriptor result = memoryStore.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertEquals(10, result.getVersion()); + } + + @Test + public void initWithPersistenceReadWriteInitializesBothStores() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + FullDataSet testData = createTestDataSet(); + store.init(testData); + + assertTrue(memoryStore.isInitialized()); + assertTrue(persistentStore.isInitialized()); + assertTrue(persistentStore.wasInitCalled); + } + + @Test + public void initWithPersistenceReadOnlyInitializesMemoryStoreOnly() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_ONLY); + + FullDataSet testData = createTestDataSet(); + store.init(testData); + + assertTrue(memoryStore.isInitialized()); + assertFalse(persistentStore.wasInitCalled); + } + + @Test + public void initSwitchesActiveReadStoreToMemory() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(5, new TestItem("key1", "old", 5))); + + FullDataSet testData = createTestDataSet(); + store.init(testData); + + ItemDescriptor result = store.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertEquals(10, result.getVersion()); + assertEquals(item1, result.getItem()); + } + + // Get/GetAll Tests + + @Test + public void getBeforeSwitchReadsFromPersistentStore() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + + ItemDescriptor result = store.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertTrue(persistentStore.wasGetCalled); + } + + @Test + public void getAfterSwitchReadsFromMemoryStore() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + FullDataSet testData = createTestDataSet(); + store.init(testData); + + persistentStore.resetCallTracking(); + + ItemDescriptor result = store.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertFalse(persistentStore.wasGetCalled); + } + + @Test + public void getAllAfterSwitchReadsFromMemoryStore() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + FullDataSet testData = createTestDataSet(); + store.init(testData); + + persistentStore.resetCallTracking(); + + KeyedItems result = store.getAll(TEST_ITEMS); + int count = 0; + for (@SuppressWarnings("unused") Map.Entry ignored : result.getItems()) { + count++; + } + assertEquals(2, count); + assertFalse(persistentStore.wasGetAllCalled); + } + + // Upsert Tests + + @Test + public void upsertWithoutPersistenceUpdatesMemoryStoreOnly() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + store = new WriteThroughStore(memoryStore, null, DataStoreMode.READ_WRITE); + + boolean result = store.upsert(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + assertTrue(result); + + ItemDescriptor retrieved = memoryStore.get(TEST_ITEMS, "key1"); + assertNotNull(retrieved); + assertEquals(10, retrieved.getVersion()); + } + + @Test + public void upsertWithPersistenceReadWriteUpdatesBothStores() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + boolean result = store.upsert(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + assertTrue(result); + assertTrue(persistentStore.wasUpsertCalled); + + ItemDescriptor retrieved = memoryStore.get(TEST_ITEMS, "key1"); + assertNotNull(retrieved); + } + + @Test + public void upsertWithPersistenceReadOnlyUpdatesMemoryStoreOnly() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_ONLY); + + boolean result = store.upsert(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + assertTrue(result); + assertFalse(persistentStore.wasUpsertCalled); + } + + @Test + public void upsertWhenPersistentStoreFailsReturnsFalse() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.failUpsert = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + boolean result = store.upsert(TEST_ITEMS, "key1", new ItemDescriptor(10, item1)); + assertFalse(result); + } + + // Apply Tests + + @Test + public void applyWithFullChangeSetAppliesToBothStores() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockTransactionalPersistentStore persistentStore = new MockTransactionalPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + ChangeSet changeSet = createFullChangeSet(); + store.apply(changeSet); + + assertTrue(memoryStore.isInitialized()); + assertTrue(persistentStore.wasApplyCalled); + } + + @Test + public void applyWithPartialChangeSetAppliesToBothStores() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockTransactionalPersistentStore persistentStore = new MockTransactionalPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.init(createTestDataSet()); + + TestItem item3 = new TestItem("key3", "item3", 30); + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key3", new ItemDescriptor(30, item3)) + )) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(2, "state2"), + changeSetData.entrySet(), + null + ); + + persistentStore.resetCallTracking(); + store.apply(changeSet); + + assertTrue(persistentStore.wasApplyCalled); + ItemDescriptor result = memoryStore.get(TEST_ITEMS, "key3"); + assertNotNull(result); + assertEquals(item3, result.getItem()); + } + + @Test + public void applyWithLegacyPersistentStoreFullChangeSetCallsInit() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + ChangeSet changeSet = createFullChangeSet(); + store.apply(changeSet); + + assertTrue(persistentStore.wasInitCalled); + } + + @Test + public void applyWithLegacyPersistentStorePartialChangeSetCallsUpsert() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.init(createTestDataSet()); + + TestItem item3 = new TestItem("key3", "item3", 30); + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key3", new ItemDescriptor(30, item3)) + )) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(2, "state2"), + changeSetData.entrySet(), + null + ); + + persistentStore.resetCallTracking(); + store.apply(changeSet); + + assertTrue(persistentStore.wasUpsertCalled); + } + + @Test + public void applySwitchesActiveReadStoreToMemory() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockTransactionalPersistentStore persistentStore = new MockTransactionalPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(5, new TestItem("key1", "old", 5))); + + ChangeSet changeSet = createFullChangeSet(); + store.apply(changeSet); + + ItemDescriptor result = store.get(TEST_ITEMS, "key1"); + assertNotNull(result); + assertEquals(10, result.getVersion()); + assertEquals(item1, result.getItem()); + + persistentStore.resetCallTracking(); + store.get(TEST_ITEMS, "key1"); + assertFalse(persistentStore.wasGetCalled); + } + + // Initialized Tests + + @Test + public void initializedWithPersistenceReturnsPersistentStoreStatus() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + assertFalse(store.isInitialized()); + + persistentStore.setInitialized(true); + + assertTrue(store.isInitialized()); + } + + @Test + public void initializedWithoutPersistenceReturnsMemoryStoreStatus() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + store = new WriteThroughStore(memoryStore, null, DataStoreMode.READ_WRITE); + + assertFalse(store.isInitialized()); + + memoryStore.init(createTestDataSet()); + + assertTrue(store.isInitialized()); + } + + // Store Switching Tests + + @Test + public void storeSwitchingHappensOnlyOnce() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(5, new TestItem("key1", "old", 5))); + + store.init(createTestDataSet()); + + ItemDescriptor result1 = store.get(TEST_ITEMS, "key1"); + assertNotNull(result1); + assertEquals(10, result1.getVersion()); + + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(20, new TestItem("key1", "newer", 20))); + + FullDataSet newData = new DataStoreTestTypes.DataBuilder() + .add(TEST_ITEMS, new TestItem("key1", "item1-v15", 15)) + .build(); + store.init(newData); + + ItemDescriptor result2 = store.get(TEST_ITEMS, "key1"); + assertNotNull(result2); + assertEquals(15, result2.getVersion()); + assertFalse(result2.getVersion() == 20); + } + + // Selector Tests + + @Test + public void selectorReturnsMemoryStoreSelector() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockTransactionalPersistentStore persistentStore = new MockTransactionalPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Full, + Selector.make(42, "test-state"), + ImmutableList.>>of(), + null + ); + + store.apply(changeSet); + + Selector selector = store.getSelector(); + assertEquals(42, selector.getVersion()); + assertEquals("test-state", selector.getState()); + } + + // StatusMonitoringEnabled Tests + + @Test + public void statusMonitoringEnabledWithPersistenceReturnsPersistentStoreValue() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.statusMonitoringEnabledValue = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + assertTrue(store.isStatusMonitoringEnabled()); + } + + @Test + public void statusMonitoringEnabledWithoutPersistenceReturnsFalse() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + store = new WriteThroughStore(memoryStore, null, DataStoreMode.READ_WRITE); + + assertFalse(store.isStatusMonitoringEnabled()); + } + + // Dispose Tests + + @Test + public void disposeDisposesBothStores() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.close(); + + assertTrue(persistentStore.wasDisposeCalled); + } + + @Test + public void disposeWithoutPersistenceDisposesMemoryStoreOnly() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + + store = new WriteThroughStore(memoryStore, null, DataStoreMode.READ_WRITE); + + store.close(); + // No exception means it worked + } + + // Error Handling Tests + + @Test + public void applyWithLegacyStorePartialChangeSetThrowsWhenUpsertFails() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.failUpsert = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.init(createTestDataSet()); + + TestItem item3 = new TestItem("key3", "item3", 30); + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key3", new ItemDescriptor(30, item3)) + )) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(2, "state2"), + changeSetData.entrySet(), + null + ); + + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + assertEquals("Failure to apply data set to persistent store.", e.getMessage()); + } + } + + @Test + public void applyWithLegacyStorePartialChangeSetThrowsWhenOneOfMultipleUpsertsFails() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.init(createTestDataSet()); + + persistentStore.setUpsertFailureForKey("key4"); + + TestItem item3 = new TestItem("key3", "item3", 30); + TestItem item4 = new TestItem("key4", "item4", 40); + TestItem item5 = new TestItem("key5", "item5", 50); + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key3", new ItemDescriptor(30, item3)), + new AbstractMap.SimpleEntry<>("key4", new ItemDescriptor(40, item4)), + new AbstractMap.SimpleEntry<>("key5", new ItemDescriptor(50, item5)) + )) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(2, "state2"), + changeSetData.entrySet(), + null + ); + + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + assertEquals("Failure to apply data set to persistent store.", e.getMessage()); + } + } + + @Test + public void applyWithLegacyStoreFullChangeSetPropagatesInitException() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.throwOnInit = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + ChangeSet changeSet = createFullChangeSet(); + + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + assertEquals("Init failed", e.getMessage()); + } + } + + @Test + public void applyWithTransactionalStorePropagatesApplyException() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockTransactionalPersistentStore persistentStore = new MockTransactionalPersistentStore(); + persistentStore.throwOnApply = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + ChangeSet changeSet = createFullChangeSet(); + + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + assertEquals("Apply failed", e.getMessage()); + } + } + + @Test + public void applyWithLegacyStorePartialChangeSetMemoryStoreStillUpdatedBeforeException() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.failUpsert = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.init(createTestDataSet()); + + TestItem item3 = new TestItem("key3", "item3", 30); + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key3", new ItemDescriptor(30, item3)) + )) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(2, "state2"), + changeSetData.entrySet(), + null + ); + + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + // Expected + } + + ItemDescriptor result = memoryStore.get(TEST_ITEMS, "key3"); + assertNotNull(result); + assertEquals(item3, result.getItem()); + } + + @Test + public void applyWithLegacyStoreNoneChangeSetDoesNotThrow() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.failUpsert = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + store.init(createTestDataSet()); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.None, + Selector.make(2, "state2"), + ImmutableList.>>of(), + null + ); + + store.apply(changeSet); + // No exception means it worked + } + + @Test + public void applySwitchesToMemoryStoreEvenWhenPersistentStoreApplyFails() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockPersistentStore persistentStore = new MockPersistentStore(); + persistentStore.failUpsert = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + // Set up persistent store with old data + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(5, new TestItem("key1", "old", 5))); + + // Before apply, reads should come from persistent store + ItemDescriptor resultBefore = store.get(TEST_ITEMS, "key1"); + assertNotNull(resultBefore); + assertEquals(5, resultBefore.getVersion()); + assertEquals(new TestItem("key1", "old", 5), resultBefore.getItem()); + + TestItem item3 = new TestItem("key3", "item3", 30); + Map> changeSetData = ImmutableMap.of( + TEST_ITEMS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("key3", new ItemDescriptor(30, item3)) + )) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(2, "state2"), + changeSetData.entrySet(), + null + ); + + // Apply should throw due to persistent store failure + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + // Expected + } + + // After apply, even though it failed for persistence, we should have switched to memory store + // Memory store should have the new data + ItemDescriptor resultAfter = store.get(TEST_ITEMS, "key3"); + assertNotNull(resultAfter); + assertEquals(item3, resultAfter.getItem()); + + // Verify we're reading from memory, not persistent store + persistentStore.resetCallTracking(); + store.get(TEST_ITEMS, "key3"); + assertFalse(persistentStore.wasGetCalled); + } + + @Test + public void applySwitchesToMemoryStoreEvenWhenTransactionalStoreApplyFails() throws Exception { + InMemoryDataStore memoryStore = new InMemoryDataStore(); + MockTransactionalPersistentStore persistentStore = new MockTransactionalPersistentStore(); + persistentStore.throwOnApply = true; + + store = new WriteThroughStore(memoryStore, persistentStore, DataStoreMode.READ_WRITE); + + // Set up persistent store with old data + persistentStore.setData(TEST_ITEMS, "key1", new ItemDescriptor(5, new TestItem("key1", "old", 5))); + + // Before apply, reads should come from persistent store + ItemDescriptor resultBefore = store.get(TEST_ITEMS, "key1"); + assertNotNull(resultBefore); + assertEquals(5, resultBefore.getVersion()); + + ChangeSet changeSet = createFullChangeSet(); + + // Apply should throw due to persistent store failure + try { + store.apply(changeSet); + fail("Expected RuntimeException"); + } catch (RuntimeException e) { + assertEquals("Apply failed", e.getMessage()); + } + + // After apply, even though it failed for persistence, we should have switched to memory store + // Memory store should have the new data + ItemDescriptor resultAfter = store.get(TEST_ITEMS, "key1"); + assertNotNull(resultAfter); + assertEquals(10, resultAfter.getVersion()); + assertEquals(item1, resultAfter.getItem()); + + // Verify we're reading from memory, not persistent store + persistentStore.resetCallTracking(); + store.get(TEST_ITEMS, "key1"); + assertFalse(persistentStore.wasGetCalled); + } + + // Mock Stores + + private static class MockPersistentStore implements DataStore { + private final Map> data = new HashMap<>(); + private final Set keysToFailOn = new HashSet<>(); + private boolean initialized; + + public boolean wasInitCalled; + public boolean wasGetCalled; + public boolean wasGetAllCalled; + public boolean wasUpsertCalled; + public boolean wasDisposeCalled; + public boolean failUpsert; + public boolean throwOnInit; + public boolean statusMonitoringEnabledValue; + + public void setUpsertFailureForKey(String key) { + keysToFailOn.add(key); + } + + public void resetCallTracking() { + wasInitCalled = false; + wasGetCalled = false; + wasGetAllCalled = false; + wasUpsertCalled = false; + wasDisposeCalled = false; + } + + public void setData(DataKind kind, String key, ItemDescriptor item) { + data.computeIfAbsent(kind, k -> new HashMap<>()).put(key, item); + } + + public void setInitialized(boolean value) { + initialized = value; + } + + @Override + public void init(FullDataSet allData) { + wasInitCalled = true; + if (throwOnInit) { + throw new RuntimeException("Init failed"); + } + + data.clear(); + for (Map.Entry> kindData : allData.getData()) { + Map itemsMap = new HashMap<>(); + data.put(kindData.getKey(), itemsMap); + for (Map.Entry item : kindData.getValue().getItems()) { + itemsMap.put(item.getKey(), item.getValue()); + } + } + + initialized = true; + } + + @Override + public ItemDescriptor get(DataKind kind, String key) { + wasGetCalled = true; + Map kindData = data.get(kind); + if (kindData != null) { + return kindData.get(key); + } + return null; + } + + @Override + public KeyedItems getAll(DataKind kind) { + wasGetAllCalled = true; + Map kindData = data.get(kind); + if (kindData != null) { + return new KeyedItems<>(kindData.entrySet()); + } + return new KeyedItems<>(ImmutableList.of()); + } + + @Override + public boolean upsert(DataKind kind, String key, ItemDescriptor item) { + wasUpsertCalled = true; + if (failUpsert || keysToFailOn.contains(key)) { + return false; + } + + Map itemsMap = data.computeIfAbsent(kind, k -> new HashMap<>()); + + ItemDescriptor existing = itemsMap.get(key); + if (existing != null) { + if (item.getVersion() <= existing.getVersion()) { + return false; + } + } + + itemsMap.put(key, item); + return true; + } + + @Override + public boolean isInitialized() { + return initialized; + } + + @Override + public boolean isStatusMonitoringEnabled() { + return statusMonitoringEnabledValue; + } + + @Override + public CacheStats getCacheStats() { + return null; + } + + @Override + public void close() throws IOException { + wasDisposeCalled = true; + } + } + + private static class MockTransactionalPersistentStore extends MockPersistentStore implements TransactionalDataStore { + public boolean wasApplyCalled; + public boolean throwOnApply; + + @Override + public void resetCallTracking() { + super.resetCallTracking(); + wasApplyCalled = false; + } + + @Override + public void apply(ChangeSet changeSet) { + wasApplyCalled = true; + if (throwOnApply) { + throw new RuntimeException("Apply failed"); + } + + switch (changeSet.getType()) { + case Full: + init(new FullDataSet<>(changeSet.getData())); + break; + case Partial: + for (Map.Entry> kindData : changeSet.getData()) { + for (Map.Entry item : kindData.getValue().getItems()) { + upsert(kindData.getKey(), item.getKey(), item.getValue()); + } + } + break; + case None: + break; + } + } + + @Override + public Selector getSelector() { + return Selector.EMPTY; + } + } +}