# Copyright 2021-2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""utility functions for mindspore.scipy st tests"""
import platform
from typing import List
from functools import cmp_to_key

import numpy as onp
import scipy.sparse.linalg
from scipy.linalg import eigvals
from mindspore import Tensor, CSRTensor
import mindspore.ops as ops
import mindspore.numpy as mnp
from mindspore.common import dtype as mstype


def to_tensor(obj, dtype=None, indice_dtype=onp.int32):
    """
    Initialize a Tensor or CSRTensor from ``obj``.

    'obj' can be one of three types:
        1. tuple or list
            Must have the format (list, str), where str is 'Tensor' or 'CSRTensor'.
        2. numpy.ndarray
        3. scipy.sparse.csr_matrix

    Args:
        obj: The data to convert (see above).
        dtype: Target dtype of the values. Defaults to ``obj.dtype`` when None.
        indice_dtype: dtype used for the ``indptr`` and ``indices`` of a CSRTensor.

    Returns:
        A Tensor for ndarray input, a CSRTensor for csr_matrix input; any other
        object is returned unchanged.
    """
    if isinstance(obj, (tuple, list)):
        obj, tensor_type = obj
        if tensor_type == "Tensor":
            obj = onp.array(obj)
        elif tensor_type == "CSRTensor":
            obj = scipy.sparse.csr_matrix(obj)

    if dtype is None:
        dtype = obj.dtype

    if isinstance(obj, onp.ndarray):
        obj = Tensor(obj.astype(dtype))
    elif isinstance(obj, scipy.sparse.csr_matrix):
        obj = CSRTensor(indptr=Tensor(obj.indptr.astype(indice_dtype)),
                        indices=Tensor(obj.indices.astype(indice_dtype)),
                        values=Tensor(obj.data.astype(dtype)),
                        shape=obj.shape)

    return obj


def to_ndarray(obj, dtype=None):
    """
    Convert a Tensor or CSRTensor to a dense numpy array.

    A CSRTensor is first rebuilt as a scipy csr_matrix and then densified.
    Objects of any other type are passed through (and only cast when ``dtype``
    is given).

    Args:
        obj: Tensor, CSRTensor, or an object that already supports ``astype``.
        dtype: Optional dtype to cast the result to.

    Returns:
        The converted (and optionally cast) object.
    """
    if isinstance(obj, Tensor):
        obj = obj.asnumpy()
    elif isinstance(obj, CSRTensor):
        obj = scipy.sparse.csr_matrix((obj.values.asnumpy(), obj.indices.asnumpy(), obj.indptr.asnumpy()),
                                      shape=obj.shape)
        obj = obj.toarray()

    if dtype is not None:
        obj = obj.astype(dtype)
    return obj


def match_array(actual, expected, error=0, err_msg=''):
    """
    Assert that two arrays match, exactly or to a number of decimals.

    Args:
        actual: Value under test; a plain int is wrapped into an ndarray.
        expected: Reference value; an int or tuple is wrapped into an ndarray.
        error: When > 0, passed as ``decimal`` to ``assert_almost_equal``;
            otherwise an exact ``assert_equal`` comparison is performed.
        err_msg: Message forwarded to the numpy assertion.
    """
    if isinstance(actual, int):
        actual = onp.asarray(actual)

    if isinstance(expected, (int, tuple)):
        expected = onp.asarray(expected)

    if error > 0:
        onp.testing.assert_almost_equal(actual, expected, decimal=error, err_msg=err_msg)
    else:
        onp.testing.assert_equal(actual, expected, err_msg=err_msg)


def match_matrix(actual, expected, error=0, err_msg=''):
    """
    Check that two tensors have the same shape and are element-wise close.

    Args:
        actual: Tensor under test.
        expected: Reference tensor.
        error: Maximum allowed absolute element-wise difference.
        err_msg: Text prepended to the error message on failure.

    Raises:
        ValueError: If the shapes differ, or if the largest absolute difference
            exceeds ``error``.
    """
    if actual.shape != expected.shape:
        # Prefix err_msg once; str.join would interleave it between every
        # character of the message.
        raise ValueError(
            f"{err_msg} actual shape {actual.shape} is not equal to expected input shape {expected.shape}")

    sub_abs = mnp.abs(mnp.subtract(actual, expected))
    no_zero_max = sub_abs.max()
    if no_zero_max > Tensor(error, dtype=mstype.float64):
        raise ValueError(
            f"{err_msg} actual value: {actual} is not equal to expected input value: {expected}")


