分類
發燒車訊

程序員不能一直停留在愛學習的階段

今天在人人都是產品經理的上,看到一篇文章 《一個創業程序員的35歲人生總結(下)》 。其實也道出了我曾經作為技術人員,各種失敗的嘗試。

 

下面是一種的一段引用,我非常認可

先說技術,技術是我死磕時間最長的技能。最早在大學選擇FLASH,完全是出於愛好,當時別說我,全世界估計也沒幾個人能預測到僅僅兩年後,FLASH程序員就會隨着網頁遊戲的興起,成為當時最搶手的程序員種類之一。後來畢業了,選工作的時候,更多是學習的心態,創業什麼的,甚至工資,都無所謂,只要能提升技術就行。後來技術到一定程度了,就希望能幫助項目和公司更好地實現大家想要的產品,最終實現大家共同的夢想。

在類似我經歷的公司中,有兩個問題,會同時困擾大部分程序員和老闆。第一個問題就是“學習”!

程序員,尤其是前端程序員,天生有一種極強的學習慾望。前端這門技術,半年不學習可能就要落後,一年不學習估計就有被淘汰的風險了。程序員愛學習,不停提升自己的專業技能,這本來是好事。

但是對於很多創業公司,卻成為不能承受之重。因為很多程序員,會極端地掉入學習的漩渦中,簡直跟掉入錢眼兒里的老闆有得一拼,眼中除了學習啥也容不下,比如曾經的我。更要命的是,有些程序員,自己的人生規劃和學習方向,還跟公司的業務方向不太一致。

我是06年畢業,畢業就進入了一所當時還不錯的互聯網公司,公司名稱就不說了,反正對這家公司也沒有啥好感,雖然現在很多人都想進去修福報。

  在這家公司裏面,認識了幾個比較好的朋友,算是一個非常大的收穫。更關鍵的是,我們都是一群比較有想法的人,喜歡用技術去做各種各樣的嘗試。   在2006年的時候,我們第一次嘗試做一個網址收藏夾。當時的想法很簡單,我們可以把自己的喜歡的網站地址收藏起來,並且可以隨時隨地的分享到網站裏面,實現起來算是比較簡答。但是對標了一個競品,名字忘記了,好像叫做 ”好網角“。當時三個人,一個負責產品,兩個搞技術。按照道理說,用wordpress之類的網站很快的。結果當時我們兩個做技術的,被公司的環境給洗腦了,覺得一定要用牛逼的技術做出來的東西才有價值。等基礎架構搭建完成之後,我們就不想寫業務代碼了,覺得好無趣,結果不了了之。   在2007年,由於當時我們都是單身,有一天吃飯,想到了是不是可以做一個妹子網站,上面都是妹子。作為單身者,尤其是程序員,完全可以去上面找妹子約會。結果還是卡在了產品設計,因為沒有願意做產品,都想着做技術。由於公司的引導,內部開始架構化轉型,從此開始了架構文化,我這個時候對技術的更加執着。   在2008年和2009年這兩年之間,加班比較多。正好趕緊上公司晉陞P,就比較老實了,沒有太多的想法。   在2010年,微博出來,比較火爆的時候。發現一個蠻有趣的現象,就是微博必須註冊才能看內容,屏蔽了百度的搜索。當時我們知道做垃圾站可以賺錢,就是利用百度seo的流量。就開始搞。這一次吸取了教訓,快速用wordpress搭建了一個網站,然後也不不用爬蟲,直接人工編輯的方式,每天人肉搬運微博最熱門的內容。後來就搬運百度top的內容,反正就是什麼熱門放什麼。這個時候就已經開始在考慮,能不能把今天最熱門的資訊信息找出來,做成一個類似今日熱門的諮詢網站。我們的方法很原始,就是爬蟲去top.baidu.com,微博熱門資訊排行的內容。運營半年的時候,有一天一個帖子爆了,當時獨立IP直接突破5W。不過也因為這個,被新聞辦公廳警告了。後面連續幾次違規,省新聞辦公廳以沒有新聞出版牌照,把網站關停了。不過這個網站看來,還算是比較成功的。至少運營了大半年,也開始讓我思考運營的價值,時間的價值。   2011-2013年,在網站被關停之後,我就意識到內容的價值,開始持續輸出技術文章。曾經的blog鏈接: https://www.cnblogs.com/aigongsi/ ,一共100多篇文章,大概500W閱讀。 不過在輸出內容上,也犯了一個同樣的錯誤,就是覺得應該寫能突出技術能力的內容。當時身邊的技術人都挺瞧不起阮一峰的,覺得他的內容太基礎,沒有啥技術含量。在我輸出內容的時候,純粹就是愛好,沒有考慮目標受眾和持續的運營,更沒有想過IP的問題。這個時候如果有產品經理思維,可能就會明白,阮一峰的目標受眾非常廣,並且商業變現價值是非常高。而所謂技術架構類文章,受眾就小了很多,商業的價值更在於合作,而非變現上。   2015年開始創業,就慢慢開始轉向產品、營銷,運營,算是徹底轉型了。   我現在身邊的技術圈的朋友都是我的同事,都在創業,但是和他們交流起來,他們談論最多的還是怎麼實現功能,用什麼樣的方案,能夠支撐多少用戶量。很少談論到我們的用戶是誰,他們有什麼樣的需求,我們的產品怎麼解決這個需求。更不會考慮我們的產品適合在什麼樣的推廣方式,是內容營銷,還是渠道合作,或者說sem。    
為什麼不能一直停留在愛學習的階段 其實程序員應該是最愛學習的一個群體。包括很多知識付費,都在輸出各種知識,主要面向的對象都是各種專業技能的。 對於初學者,學習能力肯定是重要的能力。你必須具備基本的知識、技能、經驗。如果你是科研工作者,持續的學習研究是晉陞的必要條件。   商業或者創業,本質上是要拿結果的,反映在經營公司上就是“利潤”,反映在產品上,就是用戶數的增長。技術人員肯定有太多的理由拒絕這些結果,比如曾經我覺得做這些沒有技術挑戰;這些臟活累活對我的職業生涯發展不好;我不喜歡這樣的用戶,他們太low了。   反映在職場上,你也要拿結果。我第一家公司之前一直強調以結果為導向。如果你要晉陞,和領導關係是不是要處理,向上管理是不是要學會。晉陞的話,要準備PPT,基本的PPT技能和演講是不是要學會。如果說邀功是晉陞的必要條件,那就怎麼學者做自我營銷,在公司裏面做自己的IP,讓自己更多的曝光,這樣晉陞的幾率是不是更高一些。我過去這些基本上全部都沒有做到,以至於在職場很難混下去。   如果你以拿結果的思維去看一些事情,技術的牛逼與否僅僅是其中的一個環節。很多時候,我們說自己愛學習,其實是給自己找了一個不去拿結果的借口。因為拿結果太難了,並且很多時候都會面臨失敗。當我們害怕失敗的時候,就會找一個理由拒絕行動,並且這個理由會讓加強我們的某些特質,那麼“愛學習”這個就是非常好一個理由。某些知識付費上癮者,內心是在逃避。   本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

分類
發燒車訊

C#數據結構與算法系列(十):逆波蘭計算器——逆波蘭表達式(後綴表達式)

1.介紹

後綴表達式又稱逆波蘭表達式,與前綴表達式相似,只是運算符位於操作數之後

2.舉例說明

(3+4)*5-6對應的後綴表達式就是3 4 +5 * 6 –

3.示例

輸入一個逆波蘭表達式(後綴表達式),使用棧(Stack),計算其結果

思路分析:

從左至右掃描表達式,遇到数字時,將数字壓入堆棧,遇到運算符時,彈出棧頂的兩個數,用運算符對它們做相應的計算(次頂元素 和 棧頂元素),並將結果入棧;

重複上述過程直到表達式最右端,最後運算得出的值即為表達式的結果例如: (3+4)×5-6 對應的後綴表達式就是 3 4 + 5 × 6 – , 

針對後綴表達式求值步驟如下:

從左至右掃描,將3和4壓入堆棧;
遇到+運算符,因此彈出4和3(4為棧頂元素,3為次頂元素),計算出3+4的值,得7,再將7入棧;
將5入棧;
接下來是×運算符,因此彈出5和7,計算出7×5=35,將35入棧;
將6入棧;
最後是-運算符,計算出35-6的值,即29,由此得出最終結果

代碼實現:

using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

