• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2016 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5import 'dart:async';
6import 'dart:convert';
7import 'dart:developer';
8import 'dart:io';
9import 'dart:isolate';
10
11import 'package:logging/logging.dart';
12import 'package:stack_trace/stack_trace.dart';
13
14import 'running_processes.dart';
15import 'utils.dart';
16
/// Represents a unit of work performed in the CI environment that can
/// succeed, fail and be retried independently of others.
///
/// Calling the function starts the work; the returned future completes with
/// the [TaskResult] describing its outcome.
typedef TaskFunction = Future<TaskResult> Function();

/// Whether [task] has already been called.
///
/// Set to true by the first call to [task]; any later call throws a
/// [StateError] instead of registering a second task.
bool _isTaskRegistered = false;
22
/// Registers [task] as the task to run and returns its eventual result.
///
/// Nothing is executed right away: the task only starts once a run request
/// arrives over the VM service protocol.
///
/// A [task] is free to do as much work as it likes, but a Dart VM accepts a
/// single registration only; calling this function a second time throws a
/// [StateError].
Future<TaskResult> task(TaskFunction task) {
  if (_isTaskRegistered) {
    throw StateError('A task is already registered');
  }
  _isTaskRegistered = true;

  // TODO(ianh): allow overriding logging.
  // Echo every log record, at every level, to stdout.
  Logger.root
    ..level = Level.ALL
    ..onRecord.listen((LogRecord record) {
      print('${record.level.name}: ${record.time}: ${record.message}');
    });

  // Build the runner, arm its keep-alive so the VM waits for the run request,
  // and hand back the future that reports the task's outcome.
  return (_TaskRunner(task)..keepVmAliveUntilTaskRunRequested()).whenDone;
}
46
/// Runs the registered [task] when asked to via the VM service protocol.
///
/// The constructor registers two service extensions:
///
///  * `ext.cocoonRunTask` runs the task (see [run]) and responds with the
///    JSON-encoded [TaskResult]. An optional `timeoutInMinutes` parameter
///    bounds how long the task may take.
///  * `ext.cocoonRunnerReady` responds with `"ready"`.
class _TaskRunner {
  _TaskRunner(this.task) {
    registerExtension('ext.cocoonRunTask',
        (String method, Map<String, String> parameters) async {
      // The timeout is optional; when it is absent the task is not timed out.
      final Duration taskTimeout = parameters.containsKey('timeoutInMinutes')
        ? Duration(minutes: int.parse(parameters['timeoutInMinutes']))
        : null;
      final TaskResult result = await run(taskTimeout);
      return ServiceExtensionResponse.result(json.encode(result.toJson()));
    });
    registerExtension('ext.cocoonRunnerReady',
        (String method, Map<String, String> parameters) async {
      return ServiceExtensionResponse.result('"ready"');
    });
  }

  /// The work performed by [run].
  final TaskFunction task;

  // TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797
  RawReceivePort _keepAlivePort;

  /// Fires when no run request arrived within the start timeout; armed by
  /// [keepVmAliveUntilTaskRunRequested].
  Timer _startTaskTimeout;

  /// Whether [run] has been entered.
  bool _taskStarted = false;

  final Completer<TaskResult> _completer = Completer<TaskResult>();

  static final Logger logger = Logger('TaskRunner');

  /// Signals that this task runner finished running the task.
  ///
  /// Completes with the result produced by [run], including the failure
  /// result reported when the task times out.
  Future<TaskResult> get whenDone => _completer.future;

