#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
"""Module for extracting test data from the test data folder and other utils"""

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import logging
import os
import re

log = logging.getLogger(__name__)


# Captures the body of the first <data ...> element that follows <reply>.
REPLY_DATA = re.compile("<reply>[ \t\n\r]*<data[^<]*>(.*?)</data>", re.MULTILINE | re.DOTALL)


class ClosingFileHandler(logging.StreamHandler):
    """Logging handler that opens its file for each record and closes it again.

    The target file is opened in append mode for the duration of a single
    ``emit`` call; between records the handler's stream is ``None``.
    """

    def __init__(self, filename):
        """Remember the absolute path of *filename*; no file is opened yet."""
        super(ClosingFileHandler, self).__init__()
        self.filename = os.path.abspath(filename)
        self.setStream(None)

    def emit(self, record):
        """Append *record* to the log file, then detach from the file."""
        with open(self.filename, "a") as fp:
            self.setStream(fp)
            try:
                super(ClosingFileHandler, self).emit(record)
            finally:
                # Always detach while the file is still open.  Otherwise an
                # exception escaping the parent emit() would leave
                # self.stream pointing at a closed file, and the next
                # emit() would fail when flushing it.
                self.setStream(None)

    def setStream(self, stream):
        """Replace the handler's stream with *stream*.

        Delegates to the parent class when it provides ``setStream``.
        Otherwise performs the swap by hand: flush the current stream and
        install the new one while holding the handler lock.

        Returns the previous stream, or ``None`` if *stream* is already the
        current stream (fallback path).
        """
        parent_set_stream = getattr(super(ClosingFileHandler, self),
                                    'setStream', None)
        if callable(parent_set_stream):
            return parent_set_stream(stream)

        # Fallback for logging implementations without
        # StreamHandler.setStream.
        if stream is self.stream:
            return None

        previous = self.stream
        self.acquire()
        try:
            self.flush()
            self.stream = stream
        finally:
            self.release()
        return previous


class TestData(object):
    """Accessor for the test case files stored in a data folder."""

    def __init__(self, data_folder):
        """Store *data_folder*, the directory containing the ``testN`` files."""
        self.data_folder = data_folder

    def get_test_data(self, test_number):
        """Return the ``<reply><data>`` payload of test *test_number*.

        The file ``test<test_number>`` inside the data folder is read as
        UTF-8 and searched with ``REPLY_DATA``.

        Returns the captured text with leading whitespace removed.
        Raises ``Exception`` if the file has no ``<reply><data>`` section.
        """
        # Create the test file name
        filename = os.path.join(self.data_folder,
                                "test{0}".format(test_number))

        log.debug("Parsing file %s", filename)

        with open(filename, "rb") as f:
            contents = f.read().decode("utf-8")

        m = REPLY_DATA.search(contents)
        if not m:
            raise Exception("Couldn't find a <reply><data> section")

        # Left-strip the data so we don't get a newline before our data.
        return m.group(1).lstrip()


if __name__ == '__main__':
    td = TestData("./data")
    data = td.get_test_data(1)
    print(data)