Fundamentals
8 minute read
Fundamentals
With three fundamental concepts of motorcortex python clients, you can create powerful client applications that interact with Motorcortex applications.
These examples are all done via motorcortex-python.
pip install motorcortex-python
Connecting to Motorcortex Applications
Motorcortex-python applications connect to Motorcortex applications using WebSockets. After establishing a connection the client application can request the parameter tree from the server and get/set parameter values or subscribe to data streams. This is the bases for all Motorcortex-python applications. They can be used to control robots, log data, run tests etc. and is as flexible as the Motorcortex platform itself.
The following code example shows how to connect to a motorcortex application. Make sure to change the IP_ADDRESS and PATH_TO_CERTIFICATE, LOGIN and PASSWORD variables to match your setup.
(In previous versions of the motorcortex-python library, the IP_ADDRESS required the port numbers, but this is no longer necessary as the library automatically uses the correct ports.)
# import the motorcortex library
import motorcortex
IP_ADDRESS = "wss://localhost"
PATH_TO_CERTIFICATE = "mcx.cert.crt"
LOGIN = "root"
PASSWORD = "secret"
# Create a parameter tree object
parameter_tree = motorcortex.ParameterTree()
# Open request and subscribe connection
try:
req, sub = motorcortex.connect(IP_ADDRESS, motorcortex.MessageTypes(), parameter_tree,
certificate=PATH_TO_CERTIFICATE, timeout_ms=300,
login=LOGIN, password=PASSWORD)
tree = parameter_tree.getParameterTree()
print(f"Parameters: {tree}")
except RuntimeError as err:
print(err)
The motorcortex.connect() function returns two connection objects:
req- Used for request/reply communication (getting/setting parameters, one-time queries)sub- Used for subscribe/notify communication (continuous data streams at specified rates) These will be used in the upcoming Examples
Note
Motorcortex supports WebSocket Security (wss), which requires a certificate (public key) on the client, for it to be able to connect. In general every Motorcortex server application should have its own certificate, such that access can be restricted to those clients that have a copy of the correct certificate installed. It is not recommended to share the same certificate between multiple servers or machines/robots. you can download the certificate at the [Install a Browser Certificate]({{< ref “/docs/getting-started/connect-your-pc/install-a-browser-certificate/” >}}) guide.
After a successful connection, the parameter tree will be printed to the console:
...
permissions: 432
param_type: INPUT
group_id: ADMINISTRATOR
unit: unit_undefined
path: "root/logger/logMarkerIn"
, id: 55
data_type: 514
data_size: 1
number_of_elements: 256
flags: 0
permissions: 432
param_type: OUTPUT
group_id: ADMINISTRATOR
unit: unit_undefined
path: "root/logger/logOut"
]
Recovering a lost connection
The Motorcortex library handles temporary lost connections automatically. The client does not have to handle this. However, if the server is restarted, the client cannot automatically reconnect because the server has created a new session-id. The client may handle this case by discarding the old connection and creating a new connection and log-in again.
Close connection
When the client application is done communicating with the server, it should close the connections properly:
# close request and subscribe connections
req.close()
sub.close()
Requesting Data
With the connection created in the previous example, you can now request data from the application. For example, the position of the end-effector of the robot can be requested and printed to the console every second. You can test it by adding the following code after the connection code in the previous example:
import time
while True:
# get the parameter value
position = req.getParameter('root/ManipulatorControl/manipulatorToolPoseActual').get()
if position is not None:
print(f"EE Position: {position.value}")
else:
print("Failed to get parameter value")
# wait for 1 second before requesting again
time.sleep(1)
Move the robot around and you should see the end-effector position updating in the console output.
By chaning the parameter path in the req.getParameter() function you can request any parameter that is available in the parameter tree of the connected application.
Subscribing to Data
Instead of requesting data in a loop, you can also subscribe to a parameter and receive updates whenever the value changes. This is more efficient for parameters that change frequently. You can subscribe to the same end-effector position parameter as follows:
# define a callback function that will be called when new data is received
def print_value(result):
print(f"Received new positions: {result[0].value}")
# Subscribe to parameter changes
sub_position = sub.subscribe('root/ManipulatorControl/manipulatorToolPoseActual', "Group1", frq_divider=100)
# get reply from the server
is_subscribed = sub_position.get()
# print subscription status and layout
if (is_subscribed is not None) and (is_subscribed.status == motorcortex.OK):
print(f"Subscription successful, layout: {sub_position.layout()}")
else:
print(f"Subscription failed")
sub.close()
exit()
# set the callback function that handles the received data
# Note that this is a non-blocking call, starting a new thread that handles
# the messages. You should keep the application alive for a s long as you need to
# receive the messages
sub_position.notify(print_value)
time.sleep(60) # keep the application running for 60 seconds to receive updates
sub.close() # close the subscription when done
With this code, the print_value function will be called whenever the value of the end-effector position gets updated through the subscription. This does not mean the value has changed necessarily, but that a new value is available. You can move the robot around to see the updates in the console output.
As the notify function runs in a separate thread, you need to keep the main application running (here for 60 seconds) to continue receiving updates. You also do not want to create too many subscriptions with each one parameter, as this will create new threads for each subscription. Instead, you can subscribe to multiple parameters in a single subscription by passing a list of parameter paths to the sub.subscribe() function. As they are grouped together the data will also be synchronized. Here is an example of subscribing to both the end-effector position and the joint positions of the robot in a single subscription:
paths = ['root/ManipulatorControl/manipulatorToolPoseActual',
'root/ManipulatorControl/jointPositionsActual']
sub_multi = sub.subscribe(paths, group_alias="Positions", frq_divider=1000)
is_subscribed = sub_multi.get()
# print subscription status and layout
if (is_subscribed is not None) and (is_subscribed.status == motorcortex.OK):
print(f"Subscription successful, layout: {sub_multi.layout()}")
else:
print(f"Subscription failed")
sub.close()
exit()
# set the callback function that handles the received data
def print_multi_value(result):
print(f"Received new EE position: {result[0].value}, \nJoint positions: {result[1].value}\n\n")
sub_multi.notify(print_multi_value)
time.sleep(60) # keep the application running for 60 seconds to receive updates
sub.close() # close the subscription when done
The client can create multiple subscriptions for different groups, for instance at different data-rates (by setting the “divider” parameter differently for different groups).
The group name is shared between clients and a group is published by the server once to all clients that have subscription to the group with that name. This way the communication can easily scale across large numbers of clients.
If a group name already exists on the server and another client requests a subscription to a set of signals with the same group name, the server adds any requested signals that are not already in the group to the group. The other clients using the same group name will then also receive the additional signals.
Setting parameter values
Now that we can listen to Data from the Motorcortex Application, we can try to send some Data to it. This is the way of interacting with the Motorcortex Application and controlling Robots or Machines.
Firstly, let us start up the robot!
reply = req.setParameter("root/Logic/stateCommand", 2).get()
print(f"Open: {reply}")
This code will send a command to the Logic Module of the Motorcortex Application to start up the Robot. The value 2 corresponds to the STARTUP command of the stateCommand parameter.
Using the same technique you can also open and close the gripper of the robot:
import time
for i in range(5):
reply = req.setParameter("root/UserParameters/IO/gripper", 0).get()
time.sleep(1)
reply = req.setParameter("root/UserParameters/IO/gripper", 1).get()
time.sleep(1)
print("Finished gripper open/close")
This code will open and close the gripper 5 times with a delay of 1 second in between.
Setting Array Values
Instead of setting single scalar values, you can also set array values. For example, to set the joint positions of the robot, you can use the following code:
reply = req.setParameter("root/Logic/modeCommand", 3).get()
print(f"Set to manual joint mode: {reply}")
time.sleep(2)
reply = req.setParameter("root/ManipulatorControl/hostInJointVelocity", [0.1, 0.3, 0.0, 0.0, 0.0, 0.3]).get()
print(f"Set joint velocity: {reply}")
You will notice that the robot turns on manual joint mode and starts moving the joints with the specified velocities. However, it only briefly moves. This is because the command is sent to root/ManipulatorControl/hostInJointVelocity which also has a watchdog running in the Motorcortex Application. If no new command is received within a certain time, the command is reset to zero for safety reasons.
The entire array does not have to be set all at once. You can also set individual elements of the array. For example, to set only the third and fourth joint velocity, you can use:
reply = req.setParameter("root/Logic/modeCommand", 3).get()
print(f"Set to manual joint mode: {reply}")
time.sleep(2)
reply = req.setParameter("root/ManipulatorControl/hostInJointVelocity", [0.1, 0.3], offset=2, length=2).get()
print(f"Set joint velocity: {reply}")
Overwriting parameters
During debugging and testing, you might want to overwrite certain parameters in the Motorcortex Application. This can be done using the overwriteParameter function. For example, to overwrite the root/ManipulatorControl/hostInJointVelocityGain parameter. You can then test out a new gain value without changing the configuration in the Motorcortex Application.
reply = req.overwriteParameter("root/ManipulatorControl/hostInJointVelocityGain", 0.0, force_activate=True).get()
print(f"Set joint position: {reply}")
time.sleep(5)
reply = req.releaseParameter("root/ManipulatorControl/hostInJointVelocityGain").get()
In the desk tool, you will see that the parameter is highlighted in yellow, indicating that it is being overwritten by a client application. After 5 seconds, the parameter is released and goes back to its original value.
Note
This method of setting values is useful during the debug and installation process, it is not recommended to use this method during normal operation.