Apama  10.2.0.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sag_connectivity_chain_managers.hpp
Go to the documentation of this file.
1 /*
2  * $Copyright (c) 2015-2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
3  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
4  * @Version: $Id: sag_connectivity_chain_managers.hpp 320870 2017-12-01 12:50:59Z bsp $
5  */
6 
7 
8 #ifndef _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
9 #define _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
10 
12 #include <memory>
13 #include <unordered_map>
14 #include <vector>
15 
20 namespace com {
21 namespace softwareag {
22 namespace connectivity {
23 namespace chainmanagers {
24 
43 {
44 public:
81  virtual void onChannelCreated(const std::string &channel, Direction direction) = 0;
82 
99  virtual void onChannelDestroyed(const std::string &channel, Direction direction) = 0;
100 };
101 
102 template<typename TRANSPORT>
103 class AbstractChainManager; // forward decl for Host friend
104 
105 template<typename TRANSPORT>
106 class ChainManagerHost; // forward decl for Chain friend
107 
114  std::string pluginName;
115  map_t config;
116  PluginConfiguration(const std::string &pluginName, const map_t &config) : pluginName(pluginName), config(config.copy()) {}
117  PluginConfiguration(const PluginConfiguration &other) : pluginName(other.pluginName), config(other.config.copy()) {}
118  PluginConfiguration &operator=(const PluginConfiguration &other) {
119  if(this != &other) {
120  pluginName = other.pluginName;
121  config=other.config.copy();
122  }
123  return *this;
124  }
125  PluginConfiguration(PluginConfiguration&& other) = default;
126  PluginConfiguration &operator=(PluginConfiguration &&other) = default;
127  ~PluginConfiguration() = default;
128 
130  const std::string& getPluginName() const { return pluginName; }
131 
133  const map_t &getConfiguration() const { return config; }
134 };
135 
136 
149 struct ChainDefinition : public std::vector<PluginConfiguration> {
150  typedef std::vector<PluginConfiguration> vector_t;
151  ChainDefinition() {}
156  const map_t& getTransportConfig() const {
157  return back().getConfiguration();
158  }
159 };
160 
161 typedef std::unordered_map<std::string, ChainDefinition> ChainDefinitions;
162 
163 namespace converters_impl {
164 ChainDefinitions convertChainDefinitions(const map_t &chainDefinitions);
165 list_t convertChainDefinition(const ChainDefinition& chainDefinition);
166 }
167 
168 
175 template<typename TRANSPORT>
176 class Chain
177 {
178  // so host can call the private constructor
179  friend class ChainManagerHost<TRANSPORT>;
180  // not copyable - this is a singleton ptr, but does not own the lifetime.
181  Chain(const Chain &other) = delete;
182  Chain &operator=(const Chain &other) = delete;
183  Chain(const Chain &&other) = delete;
184  Chain &operator=(const Chain &&other) = delete;
185  void startImpl() const;
186 public:
187  ~Chain() = default; // default destructor is OK, though @private
203  const Chain &start() const {
204  startImpl();
205  return *this;
206  }
223  startImpl();
224  return *this;
225  }
227  void destroy();
228 
230  TRANSPORT *getTransport() const {
231  if (!transport) throw std::runtime_error("Cannot call getTransport after chain has been destroyed");
232  return transport;
233  }
234 
236  const std::string &getChainId() { return chainId; }
237 
238  typedef std::shared_ptr<Chain> ptr_t;
239 
240 
241 private:
242  Chain(TRANSPORT* transport, void* connectivityManager, const std::string &chainId, const std::string &managerName)
243  :transport(transport), connectivityManager(connectivityManager), chainId(chainId), managerName(managerName)
244  {}
245 
246  TRANSPORT* transport;
247  void* connectivityManager;
248  std::string chainId;
249  // just for error messages for now
250  std::string managerName;
251 };
252 
258 template<typename TRANSPORT>
259 class ChainManagerHost
260 {
261  // give friend status to ChainManager so it can get the hostptr
262  friend class AbstractChainManager<TRANSPORT>;
263 
265  ChainManagerHost(void *connectivityManager, const AbstractChainManager<TRANSPORT> *chainManager): connectivityManager(connectivityManager), chainManager(chainManager) {}
266 
267 public:
284  template<typename ... ARGS>
285  typename Chain<TRANSPORT>::ptr_t createChain(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, ARGS ... transportConstructorArgs) const;
286 
298  void addChannelLifecycleListener(ChannelLifecycleListener *listener, const std::string &channelPrefix) const;
299 private:
300 
301  template<typename TUPLE>
302  typename Chain<TRANSPORT>::ptr_t createChainImpl(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, const TUPLE& tuple) const;
303 
305  static sag_error_t deleteChannelLifecycleListener(sag_plugin_t callback);
307  static sag_error_t callOnChannelCreated(const char *managerLogName, sag_plugin_t callback, const char*, bool);
309  static sag_error_t callOnChannelDestroyed(const char *managerLogName, sag_plugin_t callback, const char*, bool);
311  void *connectivityManager;
313  const AbstractChainManager<TRANSPORT> *chainManager;
314 };
315 
316 
327 {
328 public:
334  const list_t &getSubscribeChannels() const { return subscribeChannels; }
335 
341  const std::string &getDefaultChannelTowardsHost() const { return defaultChannelTowardsHost; }
342 
346  ManagedTransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager,
347  const std::string &defaultChannelTowardsHost, const list_t subscribeChannels, void* reserved)
348  : TransportConstructorParameters(pluginName, chainId, config, connectivityManager, reserved),
349  defaultChannelTowardsHost(defaultChannelTowardsHost), subscribeChannels(subscribeChannels.copy())
350  {}
351 
352 private:
353  std::string defaultChannelTowardsHost;
354  list_t subscribeChannels;
355 };
356 
364 {
365  template <typename TRANSPORT> friend class AbstractChainManager;
366  // not copyable: ChainManagers should use the config, etc that are their field members.
368  ChainManagerConstructorParameters& operator=(const ChainManagerConstructorParameters &params) = delete;
369 public:
374  const map_t &getManagerConfig() const { return managerConfig; }
375 
379  const std::string &getManagerName() const { return managerName; }
380 
384  const std::string &getTransportPluginName() const { return transportPluginName; }
385 
389  ChainManagerConstructorParameters(const map_t &managerConfig, const std::string &managerName, const std::string &transportPluginName, void* connectivityManager, const map_t &chainDefs, void* reserved)
390  : managerName(managerName), transportPluginName(transportPluginName), managerConfig(managerConfig), chainDefs(chainDefs), connectivityManager(connectivityManager), reserved(reserved)
391  {}
392 
393 private:
394  std::string managerName;
395  std::string transportPluginName;
396  const map_t &managerConfig;
397  const map_t &chainDefs;
399  void* connectivityManager;
401  void* reserved;
402 };
403 
404 
405 
422 template<typename TRANSPORT>
423 class AbstractChainManager
424 {
425 
426 public:
427 
429  typedef TRANSPORT transport_t;
430 
433  :
434  logger("connectivity." + params.getManagerName() + "." + (params.getManagerName() == params.getTransportPluginName() ? "manager" : params.getTransportPluginName())),
435  managerName(params.getManagerName()), transportPluginName(params.getTransportPluginName()),
436  chainDefinitions(std::move(converters_impl::convertChainDefinitions(params.chainDefs))),
437  host(params.connectivityManager, this),
438  config(params.getManagerConfig().copy()),
439  statusReporter(new StatusReporter(params.connectivityManager))
440  {}
443 
449  virtual void start() {}
450 
465  virtual void shutdown() {}
466 
471  return *statusReporter;
472  }
473 
481  const Logger logger;
482 
486  const std::string managerName;
487 
491  const std::string transportPluginName;
492 
493 protected:
494  // typedefs for templated classes that are useful when writing subclasses
495 
497  typedef typename Chain<TRANSPORT>::ptr_t chain_t;
500 
502  ChainDefinitions chainDefinitions;
503 
508 
511 
516  if(chainDefinitions.size() != 1) throw std::runtime_error("This manager only supports a single chain definition, but there was not exactly one specified.");
517  return chainDefinitions.begin()->second;
518  }
523  const ChainDefinition &getChainDefinition(const std::string &str) {
524  auto it = chainDefinitions.find(str);
525  if(it == chainDefinitions.end()) throw std::runtime_error("Cannot find chain definition "+str);
526  return it->second;
527  }
528 
529 private:
530  std::unique_ptr<StatusReporter> statusReporter;
531 };
532 
533 
534 // ***********************************************************************************************
535 
536 
537 
538 }}}} // com.softwareag.connectivity.chainmanagers
539 
540 #include <sag_internal/chain_managers.hpp>
541 
542 
555 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(Class)
556 
557 #endif // _SAG_CONNECTIVITY_CHAIN_MANAGERS_HPP_
API provided to Chain Managers for callbacks into the host, and for operations such as creating chain...
Definition: sag_connectivity_chain_managers.hpp:106
API for Chain Manager to listen for changes in channel subscriptions within the correlator.
Definition: sag_connectivity_chain_managers.hpp:42
const map_t & getTransportConfig() const
Definition: sag_connectivity_chain_managers.hpp:156
Chain & start()
Calls start on all of the plug-ins in the chain and connects the chain to receive events...
Definition: sag_connectivity_chain_managers.hpp:222
const std::string & getTransportPluginName() const
Definition: sag_connectivity_chain_managers.hpp:384
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_chain_managers.hpp:510
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host...
Definition: sag_connectivity_plugins.hpp:69
AbstractChainManager< TRANSPORT > abstract_chain_manager_t
The base class.
Definition: sag_connectivity_chain_managers.hpp:499
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:72
virtual void onChannelCreated(const std::string &channel, Direction direction)=0
Called by host when a channel is created which starts with the prefix provided during listener regist...
ChainDefinitions chainDefinitions
string-> map of chain definitions
Definition: sag_connectivity_chain_managers.hpp:502
const std::string transportPluginName
The name used in the configuration for the transport plug-in that created this chain manager...
Definition: sag_connectivity_chain_managers.hpp:491
TRANSPORT transport_t
The transport that this host can create chains for.
Definition: sag_connectivity_chain_managers.hpp:429
A container for parameters passed to the constructor of a managed transport (i.e. ...
Definition: sag_connectivity_chain_managers.hpp:326
const Chain & start() const
Calls start on all of the plug-ins in the chain and connects the chain to receive events...
Definition: sag_connectivity_chain_managers.hpp:203
Chain< TRANSPORT >::ptr_t chain_t
The chain that this host can create.
Definition: sag_connectivity_chain_managers.hpp:497
virtual void shutdown()
Release any resources created by the connectivity plug-in, and terminate and join any background thre...
Definition: sag_connectivity_chain_managers.hpp:465
STL namespace.
Definition: sag_connectivity_threading.h:178
const ChainDefinition & getChainDefinition(const std::string &str)
Helper method to get the chain definition containing the transport associated with this manager...
Definition: sag_connectivity_chain_managers.hpp:523
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as connected/disconnected status and numb...
Definition: sag_connectivity_chain_managers.hpp:470
const std::string & getManagerName() const
Definition: sag_connectivity_chain_managers.hpp:379
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:220
Chain< TRANSPORT >::ptr_t createChain(const std::string &chainInstanceIdSuffix, const std::string &defaultChannelTowardsHost, const list_t &subscribeChannels, const ChainDefinition &chainDefinition, const map_t &substitutions, ARGS...transportConstructorArgs) const
Create a chain instance programmatically.
A list class which implements many of the functions on std::vector.
Definition: sag_connectivity_cpp.hpp:34
A plug-in configuration.
Definition: sag_connectivity_chain_managers.hpp:113
Contains the headers needed to implement your own Connectivity Plug-ins.
virtual ~ChannelLifecycleListener()
Virtual destructor.
Definition: sag_connectivity_chain_managers.hpp:46
const map_t & getManagerConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_chain_managers.hpp:374
Base class for transport chain manager plug-ins.
Definition: sag_connectivity_chain_managers.hpp:103
virtual ~AbstractChainManager()
Virtual destructor.
Definition: sag_connectivity_chain_managers.hpp:442
void addChannelLifecycleListener(ChannelLifecycleListener *listener, const std::string &channelPrefix) const
Register a callback handler for subscribe/unsubscribe and send-to notifications.
virtual void start()
Called after the chain manager is created to indicate that it can make external connections, create threads and create chains.
Definition: sag_connectivity_chain_managers.hpp:449
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:36
const std::string & getChainId()
Get the unique identifier of this chain.
Definition: sag_connectivity_chain_managers.hpp:236
TRANSPORT * getTransport() const
Get the transport plug-in at the end of the chain.
Definition: sag_connectivity_chain_managers.hpp:230
~PluginConfiguration()=default
while we override the copy behaviour, no change to move behaviour
const list_t & getSubscribeChannels() const
Get the channels that the chain containing this transport instance is subscribed to for receiving mes...
Definition: sag_connectivity_chain_managers.hpp:334
Represents a dynamic chain instance, created using ChainManagerHost.createChain.
Definition: sag_connectivity_chain_managers.hpp:176
const ChainDefinition & getChainDefinition()
Helper method to get the chain definition containing the transport associated with this manager...
Definition: sag_connectivity_chain_managers.hpp:515
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:183
const std::string & getDefaultChannelTowardsHost() const
Get the default channel used on this chain for messages sent from the transport towards the host...
Definition: sag_connectivity_chain_managers.hpp:341
const Logger logger
A logger to be used by this manager for anything which needs to be written to the host's log...
Definition: sag_connectivity_chain_managers.hpp:481
const std::string & getPluginName() const
while we override the copy behaviour, no change to destructor
Definition: sag_connectivity_chain_managers.hpp:130
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:211
void destroy()
Shutdown and destroy a chain.
const std::string managerName
The name used in the configuration for this dynamic chain manager.
Definition: sag_connectivity_chain_managers.hpp:486
virtual void onChannelDestroyed(const std::string &channel, Direction direction)=0
Called by host when a channel is destroyed which starts with the prefix provided during listener regi...
const map_t & getConfiguration() const
Definition: sag_connectivity_chain_managers.hpp:133
A container for parameters passed to the constructor of a dynamic chain manager.
Definition: sag_connectivity_chain_managers.hpp:363
AbstractChainManager(const ChainManagerConstructorParameters &params)
Give the manager instance name, a host object for callbacks, the configuration specified in managerCo...
Definition: sag_connectivity_chain_managers.hpp:432
A dynamic chain definition, providing the configuration for each plug-in in a chain.
Definition: sag_connectivity_chain_managers.hpp:149
const ChainManagerHost< TRANSPORT > host
The interface through which the host provides services required by chain managers.
Definition: sag_connectivity_chain_managers.hpp:507