分類
發燒車訊

【Flutter實戰】圖片組件及四大案例

老孟導讀:大家好,這是【Flutter實戰】系列文章的第三篇,這一篇講解圖片組件,Image有很多高級用法,希望對您有所幫助。

圖片組件是Flutter基礎組件之一,和文本組件一樣必不可少。圖片組件包含Image和Icon兩個組件,本質上Icon不屬於圖片組件,但其外形效果上類似於圖片。

在項目中建議優先使用Icon組件,Icon本質上是一種字體,只不過显示的不是文字,而是圖標,而Image組件先通過圖片解碼器將圖片解碼,所以Icon有如下優點:

  • 通常情況下,圖標比圖片體積更小,顯著的減少App包體積。
  • 圖標不會出現失真或者模糊的現象,例如將20×20的圖片,渲染在200×200的屏幕上,圖片會失真或模糊,而圖標是矢量圖,不會失真,就像字體一樣。
  • 多個圖標可以存放在一個文件中,方便管理。
  • 全平台通用。

Image

Image組件用於显示圖片,圖片的來源可以是網絡、項目中圖片或者設備上的圖片。

加載網絡圖片:

Image.network(
  'http://pic1.win4000.com/pic/c/cf/cdc983699c.jpg',
)

加載項目中圖片:

首先將圖片拷貝到項目中,通常情況下,拷貝到assets/images/目錄下,assets/images/目錄為手動創建,新建的項目默認是沒有此目錄的。

設置pubspec.yaml配置文件:

assets:
  - assets/images/

或者指定具體圖片的名稱:

assets:
  - assets/images/aa.jpg

通常情況下,使用第一種方式,因為圖片會有很多張,增加一張就這裏配置一個太麻煩。

注意:assets前面的空格問題,極容易引發編譯異常,正確格式如下:

加載圖片:

Image.asset('assets/images/aa.jpg')

加載設備上的圖片:

要加載設備(手機)上的圖片首先需要獲取設備圖片的路徑,由於不同平台的路徑不同,因此路徑的獲取必須依靠原生支持,如果了解原生(Android和iOS)開發,可以直接使用MethodChannel獲取路徑,如果不懂原生(Android和iOS)開發,可以使用第三方插件獲取路徑,這裏推薦官方的path_provider

加載設備上的圖片:

Image.file(File('path'))

設置圖片的大小:

Image.asset('assets/images/aa.jpg',width: 100,height: 200,),

當Image的大小和圖片大小不匹配時,需要設置填充模式fit,設置組件大小為150×150,

Container(
  color: Colors.red.withOpacity(.3),
  child: Image.asset('assets/images/aa.jpg',width: 150,height: 150),
)

看到,圖片左右兩邊有空白區域(淺紅色填充的區域),如果想要圖片充滿整個區域,設置如下:

Container(
  color: Colors.red.withOpacity(.3),
  child: Image.asset('assets/images/aa.jpg',width: 150,height: 150,fit: BoxFit.fill,),
)

雖然圖片充滿整個區域,但圖片變形了,使圖片等比拉伸,直到兩邊都充滿區域:

Container(
  color: Colors.red.withOpacity(.3),
  child: Image.asset('assets/images/aa.jpg',width: 150,height: 150,fit: BoxFit.cover,),
)

此時,圖片未變形且兩邊都充滿區域,不過圖片被裁減了一部分。

fit參數就是設置填充方式,其值介紹如下:

  • fill:完全填充,寬高比可能會變。
  • contain:等比拉伸,直到一邊填充滿。
  • cover:等比拉伸,直到2邊都填充滿,此時一邊可能超出範圍。
  • fitWidth:等比拉伸,寬填充滿。
  • fitHeight:等比拉伸,高填充滿。
  • none:當組件比圖片小時,不拉伸,超出範圍截取。
  • scaleDown:當組件比圖片小時,圖片等比縮小,效果和contain一樣。

BoxFit.none的裁減和alignment相關,默認居中,

Image.asset(
  'assets/images/aa.jpg',
  width: 150,
  height: 150,
  fit: BoxFit.none,
  alignment: Alignment.centerRight,
),

左邊為原圖。

設置對齊方式:

Container(
  color: Colors.red.withOpacity(.3),
  child: Image.asset(
    'assets/images/aa.jpg',
    width: 150,
    height: 150,
    alignment: Alignment.centerLeft,
  ),
),

colorcolorBlendMode用於將顏色和圖片進行顏色混合,colorBlendMode表示混合模式,下面介紹的混合模式比較多,瀏覽一遍即可,此屬性可以用於簡單的濾鏡效果。

  • clear:清楚源圖像和目標圖像。
  • color:獲取源圖像的色相和飽和度以及目標圖像的光度。
  • colorBurn:將目標的倒數除以源,然後將結果倒數。
  • colorDodge:將目標除以源的倒數。
  • darken:通過從每個顏色通道中選擇最小值來合成源圖像和目標圖像。
  • difference:從每個通道的較大值中減去較小的值。合成黑色沒有效果。合成白色會使另一張圖像的顏色反轉。
  • dst:僅繪製目標圖像。
  • dstATop:將目標圖像合成到源圖像上,但僅在與源圖像重疊的位置合成。
  • dstIn:显示目標圖像,但僅显示兩個圖像重疊的位置。不渲染源圖像,僅將其視為蒙版。源的顏色通道將被忽略,只有不透明度才起作用。
  • dstOut:显示目標圖像,但僅显示兩個圖像不重疊的位置。不渲染源圖像,僅將其視為蒙版。源的顏色通道將被忽略,只有不透明度才起作用。
  • dstOver:將源圖像合成到目標圖像下。
  • exclusion:從兩個圖像的總和中減去兩個圖像的乘積的兩倍。
  • hardLight:調整源圖像和目標圖像的成分以使其適合源圖像之後,將它們相乘。
  • hue:獲取源圖像的色相,以及目標圖像的飽和度和光度。
  • lighten:通過從每個顏色通道中選擇最大值來合成源圖像和目標圖像。
  • luminosity:獲取源圖像的亮度,以及目標圖像的色相和飽和度。
  • modulate:將源圖像和目標圖像的顏色分量相乘。
  • multiply:將源圖像和目標圖像的分量相乘,包括alpha通道。
  • overlay:調整源圖像和目標圖像的分量以使其適合目標后,將它們相乘。
  • plus:對源圖像和目標圖像的組成部分求和。
  • saturation:獲取源圖像的飽和度以及目標圖像的色相和亮度。
  • screen:將源圖像和目標圖像的分量的逆值相乘,然後對結果求逆。
  • softLight:對於低於0.5的源值使用colorDodge,對於高於0.5的源值使用colorBurn。
  • src:放置目標圖像,僅繪製源圖像。
  • srcATop:將源圖像合成到目標圖像上,但僅在與目標圖像重疊的位置合成。
  • srcIn:显示源圖像,但僅显示兩個圖像重疊的位置。目標圖像未渲染,僅被視為蒙版。目標的顏色通道將被忽略,只有不透明度才起作用。
  • srcOut:显示源圖像,但僅显示兩個圖像不重疊的位置。
  • srcOver:將源圖像合成到目標圖像上。
  • xor:將按位異或運算符應用於源圖像和目標圖像。

是不是感覺看了和沒看差不多,看了也看不懂。正常,估計只有學過視覺算法的才能看懂吧,直接看下各個屬性的效果吧:

repeat表示當組件有空餘位置時,將會重複显示圖片

Image.asset(
  'assets/images/aa.jpg',
  width: double.infinity,
  height: 150,
  repeat: ImageRepeat.repeatX,
)

重複的模式有:

  • repeat:x,y方向都充滿。
  • repeatX:x方向充滿。
  • repeatY:y方向充滿。
  • noRepeat:不重複。

matchTextDirection設置為true時,圖片的繪製方向為TextDirection設置的方向,其父組件必須為Directionality

Directionality(
    textDirection: TextDirection.rtl,
    child: Image.asset(
      'assets/images/logo.png',
      height: 150,
      matchTextDirection: true,
    )),

左邊為原圖,效果是左右鏡像。

filterQuality表示繪製圖像的質量,從高到低為:high->medium->low->none。越高效果越好,越平滑,當然性能損耗越大,默認是low,如果發現圖片有鋸齒,可以設置此參數。

當加載圖片的時候回調frameBuilder,當此參數為null時,此控件將會在圖片加載完成后显示,未加載完成時显示空白,尤其在加載網絡圖片時會更明顯。因此此參數可以用於處理圖片加載時显示佔位圖片和加載圖片的過渡效果,比如淡入淡出效果。

下面的案例是淡入淡出效果:

Image.network(
  'https://flutter.github.io/assets-for-api-docs/assets/widgets/puffin.jpg',
  frameBuilder: (BuildContext context, Widget child, int frame,
      bool wasSynchronouslyLoaded) {
    if (wasSynchronouslyLoaded) {
      return child;
    }
    return AnimatedOpacity(
      child: child,
      opacity: frame == null ? 0 : 1,
      duration: const Duration(seconds: 2),
      curve: Curves.easeOut,
    );
  },
)

loadingBuilder參數比frameBuilder控制的力度更細,可以獲取圖片加載的進度,下面的案例显示了加載進度條:

Image.network(
    'https://flutter.github.io/assets-for-api-docs/assets/widgets/puffin.jpg',
    loadingBuilder: (BuildContext context, Widget child,
        ImageChunkEvent loadingProgress) {
  if (loadingProgress == null) {
    return child;
  }
  return Center(
    child: CircularProgressIndicator(
      value: loadingProgress.expectedTotalBytes != null
          ? loadingProgress.cumulativeBytesLoaded /
              loadingProgress.expectedTotalBytes
          : null,
    ),
  );
})

