• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ============================================================================
15
16import numpy as np
17import pytest
18
19import mindspore.context as context
20import mindspore.nn as nn
21from mindspore import Tensor
22from mindspore.common.api import ms_function
23from mindspore.common.parameter import Parameter
24from mindspore.ops import operations as P
25import mindspore as ms
26
27
class RLBufferAppend(nn.Cell):
    """Cell that wraps ``P.BufferAppend`` and owns its ``count``/``head`` parameters.

    Args:
        capcity: Capacity forwarded to ``P.BufferAppend``. The spelling is part
            of the constructor's keyword interface and is kept as is.
        shapes: Per-field shapes forwarded to ``P.BufferAppend``.
        types: Per-field dtypes forwarded to ``P.BufferAppend``.
    """

    def __init__(self, capcity, shapes, types):
        super().__init__()
        self._capacity = capcity
        # Scalar int32 parameters, both starting at 0.
        self.count = Parameter(Tensor(0, dtype=ms.int32), name="count")
        self.head = Parameter(Tensor(0, dtype=ms.int32), name="head")
        self.buffer_append = P.BufferAppend(self._capacity, shapes, types)

    @ms_function
    def construct(self, buffer, exps):
        # Hand the buffer and the new experience to the op together with the
        # count/head parameters held by this cell.
        return self.buffer_append(
            buffer,
            exps,
            self.count,
            self.head,
        )
39
40
class RLBufferGet(nn.Cell):
    """Cell that wraps ``P.BufferGetItem`` and owns its ``count``/``head`` parameters.

    Args:
        capcity: Capacity forwarded to ``P.BufferGetItem``. The spelling is
            part of the constructor's keyword interface and is kept as is.
        shapes: Per-field shapes forwarded to ``P.BufferGetItem``.
        types: Per-field dtypes forwarded to ``P.BufferGetItem``.
    """

    def __init__(self, capcity, shapes, types):
        super().__init__()
        self._capacity = capcity
        # Scalar int32 parameters: count starts at 5, head at 0.
        self.count = Parameter(Tensor(5, dtype=ms.int32), name="count")
        self.head = Parameter(Tensor(0, dtype=ms.int32), name="head")
        self.buffer_get = P.BufferGetItem(self._capacity, shapes, types)

    @ms_function
    def construct(self, buffer, index):
        # Look up entry `index` using the count/head parameters held by this cell.
        return self.buffer_get(
            buffer,
            self.count,
            self.head,
            index,
        )
52
53
class RLBufferSample(nn.Cell):
    """Cell that wraps ``P.BufferSample`` and owns its ``count``/``head`` parameters.

    Args:
        capcity: Capacity forwarded to ``P.BufferSample``. The spelling is part
            of the constructor's keyword interface and is kept as is.
        batch_size: Batch size forwarded to ``P.BufferSample``.
        shapes: Per-field shapes forwarded to ``P.BufferSample``.
        types: Per-field dtypes forwarded to ``P.BufferSample``.
    """

    def __init__(self, capcity, batch_size, shapes, types):
        super().__init__()
        self._capacity = capcity
        # Scalar int32 parameters: count starts at 5, head at 0.
        self.count = Parameter(Tensor(5, dtype=ms.int32), name="count")
        self.head = Parameter(Tensor(0, dtype=ms.int32), name="head")
        self.buffer_sample = P.BufferSample(self._capacity, batch_size, shapes, types)

    @ms_function
    def construct(self, buffer):
        # Sample from the buffer using the count/head parameters held by this cell.
        return self.buffer_sample(
            buffer,
            self.count,
            self.head,
        )
65
66
# Shared buffer used by the tests: five rows per field, one Tensor per field
# (float32 5x4, int32 5x2, int32 5x1, float32 5x4).
states = Tensor(np.arange(20, dtype=np.float32).reshape(5, 4) / 10.0)
actions = Tensor(np.arange(10, dtype=np.int32).reshape(5, 2))
rewards = Tensor(np.ones((5, 1), dtype=np.int32))
states_ = Tensor(np.arange(20, dtype=np.float32).reshape(5, 4))
b = [states, actions, rewards, states_]

# Single experiences; `exp1` is `exp` with the first and last fields swapped.
s = Tensor(np.array([2] * 4), ms.float32)
a = Tensor(np.array([0] * 2), ms.int32)
r = Tensor(np.array([0]), ms.int32)
s_ = Tensor(np.array([3] * 4), ms.float32)
exp = [s, a, r, s_]
exp1 = [s_, a, r, s]

