Motorcortex Core
version: 2.7.6
cm_reqrep.h
1
/*
2
* Developer : Alexey Zakharov (alexey.zakharov@vectioneer.com)
3
* All rights reserved. Copyright (c) 2019 VECTIONEER.
4
*/
5
6
#ifndef CM_REQREP_H
7
#define CM_REQREP_H
8
9
#include "cm_basereqrep.h"
10
#include "cm_rpcmotorcortex.h"
11
#include "cm_groupmanager.h"
12
#include "cm_paramtree.h"
13
#include "cm_conndata.h"
14
#include "utl_realtime.h"
15
#include "utl_buffer.h"
16
17
#include <string>
18
#include <vector>
19
#include <nng/nng.h>
20
#include <nng/supplemental/tls/tls.h>
21
22
namespace
mcx {
23
24
namespace
parameter_server {
25
class
Parameter;
26
}
27
28
namespace
comm {
29
30
class
AuthorizationBase;
31
32
struct
PipeEvent
{
33
nng_pipe_ev event;
34
nng_pipe pipe;
35
};
36
37
using
RPCProcessorMap = std::map<uint32_t, RPCProcessor*>;
38
39
// The server keeps a list of work items, sorted by expiration time,
40
// so that we can use this to set the timeout to the correct value for
41
// use in poll.
42
struct
Work
{
43
enum
{
44
INIT, RECV, SEND, FINISHED
45
} state;
46
nng_aio* aio;
47
nng_msg* msg;
48
nng_ctx ctx;
49
utils::MpmcQueue<PipeEvent>
* pipe_event_queue;
50
AuthorizationBase
* authorization_ptr;
51
RPCProcessorMap* rpc_processor_map;
52
GroupManager
* group_manager;
53
utils::PageFaults
page_faults;
54
};
55
56
class
RequestReply
:
public
BaseRequestReply
{
57
58
static
constexpr
auto
SOCKET_EVENT_QUEUE = 128;
59
60
public
:
61
explicit
RequestReply
(ProtocolType protocol_type = ProtocolType::SINGLE_TIMESTAMP);
62
63
RequestReply
(
const
RequestReply
& orig) =
delete
;
64
65
~
RequestReply
()
override
;
66
67
bool
addRPCProcessor(
const
char
* name,
RPCProcessor
* rpc_processor,
bool
force =
false
);
68
69
bool
configure(
parameter_server::Parameter
* root,
const
std::string& path);
70
71
bool
start(
const
ConnectionData
& conn_data);
72
73
void
iterate(
unsigned
int
timeout_micro_s = 50);
74
75
void
stop();
76
77
private
:
78
79
static
void
pipeEvent(nng_pipe pipe, nng_pipe_ev pipe_ev,
void
* ptr);
80
81
// needed for older g++ in yocto
82
static
void
pipeEvent(nng_pipe pipe,
int
pipe_ev,
void
* ptr);
83
84
static
bool
process(
Work
* work);
85
86
static
void
serverCb(
void
* arg);
87
88
static
struct
Work
* allocWork(nng_socket sock);
89
90
91
#ifdef __clang__
92
#pragma clang diagnostic push
93
#pragma clang diagnostic ignored "-Wunused-private-field"
94
#endif
95
struct
utils::PageFaults
page_faults_{};
96
#ifdef __clang__
97
#pragma clang diagnostic pop
98
#endif
99
100
nng_socket sock_{};
101
union
{
102
nng_listener listener;
103
nng_dialer dialer;
104
} endpoint_;
105
bool
is_listener_{};
106
nng_tls_config* tls_cfg_{
nullptr
};
107
utils::MpmcQueue<PipeEvent>
pipe_event_queue_{SOCKET_EVENT_QUEUE};
108
std::vector<struct Work*> works_;
109
std::map<uint32_t, RPCProcessor*> rpc_processor_map_;
110
111
AuthorizationBase
* authorization_ptr_{};
112
ParameterTree
parameter_tree_;
113
RPCMotorcortex
rpc_motorcortex_;
114
115
};
116
117
}
// namespace comm
118
119
}
// namespace mcx
120
121
#endif
/*CM_REQREP_H*/
mcx::comm::GroupManager
Definition:
cm_groupmanager.h:55
mcx::parameter_server::Parameter
Definition:
ps_parameter.h:45
mcx::comm::BaseRequestReply
Definition:
cm_basereqrep.h:18
mcx::comm::ConnectionData
Definition:
cm_conndata.h:18
mcx::comm::RPCProcessor
Definition:
cm_rpcprocessor.h:13
mcx::comm::RPCMotorcortex
Definition:
cm_rpcmotorcortex.h:31
mcx::comm::AuthorizationBase
Definition:
cm_authorization.h:29
mcx::comm::PipeEvent
Definition:
cm_reqrep.h:32
mcx::comm::RequestReply
Definition:
cm_reqrep.h:56
mcx::utils::PageFaults
Page faults.
Definition:
utl_realtime.h:49
mcx::comm::ParameterTree
Definition:
cm_paramtree.h:30
mcx::comm::Work
Definition:
cm_reqrep.h:42
mcx::utils::MpmcQueue
Definition:
utl_buffer.h:70
communication
cm_reqrep.h
Generated by
1.8.18