1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.data; 18 19 import android.content.ContentResolver; 20 import android.content.Context; 21 import android.database.ContentObserver; 22 import android.database.Cursor; 23 import android.media.tv.TvContract; 24 import android.media.tv.TvContract.Programs; 25 import android.net.Uri; 26 import android.os.Handler; 27 import android.os.Looper; 28 import android.os.Message; 29 import android.support.annotation.AnyThread; 30 import android.support.annotation.MainThread; 31 import android.support.annotation.VisibleForTesting; 32 import android.util.ArraySet; 33 import android.util.Log; 34 import android.util.LongSparseArray; 35 import android.util.LruCache; 36 import com.android.tv.TvSingletons; 37 import com.android.tv.common.SoftPreconditions; 38 import com.android.tv.common.memory.MemoryManageable; 39 import com.android.tv.common.util.Clock; 40 import com.android.tv.data.api.Channel; 41 import com.android.tv.perf.EventNames; 42 import com.android.tv.perf.PerformanceMonitor; 43 import com.android.tv.perf.TimerEvent; 44 import com.android.tv.util.AsyncDbTask; 45 import com.android.tv.util.MultiLongSparseArray; 46 import com.android.tv.util.TvProviderUtils; 47 import com.android.tv.util.Utils; 48 import com.android.tv.common.flags.BackendKnobsFlags; 49 import java.util.ArrayList; 50 import java.util.Collections; 51 import java.util.HashMap; 52 import java.util.HashSet; 53 import java.util.List; 54 import java.util.ListIterator; 55 import java.util.Map; 56 import java.util.Objects; 57 import java.util.Set; 58 import java.util.concurrent.ConcurrentHashMap; 59 import java.util.concurrent.Executor; 60 import java.util.concurrent.TimeUnit; 61 62 @MainThread 63 public class ProgramDataManager implements MemoryManageable { 64 private static final String TAG = "ProgramDataManager"; 65 private static final boolean DEBUG = false; 66 67 // To prevent from too many program update operations at the same time, we give random interval 68 // between PERIODIC_PROGRAM_UPDATE_MIN_MS and PERIODIC_PROGRAM_UPDATE_MAX_MS. 69 @VisibleForTesting 70 static final long PERIODIC_PROGRAM_UPDATE_MIN_MS = TimeUnit.MINUTES.toMillis(5); 71 72 private static final long PERIODIC_PROGRAM_UPDATE_MAX_MS = TimeUnit.MINUTES.toMillis(10); 73 private static final long PROGRAM_PREFETCH_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5); 74 // TODO: need to optimize consecutive DB updates. 75 private static final long CURRENT_PROGRAM_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5); 76 @VisibleForTesting static final long PROGRAM_GUIDE_SNAP_TIME_MS = TimeUnit.MINUTES.toMillis(30); 77 78 // TODO: Use TvContract constants, once they become public. 79 private static final String PARAM_START_TIME = "start_time"; 80 private static final String PARAM_END_TIME = "end_time"; 81 // COLUMN_CHANNEL_ID, COLUMN_END_TIME_UTC_MILLIS are added to detect duplicated programs. 82 // Duplicated programs are always consecutive by the sorting order. 83 private static final String SORT_BY_TIME = 84 Programs.COLUMN_START_TIME_UTC_MILLIS 85 + ", " 86 + Programs.COLUMN_CHANNEL_ID 87 + ", " 88 + Programs.COLUMN_END_TIME_UTC_MILLIS; 89 90 private static final int MSG_UPDATE_CURRENT_PROGRAMS = 1000; 91 private static final int MSG_UPDATE_ONE_CURRENT_PROGRAM = 1001; 92 private static final int MSG_UPDATE_PREFETCH_PROGRAM = 1002; 93 94 private final Context mContext; 95 private final Clock mClock; 96 private final ContentResolver mContentResolver; 97 private final Executor mDbExecutor; 98 private final BackendKnobsFlags mBackendKnobsFlags; 99 private final PerformanceMonitor mPerformanceMonitor; 100 private final ChannelDataManager mChannelDataManager; 101 private boolean mStarted; 102 // Updated only on the main thread. 103 private volatile boolean mCurrentProgramsLoadFinished; 104 private ProgramsUpdateTask mProgramsUpdateTask; 105 private final LongSparseArray<UpdateCurrentProgramForChannelTask> mProgramUpdateTaskMap = 106 new LongSparseArray<>(); 107 private final Map<Long, Program> mChannelIdCurrentProgramMap = new ConcurrentHashMap<>(); 108 private final MultiLongSparseArray<OnCurrentProgramUpdatedListener> 109 mChannelId2ProgramUpdatedListeners = new MultiLongSparseArray<>(); 110 private final Handler mHandler; 111 private final Set<Callback> mCallbacks = new ArraySet<>(); 112 private Map<Long, ArrayList<Program>> mChannelIdProgramCache = new ConcurrentHashMap<>(); 113 private final Set<Long> mCompleteInfoChannelIds = new HashSet<>(); 114 private final ContentObserver mProgramObserver; 115 116 private boolean mPrefetchEnabled; 117 private long mProgramPrefetchUpdateWaitMs; 118 private long mLastPrefetchTaskRunMs; 119 private ProgramsPrefetchTask mProgramsPrefetchTask; 120 121 // Any program that ends prior to this time will be removed from the cache 122 // when a channel's current program is updated. 123 // Note that there's no limit for end time. 124 private long mPrefetchTimeRangeStartMs; 125 126 private boolean mPauseProgramUpdate = false; 127 private final LruCache<Long, Program> mZeroLengthProgramCache = new LruCache<>(10); 128 129 @MainThread ProgramDataManager(Context context)130 public ProgramDataManager(Context context) { 131 this( 132 context, 133 TvSingletons.getSingletons(context).getDbExecutor(), 134 context.getContentResolver(), 135 Clock.SYSTEM, 136 Looper.myLooper(), 137 TvSingletons.getSingletons(context).getBackendKnobs(), 138 TvSingletons.getSingletons(context).getPerformanceMonitor(), 139 TvSingletons.getSingletons(context).getChannelDataManager()); 140 } 141 142 @VisibleForTesting ProgramDataManager( Context context, Executor executor, ContentResolver contentResolver, Clock time, Looper looper, BackendKnobsFlags backendKnobsFlags, PerformanceMonitor performanceMonitor, ChannelDataManager channelDataManager)143 ProgramDataManager( 144 Context context, 145 Executor executor, 146 ContentResolver contentResolver, 147 Clock time, 148 Looper looper, 149 BackendKnobsFlags backendKnobsFlags, 150 PerformanceMonitor performanceMonitor, 151 ChannelDataManager channelDataManager) { 152 mContext = context; 153 mDbExecutor = executor; 154 mClock = time; 155 mContentResolver = contentResolver; 156 mHandler = new MyHandler(looper); 157 mBackendKnobsFlags = backendKnobsFlags; 158 mPerformanceMonitor = performanceMonitor; 159 mChannelDataManager = channelDataManager; 160 mProgramObserver = 161 new ContentObserver(mHandler) { 162 @Override 163 public void onChange(boolean selfChange) { 164 if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) { 165 mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS); 166 } 167 if (isProgramUpdatePaused()) { 168 return; 169 } 170 if (mPrefetchEnabled) { 171 // The delay time of an existing MSG_UPDATE_PREFETCH_PROGRAM could be 172 // quite long 173 // up to PROGRAM_GUIDE_SNAP_TIME_MS. So we need to remove the existing 174 // message 175 // and send MSG_UPDATE_PREFETCH_PROGRAM again. 176 mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM); 177 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM); 178 } 179 } 180 }; 181 mProgramPrefetchUpdateWaitMs = PROGRAM_PREFETCH_UPDATE_WAIT_MS; 182 } 183 184 @VisibleForTesting getContentObserver()185 ContentObserver getContentObserver() { 186 return mProgramObserver; 187 } 188 189 /** 190 * Set the program prefetch update wait which gives the delay to query all programs from DB to 191 * prevent from too frequent DB queries. Default value is {@link 192 * #PROGRAM_PREFETCH_UPDATE_WAIT_MS} 193 */ 194 @VisibleForTesting setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs)195 void setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs) { 196 mProgramPrefetchUpdateWaitMs = programPrefetchUpdateWaitMs; 197 } 198 199 /** Starts the manager. */ start()200 public void start() { 201 if (mStarted) { 202 return; 203 } 204 mStarted = true; 205 // Should be called directly instead of posting MSG_UPDATE_CURRENT_PROGRAMS message 206 // to the handler. If not, another DB task can be executed before loading current programs. 207 handleUpdateCurrentPrograms(); 208 if (mPrefetchEnabled) { 209 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM); 210 } 211 mContentResolver.registerContentObserver(Programs.CONTENT_URI, true, mProgramObserver); 212 } 213 214 /** 215 * Stops the manager. It clears manager states and runs pending DB operations. Added listeners 216 * aren't automatically removed by this method. 217 */ 218 @VisibleForTesting stop()219 public void stop() { 220 if (!mStarted) { 221 return; 222 } 223 mStarted = false; 224 mContentResolver.unregisterContentObserver(mProgramObserver); 225 mHandler.removeCallbacksAndMessages(null); 226 227 clearTask(mProgramUpdateTaskMap); 228 cancelPrefetchTask(); 229 if (mProgramsUpdateTask != null) { 230 mProgramsUpdateTask.cancel(true); 231 mProgramsUpdateTask = null; 232 } 233 } 234 235 @AnyThread isCurrentProgramsLoadFinished()236 public boolean isCurrentProgramsLoadFinished() { 237 return mCurrentProgramsLoadFinished; 238 } 239 240 /** Returns the current program at the specified channel. */ 241 @AnyThread getCurrentProgram(long channelId)242 public Program getCurrentProgram(long channelId) { 243 return mChannelIdCurrentProgramMap.get(channelId); 244 } 245 246 /** Returns all the current programs. */ 247 @AnyThread getCurrentPrograms()248 public List<Program> getCurrentPrograms() { 249 return new ArrayList<>(mChannelIdCurrentProgramMap.values()); 250 } 251 252 /** Reloads program data. */ reload()253 public void reload() { 254 if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) { 255 mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS); 256 } 257 if (mPrefetchEnabled && !mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) { 258 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM); 259 } 260 } 261 prefetchChannel(long channelId)262 public void prefetchChannel(long channelId) { 263 if (mCompleteInfoChannelIds.add(channelId)) { 264 long startTimeMs = 265 Utils.floorTime( 266 mClock.currentTimeMillis() - PROGRAM_GUIDE_SNAP_TIME_MS, 267 PROGRAM_GUIDE_SNAP_TIME_MS); 268 long endTimeMs = startTimeMs + TimeUnit.HOURS.toMillis(getFetchDuration()); 269 new SingleChannelPrefetchTask(channelId, startTimeMs, endTimeMs).executeOnDbThread(); 270 } 271 } 272 273 /** A Callback interface to receive notification on program data retrieval from DB. */ 274 public interface Callback { 275 /** 276 * Called when a Program data is now available through getProgram() after the DB operation 277 * is done which wasn't before. This would be called only if fetched data is around the 278 * selected program. 279 */ onProgramUpdated()280 void onProgramUpdated(); 281 282 /** 283 * Called when we update complete program data of specific channel during scrolling. Data is 284 * loaded from DB on request basis. 285 * 286 * @param channelId 287 */ onSingleChannelUpdated(long channelId)288 void onSingleChannelUpdated(long channelId); 289 } 290 291 /** Adds the {@link Callback}. */ addCallback(Callback callback)292 public void addCallback(Callback callback) { 293 mCallbacks.add(callback); 294 } 295 296 /** Removes the {@link Callback}. */ removeCallback(Callback callback)297 public void removeCallback(Callback callback) { 298 mCallbacks.remove(callback); 299 } 300 301 /** Enables or Disables program prefetch. */ setPrefetchEnabled(boolean enable)302 public void setPrefetchEnabled(boolean enable) { 303 if (mPrefetchEnabled == enable) { 304 return; 305 } 306 if (enable) { 307 mPrefetchEnabled = true; 308 mLastPrefetchTaskRunMs = 0; 309 if (mStarted) { 310 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM); 311 } 312 } else { 313 mPrefetchEnabled = false; 314 cancelPrefetchTask(); 315 mChannelIdProgramCache.clear(); 316 mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM); 317 } 318 } 319 320 /** 321 * Returns the programs for the given channel which ends after the given start time. 322 * 323 * <p>Prefetch should be enabled to call it. 324 * 325 * @return {@link List} with Programs. It may includes dummy program if the entry needs DB 326 * operations to get. 327 */ getPrograms(long channelId, long startTime)328 public List<Program> getPrograms(long channelId, long startTime) { 329 SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled."); 330 ArrayList<Program> cachedPrograms = mChannelIdProgramCache.get(channelId); 331 if (cachedPrograms == null) { 332 return Collections.emptyList(); 333 } 334 int startIndex = getProgramIndexAt(cachedPrograms, startTime); 335 return Collections.unmodifiableList( 336 cachedPrograms.subList(startIndex, cachedPrograms.size())); 337 } 338 339 /** 340 * Returns the index of program that is played at the specified time. 341 * 342 * <p>If there isn't, return the first program among programs that starts after the given time 343 * if returnNextProgram is {@code true}. 344 */ getProgramIndexAt(List<Program> programs, long time)345 private int getProgramIndexAt(List<Program> programs, long time) { 346 Program key = mZeroLengthProgramCache.get(time); 347 if (key == null) { 348 key = createDummyProgram(time, time); 349 mZeroLengthProgramCache.put(time, key); 350 } 351 int index = Collections.binarySearch(programs, key); 352 if (index < 0) { 353 index = -(index + 1); // change it to index to be added. 354 if (index > 0 && isProgramPlayedAt(programs.get(index - 1), time)) { 355 // A program is played at that time. 356 return index - 1; 357 } 358 return index; 359 } 360 return index; 361 } 362 isProgramPlayedAt(Program program, long time)363 private boolean isProgramPlayedAt(Program program, long time) { 364 return program.getStartTimeUtcMillis() <= time && time <= program.getEndTimeUtcMillis(); 365 } 366 367 /** 368 * Adds the listener to be notified if current program is updated for a channel. 369 * 370 * @param channelId A channel ID to get notified. If it's {@link Channel#INVALID_ID}, the 371 * listener would be called whenever a current program is updated. 372 */ addOnCurrentProgramUpdatedListener( long channelId, OnCurrentProgramUpdatedListener listener)373 public void addOnCurrentProgramUpdatedListener( 374 long channelId, OnCurrentProgramUpdatedListener listener) { 375 mChannelId2ProgramUpdatedListeners.put(channelId, listener); 376 } 377 378 /** 379 * Removes the listener previously added by {@link #addOnCurrentProgramUpdatedListener(long, 380 * OnCurrentProgramUpdatedListener)}. 381 */ removeOnCurrentProgramUpdatedListener( long channelId, OnCurrentProgramUpdatedListener listener)382 public void removeOnCurrentProgramUpdatedListener( 383 long channelId, OnCurrentProgramUpdatedListener listener) { 384 mChannelId2ProgramUpdatedListeners.remove(channelId, listener); 385 } 386 notifyCurrentProgramUpdate(long channelId, Program program)387 private void notifyCurrentProgramUpdate(long channelId, Program program) { 388 for (OnCurrentProgramUpdatedListener listener : 389 mChannelId2ProgramUpdatedListeners.get(channelId)) { 390 listener.onCurrentProgramUpdated(channelId, program); 391 } 392 for (OnCurrentProgramUpdatedListener listener : 393 mChannelId2ProgramUpdatedListeners.get(Channel.INVALID_ID)) { 394 listener.onCurrentProgramUpdated(channelId, program); 395 } 396 } 397 updateCurrentProgram(long channelId, Program program)398 private void updateCurrentProgram(long channelId, Program program) { 399 Program previousProgram = 400 program == null 401 ? mChannelIdCurrentProgramMap.remove(channelId) 402 : mChannelIdCurrentProgramMap.put(channelId, program); 403 if (!Objects.equals(program, previousProgram)) { 404 if (mPrefetchEnabled) { 405 removePreviousProgramsAndUpdateCurrentProgramInCache(channelId, program); 406 } 407 notifyCurrentProgramUpdate(channelId, program); 408 } 409 410 long delayedTime; 411 if (program == null) { 412 delayedTime = 413 PERIODIC_PROGRAM_UPDATE_MIN_MS 414 + (long) 415 (Math.random() 416 * (PERIODIC_PROGRAM_UPDATE_MAX_MS 417 - PERIODIC_PROGRAM_UPDATE_MIN_MS)); 418 } else { 419 delayedTime = program.getEndTimeUtcMillis() - mClock.currentTimeMillis(); 420 } 421 mHandler.sendMessageDelayed( 422 mHandler.obtainMessage(MSG_UPDATE_ONE_CURRENT_PROGRAM, channelId), delayedTime); 423 } 424 removePreviousProgramsAndUpdateCurrentProgramInCache( long channelId, Program currentProgram)425 private void removePreviousProgramsAndUpdateCurrentProgramInCache( 426 long channelId, Program currentProgram) { 427 SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled."); 428 if (!Program.isProgramValid(currentProgram)) { 429 return; 430 } 431 ArrayList<Program> cachedPrograms = mChannelIdProgramCache.remove(channelId); 432 if (cachedPrograms == null) { 433 return; 434 } 435 ListIterator<Program> i = cachedPrograms.listIterator(); 436 while (i.hasNext()) { 437 Program cachedProgram = i.next(); 438 if (cachedProgram.getEndTimeUtcMillis() <= mPrefetchTimeRangeStartMs) { 439 // Remove previous programs which will not be shown in program guide. 440 i.remove(); 441 continue; 442 } 443 444 if (cachedProgram.getEndTimeUtcMillis() <= currentProgram.getStartTimeUtcMillis()) { 445 // Keep the programs that ends earlier than current program 446 // but later than mPrefetchTimeRangeStartMs. 447 continue; 448 } 449 450 // Update dummy program around current program if any. 451 if (cachedProgram.getStartTimeUtcMillis() < currentProgram.getStartTimeUtcMillis()) { 452 // The dummy program starts earlier than the current program. Adjust its end time. 453 i.set( 454 createDummyProgram( 455 cachedProgram.getStartTimeUtcMillis(), 456 currentProgram.getStartTimeUtcMillis())); 457 i.add(currentProgram); 458 } else { 459 i.set(currentProgram); 460 } 461 if (currentProgram.getEndTimeUtcMillis() < cachedProgram.getEndTimeUtcMillis()) { 462 // The dummy program ends later than the current program. Adjust its start time. 463 i.add( 464 createDummyProgram( 465 currentProgram.getEndTimeUtcMillis(), 466 cachedProgram.getEndTimeUtcMillis())); 467 } 468 break; 469 } 470 if (cachedPrograms.isEmpty()) { 471 // If all the cached programs finish before mPrefetchTimeRangeStartMs, the 472 // currentProgram would not have a chance to be inserted to the cache. 473 cachedPrograms.add(currentProgram); 474 } 475 mChannelIdProgramCache.put(channelId, cachedPrograms); 476 } 477 handleUpdateCurrentPrograms()478 private void handleUpdateCurrentPrograms() { 479 if (mProgramsUpdateTask != null) { 480 mHandler.sendEmptyMessageDelayed( 481 MSG_UPDATE_CURRENT_PROGRAMS, CURRENT_PROGRAM_UPDATE_WAIT_MS); 482 return; 483 } 484 clearTask(mProgramUpdateTaskMap); 485 mHandler.removeMessages(MSG_UPDATE_ONE_CURRENT_PROGRAM); 486 mProgramsUpdateTask = new ProgramsUpdateTask(mClock.currentTimeMillis()); 487 mProgramsUpdateTask.executeOnDbThread(); 488 } 489 490 private class ProgramsPrefetchTask 491 extends AsyncDbTask<Void, Void, Map<Long, ArrayList<Program>>> { 492 private final long mStartTimeMs; 493 private final long mEndTimeMs; 494 495 private boolean mSuccess; 496 private TimerEvent mFromEmptyCacheTimeEvent; 497 ProgramsPrefetchTask()498 public ProgramsPrefetchTask() { 499 super(mDbExecutor); 500 long time = mClock.currentTimeMillis(); 501 mStartTimeMs = 502 Utils.floorTime(time - PROGRAM_GUIDE_SNAP_TIME_MS, PROGRAM_GUIDE_SNAP_TIME_MS); 503 mEndTimeMs = mStartTimeMs + TimeUnit.HOURS.toMillis(getFetchDuration()); 504 mSuccess = false; 505 } 506 507 @Override onPreExecute()508 protected void onPreExecute() { 509 if (mChannelIdCurrentProgramMap.isEmpty()) { 510 // No current program guide is shown. 511 // Measure the delay before users can see program guides. 512 mFromEmptyCacheTimeEvent = mPerformanceMonitor.startTimer(); 513 } 514 } 515 516 @Override doInBackground(Void... params)517 protected Map<Long, ArrayList<Program>> doInBackground(Void... params) { 518 TimerEvent asyncTimeEvent = mPerformanceMonitor.startTimer(); 519 Map<Long, ArrayList<Program>> programMap = new HashMap<>(); 520 if (DEBUG) { 521 Log.d( 522 TAG, 523 "Starts programs prefetch. " 524 + Utils.toTimeString(mStartTimeMs) 525 + "-" 526 + Utils.toTimeString(mEndTimeMs)); 527 } 528 Uri uri = 529 Programs.CONTENT_URI 530 .buildUpon() 531 .appendQueryParameter(PARAM_START_TIME, String.valueOf(mStartTimeMs)) 532 .appendQueryParameter(PARAM_END_TIME, String.valueOf(mEndTimeMs)) 533 .build(); 534 final int RETRY_COUNT = 3; 535 Program lastReadProgram = null; 536 for (int retryCount = RETRY_COUNT; retryCount > 0; retryCount--) { 537 if (isProgramUpdatePaused()) { 538 return null; 539 } 540 programMap.clear(); 541 542 String[] projection = 543 mBackendKnobsFlags.enablePartialProgramFetch() 544 ? Program.PARTIAL_PROJECTION 545 : Program.PROJECTION; 546 if (TvProviderUtils.checkSeriesIdColumn(mContext, Programs.CONTENT_URI)) { 547 if (Utils.isProgramsUri(uri)) { 548 projection = 549 TvProviderUtils.addExtraColumnsToProjection( 550 projection, TvProviderUtils.EXTRA_PROGRAM_COLUMN_SERIES_ID); 551 } 552 } 553 try (Cursor c = mContentResolver.query(uri, projection, null, null, SORT_BY_TIME)) { 554 if (c == null) { 555 continue; 556 } 557 while (c.moveToNext()) { 558 int duplicateCount = 0; 559 if (isCancelled()) { 560 if (DEBUG) { 561 Log.d(TAG, "ProgramsPrefetchTask canceled."); 562 } 563 return null; 564 } 565 Program program = 566 mBackendKnobsFlags.enablePartialProgramFetch() 567 ? Program.fromCursorPartialProjection(c) 568 : Program.fromCursor(c); 569 if (Program.isDuplicate(program, lastReadProgram)) { 570 duplicateCount++; 571 continue; 572 } else { 573 lastReadProgram = program; 574 } 575 ArrayList<Program> programs = programMap.get(program.getChannelId()); 576 if (programs == null) { 577 programs = new ArrayList<>(); 578 if (mBackendKnobsFlags.enablePartialProgramFetch()) { 579 // To skip already loaded complete data. 580 Program currentProgramInfo = 581 mChannelIdCurrentProgramMap.get(program.getChannelId()); 582 if (currentProgramInfo != null 583 && Program.isDuplicate(program, currentProgramInfo)) { 584 program = currentProgramInfo; 585 } 586 } 587 programMap.put(program.getChannelId(), programs); 588 } 589 programs.add(program); 590 if (duplicateCount > 0) { 591 Log.w(TAG, "Found " + duplicateCount + " duplicate programs"); 592 } 593 } 594 mSuccess = true; 595 break; 596 } catch (IllegalStateException e) { 597 if (DEBUG) { 598 Log.d(TAG, "Database is changed while querying. Will retry."); 599 } 600 } catch (SecurityException e) { 601 Log.w(TAG, "Security exception during program data query", e); 602 } catch (Exception e) { 603 Log.w(TAG, "Error during program data query", e); 604 } 605 } 606 if (DEBUG) { 607 Log.d(TAG, "Ends programs prefetch for " + programMap.size() + " channels"); 608 } 609 mPerformanceMonitor.stopTimer( 610 asyncTimeEvent, 611 EventNames.PROGRAM_DATA_MANAGER_PROGRAMS_PREFETCH_TASK_DO_IN_BACKGROUND); 612 return programMap; 613 } 614 615 @Override onPostExecute(Map<Long, ArrayList<Program>> programs)616 protected void onPostExecute(Map<Long, ArrayList<Program>> programs) { 617 mProgramsPrefetchTask = null; 618 if (isProgramUpdatePaused()) { 619 // ProgramsPrefetchTask will run again once setPauseProgramUpdate(false) is called. 620 return; 621 } 622 long nextMessageDelayedTime; 623 if (mSuccess) { 624 long currentTime = mClock.currentTimeMillis(); 625 mLastPrefetchTaskRunMs = currentTime; 626 nextMessageDelayedTime = 627 Utils.floorTime( 628 mLastPrefetchTaskRunMs + PROGRAM_GUIDE_SNAP_TIME_MS, 629 PROGRAM_GUIDE_SNAP_TIME_MS) 630 - currentTime; 631 // Issue second pre-fetch immediately after the first partial update 632 if (mChannelIdProgramCache.isEmpty()) { 633 nextMessageDelayedTime = 0; 634 } 635 mChannelIdProgramCache = programs; 636 if (mBackendKnobsFlags.enablePartialProgramFetch()) { 637 // Since cache has partial data we need to reset the map of complete data. 638 mCompleteInfoChannelIds.clear(); 639 } 640 notifyProgramUpdated(); 641 if (mFromEmptyCacheTimeEvent != null) { 642 mPerformanceMonitor.stopTimer( 643 mFromEmptyCacheTimeEvent, 644 EventNames.PROGRAM_GUIDE_SHOW_FROM_EMPTY_CACHE); 645 mFromEmptyCacheTimeEvent = null; 646 } 647 } else { 648 nextMessageDelayedTime = PERIODIC_PROGRAM_UPDATE_MIN_MS; 649 } 650 if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) { 651 mHandler.sendEmptyMessageDelayed( 652 MSG_UPDATE_PREFETCH_PROGRAM, nextMessageDelayedTime); 653 } 654 } 655 } 656 getFetchDuration()657 private long getFetchDuration() { 658 if (mChannelIdProgramCache.isEmpty()) { 659 return Math.max(1L, mBackendKnobsFlags.programGuideInitialFetchHours()); 660 } else { 661 long durationHours; 662 int channelCount = mChannelDataManager.getChannelCount(); 663 long knobsMaxHours = mBackendKnobsFlags.programGuideMaxHours(); 664 long targetChannelCount = mBackendKnobsFlags.epgTargetChannelCount(); 665 if (channelCount <= targetChannelCount) { 666 durationHours = Math.max(48L, knobsMaxHours); 667 } else { 668 // 2 days <= duration <= 14 days (336 hours) 669 durationHours = knobsMaxHours * targetChannelCount / channelCount; 670 if (durationHours < 48L) { 671 durationHours = 48L; 672 } else if (durationHours > 336L) { 673 durationHours = 336L; 674 } 675 } 676 return durationHours; 677 } 678 } 679 680 private class SingleChannelPrefetchTask extends AsyncDbTask.AsyncQueryTask<ArrayList<Program>> { 681 long mChannelId; 682 SingleChannelPrefetchTask(long channelId, long startTimeMs, long endTimeMs)683 public SingleChannelPrefetchTask(long channelId, long startTimeMs, long endTimeMs) { 684 super( 685 mDbExecutor, 686 mContext, 687 TvContract.buildProgramsUriForChannel(channelId, startTimeMs, endTimeMs), 688 Program.PROJECTION, 689 null, 690 null, 691 SORT_BY_TIME); 692 mChannelId = channelId; 693 } 694 695 @Override onQuery(Cursor c)696 protected ArrayList<Program> onQuery(Cursor c) { 697 ArrayList<Program> programMap = new ArrayList<>(); 698 while (c.moveToNext()) { 699 Program program = Program.fromCursor(c); 700 programMap.add(program); 701 } 702 return programMap; 703 } 704 705 @Override onPostExecute(ArrayList<Program> programs)706 protected void onPostExecute(ArrayList<Program> programs) { 707 mChannelIdProgramCache.put(mChannelId, programs); 708 notifySingleChannelUpdated(mChannelId); 709 } 710 } 711 notifyProgramUpdated()712 private void notifyProgramUpdated() { 713 for (Callback callback : mCallbacks) { 714 callback.onProgramUpdated(); 715 } 716 } 717 notifySingleChannelUpdated(long channelId)718 private void notifySingleChannelUpdated(long channelId) { 719 for (Callback callback : mCallbacks) { 720 callback.onSingleChannelUpdated(channelId); 721 } 722 } 723 724 private class ProgramsUpdateTask extends AsyncDbTask.AsyncQueryTask<List<Program>> { ProgramsUpdateTask(long time)725 public ProgramsUpdateTask(long time) { 726 super( 727 mDbExecutor, 728 mContext, 729 Programs.CONTENT_URI 730 .buildUpon() 731 .appendQueryParameter(PARAM_START_TIME, String.valueOf(time)) 732 .appendQueryParameter(PARAM_END_TIME, String.valueOf(time)) 733 .build(), 734 Program.PROJECTION, 735 null, 736 null, 737 SORT_BY_TIME); 738 } 739 740 @Override onQuery(Cursor c)741 public List<Program> onQuery(Cursor c) { 742 final List<Program> programs = new ArrayList<>(); 743 if (c != null) { 744 int duplicateCount = 0; 745 Program lastReadProgram = null; 746 while (c.moveToNext()) { 747 if (isCancelled()) { 748 return programs; 749 } 750 Program program = Program.fromCursor(c); 751 if (Program.isDuplicate(program, lastReadProgram)) { 752 duplicateCount++; 753 continue; 754 } else { 755 lastReadProgram = program; 756 } 757 programs.add(program); 758 } 759 if (duplicateCount > 0) { 760 Log.w(TAG, "Found " + duplicateCount + " duplicate programs"); 761 } 762 } 763 return programs; 764 } 765 766 @Override onPostExecute(List<Program> programs)767 protected void onPostExecute(List<Program> programs) { 768 if (DEBUG) Log.d(TAG, "ProgramsUpdateTask done"); 769 mProgramsUpdateTask = null; 770 if (programs != null) { 771 Set<Long> removedChannelIds = new HashSet<>(mChannelIdCurrentProgramMap.keySet()); 772 for (Program program : programs) { 773 long channelId = program.getChannelId(); 774 updateCurrentProgram(channelId, program); 775 removedChannelIds.remove(channelId); 776 } 777 for (Long channelId : removedChannelIds) { 778 if (mPrefetchEnabled) { 779 mChannelIdProgramCache.remove(channelId); 780 if (mBackendKnobsFlags.enablePartialProgramFetch()) { 781 mCompleteInfoChannelIds.remove(channelId); 782 } 783 } 784 mChannelIdCurrentProgramMap.remove(channelId); 785 notifyCurrentProgramUpdate(channelId, null); 786 } 787 } 788 mCurrentProgramsLoadFinished = true; 789 } 790 } 791 792 private class UpdateCurrentProgramForChannelTask extends AsyncDbTask.AsyncQueryTask<Program> { 793 private final long mChannelId; 794 UpdateCurrentProgramForChannelTask(long channelId, long time)795 private UpdateCurrentProgramForChannelTask(long channelId, long time) { 796 super( 797 mDbExecutor, 798 mContext, 799 TvContract.buildProgramsUriForChannel(channelId, time, time), 800 Program.PROJECTION, 801 null, 802 null, 803 SORT_BY_TIME); 804 mChannelId = channelId; 805 } 806 807 @Override onQuery(Cursor c)808 public Program onQuery(Cursor c) { 809 Program program = null; 810 if (c != null && c.moveToNext()) { 811 program = Program.fromCursor(c); 812 } 813 return program; 814 } 815 816 @Override onPostExecute(Program program)817 protected void onPostExecute(Program program) { 818 mProgramUpdateTaskMap.remove(mChannelId); 819 updateCurrentProgram(mChannelId, program); 820 } 821 } 822 823 private class MyHandler extends Handler { MyHandler(Looper looper)824 public MyHandler(Looper looper) { 825 super(looper); 826 } 827 828 @Override handleMessage(Message msg)829 public void handleMessage(Message msg) { 830 switch (msg.what) { 831 case MSG_UPDATE_CURRENT_PROGRAMS: 832 handleUpdateCurrentPrograms(); 833 break; 834 case MSG_UPDATE_ONE_CURRENT_PROGRAM: 835 { 836 long channelId = (Long) msg.obj; 837 UpdateCurrentProgramForChannelTask oldTask = 838 mProgramUpdateTaskMap.get(channelId); 839 if (oldTask != null) { 840 oldTask.cancel(true); 841 } 842 UpdateCurrentProgramForChannelTask task = 843 new UpdateCurrentProgramForChannelTask( 844 channelId, mClock.currentTimeMillis()); 845 mProgramUpdateTaskMap.put(channelId, task); 846 task.executeOnDbThread(); 847 break; 848 } 849 case MSG_UPDATE_PREFETCH_PROGRAM: 850 { 851 if (isProgramUpdatePaused()) { 852 return; 853 } 854 if (mProgramsPrefetchTask != null) { 855 mHandler.sendEmptyMessageDelayed( 856 msg.what, mProgramPrefetchUpdateWaitMs); 857 return; 858 } 859 long delayMillis = 860 mLastPrefetchTaskRunMs 861 + mProgramPrefetchUpdateWaitMs 862 - mClock.currentTimeMillis(); 863 if (delayMillis > 0) { 864 mHandler.sendEmptyMessageDelayed( 865 MSG_UPDATE_PREFETCH_PROGRAM, delayMillis); 866 } else { 867 mProgramsPrefetchTask = new ProgramsPrefetchTask(); 868 mProgramsPrefetchTask.executeOnDbThread(); 869 } 870 break; 871 } 872 default: 873 // Do nothing 874 } 875 } 876 } 877 878 /** 879 * Pause program update. Updating program data will result in UI refresh, but UI is fragile to 880 * handle it so we'd better disable it for a while. 881 * 882 * <p>Prefetch should be enabled to call it. 883 */ setPauseProgramUpdate(boolean pauseProgramUpdate)884 public void setPauseProgramUpdate(boolean pauseProgramUpdate) { 885 SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled."); 886 if (mPauseProgramUpdate && !pauseProgramUpdate) { 887 if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) { 888 // MSG_UPDATE_PRFETCH_PROGRAM can be empty 889 // if prefetch task is launched while program update is paused. 890 // Update immediately in that case. 891 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM); 892 } 893 } 894 mPauseProgramUpdate = pauseProgramUpdate; 895 } 896 isProgramUpdatePaused()897 private boolean isProgramUpdatePaused() { 898 // Although pause is requested, we need to keep updating if cache is empty. 899 return mPauseProgramUpdate && !mChannelIdProgramCache.isEmpty(); 900 } 901 902 /** 903 * Sets program data prefetch time range. Any program data that ends before the start time will 904 * be removed from the cache later. Note that there's no limit for end time. 905 * 906 * <p>Prefetch should be enabled to call it. 907 */ setPrefetchTimeRange(long startTimeMs)908 public void setPrefetchTimeRange(long startTimeMs) { 909 SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled."); 910 if (mPrefetchTimeRangeStartMs > startTimeMs) { 911 // Fetch the programs immediately to re-create the cache. 912 if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) { 913 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM); 914 } 915 } 916 mPrefetchTimeRangeStartMs = startTimeMs; 917 } 918 clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks)919 private void clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks) { 920 for (int i = 0; i < tasks.size(); i++) { 921 tasks.valueAt(i).cancel(true); 922 } 923 tasks.clear(); 924 } 925 cancelPrefetchTask()926 private void cancelPrefetchTask() { 927 if (mProgramsPrefetchTask != null) { 928 mProgramsPrefetchTask.cancel(true); 929 mProgramsPrefetchTask = null; 930 } 931 } 932 933 // Create dummy program which indicates data isn't loaded yet so DB query is required. createDummyProgram(long startTimeMs, long endTimeMs)934 private Program createDummyProgram(long startTimeMs, long endTimeMs) { 935 return new Program.Builder() 936 .setChannelId(Channel.INVALID_ID) 937 .setStartTimeUtcMillis(startTimeMs) 938 .setEndTimeUtcMillis(endTimeMs) 939 .build(); 940 } 941 942 @Override performTrimMemory(int level)943 public void performTrimMemory(int level) { 944 mChannelId2ProgramUpdatedListeners.clearEmptyCache(); 945 } 946 } 947