• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Model classes which are used to communicate between parts of implementation.
18
19These model classes are describing mapreduce, its current state and
20communication messages. They are either stored in the datastore or
21serialized to/from json and passed around with other means.
22"""
23
24# Disable "Invalid method name"
25# pylint: disable=g-bad-name
26
27
28
29__all__ = ["MapreduceState",
30           "MapperSpec",
31           "MapreduceControl",
32           "MapreduceSpec",
33           "ShardState",
34           "CountersMap",
35           "TransientShardState",
36           "QuerySpec",
37           "HugeTask"]
38
39import cgi
40import datetime
41import urllib
42import zlib
43from graphy import bar_chart
44from graphy.backends import google_chart_api
45
46try:
47  import json
48except ImportError:
49  import simplejson as json
50
51from google.appengine.api import memcache
52from google.appengine.api import taskqueue
53from google.appengine.datastore import datastore_rpc
54from google.appengine.ext import db
55from mapreduce import context
56from mapreduce import hooks
57from mapreduce import json_util
58from mapreduce import util
59
60
61# pylint: disable=protected-access
62
63
64# Special datastore kinds for MR.
65_MAP_REDUCE_KINDS = ("_AE_MR_MapreduceControl",
66                     "_AE_MR_MapreduceState",
67                     "_AE_MR_ShardState",
68                     "_AE_MR_TaskPayload")
69
70
71class _HugeTaskPayload(db.Model):
72  """Model object to store task payload."""
73
74  payload = db.BlobProperty()
75
76  @classmethod
77  def kind(cls):
78    """Returns entity kind."""
79    return "_AE_MR_TaskPayload"
80
81
82class HugeTask(object):
83  """HugeTask is a taskqueue.Task-like class that can store big payloads.
84
85  Payloads are stored either in the task payload itself or in the datastore.
86  Task handlers should inherit from base_handler.HugeTaskHandler class.
87  """
88
89  PAYLOAD_PARAM = "__payload"
90  PAYLOAD_KEY_PARAM = "__payload_key"
91
92  # Leave some wiggle room for headers and other fields.
93  MAX_TASK_PAYLOAD = taskqueue.MAX_PUSH_TASK_SIZE_BYTES - 1024
94  MAX_DB_PAYLOAD = datastore_rpc.BaseConnection.MAX_RPC_BYTES
95
96  PAYLOAD_VERSION_HEADER = "AE-MR-Payload-Version"
97  # Update version when payload handling is changed
98  # in a backward incompatible way.
99  PAYLOAD_VERSION = "1"
100
101  def __init__(self,
102               url,
103               params,
104               name=None,
105               eta=None,
106               countdown=None,
107               parent=None,
108               headers=None):
109    """Init.
110
111    Args:
112      url: task url in str.
113      params: a dict from str to str.
114      name: task name.
115      eta: task eta.
116      countdown: task countdown.
117      parent: parent entity of huge task's payload.
118      headers: a dict of headers for the task.
119
120    Raises:
121      ValueError: when payload is too big even for datastore, or parent is
122    not specified when payload is stored in datastore.
123    """
124    self.url = url
125    self.name = name
126    self.eta = eta
127    self.countdown = countdown
128    self._headers = {
129        "Content-Type": "application/octet-stream",
130        self.PAYLOAD_VERSION_HEADER: self.PAYLOAD_VERSION
131    }
132    if headers:
133      self._headers.update(headers)
134
135    # TODO(user): Find a more space efficient way than urlencoding.
136    payload_str = urllib.urlencode(params)
137    compressed_payload = ""
138    if len(payload_str) > self.MAX_TASK_PAYLOAD:
139      compressed_payload = zlib.compress(payload_str)
140
141    # Payload is small. Don't bother with anything.
142    if not compressed_payload:
143      self._payload = payload_str
144    # Compressed payload is small. Don't bother with datastore.
145    elif len(compressed_payload) < self.MAX_TASK_PAYLOAD:
146      self._payload = self.PAYLOAD_PARAM + compressed_payload
147    elif len(compressed_payload) > self.MAX_DB_PAYLOAD:
148      raise ValueError(
149          "Payload from %s to big to be stored in database: %s" %
150          (self.name, len(compressed_payload)))
151    # Store payload in the datastore.
152    else:
153      if not parent:
154        raise ValueError("Huge tasks should specify parent entity.")
155
156      payload_entity = _HugeTaskPayload(payload=compressed_payload,
157                                        parent=parent)
158      payload_key = payload_entity.put()
159      self._payload = self.PAYLOAD_KEY_PARAM + str(payload_key)
160
161  def add(self, queue_name, transactional=False):
162    """Add task to the queue."""
163    task = self.to_task()
164    task.add(queue_name, transactional)
165
166  def to_task(self):
167    """Convert to a taskqueue task."""
168    # Never pass params to taskqueue.Task. Use payload instead. Otherwise,
169    # it's up to a particular taskqueue implementation to generate
170    # payload from params. It could blow up payload size over limit.
171    return taskqueue.Task(
172        url=self.url,
173        payload=self._payload,
174        name=self.name,
175        eta=self.eta,
176        countdown=self.countdown,
177        headers=self._headers)
178
179  @classmethod
180  def decode_payload(cls, request):
181    """Decode task payload.
182
183    HugeTask controls its own payload entirely including urlencoding.
184    It doesn't depend on any particular web framework.
185
186    Args:
187      request: a webapp Request instance.
188
189    Returns:
190      A dict of str to str. The same as the params argument to __init__.
191
192    Raises:
193      DeprecationWarning: When task payload constructed from an older
194        incompatible version of mapreduce.
195    """
196    # TODO(user): Pass mr_id into headers. Otherwise when payload decoding
197    # failed, we can't abort a mr.
198    if request.headers.get(cls.PAYLOAD_VERSION_HEADER) != cls.PAYLOAD_VERSION:
199      raise DeprecationWarning(
200          "Task is generated by an older incompatible version of mapreduce. "
201          "Please kill this job manually")
202    return cls._decode_payload(request.body)
203
204  @classmethod
205  def _decode_payload(cls, body):
206    compressed_payload_str = None
207    if body.startswith(cls.PAYLOAD_KEY_PARAM):
208      payload_key = body[len(cls.PAYLOAD_KEY_PARAM):]
209      payload_entity = _HugeTaskPayload.get(payload_key)
210      compressed_payload_str = payload_entity.payload
211    elif body.startswith(cls.PAYLOAD_PARAM):
212      compressed_payload_str = body[len(cls.PAYLOAD_PARAM):]
213
214    if compressed_payload_str:
215      payload_str = zlib.decompress(compressed_payload_str)
216    else:
217      payload_str = body
218
219    result = {}
220    for (name, value) in cgi.parse_qs(payload_str).items():
221      if len(value) == 1:
222        result[name] = value[0]
223      else:
224        result[name] = value
225    return result
226
227
228class CountersMap(json_util.JsonMixin):
229  """Maintains map from counter name to counter value.
230
231  The class is used to provide basic arithmetics of counter values (buil
232  add/remove), increment individual values and store/load data from json.
233  """
234
235  def __init__(self, initial_map=None):
236    """Constructor.
237
238    Args:
239      initial_map: initial counter values map from counter name (string) to
240        counter value (int).
241    """
242    if initial_map:
243      self.counters = initial_map
244    else:
245      self.counters = {}
246
247  def __repr__(self):
248    """Compute string representation."""
249    return "mapreduce.model.CountersMap(%r)" % self.counters
250
251  def get(self, counter_name, default=0):
252    """Get current counter value.
253
254    Args:
255      counter_name: counter name as string.
256      default: default value if one doesn't exist.
257
258    Returns:
259      current counter value as int. 0 if counter was not set.
260    """
261    return self.counters.get(counter_name, default)
262
263  def increment(self, counter_name, delta):
264    """Increment counter value.
265
266    Args:
267      counter_name: counter name as String.
268      delta: increment delta as Integer.
269
270    Returns:
271      new counter value.
272    """
273    current_value = self.counters.get(counter_name, 0)
274    new_value = current_value + delta
275    self.counters[counter_name] = new_value
276    return new_value
277
278  def add_map(self, counters_map):
279    """Add all counters from the map.
280
281    For each counter in the passed map, adds its value to the counter in this
282    map.
283
284    Args:
285      counters_map: CounterMap instance to add.
286    """
287    for counter_name in counters_map.counters:
288      self.increment(counter_name, counters_map.counters[counter_name])
289
290  def sub_map(self, counters_map):
291    """Subtracts all counters from the map.
292
293    For each counter in the passed map, subtracts its value to the counter in
294    this map.
295
296    Args:
297      counters_map: CounterMap instance to subtract.
298    """
299    for counter_name in counters_map.counters:
300      self.increment(counter_name, -counters_map.counters[counter_name])
301
302  def clear(self):
303    """Clear all values."""
304    self.counters = {}
305
306  def to_json(self):
307    """Serializes all the data in this map into json form.
308
309    Returns:
310      json-compatible data representation.
311    """
312    return {"counters": self.counters}
313
314  @classmethod
315  def from_json(cls, json):
316    """Create new CountersMap from the json data structure, encoded by to_json.
317
318    Args:
319      json: json representation of CountersMap .
320
321    Returns:
322      an instance of CountersMap with all data deserialized from json.
323    """
324    counters_map = cls()
325    counters_map.counters = json["counters"]
326    return counters_map
327
328  def to_dict(self):
329    """Convert to dictionary.
330
331    Returns:
332      a dictionary with counter name as key and counter values as value.
333    """
334    return self.counters
335
336
337class MapperSpec(json_util.JsonMixin):
338  """Contains a specification for the mapper phase of the mapreduce.
339
340  MapperSpec instance can be changed only during mapreduce starting process,
341  and it remains immutable for the rest of mapreduce execution. MapperSpec is
342  passed as a payload to all mapreduce tasks in JSON encoding as part of
343  MapreduceSpec.
344
345  Specifying mapper handlers:
346    * '<module_name>.<class_name>' - __call__ method of class instance will be
347      called
348    * '<module_name>.<function_name>' - function will be called.
349    * '<module_name>.<class_name>.<method_name>' - class will be instantiated
350      and method called.
351  """
352
353  def __init__(self,
354               handler_spec,
355               input_reader_spec,
356               params,
357               shard_count,
358               output_writer_spec=None):
359    """Creates a new MapperSpec.
360
361    Args:
362      handler_spec: handler specification as string (see class doc for
363        details).
364      input_reader_spec: The class name of the input reader to use.
365      params: Dictionary of additional parameters for the mapper.
366      shard_count: number of shards to process in parallel.
367
368    Properties:
369      handler_spec: name of handler class/function to use.
370      input_reader_spec: The class name of the input reader to use.
371      params: Dictionary of additional parameters for the mapper.
372      shard_count: number of shards to process in parallel.
373      output_writer_spec: The class name of the output writer to use.
374    """
375    self.handler_spec = handler_spec
376    self.input_reader_spec = input_reader_spec
377    self.output_writer_spec = output_writer_spec
378    self.shard_count = int(shard_count)
379    self.params = params
380
381  def get_handler(self):
382    """Get mapper handler instance.
383
384    This always creates a new instance of the handler. If the handler is a
385    callable instance, MR only wants to create a new instance at the
386    beginning of a shard or shard retry. The pickled callable instance
387    should be accessed from TransientShardState.
388
389    Returns:
390      handler instance as callable.
391    """
392    return util.handler_for_name(self.handler_spec)
393
394  handler = property(get_handler)
395
396  def input_reader_class(self):
397    """Get input reader class.
398
399    Returns:
400      input reader class object.
401    """
402    return util.for_name(self.input_reader_spec)
403
404  def output_writer_class(self):
405    """Get output writer class.
406
407    Returns:
408      output writer class object.
409    """
410    return self.output_writer_spec and util.for_name(self.output_writer_spec)
411
412  def to_json(self):
413    """Serializes this MapperSpec into a json-izable object."""
414    result = {
415        "mapper_handler_spec": self.handler_spec,
416        "mapper_input_reader": self.input_reader_spec,
417        "mapper_params": self.params,
418        "mapper_shard_count": self.shard_count
419    }
420    if self.output_writer_spec:
421      result["mapper_output_writer"] = self.output_writer_spec
422    return result
423
424  def __str__(self):
425    return "MapperSpec(%s, %s, %s, %s)" % (
426        self.handler_spec, self.input_reader_spec, self.params,
427        self.shard_count)
428
429  @classmethod
430  def from_json(cls, json):
431    """Creates MapperSpec from a dict-like object."""
432    return cls(json["mapper_handler_spec"],
433               json["mapper_input_reader"],
434               json["mapper_params"],
435               json["mapper_shard_count"],
436               json.get("mapper_output_writer")
437              )
438
439  def __eq__(self, other):
440    if not isinstance(other, self.__class__):
441      return False
442    return self.to_json() == other.to_json()
443
444
445class MapreduceSpec(json_util.JsonMixin):
446  """Contains a specification for the whole mapreduce.
447
448  MapreduceSpec instance can be changed only during mapreduce starting process,
449  and it remains immutable for the rest of mapreduce execution. MapreduceSpec is
450  passed as a payload to all mapreduce tasks in json encoding.
451  """
452
453  # Url to call when mapreduce finishes its execution.
454  PARAM_DONE_CALLBACK = "done_callback"
455  # Queue to use to call done callback
456  PARAM_DONE_CALLBACK_QUEUE = "done_callback_queue"
457
458  def __init__(self,
459               name,
460               mapreduce_id,
461               mapper_spec,
462               params={},
463               hooks_class_name=None):
464    """Create new MapreduceSpec.
465
466    Args:
467      name: The name of this mapreduce job type.
468      mapreduce_id: ID of the mapreduce.
469      mapper_spec: JSON-encoded string containing a MapperSpec.
470      params: dictionary of additional mapreduce parameters.
471      hooks_class_name: The fully qualified name of the hooks class to use.
472
473    Properties:
474      name: The name of this mapreduce job type.
475      mapreduce_id: unique id of this mapreduce as string.
476      mapper: This MapreduceSpec's instance of MapperSpec.
477      params: dictionary of additional mapreduce parameters.
478      hooks_class_name: The fully qualified name of the hooks class to use.
479    """
480    self.name = name
481    self.mapreduce_id = mapreduce_id
482    self.mapper = MapperSpec.from_json(mapper_spec)
483    self.params = params
484    self.hooks_class_name = hooks_class_name
485    self.__hooks = None
486    self.get_hooks()  # Fail fast on an invalid hook class.
487
488  def get_hooks(self):
489    """Returns a hooks.Hooks class or None if no hooks class has been set."""
490    if self.__hooks is None and self.hooks_class_name is not None:
491      hooks_class = util.for_name(self.hooks_class_name)
492      if not isinstance(hooks_class, type):
493        raise ValueError("hooks_class_name must refer to a class, got %s" %
494                         type(hooks_class).__name__)
495      if not issubclass(hooks_class, hooks.Hooks):
496        raise ValueError(
497            "hooks_class_name must refer to a hooks.Hooks subclass")
498      self.__hooks = hooks_class(self)
499
500    return self.__hooks
501
502  def to_json(self):
503    """Serializes all data in this mapreduce spec into json form.
504
505    Returns:
506      data in json format.
507    """
508    mapper_spec = self.mapper.to_json()
509    return {
510        "name": self.name,
511        "mapreduce_id": self.mapreduce_id,
512        "mapper_spec": mapper_spec,
513        "params": self.params,
514        "hooks_class_name": self.hooks_class_name,
515    }
516
517  @classmethod
518  def from_json(cls, json):
519    """Create new MapreduceSpec from the json, encoded by to_json.
520
521    Args:
522      json: json representation of MapreduceSpec.
523
524    Returns:
525      an instance of MapreduceSpec with all data deserialized from json.
526    """
527    mapreduce_spec = cls(json["name"],
528                         json["mapreduce_id"],
529                         json["mapper_spec"],
530                         json.get("params"),
531                         json.get("hooks_class_name"))
532    return mapreduce_spec
533
534  def __str__(self):
535    return str(self.to_json())
536
537  def __eq__(self, other):
538    if not isinstance(other, self.__class__):
539      return False
540    return self.to_json() == other.to_json()
541
542  @classmethod
543  def _get_mapreduce_spec(cls, mr_id):
544    """Get Mapreduce spec from mr id."""
545    key = 'GAE-MR-spec: %s' % mr_id
546    spec_json = memcache.get(key)
547    if spec_json:
548      return cls.from_json(spec_json)
549    state = MapreduceState.get_by_job_id(mr_id)
550    spec = state.mapreduce_spec
551    spec_json = spec.to_json()
552    memcache.set(key, spec_json)
553    return spec
554
555
556class MapreduceState(db.Model):
557  """Holds accumulated state of mapreduce execution.
558
559  MapreduceState is stored in datastore with a key name equal to the
560  mapreduce ID. Only controller tasks can write to MapreduceState.
561
562  Properties:
563    mapreduce_spec: cached deserialized MapreduceSpec instance. read-only
564    active: if this MR is still running.
565    last_poll_time: last time controller job has polled this mapreduce.
566    counters_map: shard's counters map as CountersMap. Mirrors
567      counters_map_json.
568    chart_url: last computed mapreduce status chart url. This chart displays the
569      progress of all the shards the best way it can.
570    sparkline_url: last computed mapreduce status chart url in small format.
571    result_status: If not None, the final status of the job.
572    active_shards: How many shards are still processing. This starts as 0,
573      then set by KickOffJob handler to be the actual number of input
574      readers after input splitting, and is updated by Controller task
575      as shards finish.
576    start_time: When the job started.
577    writer_state: Json property to be used by writer to store its state.
578      This is filled when single output per job. Will be deprecated.
579      Use OutputWriter.get_filenames instead.
580  """
581
582  RESULT_SUCCESS = "success"
583  RESULT_FAILED = "failed"
584  RESULT_ABORTED = "aborted"
585
586  _RESULTS = frozenset([RESULT_SUCCESS, RESULT_FAILED, RESULT_ABORTED])
587
588  # Functional properties.
589  # TODO(user): Replace mapreduce_spec with job_config.
590  mapreduce_spec = json_util.JsonProperty(MapreduceSpec, indexed=False)
591  active = db.BooleanProperty(default=True, indexed=False)
592  last_poll_time = db.DateTimeProperty(required=True)
593  counters_map = json_util.JsonProperty(
594      CountersMap, default=CountersMap(), indexed=False)
595  app_id = db.StringProperty(required=False, indexed=True)
596  writer_state = json_util.JsonProperty(dict, indexed=False)
597  active_shards = db.IntegerProperty(default=0, indexed=False)
598  failed_shards = db.IntegerProperty(default=0, indexed=False)
599  aborted_shards = db.IntegerProperty(default=0, indexed=False)
600  result_status = db.StringProperty(required=False, choices=_RESULTS)
601
602  # For UI purposes only.
603  chart_url = db.TextProperty(default="")
604  chart_width = db.IntegerProperty(default=300, indexed=False)
605  sparkline_url = db.TextProperty(default="")
606  start_time = db.DateTimeProperty(auto_now_add=True)
607
608  @classmethod
609  def kind(cls):
610    """Returns entity kind."""
611    return "_AE_MR_MapreduceState"
612
613  @classmethod
614  def get_key_by_job_id(cls, mapreduce_id):
615    """Retrieves the Key for a Job.
616
617    Args:
618      mapreduce_id: The job to retrieve.
619
620    Returns:
621      Datastore Key that can be used to fetch the MapreduceState.
622    """
623    return db.Key.from_path(cls.kind(), str(mapreduce_id))
624
625  @classmethod
626  def get_by_job_id(cls, mapreduce_id):
627    """Retrieves the instance of state for a Job.
628
629    Args:
630      mapreduce_id: The mapreduce job to retrieve.
631
632    Returns:
633      instance of MapreduceState for passed id.
634    """
635    return db.get(cls.get_key_by_job_id(mapreduce_id))
636
637  def set_processed_counts(self, shards_processed, shards_status):
638    """Updates a chart url to display processed count for each shard.
639
640    Args:
641      shards_processed: list of integers with number of processed entities in
642        each shard
643    """
644    chart = google_chart_api.BarChart()
645
646    def filter_status(status_to_filter):
647      return [count if status == status_to_filter else 0
648              for count, status in zip(shards_processed, shards_status)]
649
650    if shards_status:
651      # Each index will have only one non-zero count, so stack them to color-
652      # code the bars by status
653      # These status values are computed in _update_state_from_shard_states,
654      # in mapreduce/handlers.py.
655      chart.stacked = True
656      chart.AddBars(filter_status("unknown"), color="404040")
657      chart.AddBars(filter_status("success"), color="00ac42")
658      chart.AddBars(filter_status("running"), color="3636a9")
659      chart.AddBars(filter_status("aborted"), color="e29e24")
660      chart.AddBars(filter_status("failed"), color="f6350f")
661    else:
662      chart.AddBars(shards_processed)
663
664    shard_count = len(shards_processed)
665
666    if shard_count > 95:
667      # Auto-spacing does not work for large numbers of shards.
668      pixels_per_shard = 700.0 / shard_count
669      bar_thickness = int(pixels_per_shard * .9)
670
671      chart.style = bar_chart.BarChartStyle(bar_thickness=bar_thickness,
672        bar_gap=0.1, use_fractional_gap_spacing=True)
673
674    if shards_processed and shard_count <= 95:
675      # Adding labels puts us in danger of exceeding the URL length, only
676      # do it when we have a small amount of data to plot.
677      # Only 16 labels on the whole chart.
678      stride_length = max(1, shard_count / 16)
679      chart.bottom.labels = []
680      for x in xrange(shard_count):
681        if (x % stride_length == 0 or
682            x == shard_count - 1):
683          chart.bottom.labels.append(x)
684        else:
685          chart.bottom.labels.append("")
686      chart.left.labels = ["0", str(max(shards_processed))]
687      chart.left.min = 0
688
689    self.chart_width = min(700, max(300, shard_count * 20))
690    self.chart_url = chart.display.Url(self.chart_width, 200)
691
692  def get_processed(self):
693    """Number of processed entities.
694
695    Returns:
696      The total number of processed entities as int.
697    """
698    return self.counters_map.get(context.COUNTER_MAPPER_CALLS)
699
700  processed = property(get_processed)
701
702  @staticmethod
703  def create_new(mapreduce_id=None,
704                 gettime=datetime.datetime.now):
705    """Create a new MapreduceState.
706
707    Args:
708      mapreduce_id: Mapreduce id as string.
709      gettime: Used for testing.
710    """
711    if not mapreduce_id:
712      mapreduce_id = MapreduceState.new_mapreduce_id()
713    state = MapreduceState(key_name=mapreduce_id,
714                           last_poll_time=gettime())
715    state.set_processed_counts([], [])
716    return state
717
718  @staticmethod
719  def new_mapreduce_id():
720    """Generate new mapreduce id."""
721    return util._get_descending_key()
722
723  def __eq__(self, other):
724    if not isinstance(other, self.__class__):
725      return False
726    return self.properties() == other.properties()
727
728
729class TransientShardState(object):
730  """A shard's states that are kept in task payload.
731
732  TransientShardState holds two types of states:
733  1. Some states just don't need to be saved to datastore. e.g.
734     serialized input reader and output writer instances.
735  2. Some states are duplicated from datastore, e.g. slice_id, shard_id.
736     These are used to validate the task.
737  """
738
739  def __init__(self,
740               base_path,
741               mapreduce_spec,
742               shard_id,
743               slice_id,
744               input_reader,
745               initial_input_reader,
746               output_writer=None,
747               retries=0,
748               handler=None):
749    """Init.
750
751    Args:
752      base_path: base path of this mapreduce job. Deprecated.
753      mapreduce_spec: an instance of MapReduceSpec.
754      shard_id: shard id.
755      slice_id: slice id. When enqueuing task for the next slice, this number
756        is incremented by 1.
757      input_reader: input reader instance for this shard.
758      initial_input_reader: the input reader instance before any iteration.
759        Used by shard retry.
760      output_writer: output writer instance for this shard, if exists.
761      retries: the number of retries of the current shard. Used to drop
762        tasks from old retries.
763      handler: map/reduce handler.
764    """
765    self.base_path = base_path
766    self.mapreduce_spec = mapreduce_spec
767    self.shard_id = shard_id
768    self.slice_id = slice_id
769    self.input_reader = input_reader
770    self.initial_input_reader = initial_input_reader
771    self.output_writer = output_writer
772    self.retries = retries
773    self.handler = handler
774    self._input_reader_json = self.input_reader.to_json()
775
776  def reset_for_retry(self, output_writer):
777    """Reset self for shard retry.
778
779    Args:
780      output_writer: new output writer that contains new output files.
781    """
782    self.input_reader = self.initial_input_reader
783    self.slice_id = 0
784    self.retries += 1
785    self.output_writer = output_writer
786    self.handler = self.mapreduce_spec.mapper.handler
787
788  def advance_for_next_slice(self, recovery_slice=False):
789    """Advance relavent states for next slice.
790
791    Args:
792      recovery_slice: True if this slice is running recovery logic.
793        See handlers.MapperWorkerCallbackHandler._attempt_slice_recovery
794        for more info.
795    """
796    if recovery_slice:
797      self.slice_id += 2
798      # Restore input reader to the beginning of the slice.
799      self.input_reader = self.input_reader.from_json(self._input_reader_json)
800    else:
801      self.slice_id += 1
802
803  def to_dict(self):
804    """Convert state to dictionary to save in task payload."""
805    result = {"mapreduce_spec": self.mapreduce_spec.to_json_str(),
806              "shard_id": self.shard_id,
807              "slice_id": str(self.slice_id),
808              "input_reader_state": self.input_reader.to_json_str(),
809              "initial_input_reader_state":
810              self.initial_input_reader.to_json_str(),
811              "retries": str(self.retries)}
812    if self.output_writer:
813      result["output_writer_state"] = self.output_writer.to_json_str()
814    serialized_handler = util.try_serialize_handler(self.handler)
815    if serialized_handler:
816      result["serialized_handler"] = serialized_handler
817    return result
818
819  @classmethod
820  def from_request(cls, request):
821    """Create new TransientShardState from webapp request."""
822    mapreduce_spec = MapreduceSpec.from_json_str(request.get("mapreduce_spec"))
823    mapper_spec = mapreduce_spec.mapper
824    input_reader_spec_dict = json.loads(request.get("input_reader_state"),
825                                        cls=json_util.JsonDecoder)
826    input_reader = mapper_spec.input_reader_class().from_json(
827        input_reader_spec_dict)
828    initial_input_reader_spec_dict = json.loads(
829        request.get("initial_input_reader_state"), cls=json_util.JsonDecoder)
830    initial_input_reader = mapper_spec.input_reader_class().from_json(
831        initial_input_reader_spec_dict)
832
833    output_writer = None
834    if mapper_spec.output_writer_class():
835      output_writer = mapper_spec.output_writer_class().from_json(
836          json.loads(request.get("output_writer_state", "{}"),
837                     cls=json_util.JsonDecoder))
838      assert isinstance(output_writer, mapper_spec.output_writer_class()), (
839          "%s.from_json returned an instance of wrong class: %s" % (
840              mapper_spec.output_writer_class(),
841              output_writer.__class__))
842
843    handler = util.try_deserialize_handler(request.get("serialized_handler"))
844    if not handler:
845      handler = mapreduce_spec.mapper.handler
846
847    return cls(mapreduce_spec.params["base_path"],
848               mapreduce_spec,
849               str(request.get("shard_id")),
850               int(request.get("slice_id")),
851               input_reader,
852               initial_input_reader,
853               output_writer=output_writer,
854               retries=int(request.get("retries")),
855               handler=handler)
856
857
858class ShardState(db.Model):
859  """Single shard execution state.
860
861  The shard state is stored in the datastore and is later aggregated by
862  controller task. ShardState key_name is equal to shard_id.
863
864  Shard state contains critical state to ensure the correctness of
865  shard execution. It is the single source of truth about a shard's
866  progress. For example:
867  1. A slice is allowed to run only if its payload matches shard state's
868     expectation.
869  2. A slice is considered running only if it has acquired the shard's lock.
870  3. A slice is considered done only if it has successfully committed shard
871     state to db.
872
873  Properties about the shard:
874    active: if we have this shard still running as boolean.
875    counters_map: shard's counters map as CountersMap. All counters yielded
876      within mapreduce are stored here.
877    mapreduce_id: unique id of the mapreduce.
878    shard_id: unique id of this shard as string.
879    shard_number: ordered number for this shard.
880    retries: the number of times this shard has been retried.
881    result_status: If not None, the final status of this shard.
882    update_time: The last time this shard state was updated.
883    shard_description: A string description of the work this shard will do.
884    last_work_item: A string description of the last work item processed.
885    writer_state: writer state for this shard. The shard's output writer
886      instance can save in-memory output references to this field in its
887      "finalize" method.
888
889   Properties about slice management:
890    slice_id: slice id of current executing slice. A slice's task
891      will not run unless its slice_id matches this. Initial
892      value is 0. By the end of slice execution, this number is
893      incremented by 1.
894    slice_start_time: a slice updates this to now at the beginning of
895      execution. If the transaction succeeds, the current task holds
896      a lease of slice duration + some grace period. During this time, no
897      other task with the same slice_id will execute. Upon slice failure,
898      the task should try to unset this value to allow retries to carry on
899      ASAP.
900    slice_request_id: the request id that holds/held the lease. When lease has
901      expired, new request needs to verify that said request has indeed
902      ended according to logs API. Do this only when lease has expired
903      because logs API is expensive. This field should always be set/unset
904      with slice_start_time. It is possible Logs API doesn't log a request
905      at all or doesn't log the end of a request. So a new request can
906      proceed after a long conservative timeout.
907    slice_retries: the number of times a slice has been retried due to
908      processing data when lock is held. Taskqueue/datastore errors
909      related to slice/shard management are not counted. This count is
910      only a lower bound and is used to determined when to fail a slice
911      completely.
912    acquired_once: whether the lock for this slice has been acquired at
913      least once. When this is True, duplicates in outputs are possible.
914  """
915
916  RESULT_SUCCESS = "success"
917  RESULT_FAILED = "failed"
918  # Shard can be in aborted state when user issued abort, or controller
919  # issued abort because some other shard failed.
920  RESULT_ABORTED = "aborted"
921
922  _RESULTS = frozenset([RESULT_SUCCESS, RESULT_FAILED, RESULT_ABORTED])
923
924  # Maximum number of shard states to hold in memory at any time.
925  _MAX_STATES_IN_MEMORY = 10
926
927  # Functional properties.
928  mapreduce_id = db.StringProperty(required=True)
929  active = db.BooleanProperty(default=True, indexed=False)
930  input_finished = db.BooleanProperty(default=False, indexed=False)
931  counters_map = json_util.JsonProperty(
932      CountersMap, default=CountersMap(), indexed=False)
933  result_status = db.StringProperty(choices=_RESULTS, indexed=False)
934  retries = db.IntegerProperty(default=0, indexed=False)
935  writer_state = json_util.JsonProperty(dict, indexed=False)
936  slice_id = db.IntegerProperty(default=0, indexed=False)
937  slice_start_time = db.DateTimeProperty(indexed=False)
938  slice_request_id = db.ByteStringProperty(indexed=False)
939  slice_retries = db.IntegerProperty(default=0, indexed=False)
940  acquired_once = db.BooleanProperty(default=False, indexed=False)
941
942  # For UI purposes only.
943  update_time = db.DateTimeProperty(auto_now=True, indexed=False)
944  shard_description = db.TextProperty(default="")
945  last_work_item = db.TextProperty(default="")
946
947  def __str__(self):
948    kv = {"active": self.active,
949          "slice_id": self.slice_id,
950          "last_work_item": self.last_work_item,
951          "update_time": self.update_time}
952    if self.result_status:
953      kv["result_status"] = self.result_status
954    if self.retries:
955      kv["retries"] = self.retries
956    if self.slice_start_time:
957      kv["slice_start_time"] = self.slice_start_time
958    if self.slice_retries:
959      kv["slice_retries"] = self.slice_retries
960    if self.slice_request_id:
961      kv["slice_request_id"] = self.slice_request_id
962    if self.acquired_once:
963      kv["acquired_once"] = self.acquired_once
964    keys = kv.keys()
965    keys.sort()
966
967    result = "ShardState is {"
968    for k in keys:
969      result += k + ":" + str(kv[k]) + ","
970    result += "}"
971    return result
972
973  def reset_for_retry(self):
974    """Reset self for shard retry."""
975    self.retries += 1
976    self.last_work_item = ""
977    self.active = True
978    self.result_status = None
979    self.input_finished = False
980    self.counters_map = CountersMap()
981    self.slice_id = 0
982    self.slice_start_time = None
983    self.slice_request_id = None
984    self.slice_retries = 0
985    self.acquired_once = False
986
987  def advance_for_next_slice(self, recovery_slice=False):
988    """Advance self for next slice.
989
990    Args:
991      recovery_slice: True if this slice is running recovery logic.
992        See handlers.MapperWorkerCallbackHandler._attempt_slice_recovery
993        for more info.
994    """
995    self.slice_start_time = None
996    self.slice_request_id = None
997    self.slice_retries = 0
998    self.acquired_once = False
999    if recovery_slice:
1000      self.slice_id += 2
1001    else:
1002      self.slice_id += 1
1003
1004  def set_for_failure(self):
1005    self.active = False
1006    self.result_status = self.RESULT_FAILED
1007
1008  def set_for_abort(self):
1009    self.active = False
1010    self.result_status = self.RESULT_ABORTED
1011
1012  def set_input_finished(self):
1013    self.input_finished = True
1014
1015  def is_input_finished(self):
1016    return self.input_finished
1017
1018  def set_for_success(self):
1019    self.active = False
1020    self.result_status = self.RESULT_SUCCESS
1021    self.slice_start_time = None
1022    self.slice_request_id = None
1023    self.slice_retries = 0
1024    self.acquired_once = False
1025
1026  def copy_from(self, other_state):
1027    """Copy data from another shard state entity to self."""
1028    for prop in self.properties().values():
1029      setattr(self, prop.name, getattr(other_state, prop.name))
1030
1031  def __eq__(self, other):
1032    if not isinstance(other, self.__class__):
1033      return False
1034    return self.properties() == other.properties()
1035
1036  def get_shard_number(self):
1037    """Gets the shard number from the key name."""
1038    return int(self.key().name().split("-")[-1])
1039
1040  shard_number = property(get_shard_number)
1041
1042  def get_shard_id(self):
1043    """Returns the shard ID."""
1044    return self.key().name()
1045
1046  shard_id = property(get_shard_id)
1047
1048  @classmethod
1049  def kind(cls):
1050    """Returns entity kind."""
1051    return "_AE_MR_ShardState"
1052
1053  @classmethod
1054  def shard_id_from_number(cls, mapreduce_id, shard_number):
1055    """Get shard id by mapreduce id and shard number.
1056
1057    Args:
1058      mapreduce_id: mapreduce id as string.
1059      shard_number: shard number to compute id for as int.
1060
1061    Returns:
1062      shard id as string.
1063    """
1064    return "%s-%d" % (mapreduce_id, shard_number)
1065
1066  @classmethod
1067  def get_key_by_shard_id(cls, shard_id):
1068    """Retrieves the Key for this ShardState.
1069
1070    Args:
1071      shard_id: The shard ID to fetch.
1072
1073    Returns:
1074      The Datatore key to use to retrieve this ShardState.
1075    """
1076    return db.Key.from_path(cls.kind(), shard_id)
1077
1078  @classmethod
1079  def get_by_shard_id(cls, shard_id):
1080    """Get shard state from datastore by shard_id.
1081
1082    Args:
1083      shard_id: shard id as string.
1084
1085    Returns:
1086      ShardState for given shard id or None if it's not found.
1087    """
1088    return cls.get_by_key_name(shard_id)
1089
1090  @classmethod
1091  def find_by_mapreduce_state(cls, mapreduce_state):
1092    """Find all shard states for given mapreduce.
1093
1094    Deprecated. Use find_all_by_mapreduce_state.
1095    This will be removed after 1.8.9 release.
1096
1097    Args:
1098      mapreduce_state: MapreduceState instance
1099
1100    Returns:
1101      A list of ShardStates.
1102    """
1103    return list(cls.find_all_by_mapreduce_state(mapreduce_state))
1104
1105  @classmethod
1106  def find_all_by_mapreduce_state(cls, mapreduce_state):
1107    """Find all shard states for given mapreduce.
1108
1109    Args:
1110      mapreduce_state: MapreduceState instance
1111
1112    Yields:
1113      shard states sorted by shard id.
1114    """
1115    keys = cls.calculate_keys_by_mapreduce_state(mapreduce_state)
1116    i = 0
1117    while i < len(keys):
1118      @db.non_transactional
1119      def no_tx_get(i):
1120        return db.get(keys[i:i+cls._MAX_STATES_IN_MEMORY])
1121      # We need a separate function to so that we can mix non-transactional and
1122      # use be a generator
1123      states = no_tx_get(i)
1124      for s in states:
1125        i += 1
1126        if s is not None:
1127          yield s
1128
1129  @classmethod
1130  def calculate_keys_by_mapreduce_state(cls, mapreduce_state):
1131    """Calculate all shard states keys for given mapreduce.
1132
1133    Args:
1134      mapreduce_state: MapreduceState instance
1135
1136    Returns:
1137      A list of keys for shard states, sorted by shard id.
1138      The corresponding shard states may not exist.
1139    """
1140    if mapreduce_state is None:
1141      return []
1142
1143    keys = []
1144    for i in range(mapreduce_state.mapreduce_spec.mapper.shard_count):
1145      shard_id = cls.shard_id_from_number(mapreduce_state.key().name(), i)
1146      keys.append(cls.get_key_by_shard_id(shard_id))
1147    return keys
1148
1149  @classmethod
1150  def create_new(cls, mapreduce_id, shard_number):
1151    """Create new shard state.
1152
1153    Args:
1154      mapreduce_id: unique mapreduce id as string.
1155      shard_number: shard number for which to create shard state.
1156
1157    Returns:
1158      new instance of ShardState ready to put into datastore.
1159    """
1160    shard_id = cls.shard_id_from_number(mapreduce_id, shard_number)
1161    state = cls(key_name=shard_id,
1162                mapreduce_id=mapreduce_id)
1163    return state
1164
1165
1166class MapreduceControl(db.Model):
1167  """Datastore entity used to control mapreduce job execution.
1168
1169  Only one command may be sent to jobs at a time.
1170
1171  Properties:
1172    command: The command to send to the job.
1173  """
1174
1175  ABORT = "abort"
1176
1177  _COMMANDS = frozenset([ABORT])
1178  _KEY_NAME = "command"
1179
1180  command = db.TextProperty(choices=_COMMANDS, required=True)
1181
1182  @classmethod
1183  def kind(cls):
1184    """Returns entity kind."""
1185    return "_AE_MR_MapreduceControl"
1186
1187  @classmethod
1188  def get_key_by_job_id(cls, mapreduce_id):
1189    """Retrieves the Key for a mapreduce ID.
1190
1191    Args:
1192      mapreduce_id: The job to fetch.
1193
1194    Returns:
1195      Datastore Key for the command for the given job ID.
1196    """
1197    return db.Key.from_path(cls.kind(), "%s:%s" % (mapreduce_id, cls._KEY_NAME))
1198
1199  @classmethod
1200  def abort(cls, mapreduce_id, **kwargs):
1201    """Causes a job to abort.
1202
1203    Args:
1204      mapreduce_id: The job to abort. Not verified as a valid job.
1205    """
1206    cls(key_name="%s:%s" % (mapreduce_id, cls._KEY_NAME),
1207        command=cls.ABORT).put(**kwargs)
1208
1209
1210class QuerySpec(object):
1211  """Encapsulates everything about a query needed by DatastoreInputReader."""
1212
1213  DEFAULT_BATCH_SIZE = 50
1214  DEFAULT_OVERSPLIT_FACTOR = 1
1215
1216  def __init__(self,
1217               entity_kind,
1218               keys_only=None,
1219               filters=None,
1220               batch_size=None,
1221               oversplit_factor=None,
1222               model_class_path=None,
1223               app=None,
1224               ns=None):
1225    self.entity_kind = entity_kind
1226    self.keys_only = keys_only or False
1227    self.filters = filters or None
1228    self.batch_size = batch_size or self.DEFAULT_BATCH_SIZE
1229    self.oversplit_factor = (oversplit_factor or
1230                             self.DEFAULT_OVERSPLIT_FACTOR)
1231    self.model_class_path = model_class_path
1232    self.app = app
1233    self.ns = ns
1234
1235  def to_json(self):
1236    return {"entity_kind": self.entity_kind,
1237            "keys_only": self.keys_only,
1238            "filters": self.filters,
1239            "batch_size": self.batch_size,
1240            "oversplit_factor": self.oversplit_factor,
1241            "model_class_path": self.model_class_path,
1242            "app": self.app,
1243            "ns": self.ns}
1244
1245  @classmethod
1246  def from_json(cls, json):
1247    return cls(json["entity_kind"],
1248               json["keys_only"],
1249               json["filters"],
1250               json["batch_size"],
1251               json["oversplit_factor"],
1252               json["model_class_path"],
1253               json["app"],
1254               json["ns"])
1255