* threads support with mutexes
This commit is contained in:
alessandrobason 2021-10-27 11:45:27 +01:00
parent 59b55c7f6c
commit bb6f3f967c
5 changed files with 248 additions and 31 deletions

View file

@ -1,4 +1,4 @@
add_library(Colla STATIC add_library(colla STATIC
socket.c socket.c
tracelog.c tracelog.c
http.c http.c
@ -10,13 +10,14 @@ add_library(Colla STATIC
file.c file.c
dir.c dir.c
dirwatch.c dirwatch.c
cthreads.c
) )
if(MSVC) if(MSVC)
target_link_libraries(Colla ws2_32.lib) target_link_libraries(colla ws2_32.lib)
target_compile_options(Colla PRIVATE /W4) target_compile_options(colla PRIVATE /W4)
else() else()
target_link_libraries(Colla pthread) target_link_libraries(colla pthread)
target_compile_options(Colla PRIVATE -Wall -Wextra -Wpedantic) target_compile_options(colla PRIVATE -Wall -Wextra -Wpedantic)
target_compile_definitions(Colla PUBLIC _DEFAULT_SOURCE) target_compile_definitions(colla PUBLIC _DEFAULT_SOURCE)
endif() endif()

172
cthreads.c Normal file
View file

@ -0,0 +1,172 @@
#include "cthreads.h"
typedef struct {
cthread_func_t func;
void *arg;
} _thr_internal_t;
#ifdef _WIN32
#define VC_EXTRALEAN
#include <windows.h>
// == THREAD ===========================================
static DWORD _thrFuncInternal(void *arg) {
_thr_internal_t *params = (_thr_internal_t *)arg;
cthread_func_t func = params->func;
void *argument = params->arg;
free(params);
return (DWORD)func(argument);
}
cthread_t thrCreate(cthread_func_t func, void *arg) {
HANDLE thread = INVALID_HANDLE_VALUE;
_thr_internal_t *params = malloc(sizeof(_thr_internal_t));
if(params) {
params->func = func;
params->arg = arg;
thread = CreateThread(NULL, 0, _thrFuncInternal, params, 0, NULL);
}
return (cthread_t)thread;
}
bool thrValid(cthread_t ctx) {
return (HANDLE)ctx != INVALID_HANDLE_VALUE;
}
bool thrDetach(cthread_t ctx) {
return CloseHandle((HANDLE)ctx);
}
cthread_t thrCurrent(void) {
return (cthread_t)GetCurrentThread();
}
int thrCurrentId(void) {
return GetCurrentThreadId();
}
int thrGetId(cthread_t ctx) {
return GetThreadId((HANDLE)ctx);
}
void thrExit(int code) {
ExitThread(code);
}
bool thrJoin(cthread_t ctx, int *code) {
if(!ctx) return false;
int return_code = WaitForSingleObject((HANDLE)ctx, INFINITE);
if(code) *code = return_code;
BOOL success = CloseHandle((HANDLE)ctx);
return return_code != WAIT_FAILED && success;
}
// == MUTEX ============================================
cmutex_t mtxInit(void) {
CRITICAL_SECTION *crit_sec = malloc(sizeof(CRITICAL_SECTION));
if(crit_sec) {
InitializeCriticalSection(crit_sec);
}
return (cmutex_t)crit_sec;
// return (cmutex_t)CreateMutexW(NULL, false, NULL);
}
void mtxDestroy(cmutex_t ctx) {
DeleteCriticalSection((CRITICAL_SECTION *)ctx);
// CloseHandle((HANDLE)ctx);
}
bool mtxValid(cmutex_t ctx) {
return (void *)ctx != NULL;
}
bool mtxLock(cmutex_t ctx) {
EnterCriticalSection((CRITICAL_SECTION *)ctx);
return true;
// DWORD result = WaitForSingleObject((HANDLE)ctx, INFINITE);
// // TODO maybe remove abandoned? or return a enum? it'll hurt usability tho
// return result == WAIT_OBJECT_0 || result == WAIT_ABANDONED;
}
bool mtxTryLock(cmutex_t ctx) {
return TryEnterCriticalSection((CRITICAL_SECTION *)ctx);
// int result = mtxTimedLock(ctx, 0);
// if(result == CMTX_TIMEDOUT) return CMTX_BUSY;
// return result;
}
bool mtxUnlock(cmutex_t ctx) {
LeaveCriticalSection((CRITICAL_SECTION *)ctx);
return true;
// return ReleaseMutex((HANDLE)ctx);
}
#else
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>
#define INT_TO_VOIDP(a) ((void *)((uintptr_t)(a)))
static void *_thrFuncInternal(void *arg) {
_thr_internal_t *params = (_thr_internal_t *)arg;
cthread_func_t func = params->func;
void *argument = params->arg;
free(params);
return INT_TO_VOIDP(func(argument));
}
cthread_t thrCreate(cthread_func_t func, void *arg) {
pthread_t handle = (pthread_t)NULL;
_thr_internal_t *params = malloc(sizeof(_thr_internal_t));
if(params) {
params->func = func;
params->arg = arg;
int result = pthread_create(&handle, NULL, _thrFuncInternal, params);
if(result) handle = (pthread_t)NULL;
}
return (cthread_t)handle;
}
bool thrValid(cthread_t ctx) {
return (void *)ctx != NULL;
}
bool thrDetach(cthread_t ctx) {
return pthread_detach((pthread_t)ctx) == 0;
}
cthread_t thrCurrent(void) {
return (cthread_t)pthread_self();
}
int thrCurrentId(void) {
return (int)pthread_self();
}
int thrGetId(cthread_t ctx) {
return (int)ctx;
}
void thrExit(int code) {
pthread_exit(INT_TO_VOIDP(code));
}
bool thrJoin(cthread_t ctx, int *code) {
void *result = code;
return pthread_join((pthread_t)ctx, &result) != 0;
}
#endif

42
cthreads.h Normal file
View file

@ -0,0 +1,42 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
// == THREAD ===========================================
typedef uintptr_t cthread_t;
typedef int (*cthread_func_t)(void *);
cthread_t thrCreate(cthread_func_t func, void *arg);
bool thrValid(cthread_t ctx);
bool thrDetach(cthread_t ctx);
cthread_t thrCurrent(void);
int thrCurrentId(void);
int thrGetId(cthread_t ctx);
void thrExit(int code);
bool thrJoin(cthread_t ctx, int *code);
// == MUTEX ============================================
typedef uintptr_t cmutex_t;
cmutex_t mtxInit(void);
void mtxDestroy(cmutex_t ctx);
bool mtxValid(cmutex_t ctx);
bool mtxLock(cmutex_t ctx);
bool mtxTryLock(cmutex_t ctx);
bool mtxUnlock(cmutex_t ctx);
#ifdef __cplusplus
} // extern "C"
#endif

