RpcService.h
1 #pragma once
2 
3 #include "stormancer/BuildConfig.h"
4 
5 #include <memory>
6 #include <string>
7 #include <unordered_map>
8 
9 #include "stormancer/Tasks.h"
10 #include "rxcpp/rx.hpp"
11 #include "stormancer/RPC/RpcRequestContext.h"
12 #include "stormancer/Scene.h"
13 #include "stormancer/IActionDispatcher.h"
15 #include "stormancer/Logger/ILogger.h"
16 #include "stormancer/MessageOriginFilter.h"
17 #include "stormancer/Packet.h"
18 
19 
20 namespace Stormancer
21 {
22  class ILogger;
23  class RpcPlugin;
24  class RpcRequest;
25  class RpcService
26  {
27  friend class RpcPlugin;
28 
29  public:
30 
31 #pragma region public_methods
32 
33  RpcService(std::weak_ptr<Scene> scene, std::shared_ptr<IActionDispatcher> dispatcher);
34 
35  virtual ~RpcService();
36 
41  {
45  MessageOriginFilter filter = MessageOriginFilter::Host;
46 
50  bool ordered = false;
51 
55  DispatchMethod dispatchMethod = DispatchMethod::ActionDispatcher;
56  };
57 
59  void addProcedure(const std::string& route, std::function<pplx::task<void>(RpcRequestContext_ptr)> handler, MessageOriginFilter filter = MessageOriginFilter::Host, bool ordered = false)
60  {
61  ServerRpcOptions options;
62  options.filter = filter;
63  options.ordered = ordered;
64  addProcedure(route, handler, options);
65  }
66 
67  void addProcedure(const std::string& route, std::function<pplx::task<void>(RpcRequestContext_ptr)> handler, const ServerRpcOptions& options);
68 
69  uint16 pendingRequests();
70 
72  void cancelAll(const std::string& reason);
73 
74  std::shared_ptr<IActionDispatcher> getDispatcher();
75 
80  {
85 
89  DispatchMethod dispatchMethod = DispatchMethod::ActionDispatcher;
90  };
91 
93  rxcpp::observable<Packetisp_ptr> rpcObservable(const std::string& route, const StreamWriter& streamWriter, PacketPriority priority = PacketPriority::MEDIUM_PRIORITY)
94  {
95  ClientRpcOptions options;
96  options.priority = priority;
97  return rpcObservable(route, streamWriter, options);
98  }
99 
100  rxcpp::observable<Packetisp_ptr> rpcObservable(const std::string& route, const StreamWriter& streamWriter, const ClientRpcOptions& options);
101 
102  template<typename TOut = void, typename... TIn>
103  pplx::task<TOut> rpc(const std::string& route, pplx::cancellation_token ct, const TIn& ... args)
104  {
105  auto& serializer = _serializer;
106  auto streamWriter = [&serializer, &args...](obytestream& stream)
107  {
108  serializer.serialize(stream, args...);
109  };
110  return rpcImpl<TOut>(rpcObservable(route, streamWriter), route, ct);
111  }
112 
113  template<typename TOut = void, typename TStreamWriter>
114  typename std::enable_if<std::is_convertible<TStreamWriter, StreamWriter>::value, pplx::task<TOut>>::type rpc(const std::string& route, pplx::cancellation_token ct, const TStreamWriter& streamWriter)
115  {
116  return rpcImpl<TOut>(rpcObservable(route, streamWriter), route, ct);
117  }
118 
119  template<typename TOut = void, typename... TIn>
120  pplx::task<TOut> rpc(const std::string& route, const TIn& ... args)
121  {
122  return rpc<TOut, TIn...>(route, pplx::cancellation_token::none(), args...);
123  }
124 
125  template<typename TOut = void, typename TStreamWriter>
126  typename std::enable_if<std::is_convertible<TStreamWriter, StreamWriter>::value, pplx::task<TOut>>::type rpc(const std::string& route, const TStreamWriter& streamWriter)
127  {
128  return rpc<TOut, TStreamWriter>(route, pplx::cancellation_token::none(), streamWriter);
129  }
130 
131 #pragma endregion
132 
133  private:
134 
135 #pragma region private_classes
136 
137  struct RunningServerRpc
138  {
139  pplx::cancellation_token_source cts;
140  DispatchMethod dispatchMethod;
141 
142  RunningServerRpc(pplx::cancellation_token_source cts, DispatchMethod dispatchMethod)
143  : cts(cts)
144  , dispatchMethod(dispatchMethod)
145  {}
146  };
147 
148 #pragma endregion
149 
150 #pragma region private_methods
151 
152  template<typename TOut>
153  pplx::task<TOut> rpcImpl(rxcpp::observable<Packetisp_ptr> observable, const std::string& route, pplx::cancellation_token ct = pplx::cancellation_token::none())
154  {
155  pplx::task_completion_event<TOut> tce;
156 
157  auto serializer = _serializer;
158  auto onNext = [serializer, tce](Packetisp_ptr packet)
159  {
160  TOut out = serializer.deserializeOne<TOut>(packet->stream);
161  tce.set(out);
162  };
163 
164  auto onComplete = []()
165  {
166  };
167 
168  return rpcInternal(observable, tce, route, onNext, onComplete, ct);
169  }
170 
171  template<typename TOut>
172  pplx::task<TOut> rpcInternal(rxcpp::observable<Packetisp_ptr> observable, pplx::task_completion_event<TOut> tce, const std::string& route, const std::function<void(Packetisp_ptr)>& onNext, const std::function<void()>& onComplete, pplx::cancellation_token ct = pplx::cancellation_token::none())
173  {
174  auto logger = _logger;
175  auto onError = [logger, tce, route](std::exception_ptr error)
176  {
177  logger->log(LogLevel::Trace, "Rpc", "An exception occurred during the rpc '" + route + "'");
178  tce.set_exception(error);
179  };
180 
181  auto subscription = observable.subscribe(onNext, onError, onComplete);
182 
183  if (ct.is_cancelable())
184  {
185  ct.register_callback([subscription]()
186  {
187  if (subscription.is_subscribed())
188  {
189  subscription.unsubscribe();
190  }
191  });
192  }
193 
194  return create_task(tce, getDispatcher());
195  }
196 
197  void next(Packetisp_ptr packet);
198  void error(Packetisp_ptr packet);
199  void complete(Packetisp_ptr packet);
200  void cancel(Packetisp_ptr packet);
201 
202  uint16 reserveId();
203  std::shared_ptr<RpcRequest> getPendingRequest(Packetisp_ptr packet);
204  void eraseRequest(uint16 requestId);
205 
206 #pragma endregion
207 
208 #pragma region private_members
209 
210  std::shared_ptr<IActionDispatcher> _dispatcher;
211  std::unordered_map<uint16, std::shared_ptr<RpcRequest>> _pendingRequests;
212  std::mutex _pendingRequestsMutex;
213  std::unordered_map<uint16, RunningServerRpc> _runningRequests;
214  std::mutex _runningRequestsMutex;
215  std::weak_ptr<Scene> _scene;
216  Serializer _serializer;
217  const std::string _rpcServerChannelIdentifier = "RPC_server";
218  std::shared_ptr<Stormancer::ILogger> _logger;
219  uint16 _currentId = 0;
220 
221 #pragma endregion
222  };
223 
224  template<>
225  pplx::task<Packetisp_ptr> RpcService::rpcImpl(rxcpp::observable<Packetisp_ptr> observable, const std::string& route, pplx::cancellation_token ct);
226 
227  template<>
228  pplx::task<void> RpcService::rpcImpl(rxcpp::observable<Packetisp_ptr> observable, const std::string& route, pplx::cancellation_token ct);
229 }
void addProcedure(const std::string &route, std::function< pplx::task< void >(RpcRequestContext_ptr)> handler, MessageOriginFilter filter=MessageOriginFilter::Host, bool ordered=false)
Add a procedure to execute when the server send an RPC.
Definition: RpcService.h:59
DispatchMethod dispatchMethod
How the RPC response should be dispatched.
Definition: RpcService.h:89
bool ordered
Whether messages sent via RpcRequestContext::sendValue should be ordered for this RPC.
Definition: RpcService.h:50
Options for client-to-server RPCs.
Definition: RpcService.h:79
Definition: RpcPlugin.h:10
rxcpp::observable< Packetisp_ptr > rpcObservable(const std::string &route, const StreamWriter &streamWriter, PacketPriority priority=PacketPriority::MEDIUM_PRIORITY)
Send an RPC and returns an observable.
Definition: RpcService.h:93
MessageOriginFilter filter
Filter incoming RPCs based on sender type (Host and/or Peer).
Definition: RpcService.h:45
Definition: RpcService.h:25
Definition: obytestream.h:13
Options for server-to-client RPCs.
Definition: RpcService.h:40
@ MEDIUM_PRIORITY
Definition: PacketPriority.h:32
PacketPriority
These enumerations are used to describe when packets are delivered.
Definition: PacketPriority.h:21
PacketPriority priority
Priority of the RPC.
Definition: RpcService.h:84
This file contains enumerations for packet priority and reliability enumerations.
void cancelAll(const std::string &reason)
Cancel all RPCs.
DispatchMethod dispatchMethod
How to dispatch the RPC handler when an RPC call is received.
Definition: RpcService.h:55
TOutput deserializeOne(ibytestream &stream) const
Definition: Serializer.h:34