def create_full_rank_matrix(shape, dtype):
    """
    Create a random invertible square matrix (or batch of them).

    Samples from ``onp.random.random`` until ``onp.linalg.inv`` succeeds.

    Args:
        shape: Target shape; needs at least two dims and equal last two dims.
        dtype: dtype of the returned array.

    Returns:
        A numpy array of the requested shape and dtype.

    Raises:
        ValueError: If ``shape`` does not describe square matrices.
    """
    if len(shape) < 2 or shape[-1] != shape[-2]:
        raise ValueError(
            'Full rank matrix must be a square matrix, but has shape: ', shape)

    invertible = False
    a = None
    while not invertible:
        a = onp.random.random(shape).astype(dtype)
        try:
            onp.linalg.inv(a)
            invertible = True
        except onp.linalg.LinAlgError:
            # Singular sample: draw again.
            pass

    return a


def create_random_rank_matrix(shape, dtype):
    """
    Create a random array with no rank guarantee.

    Complex dtypes get real and imaginary parts drawn uniformly from [-1, 1);
    int32/int64 get integers from [0, 10000); every other dtype gets values
    from ``onp.random.random``.

    Args:
        shape: Shape of the array to create.
        dtype: dtype of the returned array.

    Returns:
        A numpy array of the requested shape and dtype.
    """
    if dtype in [onp.complex64, onp.complex128]:
        random_data = onp.random.uniform(low=-1.0, high=1.0, size=shape).astype(dtype)
        random_data += 1j * onp.random.uniform(low=-1.0, high=1.0, size=shape).astype(dtype)
    elif dtype in [onp.int32, onp.int64]:
        random_data = onp.random.randint(10000, size=shape).astype(dtype)
    else:
        random_data = onp.random.random(shape).astype(dtype)
    return random_data


def create_sym_pos_matrix(shape, dtype):
    """
    Create a random symmetric positive definite matrix.

    Builds ``x @ x.T + I`` from a random ``x`` and accepts it once all of its
    eigenvalues are positive, giving up after 100 attempts.

    Args:
        shape: A 2-D square shape.
        dtype: dtype of the returned array.

    Returns:
        A numpy array of the requested shape and dtype.

    Raises:
        ValueError: If ``shape`` is not 2-D square, or if no positive definite
            matrix was produced within 100 attempts.
    """
    if len(shape) != 2 or shape[0] != shape[1]:
        raise ValueError(
            'Symmetric positive definite matrix must be a square matrix, but has shape: ', shape)

    n = shape[-1]
    count = 0
    while count < 100:
        x = onp.random.random(shape).astype(dtype)
        a = (onp.matmul(x, x.T) + onp.eye(n)).astype(dtype)
        count += 1
        if onp.min(eigvals(a)) > 0:
            return a
    raise ValueError('Symmetric positive definite matrix create failed')


