Strand Summary

by 성댕쓰 2022. 8. 23. 14:54

What are strands?

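A strand is an API that guarantees event handlers run serialized, one at a time and never concurrently.
With strands, you can write code for a multithreaded environment without explicit locks.

A strand provides a layer between application code and handler execution.
Instead of invoking a handler directly, a worker thread pushes it onto the strand's queue.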

 

Strand Implementation

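A strand must provide the following guarantees.

  • Handlers never execute concurrently.
    • To satisfy this, it must be possible to check whether a worker thread is currently running the strand.
    • The strand keeps a queue and enqueues handlers that cannot run immediately.
  • Handlers execute only on worker threads (in Boost terms, the threads that call io_service::run).
  • Handler execution order is not guaranteed.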

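The main methods to implement are as follows.

  • post
    • Adds a handler to be executed later. The handler is never executed as part of this call.
  • dispatch
    • Executes the handler immediately if the strand's guarantees can be met; otherwise posts it.
  • run
    • Executes the pending handlers. Not part of the public interface.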
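
To make the difference between post and dispatch concrete, here is a deliberately simplified, single-threaded model. ToyStrand and toyStrandDemo are hypothetical names invented for this illustration. The model has no locking and no Processor, and unlike the real dispatch it never starts running an idle strand, so it only shows when a handler runs, not how the real Strand stays thread-safe.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of a strand (illustration only).
class ToyStrand {
public:
    // Only enqueues; the handler never runs inside this call.
    void post(std::function<void()> h) { m_q.push(std::move(h)); }

    // Runs the handler right away if we are already inside the strand,
    // otherwise falls back to post().
    void dispatch(std::function<void()> h) {
        if (m_running) {
            h();
        } else {
            post(std::move(h));
        }
    }

    // Drains the queue; handlers posted while draining are picked up too.
    void run() {
        m_running = true;
        while (!m_q.empty()) {
            std::function<void()> h = std::move(m_q.front());
            m_q.pop();
            h();
        }
        m_running = false;
    }

private:
    bool m_running = false;
    std::queue<std::function<void()>> m_q;
};

// Records the order in which the handlers actually run.
std::vector<std::string> toyStrandDemo() {
    std::vector<std::string> log;
    ToyStrand s;
    s.post([&] {
        log.push_back("A begin");
        s.post([&] { log.push_back("posted from A"); });
        s.dispatch([&] { log.push_back("dispatched from A"); });
        log.push_back("A end");
    });
    log.push_back("after post");
    s.run();
    return log;
}
```

Calling toyStrandDemo() returns the entries in this order: "after post", "A begin", "dispatched from A", "A end", "posted from A". The dispatched handler runs inside handler A, while the posted one waits until A has finished.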



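The helper classes used are as follows.

  • Callstack
    • Places a marker on the current call stack so that code can check whether a given function is executing on the current thread.
  • WorkQueue
    • A simple multiple-producer / multiple-consumer queue.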
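
Callstack.h is not listed in this post. As a rough idea of how such a marker could work, the sketch below keeps a per-thread linked list of Context objects. It is an assumption, not the original header: it covers only the two members the Strand code relies on (Context and contains), and Demo, DemoResult, and callstackDemo are hypothetical names used for the illustration.

```cpp
#include <thread>

// Hypothetical simplified Callstack: a thread-local stack of markers.
template <typename Key>
class Callstack {
public:
    // RAII marker: pushed on construction, popped on destruction.
    class Context {
    public:
        explicit Context(Key* key) : m_key(key), m_next(ms_top) { ms_top = this; }
        ~Context() { ms_top = m_next; }
        Context(const Context&) = delete;
        Context& operator=(const Context&) = delete;

    private:
        friend class Callstack<Key>;
        Key* m_key;
        Context* m_next;
    };

    // Returns the innermost marker for `key` on the calling thread,
    // or nullptr if this thread placed none.
    static Context* contains(const Key* key) {
        for (Context* c = ms_top; c != nullptr; c = c->m_next) {
            if (c->m_key == key) {
                return c;
            }
        }
        return nullptr;
    }

private:
    static thread_local Context* ms_top;
};

template <typename Key>
thread_local typename Callstack<Key>::Context* Callstack<Key>::ms_top = nullptr;

// Demo: the marker is visible only inside its scope and only on its own thread.
struct Demo {};
struct DemoResult {
    bool before, inside, otherThread, after;
};

DemoResult callstackDemo() {
    Demo key;
    DemoResult r{};
    r.before = Callstack<Demo>::contains(&key) != nullptr;
    {
        Callstack<Demo>::Context ctx(&key);
        r.inside = Callstack<Demo>::contains(&key) != nullptr;
        std::thread t([&] {
            r.otherThread = Callstack<Demo>::contains(&key) != nullptr;
        });
        t.join();
    }
    r.after = Callstack<Demo>::contains(&key) != nullptr;
    return r;
}
```

Because the list head is thread_local, a lookup needs no locking, which is why runningInThisThread() and canDispatch() can be cheap checks.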

There is also the Monitor class.
It synchronizes every access to the object of type T that it wraps.

#pragma once
#include <mutex>
#include <utility>

template <class T>
class Monitor {
private:
    mutable T m_t;
    mutable std::mutex m_mtx;
 
public:
    using Type = T;
    Monitor() {}
    Monitor(T t_) : m_t(std::move(t_)) {}
    template <typename F>
    auto operator()(F f) const -> decltype(f(m_t)) {
        std::lock_guard<std::mutex> hold{m_mtx};
        return f(m_t);
    }
};
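
As a small usage illustration, the sketch below repeats the Monitor class from above and lets several threads increment a shared counter through it. monitorDemo is a hypothetical helper written for this example only.

```cpp
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Same Monitor as above, repeated so the example is self-contained.
template <class T>
class Monitor {
private:
    mutable T m_t;
    mutable std::mutex m_mtx;

public:
    using Type = T;
    Monitor() {}
    Monitor(T t_) : m_t(std::move(t_)) {}
    template <typename F>
    auto operator()(F f) const -> decltype(f(m_t)) {
        std::lock_guard<std::mutex> hold{m_mtx};
        return f(m_t);
    }
};

// Hypothetical demo: every access to the counter goes through the monitor,
// so the increments cannot race with each other.
int monitorDemo(int threads, int incrementsPerThread) {
    Monitor<int> counter(0);
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&counter, incrementsPerThread] {
            for (int i = 0; i < incrementsPerThread; ++i) {
                counter([](int& value) { ++value; });
            }
        });
    }
    for (auto& th : pool) {
        th.join();
    }
    // Reading also happens under the lock.
    return counter([](int& value) { return value; });
}
```

The Strand uses the same pattern: its lambdas passed to m_data both modify the queue and return a decision (trigger), so the check and the update happen under one lock acquisition.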

 

Strand code

#pragma once
#include "Callstack.h"
#include "Monitor.h"
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
 
//
// A strand serializes handler execution.
// It guarantees the following:
// - No handlers execute concurrently
// - Handlers are only executed from the specified Processor
// - Handler execution order is not guaranteed
//
// Specified Processor must implement the following interface:
//
//  template <typename F> void Processor::push(F w);
//      Add a new work item to the processor. F is a callable convertible
// to std::function<void()>
//
//  bool Processor::canDispatch();
//      Should return true if we are in the Processor's dispatching function in
// the current thread.
//
template <typename Processor>
class Strand {
public:
    Strand(Processor& proc) : m_proc(proc) {}
 
    Strand(const Strand&) = delete;
    Strand& operator=(const Strand&) = delete;
 
    // Executes the handler immediately if all the strand guarantees are met,
    // or posts the handler for later execution if the guarantees are not met
    // from inside this call
    template <typename F>
    void dispatch(F handler) {
        // If we are not currently in the processor dispatching function (in
        // this thread), then we cannot possibly execute the handler here, so
        // enqueue it and bail out
        if (!m_proc.canDispatch()) {
            post(std::move(handler));
            return;
        }
 
        // NOTE: At this point we know we are in a worker thread (because of the
        // check above)
 
        // If we are running the strand in this thread, then we can execute the
        // handler immediately without any other checks, since by design no
        // other threads can be running the strand
        if (runningInThisThread()) {
            handler();
            return;
        }
 
        // At this point we know we are in a worker thread, but not running the
        // strand in this thread.
        // The strand can still be running in another worker thread, so we need
        // to atomically enqueue the handler for the other thread to execute OR
        // mark the strand as running in this thread
        auto trigger = m_data([&](Data& data) {
            if (data.running) {
                data.q.push(std::move(handler));
                return false;
            } else {
                data.running = true;
                return true;
            }
        });
 
        if (trigger) {
            // Add a marker to the callstack, so the handler knows the strand is
            // running in the current thread
            Callstack<Strand>::Context ctx(this);
            handler();
 
            // Run any remaining handlers.
            // At this point we own the strand (It's marked as running in
            // this thread), and we don't release it until the queue is empty.
            // This means any other threads adding handlers to the strand will
            // enqueue them, and they will be executed here.
            run();
        }
    }
 
    // Posts a handler for execution and returns immediately.
    // The handler is never executed as part of this call.
    template <typename F>
    void post(F handler) {
        // We atomically enqueue the handler AND check if we need to start the
        // running process.
        bool trigger = m_data([&](Data& data) {
            data.q.push(std::move(handler));
            if (data.running) {
                return false;
            } else {
                data.running = true;
                return true;
            }
        });
 
        // The strand was not running, so trigger a run
        if (trigger) {
            m_proc.push([this] { run(); });
        }
    }
 
    // Checks if we are currently running the strand in this thread
    bool runningInThisThread() {
        return Callstack<Strand>::contains(this) != nullptr;
    }
 
private:
    // Processes any enqueued handlers.
    // This assumes the strand is marked as running.
    // When there are no more handlers, it marks the strand as not running.
    void run() {
        Callstack<Strand>::Context ctx(this);
        while (true) {
            std::function<void()> handler;
            m_data([&](Data& data) {
                assert(data.running);
                if (data.q.size()) {
                    handler = std::move(data.q.front());
                    data.q.pop();
                } else {
                    data.running = false;
                }
            });
 
            if (handler)
                handler();
            else
                return;
        }
    }
 
    struct Data {
        bool running = false;
        std::queue<std::function<void()>> q;
    };
    Monitor<Data> m_data;
    Processor& m_proc;
};

Usage example

#include "Strand.h"
#include "WorkQueue.h"
#include <atomic>
#include <cassert>
#include <cstdio>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <vector>
 
int randInRange(int min, int max) {
    std::random_device rd;
    std::mt19937 eng(rd());
    std::uniform_int_distribution<> distr(min, max);
    return distr(eng);
}
 
struct Obj {
    explicit Obj(int n, WorkQueue& wp) : strand(wp) {
        name = "Obj " + std::to_string(n);
    }
 
    void doSomething(int val) {
        printf("%s : doing %d\n", name.c_str(), val);
    }
    std::string name;
    Strand<WorkQueue> strand;
};
 
void strandSample() {
    WorkQueue workQueue;
    // Start a couple of worker threads
    std::vector<std::thread> workerThreads;
    for (int i = 0; i < 4; i++) {
        workerThreads.push_back(std::thread([&workQueue] { workQueue.run(); }));
    }
 
    // Create a couple of objects that need strands
    std::vector<std::unique_ptr<Obj>> objs;
    for (int i = 0; i < 8; i++) {
        objs.push_back(std::make_unique<Obj>(i, workQueue));
    }
 
    // Counter used by all strands, so we can check if all work was done
    std::atomic<int> doneCount(0);
 
    // Add work to random objects
    const int todo = 20;
    for (int i = 0; i < todo; i++) {
        auto&& obj = objs[randInRange(0, objs.size() - 1)];
        obj->strand.post([&obj, i, &doneCount] {
            obj->doSomething(i);
            ++doneCount;
        });
    }
 
    workQueue.stop();
    for (auto&& t : workerThreads) {
        t.join();
    }
 
    assert(doneCount == todo);
}
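
The example above includes WorkQueue.h, which is not listed in this post. The sketch below is one way such a processor could look, under stated assumptions: it offers the push and canDispatch members that Strand requires plus run and stop, stop lets the workers drain the queue before they exit, and a plain thread_local pointer stands in for the Callstack marker. workQueueDemo is a hypothetical helper for illustration.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal WorkQueue; an assumption, not the original header.
class WorkQueue {
public:
    // Adds a work item; one blocked worker is woken to pick it up.
    template <typename F>
    void push(F w) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_q.push(std::function<void()>(std::move(w)));
        }
        m_cv.notify_one();
    }

    // Executes work items until stop() was called and the queue is empty.
    void run() {
        const WorkQueue* previous = t_running;
        t_running = this;  // simplified stand-in for a Callstack marker
        while (true) {
            std::function<void()> w;
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                m_cv.wait(lock, [this] { return m_stopping || !m_q.empty(); });
                if (m_q.empty()) {
                    break;  // only reachable once stop() was requested
                }
                w = std::move(m_q.front());
                m_q.pop();
            }
            w();  // run outside the lock so other workers can proceed
        }
        t_running = previous;
    }

    // Asks all run() calls to return once the queue has been drained.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_stopping = true;
        }
        m_cv.notify_all();
    }

    // True if the calling thread is currently inside run() of this queue.
    bool canDispatch() const { return t_running == this; }

private:
    static thread_local const WorkQueue* t_running;
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_q;
    bool m_stopping = false;
};

thread_local const WorkQueue* WorkQueue::t_running = nullptr;

// Demo: items pushed before stop() are all executed by the workers.
int workQueueDemo(int workers, int items) {
    WorkQueue q;
    std::atomic<int> done(0);
    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i) {
        pool.emplace_back([&q] { q.run(); });
    }
    for (int i = 0; i < items; ++i) {
        q.push([&done] { ++done; });
    }
    q.stop();
    for (auto& t : pool) {
        t.join();
    }
    return done.load();
}
```

Draining on stop matters for the usage example: it calls stop() right after posting and then asserts that every handler ran. In this sketch, items pushed from outside after all workers have returned would never run.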

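Conclusion

  • With no explicit locks, the code becomes simpler and bugs become less likely.
  • There is less blocking: a thread that cannot run a handler immediately enqueues it instead of waiting.
  • Cache locality improves, since run() executes a strand's queued handlers back-to-back on one thread.

Reference: https://www.gamedeveloper.com/programming/how-strands-work-and-why-you-should-use-them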
