内容简介:一些关于通常用于处理协议的子类为:先看一个简单的例子
一些关于 twisted 的简单内容
TCP SERVER
protocol
通常用于处理协议的子类为: twisted.internet.protocol.Protocol , 为了保存配置文件,使用工厂模式: twisted.internet.protocol.Factory
先看一个简单的例子
class Echo(Protocol): def dataReceived(self, data): self.transport.write(data)
这就是一个简单的协议,重写了 dataReceived
当收到消息时,返回给 sender 同样内容。
from twisted.internet.protocol import Protocol class QOTD(Protocol): def connectionMade(self): self.transport.write(b"An apple a day keeps the doctor away\r\n") self.transport.loseConnection()
上面这个重写了 connectionMade
方法,这个时双方建立连接时的协商工作。
from twisted.protocols.basic import LineReceiver class Answer(LineReceiver): answers = {b'How are you?': b'Fine', None: b"I don't know what you mean"} def lineReceived(self, line): if line in self.answers: self.sendLine(self.answers[line]) else: self.sendLine(self.answers[None])
这是一个没收到一行就处理一次的Protocol。
运行server
from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor class QOTDFactory(Factory): def buildProtocol(self, addr): return QOTD() # 8007 is the port you want to run under. Choose something >1024 endpoint = TCP4ServerEndpoint(reactor, 8007) endpoint.listen(QOTDFactory()) reactor.run()
创建工程类,重写 buildProtocol 方法,返回你的 Protocol子类 。然后通过 TCP4ServerEndpoint 绑定端口,然后指定工厂类。通过 reactor.run() 来启动整个流程。(目前我还不怎么会用reactor)
另一种工程创建方式
from twisted.internet.protocol import Factory, Protocol from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor class QOTD(Protocol): def connectionMade(self): # self.factory was set by the factory's default buildProtocol: self.transport.write(self.factory.quote + b'\r\n') self.transport.loseConnection() class QOTDFactory(Factory): # This will be used by the default buildProtocol to create new protocols: protocol = QOTD def __init__(self, quote=None): self.quote = quote or b'An apple a day keeps the doctor away' endpoint = TCP4ServerEndpoint(reactor, 8007) endpoint.listen(QOTDFactory(b"configurable quote")) reactor.run()
工厂类的startup & shutdown
# -*- coding: utf-8 -*- from twisted.internet.protocol import Factory from twisted.protocols.basic import LineReceiver from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor class LoggingProtocol(LineReceiver): def lineReceived(self, line): print(line) if line == b'quit': # self.factory.stopFactory() pass self.factory.fp.write(line + b'\n') self.transport.loseConnection() class LogfileFactory(Factory): protocol = LoggingProtocol def __init__(self, fileName): self.file = fileName def startFactory(self): self.fp = open(self.file, 'ab') def stopFactory(self): self.fp.close() endpoint = TCP4ServerEndpoint(reactor, 8006) endpoint.listen(LogfileFactory("1.txt")) reactor.run()
这个需要通过 python test.py 来启动,然后通过 CTRL+C 来终止。如果时pycharm直接kill的话,无法调用 stopFactory 函数,导致无法写日志。官方文档说,用户不应该手动调用此方法。
# -*- coding: utf-8 -*- from twisted.internet.protocol import Protocol from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor class Echo(Protocol): def __init__(self, factory): self.factory = factory def connectionMade(self): self.factory.numProtocols = self.factory.numProtocols + 1 self.transport.write( b"Welcome! There are currently %d open connections.\n" % (self.factory.numProtocols,)) def connectionLost(self, reason): self.factory.numProtocols = self.factory.numProtocols - 1 def dataReceived(self, data): self.transport.write(data) class EchoFactory(Factory): numProtocols = 0 def buildProtocol(self, addr): return Echo(self) # 通过self将factory实例传入 # 34 35的代码可以通过如下代码替换 # reactor.listenTCP(8006, EchoFactory()) endpoint = TCP4ServerEndpoint(reactor, 8006) endpoint.listen(EchoFactory()) reactor.run()
listenTCP是将Factory连接到网络的方法。
TCP CLIENT
In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol.
文档意思是大多数情况下,Clinet只需要请求一次SERVER,所以不需要Factory。 twisted.internet.endpoints 提供了适当的API,特别是connectProtocol,它采用协议实例而不是工厂。
一个简单的协议子类与 SERVER 版本没什么区别。
from twisted.internet.protocol import Protocol from sys import stdout class Echo(Protocol): def dataReceived(self, data): stdout.write(data) ######################################################## from twisted.internet.protocol import Protocol class WelcomeMessage(Protocol): def connectionMade(self): self.transport.write("Hello server, I am the client!\r\n") self.transport.loseConnection()
下面是官方文档使用的例子。主要是关注下它启动的方式。还有另一种启动方式,已经不推荐。
# -*- coding: utf-8 -*- from twisted.internet import reactor from twisted.internet.protocol import Protocol from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol class Greeter(Protocol): def connectionMade(self): self.transport.write(b"Hello server, I am the client!\r\n") # self.transport.loseConnection() def dataReceived(self, data): self.sendMessage(b'yes') def sendMessage(self, msg): self.transport.write(b"MESSAGE %s\n" % msg) def gotProtocol(p): p.sendMessage(b"Hello") reactor.callLater(1, p.sendMessage, b"This is sent in a second") reactor.callLater(2, p.transport.loseConnection) point = TCP4ClientEndpoint(reactor, "127.0.0.1", 1234) d = connectProtocol(point, Greeter()) d.addCallback(gotProtocol) reactor.run()
工厂方式
from twisted.internet.protocol import Protocol, ClientFactory from sys import stdout class Echo(Protocol): def dataReceived(self, data): stdout.write(data) class EchoClientFactory(ClientFactory): def startedConnecting(self, connector): print('Started to connect.') def buildProtocol(self, addr): print('Connected.') return Echo() def clientConnectionLost(self, connector, reason): print('Lost connection. Reason:', reason) def clientConnectionFailed(self, connector, reason): print('Connection failed. Reason:', reason) from twisted.internet import reactor reactor.connectTCP('127.0.0.1', 1234, EchoClientFactory()) reactor.run()
无法建立连接时调用clientConnectionFailed,并且在建立连接然后断开连接时调用clientConnectionLost。
下面的例子是当连接超时或者出错,进行重新连接的操作。
from twisted.internet.protocol import Protocol, ReconnectingClientFactory from sys import stdout class Echo(Protocol): def dataReceived(self, data): stdout.write(data) class EchoClientFactory(ReconnectingClientFactory): def startedConnecting(self, connector): print('Started to connect.') def buildProtocol(self, addr): print('Connected.') print('Resetting reconnection delay') self.resetDelay() return Echo() def clientConnectionLost(self, connector, reason): print('Lost connection. Reason:', reason) ReconnectingClientFactory.clientConnectionLost(self, connector, reason) def clientConnectionFailed(self, connector, reason): print('Connection failed. Reason:', reason) ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
schedule
对于 schedule ,官方给出3个使用案例。
延时调用
# -*- coding: utf-8 -*- from twisted.internet import task from twisted.internet import reactor def f(s): print("This will run 3.5 seconds after it was scheduled: %s" % s) return 'hehe' d = task.deferLater(reactor, 1, f, "hello, world") #也可以用下面的语句,但我还不知道着2个是否有什么区别 reactor.callLater(1, f, "This is sent in a second") def called(result): print(result) d.addCallback(called) reactor.run()
循环调用
from twisted.internet import task from twisted.internet import reactor loopTimes = 3 # 循环次数 failInTheEnd = False _loopCounter = 0 # 已经循环的次数 def runEverySecond(): """ Called at ever loop interval. """ global _loopCounter if _loopCounter < loopTimes: _loopCounter += 1 print('A new second has passed.') return # 如果发生错误,则抛出异常 if failInTheEnd: raise Exception('Failure during loop execution.') # We looped enough times. loop.stop() return def cbLoopDone(result): """ Called when loop was stopped with success. """ print("Loop done.") reactor.stop() def ebLoopFailed(failure): """ Called when loop execution failed. """ print(failure.getBriefTraceback()) reactor.stop() loop = task.LoopingCall(runEverySecond) # Start looping every 1 second. loopDeferred = loop.start(1.0) # Add callbacks for stop and failure. loopDeferred.addCallback(cbLoopDone) loopDeferred.addErrback(ebLoopFailed) reactor.run()
取消调用
from twisted.internet import reactor def f(): print("I'll never run.") callID = reactor.callLater(5, f) callID.cancel() reactor.run()
Deferred
个人理解
Deferred是一个可以添加回调链的对象。上面代码中的Deferred都是通过调用延时来模拟的。实际上会有类似于请求WEB的这种API作为真实的Deferred参数。
from twisted.internet import reactor, defer def getDummyData(inputData): print('getDummyData called') deferred = defer.Deferred() # 通过callLater来模拟一个会产生延时的函数 reactor.callLater(2, deferred.callback, inputData * 3) return deferred def cbPrintData(result): # 用于处理上面的输出 print('Result received: {}'.format(result)) deferred = getDummyData(3) deferred.addCallback(cbPrintData) # 设置结束 reactor.callLater(4, reactor.stop) # start up the Twisted reactor (event loop handler) manually print('Starting the reactor') reactor.run()
- 先从数据源获取数据,这个获取的方法会产生一个 Deferred 对象。
- 然后开始回调。
关于回调函数
- 如果成功,则调用 .callback(result) 。如果失败,则 .errback(failure)
- 回调函数总是将上一个函数的返回作为下个函数的参数进行传递。
- 如果 callback 中产生异常,则切换到 errback 中。
对于一个正常的 python 处理异常:
try: # code that may throw an exception cookSpamAndEggs() except (SpamException, EggException): # Handle SpamExceptions and EggExceptions ...
如果用twisted的异常链,应该是如下内容:
def errorHandler(failure): failure.trap(SpamException, EggException) # Handle SpamExceptions and EggExceptions d.addCallback(cookSpamAndEggs) d.addErrback(errorHandler)
如果 failure.trap(…) 没有匹配到异常,则会抛出另外的异常。
另一个需要注意的就是下面两种添加会掉链的方式可能有所不同。
# Case 1 d = getDeferredFromSomewhere() d.addCallback(callback1) # A d.addErrback(errback1) # B d.addCallback(callback2) d.addErrback(errback2) # Case 2 d = getDeferredFromSomewhere() d.addCallbacks(callback1, errback1) # C d.addCallbacks(callback2, errback2)
对于 case1 而言,在 callback1 发生异常时,errback1会被调用。 而对于 case2 而言,则errback2会被调用。
maybeDeferred
这个函数用来处理,当你不知道这个到底是 同步 还是 异步 时候使用。
# 同步的验证函数 def synchronousIsValidUser(user): ''' Return true if user is a valid user, false otherwise ''' return user in ["Alice", "Angus", "Agnes"] from twisted.internet import reactor, defer # 异步函数 def asynchronousIsValidUser(user): d = defer.Deferred() reactor.callLater(2, d.callback, user in ["Alice", "Angus", "Agnes"]) return d from twisted.internet import defer # 返回结果 def printResult(result): if result: print("User is authenticated") else: print("User is not authenticated") def authenticateUser(isValidUser, user): d = defer.maybeDeferred(isValidUser, user) d.addCallback(printResult) # 通过同步验证 authenticateUser(synchronousIsValidUser,user) # 通过异步验证 authenticateUser(asynchronousIsValidUser,user)
取消任务
动机
假如在请求某个连接时,一直在转圈圈,那么用户想停止请求它,就要取消。看一下下面模拟的例子:
import random from twisted.internet import task def f(): return "Hopefully this will be called in 3 seconds or less" def main(reactor): delay = random.uniform(1, 5) # 随机等待 1-5 秒 def called(result): print("{0} seconds later:".format(delay), result) d = task.deferLater(reactor, delay, f) d.addTimeout(3, reactor).addBoth(called) # 如果超过3秒,就会打断上面的命令,抛出异常 # 如果没有超过3秒,就不会打断上面命令 return d # f() will be timed out if the random delay is greater than 3 seconds task.react(main)
DeferredList
# A callback that unpacks and prints the results of a DeferredList def printResult(result): for (success, value) in result: if success: print('Success:', value) else: print('Failure:', value.getErrorMessage()) # Create three deferreds. deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred3 = defer.Deferred() # Pack them into a DeferredList dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True) # Add our callback dl.addCallback(printResult) # Fire our three deferreds with various values. deferred1.callback('one') deferred2.errback(Exception('bang!')) deferred3.callback('three') # At this point, dl will fire its callback, printing: # Success: one # Failure: bang! # Success: three # (note that defer.SUCCESS == True, and defer.FAILURE == False)
如果在将Deferred添加到DeferredList之后向Deferred添加回调,则该回调返回的值将不会提供给DeferredList的回调。为避免混淆,我们建议在DeferredList中使用后,不要向Deferred添加回调
def printResult(result): print(result) def addTen(result): return result + " ten" # Deferred gets callback before DeferredList is created deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred1.addCallback(addTen) dl = defer.DeferredList([deferred1, deferred2]) dl.addCallback(printResult) deferred1.callback("one") # fires addTen, checks DeferredList, stores "one ten" deferred2.callback("two") # At this point, dl will fire its callback, printing: # [(1, 'one ten'), (1, 'two')] # Deferred gets callback after DeferredList is created deferred1 = defer.Deferred() deferred2 = defer.Deferred() dl = defer.DeferredList([deferred1, deferred2]) deferred1.addCallback(addTen) # will run *after* DeferredList gets its value dl.addCallback(printResult) deferred1.callback("one") # checks DeferredList, stores "one", fires addTen deferred2.callback("two") # At this point, dl will fire its callback, printing: # [(1, 'one), (1, 'two')]
from twisted.internet import defer d1 = defer.Deferred() d2 = defer.Deferred() d = defer.gatherResults([d1, d2], consumeErrors=True) def cbPrintResult(result): print(result) d.addCallback(cbPrintResult) d1.callback("one") # nothing is printed yet; d is still awaiting completion of d2 d2.callback("two") # printResult prints ["one", "two"]
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
写给大家看的设计书(第3版)
[美] Robin Williams / 苏金国、刘亮 / 人民邮电出版社 / 2009-1 / 49.00元
这本书出自一位世界级设计师之手。复杂的设计原理在书中凝炼为亲密性、对齐、重复和对比4 个基本原则。作者以其简洁明快的风格,将优秀设计所必须遵循的这4 个基本原则及其背后的原理通俗易懂地展现在读者面前。本书包含大量的示例,让你了解怎样才能按照自己的方式设计出美观且内容丰富的产品。 此书适用于各行各业需要从事设计工作的读者,也适用于有经验的设计人员。一起来看看 《写给大家看的设计书(第3版)》 这本书的介绍吧!