Skip to content

Commit 0e03ca9

Browse files
authored
refactor(experimental): add cluster level subscriptions API for library
This change adds the cluster-level APIs to the main library's subscriptions.
1 parent cbf8f38 commit 0e03ca9

File tree

5 files changed

+213
-30
lines changed

5 files changed

+213
-30
lines changed
Lines changed: 164 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,176 @@
11
import { SolanaRpcSubscriptions, SolanaRpcSubscriptionsUnstable } from '@solana/rpc-core';
2-
import { createJsonSubscriptionRpc } from '@solana/rpc-transport';
3-
import type { RpcSubscriptions } from '@solana/rpc-types';
2+
import {
3+
createWebSocketTransport,
4+
IRpcWebSocketTransport,
5+
IRpcWebSocketTransportDevnet,
6+
IRpcWebSocketTransportMainnet,
7+
IRpcWebSocketTransportTestnet,
8+
RpcSubscriptionsDevnet,
9+
RpcSubscriptionsMainnet,
10+
RpcSubscriptionsTestnet,
11+
} from '@solana/rpc-transport';
12+
import { devnet, mainnet, RpcSubscriptions, testnet } from '@solana/rpc-types';
413

514
import { createSolanaRpcSubscriptions, createSolanaRpcSubscriptions_UNSTABLE } from '../rpc';
15+
import { createDefaultRpcSubscriptionsTransport } from '../rpc-websocket-transport';
616

7-
const config = null as unknown as Omit<Parameters<typeof createJsonSubscriptionRpc>[0], 'api'>;
17+
// Creating default websocket transports
818

