SpringSession:存储机制设计

栏目: 数据库 · 发布时间: 6年前

内容简介:在之前的文章中已经对这里先来看下

在之前的文章中已经对 SpringSession 的功能结构,请求/响应重写等做了介绍。本文将继续来介绍下 SpringSession 中存储部分的设计。存储是分布式 session 中算是最核心的部分,通过引入三方的存储容器来实现 session 的存储,从而有效的解决 session 共享的问题。

1、SpringSession存储的顶级抽象接口

SpringSession 存储的顶级抽象接口是 org.springframework.session 包下的 SessionRepository 这个接口。 SessionRepository 的类图结构如下:

SpringSession:存储机制设计

这里先来看下 SessionRepository 这个顶层接口中定义了哪些方法:

public interface SessionRepository<S extends Session> {
    //创建一个session
	S createSession();
	//保存session
	void save(S session);
	//通过ID查找session
	S findById(String id);
	//通过ID删除一个session
	void deleteById(String id);
}
复制代码

从代码来看还是很简单的,就是增删查。下面看具体实现。在2.0版本开始 SpringSession 中也提供了一个和 SessionRepository 具体相同能力的 ReactiveSessionRepository ,用于支持响应式编程模式。

2、MapSessionRepository

基于HashMap实现的基于内存存储的存储器实现,这里就主要看下对于接口中几个方法的实现。

public class MapSessionRepository implements SessionRepository<MapSession> {
	private Integer defaultMaxInactiveInterval;
	private final Map<String, Session> sessions;
	//...
}
复制代码

可以看到就是一个 Map ,那后面关于增删查其实就是操作这个 Map 了。

createSession

@Override
public MapSession createSession() {
	MapSession result = new MapSession();
	if (this.defaultMaxInactiveInterval != null) {
		result.setMaxInactiveInterval(
			Duration.ofSeconds(this.defaultMaxInactiveInterval));
	}
	return result;
}
复制代码

这里很直接,就是 new 了一个 MapSession ,然后设置了 session 的有效期。

save

@Override
public void save(MapSession session) {
	if (!session.getId().equals(session.getOriginalId())) {
		this.sessions.remove(session.getOriginalId());
	}
	this.sessions.put(session.getId(), new MapSession(session));
}
复制代码

这里面先判断了 session 中的两个 ID ,一个 originalId ,一个当前 idoriginalId 是第一次生成 session 对象时创建的,后面都不会在变化。通过源码来看,对于 originalId ,只提供了 get 方法。对于 id 呢,其实是可以通过 changeSessionId 来改变的。

这里的这个操作实际上是一种优化行为,及时的清除掉老的 session 数据来释放内存空间。

findById

@Override
public MapSession findById(String id) {
	Session saved = this.sessions.get(id);
	if (saved == null) {
		return null;
	}
	if (saved.isExpired()) {
		deleteById(saved.getId());
		return null;
	}
	return new MapSession(saved);
}
复制代码

这个逻辑也很简单,先从 Map 中根据 id 取出 session 数据,如果没有就返回 null ,如果有则再判断下是否过期了,如果过期了就删除掉,然后返回 null 。如果查到了,并且没有过期的话,则构建一个 MapSession 返回。

OK,基于内存存储的实现系列就是这些了,下面继续来看其他存储的实现。

3、FindByIndexNameSessionRepository

FindByIndexNameSessionRepository 继承了 SessionRepository 接口,用于扩展对第三方存储的实现。

public interface FindByIndexNameSessionRepository<S extends Session>
		extends SessionRepository<S> {
		
	String PRINCIPAL_NAME_INDEX_NAME = FindByIndexNameSessionRepository.class.getName()
			.concat(".PRINCIPAL_NAME_INDEX_NAME");

	Map<String, S> findByIndexNameAndIndexValue(String indexName, String indexValue);

	default Map<String, S> findByPrincipalName(String principalName) {
		return findByIndexNameAndIndexValue(PRINCIPAL_NAME_INDEX_NAME, principalName);
	}
}
复制代码

FindByIndexNameSessionRepository 添加一个单独的方法为指定用户查询所有会话。这是通过设置名为 FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAMESession 的属性值为指定用户的 username 来完成的。开发人员有责任确保属性被赋值,因为 SpringSession 不会在意被使用的认证机制。官方文档中给出的例子如下:

String username = "username";
this.session.setAttribute(
	FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME, username);
复制代码

FindByIndexNameSessionRepository 的一些实现会提供一些钩子自动的索引其他的 session 属性。比如,很多实现都会自动的确保当前的 Spring Security 用户名称可通过索引名称 FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME 进行索引。一旦会话被索引,就可以通过下面的代码检索:

