Strand는 event handler의 순차 실행을 보장하는 API이다.
이를 활용하면 멀티스레드 환경에서 명시적인 lock 없이 코드를 작성할 수 있다.
strand는 application code와 handler 실행 사이에 layer를 제공한다.
worker thread가 직접 handler를 호출하지 않고 strand queue에 쌓는다.
Strand가 보장해야 할 기능은 다음과 같다.
구현해야 할 주요 메서드는 다음과 같다.
사용한 Helper class는 다음과 같다.
그리고 Monitor 클래스.
객체 T에 접근할 때 동기화 기능을 제공한다.
/// Pairs an object of type T with a mutex so that every access to the object
/// happens while the mutex is held.
///
/// The wrapped object is reachable only by passing a callable to operator();
/// the callable receives a reference to the object and runs under the lock.
template <class T>
class Monitor {
private:
    // Both members are mutable so that the const operator() can still lock
    // the mutex and hand out a non-const reference to the wrapped object.
    mutable T m_t;
    mutable std::mutex m_mtx;

public:
    using Type = T;

    /// Value-initializes the wrapped object, so a scalar or trivially
    /// constructible T starts zeroed instead of holding an indeterminate
    /// value that the first locked read would observe.
    Monitor() : m_t() {}

    /// Moves t_ into the monitor as the wrapped object.
    Monitor(T t_) : m_t(std::move(t_)) {}

    /// Calls f(m_t) while holding the mutex and returns whatever f returns.
    /// The lock_guard releases the mutex when the call returns or f throws.
    template <typename F>
    auto operator()(F f) const -> decltype(f(m_t)) {
        std::lock_guard<std::mutex> hold{m_mtx};
        return f(m_t);
    }
};
Strand 구현
#pragma once
#include "Callstack.h"
#include "Monitor.h"
#include <assert.h>
#include <queue>
#include <functional>
//
// A strand serializes handler execution.
// It guarantees the following:
// - No handlers execute concurrently
// - Handlers are only executed from the specified Processor
// - Handler execution order is not guaranteed
//
// Specified Processor must implement the following interface:
//
// template <typename F> void Processor::push(F w);
// Add a new work item to the processor. F is a callable convertible
// to std::function<void()>
//
// bool Processor::canDispatch();
// Should return true if we are in the Processor's dispatching function in
// the current thread.
//
// Serializes handler execution on top of a Processor.
//
// Pending handlers live in a mutex-protected queue next to a "running" flag.
// Whichever thread flips the flag from false to true owns the strand and
// keeps draining the queue until it is empty; every other thread only
// enqueues.
template <typename Processor>
class Strand {
public:
    Strand(Processor& proc) : m_proc(proc) {}
    Strand(const Strand&) = delete;
    Strand& operator=(const Strand&) = delete;

    // Runs the handler inside this call when that is allowed, otherwise
    // queues it for later execution.
    template <typename F>
    void dispatch(F handler) {
        // The processor is not dispatching in this thread, so the handler
        // cannot run here: hand it over to post() and leave.
        if (!m_proc.canDispatch()) {
            post(std::move(handler));
            return;
        }

        // This thread is already executing the strand, so the handler can be
        // invoked directly.
        if (runningInThisThread()) {
            handler();
            return;
        }

        // Under the lock, either queue the handler for the thread that is
        // currently running the strand, or claim the strand for this thread.
        const auto acquired = m_data([&](Data& state) {
            if (state.running) {
                state.q.push(std::move(handler));
                return false;
            }
            state.running = true;
            return true;
        });
        if (!acquired) {
            return;
        }

        // Mark the callstack so that nested calls see the strand as running
        // in this thread.
        Callstack<Strand>::Context marker(this);
        handler();
        // The strand stays claimed until run() finds the queue empty, so
        // anything queued by other threads in the meantime is executed here.
        run();
    }

