twisted-1

栏目: Python · 发布时间: 6年前

内容简介:一些关于通常用于处理协议的子类为:先看一个简单的例子

一些关于 twisted 的简单内容

TCP SERVER

protocol

通常用于处理协议的子类为: twisted.internet.protocol.Protocol , 为了保存配置文件,使用工厂模式: twisted.internet.protocol.Factory

先看一个简单的例子

class Echo(Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

这就是一个简单的协议,重写了 dataReceived 当收到消息时,返回给 sender 同样内容。

from twisted.internet.protocol import Protocol
class QOTD(Protocol):
    def connectionMade(self):
        self.transport.write(b"An apple a day keeps the doctor away\r\n")
        self.transport.loseConnection()

上面这个重写了 connectionMade 方法,这个时双方建立连接时的协商工作。

from twisted.protocols.basic import LineReceiver

class Answer(LineReceiver):

    answers = {b'How are you?': b'Fine', None: b"I don't know what you mean"}

    def lineReceived(self, line):
        if line in self.answers:
            self.sendLine(self.answers[line])
        else:
            self.sendLine(self.answers[None])

这是一个没收到一行就处理一次的Protocol。

运行server

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTDFactory(Factory):
    def buildProtocol(self, addr):
        return QOTD()

# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()

创建工程类,重写 buildProtocol 方法,返回你的 Protocol子类 。然后通过 TCP4ServerEndpoint 绑定端口,然后指定工厂类。通过 reactor.run() 来启动整个流程。(目前我还不怎么会用reactor)

另一种工程创建方式

from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory's default buildProtocol:
        self.transport.write(self.factory.quote + b'\r\n')
        self.transport.loseConnection()


class QOTDFactory(Factory):

    # This will be used by the default buildProtocol to create new protocols:
    protocol = QOTD

    def __init__(self, quote=None):
        self.quote = quote or b'An apple a day keeps the doctor away'

endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory(b"configurable quote"))
reactor.run()

工厂类的startup & shutdown

# -*- coding: utf-8 -*-
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class LoggingProtocol(LineReceiver):

    def lineReceived(self, line):
        print(line)
        if line == b'quit':
            # self.factory.stopFactory()
            pass

        self.factory.fp.write(line + b'\n')
        self.transport.loseConnection()


class LogfileFactory(Factory):
    protocol = LoggingProtocol

    def __init__(self, fileName):
        self.file = fileName

    def startFactory(self):
        self.fp = open(self.file, 'ab')

    def stopFactory(self):
        self.fp.close()

endpoint = TCP4ServerEndpoint(reactor, 8006)
endpoint.listen(LogfileFactory("1.txt"))
reactor.run()

这个需要通过 python test.py 来启动,然后通过 CTRL+C 来终止。如果时pycharm直接kill的话,无法调用 stopFactory 函数,导致无法写日志。官方文档说,用户不应该手动调用此方法。

# -*- coding: utf-8 -*-
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor


class Echo(Protocol):

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.numProtocols = self.factory.numProtocols + 1
        self.transport.write(
            b"Welcome! There are currently %d open connections.\n" %
            (self.factory.numProtocols,))

    def connectionLost(self, reason):
        self.factory.numProtocols = self.factory.numProtocols - 1

    def dataReceived(self, data):
        self.transport.write(data)


class EchoFactory(Factory):
    numProtocols = 0

    def buildProtocol(self, addr):
        return Echo(self)  # 通过self将factory实例传入

# 34 35的代码可以通过如下代码替换
# reactor.listenTCP(8006, EchoFactory())
endpoint = TCP4ServerEndpoint(reactor, 8006)
endpoint.listen(EchoFactory())
reactor.run()

listenTCP是将Factory连接到网络的方法。

TCP CLIENT

In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol.

文档意思是大多数情况下,Clinet只需要请求一次SERVER,所以不需要Factory。 twisted.internet.endpoints 提供了适当的API,特别是connectProtocol,它采用协议实例而不是工厂。

一个简单的协议子类与 SERVER 版本没什么区别。

from twisted.internet.protocol import Protocol
from sys import stdout

class Echo(Protocol):
    def dataReceived(self, data):
        stdout.write(data)
########################################################
from twisted.internet.protocol import Protocol

class WelcomeMessage(Protocol):
    def connectionMade(self):
        self.transport.write("Hello server, I am the client!\r\n")
        self.transport.loseConnection()

下面是官方文档使用的例子。主要是关注下它启动的方式。还有另一种启动方式,已经不推荐。

# -*- coding: utf-8 -*-
from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol

class Greeter(Protocol):
    def connectionMade(self):
        self.transport.write(b"Hello server, I am the client!\r\n")
        # self.transport.loseConnection()

    def dataReceived(self, data):
        self.sendMessage(b'yes')

    def sendMessage(self, msg):
        self.transport.write(b"MESSAGE %s\n" % msg)

def gotProtocol(p):
    p.sendMessage(b"Hello")
    reactor.callLater(1, p.sendMessage, b"This is sent in a second")
    reactor.callLater(2, p.transport.loseConnection)

point = TCP4ClientEndpoint(reactor, "127.0.0.1", 1234)
d = connectProtocol(point, Greeter())
d.addCallback(gotProtocol)
reactor.run()

工厂方式

from twisted.internet.protocol import Protocol, ClientFactory
from sys import stdout

class Echo(Protocol):
    def dataReceived(self, data):
        stdout.write(data)

class EchoClientFactory(ClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection.  Reason:', reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)
        
from twisted.internet import reactor
reactor.connectTCP('127.0.0.1', 1234, EchoClientFactory())
reactor.run()

无法建立连接时调用clientConnectionFailed,并且在建立连接然后断开连接时调用clientConnectionLost。

下面的例子是当连接超时或者出错,进行重新连接的操作。

from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from sys import stdout

class Echo(Protocol):
    def dataReceived(self, data):
        stdout.write(data)

class EchoClientFactory(ReconnectingClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        print('Resetting reconnection delay')
        self.resetDelay()
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection.  Reason:', reason)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
                                                         reason)

schedule

对于 schedule ,官方给出3个使用案例。

延时调用

# -*- coding: utf-8 -*-
from twisted.internet import task
from twisted.internet import reactor


def f(s):
    print("This will run 3.5 seconds after it was scheduled: %s" % s)
    return 'hehe'

d = task.deferLater(reactor, 1, f, "hello, world")
#也可以用下面的语句,但我还不知道着2个是否有什么区别
reactor.callLater(1, f, "This is sent in a second")

def called(result):
    print(result)

d.addCallback(called)
reactor.run()

循环调用

from twisted.internet import task
from twisted.internet import reactor

loopTimes = 3  # 循环次数
failInTheEnd = False
_loopCounter = 0  # 已经循环的次数

def runEverySecond():
    """
    Called at ever loop interval.
    """
    global _loopCounter

    if _loopCounter < loopTimes:
        _loopCounter += 1
        print('A new second has passed.')
        return

    # 如果发生错误,则抛出异常
    if failInTheEnd:
        raise Exception('Failure during loop execution.')

    # We looped enough times.
    loop.stop()
    return


def cbLoopDone(result):
    """
    Called when loop was stopped with success.
    """
    print("Loop done.")
    reactor.stop()


def ebLoopFailed(failure):
    """
    Called when loop execution failed.
    """
    print(failure.getBriefTraceback())
    reactor.stop()


loop = task.LoopingCall(runEverySecond)

# Start looping every 1 second.
loopDeferred = loop.start(1.0)

# Add callbacks for stop and failure.
loopDeferred.addCallback(cbLoopDone)
loopDeferred.addErrback(ebLoopFailed)

reactor.run()

取消调用

from twisted.internet import reactor

def f():
    print("I'll never run.")

callID = reactor.callLater(5, f)
callID.cancel()
reactor.run()

Deferred

个人理解

Deferred是一个可以添加回调链的对象。上面代码中的Deferred都是通过调用延时来模拟的。实际上会有类似于请求WEB的这种API作为真实的Deferred参数。

from twisted.internet import reactor, defer

def getDummyData(inputData):
    print('getDummyData called')
    deferred = defer.Deferred()
    # 通过callLater来模拟一个会产生延时的函数
    reactor.callLater(2, deferred.callback, inputData * 3)
    return deferred

def cbPrintData(result):
  	# 用于处理上面的输出
    print('Result received: {}'.format(result))

deferred = getDummyData(3)
deferred.addCallback(cbPrintData)

# 设置结束
reactor.callLater(4, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
print('Starting the reactor')
reactor.run()

twisted-1

  • 先从数据源获取数据,这个获取的方法会产生一个 Deferred 对象。
  • 然后开始回调。

关于回调函数

  1. 如果成功,则调用 .callback(result) 。如果失败,则 .errback(failure)
  2. 回调函数总是将上一个函数的返回作为下个函数的参数进行传递。
  3. 如果 callback 中产生异常,则切换到 errback 中。

对于一个正常的 python 处理异常:

try:
    # code that may throw an exception
    cookSpamAndEggs()
except (SpamException, EggException):
    # Handle SpamExceptions and EggExceptions
    ...

如果用twisted的异常链,应该是如下内容:

def errorHandler(failure):
    failure.trap(SpamException, EggException)
    # Handle SpamExceptions and EggExceptions

d.addCallback(cookSpamAndEggs)
d.addErrback(errorHandler)

如果 failure.trap(…) 没有匹配到异常,则会抛出另外的异常。

另一个需要注意的就是下面两种添加会掉链的方式可能有所不同。

# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1)       # A
d.addErrback(errback1)         # B
d.addCallback(callback2)
d.addErrback(errback2)

# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1)  # C
d.addCallbacks(callback2, errback2)

对于 case1 而言,在 callback1 发生异常时,errback1会被调用。 而对于 case2 而言,则errback2会被调用。

maybeDeferred

这个函数用来处理,当你不知道这个到底是 同步 还是 异步 时候使用。

# 同步的验证函数
def synchronousIsValidUser(user):
    '''
    Return true if user is a valid user, false otherwise
    '''
    return user in ["Alice", "Angus", "Agnes"]
from twisted.internet import reactor, defer
# 异步函数
def asynchronousIsValidUser(user):
    d = defer.Deferred()
    reactor.callLater(2, d.callback, user in ["Alice", "Angus", "Agnes"])
    return d
from twisted.internet import defer

# 返回结果
def printResult(result):
    if result:
        print("User is authenticated")
    else:
        print("User is not authenticated")

def authenticateUser(isValidUser, user):
    d = defer.maybeDeferred(isValidUser, user)
    d.addCallback(printResult)
# 通过同步验证
authenticateUser(synchronousIsValidUser,user)
# 通过异步验证
authenticateUser(asynchronousIsValidUser,user)

取消任务

动机

假如在请求某个连接时,一直在转圈圈,那么用户想停止请求它,就要取消。看一下下面模拟的例子:

import random
from twisted.internet import task

def f():
    return "Hopefully this will be called in 3 seconds or less"

def main(reactor):
    delay = random.uniform(1, 5)  # 随机等待 1-5 秒

    def called(result):
        print("{0} seconds later:".format(delay), result)

    d = task.deferLater(reactor, delay, f)
    d.addTimeout(3, reactor).addBoth(called)  # 如果超过3秒,就会打断上面的命令,抛出异常
    										  # 如果没有超过3秒,就不会打断上面命令

    return d

# f() will be timed out if the random delay is greater than 3 seconds
task.react(main)

DeferredList

# A callback that unpacks and prints the results of a DeferredList
def printResult(result):
    for (success, value) in result:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

# Create three deferreds.
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# Pack them into a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# Add our callback
dl.addCallback(printResult)

# Fire our three deferreds with various values.
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

# At this point, dl will fire its callback, printing:
#    Success: one
#    Failure: bang!
#    Success: three
# (note that defer.SUCCESS == True, and defer.FAILURE == False)

如果在将Deferred添加到DeferredList之后向Deferred添加回调,则该回调返回的值将不会提供给DeferredList的回调。为避免混淆,我们建议在DeferredList中使用后,不要向Deferred添加回调

def printResult(result):
    print(result)

def addTen(result):
    return result + " ten"

# Deferred gets callback before DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred1.addCallback(addTen)
dl = defer.DeferredList([deferred1, deferred2])
dl.addCallback(printResult)
deferred1.callback("one") # fires addTen, checks DeferredList, stores "one ten"
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
#     [(1, 'one ten'), (1, 'two')]

# Deferred gets callback after DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
dl = defer.DeferredList([deferred1, deferred2])
deferred1.addCallback(addTen) # will run *after* DeferredList gets its value
dl.addCallback(printResult)
deferred1.callback("one") # checks DeferredList, stores "one", fires addTen
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
#     [(1, 'one), (1, 'two')]
from twisted.internet import defer

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.gatherResults([d1, d2], consumeErrors=True)

def cbPrintResult(result):
    print(result)

d.addCallback(cbPrintResult)

d1.callback("one")
# nothing is printed yet; d is still awaiting completion of d2
d2.callback("two")
# printResult prints ["one", "two"]

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编译原理

编译原理

Alfred V.Aho、Jeffrey D.Ullman、Ravi Sethi / 李建中 / 机械工业出版社 / 2003-8 / 55.00元

《编译原理》作者Alfred V.Aho、Ravi Sethi和Jeffrey D.Ullman是世界著名的计算机 科学家,他们在计算机科学理论、数据库等很多领域都做出了杰出贡献。《编译原理》 是编译领域无可替代的经典著作,被广大计算机专业人士誉为“龙书”。《编译原理》一 直被世界各地的著名高等院校和科研机构(如贝尔实验室、哥伦比亚大学、普 林斯顿大学和斯坦福大学等)广泛用作本科生和研究生编译原理......一起来看看 《编译原理》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具