From 5a6564fa94d999ed4afdf272dd44ddc2f624d43d Mon Sep 17 00:00:00 2001 From: Doyle Thai Date: Sat, 17 Jun 2017 22:25:29 +1000 Subject: [PATCH] Start creating multithreading work queue --- src/Win32DTRenderer.cpp | 122 +++++++++++++++++++++++++++++++++++----- src/dqn.h | 5 +- 2 files changed, 110 insertions(+), 17 deletions(-) diff --git a/src/Win32DTRenderer.cpp b/src/Win32DTRenderer.cpp index c19cad2..ef25f78 100644 --- a/src/Win32DTRenderer.cpp +++ b/src/Win32DTRenderer.cpp @@ -441,22 +441,65 @@ i32 Win32GetModuleDirectory(char *const buf, const u32 bufLen) return lastSlashIndex; } +typedef struct Win32Thread +{ + HANDLE handle; + i32 logicalIndex; + DWORD id; +} Win32Thread; + +typedef struct Win32ThreadContext +{ + Win32Thread *threadList; + i32 size; +} Win32ThreadContext; + typedef struct ThreadJob { - int numberToPrint; + i32 numberToPrint; } ThreadJob; -typedef struct ThreadJobQueue { - ThreadJob *queue; - int queueSize; -} ThreadJobQueue; +typedef struct Win32ThreadJobQueue { + ThreadJob *jobList; -FILE_SCOPE ThreadJobQueue jobQueue[16]; + LONG volatile jobInsertIndex; + LONG volatile currJobIndex; + LONG size; +} Win32ThreadJobQueue; + +FILE_SCOPE Win32ThreadJobQueue globalJobQueue; +FILE_SCOPE Win32ThreadContext globalThreadContext; +FILE_SCOPE HANDLE globalSemaphore; + +void PushThreadJob(Win32ThreadJobQueue *queue, ThreadJob job) +{ + DQN_ASSERT(queue->jobInsertIndex < queue->size); + queue->jobList[queue->jobInsertIndex] = job; + + queue->jobInsertIndex++; + ReleaseSemaphore(globalSemaphore, 1, NULL); +} DWORD WINAPI Win32ThreadCallback(void *lpParameter) { - OutputDebugString("Threaded Hello World!\n"); - return 0; + Win32Thread *thread = (Win32Thread *)lpParameter; + DQN_ASSERT(thread); + + for (;;) + { + if (globalJobQueue.currJobIndex < globalJobQueue.jobInsertIndex) + { + LONG workIndex = InterlockedIncrement(&globalJobQueue.currJobIndex) - 1; + ThreadJob job = globalJobQueue.jobList[workIndex]; + + DqnWin32_OutputDebugString("Thread %d: Printing number: %d\n", thread->logicalIndex, + job.numberToPrint); + } + else + { + WaitForSingleObjectEx(globalSemaphore, INFINITE, false); + } + } } int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLine, int nShowCmd) @@ -593,7 +636,7 @@ int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLi //////////////////////////////////////////////////////////////////////// // Query CPU Cores //////////////////////////////////////////////////////////////////////// - i32 numCores = 1; + i32 numCores = 0; i32 numLogicalCores = 0; SYSTEM_INFO systemInfo; @@ -648,18 +691,67 @@ int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLi { DQN_WIN32_ERROR_BOX("DqnMemStack_Push() failed", NULL); } + DqnMemStackTempRegion_End(memRegion); //////////////////////////////////////////////////////////////////////// // Threading //////////////////////////////////////////////////////////////////////// - const i32 USE_DEFAULT_STACK_SIZE = 0; - void *threadParam = NULL; - DWORD threadId; + const i32 QUEUE_SIZE = 20; + globalJobQueue.jobList = (ThreadJob *)DqnMemStack_Push(&globalPlatformMemory.tempStack, + sizeof(ThreadJob) * QUEUE_SIZE); - HANDLE threadHandle = CreateThread(NULL, USE_DEFAULT_STACK_SIZE, Win32ThreadCallback, - threadParam, 0, &threadId); + if (globalJobQueue.jobList) + { + // NOTE: (numCores - 1), 1 core is already exclusively for main thread + i32 availableThreads = (numCores - 1) * numLogicalCores; + // TODO(doyle): Logic for single core/thread processors + DQN_ASSERT(availableThreads > 0); - DqnMemStackTempRegion_End(memRegion); + globalSemaphore = CreateSemaphore(NULL, 0, availableThreads, NULL); + if (globalSemaphore) + { + // Create jobs + globalJobQueue.size = QUEUE_SIZE; + for (i32 i = 0; i < 10; i++) + { + ThreadJob job = {}; + job.numberToPrint = i; + PushThreadJob(&globalJobQueue, job); + } + + // Create threads + globalThreadContext.threadList = (Win32Thread *)DqnMemStack_Push( + &globalPlatformMemory.tempStack, sizeof(Win32Thread) * availableThreads); + if (globalThreadContext.threadList) + { + globalThreadContext.size = availableThreads; + for (i32 i = 0; i < globalThreadContext.size; i++) + { + const i32 USE_DEFAULT_STACK_SIZE = 0; + Win32Thread *thread = &globalThreadContext.threadList[i]; + thread->logicalIndex = i; + thread->handle = + CreateThread(NULL, USE_DEFAULT_STACK_SIZE, Win32ThreadCallback, + (void *)thread, 0, &thread->id); + + if (!thread->handle) + { + DqnWin32_DisplayLastError("CreateThread() failed"); + } + } + } + else + { + // TODO(doyle): Failed allocation + } + + } + else + { + // TODO(doyle): Semaphore failed. + DqnWin32_DisplayLastError("CreateSemaphore() failed"); + } + } } //////////////////////////////////////////////////////////////////////////// diff --git a/src/dqn.h b/src/dqn.h index 16b3086..4a70e7f 100644 --- a/src/dqn.h +++ b/src/dqn.h @@ -1686,9 +1686,10 @@ DQN_FILE_SCOPE void DqnMemStackTempRegion_End(DqnMemStackTempRegion region) { DQN_ASSERT(stack->block->used >= region.used); stack->block->used = region.used; - DQN_ASSERT(stack->tempRegionCount >= 0); } + stack->tempRegionCount--; + DQN_ASSERT(stack->tempRegionCount >= 0); } #ifdef DQN_CPP_MODE @@ -3321,7 +3322,7 @@ DQN_FILE_SCOPE void DqnWin32_DisplayErrorCode(const DWORD error, const char *con DQN_FILE_SCOPE void DqnWin32_OutputDebugString(const char *const formatStr, ...) { - LOCAL_PERSIST char str[1024] = {0}; + char str[1024] = {0}; va_list argList; va_start(argList, formatStr);