/*
 * Copyright (C) 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "LogReaderThread.h"

#include <errno.h>
#include <string.h>
#include <sys/prctl.h>

#include <thread>

#include "LogBuffer.h"
#include "LogReaderList.h"
#include "SerializedFlushToState.h"

LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
                                 std::unique_ptr<LogWriter> writer, bool non_block,
                                 unsigned long tail, LogMask log_mask, pid_t pid,
                                 log_time start_time, uint64_t start,
                                 std::chrono::steady_clock::time_point deadline)
    : log_buffer_(log_buffer),
      reader_list_(reader_list),
      writer_(std::move(writer)),
      pid_(pid),
      tail_(tail),
      count_(0),
      index_(0),
      start_time_(start_time),
      deadline_(deadline),
      non_block_(non_block) {
    CleanSkip();
    flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
}

void LogReaderThread::Run() {
    auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
    thread.detach();
}

void LogReaderThread::ThreadFunction() {
    prctl(PR_SET_NAME, "logd.reader.per");

    auto lock = std::unique_lock{logd_lock};
    auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};

    while (!release_) {
        if (deadline_.time_since_epoch().count() != 0) {
            if (thread_triggered_condition_.wait_until(lock, deadline_) ==
                std::cv_status::timeout) {
                deadline_ = {};
            }
            if (release_) {
                break;
            }
        }

        if (tail_) {
            auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
                                                                    flush_to_state_->log_mask());
            log_buffer_->FlushTo(writer_.get(), *first_pass_state,
                                 [this](log_id_t log_id, pid_t pid, uint64_t sequence,
                                        log_time realtime) REQUIRES(logd_lock) {
                                     return FilterFirstPass(log_id, pid, sequence, realtime);
                                 });
        }
        bool flush_success = log_buffer_->FlushTo(
                writer_.get(), *flush_to_state_,
                [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
                        logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });

        // We only ignore entries before the original start time for the first flushTo(), if we
        // get entries after this first flush before the original start time, then the client
        // wouldn't have seen them.
        // Note: this is still racy and may skip out of order events that came in since the last
        // time the client disconnected and then reconnected with the new start time.  The long term
        // solution here is that clients must request events since a specific sequence number.
        start_time_.tv_sec = 0;
        start_time_.tv_nsec = 0;

        if (!flush_success) {
            break;
        }

        if (non_block_ || release_) {
            break;
        }

        CleanSkip();

        if (deadline_.time_since_epoch().count() == 0) {
            thread_triggered_condition_.wait(lock);
        }
    }

    writer_->Release();
    reader_list_->RemoveRunningThread(this);
}

// A first pass to count the number of elements
FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
    if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
        ++count_;
    }

    return FilterResult::kSkip;
}

// A second pass to send the selected elements
FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
                                               log_time realtime) {
    if (skip_ahead_[log_id]) {
        skip_ahead_[log_id]--;
        return FilterResult::kSkip;
    }

    // Truncate to close race between first and second pass
    if (non_block_ && tail_ && index_ >= count_) {
        return FilterResult::kStop;
    }

    if (pid_ && pid_ != pid) {
        return FilterResult::kSkip;
    }

    if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
        return FilterResult::kSkip;
    }

    if (release_) {
        return FilterResult::kStop;
    }

    if (!tail_) {
        goto ok;
    }

    ++index_;

    if (count_ > tail_ && index_ <= (count_ - tail_)) {
        return FilterResult::kSkip;
    }

    if (!non_block_) {
        tail_ = 0;
    }

ok:
    if (!skip_ahead_[log_id]) {
        return FilterResult::kWrite;
    }
    return FilterResult::kSkip;
}