diff --git a/launchdarkly-server-sdk.gemspec b/launchdarkly-server-sdk.gemspec index 2f534b64..daae12d4 100644 --- a/launchdarkly-server-sdk.gemspec +++ b/launchdarkly-server-sdk.gemspec @@ -43,7 +43,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "benchmark", "~> 0.1", ">= 0.1.1" spec.add_runtime_dependency "concurrent-ruby", "~> 1.1" - spec.add_runtime_dependency "ld-eventsource", "2.2.6" + spec.add_runtime_dependency "ld-eventsource", "2.4.0" spec.add_runtime_dependency "observer", "~> 0.1.2" spec.add_runtime_dependency "openssl", ">= 3.1.2", "< 5.0" spec.add_runtime_dependency "semantic", "~> 1.6" diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index 8eaee8f7..f72d9e04 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -8,6 +8,7 @@ module LaunchDarkly # Public APIs - these define the main interfaces users interact with require "ldclient-rb/config" require "ldclient-rb/context" +require "ldclient-rb/data_system" require "ldclient-rb/flags_state" require "ldclient-rb/integrations" require "ldclient-rb/interfaces" diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index 8c8727b6..ca6afa5d 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -45,7 +45,7 @@ class Config # @option opts [Hash] :application See {#application} # @option opts [String] :payload_filter_key See {#payload_filter_key} # @option opts [Boolean] :omit_anonymous_contexts See {#omit_anonymous_contexts} - # @option opts [DataSystemConfig] :datasystem_config See {#datasystem_config} + # @option opts [DataSystemConfig] :data_system_config See {#data_system_config} # @option hooks [Array e + @logger.info { "[LDClient] Error parsing stream event; will restart stream: #{e}" } + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA, + 0, + e.to_s, + Time.now + ), + environment_id: envid + ) + + # Re-raise the exception so the SSE implementation can catch it and restart the stream. + raise + rescue => e + @logger.info { "[LDClient] Error while handling stream event; will restart stream: #{e}" } + yield LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, + 0, + e.to_s, + Time.now + ), + environment_id: envid + ) + + # Re-raise the exception so the SSE implementation can catch it and restart the stream. + raise + end + end + + client.on_error do |error| + log_connection_result(false) + fallback = false + + # Extract envid and fallback from error headers if available + if error.respond_to?(:headers) && error.headers + envid_from_error = error.headers[LD_ENVID_HEADER] + envid = envid_from_error if envid_from_error + + if error.headers[LD_FD_FALLBACK_HEADER] == 'true' + fallback = true + end + end + + update = handle_error(error, envid, fallback) + yield update if update + end + + client.query_params do + selector = ss.selector + { + "filter" => @config.payload_filter_key, + "basis" => (selector.state if selector&.defined?), + }.compact + end + end + + unless @sse + @logger.error { "[LDClient] Failed to create SSE client for streaming updates" } + return + end + + # Client auto-starts in background thread. Wait here until stop() is called. + @stopped.wait + end + + # + # Stops the streaming synchronizer. + # + def stop + @logger.info { "[LDClient] Stopping StreamingDataSourceV2 synchronizer" } + @sse&.close + @stopped.set + end + + # + # Processes a single SSE message and returns an Update if applicable. + # + # @param message [SSE::StreamEvent] + # @param change_set_builder [LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder] + # @param envid [String, nil] + # @return [LaunchDarkly::Interfaces::DataSystem::Update, nil] + # + private def process_message(message, change_set_builder, envid) + event_type = message.type + + # Handle heartbeat + if event_type == LaunchDarkly::Interfaces::DataSystem::EventName::HEARTBEAT + return nil + end + + @logger.debug { "[LDClient] Stream received #{event_type} message: #{message.data}" } + + case event_type + when LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.from_h(JSON.parse(message.data, symbolize_names: true)) + change_set_builder.start(server_intent.payload.code) + + if server_intent.payload.code == LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE + change_set_builder.expect_changes + return LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::VALID, + environment_id: envid + ) + end + nil + + when LaunchDarkly::Interfaces::DataSystem::EventName::PUT_OBJECT + put = LaunchDarkly::Impl::DataSystem::ProtocolV2::PutObject.from_h(JSON.parse(message.data, symbolize_names: true)) + change_set_builder.add_put(put.kind, put.key, put.version, put.object) + nil + + when LaunchDarkly::Interfaces::DataSystem::EventName::DELETE_OBJECT + delete_object = LaunchDarkly::Impl::DataSystem::ProtocolV2::DeleteObject.from_h(JSON.parse(message.data, symbolize_names: true)) + change_set_builder.add_delete(delete_object.kind, delete_object.key, delete_object.version) + nil + + when LaunchDarkly::Interfaces::DataSystem::EventName::GOODBYE + goodbye = LaunchDarkly::Impl::DataSystem::ProtocolV2::Goodbye.from_h(JSON.parse(message.data, symbolize_names: true)) + unless goodbye.silent + @logger.error { "[LDClient] SSE server received error: #{goodbye.reason} (catastrophe: #{goodbye.catastrophe})" } + end + nil + + when LaunchDarkly::Interfaces::DataSystem::EventName::ERROR + error = LaunchDarkly::Impl::DataSystem::ProtocolV2::Error.from_h(JSON.parse(message.data, symbolize_names: true)) + @logger.error { "[LDClient] Error on #{error.payload_id}: #{error.reason}" } + + # Reset any previous change events but continue with last server intent + change_set_builder.reset + nil + + when LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED + selector = LaunchDarkly::Interfaces::DataSystem::Selector.from_h(JSON.parse(message.data, symbolize_names: true)) + change_set = change_set_builder.finish(selector) + + LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::VALID, + change_set: change_set, + environment_id: envid + ) + + else + @logger.info { "[LDClient] Unexpected event found in stream: #{event_type}" } + nil + end + end + + # + # Handles errors that occur during streaming. + # + # @param error [Exception] + # @param envid [String, nil] + # @param fallback [Boolean] + # @return [LaunchDarkly::Interfaces::DataSystem::Update, nil] + # + private def handle_error(error, envid, fallback) + return nil if @stopped.set? + + update = nil + + case error + when SSE::Errors::HTTPStatusError + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, + error.status, + Impl::Util.http_error_message(error.status, "stream connection", "will retry"), + Time.now + ) + + if fallback + update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::OFF, + error: error_info, + revert_to_fdv1: true, + environment_id: envid + ) + stop + return update + end + + is_recoverable = Impl::Util.http_error_recoverable?(error.status) + + update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: is_recoverable ? LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED : LaunchDarkly::Interfaces::DataSource::Status::OFF, + error: error_info, + environment_id: envid + ) + + unless is_recoverable + @logger.error { "[LDClient] #{error_info.message}" } + stop + return update + end + + @logger.warn { "[LDClient] #{error_info.message}" } + + when SSE::Errors::HTTPContentTypeError, SSE::Errors::HTTPProxyError, SSE::Errors::ReadTimeoutError + @logger.warn { "[LDClient] Network error on stream connection: #{error}, will retry" } + + update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR, + 0, + error.to_s, + Time.now + ), + environment_id: envid + ) + + else + @logger.warn { "[LDClient] Unexpected error on stream connection: #{error}, will retry" } + + update = LaunchDarkly::Interfaces::DataSystem::Update.new( + state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, + 0, + error.to_s, + Time.now + ), + environment_id: envid + ) + end + + update + end + + private def log_connection_started + @connection_attempt_start_time = Impl::Util.current_time_millis + end + + private def log_connection_result(is_success) + return unless @diagnostic_accumulator + return unless @connection_attempt_start_time > 0 + + current_time = Impl::Util.current_time_millis + elapsed = current_time - @connection_attempt_start_time + @diagnostic_accumulator.record_stream_init(@connection_attempt_start_time, !is_success, elapsed >= 0 ? elapsed : 0) + @connection_attempt_start_time = 0 + end + end + + # + # Builder for a StreamingDataSource. + # + class StreamingDataSourceBuilder + # + # @param sdk_key [String] + # @param config [LaunchDarkly::Config] + # + def initialize(sdk_key, config) + @sdk_key = sdk_key + @config = config + end + + # + # Builds the StreamingDataSource with the configured parameters. + # + # @return [StreamingDataSource] + # + def build + StreamingDataSource.new(@sdk_key, @config) + end + end + end + end +end diff --git a/lib/ldclient-rb/integrations/test_data_v2.rb b/lib/ldclient-rb/integrations/test_data_v2.rb index 267c6359..096480c4 100644 --- a/lib/ldclient-rb/integrations/test_data_v2.rb +++ b/lib/ldclient-rb/integrations/test_data_v2.rb @@ -32,7 +32,7 @@ module Integrations # # # config = LaunchDarkly::Config.new( # # sdk_key, - # # datasystem_config: data_config.build + # # data_system_config: data_config.build # # ) # # # flags can be updated at any time: diff --git a/lib/ldclient-rb/interfaces/data_system.rb b/lib/ldclient-rb/interfaces/data_system.rb index 00ccba25..bd46538f 100644 --- a/lib/ldclient-rb/interfaces/data_system.rb +++ b/lib/ldclient-rb/interfaces/data_system.rb @@ -14,25 +14,25 @@ module DataSystem # module EventName # Specifies that an object should be added to the data set with upsert semantics. - PUT_OBJECT = "put-object" + PUT_OBJECT = :"put-object" # Specifies that an object should be removed from the data set. - DELETE_OBJECT = "delete-object" + DELETE_OBJECT = :"delete-object" # Specifies the server's intent. - SERVER_INTENT = "server-intent" + SERVER_INTENT = :"server-intent" # Specifies that all data required to bring the existing data set to a new version has been transferred. - PAYLOAD_TRANSFERRED = "payload-transferred" + PAYLOAD_TRANSFERRED = :"payload-transferred" # Keeps the connection alive. - HEARTBEAT = "heart-beat" + HEARTBEAT = :"heart-beat" # Specifies that the server is about to close the connection. - GOODBYE = "goodbye" + GOODBYE = :goodbye # Specifies that an error occurred while serving the connection. - ERROR = "error" + ERROR = :error end # @@ -153,7 +153,7 @@ def defined? # # Returns the event name for payload transfer. # - # @return [String] + # @return [Symbol] # def name EventName::PAYLOAD_TRANSFERRED diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index cb4662ad..457deaed 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -4,6 +4,7 @@ require "ldclient-rb/impl/data_source" require "ldclient-rb/impl/data_store" require "ldclient-rb/impl/data_system/fdv1" +require "ldclient-rb/impl/data_system/fdv2" require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/evaluation_with_hook_result" require "ldclient-rb/impl/evaluator" @@ -113,9 +114,16 @@ def postfork(wait_for_sec = 5) @hooks = Concurrent::Array.new(@config.hooks + plugin_hooks) - # Initialize the data system (FDv1 for now, will support FDv2 in the future) - # Note: FDv1 will update @config.feature_store to use its wrapped store - @data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config) + # Initialize the data system - use FDv2 if configured, otherwise FDv1 + data_system_config = @config.data_system_config + if data_system_config.nil? + # Use FDv1 for backwards compatibility + # Note: FDv1 will update @config.feature_store to use its wrapped store + @data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config) + else + # Use FDv2 with the provided configuration + @data_system = Impl::DataSystem::FDv2.new(@sdk_key, @config, data_system_config) + end # Components not managed by data system @big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger) diff --git a/spec/impl/data_system/streaming_synchronizer_spec.rb b/spec/impl/data_system/streaming_synchronizer_spec.rb new file mode 100644 index 00000000..e695d48b --- /dev/null +++ b/spec/impl/data_system/streaming_synchronizer_spec.rb @@ -0,0 +1,316 @@ +# frozen_string_literal: true + +require "spec_helper" +require "ldclient-rb/impl/data_system/streaming" +require "ldclient-rb/interfaces" +require "json" + +module LaunchDarkly + module Impl + module DataSystem + RSpec.describe StreamingDataSource do + let(:logger) { double("Logger", info: nil, warn: nil, error: nil, debug: nil) } + let(:sdk_key) { "test-sdk-key" } + let(:config) do + double( + "Config", + logger: logger, + stream_uri: "https://stream.example.com", + payload_filter_key: nil, + socket_factory: nil, + initial_reconnect_delay: 1, + instance_id: nil + ) + end + + # Mock SSE event + class MockSSEEvent + attr_reader :type, :data + + def initialize(type, data = nil) + @type = type + @data = data + end + end + + describe '#process_message' do + let(:synchronizer) { StreamingDataSource.new(sdk_key, config) } + let(:change_set_builder) { LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new } + let(:envid) { nil } + + it "ignores unknown events" do + event = MockSSEEvent.new(:unknown_type, "{}") + update = synchronizer.send(:process_message, event, change_set_builder, envid) + expect(update).to be_nil + end + + it "ignores heartbeat events" do + event = MockSSEEvent.new(LaunchDarkly::Interfaces::DataSystem::EventName::HEARTBEAT) + update = synchronizer.send(:process_message, event, change_set_builder, envid) + expect(update).to be_nil + end + + it "handles no changes (TRANSFER_NONE)" do + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.new( + payload: LaunchDarkly::Interfaces::DataSystem::Payload.new( + id: "id", + target: 300, + code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE, + reason: "up-to-date" + ) + ) + + event = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT, + JSON.generate(server_intent.to_h) + ) + + update = synchronizer.send(:process_message, event, change_set_builder, envid) + expect(update).not_to be_nil + expect(update.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(update.error).to be_nil + expect(update.revert_to_fdv1).to eq(false) + expect(update.environment_id).to be_nil + expect(update.change_set).to be_nil + end + + it "handles empty changeset" do + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.new( + payload: LaunchDarkly::Interfaces::DataSystem::Payload.new( + id: "id", + target: 300, + code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + reason: "cant-catchup" + ) + ) + selector = LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300) + + # Process server intent + event1 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT, + JSON.generate(server_intent.to_h) + ) + synchronizer.send(:process_message, event1, change_set_builder, envid) + + # Process payload transferred + event2 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED, + JSON.generate(selector.to_h) + ) + update = synchronizer.send(:process_message, event2, change_set_builder, envid) + + expect(update).not_to be_nil + expect(update.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(update.error).to be_nil + expect(update.revert_to_fdv1).to eq(false) + expect(update.environment_id).to be_nil + expect(update.change_set).not_to be_nil + expect(update.change_set.changes.length).to eq(0) + expect(update.change_set.selector).not_to be_nil + expect(update.change_set.selector.version).to eq(300) + expect(update.change_set.selector.state).to eq("p:SOMETHING:300") + expect(update.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL) + end + + it "handles put objects" do + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.new( + payload: LaunchDarkly::Interfaces::DataSystem::Payload.new( + id: "id", + target: 300, + code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + reason: "cant-catchup" + ) + ) + put = LaunchDarkly::Impl::DataSystem::ProtocolV2::PutObject.new( + version: 100, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: "flag-key", + object: { key: "flag-key" } + ) + selector = LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300) + + # Process server intent + event1 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT, + JSON.generate(server_intent.to_h) + ) + synchronizer.send(:process_message, event1, change_set_builder, envid) + + # Process put + event2 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::PUT_OBJECT, + JSON.generate(put.to_h) + ) + synchronizer.send(:process_message, event2, change_set_builder, envid) + + # Process payload transferred + event3 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED, + JSON.generate(selector.to_h) + ) + update = synchronizer.send(:process_message, event3, change_set_builder, envid) + + expect(update).not_to be_nil + expect(update.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(update.error).to be_nil + expect(update.revert_to_fdv1).to eq(false) + expect(update.environment_id).to be_nil + expect(update.change_set).not_to be_nil + expect(update.change_set.changes.length).to eq(1) + expect(update.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT) + expect(update.change_set.changes[0].kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(update.change_set.changes[0].key).to eq("flag-key") + expect(update.change_set.changes[0].object).to eq({ key: "flag-key" }) + expect(update.change_set.changes[0].version).to eq(100) + end + + it "handles delete objects" do + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.new( + payload: LaunchDarkly::Interfaces::DataSystem::Payload.new( + id: "id", + target: 300, + code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + reason: "cant-catchup" + ) + ) + delete_object = LaunchDarkly::Impl::DataSystem::ProtocolV2::DeleteObject.new( + version: 101, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: "flag-key" + ) + selector = LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300) + + # Process server intent + event1 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT, + JSON.generate(server_intent.to_h) + ) + synchronizer.send(:process_message, event1, change_set_builder, envid) + + # Process delete + event2 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::DELETE_OBJECT, + JSON.generate(delete_object.to_h) + ) + synchronizer.send(:process_message, event2, change_set_builder, envid) + + # Process payload transferred + event3 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED, + JSON.generate(selector.to_h) + ) + update = synchronizer.send(:process_message, event3, change_set_builder, envid) + + expect(update).not_to be_nil + expect(update.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(update.error).to be_nil + expect(update.revert_to_fdv1).to eq(false) + expect(update.environment_id).to be_nil + expect(update.change_set).not_to be_nil + expect(update.change_set.changes.length).to eq(1) + expect(update.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE) + expect(update.change_set.changes[0].kind).to eq(LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG) + expect(update.change_set.changes[0].key).to eq("flag-key") + expect(update.change_set.changes[0].version).to eq(101) + end + + it "swallows goodbye events" do + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.new( + payload: LaunchDarkly::Interfaces::DataSystem::Payload.new( + id: "id", + target: 300, + code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + reason: "cant-catchup" + ) + ) + goodbye = LaunchDarkly::Impl::DataSystem::ProtocolV2::Goodbye.new( + reason: "test reason", + silent: true, + catastrophe: false + ) + selector = LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300) + + # Process server intent + event1 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT, + JSON.generate(server_intent.to_h) + ) + synchronizer.send(:process_message, event1, change_set_builder, envid) + + # Process goodbye (should be ignored) + event2 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::GOODBYE, + JSON.generate(goodbye.to_h) + ) + result = synchronizer.send(:process_message, event2, change_set_builder, envid) + expect(result).to be_nil + + # Process payload transferred + event3 = MockSSEEvent.new( + LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED, + JSON.generate(selector.to_h) + ) + update = synchronizer.send(:process_message, event3, change_set_builder, envid) + + expect(update).not_to be_nil + expect(update.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(update.change_set).not_to be_nil + expect(update.change_set.changes.length).to eq(0) + end + + it "error event resets changeset builder" do + server_intent = LaunchDarkly::Interfaces::DataSystem::ServerIntent.new( + payload: LaunchDarkly::Interfaces::DataSystem::Payload.new( + id: "id", + target: 300, + code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + reason: "cant-catchup" + ) + ) + put = LaunchDarkly::Impl::DataSystem::ProtocolV2::PutObject.new( + version: 100, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: "flag-key", + object: { key: "flag-key" } + ) + error = LaunchDarkly::Impl::DataSystem::ProtocolV2::Error.new( + payload_id: "p:SOMETHING:300", + reason: "test reason" + ) + delete_object = LaunchDarkly::Impl::DataSystem::ProtocolV2::DeleteObject.new( + version: 101, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: "flag-key" + ) + selector = LaunchDarkly::Interfaces::DataSystem::Selector.new(state: "p:SOMETHING:300", version: 300) + + # Process server intent + synchronizer.send(:process_message, MockSSEEvent.new(LaunchDarkly::Interfaces::DataSystem::EventName::SERVER_INTENT, JSON.generate(server_intent.to_h)), +change_set_builder, envid) + + # Process put (should be reset by error) + synchronizer.send(:process_message, MockSSEEvent.new(LaunchDarkly::Interfaces::DataSystem::EventName::PUT_OBJECT, JSON.generate(put.to_h)), change_set_builder, envid) + + # Process error (resets builder) + synchronizer.send(:process_message, MockSSEEvent.new(LaunchDarkly::Interfaces::DataSystem::EventName::ERROR, JSON.generate(error.to_h)), change_set_builder, envid) + + # Process delete (after reset) + synchronizer.send(:process_message, MockSSEEvent.new(LaunchDarkly::Interfaces::DataSystem::EventName::DELETE_OBJECT, JSON.generate(delete_object.to_h)), +change_set_builder, envid) + + # Process payload transferred + update = synchronizer.send(:process_message, MockSSEEvent.new(LaunchDarkly::Interfaces::DataSystem::EventName::PAYLOAD_TRANSFERRED, JSON.generate(selector.to_h)), +change_set_builder, envid) + + expect(update).not_to be_nil + expect(update.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + expect(update.change_set).not_to be_nil + # Only delete should be in the changeset (put was reset by error) + expect(update.change_set.changes.length).to eq(1) + expect(update.change_set.changes[0].action).to eq(LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE) + end + end + end + end + end +end