• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! /usr/bin/env python
2
3# This file is part of Scapy.
4# See http://www.secdev.org/projects/scapy for more information.
5#
6# Scapy is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 2 of the License, or
9# (at your option) any later version.
10#
11# Scapy is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with Scapy.  If not, see <http://www.gnu.org/licenses/>.
18#
19# Copyright (C) 2016 Anmol Sarma <me@anmolsarma.in>
20
21# scapy.contrib.description = Constrained Application Protocol (CoAP)
22# scapy.contrib.status = loads
23
24"""
25RFC 7252 - Constrained Application Protocol (CoAP) layer for Scapy
26"""
27
28from scapy.fields import *
29from scapy.layers.inet import UDP
30from scapy.packet import *
31from scapy.error import warning
32from scapy.compat import raw
33
34coap_codes = {
35    0: "Empty",
36    # Request codes
37    1: "GET",
38    2: "POST",
39    3: "PUT",
40    4: "DELETE",
41    # Response codes
42    65: "2.01 Created",
43    66: "2.02 Deleted",
44    67: "2.03 Valid",
45    68: "2.04 Changed",
46    69: "2.05 Content",
47    128: "4.00 Bad Request",
48    129: "4.01 Unauthorized",
49    130: "4.02 Bad Option",
50    131: "4.03 Forbidden",
51    132: "4.04 Not Found",
52    133: "4.05 Method Not Allowed",
53    134: "4.06 Not Acceptable",
54    140: "4.12 Precondition Failed",
55    141: "4.13 Request Entity Too Large",
56    143: "4.15 Unsupported Content-Format",
57    160: "5.00 Internal Server Error",
58    161: "5.01 Not Implemented",
59    162: "5.02 Bad Gateway",
60    163: "5.03 Service Unavailable",
61    164: "5.04 Gateway Timeout",
62    165: "Proxying Not Supported"}
63
64coap_options = ({
65                    1: "If-Match",
66                    3: "Uri-Host",
67                    4: "ETag",
68                    5: "If-None-Match",
69                    7: "Uri-Port",
70                    8: "Location-Path",
71                    11: "Uri-Path",
72                    12: "Content-Format",
73                    14: "Max-Age",
74                    15: "Uri-Query",
75                    17: "Accept",
76                    20: "Location-Query",
77                    35: "Proxy-Uri",
78                    39: "Proxy-Scheme",
79                    60: "Size1"
80                },
81                {
82                    "If-Match": 1,
83                    "Uri-Host": 3,
84                    "ETag": 4,
85                    "If-None-Match": 5,
86                    "Uri-Port": 7,
87                    "Location-Path": 8,
88                    "Uri-Path": 11,
89                    "Content-Format": 12,
90                    "Max-Age": 14,
91                    "Uri-Query": 15,
92                    "Accept": 17,
93                    "Location-Query": 20,
94                    "Proxy-Uri": 35,
95                    "Proxy-Scheme": 39,
96                    "Size1": 60
97                })
98
99
100def _get_ext_field_size(val):
101    if val >= 15:
102        warning("Invalid Option Delta or Length")
103    if val == 14:
104        return 2
105    if val == 13:
106        return 1
107    return 0
108
109
110def _get_delta_ext_size(pkt):
111    return _get_ext_field_size(pkt.delta)
112
113
114def _get_len_ext_size(pkt):
115    return _get_ext_field_size(pkt.len)
116
117
118def _get_abs_val(val, ext_val):
119    if val >= 15:
120        warning("Invalid Option Length or Delta %d" % val)
121    if val == 14:
122        return 269 + struct.unpack('H', ext_val)[0]
123    if val == 13:
124        return 13 + struct.unpack('B', ext_val)[0]
125    return val
126
127
128def _get_opt_val_size(pkt):
129    return _get_abs_val(pkt.len, pkt.len_ext)
130
131
132class _CoAPOpt(Packet):
133    fields_desc = [BitField("delta", 0, 4),
134                   BitField("len", 0, 4),
135                   StrLenField("delta_ext", None, length_from=_get_delta_ext_size),
136                   StrLenField("len_ext", None, length_from=_get_len_ext_size),
137                   StrLenField("opt_val", None, length_from=_get_opt_val_size)]
138
139    @staticmethod
140    def _populate_extended(val):
141        if val >= 269:
142            return struct.pack('H', val - 269), 14
143        if val >= 13:
144            return struct.pack('B', val - 13), 13
145        return None, val
146
147    def do_build(self):
148        self.delta_ext, self.delta = self._populate_extended(self.delta)
149        self.len_ext, self.len = self._populate_extended(len(self.opt_val))
150
151        return Packet.do_build(self)
152
153    def guess_payload_class(self, payload):
154        if payload[:1] != b"\xff":
155            return _CoAPOpt
156        else:
157            return Packet.guess_payload_class(self, payload)
158
159
160class _CoAPOptsField(StrField):
161    islist = 1
162
163    def i2h(self, pkt, x):
164        return [(coap_options[0][o[0]], o[1]) if o[0] in coap_options[0] else o for o in x]
165
166    # consume only the coap layer from the wire string
167    def getfield(self, pkt, s):
168        opts = self.m2i(pkt, s)
169        used = 0
170        for o in opts:
171            used += o[0]
172        return s[used:], [ (o[1], o[2]) for o in opts ]
173
174    def m2i(self, pkt, x):
175        opts = []
176        o = _CoAPOpt(x)
177        cur_delta = 0
178        while isinstance(o, _CoAPOpt):
179            cur_delta += _get_abs_val(o.delta, o.delta_ext)
180            # size of this option in bytes
181            u = 1 + len(o.opt_val) + len(o.delta_ext) + len(o.len_ext)
182            opts.append((u, cur_delta, o.opt_val))
183            o = o.payload
184        return opts
185
186    def i2m(self, pkt, x):
187        if not x:
188            return b""
189        opt_lst = []
190        for o in x:
191            if isinstance(o[0], str):
192                opt_lst.append((coap_options[1][o[0]], o[1]))
193            else:
194                opt_lst.append(o)
195        opt_lst.sort(key=lambda o:o[0])
196
197        opts = _CoAPOpt(delta=opt_lst[0][0], opt_val=opt_lst[0][1])
198        high_opt = opt_lst[0][0]
199        for o in opt_lst[1:]:
200            opts = opts / _CoAPOpt(delta=o[0] - high_opt, opt_val=o[1])
201            high_opt = o[0]
202
203        return raw(opts)
204
205class _CoAPPaymark(StrField):
206
207    def i2h(self, pkt, x):
208        return x
209
210    def getfield(self, pkt, s):
211        (u, m) = self.m2i(pkt, s)
212        return s[u:], m
213
214    def m2i(self, pkt, x):
215        if len(x) > 0 and x[:1] == b"\xff":
216            return 1, b'\xff'
217        return 0, b'';
218
219    def i2m(self, pkt, x):
220        return x
221
222
223class CoAP(Packet):
224    __slots__ = ["content_format"]
225    name = "CoAP"
226
227    fields_desc = [BitField("ver", 1, 2),
228                   BitEnumField("type", 0, 2, {0: "CON", 1: "NON", 2: "ACK", 3: "RST"}),
229                   BitFieldLenField("tkl", None, 4, length_of='token'),
230                   ByteEnumField("code", 0, coap_codes),
231                   ShortField("msg_id", 0),
232                   StrLenField("token", "", length_from=lambda pkt: pkt.tkl),
233                   _CoAPOptsField("options", []),
234                   _CoAPPaymark("paymark", b"")
235                   ]
236
237    def getfieldval(self, attr):
238        v = getattr(self, attr)
239        if v:
240            return v
241        return Packet.getfieldval(self, attr)
242
243    def post_dissect(self, pay):
244        for k in self.options:
245            if k[0] == "Content-Format":
246                self.content_format = k[1]
247        return pay
248
249bind_layers(UDP, CoAP, sport=5683)
250bind_layers(UDP, CoAP, dport=5683)
251