# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Unit tests for pipeline-split training and prediction in semi-auto-parallel mode.

Each test configures an 8-device, 2-pipeline-stage auto-parallel context for a
given ``global_rank`` and then trains (or predicts with) a two-block MatMul
network whose blocks are assigned to pipeline stages 0 and 1.

NOTE(review): the auto-parallel context is never reset between tests, so
options such as ``full_batch`` or ``enable_parallel_optimizer`` presumably
carry over into later tests -- confirm whether that is intended.
"""
import numpy as np
import mindspore as ms
import mindspore.nn as nn
from mindspore import context
from mindspore import Tensor
from mindspore.ops import operations as P
from mindspore.common.parameter import Parameter
from mindspore.common.initializer import initializer
from mindspore.train.model import Model
from mindspore.nn.wrap.cell_wrapper import PipelineCell


# Shard strategies shared by every test: one for each MatMul in a MatMulCell.
_STRATEGY1 = ((4, 1), (1, 1))
_STRATEGY2 = ((2, 1), (1, 1))


class DatasetLenet():
    """Minimal iterable stand-in for a dataset.

    Every step yields the same ``(data, label)`` pair; one pass produces
    ``length`` pairs before raising ``StopIteration``.
    """

    def __init__(self, data, label, length=3):
        self.data = data
        self.label = label
        # Start at 0 (the value reset() restores) so that the first pass
        # yields `length` items exactly like every pass after a reset().
        self.index = 0
        self.length = length

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= self.length:
            raise StopIteration
        self.index += 1
        return self.data, self.label

    def reset(self):
        """Rewind the iterator so another pass can be made."""
        self.index = 0

    def get_dataset_size(self):
        """Return the fixed dataset size reported by this stub."""
        return 32

    def get_repeat_count(self):
        """Return the fixed repeat count reported by this stub."""
        return 1

    def get_batch_size(self):
        """Return the fixed batch size reported by this stub."""
        return 32

    def create_tuple_iterator(self, num_epochs=1, do_copy=True):
        """Return this object itself as the iterator; both arguments are ignored."""
        return self


class MatMulCell(nn.Cell):
    """Two chained MatMul ops, each with its own shard strategy and 64x64 weight.

    Args:
        strategy1: shard strategy applied to the first MatMul.
        strategy2: shard strategy applied to the second MatMul.
        param: optional Parameter used as the first weight in place of the
            default zero-initialized one.
    """

    def __init__(self, strategy1, strategy2, param=None):
        super().__init__()
        self.param = Parameter(initializer("zeros", [64, 64]), name="param")
        if param is not None:
            # An externally supplied weight replaces the default one.
            self.param = param
        self.param1 = Parameter(initializer("zeros", [64, 64]), name="param1")
        self.matmul = P.MatMul().shard(strategy1)
        self.matmul1 = P.MatMul().shard(strategy2)

    def construct(self, x):
        out = self.matmul(x, self.param)
        out = self.matmul1(out, self.param1)
        return out


class Net(nn.Cell):
    """Two MatMulCell blocks; block ``i`` is assigned ``pipeline_stage = i``.

    When ``param`` is given, the same Parameter object is passed to both blocks.
    """

    def __init__(self, strategy1, strategy2, param=None):
        super().__init__()
        self.block = nn.CellList()
        for i in range(2):
            cell = MatMulCell(strategy1, strategy2, param)
            cell.pipeline_stage = i
            self.block.append(cell)

    def construct(self, x):
        for i in range(2):
            x = self.block[i](x)
        return x


class PipelineSplit(nn.Cell):
    """Wraps ``Net`` (each block owning its own weights) behind an ``(x, label)`` signature."""

    def __init__(self, strategy1, strategy2):
        super().__init__()
        self.cell = Net(strategy1, strategy2)
        self.cell.block[0].matmul.add_prim_attr("parameter_start", 0)

    def construct(self, x, label):
        # `label` is accepted but not used in the computation.
        x = self.cell(x)
        return x


class PipelineSplit2(nn.Cell):
    """Like ``PipelineSplit``, but one Parameter is shared by both blocks of ``Net``."""

    def __init__(self, strategy1, strategy2):
        super().__init__()
        self.param = Parameter(initializer("zeros", [64, 64]), name="param")
        self.cell = Net(strategy1, strategy2, self.param)
        self.cell.block[0].matmul.add_prim_attr("parameter_start", 0)

    def construct(self, x, label):
        # `label` is accepted but not used in the computation.
        x = self.cell(x)
        return x


def _set_parallel_context(global_rank, **extra):
    """Configure 8 devices / 2 pipeline stages in semi-auto-parallel mode.

    ``extra`` is forwarded to the first ``set_auto_parallel_context`` call
    (for example ``full_batch=True`` or ``enable_parallel_optimizer=True``).
    """
    context.set_auto_parallel_context(device_num=8, global_rank=global_rank, pipeline_stages=2, **extra)
    context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")


def _make_inputs():
    """Return the all-ones float32 ``(data, label)`` tensors used by every test."""
    data = Tensor(np.ones([32, 64]), dtype=ms.float32)
    label = Tensor(np.ones([64, 64]), dtype=ms.float32)
    return data, label


def _train_stage(network_cls, stage):
    """Train ``network_cls`` inside a 4-micro-batch PipelineCell and return the Model.

    Only the trainable parameters of ``block[stage]`` are handed to the Lamb
    optimizer. Training runs for 2 epochs in non-sink mode.
    """
    data, label = _make_inputs()
    net = PipelineCell(network_cls(_STRATEGY1, _STRATEGY2), 4)
    params = net.network.cell.block[stage].trainable_params()
    dataset = DatasetLenet(data, label, 3)
    optimizer = nn.Lamb(params, learning_rate=0.01)
    model = Model(net, optimizer=optimizer)
    model.train(2, dataset, dataset_sink_mode=False)
    return model


def _predict(network_cls):
    """Build ``network_cls`` without a PipelineCell wrapper and run one predict call."""
    data, label = _make_inputs()
    net = network_cls(_STRATEGY1, _STRATEGY2)
    model = Model(net)
    model.predict(data, label)


def _assert_block_params_absent(model, block_index):
    """Assert the train network holds neither weight of ``cell.block.<block_index>``."""
    param_name = "cell.block.{}.param".format(block_index)
    param1_name = "cell.block.{}.param1".format(block_index)
    for _, param in model._train_network.parameters_and_names():
        assert param.name != param_name
        assert param.name != param1_name


def test_pipeline_split_stage0():
    """Rank 0: after training block 0, no block-1 weights are in the train network."""
    _set_parallel_context(global_rank=0)
    model = _train_stage(PipelineSplit, stage=0)
    _assert_block_params_absent(model, block_index=1)


def test_pipeline_split_stage1():
    """Rank 4: after training block 1, no block-0 weights are in the train network."""
    _set_parallel_context(global_rank=4)
    model = _train_stage(PipelineSplit, stage=1)
    _assert_block_params_absent(model, block_index=0)


def test_pipeline_split_shared_parameter_stage0():
    """Rank 0: training with a Parameter shared by both blocks completes."""
    _set_parallel_context(global_rank=0)
    _train_stage(PipelineSplit2, stage=0)


def test_pipeline_split_shared_parameter_stage1():
    """Rank 4: training with a Parameter shared by both blocks completes."""
    _set_parallel_context(global_rank=4)
    _train_stage(PipelineSplit2, stage=1)


def test_pipeline_split_shared_parameter_stage0_predict():
    """Rank 0, full_batch: predict with the shared-Parameter network completes."""
    _set_parallel_context(global_rank=0, full_batch=True)
    _predict(PipelineSplit2)


def test_pipeline_split_shared_parameter_stage1_predict():
    """Rank 4, full_batch: predict with the shared-Parameter network completes."""
    _set_parallel_context(global_rank=4, full_batch=True)
    _predict(PipelineSplit2)


def test_pipeline_split_stage0_opt_shard():
    """Rank 0 with the parallel optimizer enabled: no block-1 weights remain."""
    _set_parallel_context(global_rank=0, enable_parallel_optimizer=True)
    model = _train_stage(PipelineSplit, stage=0)
    _assert_block_params_absent(model, block_index=1)


def test_pipeline_split_stage1_opt_shard():
    """Rank 4 with the parallel optimizer enabled: no block-0 weights remain."""
    _set_parallel_context(global_rank=4, enable_parallel_optimizer=True)
    model = _train_stage(PipelineSplit, stage=1)
    _assert_block_params_absent(model, block_index=0)


def test_pipeline_split_shared_parameter_stage0_opt_shard():
    """Rank 0, parallel optimizer enabled: shared-Parameter training completes."""
    _set_parallel_context(global_rank=0, enable_parallel_optimizer=True)
    _train_stage(PipelineSplit2, stage=0)


def test_pipeline_split_shared_parameter_stage1_opt_shard():
    """Rank 4, parallel optimizer enabled: shared-Parameter training completes."""
    _set_parallel_context(global_rank=4, enable_parallel_optimizer=True)
    _train_stage(PipelineSplit2, stage=1)