Remote Call Framework 3.4
SubscriptionService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2023, Delta V Software. All rights reserved.
6 // https://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF under GPL terms.
12 //
13 // Version: 3.4
14 // Contact: support <at> deltavsoft.com
15 //
16 //******************************************************************************
17 
18 #ifndef INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
19 #define INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
20 
21 #include <functional>
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <utility>
27 
28 #include <RCF/ClientStub.hpp>
29 #include <RCF/Export.hpp>
30 #include <RCF/RcfClient.hpp>
31 #include <RCF/RcfFwd.hpp>
32 #include <RCF/PeriodicTimer.hpp>
33 #include <RCF/ServerStub.hpp>
34 #include <RCF/Service.hpp>
35 
36 namespace RCF {
37 
39  class RCF_EXPORT Subscription : Noncopyable
40  {
41  private:
42 
44  SubscriptionService & subscriptionService,
45  ClientTransportUniquePtr clientTransportUniquePtr,
46  RcfSessionWeakPtr rcfSessionWeakPtr,
47  std::uint32_t incomingPingIntervalMs,
48  const std::string & publisherUrl,
49  const std::string & topic,
50  OnSubscriptionDisconnect onDisconnect);
51 
52  void setWeakThisPtr(SubscriptionWeakPtr thisWeakPtr);
53 
54  void setPublisherConnectionGuid(const std::string& connectionGuid);
55 
56  public:
57 
58  ~Subscription();
59 
60  unsigned int getPingTimestamp();
61  RcfSessionPtr getRcfSessionPtr();
62 
64  bool isConnected();
65 
67  void close();
68 
69  std::string getPublisherConnectionGuid() const;
70 
71  private:
72  friend class SubscriptionService;
73 
74  static void onDisconnect(SubscriptionWeakPtr subPtr, RcfSession & session);
75 
76  SubscriptionService & mSubscriptionService;
77  SubscriptionWeakPtr mThisWeakPtr;
78 
79  RecursiveMutex mMutex;
80  RcfSessionWeakPtr mRcfSessionWeakPtr;
81 
82  RcfClientPtr mConnectionPtr;
83 
84  std::uint32_t mPubToSubPingIntervalMs = 0;
85  bool mPublisherSupportsPubSubPings = true;
86  std::string mPublisherUrl;
87  std::string mTopic;
88  std::string mPublisherConnectionGuid;
89 
90  OnSubscriptionDisconnect mOnDisconnect;
91  bool mClosed = false;
92  };
93 
95  class RCF_EXPORT SubscriptionParms
96  {
97  public:
99 
101  void setTopicName(const std::string & publisherName);
102 
104  std::string getTopicName() const;
105 
107  void setPublisherEndpoint(const Endpoint & publisherEp);
108 
110  void setPublisherEndpoint(I_RcfClient & rcfClient);
111 
113  void setOnSubscriptionDisconnect(OnSubscriptionDisconnect onSubscriptionDisconnect);
114 
116  void setOnAsyncSubscribeCompleted(OnAsyncSubscribeCompleted onAsyncSubscribeCompleted);
117 
118  private:
119 
120  friend class SubscriptionService;
121 
122  std::string mPublisherName;
123  ClientStub mClientStub;
124  OnSubscriptionDisconnect mOnDisconnect;
125  OnAsyncSubscribeCompleted mOnAsyncSubscribeCompleted;
126  };
127 
128  class RCF_EXPORT SubscriptionService :
129  public I_Service,
130  Noncopyable
131  {
132  public:
133 
134  SubscriptionService(std::uint32_t pingIntervalMs = 0);
135 
136  ~SubscriptionService();
137 
138  template<typename Interface, typename T>
139  SubscriptionPtr createSubscription(
140  T & t,
141  const SubscriptionParms & parms)
142  {
143  std::string defaultPublisherName = getInterfaceName((Interface *) NULL);
144 
145  std::reference_wrapper<T> refWrapper(t);
146 
147  RcfClientPtr rcfClientPtr(
148  createServerStub((Interface *) 0, (T *) 0, refWrapper));
149 
150  return createSubscriptionImpl(rcfClientPtr, parms, defaultPublisherName);
151  }
152 
153  template<typename Interface, typename T>
154  SubscriptionPtr createSubscription(
155  T & t,
156  const RCF::Endpoint & publisherEp)
157  {
158  SubscriptionParms parms;
159  parms.setPublisherEndpoint(publisherEp);
160  return createSubscription<Interface>(t, parms);
161  }
162 
163  SubscriptionPtr createSubscriptionImpl(
164  RcfClientPtr rcfClientPtr,
165  const SubscriptionParms & parms,
166  const std::string & defaultPublisherName);
167 
168  void createSubscriptionImplBegin(
169  RcfClientPtr rcfClientPtr,
170  const SubscriptionParms & parms,
171  const std::string & defaultPublisherName);
172 
173  void createSubscriptionImplEnd(
174  ExceptionPtr ePtr,
175  ClientStubPtr clientStubPtr,
176  std::int32_t ret,
177  const std::string & publisherName,
178  RcfClientPtr rcfClientPtr,
179  OnSubscriptionDisconnect onDisconnect,
180  OnAsyncSubscribeCompleted onCompletion,
181  std::uint32_t pubToSubPingIntervalMs,
182  bool pingsEnabled);
183 
184  void closeSubscription(SubscriptionWeakPtr subscriptionPtr);
185 
186  void setPingIntervalMs(std::uint32_t pingIntervalMs);
187  std::uint32_t getPingIntervalMs() const;
188 
189  private:
190 
191  void onServerStart(RcfServer &server);
192  void onServerStop(RcfServer &server);
193 
194  RcfServer * mpServer;
195  Mutex mSubscriptionsMutex;
196 
197  typedef std::set<SubscriptionWeakPtr> Subscriptions;
198  Subscriptions mSubscriptions;
199 
200  std::uint32_t mPingIntervalMs;
201  PeriodicTimer mPeriodicTimer;
202 
203  virtual void onTimer();
204  void pingAllSubscriptions();
205  void harvestExpiredSubscriptions();
206 
207  static void sOnPingCompleted(RecursiveLockPtr lockPtr);
208 
209  public:
210 
211  SubscriptionPtr onRequestSubscriptionCompleted(
212  std::int32_t ret,
213  const std::string & publisherName,
214  ClientStub & clientStub,
215  RcfClientPtr rcfClientPtr,
216  OnSubscriptionDisconnect onDisconnect,
217  std::uint32_t pubToSubPingIntervalMs,
218  bool pingsEnabled);
219 
220  private:
221 
222  std::int32_t doRequestSubscription(
223  ClientStub & clientStubOrig,
224  const std::string & publisherName,
225  std::uint32_t subToPubPingIntervalMs,
226  std::uint32_t & pubToSubPingIntervalMs,
227  bool & pingsEnabled);
228 
229  void doRequestSubscriptionAsync(
230  ClientStub & clientStubOrig,
231  const std::string & publisherName,
232  RcfClientPtr rcfClientPtr,
233  const SubscriptionParms & parms);
234 
235  void doRequestSubscriptionAsync_Complete(
236  Future<Void> fv,
237  RcfClientPtr requestClientPtr,
238  const std::string & publisherName,
239  RcfClientPtr rcfClientPtr,
240  OnSubscriptionDisconnect onDisconnect,
241  OnAsyncSubscribeCompleted onCompletion);
242 
243  // Legacy subscription requests.
244 
245  std::int32_t doRequestSubscription_Legacy(
246  ClientStub & clientStubOrig,
247  const std::string & publisherName,
248  std::uint32_t subToPubPingIntervalMs,
249  std::uint32_t & pubToSubPingIntervalMs,
250  bool & pingsEnabled);
251 
252  void doRequestSubscriptionAsync_Legacy(
253  ClientStub & clientStubOrig,
254  const std::string & publisherName,
255  RcfClientPtr rcfClientPtr,
256  const SubscriptionParms & parms);
257 
258  void doRequestSubscriptionAsync_Legacy_Complete(
259  ClientStubPtr clientStubPtr,
261  const std::string & publisherName,
262  RcfClientPtr rcfClientPtr,
263  OnSubscriptionDisconnect onDisconnect,
264  OnAsyncSubscribeCompleted onCompletion,
265  Future<std::uint32_t> pubToSubPingIntervalMs,
266  bool pingsEnabled);
267 
268 
269  };
270 
271  typedef std::shared_ptr<SubscriptionService> SubscriptionServicePtr;
272 
273 } // namespace RCF
274 
275 #endif // ! INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
Base class of all RcfClient<> templates.
Definition: RcfClient.hpp:44
Represents a server side session, associated with a client connection.
Definition: RcfSession.hpp:64
Controls the client side of a RCF connection.
Definition: ClientStub.hpp:82
std::unique_ptr< ClientTransport > ClientTransportUniquePtr
Unique pointer wrapper for RCF::ClientTransport.
Definition: RcfFwd.hpp:43
void setPublisherEndpoint(const Endpoint &publisherEp)
Sets the network endpoint of the publishing server.
Provides the ability for remote calls to be executed asynchronously.
Definition: Future.hpp:50
Provides RCF server-side functionality.
Definition: RcfServer.hpp:53
Base class for all network endpoint types.
Definition: Endpoint.hpp:40
std::function< void(RcfSession &)> OnSubscriptionDisconnect
Describes a user-provided callback function to be called on the subscriber side, whenever a subscribe...
Definition: RcfFwd.hpp:77
General configuration of a subscription.
Definition: SubscriptionService.hpp:95
Definition: AmiIoHandler.hpp:23
Represents a subscription to a RCF publisher. To create a subscription, use RcfServer::createSubscription().
Definition: SubscriptionService.hpp:39
std::function< void(SubscriptionPtr, ExceptionPtr)> OnAsyncSubscribeCompleted
Describes a user-provided callback function to be called on the subscriber side, when a subscription...
Definition: RcfFwd.hpp:80