内容简介:看源代码,解析一次完整的通过上图,我们至少要知道两件事:下面来一一解析。
看源代码,解析一次完整的 public channel
下发流程。
此图来自网上,如有侵权,通知我删除
通过上图,我们至少要知道两件事:
Socket.io Server Brocadcasted Data
下面来一一解析。
BroadcastServiceProvider
BroadcastServiceProvider
主要包含了 Broadcast
相关的五个驱动器、 Broadcast
事件、 Broadcast
队列等方法,比较简单就不在解析了,今天主要说说怎么通过 redis
来驱动 Broadcast
的。
首先还是简单配置下 Broadcast
的 config
:
// broadcasting.php <?php return [ /* |-------------------------------------------------------------------------- | Default Broadcaster |-------------------------------------------------------------------------- | | This option controls the default broadcaster that will be used by the | framework when an event needs to be broadcast. You may set this to | any of the connections defined in the "connections" array below. | | Supported: "pusher", "redis", "log", "null" | */ 'default' => env('BROADCAST_DRIVER', 'null'), /* |-------------------------------------------------------------------------- | Broadcast Connections |-------------------------------------------------------------------------- | | Here you may define all of the broadcast connections that will be used | to broadcast events to other systems or over websockets. Samples of | each available type of connection are provided inside this array. | */ 'connections' => [ 'pusher' => [ 'driver' => 'pusher', 'key' => env('PUSHER_APP_KEY'), 'secret' => env('PUSHER_APP_SECRET'), 'app_id' => env('PUSHER_APP_ID'), 'options' => [ // ], ], 'redis' => [ 'driver' => 'redis', 'connection' => 'default', ], 'log' => [ 'driver' => 'log', ], 'null' => [ 'driver' => 'null', ], ], ]; // .env BROADCAST_DRIVER=redis REDIS_HOST=redis REDIS_PASSWORD=null REDIS_PORT=6379 复制代码
之前了解过 Laravel 的 ServiceProvider
的工作原理,所以我们就不用赘述太多这方面的流程了,我们主要看看 BroadcastServiceProvider
的注册方法:
public function register() { $this->app->singleton(BroadcastManager::class, function ($app) { return new BroadcastManager($app); }); $this->app->singleton(BroadcasterContract::class, function ($app) { return $app->make(BroadcastManager::class)->connection(); }); $this->app->alias( BroadcastManager::class, BroadcastingFactory::class ); } 复制代码
我们写一个发送 Broadcast
demo:
// routes/console.php Artisan::command('public_echo', function () { event(new RssPublicEvent()); })->describe('echo demo'); // app/Events/RssPublicEvent.php <?php namespace AppEvents; use CarbonCarbon; use IlluminateBroadcastingChannel; use IlluminateQueueSerializesModels; use IlluminateBroadcastingPrivateChannel; use IlluminateBroadcastingPresenceChannel; use IlluminateFoundationEventsDispatchable; use IlluminateBroadcastingInteractsWithSockets; use IlluminateContractsBroadcastingShouldBroadcast; class RssPublicEvent implements ShouldBroadcast { use Dispatchable, InteractsWithSockets, SerializesModels; /** * Create a new event instance. * * @return void */ public function __construct() { // } /** * Get the channels the event should broadcast on. * * @return IlluminateBroadcastingChannel|array */ public function broadcastOn() { return new Channel('public_channel'); } /** * 指定广播数据。 * * @return array */ public function broadcastWith() { // 返回当前时间 return ['name' => 'public_channel_'.Carbon::now()->toDateTimeString()]; } } 复制代码
有了这下发 Event
,我们看看它是怎么执行的,主要看 BroadcastEvent
的 handle
方法:
public function handle(Broadcaster $broadcaster) { // 主要看,有没有自定义该 Event 名称,没有的话,直接使用类名 $name = method_exists($this->event, 'broadcastAs') ? $this->event->broadcastAs() : get_class($this->event); $broadcaster->broadcast( Arr::wrap($this->event->broadcastOn()), $name, $this->getPayloadFromEvent($this->event) ); } 复制代码
先看怎么获取参数的 $this->getPayloadFromEvent($this->event)
:
protected function getPayloadFromEvent($event) { if (method_exists($event, 'broadcastWith')) { return array_merge( $event->broadcastWith(), ['socket' => data_get($event, 'socket')] ); } $payload = []; foreach ((new ReflectionClass($event))->getProperties(ReflectionProperty::IS_PUBLIC) as $property) { $payload[$property->getName()] = $this->formatProperty($property->getValue($event)); } unset($payload['broadcastQueue']); return $payload; } 复制代码
主要传入我们自定义的数组,见函数 $event->broadcastWith()
、['socket' => data_get($event, 'socket')] 和 Event
中定义的所有 public
属性。
最后就是执行方法了:
$broadcaster->broadcast( Arr::wrap($this->event->broadcastOn()), $name, $this->getPayloadFromEvent($this->event) ); 复制代码
看上面的例子, $this->event->broadcastOn()
对应的是:
return new Channel('public_channel'); 复制代码
好了,该是看看接口 Broadcaster
了。
<?php namespace IlluminateContractsBroadcasting; interface Broadcaster { /** * Authenticate the incoming request for a given channel. * * @param IlluminateHttpRequest $request * @return mixed */ public function auth($request); /** * Return the valid authentication response. * * @param IlluminateHttpRequest $request * @param mixed $result * @return mixed */ public function validAuthenticationResponse($request, $result); /** * Broadcast the given event. * * @param array $channels * @param string $event * @param array $payload * @return void */ public function broadcast(array $channels, $event, array $payload = []); } 复制代码
这里主要提供三个函数,我们暂时看目前最关心的 broadcast()
,通过「PhpStorm」IDE,我们也能看出,继承这个接口的,主要就是平台 config
配置提供的几个驱动器:
我们开始往下走,看 redis
驱动器:
public function broadcast(array $channels, $event, array $payload = []) { $connection = $this->redis->connection($this->connection); $payload = json_encode([ 'event' => $event, 'data' => $payload, 'socket' => Arr::pull($payload, 'socket'), ]); foreach ($this->formatChannels($channels) as $channel) { $connection->publish($channel, $payload); } } 复制代码
这就简单的,无非就是创建 redis
连接,然后将数据 (包含 event
、 data
和 socket
构成的数组),利用 redis publish
出去,等着 laravel-echo-server
监听接收!
注:redis 有发布 ( publish
),就会有订阅,如: Psubscribe
。
好了,我们开始研究 laravel-echo-server
,看它怎么订阅的。
laravel-echo-server
在 Laravel 项目没有专门提供该 Server,很多项目都是使用 tlaverdure/laravel-echo-server
( github.com/tlaverdure/… ),其中我们的偶像 Laradock
也集成了该工具。
所以我们就拿 Laradock
配置来说一说。
. |____Dockerfile |____laravel-echo-server.json |____package.json 复制代码
主要包含三个文件,一个 Dockerfile 文件,用来创建容器; package.json
主要是安装 tlaverdure/laravel-echo-server
插件; laravel-echo-server.json
文件就是与 Laravel 交互的配置文件。
看看 Dockfile 内容:
FROM node:alpine RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories # Create app directory RUN mkdir -p /usr/src/app WORKDIR /usr/src/app # Install app dependencies COPY package.json /usr/src/app/ RUN apk add --update \n python \n python-dev \n py-pip \n build-base RUN npm install # Bundle app source COPY laravel-echo-server.json /usr/src/app/laravel-echo-server.json EXPOSE 3000 CMD [ "npm", "start" ] 复制代码
主要是以 node:alpine
为底,将项目部署在路径 /usr/src/app/
,执行命令 npm install
安装插件,参考文件 package.json
:
{ "name": "laravel-echo-server-docker", "description": "Docker container for running laravel-echo-server", "version": "0.0.1", "license": "MIT", "dependencies": { "laravel-echo-server": "^1.3.9" }, "scripts": { "start": "laravel-echo-server start" } } 复制代码
然后,在将配置文件加载进该路径下,最后执行 npm start
,也就是执行命令 laravel-echo-server start
,并且放出 3000 端口。
我们通过启动容器,然后进入容器看看文件结构:
执行 docker-compose up laravel-echo-server
后就可以看到 server
启动:
同样的,我们也可以下载它的源代码,来运行达到效果。
tlaverdure/laravel-echo-server
Laravel Echo Node JS Server forSocket.io
下载源代码:
git clone https://github.com/tlaverdure/laravel-echo-server.git 复制代码
进入项目安装插件:
npm install 复制代码
执行后,直接生成 dist
文件夹:
. |____api | |____http-api.js | |____index.js |____channels | |____channel.js | |____index.js | |____presence-channel.js | |____private-channel.js |____cli | |____cli.js | |____index.js |____database | |____database-driver.js | |____database.js | |____index.js | |____redis.js | |____sqlite.js |____echo-server.js |____index.js |____log.js |____server.js |____subscribers | |____http-subscriber.js | |____index.js | |____redis-subscriber.js | |____subscriber.js 复制代码
通过提供的 example
可以知道执行的入口在于 EchoServer
的 run
方法,简单修改下 options
配置:
var echo = require('../dist/index.js'); var options = { "authHost": "http://lrss.learning.test", "authEndpoint": "/broadcasting/auth", "clients": [], "database": "redis", "databaseConfig": { "redis": { "port": "63794", "host": "0.0.0.0" } }, "devMode": true, "host": null, "port": "6001", "protocol": "http", "socketio": {}, "sslCertPath": "", "sslKeyPath": "" }; echo.run(options); 复制代码
测试一下看看,是否和 Laravel 服务连接到位:
看 Laravel-echo-server
打印结果:
说明连接上了。
刚才的 dist
文件夹是通过 TypeScript
生成的结果,当然,我们需要通过它的源代码来解读:
. |____api | |____http-api.ts | |____index.ts |____channels | |____channel.ts | |____index.ts | |____presence-channel.ts | |____private-channel.ts |____cli | |____cli.ts | |____index.ts |____database | |____database-driver.ts | |____database.ts | |____index.ts | |____redis.ts | |____sqlite.ts |____echo-server.ts |____index.ts |____log.ts |____server.ts |____subscribers | |____http-subscriber.ts | |____index.ts | |____redis-subscriber.ts | |____subscriber.ts 复制代码
主要包含:接口 ( api
)、频道 ( channels
)、 数据库 ( database
)、订阅 ( subscribers
) 等,我们会一个个来说的。
我们先看 echo-server.ts
的 listen
函数:
/** * Listen for incoming event from subscibers. * * @return {void} */ listen(): Promise<any> { return new Promise((resolve, reject) => { let http = this.httpSub.subscribe((channel, message) => { return this.broadcast(channel, message); }); let redis = this.redisSub.subscribe((channel, message) => { return this.broadcast(channel, message); }); Promise.all([http, redis]).then(() => resolve()); }); } 复制代码
我们主要看 this.redisSub.subscribe()
无非就是通过 redis
订阅,然后再把 channel
和 message
广播出去,好了,我们看看怎么做到订阅的,看 redis-subscriber
的 subscribe()
函数:
/** * Subscribe to events to broadcast. * * @return {Promise<any>} */ subscribe(callback): Promise<any> { return new Promise((resolve, reject) => { this._redis.on('pmessage', (subscribed, channel, message) => { try { message = JSON.parse(message); if (this.options.devMode) { Log.info("Channel: " + channel); Log.info("Event: " + message.event); } callback(channel, message); } catch (e) { if (this.options.devMode) { Log.info("No JSON message"); } } }); this._redis.psubscribe('*', (err, count) => { if (err) { reject('Redis could not subscribe.') } Log.success('Listening for redis events...'); resolve(); }); }); } 复制代码
这里我们就可以看到之前提到的 redis
订阅函数了:
this._redis.psubscribe('*', (err, count) => { if (err) { reject('Redis could not subscribe.') } Log.success('Listening for redis events...'); resolve(); }); 复制代码
好了,只要获取信息,就可以广播出去了:
this._redis.on('pmessage', (subscribed, channel, message) => { try { message = JSON.parse(message); if (this.options.devMode) { Log.info("Channel: " + channel); Log.info("Event: " + message.event); } // callback(channel, message); // return this.broadcast(channel, message); if (message.socket && this.find(message.socket)) { this.server.io.sockets.connected[message.socket](channel) .emit(message.event, channel, message.data); return true } else { this.server.io.to(channel) .emit(message.event, channel, message.data); return true } } catch (e) { if (this.options.devMode) { Log.info("No JSON message"); } } }); 复制代码
到此,我们已经知道 Laravel 是怎么和 Laravel-echo-server
利用 redis
订阅和发布消息的。同时,也知道是用 socket.io
和前端 emit/on
交互的。
下面我们看看前端是怎么接收消息的。
laravel-echo
前端需要安装两个插件: laravel-echo
和 socket.io-client
,除了做配置外,监听一个公开的 channel
,写法还是比较简单的:
window.Echo.channel('public_channel') .listen('RssPublicEvent', (e) => { that.names.push(e.name) }); 复制代码
达到的效果就是,只要接收到服务器发出的在公开频道 public_channel
的事件 RssPublicEvent
,就会把消息内容显示出来:
我们开始看看这个 Laravel-echo
源代码了:
先看配置信息:
window.Echo = new Echo({ broadcaster: 'socket.io', host: window.location.hostname + ':6001', auth: { headers: { 'authorization': 'Bearer ' + store.getters.token } } }); 复制代码
配置的 broadcaster
是: socket.io
,所有用的是:
// echo.ts constructor(options: any) { this.options = options; if (typeof Vue === 'function' && Vue.http) { this.registerVueRequestInterceptor(); } if (typeof axios === 'function') { this.registerAxiosRequestInterceptor(); } if (typeof jQuery === 'function') { this.registerjQueryAjaxSetup(); } if (this.options.broadcaster == 'pusher') { this.connector = new PusherConnector(this.options); } else if (this.options.broadcaster == 'socket.io') { this.connector = new SocketIoConnector(this.options); } else if (this.options.broadcaster == 'null') { this.connector = new NullConnector(this.options); } } 复制代码
接着看 channel
函数:
// echo.ts channel(channel: string): Channel { return this.connector.channel(channel); } // socketio-connector.ts channel(name: string): SocketIoChannel { if (!this.channels[name]) { this.channels[name] = new SocketIoChannel( this.socket, name, this.options ); } return this.channels[name]; } 复制代码
主要是创建 SocketIoChannel
,我们看看怎么做 listen
:
// socketio-connector.ts listen(event: string, callback: Function): SocketIoChannel { this.on(this.eventFormatter.format(event), callback); return this; } 复制代码
继续看 on()
on(event: string, callback: Function): void { let listener = (channel, data) => { if (this.name == channel) { callback(data); } }; this.socket.on(event, listener); this.bind(event, listener); } 复制代码
到这就比较清晰了,只用利用 this.socket.on(event, listener);
注:更多有关 socketio/socket.io-client
,可以看官网: github.com/socketio/so…
总结
到目前为止,通过解读这几个插件和源代码,我们基本跑通了一个 public channel
流程。
这过程主要参考:
下一步主要看看怎么解析一个 private channel
?
看完 public channel
的流程,我们该来说说怎么跑通 private channel
了。
本文结合之前使用的 JWT
来做身份认证。
但这个流程,我们要先从前端说起。
socker.io
我们先写一个 demo:
window.Echo.private('App.User.3') .listen('RssCreatedEvent', (e) => { that.names.push(e.name) }); 复制代码
先创建 private channel
:
/** * Get a private channel instance by name. * * @param {string} name * @return {SocketIoChannel} */ privateChannel(name: string): SocketIoPrivateChannel { if (!this.channels['private-' + name]) { this.channels['private-' + name] = new SocketIoPrivateChannel( this.socket, 'private-' + name, this.options ); } return this.channels['private-' + name]; } 复制代码
它与 public channel
的区别在于为 private channel
的 channel
名前头增加 private-
。
接着我们需要为每次请求添加认证信息 headers
:
window.Echo = new Echo({ broadcaster: 'socket.io', host: window.location.hostname + ':6001', auth: { headers: { 'authorization': 'Bearer ' + store.getters.token } } }); 复制代码
这里,我们用 store.getters.token
存储着 jwt
登录后下发的认证 token
。
好了,只要创新页面,就会先往 Laravel-echo-server
发送一个 subscribe
事件:
/** * Subscribe to a Socket.io channel. * * @return {object} */ subscribe(): any { this.socket.emit('subscribe', { channel: this.name, auth: this.options.auth || {} }); } 复制代码
我们来看看 Laravel-echo-server
怎么接收到这个事件,并把 auth
,也就是 jwt token
发到后台的?在研究怎么发之前,我们还是先把 Laravel 的 private channel Event
建好。
RssCreatedEvent
我们创建 Laravel PrivateChannel
:
// RssCreatedEvent <?php namespace AppEvents; use AppUser; use CarbonCarbon; use IlluminateBroadcastingChannel; use IlluminateQueueSerializesModels; use IlluminateBroadcastingPrivateChannel; use IlluminateBroadcastingPresenceChannel; use IlluminateFoundationEventsDispatchable; use IlluminateBroadcastingInteractsWithSockets; use IlluminateContractsBroadcastingShouldBroadcast; class RssCreatedEvent implements ShouldBroadcast { use Dispatchable, InteractsWithSockets, SerializesModels; /** * Create a new event instance. * * @return void */ public function __construct() { } /** * Get the channels the event should broadcast on. * * @return IlluminateBroadcastingChannel|array */ public function broadcastOn() { // 14. 创建频道 info('broadcastOn'); return new PrivateChannel('App.User.3'); } /** * 指定广播数据。 * * @return array */ public function broadcastWith() { // 返回当前时间 return ['name' => 'private_channel_'.Carbon::now()->toDateTimeString()]; } } // routes/console.php Artisan::command('echo', function () { event(new RssCreatedEvent()); })->describe('echo demo'); 复制代码
与 jwt 结合
修改 BroadcastServiceprovider
的认证路由为 api:
// 修改前 // Broadcast::routes(); // 修改后 Broadcast::routes(["middleware" => "auth:api"]); 复制代码
当然,我们的认证方式也已经改成 JWT 方式了:
<?php return [ /* |-------------------------------------------------------------------------- | Authentication Defaults |-------------------------------------------------------------------------- | | This option controls the default authentication "guard" and password | reset options for your application. You may change these defaults | as required, but they're a perfect start for most applications. | */ 'defaults' => [ 'guard' => 'api', 'passwords' => 'users', ], ... 'guards' => [ 'web' => [ 'driver' => 'session', 'provider' => 'users', ], 'api' => [ 'driver' => 'jwt', 'provider' => 'users', ], ], 复制代码
最后,别忘了把 BroadcastServiceprovider
加入 app.config
中。
注:更多有关 JWT
欢迎查看之前的文章
- 《学习 Lumen 用户认证 (一)》
- 学习 Lumen 用户认证 (二) —— 使用 jwt-auth 插件
Laravel-echo-server
有了前端和后台的各自 private channel
,那必然需要用 Laravel-echo-server
来衔接。
先说回怎么接收前端发过来的 subscribe
事件和 token
。
首先看 echo-server
初始化:
init(io: any): Promise<any> { return new Promise((resolve, reject) => { this.channel = new Channel(io, this.options); this.redisSub = new RedisSubscriber(this.options); this.httpSub = new HttpSubscriber(this.server.express, this.options); this.httpApi = new HttpApi(io, this.channel, this.server.express, this.options.apiOriginAllow); this.httpApi.init(); this.onConnect(); this.listen().then(() => resolve(), err => Log.error(err)); }); } 复制代码
其中, this.onConnect()
:
onConnect(): void { this.server.io.on('connection', socket => { this.onSubscribe(socket); this.onUnsubscribe(socket); this.onDisconnecting(socket); this.onClientEvent(socket); }); } 复制代码
主要注册了四个事件,第一个就是我们需要关注的:
onSubscribe(socket: any): void { socket.on('subscribe', data => { this.channel.join(socket, data); }); } 复制代码
这就和前端呼应上了,接着看 join
函数:
join(socket, data): void { if (data.channel) { if (this.isPrivate(data.channel)) { this.joinPrivate(socket, data); } else { socket.join(data.channel); this.onJoin(socket, data.channel); } } } 复制代码
看 isPrivate()
函数:
/** * Channels and patters for private channels. * * @type {array} */ protected _privateChannels: string[] = ['private-*', 'presence-*']; /** * Check if the incoming socket connection is a private channel. * * @param {string} channel * @return {boolean} */ isPrivate(channel: string): boolean { let isPrivate = false; this._privateChannels.forEach(privateChannel => { let regex = new RegExp(privateChannel.replace('*', '.*')); if (regex.test(channel)) isPrivate = true; }); return isPrivate; } 复制代码
这也是印证了,为什么 private channel
要以 private-
开头了。接着看代码:
/** * Join private channel, emit data to presence channels. * * @param {object} socket * @param {object} data * @return {void} */ joinPrivate(socket: any, data: any): void { this.private.authenticate(socket, data).then(res => { socket.join(data.channel); if (this.isPresence(data.channel)) { var member = res.channel_data; try { member = JSON.parse(res.channel_data); } catch (e) { } this.presence.join(socket, data.channel, member); } this.onJoin(socket, data.channel); }, error => { if (this.options.devMode) { Log.error(error.reason); } this.io.sockets.to(socket.id) .emit('subscription_error', data.channel, error.status); }); } 复制代码
就因为是 private channel
,所以需要走认证流程:
/** * Send authentication request to application server. * * @param {any} socket * @param {any} data * @return {Promise<any>} */ authenticate(socket: any, data: any): Promise<any> { let options = { url: this.authHost(socket) + this.options.authEndpoint, form: { channel_name: data.channel }, headers: (data.auth && data.auth.headers) ? data.auth.headers : {}, rejectUnauthorized: false }; return this.serverRequest(socket, options); } /** * Send a request to the server. * * @param {any} socket * @param {any} options * @return {Promise<any>} */ protected serverRequest(socket: any, options: any): Promise<any> { return new Promise<any>((resolve, reject) => { options.headers = this.prepareHeaders(socket, options); let body; this.request.post(options, (error, response, body, next) => { if (error) { if (this.options.devMode) { Log.error(`[${new Date().toLocaleTimeString()}] - Error authenticating ${socket.id} for ${options.form.channel_name}`); Log.error(error); } reject({ reason: 'Error sending authentication request.', status: 0 }); } else if (response.statusCode !== 200) { if (this.options.devMode) { Log.warning(`[${new Date().toLocaleTimeString()}] - ${socket.id} could not be authenticated to ${options.form.channel_name}`); Log.error(response.body); } reject({ reason: 'Client can not be authenticated, got HTTP status ' + response.statusCode, status: response.statusCode }); } else { if (this.options.devMode) { Log.info(`[${new Date().toLocaleTimeString()}] - ${socket.id} authenticated for: ${options.form.channel_name}`); } try { body = JSON.parse(response.body); } catch (e) { body = response.body } resolve(body); } }); }); } 复制代码
到此,相信你就看的出来了,会把前端发过来的 auth.headers
加入发往后台的请求中。
测试
好了,我们测试下,先刷新页面,加入 private channel
中,
然后在后台,发一个事件出来,看前端是不是可以接收
以上所述就是小编给大家介绍的《深入浅出 Laravel Echo》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【1】JavaScript 基础深入——数据类型深入理解与总结
- 深入理解 Java 函数式编程,第 5 部分: 深入解析 Monad
- 深入理解 HTTPS
- 深入理解 HTTPS
- 深入浅出Disruptor
- 深入了解 JSONP
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web Analytics 2.0
Avinash Kaushik / Sybex / 2009-10-26 / USD 39.99
The bestselling book Web Analytics: An Hour A Day was the first book in the analytics space to move beyond clickstream analysis. Web Analytics 2.0 will significantly evolve the approaches from the fir......一起来看看 《Web Analytics 2.0》 这本书的介绍吧!