  /// Runs [task] and returns its result.
  ///
  /// If [taskTimeout] is non-null and the task does not finish within it, a
  /// failure result is returned instead.
  ///
  /// The `dart` processes running before and after the task are compared;
  /// every process that only shows up afterwards is reported as leaked and
  /// killed. A leak turns the result into a failure only when the task
  /// returned a [TaskResultCheckProcesses].
  ///
  /// Regardless of the outcome, running processes are force-quit and the
  /// keep-alive port is closed before this method completes.
  Future<TaskResult> run(Duration taskTimeout) async {
    try {
      _taskStarted = true;
      print('Running task.');
      final String exe = Platform.isWindows ? '.exe' : '';
      section('Checking running Dart$exe processes');
      // Snapshot taken before the task so that pre-existing processes are not
      // blamed on it.
      final Set<RunningProcessInfo> beforeRunningDartInstances = await getRunningProcesses(
        processName: 'dart$exe',
      ).toSet();
      beforeRunningDartInstances.forEach(print);

      Future<TaskResult> futureResult = _performTask();
      if (taskTimeout != null)
        futureResult = futureResult.timeout(taskTimeout);
      TaskResult result = await futureResult;

      section('Checking running Dart$exe processes after task...');
      final List<RunningProcessInfo> afterRunningDartInstances = await getRunningProcesses(
        processName: 'dart$exe',
      ).toList();
      for (final RunningProcessInfo info in afterRunningDartInstances) {
        if (!beforeRunningDartInstances.contains(info)) {
          print('$info was leaked by this test.');
          // TODO(dnfield): remove this special casing after https://github.com/flutter/flutter/issues/29141 is resolved.
          if (result is TaskResultCheckProcesses) {
            result = TaskResult.failure('This test leaked dart processes');
          }
          final bool killed = await killProcess(info.pid);
          if (!killed) {
            print('Failed to kill process ${info.pid}.');
          } else {
            print('Killed process id ${info.pid}.');
          }
        }
      }

      _completeWith(result);
      return result;
    } on TimeoutException catch (_) {
      print('Task timed out in framework.dart after $taskTimeout.');
      final TaskResult result =
          TaskResult.failure('Task timed out after $taskTimeout');
      // Report the timeout through [whenDone] as well; otherwise whoever
      // awaits the future returned by `task()` would wait forever.
      _completeWith(result);
      return result;
    } finally {
      print('Cleaning up after task...');
      await forceQuitRunningProcesses();
      _closeKeepAlivePort();
    }
  }

  /// Completes [whenDone] with [result] unless it has already completed.
  ///
  /// The guard keeps a repeated run request from throwing because the
  /// completer was already completed by an earlier run.
  void _completeWith(TaskResult result) {
    if (!_completer.isCompleted)
      _completer.complete(result);
  }

  /// Causes the Dart VM to stay alive until a request to run the task is
  /// received via the VM service protocol.
  ///
  /// If no request starts the task within 60 seconds, the keep-alive port is
  /// closed and [exitCode] is set to 1.
  ///
  /// Throws a [StateError] if the task has already started.
  void keepVmAliveUntilTaskRunRequested() {
    if (_taskStarted)
      throw StateError('Task already started.');

    // Merely creating this port object will cause the VM to stay alive and keep
    // the VM service server running until the port is disposed of.
    _keepAlivePort = RawReceivePort();

    // Timeout if nothing bothers to connect and ask us to run the task.
    const Duration taskStartTimeout = Duration(seconds: 60);
    _startTaskTimeout = Timer(taskStartTimeout, () {
      if (!_taskStarted) {
        logger.severe('Task did not start in $taskStartTimeout.');
        _closeKeepAlivePort();
        exitCode = 1;
      }
    });
  }

  /// Disables the keep-alive port, allowing the VM to exit.
  ///
  /// Also cancels the start timeout. Safe to call when neither has been
  /// created yet.
  void _closeKeepAlivePort() {
    _startTaskTimeout?.cancel();
    _keepAlivePort?.close();
  }

