分類
發燒車訊

荷研發植物製塑膠瓶 一年內可生物分解

摘錄自2020年7月21日公視報導

荷蘭一間生化公司開發出以植物原料製造的塑膠瓶,使用完不僅可以回收,它還能在一年內自行分解。

荷蘭生化公司技術總監古魯特表示:「而且因為從隔離膜的角度來看,PEF(生質聚酯塑膠)性能確實很好,因為紙瓶的優勢來自於紙質製造,只需要一層薄薄的PEF即可實現阻隔(液體)的性能,能好好將內容物妥善保存一段時間。」

這種植物塑膠,是由玉米、小麥,和甜菜根作成,可以用來盛裝包括氣泡型的飲料,能大幅減少塑膠污染,跟市場對化石燃料的依賴。經過將瓶子放入淡水、鹽水、泥土跟沉澱物的實驗證明,這種塑膠經過堆肥處理後,一年內就可以完全分解,就算放在戶外,也只要幾年時間就能分解。

生化公司認為這些瓶子,可以回收再利用,因此爭取到了美國可口可樂公司,和丹麥啤酒製造商「嘉士伯」的支持,持續開發。由於植物塑膠的製作成本,生化公司了解一開始無法在價格上占有市場優勢,因此預計先每年生產5000公噸,預計將在2023年前,會與飲料公司合作,讓產品上架。

污染治理
國際新聞
荷蘭
可生物分解
廢棄物

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

研究:提高空調能源效率、改用氣候友善製冷劑 將可省下數千億噸碳排

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

分類
發燒車訊

在k8s上部署日誌系統elfk

日誌系統elfk

前言

經過上周的技術預研,在本周一通過開會研究,根據公司的現有業務流量和技術棧,決定選擇的日誌系統方案為:elasticsearch(es)+logstash(lo)+filebeat(fi)+kibana(ki)組合。es選擇使用aliyun提供的es,lo&fi選擇自己部署,ki是阿里雲送的。因為申請ecs需要一定的時間,暫時選擇部署在測試&生產環境(吐槽一下,我司測試和生產公用一套k8s並且託管與aliyun……)。用時一天(前期有部署的差不多過)完成在kubernetes上部署完成elfk(先部署起來再說,優化什麼的後期根據需要再搞)。

組件簡介

es 是一個實時的、分佈式的可擴展的搜索引擎,允許進行全文、結構化搜索,它通常用於索引和搜索大量日誌數據,也可用於搜索許多不同類型的文。

lo 主要的有點就是它的靈活性,主要因為它有很多插件,詳細的文檔以及直白的配置格式讓它可以在多種場景下應用。我們基本上可以在網上找到很多資源,幾乎可以處理任何問題。

作為 Beats 家族的一員,fi 是一個輕量級的日誌傳輸工具,它的存在正彌補了 lo 的缺點fi作為一個輕量級的日誌傳輸工具可以將日誌推送到中心lo。

ki是一個分析和可視化平台,它可以瀏覽、可視化存儲在es集群上排名靠前的日誌數據,並構建儀錶盤。ki結合es操作簡單集成了絕大多數es的API,是專業的日誌展示應用。

數據採集流程圖

日誌流向:logs_data—> fi —> lo —> es—> ki。

logs_data通過fi收集日誌,輸出到lo,通過lo做一些過濾和修改之後傳送到es數據庫,ki讀取es數據庫做分析。

部署

根據我司的實際集群狀況,此文檔部署將完全還原日誌系統的部署情況。

在本地MAC安裝kubectl連接aliyun託管k8s

在客戶端(隨便本地一台虛機上)安裝和託管的k8s一樣版本的kubectl

curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.14.8/bin/linux/amd64/kubectl   
chmod +x ./kubectl 
mv ./kubectl /usr/local/bin/kubectl  
將阿里雲託管的k8s的kubeconfig 複製到$HOME/.kube/config 目錄下,注意用戶權限的問題
部署ELFK

申請一個名稱空間(一般一個項目一個名稱空間)。

# cat kube-logging.yaml 
apiVersion: v1
kind: Namespace
metadata:
  name: loging

部署es。網上找個差不多的資源清單,根據自己的需求進行適當的修改,運行,出錯就根據日誌進行再修改。

# cat elasticsearch.yaml 
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: local-class
  namespace: loging
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
# Supported policies: Delete, Retain
reclaimPolicy: Delete
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: datadir1
  namespace: logging
  labels:
    type: local
spec:
  storageClassName: local-class
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/data1"
--- 
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch
  namespace: loging
spec:
  serviceName: elasticsearch
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: elasticsearch:7.3.1
        resources:
            limits:
              cpu: 1000m
            requests:
              cpu: 100m
        ports:
        - containerPort: 9200
          name: rest
          protocol: TCP
        - containerPort: 9300
          name: inter-node
          protocol: TCP
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
        env:
          - name: "discovery.type"
            value: "single-node"
          - name: cluster.name
            value: k8s-logs
          - name: node.name
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: ES_JAVA_OPTS
            value: "-Xms512m -Xmx512m"
      initContainers:
      - name: fix-permissions
        image: busybox
        command: ["sh", "-c", "chown -R 1000:1000 /usr/share/elasticsearch/data"]
        securityContext:
          privileged: true
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
      - name: increase-vm-max-map
        image: busybox
        command: ["sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true
      - name: increase-fd-ulimit
        image: busybox
        command: ["sh", "-c", "ulimit -n 65536"]
        securityContext:
          privileged: true
  volumeClaimTemplates:
  - metadata:
      name: data
      labels:
        app: elasticsearch
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: "local-class"
      resources:
        requests:
          storage: 5Gi
---
kind: Service
apiVersion: v1
metadata:
  name: elasticsearch
  namespace: loging
  labels:
    app: elasticsearch
spec:
  selector:
    app: elasticsearch
  clusterIP: None
  ports:
    - port: 9200
      name: rest
    - port: 9300
      name: inter-node

部署ki。因為根據數據採集流程圖,ki是和es結合的,配置相對簡單。

# cat kibana.yaml 
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: kibana
  namespace: loging
  labels:
    k8s-app: kibana
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: kibana
  template:
    metadata:
      labels:
        k8s-app: kibana
    spec:
      containers:
      - name: kibana
        image: kibana:7.3.1
        resources:
          limits:
            cpu: 1
            memory: 500Mi
          requests:
            cpu: 0.5
            memory: 200Mi
        env:
          - name: ELASTICSEARCH_HOSTS
#注意value是es的services,因為es是有狀態,用的無頭服務,所以連接的就不僅僅是pod的名字了
            value: http://elasticsearch:9200   
        ports:
        - containerPort: 5601
          name: ui
          protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: loging
spec:
  ports:
  - port: 5601
    protocol: TCP
    targetPort: ui
  selector:
    k8s-app: kibana

配置ingress-controller。因為我司用的是阿里雲託管的k8s自帶的nginx-ingress,並且配置了強制轉換https。所以kibana-ingress也要配成https。

# openssl genrsa -out tls.key 2048
# openssl req -new -x509 -key tls.key -out tls.crt -subj /C=CN/ST=Beijing/L=Beijing/O=DevOps/CN=kibana.test.realibox.com
# kubectl create secret tls kibana-ingress-secret --cert=tls.crt --key=tls.key

kibana-ingress配置如下。提供兩種,一種是https,一種是http。

https:
# cat kibana-ingress.yaml 
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: kibana
  namespace: loging
spec:
  tls:
  - hosts:
    - kibana.test.realibox.com
    secretName: kibana-ingress-secret
  rules:
  - host: kibana.test.realibox.com
    http:
      paths:
      - path: /
        backend:
          serviceName: kibana
          servicePort: 5601

http:
# cat kibana-ingress.yaml 
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: kibana
  namespace: loging
spec:
  rules:
  - host: kibana.test.realibox.com
    http:
      paths:
      - path: /
        backend:
          serviceName: kibana
          servicePort: 5601

部署lo。因為lo的作用是對fi收集到的日誌進行過濾,需要根據不同的日誌做不同的處理,所以可能要經常性的進行改動,要進行解耦。所以選擇以configmap的形式進行掛載。

# cat logstash.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: logstash
  namespace: loging
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
      - name: logstash
        image: elastic/logstash:7.3.1
        volumeMounts:
        - name: config
          mountPath: /opt/logstash/config/containers.conf
          subPath: containers.conf
        command:
        - "/bin/sh"
        - "-c"
        - "/opt/logstash/bin/logstash -f /opt/logstash/config/containers.conf"
      volumes:
      - name: config
        configMap:
          name: logstash-k8s-config
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: logstash
  name: logstash
  namespace: loging
spec:
  ports:
    - port: 8080       
      targetPort: 8080
  selector:
    app: logstash
  type: ClusterIP

# cat logstash-config.yaml 
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: logstash
  name: logstash
  namespace: loging
spec:
  ports:
    - port: 8080       
      targetPort: 8080
  selector:
    app: logstash
  type: ClusterIP
---

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-k8s-config
  namespace: loging
data:
  containers.conf: |
    input {
      beats {
        port => 8080  #filebeat連接端口
      }
    }
    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]  #es的service
        index => "logstash-%{+YYYY.MM.dd}"
      }
    }
