본문 바로가기

High Level Technique/Window System

쓰레드 풀 (Thread Pool)

쓰레드 풀 (Thread Pool)


쓰레드가 계속해서 생성되고 소멸되면 시스템에 많은 부담을 줍니다. 쓰레드를 소멸시키지 않고 다시 사용하는 것을 쓰레드 풀이라고 합니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
typedef void(*WORK) (void);
 
typedef struct __WorkerThread
{
    HANDLE hTread;
    DWORD idThread;
 
} WorkerThread;
 
struct __ThreadPool
{
    WORK workList[WORK_MAX];
 
    WorkerThread workerThreadList[THREAD_MAX];
    HANDLE workerEventList[THREAD_MAX];
 
    DWORD idxOfCurrentWork;
    DWORD idxOfLastAddedWork;
 
    DWORD threadIdx;
} gThreadPool;
cs


위 구조체가 쓰레드에게 일을 시키기 위한 기본입니다. 쓰레드에게 일을 시키기 위해서는 일에 해당하는 하나의 함수를 정의해야 하는데, 이 함수의 반환형과 매개변수가 모두 void 형이어야 합니다.


WorkerThread 구조체를 보면 쓰레드의 정보를 담기 위한 구조체입니다. 쓰레드의 핸들과 ID를 담는 구조체인데 추가적으로 필요한 부분이 있다면 더 넣을 수 있습니다.


__ThreadPool 구조체를 보면 typedef 없이 자료형 선언과 동시에 전역변수 형태로 gThreadPool를 선언했습니다. 바로 이것이 쓰레드 풀에 해당합니다.


12라인의 workList는 실행해야 하는 일을 등록하는 장소입니다. idxOfLastAddedWork는 Work의 index보다 +1을 유지하면서 새로운 일이 등록될 때 위치를 알려줍니다.


idxOfCurrentWork는 현재 처리되어야 할 일의 위치를 알려줍니다.




풀에 저장된 쓰레드 정보는 workerThreadList에 저장합니다. workerThreadList[]와 workerEventList[]는 쌍을 이루고 있습니다. threadIdx는 저장된 쓰레드의 개수를 나타냅니다.


workerThreadList[]는 쓰레드를 풀에 저장하기 위한 것이라는 것은 알겠는데 workerEventList는 왜 필요할까요?


쓰레드가 일을 한다는 것은 쓰레드가 호출해서 실행할 함수를 지정해 준다는 것입니다. 만약 쓰레드에게 할당된 일이 없다면 쓰레드는 WaitForXXObject 관련 홤수 호출을 통해서 blocked 상태가 되어야 하고, 새로운 일이 생겼을 때 다시 일을 해야 합니다. 이 과정을 이루기 위해서 쓰레드 하나당 하나의 이벤트 오브젝트가 필요한데 이를 workerEventList[]에 저장을 하는 것입니다.



앞서 idxOfCurrentWork와 idxOfAddedWork는 위치를 알려준다고 했는데  WORK_MAX 만큼만 만들수 있고 그 이후에는 만들 수 없게 됩니다.

이는 원형 형태의 배열로 변경하면 됩니다. (자료구조 부분)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#include <stdio.h>
#include <tchar.h>
#include <Windows.h>
 
#define WORK_MAX 10000
#define THREAD_MAX 50
 
typedef void(*WORK) (void);
 
DWORD AddWorkToPool(WORK work);
WORK GetWorkFromPool(void);
DWORD MakeThredToPool(DWORD numOfThread);
void WorkerThreadFunction(LPVOID pParam);
 
typedef struct __WorkerThread
{
    HANDLE hThread;
    DWORD idThread;
 
} WorkerThread;
 
//Work and Thread Sturct
struct __ThreadPool
{
    // Work List
    WORK workList[WORK_MAX];
 
    // Thread and Thread's Event
    WorkerThread workerThreadList[THREAD_MAX];
    HANDLE workerEventList[THREAD_MAX];
 
    // Work Info
    DWORD idxOfCurrentWork;
    DWORD idxOfLastAddedWork;
 
