1# Copyright 2020 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Functions for working with pw_rpc packets.""" 15 16from google.protobuf import message 17from pw_status import Status 18 19from pw_rpc.internal import packet_pb2 20 21 22def decode(data: bytes): 23 packet = packet_pb2.RpcPacket() 24 packet.MergeFromString(data) 25 return packet 26 27 28def decode_payload(packet, payload_type): 29 payload = payload_type() 30 payload.MergeFromString(packet.payload) 31 return payload 32 33 34def _ids(rpc: tuple) -> tuple: 35 return tuple(item if isinstance(item, int) else item.id for item in rpc) 36 37 38def encode_request(rpc: tuple, request: message.Message) -> bytes: 39 channel, service, method = _ids(rpc) 40 41 return packet_pb2.RpcPacket( 42 type=packet_pb2.PacketType.REQUEST, 43 channel_id=channel, 44 service_id=service, 45 method_id=method, 46 payload=request.SerializeToString()).SerializeToString() 47 48 49def encode_response(rpc: tuple, response: message.Message) -> bytes: 50 channel, service, method = _ids(rpc) 51 52 return packet_pb2.RpcPacket( 53 type=packet_pb2.PacketType.RESPONSE, 54 channel_id=channel, 55 service_id=service, 56 method_id=method, 57 payload=response.SerializeToString()).SerializeToString() 58 59 60def encode_client_error(packet, status: Status) -> bytes: 61 return packet_pb2.RpcPacket(type=packet_pb2.PacketType.CLIENT_ERROR, 62 channel_id=packet.channel_id, 63 service_id=packet.service_id, 64 method_id=packet.method_id, 65 status=status.value).SerializeToString() 66 67 68def encode_cancel(rpc: tuple) -> bytes: 69 channel, service, method = _ids(rpc) 70 return packet_pb2.RpcPacket( 71 type=packet_pb2.PacketType.CANCEL_SERVER_STREAM, 72 channel_id=channel, 73 service_id=service, 74 method_id=method).SerializeToString() 75 76 77def for_server(packet): 78 return packet.type % 2 == 0 79