分類
發燒車訊

12萬買1.6T合資緊湊型SUV?看完恍然大悟!

7-2。9米C級=中大型車長度4。8米-5米,軸距2。8-3米,排量超過2。4LD級=大型車長度超過5米,軸距超過3米,排量超過3L呃,根據這個分類方法,不同的車型基本都可以找到各自對應的分類,而且通過看車型的分類級別,就能一目瞭然找到自己要的大小的車子。

昨晚,現代ENCINO上市

除了分體式大燈以及標配的1.6T發動機十分好看之外

還注意到一個有意思的地方

有媒體竟然叫它緊湊型SUV!

緊湊型SUV?

就是和CR-V、RAV4榮放一個級別?

只要12萬而且還標配1.6T?

這也太划算了吧

然而,事情沒那麼簡單,這貨的尺寸為4195*1800*1575mm,軸距為2600mm

長度和軸距甚至還不如本田XR-V這種小型SUV

比起中國車企的東南DX3、傳祺GS3等小型SUV也差了不少

這樣的尺寸也敢叫緊湊型SUV,是梁靜茹給它的勇氣嗎?

吐槽的同時

也必須弄清一個概念

汽車級別怎麼劃分的?

如今多數媒體車企給車型劃分分類的標準多是歐洲標準

也就是大眾汽車的分級辦法,綜合排量、車型大小等因素分為

A00級、A0級、A級、B級、C級、D級。

A00級=微型車

長度4米內,軸距2-2.3米

A0級=小型車

長度4-4.3米,軸距2.3-2.5米

A級=緊湊型車

長度4.2-4.6米,軸距2.5-2.7米

B級=中型車

長度4.5-4.9米,軸距2.7-2.9米

C級=中大型車

長度4.8米-5米,軸距2.8-3米,排量超過2.4L

D級=大型車

長度超過5米,軸距超過3米,排量超過3L

呃,根據這個分類方法,不同的車型基本都可以找到各自對應的分類,而且通過看車型的分類級別,就能一目瞭然找到自己要的大小的車子。因此這一套分類方法十分流行。

然而,這套方法的分級卻經常被車企混用,比如這一次,尺寸4195*1800*1575mm,軸距為2600mm的ENCINO也敢叫自己緊湊型SUV。

除了ENCINO之外,還有不少這類型的例子:

咱們熟悉的緊湊型轎車科魯茲,在官網pDF上把自己叫做“新銳性能中級車”,這,莫非科魯茲是中型轎車?

非也,中型車與中級車一字之差,但是差距可不是一星半點,前文中說到中型車是歐洲分類標準,而中級車則是咱們中國的標準了,依照中國汽車分類標準(GB9417-89)的分級方法,中級車屬於排量1.6-2.5L的車型,因此依照這個標準來看,科魯茲還確實是中級車,而且大多數緊湊型車也確實可以叫自己中級車,不過在咱們大多數人的理解中,中級車=中型車啊!因此科魯茲也確實有鑽這個空子的嫌疑。

為了產品賣得好一點,吹出一點牛皮也是合情合理的,不過相比上面兩款車型的手法,下面這些才是真大佬!

奔馳S級:再次發明汽車

奔馳S級在上市之初打出了許多十分誇張的口號,比如:“汽車發明者,再次發明汽車”“再見愛迪生”等等,雖然S級從設計的角度來說確實達到了一個新高度,但是再次發明汽車的口號也有些太狂了。

昂科威:百萬級最好的隔音

昂科威是別克旗下的中型SUV,售價21.99-31.99萬,這個價位的SUV老老實實賣車才是王道,然而昂科威並不安分,在上市之初昂科威便把百萬內最好的隔音作為賣點,要知道不同價位車型之間的差別可是十分大的,昂科威這口號也是夠大膽的,不過經實測,昂科威隔音確實比百萬級的卡宴更好。

君越:圖書館級靜音

同樣宣傳隔音的還有君越,這一次君越使用了圖書館級靜音水準這個詞彙,而根據《圖書館、博物館、美術館、展覽館衛生標準》(GB9669-1996)規定,圖書館的噪聲標準為≤50dB(A),這樣的噪音數值恐怕君越只有怠速工況下能夠達到吧~

攬勝:越野車中的勞斯萊斯

嚴格來說,這個稱號是廣大粉絲送的,不過也是非常霸氣的一個稱號了,除了越野車中的勞斯萊斯之外,路虎還有英國皇室狩獵專用車等頭銜,不過勞斯萊斯的越野車馬上就要上市了…

總結:

汽車廣告與宣傳中往往用到許多誇張的詞彙,越級、澎湃、奢華等詞語的出鏡率十分高,這樣的宣傳往往能讓人印象深刻,不過如果真的太相信這些宣傳詞彙,到頭來往往會讓人失望,汽車說到底也只是普通商品,既然是商品那麼一分錢一分貨這個道理還是適用的,用10萬元買到20萬的品質這種事情往往不會存在的,作為消費者,在看車企宣傳的同時一定要自己辨別,這樣才能避免被騙哦~本站聲明:網站內容來源於http://www.auto6s.com/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

空間大做工好!這是6萬元區間最好的7座車?

相比較7座SUV來說,歐尚A800的後備箱也小有優勢,主要體現在A800的後備箱高度和進深上,在這兩個參數上A800十分有優勢,超過1米的後備箱高度十分誇張。A800搭載了一台1。5T渦輪增壓發動機,型號為JL476ZQCD,這台全鋁發動機帶有DVVT技術,最大功率156馬力,最大扭矩225牛米,參數並不是很高。

看過了非常適合家用的SUV奇駿、夠大夠霸氣的銳界、大氣實用的奧德賽、精緻好用的途安L之後,你是覺得SUV好還是MpV好呢?有興趣的朋友可以點擊鏈接查看往期文章:

奧德賽:67.9分

途安L:64.6分

銳界:66分

奇駿:65.4分

說起8萬左右的MpV車型,就不得不提歐尚A800了!它最大的特點當然是空間、動力以及配置,這些方面它絲毫不遜於對手寶駿,而且由於這台車還是我們的工作車的原因,長期使用下來我們對它也是非常熟悉,歐尚A800外表雖然不算出色,但是論及內在絕對是一名出色的选手!

在測試中歐尚A800也表現出了強大的實力,無論是在外觀品質、動力表現以及車內空間上都可圈可點。

相比較長安以往的車型,歐尚A800在設計上盡量營造出時尚感與精緻感,從外觀很多細節上都能看到它的設計思路,這樣的造型設計顯然是成功的,A800雖然尺寸龐大,但是看上去卻並不顯臃腫,而且較大的車窗也能夠提供非常不錯的採光。

內飾也是如此,我們這台高配車型中控台非常簡潔,碩大的屏幕與空調操作區的按鈕擺放都很有檔次感,全液晶儀錶盤在這個價格區間的車型里也十分少見,加上內飾的材質比較考究,整體營造的氛圍還是不錯的。

A800的外觀工藝相比較更高價位的車型也毫不遜色,無論是外觀的鈑金縫隙,還是車漆的噴漆均勻度都很不錯,不過車漆的厚度平均不足100微米則有點太薄了。

雖然內飾看上去不錯,但是受限於價格,A800在內飾材質上大面積使用了硬塑料,如果真的談及觸感的話還是顯得有一些廉價,不過好在內飾的拼裝工藝還是不錯的,塑料件也沒有毛刺。

有了龐大的尺寸以及方正的設計,A800的內部空間可以說十分寬裕,無論是前排後排還是第三排空間都可以用寬敞來形容,而且A800的第二排還是採用獨立座椅設計,相比較大多數轎車來說都要更加舒適,不過受限於第三排地板以及空間,第三排的座椅規格比前兩排要小一些,硬度上也更硬一點。

相比較7座SUV來說,歐尚A800的後備箱也小有優勢,主要體現在A800的後備箱高度和進深上,在這兩個參數上A800十分有優勢,超過1米的後備箱高度十分誇張。

A800搭載了一台1.5T渦輪增壓發動機,型號為JL476ZQCD,這台全鋁發動機帶有DVVT技術,最大功率156馬力,最大扭矩225牛米,參數並不是很高。

與之匹配的是6擋手動變速箱,這台變速箱齒比比較綿密,尤其是前兩個擋位可以說是為拉貨設計的,非常大的齒比對於載重來說是一件好事。

不過由於齒比比較綿密,因此在加速上A800就有些吃虧了,2擋僅能跑到70km/h的速度來,再升上3擋之後才能破百,而3擋的加速度就遠不如1/2擋了,因此最終A800的破百成績為12.5秒,這樣的成績對於這台大傢伙來說倒也還算可以。

作為一台MpV車型,A800顯然和運動扯不上關係,對於這類車型來說我們的要求也就是好開,從這個角度考慮A800確實算得上不錯,首先A800的離合點十分清晰,變速箱的換擋手感也不錯!加上發動機的低扭還算不錯,開起來比較得心應手。

不過由於尺寸龐大且車身較高,懸挂也偏軟,因此A800在高速行駛的穩定性上和轎車以及多數SUV比還是不佔優勢,尤其是面對橫風的時候需要更加集中精力駕駛。

雖然加速成績是橫評車型里最慢的,不過在實際動力感受上還是不錯,尤其是低速駕駛的時候會感覺車子很有力,再加上不錯的變速箱,A800是一台很能輕鬆駕馭的手動擋車型。

對於這類型的MpV,其實最讓人擔心的就是隔音了,由於車內空間比較大,車子的迎風面積也大,所以容易在第二/三排產生較大的共鳴聲和風聲,不過在實際體驗中A800這個問題倒也不算嚴重,當然相比較轎車那肯定是差一些了。

在售價上歐尚A800的指導價算是自主入門MpV中比較低的了,性價比還是不錯的。

A800在諸多方面的表現都堪稱出色,優異的配置、不錯的駕駛感受和寬敞的空間都是它的優勢所在,對於這個價位買車的消費者來說這恰恰也是它們最關心的,再加上較低的售價使得這款車有了不錯的性價比,所以在6-9萬的MpV市場中A800確實算得上一個稱心的好選擇!

本站聲明:網站內容來源於http://www.auto6s.com/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

分類
發燒車訊

ThreadLocal源碼解析-Java8,ThreadLocal的使用場景分析,利用線性探測法解決hash衝突,Java-強引用、軟引用、弱引用、虛引用,利用線性探測法解決hash衝突,分析ThreadLocal的弱引用與內存泄漏問題-Java8

目錄

一.ThreadLocal介紹

  1.1 ThreadLocal的功能

  1.2 ThreadLocal使用示例

二.源碼分析-ThreadLocal

  2.1 ThreadLocal的類層級關係

  2.2 ThreadLocal的屬性字段

  2.3 創建ThreadLocal對象

  2.4 ThreadLocal-set操作

  2.5 ThreadLocal-get操作

  2.6 ThreadLocal-remove操作

三.ThreadLocalMap類

  3.0 線性探測算法解決hash衝突

  3.1 Entry內部類

  3.2 ThreadLocalMap的常量介紹

  3.3 實例化ThreadLocalMap

  3.4 ThreadLocalMap的set操作

  3.5 清理陳舊Entry和rehash

四.總結 

 

一.介紹ThreadLocal

1.1ThreadLocal的功能

  我們知道,變量從作用域範圍進行分類,可以分為“全局變量”、“局部變量”兩種:

  1.全局變量(global variable),比如類的靜態屬性(加static關鍵字),在類的整個生命周期都有效;

  2.局部變量(local variable),比如在一個方法中定義的變量,作用域只是在當前方法內,方法執行完畢后,變量就銷毀(釋放)了;

  使用全局變量,當多個線程同時修改靜態屬性,就容易出現併發問題,導致臟數據;而局部變量一般來說不會出現併發問題(在方法中開啟多線程併發修改局部變量,仍可能引起併發問題);

  再看ThreadLocal,可以用來保存局部變量,只不過這個“局部”是指“線程”作用域,也就是說,該變量在該線程的整個生命周期中有效。

  關於ThreadLocal的使用場景,可以查看ThreadLocal的使用場景分析。

 

1.2ThreadLocal的使用示例

  ThreadLocal使用非常簡單。

