diff --git a/node/full.go b/node/full.go index c6086bb5d..874909c45 100644 --- a/node/full.go +++ b/node/full.go @@ -81,6 +81,9 @@ func newFullNode( mainKV := store.NewEvNodeKVStore(database) evstore := store.New(mainKV) + if nodeConfig.Instrumentation.IsTracingEnabled() { + evstore = store.WithTracingStore(evstore) + } headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger) if err != nil { diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go new file mode 100644 index 000000000..a63ffb782 --- /dev/null +++ b/pkg/store/tracing.go @@ -0,0 +1,340 @@ +package store + +import ( + "context" + "encoding/hex" + + ds "github.com/ipfs/go-datastore" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/types" +) + +var _ Store = (*tracedStore)(nil) + +type tracedStore struct { + inner Store + tracer trace.Tracer +} + +// WithTracingStore wraps a Store with OpenTelemetry tracing. +func WithTracingStore(inner Store) Store { + return &tracedStore{ + inner: inner, + tracer: otel.Tracer("ev-node/store"), + } +} + +func (t *tracedStore) Height(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "Store.Height") + defer span.End() + + height, err := t.inner.Height(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return height, err + } + + span.SetAttributes(attribute.Int64("height", int64(height))) + return height, nil +} + +func (t *tracedStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetBlockData", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + header, data, err := t.inner.GetBlockData(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return header, data, err + } + + return header, data, nil +} + +func (t *tracedStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetBlockByHash", + trace.WithAttributes(attribute.String("hash", hex.EncodeToString(hash))), + ) + defer span.End() + + header, data, err := t.inner.GetBlockByHash(ctx, hash) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return header, data, err + } + + if header != nil { + span.SetAttributes(attribute.Int64("height", int64(header.Height()))) + } + return header, data, nil +} + +func (t *tracedStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetSignature", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + sig, err := t.inner.GetSignature(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return sig, err + } + + return sig, nil +} + +func (t *tracedStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetSignatureByHash", + trace.WithAttributes(attribute.String("hash", hex.EncodeToString(hash))), + ) + defer span.End() + + sig, err := t.inner.GetSignatureByHash(ctx, hash) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return sig, err + } + + return sig, nil +} + +func (t *tracedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetHeader", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + header, err := t.inner.GetHeader(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return header, err + } + + return header, nil +} + +func (t *tracedStore) GetState(ctx context.Context) (types.State, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetState") + defer span.End() + + state, err := t.inner.GetState(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return state, err + } + + span.SetAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))) + return state, nil +} + +func (t *tracedStore) GetStateAtHeight(ctx context.Context, height uint64) (types.State, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetStateAtHeight", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + state, err := t.inner.GetStateAtHeight(ctx, height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return state, err + } + + return state, nil +} + +func (t *tracedStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetMetadata", + trace.WithAttributes(attribute.String("key", key)), + ) + defer span.End() + + data, err := t.inner.GetMetadata(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return data, err + } + + span.SetAttributes(attribute.Int("value.size", len(data))) + return data, nil +} + +func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) error { + ctx, span := t.tracer.Start(ctx, "Store.SetMetadata", + trace.WithAttributes( + attribute.String("key", key), + attribute.Int("value.size", len(value)), + ), + ) + defer span.End() + + err := t.inner.SetMetadata(ctx, key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { + ctx, span := t.tracer.Start(ctx, "Store.Rollback", + trace.WithAttributes( + attribute.Int64("height", int64(height)), + attribute.Bool("aggregator", aggregator), + ), + ) + defer span.End() + + err := t.inner.Rollback(ctx, height, aggregator) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedStore) Close() error { + return t.inner.Close() +} + +func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) { + ctx, span := t.tracer.Start(ctx, "Store.NewBatch") + defer span.End() + + batch, err := t.inner.NewBatch(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + return &tracedBatch{ + inner: batch, + tracer: t.tracer, + ctx: ctx, + }, nil +} + +var _ Batch = (*tracedBatch)(nil) + +type tracedBatch struct { + inner Batch + tracer trace.Tracer + ctx context.Context +} + +func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + _, span := b.tracer.Start(b.ctx, "Batch.SaveBlockData", + trace.WithAttributes(attribute.Int64("height", int64(header.Height()))), + ) + defer span.End() + + err := b.inner.SaveBlockData(header, data, signature) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) SetHeight(height uint64) error { + _, span := b.tracer.Start(b.ctx, "Batch.SetHeight", + trace.WithAttributes(attribute.Int64("height", int64(height))), + ) + defer span.End() + + err := b.inner.SetHeight(height) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) UpdateState(state types.State) error { + _, span := b.tracer.Start(b.ctx, "Batch.UpdateState", + trace.WithAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))), + ) + defer span.End() + + err := b.inner.UpdateState(state) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) Commit() error { + _, span := b.tracer.Start(b.ctx, "Batch.Commit") + defer span.End() + + err := b.inner.Commit() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) Put(key ds.Key, value []byte) error { + _, span := b.tracer.Start(b.ctx, "Batch.Put", + trace.WithAttributes( + attribute.String("key", key.String()), + attribute.Int("value.size", len(value)), + ), + ) + defer span.End() + + err := b.inner.Put(key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (b *tracedBatch) Delete(key ds.Key) error { + _, span := b.tracer.Start(b.ctx, "Batch.Delete", + trace.WithAttributes(attribute.String("key", key.String())), + ) + defer span.End() + + err := b.inner.Delete(key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go new file mode 100644 index 000000000..a5dca2417 --- /dev/null +++ b/pkg/store/tracing_test.go @@ -0,0 +1,361 @@ +package store + +import ( + "context" + "errors" + "testing" + + ds "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type tracingMockStore struct { + heightFn func(ctx context.Context) (uint64, error) + getBlockDataFn func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) + getBlockByHashFn func(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) + getSignatureFn func(ctx context.Context, height uint64) (*types.Signature, error) + getSignatureByHash func(ctx context.Context, hash []byte) (*types.Signature, error) + getHeaderFn func(ctx context.Context, height uint64) (*types.SignedHeader, error) + getStateFn func(ctx context.Context) (types.State, error) + getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) + getMetadataFn func(ctx context.Context, key string) ([]byte, error) + setMetadataFn func(ctx context.Context, key string, value []byte) error + rollbackFn func(ctx context.Context, height uint64, aggregator bool) error + newBatchFn func(ctx context.Context) (Batch, error) +} + +func (m *tracingMockStore) Height(ctx context.Context) (uint64, error) { + if m.heightFn != nil { + return m.heightFn(ctx) + } + return 0, nil +} + +func (m *tracingMockStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + if m.getBlockDataFn != nil { + return m.getBlockDataFn(ctx, height) + } + return nil, nil, nil +} + +func (m *tracingMockStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { + if m.getBlockByHashFn != nil { + return m.getBlockByHashFn(ctx, hash) + } + return nil, nil, nil +} + +func (m *tracingMockStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { + if m.getSignatureFn != nil { + return m.getSignatureFn(ctx, height) + } + return nil, nil +} + +func (m *tracingMockStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { + if m.getSignatureByHash != nil { + return m.getSignatureByHash(ctx, hash) + } + return nil, nil +} + +func (m *tracingMockStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) { + if m.getHeaderFn != nil { + return m.getHeaderFn(ctx, height) + } + return nil, nil +} + +func (m *tracingMockStore) GetState(ctx context.Context) (types.State, error) { + if m.getStateFn != nil { + return m.getStateFn(ctx) + } + return types.State{}, nil +} + +func (m *tracingMockStore) GetStateAtHeight(ctx context.Context, height uint64) (types.State, error) { + if m.getStateAtHeightFn != nil { + return m.getStateAtHeightFn(ctx, height) + } + return types.State{}, nil +} + +func (m *tracingMockStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { + if m.getMetadataFn != nil { + return m.getMetadataFn(ctx, key) + } + return nil, nil +} + +func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value []byte) error { + if m.setMetadataFn != nil { + return m.setMetadataFn(ctx, key, value) + } + return nil +} + +func (m *tracingMockStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { + if m.rollbackFn != nil { + return m.rollbackFn(ctx, height, aggregator) + } + return nil +} + +func (m *tracingMockStore) Close() error { + return nil +} + +func (m *tracingMockStore) NewBatch(ctx context.Context) (Batch, error) { + if m.newBatchFn != nil { + return m.newBatchFn(ctx) + } + return &tracingMockBatch{}, nil +} + +type tracingMockBatch struct { + saveBlockDataFn func(header *types.SignedHeader, data *types.Data, signature *types.Signature) error + setHeightFn func(height uint64) error + updateStateFn func(state types.State) error + commitFn func() error + putFn func(key ds.Key, value []byte) error + deleteFn func(key ds.Key) error +} + +func (b *tracingMockBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + if b.saveBlockDataFn != nil { + return b.saveBlockDataFn(header, data, signature) + } + return nil +} + +func (b *tracingMockBatch) SetHeight(height uint64) error { + if b.setHeightFn != nil { + return b.setHeightFn(height) + } + return nil +} + +func (b *tracingMockBatch) UpdateState(state types.State) error { + if b.updateStateFn != nil { + return b.updateStateFn(state) + } + return nil +} + +func (b *tracingMockBatch) Commit() error { + if b.commitFn != nil { + return b.commitFn() + } + return nil +} + +func (b *tracingMockBatch) Put(key ds.Key, value []byte) error { + if b.putFn != nil { + return b.putFn(key, value) + } + return nil +} + +func (b *tracingMockBatch) Delete(key ds.Key) error { + if b.deleteFn != nil { + return b.deleteFn(key) + } + return nil +} + +func setupStoreTrace(t *testing.T, inner Store) (Store, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingStore(inner), sr +} + +func TestTracedStore_Height_Success(t *testing.T) { + mock := &tracingMockStore{ + heightFn: func(ctx context.Context) (uint64, error) { + return 100, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + height, err := store.Height(ctx) + require.NoError(t, err) + require.Equal(t, uint64(100), height) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.Height", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(100)) +} + +func TestTracedStore_Height_Error(t *testing.T) { + expectedErr := errors.New("height error") + mock := &tracingMockStore{ + heightFn: func(ctx context.Context) (uint64, error) { + return 0, expectedErr + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + _, err := store.Height(ctx) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedStore_GetBlockData_Success(t *testing.T) { + expectedHeader := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 50}}} + expectedData := &types.Data{} + mock := &tracingMockStore{ + getBlockDataFn: func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + return expectedHeader, expectedData, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + header, data, err := store.GetBlockData(ctx, 50) + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + require.Equal(t, expectedData, data) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.GetBlockData", span.Name()) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(50)) +} + +func TestTracedStore_GetState_Success(t *testing.T) { + expectedState := types.State{LastBlockHeight: 200} + mock := &tracingMockStore{ + getStateFn: func(ctx context.Context) (types.State, error) { + return expectedState, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + state, err := store.GetState(ctx) + require.NoError(t, err) + require.Equal(t, expectedState, state) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.GetState", span.Name()) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "state.height", int64(200)) +} + +func TestTracedStore_Rollback_Success(t *testing.T) { + mock := &tracingMockStore{ + rollbackFn: func(ctx context.Context, height uint64, aggregator bool) error { + return nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + err := store.Rollback(ctx, 50, true) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.Rollback", span.Name()) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(50)) + testutil.RequireAttribute(t, attrs, "aggregator", true) +} + +func TestTracedStore_NewBatch_Success(t *testing.T) { + mock := &tracingMockStore{ + newBatchFn: func(ctx context.Context) (Batch, error) { + return &tracingMockBatch{}, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + require.NotNil(t, batch) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Store.NewBatch", span.Name()) +} + +func TestTracedBatch_Commit_Success(t *testing.T) { + mock := &tracingMockStore{ + newBatchFn: func(ctx context.Context) (Batch, error) { + return &tracingMockBatch{}, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + + err = batch.Commit() + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 2) + require.Equal(t, "Store.NewBatch", spans[0].Name()) + require.Equal(t, "Batch.Commit", spans[1].Name()) +} + +func TestTracedBatch_SaveBlockData_Success(t *testing.T) { + mock := &tracingMockStore{ + newBatchFn: func(ctx context.Context) (Batch, error) { + return &tracingMockBatch{}, nil + }, + } + store, sr := setupStoreTrace(t, mock) + ctx := context.Background() + + batch, err := store.NewBatch(ctx) + require.NoError(t, err) + + header := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}} + data := &types.Data{} + sig := &types.Signature{} + + err = batch.SaveBlockData(header, data, sig) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 2) + require.Equal(t, "Store.NewBatch", spans[0].Name()) + require.Equal(t, "Batch.SaveBlockData", spans[1].Name()) + + attrs := spans[1].Attributes() + testutil.RequireAttribute(t, attrs, "height", int64(100)) +}