// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:developer';
import 'dart:io';
import 'dart:isolate';

import 'package:logging/logging.dart';
import 'package:stack_trace/stack_trace.dart';

import 'running_processes.dart';
import 'utils.dart';

/// Represents a unit of work performed in the CI environment that can
/// succeed, fail and be retried independently of others.
typedef TaskFunction = Future<TaskResult> Function();

/// Whether [task] has already been called in this Dart VM.
bool _isTaskRegistered = false;

/// Registers a [task] to run, returns the result when it is complete.
///
/// The task does not run immediately but waits for the request via the
/// VM service protocol to run it.
///
/// It is ok for a [task] to perform many things. However, only one task can be
/// registered per Dart VM.
///
/// Throws a [StateError] if a task has already been registered.
Future<TaskResult> task(TaskFunction task) {
  if (_isTaskRegistered)
    throw StateError('A task is already registered');

  _isTaskRegistered = true;

  // TODO(ianh): allow overriding logging.
  Logger.root.level = Level.ALL;
  Logger.root.onRecord.listen((LogRecord rec) {
    print('${rec.level.name}: ${rec.time}: ${rec.message}');
  });

  final _TaskRunner runner = _TaskRunner(task);
  runner.keepVmAliveUntilTaskRunRequested();
  return runner.whenDone;
}

/// Runs a single [TaskFunction] on request from the VM service protocol.
///
/// Constructing a runner registers two service extensions:
///
///  * `ext.cocoonRunTask`, which runs the task (honoring an optional
///    `timeoutInMinutes` parameter) and responds with the JSON-encoded
///    [TaskResult].
///  * `ext.cocoonRunnerReady`, which responds with `"ready"`.
class _TaskRunner {
  _TaskRunner(this.task) {
    registerExtension('ext.cocoonRunTask',
        (String method, Map<String, String> parameters) async {
      final Duration taskTimeout = parameters.containsKey('timeoutInMinutes')
          ? Duration(minutes: int.parse(parameters['timeoutInMinutes']))
          : null;
      final TaskResult result = await run(taskTimeout);
      return ServiceExtensionResponse.result(json.encode(result.toJson()));
    });
    registerExtension('ext.cocoonRunnerReady',
        (String method, Map<String, String> parameters) async {
      return ServiceExtensionResponse.result('"ready"');
    });
  }

  /// The task this runner executes when asked to.
  final TaskFunction task;

  // TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797
  RawReceivePort _keepAlivePort;
  Timer _startTaskTimeout;
  bool _taskStarted = false;

  final Completer<TaskResult> _completer = Completer<TaskResult>();

  static final Logger logger = Logger('TaskRunner');

  /// Signals that this task runner finished running the task.
  Future<TaskResult> get whenDone => _completer.future;

  /// Runs the task and returns its result.
  ///
  /// If [taskTimeout] is non-null and the task does not finish within it, a
  /// failed [TaskResult] is returned instead.
  ///
  /// Dart processes that appear while the task runs are reported and killed;
  /// when the task returned a [TaskResultCheckProcesses], such a leak also
  /// turns the result into a failure.
  ///
  /// The returned result is also delivered through [whenDone], including in
  /// the timeout case.
  Future<TaskResult> run(Duration taskTimeout) async {
    try {
      _taskStarted = true;
      print('Running task.');
      final String exe = Platform.isWindows ? '.exe' : '';
      section('Checking running Dart$exe processes');
      final Set<RunningProcessInfo> beforeRunningDartInstances = await getRunningProcesses(
        processName: 'dart$exe',
      ).toSet();
      beforeRunningDartInstances.forEach(print);

      Future<TaskResult> futureResult = _performTask();
      if (taskTimeout != null)
        futureResult = futureResult.timeout(taskTimeout);
      TaskResult result = await futureResult;

      section('Checking running Dart$exe processes after task...');
      final List<RunningProcessInfo> afterRunningDartInstances = await getRunningProcesses(
        processName: 'dart$exe',
      ).toList();
      for (final RunningProcessInfo info in afterRunningDartInstances) {
        if (!beforeRunningDartInstances.contains(info)) {
          print('$info was leaked by this test.');
          // TODO(dnfield): remove this special casing after https://github.com/flutter/flutter/issues/29141 is resolved.
          if (result is TaskResultCheckProcesses) {
            result = TaskResult.failure('This test leaked dart processes');
          }
          final bool killed = await killProcess(info.pid);
          if (!killed) {
            print('Failed to kill process ${info.pid}.');
          } else {
            print('Killed process id ${info.pid}.');
          }
        }
      }

      _complete(result);
      return result;
    } on TimeoutException catch (_) {
      print('Task timed out in framework.dart after $taskTimeout.');
      final TaskResult timeoutResult =
          TaskResult.failure('Task timed out after $taskTimeout');
      // Without this, anything awaiting [whenDone] (i.e. the future returned
      // by the top-level `task` function) would never be notified of the
      // timeout.
      _complete(timeoutResult);
      return timeoutResult;
    } finally {
      print('Cleaning up after task...');
      await forceQuitRunningProcesses();
      _closeKeepAlivePort();
    }
  }

  /// Completes [whenDone] with [result] unless it has already completed.
  ///
  /// The guard matters because the `ext.cocoonRunTask` extension can be
  /// invoked more than once, and completing a [Completer] twice throws.
  void _complete(TaskResult result) {
    if (!_completer.isCompleted)
      _completer.complete(result);
  }

