HierarchyCraft as planning problem

HierarchyCraft environments can be converted to planning problem in one line thanks to the Unified Planning Framework (UPF):

problem = env.planning_problem()
print(problem.upf_problem)

Then they can be solved with an installed planner (default is enhsp):

problem.solve()
print(problem.plan)

Actions can be extracted from a planner to interact with the environment:

done = False
_observation, _info = env.reset()

while not done:
    # Automatically replan at the end of each plan until env termination

    # Observations are not used when blindly following a current plan
    # But the state in required in order to replan if there is no plan left
    action = planning_problem.action_from_plan(env.state)
    if action is None:
        # Plan is existing but empty, thus nothing to do, thus terminates
        done = True
        continue
    _observation, _reward, terminated, truncated, _info = env.step(action)
    done = terminated or truncated

if terminated:
    print("Success ! The plan worked in the actual environment !")
else:
    print("Failed ... Something went wrong with the plan or the episode was truncated.")

HierarchyCraft as PDDL2.1 domain & problem

The Unified Planning Framework itself allows to write planning problems in the PDDL2.1 language, commonly used in the planning community. For instructions, refer to the UPF documentation.

Types

There is only three types in every HierarchyCraft environment:

  • zones
  • items that the player can obtain
  • items that zones can obtain.

Constants

Most of the time, zones and items will be directly referenced in actions of the PDDL problem. Hence, all zones and items will be constants of the domain and not objects of the problem.

Predicates

The only predicates are about the player position. They allow to track where is the player currently ("pos") and where has has been ("visited").

Functions

HierarchyCraft relies on numerical-fluents from PDDL2.1, they allow to define the player inventory ("amount") and the inventories of each zone ("amount_at")

Actions

Actions are directly defined by transformations. Each transformation will be converted to an action. Respectfully, the min/max conditions of the transformation will become the preconditions of the PDDL action and the added/removed items and updated position of the transformation will become the effects of the PDDL action.

Initialisation

The problem file, will define the initial state of every inventory for every item together with the initial position of the player.

Goal

The PDDL goal is translated from the HierarchyCraft purpose. If multiple terminal groups exists in the purpose, the PDDL goal will be the highest value terminal group.

  1"""# HierarchyCraft as planning problem
  2
  3HierarchyCraft environments can be converted to planning problem in one line
  4thanks to the Unified Planning Framework (UPF):
  5
  6```python
  7problem = env.planning_problem()
  8print(problem.upf_problem)
  9```
 10
 11Then they can be solved with an installed planner (default is enhsp):
 12
 13```python
 14problem.solve()
 15print(problem.plan)
 16```
 17
 18Actions can be extracted from a planner to interact with the environment:
 19
 20```python
 21
 22done = False
 23_observation, _info = env.reset()
 24
 25while not done:
 26    # Automatically replan at the end of each plan until env termination
 27
 28    # Observations are not used when blindly following a current plan
 29    # But the state in required in order to replan if there is no plan left
 30    action = planning_problem.action_from_plan(env.state)
 31    if action is None:
 32        # Plan is existing but empty, thus nothing to do, thus terminates
 33        done = True
 34        continue
 35    _observation, _reward, terminated, truncated, _info = env.step(action)
 36    done = terminated or truncated
 37
 38if terminated:
 39    print("Success ! The plan worked in the actual environment !")
 40else:
 41    print("Failed ... Something went wrong with the plan or the episode was truncated.")
 42
 43```
 44
 45## HierarchyCraft as PDDL2.1 domain & problem
 46
 47The Unified Planning Framework itself allows to write planning problems in the PDDL2.1 language,
 48commonly used in the planning community. For instructions, refer to the
 49[UPF documentation](https://unified-planning.readthedocs.io/en/stable/interoperability.html).
 50
 51
 52![](../../docs/images/PDDL_HierarchyCraft_domain.png)
 53
 54### Types
 55There is only three types in every HierarchyCraft environment:
 56- zones
 57- items that the player can obtain
 58- items that zones can obtain.
 59
 60### Constants
 61Most of the time, zones and items will be directly referenced in actions of the PDDL problem.
 62Hence, all zones and items will be constants of the domain and not objects of the problem.
 63
 64### Predicates
 65The only predicates are about the player position.
 66They allow to track where is the player currently ("pos") and where has has been ("visited").
 67
 68### Functions
 69HierarchyCraft relies on numerical-fluents from PDDL2.1,
 70they allow to define the player inventory ("amount") and the inventories of each zone ("amount_at")
 71
 72### Actions
 73Actions are directly defined by transformations.
 74Each transformation will be converted to an action.
 75Respectfully, the min/max conditions of the transformation will become the preconditions of the PDDL action
 76and the added/removed items and updated position of the transformation will become the effects of the PDDL action.
 77
 78![](../../docs/images/PDDL_HierarchyCraft_problem.png)
 79
 80### Initialisation
 81The problem file, will define the initial state
 82of every inventory for every item together with the initial position of the player.
 83
 84### Goal
 85The PDDL goal is translated from the HierarchyCraft purpose.
 86If multiple terminal groups exists in the purpose, the PDDL goal will be the highest value terminal group.
 87
 88"""
 89
 90from warnings import warn
 91from typing import TYPE_CHECKING, Dict, Optional, Union, List
 92from copy import deepcopy
 93
 94
 95from hcraft.transformation import Transformation, InventoryOwner
 96from hcraft.task import Task, GetItemTask, PlaceItemTask, GoToZoneTask
 97from hcraft.purpose import Purpose
 98from hcraft.elements import Zone, Item
 99
100# unified_planning is an optional dependency.
101UPF_AVAILABLE = True
102try:
103    import unified_planning.shortcuts as ups
104    from unified_planning.plans import SequentialPlan
105    from unified_planning.engines.results import PlanGenerationResult
106    from unified_planning.model.problem import Problem
107
108    UserType = ups.UserType
109    Object = ups.Object
110    BoolType = ups.BoolType
111    IntType = ups.IntType
112    Fluent = ups.Fluent
113    InstantaneousAction = ups.InstantaneousAction
114    OneshotPlanner = ups.OneshotPlanner
115    OR = ups.Or
116    AND = ups.And
117    GE = ups.GE
118    LE = ups.LE
119
120
121except ImportError:
122    UPF_AVAILABLE = False
123
124
125if TYPE_CHECKING:
126    from hcraft.state import HcraftState
127
128Statistics = Dict[str, Union[int, float]]
129
130
131class HcraftPlanningProblem:
132    """Interface between the unified planning framework and HierarchyCraft."""
133
134    def __init__(
135        self,
136        state: "HcraftState",
137        name: str,
138        purpose: Optional["Purpose"],
139        timeout: float = 60,
140        planner_name: Optional[str] = None,
141    ) -> None:
142        """Initialize a HierarchyCraft planning problem on the given state and purpose.
143
144        Args:
145            state: Initial state of the HierarchyCraft environment.
146            name: Name of the planning problem.
147            purpose: Purpose used to compute the planning goal.
148            timeout: Time budget (s) for the plan to be found before giving up.
149                Set to -1 for no limit. Defaults to 60.
150        """
151        if not UPF_AVAILABLE:
152            raise ImportError(
153                "Missing planning dependencies. Install with:\n"
154                "pip install hcraft[planning]"
155            )
156        self.upf_problem: "Problem" = self._init_problem(state, name, purpose)
157        self.plan: Optional["SequentialPlan"] = None
158        self.plans: List["SequentialPlan"] = []
159        self.stats: List["Statistics"] = []
160        self.timeout = timeout
161        self.planner_name = planner_name
162
163    def action_from_plan(self, state: "HcraftState") -> Optional[int]:
164        """Get the next gym action from a given state.
165
166        If a plan is already existing, just use the next action in the plan.
167        If no plan exists, first update and solve the planning problem.
168
169        Args:
170            state (HcraftState): Current state of the hcraft environement.
171
172        Returns:
173            int: Action to take according to the plan. Returns None if no action is required.
174        """
175        if self.plan is None:
176            self.update_problem_to_state(self.upf_problem, state)
177            self.solve()
178        if not self.plan.actions:  # Empty plan, nothing to do
179            return None
180        plan_action_name = str(self.plan.actions.pop(0))
181        if not self.plan.actions:
182            self.plan = None
183        action = int(plan_action_name.split("_")[0])
184        return action
185
186    def update_problem_to_state(self, upf_problem: "Problem", state: "HcraftState"):
187        """Update the planning problem initial state to the given state.
188
189        Args:
190            state: HierarchyCraft state to use as reference for the
191                initial state of the planning problem.
192        """
193        current_pos = state.current_zone
194        for zone in state.world.zones:
195            discovered = state.has_discovered(zone)
196            if current_pos is not None:
197                upf_problem.set_initial_value(
198                    self.pos(self.zones_obj[zone]), zone == current_pos
199                )
200            upf_problem.set_initial_value(
201                self.visited(self.zones_obj[zone]), discovered
202            )
203
204        for item in state.world.items:
205            quantity = state.amount_of(item)
206            upf_problem.set_initial_value(self.amount(self.items_obj[item]), quantity)
207
208        for zone in state.world.zones:
209            for zone_item in state.world.zones_items:
210                quantity = state.amount_of(zone_item, zone)
211                upf_problem.set_initial_value(
212                    self.amount_at(
213                        self.zone_items_obj[zone_item], self.zones_obj[zone]
214                    ),
215                    quantity,
216                )
217
218    def solve(self) -> "PlanGenerationResult":
219        """Solve the current planning problem with a planner."""
220        planner_kwargs = {"problem_kind": self.upf_problem.kind}
221        if self.planner_name is not None:
222            planner_kwargs.update(name=self.planner_name)
223        with OneshotPlanner(**planner_kwargs) as planner:
224            results: "PlanGenerationResult" = planner.solve(
225                self.upf_problem, timeout=self.timeout
226            )
227        if results.plan is None:
228            raise ValueError("Not plan could be found for this problem.")
229        self.plan = deepcopy(results.plan)
230        self.plans.append(deepcopy(results.plan))
231        self.stats.append(_read_statistics(results))
232        return results
233
234    def _init_problem(
235        self, state: "HcraftState", name: str, purpose: Optional["Purpose"]
236    ) -> "Problem":
237        """Build a unified planning problem from the given world and purpose.
238
239        Args:
240            world: HierarchyCraft world to generate the problem from.
241            name: Name given to the planning problem.
242            purpose: Purpose of the agent.
243                Will be used to set the goal of the planning problem.
244
245        Returns:
246            Problem: Unified planning problem.
247        """
248        upf_problem = Problem(name)
249        self.zone_type = UserType("zone")
250        self.player_item_type = UserType("player_item")
251        self.zone_item_type = UserType("zone_item")
252
253        self.zones_obj: Dict[Zone, "Object"] = {}
254        for zone in state.world.zones:
255            self.zones_obj[zone] = Object(zone.name, self.zone_type)
256
257        self.items_obj: Dict[Item, "Object"] = {}
258        for item in state.world.items:
259            self.items_obj[item] = Object(item.name, self.player_item_type)
260
261        self.zone_items_obj: Dict[Item, "Object"] = {}
262        for item in state.world.zones_items:
263            self.zone_items_obj[item] = Object(
264                f"{item.name}_in_zone", self.zone_item_type
265            )
266
267        upf_problem.add_objects(self.zones_obj.values())
268        upf_problem.add_objects(self.items_obj.values())
269        upf_problem.add_objects(self.zone_items_obj.values())
270
271        self.pos = Fluent("pos", BoolType(), zone=self.zone_type)
272        self.visited = Fluent("visited", BoolType(), zone=self.zone_type)
273        self.amount = Fluent("amount", IntType(), item=self.player_item_type)
274        self.amount_at = Fluent(
275            "amount_at", IntType(), item=self.zone_item_type, zone=self.zone_type
276        )
277
278        upf_problem.add_fluent(self.pos, default_initial_value=False)
279        upf_problem.add_fluent(self.visited, default_initial_value=False)
280        upf_problem.add_fluent(self.amount, default_initial_value=0)
281        upf_problem.add_fluent(self.amount_at, default_initial_value=0)
282
283        actions = []
284        for t_id, transfo in enumerate(state.world.transformations):
285            actions.append(self._action_from_transformation(transfo, t_id))
286
287        upf_problem.add_actions(actions)
288
289        if purpose is not None and purpose.terminal_groups:
290            upf_problem.add_goal(self._purpose_to_goal(purpose))
291        else:
292            warn("No purpose was given, thus all plans will be empty.")
293
294        self.update_problem_to_state(upf_problem, state)
295        return upf_problem
296
297    def _action_from_transformation(
298        self, transformation: "Transformation", transformation_id: int
299    ) -> "InstantaneousAction":
300        action_name = f"{transformation_id}_{transformation.name}"
301        action = InstantaneousAction(action_name)
302        loc = None
303        if len(self.zones_obj) > 0:
304            action = InstantaneousAction(action_name, loc=self.zone_type)
305            loc = action.parameter("loc")
306            action.add_precondition(self.pos(loc))
307
308        if transformation.zone and len(self.zones_obj) > 1:
309            action.add_precondition(self.pos(self.zones_obj[transformation.zone]))
310
311        if transformation.destination is not None:
312            action.add_effect(self.pos(loc), False)
313            action.add_effect(
314                self.visited(self.zones_obj[transformation.destination]), True
315            )
316            action.add_effect(
317                self.pos(self.zones_obj[transformation.destination]), True
318            )
319
320        self._add_player_operation(action, transformation)
321        self._add_current_zone_operations(action, transformation, loc)
322        return action
323
324    def _add_player_operation(
325        self,
326        action: "InstantaneousAction",
327        transfo: Transformation,
328    ):
329        player = InventoryOwner.PLAYER
330        for stack in transfo.get_changes(player, "add", []):
331            stack_amount = self.amount(self.items_obj[stack.item])
332            action.add_increase_effect(stack_amount, stack.quantity)
333
334        for max_stack in transfo.get_changes(player, "max", []):
335            stack_amount = self.amount(self.items_obj[max_stack.item])
336            action.add_precondition(LE(stack_amount, max_stack.quantity))
337
338        for stack in transfo.get_changes(player, "remove", []):
339            stack_amount = self.amount(self.items_obj[stack.item])
340            action.add_decrease_effect(stack_amount, stack.quantity)
341
342        for min_stack in transfo.get_changes(player, "min", []):
343            stack_amount = self.amount(self.items_obj[min_stack.item])
344            action.add_precondition(GE(stack_amount, min_stack.quantity))
345
346    def _add_current_zone_operations(
347        self,
348        action: "InstantaneousAction",
349        transfo: Transformation,
350        loc,
351    ):
352        current = InventoryOwner.CURRENT
353        for stack in transfo.get_changes(current, "add", []):
354            amount_at_loc = self.amount_at(self.zone_items_obj[stack.item], loc)
355            action.add_increase_effect(amount_at_loc, stack.quantity)
356
357        for max_stack in transfo.get_changes(current, "max", []):
358            amount_at_loc = self.amount_at(self.zone_items_obj[max_stack.item], loc)
359            action.add_precondition(LE(amount_at_loc, max_stack.quantity))
360
361        for rem_stack in transfo.get_changes(current, "remove", []):
362            amount_at_loc = self.amount_at(self.zone_items_obj[rem_stack.item], loc)
363            action.add_decrease_effect(amount_at_loc, rem_stack.quantity)
364
365        for min_stack in transfo.get_changes(current, "min", []):
366            stack_amount = self.amount_at(self.zone_items_obj[min_stack.item], loc)
367            action.add_precondition(GE(stack_amount, min_stack.quantity))
368
369    def _task_to_goal(self, task: "Task"):
370        if isinstance(task, GetItemTask):
371            item = self.items_obj[task.item_stack.item]
372            return GE(self.amount(item), task.item_stack.quantity)
373        if isinstance(task, PlaceItemTask):
374            item = self.zone_items_obj[task.item_stack.item]
375            zones = self.zones_obj.keys()
376            if task.zone is not None:
377                zones = [task.zone]
378            conditions = [
379                GE(
380                    self.amount_at(item, self.zones_obj[zone]),
381                    task.item_stack.quantity,
382                )
383                for zone in zones
384            ]
385            if len(conditions) == 1:
386                return conditions[0]
387            return OR(*conditions)
388        if isinstance(task, GoToZoneTask):
389            return self.visited(self.zones_obj[task.zone])
390        raise NotImplementedError
391
392    def _purpose_to_goal(self, purpose: "Purpose"):
393        # Individual tasks goals
394        goals = {}
395        for task in purpose.tasks:
396            goals[task] = self._task_to_goal(task)
397
398        # We only consider the best terminal group goal
399        return AND(*[goals[task] for task in purpose.best_terminal_group.tasks])
400
401
402def _read_statistics(results: "PlanGenerationResult") -> Statistics:
403    if results.engine_name == "enhsp":
404        return _read_enhsp_stats(results)
405    elif results.engine_name == "aries":
406        return _read_aries_stats(results)
407    elif results.engine_name == "lpg":
408        return _read_lpg_stats(results)
409    raise NotImplementedError(
410        "Cannot read statistics for engine %s", results.engine_name
411    )
412
413
414def _read_lpg_stats(results: "PlanGenerationResult"):
415    metric = results.metrics if results.metrics is not None else {}
416    statistic_logs = results.log_messages[0].message.split("\n\n")[-1].split("\r\n")
417
418    solution_metrics_paragraph = False
419    for stat_log in statistic_logs:
420        stat_log = stat_log.replace("\r", "")
421        name_and_value = stat_log.split(":")
422        if name_and_value[0].strip() == "Solution number":
423            solution_metrics_paragraph = True
424        elif not solution_metrics_paragraph:
425            continue
426        elif name_and_value[0].strip() == "Plan file":
427            solution_metrics_paragraph = False
428            continue
429        try:
430            metric[name_and_value[0]] = int(name_and_value[1])
431        except ValueError:
432            metric[name_and_value[0]] = float(name_and_value[1])
433    return metric
434
435
436def _read_aries_stats(results: "PlanGenerationResult"):
437    return results.metrics
438
439
440def _read_enhsp_stats(results: "PlanGenerationResult"):
441    statistic_logs = results.log_messages[0].message.split("\n\n")[-1].split("\n")
442    stats = {}
443    for stat_log in statistic_logs:
444        stat_log = stat_log.replace("\r", "")
445        name_and_value = stat_log.split(":")
446        if len(name_and_value) != 2:
447            continue
448        try:
449            stats[name_and_value[0]] = int(name_and_value[1])
450        except ValueError:
451            stats[name_and_value[0]] = float(name_and_value[1])
452    return stats

