This article takes a look at PushService in the nacos server.
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    @Autowired
    private SwitchDomain switchDomain;

    private ApplicationContext applicationContext;

    private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);

    private static final int MAX_RETRY_TIMES = 1;

    private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap
        = new ConcurrentHashMap<String, Receiver.AckEntry>();

    private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap
        = new ConcurrentHashMap<String, ConcurrentMap<String, PushClient>>();

    private static volatile ConcurrentHashMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<String, Long>();

    public static volatile ConcurrentHashMap<String, Long> pushCostMap = new ConcurrentHashMap<String, Long>();

    private static int totalPush = 0;

    private static int failedPush = 0;

    private static ConcurrentHashMap<String, Long> lastPushMillisMap = new ConcurrentHashMap<>();

    private static DatagramSocket udpSocket;

    private static Map<String, Future> futureMap = new ConcurrentHashMap<>();

    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.push.retransmitter");
            return t;
        }
    });

    private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.push.udpSender");
            return t;
        }
    });

    static {
        try {
            udpSocket = new DatagramSocket();

            Receiver receiver = new Receiver();

            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();

            executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        removeClientIfZombie();
                    } catch (Throwable e) {
                        Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                    }
                }
            }, 0, 20, TimeUnit.SECONDS);

        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    //......

    public static void removeClientIfZombie() {

        int size = 0;
        for (Map.Entry<String, ConcurrentMap<String, PushClient>> entry : clientMap.entrySet()) {
            ConcurrentMap<String, PushClient> clientConcurrentMap = entry.getValue();
            for (Map.Entry<String, PushClient> entry1 : clientConcurrentMap.entrySet()) {
                PushClient client = entry1.getValue();
                if (client.zombie()) {
                    clientConcurrentMap.remove(entry1.getKey());
                }
            }

            size += clientConcurrentMap.size();
        }

        if (Loggers.PUSH.isDebugEnabled()) {
            Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size);
        }
    }

    //......
}
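PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces. It holds two ScheduledExecutorService instances, one for the retransmitter and one for the udpSender. Its static block creates the udpSocket, starts a daemon thread that runs Receiver, and schedules removeClientIfZombie with a fixed delay of 20 seconds; that task walks clientMap and removes every client that has become a zombie.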
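The scheduling pattern in the static block can be tried in isolation. The following is a minimal sketch, not nacos code: the class name DaemonSchedulerSketch, the method names and the thread name are hypothetical, and the periodic task is a stand-in for removeClientIfZombie. It shows a single-threaded scheduler backed by a named daemon thread, and why the task body is wrapped in try/catch: scheduleWithFixedDelay suppresses all later runs once a run throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class DaemonSchedulerSketch {

    /** Builds a single-threaded scheduler whose worker is a named daemon thread. */
    static ScheduledExecutorService newDaemonScheduler(final String threadName) {
        return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);      // a daemon thread does not block JVM exit
                t.setName(threadName);
                return t;
            }
        });
    }

    /**
     * Schedules a stand-in task with a fixed delay and waits until it has run
     * `runs` times. Returns the name of the worker thread, or null on timeout.
     */
    static String runPeriodic(String threadName, int runs, long delayMillis) {
        ScheduledExecutorService scheduler = newDaemonScheduler(threadName);
        final CountDownLatch latch = new CountDownLatch(runs);
        final String[] seenName = new String[1];
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                // stand-in for the real periodic work (hypothetical)
                seenName[0] = Thread.currentThread().getName();
                latch.countDown();
            } catch (Throwable e) {
                // swallow: an escaping exception would cancel all future runs
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        boolean done = false;
        try {
            done = latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return done ? seenName[0] : null;
    }

    public static void main(String[] args) {
        System.out.println("periodic task ran on: " + runPeriodic("sketch.zombie-sweeper", 3, 20));
    }
}
```

PushService uses a 20 second delay; the sketch uses milliseconds only so that it finishes quickly.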
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Receiver implements Runnable {
    @Override
    public void run() {
        while (true) {
            byte[] buffer = new byte[1024 * 64];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            try {
                udpSocket.receive(packet);

                String json = new String(packet.getData(), 0, packet.getLength(), Charset.forName("UTF-8")).trim();
                AckPacket ackPacket = JSON.parseObject(json, AckPacket.class);

                InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                int port = socketAddress.getPort();

                if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
                    Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
                }

                String ackKey = getACKKey(ip, port, ackPacket.lastRefTime);
                AckEntry ackEntry = ackMap.remove(ackKey);
                if (ackEntry == null) {
                    throw new IllegalStateException("unable to find ackEntry for key: " + ackKey
                        + ", ack json: " + json);
                }

                long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);

                Loggers.PUSH.info("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}",
                    json, ip, port, pushCost, ackMap.size(), totalPush);

                pushCostMap.put(ackKey, pushCost);

                udpSendTimeMap.remove(ackKey);

            } catch (Throwable e) {
                Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
            }
        }
    }

    //......

    public static class AckPacket {
        public String type;
        public long lastRefTime;

        public String data;
    }
}
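Receiver implements the Runnable interface. Its run method calls udpSocket.receive in a while (true) loop, parses each datagram into an AckPacket, and logs a warning if the ack arrives more than ACK_TIMEOUT_NANOS (10 seconds) after lastRefTime. It then removes the ackKey from ackMap (throwing IllegalStateException if no entry is found), records the push cost in pushCostMap, and removes the ackKey from udpSendTimeMap.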
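The ack bookkeeping above can be sketched without any networking. This is an illustrative reduction under stated assumptions, not the nacos implementation: the class AckTrackerSketch, its method names and the ackKey format are hypothetical (the listing does not show the body of getACKKey), and timestamps are passed in explicitly so the behaviour is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AckTrackerSketch {

    // key -> payload still waiting for an ack (plays the role of ackMap)
    private final ConcurrentMap<String, String> pending = new ConcurrentHashMap<>();
    // key -> send timestamp in millis (plays the role of udpSendTimeMap)
    private final ConcurrentMap<String, Long> sendTimeMillis = new ConcurrentHashMap<>();
    // key -> measured round trip in millis (plays the role of pushCostMap)
    private final ConcurrentMap<String, Long> costMillis = new ConcurrentHashMap<>();

    /** Hypothetical key format; the real getACKKey is not shown in the listing. */
    static String ackKey(String ip, int port, long lastRefTime) {
        return ip + "," + port + "," + lastRefTime;
    }

    /** Called when a packet is sent: remember it and the time it left. */
    void recordSend(String key, String payload, long nowMillis) {
        pending.put(key, payload);
        sendTimeMillis.put(key, nowMillis);
    }

    /** Called when an ack arrives: clear the pending entry and record the cost. */
    long recordAck(String key, long nowMillis) {
        if (pending.remove(key) == null) {
            throw new IllegalStateException("unable to find ackEntry for key: " + key);
        }
        Long sentAt = sendTimeMillis.remove(key);
        long cost = (sentAt == null) ? 0L : nowMillis - sentAt;
        costMillis.put(key, cost);
        return cost;
    }

    int unacked() {
        return pending.size();
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch();
        String key = ackKey("10.0.0.1", 50000, 123L);
        tracker.recordSend(key, "{}", 1000L);
        System.out.println("cost ms: " + tracker.recordAck(key, 1042L));
        System.out.println("unacked: " + tracker.unacked());
    }
}
```

Unlike the listing, the sketch tolerates a missing send timestamp instead of unboxing a null; that is a choice made for the example, not a description of nacos behaviour.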
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public class PushClient {
    private String namespaceId;
    private String serviceName;
    private String clusters;
    private String agent;
    private String tenant;
    private String app;
    private InetSocketAddress socketAddr;
    private DataSource dataSource;
    private Map<String, String[]> params;

    public Map<String, String[]> getParams() {
        return params;
    }

    public void setParams(Map<String, String[]> params) {
        this.params = params;
    }

    public long lastRefTime = System.currentTimeMillis();

    public PushClient(String namespaceId,
                      String serviceName,
                      String clusters,
                      String agent,
                      InetSocketAddress socketAddr,
                      DataSource dataSource,
                      String tenant,
                      String app) {
        this.namespaceId = namespaceId;
        this.serviceName = serviceName;
        this.clusters = clusters;
        this.agent = agent;
        this.socketAddr = socketAddr;
        this.dataSource = dataSource;
        this.tenant = tenant;
        this.app = app;
    }

    public DataSource getDataSource() {
        return dataSource;
    }

    public PushClient(InetSocketAddress socketAddr) {
        this.socketAddr = socketAddr;
    }

    public boolean zombie() {
        return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
    }

    @Override
    public String toString() {
        return "serviceName: " + serviceName
            + ", clusters: " + clusters
            + ", ip: " + socketAddr.getAddress().getHostAddress()
            + ", port: " + socketAddr.getPort()
            + ", agent: " + agent;
    }

    public String getAgent() {
        return agent;
    }

    public String getAddrStr() {
        return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();
    }

    public String getIp() {
        return socketAddr.getAddress().getHostAddress();
    }

    @Override
    public int hashCode() {
        return Objects.hash(serviceName, clusters, socketAddr);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof PushClient)) {
            return false;
        }

        PushClient other = (PushClient) obj;

        return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);
    }

    public String getClusters() {
        return clusters;
    }

    public void setClusters(String clusters) {
        this.clusters = clusters;
    }

    public String getNamespaceId() {
        return namespaceId;
    }

    public void setNamespaceId(String namespaceId) {
        this.namespaceId = namespaceId;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getTenant() {
        return tenant;
    }

    public void setTenant(String tenant) {
        this.tenant = tenant;
    }

    public String getApp() {
        return app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public InetSocketAddress getSocketAddr() {
        return socketAddr;
    }

    public void refresh() {
        lastRefTime = System.currentTimeMillis();
    }
}
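PushClient wraps the information about a push target, such as its address. Its zombie method decides whether the target has become a zombie: it compares the time elapsed since lastRefTime with the PushCacheMillis that switchDomain specifies for the serviceName (10 seconds by default), and treats the client as a zombie once that value is exceeded. Calling refresh resets lastRefTime to the current time.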
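The zombie check and the sweep over the nested client map can be sketched as follows. This is a simplified illustration, not nacos code: ZombieSweepSketch, its Client class and the sweep method are hypothetical names, the threshold is passed in as a parameter instead of being read from switchDomain, and the clock is injected so the result is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ZombieSweepSketch {

    /** Hypothetical stand-in for PushClient, reduced to what the check needs. */
    static class Client {
        final String addr;
        volatile long lastRefTime;

        Client(String addr, long lastRefTime) {
            this.addr = addr;
            this.lastRefTime = lastRefTime;
        }

        /** A client is a zombie once it has not refreshed within the threshold. */
        boolean zombie(long nowMillis, long pushCacheMillis) {
            return nowMillis - lastRefTime > pushCacheMillis;
        }

        void refresh(long nowMillis) {
            lastRefTime = nowMillis;
        }
    }

    /**
     * Removes zombie clients from every per-service map and returns the number
     * of clients that remain. Removing during iteration is safe here because
     * ConcurrentHashMap iterators are weakly consistent.
     */
    static int sweep(ConcurrentMap<String, ConcurrentMap<String, Client>> clientMap,
                     long nowMillis, long pushCacheMillis) {
        int size = 0;
        for (ConcurrentMap<String, Client> clients : clientMap.values()) {
            for (Map.Entry<String, Client> entry : clients.entrySet()) {
                if (entry.getValue().zombie(nowMillis, pushCacheMillis)) {
                    clients.remove(entry.getKey());
                }
            }
            size += clients.size();
        }
        return size;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Client> clients = new ConcurrentHashMap<>();
        clients.put("stale", new Client("10.0.0.1:50000", 0L));
        clients.put("fresh", new Client("10.0.0.2:50000", 9_000L));

        ConcurrentMap<String, ConcurrentMap<String, Client>> clientMap = new ConcurrentHashMap<>();
        clientMap.put("demo-service", clients);

        // At t = 10500 ms with a 10000 ms threshold only "stale" is a zombie.
        System.out.println("remaining clients: " + sweep(clientMap, 10_500L, 10_000L));
    }
}
```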
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    //......

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = udpSender.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                    ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                    if (MapUtils.isEmpty(clients)) {
                        return;
                    }

                    Map<String, Object> cache = new HashMap<>(16);
                    long lastRefTime = System.nanoTime();
                    for (PushClient client : clients.values()) {
                        if (client.zombie()) {
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            clients.remove(client.toString());
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            continue;
                        }

                        Receiver.AckEntry ackEntry;
                        Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                        String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                        byte[] compressData = null;
                        Map<String, Object> data = null;
                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                            compressData = (byte[]) (pair.getValue0());
                            data = (Map<String, Object>) pair.getValue1();

                            Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                        }

                        if (compressData != null) {
                            ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                        } else {
                            ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                            if (ackEntry != null) {
                                cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                            }
                        }

                        Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));

                        udpPush(ackEntry);
                    }
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

                } finally {
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                }

            }
        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

    //......

    public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }

        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

    //......
}
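onApplicationEvent handles ServiceChangeEvent. It schedules a task on udpSender with a delay of 1000 milliseconds and puts the resulting future into futureMap. The delayed task fetches from clientMap the clients for the given namespaceId and serviceName, then iterates over them: a zombie client is removed, otherwise the task builds a Receiver.AckEntry and calls udpPush(ackEntry). Finally it removes the future from futureMap. The serviceChanged method is the entry point that external callers use to publish a ServiceChangeEvent; it returns without publishing when futureMap already holds a pending push for the same service, which merges bursts of change events into a single push.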
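The merging of change events can be illustrated with a small sketch. It is a variation on the approach above, not a copy of it: instead of storing the Future in a map, it tracks only the set of service keys with a pending push and claims the key atomically before scheduling. The class PushDebounceSketch, its methods and the service key are hypothetical, and the push itself is replaced by a counter.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PushDebounceSketch {

    // service keys that already have a delayed push scheduled
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService sender = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger pushes = new AtomicInteger();
    private final long delayMillis;

    PushDebounceSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /**
     * Schedules one delayed push per service key. Returns false when a push
     * for the key is already pending, so the change is merged into it.
     */
    boolean serviceChanged(final String serviceKey) {
        if (!pending.add(serviceKey)) {
            return false;
        }
        sender.schedule(() -> {
            try {
                pushes.incrementAndGet();   // stand-in for pushing to every client
            } finally {
                pending.remove(serviceKey); // allow the next change to schedule again
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Lets already scheduled pushes run, then reports how many pushes happened. */
    int finishAndCountPushes() {
        sender.shutdown(); // delayed tasks still run after shutdown by default
        try {
            sender.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushes.get();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PushDebounceSketch sketch = new PushDebounceSketch(200);
        for (int i = 0; i < 3; i++) {
            System.out.println("scheduled: " + sketch.serviceChanged("demo-namespace/demo-service"));
        }
        System.out.println("pushes: " + sketch.finishAndCountPushes());
    }
}
```

Three changes arriving within the delay window result in one scheduled push; the later two are dropped because the first has not run yet.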
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    //......

    public static class Receiver implements Runnable {

        //......

        public static class AckEntry {

            public AckEntry(String key, DatagramPacket packet) {
                this.key = key;
                this.origin = packet;
            }

            public void increaseRetryTime() {
                retryTimes.incrementAndGet();
            }

            public int getRetryTimes() {
                return retryTimes.get();
            }

            public String key;
            public DatagramPacket origin;
            private AtomicInteger retryTimes = new AtomicInteger(0);
            public Map<String, Object> data;
        }

        //......
    }

    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
                TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
                ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

    //......
}
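udpPush decides what to do based on the state of the Receiver.AckEntry. If its retry count is greater than MAX_RETRY_TIMES, the push is abandoned: the entry is removed from ackMap and udpSendTimeMap and failedPush is incremented. Otherwise the entry is put into ackMap and udpSendTimeMap under ackEntry.key, udpSocket.send(ackEntry.origin) and ackEntry.increaseRetryTime() are executed, and a Retransmitter is scheduled to run after ACK_TIMEOUT_NANOS (10 seconds). If an exception is thrown, the entry is removed from ackMap and udpSendTimeMap and failedPush is incremented.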
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Retransmitter implements Runnable {
    Receiver.AckEntry ackEntry;

    public Retransmitter(Receiver.AckEntry ackEntry) {
        this.ackEntry = ackEntry;
    }

    @Override
    public void run() {
        if (ackMap.containsKey(ackEntry.key)) {
            Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
            udpPush(ackEntry);
        }
    }
}
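Retransmitter implements the Runnable interface. Its run method retries via udpPush only if ackMap still contains ackEntry.key, that is, only if no ack has been received for that push in the meantime.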
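The interaction between the retry limit and the retransmit task can be traced with a small single-threaded sketch. It is an illustration under assumptions, not nacos code: RetrySketch and its members are hypothetical, the UDP send is replaced by appending to a list, and the 10 second timer is replaced by calling retransmit explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    static final int MAX_RETRY_TIMES = 1;

    /** Hypothetical stand-in for AckEntry: a key plus a send counter. */
    static class Entry {
        final String key;
        final AtomicInteger retryTimes = new AtomicInteger(0);

        Entry(String key) {
            this.key = key;
        }
    }

    final ConcurrentMap<String, Entry> ackMap = new ConcurrentHashMap<>();
    final List<String> sent = new ArrayList<>(); // records every simulated send
    int failedPush = 0;

    /** Returns true if the packet was (re)sent, false if the push was given up. */
    boolean push(Entry entry) {
        if (entry.retryTimes.get() > MAX_RETRY_TIMES) {
            ackMap.remove(entry.key);
            failedPush++;
            return false;
        }
        ackMap.put(entry.key, entry);
        sent.add(entry.key);                 // stand-in for udpSocket.send(...)
        entry.retryTimes.incrementAndGet();
        return true;
    }

    /** What the timer task does: push again only while the ack is outstanding. */
    boolean retransmit(Entry entry) {
        return ackMap.containsKey(entry.key) && push(entry);
    }

    /** What the receiver does when the ack arrives. */
    void ack(String key) {
        ackMap.remove(key);
    }

    public static void main(String[] args) {
        RetrySketch sketch = new RetrySketch();
        Entry entry = new Entry("10.0.0.1,50000,123");
        System.out.println("initial send: " + sketch.push(entry));
        System.out.println("first timeout: " + sketch.retransmit(entry));
        System.out.println("second timeout: " + sketch.retransmit(entry));
        System.out.println("sends: " + sketch.sent.size() + ", failed: " + sketch.failedPush);
    }
}
```

Because the counter is incremented on every send, including the first, and the check uses a strict greater-than, MAX_RETRY_TIMES = 1 allows two sends in total before the push is counted as failed. An ack that arrives between sends removes the key, so the next retransmit does nothing.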
PushService implements the ApplicationContextAware and ApplicationListener<ServiceChangeEvent> interfaces.
Its static block starts a daemon thread that runs Receiver and schedules removeClientIfZombie, which walks clientMap and removes zombie clients.
Its onApplicationEvent method handles ServiceChangeEvent: it schedules a delayed task and puts the future into futureMap. The delayed task fetches from clientMap the clients for the given namespaceId and serviceName, removes those that are zombies, builds a Receiver.AckEntry for each of the others and calls udpPush(ackEntry), then removes the future from futureMap. The serviceChanged method is provided for external callers to publish a ServiceChangeEvent.