1// Copyright 2017 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5import 'dart:async'; 6 7import 'package:file/file.dart'; 8import 'package:stream_channel/stream_channel.dart'; 9 10import 'base/io.dart'; 11import 'base/process.dart'; 12import 'convert.dart'; 13import 'globals.dart'; 14 15const String _kManifest = 'MANIFEST.txt'; 16const String _kRequest = 'request'; 17const String _kResponse = 'response'; 18const String _kId = 'id'; 19const String _kType = 'type'; 20const String _kData = 'data'; 21 22/// A [StreamChannel] that expects VM service (JSON-rpc) protocol messages and 23/// serializes all such messages to the file system for later playback. 24class RecordingVMServiceChannel extends DelegatingStreamChannel<String> { 25 RecordingVMServiceChannel(StreamChannel<String> delegate, Directory location) 26 : super(delegate) { 27 addShutdownHook(() async { 28 // Sort the messages such that they are ordered 29 // `[request1, response1, request2, response2, ...]`. This serves no 30 // purpose other than to make the serialized format more human-readable. 31 _messages.sort(); 32 33 final File file = _getManifest(location); 34 final String json = const JsonEncoder.withIndent(' ').convert(_messages); 35 await file.writeAsString(json, flush: true); 36 }, ShutdownStage.SERIALIZE_RECORDING); 37 } 38 39 final List<_Message> _messages = <_Message>[]; 40 41 _RecordingStream _streamRecorder; 42 _RecordingSink _sinkRecorder; 43 44 @override 45 Stream<String> get stream { 46 _streamRecorder ??= _RecordingStream(super.stream, _messages); 47 return _streamRecorder.stream; 48 } 49 50 @override 51 StreamSink<String> get sink => _sinkRecorder ??= _RecordingSink(super.sink, _messages); 52} 53 54/// Base class for request and response JSON-rpc messages. 55abstract class _Message implements Comparable<_Message> { 56 _Message(this.type, this.data); 57 58 factory _Message.fromRecording(Map<String, dynamic> recordingData) { 59 return recordingData[_kType] == _kRequest 60 ? _Request(recordingData[_kData]) 61 : _Response(recordingData[_kData]); 62 } 63 64 final String type; 65 final Map<String, dynamic> data; 66 67 int get id => data[_kId]; 68 69 /// Allows [JsonEncoder] to properly encode objects of this type. 70 Map<String, dynamic> toJson() { 71 return <String, dynamic>{ 72 _kType: type, 73 _kData: data, 74 }; 75 } 76 77 @override 78 int compareTo(_Message other) { 79 if (id == null) { 80 printError('Invalid VMService message data detected: $data'); 81 return -1; 82 } 83 final int result = id.compareTo(other.id); 84 if (result != 0) { 85 return result; 86 } else if (type == _kRequest) { 87 return -1; 88 } else { 89 return 1; 90 } 91 } 92} 93 94/// A VM service JSON-rpc request (sent to the VM). 95class _Request extends _Message { 96 _Request(Map<String, dynamic> data) : super(_kRequest, data); 97 _Request.fromString(String data) : this(json.decoder.convert(data)); 98} 99 100/// A VM service JSON-rpc response (from the VM). 101class _Response extends _Message { 102 _Response(Map<String, dynamic> data) : super(_kResponse, data); 103 _Response.fromString(String data) : this(json.decoder.convert(data)); 104} 105 106/// A matching request/response pair. 107/// 108/// A request and response match by virtue of having matching 109/// [IDs](_Message.id). 110class _Transaction { 111 _Request request; 112 _Response response; 113} 114 115/// A helper class that monitors a [Stream] of VM service JSON-rpc responses 116/// and saves the responses to a recording. 117class _RecordingStream { 118 _RecordingStream(Stream<String> stream, this._recording) 119 : _delegate = stream, 120 _controller = stream.isBroadcast 121 ? StreamController<String>.broadcast() 122 : StreamController<String>() { 123 _controller.onListen = () { 124 assert(_subscription == null); 125 _subscription = _listenToStream(); 126 }; 127 _controller.onCancel = () async { 128 assert(_subscription != null); 129 await _subscription.cancel(); 130 _subscription = null; 131 }; 132 _controller.onPause = () { 133 assert(_subscription != null && !_subscription.isPaused); 134 _subscription.pause(); 135 }; 136 _controller.onResume = () { 137 assert(_subscription != null && _subscription.isPaused); 138 _subscription.resume(); 139 }; 140 } 141 142 final Stream<String> _delegate; 143 final StreamController<String> _controller; 144 final List<_Message> _recording; 145 StreamSubscription<String> _subscription; 146 147 StreamSubscription<String> _listenToStream() { 148 return _delegate.listen( 149 (String element) { 150 _recording.add(_Response.fromString(element)); 151 _controller.add(element); 152 }, 153 onError: _controller.addError, // We currently don't support recording of errors. 154 onDone: _controller.close, 155 ); 156 } 157 158 /// The wrapped [Stream] to expose to callers. 159 Stream<String> get stream => _controller.stream; 160} 161 162/// A [StreamSink] that monitors VM service JSON-rpc requests and saves the 163/// requests to a recording. 164class _RecordingSink implements StreamSink<String> { 165 _RecordingSink(this._delegate, this._recording); 166 167 final StreamSink<String> _delegate; 168 final List<_Message> _recording; 169 170 @override 171 Future<dynamic> close() => _delegate.close(); 172 173 @override 174 Future<dynamic> get done => _delegate.done; 175 176 @override 177 void add(String data) { 178 _delegate.add(data); 179 _recording.add(_Request.fromString(data)); 180 } 181 182 @override 183 void addError(dynamic errorEvent, [ StackTrace stackTrace ]) { 184 throw UnimplementedError('Add support for this if the need ever arises'); 185 } 186 187 @override 188 Future<dynamic> addStream(Stream<String> stream) { 189 throw UnimplementedError('Add support for this if the need ever arises'); 190 } 191} 192 193/// A [StreamChannel] that expects VM service (JSON-rpc) requests to be written 194/// to its [StreamChannel.sink], looks up those requests in a recording, and 195/// replays the corresponding responses back from the recording. 196class ReplayVMServiceChannel extends StreamChannelMixin<String> { 197 ReplayVMServiceChannel(Directory location) 198 : _transactions = _loadTransactions(location); 199 200 final Map<int, _Transaction> _transactions; 201 final StreamController<String> _controller = StreamController<String>(); 202 _ReplaySink _replaySink; 203 204 static Map<int, _Transaction> _loadTransactions(Directory location) { 205 final File file = _getManifest(location); 206 final String jsonData = file.readAsStringSync(); 207 final Iterable<_Message> messages = json.decoder.convert(jsonData).map<_Message>(_toMessage); 208 final Map<int, _Transaction> transactions = <int, _Transaction>{}; 209 for (_Message message in messages) { 210 final _Transaction transaction = 211 transactions.putIfAbsent(message.id, () => _Transaction()); 212 if (message.type == _kRequest) { 213 assert(transaction.request == null); 214 transaction.request = message; 215 } else { 216 assert(transaction.response == null); 217 transaction.response = message; 218 } 219 } 220 return transactions; 221 } 222 223 static _Message _toMessage(Map<String, dynamic> jsonData) { 224 return _Message.fromRecording(jsonData); 225 } 226 227 void send(_Request request) { 228 if (!_transactions.containsKey(request.id)) 229 throw ArgumentError('No matching invocation found'); 230 final _Transaction transaction = _transactions.remove(request.id); 231 // TODO(tvolkert): validate that `transaction.request` matches `request` 232 if (transaction.response == null) { 233 // This signals that when we were recording, the VM shut down before 234 // we received the response. This is typically due to the user quitting 235 // the app runner. We follow suit here and exit. 236 printStatus('Exiting due to dangling request'); 237 exit(0); 238 } else { 239 _controller.add(json.encoder.convert(transaction.response.data)); 240 if (_transactions.isEmpty) 241 _controller.close(); 242 } 243 } 244 245 @override 246 StreamSink<String> get sink => _replaySink ??= _ReplaySink(this); 247 248 @override 249 Stream<String> get stream => _controller.stream; 250} 251 252class _ReplaySink implements StreamSink<String> { 253 _ReplaySink(this.channel); 254 255 final ReplayVMServiceChannel channel; 256 final Completer<void> _completer = Completer<void>(); 257 258 @override 259 Future<dynamic> close() { 260 _completer.complete(); 261 return _completer.future; 262 } 263 264 @override 265 Future<dynamic> get done => _completer.future; 266 267 @override 268 void add(String data) { 269 if (_completer.isCompleted) 270 throw StateError('Sink already closed'); 271 channel.send(_Request.fromString(data)); 272 } 273 274 @override 275 void addError(dynamic errorEvent, [ StackTrace stackTrace ]) { 276 throw UnimplementedError('Add support for this if the need ever arises'); 277 } 278 279 @override 280 Future<dynamic> addStream(Stream<String> stream) { 281 throw UnimplementedError('Add support for this if the need ever arises'); 282 } 283} 284 285File _getManifest(Directory location) { 286 final String path = location.fileSystem.path.join(location.path, _kManifest); 287 return location.fileSystem.file(path); 288} 289