#!/usr/bin/env python
#
# rt-mutex tester
#
# (C) 2006 Thomas Gleixner <tglx@linutronix.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# Reads a test specification (from a file or stdin) and executes it
# against the per-thread sysfs files below `sysfsprefix`.  Every
# non-empty, non-comment line consists of four ':'-separated fields:
#
#     cmd : opcode : thread id : data
#
# where cmd is one of (case-insensitive):
#     C - write "<opcode number>:<data>" to the thread's command file
#     T - read the thread's status file once and fail if the test
#         described by opcode/data does not hold
#     W - re-read the thread's status file until the test holds
#
# Lines starting with '#' are comments.  Lines with any other cmd are
# silently ignored.
#
# The script runs unmodified on Python 2.6+ and Python 3.
#
import os
import sys
import getopt
import shutil
import string

# Globals
quiet = 0
test = 0
comments = 0

sysfsprefix = "/sys/devices/system/rttest/rttest"
statusfile = "/status"
commandfile = "/command"

# Command opcodes
cmd_opcodes = {
    "schedother"    : "1",
    "schedfifo"     : "2",
    "lock"          : "3",
    "locknowait"    : "4",
    "lockint"       : "5",
    "lockintnowait" : "6",
    "lockcont"      : "7",
    "unlock"        : "8",
    "lockbkl"       : "9",
    "unlockbkl"     : "10",
    "signal"        : "11",
    "resetevent"    : "98",
    "reset"         : "99",
    }

# Test opcodes: name -> [status field tag, comparison, fixed compare value].
# A compare value of None means the value is taken from the data field of
# the test line instead.
test_opcodes = {
    "prioeq"        : ["P" , "eq" , None],
    "priolt"        : ["P" , "lt" , None],
    "priogt"        : ["P" , "gt" , None],
    "nprioeq"       : ["N" , "eq" , None],
    "npriolt"       : ["N" , "lt" , None],
    "npriogt"       : ["N" , "gt" , None],
    "unlocked"      : ["M" , "eq" , 0],
    "trylock"       : ["M" , "eq" , 1],
    "blocked"       : ["M" , "eq" , 2],
    "blockedwake"   : ["M" , "eq" , 3],
    "locked"        : ["M" , "eq" , 4],
    "opcodeeq"      : ["O" , "eq" , None],
    "opcodelt"      : ["O" , "lt" , None],
    "opcodegt"      : ["O" , "gt" , None],
    "eventeq"       : ["E" , "eq" , None],
    "eventlt"       : ["E" , "lt" , None],
    "eventgt"       : ["E" , "gt" , None],
    }


# Print usage information
def usage():
    """Print the command line help text to stdout."""
    print("rt-tester.py <-c -h -q -t> <testfile>")
    print(" -c display comments after first command")
    print(" -h help")
    print(" -q quiet mode")
    print(" -t test mode (syntax check)")
    print(" testfile: read test specification from testfile")
    print(" otherwise from stdin")


# Print progress when not in quiet mode
def progress(str):
    """Print `str` to stdout unless the global `quiet` flag is set."""
    if not quiet:
        print(str)


# Analyse a status value
def analyse(val, top, arg):
    """Compare one status value against the expectation of a test opcode.

    val -- the status value as a decimal string
    top -- a test_opcodes entry: [field tag, "eq"/"lt"/"gt", compare value]
    arg -- the data field of the test line (string)

    The meaning of `arg` depends on the field tag in top[0]:
      "M": `val` is treated as a sequence of decimal digits; `arg` selects
           the digit (0 = least significant) which is compared with top[2].
      "O": `arg` is either a command name from cmd_opcodes or a number.
      otherwise: `arg` is the number to compare `val` with.

    Returns 1 if the comparison holds, 0 otherwise.
    Raises ValueError if `val` or `arg` is not a valid integer.
    """
    intval = int(val)

    if top[0] == "M":
        # Floor division is required here: on Python 3 "/" yields a float
        # and the extracted "digit" would carry a fractional part.
        intval = intval // (10 ** int(arg))
        intval = intval % 10
        argval = top[2]
    elif top[0] == "O":
        argval = int(cmd_opcodes.get(arg, arg))
    else:
        argval = int(arg)

    # progress("%d %s %d" %(intval, top[1], argval))

    if top[1] == "eq" and intval == argval:
        return 1
    if top[1] == "lt" and intval < argval:
        return 1
    if top[1] == "gt" and intval > argval:
        return 1
    return 0