def gradient_check(x, net, epsilon=1e-3, symmetric=False, enumerate_fn=onp.ndenumerate):
    """
    Compare the autodiff gradient of ``net`` with a central finite difference.

    Args:
        x: A Tensor or a list of Tensors fed to ``net``.
        net: Callable whose output supports ``asnumpy()``.
        epsilon: Step size of the central difference.
        symmetric: When True, each approximated gradient ``g`` is replaced by
            ``0.5 * (g + g.conj().T)`` before comparison.
        enumerate_fn: Callable yielding ``(index, value)`` pairs for every
            element of an input that should be perturbed.

    Returns:
        ``||g - g_approx|| / (||g|| + ||g_approx||)``, or 0.0 when both
        gradients are entirely zero.
    """
    # Some utils
    def _tensor_to_numpy(arg: List[Tensor]) -> List[onp.ndarray]:
        return [_arg.asnumpy() for _arg in arg]

    def _numpy_to_tensor(arg: List[onp.ndarray]) -> List[Tensor]:
        return [Tensor(_arg) for _arg in arg]

    def _add_value(arg: List[onp.ndarray], outer, inner, value):
        arg[outer][inner] += value
        return arg

    def _flatten(arg: List[onp.ndarray]) -> onp.ndarray:
        arg = [_arg.reshape((-1,)) for _arg in arg]
        return onp.concatenate(arg)

    if isinstance(x, Tensor):
        x = [x]

    # Using automatic differentiation to calculate gradient
    grad_net = ops.GradOperation(get_all=True)(net)
    x_grad = grad_net(*x)
    x_grad = _tensor_to_numpy(x_grad)

    # Using the definition of a derivative to calculate gradient
    x = _tensor_to_numpy(x)
    x_grad_approx = [onp.zeros_like(_x) for _x in x_grad]
    for outer, _x in enumerate(x):
        for inner, _ in enumerate_fn(_x):
            # Evaluate at x + eps, then x - eps, then restore x.
            x = _add_value(x, outer, inner, epsilon)
            y_plus = net(*_numpy_to_tensor(x)).asnumpy()

            x = _add_value(x, outer, inner, -2 * epsilon)
            y_minus = net(*_numpy_to_tensor(x)).asnumpy()

            y_grad = (y_plus - y_minus) / (2 * epsilon)
            x = _add_value(x, outer, inner, epsilon)
            x_grad_approx = _add_value(x_grad_approx, outer, inner, y_grad)

    if symmetric:
        x_grad_approx = [0.5 * (_x_grad + _x_grad.conj().T) for _x_grad in x_grad_approx]

    x_grad = _flatten(x_grad)
    x_grad_approx = _flatten(x_grad_approx)
    numerator = onp.linalg.norm(x_grad - x_grad_approx)
    denominator = onp.linalg.norm(x_grad) + onp.linalg.norm(x_grad_approx)
    if denominator == 0:
        # Both gradients are identically zero, so they agree exactly; avoid 0/0 = nan.
        return 0.0
    difference = numerator / denominator
    return difference


def compare_eigen_decomposition(src_res, tgt_res, compute_v, rtol, atol):
    """
    Assert that two eigen-decomposition results agree.

    Eigenvalues of both results are sorted with the same ordering before being
    compared. When ``compute_v`` is set, the eigenvectors are permuted the same
    way and ``src_res`` vectors are rescaled by a unit-modulus phase factor
    before comparison.

    Args:
        src_res: Sequence whose item 0 holds eigenvalues and, when
            ``compute_v`` is set, item 1 holds eigenvectors.
        tgt_res: Same layout as ``src_res``.
        compute_v: Whether eigenvectors should be compared as well.
        rtol: Relative tolerance for ``onp.allclose``; also the threshold under
            which two real parts are treated as equal while sorting.
        atol: Absolute tolerance for ``onp.allclose``.

    Raises:
        AssertionError: If eigenvalues (or eigenvectors) do not match.
    """
    def my_argsort(w):
        """
        Sort eigenvalues, by comparing the real part first, and then the imaginary part
        when the real parts are comparatively the same (differ by less than rtol).
        """

        def my_cmp(x_id, y_id):
            x = w[x_id]
            y = w[y_id]
            if abs(onp.real(x) - onp.real(y)) < rtol:
                return onp.imag(x) - onp.imag(y)
            return onp.real(x) - onp.real(y)

        w_ind = list(range(len(w)))
        w_ind.sort(key=cmp_to_key(my_cmp))
        return w_ind

    sw, mw = src_res[0], tgt_res[0]
    s_perm = my_argsort(sw)
    m_perm = my_argsort(mw)
    sw = onp.take(sw, s_perm, -1)
    mw = onp.take(mw, m_perm, -1)
    assert onp.allclose(sw, mw, rtol=rtol, atol=atol)

    if compute_v:
        sv, mv = src_res[1], tgt_res[1]
        sv = onp.take(sv, s_perm, -1)
        mv = onp.take(mv, m_perm, -1)

        # Normalize eigenvectors.
        phases = onp.sum(sv.conj() * mv, -2, keepdims=True)
        sv = phases / onp.abs(phases) * sv
        assert onp.allclose(sv, mv, rtol=rtol, atol=atol)


def get_platform():
    """Return the lower-cased operating system name from ``platform.system()``."""
    return platform.system().lower()