• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020-2021 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ============================================================================
15"""Writes events to disk in a logdir."""
16import os
17import stat
18from urllib.parse import quote
19from shutil import disk_usage
20
21import numpy as np
22
23from mindspore.train.summary.enums import PluginEnum, WriterPluginEnum
24from mindspore import log as logger
25
26from .._utils import _make_directory
27from ._summary_adapter import package_init_event
28from ..._c_expression import security
29if not security.enable_security():
30    from ..._c_expression import EventWriter_
31
32
33FREE_DISK_SPACE_TIMES = 32
34FILE_MODE = 0o400
35
36
37class BaseWriter:
38    """BaseWriter to be subclass."""
39
40    def __init__(self, filepath, max_file_size=None) -> None:
41        self._filepath, self._max_file_size = filepath, max_file_size
42        self._writer: EventWriter_ = None
43
44    def init_writer(self):
45        """Write some metadata etc."""
46
47    @property
48    def writer(self):
49        """Get the writer."""
50        if self._writer is not None:
51            return self._writer
52
53        with open(self._filepath, 'w'):
54            os.chmod(self._filepath, stat.S_IWUSR | stat.S_IRUSR)
55        self._writer = EventWriter_(self._filepath)
56        self.init_writer()
57        return self._writer
58
59    def write(self, plugin, data):
60        """Write data to file."""
61        # 8: data length
62        # 4: crc32 of data length
63        # 4: crc32 of data
64        metadata_length = 8 + 4 + 4
65        required_length = len(data) + metadata_length
66        if self.writer and disk_usage(self._filepath).free < required_length * FREE_DISK_SPACE_TIMES:
67            raise RuntimeError(f"The disk space may be soon exhausted by the '{self._filepath}'.")
68        if self._max_file_size is None:
69            self.writer.Write(data)
70        elif self._max_file_size >= required_length:
71            self._max_file_size -= required_length
72            self.writer.Write(data)
73        else:
74            raise RuntimeWarning(f"'max_file_size' reached: There are {self._max_file_size} bytes remaining, "
75                                 f"but the '{self._filepath}' requires to write {required_length} bytes.")
76
77    def flush(self):
78        """Flush the writer."""
79        if self._writer is not None:
80            self._writer.Flush()
81
82    def close(self):
83        """Close the writer."""
84        try:
85            os.chmod(self._filepath, FILE_MODE)
86        except FileNotFoundError:
87            logger.debug("The summary file %r has been removed.", self._filepath)
88        if self._writer is not None:
89            self._writer.Shut()
90
91
92class SummaryWriter(BaseWriter):
93    """SummaryWriter for write summaries."""
94
95    def init_writer(self):
96        """Write some metadata etc."""
97        self.write(WriterPluginEnum.SUMMARY.value, package_init_event().SerializeToString())
98
99    def write(self, plugin, data):
100        """Write data to file."""
101        if plugin in (WriterPluginEnum.SUMMARY.value, PluginEnum.GRAPH.value):
102            super().write(plugin, data)
103
104
105class LineageWriter(BaseWriter):
106    """LineageWriter for write lineage."""
107
108    def write(self, plugin, data):
109        """Write data to file."""
110        if plugin in (PluginEnum.DATASET_GRAPH.value, PluginEnum.TRAIN_LINEAGE.value, PluginEnum.EVAL_LINEAGE.value,
111                      PluginEnum.CUSTOM_LINEAGE_DATA.value):
112            super().write(plugin, data)
113
114
115class ExplainWriter(BaseWriter):
116    """ExplainWriter for write explain data."""
117
118    def write(self, plugin, data):
119        """Write data to file."""
120        if plugin == WriterPluginEnum.EXPLAINER.value:
121            super().write(plugin, data)
122
123
124class ExportWriter(BaseWriter):
125    """ExportWriter for export data."""
126
127    def write(self, plugin, data):
128        """Write data to file."""
129        if plugin == WriterPluginEnum.EXPORTER.value:
130            self.export_data(data, data.get('export_option'))
131
132    def flush(self):
133        """Flush the writer."""
134
135    def close(self):
136        """Close the writer."""
137
138    def export_data(self, data, export_option):
139        """
140        export the tensor data.
141
142        Args:
143            data (dict): Export data info.
144            export_option (Union[None, str]): The export options.
145        """
146        options = {
147            'npy': self._export_npy
148        }
149
150        if export_option in options:
151            options[export_option](data, self._filepath)
152
153    @staticmethod
154    def _export_npy(data, export_dir):
155        """
156        export the tensor data as npy.
157
158        Args:
159            data (dict): Export data info.
160            export_dir (str): The path of export dir.
161        """
162        tag = quote(data.get('tag'), safe="")
163        step = int(data.get('step'))
164        np_value = data.get('value')
165        path = _make_directory(os.path.join(export_dir, 'tensor'))
166
167        #  128 is the typical length of header of npy file
168        metadata_length = 128
169        required_length = np_value.nbytes + metadata_length
170        if disk_usage(path).free < required_length * FREE_DISK_SPACE_TIMES:
171            raise RuntimeError(f"The disk space may be soon exhausted by the '{path}'.")
172
173        np_path = "{}/{}_{}.npy".format(path, tag, step)
174        np.save(np_path, np_value)
175        os.chmod(np_path, FILE_MODE)
176