# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""GPU tests for the BufferAppend, BufferGetItem and BufferSample operators."""

import numpy as np
import pytest

import mindspore.context as context
import mindspore.nn as nn
from mindspore import Tensor
from mindspore.common.api import ms_function
from mindspore.common.parameter import Parameter
from mindspore.ops import operations as P
import mindspore as ms


class RLBufferAppend(nn.Cell):
    """Cell wrapping ``P.BufferAppend`` with its own ``count``/``head`` state.

    Args:
        capcity: Capacity forwarded to the operator. The misspelt keyword is
            kept because callers pass it by name.
        shapes: Per-field element shapes forwarded to the operator.
        types: Per-field dtypes forwarded to the operator.
    """

    def __init__(self, capcity, shapes, types):
        super().__init__()
        self._capacity = capcity
        # Both counters start at 0 for the append cell.
        self.count = Parameter(Tensor(0, ms.int32), name="count")
        self.head = Parameter(Tensor(0, ms.int32), name="head")
        self.buffer_append = P.BufferAppend(self._capacity, shapes, types)

    @ms_function
    def construct(self, buffer, exps):
        """Call the append operator on ``buffer`` with the experience ``exps``."""
        return self.buffer_append(buffer, exps, self.count, self.head)


class RLBufferGet(nn.Cell):
    """Cell wrapping ``P.BufferGetItem``; ``count`` is 5 and ``head`` is 0.

    Args:
        capcity: Capacity forwarded to the operator (keyword spelling kept
            for callers).
        shapes: Per-field element shapes forwarded to the operator.
        types: Per-field dtypes forwarded to the operator.
    """

    def __init__(self, capcity, shapes, types):
        super().__init__()
        self._capacity = capcity
        self.count = Parameter(Tensor(5, ms.int32), name="count")
        self.head = Parameter(Tensor(0, ms.int32), name="head")
        self.buffer_get = P.BufferGetItem(self._capacity, shapes, types)

    @ms_function
    def construct(self, buffer, index):
        """Call the get-item operator on ``buffer`` for position ``index``."""
        return self.buffer_get(buffer, self.count, self.head, index)


class RLBufferSample(nn.Cell):
    """Cell wrapping ``P.BufferSample``; ``count`` is 5 and ``head`` is 0.

    Args:
        capcity: Capacity forwarded to the operator (keyword spelling kept
            for callers).
        batch_size: Batch size forwarded to the operator.
        shapes: Per-field element shapes forwarded to the operator.
        types: Per-field dtypes forwarded to the operator.
    """

    def __init__(self, capcity, batch_size, shapes, types):
        super().__init__()
        self._capacity = capcity
        self.count = Parameter(Tensor(5, ms.int32), name="count")
        self.head = Parameter(Tensor(0, ms.int32), name="head")
        self.buffer_sample = P.BufferSample(
            self._capacity, batch_size, shapes, types)

    @ms_function
    def construct(self, buffer):
        """Call the sample operator on ``buffer``."""
        return self.buffer_sample(buffer, self.count, self.head)


def _buffer_spec():
    """Return fresh ``(shapes, types)`` lists for the four buffer fields."""
    shapes = [(4,), (2,), (1,), (4,)]
    dtypes = [ms.float32, ms.int32, ms.int32, ms.float32]
    return shapes, dtypes


def _new_buffer():
    """Build a fresh 5-row buffer ``[states, actions, rewards, next_states]``.

    test_BufferAppend asserts on the buffer tensors after calling the append
    operator, i.e. the operator overwrites them. Giving every test its own
    copy keeps the tests independent of the order in which they run.
    """
    return [
        Tensor(np.arange(4*5).reshape(5, 4).astype(np.float32)/10.0),
        Tensor(np.arange(2*5).reshape(5, 2).astype(np.int32)),
        Tensor(np.ones((5, 1)).astype(np.int32)),
        Tensor(np.arange(4*5).reshape(5, 4).astype(np.float32)),
    ]


# Module-level names retained from the original file; the tests below use
# their own buffers from _new_buffer() instead of sharing ``b``.
b = _new_buffer()
states, actions, rewards, states_ = b

# Single experiences; exp1 is exp with the state and next-state swapped.
s = Tensor(np.array([2, 2, 2, 2]), ms.float32)
a = Tensor(np.array([0, 0]), ms.int32)
r = Tensor(np.array([0]), ms.int32)
s_ = Tensor(np.array([3, 3, 3, 3]), ms.float32)
exp = [s, a, r, s_]
exp1 = [s_, a, r, s]

