Source code for director


"""
    A Director ...
"""
from os import listdir
from os.path import isfile, join

import rospkg
import rospy
import actionlib

import xml.etree.ElementTree as ET

from control_msgs.msg import FollowJointTrajectoryAction
from mh5_director.msg import RunScript

# from mh5_director.msg import RunScriptAction, RunScriptResult, RunScriptFeedback
from portfolio import Portfolio

[docs]class Director: """The Director class Reads script definitions from YAML files and can execute them by passing the pose information to the ``dynamixel_control/follow_joint_trajectory`` action server. Listens to the ``director/run`` topic for commands to execute a script. """ def __init__(self, portfolio_path=None): """Initializes the director. If the path is not provided in the call, the method uses the ``portfolio`` directory in the ``mh5_director`` package. :param portfolio_path: the path to use for loading script definctions , defaults to None :type portfolio_path: string, optional """ if not portfolio_path: self.portfolio_path = '/portfolio' else: self.portfolio_path = portfolio_path self.portfolios = {} # self.joint_controllers = {} # self.clients = {} self.run_server = None self.action_client = None
[docs] def load_scripts(self): """Loads XML definitions from the param server and stores them in the ``portfolios`` attribute. """ root = ET.fromstring(rospy.get_param(self.portfolio_path)) for p in root: if p.tag != 'portfolio': raise ValueError(f'>>> unexpected {p.tag} tag; only <portfolio> tag should be used') if 'name' not in p.attrib: raise ValueError('[mh5_director] missing name attribute in portfolio definition') name = p.attrib['name'] rospy.loginfo(f'[mh5_director] loading portfolio {name}') portfolio = Portfolio.from_xml(p) if (portfolio): self.portfolios[portfolio.name] = portfolio for script in portfolio.scripts: rospy.loginfo(f"[mh5_director] ... script {script} available in portfolio {portfolio.name}")
[docs] def setup_services(self): """Starts the subscriptions. Director subscribes to: - ``director/run`` - used to trigger the execution of a script - ... """ self.run_server = rospy.Subscriber('director/run', RunScript, self.run_script_callback) rospy.loginfo('[mh5_director] director/run waiting for commands...')
[docs] def setup_action_client(self): """Sets up the subscription to the ``dynamixel_control/follow_joint_trajectory`` action server. The function will wait for the action server to become available. """ self.action_client = actionlib.SimpleActionClient('dynamixel_control/follow_joint_trajectory', FollowJointTrajectoryAction) rospy.loginfo('[mh5_director] wating for action server: dynamixel_control/follow_joint_trajectory') self.action_client.wait_for_server() rospy.loginfo('[mh5_director]...action server available')
[docs] def run_script_callback(self, msg): """Callback for ``director/run`` The request script should be in the form: <portfolio.script>. Will log errors if the requested portfolio or script in that portfolio doesn't exist. The information in the script is converted into a JointTrajectoryGoal and passed to the action server. If the ``feedback`` attribute in the message is True, the script_feedback_callback() will be also submitted to the send_goal() method of the action client. If the ``wait`` attribute in the message is True the method will wait for the action server to finish before completing. :param msg: message received :type msg: RunScript """ combo_name = msg.script port_name, scr_name = combo_name.split('.')[:2] if port_name not in self.portfolios: rospy.logerr(f'[mh5_director] portfolio {port_name} does not exist') return if scr_name not in self.portfolios[port_name].scripts: rospy.logerr(f'[mh5_director] script {scr_name} does not exist in portfolio {port_name}') return rospy.loginfo(f'[mh5_director] running script: {scr_name} in portfolio {port_name}') goal = self.portfolios[port_name].to_joint_trajectory_goal(scr_name, msg.playback_speed) if msg.feedback: self.action_client.send_goal(goal, feedback_cb=self.script_feedback_callback) else: self.action_client.send_goal(goal) if msg.wait: self.action_client.wait_for_result() rospy.loginfo(f'[mh5_director] script {port_name}.{scr_name} completed')
# print some results # ...
[docs] def script_feedback_callback(self, feedback): """Provides feedback while running the script. :param feedback: the feedback provided by the action server :type feedback: FollowJointTrajectoryFeedback """ num_joints = len(feedback.joint_names) # rospy.loginfo(f'[mh5_director] {feedback.actual.time_from_start.to_sec()}') rospy.loginfo(f'[director] avg error: {sum(feedback.error.positions)/num_joints:.2f}, max error: {max(feedback.error.positions):.2f}')
# detail feedback