• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Channel notifications support.
16
17Classes and functions to support channel subscriptions and notifications
18on those channels.
19
20Notes:
21  - This code is based on experimental APIs and is subject to change.
22  - Notification does not do deduplication of notification ids, that's up to
23    the receiver.
24  - Storing the Channel between calls is up to the caller.
25
26
27Example setting up a channel:
28
29  # Create a new channel that gets notifications via webhook.
30  channel = new_webhook_channel("https://example.com/my_web_hook")
31
32  # Store the channel, keyed by 'channel.id'. Store it before calling the
33  # watch method because notifications may start arriving before the watch
34  # method returns.
35  ...
36
37  resp = service.objects().watchAll(
38    bucket="some_bucket_id", body=channel.body()).execute()
39  channel.update(resp)
40
41  # Store the channel, keyed by 'channel.id'. Store it after being updated
42  # since the resource_id value will now be correct, and that's needed to
43  # stop a subscription.
44  ...
45
46
47An example Webhook implementation using webapp2. Note that webapp2 puts
48headers in a case insensitive dictionary, as headers aren't guaranteed to
49always be upper case.
50
51  id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53  # Retrieve the channel by id.
54  channel = ...
55
56  # Parse notification from the headers, including validating the id.
57  n = notification_from_headers(channel, self.request.headers)
58
59  # Do app specific stuff with the notification here.
  if n.state == 'sync':
    # Code to handle sync state.
  elif n.state == 'exists':
    # Code to handle the exists state.
  elif n.state == 'not_exists':
    # Code to handle the not exists state.
