diff --git a/spec/impl/data_store/in_memory_feature_store_v2_spec.rb b/spec/impl/data_store/in_memory_feature_store_v2_spec.rb index 8b16cf19..fb2d3918 100644 --- a/spec/impl/data_store/in_memory_feature_store_v2_spec.rb +++ b/spec/impl/data_store/in_memory_feature_store_v2_spec.rb @@ -20,6 +20,28 @@ module DataStore } end + let(:segment_key) { "test-segment" } + let(:segment) do + { + key: segment_key, + version: 1, + included: ["user1"], + excluded: [], + rules: [], + } + end + + describe "#initialized?" do + it "returns false before initialization" do + expect(subject.initialized?).to be false + end + + it "returns true after set_basis" do + subject.set_basis({ FEATURES => {} }) + expect(subject.initialized?).to be true + end + end + describe "#get with string/symbol key compatibility" do before do # Store items with symbol keys (as done by FDv2 protocol layer) @@ -44,6 +66,280 @@ module DataStore it "returns nil for non-existent keys" do expect(subject.get(FEATURES, "nonexistent")).to be_nil end + + it "returns nil for deleted items" do + deleted_flag = flag.merge(deleted: true) + collections = { FEATURES => { flag_key.to_sym => deleted_flag } } + subject.set_basis(collections) + expect(subject.get(FEATURES, flag_key)).to be_nil + end + end + + describe "#all" do + it "returns empty hash when no data" do + expect(subject.all(FEATURES)).to eq({}) + end + + it "returns all non-deleted items" do + collections = { + FEATURES => { + flag_key.to_sym => flag, + "deleted-flag".to_sym => flag.merge(key: "deleted-flag", deleted: true), + }, + } + subject.set_basis(collections) + + result = subject.all(FEATURES) + expect(result.keys).to contain_exactly(flag_key.to_sym) + expect(result[flag_key.to_sym].key).to eq(flag_key) + end + + it "returns items for both flags and segments" do + collections = { + FEATURES => { flag_key.to_sym => flag }, + SEGMENTS => { segment_key.to_sym => segment }, + } + subject.set_basis(collections) + + expect(subject.all(FEATURES).keys).to contain_exactly(flag_key.to_sym) + expect(subject.all(SEGMENTS).keys).to contain_exactly(segment_key.to_sym) + end + end + + describe "#set_basis" do + it "initializes the store with valid data" do + collections = { + FEATURES => { flag_key.to_sym => flag }, + SEGMENTS => { segment_key.to_sym => segment }, + } + + result = subject.set_basis(collections) + expect(result).to be true + expect(subject.initialized?).to be true + expect(subject.get(FEATURES, flag_key)).not_to be_nil + expect(subject.get(SEGMENTS, segment_key)).not_to be_nil + end + + it "replaces existing data" do + # Set initial data + initial_collections = { + FEATURES => { flag_key.to_sym => flag }, + } + subject.set_basis(initial_collections) + + # Replace with new data + new_flag = flag.merge(key: "new-flag", version: 2) + new_collections = { + FEATURES => { "new-flag".to_sym => new_flag }, + } + result = subject.set_basis(new_collections) + + expect(result).to be true + expect(subject.get(FEATURES, flag_key)).to be_nil # Old flag gone + expect(subject.get(FEATURES, "new-flag")).not_to be_nil + end + + it "clears all data before setting new data" do + subject.set_basis({ + FEATURES => { flag_key.to_sym => flag }, + SEGMENTS => { segment_key.to_sym => segment }, + }) + + # Replace with data that only has flags + new_collections = { + FEATURES => { "new-flag".to_sym => flag.merge(key: "new-flag") }, + SEGMENTS => {}, + } + subject.set_basis(new_collections) + + expect(subject.all(SEGMENTS)).to be_empty + end + + it "handles multiple flags and segments" do + flag1 = flag.merge(key: "flag-1") + flag2 = flag.merge(key: "flag-2", version: 2) + flag3 = flag.merge(key: "flag-3", version: 3) + + segment1 = segment.merge(key: "segment-1") + segment2 = segment.merge(key: "segment-2", version: 2) + + collections = { + FEATURES => { + "flag-1".to_sym => flag1, + "flag-2".to_sym => flag2, + "flag-3".to_sym => flag3, + }, + SEGMENTS => { + "segment-1".to_sym => segment1, + "segment-2".to_sym => segment2, + }, + } + + result = subject.set_basis(collections) + expect(result).to be true + + expect(subject.all(FEATURES).size).to eq(3) + expect(subject.all(SEGMENTS).size).to eq(2) + end + + it "returns false and logs error on deserialization failure" do + allow(LaunchDarkly::Impl::Model).to receive(:deserialize).and_raise(StandardError.new("test error")) + + collections = { FEATURES => { flag_key.to_sym => flag } } + result = subject.set_basis(collections) + + expect(result).to be false + expect(subject.initialized?).to be false + end + + it "handles empty collections" do + result = subject.set_basis({ FEATURES => {}, SEGMENTS => {} }) + expect(result).to be true + expect(subject.initialized?).to be true + end + end + + describe "#apply_delta" do + before do + # Set initial data + collections = { + FEATURES => { flag_key.to_sym => flag }, + SEGMENTS => { segment_key.to_sym => segment }, + } + subject.set_basis(collections) + end + + it "adds new items without clearing existing data" do + new_flag = flag.merge(key: "new-flag", version: 2) + delta = { + FEATURES => { "new-flag".to_sym => new_flag }, + } + + result = subject.apply_delta(delta) + expect(result).to be true + + # Original flag should still exist + expect(subject.get(FEATURES, flag_key)).not_to be_nil + # New flag should be added + expect(subject.get(FEATURES, "new-flag")).not_to be_nil + # Segment should be unchanged + expect(subject.get(SEGMENTS, segment_key)).not_to be_nil + end + + it "updates existing items" do + updated_flag = flag.merge(version: 2, on: false) + delta = { + FEATURES => { flag_key.to_sym => updated_flag }, + } + + result = subject.apply_delta(delta) + expect(result).to be true + + result = subject.get(FEATURES, flag_key) + expect(result.version).to eq(2) + expect(result.on).to be false + end + + it "handles multiple updates in one delta" do + flag2 = flag.merge(key: "flag-2", version: 2) + flag3 = flag.merge(key: "flag-3", version: 3) + segment2 = segment.merge(key: "segment-2", version: 2) + + delta = { + FEATURES => { + "flag-2".to_sym => flag2, + "flag-3".to_sym => flag3, + }, + SEGMENTS => { + "segment-2".to_sym => segment2, + }, + } + + result = subject.apply_delta(delta) + expect(result).to be true + + # Original items unchanged + expect(subject.get(FEATURES, flag_key)).not_to be_nil + expect(subject.get(SEGMENTS, segment_key)).not_to be_nil + + # New items added + expect(subject.get(FEATURES, "flag-2")).not_to be_nil + expect(subject.get(FEATURES, "flag-3")).not_to be_nil + expect(subject.get(SEGMENTS, "segment-2")).not_to be_nil + end + + it "handles delete operations" do + deleted_flag = { key: flag_key, version: 2, deleted: true } + delta = { + FEATURES => { flag_key.to_sym => deleted_flag }, + } + + result = subject.apply_delta(delta) + expect(result).to be true + + # Deleted flag should return nil + expect(subject.get(FEATURES, flag_key)).to be_nil + end + + it "returns false and logs error on deserialization failure" do + allow(LaunchDarkly::Impl::Model).to receive(:deserialize).and_raise(StandardError.new("test error")) + + delta = { FEATURES => { "new-flag".to_sym => flag } } + result = subject.apply_delta(delta) + + expect(result).to be false + # Original data should be intact + expect(subject.get(FEATURES, flag_key)).not_to be_nil + end + + it "handles empty delta" do + result = subject.apply_delta({ FEATURES => {}, SEGMENTS => {} }) + expect(result).to be true + + # Original data unchanged + expect(subject.get(FEATURES, flag_key)).not_to be_nil + expect(subject.get(SEGMENTS, segment_key)).not_to be_nil + end + end + + describe "thread safety" do + it "handles concurrent reads and writes" do + subject.set_basis({ FEATURES => { flag_key.to_sym => flag } }) + + threads = [] + errors = [] + + # Writer threads + 5.times do |i| + threads << Thread.new do + begin + 10.times do |j| + new_flag = flag.merge(key: "flag-#{i}-#{j}", version: j + 1) + subject.apply_delta({ FEATURES => { "flag-#{i}-#{j}".to_sym => new_flag } }) + end + rescue => e + errors << e + end + end + end + + # Reader threads + 5.times do + threads << Thread.new do + begin + 20.times do + subject.get(FEATURES, flag_key) + subject.all(FEATURES) + end + rescue => e + errors << e + end + end + end + + threads.each(&:join) + expect(errors).to be_empty + end end end end diff --git a/spec/impl/data_store/store_spec.rb b/spec/impl/data_store/store_spec.rb new file mode 100644 index 00000000..b3260ecb --- /dev/null +++ b/spec/impl/data_store/store_spec.rb @@ -0,0 +1,584 @@ +# frozen_string_literal: true + +require "spec_helper" +require "ldclient-rb/impl/data_store/store" +require "ldclient-rb/impl/broadcaster" +require "ldclient-rb/impl/data_store" +require "ldclient-rb/interfaces/data_system" + +module LaunchDarkly + module Impl + module DataStore + describe Store do + let(:logger) { double.as_null_object } + let(:flag_change_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(Concurrent::SingleThreadExecutor.new, logger) } + let(:change_set_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(Concurrent::SingleThreadExecutor.new, logger) } + subject { Store.new(flag_change_broadcaster, change_set_broadcaster, logger) } + + let(:flag_key) { :test_flag } # Use symbol like TestDataV2 does + let(:flag_data) do + { + key: flag_key.to_s, # key field in the object should be string + version: 1, + on: true, + variations: [true, false], + fallthrough: { variation: 0 }, + } + end + + let(:segment_key) { :test_segment } # Use symbol like TestDataV2 does + let(:segment_data) do + { + key: segment_key.to_s, # key field in the object should be string + version: 1, + included: ["user1"], + excluded: [], + rules: [], + } + end + + # Stub feature store for testing persistence + class StubPersistentStore + include LaunchDarkly::Interfaces::FeatureStore + + attr_reader :init_called_count, :upsert_calls, :data, :init_errors + + def initialize(should_fail: false) + @data = { + LaunchDarkly::Impl::DataStore::FEATURES => {}, + LaunchDarkly::Impl::DataStore::SEGMENTS => {}, + } + @initialized = false + @init_called_count = 0 + @upsert_calls = [] + @should_fail = should_fail + @init_errors = [] + end + + def init(all_data) + @init_called_count += 1 + if @should_fail + error = RuntimeError.new("Simulated persistent store failure") + @init_errors << error + raise error + end + @data[LaunchDarkly::Impl::DataStore::FEATURES] = (all_data[LaunchDarkly::Impl::DataStore::FEATURES] || {}).dup + @data[LaunchDarkly::Impl::DataStore::SEGMENTS] = (all_data[LaunchDarkly::Impl::DataStore::SEGMENTS] || {}).dup + @initialized = true + end + + def get(kind, key) + # Store uses symbol keys internally + item = @data[kind][key.to_sym] || @data[kind][key.to_s] + item && !item[:deleted] ? item : nil + end + + def all(kind) + @data[kind].reject { |_k, v| v[:deleted] } + end + + def upsert(kind, item) + @upsert_calls << [kind, item[:key], item[:version]] + # Use symbol keys consistently + key = item[:key].is_a?(Symbol) ? item[:key] : item[:key].to_sym + @data[kind][key] = item + end + + def delete(kind, key, version) + @data[kind][key.to_sym] = { key: key, version: version, deleted: true } + end + + def initialized? + @initialized + end + + def stop + # No-op + end + + def reset_tracking + @init_called_count = 0 + @upsert_calls = [] + @init_errors = [] + end + end + + describe "#apply" do + it "applies TRANSFER_FULL changeset with set_basis" do + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, false) + + expect(subject.initialized?).to be true + # InMemoryFeatureStoreV2's get method handles both string and symbol keys + result = subject.get_active_store.get(FEATURES, flag_key) + expect(result).not_to be_nil + expect(result.key).to eq(flag_key.to_s) # key field is string + end + + it "applies TRANSFER_CHANGES changeset with apply_delta" do + # First initialize with some data + initial_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + initial_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [initial_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(initial_change_set, false) + + # Now apply a delta + updated_flag_data = flag_data.merge(version: 2, on: false) + delta_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 2, + object: updated_flag_data + ) + + delta_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES, + changes: [delta_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(delta_change_set, false) + + result = subject.get_active_store.get(FEATURES, flag_key) + expect(result.version).to eq(2) + expect(result.on).to be false + end + + it "handles TRANSFER_NONE as no-op" do + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE, + changes: [], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, false) + expect(subject.initialized?).to be false + end + + it "applies DELETE changes" do + # Initialize with flag + put_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + init_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [put_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(init_change_set, false) + expect(subject.get_active_store.get(FEATURES, flag_key)).not_to be_nil + + # Delete the flag + delete_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 2, + object: nil + ) + + delete_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES, + changes: [delete_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(delete_change_set, false) + expect(subject.get_active_store.get(FEATURES, flag_key)).to be_nil + end + + it "broadcasts changeset to listeners" do + received_changesets = [] + listener = Object.new + listener.define_singleton_method(:update) do |change_set| + received_changesets << change_set + end + change_set_broadcaster.add_listener(listener) + + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, false) + + # Give broadcaster time to notify + sleep 0.1 + + expect(received_changesets).not_to be_empty + expect(received_changesets.first.changes.first.key).to eq(flag_key) + end + end + + describe "#commit" do + context "without persistent store" do + it "returns nil and does nothing" do + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, false) + + error = subject.commit + expect(error).to be_nil + end + end + + context "with writable persistent store" do + let(:persistent_store) { StubPersistentStore.new } + + before do + subject.with_persistence(persistent_store, true, nil) + end + + it "writes in-memory data to persistent store" do + # Add data to in-memory store + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, true) # persist=true + persistent_store.reset_tracking + + # Commit should write to persistent store + error = subject.commit + expect(error).to be_nil + expect(persistent_store.init_called_count).to eq(1) + + # Verify data in persistent store + stored_flag = persistent_store.get(FEATURES, flag_key) + expect(stored_flag).not_to be_nil + expect(stored_flag[:key]).to eq(flag_key.to_s) # key field is string + end + + it "encodes data correctly before writing" do + flag_with_complex_data = flag_data.merge( + rules: [{ id: "rule1", variation: 0 }], + prerequisites: [{ key: "prereq1", variation: 1 }] + ) + + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_with_complex_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, true) + persistent_store.reset_tracking + + error = subject.commit + expect(error).to be_nil + + stored_flag = persistent_store.get(FEATURES, flag_key) + expect(stored_flag[:key]).to eq(flag_key.to_s) # key field is string + expect(stored_flag[:version]).to eq(1) + end + + it "returns exception when persistent store fails" do + failing_store = StubPersistentStore.new(should_fail: true) + subject.with_persistence(failing_store, true, nil) + + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, true) + + error = subject.commit + expect(error).to be_a(RuntimeError) + expect(error.message).to include("Simulated persistent store failure") + end + end + + context "with read-only persistent store" do + let(:persistent_store) { StubPersistentStore.new } + + before do + subject.with_persistence(persistent_store, false, nil) # writable=false + end + + it "does not write to read-only store" do + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, true) # persist=true + persistent_store.reset_tracking + + error = subject.commit + expect(error).to be_nil + expect(persistent_store.init_called_count).to eq(0) + end + end + end + + describe "persistent store integration" do + let(:persistent_store) { StubPersistentStore.new } + + context "in READ_WRITE mode" do + before do + subject.with_persistence(persistent_store, true, nil) + end + + it "writes full data sets to persistent store" do + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, true) + + expect(persistent_store.init_called_count).to be >= 1 + stored_flag = persistent_store.get(FEATURES, flag_key) + expect(stored_flag).not_to be_nil + end + + it "writes delta updates to persistent store using upsert" do + # Initialize + init_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + init_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [init_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(init_change_set, true) + persistent_store.reset_tracking + + # Apply delta + updated_flag_data = flag_data.merge(version: 2, on: false) + delta_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 2, + object: updated_flag_data + ) + + delta_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES, + changes: [delta_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(delta_change_set, true) + + expect(persistent_store.upsert_calls).not_to be_empty + expect(persistent_store.upsert_calls.any? { |call| call[1] == flag_key.to_s }).to be true # upsert stores string key + end + + it "writes DELETE operations to persistent store" do + # Initialize + init_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + init_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [init_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(init_change_set, true) + persistent_store.reset_tracking + + # Delete + delete_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 2, + object: nil + ) + + delete_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES, + changes: [delete_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(delete_change_set, true) + + expect(persistent_store.upsert_calls).not_to be_empty + # Verify deleted flag was written + stored_flag = persistent_store.get(FEATURES, flag_key) + expect(stored_flag).to be_nil # get returns nil for deleted items + end + end + + context "in READ_ONLY mode" do + before do + subject.with_persistence(persistent_store, false, nil) + end + + it "does not write to persistent store on full data set" do + change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(change_set, true) + + expect(persistent_store.init_called_count).to eq(0) + end + + it "does not write to persistent store on delta updates" do + # Initialize (this won't write due to READ_ONLY) + init_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: flag_key, + version: 1, + object: flag_data + ) + + init_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL, + changes: [init_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(init_change_set, true) + + # Apply delta + delta_change = LaunchDarkly::Interfaces::DataSystem::Change.new( + action: LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT, + kind: LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG, + key: "new-flag", + version: 1, + object: flag_data.merge(key: "new-flag") + ) + + delta_change_set = LaunchDarkly::Interfaces::DataSystem::ChangeSet.new( + intent_code: LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES, + changes: [delta_change], + selector: LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + ) + + subject.apply(delta_change_set, true) + + expect(persistent_store.upsert_calls).to be_empty + end + end + end + end + end + end +end diff --git a/spec/impl/data_system/fdv2_persistence_spec.rb b/spec/impl/data_system/fdv2_persistence_spec.rb index 71e58122..ebee45ff 100644 --- a/spec/impl/data_system/fdv2_persistence_spec.rb +++ b/spec/impl/data_system/fdv2_persistence_spec.rb @@ -298,6 +298,190 @@ def get_data_snapshot fdv2.stop end + + it "writes delta updates to persistent store in READ_WRITE mode" do + persistent_store = StubFeatureStore.new + + td_synchronizer = LaunchDarkly::Integrations::TestDataV2.data_source + td_synchronizer.update(td_synchronizer.flag("flagkey").on(true)) + + data_system_config = LaunchDarkly::DataSystem::ConfigBuilder.new + .initializers(nil) + .synchronizers(td_synchronizer.method(:build_synchronizer)) + .data_store(persistent_store, :read_write) + .build + + fdv2 = FDv2.new(sdk_key, config, data_system_config) + + ready_event = fdv2.start + expect(ready_event.wait(2)).to be true + + # Wait a bit for initial sync to complete + sleep 0.2 + + # Reset tracking after initial sync + persistent_store.reset_operation_tracking + + # Set up flag change listener to detect the update + flag_changed = Concurrent::Event.new + listener = Object.new + listener.define_singleton_method(:update) do |flag_change| + flag_changed.set if flag_change.key == :flagkey + end + + fdv2.flag_change_broadcaster.add_listener(listener) + + # Make a delta update + td_synchronizer.update(td_synchronizer.flag("flagkey").on(false)) + + # Wait for flag change to propagate + expect(flag_changed.wait(3)).to be true + + # Verify the update was written to persistent store via upsert + # (The test verifies upsert was called; exact timing of snapshot may vary) + expect(persistent_store.upsert_calls).not_to be_empty + expect(persistent_store.upsert_calls.any? { |call| call[1] == "flagkey" && call[2] >= 2 }).to be true # version should be >= 2 for the update + + fdv2.stop + end + + it "does not write delta updates to persistent store in READ_ONLY mode" do + persistent_store = StubFeatureStore.new + + td_synchronizer = LaunchDarkly::Integrations::TestDataV2.data_source + td_synchronizer.update(td_synchronizer.flag("flagkey").on(true)) + + data_system_config = LaunchDarkly::DataSystem::ConfigBuilder.new + .initializers(nil) + .synchronizers(td_synchronizer.method(:build_synchronizer)) + .data_store(persistent_store, :read_only) + .build + + fdv2 = FDv2.new(sdk_key, config, data_system_config) + + # Set up flag change listener + flag_changed = Concurrent::Event.new + change_count = 0 + + listener = Object.new + listener.define_singleton_method(:update) do |_flag_change| + change_count += 1 + flag_changed.set if change_count == 2 + end + + fdv2.flag_change_broadcaster.add_listener(listener) + ready_event = fdv2.start + + expect(ready_event.wait(2)).to be true + + persistent_store.reset_operation_tracking + + # Make a delta update + td_synchronizer.update(td_synchronizer.flag("flagkey").on(false)) + + # Wait for flag change + expect(flag_changed.wait(2)).to be true + + # Verify NO updates were written to persistent store in READ_ONLY mode + expect(persistent_store.upsert_calls).to be_empty + + fdv2.stop + end + + it "persists data from both initializer and synchronizer in READ_WRITE mode" do + persistent_store = StubFeatureStore.new + + # Create initializer with one flag + td_initializer = LaunchDarkly::Integrations::TestDataV2.data_source + td_initializer.update(td_initializer.flag("init-flag").on(true)) + + # Create synchronizer with another flag + td_synchronizer = LaunchDarkly::Integrations::TestDataV2.data_source + td_synchronizer.update(td_synchronizer.flag("sync-flag").on(false)) + + data_system_config = LaunchDarkly::DataSystem::ConfigBuilder.new + .initializers([td_initializer.method(:build_initializer)]) + .synchronizers(td_synchronizer.method(:build_synchronizer)) + .data_store(persistent_store, :read_write) + .build + + fdv2 = FDv2.new(sdk_key, config, data_system_config) + + # Set up flag change listener to detect when synchronizer data arrives + sync_flag_arrived = Concurrent::Event.new + + listener = Object.new + listener.define_singleton_method(:update) do |flag_change| + sync_flag_arrived.set if flag_change.key == :"sync-flag" + end + + fdv2.flag_change_broadcaster.add_listener(listener) + ready_event = fdv2.start + + expect(ready_event.wait(2)).to be true + + # Wait for synchronizer to fully initialize + expect(sync_flag_arrived.wait(2)).to be true + + # The synchronizer flag should be in the persistent store + # (synchronizer does a full data set transfer, replacing initializer data) + snapshot = persistent_store.get_data_snapshot + expect(snapshot[LaunchDarkly::Impl::DataStore::FEATURES]).not_to have_key(:"init-flag") + expect(snapshot[LaunchDarkly::Impl::DataStore::FEATURES]).to have_key(:"sync-flag") + + fdv2.stop + end + + it "handles data store status provider correctly" do + persistent_store = StubFeatureStore.new + + td_synchronizer = LaunchDarkly::Integrations::TestDataV2.data_source + td_synchronizer.update(td_synchronizer.flag("flagkey").on(true)) + + data_system_config = LaunchDarkly::DataSystem::ConfigBuilder.new + .initializers(nil) + .synchronizers(td_synchronizer.method(:build_synchronizer)) + .data_store(persistent_store, :read_write) + .build + + fdv2 = FDv2.new(sdk_key, config, data_system_config) + + # Verify data store status provider exists + status_provider = fdv2.data_store_status_provider + expect(status_provider).not_to be_nil + + # Get initial status + status = status_provider.status + expect(status).not_to be_nil + expect(status.available).to be true + + ready_event = fdv2.start + expect(ready_event.wait(2)).to be true + + fdv2.stop + end + + it "has data store status provider even without persistent store" do + td_synchronizer = LaunchDarkly::Integrations::TestDataV2.data_source + td_synchronizer.update(td_synchronizer.flag("flagkey").on(true)) + + data_system_config = LaunchDarkly::DataSystem::ConfigBuilder.new + .initializers(nil) + .synchronizers(td_synchronizer.method(:build_synchronizer)) + .data_store(nil, :read_write) # No persistent store + .build + + fdv2 = FDv2.new(sdk_key, config, data_system_config) + + # Status provider should exist but not be monitoring + status_provider = fdv2.data_store_status_provider + expect(status_provider).not_to be_nil + + ready_event = fdv2.start + expect(ready_event.wait(2)).to be true + + fdv2.stop + end end end end