1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.tuner.source; 18 19 import android.content.Context; 20 import android.os.Environment; 21 import android.util.Log; 22 import android.util.SparseBooleanArray; 23 import com.android.tv.common.SoftPreconditions; 24 import com.android.tv.tuner.api.ScanChannel; 25 import com.android.tv.tuner.data.TunerChannel; 26 import com.android.tv.tuner.features.TunerFeatures; 27 import com.android.tv.tuner.ts.EventDetector.EventListener; 28 import com.android.tv.tuner.ts.TsParser; 29 import com.google.android.exoplayer.C; 30 import com.google.android.exoplayer.upstream.DataSpec; 31 import java.io.BufferedInputStream; 32 import java.io.File; 33 import java.io.FileInputStream; 34 import java.io.IOException; 35 import java.util.List; 36 import java.util.concurrent.atomic.AtomicLong; 37 38 /** 39 * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file 40 * generated by capturing TV signal. 41 */ 42 public class FileTsStreamer implements TsStreamer { 43 private static final String TAG = "FileTsStreamer"; 44 45 private static final int TS_PACKET_SIZE = 188; 46 private static final int TS_SYNC_BYTE = 0x47; 47 private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10; 48 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB 49 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB 50 private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB 51 private static final int READ_TIMEOUT_MS = 10000; // 10 secs. 52 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 53 private static final String FILE_DIR = 54 new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath(); 55 56 // Virtual frequency base used for file-based source 57 public static final int FREQ_BASE = 100; 58 59 private final Object mCircularBufferMonitor = new Object(); 60 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 61 private final FileSourceEventDetector mEventDetector; 62 private final Context mContext; 63 64 private long mBytesFetched; 65 private long mLastReadPosition; 66 private boolean mStreaming; 67 68 private Thread mStreamingThread; 69 private StreamProvider mSource; 70 71 public static class FileDataSource extends TsDataSource { 72 private final FileTsStreamer mTsStreamer; 73 private final AtomicLong mLastReadPosition = new AtomicLong(0); 74 private long mStartBufferedPosition; 75 FileDataSource(FileTsStreamer tsStreamer)76 private FileDataSource(FileTsStreamer tsStreamer) { 77 mTsStreamer = tsStreamer; 78 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 79 } 80 81 @Override getBufferedPosition()82 public long getBufferedPosition() { 83 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 84 } 85 86 @Override getLastReadPosition()87 public long getLastReadPosition() { 88 return mLastReadPosition.get(); 89 } 90 91 @Override shiftStartPosition(long offset)92 public void shiftStartPosition(long offset) { 93 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 94 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 95 mStartBufferedPosition += offset; 96 } 97 98 @Override open(DataSpec dataSpec)99 public long open(DataSpec dataSpec) throws IOException { 100 mLastReadPosition.set(0); 101 return C.LENGTH_UNBOUNDED; 102 } 103 104 @Override close()105 public void close() {} 106 107 @Override read(byte[] buffer, int offset, int readLength)108 public int read(byte[] buffer, int offset, int readLength) throws IOException { 109 int ret = 110 mTsStreamer.readAt( 111 mStartBufferedPosition + mLastReadPosition.get(), 112 buffer, 113 offset, 114 readLength); 115 if (ret > 0) { 116 mLastReadPosition.addAndGet(ret); 117 } 118 return ret; 119 } 120 } 121 122 /** 123 * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file. 124 * 125 * @param eventListener the listener for channel & program information 126 */ FileTsStreamer(EventListener eventListener, Context context)127 public FileTsStreamer(EventListener eventListener, Context context) { 128 mEventDetector = 129 new FileSourceEventDetector( 130 eventListener, TunerFeatures.ENABLE_FILE_DVB.isEnabled(context)); 131 mContext = context; 132 } 133 134 @Override startStream(ScanChannel channel)135 public boolean startStream(ScanChannel channel) { 136 String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath(); 137 mSource = new StreamProvider(filepath); 138 if (!mSource.isReady()) { 139 return false; 140 } 141 mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS); 142 mSource.addPidFilter(TsParser.PAT_PID); 143 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 144 if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) { 145 mSource.addPidFilter(TsParser.DVB_EIT_PID); 146 mSource.addPidFilter(TsParser.DVB_SDT_PID); 147 } 148 synchronized (mCircularBufferMonitor) { 149 if (mStreaming) { 150 return true; 151 } 152 mStreaming = true; 153 } 154 155 mStreamingThread = new StreamingThread(); 156 mStreamingThread.start(); 157 Log.i(TAG, "Streaming started"); 158 return true; 159 } 160 161 @Override startStream(TunerChannel channel)162 public boolean startStream(TunerChannel channel) { 163 Log.i(TAG, "tuneToChannel with: " + channel.getFilepath()); 164 mSource = new StreamProvider(channel.getFilepath()); 165 if (!mSource.isReady()) { 166 return false; 167 } 168 mEventDetector.start(mSource, channel.getProgramNumber()); 169 mSource.addPidFilter(channel.getVideoPid()); 170 for (Integer i : channel.getAudioPids()) { 171 mSource.addPidFilter(i); 172 } 173 mSource.addPidFilter(channel.getPcrPid()); 174 mSource.addPidFilter(TsParser.PAT_PID); 175 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 176 if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) { 177 mSource.addPidFilter(TsParser.DVB_EIT_PID); 178 mSource.addPidFilter(TsParser.DVB_SDT_PID); 179 } 180 synchronized (mCircularBufferMonitor) { 181 if (mStreaming) { 182 return true; 183 } 184 mStreaming = true; 185 } 186 187 mStreamingThread = new StreamingThread(); 188 mStreamingThread.start(); 189 Log.i(TAG, "Streaming started"); 190 return true; 191 } 192 193 /** 194 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 195 * device is overloaded this can take a while, but usually it returns pretty quickly. 196 */ 197 @Override stopStream()198 public void stopStream() { 199 synchronized (mCircularBufferMonitor) { 200 mStreaming = false; 201 mCircularBufferMonitor.notify(); 202 } 203 204 try { 205 if (mStreamingThread != null) { 206 mStreamingThread.join(); 207 } 208 } catch (InterruptedException e) { 209 Thread.currentThread().interrupt(); 210 } 211 } 212 213 @Override createDataSource()214 public TsDataSource createDataSource() { 215 return new FileDataSource(this); 216 } 217 218 /** 219 * Returns the current buffered position from the file. 220 * 221 * @return the current buffered position 222 */ getBufferedPosition()223 public long getBufferedPosition() { 224 synchronized (mCircularBufferMonitor) { 225 return mBytesFetched; 226 } 227 } 228 229 /** Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. */ 230 public static class StreamProvider { 231 private final String mFilepath; 232 private final SparseBooleanArray mPids = new SparseBooleanArray(); 233 private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE]; 234 235 private BufferedInputStream mInputStream; 236 StreamProvider(String filepath)237 private StreamProvider(String filepath) { 238 mFilepath = filepath; 239 open(filepath); 240 } 241 open(String filepath)242 private void open(String filepath) { 243 try { 244 mInputStream = new BufferedInputStream(new FileInputStream(filepath)); 245 } catch (IOException e) { 246 Log.e(TAG, "Error opening input stream", e); 247 mInputStream = null; 248 } 249 } 250 isReady()251 private boolean isReady() { 252 return mInputStream != null; 253 } 254 255 /** Returns the file path of the MPEG-2 TS file. */ getFilepath()256 public String getFilepath() { 257 return mFilepath; 258 } 259 260 /** Adds a pid for filtering from the MPEG-2 TS file. */ addPidFilter(int pid)261 public void addPidFilter(int pid) { 262 mPids.put(pid, true); 263 } 264 265 /** Returns whether the current pid filter is empty or not. */ isFilterEmpty()266 public boolean isFilterEmpty() { 267 return mPids.size() == 0; 268 } 269 270 /** Clears the current pid filter. */ clearPidFilter()271 public void clearPidFilter() { 272 mPids.clear(); 273 } 274 275 /** 276 * Returns whether a pid is in the pid filter or not. 277 * 278 * @param pid the pid to check 279 */ isInFilter(int pid)280 public boolean isInFilter(int pid) { 281 return mPids.get(pid); 282 } 283 284 /** 285 * Reads from the MPEG-2 TS file to buffer. 286 * 287 * @param inputBuffer to read 288 * @return the number of read bytes 289 */ read(byte[] inputBuffer)290 private int read(byte[] inputBuffer) { 291 int readSize = readInternal(); 292 if (readSize <= 0) { 293 // Reached the end of stream. Restart from the beginning. 294 close(); 295 open(mFilepath); 296 if (mInputStream == null) { 297 return -1; 298 } 299 readSize = readInternal(); 300 } 301 302 if (mPreBuffer[0] != TS_SYNC_BYTE) { 303 Log.e(TAG, "Error reading input stream - no TS sync found"); 304 return -1; 305 } 306 int filteredSize = 0; 307 for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) { 308 if (mPreBuffer[i] == TS_SYNC_BYTE) { 309 int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff); 310 if (mPids.get(pid)) { 311 System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE); 312 destPos += TS_PACKET_SIZE; 313 filteredSize += TS_PACKET_SIZE; 314 } 315 } 316 } 317 return filteredSize; 318 } 319 readInternal()320 private int readInternal() { 321 int readSize; 322 try { 323 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length); 324 } catch (IOException e) { 325 Log.e(TAG, "Error reading input stream", e); 326 return -1; 327 } 328 return readSize; 329 } 330 close()331 private void close() { 332 try { 333 mInputStream.close(); 334 } catch (IOException e) { 335 Log.e(TAG, "Error closing input stream:", e); 336 } 337 mInputStream = null; 338 } 339 } 340 341 /** 342 * Reads data from internal buffer. 343 * 344 * @param pos the position to read from 345 * @param buffer to read 346 * @param offset start position of the read buffer 347 * @param amount number of bytes to read 348 * @return number of read bytes when successful, {@code -1} otherwise 349 * @throws IOException 350 */ readAt(long pos, byte[] buffer, int offset, int amount)351 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 352 synchronized (mCircularBufferMonitor) { 353 long initialBytesFetched = mBytesFetched; 354 while (mBytesFetched < pos + amount && mStreaming) { 355 try { 356 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 357 } catch (InterruptedException e) { 358 // Wait again. 359 Thread.currentThread().interrupt(); 360 } 361 if (initialBytesFetched == mBytesFetched) { 362 Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1."); 363 364 // Returning -1 will make demux report EOS so that the input service can retry 365 // the playback. 366 return -1; 367 } 368 } 369 if (!mStreaming) { 370 Log.w(TAG, "Stream is already stopped."); 371 return -1; 372 } 373 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 374 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 375 return -1; 376 } 377 int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE); 378 int bytesToCopyInFirstPass = amount; 379 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 380 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 381 } 382 System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass); 383 if (bytesToCopyInFirstPass < amount) { 384 System.arraycopy( 385 mCircularBuffer, 386 0, 387 buffer, 388 offset + bytesToCopyInFirstPass, 389 amount - bytesToCopyInFirstPass); 390 } 391 mLastReadPosition = pos + amount; 392 mCircularBufferMonitor.notify(); 393 return amount; 394 } 395 } 396 397 /** 398 * Adds {@link ScanChannel} instance for local files. 399 * 400 * @param output a list of channels where the results will be placed in 401 */ addLocalStreamFiles(List<ScanChannel> output)402 public static void addLocalStreamFiles(List<ScanChannel> output) { 403 File dir = new File(FILE_DIR); 404 if (!dir.exists()) return; 405 406 File[] tsFiles = dir.listFiles(); 407 if (tsFiles == null) return; 408 int freq = FileTsStreamer.FREQ_BASE; 409 for (File file : tsFiles) { 410 if (!file.isFile()) continue; 411 output.add(ScanChannel.forFile(freq, file.getName())); 412 freq += 100; 413 } 414 } 415 416 /** 417 * A thread managing a circular buffer that holds stream data to be consumed by player. Keeps 418 * reading data in from a {@link StreamProvider} to hold enough amount for buffering. Started 419 * and stopped by {@link #startStream()} and {@link #stopStream()}, respectively. 420 */ 421 private class StreamingThread extends Thread { 422 @Override run()423 public void run() { 424 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 425 426 synchronized (mCircularBufferMonitor) { 427 mBytesFetched = 0; 428 mLastReadPosition = 0; 429 } 430 431 while (true) { 432 synchronized (mCircularBufferMonitor) { 433 while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE 434 && mStreaming) { 435 try { 436 mCircularBufferMonitor.wait(); 437 } catch (InterruptedException e) { 438 // Wait again. 439 Thread.currentThread().interrupt(); 440 } 441 } 442 if (!mStreaming) { 443 break; 444 } 445 } 446 447 int bytesWritten = mSource.read(dataBuffer); 448 if (bytesWritten <= 0) { 449 try { 450 // When buffer is underrun, we sleep for short time to prevent 451 // unnecessary CPU draining. 452 sleep(BUFFER_UNDERRUN_SLEEP_MS); 453 } catch (InterruptedException e) { 454 Thread.currentThread().interrupt(); 455 } 456 continue; 457 } 458 459 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 460 461 synchronized (mCircularBufferMonitor) { 462 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 463 int bytesToCopyInFirstPass = bytesWritten; 464 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 465 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 466 } 467 System.arraycopy( 468 dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass); 469 if (bytesToCopyInFirstPass < bytesWritten) { 470 System.arraycopy( 471 dataBuffer, 472 bytesToCopyInFirstPass, 473 mCircularBuffer, 474 0, 475 bytesWritten - bytesToCopyInFirstPass); 476 } 477 mBytesFetched += bytesWritten; 478 mCircularBufferMonitor.notify(); 479 } 480 } 481 482 Log.i(TAG, "Streaming stopped"); 483 mSource.close(); 484 } 485 } 486 } 487