centerSlice用於.9圖,.9圖用於拉伸圖片的特定區域,centerSlice設置的區域(Rect)就是拉伸的區域。.9圖通常用於控件大小、寬高比不固定的場景,比如聊天背景圖片等。

Container(
    width: 250,
    height: 300,
    decoration: BoxDecoration(
        image: DecorationImage(
            centerSlice: Rect.fromLTWH(20, 20, 10, 10),
            image: AssetImage(
              'assets/images/abc.jpg',
            ),
            fit: BoxFit.fill))),

上面為原圖,下面為拉伸的圖片。

在使用時大概率會出現如下異常:

這是由於圖片比組件的尺寸大,如果使用centerSlice屬性,圖片必須比組件的尺寸小,一般情況下,.9圖的尺寸都非常小。

Icon

Icon是圖標組件,Icon不具有交互屬性,如果想要交互,可以使用IconButton。

Icon(Icons.add),

設置圖標的大小和顏色:

Icon(
  Icons.add,
  size: 40,
  color: Colors.red,
)

上面的黑色為默認大小和顏色。

Icons.add是系統提供的圖標,創建Flutter項目的時候,pubspec.yaml中默認有如下配置:

所有的圖標在Icons中已經定義,可以直接在源代碼中查看,也可以到官網查看所有圖標。

所有圖標效果如下:

案例

聊天背景(.9圖實現)

Container(
  width: 200,
  padding: EdgeInsets.only(left: 8,top: 8,right: 20,bottom: 8),
  decoration: BoxDecoration(
      image: DecorationImage(
          centerSlice: Rect.fromLTWH(20, 20, 1, 1),
          image: AssetImage(
            'assets/images/chat.png',
          ),
          fit: BoxFit.fill)),
  child: Text('老孟,專註分享Flutter技術和應用實戰。'
      '老孟,專註分享Flutter技術和應用實戰。'
      '老孟,專註分享Flutter技術和應用實戰。',),
)

背景圖片大小是57×80:

右側三角已經不在中間了,如果想讓其一直保持居中,修改拉伸區域:

centerSlice: Rect.fromLTWH(20, 10, 1, 60),

圓形帶邊框的頭像

Container(
  width: 100,
  height: 100,
  padding: EdgeInsets.all(3),
  decoration: BoxDecoration(shape: BoxShape.circle, color: Colors.blue),
  child: Container(
    decoration: BoxDecoration(
        shape: BoxShape.circle,
        image: DecorationImage(
            image: AssetImage('assets/images/aa.jpg'), fit: BoxFit.cover)),
  ),
)

圖片佔位符:

Image.network(
  'https://flutter.github.io/assets-for-api-docs/assets/widgets/puffin.jpg',
  height: 150,
  width: 150,
  fit: BoxFit.cover,
  frameBuilder: (
    BuildContext context,
    Widget child,
    int frame,
    bool wasSynchronouslyLoaded,
  ) {
    if (frame == null) {
      return Image.asset(
        'assets/images/place.png',
        height: 150,
        width: 150,
        fit: BoxFit.cover,
      );
    }
    return child;
  },
)

添加自己的圖標庫

如果系統提供的圖標沒有我們想要的圖標,這時需要引入第三方庫的圖標,下面以阿里巴巴的圖標庫為例。

打開阿里巴巴的圖標官網,找到自己想要的圖標后,將鼠標放置到圖標上,加入購物車:

點擊右上角的購物車,然後點擊添加至項目:

如果沒有添加過項目,需要創建一個新項目:

創建好后加入此項目,跳轉到我的項目頁面,點擊下載:

解壓下載的文件,解壓出來的文件有好幾個,如下圖:

選擇iconfont.ttf文件拷貝到 Flutter 項目的assets/fonts目錄下,assets/fonts目錄默認是沒有的,需要手動創建,在pubspec.yaml設置如下:

千萬注意紅框內開頭的空格問題,否則編譯不通過,family後面跟的字符串最好有意義,後面用圖標的時候需要用到。

用法如下:

