Remote Call Framework 3.0
SubscriptionService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2018, Delta V Software. All rights reserved.
6 // http://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF
12 // under GPL terms.
13 //
14 // Version: 3.0
15 // Contact: support <at> deltavsoft.com
16 //
17 //******************************************************************************
18 
19 #ifndef INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
20 #define INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 
29 #include <RCF/ClientStub.hpp>
30 #include <RCF/Export.hpp>
31 #include <RCF/RcfClient.hpp>
32 #include <RCF/RcfFwd.hpp>
33 #include <RCF/PeriodicTimer.hpp>
34 #include <RCF/ServerStub.hpp>
35 #include <RCF/Service.hpp>
36 
37 namespace RCF {
38 
40  class RCF_EXPORT Subscription : Noncopyable
41  {
42  private:
43 
45  SubscriptionService & subscriptionService,
46  ClientTransportUniquePtr clientTransportUniquePtr,
47  RcfSessionWeakPtr rcfSessionWeakPtr,
48  std::uint32_t incomingPingIntervalMs,
49  const std::string & publisherUrl,
50  const std::string & topic,
51  OnSubscriptionDisconnect onDisconnect);
52 
53  void setWeakThisPtr(SubscriptionWeakPtr thisWeakPtr);
54 
55  public:
56 
57  ~Subscription();
58 
59  unsigned int getPingTimestamp();
60  RcfSessionPtr getRcfSessionPtr();
61 
63  bool isConnected();
64 
66  void close();
67 
68  private:
69  friend class SubscriptionService;
70 
71  static void onDisconnect(SubscriptionWeakPtr subPtr, RcfSession & session);
72 
73  SubscriptionService & mSubscriptionService;
74  SubscriptionWeakPtr mThisWeakPtr;
75 
76  RecursiveMutex mMutex;
77  RcfSessionWeakPtr mRcfSessionWeakPtr;
78 
79  RcfClientPtr mConnectionPtr;
80 
81  std::uint32_t mPubToSubPingIntervalMs = 0;
82  bool mPublisherSupportsPubSubPings = true;
83  std::string mPublisherUrl;
84  std::string mTopic;
85 
86  OnSubscriptionDisconnect mOnDisconnect;
87  bool mClosed = false;
88  };
89 
91  class RCF_EXPORT SubscriptionParms
92  {
93  public:
95 
97  void setTopicName(const std::string & publisherName);
98 
100  std::string getTopicName() const;
101 
103  void setPublisherEndpoint(const Endpoint & publisherEp);
104 
106  void setPublisherEndpoint(I_RcfClient & rcfClient);
107 
109  void setOnSubscriptionDisconnect(OnSubscriptionDisconnect onSubscriptionDisconnect);
110 
112  void setOnAsyncSubscribeCompleted(OnAsyncSubscribeCompleted onAsyncSubscribeCompleted);
113 
114  private:
115 
116  friend class SubscriptionService;
117 
118  std::string mPublisherName;
119  ClientStub mClientStub;
120  OnSubscriptionDisconnect mOnDisconnect;
121  OnAsyncSubscribeCompleted mOnAsyncSubscribeCompleted;
122  };
123 
124  class RCF_EXPORT SubscriptionService :
125  public I_Service,
126  Noncopyable
127  {
128  public:
129 
130  SubscriptionService(std::uint32_t pingIntervalMs = 0);
131 
132  ~SubscriptionService();
133 
134  template<typename Interface, typename T>
135  SubscriptionPtr createSubscription(
136  T & t,
137  const SubscriptionParms & parms)
138  {
139  std::string defaultPublisherName = getInterfaceName((Interface *) NULL);
140 
141  std::reference_wrapper<T> refWrapper(t);
142 
143  RcfClientPtr rcfClientPtr(
144  createServerStub((Interface *) 0, (T *) 0, refWrapper));
145 
146  return createSubscriptionImpl(rcfClientPtr, parms, defaultPublisherName);
147  }
148 
149  template<typename Interface, typename T>
150  SubscriptionPtr createSubscription(
151  T & t,
152  const RCF::Endpoint & publisherEp)
153  {
154  SubscriptionParms parms;
155  parms.setPublisherEndpoint(publisherEp);
156  return createSubscription<Interface>(t, parms);
157  }
158 
159  SubscriptionPtr createSubscriptionImpl(
160  RcfClientPtr rcfClientPtr,
161  const SubscriptionParms & parms,
162  const std::string & defaultPublisherName);
163 
164  void createSubscriptionImplBegin(
165  RcfClientPtr rcfClientPtr,
166  const SubscriptionParms & parms,
167  const std::string & defaultPublisherName);
168 
169  void createSubscriptionImplEnd(
170  ExceptionPtr ePtr,
171  ClientStubPtr clientStubPtr,
172  std::int32_t ret,
173  const std::string & publisherName,
174  RcfClientPtr rcfClientPtr,
175  OnSubscriptionDisconnect onDisconnect,
176  OnAsyncSubscribeCompleted onCompletion,
177  std::uint32_t pubToSubPingIntervalMs,
178  bool pingsEnabled);
179 
180  void closeSubscription(SubscriptionWeakPtr subscriptionPtr);
181 
182  void setPingIntervalMs(std::uint32_t pingIntervalMs);
183  std::uint32_t getPingIntervalMs() const;
184 
185  private:
186 
187  void onServerStart(RcfServer &server);
188  void onServerStop(RcfServer &server);
189 
190  RcfServer * mpServer;
191  Mutex mSubscriptionsMutex;
192 
193  typedef std::set<SubscriptionWeakPtr> Subscriptions;
194  Subscriptions mSubscriptions;
195 
196  std::uint32_t mPingIntervalMs;
197  PeriodicTimer mPeriodicTimer;
198 
199  virtual void onTimer();
200  void pingAllSubscriptions();
201  void harvestExpiredSubscriptions();
202 
203  static void sOnPingCompleted(RecursiveLockPtr lockPtr);
204 
205  public:
206 
207  SubscriptionPtr onRequestSubscriptionCompleted(
208  std::int32_t ret,
209  const std::string & publisherName,
210  ClientStub & clientStub,
211  RcfClientPtr rcfClientPtr,
212  OnSubscriptionDisconnect onDisconnect,
213  std::uint32_t pubToSubPingIntervalMs,
214  bool pingsEnabled);
215 
216  private:
217 
218  std::int32_t doRequestSubscription(
219  ClientStub & clientStubOrig,
220  const std::string & publisherName,
221  std::uint32_t subToPubPingIntervalMs,
222  std::uint32_t & pubToSubPingIntervalMs,
223  bool & pingsEnabled);
224 
225  void doRequestSubscriptionAsync(
226  ClientStub & clientStubOrig,
227  const std::string & publisherName,
228  RcfClientPtr rcfClientPtr,
229  const SubscriptionParms & parms);
230 
231  void doRequestSubscriptionAsync_Complete(
232  Future<Void> fv,
233  RcfClientPtr requestClientPtr,
234  const std::string & publisherName,
235  RcfClientPtr rcfClientPtr,
236  OnSubscriptionDisconnect onDisconnect,
237  OnAsyncSubscribeCompleted onCompletion);
238 
239  // Legacy subscription requests.
240 
241  std::int32_t doRequestSubscription_Legacy(
242  ClientStub & clientStubOrig,
243  const std::string & publisherName,
244  std::uint32_t subToPubPingIntervalMs,
245  std::uint32_t & pubToSubPingIntervalMs,
246  bool & pingsEnabled);
247 
248  void doRequestSubscriptionAsync_Legacy(
249  ClientStub & clientStubOrig,
250  const std::string & publisherName,
251  RcfClientPtr rcfClientPtr,
252  const SubscriptionParms & parms);
253 
254  void doRequestSubscriptionAsync_Legacy_Complete(
255  ClientStubPtr clientStubPtr,
257  const std::string & publisherName,
258  RcfClientPtr rcfClientPtr,
259  OnSubscriptionDisconnect onDisconnect,
260  OnAsyncSubscribeCompleted onCompletion,
261  Future<std::uint32_t> pubToSubPingIntervalMs,
262  bool pingsEnabled);
263 
264 
265  };
266 
267  typedef std::shared_ptr<SubscriptionService> SubscriptionServicePtr;
268 
269 } // namespace RCF
270 
271 #endif // ! INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
Base class of all RcfClient<> templates.
Definition: RcfClient.hpp:45
Represents a server side session, associated with a client connection.
Definition: RcfSession.hpp:67
Controls the client side of a RCF connection.
Definition: ClientStub.hpp:69
std::unique_ptr< ClientTransport > ClientTransportUniquePtr
Unique pointer wrapper for RCF::ClientTransport.
Definition: RcfFwd.hpp:43
void setPublisherEndpoint(const Endpoint &publisherEp)
Sets the network endpoint of the publishing server.
Provides the ability for remote calls to be executed asynchronously.
Definition: Future.hpp:51
Provides RCF server-side functionality.
Definition: RcfServer.hpp:54
Base class for all network endpoint types.
Definition: Endpoint.hpp:41
std::function< void(RcfSession &)> OnSubscriptionDisconnect
Describes a user-provided callback function to be called on the subscriber side, whenever a subscribe...
Definition: RcfFwd.hpp:77
General configuration of a subscription.
Definition: SubscriptionService.hpp:91
Definition: AmiIoHandler.hpp:24
Represents a subscription to a RCF publisher. To create a subscription, use RcfServer::createSubscrip...
Definition: SubscriptionService.hpp:40
std::function< void(SubscriptionPtr, ExceptionPtr)> OnAsyncSubscribeCompleted
Describes a user-provided callback function to be called on the subscriber side, when a subscription...
Definition: RcfFwd.hpp:80