package cn.ganlixin;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class TestThreadLocal {

    private static class Goods {
        public Integer id;
        public List<String> tags;
    }

    @Test
    public void testReference() {
        Goods goods1 = new Goods();
        goods1.id = 10;
        goods1.tags = Arrays.asList("healthy", "cheap");

        ThreadLocal<Goods> threadLocal = new ThreadLocal<>();
        threadLocal.set(goods1);

        Goods goods2 = threadLocal.get();
        System.out.println(goods1); // cn.ganlixin.TestThreadLocal$Goods@1c655221
        System.out.println(goods2); // cn.ganlixin.TestThreadLocal$Goods@1c655221

        goods2.id = 100;
        System.out.println(goods1.id);  // 100
        System.out.println(goods2.id);  // 100

        threadLocal.remove();
        System.out.println(threadLocal.get()); // null
    }

    @Test
    public void test2() {
        // 一個線程中,可以創建多個ThreadLocal對象,多個ThreadLoca對象互不影響
        ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
        ThreadLocal<String> threadLocal2 = new ThreadLocal<>();
        // ThreadLocal存的值默認為null

        System.out.println(threadLocal1.get()); // null

        threadLocal1.set("this is value1");
        threadLocal2.set("this is value2");
        System.out.println(threadLocal1.get()); // this is value1
        System.out.println(threadLocal2.get());  // this is value2

        // 可以重寫initialValue進行設置初始值
        ThreadLocal<String> threadLocal3 = new ThreadLocal<String>() {
            @Override
            protected String initialValue() {
                return "this is initial value";
            }
        };
        System.out.println(threadLocal3.get()); // this is initial value
    }
}

  

二.源碼分析-ThreadLocal

2.1ThreadLocal類層級關係

  

  ThreadLocal類中有一個內部類ThreadLocalMap,這個類特別重要,ThreadLocal的各種操作基本都是圍繞ThreadLocalMap進行的

  對於ThreadLocalMap有來說,它內部定義了一個Entry內部類,有一個table屬性,是一個Entry數組,和HashMap有一些相似的地方,但是ThreadLocalMap和HashMap並沒有什麼關係。

  先大概看一下內存關係圖,不理解也沒關係,看了後面的代碼應該就能理解了:

   

  大概解釋一下,棧中的Thread ref(引用)堆中的Thread對象,Thread對象有一個屬性threadlocals(ThreadLocalMap類型),這個Map中每一項(Entry)的value是ThreadLocal.set()的值,而Map的key則是ThreadLocal對象。

  下面在介紹源碼的時候,會從兩部分進行介紹,先介紹ThreadLocal的常用api,然後再介紹ThreadLocalMap,因為ThreadLocal的api內部其實都是在操作ThreadLocalMap,所以看源碼時一定要知道他們倆之間的關係

 

2.2ThreadLocal的屬性

  ThreadLocal有3個屬性,主要的功能就是生成ThreadLocal的hash值。

// threadLocalHashCode用來表示當前ThreadLocal對象的hashCode,通過計算獲得
private final int threadLocalHashCode = nextHashCode();

// 一個AtomicInteger類型的屬性,功能就是計數,各種操作都是原子性的,在併發時不會出現問題
private static AtomicInteger nextHashCode = new AtomicInteger();

// hash值的增量,不是隨便指定的,被稱為“黃金分割數”,能讓hash結果均衡分佈
private static final int HASH_INCREMENT = 0x61c88647;

/**
 * 通過計算,為當前ThreadLocal對象生成一個HashCode
 */
private static int nextHashCode() {
    // 獲取當前nextHashCode,然後遞增HASH_INCREMENT
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

  

2.3創建ThreadLocal對象

  ThreadLocal類,只有一個無參構造器,如果需要是指默認值,則可以重寫initialValue方法:

public ThreadLocal() {}

/**
 * 初始值默認為null,要設置初始值,只需要設置為方法返回值即可
 *
 * @return ThreadLocal的初始值
 */
protected T initialValue() {
    return null;
}

  需要注意的是initialValue方法並不會在創建ThreadLocal對象的時候設置初始值,而是延遲執行:當ThreadLocal直接調用get時才會觸發initialValue執行(get之前沒有調用set來設置過值),initialValue方法在後面還會介紹。 

 

2.4ThreadLocal-set操作

  下面這段代碼只給出了ThreadLocal的set代碼:

public void set(T value) {
    // 獲取當前線程
    Thread t = Thread.currentThread();

    // 獲取當前線程的ThreadLocalMap屬性,ThreadLocal有一個threadLocals屬性(ThreadLocalMap類型)
    ThreadLocalMap map = getMap(t);

    if (map != null) {
        // 如果當前線程有關聯的ThreadLocalMap對象,則調用ThreadLocalMap的set方法進行設置
        map.set(this, value);
    } else {
        // 創建一個與當前線程關聯的ThreadLocalMap對象,並設置對應的value
        createMap(t, value);
    }
}

/**
 * 獲取線程關聯的ThreadLocalMap對象
 */
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

/**
 * 創建ThreadLocalMap
 * @param t          key為當前線程
 * @param firstValue value為ThreadLocal.set的值
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

  如果想立即了解ThreadLocalMap的set方法,則可點此跳轉!

 

2.5ThreadLocal-get操作

  前面說過“重寫ThreadLocal的initialValue方法來設置ThreadLocal的默認值,並不是在創建ThreadLocal的時候執行的,而是在直接get的時候執行的”,看了下面的代碼,就知道這句話的具體含義了,感覺設計很巧妙:

public T get() {
    // 獲取當前線程
    Thread t = Thread.currentThread();

    // 獲取當前線程對象的threadLocals屬性
    ThreadLocalMap map = getMap(t);

    // 若當前線程對象的threadLocals屬性不為空(map不為空)
    if (map != null) {
        // 當前ThreadLocal對象作為key,獲取ThreadLocalMap中對應的Entry
        ThreadLocalMap.Entry e = map.getEntry(this);

        // 如果找到對應的Entry,則證明該線程的該ThreadLocal有值,返回值即可
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T) e.value;
            return result;
        }
    }

    // 1.當前線程對象的threadLocals屬性為空(map為空)
    // 2.或者map不為空,但是未在map中查詢到以該ThreadLocal對象為key對應的entry
    // 這兩種情況,都會進行設置初始值,並將初始值返回
    return setInitialValue();
}

/**
 * 設置ThreadLocal初始值
 *
 * @return 初始值
 */
private T setInitialValue() {
    // 調用initialValue方法,該方法可以在創建ThreadLocal的時候重寫
    T value = initialValue();
    Thread t = Thread.currentThread();

    // 獲取當前線程的threadLocals屬性(map)
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // threadLocals屬性值不為空,則進行調用ThreadLocalMap的set方法
        map.set(this, value);
    } else {
        // 沒有關聯的threadLocals,則創建ThreadLocalMap,並在map中新增一個Entry
        createMap(t, value);
    }

    // 返回初始值
    return value;
}

/**
 * 初始值默認為null,要設置初始值,只需要設置為方法返回值即可
 * 創建ThreadLocal設置默認值,可以覆蓋initialValue方法,initialValue方法不是在創建ThreadLocal時執行,而是這個時候執行
 *
 * @return ThreadLocal的初始值
 */
protected T initialValue() {
    return null;
}

     

2.6ThreadLocal-remove操作

  一般是在ThreadLocal對象使用完后,調用ThreadLocal的remove方法,在一定程度上,可以避免內存泄露;

 

/**
 * 刪除當前線程中threadLocals屬性(map)中的Entry(以當前ThreadLocal為key的)
 */
public void remove() {
    // 獲取當前線程的threadLocals屬性(ThreadLocalMap)
    ThreadLocalMap m = getMap(Thread.currentThread());

    if (m != null) {
        // 調用ThreadLocalMap的remove方法,刪除map中以當前ThreadLocal為key的entry
        m.remove(this);
    }
}

 

三.ThreadLocalMap內部類

3.0 線性探測算法解決hash衝突

  在介紹ThreadLocalMap的之前,強烈建議先了解一下線性探測算法,這是一種解決Hash衝突的方案,如果不了解這個算法就去看ThreadLocalMap的源碼就會非常吃力,會感到莫名其妙。

  鏈接在此:利用線性探測法解決hash衝突

 

3.1Entry內部類

  ThreadLocalMap是ThreadLocal的內部類,ThreadLocalMap底層使用數組實現,每一個數組的元素都是Entry類型(在ThreadLocalMap中定義的),源碼如下:

/**
 * ThreadLocalMap中存放的元素類型,繼承了弱引用類
 */
static class Entry extends WeakReference<ThreadLocal<?>> {
    // key對應的value,注意key是ThreadLocal類型
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

  ThreadLocalMap和HashMap類似,比較一下:

  a:底層都是使用數組實現,數組元素類型都是內部定義,Java8中,HashMap的元素是Node類型(或者TreeNode類型),ThreadLocalMap中的元素類型是Entry類型;

  b.都是通過計算得到一個值,將這個值與數組的長度(容量)進行與操作,確定Entry應該放到哪個位置;

  c.都有初始容量、負載因子,超過擴容閾值將會觸發擴容;但是HashMap的初始容量、負載因子是可以更改的,而ThreadLocalMap的初始容量和負載因子不可修改;

  注意Entry繼承自WeakReference類,在實例化Entry時,將接收的key傳給父類構造器(也就是WeakReference的構造器),WeakReference構造器又將key傳給它的父類構造器(Reference):

// 創建Reference對象,接受一個引用
Reference(T referent) {
    this(referent, null);
}

// 設置引用
Reference(T referent, ReferenceQueue<? super T> queue) {
    this.referent = referent;
    this.queue = (queue == null) ? ReferenceQueue.NULL : queue;
}

  關於Java的各種引用,可以參考:Java-強引用、軟引用、弱引用、虛引用

 

3.2ThreadLocalMap的常量介紹

// ThreadLocalMap的初始容量
private static final int INITIAL_CAPACITY = 16;

// ThreadLocalMap底層存數據的數組
private Entry[] table;

// ThreadLocalMap中元素的個數
private int size = 0;

// 擴容閾值,當size達到閾值時會觸發擴容(loadFactor=2/3;newCapacity=2*oldCapacity)
private int threshold; // Default to 0

  

3.3創建ThreadLocalMap對象

  創建ThreadLocalMap,是在第一次調用ThreadLocal的set或者get方法時執行,其中第一次未set值,直接調用get時,就會利用ThreadLocal的初始值來創建ThreadLocalMap。

  ThreadLocalMap內部類的源碼如下:

/**
 * 初始化一個ThreadLocalMap對象(第一次調用ThreadLocal的set方法時創建),傳入ThreadLocal對象和對應的value
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 創建一個Entry數組,容量為16(默認)
    table = new Entry[INITIAL_CAPACITY];

    // 計算新增的元素,應該放到數組的哪個位置,根據ThreadLocal的hash值與初始容量進行"與"操作
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);

    // 創建一個Entry,設置key和value,注意Entry中沒有key屬性,key屬性是傳給Entry的父類WeakReference
    table[i] = new Entry(firstKey, firstValue);

    // 初始容量為1
    size = 1;

    // 設置擴容閾值
    setThreshold(INITIAL_CAPACITY);
}

/**
 * 設置擴容閾值,接收容量值,負載因子固定為2/3
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}

 

3.4 ThreadLocalMap的set操作

  ThreadLocal的set方法,其實核心就是調用ThreadLocalMap的set方法,set方法的流程比較長

/**
 * 為當前ThreadLocal對象設置value
 */
private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;

    // 計算新元素應該放到哪個位置(這個位置不一定是最終存放的位置,因為可能會出現hash衝突)
    int i = key.threadLocalHashCode & (len - 1);

    // 判斷計算出來的位置是否被佔用,如果被佔用,則需要找出應該存放的位置
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
        // 獲取Entry中key,也就是弱引用的對象
        ThreadLocal<?> k = e.get();

        // 判斷key是否相等(判斷弱引用的是否為同一個ThreadLocal對象)如果是,則進行覆蓋
        if (k == key) {
            e.value = value;
            return;
        }

        // k為null,也就是Entry的key已經被回收了,當前的Entry是一個陳舊的元素(stale entry)
        if (k == null) {
            // 用新元素替換掉陳舊元素,同時也會清理其他陳舊元素,防止內存泄露
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    // map中沒有ThreadLocal對應的key,或者說沒有找到陳舊的Entry,則創建一個新的Entry,放入數組中
    tab[i] = new Entry(key, value);
    // ThreadLocalMap的元素數量加1
    int sz = ++size;

    // 先清理map中key為null的Entry元素,該Entry也應該被回收掉,防止內存泄露
    // 如果清理出陳舊的Entry,那麼就判斷是否需要擴容,如果需要的話,則進行rehash
    if (!cleanSomeSlots(i, sz) && sz >= threshold) {
        rehash();
    }
}

  上面最後幾行代碼涉及到清理陳舊Entry和rehash,這兩塊的代碼在下面。

 

3.5清理陳舊Entry和rehash

