Skip to content

Commit

Permalink
Add gymnasium_interface files
Browse files Browse the repository at this point in the history
  • Loading branch information
mkhoshnam committed Nov 26, 2024
1 parent e75a355 commit 79f005f
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 0 deletions.
54 changes: 54 additions & 0 deletions src/gymnasium_interface/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv
from pycram.datastructures.enums import Arms, Grasp
from pycram.datastructures.pose import Pose

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def custom_reward(state):
"""
Custom reward function.
:param state: The current state of the environment.
:type state: dict
:return: Reward value based on the state.
:rtype: float
"""
return 10.0 if state else -1.0

# Define actions as a list of strings
actions = ["navigate", "pick_up"]

# Define default parameters for each action
default_params = {
"navigate": {"target_pose": Pose(position=[1.0, 2.0, 0.0], orientation=[0.0, 0.0, 0.0, 1.0])},
"pick_up": {"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]},
}

# Define objects to initialize in the environment
objects = [
{
"name": "milk",
"type": "object",
"urdf": "milk.stl",
"pose": Pose(position=[2.5, 2.10, 1.02]),
}
]

# Initialize the Gymnasium environment
env = PyCRAMGymEnv(actions=actions, default_params=default_params, objects=objects, reward_function=custom_reward)

# Reset the environment and retrieve the initial state
state, info = env.reset()
logging.info(f"State after reset: {state}")

# Perform a step in the environment
try:
state, reward, done, truncated, info = env.step(
action=1, # Index of the action to execute
params={"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]},
)
logging.info(f"State after step: {state}, Reward: {reward}, Done: {done}, Truncated: {truncated}")
except ValueError as e:
logging.error(f"Action failed: {e}")
111 changes: 111 additions & 0 deletions src/gymnasium_interface/pycram_gym_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import gymnasium as gym
from gymnasium.spaces import Discrete
from gymnasium_interface.task_executor import PyCRAMTaskExecutor # Use absolute import
from pycram.process_module import simulated_robot


class PyCRAMGymEnv(gym.Env):
"""
A Gymnasium-compatible environment for integrating PyCRAM task execution.
This environment allows users to execute PyCRAM tasks within a Gymnasium-compatible
framework. It supports dynamic task initialization, state tracking, and custom reward
calculations.
:param actions: List of valid action classes or functions (e.g., [NavigateAction, PickUpAction]).
:type actions: list
:param default_params: Default parameters for each action, keyed by action class/function (optional).
:type default_params: dict
:param objects: List of objects to initialize in the environment (optional).
:type objects: list
:param reward_function: Custom user-defined function to compute rewards (optional).
:type reward_function: callable
"""

def __init__(self, actions, default_params=None, objects=None, reward_function=None):
self.actions = actions
self.default_params = default_params or {}
self.objects = objects or []
self.reward_function = reward_function

# Dynamically define the action space
self.action_space = Discrete(len(actions))

# Initialize the task executor
self.executor = PyCRAMTaskExecutor()

# Initialize the state
self.state = None
self.reset()

def reset(self):
"""
Resets the environment.
:return: The initial state of the environment.
:rtype: tuple
"""
with simulated_robot:
self.executor.reset_task(self.objects)
self.state = self.executor.get_current_state()
return self.state, {}

def step(self, action, params=None):
"""
Executes a step in the environment.
:param action: The action index to execute.
:type action: int
:param params: Additional parameters for the action.
:type params: dict, optional
:return: A tuple containing the next state, reward, done flag, truncated flag, and additional info.
:rtype: tuple
"""
with simulated_robot:
action_name = self.actions[action]
action_params = self.default_params.get(action_name, {}).copy()
if params:
action_params.update(params)

# Execute the action
self.executor.execute_action(action_name, action_params)

# Update the state
self.state = self._get_observation()

# Calculate reward
reward = self._calculate_reward()

# Placeholder: done logic can be updated later
done = self._is_done()

return self.state, reward, done, False, {}

def _get_observation(self):
"""
Fetches the current state of the environment.
:return: The current state of the environment.
:rtype: dict
"""
return self.state

def _calculate_reward(self):
"""
Calculates the reward using the user-defined reward function.
:return: The calculated reward.
:rtype: float
"""
if self.reward_function:
return self.reward_function(self.state)
return 1.0

def _is_done(self):
"""
Checks if the task is complete.
:return: True if the task is done, otherwise False.
:rtype: bool
"""
return False
175 changes: 175 additions & 0 deletions src/gymnasium_interface/task_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from pycram.worlds.bullet_world import BulletWorld
from pycram.world_concepts.world_object import Object
from pycram.datastructures.enums import ObjectType, WorldMode, Grasp
from pycram.datastructures.pose import Pose
from pycram.designators.action_designator import NavigateAction, PickUpAction, PlaceAction, OpenAction, CloseAction
from pycram.designators.object_designator import BelieveObject
from pycram.process_module import simulated_robot
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class PyCRAMTaskExecutor:
"""
Handles task execution in a PyCRAM environment. This class integrates with BulletWorld for
managing objects and robot tasks in the simulation.
Attributes:
world (BulletWorld): The BulletWorld instance managing the environment.
robot (Object): The robot object in the environment.
apartment (Object): The apartment or environment object in the simulation.
"""

def __init__(self):
"""
Initializes the task executor for PyCRAM actions.
"""
self.world = BulletWorld(WorldMode.GUI)
self.robot = None
self.apartment = None

def clear_world(self):
"""
Removes all objects from the BulletWorld.
"""
logging.info("Clearing all objects from BulletWorld...")
for obj in list(self.world.objects):
obj.remove()
logging.info("All objects removed from BulletWorld.")

def reset_task(self, objects):
"""
Resets the simulation environment dynamically by clearing the world and adding new objects.
:param objects: List of objects to be added to the environment.
:type objects: list[dict]
"""
self.clear_world()

# Reload the apartment URDF
self.apartment = Object("apartment", "environment", "apartment.urdf")

# Reinitialize the robot
self.robot = Object("pr2", ObjectType.ROBOT, "pr2.urdf", pose=Pose([1.2, 1, 0]))
self.world.robot = self.robot

# Add dynamic objects
for obj in objects:
name = obj["name"]
obj_type = obj["type"]
urdf = obj["urdf"]
pose = obj["pose"]

logging.info(f"Adding object: {name}, URDF path: {urdf}, Pose: {pose}")

existing_object = self.world.get_object_by_name(name)
if existing_object:
logging.info(f"Reusing existing object: {name}")
else:
Object(name, obj_type, urdf, pose=pose)

logging.info("Environment reset: Apartment, robot, and dynamic objects added.")

def execute_action(self, action, params):
"""
Executes a PyCRAM action based on the provided parameters.
:param action: The action to be executed (e.g., "navigate", "pick_up").
:type action: str
:param params: Parameters required for the action.
:type params: dict
"""
if action == "navigate":
self._navigate(params)
elif action == "pick_up":
self._pick_up(params)
elif action == "place":
self._place(params)
elif action == "open":
self._open(params)
elif action == "close":
self._close(params)
else:
raise ValueError(f"Unknown action: {action}")

def _navigate(self, params):
"""
Navigates the robot to a target location.
:param params: Parameters for the navigate action, including "target_pose".
:type params: dict
"""
target_pose = params.get("target_pose")
if not target_pose:
raise ValueError("Missing parameter: target_pose")
NavigateAction(target_locations=[target_pose]).resolve().perform()

def _pick_up(self, params):
"""
Picks up an object.
:param params: Parameters for the pick-up action, including "object_desig" and "arm".
:type params: dict
"""
object_name = params.get("object_desig")
arm = params.get("arm")
grasps = params.get("grasps", [Grasp.RIGHT])
if not object_name or not arm:
raise ValueError("Missing parameters: object_desig and arm are required")
object_desig = BelieveObject(names=[object_name])
action = PickUpAction(
object_designator_description=object_desig, arms=[arm], grasps=grasps
).resolve()
action.perform()

def _place(self, params):
"""
Places an object at a target location.
:param params: Parameters for the place action, including "object_desig", "target_pose", and "arm".
:type params: dict
"""
object_desig = params.get("object_desig")
target_pose = params.get("target_pose")
arm = params.get("arm")
if not object_desig or not target_pose or not arm:
raise ValueError("Missing parameters: object_desig, target_pose, and arm are required")
PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform()

def _open(self, params):
"""
Opens an object (e.g., a drawer or door).
:param params: Parameters for the open action, including "handle_desig" and "arm".
:type params: dict
"""
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
OpenAction(handle_desig, [arm]).resolve().perform()

def _close(self, params):
"""
Closes an object (e.g., a drawer or door).
:param params: Parameters for the close action, including "handle_desig" and "arm".
:type params: dict
"""
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
CloseAction(handle_desig, [arm]).resolve().perform()

def get_current_state(self):
"""
Fetches the current state of the environment, including the robot pose and objects.
:return: Dictionary containing the robot pose and a list of objects with their poses.
:rtype: dict
"""
robot_pose = self.robot.get_pose() if self.robot else None
objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects]
return {"robot_pose": robot_pose, "objects": objects}

0 comments on commit 79f005f

Please sign in to comment.