分類
發燒車訊

同步鎖基本原理與實現

  為充分利用機器性能,人們發明了多線程。但同時帶來了線程安全問題,於是人們又發明了同步鎖。

  這個問題自然人人知道,但你真的了解同步鎖嗎?還是說你會用其中的上鎖與解鎖功能?

  今天我們就一起來深入看同步鎖的原理和實現吧!

 

一、同步鎖的職責

  同步鎖的職責可以說就一個,限制資源的使用(線程安全從屬)。

  它一般至少會包含兩個功能: 1. 給資源加鎖; 2. 給資源解鎖;另外,它一般還有 等待/通知 即 wait/notify 的功能;

  同步鎖的應用場景:多個線程同時操作一個事務必須保證正確性;一個資源只能同時由一線程訪問操作;一個資源最多只能接入k的併發訪問;保證訪問的順序性;

  同步鎖的實現方式:操作系統調度實現;應用自行實現;CAS自旋;

  同步鎖的幾個問題:

    為什麼它能保證線程安全?

    鎖等待耗CPU嗎?

    使用鎖后性能下降嚴重的原因是啥?

 

二、同步鎖的實現一:lock/unlock

  其實對於應用層來說,非常多就是 lock/unlock , 這也是鎖的核心。

  AQS 是java中很多鎖實現的基礎,因為它屏蔽了很多繁雜而底層的阻塞操作,為上層抽象出易用的接口。

  我們就以AQS作為跳板,先來看一下上鎖的過程。為不至於陷入具體鎖的業務邏輯中,我們先以最簡單的 CountDownLatch 看看。

    // 先看看 CountDownLatch 的基礎數據結構,可以說是不能再簡單了,就繼承了 AQS,然後簡單覆寫了幾個必要方法。
    // java.util.concurrent.CountDownLatch.Sync
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            // 只有一種情況會獲取鎖成功,即 state == 0 的時候
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                // 原始的鎖數量是在初始化時指定的不可變的,每次釋放一個鎖標識
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    // 只有一情況會釋放鎖成功,即本次釋放后 state == 0
                    return nextc == 0;
            }
        }
    }
    private final Sync sync;

 

重點1,我們看看上鎖過程,即 await() 的調用。

    public void await() throws InterruptedException {
        // 調用 AQS 的接口,由AQS實現了鎖的骨架邏輯
        sync.acquireSharedInterruptibly(1);
    }
    
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 首先嘗試獲取鎖,如果成功就不用阻塞了
        // 而從上面的邏輯我們看到,獲取鎖相當之簡單,所以,獲取鎖本身並沒有太多的性能消耗喲
        // 如果獲取鎖失敗,則會進行稍後嘗試,這應該是複雜而精巧的
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 首先將當前線程添加排隊隊尾,此處會保證線程安全,稍後我們可以看到
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 獲取其上一節點,如果上一節點是頭節點,就代表當前線程可以再次嘗試獲取鎖了
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 先檢測是否需要阻塞,然後再進行阻塞等待,阻塞由 LockSupport 底層支持
                // 如果阻塞后,將不會主動喚醒,只會由 unlock 時,主動被通知
                // 因此,此處即是獲取鎖的最終等待點
                // 操作系統將不會再次調度到本線程,直到獲取到鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 如此線程安全地添加當前線程到隊尾? CAS 保證
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    // 檢測是否需要進行阻塞
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             // 只有前置節點是 SIGNAL 狀態的節點,才需要進行 阻塞等待,當然前置節點會在下一次循環中被設置好
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    // park 阻塞實現
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // 將當前 AQS 實例作為鎖對象 blocker, 進行操作系統調用阻塞, 所以所有等待鎖的線程將會在同一個鎖前提下執行
        LockSupport.park(this);
        return Thread.interrupted();
    }

  如上,上鎖過程是比較簡單明了的。加入一隊列,然後由操作系統將線程調出。(那麼操作系統是如何把線程調出的呢?有興趣自行研究)

 

重點2. 解鎖過程,即 countDown() 調用

    public void countDown() {
        // 同樣直接調用 AQS 的接口,由AQS實現了鎖的釋放骨架邏輯
        sync.releaseShared(1);
    }
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        // 調用業務實現的釋放邏輯,如果成功,再執行底層的釋放,如隊列移除,線程通知等等
        // 在 CountDownLatch 的實現中,只有 state == 0 時才會成功,所以它只會執行一次底層釋放
        // 這也是我們認為 CountDownLatch 能夠做到多線程同時執行的效果的原因之一
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            // 隊列不為空才進行釋放
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 看過上面的 lock 邏輯,我們知道只要在阻塞狀態,一定是 Node.SIGNAL 
                if (ws == Node.SIGNAL) {
                    // 狀態改變成功,才進行後續的喚醒邏輯
                    // 因為先改變狀態成功,才算是線程安全的,再進行喚醒,否則進入下一次循環再檢查
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 將頭節點的下一節點喚醒,如有必要
                    unparkSuccessor(h);
                }
                // 這裏的 propagates, 是要傳播啥呢??
                // 為什麼只喚醒了一個線程,其他線程也可以動了?
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 喚醒下一個節點
        // 但如果下一節點已經取消等待了,那麼就找下一個沒最近的沒被取消的線程進行喚醒
        // 喚醒只是針對一個線程的喲
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 

重要3. 線程解鎖的傳播性?

  因為從上一節的講解中,我們看到,當用戶調用 countDown 時,僅僅是讓操作系統喚醒了 head 的下一個節點線程或者最近未取消的節點。那麼,從哪裡來的所有線程都獲取了鎖從而運行呢?

  其實是在 獲取鎖的過程中,還有一點我們未看清:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 當countDown被調用后,head節點被喚醒,執行
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 獲取到鎖后,設置node為下一個頭節點,並把喚醒狀態傳播下去,而這裏面肯定會做一些喚醒其他線程的操作,請看下文
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 如果有必要,則做一次喚醒下一線程的操作
            // 在 countDown() 不會觸發此操作,所以這裏只是一個內部調用傳播
            Node s = node.next;
            if (s == null || s.isShared())
                // 此處鎖釋放邏輯如上,總之,又是另一次的喚醒觸發
                doReleaseShared();
        }
    }

  到此,我們明白了它是怎麼做到一個鎖釋放,所有線程可通行的。也從根本上回答了我們猜想,所有線程同時併發運行。然而並沒有,它只是通過喚醒傳播性來依次喚醒各個等待線程的。從絕對時間性上來講,都是有先後關係的。以後可別再淺顯說是同時執行了喲。

 

