Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,19 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
c.log.Info("all logs have been saved", "outputFile", outputFile)
}()

compressFunc := func() error {
closeFunc := func() error {
if compress {
if err := gzipstore.CompressFile(outputFile, outputFile+".gzip"); err != nil {
return fmt.Errorf("error compressing file: %w", err)
}
c.log.Info("File compressed", "outputFile", outputFile+".gzip")
}

if err := filestore.SaveLogsJSON(client.GetStats(), outputFile+".stats.json"); err != nil {
return fmt.Errorf("error saving stats: %w", err)
}
c.log.Info("Stats saved", "outputFile", outputFile+".stats.json")

return nil
}

Expand All @@ -112,7 +118,7 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
c.log.Info("still retrieving logs...")
case <-ctx.Done():
c.log.Info("context canceled, waiting for logs to be saved...")
if err := compressFunc(); err != nil {
if err := closeFunc(); err != nil {
return errors.Join(fmt.Errorf("error compressing file: %w", err), ctx.Err())
}
return ctx.Err()
Expand All @@ -124,7 +130,7 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save
}

wg.Wait()
if err := compressFunc(); err != nil {
if err := closeFunc(); err != nil {
return fmt.Errorf("error compressing file: %w", err)
}

Expand Down
25 changes: 19 additions & 6 deletions pkg/eventfetcher/eventfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type Client struct {
batchTopUpTopic common.Hash
batchDepthIncreaseTopic common.Hash
priceUpdateTopic common.Hash
pausedTopic common.Hash
}

func NewClient(client *ethclientwrapper.Client, postageStampContractABI abi.ABI, blockRangeLimit uint32, logger log.Logger) *Client {
Expand All @@ -51,6 +50,10 @@ type Request struct {
EndBlock uint64
}

// GetStats returns the statistics accumulated by the client's log cache.
func (c *Client) GetStats() logcache.Stats {
	return c.logCache.GetStats()
}

// GetLogs fetches logs and sends them to a channel
func (c *Client) GetLogs(ctx context.Context, tr *Request) (<-chan types.Log, <-chan error) {
logChan := make(chan types.Log, 100)
Expand All @@ -61,7 +64,7 @@ func (c *Client) GetLogs(ctx context.Context, tr *Request) (<-chan types.Log, <-
defer close(errorChan)
// send the last cached value to the channel
// defer func() {
// priceUpdateLog := c.logCache.Get()
// priceUpdateLog := c.logCache.GetLastPriceUpdateLog()
// if priceUpdateLog != nil {
// c.logger.Info("sending last cached value", "transactionHash", priceUpdateLog.TxHash)
// logChan <- *priceUpdateLog
Expand Down Expand Up @@ -136,11 +139,22 @@ func (c *Client) fetchLogs(ctx context.Context, query ethereum.FilterQuery, logs
}

for _, log := range logs {
// cache the price update log and skip sending it to the channel
// if log.Topics[0] == c.priceUpdateTopic {
// c.logCache.Set(&log)
var isPriceUpdate, isBatchCreated, isBatchTopUp, isBatchDepthIncrease bool
if len(log.Topics) > 0 {
topic := log.Topics[0]
isPriceUpdate = topic == c.priceUpdateTopic
isBatchCreated = topic == c.batchCreatedTopic
isBatchTopUp = topic == c.batchTopUpTopic
isBatchDepthIncrease = topic == c.batchDepthIncreaseTopic
}

c.logCache.ProcessLogAndStats(log, isPriceUpdate, isBatchCreated, isBatchTopUp, isBatchDepthIncrease)

// skip sending it to the channel
// if isPriceUpdate {
// continue
// }

select {
case logsChan <- log:
case <-ctx.Done():
Expand Down Expand Up @@ -173,7 +187,6 @@ func (c *Client) filterQuery(postageStampContractAddress common.Address, from, t
c.batchTopUpTopic,
c.batchDepthIncreaseTopic,
c.priceUpdateTopic,
c.pausedTopic,
},
},
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/filestore/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,19 @@ func SaveLogsAsync(ctx context.Context, logChan <-chan types.Log, filePath strin
}
}
}

func SaveLogsJSON(data any, filePath string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating file: %w", err)
}
defer file.Close()

encoder := json.NewEncoder(file)
encoder.SetIndent("", " ")
if err := encoder.Encode(data); err != nil {
return fmt.Errorf("error encoding data: %w", err)
}

return nil
}
62 changes: 54 additions & 8 deletions pkg/logcache/logcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,69 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

// Stats aggregates per-event-type counters and the block range observed
// while processing postage-stamp contract logs. It is serialized to JSON
// (see the field tags) when the stats are exported.
type Stats struct {
	FirstBlock                uint64 `json:"first_block"`                  // lowest block number seen; 0 until any log is processed
	LastBlock                 uint64 `json:"last_block"`                   // highest block number seen
	PriceUpdateCounter        uint64 `json:"price_update_counter"`         // count of PriceUpdate events
	BatchCreatedCounter       uint64 `json:"batch_created_counter"`        // count of BatchCreated events
	BatchTopUpCounter         uint64 `json:"batch_top_up_counter"`         // count of BatchTopUp events
	BatchDepthIncreaseCounter uint64 `json:"batch_depth_increase_counter"` // count of BatchDepthIncrease events
}

// Cache accumulates log statistics and retains the most recently seen
// PriceUpdate log. All methods are safe for concurrent use.
type Cache struct {
	lastPriceUpdateLog *types.Log // last PriceUpdate event processed; nil until one is seen
	stats              Stats      // running counters and observed block range
	m                  sync.Mutex // guards lastPriceUpdateLog and stats
}

func New() *Cache {
return &Cache{}
return &Cache{
stats: Stats{},
}
}

// ProcessLogAndStats folds a single log into the running statistics. The
// boolean flags tell the cache which event type the log represents; when it
// is a price update the log is also retained for later retrieval via
// GetLastPriceUpdateLog.
func (c *Cache) ProcessLogAndStats(log types.Log, isPriceUpdate, isBatchCreated, isBatchTopUp, isBatchDepthIncrease bool) {
	c.m.Lock()
	defer c.m.Unlock()

	block := log.BlockNumber

	// Lowest block number seen so far; FirstBlock == 0 means "unset".
	if c.stats.FirstBlock == 0 || block < c.stats.FirstBlock {
		c.stats.FirstBlock = block
	}

	// Highest block number seen so far.
	if block > c.stats.LastBlock {
		c.stats.LastBlock = block
	}

	if isPriceUpdate {
		c.stats.PriceUpdateCounter++
		retained := log // copy, so the cached pointer is independent of the caller's value
		c.lastPriceUpdateLog = &retained
	}
	if isBatchCreated {
		c.stats.BatchCreatedCounter++
	}
	if isBatchTopUp {
		c.stats.BatchTopUpCounter++
	}
	if isBatchDepthIncrease {
		c.stats.BatchDepthIncreaseCounter++
	}
}

func (c *Cache) Set(l *types.Log) {
// GetLastPriceUpdateLog returns the most recently cached PriceUpdate log,
// or nil if no price update has been processed yet.
func (c *Cache) GetLastPriceUpdateLog() *types.Log {
	c.m.Lock()
	cached := c.lastPriceUpdateLog
	c.m.Unlock()
	return cached
}

func (c *Cache) Get() *types.Log {
// GetStats returns a snapshot copy of the current statistics, taken under
// the cache lock so callers never observe a partially updated Stats.
func (c *Cache) GetStats() Stats {
	c.m.Lock()
	snapshot := c.stats
	c.m.Unlock()
	return snapshot
}
8 changes: 4 additions & 4 deletions pkg/logcache/logcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/ethersphere/batch-export/pkg/logcache"
)

func TestBasic(t *testing.T) {
func TestGetLastPriceUpdateLog(t *testing.T) {
t.Parallel()
c := logcache.New()
b := &types.Log{}
c.Set(b)
if c.Get() == nil {
b := types.Log{}
c.ProcessLogAndStats(b, true, false, false, false)
if c.GetLastPriceUpdateLog() == nil {
t.Fatal("expected block to be cached")
}
}