  陳舊的Entry,是指Entry的key為null,這種情況下,該Entry是不可訪問的,但是卻不會被回收,為了避免出現內存泄漏,所以需要在每次get、set、replace時,進行清理陳舊的Entry,下面只給出一部分代碼:

/**
 * 清理map中key為null的Entry元素,該Entry也應該被回收掉,防止內存泄露
 *
 * @param i 新Entry插入的位置
 * @param n 數組中元素的數量
 * @return 是否有陳舊的entry的清除
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ((n >>>= 1) != 0);
    return removed;
}

private void rehash() {
    // 清除底層數組中所有陳舊的(stale)的Entry,也就是key為null的Entry
    // 同時每清除一個Entry,就對其後面的Entry重新計算hash,獲取新位置,使用線性探測法,重新確定最終位置
    expungeStaleEntries();

    // 清理完陳舊Entry后,判斷是否需要擴容
    if (size >= threshold - threshold / 4) {
        // 擴容時,容量變為舊容量的2倍,再進行rehash,並使用線性探測發確定Entry的新位置
        resize();
    }
}

  在rehash的時候,涉及到“線性探測法”,是一種用來解決hash衝突的方案,可以查看利用線性探測法解決hash衝突了解詳情。

 

3.6ThreadLocalMap-remove操作

  remove操作,是調用ThreadLocal.remove()方法時,刪除當前線程的ThreadLocalMap中該ThreadLocal為key的Entry。

/**
 * 移除當前線程的threadLocals屬性中key為ThreadLocal的Entry
 *
 * @param key 要移除的Entry的key(ThreadLocal對象)
 */
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;

    // 計算出該ThreadLocal對應的key應該存放的位置
    int i = key.threadLocalHashCode & (len - 1);

    // 找到指定位置,開始按照線性探測算法進行查找到該Thread的Entry
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {

        // 如果Entry的key相同
        if (e.get() == key) {
            // 調用WeakReference的clear方法,Entry的key是弱引用,指向ThreadLocal,現在將key指向null
            // 則該ThreadLocal對象在會在下一次gc時,被垃圾收集器回收
            e.clear();

            // 將該位置的Entry中的value置為null,於是value引用的對象也會被垃圾收集器回收(不會造成內存泄漏)
            // 同時內部會調整Entry的順序(開放探測算法的特點,刪除元素後會重新調整順序)
            expungeStaleEntry(i);

            return;
        }
    }
}

 

四.總結

  在學習ThreadLocal類源碼的過程還是受益頗多的:

  1.ThreadLocal的使用場景;

  2.initialValue的延遲執行;

  3.HashMap使用鏈表+紅黑樹解決hash衝突,ThreadLocalMap使用線性探測算法(開放尋址)解決hash衝突

  另外,ThreadLocal還有一部分內容,是關於弱引用和內存泄漏的問題,可以參考:分析ThreadLocal的弱引用與內存泄漏問題-Java8。

 

  原文地址:https://www.cnblogs.com/-beyond/p/13093032.html

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

分類
發燒車訊

線上服務的FGC問題排查,看這篇就夠了!

線上服務的GC問題,是Java程序非常典型的一類問題,非常考驗工程師排查問題的能力。同時,幾乎是面試必考題,但是能真正答好此題的人並不多,要麼原理沒吃透,要麼缺乏實戰經驗。

過去半年時間里,我們的廣告系統出現了多次和GC相關的線上問題,有Full GC過於頻繁的,有Young GC耗時過長的,這些問題帶來的影響是:GC過程中的程序卡頓,進一步導致服務超時從而影響到廣告收入。

這篇文章,我將以一個FGC頻繁的線上案例作為引子,詳細介紹下GC的排查過程,另外會結合GC的運行原理給出一份實踐指南,希望對你有所幫助。內容分成以下3個部分:

1、從一次FGC頻繁的線上案例說起

2、GC的運行原理介紹

3、排查FGC問題的實踐指南

01 從一次FGC頻繁的線上案例說起

去年10月份,我們的廣告召回系統在程序上線后收到了FGC頻繁的系統告警,通過下面的監控圖可以看到:平均每35分鐘就進行了一次FGC。而程序上線前,我們的FGC頻次大概是2天一次。下面,詳細介紹下該問題的排查過程。

1. 檢查JVM配置

通過以下命令查看JVM的啟動參數:
ps aux | grep “applicationName=adsearch”

-Xms4g -Xmx4g -Xmn2g -Xss1024K
-XX:ParallelGCThreads=5
-XX:+UseConcMarkSweepGC
-XX:+UseParNewGC
-XX:+UseCMSCompactAtFullCollection
-XX:CMSInitiatingOccupancyFraction=80

可以看到堆內存為4G,新生代為2G,老年代也為2G,新生代採用ParNew收集器,老年代採用併發標記清除的CMS收集器,當老年代的內存佔用率達到80%時會進行FGC。

進一步通過 jmap -heap 7276 | head -n20 可以得知新生代的Eden區為1.6G,S0和S1區均為0.2G。

2. 觀察老年代的內存變化

通過觀察老年代的使用情況,可以看到:每次FGC后,內存都能回到500M左右,因此我們排除了內存泄漏的情況。

3. 通過jmap命令查看堆內存中的對象

通過命令 jmap -histo 7276 | head -n20

上圖中,按照對象所佔內存大小排序,显示了存活對象的實例數、所佔內存、類名。可以看到排名第一的是:int[],而且所佔內存大小遠遠超過其他存活對象。至此,我們將懷疑目標鎖定在了 int[] .

4. 進一步dump堆內存文件進行分析

鎖定 int[] 后,我們打算dump堆內存文件,通過可視化工具進一步跟蹤對象的來源。考慮堆轉儲過程中會暫停程序,因此我們先從服務管理平台摘掉了此節點,然後通過以下命令dump堆內存:

jmap -dump:format=b,file=heap 7276

通過JVisualVM工具導入dump出來的堆內存文件,同樣可以看到各個對象所佔空間,其中int[]佔到了50%以上的內存,進一步往下便可以找到 int[] 所屬的業務對象,發現它來自於架構團隊提供的codis基礎組件。

5. 通過代碼分析可疑對象

通過代碼分析,codis基礎組件每分鐘會生成約40M大小的int數組,用於統計TP99 和 TP90,數組的生命周期是一分鐘。而根據第2步觀察老年代的內存變化時,發現老年代的內存基本上也是每分鐘增加40多M,因此推斷:這40M的int數組應該是從新生代晉陞到老年代。

我們進一步查看了YGC的頻次監控,通過下圖可以看到大概1分鐘有8次左右的YGC,這樣基本驗證了我們的推斷:因為CMS收集器默認的分代年齡是6次,即YGC 6次后還存活的對象就會晉陞到老年代,而codis組件中的大數組生命周期是1分鐘,剛好滿足這個要求。

至此,整個排查過程基本結束了,那為什麼程序上線前沒出現此問題呢?通過上圖可以看到:程序上線前YGC的頻次在5次左右,此次上線后YGC頻次變成了8次左右,從而引發了此問題。

6. 解決方案

為了快速解決問題,我們將CMS收集器的分代年齡改成了15次,改完后FGC頻次恢復到了2天一次,後續如果YGC的頻次超過每分鐘15次還會再次觸發此問題。當然,我們最根本的解決方案是:優化程序以降低YGC的頻率,同時縮短codis組件中int數組的生命周期,這裏就不做展開了。

02 GC的運行原理介紹

上面整個案例的分析過程中,其實涉及到很多GC的原理知識,如果不懂得這些原理就着手處理,其實整個排查過程是很抓瞎的。

這裏,我選擇幾個最核心的知識點,展開介紹下GC的運行原理,最後再給出一份實踐指南。

1. 堆內存結構

大家都知道: GC分為YGC和FGC,它們均發生在JVM的堆內存上。先來看下JDK8的堆內存結構:

可以看到,堆內存採用了分代結構,包括新生代和老年代。新生代又分為:Eden區,From Survivor區(簡稱S0),To Survivor區(簡稱S1區),三者的默認比例為8:1:1。另外,新生代和老年代的默認比例為1:2。

堆內存之所以採用分代結構,是考慮到絕大部分對象都是短生命周期的,這樣不同生命周期的對象可放在不同的區域中,然後針對新生代和老年代採用不同的垃圾回收算法,從而使得GC效率最高。

2. YGC是什麼時候觸發的?

大多數情況下,對象直接在年輕代中的Eden區進行分配,如果Eden區域沒有足夠的空間,那麼就會觸發YGC(Minor GC),YGC處理的區域只有新生代。因為大部分對象在短時間內都是可收回掉的,因此YGC后只有極少數的對象能存活下來,而被移動到S0區(採用的是複製算法)。

當觸發下一次YGC時,會將Eden區和S0區的存活對象移動到S1區,同時清空Eden區和S0區。當再次觸發YGC時,這時候處理的區域就變成了Eden區和S1區(即S0和S1進行角色交換)。每經過一次YGC,存活對象的年齡就會加1。

3. FGC又是什麼時候觸發的?

下面4種情況,對象會進入到老年代中:

1、YGC時,To Survivor區不足以存放存活的對象,對象會直接進入到老年代。

2、經過多次YGC后,如果存活對象的年齡達到了設定閾值,則會晉陞到老年代中。

3、動態年齡判定規則,To Survivor區中相同年齡的對象,如果其大小之和佔到了 To Survivor區一半以上的空間,那麼大於此年齡的對象會直接進入老年代,而不需要達到默認的分代年齡。

4、大對象:由-XX:PretenureSizeThreshold啟動參數控制,若對象大小大於此值,就會繞過新生代, 直接在老年代中分配。

當晉陞到老年代的對象大於了老年代的剩餘空間時,就會觸發FGC(Major GC),FGC處理的區域同時包括新生代和老年代。除此之外,還有以下4種情況也會觸發FGC:

1、老年代的內存使用率達到了一定閾值(可通過參數調整),直接觸發FGC。

2、空間分配擔保:在YGC之前,會先檢查老年代最大可用的連續空間是否大於新生代所有對象的總空間。如果小於,說明YGC是不安全的,則會查看參數 HandlePromotionFailure 是否被設置成了允許擔保失敗,如果不允許則直接觸發Full GC;如果允許,那麼會進一步檢查老年代最大可用的連續空間是否大於歷次晉陞到老年代對象的平均大小,如果小於也會觸發 Full GC。

3、Metaspace(元空間)在空間不足時會進行擴容,當擴容到了-XX:MetaspaceSize 參數的指定值時,也會觸發FGC。

4、System.gc() 或者Runtime.gc() 被顯式調用時,觸發FGC。

4. 在什麼情況下,GC會對程序產生影響?

不管YGC還是FGC,都會造成一定程度的程序卡頓(即Stop The World問題:GC線程開始工作,其他工作線程被掛起),即使採用ParNew、CMS或者G1這些更先進的垃圾回收算法,也只是在減少卡頓時間,而並不能完全消除卡頓。

那到底什麼情況下,GC會對程序產生影響呢?根據嚴重程度從高到底,我認為包括以下4種情況:

1、FGC過於頻繁:FGC通常是比較慢的,少則幾百毫秒,多則幾秒,正常情況FGC每隔幾個小時甚至幾天才執行一次,對系統的影響還能接受。但是,一旦出現FGC頻繁(比如幾十分鐘就會執行一次),這種肯定是存在問題的,它會導致工作線程頻繁被停止,讓系統看起來一直有卡頓現象,也會使得程序的整體性能變差。

2、YGC耗時過長:一般來說,YGC的總耗時在幾十或者上百毫秒是比較正常的,雖然會引起系統卡頓幾毫秒或者幾十毫秒,這種情況幾乎對用戶無感知,對程序的影響可以忽略不計。但是如果YGC耗時達到了1秒甚至幾秒(都快趕上FGC的耗時了),那卡頓時間就會增大,加上YGC本身比較頻繁,就會導致比較多的服務超時問題。

3、FGC耗時過長:FGC耗時增加,卡頓時間也會隨之增加,尤其對於高併發服務,可能導致FGC期間比較多的超時問題,可用性降低,這種也需要關注。

4、YGC過於頻繁:即使YGC不會引起服務超時,但是YGC過於頻繁也會降低服務的整體性能,對於高併發服務也是需要關注的。

其中,「FGC過於頻繁」和「YGC耗時過長」,這兩種情況屬於比較典型的GC問題,大概率會對程序的服務質量產生影響。剩餘兩種情況的嚴重程度低一些,但是對於高併發或者高可用的程序也需要關注。

03 排查FGC問題的實踐指南

通過上面的案例分析以及理論介紹,再總結下FGC問題的排查思路,作為一份實踐指南供大家參考。

