Apama  10.2.0.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sag_connectivity_plugins.hpp
Go to the documentation of this file.
1 /*
2  * Title: sag_connectivity_plugins.hpp
3  * Description: C++ API for writing connectivity plugins
4  * $Copyright (c) 2015-2018 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
5  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
6  * @Version: $Id: sag_connectivity_plugins.hpp 332799 2018-07-05 14:26:28Z hdy $
7  */
8 
17 #ifndef _SAG_CONNECTIVITY_PLUGINS_HPP_
18 #define _SAG_CONNECTIVITY_PLUGINS_HPP_
19 
20 #include <sag_connectivity_cpp.hpp>
21 #include <sag_plugin_logging.hpp>
22 #include <memory>
23 #include <mutex>
24 
25 // must define __STDC_FORMAT_MACROS before first include of inttypes else printf macros won't be defined
26 #ifndef __STDC_FORMAT_MACROS
27 #define __STDC_FORMAT_MACROS 1
28 #endif
29 #include <inttypes.h>
30 
31 namespace com {
32 namespace softwareag {
33 namespace connectivity {
34 
35 namespace
36 {
/**
 * Replace every occurrence of `from` in `input` with `to`, in place.
 *
 * After each substitution the scan resumes immediately after the inserted
 * text, so characters introduced by `to` are never examined again. This keeps
 * the call linear and terminating even when `to` itself contains `from`.
 *
 * @param input The string to modify.
 * @param from The substring to search for; an empty string leaves `input` unchanged.
 * @param to The text substituted for each occurrence of `from`.
 */
void replace(std::string &input, const std::string &from, const std::string &to)
{
    // An empty needle matches at every position and would never terminate.
    if (from.empty()) return;

    size_t pos = 0;
    while ((pos = input.find(from, pos)) != std::string::npos)
    {
        input.replace(pos, from.length(), to);
        // Advance past the replacement ("+=", not "=+", which would reset pos to a constant).
        pos += to.length();
    }
}
49 
53  std::string to_string(const Message &m, bool truncate=true)
54  {
55  std::string payload = to_string(m.getPayload());
56  if (truncate && payload.length() > 200) payload = payload.substr(0, 196) + " ...";
57  // security sanitization to prevent log message faking
58  replace(payload, "\n", "\\n");
59  replace(payload, "\r", "\\r");
60  return "Message<metadata="+to_string(m.getMetadataMap())+", payload="+payload+">";
61  }
62 }
63 
69 enum class Direction {
73  TOWARDS_HOST = 1,
78 
79 };
80 
85 class PluginHost {
86  friend class Plugin;
87 public:
88  typedef std::unique_ptr<PluginHost> ptr_t;
89 
104  void enableReliability(Direction direction) {
105  if (SAG_ERROR_OK != sag_enable_reliability(chain, static_cast<int>(direction))) {
106  throw std::runtime_error("An error occurred while setting chain reliability");
107  }
108  }
109 
111  bool isShuttingDown() {
112  bool isShuttingDown = false;
113  if (SAG_ERROR_OK != sag_is_host_shutting_down(chain, isShuttingDown)) {
114  throw std::runtime_error("An error occurred while checking if host is shutting down");
115  }
116  return isShuttingDown;
117  }
118 private:
122  PluginHost(void* chain = nullptr) :chain(chain) {}
123  void* chain;
124 };
125 
126 // forward decl for parameters friend
127 class Plugin;
128 
139 {
140  // for constructor access to connectivityManager
141  friend class Plugin;
142 
143 public:
145 
147 
152  const map_t &getConfig() const { return config; }
153 
157  const std::string &getChainId() const { return chainId; }
158 
162  const std::string &getPluginName() const { return pluginName; }
163 
164 protected:
168  PluginConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* chain)
169  : chainId(chainId), pluginName(pluginName), config(config.copy()), connectivityManager(connectivityManager), chain(chain)
170  {}
171 
172 private:
173  std::string chainId;
174  std::string pluginName;
175  map_t config;
177  void* connectivityManager;
179  void* chain;
180 };
181 
184 {
185 public:
189  TransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
190  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
191  {}
192 };
193 
196 {
197 public:
201  CodecConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
202  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
203  {}
204 };
205 
206 
221 {
222 public:
223 
227  static const char* STATUS_ONLINE() { return "ONLINE"; }
231  static const char* STATUS_STARTING() { return "STARTING"; }
235  static const char* STATUS_FAILED() { return "FAILED"; }
236 
247  {
248  friend class StatusReporter;
249  public:
250 
251  ~StatusItem()
252  {
253  sag_delete_user_status_item(connectivityManager, underlying);
254  underlying.item = nullptr;
255  }
256 
262  void setStatus(const std::string &value)
263  {
264  std::unique_lock<std::mutex> ul(status_lock);
265  setStatusLocked(value);
266  }
267 
276  void setStatus(int64_t value)
277  {
278  std::unique_lock<std::mutex> ul(status_lock);
279  intValue = value;
280  setStatusLocked(convert_to_details::integerToString(value));
281  }
282 
288  std::string getStatus() { return lastValue; }
289 
297  void increment(int64_t incrementValue = 1)
298  {
299  std::unique_lock<std::mutex> ul(status_lock);
300  intValue += incrementValue;
301  setStatusLocked(convert_to_details::integerToString(intValue));
302  }
303 
306  const std::string &key() { return mkey; }
307 
308  private:
309  StatusItem(const StatusItem& other) = delete; // non construction-copyable
310  StatusItem& operator=(const StatusItem&) = delete; // non copyable
311 
312  StatusItem(void* connectivityManager, const std::string &key, const std::string &initialValue, const int64_t intValue)
313  : intValue(intValue),
314  mkey(key),
315  lastValue(initialValue),
316  connectivityManager(connectivityManager),
317  underlying(sag_create_user_status_item(connectivityManager, key.c_str(), initialValue.c_str()))
318  {
319  if (!underlying.item)
320  {
321  std::ostringstream oss;
322  oss << "Failed to create status item '" << key << "' (ensure the key is unique and that this plug-in has not been shutdown already)";
323  throw std::runtime_error(oss.str());
324  }
325  }
326 
327  void setStatusLocked(const std::string &value)
328  {
329  if (value == lastValue) return; // no-op this case
330  lastValue = value;
331 
332  sag_set_user_status_item(underlying, value.c_str());
333  }
334 
335  int64_t intValue;
336  const std::string mkey;
337  std::string lastValue;
338  void* connectivityManager;
339  sag_status_item_t underlying;
340  std::mutex status_lock;
341  };
342 
344  typedef std::unique_ptr<StatusItem> item_ptr;
345 
359  item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
360  {
361  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, initialValue, 0));
362  }
363 
376  item_ptr createStatusItem(const std::string &key, int64_t initialValue)
377  {
378  std::ostringstream oss;
379  oss << initialValue;
380  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, oss.str(), initialValue));
381  }
382 
393  void setStatus(const map_t &statusmap)
394  {
395  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(statusmap));
396  // keep track of what keys we've added so we can automatically remove them when we're destroyed
397  for (auto it = statusmap.cbegin(); it != statusmap.cend(); it++)
398  if (it->first.type_tag() == SAG_DATA_STRING) // ignore invalid ones
399  {
400  if (it->second.empty())
401  mapKeysToCleanup.erase(it->first);
402  else
403  mapKeysToCleanup.insert(it->first.copy(), std::move(data_t()));
404  }
405  }
406 
407 
408 
412  ~StatusReporter()
413  {
414  if (!mapKeysToCleanup.empty())
415  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(mapKeysToCleanup));
416  }
417 
421  explicit StatusReporter(void *connectivityManager) : connectivityManager(connectivityManager), mapKeysToCleanup()
422  {
423  }
424 
425 private:
426  void* connectivityManager;
427  map_t mapKeysToCleanup;
428 
429 
430  StatusReporter() = delete;
431 
432  // non-copyable, due to the cleanup code we don't want people to shoot themselves in the foot by having multiple copies
433  StatusReporter(const StatusReporter& other) = delete;
434  StatusReporter& operator=(const StatusReporter&) = delete;
435 
436 };
437 
445 class Plugin
446 {
447 public:
451  Plugin(const std::string &pluginName, const std::string &chainId, const map_t &config)
452  : name(pluginName), pluginName(pluginName), chainId(chainId), config(config.copy()), host(new PluginHost()), LOGGER("connectivity." + pluginName + "." + chainId), logger("connectivity." + name + "." + chainId), statusReporter()
453  {
454  logger.info("The %s connectivity plug-in in chain '%s' is using the deprecated legacy (std::string&,std::string&,map_t&) constructor. We recommend changing to use the recommended (PluginConstructorParameters&) constructor instead.",
455  pluginName.c_str(), chainId.c_str());
456  }
457 
458  /* Constructor.
459  * @since 9.12.0.1
460  */
461  Plugin(const PluginConstructorParameters &params)
462  : name(params.getPluginName()), pluginName(params.getPluginName()), chainId(params.getChainId()), config(params.getConfig().copy()),
463  host(new PluginHost(params.chain)), LOGGER("connectivity." + params.getPluginName() + "." + params.getChainId()), logger("connectivity." + pluginName + "." + chainId),
464  statusReporter(new StatusReporter(params.connectivityManager))
465  {
466  }
467 
473  virtual ~Plugin() {}
474 
485  virtual void start() {}
486 
494  virtual void hostReady() {}
495 
518  virtual void shutdown() {}
520  const std::string &getName() const { return name; }
521 protected:
524  const std::string name;
528  const std::string pluginName;
530  const std::string chainId;
538  const PluginHost::ptr_t host;
547  if (statusReporter) return *statusReporter;
548  throw std::runtime_error("Cannot call getStatusReporter when using the legacy constructor");
549  }
550 
551 public:
558 
563 private:
564  std::unique_ptr<StatusReporter> statusReporter;
565 };
566 
/**
 * An interface to the next component (plugin or host) towards the host.
 *
 * Implementations override the pointer-pair sendBatchTowardsHost(); the two
 * non-virtual overloads below are conveniences that forward to it.
 */
