diff options
Diffstat (limited to 'engine.c')
-rw-r--r-- | engine.c | 514 |
1 files changed, 514 insertions, 0 deletions
diff --git a/engine.c b/engine.c new file mode 100644 index 0000000..4da6ff4 --- /dev/null +++ b/engine.c @@ -0,0 +1,514 @@ +#include <unistd.h> +#include "engine.h" +#include "epoll.h" + +typedef struct FdInfo FdInfo; +typedef struct FdAction FdAction; + +struct FdInfo +{ +}; + +struct Context +{ + Engine *engine; + gpointer data; + GMutex *mutex; +}; + +typedef void (* EngineFdFunc) (const EPollEvent *event, gpointer data); + +struct FdAction +{ + int fd; + EPollEventType mask; + EngineFdFunc func; + gpointer data; +}; + +struct Engine +{ + gboolean quitting; + + GMutex * mutex; + + GCond * new_job_cond; + GCond * idle_cond; + GCond * exit_cond; + + GQueue * jobs; + + int n_threads; + int n_idle_threads; + int n_exited_threads; + + GQueue * fd_actions; + + /* polling */ + gboolean polling; + EPoll * epoll; + + int read_fd; + int write_fd; +}; + +static gpointer engine_thread (gpointer); + +/* + * Engine + */ + +static void +engine_lock (Engine *engine) +{ + g_mutex_lock (engine->mutex); +} + +static void +engine_unlock (Engine *engine) +{ + g_mutex_unlock (engine->mutex); +} + +static void +make_pipe (int *rd, int *wr) +{ + int fds[2]; + + if (pipe (fds) < 0) + g_error ("Could not create wake-up pipe\n"); + + *rd = fds[0]; + *wr = fds[1]; +} + +static void +poll_job (gpointer data) +{ + Engine *engine = data; + int i, n_events; + EPollEvent *events; + GList *list; + + engine_lock (engine); + + for (list = engine->fd_actions->head; list != NULL; list = list->next) + { + FdAction *action = list->data; + EPollEventType mask; + + } + + engine->polling = TRUE; + + engine_unlock (engine); + + events = epoll_wait (engine->epoll, &n_events, -1); + + for (i = 0; i < n_events; ++i) + { +#if 0 + FdInfo *info; +#endif + EPollEvent *event = &(events[i]); + +#if 0 + info = &(engine->fd_infos[event->fd]); +#endif + } + + g_free (events); + + engine_append_job (engine, poll_job, engine); +} + +Engine * +engine_new (guint n_threads) +{ + Engine *engine; + int i; + + g_return_val_if_fail (n_threads > 0, NULL); + + if (!g_thread_supported()) + g_thread_init (NULL); + + engine = g_new0 (Engine, 1); + + engine->mutex = g_mutex_new (); + engine->jobs = g_queue_new (); + + engine->new_job_cond = g_cond_new (); + engine->idle_cond = g_cond_new (); + engine->exit_cond = g_cond_new (); + engine->n_idle_threads = 0; + engine->n_exited_threads = 0; + engine->quitting = FALSE; + engine->n_threads = n_threads; + + engine->epoll = epoll_new (); + + engine->polling = FALSE; + + make_pipe (&engine->read_fd, &engine->write_fd); + epoll_add_fd (engine->epoll, engine->read_fd, + EPOLL_READ | EPOLL_ERROR, engine); + + engine->fd_actions = g_queue_new (); + + engine_lock (engine); + + for (i = 0; i < n_threads; ++i) + g_thread_create (engine_thread, engine, TRUE, NULL); + + engine_unlock (engine); + + engine_append_job (engine, poll_job, engine); + + return engine; +} + +static guint +engine_n_jobs (Engine *engine) +{ + g_assert (g_queue_get_length (engine->jobs) % 2 == 0); + + return (g_queue_get_length (engine->jobs)) / 2; +} + +/* Wait until all previously submitted jobs have finished */ +void +engine_sync (Engine *engine) +{ + engine_lock (engine); + + g_cond_broadcast (engine->new_job_cond); + + while (!(engine->n_idle_threads == engine->n_threads && + engine_n_jobs (engine) == 0)) + { + g_cond_wait (engine->idle_cond, engine->mutex); + } + + engine_unlock (engine); +} + +void +engine_free (Engine *engine) +{ + engine_lock (engine); + + engine->quitting = TRUE; + + g_cond_broadcast (engine->new_job_cond); + + while (engine->n_exited_threads != engine->n_threads) + g_cond_wait (engine->exit_cond, engine->mutex); + + engine_unlock (engine); + + while (!g_queue_is_empty (engine->jobs)) + g_queue_pop_head (engine->jobs); + + g_mutex_free (engine->mutex); + g_cond_free (engine->new_job_cond); + g_cond_free (engine->idle_cond); + g_cond_free (engine->exit_cond); + g_queue_free (engine->jobs); +} + +void +engine_append_job (Engine *engine, + EngineFunc job, + gpointer data) +{ + engine_lock (engine); + + g_queue_push_tail (engine->jobs, job); + g_queue_push_tail (engine->jobs, data); + + g_cond_signal (engine->new_job_cond); + + engine_unlock (engine); +} + +void +engine_prepend_job (Engine *engine, + EngineFunc job, + gpointer data) +{ + engine_lock (engine); + + g_queue_push_head (engine->jobs, data); + g_queue_push_head (engine->jobs, job); + + g_cond_signal (engine->new_job_cond); + + engine_unlock (engine); +} + +static gpointer +engine_thread (gpointer data) +{ + Engine *engine = data; + + engine_lock (engine); + + while (!engine->quitting) + { + if (!engine_n_jobs (engine)) + { + engine->n_idle_threads++; + + g_cond_signal (engine->idle_cond); + + g_cond_wait (engine->new_job_cond, engine->mutex); + + engine->n_idle_threads--; + } + + if (engine->quitting) + break; + + if (engine_n_jobs (engine)) + { + EngineFunc job; + gpointer data; + + job = g_queue_pop_head (engine->jobs); + data = g_queue_pop_head (engine->jobs); + + engine_unlock (engine); + + job (data); + + engine_lock (engine); + } + } + + engine->n_exited_threads++; + + g_cond_signal (engine->exit_cond); + + engine_unlock (engine); + + return NULL; +} + +static void +ensure_wake_up (Engine *engine) +{ + if (engine->polling) + { + char c = '!'; + + write (engine->write_fd, &c, 1); + engine->polling = FALSE; + } +} + +typedef struct +{ + int fd; + EngineFdFunc func; + gpointer data; +} EngineAddFd; + +typedef struct +{ + int fd; + EPollEventType mask; +} EngineChangeFd; + +typedef struct +{ + int fd; +} EngineRemoveFd; + +static void +engine_queue_add_fd (Engine *engine, + int fd, + EngineFdFunc func, + gpointer data) +{ + EngineAddFd *add_fd = g_new (EngineAddFd, 1); + + engine_lock (engine); + + add_fd->fd = fd; + add_fd->func = func; + add_fd->data = data; + + g_queue_push_tail (engine->fd_actions, add_fd); + + ensure_wake_up (engine); + + engine_unlock (engine); +} + +static void +engine_queue_change_fd (Engine *engine, + int fd, + EPollEventType mask) +{ + EngineChangeFd *change_fd = g_new (EngineChangeFd, 1); + + engine_lock (engine); + + change_fd->fd = fd; + change_fd->mask = mask; + + ensure_wake_up (engine); + + engine_unlock (engine); +} + +static void +engine_queue_remove_fd (Engine *engine, + int fd) +{ + EngineRemoveFd *remove_fd = g_new (EngineRemoveFd, 1); + + engine_lock (engine); + + remove_fd->fd = fd; + + ensure_wake_up (engine); + + engine_unlock (engine); +} + +/* + * Context + */ +static void +context_lock (Context *context) +{ + g_mutex_lock (context->mutex); +} + +static void +context_unlock (Context *context) +{ + g_mutex_unlock (context->mutex); +} + +Context * +context_new (Engine *engine, + gpointer data) +{ + Context *context = g_new0 (Context, 1); + + context->engine = engine; + context->data = data; + context->mutex = g_mutex_new (); + + return context; +} + +void +context_free (Context *context) +{ + context_lock (context); + + context_unlock (context); + + g_mutex_free (context->mutex); + g_free (context); +} + +Engine * +context_get_engine (Context *context) +{ + Engine *result; + + context_lock (context); + + result = context->engine; + + context_unlock (context); + + return result; +} + +gpointer +context_get_data (Context *context) +{ + gpointer result; + + context_lock (context); + + result = context->data; + + context_unlock (context); + + return result; +} + +static void +on_fd_event (const EPollEvent *event, + gpointer data) +{ + Context *context = data; + + context_lock (context); + + /* find fd in context */ + /* add relevant callbacks to queue */ + + context_unlock (context); +} + +void +context_add_fd (Context *context, + int fd) +{ + context_lock (context); + + engine_queue_add_fd (context->engine, fd, on_fd_event, context); + + context_unlock (context); +} + +void +context_remove_fd (Context *context, + int fd) +{ + context_lock (context); + + engine_queue_remove_fd (context->engine, fd); + + context_unlock (context); +} + +void +context_set_write (Context *context, + int fd, + ContextFdFunc callback) +{ + EPollEventType mask; + + context_lock (context); + + mask = 0; /* FIXME */ + + engine_queue_change_fd (context->engine, fd, mask); + + context_unlock (context); +} + +void +context_set_read (Context *context, + int fd, + ContextFdFunc callback) +{ + EPollEventType mask; + + context_lock (context); + + mask = 0; /* FIXME */ + + engine_queue_change_fd (context->engine, fd, mask); + + context_unlock (context); +} |