注意:修改configmap 相當於修改鏡像。必須重新apply 應用資源清單才能生效。根據數據採集流程圖,lo的數據由fi流入,流向es。

部署fi。fi的主要作用是進行日誌的採集,然後將數據交給lo。

# cat filebeat.yaml 
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: loging
  labels:
    app: filebeat
data:
  filebeat.yml: |-
    filebeat.config:
      inputs:
        # Mounted `filebeat-inputs` configmap:
        path: ${path.config}/inputs.d/*.yml
        # Reload inputs configs as they change:
        reload.enabled: false
      modules:
        path: ${path.config}/modules.d/*.yml
        # Reload module configs as they change:
        reload.enabled: false
    # To enable hints based autodiscover, remove `filebeat.config.inputs` configuration and uncomment this:
    #filebeat.autodiscover:
    #  providers:
    #    - type: kubernetes
    #      hints.enabled: true
    output.logstash:
      hosts: ['${LOGSTASH_HOST:logstash}:${LOGSTASH_PORT:8080}']   #流向lo
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-inputs
  namespace: loging
  labels:
    app: filebeat
data:
  kubernetes.yml: |-
    - type: docker
      containers.ids:
      - "*"
      processors:
        - add_kubernetes_metadata:
            in_cluster: true
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: loging
  labels:
    app: filebeat
spec:
  selector:
    matchLabels:
      app: filebeat
  template:
    metadata:
      labels:
        app: filebeat
    spec:
      serviceAccountName: filebeat
      terminationGracePeriodSeconds: 30
      containers:
      - name: filebeat
        image: elastic/filebeat:7.3.1
        args: [
          "-c", "/etc/filebeat.yml",
          "-e",
        ]
        env:   #注入變量
        - name: LOGSTASH_HOST
          value: logstash
        - name: LOGSTASH_PORT
          value: "8080"
        securityContext:
          runAsUser: 0
          # If using Red Hat OpenShift uncomment this:
          #privileged: true
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 100Mi
        volumeMounts:
        - name: config
          mountPath: /etc/filebeat.yml
          readOnly: true
          subPath: filebeat.yml
        - name: inputs
          mountPath: /usr/share/filebeat/inputs.d
          readOnly: true
        - name: data
          mountPath: /usr/share/filebeat/data
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
      volumes:
      - name: config
        configMap:
          defaultMode: 0600
          name: filebeat-config
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: inputs
        configMap:
          defaultMode: 0600
          name: filebeat-inputs
      # data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
      - name: data
        hostPath:
          path: /var/lib/filebeat-data
          type: DirectoryOrCreate
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: loging
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: filebeat
  labels:
    app: filebeat
rules:
- apiGroups: [""] # "" indicates the core API group
  resources:
  - namespaces
  - pods
  verbs:
  - get
  - watch
  - list
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: loging
  labels:
    app: filebeat
---

至此完成在k8s上部署es+lo+fi+ki ,進行簡單驗證。

驗證

查看svc、pod、ingress信息

# kubectl get svc,pods,ingress -n loging
NAME                    TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)             AGE
service/elasticsearch   ClusterIP   None              <none>        9200/TCP,9300/TCP   151m
service/kibana          ClusterIP   xxx.168.239.2xx   <none>        5601/TCP            20h
service/logstash        ClusterIP   xxx.168.38.1xx   <none>        8080/TCP            122m

NAME                            READY   STATUS    RESTARTS   AGE
pod/elasticsearch-0             1/1     Running   0          151m
pod/filebeat-24zl7              1/1     Running   0          118m
pod/filebeat-4w7b6              1/1     Running   0          118m
pod/filebeat-m5kv4              1/1     Running   0          118m
pod/filebeat-t6x4t              1/1     Running   0          118m
pod/kibana-689f4bd647-7jrqd     1/1     Running   0          20h
pod/logstash-76bc9b5f95-qtngp   1/1     Running   0          122m

NAME                        HOSTS                       ADDRESS        PORTS     AGE
ingress.extensions/kibana   kibana.test.realibox.com   xxx.xx.xx.xxx   80, 443   19h
web配置

配置索引

發現

至此算是簡單完成。後續需要不斷優化,不過那是後事了。

問題總結

這應該算是第一次親自在測試&生產環境部署應用了,而且是自己很不熟悉的日子系統,遇到了很多問題,需要總結。

  1. 如何調研一項技術棧;
  2. 如何選定方案;
  3. 因為網上幾乎沒有找到類似的方案(也不曉得別的公司是怎麼搞的,反正網上找不到有效的可能借鑒的)。需要自己根據不同的文檔總結嘗試;
  4. 一個組件的標籤盡可能一致;
  5. 如何查看公司是否做了端口限制和https強制轉換;
  6. 遇到IT的事一定要看日誌,這點很重要,日誌可以解決絕大多數問題;
  7. 一個人再怎麼整也會忽略一些點,自己先嘗試然後請教朋友,共同進步。
  8. 項目先上線再說別的,目前是這樣,一件事又百分之20的把握就可以去做了。百分之80再去做就沒啥意思了。
  9. 自學重點學的是理論,公司才能學到操作。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

分類
發燒車訊

RocketMQ系列(四)順序消費

折騰了好長時間才寫這篇文章,順序消費,看上去挺好理解的,就是消費的時候按照隊列中的順序一個一個消費;而併發消費,則是消費者同時從隊列中取消息,同時消費,沒有先後順序。RocketMQ也有這兩種方式的實現,但是在實踐的過程中,就是不能順序消費,好不容易能夠實現順序消費了,發現採用併發消費的方式,消費的結果也是順序的,頓時就蒙圈了,到底怎麼回事?哪裡出了問題?百思不得其解。

經過多次調試,查看資料,debug跟蹤程序,最後終於搞清楚了,但是又不知道怎麼去寫這篇文章,是按部就班的講原理,講如何配置到最後實現,還是按照我的調試過程去寫呢?我覺得還是按照我的調試過程去寫這篇文章吧,因為我的調成過程應該和大多數人的理解思路是一致的,大家也更容易重視。

環境回顧

我們先來回顧一下前面搭建的RocketMQ的環境,這對於我們理解RocketMQ的順序消費是至關重要的。我們的RocketMQ環境是一個兩主兩從的異步集群,其中有兩個broker,broker-a和broker-b,另外,我們創建了兩個Topic,“cluster-topic”,這個Topic我們在創建的時候指定的是集群,也就是說我們發送消息的時候,如果Topic指定為“cluster-topic”,那麼這個消息應該在broker-a和broker-b之間負載;另外創建的一個Topic是“broker-a-topic”,這個Topic我們在創建的時候指定的是broker-a,當我們發送這個Topic的消息時,這個消息只會在broker-a當中,不會出現在broker-b中。

和大家羅嗦了這麼多,大家只要記住,我們的環境中有兩個broker,“broker-a”和“broker-b”,有兩個Topic,“cluster-topic”和“broker-a-topic”就可以了。

cluster-topic可以順序消費嗎

我們發送的消息,如果指定Topic為“cluster-topic”,那麼這種消息將在broker-a和broker-b直接負載,這種情況能夠做到順序消費嗎?我們試驗一下,

消費端的代碼如下:

@Bean(name = "pushConsumerOrderly", initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumerOrderly() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pushConsumerOrderly");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
        Random random = new Random();
        try {
            Thread.sleep(random.nextInt(5) * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    });
    return consumer;
}
  • 消費者組的名稱,連接的NameServer,訂閱的Topic,這裏就不多說了;
  • 再來看一下註冊的消息監聽器,它是MessageListenerOrderly,順序消費,具體實現里我們打印出了消息體的內容,最後返回消費成功ConsumeOrderlyStatus.SUCCESS。
  • 重點看一下打印語句之前的隨機休眠,這是非常重要的一步,它可以驗證消息是否是順序消費的,如果消費者是消費完一個消息以後,再去取下一個消息,那麼順序是沒有問題,但是如果消費者是併發地取消息,但是每個消費者的休眠時間又不一樣,那麼打印出來的就是亂序

生產端我們採用同步發送的方式,代碼如下:

@Test
public void producerTest() throws Exception {

    for (int i = 0;i<5;i++) {
        Message message = new Message();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());

        SendResult sendResult = defaultMQProducer.send(message);
        System.out.println("i=" + i);
        System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
    }
}

和前面一樣,我們發送5個消息,並且打印出i的值和broker的名稱,發送消息的順序是0,1,2,3,4,發送完成后,我們觀察一下消費端的日誌,如果順序也是0,1,2,3,4,那麼就是順序消費。我們運行一下,看看結果吧。

生產者的發送日誌如下:

i=0
BrokerName:broker-a
i=1
BrokerName:broker-a
i=2
BrokerName:broker-a
i=3
BrokerName:broker-a
i=4
BrokerName:broker-b

發送5個消息,其中4個在broker-a,1個在broker-b。再來看看消費端的日誌:

this is simpleMQ,my NO is 3---Wed Jun 10 13:48:57 CST 2020
this is simpleMQ,my NO is 2---Wed Jun 10 13:48:57 CST 2020
this is simpleMQ,my NO is 4---Wed Jun 10 13:48:57 CST 2020
this is simpleMQ,my NO is 1---Wed Jun 10 13:48:57 CST 2020
this is simpleMQ,my NO is 0---Wed Jun 10 13:48:56 CST 2020

順序是亂的?怎麼回事?說明消費者在並不是一個消費完再去消費另一個,而是拉取了一個消息以後,並沒有消費完就去拉取下一個消息了,那這不是併發消費嗎?可是我們程序中設置的是順序消費啊。這裏我們就開始懷疑是broker的問題,難道是因為兩個broker引起的?順序消費只能在一個broker里才能實現嗎?那我們使用broker-a-topic這個試一下吧。

broker-a-topic可以順序消費嗎?

我們把上面的程序稍作修改,只把訂閱的Topic和發送消息時消息的Topic改為broker-a-topic即可。代碼在這裏就不給大家重複寫了,重啟一下程序,發送消息看看日誌吧。

生產者端的日誌如下:

i=0
BrokerName:broker-a
i=1
BrokerName:broker-a
i=2
BrokerName:broker-a
i=3
BrokerName:broker-a
i=4
BrokerName:broker-a

我們看到5個消息都發送到了broker-a中,再來看看消費端的日誌,

this is simpleMQ,my NO is 0---Wed Jun 10 14:00:28 CST 2020
this is simpleMQ,my NO is 2---Wed Jun 10 14:00:29 CST 2020
this is simpleMQ,my NO is 3---Wed Jun 10 14:00:29 CST 2020
this is simpleMQ,my NO is 4---Wed Jun 10 14:00:29 CST 2020
this is simpleMQ,my NO is 1---Wed Jun 10 14:00:29 CST 2020

消費的順序還是亂的,這是怎麼回事?消息都在broker-a中了,為什麼消費時順序還是亂的?程序有問題嗎?review了好幾遍沒有發現問題。

問題排查

問題卡在這個地方,卡了好長時間,最後在官網的示例中發現,它在發送消息時,使用了一個MessageQueueSelector,我們也實現一下試試吧,改造一下發送端的程序,如下:

SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(0);
    }
},i);

在發送的方法中,我們實現了MessageQueueSelector接口中的select方法,這個方法有3個參數,mq的集合,發送的消息msg,和我們傳入的參數,這個參數就是最後的那個變量i,大家不要漏了。這個select方法需要返回的是MessageQueue,也就是mqs變量中的一個,那麼mqs中有多少個MessageQueue呢?我們猜測是2個,因為我們只有broker-a和broker-b,到底是不是呢?我們打斷點看一下,

MessageQueue有8個,並且brokerName都是broker-a,原來Broker和MessageQueue不是相同的概念,之前我們都理解錯了。我們可以用下面的方式理解,

集群 ——–》 Broker ————》 MessageQueue

一個RocketMQ集群里可以有多個Broker,一個Broker里可以有多個MessageQueue,默認是8個。

那現在對於順序消費,就有了正確的理解了,順序消費是只在一個MessageQueue內,順序消費,我們驗證一下吧,先看看發送端的日誌,

i=0
BrokerName:broker-a
i=1
BrokerName:broker-a
i=2
BrokerName:broker-a
i=3
BrokerName:broker-a
i=4
BrokerName:broker-a

5個消息都發送到了broker-a中,通過前面的改造程序,這5個消息應該都是在MessageQueue-0當中,再來看看消費端的日誌,

this is simpleMQ,my NO is 0---Wed Jun 10 14:21:40 CST 2020
this is simpleMQ,my NO is 1---Wed Jun 10 14:21:41 CST 2020
this is simpleMQ,my NO is 2---Wed Jun 10 14:21:41 CST 2020
this is simpleMQ,my NO is 3---Wed Jun 10 14:21:41 CST 2020
this is simpleMQ,my NO is 4---Wed Jun 10 14:21:41 CST 2020

這回是順序消費了,每一個消費者都是等前面的消息消費完以後,才去消費下一個消息,這就完全解釋的通了,我們再把消費端改成併發消費看看,如下:

@Bean(name = "pushConsumerOrderly", initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumerOrderly() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pushConsumerOrderly");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("broker-a-topic","*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        Random random = new Random();
        try {
            Thread.sleep(random.nextInt(5) * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    return consumer;
}

這回使用的是併發消費,我們再看看結果,

i=0
BrokerName:broker-a
i=1
BrokerName:broker-a
i=2
BrokerName:broker-a
i=3
BrokerName:broker-a
i=4
BrokerName:broker-a

5個消息都在broker-a中,並且知道它們都在同一個MessageQueue中,再看看消費端,

this is simpleMQ,my NO is 1---Wed Jun 10 14:28:00 CST 2020
this is simpleMQ,my NO is 0---Wed Jun 10 14:28:00 CST 2020
this is simpleMQ,my NO is 3---Wed Jun 10 14:28:00 CST 2020
this is simpleMQ,my NO is 2---Wed Jun 10 14:28:00 CST 2020
this is simpleMQ,my NO is 4---Wed Jun 10 14:28:00 CST 2020

是亂序的,說明消費者是併發的消費這些消息的,即使它們在同一個MessageQueue中。

總結

好了,到這裏終於把順序消費搞明白了,其中的關鍵就是Broker中還有多個MessageQueue,同一個MessageQueue中的消息才能順序消費。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

分類
發燒車訊

.Net Core實戰之基於角色的訪問控制的設計,.Net微服務實戰之技術架構分層篇,.Net微服務實戰之技術選型篇

前言

  上個月,我寫了兩篇微服務的文章:《.Net微服務實戰之技術架構分層篇》與《.Net微服務實戰之技術選型篇》,微服務系列原有三篇,當我憋第三篇的內容時候一直沒有靈感,因此先打算放一放。

  本篇文章與源碼原本打算實在去年的時候完成併發布的,然而我一直忙於公司項目的微服務的實施,所以該篇文章一拖再拖。如今我花了點時間整理了下代碼,並以此篇文章描述整個實現思路,並開放了源碼給予需要的人一些參考。

  源碼:https://github.com/SkyChenSky/Sikiro.RBAC

RBAC

  Role-Based Access Contro翻譯成中文就是基於角色的訪問控制,文章以下我都用他的簡稱RBAC來描述。

  現信息系統的權限控制大多數採取RBAC的思想進行實現,其本質思想是對系統各種的操作權限不是直接授予具體的某個用戶,而是在用戶集合與權限集合之間建立一個角色,作為間接關聯。每一種角色對應一組相應的權限。一旦用戶被分配了適當的角色后,該用戶就擁有此角色的所有操作權限。

  通過以上的描述,我們可以分析出以下信息:

  •   用戶與權限是通過角色間接關聯的
  •   角色的本質就是權限組(權限集合)

  這樣做的好處在於,不必在每次創建用戶時都進行分配權限的操作,只要分配用戶相應的角色即可,而且角色的權限變更比用戶的權限變更要少得多,這樣將簡化用戶的權限管理,減少系統的開銷。

  

功能分析

權限分類

從權限的作用可以分為三種,功能權限、訪問權限、數據權限

  • 功能權限
    • 功能權限指系統用戶允許在頁面進行按鈕操作的權限。如果有權限則功能按鈕展示,否則隱藏。
  • 訪問權限
    • 訪問權限指系統用戶通過點擊按鈕後進行地址的請求訪問的權限(地址跳轉與接口請求),如果無權限訪問,則由頁面提示無權限訪問。
  • 數據權限
    • 數據權限指用戶可訪問系統的數據權限,不同的用戶可以訪問不同的數據粒度。

數據權限的實現可大可小,大可大到對條件進行動態配置,小可小到只針對某個維度進行硬編碼。不納入這次的討論範圍。

用例圖

非功能性需求

  時效性,直接影響到安全性,既然是權限控制,那麼理應一修改權限后就立刻生效。曾經有同行問過我,是不是每一個請求都得去查一次數據庫是否滿足權限,如果是,數據庫壓力豈不是很大?

  安全性,每一個頁面跳轉,每一個讀寫請求都的進行一次權限驗證,不滿足的權限的功能按鈕就不需要渲染,避免樣式display:none的情況。

  開發效率,權限控制理應是框架層面的,因此盡可能作為非業務的侵入性,讓開發人員保持原有的數據善增改查與頁面渲染。

技術選型

LayUI

  學習門檻極低,開箱即用。其外在極簡,卻又不失飽滿的內在,體積輕盈,組件豐盈,從核心代碼到 API 的每一處細節都經過精心雕琢,非常適合界面的快速開發,它更多是為服務端程序員量身定做,無需涉足各種前端工具的複雜配置,只需面對瀏覽器本身,讓一切你所需要的元素與交互,從這裏信手拈來。作為國人的開源項目,完整的接口文檔與Demo示例讓入門者非常友好的上手,開箱即用的Api讓學習成本盡可能的低,其易用性成為快速開發框架的基礎。

MongoDB

  主要兩大優勢,無模式與橫向擴展。對於權限模塊來說,無需SQL來寫複雜查詢和報表,也不需要使用到多表的強事務,上面提到的時效性的數據庫壓力問題也可以通過分片解決。無模式使得開發人員無需預定義存儲結構,結合MongoDB官方提供的驅動可以做到快速的開發。

數據庫設計

 E-R圖

 

  一個管理員可以擁有多個角色,因此管理員與角色是一對多的關聯;角色作為權限組的存在,又可以選擇多個功能權限值與菜單,所以角色與菜單、功能權限值也是一對多的關係。

類圖

Deparment與Position屬於非核心,可以按照自己的實際業務進行擴展。

功能權限值初始化

  隨着業務發展,需求功能是千奇百怪的,根本無法抽象出來,那麼功能按鈕就要隨着業務進行定義。在我的項目里使用了枚舉值進行定義每個功能權限,通過自定義的PermissionAttribute與響應的action進行綁定,在系統啟動時,通過反射把功能權限的枚舉值與相應的controller、action映射到MenuAction表,枚舉值對應code字段,controller與action拼接后對應url字段。

  已初始化到數據庫的權限值可以到菜單頁把相對應的菜單與權限通過用戶界面關聯起來。

權限值綁定action

1         [HttpPost]
2         [Permission(PermCode.Administrator_Edit)]
3         public IActionResult Edit(EditModel edit)
4         {
5             //do something
6 
7             return Json(result);
8         }

初始化權限值

 1     /// <summary>
 2     /// 功能權限
 3     /// </summary>
 4     public static class PermissionUtil
 5     {
 6         public static readonly Dictionary<string, IEnumerable<int>> PermissionUrls = new Dictionary<string, IEnumerable<int>>();
 7         private static MongoRepository _mongoRepository;
 8 
 9         /// <summary>
10         /// 判斷權限值是否被重複使用
11         /// </summary>
12         public static void ValidPermissions()
13         {
14             var codes = Enum.GetValues(typeof(PermCode)).Cast<int>();
15             var dic = new Dictionary<int, int>();
16             foreach (var code in codes)
17             {
18                 if (!dic.ContainsKey(code))
19                     dic.Add(code, 1);
20                 else
21                     throw new Exception($"權限值 {code} 被重複使用,請檢查 PermCode 的定義");
22             }
23         }
24 
25         /// <summary>
26         /// 初始化添加預定義權限值
27         /// </summary>
28         /// <param name="app"></param>
29         public static void InitPermission(IApplicationBuilder app)
30         {
31             //驗證權限值是否重複
32             ValidPermissions();
33 
34             //反射被標記的Controller和Action
35             _mongoRepository = (MongoRepository)app.ApplicationServices.GetService(typeof(MongoRepository));
36 
37             var permList = new List<MenuAction>();
38             var actions = typeof(PermissionUtil).Assembly.GetTypes()
39                 .Where(t => typeof(Controller).IsAssignableFrom(t) && !t.IsAbstract)
40                 .SelectMany(t => t.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.DeclaredOnly));
41 
42             //遍歷集合整理信息
43             foreach (var action in actions)
44             {
45                 var permissionAttribute =
46                     action.GetCustomAttributes(typeof(PermissionAttribute), false).ToList();
47                 if (!permissionAttribute.Any())
48                     continue;
49 
50                 var codes = permissionAttribute.Select(a => ((PermissionAttribute)a).Code).ToArray();
51                 var controllerName = action?.ReflectedType?.Name.Replace("Controller", "").ToLower();
52                 var actionName = action.Name.ToLower();
53 
54                 foreach (var item in codes)
55                 {
56                     if (permList.Exists(c => c.Code == item))
57                     {
58                         var menuAction = permList.FirstOrDefault(a => a.Code == item);
59                         menuAction?.Url.Add($"{controllerName}/{actionName}".ToLower());
60                     }
61                     else
62                     {
63                         var perm = new MenuAction
64                         {
65                             Id = item.ToString().EncodeMd5String().ToObjectId(),
66                             CreateDateTime = DateTime.Now,
67                             Url = new List<string> { $"{controllerName}/{actionName}".ToLower() },
68                             Code = item,
69                             Name = ((PermCode)item).GetDisplayName() ?? ((PermCode)item).ToString()
70                         };
71                         permList.Add(perm);
72                     }
73                 }
74                 PermissionUrls.TryAdd($"{controllerName}/{actionName}".ToLower(), codes);
75             }
76 
77             //業務功能持久化
78             _mongoRepository.Delete<MenuAction>(a => true);
79             _mongoRepository.BatchAdd(permList);
80         }
81 
82         /// <summary>
83         /// 獲取當前路徑
84         /// </summary>
85         /// <param name="filterContext"></param>
86         /// <returns></returns>
87         public static string CurrentUrl(HttpContext filterContext)
88         {
89             var url = filterContext.Request.Path.ToString().ToLower().Trim('/');
90             return url;
91         }
92     }

關聯菜單與功能權限

訪問權限

  當所有權限關係關聯上后,用戶訪問系統時,需要對其所有操作進行攔截與實時的權限判斷,我們註冊一個全局的GlobalAuthorizeAttribute,其主要攔截所有已經標識PermissionAttribute的action,查詢該用戶所關聯所有角色的權限是否滿足允許通過。

  我的實現有個細節,給判斷用戶IsSuper==true,也就是超級管理員,如果是超級管理員則繞過所有判斷,可能有人會問為什麼不在角色添加一個名叫超級管理員進行判斷,因為名稱是不可控的,在代碼邏輯里並不知道用戶起的所謂的超級管理員,就是我們需要繞過驗證的超級管理員,假如他叫無敵管理員呢?

 1  /// <summary>
 2     /// 全局的訪問權限控制
 3     /// </summary>
 4     public class GlobalAuthorizeAttribute : System.Attribute, IAuthorizationFilter
 5     {
 6         #region 初始化
 7         private string _currentUrl;
 8         private string _unauthorizedMessage;
 9         private readonly List<string> _noCheckPage = new List<string> { "home/index", "home/indexpage", "/" };
10 
11         private readonly AdministratorService _administratorService;
12         private readonly MenuService _menuService;
13 
14         public GlobalAuthorizeAttribute(AdministratorService administratorService, MenuService menuService)
15         {
16             _administratorService = administratorService;
17             _menuService = menuService;
18         } 
19         #endregion
20 
21         public void OnAuthorization(AuthorizationFilterContext context)
22         {
23             context.ThrowIfNull();
24 
25             _currentUrl = PermissionUtil.CurrentUrl(context.HttpContext);
26 
27             //不需要驗證登錄的直接跳過
28             if (context.Filters.Count(a => a is AllowAnonymousFilter) > 0)
29                 return;
30 
31             var user = GetCurrentUser(context);
32             if (user == null)
33             {
34                 if (_noCheckPage.Contains(_currentUrl))
35                     return;
36 
37                 _unauthorizedMessage = "登錄失效";
38 
39                 if (context.HttpContext.Request.IsAjax())
40                     NoUserResult(context);
41                 else
42                     LogoutResult(context);
43                 return;
44             }
45 
46             //超級管理員跳過
47             if (user.IsSuper)
48                 return;
49 
50             //賬號狀態判斷
51             var administrator = _administratorService.GetById(user.UserId);
52             if (administrator != null && administrator.Status != EAdministratorStatus.Normal)
53             {
54                 if (_noCheckPage.Contains(_currentUrl))
55                     return;
56 
57                 _unauthorizedMessage = "親~您的賬號已被停用,如有需要請您聯繫系統管理員";
58 
59                 if (context.HttpContext.Request.IsAjax())
60                     AjaxResult(context);
61                 else
62                     AuthResult(context, 403, GoErrorPage(true));
63 
64                 return;
65             }
66 
67             if (_noCheckPage.Contains(_currentUrl))
68                 return;
69 
70             var userUrl = _administratorService.GetUserCanPassUrl(user.UserId);
71 
72             // 判斷菜單訪問權限與菜單訪問權限
73             if (IsMenuPass(userUrl) && IsActionPass(userUrl))
74                 return;
75 
76             if (context.HttpContext.Request.IsAjax())
77                 AuthResult(context, 200, GetJsonResult());
78             else
79                 AuthResult(context, 403, GoErrorPage());
80         }
81     }

功能權限

  在權限驗證通過後,返回view之前,還是利用了Filter進行一個實時的權限查詢,主要把該用戶所擁有功能權限值查詢出來通過ViewData[“PermCodes”]傳到頁面,然後通過razor進行按鈕的渲染判斷。

  然而我在項目中封裝了大部分常用的LayUI控件,主要利用.Net Core的TagHelper進行了封裝,TagHelper內部與ViewData[“PermCodes”]進行判斷是否輸出HTML。

全局功能權限值查詢

 1 /// <summary>
 2     /// 全局用戶權限值查詢
 3     /// </summary>
 4     public class GobalPermCodeAttribute : IActionFilter
 5     {
 6         private readonly AdministratorService _administratorService;
 7 
 8         public GobalPermCodeAttribute(AdministratorService administratorService)
 9         {
10             _administratorService = administratorService;
11         }
12 
13         private static AdministratorData GetCurrentUser(HttpContext context)
14         {
15             return context.User.Claims.FirstOrDefault(c => c.Type == ClaimTypes.UserData)?.Value.FromJson<AdministratorData>();
16         }
17 
18 
19         public void OnActionExecuting(ActionExecutingContext context)
20         {
21             ((Controller)context.Controller).ViewData["PermCodes"] = new List<int>();
22 
23             if (context.HttpContext.Request.IsAjax())
24                 return;
25 
26             var user = GetCurrentUser(context.HttpContext);
27             if (user == null)
28                 return;
29 
30             if (user.IsSuper)
31                 return;
32 
33             ((Controller)context.Controller).ViewData["PermCodes"] = _administratorService.GetActionCode(user.UserId).ToList();
34         }
35 
36         public void OnActionExecuted(ActionExecutedContext context)
37         {
38         }
39     }

LayUI Buttom的TagHelper封裝

 1   [HtmlTargetElement("LayuiButton")]
 2     public class LayuiButtonTag : TagHelper
 3     {
 4         #region 初始化
 5         private const string PermCodeAttributeName = "PermCode";
 6         private const string ClasstAttributeName = "class";
 7         private const string LayEventAttributeName = "lay-event";
 8         private const string LaySubmitAttributeName = "LaySubmit";
 9         private const string LayIdAttributeName = "id";
10         private const string StyleAttributeName = "style";
11 
12         [HtmlAttributeName(StyleAttributeName)]
13         public string Style { get; set; }
14 
15         [HtmlAttributeName(LayIdAttributeName)]
16         public string Id { get; set; }
17 
18         [HtmlAttributeName(LaySubmitAttributeName)]
19         public string LaySubmit { get; set; }
20 
21         [HtmlAttributeName(LayEventAttributeName)]
22         public string LayEvent { get; set; }
23 
24         [HtmlAttributeName(ClasstAttributeName)]
25         public string Class { get; set; }
26 
27         [HtmlAttributeName(PermCodeAttributeName)]
28         public int PermCode { get; set; }
29 
30         [HtmlAttributeNotBound]
31         [ViewContext]
32         public ViewContext ViewContext { get; set; }
33 
34         #endregion
35         public override async void Process(TagHelperContext context, TagHelperOutput output)
36         {
37             context.ThrowIfNull();
38             output.ThrowIfNull();
39 
40             var administrator = ViewContext.HttpContext.GetCurrentUser();
41             if (administrator == null)
42                 return;
43 
44             var childContent = await output.GetChildContentAsync();
45 
46             if (((List<int>)ViewContext.ViewData["PermCodes"]).Contains(PermCode) || administrator.IsSuper)
47             {
48                 foreach (var item in context.AllAttributes)
49                 {
50                     output.Attributes.Add(item.Name, item.Value);
51                 }
52 
53                 output.TagName = "a";
54                 output.TagMode = TagMode.StartTagAndEndTag;
55                 output.Content.SetHtmlContent(childContent.GetContent());
56             }
57             else
58             {
59                 output.TagName = "";
60                 output.TagMode = TagMode.StartTagAndEndTag;
61                 output.Content.SetHtmlContent("");
62             }
63         }
64     }

 

視圖代碼

結尾

  以上就是我本篇分享的內容,項目是以單體應用提供的,方案思路也適用於前後端分離。最後附上幾個系統效果圖

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

被迫重構代碼,這次我幹掉了 if-else

本文收錄在個人博客:www.chengxy-nds.top,技術資源共享,一起進步

最近公司貌似融到資了!開始發了瘋似的找渠道推廣,現在終於明白為啥前一段大肆的招人了,原來是在下一盤大棋,對員工總的來看是個好事,或許是時候該跟boss提一提漲工資的話題了。

不過,漲工資還沒下文,隨之而來的卻是一車一車的需求,每天都有新渠道接入,而且每個渠道都要提供個性化支持,開發量陡增。最近都沒什麼時間更文,準點下班都成奢望了!

由於推廣渠道的激增,而每一個下單來源在下單時都做特殊的邏輯處理,可能每两天就會加一個來源,已經把之前的下單邏輯改的面目全。出於長遠的考慮,我決定對現有的邏輯進行重構,畢竟長痛不如短痛。

傳統的實現方式

我們看下邊的偽代碼,大致就是重構前下單邏輯的代碼,由於來源比較少,簡單的做if-else邏輯判斷足以滿足需求。

現在每種訂單來源的處理邏輯都有幾百行代碼,看着已經比較臃腫,可我愣是遲遲沒動手重構,一方面業務方總像催命鬼一樣的讓你趕工期,想快速實現需求,這樣寫是最快;另一方面是不敢動,面對古董級代碼,還是想求個安穩。

但這次來源一下子增加幾十個,再用這種方式做已經無法維護了,想象一下那種臃腫的if-else代碼,別說開發想想都頭大!

public class OrderServiceImpl implements IOrderService {
    @Override
    public String handle(OrderDTO dto) {
        String type = dto.getType();
        if ("1".equals(type)) {
            return "處理普通訂單";
        } else if ("2".equals(type)) {
            return "處理團購訂單";
        } else if ("3".equals(type)) {
            return "處理促銷訂單";
        }
        return null;
    }
}

策略模式的實現方式

思來想去基於當前業務場景重構,還是用策略模式比較合適,它是oop中比較著名的設計模式之一,對方法行為的抽象。

策略模式定義了一個擁有共同行為的算法族,每個算法都被封裝起來,可以互相替換,獨立於客戶端而變化。

一、策略模式的使用場景:

  • 針對同一問題的多種處理方式,僅僅是具體行為有差別時;
  • 需要安全地封裝多種同一類型的操作時;
  • 同一抽象類有多個子類,而客戶端需要使用if-else 或者 switch-case 來選擇具體子類時。

這個是用策略模式修改後代碼:

@Component
@OrderHandlerType(16)
public class DispatchModeProcessor extends AbstractHandler{

	@Autowired
	private OrderStencilledService orderStencilledService;
	
	@Override
	public void handle(OrderBO orderBO) {
		
		/**
    	 * 訂單完結廣播通知(1 - 支付完成)
    	 */
    	orderStencilledService.dispatchModeFanout(orderBO);
		
    	/**
    	 *  SCMS 出庫單
    	 */
    	orderStencilledService.createScmsDeliveryOrder(orderBO.getPayOrderInfoBO().getLocalOrderNo());
	}
}

