상세 컨텐츠

본문 제목

SendBuffer

똑똑한 개발/C++ 게임개발

by 성댕쓰 2021. 12. 21. 22:48

본문

기존 send는 호출할 때마다 buffer를 복사하기 때문에 복사비용이 많이 발생한다. server에서 하나의 client에만 send하는 경우보다 여러 client에 broadcast하는 경우가 훨씬 많으므로 더 적은 비용의 sendbuffer를 구현해보자.

 

SendBuffer.h

#pragma once

/*-----------
  SendBuffer
-----------*/

class SendBuffer : enable_shared_from_this<SendBuffer>
{
public:
    SendBuffer(int32 bufferSize);
    ~SendBuffer();

    BYTE* Buffer() { return _buffer.data(); }
    int32 WriteSize() { return _writeSize; }
    int32 Capacity() { return static_cast<int32>(_buffer.size()); }

    void CopyData(void* data, int32 len);

private:
    Vector<BYTE> _buffer;
    int32 _writeSize = 0;
};

SendBuffer.cpp

#include "pch.h"
#include "SendBuffer.h"

/*-----------
  SendBuffer
-----------*/

SendBuffer::SendBuffer(int32 bufferSize)
{
    _buffer.resize(bufferSize);
}

SendBuffer::~SendBuffer()
{
}

void SendBuffer::CopyData(void* data, int32 len)
{
    ASSERT_CRASH(Capacity() >= len);
    ::memcpy(_buffer.data(), data, len);
    _writeSize = len;
}

SendEvent가 다수의 SendBuffer를 가지도록 변경한다.

IocpEvent.h

...
/*-----------------
  SendEvent
-----------------*/
class SendEvent : public IocpEvent
{
public:
    SendEvent() : IocpEvent(EventType::Send) {}

    Vector<SendBufferRef> sendBuffers;
};

Session에서 SendEvent를 활용하도록 변경한다.

Session.h

...
class Service;
/*----------------
  Session
----------------*/

class Session : public IocpObject
{
...
public:
    /* 외부에서 사용 */
    void Send(SendBufferRef sendBuffer);
...

private:
    /* 전송 관련 */
...
    void RegisterRecv();
    void RegisterSend();

...
    void ProcessSend(int32 numOfBytes);

...

private:
    USE_LOCK;

    /* 수신 관련 */
    RecvBuffer _recvBuffer;

    /* 송신 관련 */
    Queue<SendBufferRef> _sendQueue;
    Atomic<bool> _sendRegistered = false;

private:
    /* IocpEvent 재사용 */
...
    RecvEvent _recvEvent;
    SendEvent _sendEvent;
};

Session.cpp

...

void Session::Send(SendBufferRef sendBuffer)
{
    // 현재 RegisterSend가 걸리지 않은 상태라면, 걸어준다
    WRITE_LOCK;

    _sendQueue.push(sendBuffer);

    if (_sendRegistered.exchange(true) == false)
        RegisterSend();
}
...
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfBytes)
{
    switch (iocpEvent->eventType)
    {
    case EventType::Connect:
        ProcessConnect();
        break;
    case EventType::Disconnect:
        ProcessDisconnect();
        break;
    case EventType::Recv:
        ProcessRecv(numOfBytes);
        break;
    case EventType::Send:
        ProcessSend(numOfBytes);
        break;
    }
}
...
void Session::RegisterSend()
{
    if (IsConnected() == false)
        return;

    _sendEvent.Init();
    _sendEvent.owner = shared_from_this(); // ADD_REF

    // 보낼 데이터를 sendEvent에 등록
    {
        WRITE_LOCK;

        int32 writeSize = 0;
        while (_sendQueue.empty() == false)
        {
            SendBufferRef sendBuffer = _sendQueue.front();

            writeSize += sendBuffer->WriteSize();
            // TODO : 예외 체크

            _sendQueue.pop();
            _sendEvent.sendBuffers.push_back(sendBuffer);
        }
    }

    // Scatter-Gather (흩어져 있는 데이터들을 모아서 한 방에 보낸다)
    Vector<WSABUF> wsaBufs;
    wsaBufs.reserve(_sendEvent.sendBuffers.size());
    for (SendBufferRef sendBuffer : _sendEvent.sendBuffers)
    {
        WSABUF wsaBuf;
        wsaBuf.buf = reinterpret_cast<char*>(sendBuffer->Buffer());
        wsaBuf.len = static_cast<LONG>(sendBuffer->WriteSize());
        wsaBufs.push_back(wsaBuf);
    }

    DWORD numOfBytes = 0;
    if (SOCKET_ERROR == ::WSASend(_socket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()), OUT & numOfBytes, 0, &_sendEvent, nullptr))
    {
        int32 errorCode = ::WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            HandleError(errorCode);
            _sendEvent.owner = nullptr; //RELEASE_REF
            _sendEvent.sendBuffers.clear(); //RELEASE_REF
            _sendRegistered.store(false);
        }
    }
}
...
void Session::ProcessSend(int32 numOfBytes)
{
    _sendEvent.owner = nullptr; // RELEASE_REF
    _sendEvent.sendBuffers.clear(); // RELEASE_REF

    if (numOfBytes == 0)
    {
        Disconnect(L"Send 0");
        return;
    }

    // 컨텐츠 코드에서 오버라이딩
    OnSend(numOfBytes);

    WRITE_LOCK;
    if (_sendQueue.empty())
        _sendRegistered.store(false);
    else
        RegisterSend();
}
...

sendEvent를 멤버로 두고 다수의 send요청이 와도 한 번의 RegisterSend를 호출하도록 구현했다.

