Skip to content

Commit

Permalink
[Abstract World] Cleaned Object constructor, added more internal meth…
Browse files Browse the repository at this point in the history
…ods to Object class.
  • Loading branch information
AbdelrhmanBassiouny committed Jan 30, 2024
1 parent 1aaa11e commit eace428
Showing 1 changed file with 95 additions and 114 deletions.
209 changes: 95 additions & 114 deletions src/pycram/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class World(ABC):
Global reference for the data directories, this is used to search for the URDF files of the robot and the objects.
"""

cach_dir = data_directory[0] + '/cached/'
"""
Global reference for the cache directory, this is used to cache the URDF files of the robot and the objects.
"""

def __init__(self, mode: WorldMode, is_prospection_world: bool, simulation_frequency: float):
"""
Creates a new simulation, the mode decides if the simulation should be a rendered window or just run in the
Expand Down Expand Up @@ -390,26 +395,6 @@ def get_contact_points_between_two_objects(self, obj1: Object, obj2: Object) ->
"""
pass

def get_joint_ranges(self) -> Tuple[List, List, List, List, List]:
"""
Calculates the lower and upper limits, the joint ranges and the joint damping. For a given robot Object.
Fixed joints will be skipped because they don't have limits or ranges.
:return: The lists for the upper and lower limits, joint ranges, rest poses and joint damping
"""
ll, ul, jr, rp, jd = [], [], [], [], []
joint_names = self.get_joint_names(self.robot)
for name in joint_names:
joint_type = self.get_object_joint_type(self.robot, name)
if joint_type != JointType.FIXED:
ll.append(self.get_object_joint_lower_limit(self.robot, name))
ul.append(self.get_object_joint_upper_limit(self.robot, name))
jr.append(ul[-1] - ll[-1])
rp.append(self.get_joint_rest_pose(self.robot, name))
jd.append(self.get_joint_damping(self.robot, name))

return ll, ul, jr, rp, jd

def get_joint_rest_pose(self, obj: Object, joint_name: str) -> float:
"""
Get the rest pose of a joint of an articulated object
Expand Down Expand Up @@ -664,7 +649,7 @@ def exit(self, wait_time_before_exit_in_secs: Optional[float] = None) -> None:
self.reset_robot()
self.join_threads()

def exit_prospection_world_if_exists(self):
def exit_prospection_world_if_exists(self) -> None:
"""
Exits the prospection world if it exists.
"""
Expand All @@ -673,33 +658,33 @@ def exit_prospection_world_if_exists(self):
self.prospection_world.exit()

@abstractmethod
def disconnect_from_physics_server(self):
def disconnect_from_physics_server(self) -> None:
"""
Disconnects the world from the physics server.
"""
pass

def reset_current_world(self):
def reset_current_world(self) -> None:
"""
Resets the current world to None if this is the current world.
"""
if World.current_world == self:
World.current_world = None

def reset_robot(self):
def reset_robot(self) -> None:
"""
Sets the robot class variable to None.
"""
self.set_robot(None)

@abstractmethod
def join_threads(self):
def join_threads(self) -> None:
"""
Join any running threads. Useful for example when exiting the world.
"""
pass

def terminate_world_sync(self):
def terminate_world_sync(self) -> None:
"""
Terminates the world sync thread.
"""
Expand All @@ -718,7 +703,7 @@ def save_state(self) -> int:
self.saved_states.append(state_id)
return state_id