API Documentation

UPF_AVAILABLE = True
Statistics = typing.Dict[str, typing.Union[int, float]]
class HcraftPlanningProblem:
132class HcraftPlanningProblem:
133    """Interface between the unified planning framework and HierarchyCraft."""
134
135    def __init__(
136        self,
137        state: "HcraftState",
138        name: str,
139        purpose: Optional["Purpose"],
140        timeout: float = 60,
141        planner_name: Optional[str] = None,
142    ) -> None:
143        """Initialize a HierarchyCraft planning problem on the given state and purpose.
144
145        Args:
146            state: Initial state of the HierarchyCraft environment.
147            name: Name of the planning problem.
148            purpose: Purpose used to compute the planning goal.
149            timeout: Time budget (s) for the plan to be found before giving up.
150                Set to -1 for no limit. Defaults to 60.
151        """
152        if not UPF_AVAILABLE:
153            raise ImportError(
154                "Missing planning dependencies. Install with:\n"
155                "pip install hcraft[planning]"
156            )
157        self.upf_problem: "Problem" = self._init_problem(state, name, purpose)
158        self.plan: Optional["SequentialPlan"] = None
159        self.plans: List["SequentialPlan"] = []
160        self.stats: List["Statistics"] = []
161        self.timeout = timeout
162        self.planner_name = planner_name
163
164    def action_from_plan(self, state: "HcraftState") -> Optional[int]:
165        """Get the next gym action from a given state.
166
167        If a plan is already existing, just use the next action in the plan.
168        If no plan exists, first update and solve the planning problem.
169
170        Args:
171            state (HcraftState): Current state of the hcraft environement.
172
173        Returns:
174            int: Action to take according to the plan. Returns None if no action is required.
175        """
176        if self.plan is None:
177            self.update_problem_to_state(self.upf_problem, state)
178            self.solve()
179        if not self.plan.actions:  # Empty plan, nothing to do
180            return None
181        plan_action_name = str(self.plan.actions.pop(0))
182        if not self.plan.actions:
183            self.plan = None
184        action = int(plan_action_name.split("_")[0])
185        return action
186
187    def update_problem_to_state(self, upf_problem: "Problem", state: "HcraftState"):
188        """Update the planning problem initial state to the given state.
189
190        Args:
191            state: HierarchyCraft state to use as reference for the
192                initial state of the planning problem.
193        """
194        current_pos = state.current_zone
195        for zone in state.world.zones:
196            discovered = state.has_discovered(zone)
197            if current_pos is not None:
198                upf_problem.set_initial_value(
199                    self.pos(self.zones_obj[zone]), zone == current_pos
200                )
201            upf_problem.set_initial_value(
202                self.visited(self.zones_obj[zone]), discovered
203            )
204
205        for item in state.world.items:
206            quantity = state.amount_of(item)
207            upf_problem.set_initial_value(self.amount(self.items_obj[item]), quantity)
208
209        for zone in state.world.zones:
210            for zone_item in state.world.zones_items:
211                quantity = state.amount_of(zone_item, zone)
212                upf_problem.set_initial_value(
213                    self.amount_at(
214                        self.zone_items_obj[zone_item], self.zones_obj[zone]
215                    ),
216                    quantity,
217                )
218
219    def solve(self) -> "PlanGenerationResult":
220        """Solve the current planning problem with a planner."""
221        planner_kwargs = {"problem_kind": self.upf_problem.kind}
222        if self.planner_name is not None:
223            planner_kwargs.update(name=self.planner_name)
224        with OneshotPlanner(**planner_kwargs) as planner:
225            results: "PlanGenerationResult" = planner.solve(
226                self.upf_problem, timeout=self.timeout
227            )
228        if results.plan is None:
229            raise ValueError("Not plan could be found for this problem.")
230        self.plan = deepcopy(results.plan)
231        self.plans.append(deepcopy(results.plan))
232        self.stats.append(_read_statistics(results))
233        return results
234
235    def _init_problem(
236        self, state: "HcraftState", name: str, purpose: Optional["Purpose"]
237    ) -> "Problem":
238        """Build a unified planning problem from the given world and purpose.
239
240        Args:
241            world: HierarchyCraft world to generate the problem from.
242            name: Name given to the planning problem.
243            purpose: Purpose of the agent.
244                Will be used to set the goal of the planning problem.
245
246        Returns:
247            Problem: Unified planning problem.
248        """
249        upf_problem = Problem(name)
250        self.zone_type = UserType("zone")
251        self.player_item_type = UserType("player_item")
252        self.zone_item_type = UserType("zone_item")
253
254        self.zones_obj: Dict[Zone, "Object"] = {}
255        for zone in state.world.zones:
256            self.zones_obj[zone] = Object(zone.name, self.zone_type)
257
258        self.items_obj: Dict[Item, "Object"] = {}
259        for item in state.world.items:
260            self.items_obj[item] = Object(item.name, self.player_item_type)
261
262        self.zone_items_obj: Dict[Item, "Object"] = {}
263        for item in state.world.zones_items:
264            self.zone_items_obj[item] = Object(
265                f"{item.name}_in_zone", self.zone_item_type
266            )
267
268        upf_problem.add_objects(self.zones_obj.values())
269        upf_problem.add_objects(self.items_obj.values())
270        upf_problem.add_objects(self.zone_items_obj.values())
271
272        self.pos = Fluent("pos", BoolType(), zone=self.zone_type)
273        self.visited = Fluent("visited", BoolType(), zone=self.zone_type)
274        self.amount = Fluent("amount", IntType(), item=self.player_item_type)
275        self.amount_at = Fluent(
276            "amount_at", IntType(), item=self.zone_item_type, zone=self.zone_type
277        )
278
279        upf_problem.add_fluent(self.pos, default_initial_value=False)
280        upf_problem.add_fluent(self.visited, default_initial_value=False)
281        upf_problem.add_fluent(self.amount, default_initial_value=0)
282        upf_problem.add_fluent(self.amount_at, default_initial_value=0)
283
284        actions = []
285        for t_id, transfo in enumerate(state.world.transformations):
286            actions.append(self._action_from_transformation(transfo, t_id))
287
288        upf_problem.add_actions(actions)
289
290        if purpose is not None and purpose.terminal_groups:
291            upf_problem.add_goal(self._purpose_to_goal(purpose))
292        else:
293            warn("No purpose was given, thus all plans will be empty.")
294
295        self.update_problem_to_state(upf_problem, state)
296        return upf_problem
297
298    def _action_from_transformation(
299        self, transformation: "Transformation", transformation_id: int
300    ) -> "InstantaneousAction":
301        action_name = f"{transformation_id}_{transformation.name}"
302        action = InstantaneousAction(action_name)
303        loc = None
304        if len(self.zones_obj) > 0:
305            action = InstantaneousAction(action_name, loc=self.zone_type)
306            loc = action.parameter("loc")
307            action.add_precondition(self.pos(loc))
308
309        if transformation.zone and len(self.zones_obj) > 1:
310            action.add_precondition(self.pos(self.zones_obj[transformation.zone]))
311
312        if transformation.destination is not None:
313            action.add_effect(self.pos(loc), False)
314            action.add_effect(
315                self.visited(self.zones_obj[transformation.destination]), True
316            )
317            action.add_effect(
318                self.pos(self.zones_obj[transformation.destination]), True
319            )
320
321        self._add_player_operation(action, transformation)
322        self._add_current_zone_operations(action, transformation, loc)
323        return action
324
325    def _add_player_operation(
326        self,
327        action: "InstantaneousAction",
328        transfo: Transformation,
329    ):
330        player = InventoryOwner.PLAYER
331        for stack in transfo.get_changes(player, "add", []):
332            stack_amount = self.amount(self.items_obj[stack.item])
333            action.add_increase_effect(stack_amount, stack.quantity)
334
335        for max_stack in transfo.get_changes(player, "max", []):
336            stack_amount = self.amount(self.items_obj[max_stack.item])
337            action.add_precondition(LE(stack_amount, max_stack.quantity))
338
339        for stack in transfo.get_changes(player, "remove", []):
340            stack_amount = self.amount(self.items_obj[stack.item])
341            action.add_decrease_effect(stack_amount, stack.quantity)
342
343        for min_stack in transfo.get_changes(player, "min", []):
344            stack_amount = self.amount(self.items_obj[min_stack.item])
345            action.add_precondition(GE(stack_amount, min_stack.quantity))
346
347    def _add_current_zone_operations(
348        self,
349        action: "InstantaneousAction",
350        transfo: Transformation,
351        loc,
352    ):
353        current = InventoryOwner.CURRENT
354        for stack in transfo.get_changes(current, "add", []):
355            amount_at_loc = self.amount_at(self.zone_items_obj[stack.item], loc)
356            action.add_increase_effect(amount_at_loc, stack.quantity)
357
358        for max_stack in transfo.get_changes(current, "max", []):
359            amount_at_loc = self.amount_at(self.zone_items_obj[max_stack.item], loc)
360            action.add_precondition(LE(amount_at_loc, max_stack.quantity))
361
362        for rem_stack in transfo.get_changes(current, "remove", []):
363            amount_at_loc = self.amount_at(self.zone_items_obj[rem_stack.item], loc)
364            action.add_decrease_effect(amount_at_loc, rem_stack.quantity)
365
366        for min_stack in transfo.get_changes(current, "min", []):
367            stack_amount = self.amount_at(self.zone_items_obj[min_stack.item], loc)
368            action.add_precondition(GE(stack_amount, min_stack.quantity))
369
370    def _task_to_goal(self, task: "Task"):
371        if isinstance(task, GetItemTask):
372            item = self.items_obj[task.item_stack.item]
373            return GE(self.amount(item), task.item_stack.quantity)
374        if isinstance(task, PlaceItemTask):
375            item = self.zone_items_obj[task.item_stack.item]
376            zones = self.zones_obj.keys()
377            if task.zone is not None:
378                zones = [task.zone]
379            conditions = [
380                GE(
381                    self.amount_at(item, self.zones_obj[zone]),
382                    task.item_stack.quantity,
383                )
384                for zone in zones
385            ]
386            if len(conditions) == 1:
387                return conditions[0]
388            return OR(*conditions)
389        if isinstance(task, GoToZoneTask):
390            return self.visited(self.zones_obj[task.zone])
391        raise NotImplementedError
392
393    def _purpose_to_goal(self, purpose: "Purpose"):
394        # Individual tasks goals
395        goals = {}
396        for task in purpose.tasks:
397            goals[task] = self._task_to_goal(task)
398
399        # We only consider the best terminal group goal
400        return AND(*[goals[task] for task in purpose.best_terminal_group.tasks])

