#include <atomic>
#include <thread>
#include <future>
#include <catch.hpp>
using namespace std::chrono_literals;
// Shorthand for a standard double-ended queue holding `Element` values.
template <typename Element>
using Deque = std::deque<Element>;
// Shared state for the scenario: one inbound queue for the consumer and one
// control queue per producer.
struct Context
{
    // Queue the producers push their events into and the consumer drains.
    EventQueue ConsumerMQ;
    // Control queue polled by the first producer.
    EventQueue Producer1MQ;
    // Control queue polled by the second producer.
    EventQueue Producer2MQ;

    // Posts a 0 on each producer's control queue (first producer, then second).
    //
    // NOTE(review): nothing is posted on ConsumerMQ here, while Consumer only
    // leaves its loop after seeing a 0 there -- confirm where that sentinel
    // is expected to come from.
    // NOTE(review): `noexcept` assumes EventQueue::push cannot throw; if it
    // can, a throw here ends the process via std::terminate -- confirm.
    void terminate() noexcept
    {
        Producer1MQ.push(0);
        Producer2MQ.push(0);
    }
};
// Producer thread body.
//
// Forwards the events held in `MyEvents` (declared outside this excerpt) to
// `application.ConsumerMQ`, one at a time, pausing 50ms after each push.
// Before every event the producer's own queue `myMQ` is peeked; a truthy
// result stops the producer immediately, leaving the remaining events unsent.
//
// application: shared context owning the consumer queue.
// myMQ:        this producer's control queue.
void Producer(Context &application, EventQueue &myMQ)
{
    while (true)
    {
        // Nothing left to send: normal completion.
        if (MyEvents.empty())
        {
            return;
        }
        // Something is waiting on the control queue: stop early.
        if (myMQ.peek())
        {
            return;
        }

        // Copy the head out before popping it, then hand the copy over.
        Event outgoing = MyEvents.front();
        MyEvents.pop_front();
        application.ConsumerMQ.push(std::move(outgoing));

        std::this_thread::sleep_for(50ms);
    }
}
// Consumer thread body.
//
// Repeatedly fetches a batch of events from `application.ConsumerMQ` and
// appends it to `result`. The loop ends once the last element of `result` is
// the sentinel 0; that sentinel is removed before returning. Only the final
// element is inspected, so a 0 that is not last in its batch does not stop
// the loop and stays in `result`.
//
// application: shared context owning the consumer queue.
// result:      output container; receives every event in arrival order.
void Consumer(Context &application, TestResult &result)
{
    while (true)
    {
        EventQueue::Enumerable events = application.ConsumerMQ.getEnumerable();

        // NOTE(review): this assertion runs on a worker thread; Catch's
        // assertion macros are documented as not thread-safe in the
        // single-header releases -- confirm this is acceptable here.
        CHECK(events.size() > 0);

        result.insert(result.end(), events.begin(), events.end());

        // CHECK does not abort on failure, so an empty batch can reach this
        // point while `result` is still empty; calling back() on an empty
        // container is undefined behaviour, hence the emptiness guard.
        if (not result.empty() and result.back() == 0)
        {
            result.pop_back();
            break;
        }
    }
}
// Short names for the <future> facilities used by the scenario below.
using Launch = std::launch;              // launch policy passed to std::async
using FutureStatus = std::future_status; // result of a timed wait on a future
using Future = std::future<void>;        // handle to a task with no return value
// End-to-end check of the message queues: two producer threads feed one
// consumer thread, and the consumer's collected output is then validated.
SCENARIO("Message Queues are reliable", "[util][thread]")
{
    GIVEN("I have two producer threads and a consumer thread")
    {
        Context context;
        TestResult result;

        // Launch::async runs each task on its own thread. The futures are
        // declared after `context` and `result`, so they are destroyed first;
        // a future obtained from std::async blocks in its destructor until the
        // task has finished, which keeps the threads from outliving the state
        // they reference.
        Future consumer = std::async(Launch::async, &Consumer, std::ref(context), std::ref(result));
        Future producer1 = std::async(Launch::async, &Producer, std::ref(context), std::ref(context.Producer1MQ));
        Future producer2 = std::async(Launch::async, &Producer, std::ref(context), std::ref(context.Producer2MQ));

        WHEN("My producers send all their messages to the consumer")
        {
            const auto PingDelta = 50ms;
            // Ten real waits of PingDelta each, i.e. a 500ms budget per task.
            const int maxPings = 10;

            // Polls `task` until it is ready or its own budget is used up and
            // returns the last observed status. Every task gets a fresh
            // budget, so a slow earlier task can never cause a later one to be
            // reported as timed out without having been polled at all.
            const auto awaitTask = [&](Future &task) -> FutureStatus
            {
                FutureStatus status = FutureStatus::timeout;
                for (int ping = 0; status == FutureStatus::timeout and ping < maxPings; ++ping)
                {
                    status = task.wait_for(PingDelta);
                }
                return status;
            };

            FutureStatus status = awaitTask(producer1);
            if (status != FutureStatus::ready)
            {
                // Ask the producers to stop before recording the failure.
                context.terminate();
                CHECK(status == FutureStatus::ready);
            }

            status = awaitTask(producer2);
            if (status != FutureStatus::ready)
            {
                context.terminate();
                CHECK(status == FutureStatus::ready);
            }

            context.terminate();

            status = awaitTask(consumer);
            // Fatal on purpose: `result` is written by the consumer thread, so
            // it must not be read below unless that thread has finished.
            REQUIRE(status == FutureStatus::ready);

            THEN("My consumer receives all their messages")
            {
                // Every received event must be the next value of one of two
                // independent counters that both start at 1, i.e. the result
                // must be an interleaving of two ascending runs 1, 2, 3, ...
                int x1 = 1;
                int x2 = 1;
                for (int event : result)
                {
                    if (event == x1)
                    {
                        x1++;
                    }
                    else
                    {
                        CHECK(event == x2++);
                    }
                }
            }
        }
    }
}