Skip to content

Commit 8ffd334

Browse files
bestbeforetodaydenyeart
authored andcommitted
Locate correct block number for transaction ID in ChaincodeEvents (#3289)
If the request contains an `after_transaction_id`, start reading from the block containing that transaction ID and ignore any specified start block. If the specified transaction has not been committed in a block, fall back to any specified start block or next committed block. Signed-off-by: Mark S. Lewis <[email protected]>
1 parent f64eea2 commit 8ffd334

File tree

16 files changed

+680
-778
lines changed

16 files changed

+680
-778
lines changed

internal/pkg/gateway/api.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ import (
1919
ab "github.com/hyperledger/fabric-protos-go/orderer"
2020
"github.com/hyperledger/fabric-protos-go/peer"
2121
"github.com/hyperledger/fabric/common/flogging"
22-
"github.com/hyperledger/fabric/common/ledger"
2322
"github.com/hyperledger/fabric/core/aclmgmt/resources"
2423
"github.com/hyperledger/fabric/core/chaincode"
2524
"github.com/hyperledger/fabric/internal/pkg/gateway/event"
25+
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger"
2626
"github.com/hyperledger/fabric/protoutil"
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
@@ -543,7 +543,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
543543
return status.Error(codes.NotFound, err.Error())
544544
}
545545

546-
startBlock, err := startBlockFromLedgerPosition(ledger, request.GetStartPosition())
546+
startBlock, err := chaincodeEventsStartBlock(ledger, request)
547547
if err != nil {
548548
return err
549549
}
@@ -611,6 +611,17 @@ func chaincodeEventMatcher(request *gp.ChaincodeEventsRequest) func(event *peer.
611611
}
612612
}
613613

