1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4# Project ___| | | | _ \| | 5# / __| | | | |_) | | 6# | (__| |_| | _ <| |___ 7# \___|\___/|_| \_\_____| 8# 9# Copyright (C) 2017 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al. 10# 11# This software is licensed as described in the file COPYING, which 12# you should have received as part of this distribution. The terms 13# are also available at https://curl.se/docs/copyright.html. 14# 15# You may opt to use, copy, modify, merge, publish, distribute and/or sell 16# copies of the Software, and permit persons to whom the Software is 17# furnished to do so, under the terms of the COPYING file. 18# 19# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 20# KIND, either express or implied. 21# 22"""Server for testing SMB""" 23 24from __future__ import absolute_import, division, print_function 25# NOTE: the impacket configuration is not unicode_literals compatible! 26 27import argparse 28import logging 29import os 30import sys 31import tempfile 32 33# Import our curl test data helper 34from util import ClosingFileHandler, TestData 35 36if sys.version_info.major >= 3: 37 import configparser 38else: 39 import ConfigParser as configparser 40 41# impacket needs to be installed in the Python environment 42try: 43 import impacket 44except ImportError: 45 sys.stderr.write('Python package impacket needs to be installed!\n') 46 sys.stderr.write('Use pip or your package manager to install it.\n') 47 sys.exit(1) 48from impacket import smb as imp_smb 49from impacket import smbserver as imp_smbserver 50from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_NO_SUCH_FILE, 51 STATUS_SUCCESS) 52 53log = logging.getLogger(__name__) 54SERVER_MAGIC = "SERVER_MAGIC" 55TESTS_MAGIC = "TESTS_MAGIC" 56VERIFIED_REQ = "verifiedserver" 57VERIFIED_RSP = "WE ROOLZ: {pid}\n" 58 59 60def smbserver(options): 61 """Start up a TCP SMB server that serves forever 62 63 """ 64 if options.pidfile: 65 pid = os.getpid() 66 # see tests/server/util.c function write_pidfile 67 if os.name == "nt": 68 pid += 65536 69 with open(options.pidfile, "w") as f: 70 f.write(str(pid)) 71 72 # Here we write a mini config for the server 73 smb_config = configparser.ConfigParser() 74 smb_config.add_section("global") 75 smb_config.set("global", "server_name", "SERVICE") 76 smb_config.set("global", "server_os", "UNIX") 77 smb_config.set("global", "server_domain", "WORKGROUP") 78 smb_config.set("global", "log_file", "") 79 smb_config.set("global", "credentials_file", "") 80 81 # We need a share which allows us to test that the server is running 82 smb_config.add_section("SERVER") 83 smb_config.set("SERVER", "comment", "server function") 84 smb_config.set("SERVER", "read only", "yes") 85 smb_config.set("SERVER", "share type", "0") 86 smb_config.set("SERVER", "path", SERVER_MAGIC) 87 88 # Have a share for tests. These files will be autogenerated from the 89 # test input. 90 smb_config.add_section("TESTS") 91 smb_config.set("TESTS", "comment", "tests") 92 smb_config.set("TESTS", "read only", "yes") 93 smb_config.set("TESTS", "share type", "0") 94 smb_config.set("TESTS", "path", TESTS_MAGIC) 95 96 if not options.srcdir or not os.path.isdir(options.srcdir): 97 raise ScriptException("--srcdir is mandatory") 98 99 test_data_dir = os.path.join(options.srcdir, "data") 100 101 smb_server = TestSmbServer((options.host, options.port), 102 config_parser=smb_config, 103 test_data_directory=test_data_dir) 104 log.info("[SMB] setting up SMB server on port %s", options.port) 105 smb_server.processConfigFile() 106 smb_server.serve_forever() 107 return 0 108 109 110class TestSmbServer(imp_smbserver.SMBSERVER): 111 """ 112 Test server for SMB which subclasses the impacket SMBSERVER and provides 113 test functionality. 114 """ 115 116 def __init__(self, 117 address, 118 config_parser=None, 119 test_data_directory=None): 120 imp_smbserver.SMBSERVER.__init__(self, 121 address, 122 config_parser=config_parser) 123 124 # Set up a test data object so we can get test data later. 125 self.ctd = TestData(test_data_directory) 126 127 # Override smbComNtCreateAndX so we can pretend to have files which 128 # don't exist. 129 self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX, 130 self.create_and_x) 131 132 def create_and_x(self, conn_id, smb_server, smb_command, recv_packet): 133 """ 134 Our version of smbComNtCreateAndX looks for special test files and 135 fools the rest of the framework into opening them as if they were 136 normal files. 137 """ 138 conn_data = smb_server.getConnectionData(conn_id) 139 140 # Wrap processing in a try block which allows us to throw SmbException 141 # to control the flow. 142 try: 143 ncax_parms = imp_smb.SMBNtCreateAndX_Parameters( 144 smb_command["Parameters"]) 145 146 path = self.get_share_path(conn_data, 147 ncax_parms["RootFid"], 148 recv_packet["Tid"]) 149 log.info("[SMB] Requested share path: %s", path) 150 151 disposition = ncax_parms["Disposition"] 152 log.debug("[SMB] Requested disposition: %s", disposition) 153 154 # Currently we only support reading files. 155 if disposition != imp_smb.FILE_OPEN: 156 raise SmbException(STATUS_ACCESS_DENIED, 157 "Only support reading files") 158 159 # Check to see if the path we were given is actually a 160 # magic path which needs generating on the fly. 161 if path not in [SERVER_MAGIC, TESTS_MAGIC]: 162 # Pass the command onto the original handler. 163 return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id, 164 smb_server, 165 smb_command, 166 recv_packet) 167 168 flags2 = recv_packet["Flags2"] 169 ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2, 170 data=smb_command[ 171 "Data"]) 172 requested_file = imp_smbserver.decodeSMBString( 173 flags2, 174 ncax_data["FileName"]) 175 log.debug("[SMB] User requested file '%s'", requested_file) 176 177 if path == SERVER_MAGIC: 178 fid, full_path = self.get_server_path(requested_file) 179 else: 180 assert (path == TESTS_MAGIC) 181 fid, full_path = self.get_test_path(requested_file) 182 183 resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters() 184 resp_data = "" 185 186 # Simple way to generate a fid 187 if len(conn_data["OpenedFiles"]) == 0: 188 fakefid = 1 189 else: 190 fakefid = conn_data["OpenedFiles"].keys()[-1] + 1 191 resp_parms["Fid"] = fakefid 192 resp_parms["CreateAction"] = disposition 193 194 if os.path.isdir(path): 195 resp_parms[ 196 "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY 197 resp_parms["IsDirectory"] = 1 198 else: 199 resp_parms["IsDirectory"] = 0 200 resp_parms["FileAttributes"] = ncax_parms["FileAttributes"] 201 202 # Get this file's information 203 resp_info, error_code = imp_smbserver.queryPathInformation( 204 "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO) 205 206 if error_code != STATUS_SUCCESS: 207 raise SmbException(error_code, "Failed to query path info") 208 209 resp_parms["CreateTime"] = resp_info["CreationTime"] 210 resp_parms["LastAccessTime"] = resp_info[ 211 "LastAccessTime"] 212 resp_parms["LastWriteTime"] = resp_info["LastWriteTime"] 213 resp_parms["LastChangeTime"] = resp_info[ 214 "LastChangeTime"] 215 resp_parms["FileAttributes"] = resp_info[ 216 "ExtFileAttributes"] 217 resp_parms["AllocationSize"] = resp_info[ 218 "AllocationSize"] 219 resp_parms["EndOfFile"] = resp_info["EndOfFile"] 220 221 # Let's store the fid for the connection 222 # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode)) 223 conn_data["OpenedFiles"][fakefid] = {} 224 conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid 225 conn_data["OpenedFiles"][fakefid]["FileName"] = path 226 conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False 227 228 except SmbException as s: 229 log.debug("[SMB] SmbException hit: %s", s) 230 error_code = s.error_code 231 resp_parms = "" 232 resp_data = "" 233 234 resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX) 235 resp_cmd["Parameters"] = resp_parms 236 resp_cmd["Data"] = resp_data 237 smb_server.setConnectionData(conn_id, conn_data) 238 239 return [resp_cmd], None, error_code 240 241 def get_share_path(self, conn_data, root_fid, tid): 242 conn_shares = conn_data["ConnectedShares"] 243 244 if tid in conn_shares: 245 if root_fid > 0: 246 # If we have a rootFid, the path is relative to that fid 247 path = conn_data["OpenedFiles"][root_fid]["FileName"] 248 log.debug("RootFid present %s!" % path) 249 else: 250 if "path" in conn_shares[tid]: 251 path = conn_shares[tid]["path"] 252 else: 253 raise SmbException(STATUS_ACCESS_DENIED, 254 "Connection share had no path") 255 else: 256 raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID, 257 "TID was invalid") 258 259 return path 260 261 def get_server_path(self, requested_filename): 262 log.debug("[SMB] Get server path '%s'", requested_filename) 263 264 if requested_filename not in [VERIFIED_REQ]: 265 raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file") 266 267 fid, filename = tempfile.mkstemp() 268 log.debug("[SMB] Created %s (%d) for storing '%s'", 269 filename, fid, requested_filename) 270 271 contents = "" 272 273 if requested_filename == VERIFIED_REQ: 274 log.debug("[SMB] Verifying server is alive") 275 pid = os.getpid() 276 # see tests/server/util.c function write_pidfile 277 if os.name == "nt": 278 pid += 65536 279 contents = VERIFIED_RSP.format(pid=pid).encode('utf-8') 280 281 self.write_to_fid(fid, contents) 282 return fid, filename 283 284 def write_to_fid(self, fid, contents): 285 # Write the contents to file descriptor 286 os.write(fid, contents) 287 os.fsync(fid) 288 289 # Rewind the file to the beginning so a read gets us the contents 290 os.lseek(fid, 0, os.SEEK_SET) 291 292 def get_test_path(self, requested_filename): 293 log.info("[SMB] Get reply data from 'test%s'", requested_filename) 294 295 fid, filename = tempfile.mkstemp() 296 log.debug("[SMB] Created %s (%d) for storing test '%s'", 297 filename, fid, requested_filename) 298 299 try: 300 contents = self.ctd.get_test_data(requested_filename).encode('utf-8') 301 self.write_to_fid(fid, contents) 302 return fid, filename 303 304 except Exception: 305 log.exception("Failed to make test file") 306 raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file") 307 308 309class SmbException(Exception): 310 def __init__(self, error_code, error_message): 311 super(SmbException, self).__init__(error_message) 312 self.error_code = error_code 313 314 315class ScriptRC(object): 316 """Enum for script return codes""" 317 SUCCESS = 0 318 FAILURE = 1 319 EXCEPTION = 2 320 321 322class ScriptException(Exception): 323 pass 324 325 326def get_options(): 327 parser = argparse.ArgumentParser() 328 329 parser.add_argument("--port", action="store", default=9017, 330 type=int, help="port to listen on") 331 parser.add_argument("--host", action="store", default="127.0.0.1", 332 help="host to listen on") 333 parser.add_argument("--verbose", action="store", type=int, default=0, 334 help="verbose output") 335 parser.add_argument("--pidfile", action="store", 336 help="file name for the PID") 337 parser.add_argument("--logfile", action="store", 338 help="file name for the log") 339 parser.add_argument("--srcdir", action="store", help="test directory") 340 parser.add_argument("--id", action="store", help="server ID") 341 parser.add_argument("--ipv4", action="store_true", default=0, 342 help="IPv4 flag") 343 344 return parser.parse_args() 345 346 347def setup_logging(options): 348 """ 349 Set up logging from the command line options 350 """ 351 root_logger = logging.getLogger() 352 add_stdout = False 353 354 formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s") 355 356 # Write out to a logfile 357 if options.logfile: 358 handler = ClosingFileHandler(options.logfile) 359 handler.setFormatter(formatter) 360 handler.setLevel(logging.DEBUG) 361 root_logger.addHandler(handler) 362 else: 363 # The logfile wasn't specified. Add a stdout logger. 364 add_stdout = True 365 366 if options.verbose: 367 # Add a stdout logger as well in verbose mode 368 root_logger.setLevel(logging.DEBUG) 369 add_stdout = True 370 else: 371 root_logger.setLevel(logging.INFO) 372 373 if add_stdout: 374 stdout_handler = logging.StreamHandler(sys.stdout) 375 stdout_handler.setFormatter(formatter) 376 stdout_handler.setLevel(logging.DEBUG) 377 root_logger.addHandler(stdout_handler) 378 379 380if __name__ == '__main__': 381 # Get the options from the user. 382 options = get_options() 383 384 # Setup logging using the user options 385 setup_logging(options) 386 387 # Run main script. 388 try: 389 rc = smbserver(options) 390 except Exception as e: 391 log.exception(e) 392 rc = ScriptRC.EXCEPTION 393 394 log.info("[SMB] Returning %d", rc) 395 sys.exit(rc) 396