def save_objects_state(self, state_id: int):
def save_objects_state(self, state_id: int) -> None:
"""
Saves the state of all objects in the World according to the given state using the unique state id.
:param state_id: The unique id representing the state.
Expand Down Expand Up @@ -747,23 +732,23 @@ def save_physics_simulator_state(self) -> int:
pass

@abstractmethod
def remove_physics_simulator_state(self, state_id: int):
def remove_physics_simulator_state(self, state_id: int) -> None:
"""
Removes the state of the physics simulator with the given id.
:param state_id: The unique id representing the state.
"""
pass

@abstractmethod
def restore_physics_simulator_state(self, state_id: int):
def restore_physics_simulator_state(self, state_id: int) -> None:
"""
Restores the objects and environment state in the physics simulator according to
the given state using the unique state id.
:param state_id: The unique id representing the state.
"""
pass

def restore_objects_states(self, state_id: int):
def restore_objects_states(self, state_id: int) -> None:
"""
Restores the state of all objects in the World according to the given state using the unique state id.
:param state_id: The unique id representing the state.
Expand Down Expand Up @@ -866,7 +851,7 @@ def reset_world(self, remove_saved_states=True) -> None:
for obj in self.objects:
obj.reset(remove_saved_states)

def remove_saved_states(self):
def remove_saved_states(self) -> None:
"""
Removes all saved states of the World.
"""
Expand Down Expand Up @@ -1134,7 +1119,7 @@ def restore_state(self, state_id: int) -> None:
"""
self.constraint_ids = self.saved_states[state_id].constraint_ids

def get_current_state(self):
def get_current_state(self) -> LinkState:
"""
:return: The current state of this link as a LinkState object.
"""
Expand Down Expand Up @@ -1185,7 +1170,7 @@ def is_root(self) -> bool:
"""
return self.object.get_root_link_id() == self.id

