• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""A client that manages Google Compute Engine.
17
18** ComputeClient **
19
20ComputeClient is a wrapper around Google Compute Engine APIs.
21It provides a set of methods for managing a google compute engine project,
22such as creating images, creating instances, etc.
23
24Design philosophy: We tried to make ComputeClient as stateless as possible,
25and it only keeps states about authentication. ComputeClient should be very
26generic, and only knows how to talk to Compute Engine APIs.
27"""
28# pylint: disable=too-many-lines
29import collections
30import copy
31import functools
32import logging
33import os
34
35from acloud import errors
36from acloud.internal.lib import base_cloud_client
37from acloud.internal.lib import utils
38
39logger = logging.getLogger(__name__)
40
# Retry budget handed to utils.RetryOnException when a label update hits a
# fingerprint conflict (see ComputeClient.SetImageLabels).
_MAX_RETRIES_ON_FINGERPRINT_CONFLICT = 10
# Field names below are not referenced in this part of the file; presumably
# they are used when reading/writing instance or project metadata entries
# (e.g. ssh keys) -- confirm against their users.
_METADATA_KEY = "key"
_METADATA_KEY_VALUE = "value"
_SSH_KEYS_NAME = "sshKeys"
_ITEMS = "items"
_METADATA = "metadata"

# Template of attached-disk settings with an empty "initializeParams" entry.
# NOTE(review): presumably copied and filled in by callers that build a boot
# disk definition -- confirm; it is a shared mutable dict, so copy before
# modifying.
BASE_DISK_ARGS = {
    "type": "PERSISTENT",
    "boot": True,
    "mode": "READ_WRITE",
    "autoDelete": True,
    "initializeParams": {},
}

# Lightweight record holding an "external" and an "internal" address.
IP = collections.namedtuple("IP", ["external", "internal"])
57
58
class OperationScope(object):
    """Enum-like holder for the scopes a GCE operation can belong to.

    The values are plain strings so they can be compared directly with
    string arguments.
    """
    # Operation that is not tied to a zone or a region.
    GLOBAL = "global"
    # Operation that belongs to a region.
    REGION = "region"
    # Operation that belongs to a zone.
    ZONE = "zone"
64
65
class PersistentDiskType(object):
    """Enum-like holder for the supported persistent disk types.

    STANDARD ("pd-standard") is a regular hard disk.
    SSD ("pd-ssd") is a solid state disk.
    """
    SSD = "pd-ssd"
    STANDARD = "pd-standard"
74
75
class ImageStatus(object):
    """Enum-like holder for the status values of an image."""
    FAILED = "FAILED"
    PENDING = "PENDING"
    READY = "READY"
81
82
def _IsFingerPrintError(exc):
    """Tell whether |exc| is an HTTP 412 ("Precondition Failed") error.

    Args:
        exc: Exception instance.

    Returns:
        Boolean. True only if |exc| is an errors.HttpError whose code is 412.
    """
    if not isinstance(exc, errors.HttpError):
        return False
    return exc.code == 412
93
94
95# pylint: disable=too-many-public-methods
class ComputeClient(base_cloud_client.BaseCloudApiClient):
    """Client that manages GCE.

    Every request is issued against the single project taken from the
    AcloudConfig passed to the constructor.
    """

    # API settings, used by BaseCloudApiClient.
    API_NAME = "compute"
    API_VERSION = "v1"
    # Space-separated OAuth scopes for this client.
    SCOPE = " ".join([
        "https://www.googleapis.com/auth/compute",
        "https://www.googleapis.com/auth/devstorage.read_write"
    ])
    # Default settings for gce operations
    # NOTE(review): presumably the scopes granted to newly created instances;
    # not referenced in this part of the file -- confirm.
    DEFAULT_INSTANCE_SCOPE = [
        "https://www.googleapis.com/auth/devstorage.read_only",
        "https://www.googleapis.com/auth/logging.write"
    ]
    # Upper bound and polling period used by WaitOnOperation.
    OPERATION_TIMEOUT_SECS = 30 * 60  # 30 mins
    OPERATION_POLL_INTERVAL_SECS = 20
    # NOTE(review): the two constants below are not used in this part of the
    # file; names suggest machine-type comparison fields and the HTTP
    # "forbidden" status code -- confirm against their users.
    MACHINE_SIZE_METRICS = ["guestCpus", "memoryMb"]
    ACCESS_DENIED_CODE = 403
115
116    def __init__(self, acloud_config, oauth2_credentials):
117        """Initialize.
118
119        Args:
120            acloud_config: An AcloudConfig object.
121            oauth2_credentials: An oauth2client.OAuth2Credentials instance.
122        """
123        super(ComputeClient, self).__init__(oauth2_credentials)
124        self._project = acloud_config.project
125
126    def _GetOperationStatus(self, operation, operation_scope, scope_name=None):
127        """Get status of an operation.
128
129        Args:
130            operation: An Operation resource in the format of json.
131            operation_scope: A value from OperationScope, "zone", "region",
132                             or "global".
133            scope_name: If operation_scope is "zone" or "region", this should be
134                        the name of the zone or region, e.g. "us-central1-f".
135
136        Returns:
137            Status of the operation, one of "DONE", "PENDING", "RUNNING".
138
139        Raises:
140            errors.DriverError: if the operation fails.
141        """
142        operation_name = operation["name"]
143        if operation_scope == OperationScope.GLOBAL:
144            api = self.service.globalOperations().get(
145                project=self._project, operation=operation_name)
146            result = self.Execute(api)
147        elif operation_scope == OperationScope.ZONE:
148            api = self.service.zoneOperations().get(
149                project=self._project,
150                operation=operation_name,
151                zone=scope_name)
152            result = self.Execute(api)
153        elif operation_scope == OperationScope.REGION:
154            api = self.service.regionOperations().get(
155                project=self._project,
156                operation=operation_name,
157                region=scope_name)
158            result = self.Execute(api)
159
160        if result.get("error"):
161            errors_list = result["error"]["errors"]
162            raise errors.DriverError(
163                "Get operation state failed, errors: %s" % str(errors_list))
164        return result["status"]
165
166    def WaitOnOperation(self, operation, operation_scope, scope_name=None):
167        """Wait for an operation to finish.
168
169        Args:
170            operation: An Operation resource in the format of json.
171            operation_scope: A value from OperationScope, "zone", "region",
172                             or "global".
173            scope_name: If operation_scope is "zone" or "region", this should be
174                        the name of the zone or region, e.g. "us-central1-f".
175        """
176        timeout_exception = errors.GceOperationTimeoutError(
177            "Operation hits timeout, did not complete within %d secs." %
178            self.OPERATION_TIMEOUT_SECS)
179        utils.PollAndWait(
180            func=self._GetOperationStatus,
181            expected_return="DONE",
182            timeout_exception=timeout_exception,
183            timeout_secs=self.OPERATION_TIMEOUT_SECS,
184            sleep_interval_secs=self.OPERATION_POLL_INTERVAL_SECS,
185            operation=operation,
186            operation_scope=operation_scope,
187            scope_name=scope_name)
188
189    def GetProject(self):
190        """Get project information.
191
192        Returns:
193          A project resource in json.
194        """
195        api = self.service.projects().get(project=self._project)
196        return self.Execute(api)
197
198    def GetDisk(self, disk_name, zone):
199        """Get disk information.
200
201        Args:
202          disk_name: A string.
203          zone: String, name of zone.
204
205        Returns:
206          An disk resource in json.
207          https://cloud.google.com/compute/docs/reference/latest/disks#resource
208        """
209        api = self.service.disks().get(
210            project=self._project, zone=zone, disk=disk_name)
211        return self.Execute(api)
212
213    def CheckDiskExists(self, disk_name, zone):
214        """Check if disk exists.
215
216        Args:
217          disk_name: A string
218          zone: String, name of zone.
219
220        Returns:
221          True if disk exists, otherwise False.
222        """
223        try:
224            self.GetDisk(disk_name, zone)
225            exists = True
226        except errors.ResourceNotFoundError:
227            exists = False
228        logger.debug("CheckDiskExists: disk_name: %s, result: %s", disk_name,
229                     exists)
230        return exists
231
232    def CreateDisk(self,
233                   disk_name,
234                   source_image,
235                   size_gb,
236                   zone,
237                   source_project=None,
238                   disk_type=PersistentDiskType.STANDARD):
239        """Create a gce disk.
240
241        Args:
242            disk_name: String
243            source_image: String, name of the image.
244            size_gb: Integer, size in gb.
245            zone: String, name of the zone, e.g. us-central1-b.
246            source_project: String, required if the image is located in a different
247                            project.
248            disk_type: String, a value from PersistentDiskType, STANDARD
249                       for regular hard disk or SSD for solid state disk.
250        """
251        source_project = source_project or self._project
252        source_image = "projects/%s/global/images/%s" % (
253            source_project, source_image) if source_image else None
254        logger.info("Creating disk %s, size_gb: %d, source_image: %s",
255                    disk_name, size_gb, str(source_image))
256        body = {
257            "name": disk_name,
258            "sizeGb": size_gb,
259            "type": "projects/%s/zones/%s/diskTypes/%s" % (self._project, zone,
260                                                           disk_type),
261        }
262        api = self.service.disks().insert(
263            project=self._project,
264            sourceImage=source_image,
265            zone=zone,
266            body=body)
267        operation = self.Execute(api)
268        try:
269            self.WaitOnOperation(
270                operation=operation,
271                operation_scope=OperationScope.ZONE,
272                scope_name=zone)
273        except errors.DriverError:
274            logger.error("Creating disk failed, cleaning up: %s", disk_name)
275            if self.CheckDiskExists(disk_name, zone):
276                self.DeleteDisk(disk_name, zone)
277            raise
278        logger.info("Disk %s has been created.", disk_name)
279
280    def DeleteDisk(self, disk_name, zone):
281        """Delete a gce disk.
282
283        Args:
284            disk_name: A string, name of disk.
285            zone: A string, name of zone.
286        """
287        logger.info("Deleting disk %s", disk_name)
288        api = self.service.disks().delete(
289            project=self._project, zone=zone, disk=disk_name)
290        operation = self.Execute(api)
291        self.WaitOnOperation(
292            operation=operation,
293            operation_scope=OperationScope.ZONE,
294            scope_name=zone)
295        logger.info("Deleted disk %s", disk_name)
296
297    def DeleteDisks(self, disk_names, zone):
298        """Delete multiple disks.
299
300        Args:
301            disk_names: A list of disk names.
302            zone: A string, name of zone.
303
304        Returns:
305            A tuple, (deleted, failed, error_msgs)
306            deleted: A list of names of disks that have been deleted.
307            failed: A list of names of disks that we fail to delete.
308            error_msgs: A list of failure messages.
309        """
310        if not disk_names:
311            logger.warn("Nothing to delete. Arg disk_names is not provided.")
312            return [], [], []
313        # Batch send deletion requests.
314        logger.info("Deleting disks: %s", disk_names)
315        delete_requests = {}
316        for disk_name in set(disk_names):
317            request = self.service.disks().delete(
318                project=self._project, disk=disk_name, zone=zone)
319            delete_requests[disk_name] = request
320        return self._BatchExecuteAndWait(
321            delete_requests, OperationScope.ZONE, scope_name=zone)
322
323    def ListDisks(self, zone, disk_filter=None):
324        """List disks.
325
326        Args:
327            zone: A string, representing zone name. e.g. "us-central1-f"
328            disk_filter: A string representing a filter in format of
329                             FIELD_NAME COMPARISON_STRING LITERAL_STRING
330                             e.g. "name ne example-instance"
331                             e.g. "name eq "example-instance-[0-9]+""
332
333        Returns:
334            A list of disks.
335        """
336        return self.ListWithMultiPages(
337            api_resource=self.service.disks().list,
338            project=self._project,
339            zone=zone,
340            filter=disk_filter)
341
342    def CreateImage(self,
343                    image_name,
344                    source_uri=None,
345                    source_disk=None,
346                    labels=None):
347        """Create a Gce image.
348
349        Args:
350            image_name: String, name of image
351            source_uri: Full Google Cloud Storage URL where the disk image is
352                        stored.  e.g. "https://storage.googleapis.com/my-bucket/
353                        avd-system-2243663.tar.gz"
354            source_disk: String, this should be the disk's selfLink value
355                         (including zone and project), rather than the disk_name
356                         e.g. https://www.googleapis.com/compute/v1/projects/
357                              google.com:android-builds-project/zones/
358                              us-east1-d/disks/<disk_name>
359            labels: Dict, will be added to the image's labels.
360
361        Raises:
362            errors.DriverError: For malformed request or response.
363            errors.GceOperationTimeoutError: Operation takes too long to finish.
364        """
365        if self.CheckImageExists(image_name):
366            return
367        if (source_uri and source_disk) or (not source_uri
368                                            and not source_disk):
369            raise errors.DriverError(
370                "Creating image %s requires either source_uri %s or "
371                "source_disk %s but not both" % (image_name, source_uri,
372                                                 source_disk))
373        elif source_uri:
374            logger.info("Creating image %s, source_uri %s", image_name,
375                        source_uri)
376            body = {
377                "name": image_name,
378                "rawDisk": {
379                    "source": source_uri,
380                },
381            }
382        else:
383            logger.info("Creating image %s, source_disk %s", image_name,
384                        source_disk)
385            body = {
386                "name": image_name,
387                "sourceDisk": source_disk,
388            }
389        if labels is not None:
390            body["labels"] = labels
391        api = self.service.images().insert(project=self._project, body=body)
392        operation = self.Execute(api)
393        try:
394            self.WaitOnOperation(
395                operation=operation, operation_scope=OperationScope.GLOBAL)
396        except errors.DriverError:
397            logger.error("Creating image failed, cleaning up: %s", image_name)
398            if self.CheckImageExists(image_name):
399                self.DeleteImage(image_name)
400            raise
401        logger.info("Image %s has been created.", image_name)
402
403    @utils.RetryOnException(_IsFingerPrintError,
404                            _MAX_RETRIES_ON_FINGERPRINT_CONFLICT)
405    def SetImageLabels(self, image_name, new_labels):
406        """Update image's labels. Retry for finger print conflict.
407
408        Note: Decorator RetryOnException will retry the call for FingerPrint
409          conflict (HTTP error code 412). The fingerprint is used to detect
410          conflicts of GCE resource updates. The fingerprint is initially generated
411          by Compute Engine and changes after every request to modify or update
412          resources (e.g. GCE "image" resource has "fingerPrint" for "labels"
413          updates).
414
415        Args:
416            image_name: A string, the image name.
417            new_labels: Dict, will be added to the image's labels.
418
419        Returns:
420            A GlobalOperation resouce.
421            https://cloud.google.com/compute/docs/reference/latest/globalOperations
422        """
423        image = self.GetImage(image_name)
424        labels = image.get("labels", {})
425        labels.update(new_labels)
426        body = {
427            "labels": labels,
428            "labelFingerprint": image["labelFingerprint"]
429        }
430        api = self.service.images().setLabels(
431            project=self._project, resource=image_name, body=body)
432        return self.Execute(api)
433
434    def CheckImageExists(self, image_name):
435        """Check if image exists.
436
437        Args:
438            image_name: A string
439
440        Returns:
441            True if image exists, otherwise False.
442        """
443        try:
444            self.GetImage(image_name)
445            exists = True
446        except errors.ResourceNotFoundError:
447            exists = False
448        logger.debug("CheckImageExists: image_name: %s, result: %s",
449                     image_name, exists)
450        return exists
451
452    def GetImage(self, image_name, image_project=None):
453        """Get image information.
454
455        Args:
456            image_name: A string
457            image_project: A string
458
459        Returns:
460            An image resource in json.
461            https://cloud.google.com/compute/docs/reference/latest/images#resource
462        """
463        api = self.service.images().get(
464            project=image_project or self._project, image=image_name)
465        return self.Execute(api)
466
467    def DeleteImage(self, image_name):
468        """Delete an image.
469
470        Args:
471            image_name: A string
472        """
473        logger.info("Deleting image %s", image_name)
474        api = self.service.images().delete(
475            project=self._project, image=image_name)
476        operation = self.Execute(api)
477        self.WaitOnOperation(
478            operation=operation, operation_scope=OperationScope.GLOBAL)
479        logger.info("Deleted image %s", image_name)
480
481    def DeleteImages(self, image_names):
482        """Delete multiple images.
483
484        Args:
485            image_names: A list of image names.
486
487        Returns:
488            A tuple, (deleted, failed, error_msgs)
489            deleted: A list of names of images that have been deleted.
490            failed: A list of names of images that we fail to delete.
491            error_msgs: A list of failure messages.
492        """
493        if not image_names:
494            return [], [], []
495        # Batch send deletion requests.
496        logger.info("Deleting images: %s", image_names)
497        delete_requests = {}
498        for image_name in set(image_names):
499            request = self.service.images().delete(
500                project=self._project, image=image_name)
501            delete_requests[image_name] = request
502        return self._BatchExecuteAndWait(delete_requests,
503                                         OperationScope.GLOBAL)
504
505    def ListImages(self, image_filter=None, image_project=None):
506        """List images.
507
508        Args:
509            image_filter: A string representing a filter in format of
510                          FIELD_NAME COMPARISON_STRING LITERAL_STRING
511                          e.g. "name ne example-image"
512                          e.g. "name eq "example-image-[0-9]+""
513            image_project: String. If not provided, will list images from the default
514                           project. Otherwise, will list images from the given
515                           project, which can be any arbitrary project where the
516                           account has read access
517                           (i.e. has the role "roles/compute.imageUser")
518
519        Read more about image sharing across project:
520        https://cloud.google.com/compute/docs/images/sharing-images-across-projects
521
522        Returns:
523            A list of images.
524        """
525        return self.ListWithMultiPages(
526            api_resource=self.service.images().list,
527            project=image_project or self._project,
528            filter=image_filter)
529
530    def GetInstance(self, instance, zone):
531        """Get information about an instance.
532
533        Args:
534            instance: A string, representing instance name.
535            zone: A string, representing zone name. e.g. "us-central1-f"
536
537        Returns:
538            An instance resource in json.
539            https://cloud.google.com/compute/docs/reference/latest/instances#resource
540        """
541        api = self.service.instances().get(
542            project=self._project, zone=zone, instance=instance)
543        return self.Execute(api)
544
545    def AttachAccelerator(self, instance, zone, accelerator_count,
546                          accelerator_type):
547        """Attach a GPU accelerator to the instance.
548
549        Note: In order for this to succeed the following must hold:
550        - The machine schedule must be set to "terminate" i.e:
551          SetScheduling(self, instance, zone, on_host_maintenance="terminate")
552          must have been called.
553        - The machine is not starting or running. i.e.
554          StopInstance(self, instance) must have been called.
555
556        Args:
557            instance: A string, representing instance name.
558            zone: String, name of zone.
559            accelerator_count: The number accelerators to be attached to the instance.
560             a value of 0 will detach all accelerators.
561            accelerator_type: The type of accelerator to attach. e.g.
562              "nvidia-tesla-k80"
563        """
564        body = {
565            "guestAccelerators": [{
566                "acceleratorType":
567                self.GetAcceleratorUrl(accelerator_type, zone),
568                "acceleratorCount":
569                accelerator_count
570            }]
571        }
572        api = self.service.instances().setMachineResources(
573            project=self._project, zone=zone, instance=instance, body=body)
574        operation = self.Execute(api)
575        try:
576            self.WaitOnOperation(
577                operation=operation,
578                operation_scope=OperationScope.ZONE,
579                scope_name=zone)
580        except errors.GceOperationTimeoutError:
581            logger.error("Attach instance failed: %s", instance)
582            raise
583        logger.info("%d x %s have been attached to instance %s.",
584                    accelerator_count, accelerator_type, instance)
585
586    def AttachDisk(self, instance, zone, **kwargs):
587        """Attach the external disk to the instance.
588
589        Args:
590            instance: A string, representing instance name.
591            zone: String, name of zone.
592            **kwargs: The attachDisk request body. See "https://cloud.google.com/
593              compute/docs/reference/latest/instances/attachDisk" for detail.
594              {
595                "kind": "compute#attachedDisk",
596                "type": string,
597                "mode": string,
598                "source": string,
599                "deviceName": string,
600                "index": integer,
601                "boot": boolean,
602                "initializeParams": {
603                  "diskName": string,
604                  "sourceImage": string,
605                  "diskSizeGb": long,
606                  "diskType": string,
607                  "sourceImageEncryptionKey": {
608                    "rawKey": string,
609                    "sha256": string
610                  }
611                },
612                "autoDelete": boolean,
613                "licenses": [
614                  string
615                ],
616                "interface": string,
617                "diskEncryptionKey": {
618                  "rawKey": string,
619                  "sha256": string
620                }
621              }
622
623        Returns:
624            An disk resource in json.
625            https://cloud.google.com/compute/docs/reference/latest/disks#resource
626
627
628        Raises:
629            errors.GceOperationTimeoutError: Operation takes too long to finish.
630        """
631        api = self.service.instances().attachDisk(
632            project=self._project, zone=zone, instance=instance, body=kwargs)
633        operation = self.Execute(api)
634        try:
635            self.WaitOnOperation(
636                operation=operation,
637                operation_scope=OperationScope.ZONE,
638                scope_name=zone)
639        except errors.GceOperationTimeoutError:
640            logger.error("Attach instance failed: %s", instance)
641            raise
642        logger.info("Disk has been attached to instance %s.", instance)
643
644    def DetachDisk(self, instance, zone, disk_name):
645        """Attach the external disk to the instance.
646
647        Args:
648            instance: A string, representing instance name.
649            zone: String, name of zone.
650            disk_name: A string, the name of the detach disk.
651
652        Returns:
653            A ZoneOperation resource.
654            See https://cloud.google.com/compute/docs/reference/latest/zoneOperations
655
656        Raises:
657            errors.GceOperationTimeoutError: Operation takes too long to finish.
658        """
659        api = self.service.instances().detachDisk(
660            project=self._project,
661            zone=zone,
662            instance=instance,
663            deviceName=disk_name)
664        operation = self.Execute(api)
665        try:
666            self.WaitOnOperation(
667                operation=operation,
668                operation_scope=OperationScope.ZONE,
669                scope_name=zone)
670        except errors.GceOperationTimeoutError:
671            logger.error("Detach instance failed: %s", instance)
672            raise
673        logger.info("Disk has been detached to instance %s.", instance)
674
675    def StartInstance(self, instance, zone):
676        """Start |instance| in |zone|.
677
678        Args:
679            instance: A string, representing instance name.
680            zone: A string, representing zone name. e.g. "us-central1-f"
681
682        Raises:
683            errors.GceOperationTimeoutError: Operation takes too long to finish.
684        """
685        api = self.service.instances().start(
686            project=self._project, zone=zone, instance=instance)
687        operation = self.Execute(api)
688        try:
689            self.WaitOnOperation(
690                operation=operation,
691                operation_scope=OperationScope.ZONE,
692                scope_name=zone)
693        except errors.GceOperationTimeoutError:
694            logger.error("Start instance failed: %s", instance)
695            raise
696        logger.info("Instance %s has been started.", instance)
697
698    def StartInstances(self, instances, zone):
699        """Start |instances| in |zone|.
700
701        Args:
702            instances: A list of strings, representing instance names's list.
703            zone: A string, representing zone name. e.g. "us-central1-f"
704
705        Returns:
706            A tuple, (done, failed, error_msgs)
707            done: A list of string, representing the names of instances that
708              have been executed.
709            failed: A list of string, representing the names of instances that
710              we failed to execute.
711            error_msgs: A list of string, representing the failure messages.
712        """
713        action = functools.partial(
714            self.service.instances().start, project=self._project, zone=zone)
715        return self._BatchExecuteOnInstances(instances, zone, action)
716
717    def StopInstance(self, instance, zone):
718        """Stop |instance| in |zone|.
719
720        Args:
721            instance: A string, representing instance name.
722            zone: A string, representing zone name. e.g. "us-central1-f"
723
724        Raises:
725            errors.GceOperationTimeoutError: Operation takes too long to finish.
726        """
727        api = self.service.instances().stop(
728            project=self._project, zone=zone, instance=instance)
729        operation = self.Execute(api)
730        try:
731            self.WaitOnOperation(
732                operation=operation,
733                operation_scope=OperationScope.ZONE,
734                scope_name=zone)
735        except errors.GceOperationTimeoutError:
736            logger.error("Stop instance failed: %s", instance)
737            raise
738        logger.info("Instance %s has been terminated.", instance)
739
740    def StopInstances(self, instances, zone):
741        """Stop |instances| in |zone|.
742
743        Args:
744            instances: A list of strings, representing instance names's list.
745            zone: A string, representing zone name. e.g. "us-central1-f"
746
747        Returns:
748            A tuple, (done, failed, error_msgs)
749            done: A list of string, representing the names of instances that
750                  have been executed.
751            failed: A list of string, representing the names of instances that
752                    we failed to execute.
753            error_msgs: A list of string, representing the failure messages.
754        """
755        action = functools.partial(
756            self.service.instances().stop, project=self._project, zone=zone)
757        return self._BatchExecuteOnInstances(instances, zone, action)
758
759    def SetScheduling(self,
760                      instance,
761                      zone,
762                      automatic_restart=True,
763                      on_host_maintenance="MIGRATE"):
764        """Update scheduling config |automatic_restart| and |on_host_maintenance|.
765
766        Args:
767            instance: A string, representing instance name.
768            zone: A string, representing zone name. e.g. "us-central1-f".
769            automatic_restart: Boolean, determine whether the instance will
770                               automatically restart if it crashes or not,
771                               default to True.
772            on_host_maintenance: enum["MIGRATE", "TERMINATE"]
773                                 The instance's maintenance behavior, which
774                                 determines whether the instance is live
775                                 "MIGRATE" or "TERMINATE" when there is
776                                 a maintenance event.
777
778        Raises:
779            errors.GceOperationTimeoutError: Operation takes too long to finish.
780        """
781        body = {
782            "automaticRestart": automatic_restart,
783            "onHostMaintenance": on_host_maintenance
784        }
785        api = self.service.instances().setScheduling(
786            project=self._project, zone=zone, instance=instance, body=body)
787        operation = self.Execute(api)
788        try:
789            self.WaitOnOperation(
790                operation=operation,
791                operation_scope=OperationScope.ZONE,
792                scope_name=zone)
793        except errors.GceOperationTimeoutError:
794            logger.error("Set instance scheduling failed: %s", instance)
795            raise
796        logger.info(
797            "Instance scheduling changed:\n"
798            "    automaticRestart: %s\n"
799            "    onHostMaintenance: %s\n",
800            str(automatic_restart).lower(), on_host_maintenance)
801
802    def ListInstances(self, zone, instance_filter=None):
803        """List instances.
804
805        Args:
806            zone: A string, representing zone name. e.g. "us-central1-f"
807            instance_filter: A string representing a filter in format of
808                             FIELD_NAME COMPARISON_STRING LITERAL_STRING
809                             e.g. "name ne example-instance"
810                             e.g. "name eq "example-instance-[0-9]+""
811
812        Returns:
813            A list of instances.
814        """
815        return self.ListWithMultiPages(
816            api_resource=self.service.instances().list,
817            project=self._project,
818            zone=zone,
819            filter=instance_filter)
820
821    def SetSchedulingInstances(self,
822                               instances,
823                               zone,
824                               automatic_restart=True,
825                               on_host_maintenance="MIGRATE"):
826        """Update scheduling config |automatic_restart| and |on_host_maintenance|.
827
828        See //cloud/cluster/api/mixer_instances.proto Scheduling for config option.
829
830        Args:
831            instances: A list of string, representing instance names.
832            zone: A string, representing zone name. e.g. "us-central1-f".
833            automatic_restart: Boolean, determine whether the instance will
834                               automatically restart if it crashes or not,
835                               default to True.
836            on_host_maintenance: enum["MIGRATE", "TERMINATE"]
837                                 The instance's maintenance behavior, which
838                                 determines whether the instance is live
839                                 migrated or terminated when there is
840                                 a maintenance event.
841
842        Returns:
843            A tuple, (done, failed, error_msgs)
844            done: A list of string, representing the names of instances that
845                  have been executed.
846            failed: A list of string, representing the names of instances that
847                    we failed to execute.
848            error_msgs: A list of string, representing the failure messages.
849        """
850        body = {
851            "automaticRestart": automatic_restart,
852            "OnHostMaintenance": on_host_maintenance
853        }
854        action = functools.partial(
855            self.service.instances().setScheduling,
856            project=self._project,
857            zone=zone,
858            body=body)
859        return self._BatchExecuteOnInstances(instances, zone, action)
860
861    def _BatchExecuteOnInstances(self, instances, zone, action):
862        """Batch processing operations requiring computing time.
863
864        Args:
865            instances: A list of instance names.
866            zone: A string, e.g. "us-central1-f".
867            action: partial func, all kwargs for this gcloud action has been
868                    defined in the caller function (e.g. See "StartInstances")
869                    except 'instance' which will be defined by iterating the
870                    |instances|.
871
872        Returns:
873            A tuple, (done, failed, error_msgs)
874            done: A list of string, representing the names of instances that
875                  have been executed.
876            failed: A list of string, representing the names of instances that
877                    we failed to execute.
878            error_msgs: A list of string, representing the failure messages.
879        """
880        if not instances:
881            return [], [], []
882        # Batch send requests.
883        logger.info("Batch executing instances: %s", instances)
884        requests = {}
885        for instance_name in set(instances):
886            requests[instance_name] = action(instance=instance_name)
887        return self._BatchExecuteAndWait(
888            requests, operation_scope=OperationScope.ZONE, scope_name=zone)
889
890    def _BatchExecuteAndWait(self, requests, operation_scope, scope_name=None):
891        """Batch processing requests and wait on the operation.
892
893        Args:
894            requests: A dictionary. The key is a string representing the resource
895                      name. For example, an instance name, or an image name.
896            operation_scope: A value from OperationScope, "zone", "region",
897                             or "global".
898            scope_name: If operation_scope is "zone" or "region", this should be
899                        the name of the zone or region, e.g. "us-central1-f".
900        Returns:
901            A tuple, (done, failed, error_msgs)
902            done: A list of string, representing the resource names that have
903                  been executed.
904            failed: A list of string, representing resource names that
905                    we failed to execute.
906            error_msgs: A list of string, representing the failure messages.
907        """
908        results = self.BatchExecute(requests)
909        # Initialize return values
910        failed = []
911        error_msgs = []
912        for resource_name, (_, error) in results.iteritems():
913            if error is not None:
914                failed.append(resource_name)
915                error_msgs.append(str(error))
916        done = []
917        # Wait for the executing operations to finish.
918        logger.info("Waiting for executing operations")
919        for resource_name in requests.iterkeys():
920            operation, _ = results[resource_name]
921            if operation:
922                try:
923                    self.WaitOnOperation(operation, operation_scope,
924                                         scope_name)
925                    done.append(resource_name)
926                except errors.DriverError as exc:
927                    failed.append(resource_name)
928                    error_msgs.append(str(exc))
929        return done, failed, error_msgs
930
931    def ListZones(self):
932        """List all zone instances in the project.
933
934        Returns:
935            Gcompute response instance. For example:
936            {
937              "id": "projects/google.com%3Aandroid-build-staging/zones",
938              "kind": "compute#zoneList",
939              "selfLink": "https://www.googleapis.com/compute/v1/projects/"
940                  "google.com:android-build-staging/zones"
941              "items": [
942                {
943                  'creationTimestamp': '2014-07-15T10:44:08.663-07:00',
944                  'description': 'asia-east1-c',
945                  'id': '2222',
946                  'kind': 'compute#zone',
947                  'name': 'asia-east1-c',
948                  'region': 'https://www.googleapis.com/compute/v1/projects/'
949                      'google.com:android-build-staging/regions/asia-east1',
950                  'selfLink': 'https://www.googleapis.com/compute/v1/projects/'
951                      'google.com:android-build-staging/zones/asia-east1-c',
952                  'status': 'UP'
953                }, {
954                  'creationTimestamp': '2014-05-30T18:35:16.575-07:00',
955                  'description': 'asia-east1-b',
956                  'id': '2221',
957                  'kind': 'compute#zone',
958                  'name': 'asia-east1-b',
959                  'region': 'https://www.googleapis.com/compute/v1/projects/'
960                      'google.com:android-build-staging/regions/asia-east1',
961                  'selfLink': 'https://www.googleapis.com/compute/v1/projects'
962                      '/google.com:android-build-staging/zones/asia-east1-b',
963                  'status': 'UP'
964                }]
965            }
966            See cloud cluster's api/mixer_zones.proto
967        """
968        api = self.service.zones().list(project=self._project)
969        return self.Execute(api)
970
971    def ListRegions(self):
972        """List all the regions for a project.
973
974        Returns:
975            A dictionary containing all the zones and additional data. See this link
976            for the detailed response:
977            https://cloud.google.com/compute/docs/reference/latest/regions/list.
978            Example:
979            {
980              'items': [{
981                  'name':
982                      'us-central1',
983                  'quotas': [{
984                      'usage': 2.0,
985                      'limit': 24.0,
986                      'metric': 'CPUS'
987                  }, {
988                      'usage': 1.0,
989                      'limit': 23.0,
990                      'metric': 'IN_USE_ADDRESSES'
991                  }, {
992                      'usage': 209.0,
993                      'limit': 10240.0,
994                      'metric': 'DISKS_TOTAL_GB'
995                  }, {
996                      'usage': 1000.0,
997                      'limit': 20000.0,
998                      'metric': 'INSTANCES'
999                  }]
1000              },..]
1001            }
1002        """
1003        api = self.service.regions().list(project=self._project)
1004        return self.Execute(api)
1005
1006    def _GetNetworkArgs(self, network, zone):
1007        """Helper to generate network args that is used to create an instance.
1008
1009        Args:
1010            network: A string, e.g. "default".
1011            zone: String, representing zone name, e.g. "us-central1-f"
1012
1013        Returns:
1014            A dictionary representing network args.
1015        """
1016        return {
1017            "network": self.GetNetworkUrl(network),
1018            "subnetwork": self.GetSubnetworkUrl(network, zone),
1019            "accessConfigs": [{
1020                "name": "External NAT",
1021                "type": "ONE_TO_ONE_NAT"
1022            }]
1023        }
1024
1025    def _GetDiskArgs(self,
1026                     disk_name,
1027                     image_name,
1028                     image_project=None,
1029                     disk_size_gb=None):
1030        """Helper to generate disk args that is used to create an instance.
1031
1032        Args:
1033            disk_name: A string
1034            image_name: A string
1035            image_project: A string
1036            disk_size_gb: An integer
1037
1038        Returns:
1039            List holding dict of disk args.
1040        """
1041        args = copy.deepcopy(BASE_DISK_ARGS)
1042        args["initializeParams"] = {
1043            "diskName": disk_name,
1044            "sourceImage": self.GetImage(image_name,
1045                                         image_project)["selfLink"],
1046        }
1047        # TODO: Remove this check once it's validated that we can either pass in
1048        # a None diskSizeGb or we find an appropriate default val.
1049        if disk_size_gb:
1050            args["diskSizeGb"] = disk_size_gb
1051        return [args]
1052
1053    def _GetExtraDiskArgs(self, extra_disk_name, zone):
1054        """Get extra disk arg for given disk.
1055
1056        Args:
1057            extra_disk_name: String, name of the disk.
1058            zone: String, representing zone name, e.g. "us-central1-f"
1059
1060        Returns:
1061            A dictionary of disk args.
1062        """
1063        return [{
1064            "type": "PERSISTENT",
1065            "mode": "READ_WRITE",
1066            "source": "projects/%s/zones/%s/disks/%s" % (self._project, zone,
1067                                                         extra_disk_name),
1068            "autoDelete": True,
1069            "boot": False,
1070            "interface": "SCSI",
1071            "deviceName": extra_disk_name,
1072        }]
1073
1074    # pylint: disable=too-many-locals
1075    def CreateInstance(self,
1076                       instance,
1077                       image_name,
1078                       machine_type,
1079                       metadata,
1080                       network,
1081                       zone,
1082                       disk_args=None,
1083                       image_project=None,
1084                       gpu=None,
1085                       extra_disk_name=None,
1086                       labels=None,
1087                       extra_scopes=None):
1088        """Create a gce instance with a gce image.
1089
1090        Args:
1091            instance: String, instance name.
1092            image_name: String, source image used to create this disk.
1093            machine_type: String, representing machine_type,
1094                          e.g. "n1-standard-1"
1095            metadata: Dict, maps a metadata name to its value.
1096            network: String, representing network name, e.g. "default"
1097            zone: String, representing zone name, e.g. "us-central1-f"
1098            disk_args: A list of extra disk args (strings), see _GetDiskArgs
1099                       for example, if None, will create a disk using the given
1100                       image.
1101            image_project: String, name of the project where the image
1102                           belongs. Assume the default project if None.
1103            gpu: String, type of gpu to attach. e.g. "nvidia-tesla-k80", if
1104                 None no gpus will be attached. For more details see:
1105                 https://cloud.google.com/compute/docs/gpus/add-gpus
1106            extra_disk_name: String,the name of the extra disk to attach.
1107            labels: Dict, will be added to the instance's labels.
1108            extra_scopes: A list of extra scopes to be provided to the instance.
1109        """
1110        disk_args = (disk_args
1111                     or self._GetDiskArgs(instance, image_name, image_project))
1112        if extra_disk_name:
1113            disk_args.extend(self._GetExtraDiskArgs(extra_disk_name, zone))
1114
1115        scopes = []
1116        scopes.extend(self.DEFAULT_INSTANCE_SCOPE)
1117        if extra_scopes:
1118            scopes.extend(extra_scopes)
1119
1120        body = {
1121            "machineType": self.GetMachineType(machine_type, zone)["selfLink"],
1122            "name": instance,
1123            "networkInterfaces": [self._GetNetworkArgs(network, zone)],
1124            "disks": disk_args,
1125            "serviceAccounts": [{
1126                "email": "default",
1127                "scopes": scopes,
1128            }],
1129        }
1130
1131
1132        if labels is not None:
1133            body["labels"] = labels
1134        if gpu:
1135            body["guestAccelerators"] = [{
1136                "acceleratorType": self.GetAcceleratorUrl(gpu, zone),
1137                "acceleratorCount": 1
1138            }]
1139            # Instances with GPUs cannot live migrate because they are assigned
1140            # to specific hardware devices.
1141            body["scheduling"] = {"onHostMaintenance": "terminate"}
1142        if metadata:
1143            metadata_list = [{
1144                _METADATA_KEY: key,
1145                _METADATA_KEY_VALUE: val
1146            } for key, val in metadata.iteritems()]
1147            body[_METADATA] = {_ITEMS: metadata_list}
1148        logger.info("Creating instance: project %s, zone %s, body:%s",
1149                    self._project, zone, body)
1150        api = self.service.instances().insert(
1151            project=self._project, zone=zone, body=body)
1152        operation = self.Execute(api)
1153        self.WaitOnOperation(
1154            operation, operation_scope=OperationScope.ZONE, scope_name=zone)
1155        logger.info("Instance %s has been created.", instance)
1156
1157    def DeleteInstance(self, instance, zone):
1158        """Delete a gce instance.
1159
1160        Args:
1161            instance: A string, instance name.
1162            zone: A string, e.g. "us-central1-f"
1163        """
1164        logger.info("Deleting instance: %s", instance)
1165        api = self.service.instances().delete(
1166            project=self._project, zone=zone, instance=instance)
1167        operation = self.Execute(api)
1168        self.WaitOnOperation(
1169            operation, operation_scope=OperationScope.ZONE, scope_name=zone)
1170        logger.info("Deleted instance: %s", instance)
1171
1172    def DeleteInstances(self, instances, zone):
1173        """Delete multiple instances.
1174
1175        Args:
1176            instances: A list of instance names.
1177            zone: A string, e.g. "us-central1-f".
1178
1179        Returns:
1180            A tuple, (deleted, failed, error_msgs)
1181            deleted: A list of names of instances that have been deleted.
1182            failed: A list of names of instances that we fail to delete.
1183            error_msgs: A list of failure messages.
1184        """
1185        action = functools.partial(
1186            self.service.instances().delete, project=self._project, zone=zone)
1187        return self._BatchExecuteOnInstances(instances, zone, action)
1188
1189    def ResetInstance(self, instance, zone):
1190        """Reset the gce instance.
1191
1192        Args:
1193            instance: A string, instance name.
1194            zone: A string, e.g. "us-central1-f".
1195        """
1196        logger.info("Resetting instance: %s", instance)
1197        api = self.service.instances().reset(
1198            project=self._project, zone=zone, instance=instance)
1199        operation = self.Execute(api)
1200        self.WaitOnOperation(
1201            operation, operation_scope=OperationScope.ZONE, scope_name=zone)
1202        logger.info("Instance has been reset: %s", instance)
1203
1204    def GetMachineType(self, machine_type, zone):
1205        """Get URL for a given machine typle.
1206
1207        Args:
1208            machine_type: A string, name of the machine type.
1209            zone: A string, e.g. "us-central1-f"
1210
1211        Returns:
1212            A machine type resource in json.
1213            https://cloud.google.com/compute/docs/reference/latest/
1214            machineTypes#resource
1215        """
1216        api = self.service.machineTypes().get(
1217            project=self._project, zone=zone, machineType=machine_type)
1218        return self.Execute(api)
1219
1220    def GetAcceleratorUrl(self, accelerator_type, zone):
1221        """Get URL for a given type of accelator.
1222
1223        Args:
1224            accelerator_type: A string, representing the accelerator, e.g
1225              "nvidia-tesla-k80"
1226            zone: A string representing a zone, e.g. "us-west1-b"
1227
1228        Returns:
1229            A URL that points to the accelerator resource, e.g.
1230            https://www.googleapis.com/compute/v1/projects/<project id>/zones/
1231            us-west1-b/acceleratorTypes/nvidia-tesla-k80
1232        """
1233        api = self.service.acceleratorTypes().get(
1234            project=self._project, zone=zone, acceleratorType=accelerator_type)
1235        result = self.Execute(api)
1236        return result["selfLink"]
1237
1238    def GetNetworkUrl(self, network):
1239        """Get URL for a given network.
1240
1241        Args:
1242            network: A string, representing network name, e.g "default"
1243
1244        Returns:
1245            A URL that points to the network resource, e.g.
1246            https://www.googleapis.com/compute/v1/projects/<project id>/
1247            global/networks/default
1248        """
1249        api = self.service.networks().get(
1250            project=self._project, network=network)
1251        result = self.Execute(api)
1252        return result["selfLink"]
1253
1254    def GetSubnetworkUrl(self, network, zone):
1255        """Get URL for a given network and zone.
1256
1257        Return the subnetwork for the network in the specified region that the
1258        specified zone resides in. If there is no subnetwork for the specified
1259        zone, raise an exception.
1260
1261        Args:
1262            network: A string, representing network name, e.g "default"
1263            zone: String, representing zone name, e.g. "us-central1-f"
1264
1265        Returns:
1266            A URL that points to the network resource, e.g.
1267            https://www.googleapis.com/compute/v1/projects/<project id>/
1268            global/networks/default
1269
1270        Raises:
1271            errors.NoSubnetwork: When no subnetwork exists for the zone
1272            specified.
1273        """
1274        api = self.service.networks().get(
1275            project=self._project, network=network)
1276        result = self.Execute(api)
1277        region = zone.rsplit("-", 1)[0]
1278        for subnetwork in result["subnetworks"]:
1279            if region in subnetwork:
1280                return subnetwork
1281        raise errors.NoSubnetwork("No subnetwork for network %s in region %s" %
1282                                  (network, region))
1283
1284    def CompareMachineSize(self, machine_type_1, machine_type_2, zone):
1285        """Compare the size of two machine types.
1286
1287        Args:
1288            machine_type_1: A string representing a machine type, e.g. n1-standard-1
1289            machine_type_2: A string representing a machine type, e.g. n1-standard-1
1290            zone: A string representing a zone, e.g. "us-central1-f"
1291
1292        Returns:
1293            -1 if any metric of machine size of the first type is smaller than
1294                the second type.
1295            0 if all metrics of machine size are equal.
1296            1 if at least one metric of machine size of the first type is
1297                greater than the second type and all metrics of first type are
1298                greater or equal to the second type.
1299
1300        Raises:
1301            errors.DriverError: For malformed response.
1302        """
1303        machine_info_1 = self.GetMachineType(machine_type_1, zone)
1304        machine_info_2 = self.GetMachineType(machine_type_2, zone)
1305        result = 0
1306        for metric in self.MACHINE_SIZE_METRICS:
1307            if metric not in machine_info_1 or metric not in machine_info_2:
1308                raise errors.DriverError(
1309                    "Malformed machine size record: Can't find '%s' in %s or %s"
1310                    % (metric, machine_info_1, machine_info_2))
1311            cmp_result = machine_info_1[metric] - machine_info_2[metric]
1312            if  cmp_result < 0:
1313                return -1
1314            elif cmp_result > 0:
1315                result = 1
1316        return result
1317
1318    def GetSerialPortOutput(self, instance, zone, port=1):
1319        """Get serial port output.
1320
1321        Args:
1322            instance: string, instance name.
1323            zone: string, zone name.
1324            port: int, which COM port to read from, 1-4, default to 1.
1325
1326        Returns:
1327            String, contents of the output.
1328
1329        Raises:
1330            errors.DriverError: For malformed response.
1331        """
1332        api = self.service.instances().getSerialPortOutput(
1333            project=self._project, zone=zone, instance=instance, port=port)
1334        result = self.Execute(api)
1335        if "contents" not in result:
1336            raise errors.DriverError(
1337                "Malformed response for GetSerialPortOutput: %s" % result)
1338        return result["contents"]
1339
1340    def GetInstanceNamesByIPs(self, ips, zone):
1341        """Get Instance names by IPs.
1342
1343        This function will go through all instances, which
1344        could be slow if there are too many instances.  However, currently
1345        GCE doesn't support search for instance by IP.
1346
1347        Args:
1348            ips: A set of IPs.
1349            zone: String, name of the zone.
1350
1351        Returns:
1352            A dictionary where key is IP and value is instance name or None
1353            if instance is not found for the given IP.
1354        """
1355        ip_name_map = dict.fromkeys(ips)
1356        for instance in self.ListInstances(zone):
1357            try:
1358                ip = instance["networkInterfaces"][0]["accessConfigs"][0][
1359                    "natIP"]
1360                if ip in ips:
1361                    ip_name_map[ip] = instance["name"]
1362            except (IndexError, KeyError) as e:
1363                logger.error("Could not get instance names by ips: %s", str(e))
1364        return ip_name_map
1365
1366    def GetInstanceIP(self, instance, zone):
1367        """Get Instance IP given instance name.
1368
1369        Args:
1370            instance: String, representing instance name.
1371            zone: String, name of the zone.
1372
1373        Returns:
1374            NamedTuple of (internal, external) IP of the instance.
1375        """
1376        instance = self.GetInstance(instance, zone)
1377        internal_ip = instance["networkInterfaces"][0]["networkIP"]
1378        external_ip = instance["networkInterfaces"][0]["accessConfigs"][0]["natIP"]
1379        return IP(internal=internal_ip, external=external_ip)
1380
1381    @utils.TimeExecute(function_description="Updating instance metadata: ")
1382    def SetInstanceMetadata(self, zone, instance, body):
1383        """Set instance metadata.
1384
1385        Args:
1386            zone: String, name of zone.
1387            instance: String, representing instance name.
1388            body: Dict, Metadata body.
1389                  metdata is in the following format.
1390                  {
1391                    "kind": "compute#metadata",
1392                    "fingerprint": "a-23icsyx4E=",
1393                    "items": [
1394                      {
1395                        "key": "sshKeys",
1396                        "value": "key"
1397                      }, ...
1398                    ]
1399                  }
1400        """
1401        api = self.service.instances().setMetadata(
1402            project=self._project, zone=zone, instance=instance, body=body)
1403        operation = self.Execute(api)
1404        self.WaitOnOperation(
1405            operation, operation_scope=OperationScope.ZONE, scope_name=zone)
1406
1407    def AddSshRsaInstanceMetadata(self, zone, user, ssh_rsa_path, instance):
1408        """Add the public rsa key to the instance's metadata.
1409
1410        Confirm that the instance has this public key in the instance's
1411        metadata, if not we will add this public key.
1412
1413        Args:
1414            zone: String, name of zone.
1415            user: String, name of the user which the key belongs to.
1416            ssh_rsa_path: String, The absolute path to public rsa key.
1417            instance: String, representing instance name.
1418        """
1419        ssh_rsa_path = os.path.expanduser(ssh_rsa_path)
1420        rsa = GetRsaKey(ssh_rsa_path)
1421        entry = "%s:%s" % (user, rsa)
1422        logger.debug("New RSA entry: %s", entry)
1423
1424        gce_instance = self.GetInstance(instance, zone)
1425        metadata = gce_instance.get(_METADATA)
1426        if RsaNotInMetadata(metadata, entry):
1427            self.UpdateRsaInMetadata(zone, instance, metadata, entry)
1428
1429    def CheckAccess(self):
1430        """Check if the user has read access to the cloud project.
1431
1432        Returns:
1433            True if the user has at least read access to the project.
1434            False otherwise.
1435
1436        Raises:
1437            errors.HttpError if other unexpected error happens when
1438            accessing the project.
1439        """
1440        api = self.service.zones().list(project=self._project)
1441        retry_http_codes = copy.copy(self.RETRY_HTTP_CODES)
1442        retry_http_codes.remove(self.ACCESS_DENIED_CODE)
1443        try:
1444            self.Execute(api, retry_http_codes=retry_http_codes)
1445        except errors.HttpError as e:
1446            if e.code == self.ACCESS_DENIED_CODE:
1447                return False
1448            raise
1449        return True
1450
1451    def UpdateRsaInMetadata(self, zone, instance, metadata, entry):
1452        """Update ssh public key to sshKeys's value in this metadata.
1453
1454        Args:
1455            zone: String, name of zone.
1456            instance: String, representing instance name.
1457            metadata: Dict, maps a metadata name to its value.
1458            entry: String, ssh public key.
1459        """
1460        ssh_key_item = GetSshKeyFromMetadata(metadata)
1461        if ssh_key_item:
1462            # The ssh key exists in the metadata so update the reference to it
1463            # in the metadata. There may not be an actual ssh key value so
1464            # that's why we filter for None to avoid an empty line in front.
1465            ssh_key_item[_METADATA_KEY_VALUE] = "\n".join(
1466                filter(None, [ssh_key_item[_METADATA_KEY_VALUE], entry]))
1467        else:
1468            # Since there is no ssh key item in the metadata, we need to add it in.
1469            ssh_key_item = {_METADATA_KEY: _SSH_KEYS_NAME,
1470                            _METADATA_KEY_VALUE: entry}
1471            metadata[_ITEMS].append(ssh_key_item)
1472        utils.PrintColorString(
1473            "Ssh public key doesn't exist in the instance(%s), adding it."
1474            % instance, utils.TextColors.WARNING)
1475        self.SetInstanceMetadata(zone, instance, metadata)
1476
1477
def RsaNotInMetadata(metadata, entry):
    """Check whether the ssh public key is missing from the sshKeys value.

    An empty items list is created in |metadata| if it has none.

    Args:
        metadata: Dict, maps a metadata name to its value.
        entry: String, ssh public key.

    Returns:
        Boolean. True if ssh public key doesn't exist in metadata.
    """
    metadata_items = metadata.setdefault(_ITEMS, [])
    key_present = any(
        item[_METADATA_KEY] == _SSH_KEYS_NAME
        and entry in item[_METADATA_KEY_VALUE]
        for item in metadata_items)
    return not key_present
1493
1494
def GetSshKeyFromMetadata(metadata):
    """Find the ssh key item in the metadata.

    An empty items list is created in |metadata| if it has none.

    Args:
        metadata: Dict, maps a metadata name to its value.

    Returns:
        Dict of ssh_key_item in metadata, None if can't find the ssh key item
        in metadata.
    """
    metadata_items = metadata.setdefault(_ITEMS, [])
    matching_items = (
        item for item in metadata_items
        if item.get(_METADATA_KEY, '') == _SSH_KEYS_NAME)
    # First match wins; None when no item carries the ssh keys name.
    return next(matching_items, None)
1509
1510
def GetRsaKey(ssh_rsa_path):
    """Read, strip and verify the public rsa key stored at a path.

    Args:
        ssh_rsa_path: String, The absolute path to public rsa key.

    Returns:
        String, rsa key.

    Raises:
        errors.DriverError: RSA file does not exist.
    """
    key_path = os.path.expanduser(ssh_rsa_path)
    if not os.path.exists(key_path):
        raise errors.DriverError(
            "RSA file %s does not exist." % key_path)

    with open(key_path) as key_file:
        rsa = key_file.read()
        # Surrounding whitespace is stripped before verification; empty
        # content is passed on as-is.
        if rsa:
            rsa = rsa.strip()
        utils.VerifyRsaPubKey(rsa)
    return rsa
1535