614+
func chaincodeEventsStartBlock(ledger ledger.Ledger, request *gp.ChaincodeEventsRequest) (uint64, error) {
615+
afterTransactionID := request.GetAfterTransactionId()
616+
if len(afterTransactionID) > 0 {
617+
if block, err := ledger.GetBlockByTxID(afterTransactionID); err == nil {
618+
return block.GetHeader().GetNumber(), nil
619+
}
620+
}
621+
622+
return startBlockFromLedgerPosition(ledger, request.GetStartPosition())
623+
}
624+
614625
func startBlockFromLedgerPosition(ledger ledger.Ledger, position *ab.SeekPosition) (uint64, error) {
615626
switch seek := position.GetType().(type) {
616627
case nil:

internal/pkg/gateway/api_test.go

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import (
2626
"github.com/hyperledger/fabric/common/crypto/tlsgen"
2727
"github.com/hyperledger/fabric/common/flogging"
2828
"github.com/hyperledger/fabric/common/flogging/mock"
29-
commonledger "github.com/hyperledger/fabric/common/ledger"
29+
"github.com/hyperledger/fabric/common/ledger"
3030
"github.com/hyperledger/fabric/gossip/api"
3131
"github.com/hyperledger/fabric/gossip/common"
3232
gdiscovery "github.com/hyperledger/fabric/gossip/discovery"
3333
"github.com/hyperledger/fabric/internal/pkg/comm"
3434
"github.com/hyperledger/fabric/internal/pkg/gateway/commit"
3535
"github.com/hyperledger/fabric/internal/pkg/gateway/config"
36+
ledgermocks "github.com/hyperledger/fabric/internal/pkg/gateway/ledger/mocks"
3637
"github.com/hyperledger/fabric/internal/pkg/gateway/mocks"
3738
idmocks "github.com/hyperledger/fabric/internal/pkg/identity/mocks"
3839
"github.com/hyperledger/fabric/protoutil"
@@ -78,19 +79,9 @@ type aclChecker interface {
7879
ACLChecker
7980
}
8081

81-
//go:generate counterfeiter -o mocks/ledgerprovider.go --fake-name LedgerProvider . ledgerProvider
82-
type ledgerProvider interface {
83-
LedgerProvider
84-
}
85-
86-
//go:generate counterfeiter -o mocks/ledger.go --fake-name Ledger . mockLedger
87-
type mockLedger interface {
88-
commonledger.Ledger
89-
}
90-
91-
//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . resultsIterator
82+
//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . mockResultsIterator
9283
type mockResultsIterator interface {
93-
commonledger.ResultsIterator
84+
ledger.ResultsIterator
9485
}
9586

9687
type (
@@ -169,8 +160,8 @@ type preparedTest struct {
169160
finder *mocks.CommitFinder
170161
eventsServer *mocks.ChaincodeEventsServer
171162
policy *mocks.ACLChecker
172-
ledgerProvider *mocks.LedgerProvider
173-
ledger *mocks.Ledger
163+
ledgerProvider *ledgermocks.Provider
164+
ledger *ledgermocks.Ledger
174165
blockIterator *mocks.ResultsIterator
175166
logLevel string
176167
logFields []string
@@ -1652,7 +1643,7 @@ func TestChaincodeEvents(t *testing.T) {
16521643
},
16531644
},
16541645
{
1655-
name: "identifies previous transaction ID if from different chaincode",
1646+
name: "identifies specified transaction if from different chaincode",
16561647
blocks: []*cp.Block{
16571648
differentChaincodePartReadBlock,
16581649
},
@@ -1672,7 +1663,7 @@ func TestChaincodeEvents(t *testing.T) {
16721663
},
16731664
},
16741665
{
1675-
name: "identifies previous transaction ID if not in start block",
1666+
name: "identifies specified transaction if not in first read block",
16761667
blocks: []*cp.Block{
16771668
noMatchingEventsBlock,
16781669
partReadBlock,
@@ -1781,6 +1772,65 @@ func TestChaincodeEvents(t *testing.T) {
17811772
require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0))
17821773
},
17831774
},
1775+
{
1776+
name: "uses block containing specified transaction instead of start block",
1777+
blocks: []*cp.Block{
1778+
matchingEventBlock,
1779+
},
1780+
postSetup: func(t *testing.T, test *preparedTest) {
1781+
ledgerInfo := &cp.BlockchainInfo{
1782+
Height: 101,
1783+
}
1784+
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)
1785+
1786+
block := &cp.Block{
1787+
Header: &cp.BlockHeader{
1788+
Number: 99,
1789+
},
1790+
}
1791+
test.ledger.GetBlockByTxIDReturns(block, nil)
1792+
},
1793+
afterTxID: "TX_ID",
1794+
startPosition: &ab.SeekPosition{
1795+
Type: &ab.SeekPosition_Specified{
1796+
Specified: &ab.SeekSpecified{
1797+
Number: 1,
1798+
},
1799+
},
1800+
},
1801+
postTest: func(t *testing.T, test *preparedTest) {
1802+
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
1803+
require.EqualValues(t, 99, test.ledger.GetBlocksIteratorArgsForCall(0))
1804+
require.Equal(t, 1, test.ledger.GetBlockByTxIDCallCount())
1805+
require.Equal(t, "TX_ID", test.ledger.GetBlockByTxIDArgsForCall(0))
1806+
},
1807+
},
1808+
{
1809+
name: "uses start block if specified transaction not found",
1810+
blocks: []*cp.Block{
1811+
matchingEventBlock,
1812+
},
1813+
postSetup: func(t *testing.T, test *preparedTest) {
1814+
ledgerInfo := &cp.BlockchainInfo{
1815+
Height: 101,
1816+
}
1817+
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)
1818+
1819+
test.ledger.GetBlockByTxIDReturns(nil, errors.New("NOT_FOUND"))
1820+
},
1821+
afterTxID: "TX_ID",
1822+
startPosition: &ab.SeekPosition{
1823+
Type: &ab.SeekPosition_Specified{
1824+
Specified: &ab.SeekSpecified{
1825+
Number: 1,
1826+
},
1827+
},
1828+
},
1829+
postTest: func(t *testing.T, test *preparedTest) {
1830+
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
1831+
require.EqualValues(t, 1, test.ledger.GetBlocksIteratorArgsForCall(0))
1832+
},
1833+
},
17841834
{
17851835
name: "returns error for unsupported start position type",
17861836
blocks: []*cp.Block{
@@ -1899,7 +1949,7 @@ func TestNilArgs(t *testing.T) {
18991949
&mocks.Discovery{},
19001950
&mocks.CommitFinder{},
19011951
&mocks.ACLChecker{},
1902-
&mocks.LedgerProvider{},
1952+
&ledgermocks.Provider{},
19031953
gdiscovery.NetworkMember{
19041954
PKIid: common.PKIidType("id1"),
19051955
Endpoint: "localhost:7051",
@@ -1978,7 +2028,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
19782028
blockChannel <- block
19792029
}
19802030
close(blockChannel)
1981-
mockBlockIterator.NextCalls(func() (commonledger.QueryResult, error) {
2031+
mockBlockIterator.NextCalls(func() (ledger.QueryResult, error) {
19822032
if tt.eventErr != nil {
19832033
return nil, tt.eventErr
19842034
}
@@ -1991,14 +2041,14 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
19912041
return block, nil
19922042
})
19932043

1994-
mockLedger := &mocks.Ledger{}
2044+
mockLedger := &ledgermocks.Ledger{}
19952045
ledgerInfo := &cp.BlockchainInfo{
19962046
Height: 1,
19972047
}
19982048
mockLedger.GetBlockchainInfoReturns(ledgerInfo, nil)
19992049
mockLedger.GetBlocksIteratorReturns(mockBlockIterator, nil)
20002050

2001-
mockLedgerProvider := &mocks.LedgerProvider{}
2051+
mockLedgerProvider := &ledgermocks.Provider{}
20022052
mockLedgerProvider.LedgerReturns(mockLedger, nil)
20032053

20042054
validProposal := createProposal(t, testChannel, testChaincode, tt.transientData)

internal/pkg/gateway/commit/finder.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"context"
1313

1414
"github.com/hyperledger/fabric-protos-go/peer"
15+
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger"
1516

1617
"github.com/pkg/errors"
1718
)
@@ -22,21 +23,15 @@ type Status struct {
2223
Code peer.TxValidationCode
2324
}
2425

25-
// QueryProvider provides status of previously committed transactions on a given channel. An error is returned if the
26-
// transaction is not present in the ledger.
27-
type QueryProvider interface {
28-
TransactionStatus(channelName string, transactionID string) (peer.TxValidationCode, uint64, error)
29-
}
30-
3126
// Finder is used to obtain transaction status.
3227
type Finder struct {
33-
query QueryProvider
28+
provider ledger.Provider
3429
notifier *Notifier
3530
}
3631

37-
func NewFinder(query QueryProvider, notifier *Notifier) *Finder {
32+
func NewFinder(provider ledger.Provider, notifier *Notifier) *Finder {
3833
return &Finder{
39-
query: query,
34+
provider: provider,
4035
notifier: notifier,
4136
}
4237
}
@@ -53,7 +48,12 @@ func (finder *Finder) TransactionStatus(ctx context.Context, channelName string,
5348
return nil, err
5449
}
5550

56-
if code, blockNumber, err := finder.query.TransactionStatus(channelName, transactionID); err == nil {
51+
ledger, err := finder.provider.Ledger(channelName)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
if code, blockNumber, err := ledger.GetTxValidationCodeByTxID(transactionID); err == nil {
5757
status := &Status{
5858
BlockNumber: blockNumber,
5959
TransactionID: transactionID,

internal/pkg/gateway/commit/finder_test.go

Lines changed: 23 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,11 @@ import (
1313

1414
"github.com/hyperledger/fabric-protos-go/peer"
1515
"github.com/hyperledger/fabric/core/ledger"
16-
"github.com/hyperledger/fabric/internal/pkg/gateway/commit/mocks"
16+
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger/mocks"
1717
"github.com/pkg/errors"
1818
"github.com/stretchr/testify/require"
1919
)
2020

21-
//go:generate counterfeiter -o mocks/queryprovider.go --fake-name QueryProvider . queryProvider
22-
type queryProvider interface { // Mimic QueryProvider to avoid circular import with generated mock
23-
QueryProvider
24-
}
25-
2621
func TestFinder(t *testing.T) {
2722
sendUntilDone := func(commitSend chan<- *ledger.CommitNotification, msg *ledger.CommitNotification) chan struct{} {
2823
done := make(chan struct{})
@@ -40,35 +35,39 @@ func TestFinder(t *testing.T) {
4035
return done
4136
}
4237

43-
t.Run("passes channel name to query provider", func(t *testing.T) {
44-
provider := &mocks.QueryProvider{}
45-
provider.TransactionStatusReturns(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
38+
newMocks := func(code peer.TxValidationCode, blockNumber uint64, err error) (*mocks.Provider, *mocks.Ledger) {
39+
provider, ledger := newLedgerMocks()
40+
ledger.GetTxValidationCodeByTxIDReturns(code, blockNumber, err)
41+
42+
return provider, ledger
43+
}
44+
45+
t.Run("passes channel name to provider", func(t *testing.T) {
46+
provider, _ := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
4647
finder := NewFinder(provider, newTestNotifier(nil))
4748

4849
finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
4950

50-
require.Equal(t, 1, provider.TransactionStatusCallCount())
51+
require.Equal(t, 1, provider.LedgerCallCount())
5152

52-
actual, _ := provider.TransactionStatusArgsForCall(0)
53+
actual := provider.LedgerArgsForCall(0)
5354
require.Equal(t, "CHANNEL", actual)
5455
})
5556

56-
t.Run("passes transaction ID to query provider", func(t *testing.T) {
57-
provider := &mocks.QueryProvider{}
58-
provider.TransactionStatusReturns(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
57+
t.Run("passes transaction ID to ledger", func(t *testing.T) {
58+
provider, ledger := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
5959
finder := NewFinder(provider, newTestNotifier(nil))
6060

6161
finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
6262

63-
require.Equal(t, 1, provider.TransactionStatusCallCount())
63+
require.Equal(t, 1, ledger.GetTxValidationCodeByTxIDCallCount())
6464

65-
_, actual := provider.TransactionStatusArgsForCall(0)
65+
actual := ledger.GetTxValidationCodeByTxIDArgsForCall(0)
6666
require.Equal(t, "TX_ID", actual)
6767
})
6868

6969
t.Run("returns previously committed transaction status", func(t *testing.T) {
70-
provider := &mocks.QueryProvider{}
71-
provider.TransactionStatusReturns(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
70+
provider, _ := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
7271
finder := NewFinder(provider, newTestNotifier(nil))
7372

7473
actual, err := finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
@@ -83,8 +82,7 @@ func TestFinder(t *testing.T) {
8382
})
8483

8584
t.Run("returns notified transaction status when no previous commit", func(t *testing.T) {
86-
provider := &mocks.QueryProvider{}
87-
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
85+
provider, _ := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
8886
commitSend := make(chan *ledger.CommitNotification)
8987
finder := NewFinder(provider, newTestNotifier(commitSend))
9088

@@ -111,34 +109,16 @@ func TestFinder(t *testing.T) {
111109
})
112110

113111
t.Run("returns error from notifier", func(t *testing.T) {
114-
provider := &mocks.QueryProvider{}
115-
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
116-
supplier := &mocks.NotificationSupplier{}
117-
supplier.CommitNotificationsReturns(nil, errors.New("MY_ERROR"))
118-
finder := NewFinder(provider, NewNotifier(supplier))
112+
provider, ledger := newMocks(0, 0, errors.New("NOT_FOUND"))
113+
ledger.CommitNotificationsChannelReturns(nil, errors.New("MY_ERROR"))
114+
finder := NewFinder(provider, NewNotifier(provider))
119115

120116
_, err := finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
121117
require.ErrorContains(t, err, "MY_ERROR")
122118
})
123119

124-
t.Run("passes channel name to supplier", func(t *testing.T) {
125-
provider := &mocks.QueryProvider{}
126-
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
127-
supplier := &mocks.NotificationSupplier{}
128-
supplier.CommitNotificationsReturns(nil, errors.New("MY_ERROR"))
129-
finder := NewFinder(provider, NewNotifier(supplier))
130-
131-
finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
132-
133-
require.Equal(t, 1, supplier.CommitNotificationsCallCount())
134-
135-
_, actual := supplier.CommitNotificationsArgsForCall(0)
136-
require.Equal(t, "CHANNEL", actual)
137-
})
138-
139120
t.Run("returns context error when context cancelled", func(t *testing.T) {
140-
provider := &mocks.QueryProvider{}
141-
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
121+
provider, _ := newMocks(0, 0, errors.New("NOT_FOUND"))
142122
finder := NewFinder(provider, newTestNotifier(nil))
143123

144124
ctx, cancel := context.WithCancel(context.Background())
@@ -149,8 +129,7 @@ func TestFinder(t *testing.T) {
149129
})
150130

151131
t.Run("returns error when notification supplier fails", func(t *testing.T) {
152-
provider := &mocks.QueryProvider{}
153-
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
132+
provider, _ := newMocks(0, 0, errors.New("NOT_FOUND"))
154133
commitSend := make(chan *ledger.CommitNotification)
155134
close(commitSend)
156135
finder := NewFinder(provider, newTestNotifier(commitSend))

0 commit comments

Comments
 (0)