• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2012 Google Inc. All Rights Reserved.
2# Author: mrdmnd@ (Matt Redmond)
3"""A unit test for sending data to Bartlett. Requires poster module."""
4
5import cookielib
6import os
7import signal
8import subprocess
9import tempfile
10import time
11import unittest
12import urllib2
13
14from poster.encode import multipart_encode
15from poster.streaminghttp import register_openers
16
17SERVER_DIR = '../.'
18SERVER_URL = 'http://localhost:8080/'
19GET = '_ah/login?email=googler@google.com&action=Login&continue=%s'
20AUTH_URL = SERVER_URL + GET
21
22
23class ServerTest(unittest.TestCase):
24  """A unit test for the bartlett server. Tests upload, serve, and delete."""
25
26  def setUp(self):
27    """Instantiate the files and server needed to test upload functionality."""
28    self._server_proc = LaunchLocalServer()
29    self._jar = cookielib.LWPCookieJar()
30    self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._jar))
31
32    # We need these files to not delete when closed, because we have to reopen
33    # them in read mode after we write and close them.
34    self.profile_data = tempfile.NamedTemporaryFile(delete=False)
35
36    size = 16 * 1024
37    self.profile_data.write(os.urandom(size))
38
39  def tearDown(self):
40    self.profile_data.close()
41    os.remove(self.profile_data.name)
42    os.kill(self._server_proc.pid, signal.SIGINT)
43
44  def testIntegration(self):  # pylint: disable-msg=C6409
45    key = self._testUpload()
46    self._testListAll()
47    self._testServeKey(key)
48    self._testDelKey(key)
49
50  def _testUpload(self):  # pylint: disable-msg=C6409
51    register_openers()
52    data = {'profile_data': self.profile_data,
53            'board': 'x86-zgb',
54            'chromeos_version': '2409.0.2012_06_08_1114'}
55    datagen, headers = multipart_encode(data)
56    request = urllib2.Request(SERVER_URL + 'upload', datagen, headers)
57    response = urllib2.urlopen(request).read()
58    self.assertTrue(response)
59    return response
60
61  def _testListAll(self):  # pylint: disable-msg=C6409
62    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
63    response = self.opener.open(request).read()
64    self.assertTrue(response)
65
66  def _testServeKey(self, key):  # pylint: disable-msg=C6409
67    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve/' + key))
68    response = self.opener.open(request).read()
69    self.assertTrue(response)
70
71  def _testDelKey(self, key):  # pylint: disable-msg=C6409
72    # There is no response to a delete request.
73    # We will check the listAll page to ensure there is no data.
74    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'del/' + key))
75    response = self.opener.open(request).read()
76    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
77    response = self.opener.open(request).read()
78    self.assertFalse(response)
79
80
81def LaunchLocalServer():
82  """Launch and store an authentication cookie with a local server."""
83  proc = subprocess.Popen(
84      ['dev_appserver.py', '--clear_datastore', SERVER_DIR],
85      stdout=subprocess.PIPE,
86      stderr=subprocess.PIPE)
87  # Wait for server to come up
88  while True:
89    time.sleep(1)
90    try:
91      request = urllib2.Request(SERVER_URL + 'serve')
92      response = urllib2.urlopen(request).read()
93      if response:
94        break
95    except urllib2.URLError:
96      continue
97  return proc
98
99
100if __name__ == '__main__':
101  unittest.main()
102