기존 send는 호출할 때마다 buffer를 복사하기 때문에 복사비용이 많이 발생한다. server에서 하나의 client에만 send하는 경우보다 여러 client에 broadcast하는 경우가 훨씬 많으므로 더 적은 비용의 sendbuffer를 구현해보자.
SendBuffer.h
#pragma once
/*-----------
SendBuffer
-----------*/
class SendBuffer : enable_shared_from_this<SendBuffer>
{
public:
SendBuffer(int32 bufferSize);
~SendBuffer();
BYTE* Buffer() { return _buffer.data(); }
int32 WriteSize() { return _writeSize; }
int32 Capacity() { return static_cast<int32>(_buffer.size()); }
void CopyData(void* data, int32 len);
private:
Vector<BYTE> _buffer;
int32 _writeSize = 0;
};
SendBuffer.cpp
#include "pch.h"
#include "SendBuffer.h"
/*-----------
SendBuffer
-----------*/
SendBuffer::SendBuffer(int32 bufferSize)
{
_buffer.resize(bufferSize);
}
SendBuffer::~SendBuffer()
{
}
void SendBuffer::CopyData(void* data, int32 len)
{
ASSERT_CRASH(Capacity() >= len);
::memcpy(_buffer.data(), data, len);
_writeSize = len;
}
SendEvent가 다수의 SendBuffer를 가지도록 변경한다.
IocpEvent.h
...
/*-----------------
SendEvent
-----------------*/
class SendEvent : public IocpEvent
{
public:
SendEvent() : IocpEvent(EventType::Send) {}
Vector<SendBufferRef> sendBuffers;
};
Session에서 SendEvent를 활용하도록 변경한다.
Session.h
...
class Service;
/*----------------
Session
----------------*/
class Session : public IocpObject
{
...
public:
/* 외부에서 사용 */
void Send(SendBufferRef sendBuffer);
...
private:
/* 전송 관련 */
...
void RegisterRecv();
void RegisterSend();
...
void ProcessSend(int32 numOfBytes);
...
private:
USE_LOCK;
/* 수신 관련 */
RecvBuffer _recvBuffer;
/* 송신 관련 */
Queue<SendBufferRef> _sendQueue;
Atomic<bool> _sendRegistered = false;
private:
/* IocpEvent 재사용 */
...
RecvEvent _recvEvent;
SendEvent _sendEvent;
};
Session.cpp
...
void Session::Send(SendBufferRef sendBuffer)
{
// 현재 RegisterSend가 걸리지 않은 상태라면, 걸어준다
WRITE_LOCK;
_sendQueue.push(sendBuffer);
if (_sendRegistered.exchange(true) == false)
RegisterSend();
}
...
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfBytes)
{
switch (iocpEvent->eventType)
{
case EventType::Connect:
ProcessConnect();
break;
case EventType::Disconnect:
ProcessDisconnect();
break;
case EventType::Recv:
ProcessRecv(numOfBytes);
break;
case EventType::Send:
ProcessSend(numOfBytes);
break;
}
}
...
void Session::RegisterSend()
{
if (IsConnected() == false)
return;
_sendEvent.Init();
_sendEvent.owner = shared_from_this(); // ADD_REF
// 보낼 데이터를 sendEvent에 등록
{
WRITE_LOCK;
int32 writeSize = 0;
while (_sendQueue.empty() == false)
{
SendBufferRef sendBuffer = _sendQueue.front();
writeSize += sendBuffer->WriteSize();
// TODO : 예외 체크
_sendQueue.pop();
_sendEvent.sendBuffers.push_back(sendBuffer);
}
}
// Scatter-Gather (흩어져 있는 데이터들을 모아서 한 방에 보낸다)
Vector<WSABUF> wsaBufs;
wsaBufs.reserve(_sendEvent.sendBuffers.size());
for (SendBufferRef sendBuffer : _sendEvent.sendBuffers)
{
WSABUF wsaBuf;
wsaBuf.buf = reinterpret_cast<char*>(sendBuffer->Buffer());
wsaBuf.len = static_cast<LONG>(sendBuffer->WriteSize());
wsaBufs.push_back(wsaBuf);
}
DWORD numOfBytes = 0;
if (SOCKET_ERROR == ::WSASend(_socket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()), OUT & numOfBytes, 0, &_sendEvent, nullptr))
{
int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
HandleError(errorCode);
_sendEvent.owner = nullptr; //RELEASE_REF
_sendEvent.sendBuffers.clear(); //RELEASE_REF
_sendRegistered.store(false);
}
}
}
...
void Session::ProcessSend(int32 numOfBytes)
{
_sendEvent.owner = nullptr; // RELEASE_REF
_sendEvent.sendBuffers.clear(); // RELEASE_REF
if (numOfBytes == 0)
{
Disconnect(L"Send 0");
return;
}
// 컨텐츠 코드에서 오버라이딩
OnSend(numOfBytes);
WRITE_LOCK;
if (_sendQueue.empty())
_sendRegistered.store(false);
else
RegisterSend();
}
...
sendEvent를 멤버로 두고 다수의 send요청이 와도 한 번의 RegisterSend를 호출하도록 구현했다.
RegisterSend에서 Scatter-Gather 방법으로 WSASend를 호출한다. ProcessSend 실행 동안 sendQueue에 데이터가 있으면 바로 RegisterSend를 호출한다.
DummyClient에 변경된 Session 내용을 적용한다.
DummyClient.cpp
...
char sendData[] = "Hello World";
class ServerSession : public Session
{
public:
~ServerSession()
{
cout << "~ServerSession" << endl;
}
virtual void OnConnected() override
{
cout << "Connected To Server" << endl;
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(sendData, sizeof(sendData));
Send(sendBuffer);
}
virtual int32 OnRecv(BYTE* buffer, int32 len) override
{
//Echo
cout << "OnRecv Len = " << len << endl;
this_thread::sleep_for(1s);
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(sendData, sizeof(sendData));
Send(sendBuffer);
return len;
}
...
};
int main()
{
this_thread::sleep_for(1s);
ClientServiceRef service = MakeShared<ClientService>(
NetAddress(L"127.0.0.1", 7777),
MakeShared<IocpCore>(),
MakeShared<ServerSession>,
5);
...
}
DeadLockProfile 부분에 lockStack을 일반 멤버변수로 사용하는 버그가 있었다. 서로 다른 쓰레드에서 같은 lockStack을 사용하니 데이터가 꼬이는 문제가 발생한다. tls 변수로 변경하여 쓰레드별 독립된 lockStack을 사용하도록 수정하자.
CoreTLS.h
#pragma once
#include <stack>
extern thread_local uint32 LThreadId;
extern thread_local std::stack<int32> LLockStack;
CoreTLS.cpp
#include "pch.h"
#include "CoreTLS.h"
thread_local uint32 LThreadId = 0;
thread_local std::stack<int32> LLockStack;
DeadLockProfiler.h _lockStack삭제
DeadLockProfiler.cpp _lockStack -> LLockStack으로 이름 변경
여러 client로 send하는 broadcast 기능을 구현해보자.
GameSession을 관리하는 GameSession Manager가 broadcast를 정의를 갖는다. GameSession은 broadcast가 필요하면 GameSessionManager를 참조하여 boradcast한다.
GameSession.h
#pragma once
#include "Session.h"
class GameSession : public Session
{
public:
~GameSession()
{
cout << "~GameSession" << endl;
}
virtual void OnConnected() override;
virtual void OnDisconnected() override;
virtual int32 OnRecv(BYTE* buffer, int32 len) override;
virtual void OnSend(int32 len) override;
};
GameSession.cpp
#include "pch.h"
#include "GameSession.h"
#include "GameSessionManager.h"
void GameSession::OnConnected()
{
GSessionManager.Add(static_pointer_cast<GameSession>(shared_from_this()));
}
void GameSession::OnDisconnected()
{
GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
}
int32 GameSession::OnRecv(BYTE* buffer, int32 len)
{
//Echo
cout << "OnRecv Len = " << len << endl;
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(buffer, len);
GSessionManager.Broadcast(sendBuffer);
return len;
}
void GameSession::OnSend(int32 len)
{
cout << "OnSend Len = " << len << endl;
}
GameSessionManager.h
#pragma once
class GameSession;
using GameSessionRef = shared_ptr<GameSession>;
class GameSessionManager
{
public:
void Add(GameSessionRef session);
void Remove(GameSessionRef session);
void Broadcast(SendBufferRef sendBuffer);
private:
USE_LOCK;
Set<GameSessionRef> _sessions;
};
extern GameSessionManager GSessionManager;
GameSessionManager.cpp
#include "pch.h"
#include "GameSessionManager.h"
#include "GameSession.h"
GameSessionManager GSessionManager;
void GameSessionManager::Add(GameSessionRef session)
{
WRITE_LOCK;
_sessions.insert(session);
}
void GameSessionManager::Remove(GameSessionRef session)
{
WRITE_LOCK;
_sessions.erase(session);
}
void GameSessionManager::Broadcast(SendBufferRef sendBuffer)
{
WRITE_LOCK;
for (GameSessionRef session : _sessions)
{
session->Send(sendBuffer);
}
}
1. Sendbuffer를 구현할 때 RecvBuffer와 달리 고려해야 할 것은?
- 하나의 data를 하나의 세션에 보내는 게 아닌, 여러 세션에 보내야 함. 따라서 복사비용을 생각해야 함.
2. SendBuffer가 refcount 대상인 이유는?
- WSASend 완료 통지가 받을 때까지 buffer 메모리 해제 되면 안되기 때문.
3. BroadCast 방법은?
- BroadCast할 세션을 모아서 Manager로 관리하고 Manager가 가지고 있는 세션을 순회하면서 Send 호출한다.
참조 : [C++과 언리얼로 만드는 MMORPG 게임 개발 시리즈] Part4: 게임 서버 - 인프런 | 강의 (inflearn.com)
Packet Session (0) | 2023.04.16 |
---|---|
SendBuffer Pooling (0) | 2023.04.05 |
RecvBuffer (0) | 2021.12.20 |
네트워크 라이브러리 만들기(Session #3) (0) | 2021.12.13 |
네트워크 라이브러리 만들기(Session #2) (0) | 2021.12.08 |
댓글 영역