def update_transform(self, transform_time: Optional[rospy.Time] = None):
def update_transform(self, transform_time: Optional[rospy.Time] = None) -> None:
"""
Updates the transformation of this link at the given time.
:param transform_time: The time at which the transformation should be updated.
Expand Down Expand Up @@ -1393,15 +1378,14 @@ def __init__(self, name: str, obj_type: ObjectType, path: str,
self.obj_type: ObjectType = obj_type
self.color: Color = color

self.local_transformer = LocalTransformer()
self.original_pose = self.local_transformer.transform_pose(pose, "map")
self._current_pose = self.original_pose
self._init_local_transformer_and_pose(pose)

self.id, self.path = self._load_object(path, ignore_cached_files)
self._load_object_and_set_id_and_path(path, ignore_cached_files)

self.tf_frame = ("prospection/" if self.world.is_prospection_world else "") + self.name + "_" + str(self.id)

self._init_urdf_object()

if self.urdf_object.name == robot_description.name:
self.world.set_robot_if_not_set(self)

Expand All @@ -1423,7 +1407,16 @@ def __init__(self, name: str, obj_type: ObjectType, path: str,

self.world.objects.append(self)

def _load_object(self, path: str, ignore_cached_files: bool) -> Tuple[int, str]:
def _init_local_transformer_and_pose(self, pose: Pose) -> None:
"""
Initializes the local transformer and the pose of this object at the given pose and transforms it to the map
frame.
"""
self.local_transformer = LocalTransformer()
self.original_pose = self.local_transformer.transform_pose(pose, "map")
self._current_pose = self.original_pose

def _load_object_and_set_id_and_path(self, path, ignore_cached_files: bool) -> None:
"""
Loads an object to the given World with the given position and orientation. The rgba_color will only be
used when an .obj or .stl file is given.
Expand All @@ -1433,9 +1426,7 @@ def _load_object(self, path: str, ignore_cached_files: bool) -> Tuple[int, str]:
This new file will have resolved mesh file paths, meaning there will be no references
to ROS packges instead there will be absolute file paths.
:param path: The path to the source file or the name on the ROS parameter server
:param ignore_cached_files: Whether to ignore files in the cache directory.
:return: The unique id of the object and the path to the file used for spawning
"""
pa = pathlib.Path(path)
extension = pa.suffix
Expand All @@ -1448,16 +1439,13 @@ def _load_object(self, path: str, ignore_cached_files: bool) -> Tuple[int, str]:
if not path:
raise FileNotFoundError(
f"File {pa.name} could not be found in the resource directory {World.data_directory}")
# rospack = rospkg.RosPack()
# cach_dir = rospack.get_path('pycram') + '/resources/cached/'
cach_dir = World.data_directory[0] + '/cached/'
if not pathlib.Path(cach_dir).exists():
os.mkdir(cach_dir)
if not pathlib.Path(World.cach_dir).exists():
os.mkdir(World.cach_dir)

# if file is not yet cached corrcet the urdf and save if in the cache directory
if not _is_cached(path, self.name, cach_dir) or ignore_cached_files:
if not _is_cached(path) or ignore_cached_files:
if extension == ".obj" or extension == ".stl":
path = _generate_urdf_file(self.name, path, self.color, cach_dir)
path = self._generate_urdf_file(path)
elif extension == ".urdf":
with open(path, mode="r") as f:
urdf_string = fix_missing_inertial(f.read())
Expand All @@ -1468,34 +1456,70 @@ def _load_object(self, path: str, ignore_cached_files: bool) -> Tuple[int, str]:
except rospkg.ResourceNotFound as e:
rospy.logerr(f"Could not find resource package linked in this URDF")
raise e
path = cach_dir + pa.name
path = World.cach_dir + pa.name
with open(path, mode="w") as f:
f.write(urdf_string)
else: # Using the urdf from the parameter server
urdf_string = rospy.get_param(path)
path = cach_dir + self.name + ".urdf"
path = World.cach_dir + self.name + ".urdf"
with open(path, mode="w") as f:
f.write(_correct_urdf_string(urdf_string))
# save correct path in case the file is already in the cache directory
elif extension == ".obj" or extension == ".stl":
path = cach_dir + pa.stem + ".urdf"
path = World.cach_dir + pa.stem + ".urdf"
elif extension == ".urdf":
path = cach_dir + pa.name
path = World.cach_dir + pa.name
else:
path = cach_dir + self.name + ".urdf"
path = World.cach_dir + self.name + ".urdf"

try:
obj_id = self.world.load_urdf_and_get_object_id(path, Pose(self.get_position_as_list(),
self.get_orientation_as_list()))
return obj_id, path
self.id = obj_id
self.path = path
except Exception as e:
logging.error(
"The File could not be loaded. Please note that the path has to be either a URDF, stl or obj file or"
" the name of an URDF string on the parameter server.")
os.remove(path)
raise (e)

def _init_urdf_object(self):
def _generate_urdf_file(self, path) -> str:
"""
Generates an URDf file with the given .obj or .stl file as mesh. In addition, the given rgba_color will be
used to crate a material tag in the URDF. The resulting file will then be saved in the cach_dir path with
the name as filename.
:return: The absolute path of the created file
"""
urdf_template = '<?xml version="0.0" ?> \n \
<robot name="~a_object"> \n \
<link name="~a_main"> \n \
<visual> \n \
<geometry>\n \
<mesh filename="~b" scale="1 1 1"/> \n \
</geometry>\n \
<material name="white">\n \
<rgba_color rgba="~c"/>\n \
</material>\n \
</visual> \n \
<collision> \n \
<geometry>\n \
<mesh filename="~b" scale="1 1 1"/>\n \
</geometry>\n \
</collision>\n \
</link> \n \
</robot>'
urdf_template = fix_missing_inertial(urdf_template)
rgb = " ".join(list(map(str, self.color.get_rgba())))
pathlib_obj = pathlib.Path(path)
path = str(pathlib_obj.resolve())
content = urdf_template.replace("~a", self.name).replace("~b", path).replace("~c", rgb)
with open(World.cach_dir + pathlib_obj.stem + ".urdf", "w", encoding="utf-8") as file:
file.write(content)
return World.cach_dir + pathlib_obj.stem + ".urdf"

def _init_urdf_object(self) -> None:
"""
Initializes the URDF object from the URDF file.
"""
Expand All @@ -1522,7 +1546,7 @@ def _init_link_name_and_id_map(self) -> None:
self.link_name_to_id[self.urdf_object.get_root()] = -1
self.link_id_to_name[-1] = self.urdf_object.get_root()

def _init_links(self):
def _init_links(self) -> None:
"""
Initializes the link objects from the URDF file and creates a dictionary which maps the link names to the
corresponding link objects.
Expand Down Expand Up @@ -2165,29 +2189,6 @@ def __del__(self):
self.remove_constraint_if_exists()


