-
Notifications
You must be signed in to change notification settings - Fork 98
Add retries to file download to avoid throwing an exception in the case of normal retry errors #1002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add retries to file download to avoid throwing an exception in the case of normal retry errors #1002
Changes from 10 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
54354d8
Bump version to next dev version
sgillies 74e8b77
Merge pull request #993 from planetlabs/rel-2.1
sgillies e94ea98
add ServerError to list of exceptions to retry
jreiberkyle 8b29a93
Merge tag '2.1.1' into main
sgillies ce8adc3
move write functionality to Session, include limiting and retries
jreiberkyle 6fbdefe
add tests, fixes so tests pass, remove ServerError retry because it s…
jreiberkyle e23916a
Merge branch 'main' into download-failure-974
jreiberkyle d761191
patch with asyncmock in python 3.7
jreiberkyle 11136e2
commented code clean up
jreiberkyle 05a02cb
merge maint=2.1
jreiberkyle 71d3085
move filename determination to models.Response and file and progress …
jreiberkyle File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,14 +16,19 @@ | |
| from __future__ import annotations # https://stackoverflow.com/a/33533514 | ||
| import asyncio | ||
| from collections import Counter | ||
| from contextlib import asynccontextmanager | ||
| from http import HTTPStatus | ||
| import logging | ||
| import mimetypes | ||
| from pathlib import Path | ||
| import random | ||
| import re | ||
| import string | ||
| import time | ||
| from typing import AsyncGenerator, Optional | ||
| from typing import Optional | ||
| from urllib.parse import urlparse | ||
|
|
||
| import httpx | ||
| from tqdm.asyncio import tqdm | ||
| from typing_extensions import Literal | ||
|
|
||
| from .auth import Auth, AuthType | ||
|
|
@@ -42,7 +47,7 @@ | |
| httpx.ReadTimeout, | ||
| httpx.RemoteProtocolError, | ||
| exceptions.BadGateway, | ||
| exceptions.TooManyRequests | ||
| exceptions.TooManyRequests, | ||
| ] | ||
| MAX_RETRIES = 5 | ||
| MAX_RETRY_BACKOFF = 64 # seconds | ||
|
|
@@ -394,26 +399,80 @@ async def _send(self, request, stream=False) -> httpx.Response: | |
|
|
||
| return http_resp | ||
|
|
||
| @asynccontextmanager | ||
| async def stream( | ||
| self, method: str, | ||
| url: str) -> AsyncGenerator[models.StreamingResponse, None]: | ||
| """Submit a request and get the response as a stream context manager. | ||
| async def write(self, | ||
| url: str, | ||
| filename: Optional[str] = None, | ||
| directory: Path = Path('.'), | ||
| overwrite: bool = False, | ||
| progress_bar: bool = False) -> Path: | ||
| """Write data to local file with limiting and retries. | ||
|
|
||
| Parameters: | ||
| method: HTTP request method. | ||
| url: Location of the API endpoint. | ||
| url: Remote location url | ||
| filename: Custom name to assign to downloaded file. | ||
| directory: Base directory for file download. This directory will be | ||
| created if it does not already exist. | ||
| overwrite: Overwrite any existing files. | ||
| progress_bar: Show progress bar during download. | ||
|
|
||
| Returns: | ||
| Context manager providing the streaming response. | ||
| Path to downloaded file. | ||
|
|
||
| Raises: | ||
| planet.exceptions.APIException: On API error. | ||
| planet.exceptions.ClientError: When retry limit is exceeded. | ||
|
|
||
| """ | ||
| request = self._client.build_request(method=method, url=url) | ||
| http_response = await self._retry(self._send, request, stream=True) | ||
| response = models.StreamingResponse(http_response) | ||
|
|
||
| async def _write(): | ||
| async with self._client.stream('GET', url) as response: | ||
|
|
||
| dl_path = Path( | ||
| directory, | ||
| filename or _get_filename_from_response(response)) | ||
| dl_path.parent.mkdir(exist_ok=True, parents=True) | ||
|
|
||
| await self._write_response(response, | ||
| dl_path, | ||
| overwrite=overwrite, | ||
| progress_bar=progress_bar) | ||
|
|
||
| return dl_path | ||
|
|
||
| async def _limited_write(): | ||
| async with self._limiter: | ||
| dl_path = await _write() | ||
| return dl_path | ||
|
|
||
| return await self._retry(_limited_write) | ||
|
|
||
| async def _write_response(self, | ||
| response, | ||
| filename, | ||
| overwrite, | ||
| progress_bar): | ||
| total = int(response.headers["Content-Length"]) | ||
|
|
||
| try: | ||
| yield response | ||
| finally: | ||
| await response.aclose() | ||
| mode = 'wb' if overwrite else 'xb' | ||
| with open(filename, mode) as fp: | ||
|
|
||
| with tqdm(total=total, | ||
| unit_scale=True, | ||
| unit_divisor=1024 * 1024, | ||
| unit='B', | ||
| desc=str(filename), | ||
| disable=not progress_bar) as progress: | ||
| previous = response.num_bytes_downloaded | ||
|
|
||
| async for chunk in response.aiter_bytes(): | ||
| fp.write(chunk) | ||
| new = response.num_bytes_downloaded - previous | ||
| progress.update(new - previous) | ||
| previous = new | ||
| progress.update() | ||
|
||
| except FileExistsError: | ||
| LOGGER.info(f'File {filename} exists, not overwriting') | ||
|
|
||
| def client(self, | ||
| name: Literal['data', 'orders', 'subscriptions'], | ||
|
|
@@ -439,6 +498,51 @@ def client(self, | |
| raise exceptions.ClientError("No such client.") | ||
|
|
||
|
|
||
| def _get_filename_from_response(response) -> str: | ||
| """The name of the response resource. | ||
|
|
||
| The default is to use the content-disposition header value from the | ||
| response. If not found, falls back to resolving the name from the url | ||
| or generating a random name with the type from the response. | ||
| """ | ||
| name = (_get_filename_from_headers(response.headers) | ||
| or _get_filename_from_url(response.url) | ||
| or _get_random_filename(response.headers.get('content-type'))) | ||
| return name | ||
|
|
||
|
|
||
| def _get_filename_from_headers(headers): | ||
| """Get a filename from the Content-Disposition header, if available. | ||
|
|
||
| :param headers dict: a ``dict`` of response headers | ||
| :returns: a filename (i.e. ``basename``) | ||
| :rtype: str or None | ||
| """ | ||
| cd = headers.get('content-disposition', '') | ||
| match = re.search('filename="?([^"]+)"?', cd) | ||
| return match.group(1) if match else None | ||
|
|
||
|
|
||
| def _get_filename_from_url(url: str) -> Optional[str]: | ||
| """Get a filename from a url. | ||
|
|
||
| Getting a name for Landsat imagery uses this function. | ||
| """ | ||
| path = urlparse(url).path | ||
| name = path[path.rfind('/') + 1:] | ||
| return name or None | ||
|
|
||
|
|
||
| def _get_random_filename(content_type=None) -> str: | ||
| """Get a pseudo-random, Planet-looking filename. | ||
| """ | ||
| extension = mimetypes.guess_extension(content_type or '') or '' | ||
| characters = string.ascii_letters + '0123456789' | ||
| letters = ''.join(random.sample(characters, 8)) | ||
| name = 'planet-{}{}'.format(letters, extension) | ||
| return name | ||
|
|
||
|
|
||
| class AuthSession(BaseSession): | ||
| """Synchronous connection to the Planet Auth service.""" | ||
|
|
||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jreiberkyle this is not the right level for these features. Let's allow the Session class to work in a simple space with no progress bars and no directories. Just requests and responses and bytes going in and out. If we do, I think we'll find it easier to refactor and maintain over time. Principles of least concern and strong boundaries are what I'm invoking.
Concretely, let's give the download methods the responsibility of ensuring that directories exist and that we have a valid destination filename. Also, those methods should create progress bars and pass them to Session.write, which should just do
progress.update()(dependency injection).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed on separating concerns. I'm getting stuck on initiating the progress bar, which requires a
totalwhich can only be read by the response. I will look deeper into how to make this work with the client download functions handling creating of the progress bar.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this time, retry is managed by
Session. ForSessionto manage retry, it has to submit the original request for the download (so it can retry that request). When it submits the request, the response is not available to the client download function. So, the download function cannot access thetotalvalue in the response headers. There are two ways I can think of to allow download function to create the progress bar:Session.write, one for update and one for setting the totalSession.writesubmit the original request again as a part of the write processI tend to think more than one callback is complex and so prefer option 2 even though it will be less efficient communication-wise, but I am not sure my aversion to multiple callbacks is warrented. @sgillies what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look like tqdm really supports setting the total after the progress bar has been initialized, so I am going to go with option 2: have the download function submit the original request to get the total, then have session.write submit it again to perform the download.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jreiberkyle option 2 is better, I agree. Can the double requests be avoided by making a decorator form of the retrier and re-trying not the HTTP requests, but
download_asset()? Like thisMaybe for the next iteration? I think double requests are tolerable for now. Would it be best to make sure the first one gets closed/finalized before the second one is made? That's something to look into.
It also occurs to me that file sizes are in the order manifest.json file, so there's another way to get them. Also making another request, but only one extra per order.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decorating the client method has an example here: https://github.com/hynek/stamina#production-grade-retries-made-easy. I bet you could turn the SDK's retrier into such a decorator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed that we can generalize the retry function and make it work for retrying
download_asset(). In that case, we may want to changeSession.requestso that it does not also retry (thereby squaring the effective MAX_RETRIES), and then change all the functions callingSession.requestso they DO retry. At any rate, it would require some more thought. So, I went with option 2 - two calls.