#!/usr/bin/env python3
"""HelenaServ: an SSH shell server whose commands and environments come from plugins."""

import asyncio
import errno
import importlib
import importlib.util
import json
import os
import shlex
import sys
import threading
from datetime import datetime, timedelta, timezone, tzinfo
from queue import Queue
from typing import Optional

import argon2
import asyncssh
from argon2 import PasswordHasher

# strftime patterns selectable through timestamp(preset=...)
_TIMESTAMP_PRESETS = {
    "datetime": "%Y-%m-%d_%H:%M:%S",
    "date": "%Y-%m-%d",
    "time": "%H:%M:%S",
}


def _resolve_timezone(tz):
    """Convert *tz* (tzinfo, timedelta offset, or zone name) into a tzinfo object."""
    if isinstance(tz, tzinfo):
        return tz
    if isinstance(tz, timedelta):
        return timezone(tz)
    # Imported lazily so the module still loads on interpreters without zoneinfo
    from zoneinfo import ZoneInfo
    return ZoneInfo(tz)


# Create a timestamp
def timestamp(tz=None, **kwargs):
    """Return the current time formatted as a string.

    Args:
        tz: Optional time zone. ``None`` yields naive local time; otherwise a
            ``tzinfo`` instance, a ``timedelta`` UTC offset, or a zone name
            string understood by ``zoneinfo.ZoneInfo``.
        **kwargs: ``preset`` (one of "datetime", "date", "time") or
            ``format`` (a strftime pattern). A recognised preset wins over
            ``format``.

    Returns:
        The formatted string, or ``None`` when neither a recognised preset
        nor a format was supplied.
    """
    if tz is None:
        now = datetime.now()
    else:
        now = datetime.now(_resolve_timezone(tz))

    preset = kwargs.get('preset')
    if isinstance(preset, str) and preset in _TIMESTAMP_PRESETS:
        return now.strftime(_TIMESTAMP_PRESETS[preset])
    if "format" in kwargs:
        return now.strftime(kwargs['format'])
    return None


class ConfigManager:
    """Loads the server configuration from 'config.json' in the base path."""

    def __init__(self):
        self.file = FileManager()
        self.file_path = os.path.join(self.file.base_path, 'config.json')
        # Parsed configuration; stays None until load_config() succeeds
        self.data = None

    def load_config(self):
        """Load the configuration values from the configuration file.

        Returns:
            The status code reported by FileManager.read_json_file():
            0 on success (``self.data`` is populated), non-zero otherwise.
        """
        result = self.file.read_json_file(self.file_path)
        if result['status'] == 0:
            self.data = result['content']
        return result['status']


class AccountManager:
    """Provides access to the accounts listed in the configuration."""

    def __init__(self, config=None):
        """Create the manager.

        Args:
            config: Optional ConfigManager to share. When omitted a private
                one is created and loaded on first use.
        """
        self.config = config if config is not None else ConfigManager()

    def _load_accounts(self):
        """Return {username: [password, environment]} for all configured accounts."""
        if self.config.data is None:
            # The configuration has not been read yet; do it on demand
            self.config.load_config()
        data = self.config.data or {}
        return {
            account['username']: [account['password'], account['environment']]
            for account in data.get('accounts', [])
        }

    def get_users(self, username=None):
        """Look up accounts.

        Args:
            username: When ``None`` return every account, otherwise only the
                entry for that user.

        Returns:
            ``{username: [password, environment]}`` for all accounts, or
            ``[password, environment]`` for one user. ``None`` when there are
            no accounts or the requested user does not exist.
        """
        accounts = self._load_accounts()
        if not accounts:
            return None
        if username is None:
            return accounts
        return accounts.get(username)

    def add_user(self):
        """Not implemented yet."""
        pass

    def delete_user(self):
        """Not implemented yet."""
        pass

    def modify_user(self):
        """Not implemented yet."""
        pass

    def get_user(self, username):
        """Return ``[password, environment]`` for *username*, or ``False`` if unknown."""
        account = self.get_users(username) if username is not None else None
        return account if account else False