Interface between the unified planning framework and HierarchyCraft.

HcraftPlanningProblem( state: hcraft.HcraftState, name: str, purpose: Optional[hcraft.Purpose], timeout: float = 60, planner_name: Optional[str] = None)
135    def __init__(
136        self,
137        state: "HcraftState",
138        name: str,
139        purpose: Optional["Purpose"],
140        timeout: float = 60,
141        planner_name: Optional[str] = None,
142    ) -> None:
143        """Initialize a HierarchyCraft planning problem on the given state and purpose.
144
145        Args:
146            state: Initial state of the HierarchyCraft environment.
147            name: Name of the planning problem.
148            purpose: Purpose used to compute the planning goal.
149            timeout: Time budget (s) for the plan to be found before giving up.
150                Set to -1 for no limit. Defaults to 60.
151        """
152        if not UPF_AVAILABLE:
153            raise ImportError(
154                "Missing planning dependencies. Install with:\n"
155                "pip install hcraft[planning]"
156            )
157        self.upf_problem: "Problem" = self._init_problem(state, name, purpose)
158        self.plan: Optional["SequentialPlan"] = None
159        self.plans: List["SequentialPlan"] = []
160        self.stats: List["Statistics"] = []
161        self.timeout = timeout
162        self.planner_name = planner_name

