1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "core/streams/ReadableStream.h"
7
8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptFunction.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h"
11 #include "bindings/core/v8/V8Binding.h"
12 #include "core/dom/DOMException.h"
13 #include "core/dom/ExceptionCode.h"
14 #include "core/dom/ExecutionContext.h"
15 #include "core/streams/UnderlyingSource.h"
16
17 namespace blink {
18
ReadableStream(ExecutionContext * executionContext,UnderlyingSource * source)19 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source)
20 : m_source(source)
21 , m_isStarted(false)
22 , m_isDraining(false)
23 , m_isPulling(false)
24 , m_isSchedulingPull(false)
25 , m_state(Waiting)
26 , m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready))
27 , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
28 {
29 }
30
~ReadableStream()31 ReadableStream::~ReadableStream()
32 {
33 }
34
stateString() const35 String ReadableStream::stateString() const
36 {
37 switch (m_state) {
38 case Readable:
39 return "readable";
40 case Waiting:
41 return "waiting";
42 case Closed:
43 return "closed";
44 case Errored:
45 return "errored";
46 }
47 ASSERT(false);
48 return String();
49 }
50
enqueuePreliminaryCheck(size_t chunkSize)51 bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize)
52 {
53 if (m_state == Errored || m_state == Closed || m_isDraining)
54 return false;
55
56 // FIXME: Query strategy.
57 return true;
58 }
59
enqueuePostAction(size_t totalQueueSize)60 bool ReadableStream::enqueuePostAction(size_t totalQueueSize)
61 {
62 m_isPulling = false;
63
64 // FIXME: Set needsMore correctly.
65 bool needsMore = true;
66
67 if (m_state == Waiting) {
68 m_state = Readable;
69 m_wait->resolve(V8UndefinedType());
70 }
71
72 return needsMore;
73 }
74
close()75 void ReadableStream::close()
76 {
77 if (m_state == Waiting) {
78 m_wait->resolve(V8UndefinedType());
79 m_closed->resolve(V8UndefinedType());
80 m_state = Closed;
81 } else if (m_state == Readable) {
82 m_isDraining = true;
83 }
84 }
85
readPreliminaryCheck(ExceptionState & exceptionState)86 void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState)
87 {
88 if (m_state == Waiting) {
89 exceptionState.throwTypeError("read is called while state is waiting");
90 return;
91 }
92 if (m_state == Closed) {
93 exceptionState.throwTypeError("read is called while state is closed");
94 return;
95 }
96 if (m_state == Errored) {
97 exceptionState.throwDOMException(m_exception->code(), m_exception->message());
98 return;
99 }
100 }
101
readPostAction()102 void ReadableStream::readPostAction()
103 {
104 ASSERT(m_state == Readable);
105 if (isQueueEmpty()) {
106 if (m_isDraining) {
107 m_state = Closed;
108 m_wait->reset();
109 m_wait->resolve(V8UndefinedType());
110 m_closed->resolve(V8UndefinedType());
111 } else {
112 m_state = Waiting;
113 m_wait->reset();
114 callOrSchedulePull();
115 }
116 }
117 }
118
wait(ScriptState * scriptState)119 ScriptPromise ReadableStream::wait(ScriptState* scriptState)
120 {
121 if (m_state == Waiting)
122 callOrSchedulePull();
123 return m_wait->promise(scriptState->world());
124 }
125
cancel(ScriptState * scriptState,ScriptValue reason)126 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason)
127 {
128 if (m_state == Errored) {
129 RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState);
130 ScriptPromise promise = resolver->promise();
131 resolver->reject(m_exception);
132 return promise;
133 }
134 if (m_state == Closed)
135 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate()));
136
137 if (m_state == Waiting) {
138 m_wait->resolve(V8UndefinedType());
139 } else {
140 ASSERT(m_state == Readable);
141 m_wait->reset();
142 m_wait->resolve(V8UndefinedType());
143 }
144
145 clearQueue();
146 m_state = Closed;
147 m_closed->resolve(V8UndefinedType());
148 return m_source->cancelSource(scriptState, reason);
149 }
150
closed(ScriptState * scriptState)151 ScriptPromise ReadableStream::closed(ScriptState* scriptState)
152 {
153 return m_closed->promise(scriptState->world());
154 }
155
error(PassRefPtrWillBeRawPtr<DOMException> exception)156 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
157 {
158 if (m_state == Readable) {
159 clearQueue();
160 m_wait->reset();
161 }
162
163 if (m_state == Waiting || m_state == Readable) {
164 m_state = Errored;
165 m_exception = exception;
166 if (m_wait->state() == m_wait->Pending)
167 m_wait->reject(m_exception);
168 m_closed->reject(m_exception);
169 }
170 }
171
didSourceStart()172 void ReadableStream::didSourceStart()
173 {
174 m_isStarted = true;
175 if (m_isSchedulingPull)
176 m_source->pullSource();
177 }
178
callOrSchedulePull()179 void ReadableStream::callOrSchedulePull()
180 {
181 if (m_isPulling)
182 return;
183 m_isPulling = true;
184 if (m_isStarted)
185 m_source->pullSource();
186 else
187 m_isSchedulingPull = true;
188 }
189
trace(Visitor * visitor)190 void ReadableStream::trace(Visitor* visitor)
191 {
192 visitor->trace(m_source);
193 visitor->trace(m_wait);
194 visitor->trace(m_closed);
195 visitor->trace(m_exception);
196 }
197
198 } // namespace blink
199
200