• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2017 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5import 'dart:async';
6
7import 'package:file/file.dart';
8import 'package:stream_channel/stream_channel.dart';
9
10import 'base/io.dart';
11import 'base/process.dart';
12import 'convert.dart';
13import 'globals.dart';
14
15const String _kManifest = 'MANIFEST.txt';
16const String _kRequest = 'request';
17const String _kResponse = 'response';
18const String _kId = 'id';
19const String _kType = 'type';
20const String _kData = 'data';
21
22/// A [StreamChannel] that expects VM service (JSON-rpc) protocol messages and
23/// serializes all such messages to the file system for later playback.
24class RecordingVMServiceChannel extends DelegatingStreamChannel<String> {
25  RecordingVMServiceChannel(StreamChannel<String> delegate, Directory location)
26      : super(delegate) {
27    addShutdownHook(() async {
28      // Sort the messages such that they are ordered
29      // `[request1, response1, request2, response2, ...]`. This serves no
30      // purpose other than to make the serialized format more human-readable.
31      _messages.sort();
32
33      final File file = _getManifest(location);
34      final String json = const JsonEncoder.withIndent('  ').convert(_messages);
35      await file.writeAsString(json, flush: true);
36    }, ShutdownStage.SERIALIZE_RECORDING);
37  }
38
39  final List<_Message> _messages = <_Message>[];
40
41  _RecordingStream _streamRecorder;
42  _RecordingSink _sinkRecorder;
43
44  @override
45  Stream<String> get stream {
46    _streamRecorder ??= _RecordingStream(super.stream, _messages);
47    return _streamRecorder.stream;
48  }
49
50  @override
51  StreamSink<String> get sink => _sinkRecorder ??= _RecordingSink(super.sink, _messages);
52}
53
54/// Base class for request and response JSON-rpc messages.
55abstract class _Message implements Comparable<_Message> {
56  _Message(this.type, this.data);
57
58  factory _Message.fromRecording(Map<String, dynamic> recordingData) {
59    return recordingData[_kType] == _kRequest
60        ? _Request(recordingData[_kData])
61        : _Response(recordingData[_kData]);
62  }
63
64  final String type;
65  final Map<String, dynamic> data;
66
67  int get id => data[_kId];
68
69  /// Allows [JsonEncoder] to properly encode objects of this type.
70  Map<String, dynamic> toJson() {
71    return <String, dynamic>{
72      _kType: type,
73      _kData: data,
74    };
75  }
76
77  @override
78  int compareTo(_Message other) {
79    if (id == null) {
80      printError('Invalid VMService message data detected: $data');
81      return -1;
82    }
83    final int result = id.compareTo(other.id);
84    if (result != 0) {
85      return result;
86    } else if (type == _kRequest) {
87      return -1;
88    } else {
89      return 1;
90    }
91  }
92}
93
94/// A VM service JSON-rpc request (sent to the VM).
95class _Request extends _Message {
96  _Request(Map<String, dynamic> data) : super(_kRequest, data);
97  _Request.fromString(String data) : this(json.decoder.convert(data));
98}
99
100/// A VM service JSON-rpc response (from the VM).
101class _Response extends _Message {
102  _Response(Map<String, dynamic> data) : super(_kResponse, data);
103  _Response.fromString(String data) : this(json.decoder.convert(data));
104}
105
106/// A matching request/response pair.
107///
108/// A request and response match by virtue of having matching
109/// [IDs](_Message.id).
110class _Transaction {
111  _Request request;
112  _Response response;
113}
114
115/// A helper class that monitors a [Stream] of VM service JSON-rpc responses
116/// and saves the responses to a recording.
117class _RecordingStream {
118  _RecordingStream(Stream<String> stream, this._recording)
119      : _delegate = stream,
120        _controller = stream.isBroadcast
121            ? StreamController<String>.broadcast()
122            : StreamController<String>() {
123    _controller.onListen = () {
124      assert(_subscription == null);
125      _subscription = _listenToStream();
126    };
127    _controller.onCancel = () async {
128      assert(_subscription != null);
129      await _subscription.cancel();
130      _subscription = null;
131    };
132    _controller.onPause = () {
133      assert(_subscription != null && !_subscription.isPaused);
134      _subscription.pause();
135    };
136    _controller.onResume = () {
137      assert(_subscription != null && _subscription.isPaused);
138      _subscription.resume();
139    };
140  }
141
142  final Stream<String> _delegate;
143  final StreamController<String> _controller;
144  final List<_Message> _recording;
145  StreamSubscription<String> _subscription;
146
147  StreamSubscription<String> _listenToStream() {
148    return _delegate.listen(
149      (String element) {
150        _recording.add(_Response.fromString(element));
151        _controller.add(element);
152      },
153      onError: _controller.addError, // We currently don't support recording of errors.
154      onDone: _controller.close,
155    );
156  }
157
158  /// The wrapped [Stream] to expose to callers.
159  Stream<String> get stream => _controller.stream;
160}
161
162/// A [StreamSink] that monitors VM service JSON-rpc requests and saves the
163/// requests to a recording.
164class _RecordingSink implements StreamSink<String> {
165  _RecordingSink(this._delegate, this._recording);
166
167  final StreamSink<String> _delegate;
168  final List<_Message> _recording;
169
170  @override
171  Future<dynamic> close() => _delegate.close();
172
173  @override
174  Future<dynamic> get done => _delegate.done;
175
176  @override
177  void add(String data) {
178    _delegate.add(data);
179    _recording.add(_Request.fromString(data));
180  }
181
182  @override
183  void addError(dynamic errorEvent, [ StackTrace stackTrace ]) {
184    throw UnimplementedError('Add support for this if the need ever arises');
185  }
186
187  @override
188  Future<dynamic> addStream(Stream<String> stream) {
189    throw UnimplementedError('Add support for this if the need ever arises');
190  }
191}
192
193/// A [StreamChannel] that expects VM service (JSON-rpc) requests to be written
194/// to its [StreamChannel.sink], looks up those requests in a recording, and
195/// replays the corresponding responses back from the recording.
196class ReplayVMServiceChannel extends StreamChannelMixin<String> {
197  ReplayVMServiceChannel(Directory location)
198    : _transactions = _loadTransactions(location);
199
200  final Map<int, _Transaction> _transactions;
201  final StreamController<String> _controller = StreamController<String>();
202  _ReplaySink _replaySink;
203
204  static Map<int, _Transaction> _loadTransactions(Directory location) {
205    final File file = _getManifest(location);
206    final String jsonData = file.readAsStringSync();
207    final Iterable<_Message> messages = json.decoder.convert(jsonData).map<_Message>(_toMessage);
208    final Map<int, _Transaction> transactions = <int, _Transaction>{};
209    for (_Message message in messages) {
210      final _Transaction transaction =
211          transactions.putIfAbsent(message.id, () => _Transaction());
212      if (message.type == _kRequest) {
213        assert(transaction.request == null);
214        transaction.request = message;
215      } else {
216        assert(transaction.response == null);
217        transaction.response = message;
218      }
219    }
220    return transactions;
221  }
222
223  static _Message _toMessage(Map<String, dynamic> jsonData) {
224    return _Message.fromRecording(jsonData);
225  }
226
227  void send(_Request request) {
228    if (!_transactions.containsKey(request.id))
229      throw ArgumentError('No matching invocation found');
230    final _Transaction transaction = _transactions.remove(request.id);
231    // TODO(tvolkert): validate that `transaction.request` matches `request`
232    if (transaction.response == null) {
233      // This signals that when we were recording, the VM shut down before
234      // we received the response. This is typically due to the user quitting
235      // the app runner. We follow suit here and exit.
236      printStatus('Exiting due to dangling request');
237      exit(0);
238    } else {
239      _controller.add(json.encoder.convert(transaction.response.data));
240      if (_transactions.isEmpty)
241        _controller.close();
242    }
243  }
244
245  @override
246  StreamSink<String> get sink => _replaySink ??= _ReplaySink(this);
247
248  @override
249  Stream<String> get stream => _controller.stream;
250}
251
252class _ReplaySink implements StreamSink<String> {
253  _ReplaySink(this.channel);
254
255  final ReplayVMServiceChannel channel;
256  final Completer<void> _completer = Completer<void>();
257
258  @override
259  Future<dynamic> close() {
260    _completer.complete();
261    return _completer.future;
262  }
263
264  @override
265  Future<dynamic> get done => _completer.future;
266
267  @override
268  void add(String data) {
269    if (_completer.isCompleted)
270      throw StateError('Sink already closed');
271    channel.send(_Request.fromString(data));
272  }
273
274  @override
275  void addError(dynamic errorEvent, [ StackTrace stackTrace ]) {
276    throw UnimplementedError('Add support for this if the need ever arises');
277  }
278
279  @override
280  Future<dynamic> addStream(Stream<String> stream) {
281    throw UnimplementedError('Add support for this if the need ever arises');
282  }
283}
284
285File _getManifest(Directory location) {
286  final String path = location.fileSystem.path.join(location.path, _kManifest);
287  return location.fileSystem.file(path);
288}
289