每個訂單來源都有自己單獨的邏輯實現類,而每次需要添加訂單來源,直接新建實現類,修改@OrderHandlerType(16)的數值即可,再也不用去翻又臭又長的if-lese

不僅如此在分配任務時,每個人負責開發幾種訂單來源邏輯,都可以做到互不干擾,而且很大程度上減少了合併代碼的衝突。

二、具體的實現過程:

1、定義註解

定義一個標識訂單來源的註解@OrderHandlerType

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface OrderHandlerType {
	int value() default 0;
}

2、抽象業務處理器

向上抽象出來一個具體的業務處理器

public abstract class AbstractHandler {
	abstract public void handle(OrderBO orderBO);
}

3、項目啟動掃描 handler 入口

@Component
@SuppressWarnings({"unused","rawtypes"})
public class HandlerProcessor implements BeanFactoryPostProcessor {
	
	private String basePackage = "com.ecej.order.pipeline.processor";
	
    public static final Logger log = LoggerFactory.getLogger(HandlerProcessor.class);
	
	@Override
	public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
		
		Map<Integer,Class> map = new HashMap<Integer,Class>();
		
		ClassScaner.scan(basePackage, OrderHandlerType.class).forEach(x ->{
			int type = x.getAnnotation(OrderHandlerType.class).value();
			map.put(type,x);
		});
		
