Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.launchdarkly.sdk.server.subsystems.DataStore;
import com.launchdarkly.sdk.server.subsystems.LoggingConfiguration;
import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration;
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -91,7 +92,26 @@ static FDv2DataSystem create(
DataStoreUpdatesImpl dataStoreUpdates = new DataStoreUpdatesImpl(
EventBroadcasterImpl.forDataStoreStatus(clientContext.sharedExecutor, logger));

InMemoryDataStore store = new InMemoryDataStore();
DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build();

InMemoryDataStore memoryStore = new InMemoryDataStore();

DataStore persistentStore = null;
if (dataSystemConfiguration.getPersistentStore() != null) {
persistentStore = dataSystemConfiguration.getPersistentStore().build(clientContext.withDataStoreUpdateSink(dataStoreUpdates));

// Configure persistent store to sync from memory store during recovery (ReadWrite mode only)
if (persistentStore != null && dataSystemConfiguration.getPersistentDataStoreMode() == DataSystemConfiguration.DataStoreMode.READ_WRITE) {
if (persistentStore instanceof SettableCache) {
((SettableCache) persistentStore).setCacheExporter(memoryStore);
}
}
}

WriteThroughStore store = new WriteThroughStore(
memoryStore,
persistentStore,
dataSystemConfiguration.getPersistentDataStoreMode());

DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(store, dataStoreUpdates);

Expand All @@ -113,7 +133,6 @@ static FDv2DataSystem create(
logger
);

DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build();
SelectorSource selectorSource = new SelectorSourceFacade(store);

DataSourceBuildInputs builderContext = new DataSourceBuildInputs(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.launchdarkly.sdk.server;

import com.google.common.collect.ImmutableList;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.SerializedItemDescriptor;

import java.util.AbstractMap;
import java.util.Map;

/**
* Utility for converting between in-memory and serialized persistent data store formats.
*/
final class PersistentDataStoreConverter {
private PersistentDataStoreConverter() {
// Utility class - prevent instantiation
}

/**
* Converts a FullDataSet of ItemDescriptor to SerializedItemDescriptor format.
*
* @param inMemoryData the in-memory data to convert
* @return a FullDataSet in serialized format suitable for persistent stores
*/
static FullDataSet<SerializedItemDescriptor> toSerializedFormat(
FullDataSet<ItemDescriptor> inMemoryData) {
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<SerializedItemDescriptor>>> builder =
ImmutableList.builder();

for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry : inMemoryData.getData()) {
DataKind kind = kindEntry.getKey();
KeyedItems<ItemDescriptor> items = kindEntry.getValue();

builder.add(new AbstractMap.SimpleEntry<>(
kind,
serializeAll(kind, items)
));
}

return new FullDataSet<>(builder.build());
}

/**
* Serializes a single item descriptor.
*
* @param kind the data kind
* @param itemDesc the item descriptor to serialize
* @return a serialized item descriptor
*/
static SerializedItemDescriptor serialize(DataKind kind, ItemDescriptor itemDesc) {
boolean isDeleted = itemDesc.getItem() == null;
return new SerializedItemDescriptor(itemDesc.getVersion(), isDeleted, kind.serialize(itemDesc));
}

/**
* Serializes all items of a given DataKind from a KeyedItems collection.
*
* @param kind the data kind
* @param items the items to serialize
* @return keyed items in serialized format
*/
static KeyedItems<SerializedItemDescriptor> serializeAll(
DataKind kind,
KeyedItems<ItemDescriptor> items) {
ImmutableList.Builder<Map.Entry<String, SerializedItemDescriptor>> itemsBuilder =
ImmutableList.builder();
for (Map.Entry<String, ItemDescriptor> e : items.getItems()) {
itemsBuilder.add(new AbstractMap.SimpleEntry<>(e.getKey(), serialize(kind, e.getValue())));
}
return new KeyedItems<>(itemsBuilder.build());
}

/**
* Deserializes a single item descriptor.
*
* @param kind the data kind
* @param serializedItemDesc the serialized item descriptor
* @return a deserialized item descriptor
*/
static ItemDescriptor deserialize(DataKind kind, SerializedItemDescriptor serializedItemDesc) {
if (serializedItemDesc.isDeleted() || serializedItemDesc.getSerializedItem() == null) {
return ItemDescriptor.deletedItem(serializedItemDesc.getVersion());
}
ItemDescriptor deserializedItem = kind.deserialize(serializedItemDesc.getSerializedItem());
if (serializedItemDesc.getVersion() == 0 ||
serializedItemDesc.getVersion() == deserializedItem.getVersion() ||
deserializedItem.getItem() == null) {
return deserializedItem;
}
// If the store gave us a version number that isn't what was encoded in the object, trust it
return new ItemDescriptor(serializedItemDesc.getVersion(), deserializedItem.getItem());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* <p>
* This class is only constructed by {@link PersistentDataStoreBuilder}.
*/
final class PersistentDataStoreWrapper implements DataStore {
final class PersistentDataStoreWrapper implements DataStore, SettableCache {
private final PersistentDataStore core;
private final LoadingCache<CacheKey, Optional<ItemDescriptor>> itemCache;
private final LoadingCache<DataKind, KeyedItems<ItemDescriptor>> allCache;
Expand All @@ -55,6 +55,9 @@ final class PersistentDataStoreWrapper implements DataStore {
private final ListeningExecutorService cacheExecutor;
private final LDLogger logger;

private final Object externalStoreLock = new Object();
private volatile CacheExporter externalCache;

PersistentDataStoreWrapper(
final PersistentDataStore core,
Duration cacheTtl,
Expand Down Expand Up @@ -180,7 +183,7 @@ public void init(FullDataSet<ItemDescriptor> allData) {
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<SerializedItemDescriptor>>> allBuilder = ImmutableList.builder();
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> e0: allData.getData()) {
DataKind kind = e0.getKey();
KeyedItems<SerializedItemDescriptor> items = serializeAll(kind, e0.getValue());
KeyedItems<SerializedItemDescriptor> items = PersistentDataStoreConverter.serializeAll(kind, e0.getValue());
allBuilder.add(new AbstractMap.SimpleEntry<>(kind, items));
}
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build()));
Expand Down Expand Up @@ -260,7 +263,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
synchronized (cachedDataKinds) {
cachedDataKinds.add(kind);
}
SerializedItemDescriptor serializedItem = serialize(kind, item);
SerializedItemDescriptor serializedItem = PersistentDataStoreConverter.serialize(kind, item);
boolean updated = false;
RuntimeException failure = null;
try {
Expand Down Expand Up @@ -317,6 +320,24 @@ public boolean isStatusMonitoringEnabled() {
return true;
}

/**
* Sets an external data source for recovery synchronization.
* <p>
* This should be called during initialization if the wrapper is being used
* in a write-through architecture where an external store maintains authoritative data.
* <p>
* When we remove FDv1 support, we should remove this functionality and instead handle it at a higher
* layer.
*
* @param externalDataSource The external data source to sync from during recovery
*/
@Override
public void setCacheExporter(CacheExporter externalDataSource) {
synchronized (externalStoreLock) {
externalCache = externalDataSource;
}
}

@Override
public CacheStats getCacheStats() {
if (itemCache == null || allCache == null) {
Expand All @@ -335,7 +356,7 @@ public CacheStats getCacheStats() {

private ItemDescriptor getAndDeserializeItem(DataKind kind, String key) {
SerializedItemDescriptor maybeSerializedItem = core.get(kind, key);
return maybeSerializedItem == null ? null : deserialize(kind, maybeSerializedItem);
return maybeSerializedItem == null ? null : PersistentDataStoreConverter.deserialize(kind, maybeSerializedItem);
}

private KeyedItems<ItemDescriptor> getAllAndDeserialize(DataKind kind) {
Expand All @@ -345,36 +366,11 @@ private KeyedItems<ItemDescriptor> getAllAndDeserialize(DataKind kind) {
}
ImmutableList.Builder<Map.Entry<String, ItemDescriptor>> b = ImmutableList.builder();
for (Map.Entry<String, SerializedItemDescriptor> e: allItems.getItems()) {
b.add(new AbstractMap.SimpleEntry<>(e.getKey(), deserialize(kind, e.getValue())));
b.add(new AbstractMap.SimpleEntry<>(e.getKey(), PersistentDataStoreConverter.deserialize(kind, e.getValue())));
}
return new KeyedItems<>(b.build());
}

private SerializedItemDescriptor serialize(DataKind kind, ItemDescriptor itemDesc) {
boolean isDeleted = itemDesc.getItem() == null;
return new SerializedItemDescriptor(itemDesc.getVersion(), isDeleted, kind.serialize(itemDesc));
}

private KeyedItems<SerializedItemDescriptor> serializeAll(DataKind kind, KeyedItems<ItemDescriptor> items) {
ImmutableList.Builder<Map.Entry<String, SerializedItemDescriptor>> itemsBuilder = ImmutableList.builder();
for (Map.Entry<String, ItemDescriptor> e: items.getItems()) {
itemsBuilder.add(new AbstractMap.SimpleEntry<>(e.getKey(), serialize(kind, e.getValue())));
}
return new KeyedItems<>(itemsBuilder.build());
}

private ItemDescriptor deserialize(DataKind kind, SerializedItemDescriptor serializedItemDesc) {
if (serializedItemDesc.isDeleted() || serializedItemDesc.getSerializedItem() == null) {
return ItemDescriptor.deletedItem(serializedItemDesc.getVersion());
}
ItemDescriptor deserializedItem = kind.deserialize(serializedItemDesc.getSerializedItem());
if (serializedItemDesc.getVersion() == 0 || serializedItemDesc.getVersion() == deserializedItem.getVersion()
|| deserializedItem.getItem() == null) {
return deserializedItem;
}
// If the store gave us a version number that isn't what was encoded in the object, trust it
return new ItemDescriptor(serializedItemDesc.getVersion(), deserializedItem.getItem());
}

private KeyedItems<ItemDescriptor> updateSingleItem(KeyedItems<ItemDescriptor> items, String key, ItemDescriptor item) {
// This is somewhat inefficient but it's preferable to use immutable data structures in the cache.
Expand All @@ -401,7 +397,47 @@ private boolean pollAvailabilityAfterOutage() {
if (!core.isStoreAvailable()) {
return false;
}


CacheExporter externalCacheSnapshot;
synchronized (externalStoreLock) {
externalCacheSnapshot = externalCache;
}

// If we have an external data source (e.g., WriteThroughStore's memory store) that is initialized,
// use that as the authoritative source. Otherwise, fall back to our internal cache if it's configured
// to cache indefinitely.
if (externalCacheSnapshot != null) {
if (externalCacheSnapshot.isInitialized()) {
try {
FullDataSet<ItemDescriptor> externalData = externalCacheSnapshot.exportAll();
FullDataSet<SerializedItemDescriptor> serializedData =
PersistentDataStoreConverter.toSerializedFormat(externalData);
RuntimeException e = initCore(serializedData);

if (e == null) {
logger.warn("Successfully updated persistent store from external data source");
} else {
// We failed to write the data to the underlying store. In this case, we should not
// return to a recovered state, but just try this all again next time the poll task runs.
logger.error("Tried to write external data to persistent store after outage, but failed: {}",
LogValues.exceptionSummary(e));
logger.debug(LogValues.exceptionTrace(e));
return false;
}
} catch (Exception ex) {
// If we can't export from the external source, don't recover yet
logger.error("Failed to export data from external source during persistent store recovery: {}",
LogValues.exceptionSummary(ex));
logger.debug(LogValues.exceptionTrace(ex));
return false;
}

return true;
}
}

// Fall back to cache-based recovery if external store is not available/initialized
// and we're in infinite cache mode
if (cacheIndefinitely && allCache != null) {
// If we're in infinite cache mode, then we can assume the cache has a full set of current
// flag data (since presumably the data source has still been running) and we can just
Expand All @@ -414,7 +450,7 @@ private boolean pollAvailabilityAfterOutage() {
for (DataKind kind: allKinds) {
KeyedItems<ItemDescriptor> items = allCache.getIfPresent(kind);
if (items != null) {
builder.add(new AbstractMap.SimpleEntry<>(kind, serializeAll(kind, items)));
builder.add(new AbstractMap.SimpleEntry<>(kind, PersistentDataStoreConverter.serializeAll(kind, items)));
}
}
RuntimeException e = initCore(new FullDataSet<>(builder.build()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.launchdarkly.sdk.server;

/**
* Optional interface for data stores that can accept a cache exporter
* for recovery synchronization.
* <p>
* This interface is used in write-through architectures where a persistent store
* may fail temporarily. When the persistent store recovers, it can sync data from
* an external authoritative source (like an in-memory store) rather than relying
* solely on its internal cache.
* <p>
* In the long-term, internal caching should be removed from store implementations and managed centrally.
* <p>
* This is currently for internal implementations only.
*/
interface SettableCache {
/**
* Sets an external cache exporter for recovery synchronization.
* <p>
* This should be called during initialization if the data store is being used
* in a write-through architecture where an external store maintains authoritative data.
* When the persistent store recovers from an outage, it will export data from this
* external source and write it to the underlying persistent storage.
*
* @param externalDataSource an external cache to sync from during recovery
*/
void setCacheExporter(CacheExporter externalDataSource);
}
Loading