• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import collections
31import io
32import struct
33
34from binascii import hexlify
35from enum import IntEnum
36
37
38class CoapMessageType(IntEnum):
39    CON = 0  # Confirmable
40    NON = 1  # Non-confirmable
41    ACK = 2  # Acknowledgement
42    RST = 3  # Reset
43
44
45class CoapOptionsTypes(IntEnum):
46    IF_MATCH = 1
47    URI_HOST = 3
48    ETAG = 4
49    IF_NOT_MATCH = 5
50    URI_PORT = 7
51    LOCATION_PATH = 8
52    URI_PATH = 11
53    CONTENT_FORMAT = 12
54    MAX_AGE = 14
55    URI_QUERY = 15
56    ACCEPT = 17
57    LOCATION_QUERY = 20
58    PROXY_URI = 35
59    PROXY_SCHEME = 39
60    SIZE1 = 60
61
62
63class CoapOptionHeader(object):
64    """ Class representing CoAP optiona header. """
65
66    def __init__(self, delta, length):
67        self._delta = delta
68        self._length = length
69
70    @property
71    def delta(self):
72        return self._delta
73
74    @property
75    def length(self):
76        return self._length
77
78    @property
79    def is_payload_marker(self):
80        return self.delta == 0xF and self.length == 0xF
81
82    @classmethod
83    def _read_extended_value(cls, data, value):
84        if value == 13:
85            return ord(data.read(1)) + 13
86        elif value == 14:
87            data.read(1)
88            return ord(data.read(1)) + 269
89        else:
90            return value
91
92    @classmethod
93    def from_bytes(cls, data):
94        initial_byte = ord(data.read(1))
95
96        delta = (initial_byte >> 4) & 0xF
97        length = initial_byte & 0xF
98
99        delta = cls._read_extended_value(data, delta)
100        length = cls._read_extended_value(data, length)
101
102        return cls(delta, length)
103
104
105class CoapOption(object):
106    """ Class representing CoAP option. """
107
108    def __init__(self, _type, value):
109        self._type = _type
110        self._value = value
111
112    @property
113    def type(self):
114        return self._type
115
116    @property
117    def value(self):
118        return self._value
119
120    def __repr__(self):
121        return "CoapOption(type={}, value={})".format(self.type, hexlify(self.value))
122
123
124class CoapOptionsFactory(object):
125    """ Factory that produces CoAP options. """
126
127    def parse(self, data, message_info):
128        options = []
129
130        _type = 0
131        while data.tell() < len(data.getvalue()):
132            option_header = CoapOptionHeader.from_bytes(data)
133            if option_header.is_payload_marker:
134                break
135
136            _type += option_header.delta
137            value = data.read(option_header.length)
138
139            option = CoapOption(_type, value)
140            options.append(option)
141
142        return options
143
144
145class CoapCode(object):
146    """ Class representing CoAP code. """
147
148    def __init__(self, code):
149        self._code = code
150
151    @property
152    def code(self):
153        return self._code
154
155    @property
156    def _class(self):
157        return (self.code >> 5) & 0x7
158
159    @property
160    def detail(self):
161        return self.code & 0x1F
162
163    @classmethod
164    def from_class_and_detail(cls, _class, detail):
165        return cls(((_class & 0x7) << 5) | (detail & 0x1F))
166
167    @classmethod
168    def from_dotted(cls, dotted_str):
169        _class, detail = dotted_str.split(".")
170        return cls.from_class_and_detail(int(_class), int(detail))
171
172    def is_equal_dotted(self, dotted_code):
173        other = self.from_dotted(dotted_code)
174        return self.code == other.code
175
176    @property
177    def dotted(self):
178        return ".".join(["{:01d}".format(self._class), "{:02d}".format(self.detail)])
179
180    def __eq__(self, other):
181        if isinstance(other, int):
182            return self.code == other
183
184        elif isinstance(other, str):
185            return self.is_equal_dotted(other)
186
187        elif isinstance(other, self.__class__):
188            return self.code == other.code
189
190        else:
191            raise TypeError("Could not compare {} and {}".format(type(self), type(other)))
192
193    def __repr__(self):
194        return self.dotted
195
196
197class CoapMessage(object):
198    """ Class representing CoAP message. """
199
200    def __init__(
201        self,
202        version,
203        _type,
204        code,
205        message_id,
206        token,
207        options,
208        payload,
209        uri_path=None,
210    ):
211        self._version = version
212        self._type = _type
213        self._code = code
214        self._message_id = message_id
215        self._token = token
216        self._options = options
217        self._payload = payload
218        self._uri_path = uri_path
219
220    @property
221    def version(self):
222        return self._version
223
224    @property
225    def type(self):
226        return self._type
227
228    @property
229    def code(self):
230        return self._code
231
232    @property
233    def message_id(self):
234        return self._message_id
235
236    @property
237    def token(self):
238        return self._token
239
240    @property
241    def tkl(self):
242        return len(self._token)
243
244    @property
245    def options(self):
246        return self._options
247
248    @property
249    def payload(self):
250        return self._payload
251
252    @property
253    def uri_path(self):
254        return self._uri_path
255
256    def __repr__(self):
257        options_str = ", ".join([repr(opt) for opt in self.options])
258        return ("CoapMessage(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},",
259                "uri-path='{}')").format(
260                    self.version,
261                    CoapMessageType.name[self.type],
262                    self.code,
263                    self.message_id,
264                    hexlify(self.token),
265                    options_str,
266                    self.payload,
267                    self.uri_path,
268                )
269
270
271class CoapMessageProxy(object):
272    """ Proxy class of CoAP message.
273
274    The main idea behind this class is to delay parsing payload. Due to architecture of the existing solution
275    it is possible to process confirmation message before a request message. In such case it is not possible
276    to get URI path to get proper payload parser.
277    """
278
279    def __init__(
280        self,
281        coap_message,
282        message_info,
283        mid_to_uri_path_binder,
284        uri_path_based_payload_factories,
285    ):
286        self._coap_message = coap_message
287        self._message_info = message_info
288        self._mid_to_uri_path_binder = mid_to_uri_path_binder
289        self._uri_path_based_payload_factories = (uri_path_based_payload_factories)
290
291    @property
292    def version(self):
293        return self._coap_message.version
294
295    @property
296    def type(self):
297        return self._coap_message.type
298
299    @property
300    def code(self):
301        return self._coap_message.code
302
303    @property
304    def message_id(self):
305        return self._coap_message.message_id
306
307    @property
308    def token(self):
309        return self._coap_message.token
310
311    @property
312    def tkl(self):
313        return self._coap_message.tkl
314
315    @property
316    def options(self):
317        return self._coap_message.options
318
319    @property
320    def payload(self):
321        try:
322            binded_uri_path = self._mid_to_uri_path_binder.get_uri_path_for(self.message_id, self.token)
323
324            factory = self._uri_path_based_payload_factories[binded_uri_path]
325
326            return factory.parse(io.BytesIO(self._coap_message.payload), self._message_info)
327
328        except RuntimeError:
329            return self._coap_message.payload
330
331    @property
332    def uri_path(self):
333        return self._coap_message.uri_path
334
335    def __repr__(self):
336        options_str = ", ".join([repr(opt) for opt in self.options])
337        return ("CoapMessageProxy(version={}, type={}, code={}, message_id={}, token={}, options=[{}], payload={},",
338                "uri-path='{}')").format(
339                    self.version,
340                    self.type,
341                    self.code,
342                    self.message_id,
343                    hexlify(self.token),
344                    options_str,
345                    self.payload,
346                    self.uri_path,
347                )
348
349
350class CoapMessageIdToUriPathBinder:
351    """ Class binds message id and token with URI path. """
352
353    def __init__(self):
354        self._uri_path_binds = collections.defaultdict(collections.defaultdict)
355
356    def add_uri_path_for(self, message_id, token, uri_path):
357        self._uri_path_binds[message_id][hexlify(token)] = uri_path
358
359    def get_uri_path_for(self, message_id, token):
360        try:
361            return self._uri_path_binds[message_id][hexlify(token)]
362        except KeyError:
363            raise RuntimeError("Could not find URI PATH for message_id: {} and token: {}".format(
364                message_id, hexlify(token)))
365
366
367class CoapMessageFactory(object):
368    """ Factory that produces CoAP messages. """
369
370    def __init__(
371        self,
372        options_factory,
373        uri_path_based_payload_factories,
374        message_id_to_uri_path_binder,
375    ):
376        self._options_factory = options_factory
377        self._uri_path_based_payload_factories = (uri_path_based_payload_factories)
378        self._mid_to_uri_path_binder = message_id_to_uri_path_binder
379
380    def _uri_path_from(self, options):
381        uri_path_options = []
382
383        for option in options:
384            if option.type == CoapOptionsTypes.URI_PATH:
385                uri_path_options.append(option.value.decode("utf-8"))
386
387        if not uri_path_options:
388            return None
389
390        return "/" + "/".join(uri_path_options)
391
392    def _parse_initial_byte(self, data, message_info):
393        initial_byte = ord(data.read(1))
394
395        version = (initial_byte >> 6) & 0x3
396        _type = CoapMessageType((initial_byte >> 4) & 0x3)
397        token_length = initial_byte & 0xF
398
399        return version, _type, token_length
400
401    def parse(self, data, message_info):
402        version, _type, token_length = self._parse_initial_byte(data, message_info)
403
404        code = CoapCode(ord(data.read(1)))
405        message_id = struct.unpack(">H", data.read(2))[0]
406        token = data.read(token_length)
407
408        options = self._options_factory.parse(data, message_info)
409
410        uri_path = self._uri_path_from(options)
411        if uri_path is not None:
412            self._mid_to_uri_path_binder.add_uri_path_for(message_id, token, uri_path)
413
414        coap_message = CoapMessage(
415            version,
416            _type,
417            code,
418            message_id,
419            token,
420            options,
421            data.read(),
422            uri_path,
423        )
424
425        return CoapMessageProxy(
426            coap_message,
427            message_info,
428            self._mid_to_uri_path_binder,
429            self._uri_path_based_payload_factories,
430        )
431