Apama  10.5.4.1
sag_connectivity_chain_managers.hpp
Go to the documentation of this file.
1 /*
2  * $Copyright (c) 2015-2018, 2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
3  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
4  * @Version: $Id: sag_connectivity_chain_managers.hpp 366779 2020-02-05 10:42:04Z matj $
5  */
6 
7 
8 #ifndef _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
9 #define _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
10 
12 #include <memory>
13 #include <unordered_map>
14 #include <vector>
15 
20 namespace com {
21 namespace softwareag {
22 namespace _DATAT_INTERNAL_CPP_NAMESPACE {
23 namespace chainmanagers {
24 
43 {
// ChannelLifecycleListener: API for a chain manager to listen for changes in
// channel subscriptions within the correlator. The class header and the
// virtual destructor are not present in this listing.
44 public:
    // Pure virtual callback for a created channel. Receives the channel name
    // and the Direction of message flow (towards the transport or towards the
    // host). NOTE(review): presumably invoked by the host for channels matching
    // the prefix given to addChannelLifecycleListener - confirm.
81  virtual void onChannelCreated(const std::string &channel, Direction direction) = 0;
82 
    // Pure virtual callback for a destroyed channel; same parameters as
    // onChannelCreated.
99  virtual void onChannelDestroyed(const std::string &channel, Direction direction) = 0;
100 };
101 
102 template<typename TRANSPORT>
103 class AbstractChainManager; // forward decl for Host friend
104 
105 template<typename TRANSPORT>
106 class ChainManagerHost; // forward decl for Chain friend
107 
// PluginConfiguration: a plug-in configuration, i.e. a plug-in name plus its
// configuration map. (The struct header line is not present in this listing.)
    // Name of the plug-in this configuration belongs to.
114  std::string pluginName;
    // Configuration map for the plug-in; always held as this object's own deep copy.
115  map_t config;
    // Constructs from a name and a configuration; the map is deep-copied via copy().
116  PluginConfiguration(const std::string &pluginName, const map_t &config) : pluginName(pluginName), config(config.copy()) {}
    // Copy constructor: copies the name and deep-copies the other object's map.
117  PluginConfiguration(const PluginConfiguration &other) : pluginName(other.pluginName), config(other.config.copy()) {}
    // Copy assignment: no-op on self-assignment, otherwise copies the name and
    // replaces the map with a deep copy of the other object's map.
118  PluginConfiguration &operator=(const PluginConfiguration &other) {
119  if(this != &other) {
120  pluginName = other.pluginName;
121  config=other.config.copy();
122  }
123  return *this;
124  }
    // Move construction/assignment and destruction use the compiler-generated
    // versions; only the copy operations need custom (deep-copy) behaviour.
125  PluginConfiguration(PluginConfiguration&& other) = default;
126  PluginConfiguration &operator=(PluginConfiguration &&other) = default;
127  ~PluginConfiguration() = default;
128 
    // Returns the plug-in name.
130  const std::string& getPluginName() const { return pluginName; }
131 
    // Returns the plug-in's configuration map (by const reference, no copy).
133  const map_t &getConfiguration() const { return config; }
134 };
135 
136 
// A dynamic chain definition, providing the configuration for each plug-in in
// a chain. It is a std::vector of PluginConfiguration.
149 struct ChainDefinition : public std::vector<PluginConfiguration> {
    // Alias for the underlying vector type.
150  typedef std::vector<PluginConfiguration> vector_t;
    // Creates an empty chain definition.
151  ChainDefinition() {}
    // Returns the configuration of the last PluginConfiguration in the vector
    // (the transport's configuration, going by the method name).
    // NOTE(review): back() is called without checking empty(); calling this on
    // an empty ChainDefinition is undefined behaviour, so callers must ensure
    // the definition holds at least one plug-in.
156  const map_t& getTransportConfig() const {
157  return back().getConfiguration();
158  }
159 };
160 
// Map from a string key to a ChainDefinition.
// NOTE(review): the key is presumably the chain definition's name/id - confirm.
161 typedef std::unordered_map<std::string, ChainDefinition> ChainDefinitions;
162 
// Conversion helpers between the map_t/list_t representation and the
// ChainDefinition types. Only declared here; no definitions appear in this listing.
163 namespace converters_impl {
// Builds a ChainDefinitions map from a map_t of chain definitions.
164 ChainDefinitions convertChainDefinitions(const map_t &chainDefinitions);
// Converts a single ChainDefinition into a list_t.
165 list_t convertChainDefinition(const ChainDefinition& chainDefinition);
166 }
167 
168 
// Represents a dynamic chain instance, created using ChainManagerHost.createChain.
// Instances are non-copyable and non-movable; the constructor is private and
// ChainManagerHost<TRANSPORT> is a friend so that it can call it.
175 template<typename TRANSPORT>
176 class Chain
177 {
178  // so host can call the private constructor
179  friend class ChainManagerHost<TRANSPORT>;
180  // not copyable - this is a singleton ptr, but does not own the lifetime.
181  Chain(const Chain &other) = delete;
182  Chain &operator=(const Chain &other) = delete;
183  Chain(const Chain &&other) = delete;
184  Chain &operator=(const Chain &&other) = delete;
    // Shared implementation behind both start() overloads; declared only in this listing.
185  void startImpl() const;
186 public:
187  ~Chain() = default; // default destructor is OK, though @private
    // Calls start on all of the plug-ins in the chain and connects the chain
    // to receive events. Returns *this so that calls can be chained.
203  const Chain &start() const {
204  startImpl();
205  return *this;
206  }
    // Body of the non-const start() overload (returns Chain&); its header line
    // is not present in this listing.
223  startImpl();
224  return *this;
225  }
    // Destroys the chain; declared only in this listing. getTransport() throws
    // once the chain has been destroyed (see its error message).
227  void destroy();
228 
    // Returns the transport plug-in at the end of the chain.
    // Throws std::runtime_error if the transport pointer is null, which the
    // error message attributes to the chain having been destroyed.
230  TRANSPORT *getTransport() const {
231  if (!transport) throw std::runtime_error("Cannot call getTransport after chain has been destroyed");
232  return transport;
233  }
234 
    // Returns the unique identifier of this chain. Const-qualified so that it
    // can be called through a const Chain, like getTransport() and start() const.
236  const std::string &getChainId() const { return chainId; }
237 
    // Shared-pointer handle type for chains.
238  typedef std::shared_ptr<Chain> ptr_t;
239 
240 
241 private:
    // Private constructor (used by the friend ChainManagerHost); stores the
    // given pointers and copies the two strings.
242  Chain(TRANSPORT* transport, void* connectivityManager, const std::string &chainId, const std::string &managerName)
243  :transport(transport), connectivityManager(connectivityManager), chainId(chainId), managerName(managerName)
244  {}
245 
    // Transport at the end of the chain; null is treated as "destroyed" by getTransport().
246  TRANSPORT* transport;
    // Opaque handle supplied at construction.
247  void* connectivityManager;
    // Unique identifier of this chain.
248  std::string chainId;
249  // just for error messages for now
250  std::string managerName;
251 };
252 
// API provided to chain managers for callbacks into the host, and for
// operations such as creating chains and registering channel lifecycle listeners.
258 template<typename TRANSPORT>
259 class ChainManagerHost
260 {
261  // give friend status to ChainManager so it can get the hostptr
262  friend class AbstractChainManager<TRANSPORT>;
263 
    // Private constructor (reachable by the friend AbstractChainManager);
    // stores the opaque connectivity manager handle and the owning chain manager.
265  ChainManagerHost(void *connectivityManager, const AbstractChainManager<TRANSPORT> *chainManager): connectivityManager(connectivityManager), chainManager(chainManager) {}
266 
267 public:
    // Creates a chain from the given chain definition and returns a shared
    // pointer to it. Declared only in this listing.
    // NOTE(review): transportConstructorArgs are presumably passed on to the
    // TRANSPORT constructor, and substitutions applied to the definition's
    // configuration - confirm against the implementation.
288  template<typename ... ARGS>
289  typename Chain<TRANSPORT>::ptr_t createChain(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, ARGS ... transportConstructorArgs) const;
290 
    // Registers a ChannelLifecycleListener together with a channel prefix.
    // Declared only in this listing.
    // NOTE(review): ownership of the raw listener pointer is not visible here;
    // the private deleteChannelLifecycleListener callback suggests the host
    // deletes it - confirm.
302  void addChannelLifecycleListener(ChannelLifecycleListener *listener, const std::string &channelPrefix) const;
303 private:
304 
    // Implementation helper for createChain taking the extra arguments as a
    // single TUPLE. Declared only in this listing.
305  template<typename TUPLE>
306  typename Chain<TRANSPORT>::ptr_t createChainImpl(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, const TUPLE& tuple) const;
307 
    // Static callbacks with C-style signatures (sag_plugin_t / sag_error_t).
    // NOTE(review): presumably handed to the host so it can delete a listener
    // and dispatch channel created/destroyed events - confirm.
309  static sag_error_t deleteChannelLifecycleListener(sag_plugin_t callback);
311  static sag_error_t callOnChannelCreated(const char *managerLogName, sag_plugin_t callback, const char*, bool);
313  static sag_error_t callOnChannelDestroyed(const char *managerLogName, sag_plugin_t callback, const char*, bool);
    // Opaque handle supplied at construction.
315  void *connectivityManager;
    // The chain manager this host object was created for.
317  const AbstractChainManager<TRANSPORT> *chainManager;
318 };
319 
320 
331 {
// ManagedTransportConstructorParameters: a container for parameters passed to
// the constructor of a managed transport. It adds the default channel towards
// the host and the subscribed channels to TransportConstructorParameters.
// (The class header line is not present in this listing.)
332 public:
    // Returns the channels that the chain containing this transport instance
    // is subscribed to for receiving messages.
338  const list_t &getSubscribeChannels() const { return subscribeChannels; }
339 
    // Returns the default channel used on this chain for messages sent from
    // the transport towards the host.
345  const std::string &getDefaultChannelTowardsHost() const { return defaultChannelTowardsHost; }
346 
    // Forwards the common parameters to TransportConstructorParameters, copies
    // the default channel string and stores a copy() of subscribeChannels.
    // subscribeChannels is taken by const reference: it is only read in order
    // to be copied, so a by-value parameter would add a needless extra object.
350  ManagedTransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager,
351  const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, void* reserved)
352  : TransportConstructorParameters(pluginName, chainId, config, connectivityManager, reserved),
353  defaultChannelTowardsHost(defaultChannelTowardsHost), subscribeChannels(subscribeChannels.copy())
354  {}
355 
356 private:
    // Default channel for messages sent towards the host.
357  std::string defaultChannelTowardsHost;
    // This object's own copy of the subscribed channel list.
358  list_t subscribeChannels;
359 };
360 
368 {
// ChainManagerConstructorParameters: a container for parameters passed to the
// constructor of a dynamic chain manager. AbstractChainManager is a friend so
// that it can read the private members directly.
// (The class header line is not present in this listing.)
369  template <typename TRANSPORT> friend class AbstractChainManager;
370  // not copyable: ChainManagers should use the config, etc that are their field members.
372  ChainManagerConstructorParameters& operator=(const ChainManagerConstructorParameters &params) = delete;
373 public:
    // Returns the configuration for this plug-in (the manager configuration).
378  const map_t &getManagerConfig() const { return managerConfig; }
379 
    // Returns the manager name.
383  const std::string &getManagerName() const { return managerName; }
384 
    // Returns the transport plug-in name.
388  const std::string &getTransportPluginName() const { return transportPluginName; }
389 
    // Stores the given values. The two names are copied, but managerConfig and
    // chainDefs are held as references to the arguments, so the maps passed in
    // must outlive this object.
393  ChainManagerConstructorParameters(const map_t &managerConfig, const std::string &managerName, const std::string &transportPluginName, void* connectivityManager, const map_t &chainDefs, void* reserved)
394  : managerName(managerName), transportPluginName(transportPluginName), managerConfig(managerConfig), chainDefs(chainDefs), connectivityManager(connectivityManager), reserved(reserved)
395  {}
396 
397 private:
398  std::string managerName;
399  std::string transportPluginName;
    // Reference members: not owned, bound to the constructor arguments.
400  const map_t &managerConfig;
401  const map_t &chainDefs;
    // Opaque handle supplied at construction.
403  void* connectivityManager;
    // Opaque pointer supplied at construction; not used within this listing.
405  void* reserved;
406 };
407 
408 
409 
// Base class for transport chain manager plug-ins.
// Several declaration lines of this class (the constructor header, the virtual
// destructor, getStatusReporter(), the host and config members, and the
// no-argument getChainDefinition() header) are not present in this listing.
426 template<typename TRANSPORT>
427 class AbstractChainManager
428 {
429 
430 public:
431 
    // The transport that this host can create chains for.
433  typedef TRANSPORT transport_t;
434 
    // Constructor initializer list (the header taking
    // `const ChainManagerConstructorParameters &params` is not shown).
    // - The logger category is "connectivity.<managerName>.manager" when the
    //   manager name equals the transport plug-in name, otherwise
    //   "connectivity.<managerName>.<transportPluginName>".
    // - chainDefinitions is initialised directly from the value returned by
    //   convertChainDefinitions; no std::move is needed on a returned temporary.
    // - config is a deep copy of the manager configuration.
437  :
438  logger("connectivity." + params.getManagerName() + "." + (params.getManagerName() == params.getTransportPluginName() ? "manager" : params.getTransportPluginName())),
439  managerName(params.getManagerName()), transportPluginName(params.getTransportPluginName()),
440  chainDefinitions(converters_impl::convertChainDefinitions(params.chainDefs)),
441  host(params.connectivityManager, this),
442  config(params.getManagerConfig().copy()),
443  statusReporter(new StatusReporter(params.connectivityManager))
444  {}
447 
    // Called after the chain manager is created to indicate that it can make
    // external connections. The default implementation does nothing.
453  virtual void start() {}
454 
    // Release any resources created by the connectivity plug-in, and terminate
    // and join any background threads. The default implementation does nothing.
469  virtual void shutdown() {}
470 
    // Body of getStatusReporter() (header line not shown): returns the
    // StatusReporter owned by this manager.
475  return *statusReporter;
476  }
477 
    // A logger to be used by this manager for anything which needs to be
    // written to the host's log.
485  const Logger logger;
486 
    // The name used in the configuration for this dynamic chain manager.
490  const std::string managerName;
491 
    // The name used in the configuration for the transport plug-in that
    // created this chain manager.
495  const std::string transportPluginName;
496 
497 protected:
498  // typedefs for templated classes that are useful when writing subclasses
499 
    // The chain that this host can create.
501  typedef typename Chain<TRANSPORT>::ptr_t chain_t;
504 
    // Map from string key to chain definition.
506  ChainDefinitions chainDefinitions;
507 
512 
515 
    // Body of the no-argument getChainDefinition() (header line not shown):
    // returns the only chain definition, and throws std::runtime_error unless
    // exactly one definition is present.
520  if(chainDefinitions.size() != 1) throw std::runtime_error("This manager only supports a single chain definition, but there was not exactly one specified.");
521  return chainDefinitions.begin()->second;
522  }
    // Looks up the chain definition stored under the key `str`.
    // Throws std::runtime_error if no definition with that key exists.
527  const ChainDefinition &getChainDefinition(const std::string &str) {
528  auto it = chainDefinitions.find(str);
529  if(it == chainDefinitions.end()) throw std::runtime_error("Cannot find chain definition "+str);
530  return it->second;
531  }
532 
533 private:
    // Owned status reporter, created in the constructor and exposed through getStatusReporter().
534  std::unique_ptr<StatusReporter> statusReporter;
535 };
536 
537 
538 // ***********************************************************************************************
539 
540 
541 
542 }}
543 
544 namespace connectivity { namespace chainmanagers { using namespace _DATAT_INTERNAL_CPP_NAMESPACE::chainmanagers; } }
545 
546 }} // com.softwareag.connectivity.chainmanagers
547 
548 #include <sag_internal/chain_managers.hpp>
549 
550 
563 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(Class)
564 
565 #endif // _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
const ChainManagerHost< TRANSPORT > host
The interface through which the host provides services required by chain managers.
Definition: sag_connectivity_chain_managers.hpp:511
API for Chain Manager to listen for changes in channel subscriptions within the correlator.
Definition: sag_connectivity_chain_managers.hpp:42
Base class for transport chain manager plug-ins.
Definition: sag_connectivity_chain_managers.hpp:103
Chain & start()
Calls start on all of the plug-ins in the chain and connects the chain to receive events.
Definition: sag_connectivity_chain_managers.hpp:222
API provided to Chain Managers for callbacks into the host, and for operations such as creating chain...
Definition: sag_connectivity_chain_managers.hpp:106
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host.
Definition: sag_connectivity_plugins.hpp:69
virtual void start()
Called after the chain manager is created to indicate that it can make external connections,...
Definition: sag_connectivity_chain_managers.hpp:453
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:72
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_chain_managers.hpp:514
const std::string & getManagerName() const
Definition: sag_connectivity_chain_managers.hpp:383
A container for parameters passed to the constructor of a managed transport (i.e.
Definition: sag_connectivity_chain_managers.hpp:330
virtual ~AbstractChainManager()
Virtual destructor.
Definition: sag_connectivity_chain_managers.hpp:446
const ChainDefinition & getChainDefinition(const std::string &str)
Helper method to get the chain definition containing the transport associated with this manager,...
Definition: sag_connectivity_chain_managers.hpp:527
Chain< TRANSPORT >::ptr_t chain_t
The chain that this host can create.
Definition: sag_connectivity_chain_managers.hpp:501
AbstractChainManager(const ChainManagerConstructorParameters &params)
Give the manager instance name, a host object for callbacks, the configuration specified in managerCo...
Definition: sag_connectivity_chain_managers.hpp:436
const std::string managerName
The name used in the configuration for this dynamic chain manager.
Definition: sag_connectivity_chain_managers.hpp:490
const ChainDefinition & getChainDefinition()
Helper method to get the chain definition containing the transport associated with this manager,...
Definition: sag_connectivity_chain_managers.hpp:519
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:226
const map_t & getManagerConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_chain_managers.hpp:378
A list class which implements many of the functions on std::vector.
Definition: sag_connectivity_cpp.hpp:34
const map_t & getTransportConfig() const
Definition: sag_connectivity_chain_managers.hpp:156
const Logger logger
A logger to be used by this manager for anything which needs to be written to the host's log.
Definition: sag_connectivity_chain_managers.hpp:485
A plug-in configuration.
Definition: sag_connectivity_chain_managers.hpp:113
Contains the headers needed to implement your own Connectivity Plug-ins.
virtual ~ChannelLifecycleListener()
Virtual destructor.
Definition: sag_connectivity_chain_managers.hpp:46
const std::string & getTransportPluginName() const
Definition: sag_connectivity_chain_managers.hpp:388
const std::string & getDefaultChannelTowardsHost() const
Get the default channel used on this chain for messages sent from the transport towards the host.
Definition: sag_connectivity_chain_managers.hpp:345
const std::string & getPluginName() const
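Returns the name of this plug-in. (The source comment picked up here, "while we override the copy behaviour, no change to destructor", appears to belong to the defaulted destructor declared just before this method rather than to this getter.)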
Definition: sag_connectivity_chain_managers.hpp:130
const map_t & getConfiguration() const
Definition: sag_connectivity_chain_managers.hpp:133
ChainDefinitions chainDefinitions
Map from a string key to a chain definition (std::unordered_map<std::string, ChainDefinition>).
Definition: sag_connectivity_chain_managers.hpp:506
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:36
const std::string & getChainId()
Get the unique identifier of this chain.
Definition: sag_connectivity_chain_managers.hpp:236
const std::string transportPluginName
The name used in the configuration for the transport plug-in that created this chain manager.
Definition: sag_connectivity_chain_managers.hpp:495
virtual void shutdown()
Release any resources created by the connectivity plug-in, and terminate and join any background thre...
Definition: sag_connectivity_chain_managers.hpp:469
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:235
Represents a dynamic chain instance, created using ChainManagerHost.createChain.
Definition: sag_connectivity_chain_managers.hpp:176
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as connected/disconnected status and numb...
Definition: sag_connectivity_chain_managers.hpp:474
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:183
const list_t & getSubscribeChannels() const
Get the channels that the chain containing this transport instance is subscribed to for receiving mes...
Definition: sag_connectivity_chain_managers.hpp:338
A container for parameters passed to the constructor of a dynamic chain manager.
Definition: sag_connectivity_chain_managers.hpp:367
const Chain & start() const
Calls start on all of the plug-ins in the chain and connects the chain to receive events.
Definition: sag_connectivity_chain_managers.hpp:203
AbstractChainManager< TRANSPORT > abstract_chain_manager_t
The base class.
Definition: sag_connectivity_chain_managers.hpp:503
A dynamic chain definition, providing the configuration for each plug-in in a chain.
Definition: sag_connectivity_chain_managers.hpp:149
TRANSPORT * getTransport() const
Get the transport plug-in at the end of the chain.
Definition: sag_connectivity_chain_managers.hpp:230
TRANSPORT transport_t
The transport that this host can create chains for.
Definition: sag_connectivity_chain_managers.hpp:433