Skip to content
Merged
19 changes: 8 additions & 11 deletions planet/clients/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .. import exceptions
from ..constants import PLANET_BASE_URL
from ..http import Session
from ..models import Paged, StreamingBody
from ..models import Paged
from ..specs import validate_data_item_type

BASE_URL = f'{PLANET_BASE_URL}/data/v1/'
Expand Down Expand Up @@ -586,23 +586,20 @@ async def download_asset(self,

Raises:
planet.exceptions.APIError: On API error.
planet.exceptions.ClientError: If asset is not active or asset
description is not valid.
planet.exceptions.ClientError: If asset is not active, asset
description is not valid, or retry limit is exceeded.
"""
try:
location = asset['location']
except KeyError:
raise exceptions.ClientError(
'asset missing ["location"] entry. Is asset active?')

async with self._session.stream(method='GET', url=location) as resp:
body = StreamingBody(resp)
dl_path = Path(directory, filename or body.name)
dl_path.parent.mkdir(exist_ok=True, parents=True)
await body.write(dl_path,
overwrite=overwrite,
progress_bar=progress_bar)
return dl_path
return await self._session.write(location,
filename=filename,
directory=directory,
overwrite=overwrite,
progress_bar=progress_bar)

@staticmethod
def validate_checksum(asset: dict, filename: Path):
Expand Down
18 changes: 9 additions & 9 deletions planet/clients/orders.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .. import exceptions
from ..constants import PLANET_BASE_URL
from ..http import Session
from ..models import Paged, StreamingBody
from ..models import Paged

BASE_URL = f'{PLANET_BASE_URL}/compute/ops'
STATS_PATH = '/stats/orders/v2'
Expand Down Expand Up @@ -251,15 +251,15 @@ async def download_asset(self,

Raises:
planet.exceptions.APIError: On API error.
planet.exceptions.ClientError: If location is not valid or retry
limit is exceeded.

"""
async with self._session.stream(method='GET', url=location) as resp:
body = StreamingBody(resp)
dl_path = Path(directory, filename or body.name)
dl_path.parent.mkdir(exist_ok=True, parents=True)
await body.write(dl_path,
overwrite=overwrite,
progress_bar=progress_bar)
return dl_path
return await self._session.write(location,
filename=filename,
directory=directory,
overwrite=overwrite,
progress_bar=progress_bar)

async def download_order(self,
order_id: str,
Expand Down
138 changes: 121 additions & 17 deletions planet/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
from __future__ import annotations # https://stackoverflow.com/a/33533514
import asyncio
from collections import Counter
from contextlib import asynccontextmanager
from http import HTTPStatus
import logging
import mimetypes
from pathlib import Path
import random
import re
import string
import time
from typing import AsyncGenerator, Optional
from typing import Optional
from urllib.parse import urlparse

import httpx
from tqdm.asyncio import tqdm
from typing_extensions import Literal

from .auth import Auth, AuthType
Expand All @@ -42,7 +47,7 @@
httpx.ReadTimeout,
httpx.RemoteProtocolError,
exceptions.BadGateway,
exceptions.TooManyRequests
exceptions.TooManyRequests,
]
MAX_RETRIES = 5
MAX_RETRY_BACKOFF = 64 # seconds
Expand Down Expand Up @@ -394,26 +399,80 @@ async def _send(self, request, stream=False) -> httpx.Response:

return http_resp

@asynccontextmanager
async def stream(
self, method: str,
url: str) -> AsyncGenerator[models.StreamingResponse, None]:
"""Submit a request and get the response as a stream context manager.
async def write(self,
url: str,
filename: Optional[str] = None,
directory: Path = Path('.'),
overwrite: bool = False,
progress_bar: bool = False) -> Path:
Copy link
Contributor

@sgillies sgillies Jul 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jreiberkyle this is not the right level for these features. Let's allow the Session class to work in a simple space with no progress bars and no directories. Just requests and responses and bytes going in and out. If we do, I think we'll find it easier to refactor and maintain over time. Principles of least concern and strong boundaries are what I'm invoking.

Concretely, let's give the download methods the responsibility of ensuring that directories exist and that we have a valid destination filename. Also, those methods should create progress bars and pass them to Session.write, which should just do progress.update() (dependency injection).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed on separating concerns. I'm getting stuck on initiating the progress bar, which requires a total which can only be read by the response. I will look deeper into how to make this work with the client download functions handling creating of the progress bar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this time, retry is managed by Session. For Session to manage retry, it has to submit the original request for the download (so it can retry that request). When it submits the request, the response is not available to the client download function. So, the download function cannot access the total value in the response headers. There are two ways I can think of to allow download function to create the progress bar:

  1. pass two callbacks to Session.write, one for update and one for setting the total
  2. have the download function submit the original request to get the total value, then have Session.write submit the original request again as a part of the write process

I tend to think more than one callback is complex and so prefer option 2 even though it will be less efficient communication-wise, but I am not sure my aversion to multiple callbacks is warrented. @sgillies what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like tqdm really supports setting the total after the progress bar has been initialized, so I am going to go with option 2: have the download function submit the original request to get the total, then have session.write submit it again to perform the download.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jreiberkyle option 2 is better, I agree. Can the double requests be avoided by making a decorator form of the retrier and re-trying not the HTTP requests, but download_asset()? Like this

    @retry
    async def download_asset(...):
        ...
        async with self._session._client.stream, method='GET', url=location) as resp:
            content_length = resp.headers['Content-Length']
            dl_path = Path(directory, filename or parsed_from_content_disposition_header)
            dl_path.parent.mkdir(exist_ok=True, parents=True)
            return await self._session._write_response(
                resp,
                dl_path,
                overwrite=overwrite,
                progress_bar=tqdm(total=content_length, ...)
             )

Maybe for the next iteration? I think double requests are tolerable for now. Would it be best to make sure the first one gets closed/finalized before the second one is made? That's something to look into.

It also occurs to me that file sizes are in the order manifest.json file, so there's another way to get them. Also making another request, but only one extra per order.

Copy link
Contributor

@sgillies sgillies Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decorating the client method has an example here: https://github.com/hynek/stamina#production-grade-retries-made-easy. I bet you could turn the SDK's retrier into such a decorator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed that we can generalize the retry function and make it work for retrying download_asset(). In that case, we may want to change Session.request so that it does not also retry (thereby squaring the effective MAX_RETRIES), and then change all the functions calling Session.request so they DO retry. At any rate, it would require some more thought. So, I went with option 2 - two calls.

"""Write data to local file with limiting and retries.

Parameters:
method: HTTP request method.
url: Location of the API endpoint.
url: Remote location url
filename: Custom name to assign to downloaded file.
directory: Base directory for file download. This directory will be
created if it does not already exist.
overwrite: Overwrite any existing files.
progress_bar: Show progress bar during download.

Returns:
Context manager providing the streaming response.
Path to downloaded file.

Raises:
planet.exceptions.APIException: On API error.
planet.exceptions.ClientError: When retry limit is exceeded.

"""
request = self._client.build_request(method=method, url=url)
http_response = await self._retry(self._send, request, stream=True)
response = models.StreamingResponse(http_response)

async def _write():
async with self._client.stream('GET', url) as response:

dl_path = Path(
directory,
filename or _get_filename_from_response(response))
dl_path.parent.mkdir(exist_ok=True, parents=True)

await self._write_response(response,
dl_path,
overwrite=overwrite,
progress_bar=progress_bar)

return dl_path

async def _limited_write():
async with self._limiter:
dl_path = await _write()
return dl_path

return await self._retry(_limited_write)

async def _write_response(self,
response,
filename,
overwrite,
progress_bar):
total = int(response.headers["Content-Length"])

try:
yield response
finally:
await response.aclose()
mode = 'wb' if overwrite else 'xb'
with open(filename, mode) as fp:

with tqdm(total=total,
unit_scale=True,
unit_divisor=1024 * 1024,
unit='B',
desc=str(filename),
disable=not progress_bar) as progress:
previous = response.num_bytes_downloaded

async for chunk in response.aiter_bytes():
fp.write(chunk)
new = response.num_bytes_downloaded - previous
progress.update(new - previous)
previous = new
progress.update()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jreiberkyle do you mean to put this statement within the chunk iteration? Unless you do, I don't see how progress is shown.

except FileExistsError:
LOGGER.info(f'File {filename} exists, not overwriting')

def client(self,
name: Literal['data', 'orders', 'subscriptions'],
Expand All @@ -439,6 +498,51 @@ def client(self,
raise exceptions.ClientError("No such client.")


def _get_filename_from_response(response) -> str:
"""The name of the response resource.

The default is to use the content-disposition header value from the
response. If not found, falls back to resolving the name from the url
or generating a random name with the type from the response.
"""
name = (_get_filename_from_headers(response.headers)
or _get_filename_from_url(response.url)
or _get_random_filename(response.headers.get('content-type')))
return name


def _get_filename_from_headers(headers):
"""Get a filename from the Content-Disposition header, if available.

:param headers dict: a ``dict`` of response headers
:returns: a filename (i.e. ``basename``)
:rtype: str or None
"""
cd = headers.get('content-disposition', '')
match = re.search('filename="?([^"]+)"?', cd)
return match.group(1) if match else None


def _get_filename_from_url(url: str) -> Optional[str]:
"""Get a filename from a url.

Getting a name for Landsat imagery uses this function.
"""
path = urlparse(url).path
name = path[path.rfind('/') + 1:]
return name or None


def _get_random_filename(content_type=None) -> str:
"""Get a pseudo-random, Planet-looking filename.
"""
extension = mimetypes.guess_extension(content_type or '') or ''
characters = string.ascii_letters + '0123456789'
letters = ''.join(random.sample(characters, 8))
name = 'planet-{}{}'.format(letters, extension)
return name


class AuthSession(BaseSession):
"""Synchronous connection to the Planet Auth service."""

Expand Down
Loading