1# Copyright 2021 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================ 15 16import numpy as np 17import pytest 18 19import mindspore.context as context 20import mindspore.nn as nn 21from mindspore import Tensor 22from mindspore.common.api import ms_function 23from mindspore.common.parameter import Parameter 24from mindspore.ops import operations as P 25import mindspore as ms 26 27 28class RLBufferAppend(nn.Cell): 29 def __init__(self, capcity, shapes, types): 30 super(RLBufferAppend, self).__init__() 31 self._capacity = capcity 32 self.count = Parameter(Tensor(0, ms.int32), name="count") 33 self.head = Parameter(Tensor(0, ms.int32), name="head") 34 self.buffer_append = P.BufferAppend(self._capacity, shapes, types) 35 36 @ms_function 37 def construct(self, buffer, exps): 38 return self.buffer_append(buffer, exps, self.count, self.head) 39 40 41class RLBufferGet(nn.Cell): 42 def __init__(self, capcity, shapes, types): 43 super(RLBufferGet, self).__init__() 44 self._capacity = capcity 45 self.count = Parameter(Tensor(5, ms.int32), name="count") 46 self.head = Parameter(Tensor(0, ms.int32), name="head") 47 self.buffer_get = P.BufferGetItem(self._capacity, shapes, types) 48 49 @ms_function 50 def construct(self, buffer, index): 51 return self.buffer_get(buffer, self.count, self.head, index) 52 53 54class RLBufferSample(nn.Cell): 55 def __init__(self, capcity, batch_size, shapes, types): 56 super(RLBufferSample, self).__init__() 57 self._capacity = capcity 58 self.count = Parameter(Tensor(5, ms.int32), name="count") 59 self.head = Parameter(Tensor(0, ms.int32), name="head") 60 self.buffer_sample = P.BufferSample(self._capacity, batch_size, shapes, types) 61 62 @ms_function 63 def construct(self, buffer): 64 return self.buffer_sample(buffer, self.count, self.head) 65 66 67states = Tensor(np.arange(4*5).reshape(5, 4).astype(np.float32)/10.0) 68actions = Tensor(np.arange(2*5).reshape(5, 2).astype(np.int32)) 69rewards = Tensor(np.ones((5, 1)).astype(np.int32)) 70states_ = Tensor(np.arange(4*5).reshape(5, 4).astype(np.float32)) 71b = [states, actions, rewards, states_] 72 73s = Tensor(np.array([2, 2, 2, 2]), ms.float32) 74a = Tensor(np.array([0, 0]), ms.int32) 75r = Tensor(np.array([0]), ms.int32) 76s_ = Tensor(np.array([3, 3, 3, 3]), ms.float32) 77exp = [s, a, r, s_] 78exp1 = [s_, a, r, s] 79 80c = [Tensor(np.array([[6, 6, 6, 6], [6, 6, 6, 6]]), ms.float32), 81 Tensor(np.array([[6, 6], [6, 6]]), ms.int32), 82 Tensor(np.array([[6], [6]]), ms.int32), 83 Tensor(np.array([[6, 6, 6, 6], [6, 6, 6, 6]]), ms.float32)] 84 85@ pytest.mark.level0 86@ pytest.mark.platform_x86_cpu 87@ pytest.mark.env_onecard 88def test_BufferSample(): 89 context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU') 90 buffer_sample = RLBufferSample(capcity=5, batch_size=3, shapes=[(4,), (2,), (1,), (4,)], types=[ 91 ms.float32, ms.int32, ms.int32, ms.float32]) 92 ss, aa, rr, ss_ = buffer_sample(b) 93 print(ss, aa, rr, ss_) 94 95 96@ pytest.mark.level0 97@ pytest.mark.platform_x86_cpu 98@ pytest.mark.env_onecard 99def test_BufferGet(): 100 context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU') 101 buffer_get = RLBufferGet(capcity=5, shapes=[(4,), (2,), (1,), (4,)], types=[ 102 ms.float32, ms.int32, ms.int32, ms.float32]) 103 ss, aa, rr, ss_ = buffer_get(b, 1) 104 expect_s = [0.4, 0.5, 0.6, 0.7] 105 expect_a = [2, 3] 106 expect_r = [1] 107 expect_s_ = [4, 5, 6, 7] 108 np.testing.assert_almost_equal(ss.asnumpy(), expect_s) 109 np.testing.assert_almost_equal(aa.asnumpy(), expect_a) 110 np.testing.assert_almost_equal(rr.asnumpy(), expect_r) 111 np.testing.assert_almost_equal(ss_.asnumpy(), expect_s_) 112 113 114@ pytest.mark.level0 115@ pytest.mark.platform_x86_cpu 116@ pytest.mark.env_onecard 117def test_BufferAppend(): 118 context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU') 119 buffer_append = RLBufferAppend(capcity=5, shapes=[(4,), (2,), (1,), (4,)], types=[ 120 ms.float32, ms.int32, ms.int32, ms.float32]) 121 122 buffer_append(b, exp) 123 buffer_append(b, exp) 124 buffer_append(b, exp) 125 buffer_append(b, exp) 126 buffer_append(b, exp) 127 buffer_append(b, exp1) 128 expect_s = [[3, 3, 3, 3], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2]] 129 expect_a = [[0, 0], [0, 0], [0, 0], [0, 0], [0, 0]] 130 expect_r = [[0], [0], [0], [0], [0]] 131 expect_s_ = [[2, 2, 2, 2], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3]] 132 np.testing.assert_almost_equal(b[0].asnumpy(), expect_s) 133 np.testing.assert_almost_equal(b[1].asnumpy(), expect_a) 134 np.testing.assert_almost_equal(b[2].asnumpy(), expect_r) 135 np.testing.assert_almost_equal(b[3].asnumpy(), expect_s_) 136 buffer_append(b, exp1) 137 buffer_append(b, c) 138 buffer_append(b, c) 139 expect_s2 = [[6, 6, 6, 6], [3, 3, 3, 3], [6, 6, 6, 6], [6, 6, 6, 6], [6, 6, 6, 6]] 140 expect_a2 = [[6, 6], [0, 0], [6, 6], [6, 6], [6, 6]] 141 expect_r2 = [[6], [0], [6], [6], [6]] 142 expect_s2_ = [[6, 6, 6, 6], [2, 2, 2, 2], [6, 6, 6, 6], [6, 6, 6, 6], [6, 6, 6, 6]] 143 np.testing.assert_almost_equal(b[0].asnumpy(), expect_s2) 144 np.testing.assert_almost_equal(b[1].asnumpy(), expect_a2) 145 np.testing.assert_almost_equal(b[2].asnumpy(), expect_r2) 146 np.testing.assert_almost_equal(b[3].asnumpy(), expect_s2_) 147