		beanFactory.registerSingleton(OrderHandlerType.class.getName(), map);
		
		log.info("處理器初始化{}", JSONObject.toJSONString(beanFactory.getBean(OrderHandlerType.class.getName())));
	}
}

4、掃描需要用到的工具類

public class ClassScaner {
	private ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();

	private final List<TypeFilter> includeFilters = new ArrayList<TypeFilter>();

	private final List<TypeFilter> excludeFilters = new ArrayList<TypeFilter>();

	private MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
	
	/**
	 * 添加包含的Fiter
	 * @param includeFilter
	 */
	public void addIncludeFilter(TypeFilter includeFilter) {
		this.includeFilters.add(includeFilter);
	}

	/**
	 * 添加排除的Fiter
	 * @param includeFilter
	 */
	public void addExcludeFilter(TypeFilter excludeFilter) {
		this.excludeFilters.add(excludeFilter);
	}
	
	/**
	 * 掃描指定的包,獲取包下所有的Class
	 * @param basePackage 包名
	 * @param targetTypes 需要指定的目標類型,可以是pojo,可以是註解
	 * @return Set<Class<?>>
	 */
	public static Set<Class<?>> scan(String basePackage,
			Class<?>... targetTypes) {
		ClassScaner cs = new ClassScaner();
		for (Class<?> targetType : targetTypes){
			if(TypeUtils.isAssignable(Annotation.class, targetType)){
				cs.addIncludeFilter(new AnnotationTypeFilter((Class<? extends Annotation>) targetType));
			}else{
				cs.addIncludeFilter(new AssignableTypeFilter(targetType));
			}
		}
		return cs.doScan(basePackage);
	}
	