66
67
68Example of unsubscribing.
69
70  service.channels().stop(channel.body()).execute()
71"""
72from __future__ import absolute_import
73
74import datetime
75import uuid
76
77from googleapiclient import errors
78from googleapiclient import _helpers as util
79import six
80
81
# The unix time epoch starts at midnight 1970. This is a naive datetime that
# is implicitly UTC. It is constructed directly instead of through
# datetime.utcfromtimestamp(0), which yields the same value but is deprecated
# since Python 3.12.
EPOCH = datetime.datetime(1970, 1, 1)

# Map the names of the parameters in the JSON channel description to
# the parameter names we use in the Channel class.
CHANNEL_PARAMS = {
    'address': 'address',
    'id': 'id',
    'expiration': 'expiration',
    'params': 'params',
    'resourceId': 'resource_id',
    'resourceUri': 'resource_uri',
    'type': 'type',
    'token': 'token',
    }

# Names of the notification headers, spelled in upper case. Lookups with
# these constants expect header keys that were upper-cased first.
X_GOOG_CHANNEL_ID     = 'X-GOOG-CHANNEL-ID'
X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
X_GOOG_RESOURCE_URI   = 'X-GOOG-RESOURCE-URI'
X_GOOG_RESOURCE_ID    = 'X-GOOG-RESOURCE-ID'
103
104
def _upper_header_keys(headers):
  """Return a copy of the headers with every key converted to upper case.

  Args:
    headers: dict, A dictionary like object mapping header names (str) to
      header values. It is not modified.

  Returns:
    A new dict holding the same values under upper-cased keys. If two keys
    differ only by case, the one iterated last wins.
  """
  return {name.upper(): value for name, value in headers.items()}
110
111
class Notification(object):
  """A single Notification delivered over a Channel.

  Instances are normally produced by helpers such as
  notification_from_headers() rather than built by hand.

  Attributes:
    message_number: int, The unique id number of this notification.
    state: str, The state of the resource being monitored.
    resource_uri: str, The address of the resource being monitored.
    resource_id: str, The unique identifier of the version of the resource at
      this event.
  """
  @util.positional(5)
  def __init__(self, message_number, state, resource_uri, resource_id):
    """Store the four values that describe a notification.

    Args:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored. Can be one
        of "exists", "not_exists", or "sync".
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The identifier of the watched resource.
    """
    # Values are stored as given; no validation or conversion happens here.
    self.message_number, self.state = message_number, state
    self.resource_uri, self.resource_id = resource_uri, resource_id
140
141
class Channel(object):
  """A notification Channel.

  Channels are normally obtained from a helper such as
  new_webhook_channel() instead of being constructed directly.

  Attributes:
    type: str, The type of delivery mechanism used by this channel. For
      example, 'web_hook'.
    id: str, A UUID for the channel.
    token: str, An arbitrary string associated with the channel that
      is delivered to the target address with each event delivered
      over this channel.
    address: str, The address of the receiving entity where events are
      delivered. Specific to the channel type.
    expiration: int, The time, in milliseconds from the epoch, when this
      channel will expire.
    params: dict, A dictionary of string to string, with additional parameters
      controlling delivery channel behavior.
    resource_id: str, An opaque id that identifies the resource that is
      being watched. Stable across different API versions.
    resource_uri: str, The canonicalized ID of the watched resource.
  """

  @util.positional(5)
  def __init__(self, type, id, token, address, expiration=None,
               params=None, resource_id="", resource_uri=""):
    """Initialize a Channel from its individual fields.

    User code rarely calls this directly; the per-type factory functions
    take a more convenient set of arguments.

    Args:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional
        parameters controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """
    # Required delivery settings.
    self.type = type
    self.id = id
    self.token = token
    self.address = address
    # Optional settings, stored exactly as passed in.
    self.expiration = expiration
    self.params = params
    self.resource_id = resource_id
    self.resource_uri = resource_uri

  def body(self):
    """Return the channel as a dictionary usable as a watch() request body.

    The 'id', 'token', 'type' and 'address' entries are always present.
    'params', 'resourceId', 'resourceUri' and 'expiration' are included only
    when the corresponding attribute is truthy.

    Returns:
      A dictionary representation of the channel.
    """
    payload = {
        'id': self.id,
        'token': self.token,
        'type': self.type,
        'address': self.address,
        }
    # Optional entries, listed in the order they are added to the payload.
    optional_fields = (
        ('params', self.params),
        ('resourceId', self.resource_id),
        ('resourceUri', self.resource_uri),
        ('expiration', self.expiration),
        )
    for json_name, field_value in optional_fields:
      if field_value:
        payload[json_name] = field_value
    return payload

  def update(self, resp):
    """Copy channel fields out of a watch() response onto this channel.

    The response to a watch() request is a dictionary with updated channel
    information, such as the resource_id that is needed to stop a
    subscription. Each known JSON field whose value is not None replaces
    the matching attribute; all other attributes are left untouched.

    Args:
      resp: dict, The response from a watch() method.
    """
    for json_name in CHANNEL_PARAMS:
      new_value = resp.get(json_name)
      if new_value is None:
        # Missing from the response (or null): keep the current value.
        continue
      setattr(self, CHANNEL_PARAMS[json_name], new_value)
241
242
def notification_from_headers(channel, headers):
  """Build a Notification from the headers of a webhook request.

  Header names are matched case-insensitively. The channel id carried in
  the headers must equal the id of the given channel.

  Args:
    channel: Channel, The channel that the notification is associated with.
    headers: dict, A dictionary like object that contains the request headers
      from the webhook HTTP request.

  Returns:
    A Notification object.

  Raises:
    errors.InvalidNotificationError if the channel id in the headers does
      not match channel.id.
    ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
    KeyError if one of the X-GOOG-* headers read here is missing.
  """
  normalized = _upper_header_keys(headers)
  received_id = normalized[X_GOOG_CHANNEL_ID]

  # Reject notifications addressed to a different channel before reading
  # any of the other headers.
  if channel.id != received_id:
    raise errors.InvalidNotificationError(
        'Channel id mismatch: %s != %s' % (channel.id, received_id))

  return Notification(
      int(normalized[X_GOOG_MESSAGE_NUMBER]),
      normalized[X_GOOG_RESOURCE_STATE],
      normalized[X_GOOG_RESOURCE_URI],
      normalized[X_GOOG_RESOURCE_ID])
270
271
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
  """Create a new webhook Channel.

  Args:
    url: str, URL to post notifications to.
    token: str, An arbitrary string associated with the channel that
      is delivered to the target address with each notification delivered
      over this channel.
    expiration: datetime.datetime, A time in the future when the channel
      should expire. A naive value is interpreted as UTC; a timezone-aware
      value is converted to UTC first. Can also be None if the subscription
      should use the default expiration. Note that different services may
      have different limits on how long a subscription lasts. Check the
      response from the watch() method to see the value the service has set
      for an expiration time.
    params: dict, Extra parameters to pass on channel creation. Currently
      not used for webhook channels.

  Returns:
    A Channel of type 'web_hook' whose id is a newly generated UUID string.
    Its expiration is the requested time in whole milliseconds since the
    epoch (an int), or 0 when no expiration was given or the given time
    lies before the epoch.
  """
  expiration_ms = 0
  if expiration:
    utc_offset = expiration.utcoffset()
    if utc_offset is not None:
      # EPOCH is naive, and naive and aware datetimes cannot be subtracted
      # from each other, so reduce an aware value to naive UTC first.
      expiration = expiration.replace(tzinfo=None) - utc_offset
    delta = expiration - EPOCH
    # Use floor division so the result stays an int: a plain '/' is true
    # division on Python 3 and would turn the expiration into a float.
    expiration_ms = ((delta.days * 24 * 3600 + delta.seconds) * 1000
                     + delta.microseconds // 1000)
    if expiration_ms < 0:
      # Times before the epoch are treated as "no expiration requested".
      expiration_ms = 0

  return Channel('web_hook', str(uuid.uuid4()),
                 token, url, expiration=expiration_ms,
                 params=params)
301
302