Icon(IconData(0xe613,fontFamily: 'appIconFonts')

0xe613在下載圖標時已經標註,將&#替換為0,如下圖:

fontFamily是在pubspec.yaml中設置的family屬性,第三方的圖標和系統圖標一樣,可以設置其顏色和大小。

交流

老孟Flutter博客地址(330個控件用法):http://laomengit.com

歡迎加入Flutter交流群(微信:laomengit)、關注公眾號【老孟Flutter】:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

分類
發燒車訊

[源碼解析] GroupReduce,GroupCombine 和 Flink SQL group by

[源碼解析] GroupReduce,GroupCombine和Flink SQL group by

目錄

  • [源碼解析] GroupReduce,GroupCombine和Flink SQL group by
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 概念
      • 2.1 GroupReduce
      • 2.2 GroupCombine
      • 2.3 例子
    • 0x03 代碼
    • 0x04 Flink SQL內部翻譯
    • 0x05 JobGraph
    • 0x06 Runtime
      • 6.1 ChainedFlatMapDriver
      • 6.2 GroupReduceCombineDriver
      • 6.3 GroupReduceDriver & ChainedFlatMapDriver
    • 0x07 總結
    • 0x08 參考

0x00 摘要

本文從源碼和實例入手,為大家解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的內部實現。

0x01 緣由

在前文[源碼解析] Flink的Groupby和reduce究竟做了什麼中,我們剖析了Group和reduce都做了些什麼,也對combine有了一些了解。但是總感覺意猶未盡,因為在Flink還提出了若干新算子,比如GroupReduce和GroupCombine。這幾個算子不搞定,總覺得如鯁在喉,但沒有找到一個良好的例子來進行剖析說明。

本文是筆者在探究Flink SQL UDF問題的一個副產品。起初是為了調試一段sql代碼,結果發現Flink本身給出了一個GroupReduce和GroupCombine使用的完美例子。於是就拿出來和大家共享,一起分析看看究竟如何使用這兩個算子。

請注意:這個例子是Flink SQL,所以本文中將涉及Flink SQL goup by內部實現的知識。

0x02 概念

Flink官方對於這兩個算子的使用說明如下:

2.1 GroupReduce

GroupReduce算子應用在一個已經分組了的DataSet上,其會對每個分組都調用到用戶定義的group-reduce函數。它與Reduce的區別在於用戶定義的函數會立即獲得整個組。

Flink將在組的所有元素上使用Iterable調用用戶自定義函數,並且可以返回任意數量的結果元素。

2.2 GroupCombine

GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入類型 I 組合到任意輸出類型O。與此相對的是,GroupReduce中的組合步驟僅允許從輸入類型 I 到輸出類型 I 的組合。這是因為GroupReduceFunction的 “reduce步驟” 期望自己的輸入類型為 I。

在一些應用中,我們期望在執行附加變換(例如,減小數據大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。

注意:分組數據集上的GroupCombine在內存中使用貪婪策略執行,該策略可能不會一次處理所有數據,而是以多個步驟處理。它也可以在各個分區上執行,而無需像GroupReduce轉換那樣進行數據交換。這可能會導致輸出的是部分結果,所以GroupCombine是不能替代GroupReduce操作的,儘管它們的操作內容可能看起來都一樣。

2.3 例子

是不是有點暈?還是直接讓代碼來說話吧。以下官方示例演示了如何將CombineGroup和GroupReduce轉換用於WordCount實現。即通過combine操作先對單詞數目進行初步排序,然後通過reduceGroup對combine產生的結果進行最終排序。因為combine進行了初步排序,所以在算子之間傳輸的數據量就少多了

DataSet<String> input = [..] // The words received as input

// 這裏通過combine操作先對單詞數目進行初步排序,其優勢在於用戶定義的combine函數只調用一次,因為runtime已經把輸入數據一次性都提供給了自定義函數。  
DataSet<Tuple2<String, Integer>> combinedWords = input
  .groupBy(0) // group identical words
  .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {

    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
        String key = null;
        int count = 0;

        for (String word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

// 這裏對combine的結果進行第二次排序,其優勢在於用戶定義的reduce函數只調用一次,因為runtime已經把輸入數據一次性都提供給了自定義函數。  
DataSet<Tuple2<String, Integer>> output = combinedWords
  .groupBy(0)                              // group by words again
  .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange

    public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
        String key = null;
        int count = 0;

        for (Tuple2<String, Integer> word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

看到這裏,有的兄弟已經明白了,這和mapPartition很類似啊,都是runtime做了大量工作。為了讓大家這兩個算子的使用情形有深刻的認識,我們再通過一個sql的例子,向大家展示Flink內部是怎麼應用這兩個算子的,也能看出來他們的強大之處

0x03 代碼

下面代碼主要參考自 flink 使用問題匯總。我們可以看到這裏通過groupby進行了聚合操作。其中collect方法,類似於mysql的group_concat。

public class UdfExample {
    public static class MapToString extends ScalarFunction {

        public String eval(Map<String, Integer> map) {
            if(map==null || map.size()==0) {
                return "";
            }
            StringBuffer sb=new StringBuffer();
            for(Map.Entry<String, Integer> entity : map.entrySet()) {
                sb.append(entity.getKey()+",");
            }
            String result=sb.toString();
            return result.substring(0, result.length()-1);
        }
    }

    public static void main(String[] args) throws Exception {
        MemSourceBatchOp src = new MemSourceBatchOp(new Object[][]{
                new Object[]{"1", "a", 1L},
                new Object[]{"2", "b33", 2L},
                new Object[]{"2", "CCC", 2L},
                new Object[]{"2", "xyz", 2L},
                new Object[]{"1", "u", 1L}
        }, new String[]{"f0", "f1", "f2"});

        BatchTableEnvironment environment = MLEnvironmentFactory.getDefault().getBatchTableEnvironment();
        Table table = environment.fromDataSet(src.getDataSet());
        environment.registerTable("myTable", table);
        BatchOperator.registerFunction("MapToString",  new MapToString());
        BatchOperator.sqlQuery("select f0, mapToString(collect(f1)) as type from myTable group by f0").print();
    }
}

程序輸出是

f0|type
--|----
1|a,u
2|CCC,b33,xyz

0x04 Flink SQL內部翻譯

這個SQL語句的重點是group by。這個是程序猿經常使用的操作。但是大家有沒有想過這個group by在真實運行起來時候是怎麼操作的呢?針對大數據環境有沒有做了什麼優化呢?其實,Flink正是使用了GroupReduce和GroupCombine來實現並且優化了group by的功能。優化之處在於:

  • GroupReduce和GroupCombine的函數調用次數要遠低於正常的reduce算子,如果reduce操作中涉及到頻繁創建額外的對象或者外部資源操作,則會相當省時間。
  • 因為combine進行了初步排序,所以在算子之間傳輸的數據量就少多了。

SQL生成Flink的過程十分錯綜複雜,所以我們只能找最關鍵處。其是在 DataSetAggregate.translateToPlan 完成的。我們可以看到,對於SQL語句 “select f0, mapToString(collect(f1)) as type from myTable group by f0”,Flink系統把它翻譯成如下階段,即

  • pre-aggregation :排序 + combine;
  • final aggregation :排序 + reduce;

從之前的文章我們可以知道,groupBy這個其實不是一個算子,它只是排序過程中的一個輔助步驟而已,所以我們重點還是要看combineGroup和reduceGroup。這恰恰是我們想要的完美例子。

input ----> (groupBy + combineGroup) ----> (groupBy + reduceGroup) ----> output

SQL生成的Scala代碼如下,其中 combineGroup在後續中將生成GroupCombineOperator,reduceGroup將生成GroupReduceOperator。

  override def translateToPlan(
      tableEnv: BatchTableEnvImpl,
      queryConfig: BatchQueryConfig): DataSet[Row] = {

    if (grouping.length > 0) {
      // grouped aggregation
      ...... 
      if (preAgg.isDefined) { // 我們的例子是在這裏
        inputDS          
          // pre-aggregation
          .groupBy(grouping: _*)
          .combineGroup(preAgg.get) // 將生成GroupCombineOperator算子
          .returns(preAggType.get)
          .name(aggOpName)
          // final aggregation
          .groupBy(grouping.indices: _*) //將生成GroupReduceOperator算子。
          .reduceGroup(finalAgg.right.get)
          .returns(rowTypeInfo)
          .name(aggOpName)
      } else {
        ......
      }
    }
    else {
      ......
    }
  }
}

// 程序變量打印如下
this = {DataSetAggregate@5207} "Aggregate(groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))"
 cluster = {RelOptCluster@5220} 

0x05 JobGraph

LocalExecutor.execute中會生成JobGraph。JobGraph是提交給 JobManager 的數據結構,是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。

在生成JobGraph時候,系統得到如下JobVertex。

jobGraph = {JobGraph@5652} "JobGraph(jobId: 6aae8b5e5ad32f588136bef26f8b65f6)"
 taskVertices = {LinkedHashMap@5655}  size = 4

{JobVertexID@5677} "c625209bb7fb9a098807551840aeaa99" -> {InputOutputFormatVertex@5678} "CHAIN DataSource (at initializeDataSource(MemSourceBatchOp.java:98) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (select: (f0, f1)) (org.apache.flink.runtime.operators.DataSourceTask)"

{JobVertexID@5679} "b56ace4acd7a2f69ea110a9f262ff80a" -> {JobVertex@5680} "CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map (Map at linkFrom(MapBatchOp.java:35)) (org.apache.flink.runtime.operators.BatchTask)"
 
{JobVertexID@5681} "3f5e2a0f700421d80ce85e02a6d9db73" -> {InputOutputFormatVertex@5682} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
 
{JobVertexID@5683} "ad29dc5b2e0a39ad2cd1d164b6f859f7" -> {JobVertex@5684} "GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) (org.apache.flink.runtime.operators.BatchTask)"

我們可以看到,在JobGraph中就生成了對應的兩個算子。其中這裏的FlatMap就是用戶的UDF函數MapToString的映射生成。

GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))  
  
CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map 

0x06 Runtime

最後,讓我們看看runtime會如何處理這兩個算子。

6.1 ChainedFlatMapDriver

首先,Flink會在ChainedFlatMapDriver.collect中對record進行處理,這是從Table中提取數據所必須經歷的,與後續的group by關係不大。

@Override
public void collect(IT record) {
   try {
      this.numRecordsIn.inc();
      this.mapper.flatMap(record, this.outputCollector);
   } catch (Exception ex) {
      throw new ExceptionInChainedStubException(this.taskName, ex);
   }
}

// 這裡能夠看出來,我們獲取了第一列記錄
record = {Row@9317} "1,a,1"
 fields = {Object[3]@9330} 
this.taskName = "FlatMap (select: (f0, f1))"

// 程序堆棧打印如下
collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

6.2 GroupReduceCombineDriver

其次,GroupReduceCombineDriver.run()中會進行combine操作。

  1. 會通過this.sorter.write(value)把數據寫到排序緩衝區。
  2. 會通過sortAndCombineAndRetryWrite(value)進行實際的排序,合併。

因為是系統實現,所以Combine的用戶自定義函數就是由Table API提供的,比如org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate

@Override
public void run() throws Exception {
   final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
   final TypeSerializer<IN> serializer = this.serializer;

   if (objectReuseEnabled) {
    .....
   }
   else {
      IN value;
      while (running && (value = in.next()) != null) {
         // try writing to the sorter first
         if (this.sorter.write(value)) {
            continue;
         }

         // do the actual sorting, combining, and data writing
         sortAndCombineAndRetryWrite(value);
      }
   }

   // sort, combine, and send the final batch
   if (running) {
      sortAndCombine();
   }
}

// 程序變量如下
value = {Row@9494} "1,a"
 fields = {Object[2]@9503} 

sortAndCombine是具體排序/合併的過程。

  1. 排序是通過 org.apache.flink.runtime.operators.sort.QuickSort 完成的。
  2. 合併是通過 org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate 完成的。
  3. 給下游是由 org.apache.flink.table.runtime.aggregate.DataSetPreAggFunction.combine 調用 out.collect(output) 完成的。
private void sortAndCombine() throws Exception {
   final InMemorySorter<IN> sorter = this.sorter;
   // 這裏進行實際的排序
   this.sortAlgo.sort(sorter);
   final GroupCombineFunction<IN, OUT> combiner = this.combiner;
   final Collector<OUT> output = this.output;

   // iterate over key groups
   if (objectReuseEnabled) {
			......		
   } else {
      final NonReusingKeyGroupedIterator<IN> keyIter = 
            new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
      // 這裡是歸併操作
      while (this.running && keyIter.nextKey()) {
         // combiner.combiner 是用戶定義操作,runtime把某key對應的數據一次性傳給它
         combiner.combine(keyIter.getValues(), output);
      }
   }
}

具體調用棧如下:

accumulate:57, CollectAggFunction (org.apache.flink.table.functions.aggfunctions)
accumulate:-1, DataSetAggregatePrepareMapHelper$5
combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)
sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

6.3 GroupReduceDriver & ChainedFlatMapDriver

這兩個放在一起,是因為他們組成了Operator Chain。

GroupReduceDriver.run中完成了reduce。具體reduce 操作是在 org.apache.flink.table.runtime.aggregate.DataSetFinalAggFunction.reduce 完成的,然後在其中直接發送給下游 out.collect(output)

@Override
public void run() throws Exception {
   // cache references on the stack
   final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
 
   if (objectReuseEnabled) {
       ......	
   }
   else {
      final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.comparator);
      // run stub implementation
      while (this.running && iter.nextKey()) {
         // stub.reduce 是用戶定義操作,runtime把某key對應的數據一次性傳給它
         stub.reduce(iter.getValues(), output);
      }
   }
}

從前文我們可以,這裏已經配置成了Operator Chain,所以out.collect(output)會調用到CountingCollector。CountingCollector的成員變量collector已經配置成了ChainedFlatMapDriver。

public void collect(OUT record) {
   this.numRecordsOut.inc();
   this.collector.collect(record);
}

this.collector = {ChainedFlatMapDriver@9643} 
 mapper = {FlatMapRunner@9610} 
 config = {TaskConfig@9655} 
 taskName = "FlatMap (select: (f0, mapToString($f1) AS type))"

於是程序就調用到了 ChainedFlatMapDriver.collect

public void collect(IT record) {
   try {
      this.numRecordsIn.inc();
      this.mapper.flatMap(record, this.outputCollector);
   } catch (Exception ex) {
      throw new ExceptionInChainedStubException(this.taskName, ex);
   }
}

最終調用棧如如下:

