diff --git a/src/collatypes.h b/colla/collatypes.h similarity index 100% rename from src/collatypes.h rename to colla/collatypes.h diff --git a/colla/coropool.c b/colla/coropool.c new file mode 100644 index 0000000..209962f --- /dev/null +++ b/colla/coropool.c @@ -0,0 +1,322 @@ +#include "coropool.h" + +#if 0 +#include +#include + +#include "jobpool.h" + +enum { + NUM_JOBS = 50 +}; + +struct _pool_internal_t; + +typedef union job_t { + struct { + mco_coro *co; + struct _pool_internal_t *pool; + }; + struct { + union job_t *next_free; + void *__padding; + }; +} job_t; + +static inline bool _isJobValid(job_t *job) { + return job->pool != NULL; +} + +typedef struct jobchunk_t { + job_t jobs[NUM_JOBS]; + job_t *first_free; + struct jobchunk_t *next; +} jobchunk_t; + +void _chunkInit(jobchunk_t *chunk) { + if (!chunk) return; + chunk->first_free = chunk->jobs; + for (int i = 0; i < (NUM_JOBS - 1); ++i) { + chunk->jobs[i].next_free = chunk->jobs + i + 1; + } +} + +jobchunk_t *_chunkNew(jobchunk_t *prev) { + jobchunk_t *chunk = calloc(1, sizeof(jobchunk_t)); + _chunkInit(chunk); + prev->next = chunk; + return chunk; +} + +job_t *_chunkGetFirstFree(jobchunk_t *chunk) { + job_t *first_free = chunk->first_free; + if (first_free) { + chunk->first_free = first_free->next_free; + } + return first_free; +} + +void _chunkSetFree(jobchunk_t *chunk, job_t *job) { + job->pool = NULL; + job->next_free = chunk->first_free; + chunk->first_free = job; +} + +typedef struct _pool_internal_t { + jobpool_t pool; + jobchunk_t head_chunk; + cmutex_t chunk_mtx; +} _pool_internal_t; + +static int _worker(void *arg); + +coropool_t copoInit(uint32 num) { + _pool_internal_t *pool = calloc(1, sizeof(_pool_internal_t)); + pool->pool = poolInit(num); + _chunkInit(&pool->head_chunk); + pool->chunk_mtx = mtxInit(); + return pool; +} + +void copoFree(coropool_t copo) { + _pool_internal_t *pool = copo; + + poolWait(pool->pool); + poolFree(pool->pool); + + jobchunk_t *chunk = &pool->head_chunk; + while (chunk) { + jobchunk_t *next = chunk->next; + free(chunk); + chunk = next; + } + + mtxFree(pool->chunk_mtx); + free(pool); +} + +bool copoAdd(coropool_t copo, copo_func_f fn) { + mco_desc desc = mco_desc_init(fn, 0); + return copoAddDesc(copo, &desc); +} + +bool copoAddDesc(coropool_t copo, mco_desc *desc) { + mco_coro *co; + mco_create(&co, desc); + return copoAddCo(copo, co); +} + +static bool _copoAddJob(job_t *job) { + //return poolAdd(job->pool->pool, _worker, job); + return true; +} + +static bool _copoRemoveJob(job_t *job) { + _pool_internal_t *pool = job->pool; + // find chunk + jobchunk_t *chunk = &pool->head_chunk; + while (chunk) { + if (chunk->jobs < job && (chunk->jobs + NUM_JOBS) > job) { + break; + } + chunk = chunk->next; + } + if (!chunk) return false; + mtxLock(pool->chunk_mtx); + _chunkSetFree(chunk, job); + mtxUnlock(pool->chunk_mtx); +} + +bool copoAddCo(coropool_t copo, mco_coro *co) { + _pool_internal_t *pool = copo; + + job_t *new_job = NULL; + jobchunk_t *chunk = &pool->head_chunk; + + mtxLock(pool->chunk_mtx); + while (!new_job) { + new_job = _chunkGetFirstFree(chunk); + if (!new_job) { + if (!chunk->next) { + info("adding new chunk"); + chunk->next = _chunkNew(chunk); + } + chunk = chunk->next; + } + } + mtxUnlock(pool->chunk_mtx); + + new_job->co = co; + new_job->pool = pool; + + //return poolAdd(pool->pool, _worker, new_job); + return _copoAddJob(new_job); +} + +void copoWait(coropool_t copo) { + _pool_internal_t *pool = copo; + poolWait(pool->pool); +} + +static int _worker(void *arg) { + job_t *job = arg; + mco_result res = mco_resume(job->co); + if (res != MCO_DEAD) { + _copoAddJob(job); + } + else { + _copoRemoveJob(job); + } + return 0; +} +#endif + +#include + +typedef struct { + vec(mco_coro *) jobs; + uint32 head; + cmutex_t work_mutex; + condvar_t work_cond; + condvar_t working_cond; + int32 working_count; + int32 thread_count; + bool stop; +} _copo_internal_t; + +static mco_coro *_getJob(_copo_internal_t *copo); +static int _copoWorker(void *arg); + +coropool_t copoInit(uint32 num) { + if (!num) num = 2; + + _copo_internal_t *copo = malloc(sizeof(_copo_internal_t)); + *copo = (_copo_internal_t){ + .work_mutex = mtxInit(), + .work_cond = condInit(), + .working_cond = condInit(), + .thread_count = (int32)num + }; + + for (usize i = 0; i < num; ++i) { + thrDetach(thrCreate(_copoWorker, copo)); + } + + return copo; +} + +void copoFree(coropool_t copo_in) { + _copo_internal_t *copo = copo_in; + if (!copo) return; + + mtxLock(copo->work_mutex); + copo->stop = true; + condWakeAll(copo->work_cond); + mtxUnlock(copo->work_mutex); + + copoWait(copo); + + vecFree(copo->jobs); + mtxFree(copo->work_mutex); + condFree(copo->work_cond); + condFree(copo->working_cond); + + free(copo); +} + +bool copoAdd(coropool_t copo, copo_func_f fn) { + mco_desc desc = mco_desc_init(fn, 0); + return copoAddDesc(copo, &desc); +} + +bool copoAddDesc(coropool_t copo, mco_desc *desc) { + mco_coro *co; + mco_create(&co, desc); + return copoAddCo(copo, co); +} + +bool copoAddCo(coropool_t copo_in, mco_coro *co) { + _copo_internal_t *copo = copo_in; + if (!copo) return false; + + mtxLock(copo->work_mutex); + + if (copo->head > vecLen(copo->jobs)) { + vecClear(copo->jobs); + copo->head = 0; + } + + vecAppend(copo->jobs, co); + + condWake(copo->work_cond); + mtxUnlock(copo->work_mutex); + + return true; +} + +void copoWait(coropool_t copo_in) { + _copo_internal_t *copo = copo_in; + if (!copo) return; + + mtxLock(copo->work_mutex); + // while its either + // - working and there's still some threads doing some work + // - not working and there's still some threads exiting + while ((!copo->stop && copo->working_count > 0) || + (copo->stop && copo->thread_count > 0) + ) { + condWait(copo->working_cond, copo->work_mutex); + } + mtxUnlock(copo->work_mutex); +} + +// == PRIVATE FUNCTIONS =================================== + +static mco_coro *_getJob(_copo_internal_t *copo) { + if (copo->head >= vecLen(copo->jobs)) { + copo->head = 0; + } + return copo->jobs[copo->head++]; +} + +static int _copoWorker(void *arg) { + _copo_internal_t *copo = arg; + + while (true) { + mtxLock(copo->work_mutex); + // wait for a new job + while (copo->head >= vecLen(copo->jobs) && !copo->stop) { + condWait(copo->work_cond, copo->work_mutex); + } + + if (copo->stop) { + break; + } + + mco_coro *job = _getJob(copo); + copo->working_count++; + mtxUnlock(copo->work_mutex); + + if (job && mco_status(job) != MCO_DEAD) { + mco_resume(job); + if (mco_status(job) != MCO_DEAD) { + copoAddCo(copo, job); + } + } + + mtxLock(copo->work_mutex); + copo->working_count--; + if (!copo->stop && + copo->working_count == 0 && + copo->head == vecLen(copo->jobs) + ) { + condWake(copo->working_cond); + } + mtxUnlock(copo->work_mutex); + } + + copo->thread_count--; + condWake(copo->working_cond); + mtxUnlock(copo->work_mutex); + return 0; +} \ No newline at end of file diff --git a/colla/coropool.h b/colla/coropool.h new file mode 100644 index 0000000..14cf370 --- /dev/null +++ b/colla/coropool.h @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +#include "minicoro.h" + +typedef void *coropool_t; +typedef void (*copo_func_f)(mco_coro *co); + +coropool_t copoInit(uint32 num); +void copoFree(coropool_t copo); + +bool copoAdd(coropool_t copo, copo_func_f fn); +bool copoAddDesc(coropool_t copo, mco_desc *desc); +bool copoAddCo(coropool_t copo, mco_coro *co); +void copoWait(coropool_t copo); diff --git a/src/cthreads.c b/colla/cthreads.c similarity index 64% rename from src/cthreads.c rename to colla/cthreads.c index f680421..923c394 100644 --- a/src/cthreads.c +++ b/colla/cthreads.c @@ -75,8 +75,9 @@ cmutex_t mtxInit(void) { return (cmutex_t)crit_sec; } -void mtxDestroy(cmutex_t ctx) { +void mtxFree(cmutex_t ctx) { DeleteCriticalSection((CRITICAL_SECTION *)ctx); + free((CRITICAL_SECTION *)ctx); } bool mtxValid(cmutex_t ctx) { @@ -97,6 +98,35 @@ bool mtxUnlock(cmutex_t ctx) { return true; } +// == CONDITION VARIABLE =============================== + +#include + +condvar_t condInit(void) { + CONDITION_VARIABLE *cond = malloc(sizeof(CONDITION_VARIABLE)); + InitializeConditionVariable(cond); + return (condvar_t)cond; +} + +void condFree(condvar_t cond) { + free((CONDITION_VARIABLE *)cond); +} + +void condWake(condvar_t cond) { + WakeConditionVariable((CONDITION_VARIABLE *)cond); +} + +void condWakeAll(condvar_t cond) { + WakeAllConditionVariable((CONDITION_VARIABLE *)cond); +} + +void condWait(condvar_t cond, cmutex_t mtx) { + BOOL res = SleepConditionVariableCS((CONDITION_VARIABLE *)cond, (CRITICAL_SECTION *)mtx, INFINITE); +} + +void condWaitTimed(condvar_t cond, cmutex_t mtx, int milliseconds) { + SleepConditionVariableCS((CONDITION_VARIABLE *)cond, (CRITICAL_SECTION *)mtx, milliseconds); +} #else #include @@ -168,14 +198,16 @@ cmutex_t mtxInit(void) { pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t)); if(mutex) { - int res = pthread_mutex_init(mutex, NULL); - if(res != 0) mutex = NULL; + if(pthread_mutex_init(mutex, NULL)) { + free(mutex); + mutex = NULL; + } } return (cmutex_t)mutex; } -void mtxDestroy(cmutex_t ctx) { +void mtxFree(cmutex_t ctx) { pthread_mutex_destroy((pthread_mutex_t *)ctx); } @@ -195,4 +227,44 @@ bool mtxUnlock(cmutex_t ctx) { return pthread_mutex_unlock((pthread_mutex_t *)ctx) == 0; } +// == CONDITION VARIABLE =============================== + +condvar_t condInit(void) { + pthread_cond_t *cond = malloc(sizeof(pthread_cond_t)); + + if(cond) { + if(pthread_cond_init(cond, NULL)) { + free(cond); + cond = NULL; + } + } + + return (condvar_t)cond; +} + +void condFree(condvar_t cond) { + if (!cond) return; + pthread_cond_destroy((pthread_cond_t *)cond); + free((pthread_cond_t *)cond); +} + +void condWake(condvar_t cond) { + pthread_cond_signal((pthread_cond_t *)cond); +} + +void condWakeAll(condvar_t cond) { + pthread_cond_broadcast((pthread_cond_t *)cond); +} + +void condWait(condvar_t cond, cmutex_t mtx) { + pthread_cond_wait((pthread_cond_t *)cond, (pthread_mutex_t *)mtx); +} + +void condWaitTimed(condvar_t cond, cmutex_t mtx, int milliseconds) { + struct timespec timeout; + time(&timeout.tv_sec); + timeout.tv_nsec += milliseconds * 1000000; + pthread_cond_timedwait((pthread_cond_t *)cond, (pthread_mutex_t *)mtx, &timeout); +} + #endif diff --git a/src/cthreads.h b/colla/cthreads.h similarity index 71% rename from src/cthreads.h rename to colla/cthreads.h index 52b187c..944b3cd 100644 --- a/src/cthreads.h +++ b/colla/cthreads.h @@ -29,7 +29,7 @@ bool thrJoin(cthread_t ctx, int *code); typedef uintptr_t cmutex_t; cmutex_t mtxInit(void); -void mtxDestroy(cmutex_t ctx); +void mtxFree(cmutex_t ctx); bool mtxValid(cmutex_t ctx); @@ -62,6 +62,21 @@ struct lock_t { }; #endif +// == CONDITION VARIABLE =============================== + +typedef uintptr_t condvar_t; + +#define COND_WAIT_INFINITE 0xFFFFFFFF + +condvar_t condInit(void); +void condFree(condvar_t cond); + +void condWake(condvar_t cond); +void condWakeAll(condvar_t cond); + +void condWait(condvar_t cond, cmutex_t mtx); +void condWaitTimed(condvar_t cond, cmutex_t mtx, int milliseconds); + #ifdef __cplusplus } // extern "C" #endif diff --git a/src/dir.c b/colla/dir.c similarity index 100% rename from src/dir.c rename to colla/dir.c diff --git a/src/dir.h b/colla/dir.h similarity index 100% rename from src/dir.h rename to colla/dir.h diff --git a/src/file.c b/colla/file.c similarity index 89% rename from src/file.c rename to colla/file.c index 6cd03b0..13a028c 100644 --- a/src/file.c +++ b/colla/file.c @@ -24,10 +24,7 @@ static DWORD _toWin32Creation(filemode_t mode) { } bool fileExists(const char *fname) { - DWORD dwAttrib = GetFileAttributesA(fname); - - return (dwAttrib != INVALID_FILE_ATTRIBUTES && - !(dwAttrib & FILE_ATTRIBUTE_DIRECTORY)); + return GetFileAttributesA(fname) != INVALID_FILE_ATTRIBUTES; } file_t fileOpen(const char *fname, filemode_t mode) { @@ -95,6 +92,12 @@ uint64 fileTell(file_t ctx) { return result == TRUE ? (uint64)tell.QuadPart : 0; } +uint64 fileGetTime(file_t ctx) { + uint64 fp_time = 0; + GetFileTime((HANDLE)ctx, NULL, NULL, (FILETIME *)&fp_time); + return fp_time; +} + #else #include #include @@ -164,6 +167,10 @@ void fileRewind(file_t ctx) { uint64 fileTell(file_t ctx) { return (uint64)ftell((FILE*)ctx); } + +uint64 fileGetTime(file_t ctx) { + +} #endif static str_t _readWholeInternalStr(file_t ctx) { @@ -292,3 +299,13 @@ bool fileWriteWholeText(const char *fname, strview_t string) { bool fileWriteWholeTextFP(file_t ctx, strview_t string) { return fileWriteWholeFP(ctx, (filebuf_t){ (uint8 *)string.buf, string.len }); } + +uint64 fileGetTimePath(const char *path) { + file_t fp = fileOpen(path, FILE_READ); + if (!fileIsValid(fp)) { + return 0; + } + uint64 fp_time = fileGetTime(fp); + fileClose(fp); + return fp_time; +} \ No newline at end of file diff --git a/src/file.h b/colla/file.h similarity index 90% rename from src/file.h rename to colla/file.h index 98e0497..9cc1fa5 100644 --- a/src/file.h +++ b/colla/file.h @@ -54,6 +54,9 @@ bool fileWriteWholeFP(file_t ctx, filebuf_t data); bool fileWriteWholeText(const char *fname, strview_t string); bool fileWriteWholeTextFP(file_t ctx, strview_t string); +uint64 fileGetTime(file_t ctx); +uint64 fileGetTimePath(const char *path); + #ifdef __cplusplus } // extern "C" #endif diff --git a/src/hashmap.c b/colla/hashmap.c similarity index 100% rename from src/hashmap.c rename to colla/hashmap.c diff --git a/src/hashmap.h b/colla/hashmap.h similarity index 100% rename from src/hashmap.h rename to colla/hashmap.h diff --git a/src/http.c b/colla/http.c similarity index 61% rename from src/http.c rename to colla/http.c index f9cee75..4767eef 100644 --- a/src/http.c +++ b/colla/http.c @@ -4,7 +4,7 @@ #include #include -#include "os.h" +// #include "os.h" #include "tracelog.h" #include "vec.h" @@ -48,6 +48,62 @@ static void _setField(vec(http_field_t) *fields_vec, const char *key, const char vecAppend(*fields_vec, field); } +static void _parseFields(vec(http_field_t) *fields, str_istream_t *in) { + strview_t line; + + do { + line = istrGetview(in, '\r'); + + usize pos = strvFind(line, ':', 0); + if(pos != STRV_NOT_FOUND) { + strview_t key = strvSub(line, 0, pos); + strview_t value = strvSub(line, pos + 2, SIZE_MAX); + + char *key_str = NULL; + char *value_str = NULL; + + key_str = strvCopy(key).buf; + value_str = strvCopy(value).buf; + + _setField(fields, key_str, value_str); + + free(key_str); + free(value_str); + } + + istrSkip(in, 2); // skip \r\n + } while(line.len > 2); +} + +// == HTTP STATUS ============================================================= + +const char *httpGetStatusString(resstatus_t status) { + switch (status) { + case STATUS_OK: return "OK"; + case STATUS_CREATED: return "CREATED"; + case STATUS_ACCEPTED: return "ACCEPTED"; + case STATUS_NO_CONTENT: return "NO CONTENT"; + case STATUS_RESET_CONTENT: return "RESET CONTENT"; + case STATUS_PARTIAL_CONTENT: return "PARTIAL CONTENT"; + case STATUS_MULTIPLE_CHOICES: return "MULTIPLE CHOICES"; + case STATUS_MOVED_PERMANENTLY: return "MOVED PERMANENTLY"; + case STATUS_MOVED_TEMPORARILY: return "MOVED TEMPORARILY"; + case STATUS_NOT_MODIFIED: return "NOT MODIFIED"; + case STATUS_BAD_REQUEST: return "BAD REQUEST"; + case STATUS_UNAUTHORIZED: return "UNAUTHORIZED"; + case STATUS_FORBIDDEN: return "FORBIDDEN"; + case STATUS_NOT_FOUND: return "NOT FOUND"; + case STATUS_RANGE_NOT_SATISFIABLE: return "RANGE NOT SATISFIABLE"; + case STATUS_INTERNAL_SERVER_ERROR: return "INTERNAL SERVER_ERROR"; + case STATUS_NOT_IMPLEMENTED: return "NOT IMPLEMENTED"; + case STATUS_BAD_GATEWAY: return "BAD GATEWAY"; + case STATUS_SERVICE_NOT_AVAILABLE: return "SERVICE NOT AVAILABLE"; + case STATUS_GATEWAY_TIMEOUT: return "GATEWAY TIMEOUT"; + case STATUS_VERSION_NOT_SUPPORTED: return "VERSION NOT SUPPORTED"; + } + return "UNKNOWN"; +} + // == HTTP VERSION ============================================================ int httpVerNumber(http_version_t ver) { @@ -57,22 +113,65 @@ int httpVerNumber(http_version_t ver) { // == HTTP REQUEST ============================================================ http_request_t reqInit() { - http_request_t req; - memset(&req, 0, sizeof(req)); - reqSetUri(&req, strvInit("/")); + http_request_t req = {0}; + reqSetUri(&req, strvInit("")); req.version = (http_version_t){1, 1}; return req; } -void reqFree(http_request_t *ctx) { - for (uint32 i = 0; i < vecLen(ctx->fields); ++i) { - free(ctx->fields[i].key); - free(ctx->fields[i].value); +http_request_t reqParse(const char *request) { + http_request_t req = {0}; + str_istream_t in = istrInit(request); + + // get data + + strview_t method = strvTrim(istrGetview(&in, '/')); + istrSkip(&in, 1); // skip / + strview_t page = strvTrim(istrGetview(&in, ' ')); + strview_t http = strvTrim(istrGetview(&in, '\n')); + + istrSkip(&in, 1); // skip \n + + _parseFields(&req.fields, &in); + + strview_t body = strvTrim(istrGetviewLen(&in, 0, SIZE_MAX)); + + // parse data + + // -- method + const char *methods[] = { "GET", "POST", "HEAD", "PUT", "DELETE" }; + const int methods_count = sizeof(methods) / sizeof(*methods); + + for (int i = 0; i < methods_count; ++i) { + if (strvCompare(method, strvInit(methods[i])) == 0) { + req.method = (reqtype_t)i; + } } - vecFree(ctx->fields); - free(ctx->uri); - free(ctx->body); - memset(ctx, 0, sizeof(http_request_t)); + + // -- page + req.uri = strvCopy(page).buf; + + // -- http + in = istrInitLen(http.buf, http.len); + istrIgnoreAndSkip(&in, '/'); // skip HTTP/ + istrGetu8(&in, &req.version.major); + istrSkip(&in, 1); // skip . + istrGetu8(&in, &req.version.minor); + + // -- body + req.body = strvCopy(body).buf; + + return req; +} + +void reqFree(http_request_t ctx) { + for (http_field_t *it = ctx.fields; it != vecEnd(ctx.fields); ++it) { + free(it->key); + free(it->value); + } + vecFree(ctx.fields); + free(ctx.uri); + free(ctx.body); } bool reqHasField(http_request_t *ctx, const char *key) { @@ -91,15 +190,10 @@ void reqSetField(http_request_t *ctx, const char *key, const char *value) { void reqSetUri(http_request_t *ctx, strview_t uri) { if (strvIsEmpty(uri)) return; free(ctx->uri); - if (uri.buf[0] != '/') { - ctx->uri = (char *)realloc(ctx->uri, uri.len + 1); - ctx->uri[0] = '/'; - memcpy(ctx->uri + 1, uri.buf, uri.len); - ctx->uri[uri.len] = '\0'; - } - else { - ctx->uri = strvCopy(uri).buf; + if (uri.buf[0] == '/') { + strvRemovePrefix(uri, 1); } + ctx->uri = strvCopy(uri).buf; } str_ostream_t reqPrepare(http_request_t *ctx) { @@ -115,7 +209,7 @@ str_ostream_t reqPrepare(http_request_t *ctx) { default: err("unrecognized method: %d", method); goto error; } - ostrPrintf(&out, "%s %s HTTP/%hhu.%hhu\r\n", + ostrPrintf(&out, "%s /%s HTTP/%hhu.%hhu\r\n", method, ctx->uri, ctx->version.major, ctx->version.minor ); @@ -139,18 +233,49 @@ str_t reqString(http_request_t *ctx) { // == HTTP RESPONSE =========================================================== -http_response_t resInit() { - return (http_response_t) {0}; +http_response_t resParse(const char *data) { + http_response_t ctx = {0}; + str_istream_t in = istrInit(data); + + char hp[5]; + istrGetstringBuf(&in, hp, 5); + if(stricmp(hp, "http") != 0) { + err("response doesn't start with 'HTTP', instead with %c%c%c%c", hp[0], hp[1], hp[2], hp[3]); + return ctx; + } + istrSkip(&in, 1); // skip / + istrGetu8(&in, &ctx.version.major); + istrSkip(&in, 1); // skip . + istrGetu8(&in, &ctx.version.minor); + istrGeti32(&in, (int32*)&ctx.status_code); + + istrIgnore(&in, '\n'); + istrSkip(&in, 1); // skip \n + + resParseFields(&ctx, &in); + + const char *tran_encoding = resGetField(&ctx, "transfer-encoding"); + if(tran_encoding == NULL || stricmp(tran_encoding, "chunked") != 0) { + strview_t body = istrGetviewLen(&in, 0, SIZE_MAX); + vecClear(ctx.body); + vecReserve(ctx.body, body.len); + memcpy(ctx.body, body.buf, body.len); + } + else { + // fatal("chunked encoding not implemented yet"); + err("chunked encoding not implemented yet"); + } + + return ctx; } -void resFree(http_response_t *ctx) { - for(uint32 i = 0; i < vecLen(ctx->fields); ++i) { - free(ctx->fields[i].key); - free(ctx->fields[i].value); +void resFree(http_response_t ctx) { + for (http_field_t *it = ctx.fields; it != vecEnd(ctx.fields); ++it) { + free(it->key); + free(it->value); } - vecFree(ctx->fields); - vecFree(ctx->body); - memset(ctx, 0, sizeof(http_response_t)); + vecFree(ctx.fields); + vecFree(ctx.body); } bool resHasField(http_response_t *ctx, const char *key) { @@ -162,6 +287,10 @@ bool resHasField(http_response_t *ctx, const char *key) { return false; } +void resSetField(http_response_t *ctx, const char *key, const char *value) { + _setField(&ctx->fields, key, value); +} + const char *resGetField(http_response_t *ctx, const char *field) { for(uint32 i = 0; i < vecLen(ctx->fields); ++i) { if(stricmp(ctx->fields[i].key, field) == 0) { @@ -171,63 +300,30 @@ const char *resGetField(http_response_t *ctx, const char *field) { return NULL; } -void resParse(http_response_t *ctx, const char *data) { - str_istream_t in = istrInit(data); - - char hp[5]; - istrGetstringBuf(&in, hp, 5); - if(stricmp(hp, "http") != 0) { - err("response doesn't start with 'HTTP', instead with %c%c%c%c", hp[0], hp[1], hp[2], hp[3]); - return; - } - istrSkip(&in, 1); // skip / - istrGetu8(&in, &ctx->version.major); - istrSkip(&in, 1); // skip . - istrGetu8(&in, &ctx->version.minor); - istrGeti32(&in, (int32*)&ctx->status_code); - - istrIgnore(&in, '\n'); - istrSkip(&in, 1); // skip \n - - resParseFields(ctx, &in); - - const char *tran_encoding = resGetField(ctx, "transfer-encoding"); - if(tran_encoding == NULL || stricmp(tran_encoding, "chunked") != 0) { - strview_t body = istrGetviewLen(&in, 0, SIZE_MAX); - vecClear(ctx->body); - vecReserve(ctx->body, body.len); - memcpy(ctx->body, body.buf, body.len); - } - else { - fatal("chunked encoding not implemented yet"); - } +void resParseFields(http_response_t *ctx, str_istream_t *in) { + _parseFields(&ctx->fields, in); } -void resParseFields(http_response_t *ctx, str_istream_t *in) { - strview_t line; +str_ostream_t resPrepare(http_response_t *ctx) { + str_ostream_t out = ostrInitLen(1024); - do { - line = istrGetview(in, '\r'); + ostrPrintf( + &out, "HTTP/%hhu.%hhu %d %s\r\n", + ctx->version.major, ctx->version.minor, + ctx->status_code, httpGetStatusString(ctx->status_code) + ); + for (http_field_t *field = ctx->fields; field != vecEnd(ctx->fields); ++field) { + ostrPrintf(&out, "%s: %s\r\n", field->key, field->value); + } + ostrPuts(&out, "\r\n"); + ostrAppendview(&out, strvInitLen(ctx->body, vecLen(ctx->body))); - usize pos = strvFind(line, ':', 0); - if(pos != STRV_NOT_FOUND) { - strview_t key = strvSub(line, 0, pos); - strview_t value = strvSub(line, pos + 2, SIZE_MAX); + return out; +} - char *key_str = NULL; - char *value_str = NULL; - - key_str = strvCopy(key).buf; - value_str = strvCopy(value).buf; - - _setField(&ctx->fields, key_str, value_str); - - free(key_str); - free(value_str); - } - - istrSkip(in, 2); // skip \r\n - } while(line.len > 2); +str_t resString(http_response_t *ctx) { + str_ostream_t out = resPrepare(ctx); + return ostrAsStr(out); } // == HTTP CLIENT ============================================================= @@ -238,9 +334,8 @@ http_client_t hcliInit() { }; } -void hcliFree(http_client_t *ctx) { - strFree(ctx->host_name); - memset(ctx, 0, sizeof(http_client_t)); +void hcliFree(http_client_t ctx) { + strFree(ctx.host_name); } void hcliSetHost(http_client_t *ctx, strview_t hostname) { @@ -283,7 +378,7 @@ http_response_t hcliSendRequest(http_client_t *ctx, http_request_t *req) { reqSetField(req, "Connection", "close"); } - http_response_t res = resInit(); + http_response_t res = {0}; str_t req_str = strInit(); str_ostream_t received = ostrInitLen(1024); @@ -327,7 +422,7 @@ http_response_t hcliSendRequest(http_client_t *ctx, http_request_t *req) { received.len--; } - resParse(&res, received.buf); + res = resParse(received.buf); } else { err("Couldn't connect to host %s -> %s", ctx->host_name, skGetErrorString()); @@ -357,8 +452,8 @@ http_response_t httpGet(strview_t hostname, strview_t uri) { http_response_t res = hcliSendRequest(&client, &request); - reqFree(&request); - hcliFree(&client); + reqFree(request); + hcliFree(client); return res; } diff --git a/src/http.h b/colla/http.h similarity index 83% rename from src/http.h rename to colla/http.h index 9e9f327..b0fb23d 100644 --- a/src/http.h +++ b/colla/http.h @@ -51,6 +51,8 @@ typedef enum { STATUS_VERSION_NOT_SUPPORTED = 505, } resstatus_t; +const char *httpGetStatusString(resstatus_t status); + typedef struct { uint8 major; uint8 minor; @@ -77,7 +79,8 @@ typedef struct { } http_request_t; http_request_t reqInit(void); -void reqFree(http_request_t *ctx); +http_request_t reqParse(const char *request); +void reqFree(http_request_t ctx); bool reqHasField(http_request_t *ctx, const char *key); @@ -96,14 +99,17 @@ typedef struct { vec(uint8) body; } http_response_t; -http_response_t resInit(void); -void resFree(http_response_t *ctx); +http_response_t resParse(const char *data); +void resFree(http_response_t ctx); bool resHasField(http_response_t *ctx, const char *key); +void resSetField(http_response_t *ctx, const char *key, const char *value); const char *resGetField(http_response_t *ctx, const char *field); -void resParse(http_response_t *ctx, const char *data); +// void resParse(http_response_t *ctx, const char *data); void resParseFields(http_response_t *ctx, str_istream_t *in); +str_ostream_t resPrepare(http_response_t *ctx); +str_t resString(http_response_t *ctx); // == HTTP CLIENT ============================================================= @@ -114,7 +120,7 @@ typedef struct { } http_client_t; http_client_t hcliInit(void); -void hcliFree(http_client_t *ctx); +void hcliFree(http_client_t ctx); void hcliSetHost(http_client_t *ctx, strview_t hostname); http_response_t hcliSendRequest(http_client_t *ctx, http_request_t *request); diff --git a/src/ini.c b/colla/ini.c similarity index 89% rename from src/ini.c rename to colla/ini.c index 823ebc3..00df248 100644 --- a/src/ini.c +++ b/colla/ini.c @@ -6,8 +6,11 @@ // == INI READER ======================================================================== -static const iniopts_t default_opts = {0}; +static const iniopts_t default_opts = { + .key_value_divider = '=' +}; +static iniopts_t setDefaultOptions(const iniopts_t *options); static initable_t *findTable(ini_t *ctx, strview_t name); static inivalue_t *findValue(vec(inivalue_t) values, strview_t key); static void addTable(ini_t *ctx, str_istream_t *in, const iniopts_t *options); @@ -32,14 +35,13 @@ void _iniParseInternal(ini_t *ini, const iniopts_t *options) { } istrSkipWhitespace(&in); } - } ini_t iniParse(const char *filename, const iniopts_t *options) { ini_t ini = { .text = fileReadWholeText(filename) }; if (strIsEmpty(ini.text)) return ini; - if (!options) options = &default_opts; - _iniParseInternal(&ini, options); + iniopts_t opts = setDefaultOptions(options); + _iniParseInternal(&ini, &opts); return ini; } @@ -265,6 +267,23 @@ winitable_t *winiAddTabView(iniwriter_t *ctx, strview_t name) { // == PRIVATE FUNCTIONS ======================================================== +static iniopts_t setDefaultOptions(const iniopts_t *options) { + if (!options) return default_opts; + + iniopts_t opts = default_opts; + + if (options->merge_duplicate_keys) + opts.merge_duplicate_keys = options->merge_duplicate_keys; + + if (options->merge_duplicate_tables) + opts.merge_duplicate_tables = options->merge_duplicate_tables; + + if (options->key_value_divider) + opts.key_value_divider = options->key_value_divider; + + return opts; +} + static initable_t *findTable(ini_t *ctx, strview_t name) { if (strvIsEmpty(name)) return NULL; for (uint32 i = 1; i < vecLen(ctx->tables); ++i) { @@ -312,7 +331,7 @@ static void addTable(ini_t *ctx, str_istream_t *in, const iniopts_t *options) { static void addValue(initable_t *table, str_istream_t *in, const iniopts_t *options) { if (!table) fatal("table is null"); - strview_t key = strvTrim(istrGetview(in, '=')); + strview_t key = strvTrim(istrGetview(in, options->key_value_divider)); istrSkip(in, 1); strview_t value = strvTrim(istrGetview(in, '\n')); // value might be until EOF, in that case no use in skipping diff --git a/src/ini.h b/colla/ini.h similarity index 87% rename from src/ini.h rename to colla/ini.h index 945dacf..24d8010 100644 --- a/src/ini.h +++ b/colla/ini.h @@ -27,8 +27,9 @@ typedef struct { } ini_t; typedef struct { - bool merge_duplicate_tables; - bool merge_duplicate_keys; + bool merge_duplicate_tables; // default false + bool merge_duplicate_keys; // default false + char key_value_divider; // default = } iniopts_t; ini_t iniParse(const char *filename, const iniopts_t *options); @@ -63,7 +64,8 @@ typedef struct { } iniwriter_t; typedef struct { - bool no_discalimer; + bool no_discalimer; // default false + char key_value_divider; // default = } winiopts_t; iniwriter_t winiInit(); diff --git a/colla/jobpool.c b/colla/jobpool.c new file mode 100644 index 0000000..5fdb86f --- /dev/null +++ b/colla/jobpool.c @@ -0,0 +1,144 @@ +#include "jobpool.h" + +#include + +typedef struct { + cthread_func_t func; + void *arg; +} job_t; + +typedef struct { + vec(job_t) jobs; + uint32 head; + cmutex_t work_mutex; + condvar_t work_cond; + condvar_t working_cond; + int32 working_count; + int32 thread_count; + bool stop; +} _pool_internal_t; + +static job_t _getJob(_pool_internal_t *pool); +static int _poolWorker(void *arg); + +jobpool_t poolInit(uint32 num) { + if (!num) num = 2; + + _pool_internal_t *pool = malloc(sizeof(_pool_internal_t)); + *pool = (_pool_internal_t){ + .work_mutex = mtxInit(), + .work_cond = condInit(), + .working_cond = condInit(), + .thread_count = (int32)num + }; + + for (usize i = 0; i < num; ++i) { + thrDetach(thrCreate(_poolWorker, pool)); + } + + return pool; +} + +void poolFree(jobpool_t pool_in) { + _pool_internal_t *pool = pool_in; + if (!pool) return; + + mtxLock(pool->work_mutex); + pool->stop = true; + condWakeAll(pool->work_cond); + mtxUnlock(pool->work_mutex); + + poolWait(pool); + + vecFree(pool->jobs); + mtxFree(pool->work_mutex); + condFree(pool->work_cond); + condFree(pool->working_cond); + + free(pool); +} + +bool poolAdd(jobpool_t pool_in, cthread_func_t func, void *arg) { + _pool_internal_t *pool = pool_in; + if (!pool) return false; + + mtxLock(pool->work_mutex); + + if (pool->head > vecLen(pool->jobs)) { + vecClear(pool->jobs); + pool->head = 0; + } + + job_t job = { func, arg }; + vecAppend(pool->jobs, job); + + condWake(pool->work_cond); + mtxUnlock(pool->work_mutex); + + return true; +} + +void poolWait(jobpool_t pool_in) { + _pool_internal_t *pool = pool_in; + if (!pool) return; + + mtxLock(pool->work_mutex); + // while its either + // - working and there's still some threads doing some work + // - not working and there's still some threads exiting + while ((!pool->stop && pool->working_count > 0) || + (pool->stop && pool->thread_count > 0) + ) { + condWait(pool->working_cond, pool->work_mutex); + } + mtxUnlock(pool->work_mutex); +} + +// == PRIVATE FUNCTIONS =================================== + +static job_t _getJob(_pool_internal_t *pool) { + if (pool->head >= vecLen(pool->jobs)) { + pool->head = 0; + } + job_t job = pool->jobs[pool->head++]; + return job; +} + +static int _poolWorker(void *arg) { + _pool_internal_t *pool = arg; + + while (true) { + mtxLock(pool->work_mutex); + // wait for a new job + while (pool->head >= vecLen(pool->jobs) && !pool->stop) { + condWait(pool->work_cond, pool->work_mutex); + } + + if (pool->stop) { + break; + } + + job_t job = _getJob(pool); + pool->working_count++; + mtxUnlock(pool->work_mutex); + + if (job.func) { + job.func(job.arg); + } + + mtxLock(pool->work_mutex); + pool->working_count--; + if (!pool->stop && + pool->working_count == 0 && + pool->head == vecLen(pool->jobs) + ) { + condWake(pool->working_cond); + } + mtxUnlock(pool->work_mutex); + } + + pool->thread_count--; + condWake(pool->working_cond); + mtxUnlock(pool->work_mutex); + return 0; +} diff --git a/colla/jobpool.h b/colla/jobpool.h new file mode 100644 index 0000000..e33c2dc --- /dev/null +++ b/colla/jobpool.h @@ -0,0 +1,12 @@ +#pragma once + +#include +#include + +typedef void *jobpool_t; + +jobpool_t poolInit(uint32 num); +void poolFree(jobpool_t pool); + +bool poolAdd(jobpool_t pool, cthread_func_t func, void *arg); +void poolWait(jobpool_t pool); diff --git a/src/socket.c b/colla/socket.c similarity index 95% rename from src/socket.c rename to colla/socket.c index 02727a7..72d0e66 100644 --- a/src/socket.c +++ b/colla/socket.c @@ -21,6 +21,7 @@ static const char *_win_skGetErrorString(); #include #include #include // strerror +#include #define INVALID_SOCKET (-1) #define SOCKET_ERROR (-1) @@ -187,6 +188,14 @@ int skReceiveFromPro(socket_t sock, void *buf, int len, int flags, sk_addr_t *fr return recvfrom(sock, buf, len, flags, from, fromlen); } +int skPoll(skpoll_t *to_poll, int num_to_poll, int timeout) { +#if SOCK_WINDOWS + return WSAPoll(to_poll, num_to_poll, timeout); +#elif SOCK_POSIX + return poll(to_poll, num_to_poll, timeout); +#endif +} + bool skIsValid(socket_t sock) { return sock != INVALID_SOCKET; } diff --git a/src/socket.h b/colla/socket.h similarity index 94% rename from src/socket.h rename to colla/socket.h index 1256ed0..b0ce112 100644 --- a/src/socket.h +++ b/colla/socket.h @@ -32,6 +32,7 @@ extern "C" { typedef struct sockaddr sk_addr_t; typedef struct sockaddr_in sk_addrin_t; +typedef struct pollfd skpoll_t; typedef enum { SOCK_TCP, @@ -54,12 +55,15 @@ socket_t skOpenEx(const char *protocol); // Opens a socket, check socket_t with skValid socket_t skOpenPro(int af, int type, int protocol); -// Fill out a sk_addrin_t structure with "ip" and "port" -sk_addrin_t skAddrinInit(const char *ip, uint16_t port); +// Checks that a opened socket is valid, returns true on success +bool skIsValid(socket_t sock); // Closes a socket, returns true on success bool skClose(socket_t sock); +// Fill out a sk_addrin_t structure with "ip" and "port" +sk_addrin_t skAddrinInit(const char *ip, uint16_t port); + // Associate a local address with a socket bool skBind(socket_t sock, const char *ip, uint16_t port); // Associate a local address with a socket @@ -97,8 +101,8 @@ int skReceiveFrom(socket_t sock, void *buf, int len, sk_addrin_t *from); // Receives a datagram and stores the source address. int skReceiveFromPro(socket_t sock, void *buf, int len, int flags, sk_addr_t *from, sk_len_t *fromlen); -// Checks that a opened socket is valid, returns true on success -bool skIsValid(socket_t sock); +// Wait for an event on some sockets +int skPoll(skpoll_t *to_poll, int num_to_poll, int timeout); // Returns latest socket error, returns 0 if there is no error int skGetError(void); diff --git a/src/str.c b/colla/str.c similarity index 100% rename from src/str.c rename to colla/str.c diff --git a/src/str.h b/colla/str.h similarity index 100% rename from src/str.h rename to colla/str.h diff --git a/src/strstream.c b/colla/strstream.c similarity index 100% rename from src/strstream.c rename to colla/strstream.c diff --git a/src/strstream.h b/colla/strstream.h similarity index 100% rename from src/strstream.h rename to colla/strstream.h diff --git a/src/tracelog.c b/colla/tracelog.c similarity index 92% rename from src/tracelog.c rename to colla/tracelog.c index 6a97034..af02cad 100644 --- a/src/tracelog.c +++ b/colla/tracelog.c @@ -72,7 +72,15 @@ void traceLog(int level, const char *fmt, ...) { va_end(args); } +#include "cthreads.h" + +static cmutex_t g_mtx = 0; + void traceLogVaList(int level, const char *fmt, va_list args) { + if (!g_mtx) g_mtx = mtxInit(); + + mtxLock(g_mtx); + char buffer[MAX_TRACELOG_MSG_LENGTH]; memset(buffer, 0, sizeof(buffer)); @@ -115,6 +123,8 @@ void traceLogVaList(int level, const char *fmt, va_list args) { #ifndef TLOG_DONT_EXIT_ON_FATAL if (level == LogFatal) exit(1); #endif + + mtxUnlock(g_mtx); } void traceUseNewline(bool newline) { diff --git a/src/tracelog.h b/colla/tracelog.h similarity index 82% rename from src/tracelog.h rename to colla/tracelog.h index f7fa192..b97b95b 100644 --- a/src/tracelog.h +++ b/colla/tracelog.h @@ -21,6 +21,15 @@ void traceLog(int level, const char *fmt, ...); void traceLogVaList(int level, const char *fmt, va_list args); void traceUseNewline(bool use_newline); +#ifdef NO_LOG +#define tall(...) +#define trace(...) +#define debug(...) +#define info(...) +#define warn(...) +#define err(...) +#define fatal(...) +#else #define tall(...) traceLog(LogAll, __VA_ARGS__) #define trace(...) traceLog(LogTrace, __VA_ARGS__) #define debug(...) traceLog(LogDebug, __VA_ARGS__) @@ -28,6 +37,7 @@ void traceUseNewline(bool use_newline); #define warn(...) traceLog(LogWarning, __VA_ARGS__) #define err(...) traceLog(LogError, __VA_ARGS__) #define fatal(...) traceLog(LogFatal, __VA_ARGS__) +#endif #ifdef __cplusplus } // extern "C" diff --git a/src/utf8.c b/colla/utf8.c similarity index 100% rename from src/utf8.c rename to colla/utf8.c diff --git a/src/utf8.h b/colla/utf8.h similarity index 100% rename from src/utf8.h rename to colla/utf8.h diff --git a/src/vec.h b/colla/vec.h similarity index 87% rename from src/vec.h rename to colla/vec.h index 0085a33..8982c9b 100644 --- a/src/vec.h +++ b/colla/vec.h @@ -6,10 +6,12 @@ extern "C" { #define vec(T) T * -#define vecFree(vec) ((vec) ? free(_vecheader(vec)),NULL : NULL) +#define vecFree(vec) ((vec) ? free(_vecheader(vec)), NULL : NULL) #define vecCopy(src, dest) (vecFree(dest), vecAdd(dest, vecCount(src)), memcpy(dest, src, vecCount(src))) -#define vecAppend(vec, val) (_vecmaygrow(vec, 1), (vec)[_veclen(vec)] = (val), _veclen(vec)++) +#define vecAppend(vec, ...) (_vecmaygrow(vec, 1), (vec)[_veclen(vec)] = (__VA_ARGS__), _veclen(vec)++) +#define vecRemove(vec, ind) ((vec) ? (vec)[(ind)] = (vec)[--_veclen(vec)], NULL : 0) +#define vecRemoveIt(vec, it) (vecRemove((vec), (it) - (vec))) #define vecLen(vec) ((vec) ? _veclen(vec) : 0) #define vecCap(vec) ((vec) ? _veccap(vec) : 0) diff --git a/src/win32_slim.h b/colla/win32_slim.h similarity index 100% rename from src/win32_slim.h rename to colla/win32_slim.h