1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2021 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import heapq 18import itertools 19import operator 20 21 22class BlocksManager(object): 23 """ 24 blocks manager 25 """ 26 27 def __init__(self, range_data=None): 28 self.monotonic = False 29 if isinstance(range_data, str): 30 tmp_range = [] 31 if range_data != "0": 32 for i in range_data.split(): 33 if i != "0": 34 tmp_range.append(i) 35 range_data = ' '.join(tmp_range) 36 self.__parse_data_text(range_data) 37 elif range_data: 38 if len(range_data) % 2 != 0: 39 raise RuntimeError 40 self.range_data = tuple(self.__remove_repeated_pairs(range_data)) 41 self.monotonic = all( 42 x < y for x, y in zip(self.range_data, self.range_data[1:])) 43 else: 44 self.range_data = () 45 46 def __iter__(self): 47 for i in range(0, len(self.range_data), 2): 48 yield self.range_data[i:i + 2] 49 50 def __eq__(self, other): 51 return self.range_data == other.range_data 52 53 def __ne__(self, other): 54 return self.range_data != other.range_data 55 56 def __parse_data_text(self, text): 57 """ 58 Parse data from text content. 59 """ 60 data = [] 61 monotonic = True 62 last = -1 63 for split_content in text.split(): 64 if "-" in split_content: 65 start_value, end_value = \ 66 (int(n) for n in split_content.split("-")) 67 data.append(start_value) 68 data.append(end_value + 1) 69 if last <= start_value <= end_value: 70 last = end_value 71 else: 72 monotonic = False 73 else: 74 int_content = int(split_content) 75 data.append(int_content) 76 data.append(int_content + 1) 77 if last <= int_content: 78 last = int_content + 1 79 else: 80 monotonic = False 81 data.sort() 82 self.range_data = tuple(self.__remove_repeated_pairs(data)) 83 self.monotonic = monotonic 84 85 @staticmethod 86 def __remove_repeated_pairs(source): 87 """ 88 Remove repeated blocks. 89 """ 90 new = None 91 for num in source: 92 if num == new: 93 new = None 94 else: 95 if new is not None: 96 yield new 97 new = num 98 if new is not None: 99 yield new 100 101 def to_string_raw(self): 102 if len(self.range_data) == 0: 103 raise RuntimeError 104 return "".join([str(len(self.range_data)), ",", ",".join( 105 str(i) for i in self.range_data)]) 106 107 def get_union_with_other(self, other): 108 """ 109 Obtain the intersection. 110 """ 111 range_a = self.get_subtract_with_other(other) 112 range_b = other.get_subtract_with_other(self) 113 range_c = self.get_intersect_with_other(other) 114 range_e, range_f, range_g = \ 115 list(range_a.range_data), list(range_b.range_data), list( 116 range_c.range_data) 117 range_d = [] 118 range_d.extend(range_e) 119 range_d.extend(range_f) 120 range_d.extend(range_g) 121 range_d.sort() 122 return BlocksManager(range_data=range_d) 123 124 def get_intersect_with_other(self, other): 125 """ 126 Obtain the intersection. 127 """ 128 other_data, data, new_data = list(self.range_data), list( 129 other.range_data), [] 130 for i in range(len(data) // 2): 131 for j in range(len(other_data) // 2): 132 data_list1 = [data[i * 2], data[i * 2 + 1], other_data[j * 2], 133 other_data[j * 2 + 1]] 134 data_list2 = [other_data[j * 2], other_data[j * 2 + 1], 135 data[i * 2], data[i * 2 + 1]] 136 sort_list = [data[i * 2], data[i * 2 + 1], other_data[j * 2], 137 other_data[j * 2 + 1]] 138 sort_list.sort() 139 if operator.ne(sort_list, data_list1) and \ 140 operator.ne(sort_list, data_list2): 141 new_data.append(sort_list[1]) 142 new_data.append(sort_list[2]) 143 return BlocksManager(range_data=new_data) 144 145 def get_subtract_with_other(self, other): 146 """ 147 Obtain the difference set. 148 """ 149 intersect_ran = self.get_intersect_with_other(other) 150 data, intersect_data = list(self.range_data), list( 151 intersect_ran.range_data) 152 new_data = data + intersect_data 153 new_data.sort() 154 return BlocksManager(range_data=new_data) 155 156 def is_overlaps(self, other): 157 """ 158 Determine whether there is non-empty overlap. 159 """ 160 intersect_range = self.get_intersect_with_other(other) 161 if intersect_range.size(): 162 return True 163 return False 164 165 def size(self): 166 """ 167 Obtain the self size. 168 """ 169 total = 0 170 data = list(self.range_data) 171 for i in range(len(data) // 2): 172 total += data[i * 2 + 1] - data[i * 2] 173 return total 174 175 def get_map_within(self, other): 176 """ 177 When other is a subset of self, 178 obtain the continuous range starting from 0. 179 :param other: 180 :return: 181 """ 182 out = [] 183 offset = 0 184 start = None 185 for be_num, af_num in \ 186 heapq.merge(zip(self.range_data, itertools.cycle((-5, +5))), 187 zip(other.range_data, itertools.cycle((-1, +1)))): 188 if af_num == -5: 189 start = be_num 190 elif af_num == +5: 191 offset += be_num - start 192 start = None 193 else: 194 out.append(offset + be_num - start) 195 return BlocksManager(range_data=out) 196 197 def extend_value_to_blocks(self, value): 198 """ 199 Extend self 200 :param value: 201 :return: 202 """ 203 data = list(self.range_data) 204 remove_data = [] 205 for i in range(len(data) // 2): 206 data[i * 2 + 1] = data[i * 2 + 1] + value 207 data[i * 2] = max(0, data[i * 2] - value) 208 for i in range(len(data) // 2 - 1): 209 sign_1 = data[i * 2 + 1] 210 sign_2 = data[(i + 1) * 2] 211 if sign_1 >= sign_2: 212 remove_data.append(sign_2) 213 remove_data.append(sign_1) 214 for j in remove_data: 215 data.remove(j) 216 return BlocksManager(data) 217 218 def get_first_block_obj(self, value): 219 """ 220 Return the first range pair containing the value. 221 :param value: 222 :return: 223 """ 224 if self.size() <= value: 225 return self 226 data = list(self.range_data) 227 be_value, af_value = 0, 1 228 for i in range(len(data) // 2): 229 be_value += data[i * 2 + 1] - data[i * 2] 230 if be_value > value: 231 data[i * 2 + 1] = data[i * 2 + 1] - be_value + value 232 break 233 else: 234 af_value += 1 235 return BlocksManager(range_data=data[:af_value * 2]) 236