eval:21, UdfExample$MapToString (com.alibaba.alink)
flatMap:-1, DataSetCalcRule$14
flatMap:52, FlatMapRunner (org.apache.flink.table.runtime)
flatMap:31, FlatMapRunner (org.apache.flink.table.runtime)
collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
reduce:80, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)
run:131, GroupReduceDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x07 總結

由此我們可以看到:

  • GroupReduce,GroupCombine和mapPartition十分類似,都是從系統層面對算子進行優化,把循環操作放到用戶自定義函數來處理。
  • 對於group by這個SQL語句,Flink將其翻譯成 GroupReduce + GroupCombine,採用兩階段優化的方式來完成了對大數據下的處理。

0x08 參考

flink 使用問題匯總

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

漲姿勢了解一下Kafka消費位移可好?

摘要:Kafka中的位移是個極其重要的概念,因為數據一致性、準確性是一個很重要的語義,我們都不希望消息重複消費或者丟失。而位移就是控制消費進度的大佬。本文就詳細聊聊kafka消費位移的那些事,包括:

概念剖析

kafka的兩種位移

關於位移(Offset),其實在kafka的世界里有兩種位移:

  • 分區位移:生產者向分區寫入消息,每條消息在分區中的位置信息由一個叫offset的數據來表徵。假設一個生產者向一個空分區寫入了 10 條消息,那麼這 10 條消息的位移依次是 0、1、…、9;

  • 消費位移:消費者需要記錄消費進度,即消費到了哪個分區的哪個位置上,這是消費者位移(Consumer Offset)。

注意,這和上面所說的消息在分區上的位移完全不是一個概念。上面的“位移”表徵的是分區內的消息位置,它是不變的,即一旦消息被成功寫入到一個分區上,它的位移值就是固定的了。而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器。

消費位移

消費位移,記錄的是 Consumer 要消費的下一條消息的位移,切記,是下一條消息的位移! 而不是目前最新消費消息的位移

假設一個分區中有 10 條消息,位移分別是 0 到 9。某個 Consumer 應用已消費了 5 條消息,這就說明該 Consumer 消費了位移為 0 到 4 的 5 條消息,此時 Consumer 的位移是 5,指向了下一條消息的位移。

至於為什麼要有消費位移,很好理解,當 Consumer 發生故障重啟之後,就能夠從 Kafka 中讀取之前提交的位移值,然後從相應的位移處繼續消費,從而避免整個消費過程重來一遍。就好像書籤一樣,需要書籤你才可以快速找到你上次讀書的位置。

那麼了解了位移是什麼以及它的重要性,我們自然而然會有一個疑問,kafka是怎麼記錄、怎麼保存、怎麼管理位移的呢?

位移的提交

Consumer 需要上報自己的位移數據,這個彙報過程被稱為位移提交。因為 Consumer 能夠同時消費多個分區的數據,所以位移的提交實際上是在分區粒度上進行的,即Consumer 需要為分配給它的每個分區提交各自的位移數據。

鑒於位移提交甚至是位移管理對 Consumer 端的巨大影響,KafkaConsumer API提供了多種提交位移的方法,每一種都有各自的用途,這些都是本文將要談到的方案。

void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

先粗略的總結一下。位移提交分為自動提交和手動提交;而手動提交又分為同步提交和異步提交。

自動提交

當消費配置enable.auto.commit=true的時候代表自動提交位移。

自動提交位移是發生在什麼時候呢?auto.commit.interval.ms默認值是50000ms。即kafka每隔5s會幫你自動提交一次位移。自動位移提交的動作是在 poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那麼就會提交上一次輪詢的位移。假如消費數據量特別大,可以設置的短一點。

越簡單的東西功能越不足,自動提交位移省事的同時肯定會帶來一些問題。自動提交帶來重複消費和消息丟失的問題:

  • 重複消費: 在默認情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之後的 3 秒發生了 Rebalance 操作。在 Rebalance 之後,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的所有數據都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這麼做只能縮小重複消費的時間窗口,不可能完全消除它。這是自動提交機制的一個缺陷。

  • 消息丟失: 假設拉取了100條消息,正在處理第50條消息的時候,到達了自動提交窗口期,自動提交線程將拉取到的每個分區的最大消息位移進行提交,如果此時消費服務掛掉,消息並未處理結束,但卻提交了最大位移,下次重啟就從100條那消費,即發生了50-100條的消息丟失。

手動提交

當消費配置enable.auto.commit=false的時候代表手動提交位移。用戶必須在適當的時機(一般是處理完業務邏輯后),手動的調用相關api方法提交位移。比如在下面的案例中,我需要確認我的業務邏輯返回true之後再手動提交位移

 while (true) {
     try {
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
         if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                 KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                 // 處理業務
                 boolean handleResult = handle(kafkaMessage);
                 if (handleResult) {
                     log.info(" handle success, kafkaMessage={}" ,kafkaMessage);
                 } else {
                     log.info(" handle failed, kafkaMessage={}" ,kafkaMessage);
                 }
             }
             // 手動提交offset
             consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        
         } 
     } catch (Exception e) {
         log.info("kafka consume error." ,e);
     }
 }

手動提交明顯能解決消息丟失的問題,因為你是處理完業務邏輯后再提交的,假如此時消費服務掛掉,消息並未處理結束,那麼重啟的時候還會重新消費。

但是對於業務層面的失敗導致消息未消費成功,是無法處理的。因為業務層的邏輯千變萬化、比如格式不正確,你叫Kafka消費端程序怎麼去處理?應該要業務層面自己處理,記錄失敗日誌做好監控等。

但是手動提交不能解決消息重複的問題,也很好理解,假如消費0-100條消息,50條時掛了,重啟後由於沒有提交這一批消息的offset,是會從0開始重新消費。至於如何避免重複消費的問題,在這篇文章有說。

手動提交又分為異步提交和同步提交。

同步提交

上面案例代碼使用的是commitSync() ,顧名思義,是同步提交位移的方法。同步提交位移Consumer 程序會處於阻塞狀態,等待 Broker 返回提交結果。同步模式下提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束。在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS。當然,你可以選擇拉長提交間隔,但這樣做的後果是 Consumer 的提交頻率下降,在下次 Consumer 重啟回來后,會有更多的消息被重新消費。因此,為了解決這些不足,kafka還提供了異步提交方法。

異步提交

異步提交會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由於它是異步的,Kafka 提供了回調函數,供你實現提交之後的邏輯,比如記錄日誌或處理異常等。下面這段代碼展示了調用 commitAsync() 的方法

 consumer.commitAsync((offsets, exception) -> {
 if (exception != null)
     handleException(exception);
 });

但是異步提交會有一個問題,那就是它沒有重試機制,不過一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。所以消息也是不會丟失和重複消費的。
但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。因此,組合使用commitAsync()commitSync()是最佳的方式。

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                boolean handleResult = handle(kafkaMessage);             
             }
             //異步提交位移               
             consumer.commitAsync((offsets, exception) -> {
             if (exception != null)
                 handleException(exception);
             });
           
        }
    }
} catch (Exception e) {
    System.out.println("kafka consumer error:" + e.toString());
} finally {
    try {
        //最後同步提交位移
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

讓位移提交更加靈活和可控

如果細心的閱讀了上面所有demo的代碼,那麼你會發現這樣幾個問題:

1、所有的提交,都是提交 poll 方法返回的所有消息的位移,poll 方法一次返回1000 條消息,則一次性地將這 1000 條消息的位移一併提交。可這樣一旦中間出現問題,位移沒有提交,下次會重新消費已經處理成功的數據。所以我想做到細粒度控制,比如每次提交100條,該怎麼辦?

答:可以通過commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)對位移進行精確控制。

2、poll和commit方法對於普通的開發人員而言是一個黑盒,無法精確地掌控其消費的具體位置。我都不知道這次的提交,是針對哪個partition,提交上去的offset是多少。

答:可以通過record.topic()獲取topic信息, record.partition()獲取分區信息,record.offset() + 1獲取消費位移,記住消費位移是指示下一條消費的位移,所以要加一。

3、我想自己管理offset怎麼辦?一方面更加保險,一方面下次重啟之後可以精準的從數據庫讀取最後的offset就不存在丟失和重複消費了。
答:可以將消費位移保存在數據庫中。消費端程序使用comsumer.seek方法指定從某個位移開始消費。

綜合以上幾個可優化點,並結合全文,可以給出一個比較完美且完整的demo:聯合異步提交和同步提交,對處理過程中所有的異常都進行了處理。細粒度的控制了消費位移的提交,並且保守的將消費位移記錄到了數據庫中,重新啟動消費端程序的時候會從數據庫讀取位移。這也是我們消費端程序位移提交的最佳實踐方案。你只要繼承這個抽象類,實現你具體的業務邏輯就可以了。

public abstract class PrefectCosumer {
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    public final void consume() {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", getGroupId());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(getTopics());
        consumer.poll(0);
        // 把offset記錄到數據庫中 從指定的offset處消費 
        consumer.partitionsFor(getTopics()).stream().map(info ->
        new TopicPartition(getTopics(), info.partition()))
        .forEach(tp -> {
               consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
         });
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> record : consumerRecords) {

                        KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                        boolean handleResult = handle(kafkaMessage);
                        if (handleResult) {
                            //注意:提交的是下一條消息的位移。所以OffsetAndMetadata 對象時,必須使用當前消息位移加 1。
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));

                            // 細粒度控制提交 每10條提交一次offset
                            if (count % 10 == 0) {
                                // 異步提交offset
                                consumer.commitAsync(offsets, (offsets, exception) -> {
                                    if (exception != null) {
                                        handleException(exception);
                                    }
                                    // 將消費位移再記錄一份到數據庫中
                                    offsets.forEach((k, v) -> {
                                        String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                " on duplicate key update offset=values(offset);";
                                        JdbcUtils.insertTable(s);
                                    });


                                });
                            }
                            count++;
                        } else {         
                            System.out.println("消費消息失敗 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());                         
                        }
                    }


                }
            }
        } catch (Exception e) {
            System.out.println("kafka consumer error:" + e.toString());
        } finally {
            try {
                // 最後一次提交 使用同步提交offset
                consumer.commitSync();
            } finally {
                consumer.close();
            }


        }
    }


    /**
     * 具體的業務邏輯
     *
     * @param kafkaMessage
     * @return
     */
    public abstract boolean handle(KafkaMessage kafkaMessage);

    public abstract List<String> getTopics();

    public abstract String getGroupId();

    void handleException(Exception e) {
        //異常處理
    }
}