def _read_status(fname):
    """Return the stripped first line of the status file `fname`."""
    with open(fname, 'r') as fsta:
        return fsta.readline().strip()


def _check_status(status, testop, dat):
    """Evaluate `testop` against a comma separated status line.

    The first field starting with the tag testop[0] is analysed; the value
    is taken from the third character of that field onwards.  If no field
    carries the tag, the check counts as passed (returns 1).
    """
    for field in status.split(","):
        field = field.strip()
        if field.startswith(testop[0]):
            # Separate status value
            return analyse(field[2:].strip(), testop, dat)
    return 1


def _write_command(fname, cmdstr):
    """Write `cmdstr` to the command file `fname`."""
    with open(fname, 'w') as fcmd:
        fcmd.write(cmdstr)


def main():
    """Parse the command line and run the test specification.

    Exits with status 0 after printing "Pass", or with status 1 on a usage
    error, an unreadable test file, a failed test or (outside test mode)
    a syntax error in the specification.
    """
    global quiet, test, comments

    # Parse the commandline
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], 'chqt')
    except getopt.GetoptError:
        usage()
        sys.exit(1)

    # Parse commandline options
    for option, _value in options:
        if option == "-c":
            comments = 1
        elif option == "-q":
            quiet = 1
        elif option == "-t":
            test = 1
        elif option == '-h':
            usage()
            sys.exit(0)

    # Select the input source
    if arguments:
        try:
            fd = open(arguments[0])
        except Exception:
            sys.stderr.write("File not found %s\n" % (arguments[0]))
            sys.exit(1)
    else:
        fd = sys.stdin

    linenr = 0

    # Read the test patterns.  readline() is used on purpose instead of
    # iterating over fd: Python 2 file iteration uses a read-ahead buffer,
    # which would delay commands fed interactively through stdin.
    while 1:

        linenr = linenr + 1
        line = fd.readline()
        if not line:
            break

        line = line.strip()
        parts = line.split(":")

        # Skip empty lines (and lines with an empty first field)
        if not parts[0]:
            continue

        if parts[0].startswith("#"):
            # Comments are only shown once the first command was seen
            if comments > 1:
                progress(line)
            continue

        if comments == 1:
            comments = 2

        progress(line)

        try:
            # Field extraction lives inside the try block so that a line
            # with fewer than four fields is reported as a syntax error
            # instead of terminating with an IndexError traceback.
            cmd = parts[0].strip().lower()
            opc = parts[1].strip().lower()
            tid = parts[2].strip()
            dat = parts[3].strip()

            # Test or wait for a status value
            if cmd == "t" or cmd == "w":
                testop = test_opcodes[opc]

                fname = "%s%s%s" % (sysfsprefix, tid, statusfile)
                if test:
                    print(fname)
                    continue

                # "t" checks exactly once, "w" polls until the test holds
                while 1:
                    status = _read_status(fname)
                    query = _check_status(status, testop, dat)
                    if query or cmd == "t":
                        break

                progress(" " + status)

                if not query:
                    sys.stderr.write("Test failed in line %d\n" % (linenr))
                    sys.exit(1)

            # Issue a command to the tester
            elif cmd == "c":
                cmdnr = cmd_opcodes[opc]
                # Build command string and sys filename
                cmdstr = "%s:%s" % (cmdnr, dat)
                fname = "%s%s%s" % (sysfsprefix, tid, commandfile)
                if test:
                    print(fname)
                    continue
                _write_command(fname, cmdstr)

        except Exception as ex:
            sys.stderr.write(str(ex))
            sys.stderr.write("\nSyntax error in line %d\n" % (linenr))
            # In test mode keep going so that every bad line gets reported
            if not test:
                fd.close()
                sys.exit(1)

    # Normal exit pass
    print("Pass")
    sys.exit(0)


if __name__ == "__main__":
    main()