Health-GPS  1.2.2.0
Global Health Policy Simulation model (Health-GPS)
channel.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <condition_variable>
5 #include <mutex>
6 #include <optional>
7 #include <queue>
8 
9 namespace hgps {
10 
24 template <typename T> class Channel {
25  public:
26  using value_type = T;
27  using size_type = std::size_t;
28 
31  explicit Channel(size_type capacity = 0) : capacity_{capacity}, is_closed_{false} {}
32 
33  Channel(const Channel &) = delete;
34  Channel &operator=(const Channel &) = delete;
35  Channel(Channel &&) = delete;
36  Channel &operator=(Channel &&) = delete;
37 
39  virtual ~Channel() = default;
40 
44  bool send(const value_type &message) { return do_send(message); }
45 
49  bool send(value_type &&message) { return do_send(std::move(message)); }
50 
54  std::optional<value_type> try_receive(int timeout_millis = 0) {
55  std::unique_lock<std::mutex> lock{mtx_};
56 
57  if (timeout_millis <= 0) {
58  cond_var_.wait(lock, [this] { return buffer_.size() > 0 || closed(); });
59  } else {
60  cond_var_.wait_for(lock, std::chrono::milliseconds(timeout_millis),
61  [this] { return buffer_.size() > 0 || closed(); });
62  }
63 
64  if (buffer_.empty()) {
65  return {};
66  }
67 
68  auto entry = std::move(buffer_.front());
69  buffer_.pop();
70 
71  cond_var_.notify_one();
72  return entry;
73  }
74 
77  [[nodiscard]] size_type constexpr size() const noexcept { return buffer_.size(); }
78 
81  [[nodiscard]] bool constexpr empty() const noexcept { return buffer_.empty(); }
82 
84  void close() noexcept {
85  cond_var_.notify_one();
86  is_closed_.store(true);
87  }
88 
91  [[nodiscard]] bool closed() const noexcept { return is_closed_.load(); }
92 
93  private:
94  const size_type capacity_;
95  std::queue<value_type> buffer_;
96  std::atomic<bool> is_closed_;
97  std::condition_variable cond_var_;
98  std::mutex mtx_;
99 
100  bool do_send(auto &&payload) {
101  if (is_closed_.load()) {
102  return false;
103  }
104 
105  std::unique_lock<std::mutex> lock(mtx_);
106  if (capacity_ > 0 && buffer_.size() >= capacity_) {
107  cond_var_.wait(lock, [this]() { return buffer_.size() < capacity_; });
108  }
109 
110  buffer_.push(std::forward<decltype(payload)>(payload));
111  cond_var_.notify_one();
112  return true;
113  }
114 };
115 } // namespace hgps
Thread-safe communication channel data type.
Definition: channel.h:24
Channel(Channel &&)=delete
Channel & operator=(Channel &&)=delete
void close() noexcept
Close the channel, no new messages are accepted.
Definition: channel.h:84
virtual ~Channel()=default
Destroys a Channel instance.
Channel(const Channel &)=delete
bool send(value_type &&message)
Sends a new message through the channel.
Definition: channel.h:49
constexpr size_type size() const noexcept
Gets the current channel size, number of messages.
Definition: channel.h:77
Channel(size_type capacity=0)
Initialises a new instance of the Channel class.
Definition: channel.h:31
bool send(const value_type &message)
Sends a new message through the channel by reference.
Definition: channel.h:44
std::optional< value_type > try_receive(int timeout_millis=0)
Try to receive a message from the channel.
Definition: channel.h:54
Channel & operator=(const Channel &)=delete
std::size_t size_type
Definition: channel.h:27
T value_type
Definition: channel.h:26
constexpr bool empty() const noexcept
Determine whether the channel is empty.
Definition: channel.h:81
bool closed() const noexcept
Determine whether the channel is closed.
Definition: channel.h:91
Top-level namespace for Health-GPS C++ API.
Definition: analysis_definition.h:8