三、 鎖的切換:wait/notify

  上面看出,針對一個lock/unlock 的過程還是很簡單的,由操作系統負責大頭,實現代碼也並不多。

  但是針對稍微有點要求的場景,就會進行條件式的操作。比如:持有某個鎖運行一段代碼,但是,運行時發現某條件不滿足,需要進行等待而不能直接結束,直到條件成立。即所謂的 wait 操作。

  乍一看,wait/notify 與 lock/unlock 很像,其實不然。區分主要是 lock/unlock 是針對整個代碼段的,而 wait/notify 則是針對某個條件的,即獲取了鎖不代表條件成立了,但是條件成立了一定要在鎖的前提下才能進行安全操作。

  那麼,是否 wait/notify 也一樣的實現簡單呢?比如java的最基礎類 Object 類就提供了 wait/notify 功能。

  我們既然想一探究竟,還是以併發包下的實現作為基礎吧,畢竟 java 才是我們的強項。

  本次,咱們以  ArrayBlockingQueue#put/take 作為基礎看下這種場景的使用先。

  ArrayBlockingQueue 的put/take 特性就是,put當隊列滿時,一直阻塞,直到有可用位置才繼續運行下一步。而take當隊列為空時一樣阻塞,直到隊列里有數據才運行下一步。這種場景使用鎖主不好搞了,因為這是一個條件判斷。put/take 如下:

    // java.util.concurrent.ArrayBlockingQueue#put
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 當隊列滿時,一直等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    // java.util.concurrent.ArrayBlockingQueue#take
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 當隊列為空時一直等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  看起來相當簡單,完全符合人類思維。只是,這裏使用的兩個變量進行控制流程 notFull,notEmpty. 這兩個變量是如何進行關聯的呢?

  在這之前,我們還需要補充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,何時才能運行呢?如上代碼在各自的入隊和出隊完成之後進行通知就可以了。

    // 與 put 對應,入隊完成后,隊列自然就不為空了,通知下 notEmpty 就好了
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 我已放入一個元素,不為空了
        notEmpty.signal();
    }
    // 與 take 對應,出隊完成后,自然就不可能是滿的了,至少一個空餘空間。
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 我已移除一個元素,肯定沒有滿了,你們繼續放入吧
        notFull.signal();
        return x;
    }

  是不是超級好理解。是的。不過,我們不是想看 ArrayBlockingQueue 是如何實現的,我們是要論清 wait/notify 是如何實現的。因為畢竟,他們不是一個鎖那麼簡單。

    // 三個鎖的關係,即 notEmpty, notFull 都是 ReentrantLock 的條件鎖,相當於是其子集吧
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    // lock.newCondition() 是什麼鬼?它是 AQS 中實現的 ConditionObject
    // java.util.concurrent.locks.ReentrantLock#newCondition
    public Condition newCondition() {
        return sync.newCondition();
    }
        // java.util.concurrent.locks.ReentrantLock.Sync#newCondition
        final ConditionObject newCondition() {
            // AQS 中定義
            return new ConditionObject();
        }

  接下來,我們要帶着幾個疑問來看這個 Condition 的對象:

    1. 它的 wait/notify 是如何實現的?
    2. 它是如何與互相進行聯繫的?
    3. 為什麼 wait/notify 必須要在外面的lock獲取之後才能執行?
    4. 它與Object的wait/notify 有什麼相同和不同點?

  能夠回答了上面的問題,基本上對其原理與實現也就理解得差不多了。

 

重點1. wait/notify 是如何實現的?

  我們從上面可以看到,它是通過調用 await()/signal() 實現的,到底做事如何,且看下面。

        // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加當前線程到 等待線程隊列中,有 lastWaiter/firstWaiter 維護
            Node node = addConditionWaiter();
            // 釋放當前lock中持有的鎖,詳情且看下文
            int savedState = fullyRelease(node);
            // 從以下開始,將不再保證線程安全性,因為當前的鎖已經釋放,其他線程將會重新競爭鎖使用
            int interruptMode = 0;
            // 循環判定,如果當前節點不在 sync 同步隊列中,那麼就反覆阻塞自己
            // 所以判斷是否在 同步隊列上,是很重要的
            while (!isOnSyncQueue(node)) {
                // 沒有在同步隊列,阻塞
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 當條件被滿足后,需要重新競爭鎖,詳情看下文
            // 競爭到鎖后,原樣返回到 wait 的原點,繼續執行業務邏輯
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 下面是異常處理,忽略
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 預期的,都是釋放鎖成功,如果失敗,說明當前線程並並未獲取到鎖,引發異常
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        // tryRelease 由客戶端自定義實現
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 如何判定當前線程是否在同步隊列中或者可以進行同步隊列?
    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        // 如果上一節點還沒有被移除,當前節點就不能被加入到同步隊列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果當前節點的下游節點已經存在,則它自身必定已經被移到同步隊列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
         // 最終直接從同步隊列中查找,如果找到,則自身已經在同步隊列中
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    
    // 當條件被滿足后,需要重新競爭鎖,以保證外部的鎖語義,因為之前自己已經將鎖主動釋放
    // 這個鎖與 lock/unlock 時的一毛一樣,沒啥可講的
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  總結一下 wait 的邏輯:

    1. 前提:自身已獲取到外部鎖;
    2. 將當前線程添加到 ConditionQueue 等待隊列中;
    3. 釋放已獲取到的鎖;
    4. 反覆檢查進入等待,直到當前節點被移動到同步隊列中;
    5. 條件滿足被喚醒,重新競爭外部鎖,成功則返回,否則繼續阻塞;(外部鎖是同一個,這也是要求兩個對象必須存在依賴關係的原因)
    6. wait前線程持有鎖,wait后線程持有鎖,沒有一點外部鎖變化;

 

重點2. 釐清了 wait, 接下來,我們看 signal() 通知喚醒的實現:

        // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            // 只有獲取鎖的實例,才可以進行signal,否則你拿什麼去保證線程安全呢
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            // 通知 firstWaiter 
            if (first != null)
                doSignal(first);
        }
        
        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            // 最多只轉移一個 節點
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    // 將一個節點從 等待隊列 移動到 同步隊列中,即可參与下一輪競爭
    // 只有確實移動成功才會返回 true
    // 說明:當前線程是持有鎖的線程
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // 同步隊列由 head/tail 指針維護
        Node p = enq(node);
        int ws = p.waitStatus;
        // 注意,此處正常情況下並不會喚醒等待線程,僅是將隊列轉移。 
        // 因為當前線程的鎖保護區域並未完成,完成后自然會喚醒其他等待線程
        // 否則將會存在當前線程任務還未執行完成,卻被其他線程搶了先去,那接下來的任務當如何??
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

  總結一下,notify 的功能原理如下:

    1. 前提:自身已獲取到外部鎖;
    2. 轉移下一個等待隊列的節點到同步隊列中;
    3. 如果遇到下一節點被取消情況,順延到再下一節點直到為空,至多轉移一個節點;
    4. 正常情況下不做線程的喚醒操作;

  所以,實現 wait/notify, 最關鍵的就是維護兩個隊列,等待隊列與同步隊列,而且都要求是在有外部鎖保證的情況下執行。

  到此,我們也能回答一個問題:為什麼wait/notify一定要在鎖模式下才能運行?

  因為wait是等待條件成立,此時必定存在競爭需要做保護,而它自身又必須釋放鎖以使外部條件可成立,且後續需要做恢復動作;而notify之後可能還有後續工作必須保障安全,notify只是鎖的一個子集。。。

 

四、通知所有線程的實現:notifyAll

  有時條件成立后,可以允許所有線程通行,這時就可以進行 notifyAll, 那麼如果達到通知所有的目的呢?是一起通知還是??

  以下是 AQS 中的實現:

        // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

  可以看到,它是通過遍歷所有節點,依次轉移等待隊列到同步隊列(通知)的,原本就沒有人能同時干幾件事的!

  本文從java實現的角度去解析同步鎖的原理與實現,但並不局限於java。道理總是相通的,只是像操作系統這樣的大佬,能幹的活更純粹:比如讓cpu根本不用調度一個線程。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

分類
發燒車訊

Kafka冪等性原理及實現剖析

1.概述

最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現。

2.內容

2.1 Kafka為啥需要冪等性?

Producer在生產發送消息時,難免會重複發送消息。Producer進行retry時會產生重試機制,發生消息重複發送。而引入冪等性后,重複發送只會生成一條有效的消息。Kafka作為分佈式消息系統,它的使用場景常見與分佈式系統中,比如消息推送系統、業務平台系統(如物流平台、銀行結算平台等)。以銀行結算平台來說,業務方作為上游把數據上報到銀行結算平台,如果一份數據被計算、處理多次,那麼產生的影響會很嚴重。

2.2 影響Kafka冪等性的因素有哪些?

在使用Kafka時,需要確保Exactly-Once語義。分佈式系統中,一些不可控因素有很多,比如網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重複發送。可能出現的情況如下:

 

 

2.3 Kafka的冪等性是如何實現的?

Kafka為了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?

  • ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
  • SequenceNumber:對於每個ProducerID,Producer發送數據的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。

2.3.1 冪等性引入之前的問題?

Kafka在引入冪等性之前,Producer向Broker發送消息,然後Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:

 

上圖的實現流程是一種理想狀態下的消息發送情況,但是實際情況中,會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:

 

上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然後成功返回Ack信號給Producer。這樣下來,消息流中就被重複追加了兩條相同的(x2,y2)的消息。

2.3.2 冪等性引入之後解決了什麼問題?

面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複發送消息的問題的呢?下面我們可以先來看看流程圖:

 

 同樣,這是一種理想狀態下的發送流程。實際情況下,會有很多不確定的因素,比如Broker在發送Ack信號給Producer時出現網絡異常,導致發送失敗。異常情況如下圖所示:

 

 當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對於Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由於引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那麼在消息流中的消息就只有一條(x2,y2),不會出現重複發送的情況。

2.3.3 ProducerID是如何生成的?

客戶端在生成Producer時,會實例化如下代碼:

// 實例化一個Producer對象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼如下:

 private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

3.事務

與冪等性有關的另外一個特性就是事務。Kafka中的事務與數據庫的事務類似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操作在一個事務中,即原子性操作。對應的結果是同時成功或者同時失敗。

這裏需要與數據庫中事務進行區別,操作數據庫中的事務指一系列的增刪查改,對Kafka來說,操作事務是指一系列的生產和消費等原子性操作。

3.1 Kafka引入事務的用途?

在事務屬性引入之前,先引入Producer的冪等性,它的作用為:

  • Producer多次發送消息可以封裝成一個原子性操作,即同時成功,或者同時失敗;
  • 消費者&生產者模式下,因為Consumer在Commit Offsets出現問題時,導致重複消費消息時,Producer重複生產消息。需要將這個模式下Consumer的Commit Offsets操作和Producer一系列生產消息的操作封裝成一個原子性操作。

產生的場景有:

比如,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets為100(假設最近一次Commit的Offsets為50),那麼執行觸發Balance時,其他Consumer就會重複消費消息(消費的Offsets介於50~100之間的消息)。

3.2 事務提供了哪些可使用的API?

Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口如下:

// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();

// 開啟事務
void beginTransaction() throws ProducerFencedException;

// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事務
void commitTransaction() throws ProducerFencedException;

// 放棄事務,類似於回滾事務的操作
void abortTransaction() throws ProducerFencedException;

3.3 事務的實際應用場景有哪些?

在Kafka事務中,一個原子性操作,根據操作類型可以分為3種情況。情況如下:

  • 只有Producer生產消息,這種場景需要事務的介入;
  • 消費消息和生產消息並存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
  • 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結果一樣,而且這種場景不是事務的引入目的。

4.總結

Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重複的問題上非常重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性類似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。

5.結束語

這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《》和《》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

分類
發燒車訊

【原創】(十一)Linux內存管理slub分配器

背景

  • Read the fucking source code! –By 魯迅
  • A picture is worth a thousand words. –By 高爾基

說明:

  1. Kernel版本:4.14
  2. ARM64處理器,Contex-A53,雙核
  3. 使用工具:Source Insight 3.5, Visio

1. 概述

之前的文章分析的都是基於頁面的內存分配,而小塊內存的分配和管理是通過塊分配器來實現的。目前內核中,有三種方式來實現小塊內存分配:slab, slub, slob,最先有slab分配器,slub/slob分配器是改進版,slob分配器適用於小內存嵌入式設備,而slub分配器目前已逐漸成為主流塊分配器。接下來的文章,就是以slub分配器為目標,進一步深入。

先來一個初印象:

2. 數據結構

有四個關鍵的數據結構:

  • struct kmem_cache:用於管理SLAB緩存,包括該緩存中對象的信息描述,per-CPU/Node管理slab頁面等;
    關鍵字段如下:
/*
 * Slab cache management.
 */
struct kmem_cache {
    struct kmem_cache_cpu __percpu *cpu_slab;       //每個CPU slab頁面
    /* Used for retriving partial slabs etc */
    unsigned long flags;
    unsigned long min_partial;
    int size;       /* The size of an object including meta data */
    int object_size;    /* The size of an object without meta data */
    int offset;     /* Free pointer offset. */
#ifdef CONFIG_SLUB_CPU_PARTIAL
    /* Number of per cpu partial objects to keep around */
    unsigned int cpu_partial;
#endif
    struct kmem_cache_order_objects oo;     //該結構體會描述申請頁面的order值,以及object的個數

    /* Allocation and freeing of slabs */
    struct kmem_cache_order_objects max;
    struct kmem_cache_order_objects min;
    gfp_t allocflags;   /* gfp flags to use on each alloc */
    int refcount;       /* Refcount for slab cache destroy */
    void (*ctor)(void *);           // 對象構造函數
    int inuse;      /* Offset to metadata */
    int align;      /* Alignment */
    int reserved;       /* Reserved bytes at the end of slabs */
    int red_left_pad;   /* Left redzone padding size */
    const char *name;   /* Name (only for display!) */
    struct list_head list;  /* List of slab caches */       //kmem_cache最終會鏈接在一個全局鏈表中
    struct kmem_cache_node *node[MAX_NUMNODES];     //Node管理slab頁面
};
  • struct kmem_cache_cpu:用於管理每個CPU的slab頁面,可以使用無鎖訪問,提高緩存對象分配速度;
struct kmem_cache_cpu {
    void **freelist;    /* Pointer to next available object */                  //指向空閑對象的指針
    unsigned long tid;  /* Globally unique transaction id */                
    struct page *page;  /* The slab from which we are allocating */     //slab緩存頁面
#ifdef CONFIG_SLUB_CPU_PARTIAL
    struct page *partial;   /* Partially allocated frozen slabs */
#endif
#ifdef CONFIG_SLUB_STATS
    unsigned stat[NR_SLUB_STAT_ITEMS];
#endif
};
  • struct kmem_cache_node:用於管理每個Node的slab頁面,由於每個Node的訪問速度不一致,slab頁面由Node來管理;
/*
 * The slab lists for all objects.
 */
struct kmem_cache_node {
    spinlock_t list_lock;

#ifdef CONFIG_SLUB
    unsigned long nr_partial;    //slab頁表數量
    struct list_head partial;       //slab頁面鏈表
#ifdef CONFIG_SLUB_DEBUG
    atomic_long_t nr_slabs;
    atomic_long_t total_objects;
    struct list_head full;
#endif
#endif
};
  • struct page:用於描述slab頁面struct page結構體中很多字段都是通過union聯合體進行復用的。
    struct page結構中,用於slub的成員如下:
struct page {
    union {
       ...
        void *s_mem;            /* slab first object */
       ...
    };
    
    /* Second double word */
    union {
       ...
        void *freelist;     /* sl[aou]b first free object */
       ...
    };
    
    union {
       ...
        struct {
            union {
              ...
                struct {            /* SLUB */
                    unsigned inuse:16;
                    unsigned objects:15;
                    unsigned frozen:1;
                };
                ...
            };
       ...
        };       
    };   
    
    /*
     * Third double word block
     */
    union {
       ...
        struct {        /* slub per cpu partial pages */
            struct page *next;  /* Next partial slab */
#ifdef CONFIG_64BIT
            int pages;  /* Nr of partial slabs left */
            int pobjects;   /* Approximate # of objects */
#else
            short int pages;
            short int pobjects;
#endif
        };

        struct rcu_head rcu_head;   /* Used by SLAB
                         * when destroying via RCU
                         */
    };
    ...
        struct kmem_cache *slab_cache;  /* SL[AU]B: Pointer to slab */    
    ...
}

圖來了:

3. 流程分析

針對Slub的使用,可以從三個維度來分析:

  1. slub緩存創建
  2. slub對象分配
  3. slub對象釋放

下邊將進一步分析。

3.1 kmem_cache_create

在內核中通過kmem_cache_create接口來創建一個slab緩存

先看一下這個接口的函數調用關係圖:

  1. kmem_cache_create完成的功能比較簡單,就是創建一個用於管理slab緩存kmem_cache結構,並對該結構體進行初始化,最終添加到全局鏈表中。kmem_cache結構體初始化,包括了上文中分析到的kmem_cache_cpukmem_cache_node兩個字段結構。

  2. 在創建的過程中,當發現已有的slab緩存中,有存在對象大小相近,且具有兼容標誌的slab緩存,那就只需要進行merge操作並返回,而無需進一步創建新的slab緩存

  3. calculate_sizes函數會根據指定的force_order或根據對象大小去計算kmem_cache結構體中的size/min/oo等值,其中kmem_cache_order_objects結構體,是由頁面分配order值和對象數量兩者通過位域拼接起來的。

  4. 在創建slab緩存的時候,有一個先雞后蛋的問題:kmem_cache結構體來管理一個slab緩存,而創建kmem_cache結構體又是從slab緩存中分配出來的對象,那麼這個問題是怎麼解決的呢?可以看一下kmem_cache_init函數,內核中定義了兩個靜態的全局變量kmem_cachekmem_cache_node,在kmem_cache_init函數中完成了這兩個結構體的初始化之後,相當於就是創建了兩個slab緩存,一個用於分配kmem_cache結構體對象的緩存池,一個用於分配kmem_cache_node結構體對象的緩存池。由於kmem_cache_cpu結構體是通過__alloc_percpu來分配的,因此不需要創建一個相關的slab緩存

3.2 kmem_cache_alloc

kmem_cache_alloc接口用於從slab緩存池中分配對象。

看一下大體的調用流程圖:

從上圖中可以看出,分配slab對象與Buddy System中分配頁面類似,存在快速路徑和慢速路徑兩種,所謂的快速路徑就是per-CPU緩存,可以無鎖訪問,因而效率更高。

整體的分配流程大體是這樣的:優先從per-CPU緩存中進行分配,如果per-CPU緩存中已經全部分配完畢,則從Node管理的slab頁面中遷移slab頁per-CPU緩存中,再重新分配。當Node管理的slab頁面也不足的情況下,則從Buddy System中分配新的頁面,添加到per-CPU緩存中。

還是用圖來說明更清晰,分為以下幾步來分配:

  1. fastpath
    快速路徑下,以原子的方式檢索per-CPU緩存的freelist列表中的第一個對象,如果freelist為空並且沒有要檢索的對象,則跳入慢速路徑操作,最後再返回到快速路徑中重試操作。

  2. slowpath-1
    將per-CPU緩存中page指向的slab頁中的空閑對象遷移到freelist中,如果有空閑對象,則freeze該頁面,沒有空閑對象則跳轉到slowpath-2

  3. slowpath-2
    將per-CPU緩存中partial鏈表中的第一個slab頁遷移到page指針中,如果partial鏈表為空,則跳轉到slowpath-3

  4. slowpath-3
    將Node管理的partial鏈表中的slab頁遷移到per-CPU緩存中的page中,並重複第二個slab頁將其添加到per-CPU緩存中的partial鏈表中。如果遷移的slab中空閑對象超過了kmem_cache.cpu_partial的一半,則僅遷移slab頁,並且不再重複。
    如果每個Node的partial鏈表都為空,跳轉到slowpath-4

  5. slowpath-4
    Buddy System中獲取頁面,並將其添加到per-CPU的page中。

3.2 kmem_cache_free

kmem_cache_free的操作,可以看成是kmem_cache_alloc的逆過程,因此也分為快速路徑和慢速路徑兩種方式,同時,慢速路徑中又分為了好幾種情況,可以參考kmem_cache_alloc的過程。

調用流程圖如下:

效果如下:

  1. 快速路徑釋放
    快速路徑下,直接將對象返回到freelist中即可。

  2. put_cpu_partial
    put_cpu_partial函數主要是將一個剛freeze的slab頁,放入到partial鏈表中。
    put_cpu_partial函數中調用unfreeze_partials函數,這時候會將per-CPU管理的partial鏈表中的slab頁面添加到Node管理的partial鏈表的尾部。如果超出了Node的partial鏈表,溢出的slab頁面中沒有分配對象的slab頁面將會返回到夥伴系統。

  3. add_partial
    添加slab頁到Node的partial鏈表中。

  4. remove_partial
    從Node的partial鏈表移除slab頁。

具體釋放的流程走哪個分支,跟對象的使用情況,partial鏈表的個數nr_partial/min_partial等相關,細節就不再深入分析了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

分類
發燒車訊

實現 Redis 協議解析器

本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹 以及 的實現,若您對協議有所了解可以直接閱讀協議解析器部分。

Redis 通信協議

Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機可以高效的進行解析且易於被人類讀懂。

RESP 是一個二進制安全的文本協議,工作於 TCP 協議上。客戶端和服務器發送的命令或數據一律以 \r\n (CRLF)結尾。

RESP 定義了5種格式:

  • 簡單字符串(Simple String): 服務器用來返回簡單的結果,比如”OK”。非二進制安全,且不允許換行。
  • 錯誤信息(Error): 服務器用來返回簡單的結果,比如”ERR Invalid Synatx”。非二進制安全,且不允許換行。
  • 整數(Integer): llenscard等命令的返回值, 64位有符號整數
  • 字符串(Bulk String): 二進制安全字符串, get 等命令的返回值
  • 數組(Array, 舊版文檔中稱 Multi Bulk Strings): Bulk String 數組,客戶端發送指令以及lrange等命令響應的格式

RESP 通過第一個字符來表示格式:

  • 簡單字符串:以”+” 開始, 如:”+OK\r\n”
  • 錯誤:以”-” 開始,如:”-ERR Invalid Synatx\r\n”
  • 整數:以”:”開始,如:”:1\r\n”
  • 字符串:以 $ 開始
  • 數組:以 * 開始

Bulk String有兩行,第一行為 $+正文長度,第二行為實際內容。如:

$3\r\nSET\r\n

Bulk String 是二進制安全的可以包含任意字節,就是說可以在 Bulk String 內部包含 “\r\n” 字符(行尾的CRLF被隱藏):

$4
a\r\nb

$-1 表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1

Array 格式第一行為 “*”+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]的報文:

*2
$3
foo
$3
bar

客戶端也使用 Array 格式向服務端發送指令。命令本身將作為第一個參數,如 SET key value指令的RESP報文:

*3
$3
SET
$3
key
$5
value

將換行符打印出來:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

協議解析器

我們在 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。

協議解析器將接收 Socket 傳來的數據,並將其數據還原為 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原為 ['SET', 'key', 'value']

本文完整代碼:

來自客戶端的請求均為數組格式,它在第一行中標記報文的總行數並使用CRLF作為分行符。

bufio 標準庫可以將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。

需要注意的是RESP是二進制安全的協議,它允許在正文中使用CRLF字符。舉例來說 Redis 可以正確接收並執行SET "a\r\nb" 1指令, 這條指令的正確報文是這樣的:

*3  
$3
SET
$4
a\r\nb 
$7
myvalue

ReadBytes 讀取到第五行 “a\r\nb\r\n”時會將其誤認為兩行:

*3  
$3
SET
$4
a  // 錯誤的分行
b // 錯誤的分行
$7
myvalue

因此當讀取到第四行$4后, 不應該繼續使用 ReadBytes('\n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
_, err = io.ReadFull(reader, msg)

定義 Client 結構體作為客戶端抽象:

type Client struct {
    /* 與客戶端的 Tcp 連接 */
    conn   net.Conn

    /* 
     * 帶有 timeout 功能的 WaitGroup, 用於優雅關閉
     * 當響應被完整發送前保持 waiting 狀態, 阻止鏈接被關閉
     */
    waitingReply wait.Wait

    /* 標記客戶端是否正在發送指令 */ 
    sending atomic.AtomicBool
    
    /* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */
    expectedArgsCount uint32
    
    /* 已經接收的參數數量, 即 len(args)*/ 
    receivedCount uint32
    
    /*
     * 已經接收到的命令參數,每個參數由一個 []byte 表示
     */
    args [][]byte
}

定義解析器:

type Handler struct {

    /* 
     * 記錄活躍的客戶端鏈接 
     * 類型為 *Client -> placeholder 
     */
    activeConn sync.Map 

    /* 數據庫引擎,執行指令並返回結果 */
    db db.DB

    /* 關閉狀態標誌位,關閉過程中時拒絕新建連接和新請求 */
    closing atomic.AtomicBool 
}

接下來可以編寫主要部分了:

func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
    if h.closing.Get() {
        // 關閉過程中不接受新連接
        _ = conn.Close()
    }

    /* 初始化客戶端狀態 */
    client := &Client {
        conn:   conn,
    }
    h.activeConn.Store(client, 1)

    reader := bufio.NewReader(conn)
    var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度
    var err error
    var msg []byte
    for {
        /* 讀取下一行數據 */ 
        if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行
            msg, err = reader.ReadBytes('\n')
            // 判斷是否以 \r\n 結尾
            if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
                errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                _, _ =  client.conn.Write(errReply.ToBytes())
            }
        } else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取
            msg = make([]byte, fixedLen + 2)
            _, err = io.ReadFull(reader, msg)
            // 判斷是否以 \r\n 結尾
            if len(msg) == 0 || 
              msg[len(msg) - 2] != '\r' ||  
              msg[len(msg) - 1] != '\n'{
                errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                _, _ =  client.conn.Write(errReply.ToBytes())
            }
            // Bulk String 讀取完畢,重新使用正常模式
            fixedLen = 0 
        }
        // 處理 IO 異常
        if err != nil {
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                logger.Info("connection close")
            } else {
                logger.Warn(err)
            }
            _ = client.Close()
            h.activeConn.Delete(client)
            return // io error, disconnect with client
        }

        /* 解析收到的數據 */
        if !client.sending.Get() { 
            // sending == false 表明收到了一條新指令
            if msg[0] == '*' {
                // 讀取第一行獲取參數個數
                expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
                if err != nil {
                    _, _ = client.conn.Write(UnknownErrReplyBytes)
                    continue
                }
                // 初始化客戶端狀態
                client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉
                client.sending.Set(true) // 正在接收指令中
                // 初始化計數器和緩衝區 
                client.expectedArgsCount = uint32(expectedLine) 
                client.receivedCount = 0
                client.args = make([][]byte, expectedLine)
            } else {
                // TODO: text protocol
            }
        } else {
            // 收到了指令的剩餘部分(非首行)
            line := msg[0:len(msg)-2] // 移除換行符
            if line[0] == '$' { 
                // BulkString 的首行,讀取String長度
                fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
                if err != nil {
                    errReply := &reply.ProtocolErrReply{Msg:err.Error()}
                    _, _ = client.conn.Write(errReply.ToBytes())
                }
                if fixedLen <= 0 {
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ = client.conn.Write(errReply.ToBytes())
                }
            } else { 
                // 收到參數
                client.args[client.receivedCount] = line
                client.receivedCount++
            }


            // 一條命令發送完畢
            if client.receivedCount == client.expectedArgsCount {
                client.sending.Set(false)

                // 執行命令並響應
                result := h.db.Exec(client.args)
                if result != nil {
                    _, _ = conn.Write(result.ToBytes())
                } else {
                    _, _ = conn.Write(UnknownErrReplyBytes)
                }

                // 重置客戶端狀態,等待下一條指令
                client.expectedArgsCount = 0
                client.receivedCount = 0
                client.args = nil
                client.waitingReply.Done()
            }
        }
    }
}

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

分類
發燒車訊

MySQL 複製表結構和表數據

1、前言

  在功能開發完畢,在本地或者測試環境進行測試時,經常會遇到這種情況:有專門的測試數據,測試過程會涉及到修改表中的數據,經常不能一次測試成功,所以,每次執行測試后,原來表中的數據其實已經被修改了,下一次測試,就需要將數據恢復。

  我一般的做法是:先創建一個副本表,比如測試使用的user表,我在測試前創建副本表user_bak,每次測試后,將user表清空,然後將副本表user_bak的數據導入到user表中。

  上面的操作是對一個table做備份,如果涉及到的table太多,可以創建database的副本。

  接下來我將對此處的表結構複製以及表數據複製進行闡述,並非數據庫的複製原理!!!!

  下面是staff表的表結構

create table staff (
	id int not null auto_increment comment '自增id',
	name char(20) not null comment '用戶姓名',
	dep char(20) not null comment '所屬部門',
	gender tinyint not null default 1 comment '性別:1男; 2女',
	addr char(30) not null comment '地址',
	primary key(id),
	index idx_1 (name, dep),
	index idx_2 (name, gender)
) engine=innodb default charset=utf8mb4 comment '員工表';

 

2、具體方式 

  2.1、執行舊錶的創建SQL來創建表

  如果原始表已經存在,那麼可以使用命令查看該表的創建語句:

mysql> show create table staff\G
*************************** 1. row ***************************
       Table: staff
