1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5 6import cherrypy 7 8import common 9import logging 10from fake_device_server import common_util 11from fake_device_server import server_errors 12 13OAUTH_PATH = 'oauth' 14 15TEST_API_KEY = 'this_is_an_api_key' 16TEST_DEVICE_ACCESS_TOKEN = 'a_device_access_token' 17TEST_DEVICE_REFRESH_TOKEN = 'a_device_refresh_token' 18TOKEN_EXPIRATION_SECONDS = 24 * 60 * 60 # 24 hours. 19 20 21class OAuth(object): 22 """The bare minimum to make Buffet think its talking to OAuth.""" 23 24 # Needed for cherrypy to expose this to requests. 25 exposed = True 26 27 def __init__(self, fail_control_handler): 28 self._device_access_token = TEST_DEVICE_ACCESS_TOKEN 29 self._device_refresh_token = TEST_DEVICE_REFRESH_TOKEN 30 self._fail_control_handler = fail_control_handler 31 32 33 def get_api_key_from_access_token(self, access_token): 34 if access_token == self._device_access_token: 35 return TEST_API_KEY 36 return None 37 38 39 def is_request_authorized(self): 40 """Checks if the access token in an incoming request is correct.""" 41 access_token = common_util.get_access_token() 42 if access_token == self._device_access_token: 43 return True 44 logging.info('Wrong access token - expected %s but device sent %s', 45 self._device_access_token, access_token) 46 return False 47 48 49 @cherrypy.tools.json_out() 50 def POST(self, *args, **kwargs): 51 """Handle a post to get a refresh/access token. 52 53 We expect the device to provide (a subset of) the following parameters. 54 55 code 56 client_id 57 client_secret 58 redirect_uri 59 scope 60 grant_type 61 refresh_token 62 63 in the request body in query-string format (see the OAuth docs 64 for details). Since we're a bare-minimum implementation we're 65 going to ignore most of these. 66 67 """ 68 self._fail_control_handler.ensure_not_in_failure_mode() 69 path = list(args) 70 if path == ['token']: 71 body_length = int(cherrypy.request.headers.get('Content-Length', 0)) 72 body = cherrypy.request.rfile.read(body_length) 73 params = cherrypy.lib.httputil.parse_query_string(body) 74 refresh_token = params.get('refresh_token') 75 if refresh_token and refresh_token != self._device_refresh_token: 76 logging.info('Wrong refresh token - expected %s but ' 77 'device sent %s', 78 self._device_refresh_token, refresh_token) 79 cherrypy.response.status = 400 80 response = {'error': 'invalid_grant'} 81 return response 82 response = { 83 'access_token': self._device_access_token, 84 'refresh_token': self._device_refresh_token, 85 'expires_in': TOKEN_EXPIRATION_SECONDS, 86 } 87 return response 88 elif path == ['invalidate_all_access_tokens']: 89 # By concatenating '_X' to the end of existing access 90 # token, this will effectively invalidate the access token 91 # previously granted to a device and cause us to return 92 # the concatenated one for future requests. 93 self._device_access_token += '_X' 94 return dict() 95 elif path == ['invalidate_all_refresh_tokens']: 96 # Same here, only for the refresh token. 97 self._device_refresh_token += '_X' 98 return dict() 99 else: 100 raise server_errors.HTTPError( 101 400, 'Unsupported oauth path %s' % path) 102