分類
發燒車訊

都在講DevOps,但你知道它的發展趨勢嗎?

根據最近的一項集體研究,DevOps的市場在2017年創造了約29億美元的產值,預計到2022年,這個数字將達到約66億美元。人工智能的融入和安全性的融入,加上向自動化的巨大轉變,可合理預測,在2020年,DevOps將成為軟件工程的主流模式。

DevOps具有以下優勢:

●對需求變更的迅速響應

●超快的交付速度及靈活的安全部署

●建立完善的協作溝通渠道

●快速識別代碼中的錯誤或漏洞

●讓團隊將注意力集中在其他關鍵的事情上,而不是集中在安全特性上

 

越來越多的企業正採用DevOps的產品交付模式:根據Statista的統計數據,全面採用DevOps的企業數量從2017年的約10%增長到了2018年的17%。

而devops也將在2020年迎來新趨勢。

 

自動化成為焦點

實施DevOps產品交付模式的組織已經見證了極高的效率和超快速的部署速度。在提到DevOps時,我們主要討論的是DevOps自動化,零接觸自動化是未來的發展方向。在DevOps生命周期的7C(持續發展、持續集成、持續測試、持續反饋、持續監測、持續部署、持續運維)中,應用自動化是未來的關鍵,因為預計這將是2020年的主要目標之一。

注意力從CI管道轉移到DevOps的裝配線

DevOps的重要目標是改進交付過程的計劃階段和自動化階段之間的協作。這不僅僅關乎CI(持續集成),更重要的是關乎CD(持續交付)。許多組織正在投入額外的精力和時間來使公司軟件開發的整個過程自動化。因此,對於這些組織來說,現在是聯繫DevOps諮詢服務提供商的時候了。預計到2020年,注意力將從CI管道轉移到DevOps的裝配線。裝配線的一些共同優點如下:

●原生集成

●堅固的嵌套可見性

●適當互用性的完美持續交付

●基於團隊的分析以及商業智能

●快速實現和擴展“一切皆代碼”理念

 

對無服務器架構的使用增加

使用無服務器架構可以將DevOps提升到更高的水平,這並不意味着沒有服務器,而是使用雲服務的整體架構。FaaS(Function as a Service,功能即服務)和BaaS(Backend as a Service,後端即服務)是無服務器架構的兩個關鍵方面。通過採用這種無服務器體繫結構,企業可以節省時間、降低成本,並擁有具有彈性的、靈活的工作流。

“一切皆代碼”的概念

程序編碼是IT部門及其服務系統的骨幹。對DevOps自動化工具和腳本的充分理解將支配整個2020年。這個特定IT領域的前景與產品的未來取決於開發人員、測試人員及運維人員的技術能力。現在,隨着交付周期的縮短,需要引入代碼來提高軟件生產周期的效率。“一切皆代碼”的概念是在DevOps內部完成代碼的SDLC的實踐。如果軟件測試人員還不開始學習編程和編寫測試腳本,工作很可能會受到阻礙。

更好的嵌入式安全性

隨着安全漏洞的出現,越來越多的大小企業意識到網絡安全的重要性。2020年,DevOps預計將迅速將安全問題納入流程。DevSecOps首先在應用程序的開發生命周期中注入安全性,這有助於減少各種缺陷和漏洞,增加業務信譽。公司轉向DevSecOps促使項目中每個人都擔負安全方面的責任,這將在軟件開發過程中帶來很棒的協作,因為它確保了軟件開發過程始終保持完美、高效和可操作。

人工智能的興起和數據科學的飛速發展

隨着人工智能驅動的應用程序大量增加,數據科學正在推動越來越多的公司在其工作流程中採用DevOps理念。隨着數據科學和開發團隊在軟件開發、部署以及人工智能驅動的應用程序管理方面的效率越來越高,這將會進一步推動數據科學的發展。

2020年的主要目標是實現零接觸自動化。 持續不斷的人工智能和數據科學熱潮改變着遊戲規則。 許多應用程序都引入了人工智能,這已經促使多個DevOps團隊通過人工智能和數據科學實現自動化,數據科學團隊和開發團隊相輔相成地提高彼此的技能與交付水平。

對無服務器架構的使用增加

使用無服務器架構可以將DevOps提升到更高的水平,這並不意味着沒有服務器,而是使用雲服務的整體架構。FaaS(Function as a Service,功能即服務)和BaaS(Backend as a Service,後端即服務)是無服務器架構的兩個關鍵方面。通過採用這種無服務器體繫結構,企業可以節省時間、降低成本,並擁有具有彈性的、靈活的工作流。

Kubernetes長足發展

Kubernetes提供了基於容器技術的分佈式架構領先方案產品,因自身性能及易用性,已經成為應用廣泛的容器技術。伴隨着各類企業進一步通過深度採用容器技術來運行它們的雲原生應用,K8s將會迎來更廣的普及、更大的發展。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

使用 Masstransit中的 Request/Response 與 Courier 功能實現最終一致性

簡介

  目前的.net 生態中,最終一致性組件的選擇一直是一個問題。本地事務表(cap)需要在每個服務的數據庫中插入消息表,而且做不了此類事務 比如:創建訂單需要 餘額滿足+庫存滿足,庫存和餘額處於兩個服務中。masstransit 是我目前主要用的方案。以往一般都用 masstransit 中的 sagas 來實現 最終一致性,但是隨着併發的增加必定會對sagas 持久化的數據庫造成很大的壓力,根據stackoverflow 中的一個回答 我發現了 一個用  Request/Response 與 Courier 功能 實現最終一致性的方案 Demo地址。

Masstransit 中 Resquest/Response 功能 

 消息DTO

    public class SampleMessageCommand
    {
    }

 消費者

    public class SampleMessageCommandHandler : IConsumer<SampleMessageCommand>
    {
        public async Task Consume(ConsumeContext<SampleMessageCommand> context)
        {
            await context.RespondAsync(new SampleMessageCommandResult() { Data = "Sample" });
        }
    }

 返回結果DTO

 

    public class SampleMessageCommandResult
    {
        public string Data { get; set; }
    }

 調用方式與註冊方式略過,詳情請看 官方文檔。

  

  本質上使用消息隊列實現 Resquest/Response,客戶端(生產者)將請求消息發送至指定消息隊列並賦予RequestId和ResponseAddress(臨時隊列 rabbitmq),服務端(消費者)消費消息並把 需要返回的消息放入指定ResponseAddress,客戶端收到 Response message  通過匹配 RequestId 找到 指定Request,最後返回信息。

Masstransit 中 Courier  功能

  通過有序組合一系列的Activity,得到一個routing slip。每個 activity(忽略 Execute Activities) 都有 Execute 和 Compensate 兩個方法。Compensate 用來執撤銷 Execute 方法產生的影響(就是回退 Execute 方法)。每個 Activity Execute 最後都會 調用 Completed 方法把 回退所需要的的信息記錄在message中,最後持久化到消息隊列的某一個消息中。

 餘額扣減的Activity ,這裏的 DeductBalanceModel 是請求扣減的數據模型,DeductBalanceLog 是回退時需要用到的信息。

public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
    {
        private readonly ILogger<DeductBalanceActivity> logger;
        public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
        {
            this.logger = logger;
        }
        public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
        {
            logger.LogInformation("還原餘額");
            var log = context.Log; //可以獲取 所有execute 完成時保存的信息
            //throw new ArgumentException("some things were wrong");
            return context.Compensated();
        }

        public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
        {

            logger.LogInformation("扣減餘額");
            await Task.Delay(100);
            return context.Completed(new DeductBalanceLog() { Price = 100 });
        }
    }

 

      扣減庫存 Activity

    public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
    {
        private readonly ILogger<DeductStockActivity> logger;
        public DeductStockActivity(ILogger<DeductStockActivity> logger)
        {
            this.logger = logger;
        }
        public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
        {
            var log = context.Log;
            logger.LogInformation("還原庫存");
            return context.Compensated();
        }

        public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
        {
            var argument = context.Arguments;
            logger.LogInformation("扣減庫存");
            await Task.Delay(100);
            return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
        }
    }

       生成訂單 Execute Activity

    public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
    {
        private readonly ILogger<CreateOrderActivity> logger;
        public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
        {
            this.logger = logger;
        }
        public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
        {
            logger.LogInformation("創建訂單");
            await Task.Delay(100);
            //throw new CommonActivityExecuteFaildException("當日訂單已達到上限");
            return context.CompletedWithVariables(new CreateOrderResult { OrderId="111122",Message="創建訂單成功" });
        }
    }

  組裝 以上 Activity 生成一個 Routing Slip,這是一個有序的組合,扣減庫存=》扣減餘額=》生成訂單

            var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DeductStock", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"), new DeductStockModel { ProductId = request.Message.ProductId }); builder.AddActivity("DeductBalance", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"), new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price }); builder.AddActivity("CreateOrder", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"), new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });
var routingSlip = builder.Build();

  執行 Routing Slip

await bus.Execute(routingSlip);

  

      這裡是沒有任何返回值的,所有activity都是 異步執行,雖然所有的activity可以執行完成或者由於某個Activity執行出錯而全部回退。(其實這裡有一種更壞的情況就是 Compensate 出錯,默認情況下 Masstransit 只會發送一個回退錯誤的消息,後面講到創建訂單的時候我會把它塞到錯誤隊列里,這樣我們可以通過修改 Compensate bug后重新導入到正常隊列來修正數據),這個功能完全滿足不了 創建訂單這個需求,執行 await bus.Execute(routingSlip) 后我們完全不知道訂單到底創建成功,還是由於庫存或餘額不足而失敗了(異步)。

     還好 routing slip 在執行過程中產生很多消息,比如 RoutingSlipCompleted ,RoutingSlipCompensationFailed ,RoutingSlipActivityCompleted,RoutingSlipActivityFaulted 等,具體文檔,我們可以訂閱這些事件,再結合Request/Response 實現 創建訂單的功能。

實現創建訂單(庫存滿足+餘額滿足)長流程

創建訂單 command 

    /// <summary>
    /// 長流程 分佈式事務
    /// </summary>
    public class CreateOrderCommand
    {
        public string ProductId { get; set; }
        public string CustomerId { get; set; }
        public int Price { get; set; }
    }

  事務第一步,扣減庫存相關 代碼

  public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
    {
        private readonly ILogger<DeductStockActivity> logger;
        public DeductStockActivity(ILogger<DeductStockActivity> logger)
        {
            this.logger = logger;
        }
        public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
        {
            var log = context.Log;
            logger.LogInformation("還原庫存");
            return context.Compensated();
        }

        public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
        {
            var argument = context.Arguments;
            logger.LogInformation("扣減庫存");
            await Task.Delay(100);
            return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
        }
    }
    public class DeductStockModel
    {
        public string ProductId { get; set; }
    }
    public class DeductStockLog
    {
        public string ProductId { get; set; }
        public int Amount { get; set; }
    }

 事務第二步,扣減餘額相關代碼

