Apama  10.15.3.4
sag_connectivity_chain_managers.hpp
Go to the documentation of this file.
1 /*
2  * $Copyright (c) 2015-2018, 2020, 2022 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
3  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
4  */
5 
6 
7 #ifndef _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
8 #define _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
9 
11 #include <memory>
12 #include <unordered_map>
13 #include <vector>
14 
19 namespace com {
20 namespace softwareag {
21 namespace _DATAT_INTERNAL_CPP_NAMESPACE {
22 namespace chainmanagers {
23 
42 {
43 public:
80  virtual void onChannelCreated(const std::string &channel, Direction direction) = 0;
81 
98  virtual void onChannelDestroyed(const std::string &channel, Direction direction) = 0;
99 };
100 
101 template<typename TRANSPORT>
102 class AbstractChainManager; // forward decl for Host friend
103 
104 template<typename TRANSPORT>
105 class ChainManagerHost; // forward decl for Chain friend
106 
113  std::string pluginName;
114  map_t config;
115  PluginConfiguration(const std::string &pluginName, const map_t &config) : pluginName(pluginName), config(config.copy()) {}
116  PluginConfiguration(const PluginConfiguration &other) : pluginName(other.pluginName), config(other.config.copy()) {}
117  PluginConfiguration &operator=(const PluginConfiguration &other) {
118  if(this != &other) {
119  pluginName = other.pluginName;
120  config=other.config.copy();
121  }
122  return *this;
123  }
124  PluginConfiguration(PluginConfiguration&& other) = default;
125  PluginConfiguration &operator=(PluginConfiguration &&other) = default;
126  ~PluginConfiguration() = default;
127 
129  const std::string& getPluginName() const { return pluginName; }
130 
132  const map_t &getConfiguration() const { return config; }
133 };
134 
135 
148 struct ChainDefinition : public std::vector<PluginConfiguration> {
149  typedef std::vector<PluginConfiguration> vector_t;
150  ChainDefinition() {}
155  const map_t& getTransportConfig() const {
156  return back().getConfiguration();
157  }
158 };
159 
160 typedef std::unordered_map<std::string, ChainDefinition> ChainDefinitions;
161 
162 namespace converters_impl {
163 ChainDefinitions convertChainDefinitions(const map_t &chainDefinitions);
164 list_t convertChainDefinition(const ChainDefinition& chainDefinition);
165 }
166 
167 
174 template<typename TRANSPORT>
175 class Chain
176 {
177  // so host can call the private constructor
178  friend class ChainManagerHost<TRANSPORT>;
179  // not copyable - this is a singleton ptr, but does not own the lifetime.
180  Chain(const Chain &other) = delete;
181  Chain &operator=(const Chain &other) = delete;
182  Chain(const Chain &&other) = delete;
183  Chain &operator=(const Chain &&other) = delete;
184  void startImpl() const;
185 public:
186  ~Chain() = default; // default destructor is OK, though @private
202  const Chain &start() const {
203  startImpl();
204  return *this;
205  }
222  startImpl();
223  return *this;
224  }
226  void destroy();
227 
229  TRANSPORT *getTransport() const {
230  if (!transport) throw std::runtime_error("Cannot call getTransport after chain has been destroyed");
231  return transport;
232  }
233 
235  const std::string &getChainId() { return chainId; }
236 
237  typedef std::shared_ptr<Chain> ptr_t;
238 
239 
240 private:
241  Chain(TRANSPORT* transport, void* connectivityManager, const std::string &chainId, const std::string &managerName)
242  :transport(transport), connectivityManager(connectivityManager), chainId(chainId), managerName(managerName)
243  {}
244 
245  TRANSPORT* transport;
246  void* connectivityManager;
247  std::string chainId;
248  // just for error messages for now
249  std::string managerName;
250 };
251 
257 template<typename TRANSPORT>
258 class ChainManagerHost
259 {
260  // give friend status to ChainManager so it can get the hostptr
261  friend class AbstractChainManager<TRANSPORT>;
262 
264  ChainManagerHost(void *connectivityManager, const AbstractChainManager<TRANSPORT> *chainManager): connectivityManager(connectivityManager), chainManager(chainManager) {}
265 
266 public:
287  template<typename ... ARGS>
288  typename Chain<TRANSPORT>::ptr_t createChain(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, ARGS ... transportConstructorArgs) const;
289 
301  void addChannelLifecycleListener(ChannelLifecycleListener *listener, const std::string &channelPrefix) const;
302 private:
303 
304  template<typename TUPLE>
305  typename Chain<TRANSPORT>::ptr_t createChainImpl(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, const TUPLE& tuple) const;
306 
308  static sag_error_t deleteChannelLifecycleListener(sag_plugin_t callback);
310  static sag_error_t callOnChannelCreated(const char *managerLogName, sag_plugin_t callback, const char*, bool);
312  static sag_error_t callOnChannelDestroyed(const char *managerLogName, sag_plugin_t callback, const char*, bool);
314  void *connectivityManager;
316  const AbstractChainManager<TRANSPORT> *chainManager;
317 };
318 
319 
330 {
331 public:
337  const list_t &getSubscribeChannels() const { return subscribeChannels; }
338 
344  const std::string &getDefaultChannelTowardsHost() const { return defaultChannelTowardsHost; }
345 
349  ManagedTransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager,
350  const std::string &defaultChannelTowardsHost, const list_t subscribeChannels, void* reserved)
351  : TransportConstructorParameters(pluginName, chainId, config, connectivityManager, reserved),
352  defaultChannelTowardsHost(defaultChannelTowardsHost), subscribeChannels(subscribeChannels.copy())
353  {}
354 
355 private:
356  std::string defaultChannelTowardsHost;
357  list_t subscribeChannels;
358 };
359 
367 {
368  template <typename TRANSPORT> friend class AbstractChainManager;
369  // not copyable: ChainManagers should use the config, etc that are their field members.
371  ChainManagerConstructorParameters& operator=(const ChainManagerConstructorParameters &params) = delete;
372 public:
377  const map_t &getManagerConfig() const { return managerConfig; }
378 
382  const std::string &getManagerName() const { return managerName; }
383 
387  const std::string &getTransportPluginName() const { return transportPluginName; }
388 
392  ChainManagerConstructorParameters(const map_t &managerConfig, const std::string &managerName, const std::string &transportPluginName, void* connectivityManager, const map_t &chainDefs, void* reserved)
393  : managerName(managerName), transportPluginName(transportPluginName), managerConfig(managerConfig), chainDefs(chainDefs), connectivityManager(connectivityManager), reserved(reserved)
394  {}
395 
396 private:
397  std::string managerName;
398  std::string transportPluginName;
399  const map_t &managerConfig;
400  const map_t &chainDefs;
402  void* connectivityManager;
404  void* reserved;
405 };
406 
407 
408 
425 template<typename TRANSPORT>
426 class AbstractChainManager
427 {
428 
429 public:
430 
432  typedef TRANSPORT transport_t;
433 
436  :
437  logger("connectivity." + params.getManagerName() + "." + (params.getManagerName() == params.getTransportPluginName() ? "manager" : params.getTransportPluginName())),
438  managerName(params.getManagerName()), transportPluginName(params.getTransportPluginName()),
439  chainDefinitions(converters_impl::convertChainDefinitions(params.chainDefs)),
440  host(params.connectivityManager, this),
441  config(params.getManagerConfig().copy()),
442  statusReporter(new StatusReporter(params.connectivityManager))
443  {}
446 
452  virtual void start() {}
453 
468  virtual void shutdown() {}
469 
474  return *statusReporter;
475  }
476 
484  const Logger logger;
485 
489  const std::string managerName;
490 
494  const std::string transportPluginName;
495 
496 protected:
497  // typedefs for templated classes that are useful when writing subclasses
498 
500  typedef typename Chain<TRANSPORT>::ptr_t chain_t;
503 
505  ChainDefinitions chainDefinitions;
506 
511 
514 
519  if(chainDefinitions.size() != 1) throw std::runtime_error("This manager only supports a single chain definition, but there was not exactly one specified.");
520  return chainDefinitions.begin()->second;
521  }
526  const ChainDefinition &getChainDefinition(const std::string &str) {
527  auto it = chainDefinitions.find(str);
528  if(it == chainDefinitions.end()) throw std::runtime_error("Cannot find chain definition "+str);
529  return it->second;
530  }
531 
532 private:
533  std::unique_ptr<StatusReporter> statusReporter;
534 };
535 
536 
537 // ***********************************************************************************************
538 
539 
540 
541 }}
542 
543 namespace connectivity { namespace chainmanagers { using namespace _DATAT_INTERNAL_CPP_NAMESPACE::chainmanagers; } }
544 
545 }} // com.softwareag.connectivity.chainmanagers
546 
547 #include <sag_internal/chain_managers.hpp>
548 
549 
562 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(Class)
563 
564 #endif // _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
const ChainManagerHost< TRANSPORT > host
The interface through which the host provides services required by chain managers.
Definition: sag_connectivity_chain_managers.hpp:510
API for Chain Manager to listen for changes in channel subscriptions within the correlator.
Definition: sag_connectivity_chain_managers.hpp:41
Base class for transport chain manager plug-ins.
Definition: sag_connectivity_chain_managers.hpp:102
Chain & start()
Calls start on all of the plug-ins in the chain and connects the chain to receive events.
Definition: sag_connectivity_chain_managers.hpp:221
API provided to Chain Managers for callbacks into the host, and for operations such as creating chain...
Definition: sag_connectivity_chain_managers.hpp:105
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host.
Definition: sag_connectivity_plugins.hpp:68
virtual void start()
Called after the chain manager is created to indicate that it can make external connections,...
Definition: sag_connectivity_chain_managers.hpp:452
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:71
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_chain_managers.hpp:513
const std::string & getManagerName() const
Definition: sag_connectivity_chain_managers.hpp:382
A container for parameters passed to the constructor of a managed transport (i.e.
Definition: sag_connectivity_chain_managers.hpp:329
virtual ~AbstractChainManager()
Virtual destructor.
Definition: sag_connectivity_chain_managers.hpp:445
const ChainDefinition & getChainDefinition(const std::string &str)
Helper method to get the chain definition containing the transport associated with this manager,...
Definition: sag_connectivity_chain_managers.hpp:526
Chain< TRANSPORT >::ptr_t chain_t
The chain that this host can create.
Definition: sag_connectivity_chain_managers.hpp:500
AbstractChainManager(const ChainManagerConstructorParameters &params)
Give the manager instance name, a host object for callbacks, the configuration specified in managerCo...
Definition: sag_connectivity_chain_managers.hpp:435
const std::string managerName
The name used in the configuration for this dynamic chain manager.
Definition: sag_connectivity_chain_managers.hpp:489
const ChainDefinition & getChainDefinition()
Helper method to get the chain definition containing the transport associated with this manager,...
Definition: sag_connectivity_chain_managers.hpp:518
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:221
const map_t & getManagerConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_chain_managers.hpp:377
A list class which implements many of the functions on std::vector.
Definition: sag_connectivity_cpp.hpp:33
const map_t & getTransportConfig() const
Definition: sag_connectivity_chain_managers.hpp:155
const Logger logger
A logger to be used by this manager for anything which needs to be written to the host's log.
Definition: sag_connectivity_chain_managers.hpp:484
A plug-in configuration.
Definition: sag_connectivity_chain_managers.hpp:112
Contains the headers needed to implement your own Connectivity Plug-ins.
virtual ~ChannelLifecycleListener()
Virtual destructor.
Definition: sag_connectivity_chain_managers.hpp:45
const std::string & getTransportPluginName() const
Definition: sag_connectivity_chain_managers.hpp:387
const std::string & getDefaultChannelTowardsHost() const
Get the default channel used on this chain for messages sent from the transport towards the host.
Definition: sag_connectivity_chain_managers.hpp:344
const std::string & getPluginName() const
while we override the copy behaviour, no change to destructor
Definition: sag_connectivity_chain_managers.hpp:129
const map_t & getConfiguration() const
Definition: sag_connectivity_chain_managers.hpp:132
ChainDefinitions chainDefinitions
string-> map of chain definitions
Definition: sag_connectivity_chain_managers.hpp:505
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:35
const std::string & getChainId()
Get the unique identifier of this chain.
Definition: sag_connectivity_chain_managers.hpp:235
const std::string transportPluginName
The name used in the configuration for the transport plug-in that created this chain manager.
Definition: sag_connectivity_chain_managers.hpp:494
virtual void shutdown()
Release any resources created by the connectivity plug-in, and terminate and join any background thre...
Definition: sag_connectivity_chain_managers.hpp:468
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:242
Represents a dynamic chain instance, created using ChainManagerHost.createChain.
Definition: sag_connectivity_chain_managers.hpp:175
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as connected/disconnected status and numb...
Definition: sag_connectivity_chain_managers.hpp:473
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:182
const list_t & getSubscribeChannels() const
Get the channels that the chain containing this transport instance is subscribed to for receiving mes...
Definition: sag_connectivity_chain_managers.hpp:337
A container for parameters passed to the constructor of a dynamic chain manager.
Definition: sag_connectivity_chain_managers.hpp:366
const Chain & start() const
Calls start on all of the plug-ins in the chain and connects the chain to receive events.
Definition: sag_connectivity_chain_managers.hpp:202
AbstractChainManager< TRANSPORT > abstract_chain_manager_t
The base class.
Definition: sag_connectivity_chain_managers.hpp:502
A dynamic chain definition, providing the configuration for each plug-in in a chain.
Definition: sag_connectivity_chain_managers.hpp:148
TRANSPORT * getTransport() const
Get the transport plug-in at the end of the chain.
Definition: sag_connectivity_chain_managers.hpp:229
TRANSPORT transport_t
The transport that this host can create chains for.
Definition: sag_connectivity_chain_managers.hpp:432