成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

netty無(wú)縫切換rabbitmq、activemq、roc-創(chuàng)新互聯(lián)

netty無(wú)縫切換rabbitmq、activemq、roc

創(chuàng)新互聯(lián)公司是一家成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站、外貿(mào)網(wǎng)站建設(shè),提供網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),網(wǎng)站制作,建網(wǎng)站,按需搭建網(wǎng)站,網(wǎng)站開發(fā)公司,從2013年創(chuàng)立是互聯(lián)行業(yè)建設(shè)者,服務(wù)者。以提升客戶品牌價(jià)值為核心業(yè)務(wù),全程參與項(xiàng)目的網(wǎng)站策劃設(shè)計(jì)制作,前端開發(fā),后臺(tái)程序制作以及后期項(xiàng)目運(yùn)營(yíng)并提出專業(yè)建議和思路。

netty無(wú)縫切換rabbitmq、activemq、roc

netty的pipeline處理鏈上的handler:需要IdleStateHandler心跳檢測(cè)channel是否可以,以及處理登錄認(rèn)證的UserAuthHandler和消息處理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(defLoopGroup,
        //編碼解碼器
        new HttpServerCodec(),
        //將多個(gè)消息轉(zhuǎn)換成單一的消息對(duì)象
        new HttpObjectAggregator(65536),
        //支持異步發(fā)送大的碼流,一般用于發(fā)送文件流
        new ChunkedWriteHandler(),
        //檢測(cè)鏈路是否讀空閑,配合心跳handler檢測(cè)channel是否正常
        new IdleStateHandler(60, 0, 0),
        //處理握手和認(rèn)證
        new UserAuthHandler(),
        //處理消息的發(fā)送
        new MessageHandler()
    );
}

對(duì)于所有連進(jìn)來(lái)的channel,我們需要保存起來(lái),往后的群發(fā)消息需要依靠這些channel

public static void addChannel(Channel channel) {
        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
        System.out.println("addChannel:" + remoteAddr);
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}", remoteAddr);
        }
        UserInfo userInfo = new UserInfo();
        userInfo.setAddr(remoteAddr);
        userInfo.setChannel(channel);
        userInfo.setTime(System.currentTimeMillis());
        userInfos.put(channel, userInfo);
    }

登錄后,channel就變成有效的channel,無(wú)效的channel之后將會(huì)丟棄

public static boolean saveUser(Channel channel, String nick, String password) {
        UserInfo userInfo = userInfos.get(channel);
        if (userInfo == null) {
            return false;
        }
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
            return false;
        }

        if (nick == null || password == null) {
            return false;
        }
        LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
        if (account == null) {
            return false;
        }
        // 增加一個(gè)認(rèn)證用戶
        userCount.incrementAndGet();
        userInfo.setNick(nick);
        userInfo.setAuth(true);
        userInfo.setId(account.getId());
        userInfo.setUsername(account.getUsername());
        userInfo.setGroupNumber(account.getGroupNumber());
        userInfo.setTime(System.currentTimeMillis());

        // 注冊(cè)該用戶推送消息的通道
        offlineInfoTransmitStatic.registerPull(channel);
        return true;
    }

當(dāng)channel關(guān)閉時(shí),就不再接收消息。unregisterPull就是注銷信息消費(fèi)者,客戶端不再接取聊天消息。此外,從下方有一個(gè)加寫鎖的操作,就是為了避免channel還在發(fā)送消息時(shí),這邊突然關(guān)閉channel,這樣會(huì)導(dǎo)致報(bào)錯(cuò)。

public static void removeChannel(Channel channel) {
        try {
            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
            //加上讀寫鎖保證移除channel時(shí),避免channel關(guān)閉時(shí),還有別的線程對(duì)其操作,造成錯(cuò)誤
            rwLock.writeLock().lock();
            channel.close();
            UserInfo userInfo = userInfos.get(channel);
            if (userInfo != null) {
                if (userInfo.isAuth()) {
                    offlineInfoTransmitStatic.unregisterPull(channel);
                    // 減去一個(gè)認(rèn)證用戶
                    userCount.decrementAndGet();
                }
                userInfos.remove(channel);
            }
        } finally {
            rwLock.writeLock().unlock();
        }

    }

為了無(wú)縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲(chǔ)和轉(zhuǎn)發(fā)聊天消息這4種狀態(tài),定義如下4個(gè)接口。依次是發(fā)送單聊消息、群聊消息、客戶端啟動(dòng)接收消息、客戶端下線不接收消息。

public interface OfflineInfoTransmit {
    void pushP2P(Integer userId, String message);

    void pushGroup(String groupNumber, String message);

    void registerPull(Channel channel);

    void unregisterPull(Channel channel);
}

其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來(lái)存儲(chǔ)和轉(zhuǎn)發(fā)聊天消息,它的處理流程如下:

  1. 單聊的模型參考線程池的模型,如果用戶在線,直接通過(guò)channel發(fā)送給用戶。如果用戶離線,則發(fā)往中間件存儲(chǔ),下次用戶上線時(shí)直接從中間件拉取消息。這樣做對(duì)比所有消息的發(fā)送都通過(guò)中間件來(lái)轉(zhuǎn)的好處是提升了性能
  2. 群聊則是完全通過(guò)中間件來(lái)轉(zhuǎn)發(fā)消息,消息發(fā)送中間件,客戶端從中間件接取消息。如果仍像單聊那樣操作,在線用戶直接通過(guò)channel發(fā)送,操作過(guò)于繁瑣,要判斷這個(gè)群組的哪些用戶是否在線
  3. 如果用戶在線就注冊(cè)消費(fèi)者,從中間件接取消息。否則,就斷開消費(fèi)者,消息保留在中間件中,以便客戶端下次上線時(shí)拉取。這樣就實(shí)現(xiàn)了離線消息的接收。
  4. 不管使用哪種中間件或使用不使用中間件,它的處理流程都遵循上面的3個(gè)要求,就能無(wú)縫切換上方的4種方法來(lái)存儲(chǔ)和轉(zhuǎn)發(fā)消息。需要哪種方法開啟相應(yīng)注解即可。

netty無(wú)縫切換rabbitmq、activemq、roc

代碼地址:
https://github.com/shuangyueliao/netty-chat

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

文章名稱:netty無(wú)縫切換rabbitmq、activemq、roc-創(chuàng)新互聯(lián)
網(wǎng)站URL:http://jinyejixie.com/article18/djcgdp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營(yíng)銷推廣云服務(wù)器、微信公眾號(hào)App設(shè)計(jì)、Google、網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化
城固县| 时尚| 汉寿县| 玛沁县| 黄石市| 沁源县| 宝山区| 景泰县| 古丈县| 滦平县| 安岳县| 永嘉县| 古浪县| 呼伦贝尔市| 桐庐县| 剑川县| 宿迁市| 泸定县| 阿勒泰市| 长顺县| 新河县| 孟州市| 西畴县| 巴林左旗| 曲水县| 陕西省| 克拉玛依市| 连城县| 福泉市| 永宁县| 大名县| 区。| 略阳县| 武功县| 大厂| 广西| 黔西县| 中方县| 海城市| 石景山区| 灌阳县|