几篇比较好的博客:
通过ROS控制真实机械臂(5)—Moveit!真实机械臂(move_group界面)
先上个官网的基础概念:https://moveit.ros.org/documentation/concepts/
什么是moveit?简单来说:moveit就是一个数学工具。现在设想,假如我们没有moveit,我们控制机械臂是一个怎样的流程呢?绝大多数情况下,开发机械臂需要完成的任务如下:
逆运动学建模。机械臂末端的位姿是各个关节叠加形成的。在绝大多数的实际应用中,我们往往是给机械臂的末端指定一个目标点,让它到达(并执行抓取等操作),而不是说我希望关节1运动多少角度,关节2运动多少角度。所以我们需要对机械臂进行逆运动学建模,也就是我们要计算机械臂末端在处于空间某一位姿时,机械臂的各个关节的角度。
路径规划。如果从空间中的一个点到达另一个点,我们应该怎么到达?这就涉及到了路径规划的算法。
控制机械臂进行运动。路径规划会规划出一个路径点集,也就是一组空间位姿,对其进行逆运动学求解就可以得到每个点对应的各个关节的角度,从而控制器就可以根据此角度控制电机运动。
上面就是常规的开发机械臂的基础任务。那么moveit做了什么工作呢?它把机械臂开发过程中,重复的、通用的部分(上述1、2)直接整合在了一起。moveit完成的工作如下:
所以,现在我们使用moveit控制机械臂,只要进行如下几个大家比较熟悉的步骤:
配置机械臂。向moveit的一个插件moveit_setup_assistant提供URDF机器人描述文件,然后根据要求完成机器人配置,步骤可以参考古月居的博客Moveit!配置助手的使用教程。最后会生成一个机器人的功能包。至此,就可以使用如下命令在rviz实现内实现机械臂的可视化路径规划等操作了。
roslaunch <your_package> demo.launch
值得一提的是,现在已经很多很好的机器人公司的产品支持ROS支持moveit的机器人一览,你可以点进去找一个你喜欢的,然后直接去他们的github主页下载上述生成的功能包。
控制机器人运动。这一点也正是接下来要讲的、做的。既然moveit已经帮我们完成了路径规划和求运动学逆解,那么我们就需要根据规划结果去控制机器人。
那我们就需要知道我们应该如何拿到这个规划结果以及规划结果是什么形式。
先看这张movieit的架构图:
可以看到右侧move_group与RobotController(真实的机器人控制器)通过一个JointTrajectoryAction连接。所以可以得知他们的通信模式是ROS Action。那么则需要一对Action Sever和Action Client。
首先,因为我们现在控制真的机器人运动,所以需要到demo.launch中修改如下的参数。将"fake_execution"改为false。
<include file="$(find ur5_moveit_config)/launch/move_group.launch">
<arg name="allow_trajectory_execution" value="true"/>
<arg name="fake_execution" value="true"/>
<arg name="info" value="true"/>
<arg name="debug" value="$(arg debug)"/>
</include>
然后再次运行demo.launch,会发现终端会报这样一个错。
说的是名叫“/follow_joint_trajectory”的Action client没有连接成功。这就对了,因为Action Server都没启动,client肯定会连接失败。这里的Sever端就是需要我们手动编写关于接收规划结果并控制机器人运动的代码。
关于这个Action Server,针对不同的机械臂也有不同的任务点。
先拿取moveit的路径规划结果,通过socket传给fp,fp根据规划结果执行路径。
那么需要做的有两个主任务:
但是,还有一个小任务,moveit需要知道机器人的实时位姿,所以还需要写一对socket:在fp得到joint_states发送给本机,然后在ros内发布。(实现不同的ros master下,接收joint_states)
该文件在/home/hezhou/catkin_ws/src/fp-robotics/fp_driver/src内,写了一个sever,在规划完成即on_goal时,得到规划结果,并将结果内的单位转化为fp的单位(弧度转换为度),再通过socket发给fp本体。
#!/usr/bin/env python
#!coding=utf-8
from __future__ import print_function
import roslib; roslib.load_manifest('ur_driver')
import time, sys, threading, math
import copy
import datetime
import socket, select
import struct
import traceback, code
import optparse
import json
import rospy
import actionlib
from sensor_msgs.msg import JointState
from control_msgs.msg import FollowJointTrajectoryAction
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from geometry_msgs.msg import WrenchStamped
from fp_core_msgs.srv import MoveJoint
import numpy as np
class FPTrajectoryFollower(object):
RATE = 0.02
def __init__(self, name):
self._action_name = name
self.joint_goal_tolerances = [0.05, 0.05, 0.05, 0.05, 0.05, 0.05]
self.server = actionlib.ActionServer("/follow_joint_trajectory",
FollowJointTrajectoryAction,
self.on_goal,auto_start=False)
self.server.start()
def on_goal(self, goal_handle):
# 存放moveit规划完成的路径点
#print(goal_handle.get_goal().trajectory)
joint_points=goal_handle.get_goal().trajectory.points
#print(joint_points)
print (len(joint_points))
joint_dict=[]
last_time=0
for index in range(len(joint_points)):
time_now=float(joint_points[index].time_from_start.secs)+float(joint_points[index].time_from_start.nsecs)/1000000000
#print (index,time_now)
print (index,time_now-last_time)
positions=(np.array(joint_points[index].positions)*180/3.14159).tolist()
velocities=(np.array(joint_points[index].velocities)*180/3.14159).tolist()
#print(index,velocities)
joint_dict.append({'position':positions,'velocity':velocities,'duration':time_now-last_time})
last_time=time_now
#print (joint_dict)
json_string = json.dumps(joint_dict)
#print(json_string)
msg = struct.pack('>I', len(json_string)) + json_string
odom_socket.sendall(msg)
goal_handle.set_accepted()
goal_handle.set_succeeded()
if __name__ == '__main__':
rospy.init_node('fp_server')
# rospy.wait_for_service('/PRob2R/core/move_joint')
server = FPTrajectoryFollower(rospy.get_name())
odom_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#odom_socket.connect(('127.0.0.1',60003))
odom_socket.connect(('192.168.31.139',60003))
print('start')
#odom_socket.send("333")
rospy.spin()
进入fp网页端,192.168.31.139,点击“程序”,新建一个程序并执行。
#!/usr/bin/env python
#!coding=utf-8
"""
2020-08-13
created by hezhou
测试TCP通信
"""
import copy
import datetime
import socket, select
import struct
import json
#message proto
# id | length | data
def recv_msg(sock):
try:
raw_msglen = recvall(sock, 4)
if not raw_msglen:
return None
msglen = struct.unpack('>I', raw_msglen)[0]
#print (msglen)
# Read the message data
return recvall(sock, msglen)
except ConnectionResetError as e:
return None
def recvall(sock, n):
# Helper function to recv n bytes or return None if EOF is hit
data = b''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
#初始化socket,监听8000端口
odom_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
odom_socket.bind(('',60003))
odom_socket.listen(10)
#设置noblock,否则会阻塞在接听,下面while不会一直循环,只有在有数据才进行下一次循环
#fcntl.fcntl(client, fcntl.F_SETFL, os.O_NONBLOCK)
print (11)
# while True:# conn就是客户端链接过来而在服务端为期生成的一个链接实例
conn,addr = odom_socket.accept() #等待链接,多个链接的时候就会出现问题,其实返回了两个值
print(conn,addr)
while True:
try:
#data = conn.recv(1024) #接收数据
data = recv_msg(conn)
print("msg arrive")
joint_points = json.loads(data.decode('utf-8'))
# print(type(joint_points))
print(joint_points)
run_advanced_path(joint_points,move_to_initial_position=True)
except ConnectionResetError as e:
print('关闭了正在占线的链接!')
break
conn.close()
这个脚本没啥好说的,主要介绍一下fp的这个函数,未公开的,但可以使用
自带的calculate_advanced_path不好用,就用一个run_advanced_path即可。
因为fp内部有一个ros系统,它自己发布话题’/PRob2R/joint_states’,所以我们需要写一对socket,让客户端的ros-master位于fp的ip上,并订阅’/PRob2R/joint_states’,然后通过socket把该数据传给服务端,服务端得到信息过后就把该消息又重新发送到本机的ros系统内的"/joint_states"内。
这两段代码位于新建的node文件夹下。实际上的逻辑都是很短的,代码稍长主要是为了遵守socket的通信格式,所以定义了几个函数。这对socket通信部分的代码比较标准,可以当做模板。
server:
#! /usr/bin/env python
# -*- coding=utf-8 -*-
import socket
import time,os,fcntl
import struct
import rospy
from sensor_msgs.msg import JointState
#message proto
# id | length | data
def recv_msg(sock):
try:
# Read message length and unpack it into an integer
raw_id = recvall(sock, 4)
if not raw_id:
return None
id = struct.unpack('>I', raw_id)[0]
print "receive id: ",id
raw_msglen = recvall(sock, 4)
if not raw_msglen:
return None
msglen = struct.unpack('>I', raw_msglen)[0]
# Read the message data
return recvall(sock, msglen)
except Exception ,e:
return None
def recvall(sock, n):
# Helper function to recv n bytes or return None if EOF is hit
data = ''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
##把接受的数据重新打包成ros topic发出去
def msg_construct(data):
list = data.split(',')
joint=JointState()
joint.header.stamp = rospy.Time.now()
joint.header.frame_id = "/base_link"
joint.name =["p_rob_joint1","p_rob_joint2","p_rob_joint3","p_rob_joint4","p_rob_joint5","p_rob_joint6"]
position=[float(list[0]),float(list[1]),float(list[2]),float(list[3]),float(list[4]),float(list[5])]
joint.position=position
return joint
#初始化socket,监听8000端口
odom_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
odom_socket.bind(('',8000))
odom_socket.listen(10)
(client,address) = odom_socket.accept()
rospy.init_node("client_node")
odom_pub = rospy.Publisher("/joint_states",JointState,queue_size=30)
r = rospy.Rate(1000)
#设置noblock,否则会阻塞在接听,下面while不会一直循环,只有在有数据才进行下一次循环
fcntl.fcntl(client, fcntl.F_SETFL, os.O_NONBLOCK)
while not rospy.is_shutdown():
data = recv_msg(client)
if data:
odom_pub.publish(msg_construct(data))
r.sleep()
client:
#! /usr/bin/env python
# -*- coding=utf-8 -*-
import socket
import time,os,fcntl
import struct
import rospy
from sensor_msgs.msg import JointState
#message proto
# id | length | data
def recv_msg(sock):
try:
# Read message length and unpack it into an integer
raw_id = recvall(sock, 4)
if not raw_id:
return None
id = struct.unpack('>I', raw_id)[0]
print "receive id: ",id
raw_msglen = recvall(sock, 4)
if not raw_msglen:
return None
msglen = struct.unpack('>I', raw_msglen)[0]
# Read the message data
return recvall(sock, msglen)
except Exception ,e:
return None
def recvall(sock, n):
# Helper function to recv n bytes or return None if EOF is hit
data = ''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
##把接受的数据重新打包成ros topic发出去
def msg_construct(data):
list = data.split(',')
joint=JointState()
joint.header.stamp = rospy.Time.now()
joint.header.frame_id = "/base_link"
joint.name =["p_rob_joint1","p_rob_joint2","p_rob_joint3","p_rob_joint4","p_rob_joint5","p_rob_joint6"]
position=[float(list[0]),float(list[1]),float(list[2]),float(list[3]),float(list[4]),float(list[5])]
joint.position=position
return joint
#初始化socket,监听8000端口
odom_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
odom_socket.bind(('',8000))
odom_socket.listen(10)
(client,address) = odom_socket.accept()
rospy.init_node("client_node")
odom_pub = rospy.Publisher("/joint_states",JointState,queue_size=30)
r = rospy.Rate(1000)
#设置noblock,否则会阻塞在接听,下面while不会一直循环,只有在有数据才进行下一次循环
fcntl.fcntl(client, fcntl.F_SETFL, os.O_NONBLOCK)
while not rospy.is_shutdown():
data = recv_msg(client)
if data:
odom_pub.publish(msg_construct(data))
r.sleep()
这一部分可以参考雄克机器人的配置PPT非常详细。
ros_controllers.yaml,将默认生成的改成了如下。
controller_list:
- name: ""
action_ns: follow_joint_trajectory
default: True
type: FollowJointTrajectory
joints:
- p_rob_joint1
- p_rob_joint2
- p_rob_joint3
- p_rob_joint4
- p_rob_joint5
- p_rob_joint6
修改move_group.launch
创建了一个新的real_robot.launch用于真实机器人:把demo.launch拷贝过来,稍做修改即可。主要是把fake_execution改为false。
<launch>
<!-- By default, we are not in debug mode -->
<arg name="debug" default="false" />
<arg name="use_gui" default="false" />
<!-- Load the URDF, SRDF and other .yaml configuration files on the param server -->
<include file="$(find fp_moveit_config)/launch/planning_context.launch">
<arg name="load_robot_description" value="true"/>
</include>
<!-- Given the published joint states, publish tf for the robot links -->
<node name="robot_state_publisher" pkg="robot_state_publisher" type="robot_state_publisher" respawn="true" output="screen" />
<!-- Run the main MoveIt! executable without trajectory execution (we do not have controllers configured by default) -->
<include file="$(find fp_moveit_config)/launch/move_group.launch">
<arg name="allow_trajectory_execution" value="true"/>
<arg name="fake_execution" value="false"/>
<arg name="info" value="true"/>
<arg name="debug" value="$(arg debug)"/>
</include>
<!-- Run Rviz and load the default config to see the state of the move_group node -->
<include file="$(find fp_moveit_config)/launch/moveit_rviz.launch">
<arg name="config" value="true"/>
<arg name="debug" value="$(arg debug)"/>
</include>
</launch>