分類
發燒車訊

福特計畫推出全新新能源車型 或將命名為Model E

據海外媒體報導稱福特計畫推出一款全新新能源車型,其很有可能會命名為Model E。

福特公司把這款Model E定位於一款緊湊車型,這款車會推出混動、插電式混動以及純電動版本,主要取代的是純電動版福克斯以及混動版和插電式混動版C-MAX。

另外,福特公司將會投資16億美元的資金,在墨西哥San Luis Potosi建設全新的工廠,並且這款Model E也將會在這座工廠生產。這款車將在2018年正式亮相,在2019年投放市場。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※試算大陸海運運費!

分類
發燒車訊

第六屆中國國際新能源汽車論壇2016即將在下周隆重開幕!

2016年4月20-22日∣中國·上海安亭穎奕皇冠假日酒店
電池、電控電機、充電設施、汽車電子
車聯網、車後市、創新核心技術、無線充電

由希邁商務諮詢(上海)有限公司主辦的2016年第六屆中國國際新能源汽車論壇即將于下周4月20日-4月22日在上海隆重舉行。此次論壇獲得了亞太電動車學會、國家新能源機動車產品品質監督檢驗中心、上海交大密西根學院、賽迪顧問及中國綠色能源產業技術創新戰略聯盟的大力支持。截止3月14日,論壇已經確認42位演講嘉賓出席本次論壇並做高品質學術演講。演講嘉賓分別來自菲律賓電動車協會、國家新能源機動車產品品質監督檢驗中心、美國國家能源局、中國工程院等在內的政府單位與研究機構,以及包括寶馬、通用汽車、特斯拉、長安汽車、宇通客車、奇瑞、比亞迪戴姆勒、上汽、北汽、觀致等在內的多個知名整車商,將在論壇上共同研討新能源汽車行業發展趨勢、技術路線及難點、基礎設施建設、商業模式,延續以往的豐碩成果,繼續為新能源汽車行業作出貢獻。

本屆得到行業內世界百強企業麥格納動力總成與格特拉克聯合作為論壇的鉑金贊助,同時也獲得了寧波招寶磁業有限公司作為論壇的銀牌贊助,廈門宏發電力電器有限公司作為論壇的銅牌贊助及北京易微行科技有限公司作為論壇的晚宴贊助的大力支持。此外,論壇還獲得了30多家來自全世界各地的知名媒體進行宣傳,並且將會在現場為部分企業進行專題訪問,將在為期3天的論壇上進行全方位的即時報導。

本屆論壇相比歷屆舉辦規模最大的第六屆新能源汽車論壇,涉及主論壇及三個分論壇、考察活動、頒獎典禮和交流晚宴。屆時將有全球範圍內的整車製造商、電網電力公司、電池廠商、零部件供應商、核心技術提供商和政府官員一起,對新能源汽車產業面臨的挑戰,機遇與對策各方面進行為期三天更深層次並具有建設和戰略性的探討。歡迎各位行業內人士積極參與,如有意向請聯繫組委會,期待與您下周共同參與!

會議結構

大會連絡人:Hill ZENG(曾先生)
聯繫電話:0086 21-6045 1760 或郵箱
我們期待與貴單位一起出席於2016年4月20-22日在上海舉辦的第六屆中國國際新能源汽車論壇2016,以利決策!

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

分類
發燒車訊

蘋果電動車專案未止步,挖角、研發行動低調進行中

市場瘋傳蘋果(Apple)將推出電動車,但至今仍只聞樓梯響。不過,海外媒體持續傳出蘋果電動車專案仍有大小動作,包含聘用新人、營運研發實驗室等,計畫看來並未胎死腹中。

蘋果電動車的計畫代號傳聞為「Titan」,傳言將在2019年正式開始出貨。蘋果電動車被報導將搭載自動駕駛技術,也會研究採用載運工具共享服務。

科技網站Electrek報導,蘋果將聘用原特斯拉(Tesla)工程開發副總裁Chris Porritt接替將要離職的Steve Zadesky,繼續執行與電動車專案相關的工作。而在這之前,蘋果已從特斯拉、福特、賓士、通用汽車等國際車廠與電池廠挖角專家,也聘用曾在福斯汽車、Nvidia等公司服務,專注於先進駕駛輔助系統與自動駕駛系統領域的專業人士。

此外,MacRumors也引述德國FrankfurterAllgemeine Zeitung報導,指出蘋果正在德國柏林暗中營運一座研發實驗室,共有15~20名研發人員,主要來自德國汽車產業界,或許正是蘋果電動車專案的一部分。

蘋果執行長Tim Cook過去曾參訪BMW的電動車i3產線,與電動車相關的傳言滿天飛。但蘋果對於電動車計畫始終守口如瓶,沒有正面回應。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

分類
發燒車訊

長安李偉:汽車智慧化分為智慧駕駛、智慧互聯和智慧交互三大發展範疇

未來汽車應該是什麼樣子?每個人都有自己的想像,每個車企都有各自的概念。而在差異化之中的相似之處是,許多企業都把智慧化作為發展方向,將無人駕駛作為嘗試的關鍵步驟。

長安汽車副總裁李偉表示智慧汽車可分為三大發展範疇:智慧駕駛、智慧互聯和智慧交互,而智慧駕駛又可分為四級技術水準。

智慧汽車的第一個範疇——智慧駕駛。對於長安汽車來講,智慧駕駛的一級技術已經成熟且在車上搭載,例如全速的自我調整巡航,緊急高速自動、緊急制動等,這些技術都已經在16款睿騁、CS75、逸動等車型上實現量產;智慧駕駛的二級技術,現在已經在做產品的研發和測試,二級技術主要是在一級自我調整巡航系統的基礎上升級,爭取把手解脫了,另外再加一個全自動倒車,二級的系統長安預計在2017年要量產;智慧駕駛三級技術水準是實現在高速路段的無人駕駛,從重慶到北京的整個無人駕駛汽車,實際上就是智慧駕駛的三級水準,長安汽車計畫在2018年實現整個技術儲備開發,全部匹配結束,2019年能夠得以上市。另外全自動化駕駛技術,就是智慧駕駛的最高級四級,長安努力爭取在2025年前能夠實現量產。

智能汽車的第二個範疇——智能互聯。李偉簡單舉了個例子,“現在長安在美國MTC現場進行叫智慧互聯汽車,它實際上是車和車可以通訊,車和路可以通訊,車和交通信號可以通訊等等。大家如果設想一下,我們現在長安目前的無人駕駛狀態,實際上是靠車本身的信號識別來判斷我的交通情況。未來如果說我們城市是智慧城市,我的交通都是數位的信號,在一公里之前車就能感知我那邊的紅綠燈什麼時候變紅燈什麼時候變綠燈,除了前邊車之外還有什麼車,通過車和車的通訊就會知道,這樣整個交通就會更加更加智慧。未來智慧城市,車聯網和車更加融合,這個車就會更加智慧,沒看到就會知道是什麼前邊情況,從這個方面來講,傳統自動智慧駕駛汽車和智慧互聯再有機的融合,就會帶來更聰明的汽車,就會有更自動的汽車。”

智能汽車的第三個範疇——智能交互。這個階段就是所謂的”人機交互”階段,需要發出什麼指令,不用操作,也不用說出來,只需要在腦袋裡一想,汽車就能執行相應命令。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

分類
發燒車訊

一分鐘帶你了解下Spring Security!

一、什麼是Spring Security?

Spring Security是一個功能強大且高度可定製的身份驗證和訪問控制框架,它是用於保護基於Spring的應用程序的實際標準。

Spring Security是一個框架,致力於為Java應用程序提供身份驗證和授權。與所有Spring項目一樣,Spring Security的真正強大之處在於可以輕鬆擴展以滿足自定義要求。

更多信息可以查看官網:https://spring.io/projects/spring-security

二、Spring Security的主要功能

  • 認證:驗證用戶名和密碼是否合法(是否系統中用戶)
  • 授權:是系統用戶不代表你能使用某些功能,因為你可能沒有權限
  • 防禦會話固定,點擊劫持,跨站點請求偽造等攻擊
  • Servlet API集成
  • 與Spring Web MVC的可選集成

三、快速入門

新建一個SpringBoot的web項目spring-boot-security。

案例1:接口不添加保護

pom文件中不引入Spring Security,然後新建一個controller:

@RestController
public class AppController {

    @GetMapping("/hello")
    public String hello() {
        return "Hello,spring security!";
    }
}

然後打開瀏覽器訪問:http://localhost:8080/hello,成功后返回:

Hello,spring security!

案例2:接口添加保護

  1. pom文件添加依賴

pom文件中引入Spring Security的starter:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
  1. 訪問接口

打開瀏覽器再次訪問http://localhost:8080/hello,會被重定向到登錄頁http://localhost:8080/login,截圖如下:

要登錄系統,我們需要知道用戶名和密碼,Spring Security默認的用戶名是user,項目啟動的時候會生成默認密碼(在啟動日誌中可以看到),輸入用戶名和密碼后就可以訪問/hello接口了。

當然也可以自定義用戶名密碼,在配置文件添加如下內容即可:

spring.security.user.name=java_suisui
spring.security.user.password=123456

四、自定義認證和授權

上面說過Spring Security的功能有“認證”和“授權”,下面通過一個簡單的例子實現下自定義的認證和授權。

假設系統中有兩個角色:

  • ADMIN 可以訪問/admin下的資源
  • USER 可以訪問/user下的資源

按照下面步驟操作即可。

  1. 新建一個配置類

對於用戶名、密碼、登錄頁面、訪問權限等都可以在 WebSecurityConfigurerAdapter 的實現類中配置。

WebSecurityConfig代碼如下:

/**
 * 配置類
 * @Author java_suisui
 *
 */
@EnableWebSecurity
@Configuration
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
        //配置內存中的 用戶名、密碼和角色
        auth.inMemoryAuthentication().passwordEncoder(new MyPasswordEncoder()).withUser("user").password("123456").roles("USER");
        auth.inMemoryAuthentication().passwordEncoder(new MyPasswordEncoder()).withUser("admin").password("123456").roles("ADMIN");
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.authorizeRequests()
                .antMatchers("/login").permitAll()
                .antMatchers("/user").hasRole("USER") //訪問 /user這個接口,需要有USER角色
                .antMatchers("/admin").hasRole("ADMIN")
                .anyRequest().authenticated() //剩餘的其他接口,登錄之後就能訪問
                .and()
                .formLogin().defaultSuccessUrl("/hello");
    }
}
  1. 創建PasswordEncorder的實現類

內存用戶驗證時,Spring Boot 2.0以上版本引用的security 依賴是 spring security 5.X版本,此版本需要提供一個PasswordEncorder的實例。

MyPasswordEncoder代碼如下:

public class MyPasswordEncoder implements PasswordEncoder {
    @Override
    public String encode(CharSequence rawPassword) {
        return rawPassword.toString();
    }

    @Override
    public boolean matches(CharSequence rawPassword, String encodedPassword) {
        return encodedPassword.equals(rawPassword);
    }
}
  1. 登錄驗證

瀏覽器打開http://localhost:8080/login,

  • 使用user登錄,可以訪問/user
  • 使用admin登錄,可以訪問/admin

如果使用user登錄后訪問/admin,會報403錯誤,具體錯誤信息如下:

Whitelabel Error Page
This application has no explicit mapping for /error, so you are seeing this as a fallback.

Tue Nov 19 16:26:28 CST 2019
There was an unexpected error (type=Forbidden, status=403).
Forbidden

結果和我們預期的一致,說明簡單的自定義認證和授權功能已經實現了。

完整源碼地址:

推薦閱讀

Java碎碎念,一個堅持原創的公眾號,為您提供一系列系統架構、微服務、Java、SpringBoot、SpringCloud等高質量技術文章。
如果覺得文章不錯,希望可以隨手轉發或者”在看“哦,非常感謝哈!
關注下方公眾號后回復「1024」,有驚喜哦!

本文由博客一文多發平台 發布!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※試算大陸海運運費!

分類
發燒車訊

在 ASP.NET Core 項目中使用 MediatR 實現中介者模式

 一、前言

   最近有在看 DDD 的相關資料以及微軟的  這個項目中基於 DDD 的架構設計,在  這個示例服務中,可以看到各層之間的代碼調用與我們之前傳統的調用方式似乎差異很大,整個項目各個層之間的代碼全部是通過注入 IMediator 進行調用的,F12 查看源碼后可以看到該接口是屬於  這個組件的。既然要照葫蘆畫瓢,那我們就先來了解下如何在 ASP.NET Core 項目中使用 。

  代碼倉儲:

 二、Step by Step

    從 github 的項目主頁上可以看到作者對於這個項目的描述是基於中介者模式的 .NET 實現,是一種基於進程內的數據傳遞。也就是說這個組件主要實現的是在一個應用中實現數據傳遞,如果想要實現多個應用間的數據傳遞就不太適合了。從作者的 github 個人主頁上可以看到,他還是  這個 OOM 組件的作者,PS,如果你想要了解如何在 ASP.NET Core 項目中使用 AutoMapper,你可以查看我之前寫的這一篇文章()。而對於 MediatR 來說,在具體的學習使用之前,我們先來了解下什麼是中介者模式。

  1、什麼是中介者模式

  很多舶來詞的中文翻譯其實最終都會與實際的含義相匹配,例如軟件開發過程中的 23 種設計模式的中文名稱,我們其實可以比較容易的從中文名稱中得知出該設計模式具體想要實現的作用,就像這裏介紹的中介者模式。

  在我們通過代碼實現實際的業務邏輯時,如果涉及到多個對象類之間的交互,通常我們都是會採用直接引用的形式,隨着業務邏輯變的越來越複雜,對於一個簡單的業務抽象出的實現方法中,可能會被我們添加上各種判斷的邏輯或是對於數據的業務邏輯處理方法。

  例如一個簡單的用戶登錄事件,我們可能最終會抽象出如下的業務流程實現。

