• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 Igalia S.L.
3  * Copyright (C) 2010 Apple Inc. All rights reserved.
4  * Portions Copyright (c) 2010 Motorola Mobility, Inc.  All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25  * THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "config.h"
29 #include "WorkQueue.h"
30 
31 #include "WKBase.h"
32 #include <WebCore/NotImplemented.h>
33 #include <gio/gio.h>
34 #include <glib.h>
35 #include <wtf/gobject/GRefPtr.h>
36 
37 // WorkQueue::EventSource
38 class WorkQueue::EventSource {
39 public:
EventSource(PassOwnPtr<WorkItem> workItem,WorkQueue * workQueue,GCancellable * cancellable)40     EventSource(PassOwnPtr<WorkItem> workItem, WorkQueue* workQueue, GCancellable* cancellable)
41         : m_workItem(workItem)
42         , m_workQueue(workQueue)
43         , m_cancellable(cancellable)
44     {
45     }
46 
cancel()47     void cancel()
48     {
49         if (!m_cancellable)
50             return;
51         g_cancellable_cancel(m_cancellable);
52     }
53 
executeEventSource(EventSource * eventSource)54     static void executeEventSource(EventSource* eventSource)
55     {
56         ASSERT(eventSource);
57         WorkQueue* queue = eventSource->m_workQueue;
58         {
59             MutexLocker locker(queue->m_isValidMutex);
60             if (!queue->m_isValid)
61                 return;
62         }
63 
64         eventSource->m_workItem->execute();
65     }
66 
performWorkOnce(EventSource * eventSource)67     static gboolean performWorkOnce(EventSource* eventSource)
68     {
69         executeEventSource(eventSource);
70         return FALSE;
71     }
72 
performWork(GSocket * socket,GIOCondition condition,EventSource * eventSource)73     static gboolean performWork(GSocket* socket, GIOCondition condition, EventSource* eventSource)
74     {
75         if (!(condition & G_IO_IN) && !(condition & G_IO_HUP) && !(condition & G_IO_ERR)) {
76             // EventSource has been cancelled, return FALSE to destroy the source.
77             return FALSE;
78         }
79 
80         executeEventSource(eventSource);
81         return TRUE;
82     }
83 
performWorkOnTermination(GPid,gint,EventSource * eventSource)84     static gboolean performWorkOnTermination(GPid, gint, EventSource* eventSource)
85     {
86         executeEventSource(eventSource);
87         return FALSE;
88     }
89 
deleteEventSource(EventSource * eventSource)90     static void deleteEventSource(EventSource* eventSource)
91     {
92         ASSERT(eventSource);
93         delete eventSource;
94     }
95 
96 public:
97     PassOwnPtr<WorkItem> m_workItem;
98     WorkQueue* m_workQueue;
99     GCancellable* m_cancellable;
100 };
101 
102 // WorkQueue
platformInitialize(const char * name)103 void WorkQueue::platformInitialize(const char* name)
104 {
105     m_eventContext = g_main_context_new();
106     ASSERT(m_eventContext);
107     m_eventLoop = g_main_loop_new(m_eventContext, FALSE);
108     ASSERT(m_eventLoop);
109     m_workQueueThread = createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::startWorkQueueThread), this, name);
110 }
111 
platformInvalidate()112 void WorkQueue::platformInvalidate()
113 {
114     MutexLocker locker(m_eventLoopLock);
115 
116     if (m_eventLoop) {
117         if (g_main_loop_is_running(m_eventLoop))
118             g_main_loop_quit(m_eventLoop);
119 
120         g_main_loop_unref(m_eventLoop);
121         m_eventLoop = 0;
122     }
123 
124     if (m_eventContext) {
125         g_main_context_unref(m_eventContext);
126         m_eventContext = 0;
127     }
128 }
129 
startWorkQueueThread(WorkQueue * workQueue)130 void* WorkQueue::startWorkQueueThread(WorkQueue* workQueue)
131 {
132     workQueue->workQueueThreadBody();
133     return 0;
134 }
135 
workQueueThreadBody()136 void WorkQueue::workQueueThreadBody()
137 {
138     g_main_loop_run(m_eventLoop);
139 }
140 
registerEventSourceHandler(int fileDescriptor,int condition,PassOwnPtr<WorkItem> item)141 void WorkQueue::registerEventSourceHandler(int fileDescriptor, int condition, PassOwnPtr<WorkItem> item)
142 {
143     GRefPtr<GSocket> socket = adoptGRef(g_socket_new_from_fd(fileDescriptor, 0));
144     ASSERT(socket);
145     GRefPtr<GCancellable> cancellable = adoptGRef(g_cancellable_new());
146     GRefPtr<GSource> dispatchSource = adoptGRef(g_socket_create_source(socket.get(), static_cast<GIOCondition>(condition), cancellable.get()));
147     ASSERT(dispatchSource);
148     EventSource* eventSource = new EventSource(item, this, cancellable.get());
149     ASSERT(eventSource);
150 
151     g_source_set_callback(dispatchSource.get(), reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWork),
152         eventSource, reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
153 
154     // Set up the event sources under the mutex since this is shared across multiple threads.
155     {
156         MutexLocker locker(m_eventSourcesLock);
157         Vector<EventSource*> sources;
158         EventSourceIterator it = m_eventSources.find(fileDescriptor);
159         if (it != m_eventSources.end())
160             sources = it->second;
161 
162         sources.append(eventSource);
163         m_eventSources.set(fileDescriptor, sources);
164     }
165 
166     g_source_attach(dispatchSource.get(), m_eventContext);
167 }
168 
unregisterEventSourceHandler(int fileDescriptor)169 void WorkQueue::unregisterEventSourceHandler(int fileDescriptor)
170 {
171     ASSERT(fileDescriptor);
172 
173     MutexLocker locker(m_eventSourcesLock);
174 
175     EventSourceIterator it = m_eventSources.find(fileDescriptor);
176     ASSERT(it != m_eventSources.end());
177     ASSERT(m_eventSources.contains(fileDescriptor));
178 
179     if (it != m_eventSources.end()) {
180         Vector<EventSource*> sources = it->second;
181         for (unsigned i = 0; i < sources.size(); i++)
182             sources[i]->cancel();
183 
184         m_eventSources.remove(it);
185     }
186 }
187 
scheduleWorkOnSource(GSource * dispatchSource,PassOwnPtr<WorkItem> item,GSourceFunc sourceCallback)188 void WorkQueue::scheduleWorkOnSource(GSource* dispatchSource, PassOwnPtr<WorkItem> item, GSourceFunc sourceCallback)
189 {
190     EventSource* eventSource = new EventSource(item, this, 0);
191 
192     g_source_set_callback(dispatchSource, sourceCallback, eventSource,
193                           reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
194 
195     g_source_attach(dispatchSource, m_eventContext);
196 }
197 
scheduleWork(PassOwnPtr<WorkItem> item)198 void WorkQueue::scheduleWork(PassOwnPtr<WorkItem> item)
199 {
200     GRefPtr<GSource> dispatchSource = adoptGRef(g_idle_source_new());
201     ASSERT(dispatchSource);
202     g_source_set_priority(dispatchSource.get(), G_PRIORITY_DEFAULT);
203 
204     scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
205 }
206 
scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item,double delay)207 void WorkQueue::scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item, double delay)
208 {
209     GRefPtr<GSource> dispatchSource = adoptGRef(g_timeout_source_new(static_cast<guint>(delay * 1000)));
210     ASSERT(dispatchSource);
211 
212     scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
213 }
214 
scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process,PassOwnPtr<WorkItem> item)215 void WorkQueue::scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process, PassOwnPtr<WorkItem> item)
216 {
217     GRefPtr<GSource> dispatchSource = adoptGRef(g_child_watch_source_new(process));
218     ASSERT(dispatchSource);
219 
220     scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnTermination));
221 }
222