# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import __builtin__
import Queue
import base64
import datetime
import logging
import os
import shutil
import signal
import stat
import sys
import tempfile
import time
import unittest

import mox

import common
from autotest_lib.client.common_lib import global_config, site_utils
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import gs_offloader
from autotest_lib.site_utils import job_directories
from autotest_lib.tko import models

# Test value to use for `days_old`, if nothing else is required.
_TEST_EXPIRATION_AGE = 7

# When constructing sample time values for testing expiration,
# allow this many seconds between the expiration time and the
# current time.
_MARGIN_SECS = 10.0


def _get_options(argv):
    """Helper function to exercise command line parsing.

    @param argv Value of sys.argv to be parsed.

    """
    sys.argv = ['bogus.py'] + argv
    return gs_offloader.parse_options()


def is_fifo(path):
    """Determines whether a path is a fifo."""
    return stat.S_ISFIFO(os.lstat(path).st_mode)


class OffloaderOptionsTests(mox.MoxTestBase):
    """Tests for the `Offloader` constructor.

    Tests that offloader instance fields are set as expected
    for given command line options.

    """

    _REGULAR_ONLY = set([job_directories.RegularJobDirectory])
    _SPECIAL_ONLY = set([job_directories.SpecialJobDirectory])
    _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY


    def setUp(self):
        super(OffloaderOptionsTests, self).setUp()
        self.mox.StubOutWithMock(utils, 'get_offload_gsuri')
        gs_offloader.GS_OFFLOADING_ENABLED = True
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False


    def _mock_get_offload_func(self, is_moblab, multiprocessing=False,
                               pubsub_topic=None, delete_age=0):
        """Mock the process of getting the offload_dir function."""
        if is_moblab:
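            # The MAC address and UUID below are fake placeholders; they
            # only shape the GS URI that the stubbed get_offload_gsuri()
            # returns below.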
            expected_gsuri = '%sresults/%s/%s/' % (
                    global_config.global_config.get_config_value(
                            'CROS', 'image_storage_server'),
                    'Fa:ke:ma:c0:12:34', 'rand0m-uu1d')
        else:
            expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
        utils.get_offload_gsuri().AndReturn(expected_gsuri)
        offload_func = gs_offloader.get_offload_dir_func(expected_gsuri,
            multiprocessing, delete_age, pubsub_topic)
        self.mox.StubOutWithMock(gs_offloader, 'get_offload_dir_func')
        gs_offloader.get_offload_dir_func(expected_gsuri, multiprocessing,
            delete_age, pubsub_topic).AndReturn(offload_func)
        self.mox.ReplayAll()
        return offload_func


    def test_process_no_options(self):
        """Test default offloader options."""
        offload_func = self._mock_get_offload_func(False)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)


    def test_process_all_option(self):
        """Test offloader handling for the --all option."""
        offload_func = self._mock_get_offload_func(False)
        offloader = gs_offloader.Offloader(_get_options(['--all']))
        self.assertEqual(set(offloader._jobdir_classes), self._BOTH)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)


    def test_process_hosts_option(self):
        """Test offloader handling for the --hosts option."""
        offload_func = self._mock_get_offload_func(False)
        offloader = gs_offloader.Offloader(
                _get_options(['--hosts']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._SPECIAL_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)


    def test_parallelism_option(self):
        """Test offloader handling for the --parallelism option."""
        offload_func = self._mock_get_offload_func(False)
        offloader = gs_offloader.Offloader(
                _get_options(['--parallelism', '2']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 2)
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)


    def test_delete_only_option(self):
        """Test offloader handling for the --delete_only option."""
        offloader = gs_offloader.Offloader(
                _get_options(['--delete_only']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._offload_func,
                         gs_offloader.delete_files)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)
        self.assertIsNone(offloader._pubsub_topic)


    def test_days_old_option(self):
        """Test offloader handling for the --days_old option."""
        offload_func = self._mock_get_offload_func(False, delete_age=7)
        offloader = gs_offloader.Offloader(
                _get_options(['--days_old', '7']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.assertEqual(offloader._upload_age_limit, 7)
        self.assertEqual(offloader._delete_age_limit, 7)


    def test_moblab_gsuri_generation(self):
        """Test offloader construction for Moblab."""
        offload_func = self._mock_get_offload_func(True)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)


    def test_globalconfig_offloading_flag(self):
        """Test enabling of --delete_only via global_config."""
        gs_offloader.GS_OFFLOADING_ENABLED = False
        offloader = gs_offloader.Offloader(
                _get_options([]))
        self.assertEqual(offloader._offload_func,
                         gs_offloader.delete_files)

    def test_offloader_multiprocessing_flag_set(self):
        """Test that the -m flag enables multiprocessing."""
        offload_func = self._mock_get_offload_func(True, True)
        offloader = gs_offloader.Offloader(_get_options(['-m']))
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.mox.VerifyAll()

    def test_offloader_multiprocessing_flag_not_set_default_false(self):
        """Test multiprocessing is off when not set and default is False."""
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
        offload_func = self._mock_get_offload_func(True, False)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.mox.VerifyAll()

    def test_offloader_multiprocessing_flag_not_set_default_true(self):
        """Test multiprocessing is on when not set and default is True."""
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
        offload_func = self._mock_get_offload_func(True, True)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.mox.VerifyAll()

    def test_offloader_pubsub_topic_not_set(self):
        """Test that the pubsub topic is unset by default."""
        offload_func = self._mock_get_offload_func(True, False)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.mox.VerifyAll()

    def test_offloader_pubsub_topic_set(self):
        """Test that the -t option sets the pubsub topic."""
        offload_func = self._mock_get_offload_func(True, False, 'test-topic')
        offloader = gs_offloader.Offloader(_get_options(['-t', 'test-topic']))
        self.assertEqual(offloader._offload_func,
                         offload_func)
        self.mox.VerifyAll()


def _make_timestamp(age_limit, is_expired):
    """Create a timestamp for use by `job_directories.is_job_expired()`.

    The timestamp will meet the syntactic requirements for
    timestamps used as input to `is_job_expired()`.  If
    `is_expired` is true, the timestamp will be older than
    `age_limit` days before the current time; otherwise, the
    date will be younger.

    @param age_limit    The number of days before expiration of the
                        target timestamp.
    @param is_expired   Whether the timestamp should be expired
                        relative to `age_limit`.

    """
    seconds = -_MARGIN_SECS
    if is_expired:
        seconds = -seconds
    delta = datetime.timedelta(days=age_limit, seconds=seconds)
    reference_time = datetime.datetime.now() - delta
    return reference_time.strftime(time_utils.TIME_FMT)


class JobExpirationTests(unittest.TestCase):
    """Tests to exercise `job_directories.is_job_expired()`."""

    def test_expired(self):
        """Test detection of an expired job."""
        timestamp = _make_timestamp(_TEST_EXPIRATION_AGE, True)
        self.assertTrue(
            job_directories.is_job_expired(
                _TEST_EXPIRATION_AGE, timestamp))


    def test_alive(self):
        """Test detection of a job that's not expired."""
        # N.B.  This test may fail if its run time exceeds about
        # _MARGIN_SECS seconds.
        timestamp = _make_timestamp(_TEST_EXPIRATION_AGE, False)
        self.assertFalse(
            job_directories.is_job_expired(
                _TEST_EXPIRATION_AGE, timestamp))


class _MockJobDirectory(job_directories._JobDirectory):
    """Subclass of `_JobDirectory` used as a helper for tests."""

    GLOB_PATTERN = '[0-9]*-*'


    def __init__(self, resultsdir):
        """Create new job in initial state."""
        super(_MockJobDirectory, self).__init__(resultsdir)
        self._timestamp = None
        self.queue_args = [resultsdir,
                           os.path.dirname(resultsdir),
                           self._timestamp]


    def get_timestamp_if_finished(self):
        return self._timestamp


    def set_finished(self, days_old):
        """Make this job appear to be finished.

        After calling this function, calls to `enqueue_offload()`
        will see this job as finished, but not yet expired or
        eligible for offload.  Note that when `days_old` is 0,
        `enqueue_offload()` will treat a finished job as eligible
        for offload.

        @param days_old The value of the `days_old` parameter that
                        will be passed to `enqueue_offload()` for
                        testing.

        """
        self._timestamp = _make_timestamp(days_old, False)
        self.queue_args[2] = self._timestamp


    def set_expired(self, days_old):
        """Make this job eligible to be offloaded.

        After calling this function, calls to `enqueue_offload()`
        will treat this job as expired and eligible for offload.

        @param days_old The value of the `days_old` parameter that
                        will be passed to `enqueue_offload()` for
                        testing.

        """
        self._timestamp = _make_timestamp(days_old, True)
        self.queue_args[2] = self._timestamp


    def set_incomplete(self):
        """Make this job appear to have failed offload just once."""
        self._offload_count += 1
        self._first_offload_start = time.time()
        if not os.path.isdir(self._dirname):
            os.mkdir(self._dirname)


    def set_reportable(self):
        """Make this job be reportable."""
        self.set_incomplete()
        self._offload_count += 1


    def set_complete(self):
        """Make this job be completed."""
        self._offload_count += 1
        if os.path.isdir(self._dirname):
            os.rmdir(self._dirname)


    def process_gs_instructions(self):
        """Always still offload the job directory."""
        return True


class CommandListTests(unittest.TestCase):
    """Tests for `get_cmd_list()`."""

    def _command_list_assertions(self, job, use_rsync=True, multi=False):
        """Call `get_cmd_list()` and check the return value.

        Check the following assertions:
          * The command name (argv[0]) is 'gsutil'.
          * The '-m' option (argv[1]) is present when the `multi`
            argument is True.
          * The arguments contain the 'rsync' or 'cp' subcommand,
            depending on `use_rsync`.
          * The next-to-last argument (the source directory) is the
            job's `queue_args[0]`.
          * The last argument (the destination URL) ends with the
            job's `queue_args[1]` when using 'cp', or `queue_args[0]`
            when using 'rsync'.

        @param job A job with properly calculated arguments to
                   `get_cmd_list()`.
        @param use_rsync True when using 'rsync'. False when using 'cp'.
        @param multi True when using '-m' option for gsutil.

        """
        test_bucket_uri = 'gs://a-test-bucket'

        gs_offloader.USE_RSYNC_ENABLED = use_rsync

        command = gs_offloader.get_cmd_list(
                multi, job.queue_args[0],
                os.path.join(test_bucket_uri, job.queue_args[1]))

        self.assertEqual(command[0], 'gsutil')
        if multi:
            self.assertEqual(command[1], '-m')
        self.assertEqual(command[-2], job.queue_args[0])

        if use_rsync:
            self.assertTrue('rsync' in command)
            self.assertEqual(command[-1],
                             os.path.join(test_bucket_uri, job.queue_args[0]))
        else:
            self.assertTrue('cp' in command)
            self.assertEqual(command[-1],
                             os.path.join(test_bucket_uri, job.queue_args[1]))


    def test_get_cmd_list_regular(self):
        """Test `get_cmd_list()` for a regular job."""
        job = _MockJobDirectory('118-debug')
        self._command_list_assertions(job)


    def test_get_cmd_list_special(self):
        """Test `get_cmd_list()` for a special job."""
        job = _MockJobDirectory('hosts/host1/118-reset')
        self._command_list_assertions(job)


    def test_get_cmd_list_regular_no_rsync(self):
        """Test `get_cmd_list()` for a regular job without rsync."""
        job = _MockJobDirectory('118-debug')
        self._command_list_assertions(job, use_rsync=False)


    def test_get_cmd_list_special_no_rsync(self):
        """Test `get_cmd_list()` for a special job without rsync."""
        job = _MockJobDirectory('hosts/host1/118-reset')
        self._command_list_assertions(job, use_rsync=False)


    def test_get_cmd_list_regular_multi(self):
        """Test `get_cmd_list()` for a regular job with multi=True."""
        job = _MockJobDirectory('118-debug')
        self._command_list_assertions(job, multi=True)


    def test_get_cmd_list_special_multi(self):
        """Test `get_cmd_list()` for a special job with multi=True."""
        job = _MockJobDirectory('hosts/host1/118-reset')
        self._command_list_assertions(job, multi=True)


class PubSubTest(mox.MoxTestBase):
    """Test the test result notification data structure."""

    def test_create_test_result_notification(self):
        """Tests the test result notification message."""
        self.mox.StubOutWithMock(site_utils, 'get_moblab_id')
        self.mox.StubOutWithMock(site_utils,
                                 'get_default_interface_mac_address')
        site_utils.get_default_interface_mac_address().AndReturn(
            '1c:dc:d1:11:01:e1')
        site_utils.get_moblab_id().AndReturn(
            'c8386d92-9ad1-11e6-80f5-111111111111')
        self.mox.ReplayAll()
        msg = gs_offloader._create_test_result_notification(
                'gs://test_bucket', '123-moblab')
        self.assertEquals(base64.b64encode(
            gs_offloader.NEW_TEST_RESULT_MESSAGE), msg['data'])
        self.assertEquals(
            gs_offloader.NOTIFICATION_VERSION,
            msg['attributes'][gs_offloader.NOTIFICATION_ATTR_VERSION])
        self.assertEquals(
            '1c:dc:d1:11:01:e1',
            msg['attributes'][gs_offloader.NOTIFICATION_ATTR_MOBLAB_MAC])
        self.assertEquals(
            'c8386d92-9ad1-11e6-80f5-111111111111',
            msg['attributes'][gs_offloader.NOTIFICATION_ATTR_MOBLAB_ID])
        self.assertEquals(
            'gs://test_bucket/123-moblab',
            msg['attributes'][gs_offloader.NOTIFICATION_ATTR_GCS_URI])
        self.mox.VerifyAll()


class _MockJob(object):
    """Class to mock the return value of `AFE.get_jobs()`."""
    def __init__(self, created):
        self.created_on = created


class _MockHostQueueEntry(object):
    """Class to mock the return value of `AFE.get_host_queue_entries()`."""
    def __init__(self, finished):
        self.finished_on = finished


class _MockSpecialTask(object):
    """Class to mock the return value of `AFE.get_special_tasks()`."""
    def __init__(self, finished):
        self.time_finished = finished


class JobDirectorySubclassTests(mox.MoxTestBase):
    """Tests specific to `RegularJobDirectory` and `SpecialJobDirectory`.

    This provides coverage for the implementation in both
    RegularJobDirectory and SpecialJobDirectory.

    """

    def setUp(self):
        super(JobDirectorySubclassTests, self).setUp()
        self.mox.StubOutWithMock(job_directories._AFE, 'get_jobs')
        self.mox.StubOutWithMock(job_directories._AFE,
                                 'get_host_queue_entries')
        self.mox.StubOutWithMock(job_directories._AFE,
                                 'get_special_tasks')


    def test_regular_job_fields(self):
        """Test the constructor for `RegularJobDirectory`.

        Construct a regular job, and assert that the `_dirname`
        and `_id` attributes are set as expected.

        """
        resultsdir = '118-fubar'
        job = job_directories.RegularJobDirectory(resultsdir)
        self.assertEqual(job._dirname, resultsdir)
        self.assertEqual(job._id, 118)


    def test_special_job_fields(self):
        """Test the constructor for `SpecialJobDirectory`.

        Construct a special job, and assert that the `_dirname`
        and `_id` attributes are set as expected.

        """
        destdir = 'hosts/host1'
        resultsdir = destdir + '/118-reset'
        job = job_directories.SpecialJobDirectory(resultsdir)
        self.assertEqual(job._dirname, resultsdir)
        self.assertEqual(job._id, 118)


    def _check_finished_job(self, jobtime, hqetimes, expected):
        """Mock and test behavior of a finished job.

        Initialize the mocks for a call to
        `get_timestamp_if_finished()`, then simulate one call.
        Assert that the returned timestamp matches the passed
        in expected value.

        @param jobtime Time used to construct a _MockJob object.
        @param hqetimes List of times used to construct
                        _MockHostQueueEntry objects.
        @param expected Expected time to be returned by
                        get_timestamp_if_finished

        """
        job = job_directories.RegularJobDirectory('118-fubar')
        job_directories._AFE.get_jobs(
                id=job._id, finished=True).AndReturn(
                        [_MockJob(jobtime)])
        job_directories._AFE.get_host_queue_entries(
                finished_on__isnull=False,
                job_id=job._id).AndReturn(
                        [_MockHostQueueEntry(t) for t in hqetimes])
        self.mox.ReplayAll()
        self.assertEqual(expected, job.get_timestamp_if_finished())
        self.mox.VerifyAll()


    def test_finished_regular_job(self):
        """Test getting the timestamp for a finished regular job.

        Tests the return value for
        `RegularJobDirectory.get_timestamp_if_finished()` when
        the AFE indicates the job is finished.

        """
        created_timestamp = _make_timestamp(1, True)
        hqe_timestamp = _make_timestamp(0, True)
        self._check_finished_job(created_timestamp,
                                 [hqe_timestamp],
                                 hqe_timestamp)


    def test_finished_regular_job_multiple_hqes(self):
        """Test getting the timestamp for a regular job with multiple hqes.

        Tests the return value for
        `RegularJobDirectory.get_timestamp_if_finished()` when
        the AFE indicates the job is finished and the job has multiple host
        queue entries.

        Tests that the returned timestamp is the latest timestamp in
        the list of HQEs, regardless of the returned order.

        """
        created_timestamp = _make_timestamp(2, True)
        older_hqe_timestamp = _make_timestamp(1, True)
        newer_hqe_timestamp = _make_timestamp(0, True)
        hqe_list = [older_hqe_timestamp,
                    newer_hqe_timestamp]
        self._check_finished_job(created_timestamp,
                                 hqe_list,
                                 newer_hqe_timestamp)
        self.mox.ResetAll()
        hqe_list.reverse()
        self._check_finished_job(created_timestamp,
                                 hqe_list,
                                 newer_hqe_timestamp)


    def test_finished_regular_job_null_finished_times(self):
        """Test getting the timestamp for an aborted regular job.

        Tests the return value for
        `RegularJobDirectory.get_timestamp_if_finished()` when
        the AFE indicates the job is finished and the job has aborted host
        queue entries.

        """
        timestamp = _make_timestamp(0, True)
        self._check_finished_job(timestamp, [], timestamp)


    def test_unfinished_regular_job(self):
        """Test getting the timestamp for an unfinished regular job.

        Tests the return value for
        `RegularJobDirectory.get_timestamp_if_finished()` when
        the AFE indicates the job is not finished.

        """
        job = job_directories.RegularJobDirectory('118-fubar')
        job_directories._AFE.get_jobs(
                id=job._id, finished=True).AndReturn([])
        self.mox.ReplayAll()
        self.assertIsNone(job.get_timestamp_if_finished())
        self.mox.VerifyAll()


    def test_finished_special_job(self):
        """Test getting the timestamp for a finished special job.

        Tests the return value for
        `SpecialJobDirectory.get_timestamp_if_finished()` when
        the AFE indicates the job is finished.

        """
        job = job_directories.SpecialJobDirectory(
                'hosts/host1/118-reset')
        timestamp = _make_timestamp(0, True)
        job_directories._AFE.get_special_tasks(
                id=job._id, is_complete=True).AndReturn(
                    [_MockSpecialTask(timestamp)])
        self.mox.ReplayAll()
        self.assertEqual(timestamp,
                         job.get_timestamp_if_finished())
        self.mox.VerifyAll()


    def test_unfinished_special_job(self):
        """Test getting the timestamp for an unfinished special job.

        Tests the return value for
        `SpecialJobDirectory.get_timestamp_if_finished()` when
        the AFE indicates the job is not finished.

        """
        job = job_directories.SpecialJobDirectory(
                'hosts/host1/118-reset')
        job_directories._AFE.get_special_tasks(
                id=job._id, is_complete=True).AndReturn([])
        self.mox.ReplayAll()
        self.assertIsNone(job.get_timestamp_if_finished())
        self.mox.VerifyAll()


class _TempResultsDirTestBase(mox.MoxTestBase):
    """Base class for tests using a temporary results directory."""

    REGULAR_JOBLIST = [
        '111-fubar', '112-fubar', '113-fubar', '114-snafu']
    HOST_LIST = ['host1', 'host2', 'host3']
    SPECIAL_JOBLIST = [
        'hosts/host1/333-reset', 'hosts/host1/334-reset',
        'hosts/host2/444-reset', 'hosts/host3/555-reset']


    def setUp(self):
        super(_TempResultsDirTestBase, self).setUp()
        self._resultsroot = tempfile.mkdtemp()
        self._cwd = os.getcwd()
        os.chdir(self._resultsroot)


    def tearDown(self):
        os.chdir(self._cwd)
        shutil.rmtree(self._resultsroot)
        super(_TempResultsDirTestBase, self).tearDown()


    def make_job(self, jobdir):
        """Create a job with results in `self._resultsroot`.

        @param jobdir Name of the subdirectory to be created in
                      `self._resultsroot`.

        """
        os.mkdir(jobdir)
        return _MockJobDirectory(jobdir)


    def make_job_hierarchy(self):
        """Create a sample hierarchy of job directories.

        `self.REGULAR_JOBLIST` is a list of directories for regular
        jobs to be created; `self.SPECIAL_JOBLIST` is a list of
        directories for special jobs to be created.

        """
        for d in self.REGULAR_JOBLIST:
            os.mkdir(d)
        hostsdir = 'hosts'
        os.mkdir(hostsdir)
        for host in self.HOST_LIST:
            os.mkdir(os.path.join(hostsdir, host))
        for d in self.SPECIAL_JOBLIST:
            os.mkdir(d)


class FailedOffloadsLogTest(_TempResultsDirTestBase):
    """Test the formatting of the failed offloads log file."""
    # Below is partial sample of a failed offload log file.  This text is
    # deliberately hard-coded and then parsed to create the test data; the idea
    # is to make sure the actual text format will be reviewed by a human being.
    #
    # first offload      count  directory
    # --+----1----+----  ----+ ----+----1----+----2----+----3
    _SAMPLE_DIRECTORIES_REPORT = '''\
    =================== ======  ==============================
    2014-03-14 15:09:26      1  118-fubar
    2014-03-14 15:19:23      2  117-fubar
    2014-03-14 15:29:20      6  116-fubar
    2014-03-14 15:39:17     24  115-fubar
    2014-03-14 15:49:14    120  114-fubar
    2014-03-14 15:59:11    720  113-fubar
    2014-03-14 16:09:08   5040  112-fubar
    2014-03-14 16:19:05  40320  111-fubar
    '''

    def setUp(self):
        super(FailedOffloadsLogTest, self).setUp()
        self._offloader = gs_offloader.Offloader(_get_options([]))
        self._joblist = []
        for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]:
            date_, time_, count, dir_ = line.split()
            job = _MockJobDirectory(dir_)
            job._offload_count = int(count)
            timestruct = time.strptime("%s %s" % (date_, time_),
                                       gs_offloader.FAILED_OFFLOADS_TIME_FORMAT)
            job._first_offload_start = time.mktime(timestruct)
            # enter the jobs in reverse order, to make sure we
            # test that the output will be sorted.
            self._joblist.insert(0, job)


    def assert_report_well_formatted(self, report_file):
        with open(report_file, 'r') as f:
            report_lines = f.read().split()
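        # The report is tokenized with split(), so the comparison below
        # ignores differences in whitespace and column alignment.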

        for end_of_header_index in range(len(report_lines)):
            if report_lines[end_of_header_index].startswith('=='):
                break
        self.assertLess(end_of_header_index, len(report_lines),
                        'Failed to find end-of-header marker in the report')

        relevant_lines = report_lines[end_of_header_index:]
        expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split()
        self.assertListEqual(relevant_lines, expected_lines)


    def test_failed_offload_log_format(self):
        """Trigger the failed offloads report and check its contents."""
        log_file = os.path.join(self._resultsroot, 'failed_log')
        report = self._offloader._log_failed_jobs_locally(self._joblist,
                                                          log_file=log_file)
        self.assert_report_well_formatted(log_file)


    def test_failed_offload_file_overwrite(self):
        """Verify that we can safely overwrite the log file."""
        log_file = os.path.join(self._resultsroot, 'failed_log')
        with open(log_file, 'w') as f:
            f.write('boohoohoo')
        report = self._offloader._log_failed_jobs_locally(self._joblist,
                                                          log_file=log_file)
        self.assert_report_well_formatted(log_file)


class OffloadDirectoryTests(_TempResultsDirTestBase):
    """Tests for `offload_dir()`."""

    def setUp(self):
        super(OffloadDirectoryTests, self).setUp()
        # offload_dir() logs messages; silence them.
        self._saved_loglevel = logging.getLogger().getEffectiveLevel()
        logging.getLogger().setLevel(logging.CRITICAL+1)
        self._job = self.make_job(self.REGULAR_JOBLIST[0])
        self.mox.StubOutWithMock(gs_offloader, 'get_cmd_list')
        self.mox.StubOutWithMock(signal, 'alarm')
        self.mox.StubOutWithMock(models.test, 'parse_job_keyval')


    def tearDown(self):
        logging.getLogger().setLevel(self._saved_loglevel)
        super(OffloadDirectoryTests, self).tearDown()

    def _mock_upload_testresult_files(self):
        self.mox.StubOutWithMock(gs_offloader, 'upload_testresult_files')
        gs_offloader.upload_testresult_files(
                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(None)

    def _mock_create_marker_file(self):
        self.mox.StubOutWithMock(__builtin__, 'open')
        mock_marker_file = self.mox.CreateMock(file)
        open(mox.IgnoreArg(), 'a').AndReturn(mock_marker_file)
        mock_marker_file.close()


    def _mock_offload_dir_calls(self, command, queue_args,
                                marker_initially_exists=False,
                                marker_eventually_exists=True):
        """Mock out the calls needed by `offload_dir()`.

        This covers only the calls made when there is no timeout.

        @param command Command list to be returned by the mocked
                       call to `get_cmd_list()`.
        @param queue_args Queue arguments for the job; `queue_args[0]`
                       is the source directory and `queue_args[1]` the
                       destination path component.
        @param marker_initially_exists Return value for the first
                       mocked `os.path.isfile()` check.
        @param marker_eventually_exists Return value for the final
                       mocked `os.path.isfile()` check.

        """
        self.mox.StubOutWithMock(os.path, 'isfile')
        os.path.isfile(mox.IgnoreArg()).AndReturn(marker_initially_exists)
        signal.alarm(gs_offloader.OFFLOAD_TIMEOUT_SECS)
        command.append(queue_args[0])
        gs_offloader.get_cmd_list(
                False, queue_args[0],
                '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
                          queue_args[1])).AndReturn(command)
        self._mock_upload_testresult_files()
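        # offload_dir() is expected to cancel the timeout alarm twice
        # before the final os.path.isfile() check.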
        signal.alarm(0)
        signal.alarm(0)
        os.path.isfile(mox.IgnoreArg()).AndReturn(marker_eventually_exists)


    def _run_offload_dir(self, should_succeed, delete_age):
        """Make one call to `offload_dir()`.

        The caller ensures all mocks are set up already.

        @param should_succeed True iff the call to `offload_dir()`
                              is expected to succeed and remove the
                              offloaded job directory.

        """
        self.mox.ReplayAll()
        gs_offloader.get_offload_dir_func(
                utils.DEFAULT_OFFLOAD_GSURI, False, delete_age)(
                        self._job.queue_args[0],
                        self._job.queue_args[1],
                        self._job.queue_args[2])
        self.mox.VerifyAll()
        self.assertEqual(not should_succeed,
                         os.path.isdir(self._job.queue_args[0]))


    def test_offload_success(self):
        """Test that `offload_dir()` can succeed correctly."""
        self._mock_offload_dir_calls(['test', '-d'],
                                     self._job.queue_args)
        self._mock_create_marker_file()
        self._run_offload_dir(True, 0)


    def test_offload_failure(self):
        """Test that `offload_dir()` can fail correctly."""
        self._mock_offload_dir_calls(['test', '!', '-d'],
                                     self._job.queue_args,
                                     marker_eventually_exists=False)
        self._run_offload_dir(False, 0)


    def test_offload_timeout_early(self):
        """Test that `offload_dir()` times out correctly.

        This test triggers timeout at the earliest possible moment,
        at the first call to set the timeout alarm.

        """
        self._mock_upload_testresult_files()
        signal.alarm(gs_offloader.OFFLOAD_TIMEOUT_SECS).AndRaise(
                        gs_offloader.TimeoutException('fubar'))
        signal.alarm(0)
        self._run_offload_dir(False, 0)


    def test_offload_timeout_late(self):
        """Test that `offload_dir()` times out correctly.

        This test triggers timeout at the latest possible moment, at
        the call to clear the timeout alarm.

        """
        signal.alarm(gs_offloader.OFFLOAD_TIMEOUT_SECS)
        gs_offloader.get_cmd_list(
                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                        ['test', '-d', self._job.queue_args[0]])
        self._mock_upload_testresult_files()
        signal.alarm(0).AndRaise(
                gs_offloader.TimeoutException('fubar'))
        signal.alarm(0)
        self._run_offload_dir(False, 0)


    def test_sanitize_dir(self):
        """Test that folder/file names with invalid characters get corrected.
        """
        results_folder = tempfile.mkdtemp()
        invalid_chars = '_'.join(gs_offloader.INVALID_GS_CHARS)
        invalid_files = []
        invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars
        invalid_folder = os.path.join(
                results_folder,
                invalid_folder_name)
        invalid_files.append(os.path.join(
                invalid_folder,
                'invalid_name_file_%s' % invalid_chars))
        for r in gs_offloader.INVALID_GS_CHAR_RANGE:
            for c in range(r[0], r[1]+1):
                # NULL cannot appear in a file name.
                if c != 0:
                    invalid_files.append(os.path.join(
                            invalid_folder,
                            'invalid_name_file_%s' % chr(c)))
        good_folder = os.path.join(results_folder, 'valid_name_folder')
        good_file = os.path.join(good_folder, 'valid_name_file')
        for folder in [invalid_folder, good_folder]:
            os.makedirs(folder)
        for f in invalid_files + [good_file]:
            with open(f, 'w'):
                pass
        # check that broken symlinks don't break sanitization
        symlink = os.path.join(invalid_folder, 'broken-link')
        os.symlink(os.path.join(results_folder, 'no-such-file'),
                   symlink)
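        # FIFOs anywhere in the tree should survive sanitization as
        # regular (non-FIFO) files; see the is_fifo() checks below.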
        fifo1 = os.path.join(results_folder, 'test_fifo1')
        fifo2 = os.path.join(good_folder, 'test_fifo2')
        fifo3 = os.path.join(invalid_folder, 'test_fifo3')
        invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars
        fifo4 = os.path.join(invalid_folder, invalid_fifo4_name)
        os.mkfifo(fifo1)
        os.mkfifo(fifo2)
        os.mkfifo(fifo3)
        os.mkfifo(fifo4)
        gs_offloader.sanitize_dir(results_folder)
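        # After sanitization, no remaining name may contain an invalid
        # character or a character from an invalid range.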
        for _, dirs, files in os.walk(results_folder):
            for name in dirs + files:
                self.assertEqual(name, gs_offloader.get_sanitized_name(name))
                for c in name:
                    self.assertFalse(c in gs_offloader.INVALID_GS_CHARS)
                    for r in gs_offloader.INVALID_GS_CHAR_RANGE:
                        self.assertFalse(ord(c) >= r[0] and ord(c) <= r[1])
        self.assertTrue(os.path.exists(good_file))

        self.assertTrue(os.path.exists(fifo1))
        self.assertFalse(is_fifo(fifo1))
        self.assertTrue(os.path.exists(fifo2))
        self.assertFalse(is_fifo(fifo2))
        corrected_folder = os.path.join(
                results_folder,
                gs_offloader.get_sanitized_name(invalid_folder_name))
        corrected_fifo3 = os.path.join(
                corrected_folder,
                'test_fifo3')
        self.assertFalse(os.path.exists(fifo3))
        self.assertTrue(os.path.exists(corrected_fifo3))
        self.assertFalse(is_fifo(corrected_fifo3))
        corrected_fifo4 = os.path.join(
                corrected_folder,
                gs_offloader.get_sanitized_name(invalid_fifo4_name))
        self.assertFalse(os.path.exists(fifo4))
        self.assertTrue(os.path.exists(corrected_fifo4))
        self.assertFalse(is_fifo(corrected_fifo4))

        corrected_symlink = os.path.join(
                corrected_folder,
                'broken-link')
        self.assertFalse(os.path.lexists(symlink))
        self.assertTrue(os.path.exists(corrected_symlink))
        self.assertFalse(os.path.islink(corrected_symlink))
        shutil.rmtree(results_folder)


    def check_limit_file_count(self, is_test_job=True):
        """Check that folders with too many files get compressed.

        @param is_test_job: True to check the method with a test job result
                            folder.  Set to False for a special task folder.
        """
        results_folder = tempfile.mkdtemp()
        host_folder = os.path.join(
                results_folder,
                'lab1-host1' if is_test_job else 'hosts/lab1-host1/1-repair')
        debug_folder = os.path.join(host_folder, 'debug')
        sysinfo_folder = os.path.join(host_folder, 'sysinfo')
        for folder in [debug_folder, sysinfo_folder]:
            os.makedirs(folder)
            for i in range(10):
                with open(os.path.join(folder, str(i)), 'w') as f:
                    f.write('test')

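        # With a generous file-count limit, limit_file_count() should
        # leave the sysinfo folder untouched.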
        gs_offloader.MAX_FILE_COUNT = 100
        gs_offloader.limit_file_count(
                results_folder if is_test_job else host_folder)
        self.assertTrue(os.path.exists(sysinfo_folder))

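        # With a low limit, the sysinfo folder should be replaced by a
        # .tgz archive while the debug folder is left in place.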
        gs_offloader.MAX_FILE_COUNT = 10
        gs_offloader.limit_file_count(
                results_folder if is_test_job else host_folder)
        self.assertFalse(os.path.exists(sysinfo_folder))
        self.assertTrue(os.path.exists(sysinfo_folder + '.tgz'))
        self.assertTrue(os.path.exists(debug_folder))

        shutil.rmtree(results_folder)


    def test_limit_file_count(self):
        """Test that folders with too many files get compressed."""
        self.check_limit_file_count(is_test_job=True)
        self.check_limit_file_count(is_test_job=False)


    def test_is_valid_result(self):
        """Test _is_valid_result."""
        release_build = 'veyron_minnie-cheets-release/R52-8248.0.0'
        pfq_build = 'cyan-cheets-android-pfq/R54-8623.0.0-rc1'
        trybot_build = 'trybot-samus-release/R54-8640.0.0-b5092'
        trybot_2_build = 'trybot-samus-pfq/R54-8640.0.0-b5092'
        release_2_build = 'test-trybot-release/R54-8640.0.0-b5092'
        self.assertTrue(gs_offloader._is_valid_result(
            release_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
        self.assertTrue(gs_offloader._is_valid_result(
            release_build, gs_offloader.CTS_RESULT_PATTERN, 'test_that_wrapper'))
        self.assertFalse(gs_offloader._is_valid_result(
            release_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-bvt-cq'))
        self.assertTrue(gs_offloader._is_valid_result(
            release_build, gs_offloader.CTS_V2_RESULT_PATTERN, 'arc-gts'))
        self.assertFalse(gs_offloader._is_valid_result(
            None, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
        self.assertFalse(gs_offloader._is_valid_result(
            release_build, gs_offloader.CTS_RESULT_PATTERN, None))
        self.assertFalse(gs_offloader._is_valid_result(
            pfq_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
        self.assertFalse(gs_offloader._is_valid_result(
            trybot_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
        self.assertFalse(gs_offloader._is_valid_result(
            trybot_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
        self.assertTrue(gs_offloader._is_valid_result(
            release_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))


    def create_results_folder(self):
        """Create CTS/GTS results folders."""
        results_folder = tempfile.mkdtemp()
        host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22')
        debug_folder = os.path.join(host_folder, 'debug')
        sysinfo_folder = os.path.join(host_folder, 'sysinfo')
        cts_result_folder = os.path.join(
                host_folder, 'cheets_CTS.android.dpi', 'results', 'cts-results')
        cts_v2_result_folder = os.path.join(host_folder,
                'cheets_CTS_N.CtsGraphicsTestCases', 'results', 'android-cts')
        gts_result_folder = os.path.join(
                host_folder, 'cheets_GTS.google.admin', 'results', 'android-gts')
        timestamp_str = '2016.04.28_01.41.44'
        timestamp_cts_folder = os.path.join(cts_result_folder, timestamp_str)
        timestamp_cts_v2_folder = os.path.join(cts_v2_result_folder, timestamp_str)
        timestamp_gts_folder = os.path.join(gts_result_folder, timestamp_str)

        # Test results in cts_result_folder with a different time-stamp.
        timestamp_str_2 = '2016.04.28_10.41.44'
        timestamp_cts_folder_2 = os.path.join(cts_result_folder, timestamp_str_2)

        for folder in [debug_folder, sysinfo_folder, cts_result_folder,
                       timestamp_cts_folder, timestamp_cts_folder_2,
                       timestamp_cts_v2_folder, timestamp_gts_folder]:
            os.makedirs(folder)

        path_pattern_pair = [(timestamp_cts_folder, gs_offloader.CTS_RESULT_PATTERN),
                             (timestamp_cts_folder_2, gs_offloader.CTS_RESULT_PATTERN),
                             (timestamp_cts_v2_folder, gs_offloader.CTS_V2_RESULT_PATTERN),
                             (timestamp_gts_folder, gs_offloader.CTS_V2_RESULT_PATTERN)]

        # Create timestamp.zip file_path.
        cts_zip_file = os.path.join(cts_result_folder, timestamp_str + '.zip')
        cts_zip_file_2 = os.path.join(cts_result_folder, timestamp_str_2 + '.zip')
        cts_v2_zip_file = os.path.join(cts_v2_result_folder, timestamp_str + '.zip')
        gts_zip_file = os.path.join(gts_result_folder, timestamp_str + '.zip')

        # Create xml file_path.
        cts_result_file = os.path.join(timestamp_cts_folder, 'testResult.xml')
        cts_result_file_2 = os.path.join(timestamp_cts_folder_2,
                                         'testResult.xml')
        gts_result_file = os.path.join(timestamp_gts_folder, 'test_result.xml')
        cts_v2_result_file = os.path.join(timestamp_cts_v2_folder,
                                         'test_result.xml')

        for file_path in [cts_zip_file, cts_zip_file_2, cts_v2_zip_file,
                          gts_zip_file, cts_result_file, cts_result_file_2,
                          gts_result_file, cts_v2_result_file]:
            with open(file_path, 'w') as f:
                f.write('test')

        return (results_folder, host_folder, path_pattern_pair)


    def test_upload_testresult_files(self):
        """Test upload_testresult_files."""
        results_folder, host_folder, path_pattern_pair = self.create_results_folder()

        self.mox.StubOutWithMock(gs_offloader, '_upload_files')
        gs_offloader._upload_files(
            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False).AndReturn(
                ['test', '-d', host_folder])
        gs_offloader._upload_files(
            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False).AndReturn(
                ['test', '-d', host_folder])
        gs_offloader._upload_files(
            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False).AndReturn(
                ['test', '-d', host_folder])

        self.mox.ReplayAll()
        gs_offloader.upload_testresult_files(results_folder, False)
        self.mox.VerifyAll()
        shutil.rmtree(results_folder)


    def test_upload_files(self):
        """Test upload_files"""
        results_folder, host_folder, path_pattern_pair = self.create_results_folder()

        for path, pattern in path_pattern_pair:
            models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
                'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
                'parent_job_id': 'p_id',
                'suite': 'arc-cts'
            })

            gs_offloader.get_cmd_list(
                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                    ['test', '-d', path])
            gs_offloader.get_cmd_list(
                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                    ['test', '-d', path])

            self.mox.ReplayAll()
            gs_offloader._upload_files(host_folder, path, pattern, False)
            self.mox.VerifyAll()
            self.mox.ResetAll()

        shutil.rmtree(results_folder)


class JobDirectoryOffloadTests(_TempResultsDirTestBase):
    """Tests for `_JobDirectory.enqueue_offload()`.

    When testing with a `days_old` parameter of 0, we use
    `set_finished()` instead of `set_expired()`.  This causes the
    job's timestamp to be set in the future.  This is done so as
    to test that when `days_old` is 0, the job is always treated
    as eligible for offload, regardless of the timestamp's value.

    Testing covers the following assertions:
     A. Each time `enqueue_offload()` is called, a message that
        includes the job's directory name will be logged using
        `logging.debug()`, regardless of whether the job was
        enqueued.  Nothing else is allowed to be logged.
     B. If the job is not eligible to be offloaded,
        `get_failure_time()` and `get_failure_count()` are 0.
     C. If the job is not eligible for offload, nothing is
        enqueued in `queue`.
     D. When the job is offloaded, `get_failure_count()` increments
        each time.
     E. When the job is offloaded, the appropriate parameters are
        enqueued exactly once.
     F. The first time a job is offloaded, `get_failure_time()` is
        set to the current time.
     G. `get_failure_time()` only changes the first time that the
        job is offloaded.

    The test cases below are designed to exercise all of the
    meaningful state transitions at least once.

    """

    def setUp(self):
        super(JobDirectoryOffloadTests, self).setUp()
        self._job = self.make_job(self.REGULAR_JOBLIST[0])
        self._queue = Queue.Queue()


    def _offload_unexpired_job(self, days_old):
        """Make calls to `enqueue_offload()` for an unexpired job.

        This method tests assertions B and C that calling
        `enqueue_offload()` has no effect.

        """
        self.assertEqual(self._job.get_failure_count(), 0)
        self.assertEqual(self._job.get_failure_time(), 0)
        self._job.enqueue_offload(self._queue, days_old)
        self._job.enqueue_offload(self._queue, days_old)
        self.assertTrue(self._queue.empty())
        self.assertEqual(self._job.get_failure_count(), 0)
        self.assertEqual(self._job.get_failure_time(), 0)


    def _offload_expired_once(self, days_old, count):
        """Make one call to `enqueue_offload()` for an expired job.

        This method tests assertions D and E regarding side-effects
        expected when a job is offloaded.

        """
        self._job.enqueue_offload(self._queue, days_old)
        self.assertEqual(self._job.get_failure_count(), count)
        self.assertFalse(self._queue.empty())
        v = self._queue.get_nowait()
        self.assertTrue(self._queue.empty())
        self.assertEqual(v, self._job.queue_args)


    def _offload_expired_job(self, days_old):
        """Make calls to `enqueue_offload()` for a just-expired job.

        This method directly tests assertions F and G regarding
        side-effects on `get_failure_time()`.

        """
        t0 = time.time()
        self._offload_expired_once(days_old, 1)
        t1 = self._job.get_failure_time()
        self.assertLessEqual(t1, time.time())
        self.assertGreaterEqual(t1, t0)
        self._offload_expired_once(days_old, 2)
        self.assertEqual(self._job.get_failure_time(), t1)
        self._offload_expired_once(days_old, 3)
        self.assertEqual(self._job.get_failure_time(), t1)


    def test_case_1_no_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` of 0.

        This tests that offload works as expected if calls are
        made both before and after the job becomes expired.

        """
        self._offload_unexpired_job(0)
        self._job.set_finished(0)
        self._offload_expired_job(0)


    def test_case_2_no_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` of 0.

        This tests that offload works as expected if calls are made
        only after the job becomes expired.

        """
        self._job.set_finished(0)
        self._offload_expired_job(0)


    def test_case_1_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        before the job finishes, before the job expires, and after
        the job expires.

        """
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_finished(_TEST_EXPIRATION_AGE)
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_2_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        between finishing and expiration, and after the job expires.

        """
        self._job.set_finished(_TEST_EXPIRATION_AGE)
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_3_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        only before finishing and after expiration.

        """
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_4_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        only after expiration.

        """
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


1324class GetJobDirectoriesTests(_TempResultsDirTestBase):
1325    """Tests for `_JobDirectory.get_job_directories()`."""
1326
1327    def setUp(self):
1328        super(GetJobDirectoriesTests, self).setUp()
1329        self.make_job_hierarchy()
1330        os.mkdir('not-a-job')
1331        open('not-a-dir', 'w').close()
1332
1333
1334    def _run_get_directories(self, cls, expected_list):
1335        """Test `get_job_directories()` for the given class.
1336
1337        Calls the method, and asserts that the returned list of
1338        directories matches the expected return value.
1339
1340        @param expected_list Expected return value from the call.
1341        """
1342        dirlist = cls.get_job_directories()
1343        self.assertEqual(set(dirlist), set(expected_list))
1344
1345
1346    def test_get_regular_jobs(self):
1347        """Test `RegularJobDirectory.get_job_directories()`."""
1348        self._run_get_directories(job_directories.RegularJobDirectory,
1349                                  self.REGULAR_JOBLIST)
1350
1351
1352    def test_get_special_jobs(self):
1353        """Test `SpecialJobDirectory.get_job_directories()`."""
1354        self._run_get_directories(job_directories.SpecialJobDirectory,
1355                                  self.SPECIAL_JOBLIST)
1356
1357
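# `get_job_directories()` is expected to return only entries that look like
# job results directories, which is why setUp() above creates a stray
# directory ('not-a-job') and a plain file ('not-a-dir') that must both be
# excluded.  The sketch below shows that kind of filtering; the name pattern
# and helper are illustrative assumptions, not the real implementation in
# `job_directories`.
def _example_list_job_dirs(pattern='[0-9]*-*'):
    """Return directories in the CWD whose names match a job-style pattern."""
    import glob
    return [d for d in glob.glob(pattern) if os.path.isdir(d)]

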
1358class AddJobsTests(_TempResultsDirTestBase):
1359    """Tests for `Offloader._add_new_jobs()`."""
1360
1361    MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']
1362
1363    def setUp(self):
1364        super(AddJobsTests, self).setUp()
1365        self._initial_job_names = (
1366            set(self.REGULAR_JOBLIST) | set(self.SPECIAL_JOBLIST))
1367        self.make_job_hierarchy()
1368        self._offloader = gs_offloader.Offloader(_get_options(['-a']))
1369        self.mox.StubOutWithMock(logging, 'debug')
1370
1371
1372    def _run_add_new_jobs(self, expected_key_set):
1373        """Basic test assertions for `_add_new_jobs()`.
1374
1375        Asserts the following:
1376          * The keys in the offloader's `_open_jobs` dictionary
1377            match the expected set of keys.
1378          * For every job in `_open_jobs`, the job has the expected
1379            directory name.
1380
1381        """
1382        count = len(expected_key_set) - len(self._offloader._open_jobs)
1383        logging.debug(mox.IgnoreArg(), count)
1384        self.mox.ReplayAll()
1385        self._offloader._add_new_jobs()
1386        self.assertEqual(expected_key_set,
1387                         set(self._offloader._open_jobs.keys()))
1388        for jobkey, job in self._offloader._open_jobs.items():
1389            self.assertEqual(jobkey, job._dirname)
1390        self.mox.VerifyAll()
1391        self.mox.ResetAll()
1392
1393
1394    def test_add_jobs_empty(self):
1395        """Test adding jobs to an empty dictionary.
1396
1397        Calls the offloader's `_add_new_jobs()` via
1398        `_run_add_new_jobs()`, which also performs the standard assertions.
1399
1400        """
1401        self._run_add_new_jobs(self._initial_job_names)
1402
1403
1404    def test_add_jobs_non_empty(self):
1405        """Test adding jobs to a non-empty dictionary.
1406
1407        Calls the offloader's `_add_new_jobs()` twice via
1408        `_run_add_new_jobs()`: once from initial conditions, and then
1409        again after adding more directories.  Each call performs the
1410        standard assertions.  Additionally, asserts that keys added
1411        by the first call still map to their original job object
1412        after the second call.
1413
1414        """
1415        self._run_add_new_jobs(self._initial_job_names)
1416        jobs_copy = self._offloader._open_jobs.copy()
1417        for d in self.MOREJOBS:
1418            os.mkdir(d)
1419        self._run_add_new_jobs(self._initial_job_names |
1420                                 set(self.MOREJOBS))
1421        for key in jobs_copy.keys():
1422            self.assertIs(jobs_copy[key],
1423                          self._offloader._open_jobs[key])
1424
1425
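# `test_add_jobs_non_empty` relies on `_add_new_jobs()` preserving job objects
# already tracked in `_open_jobs` (hence the `assertIs` check) and creating
# entries only for directories it has not seen before.  A simplified sketch of
# that merge is given below, assuming a `make_job(dirname)` factory; the real
# method also picks the job directory class and logs how many jobs were added.
def _example_add_new_jobs(open_jobs, dirnames, make_job):
    """Add an entry to `open_jobs` for each directory not already present."""
    added = 0
    for dirname in dirnames:
        if dirname not in open_jobs:
            open_jobs[dirname] = make_job(dirname)
            added += 1
    return added

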
1426class JobStateTests(_TempResultsDirTestBase):
1427    """Tests for job state predicates.
1428
1429    This tests for the expected results from the
1430    `is_offloaded()` predicate method.
1431
1432    """
1433
1434    def test_unfinished_job(self):
1435        """Test that an unfinished job reports the correct state.
1436
1437        A job is "unfinished" if it isn't marked complete in the
1438        database.  A job in this state is neither "complete" nor
1439        "reportable".
1440
1441        """
1442        job = self.make_job(self.REGULAR_JOBLIST[0])
1443        self.assertFalse(job.is_offloaded())
1444
1445
1446    def test_incomplete_job(self):
1447        """Test that an incomplete job reports the correct state.
1448
1449        A job is "incomplete" if exactly one attempt has been made
1450        to offload the job, but its results directory still exists.
1451        A job in this state is neither "complete" nor "reportable".
1452
1453        """
1454        job = self.make_job(self.REGULAR_JOBLIST[0])
1455        job.set_incomplete()
1456        self.assertFalse(job.is_offloaded())
1457
1458
1459    def test_reportable_job(self):
1460        """Test that a reportable job reports the correct state.
1461
1462        A job is "reportable" if more than one attempt has been made
1463        to offload the job, and its results directory still exists.
1464        A job in this state is "reportable", but not "complete".
1465
1466        """
1467        job = self.make_job(self.REGULAR_JOBLIST[0])
1468        job.set_reportable()
1469        self.assertFalse(job.is_offloaded())
1470
1471
1472    def test_completed_job(self):
1473        """Test that a completed job reports the correct state.
1474
1475        A job is "completed" if at least one attempt has been made
1476        to offload the job, and its results directory no longer exists.
1477        A job in this state is "complete", but not "reportable".
1478
1479        """
1480        job = self.make_job(self.REGULAR_JOBLIST[0])
1481        job.set_complete()
1482        self.assertTrue(job.is_offloaded())
1483
1484
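# Taken together, the docstrings above describe `is_offloaded()` as true only
# when at least one offload attempt has been made and the results directory
# has been removed; any state in which the directory survives is not yet
# offloaded, and repeated failed attempts make the job "reportable".  The
# class below is a condensed, illustrative sketch of those predicates only,
# assuming `offload_count` and `dirname` attributes; it is not the real
# `_JobDirectory` implementation.
class _ExampleJobState(object):
    """Minimal holder for the state the predicates above depend on."""

    def __init__(self, dirname, offload_count=0):
        self.dirname = dirname
        self.offload_count = offload_count

    def is_offloaded(self):
        # Offloaded: attempted at least once and the directory is gone.
        return self.offload_count >= 1 and not os.path.isdir(self.dirname)

    def is_reportable(self):
        # Reportable: repeated attempts while the directory still exists.
        return self.offload_count > 1 and os.path.isdir(self.dirname)

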
1485class ReportingTests(_TempResultsDirTestBase):
1486    """Tests for `Offloader._update_offload_results()`."""
1487
1488    def setUp(self):
1489        super(ReportingTests, self).setUp()
1490        self._offloader = gs_offloader.Offloader(_get_options([]))
1491        self.mox.StubOutWithMock(self._offloader, '_log_failed_jobs_locally')
1492        self.mox.StubOutWithMock(logging, 'debug')
1493
1494
1495    def _add_job(self, jobdir):
1496        """Add a job to the dictionary of unfinished jobs."""
1497        j = self.make_job(jobdir)
1498        self._offloader._open_jobs[j._dirname] = j
1499        return j
1500
1501
1502    def _expect_log_message(self, new_open_jobs, with_failures):
1503        """Mock expected logging calls.
1504
1505        `_update_offload_results()` logs one message with the number
1506        of jobs removed from the open job set and the number of jobs
1507        still remaining.  Additionally, if there are reportable
1508        jobs, then it logs the number of jobs that haven't yet
1509        been offloaded.
1510
1511        This sets up the logging calls using `new_open_jobs` to
1512        figure the job counts.  If `with_failures` is true, then
1513        the log message is set up assuming that all jobs in
1514        `new_open_jobs` have offload failures.
1515
1516        @param new_open_jobs New job set for calculating counts
1517                             in the messages.
1518        @param with_failures Whether the log message with a
1519                             failure count is expected.
1520
1521        """
1522        count = len(self._offloader._open_jobs) - len(new_open_jobs)
1523        logging.debug(mox.IgnoreArg(), count, len(new_open_jobs))
1524        if with_failures:
1525            logging.debug(mox.IgnoreArg(), len(new_open_jobs))
1526
1527
1528    def _run_update(self, new_open_jobs):
1529        """Call `_update_offload_results()`.
1530
1531        Initial conditions are set up by the caller.  This calls
1532        `_update_offload_results()` once, and then checks these
1533        assertions:
1534          * The offloader's new `_open_jobs` field contains only
1535            the entries in `new_open_jobs`.
1536
1537        @param new_open_jobs A dictionary representing the expected
1538                             new value of the offloader's
1539                             `_open_jobs` field.
1540        """
1541        self.mox.ReplayAll()
1542        self._offloader._update_offload_results()
1543        self.assertEqual(self._offloader._open_jobs, new_open_jobs)
1544        self.mox.VerifyAll()
1545        self.mox.ResetAll()
1546
1547
1548    def _expect_failed_jobs(self, failed_jobs):
1549        """Mock expected call to log the failed jobs on local disk.
1550
1551        TODO(crbug.com/686904): The fact that we have to mock an internal
1552        function for this test is evidence that we need to pull out the local
1553        file formatter into its own object in a future CL.
1554
1555        @param failed_jobs: The list of jobs being logged as failed.
1556        """
1557        self._offloader._log_failed_jobs_locally(failed_jobs)
1558
1559
1560    def test_no_jobs(self):
1561        """Test `_update_offload_results()` with no open jobs.
1562
1563        Initial conditions are an empty `_open_jobs` list.
1564        Expected result is an empty `_open_jobs` list.
1565
1566        """
1567        self._expect_log_message({}, False)
1568        self._expect_failed_jobs([])
1569        self._run_update({})
1570
1571
1572    def test_all_completed(self):
1573        """Test `_update_offload_results()` with only complete jobs.
1574
1575        Initial conditions are an `_open_jobs` list consisting of only
1576        completed jobs.
1577        Expected result is an empty `_open_jobs` list.
1578
1579        """
1580        for d in self.REGULAR_JOBLIST:
1581            self._add_job(d).set_complete()
1582        self._expect_log_message({}, False)
1583        self._expect_failed_jobs([])
1584        self._run_update({})
1585
1586
1587    def test_none_finished(self):
1588        """Test `_update_offload_results()` with only unfinished jobs.
1589
1590        Initial conditions are an `_open_jobs` list consisting of only
1591        unfinished jobs.
1592        Expected result is no change to the `_open_jobs` list.
1593
1594        """
1595        for d in self.REGULAR_JOBLIST:
1596            self._add_job(d)
1597        new_jobs = self._offloader._open_jobs.copy()
1598        self._expect_log_message(new_jobs, False)
1599        self._expect_failed_jobs([])
1600        self._run_update(new_jobs)
1601
1602
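# The ReportingTests above pin down the observable behaviour of
# `_update_offload_results()`: jobs whose `is_offloaded()` is true are dropped
# from `_open_jobs`, a debug message reports how many were removed and how
# many remain, and jobs with failed offload attempts are handed to
# `_log_failed_jobs_locally()`.  The function below is a sketch of that flow
# under those assumptions; `has_failed_offload()` is a hypothetical marker,
# and the real method's log text and failure bookkeeping differ in detail.
def _example_update_offload_results(open_jobs, log_failed_jobs_locally):
    """Drop offloaded jobs from `open_jobs` and report on the remainder."""
    closed = [key for key, job in open_jobs.items() if job.is_offloaded()]
    for key in closed:
        del open_jobs[key]
    logging.debug('Closed %d jobs; %d still open.', len(closed), len(open_jobs))
    # Jobs that merely haven't finished yet are not "failed" (see
    # `test_none_finished` above); only jobs with failed offload attempts are
    # logged locally.
    failed = [job for job in open_jobs.values() if job.has_failed_offload()]
    log_failed_jobs_locally(failed)

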
1603if __name__ == '__main__':
1604    unittest.main()
1605