1. 清楚從程序角度,有哪些原因導致FGC?

1、大對象:系統一次性加載了過多數據到內存中(比如SQL查詢未做分頁),導致大對象進入了老年代。

2、內存泄漏:頻繁創建了大量對象,但是無法被回收(比如IO對象使用完后未調用close方法釋放資源),先引發FGC,最後導致OOM.

3、程序頻繁生成一些長生命周期的對象,當這些對象的存活年齡超過分代年齡時便會進入老年代,最後引發FGC. (即本文中的案例)

4、程序BUG導致動態生成了很多新類,使得 Metaspace 不斷被佔用,先引發FGC,最後導致OOM.

5、代碼中顯式調用了gc方法,包括自己的代碼甚至框架中的代碼。

6、JVM參數設置問題:包括總內存大小、新生代和老年代的大小、Eden區和S區的大小、元空間大小、垃圾回收算法等等。

2. 清楚排查問題時能使用哪些工具

1、公司的監控系統:大部分公司都會有,可全方位監控JVM的各項指標。

2、JDK的自帶工具,包括jmap、jstat等常用命令:

查看堆內存各區域的使用率以及GC情況
jstat -gcutil -h20 pid 1000

查看堆內存中的存活對象,並按空間排序
jmap -histo pid | head -n20

dump堆內存文件
jmap -dump:format=b,file=heap pid

3、可視化的堆內存分析工具:JVisualVM、MAT等

3. 排查指南

1、查看監控,以了解出現問題的時間點以及當前FGC的頻率(可對比正常情況看頻率是否正常)

2、了解該時間點之前有沒有程序上線、基礎組件升級等情況。

3、了解JVM的參數設置,包括:堆空間各個區域的大小設置,新生代和老年代分別採用了哪些垃圾收集器,然後分析JVM參數設置是否合理。

4、再對步驟1中列出的可能原因做排除法,其中元空間被打滿、內存泄漏、代碼顯式調用gc方法比較容易排查。

5、針對大對象或者長生命周期對象導致的FGC,可通過 jmap -histo 命令並結合dump堆內存文件作進一步分析,需要先定位到可疑對象。

6、通過可疑對象定位到具體代碼再次分析,這時候要結合GC原理和JVM參數設置,弄清楚可疑對象是否滿足了進入到老年代的條件才能下結論。

04 最後的話

這篇文章通過線上案例並結合GC原理詳細介紹了FGC的排查過程,同時給出了一份實踐指南。

後續會以類似的方式,再分享一個YGC耗時過長的案例,希望能幫助大家吃透GC問題排查,如果覺得本文對你有幫助,請大家關注我的個人公眾號!

– End –

作者簡介:程序員,985碩士,前亞馬遜Java工程師,現58轉轉技術總監。持續分享技術和管理方向的文章。如果感興趣,可微信掃描下面的二維碼關注我的公眾號:『IT人的職場進階』

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

分類
發燒車訊

3dTiles 數據規範詳解[1] 介紹

版權:轉載請帶原地址。https://www.cnblogs.com/onsummer/p/12799366.html @秋意正寒

Web中的三維

html5和webgl技術使得瀏覽器三維變成了可能。

巧婦難為無米之炊,三維數據(三維模型)是三維可視化重要的一環,事實上就是:三維數據眾多,行業跨界廣。

參考資料:http://www.bgteach.com/article/132

three.js的各種加載器實現了大部分通用三維格式的加載,屏蔽了格式不同的數據結構差異。

然而,這樣還是不能滿足日益增長的效果需求,比如場景一大,模型文件體積變大,解析所耗費的時間越來越長。

webgl,包括所有gpu有關的圖形渲染編程,幾乎只認這樣的三維數據:頂點、頂點顏色、頂點法線、着色語言…

所以,三維圖形界的通用格式:glTF應運而生,它面向終點,它按照圖形編程所需的格式來存儲數據,藉以二進制編碼提高傳輸速度。

它不再使用面向對象的思維存儲三維模型、貼圖紋理,而是按顯卡的思維存儲,存的是頂點、法線、頂點顏色等最基礎的信息,只不過組織結構上進行了精心的設計。

它面向終點,就意味着可編輯性差,因為渲染性能的提高犧牲了可編輯性,它不再像3ds、dae甚至是max、skp一樣容易編輯和轉換。

事實上,大多數三維軟件提供了glTF格式的轉換,或多一步,或一步到位。

地理真三維

早年,地理的三維還處於地形三維上,即数字高程模型(DEM)提供地表的高度拉伸。柵格高程數據、等高線、不規則三角網等均是数字高程模型的具體案例。
下圖是不規則三角網,也即所謂的三角面片(圖形渲染中很常見):

隨着學科的融合、計算機技術和硬件的更新換代,使得有模型、有細節的真三維融入到GIS中成為了可能,或者說,計算機技術和硬件的升級,給GIS以更廣闊的視角觀察世界。

cesium.js 號稱是 webgl 封裝的三維地理庫,是支持 gltf 模型的加載的。

面對大規模精細三維數據的加載,還要照顧到GIS的各種坐標系統、分析計算,gltf這種單個模型的方案顯得力不從心。

2016年,Cesium 團隊借鑒傳統2DGIS的地圖規範——WMTS,借鑒圖形學中的層次細節模型,打造出大規模的三維數據標準—— 3d-Tiles,中文譯名:三維瓦片。

它在模型上利用了 gltf 渲染快的特點,對大規模的三維數據進行組織,包括層次細節模型、模型的屬性數據、模型的層級數據等。

3dTiles的設計思想

3dTiles 繼承了 gltf 的優點:貼合圖形渲染 API 的邏輯,討 GPU 喜愛,webgl 對其內部組織起來的三維模型數據,不需要轉換,可以直接渲染(glTF 的功勞)。

關於 glTF 是如何嵌入到 3dTiles 中的,開篇不談,後續精講。

我們區分一組概念:規範和實現。

3dTiles 是一種規範,在規範的指導下,各種資源文件可以是獨立存在於硬盤中的目錄、文件,也可以以二進制形式寫入數據庫中。目前,3dTiles 的官方實現只有 “散列文件”,也就是文件、文件夾的形式存儲在硬盤中,有關如何存儲到數據庫中的討論,官方仍在進行中(截至發博客)。

glTF 也是一種規範,它的數據文件不一定就是後綴名為 .gltf 的文件,也不一定只有一個文件(glTF 的文件還可以是二進制文件、紋理貼圖文件等,扯遠了哈)。
在本文,會嚴格指明是數據還是數據標準,如果我說的是 “XXX文件(例如 Bird.glb 文件)” ,那就是在指特定的文件。

3dTiles還有一個特點:那就是不記錄模型數據,只記錄各級“Tile”的邏輯關係,以及“Tile”自己的屬性信息。所謂的模型數據,是指三維模型的頂點、貼圖材質、法線、顏色等信息。邏輯關係是指,各級Tile是如何在空間中保持連續的,LOD是如何組織的。屬性信息就很簡單啦,門有門的生產商,窗戶有窗戶的使用年限等,往大了說,建築還有它自己的壽命、法人、施工單位等屬性信息。

3dTiles的特點總結如下:

  • 三維模型使用了 glTF 規範,繼承它的渲染高性能
  • 除了嵌入的 glTF,3dTiles 自己 只記錄各級Tile的空間邏輯關係(如何構成整個3dtiles)和屬性信息,以及模型與屬性如何掛接在一起的信息

我覺得你還是雲里霧裡的,下一節將展示3dTiles具體數據,說說3dTiles的組織結構,說說3dTiles中的”Tile”,也就是“三維瓦片數據”中的“瓦片”是什麼。

3dTiles系列博客最終目錄:

01 引入與博客目錄:3dTiles 數據規範詳解

02 Tileset與Tile

03 內嵌在瓦片文件中的兩大數據表

04.1 B3dm 類型

04.2 I3dm 類型

04.3 Pnts 類型

04.4 Cmpt 類型

04.5 未發布的瓦片規範

05 3dTiles強大的擴展能力

06 優缺點

07 與I3S比較

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

分類
發燒車訊

併發系列(一)——線程池源碼(ThreadPoolExecutor類)簡析

前言

  本文主要是結合源碼去線程池執行任務的過程,基於JDK 11,整個過程基本與JDK 8相同。

  個人水平有限,文中若有表達有誤的,歡迎大夥留言指出,謝謝了!

一、線程池簡介

  1.1 使用線程池的優點

    1)通過復用已創建的線程,降低資源的消耗(線程的創建/銷毀是要消耗資源的)、提高響應速度;

    2)管理線程的個數,線程的個數在初始化線程池的時候指定;

    3)統一管理線程,比如停止,stop()方法;

  1.2 線程池執行任務過程

    線程池執行任務的過程如下圖所示,主要分為以下4步,其中參數的含義會在後面詳細講解:

    1)判斷工作的線程是否小於核心線程數據(workerCountOf(c) < corePoolSize),若小於則會新建一個線程去執行任務,這一步僅僅的是根據線程個數決定;

    2)若核心線程池滿了,就會判斷線程池的狀態,若是running狀態,則嘗試加入任務隊列,若加入成功后還會做一些事情,後面詳細說;

    3)若任務隊列滿了,則加入失敗,此時會判斷整個線程池線程是否滿,若沒有則創建非核心線程執行任務;

    4)若線程池滿了,則根據拒絕測試處理無法執行的任務;

    整體過程如下圖:

二、ThreadPoolExecutor類解析

  2.1 ThreadPoolExecutor的構造函數

    ThreadPoolExecutor類一共提供了4個構造函數,涉及5~7個參數,下面就5個必備參數的構造函數進行說明:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    1)corePoolSize :初始化核心線程池中線程個數的大小;

    2)maxmumPoolSize:線程池中線程大小;

    3)keepAliveTime:非核心線程的超時時長;

      非核心線程空閑時常大於該值就會被終止。

    4)unit :keepAliveTime的單位,類型可以參見TimeUnit類;

    5)BlockingQueue workQueue:阻塞隊列,維護等待執行的任務;

  2.2  私有類Worker

    在ThreadPoolExecutor類中有兩個集合類型比較重要,一個是用於放置等待任務的workQueue,其類型是阻塞對列;一個是用於用於存放工作線程的works,其是Set類型,其中存放的類型是Worker。

    進一步簡化線程池執行過程,可以理解為works中的工作線程不停的去阻塞對列中取任務,執行結束,線程重新加入大works中。

    為此,有必要簡單了解一下Work類型的組成。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        //工作線程,由線程的工廠類初始化
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        //不可重入的鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        .......
    }

    Worker類繼承於隊列同步器(AbstractQueueSynchronizer),隊列同步器是採取鎖或其他同步組件的基礎框架,其主要結構是自旋獲取鎖的同步隊列和等待喚醒的等待隊列,其方法因此可以分為兩類:對state改變的方法 和 入、出隊列的方法,即獲取獲取鎖的資格的變化(可能描述的不準確)。關於隊列同步器後續博客會詳細分析,此處不展開討論。

    Work類中通過CAS設置狀態失敗后直接返回false,而不是判斷當前線程是否已獲取鎖來實現不可重入的鎖,源碼註釋中解釋這樣做的原因是因為避免work tash重新獲取到控制線程池全局的方法,如setCorePoolSize。

  2.3  拒絕策略類

    ThreadPoolExecutor的拒絕策略類是以私有類的方式實現的,有四種策略:

    1)AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認拒絕處理策略)。

      2)DiscardPolicy:拋棄新來的任務,但是不拋出異常。

      3)DiscardOldestPolicy:拋棄等待隊列頭部(最舊的)的任務,然後重新嘗試執行程序(失敗則會重複此過程)。

      4)CallerRunsPolicy:由調用線程處理該任務。

    其代碼相對簡單,可以參考源碼。

三、任務執行過程分析

  3.1 execute(Runnable)方法

    execute(Runnable)方法的整體過程如上文1.2所述,其實現方式如下:

