Skip to content

Commit

Permalink
Updated task_executor.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mkhoshnam committed Nov 27, 2024
1 parent 46eb6a8 commit e04bc72
Showing 1 changed file with 39 additions and 36 deletions.
75 changes: 39 additions & 36 deletions src/gymnasium_interface/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,37 @@
from pycram.world_concepts.world_object import Object
from pycram.datastructures.enums import ObjectType, WorldMode, Grasp
from pycram.datastructures.pose import Pose
from pycram.designators.action_designator import NavigateAction, PickUpAction, PlaceAction, OpenAction, CloseAction
from pycram.designators.action_designator import (
NavigateAction,
PickUpAction,
PlaceAction,
OpenAction,
CloseAction,
)
from pycram.designators.object_designator import BelieveObject
from pycram.process_module import simulated_robot
import logging
from typing import Dict, List, Union

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class PyCRAMTaskExecutor:
"""
Handles task execution in a PyCRAM environment. This class integrates with BulletWorld for
managing objects and robot tasks in the simulation.
Handles task execution in a PyCRAM environment. This class integrates with BulletWorld
for managing objects and robot tasks in the simulation.
Attributes:
world (BulletWorld): The BulletWorld instance managing the environment.
robot (Object): The robot object in the environment.
apartment (Object): The apartment or environment object in the simulation.
world: The BulletWorld instance managing the environment.
robot: The robot object in the environment.
apartment: The apartment or environment object in the simulation.
"""

world: BulletWorld
robot: Object
apartment: Object

def __init__(self):
"""
Initializes the task executor for PyCRAM actions.
Expand All @@ -29,7 +41,7 @@ def __init__(self):
self.robot = None
self.apartment = None

def clear_world(self):
def clear_world(self) -> None:
"""
Removes all objects from the BulletWorld.
"""
Expand All @@ -38,12 +50,11 @@ def clear_world(self):
obj.remove()
logging.info("All objects removed from BulletWorld.")

def reset_task(self, objects):
def reset_task(self, objects: List[Dict[str, Union[str, Pose]]]) -> None:
"""
Resets the simulation environment dynamically by clearing the world and adding new objects.
:param objects: List of objects to be added to the environment.
:type objects: list[dict]
"""
self.clear_world()

Expand Down Expand Up @@ -71,46 +82,42 @@ def reset_task(self, objects):

logging.info("Environment reset: Apartment, robot, and dynamic objects added.")

def execute_action(self, action, params):
def execute_action(self, action: str, params: Dict[str, Union[str, Pose, List[Grasp]]]) -> None:
"""
Executes a PyCRAM action based on the provided parameters.
:param action: The action to be executed (e.g., "navigate", "pick_up").
:type action: str
:param params: Parameters required for the action.
:type params: dict
"""
if action == "navigate":
self._navigate(params)
elif action == "pick_up":
self._pick_up(params)
elif action == "place":
self._place(params)
elif action == "open":
self._open(params)
elif action == "close":
self._close(params)
"""
action_map = {
"navigate": self._navigate,
"pick_up": self._pick_up,
"place": self._place,
"open": self._open,
"close": self._close,
}

if action in action_map:
action_map[action](params)
else:
raise ValueError(f"Unknown action: {action}")

def _navigate(self, params):
def _navigate(self, params: Dict[str, Pose]) -> None:
"""
Navigates the robot to a target location.
:param params: Parameters for the navigate action, including "target_pose".
:type params: dict
"""
target_pose = params.get("target_pose")
if not target_pose:
raise ValueError("Missing parameter: target_pose")
NavigateAction(target_locations=[target_pose]).resolve().perform()

def _pick_up(self, params):
def _pick_up(self, params: Dict[str, Union[str, List[Grasp]]]) -> None:
"""
Picks up an object.
:param params: Parameters for the pick-up action, including "object_desig" and "arm".
:type params: dict
"""
object_name = params.get("object_desig")
arm = params.get("arm")
Expand All @@ -123,12 +130,11 @@ def _pick_up(self, params):
).resolve()
action.perform()

def _place(self, params):
def _place(self, params: Dict[str, Union[str, Pose]]) -> None:
"""
Places an object at a target location.
:param params: Parameters for the place action, including "object_desig", "target_pose", and "arm".
:type params: dict
"""
object_desig = params.get("object_desig")
target_pose = params.get("target_pose")
Expand All @@ -137,38 +143,35 @@ def _place(self, params):
raise ValueError("Missing parameters: object_desig, target_pose, and arm are required")
PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform()

def _open(self, params):
def _open(self, params: Dict[str, str]) -> None:
"""
Opens an object (e.g., a drawer or door).
:param params: Parameters for the open action, including "handle_desig" and "arm".
:type params: dict
"""
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
OpenAction(handle_desig, [arm]).resolve().perform()

def _close(self, params):
def _close(self, params: Dict[str, str]) -> None:
"""
Closes an object (e.g., a drawer or door).
:param params: Parameters for the close action, including "handle_desig" and "arm".
:type params: dict
"""
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
CloseAction(handle_desig, [arm]).resolve().perform()

def get_current_state(self):
def get_current_state(self) -> Dict[str, Union[Pose, List[Dict[str, Pose]]]]:
"""
Fetches the current state of the environment, including the robot pose and objects.
:return: Dictionary containing the robot pose and a list of objects with their poses.
:rtype: dict
"""
robot_pose = self.robot.get_pose() if self.robot else None
objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects]
Expand Down

0 comments on commit e04bc72

Please sign in to comment.