• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "src/wasm/streaming-decoder.h"
6 
7 #include "src/handles/handles.h"
8 #include "src/objects/descriptor-array.h"
9 #include "src/objects/dictionary.h"
10 #include "src/objects/objects-inl.h"
11 #include "src/wasm/decoder.h"
12 #include "src/wasm/leb-helper.h"
13 #include "src/wasm/module-decoder.h"
14 #include "src/wasm/wasm-code-manager.h"
15 #include "src/wasm/wasm-limits.h"
16 #include "src/wasm/wasm-objects.h"
17 #include "src/wasm/wasm-result.h"
18 
19 #define TRACE_STREAMING(...)                            \
20   do {                                                  \
21     if (FLAG_trace_wasm_streaming) PrintF(__VA_ARGS__); \
22   } while (false)
23 
24 namespace v8 {
25 namespace internal {
26 namespace wasm {
27 
28 class V8_EXPORT_PRIVATE AsyncStreamingDecoder : public StreamingDecoder {
29  public:
30   explicit AsyncStreamingDecoder(std::unique_ptr<StreamingProcessor> processor);
31   AsyncStreamingDecoder(const AsyncStreamingDecoder&) = delete;
32   AsyncStreamingDecoder& operator=(const AsyncStreamingDecoder&) = delete;
33 
34   // The buffer passed into OnBytesReceived is owned by the caller.
35   void OnBytesReceived(Vector<const uint8_t> bytes) override;
36 
37   void Finish() override;
38 
39   void Abort() override;
40 
41   // Notify the StreamingDecoder that compilation ended and the
42   // StreamingProcessor should not be called anymore.
NotifyCompilationEnded()43   void NotifyCompilationEnded() override { Fail(); }
44 
45   void NotifyNativeModuleCreated(
46       const std::shared_ptr<NativeModule>& native_module) override;
47 
48  private:
49   // The SectionBuffer is the data object for the content of a single section.
50   // It stores all bytes of the section (including section id and section
51   // length), and the offset where the actual payload starts.
52   class SectionBuffer : public WireBytesStorage {
53    public:
54     // id: The section id.
55     // payload_length: The length of the payload.
56     // length_bytes: The section length, as it is encoded in the module bytes.
SectionBuffer(uint32_t module_offset,uint8_t id,size_t payload_length,Vector<const uint8_t> length_bytes)57     SectionBuffer(uint32_t module_offset, uint8_t id, size_t payload_length,
58                   Vector<const uint8_t> length_bytes)
59         :  // ID + length + payload
60           module_offset_(module_offset),
61           bytes_(OwnedVector<uint8_t>::NewForOverwrite(
62               1 + length_bytes.length() + payload_length)),
63           payload_offset_(1 + length_bytes.length()) {
64       bytes_.start()[0] = id;
65       memcpy(bytes_.start() + 1, &length_bytes.first(), length_bytes.length());
66     }
67 
section_code() const68     SectionCode section_code() const {
69       return static_cast<SectionCode>(bytes_.start()[0]);
70     }
71 
GetCode(WireBytesRef ref) const72     Vector<const uint8_t> GetCode(WireBytesRef ref) const final {
73       DCHECK_LE(module_offset_, ref.offset());
74       uint32_t offset_in_code_buffer = ref.offset() - module_offset_;
75       return bytes().SubVector(offset_in_code_buffer,
76                                offset_in_code_buffer + ref.length());
77     }
78 
module_offset() const79     uint32_t module_offset() const { return module_offset_; }
bytes() const80     Vector<uint8_t> bytes() const { return bytes_.as_vector(); }
payload() const81     Vector<uint8_t> payload() const { return bytes() + payload_offset_; }
length() const82     size_t length() const { return bytes_.size(); }
payload_offset() const83     size_t payload_offset() const { return payload_offset_; }
84 
85    private:
86     const uint32_t module_offset_;
87     const OwnedVector<uint8_t> bytes_;
88     const size_t payload_offset_;
89   };
90 
91   // The decoding of a stream of wasm module bytes is organized in states. Each
92   // state provides a buffer to store the bytes required for the current state,
93   // information on how many bytes have already been received, how many bytes
94   // are needed, and a {Next} function which starts the next state once all
95   // bytes of the current state were received.
96   //
97   // The states change according to the following state diagram:
98   //
99   //       Start
100   //         |
101   //         |
102   //         v
103   // DecodeModuleHeader
104   //         |   _________________________________________
105   //         |   |                                        |
106   //         v   v                                        |
107   //  DecodeSectionID --> DecodeSectionLength --> DecodeSectionPayload
108   //         A                  |
109   //         |                  | (if the section id == code)
110   //         |                  v
111   //         |      DecodeNumberOfFunctions -- > DecodeFunctionLength
112   //         |                                          A    |
113   //         |                                          |    |
114   //         |  (after all functions were read)         |    v
115   //         ------------------------------------- DecodeFunctionBody
116   //
117   class DecodingState {
118    public:
119     virtual ~DecodingState() = default;
120 
121     // Reads the bytes for the current state and returns the number of read
122     // bytes.
123     virtual size_t ReadBytes(AsyncStreamingDecoder* streaming,
124                              Vector<const uint8_t> bytes);
125 
126     // Returns the next state of the streaming decoding.
127     virtual std::unique_ptr<DecodingState> Next(
128         AsyncStreamingDecoder* streaming) = 0;
129     // The buffer to store the received bytes.
130     virtual Vector<uint8_t> buffer() = 0;
131     // The number of bytes which were already received.
offset() const132     size_t offset() const { return offset_; }
set_offset(size_t value)133     void set_offset(size_t value) { offset_ = value; }
134     // A flag to indicate if finishing the streaming decoder is allowed without
135     // error.
is_finishing_allowed() const136     virtual bool is_finishing_allowed() const { return false; }
137 
138    private:
139     size_t offset_ = 0;
140   };
141 
142   // Forward declarations of the concrete states. This is needed so that they
143   // can access private members of the AsyncStreamingDecoder.
144   class DecodeVarInt32;
145   class DecodeModuleHeader;
146   class DecodeSectionID;
147   class DecodeSectionLength;
148   class DecodeSectionPayload;
149   class DecodeNumberOfFunctions;
150   class DecodeFunctionLength;
151   class DecodeFunctionBody;
152 
153   // Creates a buffer for the next section of the module.
154   SectionBuffer* CreateNewBuffer(uint32_t module_offset, uint8_t section_id,
155                                  size_t length,
156                                  Vector<const uint8_t> length_bytes);
157 
Error(const WasmError & error)158   std::unique_ptr<DecodingState> Error(const WasmError& error) {
159     if (ok()) processor_->OnError(error);
160     Fail();
161     return std::unique_ptr<DecodingState>(nullptr);
162   }
163 
Error(std::string message)164   std::unique_ptr<DecodingState> Error(std::string message) {
165     return Error(WasmError{module_offset_ - 1, std::move(message)});
166   }
167 
ProcessModuleHeader()168   void ProcessModuleHeader() {
169     if (!ok()) return;
170     if (!processor_->ProcessModuleHeader(state_->buffer(), 0)) Fail();
171   }
172 
ProcessSection(SectionBuffer * buffer)173   void ProcessSection(SectionBuffer* buffer) {
174     if (!ok()) return;
175     if (!processor_->ProcessSection(
176             buffer->section_code(), buffer->payload(),
177             buffer->module_offset() +
178                 static_cast<uint32_t>(buffer->payload_offset()))) {
179       Fail();
180     }
181   }
182 
StartCodeSection(int num_functions,std::shared_ptr<WireBytesStorage> wire_bytes_storage,int code_section_length)183   void StartCodeSection(int num_functions,
184                         std::shared_ptr<WireBytesStorage> wire_bytes_storage,
185                         int code_section_length) {
186     if (!ok()) return;
187     // The offset passed to {ProcessCodeSectionHeader} is an error offset and
188     // not the start offset of a buffer. Therefore we need the -1 here.
189     if (!processor_->ProcessCodeSectionHeader(
190             num_functions, module_offset() - 1, std::move(wire_bytes_storage),
191             code_section_length)) {
192       Fail();
193     }
194   }
195 
ProcessFunctionBody(Vector<const uint8_t> bytes,uint32_t module_offset)196   void ProcessFunctionBody(Vector<const uint8_t> bytes,
197                            uint32_t module_offset) {
198     if (!ok()) return;
199     if (!processor_->ProcessFunctionBody(bytes, module_offset)) Fail();
200   }
201 
Fail()202   void Fail() {
203     // We reset the {processor_} field to represent failure. This also ensures
204     // that we do not accidentally call further methods on the processor after
205     // failure.
206     processor_.reset();
207   }
208 
ok() const209   bool ok() const { return processor_ != nullptr; }
210 
module_offset() const211   uint32_t module_offset() const { return module_offset_; }
212 
213   std::unique_ptr<StreamingProcessor> processor_;
214   std::unique_ptr<DecodingState> state_;
215   std::vector<std::shared_ptr<SectionBuffer>> section_buffers_;
216   bool code_section_processed_ = false;
217   uint32_t module_offset_ = 0;
218   size_t total_size_ = 0;
219   bool stream_finished_ = false;
220 
221   // We need wire bytes in an array for deserializing cached modules.
222   std::vector<uint8_t> wire_bytes_for_deserializing_;
223 };
224 
OnBytesReceived(Vector<const uint8_t> bytes)225 void AsyncStreamingDecoder::OnBytesReceived(Vector<const uint8_t> bytes) {
226   if (deserializing()) {
227     wire_bytes_for_deserializing_.insert(wire_bytes_for_deserializing_.end(),
228                                          bytes.begin(), bytes.end());
229     return;
230   }
231 
232   TRACE_STREAMING("OnBytesReceived(%zu bytes)\n", bytes.size());
233 
234   size_t current = 0;
235   while (ok() && current < bytes.size()) {
236     size_t num_bytes =
237         state_->ReadBytes(this, bytes.SubVector(current, bytes.size()));
238     current += num_bytes;
239     module_offset_ += num_bytes;
240     if (state_->offset() == state_->buffer().size()) {
241       state_ = state_->Next(this);
242     }
243   }
244   total_size_ += bytes.size();
245   if (ok()) {
246     processor_->OnFinishedChunk();
247   }
248 }
249 
ReadBytes(AsyncStreamingDecoder * streaming,Vector<const uint8_t> bytes)250 size_t AsyncStreamingDecoder::DecodingState::ReadBytes(
251     AsyncStreamingDecoder* streaming, Vector<const uint8_t> bytes) {
252   Vector<uint8_t> remaining_buf = buffer() + offset();
253   size_t num_bytes = std::min(bytes.size(), remaining_buf.size());
254   TRACE_STREAMING("ReadBytes(%zu bytes)\n", num_bytes);
255   memcpy(remaining_buf.begin(), &bytes.first(), num_bytes);
256   set_offset(offset() + num_bytes);
257   return num_bytes;
258 }
259 
Finish()260 void AsyncStreamingDecoder::Finish() {
261   TRACE_STREAMING("Finish\n");
262   DCHECK(!stream_finished_);
263   stream_finished_ = true;
264   if (!ok()) return;
265 
266   if (deserializing()) {
267     Vector<const uint8_t> wire_bytes = VectorOf(wire_bytes_for_deserializing_);
268     // Try to deserialize the module from wire bytes and module bytes.
269     if (processor_->Deserialize(compiled_module_bytes_, wire_bytes)) return;
270 
271     // Deserialization failed. Restart decoding using |wire_bytes|.
272     compiled_module_bytes_ = {};
273     DCHECK(!deserializing());
274     OnBytesReceived(wire_bytes);
275     // The decoder has received all wire bytes; fall through and finish.
276   }
277 
278   if (!state_->is_finishing_allowed()) {
279     // The byte stream ended too early, we report an error.
280     Error("unexpected end of stream");
281     return;
282   }
283 
284   OwnedVector<uint8_t> bytes =
285       OwnedVector<uint8_t>::NewForOverwrite(total_size_);
286   uint8_t* cursor = bytes.start();
287   {
288 #define BYTES(x) (x & 0xFF), (x >> 8) & 0xFF, (x >> 16) & 0xFF, (x >> 24) & 0xFF
289     uint8_t module_header[]{BYTES(kWasmMagic), BYTES(kWasmVersion)};
290 #undef BYTES
291     memcpy(cursor, module_header, arraysize(module_header));
292     cursor += arraysize(module_header);
293   }
294   for (const auto& buffer : section_buffers_) {
295     DCHECK_LE(cursor - bytes.start() + buffer->length(), total_size_);
296     memcpy(cursor, buffer->bytes().begin(), buffer->length());
297     cursor += buffer->length();
298   }
299   processor_->OnFinishedStream(std::move(bytes));
300 }
301 
Abort()302 void AsyncStreamingDecoder::Abort() {
303   TRACE_STREAMING("Abort\n");
304   if (stream_finished_) return;
305   stream_finished_ = true;
306   if (!ok()) return;  // Failed already.
307   processor_->OnAbort();
308   Fail();
309 }
310 
311 namespace {
312 
313 class TopTierCompiledCallback {
314  public:
TopTierCompiledCallback(std::weak_ptr<NativeModule> native_module,AsyncStreamingDecoder::ModuleCompiledCallback callback)315   TopTierCompiledCallback(
316       std::weak_ptr<NativeModule> native_module,
317       AsyncStreamingDecoder::ModuleCompiledCallback callback)
318       : native_module_(std::move(native_module)),
319         callback_(std::move(callback)) {}
320 
operator ()(CompilationEvent event) const321   void operator()(CompilationEvent event) const {
322     if (event != CompilationEvent::kFinishedTopTierCompilation) return;
323     // If the native module is still alive, get back a shared ptr and call the
324     // callback.
325     if (std::shared_ptr<NativeModule> native_module = native_module_.lock()) {
326       callback_(native_module);
327     }
328 #ifdef DEBUG
329     DCHECK(!called_);
330     called_ = true;
331 #endif
332   }
333 
334  private:
335   const std::weak_ptr<NativeModule> native_module_;
336   const AsyncStreamingDecoder::ModuleCompiledCallback callback_;
337 #ifdef DEBUG
338   mutable bool called_ = false;
339 #endif
340 };
341 
342 }  // namespace
343 
NotifyNativeModuleCreated(const std::shared_ptr<NativeModule> & native_module)344 void AsyncStreamingDecoder::NotifyNativeModuleCreated(
345     const std::shared_ptr<NativeModule>& native_module) {
346   if (!module_compiled_callback_) return;
347   auto* comp_state = native_module->compilation_state();
348   comp_state->AddCallback(TopTierCompiledCallback{
349       std::move(native_module), std::move(module_compiled_callback_)});
350   module_compiled_callback_ = {};
351 }
352 
353 // An abstract class to share code among the states which decode VarInts. This
354 // class takes over the decoding of the VarInt and then calls the actual decode
355 // code with the decoded value.
356 class AsyncStreamingDecoder::DecodeVarInt32 : public DecodingState {
357  public:
DecodeVarInt32(size_t max_value,const char * field_name)358   explicit DecodeVarInt32(size_t max_value, const char* field_name)
359       : max_value_(max_value), field_name_(field_name) {}
360 
buffer()361   Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); }
362 
363   size_t ReadBytes(AsyncStreamingDecoder* streaming,
364                    Vector<const uint8_t> bytes) override;
365 
366   std::unique_ptr<DecodingState> Next(
367       AsyncStreamingDecoder* streaming) override;
368 
369   virtual std::unique_ptr<DecodingState> NextWithValue(
370       AsyncStreamingDecoder* streaming) = 0;
371 
372  protected:
373   uint8_t byte_buffer_[kMaxVarInt32Size];
374   // The maximum valid value decoded in this state. {Next} returns an error if
375   // this value is exceeded.
376   const size_t max_value_;
377   const char* const field_name_;
378   size_t value_ = 0;
379   size_t bytes_consumed_ = 0;
380 };
381 
382 class AsyncStreamingDecoder::DecodeModuleHeader : public DecodingState {
383  public:
buffer()384   Vector<uint8_t> buffer() override { return ArrayVector(byte_buffer_); }
385 
386   std::unique_ptr<DecodingState> Next(
387       AsyncStreamingDecoder* streaming) override;
388 
389  private:
390   // Checks if the magic bytes of the module header are correct.
391   void CheckHeader(Decoder* decoder);
392 
393   // The size of the module header.
394   static constexpr size_t kModuleHeaderSize = 8;
395   uint8_t byte_buffer_[kModuleHeaderSize];
396 };
397 
398 class AsyncStreamingDecoder::DecodeSectionID : public DecodingState {
399  public:
DecodeSectionID(uint32_t module_offset)400   explicit DecodeSectionID(uint32_t module_offset)
401       : module_offset_(module_offset) {}
402 
buffer()403   Vector<uint8_t> buffer() override { return {&id_, 1}; }
is_finishing_allowed() const404   bool is_finishing_allowed() const override { return true; }
405 
406   std::unique_ptr<DecodingState> Next(
407       AsyncStreamingDecoder* streaming) override;
408 
409  private:
410   uint8_t id_ = 0;
411   // The start offset of this section in the module.
412   const uint32_t module_offset_;
413 };
414 
415 class AsyncStreamingDecoder::DecodeSectionLength : public DecodeVarInt32 {
416  public:
DecodeSectionLength(uint8_t id,uint32_t module_offset)417   explicit DecodeSectionLength(uint8_t id, uint32_t module_offset)
418       : DecodeVarInt32(max_module_size(), "section length"),
419         section_id_(id),
420         module_offset_(module_offset) {}
421 
422   std::unique_ptr<DecodingState> NextWithValue(
423       AsyncStreamingDecoder* streaming) override;
424 
425  private:
426   const uint8_t section_id_;
427   // The start offset of this section in the module.
428   const uint32_t module_offset_;
429 };
430 
431 class AsyncStreamingDecoder::DecodeSectionPayload : public DecodingState {
432  public:
DecodeSectionPayload(SectionBuffer * section_buffer)433   explicit DecodeSectionPayload(SectionBuffer* section_buffer)
434       : section_buffer_(section_buffer) {}
435 
buffer()436   Vector<uint8_t> buffer() override { return section_buffer_->payload(); }
437 
438   std::unique_ptr<DecodingState> Next(
439       AsyncStreamingDecoder* streaming) override;
440 
441  private:
442   SectionBuffer* const section_buffer_;
443 };
444 
445 class AsyncStreamingDecoder::DecodeNumberOfFunctions : public DecodeVarInt32 {
446  public:
DecodeNumberOfFunctions(SectionBuffer * section_buffer)447   explicit DecodeNumberOfFunctions(SectionBuffer* section_buffer)
448       : DecodeVarInt32(kV8MaxWasmFunctions, "functions count"),
449         section_buffer_(section_buffer) {}
450 
451   std::unique_ptr<DecodingState> NextWithValue(
452       AsyncStreamingDecoder* streaming) override;
453 
454  private:
455   SectionBuffer* const section_buffer_;
456 };
457 
458 class AsyncStreamingDecoder::DecodeFunctionLength : public DecodeVarInt32 {
459  public:
DecodeFunctionLength(SectionBuffer * section_buffer,size_t buffer_offset,size_t num_remaining_functions)460   explicit DecodeFunctionLength(SectionBuffer* section_buffer,
461                                 size_t buffer_offset,
462                                 size_t num_remaining_functions)
463       : DecodeVarInt32(kV8MaxWasmFunctionSize, "body size"),
464         section_buffer_(section_buffer),
465         buffer_offset_(buffer_offset),
466         // We are reading a new function, so one function less is remaining.
467         num_remaining_functions_(num_remaining_functions - 1) {
468     DCHECK_GT(num_remaining_functions, 0);
469   }
470 
471   std::unique_ptr<DecodingState> NextWithValue(
472       AsyncStreamingDecoder* streaming) override;
473 
474  private:
475   SectionBuffer* const section_buffer_;
476   const size_t buffer_offset_;
477   const size_t num_remaining_functions_;
478 };
479 
480 class AsyncStreamingDecoder::DecodeFunctionBody : public DecodingState {
481  public:
DecodeFunctionBody(SectionBuffer * section_buffer,size_t buffer_offset,size_t function_body_length,size_t num_remaining_functions,uint32_t module_offset)482   explicit DecodeFunctionBody(SectionBuffer* section_buffer,
483                               size_t buffer_offset, size_t function_body_length,
484                               size_t num_remaining_functions,
485                               uint32_t module_offset)
486       : section_buffer_(section_buffer),
487         buffer_offset_(buffer_offset),
488         function_body_length_(function_body_length),
489         num_remaining_functions_(num_remaining_functions),
490         module_offset_(module_offset) {}
491 
buffer()492   Vector<uint8_t> buffer() override {
493     Vector<uint8_t> remaining_buffer =
494         section_buffer_->bytes() + buffer_offset_;
495     return remaining_buffer.SubVector(0, function_body_length_);
496   }
497 
498   std::unique_ptr<DecodingState> Next(
499       AsyncStreamingDecoder* streaming) override;
500 
501  private:
502   SectionBuffer* const section_buffer_;
503   const size_t buffer_offset_;
504   const size_t function_body_length_;
505   const size_t num_remaining_functions_;
506   const uint32_t module_offset_;
507 };
508 
ReadBytes(AsyncStreamingDecoder * streaming,Vector<const uint8_t> bytes)509 size_t AsyncStreamingDecoder::DecodeVarInt32::ReadBytes(
510     AsyncStreamingDecoder* streaming, Vector<const uint8_t> bytes) {
511   Vector<uint8_t> buf = buffer();
512   Vector<uint8_t> remaining_buf = buf + offset();
513   size_t new_bytes = std::min(bytes.size(), remaining_buf.size());
514   TRACE_STREAMING("ReadBytes of a VarInt\n");
515   memcpy(remaining_buf.begin(), &bytes.first(), new_bytes);
516   buf.Truncate(offset() + new_bytes);
517   Decoder decoder(buf,
518                   streaming->module_offset() - static_cast<uint32_t>(offset()));
519   value_ = decoder.consume_u32v(field_name_);
520 
521   if (decoder.failed()) {
522     if (new_bytes == remaining_buf.size()) {
523       // We only report an error if we read all bytes.
524       streaming->Error(decoder.error());
525     }
526     set_offset(offset() + new_bytes);
527     return new_bytes;
528   }
529 
530   // The number of bytes we actually needed to read.
531   DCHECK_GT(decoder.pc(), buffer().begin());
532   bytes_consumed_ = static_cast<size_t>(decoder.pc() - buf.begin());
533   TRACE_STREAMING("  ==> %zu bytes consumed\n", bytes_consumed_);
534 
535   // We read all the bytes we needed.
536   DCHECK_GT(bytes_consumed_, offset());
537   new_bytes = bytes_consumed_ - offset();
538   // Set the offset to the buffer size to signal that we are at the end of this
539   // section.
540   set_offset(buffer().size());
541   return new_bytes;
542 }
543 
544 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
Next(AsyncStreamingDecoder * streaming)545 AsyncStreamingDecoder::DecodeVarInt32::Next(AsyncStreamingDecoder* streaming) {
546   if (!streaming->ok()) return nullptr;
547 
548   if (value_ > max_value_) {
549     std::ostringstream oss;
550     oss << "function size > maximum function size: " << value_ << " < "
551         << max_value_;
552     return streaming->Error(oss.str());
553   }
554 
555   return NextWithValue(streaming);
556 }
557 
558 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
Next(AsyncStreamingDecoder * streaming)559 AsyncStreamingDecoder::DecodeModuleHeader::Next(
560     AsyncStreamingDecoder* streaming) {
561   TRACE_STREAMING("DecodeModuleHeader\n");
562   streaming->ProcessModuleHeader();
563   if (!streaming->ok()) return nullptr;
564   return std::make_unique<DecodeSectionID>(streaming->module_offset());
565 }
566 
567 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
Next(AsyncStreamingDecoder * streaming)568 AsyncStreamingDecoder::DecodeSectionID::Next(AsyncStreamingDecoder* streaming) {
569   TRACE_STREAMING("DecodeSectionID: %s section\n",
570                   SectionName(static_cast<SectionCode>(id_)));
571   if (id_ == SectionCode::kCodeSectionCode) {
572     // Explicitly check for multiple code sections as module decoder never
573     // sees the code section and hence cannot track this section.
574     if (streaming->code_section_processed_) {
575       // TODO(wasm): This error message (and others in this class) is different
576       // for non-streaming decoding. Bring them in sync and test.
577       return streaming->Error("code section can only appear once");
578     }
579     streaming->code_section_processed_ = true;
580   }
581   return std::make_unique<DecodeSectionLength>(id_, module_offset_);
582 }
583 
584 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
NextWithValue(AsyncStreamingDecoder * streaming)585 AsyncStreamingDecoder::DecodeSectionLength::NextWithValue(
586     AsyncStreamingDecoder* streaming) {
587   TRACE_STREAMING("DecodeSectionLength(%zu)\n", value_);
588   SectionBuffer* buf =
589       streaming->CreateNewBuffer(module_offset_, section_id_, value_,
590                                  buffer().SubVector(0, bytes_consumed_));
591   DCHECK_NOT_NULL(buf);
592   if (value_ == 0) {
593     if (section_id_ == SectionCode::kCodeSectionCode) {
594       return streaming->Error("code section cannot have size 0");
595     }
596     // Process section without payload as well, to enforce section order and
597     // other feature checks specific to each individual section.
598     streaming->ProcessSection(buf);
599     if (!streaming->ok()) return nullptr;
600     // There is no payload, we go to the next section immediately.
601     return std::make_unique<DecodeSectionID>(streaming->module_offset_);
602   }
603   if (section_id_ == SectionCode::kCodeSectionCode) {
604     // We reached the code section. All functions of the code section are put
605     // into the same SectionBuffer.
606     return std::make_unique<DecodeNumberOfFunctions>(buf);
607   }
608   return std::make_unique<DecodeSectionPayload>(buf);
609 }
610 
611 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
Next(AsyncStreamingDecoder * streaming)612 AsyncStreamingDecoder::DecodeSectionPayload::Next(
613     AsyncStreamingDecoder* streaming) {
614   TRACE_STREAMING("DecodeSectionPayload\n");
615   streaming->ProcessSection(section_buffer_);
616   if (!streaming->ok()) return nullptr;
617   return std::make_unique<DecodeSectionID>(streaming->module_offset());
618 }
619 
620 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
NextWithValue(AsyncStreamingDecoder * streaming)621 AsyncStreamingDecoder::DecodeNumberOfFunctions::NextWithValue(
622     AsyncStreamingDecoder* streaming) {
623   TRACE_STREAMING("DecodeNumberOfFunctions(%zu)\n", value_);
624   // Copy the bytes we read into the section buffer.
625   Vector<uint8_t> payload_buf = section_buffer_->payload();
626   if (payload_buf.size() < bytes_consumed_) {
627     return streaming->Error("invalid code section length");
628   }
629   memcpy(payload_buf.begin(), buffer().begin(), bytes_consumed_);
630 
631   // {value} is the number of functions.
632   if (value_ == 0) {
633     if (payload_buf.size() != bytes_consumed_) {
634       return streaming->Error("not all code section bytes were used");
635     }
636     return std::make_unique<DecodeSectionID>(streaming->module_offset());
637   }
638 
639   DCHECK_GE(kMaxInt, payload_buf.length());
640   int code_section_len = static_cast<int>(payload_buf.length());
641   DCHECK_GE(kMaxInt, value_);
642   streaming->StartCodeSection(static_cast<int>(value_),
643                               streaming->section_buffers_.back(),
644                               code_section_len);
645   if (!streaming->ok()) return nullptr;
646   return std::make_unique<DecodeFunctionLength>(
647       section_buffer_, section_buffer_->payload_offset() + bytes_consumed_,
648       value_);
649 }
650 
651 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
NextWithValue(AsyncStreamingDecoder * streaming)652 AsyncStreamingDecoder::DecodeFunctionLength::NextWithValue(
653     AsyncStreamingDecoder* streaming) {
654   TRACE_STREAMING("DecodeFunctionLength(%zu)\n", value_);
655   // Copy the bytes we consumed into the section buffer.
656   Vector<uint8_t> fun_length_buffer = section_buffer_->bytes() + buffer_offset_;
657   if (fun_length_buffer.size() < bytes_consumed_) {
658     return streaming->Error("read past code section end");
659   }
660   memcpy(fun_length_buffer.begin(), buffer().begin(), bytes_consumed_);
661 
662   // {value} is the length of the function.
663   if (value_ == 0) return streaming->Error("invalid function length (0)");
664 
665   if (buffer_offset_ + bytes_consumed_ + value_ > section_buffer_->length()) {
666     return streaming->Error("not enough code section bytes");
667   }
668 
669   return std::make_unique<DecodeFunctionBody>(
670       section_buffer_, buffer_offset_ + bytes_consumed_, value_,
671       num_remaining_functions_, streaming->module_offset());
672 }
673 
674 std::unique_ptr<AsyncStreamingDecoder::DecodingState>
Next(AsyncStreamingDecoder * streaming)675 AsyncStreamingDecoder::DecodeFunctionBody::Next(
676     AsyncStreamingDecoder* streaming) {
677   TRACE_STREAMING("DecodeFunctionBody\n");
678   streaming->ProcessFunctionBody(buffer(), module_offset_);
679   if (!streaming->ok()) return nullptr;
680 
681   size_t end_offset = buffer_offset_ + function_body_length_;
682   if (num_remaining_functions_ > 0) {
683     return std::make_unique<DecodeFunctionLength>(section_buffer_, end_offset,
684                                                   num_remaining_functions_);
685   }
686   // We just read the last function body. Continue with the next section.
687   if (end_offset != section_buffer_->length()) {
688     return streaming->Error("not all code section bytes were used");
689   }
690   return std::make_unique<DecodeSectionID>(streaming->module_offset());
691 }
692 
AsyncStreamingDecoder(std::unique_ptr<StreamingProcessor> processor)693 AsyncStreamingDecoder::AsyncStreamingDecoder(
694     std::unique_ptr<StreamingProcessor> processor)
695     : processor_(std::move(processor)),
696       // A module always starts with a module header.
697       state_(new DecodeModuleHeader()) {}
698 
CreateNewBuffer(uint32_t module_offset,uint8_t section_id,size_t length,Vector<const uint8_t> length_bytes)699 AsyncStreamingDecoder::SectionBuffer* AsyncStreamingDecoder::CreateNewBuffer(
700     uint32_t module_offset, uint8_t section_id, size_t length,
701     Vector<const uint8_t> length_bytes) {
702   // Section buffers are allocated in the same order they appear in the module,
703   // they will be processed and later on concatenated in that same order.
704   section_buffers_.emplace_back(std::make_shared<SectionBuffer>(
705       module_offset, section_id, length, length_bytes));
706   return section_buffers_.back().get();
707 }
708 
CreateAsyncStreamingDecoder(std::unique_ptr<StreamingProcessor> processor)709 std::unique_ptr<StreamingDecoder> StreamingDecoder::CreateAsyncStreamingDecoder(
710     std::unique_ptr<StreamingProcessor> processor) {
711   return std::make_unique<AsyncStreamingDecoder>(std::move(processor));
712 }
713 
714 }  // namespace wasm
715 }  // namespace internal
716 }  // namespace v8
717 
718 #undef TRACE_STREAMING
719