Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bigquery/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START bigquery_quickstart]
// Imports and instantiates the Google Cloud client library
// for Google BigQuery
const bigquery = require('@google-cloud/bigquery')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
1 change: 0 additions & 1 deletion datastore/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START datastore_quickstart]
// Imports and instantiates the Google Cloud client library
// for Google Cloud Datastore
const datastore = require('@google-cloud/datastore')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
1 change: 0 additions & 1 deletion logging/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START logging_quickstart]
// Imports and instantiates the Google Cloud client library
// for Stackdriver Logging
const logging = require('@google-cloud/logging')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
1 change: 1 addition & 0 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"yargs": "^5.0.0"
},
"devDependencies": {
"async": "^2.0.1",
"mocha": "^3.0.2",
"node-uuid": "^1.4.7"
},
Expand Down
1 change: 0 additions & 1 deletion pubsub/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START pubsub_quickstart]
// Imports and instantiates the Google Cloud client library
// for Google Cloud Pub/Sub
const pubsub = require('@google-cloud/pubsub')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
108 changes: 100 additions & 8 deletions pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@

'use strict';

const pubsubClient = require(`@google-cloud/pubsub`)();
const PubSub = require(`@google-cloud/pubsub`);

// [START pubsub_list_subscriptions]
function listSubscriptions (callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// Lists all subscriptions in the current project
pubsubClient.getSubscriptions((err, subscriptions) => {
if (err) {
Expand All @@ -43,6 +46,9 @@ function listSubscriptions (callback) {

// [START pubsub_list_topic_subscriptions]
function listTopicSubscriptions (topicName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsubClient.topic(topicName);

Expand All @@ -62,6 +68,9 @@ function listTopicSubscriptions (topicName, callback) {

// [START pubsub_create_subscription]
function createSubscription (topicName, subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsubClient.topic(topicName);

Expand All @@ -80,16 +89,18 @@ function createSubscription (topicName, subscriptionName, callback) {

// [START pubsub_create_push_subscription]
function createPushSubscription (topicName, subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsubClient.topic(topicName);
const projectId = process.env.GCLOUD_PROJECT || 'YOU_PROJECT_ID';

// Creates a new push subscription, e.g. "my-new-subscription"
topic.subscribe(subscriptionName, {
pushConfig: {
// Set to an HTTPS endpoint of your choice. If necessary, register
// (authorize) the domain on which the server is hosted.
pushEndpoint: `https://${projectId}.appspot.com/push`
pushEndpoint: `https://${pubsubClient.projectId}.appspot.com/push`
}
}, (err, subscription) => {
if (err) {
Expand All @@ -105,6 +116,9 @@ function createPushSubscription (topicName, subscriptionName, callback) {

// [START pubsub_delete_subscription]
function deleteSubscription (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -121,8 +135,11 @@ function deleteSubscription (subscriptionName, callback) {
}
// [END pubsub_delete_subscription]

// [START pubsub_get_subscription_metadata]
function getSubscriptionMetadata (subscriptionName, callback) {
// [START pubsub_get_subscription]
function getSubscription (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -140,10 +157,13 @@ function getSubscriptionMetadata (subscriptionName, callback) {
callback();
});
}
// [END pubsub_get_subscription_metadata]
// [END pubsub_get_subscription]

// [START pubsub_pull_messages]
function pullMessages (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -168,8 +188,73 @@ function pullMessages (subscriptionName, callback) {
}
// [END pubsub_pull_messages]

let subscribeCounterValue = 1;

function getSubscribeCounterValue () {
return subscribeCounterValue;
}

function setSubscribeCounterValue (value) {
subscribeCounterValue = value;
}

// [START pubsub_pull_ordered_messages]
const outstandingMessages = {};

function pullOrderedMessages (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

// Pulls messages. Set returnImmediately to false to block until messages are
// received.
subscription.pull({ returnImmediately: true }, (err, messages) => {
if (err) {
callback(err);
return;
}

// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
messages.forEach((message) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe include a comment that mentions that the (re-)ordering is being done by JS, and not by the API itself?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

outstandingMessages[message.attributes.counterId] = message;
});

const outstandingIds = Object.keys(outstandingMessages).map((counterId) => +counterId);
outstandingIds.sort();

outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[counterId];

if (counterId < counter) {
// The message has already been processed
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data, message.attributes);

setSubscribeCounterValue(counterId + 1);
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
}
});
callback();
});
}
// [END pubsub_pull_ordered_messages]

// [START pubsub_get_subscription_policy]
function getSubscriptionPolicy (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -188,6 +273,9 @@ function getSubscriptionPolicy (subscriptionName, callback) {

// [START pubsub_set_subscription_policy]
function setSubscriptionPolicy (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand Down Expand Up @@ -222,6 +310,9 @@ function setSubscriptionPolicy (subscriptionName, callback) {

// [START pubsub_test_subscription_permissions]
function testSubscriptionPermissions (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand Down Expand Up @@ -253,8 +344,9 @@ const program = module.exports = {
createSubscription: createSubscription,
createPushSubscription: createPushSubscription,
deleteSubscription: deleteSubscription,
getSubscriptionMetadata: getSubscriptionMetadata,
getSubscription: getSubscription,
pullMessages: pullMessages,
pullOrderedMessages: pullOrderedMessages,
getSubscriptionPolicy: getSubscriptionPolicy,
setSubscriptionPolicy: setSubscriptionPolicy,
testSubscriptionPermissions: testSubscriptionPermissions,
Expand Down Expand Up @@ -283,7 +375,7 @@ cli
program.deleteSubscription(options.subscriptionName, makeHandler(false));
})
.command(`get <subscriptionName>`, `Gets the metadata for a subscription.`, {}, (options) => {
program.getSubscriptionMetadata(options.subscriptionName, makeHandler(false));
program.getSubscription(options.subscriptionName, makeHandler(false));
})
.command(`pull <subscriptionName>`, `Pulls messages for a subscription.`, {}, (options) => {
program.pullMessages(options.subscriptionName, makeHandler(false));
Expand Down
45 changes: 44 additions & 1 deletion pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

'use strict';

const async = require(`async`);
const pubsub = require(`@google-cloud/pubsub`)();
const uuid = require(`node-uuid`);
const path = require(`path`);
Expand Down Expand Up @@ -108,10 +109,52 @@ describe(`pubsub:subscriptions`, () => {
`* ${messageIds[0]} "${expected}" {}`;
assert.equal(output, expectedOutput);
done();
}, 5000);
}, 2000);
});
});

it(`should pull ordered messages`, (done) => {
const subscriptions = require('../subscriptions');
const expected = `Hello, world!`;
const publishedMessageIds = [];

async.waterfall([
(cb) => {
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '3' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 0);
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 1);
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
},
(messageIds, apiResponse, cb) => {
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '2' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 3);
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
cb();
}
], done);
});

it(`should set the IAM policy for a subscription`, (done) => {
run(`${cmd} set-policy ${subscriptionNameOne}`, cwd);
pubsub.subscription(subscriptionNameOne).iam.getPolicy((err, policy) => {
Expand Down
Loading