Expose job queue, update init function

This commit is contained in:
Doyle Thai 2017-06-23 23:12:20 +10:00
parent f9de41b6c0
commit 45bc637773
3 changed files with 91 additions and 75 deletions

129
dqn.h
View File

@ -12,6 +12,13 @@
#include "dqn.h"
*/
// Conventions
// All data structures fields are exposed by default, with exceptions here and there. The rationale
// is I rarely go into libraries and start changing values around in fields unless I know the
// implementation OR we're required to fill out a struct for some function.
// Just treat all struct fields to be internal and read-only unless explicitly stated otherwise.
////////////////////////////////////////////////////////////////////////////////
// Table Of Contents #TOC #TableOfContents
////////////////////////////////////////////////////////////////////////////////
@ -53,7 +60,7 @@
// #DqnSprintf Cross-platform Sprintf Implementation (Public Domain lib stb_sprintf)
////////////////////////////////////////////////////////////////////////////////
// Global Preprocessor Checks
// Preprocessor Checks
////////////////////////////////////////////////////////////////////////////////
// This needs to be above the portable layer so that, if the user requests
// a platform implementation, platform specific implementations in the portable
@ -209,9 +216,14 @@ enum DqnMemStackFlag
typedef struct DqnMemStack
{
// The memory block allocated for the stack
struct DqnMemStackBlock *block;
// Bits set from enum DqnMemStackFlag
u32 flags;
i32 tempRegionCount;
// Allocations are address aligned to this value. Not to be modified as popping allocations uses this to realign size
u32 byteAlign;
#if defined(DQN_CPP_MODE)
@ -300,7 +312,10 @@ DQN_FILE_SCOPE void DqnMemStack_ClearCurrBlock(DqnMemStack *const stack, const
// regions
typedef struct DqnMemStackTempRegion
{
// The stack associated with this TempRegion
DqnMemStack *stack;
// Store memBlock state to revert back to on DqnMemStackTempRegion_End()
struct DqnMemStackBlock *startingBlock;
size_t used;
} DqnMemStackTempRegion;
@ -330,10 +345,12 @@ private:
// NOT be modified directly, only indirectly through the regular API.
typedef struct DqnMemStackBlock
{
// The raw memory block, size and used count
u8 *memory;
size_t size;
size_t used;
// The allocator uses a linked list approach for additional blocks beyond capacity
DqnMemStackBlock *prevBlock;
} DqnMemStackBlock;
@ -404,7 +421,7 @@ typedef struct DqnMemAPICallbackResult
enum DqnMemAPICallbackType type;
} DqnMemAPICallbackResult;
// Function prototype for implementing a DqnMemAPI_Callback. You must fill out the result structure.
// Function prototype for implementing a DqnMemAPI_Callback. You must fill out the "result" structure.
// result: Is always guaranteed to be a valid pointer.
typedef void DqnMemAPI_Callback(DqnMemAPICallbackInfo info, DqnMemAPICallbackResult *result);
@ -424,12 +441,15 @@ DQN_FILE_SCOPE DqnMemAPI DqnMemAPI_DefaultUseCalloc();
template <typename T>
struct DqnArray
{
// Function pointers to custom allocators
DqnMemAPI memAPI;
// Array state
u64 count;
u64 capacity;
T *data;
// API
void Init (const size_t capacity, DqnMemAPI memAPI = DqnMemAPI_DefaultUseCalloc());
bool Free ();
bool Grow ();
@ -1065,6 +1085,7 @@ DQN_FILE_SCOPE u32 DqnAtomic_Sub32 (u32 volatile *src);
// wait for all jobs to complete using DqnJobQueue_TryExecuteNextJob() or spinlock on
// DqnJobQueue_AllJobsComplete(). Alternatively you can combine both for the main thread to help
// complete work and not move on until all tasks are complete.
typedef struct DqnJobQueue DqnJobQueue;
typedef void DqnJob_Callback(DqnJobQueue *const queue, void *const userData);
@ -1074,10 +1095,35 @@ typedef struct DqnJob
void *userData;
} DqnJob;
// memSize: The size of the supplied memory. If "mem" is NULL OR "memsize" is NULL/0 OR "queueSize"
// is 0, the function puts required size it needs into "memSize".
// return: The JobQueue or NULL if args invalid.
DQN_FILE_SCOPE DqnJobQueue *DqnJobQueue_InitWithMem(const void *const mem, size_t *const memSize, const u32 queueSize, const u32 numThreads);
typedef struct DqnJobQueue
{
// JobList Circular Array, is setup in Init()
DqnJob *jobList;
u32 size;
// NOTE(doyle): Modified by main+worker threads
u32 volatile jobToExecuteIndex;
u32 volatile numJobsToComplete;
void *semaphore;
// NOTE: Modified by main thread ONLY
u32 volatile jobInsertIndex;
#if defined(DQN_CPP_MODE)
bool Init (DqnJob const *const jobList_, const u32 jobListSize, const u32 numThreads);
bool AddJob (const DqnJob job);
bool TryExecuteNextJob();
bool AllJobsComplete ();
#endif
} DqnJobQueue;
// queue: Pass a pointer to a zero cleared DqnJobQueue struct
// jobList: Pass in a pointer to an array of DqnJob's
// jobListSize: The number of elements in the jobList array
// numThreads: The number of threads the queue should request from the OS for working on the queue
// return: FALSE if invalid args.
DQN_FILE_SCOPE bool DqnJobQueue_Init(DqnJobQueue *const queue, DqnJob const *const jobList,
const u32 jobListSize, const u32 numThreads);
// return: FALSE if the job is not able to be added, this occurs if the queue is full.
DQN_FILE_SCOPE bool DqnJobQueue_AddJob(DqnJobQueue *const queue, const DqnJob job);
@ -1734,9 +1780,9 @@ DqnMemStackInternal_AllocateBlock(u32 byteAlign, size_t size)
// #DqnMemStack CPP Implementation
////////////////////////////////////////////////////////////////////////////////
#if defined(DQN_CPP_MODE)
bool DqnMemStack::InitWithFixedMem (u8 *const mem, const size_t memSize, const u32 byteAlignment) { return DqnMemStack_InitWithFixedMem (this, mem, memSize, byteAlign); }
bool DqnMemStack::InitWithFixedSize(const size_t size, const bool zeroClear, const u32 byteAlignment) { return DqnMemStack_InitWithFixedSize(this, size, zeroClear, byteAlign); }
bool DqnMemStack::Init (const size_t size, const bool zeroClear, const u32 byteAlignment) { return DqnMemStack_Init (this, size, zeroClear, byteAlign); }
bool DqnMemStack::InitWithFixedMem (u8 *const mem, const size_t memSize, const u32 byteAlignment) { return DqnMemStack_InitWithFixedMem (this, mem, memSize, byteAlignment); }
bool DqnMemStack::InitWithFixedSize(const size_t size, const bool zeroClear, const u32 byteAlignment) { return DqnMemStack_InitWithFixedSize(this, size, zeroClear, byteAlignment); }
bool DqnMemStack::Init (const size_t size, const bool zeroClear, const u32 byteAlignment) { return DqnMemStack_Init (this, size, zeroClear, byteAlignment); }
void *DqnMemStack::Push(size_t size) { return DqnMemStack_Push(this, size); }
void DqnMemStack::Pop (void *const ptr, size_t size) { DqnMemStack_Pop (this, ptr, size); }
@ -6049,23 +6095,6 @@ DQN_FILE_SCOPE u32 DqnAtomic_Sub32(u32 volatile *src)
#endif
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueue Implementation
////////////////////////////////////////////////////////////////////////////////
typedef struct DqnJobQueue
{
DqnJob *jobList;
u32 size;
// NOTE: Modified by main+worker threads
u32 volatile jobToExecuteIndex;
u32 volatile numJobsToComplete;
void *semaphore;
// NOTE: Modified by main thread ONLY
u32 volatile jobInsertIndex;
} DqnJobQueue;
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueueInternal Implementation
////////////////////////////////////////////////////////////////////////////////
@ -6114,35 +6143,13 @@ FILE_SCOPE u32 DqnJobQueueInternal_ThreadCallback(void *threadParam)
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueue Implementation
////////////////////////////////////////////////////////////////////////////////
DQN_FILE_SCOPE DqnJobQueue *DqnJobQueue_InitWithMem(const void *const mem, size_t *const memSize,
const u32 queueSize, const u32 numThreads)
DQN_FILE_SCOPE bool DqnJobQueue_Init(DqnJobQueue *const queue, DqnJob *const jobList,
const u32 jobListSize, const u32 numThreads)
{
DqnJobQueue emptyQueue = {};
size_t reqStructSize = sizeof(emptyQueue);
size_t reqQueueSize = sizeof(*emptyQueue.jobList) * queueSize;
if (!queue || !jobList || jobListSize == 0 || numThreads == 0) return false;
queue->jobList = jobList;
queue->size = jobListSize;
if (!mem || !memSize || *memSize == 0 || queueSize == 0)
{
*memSize = reqStructSize + reqQueueSize;
return NULL;
}
u8 *memPtr = (u8 *)mem;
// Sub-allocate Queue
DqnJobQueue *queue = (DqnJobQueue *)memPtr;
*queue = emptyQueue;
queue->size = queueSize;
// Sub-allocate jobList
memPtr += reqStructSize;
queue->jobList = (DqnJob *)memPtr;
// Validate memPtr used size
memPtr += reqQueueSize;
DQN_ASSERT_HARD((size_t)(memPtr - (u8 *)mem) <= *memSize);
// Create semaphore
#ifdef DQN_WIN32_PLATFORM
queue->semaphore = (void *)CreateSemaphore(NULL, 0, numThreads, NULL);
DQN_ASSERT_HARD(queue->semaphore);
@ -6155,7 +6162,8 @@ DQN_FILE_SCOPE DqnJobQueue *DqnJobQueue_InitWithMem(const void *const mem, size_
DQN_JOB_QUEUE_INTERNAL_THREAD_DEFAULT_STACK_SIZE, DqnJobQueueInternal_ThreadCallback,
(void *)queue, numThreads);
DQN_ASSERT_HARD(numThreads == numThreadsCreated);
return queue;
return true;
}
DQN_FILE_SCOPE bool DqnJobQueue_AddJob(DqnJobQueue *const queue, const DqnJob job)
@ -6208,6 +6216,19 @@ DQN_FILE_SCOPE bool DqnJobQueue_AllJobsComplete(DqnJobQueue *const queue)
return result;
}
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnJobQueue CPP Implementation
////////////////////////////////////////////////////////////////////////////////
bool DqnJobQueue::Init(DqnJob const *const jobList_, const u32 jobListSize, const u32 numThreads)
{
bool result = DqnJobQueue_Init(this, jobList, jobListSize, numThreads);
return result;
}
bool DqnJobQueue::AddJob (const DqnJob job) { return DqnJobQueue_AddJob(this, job); }
bool DqnJobQueue::TryExecuteNextJob() { return DqnJobQueue_TryExecuteNextJob(this); }
bool DqnJobQueue::AllJobsComplete () { return DqnJobQueue_AllJobsComplete(this); }
////////////////////////////////////////////////////////////////////////////////
// Win32Platform > #DqnWin32 Implementation
////////////////////////////////////////////////////////////////////////////////

