DN/Source/Extra/dn_net_curl.cpp

688 lines
29 KiB
C++

#define DN_NET_CURL_CPP
#if defined(_CLANGD)
#include "dn_net.h"
#include "dn_net_curl.h"
#endif
// Per-request state owned by the CURL backend. A pointer to this struct is stored in slot 0 of the
// owning DN_NETRequest's 'context' array (see DN_NET_CurlRequestFromRequest_).
struct DN_NETCurlRequest
{
// CURL easy handle for this request, created with curl_easy_init() when the request is first
// allocated.
void *handle;
// HTTP header list handed to CURLOPT_HTTPHEADER; built with curl_slist_append() and released with
// curl_slist_free_all() when the request is deinitialised.
struct curl_slist *slist;
// Buffer registered with CURLOPT_ERRORBUFFER; appended to error messages when it is non-empty.
char error[CURL_ERROR_SIZE];
// Set by the CURL thread while receiving a websocket message when the frame carries CURLWS_CONT
// or still reports bytes left, i.e. the message is not complete yet.
bool ws_has_more;
// Accumulates the response body (HTTP write callback chunks or websocket frames) until the caller
// collects the response.
DN_Str8Builder str8_builder;
};
// Kinds of events the calling thread can post to the CURL thread through the ring buffer.
enum DN_NETCurlRingEventType
{
// Value of a zero-initialised event; the CURL thread stops draining the ring when it sees it.
DN_NETCurlRingEventType_Nil,
// Start a request: the CURL thread moves it onto its own request list and adds it to CURLM.
DN_NETCurlRingEventType_DoRequest,
// Send a websocket frame; the event is followed in the ring by 'ws_send_size' payload bytes.
DN_NETCurlRingEventType_SendWS,
// The caller consumed a websocket response; the CURL thread resumes receiving on that socket.
DN_NETCurlRingEventType_ReceivedWSReceipt,
// Release the request's resources and return it to the free list.
DN_NETCurlRingEventType_DeinitRequest,
};
// Fixed-size event header written into the ring buffer that the CURL thread drains.
struct DN_NETCurlRingEvent
{
// What the CURL thread should do with 'request'.
DN_NETCurlRingEventType type;
// Handle of the request the event applies to.
DN_NETRequestHandle request;
// SendWS only: number of payload bytes written to the ring immediately after this struct.
DN_USize ws_send_size;
// SendWS only: kind of websocket frame to send (text, binary, close, ping, pong).
DN_NETWSSend ws_send;
};
// Returns the CURL backend's per-request state, which lives in slot 0 of the request's 'context'
// array. Yields null when 'req' itself is null.
static DN_NETCurlRequest *DN_NET_CurlRequestFromRequest_(DN_NETRequest *req)
{
  if (!req)
    return nullptr;
  return DN_Cast(DN_NETCurlRequest *) req->context[0];
}
// Returns the networking core that issued the request, which lives in slot 1 of the request's
// 'context' array. Yields null when 'req' itself is null.
static DN_NETCore *DN_NET_CurlNetFromRequest(DN_NETRequest *req)
{
  if (!req)
    return nullptr;
  return DN_Cast(DN_NETCore *) req->context[1];
}
// Walks the list starting at 'first' via the 'next' links and reports whether 'find' is one of its
// nodes. An empty list never contains anything.
static bool DN_NET_CurlRequestIsInList(DN_NETRequest const *first, DN_NETRequest const *find)
{
  for (DN_NETRequest const *node = first; node; node = node->next) {
    if (node == find)
      return true;
  }
  return false;
}
// Hands a request over from the CURL thread to the caller side: the request is moved from the
// thread's request list onto the response list and both the per-request and the global completion
// semaphores are signalled.
//
// Websocket requests pass through here as well. Taking a websocket off the thread's request list
// stops the CURL thread from receiving more data on that socket, because the thread only ticks the
// websockets that are on that list. Once the caller has waited on and consumed the response, the
// request is handed back to the CURL thread which resumes receiving for that socket.
static void DN_NET_CurlMarkRequestDone_(DN_NETCore *net, DN_NETRequest *request)
{
  DN_Assert(request);
  DN_Assert(net);

  DN_NETCurlCore *curl = DN_Cast(DN_NETCurlCore *) net->context;

  // NOTE: The lists are shared with the calling thread so they are only touched behind the mutex
  for (DN_OS_MutexScope(&curl->list_mutex)) {
    DN_Assert(DN_NET_CurlRequestIsInList(curl->thread_request_list, request));
    DN_DoublyLLDetach(curl->thread_request_list, request);
    DN_DoublyLLAppend(curl->response_list, request);

    // NOTE: Both semaphores are signalled while the list mutex is still held.
    // DN_NET_CurlWaitForAnyResponse wakes on the global semaphore, takes the list mutex and then
    // does a zero-timeout wait on the request's semaphore. If the signals happened after the mutex
    // was released it could get there before the request's semaphore was incremented and the
    // zero-timeout wait would fail. The global semaphore is signalled first because
    // DN_NET_CurlWaitForResponse wakes on the request's semaphore and then does a zero-timeout wait
    // on the global one.
    DN_OS_SemaphoreIncrement(&net->completion_sem, 1);
    DN_OS_SemaphoreIncrement(&request->completion_sem, 1);
  }
}
// Write callback registered with CURLOPT_WRITEFUNCTION. 'user_data' is the DN_NETRequest registered
// with CURLOPT_WRITEDATA. The chunk of 'size * count' bytes is copied into the request's string
// builder.
//
// Returns the number of bytes consumed: the full chunk size when the copy succeeded, 0 otherwise.
static DN_USize DN_NET_CurlHTTPCallback_(char *payload, DN_USize size, DN_USize count, void *user_data)
{
  DN_NETRequest     *request   = DN_Cast(DN_NETRequest *) user_data;
  DN_NETCurlRequest *curl_req  = DN_NET_CurlRequestFromRequest_(request);
  DN_USize const     num_bytes = size * count;
  if (!DN_Str8BuilderAppendBytesCopy(&curl_req->str8_builder, payload, num_bytes))
    return 0;
  return num_bytes;
}
// Entry point of the background thread that owns the CURLM instance. 'thread->user_context' is the
// DN_NETCore this backend was initialised with.
//
// Each iteration of the loop:
//   1. Drains the ring buffer of events posted by the calling thread (start request, send
//      websocket frame, resume websocket receive, deinitialise request).
//   2. Pumps CURLM and turns completed transfers into responses.
//   3. Receives pending websocket data for every open websocket on the thread's request list.
//   4. Sleeps in curl_multi_poll until there is activity or curl_multi_wakeup is called.
//
// The loop exits once 'kill_thread' is set. Always returns 0.
static int32_t DN_NET_CurlThreadEntryPoint_(DN_OSThread *thread)
{
  DN_NETCore     *net  = DN_Cast(DN_NETCore *) thread->user_context;
  DN_NETCurlCore *curl = DN_Cast(DN_NETCurlCore *) net->context;
  DN_OS_ThreadSetName(DN_Str8FromPtr(curl->thread.name.data, curl->thread.name.size));

  while (!curl->kill_thread) {
    DN_OSTLSTMem tmem = DN_OS_TLSPushTMem(nullptr);

    // NOTE: Handle events sitting in the ring queue
    for (bool dequeue_ring = true; dequeue_ring;) {
      // NOTE: A zero-initialised event has type Nil, so an empty ring ends the drain loop below
      DN_NETCurlRingEvent event = {};
      for (DN_OS_MutexScope(&curl->ring_mutex)) {
        if (DN_Ring_HasData(&curl->ring, sizeof(event)))
          DN_Ring_Read(&curl->ring, &event, sizeof(event));
      }

      DN_NETRequest     *req      = DN_NET_RequestFromHandle(event.request);
      DN_NETCurlRequest *curl_req = DN_NET_CurlRequestFromRequest_(req);
      switch (event.type) {
        case DN_NETCurlRingEventType_Nil: dequeue_ring = false; break;

        case DN_NETCurlRingEventType_DoRequest: {
          DN_Assert(req->response.state == DN_NETResponseState_Nil);
          DN_Assert(req->type != DN_NETRequestType_Nil);

          // NOTE: Attach it to the CURL thread's request list
          for (DN_OS_MutexScope(&curl->list_mutex)) {
            DN_Assert(DN_NET_CurlRequestIsInList(curl->request_list, req));
            DN_DoublyLLDetach(curl->request_list, req);
          }
          DN_DoublyLLAppend(curl->thread_request_list, req);

          // NOTE: Add the connection to CURLM and start ticking it once we finish handling all the
          // ring events
          CURLMcode multi_add = curl_multi_add_handle(curl->thread_curlm, curl_req->handle);
          DN_Assert(multi_add == CURLM_OK);
        } break;

        case DN_NETCurlRingEventType_SendWS: {
          // NOTE: The payload bytes sit in the ring directly after the event header
          DN_Str8 payload = {};
          for (DN_OS_MutexScope(&curl->ring_mutex)) {
            DN_Assert(DN_Ring_HasData(&curl->ring, event.ws_send_size));
            payload = DN_Str8FromArena(tmem.arena, event.ws_send_size, DN_ZMem_No);
            DN_Ring_Read(&curl->ring, payload.data, payload.size);
          }

          DN_U32 curlws_flag = 0;
          switch (event.ws_send) {
            case DN_NETWSSend_Text: curlws_flag = CURLWS_TEXT; break;
            case DN_NETWSSend_Binary: curlws_flag = CURLWS_BINARY; break;
            case DN_NETWSSend_Close: curlws_flag = CURLWS_CLOSE; break;
            case DN_NETWSSend_Ping: curlws_flag = CURLWS_PING; break;
            case DN_NETWSSend_Pong: curlws_flag = CURLWS_PONG; break;
          }

          DN_Assert(req->type == DN_NETRequestType_WS);
          DN_Assert(req->response.state == DN_NETResponseState_WSOpen);
          DN_Assert(DN_NET_CurlRequestIsInList(curl->thread_request_list, req));

          DN_USize sent        = 0;
          CURLcode send_result = curl_ws_send(curl_req->handle, payload.data, payload.size, &sent, 0, curlws_flag);
          DN_AssertF(send_result == CURLE_OK, "Failed to send: %s", curl_easy_strerror(send_result));
          DN_AssertF(sent == payload.size, "Failed to send all bytes (%zu vs %zu)", sent, payload.size);
        } break;

        case DN_NETCurlRingEventType_ReceivedWSReceipt: {
          // NOTE: The caller consumed the last websocket response, put the socket back onto the
          // thread's list so that it gets ticked for data again
          DN_Assert(req->type == DN_NETRequestType_WS);
          DN_Assert(req->response.state >= DN_NETResponseState_WSOpen && req->response.state <= DN_NETResponseState_WSPong);
          req->response.state = DN_NETResponseState_WSOpen;

          for (DN_OS_MutexScope(&curl->list_mutex)) {
            DN_Assert(DN_NET_CurlRequestIsInList(curl->request_list, req));
            DN_DoublyLLDetach(curl->request_list, req);
          }
          DN_DoublyLLAppend(curl->thread_request_list, req);
        } break;

        case DN_NETCurlRingEventType_DeinitRequest: {
          DN_Assert(event.request.handle != 0);
          DN_NETRequest *request = DN_Cast(DN_NETRequest *) event.request.handle;

          // NOTE: Derive the CURL state from the same raw pointer this case operates on, so it
          // does not depend on the handle lookup above having succeeded.
          DN_NETCurlRequest *deinit_curl_req = DN_NET_CurlRequestFromRequest_(request);

          // NOTE: Release resources
          DN_ArenaClear(&request->arena);
          DN_OS_SemaphoreDeinit(&request->completion_sem);
          curl_multi_remove_handle(curl->thread_curlm, deinit_curl_req->handle);
          curl_easy_reset(deinit_curl_req->handle);
          curl_slist_free_all(deinit_curl_req->slist);

          // NOTE: The CURL state is retained across reuse (the context array is preserved below),
          // so clear the freed header list pointer. Otherwise the next request recycled from the
          // free list would append its headers to freed memory.
          deinit_curl_req->slist = nullptr;

          // NOTE: Zero the struct preserving just the data we need to retain
          DN_NETRequest resetter = {};
          resetter.arena         = request->arena;
          resetter.gen           = request->gen;
          DN_Memcpy(resetter.context, request->context, sizeof(resetter.context));

          // NOTE: Move it to the free list. The request is unlinked from the deinit list _before_
          // the struct is overwritten: overwriting zeroes the list links, and unlinking a node whose
          // links were already cleared corrupts the deinit list when it holds more than one
          // request. The overwrite also happens behind the mutex because the calling thread
          // updates these links when it appends to the deinit list.
          for (DN_OS_MutexScope(&curl->list_mutex)) {
            DN_Assert(DN_NET_CurlRequestIsInList(curl->deinit_list, request));
            DN_DoublyLLDetach(curl->deinit_list, request);
            *request = resetter;
            DN_DoublyLLAppend(curl->free_list, request);
          }
        } break;
      }
    }

    // NOTE: Pump handles
    int       running_handles = 0;
    CURLMcode perform_result  = curl_multi_perform(curl->thread_curlm, &running_handles);
    if (perform_result != CURLM_OK)
      DN_InvalidCodePath;

    // NOTE: Check pump result
    for (;;) {
      int      msgs_in_queue = 0;
      CURLMsg *msg           = curl_multi_info_read(curl->thread_curlm, &msgs_in_queue);
      if (msg) {
        // NOTE: Get request handle
        DN_NETRequest *req = nullptr;
        curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, DN_Cast(void **) &req);
        DN_Assert(req);
        DN_Assert(DN_NET_CurlRequestIsInList(curl->thread_request_list, req));

        DN_NETCurlRequest *curl_req = DN_NET_CurlRequestFromRequest_(req);
        DN_Assert(curl_req->handle == msg->easy_handle);

        if (msg->data.result == CURLE_OK) {
          // NOTE: Get HTTP response code
          CURLcode get_result = curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &req->response.http_status);
          if (get_result == CURLE_OK) {
            if (req->type == DN_NETRequestType_HTTP) {
              req->response.state = DN_NETResponseState_HTTP;
            } else {
              DN_Assert(req->type == DN_NETRequestType_WS);
              req->response.state = DN_NETResponseState_WSOpen;
            }
          } else {
            // NOTE: Report the code of the call that failed (the transfer's own result is
            // CURLE_OK in this branch)
            req->response.error_str8 = DN_Str8FromFmtArena(&req->arena, "Failed to get HTTP response status (CURL %d): %s", get_result, curl_easy_strerror(get_result));
            req->response.state      = DN_NETResponseState_Error;
          }
        } else {
          DN_USize curl_extended_error_size = DN_CStr8Size(curl_req->error);
          req->response.state               = DN_NETResponseState_Error;
          req->response.error_str8          = DN_Str8FromFmtArena(&req->arena,
                                                                  "HTTP request '%.*s' failed (CURL %d): %s%s%s%s",
                                                                  DN_Str8PrintFmt(req->url),
                                                                  msg->data.result,
                                                                  curl_easy_strerror(msg->data.result),
                                                                  curl_extended_error_size ? " (" : "",
                                                                  curl_extended_error_size ? curl_req->error : "",
                                                                  curl_extended_error_size ? ")" : "");
        }

        if (req->type == DN_NETRequestType_HTTP || req->response.state == DN_NETResponseState_Error) {
          // NOTE: Remove the request from the multi handle if we're a HTTP request
          // because it typically terminates the connection. In websockets the
          // connection remains in the multi-handle to allow you to send and
          // receive WS data from it.
          //
          // If there's an error (either websocket or HTTP) we will also remove the
          // connection from the multi handle as it failed. Once a connection has
          // failed, curl will not poll that connection so there's no point keeping
          // it attached to the multi handle.
          curl_multi_remove_handle(curl->thread_curlm, msg->easy_handle);
        }

        DN_NET_CurlMarkRequestDone_(net, req);
      }

      if (msgs_in_queue == 0)
        break;
    }

    // NOTE: Check websockets
    DN_USize ws_count = 0;
    for (DN_NETRequest *req = curl->thread_request_list; req; req = req->next) {
      DN_Assert(req->type == DN_NETRequestType_WS || req->type == DN_NETRequestType_HTTP);
      if (req->type != DN_NETRequestType_WS || !(req->response.state >= DN_NETResponseState_WSOpen && req->response.state <= DN_NETResponseState_WSPong))
        continue;

      ws_count++;
      const curl_ws_frame *meta           = nullptr;
      DN_NETCurlRequest   *curl_req       = DN_NET_CurlRequestFromRequest_(req);
      CURLcode             receive_result = CURLE_OK;
      while (receive_result == CURLE_OK) {
        // NOTE: Determine WS payload size received. Note that since we pass in a null pointer CURL
        // will set meta->len to 0 and say that there's meta->bytesleft in the next chunk.
        DN_USize bytes_read = 0;
        receive_result      = curl_ws_recv(curl_req->handle, nullptr, 0, &bytes_read, &meta);
        if (receive_result != CURLE_OK)
          continue;
        DN_Assert(meta->len == 0);

        if (meta->flags & CURLWS_TEXT)
          req->response.state = DN_NETResponseState_WSText;
        if (meta->flags & CURLWS_BINARY)
          req->response.state = DN_NETResponseState_WSBinary;
        if (meta->flags & CURLWS_PING)
          req->response.state = DN_NETResponseState_WSPing;
        if (meta->flags & CURLWS_PONG)
          req->response.state = DN_NETResponseState_WSPong;
        if (meta->flags & CURLWS_CLOSE)
          req->response.state = DN_NETResponseState_WSClose;

        curl_req->ws_has_more = meta->flags & CURLWS_CONT;
        if (curl_req->ws_has_more) {
          bool is_text_or_binary = req->response.state == DN_NETResponseState_WSText ||
                                   req->response.state == DN_NETResponseState_WSBinary;
          DN_Assert(is_text_or_binary);
        }

        // NOTE: Allocate and read (we use meta->bytesleft as per comment from initial recv)
        if (meta->bytesleft) {
          DN_Str8 buffer = DN_Str8FromArena(&req->arena, meta->bytesleft, DN_ZMem_No);
          DN_Assert(buffer.size == DN_Cast(DN_USize) meta->bytesleft);
          receive_result = curl_ws_recv(curl_req->handle, buffer.data, buffer.size, &buffer.size, &meta);

          // NOTE: Only look at the frame metadata and keep the bytes when the read succeeded. On
          // failure 'meta' may be null (the check further down already allows for that) and the
          // buffer contents are not valid payload.
          if (receive_result == CURLE_OK) {
            DN_Assert(meta && buffer.size == DN_Cast(DN_USize) meta->len);
            DN_Str8BuilderAppendRef(&curl_req->str8_builder, buffer);
          }
        }

        // NOTE: There are more bytes coming if meta->bytesleft is set, (e.g. the next chunk. We
        // just read the current chunk).
        //
        // > If this is not a complete fragment, the bytesleft field informs about how many
        // additional bytes are expected to arrive before this fragment is complete.
        curl_req->ws_has_more |= meta && meta->bytesleft > 0;
        if (!curl_req->ws_has_more)
          break;
      }

      // NOTE: curl_ws_recv returns CURLE_GOT_NOTHING if the associated connection is closed.
      if (receive_result == CURLE_GOT_NOTHING)
        curl_req->ws_has_more = false;

      // NOTE: We read all the possible bytes that CURL has received for this message, but, there are
      // more bytes left that we will receive on subsequent calls. We will continue to the next
      // request and return back to this one on a later iteration of the thread loop where hopefully
      // that data has arrived.
      if (curl_req->ws_has_more)
        continue;

      // For CURLE_AGAIN
      //
      // > Instead of blocking, the function returns CURLE_AGAIN. The correct behavior is then to
      // > wait for the socket to signal readability before calling this function again.
      //
      // In which case we continue ticking the other sockets and eventually exit once all ticked.
      // Right after this we wait on the CURLM instance which will wake us up again when there's
      // data to be read.
      //
      // If we received data, e.g. state was set to Text, Binary ... e.t.c we bypass this and
      // report it to the user first. When the user waits for the response, they consume the data
      // and then that will reinsert it into request list for CURL to read from the socket again.
      bool received_data = (req->response.state >= DN_NETResponseState_WSText && req->response.state <= DN_NETResponseState_WSPong);
      if (receive_result == CURLE_AGAIN && !received_data)
        continue;

      if (!received_data) {
        if (receive_result == CURLE_GOT_NOTHING) {
          req->response.state = DN_NETResponseState_WSClose;
        } else if (receive_result != CURLE_OK) {
          DN_USize curl_extended_error_size = DN_CStr8Size(curl_req->error);
          req->response.state               = DN_NETResponseState_Error;
          req->response.error_str8          = DN_Str8FromFmtArena(&req->arena,
                                                                  "Websocket receive '%.*s' failed (CURL %d): %s%s%s%s",
                                                                  DN_Str8PrintFmt(req->url),
                                                                  receive_result,
                                                                  curl_easy_strerror(receive_result),
                                                                  curl_extended_error_size ? " (" : "",
                                                                  curl_extended_error_size ? curl_req->error : "",
                                                                  curl_extended_error_size ? ")" : "");
        }
      }

      // NOTE: Marking the request done unlinks it from the list being iterated, so step back to
      // the previous node first and let the loop's 'req = req->next' carry on from there.
      DN_NETRequest *request_copy = req;
      req                         = req->prev;
      DN_NET_CurlMarkRequestDone_(net, request_copy);
      if (!req)
        break;
    }

    // NOTE: While websockets are being ticked only sleep briefly, otherwise sleep until there is
    // activity on CURLM or somebody wakes us up.
    DN_I32 sleep_time_ms = ws_count > 0 ? 16 : INT32_MAX;
    curl_multi_poll(curl->thread_curlm, nullptr, 0, sleep_time_ms, nullptr);
  }

  return 0;
}
// Builds the function table that routes the generic networking API to this CURL backend.
DN_NETInterface DN_NET_CurlInterface()
{
  DN_NETInterface iface = {};

  // NOTE: Lifetime
  iface.init   = DN_NET_CurlInit;
  iface.deinit = DN_NET_CurlDeinit;

  // NOTE: Issuing requests
  iface.do_http    = DN_NET_CurlDoHTTP;
  iface.do_ws      = DN_NET_CurlDoWS;
  iface.do_ws_send = DN_NET_CurlDoWSSend;

  // NOTE: Collecting responses
  iface.wait_for_response     = DN_NET_CurlWaitForResponse;
  iface.wait_for_any_response = DN_NET_CurlWaitForAnyResponse;

  return iface;
}
// Initialises the CURL backend on top of the caller provided memory block ('base', 'base_size').
// Allocates the backend state and the event ring from the core's arena, creates the mutexes and
// the CURLM instance, then starts the background thread that runs DN_NET_CurlThreadEntryPoint_.
void DN_NET_CurlInit(DN_NETCore *net, char *base, DN_U64 base_size)
{
  DN_NET_BaseInit_(net, base, base_size);

  DN_NETCurlCore *core = DN_ArenaNew(&net->arena, DN_NETCurlCore, DN_ZMem_Yes);
  net->context         = core;

  // NOTE: Half of what remains in the arena's current block is dedicated to the event ring
  DN_USize remaining_bytes = net->arena.curr->reserve - net->arena.curr->used;
  core->ring.size          = remaining_bytes / 2;
  core->ring.base          = DN_Cast(char *) DN_ArenaAlloc(&net->arena, core->ring.size, /*align*/ 1, DN_ZMem_Yes);
  DN_Assert(core->ring.base);

  core->ring_mutex   = DN_OS_MutexInit();
  core->list_mutex   = DN_OS_MutexInit();
  core->thread_curlm = DN_Cast(CURLM *) curl_multi_init();

  // NOTE: Name the thread before starting it, the entry point applies the name on startup
  DN_FmtAppend(core->thread.name.data, &core->thread.name.size, sizeof(core->thread.name.data), "NET (CURL)");
  DN_OS_ThreadInit(&core->thread, DN_NET_CurlThreadEntryPoint_, net);
}
// Shuts down the background thread: raises the kill flag the thread's loop checks, wakes the
// thread out of its CURLM poll so that it notices the flag, then deinitialises the thread.
void DN_NET_CurlDeinit(DN_NETCore *net)
{
  DN_NETCurlCore *core = DN_Cast(DN_NETCurlCore *) net->context;

  core->kill_thread = true;
  curl_multi_wakeup(core->thread_curlm);

  DN_OS_ThreadDeinit(&core->thread);
}
// Creates a HTTP or websocket request and hands it to the CURL thread.
//
// net:    Networking core initialised with the CURL backend.
// url:    Target URL.
// method: HTTP method, only "GET" and "POST" (case-insensitive) are implemented. Not consulted for
//         websocket requests.
// args:   Optional request arguments (headers, payload, basic auth).
// type:   Whether this is a HTTP or a websocket request.
//
// Returns the handle of the queued request, or a zero handle when the request or its CURL easy
// handle could not be allocated.
static DN_NETRequestHandle DN_NET_CurlDoRequest_(DN_NETCore *net, DN_Str8 url, DN_Str8 method, DN_NETDoHTTPArgs const *args, DN_NETRequestType type)
{
  // NOTE: Allocate the request
  DN_NETCurlCore     *curl_core = DN_Cast(DN_NETCurlCore *) net->context;
  DN_NETRequest      *req       = nullptr;
  DN_NETRequestHandle result    = {};
  {
    // NOTE: The free list is modified by both the calling thread and the CURLM thread (which ticks
    // all the requests in the background for us)
    for (DN_OS_MutexScope(&curl_core->list_mutex)) {
      req = curl_core->free_list;
      DN_DoublyLLDetach(curl_core->free_list, req);
    }

    // NOTE None in the free list so allocate one
    if (!req) {
      DN_U64 arena_pos            = DN_ArenaPos(&net->arena);
      req                         = DN_ArenaNew(&net->arena, DN_NETRequest, DN_ZMem_Yes);
      DN_NETCurlRequest *curl_req = DN_ArenaNew(&net->arena, DN_NETCurlRequest, DN_ZMem_Yes);
      if (!req || !curl_req) {
        DN_ArenaPopTo(&net->arena, arena_pos);
        return result;
      }

      // NOTE: Without an easy handle the request can never be issued, so treat it like an
      // allocation failure instead of queueing a request that has a null handle.
      curl_req->handle = DN_Cast(CURL *) curl_easy_init();
      if (!curl_req->handle) {
        DN_ArenaPopTo(&net->arena, arena_pos);
        return result;
      }
      req->context[0] = DN_Cast(DN_UPtr) curl_req;
    }
  }

  // NOTE: Setup the request
  DN_NETCurlRequest *curl_req = DN_NET_CurlRequestFromRequest_(req);
  {
    result                 = DN_NET_SetupRequest_(req, url, method, args, type);
    req->response.request  = result;
    req->context[1]        = DN_Cast(DN_UPtr) net;
    curl_req->str8_builder = DN_Str8BuilderFromArena(&req->arena);
  }

  // NOTE: Setup the request for curl API
  {
    CURL *curl = curl_req->handle;
    curl_easy_setopt(curl, CURLOPT_PRIVATE, req);
    curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_req->error);

    // NOTE: Perform request and read all response headers before handing
    // control back to app.
    //
    // Options that take a 'long' are passed with an explicit 'L' suffix because the argument
    // travels through varargs where no implicit conversion takes place.
    curl_easy_setopt(curl, CURLOPT_URL, req->url.data);
    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);

    // NOTE: Setup response handler
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, DN_NET_CurlHTTPCallback_);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, req);

    // NOTE: Assign HTTP headers. A recycled request had its previous header list freed when it was
    // deinitialised, so always start from an empty list to never append to a stale pointer.
    curl_req->slist = nullptr;
    for (DN_ForItSize(it, DN_Str8, req->args.headers, req->args.headers_size)) {
      // NOTE: On failure curl returns null and leaves the existing list untouched, keep the list
      // we have so that it can still be freed later.
      struct curl_slist *appended = curl_slist_append(curl_req->slist, it.data->data);
      DN_Assert(appended);
      if (appended)
        curl_req->slist = appended;
    }
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, curl_req->slist);

    // NOTE: Setup handle for protocol
    switch (req->type) {
      case DN_NETRequestType_Nil: DN_InvalidCodePath; break;

      case DN_NETRequestType_WS: {
        curl_easy_setopt(curl, CURLOPT_CONNECT_ONLY, 2L);
      } break;

      case DN_NETRequestType_HTTP: {
        DN_Str8 const GET  = DN_Str8Lit("GET");
        DN_Str8 const POST = DN_Str8Lit("POST");

        if (DN_Str8EqInsensitive(req->method, GET)) {
          curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
        } else if (DN_Str8EqInsensitive(req->method, POST)) {
          curl_easy_setopt(curl, CURLOPT_POST, 1L);

          // NOTE: Always use the 64-bit size option with the type it expects. It covers payloads
          // of any size, whereas the 'long' variant cannot represent 2GB and above where 'long' is
          // 32 bits. The size must be set before CURLOPT_COPYPOSTFIELDS so that curl knows how
          // many bytes to copy.
          curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE_LARGE, DN_Cast(curl_off_t) req->args.payload.size);
          curl_easy_setopt(curl, CURLOPT_COPYPOSTFIELDS, req->args.payload.data);
        } else {
          DN_InvalidCodePathF("Unimplemented");
        }
      } break;
    }

    // NOTE: Handle basic auth
    if (req->args.flags & DN_NETDoHTTPFlags_BasicAuth) {
      if (req->args.username.size && req->args.password.size) {
        // NOTE: CURL takes C-strings so both must be null-terminated
        DN_Assert(req->args.username.data[req->args.username.size] == 0);
        DN_Assert(req->args.password.data[req->args.password.size] == 0);
        curl_easy_setopt(curl, CURLOPT_USERNAME, req->args.username.data);
        curl_easy_setopt(curl, CURLOPT_PASSWORD, req->args.password.data);
      }
    }
  }

  // NOTE: Dispatch the request to the CURL thread
  {
    // NOTE: Immediately add the request to the request list so it happens "atomically" in the
    // calling thread. If the calling thread deinitialises this layer before the CURL thread can be
    // pre-empted, we can lose track of this request.
    for (DN_OS_MutexScope(&curl_core->list_mutex))
      DN_DoublyLLAppend(curl_core->request_list, req);

    // NOTE: Enqueue request to go into CURL's ring queue. The CURL thread will sleep and wait for
    // bytes to come in for the request and then dump the response into the response list to be
    // consumed via wait for response
    DN_NETCurlRingEvent event = {};
    event.type                = DN_NETCurlRingEventType_DoRequest;
    event.request             = result;
    for (DN_OS_MutexScope(&curl_core->ring_mutex))
      DN_Ring_WriteStruct(&curl_core->ring, &event);
    curl_multi_wakeup(curl_core->thread_curlm);
  }

  return result;
}
// Queues a HTTP request with the given method and optional arguments. Returns the request's
// handle as produced by DN_NET_CurlDoRequest_.
DN_NETRequestHandle DN_NET_CurlDoHTTP(DN_NETCore *net, DN_Str8 url, DN_Str8 method, DN_NETDoHTTPArgs const *args)
{
  return DN_NET_CurlDoRequest_(net, url, method, args, DN_NETRequestType_HTTP);
}
// Queues a websocket connection to 'url' with optional request arguments. Websockets have no HTTP
// method so an empty string is passed along. Returns the request's handle.
DN_NETRequestHandle DN_NET_CurlDoWSArgs(DN_NETCore *net, DN_Str8 url, DN_NETDoHTTPArgs const *args)
{
  DN_Str8 const no_method = DN_Str8Lit("");
  return DN_NET_CurlDoRequest_(net, url, no_method, args, DN_NETRequestType_WS);
}
// Queues a websocket connection to 'url' without any extra request arguments. Returns the
// request's handle.
DN_NETRequestHandle DN_NET_CurlDoWS(DN_NETCore *net, DN_Str8 url)
{
  return DN_NET_CurlDoWSArgs(net, url, /*args*/ nullptr);
}
// Queues a websocket frame to be sent by the CURL thread on the connection identified by 'handle'.
//
// handle:  Websocket request to send on. Nothing happens if the handle does not resolve to a
//          request.
// payload: Bytes to send, they are copied into the ring buffer so the caller's memory does not
//          need to outlive this call.
// send:    Kind of frame to send (text, binary, close, ping, pong).
//
// The frame is dropped (and an assert fires) if the ring buffer cannot hold the event header
// together with the payload.
void DN_NET_CurlDoWSSend(DN_NETRequestHandle handle, DN_Str8 payload, DN_NETWSSend send)
{
  DN_NETRequest *req = DN_NET_RequestFromHandle(handle);
  if (!req)
    return;

  DN_NETCore     *net  = DN_NET_CurlNetFromRequest(req);
  DN_NETCurlCore *curl = DN_Cast(DN_NETCurlCore *) net->context;
  DN_Assert(curl);

  DN_NETCurlRingEvent event = {};
  event.type                = DN_NETCurlRingEventType_SendWS;
  event.request             = handle;
  event.ws_send_size        = payload.size;
  event.ws_send             = send;

  bool enqueued = false;
  for (DN_OS_MutexScope(&curl->ring_mutex)) {
    // NOTE: Both the event header and the payload go into the ring, so both have to fit. Writing
    // without enough space would corrupt the events that are still queued, hence the write is
    // skipped entirely when it does not fit.
    enqueued = DN_Ring_HasSpace(&curl->ring, sizeof(event) + payload.size);
    DN_Assert(enqueued);
    if (enqueued) {
      DN_Ring_WriteStruct(&curl->ring, &event);
      DN_Ring_Write(&curl->ring, payload.data, payload.size);
    }
  }

  // NOTE: Wake the CURL thread out of its poll so that it picks up the event
  if (enqueued)
    curl_multi_wakeup(curl->thread_curlm);
}
// Turns a request sitting in the response list into a response for the caller and hands the
// request back to the CURL thread.
//
// The body and error string are copied into 'arena' so they outlive the request. A websocket that
// is neither closed nor errored goes back onto the request list and the CURL thread is told to
// resume receiving on it; every other request goes onto the deinit list and the CURL thread is
// told to release it.
static DN_NETResponse DN_NET_CurlHandleFinishedRequest_(DN_NETCurlCore *curl, DN_NETRequest *req, DN_Arena *arena)
{
  DN_NETCurlRequest *curl_req = DN_NET_CurlRequestFromRequest_(req);

  // NOTE: Generate the response, copy out the strings into the user given memory
  DN_NETResponse result = req->response;
  result.body           = DN_Str8BuilderBuild(&curl_req->str8_builder, arena);
  if (result.error_str8.size)
    result.error_str8 = DN_Str8FromStr8Arena(arena, result.error_str8);

  // NOTE: Start the next response (if any) from an empty builder
  curl_req->str8_builder = DN_Str8BuilderFromArena(&req->arena);

  DN_NET_EndFinishedRequest_(req);

  // NOTE: Only a websocket that is still healthy and open keeps going
  bool const is_websocket        = req->type == DN_NETRequestType_WS;
  bool const is_terminated       = req->response.state == DN_NETResponseState_Error ||
                                   req->response.state == DN_NETResponseState_WSClose;
  bool const continue_ws_request = is_websocket && !is_terminated;

  // NOTE: Put the request into the requisite list
  for (DN_OS_MutexScope(&curl->list_mutex)) {
    // NOTE: Dequeue the request, it _must_ have been in the response list at this point for it to
    // have been waitable in the first place.
    DN_AssertF(DN_NET_CurlRequestIsInList(curl->response_list, req),
               "A completed response should only signal the completion semaphore when it's in the response list");
    DN_DoublyLLDetach(curl->response_list, req);

    // NOTE: A continuing websocket returns to the request list because there is more data to be
    // received. Everything else is parked in the deinit list so that it stays tracked in the time
    // it takes for the CURL thread to be scheduled, detach the CURL handle from CURLM and release
    // the request's resources.
    if (continue_ws_request)
      DN_DoublyLLAppend(curl->request_list, req);
    else
      DN_DoublyLLAppend(curl->deinit_list, req);
  }

  // NOTE: Submit the post-request event to the CURL thread. Deinit _has_ to be done by the CURL
  // thread because the CURL handle needs to be removed from the CURLM instance which that thread
  // uses (e.g. CURLM is not thread safe).
  DN_NETCurlRingEvent event = {};
  event.request             = DN_NET_HandleFromRequest(req);
  event.type                = continue_ws_request ? DN_NETCurlRingEventType_ReceivedWSReceipt
                                                  : DN_NETCurlRingEventType_DeinitRequest;
  for (DN_OS_MutexScope(&curl->ring_mutex))
    DN_Ring_WriteStruct(&curl->ring, &event);
  curl_multi_wakeup(curl->thread_curlm);

  return result;
}
// Waits up to 'timeout_ms' for the request identified by 'handle' to produce a response.
//
// Returns the response with its strings copied into 'arena', or a zero-initialised response when
// the handle does not resolve to a request or the wait did not succeed.
DN_NETResponse DN_NET_CurlWaitForResponse(DN_NETRequestHandle handle, DN_Arena *arena, DN_U32 timeout_ms)
{
  DN_NETResponse const no_response = {};

  DN_NETRequest *req = DN_NET_RequestFromHandle(handle);
  if (!req)
    return no_response;

  DN_NETCore     *net  = DN_NET_CurlNetFromRequest(req);
  DN_NETCurlCore *curl = DN_Cast(DN_NETCurlCore *) net->context;
  DN_Assert(curl);

  if (DN_OS_SemaphoreWait(&req->completion_sem, timeout_ms) != DN_OSSemaphoreWaitResult_Success)
    return no_response;

  // NOTE: The request was consumed individually, so also take its count off the global 'request
  // done' completion semaphore. It has been signalled already hence no timeout.
  DN_OSSemaphoreWaitResult net_wait_result = DN_OS_SemaphoreWait(&net->completion_sem, 0 /*timeout_ms*/);
  DN_AssertF(net_wait_result == DN_OSSemaphoreWaitResult_Success, "Wait result was: %zu", DN_Cast(DN_USize) net_wait_result);

  // NOTE: Finish handling the response
  return DN_NET_CurlHandleFinishedRequest_(curl, req, arena);
}
// Waits up to 'timeout_ms' for any request of 'net' to produce a response and returns the one at
// the head of the response list, with its strings copied into 'arena'. Returns a zero-initialised
// response when the wait did not succeed.
DN_NETResponse DN_NET_CurlWaitForAnyResponse(DN_NETCore *net, DN_Arena *arena, DN_U32 timeout_ms)
{
  DN_NETCurlCore *curl = DN_Cast(DN_NETCurlCore *) net->context;
  DN_Assert(curl);

  DN_NETResponse const no_response = {};
  if (DN_OS_SemaphoreWait(&net->completion_sem, timeout_ms) != DN_OSSemaphoreWaitResult_Success)
    return no_response;

  // NOTE: Only peek at the head of the response list here, finishing the request further down
  // takes care of dequeuing it.
  DN_NETRequestHandle head_handle = {};
  for (DN_OS_MutexScope(&curl->list_mutex)) {
    DN_Assert(curl->response_list);
    head_handle = DN_NET_HandleFromRequest(curl->response_list);
  }

  // NOTE: The global semaphore was consumed, so also take the count off the request's own
  // completion semaphore.
  DN_NETRequest           *req      = DN_NET_RequestFromHandle(head_handle);
  DN_OSSemaphoreWaitResult net_wait = DN_OS_SemaphoreWait(&req->completion_sem, 0 /*timeout_ms*/);
  DN_AssertF(net_wait == DN_OSSemaphoreWaitResult_Success, "Wait result was: %zu", DN_Cast(DN_USize) net_wait);

  // NOTE: Finish handling the response
  return DN_NET_CurlHandleFinishedRequest_(curl, req, arena);
}