转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn]
背景介绍
有时候需要用到 SSH 的端口转发功能。目前来说,要么在 cmd 里手敲指令,但每次都敲太麻烦;要么打开 Termius、MobaXterm 这类 SSH 软件,但对于只想使用端口转发的需求来说,这类软件又过于臃肿。因此开发了一个专门用于管理端口转发的小工具。
软件介绍
1、添加、修改、删除操作;
2、双击项目连接、再双击断开连接;
3、多端口转发连接;
用到的主要技术:
- SocketServer
- threading
- paramiko
- tkinter
开源代码
{hide}
import threading
import paramiko
import tkinter as tk
from tkinter import ttk, messagebox, simpledialog, Label, Entry, Scrollbar
import json
import os
import socket
import base64
from cryptography.fernet import Fernet
import os
import socket
import select
try:
import SocketServer
except ImportError:
import socketserver as SocketServer
#-----------------------------------------------------------------------------------#
# 来自:https://github.com/paramiko/paramiko/blob/main/demos/forward.py
class ForwardServer(SocketServer.ThreadingTCPServer):
    """Threading TCP server used as the local listening end of a tunnel.

    Only two class-level flags of the base server are overridden.
    """

    # Let the listening address be rebound without waiting for old sockets
    # on the same port to time out.
    allow_reuse_address = True
    # Per-connection handler threads are daemon threads, so they do not keep
    # the process alive on exit.
    daemon_threads = True
class Handler(SocketServer.BaseRequestHandler):
    """Relay one accepted local connection through an SSH ``direct-tcpip`` channel.

    ``handle`` reads three attributes from ``self`` that this class does not
    define; a subclass has to provide them:

    * ``ssh_transport`` -- object whose ``open_channel`` opens the channel
    * ``chain_host`` / ``chain_port`` -- destination requested for the channel
    """

    # Maximum number of bytes read from either side per recv() call.
    BUFFER_SIZE = 1024

    def handle(self):
        """Open the channel, then copy bytes both ways until one side closes.

        Failures to open the channel are printed and end the request; the
        channel and the local socket are always closed once the relay loop
        has been entered, even if it raises.
        """
        try:
            peername = self.request.getpeername()
            chan = self.ssh_transport.open_channel(
                "direct-tcpip",
                (self.chain_host, self.chain_port),
                peername,
            )
        except Exception as e:
            print(
                "Incoming request to %s:%d failed: %s"
                % (self.chain_host, self.chain_port, repr(e))
            )
            return
        if chan is None:
            print(
                "Incoming request to %s:%d was rejected by the SSH server."
                % (self.chain_host, self.chain_port)
            )
            return
        print(
            "Connected! Tunnel open %r -> %r -> %r"
            % (
                peername,
                chan.getpeername(),
                (self.chain_host, self.chain_port),
            )
        )
        try:
            while True:
                readable, _, _ = select.select([self.request, chan], [], [])
                if self.request in readable:
                    data = self.request.recv(self.BUFFER_SIZE)
                    if len(data) == 0:
                        break
                    # sendall, not send: send() may write only part of the
                    # buffer, which would silently drop the remainder.
                    chan.sendall(data)
                if chan in readable:
                    data = chan.recv(self.BUFFER_SIZE)
                    if len(data) == 0:
                        break
                    self.request.sendall(data)
        finally:
            # Release both ends even when the relay loop raised.
            chan.close()
            self.request.close()
            print("Tunnel closed from %r" % (peername,))