namespace DataStructure
{
    public class PolandNotation
    {
        public static void Test()
        {
            try
            {
                //定義逆波蘭表達式
                string suffixExpression = "3 4 + 5 * 6 -";

                //將suffixExpression轉換成鏈表的方式
                var list = GetListString(suffixExpression);

                //輸出結果
                var result = Calculate(list);

                Console.WriteLine($"{suffixExpression}的結果是{result}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
           
        }
        /// <summary>
        /// 獲取集合
        /// </summary>
        /// <param name="suffixExpression"></param>
        /// <returns></returns>
        public static List<string> GetListString(string suffixExpression)
        {
            //首先實例化List
            List<string> list = new List<string>();

            //將字符串通過空格切換成數組
            string[] split=suffixExpression.Split(" ");

            //循環添加
            foreach (var item in split)
            {
                list.Add(item);
            }

            return list;
        }

        /// <summary>
        /// 計算
        /// </summary>
        /// <param name="list"></param>
        /// <returns></returns>
        public static int Calculate(List<string> list)
        {
            //創建棧
            Stack<string> stack = new Stack<string>();

            //循環遍歷
            list.ForEach(item =>
            {
                //正則表達式判斷是否是数字,匹配的是多位數
                if (Regex.IsMatch(item,"\\d+"))
                {
                    //如果是数字直接入棧
                    stack.Push(item);
                }
                //如果是操作符
                else
                {
                    //出棧兩個数字,並運算,再入棧
                    int num1 =int.Parse(stack.Pop());

                    int num2 = int.Parse(stack.Pop());

                    int result = 0;

                    if(item.Equals("+"))
                    {
                        result = num2 + num1;
                    }
                    else if(item.Equals("*"))
                    {
                        result = num2 * num1;
                    }
                    else if(item.Equals("/"))
                    {
                        result = num2 / num1;
                    }
                    else if (item.Equals("-"))
                    {
                        result = num2 - num1;
                    }
                    else
                    {
                        throw new Exception("無法識別符號");
                    }

                    stack.Push(""+result);
                }
            });

            //最後把stack中數據返回
            return int.Parse(stack.Pop());
        }
    }
}

結果圖:

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

分類
發燒車訊

Kubernetes學習筆記(九):StatefulSet–部署有狀態的多副本應用

StatefulSet如何提供穩定的網絡標識和狀態

ReplicaSet中的Pod都是無狀態,可隨意替代的。又因為ReplicaSet中的Pod是根據模板生成的多副本,無法對每個副本都指定單獨的PVC。

來看一下StatefulSet如何解決的。

提供穩定的網絡標識

StatefulSet創建Pod都有一個從零開始的順序索引,這會體現在Pod的名稱和主機名上,同樣也會體現在Pod對應的固定存儲上。所以這些名字是可預先知道的,不同於ReplicaSet的隨機生成名字。

因為他們的名字都是固定的,而且彼此狀態都不同,通常會操作他們其中的一個。如此情況,一般都會創建一個與之對應的headless Service,通過這個Service,每個Pod將擁有獨立的DNS記錄。

擴容一個StatefulSet會使用下一個順序索引創建一個新的Pod,縮容會刪除索引值最高的。並且縮容任何時候只會操作一個Pod。

如何提供穩定的存儲

StatefulSet可以擁有一個或多個PVC模板,這些PVC會在創建Pod前創建出來,綁定到一個Pod實例上。

擴容的時候會創建一個Pod以及若干個PVC,刪除的時候只會刪除Pod。StatefulSet縮容時不會刪除PVC,擴容時會重新掛上。

使用StatefulSet

定義三個PV

定義pv-(a|b|c)

# stateful-pv-list.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-a
spec:
  capacity:
    storage: 1Mi
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Recycle
  hostPath:
    path: /tmp/pva
---
apiVersion: v1
kind: PersistentVolume
# 以下忽略

headless的Service

# stateful-service-headless.yaml
apiVersion: v1
kind: Service
metadata:
  name: rwfile
spec:
  clusterIP: None
  selector:
    app: rwfile
  ports:
  - port: 80

定義StatefulSet

先創建兩個Pod副本。使用volumeClaimTemplates定義了PVC模板。

# stateful.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rwfile
spec:
  replicas: 2
  serviceName: rwfile
  selector:
    matchLabels:
     app: rwfile
  template:
    metadata:
      labels:
        app: rwfile
    spec:
      containers:
      - image: registry.cn-hangzhou.aliyuncs.com/orzi/rwfile
        name: rwfile
        ports:
        - containerPort: 8000
        volumeMounts:
        - name: data
          mountPath: /tmp/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      resources:
        requests:
          storage: 1Mi
      accessModes:
      - ReadWriteOnce

創建三個PV,一個headless的Service,一個StatefulSet

-> [root@kube0.vm] [~] k create -f stateful-pv-list.yaml
persistentvolume/pv-a created
persistentvolume/pv-b created
persistentvolume/pv-c created

-> [root@kube0.vm] [~] k create -f stateful-service-headless.yaml
service/rwfile created

-> [root@kube0.vm] [~] k create -f stateful.yaml
statefulset.apps/rwfile created

查看

-> [root@kube0.vm] [~] k get all -o wide
NAME                    READY   STATUS      RESTARTS   AGE   IP            NODE       NOMINATED NODE   READINESS GATES
pod/rwfile-0            1/1     Running     0          12s   10.244.1.52   kube1.vm   <none>           <none>
pod/rwfile-1            1/1     Running     0          8s    10.244.2.56   kube2.vm   <none>           <none>

NAME                 TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE   SELECTOR
service/kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   81s   <none>
service/rwfile       ClusterIP   None         <none>        80/TCP    23s   app=rwfile

NAME                      READY   AGE   CONTAINERS   IMAGES
statefulset.apps/rwfile   2/2     12s   rwfile       registry.cn-hangzhou.aliyuncs.com/orzi/rwfile

查看PV和PVC,可以看到已經有兩個PVC綁定了PV

-> [root@kube0.vm] [~] k get pv,pvc -o wide
NAME                    CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM                   STORAGECLASS   REASON   AGE     VOLUMEMODE
persistentvolume/pv-a   1Mi        RWO            Recycle          Bound       default/data-rwfile-0                           7m20s   Filesystem
persistentvolume/pv-b   1Mi        RWO            Recycle          Bound       default/data-rwfile-1                           7m20s   Filesystem
persistentvolume/pv-c   1Mi        RWO            Recycle          Available                                                   7m20s   Filesystem

NAME                                  STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE     VOLUMEMODE
persistentvolumeclaim/data-rwfile-0   Bound    pv-a     1Mi        RWO                           6m55s   Filesystem
persistentvolumeclaim/data-rwfile-1   Bound    pv-b     1Mi        RWO                           6m51s   Filesystem

請求Pod

啟動代理

-> [root@kube0.vm] [~] k proxy
Starting to serve on 127.0.0.1:8001

發送請求

-> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/ -d "a=123"
data stored in : rwfile-0

-> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/
a=123

刪除測試

刪除rwfile-0,然後查看,從時間上看確實是被刪除重建的。

-> [root@kube0.vm] [~] k delete po rwfile-0
pod "rwfile-0" deleted

-> [root@kube0.vm] [~] k get po
NAME                READY   STATUS      RESTARTS   AGE
rwfile-0            1/1     Running     0          7s
rwfile-1            1/1     Running     0          19m

看一下之前存儲的數據還在不在

-> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/
a=123

還是在的,此次測試實際上也證明了StatefulSet提供了穩定的網絡標識和存儲。

發現StatefulSet的夥伴節點

使用DNS解析headless的Service的FQDN。
例子以後再寫吧。。

如何處理節點失效

除非確定節點無法運行或者不會在訪問,否則不要強制刪除有狀態的Pod

k delete pod rwfile-0 --force --grace-period 0

小結

  • StatefulSet創建Pod都有一個從零開始的順序索引
  • 通常會創建一個與StatefulSet對應的headless Service。
  • 擴容一個StatefulSet會使用下一個順序索引創建一個新的Pod,縮容會刪除索引值最高的。
  • 新建StatefulSet需要指定headless ServiceName和volumeClaimTemplates。
  • 使用DNS發現StatefulSet的夥伴節點
  • 強制刪除:k delete pod rwfile-0 --force --grace-period 0

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

分類
發燒車訊

挖洞入門_union型SQL注入

簡介:在漏洞盒子挖洞已經有一段時間了,雖說還不是大佬,但技術也有所進步,安全行業就是這樣,只有自己動手去做,才能將理論的知識變為個人的經驗。本篇文章打算分享一下我在挖union型SQL注入漏洞過程中的一些個人理解,如有不足也請大佬不吝指教。

0x00:什麼是SQL注入

SQL注入,相信大多數人一開始接觸安全,聽說的第一種漏洞類型就會是SQL注入,眾所周知,其本質就是將用戶輸入的數據當成了SQL語句來執行

開發過網站的朋友應該都清楚,大多數的小型企業或個人的站點大都採用了LAMP結構,即Linux + Apache + MySQL + PHP,當然還有一些其它常見的技術如下錶:

操作系統 Web服務器 數據庫 編程語言
Linux Apache MySQL PHP
Windows Server Nginx Oracle JSP
Tomcat SQL Server ASP
Python

總的來說,絕大多數網站都採用了動態Web開發技術,而動態Web開發離不開數據庫,如果沒有處理好這兩者之間的關係,那麼SQL注入就會隨之而來了。

舉例來說,當我們想要通過參數id來獲取相對應的新聞時,整個過程簡單來說就是用戶通過URL請求新聞–>後台通過用戶請求去數據庫查詢相對應的新聞–>將查詢到的新聞回傳給用戶。在第二步查詢相對應的新聞時,後台會執行SQL語句來查詢,就像SELECT * FROM news WHERE id=''id的值是用戶來控制的,當id=1時,就會返回id=1的新聞,id=2時返回id=2的新聞,以此類推,就可以動態的控制web界面了。

這時,當用戶輸入的id值不正確時,後台就無法獲取相對應的新聞,前端就會沒有數據显示,可當用戶輸入的數據為1'; DROP TABLE news-- a時,恐怖的事情就發生了,數據庫中的news表被刪除了,這就說明這個參數存在SQL注入

回到剛才用戶輸入的數據,拼接到後台查詢數據時,整個SQL語句就變成了SELECT * FROM news WHERE id='1'; DROP TABLE news-- a',分析這條語句可知,用戶輸入的單引號閉合了id的值分號閉合了SELECT語句,然後又新建了一條DROP語句刪除了表news,最後的— a註釋掉了id值后的那個單引號,SQL注入就這麼產生了。

當一個站點存在SQL注入時用戶的輸入就可以傳入數據庫執行,理論上這樣可以獲得數據庫的全部數據,也就是常說的脫庫了。獲得數據的方法也多種多樣,可以通過頁面直接返回想要查詢的數據,也可以通過sleep延時函數猜測數據,都不行的話我們還可以使用DNS解析日誌來獲得數據。其中,最簡單的一種方法就是union型的SQL注入了。

union型SQL注入只是SQL注入的其中一種,也是最簡單的一種,對於這種漏洞的防範也特別簡單,可這種漏洞在互聯網中仍不計其數…這也可見全國乃至全球對於網絡安全知識普及的不足,接下來,我會從三個方面來講講這種漏洞,分別是為什麼會產生怎麼利用以及怎麼防範

0x01:為什麼會產生union型SQL注入

union型SQL注入,看名字就能知道,使用這種方法可以直接在頁面中返回我們要查詢的數據,方法也很簡單,即使用UNION聯合查詢即可。

但使用UNION聯合查詢時還要滿足一個條件,那就是我們構造的SELECT語句的字段數要和當前表的字段數相同才能聯合查詢,即首先我們要確定當前表的字段數。order by x是數據庫中的一個排序語句,order by 1即通過第一個字段進行排序。這時我們就可以構造SELECT * FROM news WHERE id='1' order by x-- a'來猜測當前表的字段數,x值遞增,當頁面返回數據異常時,即無當前字段時,用當前的x值減一即可得到當前表的字段數了。

知道了當前表的字段數,就可以進行UNION聯合查詢了。但聯合查詢時,頁面只會显示查詢到數據的第一條,也就是UNION前的SELECT語句的結果,想要显示我們自己聯合查詢的結果時,還必須使前一條語句失效,這裏我們構造and 1=2使前一句SELECT語句失效。回到剛才的案例,假設當前表的字段數為3,我們就可以構造SELECT * FROM news WHERE id='1' and 1=2 UNION SELECT 1,2,3-- a'來查詢當前頁面的顯錯點了,通過下圖的案例可知,當前的顯錯點為第一字段第三字段

這個顯錯點又是什麼意思呢?比如當前表中共有三個字段,一個是標題(title)、一個是時間(time)、一個是內容(data),而我們前端不需要显示時間,只需要展示標題和內容即可。那麼從數據庫獲得的數據中,也只有標題字段和內容字段會展示在頁面上,這兩個點就是顯錯點

通過這裏的顯錯點,用戶就可以獲得數據庫中的所有數據了。當用戶輸入的數據為1' and 1=2 UNION SELECT 1,2,database()-- a時,即SQL語句為SELECT * FROM news WHERE id='1' and 1=2 UNION SELECT 1,2,database()-- a'時,就可以直接得到數據庫的庫名

0x02:怎麼利用union型SQL注入

1.判斷是否存在注入

構造and 1=1/and 1=2查看頁面是否有異常,若有異常,即有可能存在注入,另外還可通過該語句判斷該站點是否有WAF,若有WAF的話會有攔截警告,當然,WAF也是可以繞過的。。。

2.查詢當前表的字段數

構造order by x,當頁面返回異常時,利用x減一即可得到當前表的字段數

3.查詢顯錯點

構造and 1=2 union select 1,2,3,若頁面显示了我們構造的1,2,3,則對應的字段即為顯錯點

4.查詢數據庫庫名

構造and 1=2 union select 1,2,database(),即可在顯錯點显示當前數據庫庫名

一般挖漏洞的話到此步驟就可以提交了,切記千萬不可非法獲得數據,挖洞有風險,同志需謹慎!

5.查詢數據庫中的表名

構造and 1=2 union select 1,2,table_name from information_schema.tables where table_schema=database() limit 0,1,即可在顯錯點显示當前庫中的表名,因為顯錯點一次只能显示一條數據,這時可以通過limit語句選擇不同的表名進行查看。

6.查詢選擇表中的字段名

構造and 1=2 union select 1,2,column_name from information_schema.columns where table_schema=database() and table_name='XXX' limit 0,1,即可在顯錯點显示字段名,這裏也是通過limit語句選擇不同的字段名進行查看。

7.查詢數據庫中的數據

構造and 1=2 union select 1,2,XXX from XXX limit 0,1,即可獲得數據庫中的數據了。

0x03:怎麼防範union型SQL注入

針對union型SQL注入,我們可以對用戶輸入的數據進行一次篩查,設置黑名單,攔截注入常用的一些關鍵詞,比如andorder byunionselectfrom等。

除了設置黑名單外,還有一種比較靠譜的方法,即使用預編譯語句,而不是動態的生成SQL語句,這樣可以有效的避免用戶輸入的數據連接到數據庫執行,就是實現起來比較複雜,需要設置大量的預編譯語句。

另外還有一種目前最靠譜的方法,實現起來還簡單,就是上硬件防火牆。。。就是有點小貴。

0x04:互聯網中的一些案例

依據網絡安全法,本文旨在分享個人學習經驗,內容禁止用於違法犯罪行為!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

RocketMQ(1)—架構原理及環境搭建

一、架構簡述

RocketMQ阿里開源的一個分佈式消息傳遞和流媒體平台,具有低延遲,高性能和可靠性, 萬億級容量和靈活的可伸縮性。跟其它中間件相比,RocketMQ的特點是JAVA實現,在發生宕機和其它故障時消息丟失率更低

它由四個部分組成:nameserver,broker,生產者和消費者。它們中的每一個都可以水平擴展,而沒有單個故障點。  

Nameserver:提供輕量級的服務發現和路由。生產者和消費者通過nameserver獲取broker信息。它幾乎是無狀態的,nameserver結點之間沒有任何的數據同步,broker註冊信息時會註冊到每一個nameserver結點上面,所以每個nameserver節點都記錄了完整broker信息,提供相應的讀寫服務,並支持快速的存儲擴展。

Broker:通過提供輕量級的topic和queue機制來存儲消息。與nameserver中的每個節點建立長連接,定時註冊topic等信息到nameserver上面。broker一般都是主從模式,因為消息是真實存儲消息的地方,避免一個結點掛了,導致這個節點數據全部丟失。

Producer:與nameserver集群中的一個結點建立長連接,定期的拉取broker 的topic路由信息,再將消息發送到對應broker的topic上面

Consumer:與nameserver集群中的一個結點建立長連接,定期的拉取broker 的topic路由信息,再去消費對應broker的topic信息

 

 

二、環境搭建

