gcpubsub broker isn't working yet - 'str' object has no attribute 'seconds' #2212

@balaji-v4d

The gcpubsub transport may not be ready yet, but I couldn't resist trying rc4. If this report is premature, feel free to close it.

$ python3 celery-rc55-test.py
Traceback (most recent call last):
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 439, in FromTimedelta
    td.seconds + td.days * _SECONDS_PER_DAY,
AttributeError: 'str' object has no attribute 'seconds'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/balaji_vinci4d_ai/celery-rc55-test.py", line 22, in <module>
    result = add.delay(4, 6)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/task.py", line 599, in apply_async
    return app.send_task(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/base.py", line 922, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/amqp.py", line 523, in send_task_message
    ret = producer.publish(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 190, in publish
    return _publish(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 208, in _publish
    maybe_declare(entity, retry=retry, **retry_policy)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 107, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 112, in maybe_declare
    return _imaybe_declare(entity, channel, **retry_policy)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 167, in _imaybe_declare
    return entity.channel.connection.client.ensure(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 153, in _maybe_declare
    entity.declare(channel=channel)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 617, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 628, in _create_queue
    self.queue_bind(nowait=nowait, channel=channel)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 672, in queue_bind
    return self.bind_to(self.exchange, self.routing_key,
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 681, in bind_to
    return (channel or self.channel).queue_bind(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 577, in queue_bind
    self._queue_bind(exchange, *meta)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/gcpubsub.py", line 244, in _queue_bind
    self._create_subscription(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/gcpubsub.py", line 315, in _create_subscription
    self.subscriber.create_subscription(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/pubsub_v1/services/subscriber/client.py", line 887, in create_subscription
    request = pubsub.Subscription(request)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/message.py", line 728, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/marshal/marshal.py", line 235, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 449, in _internal_assign
    self.FromTimedelta(td)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 443, in FromTimedelta
    raise AttributeError(
AttributeError: Fail to convert to Duration. Expected a timedelta like object got str: 'str' object has no attribute 'seconds'
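
Judging by the last frames, a string seems to be reaching a field that protobuf expects to be timedelta-like. Below is a minimal, stdlib-only sketch of that failure mode. `seconds_like_protobuf` is a hypothetical helper that only mirrors the expression visible in the first traceback, and the `"96400s"` string is an assumed stand-in, since the actual value passed by kombu is not shown above.

```python
from datetime import timedelta

# Assumption: same meaning as the _SECONDS_PER_DAY constant named in the traceback.
_SECONDS_PER_DAY = 24 * 60 * 60


def seconds_like_protobuf(td):
    """Hypothetical helper mirroring `td.seconds + td.days * _SECONDS_PER_DAY`."""
    return td.seconds + td.days * _SECONDS_PER_DAY


# A timedelta exposes .seconds and .days, so the expression works.
print(seconds_like_protobuf(timedelta(seconds=96400)))

# A string has neither attribute, which raises the AttributeError seen above.
try:
    seconds_like_protobuf("96400s")  # assumed value, for illustration only
except AttributeError as exc:
    print(exc)
```

If that reading is right, the fix would presumably involve passing a `datetime.timedelta` (or an already-built Duration) instead of a string, but I have not verified this against the kombu source.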

Dependencies

$ cat requirements.txt
celery==5.5.0rc4
google-cloud-pubsub>=2.18.4
google-cloud-monitoring>=2.16.0
grpcio==1.66.2

Example

$ cat celery-rc55-test.py
from celery import Celery

# Options forwarded to the gcpubsub transport.
broker_transport_options = {
    'ack_deadline_seconds': 60,
    'polling_interval': 0.3,
    'queue_name_prefix': 'kombu-',
    'expiration_seconds': 96400,
}

broker_url = 'gcpubsub://projects/v4d-dev'
app = Celery(
    "example_pubsub_app",
    broker=broker_url,
    broker_transport_options=broker_transport_options,
)

@app.task
def add(x, y):
    return x + y

result = add.delay(4, 6)
