|
1 # -*- coding: UTF-8 -*- |
|
2 # Copyright (c) 2011, Pascal Volk |
|
3 # See COPYING for distribution information. |
|
4 """ |
|
5 VirtualMailManager.network |
|
6 ~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
7 |
|
8 Network/IP address related class and function |
|
9 """ |
|
10 |
|
11 import socket |
|
12 |
|
13 |
|
14 class NetInfo(object): |
|
15 """Simple class for CIDR network addresses an IP addresses.""" |
|
16 __slots__ = ('_addr', '_prefix', '_bits_max', '_family', '_nw_addr') |
|
17 |
|
18 def __init__(self, nw_address): |
|
19 """Creates a new `NetInfo` instance. |
|
20 |
|
21 Argument: |
|
22 |
|
23 `nw_address` : basestring |
|
24 string representation of an IPv4/IPv6 address or network address. |
|
25 E.g. 192.0.2.13, 192.0.2.0/24, 2001:db8::/32 or ::1 |
|
26 When the address has no netmask the prefix length will be set to |
|
27 32 for IPv4 addresses and 128 for IPv6 addresses. |
|
28 """ |
|
29 self._addr = None |
|
30 self._prefix = 0 |
|
31 self._bits_max = 0 |
|
32 self._family = 0 |
|
33 self._nw_addr = nw_address |
|
34 self._parse_net_range() |
|
35 |
|
36 def __hash__(self): |
|
37 return hash((self._addr, self._family, self._prefix)) |
|
38 |
|
39 def __repr__(self): |
|
40 return "NetInfo('%s')" % self._nw_addr |
|
41 |
|
42 def _parse_net_range(self): |
|
43 """Parse the network range of `self._nw_addr and assign values |
|
44 to the class attributes. |
|
45 `""" |
|
46 sep = '/' |
|
47 if self._nw_addr.count(sep): |
|
48 ip_address, sep, self._prefix = self._nw_addr.partition(sep) |
|
49 self._family, self._addr = get_ip_addr_info(ip_address) |
|
50 else: |
|
51 self._family, self._addr = get_ip_addr_info(self._nw_addr) |
|
52 self._bits_max = (128, 32)[self._family is socket.AF_INET] |
|
53 if self._prefix is 0: |
|
54 self._prefix = self._bits_max |
|
55 else: |
|
56 try: |
|
57 self._prefix = int(self._prefix) |
|
58 except ValueError: |
|
59 raise ValueError('Invalid prefix length: %r' % self._prefix) |
|
60 if self._prefix > self._bits_max or self._prefix < 0: |
|
61 raise ValueError('Invalid prefix length: %r' % self._prefix) |
|
62 |
|
63 @property |
|
64 def family(self): |
|
65 """Address family: `socket.AF_INET` or `socket.AF_INET6`""" |
|
66 return self._family |
|
67 |
|
68 def address_in_net(self, ip_address): |
|
69 """Checks if the `ip_address` belongs to the same subnet.""" |
|
70 family, address = get_ip_addr_info(ip_address) |
|
71 if family != self._family: |
|
72 return False |
|
73 return address >> self._bits_max - self._prefix == \ |
|
74 self._addr >> self._bits_max - self._prefix |
|
75 |
|
76 |
|
77 def get_ip_addr_info(ip_address): |
|
78 """Checks if the string `ip_address` is a valid IPv4 or IPv6 address. |
|
79 |
|
80 When the `ip_address` could be validated successfully a tuple |
|
81 `(address_family, address_as_long)` will be returned. The |
|
82 `address_family`will be either `socket.AF_INET` or `socket.AF_INET6`. |
|
83 """ |
|
84 if not isinstance(ip_address, basestring) or not ip_address: |
|
85 raise TypeError('ip_address must be a non empty string.') |
|
86 if not ip_address.count(':'): |
|
87 family = socket.AF_INET |
|
88 try: |
|
89 address = socket.inet_aton(ip_address) |
|
90 except socket.error: |
|
91 raise ValueError('Not a valid IPv4 address: %r' % ip_address) |
|
92 elif not socket.has_ipv6: |
|
93 raise ValueError('Unsupported IP address (IPv6): %r' % ip_address) |
|
94 else: |
|
95 family = socket.AF_INET6 |
|
96 try: |
|
97 address = socket.inet_pton(family, ip_address) |
|
98 except socket.error: |
|
99 raise ValueError('Not a valid IPv6 address: %r' % ip_address) |
|
100 return (family, long(address.encode('hex'), 16)) |