分類
發燒車訊

Java I/O體系從原理到應用,這一篇全說清楚了

本文介紹操作系統I/O工作原理,Java I/O設計,基本使用,開源項目中實現高性能I/O常見方法和實現,徹底搞懂高性能I/O之道

基礎概念

在介紹I/O原理之前,先重溫幾個基礎概念:

  • (1) 操作系統與內核

操作系統:管理計算機硬件與軟件資源的系統軟件
內核:操作系統的核心軟件,負責管理系統的進程、內存、設備驅動程序、文件和網絡系統等等,為應用程序提供對計算機硬件的安全訪問服務

  • 2 內核空間和用戶空間

為了避免用戶進程直接操作內核,保證內核安全,操作系統將內存尋址空間劃分為兩部分:
內核空間(Kernel-space),供內核程序使用
用戶空間(User-space),供用戶進程使用
為了安全,內核空間和用戶空間是隔離的,即使用戶的程序崩潰了,內核也不受影響

  • 3 數據流

計算機中的數據是基於隨着時間變換高低電壓信號傳輸的,這些數據信號連續不斷,有着固定的傳輸方向,類似水管中水的流動,因此抽象數據流(I/O流)的概念:指一組有順序的、有起點和終點的字節集合

抽象出數據流的作用:實現程序邏輯與底層硬件解耦,通過引入數據流作為程序與硬件設備之間的抽象層,面向通用的數據流輸入輸出接口編程,而不是具體硬件特性,程序和底層硬件可以獨立靈活替換和擴展

I/O 工作原理

1 磁盤I/O

典型I/O讀寫磁盤工作原理如下:

tips: DMA:全稱叫直接內存存取(Direct Memory Access),是一種允許外圍設備(硬件子系統)直接訪問系統主內存的機制。基於 DMA 訪問方式,系統主內存與硬件設備的數據傳輸可以省去CPU 的全程調度

值得注意的是:

  • 讀寫操作基於系統調用實現
  • 讀寫操作經過用戶緩衝區,內核緩衝區,應用進程並不能直接操作磁盤
  • 應用進程讀操作時需阻塞直到讀取到數據

2 網絡I/O

這裏先以最經典的阻塞式I/O模型介紹:

tips:recvfrom,經socket接收數據的函數

值得注意的是:

  • 網絡I/O讀寫操作經過用戶緩衝區,Sokcet緩衝區
  • 服務端線程在從調用recvfrom開始到它返回有數據報準備好這段時間是阻塞的,recvfrom返回成功后,線程開始處理數據報

Java I/O設計

1 I/O分類

Java中對數據流進行具體化和實現,關於Java數據流一般關注以下幾個點:

  • (1) 流的方向
    從外部到程序,稱為輸入流;從程序到外部,稱為輸出流

  • (2) 流的數據單位
    程序以字節作為最小讀寫數據單元,稱為字節流,以字符作為最小讀寫數據單元,稱為字符流

  • (3) 流的功能角色

從/向一個特定的IO設備(如磁盤,網絡)或者存儲對象(如內存數組)讀/寫數據的流,稱為節點流
對一個已有流進行連接和封裝,通過封裝后的流來實現數據的讀/寫功能,稱為處理流(或稱為過濾流);

2 I/O操作接口

java.io包下有一堆I/O操作類,初學時看了容易搞不懂,其實仔細觀察其中還是有規律:
這些I/O操作類都是在繼承4個基本抽象流的基礎上,要麼是節點流,要麼是處理流

2.1 四個基本抽象流

java.io包中包含了流式I/O所需要的所有類,java.io包中有四個基本抽象流,分別處理字節流和字符流:

  • InputStream
  • OutputStream
  • Reader
  • Writer

2.2 節點流

節點流I/O類名由節點流類型 + 抽象流類型組成,常見節點類型有:

  • File文件
  • Piped 進程內線程通信管道
  • ByteArray / CharArray (字節數組 / 字符數組)
  • StringBuffer / String (字符串緩衝區 / 字符串)

節點流的創建通常是在構造函數傳入數據源,例如:

FileReader reader = new FileReader(new File("file.txt"));
FileWriter writer = new FileWriter(new File("file.txt"));

2.3 處理流

處理流I/O類名由對已有流封裝的功能 + 抽象流類型組成,常見功能有:

  • 緩衝:對節點流讀寫的數據提供了緩衝的功能,數據可以基於緩衝批量讀寫,提高效率。常見有BufferedInputStream、BufferedOutputStream
  • 字節流轉換為字符流:由InputStreamReader、OutputStreamWriter實現
  • 字節流與基本類型數據相互轉換:這裏基本數據類型數據如int、long、short,由DataInputStream、DataOutputStream實現
  • 字節流與對象實例相互轉換:用於實現對象序列化,由ObjectInputStream、ObjectOutputStream實現

處理流的應用了適配器/裝飾模式,轉換/擴展已有流,處理流的創建通常是在構造函數傳入已有的節點流或處理流:

FileOutputStream fileOutputStream = new FileOutputStream("file.txt");
// 擴展提供緩衝寫
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
 // 擴展提供提供基本數據類型寫
DataOutputStream out = new DataOutputStream(bufferedOutputStream);

3 Java NIO

3.1 標準I/O存在問題

Java NIO(New I/O)是一個可以替代標準Java I/O API的IO API(從Java 1.4開始),Java NIO提供了與標準I/O不同的I/O工作方式,目的是為了解決標準 I/O存在的以下問題:

  • (1) 數據多次拷貝

標準I/O處理,完成一次完整的數據讀寫,至少需要從底層硬件讀到內核空間,再讀到用戶文件,又從用戶空間寫入內核空間,再寫入底層硬件

此外,底層通過write、read等函數進行I/O系統調用時,需要傳入數據所在緩衝區起始地址和長度
由於JVM GC的存在,導致對象在堆中的位置往往會發生移動,移動後傳入系統函數的地址參數就不是真正的緩衝區地址了

可能導致讀寫出錯,為了解決上面的問題,使用標準I/O進行系統調用時,還會額外導致一次數據拷貝:把數據從JVM的堆內拷貝到堆外的連續空間內存(堆外內存)

所以總共經歷6次數據拷貝,執行效率較低

  • (2) 操作阻塞

傳統的網絡I/O處理中,由於請求建立連接(connect),讀取網絡I/O數據(read),發送數據(send)等操作是線程阻塞的

// 等待連接
Socket socket = serverSocket.accept();

// 連接已建立,讀取請求消息
StringBuilder req = new StringBuilder();
byte[] recvByteBuf = new byte[1024];
int len;
while ((len = socket.getInputStream().read(recvByteBuf)) != -1) {
    req.append(new String(recvByteBuf, 0, len, StandardCharsets.UTF_8));
}

// 寫入返回消息
socket.getOutputStream().write(("server response msg".getBytes()));
socket.shutdownOutput();

以上面服務端程序為例,當請求連接已建立,讀取請求消息,服務端調用read方法時,客戶端數據可能還沒就緒(例如客戶端數據還在寫入中或者傳輸中),線程需要在read方法阻塞等待直到數據就緒

為了實現服務端併發響應,每個連接需要獨立的線程單獨處理,當併發請求量大時為了維護連接,內存、線程切換開銷過大

3.2 Buffer

Java NIO核心三大核心組件是Buffer(緩衝區)、Channel(通道)、Selector

Buffer提供了常用於I/O操作的字節緩衝區,常見的緩存區有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分別對應基本數據類型: byte, char, double, float, int, long, short,下面介紹主要以最常用的ByteBuffer為例,Buffer底層支持Java堆內(HeapByteBuffer)或堆外內存(DirectByteBuffer)

堆外內存是指與堆內存相對應的,把內存對象分配在JVM堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機,相比堆內內存,I/O操作中使用堆外內存的優勢在於:

  • 不用被JVM GC線回收,減少GC線程資源佔有
  • 在I/O系統調用時,直接操作堆外內存,可以節省一次堆外內存和堆內內存的複製

ByteBuffer底層堆外內存的分配和釋放基於malloc和free函數,對外allocateDirect方法可以申請分配堆外內存,並返回繼承ByteBuffer類的DirectByteBuffer對象:

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

堆外內存的回收基於DirectByteBuffer的成員變量Cleaner類,提供clean方法可以用於主動回收,Netty中大部分堆外內存通過記錄定位Cleaner的存在,主動調用clean方法來回收;
另外,當DirectByteBuffer對象被GC時,關聯的堆外內存也會被回收

tips: JVM參數不建議設置-XX:+DisableExplicitGC,因為部分依賴Java NIO的框架(例如Netty)在內存異常耗盡時,會主動調用System.gc(),觸發Full GC,回收DirectByteBuffer對象,作為回收堆外內存的最後保障機制,設置該參數之後會導致在該情況下堆外內存得不到清理

堆外內存基於基礎ByteBuffer類的DirectByteBuffer類成員變量:Cleaner對象,這個Cleaner對象會在合適的時候執行unsafe.freeMemory(address),從而回收這塊堆外內存

Buffer可以見到理解為一組基本數據類型,存儲地址連續的的數組,支持讀寫操作,對應讀模式和寫模式,通過幾個變量來保存這個數據的當前位置狀態:capacity、 position、 limit:

  • capacity 緩衝區數組的總長度
  • position 下一個要操作的數據元素的位置
  • limit 緩衝區數組中不可操作的下一個元素的位置:limit <= capacity

3.3 Channel

Channel(通道)的概念可以類比I/O流對象,NIO中I/O操作主要基於Channel:
從Channel進行數據讀取 :創建一個緩衝區,然後請求Channel讀取數據
從Channel進行數據寫入 :創建一個緩衝區,填充數據,請求Channel寫入數據

Channel和流非常相似,主要有以下幾點區別:

  • Channel可以讀和寫,而標準I/O流是單向的
  • Channel可以異步讀寫,標準I/O流需要線程阻塞等待直到讀寫操作完成
  • Channel總是基於緩衝區Buffer讀寫

Java NIO中最重要的幾個Channel的實現:

  • FileChannel: 用於文件的數據讀寫,基於FileChannel提供的方法能減少讀寫文件數據拷貝次數,後面會介紹
  • DatagramChannel: 用於UDP的數據讀寫
  • SocketChannel: 用於TCP的數據讀寫,代表客戶端連接
  • ServerSocketChannel: 監聽TCP連接請求,每個請求會創建會一個SocketChannel,一般用於服務端

基於標準I/O中,我們第一步可能要像下面這樣獲取輸入流,按字節把磁盤上的數據讀取到程序中,再進行下一步操作,而在NIO編程中,需要先獲取Channel,再進行讀寫

FileInputStream fileInputStream = new FileInputStream("test.txt");
FileChannel channel = fileInputStream.channel();

tips: FileChannel僅能運行在阻塞模式下,文件異步處理的 I/O 是在JDK 1.7 才被加入的 java.nio.channels.AsynchronousFileChannel

// server socket channel:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 9091));

while (true) {
    SocketChannel socketChannel = serverSocketChannel.accept();
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    int readBytes = socketChannel.read(buffer);
    if (readBytes > 0) {
        // 從寫數據到buffer翻轉為從buffer讀數據
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String body = new String(bytes, StandardCharsets.UTF_8);
        System.out.println("server 收到:" + body);
    }
}

3.4 Selector

Selector(選擇器) ,它是Java NIO核心組件中的一個,用於檢查一個或多個NIO Channel(通道)的狀態是否處於可讀、可寫。實現單線程管理多個Channel,也就是可以管理多個網絡連接

Selector核心在於基於操作系統提供的I/O復用功能,單個線程可以同時監視多個連接描述符,一旦某個連接就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作,常見有select、poll、epoll等不同實現

Java NIO Selector基本工作原理如下:

  • (1) 初始化Selector對象,服務端ServerSocketChannel對象
  • (2) 向Selector註冊ServerSocketChannel的socket-accept事件
  • (3) 線程阻塞於selector.select(),當有客戶端請求服務端,線程退出阻塞
  • (4) 基於selector獲取所有就緒事件,此時先獲取到socket-accept事件,向Selector註冊客戶端SocketChannel的數據就緒可讀事件事件
  • (5) 線程再次阻塞於selector.select(),當有客戶端連接數據就緒,可讀
  • (6) 基於ByteBuffer讀取客戶端請求數據,然後寫入響應數據,關閉channel

示例如下,完整可運行代碼已經上傳github(https://github.com/caison/caison-blog-demo):

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9091));
// 配置通道為非阻塞模式
serverSocketChannel.configureBlocking(false);
// 註冊服務端的socket-accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    // selector.select()會一直阻塞,直到有channel相關操作就緒
    selector.select();
    // SelectionKey關聯的channel都有就緒事件
    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        // 服務端socket-accept
        if (key.isAcceptable()) {
            // 獲取客戶端連接的channel
            SocketChannel clientSocketChannel = serverSocketChannel.accept();
            // 設置為非阻塞模式
            clientSocketChannel.configureBlocking(false);
            // 註冊監聽該客戶端channel可讀事件,併為channel關聯新分配的buffer
            clientSocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
        }

        // channel可讀
        if (key.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer buf = (ByteBuffer) key.attachment();

            int bytesRead;
            StringBuilder reqMsg = new StringBuilder();
            while ((bytesRead = socketChannel.read(buf)) > 0) {
                // 從buf寫模式切換為讀模式
                buf.flip();
                int bufRemain = buf.remaining();
                byte[] bytes = new byte[bufRemain];
                buf.get(bytes, 0, bytesRead);
                // 這裏當數據包大於byteBuffer長度,有可能有粘包/拆包問題
                reqMsg.append(new String(bytes, StandardCharsets.UTF_8));
                buf.clear();
            }
            System.out.println("服務端收到報文:" + reqMsg.toString());
            if (bytesRead == -1) {
                byte[] bytes = "[這是服務回的報文的報文]".getBytes(StandardCharsets.UTF_8);

                int length;
                for (int offset = 0; offset < bytes.length; offset += length) {
                    length = Math.min(buf.capacity(), bytes.length - offset);
                    buf.clear();
                    buf.put(bytes, offset, length);
                    buf.flip();
                    socketChannel.write(buf);
                }
                socketChannel.close();
            }
        }
        // Selector不會自己從已selectedKeys中移除SelectionKey實例
        // 必須在處理完通道時自己移除 下次該channel變成就緒時,Selector會再次將其放入selectedKeys中
        keyIterator.remove();
    }
}

tips: Java NIO基於Selector實現高性能網絡I/O這塊使用起來比較繁瑣,使用不友好,一般業界使用基於Java NIO進行封裝優化,擴展豐富功能的Netty框架來優雅實現

高性能I/O優化

下面結合業界熱門開源項目介紹高性能I/O的優化

1 零拷貝

零拷貝(zero copy)技術,用於在數據讀寫中減少甚至完全避免不必要的CPU拷貝,減少內存帶寬的佔用,提高執行效率,零拷貝有幾種不同的實現原理,下面介紹常見開源項目中零拷貝實現

1.1 Kafka零拷貝

Kafka基於Linux 2.1內核提供,並在2.4 內核改進的的sendfile函數 + 硬件提供的DMA Gather Copy實現零拷貝,將文件通過socket傳送

函數通過一次系統調用完成了文件的傳送,減少了原來read/write方式的模式切換。同時減少了數據的copy, sendfile的詳細過程如下:

基本流程如下:

  • (1) 用戶進程發起sendfile系統調用
  • (2) 內核基於DMA Copy將文件數據從磁盤拷貝到內核緩衝區
  • (3) 內核將內核緩衝區中的文件描述信息(文件描述符,數據長度)拷貝到Socket緩衝區
  • (4) 內核基於Socket緩衝區中的文件描述信息和DMA硬件提供的Gather Copy功能將內核緩衝區數據複製到網卡
  • (5) 用戶進程sendfile系統調用完成並返回

相比傳統的I/O方式,sendfile + DMA Gather Copy方式實現的零拷貝,數據拷貝次數從4次降為2次,系統調用從2次降為1次,用戶進程上下文切換次數從4次變成2次DMA Copy,大大提高處理效率

Kafka底層基於java.nio包下的FileChannel的transferTo:

public abstract long transferTo(long position, long count, WritableByteChannel target)

transferTo將FileChannel關聯的文件發送到指定channel,當Comsumer消費數據,Kafka Server基於FileChannel將文件中的消息數據發送到SocketChannel

1.2 RocketMQ零拷貝

RocketMQ基於mmap + write的方式實現零拷貝:
mmap() 可以將內核中緩衝區的地址與用戶空間的緩衝區進行映射,實現數據共享,省去了將數據從內核緩衝區拷貝到用戶緩衝區

tmp_buf = mmap(file, len); 
write(socket, tmp_buf, len);

mmap + write 實現零拷貝的基本流程如下:

  • (1) 用戶進程向內核發起系統mmap調用
  • (2) 將用戶進程的內核空間的讀緩衝區與用戶空間的緩存區進行內存地址映射
  • (3) 內核基於DMA Copy將文件數據從磁盤複製到內核緩衝區
  • (4) 用戶進程mmap系統調用完成並返回
  • (5) 用戶進程向內核發起write系統調用
  • (6) 內核基於CPU Copy將數據從內核緩衝區拷貝到Socket緩衝區
  • (7) 內核基於DMA Copy將數據從Socket緩衝區拷貝到網卡
  • (8) 用戶進程write系統調用完成並返回

