Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 21 additions & 53 deletions pkg/processor/transaction/structlog/handlers_large_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package structlog

import (
"context"
"fmt"
"math/big"

"github.com/0xsequence/ethkit/go-ethereum/core/types"
"github.com/sirupsen/logrus"

"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

// processTransactionWithLargeTxHandling processes a transaction with large transaction lock management
Expand All @@ -18,64 +15,35 @@ func (p *Processor) processTransactionWithLargeTxHandling(ctx context.Context, b
return p.ProcessSingleTransaction(ctx, block, index, tx)
}

// First, we need to check the size by doing a quick trace extraction
// This is not ideal but necessary to determine if it's a large transaction
// In production, you might want to cache this or estimate based on gas usage
node := p.pool.GetHealthyExecutionNode()
if node == nil {
return 0, fmt.Errorf("no healthy execution node available")
}

// Get trace to count structlogs
trace, err := node.DebugTraceTransaction(ctx, tx.Hash().String(), block.Number(), execution.DefaultTraceOptions())
if err != nil {
return 0, fmt.Errorf("failed to get trace for size check: %w", err)
}

structlogCount := len(trace.Structlogs)
isLargeTx := p.largeTxLock.IsLargeTransaction(structlogCount)

p.log.WithFields(logrus.Fields{
"tx_hash": txHash,
"structlog_count": structlogCount,
"is_large_tx": isLargeTx,
"threshold": p.largeTxLock.config.StructlogThreshold,
}).Debug("Checked transaction size")

if isLargeTx && p.largeTxLock.config.EnableSequentialMode {
// This is a large transaction, acquire lock
if err := p.largeTxLock.AcquireLock(ctx, txHash, structlogCount, "process"); err != nil {
// Failed to acquire lock within timeout
p.log.WithError(err).WithFields(logrus.Fields{
"tx_hash": txHash,
"structlog_count": structlogCount,
}).Warn("Failed to acquire large transaction lock")

return 0, err
}
defer p.largeTxLock.ReleaseLock(txHash)

// Process the large transaction
return p.ProcessSingleTransaction(ctx, block, index, tx)
} else if !isLargeTx && p.largeTxLock.active.Load() {
// Normal transaction but a large transaction is being processed
// Wait for it to complete
// Check if a large transaction is currently being processed
if p.largeTxLock.active.Load() {
// Wait for the large transaction to complete before processing
if err := p.largeTxLock.WaitForLargeTransaction(ctx, txHash); err != nil {
// Failed to wait, return error to retry later
p.log.WithError(err).WithFields(logrus.Fields{
"tx_hash": txHash,
"structlog_count": structlogCount,
"tx_hash": txHash,
}).Warn("Failed to wait for large transaction")

return 0, err
}
}

// Large transaction completed, process normally
return p.ProcessSingleTransaction(ctx, block, index, tx)
// Process the transaction and get size info
structlogCount, isLarge, err := p.ProcessSingleTransactionWithSizeInfo(ctx, block, index, tx)
if err != nil {
return 0, err
}

// Normal transaction with no large transaction active
return p.ProcessSingleTransaction(ctx, block, index, tx)
// If it turned out to be a large transaction and we haven't acquired the lock yet
if isLarge && p.largeTxLock.config.EnableSequentialMode && !p.largeTxLock.HasLock(txHash) {
// This is a large transaction that we just discovered
// For now, we've already processed it, but log this for monitoring
p.log.WithFields(logrus.Fields{
"tx_hash": txHash,
"structlog_count": structlogCount,
"threshold": p.largeTxLock.config.StructlogThreshold,
}).Info("Processed large transaction without pre-acquired lock - consider implementing retry logic")
}

return structlogCount, nil
}

// verifyTransactionWithLargeTxHandling verifies a transaction with large transaction lock management
Expand Down
5 changes: 5 additions & 0 deletions pkg/processor/transaction/structlog/large_tx_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (m *LargeTxLockManager) WaitForLargeTransaction(ctx context.Context, worker
}
}