571 class HostSide
572 {
573 public:
/** Virtual so that deleting through a HostSide pointer (e.g. ptr_t) destroys the derived object. */
575  virtual ~HostSide() {}
/** Pointers to HostSides should always be this ptr_t type, which is a std::unique_ptr. */
577  typedef std::unique_ptr<HostSide> ptr_t;
/**
 * Abstract method that must be implemented to handle delivery of a batch of
 * messages heading towards the host.
 *
 * @param start Pointer to the first message of the batch.
 * @param end Pointer one past the last message (the overloads below pass
 *        &message+1 and last-element+1 respectively).
 */
597  virtual void sendBatchTowardsHost(Message *start, Message *end) = 0;
598 
/**
 * Overload for sending a batch containing a single message.
 * Forwards the address of `message` as a one-element range.
 */
606  void sendBatchTowardsHost(Message &&message) {
607  sendBatchTowardsHost(&message, &message+1);
608  }
609 
/**
 * Overload for sending messages using an iterator range.
 *
 * Only participates in overload resolution when dereferencing the iterator
 * yields a non-const Message. An empty range is forwarded as two null
 * pointers; otherwise the address of the first element and one past the
 * address of the last element (end-1) are forwarded.
 *
 * NOTE(review): forwarding raw addresses like this covers the whole range only
 * if the elements are stored contiguously, and `end-1` needs an iterator that
 * supports subtraction — confirm callers only pass such iterators.
 */
619  template<typename IT>
620  auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if<
621  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
622  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
623  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
624  {
625  if(begin == end) sendBatchTowardsHost((Message*) nullptr, (Message*) nullptr);
626  else sendBatchTowardsHost(&(*begin), (&(*(end-1)))+1);
627  }
628 };
629 
/**
 * HostSide implementation that holds a sag_plugin_t handle together with a
 * sag_send_fn_t function.
 *
 * NOTE(review): sendBatchTowardsHost() is only declared here; its definition is
 * not part of this listing (presumably it lives in one of the sag_internal
 * headers included at the end of this file and hands the batch to `other` via
 * `fn` — confirm).
 */