View File

@ -1705,42 +1705,37 @@ FILE_SCOPE void JobQueueDebugCallbackIncrementCounter(DqnJobQueue *const queue,
FILE_SCOPE void JobQueueTest()
{
size_t requiredSize;
const i32 QUEUE_SIZE = 256;
DqnJobQueue_InitWithMem(NULL, &requiredSize, QUEUE_SIZE, 0);
DqnMemStack memStack = {};
DQN_ASSERT_HARD(DqnMemStack_Init(&memStack, requiredSize, true));
DQN_ASSERT_HARD(memStack.Init(DQN_MEGABYTE(1), true));
i32 numThreads, numCores;
DqnWin32_GetNumThreadsAndCores(&numCores, &numThreads);
DQN_ASSERT(numThreads > 0 && numCores > 0);
i32 totalThreads = (numCores - 1) * numThreads;
void *jobQueueMem = DqnMemStack_Push(&memStack, requiredSize);
DQN_ASSERT_HARD(jobQueueMem);
DqnJobQueue *jobQueue =
DqnJobQueue_InitWithMem(jobQueueMem, &requiredSize, QUEUE_SIZE, totalThreads);
DQN_ASSERT_HARD(jobQueue);
const i32 QUEUE_SIZE = 256;
DqnJobQueue jobQueue = {};
DqnJob *jobList = (DqnJob *)memStack.Push(sizeof(*jobQueue.jobList) * QUEUE_SIZE);
DQN_ASSERT(DqnJobQueue_Init(&jobQueue, jobList, QUEUE_SIZE, totalThreads));
DQN_ASSERT(DqnLock_Init(&globalJobQueueLock));
for (i32 i = 0; i < DQN_ARRAY_COUNT(globalDebugCounterMemoize); i++)
{
DqnJob job = {};
job.callback = JobQueueDebugCallbackIncrementCounter;
while (!DqnJobQueue_AddJob(jobQueue, job))
while (!DqnJobQueue_AddJob(&jobQueue, job))
{
DqnJobQueue_TryExecuteNextJob(jobQueue);
DqnJobQueue_TryExecuteNextJob(&jobQueue);
}
}
while (DqnJobQueue_TryExecuteNextJob(jobQueue))
while (DqnJobQueue_TryExecuteNextJob(&jobQueue))
;
for (i32 i = 0; i < DQN_ARRAY_COUNT(globalDebugCounterMemoize); i++)
DQN_ASSERT(globalDebugCounterMemoize[i]);
while (DqnJobQueue_TryExecuteNextJob(jobQueue) && !DqnJobQueue_AllJobsComplete(jobQueue))
while (DqnJobQueue_TryExecuteNextJob(&jobQueue) && !DqnJobQueue_AllJobsComplete(&jobQueue))
;
printf("\nJobQueueTest(): Final incremented value: %d\n", globalDebugCounter);

View File

@ -21,7 +21,7 @@ Global
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{87785192-6F49-4F85-AA0D-F0AFA5CCCDDA}.Release|x86.ActiveCfg = Release|x86
{87785192-6F49-4F85-AA0D-F0AFA5CCCDDA}.Release|x86.ActiveCfg = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE