• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.android.tv.tuner.source;
18 
19 import android.content.Context;
20 import android.os.Environment;
21 import android.util.Log;
22 import android.util.SparseBooleanArray;
23 
24 import com.google.android.exoplayer.C;
25 import com.google.android.exoplayer.upstream.DataSpec;
26 import com.android.tv.Features;
27 import com.android.tv.common.SoftPreconditions;
28 import com.android.tv.tuner.ChannelScanFileParser.ScanChannel;
29 import com.android.tv.tuner.data.TunerChannel;
30 import com.android.tv.tuner.ts.TsParser;
31 import com.android.tv.tuner.tvinput.EventDetector;
32 import com.android.tv.tuner.tvinput.FileSourceEventDetector;
33 
34 import java.io.BufferedInputStream;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.util.List;
39 import java.util.concurrent.atomic.AtomicLong;
40 
/**
 * Provides MPEG-2 TS stream sources for both channel scanning and channel playback from a local
 * file generated by capturing a TV signal.
 */
45 public class FileTsStreamer implements TsStreamer {
46     private static final String TAG = "FileTsStreamer";
47 
48     private static final int TS_PACKET_SIZE = 188;
49     private static final int TS_SYNC_BYTE = 0x47;
50     private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
51     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
52     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
53     private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
54     private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
55     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
56     private static final String FILE_DIR =
57             new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();
58 
59     // Virtual frequency base used for file-based source
60     public static final int FREQ_BASE = 100;
61 
62     private final Object mCircularBufferMonitor = new Object();
63     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
64     private final FileSourceEventDetector mEventDetector;
65     private final Context mContext;
66 
67     private long mBytesFetched;
68     private long mLastReadPosition;
69     private boolean mStreaming;
70 
71     private Thread mStreamingThread;
72     private StreamProvider mSource;
73 
74     public static class FileDataSource extends TsDataSource {
75         private final FileTsStreamer mTsStreamer;
76         private final AtomicLong mLastReadPosition = new AtomicLong(0);
77         private long mStartBufferedPosition;
78 
FileDataSource(FileTsStreamer tsStreamer)79         private FileDataSource(FileTsStreamer tsStreamer) {
80             mTsStreamer = tsStreamer;
81             mStartBufferedPosition = tsStreamer.getBufferedPosition();
82         }
83 
84         @Override
getBufferedPosition()85         public long getBufferedPosition() {
86             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
87         }
88 
89         @Override
getLastReadPosition()90         public long getLastReadPosition() {
91             return mLastReadPosition.get();
92         }
93 
94         @Override
shiftStartPosition(long offset)95         public void shiftStartPosition(long offset) {
96             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
97             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
98             mStartBufferedPosition += offset;
99         }
100 
101         @Override
open(DataSpec dataSpec)102         public long open(DataSpec dataSpec) throws IOException {
103             mLastReadPosition.set(0);
104             return C.LENGTH_UNBOUNDED;
105         }
106 
107         @Override
close()108         public void close() {
109         }
110 
111         @Override
read(byte[] buffer, int offset, int readLength)112         public int read(byte[] buffer, int offset, int readLength) throws IOException {
113             int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
114                     offset, readLength);
115             if (ret > 0) {
116                 mLastReadPosition.addAndGet(ret);
117             }
118             return ret;
119         }
120     }
121 
122     /**
123      * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
124      * @param eventListener the listener for channel & program information
125      */
FileTsStreamer(EventDetector.EventListener eventListener, Context context)126     public FileTsStreamer(EventDetector.EventListener eventListener, Context context) {
127         mEventDetector =
128                 new FileSourceEventDetector(
129                         eventListener, Features.ENABLE_FILE_DVB.isEnabled(context));
130         mContext = context;
131     }
132 
133     @Override
startStream(ScanChannel channel)134     public boolean startStream(ScanChannel channel) {
135         String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
136         mSource = new StreamProvider(filepath);
137         if (!mSource.isReady()) {
138             return false;
139         }
140         mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
141         mSource.addPidFilter(TsParser.PAT_PID);
142         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
143         if (Features.ENABLE_FILE_DVB.isEnabled(mContext)) {
144             mSource.addPidFilter(TsParser.DVB_EIT_PID);
145             mSource.addPidFilter(TsParser.DVB_SDT_PID);
146         }
147         synchronized (mCircularBufferMonitor) {
148             if (mStreaming) {
149                 return true;
150             }
151             mStreaming = true;
152         }
153 
154         mStreamingThread = new StreamingThread();
155         mStreamingThread.start();
156         Log.i(TAG, "Streaming started");
157         return true;
158     }
159 
160     @Override
startStream(TunerChannel channel)161     public boolean startStream(TunerChannel channel) {
162         Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
163         mSource = new StreamProvider(channel.getFilepath());
164         if (!mSource.isReady()) {
165             return false;
166         }
167         mEventDetector.start(mSource, channel.getProgramNumber());
168         mSource.addPidFilter(channel.getVideoPid());
169         for (Integer i : channel.getAudioPids()) {
170             mSource.addPidFilter(i);
171         }
172         mSource.addPidFilter(channel.getPcrPid());
173         mSource.addPidFilter(TsParser.PAT_PID);
174         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
175         if (Features.ENABLE_FILE_DVB.isEnabled(mContext)) {
176             mSource.addPidFilter(TsParser.DVB_EIT_PID);
177             mSource.addPidFilter(TsParser.DVB_SDT_PID);
178         }
179         synchronized (mCircularBufferMonitor) {
180             if (mStreaming) {
181                 return true;
182             }
183             mStreaming = true;
184         }
185 
186         mStreamingThread = new StreamingThread();
187         mStreamingThread.start();
188         Log.i(TAG, "Streaming started");
189         return true;
190     }
191 
192     /**
193      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
194      * device is overloaded this can take a while, but usually it returns pretty quickly.
195      */
196     @Override
stopStream()197     public void stopStream() {
198         synchronized (mCircularBufferMonitor) {
199             mStreaming = false;
200             mCircularBufferMonitor.notify();
201         }
202 
203         try {
204             if (mStreamingThread != null) {
205                 mStreamingThread.join();
206             }
207         } catch (InterruptedException e) {
208             Thread.currentThread().interrupt();
209         }
210     }
211 
212     @Override
createDataSource()213     public TsDataSource createDataSource() {
214         return new FileDataSource(this);
215     }
216 
217     /**
218      * Returns the current buffered position from the file.
219      * @return the current buffered position
220      */
getBufferedPosition()221     public long getBufferedPosition() {
222         synchronized (mCircularBufferMonitor) {
223             return mBytesFetched;
224         }
225     }
226 
227     /**
228      * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID.
229      */
230     public static class StreamProvider {
231         private final String mFilepath;
232         private final SparseBooleanArray mPids = new SparseBooleanArray();
233         private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
234 
235         private BufferedInputStream mInputStream;
236 
StreamProvider(String filepath)237         private StreamProvider(String filepath) {
238             mFilepath = filepath;
239             open(filepath);
240         }
241 
open(String filepath)242         private void open(String filepath) {
243             try {
244                 mInputStream = new BufferedInputStream(new FileInputStream(filepath));
245             } catch (IOException e) {
246                 Log.e(TAG, "Error opening input stream", e);
247                 mInputStream = null;
248             }
249         }
250 
isReady()251         private boolean isReady() {
252             return mInputStream != null;
253         }
254 
255         /**
256          * Returns the file path of the MPEG-2 TS file.
257          */
getFilepath()258         public String getFilepath() {
259             return mFilepath;
260         }
261 
262         /**
263          * Adds a pid for filtering from the MPEG-2 TS file.
264          */
addPidFilter(int pid)265         public void addPidFilter(int pid) {
266             mPids.put(pid, true);
267         }
268 
269         /**
270          * Returns whether the current pid filter is empty or not.
271          */
isFilterEmpty()272         public boolean isFilterEmpty() {
273             return mPids.size() == 0;
274         }
275 
276         /**
277          * Clears the current pid filter.
278          */
clearPidFilter()279         public void clearPidFilter() {
280             mPids.clear();
281         }
282 
283         /**
284          * Returns whether a pid is in the pid filter or not.
285          * @param pid the pid to check
286          */
isInFilter(int pid)287         public boolean isInFilter(int pid) {
288             return mPids.get(pid);
289         }
290 
291         /**
292          * Reads from the MPEG-2 TS file to buffer.
293          *
294          * @param inputBuffer to read
295          * @return the number of read bytes
296          */
read(byte[] inputBuffer)297         private int read(byte[] inputBuffer) {
298             int readSize = readInternal();
299             if (readSize <= 0) {
300                 // Reached the end of stream. Restart from the beginning.
301                 close();
302                 open(mFilepath);
303                 if (mInputStream == null) {
304                     return -1;
305                 }
306                 readSize = readInternal();
307             }
308 
309             if (mPreBuffer[0] != TS_SYNC_BYTE) {
310                 Log.e(TAG, "Error reading input stream - no TS sync found");
311                 return -1;
312             }
313             int filteredSize = 0;
314             for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
315                 if (mPreBuffer[i] == TS_SYNC_BYTE) {
316                     int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
317                     if (mPids.get(pid)) {
318                         System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
319                         destPos += TS_PACKET_SIZE;
320                         filteredSize += TS_PACKET_SIZE;
321                     }
322                 }
323             }
324             return filteredSize;
325         }
326 
readInternal()327         private int readInternal() {
328             int readSize;
329             try {
330                 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
331             } catch (IOException e) {
332                 Log.e(TAG, "Error reading input stream", e);
333                 return -1;
334             }
335             return readSize;
336         }
337 
close()338         private void close() {
339             try {
340                 mInputStream.close();
341             } catch (IOException e) {
342                 Log.e(TAG, "Error closing input stream:", e);
343             }
344             mInputStream = null;
345         }
346     }
347 
348     /**
349      * Reads data from internal buffer.
350      * @param pos the position to read from
351      * @param buffer to read
352      * @param offset start position of the read buffer
353      * @param amount number of bytes to read
354      * @return number of read bytes when successful, {@code -1} otherwise
355      * @throws IOException
356      */
readAt(long pos, byte[] buffer, int offset, int amount)357     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
358         synchronized (mCircularBufferMonitor) {
359             long initialBytesFetched = mBytesFetched;
360             while (mBytesFetched < pos + amount && mStreaming) {
361                 try {
362                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
363                 } catch (InterruptedException e) {
364                     // Wait again.
365                     Thread.currentThread().interrupt();
366                 }
367                 if (initialBytesFetched == mBytesFetched) {
368                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
369 
370                     // Returning -1 will make demux report EOS so that the input service can retry
371                     // the playback.
372                     return -1;
373                 }
374             }
375             if (!mStreaming) {
376                 Log.w(TAG, "Stream is already stopped.");
377                 return -1;
378             }
379             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
380                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
381                 return -1;
382             }
383             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
384             int bytesToCopyInFirstPass = amount;
385             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
386                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
387             }
388             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
389             if (bytesToCopyInFirstPass < amount) {
390                 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass,
391                         amount - bytesToCopyInFirstPass);
392             }
393             mLastReadPosition = pos + amount;
394             mCircularBufferMonitor.notify();
395             return amount;
396         }
397     }
398 
399     /**
400      * Adds {@link ScanChannel} instance for local files.
401      *
402      * @param output a list of channels where the results will be placed in
403      */
addLocalStreamFiles(List<ScanChannel> output)404     public static void addLocalStreamFiles(List<ScanChannel> output) {
405         File dir = new File(FILE_DIR);
406         if (!dir.exists()) return;
407 
408         File[] tsFiles = dir.listFiles();
409         if (tsFiles == null) return;
410         int freq = FileTsStreamer.FREQ_BASE;
411         for (File file : tsFiles) {
412             if (!file.isFile()) continue;
413             output.add(ScanChannel.forFile(freq, file.getName()));
414             freq += 100;
415         }
416     }
417 
418     /**
419      * A thread managing a circular buffer that holds stream data to be consumed by player.
420      * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering.
421      * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
422      */
423     private class StreamingThread extends Thread {
424         @Override
run()425         public void run() {
426             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
427 
428             synchronized (mCircularBufferMonitor) {
429                 mBytesFetched = 0;
430                 mLastReadPosition = 0;
431             }
432 
433             while (true) {
434                 synchronized (mCircularBufferMonitor) {
435                     while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
436                             && mStreaming) {
437                         try {
438                             mCircularBufferMonitor.wait();
439                         } catch (InterruptedException e) {
440                             // Wait again.
441                             Thread.currentThread().interrupt();
442                         }
443                     }
444                     if (!mStreaming) {
445                         break;
446                     }
447                 }
448 
449                 int bytesWritten = mSource.read(dataBuffer);
450                 if (bytesWritten <= 0) {
451                     try {
452                         // When buffer is underrun, we sleep for short time to prevent
453                         // unnecessary CPU draining.
454                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
455                     } catch (InterruptedException e) {
456                         Thread.currentThread().interrupt();
457                     }
458                     continue;
459                 }
460 
461                 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
462 
463                 synchronized (mCircularBufferMonitor) {
464                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
465                     int bytesToCopyInFirstPass = bytesWritten;
466                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
467                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
468                     }
469                     System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
470                             bytesToCopyInFirstPass);
471                     if (bytesToCopyInFirstPass < bytesWritten) {
472                         System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
473                                 bytesWritten - bytesToCopyInFirstPass);
474                     }
475                     mBytesFetched += bytesWritten;
476                     mCircularBufferMonitor.notify();
477                 }
478             }
479 
480             Log.i(TAG, "Streaming stopped");
481             mSource.close();
482         }
483     }
484 }
485