1# Copyright 2016 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import absolute_import 6from __future__ import division 7from __future__ import print_function 8 9import multiprocessing 10import os 11import tempfile 12import time 13import unittest 14 15from six.moves import range # pylint: disable=redefined-builtin 16 17from py_utils import lock 18 19 20def _AppendTextToFile(file_name): 21 with open(file_name, 'a') as f: 22 lock.AcquireFileLock(f, lock.LOCK_EX) 23 # Sleep 100 ms to increase the chance of another process trying to acquire 24 # the lock of file as the same time. 25 time.sleep(0.1) 26 f.write('Start') 27 for _ in range(10000): 28 f.write('*') 29 f.write('End') 30 31 32def _ReadFileWithSharedLockBlockingThenWrite(read_file, write_file): 33 with open(read_file, 'r') as f: 34 lock.AcquireFileLock(f, lock.LOCK_SH) 35 content = f.read() 36 with open(write_file, 'a') as f2: 37 lock.AcquireFileLock(f2, lock.LOCK_EX) 38 f2.write(content) 39 40 41def _ReadFileWithExclusiveLockNonBlocking(target_file, status_file): 42 with open(target_file, 'r') as f: 43 try: 44 lock.AcquireFileLock(f, lock.LOCK_EX | lock.LOCK_NB) 45 with open(status_file, 'w') as f2: 46 f2.write('LockException was not raised') 47 except lock.LockException: 48 with open(status_file, 'w') as f2: 49 f2.write('LockException raised') 50 51 52class FileLockTest(unittest.TestCase): 53 def setUp(self): 54 tf = tempfile.NamedTemporaryFile(delete=False) 55 tf.close() 56 self.temp_file_path = tf.name 57 58 def tearDown(self): 59 os.remove(self.temp_file_path) 60 61 def testExclusiveLock(self): 62 processess = [] 63 for _ in range(10): 64 p = multiprocessing.Process( 65 target=_AppendTextToFile, args=(self.temp_file_path,)) 66 p.start() 67 processess.append(p) 68 for p in processess: 69 p.join() 70 71 # If the file lock works as expected, there should be 10 atomic writes of 72 # 'Start***...***End' to the file in some order, which lead to the final 73 # file content as below. 74 expected_file_content = ''.join((['Start'] + ['*']*10000 + ['End']) * 10) 75 with open(self.temp_file_path, 'r') as f: 76 # Use assertTrue instead of assertEquals since the strings are big, hence 77 # assertEquals's assertion failure will contain huge strings. 78 self.assertTrue(expected_file_content == f.read()) 79 80 def testSharedLock(self): 81 tf = tempfile.NamedTemporaryFile(delete=False) 82 tf.close() 83 temp_write_file = tf.name 84 try: 85 with open(self.temp_file_path, 'w') as f: 86 f.write('0123456789') 87 with open(self.temp_file_path, 'r') as f: 88 # First, acquire a shared lock on temp_file_path 89 lock.AcquireFileLock(f, lock.LOCK_SH) 90 91 processess = [] 92 # Create 10 processes that also try to acquire shared lock from 93 # temp_file_path then append temp_file_path's content to temp_write_file 94 for _ in range(10): 95 p = multiprocessing.Process( 96 target=_ReadFileWithSharedLockBlockingThenWrite, 97 args=(self.temp_file_path, temp_write_file)) 98 p.start() 99 processess.append(p) 100 for p in processess: 101 p.join() 102 103 # temp_write_file should contains 10 copy of temp_file_path's content. 104 with open(temp_write_file, 'r') as f: 105 self.assertEquals('0123456789'*10, f.read()) 106 finally: 107 os.remove(temp_write_file) 108 109 def testNonBlockingLockAcquiring(self): 110 tf = tempfile.NamedTemporaryFile(delete=False) 111 tf.close() 112 temp_status_file = tf.name 113 try: 114 with open(self.temp_file_path, 'w') as f: 115 lock.AcquireFileLock(f, lock.LOCK_EX) 116 p = multiprocessing.Process( 117 target=_ReadFileWithExclusiveLockNonBlocking, 118 args=(self.temp_file_path, temp_status_file)) 119 p.start() 120 p.join() 121 with open(temp_status_file, 'r') as f: 122 self.assertEquals('LockException raised', f.read()) 123 finally: 124 os.remove(temp_status_file) 125 126 def testUnlockBeforeClosingFile(self): 127 tf = tempfile.NamedTemporaryFile(delete=False) 128 tf.close() 129 temp_status_file = tf.name 130 try: 131 with open(self.temp_file_path, 'r') as f: 132 lock.AcquireFileLock(f, lock.LOCK_SH) 133 lock.ReleaseFileLock(f) 134 p = multiprocessing.Process( 135 target=_ReadFileWithExclusiveLockNonBlocking, 136 args=(self.temp_file_path, temp_status_file)) 137 p.start() 138 p.join() 139 with open(temp_status_file, 'r') as f: 140 self.assertEquals('LockException was not raised', f.read()) 141 finally: 142 os.remove(temp_status_file) 143 144 def testContextualLock(self): 145 tf = tempfile.NamedTemporaryFile(delete=False) 146 tf.close() 147 temp_status_file = tf.name 148 try: 149 with open(self.temp_file_path, 'r') as f: 150 with lock.FileLock(f, lock.LOCK_EX): 151 # Within this block, accessing self.temp_file_path from another 152 # process should raise exception. 153 p = multiprocessing.Process( 154 target=_ReadFileWithExclusiveLockNonBlocking, 155 args=(self.temp_file_path, temp_status_file)) 156 p.start() 157 p.join() 158 with open(temp_status_file, 'r') as f: 159 self.assertEquals('LockException raised', f.read()) 160 161 # Accessing self.temp_file_path here should not raise exception. 162 p = multiprocessing.Process( 163 target=_ReadFileWithExclusiveLockNonBlocking, 164 args=(self.temp_file_path, temp_status_file)) 165 p.start() 166 p.join() 167 with open(temp_status_file, 'r') as f: 168 self.assertEquals('LockException was not raised', f.read()) 169 finally: 170 os.remove(temp_status_file) 171