diff --git a/bigquery/quickstart.js b/bigquery/quickstart.js index af2dac581f..dae5b0c60f 100644 --- a/bigquery/quickstart.js +++ b/bigquery/quickstart.js @@ -15,7 +15,6 @@ // [START bigquery_quickstart] // Imports and instantiates the Google Cloud client library -// for Google BigQuery const bigquery = require('@google-cloud/bigquery')({ projectId: 'YOUR_PROJECT_ID' }); diff --git a/datastore/quickstart.js b/datastore/quickstart.js index 05feb6dc0a..60a2c40c04 100644 --- a/datastore/quickstart.js +++ b/datastore/quickstart.js @@ -15,7 +15,6 @@ // [START datastore_quickstart] // Imports and instantiates the Google Cloud client library -// for Google Cloud Datastore const datastore = require('@google-cloud/datastore')({ projectId: 'YOUR_PROJECT_ID' }); diff --git a/logging/quickstart.js b/logging/quickstart.js index 0257750986..7bf1c8da79 100644 --- a/logging/quickstart.js +++ b/logging/quickstart.js @@ -15,7 +15,6 @@ // [START logging_quickstart] // Imports and instantiates the Google Cloud client library -// for Stackdriver Logging const logging = require('@google-cloud/logging')({ projectId: 'YOUR_PROJECT_ID' }); diff --git a/pubsub/package.json b/pubsub/package.json index da04a8f322..af9187367d 100644 --- a/pubsub/package.json +++ b/pubsub/package.json @@ -13,6 +13,7 @@ "yargs": "^5.0.0" }, "devDependencies": { + "async": "^2.0.1", "mocha": "^3.0.2", "node-uuid": "^1.4.7" }, diff --git a/pubsub/quickstart.js b/pubsub/quickstart.js index c6fd49246b..d8f72ef489 100644 --- a/pubsub/quickstart.js +++ b/pubsub/quickstart.js @@ -15,7 +15,6 @@ // [START pubsub_quickstart] // Imports and instantiates the Google Cloud client library -// for Google Cloud Pub/Sub const pubsub = require('@google-cloud/pubsub')({ projectId: 'YOUR_PROJECT_ID' }); diff --git a/pubsub/subscriptions.js b/pubsub/subscriptions.js index b4f4c97d45..7bd79e643e 100644 --- a/pubsub/subscriptions.js +++ b/pubsub/subscriptions.js @@ -23,10 +23,13 @@ 'use strict'; -const pubsubClient = require(`@google-cloud/pubsub`)(); +const PubSub = require(`@google-cloud/pubsub`); // [START pubsub_list_subscriptions] function listSubscriptions (callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // Lists all subscriptions in the current project pubsubClient.getSubscriptions((err, subscriptions) => { if (err) { @@ -43,6 +46,9 @@ function listSubscriptions (callback) { // [START pubsub_list_topic_subscriptions] function listTopicSubscriptions (topicName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -62,6 +68,9 @@ function listTopicSubscriptions (topicName, callback) { // [START pubsub_create_subscription] function createSubscription (topicName, subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -80,16 +89,18 @@ function createSubscription (topicName, subscriptionName, callback) { // [START pubsub_create_push_subscription] function createPushSubscription (topicName, subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); - const projectId = process.env.GCLOUD_PROJECT || 'YOU_PROJECT_ID'; // Creates a new push subscription, e.g. "my-new-subscription" topic.subscribe(subscriptionName, { pushConfig: { // Set to an HTTPS endpoint of your choice. If necessary, register // (authorize) the domain on which the server is hosted. - pushEndpoint: `https://${projectId}.appspot.com/push` + pushEndpoint: `https://${pubsubClient.projectId}.appspot.com/push` } }, (err, subscription) => { if (err) { @@ -105,6 +116,9 @@ function createPushSubscription (topicName, subscriptionName, callback) { // [START pubsub_delete_subscription] function deleteSubscription (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing subscription, e.g. "my-subscription" const subscription = pubsubClient.subscription(subscriptionName); @@ -121,8 +135,11 @@ function deleteSubscription (subscriptionName, callback) { } // [END pubsub_delete_subscription] -// [START pubsub_get_subscription_metadata] -function getSubscriptionMetadata (subscriptionName, callback) { +// [START pubsub_get_subscription] +function getSubscription (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing subscription, e.g. "my-subscription" const subscription = pubsubClient.subscription(subscriptionName); @@ -140,10 +157,13 @@ function getSubscriptionMetadata (subscriptionName, callback) { callback(); }); } -// [END pubsub_get_subscription_metadata] +// [END pubsub_get_subscription] // [START pubsub_pull_messages] function pullMessages (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing subscription, e.g. "my-subscription" const subscription = pubsubClient.subscription(subscriptionName); @@ -168,8 +188,73 @@ function pullMessages (subscriptionName, callback) { } // [END pubsub_pull_messages] +let subscribeCounterValue = 1; + +function getSubscribeCounterValue () { + return subscribeCounterValue; +} + +function setSubscribeCounterValue (value) { + subscribeCounterValue = value; +} + +// [START pubsub_pull_ordered_messages] +const outstandingMessages = {}; + +function pullOrderedMessages (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + + // References an existing subscription, e.g. "my-subscription" + const subscription = pubsubClient.subscription(subscriptionName); + + // Pulls messages. Set returnImmediately to false to block until messages are + // received. + subscription.pull({ returnImmediately: true }, (err, messages) => { + if (err) { + callback(err); + return; + } + + // Pub/Sub messages are unordered, so here we manually order messages by + // their "counterId" attribute which was set when they were published. + messages.forEach((message) => { + outstandingMessages[message.attributes.counterId] = message; + }); + + const outstandingIds = Object.keys(outstandingMessages).map((counterId) => +counterId); + outstandingIds.sort(); + + outstandingIds.forEach((counterId) => { + const counter = getSubscribeCounterValue(); + const message = outstandingMessages[counterId]; + + if (counterId < counter) { + // The message has already been processed + subscription.ack(message.ackId); + delete outstandingMessages[counterId]; + } else if (counterId === counter) { + // Process the message + console.log(`* %d %j %j`, message.id, message.data, message.attributes); + + setSubscribeCounterValue(counterId + 1); + subscription.ack(message.ackId); + delete outstandingMessages[counterId]; + } else { + // Have not yet processed the message on which this message is dependent + return false; + } + }); + callback(); + }); +} +// [END pubsub_pull_ordered_messages] + // [START pubsub_get_subscription_policy] function getSubscriptionPolicy (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing subscription, e.g. "my-subscription" const subscription = pubsubClient.subscription(subscriptionName); @@ -188,6 +273,9 @@ function getSubscriptionPolicy (subscriptionName, callback) { // [START pubsub_set_subscription_policy] function setSubscriptionPolicy (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing subscription, e.g. "my-subscription" const subscription = pubsubClient.subscription(subscriptionName); @@ -222,6 +310,9 @@ function setSubscriptionPolicy (subscriptionName, callback) { // [START pubsub_test_subscription_permissions] function testSubscriptionPermissions (subscriptionName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing subscription, e.g. "my-subscription" const subscription = pubsubClient.subscription(subscriptionName); @@ -253,8 +344,9 @@ const program = module.exports = { createSubscription: createSubscription, createPushSubscription: createPushSubscription, deleteSubscription: deleteSubscription, - getSubscriptionMetadata: getSubscriptionMetadata, + getSubscription: getSubscription, pullMessages: pullMessages, + pullOrderedMessages: pullOrderedMessages, getSubscriptionPolicy: getSubscriptionPolicy, setSubscriptionPolicy: setSubscriptionPolicy, testSubscriptionPermissions: testSubscriptionPermissions, @@ -283,7 +375,7 @@ cli program.deleteSubscription(options.subscriptionName, makeHandler(false)); }) .command(`get `, `Gets the metadata for a subscription.`, {}, (options) => { - program.getSubscriptionMetadata(options.subscriptionName, makeHandler(false)); + program.getSubscription(options.subscriptionName, makeHandler(false)); }) .command(`pull `, `Pulls messages for a subscription.`, {}, (options) => { program.pullMessages(options.subscriptionName, makeHandler(false)); diff --git a/pubsub/system-test/subscriptions.test.js b/pubsub/system-test/subscriptions.test.js index bfe5b9150a..3a6d9d215a 100644 --- a/pubsub/system-test/subscriptions.test.js +++ b/pubsub/system-test/subscriptions.test.js @@ -13,6 +13,7 @@ 'use strict'; +const async = require(`async`); const pubsub = require(`@google-cloud/pubsub`)(); const uuid = require(`node-uuid`); const path = require(`path`); @@ -108,10 +109,52 @@ describe(`pubsub:subscriptions`, () => { `* ${messageIds[0]} "${expected}" {}`; assert.equal(output, expectedOutput); done(); - }, 5000); + }, 2000); }); }); + it(`should pull ordered messages`, (done) => { + const subscriptions = require('../subscriptions'); + const expected = `Hello, world!`; + const publishedMessageIds = []; + + async.waterfall([ + (cb) => { + pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '3' } }, cb); + }, + (messageIds, apiResponse, cb) => { + publishedMessageIds.push(messageIds[0]); + setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000); + }, + (cb) => { + assert.equal(console.log.callCount, 0); + pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb); + }, + (messageIds, apiResponse, cb) => { + publishedMessageIds.push(messageIds[0]); + setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000); + }, + (cb) => { + assert.equal(console.log.callCount, 1); + assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]); + pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb); + }, + (messageIds, apiResponse, cb) => { + pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '2' } }, cb); + }, + (messageIds, apiResponse, cb) => { + publishedMessageIds.push(messageIds[0]); + setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000); + }, + (cb) => { + assert.equal(console.log.callCount, 3); + assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]); + assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]); + cb(); + } + ], done); + }); + it(`should set the IAM policy for a subscription`, (done) => { run(`${cmd} set-policy ${subscriptionNameOne}`, cwd); pubsub.subscription(subscriptionNameOne).iam.getPolicy((err, policy) => { diff --git a/pubsub/system-test/topics.test.js b/pubsub/system-test/topics.test.js index d84c701f60..a2cd561b6c 100644 --- a/pubsub/system-test/topics.test.js +++ b/pubsub/system-test/topics.test.js @@ -13,6 +13,7 @@ 'use strict'; +const async = require(`async`); const pubsub = require(`@google-cloud/pubsub`)(); const uuid = require(`node-uuid`); const path = require(`path`); @@ -58,29 +59,66 @@ describe(`pubsub:topics`, () => { }); it(`should publish a simple message`, (done) => { - pubsub.topic(topicName).subscribe(subscriptionName, (err, subscription) => { - assert.ifError(err); - run(`${cmd} publish ${topicName} "${message.data}"`, cwd); - subscription.pull((err, messages) => { - assert.ifError(err); - console.log(JSON.stringify(messages, null, 2)); + async.waterfall([ + (cb) => { + pubsub.topic(topicName).subscribe(subscriptionName, cb); + }, + (subscription, apiResponse, cb) => { + run(`${cmd} publish ${topicName} "${message.data}"`, cwd); + setTimeout(() => subscription.pull(cb), 2000); + }, + (messages, apiResponse, cb) => { assert.equal(messages[0].data, message.data); - done(); - }); - }); + cb(); + } + ], done); }); it(`should publish a JSON message`, (done) => { - pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => { - assert.ifError(err); - run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd); - subscription.pull((err, messages) => { - assert.ifError(err); - console.log(JSON.stringify(messages, null, 2)); + async.waterfall([ + (cb) => { + pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, cb); + }, + (subscription, apiResponse, cb) => { + run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd); + setTimeout(() => subscription.pull(cb), 2000); + }, + (messages, apiResponse, cb) => { assert.deepEqual(messages[0].data, message); - done(); - }); - }); + cb(); + } + ], done); + }); + + it(`should publish ordered messages`, (done) => { + const topics = require('../topics'); + let subscription; + + async.waterfall([ + (cb) => { + pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, cb); + }, + (_subscription, apiResponse, cb) => { + subscription = _subscription; + topics.publishOrderedMessage(topicName, message.data, cb); + }, + (cb) => { + setTimeout(() => subscription.pull(cb), 2000); + }, + (messages, apiResponse, cb) => { + assert.equal(messages[0].data, message.data); + assert.equal(messages[0].attributes.counterId, '1'); + topics.publishOrderedMessage(topicName, message.data, cb); + }, + (cb) => { + setTimeout(() => subscription.pull(cb), 2000); + }, + (messages, apiResponse, cb) => { + assert.equal(messages[0].data, message.data); + assert.equal(messages[0].attributes.counterId, '2'); + topics.publishOrderedMessage(topicName, message.data, cb); + } + ], done); }); it(`should set the IAM policy for a topic`, (done) => { diff --git a/pubsub/test/subscriptions.test.js b/pubsub/test/subscriptions.test.js index 982cd13d9b..b675b27293 100644 --- a/pubsub/test/subscriptions.test.js +++ b/pubsub/test/subscriptions.test.js @@ -48,15 +48,16 @@ describe(`pubsub:subscriptions`, () => { program.createSubscription(topicName, subscriptionName, callback); program.createPushSubscription(topicName, subscriptionName, callback); program.deleteSubscription(subscriptionName, callback); - program.getSubscriptionMetadata(subscriptionName, callback); + program.getSubscription(subscriptionName, callback); program.listSubscriptions(callback); program.listTopicSubscriptions(topicName, callback); program.pullMessages(subscriptionName, callback); + program.pullOrderedMessages(subscriptionName, callback); program.getSubscriptionPolicy(subscriptionName, callback); program.setSubscriptionPolicy(subscriptionName, callback); program.testSubscriptionPermissions(subscriptionName, callback); - assert.equal(callback.callCount, 10); + assert.equal(callback.callCount, 11); assert.equal(callback.alwaysCalledWithExactly(error), true); }); }); diff --git a/pubsub/test/topics.test.js b/pubsub/test/topics.test.js index 7e21d79dc5..22f6ab70dd 100644 --- a/pubsub/test/topics.test.js +++ b/pubsub/test/topics.test.js @@ -42,12 +42,13 @@ describe(`pubsub:topics`, () => { program.createTopic(topicName, callback); program.deleteTopic(topicName, callback); program.publishMessage(topicName, {}, callback); + program.publishOrderedMessage(topicName, {}, callback); program.listTopics(callback); program.getTopicPolicy(topicName, callback); program.setTopicPolicy(topicName, callback); program.testTopicPermissions(topicName, callback); - assert.equal(callback.callCount, 7); + assert.equal(callback.callCount, 8); assert.equal(callback.alwaysCalledWithExactly(error), true); }); }); diff --git a/pubsub/topics.js b/pubsub/topics.js index 6380ca7ccd..ef9a0c7ec2 100644 --- a/pubsub/topics.js +++ b/pubsub/topics.js @@ -23,10 +23,13 @@ 'use strict'; -const pubsubClient = require(`@google-cloud/pubsub`)(); +const PubSub = require(`@google-cloud/pubsub`); // [START pubsub_list_topics] function listTopics (callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // Lists all topics in the current project pubsubClient.getTopics((err, topics) => { if (err) { @@ -43,6 +46,9 @@ function listTopics (callback) { // [START pubsub_create_topic] function createTopic (topicName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // Creates a new topic, e.g. "my-new-topic" pubsubClient.createTopic(topicName, (err, topic) => { if (err) { @@ -58,6 +64,9 @@ function createTopic (topicName, callback) { // [START pubsub_delete_topic] function deleteTopic (topicName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -76,6 +85,9 @@ function deleteTopic (topicName, callback) { // [START pubsub_publish_message] function publishMessage (topicName, data, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -105,8 +117,63 @@ function publishMessage (topicName, data, callback) { } // [END pubsub_publish_message] +let publishCounterValue = 1; + +function getPublishCounterValue () { + return publishCounterValue; +} + +function setPublishCounterValue (value) { + publishCounterValue = value; +} + +// [START pubsub_publish_ordered_message] +function publishOrderedMessage (topicName, data, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + + // References an existing topic, e.g. "my-topic" + const topic = pubsubClient.topic(topicName); + + /** + * In Node.js, a PubSub message requires a "data" property, which can have a + * string or an object as its value. An optional "attributes" property can be + * an object of key/value pairs, where the keys and values are both strings. + * See https://cloud.google.com/pubsub/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage + * + * Topic#publish() takes either a single message object or an array of message + * objects. See https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/latest/pubsub/topic?method=publish + */ + const message = { + data: data, + + // Pub/Sub messages are unordered, so assign an order id to the message to + // manually order messages + attributes: { + counterId: '' + getPublishCounterValue() + } + }; + + topic.publish(message, (err, messageIds) => { + if (err) { + callback(err); + return; + } + + // Update the counter value + setPublishCounterValue(+message.attributes.counterId + 1); + + console.log(`Message ${messageIds[0]} published.`); + callback(); + }); +} +// [END pubsub_publish_ordered_message] + // [START pubsub_get_topic_policy] function getTopicPolicy (topicName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -125,6 +192,9 @@ function getTopicPolicy (topicName, callback) { // [START pubsub_set_topic_policy] function setTopicPolicy (topicName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -159,6 +229,9 @@ function setTopicPolicy (topicName, callback) { // [START pubsub_test_topic_permissions] function testTopicPermissions (topicName, callback) { + // Instantiates the client library + const pubsubClient = PubSub(); + // References an existing topic, e.g. "my-topic" const topic = pubsubClient.topic(topicName); @@ -191,6 +264,7 @@ const program = module.exports = { createTopic: createTopic, deleteTopic: deleteTopic, publishMessage: publishMessage, + publishOrderedMessage: publishOrderedMessage, getTopicPolicy: getTopicPolicy, setTopicPolicy: setTopicPolicy, testTopicPermissions: testTopicPermissions, diff --git a/storage/quickstart.js b/storage/quickstart.js index 012344afe3..89c0b662cb 100644 --- a/storage/quickstart.js +++ b/storage/quickstart.js @@ -15,7 +15,6 @@ // [START storage_quickstart] // Imports and instantiates the Google Cloud client library -// for Google Cloud Storage const storage = require('@google-cloud/storage')({ projectId: 'YOUR_PROJECT_ID' }); diff --git a/translate/quickstart.js b/translate/quickstart.js index 46ce2607ca..59c34f5248 100644 --- a/translate/quickstart.js +++ b/translate/quickstart.js @@ -15,13 +15,12 @@ // [START translate_quickstart] // Imports and instantiates the Google Cloud client library -// for the Google Translate API const translate = require('@google-cloud/translate')({ key: 'YOUR_API_KEY' }); // Translates some text into Russian -translate.translate('Hello, world!', 'ru', (err, translation, apiResponse) => { +translate.translate('Hello, world!', 'ru', (err, translation) => { if (!err) { // The text was translated successfully }