Start creating multithreading work queue

This commit is contained in:
Doyle Thai 2017-06-17 22:25:29 +10:00
parent 64fb913239
commit 5a6564fa94
2 changed files with 110 additions and 17 deletions

View File

@ -441,22 +441,65 @@ i32 Win32GetModuleDirectory(char *const buf, const u32 bufLen)
return lastSlashIndex; return lastSlashIndex;
} }
typedef struct Win32Thread
{
HANDLE handle;
i32 logicalIndex;
DWORD id;
} Win32Thread;
typedef struct Win32ThreadContext
{
Win32Thread *threadList;
i32 size;
} Win32ThreadContext;
typedef struct ThreadJob typedef struct ThreadJob
{ {
int numberToPrint; i32 numberToPrint;
} ThreadJob; } ThreadJob;
typedef struct ThreadJobQueue { typedef struct Win32ThreadJobQueue {
ThreadJob *queue; ThreadJob *jobList;
int queueSize;
} ThreadJobQueue;
FILE_SCOPE ThreadJobQueue jobQueue[16]; LONG volatile jobInsertIndex;
LONG volatile currJobIndex;
LONG size;
} Win32ThreadJobQueue;
FILE_SCOPE Win32ThreadJobQueue globalJobQueue;
FILE_SCOPE Win32ThreadContext globalThreadContext;
FILE_SCOPE HANDLE globalSemaphore;
void PushThreadJob(Win32ThreadJobQueue *queue, ThreadJob job)
{
DQN_ASSERT(queue->jobInsertIndex < queue->size);
queue->jobList[queue->jobInsertIndex] = job;
queue->jobInsertIndex++;
ReleaseSemaphore(globalSemaphore, 1, NULL);
}
DWORD WINAPI Win32ThreadCallback(void *lpParameter) DWORD WINAPI Win32ThreadCallback(void *lpParameter)
{ {
OutputDebugString("Threaded Hello World!\n"); Win32Thread *thread = (Win32Thread *)lpParameter;
return 0; DQN_ASSERT(thread);
for (;;)
{
if (globalJobQueue.currJobIndex < globalJobQueue.jobInsertIndex)
{
LONG workIndex = InterlockedIncrement(&globalJobQueue.currJobIndex) - 1;
ThreadJob job = globalJobQueue.jobList[workIndex];
DqnWin32_OutputDebugString("Thread %d: Printing number: %d\n", thread->logicalIndex,
job.numberToPrint);
}
else
{
WaitForSingleObjectEx(globalSemaphore, INFINITE, false);
}
}
} }
int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLine, int nShowCmd) int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLine, int nShowCmd)
@ -593,7 +636,7 @@ int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLi
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
// Query CPU Cores // Query CPU Cores
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
i32 numCores = 1; i32 numCores = 0;
i32 numLogicalCores = 0; i32 numLogicalCores = 0;
SYSTEM_INFO systemInfo; SYSTEM_INFO systemInfo;
@ -648,18 +691,67 @@ int WINAPI wWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPWSTR lpCmdLi
{ {
DQN_WIN32_ERROR_BOX("DqnMemStack_Push() failed", NULL); DQN_WIN32_ERROR_BOX("DqnMemStack_Push() failed", NULL);
} }
DqnMemStackTempRegion_End(memRegion);
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
// Threading // Threading
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
const i32 USE_DEFAULT_STACK_SIZE = 0; const i32 QUEUE_SIZE = 20;
void *threadParam = NULL; globalJobQueue.jobList = (ThreadJob *)DqnMemStack_Push(&globalPlatformMemory.tempStack,
DWORD threadId; sizeof(ThreadJob) * QUEUE_SIZE);
HANDLE threadHandle = CreateThread(NULL, USE_DEFAULT_STACK_SIZE, Win32ThreadCallback, if (globalJobQueue.jobList)
threadParam, 0, &threadId); {
// NOTE: (numCores - 1), 1 core is already exclusively for main thread
i32 availableThreads = (numCores - 1) * numLogicalCores;
// TODO(doyle): Logic for single core/thread processors
DQN_ASSERT(availableThreads > 0);
DqnMemStackTempRegion_End(memRegion); globalSemaphore = CreateSemaphore(NULL, 0, availableThreads, NULL);
if (globalSemaphore)
{
// Create jobs
globalJobQueue.size = QUEUE_SIZE;
for (i32 i = 0; i < 10; i++)
{
ThreadJob job = {};
job.numberToPrint = i;
PushThreadJob(&globalJobQueue, job);
}
// Create threads
globalThreadContext.threadList = (Win32Thread *)DqnMemStack_Push(
&globalPlatformMemory.tempStack, sizeof(Win32Thread) * availableThreads);
if (globalThreadContext.threadList)
{
globalThreadContext.size = availableThreads;
for (i32 i = 0; i < globalThreadContext.size; i++)
{
const i32 USE_DEFAULT_STACK_SIZE = 0;
Win32Thread *thread = &globalThreadContext.threadList[i];
thread->logicalIndex = i;
thread->handle =
CreateThread(NULL, USE_DEFAULT_STACK_SIZE, Win32ThreadCallback,
(void *)thread, 0, &thread->id);
if (!thread->handle)
{
DqnWin32_DisplayLastError("CreateThread() failed");
}
}
}
else
{
// TODO(doyle): Failed allocation
}
}
else
{
// TODO(doyle): Semaphore failed.
DqnWin32_DisplayLastError("CreateSemaphore() failed");
}
}
} }
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////

View File

@ -1686,9 +1686,10 @@ DQN_FILE_SCOPE void DqnMemStackTempRegion_End(DqnMemStackTempRegion region)
{ {
DQN_ASSERT(stack->block->used >= region.used); DQN_ASSERT(stack->block->used >= region.used);
stack->block->used = region.used; stack->block->used = region.used;
DQN_ASSERT(stack->tempRegionCount >= 0);
} }
stack->tempRegionCount--; stack->tempRegionCount--;
DQN_ASSERT(stack->tempRegionCount >= 0);
} }
#ifdef DQN_CPP_MODE #ifdef DQN_CPP_MODE
@ -3321,7 +3322,7 @@ DQN_FILE_SCOPE void DqnWin32_DisplayErrorCode(const DWORD error, const char *con
DQN_FILE_SCOPE void DqnWin32_OutputDebugString(const char *const formatStr, ...) DQN_FILE_SCOPE void DqnWin32_OutputDebugString(const char *const formatStr, ...)
{ {
LOCAL_PERSIST char str[1024] = {0}; char str[1024] = {0};
va_list argList; va_list argList;
va_start(argList, formatStr); va_start(argList, formatStr);