# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

import argparse
import os
import sys
import ast
from time import time
import numpy as np
from mindspore import context, Tensor
from mindspore.train.serialization import save_checkpoint, load_checkpoint
from src.adam import AdamWeightDecayOp as AdamWeightDecay
from src.config import train_cfg, server_net_cfg
from src.utils import restore_params
from src.model import AlbertModelCLS
from src.cell_wrapper import NetworkWithCLSLoss, NetworkTrainCell


def parse_args():
    """
    parse args
    """
    parser = argparse.ArgumentParser(description='server task')
    parser.add_argument('--device_target', type=str, default='GPU', choices=['Ascend', 'GPU', 'CPU'])
    parser.add_argument('--device_id', type=str, default='0')
    parser.add_argument('--tokenizer_dir', type=str, default='../model_save/init/')
    parser.add_argument('--server_data_path', type=str, default='../datasets/semi_supervise/server/train.txt')
    parser.add_argument('--model_path', type=str, default='../model_save/init/albert_init.ckpt')
    parser.add_argument('--output_dir', type=str, default='../model_save/train_server/')
    parser.add_argument('--vocab_map_ids_path', type=str, default='../model_save/init/vocab_map_ids.txt')
    parser.add_argument('--logging_step', type=int, default=1)

    parser.add_argument("--server_mode", type=str, default="FEDERATED_LEARNING")
    parser.add_argument("--ms_role", type=str, default="MS_WORKER")
    parser.add_argument("--worker_num", type=int, default=0)
    parser.add_argument("--server_num", type=int, default=1)
    parser.add_argument("--scheduler_ip", type=str, default="127.0.0.1")
    parser.add_argument("--scheduler_port", type=int, default=8113)
    parser.add_argument("--fl_server_port", type=int, default=6666)
    parser.add_argument("--start_fl_job_threshold", type=int, default=1)
    parser.add_argument("--start_fl_job_time_window", type=int, default=3000)
    parser.add_argument("--update_model_ratio", type=float, default=1.0)
    parser.add_argument("--update_model_time_window", type=int, default=3000)
    parser.add_argument("--fl_name", type=str, default="Lenet")
    parser.add_argument("--fl_iteration_num", type=int, default=25)
    parser.add_argument("--client_epoch_num", type=int, default=20)
    parser.add_argument("--client_batch_size", type=int, default=32)
    parser.add_argument("--client_learning_rate", type=float, default=0.1)
    parser.add_argument("--worker_step_num_per_iteration", type=int, default=65)
    parser.add_argument("--scheduler_manage_port", type=int, default=11202)
    parser.add_argument("--dp_eps", type=float, default=50.0)
    parser.add_argument("--dp_delta", type=float, default=0.01)  # usually equals 1/start_fl_job_threshold
    parser.add_argument("--dp_norm_clip", type=float, default=1.0)
    parser.add_argument("--encrypt_type", type=str, default="NOT_ENCRYPT")
    parser.add_argument("--share_secrets_ratio", type=float, default=1.0)
    parser.add_argument("--cipher_time_window", type=int, default=300000)
    parser.add_argument("--reconstruct_secrets_threshold", type=int, default=3)
    parser.add_argument("--client_password", type=str, default="")
    parser.add_argument("--server_password", type=str, default="")
    parser.add_argument("--enable_ssl", type=ast.literal_eval, default=False)
    return parser.parse_args()


def server_train(args):
    start = time()

    os.environ['CUDA_VISIBLE_DEVICES'] = args.device_id
    model_path = args.model_path
    output_dir = args.output_dir

    device_target = args.device_target
    server_mode = args.server_mode
    ms_role = args.ms_role
    worker_num = args.worker_num
    server_num = args.server_num
    scheduler_ip = args.scheduler_ip
    scheduler_port = args.scheduler_port
    fl_server_port = args.fl_server_port
    start_fl_job_threshold = args.start_fl_job_threshold
    start_fl_job_time_window = args.start_fl_job_time_window
    update_model_ratio = args.update_model_ratio
    update_model_time_window = args.update_model_time_window
    fl_name = args.fl_name
    fl_iteration_num = args.fl_iteration_num
    client_epoch_num = args.client_epoch_num
    client_batch_size = args.client_batch_size
    client_learning_rate = args.client_learning_rate
    scheduler_manage_port = args.scheduler_manage_port
    dp_eps = args.dp_eps
    dp_delta = args.dp_delta
    dp_norm_clip = args.dp_norm_clip
    encrypt_type = args.encrypt_type
    share_secrets_ratio = args.share_secrets_ratio
    cipher_time_window = args.cipher_time_window
    reconstruct_secrets_threshold = args.reconstruct_secrets_threshold
    client_password = args.client_password
    server_password = args.server_password
    enable_ssl = args.enable_ssl

    # Replace some parameters with federated learning parameters.
    train_cfg.max_global_epoch = fl_iteration_num

    fl_ctx = {
        "enable_fl": True,
        "server_mode": server_mode,
        "ms_role": ms_role,
        "worker_num": worker_num,
        "server_num": server_num,
        "scheduler_ip": scheduler_ip,
        "scheduler_port": scheduler_port,
        "fl_server_port": fl_server_port,
        "start_fl_job_threshold": start_fl_job_threshold,
        "start_fl_job_time_window": start_fl_job_time_window,
        "update_model_ratio": update_model_ratio,
        "update_model_time_window": update_model_time_window,
        "fl_name": fl_name,
        "fl_iteration_num": fl_iteration_num,
        "client_epoch_num": client_epoch_num,
        "client_batch_size": client_batch_size,
        "client_learning_rate": client_learning_rate,
        "scheduler_manage_port": scheduler_manage_port,
        "dp_eps": dp_eps,
        "dp_delta": dp_delta,
        "dp_norm_clip": dp_norm_clip,
        "encrypt_type": encrypt_type,
        "share_secrets_ratio": share_secrets_ratio,
        "cipher_time_window": cipher_time_window,
        "reconstruct_secrets_threshold": reconstruct_secrets_threshold,
        "client_password": client_password,
        "server_password": server_password,
        "enable_ssl": enable_ssl
    }

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # mindspore context
    context.set_context(mode=context.GRAPH_MODE, device_target=device_target)
    context.set_fl_context(**fl_ctx)
    print('Context setting is done! Time cost: {}'.format(time() - start))
    sys.stdout.flush()
    start = time()

    # construct model
    albert_model_cls = AlbertModelCLS(server_net_cfg)
    network_with_cls_loss = NetworkWithCLSLoss(albert_model_cls)
    network_with_cls_loss.set_train(True)

    print('Model construction is done! Time cost: {}'.format(time() - start))
    sys.stdout.flush()
    start = time()

    # train prepare
    param_dict = load_checkpoint(model_path)
    if 'learning_rate' in param_dict:
        del param_dict['learning_rate']

    # server optimizer: apply weight decay only to the parameters selected by the decay filter
    server_params = list(network_with_cls_loss.trainable_params())
    server_decay_params = list(
        filter(train_cfg.optimizer_cfg.AdamWeightDecay.decay_filter, server_params)
    )
    server_other_params = list(
        filter(lambda x: not train_cfg.optimizer_cfg.AdamWeightDecay.decay_filter(x), server_params)
    )
    server_group_params = [
        {'params': server_decay_params, 'weight_decay': train_cfg.optimizer_cfg.AdamWeightDecay.weight_decay},
        {'params': server_other_params, 'weight_decay': 0.0},
        {'order_params': server_params}
    ]
    server_optimizer = AdamWeightDecay(server_group_params,
                                       learning_rate=train_cfg.server_cfg.learning_rate,
                                       eps=train_cfg.optimizer_cfg.AdamWeightDecay.eps)
    server_network_train_cell = NetworkTrainCell(network_with_cls_loss, optimizer=server_optimizer)

    restore_params(server_network_train_cell, param_dict)

    print('Optimizer construction is done! Time cost: {}'.format(time() - start))
    sys.stdout.flush()
    start = time()

    # train process: run a single step on an all-zero placeholder batch;
    # with the federated context enabled, the model updates come from
    # aggregated client uploads rather than from this local batch
    for _ in range(1):
        input_ids = Tensor(np.zeros((train_cfg.batch_size, server_net_cfg.seq_length), np.int32))
        attention_mask = Tensor(np.zeros((train_cfg.batch_size, server_net_cfg.seq_length), np.int32))
        token_type_ids = Tensor(np.zeros((train_cfg.batch_size, server_net_cfg.seq_length), np.int32))
        label_ids = Tensor(np.zeros((train_cfg.batch_size,), np.int32))
        model_start_time = time()
        cls_loss = server_network_train_cell(input_ids, attention_mask, token_type_ids, label_ids)
        time_cost = time() - model_start_time
        print('server: cls_loss {} time_cost {}'.format(cls_loss, time_cost))
        sys.stdout.flush()
        del input_ids, attention_mask, token_type_ids, label_ids, cls_loss
    output_path = os.path.join(output_dir, 'final.ckpt')
    save_checkpoint(server_network_train_cell.network, output_path)

    print('Training process is done! Time cost: {}'.format(time() - start))


if __name__ == '__main__':
    args_opt = parse_args()
    server_train(args_opt)
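
# A minimal launch sketch (illustrative: the filename "run_server.py" is an
# assumption; every flag below is defined in parse_args and shown with its
# default value, so adjust paths and ports to your deployment):
#
#   python run_server.py \
#       --device_target=GPU --device_id=0 \
#       --model_path=../model_save/init/albert_init.ckpt \
#       --output_dir=../model_save/train_server/ \
#       --scheduler_ip=127.0.0.1 --scheduler_port=8113 --fl_server_port=6666 \
#       --start_fl_job_threshold=1 --fl_iteration_num=25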