# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import cherrypy import common import logging from fake_device_server import common_util from fake_device_server import server_errors OAUTH_PATH = 'oauth' TEST_API_KEY = 'this_is_an_api_key' TEST_DEVICE_ACCESS_TOKEN = 'a_device_access_token' TEST_DEVICE_REFRESH_TOKEN = 'a_device_refresh_token' TOKEN_EXPIRATION_SECONDS = 24 * 60 * 60 # 24 hours. class OAuth(object): """The bare minimum to make Buffet think its talking to OAuth.""" # Needed for cherrypy to expose this to requests. exposed = True def __init__(self, fail_control_handler): self._device_access_token = TEST_DEVICE_ACCESS_TOKEN self._device_refresh_token = TEST_DEVICE_REFRESH_TOKEN self._fail_control_handler = fail_control_handler def get_api_key_from_access_token(self, access_token): if access_token == self._device_access_token: return TEST_API_KEY return None def is_request_authorized(self): """Checks if the access token in an incoming request is correct.""" access_token = common_util.get_access_token() if access_token == self._device_access_token: return True logging.info('Wrong access token - expected %s but device sent %s', self._device_access_token, access_token) return False @cherrypy.tools.json_out() def POST(self, *args, **kwargs): """Handle a post to get a refresh/access token. We expect the device to provide (a subset of) the following parameters. code client_id client_secret redirect_uri scope grant_type refresh_token in the request body in query-string format (see the OAuth docs for details). Since we're a bare-minimum implementation we're going to ignore most of these. """ self._fail_control_handler.ensure_not_in_failure_mode() path = list(args) if path == ['token']: body_length = int(cherrypy.request.headers.get('Content-Length', 0)) body = cherrypy.request.rfile.read(body_length) params = cherrypy.lib.httputil.parse_query_string(body) refresh_token = params.get('refresh_token') if refresh_token and refresh_token != self._device_refresh_token: logging.info('Wrong refresh token - expected %s but ' 'device sent %s', self._device_refresh_token, refresh_token) cherrypy.response.status = 400 response = {'error': 'invalid_grant'} return response response = { 'access_token': self._device_access_token, 'refresh_token': self._device_refresh_token, 'expires_in': TOKEN_EXPIRATION_SECONDS, } return response elif path == ['invalidate_all_access_tokens']: # By concatenating '_X' to the end of existing access # token, this will effectively invalidate the access token # previously granted to a device and cause us to return # the concatenated one for future requests. self._device_access_token += '_X' return dict() elif path == ['invalidate_all_refresh_tokens']: # Same here, only for the refresh token. self._device_refresh_token += '_X' return dict() else: raise server_errors.HTTPError( 400, 'Unsupported oauth path %s' % path)