상세 컨텐츠

본문 제목

Deadlock 탐지

똑똑한 개발/C++ 게임개발

by 성댕쓰 2021. 9. 6. 21:21

본문

그래프 자료구조를 이용해 Deadlock 탐지 기능을 만들어 보자.

그래프는 vertex와 edge로 이루어져있다.

방향 그래프는 edge에 방향 정보도 있는 그래프이다.

 

2차원 벡터(행렬)을 이용하여 그래프 표현가능하다.

 

그래프 사이클 판별하여 dead lock 일어나는지 확인가능하다. 각 vertex를 어떠한 lock을 잡은 상태라고 두고 edge를 따라 연결된 vertex는 lock잡은 상태에서 다른 lock을 잡은 것이라고 생각하자.

 

그렇다면 그래프에서 사이클이 생기면 dead lock이 일어날 수 있는 상황이라고 판단가능하다.

 

다음의 용어에 대한 정의를 알아보자

1. 순방향 간선

  - edge를 따라간 vertex가 교차간선, 역방향 간선이 아님.

2. 교차 간선

  - edge를 따라간 vertex가 다른 탐색 그룹에 의해 이미 발견된 vertex임.

3. 역방향 간선

  - edge를 따라간 vertex가 현재 탐색 그룹에 의해 이미 발견된 vertex임.

 

DeadLockProfiler.h

#pragma once
#include <stack>
#include <map>
#include <vector>

/*----------------------
	DeadLockProfiler
----------------------*/

class DeadLockProfiler
{
public:
	void PushLock(const char* name);
	void PopLock(const char* name);
	void CheckCycle();

private:
	void Dfs(int32 here);

private:
	unordered_map<const char*, int32> _nameToId;
	unordered_map<int32, const char*> _idToName;
	stack<int32> _lockStack; // 멤버변수로 잡으면 안되고, tls에 선언해야 함.
	map<int32, set<int32>> _lockHistory; // 0번 락이 1번 락을 잡았다와 같은 히스토리

	Mutex _lock;

private:
	vector<int32> _discoveredOrder; // vertex가 발견된 순서를 기록하는 배열
	int32 _discoveredCount = 0; // vertex가 발견된 순서
	vector<bool> _finished; // Dfs(i)가 종료 되었는지 여부
	vector<int32> _parent;
};

 

DeadLockProfiler.cpp

#include "pch.h"
#include "DeadLockProfiler.h"

/*----------------------
	DeadLockProfiler
----------------------*/

void DeadLockProfiler::PushLock(const char* name)
{
	LockGuard guard(_lock);

	// 아이디를 찾거나 발급한다.
	int32 lockId = 0;

	auto findIt = _nameToId.find(name);
	if (findIt == _nameToId.end())
	{
		lockId = static_cast<int32>(_nameToId.size());
		_nameToId[name] = lockId;
		_idToName[lockId] = name;
	}
	else
	{
		lockId = findIt->second;
	}

	// 잡고 있는 락이 있었다면
	if (_lockStack.empty() == false)
	{
		// 기존에 발견되지 않은 케이스라면 데드락 여부 다시 확인한다.
		const int32 prevId = _lockStack.top();
		if (lockId != prevId)
		{
			set<int32>& history = _lockHistory[prevId];
			if (history.find(lockId) == history.end())
			{
				history.insert(lockId);
				CheckCycle();
			}
		}
	}

	_lockStack.push(lockId);
}

void DeadLockProfiler::PopLock(const char* name)
{
	LockGuard guard(_lock);

	if (_lockStack.empty())
		CRASH("MULTIPLE UNLOCK");

	int32 lockId = _nameToId[name];
	if (_lockStack.top() != lockId)
		CRASH("INVALID UNLOCK");

	_lockStack.pop();
}

void DeadLockProfiler::CheckCycle()
{
	const int32 lockCount = static_cast<int32>(_nameToId.size());
	_discoveredOrder = vector<int32>(lockCount, -1);
	_discoveredCount = 0;
	_finished = vector<bool>(lockCount, false);
	_parent = vector<int32>(lockCount, -1);

	for (int32 lockId = 0; lockId < lockCount; lockId++)
		Dfs(lockId);

	// 연산이 끝났으면 정리한다.
	_discoveredOrder.clear();
	_finished.clear();
	_parent.clear();
}

