twisted-1

栏目: Python · 发布时间: 5年前

内容简介:一些关于通常用于处理协议的子类为:先看一个简单的例子

一些关于 twisted 的简单内容

TCP SERVER

protocol

通常用于处理协议的子类为: twisted.internet.protocol.Protocol , 为了保存配置文件,使用工厂模式: twisted.internet.protocol.Factory

先看一个简单的例子

class Echo(Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

这就是一个简单的协议,重写了 dataReceived 当收到消息时,返回给 sender 同样内容。

from twisted.internet.protocol import Protocol
class QOTD(Protocol):
    def connectionMade(self):
        self.transport.write(b"An apple a day keeps the doctor away\r\n")
        self.transport.loseConnection()

上面这个重写了 connectionMade 方法,这个时双方建立连接时的协商工作。

from twisted.protocols.basic import LineReceiver

class Answer(LineReceiver):

    answers = {b'How are you?': b'Fine', None: b"I don't know what you mean"}

    def lineReceived(self, line):
        if line in self.answers:
            self.sendLine(self.answers[line])
        else:
            self.sendLine(self.answers[None])

这是一个没收到一行就处理一次的Protocol。

运行server

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTDFactory(Factory):
    def buildProtocol(self, addr):
        return QOTD()

# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()

创建工程类,重写 buildProtocol 方法,返回你的 Protocol子类 。然后通过 TCP4ServerEndpoint 绑定端口,然后指定工厂类。通过 reactor.run() 来启动整个流程。(目前我还不怎么会用reactor)

另一种工程创建方式

from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory's default buildProtocol:
        self.transport.write(self.factory.quote + b'\r\n')
        self.transport.loseConnection()


class QOTDFactory(Factory):

    # This will be used by the default buildProtocol to create new protocols:
    protocol = QOTD

    def __init__(self, quote=None):
        self.quote = quote or b'An apple a day keeps the doctor away'

endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory(b"configurable quote"))
reactor.run()

工厂类的startup & shutdown

# -*- coding: utf-8 -*-
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class LoggingProtocol(LineReceiver):

    def lineReceived(self, line):
        print(line)
        if line == b'quit':
            # self.factory.stopFactory()
            pass

        self.factory.fp.write(line + b'\n')
        self.transport.loseConnection()


class LogfileFactory(Factory):
    protocol = LoggingProtocol

    def __init__(self, fileName):
        self.file = fileName

    def startFactory(self):
        self.fp = open(self.file, 'ab')

    def stopFactory(self):
        self.fp.close()

endpoint = TCP4ServerEndpoint(reactor, 8006)
endpoint.listen(LogfileFactory("1.txt"))
reactor.run()

这个需要通过 python test.py 来启动,然后通过 CTRL+C 来终止。如果时pycharm直接kill的话,无法调用 stopFactory 函数,导致无法写日志。官方文档说,用户不应该手动调用此方法。

# -*- coding: utf-8 -*-
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor


class Echo(Protocol):

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.numProtocols = self.factory.numProtocols + 1
        self.transport.write(
            b"Welcome! There are currently %d open connections.\n" %
            (self.factory.numProtocols,))

    def connectionLost(self, reason):
        self.factory.numProtocols = self.factory.numProtocols - 1

    def dataReceived(self, data):
        self.transport.write(data)


class EchoFactory(Factory):
    numProtocols = 0

    def buildProtocol(self, addr):
        return Echo(self)  # 通过self将factory实例传入

# 34 35的代码可以通过如下代码替换
# reactor.listenTCP(8006, EchoFactory())
endpoint = TCP4ServerEndpoint(reactor, 8006)
endpoint.listen(EchoFactory())
reactor.run()

listenTCP是将Factory连接到网络的方法。

TCP CLIENT

In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol.

文档意思是大多数情况下,Clinet只需要请求一次SERVER,所以不需要Factory。 twisted.internet.endpoints 提供了适当的API,特别是connectProtocol,它采用协议实例而不是工厂。

一个简单的协议子类与 SERVER 版本没什么区别。

from twisted.internet.protocol import Protocol
from sys import stdout

class Echo(Protocol):
    def dataReceived(self, data):
        stdout.write(data)
########################################################
from twisted.internet.protocol import Protocol

class WelcomeMessage(Protocol):
    def connectionMade(self):
        self.transport.write("Hello server, I am the client!\r\n")
        self.transport.loseConnection()

下面是官方文档使用的例子。主要是关注下它启动的方式。还有另一种启动方式,已经不推荐。

# -*- coding: utf-8 -*-
from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol

class Greeter(Protocol):
    def connectionMade(self):
        self.transport.write(b"Hello server, I am the client!\r\n")
        # self.transport.loseConnection()

    def dataReceived(self, data):
        self.sendMessage(b'yes')

    def sendMessage(self, msg):
        self.transport.write(b"MESSAGE %s\n" % msg)

def gotProtocol(p):
    p.sendMessage(b"Hello")
    reactor.callLater(1, p.sendMessage, b"This is sent in a second")
    reactor.callLater(2, p.transport.loseConnection)

point = TCP4ClientEndpoint(reactor, "127.0.0.1", 1234)
d = connectProtocol(point, Greeter())
d.addCallback(gotProtocol)
reactor.run()

工厂方式

from twisted.internet.protocol import Protocol, ClientFactory
from sys import stdout

class Echo(Protocol):
    def dataReceived(self, data):
        stdout.write(data)

class EchoClientFactory(ClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection.  Reason:', reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)
        
from twisted.internet import reactor
reactor.connectTCP('127.0.0.1', 1234, EchoClientFactory())
reactor.run()

无法建立连接时调用clientConnectionFailed,并且在建立连接然后断开连接时调用clientConnectionLost。

下面的例子是当连接超时或者出错,进行重新连接的操作。

from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from sys import stdout

class Echo(Protocol):
    def dataReceived(self, data):
        stdout.write(data)

class EchoClientFactory(ReconnectingClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        print('Resetting reconnection delay')
        self.resetDelay()
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection.  Reason:', reason)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
                                                         reason)

schedule

对于 schedule ,官方给出3个使用案例。

延时调用

# -*- coding: utf-8 -*-
from twisted.internet import task
from twisted.internet import reactor


def f(s):
    print("This will run 3.5 seconds after it was scheduled: %s" % s)
    return 'hehe'

d = task.deferLater(reactor, 1, f, "hello, world")
#也可以用下面的语句,但我还不知道着2个是否有什么区别
reactor.callLater(1, f, "This is sent in a second")

def called(result):
    print(result)

d.addCallback(called)
reactor.run()

循环调用

from twisted.internet import task
from twisted.internet import reactor

loopTimes = 3  # 循环次数
failInTheEnd = False
_loopCounter = 0  # 已经循环的次数

def runEverySecond():
    """
    Called at ever loop interval.
    """
    global _loopCounter

    if _loopCounter < loopTimes:
        _loopCounter += 1
        print('A new second has passed.')
        return

    # 如果发生错误,则抛出异常
    if failInTheEnd:
        raise Exception('Failure during loop execution.')

    # We looped enough times.
    loop.stop()
    return


def cbLoopDone(result):
    """
    Called when loop was stopped with success.
    """
    print("Loop done.")
    reactor.stop()


def ebLoopFailed(failure):
    """
    Called when loop execution failed.
    """
    print(failure.getBriefTraceback())
    reactor.stop()


loop = task.LoopingCall(runEverySecond)

# Start looping every 1 second.
loopDeferred = loop.start(1.0)

# Add callbacks for stop and failure.
loopDeferred.addCallback(cbLoopDone)
loopDeferred.addErrback(ebLoopFailed)

reactor.run()

取消调用

from twisted.internet import reactor

def f():
    print("I'll never run.")

callID = reactor.callLater(5, f)
callID.cancel()
reactor.run()

Deferred

个人理解

Deferred是一个可以添加回调链的对象。上面代码中的Deferred都是通过调用延时来模拟的。实际上会有类似于请求WEB的这种API作为真实的Deferred参数。

from twisted.internet import reactor, defer

def getDummyData(inputData):
    print('getDummyData called')
    deferred = defer.Deferred()
    # 通过callLater来模拟一个会产生延时的函数
    reactor.callLater(2, deferred.callback, inputData * 3)
    return deferred

def cbPrintData(result):
  	# 用于处理上面的输出
    print('Result received: {}'.format(result))

deferred = getDummyData(3)
deferred.addCallback(cbPrintData)

# 设置结束
reactor.callLater(4, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
print('Starting the reactor')
reactor.run()

twisted-1

  • 先从数据源获取数据,这个获取的方法会产生一个 Deferred 对象。
  • 然后开始回调。

关于回调函数

  1. 如果成功,则调用 .callback(result) 。如果失败,则 .errback(failure)
  2. 回调函数总是将上一个函数的返回作为下个函数的参数进行传递。
  3. 如果 callback 中产生异常,则切换到 errback 中。

对于一个正常的 python 处理异常:

try:
    # code that may throw an exception
    cookSpamAndEggs()
except (SpamException, EggException):
    # Handle SpamExceptions and EggExceptions
    ...

如果用twisted的异常链,应该是如下内容:

def errorHandler(failure):
    failure.trap(SpamException, EggException)
    # Handle SpamExceptions and EggExceptions

d.addCallback(cookSpamAndEggs)
d.addErrback(errorHandler)

如果 failure.trap(…) 没有匹配到异常,则会抛出另外的异常。

另一个需要注意的就是下面两种添加会掉链的方式可能有所不同。

# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1)       # A
d.addErrback(errback1)         # B
d.addCallback(callback2)
d.addErrback(errback2)

# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1)  # C
d.addCallbacks(callback2, errback2)

对于 case1 而言,在 callback1 发生异常时,errback1会被调用。 而对于 case2 而言,则errback2会被调用。

maybeDeferred

这个函数用来处理,当你不知道这个到底是 同步 还是 异步 时候使用。

# 同步的验证函数
def synchronousIsValidUser(user):
    '''
    Return true if user is a valid user, false otherwise
    '''
    return user in ["Alice", "Angus", "Agnes"]
from twisted.internet import reactor, defer
# 异步函数
def asynchronousIsValidUser(user):
    d = defer.Deferred()
    reactor.callLater(2, d.callback, user in ["Alice", "Angus", "Agnes"])
    return d
from twisted.internet import defer

# 返回结果
def printResult(result):
    if result:
        print("User is authenticated")
    else:
        print("User is not authenticated")

def authenticateUser(isValidUser, user):
    d = defer.maybeDeferred(isValidUser, user)
    d.addCallback(printResult)
# 通过同步验证
authenticateUser(synchronousIsValidUser,user)
# 通过异步验证
authenticateUser(asynchronousIsValidUser,user)

取消任务

动机

假如在请求某个连接时,一直在转圈圈,那么用户想停止请求它,就要取消。看一下下面模拟的例子:

import random
from twisted.internet import task

def f():
    return "Hopefully this will be called in 3 seconds or less"

def main(reactor):
    delay = random.uniform(1, 5)  # 随机等待 1-5 秒

    def called(result):
        print("{0} seconds later:".format(delay), result)

    d = task.deferLater(reactor, delay, f)
    d.addTimeout(3, reactor).addBoth(called)  # 如果超过3秒,就会打断上面的命令,抛出异常
    										  # 如果没有超过3秒,就不会打断上面命令

    return d

# f() will be timed out if the random delay is greater than 3 seconds
task.react(main)

DeferredList

# A callback that unpacks and prints the results of a DeferredList
def printResult(result):
    for (success, value) in result:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

# Create three deferreds.
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# Pack them into a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# Add our callback
dl.addCallback(printResult)

# Fire our three deferreds with various values.
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

# At this point, dl will fire its callback, printing:
#    Success: one
#    Failure: bang!
#    Success: three
# (note that defer.SUCCESS == True, and defer.FAILURE == False)

如果在将Deferred添加到DeferredList之后向Deferred添加回调,则该回调返回的值将不会提供给DeferredList的回调。为避免混淆,我们建议在DeferredList中使用后,不要向Deferred添加回调

def printResult(result):
    print(result)

def addTen(result):
    return result + " ten"

# Deferred gets callback before DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred1.addCallback(addTen)
dl = defer.DeferredList([deferred1, deferred2])
dl.addCallback(printResult)
deferred1.callback("one") # fires addTen, checks DeferredList, stores "one ten"
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
#     [(1, 'one ten'), (1, 'two')]

# Deferred gets callback after DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
dl = defer.DeferredList([deferred1, deferred2])
deferred1.addCallback(addTen) # will run *after* DeferredList gets its value
dl.addCallback(printResult)
deferred1.callback("one") # checks DeferredList, stores "one", fires addTen
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
#     [(1, 'one), (1, 'two')]
from twisted.internet import defer

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.gatherResults([d1, d2], consumeErrors=True)

def cbPrintResult(result):
    print(result)

d.addCallback(cbPrintResult)

d1.callback("one")
# nothing is printed yet; d is still awaiting completion of d2
d2.callback("two")
# printResult prints ["one", "two"]

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

写给大家看的设计书(第3版)

写给大家看的设计书(第3版)

[美] Robin Williams / 苏金国、刘亮 / 人民邮电出版社 / 2009-1 / 49.00元

这本书出自一位世界级设计师之手。复杂的设计原理在书中凝炼为亲密性、对齐、重复和对比4 个基本原则。作者以其简洁明快的风格,将优秀设计所必须遵循的这4 个基本原则及其背后的原理通俗易懂地展现在读者面前。本书包含大量的示例,让你了解怎样才能按照自己的方式设计出美观且内容丰富的产品。 此书适用于各行各业需要从事设计工作的读者,也适用于有经验的设计人员。一起来看看 《写给大家看的设计书(第3版)》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具