diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6051aa4a..e5add9e5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is inspired by `Keep a Changelog `_ and this project adheres to `Semantic Versioning `_. +`v0.12.0`_ - 11-November-2023 +----------------------------- +Added ++++++ +- Add option to interpolate variables as an alternative for simple variable + proxies. + `v0.11.3`_ - 0-Undefined-2023 ----------------------------- Changed @@ -399,6 +406,7 @@ Added - Initial release. +.. _v0.12.0: https://github.com/joke2k/django-environ/compare/v0.11.3...v0.12.0 .. _v0.11.3: https://github.com/joke2k/django-environ/compare/v0.11.2...v0.11.3 .. _v0.11.2: https://github.com/joke2k/django-environ/compare/v0.11.1...v0.11.2 .. _v0.11.1: https://github.com/joke2k/django-environ/compare/v0.11.0...v0.11.1 diff --git a/docs/tips.rst b/docs/tips.rst index 66538c40..5ece6c8c 100644 --- a/docs/tips.rst +++ b/docs/tips.rst @@ -290,20 +290,53 @@ The following example demonstrates the above: Proxy value =========== -Values that being with a ``$`` may be interpolated. Pass ``interpolate=True`` to -``environ.Env()`` to enable this feature: +Values that begin with a ``$`` may be interpolated: .. code-block:: python import environ - env = environ.Env(interpolate=True) + env = environ.Env() # BAR=FOO # PROXY=$BAR >>> print(env.str('PROXY')) FOO +Interpolation +============= + +Variable interpolation can be enabled with ``interpolation``. It allows for more +complex variable interpolation than proxies: + +.. code-block:: python + + import environ + + env = environ.Env() + env.interpolation = True + + # FOO=abc + # BAR=def + # INTERPOLATED=prefix:$FOO@${BAR}Suffix + env.str('INTERPOLATED') # prefix:abc@defSuffix + +Variables with escaped dollar sign (``\$``) are not interpolated. + +When variable does not exist, an exception will be raised. These exceptions can +be disabled with ``raise_on_missing``: + +.. code-block:: python + + import environ + + env = environ.Env() + env.interpolation = True + env.raise_on_missing = False + + # FOO=abc + # INTERPOLATED=prefix:$FOO@${BAR}Suffix + env.str('INTERPOLATED') # prefix:abc@${BAR}Suffix Escape Proxy ============ diff --git a/environ/__init__.py b/environ/__init__.py index ddf05f92..d469ecb7 100644 --- a/environ/__init__.py +++ b/environ/__init__.py @@ -21,7 +21,7 @@ __copyright__ = 'Copyright (C) 2013-2023 Daniele Faraglia' """The copyright notice of the package.""" -__version__ = '0.11.3' +__version__ = '0.12.0' """The version of the package.""" __license__ = 'MIT' diff --git a/environ/environ.py b/environ/environ.py index a3d64f2d..b4a2881c 100644 --- a/environ/environ.py +++ b/environ/environ.py @@ -12,6 +12,7 @@ """ import ast +import functools import itertools import logging import os @@ -37,6 +38,8 @@ ) from .fileaware_mapping import FileAwareMapping +INTERPOLATION_CACHE_SIZE = 128 + Openable = (str, os.PathLike) logger = logging.getLogger(__name__) @@ -189,92 +192,104 @@ class Env: for s in ('', 's')] CLOUDSQL = 'cloudsql' + _VAR_PATTERN = re.compile(r""" + (?P + (?P\\)? + \$(?P[A-Z_][0-9A-Z_]*|\{[A-Z_][0-9A-Z]*}) + ) + """, re.IGNORECASE | re.VERBOSE) + def __init__(self, **scheme): self.smart_cast = True self.escape_proxy = False + self.interpolation = False + self.raise_on_missing = True self.prefix = "" self.scheme = scheme - def __call__(self, var, cast=None, default=NOTSET, parse_default=False): + def __call__(self, var, cast=None, default=NOTSET, parse_default=False, interpolate=None): return self.get_value( var, cast=cast, default=default, - parse_default=parse_default + parse_default=parse_default, + interpolate=interpolate ) def __contains__(self, var): return var in self.ENVIRON - def str(self, var, default=NOTSET, multiline=False): + def str(self, var, default=NOTSET, multiline=False, interpolate=None): """ :rtype: str """ - value = self.get_value(var, cast=str, default=default) + value = self.get_value(var, cast=str, default=default, interpolate=interpolate) if multiline: return re.sub(r'(\\r)?\\n', r'\n', value) return value - def bytes(self, var, default=NOTSET, encoding='utf8'): + def bytes(self, var, default=NOTSET, encoding='utf8', interpolate=None): """ :rtype: bytes """ - value = self.get_value(var, cast=str, default=default) + value = self.get_value(var, cast=str, default=default, interpolate=interpolate) if hasattr(value, 'encode'): return value.encode(encoding) return value - def bool(self, var, default=NOTSET): + def bool(self, var, default=NOTSET, interpolate=None): """ :rtype: bool """ - return self.get_value(var, cast=bool, default=default) + return self.get_value(var, cast=bool, default=default, interpolate=interpolate) - def int(self, var, default=NOTSET): + def int(self, var, default=NOTSET, interpolate=None): """ :rtype: int """ - return self.get_value(var, cast=int, default=default) + return self.get_value(var, cast=int, default=default, interpolate=interpolate) - def float(self, var, default=NOTSET): + def float(self, var, default=NOTSET, interpolate=None): """ :rtype: float """ - return self.get_value(var, cast=float, default=default) + return self.get_value(var, cast=float, default=default, interpolate=interpolate) - def json(self, var, default=NOTSET): + def json(self, var, default=NOTSET, interpolate=None): """ :returns: Json parsed """ - return self.get_value(var, cast=json.loads, default=default) + return self.get_value(var, cast=json.loads, default=default, interpolate=interpolate) - def list(self, var, cast=None, default=NOTSET): + def list(self, var, cast=None, default=NOTSET, interpolate=None): """ :rtype: list """ return self.get_value( var, cast=list if not cast else [cast], - default=default + default=default, + interpolate=interpolate ) - def tuple(self, var, cast=None, default=NOTSET): + def tuple(self, var, cast=None, default=NOTSET, interpolate=None): """ :rtype: tuple """ return self.get_value( var, cast=tuple if not cast else (cast,), - default=default + default=default, + interpolate=interpolate ) - def dict(self, var, cast=dict, default=NOTSET): + def dict(self, var, cast=dict, default=NOTSET, interpolate=None): """ :rtype: dict """ - return self.get_value(var, cast=cast, default=default) + return self.get_value(var, cast=cast, default=default, interpolate=interpolate) - def url(self, var, default=NOTSET): + def url(self, var, default=NOTSET, interpolate=None): """ :rtype: urllib.parse.ParseResult """ @@ -282,10 +297,11 @@ def url(self, var, default=NOTSET): var, cast=urlparse, default=default, - parse_default=True + parse_default=True, + interpolate=interpolate ) - def db_url(self, var=DEFAULT_DATABASE_ENV, default=NOTSET, engine=None): + def db_url(self, var=DEFAULT_DATABASE_ENV, default=NOTSET, engine=None, interpolate=None): """Returns a config dictionary, defaulting to DATABASE_URL. The db method is an alias for db_url. @@ -293,13 +309,13 @@ def db_url(self, var=DEFAULT_DATABASE_ENV, default=NOTSET, engine=None): :rtype: dict """ return self.db_url_config( - self.get_value(var, default=default), + self.get_value(var, default=default, interpolate=interpolate), engine=engine ) db = db_url - def cache_url(self, var=DEFAULT_CACHE_ENV, default=NOTSET, backend=None): + def cache_url(self, var=DEFAULT_CACHE_ENV, default=NOTSET, backend=None, interpolate=None): """Returns a config dictionary, defaulting to CACHE_URL. The cache method is an alias for cache_url. @@ -307,13 +323,13 @@ def cache_url(self, var=DEFAULT_CACHE_ENV, default=NOTSET, backend=None): :rtype: dict """ return self.cache_url_config( - self.url(var, default=default), + self.url(var, default=default, interpolate=interpolate), backend=backend ) cache = cache_url - def email_url(self, var=DEFAULT_EMAIL_ENV, default=NOTSET, backend=None): + def email_url(self, var=DEFAULT_EMAIL_ENV, default=NOTSET, backend=None, interpolate=None): """Returns a config dictionary, defaulting to EMAIL_URL. The email method is an alias for email_url. @@ -321,29 +337,29 @@ def email_url(self, var=DEFAULT_EMAIL_ENV, default=NOTSET, backend=None): :rtype: dict """ return self.email_url_config( - self.url(var, default=default), + self.url(var, default=default, interpolate=interpolate), backend=backend ) email = email_url - def search_url(self, var=DEFAULT_SEARCH_ENV, default=NOTSET, engine=None): + def search_url(self, var=DEFAULT_SEARCH_ENV, default=NOTSET, engine=None, interpolate=None): """Returns a config dictionary, defaulting to SEARCH_URL. :rtype: dict """ return self.search_url_config( - self.url(var, default=default), + self.url(var, default=default, interpolate=interpolate), engine=engine ) - def path(self, var, default=NOTSET, **kwargs): + def path(self, var, default=NOTSET, interpolate=None, **kwargs): """ :rtype: Path """ - return Path(self.get_value(var, default=default), **kwargs) + return Path(self.get_value(var, default=default, interpolate=interpolate), **kwargs) - def get_value(self, var, cast=None, default=NOTSET, parse_default=False): + def get_value(self, var, cast=None, default=NOTSET, parse_default=False, interpolate=None): """Return value for given environment variable. :param str var: @@ -354,6 +370,8 @@ def get_value(self, var, cast=None, default=NOTSET, parse_default=False): If var not present in environ, return this instead. :param bool parse_default: Force to parse default. + :param bool, optional interpolate: + Enable or disable interpolation. Uses class-defined ``interpolation`` as default. :returns: Value from environment or default (if set). :rtype: typing.IO[typing.Any] """ @@ -396,9 +414,17 @@ def get_value(self, var, cast=None, default=NOTSET, parse_default=False): # Resolve any proxied values prefix = b'$' if isinstance(value, bytes) else '$' escape = rb'\$' if isinstance(value, bytes) else r'\$' - if hasattr(value, 'startswith') and value.startswith(prefix): - value = value.lstrip(prefix) - value = self.get_value(value, cast=cast, default=default) + + if interpolate is None: + interpolate = self.interpolation + if interpolate: + # Interpolate variables + value = self.interpolate(value, var_name) + else: + # Resolve any proxied values + if hasattr(value, 'startswith') and value.startswith(prefix): + value = value.lstrip(prefix) + value = self.get_value(value, cast=cast, default=default) if self.escape_proxy and hasattr(value, 'replace'): value = value.replace(escape, prefix) @@ -416,6 +442,34 @@ def get_value(self, var, cast=None, default=NOTSET, parse_default=False): return value + @functools.lru_cache(maxsize=INTERPOLATION_CACHE_SIZE, typed=True) + def interpolate(self, value, var_name=None): + """Interpolate variables in provided value + + :param IO value: + String or bytes object to interpolate. + :param str, optional var_name: + Name of the variable whose value will be interpolated. + + :returns: Interpolated value. + """ + str_value = value.decode('utf-8') if isinstance(value, bytes) else value + for match in self._VAR_PATTERN.finditer(str_value): + if match.group('escape'): + continue # skip escaped variables + to_replace = match.group('to_replace') + name = match.group('name').lstrip('{').rstrip('}') + if var_name and name == var_name: + error_msg = f'Variable {name} references itself' + raise ImproperlyConfigured(error_msg) + try: + str_value = str_value.replace(to_replace, self.get_value(name)) + except ImproperlyConfigured: + if self.raise_on_missing: + raise + logger.warning("environment variable %s is not set", name) + return str_value.encode('utf-8') if isinstance(value, bytes) else str_value + @classmethod def parse_value(cls, value, cast): """Parse and cast provided value diff --git a/tests/fixtures.py b/tests/fixtures.py index 69e5e90f..584a4e3b 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -92,4 +92,17 @@ def generate_data(cls): EXPORTED_VAR=cls.EXPORTED, SAML_ATTRIBUTE_MAPPING='uid=username;mail=email;cn=first_name;sn=last_name;', PREFIX_TEST='foo', + FOO="foo", + BAR="bar", + INTERPOLATE_SIMPLE="$FOO", + INTERPOLATE_SIMPLE_PARENTHESES="${FOO}", + INTERPOLATE_SIMPLE_ESCAPED="\\$FOO", + INTERPOLATE_SIMPLE_PARENTHESES_ESCAPED="\\${FOO}", + INTERPOLATE_SIMPLE_PARENTHESES_NOT_OPENED="$FOO}", + INTERPOLATE_SIMPLE_PARENTHESES_NOT_CLOSED="${FOO", + INTERPOLATE_PARENTHESES_MISMATCH="${FOO$BAR}", + INTERPOLATE_PREFIXED="PREFIXED$FOO", + INTERPOLATE_SUFFIXED="$FOO@SUFFIXED", + INTERPOLATE_MULTIPLE="$FOO$BAR", + INTERPOLATE_MISSING="$MISSING", ) diff --git a/tests/test_env.py b/tests/test_env.py index 85e0499f..f2630a48 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -407,6 +407,70 @@ def test_prefix(self): self.env.prefix = 'PREFIX_' assert self.env('TEST') == 'foo' + def test_interpolation_simple(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SIMPLE') == 'foo' + + def test_interpolation_simple_parentheses(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SIMPLE_PARENTHESES') == 'foo' + + def test_interpolation_simple_escaped(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SIMPLE_ESCAPED') == '\\$FOO' + + def test_interpolation_simple_parentheses_escaped(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SIMPLE_PARENTHESES_ESCAPED') == '\\${FOO}' + + def test_interpolation_simple_parentheses_not_opened(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SIMPLE_PARENTHESES_NOT_OPENED') == 'foo}' + + def test_interpolation_simple_parentheses_not_closed(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SIMPLE_PARENTHESES_NOT_CLOSED') == '${FOO' + + def test_interpolation_parentheses_mismatch(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_PARENTHESES_MISMATCH') == '${FOObar}' + + def test_interpolation_prefixed(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_PREFIXED') == 'PREFIXEDfoo' + + def test_interpolation_suffixed(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_SUFFIXED') == 'foo@SUFFIXED' + + def test_interpolation_multiple(self): + self.env.interpolation = True + assert self.env('INTERPOLATE_MULTIPLE') == 'foobar' + + def test_interpolation_missing(self): + self.env.interpolation = True + with pytest.raises(ImproperlyConfigured) as excinfo: + self.env('INTERPOLATE_MISSING') + assert str(excinfo.value) == 'Set the MISSING environment variable' + assert excinfo.value.__cause__ is not None + + def test_interpolation_missing_disabled(self): + self.env.interpolation = True + self.env.raise_on_missing = False + assert self.env('INTERPOLATE_MISSING') == '$MISSING' + + def test_interpolation_force_enabled(self): + self.env.interpolation = False + assert self.env.get_value('INTERPOLATE_PREFIXED', interpolate=True) == 'PREFIXEDfoo' + + def test_interpolation_force_disabled_use_proxy(self): + self.env.interpolation = True + assert self.env.get_value('INTERPOLATE_SIMPLE', interpolate=False) == 'foo' + + def test_interpolation_force_disabled_no_proxy(self): + self.env.interpolation = True + assert self.env.get_value('INTERPOLATE_PREFIXED', interpolate=False) == 'PREFIXED$FOO' + class TestFileEnv(TestEnv): def setup_method(self, method): diff --git a/tests/test_env.txt b/tests/test_env.txt index 39ab896a..08454e69 100644 --- a/tests/test_env.txt +++ b/tests/test_env.txt @@ -65,3 +65,18 @@ export EXPORTED_VAR="exported var" # Prefixed PREFIX_TEST='foo' + +# Interpolation +FOO="foo" +BAR="bar" +INTERPOLATE_SIMPLE="$FOO" +INTERPOLATE_SIMPLE_PARENTHESES="${FOO}" +INTERPOLATE_SIMPLE_ESCAPED="\\$FOO" +INTERPOLATE_SIMPLE_PARENTHESES_ESCAPED="\\${FOO}" +INTERPOLATE_SIMPLE_PARENTHESES_NOT_OPENED="$FOO}" +INTERPOLATE_SIMPLE_PARENTHESES_NOT_CLOSED="${FOO" +INTERPOLATE_PARENTHESES_MISMATCH="${FOO$BAR}" +INTERPOLATE_PREFIXED="PREFIXED$FOO" +INTERPOLATE_SUFFIXED="$FOO@SUFFIXED" +INTERPOLATE_MULTIPLE="$FOO$BAR" +INTERPOLATE_MISSING="$MISSING"