public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
    {
        private readonly ILogger<DeductBalanceActivity> logger;
        public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
        {
            this.logger = logger;
        }
        public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
        {
            logger.LogInformation("還原餘額");
            var log = context.Log;
            //throw new ArgumentException("some things were wrong");
            return context.Compensated();
        }

        public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
        {

            logger.LogInformation("扣減餘額");
            await Task.Delay(100);
            return context.Completed(new DeductBalanceLog() { Price = 100 });
        }
    }
    public class DeductBalanceModel
    {
        public string CustomerId { get; set; }
        public int Price { get; set; }
    }
    public class DeductBalanceLog
    {
        public int Price { get; set; }
    }

 事務第三步,創建訂單相關代碼

 public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
    {
        private readonly ILogger<CreateOrderActivity> logger;
        public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
        {
            this.logger = logger;
        }
        public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
        {
            logger.LogInformation("創建訂單");
            await Task.Delay(100);
            //throw new CommonActivityExecuteFaildException("當日訂單已達到上限");
            return context.CompletedWithVariables(new CreateOrderResult { OrderId="111122",Message="創建訂單成功" });
        }
    }
    public class CreateOrderModel
    {
        public string ProductId { get; set; }
        public string CustomerId { get; set; }
        public int Price { get; set; }
    }
    public class CreateOrderResult
    {
        public string OrderId { get; set; }
        public string Message { get; set; }
    }

   我通過 消費 創建訂單 request,獲取 request 的 response 地址與 RequestId,這兩個值 返回 response 時需要用到,我把這些信息存到 RoutingSlip中,並且訂閱 RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed 三種事件,當這三種消息出現時 我會根據 事件類別 和RoutingSlip中 之前加入的 (response 地址與 RequestId)生成 Response ,整個過程大概就是這麼個意思,沒理解可以看demo。這裏由於每一個事物所需要用到的 RoutingSlip + Request/Response 步驟都類似 可以抽象一下(模板方法),把Activity 的組裝 延遲到派生類去解決,這個代理類Masstransit有 ,但是官方沒有顧及到 CompensationFailed 的情況,所以我乾脆自己再寫一個。

    public abstract class RoutingSlipDefaultRequestProxy<TRequest> :
        IConsumer<TRequest>
        where TRequest : class
    {
        public async Task Consume(ConsumeContext<TRequest> context)
        {
            var builder = new RoutingSlipBuilder(NewId.NextGuid());

            builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);
            
            builder.AddVariable("RequestId", context.RequestId);
            builder.AddVariable("ResponseAddress", context.ResponseAddress);
            builder.AddVariable("FaultAddress", context.FaultAddress);
            builder.AddVariable("Request", context.Message);

            await BuildRoutingSlip(builder, context);

            var routingSlip = builder.Build();

            await context.Execute(routingSlip).ConfigureAwait(false);
        }

        protected abstract Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<TRequest> request);
    }


 這個 是派生類 Routing slip 的拼裝過程 

    public class CreateOrderRequestProxy : RoutingSlipDefaultRequestProxy<CreateOrderCommand>

    {
        private readonly IConfiguration configuration;
        public CreateOrderRequestProxy(IConfiguration configuration)
        {
            this.configuration = configuration;
        }
        protected override Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<CreateOrderCommand> request)
        {
            builder.AddActivity("DeductStock", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"), new DeductStockModel { ProductId = request.Message.ProductId });

            builder.AddActivity("DeductBalance", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"), new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price });

            builder.AddActivity("CreateOrder", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"), new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });

            return Task.CompletedTask;
        }
    }

  構造response 基類,主要是對三種情況做處理。

 

    public abstract class RoutingSlipDefaultResponseProxy<TRequest, TResponse, TFaultResponse> : IConsumer<RoutingSlipCompensationFailed>, IConsumer<RoutingSlipCompleted>,
        IConsumer<RoutingSlipFaulted>
        where TRequest : class
        where TResponse : class
        where TFaultResponse : class
    {
        public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
        {
            var request = context.Message.GetVariable<TRequest>("Request");
            var requestId = context.Message.GetVariable<Guid>("RequestId");

            Uri responseAddress = null;
            if (context.Message.Variables.ContainsKey("ResponseAddress"))
                responseAddress = context.Message.GetVariable<Uri>("ResponseAddress");

            if (responseAddress == null)
                throw new ArgumentException($"The response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

            var endpoint = await context.GetResponseEndpoint<TResponse>(responseAddress, requestId).ConfigureAwait(false);

            var response = await CreateResponseMessage(context, request);

            await endpoint.Send(response).ConfigureAwait(false);
        }

        public async Task Consume(ConsumeContext<RoutingSlipFaulted> context)
        {
            var request = context.Message.GetVariable<TRequest>("Request");
            var requestId = context.Message.GetVariable<Guid>("RequestId");

            Uri faultAddress = null;
            if (context.Message.Variables.ContainsKey("FaultAddress"))
                faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
            if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
                faultAddress = context.Message.GetVariable<Uri>("ResponseAddress");

            if (faultAddress == null)
                throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

            var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false);

            var response = await CreateFaultedResponseMessage(context, request, requestId);

            await endpoint.Send(response).ConfigureAwait(false);
        }
        public async Task Consume(ConsumeContext<RoutingSlipCompensationFailed> context)
        {
            var request = context.Message.GetVariable<TRequest>("Request");
            var requestId = context.Message.GetVariable<Guid>("RequestId");

            Uri faultAddress = null;
            if (context.Message.Variables.ContainsKey("FaultAddress"))
                faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
            if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
                faultAddress = context.Message.GetVariable<Uri>("ResponseAddress");

            if (faultAddress == null)
                throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

            var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false);

            var response = await CreateCompensationFaultedResponseMessage(context, request, requestId);

            await endpoint.Send(response).ConfigureAwait(false);
        }
        protected abstract Task<TResponse> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, TRequest request);

        protected abstract Task<TFaultResponse> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, TRequest request, Guid requestId);
        protected abstract Task<TFaultResponse> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, TRequest request, Guid requestId);
    }

 Response 派生類 ,這裏邏輯可以隨自己定義,我也是隨便寫了個 CommonResponse和一個業務錯誤拋錯(犧牲了一點性能)。

    public class CreateOrderResponseProxy :
            RoutingSlipDefaultResponseProxy<CreateOrderCommand, CommonCommandResponse<CreateOrderResult>, CommonCommandResponse<CreateOrderResult>>
    {

        protected override Task<CommonCommandResponse<CreateOrderResult>> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, CreateOrderCommand request)
        {

            return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
            {
                Status = 1,
                Result = new CreateOrderResult
                {
                    Message = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.Message))?.ToString(),
                    OrderId = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.OrderId))?.ToString(),
                }
            });
        }
        protected override Task<CommonCommandResponse<CreateOrderResult>> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, CreateOrderCommand request, Guid requestId)
        {
            var commonActivityExecuteFaildException = context.Message.ActivityExceptions.FirstOrDefault(m => m.ExceptionInfo.ExceptionType == typeof(CommonActivityExecuteFaildException).FullName);
            if (commonActivityExecuteFaildException != null)
            {
                return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
                {
                    Status = 2,
                    Message = commonActivityExecuteFaildException.ExceptionInfo.Message
                });
            }
            // system error  log here
            return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
            {
                Status = 3,
                Message = "System error"
            });
        }

        protected override Task<CommonCommandResponse<CreateOrderResult>> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, CreateOrderCommand request, Guid requestId)
        {
            var exception = context.Message.ExceptionInfo;
            // lg here context.Message.ExceptionInfo
            return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
            {
                Status = 3,
                Message = "System error"
            });           
        }
    }

對於  CompensationFailed 的處理 通過 ActivityCompensateErrorTransportFilter 實現 發送到錯誤消息隊列,後續通過prometheus + rabbitmq-exporter + alertmanager 觸發告警 通知相關人員處理。

  public class ActivityCompensateErrorTransportFilter<TActivity, TLog> : IFilter<CompensateActivityContext<TActivity, TLog>>
        where TActivity : class, ICompensateActivity<TLog>
        where TLog : class
    {
        public void Probe(ProbeContext context)
        {
            context.CreateFilterScope("moveFault");
        }

        public async Task Send(CompensateActivityContext<TActivity, TLog> context, IPipe<CompensateActivityContext<TActivity, TLog>> next)
        {
            try
            {
                await next.Send(context).ConfigureAwait(false);
            }
            catch(Exception ex)
            {
                if (!context.TryGetPayload(out IErrorTransport transport))
                    throw new TransportException(context.ReceiveContext.InputAddress, $"The {nameof(IErrorTransport)} was not available on the {nameof(ReceiveContext)}.");
                var exceptionReceiveContext = new RescueExceptionReceiveContext(context.ReceiveContext, ex);
                await transport.Send(exceptionReceiveContext);
            }
        }
    }

註冊 filter 

    public class RoutingSlipCompensateErrorSpecification<TActivity, TLog> : IPipeSpecification<CompensateActivityContext<TActivity, TLog>>
        where TActivity : class, ICompensateActivity<TLog>
        where TLog : class
    {
        public void Apply(IPipeBuilder<CompensateActivityContext<TActivity, TLog>> builder)
        {
            builder.AddFilter(new ActivityCompensateErrorTransportFilter<TActivity, TLog>());
        }

        public IEnumerable<ValidationResult> Validate()
        {
           yield return this.Success("success");
        }
    }


            cfg.ReceiveEndpoint("DeductStock_compensate", ep =>
            {
                ep.PrefetchCount = 100;
                ep.CompensateActivityHost<DeductStockActivity, DeductStockLog>(context.Container, conf =>
                 {
                     conf.AddPipeSpecification(new RoutingSlipCompensateErrorSpecification<DeductStockActivity, DeductStockLog>());
                 });

            });

 

實現創建產品(創建完成+添加庫存)

實現了 創建訂單的功能,整個流程其實是同步的,我在想能不能實現最為簡單的最終一致性 比如 創建一個產品 ,然後異步生成它的庫存 ,我發現是可以的,因為我們可以監聽到每一個Execute Activity 的完成事件,並且把出錯時的信息通過 filter 塞到 錯誤隊列中。

這裏的代碼就不貼了,詳情請看 demo

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

分類
發燒車訊

看準中國市場 LG 化學在南京建電動車電池工廠

韓國 LG 化學今(10)日表示,2015 年將在中國建電動車電池工廠,此舉是在押注身為全球最大汽車市場的中國需求將持續增加。

