Skip to content

Commit 6fca8cf

Browse files
author
Ace Nassri
authored
Update Pub/Sub samples (GoogleCloudPlatform#464)
* Update Pub/Sub samples WIP * Make tests a little nicer * Update package.json
1 parent 4495be7 commit 6fca8cf

6 files changed

Lines changed: 217 additions & 151 deletions

File tree

pubsub/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Commands:
8282
create-push <topicName> <subscriptionName> Creates a new push subscription.
8383
delete <subscriptionName> Deletes a subscription.
8484
get <subscriptionName> Gets the metadata for a subscription.
85-
pull <subscriptionName> Pulls messages for a subscription.
85+
listen <subscriptionName> Listens to messages for a subscription.
8686
get-policy <subscriptionName> Gets the IAM policy for a subscription.
8787
set-policy <subscriptionName> Sets the IAM policy for a subscription.
8888
test-permissions <subscriptionName> Tests the permissions for a subscription.

pubsub/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
"test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js"
1818
},
1919
"dependencies": {
20-
"@google-cloud/pubsub": "0.13.2",
20+
"@google-cloud/pubsub": "0.14.0",
2121
"yargs": "8.0.2"
2222
},
2323
"devDependencies": {
2424
"@google-cloud/nodejs-repo-tools": "1.4.17",
25-
"ava": "0.21.0",
25+
"ava": "0.22.0",
2626
"proxyquire": "1.8.0",
27-
"sinon": "3.2.0"
27+
"sinon": "3.2.1"
2828
},
2929
"cloud-repo-tools": {
3030
"requiresKeyFile": true,

pubsub/subscriptions.js

Lines changed: 77 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ function createSubscription (topicName, subscriptionName) {
7373
const topic = pubsub.topic(topicName);
7474

7575
// Creates a new subscription, e.g. "my-new-subscription"
76-
return topic.subscribe(subscriptionName)
76+
return topic.createSubscription(subscriptionName)
7777
.then((results) => {
7878
const subscription = results[0];
7979

@@ -101,7 +101,7 @@ function createPushSubscription (topicName, subscriptionName) {
101101
};
102102

103103
// Creates a new push subscription, e.g. "my-new-subscription"
104-
return topic.subscribe(subscriptionName, options)
104+
return topic.createSubscription(subscriptionName, options)
105105
.then((results) => {
106106
const subscription = results[0];
107107

@@ -151,32 +151,34 @@ function getSubscription (subscriptionName) {
151151
}
152152
// [END pubsub_get_subscription]
153153

154-
// [START pubsub_pull_messages]
155-
function pullMessages (subscriptionName) {
154+
// [START pubsub_listen_messages]
155+
function listenForMessages (subscriptionName, timeout) {
156156
// Instantiates a client
157157
const pubsub = PubSub();
158158

159159
// References an existing subscription, e.g. "my-subscription"
160160
const subscription = pubsub.subscription(subscriptionName);
161161

162-
// Pulls messages. Set returnImmediately to false to block until messages are
163-
// received.
164-
return subscription.pull()
165-
.then((results) => {
166-
const messages = results[0];
167-
168-
console.log(`Received ${messages.length} messages.`);
162+
// Create an event handler to handle messages
163+
let messageCount = 0;
164+
const messageHandler = (message) => {
165+
console.log(`Received message ${message.id}:`);
166+
console.log(`\tData: ${message.data}`);
167+
console.log(`\tAttributes: ${message.attributes}`);
168+
messageCount += 1;
169169

170-
messages.forEach((message) => {
171-
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
172-
});
170+
// "Ack" (acknowledge receipt of) the message
171+
message.ack();
172+
};
173173

174-
// Acknowledges received messages. If you do not acknowledge, Pub/Sub will
175-
// redeliver the message.
176-
return subscription.ack(messages.map((message) => message.ackId));
177-
});
174+
// Listen for new messages until timeout is hit
175+
subscription.on(`message`, messageHandler);
176+
setTimeout(() => {
177+
subscription.removeListener('message', messageHandler);
178+
console.log(`${messageCount} message(s) received.`);
179+
}, timeout * 1000);
178180
}
179-
// [END pubsub_pull_messages]
181+
// [END pubsub_listen_messages]
180182

181183
let subscribeCounterValue = 1;
182184

@@ -188,54 +190,61 @@ function setSubscribeCounterValue (value) {
188190
subscribeCounterValue = value;
189191
}
190192

191-
// [START pubsub_pull_ordered_messages]
193+
// [START pubsub_listen_ordered_messages]
192194
const outstandingMessages = {};
193195

194-
function pullOrderedMessages (subscriptionName) {
196+
function listenForOrderedMessages (subscriptionName, timeout) {
195197
// Instantiates a client
196198
const pubsub = PubSub();
197199

198200
// References an existing subscription, e.g. "my-subscription"
199201
const subscription = pubsub.subscription(subscriptionName);
200202

201-
// Pulls messages. Set returnImmediately to false to block until messages are
202-
// received.
203-
return subscription.pull()
204-
.then((results) => {
205-
const messages = results[0];
206-
207-
// Pub/Sub messages are unordered, so here we manually order messages by
208-
// their "counterId" attribute which was set when they were published.
209-
messages.forEach((message) => {
210-
outstandingMessages[message.attributes.counterId] = message;
211-
});
212-
213-
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
214-
outstandingIds.sort();
215-
216-
outstandingIds.forEach((counterId) => {
217-
const counter = getSubscribeCounterValue();
218-
const message = outstandingMessages[counterId];
219-
220-
if (counterId < counter) {
221-
// The message has already been processed
222-
subscription.ack(message.ackId);
223-
delete outstandingMessages[counterId];
224-
} else if (counterId === counter) {
225-
// Process the message
226-
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
227-
228-
setSubscribeCounterValue(counterId + 1);
229-
subscription.ack(message.ackId);
230-
delete outstandingMessages[counterId];
231-
} else {
232-
// Have not yet processed the message on which this message is dependent
233-
return false;
234-
}
235-
});
203+
// Create an event handler to handle messages
204+
const messageHandler = function (message) {
205+
// Buffer the message in an object (for later ordering)
206+
outstandingMessages[message.attributes.counterId] = message;
207+
208+
// "Ack" (acknowledge receipt of) the message
209+
message.ack();
210+
};
211+
212+
// Listen for new messages until timeout is hit
213+
return new Promise((resolve) => {
214+
subscription.on(`message`, messageHandler);
215+
setTimeout(() => {
216+
subscription.removeListener(`message`, messageHandler);
217+
resolve();
218+
}, timeout * 1000);
219+
})
220+
.then(() => {
221+
// Pub/Sub messages are unordered, so here we manually order messages by
222+
// their "counterId" attribute which was set when they were published.
223+
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
224+
outstandingIds.sort();
225+
226+
outstandingIds.forEach((counterId) => {
227+
const counter = getSubscribeCounterValue();
228+
const message = outstandingMessages[counterId];
229+
230+
if (counterId < counter) {
231+
// The message has already been processed
232+
message.ack();
233+
delete outstandingMessages[counterId];
234+
} else if (counterId === counter) {
235+
// Process the message
236+
console.log(`* %d %j %j`, message.id, message.data.toString(), message.attributes);
237+
setSubscribeCounterValue(counterId + 1);
238+
message.ack();
239+
delete outstandingMessages[counterId];
240+
} else {
241+
// Have not yet processed the message on which this message is dependent
242+
return false;
243+
}
236244
});
245+
});
237246
}
238-
// [END pubsub_pull_ordered_messages]
247+
// [END pubsub_listen_ordered_messages]
239248

240249
// [START pubsub_get_subscription_policy]
241250
function getSubscriptionPolicy (subscriptionName) {
@@ -318,7 +327,7 @@ function testSubscriptionPermissions (subscriptionName) {
318327
}
319328
// [END pubsub_test_subscription_permissions]
320329

321-
module.exports = { pullOrderedMessages };
330+
module.exports = { listenForOrderedMessages };
322331

323332
const cli = require(`yargs`)
324333
.demand(1)
@@ -359,10 +368,16 @@ const cli = require(`yargs`)
359368
(opts) => getSubscription(opts.subscriptionName)
360369
)
361370
.command(
362-
`pull <subscriptionName>`,
363-
`Pulls messages for a subscription.`,
364-
{},
365-
(opts) => pullMessages(opts.subscriptionName)
371+
`listen <subscriptionName>`,
372+
`Listens to messages for a subscription.`,
373+
{
374+
timeout: {
375+
alias: 't',
376+
type: 'number',
377+
default: 10
378+
}
379+
},
380+
(opts) => listenForMessages(opts.subscriptionName, opts.timeout)
366381
)
367382
.command(
368383
`get-policy <subscriptionName>`,

pubsub/system-test/subscriptions.test.js

Lines changed: 53 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,22 @@ test.beforeEach(tools.stubConsole);
6363
test.afterEach.always(tools.restoreConsole);
6464

6565
test.serial(`should create a subscription`, async (t) => {
66+
t.plan(1);
6667
const output = await tools.runAsync(`${cmd} create ${topicNameOne} ${subscriptionNameOne}`, cwd);
6768
t.is(output, `Subscription ${fullSubscriptionNameOne} created.`);
68-
const results = await pubsub.subscription(subscriptionNameOne).exists();
69-
t.true(results[0]);
69+
await tools.tryTest(async (assert) => {
70+
const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions();
71+
assert.equal(subscriptions[0].name, fullSubscriptionNameOne);
72+
}).start();
7073
});
7174

7275
test.serial(`should create a push subscription`, async (t) => {
7376
const output = await tools.runAsync(`${cmd} create-push ${topicNameOne} ${subscriptionNameTwo}`, cwd);
7477
t.is(output, `Subscription ${fullSubscriptionNameTwo} created.`);
75-
const results = await pubsub.subscription(subscriptionNameTwo).exists();
76-
t.true(results[0]);
78+
await tools.tryTest(async (assert) => {
79+
const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions();
80+
assert(subscriptions.some((s) => s.name === fullSubscriptionNameTwo));
81+
}).start();
7782
});
7883

7984
test.serial(`should get metadata for a subscription`, async (t) => {
@@ -86,52 +91,60 @@ test.serial(`should get metadata for a subscription`, async (t) => {
8691
});
8792

8893
test.serial(`should list all subscriptions`, async (t) => {
89-
await tools.tryTest(async () => {
94+
t.plan(0);
95+
await tools.tryTest(async (assert) => {
9096
const output = await tools.runAsync(`${cmd} list`, cwd);
91-
t.true(output.includes(`Subscriptions:`));
92-
t.true(output.includes(fullSubscriptionNameOne));
93-
t.true(output.includes(fullSubscriptionNameTwo));
97+
assert(output.includes(`Subscriptions:`));
98+
assert(output.includes(fullSubscriptionNameOne));
99+
assert(output.includes(fullSubscriptionNameTwo));
94100
}).start();
95101
});
96102

97103
test.serial(`should list subscriptions for a topic`, async (t) => {
98-
const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd);
99-
t.true(output.includes(`Subscriptions for ${topicNameOne}:`));
100-
t.true(output.includes(fullSubscriptionNameOne));
101-
t.true(output.includes(fullSubscriptionNameTwo));
104+
t.plan(0);
105+
await tools.tryTest(async (assert) => {
106+
const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd);
107+
assert(output.includes(`Subscriptions for ${topicNameOne}:`));
108+
assert(output.includes(fullSubscriptionNameOne));
109+
assert(output.includes(fullSubscriptionNameTwo));
110+
}).start();
102111
});
103112

104-
test.serial(`should pull messages`, async (t) => {
113+
test.serial(`should listen for messages`, async (t) => {
105114
const expected = `Hello, world!`;
106-
const results = await pubsub.topic(topicNameOne).publish(expected);
107-
const messageIds = results[0];
108-
const expectedOutput = `Received ${messageIds.length} messages.\n* ${messageIds[0]} "${expected}" {}`;
109-
const output = await tools.runAsync(`${cmd} pull ${subscriptionNameOne}`, cwd);
110-
t.is(output, expectedOutput);
115+
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected));
116+
const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd);
117+
t.true(output.includes(`Received message ${messageIds[0]}:`));
111118
});
112119

113-
test.serial(`should pull ordered messages`, async (t) => {
120+
test.serial(`should listen for ordered messages`, async (t) => {
121+
const timeout = 5;
114122
const subscriptions = require('../subscriptions');
115123
const expected = `Hello, world!`;
124+
const expectedBuffer = Buffer.from(expected);
116125
const publishedMessageIds = [];
117-
await pubsub.topic(topicNameTwo).subscribe(subscriptionNameThree);
118-
let results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '3' } }, { raw: true });
119-
publishedMessageIds.push(results[0][0]);
120-
await subscriptions.pullOrderedMessages(subscriptionNameThree);
126+
const publisherTwo = pubsub.topic(topicNameTwo).publisher();
127+
128+
await pubsub.topic(topicNameTwo).createSubscription(subscriptionNameThree);
129+
let [result] = await publisherTwo.publish(expectedBuffer, { counterId: '3' });
130+
publishedMessageIds.push(result);
131+
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
121132
t.is(console.log.callCount, 0);
122-
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true });
123-
publishedMessageIds.push(results[0][0]);
124-
await subscriptions.pullOrderedMessages(subscriptionNameThree);
133+
134+
[result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' });
135+
publishedMessageIds.push(result);
136+
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
125137
t.is(console.log.callCount, 1);
126138
t.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
127-
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true });
128-
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '2' } }, { raw: true });
129-
publishedMessageIds.push(results[0][0]);
130-
await tools.tryTest(async () => {
131-
await subscriptions.pullOrderedMessages(subscriptionNameThree);
132-
t.is(console.log.callCount, 3);
133-
t.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
134-
t.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
139+
140+
[result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' });
141+
[result] = await publisherTwo.publish(expectedBuffer, { counterId: '2' });
142+
publishedMessageIds.push(result);
143+
await tools.tryTest(async (assert) => {
144+
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
145+
assert.equal(console.log.callCount, 3);
146+
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
147+
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
135148
});
136149
});
137150

@@ -163,8 +176,12 @@ test.serial(`should test permissions for a subscription`, async (t) => {
163176
});
164177

165178
test.serial(`should delete a subscription`, async (t) => {
179+
t.plan(1);
166180
const output = await tools.runAsync(`${cmd} delete ${subscriptionNameOne}`, cwd);
167181
t.is(output, `Subscription ${fullSubscriptionNameOne} deleted.`);
168-
const results = await pubsub.subscription(subscriptionNameOne).exists();
169-
t.false(results[0], false);
182+
await tools.tryTest(async (assert) => {
183+
const [subscriptions] = await pubsub.getSubscriptions();
184+
assert.ok(subscriptions);
185+
assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne));
186+
}).start();
170187
});

0 commit comments

Comments
 (0)