	/**
	 * 掃描指定的包,獲取包下所有的Class
	 * @param basePackages 包名,多個
	 * @param targetTypes 需要指定的目標類型,可以是pojo,可以是註解
	 * @return Set<Class<?>>
	 */
	public static Set<Class<?>> scan(String[] basePackages,
			Class<?>... targetTypes) {
		ClassScaner cs = new ClassScaner();
		for (Class<?> targetType : targetTypes){
			if(TypeUtils.isAssignable(Annotation.class, targetType)){
				cs.addIncludeFilter(new AnnotationTypeFilter((Class<? extends Annotation>) targetType));
			}else{
				cs.addIncludeFilter(new AssignableTypeFilter(targetType));
			}
		}
		Set<Class<?>> classes = new HashSet<Class<?>>();
		for (String s : basePackages){
			classes.addAll(cs.doScan(s));
		}
		return classes;
	}
	
	/**
	 * 掃描指定的包,獲取包下所有的Class
	 * @param basePackages 包名
	 * @return Set<Class<?>>
	 */
	public Set<Class<?>> doScan(String [] basePackages) {
		Set<Class<?>> classes = new HashSet<Class<?>>();
		for (String basePackage :basePackages) {
			classes.addAll(doScan(basePackage));
		}
		return classes;
	}
	
