Examples

This chapter shows some motorcortex-python examples.

Motorcortex-python examples

Using motorcortex-python a user can create:

  • Robot control scripts
  • Data acquisition, analysis, and logging tools
  • Automatic test scripts/reports

Connecting to Motorcortex and logging in

Motorcortex now supports WebSocket Secure (wss), which requires a certificate (public key) on the client before it can connect. In general, every Motorcortex server application should have its own certificate, so that access is restricted to clients that have a copy of the correct certificate installed. Sharing the same certificate between multiple servers or machines/robots is not recommended.

Install the certificate that matches your server certificate (in the example below it is “mcx.cert.crt”) on your client. This certificate must be loaded when connecting.

The following code example shows how to connect and log in to a Motorcortex application:

# import the motorcortex library
import motorcortex
# Create a parameter tree object
parameter_tree = motorcortex.ParameterTree()
# Open request and subscribe connection
try:
    req, sub = motorcortex.connect("wss://localhost:5568:5567",
                                   motorcortex.MessageTypes(), parameter_tree,
                                   certificate="mcx.cert.crt", timeout_ms=1000,
                                   login="root", password="secret")
    tree = parameter_tree.getParameterTree()
    print(f"Parameters: {tree}")
except RuntimeError as err:
    print(err)
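
The connection URL in the example carries two ports after the host name. The sketch below builds such a URL and checks that the certificate file is present before connecting. It assumes that the first port (5568) is the request port and the second (5567) the subscribe port; `build_url` and `check_certificate` are hypothetical helper names and not part of motorcortex-python.

```python
from pathlib import Path


def build_url(host, req_port=5568, sub_port=5567):
    """Build a 'wss://host:req_port:sub_port' URL (the port order is an assumption)."""
    return f"wss://{host}:{req_port}:{sub_port}"


def check_certificate(path):
    """Return the certificate path as a string, or raise if the file is missing."""
    cert = Path(path)
    if not cert.is_file():
        raise FileNotFoundError(f"Certificate not found: {cert}")
    return str(cert)


print(build_url("localhost"))
```

The results could then be passed as the URL and the `certificate=` argument of `motorcortex.connect(...)`, so that a missing certificate file is reported before any connection attempt is made.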

Getting the parameter tree

The first thing to do after establishing a connection with the motorcortex server is to get a list of all the available parameters.

# Requesting the parameter tree
param_tree_reply = req.getParameterTree()
param_tree_reply_msg = param_tree_reply.get(10)
if param_tree_reply_msg:
    print("Parameter tree received")
    # load the reply into the previously created (still empty) parameter tree
    parameter_tree.load(param_tree_reply_msg)
else:
    print("Failed to receive parameter tree")

Alternatively, use a more JavaScript-like then/catch style:

# Alternative method for requesting a parameter tree (JavaScript-like then/catch)
param_tree_reply = req.getParameterTree()
param_tree_reply_msg = param_tree_reply.then(
    lambda x: print("Parameter tree received")).catch(
    lambda g: print("Failed to receive parameter tree")).get()
# load parameter tree into the previously initialized (but empty) variable
parameter_tree.load(param_tree_reply_msg)

Request/Reply

To get a value for a certain parameter from the server use:

get_param_reply_msg = req.getParameter('path/to/variable').get()
print("\nget_param_reply_msg:")
print(get_param_reply_msg)

The value is stored in get_param_reply_msg.value which is always a tuple of values, even if the value is a scalar (which can be accessed via get_param_reply_msg.value[0]). To set a scalar value use:

set_param_reply_msg = req.setParameter('path/to/variable', True).get()
print("\nset_param_reply_msg:")
print(set_param_reply_msg)
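
Because the `value` field of a get reply is always a tuple, reading code tends to repeat the `[0]` indexing. A small helper can hide this. The sketch below is an illustration only: `reply_value` is a hypothetical name, and the reply messages are replaced by stand-in objects so that it runs without a server.

```python
from types import SimpleNamespace


def reply_value(reply_msg):
    """Return a scalar for single-element replies, otherwise a list of values."""
    values = list(reply_msg.value)
    return values[0] if len(values) == 1 else values


# Stand-ins for reply messages, so the sketch runs without a server.
scalar_reply = SimpleNamespace(value=(True,))
array_reply = SimpleNamespace(value=(1.0, 2.0, 3.0))

print(reply_value(scalar_reply))
print(reply_value(array_reply))
```

Note that this helper also unwraps an array parameter that happens to have exactly one element, so it is only suitable when that ambiguity does not matter.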

To set an array of values use:

req.setParameter('path/to/array', [1, 2, 3, 4]).get()

To set a number of parameters at once:

paramlist_to_set = [
    {'path': 'root/Control/dummyBool', 'value': False},
    {'path': 'root/Control/dummyDoubleArray6', 'value': [4, 3, 2, 1]},
]
req.setParameterList(paramlist_to_set).get()
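
When many parameters are set at once, it can be easier to keep them in a plain dictionary and convert it to the list-of-dictionaries form just before the call. A minimal sketch, in which `to_param_list` is a hypothetical helper and not part of the library:

```python
def to_param_list(values_by_path):
    """Convert a {path: value} dict into a list of {'path': ..., 'value': ...} dicts."""
    return [{'path': path, 'value': value}
            for path, value in values_by_path.items()]


paramlist_to_set = to_param_list({
    'root/Control/dummyBool': False,
    'root/Control/dummyDoubleArray6': [4, 3, 2, 1],
})
print(paramlist_to_set)
```

The resulting list has the same shape as the one passed to `req.setParameterList(...)` in the example above.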

Subscribe to a datastream

To receive a continuous stream of values for a set of parameters, a subscription can be created. The client application can bundle the desired parameters in a group. The server sends the complete group in a single packet, so all signals in the group are synchronized.

The client can create multiple subscriptions for different groups, for instance at different data-rates (by setting the “divider” parameter differently for different groups).
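
As a rough guide for choosing a divider, the sketch below assumes that a divider of n means the server publishes every n-th sample of the publishing task, so that the received rate is the publisher rate divided by n. The 1000 Hz publisher rate is a made-up figure for illustration, and both function names are hypothetical.

```python
def published_rate_hz(publisher_rate_hz, divider):
    """Rate at which a group arrives, assuming every divider-th sample is sent."""
    if divider < 1:
        raise ValueError("divider must be at least 1")
    return publisher_rate_hz / divider


def divider_for_rate(publisher_rate_hz, desired_rate_hz):
    """Nearest divider for a desired rate; never below 1."""
    return max(1, round(publisher_rate_hz / desired_rate_hz))


# Hypothetical publisher running at 1000 Hz.
print(published_rate_hz(1000, 100))
print(divider_for_rate(1000, 10))
```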

The group name is shared between clients, and the server publishes a group once to all clients that have a subscription to the group with that name. This way the communication scales easily to large numbers of clients.

If a group name already exists on the server and another client requests a subscription to a set of signals with the same group name, the server adds any requested signals that are not already in the group to the group. The other clients using the same group name will then also receive the additional signals.
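
The merging behaviour described above can be pictured with a toy model. This is not the server implementation; `GroupRegistry` is a hypothetical class, and the ordering of signals within a group is an assumption made for the illustration.

```python
class GroupRegistry:
    """Toy model of server-side group bookkeeping (not the real server code)."""

    def __init__(self):
        self._groups = {}  # group name -> ordered list of signal paths

    def subscribe(self, group_name, paths):
        """Add paths not yet in the group and return the group's full signal list."""
        signals = self._groups.setdefault(group_name, [])
        for path in paths:
            if path not in signals:
                signals.append(path)
        return list(signals)


registry = GroupRegistry()
client_a = registry.subscribe('group1', ['root/a', 'root/b'])
client_b = registry.subscribe('group1', ['root/b', 'root/c'])
print(client_a)
print(client_b)
```

In this model the second subscription enlarges `group1`, which suggests using distinct group names for clients that are not meant to share a set of signals.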

The motorcortex-python library makes it convenient to add subscriptions and handle the processing of the received data. Every subscription has a callback function associated with it, where the data can be processed and stored. Creating a subscription also creates a new sub-process that handles the reception of data asynchronously to the main application.

Below is an example of how to create and handle a subscription.

import motorcortex
import time


# Define the callback function that will be called whenever a message is received
def message_received(parameters):
    for param in parameters:
        timestamp = param.timestamp.sec + param.timestamp.nsec * 1e-9
        value = param.value
        # print the timestamp and value; the f-string converts the value to a
        # string, so we do not need to check all types before printing it
        print(f"Notify: {timestamp}, {value}")


def main():
    parameter_tree = motorcortex.ParameterTree()
    # Open request and subscribe connection
    try:
        req, sub = motorcortex.connect("wss://192.168.178.98:5568:5567", 
                                       motorcortex.MessageTypes(), parameter_tree,
                                       certificate="mcx.cert.crt", timeout_ms=1000,
                                       login="root", password="secret")
    except RuntimeError as err:
        print(err)
        exit()

    paths = ['root/Control_task/actual_cycle_max']
    # define the frequency divider that tells the server to publish only every
    # n-th sample. This depends on the update rate of the publisher.
    divider = 100
    # subscribe to the paths as group 'group1'
    subscription = sub.subscribe(paths, 'group1', divider)
    # wait for the reply from the server
    is_subscribed = subscription.get()
    # print subscription status and layout
    if (is_subscribed is not None) and (is_subscribed.status == motorcortex.OK):
        print(f"Subscription successful, layout: {subscription.layout()}")
    else:
        print(f"Subscription failed, do your paths exist? \npaths: {paths}")
        sub.close()
        req.close()
        exit()

    # set the callback function that handles the received data
    # Note that this is a non-blocking call, starting a new thread that handles
    # the messages. You should keep the application alive for as long as you need to
    # receive the messages
    subscription.notify(message_received)

    # polling the subscription: read the latest values once per second
    for i in range(100):
        value = subscription.read()
        if value:
            print(f"Polling, timestamp: {value[0].timestamp} value: {value[0].value}")
        time.sleep(1)

    # close the connections when done
    sub.close()
    req.close()


if __name__ == "__main__":
    main()
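
Since the callback runs asynchronously to the main application, it is often convenient to let it only store the data and do the processing elsewhere. The sketch below uses a thread-safe queue for that. It assumes that the received parameters have the `timestamp.sec`, `timestamp.nsec`, and `value` fields used in the example above; `store_sample` is a hypothetical callback name, and the received message is replaced by a stand-in object so that the sketch runs without a server.

```python
import queue
import threading
from types import SimpleNamespace

samples = queue.Queue()


def store_sample(parameters):
    """Callback: convert each parameter to (timestamp, value) and enqueue it."""
    for param in parameters:
        timestamp = param.timestamp.sec + param.timestamp.nsec * 1e-9
        samples.put((timestamp, tuple(param.value)))


# Stand-in for one received message, so the sketch runs without a server.
fake_message = [SimpleNamespace(
    timestamp=SimpleNamespace(sec=10, nsec=500_000_000),
    value=(42.0,))]

# Call the callback from another thread, mimicking asynchronous delivery.
worker = threading.Thread(target=store_sample, args=(fake_message,))
worker.start()
worker.join()

# The main application drains the queue at its own pace.
timestamp, value = samples.get_nowait()
print(timestamp, value)
```

In a real application, `store_sample` would be passed to `subscription.notify(...)` in place of `message_received`, and the main loop would read from `samples`.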

Recovering a lost connection

The Motorcortex library handles temporarily lost connections automatically; the client does not have to handle this. However, if the server is restarted, the client cannot reconnect automatically because the server has created a new session ID. The client can handle this case by discarding the old connection, creating a new connection, and logging in again.
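
One way to create the new connection is a simple retry loop. The sketch below is an assumption-laden illustration: `connect_with_retry` is a hypothetical helper, `connect_fn` stands for a user-written function that wraps `motorcortex.connect(...)` with the desired arguments, and failures are assumed to surface as `RuntimeError`, as in the connection example earlier. How the application detects that the session is gone is not covered here.

```python
import time


def connect_with_retry(connect_fn, attempts=5, delay_s=1.0, sleep=time.sleep):
    """Call connect_fn until it succeeds or the attempts run out."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect_fn()
        except RuntimeError as err:
            last_error = err
            print(f"Connection attempt {attempt} failed: {err}")
            if attempt < attempts:
                sleep(delay_s)
    raise RuntimeError(f"Could not connect after {attempts} attempts") from last_error


# Stand-in for a function wrapping motorcortex.connect(...):
# it fails twice and then succeeds.
calls = {'count': 0}


def fake_connect():
    calls['count'] += 1
    if calls['count'] < 3:
        raise RuntimeError("server not reachable")
    return "req", "sub"


req, sub = connect_with_retry(fake_connect, delay_s=0.0)
print(req, sub, calls['count'])
```

Before calling such a helper, the old `req` and `sub` objects should be closed so that the stale connection is discarded.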

Closing a connection

To close a connection use:

req.close()
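
To make sure connections are closed even when an exception occurs, the `close()` calls can be wrapped in a context manager. A minimal sketch, assuming only that the connection objects offer a `close()` method as used above; `closing_all` and `FakeConnection` are hypothetical names, the latter standing in for real `req` and `sub` objects.

```python
from contextlib import contextmanager


@contextmanager
def closing_all(*connections):
    """Yield the connections and call close() on each one on exit, in reverse order."""
    try:
        yield connections
    finally:
        for connection in reversed(connections):
            connection.close()


class FakeConnection:
    """Stand-in for a req or sub object; only records that close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake_req, fake_sub = FakeConnection(), FakeConnection()
with closing_all(fake_req, fake_sub) as (req, sub):
    pass  # use req and sub here
print(fake_req.closed, fake_sub.closed)
```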

This is the end of the Python examples. Make sure to check out the API or Tools for creating your own Motorcortex-Python application.