String username = "username";
Map<String, Session> sessionIdToSession = 
	this.sessionRepository.findByIndexNameAndIndexValue(
	FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME,username);
复制代码

下图是 FindByIndexNameSessionRepository 接口的三个实现类:

SpringSession:存储机制设计

下面来分别分析下这三个存储的实现细节。

3.1 RedisOperationsSessionRepository

RedisOperationsSessionRepository 的类图结构如下, MessageListenerredis 消息订阅的监听接口。

SpringSession:存储机制设计

代码有点长,就不在这里面贴了,一些注释可以在这个 SpringSession中文分支 来看。这里还是主要来看下对于那几个方法的实现。

3.1.1 createSession

这里和 MapSessionRepository 的实现基本一样的,那区别就在于 Session 的封装模型不一样,这里是 RedisSession ,实际上 RedisSession 的实现是对 MapSession 又包了一层。下面会分析 RedisSession 这个类。

@Override
public RedisSession createSession() { 
    // RedisSession,这里和MapSession区别开
	RedisSession redisSession = new RedisSession();
	if (this.defaultMaxInactiveInterval != null) {
		redisSession.setMaxInactiveInterval(
				Duration.ofSeconds(this.defaultMaxInactiveInterval));
	}
	return redisSession;
}
复制代码

在看其他两个方法之前,先来看下 RedisSession 这个类。

3.1.2 RedisSession

这个在模型上是对 MapSession 的扩展,增加了 delta 这个东西。

final class RedisSession implements Session {
       // MapSession 实例对象,主要存数据的地方
		private final MapSession cached;
		// 原始最后访问时间
		private Instant originalLastAccessTime;
		private Map<String, Object> delta = new HashMap<>();
		// 是否是新的session对象
		private boolean isNew;
		// 原始主名称
		private String originalPrincipalName;
		// 原始sessionId
		private String originalSessionId;
复制代码

delta 是一个Map结构,那么这里面到底是放什么的呢?具体细节见 saveDelta 这个方法。 saveDelta 这个方法会在两个地方被调用,一个是下面要说道的 save 方法,另外一个是 flushImmediateIfNecessary 这个方法:

private void flushImmediateIfNecessary() {
	if (RedisOperationsSessionRepository.this.redisFlushMode == RedisFlushMode.IMMEDIATE) {
		saveDelta();
	}
}
复制代码

RedisFlushMode 提供了两种推送模式:

