1# Copyright 2016 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import absolute_import 6from __future__ import division 7from __future__ import print_function 8 9import multiprocessing 10import os 11import tempfile 12import time 13import unittest 14 15from py_utils import lock 16from six.moves import range # pylint: disable=redefined-builtin 17 18 19def _AppendTextToFile(file_name): 20 with open(file_name, 'a') as f: 21 lock.AcquireFileLock(f, lock.LOCK_EX) 22 # Sleep 100 ms to increase the chance of another process trying to acquire 23 # the lock of file as the same time. 24 time.sleep(0.1) 25 f.write('Start') 26 for _ in range(10000): 27 f.write('*') 28 f.write('End') 29 30 31def _ReadFileWithSharedLockBlockingThenWrite(read_file, write_file): 32 with open(read_file, 'r') as f: 33 lock.AcquireFileLock(f, lock.LOCK_SH) 34 content = f.read() 35 with open(write_file, 'a') as f2: 36 lock.AcquireFileLock(f2, lock.LOCK_EX) 37 f2.write(content) 38 39 40def _ReadFileWithExclusiveLockNonBlocking(target_file, status_file): 41 with open(target_file, 'r') as f: 42 try: 43 lock.AcquireFileLock(f, lock.LOCK_EX | lock.LOCK_NB) 44 with open(status_file, 'w') as f2: 45 f2.write('LockException was not raised') 46 except lock.LockException: 47 with open(status_file, 'w') as f2: 48 f2.write('LockException raised') 49 50 51class FileLockTest(unittest.TestCase): 52 def setUp(self): 53 tf = tempfile.NamedTemporaryFile(delete=False) 54 tf.close() 55 self.temp_file_path = tf.name 56 57 def tearDown(self): 58 os.remove(self.temp_file_path) 59 60 def testExclusiveLock(self): 61 processess = [] 62 for _ in range(10): 63 p = multiprocessing.Process( 64 target=_AppendTextToFile, args=(self.temp_file_path,)) 65 p.start() 66 processess.append(p) 67 for p in processess: 68 p.join() 69 70 # If the file lock works as expected, there should be 10 atomic writes of 71 # 'Start***...***End' to the file in some order, which lead to the final 72 # file content as below. 73 expected_file_content = ''.join((['Start'] + ['*']*10000 + ['End']) * 10) 74 with open(self.temp_file_path, 'r') as f: 75 # Use assertTrue instead of assertEquals since the strings are big, hence 76 # assertEquals's assertion failure will contain huge strings. 77 self.assertTrue(expected_file_content == f.read()) 78 79 def testSharedLock(self): 80 tf = tempfile.NamedTemporaryFile(delete=False) 81 tf.close() 82 temp_write_file = tf.name 83 try: 84 with open(self.temp_file_path, 'w') as f: 85 f.write('0123456789') 86 with open(self.temp_file_path, 'r') as f: 87 # First, acquire a shared lock on temp_file_path 88 lock.AcquireFileLock(f, lock.LOCK_SH) 89 90 processess = [] 91 # Create 10 processes that also try to acquire shared lock from 92 # temp_file_path then append temp_file_path's content to temp_write_file 93 for _ in range(10): 94 p = multiprocessing.Process( 95 target=_ReadFileWithSharedLockBlockingThenWrite, 96 args=(self.temp_file_path, temp_write_file)) 97 p.start() 98 processess.append(p) 99 for p in processess: 100 p.join() 101 102 # temp_write_file should contains 10 copy of temp_file_path's content. 103 with open(temp_write_file, 'r') as f: 104 self.assertEquals('0123456789'*10, f.read()) 105 finally: 106 os.remove(temp_write_file) 107 108 def testNonBlockingLockAcquiring(self): 109 tf = tempfile.NamedTemporaryFile(delete=False) 110 tf.close() 111 temp_status_file = tf.name 112 try: 113 with open(self.temp_file_path, 'w') as f: 114 lock.AcquireFileLock(f, lock.LOCK_EX) 115 p = multiprocessing.Process( 116 target=_ReadFileWithExclusiveLockNonBlocking, 117 args=(self.temp_file_path, temp_status_file)) 118 p.start() 119 p.join() 120 with open(temp_status_file, 'r') as f: 121 self.assertEquals('LockException raised', f.read()) 122 finally: 123 os.remove(temp_status_file) 124 125 def testUnlockBeforeClosingFile(self): 126 tf = tempfile.NamedTemporaryFile(delete=False) 127 tf.close() 128 temp_status_file = tf.name 129 try: 130 with open(self.temp_file_path, 'r') as f: 131 lock.AcquireFileLock(f, lock.LOCK_SH) 132 lock.ReleaseFileLock(f) 133 p = multiprocessing.Process( 134 target=_ReadFileWithExclusiveLockNonBlocking, 135 args=(self.temp_file_path, temp_status_file)) 136 p.start() 137 p.join() 138 with open(temp_status_file, 'r') as f: 139 self.assertEquals('LockException was not raised', f.read()) 140 finally: 141 os.remove(temp_status_file) 142 143 def testContextualLock(self): 144 tf = tempfile.NamedTemporaryFile(delete=False) 145 tf.close() 146 temp_status_file = tf.name 147 try: 148 with open(self.temp_file_path, 'r') as f: 149 with lock.FileLock(f, lock.LOCK_EX): 150 # Within this block, accessing self.temp_file_path from another 151 # process should raise exception. 152 p = multiprocessing.Process( 153 target=_ReadFileWithExclusiveLockNonBlocking, 154 args=(self.temp_file_path, temp_status_file)) 155 p.start() 156 p.join() 157 with open(temp_status_file, 'r') as f: 158 self.assertEquals('LockException raised', f.read()) 159 160 # Accessing self.temp_file_path here should not raise exception. 161 p = multiprocessing.Process( 162 target=_ReadFileWithExclusiveLockNonBlocking, 163 args=(self.temp_file_path, temp_status_file)) 164 p.start() 165 p.join() 166 with open(temp_status_file, 'r') as f: 167 self.assertEquals('LockException was not raised', f.read()) 168 finally: 169 os.remove(temp_status_file) 170