@@ -110,6 +110,30 @@ def callback(message):
110110 time .sleep (60 )
111111
112112
113+ def listen_for_errors (project , subscription_name ):
114+ """Receives messages and catches errors from a pull subscription."""
115+ subscriber = pubsub_v1 .SubscriberClient ()
116+ subscription_path = subscriber .subscription_path (
117+ project , subscription_name )
118+
119+ def callback (message ):
120+ print ('Received message: {}' .format (message ))
121+ message .ack ()
122+
123+ subscription = subscriber .subscribe (subscription_path , callback = callback )
124+
125+ # Blocks the thread while messages are coming in through the stream. Any
126+ # exceptions that crop up on the thread will be set on the future.
127+ future = subscription .open (callback )
128+ try :
129+ future .result ()
130+ except Exception as e :
131+ print (
132+ 'Listening for messages on {} threw an Exception: {}.' .format (
133+ subscription_name , e ))
134+ raise
135+
136+
113137if __name__ == '__main__' :
114138 parser = argparse .ArgumentParser (
115139 description = __doc__ ,
@@ -143,6 +167,10 @@ def callback(message):
143167 help = receive_messages_with_flow_control .__doc__ )
144168 receive_with_flow_control_parser .add_argument ('subscription_name' )
145169
170+ listen_for_errors_parser = subparsers .add_parser (
171+ 'listen_for_errors' , help = listen_for_errors .__doc__ )
172+ listen_for_errors_parser .add_argument ('subscription_name' )
173+
146174 args = parser .parse_args ()
147175
148176 if args .command == 'list_in_topic' :
@@ -160,3 +188,5 @@ def callback(message):
160188 elif args .command == 'receive-flow-control' :
161189 receive_messages_with_flow_control (
162190 args .project , args .subscription_name )
191+ elif args .command == 'listen_for_errors' :
192+ listen_for_errors (args .project , args .subscription_name )
0 commit comments