class FileManager:
    """File helper restricted to the directory containing this script.

    Read operations report a status code: 0 = success, 1 = not a regular
    file, 2 = path outside the base path, 3 = invalid JSON (JSON reads only).
    """

    def __init__(self):
        # One lock per file path, created on demand by get_lock()
        self.locks = {}
        # Define the base path for the server files and directories
        self.base_path = os.path.dirname(os.path.abspath(__file__))

    def list_files(self):
        """Return the names of the entries in the base path."""
        return os.listdir(self.base_path)

    def read_file(self, filename):
        """Read a text file.

        Args:
            filename: Path relative to the base path (or an absolute path
                inside it).

        Returns:
            ``{'status': 0, 'content': str}`` on success, ``{'status': 1}``
            if the path is not a regular file, ``{'status': 2}`` if it lies
            outside the base path.
        """
        filepath = os.path.join(self.base_path, filename)
        if not self.is_within_base_path(filepath):
            return {'status': 2}
        with self.get_lock(filepath):
            # Check the joined path, not the bare name, so the result does
            # not depend on the current working directory
            if not os.path.isfile(filepath):
                return {'status': 1}
            with open(filepath, "r", encoding="utf-8") as file:
                content = file.read()
        return {'status': 0, 'content': content}

    def write_file(self, filename, content):
        """Write *content* to *filename*, replacing any existing file.

        Raises:
            ValueError: If the path lies outside the base path.
        """
        filepath = os.path.join(self.base_path, filename)
        if not self.is_within_base_path(filepath):
            raise ValueError("Invalid file path")
        with self.get_lock(filepath):
            with open(filepath, "w", encoding="utf-8") as file:
                file.write(content)

    def delete_file(self, filename):
        """Delete *filename*.

        Raises:
            ValueError: If the path lies outside the base path.
            FileNotFoundError: If the file does not exist.
        """
        filepath = os.path.join(self.base_path, filename)
        if not self.is_within_base_path(filepath):
            raise ValueError("Invalid file path")
        with self.get_lock(filepath):
            if not os.path.exists(filepath):
                raise FileNotFoundError("File does not exist")
            os.remove(filepath)

    def read_json_file(self, filename):
        """Read and parse a JSON file.

        Returns:
            ``{'status': 0, 'content': <parsed data>}`` on success,
            ``{'status': 3, 'error': str}`` if the file is not valid JSON,
            otherwise the non-zero status from read_file().
        """
        result = self.read_file(filename)
        if result['status'] != 0:
            return {'status': result['status']}
        try:
            data = json.loads(result['content'])
        except json.JSONDecodeError as exc:
            return {'status': 3, 'error': str(exc)}
        return {'status': 0, 'content': data}

    def write_json_file(self, filename, json_data):
        """Serialise *json_data* as JSON and write it to *filename*."""
        self.write_file(filename, json.dumps(json_data))

    def get_lock(self, filepath):
        """Return the lock guarding *filepath*, creating it on first use."""
        return self.locks.setdefault(filepath, threading.Lock())

    def is_within_base_path(self, filepath):
        """Return True if *filepath* resolves to a location inside the base path."""
        # Resolve '..' segments and symlinks first; comparing raw strings
        # would accept paths such as '<base>/../secret'
        base = os.path.realpath(self.base_path)
        target = os.path.realpath(filepath)
        try:
            return os.path.commonpath([base, target]) == base
        except ValueError:
            # Raised for e.g. paths on different drives
            return False