RocketMQ中消息基於mmap實現存儲和加載的邏輯寫在org.apache.rocketmq.store.MappedFile中,內部實現基於nio提供的java.nio.MappedByteBuffer,基於FileChannel的map方法得到mmap的緩衝區:

// 初始化
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

查詢CommitLog的消息時,基於mappedByteBuffer偏移量pos,數據大小size查詢:

public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
    int readPosition = getReadPosition();
    // ...各種安全校驗
    
    // 返回mappedByteBuffer視圖
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    byteBuffer.position(pos);
    ByteBuffer byteBufferNew = byteBuffer.slice();
    byteBufferNew.limit(size);
    return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}

tips: transientStorePoolEnable機制
Java NIO mmap的部分內存並不是常駐內存,可以被置換到交換內存(虛擬內存),RocketMQ為了提高消息發送的性能,引入了內存鎖定機制,即將最近需要操作的CommitLog文件映射到內存,並提供內存鎖定功能,確保這些文件始終存在內存中,該機制的控制參數就是transientStorePoolEnable

因此,MappedFile數據保存CommitLog刷盤有2種方式:

  • 1 開啟transientStorePoolEnable:寫入內存字節緩衝區(writeBuffer) -> 從內存字節緩衝區(writeBuffer)提交(commit)到文件通道(fileChannel) -> 文件通道(fileChannel) -> flush到磁盤
  • 2 未開啟transientStorePoolEnable:寫入映射文件字節緩衝區(mappedByteBuffer) -> 映射文件字節緩衝區(mappedByteBuffer) -> flush到磁盤

RocketMQ 基於 mmap+write 實現零拷貝,適用於業務級消息這種小塊文件的數據持久化和傳輸
Kafka 基於 sendfile 這種零拷貝方式,適用於系統日誌消息這種高吞吐量的大塊文件的數據持久化和傳輸

tips: Kafka 的索引文件使用的是 mmap+write 方式,數據文件發送網絡使用的是 sendfile 方式

1.3 Netty零拷貝

Netty 的零拷貝分為兩種:

  • 1 基於操作系統實現的零拷貝,底層基於FileChannel的transferTo方法
  • 2 基於Java 層操作優化,對數組緩存對象(ByteBuf )進行封裝優化,通過對ByteBuf數據建立數據視圖,支持ByteBuf 對象合併,切分,當底層僅保留一份數據存儲,減少不必要拷貝

2 多路復用

Netty中對Java NIO功能封裝優化之後,實現I/O多路復用代碼優雅了很多:

// 創建mainReactor
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
// 創建工作線程組
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap 
     // 組裝NioEventLoopGroup 
    .group(boosGroup, workerGroup)
     // 設置channel類型為NIO類型
    .channel(NioServerSocketChannel.class)
    // 設置連接配置參數
    .option(ChannelOption.SO_BACKLOG, 1024)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childOption(ChannelOption.TCP_NODELAY, true)
    // 配置入站、出站事件handler
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            // 配置入站、出站事件channel
            ch.pipeline().addLast(...);
            ch.pipeline().addLast(...);
        }
    });

// 綁定端口
int port = 8080;
serverBootstrap.bind(port).addListener(future -> {
    if (future.isSuccess()) {
        System.out.println(new Date() + ": 端口[" + port + "]綁定成功!");
    } else {
        System.err.println("端口[" + port + "]綁定失敗!");
    }
});

3 頁緩存(PageCache)

頁緩存(PageCache)是操作系統對文件的緩存,用來減少對磁盤的 I/O 操作,以頁為單位的,內容就是磁盤上的物理塊,頁緩存能幫助程序對文件進行順序讀寫的速度幾乎接近於內存的讀寫速度,主要原因就是由於OS使用PageCache機制對讀寫訪問操作進行了性能優化:

頁緩存讀取策略:當進程發起一個讀操作 (比如,進程發起一個 read() 系統調用),它首先會檢查需要的數據是否在頁緩存中:

  • 如果在,則放棄訪問磁盤,而直接從頁緩存中讀取
  • 如果不在,則內核調度塊 I/O 操作從磁盤去讀取數據,並讀入緊隨其後的少數幾個頁面(不少於一個頁面,通常是三個頁面),然後將數據放入頁緩存中

頁緩存寫策略:當進程發起write系統調用寫數據到文件中,先寫到頁緩存,然後方法返回。此時數據還沒有真正的保存到文件中去,Linux 僅僅將頁緩存中的這一頁數據標記為“臟”,並且被加入到臟頁鏈表中

然後,由flusher 回寫線程周期性將臟頁鏈表中的頁寫到磁盤,讓磁盤中的數據和內存中保持一致,最後清理“臟”標識。在以下三種情況下,臟頁會被寫回磁盤:

  • 空閑內存低於一個特定閾值
  • 臟頁在內存中駐留超過一個特定的閾值時
  • 當用戶進程調用 sync() 和 fsync() 系統調用時

RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue文件的讀性能幾乎接近讀內存,即使在有消息堆積情況下也不會影響性能,提供了2種消息刷盤策略:

  • 同步刷盤:在消息真正持久化至磁盤后RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應
  • 異步刷盤,能充分利用操作系統的PageCache的優勢,只要消息寫入PageCache即可將成功的ACK返回給Producer端。消息刷盤採用後台異步線程提交的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量

Kafka實現消息高性能讀寫也利用了頁緩存,這裏不再展開

參考

《深入理解Linux內核 —— Daniel P.Bovet》

更多精彩,歡迎關注公眾號 分佈式系統架構

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

分類
發燒車訊

python機器學習——隨機梯度下降

上一篇我們實現了使用梯度下降法的自適應線性神經元,這個方法會使用所有的訓練樣本來對權重向量進行更新,也可以稱之為批量梯度下降(batch gradient descent)。假設現在我們數據集中擁有大量的樣本,比如百萬條樣本,那麼如果我們現在使用批量梯度下降來訓練模型,每更新一次權重向量,我們都要使用百萬條樣本,訓練時間很長,效率很低,我們能不能找到一種方法,既能使用梯度下降法,但是又不要每次更新權重都要使用到所有的樣本,於是隨機梯度下降法(stochastic gradient descent)便被提出來了。

隨機梯度下降法可以只用一個訓練樣本來對權重向量進行更新:
\[ \eta(y^i-\phi(z^i))x^i \]
這種方法比批量梯度下降法收斂的更快,因為它可以更加頻繁的更新權重向量,並且使用當個樣本來更新權重,相比於使用全部的樣本來更新更具有隨機性,有助於算法避免陷入到局部最小值,使用這個方法的要注意在選取樣本進行更新時一定要隨機選取,每次迭代前都要打亂所有的樣本順序,保證訓練的隨機性,並且在訓練時的學習率也不是固定不變的,可以隨着迭代次數的增加,學習率逐漸減小,這種方法可以有助於算法收斂。

現在我們有了使用全部樣本的批量梯度下降法,也有了使用單個樣本的隨機梯度下降法,那麼一種折中的方法,稱為最小批學習(mini-batch learning),它每次使用一部分訓練樣本來更新權重向量。

接下來我們實現使用隨機梯度下降法的Adaline

from numpy.random import seed
class AdalineSGD(object):
    """ADAptive LInear NEuron classifier.

    Parameters
    ----------
    eta:float
        Learning rate(between 0.0 and 1.0
    n_iter:int
        Passes over the training dataset.

    Attributes
    ----------
    w_: 1d-array
        weights after fitting.
    errors_: list
        Number of miscalssifications in every epoch.
    shuffle:bool(default: True)
        Shuffle training data every epoch
        if True to prevent cycles.
    random_state: int(default: None)
        Set random state for shuffling
        and initalizing the weights.

    """

    def __init__(self, eta=0.01, n_iter=10, shuffle=True, random_state=None):
        self.eta = eta
        self.n_iter = n_iter
        self.w_initialized = False
        self.shuffle = shuffle
        if random_state:
            seed(random_state)

    def fit(self, X, y):
        """Fit training data.

        :param X:{array-like}, shape=[n_samples, n_features]
        :param y: array-like, shape=[n_samples]
        :return:
        self:object

        """

        self._initialize_weights(X.shape[1])
        self.cost_ = []

        for i in range(self.n_iter):
            if self.shuffle:
                X, y = self._shuffle(X, y)
            cost = []
            for xi, target in zip(X, y):
                cost.append(self._update_weights(xi, target))
            avg_cost = sum(cost)/len(y)
            self.cost_.append(avg_cost)
        return self
    
    def partial_fit(self, X, y):
        """Fit training data without reinitializing the weights."""
        if not self.w_initialized:
            self._initialize_weights(X.shape[1])
        if y.ravel().shape[0] > 1:
            for xi, target in zip(X, y):
                self._update_weights(xi, target)
        else:
            self._update_weights(X, y)
        return self
    
    def _shuffle(self, X, y):
        """Shuffle training data"""
        r = np.random.permutation(len(y))
        return X[r], y[r]
    
    def _initialize_weights(self, m):
        """Initialize weights to zeros"""
        self.w_ = np.zeros(1 + m)
        self.w_initialized = True
    
    def _update_weights(self, xi, target):
        """Apply Adaline learning rule to update the weights"""
        output = self.net_input(xi)
        error = (target - output)
        self.w_[1:] += self.eta * xi.dot(error)
        self.w_[0] += self.eta * error
        cost = 0.5 * error ** 2
        return cost
    
    def net_input(self, X):
        """Calculate net input"""
        return np.dot(X, self.w_[1:]) + self.w_[0]
    
    def activation(self, X):
        """Computer linear activation"""
        return self.net_input(X)
    
    def predict(self, X):
        """Return class label after unit step"""
        return np.where(self.activation(X) >= 0.0, 1, -1)

其中_shuffle方法中,調用numpy.random中的permutation函數得到0-100的一個隨機序列,然後這個序列作為特徵矩陣和類別向量的下標,就可以起到打亂樣本順序的功能。

現在開始訓練

ada = AdalineSGD(n_iter=15, eta=0.01, random_state=1)
ada.fit(X_std, y)

畫出分界圖和訓練曲線圖

plot_decision_region(X_std, y, classifier=ada)
plt.title('Adaline - Stochastic Gradient Desent')
plt.xlabel('sepal length [standardized]')
plt.ylabel('petal length [standardized]')
plt.legend(loc = 'upper left')
plt.show()
plt.plot(range(1, len(ada.cost_) + 1), ada.cost_, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Average Cost')
plt.show()

從上圖可以看出,平均損失下降很快,在大概第15次迭代后,分界線和使用批量梯度下降的Adaline分界線很類似。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

分類
發燒車訊

解鎖新夥伴關係 政府、企業與社會的氣候合作

文:李翊僑(荷蘭馬斯垂克大學永續科學、政策與社會碩士生)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

分類
發燒車訊

電動車商機蓬勃,兩岸投資兩樣情

全球電動車市場持續成長,直接刺激鋰電池與電動馬達等相關零組件的需求量。台灣、中國兩岸廠商積極針對電動車進行投資布局,但局勢因策略與市場的不同而有所差異。

中國投資額高,但潛藏不穩定性

中國是目前全球電動車市場成長速度最快的國家之一,但今年首季因補貼政策所造成的不確定性,使電動車銷量比去年同期暴跌九成。

不少中國地方政府將電動車與新能源車列為新興產業投資鼓勵對象,也吸引了大筆投資。據統計,2015年至今,中國大陸各地規劃或已展開建設的新能源汽車建設專案超過30個,總投資額超過人民幣1,000億元,呈現遍地開花的局勢。

但《經濟日報》引述中國媒體指出,上述專案多以「新能源汽車產業圈」的形式存在,看似投資新能源車,實際上是為了恢復原有的汽車產能。部分電動車生產企業甚至已進入停產狀態,未來若無法改善,可能會被強制退市。

雖然中國電動車相關投資積極,但卻有泡沫化的隱憂。

台電池廠投入國際電動車市場

相較之下,台灣長泓能源科技、宏境科技對電動車產業的佈局較為穩健,且也更為注重其他海外市場。

長泓能源科技與台灣車廠、客運業者合作,推動電動大巴專案,目標搶下台灣政府10年內投入1萬輛電動大巴的政策。同時,長泓已與特斯拉的移動式20kWh儲能系統、台塑汽車啟動電池等展開測試合作,積極步入電池組裝領域。

長泓能源科技董事長陳明德表示,公司強化氧化鋰鐵電池的產品性能與應用整合,安全性高,且有機會搶攻中國大陸電動車市場。

另一方面,宏境科技已取得美國電動沙灘巡邏車電池組的1,000輛訂單,同時與某汽車通路商合作,打入中國大陸NEXT EV油電混和車馬達控制器供應鏈,數量可望超過一萬輛。宏境科技也已取得中瑞電動車每年550輛的動力鋰電池組訂單,每年可貢獻約1億元新台幣營收。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

分類
發燒車訊

F-立凱、五龍動力聯手搶攻中國電動車市場

F-立凱公告將與五龍動力、香港上市公司五龍電動車集團簽訂三方交易契約,共同深化電動車產業的上、下游布局,著眼爭取中國乃至於全球的電動車市場。

根據協議,三方將以策略聯盟、資本合作的方式取得股權轉換方案。契約載明,五龍動力將以每股新台幣35元的價格取得立凱電新發行的普通股,佔增資後股本21.8%;思慕完成後,五龍電動車集團在另外以1億港幣(約新台幣4.2億)現金取得立凱電旗下車電事業部門──立凱綠能(蓋曼)股權,以及台灣立凱綠能的部分資產。交易完成後,五龍動力與五龍電動車集團預計將投資凱電超過新台幣20億元。

此外,立凱電同時將以每股0.5元港幣的價格認購五龍電動車集團新發行之430萬普通股與2.75億元港幣的無擔保可轉換公司債,總投資額預計將達港幣4.9億元,並成為五龍電動車集團股東。此舉將幫助立凱電正式邁入中國電動車市場。

F-立凱由尹衍樑投資,為電動巴士系統與磷酸鐵鋰電池正極材料廠商。與五龍動力、五龍電動車集團的交易完成後,將在兩岸三地與國際市場進行明確產業分工,由立凱電繼續投入正極材料研發、製造與銷售,同時為五龍動力提供LFP-NCO奈米金屬氧化物共晶體化磷酸鋰鐵電池正極材料M系列產品的生產與技術顧問服務。三方也將合作於中國建立材料廠,以因應未來中國電動車龐大的材料需求。

此外,台灣立凱綠能也將與五龍電動車在電池芯、電池模組以及電動車技術等領域合作,同時不忘繼續拓展台灣電動巴士業務。F-立凱表示,本次策略聯盟,將可幫助F-立凱、五龍動力、五龍電動車集團、立凱電整合技術、製造、市場、供應鏈與資金,全面進軍中國與全球的儲能市場和電動車市場。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

分類
發燒車訊

Java 8 Streams API 詳解

流式編程作為Java 8的亮點之一,是繼Java 5之後對集合的再一次升級,可以說Java 8幾大特性中,Streams API 是作為Java 函數式的主角來設計的,誇張的說,有了Streams API之後,萬物皆可一行代碼。

什麼是Stream

Stream被翻譯為流,它的工作過程像將一瓶水導入有很多過濾閥的管道一樣,水每經過一個過濾閥,便被操作一次,比如過濾,轉換等,最後管道的另外一頭有一個容器負責接收剩下的水。

示意圖如下:

首先通過source產生流,然後依次通過一些中間操作,比如過濾,轉換,限制等,最後結束對流的操作。

Stream也可以理解為一個更加高級的迭代器,主要的作用便是遍歷其中每一個元素。

為什麼需要Stream

Stream作為Java 8的一大亮點,它專門針對集合的各種操作提供各種非常便利,簡單,高效的API,Stream API主要是通過Lambda表達式完成,極大的提高了程序的效率和可讀性,同時Stram API中自帶的并行流使得併發處理集合的門檻再次降低,使用Stream API編程無需多寫一行多線程的大門就可以非常方便的寫出高性能的併發程序。使用Stream API能夠使你的代碼更加優雅。

流的另一特點是可無限性,使用Stream,你的數據源可以是無限大的。

在沒有Stream之前,我們想提取出所有年齡大於18的學生,我們需要這樣做:

List<Student> result=new ArrayList<>();
for(Student student:students){
 
    if(student.getAge()>18){
        result.add(student);
    }
}
return result;

使用Stream,我們可以參照上面的流程示意圖來做,首先產生Stream,然後filter過濾,最後歸併到容器中。

轉換為代碼如下:

return students.stream().filter(s->s.getAge()>18).collect(Collectors.toList());
  • 首先stream()獲得流
  • 然後filter(s->s.getAge()>18)過濾
  • 最後collect(Collectors.toList())歸併到容器中

是不是很像在寫sql?

如何使用Stream

我們可以發現,當我們使用一個流的時候,主要包括三個步驟:

  • 獲取流
  • 對流進行操作
  • 結束對流的操作

獲取流

獲取流的方式有多種,對於常見的容器(Collection)可以直接.stream()獲取
例如:

  • Collection.stream()
  • Collection.parallelStream()
  • Arrays.stream(T array) or Stream.of()

對於IO,我們也可以通過lines()方法獲取流:

  • java.nio.file.Files.walk()
  • java.io.BufferedReader.lines()

最後,我們還可以從無限大的數據源中產生流:

  • Random.ints()

值得注意的是,JDK中針對基本數據類型的昂貴的裝箱和拆箱操作,提供了基本數據類型的流:

  • IntStream
  • LongStream
  • DoubleStream

這三種基本數據類型和普通流差不多,不過他們流裏面的數據都是指定的基本數據類型。

Intstream.of(new int[]{1,2,3});
Intstream.rang(1,3);

對流進行操作

這是本章的重點,產生流比較容易,但是不同的業務系統的需求會涉及到很多不同的要求,明白我們能對流做什麼,怎麼做,才能更好的利用Stream API的特點。

流的操作類型分為兩種:

  • Intermediate:中間操作,一個流可以後面跟隨零個或多個intermediate操作。其目的主要是打開流,做出某種程度的數據映射/過濾,然後會返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅調用到這類方法,並沒有真正開始流的遍歷。

    map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

  • Terminal:終結操作,一個流只能有一個terminal操作,當這個操作執行后,流就被使用“光”了,無法再被操作。所以這必定是流的最後一個操作。Terminal操作的執行,才會真正開始流的遍歷,並且會生成一個結果,或者一個 side effect。

    forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

IntermediateTerminal完全可以按照上圖的流程圖理解,Intermediate表示在管道中間的過濾器,水會流入過濾器,然後再流出去,而Terminal操作便是最後一個過濾器,它在管道的最後面,流入Terminal的水,最後便會流出管道。

下面依次詳細的解讀下每一個操作所能產生的效果:

中間操作

對於中間操作,所有的API的返回值基本都是Stream<T>,因此以後看見一個陌生的API也能通過返回值判斷它的所屬類型。

map/flatMap

map顧名思義,就是映射,map操作能夠將流中的每一個元素映射為另外的元素。

 <R> Stream<R> map(Function<? super T, ? extends R> mapper);

可以看到map接受的是一個Function,也就是接收參數,並返回一個值。

比如:

//提取 List<Student>  所有student 的名字 
List<String> studentNames = students.stream().map(Student::getName)
                                             .collect(Collectors.toList());

上面的代碼等同於以前的:

List<String> studentNames=new ArrayList<>();
for(Student student:students){
    studentNames.add(student.getName());
}

再比如:將List中所有字母轉換為大寫:

List<String> words=Arrays.asList("a","b","c");
List<String> upperWords=words.stream().map(String::toUpperCase)
                                      .collect(Collectors.toList());

flatMap顧名思義就是扁平化映射,它具體的操作是將多個stream連接成一個stream,這個操作是針對類似多維數組的,比如容器裡面包含容器等。

List<List<Integer>> ints=new ArrayList<>(Arrays.asList(Arrays.asList(1,2),
                                          Arrays.asList(3,4,5)));
List<Integer> flatInts=ints.stream().flatMap(Collection::stream).
                                       collect(Collectors.toList());

可以看到,相當於降維。

filter

filter顧名思義,就是過濾,通過測試的元素會被留下來並生成一個新的Stream

Stream<T> filter(Predicate<? super T> predicate);

同理,我們可以filter接收的參數是Predicate,也就是推斷型函數式接口,接收參數,並返回boolean值。

比如:

//獲取所有大於18歲的學生
List<Student> studentNames = students.stream().filter(s->s.getAge()>18)
                                              .collect(Collectors.toList());

distinct

distinct是去重操作,它沒有參數

  Stream<T> distinct();

sorted

sorted排序操作,默認是從小到大排列,sorted方法包含一個重載,使用sorted方法,如果沒有傳遞參數,那麼流中的元素就需要實現Comparable<T>方法,也可以在使用sorted方法的時候傳入一個Comparator<T>

Stream<T> sorted(Comparator<? super T> comparator);

Stream<T> sorted();

值得一說的是這個ComparatorJava 8之後被打上了@FunctionalInterface,其他方法都提供了default實現,因此我們可以在sort中使用Lambda表達式

例如:

//以年齡排序
students.stream().sorted((s,o)->Integer.compare(s.getAge(),o.getAge()))
                                  .forEach(System.out::println);;

然而還有更方便的,Comparator默認也提供了實現好的方法引用,使得我們更加方便的使用:

例如上面的代碼可以改成如下:

//以年齡排序 
students.stream().sorted(Comparator.comparingInt(Student::getAge))
                            .forEach(System.out::println);;

或者:

//以姓名排序
students.stream().sorted(Comparator.comparing(Student::getName)).
                          forEach(System.out::println);

是不是更加簡潔。

peek

peek有遍歷的意思,和forEach一樣,但是它是一个中間操作。

peek接受一個消費型的函數式接口。

Stream<T> peek(Consumer<? super T> action);

例如:

//去重以後打印出來,然後再歸併為List
List<Student> sortedStudents= students.stream().distinct().peek(System.out::println).
                                                collect(Collectors.toList());

limit

limit裁剪操作,和String::subString(0,x)有點先溝通,limit接受一個long類型參數,通過limit之後的元素只會剩下min(n,size)個元素,n表示參數,size表示流中元素個數

 Stream<T> limit(long maxSize);

例如:

//只留下前6個元素並打印
students.stream().limit(6).forEach(System.out::println);

skip

skip表示跳過多少個元素,和limit比較像,不過limit是保留前面的元素,skip是保留後面的元素

Stream<T> skip(long n);

例如:

//跳過前3個元素並打印 
students.stream().skip(3).forEach(System.out::println);

終結操作

一個流處理中,有且只能有一個終結操作,通過終結操作之後,流才真正被處理,終結操作一般都返回其他的類型而不再是一個流,一般來說,終結操作都是將其轉換為一個容器。

forEach

forEach是終結操作的遍歷,操作和peek一樣,但是forEach之後就不會再返迴流

 void forEach(Consumer<? super T> action);

例如:

//遍歷打印
students.stream().forEach(System.out::println);

上面的代碼和一下代碼效果相同:

for(Student student:students){
    System.out.println(sudents);
}

toArray

toArrayList##toArray()用法差不多,包含一個重載。

默認的toArray()返回一個Object[]

也可以傳入一個IntFunction<A[]> generator指定數據類型

一般建議第二種方式。

Object[] toArray();

<A> A[] toArray(IntFunction<A[]> generator);

例如:

 Student[] studentArray = students.stream().skip(3).toArray(Student[]::new);

max/min

max/min即使找出最大或者最小的元素。max/min必須傳入一個Comparator

Optional<T> min(Comparator<? super T> comparator);

Optional<T> max(Comparator<? super T> comparator);

count

count返迴流中的元素數量

long count();

例如:

long  count = students.stream().skip(3).count();

reduce

reduce為歸納操作,主要是將流中各個元素結合起來,它需要提供一個起始值,然後按一定規則進行運算,比如相加等,它接收一個二元操作 BinaryOperator函數式接口。從某種意義上來說,sum,min,max,average都是特殊的reduce

reduce包含三個重載:

T reduce(T identity, BinaryOperator<T> accumulator);

Optional<T> reduce(BinaryOperator<T> accumulator);

 <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

例如:

List<Integer> integers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        
long count = integers.stream().reduce(0,(x,y)->x+y);

以上代碼等同於:

long count = integers.stream().reduce(Integer::sum).get();

reduce兩個參數和一個參數的區別在於有沒有提供一個起始值,

如果提供了起始值,則可以返回一個確定的值,如果沒有提供起始值,則返回Opeational防止流中沒有足夠的元素。

anyMatch allMatch noneMatch

測試是否有任意元素\所有元素\沒有元素匹配表達式

他們都接收一個推斷類型的函數式接口:Predicate

 boolean anyMatch(Predicate<? super T> predicate);

 boolean allMatch(Predicate<? super T> predicate);

 boolean noneMatch(Predicate<? super T> predicate)

例如:

 boolean test = integers.stream().anyMatch(x->x>3);

findFirst、 findAny

獲取元素,這兩個API都不接受任何參數,findFirt返迴流中第一個元素,findAny返迴流中任意一個元素。

Optional<T> findFirst();

Optional<T> findAny();

也有有人會問findAny()這麼奇怪的操作誰會用?這個API主要是為了在并行條件下想要獲取任意元素,以最大性能獲取任意元素

例如:

int foo = integers.stream().findAny().get();

collect

collect收集操作,這個API放在後面將是因為它太重要了,基本上所有的流操作最後都會使用它。

我們先看collect的定義:

 <R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

<R, A> R collect(Collector<? super T, A, R> collector);

可以看到,collect包含兩個重載:

一個參數和三個參數,

三個參數我們很少使用,因為JDK提供了足夠我們使用的Collector供我們直接使用,我們可以簡單了解下這三個參數什麼意思:

  • Supplier:用於產生最後存放元素的容器的生產者
  • accumulator:將元素添加到容器中的方法
  • combiner:將分段元素全部添加到容器中的方法

前兩個元素我們都很好理解,第三個元素是幹嘛的呢?因為流提供了并行操作,因此有可能一個流被多個線程分別添加,然後再將各個子列表依次添加到最終的容器中。

↓ – – – – – – – – –

↓ — — —

↓ ———

如上圖,分而治之。

例如:

List<String> result = stream.collect(ArrayList::new, List::add, List::addAll);

接下來看只有一個參數的collect

一般來說,只有一個參數的collect,我們都直接傳入Collectors中的方法引用即可:

List<Integer> = integers.stream().collect(Collectors.toList());

Collectors中包含很多常用的轉換器。toList(),toSet()等。

Collectors中還包括一個groupBy(),他和Sql中的groupBy一樣都是分組,返回一個Map

例如:

//按學生年齡分組
Map<Integer,List<Student>> map= students.stream().
                                collect(Collectors.groupingBy(Student::getAge));

groupingBy可以接受3個參數,分別是

  1. 第一個參數:分組按照什麼分類
  2. 第二個參數:分組最後用什麼容器保存返回(當只有兩個參數是,此參數默認為HashMap
  3. 第三個參數:按照第一個參數分類后,對應的分類的結果如何收集

有時候單參數的groupingBy不滿足我們需求的時候,我們可以使用多個參數的groupingBy

例如:

//將學生以年齡分組,每組中只存學生的名字而不是對象
Map<Integer,List<String>> map =  students.stream().
  collect(Collectors.groupingBy(Student::getAge,Collectors.mapping(Student::getName,Collectors.toList())));

toList默認生成的是ArrayList,toSet默認生成的是HashSet,如果想要指定其他容器,可以如下操作:

 students.stream().collect(Collectors.toCollection(TreeSet::new));

Collectors還包含一個toMap,利用這個API我們可以將List轉換為Map

  Map<Integer,Student> map=students.stream().
                           collect(Collectors.toMap(Student::getAge,s->s));

值得注意的一點是,IntStreamLongStream,DoubleStream是沒有collect()方法的,因為對於基本數據類型,要進行裝箱,拆箱操作,SDK並沒有將它放入流中,對於基本數據類型流,我們只能將其toArray()

優雅的使用Stream

了解了Stream API,下面詳細介紹一下如果優雅的使用Steam

  • 了解流的惰性操作

    前面說到,流的中間操作是惰性的,如果一個流操作流程中只有中間操作,沒有終結操作,那麼這個流什麼都不會做,整個流程中會一直等到遇到終結操作操作才會真正的開始執行。

    例如:

    students.stream().peek(System.out::println);

    這樣的流操作只有中間操作,沒有終結操作,那麼不管流裡面包含多少元素,他都不會執行任何操作。

  • 明白流操作的順序的重要性

    Stream API中,還包括一類Short-circuiting,它能夠改變流中元素的數量,一般這類API如果是中間操作,最好寫在靠前位置:

    考慮下面兩行代碼:

    students.stream().sorted(Comparator.comparingInt(Student::getAge)).
                      peek(System.out::println).
                      limit(3).              
                      collect(Collectors.toList());
    students.stream().limit(3).
                      sorted(Comparator.comparingInt(Student::getAge)).
                      peek(System.out::println).
                      collect(Collectors.toList());

    兩段代碼所使用的API都是相同的,但是由於順序不同,帶來的結果都非常不一樣的,

    第一段代碼會先排序所有的元素,再依次打印一遍,最後獲取前三個最小的放入list中,

    第二段代碼會先截取前3個元素,在對這三個元素排序,然後遍歷打印,最後放入list中。

  • 明白Lambda的局限性

    由於Java目前只能Pass-by-value,因此對於Lambda也和有匿名類一樣的final的局限性。

    具體原因可以參考

    因此我們無法再lambda表達式中修改外部元素的值。

    同時,在Stream中,我們無法使用break提前返回。

  • 合理編排Stream的代碼格式

    由於可能在使用流式編程的時候會處理很多的業務邏輯,導致API非常長,此時最後使用換行將各個操作分離開來,使得代碼更加易讀。

    例如:

    students.stream().limit(3).
                      sorted(Comparator.comparingInt(Student::getAge)).
                      peek(System.out::println).
                      collect(Collectors.toList());

    而不是:

    students.stream().limit(3).sorted(Comparator.comparingInt(Student::getAge)).peek(System.out::println).collect(Collectors.toList());

    同時由於Lambda表達式省略了參數類型,因此對於變量,盡量使用完成的名詞,比如student而不是s,增加代碼的可讀性。

    盡量寫出敢在代碼註釋上留下你的名字的代碼!

總結

總之,Stream是Java 8 提供的簡化代碼的神器,合理使用它,能讓你的代碼更加優雅。

尊重勞動成功,轉載註明出處

參考鏈接:

《Effective Java》3th

如果覺得寫得不錯,歡迎關注微信公眾號:逸游Java ,每天不定時發布一些有關Java乾貨的文章,感謝關注

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

分類
發燒車訊

不甩歐盟反對 川普批准制裁俄歐天然氣管線

摘錄自2019年12月21日中央社報導

法新社報導,美國總統川普21日批准,可制裁替俄國建設天然氣管線到德國的企業。美國國會擔心,管線將讓俄國獲得影響歐洲盟邦的危險籌碼;但歐盟反對制裁,認為自己有權決定能源政策。

美國的制裁鎖定在波羅的海建造北溪天然氣2號管線(Nord Stream 2)的相關公司;造價近110億美元的這條管線,可讓俄國輸送至歐洲經濟體龍頭德國的天然氣量倍增。美國國會議員警告,管線會助長一個含敵意的俄國政府,並在歐陸緊張升溫之際,大增俄國總統蒲亭(Vladimir Putin)的影響力。

不過華府此舉已激怒莫斯科及歐盟,後者表示他們有權決定自己的能源政策。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

分類
發燒車訊

戴姆勒總裁蔡澈:電動汽車續航里程應達499公里以上

日前,戴姆勒集團總裁迪特•蔡澈(Dieter Zetsche)稱,電動汽車一次充電必須能供應至少310英里(499公里)的行程,才能替代燃油汽車,成為主流。

但蔡澈在接受美國媒體採訪時稱,在短時期內讓所有消費者接受電動汽車不太可能,並稱這是一個連續的過程。首先要降低車載電池的成本。蔡澈估計該價格在170美元每千瓦時左右,而110-130美元則會使汽車很有競爭優勢。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

分類
發燒車訊

如何高效的學習技術

  我們相信努力學習一定會有收穫,但是方法不當,既讓人身心疲憊,也沒有切實的回報。高中時代,我的同桌是個漂亮女同學。她的物理成績很差,雖然她非常勤奮的學習,但成績總是不理想。為了鞏固純潔的同學關係,我親密無間地輔導她的物理,發現她不知道題目考什麼。我們的教科書與試題都圍繞着考試大綱展開,看到一道題,應該先想想它在考哪些定理和公式的運用。
  不少朋友每天都閱讀技術文章,但是第二天就忘乾淨了。工作中領導和同事都認可你的溝通和技術能力,但是跳槽面試卻屢屢碰壁。面試官問技術方案,明明心裏清楚,用嘴說出來卻前言不搭后語。面試官再問底層算法,你說看過但是忘記了。他不在乎你看沒看過,答不上就是零分。正如男女相親,男方談吐瀟洒才能吸引姑娘。可是男方緊張了,平時挺能說,關鍵時候卻支支吾吾,姑娘必然認為他不行。人生充滿了許多考試,有形的和無形的,每次考試的機會只有一次。
  工作五年十年後,別人成了架構師,自己還在基層打滾,原因是什麼?職場上無法成功升遷的原因有很多,沒有持續學習、學習效果不好、無法通過心儀公司的的面試,一定是很重要的原因。
  把自己當成一台計算機,既有輸入,也要有輸出,用輸出倒逼輸入

  近些年誕生了許多新技術,比如最時髦的AI(目前還在智障階段),數學基礎是初中就接觸過的概率統計。萬丈高樓從地起,不要被新工具或者中間件迷住雙眼,一味地追新求快。基礎知識是所有技術的基石,在未來很長的時間都不會變化,應該花費足夠的時間鞏固基礎。
  以數據結構和算法為例,大家閱讀一下Java的BitSet的源碼,裏面有大量的移位操作,移位運算掌握的好,看這份源碼就沒問題。Java同步工具類AQS用到了雙向鏈表,鏈表知識不過關,肯定搞不懂它的原理。互聯網大廠都喜歡考算法,為了通過面試也要精通算法。
  以Java工程師應該掌握的知識為例,按重要程度排出六個梯度:

  • 第一梯度:計算機組成原理、數據結構和算法、網絡通信原理、操作系統原理;
  • 第二梯度:Java基礎、JVM內存模型和GC算法、JVM性能調優、JDK工具、設計模式;
  • 第三梯度:Spring系列、Mybatis、Dubbo等主流框架的運用和原理;
  • 第四梯度:MySQL(含SQL編程)、Redis、RabbitMQ/RocketMQ/Kafka、ZooKeeper等數據庫或者中間件的運用和原理;
  • 第五梯度:CAP理論、BASE理論、Paxos和Raft算法等其他分佈式理論;
  • 第六梯度:容器化、大數據、AI、區塊鏈等等前沿技術理論;

有同學認為第五梯度應該在移到第一梯度。其實很多小公司的日活犹如古天樂一樣平平無奇,離大型分佈式架構還遠得很。學習框架和中間件的時候,順手掌握分佈式理論,效果更好。

  許多公司的招聘JD沒有設定技術人員年齡門檻,但是會加上一句“具備與年齡相當的知識的廣度與深度”。多廣才算廣,多深才算深?這是很主觀的話題,這裏不展開討論。
  如何變得更廣更深呢?突破收入上升的瓶頸,發掘自己真正的興趣
  大多數人只是公司的普通職員,收入上升的瓶頸就是升職加薪。許多IT公司會對技術人員有個評級,如果你的評級不高,那就依照晉級章程努力升級。如果你在一個小公司,收入一般,發展前景不明,準備大廠的面試就是最好的學習過程。在這些過程中,你必然學習更多知識,變得更廣更深。
  個人興趣是前進的動力之一,許多知名開源項目都源於作者的興趣。個人興趣並不局限技術領域,可以是其他學科。我有個朋友喜歡玩山地自行車,還給一些做自行車話題的自媒體投稿。久而久之,居然能夠寫一手好文章了,我相信他也能寫好技術文檔。

  哲學不是故作高深的學科,它的現實意義就是解決問題。年輕小伙是怎麼泡妞的?三天兩頭花不斷,大庭廣眾跪求愛。這類套路為什麼總是能成功呢?禮物滿足女人的物慾,當眾求愛滿足女人的虛榮心,投其所好。食堂大媽打菜的手越來越抖,辣子雞丁變成辣子辣丁,為什麼呢?食堂要控製成本,直接提價會惹眾怒。
  科學上的哲學,一般指研究事物發展的規律,歸納終極的解決方案。軟件行業充滿哲學味道的作品非常多,比如。舉個例子,當軟件系統遇到性能問題,嘗試下面兩種哲學思想提升性能:

  • 空間換時間:比如引入緩存,消耗額外的存儲提高響應速度。
  • 時間換空間:比如大文件的分片處理,分段處理后再匯總結果。

設計穩健高可用的系統,嘗試從三個方面考慮問題:

  • 存儲:數據會丟失嗎,數據一致性怎麼解決。
  • 計算:計算怎麼擴容,應用允許任意增加節點嗎。
  • 傳輸:網絡中斷或擁塞怎麼辦。

從無數的失敗或者成功的經驗中,總結出高度概括性的方案,讓我們下一步做的更好。

  英語是極為重要的基礎,學好英語與掌握編程語言一樣重要。且不說外企對英語的要求,許多知名博客就是把英文翻譯成中文,充當知識的搬運工。如果英語足夠好,直接閱讀一手英語資料,避免他人翻譯存在的謬誤。

  體系化的知識比零散的更容易記憶和理解,這正如一部好的電視劇,劇情環環相扣才能吸引觀眾。建議大家使用思維導圖羅列知識點,構建體繫結構,如下圖所示:

  高中是我們知識的巔峰時刻,每周小考每月大考,教輔資料堆成山,地獄式的反覆操練強化記憶。複習是對抗遺忘的唯一辦法。大腦的遺忘是有規律的,先快后慢。一天後,學到的知識只剩下原來的25%,甚至更低。隨着時間的推移,遺忘的速度減慢,遺忘的數量也就減少。

時間間隔 記憶量
剛看完 100%
20分鐘后 60%
1小時后 40%
1天後 30%
2天後 27%

每個人的遺忘程度都不一樣,建議第二天複習前一天的內容,七天後複習這段時間的所有內容。

  不少朋友利用碎片時間學習,比如在公交上看公眾號的推送。其實我們都高估了自己的抗干擾能力,如果處在嘈雜的環境,注意力容易被打斷,記憶留存度也很低。碎片時間適合學習簡單孤立的知識點,比如鏈表的定義與實現。
  學習複雜的知識,需要大段的連續時間。圖書館是個好地方,安靜氛圍好。手機放一邊,不要理會QQ微信,最好閱讀紙質書,泡上一整天。有些城市出現了付費自習室,提供格子間、茶水等等,也是非常好的選擇。

  從下面這張圖我們可以看到,教授他人是知識留存率最高的方式。

  準備PPT和演講內容,給同事來一場技術分享。不光複習知識,還鍛煉口才。曾經有個同事說話又快又急,口頭禪也多,比如”對吧、是不是”,別人經常聽不清,但是他本人不以為然。領導讓他做了幾次技術分享,聽眾的反應可想而知,他才徹底認清缺點。
  堅持寫技術博客,別在意你寫的東西在網上已經重複千百遍。當自己動手的時候,才會意識到眼高手低。讓文章讀起來流暢清晰,需要嘔心瀝血的刪改。寫作是對大腦的長期考驗,想不到肯定寫不出,想不清楚肯定寫不清楚。

我們經常說不要重複造輪子。為了開發效率,可以不造輪子,但是必須具備造輪子的能力。建議造一個簡單的MQ,你能用到通信協議、設計模式、隊列等許多知識。在造輪子的過程中,你會頻繁的翻閱各種手冊或者博客,這就是用輸出倒逼輸入

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

分類
發燒車訊

詳解Kafka Producer

上一篇文章我們主要介紹了什麼是 Kafka,Kafka 的基本概念是什麼,Kafka 單機和集群版的搭建,以及對基本的配置文件進行了大致的介紹,還對 Kafka 的幾個主要角色進行了描述,我們知道,不管是把 Kafka 用作消息隊列、消息總線還是數據存儲平台來使用,最終是繞不過消息這個詞的,這也是 Kafka 最最核心的內容,Kafka 的消息從哪裡來?到哪裡去?都干什麼了?別著急,一步一步來,先說說 Kafka 的消息從哪來。

生產者概述

在 Kafka 中,我們把產生消息的那一方稱為生產者,比如我們經常回去淘寶購物,你打開淘寶的那一刻,你的登陸信息,登陸次數都會作為消息傳輸到 Kafka 後台,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會作為一個個消息傳遞給 Kafka 後台,然後淘寶會根據你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程序的呢?發送過程是怎麼樣的呢?

儘管消息的產生非常簡單,但是消息的發送過程還是比較複雜的,如圖

我們從創建一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。

在發送 ProducerRecord 時,我們需要將鍵值對對象由序列化器轉換為字節數組,這樣它們才能夠在網絡上傳輸。然後消息到達了分區器。

如果發送過程中指定了有效的分區號,那麼在發送記錄時將使用該分區。如果發送過程中未指定分區,則將使用key 的 hash 函數映射指定一個分區。如果發送的過程中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區后,生產者就知道向哪個主題和分區發送數據了。

ProducerRecord 還有關聯的時間戳,如果用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間作為時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

  • 如果將主題配置為使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 如果將主題配置為使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日誌中時,將由 broker 重寫。

然後,這條消息被存放在一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到消息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量,上面兩種的時間戳類型也會返回給用戶。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗的話,就返回錯誤消息。

創建 Kafka 生產者

要往 Kafka 寫入消息,首先需要創建一個生產者對象,並設置一些屬性。Kafka 生產者有3個必選的屬性

  • bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。

  • key.serializer

broker 需要接收到序列化之後的 key/value值,所以生產者發送的消息需要經過序列化之後才傳遞給 Kafka Broker。生產者需要知道採用何種方式把 Java 對象轉換為字節數組。key.serializer 必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化為字節數組。這裏拓展一下 Serializer 類

Serializer 是一個接口,它表示類將會採用何種方式序列化,它的作用是把對象轉換為字節,實現了 Serializer 接口的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其他的序列化器還有很多,你可以通過 查看其他序列化器。要注意的一點:key.serializer 是必須要設置的,即使你打算只發送值的內容

  • value.serializer

與 key.serializer 一樣,value.serializer 指定的類會將值序列化。

下面代碼演示了如何創建一個 Kafka 生產者,這裏只指定了必要的屬性,其他使用默認的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段代碼

  • 首先創建了一個 Properties 對象
  • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
  • 在這裏我們創建了一個新的生產者對象,併為鍵值設置了恰當的類型,然後把 Properties 對象傳遞給他。

實例化生產者對象后,接下來就可以開始發送消息了,發送消息主要由下面幾種方式

直接發送,不考慮結果

使用這種發送方式,不會關心消息是否到達,會丟失一些消息,因為 Kafka 是高可用的,生產者會自動嘗試重發,這種發送方式和 UDP 運輸層協議很相似。

同步發送

同步發送仍然使用 send() 方法發送消息,它會返回一個 Future 對象,調用 get() 方法進行等待,就可以知道消息時候否發送成功。

異步發送

異步發送指的是我們調用 send() 方法,並制定一個回調函數,服務器在返迴響應時調用該函數。

下一節我們會重新討論這三種實現。

向 Kafka 發送消息

簡單消息發送

Kafka 最簡單的消息發送如下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代碼中生產者(producer)的 send() 方法需要把 ProducerRecord 的對象作為參數進行發送,ProducerRecord 有很多構造函數,這個我們下面討論,這裏調用的是

public ProducerRecord(String topic, K key, V value) {}

這個構造函數,需要傳遞的是 topic主題,key 和 value。

把對應的參數傳遞完成后,生產者調用 send() 方法發送消息(ProducerRecord對象)。我們可以從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,然後分批次發送給 Kafka Broker。

發送成功后,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata 類型,我們上面這段代碼沒有考慮返回值,所以沒有生成對應的 Future 對象,所以沒有辦法知道消息是否發送成功。如果不是很重要的信息或者對結果不會產生影響的信息,可以使用這種方式進行發送。

我們可以忽略發送消息時可能發生的錯誤或者在服務器端可能發生的錯誤,但在消息發送之前,生產者還可能發生其他的異常。這些異常有可能是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

同步發送消息

第二種消息發送機制如下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,然後再調用 get() 方法等待 Kafka 響應。如果服務器返回錯誤,get() 方法會拋出異常,如果沒有發生錯誤,我們會得到 RecordMetadata 對象,可以用它來查看消息記錄。

生產者(KafkaProducer)在發送的過程中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤可以通過重發消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無錯誤則可以通過重新為分區選舉首領來解決。KafkaProducer 被配置為自動重試,如果多次重試后仍無法解決問題,則會拋出重試異常。另一類錯誤是無法通過重試來解決的,比如消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

異步發送消息

同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會造成許多消息無法直接發送,造成消息滯后,無法發揮效益最大化。

比如消息在應用程序和 Kafka 集群之間一個來回需要 10ms。如果發送完每個消息后都等待響應的話,那麼發送100個消息需要 1 秒,但是如果是異步方式的話,發送 100 條消息所需要的時間就會少很多很多。大多數時候,雖然Kafka 會返回 RecordMetadata 消息,但是我們並不需要等待響應。

為了在異步發送消息的同時能夠對異常情況進行處理,生產者提供了回掉支持。下面是回調的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回調需要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。如果 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空(non null)異常,這裏我們只是簡單的把它打印出來,如果是生產環境需要更詳細的處理,然後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

生產者分區機制

Kafka 對於數據的讀寫是以分區為粒度的,分區可以分佈在多個主機(Broker)中,這樣每個節點能夠實現獨立的數據寫入和讀取,並且能夠通過增加新的節點來增加 Kafka 集群的吞吐量,通過分區部署在多個 Broker 來實現負載均衡的效果。

上面我們介紹了生產者的發送方式有三種:不管結果如何直接發送發送並返回結果發送並回調。由於消息是存在主題(topic)的分區(partition)中的,所以當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪個分區中呢?

這其實就設計到 Kafka 的分區機制了。

分區策略

Kafka 的分區策略指的就是將生產者發送到哪個分區的算法。Kafka 為我們提供了默認的分區策略,同時它也支持你自定義分區策略。

如果要自定義分區策略的話,你需要显示配置生產者端的參數 Partitioner.class,我們可以看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個參數: topic,表示需要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化過後的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化后的值數組;cluster表示當前集群的原數據。Kafka 給你這麼多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區,計算出它要被發送到哪個分區中。
  • close() : 繼承了 Closeable 接口能夠實現 close() 方法,在分區關閉時調用。
  • onNewBatch(): 表示通知分區程序用來創建新的批次

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪訓

順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息。就像下面這樣

上圖表示的就是輪訓策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪訓

隨機輪訓簡而言之就是隨機的向 partition 中保存消息,如下圖所示

實現隨機分配的代碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,然後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分佈,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改為輪詢了。

按照 key 進行消息保存

這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的key,一旦消息被定義了 Key,那麼你就可以保證同一個 Key 的所有消息都進入到相同的分區裏面,由於每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如下圖所示

實現這個策略的 partition 方法同樣簡單,只需要下面兩行代碼即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分區策略都是比較基礎的策略,除此之外,你還可以自定義分區策略。

生產者壓縮機制

壓縮一詞簡單來講就是一種互換思想,它是一種經典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁盤佔用或更少的網絡 I/O 傳輸。如果你還不了解的話我希望你先讀完這篇文章 ,然後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項才是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,為什麼啟用壓縮?說白了就是消息太大,需要變小一點 來使消息發的更快一些。

Kafka Producer 中使用 compression.type 來開啟壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼錶明該 Producer 的壓縮算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息后併發送給服務器后,由 Consumer 消費者進行解壓縮,因為採用的何種壓縮算法是隨着 key、value 一起發送過去的,所以消費者知道採用何種壓縮算法。

Kafka 重要參數配置

在上一篇文章 中,我們主要介紹了一下 kafka 集群搭建的參數,本篇文章我們來介紹一下 Kafka 生產者重要的配置,生產者有很多可配置的參數,在文檔里(

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認為消息是寫入成功的。此參數對消息丟失的影響較大

  • 如果 acks = 0,就表示生產者也不知道自己產生的消息是否被服務器接收了,它才知道它寫成功了。如果發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,因為沒有返回任何消息。這就類似於 UDP 的運輸層協議,只管發,服務器接受不接受它也不關心。
  • 如果 acks = 1,只要集群的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。如果發送途中造成了網絡異常或者 Leader 還沒選舉出來等其他情況導致消息寫入失敗,生產者會受到錯誤消息,這時候生產者往往會再次重發數據。因為消息的發送也分為 同步異步,Kafka 為了保證消息的高效傳輸會決定是同步發送還是異步發送。如果讓客戶端等待服務器的響應(通過調用 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回調,就會解決這個問題。
  • 如果 acks = all,這種情況下是只有當所有參与複製的節點都收到消息時,生產者才會接收到一個來自服務器的消息。不過,它的延遲比 acks =1 時更高,因為我們要等待不只一個服務器節點接收消息。

buffer.memory

此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

compression.type

此參數來表示生產者啟用何種壓縮算法,默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappy、gzip 和 lz4,它指定了消息發送給 broker 之前使用哪一種壓縮算法進行壓縮。下面是各壓縮算法的對比

retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領),在這種情況下,reteis 參數的值決定了生產者可以重發的消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者在每次重試之間等待 100ms,這個等待參數可以通過 retry.backoff.ms 進行修改。

batch.size

當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次里的所有消息會被發送出去。不過生產者井不一定都會等到批次被填滿才發送,任意條數的消息都可能被發送。

client.id

此參數可以是任意的字符串,服務器會用它來識別消息的來源,一般配置在日誌里

max.in.flight.requests.per.connection

此參數指定了生產者在收到服務器響應之前可以發送多少消息,它的值越高,就會佔用越多的內存,不過也會提高吞吐量。把它設為1 可以保證消息是按照發送的順序寫入服務器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返迴響應的時間。如果等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配—-如果在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,為了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。如果它們被設置為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那麼可以適當增大這些值。

文章參考:

《Kafka 權威指南》

極客時間 -《Kafka 核心技術與實戰》

Kafka 源碼

關注公眾號獲取更多優質电子書,關注一下你就知道資源是有多好了

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開