• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ============================================================================
15import numpy as np
16import mindspore as ms
17import mindspore.nn as nn
18from mindspore import context
19from mindspore import Tensor
20from mindspore.ops import operations as P
21from mindspore.common.parameter import Parameter
22from mindspore.common.initializer import initializer
23from mindspore.train.model import Model
24from mindspore.nn.wrap.cell_wrapper import PipelineCell
25
26
class DatasetLenet():
    """Minimal in-memory dataset stub that repeats one (data, label) pair.

    The object is its own iterator: each pass yields the stored pair
    ``length`` times and then raises ``StopIteration`` until ``reset()``
    is called.

    Args:
        data: Object returned as the first element of every item.
        label: Object returned as the second element of every item.
        length: Number of items produced per pass. Defaults to 3.
    """

    def __init__(self, data, label, length=3):
        self.data = data
        self.label = label
        # Start at 0 (the same value reset() restores) so that the first
        # pass yields exactly `length` items instead of `length - 1`.
        self.index = 0
        self.length = length

    def __iter__(self):
        """Return the dataset itself; it implements ``__next__`` directly."""
        return self

    def __next__(self):
        """Return the stored ``(data, label)`` pair or stop after ``length`` items."""
        if self.index >= self.length:
            raise StopIteration
        self.index += 1
        return self.data, self.label

    def reset(self):
        """Rewind the iterator so another full pass can be made."""
        self.index = 0

    def get_dataset_size(self):
        """Return a fixed size of 32, independent of ``length``."""
        return 32

    def get_repeat_count(self):
        """Return a fixed repeat count of 1."""
        return 1

    def get_batch_size(self):
        """Return a fixed batch size of 32."""
        return 32

    def create_tuple_iterator(self, num_epochs=1, do_copy=True):
        """Return the dataset itself; ``num_epochs`` and ``do_copy`` are ignored."""
        return self
57
58
class MatMulCell(nn.Cell):
    """Cell that applies two sharded MatMul ops in sequence.

    Args:
        strategy1: Shard strategy passed to the first MatMul.
        strategy2: Shard strategy passed to the second MatMul.
        param: Optional parameter used as the first weight in place of the
            cell's own zero-initialized [64, 64] parameter.
    """

    def __init__(self, strategy1, strategy2, param=None):
        super().__init__()
        # The primitives are independent of the weights, so build them first.
        self.matmul = P.MatMul().shard(strategy1)
        self.matmul1 = P.MatMul().shard(strategy2)
        # Register "param" before "param1"; an externally supplied parameter
        # then replaces the freshly created default.
        self.param = Parameter(initializer("zeros", [64, 64]), name="param")
        if param is not None:
            self.param = param
        self.param1 = Parameter(initializer("zeros", [64, 64]), name="param1")

    def construct(self, x):
        hidden = self.matmul(x, self.param)
        return self.matmul1(hidden, self.param1)
73
74
class Net(nn.Cell):
    """Two MatMulCell blocks, assigned to pipeline stages 0 and 1 respectively.

    Args:
        strategy1: Shard strategy forwarded to each block's first MatMul.
        strategy2: Shard strategy forwarded to each block's second MatMul.
        param: Optional parameter forwarded to both blocks, which then use it
            as their first weight.
    """

    def __init__(self, strategy1, strategy2, param=None):
        super().__init__()
        self.block = nn.CellList()
        for stage_id in range(2):
            stage_cell = MatMulCell(strategy1, strategy2, param)
            stage_cell.pipeline_stage = stage_id
            self.block.append(stage_cell)

    def construct(self, x):
        # Run block 0, then block 1, feeding each output into the next block.
        x = self.block[0](x)
        x = self.block[1](x)
        return x
88
89
class PipelineSplit(nn.Cell):
    """Wrapper around Net in which each block owns its own parameters.

    Args:
        strategy1: Shard strategy forwarded to Net.
        strategy2: Shard strategy forwarded to Net.
    """

    def __init__(self, strategy1, strategy2):
        super().__init__()
        self.cell = Net(strategy1, strategy2)
        # Tag the first MatMul of block 0 with the "parameter_start" attribute.
        first_matmul = self.cell.block[0].matmul
        first_matmul.add_prim_attr("parameter_start", 0)

    def construct(self, x, label):
        # `label` is accepted but not used in the computation.
        return self.cell(x)
99
100
class PipelineSplit2(nn.Cell):
    """Wrapper around Net in which both blocks share one first-weight parameter.

    Args:
        strategy1: Shard strategy forwarded to Net.
        strategy2: Shard strategy forwarded to Net.
    """

    def __init__(self, strategy1, strategy2):
        super().__init__()
        # This single parameter is handed to Net, which passes it to both blocks.
        self.param = Parameter(initializer("zeros", [64, 64]), name="param")
        self.cell = Net(strategy1, strategy2, self.param)
        # Tag the first MatMul of block 0 with the "parameter_start" attribute.
        first_matmul = self.cell.block[0].matmul
        first_matmul.add_prim_attr("parameter_start", 0)

    def construct(self, x, label):
        # `label` is accepted but not used in the computation.
        return self.cell(x)