LG 化學稱,這座工廠設在中國南京,將滿足上汽集團等中國汽車製造商和通用汽車 (GM) 等全球性企業的需求,工廠耗資數億美元,預計 2020 年綜合營收將達到 1 兆韓元 ( 約 9.899 億美元)。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

分類
發燒車訊

雷諾與法國富豪攜手 搶進電動車市場

法國雷諾集團(Renault SA)與法國富豪波洛黑(Vincent Bollore)決定合作,共同製造電動車,瞄準零排放汽車日益強勁的需求,及正於各地興起的環保汽車租用潮流。   雷諾汽車將於 2015 下半年開始生產波洛黑的電動車款「藍車(Bluecar)」,但並未透露生產目標。該車款自 2011 年起,被使用於巴黎名為「Autolib」的電動車租賃共享計畫中,此計畫同時也於里昂及波爾多等地運行。   此外,雷諾汽車與波洛黑亦宣布,將成立一策略聯盟,其中波洛黑擁有 70% 股權,雷諾股權則達 30%。該聯盟將於法國及歐洲其他地方提供車輛共享服務。另外並進行研究,以協助雷諾製造一款 3 人座、使用波洛黑所生產電池的電動車。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

分類
發燒車訊

ASP.NET Core Blazor Webassembly 之 漸進式應用(PWA)

Blazor支持漸進式應用開發也就是PWA。使用PWA模式可以使得web應用有原生應用般的體驗。

什麼是PWA

PWA應用是指那些使用指定技術和標準模式來開發的web應用,這將同時賦予它們web應用和原生應用的特性。
例如,web應用更加易於發現——相比於安裝應用,訪問一個網站顯然更加容易和迅速,並且你可以通過一個鏈接來分享web應用。
在另一方面,原生應用與操作系統可以更加完美的整合,也因此為用戶提供了無縫的用戶體驗。你可以通過安裝應用使得它在離線的狀態下也可以運行,並且相較於使用瀏覽器訪問,用戶也更喜歡通過點擊主頁上的圖標來訪問它們喜愛的應用。
PWA賦予了我們創建同時擁有以上兩種優勢的應用的能力。
這並不是一個新概念——這樣的想法在過去已經在web平台上通過許多方法出現了多次。漸進式增強和響應式設計已經可以讓我們構建對移動端友好的網站。在多年以前的Firefox OS的生態系統中離線運行和安裝web應用已經成為了可能。
PWAs, 不但如此,更是提供了所有的甚至是更多的特性,來讓web更加優秀。

引用自MDN

說人話就是PWA可以讓你的web程序跟一般應用一樣運行,有桌面圖標,能離線,沒有瀏覽器地址欄,一切看起來想個普通的程序/APP。

新建Blazor PWA程序

使用VS新建一個Blazor程序,選擇Webassembly模式,勾選支持PWA。

支持PWA的Blazor程序主要是多了幾個東西:

  1. manifest.json
  2. service-worker.js

manifest.json

manifest.json是個清單文件,當程序被安裝到設備上的時候會讀取裏面的信息,名稱是什麼,圖標是什麼,什麼語言等等。

{
  "name": "BlazorPWA",
  "short_name": "BlazorPWA",
  "start_url": "./",
  "display": "standalone",
  "background_color": "#ffffff",
  "theme_color": "#03173d",
  "icons": [
    {
      "src": "icon-512.png",
      "type": "image/png",
      "sizes": "512x512"
    }
  ]
}

service-worker.js

service-worker用來跑一些後台任務。它跟瀏覽器主進程是隔離的,也就是說跟原來的JavaScript運行時是分開,當然了它不會阻塞頁面。我們可以用它來完成一些功能,比如對所有的fetch/xhr請求進行過濾,哪些請求走緩存,哪些不走緩存;比如在後台偷偷給你拉一些數據緩存起來。

// Caution! Be sure you understand the caveats before publishing an application with
// offline support. See https://aka.ms/blazor-offline-considerations

self.importScripts('./service-worker-assets.js');
self.addEventListener('install', event => event.waitUntil(onInstall(event)));
self.addEventListener('activate', event => event.waitUntil(onActivate(event)));
self.addEventListener('fetch', event => event.respondWith(onFetch(event)));

const cacheNamePrefix = 'offline-cache-';
const cacheName = `${cacheNamePrefix}${self.assetsManifest.version}`;
const offlineAssetsInclude = [ /\.dll$/, /\.pdb$/, /\.wasm/, /\.html/, /\.js$/, /\.json$/, /\.css$/, /\.woff$/, /\.png$/, /\.jpe?g$/, /\.gif$/, /\.ico$/ ];
const offlineAssetsExclude = [ /^service-worker\.js$/ ];

async function onInstall(event) {
    console.info('Service worker: Install');

    // Fetch and cache all matching items from the assets manifest
    const assetsRequests = self.assetsManifest.assets
        .filter(asset => offlineAssetsInclude.some(pattern => pattern.test(asset.url)))
        .filter(asset => !offlineAssetsExclude.some(pattern => pattern.test(asset.url)))
        .map(asset => new Request(asset.url, { integrity: asset.hash }));
    await caches.open(cacheName).then(cache => cache.addAll(assetsRequests));
}

async function onActivate(event) {
    console.info('Service worker: Activate');

    // Delete unused caches
    const cacheKeys = await caches.keys();
    await Promise.all(cacheKeys
        .filter(key => key.startsWith(cacheNamePrefix) && key !== cacheName)
        .map(key => caches.delete(key)));
}

async function onFetch(event) {
    let cachedResponse = null;
    if (event.request.method === 'GET') {
        // For all navigation requests, try to serve index.html from cache
        // If you need some URLs to be server-rendered, edit the following check to exclude those URLs
        const shouldServeIndexHtml = event.request.mode === 'navigate';

        const request = shouldServeIndexHtml ? 'index.html' : event.request;
        const cache = await caches.open(cacheName);
        cachedResponse = await cache.match(request);
    }

    return cachedResponse || fetch(event.request);
}

項目里有2個service-worker.js文件,一個是開發時候的沒邏輯,還有一個是發布時候的有一些緩存的邏輯。

運行一下

如果是PWA程序,在瀏覽器地址欄有個+號一樣的圖標,點擊可以把程序安裝到本地。

安裝完了會在桌面生成一個圖標,打開會是一個沒有瀏覽器地址欄的界面。

這樣一個PWA程序已經可以運行了。

離線運行

如果只是這樣,僅僅是沒有瀏覽器地址欄,那PWA也太沒什麼吸引力了。個人覺得PWA最大的魅力就是可以離線運行,在沒有網絡的情況下依然可以運行,這樣才像一個原生編寫的程序。

修改service-worker

離線的原理也很簡單,就是請求的數據都緩存起來,一般是緩存Get請求,比如各種頁面圖片等。

// In development, always fetch from the network and do not enable offline support.
// This is because caching would make development more difficult (changes would not
// be reflected on the first load after each change).

self.addEventListener('fetch', event => event.respondWith(onFetch(event)));
self.addEventListener('install', event => event.waitUntil(onInstall(event)));

async function onInstall(event) {
    console.info('Service worker: Install');
}


async function onFetch(event) {
    let cachedResponse = null;
    const cache = await caches.open('blazor_pwa');
    if (event.request.method === 'GET') {
        const request = event.request;
        cachedResponse = await caches.match(request);
        if (cachedResponse) {
            return cachedResponse;
        }
        var resp = await fetch(event.request)
        cache.put(event.request, resp.clone());
        return resp;
    }

    return fetch(event.request);
}

修改一下sevice-worker.js,把GET請求全部緩存起來。這裏為了演示圖方便,其實情況顯然不會這麼簡單粗暴。為了能緩存頁面,顯然必須先在線運行成功一次。

模擬離線

當我們修改完上面的js,然後在線正常一次后,可以看到所有GET請求的資源都被緩存起來了。

我們可以用chrome來模擬離線情況:

選擇offline模式,然後刷新我們的頁面,如果依然可以正常運行則表示可以離線運行。

總結

使用Blazor可以快速的開發PWA應用。利用PWA跟Blazor Webassembly的特性,可以開發出類似桌面的應用程序。或許這是跨平台桌面應用開發除了electron的又一種方案吧。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

分類
發燒車訊

使用Kubernetes、K3s和Traefik2進行本地開發

作者簡介

Vyacheslav,擁有運維和項目管理經驗的軟件工程師

這篇文章將承接我此前搭建的本地Docker開發環境,具體步驟已經放在在以下網址:

https://github.com/Voronenko/traefik2-compose-template

除了經典的docker化的項目之外,我還有其他的Kubernetes項目。儘管Kubernetes已經成為容器編排的事實標準,但是不得不承認Kubernetes是一個既消耗資源又消耗金錢的平台。由於我並不經常需要外部集群,因此我使用輕量級K3s發行版來進行Kubernetes本地開發。

K3s是為IoT和邊緣計算而構建的經過認證的Kubernetes發行版之一,還能夠按產品規模部署到VM。

我使用K3s的方式是這樣的:在我的工作筆記本上本地安裝K3s,儘管有時我需要在本地部署較重的測試工作負載,為此,我準備了兩個神器——兩個運行ESXi的外部Intel NUCs。

默認情況下,K3s安裝Traefik 1.x作為ingress,如果你對此十分滿意,那麼無需往下繼續閱讀了。

在我的場景中,我同時會牽涉到好幾個項目,特別是經典的docker和docker swarm,因此我經常遇到在獨立模式下部署Traefik的情況。

因此,本文其餘部分將深入介紹如何將外部traefik2配置為K3s集群的ingress。

安裝Kubernetes K3s系列集群

你可以按照常規方式使用命令curl -sfL https://get.k3s.io | sh -安裝K3s,或者你可以使用輕量實用程序k3sup安裝(https://github.com/alexellis/k3sup)。具體步驟在之前的文章介紹過。

與我們的設置不同的是,我們使用命令--no-deploy traefik專門安裝了不帶traefik組件的K3s。

export CLUSTER_MASTER=192.168.3.100
export CLUSTER_DEPLOY_USER=slavko
k3sup install --ip $CLUSTER_MASTER --user $CLUSTER_DEPLOY_USER --k3s-extra-args '--no-deploy traefik'

執行后,你將獲得使用kubectl所需的連接詳細信息。安裝K3s后,你可以快速檢查是否可以看到節點。

# Test your cluster with - export path to k3s cluster kubeconfig:
export KUBECONFIG=/home/slavko/kubeconfig
kubectl get node -o wide

注:這裏沒有固定的安裝模式,你甚至可以使用docker-compose自行啟動它。