# A batch of two experiences appended in one call.
c = [Tensor(np.array([[6, 6, 6, 6], [6, 6, 6, 6]]), ms.float32),
     Tensor(np.array([[6, 6], [6, 6]]), ms.int32),
     Tensor(np.array([[6], [6]]), ms.int32),
     Tensor(np.array([[6, 6, 6, 6], [6, 6, 6, 6]]), ms.float32)]


@pytest.mark.level0
@pytest.mark.platform_x86_gpu_training
@pytest.mark.env_onecard
def test_BufferSample():
    """Sample a batch of 3 from a 5-row buffer and check the output shapes."""
    context.set_context(mode=context.PYNATIVE_MODE, device_target='GPU')
    shapes, dtypes = _buffer_spec()
    buffer_sample = RLBufferSample(
        capcity=5, batch_size=3, shapes=shapes, types=dtypes)
    ss, aa, rr, ss_ = buffer_sample(_new_buffer())
    print(ss, aa, rr, ss_)
    # The sampled rows themselves are not pinned (presumably because the
    # operator samples randomly); each output should be batch_size rows of
    # the corresponding element shape.
    assert ss.shape == (3, 4)
    assert aa.shape == (3, 2)
    assert rr.shape == (3, 1)
    assert ss_.shape == (3, 4)


@pytest.mark.level0
@pytest.mark.platform_x86_gpu_training
@pytest.mark.env_onecard
def test_BufferGet():
    """Fetch the element at index 1 and compare it with row 1 of each field."""
    context.set_context(mode=context.PYNATIVE_MODE, device_target='GPU')
    shapes, dtypes = _buffer_spec()
    buffer_get = RLBufferGet(capcity=5, shapes=shapes, types=dtypes)
    ss, aa, rr, ss_ = buffer_get(_new_buffer(), 1)
    expect_s = [0.4, 0.5, 0.6, 0.7]
    expect_a = [2, 3]
    expect_r = [1]
    expect_s_ = [4, 5, 6, 7]
    np.testing.assert_almost_equal(ss.asnumpy(), expect_s)
    np.testing.assert_almost_equal(aa.asnumpy(), expect_a)
    np.testing.assert_almost_equal(rr.asnumpy(), expect_r)
    np.testing.assert_almost_equal(ss_.asnumpy(), expect_s_)


@pytest.mark.level0
@pytest.mark.platform_x86_gpu_training
@pytest.mark.env_onecard
def test_BufferAppend():
    """Append past the capacity of 5 and check which rows are overwritten."""
    context.set_context(mode=context.PYNATIVE_MODE, device_target='GPU')
    shapes, dtypes = _buffer_spec()
    buffer_append = RLBufferAppend(capcity=5, shapes=shapes, types=dtypes)
    buffer = _new_buffer()

    # Five appends fill every row; the sixth (exp1) is expected in row 0.
    for _ in range(5):
        buffer_append(buffer, exp)
    buffer_append(buffer, exp1)
    expect_s = [[3, 3, 3, 3], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2]]
    expect_a = [[0, 0], [0, 0], [0, 0], [0, 0], [0, 0]]
    expect_r = [[0], [0], [0], [0], [0]]
    expect_s_ = [[2, 2, 2, 2], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3]]
    for field, expected in zip(buffer, (expect_s, expect_a, expect_r, expect_s_)):
        np.testing.assert_almost_equal(field.asnumpy(), expected)

    # One more single append (expected in row 1), then two 2-row batches
    # expected in rows 2-3 and rows 4 and 0.
    buffer_append(buffer, exp1)
    buffer_append(buffer, c)
    buffer_append(buffer, c)
    expect_s2 = [[6, 6, 6, 6], [3, 3, 3, 3], [6, 6, 6, 6], [6, 6, 6, 6], [6, 6, 6, 6]]
    expect_a2 = [[6, 6], [0, 0], [6, 6], [6, 6], [6, 6]]
    expect_r2 = [[6], [0], [6], [6], [6]]
    expect_s2_ = [[6, 6, 6, 6], [2, 2, 2, 2], [6, 6, 6, 6], [6, 6, 6, 6], [6, 6, 6, 6]]
    for field, expected in zip(buffer, (expect_s2, expect_a2, expect_r2, expect_s2_)):
        np.testing.assert_almost_equal(field.asnumpy(), expected)