// HasLock reports whether txHash is the transaction currently recorded as
// holding the large transaction lock.
//
// It returns false whenever the manager is not active, without consulting
// currentTx at all.
//
// NOTE(review): currentTx is read here without any visible synchronization;
// confirm it cannot be written concurrently with this call.
func (m *LargeTxLockManager) HasLock(txHash string) bool {
	if !m.active.Load() {
		return false
	}

	return m.currentTx == txHash
}

// GetStatus returns current status of the lock manager
func (m *LargeTxLockManager) GetStatus() map[string]interface{} {
status := map[string]interface{}{
Expand Down
158 changes: 100 additions & 58 deletions pkg/processor/transaction/structlog/transaction_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,50 +37,39 @@

// ProcessSingleTransaction processes a single transaction using batch collector (exposed for worker handlers)
func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error) {
// Extract structlog data
structlogs, err := p.ExtractStructlogs(ctx, block, index, tx)
// Extract structlog data with optimized memory handling
structlogCount, err := p.ExtractAndProcessStructlogs(ctx, block, index, tx)
if err != nil {
return 0, err
}

// Store count before processing
structlogCount := len(structlogs)

// Ensure we clear the slice on exit to allow GC, especially important for failed inserts
defer func() {
// Clear the slice to release memory
structlogs = nil
// Force GC for large transactions or on errors
if structlogCount > 1000 {
runtime.GC()
}
}()
// Record success metrics
common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "success").Inc()

// Send to batch collector for insertion
if err := p.sendToBatchCollector(ctx, structlogs); err != nil {
common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "failed").Inc()
// Log memory cleanup for large failed batches
if structlogCount > 10000 {
p.log.WithFields(logrus.Fields{
"transaction_hash": tx.Hash().String(),
"structlog_count": structlogCount,
"error": err.Error(),
}).Info("Cleaning up memory after failed batch insert")
}
return structlogCount, nil
}

runtime.GC()
// ProcessSingleTransactionWithSizeInfo processes a transaction and returns size info for large tx handling
func (p *Processor) ProcessSingleTransactionWithSizeInfo(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, bool, error) {
// This will be called from handlers_large_tx.go to get size info during processing
structlogCount, err := p.ExtractAndProcessStructlogs(ctx, block, index, tx)
if err != nil {
return 0, false, err
}

return 0, fmt.Errorf("failed to insert structlogs via batch collector: %w", err)
isLarge := false
if p.largeTxLock != nil && p.largeTxLock.config.Enabled {
isLarge = p.largeTxLock.IsLargeTransaction(structlogCount)
}

// Record success metrics
common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "success").Inc()

return structlogCount, nil
return structlogCount, isLarge, nil
}

