79 #ifndef CGU_ASYNC_CHANNEL_H
80 #define CGU_ASYNC_CHANNEL_H
93 #ifdef CGU_USE_SCHED_YIELD
105 --(*
static_cast<std::size_t*
>(arg));
212 enum Status {normal, closed, destructing} status;
261 if (status == normal) {
290 bool push(
const value_type& obj) {
291 bool waiting =
false;
293 if (status != normal)
return false;
304 while (size >= n && status == normal) {
312 pthread_cleanup_pop(
false);
316 Status local_status = status;
317 if (waiting) --waiters;
318 if (local_status != normal)
return false;
320 size_type next = (idx + size) % n;
321 new (
static_cast<void*
>(buf + next)) T{obj};
352 bool waiting =
false;
354 if (status != normal)
return false;
365 while (size >= n && status == normal) {
373 pthread_cleanup_pop(
false);
377 Status local_status = status;
378 if (waiting) --waiters;
379 if (local_status != normal)
return false;
381 size_type next = (idx + size) % n;
382 new (
static_cast<void*
>(buf + next)) T{std::move(obj)};
413 template<
class... Args>
415 bool waiting =
false;
417 if (status != normal)
return false;
428 while (size >= n && status == normal) {
436 pthread_cleanup_pop(
false);
440 Status local_status = status;
441 if (waiting) --waiters;
442 if (local_status != normal)
return false;
444 size_type next = (idx + size) % n;
445 new (
static_cast<void*
>(buf + next)) T{std::forward<Args>(args)...};
476 bool pop(value_type& obj) {
477 bool waiting =
false;
489 while (!size && status == normal) {
497 pthread_cleanup_pop(
false);
499 if (status == destructing) {
502 if (waiting) --waiters;
506 size_type old_idx = idx;
509 if (idx == n) idx = 0;
517 if (waiting) --waiters;
521 if (waiting) --waiters;
526 if (waiting) --waiters;
567 bool waiting =
false;
579 while (!size && status == normal) {
587 pthread_cleanup_pop(
false);
589 if (status == destructing) {
592 if (waiting) --waiters;
596 size_type old_idx = idx;
597 obj = std::move(buf[old_idx]);
599 if (idx == n) idx = 0;
607 if (waiting) --waiters;
611 if (waiting) --waiters;
616 if (waiting) --waiters;
658 buf(static_cast<T*>(
std::malloc(sizeof(T) * n))) {
659 static_assert(n != 0,
"AsyncChannel objects may not be created with size 0");
660 if (!buf)
throw std::bad_alloc();
678 status = destructing;
690 #ifdef CGU_USE_SCHED_YIELD
704 if (idx == n) idx = 0;
715 #ifndef DOXYGEN_PARSING
727 mutable Thread::Mutex mutex;
731 enum Status {normal, closed, destructing} status;
742 void close() noexcept {
743 Thread::Mutex::Lock lock{mutex};
744 if (status == normal) {
750 bool push(
const value_type& obj) {
751 bool waiting =
false;
752 Thread::Mutex::Lock lock{mutex};
753 if (status != normal)
return false;
764 while (full && status == normal) {
771 Thread::CancelBlock b;
772 pthread_cleanup_pop(
false);
776 Status local_status = status;
777 if (waiting) --waiters;
778 if (local_status != normal)
return false;
779 new (
static_cast<void*
>(datum)) T{obj};
785 bool push(value_type&& obj) {
786 bool waiting =
false;
787 Thread::Mutex::Lock lock{mutex};
788 if (status != normal)
return false;
799 while (full && status == normal) {
806 Thread::CancelBlock b;
807 pthread_cleanup_pop(
false);
811 Status local_status = status;
812 if (waiting) --waiters;
813 if (local_status != normal)
return false;
814 new (
static_cast<void*
>(datum)) T{std::move(obj)};
820 template<
class... Args>
822 bool waiting =
false;
823 Thread::Mutex::Lock lock{mutex};
824 if (status != normal)
return false;
835 while (full && status == normal) {
842 Thread::CancelBlock b;
843 pthread_cleanup_pop(
false);
847 Status local_status = status;
848 if (waiting) --waiters;
849 if (local_status != normal)
return false;
850 new (
static_cast<void*
>(datum)) T{std::forward<Args>(args)...};
856 bool pop(value_type& obj) {
857 bool waiting =
false;
858 Thread::Mutex::Lock lock{mutex};
869 while (!full && status == normal) {
876 Thread::CancelBlock b;
877 pthread_cleanup_pop(
false);
879 if (status == destructing) {
882 if (waiting) --waiters;
894 if (waiting) --waiters;
898 if (waiting) --waiters;
903 if (waiting) --waiters;
909 bool waiting =
false;
910 Thread::Mutex::Lock lock{mutex};
921 while (!full && status == normal) {
928 Thread::CancelBlock b;
929 pthread_cleanup_pop(
false);
931 if (status == destructing) {
934 if (waiting) --waiters;
938 obj = std::move(*datum);
946 if (waiting) --waiters;
950 if (waiting) --waiters;
955 if (waiting) --waiters;
960 AsyncChannel(): waiters(0), full(false), status(normal),
970 datum(static_cast<T*>(
std::malloc(sizeof(T)))) {
971 if (!datum)
throw std::bad_alloc();
976 status = destructing;
988 #ifdef CGU_USE_SCHED_YIELD
999 if (full) datum->~T();
1006 #endif // DOXYGEN_PARSING
1010 #endif // CGU_ASYNC_CHANNEL_H
A thread-safe "channel" class for inter-thread communication.
Definition: async_channel.h:201
int lock() noexcept
Definition: mutex.h:147
int unlock() noexcept
Definition: mutex.h:170
A wrapper class for pthread condition variables.
Definition: mutex.h:449
bool move_pop(value_type &obj)
Definition: async_channel.h:566
bool pop(value_type &obj)
Definition: async_channel.h:476
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:723
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:207
T value_type
Definition: async_channel.h:203
bool push(value_type &&obj)
Definition: async_channel.h:351
bool emplace(Args &&...args)
Definition: async_channel.h:414
A wrapper class for pthread mutexes.
Definition: mutex.h:117
AsyncChannel & operator=(const AsyncChannel &)=delete
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
Definition: application.h:44
~AsyncChannel()
Definition: async_channel.h:676
std::size_t size_type
Definition: async_channel.h:204
AsyncChannel()
Definition: async_channel.h:648
void close() noexcept
Definition: async_channel.h:259
int broadcast() noexcept
Definition: mutex.h:483
void cgu_async_channel_waiters_dec(void *arg)
Definition: async_channel.h:104
bool push(const value_type &obj)
Definition: async_channel.h:290
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
int wait(Mutex &mutex)
Definition: mutex.h:513