 1.官網下載:http://rocketmq.apache.org/release_notes/release-notes-4.7.0/

2.解壓 unzip rocketmq-all-4.7.0-bin-release.zip

3.修改啟動參數配置。默認的jvm參數內存設置特別大,如果自己機器不行的話需要手動改下bin目錄下的啟動參數文件:runbroker.sh 和runserver.sh文件 我的虛擬機內存分配不大,改成256m 256m 128m

   這是默認的

 

 

4.啟動nameserver: nohup sh mqnamesrv ‐n 192.168.0.67:9876 &   (將日誌輸出當前目錄的nohub.out文件,方便查看啟動日誌,ip是當前機器的ip)

 

 

 

 5.啟動broker:nohup sh mqbroker ‐n 192.168.0.67:9876 autoCreateTopicEnable=true &  (autoCreateTopicEnable=true 自動創建topic,如果不設置true的話,生產者發送消息的時候如果沒有topic就會發送失敗,需要提前把topic創建好,設置true會在發送時自動創建topic,192.168.0.67:9876 是name server)

也可以使用配置文件啟動broker:nohup sh mqbroker ‐n 192.168.0.67:9876 ‐c conf/broker.conf &

簡單看下默認的配置文件中的一些參數:

#集群名字
brokerClusterName = DefaultCluster
#broker名字,集群中主從都要用這個名字,才會組成一個集群
brokerName = broker-a
#id為0的是master  非0的slava
brokerId = 0
#消息處理時間,凌晨4點
deleteWhen = 04
#消息保存時間默認48小時,48小時之後的凌晨4點就會清理
fileReservedTime = 48
#集群主從之間數據同步方式 
#異步只需要發到master成功就返回客戶端段成功,性能高,但是如果master掛了 slave還未同步就會丟失消息。根據自身業務場景選擇合適方式
brokerRole = ASYNC_MASTER
#消息刷盤機制,和主從數據同步類似,同步就是說需要寫進磁盤了才返回成功。異步就是寫進內存了就返回成功,後面再去落盤。
flushDiskType = ASYNC_FLUSH
#自動創建topic
autoCreateTopicEnable=true

使用配置文件啟動:nohup sh bin/mqbroker ‐n 192.168.0.67:9876 -c conf/broker.conf &

broker 192.168.0.67:10911關聯的nameserver是192.168.0.67:9876

 

 

至此一個單機的rocketMQ的環境就搭建好了   正常退出: sh mqshutdown broker  和  sh mqshutdown namesrv

 

測試下消息發送,使用rocketMQ提供的測試腳本:

export NAMESRV_ADDR=192.168.0.67:9876

生產者腳本

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 消費者腳本

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 發送消息:  

 

 消費消息:

 

三、控制台搭建

1.下載rocketMQ的擴展包,master分支:https://github.com/apache/rocketmq-external 

 2.啟動rocketmq-console-ng模塊

 

 3.修改此模塊配置:

  3.1 maven依賴rocketMQ版本改成自己部署版本對應的,我部署的MQ是最新的4.7.0版本

 

 

  3.2 配置文件中配置namserver地址和控制台數據存放地址

 

 正常來說改完這兩個地方就可以直接啟動控制台的這個springboot程序了。

但是因為我用的MQ是最新的4.7版本,控制台對應的還沒有更新到最新的。編譯都有會報錯的地方

1.DefaultMQPullConsumer這個類已經不推薦使用了,並且4.7.0中有兩個類似的構造器,原來代碼直接傳了一個null,第二個參數無法識別是哪個構造器的。修改下把第二參數強轉String或者RPCHook

 

 

2.MQAdminExt這個接口中加了新方法,但是控制台中MQAdminExtImpl還沒有實現對應的方法。這個問題在github上幾天前已經有人提了Issues了,我這裡是自己添加一下默認實現然後服務就可以正常啟動了,還不確定後續後面有什麼影響沒有,至少可以啟動了

 

 

 

 

之前使用測試腳本發送的消息 以及topic都可以在控制台看到了

 

 

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

分類
發燒車訊

接口中的默認方法與靜態方法

在Java8之前的版本中,接口中只能聲明常量和抽象方法,接口的實現類中必須實現接口中所有的抽象方法。而在Java8中,接口中可以聲明默認方法靜態方法。

接口中的默認方法

Java 8中允許接口中包含具有具體實現的方法,該方法稱為“默認方法”,默認方法使用“ default ”關鍵字修飾 。

示例:

public interface MyInterface {
    default String getMsg(String srcMsg){
        return "======"+srcMsg;
    }
}

接口中的默認方法,有一個“類優先”原則:

若一個接口中定義了一個默認方法,而另外一個父類或接口中又定義了同一個同名的方法時:

  • 選擇父類中的方法。如果一個父類提供了具體的實現,那麼接口中具有相同名稱的參數的默認方法會被忽略。
  • 接口衝突。如果一個父接口提供一個默認方法,而另一個接口中也提供了一個具有相同名稱和參數列表的方法(不管方法是否是默認方法),那麼必須覆蓋該方法來解決衝突。

示例1:

public interface MyInterface1 {
    default String getMsg(String srcMsg){
        return "===我是MyInterface1111111==="+srcMsg;
    }
}
///////////////////////////////////////////////////////
public class MyClass1 {
    public String getMsg(String srcMsg){
        return "===我是MyClass11111==="+srcMsg;
    }
}
///////////////////////////////////////////////////////
public class MySubClass1 extends MyClass1 implements MyInterface1 {
}

///////////////////////////////////////////////////////
public class InterfaceTest {

    public static void main(String[] args) {
        MySubClass1 ms1 = new MySubClass1();

        String srcMsg = "Java 牛逼!!";
        //MySubClass1 類繼承了 MyClass1 類,實現了MyInterface1 接口,根據類優先原則,調用同名方法時,會忽略掉接口 MyInterface1 中的默認方法。
        System.out.println(ms1.getMsg(srcMsg));//輸出結果:===我是MyClass11111===Java 牛逼!!

    }
}

示例2:

public interface MyInterface2 {
    default String getMsg(String srcMsg){
        return "===我是MyInterface2222222==="+srcMsg;
    }
}
////////////////////////////////////////////////////////////////
public class MySubClass2 implements MyInterface1,MyInterface2 {
    @Override
    public String getMsg(String srcMsg) {
        //同時實現了 MyInterface1,MyInterface2  接口,根據 類優先 原則,兩個父接口中都提供了相同的方法,那麼子類中就必須重寫這個方法來解決衝突。
        return MyInterface1.super.getMsg(srcMsg);
        //return MyInterface2.super.getMsg(srcMsg);
        //return "------"+srcMsg;
    }
}
////////////////////////////////////////////////////////////////
public class InterfaceTest {

    public static void main(String[] args) {
        MySubClass2 ms2 = new MySubClass2();

        //MySubClass2 重新實現了兩個父接口中都存在的相同名稱的方法。
        System.out.println(ms2.getMsg(srcMsg));//輸出結果:===我是MyInterface1111111===Java 牛逼!!
    }
}

 

接口中的靜態方法

在Java8中,接口中允許添加 靜態方法,使用方式:“接口名.方法名”

示例:

public interface MyInterface3 {
    static String getMsg(String msg){
        return "我是接口中的靜態方法:"+msg;
    }