server:
  image: rancher/k3s:v0.8.0
  command: server --disable-agent --no-deploy traefik
  environment:
    - K3S_CLUSTER_SECRET=somethingtotallyrandom
    - K3S_KUBECONFIG_OUTPUT=/output/kubeconfig.yaml
    - K3S_KUBECONFIG_MODE=666
  volumes:
    # k3s will generate a kubeconfig.yaml in this directory. This volume is mounted
    # on your host, so you can then 'export KUBECONFIG=/somewhere/on/your/host/out/kubeconfig.yaml',
    # in order for your kubectl commands to work.
    - /somewhere/on/your/host/out:/output
    # This directory is where you put all the (yaml) configuration files of
    # the Kubernetes resources.
    - /somewhere/on/your/host/in:/var/lib/rancher/k3s/server/manifests
  ports:
    - 6443:6443

node:
  image: rancher/k3s:v0.8.0
  privileged: true
  links:
    - server
  environment:
    - K3S_URL=https://server:6443
    - K3S_CLUSTER_SECRET=somethingtotallyrandom
  volumes:
    # this is where you would place a alternative traefik image (saved as a .tar file with
    # 'docker save'), if you want to use it, instead of the traefik:v2.0 image.
    - /sowewhere/on/your/host/custom-image:/var/lib/rancher/k3s/agent/images

配置Traefik 2,與Kubernetes一起使用

在文章開頭提到的鏈接中,我已經在我的系統中安裝了Traefik 2,並根據該鏈接內容,服務於一些需求。現在是時候配置Traefik 2 Kubernetes後端了。

Traefik 2使用CRD(自定義資源定義)來完成這一點。定義的最新示例可以在以下鏈接中找到,但這些示例僅適用於Traefik 2也作為Kubernetes工作負載的一部分執行的情況:

https://docs.traefik.io/reference/dynamic-configuration/kubernetes-crd/

對於外部Traefik 2,我們僅需要以下描述的定義子集。

我們引入一系列自定義資源定義,以允許我們來描述我們的Kubernetes服務將會如何暴露到外部,traefik-crd.yaml

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: ingressroutes.traefik.containo.us

spec:
  group: traefik.containo.us
  version: v1alpha1
  names:
    kind: IngressRoute
    plural: ingressroutes
    singular: ingressroute
  scope: Namespaced

---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: ingressroutetcps.traefik.containo.us

spec:
  group: traefik.containo.us
  version: v1alpha1
  names:
    kind: IngressRouteTCP
    plural: ingressroutetcps
    singular: ingressroutetcp
  scope: Namespaced

---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: middlewares.traefik.containo.us

spec:
  group: traefik.containo.us
  version: v1alpha1
  names:
    kind: Middleware
    plural: middlewares
    singular: middleware
  scope: Namespaced

---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: tlsoptions.traefik.containo.us

spec:
  group: traefik.containo.us
  version: v1alpha1
  names:
    kind: TLSOption
    plural: tlsoptions
    singular: tlsoption
  scope: Namespaced

---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: traefikservices.traefik.containo.us

spec:
  group: traefik.containo.us
  version: v1alpha1
  names:
    kind: TraefikService
    plural: traefikservices
    singular: traefikservice
  scope: Namespaced  

同時,我們需要集群角色traefik-ingress-controller,以提供對服務、端點和secret的只讀訪問權限以及自定義的traefik.containo.us組,traefik-clusterrole.yaml

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: traefik-ingress-controller

rules:
  - apiGroups:
      - ""
    resources:
      - services
      - endpoints
      - secrets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - extensions
    resources:
      - ingresses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - extensions
    resources:
      - ingresses/status
    verbs:
      - update
  - apiGroups:
      - traefik.containo.us
    resources:
      - middlewares
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - traefik.containo.us
    resources:
      - ingressroutes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - traefik.containo.us
    resources:
      - ingressroutetcps
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - traefik.containo.us
    resources:
      - tlsoptions
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - traefik.containo.us
    resources:
      - traefikservices
    verbs:
      - get
      - list
      - watch

最後,我們需要系統服務賬號traefik-ingress-controller與之前創建的集群角色traefik-ingress-controller相關聯。

---
kind: ServiceAccount
apiVersion: v1
metadata:
  namespace: kube-system
  name: traefik-ingress-controller

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: traefik-ingress-controller

roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: traefik-ingress-controller
subjects:
  - kind: ServiceAccount
    name: traefik-ingress-controller
    namespace: kube-system

我們應用以上資源之後:

apply:
  kubectl apply -f traefik-crd.yaml
  kubectl apply -f traefik-clusterrole.yaml
  kubectl apply -f traefik-service-account.yaml

我們已經準備好開始調整Traefik 2

將Traefik 2指向K3s集群

根據Traefik文檔的建議,當Traefik部署到Kubernetes中時,它將讀取環境變量KUBERNETES_SERVICE_HOST和KUBERNETES_SERVICE_PORT或KUBECONFIG來構造端點。

/var/run/secrets/kubernetes.io/serviceaccount/token中查找訪問token,而SSL CA證書將在/var/run/secrets/kubernetes.io/serviceaccount/ca.crt.中查找。當部署到Kubernetes內部時,兩者都會自動提供掛載。

當無法找到環境變量時,Traefik會嘗試使用external-cluster客戶端連接到Kubernetes API server。這一情況下,需要設置endpoint。具體來說,可以將其設置為kubectl代理使用的URL,以使用相關的kubeconfig授予的身份驗證和授權連接到Kubernetes集群。

Traefik 2可以使用任何受支持的配置類型來靜態配置-toml、yaml或命令行交換。

[providers.kubernetesCRD]
  endpoint = "http://localhost:8080"
  token = "mytoken"
providers:
  kubernetesCRD:
    endpoint = "http://localhost:8080"
    token = "mytoken"
    # ...
--providers.kubernetescrd.endpoint=http://localhost:8080 
--providers.kubernetescrd.token=mytoken

第一次運行時,如果你在外部有Traefik,很有可能沒有traefik-ingress-controller訪問token來指定mytoken。那麼,你需要執行以下命令:

# Check all possible clusters, as your .KUBECONFIG may have multiple contexts:
kubectl config view -o jsonpath='{"Cluster name\tServer\n"}{range .clusters[*]}{.name}{"\t"}{.cluster.server}{"\n"}{end}'

# Output kind of
# Alias tip: k config view -o jsonpath='{"Cluster name\tServer\n"}{range .clusters[*]}{.name}{"\t"}{.cluster.server}{"\n"}{end}'
# Cluster name  Server
# default  https://127.0.0.1:6443

# You are interested in: "default", if you did not name it differently

# Select name of cluster you want to interact with from above output:
export CLUSTER_NAME="default"

# Point to the API server referring the cluster name
export APISERVER=$(kubectl config view -o jsonpath="{.clusters[?(@.name==\"$CLUSTER_NAME\")].cluster.server}")
# usually https://127.0.0.1:6443

# Gets the token value
export TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='traefik-ingress-controller')].data.token}" --namespace kube-system|base64 --decode)

# Explore the API with TOKEN

如果成功了,你應該收到以下響應:

{
  "kind": "APIVersions",
  "versions": [
    "v1"
  ],
  "serverAddressByClientCIDRs": [
    {
      "clientCIDR": "0.0.0.0/0",
      "serverAddress": "192.168.3.100:6443"
    }
  ]

以及一些事實,如token:

eyJhbGciOiJSUzI1NiIsImtpZCI6IjBUeTQyNm5nakVWbW5PaTRRbDhucGlPeWhlTHhxTXZjUDJsRmNacURjVnMifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJ0cmFlZmlrLWluZ3Jlc3MtY29udHJvbGxlci10b2tlbi12emM3diIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJ0cmFlZmlrLWluZ3Jlc3MtY29udHJvbGxlciIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImQ5NTc3ZTkxLTdlNjQtNGMwNi1iZDgyLWNkZTk0OWM4MTI1MSIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprdWJlLXN5c3RlbTp0cmFlZmlrLWluZ3Jlc3MtY29udHJvbGxlciJ9.Mk8EBS4soO8uX-uSnV3o4qZKR6Iw6bgeSmPhHbJ2fjuqFgLnLh4ggxa-N9AqmCsEWiYjSi5oKAu986UEC-_kGQh3xaCYsUwlkM8147fsnwCbomSeGIct14JztVL9F8JwoDH6T0BOEjn-J9uY8-fUKYL_Y7uTrilhFapuILPsj_bFfgIeOOapRD0XshKBQV9Qzg8URxyQyfzl68ilm1Q13h3jLj8CFE2RlgEUFk8TqYH4T4fhfpvV-gNdmKJGODsJDI1hOuWUtBaH_ce9w6woC9K88O3FLKVi7fbvlDFrFoJ2iVZbrRALPjoFN92VA7a6R3pXUbKebTI3aUJiXyfXRQ

根據上次響應的API server的外部地址:https://192.168.3.100:6443

同樣,提供的token中沒有任何特殊之處:這是JWT的token,你可以使用https://jwt.io/#debugger-io,檢查它的內容。

{
  "alg": "RS256",
  "kid": "0Ty426ngjEVmnOi4Ql8npiOyheLxqMvcP2lFcZqDcVs"
}
{
  "iss": "kubernetes/serviceaccount",
  "kubernetes.io/serviceaccount/namespace": "kube-system",
  "kubernetes.io/serviceaccount/secret.name": "traefik-ingress-controller-token-vzc7v",
  "kubernetes.io/serviceaccount/service-account.name": "traefik-ingress-controller",
  "kubernetes.io/serviceaccount/service-account.uid": "d9577e91-7e64-4c06-bd82-cde949c81251",
  "sub": "system:serviceaccount:kube-system:traefik-ingress-controller"
}

正確的配置非常重要,因此請確保對APISERVER的兩個調用均返回合理的響應。

export APISERVER=YOURAPISERVER
export TOKEN=YOURTOKEN

curl -X GET $APISERVER/api --header "Authorization: Bearer $TOKEN" --insecure

curl -X GET $APISERVER/api/v1/endpoints --header "Authorization: Bearer $TOKEN" --insecure

創建其他訪問token

控制器循環確保每個服務賬戶都有一個帶有API token的secret,可以像我們之前那樣被發現。

此外,你還可以為一個服務賬戶創建額外的token,創建一個ServiceAccountToken類型的secret,併為服務賬戶添加一個註釋,控制器會用生成的token來更新它。

---
apiVersion: v1
kind: Secret
namespace: kube-system
metadata:
  name: traefik-manual-token
  annotations:
    kubernetes.io/service-account.name: traefik-ingress-controller
type: kubernetes.io/service-account-token

# Any tokens for non-existent service accounts will be cleaned up by the token controller.

# kubectl describe secrets/traefik-manual-token

用以下命令創建:

kubectl create -f ./traefik-service-account-secret.yaml
kubectl describe secret traefik-manual-token

刪除/無效:

kubectl delete secret traefik-manual-token

對外部traefik 2的更改構成定義

我們需要在文章開頭給出的鏈接中獲得的traefik2配置進行哪些更改?

https://github.com/Voronenko/traefik2-compose-template

a) 我們在新文件夾kubernetes_data中存儲ca.crt文件,該文件用於驗證對Kubernetes授權的調用。這是可以在kubeconfig文件的clusters-> cluster-> certificate-authority-data下找到的證書。

