成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

dubbo中MonitorFilter的作用是什么

這篇文章主要介紹“dubbo中MonitorFilter的作用是什么”,在日常操作中,相信很多人在dubbo中MonitorFilter的作用是什么問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”dubbo中MonitorFilter的作用是什么”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

成都創(chuàng)新互聯(lián)公司專注于企業(yè)成都全網(wǎng)營(yíng)銷、網(wǎng)站重做改版、達(dá)川網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5建站、商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為達(dá)川等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

本文主要研究一下dubbo的MonitorFilter

MonitorFilter

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java

@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter extends ListenableFilter {

    private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
    private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";

    public MonitorFilter() {
        super.listener = new MonitorListener();
    }
    /**
     * The Concurrent counter
     */
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();

    /**
     * The MonitorFactory
     */
    private MonitorFactory monitorFactory;

    public void setMonitorFactory(MonitorFactory monitorFactory) {
        this.monitorFactory = monitorFactory;
    }


    /**
     * The invocation interceptor,it will collect the invoke data about this invocation and send it to monitor center
     *
     * @param invoker    service
     * @param invocation invocation.
     * @return {@link Result} the invoke result
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
            getConcurrent(invoker, invocation).incrementAndGet(); // count up
        }
        return invoker.invoke(invocation); // proceed invocation chain
    }

    // concurrent counter
    private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        AtomicInteger concurrent = concurrents.get(key);
        if (concurrent == null) {
            concurrents.putIfAbsent(key, new AtomicInteger());
            concurrent = concurrents.get(key);
        }
        return concurrent;
    }

    //......
}
  • MonitorFilter繼承了ListenableFilter,其invoke方法在invoker的URL中包含有monitor參數(shù)時(shí)會(huì)給invocation設(shè)置monitor_filter_start_time的attachment,然后遞增當(dāng)前并發(fā)的次數(shù);其創(chuàng)建的listener為MonitorListener

MonitorListener

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java

    class MonitorListener implements Listener {

        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }

        /**
         * The collector logic, it will be handled by the default monitor
         *
         * @param invoker
         * @param invocation
         * @param result     the invoke result
         * @param remoteHost the remote host address
         * @param start      the timestamp the invoke begin
         * @param error      if there is an error on the invoke
         */
        private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            try {
                URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
                Monitor monitor = monitorFactory.getMonitor(monitorUrl);
                if (monitor == null) {
                    return;
                }
                URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
                monitor.collect(statisticsURL);
            } catch (Throwable t) {
                logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
            }
        }

        /**
         * Create statistics url
         *
         * @param invoker
         * @param invocation
         * @param result
         * @param remoteHost
         * @param start
         * @param error
         * @return
         */
        private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            // ---- service statistics ----
            long elapsed = System.currentTimeMillis() - start; // invocation cost
            int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
            String application = invoker.getUrl().getParameter(APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // service name
            String method = RpcUtils.getMethodName(invocation); // method name
            String group = invoker.getUrl().getParameter(GROUP_KEY);
            String version = invoker.getUrl().getParameter(VERSION_KEY);

            int localPort;
            String remoteKey, remoteValue;
            if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
                // ---- for service consumer ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- for service provider ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            String input = "", output = "";
            if (invocation.getAttachment(INPUT_KEY) != null) {
                input = invocation.getAttachment(INPUT_KEY);
            }
            if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
                output = result.getAttachment(OUTPUT_KEY);
            }

            return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
        }

    }
  • MonitorListener實(shí)現(xiàn)了Listener接口,其onResponse及onError方法在invoker的URL中包含有monitor參數(shù)時(shí)會(huì)上報(bào)指標(biāo),然后遞減并發(fā)次數(shù)

實(shí)例

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java

public class MonitorFilterTest {

    private volatile URL lastStatistics;

    private volatile Invocation lastInvocation;

