/*
 * Copyright (C) 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.usbtuner;

import android.media.MediaDataSource;
import android.os.Environment;
import android.util.Log;
import android.util.SparseBooleanArray;

import com.android.usbtuner.ChannelScanFileParser.ScanChannel;
import com.android.usbtuner.data.Channel;
import com.android.usbtuner.data.TunerChannel;
import com.android.usbtuner.ts.TsParser;
import com.android.usbtuner.tvinput.EventDetector;
import com.android.usbtuner.tvinput.FileSourceEventDetector;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;

/**
 * A {@link DataSource} implementation which provides the MPEG-2 TS stream from a local file
 * generated by capturing TV signal.
 */
public class FileDataSource extends MediaDataSource implements InputStreamSource {
    private static final String TAG = "FileDataSource";

    private static final int TS_PACKET_SIZE = 188;
    private static final int TS_SYNC_BYTE = 0x47;
    private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
    private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
    private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
    private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
    private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
    private static final long NANOS_PER_MS = 1000000L;
    private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
    private static final String FILE_DIR =
            new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();

    // Virtual frequency base used for file-based source
    public static final int FREQ_BASE = 100;

    // Guards mCircularBuffer, mBytesFetched, mLastReadPosition and mStreaming. Both the reader
    // (readAt) and the streaming thread wait on it, hence notifyAll() is used everywhere.
    private final Object mCircularBufferMonitor = new Object();
    private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
    private final FileSourceEventDetector mEventDetector;

    // Total number of filtered bytes written into the circular buffer so far.
    private long mBytesFetched;
    // Stream position just past the last byte handed out by readAt().
    private long mLastReadPosition;
    private boolean mStreaming;

    private Thread mStreamingThread;
    private StreamProvider mSource;

    public FileDataSource(EventDetector.EventListener eventListener) {
        mEventDetector = new FileSourceEventDetector(eventListener);
    }

    /**
     * Opens the file named by the given scan channel (resolved against the local stream
     * directory) and starts the event detector on it.
     *
     * @param channel the scan channel whose {@code filename} should be opened
     * @return {@code true} if the file could be opened; {@code false} otherwise
     */
    @Override
    public boolean setScanChannel(ScanChannel channel) {
        String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
        mSource = new StreamProvider(filepath);
        if (mSource.isReady()) {
            mEventDetector.start(mSource);
            return true;
        }
        return false;
    }

    /**
     * Sets the channel required to start streaming from this device. Afterwards, prepares
     * the tuner device for streaming. Packet retrieval can be made at any time after invoking
     * this method and before stopping the stream.
     *
     * @param channel a {@link TunerChannel} instance tune to
     * @return {@code true} if the entire operation was successful; {@code false} otherwise
     */
    @Override
    public boolean tuneToChannel(TunerChannel channel) {
        Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
        mSource = new StreamProvider(channel.getFilepath());
        if (!mSource.isReady()) {
            return false;
        }
        mEventDetector.start(mSource);
        mSource.addPidFilter(channel.getVideoPid());
        mSource.addPidFilter(channel.getAudioPid());
        mSource.addPidFilter(channel.getPcrPid());
        return true;
    }

    /**
     * Starts streaming data. Does nothing (besides registering the SI/PAT PID filters) when
     * streaming is already in progress, and nothing at all when no channel has been set yet.
     */
    @Override
    public void startStream() {
        if (mSource == null) {
            // Neither setScanChannel() nor tuneToChannel() has been called; there is nothing
            // to stream from.
            Log.e(TAG, "startStream called before a channel was set");
            return;
        }
        mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
        mSource.addPidFilter(TsParser.PAT_PID);
        synchronized (mCircularBufferMonitor) {
            if (mStreaming) {
                return;
            }
            mStreaming = true;
        }

        mStreamingThread = new StreamingThread();
        mStreamingThread.start();
        Log.i(TAG, "Streaming started");
    }

    /**
     * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
     * device is overloaded this can take a while, but usually it returns pretty quickly.
     */
    @Override
    public void stopStream() {
        synchronized (mCircularBufferMonitor) {
            mStreaming = false;
            // Wake every waiter: a blocked reader and the streaming thread may both be parked
            // on this monitor, and the join() below needs the streaming thread to wake up.
            mCircularBufferMonitor.notifyAll();
        }

        try {
            if (mStreamingThread != null) {
                mStreamingThread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the total number of bytes fetched into the circular buffer so far
     */
    @Override
    public long getLimit() {
        synchronized (mCircularBufferMonitor) {
            return mBytesFetched;
        }
    }

    /**
     * @return the stream position just past the most recent {@link #readAt} request
     */
    @Override
    public long getPosition() {
        synchronized (mCircularBufferMonitor) {
            return mLastReadPosition;
        }
    }

    /**
     * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID.
     */
    public static class StreamProvider {
        private final String mFilepath;
        private final SparseBooleanArray mPids = new SparseBooleanArray();
        private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];

        // null whenever the file is not (or could not be) opened.
        private BufferedInputStream mInputStream;

        private StreamProvider(String filepath) {
            mFilepath = filepath;
            open(filepath);
        }

        /** Opens the file; leaves {@code mInputStream} as {@code null} on failure. */
        private void open(String filepath) {
            try {
                mInputStream = new BufferedInputStream(new FileInputStream(filepath));
            } catch (IOException e) {
                Log.e(TAG, "Error opening input stream", e);
                mInputStream = null;
            }
        }

        private boolean isReady() {
            return mInputStream != null;
        }

        public String getFilepath() {
            return mFilepath;
        }

        public void addPidFilter(int pid) {
            mPids.put(pid, true);
        }

        /**
         * @return {@code true} if no PID has been registered in the filter
         */
        public boolean isFilterEmpty() {
            return mPids.size() == 0;
        }

        public void clearPidFilter() {
            mPids.clear();
        }

        public boolean isInFilter(int pid) {
            return mPids.get(pid);
        }

        /**
         * Reads the next chunk of the file and copies the TS packets whose PID is in the filter
         * into {@code inputBuffer}, back to back. When the end of the file is reached the file
         * is reopened and reading restarts from the beginning.
         *
         * @param inputBuffer destination; must hold at least {@code READ_BUFFER_SIZE} bytes
         * @return the number of bytes copied (a multiple of the TS packet size, possibly 0), or
         *         -1 if nothing could be read or the chunk does not start with a TS sync byte
         */
        private int read(byte[] inputBuffer) {
            int readSize = readInternal();
            if (readSize <= 0) {
                // Reached the end of stream. Restart from the beginning.
                close();
                open(mFilepath);
                if (mInputStream == null) {
                    return -1;
                }
                readSize = readInternal();
                if (readSize <= 0) {
                    // Still nothing right after reopening (e.g. an empty file). Bail out
                    // instead of inspecting stale bytes left in mPreBuffer.
                    return -1;
                }
            }

            if (mPreBuffer[0] != TS_SYNC_BYTE) {
                Log.e(TAG, "Error reading input stream - no TS sync found");
                return -1;
            }
            int filteredSize = 0;
            // Only complete packets are considered; a truncated trailing packet would otherwise
            // be padded with stale bytes from a previous read.
            for (int i = 0; i + TS_PACKET_SIZE <= readSize; i += TS_PACKET_SIZE) {
                if (mPreBuffer[i] != TS_SYNC_BYTE) {
                    continue;
                }
                // 13-bit PID: low 5 bits of byte 1 followed by all of byte 2.
                int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
                if (mPids.get(pid)) {
                    System.arraycopy(mPreBuffer, i, inputBuffer, filteredSize, TS_PACKET_SIZE);
                    filteredSize += TS_PACKET_SIZE;
                }
            }
            return filteredSize;
        }

        /**
         * Fills {@code mPreBuffer} from the file.
         *
         * @return the number of bytes read, or -1 on end of stream, I/O error, or when the
         *         stream is not open
         */
        private int readInternal() {
            if (mInputStream == null) {
                return -1;
            }
            try {
                return mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
            } catch (IOException e) {
                Log.e(TAG, "Error reading input stream", e);
                return -1;
            }
        }

        /** Closes the underlying stream if it is open. Safe to call repeatedly. */
        private void close() {
            if (mInputStream == null) {
                return;
            }
            try {
                mInputStream.close();
            } catch (IOException e) {
                Log.e(TAG, "Error closing input stream:", e);
            }
            mInputStream = null;
        }
    }

    /**
     * Copies {@code amount} bytes starting at stream position {@code pos} out of the circular
     * buffer, blocking until the streaming thread has fetched enough data.
     *
     * @param pos absolute position in the filtered stream
     * @param buffer destination array
     * @param offset index in {@code buffer} at which to start writing
     * @param amount number of bytes requested
     * @return {@code amount} on success; -1 if streaming is stopped, if no new data arrived
     *         for {@code READ_TIMEOUT_MS}, or if the requested data was already overwritten
     */
    @Override
    public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
        // Interrupts do not abort the wait; the flag is restored once we are done so that
        // wait() does not keep throwing immediately while the monitor is held.
        boolean interrupted = false;
        try {
            synchronized (mCircularBufferMonitor) {
                long lastSeenBytesFetched = mBytesFetched;
                long lastProgressNanos = System.nanoTime();
                while (mBytesFetched < pos + amount && mStreaming) {
                    long remainingMs = READ_TIMEOUT_MS
                            - (System.nanoTime() - lastProgressNanos) / NANOS_PER_MS;
                    if (remainingMs <= 0) {
                        Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");

                        // Returning -1 will make demux report EOS so that the input service
                        // can retry the playback.
                        return -1;
                    }
                    try {
                        mCircularBufferMonitor.wait(remainingMs);
                    } catch (InterruptedException e) {
                        // Wait again.
                        interrupted = true;
                    }
                    if (mBytesFetched != lastSeenBytesFetched) {
                        // The producer made progress; restart the stall timer.
                        lastSeenBytesFetched = mBytesFetched;
                        lastProgressNanos = System.nanoTime();
                    }
                }
                if (!mStreaming) {
                    Log.w(TAG, "Stream is already stopped.");
                    return -1;
                }
                if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
                    Log.e(TAG, "Demux is requesting the data which is already overwritten.");
                    return -1;
                }
                int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
                int bytesToCopyInFirstPass = amount;
                if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
                    // The request wraps around the end of the circular buffer.
                    bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
                }
                System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset,
                        bytesToCopyInFirstPass);
                if (bytesToCopyInFirstPass < amount) {
                    System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass,
                            amount - bytesToCopyInFirstPass);
                }
                mLastReadPosition = pos + amount;
                // Space may have been freed for the streaming thread.
                mCircularBufferMonitor.notifyAll();
                return amount;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return always -1, i.e. the size of the stream is not known
     */
    @Override
    public long getSize() throws IOException {
        return -1;
    }

