Netty整合Http与WebSocket的Demo⼊门Netty我就不多说了,是什么能看到这篇⽂章的都很清楚
⽹上很多⽂章直接黏贴复制的不说,还基本没办法拿出来当个例⼦⾛⼀遍。
我这版虽然也是照着能⽤的修修改改,但最起码保证能⽤,⽽且注释很详细。
话不多说,直接搞重点。
我的需求是什么:
⽤Netty搭建⼀个项⽬,能接到Http、WebSocket请求,处理它,返回它。
请求类型eg:
ws://www.anyongliang:8888/ws
实现需求我需要什么:
电脑
Jdk1.8(推荐)
IDE(推荐idea,我也⽤idea做演⽰)
jar管理(推荐gradle,我也⽤gradle做演⽰)
开始实现:
我的注释很多,就不⼀⼀细写了,没什么卵⽤,demo搭起来,⾛个⼏遍,打⼏个断点,什么都懂了。
个别不懂的针对类去搜,实在不⾏去官⽹译本:
1:创建项⽬,并添加依赖。
项⽬核⼼结构如图:
依赖:
testCompile group: 'junit', name: 'junit', version: '4.12'
//netty-4
compile group: 'ioty', name: 'netty-all', version: '4.1.19.Final'
//mongo-Bson
compile group: 'db', name: 'mongo-java-driver', version: '3.8.2'
//google-Gson
compile group: 'de.gson', name: 'gson', version: '2.8.5'
//apache-commons⼯具包
compile group: 'org.apachemons', name: 'commons-dbcp2', version: '2.1.1'    compile group: 'org.apachemons', name: 'commons-lang3', version: '3.6'    compile group: 'commons-io', name: 'commons-io', version: '2.5'
compile group: 'commons-codec', name: 'commons-codec', version: '1.10'
//⽇志-slf4j
compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.25'
核⼼代码:
1:解码器,⽤来解析请求
package cn.ayl.socket.decoder;
import fig.Const;
import cn.ayl.socket.handler.HeartBeatHandler;
import cn.ayl.socket.handler.HttpAndWebSocketHandler;
import ioty.channel.ChannelInitializer;
import ioty.channel.ChannelPipeline;
import ioty.channel.socket.SocketChannel;
import dec.http.HttpObjectAggregator;
import dec.http.HttpServerCodec;
import dec.http.websocketx.WebSocketServerProtocolHandler;
import ioty.handler.stream.ChunkedWriteHandler;
import ioty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* created by Rock-Ayl 2019-11-6
* WebSocket请求的解码器,⼀个请求需要先从这⾥⾛,实现init
*/
public class HttpAndWebSocketDecoder extends ChannelInitializer<SocketChannel> {
protected static Logger logger = Logger(HttpAndWebSocketDecoder.class);
@Override
protected void initChannel(SocketChannel ch) throws Exception {
logger.info("解析请求.");
ChannelPipeline pipeline = ch.pipeline();
//http解码
initHttpChannel(pipeline);
//⼼跳检测
initHeartBeat(pipeline);
/
/基于http的WebSocket
initWebSocket(pipeline);
//处理器
pipeline.addLast(new HttpAndWebSocketHandler());
}
//Http部分
private void initHttpChannel(ChannelPipeline pipeline) throws Exception {
//http解码器(webSocket是http的升级)
pipeline.addLast(new HttpServerCodec());
//以块的⽅式来写的处理器,解决⼤码流的问题,ChunkedWriteHandler:可以向客户端发送HTML5⽂件
pipeline.addLast(new ChunkedWriteHandler());
//netty是基于分段请求的,HttpObjectAggregator的作⽤是将HTTP消息的多个部分合成⼀条完整的HTTP消息,参数是聚合字节的最⼤长度
pipeline.addLast(new HttpObjectAggregator(Const.MaxContentLength));
}
//⼼跳部分
private void initHeartBeat(ChannelPipeline pipeline) throws Exception {
// 针对客户端,如果在1分钟时没有向服务端发送读写⼼跳(ALL),则主动断开,如果是读空闲或者写空闲,不处理
pipeline.addLast(new IdleStateHandler(Const.ReaderIdleTimeSeconds, Const.WriterIdleTimeSeconds, Const.AllIdleTimeSeconds));
// ⾃定义的空闲状态检测
pipeline.addLast(new HeartBeatHandler());
}
//WebSocket部分
private void initWebSocket(ChannelPipeline pipeline) throws Exception {
/**
* WebSocketServerProtocolHandler负责websocket握⼿以及处理控制框架(Close,Ping(⼼跳检检测request),Pong(⼼跳检测响应))。        * 参数为ws请求的访问路径 eg:ws://127.0.0.1:8888/WebSocket。
*/
pipeline.addLast(new WebSocketServerProtocolHandler(Const.WebSocketPath));
}
}
2:处理器,⼀共两个,⼀个⽤来控制⼼跳,⼀个⽤来分发请求并处理
package cn.ayl.socket.handler;
import ioty.channel.Channel;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import ioty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* created by Rock-Ayl 2019-11-17
* WebSocket⼼跳处理程序
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
protected static Logger logger = Logger(HeartBeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断evt是否是 IdleStateEvent(超时的事件,⽤于触发⽤户事件,包含读空闲/写空闲/读写空闲)        if (msg instanceof IdleStateEvent) {
// 强制类型转换
IdleStateEvent event = (IdleStateEvent) msg;
switch (event.state()) {
case READER_IDLE:
websocket和socketlogger.info("进⼊读空闲...");
break;
case WRITER_IDLE:
logger.info("进⼊写空闲...");
break;
case ALL_IDLE:
logger.info("开始杀死⽆⽤通道,节约资源");
Channel channel = ctx.channel();
channel.close();
break;
}
}
}
}
package cn.ayl.socket.handler;
import cn.ayl.json.JsonObject;
import cn.ayl.json.JsonUtil;
import ioty.buffer.ByteBuf;
import ioty.channel.*;
import dec.http.*;
import dec.http.multipart.DefaultHttpDataFactory;
import dec.http.multipart.HttpPostRequestDecoder;
import dec.http.multipart.InterfaceHttpData;
import dec.http.multipart.MemoryAttribute;
import dec.http.websocketx.*;
import ioty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map;
import static ioty.piedBuffer;
/
**
* created by Rock-Ayl on 2019-11-7
* Http请求和WebSocket请求的处理程序
*/
public class HttpAndWebSocketHandler extends ChannelInboundHandlerAdapter {
protected static Logger logger = Logger(HttpAndWebSocketHandler.class);
private WebSocketServerHandshaker webSocketServerHandshaker;
/**
* 通道,请求过来从这⾥分类
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//处理Http请求和WebSocket请求的分别处理
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof HttpContent) {
//todo handleHttpContent
} else if (msg instanceof WebSocketFrame) {
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
}
/**
* 每个channel都有⼀个唯⼀的id值
* asLongText⽅法是channel的id的全名
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//todo 连接打开时
logger.info(ctx.channel().localAddress().toString() + " handlerAdded!, channelId=" + ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//todo 连接关闭时
logger.info(ctx.channel().localAddress().toString() + " handlerRemoved!, channelId=" + ctx.channel().id().asLongText());    }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//todo 出现异常
<("Client:" + ctx.channel().remoteAddress() + "error", Message());
ctx.close();
}
// 处理Websocket的代码
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
webSocketServerHandshaker.close(ctx.channel(), (CloseWebSocketFrame) ain());
return;
}
// 判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new t().retain()));
return;
}
// ⽂本消息,不⽀持⼆进制消息
if (frame instanceof TextWebSocketFrame) {