Initialize a HierarchyCraft planning problem on the given state and purpose.

Arguments:
  • state: Initial state of the HierarchyCraft environment.
  • name: Name of the planning problem.
  • purpose: Purpose used to compute the planning goal.
  • timeout: Time budget (s) for the plan to be found before giving up. Set to -1 for no limit. Defaults to 60.
upf_problem: unified_planning.model.problem.Problem
plan: Optional[unified_planning.plans.sequential_plan.SequentialPlan]
plans: List[unified_planning.plans.sequential_plan.SequentialPlan]
stats: List[Dict[str, Union[int, float]]]
timeout
planner_name
def action_from_plan(self, state: hcraft.HcraftState) -> Optional[int]:
164    def action_from_plan(self, state: "HcraftState") -> Optional[int]:
165        """Get the next gym action from a given state.
166
167        If a plan is already existing, just use the next action in the plan.
168        If no plan exists, first update and solve the planning problem.
169
170        Args:
171            state (HcraftState): Current state of the hcraft environement.
172
173        Returns:
174            int: Action to take according to the plan. Returns None if no action is required.
175        """
176        if self.plan is None:
177            self.update_problem_to_state(self.upf_problem, state)
178            self.solve()
179        if not self.plan.actions:  # Empty plan, nothing to do
180            return None
181        plan_action_name = str(self.plan.actions.pop(0))
182        if not self.plan.actions:
183            self.plan = None
184        action = int(plan_action_name.split("_")[0])
185        return action

