1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.tuner.source; 18 19 import android.content.Context; 20 import android.os.Environment; 21 import android.util.Log; 22 import android.util.SparseBooleanArray; 23 24 import com.google.android.exoplayer.C; 25 import com.google.android.exoplayer.upstream.DataSpec; 26 import com.android.tv.Features; 27 import com.android.tv.common.SoftPreconditions; 28 import com.android.tv.tuner.ChannelScanFileParser.ScanChannel; 29 import com.android.tv.tuner.data.TunerChannel; 30 import com.android.tv.tuner.ts.TsParser; 31 import com.android.tv.tuner.tvinput.EventDetector; 32 import com.android.tv.tuner.tvinput.FileSourceEventDetector; 33 34 import java.io.BufferedInputStream; 35 import java.io.File; 36 import java.io.FileInputStream; 37 import java.io.IOException; 38 import java.util.List; 39 import java.util.concurrent.atomic.AtomicLong; 40 41 /** 42 * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file 43 * generated by capturing TV signal. 44 */ 45 public class FileTsStreamer implements TsStreamer { 46 private static final String TAG = "FileTsStreamer"; 47 48 private static final int TS_PACKET_SIZE = 188; 49 private static final int TS_SYNC_BYTE = 0x47; 50 private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10; 51 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB 52 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB 53 private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB 54 private static final int READ_TIMEOUT_MS = 10000; // 10 secs. 55 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 56 private static final String FILE_DIR = 57 new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath(); 58 59 // Virtual frequency base used for file-based source 60 public static final int FREQ_BASE = 100; 61 62 private final Object mCircularBufferMonitor = new Object(); 63 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 64 private final FileSourceEventDetector mEventDetector; 65 private final Context mContext; 66 67 private long mBytesFetched; 68 private long mLastReadPosition; 69 private boolean mStreaming; 70 71 private Thread mStreamingThread; 72 private StreamProvider mSource; 73 74 public static class FileDataSource extends TsDataSource { 75 private final FileTsStreamer mTsStreamer; 76 private final AtomicLong mLastReadPosition = new AtomicLong(0); 77 private long mStartBufferedPosition; 78 FileDataSource(FileTsStreamer tsStreamer)79 private FileDataSource(FileTsStreamer tsStreamer) { 80 mTsStreamer = tsStreamer; 81 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 82 } 83 84 @Override getBufferedPosition()85 public long getBufferedPosition() { 86 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 87 } 88 89 @Override getLastReadPosition()90 public long getLastReadPosition() { 91 return mLastReadPosition.get(); 92 } 93 94 @Override shiftStartPosition(long offset)95 public void shiftStartPosition(long offset) { 96 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 97 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 98 mStartBufferedPosition += offset; 99 } 100 101 @Override open(DataSpec dataSpec)102 public long open(DataSpec dataSpec) throws IOException { 103 mLastReadPosition.set(0); 104 return C.LENGTH_UNBOUNDED; 105 } 106 107 @Override close()108 public void close() { 109 } 110 111 @Override read(byte[] buffer, int offset, int readLength)112 public int read(byte[] buffer, int offset, int readLength) throws IOException { 113 int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer, 114 offset, readLength); 115 if (ret > 0) { 116 mLastReadPosition.addAndGet(ret); 117 } 118 return ret; 119 } 120 } 121 122 /** 123 * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file. 124 * @param eventListener the listener for channel & program information 125 */ FileTsStreamer(EventDetector.EventListener eventListener, Context context)126 public FileTsStreamer(EventDetector.EventListener eventListener, Context context) { 127 mEventDetector = 128 new FileSourceEventDetector( 129 eventListener, Features.ENABLE_FILE_DVB.isEnabled(context)); 130 mContext = context; 131 } 132 133 @Override startStream(ScanChannel channel)134 public boolean startStream(ScanChannel channel) { 135 String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath(); 136 mSource = new StreamProvider(filepath); 137 if (!mSource.isReady()) { 138 return false; 139 } 140 mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS); 141 mSource.addPidFilter(TsParser.PAT_PID); 142 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 143 if (Features.ENABLE_FILE_DVB.isEnabled(mContext)) { 144 mSource.addPidFilter(TsParser.DVB_EIT_PID); 145 mSource.addPidFilter(TsParser.DVB_SDT_PID); 146 } 147 synchronized (mCircularBufferMonitor) { 148 if (mStreaming) { 149 return true; 150 } 151 mStreaming = true; 152 } 153 154 mStreamingThread = new StreamingThread(); 155 mStreamingThread.start(); 156 Log.i(TAG, "Streaming started"); 157 return true; 158 } 159 160 @Override startStream(TunerChannel channel)161 public boolean startStream(TunerChannel channel) { 162 Log.i(TAG, "tuneToChannel with: " + channel.getFilepath()); 163 mSource = new StreamProvider(channel.getFilepath()); 164 if (!mSource.isReady()) { 165 return false; 166 } 167 mEventDetector.start(mSource, channel.getProgramNumber()); 168 mSource.addPidFilter(channel.getVideoPid()); 169 for (Integer i : channel.getAudioPids()) { 170 mSource.addPidFilter(i); 171 } 172 mSource.addPidFilter(channel.getPcrPid()); 173 mSource.addPidFilter(TsParser.PAT_PID); 174 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 175 if (Features.ENABLE_FILE_DVB.isEnabled(mContext)) { 176 mSource.addPidFilter(TsParser.DVB_EIT_PID); 177 mSource.addPidFilter(TsParser.DVB_SDT_PID); 178 } 179 synchronized (mCircularBufferMonitor) { 180 if (mStreaming) { 181 return true; 182 } 183 mStreaming = true; 184 } 185 186 mStreamingThread = new StreamingThread(); 187 mStreamingThread.start(); 188 Log.i(TAG, "Streaming started"); 189 return true; 190 } 191 192 /** 193 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 194 * device is overloaded this can take a while, but usually it returns pretty quickly. 195 */ 196 @Override stopStream()197 public void stopStream() { 198 synchronized (mCircularBufferMonitor) { 199 mStreaming = false; 200 mCircularBufferMonitor.notify(); 201 } 202 203 try { 204 if (mStreamingThread != null) { 205 mStreamingThread.join(); 206 } 207 } catch (InterruptedException e) { 208 Thread.currentThread().interrupt(); 209 } 210 } 211 212 @Override createDataSource()213 public TsDataSource createDataSource() { 214 return new FileDataSource(this); 215 } 216 217 /** 218 * Returns the current buffered position from the file. 219 * @return the current buffered position 220 */ getBufferedPosition()221 public long getBufferedPosition() { 222 synchronized (mCircularBufferMonitor) { 223 return mBytesFetched; 224 } 225 } 226 227 /** 228 * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. 229 */ 230 public static class StreamProvider { 231 private final String mFilepath; 232 private final SparseBooleanArray mPids = new SparseBooleanArray(); 233 private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE]; 234 235 private BufferedInputStream mInputStream; 236 StreamProvider(String filepath)237 private StreamProvider(String filepath) { 238 mFilepath = filepath; 239 open(filepath); 240 } 241 open(String filepath)242 private void open(String filepath) { 243 try { 244 mInputStream = new BufferedInputStream(new FileInputStream(filepath)); 245 } catch (IOException e) { 246 Log.e(TAG, "Error opening input stream", e); 247 mInputStream = null; 248 } 249 } 250 isReady()251 private boolean isReady() { 252 return mInputStream != null; 253 } 254 255 /** 256 * Returns the file path of the MPEG-2 TS file. 257 */ getFilepath()258 public String getFilepath() { 259 return mFilepath; 260 } 261 262 /** 263 * Adds a pid for filtering from the MPEG-2 TS file. 264 */ addPidFilter(int pid)265 public void addPidFilter(int pid) { 266 mPids.put(pid, true); 267 } 268 269 /** 270 * Returns whether the current pid filter is empty or not. 271 */ isFilterEmpty()272 public boolean isFilterEmpty() { 273 return mPids.size() == 0; 274 } 275 276 /** 277 * Clears the current pid filter. 278 */ clearPidFilter()279 public void clearPidFilter() { 280 mPids.clear(); 281 } 282 283 /** 284 * Returns whether a pid is in the pid filter or not. 285 * @param pid the pid to check 286 */ isInFilter(int pid)287 public boolean isInFilter(int pid) { 288 return mPids.get(pid); 289 } 290 291 /** 292 * Reads from the MPEG-2 TS file to buffer. 293 * 294 * @param inputBuffer to read 295 * @return the number of read bytes 296 */ read(byte[] inputBuffer)297 private int read(byte[] inputBuffer) { 298 int readSize = readInternal(); 299 if (readSize <= 0) { 300 // Reached the end of stream. Restart from the beginning. 301 close(); 302 open(mFilepath); 303 if (mInputStream == null) { 304 return -1; 305 } 306 readSize = readInternal(); 307 } 308 309 if (mPreBuffer[0] != TS_SYNC_BYTE) { 310 Log.e(TAG, "Error reading input stream - no TS sync found"); 311 return -1; 312 } 313 int filteredSize = 0; 314 for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) { 315 if (mPreBuffer[i] == TS_SYNC_BYTE) { 316 int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff); 317 if (mPids.get(pid)) { 318 System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE); 319 destPos += TS_PACKET_SIZE; 320 filteredSize += TS_PACKET_SIZE; 321 } 322 } 323 } 324 return filteredSize; 325 } 326 readInternal()327 private int readInternal() { 328 int readSize; 329 try { 330 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length); 331 } catch (IOException e) { 332 Log.e(TAG, "Error reading input stream", e); 333 return -1; 334 } 335 return readSize; 336 } 337 close()338 private void close() { 339 try { 340 mInputStream.close(); 341 } catch (IOException e) { 342 Log.e(TAG, "Error closing input stream:", e); 343 } 344 mInputStream = null; 345 } 346 } 347 348 /** 349 * Reads data from internal buffer. 350 * @param pos the position to read from 351 * @param buffer to read 352 * @param offset start position of the read buffer 353 * @param amount number of bytes to read 354 * @return number of read bytes when successful, {@code -1} otherwise 355 * @throws IOException 356 */ readAt(long pos, byte[] buffer, int offset, int amount)357 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 358 synchronized (mCircularBufferMonitor) { 359 long initialBytesFetched = mBytesFetched; 360 while (mBytesFetched < pos + amount && mStreaming) { 361 try { 362 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 363 } catch (InterruptedException e) { 364 // Wait again. 365 Thread.currentThread().interrupt(); 366 } 367 if (initialBytesFetched == mBytesFetched) { 368 Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1."); 369 370 // Returning -1 will make demux report EOS so that the input service can retry 371 // the playback. 372 return -1; 373 } 374 } 375 if (!mStreaming) { 376 Log.w(TAG, "Stream is already stopped."); 377 return -1; 378 } 379 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 380 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 381 return -1; 382 } 383 int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE); 384 int bytesToCopyInFirstPass = amount; 385 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 386 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 387 } 388 System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass); 389 if (bytesToCopyInFirstPass < amount) { 390 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass, 391 amount - bytesToCopyInFirstPass); 392 } 393 mLastReadPosition = pos + amount; 394 mCircularBufferMonitor.notify(); 395 return amount; 396 } 397 } 398 399 /** 400 * Adds {@link ScanChannel} instance for local files. 401 * 402 * @param output a list of channels where the results will be placed in 403 */ addLocalStreamFiles(List<ScanChannel> output)404 public static void addLocalStreamFiles(List<ScanChannel> output) { 405 File dir = new File(FILE_DIR); 406 if (!dir.exists()) return; 407 408 File[] tsFiles = dir.listFiles(); 409 if (tsFiles == null) return; 410 int freq = FileTsStreamer.FREQ_BASE; 411 for (File file : tsFiles) { 412 if (!file.isFile()) continue; 413 output.add(ScanChannel.forFile(freq, file.getName())); 414 freq += 100; 415 } 416 } 417 418 /** 419 * A thread managing a circular buffer that holds stream data to be consumed by player. 420 * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering. 421 * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively. 422 */ 423 private class StreamingThread extends Thread { 424 @Override run()425 public void run() { 426 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 427 428 synchronized (mCircularBufferMonitor) { 429 mBytesFetched = 0; 430 mLastReadPosition = 0; 431 } 432 433 while (true) { 434 synchronized (mCircularBufferMonitor) { 435 while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE 436 && mStreaming) { 437 try { 438 mCircularBufferMonitor.wait(); 439 } catch (InterruptedException e) { 440 // Wait again. 441 Thread.currentThread().interrupt(); 442 } 443 } 444 if (!mStreaming) { 445 break; 446 } 447 } 448 449 int bytesWritten = mSource.read(dataBuffer); 450 if (bytesWritten <= 0) { 451 try { 452 // When buffer is underrun, we sleep for short time to prevent 453 // unnecessary CPU draining. 454 sleep(BUFFER_UNDERRUN_SLEEP_MS); 455 } catch (InterruptedException e) { 456 Thread.currentThread().interrupt(); 457 } 458 continue; 459 } 460 461 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 462 463 synchronized (mCircularBufferMonitor) { 464 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 465 int bytesToCopyInFirstPass = bytesWritten; 466 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 467 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 468 } 469 System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer, 470 bytesToCopyInFirstPass); 471 if (bytesToCopyInFirstPass < bytesWritten) { 472 System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0, 473 bytesWritten - bytesToCopyInFirstPass); 474 } 475 mBytesFetched += bytesWritten; 476 mCircularBufferMonitor.notify(); 477 } 478 } 479 480 Log.i(TAG, "Streaming stopped"); 481 mSource.close(); 482 } 483 } 484 } 485