• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.data;
18 
19 import android.content.ContentResolver;
20 import android.content.Context;
21 import android.database.ContentObserver;
22 import android.database.Cursor;
23 import android.media.tv.TvContract;
24 import android.media.tv.TvContract.Programs;
25 import android.net.Uri;
26 import android.os.Handler;
27 import android.os.Looper;
28 import android.os.Message;
29 import android.support.annotation.AnyThread;
30 import android.support.annotation.MainThread;
31 import android.support.annotation.VisibleForTesting;
32 import android.util.ArraySet;
33 import android.util.Log;
34 import android.util.LongSparseArray;
35 import android.util.LruCache;
36 import com.android.tv.TvSingletons;
37 import com.android.tv.common.SoftPreconditions;
38 import com.android.tv.common.memory.MemoryManageable;
39 import com.android.tv.common.util.Clock;
40 import com.android.tv.data.api.Channel;
41 import com.android.tv.perf.EventNames;
42 import com.android.tv.perf.PerformanceMonitor;
43 import com.android.tv.perf.TimerEvent;
44 import com.android.tv.util.AsyncDbTask;
45 import com.android.tv.util.MultiLongSparseArray;
46 import com.android.tv.util.TvProviderUtils;
47 import com.android.tv.util.Utils;
48 import com.android.tv.common.flags.BackendKnobsFlags;
49 import java.util.ArrayList;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.HashSet;
53 import java.util.List;
54 import java.util.ListIterator;
55 import java.util.Map;
56 import java.util.Objects;
57 import java.util.Set;
58 import java.util.concurrent.ConcurrentHashMap;
59 import java.util.concurrent.Executor;
60 import java.util.concurrent.TimeUnit;
61 
62 @MainThread
63 public class ProgramDataManager implements MemoryManageable {
64     private static final String TAG = "ProgramDataManager";
65     private static final boolean DEBUG = false;
66 
    // To avoid running too many program update operations at the same time, a random interval
    // between PERIODIC_PROGRAM_UPDATE_MIN_MS and PERIODIC_PROGRAM_UPDATE_MAX_MS is used.
69     @VisibleForTesting
70     static final long PERIODIC_PROGRAM_UPDATE_MIN_MS = TimeUnit.MINUTES.toMillis(5);
71 
72     private static final long PERIODIC_PROGRAM_UPDATE_MAX_MS = TimeUnit.MINUTES.toMillis(10);
73     private static final long PROGRAM_PREFETCH_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5);
74     // TODO: need to optimize consecutive DB updates.
75     private static final long CURRENT_PROGRAM_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5);
76     @VisibleForTesting static final long PROGRAM_GUIDE_SNAP_TIME_MS = TimeUnit.MINUTES.toMillis(30);
77 
78     // TODO: Use TvContract constants, once they become public.
79     private static final String PARAM_START_TIME = "start_time";
80     private static final String PARAM_END_TIME = "end_time";
81     // COLUMN_CHANNEL_ID, COLUMN_END_TIME_UTC_MILLIS are added to detect duplicated programs.
82     // Duplicated programs are always consecutive by the sorting order.
83     private static final String SORT_BY_TIME =
84             Programs.COLUMN_START_TIME_UTC_MILLIS
85                     + ", "
86                     + Programs.COLUMN_CHANNEL_ID
87                     + ", "
88                     + Programs.COLUMN_END_TIME_UTC_MILLIS;
89 
90     private static final int MSG_UPDATE_CURRENT_PROGRAMS = 1000;
91     private static final int MSG_UPDATE_ONE_CURRENT_PROGRAM = 1001;
92     private static final int MSG_UPDATE_PREFETCH_PROGRAM = 1002;
93 
94     private final Context mContext;
95     private final Clock mClock;
96     private final ContentResolver mContentResolver;
97     private final Executor mDbExecutor;
98     private final BackendKnobsFlags mBackendKnobsFlags;
99     private final PerformanceMonitor mPerformanceMonitor;
100     private final ChannelDataManager mChannelDataManager;
101     private boolean mStarted;
102     // Updated only on the main thread.
103     private volatile boolean mCurrentProgramsLoadFinished;
104     private ProgramsUpdateTask mProgramsUpdateTask;
105     private final LongSparseArray<UpdateCurrentProgramForChannelTask> mProgramUpdateTaskMap =
106             new LongSparseArray<>();
107     private final Map<Long, Program> mChannelIdCurrentProgramMap = new ConcurrentHashMap<>();
108     private final MultiLongSparseArray<OnCurrentProgramUpdatedListener>
109             mChannelId2ProgramUpdatedListeners = new MultiLongSparseArray<>();
110     private final Handler mHandler;
111     private final Set<Callback> mCallbacks = new ArraySet<>();
112     private Map<Long, ArrayList<Program>> mChannelIdProgramCache = new ConcurrentHashMap<>();
113     private final Set<Long> mCompleteInfoChannelIds = new HashSet<>();
114     private final ContentObserver mProgramObserver;
115 
116     private boolean mPrefetchEnabled;
117     private long mProgramPrefetchUpdateWaitMs;
118     private long mLastPrefetchTaskRunMs;
119     private ProgramsPrefetchTask mProgramsPrefetchTask;
120 
121     // Any program that ends prior to this time will be removed from the cache
122     // when a channel's current program is updated.
123     // Note that there's no limit for end time.
124     private long mPrefetchTimeRangeStartMs;
125 
126     private boolean mPauseProgramUpdate = false;
127     private final LruCache<Long, Program> mZeroLengthProgramCache = new LruCache<>(10);
128 
129     @MainThread
ProgramDataManager(Context context)130     public ProgramDataManager(Context context) {
131         this(
132                 context,
133                 TvSingletons.getSingletons(context).getDbExecutor(),
134                 context.getContentResolver(),
135                 Clock.SYSTEM,
136                 Looper.myLooper(),
137                 TvSingletons.getSingletons(context).getBackendKnobs(),
138                 TvSingletons.getSingletons(context).getPerformanceMonitor(),
139                 TvSingletons.getSingletons(context).getChannelDataManager());
140     }
141 
142     @VisibleForTesting
ProgramDataManager( Context context, Executor executor, ContentResolver contentResolver, Clock time, Looper looper, BackendKnobsFlags backendKnobsFlags, PerformanceMonitor performanceMonitor, ChannelDataManager channelDataManager)143     ProgramDataManager(
144             Context context,
145             Executor executor,
146             ContentResolver contentResolver,
147             Clock time,
148             Looper looper,
149             BackendKnobsFlags backendKnobsFlags,
150             PerformanceMonitor performanceMonitor,
151             ChannelDataManager channelDataManager) {
152         mContext = context;
153         mDbExecutor = executor;
154         mClock = time;
155         mContentResolver = contentResolver;
156         mHandler = new MyHandler(looper);
157         mBackendKnobsFlags = backendKnobsFlags;
158         mPerformanceMonitor = performanceMonitor;
159         mChannelDataManager = channelDataManager;
160         mProgramObserver =
161                 new ContentObserver(mHandler) {
162                     @Override
163                     public void onChange(boolean selfChange) {
164                         if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) {
165                             mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS);
166                         }
167                         if (isProgramUpdatePaused()) {
168                             return;
169                         }
170                         if (mPrefetchEnabled) {
171                             // The delay time of an existing MSG_UPDATE_PREFETCH_PROGRAM could be
172                             // quite long
173                             // up to PROGRAM_GUIDE_SNAP_TIME_MS. So we need to remove the existing
174                             // message
175                             // and send MSG_UPDATE_PREFETCH_PROGRAM again.
176                             mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM);
177                             mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
178                         }
179                     }
180                 };
181         mProgramPrefetchUpdateWaitMs = PROGRAM_PREFETCH_UPDATE_WAIT_MS;
182     }
183 
184     @VisibleForTesting
getContentObserver()185     ContentObserver getContentObserver() {
186         return mProgramObserver;
187     }
188 
189     /**
190      * Set the program prefetch update wait which gives the delay to query all programs from DB to
191      * prevent from too frequent DB queries. Default value is {@link
192      * #PROGRAM_PREFETCH_UPDATE_WAIT_MS}
193      */
194     @VisibleForTesting
setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs)195     void setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs) {
196         mProgramPrefetchUpdateWaitMs = programPrefetchUpdateWaitMs;
197     }
198 
199     /** Starts the manager. */
start()200     public void start() {
201         if (mStarted) {
202             return;
203         }
204         mStarted = true;
205         // Should be called directly instead of posting MSG_UPDATE_CURRENT_PROGRAMS message
206         // to the handler. If not, another DB task can be executed before loading current programs.
207         handleUpdateCurrentPrograms();
208         if (mPrefetchEnabled) {
209             mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
210         }
211         mContentResolver.registerContentObserver(Programs.CONTENT_URI, true, mProgramObserver);
212     }
213 
214     /**
215      * Stops the manager. It clears manager states and runs pending DB operations. Added listeners
216      * aren't automatically removed by this method.
217      */
218     @VisibleForTesting
stop()219     public void stop() {
220         if (!mStarted) {
221             return;
222         }
223         mStarted = false;
224         mContentResolver.unregisterContentObserver(mProgramObserver);
225         mHandler.removeCallbacksAndMessages(null);
226 
227         clearTask(mProgramUpdateTaskMap);
228         cancelPrefetchTask();
229         if (mProgramsUpdateTask != null) {
230             mProgramsUpdateTask.cancel(true);
231             mProgramsUpdateTask = null;
232         }
233     }
234 
235     @AnyThread
isCurrentProgramsLoadFinished()236     public boolean isCurrentProgramsLoadFinished() {
237         return mCurrentProgramsLoadFinished;
238     }
239 
240     /** Returns the current program at the specified channel. */
241     @AnyThread
getCurrentProgram(long channelId)242     public Program getCurrentProgram(long channelId) {
243         return mChannelIdCurrentProgramMap.get(channelId);
244     }
245 
246     /** Returns all the current programs. */
247     @AnyThread
getCurrentPrograms()248     public List<Program> getCurrentPrograms() {
249         return new ArrayList<>(mChannelIdCurrentProgramMap.values());
250     }
251 
252     /** Reloads program data. */
reload()253     public void reload() {
254         if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) {
255             mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS);
256         }
257         if (mPrefetchEnabled && !mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
258             mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
259         }
260     }
261 
prefetchChannel(long channelId)262     public void prefetchChannel(long channelId) {
263         if (mCompleteInfoChannelIds.add(channelId)) {
264             long startTimeMs =
265                     Utils.floorTime(
266                             mClock.currentTimeMillis() - PROGRAM_GUIDE_SNAP_TIME_MS,
267                             PROGRAM_GUIDE_SNAP_TIME_MS);
268             long endTimeMs = startTimeMs + TimeUnit.HOURS.toMillis(getFetchDuration());
269             new SingleChannelPrefetchTask(channelId, startTimeMs, endTimeMs).executeOnDbThread();
270         }
271     }
272 
273     /** A Callback interface to receive notification on program data retrieval from DB. */
274     public interface Callback {
275         /**
276          * Called when a Program data is now available through getProgram() after the DB operation
277          * is done which wasn't before. This would be called only if fetched data is around the
278          * selected program.
279          */
onProgramUpdated()280         void onProgramUpdated();
281 
282         /**
283          * Called when we update complete program data of specific channel during scrolling. Data is
284          * loaded from DB on request basis.
285          *
286          * @param channelId
287          */
onSingleChannelUpdated(long channelId)288         void onSingleChannelUpdated(long channelId);
289     }
290 
291     /** Adds the {@link Callback}. */
addCallback(Callback callback)292     public void addCallback(Callback callback) {
293         mCallbacks.add(callback);
294     }
295 
296     /** Removes the {@link Callback}. */
removeCallback(Callback callback)297     public void removeCallback(Callback callback) {
298         mCallbacks.remove(callback);
299     }
300 
301     /** Enables or Disables program prefetch. */
setPrefetchEnabled(boolean enable)302     public void setPrefetchEnabled(boolean enable) {
303         if (mPrefetchEnabled == enable) {
304             return;
305         }
306         if (enable) {
307             mPrefetchEnabled = true;
308             mLastPrefetchTaskRunMs = 0;
309             if (mStarted) {
310                 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
311             }
312         } else {
313             mPrefetchEnabled = false;
314             cancelPrefetchTask();
315             mChannelIdProgramCache.clear();
316             mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM);
317         }
318     }
319 
320     /**
321      * Returns the programs for the given channel which ends after the given start time.
322      *
323      * <p>Prefetch should be enabled to call it.
324      *
325      * @return {@link List} with Programs. It may includes dummy program if the entry needs DB
326      *     operations to get.
327      */
getPrograms(long channelId, long startTime)328     public List<Program> getPrograms(long channelId, long startTime) {
329         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
330         ArrayList<Program> cachedPrograms = mChannelIdProgramCache.get(channelId);
331         if (cachedPrograms == null) {
332             return Collections.emptyList();
333         }
334         int startIndex = getProgramIndexAt(cachedPrograms, startTime);
335         return Collections.unmodifiableList(
336                 cachedPrograms.subList(startIndex, cachedPrograms.size()));
337     }
338 
339     /**
340      * Returns the index of program that is played at the specified time.
341      *
342      * <p>If there isn't, return the first program among programs that starts after the given time
343      * if returnNextProgram is {@code true}.
344      */
getProgramIndexAt(List<Program> programs, long time)345     private int getProgramIndexAt(List<Program> programs, long time) {
346         Program key = mZeroLengthProgramCache.get(time);
347         if (key == null) {
348             key = createDummyProgram(time, time);
349             mZeroLengthProgramCache.put(time, key);
350         }
351         int index = Collections.binarySearch(programs, key);
352         if (index < 0) {
353             index = -(index + 1); // change it to index to be added.
354             if (index > 0 && isProgramPlayedAt(programs.get(index - 1), time)) {
355                 // A program is played at that time.
356                 return index - 1;
357             }
358             return index;
359         }
360         return index;
361     }
362 
isProgramPlayedAt(Program program, long time)363     private boolean isProgramPlayedAt(Program program, long time) {
364         return program.getStartTimeUtcMillis() <= time && time <= program.getEndTimeUtcMillis();
365     }
366 
367     /**
368      * Adds the listener to be notified if current program is updated for a channel.
369      *
370      * @param channelId A channel ID to get notified. If it's {@link Channel#INVALID_ID}, the
371      *     listener would be called whenever a current program is updated.
372      */
addOnCurrentProgramUpdatedListener( long channelId, OnCurrentProgramUpdatedListener listener)373     public void addOnCurrentProgramUpdatedListener(
374             long channelId, OnCurrentProgramUpdatedListener listener) {
375         mChannelId2ProgramUpdatedListeners.put(channelId, listener);
376     }
377 
378     /**
379      * Removes the listener previously added by {@link #addOnCurrentProgramUpdatedListener(long,
380      * OnCurrentProgramUpdatedListener)}.
381      */
removeOnCurrentProgramUpdatedListener( long channelId, OnCurrentProgramUpdatedListener listener)382     public void removeOnCurrentProgramUpdatedListener(
383             long channelId, OnCurrentProgramUpdatedListener listener) {
384         mChannelId2ProgramUpdatedListeners.remove(channelId, listener);
385     }
386 
notifyCurrentProgramUpdate(long channelId, Program program)387     private void notifyCurrentProgramUpdate(long channelId, Program program) {
388         for (OnCurrentProgramUpdatedListener listener :
389                 mChannelId2ProgramUpdatedListeners.get(channelId)) {
390             listener.onCurrentProgramUpdated(channelId, program);
391         }
392         for (OnCurrentProgramUpdatedListener listener :
393                 mChannelId2ProgramUpdatedListeners.get(Channel.INVALID_ID)) {
394             listener.onCurrentProgramUpdated(channelId, program);
395         }
396     }
397 
updateCurrentProgram(long channelId, Program program)398     private void updateCurrentProgram(long channelId, Program program) {
399         Program previousProgram =
400                 program == null
401                         ? mChannelIdCurrentProgramMap.remove(channelId)
402                         : mChannelIdCurrentProgramMap.put(channelId, program);
403         if (!Objects.equals(program, previousProgram)) {
404             if (mPrefetchEnabled) {
405                 removePreviousProgramsAndUpdateCurrentProgramInCache(channelId, program);
406             }
407             notifyCurrentProgramUpdate(channelId, program);
408         }
409 
410         long delayedTime;
411         if (program == null) {
412             delayedTime =
413                     PERIODIC_PROGRAM_UPDATE_MIN_MS
414                             + (long)
415                                     (Math.random()
416                                             * (PERIODIC_PROGRAM_UPDATE_MAX_MS
417                                                     - PERIODIC_PROGRAM_UPDATE_MIN_MS));
418         } else {
419             delayedTime = program.getEndTimeUtcMillis() - mClock.currentTimeMillis();
420         }
421         mHandler.sendMessageDelayed(
422                 mHandler.obtainMessage(MSG_UPDATE_ONE_CURRENT_PROGRAM, channelId), delayedTime);
423     }
424 
removePreviousProgramsAndUpdateCurrentProgramInCache( long channelId, Program currentProgram)425     private void removePreviousProgramsAndUpdateCurrentProgramInCache(
426             long channelId, Program currentProgram) {
427         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
428         if (!Program.isProgramValid(currentProgram)) {
429             return;
430         }
431         ArrayList<Program> cachedPrograms = mChannelIdProgramCache.remove(channelId);
432         if (cachedPrograms == null) {
433             return;
434         }
435         ListIterator<Program> i = cachedPrograms.listIterator();
436         while (i.hasNext()) {
437             Program cachedProgram = i.next();
438             if (cachedProgram.getEndTimeUtcMillis() <= mPrefetchTimeRangeStartMs) {
439                 // Remove previous programs which will not be shown in program guide.
440                 i.remove();
441                 continue;
442             }
443 
444             if (cachedProgram.getEndTimeUtcMillis() <= currentProgram.getStartTimeUtcMillis()) {
445                 // Keep the programs that ends earlier than current program
446                 // but later than mPrefetchTimeRangeStartMs.
447                 continue;
448             }
449 
450             // Update dummy program around current program if any.
451             if (cachedProgram.getStartTimeUtcMillis() < currentProgram.getStartTimeUtcMillis()) {
452                 // The dummy program starts earlier than the current program. Adjust its end time.
453                 i.set(
454                         createDummyProgram(
455                                 cachedProgram.getStartTimeUtcMillis(),
456                                 currentProgram.getStartTimeUtcMillis()));
457                 i.add(currentProgram);
458             } else {
459                 i.set(currentProgram);
460             }
461             if (currentProgram.getEndTimeUtcMillis() < cachedProgram.getEndTimeUtcMillis()) {
462                 // The dummy program ends later than the current program. Adjust its start time.
463                 i.add(
464                         createDummyProgram(
465                                 currentProgram.getEndTimeUtcMillis(),
466                                 cachedProgram.getEndTimeUtcMillis()));
467             }
468             break;
469         }
470         if (cachedPrograms.isEmpty()) {
471             // If all the cached programs finish before mPrefetchTimeRangeStartMs, the
472             // currentProgram would not have a chance to be inserted to the cache.
473             cachedPrograms.add(currentProgram);
474         }
475         mChannelIdProgramCache.put(channelId, cachedPrograms);
476     }
477 
handleUpdateCurrentPrograms()478     private void handleUpdateCurrentPrograms() {
479         if (mProgramsUpdateTask != null) {
480             mHandler.sendEmptyMessageDelayed(
481                     MSG_UPDATE_CURRENT_PROGRAMS, CURRENT_PROGRAM_UPDATE_WAIT_MS);
482             return;
483         }
484         clearTask(mProgramUpdateTaskMap);
485         mHandler.removeMessages(MSG_UPDATE_ONE_CURRENT_PROGRAM);
486         mProgramsUpdateTask = new ProgramsUpdateTask(mClock.currentTimeMillis());
487         mProgramsUpdateTask.executeOnDbThread();
488     }
489 
490     private class ProgramsPrefetchTask
491             extends AsyncDbTask<Void, Void, Map<Long, ArrayList<Program>>> {
492         private final long mStartTimeMs;
493         private final long mEndTimeMs;
494 
495         private boolean mSuccess;
496         private TimerEvent mFromEmptyCacheTimeEvent;
497 
ProgramsPrefetchTask()498         public ProgramsPrefetchTask() {
499             super(mDbExecutor);
500             long time = mClock.currentTimeMillis();
501             mStartTimeMs =
502                     Utils.floorTime(time - PROGRAM_GUIDE_SNAP_TIME_MS, PROGRAM_GUIDE_SNAP_TIME_MS);
503             mEndTimeMs = mStartTimeMs + TimeUnit.HOURS.toMillis(getFetchDuration());
504             mSuccess = false;
505         }
506 
507         @Override
onPreExecute()508         protected void onPreExecute() {
509             if (mChannelIdCurrentProgramMap.isEmpty()) {
510                 // No current program guide is shown.
511                 // Measure the delay before users can see program guides.
512                 mFromEmptyCacheTimeEvent = mPerformanceMonitor.startTimer();
513             }
514         }
515 
516         @Override
doInBackground(Void... params)517         protected Map<Long, ArrayList<Program>> doInBackground(Void... params) {
518             TimerEvent asyncTimeEvent = mPerformanceMonitor.startTimer();
519             Map<Long, ArrayList<Program>> programMap = new HashMap<>();
520             if (DEBUG) {
521                 Log.d(
522                         TAG,
523                         "Starts programs prefetch. "
524                                 + Utils.toTimeString(mStartTimeMs)
525                                 + "-"
526                                 + Utils.toTimeString(mEndTimeMs));
527             }
528             Uri uri =
529                     Programs.CONTENT_URI
530                             .buildUpon()
531                             .appendQueryParameter(PARAM_START_TIME, String.valueOf(mStartTimeMs))
532                             .appendQueryParameter(PARAM_END_TIME, String.valueOf(mEndTimeMs))
533                             .build();
534             final int RETRY_COUNT = 3;
535             Program lastReadProgram = null;
536             for (int retryCount = RETRY_COUNT; retryCount > 0; retryCount--) {
537                 if (isProgramUpdatePaused()) {
538                     return null;
539                 }
540                 programMap.clear();
541 
542                 String[] projection =
543                         mBackendKnobsFlags.enablePartialProgramFetch()
544                                 ? Program.PARTIAL_PROJECTION
545                                 : Program.PROJECTION;
546                 if (TvProviderUtils.checkSeriesIdColumn(mContext, Programs.CONTENT_URI)) {
547                     if (Utils.isProgramsUri(uri)) {
548                         projection =
549                                 TvProviderUtils.addExtraColumnsToProjection(
550                                         projection, TvProviderUtils.EXTRA_PROGRAM_COLUMN_SERIES_ID);
551                     }
552                 }
553                 try (Cursor c = mContentResolver.query(uri, projection, null, null, SORT_BY_TIME)) {
554                     if (c == null) {
555                         continue;
556                     }
557                     while (c.moveToNext()) {
558                         int duplicateCount = 0;
559                         if (isCancelled()) {
560                             if (DEBUG) {
561                                 Log.d(TAG, "ProgramsPrefetchTask canceled.");
562                             }
563                             return null;
564                         }
565                         Program program =
566                                 mBackendKnobsFlags.enablePartialProgramFetch()
567                                         ? Program.fromCursorPartialProjection(c)
568                                         : Program.fromCursor(c);
569                         if (Program.isDuplicate(program, lastReadProgram)) {
570                             duplicateCount++;
571                             continue;
572                         } else {
573                             lastReadProgram = program;
574                         }
575                         ArrayList<Program> programs = programMap.get(program.getChannelId());
576                         if (programs == null) {
577                             programs = new ArrayList<>();
578                             if (mBackendKnobsFlags.enablePartialProgramFetch()) {
579                                 // To skip already loaded complete data.
580                                 Program currentProgramInfo =
581                                         mChannelIdCurrentProgramMap.get(program.getChannelId());
582                                 if (currentProgramInfo != null
583                                         && Program.isDuplicate(program, currentProgramInfo)) {
584                                     program = currentProgramInfo;
585                                 }
586                             }
587                             programMap.put(program.getChannelId(), programs);
588                         }
589                         programs.add(program);
590                         if (duplicateCount > 0) {
591                             Log.w(TAG, "Found " + duplicateCount + " duplicate programs");
592                         }
593                     }
594                     mSuccess = true;
595                     break;
596                 } catch (IllegalStateException e) {
597                     if (DEBUG) {
598                         Log.d(TAG, "Database is changed while querying. Will retry.");
599                     }
600                 } catch (SecurityException e) {
601                     Log.w(TAG, "Security exception during program data query", e);
602                 } catch (Exception e) {
603                     Log.w(TAG, "Error during program data query", e);
604                 }
605             }
606             if (DEBUG) {
607                 Log.d(TAG, "Ends programs prefetch for " + programMap.size() + " channels");
608             }
609             mPerformanceMonitor.stopTimer(
610                     asyncTimeEvent,
611                     EventNames.PROGRAM_DATA_MANAGER_PROGRAMS_PREFETCH_TASK_DO_IN_BACKGROUND);
612             return programMap;
613         }
614 
615         @Override
onPostExecute(Map<Long, ArrayList<Program>> programs)616         protected void onPostExecute(Map<Long, ArrayList<Program>> programs) {
617             mProgramsPrefetchTask = null;
618             if (isProgramUpdatePaused()) {
619                 // ProgramsPrefetchTask will run again once setPauseProgramUpdate(false) is called.
620                 return;
621             }
622             long nextMessageDelayedTime;
623             if (mSuccess) {
624                 long currentTime = mClock.currentTimeMillis();
625                 mLastPrefetchTaskRunMs = currentTime;
626                 nextMessageDelayedTime =
627                         Utils.floorTime(
628                                         mLastPrefetchTaskRunMs + PROGRAM_GUIDE_SNAP_TIME_MS,
629                                         PROGRAM_GUIDE_SNAP_TIME_MS)
630                                 - currentTime;
631                 // Issue second pre-fetch immediately after the first partial update
632                 if (mChannelIdProgramCache.isEmpty()) {
633                     nextMessageDelayedTime = 0;
634                 }
635                 mChannelIdProgramCache = programs;
636                 if (mBackendKnobsFlags.enablePartialProgramFetch()) {
637                     // Since cache has partial data we need to reset the map of complete data.
638                     mCompleteInfoChannelIds.clear();
639                 }
640                 notifyProgramUpdated();
641                 if (mFromEmptyCacheTimeEvent != null) {
642                     mPerformanceMonitor.stopTimer(
643                             mFromEmptyCacheTimeEvent,
644                             EventNames.PROGRAM_GUIDE_SHOW_FROM_EMPTY_CACHE);
645                     mFromEmptyCacheTimeEvent = null;
646                 }
647             } else {
648                 nextMessageDelayedTime = PERIODIC_PROGRAM_UPDATE_MIN_MS;
649             }
650             if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
651                 mHandler.sendEmptyMessageDelayed(
652                         MSG_UPDATE_PREFETCH_PROGRAM, nextMessageDelayedTime);
653             }
654         }
655     }
656 
getFetchDuration()657     private long getFetchDuration() {
658         if (mChannelIdProgramCache.isEmpty()) {
659             return Math.max(1L, mBackendKnobsFlags.programGuideInitialFetchHours());
660         } else {
661             long durationHours;
662             int channelCount = mChannelDataManager.getChannelCount();
663             long knobsMaxHours = mBackendKnobsFlags.programGuideMaxHours();
664             long targetChannelCount = mBackendKnobsFlags.epgTargetChannelCount();
665             if (channelCount <= targetChannelCount) {
666                 durationHours = Math.max(48L, knobsMaxHours);
667             } else {
668                 // 2 days <= duration <= 14 days (336 hours)
669                 durationHours = knobsMaxHours * targetChannelCount / channelCount;
670                 if (durationHours < 48L) {
671                     durationHours = 48L;
672                 } else if (durationHours > 336L) {
673                     durationHours = 336L;
674                 }
675             }
676             return durationHours;
677         }
678     }
679 
680     private class SingleChannelPrefetchTask extends AsyncDbTask.AsyncQueryTask<ArrayList<Program>> {
681         long mChannelId;
682 
SingleChannelPrefetchTask(long channelId, long startTimeMs, long endTimeMs)683         public SingleChannelPrefetchTask(long channelId, long startTimeMs, long endTimeMs) {
684             super(
685                     mDbExecutor,
686                     mContext,
687                     TvContract.buildProgramsUriForChannel(channelId, startTimeMs, endTimeMs),
688                     Program.PROJECTION,
689                     null,
690                     null,
691                     SORT_BY_TIME);
692             mChannelId = channelId;
693         }
694 
695         @Override
onQuery(Cursor c)696         protected ArrayList<Program> onQuery(Cursor c) {
697             ArrayList<Program> programMap = new ArrayList<>();
698             while (c.moveToNext()) {
699                 Program program = Program.fromCursor(c);
700                 programMap.add(program);
701             }
702             return programMap;
703         }
704 
705         @Override
onPostExecute(ArrayList<Program> programs)706         protected void onPostExecute(ArrayList<Program> programs) {
707             mChannelIdProgramCache.put(mChannelId, programs);
708             notifySingleChannelUpdated(mChannelId);
709         }
710     }
711 
notifyProgramUpdated()712     private void notifyProgramUpdated() {
713         for (Callback callback : mCallbacks) {
714             callback.onProgramUpdated();
715         }
716     }
717 
notifySingleChannelUpdated(long channelId)718     private void notifySingleChannelUpdated(long channelId) {
719         for (Callback callback : mCallbacks) {
720             callback.onSingleChannelUpdated(channelId);
721         }
722     }
723 
724     private class ProgramsUpdateTask extends AsyncDbTask.AsyncQueryTask<List<Program>> {
ProgramsUpdateTask(long time)725         public ProgramsUpdateTask(long time) {
726             super(
727                     mDbExecutor,
728                     mContext,
729                     Programs.CONTENT_URI
730                             .buildUpon()
731                             .appendQueryParameter(PARAM_START_TIME, String.valueOf(time))
732                             .appendQueryParameter(PARAM_END_TIME, String.valueOf(time))
733                             .build(),
734                     Program.PROJECTION,
735                     null,
736                     null,
737                     SORT_BY_TIME);
738         }
739 
740         @Override
onQuery(Cursor c)741         public List<Program> onQuery(Cursor c) {
742             final List<Program> programs = new ArrayList<>();
743             if (c != null) {
744                 int duplicateCount = 0;
745                 Program lastReadProgram = null;
746                 while (c.moveToNext()) {
747                     if (isCancelled()) {
748                         return programs;
749                     }
750                     Program program = Program.fromCursor(c);
751                     if (Program.isDuplicate(program, lastReadProgram)) {
752                         duplicateCount++;
753                         continue;
754                     } else {
755                         lastReadProgram = program;
756                     }
757                     programs.add(program);
758                 }
759                 if (duplicateCount > 0) {
760                     Log.w(TAG, "Found " + duplicateCount + " duplicate programs");
761                 }
762             }
763             return programs;
764         }
765 
766         @Override
onPostExecute(List<Program> programs)767         protected void onPostExecute(List<Program> programs) {
768             if (DEBUG) Log.d(TAG, "ProgramsUpdateTask done");
769             mProgramsUpdateTask = null;
770             if (programs != null) {
771                 Set<Long> removedChannelIds = new HashSet<>(mChannelIdCurrentProgramMap.keySet());
772                 for (Program program : programs) {
773                     long channelId = program.getChannelId();
774                     updateCurrentProgram(channelId, program);
775                     removedChannelIds.remove(channelId);
776                 }
777                 for (Long channelId : removedChannelIds) {
778                     if (mPrefetchEnabled) {
779                         mChannelIdProgramCache.remove(channelId);
780                         if (mBackendKnobsFlags.enablePartialProgramFetch()) {
781                             mCompleteInfoChannelIds.remove(channelId);
782                         }
783                     }
784                     mChannelIdCurrentProgramMap.remove(channelId);
785                     notifyCurrentProgramUpdate(channelId, null);
786                 }
787             }
788             mCurrentProgramsLoadFinished = true;
789         }
790     }
791 
792     private class UpdateCurrentProgramForChannelTask extends AsyncDbTask.AsyncQueryTask<Program> {
793         private final long mChannelId;
794 
UpdateCurrentProgramForChannelTask(long channelId, long time)795         private UpdateCurrentProgramForChannelTask(long channelId, long time) {
796             super(
797                     mDbExecutor,
798                     mContext,
799                     TvContract.buildProgramsUriForChannel(channelId, time, time),
800                     Program.PROJECTION,
801                     null,
802                     null,
803                     SORT_BY_TIME);
804             mChannelId = channelId;
805         }
806 
807         @Override
onQuery(Cursor c)808         public Program onQuery(Cursor c) {
809             Program program = null;
810             if (c != null && c.moveToNext()) {
811                 program = Program.fromCursor(c);
812             }
813             return program;
814         }
815 
816         @Override
onPostExecute(Program program)817         protected void onPostExecute(Program program) {
818             mProgramUpdateTaskMap.remove(mChannelId);
819             updateCurrentProgram(mChannelId, program);
820         }
821     }
822 
823     private class MyHandler extends Handler {
MyHandler(Looper looper)824         public MyHandler(Looper looper) {
825             super(looper);
826         }
827 
828         @Override
handleMessage(Message msg)829         public void handleMessage(Message msg) {
830             switch (msg.what) {
831                 case MSG_UPDATE_CURRENT_PROGRAMS:
832                     handleUpdateCurrentPrograms();
833                     break;
834                 case MSG_UPDATE_ONE_CURRENT_PROGRAM:
835                     {
836                         long channelId = (Long) msg.obj;
837                         UpdateCurrentProgramForChannelTask oldTask =
838                                 mProgramUpdateTaskMap.get(channelId);
839                         if (oldTask != null) {
840                             oldTask.cancel(true);
841                         }
842                         UpdateCurrentProgramForChannelTask task =
843                                 new UpdateCurrentProgramForChannelTask(
844                                         channelId, mClock.currentTimeMillis());
845                         mProgramUpdateTaskMap.put(channelId, task);
846                         task.executeOnDbThread();
847                         break;
848                     }
849                 case MSG_UPDATE_PREFETCH_PROGRAM:
850                     {
851                         if (isProgramUpdatePaused()) {
852                             return;
853                         }
854                         if (mProgramsPrefetchTask != null) {
855                             mHandler.sendEmptyMessageDelayed(
856                                     msg.what, mProgramPrefetchUpdateWaitMs);
857                             return;
858                         }
859                         long delayMillis =
860                                 mLastPrefetchTaskRunMs
861                                         + mProgramPrefetchUpdateWaitMs
862                                         - mClock.currentTimeMillis();
863                         if (delayMillis > 0) {
864                             mHandler.sendEmptyMessageDelayed(
865                                     MSG_UPDATE_PREFETCH_PROGRAM, delayMillis);
866                         } else {
867                             mProgramsPrefetchTask = new ProgramsPrefetchTask();
868                             mProgramsPrefetchTask.executeOnDbThread();
869                         }
870                         break;
871                     }
872                 default:
873                     // Do nothing
874             }
875         }
876     }
877 
878     /**
879      * Pause program update. Updating program data will result in UI refresh, but UI is fragile to
880      * handle it so we'd better disable it for a while.
881      *
882      * <p>Prefetch should be enabled to call it.
883      */
setPauseProgramUpdate(boolean pauseProgramUpdate)884     public void setPauseProgramUpdate(boolean pauseProgramUpdate) {
885         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
886         if (mPauseProgramUpdate && !pauseProgramUpdate) {
887             if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
888                 // MSG_UPDATE_PRFETCH_PROGRAM can be empty
889                 // if prefetch task is launched while program update is paused.
890                 // Update immediately in that case.
891                 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
892             }
893         }
894         mPauseProgramUpdate = pauseProgramUpdate;
895     }
896 
isProgramUpdatePaused()897     private boolean isProgramUpdatePaused() {
898         // Although pause is requested, we need to keep updating if cache is empty.
899         return mPauseProgramUpdate && !mChannelIdProgramCache.isEmpty();
900     }
901 
902     /**
903      * Sets program data prefetch time range. Any program data that ends before the start time will
904      * be removed from the cache later. Note that there's no limit for end time.
905      *
906      * <p>Prefetch should be enabled to call it.
907      */
setPrefetchTimeRange(long startTimeMs)908     public void setPrefetchTimeRange(long startTimeMs) {
909         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
910         if (mPrefetchTimeRangeStartMs > startTimeMs) {
911             // Fetch the programs immediately to re-create the cache.
912             if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
913                 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
914             }
915         }
916         mPrefetchTimeRangeStartMs = startTimeMs;
917     }
918 
clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks)919     private void clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks) {
920         for (int i = 0; i < tasks.size(); i++) {
921             tasks.valueAt(i).cancel(true);
922         }
923         tasks.clear();
924     }
925 
cancelPrefetchTask()926     private void cancelPrefetchTask() {
927         if (mProgramsPrefetchTask != null) {
928             mProgramsPrefetchTask.cancel(true);
929             mProgramsPrefetchTask = null;
930         }
931     }
932 
933     // Create dummy program which indicates data isn't loaded yet so DB query is required.
createDummyProgram(long startTimeMs, long endTimeMs)934     private Program createDummyProgram(long startTimeMs, long endTimeMs) {
935         return new Program.Builder()
936                 .setChannelId(Channel.INVALID_ID)
937                 .setStartTimeUtcMillis(startTimeMs)
938                 .setEndTimeUtcMillis(endTimeMs)
939                 .build();
940     }
941 
942     @Override
performTrimMemory(int level)943     public void performTrimMemory(int level) {
944         mChannelId2ProgramUpdatedListeners.clearEmptyCache();
945     }
946 }
947