实战基于SpringBoot2的WebFlux和mLab搭建反应式Web Spring Framework 5带来了新的Reactive Stack⾮阻塞式Web框架:Spring WebFlux。作为与Spring MVC并⾏使⽤的Web框架,Spring WebFlux依赖了反应式流适配器(Reactive Streams Adapter),在Netty和Servlet3.1的容器下,可以提供⾮阻塞式的Web服务,充分发挥下⼀代多核处理器的优势,⽀撑海量的并发访问。
以上是官⽹的介绍,事实上在基于Spring Boot 2强⼤的微服务架构帮助下,WebFlux和Spring MVC⼀起,成为Java应⽤开发的两⼤选择,可以让我们迅速地搭建起反应式的Web应⽤。本⽂拟通过模拟⼀个简单的微博应⽤,实战通过Spring Boot 2+ Spring WebFlux + MongoDB 开发⼀个Web应⽤。
Spring WebFlux及其编程范式
Spring WebFlux通过核⼼库Reactor提供反应式⽀持,Reactor实现了Reactive Streams,后者是⼀个带⾮阻塞式背压的异步流处理器。Reactor包含两个重要的成员变量Flux和Mono,它们都实现了Reactive Streams提供的Publisher接⼝. Flux是⼀个代表了0..N元素的流,Mono是代表了⼀个0..1元素的流。虽然WebFlux使⽤Reactor作为它的核⼼依赖,它在应⽤层⾯,它也同时⽀持RxJava。
Spring WebFlux⽀持两种类型的编程范式:
1. 传统的基于注解的⽅式,如@Controller、@RequestMapping等沿⽤了Spring MVC的模式.
2. 基于Java8的Lambda函数式编程模式
本⽂主要是使⽤基于注解的⽅式,今后另⽂补充基于函数式编程的范式。
基于Spring Boot 2+ Spring WebFlux + MongoDB的轻量级微博应⽤
以下展⽰如何搭建⼀个轻量级的微博应⽤,这个应⽤只包括⼀个domain类Tweet,使⽤基于MongoDB的在线MongoDB数据库mLab作为存储,并且使⽤异步的RESTful API提供基本的增删查改功能。
此外还会⽤到Spring Test组件,通过使⽤Maven的插件功能,实现对微服务应⽤的测试。
1. 新建项⽬
1. 点击
2. 选择2.x以上的Spring Boot版本
3. 输⼊artifact的值,⽐如webflux-demo
4. 选择Reactive Web和Reactive MongoDB依赖
5. 点击Generate Project,⽣成并下载⼀个微服务框架到本地,并解压
6. 使⽤IDE,⽐如eclipse,导⼊解压出来的项⽬⽂件
2. 注册mLab账户,并新建⼀个MongoDB数据库
MongoDB数据库是常⽤的⽂档类型数据库,⼴泛⽤于社交⽹站、电商等引⽤中。⽽mLab是⼀个在线MongoDB数据库平台,提供MongoDB 的在线服务。这个应⽤使⽤到它。
1. 前往
2. 根据要求注册账户
3. ⽹站会有免费和收费的服务选择,选择AWS的免费MongoDB服务
4. 服务选择完毕,平台会提供⼀个数据库镜像,可以点击数据库前往管理页⾯。
5. 在User标签下,新建数据库的登录名和密码。
完成以上步骤,数据库就可以开始使⽤了。你会看到如下图所⽰的页⾯:
3. 在项⽬中配置MongoDB数据库
前往IDE中的项⽬资源⽂件夹,到application.properties。添加你在mLad的MongoDB URI
db.uri=mongodb://username:password@ds063439.mlab:63439/springdb   
在应⽤启动的时候,Springboot会⾃动读取该配置⽂件。
4. 编写应⽤各模块
WebFlux可以认为是基于Spring的Web开发的⼀个新的模式或选择,因此它既有Spring MVC有的模块如Domain、Controller、Service,也有新增的如Handler、Router等。下⾯分别编写各模块。
4.1 Domain包
Domain包只包括⼀个domain类Tweet.java,因为使⽤了⽂档数据库,因此使⽤@Document注解修饰类,并且使⽤@Id修饰成员变量id。@NotBlank、@NotNull和@Size限定了成员变量的值的范围。代码如下:
@Document(collection = "tweets")
public class Tweet {
@Id
private String id;
@NotBlank
@Size(max = 140)
private String text;
@NotNull
private Date createAt = new Date();
public Tweet() {
}
//省略Tweet的getter和setter⽅法
}
4.2 Repository
Repository接⼝是DAO,继承了ReactiveMongoRepository接⼝⽤于连接MongoDB数据库做数据持久化,
pository;
2
3import org.db.repository.ReactiveMongoRepository;
4import org.springframework.stereotype.Repository;
5
del.Tweet;
7
8 @Repository
9public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {
10
11 }
其中⽗接⼝ReactiveMongoRepository的源码如下:
public interface ReactiveMongoRepository<T, ID> extends ReactiveSortingRepository<T, ID>, ReactiveQueryByExampleExecutor<T> {
<S extends T> Mono<S> insert(S entity);
<S extends T> Flux<S> insert(Iterable<S> entities);
<S extends T> Flux<S> insert(Publisher<S> entities);
<S extends T> Flux<S> findAll(Example<S> example);
<S extends T> Flux<S> findAll(Example<S> example, Sort sort);
}
通过查看源码可知,⽗接⼝ReactiveMongoRepository包含对MongoDB数据库基本的增删改查⽅法。在运⾏时,Spring Boot会⾃动实现⼀个SimpleReactiveMongoRepository类,⽤于执⾏增删改查⽅法。这样极⼤地节省了程序员持久化的精⼒,可以专注于业务开发。
4.3 Controller
Controller是WebFlux的核⼼类,该类定义了增删查改对应的⽅法,代码如下:
@RestController
public class TweetController {
@Autowired
private TweetRepository tweetRepository;
//通过接受Get请求,返回Flux类型的Tweet对象流
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets(){
return tweetRepository.findAll();
}
//通过接受POST请求,新增⼀个Tweet对象
@PostMapping("/tweets")
public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet){
return tweetRepository.save(tweet);
}
//通过id查Tweet
@GetMapping("/tweets/{id}")
public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
return tweetRepository.findById(tweetId)
.
map(savedTweet -> ResponseEntity.ok(savedTweet))
.Found().build());
}
     //通过id更新Tweet,使⽤到SpringMVC的相关注解
  @PutMapping("/tweets/{id}")
  public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
