HierarchyCraft as planning problem
HierarchyCraft environments can be converted to planning problem in one line thanks to the Unified Planning Framework (UPF):
problem = env.planning_problem()
print(problem.upf_problem)
Then they can be solved with an installed planner (default is enhsp):
problem.solve()
print(problem.plan)
Actions can be extracted from a planner to interact with the environment:
done = False
_observation, _info = env.reset()
while not done:
# Automatically replan at the end of each plan until env termination
# Observations are not used when blindly following a current plan
# But the state in required in order to replan if there is no plan left
action = planning_problem.action_from_plan(env.state)
if action is None:
# Plan is existing but empty, thus nothing to do, thus terminates
done = True
continue
_observation, _reward, terminated, truncated, _info = env.step(action)
done = terminated or truncated
if terminated:
print("Success ! The plan worked in the actual environment !")
else:
print("Failed ... Something went wrong with the plan or the episode was truncated.")
HierarchyCraft as PDDL2.1 domain & problem
The Unified Planning Framework itself allows to write planning problems in the PDDL2.1 language, commonly used in the planning community. For instructions, refer to the UPF documentation.
Types
There is only three types in every HierarchyCraft environment:
- zones
- items that the player can obtain
- items that zones can obtain.
Constants
Most of the time, zones and items will be directly referenced in actions of the PDDL problem. Hence, all zones and items will be constants of the domain and not objects of the problem.
Predicates
The only predicates are about the player position. They allow to track where is the player currently ("pos") and where has has been ("visited").
Functions
HierarchyCraft relies on numerical-fluents from PDDL2.1, they allow to define the player inventory ("amount") and the inventories of each zone ("amount_at")
Actions
Actions are directly defined by transformations. Each transformation will be converted to an action. Respectfully, the min/max conditions of the transformation will become the preconditions of the PDDL action and the added/removed items and updated position of the transformation will become the effects of the PDDL action.
Initialisation
The problem file, will define the initial state of every inventory for every item together with the initial position of the player.
Goal
The PDDL goal is translated from the HierarchyCraft purpose. If multiple terminal groups exists in the purpose, the PDDL goal will be the highest value terminal group.
1"""# HierarchyCraft as planning problem 2 3HierarchyCraft environments can be converted to planning problem in one line 4thanks to the Unified Planning Framework (UPF): 5 6```python 7problem = env.planning_problem() 8print(problem.upf_problem) 9``` 10 11Then they can be solved with an installed planner (default is enhsp): 12 13```python 14problem.solve() 15print(problem.plan) 16``` 17 18Actions can be extracted from a planner to interact with the environment: 19 20```python 21 22done = False 23_observation, _info = env.reset() 24 25while not done: 26 # Automatically replan at the end of each plan until env termination 27 28 # Observations are not used when blindly following a current plan 29 # But the state in required in order to replan if there is no plan left 30 action = planning_problem.action_from_plan(env.state) 31 if action is None: 32 # Plan is existing but empty, thus nothing to do, thus terminates 33 done = True 34 continue 35 _observation, _reward, terminated, truncated, _info = env.step(action) 36 done = terminated or truncated 37 38if terminated: 39 print("Success ! The plan worked in the actual environment !") 40else: 41 print("Failed ... Something went wrong with the plan or the episode was truncated.") 42 43``` 44 45## HierarchyCraft as PDDL2.1 domain & problem 46 47The Unified Planning Framework itself allows to write planning problems in the PDDL2.1 language, 48commonly used in the planning community. For instructions, refer to the 49[UPF documentation](https://unified-planning.readthedocs.io/en/stable/interoperability.html). 50 51 52 53 54### Types 55There is only three types in every HierarchyCraft environment: 56- zones 57- items that the player can obtain 58- items that zones can obtain. 59 60### Constants 61Most of the time, zones and items will be directly referenced in actions of the PDDL problem. 62Hence, all zones and items will be constants of the domain and not objects of the problem. 63 64### Predicates 65The only predicates are about the player position. 66They allow to track where is the player currently ("pos") and where has has been ("visited"). 67 68### Functions 69HierarchyCraft relies on numerical-fluents from PDDL2.1, 70they allow to define the player inventory ("amount") and the inventories of each zone ("amount_at") 71 72### Actions 73Actions are directly defined by transformations. 74Each transformation will be converted to an action. 75Respectfully, the min/max conditions of the transformation will become the preconditions of the PDDL action 76and the added/removed items and updated position of the transformation will become the effects of the PDDL action. 77 78 79 80### Initialisation 81The problem file, will define the initial state 82of every inventory for every item together with the initial position of the player. 83 84### Goal 85The PDDL goal is translated from the HierarchyCraft purpose. 86If multiple terminal groups exists in the purpose, the PDDL goal will be the highest value terminal group. 87 88""" 89 90from warnings import warn 91from typing import TYPE_CHECKING, Dict, Optional, Union, List 92from copy import deepcopy 93 94 95from hcraft.transformation import Transformation, InventoryOwner 96from hcraft.task import Task, GetItemTask, PlaceItemTask, GoToZoneTask 97from hcraft.purpose import Purpose 98from hcraft.elements import Zone, Item 99 100# unified_planning is an optional dependency. 101UPF_AVAILABLE = True 102try: 103 import unified_planning.shortcuts as ups 104 from unified_planning.plans import SequentialPlan 105 from unified_planning.engines.results import PlanGenerationResult 106 from unified_planning.model.problem import Problem 107 108 UserType = ups.UserType 109 Object = ups.Object 110 BoolType = ups.BoolType 111 IntType = ups.IntType 112 Fluent = ups.Fluent 113 InstantaneousAction = ups.InstantaneousAction 114 OneshotPlanner = ups.OneshotPlanner 115 OR = ups.Or 116 AND = ups.And 117 GE = ups.GE 118 LE = ups.LE 119 120 121except ImportError: 122 UPF_AVAILABLE = False 123 124 125if TYPE_CHECKING: 126 from hcraft.state import HcraftState 127 128Statistics = Dict[str, Union[int, float]] 129 130 131class HcraftPlanningProblem: 132 """Interface between the unified planning framework and HierarchyCraft.""" 133 134 def __init__( 135 self, 136 state: "HcraftState", 137 name: str, 138 purpose: Optional["Purpose"], 139 timeout: float = 60, 140 planner_name: Optional[str] = None, 141 ) -> None: 142 """Initialize a HierarchyCraft planning problem on the given state and purpose. 143 144 Args: 145 state: Initial state of the HierarchyCraft environment. 146 name: Name of the planning problem. 147 purpose: Purpose used to compute the planning goal. 148 timeout: Time budget (s) for the plan to be found before giving up. 149 Set to -1 for no limit. Defaults to 60. 150 """ 151 if not UPF_AVAILABLE: 152 raise ImportError( 153 "Missing planning dependencies. Install with:\n" 154 "pip install hcraft[planning]" 155 ) 156 self.upf_problem: "Problem" = self._init_problem(state, name, purpose) 157 self.plan: Optional["SequentialPlan"] = None 158 self.plans: List["SequentialPlan"] = [] 159 self.stats: List["Statistics"] = [] 160 self.timeout = timeout 161 self.planner_name = planner_name 162 163 def action_from_plan(self, state: "HcraftState") -> Optional[int]: 164 """Get the next gym action from a given state. 165 166 If a plan is already existing, just use the next action in the plan. 167 If no plan exists, first update and solve the planning problem. 168 169 Args: 170 state (HcraftState): Current state of the hcraft environement. 171 172 Returns: 173 int: Action to take according to the plan. Returns None if no action is required. 174 """ 175 if self.plan is None: 176 self.update_problem_to_state(self.upf_problem, state) 177 self.solve() 178 if not self.plan.actions: # Empty plan, nothing to do 179 return None 180 plan_action_name = str(self.plan.actions.pop(0)) 181 if not self.plan.actions: 182 self.plan = None 183 action = int(plan_action_name.split("_")[0]) 184 return action 185 186 def update_problem_to_state(self, upf_problem: "Problem", state: "HcraftState"): 187 """Update the planning problem initial state to the given state. 188 189 Args: 190 state: HierarchyCraft state to use as reference for the 191 initial state of the planning problem. 192 """ 193 current_pos = state.current_zone 194 for zone in state.world.zones: 195 discovered = state.has_discovered(zone) 196 if current_pos is not None: 197 upf_problem.set_initial_value( 198 self.pos(self.zones_obj[zone]), zone == current_pos 199 ) 200 upf_problem.set_initial_value( 201 self.visited(self.zones_obj[zone]), discovered 202 ) 203 204 for item in state.world.items: 205 quantity = state.amount_of(item) 206 upf_problem.set_initial_value(self.amount(self.items_obj[item]), quantity) 207 208 for zone in state.world.zones: 209 for zone_item in state.world.zones_items: 210 quantity = state.amount_of(zone_item, zone) 211 upf_problem.set_initial_value( 212 self.amount_at( 213 self.zone_items_obj[zone_item], self.zones_obj[zone] 214 ), 215 quantity, 216 ) 217 218 def solve(self) -> "PlanGenerationResult": 219 """Solve the current planning problem with a planner.""" 220 planner_kwargs = {"problem_kind": self.upf_problem.kind} 221 if self.planner_name is not None: 222 planner_kwargs.update(name=self.planner_name) 223 with OneshotPlanner(**planner_kwargs) as planner: 224 results: "PlanGenerationResult" = planner.solve( 225 self.upf_problem, timeout=self.timeout 226 ) 227 if results.plan is None: 228 raise ValueError("Not plan could be found for this problem.") 229 self.plan = deepcopy(results.plan) 230 self.plans.append(deepcopy(results.plan)) 231 self.stats.append(_read_statistics(results)) 232 return results 233 234 def _init_problem( 235 self, state: "HcraftState", name: str, purpose: Optional["Purpose"] 236 ) -> "Problem": 237 """Build a unified planning problem from the given world and purpose. 238 239 Args: 240 world: HierarchyCraft world to generate the problem from. 241 name: Name given to the planning problem. 242 purpose: Purpose of the agent. 243 Will be used to set the goal of the planning problem. 244 245 Returns: 246 Problem: Unified planning problem. 247 """ 248 upf_problem = Problem(name) 249 self.zone_type = UserType("zone") 250 self.player_item_type = UserType("player_item") 251 self.zone_item_type = UserType("zone_item") 252 253 self.zones_obj: Dict[Zone, "Object"] = {} 254 for zone in state.world.zones: 255 self.zones_obj[zone] = Object(zone.name, self.zone_type) 256 257 self.items_obj: Dict[Item, "Object"] = {} 258 for item in state.world.items: 259 self.items_obj[item] = Object(item.name, self.player_item_type) 260 261 self.zone_items_obj: Dict[Item, "Object"] = {} 262 for item in state.world.zones_items: 263 self.zone_items_obj[item] = Object( 264 f"{item.name}_in_zone", self.zone_item_type 265 ) 266 267 upf_problem.add_objects(self.zones_obj.values()) 268 upf_problem.add_objects(self.items_obj.values()) 269 upf_problem.add_objects(self.zone_items_obj.values()) 270 271 self.pos = Fluent("pos", BoolType(), zone=self.zone_type) 272 self.visited = Fluent("visited", BoolType(), zone=self.zone_type) 273 self.amount = Fluent("amount", IntType(), item=self.player_item_type) 274 self.amount_at = Fluent( 275 "amount_at", IntType(), item=self.zone_item_type, zone=self.zone_type 276 ) 277 278 upf_problem.add_fluent(self.pos, default_initial_value=False) 279 upf_problem.add_fluent(self.visited, default_initial_value=False) 280 upf_problem.add_fluent(self.amount, default_initial_value=0) 281 upf_problem.add_fluent(self.amount_at, default_initial_value=0) 282 283 actions = [] 284 for t_id, transfo in enumerate(state.world.transformations): 285 actions.append(self._action_from_transformation(transfo, t_id)) 286 287 upf_problem.add_actions(actions) 288 289 if purpose is not None and purpose.terminal_groups: 290 upf_problem.add_goal(self._purpose_to_goal(purpose)) 291 else: 292 warn("No purpose was given, thus all plans will be empty.") 293 294 self.update_problem_to_state(upf_problem, state) 295 return upf_problem 296 297 def _action_from_transformation( 298 self, transformation: "Transformation", transformation_id: int 299 ) -> "InstantaneousAction": 300 action_name = f"{transformation_id}_{transformation.name}" 301 action = InstantaneousAction(action_name) 302 loc = None 303 if len(self.zones_obj) > 0: 304 action = InstantaneousAction(action_name, loc=self.zone_type) 305 loc = action.parameter("loc") 306 action.add_precondition(self.pos(loc)) 307 308 if transformation.zone and len(self.zones_obj) > 1: 309 action.add_precondition(self.pos(self.zones_obj[transformation.zone])) 310 311 if transformation.destination is not None: 312 action.add_effect(self.pos(loc), False) 313 action.add_effect( 314 self.visited(self.zones_obj[transformation.destination]), True 315 ) 316 action.add_effect( 317 self.pos(self.zones_obj[transformation.destination]), True 318 ) 319 320 self._add_player_operation(action, transformation) 321 self._add_current_zone_operations(action, transformation, loc) 322 return action 323 324 def _add_player_operation( 325 self, 326 action: "InstantaneousAction", 327 transfo: Transformation, 328 ): 329 player = InventoryOwner.PLAYER 330 for stack in transfo.get_changes(player, "add", []): 331 stack_amount = self.amount(self.items_obj[stack.item]) 332 action.add_increase_effect(stack_amount, stack.quantity) 333 334 for max_stack in transfo.get_changes(player, "max", []): 335 stack_amount = self.amount(self.items_obj[max_stack.item]) 336 action.add_precondition(LE(stack_amount, max_stack.quantity)) 337 338 for stack in transfo.get_changes(player, "remove", []): 339 stack_amount = self.amount(self.items_obj[stack.item]) 340 action.add_decrease_effect(stack_amount, stack.quantity) 341 342 for min_stack in transfo.get_changes(player, "min", []): 343 stack_amount = self.amount(self.items_obj[min_stack.item]) 344 action.add_precondition(GE(stack_amount, min_stack.quantity)) 345 346 def _add_current_zone_operations( 347 self, 348 action: "InstantaneousAction", 349 transfo: Transformation, 350 loc, 351 ): 352 current = InventoryOwner.CURRENT 353 for stack in transfo.get_changes(current, "add", []): 354 amount_at_loc = self.amount_at(self.zone_items_obj[stack.item], loc) 355 action.add_increase_effect(amount_at_loc, stack.quantity) 356 357 for max_stack in transfo.get_changes(current, "max", []): 358 amount_at_loc = self.amount_at(self.zone_items_obj[max_stack.item], loc) 359 action.add_precondition(LE(amount_at_loc, max_stack.quantity)) 360 361 for rem_stack in transfo.get_changes(current, "remove", []): 362 amount_at_loc = self.amount_at(self.zone_items_obj[rem_stack.item], loc) 363 action.add_decrease_effect(amount_at_loc, rem_stack.quantity) 364 365 for min_stack in transfo.get_changes(current, "min", []): 366 stack_amount = self.amount_at(self.zone_items_obj[min_stack.item], loc) 367 action.add_precondition(GE(stack_amount, min_stack.quantity)) 368 369 def _task_to_goal(self, task: "Task"): 370 if isinstance(task, GetItemTask): 371 item = self.items_obj[task.item_stack.item] 372 return GE(self.amount(item), task.item_stack.quantity) 373 if isinstance(task, PlaceItemTask): 374 item = self.zone_items_obj[task.item_stack.item] 375 zones = self.zones_obj.keys() 376 if task.zone is not None: 377 zones = [task.zone] 378 conditions = [ 379 GE( 380 self.amount_at(item, self.zones_obj[zone]), 381 task.item_stack.quantity, 382 ) 383 for zone in zones 384 ] 385 if len(conditions) == 1: 386 return conditions[0] 387 return OR(*conditions) 388 if isinstance(task, GoToZoneTask): 389 return self.visited(self.zones_obj[task.zone]) 390 raise NotImplementedError 391 392 def _purpose_to_goal(self, purpose: "Purpose"): 393 # Individual tasks goals 394 goals = {} 395 for task in purpose.tasks: 396 goals[task] = self._task_to_goal(task) 397 398 # We only consider the best terminal group goal 399 return AND(*[goals[task] for task in purpose.best_terminal_group.tasks]) 400 401 402def _read_statistics(results: "PlanGenerationResult") -> Statistics: 403 if results.engine_name == "enhsp": 404 return _read_enhsp_stats(results) 405 elif results.engine_name == "aries": 406 return _read_aries_stats(results) 407 elif results.engine_name == "lpg": 408 return _read_lpg_stats(results) 409 raise NotImplementedError( 410 "Cannot read statistics for engine %s", results.engine_name 411 ) 412 413 414def _read_lpg_stats(results: "PlanGenerationResult"): 415 metric = results.metrics if results.metrics is not None else {} 416 statistic_logs = results.log_messages[0].message.split("\n\n")[-1].split("\r\n") 417 418 solution_metrics_paragraph = False 419 for stat_log in statistic_logs: 420 stat_log = stat_log.replace("\r", "") 421 name_and_value = stat_log.split(":") 422 if name_and_value[0].strip() == "Solution number": 423 solution_metrics_paragraph = True 424 elif not solution_metrics_paragraph: 425 continue 426 elif name_and_value[0].strip() == "Plan file": 427 solution_metrics_paragraph = False 428 continue 429 try: 430 metric[name_and_value[0]] = int(name_and_value[1]) 431 except ValueError: 432 metric[name_and_value[0]] = float(name_and_value[1]) 433 return metric 434 435 436def _read_aries_stats(results: "PlanGenerationResult"): 437 return results.metrics 438 439 440def _read_enhsp_stats(results: "PlanGenerationResult"): 441 statistic_logs = results.log_messages[0].message.split("\n\n")[-1].split("\n") 442 stats = {} 443 for stat_log in statistic_logs: 444 stat_log = stat_log.replace("\r", "") 445 name_and_value = stat_log.split(":") 446 if len(name_and_value) != 2: 447 continue 448 try: 449 stats[name_and_value[0]] = int(name_and_value[1]) 450 except ValueError: 451 stats[name_and_value[0]] = float(name_and_value[1]) 452 return stats
API Documentation
132class HcraftPlanningProblem: 133 """Interface between the unified planning framework and HierarchyCraft.""" 134 135 def __init__( 136 self, 137 state: "HcraftState", 138 name: str, 139 purpose: Optional["Purpose"], 140 timeout: float = 60, 141 planner_name: Optional[str] = None, 142 ) -> None: 143 """Initialize a HierarchyCraft planning problem on the given state and purpose. 144 145 Args: 146 state: Initial state of the HierarchyCraft environment. 147 name: Name of the planning problem. 148 purpose: Purpose used to compute the planning goal. 149 timeout: Time budget (s) for the plan to be found before giving up. 150 Set to -1 for no limit. Defaults to 60. 151 """ 152 if not UPF_AVAILABLE: 153 raise ImportError( 154 "Missing planning dependencies. Install with:\n" 155 "pip install hcraft[planning]" 156 ) 157 self.upf_problem: "Problem" = self._init_problem(state, name, purpose) 158 self.plan: Optional["SequentialPlan"] = None 159 self.plans: List["SequentialPlan"] = [] 160 self.stats: List["Statistics"] = [] 161 self.timeout = timeout 162 self.planner_name = planner_name 163 164 def action_from_plan(self, state: "HcraftState") -> Optional[int]: 165 """Get the next gym action from a given state. 166 167 If a plan is already existing, just use the next action in the plan. 168 If no plan exists, first update and solve the planning problem. 169 170 Args: 171 state (HcraftState): Current state of the hcraft environement. 172 173 Returns: 174 int: Action to take according to the plan. Returns None if no action is required. 175 """ 176 if self.plan is None: 177 self.update_problem_to_state(self.upf_problem, state) 178 self.solve() 179 if not self.plan.actions: # Empty plan, nothing to do 180 return None 181 plan_action_name = str(self.plan.actions.pop(0)) 182 if not self.plan.actions: 183 self.plan = None 184 action = int(plan_action_name.split("_")[0]) 185 return action 186 187 def update_problem_to_state(self, upf_problem: "Problem", state: "HcraftState"): 188 """Update the planning problem initial state to the given state. 189 190 Args: 191 state: HierarchyCraft state to use as reference for the 192 initial state of the planning problem. 193 """ 194 current_pos = state.current_zone 195 for zone in state.world.zones: 196 discovered = state.has_discovered(zone) 197 if current_pos is not None: 198 upf_problem.set_initial_value( 199 self.pos(self.zones_obj[zone]), zone == current_pos 200 ) 201 upf_problem.set_initial_value( 202 self.visited(self.zones_obj[zone]), discovered 203 ) 204 205 for item in state.world.items: 206 quantity = state.amount_of(item) 207 upf_problem.set_initial_value(self.amount(self.items_obj[item]), quantity) 208 209 for zone in state.world.zones: 210 for zone_item in state.world.zones_items: 211 quantity = state.amount_of(zone_item, zone) 212 upf_problem.set_initial_value( 213 self.amount_at( 214 self.zone_items_obj[zone_item], self.zones_obj[zone] 215 ), 216 quantity, 217 ) 218 219 def solve(self) -> "PlanGenerationResult": 220 """Solve the current planning problem with a planner.""" 221 planner_kwargs = {"problem_kind": self.upf_problem.kind} 222 if self.planner_name is not None: 223 planner_kwargs.update(name=self.planner_name) 224 with OneshotPlanner(**planner_kwargs) as planner: 225 results: "PlanGenerationResult" = planner.solve( 226 self.upf_problem, timeout=self.timeout 227 ) 228 if results.plan is None: 229 raise ValueError("Not plan could be found for this problem.") 230 self.plan = deepcopy(results.plan) 231 self.plans.append(deepcopy(results.plan)) 232 self.stats.append(_read_statistics(results)) 233 return results 234 235 def _init_problem( 236 self, state: "HcraftState", name: str, purpose: Optional["Purpose"] 237 ) -> "Problem": 238 """Build a unified planning problem from the given world and purpose. 239 240 Args: 241 world: HierarchyCraft world to generate the problem from. 242 name: Name given to the planning problem. 243 purpose: Purpose of the agent. 244 Will be used to set the goal of the planning problem. 245 246 Returns: 247 Problem: Unified planning problem. 248 """ 249 upf_problem = Problem(name) 250 self.zone_type = UserType("zone") 251 self.player_item_type = UserType("player_item") 252 self.zone_item_type = UserType("zone_item") 253 254 self.zones_obj: Dict[Zone, "Object"] = {} 255 for zone in state.world.zones: 256 self.zones_obj[zone] = Object(zone.name, self.zone_type) 257 258 self.items_obj: Dict[Item, "Object"] = {} 259 for item in state.world.items: 260 self.items_obj[item] = Object(item.name, self.player_item_type) 261 262 self.zone_items_obj: Dict[Item, "Object"] = {} 263 for item in state.world.zones_items: 264 self.zone_items_obj[item] = Object( 265 f"{item.name}_in_zone", self.zone_item_type 266 ) 267 268 upf_problem.add_objects(self.zones_obj.values()) 269 upf_problem.add_objects(self.items_obj.values()) 270 upf_problem.add_objects(self.zone_items_obj.values()) 271 272 self.pos = Fluent("pos", BoolType(), zone=self.zone_type) 273 self.visited = Fluent("visited", BoolType(), zone=self.zone_type) 274 self.amount = Fluent("amount", IntType(), item=self.player_item_type) 275 self.amount_at = Fluent( 276 "amount_at", IntType(), item=self.zone_item_type, zone=self.zone_type 277 ) 278 279 upf_problem.add_fluent(self.pos, default_initial_value=False) 280 upf_problem.add_fluent(self.visited, default_initial_value=False) 281 upf_problem.add_fluent(self.amount, default_initial_value=0) 282 upf_problem.add_fluent(self.amount_at, default_initial_value=0) 283 284 actions = [] 285 for t_id, transfo in enumerate(state.world.transformations): 286 actions.append(self._action_from_transformation(transfo, t_id)) 287 288 upf_problem.add_actions(actions) 289 290 if purpose is not None and purpose.terminal_groups: 291 upf_problem.add_goal(self._purpose_to_goal(purpose)) 292 else: 293 warn("No purpose was given, thus all plans will be empty.") 294 295 self.update_problem_to_state(upf_problem, state) 296 return upf_problem 297 298 def _action_from_transformation( 299 self, transformation: "Transformation", transformation_id: int 300 ) -> "InstantaneousAction": 301 action_name = f"{transformation_id}_{transformation.name}" 302 action = InstantaneousAction(action_name) 303 loc = None 304 if len(self.zones_obj) > 0: 305 action = InstantaneousAction(action_name, loc=self.zone_type) 306 loc = action.parameter("loc") 307 action.add_precondition(self.pos(loc)) 308 309 if transformation.zone and len(self.zones_obj) > 1: 310 action.add_precondition(self.pos(self.zones_obj[transformation.zone])) 311 312 if transformation.destination is not None: 313 action.add_effect(self.pos(loc), False) 314 action.add_effect( 315 self.visited(self.zones_obj[transformation.destination]), True 316 ) 317 action.add_effect( 318 self.pos(self.zones_obj[transformation.destination]), True 319 ) 320 321 self._add_player_operation(action, transformation) 322 self._add_current_zone_operations(action, transformation, loc) 323 return action 324 325 def _add_player_operation( 326 self, 327 action: "InstantaneousAction", 328 transfo: Transformation, 329 ): 330 player = InventoryOwner.PLAYER 331 for stack in transfo.get_changes(player, "add", []): 332 stack_amount = self.amount(self.items_obj[stack.item]) 333 action.add_increase_effect(stack_amount, stack.quantity) 334 335 for max_stack in transfo.get_changes(player, "max", []): 336 stack_amount = self.amount(self.items_obj[max_stack.item]) 337 action.add_precondition(LE(stack_amount, max_stack.quantity)) 338 339 for stack in transfo.get_changes(player, "remove", []): 340 stack_amount = self.amount(self.items_obj[stack.item]) 341 action.add_decrease_effect(stack_amount, stack.quantity) 342 343 for min_stack in transfo.get_changes(player, "min", []): 344 stack_amount = self.amount(self.items_obj[min_stack.item]) 345 action.add_precondition(GE(stack_amount, min_stack.quantity)) 346 347 def _add_current_zone_operations( 348 self, 349 action: "InstantaneousAction", 350 transfo: Transformation, 351 loc, 352 ): 353 current = InventoryOwner.CURRENT 354 for stack in transfo.get_changes(current, "add", []): 355 amount_at_loc = self.amount_at(self.zone_items_obj[stack.item], loc) 356 action.add_increase_effect(amount_at_loc, stack.quantity) 357 358 for max_stack in transfo.get_changes(current, "max", []): 359 amount_at_loc = self.amount_at(self.zone_items_obj[max_stack.item], loc) 360 action.add_precondition(LE(amount_at_loc, max_stack.quantity)) 361 362 for rem_stack in transfo.get_changes(current, "remove", []): 363 amount_at_loc = self.amount_at(self.zone_items_obj[rem_stack.item], loc) 364 action.add_decrease_effect(amount_at_loc, rem_stack.quantity) 365 366 for min_stack in transfo.get_changes(current, "min", []): 367 stack_amount = self.amount_at(self.zone_items_obj[min_stack.item], loc) 368 action.add_precondition(GE(stack_amount, min_stack.quantity)) 369 370 def _task_to_goal(self, task: "Task"): 371 if isinstance(task, GetItemTask): 372 item = self.items_obj[task.item_stack.item] 373 return GE(self.amount(item), task.item_stack.quantity) 374 if isinstance(task, PlaceItemTask): 375 item = self.zone_items_obj[task.item_stack.item] 376 zones = self.zones_obj.keys() 377 if task.zone is not None: 378 zones = [task.zone] 379 conditions = [ 380 GE( 381 self.amount_at(item, self.zones_obj[zone]), 382 task.item_stack.quantity, 383 ) 384 for zone in zones 385 ] 386 if len(conditions) == 1: 387 return conditions[0] 388 return OR(*conditions) 389 if isinstance(task, GoToZoneTask): 390 return self.visited(self.zones_obj[task.zone]) 391 raise NotImplementedError 392 393 def _purpose_to_goal(self, purpose: "Purpose"): 394 # Individual tasks goals 395 goals = {} 396 for task in purpose.tasks: 397 goals[task] = self._task_to_goal(task) 398 399 # We only consider the best terminal group goal 400 return AND(*[goals[task] for task in purpose.best_terminal_group.tasks])
Interface between the unified planning framework and HierarchyCraft.
135 def __init__( 136 self, 137 state: "HcraftState", 138 name: str, 139 purpose: Optional["Purpose"], 140 timeout: float = 60, 141 planner_name: Optional[str] = None, 142 ) -> None: 143 """Initialize a HierarchyCraft planning problem on the given state and purpose. 144 145 Args: 146 state: Initial state of the HierarchyCraft environment. 147 name: Name of the planning problem. 148 purpose: Purpose used to compute the planning goal. 149 timeout: Time budget (s) for the plan to be found before giving up. 150 Set to -1 for no limit. Defaults to 60. 151 """ 152 if not UPF_AVAILABLE: 153 raise ImportError( 154 "Missing planning dependencies. Install with:\n" 155 "pip install hcraft[planning]" 156 ) 157 self.upf_problem: "Problem" = self._init_problem(state, name, purpose) 158 self.plan: Optional["SequentialPlan"] = None 159 self.plans: List["SequentialPlan"] = [] 160 self.stats: List["Statistics"] = [] 161 self.timeout = timeout 162 self.planner_name = planner_name
Initialize a HierarchyCraft planning problem on the given state and purpose.
Arguments:
- state: Initial state of the HierarchyCraft environment.
- name: Name of the planning problem.
- purpose: Purpose used to compute the planning goal.
- timeout: Time budget (s) for the plan to be found before giving up. Set to -1 for no limit. Defaults to 60.
164 def action_from_plan(self, state: "HcraftState") -> Optional[int]: 165 """Get the next gym action from a given state. 166 167 If a plan is already existing, just use the next action in the plan. 168 If no plan exists, first update and solve the planning problem. 169 170 Args: 171 state (HcraftState): Current state of the hcraft environement. 172 173 Returns: 174 int: Action to take according to the plan. Returns None if no action is required. 175 """ 176 if self.plan is None: 177 self.update_problem_to_state(self.upf_problem, state) 178 self.solve() 179 if not self.plan.actions: # Empty plan, nothing to do 180 return None 181 plan_action_name = str(self.plan.actions.pop(0)) 182 if not self.plan.actions: 183 self.plan = None 184 action = int(plan_action_name.split("_")[0]) 185 return action
Get the next gym action from a given state.
If a plan is already existing, just use the next action in the plan. If no plan exists, first update and solve the planning problem.
Arguments:
- state (HcraftState): Current state of the hcraft environement.
Returns:
int: Action to take according to the plan. Returns None if no action is required.
187 def update_problem_to_state(self, upf_problem: "Problem", state: "HcraftState"): 188 """Update the planning problem initial state to the given state. 189 190 Args: 191 state: HierarchyCraft state to use as reference for the 192 initial state of the planning problem. 193 """ 194 current_pos = state.current_zone 195 for zone in state.world.zones: 196 discovered = state.has_discovered(zone) 197 if current_pos is not None: 198 upf_problem.set_initial_value( 199 self.pos(self.zones_obj[zone]), zone == current_pos 200 ) 201 upf_problem.set_initial_value( 202 self.visited(self.zones_obj[zone]), discovered 203 ) 204 205 for item in state.world.items: 206 quantity = state.amount_of(item) 207 upf_problem.set_initial_value(self.amount(self.items_obj[item]), quantity) 208 209 for zone in state.world.zones: 210 for zone_item in state.world.zones_items: 211 quantity = state.amount_of(zone_item, zone) 212 upf_problem.set_initial_value( 213 self.amount_at( 214 self.zone_items_obj[zone_item], self.zones_obj[zone] 215 ), 216 quantity, 217 )
Update the planning problem initial state to the given state.
Arguments:
- state: HierarchyCraft state to use as reference for the initial state of the planning problem.
219 def solve(self) -> "PlanGenerationResult": 220 """Solve the current planning problem with a planner.""" 221 planner_kwargs = {"problem_kind": self.upf_problem.kind} 222 if self.planner_name is not None: 223 planner_kwargs.update(name=self.planner_name) 224 with OneshotPlanner(**planner_kwargs) as planner: 225 results: "PlanGenerationResult" = planner.solve( 226 self.upf_problem, timeout=self.timeout 227 ) 228 if results.plan is None: 229 raise ValueError("Not plan could be found for this problem.") 230 self.plan = deepcopy(results.plan) 231 self.plans.append(deepcopy(results.plan)) 232 self.stats.append(_read_statistics(results)) 233 return results
Solve the current planning problem with a planner.