/*
    tests/test_iostream.cpp -- Usage of scoped_output_redirect

    Copyright (c) 2017 Henry F. Schreiner

    All rights reserved. Use of this source code is governed by a
    BSD-style license that can be found in the LICENSE file.
*/
9 
#if defined(_MSC_VER) && _MSC_VER < 1910  // VS 2015's MSVC
#  pragma warning(disable: 4702) // unreachable code in system header (xatomic.h(382))
#endif

#include <pybind11/iostream.h>
#include "pybind11_tests.h"
#include <atomic>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
19 
20 
// Writes `msg` to std::cout and, when `flush` is true, flushes the stream
// afterwards. Nothing is appended to `msg` (no newline).
void noisy_function(const std::string &msg, bool flush) {
    std::cout << msg;
    if (flush) {
        std::cout << std::flush;
    }
}
27 
// Writes `msg` to std::cout and `emsg` to std::cerr, in that order.
// Neither stream is explicitly flushed here.
void noisy_funct_dual(const std::string &msg, const std::string &emsg) {
    std::cout << msg;
    std::cerr << emsg;
}
32 
33 // object to manage C++ thread
34 // simply repeatedly write to std::cerr until stopped
35 // redirect is called at some point to test the safety of scoped_estream_redirect
36 struct TestThread {
TestThreadTestThread37     TestThread() : t_{nullptr}, stop_{false} {
__anon8cf8c8760102null38         auto thread_f = [this] {
39             while (!stop_) {
40                 std::cout << "x" << std::flush;
41                 std::this_thread::sleep_for(std::chrono::microseconds(50));
42             } };
43         t_ = new std::thread(std::move(thread_f));
44     }
45 
~TestThreadTestThread46     ~TestThread() {
47         delete t_;
48     }
49 
stopTestThread50     void stop() { stop_ = true; }
51 
joinTestThread52     void join() {
53         py::gil_scoped_release gil_lock;
54         t_->join();
55     }
56 
sleepTestThread57     void sleep() {
58         py::gil_scoped_release gil_lock;
59         std::this_thread::sleep_for(std::chrono::milliseconds(50));
60     }
61 
62     std::thread * t_;
63     std::atomic<bool> stop_;
64 };
65 
66 
// Bindings for the `iostream` test submodule: functions that write to
// std::cout / std::cerr with and without pybind11 stream redirection, plus the
// TestThread helper class.
TEST_SUBMODULE(iostream, m) {

    add_ostream_redirect(m);

    // test_evals

    // Writes msg to std::cout under a default-constructed redirect.
    m.def("captured_output_default", [](const std::string &msg) {
        py::scoped_ostream_redirect redir;
        std::cout << msg << std::flush;
    });

    // Writes msg to std::cout while it is explicitly redirected to sys.stdout.
    m.def("captured_output", [](const std::string &msg) {
        py::scoped_ostream_redirect redir(std::cout, py::module_::import("sys").attr("stdout"));
        std::cout << msg << std::flush;
    });

    // noisy_function wrapped in a redirect via call_guard; `flush` defaults to true.
    m.def("guard_output", &noisy_function,
            py::call_guard<py::scoped_ostream_redirect>(),
            py::arg("msg"), py::arg("flush") = true);

    // Writes msg to std::cerr while it is explicitly redirected to sys.stderr.
    m.def("captured_err", [](const std::string &msg) {
        py::scoped_ostream_redirect redir(std::cerr, py::module_::import("sys").attr("stderr"));
        std::cerr << msg << std::flush;
    });

    // noisy_function with no redirect installed by the binding itself.
    m.def("noisy_function", &noisy_function, py::arg("msg"), py::arg("flush") = true);

    // noisy_funct_dual guarded by both the ostream and the estream redirect.
    m.def("dual_guard", &noisy_funct_dual,
            py::call_guard<py::scoped_ostream_redirect, py::scoped_estream_redirect>(),
            py::arg("msg"), py::arg("emsg"));

    // Unredirected write to std::cout.
    m.def("raw_output", [](const std::string &msg) {
        std::cout << msg << std::flush;
    });

    // Unredirected write to std::cerr.
    m.def("raw_err", [](const std::string &msg) {
        std::cerr << msg << std::flush;
    });

    // Redirects std::cout to sys.stdout and std::cerr to sys.stderr, then writes
    // msg to the former and emsg to the latter.
    m.def("captured_dual", [](const std::string &msg, const std::string &emsg) {
        py::scoped_ostream_redirect redirout(std::cout, py::module_::import("sys").attr("stdout"));
        py::scoped_ostream_redirect redirerr(std::cerr, py::module_::import("sys").attr("stderr"));
        std::cout << msg << std::flush;
        std::cerr << emsg << std::flush;
    });

    // Worker-thread helper: default-constructible, with stop/join/sleep methods.
    py::class_<TestThread>(m, "TestThread")
        .def(py::init<>())
        .def("stop", &TestThread::stop)
        .def("join", &TestThread::join)
        .def("sleep", &TestThread::sleep);
}
119