# Copyright 2020 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ################################################################################ """Utility functions for testing cloud functions.""" import datetime import os import subprocess import threading import requests DATASTORE_READY_INDICATOR = b'is now running' DATASTORE_EMULATOR_PORT = 8432 EMULATOR_TIMEOUT = 20 TEST_PROJECT_ID = 'test-project' # pylint: disable=arguments-differ class SpoofedDatetime(datetime.datetime): """Mocking Datetime class for now() function.""" @classmethod def now(cls): return datetime.datetime(2020, 1, 1, 0, 0, 0) def start_datastore_emulator(): """Start Datastore emulator.""" return subprocess.Popen([ 'gcloud', 'beta', 'emulators', 'datastore', 'start', '--consistency=1.0', '--host-port=localhost:' + str(DATASTORE_EMULATOR_PORT), '--project=' + TEST_PROJECT_ID, '--no-store-on-disk', ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) def wait_for_emulator_ready(proc, emulator, indicator, timeout=EMULATOR_TIMEOUT): """Wait for emulator to be ready.""" def _read_thread(proc, ready_event): """Thread to continuously read from the process stdout.""" ready = False while True: line = proc.stdout.readline() if not line: break if not ready and indicator in line: ready = True ready_event.set() # Wait for process to become ready. ready_event = threading.Event() thread = threading.Thread(target=_read_thread, args=(proc, ready_event)) thread.daemon = True thread.start() if not ready_event.wait(timeout): raise RuntimeError( '{} emulator did not get ready in time.'.format(emulator)) return thread def reset_ds_emulator(): """Reset ds emulator/clean all entities.""" req = requests.post( 'http://localhost:{}/reset'.format(DATASTORE_EMULATOR_PORT)) req.raise_for_status() def cleanup_emulator(ds_emulator): """Cleanup the system processes made by ds emulator.""" del ds_emulator #To do, find a better way to cleanup emulator os.system('pkill -f datastore') def set_gcp_environment(): """Set environment variables for simulating in google cloud platform.""" os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:' + str( DATASTORE_EMULATOR_PORT) os.environ['GOOGLE_CLOUD_PROJECT'] = TEST_PROJECT_ID os.environ['DATASTORE_DATASET'] = TEST_PROJECT_ID os.environ['GCP_PROJECT'] = TEST_PROJECT_ID os.environ['FUNCTION_REGION'] = 'us-central1'