Create Table: CREATE TABLE `staff` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` char(20) NOT NULL COMMENT '用戶姓名',
  `dep` char(20) NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_1` (`name`,`dep`),
  KEY `idx_2` (`name`,`gender`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
1 row in set (0.01 sec)

  可以看到,上面show creat table xx的命令執行結果中,Create Table的值就是創建表的語句,此時可以直接複製創建表的SQL,然後重新執行一次就行了。

  當數據表中有數據的時候,看到的創建staff表的sql就會稍有不同。比如,我在staff中添加了兩條記錄:

mysql> insert into staff values (null, '李明', 'RD', 1, '北京');
Query OK, 1 row affected (0.00 sec)

mysql> insert into staff values (null, '張三', 'PM', 0, '上海');
Query OK, 1 row affected (0.00 sec)

mysql> select * from staff;
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

  此時在執行show create table命令:

mysql> show create table staff\G
*************************** 1. row ***************************
       Table: staff
Create Table: CREATE TABLE `staff` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` char(20) NOT NULL COMMENT '用戶姓名',
  `dep` char(20) NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_1` (`name`,`dep`),
  KEY `idx_2` (`name`,`gender`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
1 row in set (0.00 sec)

  注意,上面結果中的倒數第二行

    ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT=’員工表’

  因為staff表的id是自增的,且已經有了2條記錄,所以下一次插入數據的自增id應該為3,這個信息,也會出現在表的創建sql中。

    

  2.2、使用like創建新表(僅包含表結構)

  使用like根據已有的表來創建新表,特點如下:

  1、方便,不需要查看原表的表結構定義信息;

  2、創建的新表中,表結構定義、完整性約束,都與原表保持一致。

  3、創建的新表是一個空表,全新的表,沒有數據。

  用法如下:

mysql> select * from staff;  #舊錶中已有2條數據
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

mysql> create table staff_bak_1 like staff;  # 直接使用like,前面指定新表名,後面指定舊錶(參考的表)
Query OK, 0 rows affected (0.02 sec)

mysql> show create table staff_bak_1\G
*************************** 1. row ***************************
       Table: staff_bak_1
Create Table: CREATE TABLE `staff_bak_1` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` char(20) NOT NULL COMMENT '用戶姓名',
  `dep` char(20) NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_1` (`name`,`dep`),
  KEY `idx_2` (`name`,`gender`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='員工表'  # 注意沒有AUTO_INCREMENT=3
1 row in set (0.00 sec)

mysql> select * from staff_bak_1; # 沒有包含舊錶的數據
Empty set (0.00 sec)

  

  2.3、使用as來創建新表(包含數據)

  使用as來創建新表,有一下特點:

  1、可以有選擇性的決定新表包含哪些字段;

  2、創建的新表中,會包含舊錶的數據;

  3、創建的新表不會包含舊錶的完整性約束(比如主鍵、索引等),僅包含最基礎的表結構定義。

  用法如下:

mysql> create table staff_bak_2 as select * from staff;
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from staff_bak_2;
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

mysql> show create table staff_bak_2\G
*************************** 1. row ***************************
       Table: staff_bak_2
Create Table: CREATE TABLE `staff_bak_2` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
  `name` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '用戶姓名',
  `dep` char(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '所屬部門',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
1 row in set (0.00 sec)

  

  利用as創建表的時候沒有保留完整性約束,其實這個仔細想一下也能想明白。因為使用as創建表的時候,可以指定新表包含哪些字段呀,如果你創建新表時,忽略了幾個字段,這樣的話即使保留了完整約束,保存數據是也不能滿足完整性約束。

  比如,staff表有一個索引idx1,由name和dep字段組成;但是我創建的新表中,沒有name和dep字段(只選擇了其他字段),那麼新表中保存idx1也沒有必要,對吧。

mysql> --  只選擇id、gender、addr作為新表的字段,那麼name和dep組成的索引就沒必要存在了
mysql> create table staff_bak_3 as (select id, gender, addr from staff);
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> show create table staff_bak_3\G
*************************** 1. row ***************************
       Table: staff_bak_3
Create Table: CREATE TABLE `staff_bak_3` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT '自增id',
  `gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `addr` char(30) CHARACTER SET utf8mb4 NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
1 row in set (0.00 sec)

mysql> select * from staff_bak_3;
+----+--------+--------+
| id | gender | addr   |
+----+--------+--------+
|  1 |      1 | 北京   |
|  2 |      0 | 上海   |
+----+--------+--------+
2 rows in set (0.00 sec)

  

  2.4、使用like+insert+select創建原表的副本(推薦)

  使用like創建新表,雖然保留了舊錶的各種表結構定義以及完整性約束,但是如何將舊錶的數據導入到新表中呢?

  最極端的方式:寫一個程序,先將舊錶數據讀出來,然後寫入到新表中,這個方式我就不嘗試了。

  有一個比較簡單的命令:

mysql> select * from staff; #原表數據
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

mysql> select * from staff_bak_1; # 使用like創建的表,與原表相同的表結構和完整性約束(自增除外)
Empty set (0.00 sec)

mysql> insert into staff_bak_1 select * from staff;  # 將staff表的所有記錄的所有字段值都插入副本表中
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from staff_bak_1;
+----+--------+-----+--------+--------+
| id | name   | dep | gender | addr   |
+----+--------+-----+--------+--------+
|  1 | 李明   | RD  |      1 | 北京   |
|  2 | 張三   | PM  |      0 | 上海   |
+----+--------+-----+--------+--------+
2 rows in set (0.00 sec)

  

  其實這條SQL語句,是知道兩個表的表結構和完整性約束相同,所以,可以直接select *。

insert into staff_bak_1 select * from staff;

  

  如果兩個表結構不相同,其實也是可以這個方式的,比如:

mysql> show create table demo\G
*************************** 1. row ***************************
       Table: demo
Create Table: CREATE TABLE `demo` (
  `_id` int(11) NOT NULL AUTO_INCREMENT,
  `_name` char(20) DEFAULT NULL,
  `_gender` tinyint(4) DEFAULT '1',
  PRIMARY KEY (`_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.00 sec)

# 只將staff表中的id和name字段組成的數據記錄插入到demo表中,對應_id和_name字段
mysql> insert into demo (_id, _name) select id,name from staff;
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from demo;
+-----+--------+---------+
| _id | _name  | _gender |
+-----+--------+---------+
|   1 | 李明   |       1 |
|   2 | 張三   |       1 |
+-----+--------+---------+
2 rows in set (0.00 sec)

  這是兩個表的字段數量不相同的情況,此時需要手動指定列名,否則就會報錯

 

  另外,如果兩個表的字段數量,以及相同順序的字段類型相同,如果是全部字段複製,即使字段名不同,也可以直接複製

# staff_bak_5的字段名與staff表並不相同,但是字段數量、相同順序字段的類型相同,所以可以直接插入
mysql> show create table staff_bak_5\G
*************************** 1. row ***************************
       Table: staff_bak_5
Create Table: CREATE TABLE `staff_bak_5` (
  `_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `_name` char(20) NOT NULL COMMENT '用戶姓名',
  `_dep` char(20) NOT NULL COMMENT '所屬部門',
  `_gender` tinyint(4) NOT NULL DEFAULT '1' COMMENT '性別:1男; 2女',
  `_addr` char(30) NOT NULL,
  PRIMARY KEY (`_id`),
  KEY `idx_1` (`_name`,`_dep`),
  KEY `idx_2` (`_name`,`_gender`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='員工表'
1 row in set (0.00 sec)

mysql> insert into staff_bak_5 select * from staff;
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from staff_bak_5;
+-----+--------+------+---------+--------+
| _id | _name  | _dep | _gender | _addr  |
+-----+--------+------+---------+--------+
|   1 | 李明   | RD   |       1 | 北京   |
|   2 | 張三   | PM   |       0 | 上海   |
+-----+--------+------+---------+--------+
2 rows in set (0.00 sec)

  

  

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

分類
發燒車訊

【目標檢測實戰】目標檢測實戰之一–手把手教你LMDB格式數據集製作!

文章目錄

1 目標檢測簡介
2 lmdb數據製作
    2.1 VOC數據製作
    2.2 lmdb文件生成

lmdb格式的數據是在使用caffe進行目標檢測或分類時,使用的一種數據格式。這裏我主要以目標檢測為例講解lmdb格式數據的製作。

1 目標檢測簡介

【1】目標檢測主要有兩個任務:

  1. 判斷圖像中對象的類別
  2. 類別的位置

【2】目標檢測需要的數據:

  1. 訓練所需的圖像數據,可以是jpg、png等圖片格式
  2. 圖像數據對應的類別信息和類別框的位置信息。

2 lmdb數據製作

caffe一般使用lmdb格式的數據,在製作數據之前,我們需要對數據進行標註,可以使用labelImg對圖像進行標註(),這裏就不多贅述數據標註的問題。總之,我們得到了圖像的標註Annotations數據。lmdb數據製作,首先需要將annotations數據和圖像數據製作為VOC格式,然後將其生成LMDB文件即可。下邊是詳細的步驟:

2.1 VOC數據製作

這裏我以caffe環境的Mobilenet+YOLOv3模型的代碼為例(),進行lmdb數據製作,並且也假設你已經對其配置編譯成功(如沒成功,可以參考博文進行配置),所以我們的根目錄為:caffe-Mobilenet-YOLO-master,下邊為詳細步驟:

【1】VOC格式目錄建立:

VOC格式目錄主要包含為:

其中,Annotations里存儲的是xml標註信息,JPEGImages存儲的是圖片,ImageSets則是訓練和測試的txt列表等信息,下邊我們就要安裝如上的目錄進行建立我們自己的數據目錄。

創建Annotations、JPEGImages、ImageSets/Main等文件,命令如下(也可直接界面操作哈):

注:建議新手按照我的名稱,對於後續文件修改容易!!!

cd ~/   # 進入home目錄
cd Documents/  # 進入Documents目錄
cd caffe-Mobilenet-YOLO-master/  # 進入我們的根目錄
cd data         # 進入data目錄內
mkdir VOCdevkit   # 創建存儲我們自己的數據的文件夾
cd VOCdevkit
mkdir MyDataSet  # 創建存儲voc的目錄
cd MyDataSet   
# 創建VOC格式目錄
mkdir Annotations
mkdir JPEGImages
mkdir ImageSets
cd ImageSets
mkdir Main

好啦,我們的文件夾就建立好了,如下圖所示:

【2】將所有xml文件考入至Annotations文件夾內
【3】將所有圖片考入至JPEGImages文件夾內
【4】劃分訓練接、驗證集合測試集,如下為Python代碼,需要修改的地方註釋已標明:

import os  
import random 
# 標註文件的路徑,需要你自己修改
xmlfilepath=r'/home/Documents/caffe-Mobilenet-YOLO-master/data/VOCdevkit/MyDataSet/Annotations/'      
# 這裡是存儲數據的本目錄,需要改為你自己的目錄              
saveBasePath=r"/home/Documents/caffe-Mobilenet-YOLO-master/data/VOCdevkit/"                        
trainval_percent=0.8            # 表示訓練集和驗證集所佔比例,你需要自己修改,也可選擇不修改
train_percent=0.8               # 表示訓練集所佔訓練集驗證集的比例,你需要自己修改,也可選擇不修改       
total_xml = os.listdir(xmlfilepath)
num=len(total_xml)    
list=range(num)    
tv=int(num*trainval_percent)    
tr=int(tv*train_percent)    
trainval= random.sample(list,tv)    
train=random.sample(trainval,tr)    
  
print("train and val size",tv)  
print("traub suze",tr)  
ftrainval = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/trainval.txt'), 'w')    
ftest = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/test.txt'), 'w')    
ftrain = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/train.txt'), 'w')    
fval = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/val.txt'), 'w')    
  
for i  in list:    
    name=total_xml[i][:-4]+'\n'    
    if i in trainval:    
        ftrainval.write(name)    
        if i in train:    
            ftrain.write(name)    
        else:    
            fval.write(name)    
    else:    
        ftest.write(name)    
    
ftrainval.close()    
ftrain.close()    
fval.close()    
ftest .close() 

上述代碼修改之後,在根目錄caffe-Mobilenet-YOLO-master執行上述代碼即可,
在data/VOCdevkit/MyDataSet/ImageSets下生成trainval.txt、test.txt、train.txt、val.txt等所需的txt文件,如下圖所示:

這些TXT文件會包含圖片的名字,不帶路徑,如下圖所示:

2.2 lmdb文件生成

【1】執行如下命令,將生成lmdb所需的腳本複製至data/VOCdevkit/MyDataSet文件夾內:

cp data/VOC0712/create_* data/MyDataSet/                # 把create_list.sh和create_data.sh複製到MyDataSet目錄                  
cp data/VOC0712/labelmap_voc.prototxt data/MyDataSet/   # 把labelmap_voc.prototxt複製到MyDataSet目錄 

【2】修改create_list.sh文件:

1 第3行修改目錄路徑,截止到VOCdevkit即可

2 第13行修改為for name in MyDataSet(VOCdevkit下自己建立的文件夾名字)

3 第15-18行註釋掉

4 第41行get_image_size修改為自己的路徑(注意,這裡是build caffe_mobilenet_yolo之後才會形成的):

#!/bin/bash
# 如果嚴格安裝我上述的步驟,就可以不用修改路徑位置。
# 需要修改的位置也使用註釋進行了標註和解釋

# 這裏需要更改,你數據的根目錄位置,需要修改的地方!!!!
root_dir="/home/Documents/Caffe_Mobilenet_YOLO/data/VOCdevkit/"   
sub_dir=ImageSets/Main
bash_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
for dataset in trainval test
do
  dst_file=$bash_dir/$dataset.txt
  if [ -f $dst_file ]
  then
    rm -f $dst_file
  fi
  for name in MyDataSet  # 如果你建立的不是MyDataSet,這裏需要修改為你自己的名字
  do
    # 這裏需要修改,註釋掉即可
    #if [[ $dataset == "test" && $name == "VOC2012" ]]
    #then
    #  continue
    #fi
    echo "Create list for $name $dataset..."
    dataset_file=$root_dir/$name/$sub_dir/$dataset.txt

    img_file=$bash_dir/$dataset"_img.txt"
    cp $dataset_file $img_file
    sed -i "s/^/$name\/JPEGImages\//g" $img_file
    sed -i "s/$/.jpg/g" $img_file

    label_file=$bash_dir/$dataset"_label.txt"
    cp $dataset_file $label_file
    sed -i "s/^/$name\/Annotations\//g" $label_file
    sed -i "s/$/.xml/g" $label_file

    paste -d' ' $img_file $label_file >> $dst_file

    rm -f $label_file
    rm -f $img_file
  done

  # Generate image name and size infomation.
  if [ $dataset == "test" ]
  then
    home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/build/tools/get_image_size $root_dir $dst_file $bash_dir/$dataset"_name_size.txt"

【3】creat_data.sh修改:

1 第2行修改為自己的路徑:root_dir=”/home/Documents/caffe-MobileNet-YOLO-master/”

2 第7行修改為:data_root_dir=”/home/Documents/caffe-MobileNet-YOLO-master/data/VOVdevkit/

3 第8行修改為:dataset_name=”MyDataSet”

4 第9行修改為:mapfile=”\(root_dir/data/VOCdevkit/\)dataset_name/labelmap_voc.prototxt”

5 第26行修改為\(root_dir/data/VOCdevkit/\)dataset_name/$subset.txt

cur_dir=$(cd $( dirname ${BASH_SOURCE[0]} ) && pwd )
# 修改為自己的路徑
root_dir="/home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/"

cd $root_dir

redo=1
# 這裏需要修改為自己的路徑
data_root_dir="/home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/data/VOCdevkit/"
dataset_name="MyDataSet"  # 修改為自己的名字
mapfile="$root_dir/data/VOCdevkit/$dataset_name/labelmap_voc.prototxt"  # 修改為自己的路徑
anno_type="detection"
db="lmdb"
min_dim=0
max_dim=0
width=0
height=0

extra_cmd="--encode-type=jpg --encoded"
if [ $redo ]
then
  extra_cmd="$extra_cmd --redo"
fi
for subset in test trainval
# subset.txt路徑需要修改
do
  python $root_dir/scripts/create_annoset.py --anno-type=$anno_type \
  --label-map-file=$mapfile --min-dim=$min_dim --max-dim=$max_dim --resize-width=$width \
  --resize-height=$height --check-label $extra_cmd $data_root_dir $root_dir/data/VOCdevkit/$dataset_name/$subset.txt \
  $data_root_dir/$dataset_name/$db/$dataset_name"_"$subset"_"$db examples/$dataset_name

【3】修改labelmap_voc.prototxt文件:

除了第一個背景標籤部分不要修改,其他改成自己的標籤就行,多的刪掉,少了添加進入就行

【4】最後在caffe-MobileNet-YOLO-master/examples文件夾內新建一個MyDataSet文件夾(空的)

【5】運行create_list.sh腳本: ./data/VOCdevkit/MyDataSet/create_list.sh,運行完后,會在自己建的VOCdevkit/MyDataSet/目錄內生成trainval.txt, test.txt, test_name_size.txt。

【6】運行create_data.sh腳本: ./data/VOCdevkit/MyDataSet/create_data.sh

運行此命令時,提示:bash:./data/VOCdevkit/MyDataSet/create_list.sh:Permission denied,沒有權限,需要執行如下命令賦予執行命令:

chmod u+x data/VOCdevkit/MyDataSet/create_data.sh

出現了錯誤:ValueError: need more than 2 values to unpack,

需要將create_annoset.py中第88行的seg去掉,因為我們的Annotations只有兩個值,img_file和anno。

運行完后,會在會在自己建的VOCdevkit/MyDataSet目錄內生成lmdb文件夾:

lmdb對應訓練集和測試集的lmdb格式的文件夾:

***
好啦,今天的教程就到這裏,如有疑問,可關注公眾號【計算機視覺聯盟】私信我或留言交流!!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

分類
發燒車訊

電動車商機大、日電池材料商瘋增產

 

日本鋰離子電池材料商W-Scope於9月4日發布新聞稿宣布,因來自電動車的需求急增,加上家電產品、電動工具需求持續強勁,故決議投下200億日圓擴增鋰離子電池關鍵材料分隔膜(separator)產能,計畫在南韓子公司W-SCOPE CHUNGJU PLANT CO., LTD.增設四條產線,其中第12-13號產線預計2019年下半年量產、第14-15號產線預計2020年上半年量產。

W-Scope指出,除上述四條新產線之外,該公司之前已進行增產投資,興建第8-11號分隔膜產線,其中第8-9號產線預計2017年下半年量產、第10-11號預計2018年上半年量產,而待上述八條產線全數導入量產後,2020年末整體分隔膜產能將擴增至2016年末的3.6倍水準。

在電動車市場持續擴大的背景下,日本其他鋰電池材料廠紛紛傳出增產消息。日刊工業新聞8月1日報導,Toray計畫在2020年結束前總計砸下約1,200億日圓擴增車用鋰離子電池關鍵材料分隔膜產能,目標將分隔膜年產能擴增至19.5億平方公尺、將達現行的約五倍。

日刊工業新聞6月23日報導,因車廠加快電動車的研發腳步、帶動電池材料市場成長速度超乎預期,故旭化成(Asahi Kasei)計畫上修鋰離子電池關鍵材料分隔膜的增產計畫,目標在2020年結束前將分隔膜年產能最高擴增至15億平方公尺、將達現行的2.5倍,且將遠高於原先規劃的11億平方公尺目標,期望藉由積極投資、鞏固全球龍頭位置。預估追加擴產所需的投資額約300億日圓。

鋰離子電池四大關鍵材料分別為正極材、負極材、分隔膜和電解液,而這些電池材料皆由日系廠商握有高市佔率,其中,在全球分隔膜市場上,旭化成為龍頭廠、Toray緊追在後。

根據日本市調機構富士經濟(Fuji Keizai)預估,2020年全球分隔膜市場規模將增至3,000億日圓、將達2015年的2倍水準,而電動車、混合動力車等車用用途是推動分隔膜需求急增的最大功臣,預估2020年車用分隔膜佔整體市場比重將達約45%。

富士經濟6月22日公布調查報告指出,預估2030年時電動車年銷售量將增至407萬台、超越混合動力車,且之後雙方的差距將持續擴大。富士經濟預估,在中國需求增加加持下,2035年電動車全球銷售量將擴大至630萬台、將達2016年的13.4倍。

(本文內容由授權使用。首圖為鋰電池材料分隔膜,出處:MoneyDJ)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

分類
發燒車訊

豐田:車輛全面由電池驅動前、須經歷2-3次技術突破

 

CNBC 9月5日報導,豐田汽車公司會長內山田竹志(Takeshi Uchiyamada)在接受專訪時表示,基於當前電池技術的侷限、他懷疑消費者會立即投向電動車的懷抱。內山田表示,豐田不是排斥電動車,但為了要提供足夠的續航力、電動車目前需要安裝許多電池並得花相當長的時間去充電,而且電池壽命也是一大問題。

內山田認為車輛全面由電池驅動之前、還須經歷2到3次的技術性突破才行。不過,他也坦承,隨著中國、美國等地鼓勵電動車發展的法令生效,汽車製造商若不推出電動車的話可能就會被淘汰出局,因此豐田已著手研發較好的電池技術。

吉利汽車控股旗下Volvo日前宣布,2019年起旗下所有新車將會是純電動或油電混合驅動。

根據DNV GL首度發布的「能源轉型展望」報告,受電動車滲透率持續上揚的影響,石油供應將在2020到2028年期間轉趨持平、隨後大幅下降,2034年將遭天然氣超越。這份報告預估電動車、內燃引擎車將在2022年達到「成本平價」,預估到2033年全球半數輕型新車銷售量都將是電動車。

特斯拉(Tesla)平價電動車「Model 3」不含獎勵計畫的售價為35,000美元起、電池續航力為345公里。

Thomson Reuters報導,嘉能可(Glencore)董事長Tony Hayward 於5月受訪時表示,電動車的快速進步意味著石油需求可能會在2040年以前觸頂,深海鑽油、加拿大油砂等高成本原油生產商恐將先被淘汰出局,擁有生產成本優勢的石油輸出國組織(OPEC)相對較不受衝擊。Hayward曾任英國石油公司(BP Plc)執行長。

嘉能可執行長Ivan Glasenberg指出,如果電動車在2035年拿下90到95%的市占率,全球年度銅需求量可望較目前的2,300萬噸呈現倍增。德國總理梅克爾(Angela Merkel)5月22日指出,鋰電池技術已經進步到可以讓電動車擁有1千公里的續航力、遠高於目前的200-300公里,德國必須大舉投資以確保產業繼續保有優勢。

戴姆勒(Daimler AG)董事長Deiter Zetsche 5月22日表示,預估到2022年旗下將有超過10款的純電動轎車系列。戴姆勒旗下全資子公司ACCUMOTIVE當日在德國卡門茨(Kamenz)為第二座電池工廠舉行奠基儀式、邀請梅克爾出席。這座工廠耗資5億歐元、預計在2018年年中正式營運。

(本文內容由授權使用。圖片出處:public domian CC0)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

分類
發燒車訊

趨勢無法擋!BMW、JLR宣布自2020年起力推電動車

Thomson Reuters報導,英國最大汽車製造商Jaguar Land Rover(JLR)7日宣布,2020年起旗下所有新車都將具備電動或油電混合驅動選項。英國跟隨法國以及馬德里、墨西哥城和雅典等城市的抗空汙腳步,7月宣布將自2040年起禁止販售汽油和柴油新車。

德國車廠BMW 7日宣布將自2020年起開始量產電動車、預估到2025年將有12種純電動車款。BMW執行長Harald Krueger表示,電動車續航力將上看700公里。電池成本約占電動車造價的30到50%。巴克萊(Barclays)預估電池產能的大幅提升將可令電動車、內燃引擎車在2020到2030年間達到「成本平價」。ING預估歐洲有望在2035年成為100%的電動車市場。

BBC News報導,BMW研發部負責人Klaus Froehlich 日前表示,電動移動趨勢是不可逆轉的,自2020年起BMW所有車款都將擁有電動驅動選項。

(本文內容由授權使用。)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

分類
發燒車訊

再擴大充電網路,特斯拉宣布將開始在市區設置充電站

 

除了擴建超級充電站、推出家用充電器安裝選擇,特斯拉(Tesla)近日又宣布將在美國一些大型城市內設置市區充電站,來提供給駕駛人更方便的充電選擇。

根據特斯拉透露,市區內的小型充電站將會先在芝加哥、波士頓推出,市區充電站僅能夠提供約72kW 的電力,大約是以往超級充電站120kW 功率的一半,這意味著充電時間會稍微變長,大約會需要45-50 分鐘來為汽車充電。

但在市區充電站中,電動車將不會受到超級充電站內那種「充電分流」的影響,即使另一輛車使用相鄰的充電器,雙方的充電功率也不會因為分流而降低,讓駕駛在高密度人口的城市內也能夠享受快速、實惠的充電。

 

Major increases in the Supercharger and Tesla urban charger network happening over the next several months

— Elon Musk (@elonmusk)

 

除此之外,為了讓駕駛人在汽車充電時不會無聊,特斯拉計畫將這些充電站設置在超級市場、購物中心和市中心的景點附近,讓駕駛能夠利用時間去採買或逛街。市區充電站的收費也將和超級充電站一致,價格遠比汽油的花費便宜的多。

特斯拉希望透過這些市區充電站的設置,能在家庭充電器或目的地級充電站之外,提供給駕駛一個新的選擇,並慢慢達成特斯拉遠大計劃的一部分——建立一個完整的充電網路,讓特斯拉駕駛能夠開車到達世界各地。

「我們會持續擴大充電網路,讓特斯拉駕駛無論身在何處,都能隨時找到可靠的方法取得電力。」

(合作媒體:。圖片出處:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選