1# Copyright 2020 Google Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# 15################################################################################ 16"""Utility functions for testing cloud functions.""" 17import datetime 18import os 19import subprocess 20import threading 21 22import requests 23 24DATASTORE_READY_INDICATOR = b'is now running' 25DATASTORE_EMULATOR_PORT = 8432 26EMULATOR_TIMEOUT = 20 27TEST_PROJECT_ID = 'test-project' 28 29 30# pylint: disable=arguments-differ 31class SpoofedDatetime(datetime.datetime): 32 """Mocking Datetime class for now() function.""" 33 34 @classmethod 35 def now(cls): 36 return datetime.datetime(2020, 1, 1, 0, 0, 0) 37 38 39def start_datastore_emulator(): 40 """Start Datastore emulator.""" 41 return subprocess.Popen([ 42 'gcloud', 43 'beta', 44 'emulators', 45 'datastore', 46 'start', 47 '--consistency=1.0', 48 '--host-port=localhost:' + str(DATASTORE_EMULATOR_PORT), 49 '--project=' + TEST_PROJECT_ID, 50 '--no-store-on-disk', 51 ], 52 stdout=subprocess.PIPE, 53 stderr=subprocess.STDOUT) 54 55 56def wait_for_emulator_ready(proc, 57 emulator, 58 indicator, 59 timeout=EMULATOR_TIMEOUT): 60 """Wait for emulator to be ready.""" 61 62 def _read_thread(proc, ready_event): 63 """Thread to continuously read from the process stdout.""" 64 ready = False 65 while True: 66 line = proc.stdout.readline() 67 if not line: 68 break 69 if not ready and indicator in line: 70 ready = True 71 ready_event.set() 72 73 # Wait for process to become ready. 74 ready_event = threading.Event() 75 thread = threading.Thread(target=_read_thread, args=(proc, ready_event)) 76 thread.daemon = True 77 thread.start() 78 if not ready_event.wait(timeout): 79 raise RuntimeError( 80 '{} emulator did not get ready in time.'.format(emulator)) 81 return thread 82 83 84def reset_ds_emulator(): 85 """Reset ds emulator/clean all entities.""" 86 req = requests.post( 87 'http://localhost:{}/reset'.format(DATASTORE_EMULATOR_PORT)) 88 req.raise_for_status() 89 90 91def cleanup_emulator(ds_emulator): 92 """Cleanup the system processes made by ds emulator.""" 93 del ds_emulator #To do, find a better way to cleanup emulator 94 os.system('pkill -f datastore') 95 96 97def set_gcp_environment(): 98 """Set environment variables for simulating in google cloud platform.""" 99 os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:' + str( 100 DATASTORE_EMULATOR_PORT) 101 os.environ['GOOGLE_CLOUD_PROJECT'] = TEST_PROJECT_ID 102 os.environ['DATASTORE_DATASET'] = TEST_PROJECT_ID 103 os.environ['GCP_PROJECT'] = TEST_PROJECT_ID 104 os.environ['FUNCTION_REGION'] = 'us-central1' 105