标签:
1 /****************************************************************************** 2 Module: Queue.cpp 3 Notices: Copyright (c) 2008 Jeffrey Richter & Christophe Nasarre 4 ******************************************************************************/ 5 6 7 #include "..\CommonFiles\CmnHdr.h" /* See Appendix A. */ 8 #include <windowsx.h> 9 #include <tchar.h> 10 #include <StrSafe.h> 11 #include "Resource.h" 12 13 14 /////////////////////////////////////////////////////////////////////////////// 15 16 17 class CQueue { 18 public: 19 struct ELEMENT { 20 int m_nThreadNum; 21 int m_nRequestNum; 22 // Other element data should go here 23 }; 24 typedef ELEMENT* PELEMENT; 25 26 private: 27 struct INNER_ELEMENT { 28 int m_nStamp; // 0 means empty 29 ELEMENT m_element; 30 }; 31 typedef INNER_ELEMENT* PINNER_ELEMENT; 32 33 private: 34 PINNER_ELEMENT m_pElements; // Array of elements to be processed 35 int m_nMaxElements; // Maximum # of elements in the array 36 int m_nCurrentStamp; // Keep track of the # of added elements 37 38 private: 39 int GetFreeSlot(); 40 int GetNextSlot(int nThreadNum); 41 42 public: 43 CQueue(int nMaxElements); 44 ~CQueue(); 45 BOOL IsFull(); 46 BOOL IsEmpty(int nThreadNum); 47 void AddElement(ELEMENT e); 48 BOOL GetNewElement(int nThreadNum, ELEMENT& e); 49 }; 50 51 52 /////////////////////////////////////////////////////////////////////////////// 53 54 55 CQueue::CQueue(int nMaxElements) { 56 57 // Allocate and initialize the elements 58 m_pElements = (PINNER_ELEMENT) 59 HeapAlloc(GetProcessHeap(), 0, sizeof(INNER_ELEMENT) * nMaxElements); 60 ZeroMemory(m_pElements, sizeof(INNER_ELEMENT) * nMaxElements); 61 62 // Initialize the element counter 63 m_nCurrentStamp = 0; 64 65 // Remember the max number of elements 66 m_nMaxElements = nMaxElements; 67 } 68 69 70 CQueue::~CQueue() { 71 72 HeapFree(GetProcessHeap(), 0, m_pElements); 73 } 74 75 76 BOOL CQueue::IsFull() { 77 78 return(GetFreeSlot() == -1); 79 } 80 81 82 BOOL CQueue::IsEmpty(int nThreadNum) { 83 84 return(GetNextSlot(nThreadNum) == -1); 85 } 86 87 88 int CQueue::GetFreeSlot() { 89 90 // Look for the first element with a 0 stamp 91 for(int current = 0; current < m_nMaxElements; current++) { 92 if (m_pElements[current].m_nStamp == 0) 93 return(current); 94 } 95 96 // No free slot was found 97 return(-1); 98 } 99 100 101 int CQueue::GetNextSlot(int nThreadNum) { 102 103 // By default, there is no slot for this thread 104 int firstSlot = -1; 105 106 // The element can‘t have a stamp higher than the last added 107 int firstStamp = m_nCurrentStamp+1; 108 109 // Look for the even (thread 0) / odd (thread 1) element that is not free 110 for(int current = 0; current < m_nMaxElements; current++) { 111 112 // Keep track of the first added (lowest stamp) in the queue 113 // --> so that "first in first out" behavior is ensured 114 if ((m_pElements[current].m_nStamp != 0) && // free element 115 ((m_pElements[current].m_element.m_nRequestNum % 2) == nThreadNum) && 116 (m_pElements[current].m_nStamp < firstStamp)) { 117 118 firstStamp = m_pElements[current].m_nStamp; 119 firstSlot = current; 120 } 121 } 122 123 return(firstSlot); 124 } 125 126 127 void CQueue::AddElement(ELEMENT e) { 128 129 // Do nothing if the queue is full 130 int nFreeSlot = GetFreeSlot(); 131 if (nFreeSlot == -1) 132 return; 133 134 // Copy the content of the element 135 m_pElements[nFreeSlot].m_element = e; 136 137 // Mark the element with the new stamp 138 m_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp; 139 } 140 141 142 BOOL CQueue::GetNewElement(int nThreadNum, ELEMENT& e) { 143 144 int nNewSlot = GetNextSlot(nThreadNum); 145 if (nNewSlot == -1) 146 return(FALSE); 147 148 // Copy the content of the element 149 e = m_pElements[nNewSlot].m_element; 150 151 // Mark the element as read 152 m_pElements[nNewSlot].m_nStamp = 0; 153 154 return(TRUE); 155 } 156 157 158 /////////////////////////////////////////////////////////////////////////////// 159 160 161 CQueue g_q(10); // The shared queue 162 volatile LONG g_fShutdown;// Signals client/server threads to die 163 HWND g_hWnd; // How client/server threads give status 164 SRWLOCK g_srwLock; // Reader/writer lock to protect the queue 165 CONDITION_VARIABLE g_cvReadyToConsume; // Signaled by writers 166 CONDITION_VARIABLE g_cvReadyToProduce; // Signaled by readers 167 168 169 // Handles to all reader/writer threads 170 HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS]; 171 172 // Number of reader/writer threads 173 int g_nNumThreads = 0; 174 175 176 /////////////////////////////////////////////////////////////////////////////// 177 178 179 void AddText(HWND hWndLB, PCTSTR pszFormat, ...) { 180 181 va_list argList; 182 va_start(argList, pszFormat); 183 184 TCHAR sz[20 * 1024]; 185 _vstprintf_s(sz, _countof(sz), pszFormat, argList); 186 ListBox_SetCurSel(hWndLB, ListBox_AddString(hWndLB, sz)); 187 188 va_end(argList); 189 } 190 191 192 BOOL ConsumeElement(int nThreadNum, int nRequestNum, HWND hWndLB) { 193 194 // Get access to the queue to consume a new element 195 AcquireSRWLockShared(&g_srwLock); 196 197 // Fall asleep until there is something to read. 198 // Check if, while it was asleep, 199 // it was not decided that the thread should stop 200 while (g_q.IsEmpty(nThreadNum) && !g_fShutdown) { 201 // There was not a readable element 202 AddText(hWndLB, TEXT("[%d] Nothing to process"), nThreadNum); 203 204 // The queue is empty 205 // --> Wait until a writer adds a new element to read 206 // and come back with the lock acquired in shared mode 207 SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, 208 INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED); 209 } 210 211 // When thread is exiting, the lock should be released for writer 212 // and readers should be signaled through the condition variable 213 if (g_fShutdown) { 214 // Show that the current thread is exiting 215 AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum); 216 217 // Another writer thread might still be blocked on the lock 218 // --> release it before exiting 219 ReleaseSRWLockShared(&g_srwLock); 220 221 // Notify other readers that it is time to exit 222 // --> release readers 223 WakeConditionVariable(&g_cvReadyToConsume); 224 225 return(FALSE); 226 } 227 228 // Get the first new element 229 CQueue::ELEMENT e; 230 // Note: No need to test the return value since IsEmpty 231 // returned FALSE 232 g_q.GetNewElement(nThreadNum, e); 233 234 // No need to keep the lock any longer 235 ReleaseSRWLockShared(&g_srwLock); 236 237 // Show result of consuming the element 238 AddText(hWndLB, TEXT("[%d] Processing %d:%d"), 239 nThreadNum, e.m_nThreadNum, e.m_nRequestNum); 240 241 // A free slot is now available for writer threads to produce 242 // --> wake up a writer thread 243 WakeConditionVariable(&g_cvReadyToProduce); 244 245 return(TRUE); 246 } 247 248 249 DWORD WINAPI ReaderThread(PVOID pvParam) { 250 251 int nThreadNum = PtrToUlong(pvParam); 252 HWND hWndLB = GetDlgItem(g_hWnd, IDC_SERVERS); 253 254 for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) { 255 256 if (!ConsumeElement(nThreadNum, nRequestNum, hWndLB)) 257 return(0); 258 259 Sleep(2500); // Wait before reading another element 260 } 261 262 // g_fShutdown has been set during Sleep 263 // --> Show that the current thread is exiting 264 AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum); 265 266 return(0); 267 } 268 269 270 /////////////////////////////////////////////////////////////////////////////// 271 272 273 DWORD WINAPI WriterThread(PVOID pvParam) { 274 275 int nThreadNum = PtrToUlong(pvParam); 276 HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENTS); 277 278 for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) { 279 280 CQueue::ELEMENT e = { nThreadNum, nRequestNum }; 281 282 // Require access for writing 283 AcquireSRWLockExclusive(&g_srwLock); 284 285 // If the queue is full, fall asleep as long as the condition variable 286 // is not signaled 287 // Note: During the wait for acquiring the lock, 288 // a stop might have been received 289 if (g_q.IsFull() & !g_fShutdown) { 290 // No more room in the queue 291 AddText(hWndLB, TEXT("[%d] Queue is full: impossible to add %d"), 292 nThreadNum, nRequestNum); 293 294 // --> Need to wait for a reader to empty a slot before acquiring 295 // the lock again 296 SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, 297 INFINITE, 0); 298 } 299 300 // Other writer threads might still be blocked on the lock 301 // --> Release the lock and notify the remaining writer threads to quit 302 if (g_fShutdown) { 303 // Show that the current thread is exiting 304 AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum); 305 306 // No need to keep the lock any longer 307 ReleaseSRWLockExclusive(&g_srwLock); 308 309 // Signal other blocked writers threads that it is time to exit 310 WakeAllConditionVariable(&g_cvReadyToProduce); 311 312 // Bye bye 313 return(0); 314 } else { 315 // Add the new ELEMENT into the queue 316 g_q.AddElement(e); 317 318 // Show result of processing element 319 AddText(hWndLB, TEXT("[%d] Adding %d"), nThreadNum, nRequestNum); 320 321 // No need to keep the lock any longer 322 ReleaseSRWLockExclusive(&g_srwLock); 323 324 // Signal reader threads that there is an element to consume 325 WakeAllConditionVariable(&g_cvReadyToConsume); 326 327 // Wait before adding a new element 328 Sleep(1500); 329 } 330 } 331 332 // Show that the current thread is exiting 333 AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum); 334 335 return(0); 336 } 337 338 339 340 /////////////////////////////////////////////////////////////////////////////// 341 342 343 BOOL Dlg_OnInitDialog(HWND hWnd, HWND hWndFocus, LPARAM lParam) { 344 345 chSETDLGICONS(hWnd, IDI_QUEUE); 346 347 g_hWnd = hWnd; // Used by client/server threads to show status 348 349 // 初始化 SRWLock 350 InitializeSRWLock(&g_srwLock); 351 352 // 初始化 条件变量 353 InitializeConditionVariable(&g_cvReadyToConsume); 354 InitializeConditionVariable(&g_cvReadyToProduce); 355 356 // Will be set to TRUE in order to end threads 357 g_fShutdown = FALSE; 358 359 // 创建写入线程,INT_PTR和指针长度相等的int,x传入线程函数 360 DWORD dwThreadID; 361 for (int x = 0; x < 4; x++) 362 g_hThreads[g_nNumThreads++] = 363 chBEGINTHREADEX(NULL, 0, WriterThread, (PVOID) (INT_PTR) x, 364 0, &dwThreadID); 365 366 // 创建读取线程 367 for (int x = 0; x < 2; x++) 368 g_hThreads[g_nNumThreads++] = 369 chBEGINTHREADEX(NULL, 0, ReaderThread, (PVOID) (INT_PTR) x, 370 0, &dwThreadID); 371 372 return(TRUE); 373 } 374 375 376 /////////////////////////////////////////////////////////////////////////////// 377 378 379 void StopProcessing() { 380 381 if (!g_fShutdown) { 382 // Ask all threads to end 383 InterlockedExchange(&g_fShutdown, TRUE); 384 385 // Free all threads waiting on condition variables 386 WakeAllConditionVariable(&g_cvReadyToConsume); 387 WakeAllConditionVariable(&g_cvReadyToProduce); 388 389 // Wait for all the threads to terminate & then clean up 390 WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE); 391 392 // Don‘t forget to clean up kernel resources 393 // Note: This is not really mandatory since the process is exiting 394 while (g_nNumThreads--) 395 CloseHandle(g_hThreads[g_nNumThreads]); 396 397 // Close each list box 398 AddText(GetDlgItem(g_hWnd, IDC_SERVERS), TEXT("---------------------")); 399 AddText(GetDlgItem(g_hWnd, IDC_CLIENTS), TEXT("---------------------")); 400 } 401 } 402 403 404 DWORD WINAPI StoppingThread(PVOID pvParam) { 405 406 StopProcessing(); 407 return(0); 408 } 409 410 411 /////////////////////////////////////////////////////////////////////////////// 412 413 414 void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify) { 415 416 switch (id) { 417 case IDCANCEL: 418 EndDialog(hWnd, id); 419 break; 420 421 case IDC_BTN_STOP: 422 { 423 // StopProcessing can‘t be called from the UI thread 424 // or a deadlock will occur: SendMessage() is used 425 // to fill up the listboxes 426 // --> Another thread is required 427 DWORD dwThreadID; 428 CloseHandle(chBEGINTHREADEX(NULL, 0, StoppingThread, 429 NULL, 0, &dwThreadID)); 430 431 // This button can‘t be pushed twice 432 Button_Enable(hWndCtl, FALSE); 433 } 434 break; 435 } 436 } 437 438 439 /////////////////////////////////////////////////////////////////////////////// 440 441 442 INT_PTR WINAPI Dlg_Proc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam) { 443 444 switch (uMsg) { 445 chHANDLE_DLGMSG(hWnd, WM_INITDIALOG, Dlg_OnInitDialog); 446 chHANDLE_DLGMSG(hWnd, WM_COMMAND, Dlg_OnCommand); 447 } 448 return(FALSE); 449 } 450 451 452 /////////////////////////////////////////////////////////////////////////////// 453 454 455 int WINAPI _tWinMain(HINSTANCE hinstExe, HINSTANCE, PTSTR pszCmdLine, int) { 456 457 DialogBox(hinstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc); 458 StopProcessing(); 459 return(0); 460 } 461 462 463 //////////////////////////////// End of File //////////////////////////////////
标签:
原文地址:http://www.cnblogs.com/dLong/p/4422588.html