  /// Causes the Dart VM to stay alive until a request to run the task is
  /// received via the VM service protocol.
  ///
  /// Throws a [StateError] if the task has already started.
  void keepVmAliveUntilTaskRunRequested() {
    if (_taskStarted)
      throw StateError('Task already started.');

    // Merely creating this port object will cause the VM to stay alive and keep
    // the VM service server running until the port is disposed of.
    _keepAlivePort = RawReceivePort();

    // Timeout if nothing bothers to connect and ask us to run the task.
    const Duration taskStartTimeout = Duration(seconds: 60);
    _startTaskTimeout = Timer(taskStartTimeout, () {
      if (!_taskStarted) {
        logger.severe('Task did not start in $taskStartTimeout.');
        _closeKeepAlivePort();
        exitCode = 1;
      }
    });
  }

  /// Disables the keep-alive port, allowing the VM to exit.
  ///
  /// Also cancels the start-up timeout timer, if one is pending.
  void _closeKeepAlivePort() {
    _startTaskTimeout?.cancel();
    _keepAlivePort?.close();
  }

  /// Invokes [task] and converts any error it raises into a failed
  /// [TaskResult].
  Future<TaskResult> _performTask() {
    final Completer<TaskResult> completer = Completer<TaskResult>();
    Chain.capture(() async {
      completer.complete(await task());
    }, onError: (dynamic taskError, Chain taskErrorStack) {
      final String message = 'Task failed: $taskError';
      stderr
        ..writeln(message)
        ..writeln('\nStack trace:')
        ..writeln(taskErrorStack.terse);
      // IMPORTANT: We're completing the future _successfully_ but with a value
      // that indicates a task failure. This is intentional. At this point we
      // are catching errors coming from arbitrary (and untrustworthy) task
      // code. Our goal is to convert the failure into a readable message.
      // Propagating it further is not useful.
      if (!completer.isCompleted)
        completer.complete(TaskResult.failure(message));
    });
    return completer.future;
  }
}

/// A result of running a single task.
class TaskResult {
  /// Constructs a successful result.
  ///
  /// Every entry of [benchmarkScoreKeys] must be a key of [data] whose value
  /// is a [num]; otherwise a [String] describing the offending key is thrown.
  TaskResult.success(this.data, {this.benchmarkScoreKeys = const <String>[]})
      : succeeded = true,
        message = 'success' {
    const JsonEncoder prettyJson = JsonEncoder.withIndent('  ');
    if (benchmarkScoreKeys != null) {
      for (final String key in benchmarkScoreKeys) {
        // A null [data] cannot contain any key; report it with the same
        // message instead of failing on a null dereference.
        if (data == null || !data.containsKey(key)) {
          throw 'Invalid Golem score key "$key". It does not exist in task '
              'result data ${prettyJson.convert(data)}';
        } else if (data[key] is! num) {
          throw 'Invalid Golem score for key "$key". It is expected to be a num '
              'but was ${data[key].runtimeType}: ${prettyJson.convert(data[key])}';
        }
      }
    }
  }

  /// Constructs a successful result using JSON data stored in a file.
  ///
  /// When [benchmarkScoreKeys] is omitted, the result has no score keys.
  factory TaskResult.successFromFile(File file,
      {List<String> benchmarkScoreKeys}) {
    return TaskResult.success(
      json.decode(file.readAsStringSync()) as Map<String, dynamic>,
      // Passing null through would override the constructor's empty-list
      // default and serialize as `"benchmarkScoreKeys": null`.
      benchmarkScoreKeys: benchmarkScoreKeys ?? const <String>[],
    );
  }

  /// Constructs an unsuccessful result.
  TaskResult.failure(this.message)
      : succeeded = false,
        data = null,
        benchmarkScoreKeys = const <String>[];

  /// Whether the task succeeded.
  final bool succeeded;

  /// Task-specific JSON data.
  final Map<String, dynamic> data;

  /// Keys in [data] that store scores that will be submitted to Golem.
  ///
  /// Each key is also part of a benchmark's name tracked by Golem.
  /// A benchmark name is computed by combining [Task.name] with a key
  /// separated by a dot. For example, if a task's name is
  /// `"complex_layout__start_up"` and score key is
  /// `"engineEnterTimestampMicros"`, the score will be submitted to Golem under
  /// `"complex_layout__start_up.engineEnterTimestampMicros"`.
  ///
  /// This convention reduces the amount of configuration that needs to be done
  /// to submit benchmark scores to Golem.
  final List<String> benchmarkScoreKeys;

  /// Whether the task failed.
  bool get failed => !succeeded;

  /// Explains the result in a human-readable format.
  final String message;

  /// Serializes this task result to JSON format.
  ///
  /// The JSON format is as follows:
  ///
  ///     {
  ///       "success": true|false,
  ///       "data": arbitrary JSON data valid only for successful results,
  ///       "benchmarkScoreKeys": [
  ///         contains keys into "data" that represent benchmark scores, which
  ///         can be uploaded, for example, to Golem; valid only for successful
  ///         results
  ///       ],
  ///       "reason": failure reason string valid only for unsuccessful results
  ///     }
  Map<String, dynamic> toJson() {
    // Not named `json` so that it does not shadow `dart:convert`'s `json`.
    final Map<String, dynamic> serialized = <String, dynamic>{
      'success': succeeded,
    };

    if (succeeded) {
      serialized['data'] = data;
      serialized['benchmarkScoreKeys'] = benchmarkScoreKeys;
    } else {
      serialized['reason'] = message;
    }

    return serialized;
  }
}

/// A successful, data-less result that opts in to leak checking.
///
/// When a task returns this type and the runner finds Dart processes that
/// were started during the task, the runner replaces the result with a
/// failure.
class TaskResultCheckProcesses extends TaskResult {
  TaskResultCheckProcesses() : super.success(null);
}