    static void main(String[] args) {
        System.out.println(MyInterface3.getMsg("Java牛逼!!"));
    }
}

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?

分類
發燒車訊

磨皮美顏算法 附完整C代碼

前言

2017年底時候寫了這篇《集 降噪 美顏 虛化 增強 為一體的極速圖像潤色算法 附Demo程序

這也算是學習過程中比較有成就感的一個算法。

自2015年做算法開始到今天,還有個把月,就滿五年了。

歲月匆匆,人生能有多少個五年。

這五年裡,從音頻圖像到視頻,從傳統算法到深度學習,從2D到3D各種算法幾乎都走了一個遍。

好在,不論在哪個領域都能有些許建樹,這是博主我自身很欣慰的事情。

雖然有所間斷但是仍然堅持寫博客,並且堅持完整開源分享。

目的就是為了幫助那些一開始跟我一樣,想要學習算法的萌新,

一起踏入算法領域去跟大家“排排坐,吃果果”。

引子

在這個特別的時間點,就想做點特別的事情。

那就是開源當時寫的這個“美顏算法”,開源代碼和當時的版本有些許出入,但是思路是一樣的。

早些年的時候大家發現採用保邊濾波的思路可以做到降噪,進而衍生出來針對皮膚的降噪,簡稱磨皮或者美顏。

從此百家爭鳴,而這個課題到今天也還在發展,當然日新月異了。

故此,想談談針對美顏磨皮的一些算法思路,為後續想學習並改進的萌新提供一些養分。

概述美顏磨皮方法

1.基於保邊降噪

這類算法有很多方法,但不外乎2種基礎思路,

基於空間和基於頻率,當然再展開的話,還可以細分為紋理和顏色。

例如通過膚色或紋理區域做針對性的處理。

這類算法的優點是計算簡單,通用型強,但缺點就是不夠細膩完美。

2.基於人臉檢測貼圖

這種嚴格意義上來說,是易容術,就是基於人臉檢測出的關鍵數據。

例如人臉關鍵點,將人臉皮膚區域提取出來,重新貼上一張事先準備的皮膚圖,進行皮膚貼合融合。

臉已經被置換了,效果很贊。有點繆修斯之船的味道。

這類算法優點是效果極其驚艷,但是算法複雜通用性差,一般只能針對少數角度表情的人臉。

3.結合1和2的深度學習方法

前兩者的思路早期大行其道,如今到了數據時代,

基於深度學習的工具方案,可以非常好地結合前兩者的思路,進行訓練,求一個數據解。

很多人將深度學習等同於AI,這個做法有點激進。

基於深度學習的做法,仍然存在前兩者一樣的問題,簡單的不夠細膩,細膩的不夠簡單,

而如果要設計一個優秀的模型,其實跟設計一個傳統算法一樣困難。

基於數據驅動的算法,驗證成本非常高,可控性比較差,當然在金錢的驅動下確實能產出還不錯的算法模型。

這類算法的優點,往往能求出很不錯的局部最優解,甚至以假亂真,缺點就是需要大量金錢和數據的驅動。

總結來說的話,不付出代價,就別想有好的結果,非常的現實。

 

據我所知目前使用最多的方案是第一種和第三種,第二種可操作性不強,只有少數公司掌握了這方面的核心技術。

但是不管是哪種方案,無非就是以下幾個步驟。

1.確定人臉的皮膚區域

2.定位人臉的雜質(痘痘,斑點,痣,膚色不均等)

3.根據定位到雜質進行填補修復或濾除

 

這就是圖像處理經典三部曲

1.定位 2.檢測 3.處理

每一個細分展開,都非常宏大且複雜的算法。

 

以上,僅以磨皮美顏為例子,闡述圖像方面的算法想要或正在解決什麼樣的問題。

我們在工作中碰到的圖像問題無非以上幾個核心問題,問題都是類似的,只是不同場景和需求下各有難處。

本次開源的算法思路

本次開源的算法是基於保邊降噪的思路,

當然這個思路可以通過改寫,參數化后可以集成到深度學習中,作為一個先驗層輔助訓練。

算法步驟如下:

1.  檢測皮膚顏色,確定皮膚占圖像的比率

2. 根據皮膚比率進行邊緣檢測,產出細節映射圖

3. 基於細節映射圖和磨皮強度進行保邊降噪

4. 對降噪好的圖進行再一次膚色檢測,保留膚色區域的降噪,其他區域還原為原圖

步驟比較簡單,但是要同時兼顧效果性能,是很不容易的。

當然這個算法膚色檢測那一部分可以採用深度學習“語義分割”方面的思路進而改進效果。

做得好,將本算法改良到准商用,驚艷的程度是沒有問題的。

深度學習相關技術就不展開細說了,有能力的朋友,感興趣的話,可以自行實操。

 

完整源代碼開源地址:

https://github.com/cpuimage/skin_smoothing

項目沒有第三方依賴,完整純c代碼。

有編譯問題的同學自行參考《Windows下C,C++開發環境搭建指南》搭建編譯環境。

附上算法效果的示例:

 

 

以上,權當拋磚引玉之用。

授人以魚不如授人以漁。

 

2020年,疫情之下,

願大家都能事業有成,身體健康。

世界和平,人們皆友愛。

 

若有其他相關問題或者需求也可以郵件聯繫俺探討。

郵箱地址是: gaozhihan@vip.qq.com

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

Kafka源碼解析(二)—Log分析

上一篇文章講了LogSegment和Log的初始化,這篇來講講Log的主要操作有哪些。

一般來說Log 的常見操作分為 4 大部分。

  1. 高水位管理操作
  2. 日誌段管理
  3. 關鍵位移值管理
  4. 讀寫操作

其中關鍵位移值管理主要包含Log Start Offset 和 LEO等。

高水位HighWatermark

高水位HighWatermark初始化

高水位是通過LogOffsetMetadata類來定義的:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

這裏傳入的初始值是logStartOffset,表明當首次構建高水位時,它會被賦值成 Log Start Offset 值。

我們再來看看LogOffsetMetadata類:

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // check if this offset is already on an older segment compared with the given offset
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }
  ...
}

LogOffsetMetadata有三個初始值:

messageOffset表示消息位移值;

segmentBaseOffset保存消息位移值所在日誌段的起始位移,用來判斷兩條消息是否處於同一個日誌段的;

relativePositionSegment保存消息位移值所在日誌段的物理磁盤位置;

上面的onOlderSegment表明,要比較哪個日誌段更老,只需要比較segmentBaseOffset的大小就可以了。

高水位HighWatermark設值與更新

  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    //高水位的值不可能小於零
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized {// 保護Log對象修改的Monitor鎖
      highWatermarkMetadata = newHighWatermark// 賦值新的高水位值
      //事務相關,暫時忽略
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      //事務相關,暫時忽略
      maybeIncrementFirstUnstableOffset()
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

設置高水位的值是很簡單的,首先校驗高水位的值是否大於零,然後通過直接加鎖之後更新高水位的值。

更新更新高水位值的方法有兩個:updateHighWatermark 和 maybeIncrementHighWatermark,我們分別分析。

updateHighWatermark

  def updateHighWatermark(hw: Long): Long = {
    //傳入的高水位的值如果小於logStartOffset,設置為logStartOffset
    val newHighWatermark = if (hw < logStartOffset)
      logStartOffset
    //  傳入的高水位的值如果大於LEO,那麼設置為LEO
    else if (hw > logEndOffset)
      logEndOffset
    else
      hw
    //將newHighWatermark封裝成一個LogOffsetMetadata然後更新高水位的值
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    //返回新的高水位的值
    newHighWatermark
  }

這個方法邏輯也很簡潔,因為高水位的值是不可能大於LEO,也不可能小於logStartOffset,所以需要對傳入的hw校驗然後設置成正確的值,然後調用上面的設置高水位的方法設值。

maybeIncrementHighWatermark

/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
//  當新的高水位的值大於舊的高水位的值時才做更新,如果新的高水位的值大於LEO,會報錯
//  這個方法是leader在確認Follower已經拉取了日誌之後才做更新
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  //如果新的高水位的值大於LEO,會報錯
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")

  lock.synchronized {
    // 獲取老的高水位值
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    //只有當新的高水位值大於老的值,因為要維護高水位的單調遞增性
    //或者當新的高水位值和老的高水位值相等,但是新的高水位在一個新的日誌段上面時才做更新
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark)// 返回老的高水位值
    } else {
      None
    }
  }
}

這個方法我將這個方法的英文註釋貼出來了,這個註釋的說明我也寫到方法上了,邏輯很清楚,大家看看註釋應該能理解。

這兩個方法主要的區別是,updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到消息后更新高水位值。而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。

上面的方法中通過調用fetchHighWatermarkMetadata來獲取高水位的值,我們下面看看這個方法:

fetchHighWatermarkMetadata

  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    // 讀取時確保日誌不能被關閉
    checkIfMemoryMappedBufferClosed()

    val offsetMetadata = highWatermarkMetadata
    if (offsetMetadata.messageOffsetOnly) {//沒有獲得到完整的高水位元數據
      lock.synchronized {
        // 通過讀日誌文件的方式把完整的高水位元數據信息拉出來
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
        updateHighWatermarkMetadata(fullOffset)
        fullOffset
      }
    } else {
      offsetMetadata
    }
  }

  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    //通過給的offset,去日誌文件中找到相應的日誌信息
    val fetchDataInfo = read(offset,
      maxLength = 1,
      isolation = FetchLogEnd,
      minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
  }

然後我們提前看一下日誌的read方法,是如何根據索引讀取數據的:

日誌段操作

日誌讀取操作

read

  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以這裡是false
      val includeAbortedTxns = isolation == FetchTxnCommitted
 
      // 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      // 到日字段中根據索引尋找最近的日誌段
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      // 這裏給出了幾種異常場景:
      // 1. 給的日誌索引大於最大值;
      // 2. 通過索引找的日誌段為空;
      // 3. 給的日誌索引小於logStartOffset
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以最大值是endOffsetMetadata
      // 查看一下讀取隔離級別設置。
      // 普通消費者能夠看到[Log Start Offset, LEO)之間的消息
      // 事務型消費者只能看到[Log Start Offset, Log Stable Offset]之間的消息。Log Stable Offset(LSO)是比LEO值小的位移值,為Kafka事務使用
      // Follower副本消費者能夠看到[Log Start Offset,高水位值]之間的消息
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => endOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }
      //如果尋找的索引等於maxOffsetMetadata,那麼直接返回
      if (startOffset == maxOffsetMetadata.messageOffset) {
        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      //如果尋找的索引大於maxOffsetMetadata,返回空的消息集合,因為沒法讀取任何消息
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }
 
      // 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        // 找到日誌段中最大的日誌位移
        val maxPosition = { 
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }
        // 根據位移信息從日誌段中讀取日誌信息
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        // 如果找不到日誌信息,那麼去日誌段集合中找更大的日誌位移的日誌段
        if (fetchInfo == null) {
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      //找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

read方法,有四個參數,分別是:

  • startOffset:讀取的日誌索引位置。
  • maxLength:讀取數據量長度。
  • isolation:隔離級別,多用於 Kafka 事務。
  • minOneMessage:是否至少返回一條消息。設想如果消息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何消息。但如果設置了該參數為 true,read 方法就保證至少能夠返回一條消息。

代碼中使用了segments,來根據位移查找日誌段:

  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

我們下面看看read方法具體做了哪些事:

  1. 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態,作為最大索引LEO;
  2. 去日誌段集合里尋找小於或等於指定索引的日誌段;
  3. 校驗異常情況:
    1. startOffset是不是超過了LEO;
    2. 是不是日誌段集合里沒有索引小於startOffset;
    3. startOffset小於Log Start Offset;
  4. 接下來獲取一下隔離級別;
  5. 如果尋找的索引等於LEO,那麼返回空;
  6. 如果尋找的索引大於LEO,返回空的消息集合,因為沒法讀取任何消息;
  7. 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾;
    1. 首先找到日誌段中最大的位置;
    2. 根據位移信息從日誌段中讀取日誌信息(這個read方法我們上一篇已經講解過了);
    3. 如果找不到日誌信息,那麼讀取日誌段集合中下一個日誌段;
  8. 找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空;

我們在上面的read操作中可以看到,使用了segments來查找日誌。我們主要看看刪除操作

刪除日誌

刪除日誌的入口是:deleteOldSegments

  //  如果topic deletion開關是打開的,那麼會刪去過期的日誌段以及超過設置保留日誌大小的日誌
  // 無論是否開啟刪除規則,都會刪除在log start offset之前的日誌段
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

deleteOldSegments方法會判斷是否開啟刪除規則,如果開啟,那麼會分別調用:

deleteRetentionMsBreachedSegments刪除segment的時間戳超過了設置時間的日誌段;

deleteRetentionSizeBreachedSegments刪除日誌段空間超過設置空間大小的日誌段;

deleteLogStartOffsetBreachedSegments刪除日誌段的baseOffset小於logStartOffset的日誌段;

我這裏列舉一下這三個方法主要是怎麼實現的:

  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    //調用deleteOldSegments方法,並傳入匿名函數,判斷當前的segment的時間戳是否超過了設置時間
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")
  }
  
  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    //判斷日誌段空間是否超過設置空間大小
    //shouldDelete函數會將傳入的日誌段去減diff,直到小於等於零
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
  }
  
  private def deleteLogStartOffsetBreachedSegments(): Int = {
    //shouldDelete函數主要判斷日誌段的baseOffset是否小於logStartOffset
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

這種寫代碼的方式非常的靈活,通過不同方法設置不同的函數來實現代碼復用的目的,最後都是通過調用deleteOldSegments來實現刪除日誌段的目的。

下面我們來看一下deleteOldSegments的操作:

deleteOldSegments

這個deleteOldSegments方法和上面的入口方法傳入的參數是不一致的,這個方法傳入了一個predicate函數,用於判斷哪些日誌段是可以被刪除的,reason用來說明被刪除的原因。

  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    //刪除任何匹配到predicate規則的日誌段
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      deleteSegments(deletable)
    }
  }

這個方法調用了兩個主要的方法,一個是deletableSegments,用於獲取可以被刪除的日誌段的集合;deleteSegments用於刪除日誌段。

deletableSegments

  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    //如果日誌段是空的,那麼直接返回
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      //如果日誌段集合不為空,找到第一個日誌段
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        //獲取下一個日誌段
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
        //如果下一個日誌段的位移沒有大於或等於HW,並且日誌段是匹配predicate函數的,下一個日誌段也不是空的
        //那麼將這個日誌段放入可刪除集合中,然後遍歷下一個日誌段
        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

這個方法邏輯十分清晰,主要做了如下幾件事:

  1. 判斷日誌段集合是否為空,為空那麼直接返回空集合;

  2. 如果日誌段集合不為空,那麼從日誌段集合的第一個日誌段開始遍歷;

  3. 判斷當前被遍曆日志段是否能夠被刪除

    1. 日誌段的下一個日誌段的位移有沒有大於或等於HW;
    2. 日誌段是否能夠通過predicate函數校驗;
    3. 日誌段是否是最後一個日誌段;
  4. 將符合條件的日誌段都加入到deletable集合中,並返回。

接下來調用deleteSegments函數:

  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        // 我們至少保證要存在一個日誌段,如果要刪除所有的日誌;
        //所以調用roll方法創建一個全新的日誌段對象,並且關閉當前寫入的日誌段對象;
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          // 確保Log對象沒有被關閉
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups
          // 刪除給定的日誌段對象以及底層的物理文件
          removeAndDeleteSegments(deletable, asyncDelete = true)
          // 嘗試更新日誌的Log Start Offset值
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
        }
      }
      numToDelete
    }
  }

寫日誌

寫日誌的方法主要有兩個:

appendAsLeader

  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
  }

appendAsFollower

  def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
  }

appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都調用了 append 方法

append

  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      // 第1步:分析和驗證待寫入消息集合,並返回校驗結果
      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // 如果壓根就不需要寫入任何消息,直接返回即可
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // 第2步:消息格式規整,即刪除無效格式消息或無效字節
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        // 確保Log對象未關閉
        checkIfMemoryMappedBufferClosed()
        //需要分配位移值
        if (assignOffsets) {
          // assign offsets to the message set
          // 第3步:使用當前LEO值作為待寫入消息集合中第一條消息的位移值,nextOffsetMetadata為LEO值
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              isFromClient,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          // 更新校驗結果對象類LogAppendInfo
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          // 第4步:驗證消息,確保消息大小不超限
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
          // 直接使用給定的位移值,無需自己分配位移值
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)// 確保消息位移值的單調遞增性
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        // 第5步:更新Leader Epoch緩存
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // check messages set size may be exceed config.segmentSize
        // 第6步:確保消息大小不超限
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // maybe roll the log if this segment is full
        // 第7步:執行日誌切分。當前日誌段剩餘容量可能無法容納新消息集合,因此有必要創建一個新的日誌段來保存待寫入的所有消息
        //下面情況將會執行日誌切分:
        //logSegment 已經滿了
        //日誌段中的第一個消息的maxTime已經過期
        //index索引滿了
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        // 第8步:驗證事務狀態
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, isFromClient)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }
        // 第9步:執行真正的消息寫入操作,主要調用日誌段對象的append方法實現
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
        // 第10步:更新LEO對象,其中,LEO值是消息集合中最後一條消息位移值+1
        // 前面說過,LEO值永遠指向下一條不存在的消息
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        // 第11步:更新事務狀態
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")
        // 是否需要手動落盤。一般情況下我們不需要設置Broker端參數log.flush.interval.messages
        // 落盤操作交由操作系統來完成。但某些情況下,可以設置該參數來確保高可靠性
        if (unflushedMessages >= config.flushInterval)
          flush()
        // 第12步:返回寫入結果
        appendInfo
      }
    }
  }

上面代碼的主要步驟如下:

我們下面看看analyzeAndValidateRecords是如何進行消息校驗的:

analyzeAndValidateRecords

  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
    var shallowMessageCount = 0
    var validBytesCount = 0
    var firstOffset: Option[Long] = None
    var lastOffset = -1L
    var sourceCodec: CompressionCodec = NoCompressionCodec
    var monotonic = true
    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    var offsetOfMaxTimestamp = -1L
    var readFirstMessage = false
    var lastOffsetOfFirstBatch = -1L

    for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
      // 消息格式Version 2的消息批次,起始位移值必須從0開始
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
          s"be 0, but it is ${batch.baseOffset}")

      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
      // For magic version 2, we can get the first offset directly from the batch header.
      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
      // case, validation will be more lenient.
      // Also indicate whether we have the accurate first offset or not
      if (!readFirstMessage) {
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
        lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
        readFirstMessage = true
      }

      // check that offsets are monotonically increasing
      // 一旦出現當前lastOffset不小於下一個batch的lastOffset,說明上一個batch中有消息的位移值大於後面batch的消息
      // 這違反了位移值單調遞增性
      if (lastOffset >= batch.lastOffset)
        monotonic = false

      // update the last offset seen
      // 使用當前batch最後一條消息的位移值去更新lastOffset
      lastOffset = batch.lastOffset

      // Check if the message sizes are valid.
      val batchSize = batch.sizeInBytes
      // 檢查消息批次總字節數大小是否超限,即是否大於Broker端參數max.message.bytes值
      if (batchSize > config.maxMessageSize) {
        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
      }

      // check the validity of the message by checking CRC
      // 執行消息批次校驗,包括格式是否正確以及CRC校驗
      if (!batch.isValid) {
        brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
        throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
      }
      // 更新maxTimestamp字段和offsetOfMaxTimestamp
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }
      // 累加消息批次計數器以及有效字節數,更新shallowMessageCount字段
      shallowMessageCount += 1
      validBytesCount += batchSize
      // 從消息批次中獲取壓縮器類型
      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
      if (messageCodec != NoCompressionCodec)
        sourceCodec = messageCodec
    }

    // Apply broker-side compression if any
    // 獲取Broker端設置的壓縮器類型,即Broker端參數compression.type值。
    // 該參數默認值是producer,表示sourceCodec用的什麼壓縮器,targetCodec就用什麼
    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
    // 最後生成LogAppendInfo對象並返回
    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
  }

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

分類
發燒車訊

Docker Dockerfile 指令詳解與實戰案例

 

Dockerfile介紹及常用指令,包括FROM,RUN,還提及了 COPY,ADD,EXPOSE,WORKDIR等,其實 Dockerfile 功能很強大,它提供了十多個指令。

 

Dockerfile介紹

Dockerfile 是一個用來構建鏡像的文本文件,文本內容包含了一條條構建鏡像所需的指令和說明。

在Docker中創建鏡像最常用的方式,就是使用Dockerfile。Dockerfile是一個Docker鏡像的描述文件,我們可以理解成火箭發射的A、B、C、D…的步驟。Dockerfile其內部包含了一條條的指令,每一條指令構建一層,因此每一條指令的內容,就是描述該層應當如何構建。

FROM 指令-指定基礎鏡像

所謂定製鏡像,那一定是以一個鏡像為基礎,在其上進行定製。而 FROM 就是指定基礎鏡像,因此一個 Dockerfile 中 FROM 是必備的指令,並且必須是第一條指令。如下:

FROM centos

 

MAINTAINER 維護者信息

該鏡像是由誰維護的

MAINTAINER lightzhang lightzhang@xxx.com

 

ENV 設置環境變量

格式有兩種:

ENV <key> <value>
ENV <key1>=<value1> <key2>=<value2>...

 

這個指令很簡單,就是設置環境變量而已,無論是後面的其它指令,如 RUN,還是運行時的應用,都可以直接使用這裏定義的環境變量。

ENV VERSION=1.0 DEBUG=on \
    NAME="Happy Feet"

這個例子中演示了如何換行,以及對含有空格的值用雙引號括起來的辦法,這和 Shell 下的行為是一致的。

下列指令可以支持環境變量展開:

ADD、COPY、ENV、EXPOSE、FROM、LABEL、USER、WORKDIR、VOLUME、STOPSIGNAL、ONBUILD、RUN。

可以從這個指令列表裡感覺到,環境變量可以使用的地方很多,很強大。通過環境變量,我們可以讓一份 Dockerfile 製作更多的鏡像,只需使用不同的環境變量即可。

 

ARG 構建參數

格式:ARG <參數名>[=<默認值>]

構建參數和 ENV 的效果一樣,都是設置環境變量。所不同的是,ARG 所設置的構建環境的環境變量,在將來容器運行時是不會存在這些環境變量的。但是不要因此就使用 ARG 保存密碼之類的信息,因為 docker history 還是可以看到所有值的。

Dockerfile 中的 ARG 指令是定義參數名稱,以及定義其默認值。該默認值可以在構建命令 docker build 中用 –build-arg <參數名>=<值> 來覆蓋

在 1.13 之前的版本,要求 –build-arg 中的參數名,必須在 Dockerfile 中用 ARG 定義過了,換句話說,就是 –build-arg 指定的參數,必須在 Dockerfile 中使用了。如果對應參數沒有被使用,則會報錯退出構建。

從 1.13 開始,這種嚴格的限制被放開,不再報錯退出,而是显示警告信息,並繼續構建。這對於使用 CI 系統,用同樣的構建流程構建不同的 Dockerfile 的時候比較有幫助,避免構建命令必須根據每個 Dockerfile 的內容修改。

 

RUN 執行命令

RUN 指令是用來執行命令行命令的。由於命令行的強大能力,RUN 指令在定製鏡像時是最常用的指令之一。其格式有兩種:

shell 格式:RUN <命令>,就像直接在命令行中輸入的命令一樣。如下:

RUN echo '<h1>Hello, Docker!</h1>' > /usr/share/nginx/html/index.html

 

exec 格式:RUN [“可執行文件”, “參數1”, “參數2”],這更像是函數調用中的格式。

Dockerfile 中每一個指令都會建立一層,RUN 也不例外。每一個 RUN 的行為,都會新建立一層,在其上執行這些命令,執行結束后,commit 這一層的修改,構成新的鏡像。

Dockerfile 不推薦寫法:

 1 FROM debian:stretch
 2 
 3 RUN apt-get update
 4 RUN apt-get install -y gcc libc6-dev make wget
 5 RUN wget -O redis.tar.gz "http://download.redis.io/releases/redis-5.0.3.tar.gz"
 6 RUN mkdir -p /usr/src/redis
 7 RUN tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1
 8 RUN make -C /usr/src/redis
 9 RUN make -C /usr/src/redis install
10 RUN rm redis.tar.gz

上面的這種寫法,創建了 8 層鏡像。這是完全沒有意義的,而且很多運行時不需要的東西,都被裝進了鏡像里,比如編譯環境、更新的軟件包等等。最後一行即使刪除了軟件包,那也只是當前層的刪除;雖然我們看不見這個包了,但軟件包卻早已存在於鏡像中並一直跟隨着鏡像,沒有真正的刪除。

