pianod2
multisource multiuser scriptable networked music player
threadsafecircularqueue.h
Go to the documentation of this file.
1 
9 #pragma once
10 
11 #include <config.h>
12 
13 #include <mutex>
14 #include <condition_variable>
15 #include <atomic>
16 #include <chrono>
17 
18 #include <cassert>
19 #include <cstdint>
20 #include <ctime>
21 
22 #include <type_traits>
23 
27 template <typename ValueType, uint32_t capacity>
29  using index_type = uint32_t;
30  struct StorageType {
31  int overflows {0};
32  time_t time;
33  ValueType value;
34  };
35  StorageType data[capacity];
36  std::atomic<index_type> read_index;
37  std::mutex read_mutex;
38  std::condition_variable read_bell;
39 
40  std::atomic<index_type> write_index;
41  std::mutex write_mutex;
42  std::condition_variable write_bell;
43 
45  inline index_type next (index_type index) {
46  return (index + 1) % capacity;
47  }
48 
50  inline void write_common() {
51  data[write_index].time = time (nullptr);
54  read_bell.notify_one();
55  }
56 
57 public:
58  using value_type = ValueType;
59 
63  template <typename VT>
64  void push_back (VT &&value) {
65  std::unique_lock<std::mutex> lock (write_mutex);
66  while (next (write_index) == read_index) {
67  write_bell.wait (lock);
68  }
69  data[write_index].value = std::move (value);
70  write_common();
71  }
72 
81  template <typename VT>
82  bool try_push_back (VT &&value,
83  const std::chrono::milliseconds &wait_duration = std::chrono::milliseconds::zero()) {
84  std::unique_lock<std::mutex> lock (write_mutex);
85  if (next (write_index) == read_index) {
86  if (!read_bell.wait_for (lock, wait_duration, [this] ()->bool {
87  return (next (write_index) != read_index);
88  })) {
90  return false;
91  }
92  }
93  data[write_index].value = std::move (value);
94  write_common();
95  return true;
96  }
97 
103  ValueType pop_front (int *overflows = nullptr, time_t *sent_time = nullptr) {
104  std::unique_lock<std::mutex> lock (read_mutex);
105  while (read_index == write_index) {
106  read_bell.wait (lock);
107  }
108  if (overflows) {
109  *overflows = data[read_index].overflows;
110  }
111  if (sent_time) {
112  *sent_time = data[read_index].time;
113  }
114  ValueType temp = std::move (data[read_index].value);
116  write_bell.notify_one();
117  return temp;
118  }
119 
126  bool try_pop_front (ValueType *value,
127  int *overflows = nullptr,
128  time_t *sent_time = nullptr,
129  std::chrono::milliseconds wait_duration = std::chrono::milliseconds::zero()) {
130  std::unique_lock<std::mutex> lock (read_mutex);
131  if (read_index == write_index) {
132  if (!read_bell.wait_for (lock, wait_duration, [this] ()->bool {
133  return (read_index != write_index);
134  })) {
135  return false;
136  };
137  }
138  if (overflows) {
139  *overflows = data[read_index].overflows;
140  }
141  if (sent_time) {
142  *sent_time = data[read_index].time;
143  }
144  *value = std::move (data[read_index].value);
146  write_bell.notify_one();
147  return true;
148  }
149 };
An circular queue meant for sharing data across threads.
Definition: threadsafecircularqueue.h:28
index_type next(index_type index)
Increment an index, wrapping around buffer size.
Definition: threadsafecircularqueue.h:45
ValueType value_type
Definition: threadsafecircularqueue.h:58
std::mutex write_mutex
Definition: threadsafecircularqueue.h:41
ValueType pop_front(int *overflows=nullptr, time_t *sent_time=nullptr)
Retrieve an item from the queue.
Definition: threadsafecircularqueue.h:103
uint32_t index_type
Definition: threadsafecircularqueue.h:29
void write_common()
Perform actions common to the various forms of push_back.
Definition: threadsafecircularqueue.h:50
std::atomic< index_type > write_index
Definition: threadsafecircularqueue.h:40
bool try_pop_front(ValueType *value, int *overflows=nullptr, time_t *sent_time=nullptr, std::chrono::milliseconds wait_duration=std::chrono::milliseconds::zero())
Retrieve an item from the queue.
Definition: threadsafecircularqueue.h:126
std::condition_variable write_bell
Definition: threadsafecircularqueue.h:42
void push_back(VT &&value)
Add or move an item into the queue.
Definition: threadsafecircularqueue.h:64
bool try_push_back(VT &&value, const std::chrono::milliseconds &wait_duration=std::chrono::milliseconds::zero())
Add or move an item into the queue.
Definition: threadsafecircularqueue.h:82
StorageType data[capacity]
Definition: threadsafecircularqueue.h:35
std::mutex read_mutex
Definition: threadsafecircularqueue.h:37
std::condition_variable read_bell
Definition: threadsafecircularqueue.h:38
std::atomic< index_type > read_index
Definition: threadsafecircularqueue.h:36
uint32_t value
Definition: audiooutput.cpp:68
Definition: threadsafecircularqueue.h:30
int overflows
Number of messages lost due to queue overflow.
Definition: threadsafecircularqueue.h:31
ValueType value
Definition: threadsafecircularqueue.h:33
time_t time
Time at which an item was queued.
Definition: threadsafecircularqueue.h:32