成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

并發(fā)計數(shù)類LongAdder怎么用-創(chuàng)新互聯(lián)

本篇內(nèi)容介紹了“并發(fā)計數(shù)類LongAdder怎么用”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、小程序制作、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了伊州免費建站歡迎大家使用!

AtomicLong是通過CAS(即Compare And Swap)原理來完成原子遞增遞減操作,在并發(fā)情況下不會出現(xiàn)線程不安全結(jié)果。AtomicLong中的value是使用volatile修飾,并發(fā)下各個線程對value最新值均可見。我們以incrementAndGet()方法來深入。

  public final long incrementAndGet() {

        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;

    }

這里是調(diào)用了unsafe的方法

    public final long getAndAddLong(Object var1, long var2, long var4) {

        long var6;

        do {

            var6 = this.getLongVolatile(var1, var2);

        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;

    }

方法中this.compareAndSwapLong()有4個參數(shù),var1是需要修改的類對象,var2是需要修改的字段的內(nèi)存地址,var6是修改前字段的值,var6+var4是修改后字段的值。compareAndSwapLong只有該字段實際值和var6值相當(dāng)?shù)臅r候,才可以成功設(shè)置其為var6+var4。

再繼續(xù)往深一層去看

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

這里Unsafe.compareAndSwapLong是native方法,底層通過JNI(Java Native Interface)來完成調(diào)用,實際就是借助C來調(diào)用CPU指令來完成。

實現(xiàn)中使用了do-while循環(huán),如果CAS失敗,則會繼續(xù)重試,直到成功為止。并發(fā)特別高的時候,雖然這里可能會有很多次循環(huán),但是還是可以保證線程安全的。不過如果自旋CAS操作長時間不成功,競爭較大,會帶CPU帶來極大的開銷,占用更多的執(zhí)行資源,可能會影響其他主業(yè)務(wù)的計算等。

LongAdder怎么優(yōu)化AtomicLong

Doug Lea在jdk1.5的時候就針對HashMap進行了線程安全和并發(fā)性能的優(yōu)化,推出了分段鎖實現(xiàn)的ConcurrentHashMap。一般Java面試,基本上離不開ConcurrentHashMap這個網(wǎng)紅問題。另外在ForkJoinPool中,Doug Lea在其工作竊取算法上對WorkQueue使用了細粒度鎖來較少并發(fā)的競爭,更多細節(jié)可參考我的原創(chuàng)文章ForkJoin使用和原理剖析。如果已經(jīng)對ConcurrentHashMap有了較為深刻的理解,那么現(xiàn)在來看LongAdder的實現(xiàn)就會相對簡單了。

來看LongAdder的increase()方法實現(xiàn),

    public void add(long x) {

        Cell[] as; long b, v; int m; Cell a;

        //第一個if進行了兩個判斷,(1)如果cells不為空,則直接進入第二個if語句中。(2)同樣會先使用cas指令來嘗試add,如果成功則直接返回。如果失敗則說明存在競爭,需要重新add

        if ((as = cells) != null || !casBase(b = base, b + x)) {

            boolean uncontended = true;

            if (as == null || (m = as.length - 1) < 0 ||

                (a = as[getProbe() & m]) == null ||

                !(uncontended = a.cas(v = a.value, v + x)))

                longAccumulate(x, null, uncontended);

        }

    }

這里用到了Cell類對象,Cell對象是LongAdder高并發(fā)實現(xiàn)的關(guān)鍵。在casBase沖突嚴重的時候,就會去創(chuàng)建Cell對象并添加到cells中,下面會詳細分析。

    @sun.misc.Contended static final class Cell {

        volatile long value;

        Cell(long x) { value = x; }

        //提供CAS方法修改當(dāng)前Cell對象上的value

        final boolean cas(long cmp, long val) {

            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);

        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;

        private static final long valueOffset;

        static {

            try {

                UNSAFE = sun.misc.Unsafe.getUnsafe();

                Class<?> ak = Cell.class;

                valueOffset = UNSAFE.objectFieldOffset

                    (ak.getDeclaredField("value"));

            } catch (Exception e) {

                throw new Error(e);

            }

        }

    }