@Valid @RequestBody Tweet tweet) {
  return tweetRepository.findById(tweetId)
.flatMap(existingTweet -> {
existingTweet.Text());
return tweetRepository.save(existingTweet);
})
.map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
      //通过id删除tweet
  @DeleteMapping("/tweets/{id}")
  public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {
  return tweetRepository.findById(tweetId)
.flatMap(existingTweet ->
tweetRepository.delete(existingTweet)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
)
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
// 基于反应式流发送微博⾄客户端
@GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> streamAllTweets() {
return tweetRepository.findAll();
}
Controller是FluxWeb编程的核⼼,与SpringMVC不同,所有的处理⽅法返回的都是Flux或Mono对象。
Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表⽰的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产⽣时,订阅者中对应的⽅法 onNext(), onComplete()和onError()会被调⽤。
Mono 表⽰的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。
Flux 和 Mono 之间可以进⾏转换。对⼀个 Flux 序列进⾏计数操作,得到的结果是⼀个 Mono<Long>对象。把两个 Mono 序列合并在⼀起,得到的是⼀个 Flux 对象。
Controller使⽤Flux或Mono作为对象,返回给不同的请求。反应式编码主要在最后⼀个⽅法:
// 基于反应式流发送微博⾄客户端
@GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> streamAllTweets() {
return tweetRepository.findAll();
这个⽅法和getAllTweet⽅法⼀样,会返回⼀个JSON流到客户端,区别在于streamAllTweets以Server-send-event的⽅式返回⼀个Json流到浏览器,这种流可以被浏览器识别和使⽤。这⾥涉及到服务器推送事件(Server-Send Event)
web布局框架
服务器推送事件(Server-Sent Events,SSE)允许服务器端不断地推送数据到客户端。相对于 WebSocket ⽽⾔,服务器推送事件只⽀持服务器端到客户端的单向数据传递。虽然功能较弱,但优势在于 SSE 在已有的 HTTP 协议上使⽤简单易懂的⽂本格式(如JSON)来表⽰传输的数据。
作为 W3C 的推荐规范,SSE 在浏览器端的⽀持也⽐较⼴泛,除了 IE 之外的其他浏览器都提供了⽀持。在 IE 上也可以使⽤ polyfill 库来提供⽀持。在服务器端来说,SSE 是⼀个不断产⽣新数据的流,⾮常适合于⽤反应式流来表⽰。在 WebFlux 中创建 SSE 的服务器端是⾮常简单的。只需要返回的对象的类型是 Flux<ServerSentEvent>,就会被⾃动按照 SSE 规范要求的格式来发送响应。
使⽤WebTestClient测试应⽤
WebTestClient是Spring 5提供的⼀个异步反应式Http客户端,可以⽤于测试反应式的RestFul微服务应⽤。在IDE的测试⽂件夹中,可以到测试类,编写代码如下:
public class WebfluxDemoApplicationTests {
@Autowired
private WebTestClient webTestClient;
@Autowired
TweetRepository tweetRepository;
@Test
public void testCreateTweet() {
Tweet tweet = new Tweet("这是⼀条测试微博");
webTestClient.post().uri("/tweets")
.contentType(MediaType.APPLICATION_JSON_UTF8)
.accept(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just(tweet), Tweet.class)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
.
expectBody()
.jsonPath("$.id").isNotEmpty()
.jsonPath("$.text").isEqualTo("这是⼀条测试微博");
}
在测试类中通过控制反转注⼊WebTestClient和DAO的对象,调⽤WebTestClient⽅法进⾏测试,使⽤mvn test命令,测试所有的测试类。结果如下:
查看mLab的数据库,数据被成功添加: