>

경고 : 아래 코드는 다중 읽기/쓰기가 아닙니다!

다수의 쓰기가 없거나 크기 조정이 필요하지 않는 한 제대로 작동합니다. 그러나 여러 작성자가 있으면 코드가 교착 상태로 전환 될 수 있습니다. 코드를 수정하고 있는데이 문제를 해결하려면 관련 질문에 게시하겠습니다.

<시간>

이후 에서 무엇이 잘못되었는지 분석 크기 제한이없는 잠금없는 작업 큐 (다중 읽기/쓰기) 다른 해결책을 생각해 냈습니다.

require_lock_  잠금 해제 설정을 종료하고 버퍼 크기를 조정해야 할 때 신호를 보내는 데 사용되므로 기술적으로 잠금 해제 설정이 없습니다 **

lock_  이것은 버퍼 크기를 조정해야 할 때만 사용되는 잠금이므로 큐를 충분히 크게 만들면 모든 작업이 끝날 때까지 잠금이 해제됩니다

concurrent_users_  다음 회원에게 액세스하는 사용자 수를 추적합니다

read_, write_, size_  대기열에있는 작업 수를 추적하는 데 사용됩니다

storage_  이것은 작업이 저장되는 공간을 할당하는 벡터입니다. 조정이 필요한 경우 잠금이 사용됩니다

*bitflag_  이것은 lookup_ 내부의 위치를 ​​나타내는 데 사용되는 비트 배열입니다.  찍은. 이것은 storage_ 와 동기화 상태를 유지합니다

작업이 추가 될 때마다 다음이 발생합니다 :

  • concurrent_users_   storage_ 를 터치하려고 함을 나타 내기 위해 증가합니다.  그리고 bitflag_
  • require_lock_  뮤텍스를 획득해야하는지 확인합니다
  • write_  작업을 작성할 고유 색인을 검색하도록 증가했습니다
  • 이 ID가 범위를 벗어나면 뮤텍스 동기화를 요구하는 스토리지 크기를 조정하려고합니다
  • 우리는 직업을 저장한다
  • 우리는 비트 배열에 인덱스를 표시하여이 작업을 사용할 준비가되었음을 나타냅니다

작업이 대기열에서 제거 된 경우 :

  • concurrent_users_  가드가 다시 범위를 벗어나면 자동으로 감소합니다.
  • 우리는 require_lock_ 를 확인
  • read_   write_ 처럼 증가  푸시 기능에서 증가했습니다
  • 수신 한 ID가 범위를 벗어난 경우 read_를 이전 값으로 되돌리고 작업 공급이 소진되었으므로 정리를 시도합니다
  • 저장소에서 작업을 검색하고 bitflag_ 를 제거합니다  작업을 사용할 준비가되었음을 표시
언급했듯이, 이것은 잠금이없는 설정이 아니기 때문에 기술적으로는 초기 질문에 대한 해결책이 아닙니다. 그러나 스토리지가 가득 찬 경우에만 잠기므로 실제로 뮤텍스 잠금을 방지하려면 예상 작업에 충분한 공간을 할당해야합니다.

이 시점에서 코드는 예외 안전하지 않으므로 코드를 변경하여 강력한 예외 안전을 보장합니다. 또한 작업 큐가 완전히 비워지지 않도록 허용하면 '이전'작업에 사용 된 많은 메모리가 생겨 결국 write_ 의 최대 값에 도달하게됩니다.  변수는 큐가 완전히 비워 질 때까지 코드가 더 이상 작업을 허용하지 않습니다. 사용자 지정 할당자를 허용하도록 코드도 변경해야하지만 잠금 해제 설정이 완료되면 구현하기 쉬운 것으로 생각했습니다.

귀하의 의견을 듣고 싶습니다. 완전히 잠금을 해제 할 수 없었지만이 솔루션은 속임수처럼 느껴지지만 다소 멋지다고 생각합니다.

  • 실제로 스레드 안전합니까?
  • 내가 언급 한 점 외에도 어떤 기능을 개선/추가 할 수 있습니까?

fifo.h :

