1# Copyright 2014 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Channel notifications support. 16 17Classes and functions to support channel subscriptions and notifications 18on those channels. 19 20Notes: 21 - This code is based on experimental APIs and is subject to change. 22 - Notification does not do deduplication of notification ids, that's up to 23 the receiver. 24 - Storing the Channel between calls is up to the caller. 25 26 27Example setting up a channel: 28 29 # Create a new channel that gets notifications via webhook. 30 channel = new_webhook_channel("https://example.com/my_web_hook") 31 32 # Store the channel, keyed by 'channel.id'. Store it before calling the 33 # watch method because notifications may start arriving before the watch 34 # method returns. 35 ... 36 37 resp = service.objects().watchAll( 38 bucket="some_bucket_id", body=channel.body()).execute() 39 channel.update(resp) 40 41 # Store the channel, keyed by 'channel.id'. Store it after being updated 42 # since the resource_id value will now be correct, and that's needed to 43 # stop a subscription. 44 ... 45 46 47An example Webhook implementation using webapp2. Note that webapp2 puts 48headers in a case insensitive dictionary, as headers aren't guaranteed to 49always be upper case. 50 51 id = self.request.headers[X_GOOG_CHANNEL_ID] 52 53 # Retrieve the channel by id. 54 channel = ... 55 56 # Parse notification from the headers, including validating the id. 57 n = notification_from_headers(channel, self.request.headers) 58 59 # Do app specific stuff with the notification here. 60 if n.resource_state == 'sync': 61 # Code to handle sync state. 62 elif n.resource_state == 'exists': 63 # Code to handle the exists state. 64 elif n.resource_state == 'not_exists': 65 # Code to handle the not exists state. 66 67 68Example of unsubscribing. 69 70 service.channels().stop(channel.body()).execute() 71""" 72from __future__ import absolute_import 73 74import datetime 75import uuid 76 77from googleapiclient import errors 78from googleapiclient import _helpers as util 79import six 80 81 82# The unix time epoch starts at midnight 1970. 83EPOCH = datetime.datetime.utcfromtimestamp(0) 84 85# Map the names of the parameters in the JSON channel description to 86# the parameter names we use in the Channel class. 87CHANNEL_PARAMS = { 88 'address': 'address', 89 'id': 'id', 90 'expiration': 'expiration', 91 'params': 'params', 92 'resourceId': 'resource_id', 93 'resourceUri': 'resource_uri', 94 'type': 'type', 95 'token': 'token', 96 } 97 98X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID' 99X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' 100X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' 101X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI' 102X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID' 103 104 105def _upper_header_keys(headers): 106 new_headers = {} 107 for k, v in six.iteritems(headers): 108 new_headers[k.upper()] = v 109 return new_headers 110 111 112class Notification(object): 113 """A Notification from a Channel. 114 115 Notifications are not usually constructed directly, but are returned 116 from functions like notification_from_headers(). 117 118 Attributes: 119 message_number: int, The unique id number of this notification. 120 state: str, The state of the resource being monitored. 121 uri: str, The address of the resource being monitored. 122 resource_id: str, The unique identifier of the version of the resource at 123 this event. 124 """ 125 @util.positional(5) 126 def __init__(self, message_number, state, resource_uri, resource_id): 127 """Notification constructor. 128 129 Args: 130 message_number: int, The unique id number of this notification. 131 state: str, The state of the resource being monitored. Can be one 132 of "exists", "not_exists", or "sync". 133 resource_uri: str, The address of the resource being monitored. 134 resource_id: str, The identifier of the watched resource. 135 """ 136 self.message_number = message_number 137 self.state = state 138 self.resource_uri = resource_uri 139 self.resource_id = resource_id 140 141 142class Channel(object): 143 """A Channel for notifications. 144 145 Usually not constructed directly, instead it is returned from helper 146 functions like new_webhook_channel(). 147 148 Attributes: 149 type: str, The type of delivery mechanism used by this channel. For 150 example, 'web_hook'. 151 id: str, A UUID for the channel. 152 token: str, An arbitrary string associated with the channel that 153 is delivered to the target address with each event delivered 154 over this channel. 155 address: str, The address of the receiving entity where events are 156 delivered. Specific to the channel type. 157 expiration: int, The time, in milliseconds from the epoch, when this 158 channel will expire. 159 params: dict, A dictionary of string to string, with additional parameters 160 controlling delivery channel behavior. 161 resource_id: str, An opaque id that identifies the resource that is 162 being watched. Stable across different API versions. 163 resource_uri: str, The canonicalized ID of the watched resource. 164 """ 165 166 @util.positional(5) 167 def __init__(self, type, id, token, address, expiration=None, 168 params=None, resource_id="", resource_uri=""): 169 """Create a new Channel. 170 171 In user code, this Channel constructor will not typically be called 172 manually since there are functions for creating channels for each specific 173 type with a more customized set of arguments to pass. 174 175 Args: 176 type: str, The type of delivery mechanism used by this channel. For 177 example, 'web_hook'. 178 id: str, A UUID for the channel. 179 token: str, An arbitrary string associated with the channel that 180 is delivered to the target address with each event delivered 181 over this channel. 182 address: str, The address of the receiving entity where events are 183 delivered. Specific to the channel type. 184 expiration: int, The time, in milliseconds from the epoch, when this 185 channel will expire. 186 params: dict, A dictionary of string to string, with additional parameters 187 controlling delivery channel behavior. 188 resource_id: str, An opaque id that identifies the resource that is 189 being watched. Stable across different API versions. 190 resource_uri: str, The canonicalized ID of the watched resource. 191 """ 192 self.type = type 193 self.id = id 194 self.token = token 195 self.address = address 196 self.expiration = expiration 197 self.params = params 198 self.resource_id = resource_id 199 self.resource_uri = resource_uri 200 201 def body(self): 202 """Build a body from the Channel. 203 204 Constructs a dictionary that's appropriate for passing into watch() 205 methods as the value of body argument. 206 207 Returns: 208 A dictionary representation of the channel. 209 """ 210 result = { 211 'id': self.id, 212 'token': self.token, 213 'type': self.type, 214 'address': self.address 215 } 216 if self.params: 217 result['params'] = self.params 218 if self.resource_id: 219 result['resourceId'] = self.resource_id 220 if self.resource_uri: 221 result['resourceUri'] = self.resource_uri 222 if self.expiration: 223 result['expiration'] = self.expiration 224 225 return result 226 227 def update(self, resp): 228 """Update a channel with information from the response of watch(). 229 230 When a request is sent to watch() a resource, the response returned 231 from the watch() request is a dictionary with updated channel information, 232 such as the resource_id, which is needed when stopping a subscription. 233 234 Args: 235 resp: dict, The response from a watch() method. 236 """ 237 for json_name, param_name in six.iteritems(CHANNEL_PARAMS): 238 value = resp.get(json_name) 239 if value is not None: 240 setattr(self, param_name, value) 241 242 243def notification_from_headers(channel, headers): 244 """Parse a notification from the webhook request headers, validate 245 the notification, and return a Notification object. 246 247 Args: 248 channel: Channel, The channel that the notification is associated with. 249 headers: dict, A dictionary like object that contains the request headers 250 from the webhook HTTP request. 251 252 Returns: 253 A Notification object. 254 255 Raises: 256 errors.InvalidNotificationError if the notification is invalid. 257 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. 258 """ 259 headers = _upper_header_keys(headers) 260 channel_id = headers[X_GOOG_CHANNEL_ID] 261 if channel.id != channel_id: 262 raise errors.InvalidNotificationError( 263 'Channel id mismatch: %s != %s' % (channel.id, channel_id)) 264 else: 265 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) 266 state = headers[X_GOOG_RESOURCE_STATE] 267 resource_uri = headers[X_GOOG_RESOURCE_URI] 268 resource_id = headers[X_GOOG_RESOURCE_ID] 269 return Notification(message_number, state, resource_uri, resource_id) 270 271 272@util.positional(2) 273def new_webhook_channel(url, token=None, expiration=None, params=None): 274 """Create a new webhook Channel. 275 276 Args: 277 url: str, URL to post notifications to. 278 token: str, An arbitrary string associated with the channel that 279 is delivered to the target address with each notification delivered 280 over this channel. 281 expiration: datetime.datetime, A time in the future when the channel 282 should expire. Can also be None if the subscription should use the 283 default expiration. Note that different services may have different 284 limits on how long a subscription lasts. Check the response from the 285 watch() method to see the value the service has set for an expiration 286 time. 287 params: dict, Extra parameters to pass on channel creation. Currently 288 not used for webhook channels. 289 """ 290 expiration_ms = 0 291 if expiration: 292 delta = expiration - EPOCH 293 expiration_ms = delta.microseconds/1000 + ( 294 delta.seconds + delta.days*24*3600)*1000 295 if expiration_ms < 0: 296 expiration_ms = 0 297 298 return Channel('web_hook', str(uuid.uuid4()), 299 token, url, expiration=expiration_ms, 300 params=params) 301 302