• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2018 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import {defer} from '../base/deferred';
16import {assertExists, assertTrue} from '../base/logging';
17import * as init_trace_processor from '../gen/trace_processor';
18
19// The Initialize() call will allocate a buffer of REQ_BUF_SIZE bytes which
20// will be used to copy the input request data. This is to avoid passing the
21// input data on the stack, which has a limited (~1MB) size.
22// The buffer will be allocated by the C++ side and reachable at
23// HEAPU8[reqBufferAddr, +REQ_BUFFER_SIZE].
24const REQ_BUF_SIZE = 32 * 1024 * 1024;
25
26// The end-to-end interaction between JS and Wasm is as follows:
27// - [JS] Inbound data received by the worker (onmessage() in engine/index.ts).
28//   - [JS] onRpcDataReceived() (this file)
29//     - [C++] trace_processor_on_rpc_request (wasm_bridge.cc)
30//       - [C++] some TraceProcessor::method()
31//         for (batch in result_rows)
32//           - [C++] RpcResponseFunction(bytes) (wasm_bridge.cc)
33//             - [JS] onReply() (this file)
34//               - [JS] postMessage() (this file)
35export class WasmBridge {
36  // When this promise has resolved it is safe to call callWasm.
37  whenInitialized: Promise<void>;
38
39  private aborted: boolean;
40  private connection: init_trace_processor.Module;
41  private reqBufferAddr = 0;
42  private lastStderr: string[] = [];
43  private messagePort?: MessagePort;
44
45  constructor() {
46    this.aborted = false;
47    const deferredRuntimeInitialized = defer<void>();
48    this.connection = init_trace_processor({
49      locateFile: (s: string) => s,
50      print: (line: string) => console.log(line),
51      printErr: (line: string) => this.appendAndLogErr(line),
52      onRuntimeInitialized: () => deferredRuntimeInitialized.resolve(),
53    });
54    this.whenInitialized = deferredRuntimeInitialized.then(() => {
55      const fn = this.connection.addFunction(this.onReply.bind(this), 'vii');
56      this.reqBufferAddr = this.connection.ccall(
57          'trace_processor_rpc_init',
58          /*return=*/ 'number',
59          /*args=*/['number', 'number'],
60          [fn, REQ_BUF_SIZE]);
61    });
62  }
63
64  initialize(port: MessagePort) {
65    // Ensure that initialize() is called only once.
66    assertTrue(this.messagePort === undefined);
67    this.messagePort = port;
68    // Note: setting .onmessage implicitly calls port.start() and dispatches the
69    // queued messages. addEventListener('message') doesn't.
70    this.messagePort.onmessage = this.onMessage.bind(this);
71  }
72
73  onMessage(msg: MessageEvent) {
74    if (this.aborted) {
75      throw new Error('Wasm module crashed');
76    }
77    assertTrue(msg.data instanceof Uint8Array);
78    const data = msg.data as Uint8Array;
79    let wrSize = 0;
80    // If the request data is larger than our JS<>Wasm interop buffer, split it
81    // into multiple writes. The RPC channel is byte-oriented and is designed to
82    // deal with arbitrary fragmentations.
83    while (wrSize < data.length) {
84      const sliceLen = Math.min(data.length - wrSize, REQ_BUF_SIZE);
85      const dataSlice = data.subarray(wrSize, wrSize + sliceLen);
86      this.connection.HEAPU8.set(dataSlice, this.reqBufferAddr);
87      wrSize += sliceLen;
88      try {
89        this.connection.ccall(
90            'trace_processor_on_rpc_request',  // C function name.
91            'void',                            // Return type.
92            ['number'],                        // Arg types.
93            [sliceLen]                         // Args.
94        );
95      } catch (err) {
96        this.aborted = true;
97        let abortReason = `${err}`;
98        if (err instanceof Error) {
99          abortReason = `${err.name}: ${err.message}\n${err.stack}`;
100        }
101        abortReason += '\n\nstderr: \n' + this.lastStderr.join('\n');
102        throw new Error(abortReason);
103      }
104    }  // while(wrSize < data.length)
105  }
106
107  // This function is bound and passed to Initialize and is called by the C++
108  // code while in the ccall(trace_processor_on_rpc_request).
109  private onReply(heapPtr: number, size: number) {
110    const data = this.connection.HEAPU8.slice(heapPtr, heapPtr + size);
111    assertExists(this.messagePort).postMessage(data, [data.buffer]);
112  }
113
114  private appendAndLogErr(line: string) {
115    console.warn(line);
116    // Keep the last N lines in the |lastStderr| buffer.
117    this.lastStderr.push(line);
118    if (this.lastStderr.length > 512) {
119      this.lastStderr.shift();
120    }
121  }
122}
123