public void execute(Runnable command) {
        //執行的任務為空,直接拋出異常
        if (command == null)
            throw new NullPointerException();
        //ctl是ThreadPoolExecutor中很關鍵的一個AtomicInteger,主線程池的控制狀態
        int c = ctl.get();
        //1、判斷是否小於核心線程池的大小,若是則直接嘗試新建一個work線程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2、大於核心線程池的大小或新建work失敗(如創建thread失敗),會先判斷線程池是否是running狀態,若是則加入阻塞對列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新驗證線程池是否為running,若否,則嘗試從對列中刪除,成功后執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若線程池的狀態為shutdown則,嘗試去執行完阻塞對列中的任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3、新建非核心線程去執行任務,若失敗,則採取拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

  3.2 addWorker(Runnable,boole)方法

    execute(Runnable)方法中,新建(非)核心線程執行任務主要是通過addWorker方法實現的,其執行過程如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        //此處反覆檢查線程池的狀態以及工作線程是否超過給定的值
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
            //核心和非核心線程的區別
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            //通過工廠方法初始化,可能失敗,即可能為null
            final Thread t = w.thread;
            if (t != null) {
            //獲取全局鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    //線程池處於running狀態
                    //或shutdown狀態但無需要執行的task,個人理解為用於去阻塞隊列中取任務執行
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //執行任務,這裡會執行thread的firstTask獲取阻塞對列中取任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //開始失敗,則會從workers中刪除新建的work,work數量減1,嘗試關閉線程池,這些過程會獲取全局鎖
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  3.3  runWorker(this) 方法

     在3.2 中當新建的worker線程加入在workers中成功后,就會啟動對應任務,其調用的是Worker類中的run()方法,即調用runWorker(this)方法,其過程如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //while()循環中,前者是新建線程執行firstTask,對應線程個數小於核心線程和阻塞隊列滿的情況,
        //getTask()則是從阻塞對列中取任務執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //僅線程池狀態為stop時,線程響應中斷,這裏也就解釋了調用shutdown時,正在工作的線程會繼續工作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                    //執行任務
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    //完成的個數+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //處理後續工作
            processWorkerExit(w, completedAbruptly);
        }
    }

   3.4 processWorkerExit(Worker,boole)方法

    當任務執行結果后,在滿足一定條件下會新增一個worker線程,代碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //對工作線程的增減需要加全局鎖
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //嘗試終止線程池
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        //線程不是中斷,會維持最小的個數
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //執行完任務后,線程重新加入workers中
            addWorker(null, false);
        }
    }

  至此,線程池執行任務的過程分析結束,其他方法的實現過程可以參考源碼。

 

Ref:

[1]http://concurrent.redspider.group/article/03/12.html

[2]《Java併發編程的藝術》

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

分類
發燒車訊

Dart Memo for Android Developers

Dart Memo for Android Developers

Dart語言一些語法特點和編程規範.

本文適合: 日常使用Kotlin, 突然想寫個Flutter程序的Android程序員.

Dart語言

完整的請看A tour of the Dart language

  • 創建對象可以不用new. -> 並且規範不讓用new, lint會報錯.
  • 聲明變量可以用var, 也可以用具體類型如String. 不變量用final, 常量用const.
  • 沒有訪問修飾符, 用_來表示私有: 文件級別.
  • 字符串可以用單引號'.
  • 語句結尾要用;.
  • 創建數組可以用: var list = [1, 2, 3];.
  • assert()常用來斷定開發時不可能會出現的情況.
  • 空測試操作符: ??.
  • 過濾操作符: where.
  • 兩個點..表示鏈式調用.
  • dynamic說明類型未指定.
  • 除了throw異常, 還可以throw別的東西, 比如字符串.

函數

  • 函數返回值在函數最開頭, 可以不標. -> 但是規範會建議標註返回值.
bool isNoble(int atomicNumber) {
  return _nobleGases[atomicNumber] != null;
}
  • =>箭頭符號, 用來簡化一句話的方法.
bool isNoble(int atomicNumber) => _nobleGases[atomicNumber] != null;

構造函數

  • 構造函數{}表示帶名字, 參數可選, 若要必選加上@required.
const Scrollbar({Key key, @required Widget child})
  • 構造函數名可以是ClassName或者ClassName.identifier.
  • 空構造函數體可以省略, 用;結尾就行:
class Point {
  double x, y;
  Point(this.x, this.y);
}

這裡會初始化相應的變量, 也不用聲明具體的參數類型.

  • factory構造, 可以用來返回緩存實例, 或者返回類型的子類:
factory Logger(String name) {
    return _cache.putIfAbsent(name, () => Logger._internal(name));
}

異步代碼

Future<String> lookUpVersion() async => '1.0.0';

Future checkVersion() async {
  var version = await lookUpVersion();
  // Do something with version
}

編程規範類

完整的規範在這裏: Effective Dart.

有一些Good和Bad的舉例, 這裏僅列出比較常用的幾項.

  • 文件名要蛇形命名: lowercase_with_underscores. 類名: UpperCamelCase.
  • 對自己程序的文件, 兩種import都可以(package開頭或者相對路徑), 但是要保持一致.
  • Flutter程序嵌套比較多, 要用結尾的,來幫助格式化.

本文緣由

年初的時候學了一陣子Flutter, 寫了各種大小demo. 結果隔了兩個月之後, 突然心血來潮想寫個小東西, 打開Android Studio, 首先發現創建Flutter程序的按鈕都不見了. (估計是Android Studio4.0升級之後Flutter的插件沒跟上).

接着用命令行創建了工程, 打開之後稍微整理了一下心情, 然後就….懵逼了.

突然不知道如何下手.
宏觀的東西還記得, 要用什麼package, 基本常用的幾個Widget都是啥, 但是微觀的, 忘了函數和數組都是咋定義的了.
這種懵逼的狀態令我很憤怒, 果然是上年紀了嗎, 無縫切換個語言都不行.

於是就想着還是寫個備忘錄吧.

References

  • A tour of the Dart language
  • Effective Dart

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

【asp.net core 系列】9 實戰之 UnitOfWork以及自定義代碼生成

0. 前言

在前一篇中我們創建了一個基於EF的數據查詢接口實現基類,這一篇我將帶領大家講一下為這EF補充一些功能,並且提供一個解決避免寫大量配置類的方案。

1. SaveChanges的外移

在之前介紹EF Core的時候,我們提到過使用EF需要在每次使用之後,調用一次SaveChanges將數據提交給數據庫。在實際開發中,我們不能添加一條數據或者做一次修改就調用一次SaveChanges,這完全不現實。因為每次調用SaveChanges是EF向數據庫提交變更的時候,所以EF推薦的是每次執行完用戶的請求之後統一提交數據給數據庫。

這樣就會造成一個問題,可能也不是問題:我們需要一個接口來管理EF 的SaveChanges操作。

1.1 創建一個IUnitOfWork接口

通常我們會在Domain項目中添加一個IUnitOfWork接口,這個接口有一個方法就是SaveChanges,代碼如下:

namespace Domain.Insfrastructure
{
    public interface IUnitOfWork
    {
        void SaveChanges();
    }
}

這個方法的意思表示到執行該方法的時候,一個完整的工作流程執行完成了。也就是說,當執行該方法后,當前請求不會再與數據庫發生連接。

1.2 實現IUnitOfWork接口

在 Domain.Implement中添加IUnitOfWork實現類:

using Domain.Insfrastructure;
using Microsoft.EntityFrameworkCore;

namespace Domain.Implements.Insfrastructure
{
    public class UnitOfWork: IUnitOfWork
    {
        private DbContext DbContext;
        public UnitOfWork(DbContext context)
        {
            DbContext = context;
        }

        public void SaveChanges()
        {
            DbContext.SaveChanges();
        }
    }
}

1.3 調用時機

到現在我們已經創建了一個UnitOfWork的方法,那麼問題來了,我們該在什麼時候調用呢,或者說如何調用呢?

我的建議是創建一個ActionFilter,針對所有的控制器進行SaveChanges進行處理。當然了,也可以在控制器中持有一個IUnitOfWork的示例,然後在Action結束的時候,執行SaveChanges。不過這樣存在一個問題,可能會存在遺漏的方法。所以我推薦這樣操作,這裏簡單演示一下如何創建攔截器:

在Web的根目錄下,創建一個Filters目錄,這個目錄里用來存儲一些過濾器,創建我們需要的過濾器:

using Domain.Insfrastructure;
using Microsoft.AspNetCore.Mvc.Filters;

namespace Web.Filters
{
    public class UnitOfWorkFilterAttribute : ActionFilterAttribute
    {
        public IUnitOfWork UnitOfWork;

        public override void OnActionExecuted(ActionExecutedContext context)
        {
            UnitOfWork.SaveChanges();
        }
    }
}

使用一個ActionFilter可以很方便的解決一些容易遺漏但又必須執行的代碼。這裏就先不介紹如何配置Filter的啟用和詳細介紹了,請允許我賣個關子。當然了,有些小夥伴肯定也能猜到這是一個Attribute類,所以可以按照Attribute給Controller打標記。

2. 創建一個簡單的代碼生成方法

之前在介紹EF的時候,有個小夥伴跟我說,還要寫配置文件啊,太麻煩了。是的,之前我介紹了很多關於寫配置文件不使用特性的好處,但不解決這個問題就無法真正體檢配置類的好處。

雖然說,EF Core約定優先,但是如果默認約定的話,得在DBContext中聲明 DbSet<T> 來聲明這個字段,實體類少的話,比較簡單。如果多個數據表的話,就會非常麻煩。

所以這時候就要使用工具類, 那麼簡單的分析一下,這個工具類需要有哪些功能:

  • 第一步,找到實體類並解析出實體類的類名
  • 第二步,生成配置文件
  • 第三步,創建對應的Repository接口和實現類

很簡單的三步,但是難點就是找實體類並解析出實體類名。

在Util項目中添加一個Develop目錄,並創建Develop類:

namespace Utils.Develop
{
    public class Develop
    {
        
    }
}

定位當前類所在目錄,通過

Directory.GetCurrentDirectory()

這個方法可以獲取當前執行的DLL所在目錄,當然不同的編譯器在執行的時候,會有微妙的不同。所以我們需要以此為根據然後獲取項目的根目錄,一個簡單的方法,查找*.sln 所在目錄:

public static string CurrentDirect
{
    get
    {
        var execute = Directory.GetCurrentDirectory();
        var parent = Directory.GetParent(execute);
        while(parent.GetFiles("*.sln",SearchOption.TopDirectoryOnly).Length == 0)
        {
            parent = parent.Parent;
            if(parent == null)
            {
                return null;
            }
        }
        return parent.FullName;
    }
}

2.1 獲取實體類

那麼獲取到根目錄之後,我們下一步就是獲取實體類。因為我們的實體類都要求是繼承BaseEntity或者命名空間都是位於Data.Models下面。當然這個名稱都是根據實際業務場景約束的,這裏只是以當前項目舉例。那麼,我們可以通過以下方法找到我們設置的實體類:

public static Type[] LoadEntities()
{
    var assembly = Assembly.Load("Data");
    var allTypes = assembly.GetTypes();
    var ofNamespace = allTypes.Where(t => t.Namespace == "Data.Models" || t.Namespace.StartsWith("Data.Models."));
    var subTypes = allTypes.Where(t => t.BaseType.Name == "BaseEntity`1");
    return ofNamespace.Union(subTypes).ToArray();
}

通過 Assembly加載Data的程序集,然後選擇出符合我們要求的實體類。

2.2 編寫Repository接口

我們先約定Model的Repository接口定義在 Domain/Repository目錄下,所以它們的命名空間應該是:

namespace Domain.Repository	
{
}

假設目錄情況與Data/Models下面的代碼結構保持一致,然後生成代碼應該如下:

