SpringSession:存储机制设计

栏目: 数据库 · 发布时间: 5年前

内容简介:在之前的文章中已经对这里先来看下

在之前的文章中已经对 SpringSession 的功能结构,请求/响应重写等做了介绍。本文将继续来介绍下 SpringSession 中存储部分的设计。存储是分布式 session 中算是最核心的部分,通过引入三方的存储容器来实现 session 的存储,从而有效的解决 session 共享的问题。

1、SpringSession存储的顶级抽象接口

SpringSession 存储的顶级抽象接口是 org.springframework.session 包下的 SessionRepository 这个接口。 SessionRepository 的类图结构如下:

SpringSession:存储机制设计

这里先来看下 SessionRepository 这个顶层接口中定义了哪些方法:

public interface SessionRepository<S extends Session> {
    //创建一个session
	S createSession();
	//保存session
	void save(S session);
	//通过ID查找session
	S findById(String id);
	//通过ID删除一个session
	void deleteById(String id);
}
复制代码

从代码来看还是很简单的,就是增删查。下面看具体实现。在2.0版本开始 SpringSession 中也提供了一个和 SessionRepository 具体相同能力的 ReactiveSessionRepository ,用于支持响应式编程模式。

2、MapSessionRepository

基于HashMap实现的基于内存存储的存储器实现,这里就主要看下对于接口中几个方法的实现。

public class MapSessionRepository implements SessionRepository<MapSession> {
	private Integer defaultMaxInactiveInterval;
	private final Map<String, Session> sessions;
	//...
}
复制代码

可以看到就是一个 Map ,那后面关于增删查其实就是操作这个 Map 了。

createSession

@Override
public MapSession createSession() {
	MapSession result = new MapSession();
	if (this.defaultMaxInactiveInterval != null) {
		result.setMaxInactiveInterval(
			Duration.ofSeconds(this.defaultMaxInactiveInterval));
	}
	return result;
}
复制代码

这里很直接,就是 new 了一个 MapSession ,然后设置了 session 的有效期。

save

@Override
public void save(MapSession session) {
	if (!session.getId().equals(session.getOriginalId())) {
		this.sessions.remove(session.getOriginalId());
	}
	this.sessions.put(session.getId(), new MapSession(session));
}
复制代码

这里面先判断了 session 中的两个 ID ,一个 originalId ,一个当前 idoriginalId 是第一次生成 session 对象时创建的,后面都不会在变化。通过源码来看,对于 originalId ,只提供了 get 方法。对于 id 呢,其实是可以通过 changeSessionId 来改变的。

这里的这个操作实际上是一种优化行为,及时的清除掉老的 session 数据来释放内存空间。

findById

@Override
public MapSession findById(String id) {
	Session saved = this.sessions.get(id);
	if (saved == null) {
		return null;
	}
	if (saved.isExpired()) {
		deleteById(saved.getId());
		return null;
	}
	return new MapSession(saved);
}
复制代码

这个逻辑也很简单,先从 Map 中根据 id 取出 session 数据,如果没有就返回 null ,如果有则再判断下是否过期了,如果过期了就删除掉,然后返回 null 。如果查到了,并且没有过期的话,则构建一个 MapSession 返回。

OK,基于内存存储的实现系列就是这些了,下面继续来看其他存储的实现。

3、FindByIndexNameSessionRepository

FindByIndexNameSessionRepository 继承了 SessionRepository 接口,用于扩展对第三方存储的实现。

public interface FindByIndexNameSessionRepository<S extends Session>
		extends SessionRepository<S> {
		
	String PRINCIPAL_NAME_INDEX_NAME = FindByIndexNameSessionRepository.class.getName()
			.concat(".PRINCIPAL_NAME_INDEX_NAME");

	Map<String, S> findByIndexNameAndIndexValue(String indexName, String indexValue);

	default Map<String, S> findByPrincipalName(String principalName) {
		return findByIndexNameAndIndexValue(PRINCIPAL_NAME_INDEX_NAME, principalName);
	}
}
复制代码

FindByIndexNameSessionRepository 添加一个单独的方法为指定用户查询所有会话。这是通过设置名为 FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAMESession 的属性值为指定用户的 username 来完成的。开发人员有责任确保属性被赋值,因为 SpringSession 不会在意被使用的认证机制。官方文档中给出的例子如下:

String username = "username";
this.session.setAttribute(
	FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME, username);
复制代码

FindByIndexNameSessionRepository 的一些实现会提供一些钩子自动的索引其他的 session 属性。比如,很多实现都会自动的确保当前的 Spring Security 用户名称可通过索引名称 FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME 进行索引。一旦会话被索引,就可以通过下面的代码检索:

String username = "username";
Map<String, Session> sessionIdToSession = 
	this.sessionRepository.findByIndexNameAndIndexValue(
	FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME,username);
复制代码

下图是 FindByIndexNameSessionRepository 接口的三个实现类:

SpringSession:存储机制设计

下面来分别分析下这三个存储的实现细节。

3.1 RedisOperationsSessionRepository

RedisOperationsSessionRepository 的类图结构如下, MessageListenerredis 消息订阅的监听接口。

SpringSession:存储机制设计

代码有点长,就不在这里面贴了,一些注释可以在这个 SpringSession中文分支 来看。这里还是主要来看下对于那几个方法的实现。

3.1.1 createSession

这里和 MapSessionRepository 的实现基本一样的,那区别就在于 Session 的封装模型不一样,这里是 RedisSession ,实际上 RedisSession 的实现是对 MapSession 又包了一层。下面会分析 RedisSession 这个类。

@Override
public RedisSession createSession() { 
    // RedisSession,这里和MapSession区别开
	RedisSession redisSession = new RedisSession();
	if (this.defaultMaxInactiveInterval != null) {
		redisSession.setMaxInactiveInterval(
				Duration.ofSeconds(this.defaultMaxInactiveInterval));
	}
	return redisSession;
}
复制代码

在看其他两个方法之前,先来看下 RedisSession 这个类。

3.1.2 RedisSession

这个在模型上是对 MapSession 的扩展,增加了 delta 这个东西。

final class RedisSession implements Session {
       // MapSession 实例对象,主要存数据的地方
		private final MapSession cached;
		// 原始最后访问时间
		private Instant originalLastAccessTime;
		private Map<String, Object> delta = new HashMap<>();
		// 是否是新的session对象
		private boolean isNew;
		// 原始主名称
		private String originalPrincipalName;
		// 原始sessionId
		private String originalSessionId;
复制代码

delta 是一个Map结构,那么这里面到底是放什么的呢?具体细节见 saveDelta 这个方法。 saveDelta 这个方法会在两个地方被调用,一个是下面要说道的 save 方法,另外一个是 flushImmediateIfNecessary 这个方法:

private void flushImmediateIfNecessary() {
	if (RedisOperationsSessionRepository.this.redisFlushMode == RedisFlushMode.IMMEDIATE) {
		saveDelta();
	}
}
复制代码

RedisFlushMode 提供了两种推送模式:

  • ON_SAVE:只有在调用 save 方法时执行,在 web 环境中这样做通常是尽快提交HTTP响应
  • IMMEDIATE:只要有变更就会直接写到 redis 中,不会像 ON_SAVE 一样,在最后 commit 时一次性写入

追踪 flushImmediateIfNecessary 方法调用链如下:

SpringSession:存储机制设计
那么到这里基本就清楚了,首先 save 这个方法,当主动调用 save 时就是将数据推到 redis 中去的,也就是 ON_SAVE 这种情况。那么对于 IMMEDIATE 这种情况,只有调用了上面的四个方法, SpringSession 才会将数据推送到 redis

所以 delta 里面存的是当前一些变更的 key-val 键值对象,而这些变更是由 setAttributeremoveAttributesetMaxInactiveIntervalInSecondssetLastAccessedTime 这四个方法触发的;比如 setAttribute(k,v) ,那么这个 k->v 就会被保存到 delta 里面。

3.1.3 save

在理解了 saveDelta 方法之后再来看 save 方法就简单多了。 save 对应的就是 RedisFlushMode.ON_SAVE

@Override
public void save(RedisSession session) {
   // 直接调用 saveDelta推数据到redis
	session.saveDelta();
	if (session.isNew()) {
	   // sessionCreatedKey->channl
		String sessionCreatedKey = getSessionCreatedChannel(session.getId());
		// 发布一个消息事件,新增 session,以供 MessageListener 回调处理。
		this.sessionRedisOperations.convertAndSend(sessionCreatedKey, session.delta);
		session.setNew(false);
	}
}
复制代码

3.1.4 findById

查询这部分和基于 Map 的差别比较大,因为这里并不是直接操作 Map ,而是与 Redis 进行一次交互。

@Override
public RedisSession findById(String id) {
	return getSession(id, false);
}
复制代码

调用 getSession 方法:

private RedisSession getSession(String id, boolean allowExpired) {
	// 根据ID从 redis 中取出数据
	Map<Object, Object> entries = getSessionBoundHashOperations(id).entries();
	if (entries.isEmpty()) {
		return null;
	}
	//转换成MapSession
	MapSession loaded = loadSession(id, entries);
	if (!allowExpired && loaded.isExpired()) {
		return null;
	}
	//转换成RedisSession
	RedisSession result = new RedisSession(loaded);
	result.originalLastAccessTime = loaded.getLastAccessedTime();
	return result;
}
复制代码

loadSession 中构建 MapSession

private MapSession loadSession(String id, Map<Object, Object> entries) {
   // 生成MapSession实例
	MapSession loaded = new MapSession(id);
	//遍历数据
	for (Map.Entry<Object, Object> entry : entries.entrySet()) {
		String key = (String) entry.getKey();
		if (CREATION_TIME_ATTR.equals(key)) {
		    // 设置创建时间
			loaded.setCreationTime(Instant.ofEpochMilli((long) entry.getValue()));
		}
		else if (MAX_INACTIVE_ATTR.equals(key)) {
			 // 设置最大有效时间
			loaded.setMaxInactiveInterval(Duration.ofSeconds((int) entry.getValue()));
		}
		else if (LAST_ACCESSED_ATTR.equals(key)) {
			// 设置最后访问时间
			loaded.setLastAccessedTime(Instant.ofEpochMilli((long) entry.getValue()));
		}
		else if (key.startsWith(SESSION_ATTR_PREFIX)) {
		// 设置属性
			loaded.setAttribute(key.substring(SESSION_ATTR_PREFIX.length()),
					entry.getValue());
		}
	}
	return loaded;
}
复制代码

3.1.5 deleteById

根据 sessionId 删除 session 数据。具体过程看代码注释。

@Override
public void deleteById(String sessionId) {
   // 获取 RedisSession
	RedisSession session = getSession(sessionId, true);
	if (session == null) {
		return;
	}
   // 清楚当前session数据的索引
	cleanupPrincipalIndex(session);
	//执行删除操作
	this.expirationPolicy.onDelete(session);
	String expireKey = getExpiredKey(session.getId());
	//删除expireKey
	this.sessionRedisOperations.delete(expireKey);
	//session有效期设置为0
	session.setMaxInactiveInterval(Duration.ZERO);
	save(session);
}
复制代码

3.1.6 onMessage

最后来看下这个订阅回调处理。这里看下核心的一段逻辑:

boolean isDeleted = channel.equals(this.sessionDeletedChannel);
// Deleted 还是 Expired ?
if (isDeleted || channel.equals(this.sessionExpiredChannel)) {
	// 此处省略无关代码
	// Deleted
	if (isDeleted) {
	   // 发布一个 SessionDeletedEvent 事件
		handleDeleted(session);
	}
	// Expired
	else {
		// 发布一个 SessionExpiredEvent 事件
		handleExpired(session);
	}
}
复制代码

3.2 Redis 存储的一些思考

首先按照我们自己常规的思路来设计的话,我们会怎么来考虑这个事情。这里首先要声明下,我对 Redis 这个东西不是很熟,没有做过深入的研究;那如果是我来做,可能也就仅仅限于存储。

  • findByIndexNameAndIndexValue 的设计,这个的作用是通过 indexNameindexValue 来返回当前用户的所有会话。但是这里需要考虑的一个事情是,通常情况下,一个用户只会关联到一个会话上面去,那这种设计很显然,我的理解是为了支持单用户多会话的场景。
    • indexName:FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME
    • indexValue:username
  • 实现 MessageListener 接口,增加事件通知能力。通过监听这些事件,可以做一些 session 操作管控。但是实际上 SpringSession 中并没有做任何事情,从代码来看, publishEvent 方法是空实现。等待回复中 #issue 1287
private ApplicationEventPublisher eventPublisher = new ApplicationEventPublisher() {
	@Override
	public void publishEvent(ApplicationEvent event) {
	}
	@Override
	public void publishEvent(Object event) {
	}
};
复制代码
  • RedisFlushModeSpringSession 中提供了两种模式的推送,一种是 ON_SAVE ,另外一种是 IMMEDIATE 。默认是 ON_SAVE ,也就是常规的在请求处理结束时进行一次 sessionCommit 操作。 RedisFlushMode 的设计感觉是为 session 数据持久化的时机提供了另外一种思路。

小结

存储机制设计部分就一基于内存和基于 Redis 两种来分析;另外基于 jdbchazelcast 有兴趣的同学可以自己查看源码。

最后也欢迎访问我的个人博客:www.glmapper.com

参考


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

参与的胜利

参与的胜利

亨利·詹金斯 / 高芳芳 / 浙江大学出版社 / 2017-9-30 / CNY 42.00

《参与的胜利:网络时代的参与文化》是一场学者之间的对话,三位学者(亨利·詹金斯、伊藤瑞子和丹娜·博伊德)虽然来自不同的代际、不同的学科背景,但他们在相同的参与文化项目中展开合作,并试图解决相似的问题。 希望《参与的胜利:网络时代的参与文化》能够进一步激发团体内部及团体之间的对话,这些团体包括教育者、政策制定者、学者、关注参与文化的公民、业内人士、粉丝及其他任何关心我们文化的未来的人。理想的参......一起来看看 《参与的胜利》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具