1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging, random, re, string, traceback 6from autotest_lib.client.common_lib import error 7from autotest_lib.server import autotest 8from autotest_lib.server import hosts 9from autotest_lib.server import test 10 11class kernel_MemoryRamoop(test.test): 12 """ 13 This test verify that /dev/pstore/console-ramoops is preserved correctly 14 after system reboot/kernel crash and also verify that there is no memory 15 corrupt in that log. 16 17 There is also platform_KernelErrorPaths test that test kernel crash. But 18 this test focuses on verify that kernel create console-ramoops file 19 correctly and its content is not corrupt. Contrary to the other test 20 that test at the bigger scope, i.e. the whole error reporting mechanism. 21 """ 22 version = 1 23 24 _RAMOOP_PATH = '/dev/pstore/console-ramoops' 25 _KMSG_PATH = '/dev/kmsg' 26 _LKDTM_PATH = '/sys/kernel/debug/provoke-crash/DIRECT' 27 28 # ramopp have size of 128K so we will generate about 100K of random message 29 _MSG_LINE_COUNT = 1000 30 _MSG_LINE_LENGTH = 80 31 _MSG_MAGIC = 'ramoop_test' 32 33 def run_once(self, client_ip): 34 """ 35 Run the test. 36 """ 37 if not client_ip: 38 error.TestError("Must provide client's IP address to test") 39 40 self._client = hosts.create_host(client_ip) 41 self._client_at = autotest.Autotest(self._client) 42 43 self._run_test(self._do_reboot, '.*Restarting system.*$') 44 45 if self._client.check_for_lkdtm(): 46 self._run_test(self._do_kernel_panic, '.*lkdtm:.*PANIC$') 47 self._run_test(self._do_kernel_bug, '.*lkdtm:.*BUG$') 48 else: 49 logging.warn('DUT did not have kernel dump test module') 50 51 self._run_test(self._do_reboot_with_suspend, '.*Restarting system.*$') 52 53 def _run_test(self, test_function, sig_pattern): 54 """ 55 Run the test using by write random message to kernel log. Then 56 restart/crash the kernel and then verify integrity of console-ramoop 57 58 @param test_function: fuction to call to reboot / crash DUT 59 @param sig_pattern: regex of kernel log message generate when reboot 60 or crash by test_function 61 """ 62 63 msg = self._generate_random_msg() 64 65 for line in msg: 66 cmd = 'echo "%s" > %s' % (line, self._KMSG_PATH) 67 self._client.run(cmd) 68 69 test_function() 70 71 cmd = 'cat %s' % self._RAMOOP_PATH 72 ramoop = self._client.run(cmd).stdout 73 74 self._verify_random_msg(ramoop, msg, sig_pattern) 75 76 def _do_reboot(self): 77 """ 78 Reboot host machine 79 """ 80 logging.info('Server: reboot client') 81 try: 82 self._client.reboot() 83 except error.AutoservRebootError as err: 84 raise error.TestFail('%s.\nTest failed with error %s' % ( 85 traceback.format_exc(), str(err))) 86 87 def _do_reboot_with_suspend(self): 88 """ 89 Reboot host machine after suspend once 90 """ 91 self._client.suspend(suspend_time=15) 92 93 logging.info('Server: reboot client') 94 try: 95 self._client.reboot() 96 except error.AutoservRebootError as err: 97 raise error.TestFail('%s.\nTest failed with error %s' % ( 98 traceback.format_exc(), str(err))) 99 100 def _do_kernel_panic(self): 101 """ 102 Cause kernel panic using kernel dump test module 103 """ 104 logging.info('Server: make client kernel panic') 105 106 cmd = 'echo PANIC > %s' % self._LKDTM_PATH 107 boot_id = self._client.get_boot_id() 108 self._client.run(cmd, ignore_status=True) 109 self._client.wait_for_restart(old_boot_id=boot_id) 110 111 def _do_kernel_bug(self): 112 """ 113 Cause kernel bug using kernel dump test module 114 """ 115 logging.info('Server: make client kernel bug') 116 117 cmd = 'echo BUG > %s' % self._LKDTM_PATH 118 boot_id = self._client.get_boot_id() 119 self._client.run(cmd, ignore_status=True) 120 self._client.wait_for_restart(old_boot_id=boot_id) 121 122 def _generate_random_msg(self): 123 """ 124 Generate random message to put in kernel log 125 The message format is [magic string]: [3 digit id] [random char/digit] 126 """ 127 valid_char = string.letters + string.digits 128 ret = [] 129 for i in range(self._MSG_LINE_COUNT): 130 line = '%s: %03d ' % (self._MSG_MAGIC, i) 131 for _ in range(self._MSG_LINE_LENGTH): 132 line += random.choice(valid_char) 133 ret += [line] 134 return ret 135 136 def _verify_random_msg(self, ramoop, src_msg, sig_pattern): 137 """ 138 Verify random message generated by _generate_random_msg 139 140 There are 3 things to verify. 141 1. At least one random message exist. (earlier random message may be 142 cutoff because console-ramoops has limited size. 143 2. Integrity of random message. 144 3. Signature of reboot / kernel crash 145 146 @param ramoop: console-ramoops file in DUT 147 @param src_msg: message write to kernel log 148 @param sig_patterm: regex of kernel log to verify 149 """ 150 # time stamp magic id random 151 pattern = str("\\[ *(\\d+\\.\\d+)\\].*(%s: (\\d{3}) \\w{%d})" % 152 (self._MSG_MAGIC, self._MSG_LINE_LENGTH)) 153 matcher = re.compile(pattern) 154 155 logging.info('%s', pattern) 156 157 state = 'find_rand_msg' 158 159 last_timestamp = 0 160 for line in ramoop.split('\n'): 161 if state == 'find_rand_msg': 162 if not matcher.match(line): 163 continue 164 last_id = int(matcher.split(line)[3]) - 1 165 state = 'match_rand_pattern' 166 logging.info("%s: %s", state, line) 167 168 if state == 'match_rand_pattern': 169 if not matcher.match(line): 170 continue 171 components = matcher.split(line) 172 timestamp = float(components[1]) 173 msg = components[2] 174 id = int(components[3]) 175 176 if timestamp < last_timestamp: 177 logging.info("last_timestamp: %f, timestamp: %d", 178 last_timestamp, timestamp) 179 raise error.TestFail('Found reverse time stamp.') 180 last_timestamp = timestamp 181 182 if id != last_id + 1: 183 logging.info("last_id: %d, id: %d", last_id, id) 184 raise error.TestFail('Found missing message.') 185 last_id = id 186 187 if msg != src_msg[id]: 188 logging.info("cur_msg: '%s'", msg) 189 logging.info("src_msg: '%s'", src_msg[id]) 190 raise error.TestFail('Found corrupt message.') 191 192 if id == self._MSG_LINE_COUNT - 1: 193 state = 'find_signature' 194 195 if state == 'find_signature': 196 if re.match(sig_pattern, line): 197 logging.info("%s: %s", state, line) 198 break 199 200 # error case: successful run must break in find_sigature state 201 else: 202 raise error.TestFail(str('Verify failed at state %s' % state)) 203