diff options
author | Søren Sandmann <sandmann@redhat.com> | 2009-08-30 21:50:51 -0400 |
---|---|---|
committer | Søren Sandmann <sandmann@redhat.com> | 2009-08-30 21:50:51 -0400 |
commit | e947945d98e54fbd72c7764418b6556c7df7c7cc (patch) | |
tree | 88854705a9aa8b78a65f130f71ad3b1a5b89031b |
-rw-r--r-- | TODO | 138 | ||||
-rwxr-xr-x | build.sh | 3 | ||||
-rw-r--r-- | bytequeue.c | 191 | ||||
-rw-r--r-- | bytequeue.h | 63 | ||||
-rw-r--r-- | connection.h | 43 | ||||
-rw-r--r-- | engine.c | 514 | ||||
-rw-r--r-- | engine.h | 71 | ||||
-rw-r--r-- | epoll.c | 192 | ||||
-rw-r--r-- | epoll.h | 41 | ||||
-rw-r--r-- | executor.c | 178 | ||||
-rw-r--r-- | executor.h | 18 | ||||
-rw-r--r-- | hest.c | 52 | ||||
-rw-r--r-- | http-server.c | 59 | ||||
-rw-r--r-- | jobqueue.c | 180 | ||||
-rw-r--r-- | jobqueue.h | 26 | ||||
-rw-r--r-- | locking | 21 | ||||
-rw-r--r-- | main.c | 7 | ||||
-rw-r--r-- | maincontext.c | 175 | ||||
-rw-r--r-- | maincontext.h | 47 | ||||
-rw-r--r-- | mainloop.c | 142 | ||||
-rw-r--r-- | mainloop.h | 67 | ||||
-rw-r--r-- | mainloop2.c | 238 | ||||
-rw-r--r-- | mainloop2.h | 67 | ||||
-rw-r--r-- | test.c | 60 |
24 files changed, 2593 insertions, 0 deletions
@@ -0,0 +1,138 @@ +New, simpler design: + +There is an 'Engine' which is both Executor and Mainloop. It uses an +Epoll object to handle the polling. This epoll object does not do any +locking by itself. + +Contexts can be created; they are like jobqueues, except they also +allow you to add fd's and idles and timeouts. They can do that since +they have access to the engine. These are completely unlocked; if you +want to access a context from more than one thread, you must lock them +yourself. It is guaranteed that only one callback will be active for a +context at the same time. From within that callback it is then safe to +remove other callbacks etc. In general such a context can act pretty +much like one thread. + +A web server will consist of a listening context, which will create +other contexts as necessary and add the fd's to them with the +appropriate callback. Or maybe just call back with the new fd. I guess +there should be a generic listener object that will create its own +context and call back. Then an http module can just create a listener +and do whatever it wants in response. + +One possible problem. The thread that creates a context and adds the +fd not necessarily the one that will execute the first callback. Can +these interfere? Maybe the solution is just that the creating thread +should not do anything with the context after it adds the fd. A +better idea may be to just have a context_dispatch() function that you +must call after creating the context. Until this is called, nothing +the context is not active. + +Beginning of this new design is implemented and can be built with +build2.sh. + + + +Old design (referenced above) + +Highlevel design for a web server: + + - Executor + takes callbacks and executes them, possibly in parallell + or out of order. The callbacks are likely to be called in + a different thread than the one that queued them. + + Should support both push_back() and push_front(). Push_front() + is needed to do parallel requests with good latency. If + a request can be parallellized, the parallel tasks should + be put at the front of the queue. + + - EPoll + Simple class that wraps epoll()/poll(). Note: must support + oneshot polling and rearming. + + - JobQueue + Maintain a queue of jobs, uses executor to execute them + in order. Two jobs in the same JobQueue will never run + at the same time. + + - Mainloop + Uses executor, epoll. + - Takes filedescriptors and callbacks. calls + back when descriptor is readable/writable/etc. + - The callbacks are put on a job queue, which is passed in + - Each callback is a oneshot - ie., after calling, the + filedescriptor is not polled again until it is rearmed. + The mainloop will have a method to rearm filedescriptors. + This ensures that the mainloop can start a new poll() + whenever it wants without waiting for all the callbacks + to finish. + - Also takes timeouts that can be canceled. Timeouts also + need to be put on a queue. + + Sketch of implementation: + + polljob() + { + timeout = compute min_timeout (); + if (timeout > 0) + poll_armed_describtors(timeout); + call all timeouts (ie., put them on queues); + call all callbacks (ie., put them on queues); + schedule (polljob); + } + + - MainContext + + - The thing clients will deal with + + - Uses the main loop + + - Filedescriptors + maincontext is responsible for rearming the + descriptor after the client callback has been + called. + + - Timeouts + + - Idle handlers + + - Everything associated with a main context happens + serialized - ie., as if only one thread executed it. + + - This means a client structure doesn't need to be locked. + + - Has a get_executor() method so that clients can parallelize + if they want to. + + - Current thinking is that if we have main contexts, who + really needs a main loop? The only thing you could do + with it is to pass it to main contexts. + + OTOH that's true of several of the objects here. + see notes at top of maincontext.c + + - Worth noting that stuff that has to be passed in to + create a client object must be available to the + listener callback. But see http_server.c for an example + + - Err, the epoll in itself is not enough for maincontext, + since it needs to be shared between maincontexts. Ie., + who would call epoll_wait(). We do really need a main + loop that will call back. + + - Listener + Listens on a port. Calls back with a file descriptor when + someone connects. + + - Connection + initialized with a MainContext and a filedescriptor. + Creates events when something happens + + - Http: + Has a Connection + parses http, emits events such as + "get hostname pagename query" + "post etc etc etc" + + - ContentProvider diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..43e482f --- /dev/null +++ b/build.sh @@ -0,0 +1,3 @@ +gcc -Wall `pkg-config --cflags --libs glib-2.0 gthread-2.0 lac-1.0` \ + epoll.c jobqueue.c executor.c mainloop.c \ + maincontext.c bytequeue.c test.c diff --git a/bytequeue.c b/bytequeue.c new file mode 100644 index 0000000..538ba42 --- /dev/null +++ b/bytequeue.c @@ -0,0 +1,191 @@ +#include <string.h> + +#include "bytequeue.h" + +struct ByteQueue +{ + gsize segment_size; + guint8 * segment; + guint8 * start; + guint8 * end; +}; + +ByteQueue * +byte_queue_new (void) +{ + ByteQueue *queue = g_new0 (ByteQueue, 1); + + queue->segment_size = 0; + queue->segment = NULL; + queue->start = NULL; + queue->end = NULL; + + return queue; +} + +guint8 * +byte_queue_free (ByteQueue *queue, + gboolean free_data) +{ + guint8 *result; + + if (free_data) + { + g_free (queue->segment); + + result = NULL; + } + else + { + memmove (queue->segment, queue->start, queue->end - queue->start); + + result = queue->segment; + } + + g_free (queue); + + return result; +} + +/* The data returned is owned by the byte queue and becomes invalid + * as soon as any method is called on the queue. It is explicitly + * allowed to push the returned data back into the queue, and indeed + * in that case the queue will avoid copying if it can. The push + * must be the first method called on the queue after the read. + */ +const guint8 * +byte_queue_get_data (ByteQueue *queue, + gsize *n_bytes) +{ + guint8 *result; + + if (n_bytes) + *n_bytes = queue->end - queue->start; + + result = queue->start; + + queue->start = queue->segment; + queue->end = queue->segment; + + return result; +} + +static gboolean +in_segment (ByteQueue *queue, + const guint8 *bytes, + gsize n_bytes) +{ + return bytes >= queue->segment && bytes + n_bytes < queue->end; +} + +static gboolean +is_empty (ByteQueue *queue) +{ + return queue->start == queue->end; +} + +static gsize +power_of_two_bigger_than (gsize n) +{ + gsize result = 1; + + while (result <= n) + result *= 2; + + return result; +} + +static void +ensure_room (ByteQueue *queue, + gsize extra) +{ + gsize old_data_size = queue->end - queue->start; + gsize new_data_size = old_data_size + extra; + + if (queue->end + new_data_size > queue->segment + queue->segment_size) + { + gsize new_segment_size = power_of_two_bigger_than (2 * new_data_size); + + memmove (queue->start, queue->segment, old_data_size); + + if (new_segment_size > queue->segment_size) + { + queue->segment_size = new_segment_size; + queue->segment = g_realloc (queue->segment, new_segment_size); + } + + queue->start = queue->segment; + queue->end = queue->start + new_data_size; + } +} + +guint8 * +byte_queue_alloc_tail (ByteQueue *queue, + gsize size) +{ + ensure_room (queue, size); + + queue->end += size; + + return queue->end - size; +} + +void +byte_queue_delete_tail (ByteQueue *queue, + gsize size) +{ + if (queue->end - queue->start < size) + queue->end = queue->start; + else + queue->end -= size; +} + +void +byte_queue_append (ByteQueue *queue, + const guint8 *bytes, + gsize n_bytes) +{ + if (in_segment (queue, bytes, n_bytes) && is_empty (queue)) + { + queue->start = (guint8 *)bytes; + queue->end = (guint8 *)bytes + n_bytes; + } + else + { + guint8 *tail = byte_queue_alloc_tail (queue, n_bytes); + + memcpy (tail, bytes, n_bytes); + } +} + +/* Transfer data from @src to @dest, if possible without copying. + * The data is appended to @dest's data if @dest is not empty + */ +void +byte_queue_steal_data (ByteQueue *dest, + ByteQueue *src) +{ + if (is_empty (dest)) + { + if (dest->segment) + g_free (dest->segment); + + dest->segment_size = src->segment_size; + dest->segment = src->segment; + dest->start = src->start; + dest->end = src->end; + + src->segment_size = 0; + src->segment = NULL; + src->start = NULL; + src->end = NULL; + } + else + { + const guint8 *data; + gsize size; + + data = byte_queue_get_data (src, &size); + byte_queue_append (dest, data, size); + } +} diff --git a/bytequeue.h b/bytequeue.h new file mode 100644 index 0000000..c789a9a --- /dev/null +++ b/bytequeue.h @@ -0,0 +1,63 @@ +#ifndef _BYTEQUEUE_H +#define _BYTEQUEUE_H + +#include <glib.h> + +typedef struct ByteQueue ByteQueue; + +ByteQueue *byte_queue_new (void); +guint8 *byte_queue_free (ByteQueue *queue, + gboolean free_data); +gsize byte_queue_get_length (ByteQueue *queue); + +/* The data returned is owned by the byte queue and becomes invalid + * as soon as any method is called on the queue. It is explicitly + * allowed to call byte_queue_append() with the returned data, and + * in that case the queue will avoid copying if it can. The append + * must be the first method called on the queue after the read. + */ +const guint8 *byte_queue_get_data (ByteQueue *queue, + gsize *n_bytes); +void byte_queue_put_back (ByteQueue *queue, + const guint8 *bytes, + gsize n_bytes); +void byte_queue_append (ByteQueue *queue, + const guint8 *bytes, + gsize n_bytes); +/* This function appends uninitialized data of length @size + * to the queue, then returns a pointer to the added data. + * The intention is that the app can read() into this + * memory, then use byte_queue_pop_tail() to delete the + * area that wasn't read into. Example + * + * guint8 *area = byte_queue_alloc_tail (queue, 8192); + * + * n_read = read (fd, area, 8192); + * + * byte_queue_delete_tail (queue, 8192 - (n_read < 0)? 0 : n_read); + * + * if (n_read < 0) + * { + * byte_queue_delete_tail (queue, 8192); + * handle_error(); + * n_read = 0; + * } + * else + * { + * byte_queue_delete_tail (8192 - n_read); + * + * // enjoy the new data in the queue + * } + */ + +guint8 *byte_queue_alloc_tail (ByteQueue *queue, + gsize size); +void byte_queue_delete_tail (ByteQueue *queue, + gsize size); + +/* Transfer data from @src to @dest without copying + */ +void byte_queue_steal_data (ByteQueue *dest, + ByteQueue *src); + +#endif diff --git a/connection.h b/connection.h new file mode 100644 index 0000000..9d6322b --- /dev/null +++ b/connection.h @@ -0,0 +1,43 @@ +#ifndef _CONNECTION_H_ +#define _CONNECTION_H_ + +typedef struct Connection Connection; +typedef union ConnectionEvent ConnectionEvent; + +typedef enum +{ + CONNECTION_READ, + CONNECTION_CLOSE, + CONNECTION_ERROR +} ConnectionEventType; + +union ConnectionEvent +{ + ConnectionEventType type; + + struct + { + ConnectionEventType type; + } read; + + struct + { + ConnectionEventType type; + } close; + + struct + { + ConnectionEventType type; + } error; +}; + +typedef void (* ConnectionFunc) (Connection *connection, + ConnectionEvent *event); + +Connection *connection_new (int fd, + MainContext *context, + ConnectionFunc func, + gpointer data); + + +#endif diff --git a/engine.c b/engine.c new file mode 100644 index 0000000..4da6ff4 --- /dev/null +++ b/engine.c @@ -0,0 +1,514 @@ +#include <unistd.h> +#include "engine.h" +#include "epoll.h" + +typedef struct FdInfo FdInfo; +typedef struct FdAction FdAction; + +struct FdInfo +{ +}; + +struct Context +{ + Engine *engine; + gpointer data; + GMutex *mutex; +}; + +typedef void (* EngineFdFunc) (const EPollEvent *event, gpointer data); + +struct FdAction +{ + int fd; + EPollEventType mask; + EngineFdFunc func; + gpointer data; +}; + +struct Engine +{ + gboolean quitting; + + GMutex * mutex; + + GCond * new_job_cond; + GCond * idle_cond; + GCond * exit_cond; + + GQueue * jobs; + + int n_threads; + int n_idle_threads; + int n_exited_threads; + + GQueue * fd_actions; + + /* polling */ + gboolean polling; + EPoll * epoll; + + int read_fd; + int write_fd; +}; + +static gpointer engine_thread (gpointer); + +/* + * Engine + */ + +static void +engine_lock (Engine *engine) +{ + g_mutex_lock (engine->mutex); +} + +static void +engine_unlock (Engine *engine) +{ + g_mutex_unlock (engine->mutex); +} + +static void +make_pipe (int *rd, int *wr) +{ + int fds[2]; + + if (pipe (fds) < 0) + g_error ("Could not create wake-up pipe\n"); + + *rd = fds[0]; + *wr = fds[1]; +} + +static void +poll_job (gpointer data) +{ + Engine *engine = data; + int i, n_events; + EPollEvent *events; + GList *list; + + engine_lock (engine); + + for (list = engine->fd_actions->head; list != NULL; list = list->next) + { + FdAction *action = list->data; + EPollEventType mask; + + } + + engine->polling = TRUE; + + engine_unlock (engine); + + events = epoll_wait (engine->epoll, &n_events, -1); + + for (i = 0; i < n_events; ++i) + { +#if 0 + FdInfo *info; +#endif + EPollEvent *event = &(events[i]); + +#if 0 + info = &(engine->fd_infos[event->fd]); +#endif + } + + g_free (events); + + engine_append_job (engine, poll_job, engine); +} + +Engine * +engine_new (guint n_threads) +{ + Engine *engine; + int i; + + g_return_val_if_fail (n_threads > 0, NULL); + + if (!g_thread_supported()) + g_thread_init (NULL); + + engine = g_new0 (Engine, 1); + + engine->mutex = g_mutex_new (); + engine->jobs = g_queue_new (); + + engine->new_job_cond = g_cond_new (); + engine->idle_cond = g_cond_new (); + engine->exit_cond = g_cond_new (); + engine->n_idle_threads = 0; + engine->n_exited_threads = 0; + engine->quitting = FALSE; + engine->n_threads = n_threads; + + engine->epoll = epoll_new (); + + engine->polling = FALSE; + + make_pipe (&engine->read_fd, &engine->write_fd); + epoll_add_fd (engine->epoll, engine->read_fd, + EPOLL_READ | EPOLL_ERROR, engine); + + engine->fd_actions = g_queue_new (); + + engine_lock (engine); + + for (i = 0; i < n_threads; ++i) + g_thread_create (engine_thread, engine, TRUE, NULL); + + engine_unlock (engine); + + engine_append_job (engine, poll_job, engine); + + return engine; +} + +static guint +engine_n_jobs (Engine *engine) +{ + g_assert (g_queue_get_length (engine->jobs) % 2 == 0); + + return (g_queue_get_length (engine->jobs)) / 2; +} + +/* Wait until all previously submitted jobs have finished */ +void +engine_sync (Engine *engine) +{ + engine_lock (engine); + + g_cond_broadcast (engine->new_job_cond); + + while (!(engine->n_idle_threads == engine->n_threads && + engine_n_jobs (engine) == 0)) + { + g_cond_wait (engine->idle_cond, engine->mutex); + } + + engine_unlock (engine); +} + +void +engine_free (Engine *engine) +{ + engine_lock (engine); + + engine->quitting = TRUE; + + g_cond_broadcast (engine->new_job_cond); + + while (engine->n_exited_threads != engine->n_threads) + g_cond_wait (engine->exit_cond, engine->mutex); + + engine_unlock (engine); + + while (!g_queue_is_empty (engine->jobs)) + g_queue_pop_head (engine->jobs); + + g_mutex_free (engine->mutex); + g_cond_free (engine->new_job_cond); + g_cond_free (engine->idle_cond); + g_cond_free (engine->exit_cond); + g_queue_free (engine->jobs); +} + +void +engine_append_job (Engine *engine, + EngineFunc job, + gpointer data) +{ + engine_lock (engine); + + g_queue_push_tail (engine->jobs, job); + g_queue_push_tail (engine->jobs, data); + + g_cond_signal (engine->new_job_cond); + + engine_unlock (engine); +} + +void +engine_prepend_job (Engine *engine, + EngineFunc job, + gpointer data) +{ + engine_lock (engine); + + g_queue_push_head (engine->jobs, data); + g_queue_push_head (engine->jobs, job); + + g_cond_signal (engine->new_job_cond); + + engine_unlock (engine); +} + +static gpointer +engine_thread (gpointer data) +{ + Engine *engine = data; + + engine_lock (engine); + + while (!engine->quitting) + { + if (!engine_n_jobs (engine)) + { + engine->n_idle_threads++; + + g_cond_signal (engine->idle_cond); + + g_cond_wait (engine->new_job_cond, engine->mutex); + + engine->n_idle_threads--; + } + + if (engine->quitting) + break; + + if (engine_n_jobs (engine)) + { + EngineFunc job; + gpointer data; + + job = g_queue_pop_head (engine->jobs); + data = g_queue_pop_head (engine->jobs); + + engine_unlock (engine); + + job (data); + + engine_lock (engine); + } + } + + engine->n_exited_threads++; + + g_cond_signal (engine->exit_cond); + + engine_unlock (engine); + + return NULL; +} + +static void +ensure_wake_up (Engine *engine) +{ + if (engine->polling) + { + char c = '!'; + + write (engine->write_fd, &c, 1); + engine->polling = FALSE; + } +} + +typedef struct +{ + int fd; + EngineFdFunc func; + gpointer data; +} EngineAddFd; + +typedef struct +{ + int fd; + EPollEventType mask; +} EngineChangeFd; + +typedef struct +{ + int fd; +} EngineRemoveFd; + +static void +engine_queue_add_fd (Engine *engine, + int fd, + EngineFdFunc func, + gpointer data) +{ + EngineAddFd *add_fd = g_new (EngineAddFd, 1); + + engine_lock (engine); + + add_fd->fd = fd; + add_fd->func = func; + add_fd->data = data; + + g_queue_push_tail (engine->fd_actions, add_fd); + + ensure_wake_up (engine); + + engine_unlock (engine); +} + +static void +engine_queue_change_fd (Engine *engine, + int fd, + EPollEventType mask) +{ + EngineChangeFd *change_fd = g_new (EngineChangeFd, 1); + + engine_lock (engine); + + change_fd->fd = fd; + change_fd->mask = mask; + + ensure_wake_up (engine); + + engine_unlock (engine); +} + +static void +engine_queue_remove_fd (Engine *engine, + int fd) +{ + EngineRemoveFd *remove_fd = g_new (EngineRemoveFd, 1); + + engine_lock (engine); + + remove_fd->fd = fd; + + ensure_wake_up (engine); + + engine_unlock (engine); +} + +/* + * Context + */ +static void +context_lock (Context *context) +{ + g_mutex_lock (context->mutex); +} + +static void +context_unlock (Context *context) +{ + g_mutex_unlock (context->mutex); +} + +Context * +context_new (Engine *engine, + gpointer data) +{ + Context *context = g_new0 (Context, 1); + + context->engine = engine; + context->data = data; + context->mutex = g_mutex_new (); + + return context; +} + +void +context_free (Context *context) +{ + context_lock (context); + + context_unlock (context); + + g_mutex_free (context->mutex); + g_free (context); +} + +Engine * +context_get_engine (Context *context) +{ + Engine *result; + + context_lock (context); + + result = context->engine; + + context_unlock (context); + + return result; +} + +gpointer +context_get_data (Context *context) +{ + gpointer result; + + context_lock (context); + + result = context->data; + + context_unlock (context); + + return result; +} + +static void +on_fd_event (const EPollEvent *event, + gpointer data) +{ + Context *context = data; + + context_lock (context); + + /* find fd in context */ + /* add relevant callbacks to queue */ + + context_unlock (context); +} + +void +context_add_fd (Context *context, + int fd) +{ + context_lock (context); + + engine_queue_add_fd (context->engine, fd, on_fd_event, context); + + context_unlock (context); +} + +void +context_remove_fd (Context *context, + int fd) +{ + context_lock (context); + + engine_queue_remove_fd (context->engine, fd); + + context_unlock (context); +} + +void +context_set_write (Context *context, + int fd, + ContextFdFunc callback) +{ + EPollEventType mask; + + context_lock (context); + + mask = 0; /* FIXME */ + + engine_queue_change_fd (context->engine, fd, mask); + + context_unlock (context); +} + +void +context_set_read (Context *context, + int fd, + ContextFdFunc callback) +{ + EPollEventType mask; + + context_lock (context); + + mask = 0; /* FIXME */ + + engine_queue_change_fd (context->engine, fd, mask); + + context_unlock (context); +} diff --git a/engine.h b/engine.h new file mode 100644 index 0000000..b590053 --- /dev/null +++ b/engine.h @@ -0,0 +1,71 @@ +#include <glib.h> + +typedef struct Engine Engine; + +typedef void (* EngineFunc) (gpointer data); + +Engine *engine_new (guint n_threads); +void engine_prepend_job (Engine *engine, + EngineFunc func, + gpointer data); +void engine_append_job (Engine *engine, + EngineFunc func, + gpointer data); +void engine_sync (Engine *engine); +void engine_free (Engine *engine); + +/* All functions executed within an Context + * are run serialized, ie., two functions from the + * same context never run at the same time. + * + * So if the data those functions operate on (typically + * a client structure) are only ever used by functions queued + * through the context, no locking is necessary. + * + * Note that if more than one thread will be accessing + * a context, locking is necessary. It is not generally + * allowed to call context_* on the same context from more + * than one thread without locking. + */ +typedef struct Context Context; + +typedef void (* ContextFdFunc) (Context *context, + int fd); +typedef void (* ContextFdRemovedFunc) (Context *context, + int fd, + gpointer data); + +Context *context_new (Engine *engine, + gpointer data); +void context_dispatch (Context *context); +Engine * context_get_engine (Context *context); +gpointer context_get_data (Context *context); + +/* fd */ +void context_add_fd (Context *contenxt, + int fd); +/* Note that removing an fd from the context also closes it. + * Yeah, that's a little unusual, but the alternative would + * be to add a notification that the fd was no longer in + * any outstanding poll() routines. + */ +void context_remove_fd (Context *context, + int fd); +void context_set_write (Context *contenxt, + int fd, + ContextFdFunc callback); +void context_set_read (Context *context, + int fd, + ContextFdFunc callback); + +/* other */ +void context_add_idle (Context *context, + EngineFunc func, + gpointer data); +void context_add_timeout (Context *context, + guint timeout_ms, + EngineFunc func, + gpointer data); +void context_add_task (Context *context, + EngineFunc func, + gpointer data); @@ -0,0 +1,192 @@ +#include "epoll.h" +#include <sys/poll.h> + +typedef struct FdInfo FdInfo; + +struct FdInfo +{ + /* FIXME: we might want to compress this struct at some point. + * Though, if we have 100000 fd's, it's only 1.6 MB, which is + * almost nothing for that many clients, Still, cache use + * would improve. + */ + gboolean valid; + gboolean disabled; + EPollEventType mask; + gpointer data; +}; + +struct EPoll +{ + int n_fds; + gsize n_fd_infos; + FdInfo * fd_info; +}; + +EPoll * +epoll_new (void) +{ + EPoll *epoll = g_new0 (EPoll, 1); + + epoll->n_fds = 0; + epoll->n_fd_infos = 1; + epoll->fd_info = g_new0 (FdInfo, 1); + + return epoll; +} + +void +epoll_add_fd (EPoll *epoll, + int fd, + EPollEventType mask, + gpointer data) +{ + g_return_if_fail (!epoll_has_fd (epoll, fd)); + + while (fd >= epoll->n_fd_infos) + epoll->n_fd_infos *= 2; + + epoll->fd_info = g_renew (FdInfo, epoll->fd_info, epoll->n_fd_infos); + + epoll->n_fds++; + + epoll->fd_info[fd].valid = TRUE; + epoll->fd_info[fd].mask = mask; + epoll->fd_info[fd].data = data; +} + +gboolean +epoll_has_fd (EPoll *epoll, + int fd) +{ + g_return_val_if_fail (epoll != NULL, FALSE); + + return (fd < epoll->n_fd_infos && epoll->fd_info[fd].valid); +} + +gint +epoll_get_n_fds (EPoll *epoll) +{ + return epoll->n_fds; +} + +void +epoll_remove_fd (EPoll *epoll, + int fd) +{ + g_return_if_fail (epoll_has_fd (epoll, fd)); + + epoll->fd_info[fd].valid = FALSE; +} + +gpointer +epoll_get_fd_data (EPoll *epoll, + int fd) +{ + g_return_val_if_fail (epoll != NULL, NULL); + g_return_val_if_fail (epoll_has_fd (epoll, fd), NULL); + + return epoll->fd_info[fd].data; +} + +/* epoll_wait() is a one-shot polling mechanism, so this function + * must be called to reenable fd's after they have been returned + * by epoll_wait(). + */ +void +epoll_reenable_fd (EPoll *epoll, + int fd) +{ + g_return_if_fail (epoll_has_fd (epoll, fd)); + + epoll->fd_info[fd].disabled = FALSE; +} + +EPollEvent * +epoll_wait (EPoll *epoll, + int *n_events, + int timeout) +{ + struct pollfd *fds; + int i, j; + int n_poll_fds; + int n_ready; + EPollEvent *events; + + fds = g_new (struct pollfd, epoll->n_fds); + + j = 0; + for (i = 0; i < epoll->n_fd_infos; ++i) + { + if (epoll->fd_info[i].valid && + !epoll->fd_info[i].disabled && + epoll->fd_info[i].mask != 0) + { + fds[j].events = 0; + + if (epoll->fd_info[i].mask & EPOLL_READ) + fds[j].events |= POLLIN; + + if (epoll->fd_info[i].mask & EPOLL_WRITE) + fds[j].events |= POLLOUT; + + if (epoll->fd_info[i].mask & EPOLL_HANGUP) + fds[j].events |= POLLHUP; + + if (epoll->fd_info[i].mask & EPOLL_PRIORITY) + fds[j].events |= POLLPRI; + + if (epoll->fd_info[i].mask & EPOLL_ERROR) + fds[j].events |= POLLERR; + + fds[j].fd = i; + + j++; + } + } + + n_poll_fds = j; + + events = NULL; + + n_ready = poll (fds, n_poll_fds, timeout); + + if (n_ready) + { + events = g_new0 (EPollEvent, n_ready); + + j = 0; + for (i = 0; i < n_poll_fds; ++i) + { + if (fds[i].revents) + { + events[j].events = 0; + + if (fds[i].revents & POLLIN) + events[j].events |= EPOLL_READ; + + if (fds[i].revents & POLLOUT) + events[j].events |= EPOLL_WRITE; + + if (fds[i].revents & POLLHUP) + events[j].events |= EPOLL_HANGUP; + + if (fds[i].revents & POLLERR) + events[j].events |= EPOLL_ERROR; + + if (fds[i].revents & POLLPRI) + events[j].events |= EPOLL_PRIORITY; + + events[j].fd = fds[i].fd; + epoll->fd_info[fds[i].fd].disabled = TRUE; + } + } + } + + g_free (fds); + + if (n_events) + *n_events = n_ready; + + return events; +} @@ -0,0 +1,41 @@ +#ifndef _EPOLL_H_ +#define _EPOLL_H_ + +#include <glib.h> + +typedef struct EPoll EPoll; + +typedef enum +{ + EPOLL_READ = 1 << 0, + EPOLL_WRITE = 1 << 1, + EPOLL_HANGUP = 1 << 2, + EPOLL_ERROR = 1 << 3, + EPOLL_PRIORITY = 1 << 4, +} EPollEventType; + +typedef struct +{ + EPollEventType events; + int fd; +} EPollEvent; + +EPoll *epoll_new (void); +void epoll_add_fd (EPoll *epoll, + int fd, + EPollEventType mask, + gpointer data); +void epoll_remove_fd (EPoll *epoll, + int fd); +gpointer epoll_get_fd_data (EPoll *epoll, + int fd); +gboolean epoll_has_fd (EPoll *epoll, + int fd); +void epoll_reenable_fd (EPoll *epoll, + int fd); +gint epoll_get_n_fds (EPoll *epoll); +EPollEvent * epoll_wait (EPoll *epoll, + int *n_events, + int timeout); + +#endif diff --git a/executor.c b/executor.c new file mode 100644 index 0000000..a70c248 --- /dev/null +++ b/executor.c @@ -0,0 +1,178 @@ +#include "executor.h" + +struct Executor +{ + gboolean quitting; + + GMutex * mutex; + + GCond * new_job_cond; + GCond * idle_cond; + GCond * exit_cond; + + GQueue * jobs; + + int n_threads; + int n_idle_threads; + int n_exited_threads; +}; + +static gpointer executor_thread (gpointer); + +static void +executor_lock (Executor *executor) +{ + g_mutex_lock (executor->mutex); +} + +static void +executor_unlock (Executor *executor) +{ + g_mutex_unlock (executor->mutex); +} + +Executor * +executor_new (guint n_threads) +{ + Executor *executor; + int i; + + g_return_val_if_fail (n_threads > 0, NULL); + + if (!g_thread_supported()) + g_thread_init (NULL); + + executor = g_new0 (Executor, 1); + + executor->mutex = g_mutex_new (); + executor->jobs = g_queue_new (); + + executor->new_job_cond = g_cond_new (); + executor->idle_cond = g_cond_new (); + executor->exit_cond = g_cond_new (); + executor->n_idle_threads = 0; + executor->n_exited_threads = 0; + executor->quitting = FALSE; + executor->n_threads = n_threads; + + executor_lock (executor); + + for (i = 0; i < n_threads; ++i) + g_thread_create (executor_thread, executor, TRUE, NULL); + + executor_unlock (executor); + + return executor; +} + +static guint +executor_n_jobs (Executor *executor) +{ + g_assert (g_queue_get_length (executor->jobs) % 2 == 0); + + return (g_queue_get_length (executor->jobs)) / 2; +} + +/* Wait until all previously submitted jobs have finished */ +void +executor_sync (Executor *executor) +{ + executor_lock (executor); + + g_cond_broadcast (executor->new_job_cond); + + while (!(executor->n_idle_threads == executor->n_threads && + executor_n_jobs (executor) == 0)) + { + g_cond_wait (executor->idle_cond, executor->mutex); + } + + executor_unlock (executor); +} + +void +executor_free (Executor *executor) +{ + executor_lock (executor); + + executor->quitting = TRUE; + + g_cond_broadcast (executor->new_job_cond); + + while (executor->n_exited_threads != executor->n_threads) + g_cond_wait (executor->exit_cond, executor->mutex); + + executor_unlock (executor); + + while (!g_queue_is_empty (executor->jobs)) + g_queue_pop_head (executor->jobs); + + g_mutex_free (executor->mutex); + g_cond_free (executor->new_job_cond); + g_cond_free (executor->idle_cond); + g_cond_free (executor->exit_cond); + g_queue_free (executor->jobs); +} + +void +executor_add_job (Executor *exe, + ExecutorJob job, + gpointer data) +{ + executor_lock (exe); + + g_queue_push_tail (exe->jobs, job); + g_queue_push_tail (exe->jobs, data); + + g_cond_signal (exe->new_job_cond); + + executor_unlock (exe); +} + +static gpointer +executor_thread (gpointer data) +{ + Executor *executor = data; + + executor_lock (executor); + + while (!executor->quitting) + { + if (!executor_n_jobs (executor)) + { + executor->n_idle_threads++; + + g_cond_signal (executor->idle_cond); + + g_cond_wait (executor->new_job_cond, executor->mutex); + + executor->n_idle_threads--; + } + + if (executor->quitting) + break; + + if (executor_n_jobs (executor)) + { + ExecutorJob job; + gpointer data; + + job = g_queue_pop_head (executor->jobs); + data = g_queue_pop_head (executor->jobs); + + executor_unlock (executor); + + job (data); + + executor_lock (executor); + } + } + + executor->n_exited_threads++; + + g_cond_signal (executor->exit_cond); + + executor_unlock (executor); + + return NULL; +} diff --git a/executor.h b/executor.h new file mode 100644 index 0000000..622a55e --- /dev/null +++ b/executor.h @@ -0,0 +1,18 @@ +#ifndef EXECUTOR_H +#define EXECUTOR_H + +#include <glib.h> + +typedef struct Executor Executor; + +typedef void (* ExecutorJob) (gpointer data); + +Executor *executor_new (guint n_threads); +void executor_add_job (Executor *exe, + ExecutorJob job, + gpointer data); +void executor_free (Executor *executor); +void executor_sync (Executor *executor); + +#endif + @@ -0,0 +1,52 @@ +#include <glib.h> +#include "executor.h" + +G_LOCK_DEFINE (blah); + +GList *list; + +static void +the_job (gpointer data) +{ + int i; + + G_LOCK (blah); + + for (i = 0; i < 34540; ++i) + ; + +#if 0 + g_print ("%d he string is: %p\n", + (GPOINTER_TO_INT (data)), g_thread_self()); +#endif + + list = g_list_prepend (list, data); + + G_UNLOCK (blah); +} + +int +main () +{ + Executor *exe = executor_new (4); + int i; + + for (i = 0; i < 200000; ++i) + { + executor_add_job (exe, the_job, i * 10 + 0); + executor_add_job (exe, the_job, i * 10 + 1); + executor_add_job (exe, the_job, i * 10 + 2); + executor_add_job (exe, the_job, i * 10 + 3); + executor_add_job (exe, the_job, i * 10 + 4); + executor_add_job (exe, the_job, i * 10 + 5); + executor_add_job (exe, the_job, i * 10 + 6); + executor_add_job (exe, the_job, i * 10 + 7); + executor_add_job (exe, the_job, i * 10 + 8); + executor_add_job (exe, the_job, i * 10 + 9); + } + + executor_sync (exe); + executor_free (exe); + + g_print ("%d\n", g_list_length (list)); +} diff --git a/http-server.c b/http-server.c new file mode 100644 index 0000000..cbf490f --- /dev/null +++ b/http-server.c @@ -0,0 +1,59 @@ +static void +client_free (Client *client) +{ + delete (peer); + close (client->fd); + main_context_free (client->context); + connection_destroy (client->connection); + g_free (client); +} + +static void +on_connection_event (connection) +{ + Client *client = lac_connection_get_data (client); + + switch (event) + { + case READ: + read_the_stuff(); + process_it(); + break; + + case ERROR: + connection_close(); + break; + + case CLOSED: + client_free (); + break; + } + + process_stuff (); + + connection_write_stuff (); +} + +static void +on_connection (LacListener *listener, int fd, LacAddress *peer, gpointer data) +{ + Server *server = lac_listener_get_data (listener); + Client *client = g_new (Client, 1); + client->peer = peer; + client->fd = fd; + + client->context = context_new (server->executor, server->poll, client); + client->connection = connection_new (fd, on_connection_event); +} + +Server * +server_new (int port) +{ + server->poll = epll_new(); + server->executor = executor_new (); + server->port = port; + + create_listener (port, on_connection, server); + + return server; +} diff --git a/jobqueue.c b/jobqueue.c new file mode 100644 index 0000000..427b62e --- /dev/null +++ b/jobqueue.c @@ -0,0 +1,180 @@ +#include "jobqueue.h" + +typedef struct Job Job; + +struct Job +{ + JobQueue * queue; + ExecutorJob job; + gpointer data; +}; + +struct JobQueue +{ + Executor * executor; + GQueue * jobs; + GMutex * mutex; + gboolean in_executor;; + int ref_count; +}; + +static Job * +job_new (JobQueue * queue, + ExecutorJob func, + gpointer data) +{ + Job *job = g_new0 (Job, 1); + job->job = func; + job->queue = queue; + job->data = data; + + return job; +} + +static void +job_queue_lock (JobQueue *queue) +{ + g_mutex_lock (queue->mutex); +} + +static void +job_queue_unlock (JobQueue *queue) +{ + g_mutex_unlock (queue->mutex); +} + +static void +delete_queue (JobQueue *queue) +{ + g_assert (queue->ref_count == 0); + + g_queue_free (queue->jobs); + g_mutex_free (queue->mutex); + g_free (queue); +} + +JobQueue * +job_queue_new (Executor *executor) +{ + JobQueue *queue = g_new0 (JobQueue, 1); + + queue->executor = executor; + queue->mutex = g_mutex_new (); + queue->in_executor = FALSE; + queue->ref_count = 1; + queue->jobs = g_queue_new (); + + return queue; +} + +static void +run_queue (gpointer data) +{ + JobQueue *queue = data; + gboolean do_free = FALSE; + + job_queue_lock (queue); + + g_assert (queue->in_executor == TRUE); + + while (!g_queue_is_empty (queue->jobs)) + { + Job *job = g_queue_pop_head (queue->jobs); + + job_queue_unlock (queue); + + job->job (job->data); + + g_free (job); + + job_queue_lock (queue); + } + + queue->ref_count--; + queue->in_executor = FALSE; + + if (queue->ref_count == 0) + do_free = TRUE; + + job_queue_unlock (queue); + + if (do_free) + delete_queue (queue); +} + +gpointer +job_queue_add (JobQueue *queue, + ExecutorJob func, + gpointer data) +{ + Job *job; + + job_queue_lock (queue); + + job = job_new (queue, func, data); + g_queue_push_tail (queue->jobs, job); + + if (!queue->in_executor) + { + queue->in_executor = TRUE; + queue->ref_count++; + executor_add_job (queue->executor, run_queue, queue); + } + + job_queue_unlock (queue); + + return job; +} + +void +job_queue_remove (JobQueue *queue, + gpointer job_id) +{ + Job *job = job_id; + GList *link; + + job_queue_lock (job->queue); + + link = g_queue_find (queue->jobs, job); + if (link) + { + g_queue_delete_link (queue->jobs, link); + g_free (job); + } + + job_queue_unlock (job->queue); +} + +void +job_queue_free (JobQueue *queue) +{ + gboolean do_free = FALSE; + + job_queue_lock (queue); + + if (queue->in_executor) + { + g_warning ("Trying to free a running queue"); + job_queue_unlock (queue); + return; + } + + while (!g_queue_is_empty (queue->jobs)) + { + Job *job = g_queue_pop_head (queue->jobs); + + g_free (job); + } + + g_queue_free (queue->jobs); + + queue->ref_count--; + + if (queue->ref_count == 0) + do_free = TRUE; + + job_queue_unlock (queue); + + if (do_free) + delete_queue (queue); +} diff --git a/jobqueue.h b/jobqueue.h new file mode 100644 index 0000000..9f2c20c --- /dev/null +++ b/jobqueue.h @@ -0,0 +1,26 @@ +#ifndef _JOB_QUEUE_H_ +#define _JOB_QUEUE_H_ + +#include "executor.h" + +typedef struct JobQueue JobQueue; + +/* A job queue has the property that no more than one + * job will be active at the same time. So if the currently + * running task knows that some object will only be touched by itself + * and other tasks in the same queue, it doesn't have to lock it. + * + * Also, the task can safely cancel other jobs in the queue. The + * pointer you get back is guaranteed to be valid until the corresponding + * task has returned + */ +JobQueue *job_queue_new (Executor *executor); +gpointer job_queue_add (JobQueue *queue, + ExecutorJob job, + gpointer data); +void job_queue_remove (JobQueue *queue, + gpointer job); +void job_queue_free (JobQueue *queue); + +#endif + @@ -0,0 +1,21 @@ +Locking is hard + +A possible design: + - Every object has its own lock + - No more than one lock can be taken by the same + thread at a time. + ++ Is a checkable invariant. A wrapper around g_mutex_lock() can be + written that checks it. + +- Draconican Rules: + - whenever you call a callback or a method on another object: + + - first make a copy of all the state yuo wish to pass + - then unlock + - then call and store the return value on the stack + - then lock + - then do something with the return value. + + And at this point be aware that other threads could + have changed you. @@ -0,0 +1,7 @@ +#include "engine.h" + +int +main () +{ + return 0; +} diff --git a/maincontext.c b/maincontext.c new file mode 100644 index 0000000..0edc79a --- /dev/null +++ b/maincontext.c @@ -0,0 +1,175 @@ +#include "jobqueue.h" +#include "maincontext.h" +#include "mainloop.h" + +/* A main context will generally correspond to one client in the server. + * + * Tasks added to it, either as fd tasks, or as idle/timeout tasks + * will run serially, always. + * + * Parallelism can be done through the - as yet - non-existant + * get_executor(). Or perhaps, create_job_queue()? + * + * Also, given a main context, who really needs a "mainloop"? How about + * just passing in an epoll and an executor at creation time? + * + * Sounds like a better idea. Hmm, the problem is who calls epoll_wait()? + * + * Also should probably have a main_context_get_data() and then pass the + * context to the callbacks, rather than the data directly. This way + * the callbacks have acess to the context as well. Though I guess the + * user data will generally contain a pointer to the context. So data + * itself is probably more convenient. Or is it? If you have data, then + * you have to do my_stuff->context to get at the context, if it is an + * argument, then you can just use it. Ie., there is no useless "data" + * parameter. + * + * We will most likely need per-fd data for things like Connections. + */ +typedef struct ContextFdInfo ContextFdInfo; + +struct MainContext +{ + MainLoop * loop; + gpointer data; + JobQueue * job_queue; +}; + +struct ContextFdInfo +{ + MainContext * context; + MainFdTask prepare; + MainFdTask read; + MainFdTask write; + gpointer data; +}; + +MainContext * +main_context_new (MainLoop *loop, + gpointer data) +{ + MainContext *context = g_new0 (MainContext, 1); + + context->loop = loop; + context->data = data; + context->job_queue = job_queue_new (main_loop_get_executor (loop)); + + return context; +} + +void +main_context_free (MainContext *context) +{ + /* FIXME: this will complain if we run it when there are still + * active jobs. But we kinda have to delete the job out of a + * task that runs in it. So what do we do? Maybe just make it + * legal to delete an active job queue. I don't really see the + * harm in that, except that the execution function in jobqueue.c + * might get a little bit more complex + */ + + job_queue_free (context->job_queue); + + g_free (context); +} + +static void +on_fd_activity (int fd, + EPollEventType mask, + gpointer data) +{ + +} + +/* Filedescriptors */ +void +main_context_add_fd (MainContext *context, + int fd, + gpointer data) +{ + ContextFdInfo *info = g_new0 (ContextFdInfo, 1); + + info->context = context; + info->data = data; + info->prepare = NULL; + info->read = NULL; + info->write = NULL; + + main_loop_add_fd (context->loop, + fd, + context->job_queue, + on_fd_activity, + info); +} + +void +main_context_remove_fd (MainContext *context, + int fd) +{ + +} + +static void +update_poll_mask (MainContext *context, + ContextFdInfo *info, + int fd) +{ + +} + +/* Called before the fd is polled */ +void +main_context_set_prepare_callback (MainContext *context, + int fd, + MainFdTask func) +{ + +} + +/* Called when the fd is writable */ +void +main_context_set_write_callback (MainContext *context, + int fd, + MainFdTask func) +{ + +} + +/* Called when there is data to be read, if the fd is closed/hungup, or + * if there is an error + */ +void +main_context_set_read_callback (MainContext *context, + int fd, + MainFdTask func) +{ + +} + +/* Idle/timeout tasks */ +#if 0 +typedef gboolean (* MainTaskFunc) (gpointer data); + +MainTask * +main_context_add_idle (MainContext *context, + MainTaskFunc func, + gpointer data) +{ + +} + +MainTask * +main_context_add_timeout (MainContext *context, + int millisecs, + MainTaskFunc func, + gpointer data) +{ + +} + +void +main_task_remove (MainTask *task) +{ + +} +#endif diff --git a/maincontext.h b/maincontext.h new file mode 100644 index 0000000..32f3af3 --- /dev/null +++ b/maincontext.h @@ -0,0 +1,47 @@ +#include <glib.h> +#include "mainloop.h" + +typedef struct MainContext MainContext; + +MainContext *main_context_new (MainLoop *loop, + gpointer data); +void main_context_free (MainContext *context); + +/* Filedescriptors */ +typedef void (* MainFdTask) (MainContext *context, + int fd, + gpointer data); + +void main_context_add_fd (MainContext *context, + int fd, + gpointer data); +void main_context_remove_fd (MainContext *context, + int fd); +/* Called before the fd is polled */ +void main_context_set_prepare_callback (MainContext *context, + int fd, + MainFdTask func); +/* Called when the fd is writable */ +void main_context_set_write_callback (MainContext *context, + int fd, + MainFdTask func); +/* Called when there is data to be read, if the fd is closed/hungup, or + * if there is an error + */ +void main_context_set_read_callback (MainContext *context, + int fd, + MainFdTask func); + +/* Idle/timeout tasks */ +#if 0 +typedef gboolean (* MainTaskFunc) (gpointer data); + +MainTask *main_context_add_idle (MainContext *context, + MainTaskFunc func, + gpointer data); +MainTask *main_context_add_timeout (MainContext *context, + int millisecs, + MainTaskFunc func, + gpointer data); +void main_task_remove (MainTask *task); +#endif diff --git a/mainloop.c b/mainloop.c new file mode 100644 index 0000000..754d84b --- /dev/null +++ b/mainloop.c @@ -0,0 +1,142 @@ +#include "mainloop.h" +#include "epoll.h" + +typedef struct FdInfo FdInfo; +typedef struct FdCallbackInfo FdCallbackInfo; + +struct FdInfo +{ + JobQueue * queue; + MainFdFunc func; + gpointer data; +}; + +struct MainLoop +{ + Executor *executor; + EPoll *epoll; +}; + +struct FdCallbackInfo +{ + EPoll *epoll; + int fd; + EPollEventType mask; + MainFdFunc func; + gpointer data; +}; + +static void +run_fd_callback (gpointer data) +{ + FdCallbackInfo *info = data; + + info->func (info->fd, info->mask, info->data); + + epoll_reenable_fd (info->epoll, info->fd); + + g_free (info); +} + +static void +poll_job (gpointer data) +{ + MainLoop *loop = data; + EPollEvent *events; + int i, n_events; + + events = epoll_wait (loop->epoll, &n_events, -1); + + for (i = 0; i < n_events; ++i) + { + EPollEvent *event = &(events[i]); + FdInfo *info = epoll_get_fd_data (loop->epoll, event->fd); + + FdCallbackInfo *callback_info = g_new0 (FdCallbackInfo, 1); + + callback_info->epoll = loop->epoll; + callback_info->fd = event->fd; + callback_info->mask = event->events; + callback_info->func = info->func; + callback_info->data = info->data; + + job_queue_add (info->queue, run_fd_callback, callback_info); + } + + g_free (events); + + executor_add_job (loop->executor, poll_job, loop); +} + +MainLoop * +main_loop_new (Executor *executor) +{ + MainLoop *loop; + + g_return_val_if_fail (executor != NULL, NULL); + + loop = g_new0 (MainLoop, 1); + loop->executor = executor; + loop->epoll = epoll_new (); + + executor_add_job (executor, poll_job, loop); + + return loop; +} + +void +main_loop_add_fd (MainLoop *loop, + int fd, + JobQueue *queue, + MainFdFunc func, + gpointer data) +{ + FdInfo *info; + + g_return_if_fail (loop != NULL); + g_return_if_fail (!epoll_has_fd (loop->epoll, fd)); + + info = g_new0 (FdInfo, 1); + info->queue = queue; + info->func = func; + info->data = data; + + epoll_add_fd (loop->epoll, fd, 0, info); +} + +#if 0 +void +main_loop_set_fd_poll_mask (MainLoop *loop, + int fd, + EPollEventType poll_mask) +{ + g_return_if_fail (loop != NULL); + g_return_if_fail (epoll_has_fd (loop->epoll, fd)); + + epoll_fd_set_mask (loop->epoll, fd, poll_mask); +} +#endif + +void +main_loop_remove_fd (MainLoop *loop, + int fd) +{ + FdInfo *info; + + g_return_if_fail (loop != NULL); + g_return_if_fail (epoll_has_fd (loop->epoll, fd)); + + info = epoll_get_fd_data (loop->epoll, fd); + + g_free (info); + + epoll_remove_fd (loop->epoll, fd); +} + +Executor * +main_loop_get_executor (MainLoop *loop) +{ + g_return_val_if_fail (loop != NULL, NULL); + + return loop->executor; +} diff --git a/mainloop.h b/mainloop.h new file mode 100644 index 0000000..70ed4d4 --- /dev/null +++ b/mainloop.h @@ -0,0 +1,67 @@ +#ifndef _MAINLOOP_H +#define _MAINLOOP_H + +#include <glib.h> +#include "jobqueue.h" +#include "epoll.h" + +typedef struct MainLoop MainLoop; + +typedef void (* MainFdFunc) (int fd, + EPollEventType mask, + gpointer data); + +MainLoop *main_loop_new (Executor *executor); +Executor *main_loop_get_executor (MainLoop *loop); +void main_loop_add_fd (MainLoop *loop, + int fd, + JobQueue *queue, + MainFdFunc func, + gpointer data); +void main_loop_set_fd_poll_mask (MainLoop *loop, + int fd, + EPollEventType poll_mask); +void main_loop_remove_fd (MainLoop *loop, + int fd); + +/* Missing: */ + +/* Maybe these functions really belong in the JobQueue? + * That would require support from the executor though. + * But fundamentally these are valid (in a threaded server) + * without fds, which is what the main loop provides. + * The executor already has a gcond_wait() that could be + * made a timed one. + * + * Note that a good priority queue is going to be needed in + * whatever class eventually implements these. + * + * Also, note that cancellation support is essential for these + * tasks. + * + * Yes, should probably happen in JobQueue + Executor: + * + * executor should maintain all jobs in a priority queue + * sorted by execution time. + * + * normal jobs get an execution time of "now", where timeout + * jobs get an execution of sometime in the future. + * + * this avoids starvation of timeouts since a timeout will + * eventually move to the front of the queue (with a time + * in the past possibly) + * + * + * + */ +#if 0 +void main_loop_add_timeout (MainLoop *loop, + JobQueue *queue, + MainFunc *task); +void main_loop_add_idle (MainLoop *loop, + JobQueue *queue, + MainFunc *task); +#endif + +#endif /* _MAINLOO_H_ */ + diff --git a/mainloop2.c b/mainloop2.c new file mode 100644 index 0000000..32a4fbe --- /dev/null +++ b/mainloop2.c @@ -0,0 +1,238 @@ +#ifndef _MAINLOOP_H +#define _MAINLOOP_H + +#include <glib.h> + +#include "mainloop2.h" + +struct FdInfo +{ + JobQueue * queue; + MainFdFunc func; + gpointer data; +}; + +struct MainLoop +{ + EPoll *epoll; + GMutex *mutex; + int wake_up_pipe_write; + int wake_up_pipe_read; + GQueue *new_fds; + GMutex *new_fds_mutex; +}; + +struct FdCallbackInfo +{ + EPoll *epoll; + int fd; + EPollEventType mask; + MainFdFunc func; + gpointer data; +}; + +struct MainLoop +{ +}; + +#if 0 +typedef void (* MainFdFunc) (int fd, + EPollEventType mask, + gpointer data); +#endif + +static void +main_loop_lock (MainLoop *loop) +{ + g_mutex_lock (loop->mutex); +} + +static void +main_loop_unlock (MainLoop *loop) +{ + g_mutex_unlock (loop->mutex); +} + +static void +setup_wakeup_pipe (MainLoop *main_loop) +{ + /* FIXME: check errors */ + int p[2]; + pipe (p); + main_loop->wake_up_pipe_read = p[0]; + main_loop->wake_up_pipe_write = p[1]; + epoll_add_fd (main_loop->epoll, main_loop->wake_up_pipe_read, NULL); +} + +MainLoop * +main_loop_new (void) +{ + MainLoop *main_loop = g_new0 (MainLoop, 1); + int p[2]; + + main_loop->epoll = epoll_new (void); + main_loop->mutex = g_mutex_new (); + + setup_wakeup_pipe (main_loop); + + main_loop->new_fds = g_queue_new (); + main_loop->new_fds_mutex = g_mutex_new (); + + return main_loop; +} + +typedef struct +{ + int fd; + MainFdFunc func; + gpointer data; + EPollEventType mask; +} EventInfo; + +static void +handle_fd (gpointer data) +{ + EventInfo *info = data; + + info->func (info->fd, info->mask, info->data); + + g_free (info); +} + +void +main_loop_run (MainLoop *loop) +{ + while (TRUE) + { + EPollEvent *events; + int n_events; + int i; + GList *list; + + main_loop_lock (loop); + + events = epoll_wait (loop->epoll, &n_events, -1); + + main_loop_unlock (loop); + + for (i = 0; i < n_events; ++i) + { + EPollEvent *event = &(events[i]); + + if (event->fd == loop->wake_up_pipe_read) + { + read_wake_up_pipe (event->fd); + } + else + { + FdInfo *info = epoll_get_fd_data (loop->epoll); + EventInfo *event_info = g_new (EventInfo, 1); + + event_info->fd = event->fd; + event_info->func = info->func; + event_info->data = info->func; + event_info->mask = event->mask; + + job_queue_add (info->queue, handle_fd, event_info); + } + } + + main_loop_lock (loop); + + g_mutex_lock (loop->new_fds_mutex); + + for (list = loop->new_fds; list != NULL; list = list->next->next) + { + int fd = GPOINTER_TO_INT (list->data); + FdInfo *info = list->next->data; + + epoll_add_fd (loop->epoll, fd, info); + } + + g_list_free (loop->new_fds); + loop->new_fds = NULL; + + g_mutex_unlock (loop->new_fds_mutex); + } +} + +void +main_loop_add_fd (MainLoop *loop, + int fd, + JobQueue *queue, + MainFdFunc func, + gpointer data) +{ + FdInfo *info; + + info = g_new0 (FdInfo, 1); + info->queue = queue; + info->func = func; + info->data = data; + + g_mutex_lock (loop->new_fds_mutex); + + loop->new_fds = g_list_prepend (loop->new_fds, info); + loop->new_fds = g_list_prepend (loop->new_fds, GINT_TO_POINTER (fd)); + + g_mutex_unlock (loop->new_fds_mutex); + + main_loop_wake_up (loop); +} + +void +main_loop_set_fd_poll_mask (MainLoop * loop, + int fd, + EPollEventType poll_mask) +{ + FdInfo *info; +} + +void +main_loop_remove_fd (MainLoop *loop, + int fd) +{ + +} + +/* Missing: */ + +/* Maybe these functions really belong in the JobQueue? + * That would require support from the executor though. + * But fundamentally these are valid (in a threaded server) + * without fds, which is what the main loop provides. + * The executor already has a gcond_wait() that could be + * made a timed one. + * + * Note that a good priority queue is going to be needed in + * whatever class eventually implements these. + * + * Also, note that cancellation support is essential for these + * tasks. + * + * Yes, should probably happen in JobQueue + Executor: + * + * executor should maintain all jobs in a priority queue + * sorted by execution time. + * + * normal jobs get an execution time of "now", where timeout + * jobs get an execution of sometime in the future. + * + * this avoids starvation of timeouts since a timeout will + * eventually move to the front of the queue (with a time + * in the past possibly) + * + * + * + */ +#if 0 +void main_loop_add_timeout (MainLoop *loop, + JobQueue *queue, + MainFunc *task); +void main_loop_add_idle (MainLoop *loop, + JobQueue *queue, + MainFunc *task); +#endif + +#endif /* _MAINLOO_H_ */ + diff --git a/mainloop2.h b/mainloop2.h new file mode 100644 index 0000000..e78877a --- /dev/null +++ b/mainloop2.h @@ -0,0 +1,67 @@ +#ifndef _MAINLOOP_H +#define _MAINLOOP_H + +#include <glib.h> +#include "jobqueue.h" +#include "epoll.h" + +typedef struct MainLoop MainLoop; + +typedef void (* MainFdFunc) (int fd, + EPollEventType mask, + gpointer data); + +MainLoop *main_loop_new (Executor *executor); +void main_loop_run (MainLoop *loop); +void main_loop_add_fd (MainLoop *loop, + int fd, + JobQueue *queue, + MainFdFunc func, + gpointer data); +void main_loop_set_fd_poll_mask (MainLoop *loop, + int fd, + EPollEventType poll_mask); +void main_loop_remove_fd (MainLoop *loop, + int fd); + +/* Missing: */ + +/* Maybe these functions really belong in the JobQueue? + * That would require support from the executor though. + * But fundamentally these are valid (in a threaded server) + * without fds, which is what the main loop provides. + * The executor already has a gcond_wait() that could be + * made a timed one. + * + * Note that a good priority queue is going to be needed in + * whatever class eventually implements these. + * + * Also, note that cancellation support is essential for these + * tasks. + * + * Yes, should probably happen in JobQueue + Executor: + * + * executor should maintain all jobs in a priority queue + * sorted by execution time. + * + * normal jobs get an execution time of "now", where timeout + * jobs get an execution of sometime in the future. + * + * this avoids starvation of timeouts since a timeout will + * eventually move to the front of the queue (with a time + * in the past possibly) + * + * + * + */ +#if 0 +void main_loop_add_timeout (MainLoop *loop, + JobQueue *queue, + MainFunc *task); +void main_loop_add_idle (MainLoop *loop, + JobQueue *queue, + MainFunc *task); +#endif + +#endif /* _MAINLOO_H_ */ + @@ -0,0 +1,60 @@ +#include <glib.h> +#include "jobqueue.h" + +G_LOCK_DEFINE (blah); + +GList *list; + +static void +the_job (gpointer data) +{ + int i; + +#if 0 + G_LOCK (blah); +#endif + + for (i = 0; i < 34540; ++i) + ; + +#if 0 + g_print ("%d he string is: %p\n", + (GPOINTER_TO_INT (data)), g_thread_self()); +#endif + +#if 0 + list = g_list_prepend (list, data); + + G_UNLOCK (blah); +#endif +} + +int +main () +{ + Executor *exe = executor_new (10); + int i; + + for (i = 0; i < 2000; ++i) + { + JobQueue *queue = job_queue_new (exe); + + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 0)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 1)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 2)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 3)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 4)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 5)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 6)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 7)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 8)); + job_queue_add (queue, the_job, GINT_TO_POINTER (i * 10 + 9)); + } + + executor_sync (exe); + executor_free (exe); + + g_print ("%d\n", g_list_length (list)); + + return 0; +} |