class SSHServerProcess:
    """Interactive shell session for one authenticated SSH client."""

    def __init__(self, process: asyncssh.SSHServerProcess, app=None):
        """Create the session.

        Args:
            process: The asyncssh process object for this session.
            app: Object exposing ``accounts``, ``environments`` and
                ``commands`` (the HelenaServ instance). When ``None`` the
                session sees no accounts, environments or commands.
        """
        self._process = process
        self._app = app
        self.timestamp = timestamp

    @classmethod
    async def handle_client(cls, process: asyncssh.SSHServerProcess, app=None) -> None:
        """Run a shell session for *process*, then wait for the channel to close."""
        await cls(process, app).run()
        # Wait for the client to disconnect
        await process.wait_closed()

    def send(self, message):
        """Write *message* followed by a newline to the client."""
        self._process.stdout.write(f"{message}\n")

    def termsize(self):
        """Return the client's terminal size as a dictionary.

        asyncssh reports (width, height, pixel width, pixel height), i.e.
        columns before rows.
        """
        width, height, pixel_width, pixel_height = self._process.get_terminal_size()
        return {'rows': height, 'columns': width,
                'pixel_height': pixel_height, 'pixel_width': pixel_width}

    def _close(self, status):
        """Send an exit status and close the channel unless it is already closing."""
        if not self._process.is_closing():
            self._process.exit(status)

    def _find_environment_commands(self, env_name):
        """Return the command list of the environment named *env_name*, or None."""
        for environment in getattr(self._app, 'environments', None) or []:
            if environment['name'] == env_name:
                return environment['commands']
        return None

    async def run(self) -> None:
        """Greet the user and process commands until the client disconnects."""
        # Get the username from the process object
        username = self._process.get_extra_info('username')
        accounts = getattr(self._app, 'accounts', None) or {}
        commands = getattr(self._app, 'commands', None) or {}

        # Account entries have the form [password, environment]
        account = accounts.get(username)
        env_name = account[1] if account else None
        env_commands = self._find_environment_commands(env_name)

        if env_commands is None:
            # If the environment is not valid, print an error message
            self._process.stdout.write(f'\nNo such environment: {env_name}\n')
            print(f"Invalid environment '{env_name}' for user '{username}'.")
            self._close(1)
            return

        # If the environment is valid, print a welcome message to the user
        print(f"User '{username}' successfully authenticated.")
        self.send('Welcome to the HelenaServ shell server!')
        self.send("Type 'exit' to safely exit the shell server at any time.")
        self.send(f'Username: {username}')
        self.send(f'Environment: {env_name}')

        # Loop until the client disconnects
        while True:
            try:
                # Write the prompt to the client
                self._process.stdout.write('> ')
                # Read a line from the client
                line = await self._process.stdin.readline()
            except asyncssh.BreakReceived:
                # Handle the SSH "break" signal
                self._process.stdout.write('^C\n')
                print(f"Received break signal from user '{username}'.")
                continue
            except asyncssh.TerminalSizeChanged:
                # Handle changes to the client's terminal size
                self._process.stdout.write('\nTerminal size changed.\n')
                print(f"Terminal size changed for user '{username}'.")
                continue

            if not line:
                # An empty read means end of input; without this check the
                # loop would spin forever once the client goes away
                break

            try:
                words = shlex.split(line)
            except ValueError as exc:
                # e.g. unbalanced quotes; report it instead of ending the session
                self.send(f'Syntax error: {exc}')
                continue
            if not words:
                # Blank line: just show the prompt again
                continue

            command, arguments = words[0], words[1:]

            if command in commands:
                if command in env_commands:
                    # If the command is valid, call the corresponding command method
                    print(f"Received existant command '{command}' from user '{username}'.")
                    commands[command](self, arguments)
                else:
                    print(f"Command '{command}' executed by user '{username}' is not available in environment '{env_name}'.")
                    self.send(f'Command not found: {command}')
            elif command == 'exit':
                # The welcome text promises 'exit'; honour it when no plugin
                # provides that command
                break
            else:
                # If the command is not valid, print an error message
                print(f"Received non-existant command '{command}' from user '{username}'.")
                self.send(f'Command not found: {command}')

        self._close(0)


