Servidor SSH con Twisted

Actualicé este ejemplo, tamibién incluido en el libro de Twisted para que no tire Deprecation Warnings:

from twisted.cred import portal, checkers, credentials
from twisted.conch import error, avatar, recvline, interfaces as conchinterfaces
from twisted.conch.ssh import factory, userauth, connection, keys, session, common
from twisted.conch.insults import insults
from twisted.application import service, internet
from zope.interface import implements
import os
 
class SSHDemoProtocol(recvline.HistoricRecvLine):
    def __init__(self, user):
        self.user = user
 
    def connectionMade(self) :
        recvline.HistoricRecvLine.connectionMade(self)
        self.terminal.write("Welcome to my test SSH server.")
        self.terminal.nextLine()
        self.do_help()
        self.showPrompt()
 
    def showPrompt(self):
        self.terminal.write("$ ")
 
    def getCommandFunc(self, cmd):
        return getattr(self, 'do_' + cmd, None)
 
    def lineReceived(self, line):
        line = line.strip()
        if line:
            cmdAndArgs = line.split()
            cmd = cmdAndArgs[0]
            args = cmdAndArgs[1:]
            func = self.getCommandFunc(cmd)
            if func:
               try:
                   func(*args)
               except Exception, e:
                   self.terminal.write("Error: %s" % e)
                   self.terminal.nextLine()
            else:
               self.terminal.write("No such command.")
               self.terminal.nextLine()
        self.showPrompt()
 
    def do_help(self, cmd=''):
        "Get help on a command. Usage: help command"
        if cmd:
            func = self.getCommandFunc(cmd)
            if func:
                self.terminal.write(func.__doc__)
                self.terminal.nextLine()
                return
 
        publicMethods = filter(
            lambda funcname: funcname.startswith('do_'), dir(self))
        commands = [cmd.replace('do_', '', 1) for cmd in publicMethods]
        self.terminal.write("Commands: " + " ".join(commands))
        self.terminal.nextLine()
 
    def do_echo(self, *args):
        "Echo a string. Usage: echo my line of text"
        self.terminal.write(" ".join(args))
        self.terminal.nextLine()
 
    def do_whoami(self):
        "Prints your user name. Usage: whoami"
        self.terminal.write(self.user.username)
        self.terminal.nextLine()
 
    def do_quit(self):
        "Ends your session. Usage: quit"
        self.terminal.write("Thanks for playing!")
        self.terminal.nextLine()
        self.terminal.loseConnection()
 
    def do_clear(self):
        "Clears the screen. Usage: clear"
        self.terminal.reset()
 
class SSHDemoAvatar(avatar.ConchUser):
    implements(conchinterfaces.ISession)
 
    def __init__(self, username):
        avatar.ConchUser.__init__(self)
        self.username = username
        self.channelLookup.update({'session':session.SSHSession})
 
    def openShell(self, protocol):
        serverProtocol = insults.ServerProtocol(SSHDemoProtocol, self)
        serverProtocol.makeConnection(protocol)
        protocol.makeConnection(session.wrapProtocol(serverProtocol))
 
    def getPty(self, terminal, windowSize, attrs):
        return None
 
    def execCommand(self, protocol, cmd):
        raise NotImplementedError
 
    def closed(self):
        pass
 
class SSHDemoRealm:
    implements(portal.IRealm)
 
    def requestAvatar(self, avatarId, mind, *interfaces):
        if conchinterfaces.IConchUser in interfaces:
            return interfaces[0], SSHDemoAvatar(avatarId), lambda: None
        else:
            raise Exception, "No supported interfaces found."
 
def getRSAKeys():
    if not (os.path.exists('public.key') and os.path.exists('private.key')):
        # generate a RSA keypair
        print "Generating RSA keypair..."
        from Crypto.PublicKey import RSA
        KEY_LENGTH = 1024
        rsaKey = RSA.generate(KEY_LENGTH, common.entropy.get_bytes)
        publicKeyString = keys.Key(rsaKey).toString()
        privateKeyString = keys.makePrivateKeyString(rsaKey)
        # save keys for next time
        file('public.key', 'w+b').write(publicKeyString)
        file('private.key', 'w+b').write(privateKeyString)
        print "done."
    else:
        publicKeyString = file('public.key').read()
        privateKeyString = file('private.key').read()
    return publicKeyString, privateKeyString
 
if __name__ == "__main__":
    sshFactory = factory.SSHFactory()
    sshFactory.portal = portal.Portal(SSHDemoRealm())
    users = {'admin': 'aaa', 'guest': 'bbb'}
    sshFactory.portal.registerChecker(
 checkers.InMemoryUsernamePasswordDatabaseDontUse(**users))
 
    pubKeyString, privKeyString = getRSAKeys()
    sshFactory.publicKeys = {
        'ssh-rsa': keys.Key.fromString(pubKeyString)}
    sshFactory.privateKeys = {
        'ssh-rsa': keys.Key.fromString(privKeyString)}
 
    from twisted.internet import reactor
    reactor.listenTCP(2222, sshFactory)
    reactor.run()

Acerca de Juanjo

Mi nombre es Juanjo Conti, vivo en Santa Fe y soy Ingeniero en Sistemas de Información. Mi lenguaje de programación de cabecera es Python (aunque ahora estoy aprendiendo Ruby); los uso para trabajar, estudiar y jugar. Como hobby escribí algunos libros.
Esta entrada fue publicada en Aprendiendo Python, Twisted y etiquetada , . Guarda el enlace permanente.
  • Juanjo

    Las lìneas:

    rsaKey = RSA.generate(KEY_LENGTH, common.entropy.get_bytes)
    publicKeyString = keys.Key(rsaKey).toString()

    deben cambiarse por:

    rsaKey = RSA.generate(KEY_LENGTH, twisted.python.randbytes.secureRandom)
    publicKeyString = keys.Key(rsaKey).toString('OPENSSH')
  • http://pucheta.myopenid.com/ pucheta

    GROSO!!!!!!!!!!! :)