This commit is contained in:
alessandrobason 2022-09-09 11:19:32 +01:00
parent b2c9f9bd63
commit e3cc2fcf5e
29 changed files with 876 additions and 117 deletions

322
colla/coropool.c Normal file
View file

@ -0,0 +1,322 @@
#include "coropool.h"
#if 0
#include <stdlib.h>
#include <tracelog.h>
#include "jobpool.h"
enum {
NUM_JOBS = 50
};
struct _pool_internal_t;
typedef union job_t {
struct {
mco_coro *co;
struct _pool_internal_t *pool;
};
struct {
union job_t *next_free;
void *__padding;
};
} job_t;
static inline bool _isJobValid(job_t *job) {
return job->pool != NULL;
}
typedef struct jobchunk_t {
job_t jobs[NUM_JOBS];
job_t *first_free;
struct jobchunk_t *next;
} jobchunk_t;
void _chunkInit(jobchunk_t *chunk) {
if (!chunk) return;
chunk->first_free = chunk->jobs;
for (int i = 0; i < (NUM_JOBS - 1); ++i) {
chunk->jobs[i].next_free = chunk->jobs + i + 1;
}
}
jobchunk_t *_chunkNew(jobchunk_t *prev) {
jobchunk_t *chunk = calloc(1, sizeof(jobchunk_t));
_chunkInit(chunk);
prev->next = chunk;
return chunk;
}
job_t *_chunkGetFirstFree(jobchunk_t *chunk) {
job_t *first_free = chunk->first_free;
if (first_free) {
chunk->first_free = first_free->next_free;
}
return first_free;
}
void _chunkSetFree(jobchunk_t *chunk, job_t *job) {
job->pool = NULL;
job->next_free = chunk->first_free;
chunk->first_free = job;
}
typedef struct _pool_internal_t {
jobpool_t pool;
jobchunk_t head_chunk;
cmutex_t chunk_mtx;
} _pool_internal_t;
static int _worker(void *arg);
coropool_t copoInit(uint32 num) {
_pool_internal_t *pool = calloc(1, sizeof(_pool_internal_t));
pool->pool = poolInit(num);
_chunkInit(&pool->head_chunk);
pool->chunk_mtx = mtxInit();
return pool;
}
void copoFree(coropool_t copo) {
_pool_internal_t *pool = copo;
poolWait(pool->pool);
poolFree(pool->pool);
jobchunk_t *chunk = &pool->head_chunk;
while (chunk) {
jobchunk_t *next = chunk->next;
free(chunk);
chunk = next;
}
mtxFree(pool->chunk_mtx);
free(pool);
}
bool copoAdd(coropool_t copo, copo_func_f fn) {
mco_desc desc = mco_desc_init(fn, 0);
return copoAddDesc(copo, &desc);
}
bool copoAddDesc(coropool_t copo, mco_desc *desc) {
mco_coro *co;
mco_create(&co, desc);
return copoAddCo(copo, co);
}
static bool _copoAddJob(job_t *job) {
//return poolAdd(job->pool->pool, _worker, job);
return true;
}
static bool _copoRemoveJob(job_t *job) {
_pool_internal_t *pool = job->pool;
// find chunk
jobchunk_t *chunk = &pool->head_chunk;
while (chunk) {
if (chunk->jobs < job && (chunk->jobs + NUM_JOBS) > job) {
break;
}
chunk = chunk->next;
}
if (!chunk) return false;
mtxLock(pool->chunk_mtx);
_chunkSetFree(chunk, job);
mtxUnlock(pool->chunk_mtx);
}
bool copoAddCo(coropool_t copo, mco_coro *co) {
_pool_internal_t *pool = copo;
job_t *new_job = NULL;
jobchunk_t *chunk = &pool->head_chunk;
mtxLock(pool->chunk_mtx);
while (!new_job) {
new_job = _chunkGetFirstFree(chunk);
if (!new_job) {
if (!chunk->next) {
info("adding new chunk");
chunk->next = _chunkNew(chunk);
}
chunk = chunk->next;
}
}
mtxUnlock(pool->chunk_mtx);
new_job->co = co;
new_job->pool = pool;
//return poolAdd(pool->pool, _worker, new_job);
return _copoAddJob(new_job);
}
void copoWait(coropool_t copo) {
_pool_internal_t *pool = copo;
poolWait(pool->pool);
}
static int _worker(void *arg) {
job_t *job = arg;
mco_result res = mco_resume(job->co);
if (res != MCO_DEAD) {
_copoAddJob(job);
}
else {
_copoRemoveJob(job);
}
return 0;
}
#endif
#include <vec.h>
typedef struct {
vec(mco_coro *) jobs;
uint32 head;
cmutex_t work_mutex;
condvar_t work_cond;
condvar_t working_cond;
int32 working_count;
int32 thread_count;
bool stop;
} _copo_internal_t;
static mco_coro *_getJob(_copo_internal_t *copo);
static int _copoWorker(void *arg);
coropool_t copoInit(uint32 num) {
if (!num) num = 2;
_copo_internal_t *copo = malloc(sizeof(_copo_internal_t));
*copo = (_copo_internal_t){
.work_mutex = mtxInit(),
.work_cond = condInit(),
.working_cond = condInit(),
.thread_count = (int32)num
};
for (usize i = 0; i < num; ++i) {
thrDetach(thrCreate(_copoWorker, copo));
}
return copo;
}
void copoFree(coropool_t copo_in) {
_copo_internal_t *copo = copo_in;
if (!copo) return;
mtxLock(copo->work_mutex);
copo->stop = true;
condWakeAll(copo->work_cond);
mtxUnlock(copo->work_mutex);
copoWait(copo);
vecFree(copo->jobs);
mtxFree(copo->work_mutex);
condFree(copo->work_cond);
condFree(copo->working_cond);
free(copo);
}
bool copoAdd(coropool_t copo, copo_func_f fn) {
mco_desc desc = mco_desc_init(fn, 0);
return copoAddDesc(copo, &desc);
}
bool copoAddDesc(coropool_t copo, mco_desc *desc) {
mco_coro *co;
mco_create(&co, desc);
return copoAddCo(copo, co);
}
bool copoAddCo(coropool_t copo_in, mco_coro *co) {
_copo_internal_t *copo = copo_in;
if (!copo) return false;
mtxLock(copo->work_mutex);
if (copo->head > vecLen(copo->jobs)) {
vecClear(copo->jobs);
copo->head = 0;
}
vecAppend(copo->jobs, co);
condWake(copo->work_cond);
mtxUnlock(copo->work_mutex);
return true;
}
void copoWait(coropool_t copo_in) {
_copo_internal_t *copo = copo_in;
if (!copo) return;
mtxLock(copo->work_mutex);
// while its either
// - working and there's still some threads doing some work
// - not working and there's still some threads exiting
while ((!copo->stop && copo->working_count > 0) ||
(copo->stop && copo->thread_count > 0)
) {
condWait(copo->working_cond, copo->work_mutex);
}
mtxUnlock(copo->work_mutex);
}
// == PRIVATE FUNCTIONS ===================================
static mco_coro *_getJob(_copo_internal_t *copo) {
if (copo->head >= vecLen(copo->jobs)) {
copo->head = 0;
}
return copo->jobs[copo->head++];
}
static int _copoWorker(void *arg) {
_copo_internal_t *copo = arg;
while (true) {
mtxLock(copo->work_mutex);
// wait for a new job
while (copo->head >= vecLen(copo->jobs) && !copo->stop) {
condWait(copo->work_cond, copo->work_mutex);
}
if (copo->stop) {
break;
}
mco_coro *job = _getJob(copo);
copo->working_count++;
mtxUnlock(copo->work_mutex);
if (job && mco_status(job) != MCO_DEAD) {
mco_resume(job);
if (mco_status(job) != MCO_DEAD) {
copoAddCo(copo, job);
}
}
mtxLock(copo->work_mutex);
copo->working_count--;
if (!copo->stop &&
copo->working_count == 0 &&
copo->head == vecLen(copo->jobs)
) {
condWake(copo->working_cond);
}
mtxUnlock(copo->work_mutex);
}
copo->thread_count--;
condWake(copo->working_cond);
mtxUnlock(copo->work_mutex);
return 0;
}