public static void CreateRepositoryInterface(Type type)
{
    var targetNamespace = type.Namespace.Replace("Data.Models", "");
    if (targetNamespace.StartsWith("."))
    {
        targetNamespace = targetNamespace.Remove(0);
    }
    var targetDir = Path.Combine(new[]{CurrentDirect,"Domain", "Repository"}.Concat(
        targetNamespace.Split('.')).ToArray());
    if (!Directory.Exists(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }

    var baseName = type.Name.Replace("Entity","");

    if (!string.IsNullOrEmpty(targetNamespace))
    {
        targetNamespace = $".{targetNamespace}";
    }
    var file = $"using {type.Namespace};\r\n"
        + $"using Domain.Insfrastructure;\r\n"
        + $"namespace Domain.Repository{targetNamespace}\r\n"
        + "{\r\n"
        + $"\tpublic interface I{baseName}ModifyRepository : IModifyRepository<{type.Name}>\r\n" +
        "\t{\r\n\t}\r\n"
        + $"\tpublic interface I{baseName}SearchRepository : ISearchRepository<{type.Name}>\r\n" +
        "\t{\r\n\t}\r\n}";

    File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
}

2.3 編寫Repository的實現類

因為我們提供了一個基類,所以我們在生成方法的時候,推薦繼承這個類,那麼實現方法應該如下:

public static void CreateRepositoryImplement(Type type)
{
    var targetNamespace = type.Namespace.Replace("Data.Models", "");
    if (targetNamespace.StartsWith("."))
    {
        targetNamespace = targetNamespace.Remove(0);
    }

    var targetDir = Path.Combine(new[] {CurrentDirect, "Domain.Implements", "Repository"}.Concat(
        targetNamespace.Split('.')).ToArray());
    if (!Directory.Exists(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }
    var baseName = type.Name.Replace("Entity", "");
    if (!string.IsNullOrEmpty(targetNamespace))
    {
        targetNamespace = $".{targetNamespace}";
    }

    var file = $"using {type.Namespace};" +
        $"\r\nusing Domain.Implements.Insfrastructure;" +
        $"\r\nusing Domain.Repository{targetNamespace};" +
        $"\r\nusing Microsoft.EntityFrameworkCore;" +
        $"namespace Domain.Implements.Repository{targetNamespace}\r\n" +
        "{" +
        $"\r\n\tpublic class {baseName}Repository :BaseRepository<{type.Name}> ,I{baseName}ModifyRepository,I{baseName}SearchRepository " +
        "\r\n\t{" +
        $"\r\n\t\tpublic {baseName}Repository(DbContext context) : base(context)"+
        "\r\n\t\t{"+
        "\r\n\t\t}\r\n"+
        "\t}\r\n}";
    File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
}

2.4 配置文件的生成

仔細觀察一下代碼,可以發現整體都是十分簡單的。所以這篇就不掩飾如何生成配置文件了,小夥伴們可以自行嘗試一下哦。具體實現可以等一下篇哦。

3. 總結

這一篇初略的介紹了兩個用來輔助EF Core實現的方法或類,這在開發中很重要。UnitOfWork用來確保一次請求一個工作流程,簡單的代碼生成類讓我們能讓我們忽略那些繁重的創建同類代碼的工作。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

分類
發燒車訊

曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

文章導航

Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。

曹工說Redis源碼(1)– redis debug環境搭建,使用clion,達到和調試java一樣的效果

曹工說Redis源碼(2)– redis server 啟動過程解析及簡單c語言基礎知識補充

曹工說Redis源碼(3)– redis server 啟動過程完整解析(中)

曹工說Redis源碼(4)– 通過redis server源碼來理解 listen 函數中的 backlog 參數

曹工說Redis源碼(5)– redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)

曹工說Redis源碼(6)– redis server 主循環大體流程解析

曹工說Redis源碼(7)– redis server 的周期執行任務,到底要做些啥

什麼是內存淘汰

內存淘汰,和平時我們設置redis key的過期時間,不是一回事;內存淘汰是說,假設我們限定redis只能使用8g內存,現在已經使用了這麼多了(包括設置了過期時間的key和沒設過期時間的key),那,後續的set操作,還怎麼辦呢?

是不是只能報錯了?

那不行啊,不科學吧,因為有的key,可能已經很久沒人用了,可能以後也不會再用到了,那我們是不是可以把這類key給幹掉呢?

幹掉key的過程,就是內存淘汰。

內存淘汰什麼時候啟用

當我們在配置文件里設置了如下屬性時:

# maxmemory <bytes>

默認,該屬性是被註釋掉的。

其實,這個配置項的註釋,相當有價值,我們來看看:

# Don't use more memory than the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
#
# If Redis can't remove keys according to the policy, or if the policy is
# set to 'noeviction', Redis will start to reply with errors to commands
# that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
#
# This option is usually useful when using Redis as an LRU cache, or to set
# a hard memory limit for an instance (using the 'noeviction' policy).
#
# WARNING: If you have slaves attached to an instance with maxmemory on,
# the size of the output buffers needed to feed the slaves are subtracted
# from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of slaves is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have slaves attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for slave
# output buffers (but this is not needed if the policy is 'noeviction').
#
# maxmemory <bytes>

渣翻譯如下:

不能使用超過指定數量bytes的內存。當該內存限制被達到時,redis會根據過期策略(eviction policy,通過參數 maxmemory-policy來指定)來驅逐key。

如果redis根據指定的策略,或者策略被設置為“noeviction”,redis會開始針對如下這種命令,回復錯誤。什麼命令呢?會使用更多內存的那類命令,比如set、lpush;只讀命令還是不受影響,可以正常響應。

該選項通常在redis使用LRU緩存時有用,或者在使用noeviction策略時,設置一個進程級別的內存limit。

內存淘汰策略

所謂策略,意思是,當我們要刪除部分key的時候,刪哪些,不刪哪些?是不是需要一個策略?比如是隨機刪,就像滅霸一樣?還是按照lru時間來刪,lru的策略意思就是,最近最少使用的key,將被優先刪除。

總之,我們需要定一個規則。

redis默認支持以下策略:

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
# 
# volatile-lru -> remove the key with an expire set using an LRU algorithm
# allkeys-lru -> remove any key accordingly to the LRU algorithm
# volatile-random -> remove a random key with an expire set
# allkeys-random -> remove a random key, any key
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)
# noeviction -> don't expire at all, just return an error on write operations
# 
# Note: with any of the above policies, Redis will return an error on write
#       operations, when there are not suitable keys for eviction.
#
#       At the date of writing this commands are: set setnx setex append
#       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
#       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
#       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
#       getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction
maxmemory-policy allkeys-lru
針對設置了過期時間的,使用lru算法
# volatile-lru -> remove the key with an expire set using an LRU algorithm

針對全部key,使用lru算法
# allkeys-lru -> remove any key accordingly to the LRU algorithm

針對設置了過期時間的,隨機刪
# volatile-random -> remove a random key with an expire set

針對全部key,隨機刪
# allkeys-random -> remove a random key, any key

針對設置了過期時間的,馬上要過期的,刪掉
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)

不過期,不能寫了,就報錯
# noeviction -> don't expire at all, just return an error on write operations

一般呢,我們會設置為:

allkeys-lru,即,針對全部key,進行lru。

源碼實現

配置讀取

在如下結構體中,定義了如下字段:

struct redisServer {
	...
	unsigned long long maxmemory;   /* Max number of memory bytes to use */
    int maxmemory_policy;           /* Policy for key eviction */
    int maxmemory_samples;          /* Pricision of random sampling */
    ...
}

當我們在配置文件中,進入如下配置時,該結構體中幾個字段的值如下:

maxmemory 3mb
maxmemory-policy allkeys-lru
# maxmemory-samples 5  這個取了默認值

maxmemory_policy為3,是因為枚舉值為3:

#define REDIS_MAXMEMORY_VOLATILE_LRU 0
#define REDIS_MAXMEMORY_VOLATILE_TTL 1
#define REDIS_MAXMEMORY_VOLATILE_RANDOM 2
#define REDIS_MAXMEMORY_ALLKEYS_LRU 3
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5
#define REDIS_DEFAULT_MAXMEMORY_POLICY REDIS_MAXMEMORY_NO_EVICTION

處理命令時,判斷是否進行內存淘汰

在處理命令的時候,會調用中的

redis.c  processCommand
    
int processCommand(redisClient *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    // 特別處理 quit 命令
    void *commandName = c->argv[0]->ptr;
    redisLog(REDIS_NOTICE, "The server is now processing %s", commandName);

    if (!strcasecmp(c->argv[0]->ptr, "quit")) {
        addReply(c, shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
        return REDIS_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // 1 查找命令,並進行命令合法性檢查,以及命令參數個數檢查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 沒找到指定的命令
        flagTransaction(c);
        addReplyErrorFormat(c, "unknown command '%s'",
                            (char *) c->argv[0]->ptr);
        return REDIS_OK;
    }

    /* Check if the user is authenticated */
    //2 檢查認證信息
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
        flagTransaction(c);
        addReply(c, shared.noautherr);
        return REDIS_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     *
     * 3 如果開啟了集群模式,那麼在這裏進行轉向操作。
     *
     * However we don't perform the redirection if:
     *
     * 不過,如果有以下情況出現,那麼節點不進行轉向:
     *
     * 1) The sender of this command is our master.
     *    命令的發送者是本節點的主節點
     *
     * 2) The command has no key arguments. 
     *    命令沒有 key 參數
     */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
        int hashslot;

        // 集群已下線
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

            // 集群運作正常
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
            // 不能執行多鍵處理命令
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c, sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c, sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;

                //3.1 命令針對的槽和鍵不是本節點處理的,進行轉向
            } else if (n != server.cluster->myself) {
                flagTransaction(c);
                // -<ASK or MOVED> <slot> <ip>:<port>
                // 例如 -ASK 10086 127.0.0.1:12345
                addReplySds(c, sdscatprintf(sdsempty(),
                                            "-%s %d %s:%d\r\n",
                                            (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                            hashslot, n->ip, n->port));

                return REDIS_OK;
            }

            // 如果執行到這裏,說明鍵 key 所在的槽由本節點處理
            // 或者客戶端執行的是無參數命令
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    //4 如果設置了最大內存,那麼檢查內存是否超過限制,並做相應的操作
    if (server.maxmemory) {
        //4.1 如果內存已超過限制,那麼嘗試通過刪除過期鍵來釋放內存
        int retval = freeMemoryIfNeeded();
        // 如果即將要執行的命令可能佔用大量內存(REDIS_CMD_DENYOOM)
        // 並且前面的內存釋放失敗的話
        // 那麼向客戶端返回內存錯誤
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
    }    
    ....
  • 1處,查找命令,對應的函數指針(類似於java里的策略模式,根據命令,找對應的策略)
  • 2處,檢查,是否密碼正確
  • 3處,集群相關操作;
  • 3.1處,不是本節點處理,直接返回ask,指示客戶端轉向
  • 4處,判斷是否設置了maxMemory,這裏就是本文重點:設置了maxMemory時,內存淘汰策略
  • 4.1處,調用了下方的 freeMemoryIfNeeded

接下來,深入4.1處:


int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    // 計算出 Redis 目前佔用的內存總數,但有兩個方面的內存不會計算在內:
    // 1)從服務器的輸出緩衝區的內存
    // 2)AOF 緩衝區的內存
    mem_used = zmalloc_used_memory();
    if (slaves) {
		...
    }
    if (server.aof_state != REDIS_AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are over the memory limit. */
    //1 如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作
    if (mem_used <= server.maxmemory) return REDIS_OK;

    //2 如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回
    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
        return REDIS_ERR; /* We need to free memory, but policy forbids. */

    /* Compute how much memory we need to free. */
    // 3 計算需要釋放多少字節的內存
    mem_tofree = mem_used - server.maxmemory;

    // 初始化已釋放內存的字節數為 0
    mem_freed = 0;

    // 根據 maxmemory 策略,
    //4 遍歷字典,釋放內存並記錄被釋放內存的字節數
    while (mem_freed < mem_tofree) {
        int j, k, keys_freed = 0;

        // 遍歷所有字典
        for (j = 0; j < server.dbnum; j++) {
            long bestval = 0; /* just to prevent warning */
            sds bestkey = NULL;
            dictEntry *de;
            redisDb *db = server.db + j;
            dict *dict;

            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) {
                // 如果策略是 allkeys-lru 或者 allkeys-random 
                //5 那麼淘汰的目標為所有數據庫鍵
                dict = server.db[j].dict;
            } else {
                // 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl 
                //6 那麼淘汰的目標為帶過期時間的數據庫鍵
                dict = server.db[j].expires;
            }


            /* volatile-random and allkeys-random policy */
            // 如果使用的是隨機策略,那麼從目標字典中隨機選出鍵
            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
                server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) {
                de = dictGetRandomKey(dict);
                bestkey = dictGetKey(de);
            }
            /* volatile-lru and allkeys-lru policy */
            //7 
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                     server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) {
                struct evictionPoolEntry *pool = db->eviction_pool;

                while (bestkey == NULL) {
                    // 8 
                    evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                    /* Go backward from best to worst element to evict. */
                    for (k = REDIS_EVICTION_POOL_SIZE - 1; k >= 0; k--) {
                        if (pool[k].key == NULL) continue;
                        // 8.1
                        de = dictFind(dict, pool[k].key);

                        /* 8.2 Remove the entry from the pool. */
                        sdsfree(pool[k].key);
                        /* Shift all elements on its right to left. */
                        memmove(pool + k, pool + k + 1,
                                sizeof(pool[0]) * (REDIS_EVICTION_POOL_SIZE - k - 1));
                        /* Clear the element on the right which is empty
                         * since we shifted one position to the left.  */
                        pool[REDIS_EVICTION_POOL_SIZE - 1].key = NULL;
                        pool[REDIS_EVICTION_POOL_SIZE - 1].idle = 0;

                        /* If the key exists, is our pick. Otherwise it is
                         * a ghost and we need to try the next element. */
                        // 8.3
                        if (de) {
                            bestkey = dictGetKey(de);
                            break;
                        } else {
                            /* Ghost... */
                            continue;
                        }
                    }
                }
            }

                /* volatile-ttl */
                // 策略為 volatile-ttl ,從一集 sample 鍵中選出過期時間距離當前時間最接近的鍵
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
                ...
            }

            /* Finally remove the selected key. */
            // 8.4 刪除被選中的鍵
            if (bestkey) {
                long long delta;

                robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
                propagateExpire(db, keyobj);
                /* We compute the amount of memory freed by dbDelete() alone.
                 * It is possible that actually the memory needed to propagate
                 * the DEL in AOF and replication link is greater than the one
                 * we are freeing removing the key, but we can't account for
                 * that otherwise we would never exit the loop.
                 *
                 * AOF and Output buffer memory will be freed eventually so
                 * we only care about memory used by the key space. */
                // 計算刪除鍵所釋放的內存數量
                delta = (long long) zmalloc_used_memory();
                dbDelete(db, keyobj);
                delta -= (long long) zmalloc_used_memory();
                mem_freed += delta;

                // 對淘汰鍵的計數器增一
                server.stat_evictedkeys++;

                notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
                                    keyobj, db->id);
                decrRefCount(keyobj);
                keys_freed++;
				...
            }
        }

        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
    }

    return REDIS_OK;
}
  • 1處,如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作

  • 2處,如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回

  • 3處,計算需要釋放多少字節的內存

  • 4處,遍歷字典,釋放內存並記錄被釋放內存的字節數

  • 5處,如果策略是 allkeys-lru 或者 allkeys-random 那麼淘汰的目標為所有數據庫鍵

  • 6處,如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl ,那麼淘汰的目標為帶過期時間的數據庫鍵

  • 7處,如果使用的是 LRU 策略, 那麼從 sample 鍵中選出 IDLE 時間最長的那個鍵

  • 8處,調用evictionPoolPopulate,該函數在下面講解,該函數的功能是,傳入一個鏈表,即這裏的db->eviction_pool,然後在函數內部,隨機找出n個key,放入傳入的鏈表中,並按照空閑時間排序,空閑最久的,放到最後。

    當該函數,返回后,db->eviction_pool這個鏈表裡就存放了我們要淘汰的key。

  • 8.1處,找到這個key,這個key,在後邊會被刪除

  • 8.2處,下面這一段,從db->eviction_pool將這個已經處理了的key刪掉

  • 8.3處,如果這個key,是存在的,則跳出循環,在後面8.4處,會被刪除

  • 8.4處,刪除這個key

