• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import heapq
18import itertools
19import operator
20
21
22class BlocksManager(object):
23    """
24    blocks manager
25    """
26
27    def __init__(self, range_data=None):
28        self.monotonic = False
29        if isinstance(range_data, str):
30            self.__parse_data_text(range_data)
31        elif range_data:
32            if len(range_data) % 2 != 0:
33                raise RuntimeError
34            self.range_data = tuple(self.__remove_repeated_pairs(range_data))
35            self.monotonic = all(
36                x < y for x, y in zip(self.range_data, self.range_data[1:]))
37        else:
38            self.range_data = ()
39
40    def __iter__(self):
41        for i in range(0, len(self.range_data), 2):
42            yield self.range_data[i:i + 2]
43
44    def __eq__(self, other):
45        return self.range_data == other.range_data
46
47    def __ne__(self, other):
48        return self.range_data != other.range_data
49
50    def __parse_data_text(self, text):
51        """
52        Parse data from text content.
53        """
54        data = []
55        monotonic = True
56        last = -1
57        for split_content in text.split():
58            if "-" in split_content:
59                start_value, end_value = \
60                    (int(n) for n in split_content.split("-"))
61                data.append(start_value)
62                data.append(end_value + 1)
63                if last <= start_value <= end_value:
64                    last = end_value
65                else:
66                    monotonic = False
67            else:
68                int_content = int(split_content)
69                data.append(int_content)
70                data.append(int_content + 1)
71                if last <= int_content:
72                    last = int_content + 1
73                else:
74                    monotonic = False
75        data.sort()
76        self.range_data = tuple(self.__remove_repeated_pairs(data))
77        self.monotonic = monotonic
78
79    @staticmethod
80    def __remove_repeated_pairs(source):
81        """
82        Remove repeated blocks.
83        """
84        new = None
85        for num in source:
86            if num == new:
87                new = None
88            else:
89                if new is not None:
90                    yield new
91                new = num
92        if new is not None:
93            yield new
94
95    def to_string_raw(self):
96        if len(self.range_data) == 0:
97            raise RuntimeError
98        return "".join([str(len(self.range_data)), ",", ",".join(
99            str(i) for i in self.range_data)])
100
101    def get_union_with_other(self, other):
102        """
103        Obtain the intersection.
104        """
105        range_a = self.get_subtract_with_other(other)
106        range_b = other.get_subtract_with_other(self)
107        range_c = self.get_intersect_with_other(other)
108        range_e, range_f, range_g = \
109            list(range_a.range_data), list(range_b.range_data), list(
110                range_c.range_data)
111        range_d = []
112        range_d.extend(range_e)
113        range_d.extend(range_f)
114        range_d.extend(range_g)
115        range_d.sort()
116        return BlocksManager(range_data=range_d)
117
118    def get_intersect_with_other(self, other):
119        """
120        Obtain the intersection.
121        """
122        other_data, data, new_data = list(self.range_data), list(
123            other.range_data), []
124        for i in range(len(data) // 2):
125            for j in range(len(other_data) // 2):
126                data_list1 = [data[i * 2], data[i * 2 + 1], other_data[j * 2],
127                              other_data[j * 2 + 1]]
128                data_list2 = [other_data[j * 2], other_data[j * 2 + 1],
129                              data[i * 2], data[i * 2 + 1]]
130                sort_list = [data[i * 2], data[i * 2 + 1], other_data[j * 2],
131                             other_data[j * 2 + 1]]
132                sort_list.sort()
133                if operator.ne(sort_list, data_list1) and \
134                        operator.ne(sort_list, data_list2):
135                    new_data.append(sort_list[1])
136                    new_data.append(sort_list[2])
137        return BlocksManager(range_data=new_data)
138
139    def get_subtract_with_other(self, other):
140        """
141        Obtain the difference set.
142        """
143        intersect_ran = self.get_intersect_with_other(other)
144        data, intersect_data = list(self.range_data), list(
145            intersect_ran.range_data)
146        new_data = data + intersect_data
147        new_data.sort()
148        return BlocksManager(range_data=new_data)
149
150    def is_overlaps(self, other):
151        """
152        Determine whether there is non-empty overlap.
153        """
154        intersect_range = self.get_intersect_with_other(other)
155        if intersect_range.size():
156            return True
157        return False
158
159    def size(self):
160        """
161        Obtain the self size.
162        """
163        total = 0
164        data = list(self.range_data)
165        for i in range(len(data) // 2):
166            total += data[i * 2 + 1] - data[i * 2]
167        return total
168
169    def get_map_within(self, other):
170        """
171        When other is a subset of self,
172        obtain the continuous range starting from 0.
173        :param other:
174        :return:
175        """
176        out = []
177        offset = 0
178        start = None
179        for be_num, af_num in \
180                heapq.merge(zip(self.range_data, itertools.cycle((-5, +5))),
181                            zip(other.range_data, itertools.cycle((-1, +1)))):
182            if af_num == -5:
183                start = be_num
184            elif af_num == +5:
185                offset += be_num - start
186                start = None
187            else:
188                out.append(offset + be_num - start)
189        return BlocksManager(range_data=out)
190
191    def extend_value_to_blocks(self, value):
192        """
193        Extend self
194        :param value:
195        :return:
196        """
197        data = list(self.range_data)
198        remove_data = []
199        for i in range(len(data) // 2):
200            data[i * 2 + 1] = data[i * 2 + 1] + value
201            data[i * 2] = max(0, data[i * 2] - value)
202        for i in range(len(data) // 2 - 1):
203            sign_1 = data[i * 2 + 1]
204            sign_2 = data[(i + 1) * 2]
205            if sign_1 >= sign_2:
206                remove_data.append(sign_2)
207                remove_data.append(sign_1)
208        for j in remove_data:
209            data.remove(j)
210        return BlocksManager(data)
211
212    def get_first_block_obj(self, value):
213        """
214        Return the first range pair containing the value.
215        :param value:
216        :return:
217        """
218        if self.size() <= value:
219            return self
220        data = list(self.range_data)
221        be_value, af_value = 0, 1
222        for i in range(len(data) // 2):
223            be_value += data[i * 2 + 1] - data[i * 2]
224            if be_value > value:
225                data[i * 2 + 1] = data[i * 2 + 1] - be_value + value
226                break
227            else:
228                af_value += 1
229        return BlocksManager(range_data=data[:af_value * 2])
230