| | | |
|---|---|---|
| author | Jamey Sharp <jamey@minilop.net> | 2010-05-10 16:51:24 -0700 |
| committer | Jamey Sharp <jamey@minilop.net> | 2010-05-10 16:51:24 -0700 |
| commit | fd82552d5c0ce1931f29006a0c36f5e03cf8577e (patch) | |
| tree | cad7da601336e6706cdb41267bef9d3205a84175 | |
| parent | aae2a4a7aab26de3fa715d6ecd0a0e0926b37fc9 (diff) | |
| parent | 933aee1d5c53b0cc7d608011a29188b594c8d70b (diff) | |
Merge branch 'xlib-xcb-thread-fixes'
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/XlibInt.c | 11 |
| -rw-r--r-- | src/Xprivate.h | 12 |
| -rw-r--r-- | src/Xxcbint.h | 8 |
| -rw-r--r-- | src/xcb_disp.c | 6 |
| -rw-r--r-- | src/xcb_io.c | 425 |

5 files changed, 287 insertions, 175 deletions
diff --git a/src/XlibInt.c b/src/XlibInt.c
index 13e65087..955cd079 100644
--- a/src/XlibInt.c
+++ b/src/XlibInt.c
@@ -86,14 +86,6 @@ xthread_t (*_Xthread_self_fn)(void) = NULL;
         (*(d)->lock->push_reader)(d,&(d)->lock->reply_awaiters_tail) : NULL)
 #define QueueEventReaderLock(d) ((d)->lock ? \
         (*(d)->lock->push_reader)(d,&(d)->lock->event_awaiters_tail) : NULL)
-
-#if defined(XTHREADS_WARN) || defined(XTHREADS_FILE_LINE)
-#define InternalLockDisplay(d,wskip) if ((d)->lock) \
-    (*(d)->lock->internal_lock_display)(d,wskip,__FILE__,__LINE__)
-#else
-#define InternalLockDisplay(d,wskip) if ((d)->lock) \
-    (*(d)->lock->internal_lock_display)(d,wskip)
-#endif
 #endif /* !USE_XCB */
 
 #else /* XTHREADS else */
@@ -101,7 +93,6 @@ xthread_t (*_Xthread_self_fn)(void) = NULL;
 #if !USE_XCB
 #define UnlockNextReplyReader(d)
 #define UnlockNextEventReader(d)
-#define InternalLockDisplay(d,wskip)
 #endif /* !USE_XCB */
 
 #endif /* XTHREADS else */
@@ -575,7 +566,7 @@ void sync_while_locked(Display *dpy)
 #endif
     UnlockDisplay(dpy);
     SyncHandle();
-    LockDisplay(dpy);
+    InternalLockDisplay(dpy, /* don't skip user locks */ 0);
 #ifdef XTHREADS
     if (dpy->lock)
         (*dpy->lock->user_unlock_display)(dpy);
diff --git a/src/Xprivate.h b/src/Xprivate.h
index cba07db1..ebe51c1e 100644
--- a/src/Xprivate.h
+++ b/src/Xprivate.h
@@ -13,4 +13,16 @@ extern void _XSeqSyncFunction(Display *dpy);
 extern void _XSetPrivSyncFunction(Display *dpy);
 extern void _XSetSeqSyncFunction(Display *dpy);
 
+#ifdef XTHREADS
+#if defined(XTHREADS_WARN) || defined(XTHREADS_FILE_LINE)
+#define InternalLockDisplay(d,wskip) if ((d)->lock) \
+    (*(d)->lock->internal_lock_display)(d,wskip,__FILE__,__LINE__)
+#else
+#define InternalLockDisplay(d,wskip) if ((d)->lock) \
+    (*(d)->lock->internal_lock_display)(d,wskip)
+#endif
+#else /* XTHREADS else */
+#define InternalLockDisplay(d,wskip)
+#endif /* XTHREADS else */
+
 #endif /* XPRIVATE_H */
diff --git a/src/Xxcbint.h b/src/Xxcbint.h
index f6afa183..c8191620 100644
--- a/src/Xxcbint.h
+++ b/src/Xxcbint.h
@@ -16,12 +16,13 @@ typedef struct PendingRequest PendingRequest;
 struct PendingRequest {
     PendingRequest *next;
     unsigned long sequence;
+    unsigned reply_waiter;
 };
 
 typedef struct _X11XCBPrivate {
     xcb_connection_t *connection;
     PendingRequest *pending_requests;
-    PendingRequest **pending_requests_tail;
+    PendingRequest *pending_requests_tail;
     xcb_generic_event_t *next_event;
     char *real_bufmax;
     char *reply_data;
@@ -31,11 +32,10 @@ typedef struct _X11XCBPrivate {
     enum XEventQueueOwner event_owner;
     XID next_xid;
 
-    /* handle simultaneous threads waiting for events,
-     * used in wait_or_poll_for_event
-     */
+    /* handle simultaneous threads waiting for responses */
     xcondition_t event_notify;
     int event_waiter;
+    xcondition_t reply_notify;
 } _X11XCBPrivate;
 
 /* xcb_disp.c */
diff --git a/src/xcb_disp.c b/src/xcb_disp.c
index 99e08365..690a6036 100644
--- a/src/xcb_disp.c
+++ b/src/xcb_disp.c
@@ -95,13 +95,14 @@ int _XConnectXCB(Display *dpy, _Xconst char *display, char **fullnamep, int *scr
     dpy->fd = xcb_get_file_descriptor(c);
 
     dpy->xcb->connection = c;
-    dpy->xcb->pending_requests_tail = &dpy->xcb->pending_requests;
     dpy->xcb->next_xid = xcb_generate_id(dpy->xcb->connection);
 
     dpy->xcb->event_notify = xcondition_malloc();
-    if (!dpy->xcb->event_notify)
+    dpy->xcb->reply_notify = xcondition_malloc();
+    if (!dpy->xcb->event_notify || !dpy->xcb->reply_notify)
         return 0;
     xcondition_init(dpy->xcb->event_notify);
+    xcondition_init(dpy->xcb->reply_notify);
 
     return !xcb_connection_has_error(c);
 }
@@ -116,5 +117,6 @@ void _XFreeX11XCBStructure(Display *dpy)
         free(tmp);
     }
     xcondition_free(dpy->xcb->event_notify);
+    xcondition_free(dpy->xcb->reply_notify);
     Xfree(dpy->xcb);
 }
diff --git a/src/xcb_io.c b/src/xcb_io.c
index 463ef844..dac7622c 100644
--- a/src/xcb_io.c
+++ b/src/xcb_io.c
@@ -25,7 +25,7 @@
 static void return_socket(void *closure)
 {
     Display *dpy = closure;
-    LockDisplay(dpy);
+    InternalLockDisplay(dpy, /* don't skip user locks */ 0);
     _XSend(dpy, NULL, 0);
     dpy->bufmax = dpy->buffer;
     UnlockDisplay(dpy);
@@ -116,6 +116,39 @@ static void check_internal_connections(Display *dpy)
     }
 }
 
+static PendingRequest *append_pending_request(Display *dpy, unsigned long sequence)
+{
+    PendingRequest *node = malloc(sizeof(PendingRequest));
+    assert(node);
+    node->next = NULL;
+    node->sequence = sequence;
+    node->reply_waiter = 0;
+    if(dpy->xcb->pending_requests_tail)
+    {
+        assert(XLIB_SEQUENCE_COMPARE(dpy->xcb->pending_requests_tail->sequence, <, node->sequence));
+        assert(dpy->xcb->pending_requests_tail->next == NULL);
+        dpy->xcb->pending_requests_tail->next = node;
+    }
+    else
+        dpy->xcb->pending_requests = node;
+    dpy->xcb->pending_requests_tail = node;
+    return node;
+}
+
+static void dequeue_pending_request(Display *dpy, PendingRequest *req)
+{
+    assert(req == dpy->xcb->pending_requests);
+    dpy->xcb->pending_requests = req->next;
+    if(!dpy->xcb->pending_requests)
+    {
+        assert(req == dpy->xcb->pending_requests_tail);
+        dpy->xcb->pending_requests_tail = NULL;
+    }
+    else
+        assert(XLIB_SEQUENCE_COMPARE(req->sequence, <, dpy->xcb->pending_requests->sequence));
+    free(req);
+}
+
 static int handle_error(Display *dpy, xError *err, Bool in_XReply)
 {
     _XExtension *ext;
@@ -134,43 +167,6 @@ static int handle_error(Display *dpy, xError *err, Bool in_XReply)
     return 0;
 }
 
-static void call_handlers(Display *dpy, xcb_generic_reply_t *buf)
-{
-    _XAsyncHandler *async, *next;
-    for(async = dpy->async_handlers; async; async = next)
-    {
-        next = async->next;
-        if(async->handler(dpy, (xReply *) buf, (char *) buf, sizeof(xReply) + (buf->length << 2), async->data))
-            return;
-    }
-}
-
-static xcb_generic_event_t * wait_or_poll_for_event(Display *dpy, int wait)
-{
-    xcb_connection_t *c = dpy->xcb->connection;
-    xcb_generic_event_t *event;
-    if(wait)
-    {
-        if(dpy->xcb->event_waiter)
-        {
-            ConditionWait(dpy, dpy->xcb->event_notify);
-            event = xcb_poll_for_event(c);
-        }
-        else
-        {
-            dpy->xcb->event_waiter = 1;
-            UnlockDisplay(dpy);
-            event = xcb_wait_for_event(c);
-            LockDisplay(dpy);
-            dpy->xcb->event_waiter = 0;
-            ConditionBroadcast(dpy, dpy->xcb->event_notify);
-        }
-    }
-    else
-        event = xcb_poll_for_event(c);
-    return event;
-}
-
 /* Widen a 32-bit sequence number into a native-word-size (unsigned long)
  * sequence number. Treating the comparison as a 1 and shifting it avoids a
  * conditional branch, and shifting by 16 twice avoids a compiler warning when
@@ -181,98 +177,114 @@ static void widen(unsigned long *wide, unsigned int narrow)
     *wide = new + ((unsigned long) (new < *wide) << 16 << 16);
 }
 
-static void process_responses(Display *dpy, int wait_for_first_event, xcb_generic_error_t **current_error, unsigned long current_request)
-{
-    void *reply;
-    xcb_generic_event_t *event = dpy->xcb->next_event;
-    xcb_generic_error_t *error;
-    xcb_connection_t *c = dpy->xcb->connection;
-    if(!event && dpy->xcb->event_owner == XlibOwnsEventQueue)
-        event = wait_or_poll_for_event(dpy, wait_for_first_event);
+/* Thread-safety rules:
+ *
+ * At most one thread can be reading from XCB's event queue at a time.
+ * If you are not the current event-reading thread and you need to find
+ * out if an event is available, you must wait.
+ *
+ * The same rule applies for reading replies.
+ *
+ * A single thread cannot be both the the event-reading and the
+ * reply-reading thread at the same time.
+ *
+ * We always look at both the current event and the first pending reply
+ * to decide which to process next.
+ *
+ * We always process all responses in sequence-number order, which may
+ * mean waiting for another thread (either the event_waiter or the
+ * reply_waiter) to handle an earlier response before we can process or
+ * return a later one. If so, we wait on the corresponding condition
+ * variable for that thread to process the response and wake us up.
+ */
+static xcb_generic_reply_t *poll_for_event(Display *dpy)
+{
+    /* Make sure the Display's sequence numbers are valid */
     require_socket(dpy);
-    while(1)
+
+    /* Precondition: This thread can safely get events from XCB. */
+    assert(dpy->xcb->event_owner == XlibOwnsEventQueue && !dpy->xcb->event_waiter);
+
+    if(!dpy->xcb->next_event)
+        dpy->xcb->next_event = xcb_poll_for_event(dpy->xcb->connection);
+
+    if(dpy->xcb->next_event)
     {
         PendingRequest *req = dpy->xcb->pending_requests;
+        xcb_generic_event_t *event = dpy->xcb->next_event;
         unsigned long event_sequence = dpy->last_request_read;
-        if(event)
-            widen(&event_sequence, event->full_sequence);
-        assert(!(req && current_request && !XLIB_SEQUENCE_COMPARE(req->sequence, <=, current_request)));
-        if(event && (!req || XLIB_SEQUENCE_COMPARE(event_sequence, <=, req->sequence)))
+        widen(&event_sequence, event->full_sequence);
+        if(!req || XLIB_SEQUENCE_COMPARE(event_sequence, <, req->sequence)
+               || (event->response_type != X_Error && event_sequence == req->sequence))
         {
+            assert(XLIB_SEQUENCE_COMPARE(event_sequence, <=, dpy->request));
             dpy->last_request_read = event_sequence;
-            if(event->response_type != X_Error)
-            {
-                /* GenericEvents may be > 32 bytes. In this
-                 * case, the event struct is trailed by the
-                 * additional bytes. the xcb_generic_event_t
-                 * struct uses 4 bytes for internal numbering,
-                 * so we need to shift the trailing data to be
-                 * after the first 32 bytes. */
-                if (event->response_type == GenericEvent &&
-                    ((xcb_ge_event_t*)event)->length)
-                {
-                    memmove(&event->full_sequence,
-                            &event[1],
-                            ((xcb_ge_event_t*)event)->length * 4);
-                }
-                _XEnq(dpy, (xEvent *) event);
-                wait_for_first_event = 0;
-            }
-            else if(current_error && event_sequence == current_request)
-            {
-                /* This can only occur when called from
-                 * _XReply, which doesn't need a new event. */
-                *current_error = (xcb_generic_error_t *) event;
-                event = NULL;
-                break;
-            }
-            else
-                handle_error(dpy, (xError *) event, current_error != NULL);
-            free(event);
-            event = wait_or_poll_for_event(dpy, wait_for_first_event);
+            dpy->xcb->next_event = NULL;
+            return (xcb_generic_reply_t *) event;
         }
-        else if(req && req->sequence == current_request)
+    }
+    return NULL;
+}
+
+static xcb_generic_reply_t *poll_for_response(Display *dpy)
+{
+    void *response;
+    xcb_generic_error_t *error;
+    PendingRequest *req;
+    while(!(response = poll_for_event(dpy)) &&
+          (req = dpy->xcb->pending_requests) &&
+          !req->reply_waiter &&
+          xcb_poll_for_reply(dpy->xcb->connection, req->sequence, &response, &error))
+    {
+        assert(XLIB_SEQUENCE_COMPARE(req->sequence, <=, dpy->request));
+        dpy->last_request_read = req->sequence;
+        if(!response)
+            dequeue_pending_request(dpy, req);
+        if(error)
+            return (xcb_generic_reply_t *) error;
+    }
+    return response;
+}
+
+static void handle_response(Display *dpy, xcb_generic_reply_t *response, Bool in_XReply)
+{
+    _XAsyncHandler *async, *next;
+    switch(response->response_type)
+    {
+    case X_Reply:
+        for(async = dpy->async_handlers; async; async = next)
        {
-            break;
+            next = async->next;
+            if(async->handler(dpy, (xReply *) response, (char *) response, sizeof(xReply) + (response->length << 2), async->data))
+                break;
        }
-        else if(req && xcb_poll_for_reply(dpy->xcb->connection, req->sequence, &reply, &error))
+        break;
+
+    case X_Error:
+        handle_error(dpy, (xError *) response, in_XReply);
+        break;
+
+    default: /* event */
+        /* GenericEvents may be > 32 bytes. In this case, the
+         * event struct is trailed by the additional bytes. the
+         * xcb_generic_event_t struct uses 4 bytes for internal
+         * numbering, so we need to shift the trailing data to
+         * be after the first 32 bytes. */
+        if(response->response_type == GenericEvent && ((xcb_ge_event_t *) response)->length)
        {
-            if(reply || error)
-                dpy->last_request_read = req->sequence;
-            if(reply)
-            {
-                call_handlers(dpy, reply);
-                free(reply);
-            }
-            if(error)
-            {
-                handle_error(dpy, (xError *) error, current_error != NULL);
-                free(error);
-            }
-            if(!reply)
-            {
-                dpy->xcb->pending_requests = req->next;
-                if(!dpy->xcb->pending_requests)
-                    dpy->xcb->pending_requests_tail = &dpy->xcb->pending_requests;
-                free(req);
-            }
+            xcb_ge_event_t *event = (xcb_ge_event_t *) response;
+            memmove(&event->full_sequence, &event[1], event->length * 4);
        }
-        else
-            break;
+        _XEnq(dpy, (xEvent *) response);
+        break;
     }
-
-    dpy->xcb->next_event = event;
-
-    if(xcb_connection_has_error(c))
-        _XIOError(dpy);
-
-    assert(XLIB_SEQUENCE_COMPARE(dpy->last_request_read, <=, dpy->request));
+    free(response);
 }
 
 int _XEventsQueued(Display *dpy, int mode)
 {
+    xcb_generic_reply_t *response;
     if(dpy->flags & XlibDisplayIOError)
         return 0;
     if(dpy->xcb->event_owner != XlibOwnsEventQueue)
@@ -282,7 +294,17 @@ int _XEventsQueued(Display *dpy, int mode)
         _XSend(dpy, NULL, 0);
     else
         check_internal_connections(dpy);
-    process_responses(dpy, 0, NULL, 0);
+
+    /* If another thread is blocked waiting for events, then we must
+     * let that thread pick up the next event. Since it blocked, we
+     * can reasonably claim there are no new events right now. */
+    if(!dpy->xcb->event_waiter)
+    {
+        while((response = poll_for_response(dpy)))
+            handle_response(dpy, response, False);
+        if(xcb_connection_has_error(dpy->xcb->connection))
+            _XIOError(dpy);
+    }
     return dpy->qlen;
 }
 /*
@@ -291,15 +313,66 @@ int _XEventsQueued(Display *dpy, int mode)
  */
 void _XReadEvents(Display *dpy)
 {
+    xcb_generic_reply_t *response;
+    unsigned long serial;
+
     if(dpy->flags & XlibDisplayIOError)
         return;
     _XSend(dpy, NULL, 0);
     if(dpy->xcb->event_owner != XlibOwnsEventQueue)
         return;
     check_internal_connections(dpy);
-    do {
-        process_responses(dpy, 1, NULL, 0);
-    } while (dpy->qlen == 0);
+
+    serial = dpy->next_event_serial_num;
+    while(serial == dpy->next_event_serial_num || dpy->qlen == 0)
+    {
+        if(dpy->xcb->event_waiter)
+        {
+            ConditionWait(dpy, dpy->xcb->event_notify);
+            /* Maybe the other thread got us an event. */
+            continue;
+        }
+
+        if(!dpy->xcb->next_event)
+        {
+            xcb_generic_event_t *event;
+            dpy->xcb->event_waiter = 1;
+            UnlockDisplay(dpy);
+            event = xcb_wait_for_event(dpy->xcb->connection);
+            InternalLockDisplay(dpy, /* don't skip user locks */ 0);
+            dpy->xcb->event_waiter = 0;
+            ConditionBroadcast(dpy, dpy->xcb->event_notify);
+            if(!event)
+                _XIOError(dpy);
+            dpy->xcb->next_event = event;
+        }
+
+        /* We've established most of the conditions for
+         * poll_for_response to return non-NULL. The exceptions
+         * are connection shutdown, and finding that another
+         * thread is waiting for the next reply we'd like to
+         * process. */
+
+        response = poll_for_response(dpy);
+        if(response)
+            handle_response(dpy, response, False);
+        else if(dpy->xcb->pending_requests->reply_waiter)
+        { /* need braces around ConditionWait */
+            ConditionWait(dpy, dpy->xcb->reply_notify);
+        }
+        else
+            _XIOError(dpy);
+    }
+
+    /* The preceding loop established that there is no
+     * event_waiter--unless we just called ConditionWait because of
+     * a reply_waiter, in which case another thread may have become
+     * the event_waiter while we slept unlocked. */
+    if(!dpy->xcb->event_waiter)
+        while((response = poll_for_response(dpy)))
+            handle_response(dpy, response, False);
+    if(xcb_connection_has_error(dpy->xcb->connection))
+        _XIOError(dpy);
 }
 
 /*
@@ -330,15 +403,8 @@ void _XSend(Display *dpy, const char *data, long size)
     if(dpy->xcb->event_owner != XlibOwnsEventQueue || dpy->async_handlers)
     {
         uint64_t sequence;
-        for(sequence = dpy->xcb->last_flushed; sequence < dpy->request; ++sequence)
-        {
-            PendingRequest *req = malloc(sizeof(PendingRequest));
-            assert(req);
-            req->next = NULL;
-            req->sequence = sequence;
-            *dpy->xcb->pending_requests_tail = req;
-            dpy->xcb->pending_requests_tail = &req->next;
-        }
+        for(sequence = dpy->xcb->last_flushed + 1; sequence <= dpy->request; ++sequence)
+            append_pending_request(dpy, sequence);
     }
     requests = dpy->request - dpy->xcb->last_flushed;
     dpy->xcb->last_flushed = dpy->request;
@@ -385,13 +451,7 @@ static const XID inval_id = ~0UL;
 void _XIDHandler(Display *dpy)
 {
     if (dpy->xcb->next_xid == inval_id)
-    {
-        /* We drop the Display lock to call xcb_generate_id, and
-         * xcb_generate_id might take the socket back, which will call
-         * LockDisplay. Avoid recursing. */
-        dpy->xcb->next_xid = inval_id - 1;
         _XAllocIDs(dpy, &dpy->xcb->next_xid, 1);
-    }
 }
 
 /* _XAllocID - resource ID allocation routine. */
@@ -416,7 +476,7 @@ void _XAllocIDs(Display *dpy, XID *ids, int count)
     for (i = 0; i < count; i++)
         ids[i] = xcb_generate_id(dpy->xcb->connection);
 #ifdef XTHREADS
-    LockDisplay(dpy);
+    InternalLockDisplay(dpy, /* don't skip user locks */ 0);
     if (dpy->lock)
         (*dpy->lock->user_unlock_display)(dpy);
 #endif
@@ -430,24 +490,6 @@ static void _XFreeReplyData(Display *dpy, Bool force)
     dpy->xcb->reply_data = NULL;
 }
 
-static PendingRequest * insert_pending_request(Display *dpy)
-{
-    PendingRequest **cur = &dpy->xcb->pending_requests;
-    while(*cur && XLIB_SEQUENCE_COMPARE((*cur)->sequence, <, dpy->request))
-        cur = &((*cur)->next);
-    if(!*cur || (*cur)->sequence != dpy->request)
-    {
-        PendingRequest *node = malloc(sizeof(PendingRequest));
-        assert(node);
-        node->next = *cur;
-        node->sequence = dpy->request;
-        if(cur == dpy->xcb->pending_requests_tail)
-            dpy->xcb->pending_requests_tail = &(node->next);
-        *cur = node;
-    }
-    return *cur;
-}
-
 /*
  * _XReply - Wait for a reply packet and copy its contents into the
  * specified rep.
@@ -467,20 +509,87 @@ Status _XReply(Display *dpy, xReply *rep, int extra, Bool discard)
         return 0;
 
     _XSend(dpy, NULL, 0);
-    current = insert_pending_request(dpy);
-    /* FIXME: drop the Display lock while waiting?
-     * Complicates process_responses. */
-    reply = xcb_wait_for_reply(c, current->sequence, &error);
+    if(dpy->xcb->pending_requests_tail && dpy->xcb->pending_requests_tail->sequence == dpy->request)
+        current = dpy->xcb->pending_requests_tail;
+    else
+        current = append_pending_request(dpy, dpy->request);
+    /* Don't let any other thread get this reply. */
+    current->reply_waiter = 1;
+    while(1)
+    {
+        PendingRequest *req = dpy->xcb->pending_requests;
+        xcb_generic_reply_t *response;
+
+        if(req != current && req->reply_waiter)
+        {
+            ConditionWait(dpy, dpy->xcb->reply_notify);
+            /* Another thread got this reply. */
+            continue;
+        }
+        req->reply_waiter = 1;
+        UnlockDisplay(dpy);
+        response = xcb_wait_for_reply(c, req->sequence, &error);
+        InternalLockDisplay(dpy, /* don't skip user locks */ 0);
+
+        /* We have the response we're looking for. Now, before
+         * letting anyone else process this sequence number, we
+         * need to process any events that should have come
+         * earlier. */
+
+        if(dpy->xcb->event_owner == XlibOwnsEventQueue)
+        {
+            xcb_generic_reply_t *event;
+            /* If some thread is already waiting for events,
+             * it will get the first one. That thread must
+             * process that event before we can continue. */
+            /* FIXME: That event might be after this reply,
+             * and might never even come--or there might be
+             * multiple threads trying to get events. */
+            while(dpy->xcb->event_waiter)
+            { /* need braces around ConditionWait */
+                ConditionWait(dpy, dpy->xcb->event_notify);
+            }
+            while((event = poll_for_event(dpy)))
+                handle_response(dpy, event, True);
+        }
+
+        req->reply_waiter = 0;
+        ConditionBroadcast(dpy, dpy->xcb->reply_notify);
+        assert(XLIB_SEQUENCE_COMPARE(req->sequence, <=, dpy->request));
+        dpy->last_request_read = req->sequence;
+        if(!response)
+            dequeue_pending_request(dpy, req);
+
+        if(req == current)
+        {
+            reply = (char *) response;
+            break;
+        }
+
+        if(error)
+            handle_response(dpy, (xcb_generic_reply_t *) error, True);
+        else if(response)
+            handle_response(dpy, response, True);
+    }
     check_internal_connections(dpy);
-    process_responses(dpy, 0, &error, current->sequence);
+
+    if(dpy->xcb->next_event && dpy->xcb->next_event->response_type == X_Error)
+    {
+        xcb_generic_event_t *event = dpy->xcb->next_event;
+        unsigned long event_sequence = dpy->last_request_read;
+        widen(&event_sequence, event->full_sequence);
+        if(event_sequence == current->sequence)
+        {
+            error = (xcb_generic_error_t *) event;
+            dpy->xcb->next_event = NULL;
+        }
+    }
 
     if(error)
     {
         int ret_code;
-        dpy->last_request_read = error->full_sequence;
-
         /* Xlib is evil and assumes that even errors will be
          * copied into rep. */
         memcpy(rep, error, 32);
@@ -523,8 +632,6 @@ Status _XReply(Display *dpy, xReply *rep, int extra, Bool discard)
         return 0;
     }
 
-    dpy->last_request_read = current->sequence;
-
     /* there's no error and we have a reply. */
     dpy->xcb->reply_data = reply;
     dpy->xcb->reply_consumed = sizeof(xReply) + (extra * 4);
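The diff only quotes the carry line of `widen()` and the comment that explains it, so here is a small, self-contained sketch of that sequence-number widening for reference. Only the final carry expression appears verbatim in the hunk above; the masking line and the `main()` driver are illustrative reconstructions, not part of the commit.

```c
#include <assert.h>
#include <stdio.h>

/* Replace the low 32 bits of *wide with the narrow value the server sent.
 * If that made the value go "backwards", the 32-bit counter must have
 * wrapped, so add one 32-bit epoch.  Computing the carry as (new < *wide)
 * and shifting it avoids a conditional branch; shifting by 16 twice avoids
 * a compiler warning where unsigned long is only 32 bits wide. */
static void widen(unsigned long *wide, unsigned int narrow)
{
    unsigned long new = (*wide & ~0xFFFFFFFFUL) | narrow;  /* assumed masking step */
    *wide = new + ((unsigned long) (new < *wide) << 16 << 16);
}

int main(void)
{
    unsigned long seq = 0xFFFFFFF0UL;   /* just below a 32-bit wrap */
    widen(&seq, 0x00000005);            /* the 32-bit counter has wrapped */
    printf("widened sequence: %#lx\n", seq);
    /* On a 64-bit unsigned long this yields 0x100000005. */
    assert(sizeof(unsigned long) == 4 || seq == 0x100000005UL);
    return 0;
}
```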
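The `event_waiter`/`event_notify` logic added to `_XReadEvents()` (and the matching `reply_waiter`/`reply_notify` logic in `_XReply()`) is a "single designated waiter" handoff: exactly one thread drops the display lock and blocks inside XCB, while every other thread sleeps on a condition variable until the waiter returns and broadcasts. The pthreads sketch below illustrates only that handoff under assumed names (`display_lock`, `blocking_wait_for_event`, `read_one_event`, and so on); the real code uses Xlib's `ConditionWait`/`ConditionBroadcast` macros and `UnlockDisplay`/`InternalLockDisplay`, not pthreads directly.

```c
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t display_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  event_notify = PTHREAD_COND_INITIALIZER;
static int event_waiter;  /* nonzero while a thread is blocked in the wait call */
static int next_event;    /* stand-in for dpy->xcb->next_event; 0 means "none cached" */

/* Stand-in for xcb_wait_for_event(): must be called without the display lock. */
static int blocking_wait_for_event(void)
{
    sleep(1);
    return 42;
}

static int read_one_event(void)
{
    int ev;

    pthread_mutex_lock(&display_lock);
    while (!next_event) {
        if (event_waiter) {
            /* Another thread already owns the blocking call; sleep until it
             * broadcasts (ConditionWait in the merged code) and re-check. */
            pthread_cond_wait(&event_notify, &display_lock);
            continue;
        }
        /* Become the designated waiter: drop the lock, block outside it,
         * then retake the lock and wake every other thread up. */
        event_waiter = 1;
        pthread_mutex_unlock(&display_lock);
        ev = blocking_wait_for_event();
        pthread_mutex_lock(&display_lock);
        event_waiter = 0;
        next_event = ev;
        pthread_cond_broadcast(&event_notify);
    }
    ev = next_event;
    next_event = 0;
    pthread_mutex_unlock(&display_lock);
    return ev;
}

static void *worker(void *arg)
{
    printf("thread %ld got event %d\n", (long) arg, read_one_event());
    return NULL;
}

int main(void)
{
    pthread_t threads[2];
    for (long i = 0; i < 2; i++)
        pthread_create(&threads[i], NULL, worker, (void *) i);
    for (int i = 0; i < 2; i++)
        pthread_join(threads[i], NULL);
    return 0;
}
```

The broadcast (rather than a single signal) matters because several threads may be queued behind the waiter for different reasons; each one re-checks its own condition after waking, exactly as the loops in `_XReadEvents()` and `_XReply()` do.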