// ExtractStructlogs extracts structlog data from a transaction without inserting to database
func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error) {
// ExtractAndProcessStructlogs extracts and processes structlog data with optimized memory handling
func (p *Processor) ExtractAndProcessStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error) {
start := time.Now()
defer func() {
duration := time.Since(start)
Expand All @@ -90,37 +79,55 @@
// Get execution node
node := p.pool.GetHealthyExecutionNode()
if node == nil {
return nil, fmt.Errorf("no healthy execution node available")
return 0, fmt.Errorf("no healthy execution node available")
}

// Process transaction with timeout
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

// Get transaction trace
// Get transaction trace with stack enabled for CALL operations
trace, err := node.DebugTraceTransaction(processCtx, tx.Hash().String(), block.Number(), execution.StackTraceOptions())
if err != nil {
return nil, fmt.Errorf("failed to trace transaction: %w", err)
return 0, fmt.Errorf("failed to trace transaction: %w", err)
}

// Convert trace to structlog rows
var structlogs []Structlog
if trace == nil || len(trace.Structlogs) == 0 {
return 0, nil
}

uIndex := uint32(index) //nolint:gosec // index is bounded by block.Transactions() length
// Store the count for return
structlogCount := len(trace.Structlogs)

if trace != nil {
// Pre-allocate slice for better memory efficiency
structlogs = make([]Structlog, 0, len(trace.Structlogs))
// Pre-extract call addresses into a map to avoid keeping stack data
callAddresses := make(map[int]string)
for i, structLog := range trace.Structlogs {

Check failure on line 104 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

ranges should only be cuddled with assignments used in the iteration (wsl)
if structLog.Op == "CALL" && structLog.Stack != nil && len(*structLog.Stack) > 1 {
callAddresses[i] = (*structLog.Stack)[len(*structLog.Stack)-2]
}
// Clear stack data immediately after extraction
structLog.Stack = nil
}

// For extremely large traces, log memory usage periodically
logInterval := 100000
// Process in batches for better memory efficiency
const batchSize = 10000
uIndex := uint32(index) //nolint:gosec // index is bounded by block.Transactions() length

Check failure on line 114 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

assignments should only be cuddled with other assignments (wsl)

Check failure on line 115 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
for batchStart := 0; batchStart < len(trace.Structlogs); batchStart += batchSize {
batchEnd := batchStart + batchSize
if batchEnd > len(trace.Structlogs) {
batchEnd = len(trace.Structlogs)
}

for i, structLog := range trace.Structlogs {
// Create batch of structlogs
batch := make([]Structlog, 0, batchEnd-batchStart)

for i := batchStart; i < batchEnd; i++ {
structLog := trace.Structlogs[i]

var callToAddress *string

if structLog.Op == "CALL" && structLog.Stack != nil && len(*structLog.Stack) > 1 {
stackValue := (*structLog.Stack)[len(*structLog.Stack)-2]
callToAddress = &stackValue
if addr, exists := callAddresses[i]; exists {
callToAddress = &addr
}

row := Structlog{
Expand All @@ -145,25 +152,60 @@
MetaNetworkName: p.network.Name,
}

structlogs = append(structlogs, row)
batch = append(batch, row)
}

// For very large traces, periodically log memory usage
if i > 0 && i%logInterval == 0 && len(trace.Structlogs) > 200000 {
// Send batch to collector
if err := p.sendToBatchCollector(ctx, batch); err != nil {
common.TransactionsProcessed.WithLabelValues(p.network.Name, "structlog", "failed").Inc()

// Log memory cleanup for large failed batches
if structlogCount > 10000 {
p.log.WithFields(logrus.Fields{
"transaction_hash": tx.Hash().String(),
"progress": fmt.Sprintf("%d/%d", i, len(trace.Structlogs)),
}).Debug("Processing large trace")
"structlog_count": structlogCount,
"batch_start": batchStart,
"batch_end": batchEnd,
"error": err.Error(),
}).Info("Failed to insert batch")
}

// Force GC during processing of extremely large traces
if i%200000 == 0 {
runtime.GC()
}
// Clear batch and force GC on error
batch = nil

Check failure on line 174 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to batch (ineffassign)
if structlogCount > 100000 {

Check failure on line 175 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

if statements should only be cuddled with assignments used in the if statement itself (wsl)
runtime.GC()
}

return 0, fmt.Errorf("failed to insert structlogs via batch collector: %w", err)
}

// Clear the original trace data to free memory
trace.Structlogs = nil
// Clear batch after successful insertion
batch = nil

Check failure on line 183 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to batch (ineffassign)

// Log progress for very large traces
if batchEnd > 100000 && batchEnd%100000 == 0 {
p.log.WithFields(logrus.Fields{
"transaction_hash": tx.Hash().String(),
"progress": fmt.Sprintf("%d/%d", batchEnd, structlogCount),
}).Debug("Processing large trace")
}

// Force GC periodically for extremely large traces
if batchEnd > 500000 && batchEnd%500000 == 0 {
runtime.GC()
}
}

return structlogs, nil
// Clear all remaining data
trace.Structlogs = nil
trace = nil
callAddresses = nil

Check failure on line 202 in pkg/processor/transaction/structlog/transaction_processing.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to callAddresses (ineffassign)

// Final GC for very large transactions
if structlogCount > 100000 {
runtime.GC()
}

return structlogCount, nil
}

Loading