111
112
def test_pipeline_split_stage0():
    """Train PipelineSplit as global rank 0 of 8 devices with 2 pipeline stages.

    After training, no parameter of block 1 may remain in the train network.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit(first_strategy, second_strategy), 4)
    # Only block 0's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[0].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
    remaining = [p.name for _, p in model._train_network.parameters_and_names()]
    assert "cell.block.1.param" not in remaining
    assert "cell.block.1.param1" not in remaining
129
def test_pipeline_split_stage1():
    """Train PipelineSplit as global rank 4 of 8 devices with 2 pipeline stages.

    After training, no parameter of block 0 may remain in the train network.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=4, pipeline_stages=2)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit(first_strategy, second_strategy), 4)
    # Only block 1's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[1].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
    remaining = [p.name for _, p in model._train_network.parameters_and_names()]
    assert "cell.block.0.param" not in remaining
    assert "cell.block.0.param1" not in remaining
146
147
def test_pipeline_split_shared_parameter_stage0():
    """Train PipelineSplit2 (shared parameter) as global rank 0 of 8 devices.

    The test passes when training completes without raising.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit2(first_strategy, second_strategy), 4)
    # Only block 0's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[0].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
161
162
def test_pipeline_split_shared_parameter_stage1():
    """Train PipelineSplit2 (shared parameter) as global rank 4 of 8 devices.

    The test passes when training completes without raising.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=4, pipeline_stages=2)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit2(first_strategy, second_strategy), 4)
    # Only block 1's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[1].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
176
177
def test_pipeline_split_shared_parameter_stage0_predict():
    """Run predict on PipelineSplit2 as global rank 0 of 8 devices with full_batch.

    The network is used directly (no PipelineCell wrapper, no optimizer); the
    test passes when predict completes without raising.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2, full_batch=True)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    model = Model(PipelineSplit2(first_strategy, second_strategy))
    model.predict(inputs, targets)
188
189
def test_pipeline_split_shared_parameter_stage1_predict():
    """Run predict on PipelineSplit2 as global rank 4 of 8 devices with full_batch.

    The network is used directly (no PipelineCell wrapper, no optimizer); the
    test passes when predict completes without raising.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=4, pipeline_stages=2, full_batch=True)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    model = Model(PipelineSplit2(first_strategy, second_strategy))
    model.predict(inputs, targets)
200
201
def test_pipeline_split_stage0_opt_shard():
    """Train PipelineSplit as global rank 0 of 8 devices with the parallel optimizer on.

    After training, no parameter of block 1 may remain in the train network.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2, enable_parallel_optimizer=True)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit(first_strategy, second_strategy), 4)
    # Only block 0's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[0].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
    remaining = [p.name for _, p in model._train_network.parameters_and_names()]
    assert "cell.block.1.param" not in remaining
    assert "cell.block.1.param1" not in remaining
218
219
def test_pipeline_split_stage1_opt_shard():
    """Train PipelineSplit as global rank 4 of 8 devices with the parallel optimizer on.

    After training, no parameter of block 0 may remain in the train network.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=4, pipeline_stages=2, enable_parallel_optimizer=True)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit(first_strategy, second_strategy), 4)
    # Only block 1's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[1].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
    remaining = [p.name for _, p in model._train_network.parameters_and_names()]
    assert "cell.block.0.param" not in remaining
    assert "cell.block.0.param1" not in remaining
236
237
def test_pipeline_split_shared_parameter_stage0_opt_shard():
    """Train PipelineSplit2 as global rank 0 of 8 devices with the parallel optimizer on.

    The test passes when training completes without raising.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2, enable_parallel_optimizer=True)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit2(first_strategy, second_strategy), 4)
    # Only block 0's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[0].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
251
252
def test_pipeline_split_shared_parameter_stage1_opt_shard():
    """Train PipelineSplit2 as global rank 4 of 8 devices with the parallel optimizer on.

    The test passes when training completes without raising.
    """
    context.set_auto_parallel_context(device_num=8, global_rank=4, pipeline_stages=2, enable_parallel_optimizer=True)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
    inputs = Tensor(np.ones([32, 64]), dtype=ms.float32)
    targets = Tensor(np.ones([64, 64]), dtype=ms.float32)
    first_strategy = ((4, 1), (1, 1))
    second_strategy = ((2, 1), (1, 1))
    pipeline_net = PipelineCell(PipelineSplit2(first_strategy, second_strategy), 4)
    # Only block 1's parameters are handed to the optimizer.
    stage_params = pipeline_net.network.cell.block[1].trainable_params()
    train_set = DatasetLenet(inputs, targets, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    model = Model(pipeline_net, optimizer=lamb)
    model.train(2, train_set, dataset_sink_mode=False)
266