package cn.com.atlants.websocket.util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.websocket.EncodeException;
import javax.websocket.SendHandler;
import javax.websocket.Session;
import cn.com.atlants.websocket.WebsocketHub;
import cn.com.atlants.websocket.vo.SessionTag;
/**
* websocket工具类
*
* @author wujinbin
* @date 2017年7月6日
*/
public class WebsocketUtil {
/**
* websocket标签-频道
*/
public final static String TAG_CHANNEL = "tag.channel";
/**
* 添加websocket标签
*
* @param sessionId
* sessionId
* @param tags
* 标签
*/
public static void AddTags(String sessionId, SessionTag... tags) {
for (SessionTag sessionTag : tags) {
WebsocketHub.addTag(sessionId, sessionTag);
}
}
/**
* 删除并关闭session
*
* @param sessionId
* @throws IOException
*/
public static void delAndCloseSession(String sessionId) throws IOException {
WebsocketHub.delAndCloseSession(sessionId);
}
/**
* 删除session
*
* @param sessionId
*/
public static void delSession(String sessionId) {
WebsocketHub.delSession(sessionId);
}
/**
* 根据标签属性获取session
*
* @param tags
* session标签
* @return
*/
public static Set<Session> getSession(SessionTag... tags) {
return WebsocketHub.getSessionsByTag(tags);
}
/**
* 根据sessionId获取session标签
*
* @param sessionId
* sessionId
* @return session标签
*/
public static Set<SessionTag> getTags(String sessionId) {
Session session = WebsocketHub.getSession(sessionId);
Set<SessionTag> result = new HashSet<SessionTag>();
if (session == null) {
return result;
}
Map<String, Object> userProperties = session.getUserProperties();
for (Entry<String, Object> entry : userProperties.entrySet()) {
if (entry.getValue() instanceof String) {
result.add(new SessionTag(entry.getKey(), (String) entry
.getValue()));
}
}
return result;
}
/**
* 用户是否在线
*
* @param tags
* 需要过滤的session属性
* @return
*/
public static boolean isOnline(SessionTag... tags) {
Set<Session> sessionsByTag = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessionsByTag) {
if (session.isOpen()) {
return true;
}
}
return false;
}
/**
* 异步发送二进制流
*
* @param data
* 二进制流
* @param handler
* 消息头
* @param tags
* 需要发送的session标签值
*/
public static void sendBinary(ByteBuffer data, SendHandler handler,
SessionTag... tags) {
Set<Session> sessions = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessions) {
WebsocketUtil.sendBinary(session, handler, data);
}
}
/**
* 异步发送二进制流
*
* @param data
* 二进制流
* @param tags
* 需要发送的session标签值
*/
public static void sendBinary(ByteBuffer data, SessionTag... tags) {
Set<Session> sessions = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessions) {
WebsocketUtil.sendBinary(session, data);
}
}
/**
* 异步发送二进制流
*
* @param session
* 会话连接
* @param data
* 二进制流
*/
public static void sendBinary(Session session, ByteBuffer data) {
if (session == null || !session.isOpen()) {
return;
}
session.getAsyncRemote().sendBinary(data);
}
/**
* 异步发送二进制流
*
* @param session
* 会话连接
* @param handler
* 消息头
* @param data
* 二进制流
*/
public static void sendBinary(Session session, SendHandler handler,
ByteBuffer data) {
if (session == null || !session.isOpen()) {
return;
}
session.getAsyncRemote().sendBinary(data, handler);
}
/**
* 同步发送二进制流<br/>
* 若其中一个报错,则后面的发送不会执行
*
* @param data
* 二进制流
* @param tags
* 需要发送的session标签值
* @throws IOException
*/
public static void sendBinarySync(ByteBuffer data, SessionTag... tags)
throws IOException {
Set<Session> sessions = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessions) {
WebsocketUtil.sendBinarySync(session, data);
}
}
/**
* 同步发送二进制流
*
* @param session
* 会话连接
* @param data
* 二进制流
* @throws IOException
*/
public static void sendBinarySync(Session session, ByteBuffer data)
throws IOException {
if (session == null || !session.isOpen()) {
return;
}
session.getBasicRemote().sendBinary(data);
}
/**
* 异步发送对象<br/>
* 发送对象必须能被编码器识别并正确转换
*
* @param object
* 对象
* @param handler
* 消息头
* @param tags
* 需要发送的session标签值
*/
public static void sendObject(Object object, SendHandler handler,
SessionTag... tags) {
Set<Session> sessions = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessions) {
WebsocketUtil.sendObject(session, handler, object);
}
}
/**
* 异步发送对象<br/>
* 发送对象必须能被编码器识别并正确转换
*
* @param object
* 对象
* @param tags
* 需要发送的session标签值
*/
public static void sendObject(Object object, SessionTag... tags) {
Set<Session> sessions = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessions) {
WebsocketUtil.sendObject(session, object);
}
}
/**
* 异步发送对象<br/>
* 发送对象必须能被编码器识别并正确转换
*
* @param session
* 会话连接
* @param object
* 发送对象
*/
public static void sendObject(Session session, Object object) {
if (session == null || !session.isOpen()) {
return;
}
session.getAsyncRemote().sendObject(object);
}
/**
* 异步发送对象<br/>
* 发送对象必须能被编码器识别并正确转换
*
* @param session
* 会话连接
* @param handler
* 消息头
* @param object
* 发送对象
*/
public static void sendObject(Session session, SendHandler handler,
Object object) {
if (session == null || !session.isOpen()) {
return;
}
session.getAsyncRemote().sendObject(object, handler);
}
/**
* 同步发送对象<br/>
* 发送对象必须能被编码器识别并正确转换<br/>
* 若其中一个报错,则后面的发送不会执行
*
* @param object
* 对象
* @param tags
* 需要发送的session标签值
* @throws IOException
* @throws EncodeException
*/
public static void sendObjectSync(Object object, SessionTag... tags)
throws IOException, EncodeException {
Set<Session> sessions = WebsocketHub.getSessionsByTag(tags);
for (Session session : sessions) {
WebsocketUtil.sendObjectSync(session, object);
}
}
/**
* 同步发送对象<br/>
* 发送对象必须能被编码器识别并正确转换
*
* @param session
* 会话连接
* @param object
* 发送对象
* @throws IOException
* @throws EncodeException
*/
public static void sendObjectSync(Session session, Object object)
throws IOException, EncodeException {
if (session == null || !session.isOpen()) {
return;
}
session.getBasicRemote().sendObject(object);
}
/**
* 异步发送文本
*
* @p