1#!/usr/bin/env python2 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Unit test for pubsub_utils.py""" 7 8from __future__ import print_function 9import os 10import unittest 11 12import mox 13 14# TODO(crbug.com/1050892): The unittests rely on apiclient in chromite. 15import chromite # pylint: disable=unused-import 16 17from apiclient import discovery 18from oauth2client.client import ApplicationDefaultCredentialsError 19from oauth2client.client import GoogleCredentials 20from googleapiclient.errors import UnknownApiNameOrVersion 21 22import pubsub_utils 23 24_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential' 25 26 27class MockedPubSub(object): 28 """A mocked PubSub handle.""" 29 def __init__(self, test, topic, msg, retry, ret_val=None, 30 raise_except=False): 31 self.test = test 32 self.topic = topic 33 self.msg = msg 34 self.retry = retry 35 self.ret_val = ret_val 36 self.raise_except = raise_except 37 38 def projects(self): 39 """Mocked PubSub projects.""" 40 return self 41 42 def topics(self): 43 """Mocked PubSub topics.""" 44 return self 45 46 def publish(self, topic, body): 47 """Mocked PubSub publish method. 48 49 @param topic: PubSub topic string. 50 @param body: PubSub notification body. 51 """ 52 self.test.assertEquals(self.topic, topic) 53 self.test.assertEquals(self.msg, body['messages'][0]) 54 return self 55 56 def execute(self, num_retries): 57 """Mocked PubSub execute method. 58 59 @param num_retries: Number of retries. 60 """ 61 self.test.assertEquals(self.retry, num_retries) 62 if self.raise_except: 63 raise Exception() 64 return self.ret_val 65 66 67def _create_sample_message(): 68 """Creates a sample pubsub message.""" 69 msg_payload = {'data': 'sample data'} 70 msg_attributes = {} 71 msg_attributes['var'] = 'value' 72 msg_payload['attributes'] = msg_attributes 73 74 return msg_payload 75 76 77class PubSubTests(mox.MoxTestBase): 78 """Tests for pubsub related functios.""" 79 80 def test_pubsub_with_no_service_account(self): 81 """Test getting the pubsub service""" 82 self.mox.StubOutWithMock(os.path, 'isfile') 83 self.mox.ReplayAll() 84 with self.assertRaises(pubsub_utils.PubSubException): 85 pubsub_utils.PubSubClient() 86 self.mox.VerifyAll() 87 88 def test_pubsub_with_non_existing_service_account(self): 89 """Test getting the pubsub service""" 90 self.mox.StubOutWithMock(os.path, 'isfile') 91 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False) 92 self.mox.ReplayAll() 93 with self.assertRaises(pubsub_utils.PubSubException): 94 pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 95 self.mox.VerifyAll() 96 97 def test_pubsub_with_corrupted_service_account(self): 98 """Test pubsub with corrupted service account.""" 99 self.mox.StubOutWithMock(os.path, 'isfile') 100 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 101 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 102 GoogleCredentials.from_stream( 103 _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndRaise( 104 ApplicationDefaultCredentialsError()) 105 self.mox.ReplayAll() 106 with self.assertRaises(pubsub_utils.PubSubException): 107 pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 108 self.mox.VerifyAll() 109 110 def test_pubsub_with_invalid_service_account(self): 111 """Test pubsubwith invalid service account.""" 112 self.mox.StubOutWithMock(os.path, 'isfile') 113 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 114 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 115 credentials = self.mox.CreateMock(GoogleCredentials) 116 GoogleCredentials.from_stream( 117 _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials) 118 credentials.create_scoped_required().AndReturn(True) 119 credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn( 120 credentials) 121 self.mox.StubOutWithMock(discovery, 'build') 122 discovery.build( 123 pubsub_utils.PUBSUB_SERVICE_NAME, 124 pubsub_utils.PUBSUB_VERSION, 125 credentials=credentials).AndRaise(UnknownApiNameOrVersion()) 126 self.mox.ReplayAll() 127 with self.assertRaises(pubsub_utils.PubSubException): 128 msg = _create_sample_message() 129 pubsub_client = pubsub_utils.PubSubClient( 130 _TEST_CLOUD_SERVICE_ACCOUNT_FILE) 131 pubsub_client.publish_notifications('test_topic', [msg]) 132 self.mox.VerifyAll() 133 134 def test_publish_notifications(self): 135 """Test getting the pubsub service""" 136 self.mox.StubOutWithMock(os.path, 'isfile') 137 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 138 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 139 credentials = self.mox.CreateMock(GoogleCredentials) 140 GoogleCredentials.from_stream( 141 _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials) 142 credentials.create_scoped_required().AndReturn(True) 143 credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn( 144 credentials) 145 self.mox.StubOutWithMock(discovery, 'build') 146 msg = _create_sample_message() 147 discovery.build( 148 pubsub_utils.PUBSUB_SERVICE_NAME, 149 pubsub_utils.PUBSUB_VERSION, 150 credentials=credentials).AndReturn(MockedPubSub( 151 self, 152 'test_topic', 153 msg, 154 pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES, 155 # use tuple ('123') instead of list just for easy to 156 # write the test. 157 ret_val={'messageIds': ('123')})) 158 159 self.mox.ReplayAll() 160 pubsub_client = pubsub_utils.PubSubClient( 161 _TEST_CLOUD_SERVICE_ACCOUNT_FILE) 162 msg_ids = pubsub_client.publish_notifications('test_topic', [msg]) 163 self.assertEquals(('123'), msg_ids) 164 165 self.mox.VerifyAll() 166 167 168if __name__ == '__main__': 169 unittest.main() 170