Get the next gym action from a given state.

If a plan is already existing, just use the next action in the plan. If no plan exists, first update and solve the planning problem.

Arguments:
  • state (HcraftState): Current state of the hcraft environement.
Returns:

int: Action to take according to the plan. Returns None if no action is required.

def update_problem_to_state( self, upf_problem: unified_planning.model.problem.Problem, state: hcraft.HcraftState):
187    def update_problem_to_state(self, upf_problem: "Problem", state: "HcraftState"):
188        """Update the planning problem initial state to the given state.
189
190        Args:
191            state: HierarchyCraft state to use as reference for the
192                initial state of the planning problem.
193        """
194        current_pos = state.current_zone
195        for zone in state.world.zones:
196            discovered = state.has_discovered(zone)
197            if current_pos is not None:
198                upf_problem.set_initial_value(
199                    self.pos(self.zones_obj[zone]), zone == current_pos
200                )
201            upf_problem.set_initial_value(
202                self.visited(self.zones_obj[zone]), discovered
203            )
204
205        for item in state.world.items:
206            quantity = state.amount_of(item)
207            upf_problem.set_initial_value(self.amount(self.items_obj[item]), quantity)
208
209        for zone in state.world.zones:
210            for zone_item in state.world.zones_items:
211                quantity = state.amount_of(zone_item, zone)
212                upf_problem.set_initial_value(
213                    self.amount_at(
214                        self.zone_items_obj[zone_item], self.zones_obj[zone]
215                    ),
216                    quantity,
217                )

Update the planning problem initial state to the given state.

Arguments:
  • state: HierarchyCraft state to use as reference for the initial state of the planning problem.
def solve(self) -> unified_planning.engines.results.PlanGenerationResult:
219    def solve(self) -> "PlanGenerationResult":
220        """Solve the current planning problem with a planner."""
221        planner_kwargs = {"problem_kind": self.upf_problem.kind}
222        if self.planner_name is not None:
223            planner_kwargs.update(name=self.planner_name)
224        with OneshotPlanner(**planner_kwargs) as planner:
225            results: "PlanGenerationResult" = planner.solve(
226                self.upf_problem, timeout=self.timeout
227            )
228        if results.plan is None:
229            raise ValueError("Not plan could be found for this problem.")
230        self.plan = deepcopy(results.plan)
231        self.plans.append(deepcopy(results.plan))
232        self.stats.append(_read_statistics(results))
233        return results

Solve the current planning problem with a planner.