• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Functions for working with pw_rpc packets."""
15
16from google.protobuf import message
17from pw_status import Status
18
19from pw_rpc.internal import packet_pb2
20
21
22def decode(data: bytes):
23    packet = packet_pb2.RpcPacket()
24    packet.MergeFromString(data)
25    return packet
26
27
28def decode_payload(packet, payload_type):
29    payload = payload_type()
30    payload.MergeFromString(packet.payload)
31    return payload
32
33
34def _ids(rpc: tuple) -> tuple:
35    return tuple(item if isinstance(item, int) else item.id for item in rpc)
36
37
38def encode_request(rpc: tuple, request: message.Message) -> bytes:
39    channel, service, method = _ids(rpc)
40
41    return packet_pb2.RpcPacket(
42        type=packet_pb2.PacketType.REQUEST,
43        channel_id=channel,
44        service_id=service,
45        method_id=method,
46        payload=request.SerializeToString()).SerializeToString()
47
48
49def encode_response(rpc: tuple, response: message.Message) -> bytes:
50    channel, service, method = _ids(rpc)
51
52    return packet_pb2.RpcPacket(
53        type=packet_pb2.PacketType.RESPONSE,
54        channel_id=channel,
55        service_id=service,
56        method_id=method,
57        payload=response.SerializeToString()).SerializeToString()
58
59
60def encode_client_error(packet, status: Status) -> bytes:
61    return packet_pb2.RpcPacket(type=packet_pb2.PacketType.CLIENT_ERROR,
62                                channel_id=packet.channel_id,
63                                service_id=packet.service_id,
64                                method_id=packet.method_id,
65                                status=status.value).SerializeToString()
66
67
68def encode_cancel(rpc: tuple) -> bytes:
69    channel, service, method = _ids(rpc)
70    return packet_pb2.RpcPacket(
71        type=packet_pb2.PacketType.CANCEL_SERVER_STREAM,
72        channel_id=channel,
73        service_id=service,
74        method_id=method).SerializeToString()
75
76
77def for_server(packet):
78    return packet.type % 2 == 0
79