634 class RemoteHostSide: public HostSide
635 {
636 public:
/** Stores the plug-in handle and the send function; nothing is invoked at construction. */
637  RemoteHostSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
638  virtual ~RemoteHostSide() {}
/** Overrides the pure virtual HostSide::sendBatchTowardsHost(Message*, Message*); declaration only. */
639  virtual void sendBatchTowardsHost(Message *start, Message *end);
640 private:
641  sag_plugin_t other;
642  sag_send_fn_t fn;
643 };
644 
650 {
651 public:
653  virtual ~TransportSide() {}
655  typedef std::unique_ptr<TransportSide> ptr_t;
676  virtual void sendBatchTowardsTransport(Message *start, Message *end) = 0;
677 
686  sendBatchTowardsTransport(&message, &message+1);
687  }
688 
698  template<typename IT>
699  auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if<
700  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
701  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
702  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
703  {
704  if(begin == end) sendBatchTowardsTransport((Message*) nullptr, (Message*) nullptr);
705  else sendBatchTowardsTransport(&(*begin), (&(*(end-1)))+1);
706  }
707 };
708 
/**
 * TransportSide implementation that holds a sag_plugin_t handle together with a
 * sag_send_fn_t function.
 *
 * NOTE(review): sendBatchTowardsTransport() is only declared here; its
 * definition is not part of this listing (presumably it lives in one of the
 * sag_internal headers included at the end of this file and hands the batch to
 * `other` via `fn` — confirm).
 */