選擇哪些key作為被淘汰的key

前面我們看到,在7處,如果為lru策略,則會進入8處的函數:

evictionPoolPopulate。

該函數的名稱為:填充(populate)驅逐(eviction)對象池(pool)。驅逐的意思,就是現在達到了maxmemory,沒辦法,只能開始刪除掉一部分元素,來騰空間了,不然新的put類型的命令,根本沒辦法執行。

該方法的大概思路是,使用lru的時候,隨機找n個key,類似於抽樣,然後放到一個鏈表,根據空閑時間排序。

具體看看該方法的實現:

void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {

其中,傳入的第三個參數,是要被填充的對象,在c語言中,習慣傳入一個入參,然後在函數內部填充或者修改入參對象的屬性。

該屬性,就是前面說的那個鏈表,用來存放收集的隨機的元素,該鏈表中節點的結構如下:

struct evictionPoolEntry {
    unsigned long long idle;    /* Object idle time. */
    sds key;                    /* Key name. */
};

該結構共2個字段,一個存儲key,一個存儲空閑時間。

該鏈表中,共maxmemory-samples個元素,會按照idle時間長短排序,idle時間長的在鏈表尾部,(假設頭在左,尾在右)。

void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
    dictEntry **samples;

    /* Try to use a static buffer: this function is a big hit...
     * Note: it was actually measured that this helps. */
    if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
        samples = _samples;
    } else {
        samples = zmalloc(sizeof(samples[0]) * server.maxmemory_samples);
    }

    /* 1 Use bulk get by default. */
    count = dictGetRandomKeys(sampledict, samples, server.maxmemory_samples);

	// 2
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (sampledict != keydict) de = dictFind(keydict, key);
        // 3
        o = dictGetVal(de);
        // 4
        idle = estimateObjectIdleTime(o);

        /* 5  Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < REDIS_EVICTION_POOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle)
            k++;
        
		...
            
        // 6
        pool[k].key = sdsdup(key);
        pool[k].idle = idle;
    }
    if (samples != _samples) zfree(samples);
}
  • 1處,獲取 server.maxmemory_samples個key,這裡是隨機獲取的,(dictGetRandomKeys),這個值,默認值為5,放到samples中

  • 2處,遍歷返回來的samples

  • 3處,調用如下宏,獲取val

    he的類型為dictEntry:

    /*
     * 哈希表節點
     */
    typedef struct dictEntry {
        
        // 鍵
        void *key;
    
        // 值
        union {
            // 1
            void *val;
            uint64_t u64;
            int64_t s64;
        } v;
    
        // 指向下個哈希表節點,形成鏈表
        struct dictEntry *next;
    
    } dictEntry;
    

    所以,這裏去

    robj *o; 
    
    o = dictGetVal(de);
    

    實際就是獲取其v屬性中的val,(1處):

    #define dictGetVal(he) ((he)->v.val)
    
  • 4處,準備計算該val的空閑時間

    我們上面3處,看到,獲取的o的類型為robj。我們現在看看怎麼計算對象的空閑時長:

    /* Given an object returns the min number of milliseconds the object was never
     * requested, using an approximated LRU algorithm. */
    unsigned long long estimateObjectIdleTime(robj *o) {
        //4.1 獲取系統的當前時間
        unsigned long long lruclock = LRU_CLOCK();
        // 4.2
        if (lruclock >= o->lru) {
            // 4.3
            return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
        } else {
            return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *
                        REDIS_LRU_CLOCK_RESOLUTION;
        }
    }
    

    這裏,4.1處,獲取系統的當前時間;

    4.2處,如果系統時間,大於對象的lru時間

    4.3處,則用系統時間減去對象的lru時間,再乘以單位,換算為毫秒,最終返回的單位,為毫秒(可以看註釋。)

    #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
    
  • 5處,這裏拿當前元素,和pool中已經放進去的元素,從第0個開始比較,如果當前元素的idle時長,大於pool中指針0指向的元素,則和pool中索引1的元素比較;直到條件不滿足為止。

    這句話意思就是,類似於冒泡,把當前元素一直往後冒,直到idle時長小於被比較的元素為止。

  • 6處,把當前元素放進pool中。

經過上面的處理后,鏈表中存放了全部的抽樣元素,且ide時間最長的,在最右邊。

對象還有字段存儲空閑時間?

前面4處,說到,用系統的當前時間,減去對象的lru時間。

大家看看對象的結構體

typedef struct redisObject {

    // 類型
    unsigned type:4;

    // 編碼
    unsigned encoding:4;

    //1 對象最後一次被訪問的時間
    unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */

    // 引用計數
    int refcount;

    // 指向實際值的指針
    void *ptr;

} robj;

上面1處,lru屬性,就是用來存儲這個。

創建對象時,直接使用當前系統時間創建

robj *createObject(int type, void *ptr) {

    robj *o = zmalloc(sizeof(*o));

    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /*1 Set the LRU to the current lruclock (minutes resolution). */
    o->lru = LRU_CLOCK();
    return o;
}

1處即是。

robj *createEmbeddedStringObject(char *ptr, size_t len) {
    robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr)+len+1);
    struct sdshdr *sh = (void*)(o+1);

    o->type = REDIS_STRING;
    o->encoding = REDIS_ENCODING_EMBSTR;
    o->ptr = sh+1;
    o->refcount = 1;
    // 1
    o->lru = LRU_CLOCK();

    sh->len = len;
    sh->free = 0;
    if (ptr) {
        memcpy(sh->buf,ptr,len);
        sh->buf[len] = '\0';
    } else {
        memset(sh->buf,0,len+1);
    }
    return o;
}

1處即是。

每次查找該key時,刷新時間

robj *lookupKey(redisDb *db, robj *key) {

    // 查找鍵空間
    dictEntry *de = dictFind(db->dict,key->ptr);

    // 節點存在
    if (de) {
        

        // 取出值
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        // 更新時間信息(只在不存在子進程時執行,防止破壞 copy-on-write 機制)
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            // 1
            val->lru = LRU_CLOCK();

        // 返回值
        return val;
    } else {

        // 節點不存在

        return NULL;
    }
}

1處即是,包括get、set等各種操作,都會刷新該時間。

仔細看下面的堆棧,set的,get同理:

總結

大家有沒有更清楚一些呢?

總的來說,就是,設置了max-memory后,達到該內存限制后,會在處理命令時,檢查是否要進行內存淘汰;如果要淘汰,則根據maxmemory-policy的策略來。

隨機選擇maxmemory-sample個元素,按照空閑時間排序,拉鏈表;挨個挨個清除。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

分類
發燒車訊

老大吩咐的可重入分佈式鎖,終於完美的實現了!!!

重做永遠比改造簡單

最近在做一個項目,將一個其他公司的實現系統(下文稱作舊系統),完整的整合到自己公司的系統(下文稱作新系統)中,這其中需要將對方實現的功能完整在自己系統也實現一遍。

舊系統還有一批存量商戶,為了不影響存量商戶的體驗,新系統提供的對外接口,還必須得跟以前一致。最後系統完整切換之後,功能只運行在新系統中,這就要求舊系統的數據還需要完整的遷移到新系統中。

當然這些在做這個項目之前就有預期,想過這個過程很難,但是沒想到有那麼難。原本感覺排期大半年,時間還是挺寬裕,現在感覺就是大坑,還不得不在坑裡一點點去填。

哎,說多都是淚,不吐槽了,等到下次做完再給大家復盤下真正心得體會。

回到正文,上篇文章Redis 分佈式鎖,咱們基於 Redis 實現一個分佈式鎖。這個分佈式鎖基本功能沒什麼問題,但是缺少可重入的特性,所以這篇文章小黑哥就帶大家來實現一下可重入的分佈式鎖。

本篇文章將會涉及以下內容:

  • 可重入
  • 基於 ThreadLocal 實現方案
  • 基於 Redis Hash 實現方案

先贊后看,養成習慣。微信搜索「程序通事」,關注就完事了~

可重入

說到可重入鎖,首先我們來看看一段來自 wiki 上可重入的解釋:

若一個程序或子程序可以“在任意時刻被中斷然後操作系統調度執行另外一段代碼,這段代碼又調用了該子程序不會出錯”,則稱其為可重入(reentrant或re-entrant)的。即當該子程序正在運行時,執行線程可以再次進入並執行它,仍然獲得符合設計時預期的結果。與多線程併發執行的線程安全不同,可重入強調對單個線程執行時重新進入同一個子程序仍然是安全的。

當一個線程執行一段代碼成功獲取鎖之後,繼續執行時,又遇到加鎖的代碼,可重入性就就保證線程能繼續執行,而不可重入就是需要等待鎖釋放之後,再次獲取鎖成功,才能繼續往下執行。

用一段 Java 代碼解釋可重入:

public synchronized void a() {
    b();
}

public synchronized void b() {
    // pass
}

假設 X 線程在 a 方法獲取鎖之後,繼續執行 b 方法,如果此時不可重入,線程就必須等待鎖釋放,再次爭搶鎖。

鎖明明是被 X 線程擁有,卻還需要等待自己釋放鎖,然後再去搶鎖,這看起來就很奇怪,我釋放我自己~

可重入性就可以解決這個尷尬的問題,當線程擁有鎖之後,往後再遇到加鎖方法,直接將加鎖次數加 1,然後再執行方法邏輯。退出加鎖方法之後,加鎖次數再減 1,當加鎖次數為 0 時,鎖才被真正的釋放。

可以看到可重入鎖最大特性就是計數,計算加鎖的次數。所以當可重入鎖需要在分佈式環境實現時,我們也就需要統計加鎖次數。

分佈式可重入鎖實現方式有兩種:

  • 基於 ThreadLocal 實現方案
  • 基於 Redis Hash 實現方案

首先我們看下基於 ThreadLocal 實現方案。

基於 ThreadLocal 實現方案

實現方式

Java 中 ThreadLocal可以使每個線程擁有自己的實例副本,我們可以利用這個特性對線程重入次數進行技術。

下面我們定義一個ThreadLocal的全局變量 LOCKS,內存存儲 Map 實例變量。

private static ThreadLocal<Map<String, Integer>> LOCKS = ThreadLocal.withInitial(HashMap::new);

每個線程都可以通過 ThreadLocal獲取自己的 Map實例,Mapkey 存儲鎖的名稱,而 value存儲鎖的重入次數。