9-
createSolanaRpcSubscriptions(config) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
19+
const genericUrl = 'http://localhost:8899';
20+
const devnetUrl = devnet('https://api.devnet.solana.com');
21+
const testnetUrl = testnet('https://api.testnet.solana.com');
22+
const mainnetUrl = mainnet('https://api.mainnet-beta.solana.com');
23+
24+
// No cluster specified should be generic `IRpcWebSocketTransport`
25+
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransport;
26+
//@ts-expect-error Should not be a devnet transport
27+
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransportDevnet;
28+
//@ts-expect-error Should not be a testnet transport
29+
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransportTestnet;
30+
//@ts-expect-error Should not be a mainnet transport
31+
createDefaultRpcSubscriptionsTransport({ url: genericUrl }) satisfies IRpcWebSocketTransportMainnet;
32+
33+
// Devnet cluster should be `IRpcWebSocketTransportDevnet`
34+
createDefaultRpcSubscriptionsTransport({ url: devnetUrl }) satisfies IRpcWebSocketTransportDevnet;
35+
//@ts-expect-error Should not be a testnet transport
36+
createDefaultRpcSubscriptionsTransport({ url: devnetUrl }) satisfies IRpcWebSocketTransportTestnet;
37+
//@ts-expect-error Should not be a mainnet transport
38+
createDefaultRpcSubscriptionsTransport({ url: devnetUrl }) satisfies IRpcWebSocketTransportMainnet;
39+
40+
// Testnet cluster should be `IRpcWebSocketTransportTestnet`
41+
createDefaultRpcSubscriptionsTransport({ url: testnetUrl }) satisfies IRpcWebSocketTransportTestnet;
42+
//@ts-expect-error Should not be a devnet transport
43+
createDefaultRpcSubscriptionsTransport({ url: testnetUrl }) satisfies IRpcWebSocketTransportDevnet;
44+
//@ts-expect-error Should not be a mainnet transport
45+
createDefaultRpcSubscriptionsTransport({ url: testnetUrl }) satisfies IRpcWebSocketTransportMainnet;
46+
47+
// Mainnet cluster should be `IRpcWebSocketTransportMainnet`
48+
createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }) satisfies IRpcWebSocketTransportMainnet;
49+
//@ts-expect-error Should not be a devnet transport
50+
createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }) satisfies IRpcWebSocketTransportDevnet;
51+
//@ts-expect-error Should not be a testnet transport
52+
createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }) satisfies IRpcWebSocketTransportTestnet;
53+
54+
// Creating JSON Subscription RPC clients
55+
56+
const genericWebSocketTransport = createWebSocketTransport({
57+
sendBufferHighWatermark: 0,
58+
url: genericUrl,
59+
});
60+
const devnetWebSocketTransport = createWebSocketTransport({
61+
sendBufferHighWatermark: 0,
62+
url: devnetUrl,
63+
});
64+
const testnetWebSocketTransport = createWebSocketTransport({
65+
sendBufferHighWatermark: 0,
66+
url: testnetUrl,
67+
});
68+
const mainnetWebSocketTransport = createWebSocketTransport({
69+
sendBufferHighWatermark: 0,
70+
url: mainnetUrl,
71+
});
72+
73+
// Checking stable vs unstable subscriptions
74+
75+
createSolanaRpcSubscriptions({
76+
transport: genericWebSocketTransport,
77+
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
1078
// @ts-expect-error Should not have unstable subscriptions
1179
createSolanaRpcSubscriptions(config) satisfies RpcSubscriptions<
1280
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
1381
>;
1482

15-
createSolanaRpcSubscriptions_UNSTABLE(config) satisfies RpcSubscriptions<
83+
createSolanaRpcSubscriptions_UNSTABLE({ transport: genericWebSocketTransport }) satisfies RpcSubscriptions<
84+
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
85+
>;
86+
87+
// Checking cluster-level subscriptions API
88+
89+
// No cluster specified should be generic `RpcSubscriptions`
90+
createSolanaRpcSubscriptions({
91+
transport: genericWebSocketTransport,
92+
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
93+
createSolanaRpcSubscriptions({
94+
transport: genericWebSocketTransport,
95+
//@ts-expect-error Should not be a devnet RPC
96+
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
97+
createSolanaRpcSubscriptions({
98+
transport: genericWebSocketTransport,
99+
//@ts-expect-error Should not be a testnet RPC
100+
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
101+
createSolanaRpcSubscriptions({
102+
transport: genericWebSocketTransport,
103+
//@ts-expect-error Should not be a mainnet RPC
104+
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
105+
106+
// Devnet cluster should be `RpcSubscriptionsDevnet`
107+
createSolanaRpcSubscriptions({
108+
transport: devnetWebSocketTransport,
109+
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
110+
createSolanaRpcSubscriptions({
111+
transport: devnetWebSocketTransport,
112+
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
113+
createSolanaRpcSubscriptions({
114+
transport: devnetWebSocketTransport,
115+
//@ts-expect-error Should not be a testnet RPC
116+
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
117+
createSolanaRpcSubscriptions({
118+
transport: devnetWebSocketTransport,
119+
//@ts-expect-error Should not be a mainnet RPC
120+
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
121+
//@ts-expect-error Should not have unstable subscriptions
122+
createSolanaRpcSubscriptions({ transport: devnetWebSocketTransport }) satisfies RpcSubscriptionsDevnet<
123+
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
124+
>;
125+
// Unstable methods present with proper initializer
126+
createSolanaRpcSubscriptions_UNSTABLE({ transport: devnetWebSocketTransport }) satisfies RpcSubscriptionsDevnet<
127+
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
128+
>;
129+
130+
// Testnet cluster should be `RpcSubscriptionsTestnet`
131+
createSolanaRpcSubscriptions({
132+
transport: testnetWebSocketTransport,
133+
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
134+
createSolanaRpcSubscriptions({
135+
transport: testnetWebSocketTransport,
136+
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
137+
createSolanaRpcSubscriptions({
138+
transport: testnetWebSocketTransport,
139+
//@ts-expect-error Should not be a devnet RPC
140+
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
141+
createSolanaRpcSubscriptions({
142+
transport: testnetWebSocketTransport,
143+
//@ts-expect-error Should not be a mainnet RPC
144+
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
145+
//@ts-expect-error Should not have unstable subscriptions
146+
createSolanaRpcSubscriptions({ transport: testnetWebSocketTransport }) satisfies RpcSubscriptionsTestnet<
147+
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
148+
>;
149+
// Unstable methods present with proper initializer
150+
createSolanaRpcSubscriptions_UNSTABLE({ transport: testnetWebSocketTransport }) satisfies RpcSubscriptionsTestnet<
151+
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
152+
>;
153+
154+
// Mainnet cluster should be `RpcSubscriptionsMainnet`
155+
createSolanaRpcSubscriptions({
156+
transport: mainnetWebSocketTransport,
157+
}) satisfies RpcSubscriptions<SolanaRpcSubscriptions>;
158+
createSolanaRpcSubscriptions({
159+
transport: mainnetWebSocketTransport,
160+
}) satisfies RpcSubscriptionsMainnet<SolanaRpcSubscriptions>;
161+
createSolanaRpcSubscriptions({
162+
transport: mainnetWebSocketTransport,
163+
//@ts-expect-error Should not be a devnet RPC
164+
}) satisfies RpcSubscriptionsDevnet<SolanaRpcSubscriptions>;
165+
createSolanaRpcSubscriptions({
166+
transport: mainnetWebSocketTransport,
167+
//@ts-expect-error Should not be a testnet RPC
168+
}) satisfies RpcSubscriptionsTestnet<SolanaRpcSubscriptions>;
169+
//@ts-expect-error Should not have unstable subscriptions
170+
createSolanaRpcSubscriptions({ transport: mainnetWebSocketTransport }) satisfies RpcSubscriptionsMainnet<
171+
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
172+
>;
173+
// Unstable methods present with proper initializer
174+
createSolanaRpcSubscriptions_UNSTABLE({ transport: mainnetWebSocketTransport }) satisfies RpcSubscriptionsMainnet<
16175
SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable
17176
>;

packages/library/src/rpc-websocket-autopinger.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
import type { IRpcWebSocketTransport } from '@solana/rpc-transport';
22

3-
type Config = Readonly<{
3+
type Config<TTransport extends IRpcWebSocketTransport> = Readonly<{
44
intervalMs: number;
5-
transport: IRpcWebSocketTransport;
5+
transport: TTransport;
66
}>;
77

88
const PING_PAYLOAD = {
99
jsonrpc: '2.0',
1010
method: 'ping',
1111
} as const;
1212

13-
export function getWebSocketTransportWithAutoping({ intervalMs, transport }: Config): IRpcWebSocketTransport {
13+
export function getWebSocketTransportWithAutoping<TTransport extends IRpcWebSocketTransport>({
14+
intervalMs,
15+
transport,
16+
}: Config<TTransport>): TTransport {
1417
const pingableConnections = new Map<
1518
Awaited<ReturnType<IRpcWebSocketTransport>>,
1619
Awaited<ReturnType<IRpcWebSocketTransport>>
1720
>();
18-
return async (...args) => {
21+
return (async (...args) => {
1922
const connection = await transport(...args);
2023
let intervalId: number | undefined;
2124
function sendPing() {
@@ -72,5 +75,5 @@ export function getWebSocketTransportWithAutoping({ intervalMs, transport }: Con
7275
}
7376
}
7477
return pingableConnections.get(connection)!;
75-
};
78+
}) as TTransport;
7679
}

packages/library/src/rpc-websocket-connection-sharding.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,24 @@ import type { IRpcWebSocketTransport } from '@solana/rpc-transport';
22

33
import { getCachedAbortableIterableFactory } from './cached-abortable-iterable';
44

5-
type Config = Readonly<{
5+
type Config<TTransport extends IRpcWebSocketTransport> = Readonly<{
66
/**
77
* You might like to open more subscriptions per connection than your RPC provider allows for.
88
* Using the initial payload as input, return a shard key from this method to assign
99
* subscriptions to separate connections. One socket will be opened per shard key.
1010
*/
1111
getShard?: (payload: unknown) => string | symbol;
12-
transport: IRpcWebSocketTransport;
12+
transport: TTransport;
1313
}>;
1414

1515
const NULL_SHARD_CACHE_KEY = Symbol(
1616
__DEV__ ? 'Cache key to use when there is no connection sharding strategy' : undefined,
1717
);
1818

19-
export function getWebSocketTransportWithConnectionSharding({ getShard, transport }: Config): IRpcWebSocketTransport {
19+
export function getWebSocketTransportWithConnectionSharding<TTransport extends IRpcWebSocketTransport>({
20+
getShard,
21+
transport,
22+
}: Config<TTransport>): TTransport {
2023
return getCachedAbortableIterableFactory({
2124
getAbortSignalFromInputArgs: ({ signal }) => signal,
2225
getCacheEntryMissingError(shardKey) {
@@ -30,5 +33,5 @@ export function getWebSocketTransportWithConnectionSharding({ getShard, transpor
3033
...config,
3134
signal: abortSignal,
3235
}),
33-
});
36+
}) as TTransport;
3437
}

packages/library/src/rpc-websocket-transport.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import { pipe } from '@solana/functional';
2-
import { createWebSocketTransport, type IRpcWebSocketTransport } from '@solana/rpc-transport';
2+
import { createWebSocketTransport } from '@solana/rpc-transport';
3+
import { ClusterUrl } from '@solana/rpc-types';
34

45
import { getWebSocketTransportWithAutoping } from './rpc-websocket-autopinger';
56
import { getWebSocketTransportWithConnectionSharding } from './rpc-websocket-connection-sharding';
67

7-
export function createDefaultRpcSubscriptionsTransport(
8-
config: Omit<Parameters<typeof createWebSocketTransport>[0], 'sendBufferHighWatermark'> & {
8+
type Config<TClusterUrl extends ClusterUrl> = Readonly<{
9+
url: TClusterUrl;
10+
}>;
11+
12+
export function createDefaultRpcSubscriptionsTransport<TClusterUrl extends ClusterUrl>(
13+
config: Config<TClusterUrl> & {
914
/**
1015
* You might like to open more subscriptions per connection than your RPC provider allows
1116
* for. Using the initial payload as input, return a shard key from this method to assign
@@ -15,10 +20,10 @@ export function createDefaultRpcSubscriptionsTransport(
1520
intervalMs?: number;
1621
sendBufferHighWatermark?: number;
1722
},
18-
): IRpcWebSocketTransport {
23+
) {
1924
const { getShard, intervalMs, ...rest } = config;
2025
return pipe(
21-
createWebSocketTransport({
26+
createWebSocketTransport<TClusterUrl>({
2227
...rest,
2328
sendBufferHighWatermark:
2429
config.sendBufferHighWatermark ??

packages/library/src/rpc.ts

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,16 @@ import {
77
SolanaRpcSubscriptions,
88
SolanaRpcSubscriptionsUnstable,
99
} from '@solana/rpc-core';
10-
import { createJsonRpc, createJsonSubscriptionRpc, IRpcTransport, type RpcFromTransport } from '@solana/rpc-transport';
11-
import { IRpcTransportWithCluster } from '@solana/rpc-transport/dist/types/transports/transport-types';
12-
import type { RpcSubscriptions } from '@solana/rpc-types';
10+
import {
11+
createJsonRpc,
12+
createJsonSubscriptionRpc,
13+
IRpcTransport,
14+
IRpcTransportWithCluster,
15+
IRpcWebSocketTransport,
16+
IRpcWebSocketTransportWithCluster,
17+
RpcFromTransport,
18+
RpcSubscriptionsFromTransport,
19+
} from '@solana/rpc-transport';
1320
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
1421
// @ts-ignore
1522
import fastStableStringify from 'fast-stable-stringify';
@@ -31,9 +38,13 @@ export function createSolanaRpc<TTransport extends IRpcTransport | IRpcTransport
3138
}) as RpcFromTransport<SolanaRpcMethodsFromTransport<TTransport>, TTransport>;
3239
}
3340

34-
export function createSolanaRpcSubscriptions(
35-
config: Omit<Parameters<typeof createJsonSubscriptionRpc>[0], 'api'>,
36-
): RpcSubscriptions<SolanaRpcSubscriptions> {
41+
type RpcSubscriptionsConfig<TTransport extends IRpcWebSocketTransport | IRpcWebSocketTransportWithCluster> = Readonly<{
42+
transport: TTransport;
43+
}>;
44+
45+
export function createSolanaRpcSubscriptions<
46+
TTransport extends IRpcWebSocketTransport | IRpcWebSocketTransportWithCluster,
47+
>(config: RpcSubscriptionsConfig<TTransport>): RpcSubscriptionsFromTransport<SolanaRpcSubscriptions, TTransport> {
3748
return pipe(
3849
createJsonSubscriptionRpc({
3950
...config,
@@ -44,14 +55,16 @@ export function createSolanaRpcSubscriptions(
4455
getDeduplicationKey: (...args) => fastStableStringify(args),
4556
rpcSubscriptions,
4657
}),
47-
);
58+
) as RpcSubscriptionsFromTransport<SolanaRpcSubscriptions, TTransport>;
4859
}
4960

50-
export function createSolanaRpcSubscriptions_UNSTABLE(
51-
config: Omit<Parameters<typeof createJsonSubscriptionRpc>[0], 'api'>,
52-
): RpcSubscriptions<SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable> {
61+
export function createSolanaRpcSubscriptions_UNSTABLE<
62+
TTransport extends IRpcWebSocketTransport | IRpcWebSocketTransportWithCluster,
63+
>(
64+
config: RpcSubscriptionsConfig<TTransport>,
65+
): RpcSubscriptionsFromTransport<SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable, TTransport> {
5366
return createJsonSubscriptionRpc({
5467
...config,
5568
api: createSolanaRpcSubscriptionsApi_UNSTABLE(DEFAULT_RPC_CONFIG),
56-
});
69+
}) as RpcSubscriptionsFromTransport<SolanaRpcSubscriptions & SolanaRpcSubscriptionsUnstable, TTransport>;
5770
}

0 commit comments

Comments
 (0)