"""
Программа для проверки открытых портов.
Как работает:
1. Python запускает локальный веб-сервер.
2. В браузере открывается HTML-страница с формой.
3. Пользователь выбирает хост, диапазон портов, таймаут и количество потоков.
4. Python пробует подключиться к каждому порту через TCP.
5. На странице показываются только открытые порты и примерное имя сервиса.
Важно: проверяй только свои устройства, локальный компьютер или сеть,
для которой у тебя есть разрешение.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from socket import AF_INET, SOCK_STREAM, socket
from urllib.parse import parse_qs, urlparse
import html
import json
import socket as socket_module
import webbrowser
DEFAULT_HOST = "127.0.0.1"
DEFAULT_START_PORT = 1
DEFAULT_END_PORT = 65535
DEFAULT_TIMEOUT = 0.25
DEFAULT_WORKERS = 300
PAGE = """
Проверка портов
Проверка портов
Пояснение
Эта программа запускает локальную веб-страницу и проверяет, какие TCP-порты открыты на выбранном хосте.
- Вводится адрес хоста, например 127.0.0.1.
- Задается диапазон портов от 1 до 65535.
- Python пробует подключиться к каждому порту.
- Если подключение получилось, порт считается открытым и появляется в таблице.
Готово
Открытых портов: 0
Время: 0 c
Запусти проверку, и здесь появятся открытые порты.
"""
def clamp_int(value, default, minimum, maximum):
try:
number = int(value)
except (TypeError, ValueError):
return default
return max(minimum, min(maximum, number))
def clamp_float(value, default, minimum, maximum):
try:
number = float(value)
except (TypeError, ValueError):
return default
return max(minimum, min(maximum, number))
def service_name(port):
try:
return socket_module.getservbyport(port)
except OSError:
return ""
def check_port(host, port, timeout):
with socket(AF_INET, SOCK_STREAM) as sock:
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
return {"port": port, "service": service_name(port)}
return None
def scan_ports(host, start_port, end_port, timeout, workers):
open_ports = []
ports = range(start_port, end_port + 1)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(check_port, host, port, timeout) for port in ports]
for future in as_completed(futures):
result = future.result()
if result:
open_ports.append(result)
return sorted(open_ports, key=lambda item: item["port"])
class PortScannerHandler(BaseHTTPRequestHandler):
def do_GET(self):
parsed_url = urlparse(self.path)
if parsed_url.path == "/":
self.send_html(PAGE)
return
if parsed_url.path == "/scan":
self.handle_scan(parsed_url.query)
return
self.send_error(404, "Страница не найдена")
def handle_scan(self, query):
params = parse_qs(query)
host = params.get("host", [DEFAULT_HOST])[0].strip() or DEFAULT_HOST
host = html.escape(host, quote=True)
start_port = clamp_int(params.get("start", [DEFAULT_START_PORT])[0], DEFAULT_START_PORT, 1, 65535)
end_port = clamp_int(params.get("end", [DEFAULT_END_PORT])[0], DEFAULT_END_PORT, 1, 65535)
timeout = clamp_float(params.get("timeout", [DEFAULT_TIMEOUT])[0], DEFAULT_TIMEOUT, 0.05, 5.0)
workers = clamp_int(params.get("workers", [DEFAULT_WORKERS])[0], DEFAULT_WORKERS, 1, 1000)
if start_port > end_port:
start_port, end_port = end_port, start_port
try:
socket_module.gethostbyname(host)
except OSError:
self.send_json({"error": "Не удалось найти такой хост."}, status=400)
return
from time import perf_counter
started = perf_counter()
try:
open_ports = scan_ports(host, start_port, end_port, timeout, workers)
except OSError as error:
self.send_json({"error": str(error)}, status=400)
return
elapsed = round(perf_counter() - started, 2)
self.send_json(
{
"host": host,
"start_port": start_port,
"end_port": end_port,
"timeout": timeout,
"workers": workers,
"elapsed_seconds": elapsed,
"open_ports": open_ports,
}
)
def send_html(self, content):
data = content.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def send_json(self, content, status=200):
data = json.dumps(content, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def log_message(self, format, *args):
return
def find_free_port(start=8000, end=8100):
for port in range(start, end + 1):
try:
server = ThreadingHTTPServer(("127.0.0.1", port), PortScannerHandler)
return server, port
except OSError:
continue
raise OSError("Не удалось найти свободный порт для веб-страницы.")
if __name__ == "__main__":
httpd, port = find_free_port()
url = f"http://127.0.0.1:{port}"
print(f"Открываю страницу: {url}")
print("Закрыть сервер: Ctrl+C")
webbrowser.open(url)
httpd.serve_forever()