	/**
	 * 掃描指定的包,獲取包下所有的Class
	 * @param basePackages 包名
	 * @return Set<Class<?>>
	 */
	public Set<Class<?>> doScan(String basePackage) {
		Set<Class<?>> classes = new HashSet<Class<?>>();
		try {
			String packageSearchPath = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX
					+ ClassUtils.convertClassNameToResourcePath(
							SystemPropertyUtils.resolvePlaceholders(basePackage))+"/**/*.class";
			Resource[] resources = this.resourcePatternResolver.getResources(packageSearchPath);
			for (int i = 0; i < resources.length; i++) {
				Resource resource = resources[i];
				if (resource.isReadable()) {
					MetadataReader metadataReader = this.metadataReaderFactory.getMetadataReader(resource);
					if ((includeFilters.size() == 0 && excludeFilters.size() == 0)|| matches(metadataReader)) {
						try {
							classes.add(Class.forName(metadataReader.getClassMetadata().getClassName()));
						} catch (ClassNotFoundException ignore) {}
					}
				}
			}
		} catch (IOException ex) {
			throw new RuntimeException("I/O failure during classpath scanning", ex);
		}
		return classes;
	}
	
	/**
	 * 處理 excludeFilters和includeFilters
	 * @param metadataReader
	 * @return boolean
	 * @throws IOException
	 */
	private boolean matches(MetadataReader metadataReader) throws IOException {
		for (TypeFilter tf : this.excludeFilters) {
			if (tf.match(metadataReader, this.metadataReaderFactory)) {
				return false;
			}
		}
		for (TypeFilter tf : this.includeFilters) {
			if (tf.match(metadataReader, this.metadataReaderFactory)) {
				return true;
			}
		}
		return false;
	}
}

5、根據類型實例化抽象類


@Component
public class HandlerContext {

	@Autowired
	private ApplicationContext beanFactory;

	public  AbstractHandler getInstance(Integer type){
		
		Map<Integer,Class> map = (Map<Integer, Class>) beanFactory.getBean(OrderHandlerType.class.getName());
		
		return (AbstractHandler)beanFactory.getBean(map.get(type));
	}
	
}

6、調用入口

我這裡是在接受到MQ消息時,處理多個訂單來源業務,不同訂單來源路由到不同的業務處理類中。


@Component
@RabbitListener(queues = "OrderPipelineQueue")
public class PipelineSubscribe{
 
	private final Logger LOGGER = LoggerFactory.getLogger(PipelineSubscribe.class);
	
	@Autowired
	private HandlerContext HandlerContext;
	
	@Autowired
	private OrderValidateService orderValidateService;
	
    @RabbitHandler
    public void subscribeMessage(MessageBean bean){
    	
    	OrderBO orderBO = JSONObject.parseObject(bean.getOrderBO(), OrderBO.class);
    	
    	if(null != orderBO &&CollectionUtils.isNotEmpty(bean.getType()))
    	{
    		for(int value:bean.getType())
    		{
       		 AbstractHandler handler = HandlerContext.getInstance(value);
       		 handler.handle(orderBO);
    		}
		}
	}
}

接收實體 MessageBean 類代碼

public class MessageBean implements Serializable {
    private static final long serialVersionUID = 5454831432308782668L;
    private String cachKey;
    private List<Integer> type;
    private String orderBO;

    public MessageBean(List<Integer> type, String orderBO) {
        this.type = type;
        this.orderBO = orderBO;
    }
}

以上設計模式方式看着略顯複雜,很些小夥伴提出質疑:“你為了個if-else,弄的如此的麻煩,又是自定義註解,又弄這麼多類不麻煩嗎?” 還有一些小夥伴糾結於性能問題,策略模式的性能可能確實不如if-else

但我覺得吧增加一點複雜度、犧牲一丟丟性能,換代碼的整潔和可維護性還是值得的。不過,一個人一個想法,怎麼選還是看具體業務場景吧!

策略模式的優缺點

優點

  • 易於擴展,增加一個新的策略只需要添加一個具體的策略類即可,基本不需要改變原有的代碼,符合開放封閉原則
  • 避免使用多重條件選擇語句,充分體現面向對象設計思想 策略類之間可以自由切換,由於策略類都實現同一個接口,所以使它們之間可以自由切換
  • 每個策略類使用一個策略類,符合單一職責原則 客戶端與策略算法解耦,兩者都依賴於抽象策略接口,符合依賴反轉原則
  • 客戶端不需要知道都有哪些策略類,符合最小知識原則

缺點

  • 策略模式,當策略算法太多時,會造成很多的策略類
  • 客戶端不知道有哪些策略類,不能決定使用哪個策略類,這點可以通過封裝common公共包解決,也可以考慮使IOC容器依賴注入的方式來解決。

以下是訂單來源策略類的一部分,不得不說策略類確實比較多。

總結

凡事都有他的兩面性,if-else多層嵌套和也都有其各自的優缺點:

  • if-else的優點就是簡單,想快速迭代功能,邏輯嵌套少且不會持續增加,if-else更好些,缺點也是顯而易見,代碼臃腫繁瑣不便於維護。

  • 策略模式 將各個場景的邏輯剝離出來維護,同一抽象類有多個子類,需要使用if-else 或者 switch-case 來選擇具體子類時,建議選策略模式,他的缺點就是會產生比較多的策略類文件。

兩種實現方式各有利弊,如何選擇還是要依據具體業務場景,還是那句話設計模式不是為了用而用,一定要用在最合適的位置。

閑聊

平常和粉絲私下聊天,好多人對於學設計模式的感受:設計模式背了一大堆,可平常開發還不是成天寫if-else業務邏輯,根本就用不到。

學設計模式也不是用不到,只是有時候沒有合適它的場景而已,像我們今天說的這種業務場景,用設計模式就可以完美的解決嘛。

學了N多技術可工作用不到是一種很常見的事情,一個穩定的項目使用一種技術會有諸多考量的,新技術會不會提升系統複雜度?它有哪些性能瓶頸?這些都必須考慮到,畢竟項目穩定才是最重要,誰也不敢輕易冒險嘗試。

而我們學習技術可不僅為了眼下項目中是否會用到,是要做一個技術積累,做長遠打算,人往高處走,沒點能力可不行。

原創不易,燃燒秀髮輸出內容,希望你能有一丟丟收穫!

整理了幾百本各類技術电子書,送給小夥伴們。關公眾號回復【666】自行領取。和一些小夥伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就掃碼加入我們吧!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

120行代碼打造.netcore生產力工具-小而美的後台異步組件

相信絕大部分開發者都接觸過用戶註冊的流程,通常情況下大概的流程如下所示:

  1. 接收用戶提交註冊信息
  2. 持久化註冊信息(數據庫+redis)
  3. 發送註冊成功短信(郵件)
  4. 寫操作日誌(可選)

偽代碼如下:

public async Task<IActionResult> Reg([FromBody] User user)
{
    _logger.LogInformation("持久化數據開始");
    await Task.Delay(50);
    _logger.LogInformation("持久化結束");
    _logger.LogInformation("發送短信開始");
    await Task.Delay(100);
    _logger.LogInformation("發送短信結束");
    _logger.LogInformation("操作日誌開始");
    await _logRepository.Insert(new Log { Txt = "註冊日誌" });
    _logger.LogInformation("操作日誌結束");
    return Ok("註冊成功");
}

在以上的代碼中,我使用Task.Delay方法阻塞主線程,用以模擬實際場景中的執行耗時。以上流程應該是包含了絕大部分註冊流程所需要的操作。對於任何開發者來講,以上業務流程沒任何難度,無非是順序的執行各個流程的代碼即可。

稍微有點開發經驗的應該會將以上的流程進行拆分,但有些人可能就要問了,為什麼要拆分呢?拆分之後的代碼應該怎麼寫呢?下面我們就來簡單聊下如此場景的正確打開方式。

