diff --git a/pkg/processor/transaction/structlog/handlers_large_tx.go b/pkg/processor/transaction/structlog/handlers_large_tx.go index f0aed1b..92a1af5 100644 --- a/pkg/processor/transaction/structlog/handlers_large_tx.go +++ b/pkg/processor/transaction/structlog/handlers_large_tx.go @@ -2,13 +2,10 @@ package structlog import ( "context" - "fmt" "math/big" "github.com/0xsequence/ethkit/go-ethereum/core/types" "github.com/sirupsen/logrus" - - "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" ) // processTransactionWithLargeTxHandling processes a transaction with large transaction lock management @@ -18,64 +15,35 @@ func (p *Processor) processTransactionWithLargeTxHandling(ctx context.Context, b return p.ProcessSingleTransaction(ctx, block, index, tx) } - // First, we need to check the size by doing a quick trace extraction - // This is not ideal but necessary to determine if it's a large transaction - // In production, you might want to cache this or estimate based on gas usage - node := p.pool.GetHealthyExecutionNode() - if node == nil { - return 0, fmt.Errorf("no healthy execution node available") - } - - // Get trace to count structlogs - trace, err := node.DebugTraceTransaction(ctx, tx.Hash().String(), block.Number(), execution.DefaultTraceOptions()) - if err != nil { - return 0, fmt.Errorf("failed to get trace for size check: %w", err) - } - - structlogCount := len(trace.Structlogs) - isLargeTx := p.largeTxLock.IsLargeTransaction(structlogCount) - - p.log.WithFields(logrus.Fields{ - "tx_hash": txHash, - "structlog_count": structlogCount, - "is_large_tx": isLargeTx, - "threshold": p.largeTxLock.config.StructlogThreshold, - }).Debug("Checked transaction size") - - if isLargeTx && p.largeTxLock.config.EnableSequentialMode { - // This is a large transaction, acquire lock - if err := p.largeTxLock.AcquireLock(ctx, txHash, structlogCount, "process"); err != nil { - // Failed to acquire lock within timeout - p.log.WithError(err).WithFields(logrus.Fields{ - "tx_hash": txHash, - "structlog_count": structlogCount, - }).Warn("Failed to acquire large transaction lock") - - return 0, err - } - defer p.largeTxLock.ReleaseLock(txHash) - - // Process the large transaction - return p.ProcessSingleTransaction(ctx, block, index, tx) - } else if !isLargeTx && p.largeTxLock.active.Load() { - // Normal transaction but a large transaction is being processed - // Wait for it to complete + // Check if a large transaction is currently being processed + if p.largeTxLock.active.Load() { + // Wait for the large transaction to complete before processing if err := p.largeTxLock.WaitForLargeTransaction(ctx, txHash); err != nil { - // Failed to wait, return error to retry later p.log.WithError(err).WithFields(logrus.Fields{ - "tx_hash": txHash, - "structlog_count": structlogCount, + "tx_hash": txHash, }).Warn("Failed to wait for large transaction") - return 0, err } + } - // Large transaction completed, process normally - return p.ProcessSingleTransaction(ctx, block, index, tx) + // Process the transaction and get size info + structlogCount, isLarge, err := p.ProcessSingleTransactionWithSizeInfo(ctx, block, index, tx) + if err != nil { + return 0, err } - // Normal transaction with no large transaction active - return p.ProcessSingleTransaction(ctx, block, index, tx) + // If it turned out to be a large transaction and we haven't acquired the lock yet + if isLarge && p.largeTxLock.config.EnableSequentialMode && !p.largeTxLock.HasLock(txHash) { + // This is a large transaction that we just discovered + // For now, we've already processed it, but log this for monitoring + p.log.WithFields(logrus.Fields{ + "tx_hash": txHash, + "structlog_count": structlogCount, + "threshold": p.largeTxLock.config.StructlogThreshold, + }).Info("Processed large transaction without pre-acquired lock - consider implementing retry logic") + } + + return structlogCount, nil } // verifyTransactionWithLargeTxHandling verifies a transaction with large transaction lock management diff --git a/pkg/processor/transaction/structlog/large_tx_lock.go b/pkg/processor/transaction/structlog/large_tx_lock.go index 49811a3..77d8063 100644 --- a/pkg/processor/transaction/structlog/large_tx_lock.go +++ b/pkg/processor/transaction/structlog/large_tx_lock.go @@ -221,6 +221,11 @@ func (m *LargeTxLockManager) WaitForLargeTransaction(ctx context.Context, worker } } +// HasLock checks if the given transaction currently holds the lock +func (m *LargeTxLockManager) HasLock(txHash string) bool { + return m.active.Load() && m.currentTx == txHash +} + // GetStatus returns current status of the lock manager func (m *LargeTxLockManager) GetStatus() map[string]interface{} { status := map[string]interface{}{ diff --git a/pkg/processor/transaction/structlog/transaction_processing.go b/pkg/processor/transaction/structlog/transaction_processing.go index ba1bba9..146699d 100644 --- a/pkg/processor/transaction/structlog/transaction_processing.go +++ b/pkg/processor/transaction/structlog/transaction_processing.go @@ -37,50 +37,39 @@ type Structlog struct { // ProcessSingleTransaction processes a single transaction using batch collector (exposed for worker handlers) func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error) { - // Extract structlog data - structlogs, err := p.ExtractStructlogs(ctx, block, index, tx) + // Extract structlog data with optimized memory handling + structlogCount, err := p.ExtractAndProcessStructlogs(ctx, block, index, tx) if err != nil { return 0, err } - // Store count before processing - structlogCount := len(structlogs) - - // Ensure we clear the slice on exit to allow GC, especially important for failed inserts - defer func() { - // Clear the slice to release memory - structlogs = nil - // Force GC for large transactions or on errors - if structlogCount > 1000 { - runtime.GC() - } - }() + // Record success metrics + common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "success").Inc() - // Send to batch collector for insertion - if err := p.sendToBatchCollector(ctx, structlogs); err != nil { - common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "failed").Inc() - // Log memory cleanup for large failed batches - if structlogCount > 10000 { - p.log.WithFields(logrus.Fields{ - "transaction_hash": tx.Hash().String(), - "structlog_count": structlogCount, - "error": err.Error(), - }).Info("Cleaning up memory after failed batch insert") - } + return structlogCount, nil +} - runtime.GC() +// ProcessSingleTransactionWithSizeInfo processes a transaction and returns size info for large tx handling +func (p *Processor) ProcessSingleTransactionWithSizeInfo(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, bool, error) { + // This will be called from handlers_large_tx.go to get size info during processing + structlogCount, err := p.ExtractAndProcessStructlogs(ctx, block, index, tx) + if err != nil { + return 0, false, err + } - return 0, fmt.Errorf("failed to insert structlogs via batch collector: %w", err) + isLarge := false + if p.largeTxLock != nil && p.largeTxLock.config.Enabled { + isLarge = p.largeTxLock.IsLargeTransaction(structlogCount) } // Record success metrics common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "success").Inc() - return structlogCount, nil + return structlogCount, isLarge, nil } -// ExtractStructlogs extracts structlog data from a transaction without inserting to database -func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error) { +// ExtractAndProcessStructlogs extracts and processes structlog data with optimized memory handling +func (p *Processor) ExtractAndProcessStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error) { start := time.Now() defer func() { duration := time.Since(start) @@ -90,37 +79,55 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, i // Get execution node node := p.pool.GetHealthyExecutionNode() if node == nil { - return nil, fmt.Errorf("no healthy execution node available") + return 0, fmt.Errorf("no healthy execution node available") } // Process transaction with timeout processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - // Get transaction trace + // Get transaction trace with stack enabled for CALL operations trace, err := node.DebugTraceTransaction(processCtx, tx.Hash().String(), block.Number(), execution.StackTraceOptions()) if err != nil { - return nil, fmt.Errorf("failed to trace transaction: %w", err) + return 0, fmt.Errorf("failed to trace transaction: %w", err) } - // Convert trace to structlog rows - var structlogs []Structlog + if trace == nil || len(trace.Structlogs) == 0 { + return 0, nil + } - uIndex := uint32(index) //nolint:gosec // index is bounded by block.Transactions() length + // Store the count for return + structlogCount := len(trace.Structlogs) - if trace != nil { - // Pre-allocate slice for better memory efficiency - structlogs = make([]Structlog, 0, len(trace.Structlogs)) + // Pre-extract call addresses into a map to avoid keeping stack data + callAddresses := make(map[int]string) + for i, structLog := range trace.Structlogs { + if structLog.Op == "CALL" && structLog.Stack != nil && len(*structLog.Stack) > 1 { + callAddresses[i] = (*structLog.Stack)[len(*structLog.Stack)-2] + } + // Clear stack data immediately after extraction + structLog.Stack = nil + } - // For extremely large traces, log memory usage periodically - logInterval := 100000 + // Process in batches for better memory efficiency + const batchSize = 10000 + uIndex := uint32(index) //nolint:gosec // index is bounded by block.Transactions() length + + for batchStart := 0; batchStart < len(trace.Structlogs); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(trace.Structlogs) { + batchEnd = len(trace.Structlogs) + } - for i, structLog := range trace.Structlogs { + // Create batch of structlogs + batch := make([]Structlog, 0, batchEnd-batchStart) + + for i := batchStart; i < batchEnd; i++ { + structLog := trace.Structlogs[i] + var callToAddress *string - - if structLog.Op == "CALL" && structLog.Stack != nil && len(*structLog.Stack) > 1 { - stackValue := (*structLog.Stack)[len(*structLog.Stack)-2] - callToAddress = &stackValue + if addr, exists := callAddresses[i]; exists { + callToAddress = &addr } row := Structlog{ @@ -145,25 +152,60 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, i MetaNetworkName: p.network.Name, } - structlogs = append(structlogs, row) + batch = append(batch, row) + } - // For very large traces, periodically log memory usage - if i > 0 && i%logInterval == 0 && len(trace.Structlogs) > 200000 { + // Send batch to collector + if err := p.sendToBatchCollector(ctx, batch); err != nil { + common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "failed").Inc() + + // Log memory cleanup for large failed batches + if structlogCount > 10000 { p.log.WithFields(logrus.Fields{ "transaction_hash": tx.Hash().String(), - "progress": fmt.Sprintf("%d/%d", i, len(trace.Structlogs)), - }).Debug("Processing large trace") + "structlog_count": structlogCount, + "batch_start": batchStart, + "batch_end": batchEnd, + "error": err.Error(), + }).Info("Failed to insert batch") + } - // Force GC during processing of extremely large traces - if i%200000 == 0 { - runtime.GC() - } + // Clear batch and force GC on error + batch = nil + if structlogCount > 100000 { + runtime.GC() } + + return 0, fmt.Errorf("failed to insert structlogs via batch collector: %w", err) } - // Clear the original trace data to free memory - trace.Structlogs = nil + // Clear batch after successful insertion + batch = nil + + // Log progress for very large traces + if batchEnd > 100000 && batchEnd%100000 == 0 { + p.log.WithFields(logrus.Fields{ + "transaction_hash": tx.Hash().String(), + "progress": fmt.Sprintf("%d/%d", batchEnd, structlogCount), + }).Debug("Processing large trace") + } + + // Force GC periodically for extremely large traces + if batchEnd > 500000 && batchEnd%500000 == 0 { + runtime.GC() + } } - return structlogs, nil + // Clear all remaining data + trace.Structlogs = nil + trace = nil + callAddresses = nil + + // Final GC for very large transactions + if structlogCount > 100000 { + runtime.GC() + } + + return structlogCount, nil } +