# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

16import argparse
17import os
18import sys
19import ast
20from time import time
21import numpy as np
22from mindspore import context, Tensor
23from mindspore.train.serialization import save_checkpoint, load_checkpoint
24from src.adam import AdamWeightDecayOp as AdamWeightDecay
25from src.config import train_cfg, server_net_cfg
26from src.utils import restore_params
27from src.model import AlbertModelCLS
28from src.cell_wrapper import NetworkWithCLSLoss, NetworkTrainCell
29
30
31def parse_args():
32    """
33    parse args
34    """
35    parser = argparse.ArgumentParser(description='server task')
36    parser.add_argument('--device_target', type=str, default='GPU', choices=['Ascend', 'GPU', 'CPU'])
37    parser.add_argument('--device_id', type=str, default='0')
38    parser.add_argument('--tokenizer_dir', type=str, default='../model_save/init/')
39    parser.add_argument('--server_data_path', type=str, default='../datasets/semi_supervise/server/train.txt')
40    parser.add_argument('--model_path', type=str, default='../model_save/init/albert_init.ckpt')
41    parser.add_argument('--output_dir', type=str, default='../model_save/train_server/')
42    parser.add_argument('--vocab_map_ids_path', type=str, default='../model_save/init/vocab_map_ids.txt')
43    parser.add_argument('--logging_step', type=int, default=1)
44
45    parser.add_argument("--server_mode", type=str, default="FEDERATED_LEARNING")
46    parser.add_argument("--ms_role", type=str, default="MS_WORKER")
47    parser.add_argument("--worker_num", type=int, default=0)
48    parser.add_argument("--server_num", type=int, default=1)
49    parser.add_argument("--scheduler_ip", type=str, default="127.0.0.1")
50    parser.add_argument("--scheduler_port", type=int, default=8113)
51    parser.add_argument("--fl_server_port", type=int, default=6666)
52    parser.add_argument("--start_fl_job_threshold", type=int, default=1)
53    parser.add_argument("--start_fl_job_time_window", type=int, default=3000)
54    parser.add_argument("--update_model_ratio", type=float, default=1.0)
55    parser.add_argument("--update_model_time_window", type=int, default=3000)
56    parser.add_argument("--fl_name", type=str, default="Lenet")
57    parser.add_argument("--fl_iteration_num", type=int, default=25)
58    parser.add_argument("--client_epoch_num", type=int, default=20)
59    parser.add_argument("--client_batch_size", type=int, default=32)
60    parser.add_argument("--client_learning_rate", type=float, default=0.1)
61    parser.add_argument("--worker_step_num_per_iteration", type=int, default=65)
62    parser.add_argument("--scheduler_manage_port", type=int, default=11202)
63    parser.add_argument("--dp_eps", type=float, default=50.0)
64    parser.add_argument("--dp_delta", type=float, default=0.01)  # usually equals 1/start_fl_job_threshold
65    parser.add_argument("--dp_norm_clip", type=float, default=1.0)
66    parser.add_argument("--encrypt_type", type=str, default="NOT_ENCRYPT")
67    parser.add_argument("--share_secrets_ratio", type=float, default=1.0)
68    parser.add_argument("--cipher_time_window", type=int, default=300000)
69    parser.add_argument("--reconstruct_secrets_threshold", type=int, default=3)
70    parser.add_argument("--client_password", type=str, default="")
71    parser.add_argument("--server_password", type=str, default="")
72    parser.add_argument("--enable_ssl", type=ast.literal_eval, default=False)
73    return parser.parse_args()
74
75
76def server_train(args):
77    start = time()
78
79    os.environ['CUDA_VISIBLE_DEVICES'] = args.device_id
80    model_path = args.model_path
81    output_dir = args.output_dir
82
83    device_target = args.device_target
84    server_mode = args.server_mode
85    ms_role = args.ms_role
86    worker_num = args.worker_num
87    server_num = args.server_num
88    scheduler_ip = args.scheduler_ip
89    scheduler_port = args.scheduler_port
90    fl_server_port = args.fl_server_port
91    start_fl_job_threshold = args.start_fl_job_threshold
92    start_fl_job_time_window = args.start_fl_job_time_window
93    update_model_ratio = args.update_model_ratio
94    update_model_time_window = args.update_model_time_window
95    fl_name = args.fl_name
96    fl_iteration_num = args.fl_iteration_num
97    client_epoch_num = args.client_epoch_num
98    client_batch_size = args.client_batch_size
99    client_learning_rate = args.client_learning_rate
100    scheduler_manage_port = args.scheduler_manage_port
101    dp_delta = args.dp_delta
102    dp_norm_clip = args.dp_norm_clip
103    encrypt_type = args.encrypt_type
104    share_secrets_ratio = args.share_secrets_ratio
105    cipher_time_window = args.cipher_time_window
106    reconstruct_secrets_threshold = args.reconstruct_secrets_threshold
107    client_password = args.client_password
108    server_password = args.server_password
109    enable_ssl = args.enable_ssl
110
111    # Replace some parameters with federated learning parameters.
112    train_cfg.max_global_epoch = fl_iteration_num
113
114    fl_ctx = {
115        "enable_fl": True,
116        "server_mode": server_mode,
117        "ms_role": ms_role,
118        "worker_num": worker_num,
119        "server_num": server_num,
120        "scheduler_ip": scheduler_ip,
121        "scheduler_port": scheduler_port,
122        "fl_server_port": fl_server_port,
123        "start_fl_job_threshold": start_fl_job_threshold,
124        "start_fl_job_time_window": start_fl_job_time_window,
125        "update_model_ratio": update_model_ratio,
126        "update_model_time_window": update_model_time_window,
127        "fl_name": fl_name,
128        "fl_iteration_num": fl_iteration_num,
129        "client_epoch_num": client_epoch_num,
130        "client_batch_size": client_batch_size,
131        "client_learning_rate": client_learning_rate,
132        "scheduler_manage_port": scheduler_manage_port,
133        "dp_delta": dp_delta,
134        "dp_norm_clip": dp_norm_clip,
135        "encrypt_type": encrypt_type,
136        "share_secrets_ratio": share_secrets_ratio,
137        "cipher_time_window": cipher_time_window,
138        "reconstruct_secrets_threshold": reconstruct_secrets_threshold,
139        "client_password": client_password,
140        "server_password": server_password,
141        "enable_ssl": enable_ssl
142    }
143
144    if not os.path.exists(output_dir):
145        os.makedirs(output_dir)
146
147    # mindspore context
148    context.set_context(mode=context.GRAPH_MODE, device_target=device_target)
149    context.set_fl_context(**fl_ctx)
150    print('Context setting is done! Time cost: {}'.format(time() - start))
151    sys.stdout.flush()
152    start = time()
153
154    # construct model
155    albert_model_cls = AlbertModelCLS(server_net_cfg)
156    network_with_cls_loss = NetworkWithCLSLoss(albert_model_cls)
157    network_with_cls_loss.set_train(True)
158
159    print('Model construction is done! Time cost: {}'.format(time() - start))
160    sys.stdout.flush()
161    start = time()
162
163    # train prepare
164    param_dict = load_checkpoint(model_path)
165    if 'learning_rate' in param_dict:
166        del param_dict['learning_rate']
167
168    # server optimizer
169    server_params = [_ for _ in network_with_cls_loss.trainable_params()]
170    server_decay_params = list(
171        filter(train_cfg.optimizer_cfg.AdamWeightDecay.decay_filter, server_params)
172    )
173    server_other_params = list(
174        filter(lambda x: not train_cfg.optimizer_cfg.AdamWeightDecay.decay_filter(x), server_params)
175    )
176    server_group_params = [
177        {'params': server_decay_params, 'weight_decay': train_cfg.optimizer_cfg.AdamWeightDecay.weight_decay},
178        {'params': server_other_params, 'weight_decay': 0.0},
179        {'order_params': server_params}
180    ]
181    server_optimizer = AdamWeightDecay(server_group_params,
182                                       learning_rate=train_cfg.server_cfg.learning_rate,
183                                       eps=train_cfg.optimizer_cfg.AdamWeightDecay.eps)
184    server_network_train_cell = NetworkTrainCell(network_with_cls_loss, optimizer=server_optimizer)
185
186    restore_params(server_network_train_cell, param_dict)
187
188    print('Optimizer construction is done! Time cost: {}'.format(time() - start))
189    sys.stdout.flush()
190    start = time()
191
192    # train process
193    for _ in range(1):
194        input_ids = Tensor(np.zeros((train_cfg.batch_size, server_net_cfg.seq_length), np.int32))
195        attention_mask = Tensor(np.zeros((train_cfg.batch_size, server_net_cfg.seq_length), np.int32))
196        token_type_ids = Tensor(np.zeros((train_cfg.batch_size, server_net_cfg.seq_length), np.int32))
197        label_ids = Tensor(np.zeros((train_cfg.batch_size,), np.int32))
198        model_start_time = time()
199        cls_loss = server_network_train_cell(input_ids, attention_mask, token_type_ids, label_ids)
200        time_cost = time() - model_start_time
201        print('server: cls_loss {} time_cost {}'.format(cls_loss, time_cost))
202        sys.stdout.flush()
203        del input_ids, attention_mask, token_type_ids, label_ids, cls_loss
204        output_path = os.path.join(output_dir, 'final.ckpt')
205        save_checkpoint(server_network_train_cell.network, output_path)
206
207    print('Training process is done! Time cost: {}'.format(time() - start))
208
209
210if __name__ == '__main__':
211    args_opt = parse_args()
212    server_train(args_opt)
213