Apama  9.10.0.4.289795
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sag_connectivity_plugins.hpp
Go to the documentation of this file.
1 /*
2  * Title: sag_connectivity_plugins.hpp
3  * Description: C++ API for writing connectivity plugins
4  * $Copyright (c) 2015-2016 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
5  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
6  * @Version: $Id: sag_connectivity_plugins.hpp 272666 2016-01-11 13:30:16Z rit $
7  */
8 
14 #ifndef _SAG_CONNECTIVITY_PLUGINS_HPP_
15 #define _SAG_CONNECTIVITY_PLUGINS_HPP_
16 
17 #include <sag_connectivity_cpp.hpp>
18 #include <memory>
19 
20 namespace com {
21 namespace softwareag {
22 namespace connectivity {
23 
31 class Logger
32 {
33 public:
36 
38  Logger(const std::string &category): category(category) {}
46  template<level_t LEVEL, typename... ARGS>
47  void log(const char *format, ARGS... args) const;
48 
56  template<typename... ARGS>
57  void log(level_t LEVEL, const char *format, ARGS... args) const;
65  template<level_t LEVEL, typename... ARGS>
66  void log(const std::string &format, ARGS... args) const;
67 
75  template<typename... ARGS>
76  void log(level_t LEVEL, const std::string &format, ARGS... args) const;
77 
78 
80  bool isEnabled(level_t LEVEL) const
81  {
82  return LEVEL <= getLevel();
83  }
84 
86  level_t getLevel() const;
87 
89  bool isCritEnabled() const { return isEnabled(SAG_LOG_CRIT); }
91  bool isFatalEnabled() const { return isEnabled(SAG_LOG_FATAL); }
93  bool isErrorEnabled() const { return isEnabled(SAG_LOG_ERROR); }
95  bool isWarnEnabled() const { return isEnabled(SAG_LOG_WARN); }
97  bool isInfoEnabled() const { return isEnabled(SAG_LOG_INFO); }
99  bool isDebugEnabled() const { return isEnabled(SAG_LOG_DEBUG); }
101  bool isTraceEnabled() const { return isEnabled(SAG_LOG_TRACE); }
102 
104  template<typename... ARGS> void crit(const std::string &format, ARGS... args) const { log<SAG_LOG_CRIT>(format, args...); }
106  template<typename... ARGS> void fatal(const std::string &format, ARGS... args) const { log<SAG_LOG_FATAL>(format, args...); }
108  template<typename... ARGS> void error(const std::string &format, ARGS... args) const { log<SAG_LOG_ERROR>(format, args...); }
110  template<typename... ARGS> void warn(const std::string &format, ARGS... args) const { log<SAG_LOG_WARN>(format, args...); }
112  template<typename... ARGS> void info(const std::string &format, ARGS... args) const { log<SAG_LOG_INFO>(format, args...); }
114  template<typename... ARGS> void debug(const std::string &format, ARGS... args) const { log<SAG_LOG_DEBUG>(format, args...); }
116  template<typename... ARGS> void trace(const std::string &format, ARGS... args) const { log<SAG_LOG_TRACE>(format, args...); }
117 
119  template<typename... ARGS> void crit(const char *format, ARGS... args) const { log<SAG_LOG_CRIT>(format, args...); }
121  template<typename... ARGS> void fatal(const char *format, ARGS... args) const { log<SAG_LOG_FATAL>(format, args...); }
123  template<typename... ARGS> void error(const char *format, ARGS... args) const { log<SAG_LOG_ERROR>(format, args...); }
125  template<typename... ARGS> void warn(const char *format, ARGS... args) const { log<SAG_LOG_WARN>(format, args...); }
127  template<typename... ARGS> void info(const char *format, ARGS... args) const { log<SAG_LOG_INFO>(format, args...); }
129  template<typename... ARGS> void debug(const char *format, ARGS... args) const { log<SAG_LOG_DEBUG>(format, args...); }
131  template<typename... ARGS> void trace(const char *format, ARGS... args) const { log<SAG_LOG_TRACE>(format, args...); }
132 
133 private:
134  std::string category;
135 };
136 
155 class Plugin
156 {
157 public:
159  Plugin(const std::string &name, const std::string &chainId, const map_t &config)
160  : name(name), chainId(chainId), config(config.copy()), LOGGER(name+"."+chainId)
161  {}
163  virtual ~Plugin() {}
169  virtual void start() {}
174  virtual void hostReady() {}
190  virtual void shutdown() {}
192  const std::string &getName() const { return name; }
193 protected:
194  std::string name;
195  std::string chainId;
196  map_t config;
197 public:
198  Logger LOGGER;
199 };
200 
205 class HostSide
206 {
207 public:
209  virtual ~HostSide() {}
211  typedef std::auto_ptr<HostSide> ptr_t;
228  virtual void sendBatchTowardsHost(Message *start, Message *end) = 0;
229 };
230 
236 {
237 public:
238  RemoteHostSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
239  virtual ~RemoteHostSide() {}
240  virtual void sendBatchTowardsHost(Message *start, Message *end);
241 private:
242  sag_plugin_t other;
243  sag_send_fn_t fn;
244 };
245 
251 {
252 public:
254  virtual ~TransportSide() {}
256  typedef std::auto_ptr<TransportSide> ptr_t;
273  virtual void sendBatchTowardsTransport(Message *start, Message *end) = 0;
274 };
275 
281 {
282 public:
283  RemoteTransportSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
284  virtual ~RemoteTransportSide() {}
285  virtual void sendBatchTowardsTransport(Message *start, Message *end);
286 private:
287  sag_plugin_t other;
288  sag_send_fn_t fn;
289 };
290 
291 
320 class AbstractCodec: public Plugin, public HostSide, public TransportSide
321 {
322 public:
324  AbstractCodec(const std::string &name, const std::string &chainId, const map_t &config)
325  : Plugin(name, chainId, config)
326  {}
328  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
329  {
330  hostSide = std::move(host);
331  }
334  {
335  transportSide = std::move(transport);
336  }
337 protected:
338  HostSide::ptr_t hostSide;
339  TransportSide::ptr_t transportSide;
340 };
341 
375 {
376 public:
378  AbstractTransport(const std::string &name, const std::string &chainId, const map_t &config)
379  : Plugin(name, chainId, config)
380  {}
382  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
383  {
384  hostSide = std::move(host);
385  }
386 protected:
387  HostSide::ptr_t hostSide;
388 };
389 
397 {
398 public:
400  AbstractSimpleTransport(const std::string &name, const std::string &chainId, const map_t &config)
401  : AbstractTransport(name, chainId, config)
402  {}
407  {
408  for (Message *it = start; it != end; ++it) {
409  try {
410  if (it->getPayload().empty()) {
412  } else {
414  }
415  } catch (...) {
416  handleException(*it);
417  }
418  }
419  }
421  virtual void deliverMessageTowardsTransport(Message &msg) = 0;
424  {
425  // do nothing
426  }
442  virtual void handleException(Message &m)
443  {
444  try {
445  throw;
446  } catch (const std::exception &e) {
447  LOGGER.warn("Caught exception delivering message: %s", e.what());
448  } catch (...) {
449  LOGGER.warn("Caught unknown exception delivering message: %s");
450  }
451  }
452 };
453 
465 {
466 public:
468  AbstractSimpleCodec(const std::string &name, const std::string &chainId, const map_t &config)
469  : AbstractCodec(name, chainId, config)
470  {}
477  {
478  Message *curr = start;
479  for (Message *it = start; it != end; ++it) {
480  bool rv;
481  try {
482  if (it->getPayload().empty()) {
484  } else {
485  rv = transformMessageTowardsHost(*it);
486  }
487  } catch (...) {
488  rv = handleException(*it, false);
489  }
490  // if we keep it (and it didn't throw) swap it with the accumulator
491  if (rv) {
492  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
493  ++curr;
494  }
495  }
496  if (hostSide.get()) hostSide->sendBatchTowardsHost(start, curr);
497  }
504  {
505  Message *curr = start;
506  for (Message *it = start; it != end; ++it) {
507  bool rv;
508  try {
509  // process the message
510  if (it->getPayload().empty()) {
512  } else {
514  }
515  } catch (...) {
516  rv = handleException(*it, true);
517  }
518  // if we keep it (and it didn't throw) swap it with the accumulator
519  if (rv) {
520  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
521  ++curr;
522  }
523  }
524  if (transportSide.get()) transportSide->sendBatchTowardsTransport(start, curr);
525  }
526 
529  virtual bool transformMessageTowardsHost(Message &msg) = 0;
532  virtual bool transformMessageTowardsTransport(Message &msg) = 0;
537  {
538  // do nothing
539  return true;
540  }
545  {
546  // do nothing
547  return true;
548  }
569  virtual bool handleException(Message &m, bool towardsTransport)
570  {
571  try {
572  throw;
573  } catch (const std::exception &e) {
574  LOGGER.warn("Caught exception transforming message: %s", e.what());
575  } catch (...) {
576  LOGGER.warn("Caught unknown exception transforming message");
577  }
578  return false;
579  }
580 };
581 
582 }}} // com.softwareag.connectivity
583 
584 // internal implementation included from these files
585 #include <sag_internal/exception.hpp>
586 #include <sag_internal/remote_plugins.hpp>
587 #include <sag_internal/plugin_macros.hpp>
588 #include <sag_internal/logging.hpp>
589 
596 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT(Class)
597 
604 #define SAG_DECLARE_CONNECTIVITY_CODEC(Class) _SAG_DECLARE_CONNECTIVITY_CODEC(Class)
605 
606 #endif // _SAG_CONNECTIVITY_PLUGINS_HPP_
Helper derivative of AbstractCodec implementing message-by-message transformation.
Definition: sag_connectivity_plugins.hpp:464
Helper derivative of AbstractTransport implementing message-by-message delivery.
Definition: sag_connectivity_plugins.hpp:396
Wrap the next plugin in order as a TransportSide object.
Definition: sag_connectivity_plugins.hpp:280
An interface to the next component (plugin or host) towards the host.
Definition: sag_connectivity_plugins.hpp:205
virtual void sendBatchTowardsHost(Message *start, Message *end)=0
Process or deliver a batch of messages heading towards the host.
bool isDebugEnabled() const
Returns true if DEBUG messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:99
void debug(const std::string &format, ARGS...args) const
Log a message at DEBUG level.
Definition: sag_connectivity_plugins.hpp:114
bool isInfoEnabled() const
Returns true if INFO messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:97
Fatal errors.
Definition: sag_connectivity_c.h:62
void warn(const char *format, ARGS...args) const
Log a message at WARN level.
Definition: sag_connectivity_plugins.hpp:125
Class for writing to the system logger.
Definition: sag_connectivity_plugins.hpp:31
void error(const char *format, ARGS...args) const
Log a message at ERROR level.
Definition: sag_connectivity_plugins.hpp:123
Trace.
Definition: sag_connectivity_c.h:67
Contains the C++ implementation of the underlying datatypes used by connectivity plugins and their ac...
bool isTraceEnabled() const
Returns true if TRACE messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:101
bool isErrorEnabled() const
Returns true if ERROR messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:93
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling deliverMessageTowardsTransport for each message individually...
Definition: sag_connectivity_plugins.hpp:406
Base of the inheritence tree for Connectivity plugins.
Definition: sag_connectivity_plugins.hpp:155
void warn(const std::string &format, ARGS...args) const
Log a message at WARN level.
Definition: sag_connectivity_plugins.hpp:110
sag_log_level_t level_t
The available log levels.
Definition: sag_connectivity_plugins.hpp:35
virtual void sendBatchTowardsTransport(Message *start, Message *end)=0
Process or deliver a batch of messages heading towards the transport.
void crit(const std::string &format, ARGS...args) const
Log a message at CRIT level.
Definition: sag_connectivity_plugins.hpp:104
Debug messages.
Definition: sag_connectivity_c.h:66
Critical log messages.
Definition: sag_connectivity_c.h:61
Definition: sag_connectivity_cpp.hpp:35
std::auto_ptr< TransportSide > ptr_t
Pointers to TransportSides should always be this ptr_t type, which is a std::auto_ptr.
Definition: sag_connectivity_plugins.hpp:256
void info(const char *format, ARGS...args) const
Log a message at INFO level.
Definition: sag_connectivity_plugins.hpp:127
virtual bool transformNullPayloadTowardsTransport(Message &msg)
Transform a message with a null payload in a transport-wards direction.
Definition: sag_connectivity_plugins.hpp:544
virtual void start()
Called when the plugin should be started up (after all plugins are connected together).
Definition: sag_connectivity_plugins.hpp:169
sag_log_level_t
Log levels for logging to the system log.
Definition: sag_connectivity_c.h:59
Non-fatal errors.
Definition: sag_connectivity_c.h:63
void fatal(const char *format, ARGS...args) const
Log a message at FATAL level.
Definition: sag_connectivity_plugins.hpp:121
virtual ~TransportSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:254
Abstract class for Transports.
Definition: sag_connectivity_plugins.hpp:374
void error(const std::string &format, ARGS...args) const
Log a message at ERROR level.
Definition: sag_connectivity_plugins.hpp:108
Logger(const std::string &category)
Construct a logger for the given category.
Definition: sag_connectivity_plugins.hpp:38
AbstractSimpleTransport(const std::string &name, const std::string &chainId, const map_t &config)
Give the plugin instance name and the chain name.
Definition: sag_connectivity_plugins.hpp:400
Informational messages (default)
Definition: sag_connectivity_c.h:65
virtual void handleException(Message &m)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:442
bool isCritEnabled() const
Returns true if CRIT messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:89
virtual void setNextTowardsTransport(TransportSide::ptr_t &&transport)
Set the next in the chain towards the transport.
Definition: sag_connectivity_plugins.hpp:333
Wrap the next plugin in order as a HostSide object.
Definition: sag_connectivity_plugins.hpp:235
virtual ~HostSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:209
virtual void hostReady()
Called when the host indicates it is ready to start receiving input (sends will be queued until this ...
Definition: sag_connectivity_plugins.hpp:174
bool isEnabled(level_t LEVEL) const
Returns true if messages at the given level will be written to the log file.
Definition: sag_connectivity_plugins.hpp:80
Abstract class for Codecs.
Definition: sag_connectivity_plugins.hpp:320
void trace(const char *format, ARGS...args) const
Log a message at TRACE level.
Definition: sag_connectivity_plugins.hpp:131
virtual bool transformMessageTowardsTransport(Message &msg)=0
Transform an individual message in a transport-wards direction Return true to keep the message and fa...
void log(const char *format, ARGS...args) const
Log a message at a given level.
AbstractSimpleCodec(const std::string &name, const std::string &chainId, const map_t &config)
Give the plugin instance name and the chain name.
Definition: sag_connectivity_plugins.hpp:468
virtual bool transformMessageTowardsHost(Message &msg)=0
Transform an individual message in a host-wards direction Return true to keep the message and false (...
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:719
A container for an payload and associated metadata.
Definition: sag_connectivity_cpp.hpp:1514
virtual void shutdown()
Release any resources created by the connectivity plug-in, and terminate and join any background thre...
Definition: sag_connectivity_plugins.hpp:190
level_t getLevel() const
Returns the current log level.
void fatal(const std::string &format, ARGS...args) const
Log a message at FATAL level.
Definition: sag_connectivity_plugins.hpp:106
virtual void sendBatchTowardsHost(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsHost for each message individually.
Definition: sag_connectivity_plugins.hpp:476
An interface to the next component (plugin or host) towards the transport.
Definition: sag_connectivity_plugins.hpp:250
AbstractCodec(const std::string &name, const std::string &chainId, const map_t &config)
Give the plugin instance name and the chain name.
Definition: sag_connectivity_plugins.hpp:324
virtual void deliverMessageTowardsTransport(Message &msg)=0
Deliver an individual message.
virtual void deliverNullPayloadTowardsTransport(Message &msg)
Deliver a message with a null payload. Default is to ignore it.
Definition: sag_connectivity_plugins.hpp:423
void crit(const char *format, ARGS...args) const
Log a message at CRIT level.
Definition: sag_connectivity_plugins.hpp:119
void debug(const char *format, ARGS...args) const
Log a message at DEBUG level.
Definition: sag_connectivity_plugins.hpp:129
void info(const std::string &format, ARGS...args) const
Log a message at INFO level.
Definition: sag_connectivity_plugins.hpp:112
bool isFatalEnabled() const
Returns true if FATAL messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:91
AbstractTransport(const std::string &name, const std::string &chainId, const map_t &config)
Give the plugin instance name and the chain name.
Definition: sag_connectivity_plugins.hpp:378
virtual bool handleException(Message &m, bool towardsTransport)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:569
virtual void sendBatchTowardsHost(Message *start, Message *end)
Process or deliver a batch of messages heading towards the host.
virtual void setNextTowardsHost(HostSide::ptr_t &&host)
Set the next in the chain towards the host.
Definition: sag_connectivity_plugins.hpp:382
void trace(const std::string &format, ARGS...args) const
Log a message at TRACE level.
Definition: sag_connectivity_plugins.hpp:116
Plugin(const std::string &name, const std::string &chainId, const map_t &config)
Give the plugin instance name and the chain name.
Definition: sag_connectivity_plugins.hpp:159
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsTransport for each message individually...
Definition: sag_connectivity_plugins.hpp:503
virtual void setNextTowardsHost(HostSide::ptr_t &&host)
Set the next in the chain towards the host.
Definition: sag_connectivity_plugins.hpp:328
bool isWarnEnabled() const
Returns true if WARN messages will be written to the log.
Definition: sag_connectivity_plugins.hpp:95
std::auto_ptr< HostSide > ptr_t
Pointers to HostSides should always be this ptr_t type, which is a std::auto_ptr. ...
Definition: sag_connectivity_plugins.hpp:211
virtual ~Plugin()
Virtual destructor to ensure that we can delete any part of the tree.
Definition: sag_connectivity_plugins.hpp:163
Warnings.
Definition: sag_connectivity_c.h:64
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Process or deliver a batch of messages heading towards the transport.
const std::string & getName() const
Return the name of this plugin instance.
Definition: sag_connectivity_plugins.hpp:192
virtual bool transformNullPayloadTowardsHost(Message &msg)
Transform a message with a null payload in a host-wards direction.
Definition: sag_connectivity_plugins.hpp:536