而這一句a = as[getProbe() & m]其實就是通過getProbe()拿到當(dāng)前Thread的threadLocalRandomProbe的probe Hash值。這個值其實是一個隨機值,這個隨機值由當(dāng)前線程ThreadLocalRandom.current()產(chǎn)生。不用Rondom的原因是因為這里已經(jīng)是高并發(fā)了,多線程情況下Rondom會極大可能得到同一個隨機值。因此這里使用threadLocalRandomProbe在高并發(fā)時會更加隨機,減少沖突。更多ThreadLocalRandom信息想要深入了解可關(guān)注這篇文章并發(fā)包中ThreadLocalRandom類原理淺嘗。拿到as數(shù)組中當(dāng)前線程的Cell對象,然后再進行CAS的更新操作,我們在源碼上進行分析。longAccumulate()是在父類Striped64.java中。

    final void longAccumulate(long x, LongBinaryOperator fn,

                              boolean wasUncontended) {

        int h;

        if ((h = getProbe()) == 0) {

          //如果當(dāng)前線程的隨機數(shù)為0,則初始化隨機數(shù)

            ThreadLocalRandom.current(); // force initialization

            h = getProbe();

            wasUncontended = true;

        }

        boolean collide = false;                // True if last slot nonempty

        for (;;) {

            Cell[] as; Cell a; int n; long v;

            //如果當(dāng)前cells數(shù)組不為空

            if ((as = cells) != null && (n = as.length) > 0) {

            //如果線程隨機數(shù)對應(yīng)的cells對應(yīng)數(shù)組下標的Cell元素不為空,

                if ((a = as[(n - 1) & h]) == null) {

                //當(dāng)使用到LongAdder的Cell數(shù)組相關(guān)的操作時,需要先獲取全局的cellsBusy的鎖,才可以進行相關(guān)操作。如果當(dāng)前有其他線程的使用,則放棄這一步,繼續(xù)for循環(huán)重試。

                    if (cellsBusy == 0) {       // Try to attach new Cell

                    //Cell的初始值是x,創(chuàng)建完畢則說明已經(jīng)加上

                        Cell r = new Cell(x);   // Optimistically create

                        //casCellsBusy獲取鎖,cellsBusy通過CAS方式獲取鎖,當(dāng)成功設(shè)置cellsBusy為1時,則獲取到鎖。

                        if (cellsBusy == 0 && casCellsBusy()) {

                            boolean created = false;

                            try {               // Recheck under lock

                                Cell[] rs; int m, j;

                                if ((rs = cells) != null &&

                                    (m = rs.length) > 0 &&

                                    rs[j = (m - 1) & h] == null) {

                                    rs[j] = r;

                                    created = true;

                                }

                            } finally {

                              //finally里面釋放鎖

                                cellsBusy = 0;

                            }

                            if (created)

                                break;

                            continue;           // Slot is now non-empty

                        }

                    }

                    collide = false;

                }

                else if (!wasUncontended)       // CAS already known to fail

                    wasUncontended = true;      // Continue after rehash

                //如果a不為空,則對a進行cas增x操作,成功則返回    

                else if (a.cas(v = a.value, ((fn == null) ? v + x :

                                             fn.applyAsLong(v, x))))

                    break;

                //cells的長度n已經(jīng)大于CPU數(shù)量,則繼續(xù)擴容沒有意義,因此直接標記為不沖突

                else if (n >= NCPU || cells != as)

                    collide = false;            // At max size or stale

                else if (!collide)

                    collide = true;

                //到這一步則說明a不為空但是a上進行CAS操作也有多個線程在競爭,因此需要擴容cells數(shù)組,其長度為原長度的2倍

                else if (cellsBusy == 0 && casCellsBusy()) {

                    try {

                        if (cells == as) {      // Expand table unless stale

                            Cell[] rs = new Cell[n << 1];

                            for (int i = 0; i < n; ++i)

                                rs[i] = as[i];

                            cells = rs;

                        }

                    } finally {

                        cellsBusy = 0;

                    }

                    collide = false;

                    continue;                   // Retry with expanded table

                }

                //繼續(xù)使用新的隨機數(shù),避免在同一個Cell上競爭

                h = advanceProbe(h);

            }

            //如果cells為空,則需要先創(chuàng)建Cell數(shù)組。初始長度為2.(個人理解這個if放在前面會比較好一點,哈哈)

            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

                boolean init = false;

                try {                           // Initialize table

                    if (cells == as) {

                        Cell[] rs = new Cell[2];

                        rs[h & 1] = new Cell(x);

                        cells = rs;

                        init = true;

                    }

                } finally {

                    cellsBusy = 0;

                }

                if (init)

                    break;

            }

            //如果在a上競爭失敗,且擴容競爭也失敗了,則在casBase上嘗試增加數(shù)量

            else if (casBase(v = base, ((fn == null) ? v + x :

                                        fn.applyAsLong(v, x))))

                break;                          // Fall back on using base

        }

    }

