• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.tuner.source;
18 
19 import android.content.Context;
20 import android.os.Environment;
21 import android.util.Log;
22 import android.util.SparseBooleanArray;
23 import com.android.tv.common.SoftPreconditions;
24 import com.android.tv.tuner.api.ScanChannel;
25 import com.android.tv.tuner.data.TunerChannel;
26 import com.android.tv.tuner.features.TunerFeatures;
27 import com.android.tv.tuner.ts.EventDetector.EventListener;
28 import com.android.tv.tuner.ts.TsParser;
29 import com.google.android.exoplayer.C;
30 import com.google.android.exoplayer.upstream.DataSpec;
31 import java.io.BufferedInputStream;
32 import java.io.File;
33 import java.io.FileInputStream;
34 import java.io.IOException;
35 import java.util.List;
36 import java.util.concurrent.atomic.AtomicLong;
37 
38 /**
39  * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file
40  * generated by capturing TV signal.
41  */
42 public class FileTsStreamer implements TsStreamer {
43     private static final String TAG = "FileTsStreamer";
44 
45     private static final int TS_PACKET_SIZE = 188;
46     private static final int TS_SYNC_BYTE = 0x47;
47     private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
48     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
49     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
50     private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
51     private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
52     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
53     private static final String FILE_DIR =
54             new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();
55 
56     // Virtual frequency base used for file-based source
57     public static final int FREQ_BASE = 100;
58 
59     private final Object mCircularBufferMonitor = new Object();
60     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
61     private final FileSourceEventDetector mEventDetector;
62     private final Context mContext;
63 
64     private long mBytesFetched;
65     private long mLastReadPosition;
66     private boolean mStreaming;
67 
68     private Thread mStreamingThread;
69     private StreamProvider mSource;
70 
71     public static class FileDataSource extends TsDataSource {
72         private final FileTsStreamer mTsStreamer;
73         private final AtomicLong mLastReadPosition = new AtomicLong(0);
74         private long mStartBufferedPosition;
75 
FileDataSource(FileTsStreamer tsStreamer)76         private FileDataSource(FileTsStreamer tsStreamer) {
77             mTsStreamer = tsStreamer;
78             mStartBufferedPosition = tsStreamer.getBufferedPosition();
79         }
80 
81         @Override
getBufferedPosition()82         public long getBufferedPosition() {
83             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
84         }
85 
86         @Override
getLastReadPosition()87         public long getLastReadPosition() {
88             return mLastReadPosition.get();
89         }
90 
91         @Override
shiftStartPosition(long offset)92         public void shiftStartPosition(long offset) {
93             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
94             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
95             mStartBufferedPosition += offset;
96         }
97 
98         @Override
open(DataSpec dataSpec)99         public long open(DataSpec dataSpec) throws IOException {
100             mLastReadPosition.set(0);
101             return C.LENGTH_UNBOUNDED;
102         }
103 
104         @Override
close()105         public void close() {}
106 
107         @Override
read(byte[] buffer, int offset, int readLength)108         public int read(byte[] buffer, int offset, int readLength) throws IOException {
109             int ret =
110                     mTsStreamer.readAt(
111                             mStartBufferedPosition + mLastReadPosition.get(),
112                             buffer,
113                             offset,
114                             readLength);
115             if (ret > 0) {
116                 mLastReadPosition.addAndGet(ret);
117             }
118             return ret;
119         }
120     }
121 
122     /**
123      * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
124      *
125      * @param eventListener the listener for channel & program information
126      */
FileTsStreamer(EventListener eventListener, Context context)127     public FileTsStreamer(EventListener eventListener, Context context) {
128         mEventDetector =
129                 new FileSourceEventDetector(
130                         eventListener, TunerFeatures.ENABLE_FILE_DVB.isEnabled(context));
131         mContext = context;
132     }
133 
134     @Override
startStream(ScanChannel channel)135     public boolean startStream(ScanChannel channel) {
136         String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
137         mSource = new StreamProvider(filepath);
138         if (!mSource.isReady()) {
139             return false;
140         }
141         mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
142         mSource.addPidFilter(TsParser.PAT_PID);
143         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
144         if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
145             mSource.addPidFilter(TsParser.DVB_EIT_PID);
146             mSource.addPidFilter(TsParser.DVB_SDT_PID);
147         }
148         synchronized (mCircularBufferMonitor) {
149             if (mStreaming) {
150                 return true;
151             }
152             mStreaming = true;
153         }
154 
155         mStreamingThread = new StreamingThread();
156         mStreamingThread.start();
157         Log.i(TAG, "Streaming started");
158         return true;
159     }
160 
161     @Override
startStream(TunerChannel channel)162     public boolean startStream(TunerChannel channel) {
163         Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
164         mSource = new StreamProvider(channel.getFilepath());
165         if (!mSource.isReady()) {
166             return false;
167         }
168         mEventDetector.start(mSource, channel.getProgramNumber());
169         mSource.addPidFilter(channel.getVideoPid());
170         for (Integer i : channel.getAudioPids()) {
171             mSource.addPidFilter(i);
172         }
173         mSource.addPidFilter(channel.getPcrPid());
174         mSource.addPidFilter(TsParser.PAT_PID);
175         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
176         if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
177             mSource.addPidFilter(TsParser.DVB_EIT_PID);
178             mSource.addPidFilter(TsParser.DVB_SDT_PID);
179         }
180         synchronized (mCircularBufferMonitor) {
181             if (mStreaming) {
182                 return true;
183             }
184             mStreaming = true;
185         }
186 
187         mStreamingThread = new StreamingThread();
188         mStreamingThread.start();
189         Log.i(TAG, "Streaming started");
190         return true;
191     }
192 
193     /**
194      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
195      * device is overloaded this can take a while, but usually it returns pretty quickly.
196      */
197     @Override
stopStream()198     public void stopStream() {
199         synchronized (mCircularBufferMonitor) {
200             mStreaming = false;
201             mCircularBufferMonitor.notify();
202         }
203 
204         try {
205             if (mStreamingThread != null) {
206                 mStreamingThread.join();
207             }
208         } catch (InterruptedException e) {
209             Thread.currentThread().interrupt();
210         }
211     }
212 
213     @Override
createDataSource()214     public TsDataSource createDataSource() {
215         return new FileDataSource(this);
216     }
217 
218     /**
219      * Returns the current buffered position from the file.
220      *
221      * @return the current buffered position
222      */
getBufferedPosition()223     public long getBufferedPosition() {
224         synchronized (mCircularBufferMonitor) {
225             return mBytesFetched;
226         }
227     }
228 
229     /** Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. */
230     public static class StreamProvider {
231         private final String mFilepath;
232         private final SparseBooleanArray mPids = new SparseBooleanArray();
233         private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
234 
235         private BufferedInputStream mInputStream;
236 
StreamProvider(String filepath)237         private StreamProvider(String filepath) {
238             mFilepath = filepath;
239             open(filepath);
240         }
241 
open(String filepath)242         private void open(String filepath) {
243             try {
244                 mInputStream = new BufferedInputStream(new FileInputStream(filepath));
245             } catch (IOException e) {
246                 Log.e(TAG, "Error opening input stream", e);
247                 mInputStream = null;
248             }
249         }
250 
isReady()251         private boolean isReady() {
252             return mInputStream != null;
253         }
254 
255         /** Returns the file path of the MPEG-2 TS file. */
getFilepath()256         public String getFilepath() {
257             return mFilepath;
258         }
259 
260         /** Adds a pid for filtering from the MPEG-2 TS file. */
addPidFilter(int pid)261         public void addPidFilter(int pid) {
262             mPids.put(pid, true);
263         }
264 
265         /** Returns whether the current pid filter is empty or not. */
isFilterEmpty()266         public boolean isFilterEmpty() {
267             return mPids.size() == 0;
268         }
269 
270         /** Clears the current pid filter. */
clearPidFilter()271         public void clearPidFilter() {
272             mPids.clear();
273         }
274 
275         /**
276          * Returns whether a pid is in the pid filter or not.
277          *
278          * @param pid the pid to check
279          */
isInFilter(int pid)280         public boolean isInFilter(int pid) {
281             return mPids.get(pid);
282         }
283 
284         /**
285          * Reads from the MPEG-2 TS file to buffer.
286          *
287          * @param inputBuffer to read
288          * @return the number of read bytes
289          */
read(byte[] inputBuffer)290         private int read(byte[] inputBuffer) {
291             int readSize = readInternal();
292             if (readSize <= 0) {
293                 // Reached the end of stream. Restart from the beginning.
294                 close();
295                 open(mFilepath);
296                 if (mInputStream == null) {
297                     return -1;
298                 }
299                 readSize = readInternal();
300             }
301 
302             if (mPreBuffer[0] != TS_SYNC_BYTE) {
303                 Log.e(TAG, "Error reading input stream - no TS sync found");
304                 return -1;
305             }
306             int filteredSize = 0;
307             for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
308                 if (mPreBuffer[i] == TS_SYNC_BYTE) {
309                     int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
310                     if (mPids.get(pid)) {
311                         System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
312                         destPos += TS_PACKET_SIZE;
313                         filteredSize += TS_PACKET_SIZE;
314                     }
315                 }
316             }
317             return filteredSize;
318         }
319 
readInternal()320         private int readInternal() {
321             int readSize;
322             try {
323                 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
324             } catch (IOException e) {
325                 Log.e(TAG, "Error reading input stream", e);
326                 return -1;
327             }
328             return readSize;
329         }
330 
close()331         private void close() {
332             try {
333                 mInputStream.close();
334             } catch (IOException e) {
335                 Log.e(TAG, "Error closing input stream:", e);
336             }
337             mInputStream = null;
338         }
339     }
340 
341     /**
342      * Reads data from internal buffer.
343      *
344      * @param pos the position to read from
345      * @param buffer to read
346      * @param offset start position of the read buffer
347      * @param amount number of bytes to read
348      * @return number of read bytes when successful, {@code -1} otherwise
349      * @throws IOException
350      */
readAt(long pos, byte[] buffer, int offset, int amount)351     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
352         synchronized (mCircularBufferMonitor) {
353             long initialBytesFetched = mBytesFetched;
354             while (mBytesFetched < pos + amount && mStreaming) {
355                 try {
356                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
357                 } catch (InterruptedException e) {
358                     // Wait again.
359                     Thread.currentThread().interrupt();
360                 }
361                 if (initialBytesFetched == mBytesFetched) {
362                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
363 
364                     // Returning -1 will make demux report EOS so that the input service can retry
365                     // the playback.
366                     return -1;
367                 }
368             }
369             if (!mStreaming) {
370                 Log.w(TAG, "Stream is already stopped.");
371                 return -1;
372             }
373             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
374                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
375                 return -1;
376             }
377             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
378             int bytesToCopyInFirstPass = amount;
379             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
380                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
381             }
382             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
383             if (bytesToCopyInFirstPass < amount) {
384                 System.arraycopy(
385                         mCircularBuffer,
386                         0,
387                         buffer,
388                         offset + bytesToCopyInFirstPass,
389                         amount - bytesToCopyInFirstPass);
390             }
391             mLastReadPosition = pos + amount;
392             mCircularBufferMonitor.notify();
393             return amount;
394         }
395     }
396 
397     /**
398      * Adds {@link ScanChannel} instance for local files.
399      *
400      * @param output a list of channels where the results will be placed in
401      */
addLocalStreamFiles(List<ScanChannel> output)402     public static void addLocalStreamFiles(List<ScanChannel> output) {
403         File dir = new File(FILE_DIR);
404         if (!dir.exists()) return;
405 
406         File[] tsFiles = dir.listFiles();
407         if (tsFiles == null) return;
408         int freq = FileTsStreamer.FREQ_BASE;
409         for (File file : tsFiles) {
410             if (!file.isFile()) continue;
411             output.add(ScanChannel.forFile(freq, file.getName()));
412             freq += 100;
413         }
414     }
415 
416     /**
417      * A thread managing a circular buffer that holds stream data to be consumed by player. Keeps
418      * reading data in from a {@link StreamProvider} to hold enough amount for buffering. Started
419      * and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
420      */
421     private class StreamingThread extends Thread {
422         @Override
run()423         public void run() {
424             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
425 
426             synchronized (mCircularBufferMonitor) {
427                 mBytesFetched = 0;
428                 mLastReadPosition = 0;
429             }
430 
431             while (true) {
432                 synchronized (mCircularBufferMonitor) {
433                     while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
434                             && mStreaming) {
435                         try {
436                             mCircularBufferMonitor.wait();
437                         } catch (InterruptedException e) {
438                             // Wait again.
439                             Thread.currentThread().interrupt();
440                         }
441                     }
442                     if (!mStreaming) {
443                         break;
444                     }
445                 }
446 
447                 int bytesWritten = mSource.read(dataBuffer);
448                 if (bytesWritten <= 0) {
449                     try {
450                         // When buffer is underrun, we sleep for short time to prevent
451                         // unnecessary CPU draining.
452                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
453                     } catch (InterruptedException e) {
454                         Thread.currentThread().interrupt();
455                     }
456                     continue;
457                 }
458 
459                 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
460 
461                 synchronized (mCircularBufferMonitor) {
462                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
463                     int bytesToCopyInFirstPass = bytesWritten;
464                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
465                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
466                     }
467                     System.arraycopy(
468                             dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass);
469                     if (bytesToCopyInFirstPass < bytesWritten) {
470                         System.arraycopy(
471                                 dataBuffer,
472                                 bytesToCopyInFirstPass,
473                                 mCircularBuffer,
474                                 0,
475                                 bytesWritten - bytesToCopyInFirstPass);
476                     }
477                     mBytesFetched += bytesWritten;
478                     mCircularBufferMonitor.notify();
479                 }
480             }
481 
482             Log.i(TAG, "Streaming stopped");
483             mSource.close();
484         }
485     }
486 }
487