  /// Invokes [task] and converts any error it raises into a failure result.
  ///
  /// The returned future never completes with an error: errors captured by
  /// [Chain.capture] are written to stderr and reported as
  /// [TaskResult.failure].
  Future<TaskResult> _performTask() {
    final Completer<TaskResult> completer = Completer<TaskResult>();
    Chain.capture(() async {
      completer.complete(await task());
    }, onError: (dynamic taskError, Chain taskErrorStack) {
      final String message = 'Task failed: $taskError';
      stderr
        ..writeln(message)
        ..writeln('\nStack trace:')
        ..writeln(taskErrorStack.terse);
      // IMPORTANT: We're completing the future _successfully_ but with a value
      // that indicates a task failure. This is intentional. At this point we
      // are catching errors coming from arbitrary (and untrustworthy) task
      // code. Our goal is to convert the failure into a readable message.
      // Propagating it further is not useful.
      // The guard covers errors reported after the task already produced a
      // result (or after an earlier error was reported).
      if (!completer.isCompleted)
        completer.complete(TaskResult.failure(message));
    });
    return completer.future;
  }
}
173
/// A result of running a single task.
class TaskResult {
  /// Constructs a successful result.
  ///
  /// Every entry of [benchmarkScoreKeys] must be a key of [data] whose value
  /// is a [num]; otherwise a [String] describing the offending key is thrown.
  /// A null [benchmarkScoreKeys] is treated as an empty list, so the field is
  /// never null on the constructed object.
  TaskResult.success(this.data, {List<String> benchmarkScoreKeys})
      : succeeded = true,
        message = 'success',
        benchmarkScoreKeys = benchmarkScoreKeys ?? const <String>[] {
    const JsonEncoder prettyJson = JsonEncoder.withIndent('  ');
    // `this.` is required: the bare name refers to the (possibly null)
    // constructor parameter rather than the normalized field.
    for (final String key in this.benchmarkScoreKeys) {
      // A result without data cannot contain any score key.
      if (data == null || !data.containsKey(key)) {
        throw 'Invalid Golem score key "$key". It does not exist in task '
            'result data ${prettyJson.convert(data)}';
      } else if (data[key] is! num) {
        throw 'Invalid Golem score for key "$key". It is expected to be a num '
            'but was ${data[key].runtimeType}: ${prettyJson.convert(data[key])}';
      }
    }
  }

  /// Constructs a successful result using JSON data stored in a file.
  ///
  /// The content of [file] is read synchronously and decoded as JSON. Omitting
  /// [benchmarkScoreKeys] yields a result with no score keys.
  factory TaskResult.successFromFile(File file,
      {List<String> benchmarkScoreKeys}) {
    return TaskResult.success(
        json.decode(file.readAsStringSync()) as Map<String, dynamic>,
        benchmarkScoreKeys: benchmarkScoreKeys);
  }

  /// Constructs an unsuccessful result.
  ///
  /// [message] explains the failure; [data] is null and [benchmarkScoreKeys]
  /// is empty.
  TaskResult.failure(this.message)
      : succeeded = false,
        data = null,
        benchmarkScoreKeys = const <String>[];

  /// Whether the task succeeded.
  final bool succeeded;

  /// Task-specific JSON data
  final Map<String, dynamic> data;

  /// Keys in [data] that store scores that will be submitted to Golem.
  ///
  /// Each key is also part of a benchmark's name tracked by Golem.
  /// A benchmark name is computed by combining [Task.name] with a key
  /// separated by a dot. For example, if a task's name is
  /// `"complex_layout__start_up"` and score key is
  /// `"engineEnterTimestampMicros"`, the score will be submitted to Golem under
  /// `"complex_layout__start_up.engineEnterTimestampMicros"`.
  ///
  /// This convention reduces the amount of configuration that needs to be done
  /// to submit benchmark scores to Golem.
  final List<String> benchmarkScoreKeys;

  /// Whether the task failed.
  bool get failed => !succeeded;

  /// Explains the result in a human-readable format.
  final String message;

  /// Serializes this task result to JSON format.
  ///
  /// The JSON format is as follows:
  ///
  ///     {
  ///       "success": true|false,
  ///       "data": arbitrary JSON data valid only for successful results,
  ///       "benchmarkScoreKeys": [
  ///         contains keys into "data" that represent benchmark scores, which
  ///         can be uploaded, for example, to Golem, valid only for successful
  ///         results
  ///       ],
  ///       "reason": failure reason string valid only for unsuccessful results
  ///     }
  Map<String, dynamic> toJson() {
    final Map<String, dynamic> json = <String, dynamic>{
      'success': succeeded,
    };

    if (succeeded) {
      json['data'] = data;
      json['benchmarkScoreKeys'] = benchmarkScoreKeys;
    } else {
      json['reason'] = message;
    }

    return json;
  }
}
261
/// A successful [TaskResult] without data that acts as a marker type.
///
/// `_TaskRunner.run` replaces a result of this type with a failure when it
/// finds Dart processes after the task that were not running before it.
class TaskResultCheckProcesses extends TaskResult {
  /// Creates a successful result whose data is null.
  TaskResultCheckProcesses() : super.success(null);
}
265