-
Notifications
You must be signed in to change notification settings - Fork 0
HH-124106 add sync cached based on consul blocking queries #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
consul/base.py
Outdated
self.watch_seconds = watch_seconds | ||
self.index = None | ||
self._running = True | ||
self._cache_process = threading.Thread(target=self._update_cache, daemon=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_cache_process -> _cache_thread
def stop(self): | ||
self._running = False | ||
|
||
def add_listener(self, callback, trigger_current=False): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Возможно, правильнее чтобы callback не только value принимал но и key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А зачем, кстати, там в cache лежит dict всегда из 1-го элемента?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Прав, для текущих кешей это не нужно, но кажется, что при расширении могут быть разные варианты наполнения кеша с ключами. И да, тогда имеет смысл key тоже передавать в callback.
consul/base.py
Outdated
'wait': self.watch_seconds | ||
} | ||
log.debug(f'Param for health query: {params}') | ||
self.index, values = self.health_client.service(**params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
long_pooling и блокировка на строчк
self.index, values = self.health_client.service(**params)
происходит?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Судя по тому, что мы в ответ получаем сразу значения, то вызов блокирующий.
А ошибки тут обработать не нужно? Они могут быть?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ошибки быть могут. Мы допускаем только 404, в случае других ошибок нужно выбрасывать исключение. Вот тут описаны ошибки, которые могу возникнуть
Line 208 in b085726
def _status(klass, response, allow_404=True): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Тогда обращаю внимание, что этот отдельный тред в случае эксепшена просто умрет )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А вообще прав, я не подумал. Тред умрет, приложение будет дальше работать. Оберну в трай-кэтч, чтобы пока приложение живо поток пытался получить данные из консула
def __init__(self, watch_seconds): | ||
self.cache = dict() | ||
self.callbacks = [] | ||
self.watch_seconds = watch_seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
для watch_seconds как параметра этот Cache-объекта имя бы какое-нибудь другое дать, просто щас непотяно что оно значит.
По факту же это сколько максимально по времени держать long-pooling соединение, в случае если ничего не изменится в консуле за это время?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Тут все так, это дань унификации. У нас в клиенте джавовом и конфигах он так же называется. Мне кажется лучше оставить так, чтобы разъездов меньше было. Если переименовывать, то по хорошему и в конфигах и в джавовом клиенте. Что думаешь?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
думаю, надо оставить
consul/base.py
Outdated
self.index, values = self.kv_client.get(**params) | ||
old_cache = self.cache | ||
self.cache = {self.path: values} | ||
if self.callbacks and self._running: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
это на вкус канеш, но я бы инвертировал логику проверок с ранним выхходом на continue
чтобы пирамидку чутка разгрузить
consul/base.py
Outdated
return {'passing': passing, 'warning': warning} | ||
|
||
|
||
class ConsulCacheBase(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
не надо от object наследоваться
consul/base.py
Outdated
self.watch_seconds = watch_seconds | ||
self.index = None | ||
self._running = True | ||
self._cache_process = threading.Thread(target=self._update_cache, daemon=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Я бы еще какой-нибудь понятный name
задал бы, чтобы в трейсбеках можно было бы следить и для метрик может пригодиться
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А есть конвенция как имена тредам давать в питоне? snake-case, camel-case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
не, нет какой-то особой, по-умолчанию они называются как Thread-N
, где N это small int
consul/base.py
Outdated
for key, value in self.cache: | ||
callback(value) | ||
|
||
def _update_cache(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Если не предполагается, что этот класс кто-то будет использовать напрямую, то я бы его объявил бы как
class ConsulCacheBase(metaclass=ABCMeta):
...
А на этот метод повесил бы
@abstractmethod
consul/base.py
Outdated
""" | ||
|
||
def __init__(self, health_client, watch_seconds, service, passing): | ||
ConsulCacheBase.__init__(self, watch_seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
лучше всегда использовать super().__init__
. Тут это не критично, но если вдруг базщовый класс будет использовать множественное наследование то тут все развалится
consul/base.py
Outdated
Consul health service cache | ||
""" | ||
|
||
def __init__(self, health_client, watch_seconds, service, passing): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Укажи, пожалуйста типы аргументов, тут они совсем не очевидны
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
И, возможно стоит и значение некоторых пояснить, в докстринге, например - что такое passing
consul/base.py
Outdated
'wait': self.watch_seconds | ||
} | ||
log.debug(f'Param for health query: {params}') | ||
self.index, values = self.health_client.service(**params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Судя по тому, что мы в ответ получаем сразу значения, то вызов блокирующий.
А ошибки тут обработать не нужно? Они могут быть?
consul/base.py
Outdated
self.index, values = self.health_client.service(**params) | ||
self.cache = {self.service: values} | ||
old_cache = self.cache | ||
if self.callbacks and self._running: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Нет смысла тут повторно проверять на is_running, потому что до этого момента никто не успеет этот флаг поменять.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
могут по идее, тут ведь отдельный тред
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Да, я чот плохо подумал, в теории GIL гарантирует эксклюзивный доступ, но тут будет блокировка на моменте self.health_client.service
и вот тут может другой поток запуститься
consul/base.py
Outdated
old_cache = self.cache | ||
if self.callbacks and self._running: | ||
for key, old_value in old_cache.items(): | ||
new_value = self.cache.get(key, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А в чем логика callback(None)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ничего не получили из KV хранилища консула, должны в сервисе задать значение по умолчанию
consul/base.py
Outdated
def __init__(self, kv_client, watch_seconds, path, total_timeout, consistency_mode): | ||
ConsulCacheBase.__init__(self, watch_seconds) | ||
self.total_timeout = total_timeout | ||
self.consistency_mode = consistency_mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Выглядит так, как будто это должен быть Enum
consul/base.py
Outdated
if self.callbacks and self._running: | ||
for key, new_value in self.cache.items(): | ||
old_value = old_cache.get(key, None) | ||
if old_value != new_value: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Значения каких типов тут могут быть?
consul/base.py
Outdated
pass | ||
|
||
|
||
class ConsulCacheException(Exception): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Не нашел использований, заче эта ошибка?
consul/base.py
Outdated
self.callbacks.append(callback) | ||
log.debug(f'Registered callback: {self.callbacks}') | ||
if trigger_current: | ||
for key, value in self.cache: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Так не сработает, надо in self.cache.items()
consul/base.py
Outdated
self.consistency_mode = consistency_mode | ||
self.path = path | ||
self.kv_client = kv_client | ||
self.cache = {self.path: kv_client.get(path)[1]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
вот тут давай-ка заложим-ка кастомный таймаут?
отхожим местом чую - понадобится он
def __init__(self, watch_seconds): | ||
self.cache = dict() | ||
self.callbacks = [] | ||
self.watch_seconds = watch_seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
думаю, надо оставить
kv_client: 'Consul.KV', | ||
watch_seconds: str, | ||
path: str, | ||
total_timeout: int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cache_initial_warmup_timeout? и наверное этот параметр необязательный. если меня общий таймаут устраивает, то че бы нет
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Имеешь ввиду еще один параметр добавить, чтобы тут его использовать при инициализации кеша ?
self.cache = {self.path: kv_client.get(key=path, total_timeout=total_timeout)[1]}
calls to wait for changes since this query was last run. | ||
""" | ||
|
||
def __init__(self, watch_seconds: str): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А watch_seconds точно должен быть str? Выглядит несколько нелогичным
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Да, это консула. Он в таком виде : 10s, 10m, 10ms передается
consul/base.py
Outdated
""" | ||
|
||
def __init__(self, | ||
health_client: 'Consul.Health', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А нельзя сюда просто заимпортить этот тип?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А как его "заимпортить"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Что такое health_client, откуда он возьмётся?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ещё пара моментов
consul/base.py
Outdated
watch_seconds: str, | ||
path: str, | ||
total_timeout: int, | ||
consistency_mode: 'ConsistencyMode'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Тут можно тип без кавычек указать, он же в этом же файле, выше, определён
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
есть!
consul/base.py
Outdated
log.debug(f'Value was changed for key={key}. old: {old_value} new: {new_value}') | ||
for callback in self.callbacks: | ||
callback(key, new_value) | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Использовать дженерный класс Exception
считается плохой практикой, потому что:
- при добавлении нового кода можно забыть, что ловятся все ошибки и поймать лишнюю
- постороннему разработчику не очевидно какие именно ошибки тут могут возникнуть
Тут точно нельзя указать конкретные ошибки, которые мы ждём?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Уточнил тип
2455fd5
to
c9b0123
Compare
No description provided.