Motorcortex Core  version: 2.7.6
cm_reqrep.h
1 /*
2  * Developer : Alexey Zakharov (alexey.zakharov@vectioneer.com)
3  * All rights reserved. Copyright (c) 2019 VECTIONEER.
4  */
5 
6 #ifndef CM_REQREP_H
7 #define CM_REQREP_H
8 
9 #include "cm_basereqrep.h"
10 #include "cm_rpcmotorcortex.h"
11 #include "cm_groupmanager.h"
12 #include "cm_paramtree.h"
13 #include "cm_conndata.h"
14 #include "utl_realtime.h"
15 #include "utl_buffer.h"
16 
17 #include <string>
18 #include <vector>
19 #include <nng/nng.h>
20 #include <nng/supplemental/tls/tls.h>
21 
22 namespace mcx {
23 
24 namespace parameter_server {
25 class Parameter;
26 }
27 
28 namespace comm {
29 
30 class AuthorizationBase;
31 
// Plain record pairing an nng pipe handle with an nng pipe-event code.
// It is the element type of the utils::MpmcQueue<PipeEvent> queues declared
// further down in this header (Work::pipe_event_queue and
// RequestReply::pipe_event_queue_).
// NOTE(review): presumably filled in by RequestReply::pipeEvent() and drained
// elsewhere - the producer/consumer sides are not visible in this header.
32 struct PipeEvent {
33  nng_pipe_ev event; // event code, as an nng_pipe_ev value
34  nng_pipe pipe; // pipe handle stored alongside the event
35 };
36 
// Ordered map from a 32-bit unsigned key to an RPCProcessor pointer.
// NOTE(review): what the key encodes is not visible here (addRPCProcessor()
// takes a name string) - presumably an id/hash derived from that name; confirm.
// NOTE(review): <map> is not included directly by this header; it is
// presumably pulled in through one of the project headers - confirm.
37 using RPCProcessorMap = std::map<uint32_t, RPCProcessor*>;
38 
39 // The server keeps a list of work items, sorted by expiration time,
40 // so that we can use this to set the timeout to the correct value for
41 // use in poll.
// Per-request work item: bundles the nng handles (aio, message, context) with
// pointers to the collaborators it needs and a state tag.
// Instances are handed to RequestReply::process() and returned by
// RequestReply::allocWork(); RequestReply keeps them in works_.
// NOTE(review): no expiration-time field is visible in this struct, so the
// "sorted by expiration time" remark in the comment above may be stale -
// confirm against the implementation file.
// NOTE(review): none of the members has a default initializer; presumably
// allocWork() fills them all in - confirm.
42 struct Work {
    // Stage tag for this work item (anonymous enum, so the values are visible
    // as Work::INIT, Work::RECV, Work::SEND, Work::FINISHED).
    // NOTE(review): presumably advanced by serverCb() - transitions not visible here.
43  enum {
44  INIT, RECV, SEND, FINISHED
45  } state;
46  nng_aio* aio; // nng asynchronous I/O handle
47  nng_msg* msg; // nng message associated with this item
48  nng_ctx ctx; // nng context handle
    // The pointers below are raw; ownership is not expressed in the types.
    // NOTE(review): presumably they refer to the matching RequestReply
    // members (pipe_event_queue_, authorization_ptr_, rpc_processor_map_) and
    // are non-owning - confirm.
49  utils::MpmcQueue<PipeEvent>* pipe_event_queue;
50  AuthorizationBase* authorization_ptr;
51  RPCProcessorMap* rpc_processor_map;
52  GroupManager* group_manager;
53  utils::PageFaults page_faults; // page-fault record (see utl_realtime.h)
54 };
55 
57 
// Value passed to the pipe_event_queue_ constructor below.
// NOTE(review): presumably the queue capacity in PipeEvent entries - confirm
// against utils::MpmcQueue in utl_buffer.h.
58  static constexpr auto SOCKET_EVENT_QUEUE = 128;
59 
60 public:
    // Creates the object for the given protocol type; SINGLE_TIMESTAMP is used
    // when no argument is supplied. Marked explicit, so a ProtocolType does
    // not convert implicitly.
    // NOTE(review): what protocol_type selects is defined elsewhere - not visible here.
61  explicit RequestReply(ProtocolType protocol_type = ProtocolType::SINGLE_TIMESTAMP);
62 
    // Copy construction is disabled.
    // NOTE(review): copy assignment is not declared in the visible class body;
    // unless a base class or member already prevents it, it may still be
    // generated - confirm this is intended.
63  RequestReply(const RequestReply& orig) = delete;
64 
    // Overrides the virtual destructor of the base class.
65  ~RequestReply() override;
66 
    // Adds an RPC processor identified by `name`. Returns a bool status.
    // `force` defaults to false.
    // NOTE(review): presumably true means the processor was registered and
    // `force` allows replacing an existing entry - confirm in the implementation.
67  bool addRPCProcessor(const char* name, RPCProcessor* rpc_processor, bool force = false);
68 
    // Configures the object from a parameter-tree node `root` and a `path`
    // string. Returns a bool status.
    // NOTE(review): presumably `path` is where this object's parameters are
    // attached under `root` - confirm.
69  bool configure(parameter_server::Parameter* root, const std::string& path);
70 
    // Starts operation with the supplied connection data. Returns a bool status.
    // NOTE(review): presumably opens sock_ as a listener or dialer (see
    // endpoint_ / is_listener_) - confirm.
71  bool start(const ConnectionData& conn_data);
72 
    // Performs one iteration; the timeout argument defaults to 50.
    // NOTE(review): the parameter name suggests microseconds - confirm the
    // unit and what exactly is bounded by it.
73  void iterate(unsigned int timeout_micro_s = 50);
74 
    // Stops operation.
    // NOTE(review): presumably the counterpart of start(); which resources
    // are released is not visible here.
75  void stop();
76 
77 private:
78 
    // Static callback taking a pipe handle, a pipe-event code and an opaque
    // user pointer.
    // NOTE(review): the signature looks like an nng pipe-notification
    // callback, and `ptr` is presumably the owning RequestReply or its
    // event queue - confirm.
79  static void pipeEvent(nng_pipe pipe, nng_pipe_ev pipe_ev, void* ptr);
80 
    // Overload of the callback above that takes the event code as a plain int.
81  // needed for older g++ in yocto
82  static void pipeEvent(nng_pipe pipe, int pipe_ev, void* ptr);
83 
    // Handles one work item. Returns a bool status.
    // NOTE(review): meaning of the return value is not visible here.
84  static bool process(Work* work);
85 
    // Static callback taking an opaque argument.
    // NOTE(review): presumably the nng aio completion callback, with `arg`
    // being a Work* - confirm.
86  static void serverCb(void* arg);
87 
    // Creates a Work item for the given socket and returns a raw pointer to it.
    // NOTE(review): who frees the returned item is not visible here.
88  static struct Work* allocWork(nng_socket sock);
89 
90 
// Under clang, the -Wunused-private-field diagnostic is suppressed for the
// page_faults_ member only (push/ignore before it, pop right after).
// NOTE(review): presumably the member is unused in some build configurations - confirm.
91 #ifdef __clang__
92 #pragma clang diagnostic push
93 #pragma clang diagnostic ignored "-Wunused-private-field"
94 #endif
95  struct utils::PageFaults page_faults_{};
96 #ifdef __clang__
97 #pragma clang diagnostic pop
98 #endif
99 
100  nng_socket sock_{}; // nng socket handle, value-initialized
    // Holds either a listener or a dialer handle. Unlike its neighbours this
    // member has no default initializer.
    // NOTE(review): presumably is_listener_ tells which union member is
    // active - confirm.
101  union {
102  nng_listener listener;
103  nng_dialer dialer;
104  } endpoint_;
105  bool is_listener_{}; // value-initialized to false
106  nng_tls_config* tls_cfg_{nullptr}; // TLS configuration handle, null by default
107  utils::MpmcQueue<PipeEvent> pipe_event_queue_{SOCKET_EVENT_QUEUE}; // constructed with SOCKET_EVENT_QUEUE (128)
    // Raw pointers to Work items.
    // NOTE(review): presumably the items returned by allocWork(); where they
    // are released is not visible here.
108  std::vector<struct Work*> works_;
    // Same type as the RPCProcessorMap alias declared earlier in this header.
109  std::map<uint32_t, RPCProcessor*> rpc_processor_map_;
110 
111  AuthorizationBase* authorization_ptr_{}; // null by default; ownership not visible here
112  ParameterTree parameter_tree_;
113  RPCMotorcortex rpc_motorcortex_;
114 
115 };
116 
117 } // namespace comm
118 
119 } // namespace mcx
120 
121 #endif /*CM_REQREP_H*/
mcx::comm::GroupManager
Definition: cm_groupmanager.h:55
mcx::parameter_server::Parameter
Definition: ps_parameter.h:45
mcx::comm::BaseRequestReply
Definition: cm_basereqrep.h:18
mcx::comm::ConnectionData
Definition: cm_conndata.h:18
mcx::comm::RPCProcessor
Definition: cm_rpcprocessor.h:13
mcx::comm::RPCMotorcortex
Definition: cm_rpcmotorcortex.h:31
mcx::comm::AuthorizationBase
Definition: cm_authorization.h:29
mcx::comm::PipeEvent
Definition: cm_reqrep.h:32
mcx::comm::RequestReply
Definition: cm_reqrep.h:56
mcx::utils::PageFaults
Page faults.
Definition: utl_realtime.h:49
mcx::comm::ParameterTree
Definition: cm_paramtree.h:30
mcx::comm::Work
Definition: cm_reqrep.h:42
mcx::utils::MpmcQueue
Definition: utl_buffer.h:70