# Two-row experience whose every element is 6.
c = [
    Tensor(np.full((2, 4), 6), ms.float32),
    Tensor(np.full((2, 2), 6), ms.int32),
    Tensor(np.full((2, 1), 6), ms.int32),
    Tensor(np.full((2, 4), 6), ms.float32),
]
84
@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_BufferSample():
    """Sample a batch of 3 from the shared buffer ``b`` and check the output layout.

    The sampled rows are random, so their values are only printed. What can be
    checked deterministically is that each returned field has shape
    ``(batch_size,) + field_shape`` and the dtype declared for that field.
    """
    context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU')
    batch_size = 3
    shapes = [(4,), (2,), (1,), (4,)]
    types = [ms.float32, ms.int32, ms.int32, ms.float32]
    buffer_sample = RLBufferSample(capcity=5, batch_size=batch_size, shapes=shapes, types=types)
    ss, aa, rr, ss_ = buffer_sample(b)
    print(ss, aa, rr, ss_)
    # Without these checks the test could never fail on a wrong result.
    for sampled, field_shape, field_type in zip((ss, aa, rr, ss_), shapes, types):
        assert sampled.shape == (batch_size,) + field_shape
        assert sampled.dtype == field_type
94
95
@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_BufferGet():
    """Fetch index 1 from the shared buffer ``b`` and compare each field with row 1."""
    context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU')
    field_shapes = [(4,), (2,), (1,), (4,)]
    field_types = [ms.float32, ms.int32, ms.int32, ms.float32]
    buffer_get = RLBufferGet(capcity=5, shapes=field_shapes, types=field_types)
    ss, aa, rr, ss_ = buffer_get(b, 1)
    # Expected values are row 1 of states, actions, rewards and states_.
    checks = (
        (ss, [0.4, 0.5, 0.6, 0.7]),
        (aa, [2, 3]),
        (rr, [1]),
        (ss_, [4, 5, 6, 7]),
    )
    for actual, expected in checks:
        np.testing.assert_almost_equal(actual.asnumpy(), expected)
112
113
@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_BufferAppend():
    """Append experiences to a buffer and check its contents after each phase.

    The op writes into the buffer tensors it is given. The test therefore works
    on a private copy of the module-level buffer ``b``, so the other tests that
    read ``b`` see the same data regardless of execution order.
    """
    context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU')
    buffer_append = RLBufferAppend(capcity=5, shapes=[(4,), (2,), (1,), (4,)], types=[
        ms.float32, ms.int32, ms.int32, ms.float32])

    # Copy the host data so the in-place writes below never reach `b`.
    buffer = [Tensor(field.asnumpy().copy()) for field in b]

    def check(expected_fields):
        # Compare every buffer field with its expected contents.
        for field, expected in zip(buffer, expected_fields):
            np.testing.assert_almost_equal(field.asnumpy(), expected)

    # Phase 1: five `exp` appends followed by one `exp1`.
    # Expected: row 0 holds `exp1`, rows 1-4 hold `exp`.
    for _ in range(5):
        buffer_append(buffer, exp)
    buffer_append(buffer, exp1)
    expect_s = [[3, 3, 3, 3], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2], [2, 2, 2, 2]]
    expect_a = [[0, 0], [0, 0], [0, 0], [0, 0], [0, 0]]
    expect_r = [[0], [0], [0], [0], [0]]
    expect_s_ = [[2, 2, 2, 2], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3], [3, 3, 3, 3]]
    check([expect_s, expect_a, expect_r, expect_s_])

    # Phase 2: one more `exp1`, then the two-row experience `c` twice.
    # Expected: row 1 holds `exp1`, every other row holds 6s.
    buffer_append(buffer, exp1)
    buffer_append(buffer, c)
    buffer_append(buffer, c)
    expect_s2 = [[6, 6, 6, 6], [3, 3, 3, 3], [6, 6, 6, 6], [6, 6, 6, 6], [6, 6, 6, 6]]
    expect_a2 = [[6, 6], [0, 0], [6, 6], [6, 6], [6, 6]]
    expect_r2 = [[6], [0], [6], [6], [6]]
    expect_s2_ = [[6, 6, 6, 6], [2, 2, 2, 2], [6, 6, 6, 6], [6, 6, 6, 6], [6, 6, 6, 6]]
    check([expect_s2, expect_a2, expect_r2, expect_s2_])
147