public bool Login(AppUserLoginDto dto, out string msg)
{
    bool flag = false;
    try
    {
        // 1、驗證碼是否正確
        flag = _redisLogic.GetValueByKey(dto.VerificationCode);
        if (!flag)
        {
            msg = "驗證碼錯誤,請重試";
            return false;
        }

        // 2、驗證賬戶密碼是否正確
        flag = _userLogic.GetAppUser(dto.Account.Trim(), dto.Password.Trim(), out AppUserDetailDto appUser);
        if (!flag)
        {
            msg = "賬戶或密碼錯誤,請重試";
            return false;
        }

        // 3、驗證賬戶是否可以登錄當前的站點(未被鎖定 or 具有登錄當前系統的權限...)
        flag = _authLogic.CheckIsAvailable(appUser);
        if (!flag)
        {
            msg = "用戶被禁止登錄當前系統,請重試";
            return false;
        }

        // 4、設置當前登錄用戶信息
        _authLogic.SetCurrentUser(appUser);

        // 5、記錄登錄記錄
        _userLogic.SaveLoginRecord(appUser);

        msg = "";
        return true;
    }
    catch (Exception ex)
    {
        // 記錄錯誤信息
        msg = $"用戶登錄失敗:{ex.Message}";
        return false;
    }
}

  這裏我們假設對於登錄事件的實現方法存在於 UserAppService 這個類中,對於 redis 資源的操作在 RedisLogic 類中,對於用戶相關資源的操作在 UserLogic 中,而對於權限校驗相關的資源操作位於 AuthLogic 類中。

  可以看到,為了實現 UserAppService 類中定義的登錄方法,我們至少需要依賴於 RedisLogic、UserLogic 以及 AuthLogic,甚至在某些情況下可能在 UserLogic 和 AuthLogic 之間也存在着某種依賴關係,因此我們可以從中得到如下圖所示的類之間的依賴關係。

  一個簡單的登錄業務尚且如此,如果我們需要對登錄業務添加新的需求,例如現在很多網站的登錄和註冊其實是放在一起的,當登錄時如果判斷沒有當前的用戶信息,其實會催生創建新用戶的流程,那麼,對於原本的登錄功能實現,是不是會存在繼續添加新的依賴關係的情況。同時對於很多本身就很複雜的業務,最終實現出來的方法是不是會有更多的對象類之間存在各種的依賴關係,牽一發而動全身,後期修改測試的成本會不會變得更高。

  那麼,中介者模式是如何解決這個問題呢?

  在上文有提到,對於舶來詞的中文名稱,中文更多的會根據實際的含義進行命名,試想一下我們在現實生活中提到中介,是不是更多的會想到房屋中介這一角色。當我們來到一個新的城市,面臨着租房的問題,絕大多數的情況下,我們最終需要通過中介去達成我們租房的目的。在租房這個案例中,房屋中介其實就是一个中介者,他承接我們對於想要租的房子的各種需求,從自己的房屋數據庫中去尋找符合條件的,最終以一個橋樑的形式,連接我們與房東,最終就房屋的租住達成一致。

  而在軟件開發中,中介者模式則是要求我們根據實際的業務去定義一個包含各種對象之間交互關係的對象類,之後,所有涉及到該業務的對象都只關聯於這一个中介對象類,不再顯式的調用其它類。採用了中介者模式之後設計的登錄功能所涉及到的類依賴如下圖所示,這裏的 AppUserLoginEventHandler 其實就是我們的中介類。

  當然,任何事都會有利有弊,不會存在百分百完美的事情,就像我們通過房租中介去尋找合適的房屋,最終我們需要付給中介一筆費用去作為酬勞,採用中介者模式設計的代碼架構也會存在別的問題。因為在代碼中引入了中介者這一對象,勢必會增加我們代碼的複雜度,可能會使原本很輕鬆就實現的代碼變得複雜。同時,我們引入中介者模式的初衷是為了解決各個對象類之間複雜的引用關係,對於某些業務來說,本身就很複雜,最終必定會導致這个中介者對象異常複雜。

  畢竟,軟件開發的過程中不會存在銀彈去幫我們解決所有的問題。

  那麼,在本篇文章的示例代碼中,我將使用 MediatR 這一組件,通過引入中介者模式的思想來完成上面的用戶登錄這一案例。

  2、組件加載

  在使用 MediatR 之前,這裏簡單介紹下這篇文章的示例 demo 項目。這個示例項目的架構分層可以看成是介於傳統的多層架構與採用 DDD 的思想的架構分層。嗯,你可以理解成四不像,屬於那種傳統模式下的開發人員在往 DDD 思想上進行遷移的成品,具體的代碼分層說明解釋如下。

  01_Infrastructure:基礎架構層,這層會包含一些對於基礎組件的配置或是幫助類的代碼,對於每個新建的服務來說,該層的代碼幾乎都是差不多的,所以對於基礎架構層的代碼其實最好是發布到公有 or 私有的 Nuget 倉庫中,然後我們直接在項目中通過 Nuget 去引用。

  對於採用 DDD 的思想構建的項目來說,很多人可能習慣將一些實體的配置也放置在基礎架構層,我的個人理解還是應該置於領域層,對於基礎架構層,只做一些基礎組件的封裝。如果有什麼不對的地方,歡迎在評論區提出。

  02_Domain:領域層,這層會包含我們根據業務劃分出的領域的幾乎所有重要的部分,有領域對象(Domain Object)、值對象(Value Object)、領域事件(Domain Event)、以及倉儲(Repository)等等領域組件。

  這裏雖然我創建了 AggregateModels(聚合實體)這個文件夾,其實在這個項目中,我創建的還是不包含任何業務邏輯的貧血模型。同時,對於倉儲(Repository)在領域分層中是置於 Infrastructure(基礎架構層)還是位於 Domain(領域層),每個人都會有自己的理解,這裏我還是更傾向於放在 Domain 層中更符合其定位。

  03_Application:應用層,這一層會包含我們基於領域所封裝出的各種實際的業務邏輯,每個封裝出的服務應用之間並不會出現互相調用的情況。

  Sample.Api:API 接口層,這層就很簡單了,主要是通過 API 接口暴露出我們基於領域對外提供的各種服務。

  整個示例項目的分層結構如下圖所示。

  與使用其它的第三方組件的使用方式相同,在使用之前,我們需要在項目中通過 Nuget 添加對於 MediatR 的程序集引用。

  這裏需要注意,因為我們主要是通過引用 MediatR 來實現中介者模式,所以我們只需要在領域層和應用層加載 MediatR 即可。而對於 Sample.Api 這個 Web API 項目,因為需要通過依賴注入的方式來使用我們基於 MediatR 所構建出的各種服務,所以這裏我們還要添加 MediatR.Extensions.Microsoft.DependencyInjection 這個程序集到 Sample.Api 中。

Install-Package MediatR
Install-Package MediatR.Extensions.Microsoft.DependencyInjection

  3、案例實現

  首先我們在 Sample.Domain 這個類庫的 AggregateModels 文件夾下添加 AppUser(用戶信息)類 和 Address(地址信息) 類,這裏雖然並沒有採用 DDD 的思想去劃分領域對象和值對象,我們創建出來的都是不含任何業務邏輯的貧血模型。但是在用戶管理這個業務中,對於用戶所包含的聯繫地址信息,其實是一種無狀態的數據。也就是說對於同一個地址信息,不會因為置於多個用戶中而出現數據的二義性。因此,對於地址信息來說,是不需要唯一的標識就可以區分出這個數據的,所以這裏的 Address 類就不需要添加主鍵,其實也就是對應於領域建模中的值對象。

  這裏我是使用的 EF Core 作為項目的 ORM 組件,當創建好需要使用實體之後,我們在 Sample.Domain 這個類庫下面新建一個 SeedWorks 文件夾,添加自定義的 DbContext 對象和用於執行 EF Core 第一次生成數據庫時寫入預置種子數據的信息類。

  這裏需要注意,在 EF Core 中,當我們需要將編寫的 C# 類通過 Code First 創建出數據庫表時,我們的 C# 類必須包含主鍵信息。而對應到我們這裏的 Address 類來說,它更多的是作為 AppUser 類中的屬性信息來展示的,所以這裏我們需要對 EF Core 生成數據庫表的過程進行重寫。

  這裏我們在 SeedWorks 文件夾下創建一個新的文件夾 EntityConfigurations,在這裏用來存放我們自定義的 EF Core 創建表的規則。新建一個繼承於 IEntityTypeConfiguration<AppUser> 接口的 AppUserConfiguration 配置類,在接口默認 Configure 方法中,我們需要編寫映射規則,將 Address 類作為 AppUser 類中的字段進行显示,最終實現后的代碼如下所示。 

public class AppUserConfiguration : IEntityTypeConfiguration<AppUser>
{
    public void Configure(EntityTypeBuilder<AppUser> builder)
    {
        // 表名稱
        builder.ToTable("appuser");

        // 實體屬性配置
        builder.OwnsOne(i => i.Address, n =>
        {
            n.Property(p => p.Province).HasMaxLength(50)
                .HasColumnName("Province")
                .HasDefaultValue("");

            n.Property(p => p.City).HasMaxLength(50)
                .HasColumnName("City")
                .HasDefaultValue("");

            n.Property(p => p.Street).HasMaxLength(50)
                .HasColumnName("Street")
                .HasDefaultValue("");

            n.Property(p => p.ZipCode).HasMaxLength(50)
                .HasColumnName("ZipCode")
                .HasDefaultValue("");
        });
    }
}

  當創建表的映射規則編寫完成后,我們就可以對 UserApplicationDbContext 類進行重寫 OnModelCreating 方法。在這個方法中,我們就可以去應用我們自定義設置的實體映射規則,從而讓 EF Core 按照我們的想法去創建數據庫,最終實現的代碼如下所示。

public class UserApplicationDbContext : DbContext
{
    public DbSet<AppUser> AppUsers { get; set; }

    public UserApplicationDbContext(DbContextOptions<UserApplicationDbContext> options)
        : base(options)
    {
    }

    /// <summary>
    ///
    /// </summary>
    /// <param name="modelBuilder"></param>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // 自定義 AppUser 表創建規則
        modelBuilder.ApplyConfiguration(new AppUserConfiguration());
    }
}

  當我們創建好 DbContext 后,我們需要在 Startup 類的 ConfigureServices 方法中進行注入。在示例代碼中,我使用的是 MySQL 8.0 數據庫,將配置文件寫入到 appsettings.json 文件中,最終注入 DbContext 的代碼如下所示。

public void ConfigureServices(IServiceCollection services)
{
    // 配置數據庫連接字符串
    services.AddDbContext<UserApplicationDbContext>(options =>
        options.UseMySql(Configuration.GetConnectionString("SampleConnection")));
}

  數據庫的連接字符串配置如下。

{
  "ConnectionStrings": {
    "SampleConnection": "server=127.0.0.1;database=sample.application;user=root;password=123456@sql;port=3306;persistsecurityinfo=True;"
  }
}

  在上文有提到,除了創建一個 DbContext 對象,我們還創建了一個 DbInitializer 類用於在 EF Core 第一次執行創建數據庫操作時將我們預置的信息寫入到對應的數據庫表中。這裏我們只是簡單的判斷下 AppUser 這張表是否存在數據,如果沒有數據,我們就添加一條新的記錄,最終實現的代碼如下所示。

public class DbInitializer
{
    public static void Initialize(UserApplicationDbContext context)
    {
        context.Database.EnsureCreated();

        if (context.AppUsers.Any())
            return;

        AppUser admin = new AppUser()
        {
            Id = Guid.NewGuid(),
            Name = "墨墨墨墨小宇",
            Account = "danvic.wang",
            Phone = "13912345678",
            Age = 12,
            Password = "123456",
            Gender = true,
            IsEnabled = true,
            Address = new Address("啦啦啦啦街道", "啦啦啦市", "啦啦啦省", "12345"),
            Email = "danvic.wang@yuiter.com",
        };

        context.AppUsers.Add(admin);
        context.SaveChanges();
    }
}

  當我們完成種子數據植入的代碼,我們需要在程序啟動之前就去執行我們的代碼。因此我們需要修改 Program 類中的 Main 方法,實現在運行 web 程序之前去執行種子數據的植入。

public class Program
{
    public static void Main(string[] args)
    {
        var host = CreateWebHostBuilder(args).Build();

        using (var scope = host.Services.CreateScope())
        {
            // 執行種子數據植入
            //
            var services = scope.ServiceProvider;
            var context = services.GetRequiredService<UserApplicationDbContext>();
            DbInitializer.Initialize(context);
        }
    }

    public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>();
}

  這時,運行我們的項目,程序就會自動執行創建數據庫的操作,同時會將我們預設好的種子數據寫入到數據庫表中,最終實現的效果如下圖所示。

  基礎的項目代碼已經完成之後,我們就可以開始學習如何通過 MediatR 來實現中介者模式。在這一章的示例項目中,我們會使用到 MediatR 中兩個很重要的接口類型:IRequest 和 INotification。

  在 Github 上,作者針對這兩個接口做了如下的,這裏我會按照我的理解去進行使用。同時,為了防止我的理解出現了偏差,從而對各位造成影響,這裏貼上作者回復解釋的原文。

