summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSøren Sandmann <sandmann@redhat.com>2009-08-30 21:50:51 -0400
committerSøren Sandmann <sandmann@redhat.com>2009-08-30 21:50:51 -0400
commite947945d98e54fbd72c7764418b6556c7df7c7cc (patch)
tree88854705a9aa8b78a65f130f71ad3b1a5b89031b
Initial checkinHEADmaster
-rw-r--r--TODO138
-rwxr-xr-xbuild.sh3
-rw-r--r--bytequeue.c191
-rw-r--r--bytequeue.h63
-rw-r--r--connection.h43
-rw-r--r--engine.c514
-rw-r--r--engine.h71
-rw-r--r--epoll.c192
-rw-r--r--epoll.h41
-rw-r--r--executor.c178
-rw-r--r--executor.h18
-rw-r--r--hest.c52
-rw-r--r--http-server.c59
-rw-r--r--jobqueue.c180
-rw-r--r--jobqueue.h26
-rw-r--r--locking21
-rw-r--r--main.c7
-rw-r--r--maincontext.c175
-rw-r--r--maincontext.h47
-rw-r--r--mainloop.c142
-rw-r--r--mainloop.h67
-rw-r--r--mainloop2.c238
-rw-r--r--mainloop2.h67
-rw-r--r--test.c60
24 files changed, 2593 insertions, 0 deletions
diff --git a/TODO b/TODO
new file mode 100644
index 0000000..91007e2
--- /dev/null
+++ b/TODO
@@ -0,0 +1,138 @@
+New, simpler design:
+
+There is an 'Engine' which is both Executor and Mainloop. It uses an
+Epoll object to handle the polling. This epoll object does not do any
+locking by itself.
+
+Contexts can be created; they are like jobqueues, except they also
+allow you to add fd's and idles and timeouts. They can do that since
+they have access to the engine. These are completely unlocked; if you
+want to access a context from more than one thread, you must lock them
+yourself. It is guaranteed that only one callback will be active for a
+context at the same time. From within that callback it is then safe to
+remove other callbacks etc. In general such a context can act pretty
+much like one thread.
+
+A web server will consist of a listening context, which will create
+other contexts as necessary and add the fd's to them with the
+appropriate callback. Or maybe just call back with the new fd. I guess
+there should be a generic listener object that will create its own
+context and call back. Then an http module can just create a listener
+and do whatever it wants in response.
+
+One possible problem. The thread that creates a context and adds the
+fd not necessarily the one that will execute the first callback. Can
+these interfere? Maybe the solution is just that the creating thread
+should not do anything with the context after it adds the fd. A
+better idea may be to just have a context_dispatch() function that you
+must call after creating the context. Until this is called, nothing
+the context is not active.
+
+Beginning of this new design is implemented and can be built with
+build2.sh.
+
+
+
+Old design (referenced above)
+
+Highlevel design for a web server:
+
+ - Executor
+ takes callbacks and executes them, possibly in parallell
+ or out of order. The callbacks are likely to be called in
+ a different thread than the one that queued them.
+
+ Should support both push_back() and push_front(). Push_front()
+ is needed to do parallel requests with good latency. If
+ a request can be parallellized, the parallel tasks should
+ be put at the front of the queue.
+
+ - EPoll
+ Simple class that wraps epoll()/poll(). Note: must support
+ oneshot polling and rearming.
+
+ - JobQueue
+ Maintain a queue of jobs, uses executor to execute them
+ in order. Two jobs in the same JobQueue will never run
+ at the same time.
+
+ - Mainloop
+ Uses executor, epoll.
+ - Takes filedescriptors and callbacks. calls
+ back when descriptor is readable/writable/etc.
+ - The callbacks are put on a job queue, which is passed in
+ - Each callback is a oneshot - ie., after calling, the
+ filedescriptor is not polled again until it is rearmed.
+ The mainloop will have a method to rearm filedescriptors.
+ This ensures that the mainloop can start a new poll()
+ whenever it wants without waiting for all the callbacks
+ to finish.
+ - Also takes timeouts that can be canceled. Timeouts also
+ need to be put on a queue.
+
+ Sketch of implementation:
+
+ polljob()
+ {
+ timeout = compute min_timeout ();
+ if (timeout > 0)
+ poll_armed_describtors(timeout);
+ call all timeouts (ie., put them on queues);
+ call all callbacks (ie., put them on queues);
+ schedule (polljob);
+ }
+
+ - MainContext
+
+ - The thing clients will deal with
+
+ - Uses the main loop
+
+ - Filedescriptors
+ maincontext is responsible for rearming the
+ descriptor after the client callback has been
+ called.
+
+ - Timeouts
+
+ - Idle handlers
+
+ - Everything associated with a main context happens
+ serialized - ie., as if only one thread executed it.
+
+ - This means a client structure doesn't need to be locked.
+
+ - Has a get_executor() method so that clients can parallelize
+ if they want to.
+
+ - Current thinking is that if we have main contexts, who
+ really needs a main loop? The only thing you could do
+ with it is to pass it to main contexts.
+
+ OTOH that's true of several of the objects here.
+ see notes at top of maincontext.c
+
+ - Worth noting that stuff that has to be passed in to
+ create a client object must be available to the
+ listener callback. But see http_server.c for an example
+
+ - Err, the epoll in itself is not enough for maincontext,
+ since it needs to be shared between maincontexts. Ie.,
+ who would call epoll_wait(). We do really need a main
+ loop that will call back.
+
+ - Listener
+ Listens on a port. Calls back with a file descriptor when
+ someone connects.
+
+ - Connection
+ initialized with a MainContext and a filedescriptor.
+ Creates events when something happens
+
+ - Http:
+ Has a Connection
+ parses http, emits events such as
+ "get hostname pagename query"
+ "post etc etc etc"
+
+ - ContentProvider
diff --git a/build.sh b/build.sh
new file mode 100755
index 0000000..43e482f
--- /dev/null
+++ b/build.sh
@@ -0,0 +1,3 @@
+gcc -Wall `pkg-config --cflags --libs glib-2.0 gthread-2.0 lac-1.0` \
+ epoll.c jobqueue.c executor.c mainloop.c \
+ maincontext.c bytequeue.c test.c
diff --git a/bytequeue.c b/bytequeue.c
new file mode 100644
index 0000000..538ba42
--- /dev/null
+++ b/bytequeue.c
@@ -0,0 +1,191 @@
+#include <string.h>
+
+#include "bytequeue.h"
+
+struct ByteQueue
+{
+ gsize segment_size;
+ guint8 * segment;
+ guint8 * start;
+ guint8 * end;
+};
+
+ByteQueue *
+byte_queue_new (void)
+{
+ ByteQueue *queue = g_new0 (ByteQueue, 1);
+
+ queue->segment_size = 0;
+ queue->segment = NULL;
+ queue->start = NULL;
+ queue->end = NULL;
+
+ return queue;
+}
+
+guint8 *
+byte_queue_free (ByteQueue *queue,
+ gboolean free_data)
+{
+ guint8 *result;
+
+ if (free_data)
+ {
+ g_free (queue->segment);
+
+ result = NULL;
+ }
+ else
+ {
+ memmove (queue->segment, queue->start, queue->end - queue->start);
+
+ result = queue->segment;
+ }
+
+ g_free (queue);
+
+ return result;
+}
+
+/* The data returned is owned by the byte queue and becomes invalid
+ * as soon as any method is called on the queue. It is explicitly
+ * allowed to push the returned data back into the queue, and indeed
+ * in that case the queue will avoid copying if it can. The push
+ * must be the first method called on the queue after the read.
+ */
+const guint8 *
+byte_queue_get_data (ByteQueue *queue,
+ gsize *n_bytes)
+{
+ guint8 *result;
+
+ if (n_bytes)
+ *n_bytes = queue->end - queue->start;
+
+ result = queue->start;
+
+ queue->start = queue->segment;
+ queue->end = queue->segment;
+
+ return result;
+}
+
+static gboolean
+in_segment (ByteQueue *queue,
+ const guint8 *bytes,
+ gsize n_bytes)
+{
+ return bytes >= queue->segment && bytes + n_bytes < queue->end;
+}
+
+static gboolean
+is_empty (ByteQueue *queue)
+{
+ return queue->start == queue->end;
+}
+
+static gsize
+power_of_two_bigger_than (gsize n)
+{
+ gsize result = 1;
+
+ while (result <= n)
+ result *= 2;
+
+ return result;
+}
+
+static void
+ensure_room (ByteQueue *queue,
+ gsize extra)
+{
+ gsize old_data_size = queue->end - queue->start;
+ gsize new_data_size = old_data_size + extra;
+
+ if (queue->end + new_data_size > queue->segment + queue->segment_size)
+ {
+ gsize new_segment_size = power_of_two_bigger_than (2 * new_data_size);
+
+ memmove (queue->start, queue->segment, old_data_size);
+
+ if (new_segment_size > queue->segment_size)
+ {
+ queue->segment_size = new_segment_size;
+ queue->segment = g_realloc (queue->segment, new_segment_size);
+ }
+
+ queue->start = queue->segment;
+ queue->end = queue->start + new_data_size;
+ }
+}
+
+guint8 *
+byte_queue_alloc_tail (ByteQueue *queue,
+ gsize size)
+{
+ ensure_room (queue, size);
+
+ queue->end += size;
+
+ return queue->end - size;
+}
+
+void
+byte_queue_delete_tail (ByteQueue *queue,
+ gsize size)
+{
+ if (queue->end - queue->start < size)
+ queue->end = queue->start;
+ else
+ queue->end -= size;
+}
+
+void
+byte_queue_append (ByteQueue *queue,
+ const guint8 *bytes,
+ gsize n_bytes)
+{
+ if (in_segment (queue, bytes, n_bytes) && is_empty (queue))
+ {
+ queue->start = (guint8 *)bytes;
+ queue->end = (guint8 *)bytes + n_bytes;
+ }
+ else
+ {
+ guint8 *tail = byte_queue_alloc_tail (queue, n_bytes);
+
+ memcpy (tail, bytes, n_bytes);
+ }
+}
+
+/* Transfer data from @src to @dest, if possible without copying.
+ * The data is appended to @dest's data if @dest is not empty
+ */
+void
+byte_queue_steal_data (ByteQueue *dest,
+ ByteQueue *src)
+{
+ if (is_empty (dest))
+ {
+ if (dest->segment)
+ g_free (dest->segment);
+
+ dest->segment_size = src->segment_size;
+ dest->segment = src->segment;
+ dest->start = src->start;
+ dest->end = src->end;
+
+ src->segment_size = 0;
+ src->segment = NULL;
+ src->start = NULL;
+ src->end = NULL;
+ }
+ else
+ {
+ const guint8 *data;
+ gsize size;
+
+ data = byte_queue_get_data (src, &size);
+ byte_queue_append (dest, data, size);
+ }
+}
diff --git a/bytequeue.h b/bytequeue.h
new file mode 100644
index 0000000..c789a9a
--- /dev/null
+++ b/bytequeue.h
@@ -0,0 +1,63 @@
+#ifndef _BYTEQUEUE_H
+#define _BYTEQUEUE_H
+
+#include <glib.h>
+
+typedef struct ByteQueue ByteQueue;
+
+ByteQueue *byte_queue_new (void);
+guint8 *byte_queue_free (ByteQueue *queue,
+ gboolean free_data);
+gsize byte_queue_get_length (ByteQueue *queue);
+
+/* The data returned is owned by the byte queue and becomes invalid
+ * as soon as any method is called on the queue. It is explicitly
+ * allowed to call byte_queue_append() with the returned data, and
+ * in that case the queue will avoid copying if it can. The append
+ * must be the first method called on the queue after the read.
+ */
+const guint8 *byte_queue_get_data (ByteQueue *queue,
+ gsize *n_bytes);
+void byte_queue_put_back (ByteQueue *queue,
+ const guint8 *bytes,
+ gsize n_bytes);
+void byte_queue_append (ByteQueue *queue,
+ const guint8 *bytes,
+ gsize n_bytes);
+/* This function appends uninitialized data of length @size
+ * to the queue, then returns a pointer to the added data.
+ * The intention is that the app can read() into this
+ * memory, then use byte_queue_pop_tail() to delete the
+ * area that wasn't read into. Example
+ *
+ * guint8 *area = byte_queue_alloc_tail (queue, 8192);
+ *
+ * n_read = read (fd, area, 8192);
+ *
+ * byte_queue_delete_tail (queue, 8192 - (n_read < 0)? 0 : n_read);
+ *
+ * if (n_read < 0)
+ * {
+ * byte_queue_delete_tail (queue, 8192);
+ * handle_error();
+ * n_read = 0;
+ * }
+ * else
+ * {
+ * byte_queue_delete_tail (8192 - n_read);
+ *
+ * // enjoy the new data in the queue
+ * }
+ */
+
+guint8 *byte_queue_alloc_tail (ByteQueue *queue,
+ gsize size);
+void byte_queue_delete_tail (ByteQueue *queue,
+ gsize size);
+
+/* Transfer data from @src to @dest without copying
+ */
+void byte_queue_steal_data (ByteQueue *dest,
+ ByteQueue *src);
+
+#endif
diff --git a/connection.h b/connection.h
new file mode 100644
index 0000000..9d6322b
--- /dev/null
+++ b/connection.h
@@ -0,0 +1,43 @@
+#ifndef _CONNECTION_H_
+#define _CONNECTION_H_
+
+typedef struct Connection Connection;
+typedef union ConnectionEvent ConnectionEvent;
+
+typedef enum
+{
+ CONNECTION_READ,
+ CONNECTION_CLOSE,
+ CONNECTION_ERROR
+} ConnectionEventType;
+
+union ConnectionEvent
+{
+ ConnectionEventType type;
+
+ struct
+ {
+ ConnectionEventType type;
+ } read;
+
+ struct
+ {
+ ConnectionEventType type;
+ } close;
+
+ struct
+ {
+ ConnectionEventType type;
+ } error;
+};
+
+typedef void (* ConnectionFunc) (Connection *connection,
+ ConnectionEvent *event);
+
+Connection *connection_new (int fd,
+ MainContext *context,
+ ConnectionFunc func,
+ gpointer data);
+
+
+#endif
diff --git a/engine.c b/engine.c
new file mode 100644
index 0000000..4da6ff4
--- /dev/null
+++ b/engine.c
@@ -0,0 +1,514 @@
+#include <unistd.h>
+#include "engine.h"
+#include "epoll.h"
+
+typedef struct FdInfo FdInfo;
+typedef struct FdAction FdAction;
+
+struct FdInfo
+{
+};
+
+struct Context
+{
+ Engine *engine;
+ gpointer data;
+ GMutex *mutex;
+};
+
+typedef void (* EngineFdFunc) (const EPollEvent *event, gpointer data);
+
+struct FdAction
+{
+ int fd;
+ EPollEventType mask;
+ EngineFdFunc func;
+ gpointer data;
+};
+
+struct Engine
+{
+ gboolean quitting;
+
+ GMutex * mutex;
+
+ GCond * new_job_cond;
+ GCond * idle_cond;
+ GCond * exit_cond;
+
+ GQueue * jobs;
+
+ int n_threads;
+ int n_idle_threads;
+ int n_exited_threads;
+
+ GQueue * fd_actions;
+
+ /* polling */
+ gboolean polling;
+ EPoll * epoll;
+
+ int read_fd;
+ int write_fd;
+};
+
+static gpointer engine_thread (gpointer);
+
+/*
+ * Engine
+ */
+
+static void
+engine_lock (Engine *engine)
+{
+ g_mutex_lock (engine->mutex);
+}
+
+static void
+engine_unlock (Engine *engine)
+{
+ g_mutex_unlock (engine->mutex);
+}
+
+static void
+make_pipe (int *rd, int *wr)
+{
+ int fds[2];
+
+ if (pipe (fds) < 0)
+ g_error ("Could not create wake-up pipe\n");
+
+ *rd = fds[0];
+ *wr = fds[1];
+}
+
+static void
+poll_job (gpointer data)
+{
+ Engine *engine = data;
+ int i, n_events;
+ EPollEvent *events;
+ GList *list;
+
+ engine_lock (engine);
+
+ for (list = engine->fd_actions->head; list != NULL; list = list->next)
+ {
+ FdAction *action = list->data;
+ EPollEventType mask;
+
+ }
+
+ engine->polling = TRUE;
+
+ engine_unlock (engine);
+
+ events = epoll_wait (engine->epoll, &n_events, -1);
+
+ for (i = 0; i < n_events; ++i)
+ {
+#if 0
+ FdInfo *info;
+#endif
+ EPollEvent *event = &(events[i]);
+
+#if 0
+ info = &(engine->fd_infos[event->fd]);
+#endif
+ }
+
+ g_free (events);
+
+ engine_append_job (engine, poll_job, engine);
+}
+
+Engine *
+engine_new (guint n_threads)
+{
+ Engine *engine;
+ int i;
+
+ g_return_val_if_fail (n_threads > 0, NULL);
+
+ if (!g_thread_supported())
+ g_thread_init (NULL);
+
+ engine = g_new0 (Engine, 1);
+
+ engine->mutex = g_mutex_new ();
+ engine->jobs = g_queue_new ();
+
+ engine->new_job_cond = g_cond_new ();
+ engine->idle_cond = g_cond_new ();
+ engine->exit_cond = g_cond_new ();
+ engine->n_idle_threads = 0;
+ engine->n_exited_threads = 0;
+ engine->quitting = FALSE;
+ engine->n_threads = n_threads;
+
+ engine->epoll = epoll_new ();
+
+ engine->polling = FALSE;
+
+ make_pipe (&engine->read_fd, &engine->write_fd);
+ epoll_add_fd (engine->epoll, engine->read_fd,
+ EPOLL_READ | EPOLL_ERROR, engine);
+
+ engine->fd_actions = g_queue_new ();
+
+ engine_lock (engine);
+
+ for (i = 0; i < n_threads; ++i)
+ g_thread_create (engine_thread, engine, TRUE, NULL);
+
+ engine_unlock (engine);
+
+ engine_append_job (engine, poll_job, engine);
+
+ return engine;
+}
+
+static guint
+engine_n_jobs (Engine *engine)
+{
+ g_assert (g_queue_get_length (engine->jobs) % 2 == 0);
+
+ return (g_queue_get_length (engine->jobs)) / 2;
+}
+
+/* Wait until all previously submitted jobs have finished */
+void
+engine_sync (Engine *engine)
+{
+ engine_lock (engine);
+
+ g_cond_broadcast (engine->new_job_cond);
+
+ while (!(engine->n_idle_threads == engine->n_threads &&
+ engine_n_jobs (engine) == 0))
+ {
+ g_cond_wait (engine->idle_cond, engine->mutex);
+ }
+
+ engine_unlock (engine);
+}
+
+void
+engine_free (Engine *engine)
+{
+ engine_lock (engine);
+
+ engine->quitting = TRUE;
+
+ g_cond_broadcast (engine->new_job_cond);
+
+ while (engine->n_exited_threads != engine->n_threads)
+ g_cond_wait (engine->exit_cond, engine->mutex);
+
+ engine_unlock (engine);
+
+ while (!g_queue_is_empty (engine->jobs))
+ g_queue_pop_head (engine->jobs);
+
+ g_mutex_free (engine->mutex);
+ g_cond_free (engine->new_job_cond);
+ g_cond_free (engine->idle_cond);
+ g_cond_free (engine->exit_cond);
+ g_queue_free (engine->jobs);
+}
+
+void
+engine_append_job (Engine *engine,
+ EngineFunc job,
+ gpointer data)
+{
+ engine_lock (engine);
+
+ g_queue_push_tail (engine->jobs, job);
+ g_queue_push_tail (engine->jobs, data);
+
+ g_cond_signal (engine->new_job_cond);
+
+ engine_unlock (engine);
+}
+
+void
+engine_prepend_job (Engine *engine,
+ EngineFunc job,
+ gpointer data)
+{
+ engine_lock (engine);
+
+ g_queue_push_head (engine->jobs, data);
+ g_queue_push_head (engine->jobs, job);
+
+ g_cond_signal (engine->new_job_cond);
+
+ engine_unlock (engine);
+}
+
+static gpointer
+engine_thread (gpointer data)
+{
+ Engine *engine = data;
+
+ engine_lock (engine);
+
+ while (!engine->quitting)
+ {
+ if (!engine_n_jobs (engine))
+ {
+ engine->n_idle_threads++;
+
+ g_cond_signal (engine->idle_cond);
+
+ g_cond_wait (engine->new_job_cond, engine->mutex);
+
+ engine->n_idle_threads--;
+ }
+
+ if (engine->quitting)
+ break;
+
+ if (engine_n_jobs (engine))
+ {
+ EngineFunc job;
+ gpointer data;
+
+ job = g_queue_pop_head (engine->jobs);
+ data = g_queue_pop_head (engine->jobs);
+
+ engine_unlock (engine);
+
+ job (data);
+
+ engine_lock (engine);
+ }
+ }
+
+ engine->n_exited_threads++;
+
+ g_cond_signal (engine->exit_cond);
+
+ engine_unlock (engine);
+
+ return NULL;
+}
+
+static void
+ensure_wake_up (Engine *engine)
+{
+ if (engine->polling)
+ {
+ char c = '!';
+
+ write (engine->write_fd, &c, 1);
+ engine->polling = FALSE;
+ }
+}
+
+typedef struct
+{
+ int fd;
+ EngineFdFunc func;
+ gpointer data;
+} EngineAddFd;
+
+typedef struct
+{
+ int fd;
+ EPollEventType mask;
+} EngineChangeFd;
+
+typedef struct
+{
+ int fd;
+} EngineRemoveFd;
+
+static void
+engine_queue_add_fd (Engine *engine,
+ int fd,
+ EngineFdFunc func,
+ gpointer data)
+{
+ EngineAddFd *add_fd = g_new (EngineAddFd, 1);
+
+ engine_lock (engine);
+
+ add_fd->fd = fd;
+ add_fd->func = func;
+ add_fd->data = data;
+
+ g_queue_push_tail (engine->fd_actions, add_fd);
+
+ ensure_wake_up (engine);
+
+ engine_unlock (engine);
+}
+
+static void
+engine_queue_change_fd (Engine *engine,
+ int fd,
+ EPollEventType mask)
+{
+ EngineChangeFd *change_fd = g_new (EngineChangeFd, 1);
+
+ engine_lock (engine);
+
+ change_fd->fd = fd;
+ change_fd->mask = mask;
+
+ ensure_wake_up (engine);
+
+ engine_unlock (engine);
+}
+
+static void
+engine_queue_remove_fd (Engine *engine,
+ int fd)
+{
+ EngineRemoveFd *remove_fd = g_new (EngineRemoveFd, 1);
+
+ engine_lock (engine);
+
+ remove_fd->fd = fd;
+
+ ensure_wake_up (engine);
+
+ engine_unlock (engine);
+}
+
+/*
+ * Context
+ */
+static void
+context_lock (Context *context)
+{
+ g_mutex_lock (context->mutex);
+}
+
+static void
+context_unlock (Context *context)
+{
+ g_mutex_unlock (context->mutex);
+}
+
+Context *
+context_new (Engine *engine,
+ gpointer data)
+{
+ Context *context = g_new0 (Context, 1);
+
+ context->engine = engine;
+ context->data = data;
+ context->mutex = g_mutex_new ();
+
+ return context;
+}
+
+void
+context_free (Context *context)
+{
+ context_lock (context);
+
+ context_unlock (context);
+
+ g_mutex_free (context->mutex);
+ g_free (context);
+}
+
+Engine *
+context_get_engine (Context *context)
+{
+ Engine *result;
+
+ context_lock (context);
+
+ result = context->engine;
+
+ context_unlock (context);
+
+ return result;
+}
+
+gpointer
+context_get_data (Context *context)
+{
+ gpointer result;
+
+ context_lock (context);
+
+ result = context->data;
+
+ context_unlock (context);
+
+ return result;
+}
+
+static void
+on_fd_event (const EPollEvent *event,
+ gpointer data)
+{
+ Context *context = data;
+
+ context_lock (context);
+
+ /* find fd in context */
+ /* add relevant callbacks to queue */
+
+ context_unlock (context);
+}
+
+void
+context_add_fd (Context *context,
+ int fd)
+{
+ context_lock (context);
+
+ engine_queue_add_fd (context->engine, fd, on_fd_event, context);
+
+ context_unlock (context);
+}
+
+void
+context_remove_fd (Context *context,
+ int fd)
+{
+ context_lock (context);
+
+ engine_queue_remove_fd (context->engine, fd);
+
+ context_unlock (context);
+}
+
+void
+context_set_write (Context *context,
+ int fd,
+ ContextFdFunc callback)
+{
+ EPollEventType mask;
+
+ context_lock (context);
+
+ mask = 0; /* FIXME */
+
+ engine_queue_change_fd (context->engine, fd, mask);
+
+ context_unlock (context);
+}
+
+void
+context_set_read (Context *context,
+ int fd,
+ ContextFdFunc callback)
+{
+ EPollEventType mask;
+
+ context_lock (context);
+
+ mask = 0; /* FIXME */
+
+ engine_queue_change_fd (context->engine, fd, mask);
+
+ context_unlock (context);
+}
diff --git a/engine.h b/engine.h
new file mode 100644
index 0000000..b590053
--- /dev/null
+++ b/engine.h
@@ -0,0 +1,71 @@
+#include <glib.h>
+
+typedef struct Engine Engine;
+
+typedef void (* EngineFunc) (gpointer data);
+
+Engine *engine_new (guint n_threads);
+void engine_prepend_job (Engine *engine,
+ EngineFunc func,
+ gpointer data);
+void engine_append_job (Engine *engine,
+ EngineFunc func,
+ gpointer data);
+void engine_sync (Engine *engine);
+void engine_free (Engine *engine);
+
+/* All functions executed within an Context
+ * are run serialized, ie., two functions from the
+ * same context never run at the same time.
+ *
+ * So if the data those functions operate on (typically
+ * a client structure) are only ever used by functions queued
+ * through the context, no locking is necessary.
+ *
+ * Note that if more than one thread will be accessing
+ * a context, locking is necessary. It is not generally
+ * allowed to call context_* on the same context from more
+ * than one thread without locking.
+ */
+typedef struct Context Context;
+
+typedef void (* ContextFdFunc) (Context *context,
+ int fd);
+typedef void (* ContextFdRemovedFunc) (Context *context,
+ int fd,
+ gpointer data);
+
+Context *context_new (Engine *engine,
+ gpointer data);
+void context_dispatch (Context *context);
+Engine * context_get_engine (Context *context);
+gpointer context_get_data (Context *context);
+
+/* fd */
+void context_add_fd (Context *contenxt,
+ int fd);
+/* Note that removing an fd from the context also closes it.
+ * Yeah, that's a little unusual, but the alternative would
+ * be to add a notification that the fd was no longer in
+ * any outstanding poll() routines.
+ */
+void context_remove_fd (Context *context,
+ int fd);
+void context_set_write (Context *contenxt,
+ int fd,
+ ContextFdFunc callback);
+void context_set_read (Context *context,
+ int fd,
+ ContextFdFunc callback);
+
+/* other */
+void context_add_idle (Context *context,
+ EngineFunc func,
+ gpointer data);
+void context_add_timeout (Context *context,
+ guint timeout_ms,
+ EngineFunc func,
+ gpointer data);
+void context_add_task (Context *context,
+ EngineFunc func,
+ gpointer data);
diff --git a/epoll.c b/epoll.c
new file mode 100644
index 0000000..a464cc7
--- /dev/null
+++ b/epoll.c
@@ -0,0 +1,192 @@
+#include "epoll.h"
+#include <sys/poll.h>
+
+typedef struct FdInfo FdInfo;
+
+struct FdInfo
+{
+ /* FIXME: we might want to compress this struct at some point.
+ * Though, if we have 100000 fd's, it's only 1.6 MB, which is
+ * almost nothing for that many clients, Still, cache use
+ * would improve.
+ */
+ gboolean valid;
+ gboolean disabled;
+ EPollEventType mask;
+ gpointer data;
+};
+
+struct EPoll
+{
+ int n_fds;
+ gsize n_fd_infos;
+ FdInfo * fd_info;
+};
+
+EPoll *
+epoll_new (void)
+{
+ EPoll *epoll = g_new0 (EPoll, 1);
+
+ epoll->n_fds = 0;
+ epoll->n_fd_infos = 1;
+ epoll->fd_info = g_new0 (FdInfo, 1);
+
+ return epoll;
+}
+
+void
+epoll_add_fd (EPoll *epoll,
+ int fd,
+ EPollEventType mask,
+ gpointer data)
+{
+ g_return_if_fail (!epoll_has_fd (epoll, fd));
+
+ while (fd >= epoll->n_fd_infos)
+ epoll->n_fd_infos *= 2;
+
+ epoll->fd_info = g_renew (FdInfo, epoll->fd_info, epoll->n_fd_infos);
+
+ epoll->n_fds++;
+
+ epoll->fd_info[fd].valid = TRUE;
+ epoll->fd_info[fd].mask = mask;
+ epoll->fd_info[fd].data = data;
+}
+
+gboolean
+epoll_has_fd (EPoll *epoll,
+ int fd)
+{
+ g_return_val_if_fail (epoll != NULL, FALSE);
+
+ return (fd < epoll->n_fd_infos && epoll->fd_info[fd].valid);
+}
+
+gint
+epoll_get_n_fds (EPoll *epoll)
+{
+ return epoll->n_fds;
+}
+
+void
+epoll_remove_fd (EPoll *epoll,
+ int fd)
+{
+ g_return_if_fail (epoll_has_fd (epoll, fd));
+
+ epoll->fd_info[fd].valid = FALSE;
+}
+
+gpointer
+epoll_get_fd_data (EPoll *epoll,
+ int fd)
+{
+ g_return_val_if_fail (epoll != NULL, NULL);
+ g_return_val_if_fail (epoll_has_fd (epoll, fd), NULL);
+
+ return epoll->fd_info[fd].data;
+}
+
+/* epoll_wait() is a one-shot polling mechanism, so this function
+ * must be called to reenable fd's after they have been returned
+ * by epoll_wait().
+ */
+void
+epoll_reenable_fd (EPoll *epoll,
+ int fd)
+{
+ g_return_if_fail (epoll_has_fd (epoll, fd));
+
+ epoll->fd_info[fd].disabled = FALSE;
+}
+
+EPollEvent *
+epoll_wait (EPoll *epoll,
+ int *n_events,
+ int timeout)
+{
+ struct pollfd *fds;
+ int i, j;
+ int n_poll_fds;
+ int n_ready;
+ EPollEvent *events;
+
+ fds = g_new (struct pollfd, epoll->n_fds);
+
+ j = 0;
+ for (i = 0; i < epoll->n_fd_infos; ++i)
+ {
+ if (epoll->fd_info[i].valid &&
+ !epoll->fd_info[i].disabled &&
+ epoll->fd_info[i].mask != 0)
+ {
+ fds[j].events = 0;
+
+ if (epoll->fd_info[i].mask & EPOLL_READ)
+ fds[j].events |= POLLIN;
+
+ if (epoll->fd_info[i].mask & EPOLL_WRITE)
+ fds[j].events |= POLLOUT;
+
+ if (epoll->fd_info[i].mask & EPOLL_HANGUP)
+ fds[j].events |= POLLHUP;
+
+ if (epoll->fd_info[i].mask & EPOLL_PRIORITY)
+ fds[j].events |= POLLPRI;
+
+ if (epoll->fd_info[i].mask & EPOLL_ERROR)
+ fds[j].events |= POLLERR;
+
+ fds[j].fd = i;
+
+ j++;
+ }
+ }
+
+ n_poll_fds = j;
+
+ events = NULL;
+
+ n_ready = poll (fds, n_poll_fds, timeout);
+
+ if (n_ready)
+ {
+ events = g_new0 (EPollEvent, n_ready);
+
+ j = 0;
+ for (i = 0; i < n_poll_fds; ++i)
+ {
+ if (fds[i].revents)
+ {
+ events[j].events = 0;
+
+ if (fds[i].revents & POLLIN)
+ events[j].events |= EPOLL_READ;
+
+ if (fds[i].revents & POLLOUT)
+ events[j].events |= EPOLL_WRITE;
+
+ if (fds[i].revents & POLLHUP)
+ events[j].events |= EPOLL_HANGUP;
+
+ if (fds[i].revents & POLLERR)
+ events[j].events |= EPOLL_ERROR;
+
+ if (fds[i].revents & POLLPRI)
+ events[j].events |= EPOLL_PRIORITY;
+
+ events[j].fd = fds[i].fd;
+ epoll->fd_info[fds[i].fd].disabled = TRUE;
+ }
+ }
+ }
+
+ g_free (fds);
+
+ if (n_events)
+ *n_events = n_ready;
+
+ return events;
+}
diff --git a/epoll.h b/epoll.h
new file mode 100644
index 0000000..e16f7ea
--- /dev/null
+++ b/epoll.h
@@ -0,0 +1,41 @@
+#ifndef _EPOLL_H_
+#define _EPOLL_H_
+
+#include <glib.h>
+
+typedef struct EPoll EPoll;
+
+typedef enum
+{
+ EPOLL_READ = 1 << 0,
+ EPOLL_WRITE = 1 << 1,
+ EPOLL_HANGUP = 1 << 2,
+ EPOLL_ERROR = 1 << 3,
+ EPOLL_PRIORITY = 1 << 4,
+} EPollEventType;
+
+typedef struct
+{
+ EPollEventType events;
+ int fd;
+} EPollEvent;
+
+EPoll *epoll_new (void);
+void epoll_add_fd (EPoll *epoll,
+ int fd,
+ EPollEventType mask,
+ gpointer data);
+void epoll_remove_fd (EPoll *epoll,
+ int fd);
+gpointer epoll_get_fd_data (EPoll *epoll,
+ int fd);
+gboolean epoll_has_fd (EPoll *epoll,
+ int fd);
+void epoll_reenable_fd (EPoll *epoll,
+ int fd);
+gint epoll_get_n_fds (EPoll *epoll);
+EPollEvent * epoll_wait (EPoll *epoll,
+ int *n_events,
+ int timeout);
+
+#endif
diff --git a/executor.c b/executor.c
new file mode 100644
index 0000000..a70c248
--- /dev/null
+++ b/executor.c
@@ -0,0 +1,178 @@
+#include "executor.h"
+
+struct Executor
+{
+ gboolean quitting;
+
+ GMutex * mutex;
+
+ GCond * new_job_cond;
+ GCond * idle_cond;
+ GCond * exit_cond;
+
+ GQueue * jobs;
+
+ int n_threads;
+ int n_idle_threads;
+ int n_exited_threads;
+};
+
+static gpointer executor_thread (gpointer);
+
+static void
+executor_lock (Executor *executor)
+{
+ g_mutex_lock (executor->mutex);
+}
+
+static void
+executor_unlock (Executor *executor)
+{
+ g_mutex_unlock (executor->mutex);
+}
+
+Executor *
+executor_new (guint n_threads)
+{
+ Executor *executor;
+ int i;
+
+ g_return_val_if_fail (n_threads > 0, NULL);
+
+ if (!g_thread_supported())
+ g_thread_init (NULL);
+
+ executor = g_new0 (Executor, 1);
+
+ executor->mutex = g_mutex_new ();
+ executor->jobs = g_queue_new ();
+
+ executor->new_job_cond = g_cond_new ();
+ executor->idle_cond = g_cond_new ();
+ executor->exit_cond = g_cond_new ();
+ executor->n_idle_threads = 0;
+ executor->n_exited_threads = 0;
+ executor->quitting = FALSE;
+ executor->n_threads = n_threads;
+
+ executor_lock (executor);
+
+ for (i = 0; i < n_threads; ++i)
+ g_thread_create (executor_thread, executor, TRUE, NULL);
+
+ executor_unlock (executor);
+
+ return executor;
+}
+
+static guint
+executor_n_jobs (Executor *executor)
+{
+ g_assert (g_queue_get_length (executor->jobs) % 2 == 0);
+
+ return (g_queue_get_length (executor->jobs)) / 2;
+}
+
+/* Wait until all previously submitted jobs have finished */
+void
+executor_sync (Executor *executor)
+{
+ executor_lock (executor);
+
+ g_cond_broadcast (executor->new_job_cond);
+
+ while (!(executor->n_idle_threads == executor->n_threads &&
+ executor_n_jobs (executor) == 0))
+ {
+ g_cond_wait (executor->idle_cond, executor->mutex);
+ }
+
+ executor_unlock (executor);
+}
+
+void
+executor_free (Executor *executor)
+{
+ executor_lock (executor);
+
+ executor->quitting = TRUE;
+
+ g_cond_broadcast (executor->new_job_cond);
+
+ while (executor->n_exited_threads != executor->n_threads)
+ g_cond_wait (executor->exit_cond, executor->mutex);
+
+ executor_unlock (executor);
+
+ while (!g_queue_is_empty (executor->jobs))
+ g_queue_pop_head (executor->jobs);
+
+ g_mutex_free (executor->mutex);
+ g_cond_free (executor->new_job_cond);
+ g_cond_free (executor->idle_cond);
+ g_cond_free (executor->exit_cond);
+ g_queue_free (executor->jobs);
+}
+
+void
+executor_add_job (Executor *exe,
+ ExecutorJob job,
+ gpointer data)
+{
+ executor_lock (exe);
+
+ g_queue_push_tail (exe->jobs, job);
+ g_queue_push_tail (exe->jobs, data);
+
+ g_cond_signal (exe->new_job_cond);
+
+ executor_unlock (exe);
+}
+
+static gpointer
+executor_thread (gpointer data)
+{
+ Executor *executor = data;
+
+ executor_lock (executor);
+
+ while (!executor->quitting)
+ {
+ if (!executor_n_jobs (executor))
+ {
+ executor->n_idle_threads++;
+
+ g_cond_signal (executor->idle_cond);
+
+ g_cond_wait (executor->new_job_cond, executor->mutex);
+
+ executor->n_idle_threads--;
+ }
+
+ if (executor->quitting)
+ break;
+
+ if (executor_n_jobs (executor))
+ {
+ ExecutorJob job;
+ gpointer data;
+
+ job = g_queue_pop_head (executor->jobs);
+ data = g_queue_pop_head (executor->jobs);
+
+ executor_unlock (executor);
+
+ job (data);
+
+ executor_lock (executor);
+ }
+ }
+
+ executor->n_exited_threads++;
+
+ g_cond_signal (executor->exit_cond);
+
+ executor_unlock (executor);
+
+ return NULL;
+}
diff --git a/executor.h b/executor.h
new file mode 100644
index 0000000..622a55e
--- /dev/null
+++ b/executor.h
@@ -0,0 +1,18 @@
+#ifndef EXECUTOR_H
+#define EXECUTOR_H
+
+#include <glib.h>
+
+typedef struct Executor Executor;
+
+typedef void (* ExecutorJob) (gpointer data);
+
+Executor *executor_new (guint n_threads);
+void executor_add_job (Executor *exe,
+ ExecutorJob job,
+ gpointer data);
+void executor_free (Executor *executor);
+void executor_sync (Executor *executor);
+
+#endif
+
diff --git a/hest.c b/hest.c
new file mode 100644
index 0000000..2faabe4
--- /dev/null
+++ b/hest.c
@@ -0,0 +1,52 @@
+#include <glib.h>
+#include "executor.h"
+
+G_LOCK_DEFINE (blah);
+
+GList *list;
+
+static void
+the_job (gpointer data)
+{
+ int i;
+
+ G_LOCK (blah);
+
+ for (i = 0; i < 34540; ++i)
+ ;
+
+#if 0
+ g_print ("%d he string is: %p\n",
+ (GPOINTER_TO_INT (data)), g_thread_self());
+#endif
+
+ list = g_list_prepend (list, data);
+
+ G_UNLOCK (blah);
+}
+
+int
+main ()
+{
+ Executor *exe = executor_new (4);
+ int i;
+
+ for (i = 0; i < 200000; ++i)
+ {
+ executor_add_job (exe, the_job, i * 10 + 0);
+ executor_add_job (exe, the_job, i * 10 + 1);
+ executor_add_job (exe, the_job, i * 10 + 2);
+ executor_add_job (exe, the_job, i * 10 + 3);
+ executor_add_job (exe, the_job, i * 10 + 4);
+ executor_add_job (exe, the_job, i * 10 + 5);
+ executor_add_job (exe, the_job, i * 10 + 6);
+ executor_add_job (exe, the_job, i * 10 + 7);
+ executor_add_job (exe, the_job, i * 10 + 8);
+ executor_add_job (exe, the_job, i * 10 + 9);
+ }
+
+ executor_sync (exe);
+ executor_free (exe);
+
+ g_print ("%d\n", g_list_length (list));
+}
diff --git a/http-server.c b/http-server.c
new file mode 100644
index 0000000..cbf490f
--- /dev/null
+++ b/http-server.c
@@ -0,0 +1,59 @@
+static void
+client_free (Client *client)
+{
+ delete (peer);
+ close (client->fd);
+ main_context_free (client->context);
+ connection_destroy (client->connection);
+ g_free (client);
+}
+
+static void
+on_connection_event (connection)
+{
+ Client *client = lac_connection_get_data (client);
+
+ switch (event)
+ {
+ case READ:
+ read_the_stuff();
+ process_it();
+ break;
+
+ case ERROR:
+ connection_close();
+ break;
+
+ case CLOSED:
+ client_free ();
+ break;
+ }
+
+ process_stuff ();
+
+ connection_write_stuff ();
+}
+
+static void
+on_connection (LacListener *listener, int fd, LacAddress *peer, gpointer data)
+{
+ Server *server = lac_listener_get_data (listener);
+ Client *client = g_new (Client, 1);
+ client->peer = peer;
+ client->fd = fd;
+
+ client->context = context_new (server->executor, server->poll, client);
+ client->connection = connection_new (fd, on_connection_event);
+}
+
+Server *
+server_new (int port)
+{
+ server->poll = epll_new();
+ server->executor = executor_new ();
+ server->port = port;
+
+ create_listener (port, on_connection, server);
+
+ return server;
+}
diff --git a/jobqueue.c b/jobqueue.c
new file mode 100644
index 0000000..427b62e
--- /dev/null
+++ b/jobqueue.c
@@ -0,0 +1,180 @@
+#include "jobqueue.h"
+
+typedef struct Job Job;
+
+struct Job
+{
+ JobQueue * queue;
+ ExecutorJob job;
+ gpointer data;
+};
+
+struct JobQueue
+{
+ Executor * executor;
+ GQueue * jobs;
+ GMutex * mutex;
+ gboolean in_executor;;
+ int ref_count;
+};
+
+static Job *
+job_new (JobQueue * queue,
+ ExecutorJob func,
+ gpointer data)
+{
+ Job *job = g_new0 (Job, 1);
+ job->job = func;
+ job->queue = queue;
+ job->data = data;
+
+ return job;
+}
+
+static void
+job_queue_lock (JobQueue *queue)
+{
+ g_mutex_lock (queue->mutex);
+}
+
+static void
+job_queue_unlock (JobQueue *queue)
+{
+ g_mutex_unlock (queue->mutex);
+}
+
+static void
+delete_queue (JobQueue *queue)
+{
+ g_assert (queue->ref_count == 0);
+
+ g_queue_free (queue->jobs);
+ g_mutex_free (queue->mutex);
+ g_free (queue);
+}
+
+JobQueue *
+job_queue_new (Executor *executor)
+{
+ JobQueue *queue = g_new0 (JobQueue, 1);
+
+ queue->executor = executor;
+ queue->mutex = g_mutex_new ();
+ queue->in_executor = FALSE;
+ queue->ref_count = 1;
+ queue->jobs = g_queue_new ();
+
+ return queue;
+}
+
+static void
+run_queue (gpointer data)
+{
+ JobQueue *queue = data;
+ gboolean do_free = FALSE;
+
+ job_queue_lock (queue);
+
+ g_assert (queue->in_executor == TRUE);
+
+ while (!g_queue_is_empty (queue->jobs))
+ {
+ Job *job = g_queue_pop_head (queue->jobs);
+
+ job_queue_unlock (queue);
+
+ job->job (job->data);
+
+ g_free (job);
+
+ job_queue_lock (queue);
+ }
+
+ queue->ref_count--;
+ queue->in_executor = FALSE;
+
+ if (queue->ref_count == 0)
+ do_free = TRUE;
+
+ job_queue_unlock (queue);
+
+ if (do_free)
+ delete_queue (queue);
+}
+
+gpointer
+job_queue_add (JobQueue *queue,
+ ExecutorJob func,
+ gpointer data)
+{
+ Job *job;
+
+ job_queue_lock (queue);
+
+ job = job_new (queue, func, data);
+ g_queue_push_tail (queue->jobs, job);
+
+ if (!queue->in_executor)
+ {
+ queue->in_executor = TRUE;
+ queue->ref_count++;
+ executor_add_job (queue->executor, run_queue, queue);
+ }
+
+ job_queue_unlock (queue);
+
+ return job;
+}
+
+void
+job_queue_remove (JobQueue *queue,
+ gpointer job_id)
+{
+ Job *job = job_id;
+ GList *link;
+
+ job_queue_lock (job->queue);
+
+ link = g_queue_find (queue->jobs, job);
+ if (link)
+ {
+ g_queue_delete_link (queue->jobs, link);
+ g_free (job);
+ }
+
+ job_queue_unlock (job->queue);
+}
+
+void
+job_queue_free (JobQueue *queue)
+{
+ gboolean do_free = FALSE;
+
+ job_queue_lock (queue);
+
+ if (queue->in_executor)
+ {
+ g_warning ("Trying to free a running queue");
+ job_queue_unlock (queue);
+ return;
+ }
+
+ while (!g_queue_is_empty (queue->jobs))
+ {
+ Job *job = g_queue_pop_head (queue->jobs);
+
+ g_free (job);
+ }
+
+ g_queue_free (queue->jobs);
+
+ queue->ref_count--;
+
+ if (queue->ref_count == 0)
+ do_free = TRUE;
+
+ job_queue_unlock (queue);
+
+ if (do_free)
+ delete_queue (queue);
+}
diff --git a/jobqueue.h b/jobqueue.h
new file mode 100644
index 0000000..9f2c20c
--- /dev/null
+++ b/jobqueue.h
@@ -0,0 +1,26 @@
+#ifndef _JOB_QUEUE_H_
+#define _JOB_QUEUE_H_
+
+#include "executor.h"
+
+typedef struct JobQueue JobQueue;
+
+/* A job queue has the property that no more than one
+ * job will be active at the same time. So if the currently
+ * running task knows that some object will only be touched by itself
+ * and other tasks in the same queue, it doesn't have to lock it.
+ *
+ * Also, the task can safely cancel other jobs in the queue. The
+ * pointer you get back is guaranteed to be valid until the corresponding
+ * task has returned
+ */
+JobQueue *job_queue_new (Executor *executor);
+gpointer job_queue_add (JobQueue *queue,
+ ExecutorJob job,
+ gpointer data);
+void job_queue_remove (JobQueue *queue,
+ gpointer job);
+void job_queue_free (JobQueue *queue);
+
+#endif
+
diff --git a/locking b/locking
new file mode 100644
index 0000000..edf2040
--- /dev/null
+++ b/locking
@@ -0,0 +1,21 @@
+Locking is hard
+
+A possible design:
+ - Every object has its own lock
+ - No more than one lock can be taken by the same
+ thread at a time.
+
++ Is a checkable invariant. A wrapper around g_mutex_lock() can be
+ written that checks it.
+
+- Draconican Rules:
+ - whenever you call a callback or a method on another object:
+
+ - first make a copy of all the state yuo wish to pass
+ - then unlock
+ - then call and store the return value on the stack
+ - then lock
+ - then do something with the return value.
+
+ And at this point be aware that other threads could
+ have changed you.
diff --git a/main.c b/main.c
new file mode 100644
index 0000000..0b10925
--- /dev/null
+++ b/main.c
@@ -0,0 +1,7 @@
+#include "engine.h"
+
+int
+main ()
+{
+ return 0;
+}
diff --git a/maincontext.c b/maincontext.c
new file mode 100644
index 0000000..0edc79a
--- /dev/null
+++ b/maincontext.c
@@ -0,0 +1,175 @@
+#include "jobqueue.h"
+#include "maincontext.h"
+#include "mainloop.h"
+
+/* A main context will generally correspond to one client in the server.
+ *
+ * Tasks added to it, either as fd tasks, or as idle/timeout tasks
+ * will run serially, always.
+ *
+ * Parallelism can be done through the - as yet - non-existant
+ * get_executor(). Or perhaps, create_job_queue()?
+ *
+ * Also, given a main context, who really needs a "mainloop"? How about
+ * just passing in an epoll and an executor at creation time?
+ *
+ * Sounds like a better idea. Hmm, the problem is who calls epoll_wait()?
+ *
+ * Also should probably have a main_context_get_data() and then pass the
+ * context to the callbacks, rather than the data directly. This way
+ * the callbacks have acess to the context as well. Though I guess the
+ * user data will generally contain a pointer to the context. So data
+ * itself is probably more convenient. Or is it? If you have data, then
+ * you have to do my_stuff->context to get at the context, if it is an
+ * argument, then you can just use it. Ie., there is no useless "data"
+ * parameter.
+ *
+ * We will most likely need per-fd data for things like Connections.
+ */
+typedef struct ContextFdInfo ContextFdInfo;
+
+struct MainContext
+{
+ MainLoop * loop;
+ gpointer data;
+ JobQueue * job_queue;
+};
+
+struct ContextFdInfo
+{
+ MainContext * context;
+ MainFdTask prepare;
+ MainFdTask read;
+ MainFdTask write;
+ gpointer data;
+};
+
+MainContext *
+main_context_new (MainLoop *loop,
+ gpointer data)
+{
+ MainContext *context = g_new0 (MainContext, 1);
+
+ context->loop = loop;
+ context->data = data;
+ context->job_queue = job_queue_new (main_loop_get_executor (loop));
+
+ return context;
+}
+
+void
+main_context_free (MainContext *context)
+{
+ /* FIXME: this will complain if we run it when there are still
+ * active jobs. But we kinda have to delete the job out of a
+ * task that runs in it. So what do we do? Maybe just make it
+ * legal to delete an active job queue. I don't really see the
+ * harm in that, except that the execution function in jobqueue.c
+ * might get a little bit more complex
+ */
+
+ job_queue_free (context->job_queue);
+
+ g_free (context);
+}
+
+static void
+on_fd_activity (int fd,
+ EPollEventType mask,
+ gpointer data)
+{
+
+}
+
+/* Filedescriptors */
+void
+main_context_add_fd (MainContext *context,
+ int fd,
+ gpointer data)
+{
+ ContextFdInfo *info = g_new0 (ContextFdInfo, 1);
+
+ info->context = context;
+ info->data = data;
+ info->prepare = NULL;
+ info->read = NULL;
+ info->write = NULL;
+
+ main_loop_add_fd (context->loop,
+ fd,
+ context->job_queue,
+ on_fd_activity,
+ info);
+}
+
+void
+main_context_remove_fd (MainContext *context,
+ int fd)
+{
+
+}
+
+static void
+update_poll_mask (MainContext *context,
+ ContextFdInfo *info,
+ int fd)
+{
+
+}
+
+/* Called before the fd is polled */
+void
+main_context_set_prepare_callback (MainContext *context,
+ int fd,
+ MainFdTask func)
+{
+
+}
+
+/* Called when the fd is writable */
+void
+main_context_set_write_callback (MainContext *context,
+ int fd,
+ MainFdTask func)
+{
+
+}
+
+/* Called when there is data to be read, if the fd is closed/hungup, or
+ * if there is an error
+ */
+void
+main_context_set_read_callback (MainContext *context,
+ int fd,
+ MainFdTask func)
+{
+
+}
+
+/* Idle/timeout tasks */
+#if 0
+typedef gboolean (* MainTaskFunc) (gpointer data);
+
+MainTask *
+main_context_add_idle (MainContext *context,
+ MainTaskFunc func,
+ gpointer data)
+{
+
+}
+
+MainTask *
+main_context_add_timeout (MainContext *context,
+ int millisecs,
+ MainTaskFunc func,
+ gpointer data)
+{
+
+}
+
+void
+main_task_remove (MainTask *task)
+{
+
+}
+#endif
diff --git a/maincontext.h b/maincontext.h
new file mode 100644
index 0000000..32f3af3
--- /dev/null
+++ b/maincontext.h
@@ -0,0 +1,47 @@
+#include <glib.h>
+#include "mainloop.h"
+
+typedef struct MainContext MainContext;
+
+MainContext *main_context_new (MainLoop *loop,
+ gpointer data);
+void main_context_free (MainContext *context);
+
+/* Filedescriptors */
+typedef void (* MainFdTask) (MainContext *context,
+ int fd,
+ gpointer data);
+
+void main_context_add_fd (MainContext *context,
+ int fd,
+ gpointer data);
+void main_context_remove_fd (MainContext *context,
+ int fd);
+/* Called before the fd is polled */
+void main_context_set_prepare_callback (MainContext *context,
+ int fd,
+ MainFdTask func);
+/* Called when the fd is writable */
+void main_context_set_write_callback (MainContext *context,
+ int fd,
+ MainFdTask func);
+/* Called when there is data to be read, if the fd is closed/hungup, or
+ * if there is an error
+ */
+void main_context_set_read_callback (MainContext *context,
+ int fd,
+ MainFdTask func);
+
+/* Idle/timeout tasks */
+#if 0
+typedef gboolean (* MainTaskFunc) (gpointer data);
+
+MainTask *main_context_add_idle (MainContext *context,
+ MainTaskFunc func,
+ gpointer data);
+MainTask *main_context_add_timeout (MainContext *context,
+ int millisecs,
+ MainTaskFunc func,
+ gpointer data);
+void main_task_remove (MainTask *task);
+#endif
diff --git a/mainloop.c b/mainloop.c
new file mode 100644
index 0000000..754d84b
--- /dev/null
+++ b/mainloop.c
@@ -0,0 +1,142 @@
+#include "mainloop.h"
+#include "epoll.h"
+
+typedef struct FdInfo FdInfo;
+typedef struct FdCallbackInfo FdCallbackInfo;
+
+struct FdInfo
+{
+ JobQueue * queue;
+ MainFdFunc func;
+ gpointer data;
+};
+
+struct MainLoop
+{
+ Executor *executor;
+ EPoll *epoll;
+};
+
+struct FdCallbackInfo
+{
+ EPoll *epoll;
+ int fd;
+ EPollEventType mask;
+ MainFdFunc func;
+ gpointer data;
+};
+
+static void
+run_fd_callback (gpointer data)
+{
+ FdCallbackInfo *info = data;
+
+ info->func (info->fd, info->mask, info->data);
+
+ epoll_reenable_fd (info->epoll, info->fd);
+
+ g_free (info);
+}
+
+static void
+poll_job (gpointer data)
+{
+ MainLoop *loop = data;
+ EPollEvent *events;
+ int i, n_events;
+
+ events = epoll_wait (loop->epoll, &n_events, -1);
+
+ for (i = 0; i < n_events; ++i)
+ {
+ EPollEvent *event = &(events[i]);
+ FdInfo *info = epoll_get_fd_data (loop->epoll, event->fd);
+
+ FdCallbackInfo *callback_info = g_new0 (FdCallbackInfo, 1);
+
+ callback_info->epoll = loop->epoll;
+ callback_info->fd = event->fd;
+ callback_info->mask = event->events;
+ callback_info->func = info->func;
+ callback_info->data = info->data;
+
+ job_queue_add (info->queue, run_fd_callback, callback_info);
+ }
+
+ g_free (events);
+
+ executor_add_job (loop->executor, poll_job, loop);
+}
+
+MainLoop *
+main_loop_new (Executor *executor)
+{
+ MainLoop *loop;
+
+ g_return_val_if_fail (executor != NULL, NULL);
+
+ loop = g_new0 (MainLoop, 1);
+ loop->executor = executor;
+ loop->epoll = epoll_new ();
+
+ executor_add_job (executor, poll_job, loop);
+
+ return loop;
+}
+
+void
+main_loop_add_fd (MainLoop *loop,
+ int fd,
+ JobQueue *queue,
+ MainFdFunc func,
+ gpointer data)
+{
+ FdInfo *info;
+
+ g_return_if_fail (loop != NULL);
+ g_return_if_fail (!epoll_has_fd (loop->epoll, fd));
+
+ info = g_new0 (FdInfo, 1);
+ info->queue = queue;
+ info->func = func;
+ info->data = data;
+
+ epoll_add_fd (loop->epoll, fd, 0, info);
+}
+
+#if 0
+void
+main_loop_set_fd_poll_mask (MainLoop *loop,
+ int fd,
+ EPollEventType poll_mask)
+{
+ g_return_if_fail (loop != NULL);
+ g_return_if_fail (epoll_has_fd (loop->epoll, fd));
+
+ epoll_fd_set_mask (loop->epoll, fd, poll_mask);
+}
+#endif
+
+void
+main_loop_remove_fd (MainLoop *loop,
+ int fd)
+{
+ FdInfo *info;
+
+ g_return_if_fail (loop != NULL);
+ g_return_if_fail (epoll_has_fd (loop->epoll, fd));
+
+ info = epoll_get_fd_data (loop->epoll, fd);
+
+ g_free (info);
+
+ epoll_remove_fd (loop->epoll, fd);
+}
+
+Executor *
+main_loop_get_executor (MainLoop *loop)
+{
+ g_return_val_if_fail (loop != NULL, NULL);
+
+ return loop->executor;
+}
diff --git a/mainloop.h b/mainloop.h
new file mode 100644
index 0000000..70ed4d4
--- /dev/null
+++ b/mainloop.h
@@ -0,0 +1,67 @@
+#ifndef _MAINLOOP_H
+#define _MAINLOOP_H
+
+#include <glib.h>
+#include "jobqueue.h"
+#include "epoll.h"
+
+typedef struct MainLoop MainLoop;
+
+typedef void (* MainFdFunc) (int fd,
+ EPollEventType mask,
+ gpointer data);
+
+MainLoop *main_loop_new (Executor *executor);
+Executor *main_loop_get_executor (MainLoop *loop);
+void main_loop_add_fd (MainLoop *loop,
+ int fd,
+ JobQueue *queue,
+ MainFdFunc func,
+ gpointer data);
+void main_loop_set_fd_poll_mask (MainLoop *loop,
+ int fd,
+ EPollEventType poll_mask);
+void main_loop_remove_fd (MainLoop *loop,
+ int fd);
+
+/* Missing: */
+
+/* Maybe these functions really belong in the JobQueue?
+ * That would require support from the executor though.
+ * But fundamentally these are valid (in a threaded server)
+ * without fds, which is what the main loop provides.
+ * The executor already has a gcond_wait() that could be
+ * made a timed one.
+ *
+ * Note that a good priority queue is going to be needed in
+ * whatever class eventually implements these.
+ *
+ * Also, note that cancellation support is essential for these
+ * tasks.
+ *
+ * Yes, should probably happen in JobQueue + Executor:
+ *
+ * executor should maintain all jobs in a priority queue
+ * sorted by execution time.
+ *
+ * normal jobs get an execution time of "now", where timeout
+ * jobs get an execution of sometime in the future.
+ *
+ * this avoids starvation of timeouts since a timeout will
+ * eventually move to the front of the queue (with a time
+ * in the past possibly)
+ *
+ *
+ *
+ */
+#if 0
+void main_loop_add_timeout (MainLoop *loop,
+ JobQueue *queue,
+ MainFunc *task);
+void main_loop_add_idle (MainLoop *loop,
+ JobQueue *queue,
+ MainFunc *task);
+#endif
+
+#endif /* _MAINLOO_H_ */
+
diff --git a/mainloop2.c b/mainloop2.c
new file mode 100644
index 0000000..32a4fbe
--- /dev/null
+++ b/mainloop2.c
@@ -0,0 +1,238 @@
+#ifndef _MAINLOOP_H
+#define _MAINLOOP_H
+
+#include <glib.h>
+
+#include "mainloop2.h"
+
+struct FdInfo
+{
+ JobQueue * queue;
+ MainFdFunc func;
+ gpointer data;
+};
+
+struct MainLoop
+{
+ EPoll *epoll;
+ GMutex *mutex;
+ int wake_up_pipe_write;
+ int wake_up_pipe_read;
+ GQueue *new_fds;
+ GMutex *new_fds_mutex;
+};
+
+struct FdCallbackInfo
+{
+ EPoll *epoll;
+ int fd;
+ EPollEventType mask;
+ MainFdFunc func;
+ gpointer data;
+};
+
+struct MainLoop
+{
+};
+
+#if 0
+typedef void (* MainFdFunc) (int fd,
+ EPollEventType mask,
+ gpointer data);
+#endif
+
+static void
+main_loop_lock (MainLoop *loop)
+{
+ g_mutex_lock (loop->mutex);
+}
+
+static void
+main_loop_unlock (MainLoop *loop)
+{
+ g_mutex_unlock (loop->mutex);
+}
+
+static void
+setup_wakeup_pipe (MainLoop *main_loop)
+{
+ /* FIXME: check errors */
+ int p[2];
+ pipe (p);
+ main_loop->wake_up_pipe_read = p[0];
+ main_loop->wake_up_pipe_write = p[1];
+ epoll_add_fd (main_loop->epoll, main_loop->wake_up_pipe_read, NULL);
+}
+
+MainLoop *
+main_loop_new (void)
+{
+ MainLoop *main_loop = g_new0 (MainLoop, 1);
+ int p[2];
+
+ main_loop->epoll = epoll_new (void);
+ main_loop->mutex = g_mutex_new ();
+
+ setup_wakeup_pipe (main_loop);
+
+ main_loop->new_fds = g_queue_new ();
+ main_loop->new_fds_mutex = g_mutex_new ();
+
+ return main_loop;
+}
+
+typedef struct
+{
+ int fd;
+ MainFdFunc func;
+ gpointer data;
+ EPollEventType mask;
+} EventInfo;
+
+static void
+handle_fd (gpointer data)
+{
+ EventInfo *info = data;
+
+ info->func (info->fd, info->mask, info->data);
+
+ g_free (info);
+}
+
+void
+main_loop_run (MainLoop *loop)
+{
+ while (TRUE)
+ {
+ EPollEvent *events;
+ int n_events;
+ int i;
+ GList *list;
+
+ main_loop_lock (loop);
+
+ events = epoll_wait (loop->epoll, &n_events, -1);
+
+ main_loop_unlock (loop);
+
+ for (i = 0; i < n_events; ++i)
+ {
+ EPollEvent *event = &(events[i]);
+
+ if (event->fd == loop->wake_up_pipe_read)
+ {
+ read_wake_up_pipe (event->fd);
+ }
+ else
+ {
+ FdInfo *info = epoll_get_fd_data (loop->epoll);
+ EventInfo *event_info = g_new (EventInfo, 1);
+
+ event_info->fd = event->fd;
+ event_info->func = info->func;
+ event_info->data = info->func;
+ event_info->mask = event->mask;
+
+ job_queue_add (info->queue, handle_fd, event_info);
+ }
+ }
+
+ main_loop_lock (loop);
+
+ g_mutex_lock (loop->new_fds_mutex);
+
+ for (list = loop->new_fds; list != NULL; list = list->next->next)
+ {
+ int fd = GPOINTER_TO_INT (list->data);
+ FdInfo *info = list->next->data;
+
+ epoll_add_fd (loop->epoll, fd, info);
+ }
+
+ g_list_free (loop->new_fds);
+ loop->new_fds = NULL;
+
+ g_mutex_unlock (loop->new_fds_mutex);
+ }
+}
+
+void
+main_loop_add_fd (MainLoop *loop,
+ int fd,
+ JobQueue *queue,
+ MainFdFunc func,
+ gpointer data)
+{
+ FdInfo *info;
+
+ info = g_new0 (FdInfo, 1);
+ info->queue = queue;
+ info->func = func;
+ info->data = data;
+
+ g_mutex_lock (loop->new_fds_mutex);
+
+ loop->new_fds = g_list_prepend (loop->new_fds, info);
+ loop->new_fds = g_list_prepend (loop->new_fds, GINT_TO_POINTER (fd));
+
+ g_mutex_unlock (loop->new_fds_mutex);
+
+ main_loop_wake_up (loop);
+}
+
+void
+main_loop_set_fd_poll_mask (MainLoop * loop,
+ int fd,
+ EPollEventType poll_mask)
+{
+ FdInfo *info;
+}
+
+void
+main_loop_remove_fd (MainLoop *loop,
+ int fd)
+{
+
+}
+
+/* Missing: */
+
+/* Maybe these functions really belong in the JobQueue?
+ * That would require support from the executor though.
+ * But fundamentally these are valid (in a threaded server)
+ * without fds, which is what the main loop provides.
+ * The executor already has a gcond_wait() that could be
+ * made a timed one.
+ *
+ * Note that a good priority queue is going to be needed in
+ * whatever class eventually implements these.
+ *
+ * Also, note that cancellation support is essential for these
+ * tasks.
+ *
+ * Yes, should probably happen in JobQueue + Executor:
+ *
+ * executor should maintain all jobs in a priority queue
+ * sorted by execution time.
+ *
+ * normal jobs get an execution time of "now", where timeout
+ * jobs get an execution of sometime in the future.
+ *
+ * this avoids starvation of timeouts since a timeout will
+ * eventually move to the front of the queue (with a time
+ * in the past possibly)
+ *
+ *
+ *
+ */
+#if 0
+void main_loop_add_timeout (MainLoop *loop,
+ JobQueue *queue,
+ MainFunc *task);
+void main_loop_add_idle (MainLoop *loop,
+ JobQueue *queue,
+ MainFunc *task);
+#endif
+
+#endif /* _MAINLOO_H_ */
+
diff --git a/mainloop2.h b/mainloop2.h
new file mode 100644
index 0000000..e78877a
--- /dev/null
+++ b/mainloop2.h
@@ -0,0 +1,67 @@
+#ifndef _MAINLOOP_H
+#define _MAINLOOP_H
+
+#include <glib.h>
+#include "jobqueue.h"
+#include "epoll.h"
+
+typedef struct MainLoop MainLoop;
+
+typedef void (* MainFdFunc) (int fd,
+ EPollEventType mask,
+ gpointer data);
+
+MainLoop *main_loop_new (Executor *executor);
+void main_loop_run (MainLoop *loop);
+void main_loop_add_fd (MainLoop *loop,
+ int fd,
+ JobQueue *queue,
+ MainFdFunc func,
+ gpointer data);
+void main_loop_set_fd_poll_mask (MainLoop *loop,
+ int fd,
+ EPollEventType poll_mask);
+void main_loop_remove_fd (MainLoop *loop,
+ int fd);
+
+/* Missing: */
+
+/* Maybe these functions really belong in the JobQueue?
+ * That would require support from the executor though.
+ * But fundamentally these are valid (in a threaded server)
+ * without fds, which is what the main loop provides.
+ * The executor already has a gcond_wait() that could be
+ * made a timed one.
+ *
+ * Note that a good priority queue is going to be needed in
+ * whatever class eventually implements these.
+ *
+ * Also, note that cancellation support is essential for these
+ * tasks.
+ *
+ * Yes, should probably happen in JobQueue + Executor:
+ *
+ * executor should maintain all jobs in a priority queue
+ * sorted by execution time.
+ *
+ * normal jobs get an execution time of "now", where timeout
+ * jobs get an execution of sometime in the future.
+ *
+ * this avoids starvation of timeouts since a timeout will
+ * eventually move to the front of the queue (with a time
+ * in the past possibly)
+ *
+ *
+ *
+ */
+#if 0
+void main_loop_add_timeout (MainLoop *loop,
+ JobQueue *queue,
+ MainFunc *task);
+void main_loop_add_idle (MainLoop *loop,
+ JobQueue *queue,
+ MainFunc *task);
+#endif
+
+#endif /* _MAINLOO_H_ */
+
diff --git a/test.c b/test.c
new file mode 100644
index 0000000..668ba1f
--- /dev/null
+++ b/test.c
@@ -0,0 +1,60 @@
+#include <glib.h>
+#include "jobqueue.h"
+
+G_LOCK_DEFINE (blah);
+
+GList *list;
+
+static void
+the_job (gpointer data)
+{
+ int i;
+
+#if 0
+ G_LOCK (blah);
+#endif
+
+ for (i = 0; i < 34540; ++i)
+ ;
+
+#if 0
+ g_print ("%d he string is: %p\n",
+ (GPOINTER_TO_INT (data)), g_thread_self());
+#endif
+
+#if 0
+ list = g_list_prepend (list, data);
+
+ G_UNLOCK (blah);
+#endif
+}
+
+int
+main ()
+{
+ Executor *exe = executor_new (10);
+ int i;
+
+ for (i = 0; i < 2000; ++i)
+ {
+ JobQueue *queue = job_queue_new (exe);
+
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 0));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 1));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 2));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 3));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 4));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 5));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 6));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 7));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 8));
+ job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 9));
+ }
+
+ executor_sync (exe);
+ executor_free (exe);
+
+ g_print ("%d\n", g_list_length (list));
+
+ return 0;
+}