1 /*
2 * Copyright (C) 2011 Igalia S.L.
3 * Copyright (C) 2010 Apple Inc. All rights reserved.
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25 * THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
#include "config.h"
#include "WorkQueue.h"

#include "WKBase.h"
#include <WebCore/NotImplemented.h>
#include <gio/gio.h>
#include <glib.h>
#include <wtf/OwnPtr.h>
#include <wtf/gobject/GRefPtr.h>
36
37 // WorkQueue::EventSource
38 class WorkQueue::EventSource {
39 public:
EventSource(PassOwnPtr<WorkItem> workItem,WorkQueue * workQueue,GCancellable * cancellable)40 EventSource(PassOwnPtr<WorkItem> workItem, WorkQueue* workQueue, GCancellable* cancellable)
41 : m_workItem(workItem)
42 , m_workQueue(workQueue)
43 , m_cancellable(cancellable)
44 {
45 }
46
cancel()47 void cancel()
48 {
49 if (!m_cancellable)
50 return;
51 g_cancellable_cancel(m_cancellable);
52 }
53
executeEventSource(EventSource * eventSource)54 static void executeEventSource(EventSource* eventSource)
55 {
56 ASSERT(eventSource);
57 WorkQueue* queue = eventSource->m_workQueue;
58 {
59 MutexLocker locker(queue->m_isValidMutex);
60 if (!queue->m_isValid)
61 return;
62 }
63
64 eventSource->m_workItem->execute();
65 }
66
performWorkOnce(EventSource * eventSource)67 static gboolean performWorkOnce(EventSource* eventSource)
68 {
69 executeEventSource(eventSource);
70 return FALSE;
71 }
72
performWork(GSocket * socket,GIOCondition condition,EventSource * eventSource)73 static gboolean performWork(GSocket* socket, GIOCondition condition, EventSource* eventSource)
74 {
75 if (!(condition & G_IO_IN) && !(condition & G_IO_HUP) && !(condition & G_IO_ERR)) {
76 // EventSource has been cancelled, return FALSE to destroy the source.
77 return FALSE;
78 }
79
80 executeEventSource(eventSource);
81 return TRUE;
82 }
83
performWorkOnTermination(GPid,gint,EventSource * eventSource)84 static gboolean performWorkOnTermination(GPid, gint, EventSource* eventSource)
85 {
86 executeEventSource(eventSource);
87 return FALSE;
88 }
89
deleteEventSource(EventSource * eventSource)90 static void deleteEventSource(EventSource* eventSource)
91 {
92 ASSERT(eventSource);
93 delete eventSource;
94 }
95
96 public:
97 PassOwnPtr<WorkItem> m_workItem;
98 WorkQueue* m_workQueue;
99 GCancellable* m_cancellable;
100 };
101
102 // WorkQueue
platformInitialize(const char * name)103 void WorkQueue::platformInitialize(const char* name)
104 {
105 m_eventContext = g_main_context_new();
106 ASSERT(m_eventContext);
107 m_eventLoop = g_main_loop_new(m_eventContext, FALSE);
108 ASSERT(m_eventLoop);
109 m_workQueueThread = createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::startWorkQueueThread), this, name);
110 }
111
platformInvalidate()112 void WorkQueue::platformInvalidate()
113 {
114 MutexLocker locker(m_eventLoopLock);
115
116 if (m_eventLoop) {
117 if (g_main_loop_is_running(m_eventLoop))
118 g_main_loop_quit(m_eventLoop);
119
120 g_main_loop_unref(m_eventLoop);
121 m_eventLoop = 0;
122 }
123
124 if (m_eventContext) {
125 g_main_context_unref(m_eventContext);
126 m_eventContext = 0;
127 }
128 }
129
startWorkQueueThread(WorkQueue * workQueue)130 void* WorkQueue::startWorkQueueThread(WorkQueue* workQueue)
131 {
132 workQueue->workQueueThreadBody();
133 return 0;
134 }
135
workQueueThreadBody()136 void WorkQueue::workQueueThreadBody()
137 {
138 g_main_loop_run(m_eventLoop);
139 }
140
registerEventSourceHandler(int fileDescriptor,int condition,PassOwnPtr<WorkItem> item)141 void WorkQueue::registerEventSourceHandler(int fileDescriptor, int condition, PassOwnPtr<WorkItem> item)
142 {
143 GRefPtr<GSocket> socket = adoptGRef(g_socket_new_from_fd(fileDescriptor, 0));
144 ASSERT(socket);
145 GRefPtr<GCancellable> cancellable = adoptGRef(g_cancellable_new());
146 GRefPtr<GSource> dispatchSource = adoptGRef(g_socket_create_source(socket.get(), static_cast<GIOCondition>(condition), cancellable.get()));
147 ASSERT(dispatchSource);
148 EventSource* eventSource = new EventSource(item, this, cancellable.get());
149 ASSERT(eventSource);
150
151 g_source_set_callback(dispatchSource.get(), reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWork),
152 eventSource, reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
153
154 // Set up the event sources under the mutex since this is shared across multiple threads.
155 {
156 MutexLocker locker(m_eventSourcesLock);
157 Vector<EventSource*> sources;
158 EventSourceIterator it = m_eventSources.find(fileDescriptor);
159 if (it != m_eventSources.end())
160 sources = it->second;
161
162 sources.append(eventSource);
163 m_eventSources.set(fileDescriptor, sources);
164 }
165
166 g_source_attach(dispatchSource.get(), m_eventContext);
167 }
168
unregisterEventSourceHandler(int fileDescriptor)169 void WorkQueue::unregisterEventSourceHandler(int fileDescriptor)
170 {
171 ASSERT(fileDescriptor);
172
173 MutexLocker locker(m_eventSourcesLock);
174
175 EventSourceIterator it = m_eventSources.find(fileDescriptor);
176 ASSERT(it != m_eventSources.end());
177 ASSERT(m_eventSources.contains(fileDescriptor));
178
179 if (it != m_eventSources.end()) {
180 Vector<EventSource*> sources = it->second;
181 for (unsigned i = 0; i < sources.size(); i++)
182 sources[i]->cancel();
183
184 m_eventSources.remove(it);
185 }
186 }
187
scheduleWorkOnSource(GSource * dispatchSource,PassOwnPtr<WorkItem> item,GSourceFunc sourceCallback)188 void WorkQueue::scheduleWorkOnSource(GSource* dispatchSource, PassOwnPtr<WorkItem> item, GSourceFunc sourceCallback)
189 {
190 EventSource* eventSource = new EventSource(item, this, 0);
191
192 g_source_set_callback(dispatchSource, sourceCallback, eventSource,
193 reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
194
195 g_source_attach(dispatchSource, m_eventContext);
196 }
197
scheduleWork(PassOwnPtr<WorkItem> item)198 void WorkQueue::scheduleWork(PassOwnPtr<WorkItem> item)
199 {
200 GRefPtr<GSource> dispatchSource = adoptGRef(g_idle_source_new());
201 ASSERT(dispatchSource);
202 g_source_set_priority(dispatchSource.get(), G_PRIORITY_DEFAULT);
203
204 scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
205 }
206
scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item,double delay)207 void WorkQueue::scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item, double delay)
208 {
209 GRefPtr<GSource> dispatchSource = adoptGRef(g_timeout_source_new(static_cast<guint>(delay * 1000)));
210 ASSERT(dispatchSource);
211
212 scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
213 }
214
scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process,PassOwnPtr<WorkItem> item)215 void WorkQueue::scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process, PassOwnPtr<WorkItem> item)
216 {
217 GRefPtr<GSource> dispatchSource = adoptGRef(g_child_watch_source_new(process));
218 ASSERT(dispatchSource);
219
220 scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnTermination));
221 }
222