該volume將映射在/var/run/secrets/kubernetes.io/serviceaccount下以獲取官方Traefik 2鏡像

volumes:
    ...
      - ./kubernetes_data:/var/run/secrets/kubernetes.io/serviceaccount

b) 調整Traefik 2 kubernetescrd後端以提供3個參數:endpoint、證書路徑和token。請注意,作為外部Traefik作為docker容器,你需要指定正確的endpoint地址,並確保以安全的方式進行。

  - "--providers.kubernetescrd=true"
      - "--providers.kubernetescrd.endpoint=https://192.168.3.100:6443"
      - "--providers.kubernetescrd.certauthfilepath=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
      - "--providers.kubernetescrd.token=YOURTOKENWITHOUTANYQUOTES

如果你都執行正確了,那麼你現在應該在Traefik UI上看到了一些希望。如果你沒有看到traefik,或者在運行Traefik時有問題,你可以查看之後的故障排除部分。

現在是時候通過Trafik 2暴露一些Kubernetes服務了,以確保Traefik 2能夠作為ingress工作。讓我們來看經典案例whoami服務,whoami-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: whoami

spec:
  ports:
    - protocol: TCP
      name: web
      port: 80
  selector:
    app: whoami

---
kind: Deployment
apiVersion: apps/v1
metadata:
  namespace: default
  name: whoami
  labels:
    app: whoami

spec:
  replicas: 2
  selector:
    matchLabels:
      app: whoami
  template:
    metadata:
      labels:
        app: whoami
    spec:
      containers:
        - name: whoami
          image: containous/whoami
          ports:
            - name: web
              containerPort: 80

並且以http或https的方式暴露它,whoami.k.voronenko.net全限定域名下的whoami-ingress-route.yaml

apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: ingressroute-notls
  namespace: default
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`whoami.k.voronenko.net`)
      kind: Rule
      services:
        - name: whoami
          port: 80

---
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: ingressroute-tls
  namespace: default
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`whoami.k.voronenko.net`)
      kind: Rule
      services:
        - name: whoami
          port: 80
  tls:
    certResolver: default

然後應用它:

kubectl apply -f whoami-service.yaml
  kubectl apply -f whoami-ingress-route.yaml

應用后,你應該會在Traefik dashboard上看到一些希望,即KubernetesCRD後端。

正如你所看到的,Traefik已經檢測到我們的K3s Kubernetes集群上運行的新工作負載,而且它與我們在同一個盒子上的經典Docker工作負載(如portainer)很好地共存。

讓我們檢查一下Traefik 2是否將Traefik路由到了我們的Kubernetes工作負載:如你所見,你可以在http和https endpoint上成功地接觸到whoami工作負載,瀏覽器接受你的證書為可信任的“綠標籤”。

我們的目標達到了!我們在本地筆記本上配置了Traefik 2。Traefik 2將你的docker或Kubernetes工作流暴露在http或https endpoint上。帶可選的 letsencrypt 的 Traefik 2 將負責 https。

故障排查

正如你所知,在配置過程可能存在多個問題,你可以考慮使用一些分析工具,如:

https://github.com/Voronenko/dotfiles/blob/master/Makefile#L185

我特別建議:

a) VMWare octant:這是一個基於Web的功能強大的Kubernetes dashboard,你可以在上面使用你的kubeconfig

b) Rakess:這是一個獨立工具也是一個kubectl插件,用於显示Kubernetes服務器資源的訪問矩陣(https://github.com/corneliusweig/rakkess)

檢查系統賬戶的憑據

rakkess --sa kube-system:traefik-ingress-controller

c) kubectl

檢查哪些角色與服務賬戶相關聯

kubectl get clusterrolebindings -o json | jq -r '
  .items[] |
  select(
    .subjects // [] | .[] |
    [.kind,.namespace,.name] == ["ServiceAccount","kube-system","traefik-ingress-controller"]
  ) |
  .metadata.name'

d) Traefik 文檔:例如kubernetescrd後端提供了更多配置開關的方式。

--providers.kubernetescrd  (Default: "false")
        Enable Kubernetes backend with default settings.
    --providers.kubernetescrd.certauthfilepath  (Default: "")
        Kubernetes certificate authority file path (not needed for in-cluster client).
    --providers.kubernetescrd.disablepasshostheaders  (Default: "false")
        Kubernetes disable PassHost Headers.
    --providers.kubernetescrd.endpoint  (Default: "")
        Kubernetes server endpoint (required for external cluster client).
    --providers.kubernetescrd.ingressclass  (Default: "")
        Value of kubernetes.io/ingress.class annotation to watch for.
    --providers.kubernetescrd.labelselector  (Default: "")
        Kubernetes label selector to use.
    --providers.kubernetescrd.namespaces  (Default: "")
        Kubernetes namespaces.
    --providers.kubernetescrd.throttleduration  (Default: "0")
        Ingress refresh throttle duration
    --providers.kubernetescrd.token  (Default: "")
        Kubernetes bearer token (not needed for in-cluster client).
    --providers.kubernetesingress  (Default: "false")
        Enable Kubernetes backend with default settings.
    --providers.kubernetesingress.certauthfilepath  (Default: "")
        Kubernetes certificate authority file path (not needed for in-cluster client).
    --providers.kubernetesingress.disablepasshostheaders  (Default: "false")
        Kubernetes disable PassHost Headers.
    --providers.kubernetesingress.endpoint  (Default: "")
        Kubernetes server endpoint (required for external cluster client).
    --providers.kubernetesingress.ingressclass  (Default: "")
        Value of kubernetes.io/ingress.class annotation to watch for.
    --providers.kubernetesingress.ingressendpoint.hostname  (Default: "")
        Hostname used for Kubernetes Ingress endpoints.
    --providers.kubernetesingress.ingressendpoint.ip  (Default: "")
        IP used for Kubernetes Ingress endpoints.
    --providers.kubernetesingress.ingressendpoint.publishedservice  (Default: "")
        Published Kubernetes Service to copy status from.
    --providers.kubernetesingress.labelselector  (Default: "")
        Kubernetes Ingress label selector to use.
    --providers.kubernetesingress.namespaces  (Default: "")
        Kubernetes namespaces.
    --providers.kubernetesingress.throttleduration  (Default: "0")
        Ingress refresh throttle duration
    --providers.kubernetesingress.token  (Default: "")
        Kubernetes bearer token (not needed for in-cluster client).

e) 確保Traefik有足夠的權限可以訪問apiserver endpoint

如果你希望Traefik為你查詢信息:通過在配置中放置一些錯誤的apiserver地址,可以查看訪問的endpoint和查詢順序。有了這些知識和你的Traefik Kubernetes token,你就可以使用Traefik憑證檢查這些endpoint是否可以訪問。

traefik_1    | E0421 12:30:12.624877       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1.Endpoints: Get https://192.168.3.101:6443/api/v1/endpoints?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625341       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1.Service: Get https://192.168.3.101:6443/api/v1/services?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625395       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1beta1.Ingress: Get https://192.168.3.101:6443/apis/extensions/v1beta1/ingresses?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625449       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1alpha1.Middleware: Get https://192.168.3.101:6443/apis/traefik.containo.us/v1alpha1/middlewares?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625492       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1alpha1.IngressRoute: Get https://192.168.3.101:6443/apis/traefik.containo.us/v1alpha1/ingressroutes?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625531       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1alpha1.TraefikService: Get https://192.168.3.101:6443/apis/traefik.containo.us/v1alpha1/traefikservices?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625572       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1alpha1.TLSOption: Get https://192.168.3.101:6443/apis/traefik.containo.us/v1alpha1/tlsoptions?limit=500&resourceVersion=0:
traefik_1    | E0421 12:30:12.625610       1 reflector.go:125] pkg/mod/k8s.io/client-go@v0.0.0-20190718183610-8e956561bbf5/tools/cache/reflector.go:98: Failed to list *v1alpha1.IngressRouteTCP: Get https://192.168.3.101:6443/apis/traefik.containo.us/v1alpha1/ingressroutetcps?limit=500&resourceVersion=0:

f) 記錄K3s本身

安裝腳本將自動檢測你的操作系統是使用systemd還是openrc並啟動服務。使用openrc運行時,將在/var/log/k3s.log中創建日誌。使用systemd運行時,將在/var/log/syslog中創建日誌,並使用journalctl -u k3s查看。

在那裡,你可能會得到一些提示,例如:

кві 21 15:42:44 u18d k3s[612]: E0421 15:42:44.936960     612 authentication.go:104] Unable to authenticate the request due to an error: invalid bearer token

這將為你提供有關K8s Traefik起初使用時出現問題的線索,Enjoy your journey!

相關代碼你可以在以下鏈接中找到:

https://github.com/Voronenko/k3s-mini

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

厄瓜多總統與原住民領袖達協議 化解反撙節示威

摘錄自2019年10月14日中央社報導

莫雷諾為向國際貨幣基金貸款42億美元而取消燃油補貼後,國內物價迅速飆升,引發長達12天的示威,造成7人喪命。暴力衝突迫使莫雷諾將政府遷移至厄瓜多第2大城瓜亞基爾(Guayaquil),並嚴重影響石油業,能源部暫停超過2/3原油的配送。抗議人士甚至佔領亞馬遜雨林地區的3處石油設施。

厄瓜多總統莫雷諾及原住民領袖瓦爾加斯13日達成協議,終結近2週來反對撙節措施的暴力抗議。政府是為為獲得國際貨幣基金(IMF)數十億美元貸款,而採取這些緊縮措施。

法新社報導,聯合國官員代表宣讀的聯合聲明說,「根據這項協議,厄瓜多各地的群眾動員劃下句點,我們承諾會恢復國內和平」。聲明並表示,政府已撤回取消燃油補貼的命令。臉上塗著油彩、頭上頂著羽毛頭飾的瓦爾加斯證實:「這些適用於我們全國各地的措施已經取消了。」

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

分類
發燒車訊

MapReduce 論文閱讀筆記

