API
motorcortex
motorcortex-python
Motorcortex is a Python library for connecting to Motorcortex servers and for handling requests, subscriptions, and parameter trees. It provides high-level APIs for communication, login, and data exchange using protocol buffers.
See documentation for usage examples.
parseUrl
def parseUrl(url: str) -> tuple[str, str, int | None, int | None]
Parses a Motorcortex connection URL to extract request and subscribe addresses and ports.
Arguments:
- url (str): The connection URL, expected in the format ‘address:req_port:sub_port’.
Returns:
tuple: (req_address, sub_address, req_port, sub_port)
- req_address (str): Address for request connection.
- sub_address (str): Address for subscribe connection.
- req_port (int or None): Port for request connection.
- sub_port (int or None): Port for subscribe connection.
If the URL does not contain ports, default endpoints ‘/mcx_req’ and ‘/mcx_sub’ are appended.
makeUrl
def makeUrl(address: str, port: int | None) -> str
Constructs a URL string from an address and port.
Arguments:
- address (str): The base address.
- port (int or None): The port number.
Returns:
str: The combined address and port in the format ‘address:port’, or just ‘address’ if port is None.
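The behaviour described for parseUrl and makeUrl can be illustrated with a small stand-alone sketch. It is not the library's implementation: the names parse_url_sketch and make_url_sketch are hypothetical, and the rule used to detect ports (the last two colon-separated fields are both numeric) is an assumption.

```python
def parse_url_sketch(url):
    """Split 'address:req_port:sub_port' into (req_address, sub_address, req_port, sub_port)."""
    parts = url.rsplit(":", 2)
    # Assumption: ports are present only if the last two fields are numeric.
    if len(parts) == 3 and parts[1].isdigit() and parts[2].isdigit():
        address = parts[0]
        return address, address, int(parts[1]), int(parts[2])
    # No ports found: fall back to the default endpoints named in the text.
    return url + "/mcx_req", url + "/mcx_sub", None, None


def make_url_sketch(address, port):
    """Join address and port as 'address:port', or return the address alone."""
    return address if port is None else f"{address}:{port}"


req_addr, sub_addr, req_port, sub_port = parse_url_sketch("127.0.0.1:5555:5556")
print(make_url_sketch(req_addr, req_port))
print(make_url_sketch(sub_addr, sub_port))
```

Splitting from the right keeps a scheme prefix such as ‘wss://’ intact, because its colon is never mistaken for a port separator.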
connect
def connect(url: str,
motorcortex_types: object,
param_tree: "ParameterTree",
reconnect: bool = True,
**kwargs) -> tuple["Request", "Subscribe"]
Establishes connections to Motorcortex request and subscribe endpoints, performs login, and loads the parameter tree.
Arguments:
- url (str): Connection URL in the format ‘address:req_port:sub_port’.
- motorcortex_types (module): Motorcortex message types module.
- param_tree (ParameterTree): ParameterTree instance to load parameters into.
- reconnect (bool, optional): Whether to enable automatic reconnection. Defaults to True.
- **kwargs: Additional keyword arguments, including ‘login’ and ‘password’ for authentication.
Returns:
tuple: (req, sub)
- req (Request): Established request connection.
- sub (Subscribe): Established subscribe connection.
Raises:
RuntimeError: If connection or login fails.
Examples:
from motorcortex import connect, MessageTypes, ParameterTree
url = "127.0.0.1:5555:5556"
types = MessageTypes()
tree = ParameterTree()
req, sub = connect(url, types, tree, certificate="mcx.cert.crt", timeout_ms=1000, login="admin", password="iddqd")
print(tree) # Parameter tree loaded from server
statusToStr
def statusToStr(motorcortex_msg: object, code: int) -> str
Converts status codes to a readable message.
Arguments:
- motorcortex_msg (Module): Reference to a motorcortex module.
- code (int): Status code.
Returns:
str: Status message.
Examples:
login_reply = req.login("admin", "iddqd")
login_reply_msg = login_reply.get()
if login_reply_msg.status != motorcortex_msg.OK:
    print(motorcortex.statusToStr(motorcortex_msg, login_reply_msg.status))
motorcortex.message_types
ParameterMsg
Data type which represents parameter description and value
ParameterListMsg
Data type which represents a list of parameters with descriptions and values
MessageTypes Objects
class MessageTypes(object)
Class for handling motorcortex data types. It loads proto files and hash files, creates a dictionary of all available data types, resolves data types by name or by hash, and performs encoding and decoding of messages.
motorcortex
def motorcortex()
Returns the default motorcortex messages provided with the package. System messages can be replaced at runtime with a newer version by calling load([{‘proto’: ‘path to the new message proto’, ‘hash’: ‘path to the new message hash’}]).
Returns:
The motorcortex messages.
load
def load(proto_hash_pair_list=None)
Loads an array of .proto and .json file pairs.
Arguments:
- proto_hash_pair_list ([{'hash': str, 'proto': str}]): List of hash and proto message file pairs.
Returns:
list(Module): List of loaded modules with protobuf messages.
Examples:
motorcortex_msg, motionsl_msg = motorcortex_types.load(
# motorcortex hashes and messages
[{'proto': './motorcortex-msg/motorcortex_pb2.py',
'hash': './motorcortex-msg/motorcortex_hash.json'},
# robot motion hashes and messages
{'proto': './motorcortex-msg/motionSL_pb2.py',
'hash': './motorcortex-msg/motionSL_hash.json'}])
createType
def createType(type_name)
Returns an instance of the loaded data type given type name.
Arguments:
- type_name (str): Type name.
Returns:
An instance of the requested type.
getTypeByHash
def getTypeByHash(id)
Returns a data type given its hash.
Arguments:
- id (int): Type hash.
Returns:
The requested data type.
getTypeByName
def getTypeByName(name)
Returns a data type given its name.
Arguments:
- name (str): Type name.
Returns:
The requested data type.
getNamespace
def getNamespace(name)
Returns a module/namespace with data types.
Arguments:
- name (str): Module name.
Returns:
The requested module.
Examples:
# loading module motion_spec
MotionSpecType = motorcortex_types.getNamespace("motion_spec")
# instantiating a motion program from the module
motion_program = MotionSpecType.MotionProgram()
decode
def decode(wire_data)
Decodes data received from the server
encode
def encode(proto_data)
Encodes data to send to the server
motorcortex.parameter_tree
ParameterTree Objects
class ParameterTree(object)
Represents a parameter tree, obtained from the server.
A reference to a parameter tree instance is needed to resolve parameters, data types, and other information when building a correct request message.
__init__
def __init__() -> None
Initializes an empty ParameterTree.
load
def load(parameter_tree_msg: Any) -> None
Loads a parameter tree from ParameterTreeMsg received from the server.
Arguments:
- parameter_tree_msg (Any): Parameter tree message from the server (should have a .params attribute).
Examples:
parameter_tree = motorcortex.ParameterTree()
parameter_tree_msg = param_tree_reply.get()
parameter_tree.load(parameter_tree_msg)
getParameterTree
def getParameterTree() -> List[Any]
Returns the list of parameter descriptions in the tree.
Returns:
List[Any]: A list of parameter descriptions (ParameterInfo objects).
getInfo
def getInfo(parameter_path: str) -> Optional[Any]
Get the parameter description for a given path.
Arguments:
- parameter_path (str): Path of the parameter.
Returns:
Optional[Any]: Parameter description (ParameterInfo) if found, else None.
getDataType
def getDataType(parameter_path: str) -> Optional[Any]
Get the data type for a given parameter path.
Arguments:
- parameter_path (str): Path of the parameter.
Returns:
Optional[Any]: Parameter data type if found, else None.
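The lookup behaviour of getInfo and getDataType (return the entry if the path is known, else None) can be sketched with a plain dictionary. ParameterTreeSketch is a hypothetical stand-in, not the library class. The attribute names path and data_type on each entry, and the placeholder type codes 1 and 2, are assumptions made for illustration.

```python
from types import SimpleNamespace


class ParameterTreeSketch:
    """Hypothetical stand-in that indexes parameter descriptions by path."""

    def __init__(self):
        self._by_path = {}

    def load(self, parameter_tree_msg):
        # Assumption: the message exposes .params and each entry has .path.
        self._by_path = {info.path: info for info in parameter_tree_msg.params}

    def getInfo(self, parameter_path):
        # dict.get returns None for an unknown path.
        return self._by_path.get(parameter_path)

    def getDataType(self, parameter_path):
        info = self.getInfo(parameter_path)
        # Assumption: the description carries its type in .data_type.
        return None if info is None else info.data_type


# Fake server message; the data_type codes are made-up placeholders.
msg = SimpleNamespace(params=[
    SimpleNamespace(path="root/Control/generator/enable", data_type=1),
    SimpleNamespace(path="root/Control/generator/amplitude", data_type=2),
])
tree = ParameterTreeSketch()
tree.load(msg)
print(tree.getDataType("root/Control/generator/amplitude"))
print(tree.getInfo("root/missing"))
```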
motorcortex.reply
Reply Objects
class Reply(object)
A Reply handle is a JavaScript-like Promise.
It is resolved when a reply is received with successful status and fails otherwise.
get
def get(timeout_ms=None)
A blocking call which waits for the reply and returns its value.
Arguments:
- timeout_ms (int): Timeout for the reply in milliseconds.
Returns:
A protobuf message with a parameter description and value.
Examples:
param_tree_reply = req.getParameterTree()
value = param_tree_reply.get()
done
def done()
Returns:
bool: True if the call was successfully canceled or finished running.
then
def then(received_clb, *args, **kwargs)
JavaScript-like promise, which is resolved when a reply is received.
Arguments:
- received_clb: Callback which is called when the reply is received.
Returns:
The Reply itself, so that a ‘catch’ callback can be chained.
Examples:
param_tree_reply.then(lambda reply: print("got reply: %s" % reply)).catch(lambda g: print("failed"))
catch
def catch(failed_clb, *args, **kwargs)
JavaScript-like promise handler, which is called when receiving the reply has failed.
Arguments:
- failed_clb: Callback which is called when receiving the reply has failed.
Returns:
The Reply itself, so that a ‘then’ callback can be chained.
Examples:
param_tree_reply.catch(lambda g: print("failed")).then(lambda reply: print("got reply: %s"%reply))
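The chaining of then and catch works because each call returns the handle itself. The following is a minimal sketch of that pattern, assuming a threading.Event is used to signal completion. ReplySketch and its resolve and reject methods are hypothetical and do not describe how the library's Reply is implemented.

```python
import threading


class ReplySketch:
    """Hypothetical promise-like handle; not the library's Reply class."""

    def __init__(self):
        self._finished = threading.Event()
        self._value = None
        self._failed = False
        self._on_ok = None
        self._on_fail = None

    def then(self, received_clb):
        self._on_ok = received_clb
        return self  # returning self lets the caller chain .catch(...)

    def catch(self, failed_clb):
        self._on_fail = failed_clb
        return self  # returning self lets the caller chain .then(...)

    def resolve(self, value):
        # Called by the (hypothetical) receiver when a reply arrives.
        self._value = value
        self._finished.set()
        if self._on_ok is not None:
            self._on_ok(value)

    def reject(self, reason):
        # Called by the (hypothetical) receiver when the request fails.
        self._value = reason
        self._failed = True
        self._finished.set()
        if self._on_fail is not None:
            self._on_fail(reason)

    def done(self):
        return self._finished.is_set()

    def get(self, timeout_ms=None):
        timeout_sec = None if timeout_ms is None else timeout_ms / 1000.0
        if not self._finished.wait(timeout_sec):
            raise TimeoutError("no reply within the timeout")
        if self._failed:
            raise RuntimeError(self._value)
        return self._value


events = []
reply = ReplySketch()
reply.then(lambda value: events.append(("ok", value))).catch(
    lambda reason: events.append(("failed", reason)))
reply.resolve("parameter tree")
print(reply.get(timeout_ms=100), reply.done(), events)
```

In this sketch only callbacks registered before completion are invoked; a fuller version would also call a late-registered callback immediately.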
motorcortex.request
Provides the Request class for managing request connections to a Motorcortex server, including login, parameter retrieval, parameter updates, and group management.
ConnectionState Objects
class ConnectionState(Enum)
Enumeration of connection states.
- CONNECTING: Connection is being established.
- CONNECTION_OK: Connection is successfully established.
- CONNECTION_LOST: Connection was lost.
- CONNECTION_FAILED: Connection attempt failed.
- DISCONNECTING: Connection is being closed.
- DISCONNECTED: Connection is closed.
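As an illustration of how client code might react to these states, here is a small sketch. ConnectionStateSketch mirrors only the names listed above; its numeric values, the RETRY_STATES set, and the should_retry helper are assumptions and not part of the library.

```python
from enum import Enum, auto


class ConnectionStateSketch(Enum):
    """Stand-in enum: names come from the list above, values are assumptions."""
    CONNECTING = auto()
    CONNECTION_OK = auto()
    CONNECTION_LOST = auto()
    CONNECTION_FAILED = auto()
    DISCONNECTING = auto()
    DISCONNECTED = auto()


# Hypothetical policy: states after which a client might try to reconnect.
RETRY_STATES = {
    ConnectionStateSketch.CONNECTION_LOST,
    ConnectionStateSketch.CONNECTION_FAILED,
}


def should_retry(state):
    """Return True when the state indicates a dropped or failed connection."""
    return state in RETRY_STATES


for state in ConnectionStateSketch:
    print(state.name, should_retry(state))
```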
Request Objects
class Request()
Represents a request connection to a Motorcortex server.
The Request class allows you to:
- Establish and manage a connection to a Motorcortex server.
- Perform login authentication.
- Retrieve, set, and overwrite parameter values.
- Manage parameter groups for efficient batch operations.
- Save and load parameter trees.
- Chain asynchronous operations using a promise-like interface (Reply).
Methods:
- url() -> Optional[str]: Returns the current connection URL.
- connect(url: str, **kwargs) -> Reply: Establishes a connection to the server.
- close() -> None: Closes the connection and cleans up resources.
- send(encoded_msg: Any, do_not_decode_reply: bool = False) -> Optional[Reply]: Sends an encoded message to the server.
- login(login: str, password: str) -> Reply: Sends a login request.
- connectionState() -> ConnectionState: Returns the current connection state.
- getParameterTreeHash() -> Reply: Requests the parameter tree hash from the server.
- getParameterTree() -> Reply: Requests the parameter tree from the server.
- save(path: str, file_name: str) -> Reply: Requests the server to save the parameter tree to a file.
- setParameter(path: str, value: Any, type_name: Optional[str] = None, offset: int = 0, length: int = 0) -> Reply: Sets a new value for a parameter.
- setParameterList(param_list: List[dict]) -> Reply: Sets new values for a list of parameters.
- getParameter(path: str) -> Reply: Requests a parameter value and description.
- getParameterList(path_list: List[str]) -> Reply: Requests values and descriptions for a list of parameters.
- overwriteParameter(path: str, value: Any, force_activate: bool = False, type_name: Optional[str] = None) -> Reply: Overwrites a parameter value and optionally forces it to stay active.
- releaseParameter(path: str) -> Reply: Releases the overwrite operation for a parameter.
- createGroup(path_list: List[str], group_alias: str, frq_divider: int = 1) -> Reply: Creates a subscription group for a list of parameters.
- removeGroup(group_alias: str) -> Reply: Unsubscribes from a group.
Examples:
# Establish a connection
req = motorcortex.Request(protobuf_types, parameter_tree)
reply = req.connect("tls+tcp://localhost:6501", certificate="path/to/ca.crt")
if reply.get():
    print("Connected!")
# Login
login_reply = req.login("user", "password")
if login_reply.get().status == motorcortex.OK:
    print("Login successful")
# Get a parameter
param_reply = req.getParameter("MyDevice.MyParam")
param = param_reply.get()
print("Value:", param.value)
# Set a parameter
req.setParameter("MyDevice.MyParam", 42)
# Clean up
req.close()
__init__
def __init__(protobuf_types: "MessageTypes",
parameter_tree: "ParameterTree") -> None
Initialize a Request object.
Arguments:
- protobuf_types: Motorcortex message types module.
- parameter_tree: ParameterTree instance.
url
def url() -> Optional[str]
Return the current connection URL.
connect
def connect(url: str, **kwargs) -> Reply
Establish a connection to the Motorcortex server.
Arguments:
- url: Connection URL.
- **kwargs: Additional connection parameters. Supported keys include:
  - certificate (str, optional): Path to a TLS certificate file for secure connections.
  - conn_timeout_ms (int, optional): Connection timeout in milliseconds (default: 1000).
  - recv_timeout_ms (int, optional): Receive timeout in milliseconds (default: 500).
  - login (str, optional): Username for authentication (if required by server).
  - password (str, optional): Password for authentication (if required by server).
  - state_update (Callable, optional): Callback function to be called on connection state changes.
  - timeout_ms (int, optional): Alternative timeout in milliseconds (used by Request.parse).
Returns:
Reply: A promise that resolves when the connection is established.
close
def close() -> None
Close the request connection and clean up resources.
send
def send(encoded_msg: Any,
do_not_decode_reply: bool = False) -> Optional[Reply]
Send an encoded message to the server.
Arguments:
- encoded_msg: Encoded protobuf message.
- do_not_decode_reply: If True, do not decode the reply.
Returns:
Reply or None: A promise for the reply, or None if not connected.
login
def login(login: str, password: str) -> Reply
Send a login request to the server.
Arguments:
- login: User login.
- password: User password.
Returns:
Reply: A promise for the login reply.
connectionState
def connectionState() -> ConnectionState
Get the current connection state.
Returns:
ConnectionState: The current state.
getParameterTreeHash
def getParameterTreeHash() -> Reply
Request a parameter tree hash from the server.
Returns:
Reply: A promise for the parameter tree hash.
getParameterTree
def getParameterTree() -> Reply
Request a parameter tree from the server.
Returns:
Reply: A promise for the parameter tree.
save
def save(path: str, file_name: str) -> Reply
Request the server to save a parameter tree to a file.
Arguments:
- path: Path to save the file.
- file_name: Name of the file.
Returns:
Reply: A promise for the save operation.
setParameter
def setParameter(path: str,
value: Any,
type_name: Optional[str] = None,
offset: int = 0,
length: int = 0) -> Reply
Set a new value for a parameter.
Arguments:
- path: Parameter path.
- value: New value.
- type_name: Type name (optional).
- offset: Offset in array (optional).
- length: Number of elements to update (optional).
Returns:
Reply: A promise for the set operation.
setParameterList
def setParameterList(param_list: List[dict])
Set new values for a list of parameters.
Arguments:
- param_list ([{‘path’: str, ‘value’: any, ‘offset’, ‘length’}]): A list of the parameters whose values are to be updated.
Returns:
Reply(StatusMsg): A promise which resolves when the parameters from the list are updated, otherwise fails.
Examples:
req.setParameterList([
    {'path': 'root/Control/generator/enable', 'value': False},
    {'path': 'root/Control/generator/amplitude', 'value': 1.4},
    {'path': 'root/Control/myArray6', 'value': [1.4, 1.5], 'offset': 1, 'length': 2}])
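The effect of ‘offset’ and ‘length’ on an array parameter can be pictured with a local list. This is only a sketch of the semantics suggested by the argument descriptions (offset in the array, number of elements to update). The helper apply_update_sketch is hypothetical, the array is assumed to hold six elements, and treating length 0 as "replace the whole value" is an assumption about the defaults.

```python
def apply_update_sketch(current, value, offset=0, length=0):
    """Return a copy of `current` with `length` elements replaced from `offset`."""
    # Assumption: length == 0 means the whole value is replaced.
    if length == 0:
        return list(value)
    if len(value) < length:
        raise ValueError("value holds fewer elements than length")
    if offset + length > len(current):
        raise ValueError("update would run past the end of the array")
    updated = list(current)
    updated[offset:offset + length] = value[:length]
    return updated


# Stand-in for the array behind 'root/Control/myArray6' (six zeros assumed).
my_array6 = [0.0] * 6
print(apply_update_sketch(my_array6, [1.4, 1.5], offset=1, length=2))
```

Only elements 1 and 2 change; the rest of the array keeps its previous values.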
getParameter
def getParameter(path: str) -> Reply
Request a parameter value and description from the server.
Arguments:
- path: Parameter path.
Returns:
Reply: A promise for the parameter value.
getParameterList
def getParameterList(path_list: List[str]) -> Reply
Request values and descriptions for a list of parameters.
Arguments:
- path_list: List of parameter paths.
Returns:
Reply: A promise for the parameter list.
overwriteParameter
def overwriteParameter(path: str,
value: Any,
force_activate: bool = False,
type_name: Optional[str] = None) -> Reply
Overwrite a parameter value and optionally force it to stay active.
Arguments:
- path: Parameter path.
- value: New value.
- force_activate: Force value to stay active.
- type_name: Type name (optional).
Returns:
Reply: A promise for the overwrite operation.
releaseParameter
def releaseParameter(path: str) -> Reply
Release the overwrite operation for a parameter.
Arguments:
- path: Parameter path.
Returns:
Reply: A promise for the release operation.
createGroup
def createGroup(path_list: List[str],
group_alias: str,
frq_divider: int = 1) -> Reply
Create a subscription group for a list of parameters.
Arguments:
- path_list: List of parameter paths.
- group_alias: Group alias.
- frq_divider: Frequency divider.
Returns:
Reply: A promise for the group creation.
removeGroup
def removeGroup(group_alias: str) -> Reply
Unsubscribe from a group.
Arguments:
- group_alias: Group alias.
Returns:
Reply: A promise for the unsubscribe operation.
parse
@staticmethod
def parse(
conn_timeout_ms: int = 0,
timeout_ms: Optional[int] = None,
recv_timeout_ms: Optional[int] = None,
certificate: Optional[str] = None,
login: Optional[str] = None,
password: Optional[str] = None,
state_update: Optional[Callable] = None
) -> tuple[int, Optional[int], Optional[str], Optional[Callable]]
Parses connection parameters for the connect method.
Arguments:
- conn_timeout_ms: Connection timeout in milliseconds.
- timeout_ms: Alternative timeout in milliseconds.
- recv_timeout_ms: Receive timeout in milliseconds.
- certificate: Path to the TLS certificate.
- login: Optional login name.
- password: Optional password.
- state_update: Optional state update callback.
Returns:
Tuple of (conn_timeout_ms, recv_timeout_ms, certificate, state_update).
waitForConnection
@staticmethod
def waitForConnection(
event: Event,
timeout_sec: float,
is_connected_fn: Optional[Callable[[], bool]] = None) -> bool
Waits for the connection event or times out.
Arguments:
- event: Event to wait on.
- timeout_sec: Timeout in seconds.
- is_connected_fn: Optional function to check connection status.
Returns:
True if the connection is established; raises on failure or timeout.
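The wait-or-raise contract described above can be sketched with a threading.Event. The function wait_for_connection_sketch is hypothetical, and the exception types (TimeoutError on timeout, ConnectionError when is_connected_fn reports failure) are assumptions, since the text only says that the call raises.

```python
import threading


def wait_for_connection_sketch(event, timeout_sec, is_connected_fn=None):
    """Block until `event` is set; raise if it is not set in time."""
    # Event.wait returns False when the timeout elapses first.
    if not event.wait(timeout_sec):
        raise TimeoutError(f"no connection within {timeout_sec} s")
    # The event may also be set on failure, so optionally double-check.
    if is_connected_fn is not None and not is_connected_fn():
        raise ConnectionError("connection attempt failed")
    return True


connected = threading.Event()
# Simulate the transport signalling success shortly after the call starts.
threading.Timer(0.05, connected.set).start()
print(wait_for_connection_sketch(connected, timeout_sec=1.0))
```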
saveParameterTreeFile
@staticmethod
def saveParameterTreeFile(path: str,
parameter_tree: ParameterTree) -> ParameterTree
Saves the parameter tree to a file in the cache.
Arguments:
- path: File path to save the parameter tree.
- parameter_tree: ParameterTree instance.
Returns:
The saved ParameterTree instance.
loadParameterTreeFile
@staticmethod
def loadParameterTreeFile(
path: str, protobuf_types: MessageTypes) -> Optional[ParameterTree]
Loads the parameter tree from a cached file if available and valid.
Arguments:
- path: File path to load the parameter tree from.
- protobuf_types: Protobuf type definitions.
Returns:
The loaded ParameterTree instance, or None if not found/invalid.
motorcortex.setup_logger
motorcortex.state_callback_handler
StateCallbackHandler Objects
class StateCallbackHandler()
Handles the processing of state change callbacks.
This class manages a background thread and a queue to process state update notifications using a user-provided callback function.
__init__
def __init__() -> None
Initializes the StateCallbackHandler.
start
def start(state_update_handler: Callable[..., Any]) -> None
Start the callback handler with the given update function.
Arguments:
- state_update_handler (Callable[..., Any]): The function to call when a state update is notified. Should accept arbitrary arguments.
stop
def stop() -> None
Stop the callback handler and clean up resources.
notify
def notify(*args: Any) -> None
Queue a state update notification.
Arguments:
- *args (Any): Arguments to pass to the state update handler.
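The combination of a background thread and a queue described for this class can be sketched as follows. StateCallbackHandlerSketch is a hypothetical illustration of the pattern, not the library's code. The sentinel object used to stop the worker is an assumption.

```python
import queue
import threading


class StateCallbackHandlerSketch:
    """Hypothetical queue-plus-thread dispatcher for state updates."""

    _STOP = object()  # sentinel that tells the worker thread to exit

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = None

    def start(self, state_update_handler):
        self.stop()  # make sure a previous worker is not left running
        self._thread = threading.Thread(
            target=self._run, args=(state_update_handler,), daemon=True)
        self._thread.start()

    def stop(self):
        if self._thread is not None:
            self._queue.put(self._STOP)
            self._thread.join()
            self._thread = None

    def notify(self, *args):
        # Returns immediately; the handler runs on the worker thread.
        self._queue.put(args)

    def _run(self, handler):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            handler(*item)


seen = []
handler = StateCallbackHandlerSketch()
handler.start(lambda *args: seen.append(args))
handler.notify("CONNECTING")
handler.notify("CONNECTION_OK")
handler.stop()  # earlier notifications are processed before the worker exits
print(seen)
```

Queueing keeps the caller of notify from blocking on a slow user callback, which is presumably why the work is moved to a separate thread.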
motorcortex.subscribe
Subscribe Objects
class Subscribe()
Subscribe class is used to receive continuous parameter updates from the motorcortex server.
This class simplifies creating and removing subscription groups, managing the connection, and handling the reception of parameter updates.
__init__
def __init__(req: Request, protobuf_types: Any) -> None
Initialize a Subscribe object.
Arguments:
- req (Request): Reference to a Request instance.
- protobuf_types (Any): Reference to a MessageTypes instance.
connect
def connect(url: str, **kwargs: Any) -> Reply
Open a subscription connection.
Arguments:
- url (str): Motorcortex server URL.
- **kwargs: Additional connection parameters. Supported keys include:
  - certificate (str, optional): Path to a TLS certificate file for secure connections.
  - conn_timeout_ms (int, optional): Connection timeout in milliseconds (default: 1000).
  - recv_timeout_ms (int, optional): Receive timeout in milliseconds (default: 500).
  - login (str, optional): Username for authentication (if required by server).
  - password (str, optional): Password for authentication (if required by server).
  - state_update (Callable, optional): Callback function to be called on connection state changes.
  - timeout_ms (int, optional): Alternative timeout in milliseconds (used by Request.parse).
Returns:
Reply: A promise that resolves when the connection is established.
close
def close() -> None
Close connection to the server and clean up resources.
run
def run(socket: Sub0) -> None
Main receive loop for the subscription socket.
Arguments:
- socket (Sub0): The subscription socket to receive messages from.
subscribe
def subscribe(param_list: List[str],
group_alias: str,
frq_divider: int = 1) -> Subscription
Create a subscription group for a list of parameters.
Arguments:
- param_list (List[str]): List of the parameters to subscribe to.
- group_alias (str): Name of the group.
- frq_divider (int, optional): Frequency divider for the group publish rate. Defaults to 1.
Returns:
Subscription: A subscription handle, which acts as a JavaScript Promise. It is resolved when the subscription is ready or failed. After the subscription is ready, the handle is used to retrieve the latest data.
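To make the effect of frq_divider concrete, the arithmetic can be sketched as a plain division of a base publish rate. The function group_rate_hz is hypothetical, and the base rate of 1000 Hz is an assumed figure used only for the example; the actual base rate depends on the server.

```python
def group_rate_hz(base_rate_hz, frq_divider=1):
    """Return the publish rate of a group for a given frequency divider."""
    if frq_divider < 1:
        raise ValueError("frq_divider must be at least 1")
    return base_rate_hz / frq_divider


# Assumed base rate of 1000 Hz; a divider of 100 then yields 10 updates/s.
print(group_rate_hz(1000.0, 100))
```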
unsubscribe
def unsubscribe(subscription: Subscription) -> Reply
Unsubscribe from the group.
Arguments:
- subscription (Subscription): Subscription handle.
Returns:
Reply: A promise that resolves when the unsubscribe operation is complete, fails otherwise.
connectionState
def connectionState() -> ConnectionState
Get the current connection state.
Returns:
ConnectionState: The current state of the subscription connection.
resubscribe
def resubscribe() -> None
Resubscribe all current groups after a reconnect.
motorcortex.subscription
Subscription Objects
class Subscription(object)
Represents a subscription to a group of parameters in Motorcortex.
The Subscription class allows you to:
- Access the latest values and timestamps for a group of parameters.
- Poll for updates or use observer callbacks for real-time notifications.
- Chain asynchronous operations using then and catch (promise-like interface).
Attributes:
- group_alias (str): Alias for the parameter group.
- protobuf_types (Any): Protobuf type definitions.
- frq_divider (str): Frequency divider for the group.
- pool (Any): Thread or process pool for observer callbacks.
Methods:
- id() -> int: Returns the subscription identifier.
- alias() -> str: Returns the group alias.
- frqDivider() -> str: Returns the frequency divider of the group.
- read() -> Optional[List[Parameter]]: Returns the latest values of the parameters in the group.
- layout() -> Optional[List[str]]: Returns the ordered list of parameter paths in the group.
- done() -> bool: Returns True if the subscription is finished or cancelled.
- get(timeout_sec: float = 1.0) -> Optional[Any]: Waits for the subscription to complete, returns the result or None on timeout.
- then(subscribed_clb: Callable[[Any], None]) -> Subscription: Registers a callback for successful subscription completion.
- catch(failed: Callable[[], None]) -> Subscription: Registers a callback for subscription failure.
- notify(observer_list: Union[Callable, List[Callable]]) -> None: Registers observer(s) to be notified on every group update.
Examples:
# Make sure you have a valid connection
subscription = sub.subscribe(paths, "group1", 100)
result = subscription.get()
if result is not None and result.status == motorcortex.OK:
    print(f"Subscription successful, layout: {subscription.layout()}")
else:
    print(f"Subscription failed. Check parameter paths: {paths}")
    sub.close()
    exit()
# Use promise-like interface
subscription.then(lambda res: print("Subscribed:", res)).catch(lambda: print("Failed"))
# Use observer for real-time updates
def on_update(parameters):
    for param in parameters:
        timestamp = param.timestamp.sec + param.timestamp.nsec * 1e-9
        print(f"Update: {timestamp:.6f}, {param.value}")
subscription.notify(on_update)
print("Waiting for parameter updates...")
import time
while True:
    time.sleep(1)
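The observer callback above can be exercised without a server by feeding it stand-in objects. In this sketch the helpers to_seconds and as_observer_list are hypothetical, and the SimpleNamespace objects only imitate the timestamp.sec, timestamp.nsec and value fields used in the example. Accepting either a single callable or a list follows the notify signature listed under Methods.

```python
from types import SimpleNamespace


def to_seconds(timestamp):
    """Combine whole seconds and nanoseconds into a float, as in on_update."""
    return timestamp.sec + timestamp.nsec * 1e-9


def as_observer_list(observer_list):
    """Accept a single callable or a list of callables and return a list."""
    if isinstance(observer_list, (list, tuple)):
        return list(observer_list)
    return [observer_list]


# Stand-in for one group update holding a single parameter.
sample = [SimpleNamespace(
    timestamp=SimpleNamespace(sec=12, nsec=500_000_000), value=[1.4])]

log = []


def on_update(parameters):
    for param in parameters:
        log.append((to_seconds(param.timestamp), param.value))


for observer in as_observer_list(on_update):
    observer(sample)
print(log)
```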
id
def id() -> int
Returns:
int: Subscription identifier.
alias
def alias() -> str
Returns:
str: Group alias.
frqDivider
def frqDivider() -> str
Returns:
str: Frequency divider of the group.
read
def read() -> Optional[List["Parameter"]]
Read the latest values of the parameters in the group.
Returns:
list of Parameter: list of parameters
layout
def layout() -> Optional[List[str]]
Get a layout of the group.
Returns:
list of str: ordered list of the parameters in the group
done
def done() -> bool