控制位移提交的N種方式

剛剛我們說自己控制位移,使用seek方法可以指定offset消費。那到底怎麼控制位移?怎麼重設消費組位移?seek是什麼?現在就來仔細說說。

並不是所有的消息隊列都可以重設消費者組位移達到重新消費的目的。比如傳統的RabbitMq,它們處理消息是一次性的,即一旦消息被成功消費,就會被刪除。而Kafka消費消息是可以重演的,因為它是基於日誌結構(log-based)的消息引擎,消費者在消費消息時,僅僅是從磁盤文件上讀取數據而已,所以消費者不會刪除消息數據。同時,由於位移數據是由消費者控制的,因此它能夠很容易地修改位移的值,實現重複消費歷史數據的功能。

了解如何重設位移是很重要的。假設這麼一個場景,我已經消費了1000條消息后,我發現處理邏輯錯了,所以我需要重新消費一下,可是位移已經提交了,我到底該怎麼重新消費這1000條呢??假設我想從某個時間點開始消費,我又該如何處理呢?

首先說個誤區:auto.offset.reset=earliest/latest這個參數大家都很熟悉,但是初學者很容易誤會它。大部分朋友都覺得在任何情況下把這兩個值設置為earliest或者latest ,消費者就可以從最早或者最新的offset開始消費,但實際上並不是那麼回事,他們生效都有一個前提條件,那就是對於同一個groupid的消費者,如果這個topic某個分區有已經提交的offset,那麼無論是把auto.offset.reset=earliest還是latest,都將失效,消費者會從已經提交的offset開始消費。因此這個參數並不能解決用戶想重設消費位移的需求。

kafka有七種控制消費組消費offset的策略,主要分為位移維度和時間維度,包括:

  • 位移維度。這是指根據位移值來重設。也就是說,直接把消費者的位移值重設成我們給定的位移值。包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略

  • 時間維度。我們可以給定一個時間,讓消費者把位移調整成大於該時間的最小位移;也可以給出一段時間間隔,比如 30 分鐘前,然後讓消費者直接將位移調回 30 分鐘之前的位移值。包括DateTime和Duration策略

說完了重設策略,我們就來看一下具體應該如何實現,可以從兩個角度,API方式和命令行方式。

重設位移的方法之API方式

API方式只要記住用seek方法就可以了,包括seek,seekToBeginning 和 seekToEnd。

void seek(TopicPartition partition, long offset);    
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
void seekToBeginning(Collection<TopicPartition> partitions);    
void seekToEnd(Collection<TopicPartition> partitions);    

從方法簽名我們可以看出seekToBeginningseekToEnd是可以一次性重設n個分區的位移,而seek 只允許重設指定分區的位移,即為每個分區都單獨設置位移,因為不難得出,如果要自定義每個分區的位移值則用seek,如果希望kafka幫你批量重設所有分區位移,比如從最新數據消費或者從最早數據消費,那麼用seekToEnd和seekToBeginning。

Earliest 策略:從最早的數據開始消費

從主題當前最早位移處開始消費,這個最早位移不一定就是 0 ,因為很久遠的消息會被 Kafka 自動刪除,主要取決於你的刪除配置。

代碼如下:

Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", getGroupId());
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
   new TopicPartition(getTopics(), partitionInfo.partition()))
   .collect(Collectors.toList()));

首先是構造consumer對象,這樣我們可以通過partitionsFor獲取到分區的信息,然後我們就可以構造出TopicPartition集合,傳給seekToBegining方法。需要注意的一個地方是:需要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))

在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,之後它才會發起fetch請求去獲取數據。而poll(Duration)會把元數據獲取也計入整個超時時間。由於本例中使用的是0,即瞬時超時,因此consumer根本無法在這麼短的時間內連接上coordinator,所以只能趕在超時前返回一個空集合。

Latest策略:從最新的數據開始消費

    consumer.seekToEnd(
        consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
            new TopicPartition(getTopics().get(0), partitionInfo.partition()))
              .collect(Collectors.toList()));

Current策略:從當前已經提交的offset處消費

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
        new TopicPartition(getTopics().get(0), info.partition()))
        .forEach(tp -> {
            long committedOffset = consumer.committed(tp).offset();
            consumer.seek(tp, committedOffset);
        });

**Special-offset策略:從指定的offset處消費 **