#pragma once
#include <atomic>
#include <memory>
#include <vector>
#include <mutex>
#include <thread>
#include <algorithm>
namespace lock_free
{
    /**
     * this class is used so we're able to use the RAII mechanism for locking
     */
    template < typename T >
    class use_count
    {
        public:
            template < typename V >
            use_count( V &&v ) :
            data_( std::forward< V >( v ) ) { }
            const T& operator()() const { return data_; }
            void lock() { ++data_; }
            void unlock() { --data_; }
        private:
            use_count( const use_count& );
            use_count& operator = ( const use_count& );
            T data_;
    };
    /**
     * This is a lock free fifo, which can be used for multi-producer, multi-consumer
     * type job queue
     */
    template < typename Value >
    class fifo
    {
        public:
            typedef Value value_type;
            fifo( size_t size = 1024 ) :
                require_lock_( false ),
                lock_(),
                concurrent_users_( 0 ),
                read_( 0 ),
                write_( 0 ),
                size_( size ),
                storage_( size ),
                bitflag_( new std::atomic_size_t[ std::max( size_t( 1 ), size / bits_per_section() ) ] )
            {
                fill_bitflags( 0 );
            }
            ~fifo()
            {
                clear();
                delete [] bitflag_;
            }
            /**
             * pushes an item into the job queue, may throw if allocation fails
             * leaving the queue unchanged
             */
            void push( const value_type &value )
            {
                std::lock_guard< use_count< std::atomic_size_t > > lock( concurrent_users_ );
                conditional_lock();
                if ( write_ == std::numeric_limits< size_t >::max() )
                {
                    throw std::logic_error( "fifo full, remove some jobs before adding new ones" );
                }
                const size_t id = write_++;
                if ( id >= size_ )
                {
                    resize_storage( id );
                }
                storage_[ id ] = value;
                set_bitflag_( id, mask_for_id( id ) );
            }
            /**
             * retrieves an item from the job queue.
             * if no item was available, func is untouched and pop returns false
             */
            bool pop( value_type &func )
            {
                auto assign = [ & ]( value_type &dst, value_type &src )
                {
                    std::swap( dst, src );
                };
                return pop_generic( func, assign );
            }
            /**
             * clears the job queue, storing all pending jobs in the supplied container.
             * the container is also returned for convenience
             */
            template < typename T >
            T& pop_all( T &unfinished )
            {
                value_type tmp;
                while ( pop( tmp ) )
                {
                    unfinished.push_back( tmp );
                }
                return unfinished;
            }
            /**
             * clears the job queue.
             */
            void clear()
            {
                auto del = []( value_type&, value_type& ) {};
                value_type tmp;
                while ( pop_generic( tmp, del ) )
                {
                    // empty
                }
            }
            /**
             * returns true if there are no pending jobs
             */
            bool empty() const
            {
                return read_ == write_;
            }
        private:
            fifo( const fifo& );
            fifo& operator = ( const fifo& );
            static constexpr size_t bits_per_section()
            {
                return sizeof( size_t ) * 8;
            }
            template < typename Assign >
            bool pop_generic( value_type &value, Assign assign )
            {
                std::lock_guard< use_count< std::atomic_size_t > > lock( concurrent_users_ );
                conditional_lock();
                const size_t id = read_++;
                if ( id >= write_ )
                {
                    --read_;
                    try_cleanup();
                    return false;
                }
                const size_t mask = mask_for_id( id );
                while ( !unset_bitflag_( id, mask ) )
                {
                    std::this_thread::yield();
                }
                assign( value, storage_[ id ] );
                return true;
            }
            void try_cleanup()
            {
                if ( !write_ || read_ != write_ || require_lock_ )
                {
                    // early exit, avoids needless locking
                    return;
                }
                bool expected( false );
                if ( require_lock_.compare_exchange_strong( expected, true ) )
                {
                    std::lock_guard< std::mutex > guard( lock_ );
                    while ( concurrent_users_() > 1 )
                    {
                        std::this_thread::yield();
                    }
                    write_ = 0;
                    read_ = 0;
                    fill_bitflags( 0 );
                    require_lock_ = false;
                }
            }
            void resize_storage( size_t id )
            {
                while ( size_ <= id )
                {
                    if ( id == size_ )
                    {
                        require_lock_ = true;
                        std::lock_guard< std::mutex > guard( lock_ );
                        while ( concurrent_users_() > 1 )
                        {
                            std::this_thread::yield();
                        }
                        const size_t bitflag_size = size_ / bits_per_section();
                        storage_.resize( std::max( size_t( 1 ), size_ * 2 ) );
                        std::atomic_size_t *newbitflag = new std::atomic_size_t[ std::max( size_t( 1 ), bitflag_size * 2 ) ];
                        std::atomic_size_t *start = newbitflag;
                        const std::atomic_size_t *end = start + bitflag_size;
                        const std::atomic_size_t *src = bitflag_;
                        while ( start != end )
                        {
                            (start++)->store( *src++ );
                        }
                        end = newbitflag + bitflag_size * 2;
                        while ( start != end )
                        {
                            (start++)->store( 0 );
                        }
                        delete [] bitflag_;
                        bitflag_ = newbitflag;
                        size_ = storage_.size();
                        require_lock_ = false;
                    }
                    else
                    {
                        conditional_lock();
                    }
                }
            }
            static size_t mask_for_id( size_t id )
            {
                const size_t offset = id / bits_per_section();
                id -= offset * bits_per_section();
                return size_t( 1 ) << id;
            }
            void set_bitflag_( size_t id, size_t mask )
            {
                bitflag_[ id / bits_per_section() ].fetch_or( mask );
            }
            bool unset_bitflag_( size_t id, size_t mask )
            {
                const size_t old = bitflag_[ id / bits_per_section() ].fetch_and( ~mask );
                return ( old & mask ) == mask;
            }
            void conditional_lock()
            {
                if ( require_lock_ )
                {
                    concurrent_users_.unlock();
                    lock_.lock();
                    lock_.unlock();
                    concurrent_users_.lock();
                }
            }
            void fill_bitflags( size_t value )
            {
                std::atomic_size_t *start = bitflag_;
                const std::atomic_size_t *end = start + size_ / bits_per_section();
                while ( start != end )
                {
                    (start++)->store( value );
                }
            }
            std::atomic_bool require_lock_;
            std::mutex lock_;
            use_count< std::atomic_size_t > concurrent_users_;
            std::atomic_size_t read_, write_, size_;
            std::vector< value_type > storage_;
            std::atomic_size_t *bitflag_;
    };
}

  • 답변 # 1

    이 대기열이 여러 "작성자"를 지원하지 않는다고 말하면 무슨 뜻입니까? 여러 생산자를 의미합니까? 큐에서 푸시 (생산) 및 팝 (소비)은 모두 변경 ( "쓰기") 작업이기 때문입니다. 그리고 "한 작가가 아이템을 밀고 한 작가가 그것을 터뜨리는"시나리오를 지원하지 않으면 실제로는 동시 큐라고 부를 수 없습니다. 그래서 여러분은 "한 명의 생산자, 한 명의 소비자"를 지원하는의미라고 가정하고 그 시나리오에서 버그가 어디에 있는지 보여줄 것입니다.

    <시간>

    use_count<T> 만 사용   T = std::atomic_size_t 와 함께 따라서 클래스 템플릿이 아니어야합니다 (일반적인 클래스 여야 함).

    템플릿을 유지하는 경우 생성자는 가변 템플릿이어야합니다. 완벽한 전달 및 다양한 템플릿은 땅콩 버터 및 젤리와 같이 사용됩니다.그리고생성자는 반드시 explicit 여야합니다 원하지 않는 암시 적 변환을 비활성화하려면

    template <typename... Args>
    explicit use_count(Args&&... args) : data_(std::forward<Args>(args)...) { }
    
    

    use_count 에 대한 개인 복사 생성자 및 복사 할당 연산자를 선언합니다. 그러나 결코 정의하지 마십시오. 이 C ++ 03 관용구는 C ++ 11부터 더 이상 사용되지 않습니다. 더 나은 방법은 ( atomic_size_t 가 있기 때문에 전혀 선언하지 않는 것입니다)  멤버는 클래스를 복사/이동할 수 없도록하거나 =delete 로 정의합니다. .

    와이즈 비즈  생성자는 또한 fifo 로 표시되어야합니다 .

    explicit  아마도 bitflag_ 해야합니다  원시 포인터 대신

    빈 줄을 모두 제거하면 코드를 더 쉽게 읽을 수 있습니다 (함수 정의 사이의 줄을 제외하고는 추측합니다). 예를 들어, 당신은 std::unique_ptr<std::atomic_size_t[]> 를 썼습니다  21 개 라인 중 5 개는 비어 있고 2 개는 풀리지 않은 push() 에스. 14 줄이었을 수도 있습니다.

    <시간>

    알겠습니다. 여기 버그가 있습니다 (내 생각에 뭔가 빠진 것이 있으면 알려주세요). 스레드 T1이 저장소 크기를 조정하려고하고 스레드 T2가 팝업을 시도하는 새로운 리더로 들어오고 있다고 가정합니다.

    T1 : { 에 들어갑니다
    T1 : 증분 push  0에서 1까지 T1 : 전화 concurrent_users_.data_ 아무 작업도하지 않고 반환됩니다.
    T1 : conditional_lock 에 들어갑니다   resize_storage 와 함께
    T1 : 세트 id == size_
    T1 : 잠금 require_lock_ = true
    T1 : 테스트 lock_ concurrent_users_() > 1 때문에 거짓입니다.
    T1 : 따라서 concurrent_users_.data_ == 1 의 반복을 실행하지 않습니다  루프
    T1 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들

    그동안

    T2 : while 에 들어갑니다
    T2 : delete [] bitflag_; 에 들어갑니다
    T2 : 증분 pop  1에서 2까지 T2 : pop_generic 에 들어갑니다
    T2 : 테스트 concurrent_users_.data_ conditional_lock 입니다
    T2 : requires_lock_ 감소  2에서 1까지 T2 : true 인수  그것을 해제합니다.
    T2 : 증분 concurrent_users_.data_  1에서 2까지 T2 : lock_ 에서 반환
    T2 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들

    그것을 인터리브하자 무슨 일이 일어나고 있는지 분명하다 :

    T1 : concurrent_users_.data_ 에 들어갑니다
    T1 : 증분 conditional_lock  0에서 1까지 T1 : 전화 unset_bitflag_ 아무 작업도하지 않고 반환됩니다.
    T1 : push 에 들어갑니다   concurrent_users_.data_ 와 함께
    T1 : 세트 conditional_lock

    T2 : resize_storage 에 들어갑니다
    T2 : id == size_ 에 들어갑니다
    T2 : 증분 require_lock_ = true  1에서 2까지 T2 : pop 에 들어갑니다
    T2 : 테스트 pop_generic concurrent_users_.data_ 입니다
    T2 : conditional_lock 감소  2에서 1까지 T2 : requires_lock_ 인수  그것을 해제

    T1 : 잠금 true
    T1 : 테스트 concurrent_users_.data_ lock_ 때문에 거짓입니다.
    T1 : 따라서 lock_ 의 반복을 실행하지 않습니다  루프
    T1 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들

    T2 : concurrent_users_() > 1 증분  1에서 2까지 T2 : concurrent_users_.data_ == 1 에서 반환
    T2 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들

    <시간>

    따라서 while 가 지적한 메모리에 동기화되지 않은 데이터 레이스가 있습니다. . T2가 그것을보기 전에 T1은 그 메모리를 쉽게 할당 해제 할 수 있습니다. 즉, T2는 가비지 메모리를 읽거나 이미 재 할당되어 현재 다른 스레드에서 사용중인 메모리에 스톰 핑 될 수 있습니다.

    delete [] bitflag_;

  • 이전 java - 어레이에서 가장 유사한 색상을 찾는 빠른 방법
  • 다음 java - Box2d와 농구 충돌