summaryrefslogtreecommitdiff
path: root/engine.c
diff options
context:
space:
mode:
Diffstat (limited to 'engine.c')
-rw-r--r--engine.c514
1 files changed, 514 insertions, 0 deletions
diff --git a/engine.c b/engine.c
new file mode 100644
index 0000000..4da6ff4
--- /dev/null
+++ b/engine.c
@@ -0,0 +1,514 @@
+#include <unistd.h>
+#include "engine.h"
+#include "epoll.h"
+
+typedef struct FdInfo FdInfo;
+typedef struct FdAction FdAction;
+
+struct FdInfo
+{
+};
+
+struct Context
+{
+ Engine *engine;
+ gpointer data;
+ GMutex *mutex;
+};
+
+typedef void (* EngineFdFunc) (const EPollEvent *event, gpointer data);
+
+struct FdAction
+{
+ int fd;
+ EPollEventType mask;
+ EngineFdFunc func;
+ gpointer data;
+};
+
+struct Engine
+{
+ gboolean quitting;
+
+ GMutex * mutex;
+
+ GCond * new_job_cond;
+ GCond * idle_cond;
+ GCond * exit_cond;
+
+ GQueue * jobs;
+
+ int n_threads;
+ int n_idle_threads;
+ int n_exited_threads;
+
+ GQueue * fd_actions;
+
+ /* polling */
+ gboolean polling;
+ EPoll * epoll;
+
+ int read_fd;
+ int write_fd;
+};
+
+static gpointer engine_thread (gpointer);
+
+/*
+ * Engine
+ */
+
+static void
+engine_lock (Engine *engine)
+{
+ g_mutex_lock (engine->mutex);
+}
+
+static void
+engine_unlock (Engine *engine)
+{
+ g_mutex_unlock (engine->mutex);
+}
+
+static void
+make_pipe (int *rd, int *wr)
+{
+ int fds[2];
+
+ if (pipe (fds) < 0)
+ g_error ("Could not create wake-up pipe\n");
+
+ *rd = fds[0];
+ *wr = fds[1];
+}
+
+static void
+poll_job (gpointer data)
+{
+ Engine *engine = data;
+ int i, n_events;
+ EPollEvent *events;
+ GList *list;
+
+ engine_lock (engine);
+
+ for (list = engine->fd_actions->head; list != NULL; list = list->next)
+ {
+ FdAction *action = list->data;
+ EPollEventType mask;
+
+ }
+
+ engine->polling = TRUE;
+
+ engine_unlock (engine);
+
+ events = epoll_wait (engine->epoll, &n_events, -1);
+
+ for (i = 0; i < n_events; ++i)
+ {
+#if 0
+ FdInfo *info;
+#endif
+ EPollEvent *event = &(events[i]);
+
+#if 0
+ info = &(engine->fd_infos[event->fd]);
+#endif
+ }
+
+ g_free (events);
+
+ engine_append_job (engine, poll_job, engine);
+}
+
+Engine *
+engine_new (guint n_threads)
+{
+ Engine *engine;
+ int i;
+
+ g_return_val_if_fail (n_threads > 0, NULL);
+
+ if (!g_thread_supported())
+ g_thread_init (NULL);
+
+ engine = g_new0 (Engine, 1);
+
+ engine->mutex = g_mutex_new ();
+ engine->jobs = g_queue_new ();
+
+ engine->new_job_cond = g_cond_new ();
+ engine->idle_cond = g_cond_new ();
+ engine->exit_cond = g_cond_new ();
+ engine->n_idle_threads = 0;
+ engine->n_exited_threads = 0;
+ engine->quitting = FALSE;
+ engine->n_threads = n_threads;
+
+ engine->epoll = epoll_new ();
+
+ engine->polling = FALSE;
+
+ make_pipe (&engine->read_fd, &engine->write_fd);
+ epoll_add_fd (engine->epoll, engine->read_fd,
+ EPOLL_READ | EPOLL_ERROR, engine);
+
+ engine->fd_actions = g_queue_new ();
+
+ engine_lock (engine);
+
+ for (i = 0; i < n_threads; ++i)
+ g_thread_create (engine_thread, engine, TRUE, NULL);
+
+ engine_unlock (engine);
+
+ engine_append_job (engine, poll_job, engine);
+
+ return engine;
+}
+
+static guint
+engine_n_jobs (Engine *engine)
+{
+ g_assert (g_queue_get_length (engine->jobs) % 2 == 0);
+
+ return (g_queue_get_length (engine->jobs)) / 2;
+}
+
+/* Wait until all previously submitted jobs have finished */
+void
+engine_sync (Engine *engine)
+{
+ engine_lock (engine);
+
+ g_cond_broadcast (engine->new_job_cond);
+
+ while (!(engine->n_idle_threads == engine->n_threads &&
+ engine_n_jobs (engine) == 0))
+ {
+ g_cond_wait (engine->idle_cond, engine->mutex);
+ }
+
+ engine_unlock (engine);
+}
+
+void
+engine_free (Engine *engine)
+{
+ engine_lock (engine);
+
+ engine->quitting = TRUE;
+
+ g_cond_broadcast (engine->new_job_cond);
+
+ while (engine->n_exited_threads != engine->n_threads)
+ g_cond_wait (engine->exit_cond, engine->mutex);
+
+ engine_unlock (engine);
+
+ while (!g_queue_is_empty (engine->jobs))
+ g_queue_pop_head (engine->jobs);
+
+ g_mutex_free (engine->mutex);
+ g_cond_free (engine->new_job_cond);
+ g_cond_free (engine->idle_cond);
+ g_cond_free (engine->exit_cond);
+ g_queue_free (engine->jobs);
+}
+
+void
+engine_append_job (Engine *engine,
+ EngineFunc job,
+ gpointer data)
+{
+ engine_lock (engine);
+
+ g_queue_push_tail (engine->jobs, job);
+ g_queue_push_tail (engine->jobs, data);
+
+ g_cond_signal (engine->new_job_cond);
+
+ engine_unlock (engine);
+}
+
+void
+engine_prepend_job (Engine *engine,
+ EngineFunc job,
+ gpointer data)
+{
+ engine_lock (engine);
+
+ g_queue_push_head (engine->jobs, data);
+ g_queue_push_head (engine->jobs, job);
+
+ g_cond_signal (engine->new_job_cond);
+
+ engine_unlock (engine);
+}
+
+static gpointer
+engine_thread (gpointer data)
+{
+ Engine *engine = data;
+
+ engine_lock (engine);
+
+ while (!engine->quitting)
+ {
+ if (!engine_n_jobs (engine))
+ {
+ engine->n_idle_threads++;
+
+ g_cond_signal (engine->idle_cond);
+
+ g_cond_wait (engine->new_job_cond, engine->mutex);
+
+ engine->n_idle_threads--;
+ }
+
+ if (engine->quitting)
+ break;
+
+ if (engine_n_jobs (engine))
+ {
+ EngineFunc job;
+ gpointer data;
+
+ job = g_queue_pop_head (engine->jobs);
+ data = g_queue_pop_head (engine->jobs);
+
+ engine_unlock (engine);
+
+ job (data);
+
+ engine_lock (engine);
+ }
+ }
+
+ engine->n_exited_threads++;
+
+ g_cond_signal (engine->exit_cond);
+
+ engine_unlock (engine);
+
+ return NULL;
+}
+
+static void
+ensure_wake_up (Engine *engine)
+{
+ if (engine->polling)
+ {
+ char c = '!';
+
+ write (engine->write_fd, &c, 1);
+ engine->polling = FALSE;
+ }
+}
+
+typedef struct
+{
+ int fd;
+ EngineFdFunc func;
+ gpointer data;
+} EngineAddFd;
+
+typedef struct
+{
+ int fd;
+ EPollEventType mask;
+} EngineChangeFd;
+
+typedef struct
+{
+ int fd;
+} EngineRemoveFd;
+
+static void
+engine_queue_add_fd (Engine *engine,
+ int fd,
+ EngineFdFunc func,
+ gpointer data)
+{
+ EngineAddFd *add_fd = g_new (EngineAddFd, 1);
+
+ engine_lock (engine);
+
+ add_fd->fd = fd;
+ add_fd->func = func;
+ add_fd->data = data;
+
+ g_queue_push_tail (engine->fd_actions, add_fd);
+
+ ensure_wake_up (engine);
+
+ engine_unlock (engine);
+}
+
+static void
+engine_queue_change_fd (Engine *engine,
+ int fd,
+ EPollEventType mask)
+{
+ EngineChangeFd *change_fd = g_new (EngineChangeFd, 1);
+
+ engine_lock (engine);
+
+ change_fd->fd = fd;
+ change_fd->mask = mask;
+
+ ensure_wake_up (engine);
+
+ engine_unlock (engine);
+}
+
+static void
+engine_queue_remove_fd (Engine *engine,
+ int fd)
+{
+ EngineRemoveFd *remove_fd = g_new (EngineRemoveFd, 1);
+
+ engine_lock (engine);
+
+ remove_fd->fd = fd;
+
+ ensure_wake_up (engine);
+
+ engine_unlock (engine);
+}
+
+/*
+ * Context
+ */
+static void
+context_lock (Context *context)
+{
+ g_mutex_lock (context->mutex);
+}
+
+static void
+context_unlock (Context *context)
+{
+ g_mutex_unlock (context->mutex);
+}
+
+Context *
+context_new (Engine *engine,
+ gpointer data)
+{
+ Context *context = g_new0 (Context, 1);
+
+ context->engine = engine;
+ context->data = data;
+ context->mutex = g_mutex_new ();
+
+ return context;
+}
+
+void
+context_free (Context *context)
+{
+ context_lock (context);
+
+ context_unlock (context);
+
+ g_mutex_free (context->mutex);
+ g_free (context);
+}
+
+Engine *
+context_get_engine (Context *context)
+{
+ Engine *result;
+
+ context_lock (context);
+
+ result = context->engine;
+
+ context_unlock (context);
+
+ return result;
+}
+
+gpointer
+context_get_data (Context *context)
+{
+ gpointer result;
+
+ context_lock (context);
+
+ result = context->data;
+
+ context_unlock (context);
+
+ return result;
+}
+
+static void
+on_fd_event (const EPollEvent *event,
+ gpointer data)
+{
+ Context *context = data;
+
+ context_lock (context);
+
+ /* find fd in context */
+ /* add relevant callbacks to queue */
+
+ context_unlock (context);
+}
+
+void
+context_add_fd (Context *context,
+ int fd)
+{
+ context_lock (context);
+
+ engine_queue_add_fd (context->engine, fd, on_fd_event, context);
+
+ context_unlock (context);
+}
+
+void
+context_remove_fd (Context *context,
+ int fd)
+{
+ context_lock (context);
+
+ engine_queue_remove_fd (context->engine, fd);
+
+ context_unlock (context);
+}
+
+void
+context_set_write (Context *context,
+ int fd,
+ ContextFdFunc callback)
+{
+ EPollEventType mask;
+
+ context_lock (context);
+
+ mask = 0; /* FIXME */
+
+ engine_queue_change_fd (context->engine, fd, mask);
+
+ context_unlock (context);
+}
+
+void
+context_set_read (Context *context,
+ int fd,
+ ContextFdFunc callback)
+{
+ EPollEventType mask;
+
+ context_lock (context);
+
+ mask = 0; /* FIXME */
+
+ engine_queue_change_fd (context->engine, fd, mask);
+
+ context_unlock (context);
+}