    // Queues a handler and returns immediately; the handler is never invoked
    // from inside this call.
    template <typename F>
    void post(F handler) {
        // Enqueue and claim the running flag in a single locked step.
        const bool mustStart = m_data([&](Data& state) {
            state.q.push(std::move(handler));
            const bool wasRunning = state.running;
            state.running = true;
            return !wasRunning;
        });

        // The strand was idle: ask the processor to start draining the queue.
        if (mustStart) {
            m_proc.push([this] { run(); });
        }
    }

    // Reports whether this strand has a marker on the current callstack.
    bool runningInThisThread() {
        return Callstack<Strand>::contains(this) != nullptr;
    }

private:
    // Drains the queue one handler at a time.
    // Expects the running flag to be set on entry and clears it once the
    // queue is found empty.
    void run() {
        Callstack<Strand>::Context marker(this);
        for (;;) {
            std::function<void()> next;
            m_data([&](Data& state) {
                assert(state.running);
                if (state.q.empty()) {
                    state.running = false;
                    return;
                }
                next = std::move(state.q.front());
                state.q.pop();
            });

            // Nothing was dequeued, so the strand has been released.
            if (!next) {
                return;
            }
            // The handler is invoked after the locked section has returned.
            next();
        }
    }

    // State shared between threads; only touched through m_data.
    struct Data {
        bool running = false;
        std::queue<std::function<void()>> q;
    };
    Monitor<Data> m_data;
    Processor& m_proc;
};
사용예
#include "Strand.h"
#include "WorkQueue.h"
#include <random>
#include <stdlib.h>
#include <string>
#include <atomic>
// Returns a uniformly distributed random integer in the closed range
// [min, max]. The caller must pass min <= max; std::uniform_int_distribution
// has undefined behaviour otherwise.
int randInRange(int min, int max) {
    // One engine per thread, seeded from std::random_device on first use.
    // This avoids constructing a random_device and reseeding a Mersenne
    // Twister on every call, and keeps concurrent callers from sharing state.
    thread_local std::mt19937 eng(std::random_device{}());
    std::uniform_int_distribution<> distr(min, max);
    return distr(eng);
}
// Sample object that owns a strand bound to a shared WorkQueue.
struct Obj {
    // Builds the display name "Obj <n>" and binds the strand to wp.
    explicit Obj(int n, WorkQueue& wp) : strand(wp) {
        name = "Obj " + std::to_string(n);
    }

    // Prints "<name> : doing <val>" on its own line.
    void doSomething(int val) {
        // The newline escape had lost its backslash ("%dn"), which printed a
        // literal 'n' and ran successive messages together.
        printf("%s : doing %d\n", name.c_str(), val);
    }

    std::string name;
    Strand<WorkQueue> strand;
};
void strandSample() {
WorkQueue workQueue;
// Start a couple of worker threads
std::vector<std::thread> workerThreads;
for (int i = 0; i < 4; i++) {
workerThreads.push_back(std::thread([&workQueue] { workQueue.run(); }));
}
// Create a couple of objects that need strands
std::vector<std::unique_ptr<Obj>> objs;
for (int i = 0; i < 8; i++) {
objs.push_back(std::make_unique<Obj>(i, workQueue));
}
// Counter used by all strands, so we can check if all work was done
std::atomic<int> doneCount(0);
// Add work to random objects
const int todo = 20;
for (int i = 0; i < todo; i++) {
auto&& obj = objs[randInRange(0, objs.size() - 1)];
obj->strand.post([&obj, i, &doneCount] {
obj->doSomething(i);
++doneCount;
});
}
workQueue.stop();
for (auto&& t : workerThreads) {
t.join();
}
assert(doneCount == todo);
}
참고 : https://www.gamedeveloper.com/programming/how-strands-work-and-why-you-should-use-them
DllMain 정리 (0) | 2022.08.24 |
---|---|
[boost] asio#2 (0) | 2022.05.02 |
[boost] asio #1 (0) | 2022.05.01 |
[Visual Studio] Character Set Unicode vs MBCS (0) | 2022.04.26 |
enable_shared_from_this (0) | 2022.04.22 |
댓글 영역