首先,註冊成功的依據應該是是否成功的將用戶信息持久化(至於是先持久化到數據庫,異或是先寫到redis不在本篇文章討論的範疇),至於發送註冊短信(郵件)以及寫日誌的操作應該不能成為影響註冊是否成功的因素,而發送短信/郵件等相關操作通常情況下也是比較耗時的,所以在對此接口做性能優化時,可優先考慮將短信/郵件以及寫日誌等相關操作與主流程(持久化數據)拆分,使其不阻塞主流程的執行,從而達到提高響應速度的目的。

知道了為什麼要拆,但具體如何拆分呢?怎樣才能用最少的改動,達到所需的目的呢?

條條大路通羅馬,所以要達成我們的目的也是有很多方案的,具體選擇哪種方案需要根據具體的業務場景,業務體量等多種因素綜合考慮,下面我將一一介紹分析相關方案。

在正式介紹可用方案前,筆者想先介紹一種很多新手容易錯誤使用的一種方案(因為筆者就曾經天真的使用過這種錯誤的方案)。

提到異步,絕大部分.net開發者應該第一想到的就是Task,async,await等,的確,async,await的語法糖簡化了.net開發者異步編程的門檻,減少了很多代碼量。通常一個返回Task類型的方法,在被調用時,會在方法的前面加上await,表示需要等待此方法的執行結果,再繼續執行後面的代碼。但如果不加await時,則不會等待方法的執行結果,進而也不會阻塞主線程。所以,有些人可能就會將發送短信/郵件以及寫日誌的操作如下方式進行改造。

public async Task<IActionResult> Reg1([FromBody] User user)
{
    _logger.LogInformation("持久化數據開始");
    await Task.Delay(50);
    _logger.LogInformation("持久化結束");
    _ = Task.Run(async () =>
     {
         _logger.LogInformation("發送短信開始");
         await Task.Delay(100);
         _logger.LogInformation("發送短信結束");
         _logger.LogInformation("操作日誌開始");
         await _logRepository.Insert(new Log { Txt = "註冊日誌" });
         _logger.LogInformation("操作日誌結束");
     });
    return Ok("註冊成功");
}

然後使用jmeter分別壓測改造前和改造后的接口,結果如下:

有沒有被驚訝到?就這樣一個簡單的改造,吞吐量就提高了三四倍。既然已經提高了三四倍,那為什麼說這是一種錯誤的改造方法嗎?各位看官且往下看。

熟悉.netcore的大佬,應該都知道.netcore的依賴注入的生命周期吧。通常情況下,注入的生命周期包括:Singleton,Scope,Transient。
在以上的流程中,假如寫操作日誌的實例的生命周期是Scope,當在Task中調用Controller獲取到的實例的方法時,因為Task.Run並沒有阻塞主線程,當調用Action return后,當前請求的scope注入的對象會被回收,如果對象被回收之後,Task.Run還未執行完,則會報System.ObjectDisposedException: Cannot access a disposed object. 異常。意思是,不能訪問一個已disposed的對象。正確的做法是使用IServiceScopeFactory創建一個新的作用域,在新的作用域中獲取獲取日誌倉儲服務的實例。這樣就可以避免System.ObjectDisposedException異常了。
改造后的示例代碼如下:

public async Task<IActionResult> Reg1([FromBody] User user)
{
    _logger.LogInformation("持久化數據開始");
    await Task.Delay(50);
    _logger.LogInformation("持久化結束");
    _ = Task.Run(async () =>
    {
        using (var scope = _scopeFactory.CreateScope())
        {
            var sp = scope.ServiceProvider;
            var logRepository = sp.GetService<ILogRepository>();
            _logger.LogInformation("發送短信開始");
            await Task.Delay(100);
            _logger.LogInformation("發送短信結束");

            _logger.LogInformation("操作日誌開始");
            await logRepository.Insert(new Log { Txt = "註冊日誌" });
            _logger.LogInformation("操作日誌結束");
        }
    });
    return Ok("註冊成功");
}

雖然得到了正解,但上述的代碼着實有點多,如果一個項目有多個相似的業務場景,就要考慮對CreateScope相關的操作進行封裝。

下面就來一一介紹下筆者覺得實現此業務場景的幾種方案。
1.消息隊列
2.Quartz任務調度組件
3.Hangfire任務調度組件
4.Weshare.TransferJob(推薦)
首先說下消息隊列的方式。準確的說,消息隊列應該是這種場景的最優解決方案,消息隊列的其中一個比較重要的特性就是解耦,從而提高吞吐量。但並不是所有的應用程序都需要上消息隊列。有些業務場景使用消息隊列時,往往會給人一種”殺雞用牛刀”的感覺。

其次Quartz和Hangfire都是任務調度框架,都提供了可實現以上業務場景的邏輯,但Quartz和Hangfire都需要持久化作業數據。雖然Hangfire提供了內存版本,但經過我的測試,發現Hangfire的內存版本特別消耗內存,所以不太推薦使用任務調度框架來實現類似於這樣的業務邏輯。

最後,也就是本文的重點,筆者結合了消息隊列和任務調度的思想,實現了一個輕量級的轉移作業到後台執行的組件。此組件完美的解決了Scope生命周期實例獲取的問題,一行代碼將不需要等待的操作轉移到後台線程執行。
接入步驟如下:
1.使用nuget安裝Weshare.TransferJob
2.在Stratup中注入服務。

services.AddTransferJob();

3.通過構造函數或其他方法獲取到IBackgroundRunService的實例。
4.調用實例的Transfer方法將作業轉移到後台線程。

_backgroundRunService.Transfer(log=>log.Insert(new Log(){Txt = "註冊日誌"}));

就是這麼簡單的實現了這樣的業務場景,不僅簡化了代碼,而且大大提高了系統的吞吐量。

下面再來一起分析下Weshare.TransferJob的核心代碼(畢竟文章要點題)。各位器宇不凡的看官請繼續往下看。
下面的代碼是AddTransferJob方法的實現:

public static IServiceCollection AddTransferJob(this IServiceCollection services)
{
    services.AddSingleton<IBackgroundRunService, BackgroundRunService>();
    services.AddHostedService<TransferJobHostedService>();
    return services;
}

聰明”絕頂”的各位看官應該已經發現上述代碼的關鍵所在。是的, 你沒有看錯,此組件的就是利用.net core提供的HostedService在後台執行被轉移的作業的。
我們再來一起看看TransferJobHostedService的代碼:

public class TransferJobHostedService:BackgroundService
{
    private IBackgroundRunService _runService;
    public TransferJobHostedService(IBackgroundRunService runService)
    {
        _runService = runService;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await _runService.Execute(stoppingToken);
        }
    }
}

這個類的代碼也很簡單,重寫了BackgroundService類的ExecuteAsync,循環調用IBackgroundRunService實例的Execute方法。所以,最最關鍵的代碼是IBackgroundRunService的實現類中。
詳細代碼如下:

public class BackgroundRunService : IBackgroundRunService
{
    private readonly SemaphoreSlim _slim;
    private readonly ConcurrentQueue<LambdaExpression> queue;
    private ILogger<BackgroundRunService> _logger;
    private readonly IServiceProvider _serviceProvider;
    public BackgroundRunService(ILogger<BackgroundRunService> logger, IServiceProvider serviceProvider)
    {
        _slim = new SemaphoreSlim(1);
        _logger = logger;
        _serviceProvider = serviceProvider;
        queue = new ConcurrentQueue<LambdaExpression>();
    }
    public async Task Execute(CancellationToken cancellationToken)
    {
        try
        {
            await _slim.WaitAsync(cancellationToken);
            if (queue.TryDequeue(out var job))
            {
                using (var scope = _serviceProvider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    var action = job.Compile();
                    var isTask = action.Method.ReturnType == typeof(Task);
                    var parameters = job.Parameters;
                    var pars = new List<object>();
                    if (parameters.Any())
                    {
                        var type = parameters[0].Type;
                        var param = scope.ServiceProvider.GetRequiredService(type);
                        pars.Add(param);
                    }
                    if (isTask)
                    {
                        await (Task)action.DynamicInvoke(pars.ToArray());
                    }
                    else
                    {
                        action.DynamicInvoke(pars.ToArray());
                    }
                }
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e.ToString());
        }
    }
    public void Transfer<T>(Expression<Func<T, Task>> expression)
    {
        queue.Enqueue(expression);
        _slim.Release();
    }
    public void Transfer(Expression<Action> expression)
    {
        queue.Enqueue(expression);
        _slim.Release();
    }
}

納尼?嫌代碼多看不懂?那咱們一起來剖析下吧。
首先,此類有三個較重要的私有變量,對應的類型分別是SemaphoreSlim, ConcurrentQueue ,IServiceProvider。
其中SemaphoreSlim是為了控制後台作業執行的順序的,在構造函數中初始化了此對象的信號量為1,表示在後台服務的ExecuteAsync方法的循環中每次只能有一個作業執行。
ConcurrentQueue 的對象是用來存儲被轉移到後台服務執行的作業的邏輯,所以使用LambdaExpression作為隊列的類型。
IServiceProvider是為了解決依賴注入的生命周期的。

然後在Execute方法中,第一行代碼如下:

await _slim.WaitAsync(cancellationToken);