17
colla/coropool.h Normal file
View file

@ -0,0 +1,17 @@
#pragma once
#include <collatypes.h>
#include <cthreads.h>
#include "minicoro.h"
typedef void *coropool_t;
typedef void (*copo_func_f)(mco_coro *co);
coropool_t copoInit(uint32 num);
void copoFree(coropool_t copo);
bool copoAdd(coropool_t copo, copo_func_f fn);
bool copoAddDesc(coropool_t copo, mco_desc *desc);
bool copoAddCo(coropool_t copo, mco_coro *co);
void copoWait(coropool_t copo);

View file

@ -75,8 +75,9 @@ cmutex_t mtxInit(void) {
return (cmutex_t)crit_sec;
}
void mtxDestroy(cmutex_t ctx) {
void mtxFree(cmutex_t ctx) {
DeleteCriticalSection((CRITICAL_SECTION *)ctx);
free((CRITICAL_SECTION *)ctx);
}
bool mtxValid(cmutex_t ctx) {
@ -97,6 +98,35 @@ bool mtxUnlock(cmutex_t ctx) {
return true;
}
// == CONDITION VARIABLE ===============================
#include <tracelog.h>
condvar_t condInit(void) {
CONDITION_VARIABLE *cond = malloc(sizeof(CONDITION_VARIABLE));
InitializeConditionVariable(cond);
return (condvar_t)cond;
}
void condFree(condvar_t cond) {
free((CONDITION_VARIABLE *)cond);
}
void condWake(condvar_t cond) {
WakeConditionVariable((CONDITION_VARIABLE *)cond);
}
void condWakeAll(condvar_t cond) {
WakeAllConditionVariable((CONDITION_VARIABLE *)cond);
}
void condWait(condvar_t cond, cmutex_t mtx) {
BOOL res = SleepConditionVariableCS((CONDITION_VARIABLE *)cond, (CRITICAL_SECTION *)mtx, INFINITE);
}
void condWaitTimed(condvar_t cond, cmutex_t mtx, int milliseconds) {
SleepConditionVariableCS((CONDITION_VARIABLE *)cond, (CRITICAL_SECTION *)mtx, milliseconds);
}
#else
#include <pthread.h>
@ -168,14 +198,16 @@ cmutex_t mtxInit(void) {
pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t));
if(mutex) {
int res = pthread_mutex_init(mutex, NULL);
if(res != 0) mutex = NULL;
if(pthread_mutex_init(mutex, NULL)) {
free(mutex);
mutex = NULL;
}
}
return (cmutex_t)mutex;
}
void mtxDestroy(cmutex_t ctx) {
void mtxFree(cmutex_t ctx) {
pthread_mutex_destroy((pthread_mutex_t *)ctx);
}
@ -195,4 +227,44 @@ bool mtxUnlock(cmutex_t ctx) {
return pthread_mutex_unlock((pthread_mutex_t *)ctx) == 0;
}
// == CONDITION VARIABLE ===============================
condvar_t condInit(void) {
pthread_cond_t *cond = malloc(sizeof(pthread_cond_t));
if(cond) {
if(pthread_cond_init(cond, NULL)) {
free(cond);
cond = NULL;
}
}
return (condvar_t)cond;
}
void condFree(condvar_t cond) {
if (!cond) return;
pthread_cond_destroy((pthread_cond_t *)cond);
free((pthread_cond_t *)cond);
}
void condWake(condvar_t cond) {
pthread_cond_signal((pthread_cond_t *)cond);
}
void condWakeAll(condvar_t cond) {
pthread_cond_broadcast((pthread_cond_t *)cond);
}
void condWait(condvar_t cond, cmutex_t mtx) {
pthread_cond_wait((pthread_cond_t *)cond, (pthread_mutex_t *)mtx);
}
void condWaitTimed(condvar_t cond, cmutex_t mtx, int milliseconds) {
struct timespec timeout;
time(&timeout.tv_sec);
timeout.tv_nsec += milliseconds * 1000000;
pthread_cond_timedwait((pthread_cond_t *)cond, (pthread_mutex_t *)mtx, &timeout);
}
#endif

View file

@ -29,7 +29,7 @@ bool thrJoin(cthread_t ctx, int *code);
typedef uintptr_t cmutex_t;
cmutex_t mtxInit(void);
void mtxDestroy(cmutex_t ctx);
void mtxFree(cmutex_t ctx);
bool mtxValid(cmutex_t ctx);
@ -62,6 +62,21 @@ struct lock_t {
};
#endif
// == CONDITION VARIABLE ===============================
typedef uintptr_t condvar_t;
#define COND_WAIT_INFINITE 0xFFFFFFFF
condvar_t condInit(void);
void condFree(condvar_t cond);
void condWake(condvar_t cond);
void condWakeAll(condvar_t cond);
void condWait(condvar_t cond, cmutex_t mtx);
void condWaitTimed(condvar_t cond, cmutex_t mtx, int milliseconds);
#ifdef __cplusplus
} // extern "C"
#endif