#-----------------------------------------------------------------------------------#
# SSH Connection class definition
class SSHConnection:
    """One paramiko SSH client plus the local forwarding server built on it.

    ``debug_print`` is a callable taking one string; every status message of
    this class is reported through it.
    """

    def __init__(self, user, password, host, port, debug_print):
        """Store the credentials and prepare (but do not connect) the client."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.debug_print = debug_print
        self.client = paramiko.SSHClient()
        self.client.load_system_host_keys()
        # NOTE(review): AutoAddPolicy accepts host keys that are not already
        # known without asking; confirm this is acceptable for the
        # deployment.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.allow_run = True
        # ForwardServer created by forward_tunnel(); None until then.
        self.server = None

    def connect(self):
        """Log in with the stored user/password.

        Returns True on success and False on any failure; the reason is
        reported through ``debug_print`` instead of being raised.
        """
        try:
            self.client.connect(
                self.host,
                self.port,
                username=self.user,
                password=self.password,
            )
            self.debug_print("SSH connection established successfully.")
            return True
        except paramiko.AuthenticationException:
            self.debug_print("Authentication failed.")
            return False
        except Exception as e:
            self.debug_print(f"Failed to establish SSH connection due to: {str(e)}")
            return False

    def forward_port(self, local_ip, local_port, remote_host, remote_port):
        """Ask the SSH server to forward ``remote_port`` back to this side.

        ``local_ip`` and ``remote_host`` are accepted but not used. Returns
        True when the request succeeds and False when it raises.
        """
        ssh_transport = self.client.get_transport()
        try:
            # NOTE(review): paramiko documents this method as
            # request_port_forward(address, port, handler=None); the two
            # trailing positionals here do not match that signature --
            # confirm and supply a real handler before relying on this.
            ssh_transport.request_port_forward('', int(remote_port), 'localhost', local_port)
            self.debug_print("Success to open SSH channel")
            return True
        except Exception as e:
            self.debug_print("Failed to open SSH channel due to:"+str(e))
            return False

    def forward_tunnel(self, local_port, remote_host, remote_port, local_host=""):
        """Listen on ``local_port`` and relay each connection to the remote end.

        Every accepted connection is handled by a ``Handler`` subclass bound
        to this client's transport and to ``remote_host:remote_port``.
        ``local_host`` is the address to bind; the default ``""`` keeps the
        previous behaviour of passing an empty host to the server. The call
        runs ``serve_forever`` and therefore does not return until the
        server is shut down (see ``close``).
        """
        transport = self.client.get_transport()

        # SocketServer gives handlers no way to reach outer state, so the
        # per-tunnel configuration is attached to a throwaway subclass.
        class SubHander(Handler):
            chain_host = remote_host
            chain_port = remote_port
            ssh_transport = transport

        self.server = ForwardServer((local_host, local_port), SubHander)
        self.server.serve_forever()

    def close(self):
        """Close the SSH client and, if one was started, the local server.

        Returns True when a client existed and was closed, False otherwise.
        """
        if not self.client:
            return False
        self.client.close()
        # The server only exists once forward_tunnel() has created it;
        # detach it first so that a repeated close() is harmless.
        server, self.server = self.server, None
        if server is not None:
            server.shutdown()
            server.server_close()
        return True
# Custom input dialog
class InputDialog(simpledialog.Dialog):
    """Dialog with one text entry per field of a forwarding entry.

    ``apply`` stores the collected texts in ``self.result`` as a dict keyed
    by field label.
    """

    def __init__(self, parent, title=None, initial_data=None):
        """Keep the pre-fill values, then hand over to the base dialog.

        ``initial_data`` maps field labels to starting text; any falsy value
        is replaced by an empty dict.
        """
        self.initial_data = initial_data if initial_data else {}
        super().__init__(parent, title=title)

    def body(self, master):
        """Create the label/entry rows and return the widget to focus first."""
        labels = ("User", "Password", "SSH IP", "SSH Port", "Target IP", "Target Port", "Local IP", "Local Port")
        # Used only when initial_data has no value for the field.
        presets = {
            "SSH Port": "22",
            "Target IP": "localhost",
            "Local IP": "127.0.0.1",
        }
        self.entries = {}
        for row, name in enumerate(labels):
            Label(master, text=f"{name}:").grid(row=row, column=0, pady=5)
            # Only the password field hides what is typed.
            mask = "*" if name == "Password" else ""
            widget = Entry(master, show=mask)
            self.entries[name] = widget
            widget.insert(0, self.initial_data.get(name, presets.get(name, "")))
            widget.grid(row=row, column=1, pady=5)
        # Leaving the target-port field copies its value to the local port.
        self.entries["Target Port"].bind('<FocusOut>', self.sync_local_port)
        return self.entries["User"]

    def sync_local_port(self, event):
        """Overwrite "Local Port" with "Target Port" unless the latter is empty."""
        target_text = self.entries["Target Port"].get()
        if not target_text:
            return
        local_entry = self.entries["Local Port"]
        local_entry.delete(0, tk.END)
        local_entry.insert(0, target_text)

    def apply(self):
        """Collect the text of every entry into ``self.result``."""
        collected = {}
        for name, widget in self.entries.items():
            collected[name] = widget.get()
        self.result = collected
# Check if a port is available
def is_port_available(port):
    """Return True if a TCP socket can be bound to ``0.0.0.0:port`` right now.

    ``port`` is passed through ``int()``, so numeric strings are accepted.
    An ``OSError`` from ``bind`` yields False; the probe socket is closed in
    every case.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind(("0.0.0.0", int(port)))
        except OSError:
            return False
        return True