目錄

  • Abstract
  • Introduction
  • 2 Programming Model
    • 2.1 Example
    • 2.2 Types
    • 2.3 More Examples
  • 3 Implementation
    • 3.1 Execution Overview
    • 3.2 Master Data Structures
    • 3.3 Fault Tolerance
      • Worker Failure (工作節點故障)
      • Master Failure(主節點故障)
      • Semantics in the Presence of Failures(語義可能存在的故障)
    • 3.4 Locality
    • 3.5 Task Granularity
    • 3.6 Backup Tasks
  • 4 Refinements
    • 4.1 Partitioning Function
    • 4.2 Ordering Guarantees
    • 4.3 Combiner Function
    • 4.4 Input and Output Types
    • 4.5 Side-effects
    • 4.6 Skipping Bad Records
    • 4.7 Local Execution
    • 4.8 Status Information
    • 4.9 Counters
  • Others

Abstract

MapReduce :

  • programming model 編程模型
  • an associated implementation for processing and generating large data sets.

用戶只需要指定 Map(Map函數將 key/value 類型的 pair 生成中間結果的 pair) 和 Reduce 函數(Reduce 函數將所有具有相同中間結果的值組合起來)即可。

MapReduce 封裝隱藏了分佈式系統并行計算的細節:

  • 輸入數據的分割
  • 計劃將程序分配到一組計算機中
  • 處理機器故障
  • 管理集群內部的通信

程序(in functional style)分佈式的運行在大型分佈式的集群上,而且具有很好的可伸縮性 scalable。

Introduction

過去這些年,Google一直在尋找方法來實現處理大量數據(抓取到的文件,web日誌等)的方法,通常數據量很大而且必須分散在數以千計的電腦上來進行運算。為了處理如何使計算相互關聯,分配數據以及處理故障的問題,往往編寫大量的複雜代碼掩蓋了他們,最初的簡單計算的初衷卻被忽略掉。

為了解決這種複雜性,抽象出了一個簡單的計算模型放到一個庫中,這個庫隱藏了可能出現的問題:

  • 并行計算
  • 容錯
  • 數據分發
  • 負載均衡

這個抽象受到了 Lisp 以及很多函數式編程語言中存在的原語 mapreduce 的啟發。

大多數并行計算都包含兩個步驟:

  • map:將每個邏輯記錄變成 key/value 的中間形式方便計算
  • reduce:將所有具有相同 key 的值組合到一起來進行合適的處理

我們使用一個函數式的編程模型(functional programming model)可以讓處理大型的并行計算和使用重新執行作為容錯的主要機制變得很簡單。

這項工作的主要貢獻是:提供了一個簡單但是很強大的接口(interface)讓自動化的并行計算和大規模計算的分發成為可能,結合該接口的實現,可以在商用機的大型集群上實現高性能。

Section2 :描述了基本的編程模型給出幾個例子

Section3 :描述MapReduce 接口針對集群運算環境的實現

Section4 :一些針對該模型的細微的改良

Section5 :針對實現設計出一系列性能衡量方法

Section6 :MapReduce 在 Google 中的使用,以及使用 MapReduce 來重寫生產環境的索引系統

Section7 :相關以及未來的工作

2 Programming Model

input: a set of key/value pairs

output: a set of key/value pairs

MapReduce 的用戶將只會使用兩個函數 MapReduce

Map:用戶編寫,將輸入的 pair 變成 k/v 的中間 pairs,然後 MapReduce 會把具有相同 key 的 pair 送給 Reduce 函數

Reduce: 用戶編寫,接受中間結果 key 和 key 的一系列值。將這些值組合起來成為更少的 k/v;通常每個 Reduce 函數只輸出一個 或者 0 個值。中間結果太多無法全部放到內存中,可以通過迭代的方法來處理大量的 value

2.1 Example

設想一個需要統計文件中每個單詞數量的一個問題,我們很可能編寫這樣的代碼:

map(String key, String value):
	// key: document name
	// value: document contents
	for each word w in value:
		EmitIntermediate(w, "1")
      
reduce(String key, String values):
	// key: a word
	// values: a list of counts
	int result = 0
  for each v in values
    result += ParseInt(v)
  Emit(AsString(result))

map:給每個單詞添加一個屬性(出現的次數,這裏就是1)

reduce:給每個特定的單詞加起來計算總數並且提交

此外,用戶編寫代碼以使用輸入和輸出文件的名稱以及可選的調整參數來填充mapreduce規範對象。然後,用戶調用MapReduce函數,並將其傳遞給指定對象。用戶代碼與MapReduce庫(在C ++中實現)鏈接在一起。

2.2 Types

儘管前面偽代碼使用 string 來寫的輸入輸出,但是從概念上說,是由用戶來指定 map 和 reduce 的類型

map (k1, v1)  ->  list(k2, v2)
map (k2, list(v2)) -> list(v2)

輸入值和中間值來自不同的域,中間值和輸出值來自相同的域

C ++實現在用戶定義的函數之間來回傳遞字符串,並將其留給用戶代碼以在字符串和適當的類型之間進行轉換

2.3 More Examples

這有一些可以使用 MapReduce 簡化的計算:

Distributed Grep: 分佈式的匹配,map 函數提交一個符合匹配的 line, reduce 的作用只是複製中間結果到輸出

Count of URL Access Frequency: 網頁訪問計數,map 處理網頁請求並且輸出中間結果為 <URL, 1>, reduce 功能是將所有的相同的 URL 計算到一起提交為 <URL, total count>

Reverse Web-Link Graph: 翻轉網絡鏈接圖, map 輸出 <target, source> pairs ,將 target 命名為 source。reduce函數連接與給定目標URL關聯的所有源URL的列表,並提交該對 <target, list(source)>

Term-Vector per Host: 術語向量是出現在一篇文章中最重要的術語集合列表<word, frequency> pairs。map 函數給每個輸入文件輸出一個 <hostname, term vector> pairs,reduce 函數傳遞給特定主機的術語向量,然後去掉不常出現的向量最後提交一個 <hostname, term vector> pair

Invert Index:map函數解析每個文檔,併發出一系列<單詞,文檔ID>對。 reduce函數接受給定單詞的所有對,對相應的文檔ID進行排序,併發出一個“單詞,列表(文檔ID)”對。setofall輸出對形成一個簡單的倒排索引。易於擴展此計算以跟蹤單詞位置。

Distributed Sort: 分佈式排序,map函數功能從每個記錄中提取鍵,併發出一個<key, record>對。 reduce函數將所有對保持不變。這種計算取決於第4.1節中描述的分區功能和第4.2節中描述的排序屬性。

3 Implementation

MapReduce 可以有很多不同的實現,正確實現是根據你自己所在的環境來進行實現,例如某個實現可能很適合一個共享內存的小機器,某個實現可能是在NUMA多處理器的環境下,也可能是在一個大的網絡連接的集群的機器中。

NUMA Non-uniform memory access

非統一內存訪問架構是一種為多處理器的電腦設計的內存架構,內存訪問時間取決於內存相對於處理器的位置。在NUMA下,處理器訪問它自己的本地內存的速度比非本地內存快一些。 非統一內存訪問架構的特點是:被共享的內存物理上是分佈式的,所有這些內存的集合就是全局地址空間

這節描述的是 Google 雲計算環境下廣泛使用的,下面是 Google 的配置:

  1. 機器,都是典型的Linux系統,運行在基於x86的雙處理器上,每台機器 2-4GB 內存
  2. 網絡,使用商品網絡硬件,在機器級別通常為100Mb/s或1Gb/s,但平均平均對分帶寬要小得多
  3. 集群中有上百或者上千個機器,所以機器故障出現很正常
  4. 存儲,使用廉價的 IDE 硬盤直接保存每個機器自己的數據,開發出的分佈式文件系統來管理這些磁盤上的文件。文件系統使用複製來在不可靠的硬件上提供可用性和可靠性。
  5. 用戶通過一個任務調度系統提交任務。每個工作包含一系列的任務,使用任務調度器來分配到集群中可用的機器上

3.1 Execution Overview

Map 調用分佈在多個機器上,自動將輸入數據分配成 M 組,輸入的分割可以并行的發生在不同的機器上。 Reduce 調用也是分佈式的,通過將中間值的key使用一個分割函數(例如:hash(key) mod R)來將任務分配到不同的機器上。分區數量 R 的取值和分區數量也是通過用戶來指定的。

下面這個圖說明了 MapReduce 執行的完整流程:

  1. 用戶程序中的 MapReduce 庫首先將輸入文件分成 M 份(每份 16MB – 64 MB),然後開始在集群中複製很多拷貝
  2. 程序中有一份拷貝是特殊的(master)。剩下的 worker 來被分配工作,有 M 個map任務和 R 個 reduce 任務來分配給不同的 worker。master 來挑選空閑的 worker 分配給他們每個一個 map task 或者 reduce task
  3. 一個被分配到 map 工作的 worker 將會從對應的分割的內容中讀取。它會解析 k/v pair 到輸入數據並且傳給到用戶定義的 map 函數。Map 函數產生的中間 key/value 數據將會被保存在內存的緩存中
  4. 緩存中的 pair 會被周期性的寫入到本地磁盤上,通並且過分割函數將該文件分成的 R 個區域。這些緩衝對的位置在本地磁盤上被傳遞迴主服務器,該主服務器負責將這些位置轉發給reduce worker。
  5. 當一個 reduce worker 被 master 告知這些存儲的區域,reduce worker 將使用遠程過程調用來從 map workers 的本地磁盤以及緩存中讀取 pair 對。當一個 reduce worker 讀取了所有的中間數據,它將會跟配相同的 key 來進行排序。排序是必需的因為通常會有很多不同的 key 映射到同一個 reduce 任務。如果中間數據太大來放到內存中排序,外排序就會被使用
  6. reduce worker 迭代排過序的中間數據,對於每個獨特的中間值 key,它會傳遞這個 key 和對應中間值到用戶定義的 reduce 函數中來處理。Reduce函數的輸出將附加到此reduce分區的最終輸出文件中
  7. 當所有的 map task 和 reduce task 都被完成之後,master 喚醒用戶程序。這個時候,MapReduce 的調用返回到用戶的代碼邏輯中

在成功的完成之後,mapreduce執行的輸出將會在 R 個輸出文件中(每個reduce task 都會被用戶指定文件的名稱)。通常來說,用戶不會將 R 個輸出文件合併成一個,而是將這個文件作為另一個 MapReduce 的輸入。或者把他們當成另外的分佈式程序的輸入

3.2 Master Data Structures

master 保存一些數據結構。對於每個 map 和 reduce 任務,它會保存狀態(idle,in-progress,completed),以及識別每個 worker machine(非空閑任務)。

