这篇文章主要讲解了“nacos server中PushService的原理和应用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“nacos server中PushService的原理和应用”吧!

十多年建站经验, 成都网站制作、成都网站建设、外贸营销网站建设客户的见证与正确选择。创新互联提供完善的营销型网页建站明细报价表。后期开发更加便捷高效,我们致力于追求更美、更快、更规范。
本文主要研究一下nacos server的PushService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component public class PushService implements ApplicationContextAware, ApplicationListener{ @Autowired private SwitchDomain switchDomain; private ApplicationContext applicationContext; private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L); private static final int MAX_RETRY_TIMES = 1; private static volatile ConcurrentMap ackMap = new ConcurrentHashMap (); private static ConcurrentMap > clientMap = new ConcurrentHashMap >(); private static volatile ConcurrentHashMap udpSendTimeMap = new ConcurrentHashMap (); public static volatile ConcurrentHashMap pushCostMap = new ConcurrentHashMap (); private static int totalPush = 0; private static int failedPush = 0; private static ConcurrentHashMap lastPushMillisMap = new ConcurrentHashMap<>(); private static DatagramSocket udpSocket; private static Map futureMap = new ConcurrentHashMap<>(); private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("com.alibaba.nacos.naming.push.retransmitter"); return t; } }); private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("com.alibaba.nacos.naming.push.udpSender"); return t; } }); static { try { udpSocket = new DatagramSocket(); Receiver receiver = new Receiver(); Thread inThread = new Thread(receiver); inThread.setDaemon(true); inThread.setName("com.alibaba.nacos.naming.push.receiver"); inThread.start(); executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { removeClientIfZombie(); } catch (Throwable e) { Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie"); } } }, 0, 20, TimeUnit.SECONDS); } catch (SocketException e) { Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service"); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } //...... public static void removeClientIfZombie() { int size = 0; for (Map.Entry > entry : clientMap.entrySet()) { ConcurrentMap clientConcurrentMap = entry.getValue(); for (Map.Entry entry1 : clientConcurrentMap.entrySet()) { PushClient client = entry1.getValue(); if (client.zombie()) { clientConcurrentMap.remove(entry1.getKey()); } } size += clientConcurrentMap.size(); } if (Loggers.PUSH.isDebugEnabled()) { Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size); } } //...... }
PushService实现了ApplicationContextAware、ApplicationListener
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Receiver implements Runnable {
@Override
public void run() {
while (true) {
byte[] buffer = new byte[1024 * 64];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
udpSocket.receive(packet);
String json = new String(packet.getData(), 0, packet.getLength(), Charset.forName("UTF-8")).trim();
AckPacket ackPacket = JSON.parseObject(json, AckPacket.class);
InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
String ip = socketAddress.getAddress().getHostAddress();
int port = socketAddress.getPort();
if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
}
String ackKey = getACKKey(ip, port, ackPacket.lastRefTime);
AckEntry ackEntry = ackMap.remove(ackKey);
if (ackEntry == null) {
throw new IllegalStateException("unable to find ackEntry for key: " + ackKey
+ ", ack json: " + json);
}
long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
Loggers.PUSH.info("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}",
json, ip, port, pushCost, ackMap.size(), totalPush);
pushCostMap.put(ackKey, pushCost);
udpSendTimeMap.remove(ackKey);
} catch (Throwable e) {
Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
}
}
}
//......
public static class AckPacket {
public String type;
public long lastRefTime;
public String data;
}
}Receiver实现了Runnable接口,其run方法使用while true循环来执行udpSocket.receive,之后解析AckPacket,从ackMap移除该ackKey,更新pushCostMap,同时从udpSendTimeMap移除该ackKey
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public class PushClient {
private String namespaceId;
private String serviceName;
private String clusters;
private String agent;
private String tenant;
private String app;
private InetSocketAddress socketAddr;
private DataSource dataSource;
private Map params;
public Map getParams() {
return params;
}
public void setParams(Map params) {
this.params = params;
}
public long lastRefTime = System.currentTimeMillis();
public PushClient(String namespaceId,
String serviceName,
String clusters,
String agent,
InetSocketAddress socketAddr,
DataSource dataSource,
String tenant,
String app) {
this.namespaceId = namespaceId;
this.serviceName = serviceName;
this.clusters = clusters;
this.agent = agent;
this.socketAddr = socketAddr;
this.dataSource = dataSource;
this.tenant = tenant;
this.app = app;
}
public DataSource getDataSource() {
return dataSource;
}
public PushClient(InetSocketAddress socketAddr) {
this.socketAddr = socketAddr;
}
public boolean zombie() {
return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
}
@Override
public String toString() {
return "serviceName: " + serviceName
+ ", clusters: " + clusters
+ ", ip: " + socketAddr.getAddress().getHostAddress()
+ ", port: " + socketAddr.getPort()
+ ", agent: " + agent;
}
public String getAgent() {
return agent;
}
public String getAddrStr() {
return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();
}
public String getIp() {
return socketAddr.getAddress().getHostAddress();
}
@Override
public int hashCode() {
return Objects.hash(serviceName, clusters, socketAddr);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof PushClient)) {
return false;
}
PushClient other = (PushClient) obj;
return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);
}
public String getClusters() {
return clusters;
}
public void setClusters(String clusters) {
this.clusters = clusters;
}
public String getNamespaceId() {
return namespaceId;
}
public void setNamespaceId(String namespaceId) {
this.namespaceId = namespaceId;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public String getTenant() {
return tenant;
}
public void setTenant(String tenant) {
this.tenant = tenant;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public InetSocketAddress getSocketAddr() {
return socketAddr;
}
public void refresh() {
lastRefTime = System.currentTimeMillis();
}
} PushClient封装了要推送的目标服务地址等信息,它提供了zombie方法来判断目标服务是否zombie,它判断距离lastRefTime的时间差是否超过switchDomain指定的该serviceName的PushCacheMillis(默认为10秒),超过则判定为zombie
@Component public class PushService implements ApplicationContextAware, ApplicationListener{ //...... @Override public void onApplicationEvent(ServiceChangeEvent event) { Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId(); Future future = udpSender.schedule(new Runnable() { @Override public void run() { try { Loggers.PUSH.info(serviceName + " is changed, add it to push queue."); ConcurrentMap clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); if (MapUtils.isEmpty(clients)) { return; } Map cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); for (PushClient client : clients.values()) { if (client.zombie()) { Loggers.PUSH.debug("client is zombie: " + client.toString()); clients.remove(client.toString()); Loggers.PUSH.debug("client is zombie: " + client.toString()); continue; } Receiver.AckEntry ackEntry; Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString()); String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = null; Map data = null; if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map ) pair.getValue1(); Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr()); } if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key)); udpPush(ackEntry); } } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e); } finally { futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); } } }, 1000, TimeUnit.MILLISECONDS); futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future); } //...... public void serviceChanged(Service service) { // merge some change events to reduce the push frequency: if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) { return; } this.applicationContext.publishEvent(new ServiceChangeEvent(this, service)); } //...... }
onApplicationEvent会处理ServiceChangeEvent,它会注册一个延时任务并将该future放入futureMap;该延时任务会从clientMap获取指定namespaceId, serviceName的clients;然后遍历clients判断是否是zombie,如果是的话则移除该client,否则创建Receiver.AckEntry,然后执行udpPush(ackEntry),最后从futureMap移除该future;serviceChanged方法提供给外部调用发布ServiceChangeEvent
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component public class PushService implements ApplicationContextAware, ApplicationListener{ //...... public static class Receiver implements Runnable { //...... public static class AckEntry { public AckEntry(String key, DatagramPacket packet) { this.key = key; this.origin = packet; } public void increaseRetryTime() { retryTimes.incrementAndGet(); } public int getRetryTimes() { return retryTimes.get(); } public String key; public DatagramPacket origin; private AtomicInteger retryTimes = new AtomicInteger(0); public Map data; } //...... } private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null."); return null; } if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info("send udp packet: " + ackEntry.key); udpSocket.send(ackEntry.origin); ackEntry.increaseRetryTime(); executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; } } //...... }
udpPush方法会根据Receiver.AckEntry的信息进行判断,如果其重试次数大于MAX_RETRY_TIMES则终止push,将其从ackMap、udpSendTimeMap中移除;如果可以重试则将其ackEntry.key放入ackMap及udpSendTimeMap,然后执行udpSocket.send(ackEntry.origin)及ackEntry.increaseRetryTime(),并注册Retransmitter的延时任务;如果出现异常则将其从ackMap、udpSendTimeMap移除
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Retransmitter implements Runnable {
Receiver.AckEntry ackEntry;
public Retransmitter(Receiver.AckEntry ackEntry) {
this.ackEntry = ackEntry;
}
@Override
public void run() {
if (ackMap.containsKey(ackEntry.key)) {
Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
udpPush(ackEntry);
}
}
}Retransmitter实现了Runnable方法,其run方法在ackMap包含ackEntry.key的条件下执行udpPush重试
PushService实现了ApplicationContextAware、ApplicationListener
其static代码块创建了一个deamon线程执行Receiver,同时注册了一个定时任务执行removeClientIfZombie,它会遍历clientMap,移除zombie的client
其onApplicationEvent会处理ServiceChangeEvent,它会注册一个延时任务并将该future放入futureMap;该延时任务会从clientMap获取指定namespaceId, serviceName的clients;然后遍历clients判断是否是zombie,如果是的话则移除该client,否则创建Receiver.AckEntry,然后执行udpPush(ackEntry),最后从futureMap移除该future;serviceChanged方法提供给外部调用发布ServiceChangeEvent
感谢各位的阅读,以上就是“nacos server中PushService的原理和应用”的内容了,经过本文的学习后,相信大家对nacos server中PushService的原理和应用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!