""" Программа для проверки открытых портов. Как работает: 1. Python запускает локальный веб-сервер. 2. В браузере открывается HTML-страница с формой. 3. Пользователь выбирает хост, диапазон портов, таймаут и количество потоков. 4. Python пробует подключиться к каждому порту через TCP. 5. На странице показываются только открытые порты и примерное имя сервиса. Важно: проверяй только свои устройства, локальный компьютер или сеть, для которой у тебя есть разрешение. """ from concurrent.futures import ThreadPoolExecutor, as_completed from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from socket import AF_INET, SOCK_STREAM, socket from urllib.parse import parse_qs, urlparse import html import json import socket as socket_module import webbrowser DEFAULT_HOST = "127.0.0.1" DEFAULT_START_PORT = 1 DEFAULT_END_PORT = 65535 DEFAULT_TIMEOUT = 0.25 DEFAULT_WORKERS = 300 PAGE = """ Проверка портов

Проверка портов

Пояснение

Эта программа запускает локальную веб-страницу и проверяет, какие TCP-порты открыты на выбранном хосте.

  1. Вводится адрес хоста, например 127.0.0.1.
  2. Задается диапазон портов от 1 до 65535.
  3. Python пробует подключиться к каждому порту.
  4. Если подключение получилось, порт считается открытым и появляется в таблице.
Готово Открытых портов: 0 Время: 0 c
Запусти проверку, и здесь появятся открытые порты.
""" def clamp_int(value, default, minimum, maximum): try: number = int(value) except (TypeError, ValueError): return default return max(minimum, min(maximum, number)) def clamp_float(value, default, minimum, maximum): try: number = float(value) except (TypeError, ValueError): return default return max(minimum, min(maximum, number)) def service_name(port): try: return socket_module.getservbyport(port) except OSError: return "" def check_port(host, port, timeout): with socket(AF_INET, SOCK_STREAM) as sock: sock.settimeout(timeout) result = sock.connect_ex((host, port)) if result == 0: return {"port": port, "service": service_name(port)} return None def scan_ports(host, start_port, end_port, timeout, workers): open_ports = [] ports = range(start_port, end_port + 1) with ThreadPoolExecutor(max_workers=workers) as executor: futures = [executor.submit(check_port, host, port, timeout) for port in ports] for future in as_completed(futures): result = future.result() if result: open_ports.append(result) return sorted(open_ports, key=lambda item: item["port"]) class PortScannerHandler(BaseHTTPRequestHandler): def do_GET(self): parsed_url = urlparse(self.path) if parsed_url.path == "/": self.send_html(PAGE) return if parsed_url.path == "/scan": self.handle_scan(parsed_url.query) return self.send_error(404, "Страница не найдена") def handle_scan(self, query): params = parse_qs(query) host = params.get("host", [DEFAULT_HOST])[0].strip() or DEFAULT_HOST host = html.escape(host, quote=True) start_port = clamp_int(params.get("start", [DEFAULT_START_PORT])[0], DEFAULT_START_PORT, 1, 65535) end_port = clamp_int(params.get("end", [DEFAULT_END_PORT])[0], DEFAULT_END_PORT, 1, 65535) timeout = clamp_float(params.get("timeout", [DEFAULT_TIMEOUT])[0], DEFAULT_TIMEOUT, 0.05, 5.0) workers = clamp_int(params.get("workers", [DEFAULT_WORKERS])[0], DEFAULT_WORKERS, 1, 1000) if start_port > end_port: start_port, end_port = end_port, start_port try: socket_module.gethostbyname(host) except OSError: self.send_json({"error": "Не удалось найти такой хост."}, status=400) return from time import perf_counter started = perf_counter() try: open_ports = scan_ports(host, start_port, end_port, timeout, workers) except OSError as error: self.send_json({"error": str(error)}, status=400) return elapsed = round(perf_counter() - started, 2) self.send_json( { "host": host, "start_port": start_port, "end_port": end_port, "timeout": timeout, "workers": workers, "elapsed_seconds": elapsed, "open_ports": open_ports, } ) def send_html(self, content): data = content.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def send_json(self, content, status=200): data = json.dumps(content, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def log_message(self, format, *args): return def find_free_port(start=8000, end=8100): for port in range(start, end + 1): try: server = ThreadingHTTPServer(("127.0.0.1", port), PortScannerHandler) return server, port except OSError: continue raise OSError("Не удалось найти свободный порт для веб-страницы.") if __name__ == "__main__": httpd, port = find_free_port() url = f"http://127.0.0.1:{port}" print(f"Открываю страницу: {url}") print("Закрыть сервер: Ctrl+C") webbrowser.open(url) httpd.serve_forever()