    // Num of Thread
    DWORD threadIdx;
 
} gThreadPool;
 
 
static HANDLE mutex = NULL;
 
void InitMutex(void)
{
    mutex = CreateMutex(NULL, FALSE, NULL);
}
 
void DeInitMutex(void)
{
    BOOL ret = CloseHandle(mutex);
}
 
void AcquireMutex(void)
{
    DWORD ret = WaitForSingleObject(mutex, INFINITE);
    
    if (ret == WAIT_FAILED)
    {
        printf("Error Occur! \n");
    }
}
 
void ReleaseMutex(void)
{
    BOOL ret = ReleaseMutex(mutex);
    if (ret == 0)
    {
        printf("Error Occur! \n");
    }
}
 
DWORD AddWorkToPool(WORK work)
{
    AcquireMutex();
    if (gThreadPool.idxOfLastAddedWork >= WORK_MAX)
    {
        printf("AddWorkToPool Fail! \n");
        return NULL;
    }
 
    //Insert Work
    gThreadPool.workList[gThreadPool.idxOfLastAddedWork++= work;
 
    // After regist Work, Wait thread go to execute.
    // All thread exeute. so Efficiency is bad.
 
    for (int i = 0; i < gThreadPool.threadIdx; i++)
    {
        SetEvent(gThreadPool.workerEventList[i]);
    }
 
    ReleaseMutex();
    return 1;
}
 
 
WORK GetWorkFromPool()
{
    WORK work = NULL;
    AcquireMutex();
 
    // if work doesn't exist
    if (!(gThreadPool.idxOfCurrentWork < gThreadPool.idxOfLastAddedWork))
    {
        ReleaseMutex();
        return NULL;
    }
 
    work = gThreadPool.workList[gThreadPool.idxOfCurrentWork++];
    ReleaseMutex();
 
    return work;
}
 
DWORD MakeThreadToPool(DWORD numOfThread)
{
    InitMutex();
    DWORD capacity = WORK_MAX - (gThreadPool.threadIdx);
 
    if (capacity < numOfThread)
    {
        numOfThread = capacity;
    }
 
    for (int i = 0; i < numOfThread; i++)
    {
        DWORD idThread;
        HANDLE hThread;
 
        gThreadPool.workerEventList[gThreadPool.threadIdx] = CreateEvent(NULL, FALSE, FALSE, NULL);
 
        hThread = CreateThread(NULL0, (LPTHREAD_START_ROUTINE)WorkerThreadFunction, (LPVOID)gThreadPool.threadIdx, 0&idThread);
 
        gThreadPool.workerThreadList[gThreadPool.threadIdx].hThread = hThread;
        gThreadPool.workerThreadList[gThreadPool.threadIdx].idThread = idThread;
 
        gThreadPool.threadIdx++;
    }
 
    return numOfThread;
}
 
void WorkerThreadFunction(LPVOID pParam)
{
    WORK workFunction;
    HANDLE event = gThreadPool.workerEventList[(DWORD)pParam];
 
    while (1)
    {
        workFunction = GetWorkFromPool();
        if (workFunction == NULL)
        {
            WaitForSingleObject(event, INFINITE);
            continue;
        }
 
        workFunction();
    }
}
 
void TestFunction()
{
    static int i = 0;
    i++;
 
    printf("Good Test -- %d: Processing Thread: %d --\n\n", i, GetCurrentThreadId);
}
 
int _tmain(int argc, TCHAR* argv[])
{
    MakeThreadToPool(3);
 
    for (int i = 0; i < 100; i++)
    {
        AddWorkToPool(TestFunction);
    }
 
    Sleep(50000);
    return 0;
}
 
cs




'High Level Technique > Window System' 카테고리의 다른 글

SEH (Structured Exception Handling)  (0) 2017.01.09
캐쉬와 가상메모리  (0) 2017.01.05
생산자/소비자 모델  (1) 2017.01.04
쓰레드 동기화  (0) 2017.01.03
Thread 생성과 소멸  (0) 2017.01.02