最后是求LongAdder的總數(shù),這一步就非常簡單了,把base的值和所有cells上的value值加起來就是總數(shù)了。

    public long sum() {

        Cell[] as = cells; Cell a;

        long sum = base;

        if (as != null) {

            for (int i = 0; i < as.length; ++i) {

                if ((a = as[i]) != null)

                    sum += a.value;

            }

        }

        return sum;

    }

思考

源碼中Cell數(shù)組的會控制在不超過NCPU的兩倍,原因是LongAdder其實在底層是依賴CPU的CAS指令來操作,如果多出太多,即使在代碼層面沒有競爭,在底層CPU的競爭會更多,所以這里會有一個數(shù)量的限制。所以在LongAdder的設(shè)計中,由于使用到CAS指令操作,瓶頸在于CPU上。

YY一下,那么有沒有方式可以突破這個瓶頸呢?我個人覺得是有的,但是有前提條件,應(yīng)用場景極其有限。基于ThreadLocal的設(shè)計,假設(shè)統(tǒng)計只在一個固定的線程池中進行,假設(shè)線程池中的線程不會銷毀(異常補充線程的就暫時不管了),則可以認為線程數(shù)量是固定且不變的,那么統(tǒng)計則可以依賴于只在當(dāng)前線程中進行,那么即使是高并發(fā),就轉(zhuǎn)化為ThreadLocal這種單線程操作了,完全可以擺脫CAS的CPU指令操作的限制,那么性能將極大提升。

“并發(fā)計數(shù)類LongAdder怎么用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

當(dāng)前題目:并發(fā)計數(shù)類LongAdder怎么用-創(chuàng)新互聯(lián)
路徑分享:http://jinyejixie.com/article26/depcjg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營銷型網(wǎng)站建設(shè)、靜態(tài)網(wǎng)站動態(tài)網(wǎng)站、網(wǎng)站收錄、網(wǎng)站策劃、全網(wǎng)營銷推廣

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)
郧西县| 福安市| 论坛| 华阴市| 铜山县| 普兰县| 新闻| 景德镇市| 西宁市| 扶风县| 顺平县| 长丰县| 平乡县| 琼结县| 连云港市| 徐汇区| 维西| 陆良县| 江西省| 奈曼旗| 岳池县| 霍州市| 茂名市| 花莲市| 平和县| 子长县| 永吉县| 商水县| 云安县| 滦南县| 虎林市| 茶陵县| 泽库县| 日喀则市| 望江县| 石首市| 四会市| 永济市| 张家口市| 铅山县| 蚌埠市|