1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dpkt 6import re 7 8 9CROS_P2P_PROTO = '_cros_p2p._tcp' 10CROS_P2P_PORT = 16725 11 12 13class CrosP2PDaemon(object): 14 """Simulates a P2P server. 15 16 The simulated P2P server will instruct the underlying ZeroconfDaemon to 17 reply to requests sharing the files registered on this server. 18 """ 19 def __init__(self, zeroconf, port=CROS_P2P_PORT): 20 """Initialize the CrosP2PDaemon. 21 22 @param zeroconf: A ZeroconfDaemon instance where this P2P server will be 23 announced. 24 @param port: The port where the HTTP server part of the P2P protocol is 25 listening. The HTTP server is assumend to be running on the same host as 26 the provided ZeroconfDaemon server. 27 """ 28 self._zeroconf = zeroconf 29 self._files = {} 30 self._num_connections = 0 31 32 self._p2p_domain = CROS_P2P_PROTO + '.' + zeroconf.domain 33 # Register the HTTP Server. 34 zeroconf.register_SRV(zeroconf.hostname, CROS_P2P_PROTO, 0, 0, port) 35 # Register the P2P running on this server. 36 zeroconf.register_PTR(self._p2p_domain, zeroconf.hostname) 37 self._update_records(False) 38 39 40 def add_file(self, file_id, file_size, announce=False): 41 """Add or update a shared file. 42 43 @param file_id: The name of the file (without .p2p extension). 44 @param file_size: The expected total size of the file. 45 @param announce: If True, the method will also announce the changes 46 on the network. 47 """ 48 self._files[file_id] = file_size 49 self._update_records(announce) 50 51 52 def remove_file(self, file_id, announce=False): 53 """Remove a shared file. 54 55 @param file_id: The name of the file (without .p2p extension). 56 @param announce: If True, the method will also announce the changes 57 on the network. 58 """ 59 del self._files[file_id] 60 self._update_records(announce) 61 62 63 def set_num_connections(self, num_connections, announce=False): 64 """Sets the number of connections that the HTTP server is handling. 65 66 This method allows the P2P server to properly announce the number of 67 connections it is currently handling. 68 69 @param num_connections: An integer with the number of connections. 70 @param announce: If True, the method will also announce the changes 71 on the network. 72 """ 73 self._num_connections = num_connections 74 self._update_records(announce) 75 76 77 def _update_records(self, announce): 78 # Build the TXT records: 79 txts = ['num_connections=%d' % self._num_connections] 80 for file_id, file_size in self._files.iteritems(): 81 txts.append('id_%s=%d' % (file_id, file_size)) 82 self._zeroconf.register_TXT( 83 self._zeroconf.hostname + '.' + self._p2p_domain, txts, announce) 84 85 86class CrosP2PClient(object): 87 """Simulates a P2P client. 88 89 The P2P client interacts with a ZeroconfDaemon instance that inquires the 90 network and collects the mDNS responses. A P2P client instance decodes those 91 responses according to the P2P protocol implemented over mDNS. 92 """ 93 def __init__(self, zeroconf): 94 self._zeroconf = zeroconf 95 self._p2p_domain = CROS_P2P_PROTO + '.' + zeroconf.domain 96 self._in_query = 0 97 zeroconf.add_answer_observer(self._new_answers) 98 99 100 def start_query(self): 101 """Sends queries to gather all the p2p information on the network. 102 103 When a response that requires to send a new query to the peer is 104 received, such query will be sent until stop_query() is called. 105 Responses received when no query is running will not generate a new. 106 """ 107 self._in_query += 1 108 ts = self._zeroconf.send_request([(self._p2p_domain, dpkt.dns.DNS_PTR)]) 109 # Also send requests for all the known PTR records. 110 queries = [] 111 112 113 # The PTR record points to a SRV name. 114 ptr_recs = self._zeroconf.cached_results( 115 self._p2p_domain, dpkt.dns.DNS_PTR, ts) 116 for _rrname, _rrtype, p2p_peer, _deadline in ptr_recs: 117 # Request all the information for that peer. 118 queries.append((p2p_peer, dpkt.dns.DNS_ANY)) 119 # The SRV points to a hostname, port, etc. 120 srv_recs = self._zeroconf.cached_results( 121 p2p_peer, dpkt.dns.DNS_SRV, ts) 122 for _rrname, _rrtype, service, _deadline in srv_recs: 123 srvname, _priority, _weight, port = service 124 # Request all the information for the host name. 125 queries.append((srvname, dpkt.dns.DNS_ANY)) 126 if queries: 127 self._zeroconf.send_request(queries) 128 129 130 def stop_query(self): 131 """Stops a started query.""" 132 self._in_query -= 1 133 134 135 def _new_answers(self, answers): 136 if not self._in_query: 137 return 138 queries = [] 139 for rrname, rrtype, data in answers: 140 if rrname == self._p2p_domain and rrtype == dpkt.dns.DNS_PTR: 141 # data is a "ptrname" string. 142 queries.append((ptrname, dpkt.dns.DNS_ANY)) 143 if queries: 144 self._zeroconf.send_request(queries) 145 146 147 def get_peers(self, timestamp=None): 148 """Return the cached list of peers. 149 150 @param timestamp: The deadline timestamp to consider the responses. 151 @return: A list of tuples of the form (peer_name, hostname, list_of_IPs, 152 port). 153 """ 154 res = [] 155 # The PTR record points to a SRV name. 156 ptr_recs = self._zeroconf.cached_results( 157 self._p2p_domain, dpkt.dns.DNS_PTR, timestamp) 158 for _rrname, _rrtype, p2p_peer, _deadline in ptr_recs: 159 # The SRV points to a hostname, port, etc. 160 srv_recs = self._zeroconf.cached_results( 161 p2p_peer, dpkt.dns.DNS_SRV, timestamp) 162 for _rrname, _rrtype, service, _deadline in srv_recs: 163 srvname, _priority, _weight, port = service 164 # Each service points to a hostname (srvname). 165 a_recs = self._zeroconf.cached_results( 166 srvname, dpkt.dns.DNS_A, timestamp) 167 ip_list = [ip for _rrname, _rrtype, ip, _deadline in a_recs] 168 res.append((p2p_peer, srvname, ip_list, port)) 169 return res 170 171 172 def get_peer_files(self, peer_name, timestamp=None): 173 """Returns the cached list of files of the given peer. 174 175 @peer_name: The peer_name as provided by get_peers(). 176 @param timestamp: The deadline timestamp to consider the responses. 177 @return: A list of tuples of the form (file_name, current_size). 178 """ 179 res = [] 180 txt_records = self._zeroconf.cached_results( 181 peer_name, dpkt.dns.DNS_TXT, timestamp) 182 for _rrname, _rrtype, txt_list, _deadline in txt_records: 183 for txt in txt_list: 184 m = re.match(r'^id_(.*)=([0-9]+)$', txt) 185 if not m: 186 continue 187 file_name, size = m.groups() 188 res.append((file_name, int(size))) 189 return res 190 191 192 def get_peer_connections(self, peer_name, timestamp=None): 193 """Returns the cached num_connections of the given peer. 194 195 @peer_name: The peer_name as provided by get_peers(). 196 @param timestamp: The deadline timestamp to consider the responses. 197 @return: A list of tuples of the form (file_name, current_size). 198 """ 199 txt_records = self._zeroconf.cached_results( 200 peer_name, dpkt.dns.DNS_TXT, timestamp) 201 for _rrname, _rrtype, txt_list, _deadline in txt_records: 202 for txt in txt_list: 203 m = re.match(r'num_connections=(\d+)$', txt) 204 if m: 205 return int(m.group(1)) 206 return None # No num_connections found. 207