1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 """Channel notifications support.
16
17 Classes and functions to support channel subscriptions and notifications
18 on those channels.
19
20 Notes:
21 - This code is based on experimental APIs and is subject to change.
22 - Notification does not do deduplication of notification ids, that's up to
23 the receiver.
24 - Storing the Channel between calls is up to the caller.
25
26
27 Example setting up a channel:
28
29 # Create a new channel that gets notifications via webhook.
30 channel = new_webhook_channel("https://example.com/my_web_hook")
31
32 # Store the channel, keyed by 'channel.id'. Store it before calling the
33 # watch method because notifications may start arriving before the watch
34 # method returns.
35 ...
36
37 resp = service.objects().watchAll(
38 bucket="some_bucket_id", body=channel.body()).execute()
39 channel.update(resp)
40
41 # Store the channel, keyed by 'channel.id'. Store it after being updated
42 # since the resource_id value will now be correct, and that's needed to
43 # stop a subscription.
44 ...
45
46
47 An example Webhook implementation using webapp2. Note that webapp2 puts
48 headers in a case insensitive dictionary, as headers aren't guaranteed to
49 always be upper case.
50
51 id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53 # Retrieve the channel by id.
54 channel = ...
55
56 # Parse notification from the headers, including validating the id.
57 n = notification_from_headers(channel, self.request.headers)
58
59 # Do app specific stuff with the notification here.
60 if n.resource_state == 'sync':
61 # Code to handle sync state.
62 elif n.resource_state == 'exists':
63 # Code to handle the exists state.
64 elif n.resource_state == 'not_exists':
65 # Code to handle the not exists state.
66
67
68 Example of unsubscribing.
69
70 service.channels().stop(channel.body()).execute()
71 """
72 from __future__ import absolute_import
73
74 import datetime
75 import uuid
76
77 from googleapiclient import errors
78 from googleapiclient import _helpers as util
79 import six
80
81
82
83 EPOCH = datetime.datetime.utcfromtimestamp(0)
84
85
86
87 CHANNEL_PARAMS = {
88 'address': 'address',
89 'id': 'id',
90 'expiration': 'expiration',
91 'params': 'params',
92 'resourceId': 'resource_id',
93 'resourceUri': 'resource_uri',
94 'type': 'type',
95 'token': 'token',
96 }
97
98 X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID'
99 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
100 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
101 X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI'
102 X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID'
106 new_headers = {}
107 for k, v in six.iteritems(headers):
108 new_headers[k.upper()] = v
109 return new_headers
110
113 """A Notification from a Channel.
114
115 Notifications are not usually constructed directly, but are returned
116 from functions like notification_from_headers().
117
118 Attributes:
119 message_number: int, The unique id number of this notification.
120 state: str, The state of the resource being monitored.
121 uri: str, The address of the resource being monitored.
122 resource_id: str, The unique identifier of the version of the resource at
123 this event.
124 """
125 @util.positional(5)
126 - def __init__(self, message_number, state, resource_uri, resource_id):
127 """Notification constructor.
128
129 Args:
130 message_number: int, The unique id number of this notification.
131 state: str, The state of the resource being monitored. Can be one
132 of "exists", "not_exists", or "sync".
133 resource_uri: str, The address of the resource being monitored.
134 resource_id: str, The identifier of the watched resource.
135 """
136 self.message_number = message_number
137 self.state = state
138 self.resource_uri = resource_uri
139 self.resource_id = resource_id
140
143 """A Channel for notifications.
144
145 Usually not constructed directly, instead it is returned from helper
146 functions like new_webhook_channel().
147
148 Attributes:
149 type: str, The type of delivery mechanism used by this channel. For
150 example, 'web_hook'.
151 id: str, A UUID for the channel.
152 token: str, An arbitrary string associated with the channel that
153 is delivered to the target address with each event delivered
154 over this channel.
155 address: str, The address of the receiving entity where events are
156 delivered. Specific to the channel type.
157 expiration: int, The time, in milliseconds from the epoch, when this
158 channel will expire.
159 params: dict, A dictionary of string to string, with additional parameters
160 controlling delivery channel behavior.
161 resource_id: str, An opaque id that identifies the resource that is
162 being watched. Stable across different API versions.
163 resource_uri: str, The canonicalized ID of the watched resource.
164 """
165
166 @util.positional(5)
167 - def __init__(self, type, id, token, address, expiration=None,
168 params=None, resource_id="", resource_uri=""):
169 """Create a new Channel.
170
171 In user code, this Channel constructor will not typically be called
172 manually since there are functions for creating channels for each specific
173 type with a more customized set of arguments to pass.
174
175 Args:
176 type: str, The type of delivery mechanism used by this channel. For
177 example, 'web_hook'.
178 id: str, A UUID for the channel.
179 token: str, An arbitrary string associated with the channel that
180 is delivered to the target address with each event delivered
181 over this channel.
182 address: str, The address of the receiving entity where events are
183 delivered. Specific to the channel type.
184 expiration: int, The time, in milliseconds from the epoch, when this
185 channel will expire.
186 params: dict, A dictionary of string to string, with additional parameters
187 controlling delivery channel behavior.
188 resource_id: str, An opaque id that identifies the resource that is
189 being watched. Stable across different API versions.
190 resource_uri: str, The canonicalized ID of the watched resource.
191 """
192 self.type = type
193 self.id = id
194 self.token = token
195 self.address = address
196 self.expiration = expiration
197 self.params = params
198 self.resource_id = resource_id
199 self.resource_uri = resource_uri
200
202 """Build a body from the Channel.
203
204 Constructs a dictionary that's appropriate for passing into watch()
205 methods as the value of body argument.
206
207 Returns:
208 A dictionary representation of the channel.
209 """
210 result = {
211 'id': self.id,
212 'token': self.token,
213 'type': self.type,
214 'address': self.address
215 }
216 if self.params:
217 result['params'] = self.params
218 if self.resource_id:
219 result['resourceId'] = self.resource_id
220 if self.resource_uri:
221 result['resourceUri'] = self.resource_uri
222 if self.expiration:
223 result['expiration'] = self.expiration
224
225 return result
226
228 """Update a channel with information from the response of watch().
229
230 When a request is sent to watch() a resource, the response returned
231 from the watch() request is a dictionary with updated channel information,
232 such as the resource_id, which is needed when stopping a subscription.
233
234 Args:
235 resp: dict, The response from a watch() method.
236 """
237 for json_name, param_name in six.iteritems(CHANNEL_PARAMS):
238 value = resp.get(json_name)
239 if value is not None:
240 setattr(self, param_name, value)
241
244 """Parse a notification from the webhook request headers, validate
245 the notification, and return a Notification object.
246
247 Args:
248 channel: Channel, The channel that the notification is associated with.
249 headers: dict, A dictionary like object that contains the request headers
250 from the webhook HTTP request.
251
252 Returns:
253 A Notification object.
254
255 Raises:
256 errors.InvalidNotificationError if the notification is invalid.
257 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
258 """
259 headers = _upper_header_keys(headers)
260 channel_id = headers[X_GOOG_CHANNEL_ID]
261 if channel.id != channel_id:
262 raise errors.InvalidNotificationError(
263 'Channel id mismatch: %s != %s' % (channel.id, channel_id))
264 else:
265 message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
266 state = headers[X_GOOG_RESOURCE_STATE]
267 resource_uri = headers[X_GOOG_RESOURCE_URI]
268 resource_id = headers[X_GOOG_RESOURCE_ID]
269 return Notification(message_number, state, resource_uri, resource_id)
270
274 """Create a new webhook Channel.
275
276 Args:
277 url: str, URL to post notifications to.
278 token: str, An arbitrary string associated with the channel that
279 is delivered to the target address with each notification delivered
280 over this channel.
281 expiration: datetime.datetime, A time in the future when the channel
282 should expire. Can also be None if the subscription should use the
283 default expiration. Note that different services may have different
284 limits on how long a subscription lasts. Check the response from the
285 watch() method to see the value the service has set for an expiration
286 time.
287 params: dict, Extra parameters to pass on channel creation. Currently
288 not used for webhook channels.
289 """
290 expiration_ms = 0
291 if expiration:
292 delta = expiration - EPOCH
293 expiration_ms = delta.microseconds/1000 + (
294 delta.seconds + delta.days*24*3600)*1000
295 if expiration_ms < 0:
296 expiration_ms = 0
297
298 return Channel('web_hook', str(uuid.uuid4()),
299 token, url, expiration=expiration_ms,
300 params=params)
301