-
Notifications
You must be signed in to change notification settings - Fork 11
chore: Connect FDv2 data system. #108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
19d9c02
fbea872
adcaa0e
f2b209d
8c115cc
de2fded
98d3b39
da3c639
8fb88ed
da27015
aba46ef
7401331
bba0cdc
89bd017
228f3e6
9469b23
9a450e6
4b8313b
31eb13e
4a2fe3b
3428591
ff60216
e985f80
a956484
ff2376e
376bb1f
194c30c
707fe0e
cb79f5e
6702239
0aba424
91d2cb9
d610988
49c6008
b429eba
278f670
84be62d
ec609a5
3461149
b32d3ca
97ac7c4
7c84a68
9c43cbb
a383fc9
9c1c4c4
dab1ef1
68c490f
7bb6394
ae39996
02c4c98
d4dff45
776b4c6
d9dc5f0
20ec764
47e30b9
dd1146b
38f90c2
ee82077
83b4217
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.sdk.server.datasources.Initializer; | ||
| import com.launchdarkly.sdk.server.datasources.Synchronizer; | ||
| import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder; | ||
| import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder; | ||
| import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder; | ||
| import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder; | ||
| import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs; | ||
|
|
||
| import java.net.URI; | ||
|
|
||
| import static com.launchdarkly.sdk.server.ComponentsImpl.toHttpProperties; | ||
|
|
||
| /** | ||
| * Components for use with the data system. | ||
| * <p> | ||
| * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. | ||
| * It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode | ||
| * </p> | ||
| */ | ||
| public final class DataSystemComponents { | ||
|
|
||
| static class FDv2PollingInitializerBuilderImpl extends FDv2PollingInitializerBuilder { | ||
| @Override | ||
| public Initializer build(DataSourceBuildInputs context) { | ||
| ServiceEndpoints endpoints = serviceEndpointsOverride != null | ||
| ? serviceEndpointsOverride | ||
| : context.getServiceEndpoints(); | ||
| URI configuredBaseUri = StandardEndpoints.selectBaseUri( | ||
| endpoints.getPollingBaseUri(), | ||
| StandardEndpoints.DEFAULT_POLLING_BASE_URI, | ||
| "Polling", | ||
| context.getBaseLogger()); | ||
|
|
||
| DefaultFDv2Requestor requestor = new DefaultFDv2Requestor( | ||
| toHttpProperties(context.getHttp()), | ||
| configuredBaseUri, | ||
| StandardEndpoints.FDV2_POLLING_REQUEST_PATH, | ||
| context.getBaseLogger()); | ||
|
|
||
| return new PollingInitializerImpl( | ||
| requestor, | ||
| context.getBaseLogger(), | ||
| context.getSelectorSource() | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| static class FDv2PollingSynchronizerBuilderImpl extends FDv2PollingSynchronizerBuilder { | ||
| @Override | ||
| public Synchronizer build(DataSourceBuildInputs context) { | ||
| ServiceEndpoints endpoints = serviceEndpointsOverride != null | ||
| ? serviceEndpointsOverride | ||
| : context.getServiceEndpoints(); | ||
| URI configuredBaseUri = StandardEndpoints.selectBaseUri( | ||
| endpoints.getPollingBaseUri(), | ||
| StandardEndpoints.DEFAULT_POLLING_BASE_URI, | ||
| "Polling", | ||
| context.getBaseLogger()); | ||
|
|
||
| DefaultFDv2Requestor requestor = new DefaultFDv2Requestor( | ||
| toHttpProperties(context.getHttp()), | ||
| configuredBaseUri, | ||
| StandardEndpoints.FDV2_POLLING_REQUEST_PATH, | ||
| context.getBaseLogger()); | ||
|
|
||
| return new PollingSynchronizerImpl( | ||
| requestor, | ||
| context.getBaseLogger(), | ||
| context.getSelectorSource(), | ||
| context.getSharedExecutor(), | ||
| pollInterval | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| static class FDv2StreamingSynchronizerBuilderImpl extends FDv2StreamingSynchronizerBuilder { | ||
| @Override | ||
| public Synchronizer build(DataSourceBuildInputs context) { | ||
| ServiceEndpoints endpoints = serviceEndpointsOverride != null | ||
| ? serviceEndpointsOverride | ||
| : context.getServiceEndpoints(); | ||
| URI configuredBaseUri = StandardEndpoints.selectBaseUri( | ||
| endpoints.getStreamingBaseUri(), | ||
| StandardEndpoints.DEFAULT_STREAMING_BASE_URI, | ||
| "Streaming", | ||
| context.getBaseLogger()); | ||
|
|
||
| return new StreamingSynchronizerImpl( | ||
| toHttpProperties(context.getHttp()), | ||
| configuredBaseUri, | ||
| StandardEndpoints.FDV2_STREAMING_REQUEST_PATH, | ||
| context.getBaseLogger(), | ||
| context.getSelectorSource(), | ||
| null, | ||
| initialReconnectDelay | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| private DataSystemComponents() {} | ||
|
|
||
| /** | ||
| * Get a builder for a polling initializer. | ||
| * | ||
| * @return the polling initializer builder | ||
| */ | ||
| public static FDv2PollingInitializerBuilder pollingInitializer() { | ||
| return new FDv2PollingInitializerBuilderImpl(); | ||
| } | ||
|
|
||
| /** | ||
| * Get a builder for a polling synchronizer. | ||
| * | ||
| * @return the polling synchronizer builder | ||
| */ | ||
| public static FDv2PollingSynchronizerBuilder pollingSynchronizer() { | ||
| return new FDv2PollingSynchronizerBuilderImpl(); | ||
| } | ||
|
|
||
| /** | ||
| * Get a builder for a streaming synchronizer. | ||
| * | ||
| * @return the streaming synchronizer builder | ||
| */ | ||
| public static FDv2StreamingSynchronizerBuilder streamingSynchronizer() { | ||
| return new FDv2StreamingSynchronizerBuilderImpl(); | ||
| } | ||
|
|
||
| /** | ||
| * Get a builder for a FDv1 compatible polling data source. | ||
| * <p> | ||
| * This is intended for use as a fallback. | ||
| * </p> | ||
| * | ||
| * @return the FDv1 compatible polling data source builder | ||
| */ | ||
| public static PollingDataSourceBuilder fDv1Polling() { | ||
| return Components.pollingDataSource(); | ||
| } | ||
| } | ||
|
|
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a mistake in the implementation. So both polling and streaming have this update. |
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still lots of work to do here. Outside the scope of this PR.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you intend to add tests in the subsequent PR? |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,12 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; | ||
| import com.launchdarkly.sdk.server.datasources.Initializer; | ||
| import com.launchdarkly.sdk.server.datasources.Synchronizer; | ||
| import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSource; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
|
|
@@ -18,10 +19,10 @@ | |
| import java.util.stream.Collectors; | ||
|
|
||
| class FDv2DataSource implements DataSource { | ||
| private final List<InitializerFactory> initializers; | ||
| private final List<DataSourceFactory<Initializer>> initializers; | ||
| private final List<SynchronizerFactoryWithState> synchronizers; | ||
|
|
||
| private final DataSourceUpdateSink dataSourceUpdates; | ||
| private final DataSourceUpdateSinkV2 dataSourceUpdates; | ||
|
|
||
| private final CompletableFuture<Boolean> startFuture = new CompletableFuture<>(); | ||
| private final AtomicBoolean started = new AtomicBoolean(false); | ||
|
|
@@ -46,12 +47,12 @@ public enum State { | |
| Blocked | ||
| } | ||
|
|
||
| private final SynchronizerFactory factory; | ||
| private final DataSourceFactory<Synchronizer> factory; | ||
|
|
||
| private State state = State.Available; | ||
|
|
||
|
|
||
| public SynchronizerFactoryWithState(SynchronizerFactory factory) { | ||
| public SynchronizerFactoryWithState(DataSourceFactory<Synchronizer> factory) { | ||
| this.factory = factory; | ||
| } | ||
|
|
||
|
|
@@ -68,19 +69,15 @@ public Synchronizer build() { | |
| } | ||
| } | ||
|
|
||
| public interface InitializerFactory { | ||
| Initializer build(); | ||
| } | ||
|
|
||
| public interface SynchronizerFactory { | ||
| Synchronizer build(); | ||
| public interface DataSourceFactory<T> { | ||
| T build(); | ||
| } | ||
|
|
||
|
|
||
| public FDv2DataSource( | ||
| List<InitializerFactory> initializers, | ||
| List<SynchronizerFactory> synchronizers, | ||
| DataSourceUpdateSink dataSourceUpdates | ||
| ImmutableList<DataSourceFactory<Initializer>> initializers, | ||
| ImmutableList<DataSourceFactory<Synchronizer>> synchronizers, | ||
| DataSourceUpdateSinkV2 dataSourceUpdates | ||
| ) { | ||
| this.initializers = initializers; | ||
| this.synchronizers = synchronizers | ||
|
|
@@ -116,6 +113,40 @@ private SynchronizerFactoryWithState getFirstAvailableSynchronizer() { | |
| } | ||
| } | ||
|
|
||
| private void runInitializers() { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this for better logical organization. |
||
| boolean anyDataReceived = false; | ||
| for (DataSourceFactory<Initializer> factory : initializers) { | ||
| try { | ||
| Initializer initializer = factory.build(); | ||
| if (setActiveSource(initializer)) return; | ||
| FDv2SourceResult result = initializer.run().get(); | ||
| switch (result.getResultType()) { | ||
| case CHANGE_SET: | ||
| dataSourceUpdates.apply(result.getChangeSet()); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actual change to the method. Apply the data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignored return value of apply causes incorrect initializationMedium Severity The return value of Additional Locations (1)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am going to ignore this for now, until we work on other FDv2DataSource updates. |
||
| anyDataReceived = true; | ||
| if (!result.getChangeSet().getSelector().isEmpty()) { | ||
| // We received data with a selector, so we end the initialization process. | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| return; | ||
| } | ||
| break; | ||
| case STATUS: | ||
| // TODO: Implement. | ||
| break; | ||
| } | ||
| } catch (ExecutionException | InterruptedException | CancellationException e) { | ||
| // TODO: Log. | ||
| } | ||
| } | ||
| // We received data without a selector, and we have exhausted initializers, so we are going to | ||
| // consider ourselves initialized. | ||
| if (anyDataReceived) { | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| } | ||
| } | ||
|
|
||
| private void runSynchronizers() { | ||
| SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer(); | ||
| // TODO: Add recovery handling. If there are no available synchronizers, but there are | ||
|
|
@@ -130,7 +161,7 @@ private void runSynchronizers() { | |
| FDv2SourceResult result = synchronizer.next().get(); | ||
| switch (result.getResultType()) { | ||
| case CHANGE_SET: | ||
| // TODO: Apply to the store. | ||
| dataSourceUpdates.apply(result.getChangeSet()); | ||
| // This could have been completed by any data source. But if it has not been completed before | ||
| // now, then we complete it. | ||
| startFuture.complete(true); | ||
|
|
@@ -186,40 +217,6 @@ private boolean setActiveSource(Closeable synchronizer) { | |
| return false; | ||
| } | ||
|
|
||
| private void runInitializers() { | ||
| boolean anyDataReceived = false; | ||
| for (InitializerFactory factory : initializers) { | ||
| try { | ||
| Initializer initializer = factory.build(); | ||
| if (setActiveSource(initializer)) return; | ||
| FDv2SourceResult res = initializer.run().get(); | ||
| switch (res.getResultType()) { | ||
| case CHANGE_SET: | ||
| // TODO: Apply to the store. | ||
| anyDataReceived = true; | ||
| if (!res.getChangeSet().getSelector().isEmpty()) { | ||
| // We received data with a selector, so we end the initialization process. | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| return; | ||
| } | ||
| break; | ||
| case STATUS: | ||
| // TODO: Implement. | ||
| break; | ||
| } | ||
| } catch (ExecutionException | InterruptedException | CancellationException e) { | ||
| // TODO: Log. | ||
| } | ||
| } | ||
| // We received data without a selector, and we have exhausted initializers, so we are going to | ||
| // consider ourselves initialized. | ||
| if (anyDataReceived) { | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Future<Void> start() { | ||
| if (!started.getAndSet(true)) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is a move because of package boundaries. Not sure how we want them to shake out.
The HttpConfig -> HttpProperties I think was the missing component, and moving that around created further complications.