Requests are for:
1 request to 1 handler. Handler may or may not return a value
Notifications are for:
1 notification to n handlers. Handler may not return a value.


In practical terms, requests are "commands", notifications are "events".
Command would be directing MediatR to do something like "ApproveInvoiceCommand -> ApproveInvoiceHandler". Event would be
notifications, like "InvoiceApprovedEvent -> SendThankYouEmailToCustomerHandler"

 

  對於繼承於 IRequest 接口的類來說,一個請求(request)只會有一個針對這個請求的處理程序(requestHandler),它可以返回值或者不返回任何信息;

  而對於繼承於 INotification 接口的類來說,一個通知(notification)會對應多個針對這個通知的處理程序(notificationHandlers),而它們不會返回任何的數據。

  請求(request)更像是一種命令(command),而通知(notification)更像是一種事件(event)。嗯,可能看起來更暈了,jbogard 這裏給了一個案例給我們進一步的解釋了 request 與 notification 之間的差異性。

  雙十一剛過,很多人都會瘋狂剁手,對於購買大件來說,為了能夠更好地擁有售後服務,我們在購買后肯定會期望商家給我們提供發票,這裏的要求商家提供發票就是一種 request,而針對我們的這個請求,商家會做出回應,不管能否開出來發票,商家都應當通知到我們,這裏的通知用戶就是一種 notification。

  對於提供發票這個 request 來說,不管最終的結果如何,它只會存在一種處理方式;而對於通知用戶這個 notification 來說,商家可以通過短信通知,可以通過公眾號推送,也可以通過郵件通知,不管採用什麼方式,只要完成了通知,對於這個事件來說也就已經完成了。    

  而對應於用戶登錄這個業務來說,用戶的登錄行為其實就是一個 request,對於這個 request 來說,我們可能會去數據庫查詢賬戶是否存在,判斷是不是具有登錄系統的權限等等。而不管我們在這個過程中做了多少的邏輯判斷,它只會有兩種結果,登錄成功或登錄失敗。而對於用戶登錄系統之後可能需要設置當前登錄人員信息,記錄用戶登錄日誌這些行為來說,則是歸屬於 notification 的。

  弄清楚了用戶登錄事件中的 request 和 notification 劃分,那麼接下來我們就可以通過代碼來實現我們的功能。這裏對於示例項目中的一些基礎組件的配置我就跳過了,如果你想要具體的了解這裏使用到的一些組件的使用方法,你可以查閱我之前的文章。

  首先,我們在 Sample.Application 這個類庫下面創建一個 Commands 文件夾,在下面存放用戶的請求信息。現在我們創建一個用於映射用戶登錄請求的 UserLoginCommand 類,它需要繼承於 IRequest<T> 這個泛型接口。因為對於用戶登錄這個請求來說,只會有可以或不可以這兩個結果,所以對於這個請求的響應的結果是 bool 類型的,也就是說,我們具體應該繼承的是 IRequest<bool>。

  對於用戶發起的各種請求來說,它其實只是包含了對於這次請求的一些基本信息,而對於 UserLoginCommand 這個用戶登錄請求類來說,它可能只會有賬號、密碼、驗證碼這三個信息,請求類代碼如下所示。

public class UserLoginCommand : IRequest<bool>
{
    /// <summary>
    /// 賬戶
    /// </summary>
    public string Account { get; private set; }

    /// <summary>
    /// 密碼
    /// </summary>
    public string Password { get; private set; }

    /// <summary>
    /// 驗證碼
    /// </summary>
    public string VerificationCode { get; private set; }

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="account">賬戶</param>
    /// <param name="password">密碼</param>
    /// <param name="verificationCode">驗證碼</param>
    public UserLoginCommand(string account, string password, string verificationCode)
    {
        Account = account;
        Password = password;
        VerificationCode = verificationCode;
    }
}

  當我們擁有了存儲用戶登錄請求信息的類之後,我們就需要對用戶的登錄請求進行處理。這裏,我們在 Sample.Application 這個類庫下面新建一個 CommandHandlers 文件夾用來存放用戶請求的處理類。

  現在我們創建一個繼承於 IRequestHandler 接口的 UserLoginCommandHandler 類用來實現對於用戶登錄請求的處理。IRequestHandler 是一個泛型的接口,它需要我們在繼承時聲明我們需要實現的請求,以及該請求的返回信息。因此,對於 UserLoginCommand 這個請求來說,UserLoginCommandHandler 這個請求的處理類,最終需要繼承於 IRequestHandler<UserLoginCommand, bool>。

  就像上面提到的一樣,我們需要在這個請求的處理類中對用戶請求的信息進行處理,在 UserLoginCommandHandler 類中,我們應該在 Handle 方法中去執行我們的判斷邏輯,這裏我們會引用到倉儲來獲取用戶的相關信息。倉儲中的代碼這裏我就不展示了,最終我們實現后的代碼如下所示。

public class UserLoginCommandHandler : IRequestHandler<UserLoginCommand, bool>
{
    #region Initizalize

    /// <summary>
    /// 倉儲實例
    /// </summary>
    private readonly IUserRepository _userRepository;

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="userRepository"></param>
    public UserLoginCommandHandler(IUserRepository userRepository)
    {
        _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
    }

    #endregion Initizalize

    /// <summary>
    /// Command Handler
    /// </summary>
    /// <param name="request"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public async Task<bool> Handle(UserLoginCommand request, CancellationToken cancellationToken)
    {
        // 1、判斷驗證碼是否正確
        if (string.IsNullOrEmpty(request.VerificationCode))
            return false;

        // 2、驗證登錄密碼是否正確
        var appUser = await _userRepository.GetAppUserInfo(request.Account.Trim(), request.Password.Trim());
        if (appUser == null)
            return false;

        return true;
    }
}

  當我們完成了對於請求的處理代碼后,就可以在 controller 中提供用戶訪問的入口。當然,因為我們需要採用依賴注入的方式去使用 MediatR,所以在使用之前,我們需要將請求的對應處理關係注入到依賴注入容器中。

  在通過依賴注入的方式使用 MediatR 時,我們需要將所有的事件(請求以及通知)注入到容器中,而 MediatR 則會自動尋找對應事件的處理類,除此之外,我們也需要將通過依賴注入使用到的 IMediator 接口的實現類注入到容器中。而在這個示例項目中,我們主要是在 Sample.Domain、Sample.Application 以及我們的 Web Api 項目中使用到了 MediatR,因此,我們需要將這三個項目中使用到 MediatR 的類全部注入到容器中。

  一個個的注入會比較的麻煩,所以這裏我還是採用對指定的程序集進行反射操作,去獲取需要加載的信息批量的進行注入操作,最終實現后的代碼如下。

public static IServiceCollection AddCustomMediatR(this IServiceCollection services, MediatorDescriptionOptions options)
{
    // 獲取 Startup 類的 type 類型
    var mediators = new List<Type> { options.StartupClassType };

    // IRequest<T> 接口的 type 類型
    var parentRequestType = typeof(IRequest<>);

    // INotification 接口的 type 類型
    var parentNotificationType = typeof(INotification);

    foreach (var item in options.Assembly)
    {
        var instances = Assembly.Load(item).GetTypes();

        foreach (var instance in instances)
        {
            // 判斷是否繼承了接口
            //
            var baseInterfaces = instance.GetInterfaces();
            if (baseInterfaces.Count() == 0 || !baseInterfaces.Any())
                continue;

            // 判斷是否繼承了 IRequest<T> 接口
            //
            var requestTypes = baseInterfaces.Where(i => i.IsGenericType
                && i.GetGenericTypeDefinition() == parentRequestType);

            if (requestTypes.Count() != 0 || requestTypes.Any())
                mediators.Add(instance);

            // 判斷是否繼承了 INotification 接口
            //
            var notificationTypes = baseInterfaces.Where(i => i.FullName == parentNotificationType.FullName);

            if (notificationTypes.Count() != 0 || notificationTypes.Any())
                mediators.Add(instance);
        }
    }

    // 添加到依賴注入容器中
    services.AddMediatR(mediators.ToArray());

    return services;
}

  因為需要知道哪些程序集應該進行反射獲取信息,而對於 Web Api 這個項目來說,它只會通過依賴注入使用到 IMediator 這一個接口,所以這裏需要採用不同的參數的形式去確定具體需要通過反射加載哪些程序集。

public class MediatorDescriptionOptions
{
    /// <summary>
    /// Startup 類的 type 類型
    /// </summary>
    public Type StartupClassType { get; set; }

    /// <summary>
    /// 包含使用到 MediatR 組件的程序集
    /// </summary>
    public IEnumerable<string> Assembly { get; set; }
}

  最終,我們就可以在 Startup 類中通過擴展方法的信息進行快速的注入,實際使用的代碼如下,這裏我是將需要加載的程序集信息放在 appsetting 這個配置文件中的,你可以根據你的喜好進行調整。

public class Startup
{
    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        // Config mediatr
        services.AddCustomMediatR(new MediatorDescriptionOptions
        {
            StartupClassType = typeof(Startup),
            Assembly = Configuration["Assembly:Mediator"].Split("|", StringSplitOptions.RemoveEmptyEntries)
        });
    }
}

  在這個示例項目中的配置信息如下所示。

{
  "Assembly": {
    "Function": "Sample.Domain",
    "Mapper": "Sample.Application",
    "Mediator": "Sample.Application|Sample.Domain"
  }
}

  當我們注入完成后,就可以直接在 controller 中進行使用。對於繼承了 IRequest 的方法,可以直接通過 Send 方法進行調用請求信息,MediatR 會幫我們找到對應請求的處理方法,最終登錄 action 中的代碼如下。

[ApiVersion("1.0")]
[ApiController]
[Route("api/v{version:apiVersion}/[controller]")]
public class UsersController : ControllerBase
{
    #region Initizalize

    /// <summary>
    ///
    /// </summary>
    private readonly IMediator _mediator;

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="mediator"></param>
    public UsersController(IMediator mediator)
    {
        _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
    }

    #endregion Initizalize

    #region APIs

    /// <summary>
    /// 用戶登錄
    /// </summary>
    /// <param name="login">用戶登錄數據傳輸對象</param>
    /// <returns></returns>
    [HttpPost("login")]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status401Unauthorized)]
    public async Task<IActionResult> Post([FromBody] AppUserLoginDto login)
    {
        // 實體映射轉換
        var command = new UserLoginCommand(login.Account, login.Password, login.VerificationCode);

        bool flag = await _mediator.Send(command);

        if (flag)
            return Ok(new
            {
                code = 20001,
                msg = $"{login.Account} 用戶登錄成功",
                data = login
            });
        else
            return Unauthorized(new
            {
                code = 40101,
                msg = $"{login.Account} 用戶登錄失敗",
                data = login
            });
    }

    #endregion APIs
}

  當我們完成了對於用戶登錄請求的處理之後,就可以去執行後續的“通知類”的事件。與用戶登錄的請求信息類相似,對於用戶登錄事件的通知類也只是包含一些通知的基礎信息。在 Smaple.Domain 這個類庫下面,創建一個 Events 文件用來存放我們的事件,我們來新建一個繼承於 INotification 接口的 AppUserLoginEvent 類,用來對用戶登錄事件進行相關的處理。

public class AppUserLoginEvent : INotification
{
    /// <summary>
    /// 賬戶
    /// </summary>
    public string Account { get; }

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="account"></param>
    public AppUserLoginEvent(string account)
    {
        Account = account;
    }
}

  在上文中有提到過,對於一個通知事件可能會存在着多種處理方式,所以這裏我們在 Smaple.Application 這個類庫的 DomainEventHandlers 文件夾下面會按照事件去創建對應的文件夾去存放實際處理方法。

  對於繼承了 INotification 接口的通知類來說,在 MediatR 中我們可以通過創建繼承於 INotificationHandler 接口的類去處理對應的事件。因為一個 notification 可以有多個的處理程序,所以我們可以創建多個的 NotificationHandler 類去處理同一個 notification。一個示例的 NotificationHandler 類如下所示。

public class SetCurrentUserEventHandler : INotificationHandler<AppUserLoginEvent>
{
    #region Initizalize

    /// <summary>
    ///
    /// </summary>
    private readonly ILogger<SetCurrentUserEventHandler> _logger;

    /// <summary>
    ///
    /// </summary>
    /// <param name="logger"></param>
    public SetCurrentUserEventHandler(ILogger<SetCurrentUserEventHandler> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    #endregion Initizalize

    /// <summary>
    /// Notification handler
    /// </summary>
    /// <param name="notification"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public Task Handle(AppUserLoginEvent notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation($"CurrentUser with Account: {notification.Account} has been successfully setup");

        return Task.FromResult(true);
    }
}

  如何去引發這個事件,對於領域驅動設計的架構來說,一個更好的方法是將各種領域事件添加到事件的集合中,然後在提交事務之前或之後立即調度這些域事件,而對於我們這個項目來說,因為這不在這篇文章考慮的範圍內,只是演示如何去使用 MediatR 這個組件,所以這裏我就採取在請求邏輯處理完成后直接觸發事件的方式。

  在 UserLoginCommandHandler 類中,修改我們的代碼,在確認登錄成功后,通過調用 AppUser 類的 SetUserLoginRecord 方法來觸發我們的通知事件,修改后的代碼如下所示。

public class UserLoginCommandHandler : IRequestHandler<UserLoginCommand, bool>
{
    #region Initizalize

