MCX-Python API

motorcortex

statusToStr

statusToStr(motorcortex_msg, code)

Converts status codes to a readable message.

Arguments:

  • motorcortex_msg(Module) - reference to a motorcortex module
  • code(int) - status code

Returns:

  • str - status message

Examples:

login_reply = req.login("admin", "iddqd")
login_reply_msg = login_reply.get()
if login_reply_msg.status != motorcortex_msg.OK:
    print(motorcortex.statusToStr(motorcortex_msg, login_reply_msg.status))
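
The status check shown above tends to repeat after every request, so it can be wrapped in a small helper. The following is a minimal sketch: ensure_ok is a hypothetical name, not part of the package, and it assumes only that the reply message has a status field and that the message module defines OK, as in the example.

```python
def ensure_ok(motorcortex, motorcortex_msg, reply_msg):
    """Return reply_msg if its status is OK, otherwise raise RuntimeError.

    Hypothetical helper, not part of the package. `motorcortex` is the
    imported package (used only for statusToStr) and `motorcortex_msg`
    is the loaded message module.
    """
    if reply_msg.status != motorcortex_msg.OK:
        # Turn the numeric status code into a readable error message.
        raise RuntimeError(motorcortex.statusToStr(motorcortex_msg, reply_msg.status))
    return reply_msg
```

With a connected Request instance this could be used as ensure_ok(motorcortex, motorcortex_msg, req.login("admin", "iddqd").get()).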

motorcortex.motorcortex_pb2

motorcortex.subscription

timespec

Timestamp of the parameter

Arguments:

  • sec(int) - seconds
  • nsec(int) - nanoseconds

Parameter

Parameter value with a timestamp

Arguments:

  • timestamp(timespec) - Parameter timestamp
  • value(any) - Parameter value
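
A timestamp split into seconds and nanoseconds is often easier to handle as a single floating-point number. The sketch below defines local stand-ins with the fields documented above so that it runs without the package. The stand-ins and the helper name timestamp_to_seconds are assumptions made for illustration.

```python
from collections import namedtuple

# Local stand-ins with the documented fields (assumption: the real
# objects expose the same attribute names).
timespec = namedtuple("timespec", "sec nsec")
Parameter = namedtuple("Parameter", "timestamp value")


def timestamp_to_seconds(ts):
    """Convert a timespec (sec, nsec) to floating-point seconds."""
    return ts.sec + ts.nsec / 1e9


param = Parameter(timestamp=timespec(sec=2, nsec=500_000_000), value=[0.1, 0.2])
seconds = timestamp_to_seconds(param.timestamp)
```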

Subscription Objects

class Subscription(object)

Subscription class represents a group of parameters.

It returns the latest values and a timestamp of the group.

A Subscription can be used as an observer, which is notified on every update, or it can be polled for the latest values.

id

 | id()

Returns:

  • int - subscription identifier

alias

 | alias()

Returns:

  • str - group alias

read

 | read()

Read the latest values of the parameters in the group.

Returns:

  • list(Parameter) - list of parameters

layout

 | layout()

Get a layout of the group.

Returns:

  • list(str) - ordered list of the parameters in the group
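
Because layout() returns the parameter paths and read() returns the latest values, the two can be combined into a dictionary. This is a sketch under two assumptions: that read() returns items in the same order as layout(), and that read() may return nothing before the first update arrives. The name snapshot is hypothetical.

```python
def snapshot(subscription):
    """Return {parameter path: latest value} for a subscription group.

    Assumes read() yields Parameter items in layout() order; an empty
    or missing result is treated as "no data yet".
    """
    params = subscription.read() or []
    return {path: param.value for path, param in zip(subscription.layout(), params)}
```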

done

 | done()

Returns:

  • bool - True if the call was successfully cancelled or finished running.

Examples:

subscription = sub.subscribe("root/logger/logOut", "log")
while not subscription.done():
    time.sleep(0.1)
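
The polling loop above waits indefinitely if the subscription never completes. A bounded variant could look like the sketch below. The helper wait_until_done is a hypothetical name and relies only on the done() method documented here.

```python
import time


def wait_until_done(handle, timeout_sec=5.0, poll_sec=0.1):
    """Poll handle.done() until it returns True or timeout_sec elapses.

    Returns True if the handle finished in time, False otherwise.
    """
    deadline = time.monotonic() + timeout_sec
    while not handle.done():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_sec)
    return True
```

The same helper works for any object with a done() method, including the Reply handles described later.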

get

 | get(timeout_sec=1.0)

Returns:

  • StatusMsg - status message if the call was successful, None if a timeout occurred.

Examples:

subscription = sub.subscribe("root/logger/logOut", "log")
done = subscription.get()

then

 | then(subscribed_clb)

JavaScript-like promise, which is resolved when subscription is completed.

Arguments:

  • subscribed_clb - callback which is resolved when subscription is completed.

Returns:

self pointer to add ‘catch’ callback

Examples:

subscription = sub.subscribe("root/logger/logOut", "log")
subscription.then(lambda val: print("got: %s" % val)).catch(lambda d: print("failed"))

catch

 | catch(failed)

JavaScript-like promise, which is resolved when subscription has failed.

Arguments:

  • failed - callback which is resolved when subscription has failed

Returns:

self pointer to add ‘then’ callback

Examples:

subscription = sub.subscribe("root/logger/logOut", "log")
subscription.catch(lambda d: print("failed")).then(lambda val: print("got: %s" % val))

notify

 | notify(observer_list)

Set an observer, which is notified on every group update.

Arguments:

  • observer_list - a callback function (or list of callback functions) to notify when new values are available

Examples:

def update(parameters):
    print(parameters)  # list of Parameter tuples

data_sub.notify(update)

motorcortex.subscribe

Subscribe Objects

class Subscribe()

Subscribe class is used to receive continuous parameter updates from motorcortex server.

Subscribe class simplifies creating and removing subscription groups.

Arguments:

  • req(Request) - reference to a Request instance
  • protobuf_types(MessageTypes) - reference to a MessageTypes instance

connect

 | connect(url, **kwargs)

Open a subscribe connection.

Arguments:

  • url(str) - motorcortex server URL

Returns:

  • bool - True if connected, False otherwise

close

 | close()

Close connection to the server

subscribe

 | subscribe(param_list, group_alias, frq_divider=1)

Create a subscription group for a list of the parameters.

Arguments:

  • param_list(list(str)) - list of the parameters to subscribe to
  • group_alias(str) - name of the group
  • frq_divider(int) - frequency divider is a downscaling factor for the group publish rate

Returns:

  • Subscription - A subscription handle, which acts as a JavaScript Promise: it is resolved when the subscription is ready and fails otherwise. Once the subscription is ready, the handle is used to retrieve the latest data.
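
To choose a value for frq_divider, it can help to work backwards from the update rate the client actually needs. The sketch below assumes that a group is published at the server's base rate divided by frq_divider. The base rate is not stated in this reference, so it is passed in explicitly, and divider_for_rate is a hypothetical helper.

```python
import math


def divider_for_rate(base_rate_hz, target_rate_hz):
    """Return the smallest integer divider whose rate does not exceed target_rate_hz.

    Assumes publish rate = base_rate_hz / frq_divider (an assumption,
    not confirmed by this reference).
    """
    if base_rate_hz <= 0 or target_rate_hz <= 0:
        raise ValueError("rates must be positive")
    # A divider below 1 makes no sense, so clamp to 1.
    return max(1, math.ceil(base_rate_hz / target_rate_hz))
```

For example, with an assumed base rate of 1000 Hz and a desired rate of at most 100 Hz, the result would be passed as sub.subscribe(param_list, group_alias, frq_divider=divider_for_rate(1000, 100)).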

unsubscribe

 | unsubscribe(subscription)

Unsubscribe from the group.

Arguments:

  • subscription(Subscription) - subscription handle

Returns:

  • Reply - Returns a Promise, which resolves when the unsubscribe operation is complete, fails otherwise.

motorcortex.parameter_tree

ParameterTree Objects

class ParameterTree(object)

This class represents a parameter tree, obtained from the server.

Reference to a parameter tree instance is needed for resolving parameters, data types and other information to build a correct request message.

load

 | load(parameter_tree_msg)

Loads a parameter tree from ParameterTreeMsg received from the server

Arguments:

  • parameter_tree_msg(ParameterTreeMsg) - parameter tree message from the server

Examples:

parameter_tree = motorcortex.ParameterTree()
parameter_tree_msg = param_tree_reply.get()
parameter_tree.load(parameter_tree_msg)

getParameterTree

 | getParameterTree()

Returns:

  • list(ParameterInfo) - a list of parameter descriptions

getInfo

 | getInfo(parameter_path)

Arguments:

  • parameter_path(str) - path of the parameter

Returns:

  • ParameterInfo - parameter description

getDataType

 | getDataType(parameter_path)

Arguments:

  • parameter_path(str) - path of the parameter

Returns:

  • DataType - parameter data type

motorcortex.request

Request Objects

class Request(object)

login

 | login(login, password)

Send a login request to the server

Arguments:

  • login(str) - user login
  • password(str) - user password

Returns:

  • Reply(StatusMsg) - A Promise, which resolves if login is successful and fails otherwise. The returned message has a status code, which indicates the status of the login.

Examples:

login_reply = req.login('operator', 'iddqd')
login_msg = login_reply.get()
if login_msg.status == motorcortex_msg.OK:
    print('User logged-in')

getParameterTreeHash

 | getParameterTreeHash()

Request a parameter tree hash from the server.

Returns:

  • Reply(ParameterTreeHashMsg) - A Promise, which resolves when the parameter tree hash is received or fails otherwise. The ParameterTreeHashMsg message has a status field to check the status of the operation.

Examples:

param_tree_hash_reply = req.getParameterTreeHash()
value = param_tree_hash_reply.get()

getParameterTree

 | getParameterTree()

Request a parameter tree from the server.

Returns:

  • Reply(ParameterTreeMsg) - A Promise, which resolves when parameter tree is received or fails otherwise. ParameterTreeMsg message has a status field to check the status of the operation.

Examples:

param_tree_reply = req.getParameterTree()
value = param_tree_reply.get()
parameter_tree.load(value)

save

 | save(path, file_name)

Request the server to save a parameter tree to a file.

Arguments:

  • path(str) - path where to save
  • file_name(str) - file name

Returns:

  • Reply(StatusMsg) - A promise, which resolves when save operation is completed, fails otherwise.

setParameter

 | setParameter(path, value, type_name=None)

Set new value to a parameter with given path

Arguments:

  • path(str) - parameter path in the tree
  • value(any) - new parameter value
  • type_name(str) - type of the value (by default resolved automatically)

Returns:

  • Reply(StatusMsg) - A Promise, which resolves when parameter value is updated or fails otherwise.

Examples:

reply = req.setParameter("root/Control/activateSemiAuto", False)
reply.get()
reply = req.setParameter("root/Control/targetJointAngles", [0.2, 3.14, 0.4])
reply.get()

setParameterList

 | setParameterList(param_list)

Set new values to a parameter list

Arguments:

  • param_list([{'path'-str,'value'-any}]) - a list of the parameters whose values are to be updated

Returns:

  • Reply(StatusMsg) - A Promise, which resolves when parameters from the list are updated, otherwise fails.

Examples:

req.setParameterList([
    {'path': 'root/Control/generator/enable', 'value': False},
    {'path': 'root/Control/generator/amplitude', 'value': 1.4}])
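
When the new values are already held in a dictionary keyed by parameter path, the list format expected by setParameterList can be generated instead of written by hand. This is a minimal sketch. The helper to_param_list is a hypothetical name, and it uses only the 'path' and 'value' keys documented above.

```python
def to_param_list(values):
    """Convert {path: value} into [{'path': path, 'value': value}, ...].

    The order of the resulting list follows the insertion order of the
    dictionary.
    """
    return [{"path": path, "value": value} for path, value in values.items()]


param_list = to_param_list({
    "root/Control/generator/enable": False,
    "root/Control/generator/amplitude": 1.4,
})
```

The result could then be passed as req.setParameterList(param_list).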

getParameter

 | getParameter(path)

Request a parameter with description and value from the server.

Arguments:

  • path(str) - parameter path in the tree.

Returns:

  • Reply(ParameterMsg) - Returns a Promise, which resolves when parameter message is successfully obtained, fails otherwise.

Examples:

param_reply = req.getParameter('root/Control/actualActuatorPositions')
param_full = param_reply.get()  # Value and description

getParameterList

 | getParameterList(path_list)

Get description and values of requested parameters.

Arguments:

  • path_list(list(str)) - list of parameter paths in the tree.

Returns:

  • Reply(ParameterListMsg) - A Promise, which resolves when list of the parameter values is received, fails otherwise.

Examples:

params_reply = req.getParameterList(['root/Control/joint1', 'root/Control/joint2'])
params_full = params_reply.get()  # Values and descriptions
print(params_full.params)

overwriteParameter

 | overwriteParameter(path, value, force_activate=False, type_name=None)

Overwrites the actual value of the parameter and, depending on the force_activate flag, forces this value to stay active. This way of setting values is useful during debugging and installation; it is not recommended during normal operation.

Arguments:

  • path(str) - parameter path in the tree
  • value(any) - new parameter value
  • force_activate(bool) - forces new value to stay active. (by default is set to ‘False’)
  • type_name(str) - type of the value (by default resolved automatically)

Returns:

  • Reply(StatusMsg) - A Promise, which resolves when parameter value is updated or fails otherwise.

Examples:

reply = req.overwriteParameter("root/Control/dummyBool", False, True)
reply.get()

releaseParameter

 | releaseParameter(path)

Deactivate overwrite operation of the parameter.

Arguments:

  • path(str) - parameter path in the tree

Returns:

  • Reply(StatusMsg) - A Promise, which resolves when parameter value is released or fails otherwise.

Examples:

reply = req.releaseParameter("root/Control/dummyBool")
reply.get()
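
Since an overwritten parameter stays forced until it is released, pairing overwriteParameter with releaseParameter in a context manager helps ensure the release is not forgotten when an error occurs. The sketch below uses only the two signatures documented above. The name overwritten is hypothetical, and the replies are simply waited on with get() without checking their status.

```python
from contextlib import contextmanager


@contextmanager
def overwritten(req, path, value):
    """Force `value` on `path` inside a with-block, then release it.

    `req` is expected to be a Request instance (or any object with the
    same overwriteParameter/releaseParameter methods).
    """
    req.overwriteParameter(path, value, force_activate=True).get()
    try:
        yield
    finally:
        # Always release, even if the body of the with-block raised.
        req.releaseParameter(path).get()
```

A debugging session could then be written as: with overwritten(req, "root/Control/dummyBool", False): followed by the code that needs the forced value.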

createGroup

 | createGroup(path_list, group_alias, frq_divider=1)

Create a subscription group for a list of the parameters.

This method is used internally by the Subscribe class; use Subscribe.subscribe instead.

Arguments:

  • path_list(list(str)) - list of the parameters to subscribe to
  • group_alias(str) - name of the group
  • frq_divider(int) - frequency divider is a downscaling factor for the group publish rate

Returns:

  • Reply(GroupStatusMsg) - A Promise, which resolves when subscription is complete, fails otherwise.

removeGroup

 | removeGroup(group_alias)

Unsubscribe from the group.

This method is used internally by the Subscribe class; use Subscribe.unsubscribe instead.

Arguments:

  • group_alias(str) - name of the group to unsubscribe from

Returns:

  • Reply(StatusMsg) - A Promise, which resolves when the unsubscribe operation is complete, fails otherwise.

motorcortex.reply

Reply Objects

class Reply(object)

Reply handle is a JavaScript-like Promise.

It is resolved when reply is received with successful status and fails otherwise.

get

 | get(timeout_ms=None)

A blocking call that waits for the reply and returns its value.

Arguments:

  • timeout_ms(integer) - timeout for reply in milliseconds

Returns:

A protobuf message with a parameter description and value.

Examples:

param_tree_reply = req.getParameterTree()
value = param_tree_reply.get()

done

 | done()

Returns:

  • bool - True if the call was successfully cancelled or finished running.

then

 | then(received_clb, *args, **kwargs)

JavaScript-like promise, which is resolved when reply is received.

Arguments:

  • received_clb - callback which is resolved when reply is received.

Returns:

self pointer to add ‘catch’ callback

Examples:

param_tree_reply.then(lambda reply: print("got reply: %s" % reply)).catch(lambda g: print("failed"))

catch

 | catch(failed_clb, *args, **kwargs)

JavaScript-like promise, which is resolved when receive has failed.

Arguments:

  • failed_clb - callback which is resolved when receive has failed

Returns:

self pointer to add ‘then’ callback

Examples:

param_tree_reply.catch(lambda g: print("failed")).then(lambda reply: print("got reply: %s" % reply))
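
The then/catch pair can also be used to bridge a Reply into Python's standard concurrent.futures.Future, which is convenient when the rest of an application already works with futures. This is a sketch, not part of the package: reply_to_future is a hypothetical name, and because the arguments passed to the failure callback are not specified here, the callback accepts any arguments.

```python
from concurrent.futures import Future


def reply_to_future(reply):
    """Return a Future that completes when `reply` resolves or fails.

    Relies only on then() returning the handle itself, so that catch()
    can be chained as shown in the examples above.
    """
    future = Future()
    reply.then(lambda value, *args: future.set_result(value)).catch(
        # The failure payload is unspecified, so it is kept as exception args.
        lambda *args: future.set_exception(RuntimeError("request failed", *args))
    )
    return future
```

A caller could then wait with reply_to_future(req.getParameterTree()).result(timeout=5).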

motorcortex.message_types

ParameterMsg

Data type which represents parameter description and value

ParameterListMsg

Data type which represents a list of parameters with descriptions and values

MessageTypes Objects

class MessageTypes(object)

Class for handling motorcortex data types. It loads proto files and hash files, creates a dictionary with all available data types, resolves data types by name or by hash, and performs encoding and decoding of the messages.

motorcortex

 | motorcortex()

Returns the default motorcortex messages provided with the package. System messages can be replaced at runtime with a newer version by calling load([{'proto': 'path to the new message proto', 'hash': 'path to the new message hash'}]).

Returns:

returns motorcortex messages

load

 | load(proto_hash_pair_list=None)

Loads an array of .proto and .json file pairs.

Arguments:

  • proto_hash_pair_list([{'hash'-str,'proto'-str}]) - list of hash and proto messages

Returns:

  • list(Module) - list of loaded modules with protobuf messages.

Examples:

motorcortex_msg, motionsl_msg = motorcortex_types.load(
    # motorcortex hashes and messages
    [{'proto': './motorcortex-msg/motorcortex_pb2.py',
      'hash': './motorcortex-msg/motorcortex_hash.json'},
     # robot motion hashes and messages
     {'proto': './motorcortex-msg/motionSL_pb2.py',
      'hash': './motorcortex-msg/motionSL_hash.json'}])
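
When several message sets live in one directory, the list passed to load can be generated from their base names. The sketch below assumes the file naming seen in the example above (name_pb2.py and name_hash.json). Other projects may name their files differently, and proto_hash_pairs is a hypothetical helper.

```python
def proto_hash_pairs(directory, names):
    """Build the [{'proto': ..., 'hash': ...}] list for MessageTypes.load.

    Assumes each message set consists of <name>_pb2.py and
    <name>_hash.json in `directory` (naming taken from the example).
    """
    return [
        {"proto": f"{directory}/{name}_pb2.py", "hash": f"{directory}/{name}_hash.json"}
        for name in names
    ]


pairs = proto_hash_pairs("./motorcortex-msg", ["motorcortex", "motionSL"])
```

The result could be passed directly as motorcortex_types.load(pairs).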

createType

 | createType(type_name)

Returns an instance of the loaded data type given type name.

Arguments:

  • type_name(str) - type name

Returns:

an instance of the requested type.

getTypeByHash

 | getTypeByHash(id)

Returns a data type given its hash.

Arguments:

  • id(int) - type hash

Returns:

requested data type.

getTypeByName

 | getTypeByName(name)

Returns a data type given its name.

Arguments:

  • name(str) - type name

Returns:

requested data type.

getNamespace

 | getNamespace(name)

Returns a module/namespace with data types.

Arguments:

  • name(str) - module name

Returns:

requested module

Examples:

# loading module motion_spec
MotionSpecType = motorcortex_types.getNamespace("motion_spec")
# instantiating a motion program from the module
motion_program = MotionSpecType.MotionProgram()

decode

 | decode(wire_data)

Decodes data received from the server

encode

 | encode(proto_data)

Encodes data to send to the server

motorcortex.setup_logger