    @Override
    public void close() {}

    @Override
    public int getType() {
        return Channel.TYPE_FILE;
    }

    /**
     * Adds {@link ScanChannel} instance for local files.
     *
     * @param output a list of channels where the results will be placed in
     */
    public static void addLocalStreamFiles(List<ScanChannel> output) {
        File dir = new File(FILE_DIR);
        if (!dir.exists()) {
            return;
        }

        File[] tsFiles = dir.listFiles();
        if (tsFiles == null) {
            return;
        }
        // Each regular file gets its own virtual frequency, spaced 100 apart.
        int freq = FileDataSource.FREQ_BASE;
        for (File file : tsFiles) {
            if (!file.isFile()) {
                continue;
            }
            output.add(ScanChannel.forFile(freq, file.getName()));
            freq += 100;
        }
    }

    /**
     * A thread managing a circular buffer that holds stream data to be consumed by player.
     * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering.
     * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
     */
    private class StreamingThread extends Thread {
        @Override
        public void run() {
            byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
            // Interrupts do not stop the thread (only stopStream() does). The flag is restored
            // on exit rather than inside the loops, where it would make wait()/sleep() throw
            // immediately over and over.
            boolean interrupted = false;

            synchronized (mCircularBufferMonitor) {
                mBytesFetched = 0;
                mLastReadPosition = 0;
            }

            while (true) {
                synchronized (mCircularBufferMonitor) {
                    // Hold off while the unread data plus padding would not fit in the buffer.
                    while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
                            && mStreaming) {
                        try {
                            mCircularBufferMonitor.wait();
                        } catch (InterruptedException e) {
                            // Wait again.
                            interrupted = true;
                        }
                    }
                    if (!mStreaming) {
                        break;
                    }
                }

                int bytesWritten = mSource.read(dataBuffer);
                if (bytesWritten <= 0) {
                    try {
                        // When buffer is underrun, we sleep for short time to prevent
                        // unnecessary CPU draining.
                        sleep(BUFFER_UNDERRUN_SLEEP_MS);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                    continue;
                }

                mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);

                synchronized (mCircularBufferMonitor) {
                    int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
                    int bytesToCopyInFirstPass = bytesWritten;
                    if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
                        // The chunk wraps around the end of the circular buffer.
                        bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
                    }
                    System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
                            bytesToCopyInFirstPass);
                    if (bytesToCopyInFirstPass < bytesWritten) {
                        System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
                                bytesWritten - bytesToCopyInFirstPass);
                    }
                    mBytesFetched += bytesWritten;
                    // New data is available for any blocked reader.
                    mCircularBufferMonitor.notifyAll();
                }
            }

            Log.i(TAG, "Streaming stopped");
            mSource.close();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