  • ON_SAVE:只有在调用 save 方法时执行,在 web 环境中这样做通常是尽快提交HTTP响应
  • IMMEDIATE:只要有变更就会直接写到 redis 中,不会像 ON_SAVE 一样,在最后 commit 时一次性写入

追踪 flushImmediateIfNecessary 方法调用链如下:

SpringSession:存储机制设计
那么到这里基本就清楚了,首先 save 这个方法,当主动调用 save 时就是将数据推到 redis 中去的,也就是 ON_SAVE 这种情况。那么对于 IMMEDIATE 这种情况,只有调用了上面的四个方法, SpringSession 才会将数据推送到 redis

所以 delta 里面存的是当前一些变更的 key-val 键值对象,而这些变更是由 setAttributeremoveAttributesetMaxInactiveIntervalInSecondssetLastAccessedTime 这四个方法触发的;比如 setAttribute(k,v) ,那么这个 k->v 就会被保存到 delta 里面。

3.1.3 save

在理解了 saveDelta 方法之后再来看 save 方法就简单多了。 save 对应的就是 RedisFlushMode.ON_SAVE

@Override
public void save(RedisSession session) {
   // 直接调用 saveDelta推数据到redis
	session.saveDelta();
	if (session.isNew()) {
	   // sessionCreatedKey->channl
		String sessionCreatedKey = getSessionCreatedChannel(session.getId());
		// 发布一个消息事件,新增 session,以供 MessageListener 回调处理。
		this.sessionRedisOperations.convertAndSend(sessionCreatedKey, session.delta);
		session.setNew(false);
	}
}
复制代码

3.1.4 findById

查询这部分和基于 Map 的差别比较大,因为这里并不是直接操作 Map ,而是与 Redis 进行一次交互。

@Override
public RedisSession findById(String id) {
	return getSession(id, false);
}
复制代码

调用 getSession 方法:

private RedisSession getSession(String id, boolean allowExpired) {
	// 根据ID从 redis 中取出数据
	Map<Object, Object> entries = getSessionBoundHashOperations(id).entries();
	if (entries.isEmpty()) {
		return null;
	}
	//转换成MapSession
	MapSession loaded = loadSession(id, entries);
	if (!allowExpired && loaded.isExpired()) {
		return null;
	}
	//转换成RedisSession
	RedisSession result = new RedisSession(loaded);
	result.originalLastAccessTime = loaded.getLastAccessedTime();
	return result;
}
复制代码

loadSession 中构建 MapSession

private MapSession loadSession(String id, Map<Object, Object> entries) {
   // 生成MapSession实例
	MapSession loaded = new MapSession(id);
	//遍历数据
	for (Map.Entry<Object, Object> entry : entries.entrySet()) {
		String key = (String) entry.getKey();
		if (CREATION_TIME_ATTR.equals(key)) {
		    // 设置创建时间
			loaded.setCreationTime(Instant.ofEpochMilli((long) entry.getValue()));
		}
		else if (MAX_INACTIVE_ATTR.equals(key)) {
			 // 设置最大有效时间
			loaded.setMaxInactiveInterval(Duration.ofSeconds((int) entry.getValue()));
		}
		else if (LAST_ACCESSED_ATTR.equals(key)) {
			// 设置最后访问时间
			loaded.setLastAccessedTime(Instant.ofEpochMilli((long) entry.getValue()));
		}
		else if (key.startsWith(SESSION_ATTR_PREFIX)) {
		// 设置属性
			loaded.setAttribute(key.substring(SESSION_ATTR_PREFIX.length()),
					entry.getValue());
		}
	}
	return loaded;
}
复制代码

3.1.5 deleteById

根据 sessionId 删除 session 数据。具体过程看代码注释。

@Override
public void deleteById(String sessionId) {
   // 获取 RedisSession
	RedisSession session = getSession(sessionId, true);
	if (session == null) {
		return;
	}
   // 清楚当前session数据的索引
	cleanupPrincipalIndex(session);
	//执行删除操作
	this.expirationPolicy.onDelete(session);
	String expireKey = getExpiredKey(session.getId());
	//删除expireKey
	this.sessionRedisOperations.delete(expireKey);
	//session有效期设置为0
	session.setMaxInactiveInterval(Duration.ZERO);
	save(session);
}
复制代码

3.1.6 onMessage

最后来看下这个订阅回调处理。这里看下核心的一段逻辑:

boolean isDeleted = channel.equals(this.sessionDeletedChannel);
// Deleted 还是 Expired ?
if (isDeleted || channel.equals(this.sessionExpiredChannel)) {
	// 此处省略无关代码
	// Deleted
	if (isDeleted) {
	   // 发布一个 SessionDeletedEvent 事件
		handleDeleted(session);
	}
	// Expired
	else {
		// 发布一个 SessionExpiredEvent 事件
		handleExpired(session);
	}
}
复制代码

3.2 Redis 存储的一些思考

首先按照我们自己常规的思路来设计的话,我们会怎么来考虑这个事情。这里首先要声明下,我对 Redis 这个东西不是很熟,没有做过深入的研究;那如果是我来做,可能也就仅仅限于存储。

  • findByIndexNameAndIndexValue 的设计,这个的作用是通过 indexNameindexValue 来返回当前用户的所有会话。但是这里需要考虑的一个事情是,通常情况下,一个用户只会关联到一个会话上面去,那这种设计很显然,我的理解是为了支持单用户多会话的场景。
    • indexName:FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME
    • indexValue:username
  • 实现 MessageListener 接口,增加事件通知能力。通过监听这些事件,可以做一些 session 操作管控。但是实际上 SpringSession 中并没有做任何事情,从代码来看, publishEvent 方法是空实现。等待回复中 #issue 1287
private ApplicationEventPublisher eventPublisher = new ApplicationEventPublisher() {
	@Override
	public void publishEvent(ApplicationEvent event) {
	}
	@Override
	public void publishEvent(Object event) {
	}
};
复制代码
  • RedisFlushModeSpringSession 中提供了两种模式的推送,一种是 ON_SAVE ,另外一种是 IMMEDIATE 。默认是 ON_SAVE ,也就是常规的在请求处理结束时进行一次 sessionCommit 操作。 RedisFlushMode 的设计感觉是为 session 数据持久化的时机提供了另外一种思路。

小结

存储机制设计部分就一基于内存和基于 Redis 两种来分析;另外基于 jdbchazelcast 有兴趣的同学可以自己查看源码。

最后也欢迎访问我的个人博客:www.glmapper.com

参考


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Everything Store

The Everything Store

Brad Stone / Little, Brown and Company / 2013-10-22 / USD 28.00

The definitive story of Amazon.com, one of the most successful companies in the world, and of its driven, brilliant founder, Jeff Bezos. Amazon.com started off delivering books through the mail. Bu......一起来看看 《The Everything Store》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具