Skip to content

Commit

Permalink
[Fix]fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
NaturalSelect committed Apr 13, 2022
1 parent 39af8ec commit c1ddd93
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 74 deletions.
10 changes: 1 addition & 9 deletions include/sharpen/LevelTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,7 @@

#include <set>

#include "MemoryTable.hpp"
#include "BinaryLogger.hpp"
#include "SortedStringTable.hpp"
#include "AsyncReadWriteLock.hpp"
#include "LevelComponent.hpp"
#include "MemoryTableComparator.hpp"
#include "LevelView.hpp"
#include "LevelTableOption.hpp"
#include "LockTable.hpp"
#include "LevelTableScanner.hpp"

namespace sharpen
Expand Down Expand Up @@ -437,7 +429,7 @@ namespace sharpen
return scanner;
}

inline sharpen::LevelTableScanner Scan(bool useCache,const sharpen::ByteBuffer &beginKey,const sharpen::ByteBuffer &endKey) const
inline sharpen::LevelTableScanner Scan(const sharpen::ByteBuffer &beginKey,const sharpen::ByteBuffer &endKey,bool useCache) const
{
sharpen::LevelTableScanner scanner{*this,beginKey,endKey};
if(!useCache)
Expand Down
3 changes: 0 additions & 3 deletions include/sharpen/LevelTableScanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
#include "MemoryTable.hpp"
#include "BinaryLogger.hpp"
#include "SortedStringTable.hpp"
#include "AsyncReadWriteLock.hpp"
#include "LevelComponent.hpp"
#include "MemoryTableComparator.hpp"
#include "LevelView.hpp"
#include "LevelTableOption.hpp"
#include "LockTable.hpp"
#include "Optional.hpp"

namespace sharpen
Expand Down
29 changes: 17 additions & 12 deletions include/sharpen/MemoryTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "Noncopyable.hpp"
#include "Optional.hpp"
#include "ExistStatus.hpp"
#include "SpinLock.hpp"
#include "AsyncReadWriteLock.hpp"

namespace sharpen
{
Expand Down Expand Up @@ -89,10 +89,11 @@ namespace sharpen
using ConstIterator = typename MapType::const_iterator;
private:
using Self = sharpen::InternalMemoryTable<_Logger,_Pred,sharpen::EnableIf<sharpen::IsMemoryTableLogger<_Logger>::Value>>;

using Lock = sharpen::AsyncReadWriteLock;

sharpen::CompressedPair<_Logger,MapType> pair_;
bool directlyErase_;
mutable std::unique_ptr<sharpen::SpinLock> lock_;
mutable std::unique_ptr<Lock> lock_;

_Logger &Logger() noexcept
{
Expand Down Expand Up @@ -166,7 +167,7 @@ namespace sharpen
,directlyErase_(false)
,lock_(nullptr)
{
this->lock_.reset(new sharpen::SpinLock{});
this->lock_.reset(new Lock{});
if(!this->lock_)
{
throw std::bad_alloc();
Expand All @@ -179,7 +180,7 @@ namespace sharpen
,directlyErase_(true)
,lock_(nullptr)
{
this->lock_.reset(new sharpen::SpinLock{});
this->lock_.reset(new Lock{});
if(!this->lock_)
{
throw std::bad_alloc();
Expand All @@ -193,7 +194,7 @@ namespace sharpen
,directlyErase_(false)
,lock_(nullptr)
{
this->lock_.reset(new sharpen::SpinLock{});
this->lock_.reset(new Lock{});
if(!this->lock_)
{
throw std::bad_alloc();
Expand All @@ -206,7 +207,7 @@ namespace sharpen
,directlyErase_(true)
,lock_(nullptr)
{
this->lock_.reset(new sharpen::SpinLock{});
this->lock_.reset(new Lock{});
if(!this->lock_)
{
throw std::bad_alloc();
Expand Down Expand Up @@ -247,7 +248,8 @@ namespace sharpen
{
this->Logger().Log(batch);
{
std::unique_lock<sharpen::SpinLock> lock{*this->lock_};
this->lock_->LockWrite();
std::unique_lock<Lock> lock{*this->lock_,std::adopt_lock};
this->InternalAction(batch);
}
}
Expand All @@ -273,7 +275,8 @@ namespace sharpen

sharpen::ExistStatus Exist(const sharpen::ByteBuffer &key) const noexcept
{
std::unique_lock<sharpen::SpinLock> lock{*this->lock_};
this->lock_->LockRead();
std::unique_lock<Lock> lock{*this->lock_,std::adopt_lock};
auto ite = this->Map().find(key);
if(ite == this->Map().end())
{
Expand All @@ -288,7 +291,8 @@ namespace sharpen

sharpen::ByteBuffer &Get(const sharpen::ByteBuffer &key)
{
std::unique_lock<sharpen::SpinLock> lock{*this->lock_};
this->lock_->LockRead();
std::unique_lock<Lock> lock{*this->lock_,std::adopt_lock};
auto &item = this->Map().at(key);
if(item.IsDeleted())
{
Expand All @@ -299,7 +303,8 @@ namespace sharpen

const sharpen::ByteBuffer &Get(const sharpen::ByteBuffer &key) const
{
std::unique_lock<sharpen::SpinLock> lock{*this->lock_};
this->lock_->LockRead();
std::unique_lock<Lock> lock{*this->lock_,std::adopt_lock};
auto &item = this->Map().at(key);
if(item.IsDeleted())
{
Expand Down Expand Up @@ -338,7 +343,7 @@ namespace sharpen
this->Map().clear();
}

inline sharpen::SpinLock &GetLock() const noexcept
inline Lock &GetLock() const noexcept
{
return *this->lock_;
}
Expand Down
2 changes: 1 addition & 1 deletion src/BalancedTableScanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ sharpen::BalancedTableScanner::BalancedTableScanner(const sharpen::BalancedTable
auto &rootLock{this->table_->GetRootLock()};
rootLock.LockRead();
std::unique_lock<sharpen::AsyncReadWriteLock> lock{rootLock,std::adopt_lock};
this->table_->TableScan(std::back_inserter(this->pointers_));
this->table_->TableScan(std::back_inserter(this->pointers_),beginKey,endKey);
if(!this->pointers_.empty())
{
this->range_.Construct(beginKey,endKey);
Expand Down
39 changes: 17 additions & 22 deletions src/LevelTableScanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

sharpen::Optional<sharpen::ByteBuffer> sharpen::LevelTableScanner::SelectKeyFromMemTable(const MemTable &table,const sharpen::ByteBuffer *before,const sharpen::ByteBuffer *after) const
{
table.GetLock().LockRead();
std::unique_lock<sharpen::AsyncReadWriteLock> lock{table.GetLock(),std::adopt_lock};
if(!table.Empty())
{
auto begin = table.Begin(),end = table.End();
Expand Down Expand Up @@ -94,6 +96,7 @@ sharpen::Optional<sharpen::ByteBuffer> sharpen::LevelTableScanner::SelectKeyFrom
return sharpen::EmptyOpt;
}

//(after,before]
sharpen::Optional<sharpen::ByteBuffer> sharpen::LevelTableScanner::SelectKey(const sharpen::ByteBuffer *before,const sharpen::ByteBuffer *after) const
{
const auto &memTable{this->table_->GetMemoryTable()};
Expand Down Expand Up @@ -184,22 +187,15 @@ sharpen::LevelTableScanner::LevelTableScanner(const sharpen::LevelTable &table,c
this->table_->GetLevelLock().LockRead();
std::unique_lock<sharpen::AsyncReadWriteLock> lock{this->table_->GetLevelLock(),std::adopt_lock};
this->table_->TableScan(std::back_inserter(this->tables_));
if(this->table_->Exist(beginKey) == sharpen::ExistStatus::Exist)
auto key{this->SelectKey(&this->GetRangeEnd(),&this->GetRangeBegin())};
if(key.Exist())
{
this->currentKey_ = beginKey;
this->currentKey_ = std::move(key.Get());
this->levelLock_ = std::move(lock);
}
else
{
auto key{this->SelectKey(&this->GetRangeBegin(),&this->GetRangeEnd())};
if(key.Exist())
{
this->currentKey_ = std::move(key.Get());
this->levelLock_ = std::move(lock);
}
else
{
this->isEmpty_ = true;
}
this->isEmpty_ = true;
}
}

Expand Down Expand Up @@ -253,21 +249,20 @@ bool sharpen::LevelTableScanner::Seek(const sharpen::ByteBuffer &key)
this->currentKey_ = key;
return true;
}
sharpen::Optional<sharpen::ByteBuffer> opt;
if(this->IsRangeQuery())
{
auto tmp{this->SelectKey(&this->GetRangeEnd(),&key)};
if(tmp.Exist())
{
this->currentKey_ = std::move(tmp.Get());
}
return tmp.Exist();
opt = this->SelectKey(&this->GetRangeEnd(),&key);
}
else
{
opt = this->SelectKey(nullptr,&key);
}
auto tmp{this->SelectKey(nullptr,&key)};
if(tmp.Exist())
if(opt.Exist())
{
this->currentKey_ = std::move(tmp.Get());
this->currentKey_ = std::move(opt.Get());
}
return tmp.Exist();
return opt.Exist();
}

sharpen::ByteBuffer sharpen::LevelTableScanner::GetCurrentValue() const
Expand Down
52 changes: 25 additions & 27 deletions test/PersistenceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void Entry()
try
{
std::puts("persistence test begin");
/*sharpen::FileChannelPtr log = sharpen::MakeFileChannel(logName, sharpen::FileAccessModel::All, sharpen::FileOpenModel::CreateNew);
sharpen::FileChannelPtr log = sharpen::MakeFileChannel(logName, sharpen::FileAccessModel::All, sharpen::FileOpenModel::CreateNew);
log->Register(sharpen::EventEngine::GetEngine());
sharpen::FileChannelPtr table = sharpen::MakeFileChannel(tableName, sharpen::FileAccessModel::All, sharpen::FileOpenModel::CreateNew);
table->Register(sharpen::EventEngine::GetEngine());
Expand Down Expand Up @@ -312,18 +312,16 @@ void Entry()
auto scanner{pt.Scan(true)};
if(!scanner.IsEmpty())
{
bool stop{false};
while (!stop)
do
{
sharpen::ByteBuffer val{scanner.GetCurrentValue()};
std::fputs("bt scanner value is ", stdout);
for (sharpen::Size i = 0; i != val.GetSize(); ++i)
{
std::putchar(val[i]);
}
stop = !scanner.Next();
std::putchar('\n');
}
} while (scanner.Next());
}
assert(pt.IsFault() == false);
}
Expand Down Expand Up @@ -441,16 +439,18 @@ void Entry()
}
}
std::printf("bt scan %zu keys\n",count);
auto scanner{pt.Scan(true)};
sharpen::ByteBuffer beginKey{sizeof(sharpen::Uint32)};
beginKey.As<sharpen::Uint32>() = 0;
sharpen::ByteBuffer endKey{sizeof(sharpen::Uint32)};
endKey.As<sharpen::Uint32>() = 10000;
auto scanner{pt.Scan(beginKey,endKey,true)};
if(!scanner.IsEmpty())
{
bool stop{false};
while (!stop)
do
{
sharpen::ByteBuffer val{scanner.GetCurrentValue()};
std::printf("bt scanner value is %u\n",val.As<sharpen::Uint32>());
stop = !scanner.Next();
}
} while (scanner.Next());
}
}
}
Expand All @@ -463,39 +463,37 @@ void Entry()
sharpen::RemoveFile(logName);
sharpen::RemoveFile(tableName);
sharpen::RemoveFile(tableName2);
sharpen::RemoveFile(tableName3);*/
sharpen::RemoveFile(tableName3);
{
sharpen::LevelTable table{sharpen::EventEngine::GetEngine(),"./table","kdb",sharpen::LevelTableOption{&CompAsUint32}};
for (sharpen::Uint32 i = 0,count = static_cast<sharpen::Uint32>(2*1e6); i != count;++i)
for (sharpen::Uint32 i = 0,count = static_cast<sharpen::Uint32>(1e5); i != count;++i)
{
sharpen::ByteBuffer key{sizeof(sharpen::Uint32)};
key.As<sharpen::Uint32>() = i;
sharpen::ByteBuffer value{sizeof(sharpen::Uint32)};
value.As<sharpen::Uint32>() = i;
table.Put(std::move(key),std::move(value));
}
// for (sharpen::Uint32 i = 0,count = static_cast<sharpen::Uint32>(1e6); i != count;++i)
// {
// sharpen::ByteBuffer key{sizeof(sharpen::Uint32)};
// key.As<sharpen::Uint32>() = i;
// sharpen::ByteBuffer value{sizeof(sharpen::Uint32)};
// value.As<sharpen::Uint32>() = i;
// std::printf("lt check %u/%u\n",i,count - 1);
// assert(table.Get(key) == value);
// }
for (sharpen::Uint32 i = 0,count = static_cast<sharpen::Uint32>(1e5); i != count;++i)
{
sharpen::ByteBuffer key{sizeof(sharpen::Uint32)};
key.As<sharpen::Uint32>() = i;
sharpen::ByteBuffer value{sizeof(sharpen::Uint32)};
value.As<sharpen::Uint32>() = i;
std::printf("lt check %u/%u\n",i,count - 1);
assert(table.Get(key) == value);
}
sharpen::ByteBuffer begin{sizeof(sharpen::Uint32)},end{sizeof(sharpen::Uint32)};
begin.As<sharpen::Uint32>() = 0;
end.As<sharpen::Uint32>() = 10;
auto scanner{table.Scan(true,begin,end)};
end.As<sharpen::Uint32>() = 10000;
auto scanner{table.Scan(begin,end,true)};
if(!scanner.IsEmpty())
{
bool stop{false};
while (!stop)
do
{
sharpen::ByteBuffer val{scanner.GetCurrentValue()};
std::printf("lt scanner value is %u\n",val.As<sharpen::Uint32>());
stop = !scanner.Next();
}
} while(scanner.Next());
}
table.Destory();
}
Expand Down

0 comments on commit c1ddd93

Please sign in to comment.