diff --git a/macq/__init__.py b/macq/__init__.py index 30bd90a9..20bd09b7 100644 --- a/macq/__init__.py +++ b/macq/__init__.py @@ -4,4 +4,4 @@ ---- .. include:: ../docs/index.md -""" +""" \ No newline at end of file diff --git a/macq/core/__init__.py b/macq/core/__init__.py new file mode 100644 index 00000000..eb91b34d --- /dev/null +++ b/macq/core/__init__.py @@ -0,0 +1,27 @@ +from .signature_parameter import SignatureParameter +from .object_type import ObjectType, CircularTypeHierarchyException, is_circular_type_hierarchy_error +from .fluents import LiftedFluent, ParameterBoundFluent, GroundedFluent +from .actions import LiftedAction, GroundedAction +from .model_type_validate import ModelTypeValidator, ModelType +from .factory_pattern_model import Model, create_lifted_model, create_grounded_model + +"""package contains core elements of an action model. + enabling lifted and grounded representations""" + + +__all__ = [ + "ObjectType", + "CircularTypeHierarchyException", + "is_circular_type_hierarchy_error", + "SignatureParameter", + "LiftedAction", + "GroundedAction", + "LiftedFluent", + "ParameterBoundFluent", + "GroundedFluent", + "ModelType", + "ModelTypeValidator", + "Model", + "create_lifted_model", + "create_grounded_model" +] \ No newline at end of file diff --git a/macq/core/action_model.py b/macq/core/action_model.py new file mode 100644 index 00000000..1e56d25d --- /dev/null +++ b/macq/core/action_model.py @@ -0,0 +1,362 @@ +from json import dumps +from typing import Set, Union, Optional, List + +import tarski +import tarski.fstrips as fs +from tarski import FirstOrderLanguage +from tarski.io import fstrips as iofs +from tarski.model import create +from tarski.syntax import land +from tarski.syntax.formulas import CompoundFormula, Connective, top, Formula + +from . 
import ObjectType +from .actions import GroundedAction, LiftedAction +from .fluents import LiftedFluent, GroundedFluent +from .model_type_validate import ModelType, ModelTypeValidator, ModelValidationError +from ..utils import ComplexEncoder + + +def _get_str_as_tarski_grounded_formula(name: str, lang: FirstOrderLanguage) -> Formula: + """Converts a string (referencing an attribute of a LearnedAction, i.e., a specific precondition or effect)) + to a Formula. + """ + return lang.get(name.replace(" ", "_"))() + + +class Model: + """Class representation of an Action-model.""" + + def __init__( + self, + fluents: Union[Set[LiftedFluent], Set[GroundedFluent]], + actions: Union[Set[LiftedAction], Set[GroundedAction]], + learned_sorts: Optional[List] = None, + model_type: Optional[ModelType] = None, + _skip_validation: bool = False + ): + """ + Internal constructor. Use factory methods for type-safe creation. + + Args: + fluents: Set of fluents + actions: Set of actions + learned_sorts: Optional list of sorts + model_type: Optional model type (will be detected if not provided) + _skip_validation: Internal flag to skip validation (used by factory methods) + """ + if not _skip_validation: + # Validate consistency + detected_type = ModelTypeValidator.validate_model_consistency(actions, fluents) + if model_type and model_type != detected_type: + raise ModelValidationError( + f"Specified model type {model_type.value} doesn't match " + f"detected type {detected_type.value}" + ) + model_type = detected_type + + self.fluents = fluents + self.actions = actions + self.learned_sorts = learned_sorts + self._model_type = model_type + + @property + def model_type(self) -> ModelType: + """Get the model type.""" + if self._model_type is None: + self._model_type = ModelTypeValidator.validate_model_consistency( + self.actions, self.fluents + ) + return self._model_type + + def is_lifted_model(self) -> bool: + """Check if this is a lifted model.""" + return self.model_type == 
ModelType.LIFTED + + def is_grounded_model(self) -> bool: + """Check if this is a grounded model.""" + return self.model_type == ModelType.GROUNDED + + # Existing methods remain the same... + def __eq__(self, other) -> bool: + if not isinstance(other, Model): + return False + return (self.fluents == other.fluents and + self.actions == other.actions and + self.learned_sorts == other.learned_sorts) + + def details(self) -> str: + """Return detailed string representation of the model.""" + indent = " " * 2 + string = f"Model ({self.model_type.value}):\n" + string += f"{indent}Fluents: {', '.join(map(str, self.fluents))}\n" + string += f"{indent}Actions:\n" + for line in self._get_action_details().splitlines(): + string += f"{indent * 2}{line}\n" + return string + + def _get_action_details(self) -> str: + """Get detailed string representation of all actions.""" + # Implementation depends on your action classes having certain methods + # This is a simplified version + indent = " " * 2 + details = "" + for action in self.actions: + details += f"{action.name}:\n" + # Add more details based on your action interface + return details + + def serialize(self, filepath: Optional[str] = None) -> str: + """Serialize the model to JSON.""" + serial = dumps(self._serialize(), cls=ComplexEncoder) + if filepath is not None: + with open(filepath, "w") as fp: + fp.write(serial) + return serial + + def _serialize(self) -> dict: + """Internal serialization method.""" + return dict( + fluents=list(self.fluents), + actions=list(self.actions), + learned_sorts=self.learned_sorts, + model_type=self.model_type.value + ) + + def to_pddl(self, domain_name: str, problem_name: str = "", + domain_filename: str = "", problem_filename: str = ""): + """Export to PDDL format based on the model type.""" + + if not problem_name: + problem_name = domain_name + "_problem" + if not domain_filename: + domain_filename = domain_name + ".pddl" + if not problem_filename: + problem_filename = problem_name + 
".pddl" + + if self.is_lifted_model(): + self.to_pddl_lifted(domain_name, problem_name, domain_filename, problem_filename) + elif self.is_grounded_model(): + self.to_pddl_grounded(domain_name, problem_name, domain_filename, problem_filename) + else: + raise ModelValidationError(f"Cannot export {self.model_type.value} model to PDDL") + + def to_pddl_lifted( + self, + domain_name: str, + problem_name: str, + domain_filename: str, + problem_filename: str, + ): + """Dumps a Model with typed lifted actions & fluents to PDDL files. + + Args: + domain_name (str): + The name of the domain to be generated. + problem_name (str): + The name of the problem to be generated. + domain_filename (str): + The name of the domain file to be generated. + problem_filename (str): + The name of the problem file to be generated. + """ + + lang = tarski.language(domain_name) + problem = tarski.fstrips.create_fstrips_problem( + domain_name=domain_name, problem_name=problem_name, language=lang + ) + object_types = {"object"} + if self.learned_sorts is not None: + for s in self.learned_sorts: + if isinstance(s, ObjectType) and s.type_name not in object_types: + if s.parent is None: + lang.sort(name=s.type_name) + object_types.add(s.type_name) + for s in self.learned_sorts: + if isinstance(s, ObjectType) and s.type_name not in object_types: + if s.parent is not None: + lang.sort(name=s.type_name, parent=s.parent) + object_types.add(s.type_name) + + + if self.fluents: + for f in self.fluents: + param_types = [parm.object_type.type_name for parm in f.parameters] + for object_type in param_types: + if object_type not in object_types: + lang.sort(object_type) + object_types.add(object_type) + lang.predicate(f.name, *param_types) + + if self.actions: + for a in self.actions: + param_types = [parm.object_type.type_name for parm in a.params] + vars = [lang.variable(f"x{i}", s) for i, s in enumerate(param_types)] + + positive_precond_list = [lang.get(f.name)(*[vars[i] for i, _ in 
enumerate(f.bounded_params)]) + for f in a.positive_preconditions] + + neg_precond_list = [] + try: + if a.negative_preconditions: + for f in a.negative_preconditions: + negated_predicate = CompoundFormula( + Connective.Not,[lang.get(f.name)( + *[vars[i] for i, _ in enumerate(f.bounded_params)])],) + neg_precond_list.append(negated_predicate) + except AttributeError: + pass + + precond_list = positive_precond_list + neg_precond_list + if len(precond_list) == 1: + precond = precond_list[0] + elif len(precond_list) == 0: + precond = top # always true + + else: + precond = CompoundFormula(Connective.And, precond_list,) + + adds = [lang.get(f.name)(*[vars[i] for i, _ in enumerate(f.bounded_params)]) for f in a.add_effects] # type: ignore TODO validate for i, _ in enumerate(f.bounded_params) + dels = [lang.get(f.name)(*[vars[i] for i, _ in enumerate(f.bounded_params)]) for f in a.delete_effects] # type: ignore TODO for i, _ in enumerate(f.bounded_params) + effects = [fs.AddEffect(e) for e in adds] + [fs.DelEffect(e) for e in dels] # fmt: skip + + problem.action( + a.name, + parameters=vars, + precondition=precond, + effects=effects, + ) + + problem.init = tarski.model.create(lang) # type: ignore + problem.goal = land() # type: ignore + writer = iofs.FstripsWriter(problem) + writer.write(domain_filename, problem_filename) + + def __to_tarski_formula(self, + attribute: Set[str], + lang: FirstOrderLanguage) -> Union[CompoundFormula, top]: + """Converts a set of strings (referencing an attribute of a LearnedAction, i.e., its preconditions) + to an Atom or CompoundFormula, in order to set up a tarski action. + + Args: + attribute (Set[str]): + The attribute to be converted to an Atom or CompoundFormula. + lang (FirstOrderLanguage): + The relevant language. + + Returns: + The attribute of the LearnedAction, converted to an Atom or CompoundFormula. 
+        """
+        # return top if there are no constraints
+        if not attribute:
+            return top
+        # creates Atom
+        elif len(attribute) == 1:
+            # BUG FIX: pass the single contained string, not the whole set
+            grounding = _get_str_as_tarski_grounded_formula(next(iter(attribute)), lang)
+            return CompoundFormula(Connective.And, [grounding],)
+
+        # creates CompoundFormula
+        else:
+            # BUG FIX: self._get_str_as_tarski_formula does not exist on Model;
+            # use the module-level helper on each attribute string
+            return CompoundFormula(
+                Connective.And, [_get_str_as_tarski_grounded_formula(a, lang) for a in attribute],)
+
+
+    def to_pddl_grounded(
+        self,
+        domain_name: str,
+        problem_name: str,
+        domain_filename: str,
+        problem_filename: str,
+    ):
+        """Dumps a Model to two PDDL files. The conversion only uses 0-arity predicates, and no types, objects,
+        or parameters of any kind are used. Actions are represented as ground actions with no parameters.
+
+        Args:
+            domain_name (str):
+                The name of the domain to be generated.
+            problem_name (str):
+                The name of the problem to be generated.
+            domain_filename (str):
+                The name of the domain file to be generated.
+            problem_filename (str):
+                The name of the problem file to be generated.
+        """
+
+        lang = tarski.language(domain_name)
+        problem = tarski.fstrips.create_fstrips_problem(
+            domain_name=domain_name, problem_name=problem_name, language=lang
+        )
+        if self.fluents:
+            # create 0-arity predicates
+            for f in self.fluents:
+                # NOTE: want there to be no brackets in any fluents referenced as tarski adds these later. 
+ # fluents (their string conversion) must be in the following format: (on object a object b) + test = str(f) + lang.predicate(str(f)[1:-1].replace(" ", "_")) + + if self.actions: + for a in self.actions: + + # fetch all the relevant 0-arity predicates and create formulas to set up the ground actions + positive_preconds = [_get_str_as_tarski_grounded_formula( + a[1:-1], lang) for a in a.positive_preconditions] + # create a set of negative preconditions + negative_preconds = [CompoundFormula(Connective.Not, + _get_str_as_tarski_grounded_formula(a[1:-1], lang),) + for a in a.negative_preconditions] + + precond_list = positive_preconds + negative_preconds + if len(precond_list) == 1: + preconds = precond_list[0] + elif len(precond_list) == 0: + preconds = top # always true + else: + preconds = CompoundFormula(Connective.And, precond_list,) + + + + adds = [lang.get(f"{e.replace(' ', '_')[1:-1]}")() for e in a.add_effects] #todo validate on different + dels = [lang.get(f"{e.replace(' ', '_')[1:-1]}")() for e in a.delete_effects] + effects = [fs.AddEffect(e) for e in adds] + effects.extend([fs.DelEffect(e) for e in dels]) + # set up action + problem.action( + name=a.details() + .replace("(", "") + .replace(")", "") + .replace(" ", "_"), + parameters=[], + precondition=preconds, + effects=effects,) + + # create empty init and goal + problem.init = tarski.model.create(lang) + problem.goal = land() + # write to files + writer = iofs.FstripsWriter(problem) + writer.write(domain_filename, problem_filename) + + # TODO complete + # @classmethod + # def _from_json(cls, data: dict): + # actions = set(map(_deserialize, data["actions"])) + # return cls(set(data["fluents"]), actions) + + # TODO complete + # @staticmethod + # def deserialize(string: str): + # """Deserializes a json string into a Model. + # + # Args: + # string (str): + # The json string representing a model. + # + # Returns: + # A Model object matching the one specified by `string`. 
+    #     """
+    #     return Model._from_json(loads(string))
+
+# TODO finish
+    # @classmethod
+    # def _from_json(cls, data: dict):
+    #     actions = set(map(LearnedAction._deserialize, data["actions"]))
+    #     return cls(set(data["fluents"]), actions)
diff --git a/macq/core/actions/__init__.py b/macq/core/actions/__init__.py
new file mode 100644
index 00000000..6ef207fe
--- /dev/null
+++ b/macq/core/actions/__init__.py
@@ -0,0 +1,12 @@
+
+from .lifted_action import LiftedAction
+from .grounded_action import GroundedAction
+
+"""package contains elements of an action model - Action.
+   enabling lifted and grounded representations of actions"""
+
+
+__all__ = [
+    "LiftedAction",
+    "GroundedAction",
+]
\ No newline at end of file
diff --git a/macq/core/actions/grounded_action.py b/macq/core/actions/grounded_action.py
new file mode 100644
index 00000000..7c40c155
--- /dev/null
+++ b/macq/core/actions/grounded_action.py
@@ -0,0 +1,66 @@
+from typing import List, Set
+
+from ..object_type import ObjectType
+from ..planning_object import PlanningObject
+from ..fluents.grounded_fluent import GroundedFluent
+
+
+# TODO finish implementation
+class GroundedAction:
+    """A fully grounded action: a name applied to concrete planning objects,
+    with grounded-fluent preconditions and effects."""
+    name: str
+    params: List[PlanningObject]
+    positive_preconditions: Set[GroundedFluent]
+    negative_preconditions: Set[GroundedFluent]
+    add_effects: Set[GroundedFluent]
+    delete_effects: Set[GroundedFluent]
+    cost: int | None
+
+    def __init__(self, name: str, objects: List[PlanningObject], is_strips: bool = False, cost: int | None = None):
+        self.name = name
+        self.objects = objects
+        self.is_strips = is_strips
+        self.positive_preconditions = set()
+        self.negative_preconditions = set()
+        self.add_effects = set()
+        self.delete_effects = set()
+        # BUG FIX: the cost argument was silently discarded (self.cost was always None)
+        self.cost = cost
+
+    def __eq__(self, other):
+        return hash(self) == hash(other)
+
+    def __repr__(self):
+        # BUG FIX: referenced self.obj_params, which only AtomicAction defines
+        string = f"{self.name} {' '.join(map(str, self.objects))}"
+        return string
+
+    def __hash__(self):
+        # BUG FIX: was `raise hash(...)`, which raised a TypeError instead of returning
+        return hash(self.details())
+
+    def _serialize(self):
+        return self.name
+
+    def details(self):
+        string = f"{self.name} {' '.join([o.details() for o in self.objects])}"
+        return string
+
+    def clone(self, atomic=False):
+        if atomic:
+            return AtomicAction(
+                self.name, [obj.details() for obj in self.objects], self.cost)
+
+        return GroundedAction(self.name, self.objects.copy(), is_strips=self.is_strips, cost=self.cost)
+
+class AtomicAction(GroundedAction):
+    """An Action where the objects are represented by strings."""
+
+    def __init__(self, name: str, obj_params: List[str], cost: int = 0):
+        # BUG FIX: PlanningObject requires an ObjectType; the call was missing
+        # the second positional argument and always raised a TypeError
+        super().__init__(name, [PlanningObject(obj, ObjectType()) for obj in obj_params], cost=cost)
+        self.name = name
+        self.obj_params = obj_params
+        self.cost = cost
diff --git a/macq/core/actions/lifted_action.py b/macq/core/actions/lifted_action.py
new file mode 100644
index 00000000..0be9ff76
--- /dev/null
+++ b/macq/core/actions/lifted_action.py
@@ -0,0 +1,47 @@
+from typing import List, Set
+
+from ..signature_parameter import SignatureParameter
+from ..fluents.parameter_bound_fluent import ParameterBoundFluent
+
+class LiftedAction:
+    """A lifted (schematic) action: a name with typed signature parameters and
+    parameter-bound fluents for preconditions and effects."""
+
+    name: str
+    params: List[SignatureParameter]
+    positive_preconditions: Set[ParameterBoundFluent]
+    negative_preconditions: Set[ParameterBoundFluent] | None
+    add_effects: Set[ParameterBoundFluent]
+    delete_effects: Set[ParameterBoundFluent]
+
+    def __init__(self, name: str, params: List[SignatureParameter]):
+        self.positive_preconditions = set()
+        self.negative_preconditions = None
+        self.add_effects = set()
+        self.delete_effects = set()
+        self.name = name
+        self.params = params
+
+    def __eq__(self, other):
+        return hash(self) == hash(other)
+
+    def __hash__(self):
+        # BUG FIX: negative_preconditions is initialized to None, and
+        # frozenset(None) raises a TypeError; treat None as the empty set
+        return hash((
+            self.name,
+            tuple(self.params),
+            frozenset(self.positive_preconditions),
+            frozenset(self.negative_preconditions or ()),
+            frozenset(self.add_effects),
+            frozenset(self.delete_effects),))
+
+    def __str__(self):
+        return self.details()
+
+    def details(self):
+        return f"({self.name} {' '.join(param.object_type.type_name for param in self.params)})"
+
+    def __repr__(self) -> str:
+        return 
self.details() diff --git a/macq/core/factory_pattern_model.py b/macq/core/factory_pattern_model.py new file mode 100644 index 00000000..1e59e39e --- /dev/null +++ b/macq/core/factory_pattern_model.py @@ -0,0 +1,157 @@ +from typing import Set, Union, List, Optional + +from .action_model import Model +from .actions import GroundedAction, LiftedAction +from .fluents import LiftedFluent, ParameterBoundFluent, GroundedFluent +from .model_type_validate import ModelTypeValidator, ModelType, ModelValidationError + + +# Factory Functions - The main interface for creating models + +def create_lifted_model( + fluents: Set[Union[LiftedFluent, ParameterBoundFluent]], + actions: Set[LiftedAction], + learned_sorts: Optional[List] = None +) -> Model: + """ + Create a lifted model with type validation. + + Args: + fluents: Set of LiftedFluent or ParameterBoundLiteral objects + actions: Set of LiftedAction objects + learned_sorts: Optional list of sorts + + Returns: + A validated lifted Model + + Raises: + ModelValidationError: If the provided components are not consistent + """ + # Validate types + for fluent in fluents: + if not isinstance(fluent, (LiftedFluent, ParameterBoundFluent)): + raise ModelValidationError( + f"Expected LiftedFluent or ParameterBoundLiteral, got {type(fluent)}" + ) + + for action in actions: + if not isinstance(action, LiftedAction): + raise ModelValidationError( + f"Expected LiftedAction, got {type(action)}" + ) + + return Model( + fluents=fluents, + actions=actions, + learned_sorts=learned_sorts, + model_type=ModelType.LIFTED, + _skip_validation=True # We already validated + ) + +def create_grounded_model( + fluents: Set[GroundedFluent], + actions: Set[GroundedAction], + learned_sorts: Optional[List] = None +) -> Model: + """ + Create a grounded model with type validation. 
+ + Args: + fluents: Set of GroundedFluent objects + actions: Set of GroundedAction objects + learned_sorts: Optional list of sorts + + Returns: + A validated grounded Model + + Raises: + ModelValidationError: If the provided components are not consistent + """ + # Validate types + for fluent in fluents: + if not isinstance(fluent, GroundedFluent): + raise ModelValidationError( + f"Expected GroundedFluent, got {type(fluent)}" + ) + + for action in actions: + if not isinstance(action, GroundedAction): + raise ModelValidationError( + f"Expected GroundedAction, got {type(action)}" + ) + + return Model( + fluents=fluents, + actions=actions, + learned_sorts=learned_sorts, + model_type=ModelType.GROUNDED, + _skip_validation=True # We already validated + ) + +def create_model_from_components( + fluents: Set, + actions: Set, + learned_sorts: Optional[List] = None +) -> Model: + """ + Create a model by auto-detecting the type from components. + + Args: + fluents: Set of fluent objects + actions: Set of action objects + learned_sorts: Optional list of sorts + + Returns: + A validated Model of the appropriate type + + Raises: + ModelValidationError: If the components are inconsistent or unknown type + """ + model_type = ModelTypeValidator.validate_model_consistency(actions, fluents) + + if model_type == ModelType.UNKNOWN: + raise ModelValidationError("Cannot determine model type from provided components") + + return Model( + fluents=fluents, + actions=actions, + learned_sorts=learned_sorts, + model_type=model_type, + _skip_validation=True) + + +# Usage examples and type-safe functions + +def merge_models_safe(model1: Model, model2: Model) -> Model: + """Safely merge two models of the same type.""" + if model1.model_type != model2.model_type: + raise ModelValidationError( + f"Cannot merge models of different types: {model1.model_type.value} " + f"and {model2.model_type.value}" + ) + + merged_fluents = model1.fluents | model2.fluents + merged_actions = model1.actions | 
model2.actions + + # Merge learned_sorts + merged_sorts = None + if model1.learned_sorts and model2.learned_sorts: + merged_sorts = list(set(model1.learned_sorts + model2.learned_sorts)) + elif model1.learned_sorts: + merged_sorts = model1.learned_sorts + elif model2.learned_sorts: + merged_sorts = model2.learned_sorts + + return create_model_from_components(merged_fluents, merged_actions, merged_sorts) + +# def convert_to_grounded(lifted_model: Model, objects: List) -> Model: +# """Convert a lifted model to a grounded model (placeholder implementation).""" + # if not lifted_model.is_lifted_model(): + # raise ModelValidationError("Can only ground lifted models") + # + # # This would contain your actual grounding logic + # # For now, it's just a placeholder + # grounded_fluents = set() # Your grounding logic here + # grounded_actions = set() # Your grounding logic here + # + # return create_grounded_model(grounded_fluents, grounded_actions) \ No newline at end of file diff --git a/macq/core/fluents/__init__.py b/macq/core/fluents/__init__.py new file mode 100644 index 00000000..a2c7fea7 --- /dev/null +++ b/macq/core/fluents/__init__.py @@ -0,0 +1,14 @@ +from typing import TypeVar + +from .lifted_fluent import LiftedFluent +from .parameter_bound_fluent import ParameterBoundFluent +from .grounded_fluent import GroundedFluent + +"""package of core elements of an action model Fluent. 
+ enabling lifted, parameter-bound and grounded fluent representations""" + +__all__ = [ + "LiftedFluent", + "ParameterBoundFluent", + "GroundedFluent", +] \ No newline at end of file diff --git a/macq/core/fluents/grounded_fluent.py b/macq/core/fluents/grounded_fluent.py new file mode 100644 index 00000000..2eb7e659 --- /dev/null +++ b/macq/core/fluents/grounded_fluent.py @@ -0,0 +1,45 @@ +from typing import List + +from ..planning_object import PlanningObject + +class GroundedFluent: + name: str + objects: List[PlanningObject] + + def __init__(self, name: str, objects: List[PlanningObject]): + self.name = name + self.objects = objects + + def get_binding(self, action_parameters: List[PlanningObject]) -> List[int]: + """Returns indices of fluent objects in action parameters list.""" + binding = [] + for obj in self.objects: + try: + binding.append(action_parameters.index(obj)) + except ValueError: + raise ValueError(f"Object {obj} not found in action parameters {action_parameters}") + return binding + + def __str__(self): + return self.details() + + def details(self): + if len(self.objects) > 0: + string = f"{self.name} {' '.join([o.details() for o in self.objects])}" + + else: + string = self.name + return f"({string})" + + + def __repr__(self): + return self.details() + + def _serialize(self): + return str(self) + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return isinstance(other, GroundedFluent) and hash(self) == hash(other) \ No newline at end of file diff --git a/macq/core/fluents/lifted_fluent.py b/macq/core/fluents/lifted_fluent.py new file mode 100644 index 00000000..8dce7988 --- /dev/null +++ b/macq/core/fluents/lifted_fluent.py @@ -0,0 +1,42 @@ +from typing import List +from ..signature_parameter import SignatureParameter + +class LiftedFluent: + """Represents a lifted fluent, which is a structural element often used in + planning domains to describe state properties over objects and their + relationships. 
+ + A lifted fluent is characterized by its name and parameters. It can be + hashed, printed as a string, or compared with other instances based on + its name and parameters. + + Attributes: + name (str): The name of the lifted fluent, indicating its identity + in the planning domain. + parameters (List[SignatureParameter]): A list of parameters that + define the schema of the lifted fluent. + """ + name: str + parameters: List[SignatureParameter] + + def __init__(self, name: str, parameters: List[SignatureParameter]): + self.name = name + self.parameters = parameters + + def __hash__(self): + return hash(self.name) + hash(tuple(self.parameters)) + + def __str__(self): + return self.name + ' '.join(map(str,self.parameters)) + + def details(self): + return str(self) + + def __repr__(self): + return self.name + + def __eq__(self, other): + return isinstance(other, LiftedFluent) and hash(self) == hash(other) + + def _serialize(self): + return self.details() \ No newline at end of file diff --git a/macq/core/fluents/parameter_bound_fluent.py b/macq/core/fluents/parameter_bound_fluent.py new file mode 100644 index 00000000..ba0a2562 --- /dev/null +++ b/macq/core/fluents/parameter_bound_fluent.py @@ -0,0 +1,34 @@ +from typing import List + +from ..planning_object import PlanningObject + + +class ParameterBoundFluent: + """Represents a parameter bound literal often refered as a parameter bound fluent, which is a structural element often used in + planning domains to describe state properties over objects and their + relationships""" + name: str + bounded_params: List[int] + + def __init__(self, name: str, bounded_params: List[int]): + self.name = name + self.bounded_params = bounded_params + + def __str__(self): + string = f"{self.name + ' '.join(map(str,self.bounded_params))}" + return string + + def __repr__(self): + return self.details() + + def details(self): + return str(self) + + + def serialize(self): + return str(self) + + # todo implement + def ground(self, 
objects: List[PlanningObject]):
+        ...
+        # return Fluent(self.name, objects)
\ No newline at end of file
diff --git a/macq/core/model_type_validate.py b/macq/core/model_type_validate.py
new file mode 100644
index 00000000..4911390c
--- /dev/null
+++ b/macq/core/model_type_validate.py
@@ -0,0 +1,116 @@
+from enum import Enum
+from typing import Set
+
+# BUG FIX: `from macq.core import ...` is a circular import — macq/core/__init__.py
+# imports this module; import the defining modules directly instead.
+from .actions.lifted_action import LiftedAction
+from .actions.grounded_action import GroundedAction
+from .fluents.lifted_fluent import LiftedFluent
+from .fluents.grounded_fluent import GroundedFluent
+
+LiftedFeature = LiftedFluent | LiftedAction
+GroundedFeature = GroundedFluent | GroundedAction
+
+class ModelType(Enum):
+    """Enumeration of supported model types."""
+    LIFTED = "lifted"
+    GROUNDED = "grounded"
+    MIXED = "mixed"  # For cases where you might have both
+    UNKNOWN = "unknown"
+
+#todo: make informative
+class ModelValidationError(Exception):
+    """Raised when model components are inconsistent."""
+    pass
+
+
+
+class ModelTypeValidator:
+    """Utility class for validating model type consistency."""
+
+    @staticmethod
+    def detect_feature_type(feature_set: Set) -> ModelType:
+        """Detect the model type of the features (actions or fluents) in a set."""
+        if not feature_set:
+            return ModelType.UNKNOWN
+
+        first_item = next(iter(feature_set))
+
+        if isinstance(first_item, LiftedFeature):
+            # Check if all features are lifted
+            if all(isinstance(feature_item, LiftedFeature) for feature_item in feature_set):
+                return ModelType.LIFTED
+            else:
+                return ModelType.MIXED
+
+        elif isinstance(first_item, GroundedFeature):
+            # Check if all features are grounded
+            # BUG FIX: `for feature_item, in feature_set` tuple-unpacked each
+            # feature and raised a TypeError on non-iterable features
+            if all(isinstance(feature_item, GroundedFeature) for feature_item in feature_set):
+                return ModelType.GROUNDED
+            else:
+                return ModelType.MIXED
+        else:
+            return ModelType.UNKNOWN
+
+
+    @staticmethod
+    def validate_model_consistency(actions: Set, fluents: Set) -> ModelType:
+        """Validate that actions and fluents are consistent with each other."""
+        action_type = ModelTypeValidator.detect_feature_type(actions)
+        fluent_type = ModelTypeValidator.detect_feature_type(fluents)
+
+        if action_type == ModelType.UNKNOWN or 
fluent_type == ModelType.UNKNOWN: + return ModelType.UNKNOWN + else: + if action_type == fluent_type: + return action_type + else: + raise ModelValidationError( + f"Inconsistent model types: actions are {action_type.value} " + f"but fluents are {fluent_type.value}" + ) + + + #replaced with detect_feature_type for less code. + # @staticmethod + # def detect_action_type(actions: Set) -> ModelType: + # """Detect the type of actions in a set.""" + # if not actions: + # return ModelType.UNKNOWN + # + # first_action = next(iter(actions)) + # + # if isinstance(first_action, LiftedAction): + # # Check if all actions are lifted + # if all(isinstance(action, LiftedAction) for action in actions): + # return ModelType.LIFTED + # else: + # return ModelType.MIXED + # elif isinstance(first_action, GroundedAction): + # # Check if all actions are grounded + # if all(isinstance(action, GroundedAction) for action in actions): + # return ModelType.GROUNDED + # else: + # return ModelType.MIXED + # else: + # return ModelType.UNKNOWN + # + # @staticmethod + # def detect_fluent_type(fluents: Set) -> ModelType: + # """Detect the type of fluents in a set.""" + # if not fluents: + # return ModelType.UNKNOWN + # + # first_fluent = next(iter(fluents)) + # + # if isinstance(first_fluent, LiftedFluent): + # # Both LiftedFluent and ParameterBoundLiteral work with lifted models + # if all(isinstance(fluent, LiftedFluent) for fluent in fluents): + # return ModelType.LIFTED + # else: + # return ModelType.MIXED + # elif isinstance(first_fluent, GroundedFluent): + # if all(isinstance(fluent, GroundedFluent) for fluent in fluents): + # return ModelType.GROUNDED + # else: + # return ModelType.MIXED + # else: + # return ModelType.UNKNOWN \ No newline at end of file diff --git a/macq/core/object_type.py b/macq/core/object_type.py new file mode 100644 index 00000000..33fbb08a --- /dev/null +++ b/macq/core/object_type.py @@ -0,0 +1,99 @@ +from typing import Optional + + +class 
CircularTypeHierarchyException(Exception): + def __init__(self, message=None): + super().__init__() + self.message = message + + def __str__(self): + return "Circular Type Hierarchy Error: " + self.message if self.message \ + else "Circular Type Hierarchy Error - validate inputs of ObjectType init function." + +def is_circular_type_hierarchy_error(child_type_name: str, parent: "ObjectType"): + """ + Checks whether adding a `child_type_name` as a child of `parent` creates a circular hierarchy. + + A circular hierarchy occurs when a type becomes its own ancestor or descendant, + causing inconsistencies in the hierarchical structure. This method ensures the + validity of the hierarchical relationship by validating the absence of such + circular dependencies. + + Args: + child_type_name (ObjectType): The proposed child type in the hierarchical + relation. + parent (ObjectType): The parent type to check against for circular + dependencies. + + Returns: + bool: True if the addition creates a circular hierarchy, otherwise False. 
+ """ + + current = parent + while current is not None: + if current.type_name == child_type_name: + return True + current = current.parent + return False + +class ObjectType: + """clss representation of a type matching the tarski lang sort representation + the type can be illustrated as a directed graph with no cycles""" + type_name: str + parent_type: Optional["ObjectType"] + + + def __init__(self, type_name: str ="object", parent=None): + """if sort has a parent of type DomainSort, input the parent sort name + if the sort has no learned parent, the parent argument can remain empty""" + if is_circular_type_hierarchy_error(type_name, parent): + raise CircularTypeHierarchyException( + f" Type '{type_name}' would create a circular hierarchy with parent '{parent.type_name}'") + + self.type_name = type_name + self.parent = parent + + def __eq__(self, other): + if not isinstance(other, ObjectType): + return False + return self.type_name == other.type_name + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if not isinstance(other, ObjectType): + return NotImplemented + + # Check if other is an ancestor of self + curr = self.parent + while curr is not None: + if curr == other: + return True + curr = curr.parent + return False + + def __gt__(self, other): + if not isinstance(other, ObjectType): + return NotImplemented + + # Check if self is an ancestor of other + curr = other.parent + while curr is not None: + if curr == self: + return True + curr = curr.parent + return False + + def __str__(self): + return self.type_name + + def __hash__(self): + return hash(self.details()) + + + def details(self): + postfix = "" if self.parent is None else f" parent: {self.parent}" + return f"name: {self.type_name}{postfix}" + + diff --git a/macq/core/planning_object.py b/macq/core/planning_object.py new file mode 100644 index 00000000..73824ead --- /dev/null +++ b/macq/core/planning_object.py @@ -0,0 +1,41 @@ +from .object_type import ObjectType 
+ +class PlanningObject: + """An object of a planning domain. + + Attributes: + name (str): + the object's name. + obj_type (str): + The type of the object in the problem domain. + Example: "block". + + """ + + def __init__(self, name: str, obj_type: ObjectType): + """Initializes a PlanningObject with a type and a name. + + Args: + name (str): + The name of the object. + obj_type (str): + The type of the object in the problem domain. + """ + self.name = name + self.obj_type = obj_type + + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return isinstance(other, PlanningObject) and self.name == other.name + + def details(self): + return " ".join([self.obj_type.type_name, self.name]) + + def __repr__(self): + return self.details() + + def _serialize(self): + return self.details() \ No newline at end of file diff --git a/macq/core/signature_parameter.py b/macq/core/signature_parameter.py new file mode 100644 index 00000000..f1e5b3ef --- /dev/null +++ b/macq/core/signature_parameter.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass + +from .object_type import ObjectType + +@dataclass +class SignatureParameter: + name: str + object_type: ObjectType = None + + def __post_init__(self): + # Initialize with the default ObjectType if None is provided + if self.object_type is None: + self.object_type = ObjectType("object") + + def __str__(self): + return f"{self.name}" + + def details(self): + return f"{self.name} {self.object_type}" + + def __hash__(self): + return hash(str(self)) diff --git a/macq/extract/__init__.py b/macq/extract/__init__.py index 5a3463ef..3d5e3410 100644 --- a/macq/extract/__init__.py +++ b/macq/extract/__init__.py @@ -12,12 +12,14 @@ from .locm import LOCM from .slaf import SLAF from .observer import Observer +from macq.core.signature_parameter import ObjectType __all__ = [ "LearnedAction", "LearnedLiftedAction", "LearnedFluent", "LearnedLiftedFluent", + "ObjectType", "Model", "Extract", "modes", diff --git 
a/macq/extract/learned_action.py b/macq/extract/learned_action.py index da45ee0e..8a89223c 100644 --- a/macq/extract/learned_action.py +++ b/macq/extract/learned_action.py @@ -126,6 +126,12 @@ def __repr__(self) -> str: def details(self): return f"({self.name} {' '.join(self.param_sorts)})" + # NOTE that by definition the preconditions of + # a lifted action is a set of parameter-bound-literals and not a lifted fluent + # the LearnedLiftedFluent is in fact implemented as a partially parameter-bound literal and partially lifted fluent. + # notice the hash function if the class referring to the object as a lifted fluent, whereas the constructor refers to + # the object as a parameter-bound literal by initializing the param-act_inds field. that causes a hash collision when implementing learning + # algorithms and referring the set as a parameter-bound literal. I would suggest renaming classes and fixing classes definitions to fit the mathematical definitions. def update_precond( self, fluents: Union[LearnedLiftedFluent, Set[LearnedLiftedFluent]] ): diff --git a/macq/extract/learned_fluent.py b/macq/extract/learned_fluent.py index 37305a2c..71751c3d 100644 --- a/macq/extract/learned_fluent.py +++ b/macq/extract/learned_fluent.py @@ -1,5 +1,4 @@ from typing import List - from macq.trace.fluent import PlanningObject diff --git a/macq/extract/model.py b/macq/extract/model.py index f5195753..9743b718 100644 --- a/macq/extract/model.py +++ b/macq/extract/model.py @@ -8,7 +8,6 @@ from tarski.syntax import land from tarski.syntax.formulas import CompoundFormula, Connective, top -from ..trace import Fluent from ..utils import ComplexEncoder from .learned_action import LearnedAction, LearnedLiftedAction from .learned_fluent import LearnedFluent, LearnedLiftedFluent diff --git a/macq/generate/pddl/fd_random_walk.py b/macq/generate/pddl/fd_random_walk.py index f2ef4f92..1069bbc7 100644 --- a/macq/generate/pddl/fd_random_walk.py +++ b/macq/generate/pddl/fd_random_walk.py @@ -33,6 
+33,7 @@ def __init__( prob: str = None, problem_id: int = None, observe_pres_effs: bool = False, + ignore_static_fluents: bool = True, # defaults to true to not break any pre-implemented scripts max_time: float = 30, init_h: int = None, num_traces: int = 1, @@ -65,6 +66,7 @@ def __init__( prob=prob, problem_id=problem_id, observe_pres_effs=observe_pres_effs, + ignore_static_fluents=ignore_static_fluents, num_traces=num_traces, seed=seed, max_time=max_time, diff --git a/macq/generate/pddl/generator.py b/macq/generate/pddl/generator.py index acbb65a0..a91d42e5 100644 --- a/macq/generate/pddl/generator.py +++ b/macq/generate/pddl/generator.py @@ -1,23 +1,26 @@ import re from time import sleep from typing import Set, List, Union -from tarski.io import PDDLReader -from tarski.search import GroundForwardSearchModel -from tarski.search.operations import progress + +import requests +from tarski.fstrips.action import PlainOperator +from tarski.fstrips.fstrips import AddEffect +from tarski.grounding.common import StateVariableLite from tarski.grounding.lp_grounding import ( ground_problem_schemas_into_plain_operators, LPGroundingStrategy, ) +from tarski.io import PDDLReader +from tarski.io import fstrips as iofs +from tarski.model import Model, create +from tarski.search import GroundForwardSearchModel +from tarski.search.operations import progress from tarski.syntax import land -from tarski.syntax.ops import CompoundFormula, flatten -from tarski.syntax.formulas import Atom, neg from tarski.syntax.builtins import BuiltinPredicateSymbol -from tarski.fstrips.action import PlainOperator -from tarski.fstrips.fstrips import AddEffect -from tarski.model import Model, create -from tarski.io import fstrips as iofs +from tarski.syntax.formulas import Atom +from tarski.syntax.ops import CompoundFormula, flatten +from tarski.util import SymbolIndex -import requests from .planning_domains_api import get_problem, get_plan from ..plan import Plan from ...trace import Action, State, 
PlanningObject, Fluent, Trace, Step @@ -44,7 +47,7 @@ def __init__(self, fluent, message=None): class Generator: """A Generator. - A basic PDDL state trace generator. Handles all parsing and stores the problem, + A basic PDDL state trace generator. Handles all parsings and stores the problem, language, and grounded instance for the child generators to easily access and use. Attributes: @@ -74,9 +77,11 @@ def __init__( prob: str = None, problem_id: int = None, observe_pres_effs: bool = False, + ignore_static_fluents: bool = True # defaults to true to not break any pre-implemented scripts that + # did not consider this argument. ): """Creates a basic PDDL state trace generator. Takes either the raw filenames - of the domain and problem, or a problem ID. + of the domain and problem or a problem ID. Args: dom (str): @@ -87,8 +92,10 @@ def __init__( The ID of the problem to access. observe_pres_effs (bool): Option to observe action preconditions and effects upon generation. + ignore_static_fluents (bool): option to ignore static fluents when generating traces. (default: True) """ # get attributes + self.ignore_static_fluents = ignore_static_fluents self.pddl_dom = dom self.pddl_prob = prob self.problem_id = problem_id @@ -114,7 +121,7 @@ def extract_action_typing(self): """Retrieves a dictionary mapping all of this problem's actions and the types of objects they act upon. - i.e. given the standard blocks problem/domain, this function would return: + i.e., given the standard blocks problem/domain, this function would return: {'pick-up': ['object'], 'put-down': ['object'], 'stack': ['object', 'object'], 'unstack': ['object', 'object']} @@ -133,7 +140,7 @@ def extract_predicate_typing(self): """Retrieves a dictionary mapping all of this problem's predicates and the types of objects they act upon. - i.e. 
given the standard blocks problem/domain, this function would return: + i.e., given the standard blocks problem/domain, this function would return: {'=': ['object', 'object'], '!=': ['object', 'object'], 'on': ['object', 'object'], 'ontable': ['object'], 'clear': ['object'], 'handempty': [], 'holding': ['object']} @@ -168,20 +175,21 @@ def __get_op_dict(self): op_dict["".join(["(", o.name.replace("(", " ").replace(",", "")])] = o return op_dict - def __get_all_grounded_fluents(self): + def __get_all_grounded_fluents(self) -> List[Fluent]: """Extracts all the grounded fluents in the problem. Returns: A list of all the grounded fluents in the problem, in the form of macq Fluents. """ - return [ - self.__tarski_atom_to_macq_fluent(grounded_fluent.to_atom()) - for grounded_fluent in LPGroundingStrategy( - self.problem, include_variable_inequalities=True - ) - .ground_state_variables() - .objects - ] + if self.ignore_static_fluents: + l1 = [self.__tarski_atom_to_macq_fluent(grounded_fluent.to_atom()) for grounded_fluent in LPGroundingStrategy( + self.problem, include_variable_inequalities=True).ground_state_variables().objects] + else: + l1 = [ + self.__tarski_atom_to_macq_fluent(grounded_fluent.to_atom()) + for grounded_fluent in ExtractStaticFluents( + self.problem, include_variable_inequalities=True).ground_state_variables().objects] + return l1 def __effect_split(self, act: PlainOperator): """Converts the effects of an action as defined by tarski to fluents as defined by macq. @@ -294,6 +302,7 @@ def tarski_act_to_macq(self, tarski_act: PlainOperator): else Action(name=name, obj_params=obj_params) ) + def change_init( self, init_fluents: Union[Set[Fluent], List[Fluent]], @@ -303,7 +312,7 @@ def change_init( """Changes the initial state of the `Generator`. The domain and problem PDDL files are rewritten to accomodate the new goal for later use by a planner. 
- Args: + Parameters: init_fluents (Union[Set[Fluent], List[Fluent]]): The collection of fluents that will make up the new initial state. new_domain (str): @@ -376,7 +385,7 @@ def change_goal( self.pddl_dom = new_domain self.pddl_prob = new_prob - def generate_plan(self, from_ipc_file: bool = False, filename: str = None): + def generate_plan(self, from_ipc_file: bool = False, filename: str = None) -> Plan | None: """Generates a plan. If reading from an IPC file, the `Plan` is read directly. Otherwise, if the initial state or goal was changed, these changes are taken into account through the updated PDDL files. If no changes were made, the default nitial state/goal in the initial problem file is used. @@ -422,7 +431,7 @@ def get_api_response(delays: List[int]): plan_list = [f'({action})' for action in actions_with_objects] return plan_list - except TypeError: + except KeyError: return get_api_response(delays[1:]) plan = get_api_response([0, 1, 3, 5, 10]) @@ -465,3 +474,29 @@ def generate_single_trace_from_plan(self, plan: Plan): else: trace.append(Step(macq_state, None, i + 1)) return trace + + +class ExtractStaticFluents(LPGroundingStrategy): + def __init__(self, problem, ground_actions=True, include_variable_inequalities=False): + super().__init__(problem=problem, ground_actions=ground_actions, + include_variable_inequalities=include_variable_inequalities) + + def ground_state_variables(self): + """ Create and index all state variables of the problem by exhaustively grounding all predicate and function + symbols that are considered to be fluent with respect to the problem constants. Thus, if the problem has one + fluent predicate "p" and one static predicate "q", and constants "a", "b", "c", the result of this operation + will be the state variables "p(a)", "p(b)" and "p(c)". 
+ """ + model = self._solve_lp() + + variables = SymbolIndex() + for symbol in self.fluent_symbols.union(self.static_symbols): + + lang = symbol.language + key = 'atom_' + symbol.name + if key in model: # in case there is no reachable ground state variable from that fluent symbol + for binding in model[key]: + binding_with_constants = tuple(lang.get(c) for c in binding) + variables.add(StateVariableLite(symbol, binding_with_constants)) + + return variables diff --git a/macq/generate/pddl/random_goal_sampling.py b/macq/generate/pddl/random_goal_sampling.py index 064c45ad..1c926349 100644 --- a/macq/generate/pddl/random_goal_sampling.py +++ b/macq/generate/pddl/random_goal_sampling.py @@ -1,7 +1,6 @@ import random -from typing import Dict -from tarski.syntax.formulas import Atom from collections import OrderedDict +from typing import Dict from . import VanillaSampling from ...trace import TraceList, State from ...utils import PercentError, basic_timer, progress @@ -39,6 +38,7 @@ def __init__( problem_id: int = None, max_time: float = 30, observe_pres_effs: bool = False, + ignore_static_fluents: bool = True, # defaults to true to not break any pre-implemented scripts ): """ Initializes a random goal state trace sampler using the plan length, number of traces, @@ -78,6 +78,7 @@ def __init__( problem_id=problem_id, num_traces=num_traces, observe_pres_effs=observe_pres_effs, + ignore_static_fluents=ignore_static_fluents, max_time=max_time, ) diff --git a/macq/generate/pddl/trace_from_goal.py b/macq/generate/pddl/trace_from_goal.py index 9196c9d1..fe49a4da 100644 --- a/macq/generate/pddl/trace_from_goal.py +++ b/macq/generate/pddl/trace_from_goal.py @@ -8,6 +8,8 @@ def __init__( prob: str = None, problem_id: int = None, observe_pres_effs: bool = False, + ignore_static_fluents: bool = True, # defaults to true to not break any pre-implemented scripts + ): """ Initializes a goal state trace sampler using the domain and problem. 
This method of sampling @@ -28,6 +30,7 @@ def __init__( prob=prob, problem_id=problem_id, observe_pres_effs=observe_pres_effs, + ignore_static_fluents=ignore_static_fluents ) self.trace = self.generate_trace() diff --git a/macq/generate/pddl/vanilla_sampling.py b/macq/generate/pddl/vanilla_sampling.py index 5a1a786a..eda01486 100644 --- a/macq/generate/pddl/vanilla_sampling.py +++ b/macq/generate/pddl/vanilla_sampling.py @@ -39,6 +39,7 @@ def __init__( prob: str = None, problem_id: int = None, observe_pres_effs: bool = False, + ignore_static_fluents: bool = True, # defaults to true to not break any pre-implemented scripts plan_len: int = 1, num_traces: int = 0, seed: int = None, @@ -69,6 +70,7 @@ def __init__( prob=prob, problem_id=problem_id, observe_pres_effs=observe_pres_effs, + ignore_static_fluents=ignore_static_fluents ) if max_time <= 0: raise InvalidTime() diff --git a/macq/trace/fluent.py b/macq/trace/fluent.py index f3c977c8..7f2d1130 100644 --- a/macq/trace/fluent.py +++ b/macq/trace/fluent.py @@ -1,6 +1,7 @@ from typing import List + class PlanningObject: """An object of a planning domain. @@ -83,6 +84,8 @@ def __eq__(self, other): and self.objects == other.objects ) + # why do one fluent has the property of being 'smaller' under some relation over all fluents? + # should be implemented as an internal method for the learning algorthm that uses it! 
def __lt__(self, other): if not isinstance(other, Fluent): raise TypeError(f"Cannot compare Fluent to {other.__name__}.") diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_actions.py b/tests/core/test_actions.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_extract.py b/tests/core/test_extract.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_fluent.py b/tests/core/test_fluent.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_model.py b/tests/core/test_model.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_object.py b/tests/core/test_object.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_observations.py b/tests/core/test_observations.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/test_trace.py b/tests/core/test_trace.py new file mode 100644 index 00000000..e69de29b