該策略使用的方法和current策略一樣,區別在於,current策略是直接從kafka元信息中讀取中已經提交的offset值,而special策略需要用戶自己為每一個分區指定offset值,我們一般是把offset記錄到數據庫中然後可以從數據庫去讀取這個值

    consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
                new TopicPartition(getTopics().get(0), info.partition()))
                .forEach(tp -> {
                    try {
                        consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                });

以上演示了用API方式重設位移,演示了四種常見策略的代碼,另外三種沒有演示,一方面是大同小異,另一方面在實際生產中,用API的方式不太可能去做時間維度的重設,而基本都是用命令行方式。

重設位移的方法之命令行方式

命令行方式重設位移是通過 kafka-consumer-groups 腳本。比起 API 的方式,用命令行重設位移要簡單得多。

Earliest 策略指定–to-earliest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest 策略指定–to-latest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Current 策略指定–to-current。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略指定–to-offset。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

Shift-By-N 策略指定–shift-by N。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

DateTime 策略指定–to-datetime。

DateTime 允許你指定一個時間,然後將位移重置到該時間之後的最早位移處。常見的使用場景是,你想重新消費昨天的數據,那麼你可以使用該策略重設位移到昨天 0 點。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略指定–by-duration。
Duration 策略則是指給定相對的時間間隔,然後將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生。它就是一個符合 ISO-8601 規範的 Duration 格式,以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。舉個例子,如果你想將位移調回到 15 分鐘前,那麼你就可以指定 PT0H15M0S

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

提交的位移都去哪了?

通過上面那幾部分的內容,我們已經搞懂了位移提交的方方面面,那麼提交的位移它保存在哪裡呢?這就要去位移主題的的世界里一探究竟了。kafka把位移保存在一個叫做__consumer_offsets的內部主題中,叫做位移主題。

注意:老版本的kafka其實是把位移保存在zookeeper中的,但是zookeeper並不適合這種高頻寫的場景。所以新版本已經是改進了這個方案,直接保存到kafka。畢竟kafka本身就適合高頻寫的場景,並且kafka也可以保證高可用性和高持久性。

既然它也是主題,那麼離不開分區和副本這兩個機制。我們並沒有手動創建這個主題並且指定,所以是kafka自動創建的, 分區的數量取決於Broker 端參數 offsets.topic.num.partitions,默認是50個分區,而副本參數取決於offsets.topic.replication.factor,默認是3。

既然也是主題,肯定會有消息,那麼消息格式是什麼呢?參考前面我們手動設計將位移寫入數據庫的方案,我們保存了topic,group_id,partition,offset四個字段。topic,group_id,partition無疑是數據表中的聯合主鍵,而offset是不斷更新的。無疑kafka的位移主題消息也是類似這種設計。key也是那三個字段,而消息體其實很複雜,你可以先簡單理解為就是offset。

既然也是主題,肯定也會有刪除策略,否則消息會無限膨脹。但是位移主題的刪除策略和其他主題刪除策略又不太一樣。我們知道普通主題的刪除是可以通過配置刪除時間或者大小的。而位移主題的刪除,叫做 Compaction。Kafka 使用Compact 策略來刪除位移主題中的過期消息,對於同一個 Key 的兩條消息 M1 和 M2,如果 M1 的發送時間早於 M2,那麼 M1 就是過期消息。Compact 的過程就是掃描日誌的所有消息,剔除那些過期的消息,然後把剩下的消息整理在一起。

Kafka 提供了專門的後台線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數據。這個後台線程叫 Log Cleaner。很多實際生產環境中都出現過位移主題無限膨脹佔用過多磁盤空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀態,通常都是這個線程掛掉了導致的。

總結

kafka的位移是個極其重要的概念,控制着消費進度,也即控制着消費的準確性,完整性,為了保證消息不重複和不丟失。我們最好做到以下幾點:

  • 手動提交位移。

  • 手動提交有異步提交和同步提交兩種方式,既然兩者有利也有弊,那麼我們可以結合起來使用。

  • 細粒度的控制消費位移的提交,這樣可以避免重複消費的問題。

  • 保守的將消費位移再記錄到了數據庫中,重新啟動消費端程序的時候從數據庫讀取位移。

獲取Kafka全套原創學習資料及思維導圖,關注【胖滾豬學編程】公眾號,回復”kafka”。

本文來源於公眾號:【胖滾豬學編程】。一枚集顏值與才華於一身,不算聰明卻足夠努力的女程序媛。用漫畫形式讓編程so easy and interesting!求關注!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

分類
發燒車訊

VSCode + WSL 2 + Ruby環境搭建詳解

vscode配置ruby開發環境

vscode近年來發展迅速,幾乎在3年之間就搶佔了原來vim、sublime text的很多份額,猶記得在2015-2016年的時候,ruby推薦的開發環境基本上都是vim和sublime text,然而,隨着vscode的發展,vscode下ruby的開發體驗已經非常不錯。現在基本上使用win 10 wsl2 + vscode + windows terminal的體驗已經不遜於mac + vim (sublime) + item 2的體驗了

總體步驟

使用win10專業版配置ruby開發環境大致分為以下幾步:

  1. 開啟win10 wsl功能
  2. 升級wsl2
  3. 安裝ubuntu
  4. 安裝ruby(rvm)
  5. 安裝vscode
  6. 安裝vscode wsl擴展
  7. 安裝vscode ruby相關擴展

經過以上7步就可以開始愉悅的ruby開發了,再開始之前,可以先看個效果圖。

1. 開啟win10 wsl功能

ruby對Linux和Mac比較友好,在windows下很多第三方庫要配合mingw或msys2才能安裝,不過好在windows 10提供了Linux子系統,在win10 2004版本中wsl也升級到了wsl2,速度更快,功能更完善。

要使用wsl2需要先在控制面板中開啟wsl功能:

  • 適用於Linux的Windows子系統
  • 虛擬機平台

2. 升級wsl2

目前wsl2還需要安裝一個內核升級包,具體可參考微軟說明:

  • wsl2安裝說明
  • wsl2 update包

更新包安裝完成后,輸入命令

wsl --set-default-version 2

3. 安裝Ubuntu

在微軟應用商店安裝Ubuntu,當前Ubuntu版本為20.04 LTS

安裝完成以後,配置Ubuntu默認為wsl2

# 查看
wsl --list --verbose

# 設置
wsl --set-version Ubuntu 2

4. 安裝ruby

在Linux下安裝ruby有多種方法,比較主流的方法是RVM,不過為了簡單起見,我直接通過ubuntu的apt工具進行了安裝。

關於RVM的安裝可參考如下網站:

  • RVM官網
  • RVM實用指南

通過APT安裝,輸入下列命令即可

sudo apt install ruby ruby-dev ri ruby-bundle

安裝完成以後需要配置gem國內鏡像,參考如下網址:

  • gem中文鏡像

輸入下列命令

# 設置gem source
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/

# 查看gem source
gem sources -l

# 設置bundle
bundle config mirror.https://rubygems.org https://gems.ruby-china.com

5. 安裝vscode

vscode直接在官網下載安裝即可,這裏我選擇了System Installer

  • vscode官網下載頁面

6. 安裝vscode wsl擴展

vscode安裝完成以後,可以在plugin中找到Remote – WSL擴展,點擊安裝即可

7. 安裝vscode ruby相關擴展

直接在plugin中搜索ruby在wsl中安裝下列五個擴展即可

  • Peng Lv/Ruby
  • Castwide/Ruby Solargraph(Language Server)
  • misogi/ruby-rubocop(Lint)
  • Simple Ruby ERB
  • endwise

其中,ruby solargraphrubocop除了安裝擴展,還需要通過gem安裝第三方包

sudo gem install rubocop
sudo gem install solargraph

重新加載vscode-wsl就可以愉快的使用ruby language進行開發了

vscode使用

在使用上基本只要require了相應的庫,就solargraph就會對require的庫中涉及的類和模塊進行提示,非常方便。唯一有問題的地方就是require的時候沒有提示,這可能就需要自己記一下庫的名稱,不過相比於原來已經好太多了,應該說在可以接受的範圍內。

1. 如果安裝了新的第三方庫會提示嗎?

如果安裝了sinatra這樣的庫,vscode-ruby如何給出提示呢?只需要Ctrl + Shift + P,選擇solargraph: build new gem documention即可

2. rubocop如何使用?

rubocop是一個Ruby Lint工具,可以進行Ruby代碼風格檢查,並能夠自動修復,只需要Ctrl + Shift + P,選擇Ruby: autocorrect by rubocop即可

3. 常用類型註釋

ruby是動態強類型語言,由於不需要指定函數返回值類型,這導致IDE無法自動推斷一些變量的類型。目前Python、PHP、TypeScript都在不斷的強化類型以方便IDE進行靜態檢查。IDE只有在知道類型的情況下才能準確地進行智能提示。

在ruby 2當中,我們可以通過類型註釋的方式增強IDE推斷能力。常見的類型註釋可參考YARD項目

下面代碼給出了一些示例。

require 'socket'

server = TCPServer.new 2000
loop do
  # 代碼塊參數類型註釋
  # @param {TCPSocket} client
  Thread.start(server.accept) do |client|
    client.puts 'hello !'
    client.puts "Time is #{Time.now}"
    client.close
  end
end

server = TCPServer.new 2000
loop do
  # 變量註釋
  # @type {TCPSocket} client
  client = server.accept
end

# 函數參數和返回值註釋,數組類型
# @param {Array(Integer)} nums
# @param {Integer} target
# @return {Array(Integer)}
def two_sum(nums, target)
  hash_nums = {}
  result = []
  nums.each_with_index do |num, index|
    hash_nums[num] = index
  end

  nums.each_with_index do |num, index|
    another = target - num
    if hash_nums[another] && hash_nums[another] != index
      result.push(index, hash_nums[another])
      break
    end
  end

  result
end

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

分類
發燒車訊

SpringMVC-攔截器

概述

Java 里的攔截器是動態攔截 action 調用的對象。

可以在Controller 中的方法執行之前與執行之後,及頁面显示完畢后,執行指定的方法,自定義的攔截器必須實現HandlerInterceptor 接口。

方法介紹

preHandle

在業務處理器處理請求之前被調用

postHandle

在業務處理器處理完請求后

afterCompletion

在 DispatcherServlet 完全處理完請求后被調用

SpringMVC攔截器使用

攔截所有請求

創建一個類實現 HandlerInterceptor 接口

 

 配置文件當中添加攔截器

 

 內部源碼分析

 

 

 

 

攔截指定請求

 

 

配置多個攔截器執行順序

 

 

 

 

 

 

 

第 2 個返回 false

 

 

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

帶你學夠浪:Go語言基礎系列 – 8分鐘學複合類型

文章每周持續更新,原創不易,「三連」讓更多人看到是對我最大的肯定。可以微信搜索公眾號「 後端技術學堂 」第一時間閱讀(一般比博客早更新一到兩篇)

對於一般的語言使用者來說 ,20% 的語言特性就能夠滿足 80% 的使用需求,剩下在使用中掌握。基於這一理論,Go 基礎系列的文章不會刻意追求面面俱到,但該有知識點都會覆蓋,目的是帶你快跑趕上 Golang 這趟新車。

Hurry up , Let’s go !

前面我們學習過 Golang 中基礎數據類型,比如內置類型 int string bool 等,其實還有一些複雜一點點,但很好用的複合類型,類似 C 中的數組和 struct、C++ 中的 map ,今天我們就來學習 Go 中的複合類型。

通過本文的學習你將掌握以下知識:

  • 結構體
  • 指針類型
  • 數組和切片
  • 映射類型

指針

指針不保存實際數據的內容,而是保存了指向值的內存地址 。用 & 對變量取內存地址,用 * 來訪問指向的內存。這點和 C 中的指針是一樣,唯一不同的是 Go 中的指針不能運算。

 a := 3
 pa := &a // 用 `&` 對變量取內存地址
 fmt.Println("point", a, *pa) // 用 `*` 來訪問指向的內存

只聲明沒賦值的指針值是 nil ,代表空指針。

 var a0 *int // 只聲明沒賦值的指針是nil
 if a0 == nil {
  fmt.Println("point", "it is nil point")
 }

結構體

與C中的結構體類似, 結構體是一種聚合的數據類型,是由零個或多個任意類型的值聚合成的實體。每個值稱為結構體的成員,看例子:

type Test struct {
  a int
  b int
 }

語法上的不同看到了嗎? 每個結構體字段之後沒有分號,沒有分號寫起來還是很舒服的。

初始化

可以在定義的時候初始化

test := Test{1, 2}  // 定義結構體變量並初始化

初始化部分結構體字段

t2  = Test{a: 3}   //指定賦值Test.a為3 Test.b隱式賦值0

隱式初始化

t3  = Test{}       // .a .b都隱式賦值0

多個變量可以分組一起賦值

var (
    t1  = Test{8, 6}
    t2  = Test{a: 3}  //指定賦值Test.a Test.b隱式賦值0
    t3  = Test{}      // .a .b都隱式賦值0
    pt4 = &Test{8, 6} // 指針
)

訪問成員

通過 . 運算來訪問結構體成員,不區分結構體類型或是結構體指針類型。

fmt.Println("struct", st0.a, st0.b) // 通過 . 運算來訪問結構體成員

對於只聲明沒賦值的結構體,其內部變量被賦予零值,下面我們聲明了 st0 但沒有對其賦值。

var st0 Test  
fmt.Println("struct", st0.a, st0.b) //輸出:struct 0 0

數組

數組是一個由固定長度的特定類型元素組成的序列,一個數組可以由零個或多個元素組成。 數組可以用下標訪問元素,下標從 0 開始。

數組聲明后賦值

 var strarr [2]string // 數組聲明語法
 strarr[0] = "ready"
 strarr[1] = "go"

聲明賦值同時完成

 intarr := [5]int{6, 8, 9, 10, 7} // 聲明賦值同時完成

對於確定初始值個數的數組,可以省略數組長度

 intarr := [...]int{6, 8, 9, 10, 7} // 聲明賦值同時完成

Slice 切片

切片是變長的序列,序列中每個元素都有相同的類型。slice 語法和數組很像,只是沒有固定長度而已,切片底層引用一個數組對象,修改切片會修改原數組。

通過切片可以訪問數組的部分或全部元素,正因為切片長度不是固定的,因此切片比數組更加的常用。

聲明與初始化

常規初始化

簡短聲明並初始化切片

s0 := []int{1, 2, 3, 4, 5, 6} // 簡短聲明加賦值

聲明后再初始化

var s []int        // 聲明切片s
s = s0     // 用切片s0初始化切片s

聲明並初始化切片

var s00 []int = s0 // 用切片s0初始化切片s

切片的零值是 nil

// 切片的零值是nil 空切片長度和容量都是0
var nilslice []int
if nilslice == nil {
    fmt.Println("slice", "nilslice is nil ", len(nilslice), cap(nilslice))
}

make初始化

除了上述的常規初始化方法,還可以用 make 內置函數來創建切片

// 內建函數make創建切片,指定切片長度和容量
// make 函數會分配一個元素為零值的數組並返回一個引用了它的切片
s2 := make([]int, 4, 6) //創建元素都是0的切片s2, 長度為4,容量為6 第三個參數可以省略
fmt.Println("slice", len(s2), cap(s2), s2)

切片長度

長度表示切片中元素的數目,可用內置函數 len 函數得到。

切片容量

容量表示切片中第一個元素到引用的底層數組結尾所包含元素個數,可用內置函數 cap 求得。

切片區間

切片區間遵循「左閉右開」原則,

s0 := [5]int{6, 8, 9, 10, 7} // 數組定義
var slice []int = intarr[1:4]    // 創建切片slice 包含數組子序列

默認上下界。切片下界的默認值為 0,上界默認是該切片的長度。

fmt.Println("slice", s0[:], s0[0:], s0[:5], s0[0:5]) // 這四個切片相同

切片append操作

append 函數用於在切片末尾追加新元素。

添加元素也分兩種情況。

添加之後長度還在原切片容量範圍內

s2 := make([]int, 4, 6) //創建元素都是0的切片s2, 長度為4,容量為6 第三個參數可以省略
s22 := append(s2, 2)    // append每次都是在最後添加,所以此時,s21 s22指向同一個底層數組
fmt.Println(s21, s22)   // [0 0 0 0 2] [0 0 0 0 2]

添加元素之後長度超出原切片容量

此時會分配新的數組空間,並返回指向這個新分配的數組的切片。

下面例子中 s24 切片已經指向新分配的數組,s22 依然指向的是原來的數組空間,而 s24 已經指向了新的底層數組。

 s24 := append(s2, 1, 2, 3)
 fmt.Println(s24, s22) // s24 [0 0 0 0 1 2 3] [0 0 0 0 2]

二維切片

可以定義切片的切片,類似其他語言中的二維數組用法。參考代碼:

 s3 := [][]int{
  {1, 1, 1},
  {2, 2, 2},
 }
 fmt.Println(s3, s3[0], len(s3), cap(s3)) // 輸出: [[1 1 1] [2 2 2]] [1 1 1] 2 2

map 映射類型

在 Go 中 map 是鍵值對類型,代表 keyvalue 的映射關係,一個map就是一個哈希表的引用 。

定義和初始化

下面這樣定義並初始化一個 map 變量

 m0 := map[int]string{
  0: "0",
  1: "1",
 }

也可以用內置 make 函數來初始化一個 map 變量,後續再向其中添加鍵值對。像下面這樣:

 m1 := make(map[int]string) // make 函數會返回給定類型的映射,並將其初始化備用
 if m1 != nil {
  fmt.Println("map", "m1 is not nil", m1) // m1 不是nil
 }
 m1[0] = "1"
 m1[1] = "2"

注意:只聲明不初始化的map變量是 nil 映射,不能直接拿來用!

 var m map[int]string // 未初始化的m零值是nil映射
 if m == nil {
  fmt.Println("map", "m is nil", m)
 }
 //m[0] = "1" // 這句引發panic異常, 映射的零值為 nil 。nil映射既沒有鍵,也不能添加鍵。

元素讀取

使用語法:vaule= m[key] 獲取鍵 key 對應的元素 vaule 。

上面我們只用了一個變量來獲取元素,其實這個操作會返回兩個值,第一個返回值代表讀書的元素,第二個返回值是代表鍵是否存在的 bool 類型,舉例說明:

 v, st := m1[0]  // v是元素值,下標對應的元素存在st=true 否則st=false
 _, st1 := m1[0] // _ 符號表示忽略第一個元素
 v1, _ := m1[0]  // _ 符號表示忽略第二個元素 
 fmt.Println(v, st, v1, st1, m1[2]) // m1[2]不存在,返回元素string的零值「空字符」

刪除元素

內置函數 delete 可以刪除 map 元素,舉例:

delete(m1, 1)  // 刪除鍵是 1 的元素

range 遍歷

range 用於遍歷 切片 或 映射。

數組或切片遍歷

當使用for 循環和 range 遍曆數組或切片時,每次迭代都會返回兩個值。第一個值為當前元素的下標,第二個值為該下標所對應元素的一份副本。

s1 := []int{1, 2, 3, 4, 5, 6}  
for key, vaule := range s1 {
    fmt.Println("range", key, vaule)
}

for key := range s1 { // 只需要索引,忽略第二個變量即可
    fmt.Println("range", key)
}

for _, vaule := range s1 { // 只需要元素值,用'_'忽略索引
    fmt.Println("range", vaule)
}

map 遍歷

當使用for 循環和 range 遍歷map 時,每次迭代都會返回兩個值。第一個值為當前元素 key , 第二個值是 value

m0 := map[int]string{
    0: "0",
    1: "1",
}
fmt.Println("map", m0)

for k, v := range m0 { // range遍歷映射,返回key 和 vaule
    fmt.Println("map", "m0 key:", k, "vaule:", v)
}

總結

通過本文的學習,我們掌握了 Golang 中基本的控制流語句,利用這些控制語句加上一節介紹的變量等基礎知識,可以構成豐富的程序邏輯,你就能用 Golang 來做一些有意思的事情了。

感謝各位的閱讀,文章的目的是分享對知識的理解,技術類文章我都會反覆求證以求最大程度保證準確性,若文中出現明顯紕漏也歡迎指出,我們一起在探討中學習.

今天的技術分享就到這裏,我們下期再見。

創作不易,白票不是好習慣,如果在我這有收穫,動動手指「點贊」「關注」是對我持續創作的最大支持。

可以微信搜索公眾號「 後端技術學堂 」回復「資料」「1024」有我給你準備的各種編程學習資料。文章每周持續更新,我們下期見!

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

新能源汽車需求井噴 助推鋰材料超預期大漲

據中國汽車工業協會最新發佈數據,今年前6個月我國新能源汽車產量達到7.6萬輛,這一產量同比增幅達到2.5倍。   然而,新能源整車產量快速增長的同時,配套動力電池的產量卻出現缺口。“現在動力電池基本上只要能造出來,銷售出去的問題不大。”乘用車市場資訊聯席會秘書長崔東樹向記者表示,這不僅大大制約了新能源汽車產能的釋放,同時也影響了動力電池技術的進步,“大家都忙著造,很難有人沉下心來做研發。”   實際上,據專家介紹,新能源汽車電池在生產上的技術門檻並不高,這直接導致的是動力電池產能處於快速擴張當中。然而,大批量技術含量較低電池企業的投產,則可能讓國內電池產能由短缺轉向過剩。業內人士預計,隨著產能的快速實現,電池產業可能將在2016年下半年迎來洗牌。  
研發上與日韓有較大差距   “國內電池企業在自動化和研發能力上都與日韓企業有較大差距。”華霆動力技術有限公司的一位負責人向記者介紹,目前日韓企業在生產成本和技術上都整體領先於國內動力電池企業。   一位電池技術專家告訴記者,現階段國內動力電池企業的生產成本大約是2元每瓦時,按照容量為25千瓦時的動力電池計算,成本大約在5萬元左右。   這樣的成本明顯高於LG、三星等韓國動力電池生產企業。據介紹,韓企的成本已降至1.8元每瓦時以下,這意味著同樣是25千瓦時的動力電池,其成本將會低於4.5萬元。   不僅如此,國內電池企業的能量密度也低於日韓企業。上述電池技術專家介紹,國內較好的動力電池模組的能量密度在130瓦時每千克,而松下等日本企業生產動力電池模組的能量密度則能超過200瓦時每千克,LG、三星等韓國企業所生產動力電池也能達到180瓦時每千克左右。   這意味著,國內電池企業生產容量25千瓦時的電池重量將超過190千克,而同樣容量的電池,韓企生產出來的重量為140千克左右,部分日企則能達到125千克。這對於新能源整車的輕量化影響不小。   “目前,在動力電池領域,松下領先LG和三星12~18個月,而LG和三星則領先國內企業12~18個月。”國內某動力電池企業的負責人向記者坦言,“國內電池企業的自動化程度不高,研發和製造水準都趕不上。”   乘聯會資料顯示,國內新能源整車企業除比亞迪擁有自己的配套電池廠外,大多數都通過外採的方式解決電池問題。  
明年底行業恐面臨洗牌   “現在國內電池企業的狀態普遍很浮躁。”上述電池企業負責人向記者表示,由於新能源車企對配套電池的需求持續旺盛,電池企業對產能投入的熱情已大於對研發和技術的追求。 隨著近年來新能源汽車產銷量高速增長,汽車電池產量的缺口也逐步展現出來。這激發了配套電池企業的投產熱情。   一位動力電池公司負責人向記者介紹,僅LG、三星、力神和CATL四家動力電池企業,明年將投產的產能就高達10吉瓦時以上,而每吉瓦時的電池產能可以滿足大約4萬輛新能源汽車的需要,也就是說,僅上述四家動力電池企業的產能就可以滿足40萬輛新能源車的裝配需要。“動力電池的技術門檻並不高。”一位充電設施企業的負責人告訴記者,目前動力電池的核心技術已相對成熟,因此企業實現投產並不難,這造成很多實力並不強大的電池企業紛紛上專案。   不過,“國內主流的12家新能源整車企業的採購,基本上都來自5家主要的動力電池廠家。”一位電池企業負責人向記者表示,隨著具備技術優勢的大電池企業產能的跟上,在技術和成本上都不具備優勢的小企業將很難生存,因此他預測“2016年底電池企業將面臨洗牌”。   乘聯會的資料顯示,目前電池企業CATL主要供應北汽、廣汽、長安和宇通等新能源車企;天津力神主要供應江淮、康迪、廣汽和宇通等;國軒高科則供應康迪、江淮、金龍、安凱和申沃等車企;萬向億能則供應上汽、奇瑞、廣汽、青年等車企。   文章來源:中財網

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

分類
發燒車訊

EnergyTrend微信新服務“太陽能價格評析” 立即關注!

EnergyTrend微信(簡中版)
只要在微信關注我們

EnergyTrend專業的分析師將直接告訴您
最即時的太陽能市場價格微評論

讓您一機在手隨時隨地輕鬆掌握太陽能市場最新動態

[加入關注]
1.微信→朋友→新增朋友→掃描QR Code→將右側QR Code置於方框內→關注
2.微信→朋友→新增朋友→查找官方帳戶→輸入”集邦新能源網”→關注

 

 

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

分類
發燒車訊

中國積極推動無補貼綠能專案,太陽能市場有望回穩

雖然中國政府在 2018 年中旬推出 531 新政,讓該國投資去年太陽能總投資下降 53%,重挫當地太陽能投資與建設發展,但該國政府目前已推出無補貼再生能源計畫,或許有望重振中國太陽能市場,彭博能源財經(BNEF)推測 2019 年中國太陽能新增裝置量仍可達到 34-44GW。

中國過去一直以來對當地太陽能產業發展相當優待,提供優渥的補貼金額與固定電價價格以鼓勵太陽能等再生能源發展,大量企業開始投資太陽能產業,形成一股靠補貼攀升的太陽能熱潮,造成產能過剩與補貼缺口過大,據統計,截至 2017 年底,再生能源補貼缺口已達 1,000 億人民幣(約新台幣 4,600 億元)。

因此中國政府在 2018 年中無預警推出新政策,大幅限制電廠建設與補助,為中國高速發展的太陽能產業踩下煞車踏板,未來也將採取嚴控指標方式,並積極鼓勵低價補貼或是無補貼專案。

無補貼專案優惠多

就好比中國在去年 8 月推出首項無補貼太陽能示範專案規劃,每省限定 300-500MW,並在 10 月開始申請、預計在 2019 年 3 月前後開工拚年底前併網發電。日前該政府也為了促進再生能源無補貼發展公布 12 項全新計畫,像是要做好風電、光電發電量檢測,不能在電力供過於求等預警紅色地區推行專案,廠商也要保證將來可以全額併網發電與不浪費。

中國此次將「無補貼專案」定義為無國家補貼、先導計畫不限規模、不佔用補貼指標的計畫,因此在政策方面也有釋出許多善意,要求地方政府對無補貼太陽能與風電在土地利用、成本上給予支持之餘,政府也會為綠色證書市場化交易指出明燈,未來無補貼或是低價補貼專案可以透過中國綠色電力證書交易獲益,與此同時也要求電力公司讓無補貼專案優先發電和全額收購電量,並鼓勵金融機構支持無補貼專案。

另外中國政府也將執行固定電價(FIT)政策,無補貼、低價補貼風電與太陽能專案可簽定 20 年以上的購電合約,提高電價的長期穩定性,也不要會求參與跨區電力市場化交易。

中國國家發改委表示,這些專案獲得核准後就能在 2020 年底前開始施工,但沒有在限定時間完工的專案將會被取消,為其他無補貼專案挪出空間。並明確指出,從現在到 2020 年底前獲準的專案都可採用這項政策,政府則會在 2020 年後依據技術與成本擬定新的政策。

只不過中國目前也有不少地方無法規劃無補貼專案,對此中國政府也表示,推動低價專案並非立即取消所有補貼,若無法達到無補貼或低價補貼仍可採競爭性配置專案政策,並希望這些專案可透過競爭降低電價以減少政府補貼。

BNEF 分析師 Jonathan Luan 表示,這些政策代表著未來中國將朝無補貼再生能源邁進,並有機會促成全新的太陽能專案,公司則對中國的太陽能市場樂觀看待,預測 2019 年新增裝置量可達 34-44GW。

(合作媒體:。首圖來源: CC BY 2.0)

延伸閱讀:

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

2019 太陽能五大趨勢:市場走向穩定與分散,度電成本將成為供應鏈價格依歸

2018 年可說是太陽能產業近年來波動最大的一年,歷經美國 201、301 條款,中國 531 新政,印度防衛性關稅,歐盟 MIP 結束等變動,從最上游的供應鏈到最下游的系統端都呈現極不穩定的狀態。由 EnergyTrend 所盤整的 2019 年五大趨勢來看,市況將會好轉,且產業也將在持續的變動中逐漸成熟。

趨勢一:2018 年低谷不低,2019 需求再創新高

中國的「531 新政」雖對市場造成衝擊,但因海外市場的需求走強,加上中國市場所受衝擊輕於預期,使 2018 年出現「低谷不低」的現象,預期全年新增併網量可達到 103GW(實際出貨量約95GW),年增4.9%。

展望 2019 年,在政策鼓勵與供應鏈價格持續下降的推波助瀾下,全球需求預計將繼續正成長,其中又以歐洲的成長幅度最大,最多可超過五成。2019 年預期新增併網量將來到 111.3GW,出現 7.7% 的成長,再次創下歷史新高。

趨勢二:市場持續分散,2019 年 GW 級市場增至 15 

全球市場規模自 2018 年起預計會持穩在 100-120GW 之間,各年度需求量變化幅度將低於 10%。而根據 EnergyTrend 的最新需求報告統計,GW 級市場從 2016  年的 6  個成長到 2019 年將有 15 個,可見市場持續分散化的趨勢。

2016-2020 年 GW 級市場

中國、美國將持續穩居全球前二大市場,印度則從 2017 年起成為第三大需求國,日本次之。東南亞、北非、中東、拉丁美洲等新興市場自 2018 年崛起,如中東地區 2018 年全年需求預計將較 2017 年增加近 100%,2019  年還將增加 50% 左右。全球市場規模自 2019 年起將趨於穩定,印度最有可能出現較大幅度的需求成長。

2016-2023 年全球市場需求趨勢

趨勢三:供應鏈上游更為集中,單晶將逆轉市佔

雖然供應鏈整體在 2018 年陷於供過於求、低利潤的困境,但技術和成本優勢較強、全球布局較廣的一線大廠仍保有強勁的營運動能,既有的擴產計畫多能持續進行,使供應鏈廠家有持續集中化的現象。根據 EnergyTrend  的供給資料庫,中國前五大多晶矽廠的新產能預計在 2Q19 陸續開出,屆時前五大廠的產能將佔全球近 70%,且現金成本更具競爭力。在矽晶圓環節,則將呈現隆基與中環雙龍頭主宰市場的現象,單晶供應鏈也將因而變得更具主導性,有機會拉升全年單晶佔比來到 6 成,2017 年底展開的單多晶之戰逐漸落幕。

趨勢四:雙面產品產能倍增,P-PERC 效率還有成長空間

雙面電池技術已十分成熟,且可在幾乎不增加額外成本的前提下創造額外的發電收益,因此產能比例持續上升,預計 2019 年雙面電池的總產能將接近 40GW,且以雙面單晶 PERC 電池產能增加最多。另一方面,單晶 PERC 電池的量產效率仍有成長空間。據 EnergyTrend 調查,單晶 PERC 電池的平均量產效率在 2019 年上半年即可站上 22%,且還可導入更多技術,在 2019 年底效率可望上看 23%。而單晶 PERC 的強勢也壓縮了次世代 N 型技術的發展空間,2019 年 N 型產能預期僅會有小幅增加。

雙面電池產能成長趨勢(Unit: GW)

趨勢五:均化度電成本成為模組價格降價指標

供應鏈價格持續下探,使太陽能逐步朝擺脫補貼、平價上網的方向邁進;而無補貼系統的普及程度及其實際的均化度電成本(LCOE)將成為未來供應鏈的價格指標。

太陽能產業在 2018 年面臨強大考驗,但同時也進入產業盤整階段,預期長期發展將趨於穩定化與健康化,供應鏈的價格將以整體系統的度電成本為依歸。儲能系統與智慧電網技術的投入,將成為太陽能產業進一步市場化的關鍵。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化