1# 2# Copyright (c) 2011 Thomas Graf <tgraf@suug.ch> 3# 4 5"""Module providing access to network addresses 6""" 7 8from __future__ import absolute_import 9 10 11__version__ = '1.0' 12__all__ = [ 13 'AddressCache', 14 'Address'] 15 16import datetime 17from .. import core as netlink 18from . import capi as capi 19from . import link as Link 20from .. import util as util 21 22class AddressCache(netlink.Cache): 23 """Cache containing network addresses""" 24 25 def __init__(self, cache=None): 26 if not cache: 27 cache = self._alloc_cache_name('route/addr') 28 29 self._protocol = netlink.NETLINK_ROUTE 30 self._nl_cache = cache 31 32 def __getitem__(self, key): 33 # Using ifindex=0 here implies that the local address itself 34 # is unique, otherwise the first occurence is returned. 35 return self.lookup(0, key) 36 37 def lookup(self, ifindex, local): 38 if type(local) is str: 39 local = netlink.AbstractAddress(local) 40 41 addr = capi.rtnl_addr_get(self._nl_cache, ifindex, 42 local._nl_addr) 43 if addr is None: 44 raise KeyError() 45 46 return Address._from_capi(addr) 47 48 @staticmethod 49 def _new_object(obj): 50 return Address(obj) 51 52 @staticmethod 53 def _new_cache(cache): 54 return AddressCache(cache=cache) 55 56class Address(netlink.Object): 57 """Network address""" 58 59 def __init__(self, obj=None): 60 netlink.Object.__init__(self, 'route/addr', 'address', obj) 61 self._rtnl_addr = self._obj2type(self._nl_object) 62 63 @classmethod 64 def _from_capi(cls, obj): 65 return cls(capi.addr2obj(obj)) 66 67 @staticmethod 68 def _obj2type(obj): 69 return capi.obj2addr(obj) 70 71 def __cmp__(self, other): 72 # sort by: 73 # 1. network link 74 # 2. address family 75 # 3. local address (including prefixlen) 76 diff = self.ifindex - other.ifindex 77 78 if diff == 0: 79 diff = self.family - other.family 80 if diff == 0: 81 diff = capi.nl_addr_cmp(self.local, other.local) 82 83 return diff 84 85 @staticmethod 86 def _new_instance(obj): 87 return Address(obj) 88 89 @property 90 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 91 def ifindex(self): 92 """interface index""" 93 return capi.rtnl_addr_get_ifindex(self._rtnl_addr) 94 95 @ifindex.setter 96 def ifindex(self, value): 97 link = Link.resolve(value) 98 if not link: 99 raise ValueError() 100 101 self.link = link 102 103 @property 104 @netlink.nlattr(type=str, fmt=util.string) 105 def link(self): 106 link = capi.rtnl_addr_get_link(self._rtnl_addr) 107 if not link: 108 return None 109 110 return Link.Link.from_capi(link) 111 112 @link.setter 113 def link(self, value): 114 if type(value) is str: 115 try: 116 value = Link.resolve(value) 117 except KeyError: 118 raise ValueError() 119 120 capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link) 121 122 # ifindex is immutable but we assume that if _orig does not 123 # have an ifindex specified, it was meant to be given here 124 if capi.rtnl_addr_get_ifindex(self._orig) == 0: 125 capi.rtnl_addr_set_ifindex(self._orig, value.ifindex) 126 127 @property 128 @netlink.nlattr(type=str, fmt=util.string) 129 def label(self): 130 """address label""" 131 return capi.rtnl_addr_get_label(self._rtnl_addr) 132 133 @label.setter 134 def label(self, value): 135 capi.rtnl_addr_set_label(self._rtnl_addr, value) 136 137 @property 138 @netlink.nlattr(type=str, fmt=util.string) 139 def flags(self): 140 """Flags 141 142 Setting this property will *Not* reset flags to value you supply in 143 144 Examples: 145 addr.flags = '+xxx' # add xxx flag 146 addr.flags = 'xxx' # exactly the same 147 addr.flags = '-xxx' # remove xxx flag 148 addr.flags = [ '+xxx', '-yyy' ] # list operation 149 """ 150 flags = capi.rtnl_addr_get_flags(self._rtnl_addr) 151 return capi.rtnl_addr_flags2str(flags, 256)[0].split(',') 152 153 def _set_flag(self, flag): 154 if flag.startswith('-'): 155 i = capi.rtnl_addr_str2flags(flag[1:]) 156 capi.rtnl_addr_unset_flags(self._rtnl_addr, i) 157 elif flag.startswith('+'): 158 i = capi.rtnl_addr_str2flags(flag[1:]) 159 capi.rtnl_addr_set_flags(self._rtnl_addr, i) 160 else: 161 i = capi.rtnl_addr_str2flags(flag) 162 capi.rtnl_addr_set_flags(self._rtnl_addr, i) 163 164 @flags.setter 165 def flags(self, value): 166 if type(value) is list: 167 for flag in value: 168 self._set_flag(flag) 169 else: 170 self._set_flag(value) 171 172 @property 173 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 174 def family(self): 175 """Address family""" 176 fam = capi.rtnl_addr_get_family(self._rtnl_addr) 177 return netlink.AddressFamily(fam) 178 179 @family.setter 180 def family(self, value): 181 if not isinstance(value, netlink.AddressFamily): 182 value = netlink.AddressFamily(value) 183 184 capi.rtnl_addr_set_family(self._rtnl_addr, int(value)) 185 186 @property 187 @netlink.nlattr(type=int, fmt=util.num) 188 def scope(self): 189 """Address scope""" 190 scope = capi.rtnl_addr_get_scope(self._rtnl_addr) 191 return capi.rtnl_scope2str(scope, 32)[0] 192 193 @scope.setter 194 def scope(self, value): 195 if type(value) is str: 196 value = capi.rtnl_str2scope(value) 197 capi.rtnl_addr_set_scope(self._rtnl_addr, value) 198 199 @property 200 @netlink.nlattr(type=str, immutable=True, fmt=util.addr) 201 def local(self): 202 """Local address""" 203 a = capi.rtnl_addr_get_local(self._rtnl_addr) 204 return netlink.AbstractAddress(a) 205 206 @local.setter 207 def local(self, value): 208 a = netlink.AbstractAddress(value) 209 capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr) 210 211 # local is immutable but we assume that if _orig does not 212 # have a local address specified, it was meant to be given here 213 if capi.rtnl_addr_get_local(self._orig) is None: 214 capi.rtnl_addr_set_local(self._orig, a._nl_addr) 215 216 @property 217 @netlink.nlattr(type=str, fmt=util.addr) 218 def peer(self): 219 """Peer address""" 220 a = capi.rtnl_addr_get_peer(self._rtnl_addr) 221 return netlink.AbstractAddress(a) 222 223 @peer.setter 224 def peer(self, value): 225 a = netlink.AbstractAddress(value) 226 capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr) 227 228 @property 229 @netlink.nlattr(type=str, fmt=util.addr) 230 def broadcast(self): 231 """Broadcast address""" 232 a = capi.rtnl_addr_get_broadcast(self._rtnl_addr) 233 return netlink.AbstractAddress(a) 234 235 @broadcast.setter 236 def broadcast(self, value): 237 a = netlink.AbstractAddress(value) 238 capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr) 239 240 @property 241 @netlink.nlattr(type=str, fmt=util.addr) 242 def multicast(self): 243 """multicast address""" 244 a = capi.rtnl_addr_get_multicast(self._rtnl_addr) 245 return netlink.AbstractAddress(a) 246 247 @multicast.setter 248 def multicast(self, value): 249 try: 250 a = netlink.AbstractAddress(value) 251 except ValueError as err: 252 raise AttributeError('multicast', err) 253 254 capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr) 255 256 @property 257 @netlink.nlattr(type=str, fmt=util.addr) 258 def anycast(self): 259 """anycast address""" 260 a = capi.rtnl_addr_get_anycast(self._rtnl_addr) 261 return netlink.AbstractAddress(a) 262 263 @anycast.setter 264 def anycast(self, value): 265 a = netlink.AbstractAddress(value) 266 capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr) 267 268 @property 269 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 270 def valid_lifetime(self): 271 """Valid lifetime""" 272 msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr) 273 if msecs == 0xFFFFFFFF: 274 return None 275 else: 276 return datetime.timedelta(seconds=msecs) 277 278 @valid_lifetime.setter 279 def valid_lifetime(self, value): 280 capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value)) 281 282 @property 283 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 284 def preferred_lifetime(self): 285 """Preferred lifetime""" 286 msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr) 287 if msecs == 0xFFFFFFFF: 288 return None 289 else: 290 return datetime.timedelta(seconds=msecs) 291 292 @preferred_lifetime.setter 293 def preferred_lifetime(self, value): 294 capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value)) 295 296 @property 297 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 298 def create_time(self): 299 """Creation time""" 300 hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr) 301 return datetime.timedelta(milliseconds=10*hsec) 302 303 @property 304 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 305 def last_update(self): 306 """Last update""" 307 hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr) 308 return datetime.timedelta(milliseconds=10*hsec) 309 310 def add(self, socket=None, flags=None): 311 if not socket: 312 socket = netlink.lookup_socket(netlink.NETLINK_ROUTE) 313 314 if not flags: 315 flags = netlink.NLM_F_CREATE 316 317 ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags) 318 if ret < 0: 319 raise netlink.KernelError(ret) 320 321 def delete(self, socket, flags=0): 322 """Attempt to delete this address in the kernel""" 323 ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags) 324 if ret < 0: 325 raise netlink.KernelError(ret) 326 327 ################################################################### 328 # private properties 329 # 330 # Used for formatting output. USE AT OWN RISK 331 @property 332 def _flags(self): 333 return ','.join(self.flags) 334 335 def format(self, details=False, stats=False, nodev=False, indent=''): 336 """Return address as formatted text""" 337 fmt = util.MyFormatter(self, indent) 338 339 buf = fmt.format('{a|local!b}') 340 341 if not nodev: 342 buf += fmt.format(' {a|ifindex}') 343 344 buf += fmt.format(' {a|scope}') 345 346 if self.label: 347 buf += fmt.format(' "{a|label}"') 348 349 buf += fmt.format(' <{a|_flags}>') 350 351 if details: 352 buf += fmt.nl('\t{t|broadcast} {t|multicast}') \ 353 + fmt.nl('\t{t|peer} {t|anycast}') 354 355 if self.valid_lifetime: 356 buf += fmt.nl('\t{s|valid-lifetime!k} '\ 357 '{a|valid_lifetime}') 358 359 if self.preferred_lifetime: 360 buf += fmt.nl('\t{s|preferred-lifetime!k} '\ 361 '{a|preferred_lifetime}') 362 363 if stats and (self.create_time or self.last_update): 364 buf += self.nl('\t{s|created!k} {a|create_time}'\ 365 ' {s|last-updated!k} {a|last_update}') 366 367 return buf 368