結果就是產生非常臃腫、非常多層的鏡像,不僅僅增加了構建部署的時間,也很容易出錯。 這是很多初學 Docker 的人常犯的一個錯誤。

另外:Union FS 是有最大層數限制的,比如 AUFS,曾經是最大不得超過 42 層,現在是不得超過 127 層。

 

Dockerfile 正確寫法:

 1 FROM debian:stretch
 2 
 3 RUN buildDeps='gcc libc6-dev make wget' \
 4     && apt-get update \
 5     && apt-get install -y $buildDeps \
 6     && wget -O redis.tar.gz "http://download.redis.io/releases/redis-5.0.3.tar.gz" \
 7     && mkdir -p /usr/src/redis \
 8     && tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1 \
 9     && make -C /usr/src/redis \
10     && make -C /usr/src/redis install \
11     && rm -rf /var/lib/apt/lists/* \
12     && rm redis.tar.gz \
13     && rm -r /usr/src/redis \
14     && apt-get purge -y --auto-remove $buildDeps

這裏沒有使用很多個 RUN 對應不同的命令,而是僅僅使用一個 RUN 指令,並使用 && 將各個所需命令串聯起來。將之前的 8 層,簡化為了 1 層,且後面刪除了不需要的包和目錄。在撰寫 Dockerfile 的時候,要經常提醒自己,這並不是在寫 Shell 腳本,而是在定義每一層該如何構建。因此鏡像構建時,一定要確保每一層只添加真正需要添加的東西,任何無關的東西都應該清理掉。

很多人初學 Docker 製作出了很臃腫的鏡像的原因之一,就是忘記了每一層構建的最後一定要清理掉無關文件。

 

COPY 複製文件

格式:

1 COPY [--chown=<user>:<group>] <源路徑>... <目標路徑>
2 COPY [--chown=<user>:<group>] ["<源路徑1>",... "<目標路徑>"]

 

COPY 指令將從構建上下文目錄中 <源路徑> 的文件/目錄複製到新的一層的鏡像內的 <目標路徑> 位置。如:

COPY package.json /usr/src/app/

 

<源路徑> 可以是多個,甚至可以是通配符,其通配符規則要滿足 Go 的 filepath.Match 規則,如:

1 COPY hom* /mydir/
2 COPY hom?.txt /mydir/

<目標路徑> 可以是容器內的絕對路徑,也可以是相對於工作目錄的相對路徑(工作目錄可以用 WORKDIR 指令來指定)。目標路徑不需要事先創建,如果目錄不存在會在複製文件前先行創建缺失目錄。

此外,還需要注意一點,使用 COPY 指令,源文件的各種元數據都會保留。比如讀、寫、執行權限、文件變更時間等。這個特性對於鏡像定製很有用。特別是構建相關文件都在使用 Git 進行管理的時候。

在使用該指令的時候還可以加上 –chown=<user>:<group> 選項來改變文件的所屬用戶及所屬組。

1 COPY --chown=55:mygroup files* /mydir/
2 COPY --chown=bin files* /mydir/
3 COPY --chown=1 files* /mydir/
4 COPY --chown=10:11 files* /mydir/

 

ADD 更高級的複製文件

ADD 指令和 COPY 的格式和性質基本一致。但是在 COPY 基礎上增加了一些功能。

如果 <源路徑> 為一個 tar 壓縮文件的話,壓縮格式為 gzip, bzip2 以及 xz 的情況下,ADD 指令將會自動解壓縮這個壓縮文件到 <目標路徑> 去。

在某些情況下,如果我們真的是希望複製個壓縮文件進去,而不解壓縮,這時就不可以使用 ADD 命令了。

在 Docker 官方的 Dockerfile 最佳實踐文檔 中要求,盡可能的使用 COPY,因為 COPY 的語義很明確,就是複製文件而已,而 ADD 則包含了更複雜的功能,其行為也不一定很清晰。最適合使用 ADD 的場合,就是所提及的需要自動解壓縮的場合。

特別說明:在 COPY 和 ADD 指令中選擇的時候,可以遵循這樣的原則,所有的文件複製均使用 COPY 指令,僅在需要自動解壓縮的場合使用 ADD。

在使用該指令的時候還可以加上 –chown=<user>:<group> 選項來改變文件的所屬用戶及所屬組。

1 ADD --chown=55:mygroup files* /mydir/
2 ADD --chown=bin files* /mydir/
3 ADD --chown=1 files* /mydir/
4 ADD --chown=10:11 files* /mydir/

 

WORKDIR 指定工作目錄

格式為 WORKDIR <工作目錄路徑>

使用 WORKDIR 指令可以來指定工作目錄(或者稱為當前目錄),以後各層的當前目錄就被改為指定的目錄,如該目錄不存在,WORKDIR 會幫你建立目錄。

之前提到一些初學者常犯的錯誤是把 Dockerfile 等同於 Shell 腳本來書寫,這種錯誤的理解還可能會導致出現下面這樣的錯誤:

1 RUN cd /app
2 RUN echo "hello" > world.txt

如果將這個 Dockerfile 進行構建鏡像運行后,會發現找不到 /app/world.txt 文件,或者其內容不是 hello。原因其實很簡單,在 Shell 中,連續兩行是同一個進程執行環境,因此前一個命令修改的內存狀態,會直接影響后一個命令;而在 Dockerfile 中,這兩行 RUN 命令的執行環境根本不同,是兩個完全不同的容器。這就是對 Dockerfile 構建分層存儲的概念不了解所導致的錯誤。

之前說過每一個 RUN 都是啟動一個容器、執行命令、然後提交存儲層文件變更。第一層 RUN cd /app 的執行僅僅是當前進程的工作目錄變更,一個內存上的變化而已,其結果不會造成任何文件變更。而到第二層的時候,啟動的是一個全新的容器,跟第一層的容器更完全沒關係,自然不可能繼承前一層構建過程中的內存變化。

因此如果需要改變以後各層的工作目錄的位置,那麼應該使用 WORKDIR 指令。

 

USER 指定當前用戶

格式:USER <用戶名>[:<用戶組>]

USER 指令和 WORKDIR 相似,都是改變環境狀態並影響以後的層。WORKDIR 是改變工作目錄,USER 則是改變之後層的執行 RUN, CMD 以及 ENTRYPOINT 這類命令的身份。

當然,和 WORKDIR 一樣,USER 只是幫助你切換到指定用戶而已,這個用戶必須是事先建立好的,否則無法切換。

1 RUN groupadd -r redis && useradd -r -g redis redis
2 USER redis
3 RUN [ "redis-server" ]

 

如果以 root 執行的腳本,在執行期間希望改變身份,比如希望以某個已經建立好的用戶來運行某個服務進程,不要使用 su 或者 sudo,這些都需要比較麻煩的配置,而且在 TTY 缺失的環境下經常出錯。建議使用 gosu。

1 # 建立 redis 用戶,並使用 gosu 換另一個用戶執行命令
2 RUN groupadd -r redis && useradd -r -g redis redis
3 # 下載 gosu
4 RUN wget -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/1.7/gosu-amd64" \
5     && chmod +x /usr/local/bin/gosu \
6     && gosu nobody true
7 # 設置 CMD,並以另外的用戶執行
8 CMD [ "exec", "gosu", "redis", "redis-server" ]

 

VOLUME 定義匿名卷

格式為:

1 VOLUME ["<路徑1>", "<路徑2>"...]
2 VOLUME <路徑>

之前我們說過,容器運行時應該盡量保持容器存儲層不發生寫操作,對於數據庫類需要保存動態數據的應用,其數據庫文件應該保存於卷(volume)中。為了防止運行時用戶忘記將動態文件所保存目錄掛載為卷,在 Dockerfile 中,我們可以事先指定某些目錄掛載為匿名卷,這樣在運行時如果用戶不指定掛載,其應用也可以正常運行,不會向容器存儲層寫入大量數據。

 

VOLUME /data

這裏的 /data 目錄就會在運行時自動掛載為匿名卷,任何向 /data 中寫入的信息都不會記錄進容器存儲層,從而保證了容器存儲層的無狀態化。當然,運行時可以覆蓋這個掛載設置。比如:

docker run -d -v mydata:/data xxxx

在這行命令中,就使用了 mydata 這個命名卷掛載到了 /data 這個位置,替代了 Dockerfile 中定義的匿名卷的掛載配置。

 

EXPOSE 聲明端口

格式為 EXPOSE <端口1> [<端口2>...]

EXPOSE 指令是聲明運行時容器提供服務端口,這隻是一個聲明,在運行時並不會因為這個聲明應用就會開啟這個端口的服務。在 Dockerfile 中寫入這樣的聲明有兩個好處,一個是幫助鏡像使用者理解這個鏡像服務的守護端口,以方便配置映射;另一個用處則是在運行時使用隨機端口映射時,也就是 docker run -P 時,會自動隨機映射 EXPOSE 的端口。

要將 EXPOSE 和在運行時使用 -p <宿主端口>:<容器端口> 區分開來。-p,是映射宿主端口和容器端口,換句話說,就是將容器的對應端口服務公開給外界訪問,而 EXPOSE 僅僅是聲明容器打算使用什麼端口而已,並不會自動在宿主進行端口映射。

 

ENTRYPOINT 入口點

ENTRYPOINT 的格式和 RUN 指令格式一樣,分為 exec 格式和 shell 格式

ENTRYPOINT 的目的和 CMD 一樣,都是在指定容器啟動程序及參數。ENTRYPOINT 在運行時也可以替代,不過比 CMD 要略顯繁瑣,需要通過 docker run 的參數 –entrypoint 來指定。

當指定了 ENTRYPOINT 后,CMD 的含義就發生了改變,不再是直接的運行其命令,而是將 CMD 的內容作為參數【】傳給 ENTRYPOINT 指令,換句話說實際執行時,將變為:

<ENTRYPOINT> "<CMD>"

 

那麼有了 CMD 后,為什麼還要有 ENTRYPOINT 呢?這種 <ENTRYPOINT> “<CMD>“ 有什麼好處么?讓我們來看幾個場景。

場景一:讓鏡像變成像命令一樣使用

假設我們需要一個得知自己當前公網 IP 的鏡像,那麼可以先用 CMD 來實現:

1 FROM centos:7.7.1908
2 CMD [ "curl", "-s", "ifconfig.io" ]

 

假如我們使用 docker build -t myip . 來構建鏡像的話,如果我們需要查詢當前公網 IP,只需要執行:

1 $ docker run myip
2 183.226.75.148

 

嗯,這麼看起來好像可以直接把鏡像當做命令使用了,不過命令總有參數,如果我們希望加參數呢?比如從上面的 CMD 中可以看到實質的命令是 curl,那麼如果我們希望显示 HTTP 頭信息,就需要加上 -i 參數。那麼我們可以直接加 -i 參數給 docker run myip 么?

1 $ docker run myip -i
2 docker: Error response from daemon: OCI runtime create failed: container_linux.go:348: starting container process caused "exec: \"-i\": executable file not found in $PATH": unknown.
3 ERRO[0000] error waiting for container: context canceled

 

我們可以看到可執行文件找不到的報錯,executable file not found。之前我們說過,跟在鏡像名後面的是 command【Usage: docker run [OPTIONS] IMAGE [COMMAND] [ARG…]】,運行時會替換 CMD 的默認值。因此這裏的 -i 替換了原來的 CMD,而不是添加在原來的 curl -s ifconfig.io 後面。而 -i 根本不是命令,所以自然找不到。

那麼如果我們希望加入 -i 這參數,我們就必須重新完整的輸入這個命令:

$ docker run myip curl -s ifconfig.io -i

 

這顯然不是很好的解決方案,而使用 ENTRYPOINT 就可以解決這個問題。現在我們重新用 ENTRYPOINT 來實現這個鏡像:

1 FROM centos:7.7.1908
2 ENTRYPOINT [ "curl", "-s", "ifconfig.io" ]

 

使用 docker build -t myip2 . 構建完成后,這次我們再來嘗試直接使用 docker run myip2 -i:

 1 $ docker run myip2 
 2 183.226.75.148
 3 
 4 $ docker run myip2 -i
 5 HTTP/1.1 200 OK
 6 Date: Sun, 19 Apr 2020 02:20:48 GMT
 7 Content-Type: text/plain; charset=utf-8
 8 Content-Length: 15
 9 Connection: keep-alive
10 Set-Cookie: __cfduid=d76a2e007bbe7ec2d230b0a6636d115151587262848; expires=Tue, 19-May-20 02:20:48 GMT; path=/; domain=.ifconfig.io; HttpOnly; SameSite=Lax
11 CF-Cache-Status: DYNAMIC
12 Server: cloudflare
13 CF-RAY: 586326015c3199a1-LAX
14 alt-svc: h3-27=":443"; ma=86400, h3-25=":443"; ma=86400, h3-24=":443"; ma=86400, h3-23=":443"; ma=86400
15 cf-request-id: 0231d614d9000099a1e10d7200000001
16 
17 183.226.75.148

 

可以看到,這次成功了。這是因為當存在 ENTRYPOINT 后,CMD 的內容將會作為參數傳給 ENTRYPOINT,而這裏 -i 就是新的 CMD,因此會作為參數傳給 curl,從而達到了我們預期的效果。

 

場景二:應用運行前的準備工作

啟動容器就是啟動主進程,但有些時候,啟動主進程前,需要一些準備工作。

比如 mysql 類的數據庫,可能需要一些數據庫配置、初始化的工作,這些工作要在最終的 mysql 服務器運行之前解決。

此外,可能希望避免使用 root 用戶去啟動服務,從而提高安全性,而在啟動服務前還需要以 root 身份執行一些必要的準備工作,最後切換到服務用戶身份啟動服務。或者除了服務外,其它命令依舊可以使用 root 身份執行,方便調試等。

這些準備工作是和容器 CMD 無關的,無論 CMD 是什麼,都需要事先進行一個預處理的工作。這種情況下,可以寫一個腳本,然後放入 ENTRYPOINT 中去執行,而這個腳本會將接到的參數(也就是 <CMD>)作為命令,在腳本最後執行。比如官方鏡像 redis 中就是這麼做的:

1 FROM alpine:3.4
2 ...
3 RUN addgroup -S redis && adduser -S -G redis redis
4 ...
5 ENTRYPOINT ["docker-entrypoint.sh"]
6 
7 EXPOSE 6379
8 CMD [ "redis-server" ]

 

可以看到其中為了 redis 服務創建了 redis 用戶,並在最後指定了 ENTRYPOINT 為 docker-entrypoint.sh 腳本。

1 #!/bin/sh
2 ...
3 # allow the container to be started with `--user`
4 if [ "$1" = 'redis-server' -a "$(id -u)" = '0' ]; then
5     chown -R redis .
6     exec su-exec redis "$0" "$@"
7 fi
8 
9 exec "$@"

 

該腳本的內容就是根據 CMD 的內容來判斷,如果是 redis-server 的話,則切換到 redis 用戶身份啟動服務器,否則依舊使用 root 身份執行。比如:

1 $ docker run -it redis id
2 uid=0(root) gid=0(root) groups=0(root)

 

CMD 容器啟動命令

CMD 指令的格式和 RUN 相似,也是兩種格式和一種特殊格式:

1 shell 格式:CMD <命令>
2 exec 格式:CMD ["可執行文件", "參數1", "參數2"...]
3 參數列表格式:CMD ["參數1", "參數2"...]。在指定了 ENTRYPOINT 指令后,用 CMD 指定具體的參數。

 

之前介紹容器的時候曾經說過,Docker 不是虛擬機,容器就是進程。既然是進程,那麼在啟動容器的時候,需要指定所運行的程序及參數。CMD 指令就是用於指定默認的容器主進程的啟動命令的。

在指令格式上,一般推薦使用 exec 格式,這類格式在解析時會被解析為 JSON 數組,因此一定要使用雙引號 “,而不要使用單引號。

如果使用 shell 格式的話,實際的命令會被包裝為 sh -c 的參數的形式進行執行。比如:

CMD echo $HOME

 

在實際執行中,會將其變更為:

CMD [ "sh", "-c", "echo $HOME" ]

 

這就是為什麼我們可以使用環境變量的原因,因為這些環境變量會被 shell 進行解析處理。

提到 CMD 就不得不提容器中應用在前台執行和後台執行的問題。這是初學者常出現的一個混淆。

Docker 不是虛擬機,容器中的應用都應該以前台執行,而不是像虛擬機、物理機裏面那樣,用 systemd 去啟動後台服務,容器內沒有後台服務的概念

對於容器而言,其啟動程序就是容器應用進程,容器就是為了主進程而存在的,主進程退出,容器就失去了存在的意義,從而退出,其它輔助進程不是它需要關心的東西。

一些初學者將 CMD 寫為:

CMD service nginx start

 

使用 service nginx start 命令,則是希望 upstart 來以後台守護進程形式啟動 nginx 服務。而剛才說了 CMD service nginx start 會被理解為 CMD [ “sh”, “-c”, “service nginx start”],因此主進程實際上是 sh。那麼當 service nginx start 命令結束后,sh 也就結束了,sh 作為主進程退出了,自然就會令容器退出。

正確的做法是直接執行 nginx 可執行文件,並且要求以前台形式運行。比如:

CMD ["nginx", "-g", "daemon off;"]

 

構建鏡像案例-Nginx

構建文件

 1 [root@docker01 make03]# pwd
 2 /root/docker_test/make03
 3 [root@docker01 make03]# ll
 4 total 12
 5 -rw-r--r-- 1 root root 720 Apr 19 16:46 Dockerfile
 6 -rw-r--r-- 1 root root  95 Apr 19 16:19 entrypoint.sh
 7 -rw-r--r-- 1 root root  22 Apr 19 16:18 index.html
 8 [root@docker01 make03]# cat Dockerfile   # Dockerfile文件
 9 # 基礎鏡像
10 FROM centos:7.7.1908
11 
12 # 維護者
13 MAINTAINER lightzhang lightzhang@xxx.com
14 
15 # 命令:做了些什麼操作
16 RUN echo 'nameserver 223.5.5.5' > /etc/resolv.conf && echo 'nameserver 223.6.6.6' >> /etc/resolv.conf
17 RUN curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
18 RUN yum install -y nginx-1.16.1 && yum clean all
19 RUN echo "daemon off;" >> /etc/nginx/nginx.conf
20 
21 # 添加文件
22 COPY index.html /usr/share/nginx/html/index.html
23 COPY entrypoint.sh /usr/local/bin/entrypoint.sh
24 
25 # 對外暴露端口聲明
26 EXPOSE 80
27 
28 # 執行
29 ENTRYPOINT ["sh", "entrypoint.sh"]
30 
31 # 執行命令
32 CMD ["nginx"]
33 
34 [root@docker01 make03]# cat index.html   # 訪問文件
35 nginx in docker, html
36 [root@docker01 make03]# cat entrypoint.sh   # entrypoint 文件
37 #!/bin/bash
38 if [ "$1" = 'nginx' ]; then
39     exec nginx -c /etc/nginx/nginx.conf
40 fi
41 exec "$@"

 

構建鏡像

 1 [root@docker01 make03]# docker build -t base/nginx:1.16.1 .
 2 Sending build context to Docker daemon  4.608kB
 3 Step 1/11 : FROM centos:7.7.1908
 4  ---> 08d05d1d5859
 5 Step 2/11 : MAINTAINER lightzhang lightzhang@xxx.com
 6  ---> Using cache
 7  ---> 1dc29e78d94f
 8 Step 3/11 : RUN echo 'nameserver 223.5.5.5' > /etc/resolv.conf && echo 'nameserver 223.6.6.6' >> /etc/resolv.conf
 9  ---> Using cache
10  ---> 19398ad9b023
11 Step 4/11 : RUN curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
12  ---> Using cache
13  ---> b2451c5856c5
14 Step 5/11 : RUN yum install -y nginx-1.16.1 && yum clean all
15  ---> Using cache
16  ---> 291f27cae4df
17 Step 6/11 : RUN echo "daemon off;" >> /etc/nginx/nginx.conf
18  ---> Using cache
19  ---> 115e07b6313e
20 Step 7/11 : COPY index.html /usr/share/nginx/html/index.html
21  ---> Using cache
22  ---> 9d714d2e2a84
23 Step 8/11 : COPY entrypoint.sh /usr/local/bin/entrypoint.sh
24  ---> Using cache
25  ---> b16983911b56
26 Step 9/11 : EXPOSE 80
27  ---> Using cache
28  ---> d8675d6c2d43
29 Step 10/11 : ENTRYPOINT ["sh", "entrypoint.sh"]
30  ---> Using cache
31  ---> 802a1a67db37
32 Step 11/11 : CMD ["nginx"]
33  ---> Using cache
34  ---> f2517b4d5510
35 Successfully built f2517b4d5510
36 Successfully tagged base/nginx:1.16.1

 

發布容器與端口查看

 1 [root@docker01 ~]# docker run -d -p 80:80 --name mynginx_v2 base/nginx:1.16.1   # 啟動容器
 2 50a45a0894d8669308de7c70d47c96db8cd8990d3e34d1d125e5289ed062f126
 3 [root@docker01 ~]# 
 4 [root@docker01 ~]# docker ps 
 5 CONTAINER ID   IMAGE               COMMAND                  CREATED         STATUS         PORTS                NAMES
 6 50a45a0894d8   base/nginx:1.16.1   "sh entrypoint.sh ng…"   3 minutes ago   Up 3 minutes   0.0.0.0:80->80/tcp   mynginx_v2
 7 [root@docker01 ~]# netstat -lntup  # 端口查看
 8 Active Internet connections (only servers)
 9 Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
10 tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1634/master         
11 tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd           
12 tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      1349/sshd           
13 tcp6       0      0 ::1:25                  :::*                    LISTEN      1634/master         
14 tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd           
15 tcp6       0      0 :::80                   :::*                    LISTEN      13625/docker-proxy  
16 tcp6       0      0 :::8080                 :::*                    LISTEN      2289/docker-proxy   
17 tcp6       0      0 :::22                   :::*                    LISTEN      1349/sshd           
18 udp        0      0 0.0.0.0:1021            0.0.0.0:*                           847/rpcbind         
19 udp        0      0 0.0.0.0:111             0.0.0.0:*                           1/systemd           
20 udp6       0      0 :::1021                 :::*                                847/rpcbind         
21 udp6       0      0 :::111                  :::*                                1/systemd

 

瀏覽器訪問

http://172.16.1.31/

 

 

  

 

 

———END———
如果覺得不錯就關注下唄 (-^O^-) !

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

分類
發燒車訊

Python 網絡爬蟲實戰:爬取 B站《全職高手》20萬條評論數據

本周我們的目標是:B站(嗶哩嗶哩彈幕網 https://www.bilibili.com )視頻評論數據。

我們都知道,B站有很多號稱“鎮站之寶”的視頻,擁有着數量極其恐怖的評論和彈幕。所以這次我們的目標就是,爬取B站視頻的評論數據,分析其為何會深受大家喜愛。

首先去調研一下,B站評論數量最多的視頻是哪一個。。。好在已經有大佬已經統計過了,我們來看一哈!

​【B站大數據可視化】B站評論數最多的視頻究竟是?來自 <https://www.bilibili.com/video/av34900167/>

 

嗯?《全職高手》,有點意思,第一集和最後一集分別佔據了評論數量排行榜的第二名和第一名,遠超了其他很多很火的番。那好,就拿它下手吧,看看它到底強在哪兒。

廢話不多說,先去B站看看這部神劇到底有多好看 https://www.bilibili.com/bangumi/play/ep107656

額,需要開通大會員才能觀看。。。

好吧,不看就不看,不過好在雖然視頻看不了,評論卻是可以看的。

感受到它的恐怖了嗎?63w6條的評論!9千多頁!果然是不同凡響啊。

接下來,我們就開始編寫爬蟲,爬取這些數據吧。

 

使用爬蟲爬取網頁一般分為四個階段:分析目標網頁,獲取網頁內容,提取關鍵信息,輸出保存。

1. 分析目標網頁

  • 首先觀察評論區結構,發現評論區為鼠標點擊翻頁形式,共 9399 頁,每一頁有 20 條評論,每條評論中包含 用戶名、評論內容、評論樓層、時間日期、點贊數等信息展示。

  • 接着我們按 F12 召喚出開發者工具,切換到Network。然後用鼠標點擊評論翻頁,觀察這個過程有什麼變化,並以此來制定我們的爬取策略。

  • 我們不難發現,整個過程中 URL 不變,說明評論區翻頁不是通過 URL 控制。而在每翻一頁的時候,網頁會向服務器發出這樣的請求(請看 Request URL)。

  • 點擊 Preview 欄,可以切換到預覽頁面,也就是說,可以看到這個請求返回的結果是什麼。下面是該請求返回的 json 文件,包含了在 replies 里包含了本頁的評論數據。在這個 json 文件里,我們可以發現,這裏面包含了太多的信息,除了網頁上展示的信息,還有很多沒展示出來的信息也有,簡直是挖到寶了。不過,我們這裏用不到,通通忽略掉,只挑我們關注的部分就好了。

2. 獲取網頁內容

網頁內容分析完畢,可以正式寫代碼爬了。

 1 import requests
 2 
 3 def fetchURL(url):
 4     '''
 5     功能:訪問 url 的網頁,獲取網頁內容並返回
 6     參數:
 7         url :目標網頁的 url
 8     返回:目標網頁的 html 內容
 9     '''
10     headers = {
11         'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
12         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
13     }
14     
15     try:
16         r = requests.get(url,headers=headers)
17         r.raise_for_status()
18         print(r.url)
19         return r.text
20     except requests.HTTPError as e:
21         print(e)
22         print("HTTPError")
23     except requests.RequestException as e:
24         print(e)
25     except:
26         print("Unknown Error !")
27         
28 
29 if __name__ == '__main__':
30     url = 'https://api.bilibili.com/x/v2/reply?callback=jQuery172020326544171595695_1541502273311&jsonp=jsonp&pn=2&type=1&oid=11357166&sort=0&_=1541502312050'
31     html = fetchURL(url)
32     print(html)

不過,在運行過後,你會發現,403 錯誤,服務器拒絕了我們的訪問。

運行結果:

403 Client Error: Forbidden for url: https://api.bilibili.com/x/v2/reply?callback=jQuery172020326544171595695_1541502273311&jsonp=jsonp&pn=2&type=1&oid=11357166&sort=0&_=1541502312050
HTTPError
None

同樣的,這個請求放瀏覽器地址欄裏面直接打開,會變403,什麼也訪問不到。

這是我們本次爬蟲遇到的第一個坑。在瀏覽器中能正常返迴響應,但是直接打開請求鏈接時,卻會被服務器拒絕。(我第一反應是 cookie ,將瀏覽器中的 cookie 放入爬蟲的請求頭中,重新訪問,發現沒用),或許這也算是一個小的反爬蟲機制吧。

網上查閱資料之後,我找到了解決的方法(雖然不了解原理),原請求的 URL 參數如下:

callback = jQuery1720913511919053787_1541340948898
jsonp = jsonp
pn = 2
type = 1
oid = 11357166&sort=0
_ = 1541341035236

其中,真正有用的參數只有三個:pn(頁數),type(=1)和oid(視頻id)。刪除其餘不必要的參數之後,用新整理出的url去訪問,成功獲取到評論數據。

https://api.bilibili.com/x/v2/reply?type=1&oid=11357166&pn=2

然後,在主函數中,通過寫一個 for 循環,通過改變 pn 的值,獲取每一頁的評論數據。

1 if __name__ == '__main__':
2     for page in range(0,9400):
3         url = 'https://api.bilibili.com/x/v2/reply?type=1&oid=11357166&pn=' + str(page)
4         html = fetchURL(url)

 

3. 提取關鍵信息

通過 json 庫對獲取到的響應內容進行解析,然後提取我們需要的內容:樓層,用戶名,性別,時間,評價,點贊數,回複數。

 1 import json
 2 import time
 3 
 4 def parserHtml(html):
 5     '''
 6     功能:根據參數 html 給定的內存型 HTML 文件,嘗試解析其結構,獲取所需內容
 7     參數:
 8             html:類似文件的內存 HTML 文本對象
 9     '''
10     s = json.loads(html)
11 
12     for i in range(20):
13         comment = s['data']['replies'][i]
14 
15         # 樓層,用戶名,性別,時間,評價,點贊數,回複數
16         floor = comment['floor']
17         username = comment['member']['uname']
18         sex = comment['member']['sex']
19         ctime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(comment['ctime']))
20         content = comment['content']['message']
21         likes = comment['like']
22         rcounts = comment['rcount']
23 
24         print('--'+str(floor) + ':' + username + '('+sex+')' + ':'+ctime)
25         print(content)
26         print('like : '+ str(likes) + '      ' + 'replies : ' + str(rcounts))
27         print('  ')
部分運行結果如下:
--204187:day可可鈴(保密):2018-11-05 18:16:22
太太又出本了,這次真的木錢了(´;ω;`)
like : 1      replies : 0
  
--204186:長夜未央233(女):2018-11-05 16:24:52
12區打卡
like : 2      replies : 0
  
--204185:果然還是人渣一枚(男):2018-11-05 13:48:09
貌似忘來了好幾天
like : 1      replies : 1
  
--204183:day可可鈴(保密):2018-11-05 13:12:38
要準備去學校了,萬惡的期中考試( ´_ゝ`)
like : 2      replies : 0
  
--204182:拾秋以恭弘=叶 恭弘(保密):2018-11-05 12:04:19
11月5日打卡( ̄▽ ̄)
like : 1      replies : 0
  
--204181:芝米士噠(女):2018-11-05 07:53:43
這次是真的錯過了一個億[蛆音娘_扶額]
like : 2      replies : 1

4. 保存輸出

我們把這些數據以 csv 的格式保存於本地,即完成了本次爬蟲的全部任務。下面附上爬蟲的全部代碼。

  1 import requests
  2 import json
  3 import time
  4 
  5 def fetchURL(url):
  6     '''
  7     功能:訪問 url 的網頁,獲取網頁內容並返回
  8     參數:
  9         url :目標網頁的 url
 10     返回:目標網頁的 html 內容
 11     '''
 12     headers = {
 13         'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
 14         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
 15     }
 16     
 17     try:
 18         r = requests.get(url,headers=headers)
 19         r.raise_for_status()
 20         print(r.url)
 21         return r.text
 22     except requests.HTTPError as e:
 23         print(e)
 24         print("HTTPError")
 25     except requests.RequestException as e:
 26         print(e)
 27     except:
 28         print("Unknown Error !")
 29         
 30 
 31 def parserHtml(html):
 32     '''
 33     功能:根據參數 html 給定的內存型 HTML 文件,嘗試解析其結構,獲取所需內容
 34     參數:
 35             html:類似文件的內存 HTML 文本對象
 36     '''
 37     try:
 38         s = json.loads(html)
 39     except:
 40         print('error')
 41         
 42     commentlist = []
 43     hlist = []
 44 
 45     hlist.append("序號")
 46     hlist.append("名字")
 47     hlist.append("性別")
 48     hlist.append("時間")
 49     hlist.append("評論")
 50     hlist.append("點贊數")
 51     hlist.append("回複數")
 52 
 53     #commentlist.append(hlist)
 54 
 55     # 樓層,用戶名,性別,時間,評價,點贊數,回複數
 56     for i in range(20):
 57         comment = s['data']['replies'][i]
 58         blist = []
 59 
 60         floor = comment['floor']
 61         username = comment['member']['uname']
 62         sex = comment['member']['sex']
 63         ctime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(comment['ctime']))
 64         content = comment['content']['message']
 65         likes = comment['like']
 66         rcounts = comment['rcount']
 67 
 68         blist.append(floor)
 69         blist.append(username)
 70         blist.append(sex)
 71         blist.append(ctime)
 72         blist.append(content)
 73         blist.append(likes)
 74         blist.append(rcounts)
 75 
 76         commentlist.append(blist)
 77 
 78     writePage(commentlist)
 79     print('---'*20)
 80 
 81 def writePage(urating):
 82     '''
 83         Function : To write the content of html into a local file
 84         html : The response content
 85         filename : the local filename to be used stored the response
 86     '''
 87     
 88     import pandas as pd
 89     dataframe = pd.DataFrame(urating)
 90     dataframe.to_csv('Bilibili_comment5-1000條.csv', mode='a', index=False, sep=',', header=False)
 91 
 92 
 93 if __name__ == '__main__':
 94     for page in range(0,9400):
 95         url = 'https://api.bilibili.com/x/v2/reply?type=1&oid=11357166&pn=' + str(page)
 96         html = fetchURL(url)
 97         parserHtml(html)
 98 
 99         # 為了降低被封ip的風險,每爬20頁便歇5秒。
100         if page%20 == 0:
101             time.sleep(5)

 

寫在最後

在爬取過程中,還是遇到了很多的小坑的。

1. 請求的 url 不能直接用,需要對參數進行篩選整理后才能訪問。

2. 爬取過程其實並不順利,因為如果爬取期間如果有用戶發表評論,則請求返回的響應會為空導致程序出錯。所以在實際爬取過程中,記錄爬取的位置,以便出錯之後從該位置繼續爬。(並且,挑選深夜一兩點這種發帖人數少的時間段,可以極大程度的減少程序出錯的機率)

3. 爬取到的數據有多處不一致,其實這個不算是坑,不過這裏還是講一下,免得產生困惑。

        a. 就是評論區樓層只到了20多萬,但是評論數量卻有63萬多條,這個不一致主要是由於B站的評論是可以回復的,回復的評論也會計算到總評論數里。我們這裏只爬樓層的評論,而評論的回復則忽略,只統計回複數即可。

        b. 評論區樓層在20萬條左右,但是我們最後爬取下來的數據只有18萬條左右,反覆檢查爬蟲程序及原網站后發現,這個屬於正常現象,因為有刪評論的情況,評論刪除之後,後面的樓層並不會重新排序,而是就這樣把刪掉的那層空下了。導致樓層數和評論數不一致。

 

 

 如果文章中有哪裡沒有講明白,或者講解有誤的地方,歡迎在評論區批評指正,或者掃描下面的二維碼,加我微信,大家一起學習交流,共同進步。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!