Port atomics, mutex lock and job queue to unix

This commit is contained in:
Doyle Thai 2017-06-26 01:57:51 +10:00
parent 19c427d756
commit fca8db0366
3 changed files with 514 additions and 209 deletions

680
dqn.h
View File

@ -47,12 +47,13 @@
// #DqnFile File I/O (Read, Write, Delete)
// #DqnDir Directory Querying
// #DqnTimer High Resolution Timer
// #DqnLock Mutex Synchronisation
// #DqnJobQueue Multithreaded Job Queue
// #DqnAtomic Interlocks/Atomic Operations
// #DqnPlatform Common Platform API helpers
// #Platform
// - #Win32Platform
// - #DqnLock Mutex Synchronisation
// - #DqnAtomic Interlocks/Atomic Operations
// - #DqnJobQueue Multithreaded Job Queue
// - #DqnWin32 Common Win32 API Helpers
// #External Code
@ -82,6 +83,7 @@
////////////////////////////////////////////////////////////////////////////////
#ifndef DQN_H
#define DQN_H
#ifdef DQN_MAKE_STATIC
#define DQN_FILE_SCOPE static
#else
@ -273,7 +275,7 @@ DQN_FILE_SCOPE bool DqnMemStack_InitWithFixedSize(DqnMemStack *const stack, size
// Dynamically expandable stack. Akin to DqnMemStack_InitWithFixedSize() except if the MemStack does
// not have enough space for allocation it will automatically attach another MemBlock using
// DqnMem_Alloc().
// DqnMem_Calloc().
DQN_FILE_SCOPE bool DqnMemStack_Init(DqnMemStack *const stack, size_t size, const bool zeroClear, const u32 byteAlign = 4);
////////////////////////////////////////////////////////////////////////////////
@ -913,13 +915,17 @@ DQN_FILE_SCOPE i32 DqnStr_LenDelimitWith(char *const a, const char delimiter);
DQN_FILE_SCOPE char *DqnStr_Copy(char *const dest, const char *const src, const i32 numChars);
DQN_FILE_SCOPE bool DqnStr_Reverse (char *const buf, const i32 bufSize);
// return: The offset into the src to first char of the found string. Returns -1 if not found
DQN_FILE_SCOPE i32 DqnStr_FindFirstOccurence(const char *const src, const i32 srcLen, const char *const find, const i32 findLen);
DQN_FILE_SCOPE bool DqnStr_HasSubstring (const char *const src, const i32 srcLen, const char *const find, const i32 findLen);
#define DQN_32BIT_NUM_MAX_STR_SIZE 11
#define DQN_64BIT_NUM_MAX_STR_SIZE 21
// Return the len of the derived string. If buf is NULL and or bufSize is 0 the
// function returns the required string length for the integer.
// Return the len of the derived string. If buf is NULL and or bufSize is 0 the function returns the
// required string length for the integer
// TODO NOTE(doyle): Parsing stops when a non-digit is encountered, so numbers with ',' don't work
// atm.
DQN_FILE_SCOPE i32 Dqn_I64ToStr(const i64 value, char *const buf, const i32 bufSize);
DQN_FILE_SCOPE i64 Dqn_StrToI64(const char *const buf, const i32 bufSize);
// WARNING: Not robust, precision errors and whatnot but good enough!
@ -970,6 +976,16 @@ DQN_FILE_SCOPE i32 DqnRnd_PCGRange(DqnRandPCGState *pcg, i32 min, i32 max);
// Functions in the Cross Platform are guaranteed to be supported in both Unix
// and Win32
#ifdef DQN_XPLATFORM_LAYER
#if defined(DQN_WIN32_PLATFORM)
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#elif defined(DQN_UNIX_PLATFORM)
#include <pthread.h>
#include <semaphore.h>
#endif
////////////////////////////////////////////////////////////////////////////////
// XPlatform > #DqnFile Public API - File I/O
////////////////////////////////////////////////////////////////////////////////
@ -1003,7 +1019,8 @@ typedef struct DqnFile
size_t size;
#if defined(DQN_CPP_MODE)
// If true, RAII cleanup in the destructor on scope exit. Can be changed at any point by user.
// If raiiCleanup is true, close() is called in the destructor on scope exit. Can be changed at
// any point by user.
bool raiiCleanup;
DqnFile (const bool raiiCleanup = false);
~DqnFile();
@ -1013,7 +1030,6 @@ typedef struct DqnFile
size_t Write(u8 *const buffer, const size_t numBytesToWrite, const size_t fileOffset);
size_t Read (u8 *const buffer, const size_t numBytesToRead);
void Close();
#endif
} DqnFile;
@ -1031,6 +1047,8 @@ DQN_FILE_SCOPE size_t DqnFile_Write(const DqnFile *const file, u8 *const buffer,
// return: The number of bytes read. 0 if invalid args or it failed to read.
DQN_FILE_SCOPE size_t DqnFile_Read (const DqnFile *const file, u8 *const buffer, const size_t numBytesToRead);
// File close invalidates the handle after it is called.
DQN_FILE_SCOPE void DqnFile_Close(DqnFile *const file);
// NOTE: You can't delete a file unless the handle has been closed to it on Win32.
@ -1051,33 +1069,31 @@ DQN_FILE_SCOPE void DqnDir_ReadFree(char **fileList, u32 numFiles);
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE f64 DqnTimer_NowInMs();
DQN_FILE_SCOPE f64 DqnTimer_NowInS ();
#endif // DQN_XPLATFORM_LAYER
////////////////////////////////////////////////////////////////////////////////
// #Platform Public API
////////////////////////////////////////////////////////////////////////////////
// Functions here are only available for the #defined sections (i.e. all functions in
// DQN_WIN32_PLATFORM only have a valid implementation in Win32.
////////////////////////////////////////////////////////////////////////////////
// #Win32Platform Public API
////////////////////////////////////////////////////////////////////////////////
#ifdef DQN_WIN32_PLATFORM
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnLock Public API - Mutex Synchronisation
// XPlatform > #DqnLock Public API - Mutex Synchronisation
////////////////////////////////////////////////////////////////////////////////
typedef struct DqnLock
{
#if defined(DQN_WIN32_PLATFORM)
CRITICAL_SECTION win32Handle;
#elif defined(DQN_UNIX_PLATFORM)
pthread_mutex_t unixHandle;
#endif
// Win32 only, when trying to acquire a locked lock, it is the number of spins permitted
// spinlocking on the lock before being blocked. Set before init if you want a different value.
u32 win32SpinCount = 16000;
#if defined(DQN_CPP_MODE)
bool Init(const u32 spinCount = 16000);
bool Init ();
void Acquire();
void Release();
void Delete();
void Delete ();
struct DqnLockGuard LockGuard();
#endif
} DqnLock;
@ -1097,20 +1113,15 @@ private:
};
#endif
DQN_FILE_SCOPE bool DqnLock_Init (DqnLock *const lock, const u32 spinCount = 16000);
// lock: Pass in a pointer to a default DqnLock struct.
// In Win32, you may optionally change the win32Spincount.
DQN_FILE_SCOPE bool DqnLock_Init (DqnLock *const lock);
DQN_FILE_SCOPE void DqnLock_Acquire(DqnLock *const lock);
DQN_FILE_SCOPE void DqnLock_Release(DqnLock *const lock);
DQN_FILE_SCOPE void DqnLock_Delete (DqnLock *const lock);
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnAtomic Public API - Interlocks/Atomic Operations
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE u32 DqnAtomic_CompareSwap32(u32 volatile *dest, u32 swapVal, u32 compareVal);
DQN_FILE_SCOPE u32 DqnAtomic_Add32 (u32 volatile *src);
DQN_FILE_SCOPE u32 DqnAtomic_Sub32 (u32 volatile *src);
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueue Public API - Multithreaded Job Queue
// XPlatform > #DqnJobQueue Public API - Multithreaded Job Queue
////////////////////////////////////////////////////////////////////////////////
// DqnJobQueue is a platform abstracted "lockless" multithreaded work queue. It will create threads
// and assign threads to complete the job via the job "callback" using the "userData" supplied.
@ -1144,8 +1155,19 @@ typedef struct DqnJobQueue
// NOTE(doyle): Modified by main+worker threads
u32 volatile jobToExecuteIndex;
u32 volatile numJobsToComplete;
#if defined(DQN_WIN32_PLATFORM)
void *semaphore;
#elif defined(DQN_UNIX_PLATFORM)
sem_t semaphore;
#else
#error Unknown platform
#endif
// NOTE: Modified by main thread ONLY
u32 volatile jobInsertIndex;
@ -1186,6 +1208,44 @@ DQN_FILE_SCOPE void DqnJobQueue_BlockAndCompleteAllJobs(DqnJobQueue *const queue
DQN_FILE_SCOPE bool DqnJobQueue_TryExecuteNextJob(DqnJobQueue *const queue);
DQN_FILE_SCOPE bool DqnJobQueue_AllJobsComplete (DqnJobQueue *const queue);
////////////////////////////////////////////////////////////////////////////////
// XPlatform > #DqnAtomic Public API - Interlocks/Atomic Operations
////////////////////////////////////////////////////////////////////////////////
// All atomic operations generate a full read/write barrier. This is implicitly enforced by the
// OS calls, not explicitly in my code.
// swapVal: The value to put into "dest", IF at point of read, "dest" has the value of "compareVal"
// compareVal: The value to check in "dest"
// return: Return the original value that was in "dest"
DQN_FILE_SCOPE u32 DqnAtomic_CompareSwap32(u32 volatile *const dest, const u32 swapVal, const u32 compareVal);
// Increment the value at src by 1
// return: The incremented value
DQN_FILE_SCOPE u32 DqnAtomic_Add32(u32 volatile *const src);
// Decrement the value at src by 1
// return: The decremented value
DQN_FILE_SCOPE u32 DqnAtomic_Sub32(u32 volatile *const src);
////////////////////////////////////////////////////////////////////////////////
// XPlatform > #DqnPlatform Public API - Common Platform API Helpers
////////////////////////////////////////////////////////////////////////////////
// Uses a single call to DqnMem_Calloc() and DqnMem_Free(). Not completely platform "independent" for Unix.
// numCores: numThreadsPerCore: Can be NULL, the function will just skip it.
DQN_FILE_SCOPE void DqnPlatform_GetNumThreadsAndCores(u32 *const numCores, u32 *const numThreadsPerCore);
#endif // DQN_XPLATFORM_LAYER
////////////////////////////////////////////////////////////////////////////////
// #Platform Public API
////////////////////////////////////////////////////////////////////////////////
// Functions here are only available for the #defined sections (i.e. all functions in
// DQN_WIN32_PLATFORM only have a valid implementation in Win32.
////////////////////////////////////////////////////////////////////////////////
// #Win32Platform Public API
////////////////////////////////////////////////////////////////////////////////
#ifdef DQN_WIN32_PLATFORM
////////////////////////////////////////////////////////////////////////////////
// Platform > #DqnWin32 Public API - Common Win32 API Helpers
////////////////////////////////////////////////////////////////////////////////
@ -1218,9 +1278,6 @@ DQN_FILE_SCOPE void DqnWin32_OutputDebugString(const char *const formatStr, ...)
// return: The offset to the last backslash. -1 if bufLen was not large enough or buf is null.
DQN_FILE_SCOPE i32 DqnWin32_GetEXEDirectory(char *const buf, const u32 bufLen);
// numCores: numThreadsPerCore: Can be NULL, the function will just skip it.
// Uses calloc and free for querying numCores.
DQN_FILE_SCOPE void DqnWin32_GetNumThreadsAndCores(i32 *const numCores, i32 *const numThreadsPerCore);
#endif // DQN_WIN32_PLATFORM
////////////////////////////////////////////////////////////////////////////////
@ -3335,6 +3392,11 @@ DQN_FILE_SCOPE i64 Dqn_StrToI64(const char *const buf, const i32 bufSize)
if (!buf || bufSize == 0) return 0;
i32 index = 0;
while (buf[index] == ' ')
{
index++;
}
bool isNegative = false;
if (buf[index] == '-' || buf[index] == '+')
{
@ -5544,12 +5606,13 @@ void DqnIni_PropertyValueSet(DqnIni *ini, int section, int property,
// and Win32
#ifdef DQN_UNIX_PLATFORM
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <stdio.h> // Basic File I/O // TODO(doyle): Syscall versions
#include <unistd.h> // unlink()
#include <dirent.h> // readdir()/opendir()/closedir()
#include <sys/stat.h> // file size query
#include <sys/time.h> // high resolution timer
#include <time.h> // timespec
#include <unistd.h> // unlink()
#endif
////////////////////////////////////////////////////////////////////////////////
@ -5699,7 +5762,7 @@ DQN_FILE_SCOPE char **DqnDirInternal_PlatformRead(const char *const dir,
sizeof(*list) * (currNumFiles));
if (!list)
{
DQN_WIN32_ERROR_BOX("DqnMem_Alloc() failed.", NULL);
DQN_WIN32_ERROR_BOX("DqnMem_Calloc() failed.", NULL);
*numFiles = 0;
return NULL;
}
@ -5712,7 +5775,7 @@ DQN_FILE_SCOPE char **DqnDirInternal_PlatformRead(const char *const dir,
for (u32 j = 0; j < i; j++)
DqnMem_Free(list[j]);
DQN_WIN32_ERROR_BOX("DqnMem_Alloc() failed.", NULL);
DQN_WIN32_ERROR_BOX("DqnMem_Clloc() failed.", NULL);
*numFiles = 0;
return NULL;
}
@ -5777,10 +5840,14 @@ FILE_SCOPE bool DqnFileInternal_UnixOpen(const char *const path,
// TODO(doyle): What about not reading as a binary file and appending to end
// of file.
u32 modeIndex = 0;
char mode[4] = {};
mode[0] = operation;
mode[1] = (updateFlag) ? '+' : 0;
mode[2] = 'b';
mode[modeIndex++] = operation;
if (updateFlag) mode[modeIndex++] = '+';
mode[modeIndex++] = 'b';
DQN_ASSERT_HARD(modeIndex <= DQN_ARRAY_COUNT(mode) - 1);
// TODO(doyle): Use open syscall
// TODO(doyle): Query errno
@ -5796,9 +5863,20 @@ FILE_SCOPE bool DqnFileInternal_UnixOpen(const char *const path,
}
file->handle = (void *)handle;
file->size = fileStat.st_size;
file->permissionFlags = permissionFlags;
// NOTE: Can occur in some instances where files are generated on demand,
// i.e. /proc/cpuinfo
if (fileStat.st_size == 0)
{
// TODO
file->size = fileStat.st_size;
}
else
{
file->size = fileStat.st_size;
}
return true;
}
@ -6092,26 +6170,28 @@ DQN_FILE_SCOPE f64 DqnTimer_NowInMs()
};
DQN_FILE_SCOPE f64 DqnTimer_NowInS() { return DqnTimer_NowInMs() / 1000.0f; }
#endif // DQN_XPLATFORM_LAYER
////////////////////////////////////////////////////////////////////////////////
// #Platform Implementation
// XPlatform > #DqnLock Implementation
////////////////////////////////////////////////////////////////////////////////
#ifdef DQN_WIN32_PLATFORM
////////////////////////////////////////////////////////////////////////////////
// #Win32Platform Implementation
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnLock Implementation
////////////////////////////////////////////////////////////////////////////////
bool DqnLock_Init(DqnLock *const lock, const u32 spinCount)
bool DqnLock_Init(DqnLock *const lock)
{
if (!lock) return false;
#ifdef DQN_WIN32_PLATFORM
if (InitializeCriticalSectionEx(&lock->win32Handle, spinCount, 0))
#if defined(DQN_WIN32_PLATFORM)
if (InitializeCriticalSectionEx(&lock->win32Handle, lock->win32SpinCount, 0))
return true;
#elif defined(DQN_UNIX_PLATFORM)
// NOTE: Static initialise, pre-empt a lock so that it gets initialised as per documentation
lock->unixHandle = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
DqnLock_Acquire(lock);
DqnLock_Release(lock);
return true;
#else
#error Unsupported platform
#endif
return false;
@ -6120,42 +6200,71 @@ bool DqnLock_Init(DqnLock *const lock, const u32 spinCount)
void DqnLock_Acquire(DqnLock *const lock)
{
if (!lock) return;
#ifdef DQN_WIN32_PLATFORM
#if defined(DQN_WIN32_PLATFORM)
EnterCriticalSection(&lock->win32Handle);
#elif defined(DQN_UNIX_PLATFORM)
// TODO(doyle): Better error handling
i32 error = pthread_mutex_lock(&lock->unixHandle);
DQN_ASSERT_HARD(error == 0);
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
#error Unsupported platform
#endif
}
void DqnLock_Release(DqnLock *const lock)
{
if (!lock) return;
#ifdef DQN_WIN32_PLATFORM
#if defined(DQN_WIN32_PLATFORM)
LeaveCriticalSection(&lock->win32Handle);
#elif defined (DQN_UNIX_PLATFORM)
// TODO(doyle): better error handling
i32 error = pthread_mutex_unlock(&lock->unixHandle);
DQN_ASSERT_HARD(error == 0);
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
#error Unsupported platform
#endif
}
void DqnLock_Delete(DqnLock *const lock)
{
if (!lock) return;
#ifdef DQN_WIN32_PLATFORM
#if defined(DQN_WIN32_PLATFORM)
DeleteCriticalSection(&lock->win32Handle);
#elif defined(DQN_UNIX_PLATFORM)
i32 error = pthread_mutex_destroy(&lock->unixHandle);
DQN_ASSERT_HARD(error == 0);
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
#error Unsupported platform
#endif
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnLock CPP Implementation
// XPlatform > #DqnLock CPP Implementation
////////////////////////////////////////////////////////////////////////////////
bool DqnLock::Init() { return DqnLock_Init (this); }
void DqnLock::Acquire() { DqnLock_Acquire(this); }
void DqnLock::Release() { DqnLock_Release(this); }
void DqnLock::Delete() { DqnLock_Delete (this); }
DqnLockGuard DqnLock::LockGuard()
{
return DqnLockGuard(this, NULL);
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnLockGuard CPP Implementation
// XPlatform > #DqnLockGuard CPP Implementation
////////////////////////////////////////////////////////////////////////////////
DqnLockGuard::DqnLockGuard(DqnLock *const lock_, bool *const succeeded)
{
@ -6177,55 +6286,19 @@ DqnLockGuard::~DqnLockGuard()
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnAtomic Implementation
// XPlatform > #DqnJobQueueInternal Implementation
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE u32 DqnAtomic_CompareSwap32(u32 volatile *dest, u32 swapVal, u32 compareVal)
{
#ifdef DQN_WIN32_PLATFORM
DQN_ASSERT(sizeof(LONG) == sizeof(u32));
u32 result =
(u32)InterlockedCompareExchange((LONG volatile *)dest, (LONG)swapVal, (LONG)compareVal);
return result;
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
return 0;
#endif
}
DQN_FILE_SCOPE u32 DqnAtomic_Add32(u32 volatile *src)
{
#ifdef DQN_WIN32_PLATFORM
DQN_ASSERT(sizeof(LONG) == sizeof(u32));
u32 result = (u32)InterlockedIncrement((LONG volatile *)src);
return result;
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
return 0;
#endif
}
DQN_FILE_SCOPE u32 DqnAtomic_Sub32(u32 volatile *src)
{
#ifdef DQN_WIN32_PLATFORM
DQN_ASSERT(sizeof(LONG) == sizeof(u32));
u32 result = (u32)InterlockedDecrement((LONG volatile *)src);
return result;
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
return 0;
#endif
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueueInternal Implementation
////////////////////////////////////////////////////////////////////////////////
typedef void *DqnThreadCallbackInternal(void *threadParam);
size_t DQN_JOB_QUEUE_INTERNAL_THREAD_DEFAULT_STACK_SIZE = 0;
FILE_SCOPE u32 DqnJobQueueInternal_ThreadCreate(const size_t stackSize, void *threadCallback,
void *threadParam, const u32 numThreads)
FILE_SCOPE u32 DqnJobQueueInternal_ThreadCreate(const size_t stackSize,
DqnThreadCallbackInternal *const threadCallback,
void *const threadParam, const u32 numThreads)
{
u32 numThreadsCreated = 0;
#ifdef DQN_WIN32_PLATFORM
#if defined(DQN_WIN32_PLATFORM)
DQN_ASSERT_HARD(stackSize == 0 || !threadCallback);
for (u32 i = 0; i < numThreads; i++)
{
@ -6235,8 +6308,27 @@ FILE_SCOPE u32 DqnJobQueueInternal_ThreadCreate(const size_t stackSize, void *th
numThreadsCreated++;
}
#elif defined(DQN_UNIX_PLATFORM)
// TODO(doyle): Better error handling
pthread_attr_t attribute = {};
DQN_ASSERT(pthread_attr_init(&attribute) == 0);
// Allows us to use pthread_join() which lets us wait till a thread finishes execution
DQN_ASSERT(pthread_attr_setdetachstate(&attribute, PTHREAD_CREATE_JOINABLE) == 0);
// TODO(doyle): Persist thread handles
for (u32 i = 0; i < numThreads; i++)
{
pthread_t thread = {};
pthread_create(&thread, &attribute, threadCallback, threadParam);
numThreadsCreated++;
}
DQN_ASSERT(pthread_attr_destroy(&attribute) == 0);
#else
DQN_ASSERT(DQN_INVALID_CODE_PATH);
#error Unsupported platform
#endif
DQN_ASSERT(numThreadsCreated == numThreads);
@ -6244,25 +6336,75 @@ FILE_SCOPE u32 DqnJobQueueInternal_ThreadCreate(const size_t stackSize, void *th
}
FILE_SCOPE u32 DqnJobQueueInternal_ThreadCallback(void *threadParam)
FILE_SCOPE void *DqnJobQueueInternal_ThreadCallback(void *threadParam)
{
DqnJobQueue *queue = (DqnJobQueue *)threadParam;
#ifdef DQN_WIN32_PLATFORM
for (;;)
{
if (!DqnJobQueue_TryExecuteNextJob(queue))
{
#if defined(DQN_WIN32_PLATFORM)
WaitForSingleObjectEx(queue->semaphore, INFINITE, false);
}
}
#elif defined(DQN_UNIX_PLATFORM)
sem_wait(&queue->semaphore);
#else
DQN_ASSERT(DQN_INVALID_CODE_PATH);
return 0;
#error Unsupported platform
#endif
}
}
}
FILE_SCOPE bool DqnJobQueueInternal_CreateSemaphore(DqnJobQueue *const queue, const u32 initSignalCount, const u32 maxSignalCount)
{
if (!queue) return false;
#if defined(DQN_WIN32_PLATFORM)
queue->semaphore = (void *)CreateSemaphore(NULL, initSignalCount, maxSignalCount, NULL);
DQN_ASSERT_HARD(queue->semaphore);
#elif defined(DQN_UNIX_PLATFORM)
// TODO(doyle): Use max count for unix
// TODO(doyle): Error handling
const u32 UNIX_DONT_SHARE_BETWEEN_PROCESSES = 0;
i32 error = sem_init(&queue->semaphore, UNIX_DONT_SHARE_BETWEEN_PROCESSES, 0);
DQN_ASSERT_HARD(error == 0);
for (u32 i = 0; i < initSignalCount; i++)
DQN_ASSERT_HARD(sem_post(&queue->semaphore) == 0);
#else
#error Unknown platform
#endif
return true;
}
FILE_SCOPE bool DqnJobQueueInternal_ReleaseSemaphore(DqnJobQueue *const queue)
{
DQN_ASSERT(queue);
#if defined(DQN_WIN32_PLATFORM)
DQN_ASSERT(queue->semaphore);
ReleaseSemaphore(queue->semaphore, 1, NULL);
#elif defined(DQN_UNIX_PLATFORM)
// TODO(doyle): Error handling
DQN_ASSERT_HARD(sem_post(&queue->semaphore) == 0);
#else
#error Unknown platform
#endif
return true;
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueue Implementation
// XPlatform > #DqnJobQueue Implementation
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE bool DqnJobQueue_Init(DqnJobQueue *const queue, DqnJob *const jobList,
const u32 jobListSize, const u32 numThreads)
@ -6271,12 +6413,7 @@ DQN_FILE_SCOPE bool DqnJobQueue_Init(DqnJobQueue *const queue, DqnJob *const job
queue->jobList = jobList;
queue->size = jobListSize;
#ifdef DQN_WIN32_PLATFORM
queue->semaphore = (void *)CreateSemaphore(NULL, 0, numThreads, NULL);
DQN_ASSERT_HARD(queue->semaphore);
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
#endif
DQN_ASSERT(DqnJobQueueInternal_CreateSemaphore(queue, 0, numThreads));
// Create threads
u32 numThreadsCreated = DqnJobQueueInternal_ThreadCreate(
@ -6295,12 +6432,8 @@ DQN_FILE_SCOPE bool DqnJobQueue_AddJob(DqnJobQueue *const queue, const DqnJob jo
queue->jobList[queue->jobInsertIndex] = job;
DqnAtomic_Add32(&queue->numJobsToComplete);
DQN_ASSERT(DqnJobQueueInternal_ReleaseSemaphore(queue));
#ifdef DQN_WIN32_PLATFORM
ReleaseSemaphore(queue->semaphore, 1, NULL);
#else
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
#endif
queue->jobInsertIndex = newJobInsertIndex;
return true;
}
@ -6348,7 +6481,7 @@ DQN_FILE_SCOPE bool DqnJobQueue_AllJobsComplete(DqnJobQueue *const queue)
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueue CPP Implementation
// XPlatform > #DqnJobQueue CPP Implementation
////////////////////////////////////////////////////////////////////////////////
bool DqnJobQueue::Init(const DqnJob *const jobList_, const u32 jobListSize, const u32 numThreads)
{
@ -6361,6 +6494,236 @@ void DqnJobQueue::BlockAndCompleteAllJobs() { DqnJobQueue_Block
bool DqnJobQueue::TryExecuteNextJob() { return DqnJobQueue_TryExecuteNextJob(this); }
bool DqnJobQueue::AllJobsComplete () { return DqnJobQueue_AllJobsComplete(this); }
////////////////////////////////////////////////////////////////////////////////
// XPlatform > #DqnAtomic Implementation
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE u32 DqnAtomic_CompareSwap32(u32 volatile *const dest, const u32 swapVal,
const u32 compareVal)
{
u32 result = 0;
#if defined(DQN_WIN32_PLATFORM)
DQN_ASSERT(sizeof(LONG) == sizeof(u32));
result = (u32)InterlockedCompareExchange((LONG volatile *)dest, (LONG)swapVal, (LONG)compareVal);
#elif defined(DQN_UNIX_PLATFORM)
result = __sync_val_compare_and_swap(dest, compareVal, swapVal);
#else
#error Unsupported platform
#endif
return result;
}
DQN_FILE_SCOPE u32 DqnAtomic_Add32(u32 volatile *const src)
{
u32 result = 0;
#if defined(DQN_WIN32_PLATFORM)
DQN_ASSERT(sizeof(LONG) == sizeof(u32));
result = (u32)InterlockedIncrement((LONG volatile *)src);
#elif defined(DQN_UNIX_PLATFORM)
result = __sync_add_and_fetch(src, 1);
#else
#error Unsupported platform
#endif
return result;
}
DQN_FILE_SCOPE u32 DqnAtomic_Sub32(u32 volatile *const src)
{
u32 result = 0;
#if defined(DQN_WIN32_PLATFORM)
DQN_ASSERT(sizeof(LONG) == sizeof(u32));
result = (u32)InterlockedDecrement((LONG volatile *)src);
#elif defined(DQN_UNIX_PLATFORM)
result = __sync_sub_and_fetch(src, 1);
#else
#error Unsupported platform
#endif
return result;
}
////////////////////////////////////////////////////////////////////////////////
// XPlatform > #DqnPlatformInternal Implementation
////////////////////////////////////////////////////////////////////////////////
#if defined(DQN_UNIX_PLATFORM)
FILE_SCOPE void DqnPlatformInternal_UnixGetNumCoresAndThreads(u32 *const numCores,
u32 *const numThreadsPerCore)
{
if (!numThreadsPerCore && !numCores) return;
// TODO(doyle): Not exactly standard
DqnFile file = {};
DQN_ASSERT_HARD(
DqnFile_Open("/proc/cpuinfo", &file, DqnFilePermissionFlag_Read, DqnFileAction_OpenOnly));
// NOTE: cpuinfo is generated by kernel when queried, so we don't actually know the size.
DQN_ASSERT_HARD(file.size == 0);
char c = fgetc((FILE *)file.handle);
while(c != EOF)
{
file.size++;
c = fgetc((FILE *)file.handle);
}
rewind((FILE *)file.handle);
DQN_ASSERT_HARD(file.size > 0);
u8 *readBuffer = (u8 *)DqnMem_Calloc(file.size);
if (readBuffer)
{
size_t bytesRead = DqnFile_Read(&file, readBuffer, file.size);
DQN_ASSERT_HARD(bytesRead == file.size);
const char *srcPtr = (char *)readBuffer;
u32 srcLen = file.size;
#define DQN_ADVANCE_CHAR_PTR_AND_LEN_INTERNAL(ptr, len, offset) \
ptr += offset; \
len -= offset
if (numThreadsPerCore)
{
*numThreadsPerCore = 0;
// Find the offset to the processor field and move to it
const char processorStr[] = "processor";
i32 processorOffset = DqnStr_FindFirstOccurence(srcPtr, srcLen, processorStr,
DQN_ARRAY_COUNT(processorStr));
DQN_ASSERT_HARD(processorOffset != -1);
DQN_ADVANCE_CHAR_PTR_AND_LEN_INTERNAL(srcPtr, srcLen, processorOffset);
// Find the offset to the colon delimiter and advance to 1 after it
i32 colonOffset = DqnStr_FindFirstOccurence(srcPtr, srcLen, ":", 1) + 1;
DQN_ASSERT_HARD(colonOffset != -1);
DQN_ADVANCE_CHAR_PTR_AND_LEN_INTERNAL(srcPtr, srcLen, colonOffset);
// Read num processors, i.e. logical cores/hyper threads
*numThreadsPerCore = Dqn_StrToI64(srcPtr, srcLen);
if (*numThreadsPerCore == 0) *numThreadsPerCore = 1;
}
if (numCores)
{
*numCores = 0;
// Find the offset to the cpu cores field and move to it
const char cpuCoresStr[] = "cpu cores";
i32 cpuCoresOffset = DqnStr_FindFirstOccurence(srcPtr, srcLen, cpuCoresStr,
DQN_ARRAY_COUNT(cpuCoresStr));
DQN_ASSERT_HARD(cpuCoresOffset != -1);
DQN_ADVANCE_CHAR_PTR_AND_LEN_INTERNAL(srcPtr, srcLen, cpuCoresOffset);
// Find the offset to the colon delimiter and advance to 1 after it
i32 colonOffset = DqnStr_FindFirstOccurence(srcPtr, srcLen, ":", 1) + 1;
DQN_ASSERT_HARD(colonOffset != -1);
DQN_ADVANCE_CHAR_PTR_AND_LEN_INTERNAL(srcPtr, srcLen, colonOffset);
// Read num cores value, i.e. physical cores
*numCores = Dqn_StrToI64(srcPtr, srcLen);
}
DqnFile_Close(&file);
DqnMem_Free(readBuffer);
}
else
{
// TODO(doyle): Out of mem
DQN_ASSERT_HARD(DQN_INVALID_CODE_PATH);
}
}
#endif // DQN_UNIX_PLATFORM
#if defined(DQN_WIN32_PLATFORM)
FILE_SCOPE void DqnPlatformInternal_Win32GetNumCoresAndThreads(u32 *const numCores,
u32 *const numThreadsPerCore)
{
if (numThreadsPerCore)
{
SYSTEM_INFO systemInfo;
GetNativeSystemInfo(&systemInfo);
*numThreadsPerCore = systemInfo.dwNumberOfProcessors;
}
if (numCores)
{
*numCores = 0;
DWORD requiredSize = 0;
u8 insufficientBuffer = {0};
GetLogicalProcessorInformationEx(
RelationProcessorCore, (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)insufficientBuffer,
&requiredSize);
u8 *rawProcInfoArray = (u8 *)DqnMem_Calloc(requiredSize);
if (!DQN_ASSERT_MSG(rawProcInfoArray, "Calloc failed, could not allocate memory"))
{
return;
}
if (GetLogicalProcessorInformationEx(
RelationProcessorCore, (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)rawProcInfoArray,
&requiredSize))
{
SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *logicalProcInfo =
(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)rawProcInfoArray;
DWORD bytesRead = 0;
do
{
// NOTE: High efficiency value has greater performance and less efficiency.
PROCESSOR_RELATIONSHIP *procInfo = &logicalProcInfo->Processor;
u32 efficiency = procInfo->EfficiencyClass;
(*numCores)++;
DQN_ASSERT_HARD(logicalProcInfo->Relationship == RelationProcessorCore);
DQN_ASSERT_HARD(procInfo->GroupCount == 1);
bytesRead += logicalProcInfo->Size;
logicalProcInfo =
(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)((u8 *)logicalProcInfo +
logicalProcInfo->Size);
} while (bytesRead < requiredSize);
}
else
{
DqnWin32_DisplayLastError("GetLogicalProcessorInformationEx() failed");
}
DqnMem_Free(rawProcInfoArray);
}
}
#endif // DQN_WIN32_PLATFORM
////////////////////////////////////////////////////////////////////////////////
// XPlatform > #DqnPlatform Implementation
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE void DqnPlatform_GetNumThreadsAndCores(u32 *const numCores, u32 *const numThreadsPerCore)
{
#if defined(DQN_WIN32_PLATFORM)
DqnPlatformInternal_Win32GetNumCoresAndThreads(numCores, numThreadsPerCore);
#elif defined(DQN_UNIX_PLATFORM)
DqnPlatformInternal_UnixGetNumCoresAndThreads(numCores, numThreadsPerCore);
#else
#error Unsupported platform
#endif
}
#endif // DQN_XPLATFORM_LAYER
////////////////////////////////////////////////////////////////////////////////
// #Platform Implementation
////////////////////////////////////////////////////////////////////////////////
#ifdef DQN_WIN32_PLATFORM
////////////////////////////////////////////////////////////////////////////////
// #Win32Platform Implementation
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnWin32 Implementation
////////////////////////////////////////////////////////////////////////////////
@ -6473,59 +6836,4 @@ DQN_FILE_SCOPE i32 DqnWin32_GetEXEDirectory(char *const buf, const u32 bufLen)
return lastSlashIndex;
}
DQN_FILE_SCOPE void DqnWin32_GetNumThreadsAndCores(i32 *const numCores, i32 *const numThreadsPerCore)
{
if (numThreadsPerCore)
{
SYSTEM_INFO systemInfo;
GetNativeSystemInfo(&systemInfo);
*numThreadsPerCore = systemInfo.dwNumberOfProcessors;
}
if (numCores)
{
*numCores = 0;
DWORD requiredSize = 0;
u8 insufficientBuffer = {0};
GetLogicalProcessorInformationEx(
RelationProcessorCore, (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)insufficientBuffer,
&requiredSize);
u8 *rawProcInfoArray = (u8 *)DqnMem_Calloc(requiredSize);
if (!DQN_ASSERT_MSG(rawProcInfoArray, "Calloc failed, could not allocate memory"))
{
return;
}
if (GetLogicalProcessorInformationEx(
RelationProcessorCore, (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)rawProcInfoArray,
&requiredSize))
{
SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *logicalProcInfo =
(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)rawProcInfoArray;
DWORD bytesRead = 0;
do
{
// NOTE: High efficiency value has greater performance and less efficiency.
PROCESSOR_RELATIONSHIP *procInfo = &logicalProcInfo->Processor;
u32 efficiency = procInfo->EfficiencyClass;
(*numCores)++;
DQN_ASSERT_HARD(logicalProcInfo->Relationship == RelationProcessorCore);
DQN_ASSERT_HARD(procInfo->GroupCount == 1);
bytesRead += logicalProcInfo->Size;
logicalProcInfo =
(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)((u8 *)logicalProcInfo +
logicalProcInfo->Size);
} while (bytesRead < requiredSize);
}
else
{
DqnWin32_DisplayLastError("GetLogicalProcessorInformationEx() failed");
}
DqnMem_Free(rawProcInfoArray);
}
}
#endif // DQN_WIN32_PLATFORM

View File

@ -1697,7 +1697,6 @@ void FileTest()
}
#endif
#ifdef DQN_WIN32_IMPLEMENTATION
FILE_SCOPE u32 volatile globalDebugCounter;
FILE_SCOPE DqnLock globalJobQueueLock;
const u32 QUEUE_SIZE = 256;
@ -1706,14 +1705,16 @@ FILE_SCOPE void JobQueueDebugCallbackIncrementCounter(DqnJobQueue *const queue,
{
DQN_ASSERT(queue->size == QUEUE_SIZE);
{
bool succeeded;
DqnLockGuard guard = DqnLockGuard(&globalJobQueueLock, &succeeded);
DQN_ASSERT(succeeded);
DqnLockGuard guard = globalJobQueueLock.LockGuard();
globalDebugCounter++;
u32 number = globalDebugCounter;
#if defined(DQN_WIN32_IMPLEMENTATION)
printf("JobQueueDebugCallbackIncrementCounter(): Thread %d: Incrementing Number: %d\n",
GetCurrentThreadId(), number);
#elif defined(DQN_UNIX_IMPLEMENTATION)
printf("JobQueueDebugCallbackIncrementCounter(): Thread unix: Incrementing Number: %d\n",
number);
#endif
}
}
@ -1725,10 +1726,12 @@ FILE_SCOPE void JobQueueTest()
DqnMemStack memStack = {};
DQN_ASSERT_HARD(memStack.Init(DQN_MEGABYTE(1), true));
i32 numThreads, numCores;
DqnWin32_GetNumThreadsAndCores(&numCores, &numThreads);
u32 numThreads, numCores;
DqnPlatform_GetNumThreadsAndCores(&numCores, &numThreads);
DQN_ASSERT(numThreads > 0 && numCores > 0);
i32 totalThreads = (numCores - 1) * numThreads;
u32 totalThreads = (numCores - 1) * numThreads;
if (totalThreads == 0) totalThreads = 1;
DqnJobQueue jobQueue = {};
DqnJob *jobList = (DqnJob *)memStack.Push(sizeof(*jobQueue.jobList) * QUEUE_SIZE);
@ -1736,7 +1739,7 @@ FILE_SCOPE void JobQueueTest()
const u32 WORK_ENTRIES = 2048;
DQN_ASSERT(DqnLock_Init(&globalJobQueueLock));
for (i32 i = 0; i < WORK_ENTRIES; i++)
for (u32 i = 0; i < WORK_ENTRIES; i++)
{
DqnJob job = {};
job.callback = JobQueueDebugCallbackIncrementCounter;
@ -1752,11 +1755,9 @@ FILE_SCOPE void JobQueueTest()
DQN_ASSERT(globalDebugCounter == WORK_ENTRIES);
DqnLock_Delete(&globalJobQueueLock);
}
#endif
int main(void)
{
#if 1
StringsTest();
RandomTest();
MathTest();
@ -1764,14 +1765,10 @@ int main(void)
VecTest();
ArrayTest();
MemStackTest();
#endif
#ifdef DQN_XPLATFORM_LAYER
FileTest();
OtherTest();
#endif
#ifdef DQN_WIN32_IMPLEMENTATION
JobQueueTest();
#endif

View File

@ -1,3 +1,3 @@
all: dqn_unit_test.cpp
mkdir -p bin
g++ -o bin/dqn_unit_test dqn_unit_test.cpp -lm -Wall -Werror
g++ -o bin/dqn_unit_test dqn_unit_test.cpp -lm -Wall -Werror -pthread -ggdb