master 是處於 map 任務發送到 reduce 任務中間的導管,master 會保存中間文件區域的位置。因此,對於每個完成的 map 任務, master 保存 R 個由map任務產生的中間文件的大小和位置。當 map 任務完成之後,會更新這些文件的位置和大小。這些信息將逐漸被推送到已經在工作的 reduce 任務。

3.3 Fault Tolerance

由於 MapReduce 是用來在大量機器上處理大量數據的一個庫,所以這個庫必需能夠有很好的容錯能力。

Worker Failure (工作節點故障)

master 節點周期性的 ping 每個 worker 節點。如果在一個特定的時間內沒有收到回復,那麼 master 節點就會將這個 worker 標記為失敗。完成 map 任務的 worker 節點將會被重置為 idle 空閑狀態,然後就可以被其他 worker 節點安排。相似的,如果一個節點上的 map 或者 reduce 任務在執行過程中失敗了,那麼這個任務將會被重置然後分配然後重新分配。

如果一個 map 任務的節點在完成任務之後出現故障,那麼就需要重新執行這個任務,因為這個節點變得不可訪問。但是如果是 reduce 任務完成之後節點出現故障,不需要重新執行,這是因為 reduce 任務的輸出被保存到一個全局文件系統中。

當一個 map 任務首先在節點 A 上執行之後在節點 B 上執行時,所有正在執行的 reduce 節點將會被告知這次重新執行過程。所有還沒有從 worker A 讀取數據的 reduce 任務將會 worker B 讀取。

MapReduce 可以處理大規模的 worker 節點故障。例如在一次 MapReduce 任務中,一個由80台計算機組成的集群由於網絡問題無法訪問,MapReduce 的 master 節點只是簡單的讓那些不能正常執行任務的工作節點再次執行任務,然後繼續向前執行任務直到完成 MapReduce 操作。

Master Failure(主節點故障)

讓 master 節點周期性的上述master節點的數據結構的檢查點。如果 master task 失敗了,可以從上一個檢查點的拷貝恢復。但是如果只有一個主節點,那麼出現故障的可能性非常小,因此如果主節點出現故障,我們的當前的實現就中止了此次 MapReduce 任務。客戶端可以檢查到這種情況,然後可以選擇是否重試 MapReduce 操作。

Semantics in the Presence of Failures(語義可能存在的故障)

當用戶指定的 map 和 reduce 操作對於他們的輸入輸出都確定好了之後,分佈式的實現將會產生一個類似於線性執行過程任務執行的結果。

我們依賴於 map 和 reduce 任務的 原子性 commit 作為這個特性的保證。每個執行過程中的任務都會將把他的輸出保存到一個私有的臨時文件中。一個 reduce 任務產生一個這樣的文件,但是一個 map 任務將會產生 R 個這樣的文件。當一個 map 任務完成之後, worker 節點發送包含這 R 個文件名的消息到 master 節點。如果這個節點已經接收到完成的消息,那麼將會忽略這個消息,否則將會把這些文件名保存到 master 節點的數據結構中。

當一個 reduce 任務完成的時候,reduce worker 將會原子性的將它的臨時文件重命名成一個輸出文件。如果有以個 reduce 任務在多個機器上同時完成,那麼這個重命名的操作將會對於一個輸出文件多次執行。我們依賴於基礎文件系統提供的原子重命名操作,以確保最終文件系統狀態僅包含一次執行reduce任務所產生的數據。

map 和 reduce 操作絕大部分都是確定性的,事實上我們的語義將會和線性順序執行的程序的結果一致,這樣很容易分析程序的行為。當 map/reduce 操作是不確定的時候,我們提供弱化但是可信的語義。例如在一個不確定的語義中,一個特定 reduce 任務的輸出和這個任務順序執行的結果一致。然而,用於不同reduce任務R2的輸出可以對應於由不確定性程序的不同順序執行所產生的用於R2的輸出。(這裏保留疑問,沒有太懂什麼意思,指的是有可能是線性結果一致的意思嗎?)

考慮有一個 map 任務 M,和兩個 reduce 任務 R1,R2,e(Ri)是 Ri提交的結果,弱一點的語義指的是,e(R1) 可能讀取的 M 的一個執行的結果而 e(R2) 可能讀取的是 R 執行輸出的另外一個結果。

3.4 Locality

網絡帶寬是在雲計算環境中比較稀缺的資源(盡量少用)。通過將輸入文件(由GFS保管)保存到本地磁盤上來減少網絡帶寬的使用。GFS 將每個文件分成 64 MB的塊,然後將每塊保存幾個副本(通常為3份)在不同的機器上。MapReduce 盡量將這些位置信息保存下來然後盡量將含有某個文件主機的任務分配給它,這樣就可以減少網絡的傳遞使用。如果失敗,那麼將會嘗試從靠近輸入數據的一個副本主機去啟動這個任務。當在一個集群上執行大型的 MapReduce 操作的時候,輸入數據一般都是本地讀取,減少網絡帶寬的使用。

3.5 Task Granularity

我們將一個 map 任務分成 M 塊,然後 reduce 會處理最後輸出成 R 塊。

理想情況下,M 和 R 應該遠遠大於集群中的 worker 節點數量。讓每個節點執行不同的任務將會有利於動態的負載均衡,同時會加速當一個 worker 節點故障之後的恢復,map 任務完成之後可以分配到所有的其他節點上。

R 和 M 在實現過程中會有邊界,因為 schedule 決策需要 O(M + R)的時間,保存這個信息到內存中需要 O(M*R)的複雜度(常數內存很小)。

R 通常是由用戶指定的,這是由於每個 reduce 任務的輸出將會輸出到單獨的文件中。在實際中,通常 M 被選擇到每個單獨的任務輸入數據將會是 16MB ~ 64MB,讓 R 要比我們使用的機器的數目的小几倍。

例如,M =20,000,R = 5,000,worker machines = 2000

3.6 Backup Tasks

讓整個 MapReduce 任務時間延長的原因主要有 “拖延者”:某一個機器在 map/reduce 任務上花費了太多的時間。導致這個 “拖延者” 的原因可能有很多,比如一塊讀寫速度超級慢的硬盤 1MB/s(其他的是 30MB/s)比如集群在這個機器上也分配了其他任務,這些任務競爭使用 CPU、硬盤網絡等等。我們最近遇到的一個問題是機器初始化代碼中的一個錯誤,該錯誤導致禁用了處理器緩存:受影響機器的計算速度降低了一百倍。

4 Refinements

儘管上面說的夠用了,但是我們還是找到一些可以優化的點。

4.1 Partitioning Function

用戶將會指定輸出文件的數量 R。輸入數據將會根據中間值來把這些數據分區。一個默認的分區函數就是 hash函數(hash(key) mod R) 。通常情況下這樣很好,但是在某些情況下不是很好。例如,輸出數據是 URL key,我們希望具有相同主機 hostname 的 URL 在一起,這樣,MapReduce 提供了用戶自己制定 分區函數的方式,例如可以寫為(hash(Hostname(urlkey))),這樣具有相同的 hostname 的URL將會在一個相同的輸出文件中。

4.2 Ordering Guarantees

我們保證了在某個給定的分區中,中間值的 k/v pair 將會按照增序排列,這樣在某些需要有序的場景下是很有用的。

4.3 Combiner Function

在某些情況下,最終的輸出文件reduce是需要根據中間值來合併的。例如在2.1的 word count 中,需要統計每個單詞的數目,我們輸出的是 <word, 1> 這樣的形式。這些pair都需要通過網絡來進行發送到 reduce 工作節點,我們提供了一個 combiner 函數,讓用戶可以在發送數據之前執行的函數,也就是說在本地先合併,然後再發送到網絡中去。

在每個執行 map task 的機器都會執行 conbine 函數。

combine 和 reduce 的唯一區別:

  • combine 輸出是到中間值文件中
  • reduce 輸出到一個最終的輸出文件中

4.4 Input and Output Types

MapReduce 庫提供了幾種輸入的類型。

例如,在 “text” 模式下輸入將每一個行當作 k/v pair,這行的偏移量當作 key,這行的內容當作 value。

用戶可以通過實現 reader 接口在實現自己的輸入類型,儘管大多數用戶都只使用預定義的類型。

reader 不一定要從文件中讀取數據,也可以從數據庫中讀取數據,或者從內存中的某個的某個數據結構中獲取數據。

同樣的,輸出也可以自定義。

4.5 Side-effects

在某些情況下,用戶需要產生一些額外的輸出文件在reduce 的輸出結果中。我們依靠這個應用程序自己的編寫者來使此類副作用成為原子和冪等的。通常應用程序將會寫入一個臨時文件中,然後當它完成的時候將會原子性的重命名這個文件。

我們不提供原子性的兩節點提交由一個任務產生的多個輸出文件。因此,產生多個輸出的結果的任務應該是確定性的。

4.6 Skipping Bad Records

在處理大量數據的時候,由於用戶的 map/reduce 函數的錯誤在處理某些數據的時候產生bug,這個時候可以選擇跳過這些 bug。有的時候我們可以查找到bug所在的地方,但是有的時候我們找不到bug,因為可能是第三方的庫導致的錯誤,我們提供了一種可選的執行模式來跳過這些可能出現錯誤的記錄。

每個 worker 進程都會有一個監聽段錯誤和總線錯誤的處理器。在執行用戶的 map/reduce 函數之前,mapreduce 將會在一個全局變量中保存順序編號。如果用戶代碼產生了一個 signal,那麼就會發送一個 UDP包到 MapReduce 的 master 節點上。當 master 節點發現在某個幾點上出現了很多次故障的時候,之後就會跳過這個記錄。

4.7 Local Execution

在 Map/Reduce 中debug需要一些 trick,因為在分佈式系統中執行可能是在幾千台機器中,工作分配也是動態的。MapReduce 提供了一個另外的本地MapReduce 的本地實現(順序執行),這樣就可以在本地來進行 debug了。

4.8 Status Information

master 節點通過 HTTP 服務器提供一個显示當前狀態的網頁。這個界面显示了多少任務完成了,多少任務還在執行,中間數據有多少字節等,還包含了錯誤的標準輸出文件的鏈接等,用戶可以通過這個界面預估還有多久可以完成任務。當執行很慢的時候,可以通過這個界面來查找原因。

另外,top-level 的狀態信息還會显示哪些 worker 節點有故障,哪些任務失敗了。

4.9 Counters

提供一個全局的計數器來統計某些數據,例如統計大寫單詞的出現次數。

 Counter* uppercase;
  uppercase = GetCounter("uppercase");
  map(String name, String contents):
    for each word w in contents:
			if (IsCapitalized(w)): 
				uppercase->Increment(); 
			EmitIntermediate(w, "1");

