# Copyright 2020-2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Writes events to disk in a logdir."""
import os
import stat
from urllib.parse import quote
from shutil import disk_usage

import numpy as np

from mindspore.train.summary.enums import PluginEnum, WriterPluginEnum
from mindspore import log as logger

from .._utils import _make_directory
from ._summary_adapter import package_init_event
from ..._c_expression import security
if not security.enable_security():
    from ..._c_expression import EventWriter_


# A write is refused unless the free disk space is at least this many times
# the number of bytes about to be written.
FREE_DISK_SPACE_TIMES = 32
# Permission bits applied to finished files (owner read-only).
FILE_MODE = 0o400


class BaseWriter:
    """Common base of the event writers; subclasses filter by plugin."""

    def __init__(self, filepath, max_file_size=None) -> None:
        self._filepath = filepath
        self._max_file_size = max_file_size
        # Created on first access of the `writer` property.
        self._writer: EventWriter_ = None

    def init_writer(self):
        """Hook called right after the underlying writer is created."""

    @property
    def writer(self):
        """
        Return the underlying event writer, creating it on first use.

        On first use the target file is created (or truncated), made
        readable and writable by the owner, and `init_writer` is invoked.
        """
        if self._writer is None:
            with open(self._filepath, 'w'):
                os.chmod(self._filepath, stat.S_IRUSR | stat.S_IWUSR)
            self._writer = EventWriter_(self._filepath)
            self.init_writer()
        return self._writer

    def write(self, plugin, data):
        """
        Write one serialized record to the file.

        Args:
            plugin (str): Plugin name; ignored here, used by subclasses to filter.
            data (bytes): The serialized record.

        Raises:
            RuntimeError: If the free disk space is below the safety margin.
            RuntimeWarning: If the record does not fit in the remaining `max_file_size`.
        """
        # Per-record overhead: 8 bytes of data length, 4 bytes of crc32 of the
        # data length and 4 bytes of crc32 of the data.
        record_overhead = 8 + 4 + 4
        needed = len(data) + record_overhead

        # Accessing the property creates the file on first use, so it has to
        # happen before the disk usage of the file path is queried.
        sink = self.writer
        if sink and disk_usage(self._filepath).free < needed * FREE_DISK_SPACE_TIMES:
            raise RuntimeError(f"The disk space may be soon exhausted by the '{self._filepath}'.")

        remaining = self._max_file_size
        if remaining is not None:
            if remaining < needed:
                raise RuntimeWarning(f"'max_file_size' reached: There are {remaining} bytes remaining, "
                                     f"but the '{self._filepath}' requires to write {needed} bytes.")
            # The budget is charged before the record is handed to the writer.
            self._max_file_size = remaining - needed
        sink.Write(data)

    def flush(self):
        """Flush the underlying writer if it has been created."""
        if self._writer is None:
            return
        self._writer.Flush()

    def close(self):
        """Mark the file read-only and shut the underlying writer down."""
        try:
            os.chmod(self._filepath, FILE_MODE)
        except FileNotFoundError:
            logger.debug("The summary file %r has been removed.", self._filepath)
        if self._writer is None:
            return
        self._writer.Shut()


class SummaryWriter(BaseWriter):
    """Writer that accepts summary and graph records."""

    def init_writer(self):
        """Emit the init event as the first record of the file."""
        init_event = package_init_event().SerializeToString()
        self.write(WriterPluginEnum.SUMMARY.value, init_event)

    def write(self, plugin, data):
        """Write `data` only when `plugin` is the summary or graph plugin."""
        accepted = (WriterPluginEnum.SUMMARY.value, PluginEnum.GRAPH.value)
        if plugin not in accepted:
            return
        super().write(plugin, data)


class LineageWriter(BaseWriter):
    """Writer that accepts lineage records."""

    def write(self, plugin, data):
        """Write `data` only when `plugin` is one of the lineage plugins."""
        accepted = (
            PluginEnum.DATASET_GRAPH.value,
            PluginEnum.TRAIN_LINEAGE.value,
            PluginEnum.EVAL_LINEAGE.value,
            PluginEnum.CUSTOM_LINEAGE_DATA.value,
        )
        if plugin not in accepted:
            return
        super().write(plugin, data)


class ExplainWriter(BaseWriter):
    """Writer that accepts explainer records."""

    def write(self, plugin, data):
        """Write `data` only when `plugin` is the explainer plugin."""
        if plugin != WriterPluginEnum.EXPLAINER.value:
            return
        super().write(plugin, data)


class ExportWriter(BaseWriter):
    """Writer that exports data as standalone files instead of event records."""

    def write(self, plugin, data):
        """Export `data` only when `plugin` is the exporter plugin."""
        if plugin != WriterPluginEnum.EXPORTER.value:
            return
        self.export_data(data, data.get('export_option'))

    def flush(self):
        """Do nothing; this writer holds no underlying event writer."""

    def close(self):
        """Do nothing; this writer holds no underlying event writer."""

    def export_data(self, data, export_option):
        """
        Export the tensor data in the requested format.

        Unknown options are silently ignored.

        Args:
            data (dict): Export data info.
            export_option (Union[None, str]): The export options.
        """
        exporters = {
            'npy': self._export_npy
        }
        exporter = exporters.get(export_option)
        if exporter is not None:
            exporter(data, self._filepath)

    @staticmethod
    def _export_npy(data, export_dir):
        """
        Save the tensor value as `<export_dir>/tensor/<tag>_<step>.npy`.

        Args:
            data (dict): Export data info; the 'tag', 'step' and 'value' keys are read.
            export_dir (str): The path of export dir.

        Raises:
            RuntimeError: If the free disk space is below the safety margin.
        """
        # Percent-encode every reserved character (including '/') so the tag
        # is usable as a single file name component.
        safe_tag = quote(data.get('tag'), safe="")
        step_index = int(data.get('step'))
        array = data.get('value')
        tensor_dir = _make_directory(os.path.join(export_dir, 'tensor'))

        # 128 is the typical length of header of npy file
        npy_header_length = 128
        needed = array.nbytes + npy_header_length
        if disk_usage(tensor_dir).free < needed * FREE_DISK_SPACE_TIMES:
            raise RuntimeError(f"The disk space may be soon exhausted by the '{tensor_dir}'.")

        target = f"{tensor_dir}/{safe_tag}_{step_index}.npy"
        np.save(target, array)
        os.chmod(target, FILE_MODE)