• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.tuner.source;
18 
19 import android.os.Environment;
20 import android.util.Log;
21 import android.util.SparseBooleanArray;
22 
23 import com.google.android.exoplayer.C;
24 import com.google.android.exoplayer.upstream.DataSpec;
25 import com.android.tv.common.SoftPreconditions;
26 import com.android.tv.tuner.ChannelScanFileParser.ScanChannel;
27 import com.android.tv.tuner.data.TunerChannel;
28 import com.android.tv.tuner.ts.TsParser;
29 import com.android.tv.tuner.tvinput.EventDetector;
30 import com.android.tv.tuner.tvinput.FileSourceEventDetector;
31 
32 import java.io.BufferedInputStream;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.IOException;
36 import java.util.List;
37 import java.util.concurrent.atomic.AtomicLong;
38 
39 /**
40  * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file
41  * generated by capturing TV signal.
42  */
43 public class FileTsStreamer implements TsStreamer {
44     private static final String TAG = "FileTsStreamer";
45 
46     private static final int TS_PACKET_SIZE = 188;
47     private static final int TS_SYNC_BYTE = 0x47;
48     private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
49     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
50     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
51     private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
52     private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
53     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
54     private static final String FILE_DIR =
55             new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();
56 
57     // Virtual frequency base used for file-based source
58     public static final int FREQ_BASE = 100;
59 
60     private final Object mCircularBufferMonitor = new Object();
61     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
62     private final FileSourceEventDetector mEventDetector;
63 
64     private long mBytesFetched;
65     private long mLastReadPosition;
66     private boolean mStreaming;
67 
68     private Thread mStreamingThread;
69     private StreamProvider mSource;
70 
71     public static class FileDataSource extends TsDataSource {
72         private final FileTsStreamer mTsStreamer;
73         private final AtomicLong mLastReadPosition = new AtomicLong(0);
74         private long mStartBufferedPosition;
75 
FileDataSource(FileTsStreamer tsStreamer)76         private FileDataSource(FileTsStreamer tsStreamer) {
77             mTsStreamer = tsStreamer;
78             mStartBufferedPosition = tsStreamer.getBufferedPosition();
79         }
80 
81         @Override
getBufferedPosition()82         public long getBufferedPosition() {
83             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
84         }
85 
86         @Override
getLastReadPosition()87         public long getLastReadPosition() {
88             return mLastReadPosition.get();
89         }
90 
91         @Override
shiftStartPosition(long offset)92         public void shiftStartPosition(long offset) {
93             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
94             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
95             mStartBufferedPosition += offset;
96         }
97 
98         @Override
open(DataSpec dataSpec)99         public long open(DataSpec dataSpec) throws IOException {
100             mLastReadPosition.set(0);
101             return C.LENGTH_UNBOUNDED;
102         }
103 
104         @Override
close()105         public void close() {
106         }
107 
108         @Override
read(byte[] buffer, int offset, int readLength)109         public int read(byte[] buffer, int offset, int readLength) throws IOException {
110             int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
111                     offset, readLength);
112             if (ret > 0) {
113                 mLastReadPosition.addAndGet(ret);
114             }
115             return ret;
116         }
117     }
118 
119     /**
120      * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
121      * @param eventListener the listener for channel & program information
122      */
FileTsStreamer(EventDetector.EventListener eventListener)123     public FileTsStreamer(EventDetector.EventListener eventListener) {
124         mEventDetector = new FileSourceEventDetector(eventListener);
125     }
126 
127     @Override
startStream(ScanChannel channel)128     public boolean startStream(ScanChannel channel) {
129         String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
130         mSource = new StreamProvider(filepath);
131         if (!mSource.isReady()) {
132             return false;
133         }
134         mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
135         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
136         mSource.addPidFilter(TsParser.PAT_PID);
137         synchronized (mCircularBufferMonitor) {
138             if (mStreaming) {
139                 return true;
140             }
141             mStreaming = true;
142         }
143 
144         mStreamingThread = new StreamingThread();
145         mStreamingThread.start();
146         Log.i(TAG, "Streaming started");
147         return true;
148     }
149 
150     @Override
startStream(TunerChannel channel)151     public boolean startStream(TunerChannel channel) {
152         Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
153         mSource = new StreamProvider(channel.getFilepath());
154         if (!mSource.isReady()) {
155             return false;
156         }
157         mEventDetector.start(mSource, channel.getProgramNumber());
158         mSource.addPidFilter(channel.getVideoPid());
159         for (Integer i : channel.getAudioPids()) {
160             mSource.addPidFilter(i);
161         }
162         mSource.addPidFilter(channel.getPcrPid());
163         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
164         mSource.addPidFilter(TsParser.PAT_PID);
165         synchronized (mCircularBufferMonitor) {
166             if (mStreaming) {
167                 return true;
168             }
169             mStreaming = true;
170         }
171 
172         mStreamingThread = new StreamingThread();
173         mStreamingThread.start();
174         Log.i(TAG, "Streaming started");
175         return true;
176     }
177 
178     /**
179      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
180      * device is overloaded this can take a while, but usually it returns pretty quickly.
181      */
182     @Override
stopStream()183     public void stopStream() {
184         synchronized (mCircularBufferMonitor) {
185             mStreaming = false;
186             mCircularBufferMonitor.notify();
187         }
188 
189         try {
190             if (mStreamingThread != null) {
191                 mStreamingThread.join();
192             }
193         } catch (InterruptedException e) {
194             Thread.currentThread().interrupt();
195         }
196     }
197 
198     @Override
createDataSource()199     public TsDataSource createDataSource() {
200         return new FileDataSource(this);
201     }
202 
203     /**
204      * Returns the current buffered position from the file.
205      * @return the current buffered position
206      */
getBufferedPosition()207     public long getBufferedPosition() {
208         synchronized (mCircularBufferMonitor) {
209             return mBytesFetched;
210         }
211     }
212 
213     /**
214      * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID.
215      */
216     public static class StreamProvider {
217         private final String mFilepath;
218         private final SparseBooleanArray mPids = new SparseBooleanArray();
219         private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
220 
221         private BufferedInputStream mInputStream;
222 
StreamProvider(String filepath)223         private StreamProvider(String filepath) {
224             mFilepath = filepath;
225             open(filepath);
226         }
227 
open(String filepath)228         private void open(String filepath) {
229             try {
230                 mInputStream = new BufferedInputStream(new FileInputStream(filepath));
231             } catch (IOException e) {
232                 Log.e(TAG, "Error opening input stream", e);
233                 mInputStream = null;
234             }
235         }
236 
isReady()237         private boolean isReady() {
238             return mInputStream != null;
239         }
240 
241         /**
242          * Returns the file path of the MPEG-2 TS file.
243          */
getFilepath()244         public String getFilepath() {
245             return mFilepath;
246         }
247 
248         /**
249          * Adds a pid for filtering from the MPEG-2 TS file.
250          */
addPidFilter(int pid)251         public void addPidFilter(int pid) {
252             mPids.put(pid, true);
253         }
254 
255         /**
256          * Returns whether the current pid filter is empty or not.
257          */
isFilterEmpty()258         public boolean isFilterEmpty() {
259             return mPids.size() > 0;
260         }
261 
262         /**
263          * Clears the current pid filter.
264          */
clearPidFilter()265         public void clearPidFilter() {
266             mPids.clear();
267         }
268 
269         /**
270          * Returns whether a pid is in the pid filter or not.
271          * @param pid the pid to check
272          */
isInFilter(int pid)273         public boolean isInFilter(int pid) {
274             return mPids.get(pid);
275         }
276 
277         /**
278          * Reads from the MPEG-2 TS file to buffer.
279          *
280          * @param inputBuffer to read
281          * @return the number of read bytes
282          */
read(byte[] inputBuffer)283         private int read(byte[] inputBuffer) {
284             int readSize = readInternal();
285             if (readSize <= 0) {
286                 // Reached the end of stream. Restart from the beginning.
287                 close();
288                 open(mFilepath);
289                 if (mInputStream == null) {
290                     return -1;
291                 }
292                 readSize = readInternal();
293             }
294 
295             if (mPreBuffer[0] != TS_SYNC_BYTE) {
296                 Log.e(TAG, "Error reading input stream - no TS sync found");
297                 return -1;
298             }
299             int filteredSize = 0;
300             for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
301                 if (mPreBuffer[i] == TS_SYNC_BYTE) {
302                     int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
303                     if (mPids.get(pid)) {
304                         System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
305                         destPos += TS_PACKET_SIZE;
306                         filteredSize += TS_PACKET_SIZE;
307                     }
308                 }
309             }
310             return filteredSize;
311         }
312 
readInternal()313         private int readInternal() {
314             int readSize;
315             try {
316                 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
317             } catch (IOException e) {
318                 Log.e(TAG, "Error reading input stream", e);
319                 return -1;
320             }
321             return readSize;
322         }
323 
close()324         private void close() {
325             try {
326                 mInputStream.close();
327             } catch (IOException e) {
328                 Log.e(TAG, "Error closing input stream:", e);
329             }
330             mInputStream = null;
331         }
332     }
333 
334     /**
335      * Reads data from internal buffer.
336      * @param pos the position to read from
337      * @param buffer to read
338      * @param offset start position of the read buffer
339      * @param amount number of bytes to read
340      * @return number of read bytes when successful, {@code -1} otherwise
341      * @throws IOException
342      */
readAt(long pos, byte[] buffer, int offset, int amount)343     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
344         synchronized (mCircularBufferMonitor) {
345             long initialBytesFetched = mBytesFetched;
346             while (mBytesFetched < pos + amount && mStreaming) {
347                 try {
348                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
349                 } catch (InterruptedException e) {
350                     // Wait again.
351                     Thread.currentThread().interrupt();
352                 }
353                 if (initialBytesFetched == mBytesFetched) {
354                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
355 
356                     // Returning -1 will make demux report EOS so that the input service can retry
357                     // the playback.
358                     return -1;
359                 }
360             }
361             if (!mStreaming) {
362                 Log.w(TAG, "Stream is already stopped.");
363                 return -1;
364             }
365             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
366                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
367                 return -1;
368             }
369             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
370             int bytesToCopyInFirstPass = amount;
371             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
372                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
373             }
374             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
375             if (bytesToCopyInFirstPass < amount) {
376                 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass,
377                         amount - bytesToCopyInFirstPass);
378             }
379             mLastReadPosition = pos + amount;
380             mCircularBufferMonitor.notify();
381             return amount;
382         }
383     }
384 
385     /**
386      * Adds {@link ScanChannel} instance for local files.
387      *
388      * @param output a list of channels where the results will be placed in
389      */
addLocalStreamFiles(List<ScanChannel> output)390     public static void addLocalStreamFiles(List<ScanChannel> output) {
391         File dir = new File(FILE_DIR);
392         if (!dir.exists()) return;
393 
394         File[] tsFiles = dir.listFiles();
395         if (tsFiles == null) return;
396         int freq = FileTsStreamer.FREQ_BASE;
397         for (File file : tsFiles) {
398             if (!file.isFile()) continue;
399             output.add(ScanChannel.forFile(freq, file.getName()));
400             freq += 100;
401         }
402     }
403 
404     /**
405      * A thread managing a circular buffer that holds stream data to be consumed by player.
406      * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering.
407      * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
408      */
409     private class StreamingThread extends Thread {
410         @Override
run()411         public void run() {
412             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
413 
414             synchronized (mCircularBufferMonitor) {
415                 mBytesFetched = 0;
416                 mLastReadPosition = 0;
417             }
418 
419             while (true) {
420                 synchronized (mCircularBufferMonitor) {
421                     while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
422                             && mStreaming) {
423                         try {
424                             mCircularBufferMonitor.wait();
425                         } catch (InterruptedException e) {
426                             // Wait again.
427                             Thread.currentThread().interrupt();
428                         }
429                     }
430                     if (!mStreaming) {
431                         break;
432                     }
433                 }
434 
435                 int bytesWritten = mSource.read(dataBuffer);
436                 if (bytesWritten <= 0) {
437                     try {
438                         // When buffer is underrun, we sleep for short time to prevent
439                         // unnecessary CPU draining.
440                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
441                     } catch (InterruptedException e) {
442                         Thread.currentThread().interrupt();
443                     }
444                     continue;
445                 }
446 
447                 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
448 
449                 synchronized (mCircularBufferMonitor) {
450                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
451                     int bytesToCopyInFirstPass = bytesWritten;
452                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
453                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
454                     }
455                     System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
456                             bytesToCopyInFirstPass);
457                     if (bytesToCopyInFirstPass < bytesWritten) {
458                         System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
459                                 bytesWritten - bytesToCopyInFirstPass);
460                     }
461                     mBytesFetched += bytesWritten;
462                     mCircularBufferMonitor.notify();
463                 }
464             }
465 
466             Log.i(TAG, "Streaming stopped");
467             mSource.close();
468         }
469     }
470 }
471