View file

@ -16,7 +16,7 @@ typedef struct {
HANDLE stop_event; HANDLE stop_event;
} __dirwatch_internal_t; } __dirwatch_internal_t;
static DWORD watchDirThread(void *cdata) { static int watchDirThread(void *cdata) {
__dirwatch_internal_t *desc = (__dirwatch_internal_t*)cdata; __dirwatch_internal_t *desc = (__dirwatch_internal_t*)cdata;
// stop_event is called from another thread when watchDirThread should exit // stop_event is called from another thread when watchDirThread should exit
@ -136,16 +136,9 @@ dirwatch_t watchDir(dirwatch_desc_t desc) {
dir.desc = (dirwatch_desc_t *)opts; dir.desc = (dirwatch_desc_t *)opts;
dir.handle = CreateThread( dir.handle = thrCreate(watchDirThread, (void *)dir.desc);
NULL,
0,
watchDirThread,
(void *)dir.desc,
0,
NULL
);
if(dir.handle) { if(thrValid(dir.handle)) {
info("watching %s", desc.path); info("watching %s", desc.path);
} }
@ -153,9 +146,15 @@ dirwatch_t watchDir(dirwatch_desc_t desc) {
} }
void waitForWatchDir(dirwatch_t *ctx) { void waitForWatchDir(dirwatch_t *ctx) {
if(!ctx->handle) return; if(!thrValid(ctx->handle)) {
err("not valid");
return;
}
WaitForSingleObject((HANDLE)ctx->handle, INFINITE); if(!thrJoin(ctx->handle, NULL)) {
err("dirwatch: couldn't wait for thread");
}
info("waited");
HeapFree(GetProcessHeap(), 0, ctx->desc); HeapFree(GetProcessHeap(), 0, ctx->desc);
} }
@ -169,7 +168,9 @@ void stopWatchDir(dirwatch_t *ctx, bool immediately) {
err("couldn't signal event stop_event: %d", GetLastError()); err("couldn't signal event stop_event: %d", GetLastError());
} }
} }
WaitForSingleObject((HANDLE)ctx->handle, INFINITE); if(!thrJoin(ctx->handle, NULL)) {
err("dirwatch: couldn't wait for thread");
}
HeapFree(GetProcessHeap(), 0, ctx->desc); HeapFree(GetProcessHeap(), 0, ctx->desc);
} }
@ -181,7 +182,6 @@ void stopWatchDir(dirwatch_t *ctx, bool immediately) {
#include <unistd.h> // read #include <unistd.h> // read
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <pthread.h>
#include <linux/limits.h> // MAX_PATH #include <linux/limits.h> // MAX_PATH
#include <stdatomic.h> #include <stdatomic.h>
@ -198,7 +198,7 @@ typedef struct {
int wd; int wd;
} __dirwatch_internal_t; } __dirwatch_internal_t;
static void *watchDirThread(void *cdata) { static int watchDirThread(void *cdata) {
__dirwatch_internal_t *desc = (__dirwatch_internal_t *)cdata; __dirwatch_internal_t *desc = (__dirwatch_internal_t *)cdata;
info("watching %s", desc->path); info("watching %s", desc->path);
@ -255,9 +255,9 @@ static void *watchDirThread(void *cdata) {
inotify_rm_watch(desc->fd, desc->wd); inotify_rm_watch(desc->fd, desc->wd);
close(desc->fd); close(desc->fd);
return (void*)0; return 0;
error: error:
return (void*)1; return 1;
} }
dirwatch_t watchDir(dirwatch_desc_t desc) { dirwatch_t watchDir(dirwatch_desc_t desc) {
@ -272,16 +272,13 @@ dirwatch_t watchDir(dirwatch_desc_t desc) {
dir.desc = (dirwatch_desc_t *)opts; dir.desc = (dirwatch_desc_t *)opts;
pthread_t thread; dir.handle = thrCreate(watchDirThread, opts);
pthread_create(&thread, NULL, watchDirThread, opts);
dir.handle = (void *)thread;
return dir; return dir;
} }
void waitForWatchDir(dirwatch_t *ctx) { void waitForWatchDir(dirwatch_t *ctx) {
pthread_join((pthread_t)ctx->handle, NULL); thrJoin(ctx->handle, NULL);
free(ctx->desc); free(ctx->desc);
} }
@ -293,7 +290,7 @@ void stopWatchDir(dirwatch_t *ctx, bool immediately) {
inotify_rm_watch(opts->fd, opts->wd); inotify_rm_watch(opts->fd, opts->wd);
close(opts->fd); close(opts->fd);
} }
pthread_join((pthread_t)ctx->handle, NULL); thrJoin(ctx->handle, NULL);
free(opts); free(opts);
} }

View file

@ -28,6 +28,7 @@ extern "C" {
*/ */
#include <stdbool.h> #include <stdbool.h>
#include "cthreads.h"
enum { enum {
DIRWATCH_FILE_ADDED, DIRWATCH_FILE_ADDED,
@ -48,7 +49,7 @@ typedef struct {
} dirwatch_desc_t; } dirwatch_desc_t;
typedef struct { typedef struct {
void *handle; cthread_t handle;
dirwatch_desc_t *desc; dirwatch_desc_t *desc;
} dirwatch_t; } dirwatch_t;
@ -59,4 +60,8 @@ dirwatch_t watchDir(dirwatch_desc_t desc);
void waitForWatchDir(dirwatch_t *ctx); void waitForWatchDir(dirwatch_t *ctx);
// stops dirwatch thread, if immediately is true, it will try to close it right away // stops dirwatch thread, if immediately is true, it will try to close it right away
// otherwise it might wait for one last event // otherwise it might wait for one last event
void stopWatchDir(dirwatch_t *ctx, bool immediately); void stopWatchDir(dirwatch_t *ctx, bool immediately);
#ifdef __cplusplus
} // extern "C"
#endif