void DeadLockProfiler::Dfs(int32 here)
{
	// 이미 방문했으면 리턴
	if (_discoveredOrder[here] != -1)
		return;
	// 방문한 노드의 순번을 표시
	_discoveredOrder[here] = _discoveredCount++;

	// 모든 인접한 정점을 순회한다.
	auto findIt = _lockHistory.find(here);
	if (findIt == _lockHistory.end())
	{
		_finished[here] = true;
		return;
	}

	set<int32>& nextSet = findIt->second;
	for (int32 there : nextSet)
	{
		// 아직 방문한 적이 없다면 방문한다.
		if (_discoveredOrder[there] == -1)
		{
			_parent[there] = here;
			Dfs(there);
			continue;
		}

		// here가 there보다 먼저 발견되었다면,. there는 here의 후손이다. (순방향 간선)
		if (_discoveredOrder[here] < _discoveredOrder[there])
			continue;

		// 순방향이 아니고, Dfs(there)가 아직 종료하지 않았다면, there는 here의 선조이다. (역방향 간선)
		if (_finished[there] == false)
		{
			printf("%s -> %s\n", _idToName[here], _idToName[there]);

			int32 now = here;
			while (true)
			{
				printf("%s -> %s\n", _idToName[_parent[now]], _idToName[now]);
				now = _parent[now];
				if (now == there)
					break;
			}
			CRASH("DEADLOCK DETECTED");
		}
	}

	_finished[here] = true;
}

 

사용은 다음과 같이 한다.

CoreGlobal.h, CoreGlobal.cpp

#pragma once

extern class ThreadManager* GThreadManager;

extern class DeadLockProfiler* GDeadLockProfiler;
#include "pch.h"
#include "CoreGlobal.h"
#include "ThreadManager.h"
#include "DeadLockProfiler.h"

ThreadManager* GThreadManager = nullptr;
DeadLockProfiler* GDeadLockProfiler = nullptr;

class CoreGlobal
{
public:
	CoreGlobal()
	{
		GThreadManager = new ThreadManager();
		GDeadLockProfiler = new DeadLockProfiler();
	}
	~CoreGlobal()
	{
		delete GThreadManager;
		delete GDeadLockProfiler;
	}
} GCoreGlobal;

 

Lock.cpp

void Lock::WriteLock(const char* name)
{

#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif
...
}

void Lock::WriteUnlock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif
...
}

void Lock::ReadLock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif
...
}

void Lock::ReadUnlock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif
...
}

1. deadlock 탐지기의 기본 원리는?

- 그래프 중, 방향 그래프 이용. 0번 락을 잡았을 때 1번 락 잡는 상황을 그래프로 표현.

- deadlock 발생하는 상황은 그래프 노드간 사이클이 일어날 때임.

- 방문 순서대로 증가하는 정수를 이용하여, 순서대로 방문했는지, 역방향 간선이 발생하는지 확인함.

2. lockstack 역할은?

- 바로 이전 lock을 기억하기 위해서

- 잡힌 lock이 있는지 확인하기 위해서

3. lockstack에서 poplock 할때, top에 같은 lock 없으면 crash 인 이유는?

- lockstack이 tls인 경우, lock 해제 순서에 맞지 않기 때문.

4. lockhistory 역할은?

- 그래프 자료구조.

5. poplock에서 해제한 lock의 히스토리를 전부 해제하지 않는 이유는?

- 살아있는 lock 만 잡게 되면 절묘한 타이밍에 순환이 일어나는 락을 잡지 못함.

- 구조적으로 모든 deadlock 가능성을 제거하기 위해서임.

 

 

참조 : [C++과 언리얼로 만드는 MMORPG 게임 개발 시리즈] Part4: 게임 서버 - 인프런 | 강의 (inflearn.com)

'똑똑한 개발 > C++ 게임개발' 카테고리의 다른 글

스마트 포인터  (0) 2021.09.10
Reference Counting  (0) 2021.09.08
Read-Writer lock  (0) 2021.09.03
ThreadManager  (0) 2021.09.01
Lock free stack #3  (0) 2021.08.31

관련글 더보기

댓글 영역