1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/kernels/batching_util/periodic_function.h"
17
18 #include <algorithm>
19
20 #include "tensorflow/core/lib/strings/strcat.h"
21 #include "tensorflow/core/platform/logging.h"
22
23 namespace tensorflow {
24 namespace serving {
25
PeriodicFunction(const std::function<void ()> & function,const int64 interval_micros,const Options & options)26 PeriodicFunction::PeriodicFunction(const std::function<void()>& function,
27 const int64 interval_micros,
28 const Options& options)
29 : function_(function),
30 interval_micros_([interval_micros]() -> int64 {
31 if (interval_micros < 0) {
32 const string error = strings::StrCat(
33 " The value of 'interval_micros' should be >= 0: ",
34 interval_micros, ". ");
35 DCHECK(false) << error;
36 LOG(WARNING) << error << "Resetting it to 0.";
37 return 0;
38 }
39 return interval_micros;
40 }()),
41 options_(options) {
42 thread_.reset(options_.env->StartThread(
__anon92d06bed0202() 43 options_.thread_options, options_.thread_name_prefix, [this]() {
44 // Record the starting time here instead of in RunLoop. That way, if
45 // there is a delay starting RunLoop, that does not affect the timing
46 // of
47 // the first function. (Such a delay can often happen in tests where
48 // the test simulates a large time delay immediately after calling
49 // Start.)
50 RunLoop(options_.env->NowMicros());
51 }));
52 }
53
~PeriodicFunction()54 PeriodicFunction::~PeriodicFunction() {
55 NotifyStop();
56
57 // Waits for thread_ to complete and clean up.
58 thread_.reset();
59 }
60
NotifyStop()61 void PeriodicFunction::NotifyStop() {
62 if (!stop_thread_.HasBeenNotified()) {
63 stop_thread_.Notify();
64 }
65 }
66
RunLoop(const int64 start)67 void PeriodicFunction::RunLoop(const int64 start) {
68 {
69 if (options_.startup_delay_micros > 0) {
70 const int64 deadline = start + options_.startup_delay_micros;
71 options_.env->SleepForMicroseconds(deadline - start);
72 }
73
74 while (!stop_thread_.HasBeenNotified()) {
75 VLOG(3) << "Running function.";
76 const int64 begin = options_.env->NowMicros();
77 function_();
78
79 // Take the max() here to guard against time going backwards which
80 // sometimes happens in multiproc machines.
81 const int64 end =
82 std::max(static_cast<int64>(options_.env->NowMicros()), begin);
83
84 // The deadline is relative to when the last function started.
85 const int64 deadline = begin + interval_micros_;
86
87 // We want to sleep until 'deadline'.
88 if (deadline > end) {
89 if (end > begin) {
90 VLOG(3) << "Reducing interval_micros from " << interval_micros_
91 << " to " << (deadline - end);
92 }
93 options_.env->SleepForMicroseconds(deadline - end);
94 } else {
95 VLOG(3) << "Function took longer than interval_micros, so not sleeping";
96 }
97 }
98 }
99 }
100
101 } // namespace serving
102 } // namespace tensorflow
103