計數器的值會從每個節點周期性的發送到master節點,主節點統計計數器的值並且在 狀態頁面显示。

有些值MapReduce 會自己去統計。

計數器功能對於完整性檢查MapReduce操作的行為很有用。例如,在某些MapReduce操作中,用戶代碼可能想要確保所生成的輸出對的數量完全等於所處理的輸入對的數量,或者所處理的德語文檔的比例在該比例之內。

Others

shuffle:將所有具有相同 key 的value 發送個單個的 reduce 進程,在網絡上傳輸數據,是MapReduce代價最大的部分

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

分類
發燒車訊

使用json-server與Mockjs搭建模擬服務

為什麼使用

在項目開發中,常常需要邊寫前端頁面邊寫後端接口,但是後端接口服務往往是滯後於前端開發的,或者是不能及時提供的。出於前端開發的迅速和便捷去考慮,我們可以根據後端接口數據結構去模擬(mock)數據從而實現前端的獨立開發。
JsonServer 主要的作用就是搭建本地的數據接口,創建json文件,便於調試調用
Mockjs 主要的作用就是生成隨機數據,支持生成隨機的文本、数字、布爾值、日期、郵箱、鏈接、圖片、顏色等

如何使用

以下操作需要node.js環境

1.創建文件夾

選擇自己喜歡的位置創建一個文件夾,比如E:/mock

2.安裝json-server

進入E:/mock

npm install json-server -g 

安裝完執行 json-server -h,若安裝成功則會显示選項

Options:
–config, -c Path to config file [default: “json-server.json”]
–port, -p Set port [default: 3000]
–host, -H Set host [default: “localhost”]
–watch, -w Watch file(s) [boolean]
–routes, -r Path to routes file
–middlewares, -m Paths to middleware files [array]
–static, -s Set static files directory
–read-only, –ro Allow only GET requests [boolean]
–no-cors, –nc Disable Cross-Origin Resource Sharing [boolean]
–no-gzip, –ng Disable GZIP Content-Encoding [boolean]
–snapshots, -S Set snapshots directory [default: “.”]
–delay, -d Add delay to responses (ms)
–id, -i Set database id property (e.g. _id) [default: “id”]
–foreignKeySuffix, –fks Set foreign key suffix (e.g. _id as in post_id)
[default: “Id”]
–quiet, -q Suppress log messages from output [boolean]
–help, -h Show help [boolean]
–version, -v Show version number [boolean]

3.使用json-server
  • 創建json文件,如db.json,文件內容如下
{ "posts": [ { "id": 1, "title": "json-server", "author": "typicode" } ], "comments": [ { "id": 1, "body": "some comment", "postId": 1 } ], "profile": { "name": "typicode" } } 
  • 啟動json-server,cmd執行
json-server --watch db.json 

看到如下內容

 

1574216715(1).jpg


則運行成功,直接訪問圖中显示的三個地址可得結果,如


 

1574216903(1).jpg


至此,簡單的json-server服務已經搭建成功了,後續如果有更多需求,如跨域、參數查詢、路由這些,請參考json-server的github

 

4.安裝Mockjs

進入E:/mock

npm install mockjs --save 
5.使用Mockjs
  • 創建js文件,如news.js,文件內容如下
let Mock=require('mockjs'); let Random=Mock.Random; module.exports=()=>{ let data={ news:[] }; let images=[1,2,3].map(x=>Random.image('120x60',Random.color(),Random.word(2,6))); for(let i=1;i<=100;i++){ let content=Random.cparagraph(0,10); data.news.push({ id:i, title:Random.cword(8,20), desc:content.substr(0,40), tag:Random.cword(2,6), views:Random.integer(100,5000), images:images.slice(0,Random.integer(1,3)) }) } return data } 
  • 啟動json-server,cmd執行
json-server --watch news.js 

訪問結果(部分)


 
 

完成了以上的搭建與驗證過程后,你就可以開始使用json-server與Mockjs來繼續構建模擬服務器了,來滿足自己的各種需要

Tips:

1、也可以使用json-server db.json ,使用“json-server –watch db.json”命令可以實時監測db.json的變化;如果沒有 — watch 命令,即使db.json已經發生了改變,重新發請求,仍然會返回原先的mock data,返回狀態碼304,認為沒有變化。

2、同時,我們可以發送 POST、PUT、PATCH和DELETE請求,相應的結果會通過lowdb自動保存到db.json。關於POST、PUT等相關請求的發送。

3、我們的request body應該是一個json對象,比如{“name”:”Lynn”};

4、POST、PUT、PATCH請求頭中要包含Content-Type: application/json;

5、id的值是自動生成且不易變的。PUT請求和PATCH請求中自帶的id會被忽略。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

分類
發燒車訊

springboot_自動配置原理

目錄

  • 1.1 @SpringBootApplication
  • 2.1 @EnableAutoConfiguration
    • 2.1.1 @AutoConfigurationPackage
    • 2.1.2 @Import({Registrar.class})
  • 3.1 以HttpEncodingAutoConfiguration為例

springboot啥都不難,總所周知spring全家桶系列難就難在理解源碼。。。。。。。

今天結合網上資料,自己總結了一下springboot的自動配置原理。

我現在使用的springboot版本為2.3.1.不同版本的springboot在源碼上有差別!但大體一致。

管他三七二十一先打個斷點再說:

1.1 @SpringBootApplication

這個註解點進去我們可以看到:

這裏面主要關注兩個東西:

  • @SpringBootConfiguration
  • @EnableAutoConfiguration
    第一個註解點進去:

    可以看到這個@SpringBootConfiguration本質就是一個@Configuration,標註在某個類上,表示這是一個Spring Boot的配置類。
    第二個註解@EnableAutoConfiguration: 開啟自動配置類,SpringBoot的精華所在。(最重要的就是這個註解)

2.1 @EnableAutoConfiguration

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({AutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
    String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

    Class<?>[] exclude() default {};

    String[] excludeName() default {};
}

兩個比較重要的註解:

  • @AutoConfigurationPackage:自動配置包。
  • @Import({AutoConfigurationImportSelector.class}):導入自動配置的組件。

2.1.1 @AutoConfigurationPackage

點進去瞅瞅:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({Registrar.class})
public @interface AutoConfigurationPackage {
    String[] basePackages() default {};

    Class<?>[] basePackageClasses() default {};
}

發現這裡有導入Regitstrar類:

static class Registrar implements ImportBeanDefinitionRegistrar, DeterminableImports {
        Registrar() {
        }

        public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
            AutoConfigurationPackages.register(registry, (String[])(new AutoConfigurationPackages.PackageImports(metadata)).getPackageNames().toArray(new String[0]));
        }

        public Set<Object> determineImports(AnnotationMetadata metadata) {
            return Collections.singleton(new AutoConfigurationPackages.PackageImports(metadata));
        }
    }

new PackageImport(metadata).getPackageName(),它其實返回了當前主程序類的 **同級以及子級 ** 的包組件。

什麼意思呢?

我們來看這樣一個目錄:

bean1和我們的springboot啟動類位於同一個包下,二bean2不是位於我們啟動類的同級目錄或者子級目錄,那麼我們啟動的時候bean2是不會被加載到的!所以你項目的一切需要加入容器的類必須放在啟動類的同級包下或者它的子級目錄中。

2.1.2 @Import({Registrar.class})

AutoConfigurationImportSelector有一個方法為:selectImports。

public String[] selectImports(AnnotationMetadata annotationMetadata) {
        if (!this.isEnabled(annotationMetadata)) {
            return NO_IMPORTS;
        } else {
            AutoConfigurationImportSelector.AutoConfigurationEntry autoConfigurationEntry = this.getAutoConfigurationEntry(annotationMetadata);
            return StringUtils.toStringArray(autoConfigurationEntry.getConfigurations());
        }
    }

它首先回去檢查是否開啟了自動配置類,然後才回去加載註解數據 this.getAutoConfigurationEntry(annotationMetadata);
那麼這個annotationMetadata在哪兒?
來看下面一行代碼:

  protected List<String> getCandidateConfigurations(AnnotationMetadata metadata, AnnotationAttributes attributes) {
        List<String> configurations = SpringFactoriesLoader.loadFactoryNames(this.getSpringFactoriesLoaderFactoryClass(), this.getBeanClassLoader());
        Assert.notEmpty(configurations, "No auto configuration classes found in META-INF/spring.factories. If you are using a custom packaging, make sure that file is correct.");
        return configurations;
    }

SpringFactoriesLoader.loadFactoryNames(this.getSpringFactoriesLoaderFactoryClass(), this.getBeanClassLoader());
再點進去我們會發現這一行代碼:
Enumeration urls = classLoader != null ? classLoader.getResources(“META-INF/spring.factories”) : ClassLoader.getSystemResources(“META-INF/spring.factories”);

它其實是去加載 public static final String FACTORIES_RESOURCE_LOCATION = “META-INF/spring.factories”;外部文件。這個外部文件,有很多自動配置的類。如下:

spring.factories文件由一組一組的key=value的形式,其中一個key是EnableAutoConfiguration類的全類名,而它的value是一個xxxxAutoConfiguration的類名的列表,這些類名以逗號分隔。
springboot項目啟動時,@SpringBootApplication用在啟動類在SpringApplication.run(…)的內部就會執行selectImports()方法,找到所有JavaConfig自動配置類的全限定名對應的class,然後將所有自動配置類加載到Spring容器中。

3.1 以HttpEncodingAutoConfiguration為例

@Configuration(
    proxyBeanMethods = false
)    //表示是一個配置類,可以給容器中添加組件
@EnableConfigurationProperties({ServerProperties.class})// 啟用ConfigurationProperties功能
@ConditionalOnWebApplication(
    type = Type.SERVLET
)
@ConditionalOnClass({CharacterEncodingFilter.class})
@ConditionalOnProperty(
    prefix = "server.servlet.encoding",
    value = {"enabled"},
    matchIfMissing = true
)

@EnableConfigurationProperties({ServerProperties.class})// 啟用ConfigurationProperties功能
ServerProperties.class:

@ConfigurationProperties(
    prefix = "server",
    ignoreUnknownFields = true
)

@ConditionalOnWebApplication :spring底層@Conditional註解,根據不同的條件進行判斷,如果滿足條件整個配置類才會生效。

總結:
1.springboot會自動加載大量的自動配置類。
2.只要我們要用的組件有,我們就不需要再去配置
3.給容器添加組件的時候。會從properties類中獲取某些屬性。我們就可以在配置文件中指定這些屬性。
xxxxxAutoConfiguration:自動配置類

給容器中添加屬性:
xxxxProperties:封裝配置文件中的相關屬性。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?