加鎖的代碼如下:

/**
 * 可重入鎖
 *
 * @param lockName  鎖名字,代表需要爭臨界資源
 * @param request   唯一標識,可以使用 uuid,根據該值判斷是否可以重入
 * @param leaseTime 鎖釋放時間
 * @param unit      鎖釋放時間單位
 * @return
 */
public Boolean tryLock(String lockName, String request, long leaseTime, TimeUnit unit) {
    Map<String, Integer> counts = LOCKS.get();
    if (counts.containsKey(lockName)) {
        counts.put(lockName, counts.get(lockName) + 1);
        return true;
    } else {
        if (redisLock.tryLock(lockName, request, leaseTime, unit)) {
            counts.put(lockName, 1);
            return true;
        }
    }
    return false;
}

ps: redisLock#tryLock 為上一篇文章實現的分佈鎖。

由於公號外鏈無法直接跳轉,關注『程序通事』,回復分佈式鎖獲取源代碼。

加鎖方法首先判斷當前線程是否已經已經擁有該鎖,若已經擁有,直接對鎖的重入次數加 1。

若還沒擁有該鎖,則嘗試去 Redis 加鎖,加鎖成功之後,再對重入次數加 1 。

釋放鎖的代碼如下:

/**
 * 解鎖需要判斷不同線程池
 *
 * @param lockName
 * @param request
 */
public void unlock(String lockName, String request) {
    Map<String, Integer> counts = LOCKS.get();
    if (counts.getOrDefault(lockName, 0) <= 1) {
        counts.remove(lockName);
        Boolean result = redisLock.unlock(lockName, request);
        if (!result) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:+" + lockName + " with request: "
                    + request);
        }

    } else {
        counts.put(lockName, counts.get(lockName) - 1);
    }
}

釋放鎖的時首先判斷重入次數,若大於 1,則代表該鎖是被該線程擁有,所以直接將鎖重入次數減 1 即可。

若當前可重入次數小於等於 1,首先移除 Map中鎖對應的 key,然後再到 Redis 釋放鎖。

這裏需要注意的是,當鎖未被該線程擁有,直接解鎖,可重入次數也是小於等於 1 ,這次可能無法直接解鎖成功。

ThreadLocal 使用過程要記得及時清理內部存儲實例變量,防止發生內存泄漏,上下文數據串用等問題。

下次咱來聊聊最近使用 ThreadLocal 寫的 Bug。

相關問題

使用 ThreadLocal 這種本地記錄重入次數,雖然真的簡單高效,但是也存在一些問題。

過期時間問題

上述加鎖的代碼可以看到,重入加鎖時,僅僅對本地計數加 1 而已。這樣可能就會導致一種情況,由於業務執行過長,Redis 已經過期釋放鎖。

而再次重入加鎖時,由於本地還存在數據,認為鎖還在被持有,這就不符合實際情況。

如果要在本地增加過期時間,還需要考慮本地與 Redis 過期時間一致性的,代碼就會變得很複雜。

不同線程/進程可重入問題

狹義上可重入性應該只是對於同一線程的可重入,但是實際業務可能需要不同的應用線程之間可以重入同把鎖。

ThreadLocal的方案僅僅只能滿足同一線程重入,無法解決不同線程/進程之間重入問題。

不同線程/進程重入問題就需要使用下述方案 Redis Hash 方案解決。

基於 Redis Hash 可重入鎖

實現方式

ThreadLocal 的方案中我們使用了 Map 記載鎖的可重入次數,而 Redis 也同樣提供了 Hash (哈希表)這種可以存儲鍵值對數據結構。所以我們可以使用 Redis Hash 存儲的鎖的重入次數,然後利用 lua 腳本判斷邏輯。

加鎖的 lua 腳本如下:

---- 1 代表 true
---- 0 代表 false

if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
return 0;

如果 KEYS:[lock],ARGV[1000,uuid]

不熟悉 lua 語言同學也不要怕,上述邏輯還是比較簡單的。

加鎖代碼首先使用 Redis exists 命令判斷當前 lock 這個鎖是否存在。

如果鎖不存在的話,直接使用 hincrby創建一個鍵為 lock hash 表,並且為 Hash 表中鍵為 uuid 初始化為 0,然後再次加 1,最後再設置過期時間。

如果當前鎖存在,則使用 hexists判斷當前 lock 對應的 hash 表中是否存在 uuid 這個鍵,如果存在,再次使用 hincrby 加 1,最後再次設置過期時間。

最後如果上述兩個邏輯都不符合,直接返回。

加鎖代碼如下:

// 初始化代碼

String lockLuaScript = IOUtils.toString(ResourceUtils.getURL("classpath:lock.lua").openStream(), Charsets.UTF_8);
lockScript = new DefaultRedisScript<>(lockLuaScript, Boolean.class);

/**
 * 可重入鎖
 *
 * @param lockName  鎖名字,代表需要爭臨界資源
 * @param request   唯一標識,可以使用 uuid,根據該值判斷是否可以重入
 * @param leaseTime 鎖釋放時間
 * @param unit      鎖釋放時間單位
 * @return
 */
public Boolean tryLock(String lockName, String request, long leaseTime, TimeUnit unit) {
    long internalLockLeaseTime = unit.toMillis(leaseTime);
    return stringRedisTemplate.execute(lockScript, Lists.newArrayList(lockName), String.valueOf(internalLockLeaseTime), request);
}

Spring-Boot 2.2.7.RELEASE

只要搞懂 Lua 腳本加鎖邏輯,Java 代碼實現還是挺簡單的,直接使用 SpringBoot 提供的 StringRedisTemplate 即可。

解鎖的 Lua 腳本如下:

-- 判斷 hash set 可重入 key 的值是否等於 0
-- 如果為 0 代表 該可重入 key 不存在
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
    return nil;
end ;
-- 計算當前可重入次數
local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1);
-- 小於等於 0 代表可以解鎖
if (counter > 0) then
    return 0;
else
    redis.call('del', KEYS[1]);
    return 1;
end ;
return nil;

首先使用 hexists 判斷 Redis Hash 表是否存給定的域。

如果 lock 對應 Hash 表不存在,或者 Hash 表不存在 uuid 這個 key,直接返回 nil

若存在的情況下,代表當前鎖被其持有,首先使用 hincrby使可重入次數減 1 ,然後判斷計算之後可重入次數,若小於等於 0,則使用 del 刪除這把鎖。

解鎖的 Java 代碼如下:

// 初始化代碼:


String unlockLuaScript = IOUtils.toString(ResourceUtils.getURL("classpath:unlock.lua").openStream(), Charsets.UTF_8);
unlockScript = new DefaultRedisScript<>(unlockLuaScript, Long.class);

/**
 * 解鎖
 * 若可重入 key 次數大於 1,將可重入 key 次數減 1 <br>
 * 解鎖 lua 腳本返回含義:<br>
 * 1:代表解鎖成功 <br>
 * 0:代表鎖未釋放,可重入次數減 1 <br>
 * nil:代表其他線程嘗試解鎖 <br>
 * <p>
 * 如果使用 DefaultRedisScript<Boolean>,由於 Spring-data-redis eval 類型轉化,<br>
 * 當 Redis 返回  Nil bulk, 默認將會轉化為 false,將會影響解鎖語義,所以下述使用:<br>
 * DefaultRedisScript<Long>
 * <p>
 * 具體轉化代碼請查看:<br>
 * JedisScriptReturnConverter<br>
 *
 * @param lockName 鎖名稱
 * @param request  唯一標識,可以使用 uuid
 * @throws IllegalMonitorStateException 解鎖之前,請先加鎖。若為加鎖,解鎖將會拋出該錯誤
 */
public void unlock(String lockName, String request) {
    Long result = stringRedisTemplate.execute(unlockScript, Lists.newArrayList(lockName), request);
    // 如果未返回值,代表其他線程嘗試解鎖
    if (result == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:+" + lockName + " with request: "
                + request);
    }
}

解鎖代碼執行方式與加鎖類似,只不過解鎖的執行結果返回類型使用 Long。這裏之所以沒有跟加鎖一樣使用 Boolean ,這是因為解鎖 lua 腳本中,三個返回值含義如下:

  • 1 代表解鎖成功,鎖被釋放
  • 0 代表可重入次數被減 1
  • null 代表其他線程嘗試解鎖,解鎖失敗

如果返回值使用 BooleanSpring-data-redis 進行類型轉換時將會把 null 轉為 false,這就會影響我們邏輯判斷,所以返回類型只好使用 Long

以下代碼來自 JedisScriptReturnConverter

相關問題

spring-data-redis 低版本問題

如果 Spring-Boot 使用 Jedis 作為連接客戶端,並且使用Redis Cluster 集群模式,需要使用 2.1.9 以上版本的spring-boot-starter-data-redis,不然執行過程中將會拋出:

org.springframework.dao.InvalidDataAccessApiUsageException: EvalSha is not supported in cluster environment.

如果當前應用無法升級 spring-data-redis也沒關係,可以使用如下方式,直接使用原生 Jedis 連接執行 lua 腳本。

以加鎖代碼為例:

public boolean tryLock(String lockName, String reentrantKey, long leaseTime, TimeUnit unit) {
    long internalLockLeaseTime = unit.toMillis(leaseTime);
    Boolean result = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> {
        Object innerResult = eval(connection.getNativeConnection(), lockScript, Lists.newArrayList(lockName), Lists.newArrayList(String.valueOf(internalLockLeaseTime), reentrantKey));
        return convert(innerResult);
    });
    return result;
}

private Object eval(Object nativeConnection, RedisScript redisScript, final List<String> keys, final List<String> args) {

    Object innerResult = null;
    // 集群模式和單點模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
    // 集群
    if (nativeConnection instanceof JedisCluster) {
        innerResult = evalByCluster((JedisCluster) nativeConnection, redisScript, keys, args);
    }
    // 單點
    else if (nativeConnection instanceof Jedis) {
        innerResult = evalBySingle((Jedis) nativeConnection, redisScript, keys, args);
    }
    return innerResult;
}

數據類型轉化問題

如果使用 Jedis 原生連接執行 Lua 腳本,那麼可能又會碰到數據類型的轉換坑。

可以看到 Jedis#eval返回 Object,我們需要具體根據 Lua 腳本的返回值的,再進行相關轉化。這其中就涉及到 Lua 數據類型轉化為 Redis 數據類型。

下面主要我們來講下 Lua 數據轉化 Redis 的規則中幾條比較容易踩坑:

1、Lua number 與 Redis 數據類型轉換

Lua 中 number 類型是一個雙精度的浮點數,但是 Redis 只支持整數類型,所以這個轉化過程將會丟棄小數位。

2、Lua boolean 與 Redis 類型轉換

這個轉化比較容易踩坑,Redis 中是不存在 boolean 類型,所以當Lua 中 true 將會轉為 Redis 整數 1。而 Lua 中 false 並不是轉化整數,而是轉化 null 返回給客戶端。

3、Lua nil 與 Redis 類型轉換

Lua nil 可以當做是一個空值,可以等同於 Java 中的 null。在 Lua 中如果 nil 出現在條件表達式,將會當做 false 處理。

所以 Lua nil 也將會 null 返回給客戶端。

其他轉化規則比較簡單,詳情參考:

http://doc.redisfans.com/script/eval.html

總結

可重入分佈式鎖關鍵在於對於鎖重入的計數,這篇文章主要給出兩種解決方案,一種基於 ThreadLocal 實現方案,這種方案實現簡單,運行也比較高效。但是若要處理鎖過期的問題,代碼實現就比較複雜。

另外一種採用 Redis Hash 數據結構實現方案,解決了 ThreadLocal 的缺陷,但是代碼實現難度稍大,需要熟悉 Lua 腳本,以及Redis 一些命令。另外使用 spring-data-redis 等操作 Redis 時不經意間就會遇到各種問題。

幫助

https://www.sofastack.tech/blog/sofa-jraft-rheakv-distributedlock/

https://tech.meituan.com/2016/09/29/distributed-system-mutually-exclusive-idempotence-cerberus-gtis.html

最後說兩句(求關注)

看完文章,哥哥姐姐們點個吧,周更真的超累,不知覺又寫了两天,拒絕白嫖,來點正反饋唄~。

最後感謝各位的閱讀,才疏學淺,難免存在紕漏,如果你發現錯誤的地方,可以留言指出。如果看完文章還有其他不懂的地方,歡迎加我,互相學習,一起成長~

最後謝謝大家支持~

最最後,重要的事再說一篇~

快來關注我呀~
快來關注我呀~
快來關注我呀~

歡迎關注我的公眾號:程序通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!