RegisterSend에서 Scatter-Gather 방법으로 WSASend를 호출한다. ProcessSend 실행 동안 sendQueue에 데이터가 있으면 바로 RegisterSend를 호출한다.

 

DummyClient에 변경된 Session 내용을 적용한다.

DummyClient.cpp

...
char sendData[] = "Hello World";

class ServerSession : public Session
{
public:
    ~ServerSession()
    {
        cout << "~ServerSession" << endl;
    }

    virtual void OnConnected() override
    {
        cout << "Connected To Server" << endl;

        SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
        sendBuffer->CopyData(sendData, sizeof(sendData));
        Send(sendBuffer);
    }

    virtual int32 OnRecv(BYTE* buffer, int32 len) override
    {
        //Echo
        cout << "OnRecv Len = " << len << endl;

        this_thread::sleep_for(1s);

        SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
        sendBuffer->CopyData(sendData, sizeof(sendData));
        Send(sendBuffer);

        return len;
    }
...
};
int main()
{
    this_thread::sleep_for(1s);

    ClientServiceRef service = MakeShared<ClientService>(
        NetAddress(L"127.0.0.1", 7777),
        MakeShared<IocpCore>(),
        MakeShared<ServerSession>,
        5);

...
}

 

DeadLockProfile 부분에  lockStack을 일반 멤버변수로 사용하는 버그가 있었다. 서로 다른 쓰레드에서 같은 lockStack을 사용하니 데이터가 꼬이는 문제가 발생한다. tls 변수로 변경하여 쓰레드별 독립된 lockStack을 사용하도록 수정하자.

 

CoreTLS.h

#pragma once
#include <stack>

extern thread_local uint32 LThreadId;
extern thread_local std::stack<int32> LLockStack;

CoreTLS.cpp

#include "pch.h"
#include "CoreTLS.h"

thread_local uint32 LThreadId = 0;
thread_local std::stack<int32> LLockStack;

DeadLockProfiler.h _lockStack삭제

DeadLockProfiler.cpp _lockStack -> LLockStack으로 이름 변경

 

여러 client로 send하는 broadcast 기능을 구현해보자.

GameSession을 관리하는 GameSession Manager가  broadcast를 정의를 갖는다. GameSession은 broadcast가 필요하면 GameSessionManager를 참조하여 boradcast한다.

 

GameSession.h

#pragma once
#include "Session.h"

class GameSession : public Session
{
public:
    ~GameSession()
    {
        cout << "~GameSession" << endl;
    }

    virtual void OnConnected() override;
    virtual void OnDisconnected() override;
    virtual int32 OnRecv(BYTE* buffer, int32 len) override;

    virtual void OnSend(int32 len) override;
};

GameSession.cpp

#include "pch.h"
#include "GameSession.h"
#include "GameSessionManager.h"

void GameSession::OnConnected()
{
    GSessionManager.Add(static_pointer_cast<GameSession>(shared_from_this()));
}

void GameSession::OnDisconnected()
{
    GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
}

int32 GameSession::OnRecv(BYTE* buffer, int32 len)
{
    //Echo
    cout << "OnRecv Len = " << len << endl;

    SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
    sendBuffer->CopyData(buffer, len);
    
    GSessionManager.Broadcast(sendBuffer);

    return len;
}

void GameSession::OnSend(int32 len)
{
    cout << "OnSend Len = " << len << endl;
}

 

GameSessionManager.h

#pragma once

class GameSession;

using GameSessionRef = shared_ptr<GameSession>;

class GameSessionManager
{
public:
    void Add(GameSessionRef session);
    void Remove(GameSessionRef session);
    void Broadcast(SendBufferRef sendBuffer);

private:
    USE_LOCK;
    Set<GameSessionRef> _sessions;
};

extern GameSessionManager GSessionManager;

 

GameSessionManager.cpp

#include "pch.h"
#include "GameSessionManager.h"
#include "GameSession.h"

GameSessionManager GSessionManager;

void GameSessionManager::Add(GameSessionRef session)
{
    WRITE_LOCK;
    _sessions.insert(session);
}

void GameSessionManager::Remove(GameSessionRef session)
{
    WRITE_LOCK;
    _sessions.erase(session);
}

void GameSessionManager::Broadcast(SendBufferRef sendBuffer)
{
    WRITE_LOCK;
    for (GameSessionRef session : _sessions)
    {
        session->Send(sendBuffer);
    }
}

 

1. Sendbuffer를 구현할 때 RecvBuffer와 달리 고려해야 할 것은?

- 하나의 data를 하나의 세션에 보내는 게 아닌, 여러 세션에 보내야 함. 따라서 복사비용을 생각해야 함.

2. SendBuffer가 refcount 대상인 이유는?

- WSASend 완료 통지가 받을 때까지 buffer 메모리 해제 되면 안되기 때문.

3. BroadCast 방법은?

- BroadCast할 세션을 모아서 Manager로 관리하고 Manager가 가지고 있는 세션을 순회하면서 Send 호출한다.

 

참조 : [C++과 언리얼로 만드는 MMORPG 게임 개발 시리즈] Part4: 게임 서버 - 인프런 | 강의 (inflearn.com)

'똑똑한 개발 > C++ 게임개발' 카테고리의 다른 글

Packet Session  (0) 2023.04.16
SendBuffer Pooling  (0) 2023.04.05
RecvBuffer  (0) 2021.12.20
네트워크 라이브러리 만들기(Session #3)  (0) 2021.12.13
네트워크 라이브러리 만들기(Session #2)  (0) 2021.12.08

관련글 더보기

댓글 영역