1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_ 18 #define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_ 19 20 #include <functional> 21 #include <iostream> 22 #include <string> 23 #include <memory> 24 #include <vector> 25 #include "utils/log_adapter.h" 26 #include "ps/core/communicator/message.h" 27 #include "proto/comm.pb.h" 28 #include "proto/ps.pb.h" 29 #include "utils/convert_utils_base.h" 30 #include "include/backend/distributed/ps/constants.h" 31 32 namespace mindspore { 33 namespace ps { 34 namespace core { 35 using messageReceive = 36 std::function<void(const std::shared_ptr<MessageMeta> &, const Protos &, const void *, size_t size)>; 37 38 constexpr size_t kHeaderLen = sizeof(MessageHeader); 39 40 class TcpMessageHandler { 41 public: TcpMessageHandler()42 TcpMessageHandler() : remaining_length_(0), cur_header_len_(0), last_copy_len_(0) {} 43 virtual ~TcpMessageHandler() = default; 44 45 void SetCallback(const messageReceive &cb); 46 void ReceiveMessage(const void *buffer, size_t num); 47 48 void Reset(); 49 50 private: 51 messageReceive message_callback_; 52 std::vector<uint8_t> message_buffer_; 53 uint8_t header_[kHeaderLen]{0}; 54 size_t remaining_length_; 55 size_t cur_header_len_ = 0; 56 size_t last_copy_len_; 57 MessageHeader message_header_; 58 }; 59 } // namespace core 60 } // namespace ps 61 } // namespace mindspore 62 63 #endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MESSAGE_HANDLER_H_ 64