Skip to content

Commit fbd7359

Browse files
samples: add samples for Cloud Storage ingestion, and a few small fixes (#1985)
* build: fix a small issue with the typeless bot invocation * samples: add samples for cloud storage ingestion * samples: fix a paste-o in Kinesis sample * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * samples: fix paste-o with incorrect sample function name * tests: fix sample test quotes --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 7019003 commit fbd7359

8 files changed

+297
-13
lines changed

README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ npm install @google-cloud/pubsub
6666

6767
```javascript
6868
// Imports the Google Cloud client library
69-
const { PubSub } = require("@google-cloud/pubsub");
69+
const {PubSub} = require('@google-cloud/pubsub');
7070

7171
async function quickstart(
72-
projectId = 'your-project-id', // Your Google Cloud Platform project ID
73-
topicNameOrId = 'my-topic', // Name for the new topic to create
74-
subscriptionName = 'my-sub' // Name for the new subscription to create
72+
projectId = 'your-project-id', // Your Google Cloud Platform project ID
73+
topicNameOrId = 'my-topic', // Name for the new topic to create
74+
subscriptionName = 'my-sub' // Name for the new subscription to create
7575
) {
7676
// Instantiates a client
77-
const pubsub = new PubSub({ projectId });
77+
const pubsub = new PubSub({projectId});
7878

7979
// Creates a new topic
8080
const [topic] = await pubsub.createTopic(topicNameOrId);
@@ -84,19 +84,19 @@ subscriptionName = 'my-sub' // Name for the new subscription to create
8484
const [subscription] = await topic.createSubscription(subscriptionName);
8585

8686
// Receive callbacks for new messages on the subscription
87-
subscription.on('message', (message) => {
87+
subscription.on('message', message => {
8888
console.log('Received message:', message.data.toString());
8989
process.exit(0);
9090
});
9191

9292
// Receive callbacks for errors on the subscription
93-
subscription.on('error', (error) => {
93+
subscription.on('error', error => {
9494
console.error('Received error:', error);
9595
process.exit(1);
9696
});
9797

9898
// Send a message to the topic
99-
topic.publishMessage({ data: Buffer.from('Test message!') });
99+
topic.publishMessage({data: Buffer.from('Test message!')});
100100
}
101101

102102
```
@@ -138,6 +138,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
138138
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
139139
| Create Subscription With Retry Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithRetryPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithRetryPolicy.js,samples/README.md) |
140140
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
141+
| Create Topic With Cloud Storage Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md) |
141142
| Create Topic With Kinesis Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithKinesisIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithKinesisIngestion.js,samples/README.md) |
142143
| Create Topic With Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchema.js,samples/README.md) |
143144
| Create Topic With Schema Revisions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchemaRevisions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchemaRevisions.js,samples/README.md) |

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
"prelint": "cd samples; npm link ../; npm install",
4747
"precompile": "gts clean",
4848
"typeless": "npx typeless-sample-bot --outputpath samples --targets samples --recursive",
49-
"posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix"
49+
"posttypeless": "cd samples && npm i && cd .. && npx eslint --ignore-pattern owl-bot-staging --fix samples"
5050
},
5151
"dependencies": {
5252
"@google-cloud/paginator": "^5.0.0",

samples/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ guides.
3535
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
3636
* [Create Subscription With Retry Policy](#create-subscription-with-retry-policy)
3737
* [Create Topic](#create-topic)
38+
* [Create Topic With Cloud Storage Ingestion](#create-topic-with-cloud-storage-ingestion)
3839
* [Create Topic With Kinesis Ingestion](#create-topic-with-kinesis-ingestion)
3940
* [Create Topic With Schema](#create-topic-with-schema)
4041
* [Create Topic With Schema Revisions](#create-topic-with-schema-revisions)
@@ -389,6 +390,25 @@ __Usage:__
389390

390391

391392

393+
### Create Topic With Cloud Storage Ingestion
394+
395+
Creates a new topic, with Cloud Storage ingestion enabled.
396+
397+
View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithCloudStorageIngestion.js).
398+
399+
[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithCloudStorageIngestion.js,samples/README.md)
400+
401+
__Usage:__
402+
403+
404+
`node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>`
405+
406+
407+
-----
408+
409+
410+
411+
392412
### Create Topic With Kinesis Ingestion
393413

394414
Creates a new topic, with Kinesis ingestion enabled.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// This is a generated sample, using the typeless sample bot. Please
16+
// look for the source TypeScript sample (.ts) for modifications.
17+
'use strict';
18+
19+
/**
20+
* This sample demonstrates how to perform basic operations on topics with
21+
* the Google Cloud Pub/Sub API.
22+
*
23+
* For more information, see the README.md under /pubsub and the documentation
24+
* at https://cloud.google.com/pubsub/docs.
25+
*/
26+
27+
// sample-metadata:
28+
// title: Create Topic With Cloud Storage Ingestion
29+
// description: Creates a new topic, with Cloud Storage ingestion enabled.
30+
// usage: node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>
31+
32+
// [START pubsub_create_topic_with_cloud_storage_ingestion]
33+
/**
34+
* TODO(developer): Uncomment these variables before running the sample.
35+
*/
36+
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
37+
// const bucket = 'YOUR_BUCKET_NAME';
38+
// const inputFormat = 'text';
39+
// const textDelimiter = '\n';
40+
// const matchGlob = '**.txt';
41+
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
42+
43+
// Imports the Google Cloud client library
44+
const {PubSub} = require('@google-cloud/pubsub');
45+
46+
// Creates a client; cache this for further use
47+
const pubSubClient = new PubSub();
48+
49+
async function createTopicWithCloudStorageIngestion(
50+
topicNameOrId,
51+
bucket,
52+
inputFormat,
53+
textDelimiter,
54+
matchGlob,
55+
minimumObjectCreateTime
56+
) {
57+
const minimumDate = Date.parse(minimumObjectCreateTime);
58+
const topicMetadata = {
59+
name: topicNameOrId,
60+
ingestionDataSourceSettings: {
61+
cloudStorage: {
62+
bucket,
63+
minimumObjectCreateTime: {
64+
seconds: minimumDate / 1000,
65+
nanos: (minimumDate % 1000) * 1000,
66+
},
67+
matchGlob,
68+
},
69+
},
70+
};
71+
72+
// Make a format appropriately.
73+
switch (inputFormat) {
74+
case 'text':
75+
topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
76+
delimiter: textDelimiter,
77+
};
78+
break;
79+
case 'avro':
80+
topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
81+
break;
82+
case 'pubsub_avro':
83+
topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
84+
{};
85+
break;
86+
default:
87+
console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
88+
return;
89+
}
90+
91+
// Creates a new topic with Cloud Storage ingestion.
92+
await pubSubClient.createTopic(topicMetadata);
93+
console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
94+
}
95+
// [END pubsub_create_topic_with_cloud_storage_ingestion]
96+
97+
function main(
98+
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
99+
bucket = 'YOUR_BUCKET_NAME',
100+
inputFormat = 'text',
101+
textDelimiter = '\n',
102+
matchGlob = '**.txt',
103+
minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ'
104+
) {
105+
createTopicWithCloudStorageIngestion(
106+
topicNameOrId,
107+
bucket,
108+
inputFormat,
109+
textDelimiter,
110+
matchGlob,
111+
minimumObjectCreateTime
112+
).catch(err => {
113+
console.error(err.message);
114+
process.exitCode = 1;
115+
});
116+
}
117+
118+
main(...process.argv.slice(2));

samples/createTopicWithKinesisIngestion.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ async function createTopicWithKinesisIngestion(
5252
streamArn,
5353
consumerArn
5454
) {
55-
// Creates a new topic with a schema. Note that you might also
56-
// pass Encodings.Json or Encodings.Binary here.
55+
// Creates a new topic with Kinesis ingestion.
5756
await pubSubClient.createTopic({
5857
name: topicNameOrId,
5958
ingestionDataSourceSettings: {

samples/system-test/topics.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
import {Message, PubSub, Topic, Subscription} from '@google-cloud/pubsub';
16+
import {Bucket, Storage} from '@google-cloud/storage';
1617
import {assert} from 'chai';
1718
import {describe, it, after} from 'mocha';
1819
import {execSync, commandFor} from './common';
@@ -52,6 +53,17 @@ describe('topics', () => {
5253
return {t: topic, tname, s: sub};
5354
}
5455

56+
async function createStorageBucket(testName: string): Promise<Bucket> {
57+
const storage = new Storage({
58+
projectId,
59+
});
60+
61+
const name = resources.generateStorageName(testName);
62+
63+
const [bucket] = await storage.createBucket(name);
64+
return bucket;
65+
}
66+
5567
async function cleanSubs() {
5668
const [subscriptions] = await pubsub.getSubscriptions();
5769
await Promise.all(
@@ -121,6 +133,27 @@ describe('topics', () => {
121133
assert.ok(exists, 'Topic was created');
122134
});
123135

136+
it('should create a topic with cloud storage ingestion', async () => {
137+
const testId = 'create-gcs-ingestion';
138+
const name = topicName(testId);
139+
const bucket = await createStorageBucket(testId);
140+
const bucketName = bucket.name;
141+
142+
try {
143+
const output = execSync(
144+
`${commandFor('createTopicWithCloudStorageIngestion')} ${name} ${
145+
bucketName
146+
} text '\n' '**.txt' '2024-10-10T00:00:00Z'`
147+
);
148+
assert.include(output, `Topic ${name} created with Cloud Storage ingestion.`);
149+
const [topics] = await pubsub.getTopics();
150+
const exists = topics.some(t => t.name === fullTopicName(name));
151+
assert.ok(exists, 'Topic was created');
152+
} finally {
153+
await bucket.delete();
154+
}
155+
});
156+
124157
it('should update a topic with kinesis integration', async () => {
125158
const pair = await createPair('update-kinesis');
126159
const output = execSync(
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* This sample demonstrates how to perform basic operations on topics with
17+
* the Google Cloud Pub/Sub API.
18+
*
19+
* For more information, see the README.md under /pubsub and the documentation
20+
* at https://cloud.google.com/pubsub/docs.
21+
*/
22+
23+
// sample-metadata:
24+
// title: Create Topic With Cloud Storage Ingestion
25+
// description: Creates a new topic, with Cloud Storage ingestion enabled.
26+
// usage: node createTopicWithCloudStorageIngestion.js <topic-name> <bucket> <input-format> <text-delimiter> <match-glob> <minimum-object-creation-time>
27+
28+
// [START pubsub_create_topic_with_cloud_storage_ingestion]
29+
/**
30+
* TODO(developer): Uncomment these variables before running the sample.
31+
*/
32+
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
33+
// const bucket = 'YOUR_BUCKET_NAME';
34+
// const inputFormat = 'text';
35+
// const textDelimiter = '\n';
36+
// const matchGlob = '**.txt';
37+
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
38+
39+
// Imports the Google Cloud client library
40+
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
41+
42+
// Creates a client; cache this for further use
43+
const pubSubClient = new PubSub();
44+
45+
async function createTopicWithCloudStorageIngestion(
46+
topicNameOrId: string,
47+
bucket: string,
48+
inputFormat: string,
49+
textDelimiter: string,
50+
matchGlob: string,
51+
minimumObjectCreateTime: string
52+
) {
53+
const minimumDate = Date.parse(minimumObjectCreateTime);
54+
const topicMetadata: TopicMetadata = {
55+
name: topicNameOrId,
56+
ingestionDataSourceSettings: {
57+
cloudStorage: {
58+
bucket,
59+
minimumObjectCreateTime: {
60+
seconds: minimumDate / 1000,
61+
nanos: (minimumDate % 1000) * 1000,
62+
},
63+
matchGlob,
64+
},
65+
},
66+
};
67+
68+
// Make a format appropriately.
69+
switch (inputFormat) {
70+
case 'text':
71+
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
72+
delimiter: textDelimiter,
73+
};
74+
break;
75+
case 'avro':
76+
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
77+
break;
78+
case 'pubsub_avro':
79+
topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
80+
{};
81+
break;
82+
default:
83+
console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
84+
return;
85+
}
86+
87+
// Creates a new topic with Cloud Storage ingestion.
88+
await pubSubClient.createTopic(topicMetadata);
89+
console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
90+
}
91+
// [END pubsub_create_topic_with_cloud_storage_ingestion]
92+
93+
function main(
94+
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
95+
bucket = 'YOUR_BUCKET_NAME',
96+
inputFormat = 'text',
97+
textDelimiter = '\n',
98+
matchGlob = '**.txt',
99+
minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ'
100+
) {
101+
createTopicWithCloudStorageIngestion(
102+
topicNameOrId,
103+
bucket,
104+
inputFormat,
105+
textDelimiter,
106+
matchGlob,
107+
minimumObjectCreateTime
108+
).catch(err => {
109+
console.error(err.message);
110+
process.exitCode = 1;
111+
});
112+
}
113+
114+
main(...process.argv.slice(2));

0 commit comments

Comments
 (0)