경고 : 아래 코드는 다중 읽기/쓰기가 아닙니다!
다수의 쓰기가 없거나 크기 조정이 필요하지 않는 한 제대로 작동합니다. 그러나 여러 작성자가 있으면 코드가 교착 상태로 전환 될 수 있습니다. 코드를 수정하고 있는데이 문제를 해결하려면 관련 질문에 게시하겠습니다.
<시간>이후 에서 무엇이 잘못되었는지 분석 크기 제한이없는 잠금없는 작업 큐 (다중 읽기/쓰기) 다른 해결책을 생각해 냈습니다.
require_lock_
잠금 해제 설정을 종료하고 버퍼 크기를 조정해야 할 때 신호를 보내는 데 사용되므로 기술적으로 잠금 해제 설정이 없습니다 **
lock_
이것은 버퍼 크기를 조정해야 할 때만 사용되는 잠금이므로 큐를 충분히 크게 만들면 모든 작업이 끝날 때까지 잠금이 해제됩니다
concurrent_users_
다음 회원에게 액세스하는 사용자 수를 추적합니다
read_, write_, size_
대기열에있는 작업 수를 추적하는 데 사용됩니다
storage_
이것은 작업이 저장되는 공간을 할당하는 벡터입니다. 조정이 필요한 경우 잠금이 사용됩니다
*bitflag_
이것은
lookup_
내부의 위치를 나타내는 데 사용되는 비트 배열입니다.
찍은. 이것은
storage_
와 동기화 상태를 유지합니다
작업이 추가 될 때마다 다음이 발생합니다 :
-
concurrent_users_
storage_
를 터치하려고 함을 나타 내기 위해 증가합니다. 그리고bitflag_
-
require_lock_
뮤텍스를 획득해야하는지 확인합니다 -
write_
작업을 작성할 고유 색인을 검색하도록 증가했습니다 - 이 ID가 범위를 벗어나면 뮤텍스 동기화를 요구하는 스토리지 크기를 조정하려고합니다
- 우리는 직업을 저장한다
- 우리는 비트 배열에 인덱스를 표시하여이 작업을 사용할 준비가되었음을 나타냅니다
작업이 대기열에서 제거 된 경우 :
-
concurrent_users_
가드가 다시 범위를 벗어나면 자동으로 감소합니다. - 우리는
require_lock_
를 확인 -
read_
write_
처럼 증가 푸시 기능에서 증가했습니다 - 수신 한 ID가 범위를 벗어난 경우 read_를 이전 값으로 되돌리고 작업 공급이 소진되었으므로 정리를 시도합니다
- 저장소에서 작업을 검색하고
bitflag_
를 제거합니다 작업을 사용할 준비가되었음을 표시
write_
의 최대 값에 도달하게됩니다.
변수는 큐가 완전히 비워 질 때까지 코드가 더 이상 작업을 허용하지 않습니다.
사용자 지정 할당자를 허용하도록 코드도 변경해야하지만 잠금 해제 설정이 완료되면 구현하기 쉬운 것으로 생각했습니다.
귀하의 의견을 듣고 싶습니다. 완전히 잠금을 해제 할 수 없었지만이 솔루션은 속임수처럼 느껴지지만 다소 멋지다고 생각합니다.
- 실제로 스레드 안전합니까?
- 내가 언급 한 점 외에도 어떤 기능을 개선/추가 할 수 있습니까?
fifo.h :
#pragma once
#include <atomic>
#include <memory>
#include <vector>
#include <mutex>
#include <thread>
#include <algorithm>
namespace lock_free
{
/**
* this class is used so we're able to use the RAII mechanism for locking
*/
template < typename T >
class use_count
{
public:
template < typename V >
use_count( V &&v ) :
data_( std::forward< V >( v ) ) { }
const T& operator()() const { return data_; }
void lock() { ++data_; }
void unlock() { --data_; }
private:
use_count( const use_count& );
use_count& operator = ( const use_count& );
T data_;
};
/**
* This is a lock free fifo, which can be used for multi-producer, multi-consumer
* type job queue
*/
template < typename Value >
class fifo
{
public:
typedef Value value_type;
fifo( size_t size = 1024 ) :
require_lock_( false ),
lock_(),
concurrent_users_( 0 ),
read_( 0 ),
write_( 0 ),
size_( size ),
storage_( size ),
bitflag_( new std::atomic_size_t[ std::max( size_t( 1 ), size / bits_per_section() ) ] )
{
fill_bitflags( 0 );
}
~fifo()
{
clear();
delete [] bitflag_;
}
/**
* pushes an item into the job queue, may throw if allocation fails
* leaving the queue unchanged
*/
void push( const value_type &value )
{
std::lock_guard< use_count< std::atomic_size_t > > lock( concurrent_users_ );
conditional_lock();
if ( write_ == std::numeric_limits< size_t >::max() )
{
throw std::logic_error( "fifo full, remove some jobs before adding new ones" );
}
const size_t id = write_++;
if ( id >= size_ )
{
resize_storage( id );
}
storage_[ id ] = value;
set_bitflag_( id, mask_for_id( id ) );
}
/**
* retrieves an item from the job queue.
* if no item was available, func is untouched and pop returns false
*/
bool pop( value_type &func )
{
auto assign = [ & ]( value_type &dst, value_type &src )
{
std::swap( dst, src );
};
return pop_generic( func, assign );
}
/**
* clears the job queue, storing all pending jobs in the supplied container.
* the container is also returned for convenience
*/
template < typename T >
T& pop_all( T &unfinished )
{
value_type tmp;
while ( pop( tmp ) )
{
unfinished.push_back( tmp );
}
return unfinished;
}
/**
* clears the job queue.
*/
void clear()
{
auto del = []( value_type&, value_type& ) {};
value_type tmp;
while ( pop_generic( tmp, del ) )
{
// empty
}
}
/**
* returns true if there are no pending jobs
*/
bool empty() const
{
return read_ == write_;
}
private:
fifo( const fifo& );
fifo& operator = ( const fifo& );
static constexpr size_t bits_per_section()
{
return sizeof( size_t ) * 8;
}
template < typename Assign >
bool pop_generic( value_type &value, Assign assign )
{
std::lock_guard< use_count< std::atomic_size_t > > lock( concurrent_users_ );
conditional_lock();
const size_t id = read_++;
if ( id >= write_ )
{
--read_;
try_cleanup();
return false;
}
const size_t mask = mask_for_id( id );
while ( !unset_bitflag_( id, mask ) )
{
std::this_thread::yield();
}
assign( value, storage_[ id ] );
return true;
}
void try_cleanup()
{
if ( !write_ || read_ != write_ || require_lock_ )
{
// early exit, avoids needless locking
return;
}
bool expected( false );
if ( require_lock_.compare_exchange_strong( expected, true ) )
{
std::lock_guard< std::mutex > guard( lock_ );
while ( concurrent_users_() > 1 )
{
std::this_thread::yield();
}
write_ = 0;
read_ = 0;
fill_bitflags( 0 );
require_lock_ = false;
}
}
void resize_storage( size_t id )
{
while ( size_ <= id )
{
if ( id == size_ )
{
require_lock_ = true;
std::lock_guard< std::mutex > guard( lock_ );
while ( concurrent_users_() > 1 )
{
std::this_thread::yield();
}
const size_t bitflag_size = size_ / bits_per_section();
storage_.resize( std::max( size_t( 1 ), size_ * 2 ) );
std::atomic_size_t *newbitflag = new std::atomic_size_t[ std::max( size_t( 1 ), bitflag_size * 2 ) ];
std::atomic_size_t *start = newbitflag;
const std::atomic_size_t *end = start + bitflag_size;
const std::atomic_size_t *src = bitflag_;
while ( start != end )
{
(start++)->store( *src++ );
}
end = newbitflag + bitflag_size * 2;
while ( start != end )
{
(start++)->store( 0 );
}
delete [] bitflag_;
bitflag_ = newbitflag;
size_ = storage_.size();
require_lock_ = false;
}
else
{
conditional_lock();
}
}
}
static size_t mask_for_id( size_t id )
{
const size_t offset = id / bits_per_section();
id -= offset * bits_per_section();
return size_t( 1 ) << id;
}
void set_bitflag_( size_t id, size_t mask )
{
bitflag_[ id / bits_per_section() ].fetch_or( mask );
}
bool unset_bitflag_( size_t id, size_t mask )
{
const size_t old = bitflag_[ id / bits_per_section() ].fetch_and( ~mask );
return ( old & mask ) == mask;
}
void conditional_lock()
{
if ( require_lock_ )
{
concurrent_users_.unlock();
lock_.lock();
lock_.unlock();
concurrent_users_.lock();
}
}
void fill_bitflags( size_t value )
{
std::atomic_size_t *start = bitflag_;
const std::atomic_size_t *end = start + size_ / bits_per_section();
while ( start != end )
{
(start++)->store( value );
}
}
std::atomic_bool require_lock_;
std::mutex lock_;
use_count< std::atomic_size_t > concurrent_users_;
std::atomic_size_t read_, write_, size_;
std::vector< value_type > storage_;
std::atomic_size_t *bitflag_;
};
}
- 답변 # 1
- C++ 보기 유형: const&또는 값으로 전달?
- c++ : shared_pointer 구문을 이해할 수 없음
- Visual Studio 2010에서 C++11 표준을 "활성화"하는 방법은 무엇입니까?
- c++ : 템플릿 매개변수 그룹을 정의하고 조건부로 전문화할 수 있습니까?
- c++ : spdlog로 플레이하는 방법?
- C++ 중첩 컨테이너 및 개체의 메모리 해제
- C++ : Code::Blocks, MinGW 및 C++11과 함께 Google Mock을 사용한 단위 테스트
- C++ 댕글링 참조 이상한 동작
- c++ : 두 벡터를 하나로 병합하기
- c++ : std::regex가 다른 정규식 라이브러리보다 악명 높게 느린 이유는 무엇입니까?
이 대기열이 여러 "작성자"를 지원하지 않는다고 말하면 무슨 뜻입니까? 여러 생산자를 의미합니까? 큐에서 푸시 (생산) 및 팝 (소비)은 모두 변경 ( "쓰기") 작업이기 때문입니다. 그리고 "한 작가가 아이템을 밀고 한 작가가 그것을 터뜨리는"시나리오를 지원하지 않으면 실제로는 동시 큐라고 부를 수 없습니다. 그래서 여러분은 "한 명의 생산자, 한 명의 소비자"를 지원하는의미라고 가정하고 그 시나리오에서 버그가 어디에 있는지 보여줄 것입니다.
<시간>use_count<T>
만 사용T = std::atomic_size_t
와 함께 따라서 클래스 템플릿이 아니어야합니다 (일반적인 클래스 여야 함).템플릿을 유지하는 경우 생성자는 가변 템플릿이어야합니다. 완벽한 전달 및 다양한 템플릿은 땅콩 버터 및 젤리와 같이 사용됩니다.그리고생성자는 반드시
explicit
여야합니다 원하지 않는 암시 적 변환을 비활성화하려면use_count
에 대한 개인 복사 생성자 및 복사 할당 연산자를 선언합니다. 그러나 결코 정의하지 마십시오. 이 C ++ 03 관용구는 C ++ 11부터 더 이상 사용되지 않습니다. 더 나은 방법은 (atomic_size_t
가 있기 때문에 전혀 선언하지 않는 것입니다) 멤버는 클래스를 복사/이동할 수 없도록하거나=delete
로 정의합니다. .와이즈 비즈 생성자는 또한
fifo
로 표시되어야합니다 .explicit
아마도bitflag_
해야합니다 원시 포인터 대신빈 줄을 모두 제거하면 코드를 더 쉽게 읽을 수 있습니다 (함수 정의 사이의 줄을 제외하고는 추측합니다). 예를 들어, 당신은
<시간>std::unique_ptr<std::atomic_size_t[]>
를 썼습니다 21 개 라인 중 5 개는 비어 있고 2 개는 풀리지 않은push()
에스. 14 줄이었을 수도 있습니다.알겠습니다. 여기 버그가 있습니다 (내 생각에 뭔가 빠진 것이 있으면 알려주세요). 스레드 T1이 저장소 크기를 조정하려고하고 스레드 T2가 팝업을 시도하는 새로운 리더로 들어오고 있다고 가정합니다.
T1 :
{
에 들어갑니다T1 : 증분
push
0에서 1까지 T1 : 전화concurrent_users_.data_
아무 작업도하지 않고 반환됩니다.T1 :
conditional_lock
에 들어갑니다resize_storage
와 함께T1 : 세트
id == size_
T1 : 잠금
require_lock_ = true
T1 : 테스트
lock_
concurrent_users_() > 1
때문에 거짓입니다.T1 : 따라서
concurrent_users_.data_ == 1
의 반복을 실행하지 않습니다 루프T1 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들
그동안
T2 :
while
에 들어갑니다T2 :
delete [] bitflag_;
에 들어갑니다T2 : 증분
pop
1에서 2까지 T2 :pop_generic
에 들어갑니다T2 : 테스트
concurrent_users_.data_
conditional_lock
입니다T2 :
requires_lock_
감소 2에서 1까지 T2 :true
인수 그것을 해제합니다.T2 : 증분
concurrent_users_.data_
1에서 2까지 T2 :lock_
에서 반환T2 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들
그것을 인터리브하자 무슨 일이 일어나고 있는지 분명하다 :
T1 :
concurrent_users_.data_
에 들어갑니다T1 : 증분
conditional_lock
0에서 1까지 T1 : 전화unset_bitflag_
아무 작업도하지 않고 반환됩니다.T1 :
push
에 들어갑니다concurrent_users_.data_
와 함께T1 : 세트
conditional_lock
T2 :
resize_storage
에 들어갑니다T2 :
id == size_
에 들어갑니다T2 : 증분
require_lock_ = true
1에서 2까지 T2 :pop
에 들어갑니다T2 : 테스트
pop_generic
concurrent_users_.data_
입니다T2 :
conditional_lock
감소 2에서 1까지 T2 :requires_lock_
인수 그것을 해제T1 : 잠금
true
T1 : 테스트
concurrent_users_.data_
lock_
때문에 거짓입니다.T1 : 따라서
lock_
의 반복을 실행하지 않습니다 루프T1 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들
T2 :
<시간>concurrent_users_() > 1
증분 1에서 2까지 T2 :concurrent_users_.data_ == 1
에서 반환T2 : Wyzwyz에서 정점에 달하는 비 동기화되지 않은 것들
따라서
while
가 지적한 메모리에 동기화되지 않은 데이터 레이스가 있습니다. . T2가 그것을보기 전에 T1은 그 메모리를 쉽게 할당 해제 할 수 있습니다. 즉, T2는 가비지 메모리를 읽거나 이미 재 할당되어 현재 다른 스레드에서 사용중인 메모리에 스톰 핑 될 수 있습니다.delete [] bitflag_;