内容简介:在之前的文章中已经对这里先来看下
在之前的文章中已经对 SpringSession
的功能结构,请求/响应重写等做了介绍。本文将继续来介绍下 SpringSession
中存储部分的设计。存储是分布式 session
中算是最核心的部分,通过引入三方的存储容器来实现 session
的存储,从而有效的解决 session
共享的问题。
1、SpringSession存储的顶级抽象接口
SpringSession
存储的顶级抽象接口是 org.springframework.session
包下的 SessionRepository
这个接口。 SessionRepository
的类图结构如下:
这里先来看下 SessionRepository
这个顶层接口中定义了哪些方法:
public interface SessionRepository<S extends Session> { //创建一个session S createSession(); //保存session void save(S session); //通过ID查找session S findById(String id); //通过ID删除一个session void deleteById(String id); } 复制代码
从代码来看还是很简单的,就是增删查。下面看具体实现。在2.0版本开始 SpringSession
中也提供了一个和 SessionRepository
具体相同能力的 ReactiveSessionRepository
,用于支持响应式编程模式。
2、MapSessionRepository
基于HashMap实现的基于内存存储的存储器实现,这里就主要看下对于接口中几个方法的实现。
public class MapSessionRepository implements SessionRepository<MapSession> { private Integer defaultMaxInactiveInterval; private final Map<String, Session> sessions; //... } 复制代码
可以看到就是一个 Map
,那后面关于增删查其实就是操作这个 Map
了。
createSession
@Override public MapSession createSession() { MapSession result = new MapSession(); if (this.defaultMaxInactiveInterval != null) { result.setMaxInactiveInterval( Duration.ofSeconds(this.defaultMaxInactiveInterval)); } return result; } 复制代码
这里很直接,就是 new
了一个 MapSession
,然后设置了 session
的有效期。
save
@Override public void save(MapSession session) { if (!session.getId().equals(session.getOriginalId())) { this.sessions.remove(session.getOriginalId()); } this.sessions.put(session.getId(), new MapSession(session)); } 复制代码
这里面先判断了 session
中的两个 ID
,一个 originalId
,一个当前 id
。 originalId
是第一次生成 session
对象时创建的,后面都不会在变化。通过源码来看,对于 originalId
,只提供了 get
方法。对于 id
呢,其实是可以通过 changeSessionId
来改变的。
这里的这个操作实际上是一种优化行为,及时的清除掉老的 session
数据来释放内存空间。
findById
@Override public MapSession findById(String id) { Session saved = this.sessions.get(id); if (saved == null) { return null; } if (saved.isExpired()) { deleteById(saved.getId()); return null; } return new MapSession(saved); } 复制代码
这个逻辑也很简单,先从 Map
中根据 id
取出 session
数据,如果没有就返回 null
,如果有则再判断下是否过期了,如果过期了就删除掉,然后返回 null
。如果查到了,并且没有过期的话,则构建一个 MapSession
返回。
OK,基于内存存储的实现系列就是这些了,下面继续来看其他存储的实现。
3、FindByIndexNameSessionRepository
FindByIndexNameSessionRepository
继承了 SessionRepository
接口,用于扩展对第三方存储的实现。
public interface FindByIndexNameSessionRepository<S extends Session> extends SessionRepository<S> { String PRINCIPAL_NAME_INDEX_NAME = FindByIndexNameSessionRepository.class.getName() .concat(".PRINCIPAL_NAME_INDEX_NAME"); Map<String, S> findByIndexNameAndIndexValue(String indexName, String indexValue); default Map<String, S> findByPrincipalName(String principalName) { return findByIndexNameAndIndexValue(PRINCIPAL_NAME_INDEX_NAME, principalName); } } 复制代码
FindByIndexNameSessionRepository
添加一个单独的方法为指定用户查询所有会话。这是通过设置名为 FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME
的 Session
的属性值为指定用户的 username
来完成的。开发人员有责任确保属性被赋值,因为 SpringSession
不会在意被使用的认证机制。官方文档中给出的例子如下:
String username = "username"; this.session.setAttribute( FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME, username); 复制代码
FindByIndexNameSessionRepository
的一些实现会提供一些钩子自动的索引其他的 session
属性。比如,很多实现都会自动的确保当前的 Spring Security
用户名称可通过索引名称 FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME
进行索引。一旦会话被索引,就可以通过下面的代码检索:
String username = "username"; Map<String, Session> sessionIdToSession = this.sessionRepository.findByIndexNameAndIndexValue( FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME,username); 复制代码
下图是 FindByIndexNameSessionRepository
接口的三个实现类:
下面来分别分析下这三个存储的实现细节。
3.1 RedisOperationsSessionRepository
RedisOperationsSessionRepository
的类图结构如下, MessageListener
是 redis
消息订阅的监听接口。
代码有点长,就不在这里面贴了,一些注释可以在这个 SpringSession中文分支 来看。这里还是主要来看下对于那几个方法的实现。
3.1.1 createSession
这里和 MapSessionRepository
的实现基本一样的,那区别就在于 Session
的封装模型不一样,这里是 RedisSession
,实际上 RedisSession
的实现是对 MapSession
又包了一层。下面会分析 RedisSession
这个类。
@Override public RedisSession createSession() { // RedisSession,这里和MapSession区别开 RedisSession redisSession = new RedisSession(); if (this.defaultMaxInactiveInterval != null) { redisSession.setMaxInactiveInterval( Duration.ofSeconds(this.defaultMaxInactiveInterval)); } return redisSession; } 复制代码
在看其他两个方法之前,先来看下 RedisSession
这个类。
3.1.2 RedisSession
这个在模型上是对 MapSession
的扩展,增加了 delta
这个东西。
final class RedisSession implements Session { // MapSession 实例对象,主要存数据的地方 private final MapSession cached; // 原始最后访问时间 private Instant originalLastAccessTime; private Map<String, Object> delta = new HashMap<>(); // 是否是新的session对象 private boolean isNew; // 原始主名称 private String originalPrincipalName; // 原始sessionId private String originalSessionId; 复制代码
delta
是一个Map结构,那么这里面到底是放什么的呢?具体细节见 saveDelta 这个方法。 saveDelta
这个方法会在两个地方被调用,一个是下面要说道的 save
方法,另外一个是 flushImmediateIfNecessary
这个方法:
private void flushImmediateIfNecessary() { if (RedisOperationsSessionRepository.this.redisFlushMode == RedisFlushMode.IMMEDIATE) { saveDelta(); } } 复制代码
RedisFlushMode
提供了两种推送模式:
- ON_SAVE:只有在调用
save
方法时执行,在web
环境中这样做通常是尽快提交HTTP响应 - IMMEDIATE:只要有变更就会直接写到
redis
中,不会像ON_SAVE
一样,在最后commit
时一次性写入
追踪 flushImmediateIfNecessary
方法调用链如下:
save
这个方法,当主动调用
save
时就是将数据推到
redis
中去的,也就是
ON_SAVE
这种情况。那么对于
IMMEDIATE
这种情况,只有调用了上面的四个方法,
SpringSession
才会将数据推送到
redis
。
所以 delta
里面存的是当前一些变更的 key-val
键值对象,而这些变更是由 setAttribute
、 removeAttribute
、 setMaxInactiveIntervalInSeconds
、 setLastAccessedTime
这四个方法触发的;比如 setAttribute(k,v)
,那么这个 k->v
就会被保存到 delta
里面。
3.1.3 save
在理解了 saveDelta
方法之后再来看 save
方法就简单多了。 save
对应的就是 RedisFlushMode.ON_SAVE
。
@Override public void save(RedisSession session) { // 直接调用 saveDelta推数据到redis session.saveDelta(); if (session.isNew()) { // sessionCreatedKey->channl String sessionCreatedKey = getSessionCreatedChannel(session.getId()); // 发布一个消息事件,新增 session,以供 MessageListener 回调处理。 this.sessionRedisOperations.convertAndSend(sessionCreatedKey, session.delta); session.setNew(false); } } 复制代码
3.1.4 findById
查询这部分和基于 Map
的差别比较大,因为这里并不是直接操作 Map
,而是与 Redis
进行一次交互。
@Override public RedisSession findById(String id) { return getSession(id, false); } 复制代码
调用 getSession
方法:
private RedisSession getSession(String id, boolean allowExpired) { // 根据ID从 redis 中取出数据 Map<Object, Object> entries = getSessionBoundHashOperations(id).entries(); if (entries.isEmpty()) { return null; } //转换成MapSession MapSession loaded = loadSession(id, entries); if (!allowExpired && loaded.isExpired()) { return null; } //转换成RedisSession RedisSession result = new RedisSession(loaded); result.originalLastAccessTime = loaded.getLastAccessedTime(); return result; } 复制代码
loadSession
中构建 MapSession
:
private MapSession loadSession(String id, Map<Object, Object> entries) { // 生成MapSession实例 MapSession loaded = new MapSession(id); //遍历数据 for (Map.Entry<Object, Object> entry : entries.entrySet()) { String key = (String) entry.getKey(); if (CREATION_TIME_ATTR.equals(key)) { // 设置创建时间 loaded.setCreationTime(Instant.ofEpochMilli((long) entry.getValue())); } else if (MAX_INACTIVE_ATTR.equals(key)) { // 设置最大有效时间 loaded.setMaxInactiveInterval(Duration.ofSeconds((int) entry.getValue())); } else if (LAST_ACCESSED_ATTR.equals(key)) { // 设置最后访问时间 loaded.setLastAccessedTime(Instant.ofEpochMilli((long) entry.getValue())); } else if (key.startsWith(SESSION_ATTR_PREFIX)) { // 设置属性 loaded.setAttribute(key.substring(SESSION_ATTR_PREFIX.length()), entry.getValue()); } } return loaded; } 复制代码
3.1.5 deleteById
根据 sessionId
删除 session
数据。具体过程看代码注释。
@Override public void deleteById(String sessionId) { // 获取 RedisSession RedisSession session = getSession(sessionId, true); if (session == null) { return; } // 清楚当前session数据的索引 cleanupPrincipalIndex(session); //执行删除操作 this.expirationPolicy.onDelete(session); String expireKey = getExpiredKey(session.getId()); //删除expireKey this.sessionRedisOperations.delete(expireKey); //session有效期设置为0 session.setMaxInactiveInterval(Duration.ZERO); save(session); } 复制代码
3.1.6 onMessage
最后来看下这个订阅回调处理。这里看下核心的一段逻辑:
boolean isDeleted = channel.equals(this.sessionDeletedChannel); // Deleted 还是 Expired ? if (isDeleted || channel.equals(this.sessionExpiredChannel)) { // 此处省略无关代码 // Deleted if (isDeleted) { // 发布一个 SessionDeletedEvent 事件 handleDeleted(session); } // Expired else { // 发布一个 SessionExpiredEvent 事件 handleExpired(session); } } 复制代码
3.2 Redis 存储的一些思考
首先按照我们自己常规的思路来设计的话,我们会怎么来考虑这个事情。这里首先要声明下,我对 Redis
这个东西不是很熟,没有做过深入的研究;那如果是我来做,可能也就仅仅限于存储。
-
findByIndexNameAndIndexValue
的设计,这个的作用是通过indexName
和indexValue
来返回当前用户的所有会话。但是这里需要考虑的一个事情是,通常情况下,一个用户只会关联到一个会话上面去,那这种设计很显然,我的理解是为了支持单用户多会话的场景。- indexName:FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME
- indexValue:username
- 实现
MessageListener
接口,增加事件通知能力。通过监听这些事件,可以做一些session
操作管控。但是实际上SpringSession
中并没有做任何事情,从代码来看,publishEvent
方法是空实现。等待回复中 #issue 1287
private ApplicationEventPublisher eventPublisher = new ApplicationEventPublisher() { @Override public void publishEvent(ApplicationEvent event) { } @Override public void publishEvent(Object event) { } }; 复制代码
-
RedisFlushMode
,SpringSession
中提供了两种模式的推送,一种是ON_SAVE
,另外一种是IMMEDIATE
。默认是ON_SAVE
,也就是常规的在请求处理结束时进行一次sessionCommit
操作。RedisFlushMode
的设计感觉是为session
数据持久化的时机提供了另外一种思路。
小结
存储机制设计部分就一基于内存和基于 Redis
两种来分析;另外基于 jdbc
和 hazelcast
有兴趣的同学可以自己查看源码。
最后也欢迎访问我的个人博客:www.glmapper.com
参考
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Web 存储机制
- Kubernetes存储机制的实现
- Kafka 源码解析:日志数据存储机制
- 回顾 Android 11 中的存储机制更新
- kafka日志索引存储及Compact压实机制深入剖析-kafka 商业环境实战
- 块存储、文件存储、对象存储三者之比较
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Everything Store
Brad Stone / Little, Brown and Company / 2013-10-22 / USD 28.00
The definitive story of Amazon.com, one of the most successful companies in the world, and of its driven, brilliant founder, Jeff Bezos. Amazon.com started off delivering books through the mail. Bu......一起来看看 《The Everything Store》 这本书的介绍吧!