From 200f4d6ff1688c4342549fa531188a3f3d511d20 Mon Sep 17 00:00:00 2001 From: chatton Date: Wed, 14 Jan 2026 16:20:52 +0000 Subject: [PATCH 01/11] feat: add sequencer tracing instrumentation Add OpenTelemetry tracing for the core Sequencer interface. This traces all three main operations: - SubmitBatchTxs: tracks tx count and batch size - GetNextBatch: tracks tx count, forced inclusion count, batch size - VerifyBatch: tracks batch data count and verification result The tracing wrapper can be used with any Sequencer implementation (single, based, etc.) via WithTracingSequencer(). --- block/components.go | 6 + pkg/telemetry/sequencer_tracing.go | 128 +++++++++++ pkg/telemetry/sequencer_tracing_test.go | 293 ++++++++++++++++++++++++ 3 files changed, 427 insertions(+) create mode 100644 pkg/telemetry/sequencer_tracing.go create mode 100644 pkg/telemetry/sequencer_tracing_test.go diff --git a/block/components.go b/block/components.go index 3d276206b..f37c10737 100644 --- a/block/components.go +++ b/block/components.go @@ -17,6 +17,7 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" @@ -205,6 +206,11 @@ func NewAggregatorComponents( // error channel for critical failures errorCh := make(chan error, 1) + // wrap sequencer with tracing if enabled + if config.Instrumentation.IsTracingEnabled() { + sequencer = telemetry.WithTracingSequencer(sequencer) + } + executor, err := executing.NewExecutor( store, exec, diff --git a/pkg/telemetry/sequencer_tracing.go b/pkg/telemetry/sequencer_tracing.go new file mode 100644 index 000000000..ce6902e67 --- /dev/null +++ b/pkg/telemetry/sequencer_tracing.go @@ -0,0 +1,128 @@ +package telemetry + +import ( + "context" + "encoding/hex" + + coresequencer "github.com/evstack/ev-node/core/sequencer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +var _ coresequencer.Sequencer = (*tracedSequencer)(nil) + +// tracedSequencer decorates a Sequencer with OpenTelemetry spans. +type tracedSequencer struct { + inner coresequencer.Sequencer + tracer trace.Tracer +} + +// WithTracingSequencer decorates the provided Sequencer with tracing spans. +func WithTracingSequencer(inner coresequencer.Sequencer) coresequencer.Sequencer { + return &tracedSequencer{ + inner: inner, + tracer: otel.Tracer("ev-node/sequencer"), + } +} + +func (t *tracedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + txCount := 0 + totalBytes := 0 + if req.Batch != nil { + txCount = len(req.Batch.Transactions) + for _, tx := range req.Batch.Transactions { + totalBytes += len(tx) + } + } + + ctx, span := t.tracer.Start(ctx, "Sequencer.SubmitBatchTxs", + trace.WithAttributes( + attribute.String("chain.id", hex.EncodeToString(req.Id)), + attribute.Int("tx.count", txCount), + attribute.Int("batch.size_bytes", totalBytes), + ), + ) + defer span.End() + + res, err := t.inner.SubmitBatchTxs(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + return res, nil +} + +func (t *tracedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + ctx, span := t.tracer.Start(ctx, "Sequencer.GetNextBatch", + trace.WithAttributes( + attribute.String("chain.id", hex.EncodeToString(req.Id)), + attribute.Int64("max_bytes", int64(req.MaxBytes)), + ), + ) + defer span.End() + + res, err := t.inner.GetNextBatch(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + if res.Batch != nil { + txCount := len(res.Batch.Transactions) + forcedCount := 0 + for _, forced := range res.Batch.ForceIncludedMask { + if forced { + forcedCount++ + } + } + totalBytes := 0 + for _, tx := range res.Batch.Transactions { + totalBytes += len(tx) + } + + span.SetAttributes( + attribute.Int("tx.count", txCount), + attribute.Int("forced_inclusion.count", forcedCount), + attribute.Int("batch.size_bytes", totalBytes), + attribute.Int64("timestamp", res.Timestamp.Unix()), + ) + } + + return res, nil +} + +func (t *tracedSequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + ctx, span := t.tracer.Start(ctx, "Sequencer.VerifyBatch", + trace.WithAttributes( + attribute.String("chain.id", hex.EncodeToString(req.Id)), + attribute.Int("batch_data.count", len(req.BatchData)), + ), + ) + defer span.End() + + res, err := t.inner.VerifyBatch(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Bool("verified", res.Status), + ) + + return res, nil +} + +func (t *tracedSequencer) SetDAHeight(height uint64) { + t.inner.SetDAHeight(height) +} + +func (t *tracedSequencer) GetDAHeight() uint64 { + return t.inner.GetDAHeight() +} diff --git a/pkg/telemetry/sequencer_tracing_test.go b/pkg/telemetry/sequencer_tracing_test.go new file mode 100644 index 000000000..244024075 --- /dev/null +++ b/pkg/telemetry/sequencer_tracing_test.go @@ -0,0 +1,293 @@ +package telemetry + +import ( + "context" + "errors" + "testing" + "time" + + coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +type mockSequencer struct { + submitBatchTxsFn func(context.Context, coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) + getNextBatchFn func(context.Context, coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) + verifyBatchFn func(context.Context, coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) + daHeight uint64 +} + +func (m *mockSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + if m.submitBatchTxsFn != nil { + return m.submitBatchTxsFn(ctx, req) + } + return &coresequencer.SubmitBatchTxsResponse{}, nil +} + +func (m *mockSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + if m.getNextBatchFn != nil { + return m.getNextBatchFn(ctx, req) + } + return &coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{}, + Timestamp: time.Now(), + }, nil +} + +func (m *mockSequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + if m.verifyBatchFn != nil { + return m.verifyBatchFn(ctx, req) + } + return &coresequencer.VerifyBatchResponse{Status: true}, nil +} + +func (m *mockSequencer) SetDAHeight(height uint64) { + m.daHeight = height +} + +func (m *mockSequencer) GetDAHeight() uint64 { + return m.daHeight +} + +var _ coresequencer.Sequencer = (*mockSequencer)(nil) + +func setupSequencerTrace(t *testing.T, inner coresequencer.Sequencer) (coresequencer.Sequencer, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingSequencer(inner), sr +} + +func TestTracedSequencer_SubmitBatchTxs_Success(t *testing.T) { + mock := &mockSequencer{ + submitBatchTxsFn: func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + return &coresequencer.SubmitBatchTxsResponse{}, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{ + Transactions: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}, + }, + } + + res, err := seq.SubmitBatchTxs(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Sequencer.SubmitBatchTxs", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "tx.count", 3) + requireSequencerAttribute(t, attrs, "batch.size_bytes", 9) // "tx1" + "tx2" + "tx3" = 9 bytes +} + +func TestTracedSequencer_SubmitBatchTxs_Error(t *testing.T) { + mock := &mockSequencer{ + submitBatchTxsFn: func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + return nil, errors.New("queue full") + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}}, + } + + _, err := seq.SubmitBatchTxs(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedSequencer_GetNextBatch_Success(t *testing.T) { + mock := &mockSequencer{ + getNextBatchFn: func(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + return &coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{ + Transactions: [][]byte{[]byte("tx1"), []byte("forced-tx")}, + ForceIncludedMask: []bool{false, true}, + }, + Timestamp: time.Unix(1700000000, 0), + }, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000, + } + + res, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Sequencer.GetNextBatch", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "tx.count", 2) + requireSequencerAttribute(t, attrs, "forced_inclusion.count", 1) + requireSequencerAttribute(t, attrs, "max_bytes", int64(1000)) +} + +func TestTracedSequencer_GetNextBatch_Error(t *testing.T) { + mock := &mockSequencer{ + getNextBatchFn: func(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + return nil, errors.New("failed to fetch from DA") + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000, + } + + _, err := seq.GetNextBatch(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedSequencer_VerifyBatch_Success(t *testing.T) { + mock := &mockSequencer{ + verifyBatchFn: func(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return &coresequencer.VerifyBatchResponse{Status: true}, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("proof1"), []byte("proof2")}, + } + + res, err := seq.VerifyBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + require.True(t, res.Status) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Sequencer.VerifyBatch", span.Name()) + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "batch_data.count", 2) + requireSequencerAttribute(t, attrs, "verified", true) +} + +func TestTracedSequencer_VerifyBatch_Failure(t *testing.T) { + mock := &mockSequencer{ + verifyBatchFn: func(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return &coresequencer.VerifyBatchResponse{Status: false}, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("invalid-proof")}, + } + + res, err := seq.VerifyBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + require.False(t, res.Status) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "verified", false) +} + +func TestTracedSequencer_VerifyBatch_Error(t *testing.T) { + mock := &mockSequencer{ + verifyBatchFn: func(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return nil, errors.New("failed to get proofs") + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("proof")}, + } + + _, err := seq.VerifyBatch(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedSequencer_DAHeightPassthrough(t *testing.T) { + mock := &mockSequencer{} + seq, _ := setupSequencerTrace(t, mock) + + seq.SetDAHeight(100) + require.Equal(t, uint64(100), seq.GetDAHeight()) + + seq.SetDAHeight(200) + require.Equal(t, uint64(200), seq.GetDAHeight()) +} + +func requireSequencerAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + case bool: + require.Equal(t, v, attr.Value.AsBool()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} From 1ef693a891c38fa9f3e8efd31d1a23fcff7fb220 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 08:47:49 +0000 Subject: [PATCH 02/11] chore: using helper fn instead of having it duplicated --- block/components.go | 2 +- block/internal/da/tracing_test.go | 37 +++------------ block/internal/executing/tracing_test.go | 38 ++++------------ block/internal/syncing/tracing_test.go | 42 ++++------------- execution/evm/eth_rpc_tracing_test.go | 48 ++++++-------------- pkg/rpc/server/tracing_test.go | 58 +++++++----------------- pkg/telemetry/executor_tracing_test.go | 46 +++++-------------- pkg/telemetry/sequencer_tracing_test.go | 42 ++++------------- pkg/telemetry/testutil/attributes.go | 34 ++++++++++++++ 9 files changed, 110 insertions(+), 237 deletions(-) create mode 100644 pkg/telemetry/testutil/attributes.go diff --git a/block/components.go b/block/components.go index f37c10737..a7a883bb3 100644 --- a/block/components.go +++ b/block/components.go @@ -17,10 +17,10 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/types" ) diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ca288770c..ea01c9e42 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -7,12 +7,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" datypes "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/pkg/telemetry/testutil" ) // mockFullClient provides function hooks for testing the tracing decorator. @@ -87,8 +87,8 @@ func TestTracedDA_Submit_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "blob.count", 2) - requireAttribute(t, attrs, "blob.total_size_bytes", 3) + testutil.RequireAttribute(t, attrs, "blob.count", 2) + testutil.RequireAttribute(t, attrs, "blob.total_size_bytes", 3) // namespace hex string length assertion // 2 bytes = 4 hex characters foundNS := false @@ -134,8 +134,8 @@ func TestTracedDA_Retrieve_Success(t *testing.T) { span := spans[0] require.Equal(t, "DA.Retrieve", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "ns.length", 1) - requireAttribute(t, attrs, "blob.count", 2) + testutil.RequireAttribute(t, attrs, "ns.length", 1) + testutil.RequireAttribute(t, attrs, "blob.count", 2) } func TestTracedDA_Retrieve_Error(t *testing.T) { @@ -174,8 +174,8 @@ func TestTracedDA_Get_Success(t *testing.T) { span := spans[0] require.Equal(t, "DA.Get", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "id.count", 2) - requireAttribute(t, attrs, "blob.count", 2) + testutil.RequireAttribute(t, attrs, "id.count", 2) + testutil.RequireAttribute(t, attrs, "blob.count", 2) } func TestTracedDA_Get_Error(t *testing.T) { @@ -197,26 +197,3 @@ func TestTracedDA_Get_Error(t *testing.T) { require.Equal(t, codes.Error, span.Status().Code) require.Equal(t, "get failed", span.Status().Description) } - -// helper copied from eth tracing tests -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/block/internal/executing/tracing_test.go b/block/internal/executing/tracing_test.go index c5c08ec74..fa5f4bebc 100644 --- a/block/internal/executing/tracing_test.go +++ b/block/internal/executing/tracing_test.go @@ -7,12 +7,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/evstack/ev-node/types" ) @@ -131,7 +131,7 @@ func TestTracedBlockProducer_RetrieveBatch_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "batch.tx_count", 2) + testutil.RequireAttribute(t, attrs, "batch.tx_count", 2) } func TestTracedBlockProducer_RetrieveBatch_Error(t *testing.T) { @@ -180,8 +180,8 @@ func TestTracedBlockProducer_CreateBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(100)) - requireAttribute(t, attrs, "tx.count", 3) + testutil.RequireAttribute(t, attrs, "block.height", int64(100)) + testutil.RequireAttribute(t, attrs, "tx.count", 3) } func TestTracedBlockProducer_CreateBlock_Error(t *testing.T) { @@ -234,9 +234,9 @@ func TestTracedBlockProducer_ApplyBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(50)) - requireAttribute(t, attrs, "tx.count", 2) - requireAttribute(t, attrs, "state_root", "deadbeef") + testutil.RequireAttribute(t, attrs, "block.height", int64(50)) + testutil.RequireAttribute(t, attrs, "tx.count", 2) + testutil.RequireAttribute(t, attrs, "state_root", "deadbeef") } func TestTracedBlockProducer_ApplyBlock_Error(t *testing.T) { @@ -291,7 +291,7 @@ func TestTracedBlockProducer_ValidateBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(75)) + testutil.RequireAttribute(t, attrs, "block.height", int64(75)) } func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { @@ -320,25 +320,3 @@ func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { require.Equal(t, codes.Error, span.Status().Code) require.Equal(t, "validation failed", span.Status().Description) } - -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/block/internal/syncing/tracing_test.go b/block/internal/syncing/tracing_test.go index d0d398301..679f3f7a3 100644 --- a/block/internal/syncing/tracing_test.go +++ b/block/internal/syncing/tracing_test.go @@ -7,12 +7,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/evstack/ev-node/types" ) @@ -92,9 +92,9 @@ func TestTracedBlockSyncer_TrySyncNextBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(100)) - requireAttribute(t, attrs, "da.height", int64(50)) - requireAttribute(t, attrs, "source", string(common.SourceDA)) + testutil.RequireAttribute(t, attrs, "block.height", int64(100)) + testutil.RequireAttribute(t, attrs, "da.height", int64(50)) + testutil.RequireAttribute(t, attrs, "source", string(common.SourceDA)) } func TestTracedBlockSyncer_TrySyncNextBlock_Error(t *testing.T) { @@ -159,9 +159,9 @@ func TestTracedBlockSyncer_ApplyBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(50)) - requireAttribute(t, attrs, "tx.count", 2) - requireAttribute(t, attrs, "state_root", "deadbeef") + testutil.RequireAttribute(t, attrs, "block.height", int64(50)) + testutil.RequireAttribute(t, attrs, "tx.count", 2) + testutil.RequireAttribute(t, attrs, "state_root", "deadbeef") } func TestTracedBlockSyncer_ApplyBlock_Error(t *testing.T) { @@ -216,7 +216,7 @@ func TestTracedBlockSyncer_ValidateBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(75)) + testutil.RequireAttribute(t, attrs, "block.height", int64(75)) } func TestTracedBlockSyncer_ValidateBlock_Error(t *testing.T) { @@ -274,8 +274,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(100)) - requireAttribute(t, attrs, "da.height", int64(50)) + testutil.RequireAttribute(t, attrs, "block.height", int64(100)) + testutil.RequireAttribute(t, attrs, "da.height", int64(50)) } func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { @@ -305,25 +305,3 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { require.Equal(t, codes.Error, span.Status().Code) require.Equal(t, "forced inclusion verification failed", span.Status().Description) } - -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/execution/evm/eth_rpc_tracing_test.go b/execution/evm/eth_rpc_tracing_test.go index 832a03fef..4d33e899b 100644 --- a/execution/evm/eth_rpc_tracing_test.go +++ b/execution/evm/eth_rpc_tracing_test.go @@ -9,10 +9,11 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/pkg/telemetry/testutil" ) // setupTestEthRPCTracing creates a traced eth RPC client with an in-memory span recorder @@ -90,13 +91,13 @@ func TestTracedEthRPCClient_HeaderByNumber_Success(t *testing.T) { // verify attributes attrs := span.Attributes() - requireAttribute(t, attrs, "method", "eth_getBlockByNumber") - requireAttribute(t, attrs, "block_number", "100") - requireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) - requireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) - requireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) - requireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) - requireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) + testutil.RequireAttribute(t, attrs, "method", "eth_getBlockByNumber") + testutil.RequireAttribute(t, attrs, "block_number", "100") + testutil.RequireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) + testutil.RequireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) + testutil.RequireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) + testutil.RequireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) + testutil.RequireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) } func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { @@ -131,7 +132,7 @@ func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { // verify block_number is "latest" when nil attrs := span.Attributes() - requireAttribute(t, attrs, "block_number", "latest") + testutil.RequireAttribute(t, attrs, "block_number", "latest") } func TestTracedEthRPCClient_HeaderByNumber_Error(t *testing.T) { @@ -206,8 +207,8 @@ func TestTracedEthRPCClient_GetTxs_Success(t *testing.T) { // verify attributes attrs := span.Attributes() - requireAttribute(t, attrs, "method", "txpoolExt_getTxs") - requireAttribute(t, attrs, "tx_count", len(expectedTxs)) + testutil.RequireAttribute(t, attrs, "method", "txpoolExt_getTxs") + testutil.RequireAttribute(t, attrs, "tx_count", len(expectedTxs)) } func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { @@ -235,7 +236,7 @@ func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { // verify tx_count is 0 attrs := span.Attributes() - requireAttribute(t, attrs, "tx_count", 0) + testutil.RequireAttribute(t, attrs, "tx_count", 0) } func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { @@ -276,26 +277,3 @@ func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { require.NotEqual(t, "tx_count", string(attr.Key)) } } - -// requireAttribute is a helper to check span attributes -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/rpc/server/tracing_test.go b/pkg/rpc/server/tracing_test.go index f950f9e28..a64180d9f 100644 --- a/pkg/rpc/server/tracing_test.go +++ b/pkg/rpc/server/tracing_test.go @@ -9,13 +9,13 @@ import ( "connectrpc.com/connect" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/evstack/ev-node/pkg/telemetry/testutil" pb "github.com/evstack/ev-node/types/pb/evnode/v1" "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" ) @@ -174,9 +174,9 @@ func TestTracedStoreService_GetBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "height", int64(10)) - requireAttribute(t, attrs, "found", true) - requireAttribute(t, attrs, "tx_count", 2) + testutil.RequireAttribute(t, attrs, "height", int64(10)) + testutil.RequireAttribute(t, attrs, "found", true) + testutil.RequireAttribute(t, attrs, "tx_count", 2) } func TestTracedStoreService_GetBlock_Error(t *testing.T) { @@ -228,9 +228,9 @@ func TestTracedStoreService_GetState_Success(t *testing.T) { require.Equal(t, "StoreService.GetState", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "height", int64(100)) - requireAttribute(t, attrs, "app_hash", "aabb") - requireAttribute(t, attrs, "da_height", int64(50)) + testutil.RequireAttribute(t, attrs, "height", int64(100)) + testutil.RequireAttribute(t, attrs, "app_hash", "aabb") + testutil.RequireAttribute(t, attrs, "da_height", int64(50)) } func TestTracedStoreService_GetMetadata_Success(t *testing.T) { @@ -258,8 +258,8 @@ func TestTracedStoreService_GetMetadata_Success(t *testing.T) { require.Equal(t, "StoreService.GetMetadata", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "key", "test_key") - requireAttribute(t, attrs, "value_size_bytes", 14) + testutil.RequireAttribute(t, attrs, "key", "test_key") + testutil.RequireAttribute(t, attrs, "value_size_bytes", 14) } func TestTracedStoreService_GetGenesisDaHeight_Success(t *testing.T) { @@ -284,7 +284,7 @@ func TestTracedStoreService_GetGenesisDaHeight_Success(t *testing.T) { require.Equal(t, "StoreService.GetGenesisDaHeight", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "genesis_da_height", int64(1000)) + testutil.RequireAttribute(t, attrs, "genesis_da_height", int64(1000)) } func TestTracedStoreService_GetP2PStoreInfo_Success(t *testing.T) { @@ -312,7 +312,7 @@ func TestTracedStoreService_GetP2PStoreInfo_Success(t *testing.T) { require.Equal(t, "StoreService.GetP2PStoreInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "store_count", 2) + testutil.RequireAttribute(t, attrs, "store_count", 2) } // P2PService tests @@ -342,7 +342,7 @@ func TestTracedP2PService_GetPeerInfo_Success(t *testing.T) { require.Equal(t, "P2PService.GetPeerInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "peer_count", 2) + testutil.RequireAttribute(t, attrs, "peer_count", 2) } func TestTracedP2PService_GetPeerInfo_Error(t *testing.T) { @@ -389,8 +389,8 @@ func TestTracedP2PService_GetNetInfo_Success(t *testing.T) { require.Equal(t, "P2PService.GetNetInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "node_id", "node123") - requireAttribute(t, attrs, "listen_address_count", 1) + testutil.RequireAttribute(t, attrs, "node_id", "node123") + testutil.RequireAttribute(t, attrs, "listen_address_count", 1) } // ConfigService tests @@ -418,8 +418,8 @@ func TestTracedConfigService_GetNamespace_Success(t *testing.T) { require.Equal(t, "ConfigService.GetNamespace", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "header_namespace", "0x0001020304050607") - requireAttribute(t, attrs, "data_namespace", "0x08090a0b0c0d0e0f") + testutil.RequireAttribute(t, attrs, "header_namespace", "0x0001020304050607") + testutil.RequireAttribute(t, attrs, "data_namespace", "0x08090a0b0c0d0e0f") } func TestTracedConfigService_GetNamespace_Error(t *testing.T) { @@ -463,7 +463,7 @@ func TestTracedConfigService_GetSignerInfo_Success(t *testing.T) { require.Equal(t, "ConfigService.GetSignerInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "signer_address", "01020304") + testutil.RequireAttribute(t, attrs, "signer_address", "01020304") } func TestTracedConfigService_GetSignerInfo_Error(t *testing.T) { @@ -484,27 +484,3 @@ func TestTracedConfigService_GetSignerInfo_Error(t *testing.T) { span := spans[0] require.Equal(t, codes.Error, span.Status().Code) } - -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - case bool: - require.Equal(t, v, attr.Value.AsBool()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/telemetry/executor_tracing_test.go b/pkg/telemetry/executor_tracing_test.go index 9a79ba2f6..472ba9852 100644 --- a/pkg/telemetry/executor_tracing_test.go +++ b/pkg/telemetry/executor_tracing_test.go @@ -6,16 +6,15 @@ import ( "testing" "time" + coreexec "github.com/evstack/ev-node/core/execution" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/test/mocks" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - - coreexec "github.com/evstack/ev-node/core/execution" - "github.com/evstack/ev-node/test/mocks" ) // setupTestTracing creates a traced executor with an in-memory span recorder for testing @@ -72,9 +71,9 @@ func TestWithTracingExecutor_InitChain_Success(t *testing.T) { // verify attributes attrs := span.Attributes() require.Len(t, attrs, 3) - requireAttribute(t, attrs, "chain.id", chainID) - requireAttribute(t, attrs, "initial.height", int64(initialHeight)) - requireAttribute(t, attrs, "genesis.time_unix", genesisTime.Unix()) + testutil.RequireAttribute(t, attrs, "chain.id", chainID) + testutil.RequireAttribute(t, attrs, "initial.height", int64(initialHeight)) + testutil.RequireAttribute(t, attrs, "genesis.time_unix", genesisTime.Unix()) } func TestWithTracingExecutor_InitChain_Error(t *testing.T) { @@ -137,7 +136,7 @@ func TestWithTracingExecutor_GetTxs_Success(t *testing.T) { // verify tx.count attribute attrs := span.Attributes() - requireAttribute(t, attrs, "tx.count", len(expectedTxs)) + testutil.RequireAttribute(t, attrs, "tx.count", len(expectedTxs)) } func TestWithTracingExecutor_GetTxs_Error(t *testing.T) { @@ -202,9 +201,9 @@ func TestWithTracingExecutor_ExecuteTxs_Success(t *testing.T) { // verify attributes attrs := span.Attributes() - requireAttribute(t, attrs, "tx.count", len(txs)) - requireAttribute(t, attrs, "block.height", int64(blockHeight)) - requireAttribute(t, attrs, "timestamp", timestamp.Unix()) + testutil.RequireAttribute(t, attrs, "tx.count", len(txs)) + testutil.RequireAttribute(t, attrs, "block.height", int64(blockHeight)) + testutil.RequireAttribute(t, attrs, "timestamp", timestamp.Unix()) } func TestWithTracingExecutor_ExecuteTxs_Error(t *testing.T) { @@ -260,7 +259,7 @@ func TestWithTracingExecutor_SetFinal_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(blockHeight)) + testutil.RequireAttribute(t, attrs, "block.height", int64(blockHeight)) } func TestWithTracingExecutor_SetFinal_Error(t *testing.T) { @@ -371,26 +370,3 @@ type mockExecutorWithHeight struct { func (m *mockExecutorWithHeight) GetLatestHeight(ctx context.Context) (uint64, error) { return m.height, m.err } - -// requireAttribute is a helper to check span attributes -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/telemetry/sequencer_tracing_test.go b/pkg/telemetry/sequencer_tracing_test.go index 244024075..bea229f74 100644 --- a/pkg/telemetry/sequencer_tracing_test.go +++ b/pkg/telemetry/sequencer_tracing_test.go @@ -7,9 +7,9 @@ import ( "time" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -92,8 +92,8 @@ func TestTracedSequencer_SubmitBatchTxs_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "tx.count", 3) - requireSequencerAttribute(t, attrs, "batch.size_bytes", 9) // "tx1" + "tx2" + "tx3" = 9 bytes + testutil.RequireAttribute(t, attrs, "tx.count", 3) + testutil.RequireAttribute(t, attrs, "batch.size_bytes", 9) // "tx1" + "tx2" + "tx3" = 9 bytes } func TestTracedSequencer_SubmitBatchTxs_Error(t *testing.T) { @@ -150,9 +150,9 @@ func TestTracedSequencer_GetNextBatch_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "tx.count", 2) - requireSequencerAttribute(t, attrs, "forced_inclusion.count", 1) - requireSequencerAttribute(t, attrs, "max_bytes", int64(1000)) + testutil.RequireAttribute(t, attrs, "tx.count", 2) + testutil.RequireAttribute(t, attrs, "forced_inclusion.count", 1) + testutil.RequireAttribute(t, attrs, "max_bytes", int64(1000)) } func TestTracedSequencer_GetNextBatch_Error(t *testing.T) { @@ -203,8 +203,8 @@ func TestTracedSequencer_VerifyBatch_Success(t *testing.T) { require.Equal(t, "Sequencer.VerifyBatch", span.Name()) attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "batch_data.count", 2) - requireSequencerAttribute(t, attrs, "verified", true) + testutil.RequireAttribute(t, attrs, "batch_data.count", 2) + testutil.RequireAttribute(t, attrs, "verified", true) } func TestTracedSequencer_VerifyBatch_Failure(t *testing.T) { @@ -231,7 +231,7 @@ func TestTracedSequencer_VerifyBatch_Failure(t *testing.T) { span := spans[0] attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "verified", false) + testutil.RequireAttribute(t, attrs, "verified", false) } func TestTracedSequencer_VerifyBatch_Error(t *testing.T) { @@ -267,27 +267,3 @@ func TestTracedSequencer_DAHeightPassthrough(t *testing.T) { seq.SetDAHeight(200) require.Equal(t, uint64(200), seq.GetDAHeight()) } - -func requireSequencerAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - case bool: - require.Equal(t, v, attr.Value.AsBool()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/telemetry/testutil/attributes.go b/pkg/telemetry/testutil/attributes.go new file mode 100644 index 000000000..6a6ede4cd --- /dev/null +++ b/pkg/telemetry/testutil/attributes.go @@ -0,0 +1,34 @@ +// Package testutil provides test utilities for OpenTelemetry tracing tests. +package testutil + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" +) + +// RequireAttribute asserts that an attribute with the given key exists and has the expected value. +func RequireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + case bool: + require.Equal(t, v, attr.Value.AsBool()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} From ef202f428ade529e6f0db1cf8602b533d5cc697c Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 09:50:50 +0000 Subject: [PATCH 03/11] chore: adding da retreiver syncing --- .../internal/syncing/da_retriever_tracing.go | 57 ++++++++ .../syncing/da_retriever_tracing_test.go | 123 ++++++++++++++++++ block/internal/syncing/syncer.go | 3 + 3 files changed, 183 insertions(+) create mode 100644 block/internal/syncing/da_retriever_tracing.go create mode 100644 block/internal/syncing/da_retriever_tracing_test.go diff --git a/block/internal/syncing/da_retriever_tracing.go b/block/internal/syncing/da_retriever_tracing.go new file mode 100644 index 000000000..894fc67ba --- /dev/null +++ b/block/internal/syncing/da_retriever_tracing.go @@ -0,0 +1,57 @@ +package syncing + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/block/internal/common" +) + +var _ DARetriever = (*tracedDARetriever)(nil) + +// tracedDARetriever wraps a DARetriever with OpenTelemetry tracing. +type tracedDARetriever struct { + inner DARetriever + tracer trace.Tracer +} + +// WithTracingDARetriever wraps a DARetriever with OpenTelemetry tracing. +func WithTracingDARetriever(inner DARetriever) DARetriever { + return &tracedDARetriever{ + inner: inner, + tracer: otel.Tracer("ev-node/da-retriever"), + } +} + +func (t *tracedDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + ctx, span := t.tracer.Start(ctx, "DARetriever.RetrieveFromDA", + trace.WithAttributes( + attribute.Int64("da.height", int64(daHeight)), + ), + ) + defer span.End() + + events, err := t.inner.RetrieveFromDA(ctx, daHeight) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return events, err + } + + span.SetAttributes(attribute.Int("event.count", len(events))) + + // add block heights from events + if len(events) > 0 { + heights := make([]int64, len(events)) + for i, event := range events { + heights[i] = int64(event.Header.Height()) + } + span.SetAttributes(attribute.Int64Slice("block.heights", heights)) + } + + return events, nil +} diff --git a/block/internal/syncing/da_retriever_tracing_test.go b/block/internal/syncing/da_retriever_tracing_test.go new file mode 100644 index 000000000..d83ed99d2 --- /dev/null +++ b/block/internal/syncing/da_retriever_tracing_test.go @@ -0,0 +1,123 @@ +package syncing + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type mockDARetriever struct { + retrieveFromDAFn func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) +} + +func (m *mockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + if m.retrieveFromDAFn != nil { + return m.retrieveFromDAFn(ctx, daHeight) + } + return nil, nil +} + +func setupDARetrieverTrace(t *testing.T, inner DARetriever) (DARetriever, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingDARetriever(inner), sr +} + +func TestTracedDARetriever_RetrieveFromDA_Success(t *testing.T) { + mock := &mockDARetriever{ + retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + return []common.DAHeightEvent{ + { + Header: &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{Height: 100}, + }, + }, + DaHeight: daHeight, + Source: common.SourceDA, + }, + { + Header: &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{Height: 101}, + }, + }, + DaHeight: daHeight, + Source: common.SourceDA, + }, + }, nil + }, + } + retriever, sr := setupDARetrieverTrace(t, mock) + ctx := context.Background() + + events, err := retriever.RetrieveFromDA(ctx, 50) + require.NoError(t, err) + require.Len(t, events, 2) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DARetriever.RetrieveFromDA", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "da.height", int64(50)) + testutil.RequireAttribute(t, attrs, "event.count", 2) +} + +func TestTracedDARetriever_RetrieveFromDA_NoEvents(t *testing.T) { + mock := &mockDARetriever{ + retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + return []common.DAHeightEvent{}, nil + }, + } + retriever, sr := setupDARetrieverTrace(t, mock) + ctx := context.Background() + + events, err := retriever.RetrieveFromDA(ctx, 50) + require.NoError(t, err) + require.Empty(t, events) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "event.count", 0) +} + +func TestTracedDARetriever_RetrieveFromDA_Error(t *testing.T) { + expectedErr := errors.New("DA retrieval failed") + mock := &mockDARetriever{ + retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + return nil, expectedErr + }, + } + retriever, sr := setupDARetrieverTrace(t, mock) + ctx := context.Background() + + _, err := retriever.RetrieveFromDA(ctx, 50) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index a00c2f4fc..6365c548b 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -201,6 +201,9 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) + if s.config.Instrumentation.IsTracingEnabled() { + s.daRetriever = WithTracingDARetriever(s.daRetriever) + } s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { From eeea8b66dae6c70aa245ef92535adc27de591923 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 10:34:47 +0000 Subject: [PATCH 04/11] chore: bump sonic version to work with 1.25 --- test/docker-e2e/go.mod | 7 ++++--- test/docker-e2e/go.sum | 20 +++++++++----------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/test/docker-e2e/go.mod b/test/docker-e2e/go.mod index 099fdcbb3..e6ea03df7 100644 --- a/test/docker-e2e/go.mod +++ b/test/docker-e2e/go.mod @@ -16,10 +16,11 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/bcp-innovations/hyperlane-cosmos v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.20.0 // indirect - github.com/bytedance/sonic v1.13.1 // indirect - github.com/bytedance/sonic/loader v0.2.4 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.14.2 // indirect + github.com/bytedance/sonic/loader v0.4.0 // indirect github.com/celestiaorg/go-square/v3 v3.0.2 // indirect - github.com/cloudwego/base64x v0.1.5 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/consensys/gnark-crypto v0.18.1 // indirect github.com/containerd/continuity v0.4.5 // indirect github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect diff --git a/test/docker-e2e/go.sum b/test/docker-e2e/go.sum index 000946024..92268cc0b 100644 --- a/test/docker-e2e/go.sum +++ b/test/docker-e2e/go.sum @@ -126,11 +126,12 @@ github.com/btcsuite/btcd/btcutil v1.1.6 h1:zFL2+c3Lb9gEgqKNzowKUPQNb8jV7v5Oaodi/ github.com/btcsuite/btcd/btcutil v1.1.6/go.mod h1:9dFymx8HpuLqBnsPELrImQeTQfKBQqzqGbbV3jK55aE= github.com/bufbuild/protocompile v0.14.1 h1:iA73zAf/fyljNjQKwYzUHD6AD4R8KMasmwa/FBatYVw= github.com/bufbuild/protocompile v0.14.1/go.mod h1:ppVdAIhbr2H8asPk6k4pY7t9zB1OU5DoEw9xY/FUi1c= -github.com/bytedance/sonic v1.13.1 h1:Jyd5CIvdFnkOWuKXr+wm4Nyk2h0yAFsr8ucJgEasO3g= -github.com/bytedance/sonic v1.13.1/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= -github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= -github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/bytedance/sonic v1.14.2 h1:k1twIoe97C1DtYUo+fZQy865IuHia4PR5RPiuGPPIIE= +github.com/bytedance/sonic v1.14.2/go.mod h1:T80iDELeHiHKSc0C9tubFygiuXoGzrkjKzX2quAx980= +github.com/bytedance/sonic/loader v0.4.0 h1:olZ7lEqcxtZygCK9EKYKADnpQoYkRQxaeY2NYzevs+o= +github.com/bytedance/sonic/loader v0.4.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/celestiaorg/celestia-core v0.39.4 h1:h0WaG8KsP0JyiAVhHipoIgvBP0CYLG/9whUccy1lDlY= github.com/celestiaorg/celestia-core v0.39.4/go.mod h1:t7cSYwLFmpz5RjIBpC3QjpbRoa+RfQ0ULdh+LciKuq8= @@ -162,9 +163,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= -github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= -github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= @@ -561,12 +561,10 @@ github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/klauspost/reedsolomon v1.12.5 h1:4cJuyH926If33BeDgiZpI5OU0pE+wUHZvMSyNGqN73Y= github.com/klauspost/reedsolomon v1.12.5/go.mod h1:LkXRjLYGM8K/iQfujYnaPeDmhZLqkrGUyG9p7zs5L68= -github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -853,6 +851,7 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= @@ -1233,7 +1232,6 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= -nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= From 19add9cc2cb97ba4fdd79744fb51007249de0d53 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 11:23:52 +0000 Subject: [PATCH 05/11] chore: adding tracing for da submitter --- block/components.go | 10 +- .../submitting/da_submitter_tracing.go | 97 +++++++++ .../submitting/da_submitter_tracing_test.go | 190 ++++++++++++++++++ block/internal/submitting/submitter.go | 8 +- 4 files changed, 299 insertions(+), 6 deletions(-) create mode 100644 block/internal/submitting/da_submitter_tracing.go create mode 100644 block/internal/submitting/da_submitter_tracing_test.go diff --git a/block/components.go b/block/components.go index a7a883bb3..17cbe018d 100644 --- a/block/components.go +++ b/block/components.go @@ -157,7 +157,10 @@ func NewSyncComponents( } // Create submitter for sync nodes (no signer, only DA inclusion processing) - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + if config.Instrumentation.IsTracingEnabled() { + daSubmitter = submitting.WithTracingDASubmitter(daSubmitter) + } submitter := submitting.NewSubmitter( store, exec, @@ -256,7 +259,10 @@ func NewAggregatorComponents( }, nil } - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + if config.Instrumentation.IsTracingEnabled() { + daSubmitter = submitting.WithTracingDASubmitter(daSubmitter) + } submitter := submitting.NewSubmitter( store, exec, diff --git a/block/internal/submitting/da_submitter_tracing.go b/block/internal/submitting/da_submitter_tracing.go new file mode 100644 index 000000000..e3c531fcf --- /dev/null +++ b/block/internal/submitting/da_submitter_tracing.go @@ -0,0 +1,97 @@ +package submitting + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/types" +) + +var _ DASubmitterAPI = (*tracedDASubmitter)(nil) + +// tracedDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing. +type tracedDASubmitter struct { + inner DASubmitterAPI + tracer trace.Tracer +} + +// WithTracingDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing. +func WithTracingDASubmitter(inner DASubmitterAPI) DASubmitterAPI { + return &tracedDASubmitter{ + inner: inner, + tracer: otel.Tracer("ev-node/da-submitter"), + } +} + +func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitHeaders", + trace.WithAttributes( + attribute.Int("header.count", len(headers)), + ), + ) + defer span.End() + + // calculate total size + var totalBytes int + for _, h := range marshalledHeaders { + totalBytes += len(h) + } + span.SetAttributes(attribute.Int("header.total_bytes", totalBytes)) + + // add height range if headers present + if len(headers) > 0 { + span.SetAttributes( + attribute.Int64("header.start_height", int64(headers[0].Height())), + attribute.Int64("header.end_height", int64(headers[len(headers)-1].Height())), + ) + } + + err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData", + trace.WithAttributes( + attribute.Int("data.count", len(signedDataList)), + ), + ) + defer span.End() + + // calculate total size + var totalBytes int + for _, d := range marshalledData { + totalBytes += len(d) + } + span.SetAttributes(attribute.Int("data.total_bytes", totalBytes)) + + // add height range if data present + if len(signedDataList) > 0 { + span.SetAttributes( + attribute.Int64("data.start_height", int64(signedDataList[0].Height())), + attribute.Int64("data.end_height", int64(signedDataList[len(signedDataList)-1].Height())), + ) + } + + err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} diff --git a/block/internal/submitting/da_submitter_tracing_test.go b/block/internal/submitting/da_submitter_tracing_test.go new file mode 100644 index 000000000..6edc5c5ec --- /dev/null +++ b/block/internal/submitting/da_submitter_tracing_test.go @@ -0,0 +1,190 @@ +package submitting + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type mockDASubmitterAPI struct { + submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error + submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error +} + +func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + if m.submitHeadersFn != nil { + return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer) + } + return nil +} + +func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + if m.submitDataFn != nil { + return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis) + } + return nil +} + +func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingDASubmitter(inner), sr +} + +func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + headers := []*types.SignedHeader{ + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}}, + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 101}}}, + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 102}}}, + } + marshalledHeaders := [][]byte{ + []byte("header1"), + []byte("header2"), + []byte("header3"), + } + + err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DASubmitter.SubmitHeaders", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "header.count", 3) + testutil.RequireAttribute(t, attrs, "header.total_bytes", 21) // 7+7+7 + testutil.RequireAttribute(t, attrs, "header.start_height", int64(100)) + testutil.RequireAttribute(t, attrs, "header.end_height", int64(102)) +} + +func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) { + expectedErr := errors.New("DA submission failed") + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return expectedErr + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + headers := []*types.SignedHeader{ + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}}, + } + marshalledHeaders := [][]byte{[]byte("header1")} + + err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} + +func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "header.count", 0) + testutil.RequireAttribute(t, attrs, "header.total_bytes", 0) +} + +func TestTracedDASubmitter_SubmitData_Success(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + signedDataList := []*types.SignedData{ + {Data: types.Data{Metadata: &types.Metadata{Height: 100}}}, + {Data: types.Data{Metadata: &types.Metadata{Height: 101}}}, + } + marshalledData := [][]byte{ + []byte("data1data1"), + []byte("data2data2"), + } + + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DASubmitter.SubmitData", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "data.count", 2) + testutil.RequireAttribute(t, attrs, "data.total_bytes", 20) // 10+10 + testutil.RequireAttribute(t, attrs, "data.start_height", int64(100)) + testutil.RequireAttribute(t, attrs, "data.end_height", int64(101)) +} + +func TestTracedDASubmitter_SubmitData_Error(t *testing.T) { + expectedErr := errors.New("data submission failed") + mock := &mockDASubmitterAPI{ + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + return expectedErr + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + signedDataList := []*types.SignedData{ + {Data: types.Data{Metadata: &types.Metadata{Height: 100}}}, + } + marshalledData := [][]byte{[]byte("data1")} + + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index e6a2328e9..3e1d96777 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -23,8 +23,8 @@ import ( "github.com/evstack/ev-node/types" ) -// daSubmitterAPI defines minimal methods needed by Submitter for DA submissions. -type daSubmitterAPI interface { +// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions. +type DASubmitterAPI interface { SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error } @@ -43,7 +43,7 @@ type Submitter struct { metrics *common.Metrics // DA submitter - daSubmitter daSubmitterAPI + daSubmitter DASubmitterAPI // Optional signer (only for aggregator nodes) signer signer.Signer @@ -80,7 +80,7 @@ func NewSubmitter( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - daSubmitter daSubmitterAPI, + daSubmitter DASubmitterAPI, sequencer coresequencer.Sequencer, // Can be nil for sync nodes signer signer.Signer, // Can be nil for sync nodes logger zerolog.Logger, From 380933b97212a42bb938af266572401c6ac2d30e Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 14:10:10 +0000 Subject: [PATCH 06/11] chore: adding forced inclusion tracing --- block/forced_inclusion_tracing.go | 55 +++++++ block/forced_inclusion_tracing_test.go | 155 ++++++++++++++++++ .../syncing/forced_inclusion_tracing.go | 65 ++++++++ block/internal/syncing/syncer.go | 5 +- block/public.go | 6 +- 5 files changed, 284 insertions(+), 2 deletions(-) create mode 100644 block/forced_inclusion_tracing.go create mode 100644 block/forced_inclusion_tracing_test.go create mode 100644 block/internal/syncing/forced_inclusion_tracing.go diff --git a/block/forced_inclusion_tracing.go b/block/forced_inclusion_tracing.go new file mode 100644 index 000000000..4d518d184 --- /dev/null +++ b/block/forced_inclusion_tracing.go @@ -0,0 +1,55 @@ +package block + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +var _ ForcedInclusionRetriever = (*tracedForcedInclusionRetriever)(nil) + +type tracedForcedInclusionRetriever struct { + inner ForcedInclusionRetriever + tracer trace.Tracer +} + +// WithTracingForcedInclusionRetriever wraps a ForcedInclusionRetriever with OpenTelemetry tracing. +func WithTracingForcedInclusionRetriever(inner ForcedInclusionRetriever) ForcedInclusionRetriever { + return &tracedForcedInclusionRetriever{ + inner: inner, + tracer: otel.Tracer("ev-node/forced-inclusion"), + } +} + +func (t *tracedForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { + ctx, span := t.tracer.Start(ctx, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs", + trace.WithAttributes( + attribute.Int64("da.height", int64(daHeight)), + ), + ) + defer span.End() + + event, err := t.inner.RetrieveForcedIncludedTxs(ctx, daHeight) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return event, err + } + + if event != nil { + span.SetAttributes( + attribute.Int64("event.start_da_height", int64(event.StartDaHeight)), + attribute.Int64("event.end_da_height", int64(event.EndDaHeight)), + attribute.Int("event.tx_count", len(event.Txs)), + ) + } + + return event, nil +} + +func (t *tracedForcedInclusionRetriever) Stop() { + t.inner.Stop() +} diff --git a/block/forced_inclusion_tracing_test.go b/block/forced_inclusion_tracing_test.go new file mode 100644 index 000000000..259a6a347 --- /dev/null +++ b/block/forced_inclusion_tracing_test.go @@ -0,0 +1,155 @@ +package block + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/pkg/telemetry/testutil" +) + +type mockForcedInclusionRetriever struct { + retrieveFn func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) + stopCalled bool +} + +func (m *mockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { + if m.retrieveFn != nil { + return m.retrieveFn(ctx, daHeight) + } + return nil, nil +} + +func (m *mockForcedInclusionRetriever) Stop() { + m.stopCalled = true +} + +func setupForcedInclusionTrace(t *testing.T, inner ForcedInclusionRetriever) (ForcedInclusionRetriever, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingForcedInclusionRetriever(inner), sr +} + +func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_Success(t *testing.T) { + expectedEvent := &ForcedInclusionEvent{ + Timestamp: time.Now(), + StartDaHeight: 100, + EndDaHeight: 109, + Txs: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}, + } + + mock := &mockForcedInclusionRetriever{ + retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { + return expectedEvent, nil + }, + } + retriever, sr := setupForcedInclusionTrace(t, mock) + ctx := context.Background() + + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 109) + require.NoError(t, err) + require.Equal(t, expectedEvent, event) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "da.height", int64(109)) + testutil.RequireAttribute(t, attrs, "event.start_da_height", int64(100)) + testutil.RequireAttribute(t, attrs, "event.end_da_height", int64(109)) + testutil.RequireAttribute(t, attrs, "event.tx_count", 3) +} + +func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_Error(t *testing.T) { + expectedErr := errors.New("retrieval failed") + mock := &mockForcedInclusionRetriever{ + retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { + return nil, expectedErr + }, + } + retriever, sr := setupForcedInclusionTrace(t, mock) + ctx := context.Background() + + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, event) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} + +func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_NilEvent(t *testing.T) { + mock := &mockForcedInclusionRetriever{ + retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { + return nil, nil + }, + } + retriever, sr := setupForcedInclusionTrace(t, mock) + ctx := context.Background() + + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) + require.NoError(t, err) + require.Nil(t, event) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Unset, span.Status().Code) + + // only da.height should be present since event is nil + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "da.height", int64(100)) +} + +func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyTxs(t *testing.T) { + expectedEvent := &ForcedInclusionEvent{ + Timestamp: time.Now(), + StartDaHeight: 100, + EndDaHeight: 100, + Txs: [][]byte{}, + } + + mock := &mockForcedInclusionRetriever{ + retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { + return expectedEvent, nil + }, + } + retriever, sr := setupForcedInclusionTrace(t, mock) + ctx := context.Background() + + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) + require.NoError(t, err) + require.Equal(t, expectedEvent, event) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "event.tx_count", 0) +} + +func TestTracedForcedInclusionRetriever_Stop(t *testing.T) { + mock := &mockForcedInclusionRetriever{} + retriever, _ := setupForcedInclusionTrace(t, mock) + + retriever.Stop() + require.True(t, mock.stopCalled) +} diff --git a/block/internal/syncing/forced_inclusion_tracing.go b/block/internal/syncing/forced_inclusion_tracing.go new file mode 100644 index 000000000..816c82a2a --- /dev/null +++ b/block/internal/syncing/forced_inclusion_tracing.go @@ -0,0 +1,65 @@ +package syncing + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/block/internal/da" +) + +// forcedInclusionRetriever defines the interface for retrieving forced inclusion +// transactions from DA. This local interface is defined to avoid import cycles +// since block/ imports syncing/. +type forcedInclusionRetriever interface { + RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*da.ForcedInclusionEvent, error) + Stop() +} + +var _ forcedInclusionRetriever = (*tracedForcedInclusionRetriever)(nil) + +type tracedForcedInclusionRetriever struct { + inner forcedInclusionRetriever + tracer trace.Tracer +} + +// withTracingForcedInclusionRetriever wraps a forcedInclusionRetriever with OpenTelemetry tracing. +func withTracingForcedInclusionRetriever(inner forcedInclusionRetriever) forcedInclusionRetriever { + return &tracedForcedInclusionRetriever{ + inner: inner, + tracer: otel.Tracer("ev-node/forced-inclusion"), + } +} + +func (t *tracedForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*da.ForcedInclusionEvent, error) { + ctx, span := t.tracer.Start(ctx, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs", + trace.WithAttributes( + attribute.Int64("da.height", int64(daHeight)), + ), + ) + defer span.End() + + event, err := t.inner.RetrieveForcedIncludedTxs(ctx, daHeight) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return event, err + } + + if event != nil { + span.SetAttributes( + attribute.Int64("event.start_da_height", int64(event.StartDaHeight)), + attribute.Int64("event.end_da_height", int64(event.EndDaHeight)), + attribute.Int("event.tx_count", len(event.Txs)), + ) + } + + return event, nil +} + +func (t *tracedForcedInclusionRetriever) Stop() { + t.inner.Stop() +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 6365c548b..92d326315 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -104,7 +104,7 @@ type Syncer struct { // Handlers daRetriever DARetriever - fiRetriever *da.ForcedInclusionRetriever + fiRetriever forcedInclusionRetriever p2pHandler p2pHandler // Forced inclusion tracking @@ -205,6 +205,9 @@ func (s *Syncer) Start(ctx context.Context) error { s.daRetriever = WithTracingDARetriever(s.daRetriever) } s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) + if s.config.Instrumentation.IsTracingEnabled() { + s.fiRetriever = withTracingForcedInclusionRetriever(s.fiRetriever) + } s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") diff --git a/block/public.go b/block/public.go index 54bba68c7..0041982b5 100644 --- a/block/public.go +++ b/block/public.go @@ -83,5 +83,9 @@ func NewForcedInclusionRetriever( logger zerolog.Logger, daStartHeight, daEpochSize uint64, ) ForcedInclusionRetriever { - return da.NewForcedInclusionRetriever(client, logger, cfg, daStartHeight, daEpochSize) + base := da.NewForcedInclusionRetriever(client, logger, cfg, daStartHeight, daEpochSize) + if cfg.Instrumentation.IsTracingEnabled() { + return WithTracingForcedInclusionRetriever(base) + } + return base } From 8298a39df83a5c55b6a76c758025e277d578e6e9 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 14:19:28 +0000 Subject: [PATCH 07/11] chore: handle tracing internally --- .../internal/da/forced_inclusion_retriever.go | 8 +++++-- .../forced_inclusion_tracing.go | 21 +++++++------------ block/internal/syncing/syncer.go | 5 +---- block/public.go | 7 ++----- 4 files changed, 17 insertions(+), 24 deletions(-) rename block/internal/{syncing => da}/forced_inclusion_tracing.go (62%) diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 7b07b7d5d..48968e17f 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -40,7 +40,7 @@ func NewForcedInclusionRetriever( logger zerolog.Logger, cfg config.Config, daStartHeight, daEpochSize uint64, -) *ForcedInclusionRetriever { +) ForcedInclusionRetrieverAPI { retrieverLogger := logger.With().Str("component", "forced_inclusion_retriever").Logger() // Create async block retriever for background prefetching @@ -54,13 +54,17 @@ func NewForcedInclusionRetriever( ) asyncFetcher.Start() - return &ForcedInclusionRetriever{ + base := &ForcedInclusionRetriever{ client: client, logger: retrieverLogger, daStartHeight: daStartHeight, daEpochSize: daEpochSize, asyncFetcher: asyncFetcher, } + if cfg.Instrumentation.IsTracingEnabled() { + return withTracingForcedInclusionRetriever(base) + } + return base } // Stop stops the background prefetcher. diff --git a/block/internal/syncing/forced_inclusion_tracing.go b/block/internal/da/forced_inclusion_tracing.go similarity index 62% rename from block/internal/syncing/forced_inclusion_tracing.go rename to block/internal/da/forced_inclusion_tracing.go index 816c82a2a..fbc28a665 100644 --- a/block/internal/syncing/forced_inclusion_tracing.go +++ b/block/internal/da/forced_inclusion_tracing.go @@ -1,4 +1,4 @@ -package syncing +package da import ( "context" @@ -7,34 +7,29 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - - "github.com/evstack/ev-node/block/internal/da" ) -// forcedInclusionRetriever defines the interface for retrieving forced inclusion -// transactions from DA. This local interface is defined to avoid import cycles -// since block/ imports syncing/. -type forcedInclusionRetriever interface { - RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*da.ForcedInclusionEvent, error) +// ForcedInclusionRetrieverAPI defines the interface for retrieving forced inclusion transactions from DA. +type ForcedInclusionRetrieverAPI interface { + RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) Stop() } -var _ forcedInclusionRetriever = (*tracedForcedInclusionRetriever)(nil) +var _ ForcedInclusionRetrieverAPI = (*tracedForcedInclusionRetriever)(nil) type tracedForcedInclusionRetriever struct { - inner forcedInclusionRetriever + inner ForcedInclusionRetrieverAPI tracer trace.Tracer } -// withTracingForcedInclusionRetriever wraps a forcedInclusionRetriever with OpenTelemetry tracing. -func withTracingForcedInclusionRetriever(inner forcedInclusionRetriever) forcedInclusionRetriever { +func withTracingForcedInclusionRetriever(inner ForcedInclusionRetrieverAPI) ForcedInclusionRetrieverAPI { return &tracedForcedInclusionRetriever{ inner: inner, tracer: otel.Tracer("ev-node/forced-inclusion"), } } -func (t *tracedForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*da.ForcedInclusionEvent, error) { +func (t *tracedForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { ctx, span := t.tracer.Start(ctx, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs", trace.WithAttributes( attribute.Int64("da.height", int64(daHeight)), diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 92d326315..71f032230 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -104,7 +104,7 @@ type Syncer struct { // Handlers daRetriever DARetriever - fiRetriever forcedInclusionRetriever + fiRetriever da.ForcedInclusionRetrieverAPI p2pHandler p2pHandler // Forced inclusion tracking @@ -205,9 +205,6 @@ func (s *Syncer) Start(ctx context.Context) error { s.daRetriever = WithTracingDARetriever(s.daRetriever) } s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) - if s.config.Instrumentation.IsTracingEnabled() { - s.fiRetriever = withTracingForcedInclusionRetriever(s.fiRetriever) - } s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") diff --git a/block/public.go b/block/public.go index 0041982b5..f8586eebc 100644 --- a/block/public.go +++ b/block/public.go @@ -77,15 +77,12 @@ type ForcedInclusionRetriever interface { // NewForcedInclusionRetriever creates a new forced inclusion retriever. // It internally creates and manages an AsyncBlockRetriever for background prefetching. +// Tracing is automatically enabled when configured. func NewForcedInclusionRetriever( client DAClient, cfg config.Config, logger zerolog.Logger, daStartHeight, daEpochSize uint64, ) ForcedInclusionRetriever { - base := da.NewForcedInclusionRetriever(client, logger, cfg, daStartHeight, daEpochSize) - if cfg.Instrumentation.IsTracingEnabled() { - return WithTracingForcedInclusionRetriever(base) - } - return base + return da.NewForcedInclusionRetriever(client, logger, cfg, daStartHeight, daEpochSize) } From 0afb107a9d34988b69817ea67cd44b371ccaa056 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 14:23:48 +0000 Subject: [PATCH 08/11] chore: removed duplicate tracer --- block/forced_inclusion_tracing.go | 55 --------- block/forced_inclusion_tracing_test.go | 155 ------------------------- 2 files changed, 210 deletions(-) delete mode 100644 block/forced_inclusion_tracing.go delete mode 100644 block/forced_inclusion_tracing_test.go diff --git a/block/forced_inclusion_tracing.go b/block/forced_inclusion_tracing.go deleted file mode 100644 index 4d518d184..000000000 --- a/block/forced_inclusion_tracing.go +++ /dev/null @@ -1,55 +0,0 @@ -package block - -import ( - "context" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -var _ ForcedInclusionRetriever = (*tracedForcedInclusionRetriever)(nil) - -type tracedForcedInclusionRetriever struct { - inner ForcedInclusionRetriever - tracer trace.Tracer -} - -// WithTracingForcedInclusionRetriever wraps a ForcedInclusionRetriever with OpenTelemetry tracing. -func WithTracingForcedInclusionRetriever(inner ForcedInclusionRetriever) ForcedInclusionRetriever { - return &tracedForcedInclusionRetriever{ - inner: inner, - tracer: otel.Tracer("ev-node/forced-inclusion"), - } -} - -func (t *tracedForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - ctx, span := t.tracer.Start(ctx, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs", - trace.WithAttributes( - attribute.Int64("da.height", int64(daHeight)), - ), - ) - defer span.End() - - event, err := t.inner.RetrieveForcedIncludedTxs(ctx, daHeight) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return event, err - } - - if event != nil { - span.SetAttributes( - attribute.Int64("event.start_da_height", int64(event.StartDaHeight)), - attribute.Int64("event.end_da_height", int64(event.EndDaHeight)), - attribute.Int("event.tx_count", len(event.Txs)), - ) - } - - return event, nil -} - -func (t *tracedForcedInclusionRetriever) Stop() { - t.inner.Stop() -} diff --git a/block/forced_inclusion_tracing_test.go b/block/forced_inclusion_tracing_test.go deleted file mode 100644 index 259a6a347..000000000 --- a/block/forced_inclusion_tracing_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package block - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/codes" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - - "github.com/evstack/ev-node/pkg/telemetry/testutil" -) - -type mockForcedInclusionRetriever struct { - retrieveFn func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) - stopCalled bool -} - -func (m *mockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - if m.retrieveFn != nil { - return m.retrieveFn(ctx, daHeight) - } - return nil, nil -} - -func (m *mockForcedInclusionRetriever) Stop() { - m.stopCalled = true -} - -func setupForcedInclusionTrace(t *testing.T, inner ForcedInclusionRetriever) (ForcedInclusionRetriever, *tracetest.SpanRecorder) { - t.Helper() - sr := tracetest.NewSpanRecorder() - tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) - t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) - otel.SetTracerProvider(tp) - return WithTracingForcedInclusionRetriever(inner), sr -} - -func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_Success(t *testing.T) { - expectedEvent := &ForcedInclusionEvent{ - Timestamp: time.Now(), - StartDaHeight: 100, - EndDaHeight: 109, - Txs: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}, - } - - mock := &mockForcedInclusionRetriever{ - retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - return expectedEvent, nil - }, - } - retriever, sr := setupForcedInclusionTrace(t, mock) - ctx := context.Background() - - event, err := retriever.RetrieveForcedIncludedTxs(ctx, 109) - require.NoError(t, err) - require.Equal(t, expectedEvent, event) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, "ForcedInclusionRetriever.RetrieveForcedIncludedTxs", span.Name()) - require.Equal(t, codes.Unset, span.Status().Code) - - attrs := span.Attributes() - testutil.RequireAttribute(t, attrs, "da.height", int64(109)) - testutil.RequireAttribute(t, attrs, "event.start_da_height", int64(100)) - testutil.RequireAttribute(t, attrs, "event.end_da_height", int64(109)) - testutil.RequireAttribute(t, attrs, "event.tx_count", 3) -} - -func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_Error(t *testing.T) { - expectedErr := errors.New("retrieval failed") - mock := &mockForcedInclusionRetriever{ - retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - return nil, expectedErr - }, - } - retriever, sr := setupForcedInclusionTrace(t, mock) - ctx := context.Background() - - event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, event) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, codes.Error, span.Status().Code) - require.Equal(t, expectedErr.Error(), span.Status().Description) -} - -func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_NilEvent(t *testing.T) { - mock := &mockForcedInclusionRetriever{ - retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - return nil, nil - }, - } - retriever, sr := setupForcedInclusionTrace(t, mock) - ctx := context.Background() - - event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) - require.NoError(t, err) - require.Nil(t, event) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, codes.Unset, span.Status().Code) - - // only da.height should be present since event is nil - attrs := span.Attributes() - testutil.RequireAttribute(t, attrs, "da.height", int64(100)) -} - -func TestTracedForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyTxs(t *testing.T) { - expectedEvent := &ForcedInclusionEvent{ - Timestamp: time.Now(), - StartDaHeight: 100, - EndDaHeight: 100, - Txs: [][]byte{}, - } - - mock := &mockForcedInclusionRetriever{ - retrieveFn: func(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - return expectedEvent, nil - }, - } - retriever, sr := setupForcedInclusionTrace(t, mock) - ctx := context.Background() - - event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) - require.NoError(t, err) - require.Equal(t, expectedEvent, event) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - - attrs := span.Attributes() - testutil.RequireAttribute(t, attrs, "event.tx_count", 0) -} - -func TestTracedForcedInclusionRetriever_Stop(t *testing.T) { - mock := &mockForcedInclusionRetriever{} - retriever, _ := setupForcedInclusionTrace(t, mock) - - retriever.Stop() - require.True(t, mock.stopCalled) -} From f79103894f1fa697c06a0ce84ef33fbd7deddce3 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 14:28:59 +0000 Subject: [PATCH 09/11] chore: simplified naming --- .../internal/da/forced_inclusion_retriever.go | 18 ++++++++++++------ block/internal/da/forced_inclusion_tracing.go | 12 +++--------- block/internal/syncing/syncer.go | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 48968e17f..9b0ad5529 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -16,8 +16,14 @@ import ( // ErrForceInclusionNotConfigured is returned when the forced inclusion namespace is not configured. var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not configured") -// ForcedInclusionRetriever handles retrieval of forced inclusion transactions from DA. -type ForcedInclusionRetriever struct { +// ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA. +type ForcedInclusionRetriever interface { + RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) + Stop() +} + +// forcedInclusionRetriever handles retrieval of forced inclusion transactions from DA. +type forcedInclusionRetriever struct { client Client logger zerolog.Logger daEpochSize uint64 @@ -40,7 +46,7 @@ func NewForcedInclusionRetriever( logger zerolog.Logger, cfg config.Config, daStartHeight, daEpochSize uint64, -) ForcedInclusionRetrieverAPI { +) ForcedInclusionRetriever { retrieverLogger := logger.With().Str("component", "forced_inclusion_retriever").Logger() // Create async block retriever for background prefetching @@ -54,7 +60,7 @@ func NewForcedInclusionRetriever( ) asyncFetcher.Start() - base := &ForcedInclusionRetriever{ + base := &forcedInclusionRetriever{ client: client, logger: retrieverLogger, daStartHeight: daStartHeight, @@ -68,14 +74,14 @@ func NewForcedInclusionRetriever( } // Stop stops the background prefetcher. -func (r *ForcedInclusionRetriever) Stop() { +func (r *forcedInclusionRetriever) Stop() { r.asyncFetcher.Stop() } // RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height. // It respects epoch boundaries and only fetches at epoch end. // It tries to get blocks from the async fetcher cache first, then falls back to sync fetching. -func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { +func (r *forcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { // when daStartHeight is not set or no namespace is configured, we retrieve nothing. if !r.client.HasForcedInclusionNamespace() { return nil, ErrForceInclusionNotConfigured diff --git a/block/internal/da/forced_inclusion_tracing.go b/block/internal/da/forced_inclusion_tracing.go index fbc28a665..7e777161f 100644 --- a/block/internal/da/forced_inclusion_tracing.go +++ b/block/internal/da/forced_inclusion_tracing.go @@ -9,20 +9,14 @@ import ( "go.opentelemetry.io/otel/trace" ) -// ForcedInclusionRetrieverAPI defines the interface for retrieving forced inclusion transactions from DA. -type ForcedInclusionRetrieverAPI interface { - RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) - Stop() -} - -var _ ForcedInclusionRetrieverAPI = (*tracedForcedInclusionRetriever)(nil) +var _ ForcedInclusionRetriever = (*tracedForcedInclusionRetriever)(nil) type tracedForcedInclusionRetriever struct { - inner ForcedInclusionRetrieverAPI + inner ForcedInclusionRetriever tracer trace.Tracer } -func withTracingForcedInclusionRetriever(inner ForcedInclusionRetrieverAPI) ForcedInclusionRetrieverAPI { +func withTracingForcedInclusionRetriever(inner ForcedInclusionRetriever) ForcedInclusionRetriever { return &tracedForcedInclusionRetriever{ inner: inner, tracer: otel.Tracer("ev-node/forced-inclusion"), diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 71f032230..ede86c9a2 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -104,7 +104,7 @@ type Syncer struct { // Handlers daRetriever DARetriever - fiRetriever da.ForcedInclusionRetrieverAPI + fiRetriever da.ForcedInclusionRetriever p2pHandler p2pHandler // Forced inclusion tracking From a289ef3579ee13622216702448474ecceaece74a Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 14:49:15 +0000 Subject: [PATCH 10/11] chore: add store tracing --- node/full.go | 3 + pkg/store/tracing.go | 337 +++++++++++++++++++++++++++++++++++ pkg/store/tracing_test.go | 361 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 701 insertions(+) create mode 100644 pkg/store/tracing.go create mode 100644 pkg/store/tracing_test.go diff --git a/node/full.go b/node/full.go index c6086bb5d..874909c45 100644 --- a/node/full.go +++ b/node/full.go @@ -81,6 +81,9 @@ func newFullNode( mainKV := store.NewEvNodeKVStore(database) evstore := store.New(mainKV) + if nodeConfig.Instrumentation.IsTracingEnabled() { + evstore = store.WithTracingStore(evstore) + } headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger) if err != nil { diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go new file mode 100644 index 000000000..201832a54 --- /dev/null +++ b/pkg/store/tracing.go @@ -0,0 +1,337 @@ +package store + +import ( + "context" + + ds "github.com/ipfs/go-datastore" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/types" +) + +var _ Store = (*tracedStore)(nil) + +type tracedStore struct { + inner Store + tracer trace.Tracer +} + +// WithTracingStore wraps a Store with OpenTelemetry tracing. +func WithTracingStore(inner Store) Store { + return &tracedStore{ + inner: inner, + tracer: otel.Tracer("ev-node/store"), + } +} + +func (t *tracedStore) Height(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "Store.Height") + defer span.End() + + height, err := t.inner.Height(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return height, err + } + + span.SetAttributes(attribute.Int64("height", int64(height))) + return height, nil +} + +func (t *tracedStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetBlockData", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + header, data, err := t.inner.GetBlockData(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return header, data, err + } + + return header, data, nil +} + +func (t *tracedStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetBlockByHash", + trace.WithAttributes(attribute.String("hash", string(hash))), + ) + defer span.End() + + header, data, err := t.inner.GetBlockByHash(ctx, hash) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return header, data, err + } + + if header != nil { + span.SetAttributes(attribute.Int64("height", int64(header.Height()))) + } + return header, data, nil +} + +func (t *tracedStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetSignature", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + sig, err := t.inner.GetSignature(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return sig, err + } + + return sig, nil +} + +func (t *tracedStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetSignatureByHash", + trace.WithAttributes(attribute.String("hash", string(hash))), + ) + defer span.End() + + sig, err := t.inner.GetSignatureByHash(ctx, hash) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return sig, err + } + + return sig, nil +} + +func (t *tracedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetHeader", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + header, err := t.inner.GetHeader(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return header, err + } + + return header, nil +} + +func (t *tracedStore) GetState(ctx context.Context) (types.State, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetState") + defer span.End() + + state, err := t.inner.GetState(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return state, err + } + + span.SetAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))) + return state, nil +} + +func (t *tracedStore) GetStateAtHeight(ctx context.Context, height uint64) (types.State, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetStateAtHeight", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + state, err := t.inner.GetStateAtHeight(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return state, err + } + + return state, nil +} + +func (t *tracedStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetMetadata", + trace.WithAttributes(attribute.String("key", key)), + ) + defer span.End() + + data, err := t.inner.GetMetadata(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return data, err + } + + span.SetAttributes(attribute.Int("value.size", len(data))) + return data, nil +} + +func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) error { + ctx, span := t.tracer.Start(ctx, "Store.SetMetadata", + trace.WithAttributes( + attribute.String("key", key), + attribute.Int("value.size", len(value)), + ), + ) + defer span.End() + + err := t.inner.SetMetadata(ctx, key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { + ctx, span := t.tracer.Start(ctx, "Store.Rollback", + trace.WithAttributes( + attribute.Int64("height", int64(height)), + attribute.Bool("aggregator", aggregator), + ), + ) + defer span.End() + + err := t.inner.Rollback(ctx, height, aggregator) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedStore) Close() error { + return t.inner.Close() +} + +func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) { + ctx, span := t.tracer.Start(ctx, "Store.NewBatch") + defer span.End() + + batch, err := t.inner.NewBatch(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + return &tracedBatch{ + inner: batch, + tracer: t.tracer, + }, nil +} + +var _ Batch = (*tracedBatch)(nil) + +type tracedBatch struct { + inner Batch + tracer trace.Tracer +} + +func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + _, span := b.tracer.Start(context.Background(), "Batch.SaveBlockData", + trace.WithAttributes(attribute.Int64("height", int64(header.Height()))), + ) + defer span.End() + + err := b.inner.SaveBlockData(header, data, signature) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) SetHeight(height uint64) error { + _, span := b.tracer.Start(context.Background(), "Batch.SetHeight", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + err := b.inner.SetHeight(height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) UpdateState(state types.State) error { + _, span := b.tracer.Start(context.Background(), "Batch.UpdateState", + trace.WithAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))), + ) + defer span.End() + + err := b.inner.UpdateState(state) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) Commit() error { + _, span := b.tracer.Start(context.Background(), "Batch.Commit") + defer span.End() + + err := b.inner.Commit() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) Put(key ds.Key, value []byte) error { + _, span := b.tracer.Start(context.Background(), "Batch.Put", + trace.WithAttributes( + attribute.String("key", key.String()), + attribute.Int("value.size", len(value)), + ), + ) + defer span.End() + + err := b.inner.Put(key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) Delete(key ds.Key) error { + _, span := b.tracer.Start(context.Background(), "Batch.Delete", + trace.WithAttributes(attribute.String("key", key.String())), + ) + defer span.End() + + err := b.inner.Delete(key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go new file mode 100644 index 000000000..a5dca2417 --- /dev/null +++ b/pkg/store/tracing_test.go @@ -0,0 +1,361 @@ +package store + +import ( + "context" + "errors" + "testing" + + ds "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type tracingMockStore struct { + heightFn func(ctx context.Context) (uint64, error) + getBlockDataFn func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) + getBlockByHashFn func(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) + getSignatureFn func(ctx context.Context, height uint64) (*types.Signature, error) + getSignatureByHash func(ctx context.Context, hash []byte) (*types.Signature, error) + getHeaderFn func(ctx context.Context, height uint64) (*types.SignedHeader, error) + getStateFn func(ctx context.Context) (types.State, error) + getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) + getMetadataFn func(ctx context.Context, key string) ([]byte, error) + setMetadataFn func(ctx context.Context, key string, value []byte) error + rollbackFn func(ctx context.Context, height uint64, aggregator bool) error + newBatchFn func(ctx context.Context) (Batch, error) +} + +func (m *tracingMockStore) Height(ctx context.Context) (uint64, error) { + if m.heightFn != nil { + return m.heightFn(ctx) + } + return 0, nil +} + +func (m *tracingMockStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + if m.getBlockDataFn != nil { + return m.getBlockDataFn(ctx, height) + } + return nil, nil, nil +} + +func (m *tracingMockStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { + if m.getBlockByHashFn != nil { + return m.getBlockByHashFn(ctx, hash) + } + return nil, nil, nil +} + +func (m *tracingMockStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { + if m.getSignatureFn != nil { + return m.getSignatureFn(ctx, height) + } + return nil, nil +} + +func (m *tracingMockStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { + if m.getSignatureByHash != nil { + return m.getSignatureByHash(ctx, hash) + } + return nil, nil +} + +func (m *tracingMockStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) { + if m.getHeaderFn != nil { + return m.getHeaderFn(ctx, height) + } + return nil, nil +} + +func (m *tracingMockStore) GetState(ctx context.Context) (types.State, error) { + if m.getStateFn != nil { + return m.getStateFn(ctx) + } + return types.State{}, nil +} + +func (m *tracingMockStore) GetStateAtHeight(ctx context.Context, height uint64) (types.State, error) { + if m.getStateAtHeightFn != nil { + return m.getStateAtHeightFn(ctx, height) + } + return types.State{}, nil +} + +func (m *tracingMockStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { + if m.getMetadataFn != nil { + return m.getMetadataFn(ctx, key) + } + return nil, nil +} + +func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value []byte) error { + if m.setMetadataFn != nil { + return m.setMetadataFn(ctx, key, value) + } + return nil +} + +func (m *tracingMockStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { + if m.rollbackFn != nil { + return m.rollbackFn(ctx, height, aggregator) + } + return nil +} + +func (m *tracingMockStore) Close() error { + return nil +} + +func (m *tracingMockStore) NewBatch(ctx context.Context) (Batch, error) { + if m.newBatchFn != nil { + return m.newBatchFn(ctx) + } + return &tracingMockBatch{}, nil +} + +type tracingMockBatch struct { + saveBlockDataFn func(header *types.SignedHeader, data *types.Data, signature *types.Signature) error + setHeightFn func(height uint64) error + updateStateFn func(state types.State) error + commitFn func() error + putFn func(key ds.Key, value []byte) error + deleteFn func(key ds.Key) error +} + +func (b *tracingMockBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + if b.saveBlockDataFn != nil { + return b.saveBlockDataFn(header, data, signature) + } + return nil +} + +func (b *tracingMockBatch) SetHeight(height uint64) error { + if b.setHeightFn != nil { + return b.setHeightFn(height) + } + return nil +} + +func (b *tracingMockBatch) UpdateState(state types.State) error { + if b.updateStateFn != nil { + return b.updateStateFn(state) + } + return nil +} + +func (b *tracingMockBatch) Commit() error { + if b.commitFn != nil { + return b.commitFn() + } + return nil +} + +func (b *tracingMockBatch) Put(key ds.Key, value []byte) error { + if b.putFn != nil { + return b.putFn(key, value) + } + return nil +} + +func (b *tracingMockBatch) Delete(key ds.Key) error { + if b.deleteFn != nil { + return b.deleteFn(key) + } + return nil +} + +func setupStoreTrace(t *testing.T, inner Store) (Store, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingStore(inner), sr +} + +func TestTracedStore_Height_Success(t *testing.T) { + mock := &tracingMockStore{ + heightFn: func(ctx context.Context) (uint64, error) { + return 100, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + height, err := store.Height(ctx) + require.NoError(t, err) + require.Equal(t, uint64(100), height) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.Height", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(100)) +} + +func TestTracedStore_Height_Error(t *testing.T) { + expectedErr := errors.New("height error") + mock := &tracingMockStore{ + heightFn: func(ctx context.Context) (uint64, error) { + return 0, expectedErr + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + _, err := store.Height(ctx) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedStore_GetBlockData_Success(t *testing.T) { + expectedHeader := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 50}}} + expectedData := &types.Data{} + mock := &tracingMockStore{ + getBlockDataFn: func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + return expectedHeader, expectedData, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + header, data, err := store.GetBlockData(ctx, 50) + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + require.Equal(t, expectedData, data) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.GetBlockData", span.Name()) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(50)) +} + +func TestTracedStore_GetState_Success(t *testing.T) { + expectedState := types.State{LastBlockHeight: 200} + mock := &tracingMockStore{ + getStateFn: func(ctx context.Context) (types.State, error) { + return expectedState, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + state, err := store.GetState(ctx) + require.NoError(t, err) + require.Equal(t, expectedState, state) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.GetState", span.Name()) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "state.height", int64(200)) +} + +func TestTracedStore_Rollback_Success(t *testing.T) { + mock := &tracingMockStore{ + rollbackFn: func(ctx context.Context, height uint64, aggregator bool) error { + return nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + err := store.Rollback(ctx, 50, true) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.Rollback", span.Name()) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(50)) + testutil.RequireAttribute(t, attrs, "aggregator", true) +} + +func TestTracedStore_NewBatch_Success(t *testing.T) { + mock := &tracingMockStore{ + newBatchFn: func(ctx context.Context) (Batch, error) { + return &tracingMockBatch{}, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + require.NotNil(t, batch) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.NewBatch", span.Name()) +} + +func TestTracedBatch_Commit_Success(t *testing.T) { + mock := &tracingMockStore{ + newBatchFn: func(ctx context.Context) (Batch, error) { + return &tracingMockBatch{}, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + + err = batch.Commit() + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 2) + require.Equal(t, "Store.NewBatch", spans[0].Name()) + require.Equal(t, "Batch.Commit", spans[1].Name()) +} + +func TestTracedBatch_SaveBlockData_Success(t *testing.T) { + mock := &tracingMockStore{ + newBatchFn: func(ctx context.Context) (Batch, error) { + return &tracingMockBatch{}, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + + header := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}} + data := &types.Data{} + sig := &types.Signature{} + + err = batch.SaveBlockData(header, data, sig) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 2) + require.Equal(t, "Store.NewBatch", spans[0].Name()) + require.Equal(t, "Batch.SaveBlockData", spans[1].Name()) + + attrs := spans[1].Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(100)) +} From 0f5a541b29030b99abb151ba81282bc6f01e69e3 Mon Sep 17 00:00:00 2001 From: chatton Date: Tue, 20 Jan 2026 10:42:53 +0000 Subject: [PATCH 11/11] chore: add http tracing propagation --- pkg/rpc/server/server.go | 8 ++- pkg/telemetry/http_extract.go | 20 +++++++ pkg/telemetry/http_extract_test.go | 86 ++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 pkg/telemetry/http_extract.go create mode 100644 pkg/telemetry/http_extract_test.go diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index fb07058c9..661edec35 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -24,6 +24,7 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/types" pb "github.com/evstack/ev-node/types/pb/evnode/v1" rpc "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" @@ -413,8 +414,13 @@ func NewServiceHandler( // Register custom HTTP endpoints RegisterCustomHTTPEndpoints(mux, store, peerManager, config, bestKnown, logger) + var handler http.Handler = mux + if config.Instrumentation.IsTracingEnabled() { + handler = telemetry.ExtractTraceContext(mux) + } + // Use h2c to support HTTP/2 without TLS - return h2c.NewHandler(mux, &http2.Server{ + return h2c.NewHandler(handler, &http2.Server{ IdleTimeout: 120 * time.Second, MaxReadFrameSize: 1 << 24, MaxConcurrentStreams: 100, diff --git a/pkg/telemetry/http_extract.go b/pkg/telemetry/http_extract.go new file mode 100644 index 000000000..0cbe77077 --- /dev/null +++ b/pkg/telemetry/http_extract.go @@ -0,0 +1,20 @@ +package telemetry + +import ( + "net/http" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" +) + +// ExtractTraceContext returns HTTP middleware that extracts W3C Trace Context +// headers (traceparent, tracestate) from incoming requests and adds them to +// the request context. This enables spans created downstream to be children +// of the caller's trace. +func ExtractTraceContext(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + prop := otel.GetTextMapPropagator() + ctx := prop.Extract(r.Context(), propagation.HeaderCarrier(r.Header)) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} diff --git a/pkg/telemetry/http_extract_test.go b/pkg/telemetry/http_extract_test.go new file mode 100644 index 000000000..41f035ddc --- /dev/null +++ b/pkg/telemetry/http_extract_test.go @@ -0,0 +1,86 @@ +package telemetry + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" +) + +func TestExtractTraceContext_WithParentTrace(t *testing.T) { + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + tracer := tp.Tracer("test") + + // create a parent span to get a valid trace context + parentCtx, parentSpan := tracer.Start(context.Background(), "parent") + parentSpanCtx := parentSpan.SpanContext() + parentSpan.End() + + // inject the parent context into headers + headers := make(http.Header) + otel.GetTextMapPropagator().Inject(parentCtx, propagation.HeaderCarrier(headers)) + + var extractedSpanCtx trace.SpanContext + handler := ExtractTraceContext(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // create a child span using the extracted context + _, childSpan := tracer.Start(r.Context(), "child") + extractedSpanCtx = childSpan.SpanContext() + childSpan.End() + w.WriteHeader(http.StatusOK) + })) + + req := httptest.NewRequest(http.MethodGet, "/test", nil) + req.Header = headers + rec := httptest.NewRecorder() + + handler.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.True(t, extractedSpanCtx.IsValid(), "child span should be valid") + require.Equal(t, parentSpanCtx.TraceID(), extractedSpanCtx.TraceID(), "child should have same trace ID as parent") +} + +func TestExtractTraceContext_WithoutParentTrace(t *testing.T) { + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + tracer := tp.Tracer("test") + + var extractedSpanCtx trace.SpanContext + handler := ExtractTraceContext(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // create a span - should be a root span since no parent headers + _, span := tracer.Start(r.Context(), "root") + extractedSpanCtx = span.SpanContext() + span.End() + w.WriteHeader(http.StatusOK) + })) + + req := httptest.NewRequest(http.MethodGet, "/test", nil) + // no trace headers + rec := httptest.NewRecorder() + + handler.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.True(t, extractedSpanCtx.IsValid(), "span should be valid") + + // verify it's a root span by checking no parent exists in the recorded spans + spans := sr.Ended() + require.Len(t, spans, 1) + require.False(t, spans[0].Parent().IsValid(), "span should be a root span with no parent") +}