WebSocket 是在 http protocol 進行雙向通訊傳輸的協定,可以用 UTF-8 Text 或 Binary format。message 沒有長度限制,但 framing 有限制長度。可發送無限個訊息。訊息必須依照順序傳送,無法支援 interleaved messages。
WebSocket connection state
有四種
State | Description |
---|---|
Connecting | 當 HTTP upgrade 到 Websocket |
Open | socket is open, ready to read/write |
Closing | 啟動 WebSocket Close Handshake |
Closed | websocket is closed |
WebSocket Events
Event | Description |
---|---|
on Connect | 成功連線,會收到 org.eclipse.jetty.websocket.api.Session object reference,這是該 socket 的 session |
on Close | 會有 Status Code |
on Error | websocket 發生 error |
on Message | 代表收到完整的 message,可以是 UTF-8 Text 或是 raw BINARY message |
Jetty 提供的 WebSocket Spec
RFC-6455
目前支援 WebSocket Protocol version 13JSR-356
Java WebScoket API (javax.webscoket),這是處理 websocket 的標準 java API
目前還不穩定的功能
perframe-compression
Per Frame Compression Extension
這是 Google/Chromium team 提供的 frame compression,但還在 early draft,Jetty 支援 draft-04 spec,目前已經被 permessage-compression 取代
permessage-compression
Per Message Compression Extension
將壓縮改為整個 message,而不是每一個 frame
WebSocket Session
websocket Session 物件有以下的使用方式
檢查 connection state (opened or not)
if(session.isOpen()) { }
檢查 secure
if(session.isSecure()) { // connection is using 'wss://' }
有哪些在 Upgrade Request and Response
UpgradeRequest req = session.getUpgradeRequest(); String channelName = req.getParameterMap().get("channelName"); UpgradeResponse resp = session.getUpgradeResponse(); String subprotocol = resp.getAcceptedSubProtocol();
取得 Local and Remote Address
InetSocketAddress remoteAddr = session.getRemoteAddress();
存取 idle timeout
session.setIdleTimeout(2000); // 2 second timeout
Jetty WebSocket API
同時支援 server 及 client
要開發 Jetty Websocket 程式,首先要在 Maven POM 加上 library,因測試同時要支援 RFC-6455 及 JSR-356,故同時加上了兩種 library
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tw.com.maxkit</groupId>
<artifactId>test</artifactId>
<version>0.1</version>
<properties>
<jetty.version>9.4.12.v20180830</jetty.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>${jetty.version}</version>
<configuration>
<scanIntervalSeconds>2</scanIntervalSeconds>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--Jetty dependencies start here -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--Jetty dependencies end here -->
<!--Jetty Websocket server side dependencies start here -->
<!--Jetty JSR-356 Websocket server side dependency -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>javax-websocket-server-impl</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--Jetty Websocket API server side dependency -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--Jetty Websocket server dependencies end here -->
<!--Jetty Websocket client side dependencies start here -->
<!--JSR-356 Websocket client side depencency -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>javax-websocket-client-impl</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--Jetty Websocket API client side dependency -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--Jetty Websocket client side dependencies end here -->
</dependencies>
</project>
RFC-6455 websocket Server
首先要將 Jetty path 透過 WebSocketServlet 跟 WebSocket class 綁定。
以下是 ToUpperWebSocketServlet 的 servlet,會處理 /toUpper
這個 url,因為在 IDE 裡面,通常會將 webapp 對應到某個 context,假設是 test,那麼 websocket 服務的 url,應該是 ws://localhost:8080/test/toUpper
ToUpperWebSocketServlet.java
package tw.com.maxkit.jetty.server;
import javax.servlet.annotation.WebServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
@WebServlet(name = "ToUpper WebSocket Servlet", urlPatterns="/toUpper")
public class ToUpperWebSocketServlet extends WebSocketServlet{
@Override
public void configure(WebSocketServletFactory factory) {
// set a 10 second timeout
factory.getPolicy().setIdleTimeout(10000);
// factory.register(ToUpperWebSocket.class);
// factory.register(ToUpperWebSocketListener.class);
factory.register(ToUpperWebSocketAdapter.class);
}
}
程式裡面設定了 ide timeout 的時間為 10s,另外有三種真正實作 websocket 訊息的方式,如果要使用某一種實作方式,只要調整 register 的 implementation class 即可。
// factory.register(ToUpperWebSocket.class);
// factory.register(ToUpperWebSocketListener.class);
factory.register(ToUpperWebSocketAdapter.class);
- WebSocket annotation
annotation | description |
---|---|
@WebSocket | 將這個 POJO 標記為 WebSocket,class 不能是 abstract and public |
@OnWebSocketClose | (optional) 收到 onClose event |
@OnWebSocketMessage | (optional) 有兩個 method,分別是 TEXT 與 BINARY message |
@OnWebSocketError | (optional) 收到 error event |
@OnWebSocketFrame | (optional) 收到 frame event |
ToUppderWebSocket.java
package tw.com.maxkit.jetty.server;
import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@WebSocket
public class ToUpperWebSocket {
@OnWebSocketMessage
public void onText(Session session, String message) throws IOException {
System.out.println("ToUpperWebSocket received:" + message);
if (session.isOpen()) {
String response = message.toUpperCase();
session.getRemote().sendString(response);
}
}
@OnWebSocketConnect
public void onConnect(Session session) throws IOException {
System.out.println( session.getRemoteAddress().getHostName() + " connected!");
}
@OnWebSocketClose
public void onClose(Session session, int status, String reason) {
System.out.println(session.getRemoteAddress().getHostName() + " closed!");
}
}
- WebSocketListener
ToUpperWebSocketListener.java
package tw.com.maxkit.jetty.server;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
public class ToUpperWebSocketListener implements WebSocketListener {
private Session outbound;
public void onWebSocketBinary(byte[] payload, int offset, int len) {
/* only interested in text messages */
}
public void onWebSocketClose(int statusCode, String reason) {
this.outbound = null;
}
public void onWebSocketConnect(Session session) {
this.outbound = session;
}
public void onWebSocketError(Throwable cause) {
cause.printStackTrace(System.err);
}
public void onWebSocketText(String message) {
if ((outbound != null) && (outbound.isOpen())) {
System.out.printf("ToUpperWebSocketListener [%s]%n", message);
// echo the message back
outbound.getRemote().sendString(message.toUpperCase(), null);
}
}
}
- WebSocketAdpapter
比 listener 簡單,提供檢查 session state 的 methods
ToUpperWebSocketAdapter.java
package tw.com.maxkit.jetty.server;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import java.io.IOException;
public class ToUpperWebSocketAdapter extends WebSocketAdapter
{
@Override
public void onWebSocketText(String message)
{
if (isConnected())
{
try
{
System.out.printf("ToUpperWebSocketAdapter received: [%s]%n",message);
// echo the message back
getRemote().sendString(message.toUpperCase());
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
}
}
}
JSR-356 websocket Server
在網址 ws://localhost:8008/test/jsr356toUpper 提供服務
ToUpper356Socket.java
package tw.com.maxkit.jsr356.server;
import java.io.IOException;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint("/jsr356toUpper")
public class ToUpper356Socket {
@OnOpen
public void onOpen(Session session) {
System.out.println("WebSocket opened: " + session.getId());
}
@OnMessage
public void onMessage(String txt, Session session) throws IOException {
System.out.println("Message received: " + txt);
session.getBasicRemote().sendText(txt.toUpperCase());
}
@OnClose
public void onClose(CloseReason reason, Session session) {
System.out.println("Closing a WebSocket due to " + reason.getReasonPhrase());
}
}
測試網頁
websocketecho.html
<html>
<body>
<div>
<input type="text" id="input" />
</div>
<div>
<input type="button" id="connectBtn" value="CONNECT"
onclick="connect()" /> <input type="button" id="sendBtn"
value="SEND" onclick="send()" disabled="true" />
</div>
<div id="output">
<p>Output</p>
</div>
</body>
<script type="text/javascript">
var webSocket;
var output = document.getElementById("output");
var connectBtn = document.getElementById("connectBtn");
var sendBtn = document.getElementById("sendBtn");
function connect() {
// oprn the connection if one does not exist
if (webSocket !== undefined
&& webSocket.readyState !== WebSocket.CLOSED) {
return;
}
// Create a websocket
webSocket = new WebSocket("ws://localhost:8080/test/toUpper");
webSocket.onopen = function(event) {
updateOutput("Connected!");
connectBtn.disabled = true;
sendBtn.disabled = false;
};
webSocket.onmessage = function(event) {
updateOutput(event.data);
};
webSocket.onclose = function(event) {
updateOutput("Connection Closed");
connectBtn.disabled = false;
sendBtn.disabled = true;
};
}
function send() {
var text = document.getElementById("input").value;
webSocket.send(text);
}
function closeSocket() {
webSocket.close();
}
function updateOutput(text) {
output.innerHTML += "<br/>" + text;
}
</script>
</html>
WebSocket Client
client 同樣分 RFC-6455 與 JSR-356 兩種
RFC-6455
WebSocketClientMain.java
package tw.com.maxkit.jetty.client;
import java.net.URI;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
public class WebSocketClientMain {
public static void main(String[] args) {
String dest = "ws://localhost:8080/test/toUpper";
WebSocketClient client = new WebSocketClient();
try {
ToUpperClientSocket socket = new ToUpperClientSocket();
client.start();
URI echoUri = new URI(dest);
ClientUpgradeRequest request = new ClientUpgradeRequest();
client.connect(socket, echoUri, request);
socket.getLatch().await();
socket.sendMessage("echo");
socket.sendMessage("test");
Thread.sleep(10000l);
} catch (Throwable t) {
t.printStackTrace();
} finally {
try {
client.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
ToUpperClientSocket.java
package tw.com.maxkit.jetty.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@WebSocket
public class ToUpperClientSocket {
private Session session;
CountDownLatch latch= new CountDownLatch(1);
@OnWebSocketMessage
public void onText(Session session, String message) throws IOException {
System.out.println("Message received from server:" + message);
}
@OnWebSocketConnect
public void onConnect(Session session) {
System.out.println("Connected to server");
this.session=session;
latch.countDown();
}
public void sendMessage(String str) {
try {
session.getRemote().sendString(str);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public CountDownLatch getLatch() {
return latch;
}
}
JSR-356 Client
WebSocket356ClientMain.java
package tw.com.maxkit.jsr356.client;
import java.net.URI;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
public class WebSocket356ClientMain {
public static void main(String[] args) {
try {
String dest = "ws://localhost:8080/test/jsr356toUpper";
ToUpper356ClientSocket socket = new ToUpper356ClientSocket();
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(socket, new URI(dest));
socket.getLatch().await();
socket.sendMessage("echo356");
socket.sendMessage("test356");
Thread.sleep(10000l);
} catch (Throwable t) {
t.printStackTrace();
}
}
}
ToUpper356ClientSocket.java
package tw.com.maxkit.jsr356.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
@ClientEndpoint
public class ToUpper356ClientSocket {
CountDownLatch latch = new CountDownLatch(1);
private Session session;
@OnOpen
public void onOpen(Session session) {
System.out.println("Connected to server");
this.session = session;
latch.countDown();
}
@OnMessage
public void onText(String message, Session session) {
System.out.println("Message received from server:" + message);
}
@OnClose
public void onClose(CloseReason reason, Session session) {
System.out.println("Closing a WebSocket due to " + reason.getReasonPhrase());
}
public CountDownLatch getLatch() {
return latch;
}
public void sendMessage(String str) {
try {
session.getBasicRemote().sendText(str);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Sending Message to Remote Endpoint
發送訊息有幾種方式
Blocking Send Message
在完成訊息發送後,該 method 才會 return
這是發送 binary message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
try
{
remote.sendBytes(buf);
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
這是發送 text message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a TEXT message to remote endpoint
try
{
remote.sendString("Hello World");
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
發送 Partial Message
如果有個大訊息,希望切割成多個部分,可利用 partial message sending methods,最後一個的 isLast == true
binary message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a BINARY message to remote endpoint
// Part 1
ByteBuffer buf1 = ByteBuffer.wrap(new byte[] { 0x11, 0x22 });
// Part 2 (last part)
ByteBuffer buf2 = ByteBuffer.wrap(new byte[] { 0x33, 0x44 });
try
{
remote.sendPartialBytes(buf1,false);
remote.sendPartialBytes(buf2,true); // isLast is true
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
text message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a TEXT message to remote endpoint
String part1 = "Hello";
String part2 = " World";
try
{
remote.sendPartialString(part1,false);
remote.sendPartialString(part2,true); // last part
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
發送 Ping / Pong Control Frame
PING
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a PING to remote endpoint
String data = "You There?";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
try
{
remote.sendPing(payload);
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
PONG
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a PONG to remote endpoint
String data = "Yup, I'm here";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
try
{
remote.sendPong(payload);
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
發非同步訊息發送
有兩個 async send message methods
- RemoteEndpoint.sendBytesByFuture(ByteBuffer message)
- RemoteEndpoint.sendStringByFuture(String message)
會回傳 java.util.concurrent.Future
binary
RemoteEndpoint remote = session.getRemote();
// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
remote.sendBytesByFuture(buf);
可利用 get 等待發送是否完成
RemoteEndpoint remote = session.getRemote();
// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
try
{
Future<Void> fut = remote.sendBytesByFuture(buf);
// wait for completion (forever)
fut.get();
}
catch (ExecutionException | InterruptedException e)
{
// Send failed
e.printStackTrace();
}
可在 get 加上 timeout 時間
RemoteEndpoint remote = session.getRemote();
// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
Future<Void> fut = null;
try
{
fut = remote.sendBytesByFuture(buf);
// wait for completion (timeout)
fut.get(2,TimeUnit.SECONDS);
}
catch (ExecutionException | InterruptedException e)
{
// Send failed
e.printStackTrace();
}
catch (TimeoutException e)
{
// timeout
e.printStackTrace();
if (fut != null)
{
// cancel the message
fut.cancel(true);
}
}
text 訊息跟 binary 類似,只是將 sendBytesByFuture 換成 sendStringByFuture
沒有留言:
張貼留言