diff --git a/cmd/export.go b/cmd/export.go
index 2048a42..9ef134a 100644
--- a/cmd/export.go
+++ b/cmd/export.go
@@ -90,13 +90,19 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
 		c.log.Info("all logs have been saved", "outputFile", outputFile)
 	}()
 
-	compressFunc := func() error {
+	closeFunc := func() error {
 		if compress {
 			if err := gzipstore.CompressFile(outputFile, outputFile+".gzip"); err != nil {
 				return fmt.Errorf("error compressing file: %w", err)
 			}
 			c.log.Info("File compressed", "outputFile", outputFile+".gzip")
 		}
+
+		if err := filestore.SaveLogsJSON(client.GetStats(), outputFile+".stats.json"); err != nil {
+			return fmt.Errorf("error saving stats: %w", err)
+		}
+		c.log.Info("Stats saved", "outputFile", outputFile+".stats.json")
+
 		return nil
 	}
 
@@ -112,7 +118,7 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
 			c.log.Info("still retrieving logs...")
 		case <-ctx.Done():
 			c.log.Info("context canceled, waiting for logs to be saved...")
-			if err := compressFunc(); err != nil {
+			if err := closeFunc(); err != nil {
 				return errors.Join(fmt.Errorf("error compressing file: %w", err), ctx.Err())
 			}
 			return ctx.Err()
@@ -124,7 +130,7 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
 	}
 
 	wg.Wait()
-	if err := compressFunc(); err != nil {
+	if err := closeFunc(); err != nil {
 		return fmt.Errorf("error compressing file: %w", err)
 	}
 
diff --git a/pkg/eventfetcher/eventfetcher.go b/pkg/eventfetcher/eventfetcher.go
index 09d6de9..76cf4ba 100644
--- a/pkg/eventfetcher/eventfetcher.go
+++ b/pkg/eventfetcher/eventfetcher.go
@@ -27,7 +27,6 @@ type Client struct {
 	batchTopUpTopic         common.Hash
 	batchDepthIncreaseTopic common.Hash
 	priceUpdateTopic        common.Hash
-	pausedTopic             common.Hash
 }
 
 func NewClient(client *ethclientwrapper.Client, postageStampContractABI abi.ABI, blockRangeLimit uint32, logger log.Logger) *Client {
@@ -51,6 +50,10 @@ type Request struct {
 	EndBlock   uint64
 }
 
+func (c *Client) GetStats() logcache.Stats {
+	return c.logCache.GetStats()
+}
+
 // GetLogs fetches logs and sends them to a channel
 func (c *Client) GetLogs(ctx context.Context, tr *Request) (<-chan types.Log, <-chan error) {
 	logChan := make(chan types.Log, 100)
@@ -61,7 +64,7 @@ func (c *Client) GetLogs(ctx context.Context, tr *Request) (<-chan types.Log, <-
 		defer close(errorChan)
 		// send the last cached value to the channel
 		// defer func() {
-		// 	priceUpdateLog := c.logCache.Get()
+		// 	priceUpdateLog := c.logCache.GetLastPriceUpdateLog()
 		// 	if priceUpdateLog != nil {
 		// 		c.logger.Info("sending last cached value", "transactionHash", priceUpdateLog.TxHash)
 		// 		logChan <- *priceUpdateLog
@@ -136,11 +139,22 @@ func (c *Client) fetchLogs(ctx context.Context, query ethereum.FilterQuery, logs
 		}
 
 		for _, log := range logs {
-			// cache the price update log and skip sending it to the channel
-			// if log.Topics[0] == c.priceUpdateTopic {
-			// 	c.logCache.Set(&log)
+			var isPriceUpdate, isBatchCreated, isBatchTopUp, isBatchDepthIncrease bool
+			if len(log.Topics) > 0 {
+				topic := log.Topics[0]
+				isPriceUpdate = topic == c.priceUpdateTopic
+				isBatchCreated = topic == c.batchCreatedTopic
+				isBatchTopUp = topic == c.batchTopUpTopic
+				isBatchDepthIncrease = topic == c.batchDepthIncreaseTopic
+			}
+
+			c.logCache.ProcessLogAndStats(log, isPriceUpdate, isBatchCreated, isBatchTopUp, isBatchDepthIncrease)
+
+			// skip sending it to the channel
+			// if isPriceUpdate {
 			// 	continue
 			// }
+
 			select {
 			case logsChan <- log:
 			case <-ctx.Done():
@@ -173,7 +187,6 @@ func (c *Client) filterQuery(postageStampContractAddress common.Address, from, t
 				c.batchTopUpTopic,
 				c.batchDepthIncreaseTopic,
 				c.priceUpdateTopic,
-				c.pausedTopic,
 			},
 		},
 	}
diff --git a/pkg/filestore/filestore.go b/pkg/filestore/filestore.go
index 9617158..ed116e1 100644
--- a/pkg/filestore/filestore.go
+++ b/pkg/filestore/filestore.go
@@ -34,3 +34,19 @@ func SaveLogsAsync(ctx context.Context, logChan <-chan types.Log, filePath strin
 		}
 	}
 }
+
+func SaveLogsJSON(data any, filePath string) error {
+	file, err := os.Create(filePath)
+	if err != nil {
+		return fmt.Errorf("error creating file: %w", err)
+	}
+	defer file.Close()
+
+	encoder := json.NewEncoder(file)
+	encoder.SetIndent("", " ")
+	if err := encoder.Encode(data); err != nil {
+		return fmt.Errorf("error encoding data: %w", err)
+	}
+
+	return nil
+}
diff --git a/pkg/logcache/logcache.go b/pkg/logcache/logcache.go
index 54ed98e..028cde3 100644
--- a/pkg/logcache/logcache.go
+++ b/pkg/logcache/logcache.go
@@ -6,23 +6,69 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 )
 
+type Stats struct {
+	FirstBlock                uint64 `json:"first_block"`
+	LastBlock                 uint64 `json:"last_block"`
+	PriceUpdateCounter        uint64 `json:"price_update_counter"`
+	BatchCreatedCounter       uint64 `json:"batch_created_counter"`
+	BatchTopUpCounter         uint64 `json:"batch_top_up_counter"`
+	BatchDepthIncreaseCounter uint64 `json:"batch_depth_increase_counter"`
+}
+
 type Cache struct {
-	l *types.Log
-	m sync.Mutex
+	lastPriceUpdateLog *types.Log
+	stats              Stats
+	m                  sync.Mutex
 }
 
 func New() *Cache {
-	return &Cache{}
+	return &Cache{
+		stats: Stats{},
+	}
+}
+
+// ProcessLogAndStats updates statistics based on the given log and its type.
+// It also caches the log if it's a price update event.
+func (c *Cache) ProcessLogAndStats(log types.Log, isPriceUpdate, isBatchCreated, isBatchTopUp, isBatchDepthIncrease bool) {
+	c.m.Lock()
+	defer c.m.Unlock()
+
+	// Update FirstBlock
+	if c.stats.FirstBlock == 0 || log.BlockNumber < c.stats.FirstBlock {
+		c.stats.FirstBlock = log.BlockNumber
+	}
+
+	// Update LastBlock
+	if log.BlockNumber > c.stats.LastBlock {
+		c.stats.LastBlock = log.BlockNumber
+	}
+
+	// Update counters based on event type
+	if isPriceUpdate {
+		c.stats.PriceUpdateCounter++
+		c.lastPriceUpdateLog = &log // cache the instance of this price update log
+	}
+	if isBatchCreated {
+		c.stats.BatchCreatedCounter++
+	}
+	if isBatchTopUp {
+		c.stats.BatchTopUpCounter++
+	}
+	if isBatchDepthIncrease {
+		c.stats.BatchDepthIncreaseCounter++
+	}
 }
 
-func (c *Cache) Set(l *types.Log) {
+// GetLastPriceUpdateLog retrieves the last cached PriceUpdate log.
+func (c *Cache) GetLastPriceUpdateLog() *types.Log {
 	c.m.Lock()
-	c.l = l
-	c.m.Unlock()
+	defer c.m.Unlock()
+	return c.lastPriceUpdateLog
 }
 
-func (c *Cache) Get() *types.Log {
+// GetStats retrieves a copy of the current statistics.
+func (c *Cache) GetStats() Stats {
 	c.m.Lock()
 	defer c.m.Unlock()
-	return c.l
+	return c.stats
 }
diff --git a/pkg/logcache/logcache_test.go b/pkg/logcache/logcache_test.go
index 2d0d3d5..e2d15b8 100644
--- a/pkg/logcache/logcache_test.go
+++ b/pkg/logcache/logcache_test.go
@@ -7,12 +7,12 @@ import (
 	"github.com/ethersphere/batch-export/pkg/logcache"
 )
 
-func TestBasic(t *testing.T) {
+func TestGetLastPriceUpdateLog(t *testing.T) {
 	t.Parallel()
 
 	c := logcache.New()
-	b := &types.Log{}
-	c.Set(b)
-	if c.Get() == nil {
+	b := types.Log{}
+	c.ProcessLogAndStats(b, true, false, false, false)
+	if c.GetLastPriceUpdateLog() == nil {
 		t.Fatal("expected block to be cached")
 	}
 }