直策应用WebSocket由于没有高层级的协议,以是须要我们定义运用之间所发送的语义,还要确保联机的两端都能遵照。因此Stomp作为文本定向协议常常作为子协议用于集成WebSocket,用来定义的语义。要乞降相应数据格式类似,STOMP帧由命令、一个或多个头信息以及负载所组成。
下图是测试利用Stomp协议从做事端主动推送给客户端信息,这全体被称为“帧”,个中:
<<< MESSAGE #命令message-id:6834f845-49dd-480e-8b94-84d31372d744 #头信息subscription:sub-0 #头信息content-length:136 #头信息content-type:application/json #头信息{"content":"此条内容为发送的--你好,天下!
"} # 负载
由此可以看出Stomp既没有Http协议那样包含了太多的头信息而占用过多的网络带宽,又能供应比较丰富的格式语义方便双方的通讯。

“即便Netty供应了STOMP编辑码器,但STOMP 是为中间件而设计的一种通报协议,而Netty 供应的是异步的、事宜驱动的网络运用程序框架,并非作为代理,因此从功能契合的意义上说Netty不支持 STOMP协议。”———— Apache ActiveMQ 开拓职员 Justin Bertram
不过实际开拓中如果只是纯挚的采取Netty+WebSocket会导致通信形式层级过低从而导致通信缺少可靠性,以是Spring社区就提倡将Stomp作为WebSocket子协议增强通信的语义,《Spring实战》也给出了如何通集成Spring、WebSocket、Stomp如何进行集成开拓。但是Netty+WebSocket如何将Stomp作为子协议,本人搜索了Google、Stack Overflow后并没有详细的范例,直至后面在Netty的GitHub社区中看到了大略的范例解释,因此特意记录下来希望帮助到有须要的人。
详细代码可参考: https://github.com/1148973713/whales-netty
3.Netty通讯的握手事理在集成STOMP协议过程中,最主要的是官方供应的 WebSocketServerProtocolHandler 处理器(代码3-1):
protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(65536)) .addLast(StompWebSocketClientPageHandler.INSTANCE) .addLast(new WebSocketServerProtocolHandler(chatPath, StompVersion.SUB_PROTOCOLS)) .addLast(stompWebSocketProtocolCodec); }
让我们进入WebSocketServerProtocolHandler类中创造此处须要传入的两个参数分别是:websocketPath--websocket地址、subprotocols--附加的子协议。
public WebSocketServerProtocolHandler(String websocketPath, String subprotocols) { this(websocketPath, subprotocols, 10000L); }
此方法终极调用的是(代码3-2):
public WebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean checkStartsWith, boolean dropPongFrames, long handshakeTimeoutMillis, WebSocketDecoderConfig decoderConfig) { this(WebSocketServerProtocolConfig.newBuilder() .websocketPath(websocketPath) .subprotocols(subprotocols) .checkStartsWith(checkStartsWith) .handshakeTimeoutMillis(handshakeTimeoutMillis) .dropPongFrames(dropPongFrames) .decoderConfig(decoderConfig) .build());}
个中WebSocketServerProtocolConfig.build()是将前面的各种配置参数传入,末了创建一个WebSocketServerProtocolConfig实体类:
public WebSocketServerProtocolConfig build() { return new WebSocketServerProtocolConfig( websocketPath, subprotocols, checkStartsWith, handshakeTimeoutMillis, forceCloseTimeoutMillis, handleCloseFrames, sendCloseFrame, dropPongFrames, decoderConfigBuilder == null ? decoderConfig : decoderConfigBuilder.build() ); }
此处为了读者能理解后续的代码实行流程先插入一个知识点,ChannelHandler 回调方法的实行顺序:
连接要求,handlerAdded () -> channelRegistered () -> channelActive () -> channelRead () -> channelReadComplete ();数据要求,channelRead () -> channelReadComplete ();通道被关闭,channelInactive () -> channelUnregistered () -> handlerRemoved ();handlerAdded ()、channelRegistered ()、channelActive ()只会在生命周期实行一次,后面便不会再被实行。
详细可以查看链接: https://book.itxueyuan.com/b2A7/ZGGQg
在Netty的ChannelHandler生命周期中handlerAdded()方法的实行是在检测到新要求连接时添加一个handler处理器到链表之中。经由WebSocketServerProtocolConfig.build()后,ChannelHandler会触发handlerAdded()方法[注:以下代码也是存在与WebSocketServerProtocolHandler类中](代码3-3)
@Overridepublic void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) { // Add the WebSocketHandshakeHandler before this one. cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(), new WebSocketServerProtocolHandshakeHandler(serverConfig)); } if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) { // Add the UFT8 checking before this one. cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(), new Utf8FrameValidator(serverConfig.decoderConfig().closeOnProtocolViolation())); } }
在上述代码中由于handlerAdded中止定第一次连接时,Pipeline管道内不存在WebSocketServerProtocolHandshakeHandler(WebSocket做事器协议握手处理程序),就会通过addBefore()方法将WebSocketServerProtocolHandshakeHandler添加在WebSocketServerProtocolHandler前。
终极形成WebSocketServerProtocolHandshakeHandler------>Utf8FrameValidator(道理和上面一样)----->WebSocketServerProtocolHandler
我们先不管Utf8FrameValidator,着重剖析WebSocketServerProtocolHandshakeHandler处理器内部代码(代码3-4):
@Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final HttpObject httpObject = (HttpObject) msg; if (httpObject instanceof HttpRequest) { //获取当前要求信息 final HttpRequest req = (HttpRequest) httpObject; //进行比对要求路径,验证协议 isWebSocketPath = isWebSocketPath(req); if (!isWebSocketPath) { ctx.fireChannelRead(msg); return; } try { ... ... //创建握手工厂,从参数可以看出:webSocketURL--webSocket的路径地址、subprotocols--添加的自定义子协议、decoderConfig解码设置 final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()), serverConfig.subprotocols(), serverConfig.decoderConfig()); ////创建一个握手处理器 final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); //握手回调 final ChannelPromise localHandshakePromise = handshakePromise; if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { //设置处理器 WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker); ctx.pipeline().remove(this); //ChannelFuture的浸染是用来保存Channel异步操作的结果。在Netty中所有的I/O操作都是异步的。ChannelFuture代表了异步打算的结果,这个接口的紧张方法便是检讨打算是否已完成,等待打算然后返回打算结果。 final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); //进行结果监听 handshakeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { ... ... //如果发送成功情形,保持兼容性触发事宜 localHandshakePromise.trySuccess();//如果发送成功情形,则触发握手成功事宜 ctx.fireUserEventTriggered( new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } }); applyHandshakeTimeout(); } } }
以下是笔者先用SpirngBoot+WebSocket整合Stomp协议做了基本模板,我们可以看到在相应头中存在sec-websocket-protocol属性,其意义便是所携带的STOMP协议和其版本信息:
如本文开头所说,STOMP协议意在让WebSocket协议变得可靠一些,通过客户端与做事端达成的语义相互确认后才进行连接通讯。因此如何获取客户端要求中的子协议信息并让做事端匹配确认,这一步骤在握手环节过程中是非常主要的。
以是,在上述代码中 wsFactory.newHandshaker(req) 、 handshaker.handshake(ctx.channel(), req) 两个方法实现了我们想要的效果。
首先是newHandshaker方法,它是属于WebSocketServerHandshakerFactory类中的方法(代码3-5):
public WebSocketServerHandshaker newHandshaker(HttpRequest req) { //获取头信息中sec-websocket-protocol属性的值 CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION); if (version != null) { //平凡的websocket协议版本 ... ... } else { //由于我们是附加了STOMP子协议,以是返回的是WebSocketServerHandshaker00 return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig); } }
WebSocketServerHandshaker00()紧张是构建一个自定义的握手办法,个中字符串参数subprotocols便是由开拓职员自定义的协议,它在先前WebSocketServerHandshakerFactory工具创建之前就被传入(详细可以看代码3-4)。随后subprotocols将会被分割形成保存自定义协议名称的数组。
之后的handshaker.handshake(ctx.channel(), req)方法将会开始处理握手,由于个中传入参数req正式客户端发过来的要求,以是此方法部分浸染便是用来比对客户端携带的sec-websocket-protocol属性值是否与做事端自定义属性相对应,如果是则返回握手成功的结果(代码3-6):
public ChannelFuture handshake(Channel channel, FullHttpRequest req) { return handshake(channel, req, null, channel.newPromise());}public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) { ... ... FullHttpResponse response = newHandshakeResponse(req, responseHeaders); ... ... return promise;}
newHandshakeResponse由WebSocketServerHandshaker00类自身实现,它紧张的浸染便是用来判断匹配并在匹配成功后设置由做事端返回给客户真个相应内容(代码3-7):
@Override protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) { // 做事WebSocket握手要求 if (!req.headers().containsValue(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE, true) || !HttpHeaderValues.WEBSOCKET.contentEqualsIgnoreCase(req.headers().get(HttpHeaderNames.UPGRADE))) { throw new WebSocketServerHandshakeException("not a WebSocket handshake request: missing upgrade", req); } // Hixie 75不包含这些报头,而Hixie 76包含 //查询头信息中是否包含sec-websocket-key1属性与sec-websocket-key2属性 boolean isHixie76 = req.headers().contains(HttpHeaderNames.SEC_WEBSOCKET_KEY1) && req.headers().contains(HttpHeaderNames.SEC_WEBSOCKET_KEY2); //获取头信息Origin(起源)值 String origin = req.headers().get(HttpHeaderNames.ORIGIN); ... ... if (isHixie76) { ... ... } else { //由于我们上面要求头是不包含上面两个属性的,以是直接到这里面了 //添加头属性与对应值 res.headers().add(HttpHeaderNames.WEBSOCKET_ORIGIN, origin); res.headers().add(HttpHeaderNames.WEBSOCKET_LOCATION, uri()); //获取sec-websocket-protocol属性值 String protocol = req.headers().get(HttpHeaderNames.WEBSOCKET_PROTOCOL); if (protocol != null) { //先比拟,后设置sec-websocket-protocol res.headers().set(HttpHeaderNames.WEBSOCKET_PROTOCOL, selectSubprotocol(protocol)); } } return res; }
在代码末了调用了selectSubprotocol(protocol),传入的参数是客户端发来的要求中携带的sec-websocket-protocol属性值(代码3-8):
protected String selectSubprotocol(String requestedSubprotocols) { ... ... //将sec-websocket-protocol属性值进行分割 String[] requestedSubprotocolArray = requestedSubprotocols.split(","); for (String p: requestedSubprotocolArray) { String requestedSubprotocol = p.trim(); for (String supportedSubprotocol: subprotocols) { if (SUB_PROTOCOL_WILDCARD.equals(supportedSubprotocol) || requestedSubprotocol.equals(supportedSubprotocol)) { selectedSubprotocol = requestedSubprotocol; return requestedSubprotocol; } } } return null; }
我们可以把稳到if中进行了requestedSubprotocol.equals(supportedSubprotocol)判断,个中requestedSubprotocol是要求中所携带的协议信息,而supportedSubprotocol则是自定义的协议信息,在(代码3-5)中已经随着WebSocketServerHandshaker00创建授予了值。
以是根据上述的流程,我们应该创建一个StompVersion和一个握手事宜成功后触发的处理器,分别用于握手前传入WebSocketServerProtocolHandler()方法以及握手成功后返回相应和对应的STOMP帧见告客户端连接成功可以进行通讯。
4.Netty如何兼容STOMP协议根据上述代码流程我们可以创造,当确认握手成功建立连接后,就会触发握手成功事宜。因此我门只须要编写事宜触发后吸收事宜的处理器,并添加到pipeline(返回看3-1代码)中(代码4-1):
public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocketFrame, StompSubframe> { private final StompChatHandler stompChatHandler = new StompChatHandler(); private final StompWebSocketFrameEncoder stompWebSocketFrameEncoder = new StompWebSocketFrameEncoder(); //userEventTriggered:当fireUserEventTriggered()方法触发时候被调用。 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //当事宜属于HandshakeComplete(握手成功时),与上面new WebSocketServerProtocolHandler.HandshakeComplete对应 if (evt instanceof HandshakeComplete) { //查找出STOMP协议版本 StompVersion stompVersion = StompVersion.findBySubProtocol(((HandshakeComplete) evt).selectedSubprotocol()); ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).set(stompVersion); ctx.pipeline() .addLast(new WebSocketFrameAggregator(65536)) .addLast(new StompSubframeDecoder()) .addLast(new StompSubframeAggregator(65536)) .addLast(stompChatHandler) .remove(StompWebSocketClientPageHandler.INSTANCE); } else { super.userEventTriggered(ctx, evt); } } @Override protected void encode(ChannelHandlerContext ctx, StompSubframe stompFrame, List<Object> out) throws Exception { System.out.println("---------------------->进行编码"); stompWebSocketFrameEncoder.encode(ctx, stompFrame, out); } @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame, List<Object> out) { System.out.println("---------------------->进行解码"); if (webSocketFrame instanceof TextWebSocketFrame || webSocketFrame instanceof BinaryWebSocketFrame) { out.add(webSocketFrame.content().retain()); } else { ctx.close(); } }}
在StompChatHandler处理器中(代码过长,直接下载文章开头的势力代码进行阅读即可),我们根据STOMP官方规定的命令(CONNECT、SUBSCRIBE、SEND、UNSUBSCRIBE、DISCONNECT)进行判断和对内容的处理。以下以SEND命令为例(代码4-2):
private final ConcurrentMap<String, Set<StompSubscription>> chatDestinations = new ConcurrentHashMap<String, Set<StompSubscription>>();private void onSend(ChannelHandlerContext ctx, StompFrame inboundFrame) { //获取目的地名称 String destination = inboundFrame.headers().getAsString(DESTINATION); if (destination == null) { sendErrorFrame("missed header", "required 'destination' header missed", ctx); return; } //根据目的地名称获取相应订阅地址信息 Set<StompSubscription> subscriptions = chatDestinations.get(destination); for (StompSubscription subscription : subscriptions) { subscription.channel().writeAndFlush(transformToMessage(inboundFrame, subscription)); } } private static StompFrame transformToMessage(StompFrame sendFrame, StompSubscription subscription) { Charset charset = StandardCharsets.UTF_8; //设置命令为MESSAGE,文本内容副本,并返回DefaultStompFrame类型 StompFrame messageFrame = new DefaultStompFrame(StompCommand.MESSAGE, sendFrame.content().retainedDuplicate()); String id = UUID.randomUUID().toString(); //设置STOMP帧的头信息,详细参照文章开始STOMP帧格式 messageFrame.headers() .set(MESSAGE_ID, id) .set(SUBSCRIPTION, subscription.id()) .set(CONTENT_LENGTH, Integer.toString(messageFrame.content().readableBytes())); //获取STOMP内容类型,默认未BYTEBUF CharSequence contentType = sendFrame.headers().get(CONTENT_TYPE); if (contentType != null) { //如果有,则也设置入头中 messageFrame.headers().set(CONTENT_TYPE, contentType); } //返回设置好的格式 return messageFrame; }
进入DefaultStompFrame可知,我们须要发送的文本被转化成ByteBuf类型。如果默认采取此类型而不加以设置会导致当内容为中文时产生乱码。
public DefaultStompFrame(StompCommand command, ByteBuf content) { this(command, content, null); }
如上问题,我们须要编写一个编码器StompWebSocketFrameEncoder(返回看代码4-1)对发送出去的进行编码,个中比较主要的方法如下(代码4-3):
@Override protected WebSocketFrame convertFullFrame(StompFrame original, ByteBuf encoded) { //如果content-type头类型为text或则是application/json,则采取TextWebSocketFrame编码办法 if (isTextFrame(original)) { return new TextWebSocketFrame(encoded); } //否则采取BinaryWebSocketFrame(二进制)编码办法 return new BinaryWebSocketFrame(encoded); } private static boolean isTextFrame(StompHeadersSubframe headersSubframe) { //判断STOMP帧是否携带content-type头,并且对类型进行判断 String contentType = headersSubframe.headers().getAsString(StompHeaders.CONTENT_TYPE); return contentType != null && (contentType.startsWith("text") || contentType.startsWith("application/json")); }
为什么发送出去要用的编码器而不是解码器,我们以下面这流程图为例:
5.编写StompVersion
纵然Spring官方推举采取SockJS以兼容更多的浏览器,但考虑到目前主流的浏览器都可以支持WebSocket协议,因此笔者便放弃采取SockJS+ Stomp.over()创建客户端,而是选用Stomp.client(url)。前者与后者差异紧张在于Stomp.client(url)调用时采取的是浏览器自身所支持的不同的WebSocket协议,而不是教由SockJS统一实现。
根据源码可以看到STOMP当的默认版本是 ['v10.stomp', 'v11.stomp'] ,因此要让Netty兼容STOMP则必须让做事端明白当前采取STOMP协议与其对应的版本,可以看到STOMP协议版本:v10.stomp、v11.stomp。
client: function(url, protocols) { var klass, ws; if (protocols == null) { protocols = ['v10.stomp', 'v11.stomp']; } klass = Stomp.WebSocketClass || WebSocket; ws = new klass(url, protocols); return new Client(ws); }
还记得(代码3-1)与(代码4-1)中 我们须要通报附加的子协议吗?这便是为什么须要编写StompVersion(代码5-1):
public enum StompVersion { STOMP_V11("1.1", "v11.stomp"), STOMP_V12("1.2", "v12.stomp"); //获取key为stomp_version的常量。如果没有这样的常量,将创建并返回一个新的常量。 public static final AttributeKey<StompVersion> CHANNEL_ATTRIBUTE_KEY = AttributeKey.valueOf("stomp_version"); public static final String SUB_PROTOCOLS; static { List<String> subProtocols = new ArrayList<String>(values().length); for (StompVersion stompVersion : values()) { subProtocols.add(stompVersion.subProtocol); } //将列举类型转化为字符串格式: v11.stomp,v12.stomp SUB_PROTOCOLS = StringUtil.join(",", subProtocols).toString(); } private final String version; private final String subProtocol; StompVersion(String version, String subProtocol) { this.version = version; this.subProtocol = subProtocol; } public String version() { return version; } public String subProtocol() { return subProtocol; } //根据subProtocol返回匹配上的列举 public static StompVersion findBySubProtocol(String subProtocol) { if (subProtocol != null) { for (StompVersion stompVersion : values()) { if (stompVersion.subProtocol().equals(subProtocol)) { return stompVersion; } } } }
在代码(代码4-1)中,即握手事宜成功时,我们须要比对并获取当前Stomp协议的版本,并将该版本信息设置进CHANNEL_ATTRIBUTE_KEY中:
StompVersion stompVersion = StompVersion.findBySubProtocol(((HandshakeComplete) evt).selectedSubprotocol()); ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).set(stompVersion);
而当客户端真正连接做事端,发送CONNECT命令时,就会触发StompChatHandler代码中onSend方法(代码5-2):
private static void onConnect(ChannelHandlerContext ctx, StompFrame inboundFrame) { String acceptVersions = inboundFrame.headers().getAsString(ACCEPT_VERSION); //获取当前采取的STOMP协议版本 StompVersion handshakeAcceptVersion = ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).get(); StompFrame connectedFrame = new DefaultStompFrame(StompCommand.CONNECTED); //设置头信息 connectedFrame.headers() .set(VERSION, handshakeAcceptVersion.version()) .set(SERVER, "Netty-Server") .set(HEART_BEAT, "0,0"); ctx.writeAndFlush(connectedFrame); }
这时候客户端会接管到做事端发送来封装好的帧,而这些帧正如HTTP中的头信息携带语义,可以让客户端得知当前连接成功以及开始后续其他操作:
<<< CONNECTEDversion:1.1server:Netty-Serverheart-beat:0,0
自此,全体Netty整合STOMP协议告一段落,由于除了Netty官网的示例外,暂时没有看到有其他博客对此有解释记录,因此尽兴了比较详细的讲解。内容或许过于分散,希望读者随着源码进行阅读理解。