    /// <summary>
    /// 倉儲實例
    /// </summary>
    private readonly IUserRepository _userRepository;

    /// <summary>
    ///
    /// </summary>
    private readonly IMediator _mediator;

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="userRepository"></param>
    /// <param name="mediator"></param>
    public UserLoginCommandHandler(IUserRepository userRepository, IMediator mediator)
    {
        _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
        _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
    }

    #endregion Initizalize

    /// <summary>
    /// Command Handler
    /// </summary>
    /// <param name="request"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public async Task<bool> Handle(UserLoginCommand request, CancellationToken cancellationToken)
    {
        // 1、判斷驗證碼是否正確
        if (string.IsNullOrEmpty(request.VerificationCode))
            return false;

        // 2、驗證登錄密碼是否正確
        var appUser = await _userRepository.GetAppUserInfo(request.Account.Trim(), request.Password.Trim());
        if (appUser == null)
            return false;

        // 3、觸發登錄事件
        appUser.SetUserLoginRecord(_mediator);

        return true;
    }
}

  與使用 Send 方法去調用 request 類的請求不同,對於繼承於 INotification 接口的事件通知類,我們需要採用 Publish 的方法去調用。至此,對於一個採用中介者模式設計的登錄流程就結束了,SetUserLoginRecord 方法的定義,以及最終我們實現的效果如下所示。

public void SetUserLoginRecord(IMediator mediator)
{
    mediator.Publish(new AppUserLoginEvent(Account));
}

 三、總結

  這一章主要是介紹了如何通過 MediatR 來實現中介者模式,因為自己也是第一次接觸這種思想,對於 MediatR 這個組件也是第一次使用,所以僅僅是採用案例分享的方式對中介者模式的使用方法進行了一個解釋。如果你想要對中介者模式的具體定義與基礎的概念進行進一步的了解的話,可能需要你自己去找資料去弄明白具體的定義。因為初次接觸,難免會有遺漏或錯誤,如果從文章中發現有不對的地方,歡迎在評論區中指出,先行感謝。

 四、參考

  1、

  2、

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

分類
發燒車訊

Kafka needs no Keeper(關於KIP-500的討論)

寫在前面的

最近看了Kafka Summit上的這個分享,覺得名字很霸氣,標題直接沿用了。這個分享源於社區的,大體的意思今後Apache Kafka不再需要ZooKeeper。整個分享大約40幾分鐘。完整看下來感覺乾貨很多,這裏特意總結出來。如果你把這個分享看做是《三國志》的話,那麼姑且就把我的這篇看做是裴松之注吧:)

客戶端演進

首先,社區committer給出了Kafka Java客戶端移除ZooKeeper依賴的演進過程。下面兩張圖總結了0.8.x版本和0.11.x版本(是否真的是從0.11版本開始的變化並不重要)及以後的功能變遷:在Kafka 0.8時代,Kafka有3個客戶端,分別是Producer、Consumer和Admin Tool。其中Producer負責向Kafka寫消息,Consumer負責從Kafka讀消息,而Admin Tool執行各種運維任務,比如創建或刪除主題等。其中Consumer的位移數據保存在ZooKeeper上,因此Consumer端的位移提交和位移獲取操作都需要訪問ZooKeeper。另外Admin Tool執行運維操作也要訪問ZooKeeper,比如在對應的ZooKeeper znode上創建一個臨時節點,然後由預定義的Watch觸發相應的處理邏輯。

後面隨着Kafka的演進,社區引入了__consumer_offsets位移主題,同時定義了OffsetFetch和OffsetCommit等新的RPC協議,這樣Consumer的位移提交和位移獲取操作全部轉移到與位移主題進行交互,避免了對ZooKeeper的訪問。同時社區引入了新的運維工具AdminClient以及相應的CreateTopics、DeleteTopics、AlterConfigs等RPC協議,替換了原先的Admin Tool,這樣創建和刪除主題這樣的運維操作也完全移動Kafka這一端來做,就像下面右邊這張圖展示的:

至此, Kafka的3個客戶端基本上都不需要和ZooKeeper交互了。應該說移除ZooKeeper的工作完成了大部分,但依然還有一部分工作要在ZooKeeper的幫助下完成,即Consumer的Rebalance操作。在0.8時代,Consumer Group的管理是交由ZooKeeper完成的,包括組成員的管理和訂閱分區的分配。這個設計在新版Consumer中也得到了修正。全部的Group管理操作交由Kafka Broker端新引入的Coordinator組件來完成。要完成這些工作,Broker端新增了很多RPC協議,比如JoinGroup、SyncGroup、Heartbeat、LeaveGroup等。

  

此時,Kafka的Java客戶端除了AdminClient還有一點要依賴ZooKeeper之外,所有其他的組件全部擺脫了對ZooKeeper的依賴。

之後,社區引入了Kafka安全層,實現了對用戶的認證和授權。這個額外的安全層也是不需要訪問ZooKeeper的,因此之前依賴ZooKeeper的客戶端是無法“享用”這個安全層。一旦啟用,新版Clients都需要首先接入這一層並通過審核之後才能訪問到Broker,如下圖所示:

這麼做的好處在於統一了Clients訪問Broker的模式,即定義RPC協議,比如我們熟知的PRODUCE協議、FETCH協議、METADATA協議、CreateTopics協議等。如果後面需要實現更多的功能,社區只需要定義新的RPC協議即可。同時新引入的安全層負責對這套RPC協議進行安全校驗,統一了訪問模式。另外這些協議都是版本化的(versioned),因此能夠獨立地進行演進,同時也兼顧了兼容性方面的考量。

Broker間交互

說完了Clients端,我們說下Broker端的現狀。目前,應該說Kafka Broker端對ZooKeeper是重度依賴的,主要表現在以下幾個方面:

  • Broker註冊管理
  • ACL安全層配置管理
  • 動態參數管理
  • 副本ISR管理
  • Controller選舉

我們拿一張圖來說明,圖中有4個Broker節點和一個ZooKeeper,左上角的Broker充當Controller的角色。當前,所有的Broker啟動后都必須維持與ZooKeeper的會話。Kafka依賴於這個會話實現Broker端的註冊,而且Kafka集群中的所有配置信息、副本信息、主題信息也都保存在ZooKeeper上。最後Controller與集群中每個Broker都維持了一個TCP長連接用於向這些Broker發送RPC請求。當前的Controller RPC類型主要有3大類:

  • LeaderAndIsr:主要用於向集群廣播主題分區Leader和ISR的變更情況,比如對應的Broker應該是特定分區的Leader還是Follower
  • StopReplica:向集群廣播執行停止副本的命令
  • UpdateMetadata:向集群廣播執行變更元數據信息的命令

圖中還新增了一個AlterISR RPC,這是KIP-497要實現的新RPC協議。現階段Kafka各個主題的ISR信息全部保存在ZooKeeper中。如果後續要捨棄ZooKeeper,必須要將這些信息從ZooKeeper中移出來,放在了Controller一端來做。同時還要在程序層面支持對ISR的管理。因此社區計劃在KIP-497上增加AlterISR協議。對了,還要提一句,當前Controller的選舉也是依靠ZooKeeper完成的。

所以後面Broker端的演進可能和Clients端的路線差不多:首先是把Broker與ZooKeeper的交互全部幹掉,只讓Controller與ZooKeeper進行交互,而其他所有Broker都只與Controller交互,如下圖所示:

 

看上去這種演進路線社區已經走得輕車熟路了,但實際上還有遺留了一些問題需要解決。

Broker Liveness

首先就是Broker的liveness問題,即Kafka如何判斷一個Broker到底是否存活?在目前的設計中,Broker的生存性監測完全依賴於與ZooKeeper之間的會話。一旦會話超時或斷開Controller自動觸發ZooKeeper端的Watch來移除該Broker,並對其上的分區做善後處理。如果移除了ZooKeeper,Kafka應該採用什麼機制來判斷Broker的生存性是一個問題。

Network Partition

如何防範網絡分區也是一個需要討論的話題。當前可能出現的Network Partition有4種:1、單個Broker完全與集群隔離;2、Broker間無法通訊;3、Broker與ZooKeeper無法通訊;4、Broker與Controller無法通訊。下面4張圖分別展示了這4種情況:

 

我們分別討論下。首先是第一種情況,單Broker與集群其他Broker隔離,這其實並不算太嚴重的問題。當前的設計已然能夠保證很好地應對此種情況。一旦Broker被隔離,Controller會將其從集群中摘除,雖然可用性降低了,但是整個集群的一致性依然能夠得到保證。第二種情況是Broker間無法通訊,可能的後果是消息的備份機制無法執行,Kafka要收縮ISR,依然是可用性上的降低,但是一致性狀態並沒有被破壞。情況三是Broker無法與ZooKeeper通訊。Broker能正常運轉,它只是無法與ZooKeeper進行通訊。此時我們說該Broker處於殭屍狀態,即所謂的Zoobie狀態。因Zoobie狀態引入的一致性bug社區jira中一直沒有斷過,社區這幾年也一直在修正這方面的問題,主要對抗的機制就是fencing。比如leader epoch等。最後一類情況是Broker無法與Controller通訊,那麼所有的元數據更新通道被堵死,即使這個Broker依然是healthy的,但是它保存的元數據信息可能是非常過期的。這樣連接該Broker的客戶端可能會看到各種非常古怪的問題。之前在知乎上回答過類似的問題:4。目前,社區對這種情況並沒有太好的解決辦法,主要的原因是Broker的liveness完全交由ZooKeeper來做的。一旦Broker與ZooKeeper之間的交互沒有問題,其他原因導致的liveness問題就無法徹底規避。

第四類Network Partition引入了一個經典的場景:元數據不一致。目前每個Broker都緩存了一份集群的元數據信息,這份數據是異步更新的。當第四類Partition發生時,Broker端緩存的元數據信息必然與Controller的不同步,從而造成各種各樣的問題。

下面簡要介紹一下元數據更新的過程。主要的流程就是Controller啟動時會同步地從ZooKeeper上拉取集群全量的元數據信息,之後再以異步的方式同步給其他Broker。其他Broker與Controller之間的同步往往有一個時間差,也就是說可能Clients訪問的元數據並不是最新的。我個人認為現在社區很多flaky test failure都是因為這個原因導致的。 事實上,實際使用過程中有很多場景是Broker端的元數據與Controller端永遠不同步。通常情況下如果我們不重啟Broker的話,那麼這個Broker上的元數據將永遠“錯誤”下去。好在社區還給出了一個最後的“大招”: 登錄到ZooKeeper SHELL,手動執行rmr /controller,強迫Controller重選舉,然後重新加載元數據,並給所有Broker重刷一份。不過在實際生產環境,我懷疑是否有人真的要這麼干,畢竟代價不小,而且最關鍵的是這麼做依然可能存在兩個問題:1. 我們如何確保Controller和Broker的數據是一致的?2. 加載元數據的過程通常很慢。

這裏詳細說說第二點,即加載元數據的性能問題。總體來說,加載元數據是一個O(N)時間複雜度的過程,這裏的N就是你集群中總的分區數。考慮到Controller從ZooKeeper加載之後還要推給其他的Broker,那麼做這件事的總的時間複雜度就是O(N * M),其中M是集群中Broker的數量。可以想見,當M和N都很大時,在集群中廣播元數據不是一個很快的過程。

Metadata as an Event Log

Okay,鑒於以上所提到的所有問題,當Kafka拋棄了ZooKeeper之後,社區應該如何解決它們呢?總體的思路就是Metadata as an Event Log + Controller quorum。我們先說metadata as an event log。如果你讀過Jay Kreps的《I ️Logs》,你應該有感觸,整個Kafka的架構其實都是構建在Log上的。每個topic的分區本質上就是一個Commit Log,但元數據信息的保存卻不是Log形式。在現有的架構設計中你基本上可以認為元數據的數據結構是KV形式的。這一次,社區採用了與消息相同的數據保存方式,即將元數據作為Log的方式保存起來,如下錶所示:

 

這樣做的好處在於每次元數據的變更都被當做是一條消息保存在Log中,而這個Log可以被視作是一個普通的Kafka主題被備份到多台Broker上。Log的一個好處在於它有清晰的前後順序關係,即每個事件發生的時間是可以排序的,配合以恰當的處理邏輯,我們就能保證對元數據變更的處理是按照變更發生時間順序處理,不出現亂序的情形。另外Log機制還有一個好處是,在Broker間同步元數據時,我們可以選擇同步增量數據(delta),而非全量狀態。現在Kafka Broker間同步元數據都是全量狀態同步的。前面說過了,當集群分區數很大時,這個開銷是很可觀的。如果我們能夠只同步增量狀態,勢必能極大地降低同步成本。最後一個好處是,我們可以很容易地量化元數據同步的進度,因為對Log的消費有位移數據,因此通過監控Log Lag就能算出當前同步的進度或是落後的進度。

採用Log機制后,其他Broker像是一個普通的Consumer,從Controller拉取元數據變更消息或事件。由於每個Broker都是一個Consumer,所以它們會維護自己的消費位移,就像下面這張圖一樣:

 這種設計下,Controller所在的Broker必須要承擔起所有元數據topic的管理工作,包括創建topic、管理topic分區的leader以及為每個元數據變更創建相應的事件等。既然社區選擇和__consumer_offsets類似的處理方式,一個很自然的問題在於這個元數據topic的管理是否能夠復用Kafka現有的副本機制?答案是:不可行。理由是現有的副本機制依賴於Controller,因此Kafka沒法依靠現有的副本機制來實現Controller——按照我們的俗語來說,這有點雞生蛋、蛋生雞的問題,屬於典型的循環依賴。為了實現這個,Kafka需要一套leader選舉協議,而這套協議或算法是不依賴於Controller的,即它是一個自管理的集群quorum(抱歉,在分佈式領域內,特別是分佈式共識算法領域中,針對quorum的恰當翻譯我目前還未找到,因此直接使用quorum原詞了)。最終社區決定採用Raft來實現這組quorum。這就是上面我們提到的第二個解決思路:Controller quorum。