    private final Invoker<MonitorService> serviceInvoker = new Invoker<MonitorService>() {
        @Override
        public Class<MonitorService> getInterface() {
            return MonitorService.class;
        }

        public URL getUrl() {
            try {
                return URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE + "&" + MONITOR_KEY + "=" + URLEncoder.encode("dubbo://" + NetUtils.getLocalHost() + ":7070", "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        @Override
        public boolean isAvailable() {
            return false;
        }

        public Result invoke(Invocation invocation) throws RpcException {
            lastInvocation = invocation;
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }

        @Override
        public void destroy() {
        }
    };

    private MonitorFactory monitorFactory = new MonitorFactory() {
        @Override
        public Monitor getMonitor(final URL url) {
            return new Monitor() {
                public URL getUrl() {
                    return url;
                }

                @Override
                public boolean isAvailable() {
                    return true;
                }

                @Override
                public void destroy() {
                }

                public void collect(URL statistics) {
                    MonitorFilterTest.this.lastStatistics = statistics;
                }

                public List<URL> lookup(URL query) {
                    return Arrays.asList(MonitorFilterTest.this.lastStatistics);
                }
            };
        }
    };

    @Test
    public void testFilter() throws Exception {
        MonitorFilter monitorFilter = new MonitorFilter();
        monitorFilter.setMonitorFactory(monitorFactory);
        Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
        RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
        Result result = monitorFilter.invoke(serviceInvoker, invocation);
        result.thenApplyWithContext((r) -> {
            monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
            return r;
        });
        while (lastStatistics == null) {
            Thread.sleep(10);
        }
        Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
        Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
        Assertions.assertEquals("aaa", lastStatistics.getParameter(MonitorService.METHOD));
        Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
        Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
        Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
        Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
        Assertions.assertEquals(invocation, lastInvocation);
    }

    @Test
    public void testSkipMonitorIfNotHasKey() {
        MonitorFilter monitorFilter = new MonitorFilter();
        MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
        monitorFilter.setMonitorFactory(mockMonitorFactory);
        Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
        Invoker invoker = mock(Invoker.class);
        given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE));

        monitorFilter.invoke(invoker, invocation);

        verify(mockMonitorFactory, never()).getMonitor(any(URL.class));
    }

    @Test
    public void testGenericFilter() throws Exception {
        MonitorFilter monitorFilter = new MonitorFilter();
        monitorFilter.setMonitorFactory(monitorFactory);
        Invocation invocation = new RpcInvocation("$invoke", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}});
        RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
        Result result = monitorFilter.invoke(serviceInvoker, invocation);
        result.thenApplyWithContext((r) -> {
            monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
            return r;
        });
        while (lastStatistics == null) {
            Thread.sleep(10);
        }
        Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
        Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
        Assertions.assertEquals("xxx", lastStatistics.getParameter(MonitorService.METHOD));
        Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
        Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
        Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
        Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
        Assertions.assertEquals(invocation, lastInvocation);
    }

    @Test
    public void testSafeFailForMonitorCollectFail() {
        MonitorFilter monitorFilter = new MonitorFilter();
        MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
        Monitor mockMonitor = mock(Monitor.class);
        Mockito.doThrow(new RuntimeException()).when(mockMonitor).collect(any(URL.class));

        monitorFilter.setMonitorFactory(mockMonitorFactory);
        given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor);
        Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);

        monitorFilter.invoke(serviceInvoker, invocation);
    }
}
  • MonitorFilterTest驗(yàn)證了testFilter、testSkipMonitorIfNotHasKey、testGenericFilter、testSafeFailForMonitorCollectFail這幾個(gè)場(chǎng)景

小結(jié)

MonitorFilter繼承了ListenableFilter,其invoke方法在invoker的URL中包含有monitor參數(shù)時(shí)會(huì)給invocation設(shè)置monitor_filter_start_time的attachment,然后遞增當(dāng)前并發(fā)的次數(shù);其創(chuàng)建的listener為MonitorListener;MonitorListener實(shí)現(xiàn)了Listener接口,其onResponse及onError方法在invoker的URL中包含有monitor參數(shù)時(shí)會(huì)上報(bào)指標(biāo),然后遞減并發(fā)次數(shù)

到此,關(guān)于“dubbo中MonitorFilter的作用是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

文章標(biāo)題:dubbo中MonitorFilter的作用是什么
網(wǎng)頁(yè)鏈接:http://jinyejixie.com/article38/gpijpp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供域名注冊(cè)、動(dòng)態(tài)網(wǎng)站定制網(wǎng)站、品牌網(wǎng)站建設(shè)、響應(yīng)式網(wǎng)站用戶體驗(yàn)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

綿陽(yáng)服務(wù)器托管
镇原县| 安阳县| 贡嘎县| 枣庄市| 宁南县| 镇巴县| 保靖县| 东安县| 东台市| 河间市| 若羌县| 中方县| 杭州市| 潼南县| 菏泽市| 眉山市| 霍林郭勒市| 新源县| 龙川县| 东台市| 小金县| 河东区| 天等县| 江川县| 屏东县| 邯郸县| 沽源县| 遂昌县| 昌乐县| 南阳市| 宿州市| 章丘市| 宿松县| 剑阁县| 上林县| 沅江市| 徐闻县| 高密市| 襄汾县| 墨玉县| 上思县|