# GUI App definition
# Path of the JSON file that SSHApp.load_config reads and SSHApp.save_config
# writes. It is a relative path, so it resolves against the current working
# directory of the process.
CONFIG_FILE = "./ssh_config.json"
class SSHApp:
    """Main window: a table of forwarding entries, action buttons and a log pane.

    Entries are persisted to ``CONFIG_FILE`` as a JSON list of dicts keyed by
    the names in ``FIELDS``. Double-clicking a row, or pressing
    "Dis/Connect", toggles the tunnel of the selected row.
    """

    # (column id / config key, heading text, pixel width), in display order.
    _COLUMN_SPECS = (
        ("User", "SSH User", 40),
        ("Password", "SSH Password", 60),
        ("SSH IP", "SSH IP", 100),
        ("SSH Port", "SSH Port", 50),
        ("Target IP", "Target IP", 100),
        ("Target Port", "Target Port", 50),
        ("Local IP", "Local IP", 100),
        ("Local Port", "Local Port", 50),
    )
    FIELDS = tuple(spec[0] for spec in _COLUMN_SPECS)

    def __init__(self, master):
        """Build all widgets inside ``master`` and load the saved entries."""
        self.master = master
        self.master.title("SSH Port Forwarding by 小锋学长(xfxuezhang.cn)")
        self.frame = ttk.Frame(master)
        self.frame.pack(fill=tk.BOTH, expand=1)
        self.scrollbar_x = Scrollbar(self.frame, orient=tk.HORIZONTAL)
        self.scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
        self.scrollbar_y = Scrollbar(self.frame)
        self.scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
        self.tree = ttk.Treeview(
            self.frame,
            xscrollcommand=self.scrollbar_x.set,
            yscrollcommand=self.scrollbar_y.set,
            columns=self.FIELDS,
        )
        # The item image (the green "connected" marker) is drawn in the tree
        # column "#0", so that column needs a non-zero width to be visible.
        self.tree.column("#0", width=20, stretch=tk.NO)
        for field, heading, width in self._COLUMN_SPECS:
            self.tree.column(field, anchor=tk.CENTER, width=width)
            self.tree.heading(field, text=heading, anchor=tk.CENTER)
        self.tree.pack(pady=20, expand=True, fill=tk.BOTH)
        self.scrollbar_x.config(command=self.tree.xview)
        self.scrollbar_y.config(command=self.tree.yview)
        self.btn_frame = ttk.Frame(master)
        self.btn_frame.pack(pady=20)
        self.connect_btn = ttk.Button(self.btn_frame, text="Dis/Connect", command=self.connect_selected)
        self.connect_btn.grid(row=0, column=0, padx=10)
        self.add_btn = ttk.Button(self.btn_frame, text="Add", command=self.add_entry)
        self.add_btn.grid(row=0, column=1, padx=10)
        self.edit_btn = ttk.Button(self.btn_frame, text="Edit", command=self.modify_entry)
        self.edit_btn.grid(row=0, column=2, padx=10)
        self.delete_btn = ttk.Button(self.btn_frame, text="Delete", command=self.delete_entry)
        self.delete_btn.grid(row=0, column=3, padx=10)
        self.debug_frame = ttk.Frame(self.master)
        self.debug_frame.pack(fill=tk.BOTH, expand=1)
        self.debug_output = tk.Text(self.debug_frame, height=10, wrap=tk.WORD)
        self.debug_output.pack(fill=tk.BOTH, expand=True)
        # Tree item id -> SSHConnection. Keyed per row (not per SSH host) so
        # that several entries on the same host can be connected at once.
        self.connected = {}
        # Cached marker image; a reference must be kept or Tk discards it.
        self._green_icon = None
        self.tree.bind('<Double-1>', self.toggle_connection)
        self.load_config()

    def debug_print(self, message):
        """Append ``message`` as a new line to the log pane and scroll to it."""
        # NOTE(review): this is also invoked from the connection worker
        # thread started in toggle_connection -- confirm the Tk build in use
        # tolerates calls from other threads.
        self.debug_output.insert(tk.END, message + '\n')
        self.debug_output.see(tk.END)
        self.debug_output.update_idletasks()

    @staticmethod
    def _is_active(conn):
        """Return True if ``conn`` has a transport and it reports active."""
        transport = conn.client.get_transport()
        return transport is not None and transport.is_active()

    def _close_connection(self, conn):
        """Close ``conn``; report instead of raising, return True on success."""
        try:
            return bool(conn.close())
        except Exception as e:
            self.debug_print(f"Error closing connection: {e}")
            return False

    def modify_entry(self):
        """Edit the first selected row through an InputDialog and save."""
        selection = self.tree.selection()
        if not selection:
            messagebox.showwarning("Warning", "Please select an item to edit.")
            return
        selected = selection[0]
        # Same accessor as save_config/toggle_connection; the dict form
        # (item(...)["values"]) may convert numeric-looking strings.
        values = self.tree.item(selected, "values")
        data_dict = dict(zip(self.FIELDS, values))
        dialog = InputDialog(self.master, title="Modify Entry", initial_data=data_dict)
        if dialog.result:
            self.debug_print("已修改内容")
            self.tree.item(selected, values=tuple(dialog.result[field] for field in self.FIELDS))
            self.save_config()

    def load_config(self):
        """Fill the table from ``CONFIG_FILE`` if that file exists.

        An unreadable file or a malformed entry is reported in the log pane
        and skipped instead of aborting start-up.
        """
        if not os.path.exists(CONFIG_FILE):
            return
        try:
            with open(CONFIG_FILE, 'r', encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            self.debug_print(f"Failed to load {CONFIG_FILE}: {e}")
            return
        if not isinstance(data, list):
            self.debug_print(f"Failed to load {CONFIG_FILE}: expected a list of entries")
            return
        for entry in data:
            try:
                row = tuple(entry[field] for field in self.FIELDS)
            except (KeyError, TypeError):
                self.debug_print(f"Skipping malformed entry in {CONFIG_FILE}: {entry!r}")
                continue
            self.tree.insert("", "end", values=row)

    def connect_selected(self):
        """Button handler: toggle the selected row, or warn if none is selected."""
        if self.tree.selection():
            self.toggle_connection(None)
        else:
            messagebox.showwarning("Warning", "Please select an item to connect.")

    def save_config(self):
        """Write every row of the table to ``CONFIG_FILE`` as JSON."""
        # NOTE(review): the password column is written as plain text.
        data = [
            dict(zip(self.FIELDS, self.tree.item(item, "values")))
            for item in self.tree.get_children()
        ]
        with open(CONFIG_FILE, 'w', encoding="utf-8") as f:
            json.dump(data, f)
        self.debug_print("已保存内容")

    def add_entry(self):
        """Ask for a new entry; add and save it only if every field is filled."""
        dialog = InputDialog(self.master)
        entries = dialog.result
        if entries and all(entries.values()):
            self.tree.insert("", "end", values=tuple(entries[field] for field in self.FIELDS))
            self.debug_print("已新增内容")
            self.save_config()

    def delete_entry(self):
        """Remove all selected rows (closing their tunnels) and save."""
        selected = self.tree.selection()
        if selected:
            for item in selected:
                conn = self.connected.pop(item, None)
                if conn is not None:
                    self._close_connection(conn)
            # Treeview.delete takes the items as separate arguments.
            self.tree.delete(*selected)
            self.debug_print("已删除内容")
        self.save_config()

    def undo_delete(self):
        """Placeholder; could be implemented with a stack of deleted entries."""
        pass

    def toggle_connection(self, event):
        """Disconnect the first selected row if it is connected, else connect it.

        Does nothing when no row is selected. Connecting runs in a daemon
        thread because ``SSHConnection.forward_tunnel`` blocks.
        """
        selection = self.tree.selection()
        if not selection:
            return
        item = selection[0]
        user, password, ip, port, target_ip, target_port, local_ip, local_port = self.tree.item(item, "values")

        existing = self.connected.get(item)
        if existing is not None and self._is_active(existing):
            self.debug_print("关闭连接")
            if self._close_connection(existing):
                del self.connected[item]
                self.tree.item(item, image='')
                messagebox.showinfo("Note", f"{ip} 关闭成功.")
            else:
                messagebox.showerror("Note", f"{ip} 关闭失败.")
            return

        self.debug_print("开始连接")
        if existing is not None:
            # The earlier session is no longer active; close it so its
            # listening socket does not keep the local port occupied.
            del self.connected[item]
            self._close_connection(existing)
        try:
            ssh_port = int(port)
            tunnel_port = int(target_port)
            listen_port = int(local_port)
        except ValueError:
            messagebox.showerror("Error", "SSH Port, Target Port and Local Port must be integers.")
            return
        if not is_port_available(listen_port):
            messagebox.showerror("Error", f"端口 {local_port} 已被使用.")
            return

        def establish_connection():
            ssh_conn = None
            try:
                ssh_conn = SSHConnection(user, password, ip, ssh_port, self.debug_print)
                if not ssh_conn.connect():
                    return
                self.connected[item] = ssh_conn
                # Tunnel to the configured target, as seen from the SSH
                # server. NOTE(review): local_ip is not passed on; the
                # listening address is whatever forward_tunnel binds.
                ssh_conn.forward_tunnel(listen_port, target_ip, tunnel_port)
            except Exception as e:
                if ssh_conn is not None:
                    # The tunnel is not running, so drop the session too.
                    if self.connected.get(item) is ssh_conn:
                        del self.connected[item]
                    ssh_conn.client.close()
                self.debug_print(f"Error establishing connection: {e}")

        try:
            thread = threading.Thread(target=establish_connection)
            thread.daemon = True
            thread.start()
            # NOTE(review): the marker is set before the worker reports
            # success and is not cleared if connecting fails.
            self.tree.item(item, image=self.get_green_icon())
        except Exception as e:
            messagebox.showerror("Error", f"Failed to connect: {str(e)}")

    def get_green_icon(self):
        """Return the 10x10 green marker image, creating it on first use."""
        icon = getattr(self, "_green_icon", None)
        if icon is None:
            icon = tk.PhotoImage(width=10, height=10)
            icon.put("#00FF00", to=(0, 0, 10, 10))
            self._green_icon = icon
        return icon
def center_window(root, width, height):
    """Size ``root`` to ``width`` x ``height`` and centre it on the screen.

    The screen size is read from ``root``; the resulting offsets are
    truncated to integers before being put into the geometry string.
    """
    def _origin(screen_extent, window_extent):
        # Offset that leaves equal margins on both sides of the window.
        return int((screen_extent / 2) - (window_extent / 2))

    left = _origin(root.winfo_screenwidth(), width)
    top = _origin(root.winfo_screenheight(), height)
    root.geometry(f'{width}x{height}+{left}+{top}')
# Script entry: create the root window and build the application in it.
root = tk.Tk()
app = SSHApp(root)
# Initial size; center_window() below sets the same size plus a position.
root.geometry('800x400')
center_window(root, 800, 400)
# Enter the Tk event loop.
root.mainloop()
{/hide}
软件下载
提供Windows版和Mac版。