diff --git a/tests/conftest.py b/tests/conftest.py index ba082220e..c3d5a987b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,22 +1,68 @@ -import random as rand +"""Common test fixtures for MUSE tests.""" + from collections.abc import Mapping +from contextlib import contextmanager +from copy import deepcopy +from importlib import import_module +from logging import CRITICAL, getLogger +from os import walk from pathlib import Path -from typing import Callable +from typing import Callable, Optional, Union from unittest.mock import patch +from warnings import filterwarnings, simplefilter import numpy as np -from pandas import DataFrame +import pandas as pd +from numpy.random import choice, default_rng, rand, randint from pytest import fixture from xarray import DataArray, Dataset from muse.__main__ import patched_broadcast_compat_data from muse.agents import Agent +from muse.commodities import CommodityUsage +from muse.timeslices import setup_module + +RANDOM_SEED = 123 + +DEFAULT_TIMESLICES = """ +[timeslices] +winter.weekday.night = 396 +winter.weekday.morning = 396 +winter.weekday.afternoon = 264 +winter.weekday.early-peak = 66 +winter.weekday.late-peak = 66 +winter.weekday.evening = 396 +winter.weekend.night = 156 +winter.weekend.morning = 156 +winter.weekend.afternoon = 156 +winter.weekend.evening = 156 +spring-autumn.weekday.night = 792 +spring-autumn.weekday.morning = 792 +spring-autumn.weekday.afternoon = 528 +spring-autumn.weekday.early-peak = 132 +spring-autumn.weekday.late-peak = 132 +spring-autumn.weekday.evening = 792 +spring-autumn.weekend.night = 300 +spring-autumn.weekend.morning = 300 +spring-autumn.weekend.afternoon = 300 +spring-autumn.weekend.evening = 300 +summer.weekday.night = 396 +summer.weekday.morning = 396 +summer.weekday.afternoon = 264 +summer.weekday.early-peak = 66 +summer.weekday.late-peak = 66 +summer.weekday.evening = 396 +summer.weekend.night = 150 +summer.weekend.morning = 150 +summer.weekend.afternoon = 150 +summer.weekend.evening = 150 +level_names = ["month", "day", "hour"] +""" @fixture(autouse=True) def logger(): - from logging import CRITICAL, getLogger - + """Configure logger for tests.""" logger = getLogger("muse") logger.setLevel(CRITICAL) return logger @@ -33,31 +79,45 @@ def patch_broadcast_compat_data(): @fixture(autouse=True) def random(): """Set random seed for all tests to make them reproducible.""" - rand.seed(123) - np.random.seed(123) + rng = default_rng(RANDOM_SEED) + np.random.seed(RANDOM_SEED) + return rng def compare_df( - expected: DataFrame, - actual: DataFrame, + expected: pd.DataFrame, + actual: pd.DataFrame, rtol: float = 1e-5, atol: float = 1e-8, - equal_nan=False, - msg=None, -): - """Compares two dataframes approximately. - - Uses `numpy.allclose` for columns which are floating points. + equal_nan: bool = False, + msg: Optional[str] = None, +) -> None: + """Compare two dataframes approximately. + + Args: + expected: Expected dataframe + actual: Actual dataframe to compare + rtol: Relative tolerance + atol: Absolute tolerance + equal_nan: Whether to consider NaN values equal + msg: Optional message to display on failure + + Raises: + AssertionError: If dataframes don't match within tolerances """ from pytest import approx - assert set(expected.columns) == set(actual.columns) - assert expected.shape == actual.shape - assert set(expected.index) == set(actual.index) + assert set(expected.columns) == set(actual.columns), "Columns don't match" + assert expected.shape == actual.shape, "Shapes don't match" + assert set(expected.index) == set(actual.index), "Indices don't match" floats = [u for (u, d) in zip(actual.columns, actual.dtypes) if d == "float"] nonfloats = [u for (u, d) in zip(actual.columns, actual.dtypes) if d != "float"] - assert all(expected[nonfloats] == actual.loc[expected.index, nonfloats]) + + assert all(expected[nonfloats] == actual.loc[expected.index, nonfloats]), ( + "Non-float columns don't match" + ) + for col in floats: actual_col = actual.loc[expected.index, col].values expected_col = expected[col].values @@ -80,23 +140,32 @@ def compare_df( @fixture def compare_dirs() -> Callable: - def compare_dirs(actual_dir, expected_dir, **kwargs): - """Compares all the csv files in a directory.""" - from os import walk + """Factory for directory comparison function.""" - from pandas import read_csv + def compare_dirs( + actual_dir: Union[str, Path], expected_dir: Union[str, Path], **kwargs + ) -> None: + """Compare all CSV files in two directories. + Args: + actual_dir: Path to directory with actual files + expected_dir: Path to directory with expected files + **kwargs: Additional arguments passed to compare_df + + Raises: + AssertionError: If directories don't match or test is not set up correctly + """ compared_something = False for dirpath, _, filenames in walk(expected_dir): subdir = Path(actual_dir) / Path(dirpath).relative_to(expected_dir) for filename in filenames: compared_something = True expected_filename = Path(dirpath) / filename - expected = read_csv(expected_filename) + expected = pd.read_csv(expected_filename) actual_filename = Path(subdir) / filename - assert actual_filename.exists() - assert actual_filename.is_file() - actual = read_csv(actual_filename) + assert actual_filename.exists(), f"Missing file: {actual_filename}" + assert actual_filename.is_file(), f"Not a file: {actual_filename}" + actual = pd.read_csv(actual_filename) try: compare_df(expected, actual, msg=filename, **kwargs) except Exception: @@ -116,48 +185,13 @@ def compare_dirs(actual_dir, expected_dir, **kwargs): @fixture def default_timeslice_globals(): - from muse.timeslices import setup_module - - default_timeslices = """ - [timeslices] - winter.weekday.night = 396 - winter.weekday.morning = 396 - winter.weekday.afternoon = 264 - winter.weekday.early-peak = 66 - winter.weekday.late-peak = 66 - winter.weekday.evening = 396 - winter.weekend.night = 156 - winter.weekend.morning = 156 - winter.weekend.afternoon = 156 - winter.weekend.evening = 156 - spring-autumn.weekday.night = 792 - spring-autumn.weekday.morning = 792 - spring-autumn.weekday.afternoon = 528 - spring-autumn.weekday.early-peak = 132 - spring-autumn.weekday.late-peak = 132 - spring-autumn.weekday.evening = 792 - spring-autumn.weekend.night = 300 - spring-autumn.weekend.morning = 300 - spring-autumn.weekend.afternoon = 300 - spring-autumn.weekend.evening = 300 - summer.weekday.night = 396 - summer.weekday.morning = 396 - summer.weekday.afternoon = 264 - summer.weekday.early-peak = 66 - summer.weekday.late-peak = 66 - summer.weekday.evening = 396 - summer.weekend.night = 150 - summer.weekend.morning = 150 - summer.weekend.afternoon = 150 - summer.weekend.evening = 150 - level_names = ["month", "day", "hour"] - """ - - setup_module(default_timeslices) + """Set up default timeslice configuration.""" + setup_module(DEFAULT_TIMESLICES) @fixture -def timeslice(default_timeslice_globals) -> Dataset: +def timeslice(default_timeslice_globals) -> DataArray: + """Get the default timeslice dataset.""" from muse.timeslices import TIMESLICE return TIMESLICE @@ -165,9 +199,15 @@ def timeslice(default_timeslice_globals) -> Dataset: @fixture def coords() -> Mapping: - """Technoeconomics coordinates.""" + """Return standard coordinates for test cases. + + Returns: + Mapping with technology, region, year, commodity and comm_type coordinates + """ return { "technology": ["burger_flipper", "soda_shaker", "deep_frier", "salad_arranger"], + "tech_type": ["solid", "liquid", "solid", "liquid"], + "fuel": ["person", "person", "oil", "person"], "region": ["ASEAN", "USA"], "year": [2010, 2030], "commodity": [ @@ -194,10 +234,15 @@ def coords() -> Mapping: @fixture -def agent_args(coords) -> Mapping: - """Some standard arguments defining an agent.""" - from numpy.random import choice, rand, randint +def agent_args(coords: Mapping) -> Mapping: + """Generate standard arguments for creating an agent. + Args: + coords: Standard coordinate mapping + + Returns: + Mapping with region, share, enduses and maturity_threshold + """ return { "region": choice(coords["region"]), "share": "agent_share", @@ -209,40 +254,52 @@ def agent_args(coords) -> Mapping: } -@fixture -def technologies(coords) -> Dataset: - """Randomly generated technology characteristics.""" - from numpy import nonzero, sum - from numpy.random import choice, rand, randint +def var_generator(result: Dataset, dims: list[str], factor: float = 100.0) -> tuple: + """Generate random variables for a dataset. - from muse.commodities import CommodityUsage + Args: + result: Dataset to generate variables for + dims: Dimensions to generate variables over + factor: Scaling factor for random values - result = Dataset(coords=coords) + Returns: + Tuple of (dims, random_values) + """ + shape = tuple(len(result[u]) for u in dims) + return dims, (rand(*shape) * factor).astype(type(factor)) - result["comm_type"] = ("commodity", coords["comm_type"]) - result["tech_type"] = "technology", ["solid", "liquid", "solid", "liquid"] - result = result.set_coords(("comm_type", "tech_type")) +@fixture +def technologies(coords: Mapping) -> Dataset: + """Generate random technology characteristics. + + Args: + coords: Standard coordinate mapping - def var(*dims, factor=100.0): - shape = tuple(len(result[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) + Returns: + Dataset with technology characteristics + """ + result = Dataset(coords=coords) + result["comm_type"] = "commodity", coords["comm_type"] + result["tech_type"] = "technology", coords["tech_type"] + result["fuel"] = "technology", coords["fuel"] + result = result.set_coords(("comm_type", "tech_type")) - result["agent_share"] = var("technology", "region", "year") - result["agent_share"] /= sum(result.agent_share) - result["agent_share_zero"] = result["agent_share"] * 0 + # We have a single agent with a share of 1 for all technologies + result["agent_share"] = var_generator(result, ["technology", "region", "year"]) + result["agent_share"] /= np.sum(result.agent_share) # first create a mask so each tech will have consistent inputs/outputs across years # and regions fuels = result.comm_type == "energy" - result["fixed_inputs"] = var("technology", "commodity") + result["fixed_inputs"] = var_generator(result, ["technology", "commodity"]) result.fixed_inputs[:] = randint(0, 3, result.fixed_inputs.shape) == 0 result.fixed_inputs.loc[{"commodity": ~fuels}] = 0 result["flexible_inputs"] = result.fixed_inputs * ( randint(0, 2, result.fixed_inputs.shape) == 0 ) - result["fixed_outputs"] = var("technology", "commodity") + result["fixed_outputs"] = var_generator(result, ["technology", "commodity"]) result.fixed_outputs[:] = randint(0, 3, result.fixed_outputs.shape) == 0 enduses = result.comm_type == "service" environmentals = result.comm_type == "environmental" @@ -252,15 +309,15 @@ def var(*dims, factor=100.0): for tech in result.technology: fin = result.fixed_inputs if (fin.sel(technology=tech, commodity=fuels) < 1e-12).all(): - i = result.commodity[choice(nonzero(fuels.values)[0])] + i = result.commodity[choice(np.nonzero(fuels.values)[0])] fin.loc[{"technology": tech, "commodity": i}] = 1 fout = result.fixed_outputs if (fout.sel(technology=tech, commodity=enduses) < 1e-12).all(): - i = result.commodity[choice(nonzero(enduses.values)[0])] + i = result.commodity[choice(np.nonzero(enduses.values)[0])] fout.loc[{"technology": tech, "commodity": i}] = 1 - # expand along year and region, and fill with random numbers + # Expand along year and region dimensions ones = (result.year == result.year) * (result.region == result.region) result["fixed_inputs"] = result.fixed_inputs * ones result.fixed_inputs[:] *= rand(*result.fixed_inputs.shape) @@ -269,27 +326,55 @@ def var(*dims, factor=100.0): result["fixed_outputs"] = result.fixed_outputs * ones result.fixed_outputs[:] *= rand(*result.fixed_outputs.shape) - result["total_capacity_limit"] = var("technology", "region", "year") + # Generate capacity and utilization parameters + result["total_capacity_limit"] = var_generator( + result, ["technology", "region", "year"] + ) result.total_capacity_limit.loc[{"year": 2030}] += result.total_capacity_limit.sel( year=2030 ) - result["max_capacity_addition"] = var("technology", "region", "year") - result["max_capacity_growth"] = var("technology", "region", "year") + result["max_capacity_addition"] = var_generator( + result, ["technology", "region", "year"] + ) + result["max_capacity_growth"] = var_generator( + result, ["technology", "region", "year"] + ) - result["utilization_factor"] = var("technology", "region", "year", factor=0.05) + result["utilization_factor"] = var_generator( + result, ["technology", "region", "year"], factor=0.05 + ) result.utilization_factor.values += 0.95 - result["fix_par"] = var("technology", "region", "year", factor=2.0) - result["cap_par"] = var("technology", "region", "year", factor=30.0) - result["var_par"] = var("technology", "region", "year", factor=1.0) - result["fix_exp"] = var("technology", "region", "year", factor=1.0) - result["cap_exp"] = var("technology", "region", "year", factor=1.0) - result["var_exp"] = var("technology", "region", "year", factor=1.0) - - result["technical_life"] = var("technology", "region", "year", factor=10) - result["technical_life"] = result.technical_life.astype(int).clip(min=1) - result["interest_rate"] = var("technology", "region", "year", factor=0.1) + # Generate cost parameters + result["fix_par"] = var_generator( + result, ["technology", "region", "year"], factor=2.0 + ) + result["cap_par"] = var_generator( + result, ["technology", "region", "year"], factor=30.0 + ) + result["var_par"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + result["fix_exp"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + result["cap_exp"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + result["var_exp"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + # Generate technical parameters + result["technical_life"] = var_generator( + result, ["technology", "region", "year"], factor=10 + ) + result["technical_life"] = result.technical_life.astype(int).clip(min=1) + result["interest_rate"] = var_generator( + result, ["technology", "region", "year"], factor=0.1 + ) + + # Set commodity usage result["comm_usage"] = "commodity", CommodityUsage.from_technologies(result).values result = result.set_coords("comm_usage").drop_vars("comm_type") @@ -297,54 +382,88 @@ def var(*dims, factor=100.0): @fixture -def agent_market(coords, timeslice) -> Dataset: - from numpy.random import rand +def agent_market(coords: Mapping, timeslice: DataArray) -> Dataset: + """Generate market data for agent testing. + + Args: + coords: Standard coordinate mapping + timeslice: Timeslice dataset + Returns: + Dataset with market data for agents + """ result = Dataset(coords=timeslice.coords) result["commodity"] = "commodity", coords["commodity"] result["region"] = "region", coords["region"] result["technology"] = "technology", coords["technology"] result["year"] = "year", coords["year"] - def var(*dims, factor=100.0): - shape = tuple(len(result[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) - - result["capacity"] = var("technology", "region", "year") - result["supply"] = var("commodity", "region", "timeslice", "year") - result["consumption"] = var("commodity", "region", "timeslice", "year") - result["prices"] = var("commodity", "region", "year", "timeslice") + # Generate market variables + result["capacity"] = var_generator(result, ["technology", "region", "year"]) + result["supply"] = var_generator( + result, ["commodity", "region", "timeslice", "year"] + ) + result["consumption"] = var_generator( + result, ["commodity", "region", "timeslice", "year"] + ) + result["prices"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) return result @fixture -def market(coords, timeslice) -> Dataset: - from numpy.random import rand +def market(coords: Mapping, timeslice: DataArray) -> Dataset: + """Generate market data for testing. + Args: + coords: Standard coordinate mapping + timeslice: Timeslice dataset + + Returns: + Dataset with market data + """ result = Dataset(coords=timeslice.coords) result["commodity"] = "commodity", coords["commodity"] result["region"] = "region", coords["region"] result["year"] = "year", coords["year"] - def var(*dims, factor=100.0): - shape = tuple(len(result[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) - - result["consumption"] = var("commodity", "region", "year", "timeslice") - result["supply"] = var("commodity", "region", "year", "timeslice") - result["prices"] = var("commodity", "region", "year", "timeslice") + # Generate market variables + result["consumption"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) + result["supply"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) + result["prices"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) return result -def create_agent(agent_args, technologies, stock, agent_type="retrofit") -> Agent: - from numpy.random import choice +def create_agent( + agent_args: Mapping, + technologies: Dataset, + stock: Dataset, + agent_type: str = "retrofit", +) -> Agent: + """Create an agent for testing. - from muse.agents.factories import create_agent + Args: + agent_args: Arguments for agent creation + technologies: Technology characteristics + stock: Stock data + agent_type: Type of agent to create ("retrofit" or "newcapa") + + Returns: + Created agent instance + """ + from muse.agents.factories import create_agent as factory_create_agent region = agent_args["region"] - agent = create_agent( + agent = factory_create_agent( agent_type=agent_type, technologies=technologies.sel(region=region), capacity=stock.where(stock.region == region, drop=True).assign_coords( @@ -364,37 +483,71 @@ def create_agent(agent_args, technologies, stock, agent_type="retrofit") -> Agen list(technology_names), len(technology_names) // 2, replace=False ) agent.assets = agent.assets.where(agent.assets.technology.isin(techs)) + return agent @fixture -def newcapa_agent(agent_args, technologies, stock) -> Agent: +def newcapa_agent(agent_args: Mapping, technologies: Dataset, stock: Dataset) -> Agent: + """Create a new capacity agent for testing. + + Args: + agent_args: Arguments for agent creation + technologies: Technology characteristics + stock: Stock data + + Returns: + New capacity agent instance + """ return create_agent(agent_args, technologies, stock.capacity, "newcapa") @fixture -def retro_agent(agent_args, technologies, stock) -> Agent: +def retro_agent(agent_args: Mapping, technologies: Dataset, stock: Dataset) -> Agent: + """Create a retrofit agent for testing. + + Args: + agent_args: Arguments for agent creation + technologies: Technology characteristics + stock: Stock data + + Returns: + Retrofit agent instance + """ return create_agent(agent_args, technologies, stock.capacity, "retrofit") @fixture -def stock(coords, technologies) -> Dataset: +def stock(coords: Mapping, technologies: Dataset) -> Dataset: + """Generate stock data for testing. + + Args: + coords: Standard coordinate mapping + technologies: Technology characteristics + + Returns: + Dataset with stock data + """ return _stock(coords, technologies) -def _stock( - coords, - technologies, -) -> Dataset: +def _stock(coords: Mapping, technologies: Dataset) -> Dataset: + """Internal function to generate stock data. + + Args: + coords: Standard coordinate mapping + technologies: Technology characteristics + + Returns: + Dataset with stock data + """ from numpy import cumprod, stack - from numpy.random import choice, rand - from xarray import Dataset from muse.utilities import broadcast_over_assets n_assets = 10 - # Create assets + # Create asset coordinates asset_coords = { "technology": ("asset", choice(coords["technology"], n_assets, replace=True)), "region": ("asset", choice(coords["region"], n_assets, replace=True)), @@ -402,7 +555,7 @@ def _stock( } assets = Dataset(coords=asset_coords) - # Create random capacity data + # Generate random capacity data capacity_limits = broadcast_over_assets(technologies.total_capacity_limit, assets) factors = cumprod(rand(n_assets, len(coords["year"])) / 4 + 0.75, axis=1).clip( max=1 @@ -412,7 +565,7 @@ def _stock( axis=1, ) - # Create capacity dataset + # Create final dataset result = assets.copy() result["year"] = "year", coords["year"] result["capacity"] = ("asset", "year"), capacity @@ -420,10 +573,16 @@ def _stock( @fixture -def demand_share(coords, timeslice): - """Example demand share, as would be computed by an agent.""" - from numpy.random import choice, rand +def demand_share(coords: Mapping, timeslice: DataArray) -> DataArray: + """Generate demand share data for testing. + Args: + coords: Standard coordinate mapping + timeslice: Timeslice dataset + + Returns: + DataArray with demand share data + """ n_assets = 5 axes = { "commodity": coords["commodity"], @@ -431,24 +590,26 @@ def demand_share(coords, timeslice): "technology": (["asset"], choice(coords["technology"], n_assets, replace=True)), "region": (["asset"], choice(coords["region"], n_assets, replace=True)), } - shape = ( - len(axes["commodity"]), - len(axes["timeslice"]), - n_assets, - ) - result = DataArray( + shape = (len(axes["commodity"]), len(axes["timeslice"]), n_assets) + + return DataArray( rand(*shape), dims=["commodity", "timeslice", "asset"], coords=axes, name="demand_share", ) - return result def create_fake_capacity(n: int, technologies: Dataset) -> DataArray: - from numpy.random import choice, rand - from xarray import Dataset + """Create fake capacity data for testing. + + Args: + n: Number of assets to create + technologies: Technology characteristics + Returns: + DataArray with fake capacity data + """ years = technologies.year techs = choice(technologies.technology.values, 5) regions = choice(technologies.region.values, 5) @@ -465,62 +626,32 @@ def create_fake_capacity(n: int, technologies: Dataset) -> DataArray: @fixture def capacity(technologies: Dataset) -> DataArray: - return create_fake_capacity(20, technologies) - + """Generate capacity data for testing. -@fixture -def settings(tmpdir) -> dict: - """Creates a dummy settings dictionary out of the default settings.""" - import toml - - from muse.readers import DEFAULT_SETTINGS_PATH - from muse.readers.toml import format_paths - - def drop_optionals(settings): - from copy import copy - - for k, v in copy(settings).items(): - if v == "OPTIONAL": - settings.pop(k) - elif isinstance(v, Mapping): - drop_optionals(v) - - settings = toml.load(DEFAULT_SETTINGS_PATH) - drop_optionals(settings) - out = format_paths(settings, cwd=tmpdir, path=tmpdir, muse_sectors=tmpdir) - - required = { - "time_framework": [2010, 2015, 2020], - "regions": ["MEX"], - "equilibrium": False, - "maximum_iterations": 3, - "tolerance": 0.1, - "interpolation_mode": "linear", - } - out.update(required) + Args: + technologies: Technology characteristics - carbon_budget_required = { - "budget": [420000, 413000, 403000], - "commodities": ["CO2f", "CO2r", "CH4", "N2O"], - } - - out["carbon_budget_control"].update(carbon_budget_required) - - return out + Returns: + DataArray with capacity data + """ + return create_fake_capacity(20, technologies) @fixture(autouse=True) def warnings_as_errors(request): - from warnings import filterwarnings, simplefilter + """Configure warnings to be treated as errors during testing. - # disable fixture for some tests + Args: + request: Pytest request object + """ + # Disable fixture for specific tests if ( request.module.__name__ == "test_outputs" and request.node.name == "test_save_with_fullpath_to_excel_with_sink" ): return - # Fail test if the following warnings are raised + # Configure warning filters simplefilter("error", FutureWarning) simplefilter("error", DeprecationWarning) simplefilter("error", PendingDeprecationWarning) @@ -538,19 +669,23 @@ def warnings_as_errors(request): @fixture def save_registries(): - from contextlib import contextmanager + """Save and restore registry state during tests.""" @contextmanager def saveme(module_name: str, registry_name: str): - from copy import deepcopy - from importlib import import_module + """Save and restore a specific registry. + Args: + module_name: Name of module containing registry + registry_name: Name of registry to save/restore + """ module = import_module(module_name) old = getattr(module, registry_name) setattr(module, registry_name, deepcopy(old)) yield setattr(module, registry_name, deepcopy(old)) + # List of registries to save/restore iterators = [ saveme("muse.sectors", "SECTORS_REGISTERED"), saveme("muse.objectives", "OBJECTIVES"), @@ -578,10 +713,3 @@ def saveme(module_name: str, registry_name: str): map(next, iterators) yield map(next, iterators) - - -@fixture -def rng(request): - from numpy.random import default_rng - - return default_rng(getattr(request.config.option, "randomly_seed", None)) diff --git a/tests/test_costs.py b/tests/test_costs.py index f66237b04..8b7b54785 100644 --- a/tests/test_costs.py +++ b/tests/test_costs.py @@ -18,301 +18,369 @@ supply_cost, variable_costs, ) -from muse.quantities import production_amplitude -from muse.timeslices import broadcast_timeslice +from muse.quantities import capacity_to_service_demand, production_amplitude +from muse.timeslices import broadcast_timeslice, distribute_timeslice +from muse.utilities import broadcast_over_assets YEAR = 2030 @fixture -def _capacity(_technologies, demand_share): - """Capacity for each asset.""" - from muse.quantities import capacity_to_service_demand - - return capacity_to_service_demand(technologies=_technologies, demand=demand_share) - - -@fixture -def _technologies(technologies, demand_share): - """Technology parameters for each asset.""" - from muse.utilities import broadcast_over_assets - - return broadcast_over_assets(technologies.sel(year=YEAR), demand_share) - - -@fixture -def _prices(market, demand_share): - """Prices relevant to each asset.""" - from muse.utilities import broadcast_over_assets - - prices = market.prices.sel(year=YEAR) - return broadcast_over_assets(prices, demand_share, installed_as_year=False) +def cost_inputs(technologies, market, demand_share): + """Creates the complete dataset needed for cost calculations. + + The transformation follows these steps: + 1. Extract year-specific data from technologies and market + 2. Transform data to asset level + 3. Calculate capacity for each asset + 4. Calculate production and consumption data + + Returns: + dict: Contains all necessary data for cost calculations: + - technologies: Technology parameters for each asset + - prices: Prices relevant to each asset + - capacity: Capacity for each asset + - production: Production data for each asset + - consumption: Consumption data for each asset + """ + # Step 1: Extract year-specific data + tech_year = technologies.sel(year=YEAR) + prices_year = market.prices.sel(year=YEAR) + + # Step 2: Transform to asset level + tech_assets = broadcast_over_assets(tech_year, demand_share) + prices_assets = broadcast_over_assets( + prices_year, demand_share, installed_as_year=False + ) + # Step 3: Calculate capacity + capacity = capacity_to_service_demand(technologies=tech_assets, demand=demand_share) -@fixture -def _production(_technologies, _capacity): - """Production data for each asset.""" - from muse.timeslices import broadcast_timeslice, distribute_timeslice - - return ( - broadcast_timeslice(_capacity) - * distribute_timeslice(_technologies.fixed_outputs) - * broadcast_timeslice(_technologies.utilization_factor) + # Step 4: Calculate production and consumption + production = ( + broadcast_timeslice(capacity) + * distribute_timeslice(tech_assets.fixed_outputs) + * broadcast_timeslice(tech_assets.utilization_factor) ) - -@fixture -def _consumption(_technologies, _capacity): - """Consumption data for each asset.""" - from muse.timeslices import broadcast_timeslice, distribute_timeslice - - return ( - broadcast_timeslice(_capacity) - * distribute_timeslice(_technologies.fixed_inputs) - * broadcast_timeslice(_technologies.utilization_factor) + consumption = ( + broadcast_timeslice(capacity) + * distribute_timeslice(tech_assets.fixed_inputs) + * broadcast_timeslice(tech_assets.utilization_factor) ) + return { + "technologies": tech_assets, + "prices": prices_assets, + "capacity": capacity, + "production": production, + "consumption": consumption, + } + -def test_fixtures(_technologies, _prices, _capacity, _production, _consumption): +def test_fixtures(cost_inputs): """Validate fixture dimensions.""" - assert set(_technologies.dims) == {"asset", "commodity"} - assert set(_prices.dims) == {"asset", "commodity", "timeslice"} - assert set(_capacity.dims) == {"asset"} - assert set(_production.dims) == {"asset", "commodity", "timeslice"} - assert set(_consumption.dims) == {"asset", "commodity", "timeslice"} + assert set(cost_inputs["technologies"].dims) == {"asset", "commodity"} + assert set(cost_inputs["prices"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_inputs["capacity"].dims) == {"asset"} + assert set(cost_inputs["production"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_inputs["consumption"].dims) == {"asset", "commodity", "timeslice"} -def test_capital_costs(_technologies, _capacity): - result = capital_costs(_technologies, _capacity) +def test_capital_costs(cost_inputs): + result = capital_costs(cost_inputs["technologies"], cost_inputs["capacity"]) assert set(result.dims) == {"asset"} -def test_environmental_costs(_technologies, _prices, _production): - result = environmental_costs(_technologies, _prices, _production) +def test_environmental_costs(cost_inputs): + result = environmental_costs( + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["production"] + ) assert set(result.dims) == {"asset", "timeslice"} -def test_fuel_costs(_technologies, _prices, _consumption): - result = fuel_costs(_technologies, _prices, _consumption) +def test_fuel_costs(cost_inputs): + result = fuel_costs( + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["consumption"] + ) assert set(result.dims) == {"asset", "timeslice"} -def test_material_costs(_technologies, _prices, _consumption): - result = material_costs(_technologies, _prices, _consumption) +def test_material_costs(cost_inputs): + result = material_costs( + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["consumption"] + ) assert set(result.dims) == {"asset", "timeslice"} -def test_fixed_costs(_technologies, _capacity): - result = fixed_costs(_technologies, _capacity) +def test_fixed_costs(cost_inputs): + result = fixed_costs(cost_inputs["technologies"], cost_inputs["capacity"]) assert set(result.dims) == {"asset"} -def test_variable_costs(_technologies, _production): - result = variable_costs(_technologies, _production) +def test_variable_costs(cost_inputs): + result = variable_costs(cost_inputs["technologies"], cost_inputs["production"]) assert set(result.dims) == {"asset"} -def test_running_costs(_technologies, _prices, _capacity, _production, _consumption): - result = running_costs(_technologies, _prices, _capacity, _production, _consumption) +def test_running_costs(cost_inputs): + result = running_costs( + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + ) assert set(result.dims) == {"asset", "timeslice"} -def test_net_present_value( - _technologies, _prices, _capacity, _production, _consumption -): +def test_net_present_value(cost_inputs): result = net_present_value( - _technologies, _prices, _capacity, _production, _consumption + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_net_present_cost(_technologies, _prices, _capacity, _production, _consumption): +def test_net_present_cost(cost_inputs): result = net_present_cost( - _technologies, _prices, _capacity, _production, _consumption + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_equivalent_annual_cost( - _technologies, _prices, _capacity, _production, _consumption -): +def test_equivalent_annual_cost(cost_inputs): result = equivalent_annual_cost( - _technologies, _prices, _capacity, _production, _consumption + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} @mark.parametrize("method", ["annual", "lifetime"]) -def test_levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_levelized_cost_of_energy(cost_inputs, method): result = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method=method, ) assert set(result.dims) == {"asset", "timeslice"} -def test_supply_cost(_technologies, _prices, _capacity, _production, _consumption): +def test_supply_cost(cost_inputs): lcoe = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method="annual" + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method="annual", ) - result = supply_cost(_production, lcoe) + result = supply_cost(cost_inputs["production"], lcoe) assert set(result.dims) == {"commodity", "region", "timeslice"} -def test_capital_recovery_factor(_technologies): - result = capital_recovery_factor(_technologies) - assert set(result.dims) == set(_technologies.interest_rate.dims) +def test_capital_recovery_factor(cost_inputs): + result = capital_recovery_factor(cost_inputs["technologies"]) + assert set(result.dims) == set(cost_inputs["technologies"].interest_rate.dims) # Test zero interest rates - _technologies["interest_rate"] = 0 - result = capital_recovery_factor(_technologies) + cost_inputs["technologies"]["interest_rate"] = 0 + result = capital_recovery_factor(cost_inputs["technologies"]) assert isfinite(result).all() -def test_annual_to_lifetime(_technologies, _prices, _consumption): - _fuel_costs = fuel_costs(_technologies, _prices, _consumption) - _fuel_costs_lifetime = annual_to_lifetime(_fuel_costs, _technologies) +def test_annual_to_lifetime(cost_inputs): + _fuel_costs = fuel_costs( + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["consumption"] + ) + _fuel_costs_lifetime = annual_to_lifetime(_fuel_costs, cost_inputs["technologies"]) assert set(_fuel_costs.dims) == set(_fuel_costs_lifetime.dims) assert (_fuel_costs_lifetime > _fuel_costs).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_flow_scaling( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_flow_scaling(cost_inputs, method): """Test LCOE independence of input/output flow scaling.""" - _technologies["var_exp"] = 1 + cost_inputs["technologies"]["var_exp"] = 1 # Original LCOE lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method=method, ) # Scale inputs/outputs and var_par by 2 - _technologies_scaled = _technologies.copy() - _technologies_scaled["fixed_inputs"] *= 2 - _technologies_scaled["flexible_inputs"] *= 2 - _technologies_scaled["fixed_outputs"] *= 2 - _technologies_scaled["var_par"] *= 2 + technologies_scaled = cost_inputs["technologies"].copy() + technologies_scaled["fixed_inputs"] *= 2 + technologies_scaled["flexible_inputs"] *= 2 + technologies_scaled["fixed_outputs"] *= 2 + technologies_scaled["var_par"] *= 2 lcoe2 = levelized_cost_of_energy( - _technologies_scaled, - _prices, - _capacity, - _production, - _consumption, + technologies_scaled, + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) assert isclose(lcoe1, lcoe2).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_prod_scaling( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_prod_scaling(cost_inputs, method): """Test LCOE independence of production scaling with linear costs.""" - _technologies["var_exp"] = 1 - _technologies["cap_exp"] = 1 - _technologies["fix_exp"] = 1 + cost_inputs["technologies"]["var_exp"] = 1 + cost_inputs["technologies"]["cap_exp"] = 1 + cost_inputs["technologies"]["fix_exp"] = 1 lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method=method, ) lcoe2 = levelized_cost_of_energy( - _technologies, - _prices, - _capacity * 2, - _production * 2, - _consumption * 2, + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"] * 2, + cost_inputs["production"] * 2, + cost_inputs["consumption"] * 2, method=method, ) assert isclose(lcoe1, lcoe2).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_equal_prices( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_equal_prices(cost_inputs, method): """Test LCOE behavior with uniform prices across timeslices.""" lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method=method, ) with raises(AssertionError): assert_allclose(lcoe1, broadcast_timeslice(lcoe1.isel(timeslice=0))) # Test with uniform prices - _prices = broadcast_timeslice(_prices.mean("timeslice")) + prices_uniform = broadcast_timeslice(cost_inputs["prices"].mean("timeslice")) lcoe2 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + prices_uniform, + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method=method, ) assert_allclose(lcoe2, broadcast_timeslice(lcoe2.isel(timeslice=0))) -def test_npv_equal_prices(_technologies, _prices, _capacity, _production, _consumption): +def test_npv_equal_prices(cost_inputs): """Test NPV linearity with production under uniform prices.""" npv1 = net_present_value( - _technologies, _prices, _capacity, _production, _consumption + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + ) + tech_activity = production_amplitude( + cost_inputs["production"], cost_inputs["technologies"] ) - tech_activity = production_amplitude(_production, _technologies) npv1_scaled = npv1 / tech_activity with raises(AssertionError): assert_allclose(npv1_scaled, broadcast_timeslice(npv1_scaled.isel(timeslice=0))) # Test with uniform prices - _prices = broadcast_timeslice(_prices.mean("timeslice")) + prices_uniform = broadcast_timeslice(cost_inputs["prices"].mean("timeslice")) npv2 = net_present_value( - _technologies, _prices, _capacity, _production, _consumption + cost_inputs["technologies"], + prices_uniform, + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) npv2_scaled = npv2 / tech_activity assert_allclose(npv2_scaled, broadcast_timeslice(npv2_scaled.isel(timeslice=0))) @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_zero_production( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_zero_production(cost_inputs, method): """Test LCOE behavior with zero production.""" lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], + method=method, ) assert not (lcoe1.isel(timeslice=0) == 0).all() # Test with zero production in first timeslice - _production.isel(timeslice=0)[:] = 0 - _consumption.isel(timeslice=0)[:] = 0 + production_zero = cost_inputs["production"].copy() + consumption_zero = cost_inputs["consumption"].copy() + production_zero.isel(timeslice=0)[:] = 0 + consumption_zero.isel(timeslice=0)[:] = 0 + lcoe2 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + production_zero, + consumption_zero, + method=method, ) assert (lcoe2.isel(timeslice=0) == 0).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_aggregate( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_aggregate(cost_inputs, method): """Test LCOE aggregation over timeslices.""" result = levelized_cost_of_energy( - _technologies, - _prices, - _capacity, - _production, - _consumption, + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, aggregate_timeslices=True, ) assert set(result.dims) == {"asset"} -def test_npv_aggregate(_technologies, _prices, _capacity, _production, _consumption): +def test_npv_aggregate(cost_inputs): """Test NPV aggregation over timeslices.""" result = net_present_value( - _technologies, - _prices, - _capacity, - _production, - _consumption, + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], aggregate_timeslices=True, ) assert set(result.dims) == {"asset"} diff --git a/tests/test_filters.py b/tests/test_filters.py index ad36047cf..edd832cda 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -146,7 +146,7 @@ def test_similar_fuels(retro_agent, search_space, technologies): assert (actual == expected).all() -def test_currently_existing(retro_agent, search_space, technologies, agent_market, rng): +def test_currently_existing(retro_agent, search_space, technologies, agent_market): # Test with zero capacity agent_market.capacity[:] = 0 actual = currently_existing_tech( @@ -165,9 +165,9 @@ def test_currently_existing(retro_agent, search_space, technologies, agent_marke assert actual.sel(replacement=in_market).all() # Test with partial capacity - techs = rng.choice( + techs = np.random.choice( list(set(agent_market.technology.values)), - 1 + rng.choice(range(len(set(agent_market.technology.values)))), + 1 + np.random.choice(range(len(set(agent_market.technology.values)))), replace=False, ) agent_market.capacity[:] = 0 @@ -228,13 +228,13 @@ def test_init_from_tech(demand_share, technologies, agent_market): assert not space.any() -def test_init_from_asset(technologies, rng): +def test_init_from_asset(technologies): # Create test data - technology = rng.choice(technologies.technology, 5) - installed = rng.choice((2020, 2025), len(technology)) + technology = np.random.choice(technologies.technology, 5) + installed = np.random.choice((2020, 2025), len(technology)) year = np.arange(2020, 2040, 5) capacity = xr.DataArray( - rng.choice([0, 0, 1, 10], (len(technology), len(year))), + np.random.choice([0, 0, 1, 10], (len(technology), len(year))), coords={ "technology": ("asset", technology), "installed": ("asset", installed), @@ -253,7 +253,7 @@ def test_init_from_asset(technologies, rng): assert set(space.asset.asset.values) == set(capacity.technology.values) -def test_init_from_asset_no_assets(technologies, rng): +def test_init_from_asset_no_assets(technologies): agent = namedtuple("DummyAgent", ["assets"])( xr.Dataset(dict(capacity=xr.DataArray(0))) ) diff --git a/tests/test_objectives.py b/tests/test_objectives.py index 06a42fef0..78461b643 100644 --- a/tests/test_objectives.py +++ b/tests/test_objectives.py @@ -1,34 +1,64 @@ +from numpy.random import rand from pytest import fixture, mark YEAR = 2030 @fixture -def _demand(demand_share): - return demand_share +def objective_inputs(technologies, market, demand_share): + """Creates the complete dataset needed for objective calculations. + + The transformation follows these steps: + 1. Extract year-specific data from technologies and market + 2. Transform technology data to asset level with replacement dimension + 3. Transform price data to asset level + 4. Add any additional variables needed for specific objectives + + Returns: + dict: Contains all necessary data for objective calculations: + - technologies: Technology parameters with replacement dimension + - prices: Prices relevant to each asset + - demand: Demand share data + """ + from muse.utilities import broadcast_over_assets + # Step 1: Extract year-specific data + tech_year = technologies.sel(year=YEAR).rename(technology="replacement") + prices_year = market.prices.sel(year=YEAR) -@fixture -def _technologies(technologies, demand_share): - from muse.utilities import broadcast_over_assets + # Step 2 & 3: Transform to asset level + tech_assets = broadcast_over_assets(tech_year, demand_share) + prices_assets = broadcast_over_assets( + prices_year, demand_share, installed_as_year=False + ) - techs = technologies.sel(year=YEAR).rename(technology="replacement") - return broadcast_over_assets(techs, demand_share) + # Step 4: Add computed variables needed by some objectives + tech_assets["comfort"] = _add_var(tech_assets, "replacement") + tech_assets["efficiency"] = _add_var(tech_assets, "replacement") + tech_assets["scaling_size"] = _add_var(tech_assets, "replacement") + return { + "technologies": tech_assets, + "prices": prices_assets, + "demand": demand_share, + } -@fixture -def _prices(market, demand_share): - from muse.utilities import broadcast_over_assets - prices = market.prices.sel(year=YEAR) - return broadcast_over_assets(prices, demand_share, installed_as_year=False) +def _add_var(coordinates, *dims, factor=100.0): + """Helper function to add random variables with specified dimensions.""" + shape = tuple(len(coordinates[u]) for u in dims) + return dims, (rand(*shape) * factor).astype(type(factor)) -def test_fixtures(_technologies, _demand, _prices): - """Validating that the fixtures have appropriate dimensions.""" - assert set(_technologies.dims) == {"asset", "commodity", "replacement"} - assert set(_demand.dims) == {"asset", "commodity", "timeslice"} - assert set(_prices.dims) == {"asset", "commodity", "timeslice"} +def test_fixtures(objective_inputs): + """Validating that the fixture data has appropriate dimensions.""" + assert set(objective_inputs["technologies"].dims) == { + "asset", + "commodity", + "replacement", + } + assert set(objective_inputs["demand"].dims) == {"asset", "commodity", "timeslice"} + assert set(objective_inputs["prices"].dims) == {"asset", "commodity", "timeslice"} @mark.usefixtures("save_registries") @@ -51,7 +81,7 @@ def b_objective(*args, **kwargs): @mark.usefixtures("save_registries") -def test_computing_objectives(_technologies, _demand, _prices): +def test_computing_objectives(objective_inputs): from muse.objectives import factory, register_objective @register_objective @@ -78,20 +108,27 @@ def second(technologies, demand, assets=None, *args, **kwargs): # Test first objective with/without switch objectives = factory("first")( - technologies=_technologies, demand=_demand, prices=_prices, switch=True + technologies=objective_inputs["technologies"], + demand=objective_inputs["demand"], + prices=objective_inputs["prices"], + switch=True, ) assert set(objectives.data_vars) == {"first"} assert (objectives.first == 1).all() + objectives = factory("first")( - technologies=_technologies, demand=_demand, prices=_prices, switch=False + technologies=objective_inputs["technologies"], + demand=objective_inputs["demand"], + prices=objective_inputs["prices"], + switch=False, ) assert (objectives.first == 2).all() # Test multiple objectives objectives = factory(["first", "second"])( - technologies=_technologies, - demand=_demand, - prices=_prices, + technologies=objective_inputs["technologies"], + demand=objective_inputs["demand"], + prices=objective_inputs["prices"], switch=False, assets=0, ) @@ -103,109 +140,135 @@ def second(technologies, demand, assets=None, *args, **kwargs): assert (objectives.second.isel(asset=1) == 5).all() -def test_comfort(_technologies, _demand): +def test_comfort(objective_inputs): from muse.objectives import comfort - _technologies["comfort"] = add_var(_technologies, "replacement") - result = comfort(_technologies, _demand) + result = comfort(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_efficiency(_technologies, _demand): +def test_efficiency(objective_inputs): from muse.objectives import efficiency - _technologies["efficiency"] = add_var(_technologies, "replacement") - result = efficiency(_technologies, _demand) + result = efficiency(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_capacity_to_service_demand(_technologies, _demand): +def test_capacity_to_service_demand(objective_inputs): from muse.objectives import capacity_to_service_demand - result = capacity_to_service_demand(_technologies, _demand) + result = capacity_to_service_demand( + objective_inputs["technologies"], objective_inputs["demand"] + ) assert set(result.dims) == {"replacement", "asset"} -def test_capacity_in_use(_technologies, _demand): +def test_capacity_in_use(objective_inputs): from muse.objectives import capacity_in_use - result = capacity_in_use(_technologies, _demand) + result = capacity_in_use( + objective_inputs["technologies"], objective_inputs["demand"] + ) assert set(result.dims) == {"replacement", "asset"} -def test_consumption(_technologies, _demand, _prices): +def test_consumption(objective_inputs): from muse.objectives import consumption - result = consumption(_technologies, _demand, _prices) + result = consumption( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_fixed_costs(_technologies, _demand): +def test_fixed_costs(objective_inputs): from muse.objectives import fixed_costs - result = fixed_costs(_technologies, _demand) + result = fixed_costs(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_capital_costs(_technologies, _demand): +def test_capital_costs(objective_inputs): from muse.objectives import capital_costs - _technologies["scaling_size"] = add_var(_technologies, "replacement") - result = capital_costs(_technologies, _demand) + result = capital_costs(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_emission_cost(_technologies, _demand, _prices): +def test_emission_cost(objective_inputs): from muse.objectives import emission_cost - result = emission_cost(_technologies, _demand, _prices) + result = emission_cost( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_fuel_consumption_cost(_technologies, _demand, _prices): +def test_fuel_consumption_cost(objective_inputs): from muse.objectives import fuel_consumption_cost - result = fuel_consumption_cost(_technologies, _demand, _prices) + result = fuel_consumption_cost( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_annual_levelized_cost_of_energy(_technologies, _demand, _prices): +def test_annual_levelized_cost_of_energy(objective_inputs): from muse.objectives import annual_levelized_cost_of_energy - result = annual_levelized_cost_of_energy(_technologies, _demand, _prices) + result = annual_levelized_cost_of_energy( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_lifetime_levelized_cost_of_energy(_technologies, _demand, _prices): +def test_lifetime_levelized_cost_of_energy(objective_inputs): from muse.objectives import lifetime_levelized_cost_of_energy - result = lifetime_levelized_cost_of_energy(_technologies, _demand, _prices) + result = lifetime_levelized_cost_of_energy( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_net_present_value(_technologies, _demand, _prices): +def test_net_present_value(objective_inputs): from muse.objectives import net_present_value - result = net_present_value(_technologies, _demand, _prices) + result = net_present_value( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_net_present_cost(_technologies, _demand, _prices): +def test_net_present_cost(objective_inputs): from muse.objectives import net_present_cost - result = net_present_cost(_technologies, _demand, _prices) + result = net_present_cost( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_equivalent_annual_cost(_technologies, _demand, _prices): +def test_equivalent_annual_cost(objective_inputs): from muse.objectives import equivalent_annual_cost - result = equivalent_annual_cost(_technologies, _demand, _prices) + result = equivalent_annual_cost( + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], + ) assert set(result.dims) == {"replacement", "asset"} - - -def add_var(coordinates, *dims, factor=100.0): - from numpy.random import rand - - shape = tuple(len(coordinates[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) diff --git a/tests/test_readers.py b/tests/test_readers.py index ab566cc5e..7847b7987 100644 --- a/tests/test_readers.py +++ b/tests/test_readers.py @@ -1,3 +1,4 @@ +from collections.abc import Mapping from itertools import chain, permutations from pathlib import Path from unittest.mock import patch @@ -16,6 +17,35 @@ ] +@fixture +def settings(tmpdir) -> dict: + """Generate settings for testing. + + Args: + tmpdir: Temporary directory path + + Returns: + Dictionary with test settings + """ + import toml + + from muse.readers import DEFAULT_SETTINGS_PATH + from muse.readers.toml import format_paths + + def drop_optionals(settings: dict) -> None: + """Remove optional settings from dictionary.""" + for k, v in list(settings.items()): + if v == "OPTIONAL": + settings.pop(k) + elif isinstance(v, Mapping): + drop_optionals(v) + + settings = toml.load(DEFAULT_SETTINGS_PATH) + drop_optionals(settings) + settings = format_paths(settings, cwd=tmpdir, path=tmpdir, muse_sectors=tmpdir) + return settings + + @fixture def user_data_files(settings: dict) -> None: """Creates test files related to user data."""