713 class RemoteTransportSide: public TransportSide
714 {
715 public:
/** Stores the plug-in handle and the send function; nothing is invoked at construction. */
716  RemoteTransportSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
717  virtual ~RemoteTransportSide() {}
/** Overrides the pure virtual TransportSide::sendBatchTowardsTransport(Message*, Message*); declaration only. */
718  virtual void sendBatchTowardsTransport(Message *start, Message *end);
719 private:
720  sag_plugin_t other;
721  sag_send_fn_t fn;
722 };
723 
724 
749 class AbstractCodec: public Plugin, public HostSide, public TransportSide
750 {
751 public:
753 
769  AbstractCodec(const std::string &pluginName, const std::string &chainId, const map_t &config)
770  : Plugin(pluginName, chainId, config)
771  {}
772 
782  : Plugin(params)
783  {}
784 
785  // These methods do not need to show up in doxygen
786  /* Called between construction and start() to set the hostSide field */
787  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
788  {
789  hostSide = std::move(host);
790  }
791  /* Called between construction and start() to set the transportSide field */
792  virtual void setNextTowardsTransport(TransportSide::ptr_t &&transport)
793  {
794  transportSide = std::move(transport);
795  }
796 protected:
802 
808 };
809 
842 {
843 public:
845 
860  AbstractTransport(const std::string &name, const std::string &chainId, const map_t &config)
861  : Plugin(name, chainId, config)
862  {}
863 
873  : Plugin(params)
874  {}
875 
876  // This method does not need to show up in doxygen
877  /* Called between construction and start() to set the hostSide field */
878  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
879  {
880  hostSide = std::move(host);
881  }
882 
883 protected:
889 };
890 
906 {
907 public:
908 
923  AbstractSimpleTransport(const std::string &name, const std::string &chainId, const map_t &config)
924  : AbstractTransport(name, chainId, config)
925  {}
926 
936  : AbstractTransport(params)
937  {}
938 
939 
946  {
947  for (Message *it = start; it != end; ++it) {
948  try {
949  if (it->getPayload().empty()) {
951  } else {
953  }
954  } catch (...) {
955  handleException(*it);
956  }
957  }
958  }
960  virtual void deliverMessageTowardsTransport(Message &msg) = 0;
963  {
964  // do nothing
965  }
966 
/**
 * Handle an exception thrown while delivering a message towards the transport.
 *
 * Must be called from inside a catch handler (the batch loop above calls it
 * from `catch (...)`): the bare `throw;` re-raises the exception currently
 * being handled so it can be classified here. With no active exception a bare
 * `throw;` terminates the program.
 *
 * This default implementation logs a warning that includes the message and
 * then returns normally, so nothing propagates to the caller. Subclasses may
 * override it.
 *
 * @param m The message that was being delivered when the exception was thrown.
 */
985  virtual void handleException(Message &m)
986  {
987  try {
// Re-raise the in-flight exception so the handlers below can inspect its type.
988  throw;
989  } catch (const std::exception &e) {
990  logger.warn("Error while delivering message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
991  } catch (...) {
// Anything not derived from std::exception carries no what() text to log.
992  logger.warn("Unknown error delivering message: %s", to_string(m).c_str());
993  }
994  }
995 };
996 
1016 {
1017 public:
1032  AbstractSimpleCodec(const std::string &name, const std::string &chainId, const map_t &config)
1033  : AbstractCodec(name, chainId, config)
1034  {}
1035 
1045  : AbstractCodec(params)
1046  {}
1047 
1055  {
1056  Message *curr = start;
1057  for (Message *it = start; it != end; ++it) {
1058  bool rv;
1059  try {
1060  if (it->getPayload().empty()) {
1062  } else {
1063  rv = transformMessageTowardsHost(*it);
1064  }
1065  } catch (...) {
1066  rv = handleException(*it, false);
1067  }
1068  // if we keep it (and it didn't throw) swap it with the accumulator
1069  if (rv) {
1070  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
1071  ++curr;
1072  }
1073  }
1074  if (hostSide.get()) hostSide->sendBatchTowardsHost(start, curr);
1075  }
1083  {
1084  Message *curr = start;
1085  for (Message *it = start; it != end; ++it) {
1086  bool rv;
1087  try {
1088  // process the message
1089  if (it->getPayload().empty()) {
1091  } else {
1093  }
1094  } catch (...) {
1095  rv = handleException(*it, true);
1096  }
1097  // if we keep it (and it didn't throw) swap it with the accumulator
1098  if (rv) {
1099  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
1100  ++curr;
1101  }
1102  }
1103  if (transportSide.get()) transportSide->sendBatchTowardsTransport(start, curr);
1104  }
1105 
1110  virtual bool transformMessageTowardsHost(Message &msg) = 0;
1115  virtual bool transformMessageTowardsTransport(Message &msg) = 0;
1121  {
1122  // do nothing
1123  return true;
1124  }
1130  {
1131  // do nothing
1132  return true;
1133  }
/**
 * Handle an exception thrown while delivering a message.
 *
 * Must be called from inside a catch handler (both batch loops above call it
 * from `catch (...)`): the bare `throw;` re-raises the exception currently
 * being handled so it can be classified here. With no active exception a bare
 * `throw;` terminates the program.
 *
 * This default implementation logs a warning that includes the message and
 * returns false; the batch loops use that return value as their keep flag, so
 * the message is left out of the batch passed on to the next plug-in.
 *
 * @param m The message that was being transformed when the exception was thrown.
 * @param towardsTransport Direction flag supplied by the caller (true from the
 *        transport-wards loop, false from the host-wards loop); not used by
 *        this default implementation.
 * @return Always false in this default implementation.
 */
1155  virtual bool handleException(Message &m, bool towardsTransport)
1156  {
1157  try {
// Re-raise the in-flight exception so the handlers below can inspect its type.
1158  throw;
1159  } catch (const std::exception &e) {
1160  logger.warn("Error while transforming message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
1161  } catch (...) {
// Anything not derived from std::exception carries no what() text to log.
1162  logger.warn("Unknown error transforming message: %s", to_string(m).c_str());
1163  }
1164  return false;
1165  }
1166 };
1167 
1168 }}} // com.softwareag.connectivity
1169 
1170 // internal implementation included from these files
1171 #include <sag_internal/exception.hpp>
1172 #include <sag_internal/remote_plugins.hpp>
1173 #include <sag_internal/plugin_macros.hpp>
1174 
1184 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class)
1185 
1197 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_LEGACY(Class)
1198 
1208 #define SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class)
1209 
1220 #define SAG_DECLARE_CONNECTIVITY_CODEC(Class) _SAG_DECLARE_CONNECTIVITY_CODEC_LEGACY(Class)
1221 
1222 #endif // _SAG_CONNECTIVITY_PLUGINS_HPP_
A container for parameters passed to the constructor of a codec plug-in.
Definition: sag_connectivity_plugins.hpp:195
Logger LOGGER
Legacy logging for writing to the host log file [DEPRECATED].
Definition: sag_connectivity_plugins.hpp:557
bool empty() const
Returns true if the map is empty (size() == 0)
Definition: sag_connectivity_cpp.hpp:290
const std::string pluginName
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:528
Base class that simplifies implementation of codec plug-ins that deal only with individual messages n...
Definition: sag_connectivity_plugins.hpp:1015
AbstractTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:872
Base class that simplifies implementation of transport plug-ins that deal only with individual messag...
Definition: sag_connectivity_plugins.hpp:905
void increment(int64_t incrementValue=1)
Set an integer status value by incrementing the previous integer value that was set by this object...
Definition: sag_connectivity_plugins.hpp:297
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:801
An interface to the next component (plugin or host) towards the host.
Definition: sag_connectivity_plugins.hpp:571
virtual void sendBatchTowardsHost(Message *start, Message *end)=0
Abstract method that must be implemented to handle delivery of a batch of messages heading towards th...
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_plugins.hpp:532
item_ptr createStatusItem(const std::string &key, int64_t initialValue)
Creates a StatusItem instance that can be used to report status for a given key, using an integral in...
Definition: sag_connectivity_plugins.hpp:376
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host...
Definition: sag_connectivity_plugins.hpp:69
void warn(const char *format, ARGS...args) const
Log a message at WARN level.
Definition: sag_plugin_logging.hpp:153
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:72
const PluginHost::ptr_t host
Interface to support miscellaneous requests from this plug-in to the host system. ...
Definition: sag_connectivity_plugins.hpp:538
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as online or failed status and number of ...
Definition: sag_connectivity_plugins.hpp:546
Contains the C++ implementation of the underlying datatypes used by connectivity plugins and their ac...
void setStatus(const map_t &statusmap)
Set multiple related string status values at the same time (atomically).
Definition: sag_connectivity_plugins.hpp:393
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling deliverMessageTowardsTransport(Message&) for each message individua...
Definition: sag_connectivity_plugins.hpp:945
Base of the inheritance tree for Connectivity plugins.
Definition: sag_connectivity_plugins.hpp:445
TransportSide::ptr_t transportSide
The next plug-in in the chain towards transport.
Definition: sag_connectivity_plugins.hpp:807
std::unique_ptr< StatusItem > item_ptr
Unique pointer to a StatusItem.
Definition: sag_connectivity_plugins.hpp:344
virtual void sendBatchTowardsTransport(Message *start, Message *end)=0
Abstract method that must be implemented to handle delivery of a batch of messages heading towards th...
void setStatus(const std::string &value)
Set a string status value.
Definition: sag_connectivity_plugins.hpp:262
Definition: sag_connectivity_threading.h:178
AbstractSimpleTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:935
Logger logger
Logging for writing to the host log file.
Definition: sag_connectivity_plugins.hpp:562
void info(const char *format, ARGS...args) const
Log a message at INFO level.
Definition: sag_plugin_logging.hpp:155
virtual bool transformNullPayloadTowardsTransport(Message &msg)
Transform a message with a null payload in a transport-wards direction.
Definition: sag_connectivity_plugins.hpp:1129
virtual void start()
Called when an entire chain has been created and the plugin is allowed to start up (after all plugins...
Definition: sag_connectivity_plugins.hpp:485
auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:699
std::enable_if< get_underlying< T >::value, std::string >::type to_string(const T &t)
Get a string representation of t.
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:220
std::unique_ptr< TransportSide > ptr_t
Pointers to TransportSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:655
size_t erase(const data_t &k)
Remove the item with the specified key.
virtual ~TransportSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:653
Base class for transport plug-ins.
Definition: sag_connectivity_plugins.hpp:841
static const char * STATUS_ONLINE()
Returns a constant that should be used as the status value when a component is online, operational, connected, and ready to handle messages.
Definition: sag_connectivity_plugins.hpp:227
item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
Creates a StatusItem instance that can be used to report status for a given key.
Definition: sag_connectivity_plugins.hpp:359
const_iterator cbegin() const
Forward const_iterator begin.
Definition: sag_connectivity_cpp.hpp:282
AbstractSimpleTransport(const std::string &name, const std::string &chainId, const map_t &config)
Legacy constructor [DEPRECATED].
Definition: sag_connectivity_plugins.hpp:923
bool isShuttingDown()
Check if host is shutting down.
Definition: sag_connectivity_plugins.hpp:111
const_iterator cend() const
Forward const_iterator end.
Definition: sag_connectivity_cpp.hpp:284
virtual void handleException(Message &m)
Handle an exception thrown while delivering a message towards the transport.
Definition: sag_connectivity_plugins.hpp:985
const std::string name
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:524
static const char * STATUS_FAILED()
Returns a constant that should be used as the status value when a component is not currently operatio...
Definition: sag_connectivity_plugins.hpp:235
virtual ~HostSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:575
AbstractSimpleCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:1044
virtual void hostReady()
Called some time after start(), when the host is ready to start receiving input (sends will be queued...
Definition: sag_connectivity_plugins.hpp:494
Base class for codec plug-ins.
Definition: sag_connectivity_plugins.hpp:749
No error.
Definition: sag_connectivity_c.h:64
void sendBatchTowardsHost(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:606
static const char * STATUS_STARTING()
Returns a constant that should be used as the status value when a component is still starting...
Definition: sag_connectivity_plugins.hpp:231
virtual bool transformMessageTowardsTransport(Message &msg)=0
Abstract method that must be implemented to handle transformation of an individual message in a trans...
std::pair< iterator, bool > insert(insertion_t &&v)
Insert a new key/value pair into the map.
AbstractSimpleCodec(const std::string &name, const std::string &chainId, const map_t &config)
Legacy constructor [DEPRECATED].
Definition: sag_connectivity_plugins.hpp:1032
void setStatus(int64_t value)
Set an integer status value.
Definition: sag_connectivity_plugins.hpp:276
virtual bool transformMessageTowardsHost(Message &msg)=0
Abstract method that must be implemented to handle transformation of an individual message...
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:36
const std::string & key()
Get the unique key specified when this status item was created.
Definition: sag_connectivity_plugins.hpp:306
const std::string & getPluginName() const
Get the name used in the configuration file for this plug-in.
Definition: sag_connectivity_plugins.hpp:162
A class that can be used to efficiently update the value associated with a single status key...
Definition: sag_connectivity_plugins.hpp:246
A container for a payload and associated metadata.
Definition: sag_connectivity_cpp.hpp:27
virtual void shutdown()
Stop processing messages and terminate and join any background threads.
Definition: sag_connectivity_plugins.hpp:518
virtual void sendBatchTowardsHost(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsHost(Message &) for each message individuall...
Definition: sag_connectivity_plugins.hpp:1054
An interface to the next component (plugin or host) towards the transport.
Definition: sag_connectivity_plugins.hpp:649
AbstractCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:781
virtual void deliverMessageTowardsTransport(Message &msg)=0
Abstract method that must be implemented to handle delivery of an individual message.
auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:620
virtual void deliverNullPayloadTowardsTransport(Message &msg)
Deliver a message with a null payload.
Definition: sag_connectivity_plugins.hpp:962
void enableReliability(Direction direction)
Enable reliable messaging for the chain that this plug-in belongs to, in a particular direction i...
Definition: sag_connectivity_plugins.hpp:104
Contains the headers needed to implement your own Connectivity Plugins.
AbstractCodec(const std::string &pluginName, const std::string &chainId, const map_t &config)
Legacy constructor [DEPRECATED].
Definition: sag_connectivity_plugins.hpp:769
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:183
The direction of messages flowing towards the host (from the transport).
AbstractTransport(const std::string &name, const std::string &chainId, const map_t &config)
Legacy constructor [DEPRECATED].
Definition: sag_connectivity_plugins.hpp:860
const std::string & getChainId() const
Get the identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:157
Interface to support miscellaneous requests from a particular plug-in to the host system...
Definition: sag_connectivity_plugins.hpp:85
A base interface for parameters passed to the constructor of transport or codec plug-ins.
Definition: sag_connectivity_plugins.hpp:138
Plugin(const std::string &pluginName, const std::string &chainId, const map_t &config)
Legacy constructor [DEPRECATED].
Definition: sag_connectivity_plugins.hpp:451
virtual bool handleException(Message &m, bool towardsTransport)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:1155
utf8-encoded const char*
Definition: sag_connectivity_c.h:46
const std::string chainId
The identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:530
The direction of messages flowing towards the transport (from the host).
void sendBatchTowardsTransport(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:685
std::string getStatus()
Return the value this status item was set to most recently by this class.
Definition: sag_connectivity_plugins.hpp:288
A variant type which can be one of the following:
Definition: sag_connectivity_cpp.hpp:42
std::unique_ptr< HostSide > ptr_t
Pointers to HostSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:577
const map_t & getConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_plugins.hpp:152
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsTransport(Message &) for each message indivi...
Definition: sag_connectivity_plugins.hpp:1082
virtual ~Plugin()
This destructor must be virtual.
Definition: sag_connectivity_plugins.hpp:473
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:888
const std::string & getName() const
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:520
virtual bool transformNullPayloadTowardsHost(Message &msg)
Transform a message with a null payload in a host-wards direction.
Definition: sag_connectivity_plugins.hpp:1120