• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Unit test for pubsub_utils.py"""
7
8from __future__ import print_function
9import os
10import unittest
11
12import mox
13
14# TODO(crbug.com/1050892): The unittests rely on apiclient in chromite.
15import chromite  # pylint: disable=unused-import
16
17from apiclient import discovery
18from oauth2client.client import ApplicationDefaultCredentialsError
19from oauth2client.client import GoogleCredentials
20from googleapiclient.errors import UnknownApiNameOrVersion
21
22import pubsub_utils
23
24_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential'
25
26
27class MockedPubSub(object):
28    """A mocked PubSub handle."""
29    def __init__(self, test, topic, msg, retry, ret_val=None,
30                 raise_except=False):
31        self.test = test
32        self.topic = topic
33        self.msg = msg
34        self.retry = retry
35        self.ret_val = ret_val
36        self.raise_except = raise_except
37
38    def projects(self):
39        """Mocked PubSub projects."""
40        return self
41
42    def topics(self):
43        """Mocked PubSub topics."""
44        return self
45
46    def publish(self, topic, body):
47        """Mocked PubSub publish method.
48
49        @param topic: PubSub topic string.
50        @param body: PubSub notification body.
51        """
52        self.test.assertEquals(self.topic, topic)
53        self.test.assertEquals(self.msg, body['messages'][0])
54        return self
55
56    def execute(self, num_retries):
57        """Mocked PubSub execute method.
58
59        @param num_retries: Number of retries.
60        """
61        self.test.assertEquals(self.retry, num_retries)
62        if self.raise_except:
63            raise Exception()
64        return self.ret_val
65
66
67def _create_sample_message():
68    """Creates a sample pubsub message."""
69    msg_payload = {'data': 'sample data'}
70    msg_attributes = {}
71    msg_attributes['var'] = 'value'
72    msg_payload['attributes'] = msg_attributes
73
74    return msg_payload
75
76
77class PubSubTests(mox.MoxTestBase):
78    """Tests for pubsub related functios."""
79
80    def test_pubsub_with_no_service_account(self):
81        """Test getting the pubsub service"""
82        self.mox.StubOutWithMock(os.path, 'isfile')
83        self.mox.ReplayAll()
84        with self.assertRaises(pubsub_utils.PubSubException):
85            pubsub_utils.PubSubClient()
86        self.mox.VerifyAll()
87
88    def test_pubsub_with_non_existing_service_account(self):
89        """Test getting the pubsub service"""
90        self.mox.StubOutWithMock(os.path, 'isfile')
91        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
92        self.mox.ReplayAll()
93        with self.assertRaises(pubsub_utils.PubSubException):
94            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
95        self.mox.VerifyAll()
96
97    def test_pubsub_with_corrupted_service_account(self):
98        """Test pubsub with corrupted service account."""
99        self.mox.StubOutWithMock(os.path, 'isfile')
100        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
101        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
102        GoogleCredentials.from_stream(
103            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
104                ApplicationDefaultCredentialsError())
105        self.mox.ReplayAll()
106        with self.assertRaises(pubsub_utils.PubSubException):
107            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
108        self.mox.VerifyAll()
109
110    def test_pubsub_with_invalid_service_account(self):
111        """Test pubsubwith invalid service account."""
112        self.mox.StubOutWithMock(os.path, 'isfile')
113        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
114        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
115        credentials = self.mox.CreateMock(GoogleCredentials)
116        GoogleCredentials.from_stream(
117            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
118        credentials.create_scoped_required().AndReturn(True)
119        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
120            credentials)
121        self.mox.StubOutWithMock(discovery, 'build')
122        discovery.build(
123            pubsub_utils.PUBSUB_SERVICE_NAME,
124            pubsub_utils.PUBSUB_VERSION,
125            credentials=credentials).AndRaise(UnknownApiNameOrVersion())
126        self.mox.ReplayAll()
127        with self.assertRaises(pubsub_utils.PubSubException):
128            msg = _create_sample_message()
129            pubsub_client = pubsub_utils.PubSubClient(
130                _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
131            pubsub_client.publish_notifications('test_topic', [msg])
132        self.mox.VerifyAll()
133
134    def test_publish_notifications(self):
135        """Test getting the pubsub service"""
136        self.mox.StubOutWithMock(os.path, 'isfile')
137        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
138        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
139        credentials = self.mox.CreateMock(GoogleCredentials)
140        GoogleCredentials.from_stream(
141            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
142        credentials.create_scoped_required().AndReturn(True)
143        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
144            credentials)
145        self.mox.StubOutWithMock(discovery, 'build')
146        msg = _create_sample_message()
147        discovery.build(
148            pubsub_utils.PUBSUB_SERVICE_NAME,
149            pubsub_utils.PUBSUB_VERSION,
150            credentials=credentials).AndReturn(MockedPubSub(
151                self,
152                'test_topic',
153                msg,
154                pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES,
155                # use tuple ('123') instead of list just for easy to
156                # write the test.
157                ret_val={'messageIds': ('123')}))
158
159        self.mox.ReplayAll()
160        pubsub_client = pubsub_utils.PubSubClient(
161                _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
162        msg_ids = pubsub_client.publish_notifications('test_topic', [msg])
163        self.assertEquals(('123'), msg_ids)
164
165        self.mox.VerifyAll()
166
167
168if __name__ == '__main__':
169    unittest.main()
170