View file

@ -24,10 +24,7 @@ static DWORD _toWin32Creation(filemode_t mode) {
}
bool fileExists(const char *fname) {
DWORD dwAttrib = GetFileAttributesA(fname);
return (dwAttrib != INVALID_FILE_ATTRIBUTES &&
!(dwAttrib & FILE_ATTRIBUTE_DIRECTORY));
return GetFileAttributesA(fname) != INVALID_FILE_ATTRIBUTES;
}
file_t fileOpen(const char *fname, filemode_t mode) {
@ -95,6 +92,12 @@ uint64 fileTell(file_t ctx) {
return result == TRUE ? (uint64)tell.QuadPart : 0;
}
uint64 fileGetTime(file_t ctx) {
uint64 fp_time = 0;
GetFileTime((HANDLE)ctx, NULL, NULL, (FILETIME *)&fp_time);
return fp_time;
}
#else
#include <stdio.h>
#include <stdlib.h>
@ -164,6 +167,10 @@ void fileRewind(file_t ctx) {
uint64 fileTell(file_t ctx) {
return (uint64)ftell((FILE*)ctx);
}
uint64 fileGetTime(file_t ctx) {
}
#endif
static str_t _readWholeInternalStr(file_t ctx) {
@ -292,3 +299,13 @@ bool fileWriteWholeText(const char *fname, strview_t string) {
bool fileWriteWholeTextFP(file_t ctx, strview_t string) {
return fileWriteWholeFP(ctx, (filebuf_t){ (uint8 *)string.buf, string.len });
}
uint64 fileGetTimePath(const char *path) {
file_t fp = fileOpen(path, FILE_READ);
if (!fileIsValid(fp)) {
return 0;
}
uint64 fp_time = fileGetTime(fp);
fileClose(fp);
return fp_time;
}

View file

@ -54,6 +54,9 @@ bool fileWriteWholeFP(file_t ctx, filebuf_t data);
bool fileWriteWholeText(const char *fname, strview_t string);
bool fileWriteWholeTextFP(file_t ctx, strview_t string);
uint64 fileGetTime(file_t ctx);
uint64 fileGetTimePath(const char *path);
#ifdef __cplusplus
} // extern "C"
#endif

View file

@ -4,7 +4,7 @@
#include <stdio.h>
#include <stdlib.h>
#include "os.h"
// #include "os.h"
#include "tracelog.h"
#include "vec.h"
@ -48,6 +48,62 @@ static void _setField(vec(http_field_t) *fields_vec, const char *key, const char
vecAppend(*fields_vec, field);
}
static void _parseFields(vec(http_field_t) *fields, str_istream_t *in) {
strview_t line;
do {
line = istrGetview(in, '\r');
usize pos = strvFind(line, ':', 0);
if(pos != STRV_NOT_FOUND) {
strview_t key = strvSub(line, 0, pos);
strview_t value = strvSub(line, pos + 2, SIZE_MAX);
char *key_str = NULL;
char *value_str = NULL;
key_str = strvCopy(key).buf;
value_str = strvCopy(value).buf;
_setField(fields, key_str, value_str);
free(key_str);
free(value_str);
}
istrSkip(in, 2); // skip \r\n
} while(line.len > 2);
}
// == HTTP STATUS =============================================================
const char *httpGetStatusString(resstatus_t status) {
switch (status) {
case STATUS_OK: return "OK";
case STATUS_CREATED: return "CREATED";
case STATUS_ACCEPTED: return "ACCEPTED";
case STATUS_NO_CONTENT: return "NO CONTENT";
case STATUS_RESET_CONTENT: return "RESET CONTENT";
case STATUS_PARTIAL_CONTENT: return "PARTIAL CONTENT";
case STATUS_MULTIPLE_CHOICES: return "MULTIPLE CHOICES";
case STATUS_MOVED_PERMANENTLY: return "MOVED PERMANENTLY";
case STATUS_MOVED_TEMPORARILY: return "MOVED TEMPORARILY";
case STATUS_NOT_MODIFIED: return "NOT MODIFIED";
case STATUS_BAD_REQUEST: return "BAD REQUEST";
case STATUS_UNAUTHORIZED: return "UNAUTHORIZED";
case STATUS_FORBIDDEN: return "FORBIDDEN";
case STATUS_NOT_FOUND: return "NOT FOUND";
case STATUS_RANGE_NOT_SATISFIABLE: return "RANGE NOT SATISFIABLE";
case STATUS_INTERNAL_SERVER_ERROR: return "INTERNAL SERVER_ERROR";
case STATUS_NOT_IMPLEMENTED: return "NOT IMPLEMENTED";
case STATUS_BAD_GATEWAY: return "BAD GATEWAY";
case STATUS_SERVICE_NOT_AVAILABLE: return "SERVICE NOT AVAILABLE";
case STATUS_GATEWAY_TIMEOUT: return "GATEWAY TIMEOUT";
case STATUS_VERSION_NOT_SUPPORTED: return "VERSION NOT SUPPORTED";
}
return "UNKNOWN";
}
// == HTTP VERSION ============================================================
int httpVerNumber(http_version_t ver) {
@ -57,22 +113,65 @@ int httpVerNumber(http_version_t ver) {
// == HTTP REQUEST ============================================================
http_request_t reqInit() {
http_request_t req;
memset(&req, 0, sizeof(req));
reqSetUri(&req, strvInit("/"));
http_request_t req = {0};
reqSetUri(&req, strvInit(""));
req.version = (http_version_t){1, 1};
return req;
}
void reqFree(http_request_t *ctx) {
for (uint32 i = 0; i < vecLen(ctx->fields); ++i) {
free(ctx->fields[i].key);
free(ctx->fields[i].value);
http_request_t reqParse(const char *request) {
http_request_t req = {0};
str_istream_t in = istrInit(request);
// get data
strview_t method = strvTrim(istrGetview(&in, '/'));
istrSkip(&in, 1); // skip /
strview_t page = strvTrim(istrGetview(&in, ' '));
strview_t http = strvTrim(istrGetview(&in, '\n'));
istrSkip(&in, 1); // skip \n
_parseFields(&req.fields, &in);
strview_t body = strvTrim(istrGetviewLen(&in, 0, SIZE_MAX));
// parse data
// -- method
const char *methods[] = { "GET", "POST", "HEAD", "PUT", "DELETE" };
const int methods_count = sizeof(methods) / sizeof(*methods);
for (int i = 0; i < methods_count; ++i) {
if (strvCompare(method, strvInit(methods[i])) == 0) {
req.method = (reqtype_t)i;
}
vecFree(ctx->fields);
free(ctx->uri);
free(ctx->body);
memset(ctx, 0, sizeof(http_request_t));
}
// -- page
req.uri = strvCopy(page).buf;
// -- http
in = istrInitLen(http.buf, http.len);
istrIgnoreAndSkip(&in, '/'); // skip HTTP/
istrGetu8(&in, &req.version.major);
istrSkip(&in, 1); // skip .
istrGetu8(&in, &req.version.minor);
// -- body
req.body = strvCopy(body).buf;
return req;
}
void reqFree(http_request_t ctx) {
for (http_field_t *it = ctx.fields; it != vecEnd(ctx.fields); ++it) {
free(it->key);
free(it->value);
}
vecFree(ctx.fields);
free(ctx.uri);
free(ctx.body);
}
bool reqHasField(http_request_t *ctx, const char *key) {
@ -91,16 +190,11 @@ void reqSetField(http_request_t *ctx, const char *key, const char *value) {
void reqSetUri(http_request_t *ctx, strview_t uri) {
if (strvIsEmpty(uri)) return;
free(ctx->uri);
if (uri.buf[0] != '/') {
ctx->uri = (char *)realloc(ctx->uri, uri.len + 1);
ctx->uri[0] = '/';
memcpy(ctx->uri + 1, uri.buf, uri.len);
ctx->uri[uri.len] = '\0';
if (uri.buf[0] == '/') {
strvRemovePrefix(uri, 1);
}
else {
ctx->uri = strvCopy(uri).buf;
}
}
str_ostream_t reqPrepare(http_request_t *ctx) {
str_ostream_t out = ostrInitLen(1024);
@ -115,7 +209,7 @@ str_ostream_t reqPrepare(http_request_t *ctx) {
default: err("unrecognized method: %d", method); goto error;
}
ostrPrintf(&out, "%s %s HTTP/%hhu.%hhu\r\n",
ostrPrintf(&out, "%s /%s HTTP/%hhu.%hhu\r\n",
method, ctx->uri, ctx->version.major, ctx->version.minor
);
@ -139,18 +233,49 @@ str_t reqString(http_request_t *ctx) {
// == HTTP RESPONSE ===========================================================
http_response_t resInit() {
return (http_response_t) {0};
http_response_t resParse(const char *data) {
http_response_t ctx = {0};
str_istream_t in = istrInit(data);
char hp[5];
istrGetstringBuf(&in, hp, 5);
if(stricmp(hp, "http") != 0) {
err("response doesn't start with 'HTTP', instead with %c%c%c%c", hp[0], hp[1], hp[2], hp[3]);
return ctx;
}
istrSkip(&in, 1); // skip /
istrGetu8(&in, &ctx.version.major);
istrSkip(&in, 1); // skip .
istrGetu8(&in, &ctx.version.minor);
istrGeti32(&in, (int32*)&ctx.status_code);
istrIgnore(&in, '\n');
istrSkip(&in, 1); // skip \n
resParseFields(&ctx, &in);
const char *tran_encoding = resGetField(&ctx, "transfer-encoding");
if(tran_encoding == NULL || stricmp(tran_encoding, "chunked") != 0) {
strview_t body = istrGetviewLen(&in, 0, SIZE_MAX);
vecClear(ctx.body);
vecReserve(ctx.body, body.len);
memcpy(ctx.body, body.buf, body.len);
}
else {
// fatal("chunked encoding not implemented yet");
err("chunked encoding not implemented yet");
}
void resFree(http_response_t *ctx) {
for(uint32 i = 0; i < vecLen(ctx->fields); ++i) {
free(ctx->fields[i].key);
free(ctx->fields[i].value);
return ctx;
}
vecFree(ctx->fields);
vecFree(ctx->body);
memset(ctx, 0, sizeof(http_response_t));
void resFree(http_response_t ctx) {
for (http_field_t *it = ctx.fields; it != vecEnd(ctx.fields); ++it) {
free(it->key);
free(it->value);
}
vecFree(ctx.fields);
vecFree(ctx.body);
}
bool resHasField(http_response_t *ctx, const char *key) {
@ -162,6 +287,10 @@ bool resHasField(http_response_t *ctx, const char *key) {
return false;
}
void resSetField(http_response_t *ctx, const char *key, const char *value) {
_setField(&ctx->fields, key, value);
}
const char *resGetField(http_response_t *ctx, const char *field) {
for(uint32 i = 0; i < vecLen(ctx->fields); ++i) {
if(stricmp(ctx->fields[i].key, field) == 0) {
@ -171,63 +300,30 @@ const char *resGetField(http_response_t *ctx, const char *field) {
return NULL;
}
void resParse(http_response_t *ctx, const char *data) {
str_istream_t in = istrInit(data);
char hp[5];
istrGetstringBuf(&in, hp, 5);
if(stricmp(hp, "http") != 0) {
err("response doesn't start with 'HTTP', instead with %c%c%c%c", hp[0], hp[1], hp[2], hp[3]);
return;
}
istrSkip(&in, 1); // skip /
istrGetu8(&in, &ctx->version.major);
istrSkip(&in, 1); // skip .
istrGetu8(&in, &ctx->version.minor);
istrGeti32(&in, (int32*)&ctx->status_code);
istrIgnore(&in, '\n');
istrSkip(&in, 1); // skip \n
resParseFields(ctx, &in);
const char *tran_encoding = resGetField(ctx, "transfer-encoding");
if(tran_encoding == NULL || stricmp(tran_encoding, "chunked") != 0) {
strview_t body = istrGetviewLen(&in, 0, SIZE_MAX);
vecClear(ctx->body);
vecReserve(ctx->body, body.len);
memcpy(ctx->body, body.buf, body.len);
}
else {
fatal("chunked encoding not implemented yet");
}
}
void resParseFields(http_response_t *ctx, str_istream_t *in) {
strview_t line;
do {
line = istrGetview(in, '\r');
usize pos = strvFind(line, ':', 0);
if(pos != STRV_NOT_FOUND) {
strview_t key = strvSub(line, 0, pos);
strview_t value = strvSub(line, pos + 2, SIZE_MAX);
char *key_str = NULL;
char *value_str = NULL;
key_str = strvCopy(key).buf;
value_str = strvCopy(value).buf;
_setField(&ctx->fields, key_str, value_str);
free(key_str);
free(value_str);
_parseFields(&ctx->fields, in);
}
istrSkip(in, 2); // skip \r\n
} while(line.len > 2);
str_ostream_t resPrepare(http_response_t *ctx) {
str_ostream_t out = ostrInitLen(1024);
ostrPrintf(
&out, "HTTP/%hhu.%hhu %d %s\r\n",
ctx->version.major, ctx->version.minor,
ctx->status_code, httpGetStatusString(ctx->status_code)
);
for (http_field_t *field = ctx->fields; field != vecEnd(ctx->fields); ++field) {
ostrPrintf(&out, "%s: %s\r\n", field->key, field->value);
}
ostrPuts(&out, "\r\n");
ostrAppendview(&out, strvInitLen(ctx->body, vecLen(ctx->body)));
return out;
}
str_t resString(http_response_t *ctx) {
str_ostream_t out = resPrepare(ctx);
return ostrAsStr(out);
}
// == HTTP CLIENT =============================================================
@ -238,9 +334,8 @@ http_client_t hcliInit() {
};
}
void hcliFree(http_client_t *ctx) {
strFree(ctx->host_name);
memset(ctx, 0, sizeof(http_client_t));
void hcliFree(http_client_t ctx) {
strFree(ctx.host_name);
}
void hcliSetHost(http_client_t *ctx, strview_t hostname) {
@ -283,7 +378,7 @@ http_response_t hcliSendRequest(http_client_t *ctx, http_request_t *req) {
reqSetField(req, "Connection", "close");
}
http_response_t res = resInit();
http_response_t res = {0};
str_t req_str = strInit();
str_ostream_t received = ostrInitLen(1024);
@ -327,7 +422,7 @@ http_response_t hcliSendRequest(http_client_t *ctx, http_request_t *req) {
received.len--;
}
resParse(&res, received.buf);
res = resParse(received.buf);
}
else {
err("Couldn't connect to host %s -> %s", ctx->host_name, skGetErrorString());
@ -357,8 +452,8 @@ http_response_t httpGet(strview_t hostname, strview_t uri) {
http_response_t res = hcliSendRequest(&client, &request);
reqFree(&request);
hcliFree(&client);
reqFree(request);
hcliFree(client);
return res;
}

View file

@ -51,6 +51,8 @@ typedef enum {
STATUS_VERSION_NOT_SUPPORTED = 505,
} resstatus_t;
const char *httpGetStatusString(resstatus_t status);
typedef struct {
uint8 major;
uint8 minor;
@ -77,7 +79,8 @@ typedef struct {
} http_request_t;
http_request_t reqInit(void);
void reqFree(http_request_t *ctx);
http_request_t reqParse(const char *request);
void reqFree(http_request_t ctx);
bool reqHasField(http_request_t *ctx, const char *key);
@ -96,14 +99,17 @@ typedef struct {
vec(uint8) body;
} http_response_t;
http_response_t resInit(void);
void resFree(http_response_t *ctx);
http_response_t resParse(const char *data);
void resFree(http_response_t ctx);
bool resHasField(http_response_t *ctx, const char *key);
void resSetField(http_response_t *ctx, const char *key, const char *value);
const char *resGetField(http_response_t *ctx, const char *field);
void resParse(http_response_t *ctx, const char *data);
// void resParse(http_response_t *ctx, const char *data);
void resParseFields(http_response_t *ctx, str_istream_t *in);
str_ostream_t resPrepare(http_response_t *ctx);
str_t resString(http_response_t *ctx);
// == HTTP CLIENT =============================================================
@ -114,7 +120,7 @@ typedef struct {
} http_client_t;
http_client_t hcliInit(void);
void hcliFree(http_client_t *ctx);
void hcliFree(http_client_t ctx);
void hcliSetHost(http_client_t *ctx, strview_t hostname);
http_response_t hcliSendRequest(http_client_t *ctx, http_request_t *request);

View file

@ -6,8 +6,11 @@
// == INI READER ========================================================================
static const iniopts_t default_opts = {0};
static const iniopts_t default_opts = {
.key_value_divider = '='
};
static iniopts_t setDefaultOptions(const iniopts_t *options);
static initable_t *findTable(ini_t *ctx, strview_t name);
static inivalue_t *findValue(vec(inivalue_t) values, strview_t key);
static void addTable(ini_t *ctx, str_istream_t *in, const iniopts_t *options);
@ -32,14 +35,13 @@ void _iniParseInternal(ini_t *ini, const iniopts_t *options) {
}
istrSkipWhitespace(&in);
}
}
ini_t iniParse(const char *filename, const iniopts_t *options) {
ini_t ini = { .text = fileReadWholeText(filename) };
if (strIsEmpty(ini.text)) return ini;
if (!options) options = &default_opts;
_iniParseInternal(&ini, options);
iniopts_t opts = setDefaultOptions(options);
_iniParseInternal(&ini, &opts);
return ini;
}
@ -265,6 +267,23 @@ winitable_t *winiAddTabView(iniwriter_t *ctx, strview_t name) {
// == PRIVATE FUNCTIONS ========================================================
static iniopts_t setDefaultOptions(const iniopts_t *options) {
if (!options) return default_opts;
iniopts_t opts = default_opts;
if (options->merge_duplicate_keys)
opts.merge_duplicate_keys = options->merge_duplicate_keys;
if (options->merge_duplicate_tables)
opts.merge_duplicate_tables = options->merge_duplicate_tables;
if (options->key_value_divider)
opts.key_value_divider = options->key_value_divider;
return opts;
}
static initable_t *findTable(ini_t *ctx, strview_t name) {
if (strvIsEmpty(name)) return NULL;
for (uint32 i = 1; i < vecLen(ctx->tables); ++i) {
@ -312,7 +331,7 @@ static void addTable(ini_t *ctx, str_istream_t *in, const iniopts_t *options) {
static void addValue(initable_t *table, str_istream_t *in, const iniopts_t *options) {
if (!table) fatal("table is null");
strview_t key = strvTrim(istrGetview(in, '='));
strview_t key = strvTrim(istrGetview(in, options->key_value_divider));
istrSkip(in, 1);
strview_t value = strvTrim(istrGetview(in, '\n'));
// value might be until EOF, in that case no use in skipping

View file

@ -27,8 +27,9 @@ typedef struct {
} ini_t;
typedef struct {
bool merge_duplicate_tables;
bool merge_duplicate_keys;
bool merge_duplicate_tables; // default false
bool merge_duplicate_keys; // default false
char key_value_divider; // default =
} iniopts_t;
ini_t iniParse(const char *filename, const iniopts_t *options);
@ -63,7 +64,8 @@ typedef struct {
} iniwriter_t;
typedef struct {
bool no_discalimer;
bool no_discalimer; // default false
char key_value_divider; // default =
} winiopts_t;
iniwriter_t winiInit();

144
colla/jobpool.c Normal file
View file

@ -0,0 +1,144 @@
#include "jobpool.h"
#include <vec.h>
typedef struct {
cthread_func_t func;
void *arg;
} job_t;
typedef struct {
vec(job_t) jobs;
uint32 head;
cmutex_t work_mutex;
condvar_t work_cond;
condvar_t working_cond;
int32 working_count;
int32 thread_count;
bool stop;
} _pool_internal_t;
static job_t _getJob(_pool_internal_t *pool);
static int _poolWorker(void *arg);
jobpool_t poolInit(uint32 num) {
if (!num) num = 2;
_pool_internal_t *pool = malloc(sizeof(_pool_internal_t));
*pool = (_pool_internal_t){
.work_mutex = mtxInit(),
.work_cond = condInit(),
.working_cond = condInit(),
.thread_count = (int32)num
};
for (usize i = 0; i < num; ++i) {
thrDetach(thrCreate(_poolWorker, pool));
}
return pool;
}
void poolFree(jobpool_t pool_in) {
_pool_internal_t *pool = pool_in;
if (!pool) return;
mtxLock(pool->work_mutex);
pool->stop = true;
condWakeAll(pool->work_cond);
mtxUnlock(pool->work_mutex);
poolWait(pool);
vecFree(pool->jobs);
mtxFree(pool->work_mutex);
condFree(pool->work_cond);
condFree(pool->working_cond);
free(pool);
}
bool poolAdd(jobpool_t pool_in, cthread_func_t func, void *arg) {
_pool_internal_t *pool = pool_in;
if (!pool) return false;
mtxLock(pool->work_mutex);
if (pool->head > vecLen(pool->jobs)) {
vecClear(pool->jobs);
pool->head = 0;
}
job_t job = { func, arg };
vecAppend(pool->jobs, job);
condWake(pool->work_cond);
mtxUnlock(pool->work_mutex);
return true;
}
void poolWait(jobpool_t pool_in) {
_pool_internal_t *pool = pool_in;
if (!pool) return;
mtxLock(pool->work_mutex);
// while its either
// - working and there's still some threads doing some work
// - not working and there's still some threads exiting
while ((!pool->stop && pool->working_count > 0) ||
(pool->stop && pool->thread_count > 0)
) {
condWait(pool->working_cond, pool->work_mutex);
}
mtxUnlock(pool->work_mutex);
}
// == PRIVATE FUNCTIONS ===================================
static job_t _getJob(_pool_internal_t *pool) {
if (pool->head >= vecLen(pool->jobs)) {
pool->head = 0;
}
job_t job = pool->jobs[pool->head++];
return job;
}
static int _poolWorker(void *arg) {
_pool_internal_t *pool = arg;
while (true) {
mtxLock(pool->work_mutex);
// wait for a new job
while (pool->head >= vecLen(pool->jobs) && !pool->stop) {
condWait(pool->work_cond, pool->work_mutex);
}
if (pool->stop) {
break;
}
job_t job = _getJob(pool);
pool->working_count++;
mtxUnlock(pool->work_mutex);
if (job.func) {
job.func(job.arg);
}
mtxLock(pool->work_mutex);
pool->working_count--;
if (!pool->stop &&
pool->working_count == 0 &&
pool->head == vecLen(pool->jobs)
) {
condWake(pool->working_cond);
}
mtxUnlock(pool->work_mutex);
}
pool->thread_count--;
condWake(pool->working_cond);
mtxUnlock(pool->work_mutex);
return 0;
}

12
colla/jobpool.h Normal file
View file

@ -0,0 +1,12 @@
#pragma once
#include <collatypes.h>
#include <cthreads.h>
typedef void *jobpool_t;
jobpool_t poolInit(uint32 num);
void poolFree(jobpool_t pool);
bool poolAdd(jobpool_t pool, cthread_func_t func, void *arg);
void poolWait(jobpool_t pool);

View file

@ -21,6 +21,7 @@ static const char *_win_skGetErrorString();
#include <unistd.h>
#include <errno.h>
#include <string.h> // strerror
#include <poll.h>
#define INVALID_SOCKET (-1)
#define SOCKET_ERROR (-1)
@ -187,6 +188,14 @@ int skReceiveFromPro(socket_t sock, void *buf, int len, int flags, sk_addr_t *fr
return recvfrom(sock, buf, len, flags, from, fromlen);
}
int skPoll(skpoll_t *to_poll, int num_to_poll, int timeout) {
#if SOCK_WINDOWS
return WSAPoll(to_poll, num_to_poll, timeout);
#elif SOCK_POSIX
return poll(to_poll, num_to_poll, timeout);
#endif
}
bool skIsValid(socket_t sock) {
return sock != INVALID_SOCKET;
}

View file

@ -32,6 +32,7 @@ extern "C" {
typedef struct sockaddr sk_addr_t;
typedef struct sockaddr_in sk_addrin_t;
typedef struct pollfd skpoll_t;
typedef enum {
SOCK_TCP,
@ -54,12 +55,15 @@ socket_t skOpenEx(const char *protocol);
// Opens a socket, check socket_t with skValid
socket_t skOpenPro(int af, int type, int protocol);
// Fill out a sk_addrin_t structure with "ip" and "port"
sk_addrin_t skAddrinInit(const char *ip, uint16_t port);
// Checks that a opened socket is valid, returns true on success
bool skIsValid(socket_t sock);
// Closes a socket, returns true on success
bool skClose(socket_t sock);
// Fill out a sk_addrin_t structure with "ip" and "port"
sk_addrin_t skAddrinInit(const char *ip, uint16_t port);
// Associate a local address with a socket
bool skBind(socket_t sock, const char *ip, uint16_t port);
// Associate a local address with a socket
@ -97,8 +101,8 @@ int skReceiveFrom(socket_t sock, void *buf, int len, sk_addrin_t *from);
// Receives a datagram and stores the source address.
int skReceiveFromPro(socket_t sock, void *buf, int len, int flags, sk_addr_t *from, sk_len_t *fromlen);
// Checks that a opened socket is valid, returns true on success
bool skIsValid(socket_t sock);
// Wait for an event on some sockets
int skPoll(skpoll_t *to_poll, int num_to_poll, int timeout);
// Returns latest socket error, returns 0 if there is no error
int skGetError(void);

View file

@ -72,7 +72,15 @@ void traceLog(int level, const char *fmt, ...) {
va_end(args);
}
#include "cthreads.h"
static cmutex_t g_mtx = 0;
void traceLogVaList(int level, const char *fmt, va_list args) {
if (!g_mtx) g_mtx = mtxInit();
mtxLock(g_mtx);
char buffer[MAX_TRACELOG_MSG_LENGTH];
memset(buffer, 0, sizeof(buffer));
@ -115,6 +123,8 @@ void traceLogVaList(int level, const char *fmt, va_list args) {
#ifndef TLOG_DONT_EXIT_ON_FATAL
if (level == LogFatal) exit(1);
#endif
mtxUnlock(g_mtx);
}
void traceUseNewline(bool newline) {

View file

@ -21,6 +21,15 @@ void traceLog(int level, const char *fmt, ...);
void traceLogVaList(int level, const char *fmt, va_list args);
void traceUseNewline(bool use_newline);
#ifdef NO_LOG
#define tall(...)
#define trace(...)
#define debug(...)
#define info(...)
#define warn(...)
#define err(...)
#define fatal(...)
#else
#define tall(...) traceLog(LogAll, __VA_ARGS__)
#define trace(...) traceLog(LogTrace, __VA_ARGS__)
#define debug(...) traceLog(LogDebug, __VA_ARGS__)
@ -28,6 +37,7 @@ void traceUseNewline(bool use_newline);
#define warn(...) traceLog(LogWarning, __VA_ARGS__)
#define err(...) traceLog(LogError, __VA_ARGS__)
#define fatal(...) traceLog(LogFatal, __VA_ARGS__)
#endif
#ifdef __cplusplus
} // extern "C"

View file

@ -9,7 +9,9 @@ extern "C" {
#define vecFree(vec) ((vec) ? free(_vecheader(vec)), NULL : NULL)
#define vecCopy(src, dest) (vecFree(dest), vecAdd(dest, vecCount(src)), memcpy(dest, src, vecCount(src)))
#define vecAppend(vec, val) (_vecmaygrow(vec, 1), (vec)[_veclen(vec)] = (val), _veclen(vec)++)
#define vecAppend(vec, ...) (_vecmaygrow(vec, 1), (vec)[_veclen(vec)] = (__VA_ARGS__), _veclen(vec)++)
#define vecRemove(vec, ind) ((vec) ? (vec)[(ind)] = (vec)[--_veclen(vec)], NULL : 0)
#define vecRemoveIt(vec, it) (vecRemove((vec), (it) - (vec)))
#define vecLen(vec) ((vec) ? _veclen(vec) : 0)
#define vecCap(vec) ((vec) ? _veccap(vec) : 0)