Controller Quorum

與藉助Controller幫忙選擇Leader不同,Raft是讓自己的節點自行選擇Leader並最終令所有節點達成共識——對選擇Controller而言,這是一個很好的特性。其實Kafka現有的備份機制與Raft已經很接近了,下錶羅列了一下它們的異同:

 一眼掃過去,其實Kafka的備份機制和Raft很類似,比如Kafka中的offset其實就是Raft中的index,epoch對應於term。當然Raft中採用的半數機制來確保消息被提交以及Leader選舉,而Kafka設計了ISR機制來實現這兩點。總體來說,社區認為只需要對備份機製做一些小改動就應該可以很容易地切換到Raft-based算法。

下面這張圖展示Controller quorum可能更加直觀:

整個controller quorum類似於一個小的集群。和ZooKeeper類似,這個quorum通常是3台或5台機器,不需要讓Kafka中的每個Broker都自動稱為這個quorum中的一個節點。該quorum裏面有一個Leader負責處理客戶端發來的讀寫請求,這個Leader就是Kafka中的active controller。根據ZooKeeper的Zab協議,leader處理所有的寫請求,而follower是可以處理讀請求的。當寫請求發送給follower后,follower會將該請求轉發給leader處理。不過我猜Kafka應該不會這樣實現,它應該只會讓leader(即active controller)處理所有的讀寫請求,而客戶端(也就是其他Broker)壓根就不會發送讀寫請求給follower。在這一點上,這種設計和現有的Kafka請求處理機制是一致的。

現在還需要解決一個問題,即Leader是怎麼被選出來的?既然是Raft-based,那麼採用的也是Raft算法中的Leader選舉策略。讓Raft選出的Leader稱為active controller。網上有很多關於Raft選主的文章,這裏就不在贅述了,有興趣的可以讀一讀Raft的論文:《In Search of an Understandable Consensus Algorithm(Extended Version)》。

這套Raft quorum的一個好處在於它天然提供了低延時的failover,因此leader的切換會非常的迅速和及時,因為理論上不再有元數據加載的過程了,所有的元數據現在都同步保存follower節點的內存中,它已經有其他Broker需要拉取的所有元數據信息了!更酷的是,它避免了現在機制中一旦Controller切換要全量拉取元數據的低效行為,Broker無需重新拉取之前已經“消費”的元數據變更消息,它只需要從新Leader繼續“消費”即可。

另一個好處在於:採用了這套機制后,Kafka可以做元數據的緩存了(metadata caching):即Broker能夠把元數據保存在磁盤上,同時就像剛才說的,Broker只需讀取它關心的那部分數據即可。還有,和現在snapshot機制類似,如果一個Broker保存的元數據落後Controller太多或者是一個全新的Broker,Kafka甚至可以像Raft那樣直接發送一個snapshot文件,快速令其追上進度。當然大多數情況下,Broker只需要拉取delta增量數據即可。

Post KIP-500 Broker註冊

當前Broker啟動之後會向ZooKeeper註冊自己的信息,比如自己的主機名、端口、監聽協議等數據。移除ZooKeeper之後,Broker的註冊機制也要發生變化:Broker需要向active controller發送心跳來進行註冊。Controller收集心跳中包含的Broker數據構建整個Kafka集群信息,如下圖所示:

 同時Controller也會對心跳進行響應,顯式地告知Broker它們是否被允許加入集群——如果不允許,則可能需要被隔離(fenced)。當然controller自己也可以對自己進行隔離。我們針對前面提到的隔離場景討論下KIP-500是怎麼應對的。

Fencing

首先是普通Broker與集群完全隔離的場景,比如該Broker無法與controller和其他Broker進行通信,但它依然可以和客戶端程序交互。此時,fencing機制就很簡單了,直接讓controller令其下線即可。這和現在依靠ZooKeeper會話機制維持Broker判活的機制是一模一樣的,沒有太大改進。

第二種情況是Broker間的通訊中斷。此時消息無法在leader、follower間進行備份。但是對於元數據而言,我們不會看到數據不一致的情形,因為Broker依然可以和controller通訊,因此也不會有什麼問題。

第三種情況是Broker與Controller的隔離。現有機制下這是個問題,但KIP-500之後,Controller僅僅將該Broker“踢出場”即可,不會造成元數據的不一致。

最後一種情況是Broker與ZooKeeper的隔離, 既然ZooKeeper要被移除了,自然這也不是問題了。

部署

終於聊到KIP-500之後的Kafka運維了。下錶總結了KIP-500前後的部署情況對比:

很簡單,現在任何時候部署和運維Kafka都要考慮對ZooKeeper的運維管理。在KIP-500之後我們只需要關心Kafka即可。

Controller quorum共享模式

如前所述,controller改成Raft quorum機制后,可能使用3或5台機器構成一個小的quorum。那麼一個很自然的問題是,這些Broker機器還能否用作他用,是唯一用作controller quorum還是和其他Broker一樣正常處理。社區對此也做了解釋:兩種都支持!

如果你的Kafka集群資源很緊張,你可以使用共享controller模式(Shared Controller Mode),即充當controller quorum的Broker機器也能處理普通的客戶端請求;相反地,如果你的Kafka資源很充足,專屬controller模式(Separate Controller Mode)可能是更適合的,即在controller quorum中的Broker機器排它地用作Controller的選舉之用,不再對客戶端提供讀寫服務。這樣可以實現更好的資源隔離,適用於大集群。

Roadmap

最後說一下KIP-500的計劃。社區計劃分三步走:

第一步是移除客戶端對ZooKeeper的依賴——這一步基本上已經完成了,除了目前AdminClient還有少量的API依賴ZooKeeper之外,其他客戶端應該說都不需要訪問ZooKeeper了;第二步是移除Broker端的ZooKeeper依賴:這主要包括移除Broker端需要訪問ZooKeeper的代碼,以及增加新的Broker端API,如前面所說的AlterISR等,最後是將對ZooKeeper的訪問全部集中在controller端;最後一步就是實現controller quorum,實現Raft-based的quorum負責controller的選舉。

至於Kafka升級,如果從現有的Kafka直接升級到KIP-500之後的Kafka會比較困難,因此社區打算引入一個名為Bridge Release的中間過渡版本,如下圖所示:

這個Bridge版本的特點在於所有對ZooKeeper的訪問都集中到了controller端,Broker訪問ZooKeeper的其他代碼都被移除了。 

總結

KIP-500應該說是最近幾年社區提出的最重磅的KIP改進了。它幾乎是顛覆了Kafka已有的使用模式,摒棄了之前重度依賴的Apache ZooKeeper。就我個人而言,我是很期待這個KIP,後續有最新消息我也會在一併同步出來。讓我們靜觀其變吧~~~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

分類
發燒車訊

020.掌握Pod-Pod基礎使用

一 Pod定義詳解

1.1 完整Pod定義文件

  1 apiVersion: v1			#必選,版本號,例如v1,版本號必須可以用 kubectl api-versions 查詢到
  2 kind: Pod				#必選,Pod
  3 metadata:				#必選,元數據
  4   name: string			#必選,Pod名稱,需符合RFC 1035規範
  5   namespace: string			#必選,Pod所屬的命名空間,默認為"default"
  6   labels:				#自定義標籤
  7     - name: string			#自定義標籤名字
  8   annotations:			#自定義註釋列表
  9     - name: string
 10 spec:				#必選,Pod中容器的詳細定義
 11   containers:			#必選,Pod中容器列表
 12   - name: string			#必選,容器名稱,需符合RFC 1035規範
 13     image: string			#必選,容器的鏡像名稱
 14     imagePullPolicy: [ Always|Never|IfNotPresent ]	#獲取鏡像的策略,Alawys表示每次都嘗試下載鏡像,IfnotPresent表示優先使用本地鏡像,否則下載鏡像,Nerver表示僅使用本地鏡像
 15     command: [string]		#容器的啟動命令列表,如不指定,使用打包時使用的啟動命令
 16     args: [string]			#容器的啟動命令參數列表
 17     workingDir: string		#容器的工作目錄
 18     volumeMounts:			#掛載到容器內部的存儲卷配置
 19     - name: string			#引用pod定義的共享存儲卷的名稱,需用volumes[]部分定義的的卷名
 20       mountPath: string		#存儲卷在容器內mount的絕對路徑,應少於512字符
 21       readOnly: boolean		#是否為只讀模式,默認為讀寫模式
 22     ports:				#需要暴露的端口庫號列表
 23     - name: string			#端口的名稱
 24       containerPort: int		#容器需要監聽的端口號
 25       hostPort: int		        #容器所在主機需要監聽的端口號,默認與Container相同
 26       protocol: string		#端口協議,支持TCP和UDP,默認TCP
 27     env:				#容器運行前需設置的環境變量列表
 28     - name: string			#環境變量名稱
 29       value: string		        #環境變量的值
 30     resources:			#資源限制和請求的設置
 31       limits:			#資源限制的設置
 32         cpu: string		        #CPU的限制,單位為core數,將用於docker run --cpu-shares參數
 33         memory: string		#內存限制,單位可以為Mib/Gib,將用於docker run --memory參數
 34       requests:			#資源請求的設置
 35         cpu: string		        #CPU請求,容器啟動的初始可用數量
 36         memory: string		#內存請求,容器啟動的初始可用數量
 37     livenessProbe:			#對Pod內各容器健康檢查的設置,當探測無響應幾次后將自動重啟該容器,檢查方法有exec、httpGet和tcpSocket,對一個容器只需設置其中一種方法即可
 38       exec:			        #對Pod容器內檢查方式設置為exec方式
 39         command: [string]		#exec方式需要制定的命令或腳本
 40       httpGet:			#對Pod內個容器健康檢查方法設置為HttpGet,需要制定Path、port
 41         path: string
 42         port: number
 43         host: string
 44         scheme: string
 45         HttpHeaders:
 46         - name: string
 47           value: string
 48       tcpSocket:			#對Pod內個容器健康檢查方式設置為tcpSocket方式
 49          port: number
 50        initialDelaySeconds: 0	#容器啟動完成后首次探測的時間,單位為秒
 51        timeoutSeconds: 0		#對容器健康檢查探測等待響應的超時時間,單位秒,默認1秒
 52        periodSeconds: 0		#對容器監控檢查的定期探測時間設置,單位秒,默認10秒一次
 53        successThreshold: 0
 54        failureThreshold: 0
 55        securityContext:
 56          privileged: false
 57     restartPolicy: [Always | Never | OnFailure]	#Pod的重啟策略,Always表示一旦不管以何種方式終止運行,kubelet都將重啟,OnFailure表示只有Pod以非0退出碼退出才重啟,Nerver表示不再重啟該Pod
 58     nodeSelector: obeject		#設置NodeSelector表示將該Pod調度到包含這個label的node上,以key:value的格式指定
 59     imagePullSecrets:		#Pull鏡像時使用的secret名稱,以key:secretkey格式指定
 60     - name: string
 61     hostNetwork: false		#是否使用主機網絡模式,默認為false,如果設置為true,表示使用宿主機網絡
 62     volumes:			#在該pod上定義共享存儲卷列表
 63     - name: string			#共享存儲卷名稱 (volumes類型有很多種)
 64       emptyDir: {}			#類型為emtyDir的存儲卷,與Pod同生命周期的一個臨時目錄。為空值
 65       hostPath: string		#類型為hostPath的存儲卷,表示掛載Pod所在宿主機的目錄
 66         path: string		#Pod所在宿主機的目錄,將被用於同期中mount的目錄
 67       secret:			#類型為secret的存儲卷,掛載集群與定義的secre對象到容器內部
 68         scretname: string
 69         items:
 70         - key: string
 71           path: string
 72       configMap:			#類型為configMap的存儲卷,掛載預定義的configMap對象到容器內部
 73         name: string
 74         items:
 75         - key: string
 76           path: string

二 Pod的基本用法

2.1 創建Pod


Pod可以由1個或多個容器組合而成,通常對於緊耦合的兩個應用,應該組合成一個整體對外提供服務,則應該將這兩個打包為一個pod。

屬於一個Pod的多個容器應用之間相互訪問只需要通過localhost即可通信,這一組容器被綁定在一個環境中。

  1 [root@k8smaster01 study]# vi frontend-localredis-pod.yaml
  2 apiVersion: v1
  3 kind: Pod
  4 metadata:
  5   name: redis-php
  6   label:
  7     name: redis-php
  8 spec:
  9   containers:
 10   - name: frontend
 11     image: kubeguide/guestbook-php-frontend:localredis
 12     ports:
 13     - containersPort: 80
 14   - name: redis-php
 15     image: kubeguide/redis-master
 16     ports:
 17     - containersPort: 6379
 18 
 19 [root@k8smaster01 study]# kubectl create -f frontend-localredis-pod.yaml
 20 


2.2 查看Pod

  1 [root@k8smaster01 study]# kubectl get pods	                #READY為2/2,表示此Pod中運行了yaml定義的兩個容器
  2 NAME        READY   STATUS    RESTARTS   AGE
  3 redis-php   2/2     Running   0          7m45s
  4 [root@k8smaster01 study]# kubectl describe pod redis-php	#查看詳細信息
  5 


三 靜態Pod

3.1 靜態Pod概述


靜態pod是由kubelet進行管理的僅存在於特定Node的Pod上,他們不能通過API Server進行管理,無法與ReplicationController、Deployment或者DaemonSet進行關聯,並且kubelet無法對他們進行健康檢查。靜態Pod總是由kubelet進行創建,並且總是在kubelet所在的Node上運行。

創建靜態Pod有兩種方式:配置文件或者HTTP方式。

3.2 配置文件方式創建

  1 [root@k8snode01 ~]# mkdir -p /etc/kubelet.d
  2 [root@k8snode01 ~]# vi /etc/kubelet.d/static-web.yaml
  3 apiVersion: v1
  4 kind: Pod
  5 metadata:
  6   name: static-web
  7   label:
  8     name: static-web
  9 spec:
 10   containers:
 11   - name: static-web
 12     image: nginx
 13     ports:
 14     - name: web
 15       containersPort: 80
 16 
 17 [root@k8snode01 ~]# vi /etc/systemd/system/kubelet.service
 18 ……
 19   --config=/etc/kubelet.d/ \·				#加入此參數
 20 ……
 21 [root@k8snode01 ~]# systemctl daemon-reload
 22 [root@k8snode01 ~]# systemctl restart kubelet.service	#重啟kubelet
 23 [root@k8snode01 ~]# docker ps				#查看創建的pod



提示:由於靜態pod不能通過API Server進行管理,因此在Master節點執行刪除操作後會變為Pending狀態,且無法刪除。刪除該pod只能在其運行的node上,將定義POD的yaml刪除。

3.3 HTTP方式


通過設置kubelet的啟動參數–mainfest-url,會定期從該URL下載Pod的定義文件,並以.yaml或.json文件的格式進行解析,從而創建Pod。

四 Pod容器共享Volume

4.1 共享Volume


在同一個Pod中的多個容器能夠共享Pod級別的存儲就Volume。Volume可以被定義為各種類型,多個容器各自進行掛載操作,將一個Volume掛載為容器內部需要的目錄。


示例1:

Pod級別設置Volume “app-logs”,同時Pod包含兩個容器,Tomcat向該Volume寫日誌,busybox讀取日誌文件。

  1 [root@k8smaster01 study]# vi pod-volume-applogs.yaml
  2 apiVersion: v1
  3 kind: Pod
  4 metadata:
  5   name: volume-pod
  6 spec:
  7   containers:
  8   - name: tomcat
  9     image: tomcat
 10     ports:
 11     - containerPort: 8080
 12     volumeMounts:
 13     - name: app-logs
 14       mountPath: /usr/local/tomcat/logs
 15   - name: logreader
 16     image: busybox
 17     command: ["sh","-c","tail -f /logs/catalina*.log"]
 18     volumeMounts:
 19     - name: app-logs
 20       mountPath: /logs
 21   volumes:
 22   - name: app-logs
 23     emptyDir: {}

解釋:

Volume名:app-logs;

emptyDir:為Pod分配到Node的時候創建。無需指定宿主機的目錄文件,為Kubernetes自動分配的目錄。

  1 [root@k8smaster01 study]# kubectl create -f pod-volume-applogs.yaml	#創建
  2 [root@k8smaster01 study]# kubectl get pods				#查看
  3 [root@k8smaster01 study]# kubectl logs volume-pod -c busybox	#讀取log




  1 [root@k8smaster01 study]# kubectl exec -it volume-pod -c tomcat -- ls /usr/local/tomcat/logs
  2 catalina.2019-06-29.log      localhost_access_log.2019-06-29.txt
  3 host-manager.2019-06-29.log  manager.2019-06-29.log
  4 localhost.2019-06-29.log
  5 [root@k8smaster01 study]# kubectl exec -it volume-pod -c tomcat -- tail /usr/local/tomcat/logs/catalina.2019-06-29.log



提示:通過tomcat容器可查看日誌,對比busybox通過共享Volume查看的日誌是否一致。

五 Pod配置管理

5.1 Pod配置概述


應用部署的一個最佳實踐是將應用所需的配置信息與程序進行分離,使程序更加靈活。將相應的應用打包為鏡像,可以通過環境變量或者外掛volume的方式在創建容器的時候進行配置注入,從而實現更好的復用。

Kubernetes提供一種統一的應用配置管理方案:ConfigMap。

5.2 ConfigMap概述


ConfigMap供容器使用的主要場景:

  • 生成容器內部的環境變量;
  • 設置容器的啟動命令的參數(需設置為環境變量);
  • 以volume的形式掛載為容器內部的文件或者目錄。


ConfigMap以一個或多個key:value的形式定義。value可以是string也可以是一個文件內容,可以通過yaml配置文件或者通過kubectl create configmap 的方式創建configMap。

5.3 創建ConfigMap資源對象——yaml方式

  1 [root@k8smaster01 study]# vi cm-appvars.yaml
  2 apiVersion: v1
  3 kind: ConfigMap
  4 metadata:
  5   name: cm-appvars
  6 data:
  7   apploglevel: info
  8   appdatadir: /var/data
  9 
 10 [root@k8smaster01 study]# kubectl create -f cm-appvars.yaml
 11 configmap/cm-appvars created
 12 [root@k8smaster01 study]# kubectl get configmaps
 13 NAME         DATA   AGE
 14 cm-appvars   2      8s
 15 [root@k8smaster01 study]# kubectl describe configmaps cm-appvars



  1 [root@k8smaster01 study]# kubectl get configmaps cm-appvars -o yaml


5.4 創建ConfigMap資源對象——命令行方式


語法1

  1 # kubectl create configmap NAME --from-file=[key=]source --from-file=[key=]source



解釋:通過–from-file參數從文件中創建,可以指定key名稱,也可以制定多個key。

語法2

  1 # kubectl create configmap NAME --from-file=config-files-dir



解釋:通過–from-file參數從目錄中創建,該目錄下的每個配置文件名都被設置為key,文件的內容被設置為value。

語法3

  1 # kubectl create configmap NAME --from-literal=key1=value1 --from-literal=key2=value2



解釋:通過–from-literal參數從文本中創建,直接將指定的key#=value#創建為ConfigMap的內容。

5.5 Pod使用ConfigMap


容器應用使用ConfigMap有兩種方式:

  • 通過環境變量獲取ConfigMap中的內容;
  • 通過Volume掛載的方式將ConfigMap中的內容掛載為容器內容的文件或目錄。

  1 [root@k8smaster01 study]# vi cm-test-pod.yaml
  2 apiVersion: v1
  3 kind: Pod
  4 metadata:
  5   name: cm-test-pod
  6 spec:
  7   containers:
  8   - name: cm-test
  9     image: busybox
 10     command: ["/bin/sh","-c","env|grep APP"]	#容器里執行查看環境變量的命令
 11     env:
 12     - name: APPLOGLEVEL				#定義容器環境變量名稱
 13       valueFrom:
 14         configMapKeyRef:			#環境變量的值來自ConfigMap
 15           name: cm-appvars			#指定來自cm-appvars的ConfigMap
 16           key: apploglevel			#key為apploglevel
 17     - name: APPDATADIR
 18       valueFrom:
 19         configMapKeyRef:
 20           name: cm-appvars
 21           key: appdatadir
 22 
 23 [root@k8smaster01 study]# kubectl create -f cm-test-pod.yaml
 24 [root@k8smaster01 study]# kubectl get pods
 25 NAME          READY   STATUS      RESTARTS   AGE
 26 cm-test-pod   0/1     Completed   2          24s



【掛載形式-待補充】

5.6 ConfigMap限制


  • Configmap必須在pod創建之間創建;
  • ConfigMap受到namespace的限制,只有同一個命名空間下才能引用;
  • ConfigMap暫時無法配置配額;
  • 靜態的pod無法使用ConfigMap;
  • 在使用volumeMount掛載的時候,configMap基於items創建的文件會整體的將掛載數據卷的容器的目錄下的文件全部覆蓋。

六 Pod獲取自身信息

6.1 Downward API


pod擁有唯一的名字、IP地址,並且處於某個Namespace中。pod的容器內獲取pod的信息科通過Downward API實現。具體有以下兩種方式:

  • 環境變量:用於單個變量,可以將pod信息和container信息注入容器內部;
  • volume掛載:將數組類信息生成為文件,掛載至容器內部。


舉例1:通過Downward API將Pod的IP、名稱和所在的Namespace注入容器的環境變量。

  1 [root@k8smaster01 study]# vi dapi-test-pod.yaml
  2 apiVersion: v1
  3 kind: Pod
  4 metadata:
  5   name: dapi-test-pod
  6 spec:
  7   containers:
  8     - name: test-container
  9       image: busybox
 10       command: [ "/bin/sh", "-c", "env" ]
 11       env:
 12         - name: MY_POD_NAME
 13           valueFrom:
 14             fieldRef:
 15               fieldPath: metadata.name
 16         - name: MY_POD_NAMESPACE
 17           valueFrom:
 18             fieldRef:
 19               fieldPath: metadata.namespace
 20         - name: MY_POD_IP
 21           valueFrom:
 22             fieldRef:
 23               fieldPath: status.podIP
 24   restartPolicy: Never



提示:Downward API提供如下變量:

metadata.name:Pod的名稱,當Pod通過RC生成時,其名稱是RC隨機產生的唯一名稱;

status.podIP:Pod的IP地址,POd的IP屬於狀態數據,而非元數據;

metadata.namespace:Pod所在的namespace。

  1 [root@k8smaster01 study]# kubectl create -f dapi-test-pod.yaml
  2 [root@k8smaster01 study]# kubectl logs dapi-test-pod | grep MY_POD
  3 MY_POD_NAMESPACE=default
  4 MY_POD_IP=172.30.240.4
  5 MY_POD_NAME=dapi-test-pod
  6 



舉例2:通過Downward API將Container的自願請求和限制信息注入容器的環境變量。

  1 [root@k8smaster01 study]# vi dapi-test-pod-container-vars.yaml
  2 apiVersion: v1
  3 kind: Pod
  4 metadata:
  5   name: dapi-test-pod-container-vars
  6 spec:
  7   containers:
  8     - name: test-container
  9       image: busybox
 10       imagePullPolicy: Never
 11       command: [ "/bin/sh", "-c" ]
 12       args:
 13       - while true; do
 14           echo -en '\n';
 15           printenv MY_CPU_REQUEST MY_CPU_LIMIT;
 16           printenv MY_MEM_REQUEST MY_MEM_LIMIT;
 17           sleep 3600;
 18         done;
 19       resources:
 20         requests:
 21           memory: "32Mi"
 22           cpu: "125m"
 23         limits:
 24           memory: "64Mi"
 25           cpu: "250m"
 26       env:
 27         - name: MY_CPU_REQUEST
 28           valueFrom:
 29             resourceFieldRef:
 30               containerName: test-container
 31               resource: requests.cpu
 32         - name: MY_CPU_LIMIT
 33           valueFrom:
 34             resourceFieldRef:
 35               containerName: test-container
 36               resource: limits.cpu
 37         - name: MY_MEM_REQUEST
 38           valueFrom:
 39             resourceFieldRef:
 40               containerName: test-container
 41               resource: requests.memory
 42         - name: MY_MEM_LIMIT
 43           valueFrom:
 44             resourceFieldRef:
 45               containerName: test-container
 46               resource: limits.memory
 47   restartPolicy: Never



提示:Downward API提供如下變量:

requests.cpu:容器的CPU請求值;

limits.cpu:容器的CPU限制值;

requests.memory:容器的內存請求值;

limits.memory:容器的內存限制值。

  1 [root@k8smaster01 study]# kubectl create -f dapi-test-pod-container-vars.yaml
  2 [root@k8smaster01 study]# kubectl logs dapi-test-pod-container-vars
  3 1
  4 1
  5 33554432
  6 67108864



舉例3:通過Downward API將Pod的Label、Annotation列表通過Volume掛載為容器內的一個文件。

  1 [root@k8smaster01 study]# vi dapi-test-pod-volume.yaml
  2 apiVersion: v1
  3 kind: Pod
  4 metadata:
  5   name: dapi-test-pod-volume
  6   labels:
  7     zone: us-est-coast
  8     cluster: test-cluster1
  9     rack: rack-22
 10   annotations:
 11     build: two
 12     builder: john-doe
 13 spec:
 14   containers:
 15     - name: test-container
 16       image: busybox
 17       imagePullPolicy: Never
 18       command: [ "/bin/sh", "-c" ]
 19       args:
 20       - while true; do
 21           if [[ -e /etc/labels ]]; then
 22             echo -en '\n\n'; cat /etc/labels; fi;
 23           if [[ -e /etc/annotations ]]; then
 24             echo -en '\n\n'; cat /etc/annotations; fi;
 25           sleep 3600;
 26         done;
 27       volumeMounts:
 28         - name: podinfo
 29           mountPath: /etc
 30           readOnly: false
 31   volumes:
 32     - name: podinfo
 33       downwardAPI:
 34         items:
 35           - path: "labels"
 36             fieldRef:
 37               fieldPath: metadata.labels
 38           - path: "annotations"
 39             fieldRef:
 40               fieldPath: metadata.annotations



注意:Volume中的ddownwardAPI的items語法,將會以path的名稱生成文件。如上所示,會在容器內生產/etc/labels和/etc/annotations兩個文件,分別包含metadata.labels和metadata.annotations的全部Label。

  1 [root@k8smaster01 study]# kubectl create -f dapi-test-pod-volume.yaml
  2 [root@k8smaster01 study]# kubectl logs dapi-test-pod-volume
  3 



提示:DownwardAPI意義:

在某些集群中,集群中的每個節點需要將自身的標識(ID)及進程綁定的IP地址等信息事先寫入配置文件中,進程啟動時讀取此類信息,然後發布到某個類似註冊服務中心。此時可通過DowanwardAPI,將一個預啟動腳本或Init Container,通過環境變量或文件方式獲取Pod自身的信息,然後寫入主程序配置文件中,最後啟動主程序。本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※試算大陸海運運費!

分類
發燒車訊

SpringBoot基本配置詳解

SpringBoot項目有一些基本的配置,比如啟動圖案(banner),比如默認配置文件application.properties,以及相關的默認配置項。

示例項目代碼在:

一、啟動圖案banner

編寫banner.txt放入resources文件夾下,然後啟動項目即可修改默認圖案。

關於banner的生成,可以去一些專門的網站。

比如:https://www.bootschool.net/ascii

二、配置文件application

2.1 application.properties/yml

resources下通常會默認生成一個application.properties文件,這個文件包含了SpringBoot項目的全局配置文件。裏面的配置項通常是這樣的:

server.port=8080

在這個文件里我們可以添加框架支持的配置項,比如項目端口號、JDBC連接的數據源、日誌級別等等。

現在比較流行的是將properties文件改為yml文件。yml文件的格式yaml是這樣的:

server:
    port: 8080

yml和properties的作用是一樣的。而yml的好處是顯而易見的——更易寫易讀。

屬性之間互相調用使用${name}:

eknown:
    email: eknown@163.com
    uri: http://www.eknown.cn
    title: 'hello, link to ${eknown.uri} or email to ${eknown.email}'

鏈接:

2.2 多環境配置文件

通常開發一個應用會有多個環境,常見如dev/prod,也會有test,甚至其他一些自定義的環境,SpringBoot支持配置文件的靈活切換。

定義新配置文件需要遵循以下格式:application-{profile}.properties 或者application-{profile}.yml

比如現在有dev和prod兩個環境,我需要在application.yml文件之外新建兩個文件:

  1. application-dev.yml

    server:
       port: 8080
  2. application-prod.yml

    server:
      port: 8081

然後在application.yml中通過application.profiles.active={profile}指明啟用那個配置:

application:
    profiles:
      active: dev

除了在application.yml中指定配置文件外,還可以通過啟動命令指定:java -jar xxx.jar --spring.profiles.active=dev

2.2 自定義配置項並獲取它

主要介紹兩種方式,獲取單個配置項和獲取多個配置項。

舉例:

eknown:
    email: eknown@163.com
    uri: http://www.eknown.cn

2.2.1 使用@Value註解獲取單個配置項

@Value("${eknown.email}")
private String email;

@Value("${eknown.uri}")
private String url;

注意:使用@Value註解的時候,所在類必須被Spring容器管理,也就是被@Component、@Controller、@Service等註解定義的類。

2.2.2 獲取多個配置項

第一種,定義一個bean類,通過@Value獲取多個配置項:

@Component
public class MyConfigBean {
  
}

然後我們通過get方法來獲取這些值:

@RestController
public class BasicAction {
  
  @Autowired
  private MyConfigBean myConfigBean;

}

第二種,使用註解@ConfigurationProperties:

@Component
@ConfigurationProperties(perfix="eknown")
public class MyConfigBean {

  private String email;
  private String uri;
}

這裏只需要通過prefix指定前綴即可,後面的值自動匹配。

這裏我們還使用了@Component註解來讓spring容器管理這個MyConfigBean。

此外,我們可以不需要引入@Component,轉而在Application啟動類上加上@EnableConfigurationProperties({MyConfigBean.class})來啟動這個配置。

注意:我們這裡是從主配置文件,也就是SpringBoot默認的application-profile文件中獲取配置數據的。

而從自定義的配置文件,比如test.yml這種形式中獲取配置項時,情況是有點不大一樣的。

三、自定義配置文件

上面介紹的配置文件都是springboot默認的application開頭的文件。如果要自定義一個配置文件呢,比如test.yml或test.properties,怎麼獲取其中的配置項呢?

使用@PageResource註解即可。

首先我們來看一下讀取自定義的properties文件里的內容:

test.properties

hello.time=2019.11.19
hello.name=eknown

定義Configuration類:

@Configuration
@PropertySource("classpath:test.properties")
//@PropertySource("classpath:test.yml") // 注意,yml文件不能直接這樣寫,會讀不出數據
@ConfigurationProperties(prefix = "hello")
public class TestConfiguration {
    private String name;
    private String time;

    // hide get and set methods
}

測試一下:

@RestController
@RequestMapping(value = "test")
public class TestAction {

    @Autowired
    private TestConfiguration testConfiguration;

    @GetMapping(value = "config")
    public String test() {
        return testConfiguration.getName() + "<br/>" + testConfiguration.getTime();
    }
}

如果將properties文件換成yml文件呢?

我們嘗試一下,發現:

讀不出數據?

分析一下@PropertySource註解,發現其使用的PropertySourceFactory是DefaultPropertySourceFactory.

這個類的源碼如下:

public class DefaultPropertySourceFactory implements PropertySourceFactory {
    public DefaultPropertySourceFactory() {
    }

    public PropertySource<?> createPropertySource(@Nullable String name, EncodedResource resource) throws IOException {
        return name != null ? new ResourcePropertySource(name, resource) : new ResourcePropertySource(resource);
    }
}

這個類只能處理properties文件,無法處理yml文件。所以我們需要自定義一個YmlSourceFactory。

public class YamlSourceFactory extends DefaultPropertySourceFactory {

    @Override
    public PropertySource<?> createPropertySource(String name, EncodedResource resource) throws IOException {
        return new YamlPropertySourceLoader().load(resource.getResource().getFilename()
                , resource.getResource()).get(0);
    }
}

然後定義test.yml文件的config類:

@Configuration
@PropertySource(value = "classpath:test.yml", encoding = "utf-8", factory = YamlSourceFactory.class)
@ConfigurationProperties(prefix = "yml.hello")
public class TestYamlConfiguration {
    private String name;
    private String time;

    // hide get and set methods
}

注:為了區分test.properties和test.yml,這裏的test.yml中的屬性以yml.hello開頭。

編寫一下測試:

    @Autowired
    private TestYamlConfiguration ymlConfiguration;

    @GetMapping(value = "yml")
    public String testYml() {
        return "yml config: <br/>" + ymlConfiguration.getName() + "<br/>" + ymlConfiguration.getTime();
    }

訪問:

四、補充@ConfigurationProperties

網上一些資料中,為配合使用@ConfigurationProperties,還使用了@EnableConfigurationProperties註解。

經過測試發現:

  1. 從SpringBoot默認配置文件讀取配置信息,使用@ConfigurationProperties + @Component/@Configuration,或者@ConfigurationProperties + 在啟動類添加@EnableConfigurationProperties({class})。這兩種方式都能解決問題

  2. 從非默認配置文件讀取配置信息,需要利用@PropertySource註解。同樣兩種方式:

    2.1 @PropertySource + @ConfigurationProperties + @Component/@Configuration

    2.2 @PropertySource + @ConfigurationProperties + @Component/@Configuration + @EnableConfigurationProperties,第二種方式存在一個問題,即還是必須要使用@Component註解,如果不使用,則會導致讀取配置信息為null,但程序不會報錯;而如果採用了,則會導致bean類的set方法被執行兩次(也就是生成了兩個同樣類型的bean類)。這種方式不建議!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

※專營大陸快遞台灣服務

台灣快遞大陸的貨運公司有哪些呢?

分類
發燒車訊

Netty學習篇⑤–編、解碼源碼分析

前言

學習Netty也有一段時間了,Netty作為一個高性能的異步框架,很多RPC框架也運用到了Netty中的知識,在rpc框架中豐富的數據協議及編解碼可以讓使用者更加青睞;
Netty支持豐富的編解碼框架,其本身內部提供的編解碼也可以應對各種業務場景;
今天主要就是學習下Netty中提供的編、解碼類,之前只是簡單的使用了下Netty提供的解碼類,今天更加深入的研究下Netty中編、解碼的源碼及部分使用。

編、解碼的概念

  • 編碼(Encoder)

    編碼就是將我們發送的數據編碼成字節數組方便在網絡中進行傳輸,類似Java中的序列化,將對象序列化成字節傳輸
  • 解碼(Decoder)

    解碼和編碼相反,將傳輸過來的字節數組轉化為各種對象來進行展示等,類似Java中的反序列化
    如:
    // 將字節數組轉化為字符串
    new String(byte bytes[], Charset charset)

編、解碼超類

ByteToMessageDecoder: 解碼超類,將字節轉換成消息

解碼解碼一般用於將獲取到的消息解碼成系統可識別且自己需要的數據結構;因此ByteToMessageDecoder需要繼承ChannelInboundHandlerAdapter入站適配器來獲取到入站的數據,在handler使用之前通過channelRead獲取入站數據進行一波解碼;
ByteToMessageDecoder類圖

源碼分析

通過channelRead獲取入站數據,將數據緩存至cumulation數據緩衝區,最後在傳給decode進行解碼,在read完成之後清空緩存的數據

1. 獲取入站數據

/**
*  通過重寫channelRead方法來獲取入站數據
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 檢測是否是byteBuf對象格式數據
    if (msg instanceof ByteBuf) {
        // 實例化字節解碼成功輸出集合 即List<Object> out
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // 獲取到的請求的數據
            ByteBuf data = (ByteBuf) msg;
            // 如果緩衝數據區為空則代表是首次觸發read方法
            first = cumulation == null;
            if (first) {
                // 如果是第一次read則當前msg數據為緩衝數據
                cumulation = data;
            } else {
                // 如果不是則觸發累加,將緩衝區的舊數據和新獲取到的數據通過        expandCumulation 方法累加在一起存入緩衝區cumulation
                // cumulator 累加類,將緩衝池中數據和新數據進行組合在一起
                // private Cumulator cumulator = MERGE_CUMULATOR;
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 將緩衝區數據cumulation進行解碼
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            // 在解碼完畢后釋放引用和清空全局字節緩衝區
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
                // discardAfterReads為netty中設置的讀取多少次后開始丟棄字節 默認值16
                // 可通過setDiscardAfterReads(int n)來設置值不設置默認16次
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // 在我們讀取了足夠的數據可以嘗試丟棄一些字節已保證不出現內存溢出的異常
                // 
                // See https://github.com/netty/netty/issues/4275
                // 讀取次數重置為0
                numReads = 0;
                // 重置讀寫指針或丟棄部分已讀取的字節
                discardSomeReadBytes();
            }
            // out為解碼成功的傳遞給下一個handler
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            // 結束當前read傳遞到下個ChannelHandler
            fireChannelRead(ctx, out, size);
            // 回收響應集合 將insertSinceRecycled設置為false;
            // insertSinceRecycled用於channelReadComplete判斷使用
            out.recycle();
        }
    } else {
        // 不是的話直接fire傳遞給下一個handler
        ctx.fireChannelRead(msg);
    }
}
2. 初始化字節緩衝區計算器: Cumulator主要用於全局字節緩衝區和新讀取的字節緩衝區組合在一起擴容
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    
    /**
    * alloc ChannelHandlerContext分配的字節緩衝區
    * cumulation 當前ByteToMessageDecoder類全局的字節緩衝區
    * in 入站的字節緩衝區
    **/
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        final ByteBuf buffer;
        // 如果全局ByteBuf寫入的字節+當前入站的字節數據大於全局緩衝區最大的容量或者全局緩衝區的引用數大於1個或全局緩衝區只讀
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
            || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain() or if its read-only.
            //
            // See:
            // - https://github.com/netty/netty/issues/2327
            // - https://github.com/netty/netty/issues/1764
            // 進行擴展全局字節緩衝區(容量大小 = 新數據追加到舊數據末尾組成新的全局字節緩衝區)
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        // 將新數據寫入緩衝區
        buffer.writeBytes(in);
        // 釋放當前的字節緩衝區的引用
        in.release();
        
        return buffer;
    }
};


/**
* alloc 字節緩衝區操作類
* cumulation 全局累加字節緩衝區
* readable 讀取到的字節數長度
*/
// 字節緩衝區擴容方法
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
    // 舊數據
    ByteBuf oldCumulation = cumulation;
    // 通過ByteBufAllocator將緩衝區擴大到oldCumulation + readable大小
    cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
    // 將舊數據重新寫入到新的字節緩衝區
    cumulation.writeBytes(oldCumulation);
    // 舊字節緩衝區引用-1
    oldCumulation.release();
    return cumulation;
}
3. ByteBuf釋放當前字節緩衝區的引用: 通過調用ReferenceCounted接口中的release方法來釋放
@Override
public boolean release() {
    return release0(1);
}

@Override
public boolean release(int decrement) {
    return release0(checkPositive(decrement, "decrement"));
}

/**
* decrement 減量
*/
private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        // 當前引用小於減量
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }
        // 這裏就利用里線程併發中的知識CAS,線程安全的設置refCnt的值
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            // 如果減量和引用量相等
            if (refCnt == decrement) {
                // 全部釋放
                deallocate();
                return true;
            }
            return false;
        }
    }
}

4. 將全局字節緩衝區進行解碼