作用是等待一個信號量,當沒有可用的信號量時,會阻塞線程的執行,這樣在後台服務的ExecuteAsync方法的死循環就不會一直執行下去,只有獲取到信號量才會繼續執行。
當獲取到信號量后,則說明有新的作業等待執行,所以此時則需要從隊列中讀出要執行的LambdaExpression表達式,創建一個新的Scope后,編譯此表達式樹,判斷返回類型,獲取泛型的具體類型,最後獲取到泛型對應的實例,執行對應的方法。

另外,Transfer方法就是暴露給調用者的方法,用於將表達式樹寫到隊列中,同時釋放信號量。

到此為止,Weshare.TransferJob的實現原理已分析完畢,由於此組件的原理只是將任務轉移到後台進行執行,所以並不是適合對事務有要求的場景。正如本文開頭所假設的場景,TransferJob最適合的場景還是那些和主操作關聯性較低的、失敗或成功並不會影響業務的正常運行。
同時,此組件的定位就是小而美,像延遲執行、定時執行的功能在最初的規劃中其實是有的,後來發現這些功能quartz已經有了,所以沒必要重複造這樣的輪子。
後期會根據使用場景,嘗試加入異常重試機制,以及異常通知回調機制。

最後,不知道有沒有較真的看官想計算下代碼量是否超過120行。
為了證明我不是標題黨,現將此組件進行開源,地址是:
https://github.com/fuluteam/WeShare.TransferJob

橋豆麻袋,筆者辛苦敲的代碼,難道各位看官想白嫖嗎? 點個贊再走唄。點完贊還有力氣的話,如果git上能點個star的話,那也是最好不過的。小生這廂先行謝過。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

分類
發燒車訊

分佈式鎖的一些理解

 在多線程併發的情況下,單個節點內的線程安全可以通過synchronized關鍵字和Lock接口來保證。

synchronized和lock的區別

  1. Lock是一個接口,是基於在語言層面實現的鎖,而synchronized是Java中的關鍵字,是基於JVM實現的內置鎖,Java中的每一個對象都可以使用synchronized添加鎖。

  2. synchronized在發生異常時,會自動釋放線程佔有的鎖,因此不會導致死鎖現象發生;而Lock在發生異常時,如果沒有主動通過unLock()去釋放鎖,則很可能造成死鎖現象,因此使用Lock時需要在finally塊中釋放鎖;

  3. Lock可以讓等待鎖的線程響應中斷,而synchronized卻不行,使用synchronized時,等待的線程會一直等待下去,不能夠響應中斷;

  4. Lock可以提高多個線程進行讀操作的效率。(可以通過readwritelock實現讀寫分離,一個用來獲取讀鎖,一個用來獲取寫鎖。)

  當開發的應用程序處於一個分佈式的集群環境中,涉及到多節點,多進程共同完成時,如何保證線程的執行順序是正確的。比如在高併發的情況下,很多企業都會使用Nginx反向代理服務器實現負載均衡的目的,這個時候很多請求會被分配到不同的Server上,一旦這些請求涉及到對統一資源進行修改操作時,就會出現問題,這個時候在分佈式系統中就需要一個全局鎖實現多個線程(不同進程中的線程)之間的同步。

  常見的處理辦法有三種:數據庫、緩存、分佈式協調系統。數據庫和緩存是比較常用的,但是分佈式協調系統是不常用的。

  常用的分佈式鎖的實現包含:

      Redis分佈式鎖Zookeeper分佈式鎖Memcached

基於 Redis 做分佈式鎖

 Redis提供的三種方法:

(1)鎖 SETNX:只在鍵 key 不存在的情況下, 將鍵 key 的值設置為 value 。若鍵 key 已經存在, 則 SETNX 命令不做任何動作。SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。命令在設置成功時返回 1 , 設置失敗時返回 0

redis> SETNX job "programmer"    # job 設置成功
(integer) 1

redis> SETNX job "code-farmer"   # 嘗試覆蓋 job ,失敗

(2)解鎖 DEL:刪除給定的一個或多個 key

(3)鎖超時 EXPIRE: 為給定 key 設置生存時間,當 key 過期時(生存時間為 0 ),它會被自動刪除。

  每次當一個節點想要去操作臨界資源的時候,我們可以通過redis來的鍵值對來標記一把鎖,每一進程首先通過Redis訪問同一個key,對於每一個進程來說,如果該key不存在,則該線程可以獲取鎖,將該鍵值對寫入redis,如果存在,則說明鎖已經被其他進程所佔用。具體邏輯的偽代碼如下:

try{
	if(SETNX(key, 1) == 1){
		//do something ......
	}finally{
	DEL(key);
}

  但是此時,又會出現問題,因為SETNX和DEL操作並不是原子操作,如果程序在執行完SETNX后,而並沒有執行EXPIRE就已經宕機了,這樣一來,原先的問題依然存在,整個系統都將被阻塞。

  幸虧Redis又提供了SET key value timeout NX方法,可以以原子操作的方式完成SETNX和EXPIRE的操作。此時只需如下操作即可。

try{
	if(SET(key, 1, 30, timeout, NX) == 1){
		//do something ......
	}
}finally{
	DEL(key);
}

  解決了原子操作,仍然還有一點需要注意,例如,A節點的進程獲取到鎖的時候,A進程可能執行的很慢,在do something未完成的情況下,30秒的時間片已經使用完,此時會將該key給深處掉,此時B進程發現這個key不存在,則去訪問,並成功的獲取到鎖,開始執行do something,此時A線程恰好執行到DEL(key),會將B的key刪除掉,此時相當於B線程在訪問沒有加鎖的臨界資源,而其餘進程都有機會同時去操作這個臨界資源,會造成一些錯誤的結果。對於該問題的解決辦法是進程在刪除key之前可以做一個判斷,驗證當前的鎖是不是本進程加的鎖。

String threadId = Thread.currentThread().getId()
try{
	if(SET(key, threadId, 30, timeout, NX) == 1){
		//do something ......
	}
}finally{
    if(threadId.equals(redisClient.get(key))){
        DEL(key);
    }
}

   上面的改進雖然解決鎖被不同的進程釋放的危險,但並沒有解決獲取到鎖的進程在指定的時間內未完成do something操作(上面的代碼還有一點小問題,就是判斷操作和釋放鎖是兩個獨立的操作,不具備原子性。假設線程A判斷完確實是自己加的鎖 , 這時還沒del ,這時有效的時間用完了 , 緊接着線程B又馬上搶到了鎖 , 然後線程A才執行del命令 , 就會把B搶到的鎖給誤刪了),使得卡住的進程有可能與後來的進程同時同問臨界資源,而出現問題,因此一旦某個進程無法在超時時間內完成對臨界資源的操作,就需要延長超時的時間。此時可以啟動一個守護進程,監視指定時間內獲取鎖的進程是否完成操作,如果沒有,則添加超時時間,讓程序繼續執行。

String threadId = Thread.currentThread().getId()
try{
	if(SET(key, threadId, 30, timeout, NX) == 1){
		new Thread(){
            @Override
            public void run() {
            	//start Daemon
            }
         }
		//do something ......
	}
}finally{
    if(threadId.equals(redisClient.get(key))){
        DEL(key);
    }
}

  基於以上的分析,基本上可以通過Redis實現一個分佈式鎖,如果我們想提升該分佈式的性能,我們可以對連接資源進行分段處理,將請求均勻的分佈到這些臨界資源段中,比如一個買票系統,我們可以將100張票分為10 部分,每部分包含10張票放在其他的服務節點上,這些請求可以通過Nginx被均勻的分散到這些處理節點上,可以加快對臨界資源的處理。

參考資料

  1. 併發編程的鎖機制:synchronized和lock

  2. B站視頻上一部分講解

  3. 什麼是分佈式鎖?

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

分類
發燒車訊

助長毀林?巴西銷往歐洲的黃豆 至少20%來自森林砍伐

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

分類
發燒車訊

研究:全球礁鯊數量銳減 部分海域已絕跡

摘錄自2020年7月22日中央社報導

22日發布在科學期刊「自然」(Nature)的一份調查,首度全面了解全球哪裡的礁鯊實際上已滅絕。這份調查為期四年,在近60個國家超過370個暗礁海域進行研究。

加拿大達爾豪希大學(Dalhousie University)副教授麥克尼爾(Aaron MacNeil)表示:「我們原本預期……地球上每個暗礁海域應該都有鯊魚出沒,結果發現我們調查的海域中,有20%沒有任何鯊魚,這令人非常擔心。」

調查顯示,在卡達、印度、越南及肯亞等八個國家的暗礁海域,完全找不到鯊魚。這不代表這些國家的海域沒有鯊魚,但證明當地暗礁海域的鯊魚數量少得可憐。這表示礁鯊在當地生態系統已失去任何角色,也就是牠們已經功能性滅絕。

研究指出,破壞性的捕魚活動最可能是礁鯊數量大減罪魁禍首。「使用流刺網和延繩捕魚,對原本數量相對豐沛的礁鯊帶來最大負面影響。」

生物多樣性
物種保育
土地利用
農林漁牧業
國際新聞
全球
鯊魚
暗礁
滅絕
捕魚
生態系統
流刺網
延繩釣

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案