1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2021 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import heapq 18import itertools 19import operator 20 21 22class BlocksManager(object): 23 """ 24 blocks manager 25 """ 26 27 def __init__(self, range_data=None): 28 self.monotonic = False 29 if isinstance(range_data, str): 30 self.__parse_data_text(range_data) 31 elif range_data: 32 if len(range_data) % 2 != 0: 33 raise RuntimeError 34 self.range_data = tuple(self.__remove_repeated_pairs(range_data)) 35 self.monotonic = all( 36 x < y for x, y in zip(self.range_data, self.range_data[1:])) 37 else: 38 self.range_data = () 39 40 def __iter__(self): 41 for i in range(0, len(self.range_data), 2): 42 yield self.range_data[i:i + 2] 43 44 def __eq__(self, other): 45 return self.range_data == other.range_data 46 47 def __ne__(self, other): 48 return self.range_data != other.range_data 49 50 def __parse_data_text(self, text): 51 """ 52 Parse data from text content. 53 """ 54 data = [] 55 monotonic = True 56 last = -1 57 for split_content in text.split(): 58 if "-" in split_content: 59 start_value, end_value = \ 60 (int(n) for n in split_content.split("-")) 61 data.append(start_value) 62 data.append(end_value + 1) 63 if last <= start_value <= end_value: 64 last = end_value 65 else: 66 monotonic = False 67 else: 68 int_content = int(split_content) 69 data.append(int_content) 70 data.append(int_content + 1) 71 if last <= int_content: 72 last = int_content + 1 73 else: 74 monotonic = False 75 data.sort() 76 self.range_data = tuple(self.__remove_repeated_pairs(data)) 77 self.monotonic = monotonic 78 79 @staticmethod 80 def __remove_repeated_pairs(source): 81 """ 82 Remove repeated blocks. 83 """ 84 new = None 85 for num in source: 86 if num == new: 87 new = None 88 else: 89 if new is not None: 90 yield new 91 new = num 92 if new is not None: 93 yield new 94 95 def to_string_raw(self): 96 if len(self.range_data) == 0: 97 raise RuntimeError 98 return "".join([str(len(self.range_data)), ",", ",".join( 99 str(i) for i in self.range_data)]) 100 101 def get_union_with_other(self, other): 102 """ 103 Obtain the intersection. 104 """ 105 range_a = self.get_subtract_with_other(other) 106 range_b = other.get_subtract_with_other(self) 107 range_c = self.get_intersect_with_other(other) 108 range_e, range_f, range_g = \ 109 list(range_a.range_data), list(range_b.range_data), list( 110 range_c.range_data) 111 range_d = [] 112 range_d.extend(range_e) 113 range_d.extend(range_f) 114 range_d.extend(range_g) 115 range_d.sort() 116 return BlocksManager(range_data=range_d) 117 118 def get_intersect_with_other(self, other): 119 """ 120 Obtain the intersection. 121 """ 122 other_data, data, new_data = list(self.range_data), list( 123 other.range_data), [] 124 for i in range(len(data) // 2): 125 for j in range(len(other_data) // 2): 126 data_list1 = [data[i * 2], data[i * 2 + 1], other_data[j * 2], 127 other_data[j * 2 + 1]] 128 data_list2 = [other_data[j * 2], other_data[j * 2 + 1], 129 data[i * 2], data[i * 2 + 1]] 130 sort_list = [data[i * 2], data[i * 2 + 1], other_data[j * 2], 131 other_data[j * 2 + 1]] 132 sort_list.sort() 133 if operator.ne(sort_list, data_list1) and \ 134 operator.ne(sort_list, data_list2): 135 new_data.append(sort_list[1]) 136 new_data.append(sort_list[2]) 137 return BlocksManager(range_data=new_data) 138 139 def get_subtract_with_other(self, other): 140 """ 141 Obtain the difference set. 142 """ 143 intersect_ran = self.get_intersect_with_other(other) 144 data, intersect_data = list(self.range_data), list( 145 intersect_ran.range_data) 146 new_data = data + intersect_data 147 new_data.sort() 148 return BlocksManager(range_data=new_data) 149 150 def is_overlaps(self, other): 151 """ 152 Determine whether there is non-empty overlap. 153 """ 154 intersect_range = self.get_intersect_with_other(other) 155 if intersect_range.size(): 156 return True 157 return False 158 159 def size(self): 160 """ 161 Obtain the self size. 162 """ 163 total = 0 164 data = list(self.range_data) 165 for i in range(len(data) // 2): 166 total += data[i * 2 + 1] - data[i * 2] 167 return total 168 169 def get_map_within(self, other): 170 """ 171 When other is a subset of self, 172 obtain the continuous range starting from 0. 173 :param other: 174 :return: 175 """ 176 out = [] 177 offset = 0 178 start = None 179 for be_num, af_num in \ 180 heapq.merge(zip(self.range_data, itertools.cycle((-5, +5))), 181 zip(other.range_data, itertools.cycle((-1, +1)))): 182 if af_num == -5: 183 start = be_num 184 elif af_num == +5: 185 offset += be_num - start 186 start = None 187 else: 188 out.append(offset + be_num - start) 189 return BlocksManager(range_data=out) 190 191 def extend_value_to_blocks(self, value): 192 """ 193 Extend self 194 :param value: 195 :return: 196 """ 197 data = list(self.range_data) 198 remove_data = [] 199 for i in range(len(data) // 2): 200 data[i * 2 + 1] = data[i * 2 + 1] + value 201 data[i * 2] = max(0, data[i * 2] - value) 202 for i in range(len(data) // 2 - 1): 203 sign_1 = data[i * 2 + 1] 204 sign_2 = data[(i + 1) * 2] 205 if sign_1 >= sign_2: 206 remove_data.append(sign_2) 207 remove_data.append(sign_1) 208 for j in remove_data: 209 data.remove(j) 210 return BlocksManager(data) 211 212 def get_first_block_obj(self, value): 213 """ 214 Return the first range pair containing the value. 215 :param value: 216 :return: 217 """ 218 if self.size() <= value: 219 return self 220 data = list(self.range_data) 221 be_value, af_value = 0, 1 222 for i in range(len(data) // 2): 223 be_value += data[i * 2 + 1] - data[i * 2] 224 if be_value > value: 225 data[i * 2 + 1] = data[i * 2 + 1] - be_value + value 226 break 227 else: 228 af_value += 1 229 return BlocksManager(range_data=data[:af_value * 2]) 230