/**
* ctx ChannelHandler的上下文,用於傳輸數據與下一個handler來交互
* in 入站數據
* out 解析之後的出站集合 (此出站不是返回給客戶端的而是傳遞給下個handler的)
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 如果入站數據還有沒解析的
        while (in.isReadable()) {
            // 解析成功的出站集合長度
            int outSize = out.size();
            // 如果大於0則說明解析成功的數據還沒被消費完,直接fire掉給通道中的後續handler繼續                消費
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // 在這個handler刪除之前檢查是否還在繼續解碼
                // If it was removed, it is not safe to continue to operate on the buffer.
                // 如果移除了,它繼續操作緩衝區是不安全的
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            // 入站數據字節長度
            int oldInputLength = in.readableBytes();
            // 開始解碼數據
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // 
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            // 解析完畢跳出循環
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                    StringUtil.simpleClassName(getClass()) +
                    ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 設置解碼狀態為正在解碼  STATE_INIT = 0; STATE_CALLING_CHILD_DECODE = 1;             STATE_HANDLER_REMOVED_PENDING = 2; 分別為初始化; 解碼; 解碼完畢移除
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            // 具體的解碼邏輯(netty提供的解碼器或自定義解碼器中重寫的decode方法)
            decode(ctx, in, out);
        } finally {
            // 此時decodeState為正在解碼中 值為1,返回false
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            // 在設置為初始化等待解碼
            decodeState = STATE_INIT;
            // 解碼完成移除當前ChannelHandler標記為不處理
            // 可以看看handlerRemoved源碼。如果緩衝區還有數據直接傳遞給下一個handler
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }
5. 執行channelReadComplete
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // 讀取次數重置
    numReads = 0;
    // 重置讀寫index
    discardSomeReadBytes();
    // 在channelRead meth中定義賦值 decodeWasNull = !out.insertSinceRecycled();
    // out指的是解碼集合List<Object> out; 咱們可以點進
    if (decodeWasNull) {
        decodeWasNull = false;
        if (!ctx.channel().config().isAutoRead()) {
            ctx.read();
        }
    }
    // fire掉readComplete傳遞到下一個handler的readComplete
    ctx.fireChannelReadComplete();
}

/**
*  然後我們可以搜索下insertSinceRecucled在什麼地方被賦值了
* Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
*/
boolean insertSinceRecycled() {
    return insertSinceRecycled;
}


// 搜索下insert的調用我們可以看到是CodecOutputList類即為channelRead中的out集合,眾所周知在    decode完之後,解碼數據就會被調用add方法,此時insertSinceRecycled被設置為true
private void insert(int index, Object element) {
    array[index] = element;
    insertSinceRecycled = true;
}


/**
* 清空回收數組內部的所有元素和存儲空間
* Recycle the array which will clear it and null out all entries in the internal storage.
*/
// 搜索recycle的調用我么可以知道在channelRead的finally邏輯中 調用了out.recycle();此時        insertSinceRecycled被設置為false
void recycle() {
    for (int i = 0 ; i < size; i ++) {
        array[i] = null;
    }
    clear();
    insertSinceRecycled = false;
    handle.recycle(this);
}

至此ByteToMessageDecoder解碼類應該差不多比較清晰了!!!

MessageToByteEncoder: 編碼超類,將消息轉成字節進行編碼發出

何謂編碼,就是將發送數據轉化為客戶端和服務端約束好的數據結構和格式進行傳輸,我們可以在編碼過程中將消息體body的長度和一些頭部信息有序的設置到ByteBuf字節緩衝區中;方便解碼方靈活的運用來判斷(是否完整的包等)和處理業務;解碼是繼承入站數據,反之編碼應該繼承出站的數據;接下來我們看看編碼類是怎麼進行編碼的;
MessageToByteEncoder類圖如下

源碼分析

既然是繼承出站類,我們直接看看write方法是怎麼樣的

/**
* 通過write方法獲取到出站的數據即要發送出去的數據
* ctx channelHandler上下文
* msg 發送的數據 Object可以通過繼承類指定的泛型來指定
* promise channelPromise異步監聽,類似ChannelFuture,只不過promise可以設置監聽的結果,future只能通過獲取監聽的成功失敗結果;可以去了解下promise和future的區別
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 檢測發送數據的類型 通過TypeParameterMatcher類型匹配器
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // 分配字節緩衝區 preferDirect默認為true
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // 進行編碼
                encode(ctx, cast, buf);
            } finally {
                // 完成編碼后釋放對象的引用
                ReferenceCountUtil.release(cast);
            }
            // 如果緩衝區有數據則通過ctx發送出去,promise可以監聽數據傳輸並設置是否完成
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                // 如果沒有數據則釋放字節緩衝區的引用併發送一個empty的空包
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 非TypeParameterMatcher類型匹配器匹配的類型直接發送出去
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

// 初始化設置preferDirect為true
protected MessageToByteEncoder() {
    this(true);
}
protected MessageToByteEncoder(boolean preferDirect) {
    matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
    this.preferDirect = preferDirect;
}

編碼: 重寫encode方法,根據實際業務來進行數據編碼

// 此處就是我們需要重寫的編碼方法了,我們和根據約束好的或者自己定義好想要的數據格式發送給對方

// 下面是我自己寫的demo的編碼方法;頭部設置好body的長度,服務端可以根據長度來判斷是否是完整的包,僅僅自學寫的簡單的demo非正常線上運營項目的邏輯
public class MyClientEncode extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        if (null != msg) {
            byte[] request = msg.getBytes(Charset.forName("UTF-8"));
            out.writeInt(request.length);
            out.writeBytes(request);
        }
    }
}

編碼類相對要簡單很多,因為只需要將發送的數據序列化,按照一定的格式進行發送數據!!!

項目實戰

項目主要簡單的實現下自定義編解碼器的運用及LengthFieldBasedFrameDecoder的使用

  • 項目結構如下
    │  hetangyuese-netty-06.iml
    │  pom.xml
    │
    ├─src
    │  ├─main
    │  │  ├─java
    │  │  │  └─com
    │  │  │      └─hetangyuese
    │  │  │          └─netty
    │  │  │              ├─client
    │  │  │              │      MyClient06.java
    │  │  │              │      MyClientChannelInitializer.java
    │  │  │              │      MyClientDecoder.java
    │  │  │              │      MyClientEncode.java
    │  │  │              │      MyClientHandler.java
    │  │  │              │      MyMessage.java
    │  │  │              │
    │  │  │              └─server
    │  │  │                      MyChannelInitializer.java
    │  │  │                      MyServer06.java
    │  │  │                      MyServerDecoder.java
    │  │  │                      MyServerDecoderLength.java
    │  │  │                      MyServerEncoder.java
    │  │  │                      MyServerHandler.java
    │  │  │
    │  │  └─resources
    │  └─test
    │      └─java
    
  • 服務端

    Serverhandler: 只是簡單的將解碼的內容輸出

    public class MyServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客戶端連接成功 time: " + new Date().toLocaleString());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客戶端斷開連接 time: " + new Date().toLocaleString());
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("content:" + body);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 出現異常關閉通道
            cause.printStackTrace();
            ctx.close();
        }
    }

    解碼器

    public class MyServerDecoder extends ByteToMessageDecoder {
    
        // 此處我頭部只塞了長度字段佔4個字節,別問為啥我知道,這是要客戶端和服務端約束好的
        private static int min_head_length = 4;
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 解碼的字節長度
            int size = in.readableBytes();
            if(size < min_head_length) {
                System.out.println("解析的數據長度小於頭部長度字段的長度");
                return ;
            }
            // 讀取的時候指針已經移位到長度字段的尾端
            int length = in.readInt();
            if (size < length) {
                System.out.println("解析的數據長度與長度不符合");
                return ;
            }
    
            // 上面已經讀取到了長度字段,後面的長度就是body
            ByteBuf decoderArr = in.readBytes(length);
            byte[] request = new byte[decoderArr.readableBytes()];
            // 將數據寫入空數組
            decoderArr.readBytes(request);
            String body = new String(request, Charset.forName("UTF-8"));
            out.add(body);
        }
    }

    將解碼器加入到channelHandler中:記得加到業務handler的前面否則無效

    public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
    //                .addLast(new MyServerDecoderLength(10240, 0, 4, 0, 0))
    //                .addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0))
                    .addLast(new MyServerDecoder())
                    .addLast(new MyServerHandler())
            ;
        }
    }
  • 客戶端

    ClientHandler

    public class MyClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("與服務端連接成功");
            for (int i = 0; i<10; i++) {
                ctx.writeAndFlush("hhhhh" + i);
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("與服務端斷開連接");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("收到服務端消息:" +msg+ " time: " + new Date().toLocaleString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    編碼器

    public class MyClientEncode extends MessageToByteEncoder<String> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
            if (null != msg) {
                byte[] request = msg.getBytes(Charset.forName("UTF-8"));
                out.writeInt(request.length);
                out.writeBytes(request);
            }
        }
    }

    將編碼器加到ClientHandler的前面

    public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast(new MyClientDecoder())
                    .addLast(new MyClientEncode())
                    .addLast(new MyClientHandler())
            ;
    
        }
    }
  • 服務端運行結果
    MyServer06 is start ...................
    客戶端連接成功 time: 2019-11-19 16:35:47
    content:hhhhh0
    content:hhhhh1
    content:hhhhh2
    content:hhhhh3
    content:hhhhh4
    content:hhhhh5
    content:hhhhh6
    content:hhhhh7
    content:hhhhh8
    content:hhhhh9
  • 如果不用自定義的解碼器怎麼獲取到body內容呢

    將自定義編碼器換成LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0)

    public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
    //                .addLast(new MyServerDecoderLength(10240, 0, 4, 0, 0))
                    .addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0))
    //                .addLast(new MyServerDecoder())
                    .addLast(new MyServerHandler())
            ;
        }
    }
    
    // 怕忘記的各個參數的含義在這在說明一次,自己不斷的修改每個值觀察結果就可以更加深刻的理解
    /**
    * maxFrameLength:消息體的最大長度,好像默認最大值為1024*1024
    * lengthFieldOffset 長度字段所在字節數組的下標 (我這是第一個write的所以下標是0)
    * lengthFieldLength 長度字段的字節長度(int類型佔4個字節)
    * lengthAdjustment 長度字段補償的數值 (lengthAdjustment =  數據包長度 - lengthFieldOffset - lengthFieldLength - 長度域的值),解析需要減去對應的數值
    * initialBytesToStrip 是否去掉長度字段(0不去除,對應長度域字節長度)
    */
    public LengthFieldBasedFrameDecoder(
                int maxFrameLength,
                int lengthFieldOffset, int lengthFieldLength,
                int lengthAdjustment, int initialBytesToStrip)
    結果: 前都帶上了長度
    MyServer06 is start ...................
    客戶端連接成功 time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh0, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh1, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh2, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh3, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh4, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh5, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh6, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh7, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh8, time: 2019-11-19 17:53:42
    收到客戶端發來的消息:   hhhhh9, time: 2019-11-19 17:53:42

    如果我們在客戶端的長度域中做手腳 LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 0)

    舊: out.writeInt(request.length);
    新: out.writeInt(request.length + 1);
    // 看結果就不正常,0後面多了一個0;但是不知道為啥只解碼了一次??? 求解答
    MyServer06 is start ...................
    客戶端連接成功 time: 2019-11-19 17:56:55
    收到客戶端發來的消息:   hhhhh0 , time: 2019-11-19 17:56:55
    
    // 正確修改為 LengthFieldBasedFrameDecoder(10240, 0, 4, -1, 0)
    // 結果:
    MyServer06 is start ...................
    客戶端連接成功 time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh0, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh1, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh2, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh3, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh4, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh5, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh6, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh7, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh8, time: 2019-11-19 18:02:18
    收到客戶端發來的消息:   hhhhh9, time: 2019-11-19 18:02:18

    捨棄長度域 :LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4)

    // 結果
    MyServer06 is start ...................
    客戶端連接成功 time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh0, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh1, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh2, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh3, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh4, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh5, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh6, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh7, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh8, time: 2019-11-19 18:03:44
    收到客戶端發來的消息:hhhhh9, time: 2019-11-19 18:03:44
    分析源碼示例中的 lengthAdjustment = 消息字節長度 – lengthFieldOffset-lengthFieldLength-長度域中的值
  • 源碼中的示例
     * <pre>
     * lengthFieldOffset   =  0
     * lengthFieldLength   =  2
     * <b>lengthAdjustment</b>    = <b>-2</b> (= the length of the Length field)
     * initialBytesToStrip =  0
     *
     * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
     * +--------+----------------+      +--------+----------------+
     * | Length | Actual Content |----->| Length | Actual Content |
     * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
     * +--------+----------------+      +--------+----------------+
     * </pre>
    長度域中0x000E為16進制,轉換成10進制是14,說明消息體長度為14;根據公式:14-0-2-14 = -2
    * <pre>
     * lengthFieldOffset   = 0
     * lengthFieldLength   = 3
     * <b>lengthAdjustment</b>    = <b>2</b> (= the length of Header 1)
     * initialBytesToStrip = 0
     *
     * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
     * +----------+----------+----------------+      +----------+----------+----------------+
     * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
     * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
     * +----------+----------+----------------+      +----------+----------+----------------+
     * </pre>
    從上的例子可以知道;lengthAdjustment(2) = 17- 12(00000C)-lengthFieldOffset(0) - lengthFieldLength(3);

    …….等等

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※試算大陸海運運費!