def _is_cached(path: str, name: str, cach_dir: str) -> bool:
"""
Checks if the file in the given path is already cached or if
there is already a cached file with the given name, this is the case if a .stl, .obj file or a description from
the parameter server is used.
:param path: The path given by the user to the source file.
:param name: The name for this object.
:param cach_dir: The absolute path the cach directory in the pycram package.
:return: True if there already exists a chached file, False in any other case.
"""
file_name = pathlib.Path(path).name
full_path = pathlib.Path(cach_dir + file_name)
if full_path.exists():
return True
# Returns filename without the filetype, e.g. returns "test" for "test.txt"
file_stem = pathlib.Path(path).stem
full_path = pathlib.Path(cach_dir + file_stem + ".urdf")
if full_path.exists():
return True
return False


def _correct_urdf_string(urdf_string: str) -> str:
"""
Changes paths for files in the URDF from ROS paths to paths in the file system. Since World (PyBullet legac)
Expand Down Expand Up @@ -2273,44 +2274,24 @@ def fix_link_attributes(urdf_string: str) -> str:
return xml.etree.ElementTree.tostring(tree.getroot(), encoding='unicode')


def _generate_urdf_file(name: str, path: str, color: Color, cach_dir: str) -> str:
def _is_cached(path) -> bool:
"""
Generates an URDf file with the given .obj or .stl file as mesh. In addition, the given rgba_color will be
used to crate a material tag in the URDF. The resulting file will then be saved in the cach_dir path with the name
as filename.
:param name: The name of the object
:param path: The path to the .obj or .stl file
:param color: The rgba_color which should be used for the material tag
:param cach_dir The absolute file path to the cach directory in the pycram package
:return: The absolute path of the created file
Checks if the file in the given path is already cached or if
there is already a cached file with the given name, this is the case if a .stl, .obj file or a description from
the parameter server is used.
:return: True if there already exists a cached file, False in any other case.
"""
urdf_template = '<?xml version="0.0" ?> \n \
<robot name="~a_object"> \n \
<link name="~a_main"> \n \
<visual> \n \
<geometry>\n \
<mesh filename="~b" scale="1 1 1"/> \n \
</geometry>\n \
<material name="white">\n \
<rgba_color rgba="~c"/>\n \
</material>\n \
</visual> \n \
<collision> \n \
<geometry>\n \
<mesh filename="~b" scale="1 1 1"/>\n \
</geometry>\n \
</collision>\n \
</link> \n \
</robot>'
urdf_template = fix_missing_inertial(urdf_template)
rgb = " ".join(list(map(str, color.get_rgba())))
pathlib_obj = pathlib.Path(path)
path = str(pathlib_obj.resolve())
content = urdf_template.replace("~a", name).replace("~b", path).replace("~c", rgb)
with open(cach_dir + pathlib_obj.stem + ".urdf", "w", encoding="utf-8") as file:
file.write(content)
return cach_dir + pathlib_obj.stem + ".urdf"
file_name = pathlib.Path(path).name
full_path = pathlib.Path(World.cach_dir + file_name)
if full_path.exists():
return True
# Returns filename without the filetype, e.g. returns "test" for "test.txt"
file_stem = pathlib.Path(path).stem
full_path = pathlib.Path(World.cach_dir + file_stem + ".urdf")
if full_path.exists():
return True
return False


def _world_and_id(world: World) -> Tuple[World, int]:
Expand Down

0 comments on commit eace428

Please sign in to comment.