• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2021 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import heapq
18import itertools
19import operator
20
21
22class BlocksManager(object):
23    """
24    blocks manager
25    """
26
27    def __init__(self, range_data=None):
28        self.monotonic = False
29        if isinstance(range_data, str):
30	        tmp_range = []
31            if range_data != "0":
32                for i in range_data.split():
33                    if i != "0":
34                        tmp_range.append(i)
35                range_data = ' '.join(tmp_range)
36            self.__parse_data_text(range_data)
37        elif range_data:
38            if len(range_data) % 2 != 0:
39                raise RuntimeError
40            self.range_data = tuple(self.__remove_repeated_pairs(range_data))
41            self.monotonic = all(
42                x < y for x, y in zip(self.range_data, self.range_data[1:]))
43        else:
44            self.range_data = ()
45
46    def __iter__(self):
47        for i in range(0, len(self.range_data), 2):
48            yield self.range_data[i:i + 2]
49
50    def __eq__(self, other):
51        return self.range_data == other.range_data
52
53    def __ne__(self, other):
54        return self.range_data != other.range_data
55
56    def __parse_data_text(self, text):
57        """
58        Parse data from text content.
59        """
60        data = []
61        monotonic = True
62        last = -1
63        for split_content in text.split():
64            if "-" in split_content:
65                start_value, end_value = \
66                    (int(n) for n in split_content.split("-"))
67                data.append(start_value)
68                data.append(end_value + 1)
69                if last <= start_value <= end_value:
70                    last = end_value
71                else:
72                    monotonic = False
73            else:
74                int_content = int(split_content)
75                data.append(int_content)
76                data.append(int_content + 1)
77                if last <= int_content:
78                    last = int_content + 1
79                else:
80                    monotonic = False
81        data.sort()
82        self.range_data = tuple(self.__remove_repeated_pairs(data))
83        self.monotonic = monotonic
84
85    @staticmethod
86    def __remove_repeated_pairs(source):
87        """
88        Remove repeated blocks.
89        """
90        new = None
91        for num in source:
92            if num == new:
93                new = None
94            else:
95                if new is not None:
96                    yield new
97                new = num
98        if new is not None:
99            yield new
100
101    def to_string_raw(self):
102        if len(self.range_data) == 0:
103            raise RuntimeError
104        return "".join([str(len(self.range_data)), ",", ",".join(
105            str(i) for i in self.range_data)])
106
107    def get_union_with_other(self, other):
108        """
109        Obtain the intersection.
110        """
111        range_a = self.get_subtract_with_other(other)
112        range_b = other.get_subtract_with_other(self)
113        range_c = self.get_intersect_with_other(other)
114        range_e, range_f, range_g = \
115            list(range_a.range_data), list(range_b.range_data), list(
116                range_c.range_data)
117        range_d = []
118        range_d.extend(range_e)
119        range_d.extend(range_f)
120        range_d.extend(range_g)
121        range_d.sort()
122        return BlocksManager(range_data=range_d)
123
124    def get_intersect_with_other(self, other):
125        """
126        Obtain the intersection.
127        """
128        other_data, data, new_data = list(self.range_data), list(
129            other.range_data), []
130        for i in range(len(data) // 2):
131            for j in range(len(other_data) // 2):
132                data_list1 = [data[i * 2], data[i * 2 + 1], other_data[j * 2],
133                              other_data[j * 2 + 1]]
134                data_list2 = [other_data[j * 2], other_data[j * 2 + 1],
135                              data[i * 2], data[i * 2 + 1]]
136                sort_list = [data[i * 2], data[i * 2 + 1], other_data[j * 2],
137                             other_data[j * 2 + 1]]
138                sort_list.sort()
139                if operator.ne(sort_list, data_list1) and \
140                        operator.ne(sort_list, data_list2):
141                    new_data.append(sort_list[1])
142                    new_data.append(sort_list[2])
143        return BlocksManager(range_data=new_data)
144
145    def get_subtract_with_other(self, other):
146        """
147        Obtain the difference set.
148        """
149        intersect_ran = self.get_intersect_with_other(other)
150        data, intersect_data = list(self.range_data), list(
151            intersect_ran.range_data)
152        new_data = data + intersect_data
153        new_data.sort()
154        return BlocksManager(range_data=new_data)
155
156    def is_overlaps(self, other):
157        """
158        Determine whether there is non-empty overlap.
159        """
160        intersect_range = self.get_intersect_with_other(other)
161        if intersect_range.size():
162            return True
163        return False
164
165    def size(self):
166        """
167        Obtain the self size.
168        """
169        total = 0
170        data = list(self.range_data)
171        for i in range(len(data) // 2):
172            total += data[i * 2 + 1] - data[i * 2]
173        return total
174
175    def get_map_within(self, other):
176        """
177        When other is a subset of self,
178        obtain the continuous range starting from 0.
179        :param other:
180        :return:
181        """
182        out = []
183        offset = 0
184        start = None
185        for be_num, af_num in \
186                heapq.merge(zip(self.range_data, itertools.cycle((-5, +5))),
187                            zip(other.range_data, itertools.cycle((-1, +1)))):
188            if af_num == -5:
189                start = be_num
190            elif af_num == +5:
191                offset += be_num - start
192                start = None
193            else:
194                out.append(offset + be_num - start)
195        return BlocksManager(range_data=out)
196
197    def extend_value_to_blocks(self, value):
198        """
199        Extend self
200        :param value:
201        :return:
202        """
203        data = list(self.range_data)
204        remove_data = []
205        for i in range(len(data) // 2):
206            data[i * 2 + 1] = data[i * 2 + 1] + value
207            data[i * 2] = max(0, data[i * 2] - value)
208        for i in range(len(data) // 2 - 1):
209            sign_1 = data[i * 2 + 1]
210            sign_2 = data[(i + 1) * 2]
211            if sign_1 >= sign_2:
212                remove_data.append(sign_2)
213                remove_data.append(sign_1)
214        for j in remove_data:
215            data.remove(j)
216        return BlocksManager(data)
217
218    def get_first_block_obj(self, value):
219        """
220        Return the first range pair containing the value.
221        :param value:
222        :return:
223        """
224        if self.size() <= value:
225            return self
226        data = list(self.range_data)
227        be_value, af_value = 0, 1
228        for i in range(len(data) // 2):
229            be_value += data[i * 2 + 1] - data[i * 2]
230            if be_value > value:
231                data[i * 2 + 1] = data[i * 2 + 1] - be_value + value
232                break
233            else:
234                af_value += 1
235        return BlocksManager(range_data=data[:af_value * 2])
236