Skip to content

Commit da4f95b

Browse files
committed
add metering testing framework
1 parent d27ad93 commit da4f95b

File tree

22 files changed

+1847
-6
lines changed

22 files changed

+1847
-6
lines changed

cmd/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
dauthsecret "github.com/streamingfast/dauth/secret"
1717
dauthtrust "github.com/streamingfast/dauth/trust"
1818
"github.com/streamingfast/dmetering"
19+
dmeteringfile "github.com/streamingfast/dmetering/file"
1920
dmeteringgrpc "github.com/streamingfast/dmetering/grpc"
2021
dmeteringlogger "github.com/streamingfast/dmetering/logger"
2122
firecore "github.com/streamingfast/firehose-core"
@@ -43,6 +44,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) {
4344
dmetering.RegisterNull()
4445
dmeteringgrpc.Register()
4546
dmeteringlogger.Register()
47+
dmeteringfile.Register()
4648
paymentGatewayMetering.Register()
4749

4850
chain.Validate()

devel/standard/standard.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ start:
44
- merger
55
- relayer
66
- firehose
7-
- substreams-tier1
8-
- substreams-tier2
97
flags:
108
advertise-block-id-encoding: "hex"
119
advertise-chain-name: "acme-dummy-blockchain"

firehose/info/endpoint_info.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum
108108
time.Sleep(time.Millisecond * 500)
109109
continue
110110
}
111+
111112
return block
112113
}
113114
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ require (
2424
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
2525
github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a
2626
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4
27-
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951
27+
github.com/streamingfast/dmetering v0.0.0-20241007182823-f92200a54cdb
2828
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
2929
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2
3030
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
@@ -175,7 +175,7 @@ require (
175175
go.uber.org/automaxprocs v1.5.1
176176
golang.org/x/crypto v0.23.0 // indirect
177177
golang.org/x/mod v0.17.0 // indirect
178-
golang.org/x/net v0.23.0 // indirect
178+
golang.org/x/net v0.23.0
179179
golang.org/x/oauth2 v0.18.0
180180
golang.org/x/sync v0.8.0 // indirect
181181
golang.org/x/sys v0.24.0 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,8 @@ github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a h1:JwAGZ7f5vkB
551551
github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a/go.mod h1:EPtUX/vhRphE37Zo6sDcgD/S3sm5YqXHhxAgzS6Ebwo=
552552
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4=
553553
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4=
554-
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI=
555-
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
554+
github.com/streamingfast/dmetering v0.0.0-20241007182823-f92200a54cdb h1:SooWpzSSU04Z321lLWS6OkTAgoXH0qQEv5mVUi4b+q8=
555+
github.com/streamingfast/dmetering v0.0.0-20241007182823-f92200a54cdb/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
556556
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
557557
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
558558
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4=

test/integration_test.go

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package test
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"os"
7+
"os/exec"
8+
"path/filepath"
9+
"testing"
10+
"time"
11+
12+
_ "github.com/streamingfast/dmetering/file"
13+
"github.com/streamingfast/substreams/client"
14+
"github.com/streamingfast/substreams/manifest"
15+
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
16+
"github.com/stretchr/testify/require"
17+
"golang.org/x/net/context"
18+
)
19+
20+
type Case struct {
21+
name string
22+
spkgRootPath string
23+
moduleName string
24+
startBlock uint64
25+
// set endBlock to 0 to connect live
26+
endBlock uint64
27+
expectedReadBytes float64
28+
}
29+
30+
func TestIntegration(t *testing.T) {
31+
if os.Getenv("RUN_INTEGRATION_TESTS") != "true" {
32+
t.Skip()
33+
}
34+
35+
cases := []Case{
36+
{
37+
name: "sunny path",
38+
spkgRootPath: "./substreams_acme/substreams-acme-v0.1.0.spkg",
39+
moduleName: "map_test_data",
40+
startBlock: 0,
41+
endBlock: 1000,
42+
expectedReadBytes: 696050,
43+
},
44+
45+
{
46+
name: "sunny path",
47+
spkgRootPath: "./substreams_acme/substreams-acme-v0.1.0.spkg",
48+
moduleName: "map_test_data",
49+
startBlock: 0,
50+
endBlock: 0,
51+
expectedReadBytes: 696050,
52+
},
53+
}
54+
55+
ctx := context.Background()
56+
57+
rootPath, err := filepath.Abs("../")
58+
if err != nil {
59+
t.Fatalf("getting absolute path: %v", err)
60+
}
61+
62+
go func() {
63+
err = runTier1(ctx, t, rootPath)
64+
require.NoError(t, err)
65+
}()
66+
67+
go func() {
68+
err = runTier2(ctx, t, rootPath)
69+
require.NoError(t, err)
70+
}()
71+
72+
var meteringServer *MeteringTestServer
73+
go func() {
74+
meteringServer = NewMeteringServer(t, ":10016")
75+
meteringServer.Run()
76+
}()
77+
78+
clientConfig := client.NewSubstreamsClientConfig("localhost:9003", "", 0, false, true)
79+
substreamsClient, _, _, _, err := client.NewSubstreamsClient(clientConfig)
80+
require.NoError(t, err)
81+
82+
// WAIT SERVERS TO BE READY
83+
time.Sleep(15 * time.Second)
84+
85+
for _, c := range cases {
86+
t.Run(c.name, func(t *testing.T) {
87+
if c.endBlock == 0 {
88+
// RUN LIVE
89+
go func() {
90+
err = runDummyNode(ctx, t)
91+
require.NoError(t, err)
92+
}()
93+
}
94+
95+
err = requestTier1(ctx, t, c, substreamsClient)
96+
require.NoError(t, err)
97+
98+
resultEvents := meteringServer.bufferedEvents
99+
var totalReadBytes float64
100+
for _, events := range resultEvents {
101+
for _, event := range events.Events {
102+
for _, metric := range event.Metrics {
103+
// TODO : CHOOSE THE RIGHT METRIC
104+
if metric.Key == "file_uncompressed_read_bytes" {
105+
totalReadBytes += metric.Value
106+
}
107+
}
108+
}
109+
}
110+
111+
require.Equal(t, c.expectedReadBytes, totalReadBytes)
112+
meteringServer.clearBufferedEvents()
113+
})
114+
}
115+
}
116+
117+
func runTier1(ctx context.Context, t *testing.T, rootDir string) error {
118+
cmdPath := filepath.Join(rootDir, "/cmd/firecore")
119+
firehoseDataStoragePath := filepath.Join(rootDir, "devel/standard/firehose-data/storage/")
120+
mergedBlocksStore := fmt.Sprintf("file://%s", filepath.Join(firehoseDataStoragePath, "merged-blocks"))
121+
forkedBlocksStore := fmt.Sprintf("file://%s", filepath.Join(firehoseDataStoragePath, "forked-blocks"))
122+
oneBlocksStore := fmt.Sprintf("file://%s", filepath.Join(firehoseDataStoragePath, "one-blocks"))
123+
124+
tier1Args := []string{
125+
"run",
126+
cmdPath,
127+
"start", "substreams-tier1",
128+
"--config-file=",
129+
"--log-to-file=false",
130+
"--common-auth-plugin=null://",
131+
fmt.Sprintf("--common-tmp-dir=%s", t.TempDir()),
132+
fmt.Sprintf("--common-metering-plugin=grpc://localhost:10016?network=dummy_blockchain"),
133+
"--common-system-shutdown-signal-delay=30s",
134+
fmt.Sprintf("--common-merged-blocks-store-url=%s", mergedBlocksStore),
135+
fmt.Sprintf("--common-one-block-store-url=%s", oneBlocksStore),
136+
fmt.Sprintf("--common-forked-blocks-store-url=%s", forkedBlocksStore),
137+
"--common-live-blocks-addr=localhost:10014",
138+
"--common-first-streamable-block=0",
139+
"--substreams-tier1-grpc-listen-addr=:9003",
140+
"--substreams-tier1-subrequests-endpoint=localhost:9004",
141+
"--substreams-tier1-subrequests-insecure=false",
142+
"--substreams-tier1-subrequests-plaintext=true",
143+
fmt.Sprintf("--substreams-state-store-url=%s/substreams_dummy", t.TempDir()),
144+
"--substreams-state-store-default-tag=vtestdummy",
145+
}
146+
147+
tier1Cmd := exec.CommandContext(ctx, "go", tier1Args...)
148+
149+
err := handlingTestInstance(t, tier1Cmd, "TIER1", true)
150+
if err != nil {
151+
return fmt.Errorf("handling instance %w", err)
152+
}
153+
154+
return err
155+
}
156+
157+
func runTier2(ctx context.Context, t *testing.T, rootDir string) error {
158+
cmdPath := rootDir + "/cmd/firecore"
159+
tier2Args := []string{
160+
"run",
161+
cmdPath,
162+
"start", "substreams-tier2",
163+
"--config-file=",
164+
"--log-to-file=false",
165+
fmt.Sprintf("--common-tmp-dir=%s", t.TempDir()),
166+
"--substreams-tier2-grpc-listen-addr=:9004",
167+
"--substreams-tier1-subrequests-plaintext=true",
168+
"--substreams-tier1-subrequests-insecure=false",
169+
}
170+
171+
tier2Cmd := exec.CommandContext(ctx, "go", tier2Args...)
172+
173+
err := handlingTestInstance(t, tier2Cmd, "TIER2", true)
174+
if err != nil {
175+
return fmt.Errorf("handling instance %w", err)
176+
}
177+
178+
return err
179+
}
180+
181+
func runDummyNode(ctx context.Context, t *testing.T) error {
182+
launchDummyCmd := exec.CommandContext(ctx, "../devel/standard/start.sh")
183+
184+
err := handlingTestInstance(t, launchDummyCmd, "DUMMY_BLOCKCHAIN", true)
185+
if err != nil {
186+
return fmt.Errorf("handling instance %w", err)
187+
}
188+
189+
return err
190+
}
191+
192+
func requestTier1(ctx context.Context, t *testing.T, testCase Case, substreamsClient pbsubstreamsrpc.StreamClient) error {
193+
manifestReader, err := manifest.NewReader(testCase.spkgRootPath)
194+
require.NoError(t, err)
195+
196+
pkgBundle, err := manifestReader.Read()
197+
require.NoError(t, err)
198+
199+
require.NotEmptyf(t, pkgBundle, "pkgBundle is empty")
200+
201+
request := pbsubstreamsrpc.Request{
202+
StartBlockNum: int64(testCase.startBlock),
203+
StartCursor: "",
204+
StopBlockNum: testCase.endBlock,
205+
FinalBlocksOnly: false,
206+
ProductionMode: false,
207+
OutputModule: testCase.moduleName,
208+
Modules: pkgBundle.Package.Modules,
209+
DebugInitialStoreSnapshotForModules: nil,
210+
NoopMode: false,
211+
}
212+
213+
stream, err := substreamsClient.Blocks(ctx, &request)
214+
require.NoError(t, err)
215+
216+
for {
217+
block, err := stream.Recv()
218+
if err == io.EOF {
219+
break
220+
}
221+
require.NoError(t, err)
222+
223+
t.Logf("[REQUESTER]: %v", block)
224+
}
225+
return nil
226+
}

test/metering_server.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"net"
6+
"testing"
7+
8+
"github.com/test-go/testify/require"
9+
10+
pbmetering "github.com/streamingfast/dmetering/pb/sf/metering/v1"
11+
"google.golang.org/grpc"
12+
"google.golang.org/protobuf/types/known/emptypb"
13+
)
14+
15+
type MeteringTestServer struct {
16+
pbmetering.UnimplementedMeteringServer
17+
httpListenAddr string
18+
t *testing.T
19+
bufferedEvents []*pbmetering.Events
20+
}
21+
22+
func NewMeteringServer(t *testing.T, httpListenAddr string) *MeteringTestServer {
23+
return &MeteringTestServer{
24+
t: t,
25+
httpListenAddr: httpListenAddr,
26+
bufferedEvents: make([]*pbmetering.Events, 0),
27+
}
28+
}
29+
30+
func (s *MeteringTestServer) Run() {
31+
lis, err := net.Listen("tcp", s.httpListenAddr)
32+
if err != nil {
33+
require.NoError(s.t, err)
34+
}
35+
36+
grpcServer := grpc.NewServer()
37+
38+
pbmetering.RegisterMeteringServer(grpcServer, s)
39+
40+
s.t.Logf("[Metering]: Server listening port %s", s.httpListenAddr)
41+
if err = grpcServer.Serve(lis); err != nil {
42+
require.NoError(s.t, err)
43+
}
44+
}
45+
46+
func (s *MeteringTestServer) Emit(ctx context.Context, events *pbmetering.Events) (*emptypb.Empty, error) {
47+
s.bufferedEvents = append(s.bufferedEvents, events)
48+
return &emptypb.Empty{}, nil
49+
}
50+
51+
func (s *MeteringTestServer) mustEmbedUnimplementedMeteringServer() {
52+
panic("implement me")
53+
}
54+
55+
func (s *MeteringTestServer) clearBufferedEvents() {
56+
s.bufferedEvents = make([]*pbmetering.Events, 0)
57+
}

test/substreams_acme/.gitignore

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# substreams auth file
2+
.substreams.env
3+
4+
# Compiled source files
5+
target/
6+
7+
# Sink data when running any sinker
8+
sink-data/
9+
10+
# The spkg packed by the subtreams cli
11+
*.spkg

0 commit comments

Comments
 (0)