1717package  core
1818
1919import  (
20- 	"errors" 
2120	"fmt" 
21+ 	"sync/atomic" 
2222
2323	"github.com/ethereum/go-ethereum/common" 
2424	"github.com/ethereum/go-ethereum/core/rawdb" 
@@ -47,26 +47,38 @@ type txIndexer struct {
4747	//       and all others shouldn't. 
4848	limit  uint64 
4949
50+ 	// The current head of blockchain for transaction indexing. This field 
51+ 	// is accessed by both the indexer and the indexing progress queries. 
52+ 	head  atomic.Uint64 
53+ 
54+ 	// The current tail of the indexed transactions, null indicates 
55+ 	// that no transactions have been indexed yet. 
56+ 	// 
57+ 	// This field is accessed by both the indexer and the indexing 
58+ 	// progress queries. 
59+ 	tail  atomic.Pointer [uint64 ]
60+ 
5061	// cutoff denotes the block number before which the chain segment should 
5162	// be pruned and not available locally. 
52- 	cutoff    uint64 
53- 	db        ethdb.Database 
54- 	progress  chan  chan  TxIndexProgress 
55- 	term      chan  chan  struct {}
56- 	closed    chan  struct {}
63+ 	cutoff  uint64 
64+ 	db      ethdb.Database 
65+ 	term    chan  chan  struct {}
66+ 	closed  chan  struct {}
5767}
5868
5969// newTxIndexer initializes the transaction indexer. 
6070func  newTxIndexer (limit  uint64 , chain  * BlockChain ) * txIndexer  {
6171	cutoff , _  :=  chain .HistoryPruningCutoff ()
6272	indexer  :=  & txIndexer {
63- 		limit :    limit ,
64- 		cutoff :   cutoff ,
65- 		db :       chain .db ,
66- 		progress : make (chan  chan  TxIndexProgress ),
67- 		term :     make (chan  chan  struct {}),
68- 		closed :   make (chan  struct {}),
73+ 		limit :  limit ,
74+ 		cutoff : cutoff ,
75+ 		db :     chain .db ,
76+ 		term :   make (chan  chan  struct {}),
77+ 		closed : make (chan  struct {}),
6978	}
79+ 	indexer .head .Store (indexer .resolveHead ())
80+ 	indexer .tail .Store (rawdb .ReadTxIndexTail (chain .db ))
81+ 
7082	go  indexer .loop (chain )
7183
7284	var  msg  string 
@@ -154,6 +166,7 @@ func (indexer *txIndexer) repair(head uint64) {
154166		// A crash may occur between the two delete operations, 
155167		// potentially leaving dangling indexes in the database. 
156168		// However, this is considered acceptable. 
169+ 		indexer .tail .Store (nil )
157170		rawdb .DeleteTxIndexTail (indexer .db )
158171		rawdb .DeleteAllTxLookupEntries (indexer .db , nil )
159172		log .Warn ("Purge transaction indexes" , "head" , head , "tail" , * tail )
@@ -174,6 +187,7 @@ func (indexer *txIndexer) repair(head uint64) {
174187		// Traversing the database directly within the transaction 
175188		// index namespace might be slow and expensive, but we 
176189		// have no choice. 
190+ 		indexer .tail .Store (nil )
177191		rawdb .DeleteTxIndexTail (indexer .db )
178192		rawdb .DeleteAllTxLookupEntries (indexer .db , nil )
179193		log .Warn ("Purge transaction indexes" , "head" , head , "cutoff" , indexer .cutoff )
@@ -187,6 +201,7 @@ func (indexer *txIndexer) repair(head uint64) {
187201		// A crash may occur between the two delete operations, 
188202		// potentially leaving dangling indexes in the database. 
189203		// However, this is considered acceptable. 
204+ 		indexer .tail .Store (& indexer .cutoff )
190205		rawdb .WriteTxIndexTail (indexer .db , indexer .cutoff )
191206		rawdb .DeleteAllTxLookupEntries (indexer .db , func (txhash  common.Hash , blob  []byte ) bool  {
192207			n  :=  rawdb .DecodeTxLookupEntry (blob , indexer .db )
@@ -216,16 +231,15 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
216231
217232	// Listening to chain events and manipulate the transaction indexes. 
218233	var  (
219- 		stop  chan  struct {}           // Non-nil if background routine is active 
220- 		done  chan  struct {}           // Non-nil if background routine is active 
221- 		head  =  indexer .resolveHead () // The latest announced chain head 
222- 
234+ 		stop    chan  struct {} // Non-nil if background routine is active 
235+ 		done    chan  struct {} // Non-nil if background routine is active 
223236		headCh  =  make (chan  ChainHeadEvent )
224237		sub     =  chain .SubscribeChainHeadEvent (headCh )
225238	)
226239	defer  sub .Unsubscribe ()
227240
228241	// Validate the transaction indexes and repair if necessary 
242+ 	head  :=  indexer .head .Load ()
229243	indexer .repair (head )
230244
231245	// Launch the initial processing if chain is not empty (head != genesis). 
@@ -238,17 +252,18 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
238252	for  {
239253		select  {
240254		case  h  :=  <- headCh :
255+ 			indexer .head .Store (h .Header .Number .Uint64 ())
241256			if  done  ==  nil  {
242257				stop  =  make (chan  struct {})
243258				done  =  make (chan  struct {})
244259				go  indexer .run (h .Header .Number .Uint64 (), stop , done )
245260			}
246- 			 head   =   h . Header . Number . Uint64 () 
261+ 
247262		case  <- done :
248263			stop  =  nil 
249264			done  =  nil 
250- 		case   ch   :=   <- indexer .progress : 
251- 			 ch   <-   indexer . report ( head ) 
265+ 			 indexer .tail . Store ( rawdb . ReadTxIndexTail ( indexer . db )) 
266+ 
252267		case  ch  :=  <- indexer .term :
253268			if  stop  !=  nil  {
254269				close (stop )
@@ -264,7 +279,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
264279}
265280
266281// report returns the tx indexing progress. 
267- func  (indexer  * txIndexer ) report (head  uint64 ) TxIndexProgress  {
282+ func  (indexer  * txIndexer ) report (head  uint64 ,  tail   * uint64 ) TxIndexProgress  {
268283	// Special case if the head is even below the cutoff, 
269284	// nothing to index. 
270285	if  head  <  indexer .cutoff  {
@@ -284,7 +299,6 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
284299	}
285300	// Compute how many blocks have been indexed 
286301	var  indexed  uint64 
287- 	tail  :=  rawdb .ReadTxIndexTail (indexer .db )
288302	if  tail  !=  nil  {
289303		indexed  =  head  -  * tail  +  1 
290304	}
@@ -300,16 +314,12 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
300314	}
301315}
302316
303- // txIndexProgress retrieves the tx indexing progress, or an error if the 
304- // background tx indexer is already stopped. 
305- func  (indexer  * txIndexer ) txIndexProgress () (TxIndexProgress , error ) {
306- 	ch  :=  make (chan  TxIndexProgress , 1 )
307- 	select  {
308- 	case  indexer .progress  <-  ch :
309- 		return  <- ch , nil 
310- 	case  <- indexer .closed :
311- 		return  TxIndexProgress {}, errors .New ("indexer is closed" )
312- 	}
317+ // txIndexProgress retrieves the transaction indexing progress. The reported 
318+ // progress may slightly lag behind the actual indexing state, as the tail is 
319+ // only updated at the end of each indexing operation. However, this delay is 
320+ // considered acceptable. 
321+ func  (indexer  * txIndexer ) txIndexProgress () TxIndexProgress  {
322+ 	return  indexer .report (indexer .head .Load (), indexer .tail .Load ())
313323}
314324
315325// close shutdown the indexer. Safe to be called for multiple times. 
0 commit comments