SpringBoot - 状态机框架StateMachine使用详解3(状态机持久化)
通常来说,我们项目的状态机不可能都是从头一路走到尾的,而是可能需要在某个环节停留,然后等待其他业务的触发,再继续下面的流程。比如用户 A 创建了个订单 A,但他可能要第二天才付款。而在他付款前,可能会有其他用户也需要下单。那么我们就需要在订单 A 创建后将状态机状态保存起来,等用户 A 付款前再将其恢复。这个便是持久化。
(2)接着测试一下,我们每次发送事件前会先恢复状态机状态,等发送后再保存状态。
(3)上面代码我们首先发送订单 1 的支付事件,而订单 1 的收货事件等待 6 秒钟后才发送。在这之前我们又分别发送订单 2 的支付事件、收货事件。运行结果如下,可以看到由于做了持久化,状态机不会发生冲突。
(2)然后编辑项目的 application.properties 文件,添加 Redis 连接配置:
(3)然后状态机持久化的配置类内容如下:
(4)最后测试一下,这部分代码和上面内存持久化是一样的:
(5)查看 Redis 也可以看到里面确实保存了状态机状态数据:
七、状态机持久化
1,状态持久化意义
通过持久化状态,Spring StateMachine 可以保证流程可恢复性,高可用性,和可维护性,使得状态机可以长时间运行,在分布式系统中运行。具体原因主要有以下几点:- 故障恢复: 如果状态机的运行过程中发生故障,持久化状态可以帮助状态机恢复到故障之前的状态,从而继续执行业务流程。
- 可恢复性: 如果状态机的运行过程中发生意外中断,例如应用程序重新启动或宕机,持久化状态可以帮助状态机恢复到中断之前的状态,继续执行业务流程。
- 长时间运行: 如果状态机需要长时间运行,例如几天或几周,持久化状态可以帮助状态机恢复到中断之前的状态,并继续执行业务流程。
- 分布式系统: 如果状态机是分布式系统中的一部分,持久化状态可以帮助状态机在不同节点之间进行状态共享,从而维护整体业务流程的一致性。
2,状态持久化方式
- 内存持久化:状态机的状态存储在内存中,在应用程序重新启动后不会被持久化。如果没有配置其他持久化存储,则默认行为为此行为。
- 基于 JDBC 的持久化:我们可以使用 MySQL,PostgreSQL 或 H2 等关系数据库来持久化状态机的状态。为此,我们需要通过扩展 AbstractPersistStateMachineHandler 创建自定义持久化插件,然后配置状态机使用自定义插件。
- 基于 MongoDB 的持久化:我们可以使用 MongoDB 来持久化状态机的状态,使用 MongoDbStateMachinePersister。为此,我们需要配置 MongoTemplate bean 然后将其添加为拦截器到状态机上,可以参考上面的示例
- 基于 Redis 的持久化:我们可以使用 Redis 作为状态机的存储,通过配置 RedisStateMachinePersister。 我们需要配置 RedisConnectionFactory bean,然后创建 RedisStateMachineContextRepository 来保存状态。
3,内存持久化样例
(1)我们对前文的样例做个改造,增加一个持久化配置类,用于实现内存持久化。其原理使用唯一 id 作为 key(本样例是订单 id),把状态机保存到 map 表里面,等需要恢复时再从 map 中取出。
// 状态机持久化的配置类 @Configuration public class StateMachinePersisterConfig { /** * 内存持久化配置 */ @Bean public DefaultStateMachinePersister persister() { return new DefaultStateMachinePersister(new StateMachinePersist() { //用户保存所有状态机上下文 private Map map = new HashMap(); // 持久化状态机 @Override public void write(StateMachineContext context, Object contextObj) throws Exception { System.out.println("持久化状态机:contextObj:" + contextObj.toString()); map.put(contextObj, context); } // 获取状态机 @Override public StateMachineContext read(Object contextObj) throws Exception { System.out.println("获取状态机,contextObj:" + contextObj.toString()); StateMachineContext stateMachineContext = (StateMachineContext) map.get(contextObj); return stateMachineContext; } }); } }
(2)接着测试一下,我们每次发送事件前会先恢复状态机状态,等发送后再保存状态。
@SpringBootApplication public class TestApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } // 状态机对象 @Autowired private StateMachine<States, Events> stateMachine; @Autowired private StateMachinePersister<States, Events, String> stateMachineMemPersister; //在run函数中,我们定义了整个流程的处理过程 @Override public void run(String... args) throws Exception { // 创建第1个订单对象 Order order1 = new Order(); order1.setStates(States.UNPAID); order1.setId(1); System.out.println("--- 发送第1个订单支付事件 ---"); boolean result = this.sendEvent(Events.PAY, order1); System.out.println("> 事件是否发送成功:" + result + ",订单编号:" + order1.getId() +",当前状态:" + stateMachine.getState().getId()); // 在子线程中等待6秒后再发送收货事件 new Thread(() -> { try { Thread.sleep(6000); System.out.println("--- 发送第1个订单收货事件 ---"); boolean result2 = this.sendEvent(Events.RECEIVE, order1); System.out.println("> 事件是否发送成功:" + result2 + ",订单编号:" + order1.getId() +",当前状态:" + stateMachine.getState().getId()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 创建第2个订单对象 Order order2 = new Order(); order2.setStates(States.UNPAID); order2.setId(2); System.out.println("--- 发送第2个订单支付事件 ---"); result = this.sendEvent(Events.PAY, order2); System.out.println("> 事件是否发送成功:" + result + ",订单编号:" + order2.getId() +",当前状态:" + stateMachine.getState().getId()); System.out.println("--- 发送第2个订单收货事件 ---"); result = this.sendEvent(Events.RECEIVE, order2); System.out.println("> 事件是否发送成功:" + result + ",订单编号:" + order2.getId() +",当前状态:" + stateMachine.getState().getId()); } /** * 发送订单状态转换事件 * synchronized修饰保证这个方法是线程安全的 */ private synchronized boolean sendEvent(Events changeEvent, Order order) { boolean result = false; try { //启动状态机 stateMachine.start(); //尝试恢复状态机状态 stateMachineMemPersister.restore(stateMachine, String.valueOf(order.getId())); Message message = MessageBuilder.withPayload(changeEvent) .setHeader("order", order).build(); result = stateMachine.sendEvent(message); //持久化状态机状态 stateMachineMemPersister.persist(stateMachine, String.valueOf(order.getId())); } catch (Exception e) { System.out.println("操作失败:" + e.getMessage()); } finally { stateMachine.stop(); } return result; } }
(3)上面代码我们首先发送订单 1 的支付事件,而订单 1 的收货事件等待 6 秒钟后才发送。在这之前我们又分别发送订单 2 的支付事件、收货事件。运行结果如下,可以看到由于做了持久化,状态机不会发生冲突。
4,基于 Redis 的持久化
(1)要使用 Redis 进行持久化,首先编辑项目的 pom.xml 文件,添加相关依赖:
<!-- redis持久化状态机 --> <dependency> <groupId>org.springframework.statemachine</groupId> <artifactId>spring-statemachine-redis</artifactId> <version>1.2.14.RELEASE</version> </dependency>
(2)然后编辑项目的 application.properties 文件,添加 Redis 连接配置:
# Redis数据库索引(默认为0) spring.redis.database=0 # Redis服务器地址 spring.redis.host=192.168.60.9 # Redis服务器连接端口 spring.redis.port=6379 # Redis服务器连接密码(默认为空) spring.redis.password=123456 # 连接池最大连接数(使用负值表示没有限制) spring.redis.pool.max-active=8 # 连接池最大阻塞等待时间(使用负值表示没有限制) spring.redis.pool.max-wait=-1 # 连接池中的最大空闲连接 spring.redis.pool.max-idle=8 # 连接池中的最小空闲连接 spring.redis.pool.min-idle=0 # 连接超时时间(毫秒) spring.redis.timeout=0
(3)然后状态机持久化的配置类内容如下:
// 状态机持久化的配置类 @Configuration public class StateMachinePersisterConfig { @Resource private RedisConnectionFactory redisConnectionFactory; /** * Redis持久化配置 */ @Bean public RedisStateMachinePersister persister() { RedisStateMachineContextRepository<Events, States> repository = new RedisStateMachineContextRepository(redisConnectionFactory); RepositoryStateMachinePersist p = new RepositoryStateMachinePersist<>(repository); return new RedisStateMachinePersister<>(p); } }
(4)最后测试一下,这部分代码和上面内存持久化是一样的:
@SpringBootApplication public class TestApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } // 状态机对象 @Autowired private StateMachine<States, Events> stateMachine; @Autowired private StateMachinePersister<States, Events, String> stateMachineMemPersister; //在run函数中,我们定义了整个流程的处理过程 @Override public void run(String... args) throws Exception { // 创建第1个订单对象 Order order1 = new Order(); order1.setStates(States.UNPAID); order1.setId(1); System.out.println("--- 发送第1个订单支付事件 ---"); boolean result = this.sendEvent(Events.PAY, order1); System.out.println("> 事件是否发送成功:" + result + ",订单编号:" + order1.getId() +",当前状态:" + stateMachine.getState().getId()); // 在子线程中等待6秒后再发送收货事件 new Thread(() -> { try { Thread.sleep(6000); System.out.println("--- 发送第1个订单收货事件 ---"); boolean result2 = this.sendEvent(Events.RECEIVE, order1); System.out.println("> 事件是否发送成功:" + result2 + ",订单编号:" + order1.getId() +",当前状态:" + stateMachine.getState().getId()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 创建第2个订单对象 Order order2 = new Order(); order2.setStates(States.UNPAID); order2.setId(2); System.out.println("--- 发送第2个订单支付事件 ---"); result = this.sendEvent(Events.PAY, order2); System.out.println("> 事件是否发送成功:" + result + ",订单编号:" + order2.getId() +",当前状态:" + stateMachine.getState().getId()); System.out.println("--- 发送第2个订单收货事件 ---"); result = this.sendEvent(Events.RECEIVE, order2); System.out.println("> 事件是否发送成功:" + result + ",订单编号:" + order2.getId() +",当前状态:" + stateMachine.getState().getId()); } /** * 发送订单状态转换事件 * synchronized修饰保证这个方法是线程安全的 */ private synchronized boolean sendEvent(Events changeEvent, Order order) { boolean result = false; try { //启动状态机 stateMachine.start(); //尝试恢复状态机状态 stateMachineMemPersister.restore(stateMachine, String.valueOf(order.getId())); Message message = MessageBuilder.withPayload(changeEvent) .setHeader("order", order).build(); result = stateMachine.sendEvent(message); //持久化状态机状态 stateMachineMemPersister.persist(stateMachine, String.valueOf(order.getId())); } catch (Exception e) { System.out.println("操作失败:" + e.getMessage()); } finally { stateMachine.stop(); } return result; } }
(5)查看 Redis 也可以看到里面确实保存了状态机状态数据: