SpringCloudGateway之获取请求体(RequestBody)的⼏种⽅
Spring Cloud Gateway 获取请求体
⼀、直接在全局中获取,伪代码如下
private String  resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){
Flux<DataBuffer> body = Body();
AtomicReference<String> bodyRef = new AtomicReference<>();
body.subscribe(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
bodyRef.String());
});
();
}
存在的缺陷:其他⽆法再通过该⽅式获取请求体(因为请求体已被消费),并且会抛出异常
Only one connection receive subscriber allowed.Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
异常原因:实际上spring-cloud-gateway反向代理的原理是,⾸先读取原请求的数据,然后构造⼀个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然⽽我们在他封装之前读取了⼀次request body,⽽request body只能读取⼀次。因此就出现了上⾯的错误。
再者受版本限制
这种⽅法在spring-boot-starter-parent 2.0.6.RELEASE + Spring Cloud Finchley.SR2 body 中⽣效,
但是在spring-boot-starter-parent 2.1.0.RELEASE + Spring Cloud Greenwich.M3 body 中不⽣效,总是为空
⼆、先在全局过滤器中获取,然后再把request重新包装,继续向下传递传递
@Override
springcloud和springboot
public GatewayFilter apply(NameValueConfig nameValueConfig) {
return (exchange, chain) -> {
URI uri = Request().getURI();
URI ex = UriComponentsBuilder.fromUri(uri).build(true).toUri();
ServerHttpRequest request = Request().mutate().uri(ex).build();
if("POST".MethodValue())){//判断是否为POST请求
Flux<DataBuffer> body = Body();
AtomicReference<String> bodyRef = new AtomicReference<>();
body.subscribe(dataBuffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
bodyRef.String());
});//读取request body到缓存
String bodyStr = ();//获取request body
System.out.println(bodyStr);//这⾥是我们需要做的操作
DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);
request = new ServerHttpRequestDecorator(request){
@Override
public Flux<DataBuffer> getBody() {
return bodyFlux;
}
};//封装我们的request
}
return chain.filter(exchange.mutate().request(request).build());
};
}
  protected DataBuffer stringBuffer(String value) {
byte[] bytes = Bytes(StandardCharsets.UTF_8);
NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}
该⽅案的缺陷:request body获取不完整(因为异步原因),只能获取1024B的数据。并且请求体超过1024B,会出现响应超慢(因为我是开启了熔断)。
三、过滤器加路线定位器
翻查源码发现ReadBodyPredicateFactory⾥⾯缓存了request body的信息,于是在⾃定义router中配置了ReadBodyPredicateFactory,然后在filter中通过cachedRequestBodyObject缓存字段获取request body信息。
/**
* @description: 获取POST请求的请求体
* ReadBodyPredicateFactory 发现⾥⾯缓存了request body的信息,
* 于是在⾃定义router中配置了ReadBodyPredicateFactory
* @modified:
*/
@EnableAutoConfiguration
@Configuration
public class RouteLocatorRequestBoby{
  //⾃定义过滤器
@Resource
private ReqTraceFilter reqTraceFilter;
@Resource
private RibbonLoadBalancerClient ribbonLoadBalancerClient;
private static final String SERVICE = "/leap/**";
private static final String HTTP_PREFIX = "";
private static final String COLON = ":";
@Bean
public RouteLocator myRoutes(RouteLocatorBuilder builder) {
//通过负载均衡获取服务实例
ServiceInstance instance = ribbonLoadBalancerClient.choose("PLATFORM-SERVICE");
//拼接路径
StringBuilder forwardAddress = new StringBuilder(HTTP_PREFIX);
forwardAddress.Host())
.append(COLON)
.
Port());
utes()
//拦截请求类型为POST Content-Type application/json application/json;charset=UTF-8
.route(r -> r
.header(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_JSON_VALUE + MediaType.APPLICATION_JSON_UTF8_VALUE)
.and()
.method(HttpMethod.POST)
.and()
//获取缓存中的请求体
.readBody(Object.class, readBody -> {
return true;
})
.and()
.path(SERVICE)
//把请求体传递给reqTraceFilter
.filters(f -> {
f.filter(reqTraceFilter);
return f;
})
.String())).build();
}
/
**
* @description: 过滤器,⽤于获取请求体,和处理请求体业务,列如记录⽇志
* @modified:
*/
@Component
public class ReqTraceFilter implements GlobalFilter, GatewayFilter,Ordered {
private static final String CONTENT_TYPE = "Content-Type";
private static final String CONTENT_TYPE_JSON = "application/json";
//获取请求路由详细信息Route route = Attribute(GATEWAY_ROUTE_BEAN)
private static final String GATEWAY_ROUTE_BEAN = "org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayRoute";
private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = Request();
//判断过滤器是否执⾏
String requestUrl = CurrentRequest(request);
if (!RequestUtils.isFilter(requestUrl)) {
String bodyStr = "";
String contentType = Headers().getFirst(CONTENT_TYPE);
String method = MethodValue();
//判断是否为POST请求
if (null != contentType && HttpMethod.POST.name().equalsIgnoreCase(method) && ains(CONTENT_TYPE_JSON)) {
Object cachedBody = Attribute(CACHE_REQUEST_BODY_OBJECT_KEY);
if(null != cachedBody){
bodyStr = String();
}
}
if (HttpMethod.GET.name().equalsIgnoreCase(method)) {
bodyStr = QueryParams().toString();
}
log.info("请求体内容:{}",bodyStr);
}
return chain.filter(exchange);
}
@Override
public int getOrder() {
return 5;
}
}
该⽅案优点:这种解决,⼀不会带来重复读取问题,⼆不会带来requestbody取不全问题。三在低版本的Spring Cloud Finchley.SR2也可以运⾏。
缺点:不⽀持multipart/form-data(异常415),这个致命。
四、通过org.springframework.cloud.gateway.write包下有个ModifyRequestBodyGatewayFilterFactory,顾名思义,这就是修改 Request Body 的过滤器⼯⼚类。
@Component
@Slf4j
public class ReqTraceFilter implements GlobalFilter, GatewayFilter, Ordered {
@Resource
private IPlatformFeignClient platformFeignClient;
/**
* httpheader,traceId的key名称
*/
private static final String REQUESTID = "traceId";
private static final String CONTENT_TYPE = "Content-Type";
private static final String CONTENT_TYPE_JSON = "application/json";
private static final String GATEWAY_ROUTE_BEAN = "org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayRoute";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = Request();
//判断过滤器是否执⾏
String requestUrl = CurrentRequest(request);
if (!RequestUtils.isFilter(requestUrl)) {
String bodyStr = "";
String contentType = Headers().getFirst(CONTENT_TYPE);
String method = MethodValue();
//判断是否为POST请求
if (null != contentType && HttpMethod.POST.name().equalsIgnoreCase(method) && ains(CONTENT_TYPE_JSON)) {
ServerRequest serverRequest = new DefaultServerRequest(exchange);
List<String> list = new ArrayList<>();
// 读取请求体
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
.flatMap(body -> {
//记录请求体⽇志
final String nId = saveRequestOperLog(exchange, body);
//记录⽇志id
list.add(nId);
return Mono.just(body);
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.Request().getHeaders());
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);                return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
@Override
public HttpHeaders getHeaders() {
long contentLength = ContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.Headers());
httpHeaders.put(REQUESTID,list);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
Body();
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}));
}
if (HttpMethod.GET.name().equalsIgnoreCase(method)) {
bodyStr = QueryParams().toString();
String nId = saveRequestOperLog(exchange, bodyStr);
ServerHttpRequest userInfo = Request().mutate()
.
header(REQUESTID, nId).build();
return chain.filter(exchange.mutate().request(userInfo).build());
}
}
return chain.filter(exchange);
}
/**
* 保存请求⽇志
*
* @param exchange
* @param requestParameters
* @return
*/
private String saveRequestOperLog(ServerWebExchange exchange, String requestParameters) {
log.debug("接⼝请求参数:{}", requestParameters);
ServerHttpRequest request = Request();
String ip = RemoteAddress()).getAddress().getHostAddress();
SaveOperLogVO vo = new  SaveOperLogVO();
vo.setIp(ip);
vo.CurrentRequest(request));
vo.MethodValue());
vo.setRequestParameters(requestParameters);
Route route = Attribute(GATEWAY_ROUTE_BEAN);
//是否配置路由
if (route != null) {
vo.Id());
}
ResEntity<String> res = platformFeignClient.saveOperLog(vo);
log.debug("当前请求ID返回的数据:{}", res);
Data();
}
@Override
public int getOrder() {
return 5;
}
}
该⽅案:完美解决以上所有问题