• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Unit test for pubsub_utils.py"""
7
8from __future__ import print_function
9import os
10import unittest
11
12import mox
13
14from apiclient import discovery
15from oauth2client.client import ApplicationDefaultCredentialsError
16from oauth2client.client import GoogleCredentials
17from googleapiclient.errors import UnknownApiNameOrVersion
18
19import pubsub_utils
20
21_TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential'
22
23
24class MockedPubSub(object):
25    """A mocked PubSub handle."""
26    def __init__(self, test, topic, msg, retry, ret_val=None,
27                 raise_except=False):
28        self.test = test
29        self.topic = topic
30        self.msg = msg
31        self.retry = retry
32        self.ret_val = ret_val
33        self.raise_except = raise_except
34
35    def projects(self):
36        """Mocked PubSub projects."""
37        return self
38
39    def topics(self):
40        """Mocked PubSub topics."""
41        return self
42
43    def publish(self, topic, body):
44        """Mocked PubSub publish method.
45
46        @param topic: PubSub topic string.
47        @param body: PubSub notification body.
48        """
49        self.test.assertEquals(self.topic, topic)
50        self.test.assertEquals(self.msg, body['messages'][0])
51        return self
52
53    def execute(self, num_retries):
54        """Mocked PubSub execute method.
55
56        @param num_retries: Number of retries.
57        """
58        self.test.assertEquals(self.retry, num_retries)
59        if self.raise_except:
60            raise Exception()
61        return self.ret_val
62
63
64def _create_sample_message():
65    """Creates a sample pubsub message."""
66    msg_payload = {'data': 'sample data'}
67    msg_attributes = {}
68    msg_attributes['var'] = 'value'
69    msg_payload['attributes'] = msg_attributes
70
71    return msg_payload
72
73
74class PubSubTests(mox.MoxTestBase):
75    """Tests for pubsub related functios."""
76
77    def test_pubsub_with_no_service_account(self):
78        """Test getting the pubsub service"""
79        self.mox.StubOutWithMock(os.path, 'isfile')
80        self.mox.ReplayAll()
81        with self.assertRaises(pubsub_utils.PubSubException):
82            pubsub_utils.PubSubClient()
83        self.mox.VerifyAll()
84
85    def test_pubsub_with_non_existing_service_account(self):
86        """Test getting the pubsub service"""
87        self.mox.StubOutWithMock(os.path, 'isfile')
88        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
89        self.mox.ReplayAll()
90        with self.assertRaises(pubsub_utils.PubSubException):
91            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
92        self.mox.VerifyAll()
93
94    def test_pubsub_with_corrupted_service_account(self):
95        """Test pubsub with corrupted service account."""
96        self.mox.StubOutWithMock(os.path, 'isfile')
97        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
98        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
99        GoogleCredentials.from_stream(
100            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
101                ApplicationDefaultCredentialsError())
102        self.mox.ReplayAll()
103        with self.assertRaises(pubsub_utils.PubSubException):
104            pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE)
105        self.mox.VerifyAll()
106
107    def test_pubsub_with_invalid_service_account(self):
108        """Test pubsubwith invalid service account."""
109        self.mox.StubOutWithMock(os.path, 'isfile')
110        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
111        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
112        credentials = self.mox.CreateMock(GoogleCredentials)
113        GoogleCredentials.from_stream(
114            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
115        credentials.create_scoped_required().AndReturn(True)
116        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
117            credentials)
118        self.mox.StubOutWithMock(discovery, 'build')
119        discovery.build(
120            pubsub_utils.PUBSUB_SERVICE_NAME,
121            pubsub_utils.PUBSUB_VERSION,
122            credentials=credentials).AndRaise(UnknownApiNameOrVersion())
123        self.mox.ReplayAll()
124        with self.assertRaises(pubsub_utils.PubSubException):
125            msg = _create_sample_message()
126            pubsub_client = pubsub_utils.PubSubClient(
127                _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
128            pubsub_client.publish_notifications('test_topic', [msg])
129        self.mox.VerifyAll()
130
131    def test_publish_notifications(self):
132        """Test getting the pubsub service"""
133        self.mox.StubOutWithMock(os.path, 'isfile')
134        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
135        os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
136        credentials = self.mox.CreateMock(GoogleCredentials)
137        GoogleCredentials.from_stream(
138            _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
139        credentials.create_scoped_required().AndReturn(True)
140        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
141            credentials)
142        self.mox.StubOutWithMock(discovery, 'build')
143        msg = _create_sample_message()
144        discovery.build(
145            pubsub_utils.PUBSUB_SERVICE_NAME,
146            pubsub_utils.PUBSUB_VERSION,
147            credentials=credentials).AndReturn(MockedPubSub(
148                self,
149                'test_topic',
150                msg,
151                pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES,
152                # use tuple ('123') instead of list just for easy to
153                # write the test.
154                ret_val={'messageIds': ('123')}))
155
156        self.mox.ReplayAll()
157        pubsub_client = pubsub_utils.PubSubClient(
158                _TEST_CLOUD_SERVICE_ACCOUNT_FILE)
159        msg_ids = pubsub_client.publish_notifications('test_topic', [msg])
160        self.assertEquals(('123'), msg_ids)
161
162        self.mox.VerifyAll()
163
164
165if __name__ == '__main__':
166    unittest.main()
167