class SSHServer(asyncssh.SSHServer):
    """asyncssh server callbacks: connection logging and password authentication."""

    def __init__(self, account=None, app=None):
        """Create the server object.

        Args:
            account: AccountManager used for password checks. A new one is
                created when omitted.
            app: Object handed to every SSHServerProcess (see that class).
        """
        self.account = account if account is not None else AccountManager()
        self.app = app

    async def start_server(self, address, port, server_host_keys=None):
        """Listen on *address*:*port* and serve until the listener is closed.

        Args:
            address: Address to bind to.
            port: Port to bind to.
            server_host_keys: Host key(s) passed to asyncssh; defaults to
                ``['ssh_host_key']``.

        OSError raised while starting the listener is reported on stdout and
        not re-raised.
        """
        if server_host_keys is None:
            server_host_keys = ['ssh_host_key']
        account, app = self.account, self.app

        def server_factory():
            # asyncssh builds one server object per connection; share the
            # account manager and app state instead of reloading them
            return SSHServer(account, app)

        async def process_factory(process):
            await SSHServerProcess.handle_client(process, app)

        try:
            # Create the SSH server
            async with asyncssh.create_server(server_factory, address, port,
                                              server_host_keys=server_host_keys,
                                              process_factory=process_factory) as server:
                print(f"Listening on address '{address}:{port}'")
                await server.wait_closed()
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                # If the server cannot be started due to an address already in use error, print an error message
                print("Error starting server: Address already in use. Please choose a different address or port or stop the process using port {}.".format(port))
            else:
                # If the server cannot be started for some other reason, print the exception
                print("Error starting server: " + str(exc))

    def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
        """Log the peer address of a new connection."""
        print('SSH connection received from %s.' % conn.get_extra_info('peername')[0])

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Log that a connection ended, including the error if there was one."""
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def begin_auth(self, username: str) -> bool:
        """Log the start of authentication; returning True requires the client to authenticate."""
        print(f"Beginning authentication for user '{username}'.")
        return True

    def password_auth_supported(self) -> bool:
        """Indicate that password authentication is supported."""
        return True

    def validate_password(self, username: str, password: str) -> bool:
        """Return True if *password* matches the stored Argon2 hash for *username*."""
        # Account entries have the form [hashed_password, environment]
        account = self.account.get_users(username) if username is not None else None
        if not account:
            # Unknown user
            return False
        hashed_pw = account[0]
        if not hashed_pw:
            # If there is no hashed password for the user, authentication fails
            return False
        try:
            # Verify the provided password using Argon2
            PasswordHasher().verify(hashed_pw, password.encode('utf-8'))
        except (argon2.exceptions.VerificationError, argon2.exceptions.InvalidHash):
            # Wrong password, or the stored value is not a usable hash
            return False
        return True


class HelenaServ:
    """Application object: loads configuration and plugins, then runs the SSH server."""

    def __init__(self):
        self.file = FileManager()
        self.config = ConfigManager()
        # Share one configuration between the app and the account manager
        self.account = AccountManager(self.config)
        self.server = SSHServer(self.account, self)
        # Define the path for the server plugins directory
        self.plugins_path = os.path.join(self.file.base_path, 'plugins')
        # Define a empty dictionary to keep track of plugins
        self.plugins = {}
        # Define a empty list to keep track of environments
        self.environments = []
        # Define a empty dictionary to keep track of commands
        self.commands = {}
        # Define a empty dictionary to keep track of accounts
        self.accounts = {}
        self.address = None
        self.port = None

    def _register_environments(self, plugin_name, plugin_class):
        """Add the environments a plugin defines, exiting on a duplicate name."""
        define_environments = getattr(plugin_class, 'define_environments', None)
        if define_environments is None:
            print(f"Plugin '{plugin_name}' has no environments.")
            return
        for plugin_environment in define_environments():
            # Environments are looked up by name, so names must be unique
            known_names = {environment['name'] for environment in self.environments}
            if plugin_environment['name'] in known_names:
                sys.exit(f"Duplicate environment '{plugin_environment['name']}' found in plugin '{plugin_name}'. Exiting...")
            self.environments.append(plugin_environment)
            print(f"Loaded environment '{plugin_environment['name']}' from plugin '{plugin_name}'.")

    def _register_commands(self, plugin_name, plugin_class):
        """Add every 'command_*' attribute of a plugin class, exiting on a duplicate name."""
        prefix = 'command_'
        for attr in dir(plugin_class):
            if not attr.startswith(prefix):
                continue
            plugin_command_name = attr[len(prefix):]
            if plugin_command_name in self.commands:
                sys.exit(f"Duplicate command '{plugin_command_name}' found in plugin '{plugin_name}'. Exiting...")
            self.commands[plugin_command_name] = getattr(plugin_class, attr)
            print(f"Loaded command '{plugin_command_name}' from plugin '{plugin_name}'.")

    def _load_plugins(self):
        """Import every '.py' file below the plugins directory and register its contents."""
        for root, _dirs, files in os.walk(self.plugins_path):
            for plugin_filename in files:
                # Load Python files that end with '.py'
                if not plugin_filename.endswith('.py'):
                    continue
                # Get the plugin name from the filename
                plugin_name = plugin_filename[:-3]
                print(f"Loading plugin '{plugin_name}'.")

                # Use the directory being walked so files in subdirectories resolve
                plugin_path = os.path.join(root, plugin_filename)
                plugin_spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
                plugin_object = importlib.util.module_from_spec(plugin_spec)
                plugin_spec.loader.exec_module(plugin_object)

                plugin_class = getattr(plugin_object, 'Plugin', None)
                if plugin_class is None:
                    print(f"Plugin '{plugin_name}' does not define a 'Plugin' class; skipping.")
                    continue

                self._register_environments(plugin_name, plugin_class)
                self._register_commands(plugin_name, plugin_class)

                # Add the plugin class to the plugins dictionary
                self.plugins[plugin_name] = plugin_class
                print(f"Loaded plugin '{plugin_name}'.")

        # Print a message indicating how many plugins have been loaded
        print(f"Loaded {len(self.plugins)} plugins.")

    def run(self):
        """Load configuration, accounts and plugins, then serve until the listener closes."""
        print("The HelenaServ shell server is starting up...")
        print(f"Base path: {self.file.base_path}")
        print(f"Configuration file path: {self.config.file_path}")
        print(f"Plugins directory path: {self.plugins_path}")

        # Load the configuration file (once) and act on the status code
        status = self.config.load_config()
        if status == 0:
            print("Loaded the configuration file.")
        elif status == 1:
            sys.exit("The configuration file does not exist or is not a regular file.")
        else:
            sys.exit(f"The configuration file could not be loaded (status {status}).")

        # Extract the configuration values
        self.address = self.config.data['address']
        self.port = self.config.data['port']
        ssh_host_keys = self.config.data['ssh_host_keys']

        # Extract the accounts from the configuration file
        self.accounts.update(self.account.get_users() or {})

        self._load_plugins()

        try:
            # start_server() returns once the listener has closed
            asyncio.run(self.server.start_server(self.address, self.port, ssh_host_keys))
        except (OSError, asyncssh.Error) as exc:
            sys.exit('Error starting server: ' + str(exc))


def main():
    """Create the application and run it."""
    app = HelenaServ()
    app.run()


if __name__ == "__main__":
    main()