Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd64ffb6

Browse files
committed
HBase compatible
1 parent7cbec9c commitd64ffb6

File tree

4 files changed

+207
-26
lines changed

4 files changed

+207
-26
lines changed

‎src/observer/table/ob_htable_filter_operator.cpp‎

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,17 @@ ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query)
635635
cell_count_(0),
636636
count_per_row_(0),
637637
has_more_cells_(true),
638-
is_first_result_(true)
639-
{}
638+
is_first_result_(true),
639+
allow_partial_results_(false),
640+
is_cache_block_(true),
641+
scanner_context_(NULL)
642+
{
643+
if (query.get_ob_params().ob_params_ !=nullptr) {
644+
ObHBaseParams* hbase_params =dynamic_cast<ObHBaseParams*> (query.get_ob_params().ob_params_);
645+
allow_partial_results_ = hbase_params->allow_partial_results_;
646+
is_cache_block_ = hbase_params->is_cache_block_;
647+
}
648+
}
640649

641650
ObHTableRowIterator::~ObHTableRowIterator()
642651
{
@@ -772,6 +781,10 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
772781
count_per_row_ =0;
773782
ret = matcher_->set_to_new_row(curr_cell_);
774783
}
784+
if (OB_SUCC(ret) && allow_partial_results_) {
785+
scanner_context_->limits_.set_size_scope(LimitScope::Scope::BETWEEN_CELLS);
786+
scanner_context_->limits_.set_time_scope(LimitScope::Scope::BETWEEN_CELLS);
787+
}
775788
bool loop =true;
776789
if (OB_SUCC(ret)) {
777790
if (NULL == matcher_->get_curr_row()) {
@@ -865,6 +878,8 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
865878
if (OB_FAIL(out_result->add_row(*(curr_cell_.get_ob_row())))) {
866879
LOG_WARN("failed to add row to result",K(ret));
867880
}else {
881+
scanner_context_->increment_batch_progress(1);
882+
scanner_context_->increment_size_progress(curr_cell_.get_ob_row()->get_serialize_size());
868883
++cell_count_;
869884
LOG_DEBUG("[yzfdebug] add cell",K_(cell_count),K_(curr_cell),
870885
K_(count_per_row),K_(offset_per_row_per_cf));
@@ -909,7 +924,7 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
909924
}
910925
}
911926
if (OB_SUCC(ret)) {
912-
if (reach_batch_limit() ||reach_size_limit()) {
927+
if (scanner_context_->check_any_limit(LimitScope::Scope::BETWEEN_CELLS)) {
913928
loop =false;
914929
}
915930
}elseif (OB_ITER_END == ret) {
@@ -1040,11 +1055,23 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
10401055
row_iterator_(query),
10411056
one_result_(&one_result),
10421057
hfilter_(NULL),
1043-
batch_size_(query.get_batch()),
1058+
caching_(query.get_batch()),
10441059
max_result_size_(std::min(query.get_max_result_size(),
10451060
static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024))),
1046-
is_first_result_(true)
1061+
is_first_result_(true),
1062+
check_existence_only_(false),
1063+
scanner_context_()
10471064
{
1065+
if (query.get_ob_params().ob_params_ !=nullptr) {
1066+
ObHBaseParams* hbase_params =dynamic_cast<ObHBaseParams*> (query.get_ob_params().ob_params_);
1067+
scanner_context_.limits_.set_fields(hbase_params->batch_, max_result_size_, hbase_params->call_timeout_,LimitScope(LimitScope::Scope::BETWEEN_ROWS));
1068+
check_existence_only_ = hbase_params->check_existence_only_;
1069+
}
1070+
row_iterator_.set_scanner_context(&scanner_context_);
1071+
}
1072+
1073+
boolObHTableFilterOperator::reach_caching_limit(int num_of_row) {
1074+
return (caching_ >=0 && num_of_row < caching_) || caching_ <0;
10481075
}
10491076

10501077
// @param one_result for one batch
@@ -1059,8 +1086,12 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
10591086
// ObObj first_entity_cells[4];
10601087
// first_entity.cells_ = first_entity_cells;
10611088
// first_entity.count_ = 4;
1089+
int num_of_row =0;
1090+
scanner_context_.progress_.reset();
10621091
while (OB_SUCC(ret) && row_iterator_.has_more_result()
1063-
&&OB_SUCC(row_iterator_.get_next_result(htable_row))) {
1092+
&&reach_caching_limit(num_of_row)
1093+
&&OB_SUCC(row_iterator_.get_next_result(htable_row)
1094+
)) {
10641095
LOG_DEBUG("[yzfdebug] got one row","cells_count", htable_row->get_row_count());
10651096
bool is_empty_row = (htable_row->get_row_count() ==0);
10661097
if (is_empty_row) {
@@ -1107,14 +1138,21 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
11071138
}
11081139
/* @todo check batch limit and size limit*/
11091140
// We have got one hbase row, store it to this batch
1141+
if (check_existence_only_) {
1142+
one_result_->check_exists_only(*htable_row);
1143+
htable_row->reset();
1144+
break;
1145+
}
11101146
if (OB_FAIL(one_result_->add_all_row(*htable_row))) {
11111147
LOG_WARN("failed to add cells to row",K(ret));
11121148
}
11131149
if (NULL != hfilter_) {
11141150
hfilter_->reset();
11151151
}
11161152
if (OB_SUCC(ret)) {
1117-
if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) {
1153+
num_of_row +=1;
1154+
scanner_context_.increment_size_progress(htable_row->get_serialize_size_());
1155+
if (scanner_context_.check_any_limit(LimitScope::Scope::BETWEEN_ROWS)) {
11181156
break;
11191157
}
11201158
}
@@ -1152,3 +1190,47 @@ int ObHTableFilterOperator::parse_filter_string(common::ObIAllocator* allocator)
11521190
}
11531191
return ret;
11541192
}
1193+
1194+
ScannerContext::ScannerContext() {
1195+
limits_.set_fields(-1, -1, -1, LimitScope::Scope::BETWEEN_ROWS);
1196+
progress_.set_fields(0,0,0, LimitScope::Scope::BETWEEN_ROWS);
1197+
}
1198+
1199+
ScannerContext::ScannerContext(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope) {
1200+
limits_.set_fields(batch, size, time, limit_scope);
1201+
}
1202+
1203+
voidScannerContext::increment_batch_progress(int32_t batch) {
1204+
int32_t current_batch = progress_.get_batch();
1205+
progress_.set_batch(current_batch + batch);
1206+
}
1207+
1208+
voidScannerContext::increment_size_progress(int64_t size) {
1209+
int64_t current_size = progress_.get_size();
1210+
progress_.set_size(current_size + size);
1211+
}
1212+
1213+
boolScannerContext::check_batch_limit(LimitScope checker_scope) {
1214+
if (limits_.can_enforce_batch_from_scope(checker_scope) && limits_.get_batch() >0) {
1215+
return progress_.get_batch() >= limits_.get_batch();
1216+
}
1217+
returnfalse;
1218+
}
1219+
1220+
boolScannerContext::check_size_limit(LimitScope checker_scope) {
1221+
if (limits_.can_enforce_size_from_scope(checker_scope) && limits_.get_size() >0 ){
1222+
return progress_.get_size() >= limits_.get_size();
1223+
}
1224+
returnfalse;
1225+
}
1226+
1227+
boolScannerContext::check_time_limit(LimitScope checker_scope) {
1228+
if (limits_.can_enforce_time_from_scope(checker_scope) && limits_.get_time() >0) {
1229+
return progress_.get_time() >= limits_.get_time();
1230+
}
1231+
returnfalse;
1232+
}
1233+
1234+
boolScannerContext::check_any_limit(LimitScope checker_scope) {
1235+
returncheck_batch_limit(checker_scope) ||check_size_limit(checker_scope) ||check_time_limit(checker_scope);
1236+
}

‎src/observer/table/ob_htable_filter_operator.h‎

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,79 @@ class ObHTableWildcardColumnTracker: public ObHTableColumnTracker
166166
int32_t current_count_;
167167
};
168168

169+
170+
classLimitScope {
171+
public:
172+
enumclassScope {
173+
BETWEEN_ROWS =0,
174+
BETWEEN_CELLS =1
175+
};
176+
LimitScope() : scope_(Scope::BETWEEN_ROWS), depth_(0) {}
177+
LimitScope(Scope scope) : scope_(scope), depth_(static_cast<int>(scope)) {}
178+
~LimitScope();
179+
OB_INLINEintdepth()const {return depth_; }
180+
OB_INLINEvoidset_scope(Scope scope) {scope_ = scope; depth_ =static_cast<int>(scope);}
181+
OB_INLINEboolcan_enforce_limit_from_scope(const LimitScope& checker_scope)const {return checker_scope.depth() <= depth_; }
182+
TO_STRING_KV(K_(scope), K_(depth));
183+
private:
184+
Scope scope_;
185+
int depth_;
186+
};
187+
188+
classLimitFields {
189+
public:
190+
LimitFields(): batch_(-1), size_(-1), time_(-1), size_scope_(LimitScope::Scope::BETWEEN_ROWS), time_scope_(LimitScope::Scope::BETWEEN_ROWS) {}
191+
LimitFields(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope) {set_fields(batch, size, time, limit_scope); }
192+
voidset_fields(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope) {
193+
set_batch(batch);
194+
set_size_scope(limit_scope);
195+
set_time_scope(limit_scope);
196+
set_size(size);
197+
set_time(time);
198+
}
199+
~LimitFields();
200+
voidreset() { batch_ =0; size_ =0; time_ =0; size_scope_.set_scope(LimitScope::Scope::BETWEEN_ROWS);}
201+
voidset_batch(int32_t batch) { batch_ = batch; }
202+
voidset_size(int64_t size) { size_ = size; }
203+
voidset_time(int64_t time) { time_ = time; }
204+
voidset_time_scope(LimitScope scope) { time_scope_ = scope; }
205+
voidset_size_scope(LimitScope scope) { size_scope_ = scope; }
206+
int32_tget_batch() {return batch_; }
207+
int64_tget_size() {return size_; }
208+
int64_tget_time() {return time_; }
209+
LimitScopeget_size_scope() {return size_scope_; }
210+
LimitScopeget_time_scope() {return time_scope_; }
211+
boolcan_enforce_batch_from_scope(LimitScope checker_scope) {returnLimitScope(LimitScope::Scope::BETWEEN_CELLS).can_enforce_limit_from_scope(checker_scope);}
212+
boolcan_enforce_size_from_scope(LimitScope checker_scope) {return size_scope_.can_enforce_limit_from_scope(checker_scope); }
213+
boolcan_enforce_time_from_scope(LimitScope checker_scope) {return time_scope_.can_enforce_limit_from_scope(checker_scope); }
214+
TO_STRING_KV(K_(batch), K_(size), K_(time), K_(size_scope), K_(time_scope));
215+
private:
216+
int32_t batch_;
217+
int64_t size_;
218+
int64_t time_;
219+
LimitScope size_scope_;
220+
LimitScope time_scope_;
221+
};
222+
223+
classScannerContext {
224+
public:
225+
ScannerContext();
226+
~ScannerContext();
227+
ScannerContext(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope);
228+
voidincrement_batch_progress(int32_t batch);
229+
voidincrement_size_progress(int64_t size);
230+
voidupdate_time_progress() { progress_.set_time(ObTimeUtility::current_time()); }
231+
boolcheck_batch_limit(LimitScope checker_scope);
232+
boolcheck_size_limit(LimitScope checker_scope);
233+
boolcheck_time_limit(LimitScope checker_scope);
234+
boolcheck_any_limit(LimitScope checker_scope);
235+
236+
LimitFields limits_;
237+
LimitFields progress_;
238+
TO_STRING_KV(K_(limits),
239+
K_(progress));
240+
};
241+
169242
classObHTableScanMatcher
170243
{
171244
public:
@@ -224,6 +297,7 @@ class ObHTableRowIterator: public ObTableQueryResultIterator
224297
ObTableQueryResult *&out_result);
225298
ObIArray<common::ObNewRow> &get_same_kq_cells() {return same_kq_cells_; }
226299
voidset_max_version(int32_t max_version) { max_version_ = max_version; }
300+
voidset_scanner_context(ScannerContext *scanner_context) { scanner_context_ = scanner_context; }
227301
private:
228302
intnext_cell();
229303
intreverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells,
@@ -257,6 +331,9 @@ class ObHTableRowIterator: public ObTableQueryResultIterator
257331
int32_t count_per_row_;
258332
bool has_more_cells_;
259333
bool is_first_result_;
334+
bool allow_partial_results_;
335+
bool is_cache_block_;
336+
ScannerContext *scanner_context_;
260337
};
261338

262339
// entry class
@@ -266,6 +343,7 @@ class ObHTableFilterOperator: public ObTableQueryResultIterator
266343
ObHTableFilterOperator(const ObTableQuery &query, table::ObTableQueryResult &one_result);
267344
virtual~ObHTableFilterOperator() {}
268345
/// Fetch next batch result
346+
boolreach_caching_limit(int num_of_row);
269347
virtualintget_next_result(ObTableQueryResult *&one_result)override;
270348
virtualboolhas_more_result()constoverride {return row_iterator_.has_more_result(); }
271349
virtual table::ObTableQueryResult *get_one_result()override {return one_result_; }
@@ -284,9 +362,12 @@ class ObHTableFilterOperator: public ObTableQueryResultIterator
284362
table::ObTableQueryResult *one_result_;
285363
table::ObHTableFilterParser filter_parser_;
286364
table::hfilter::Filter *hfilter_;
287-
int32_t batch_size_;
365+
int32_t caching_;
366+
int time_limit_delta;
288367
int64_t max_result_size_;
289368
bool is_first_result_;
369+
bool check_existence_only_;
370+
ScannerContext scanner_context_;
290371
};
291372

292373
}// end namespace table

‎src/share/table/ob_table.cpp‎

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,7 @@ int ObTableQuery::deep_copy(ObIAllocator &allocator, ObTableQuery &dst) const
10531053
LOG_WARN("fail to deep copy index name",K(ret),K_(index_name));
10541054
}elseif (OB_FAIL(htable_filter_.deep_copy(allocator, dst.htable_filter_))) {
10551055
LOG_WARN("fail to deep copy htable filter",K(ret),K_(htable_filter));
1056-
}elseif (OB_FAIL(ob_params_.deep_copy(dst.ob_params_))){
1056+
}elseif (OB_FAIL(ob_params_.deep_copy(allocator,dst.ob_params_))){
10571057
LOG_WARN("fail to deep copy htable filter",K(ret),K_(ob_params));
10581058
}else {
10591059
dst.deserialize_allocator_ = deserialize_allocator_;
@@ -1122,6 +1122,8 @@ OB_DEF_DESERIALIZE(ObTableQuery,)
11221122
LOG_WARN("fail to deep copy range",K(ret));
11231123
}elseif (OB_FAIL(key_ranges_.push_back(key_range))) {
11241124
LOG_WARN("fail to add key range to array",K(ret));
1125+
}else {
1126+
ob_params_.set_allocator(deserialize_allocator_);
11251127
}
11261128
}
11271129
}
@@ -1563,6 +1565,11 @@ int ObTableQueryResult::add_all_row(const ObTableQueryResult &other)
15631565
return ret;
15641566
}
15651567

1568+
voidObTableQueryResult::check_exists_only(const ObTableQueryResult &other) {
1569+
this->properties_names_.reset();
1570+
row_count_ += other.row_count_;
1571+
}
1572+
15661573
int64_tObTableQueryResult::get_result_size()
15671574
{
15681575
if (0 >= fixed_result_size_) {
@@ -1817,8 +1824,8 @@ int ObHBaseParams::serialize(char *buf, const int64_t buf_len, int64_t &pos) con
18171824
}
18181825
}
18191826
if (!OB_FAIL(ret)) {
1820-
if (OB_FAIL(serialization::encode(buf, buf_len, pos,caching_))){
1821-
RPC_WARN("serialize fail",K(buf),K(buf_len),K(pos),K(caching_));
1827+
if (OB_FAIL(serialization::encode(buf, buf_len, pos,batch_))){
1828+
RPC_WARN("serialize fail",K(buf),K(buf_len),K(pos),K(batch_));
18221829
}elseif (!OB_FAIL(serialization::encode(buf, buf_len, pos, call_timeout_))) {
18231830
RPC_WARN("serialize fail",K(buf),K(buf_len),K(pos),K(call_timeout_));
18241831
}else {
@@ -1841,8 +1848,8 @@ int ObHBaseParams::serialize(char *buf, const int64_t buf_len, int64_t &pos) con
18411848
intObHBaseParams::deserialize(constchar *buf,constint64_t data_len,int64_t &pos) {
18421849
int ret =EmptyUnisStruct::deserialize(buf, data_len, pos);
18431850
if (OB_SUCC(ret)) {
1844-
if (OB_FAIL(serialization::decode(buf, data_len, pos,caching_))) {
1845-
RPC_WARN("deserialize fail",K(buf),K(data_len),K(pos),K(caching_));
1851+
if (OB_FAIL(serialization::decode(buf, data_len, pos,batch_))) {
1852+
RPC_WARN("deserialize fail",K(buf),K(data_len),K(pos),K(batch_));
18461853
}elseif (OB_FAIL(serialization::decode(buf, data_len, pos, call_timeout_))) {
18471854
RPC_WARN("deserialize fail",K(buf),K(data_len),K(pos),K(call_timeout_));
18481855
}else {
@@ -1860,7 +1867,7 @@ int ObHBaseParams::deserialize(const char *buf, const int64_t data_len, int64_t
18601867
int64_tObHBaseParams::get_serialize_size()const {
18611868
int64_t len = ::oceanbase::lib::EmptyUnisStruct::get_serialize_size();
18621869
len +=1;// param_type;
1863-
len +=serialization::encoded_length(caching_);
1870+
len +=serialization::encoded_length(batch_);
18641871
len += ::oceanbase::common::serialization::encoded_length(call_timeout_);
18651872
len +=serialization::encoded_length(bool_to_byte());
18661873
OB_UNIS_ADD_LEN(UNIS_VERSION);
@@ -1870,7 +1877,7 @@ int64_t ObHBaseParams::get_serialize_size() const {
18701877

18711878
intObHBaseParams::deep_copy(ObParamsBase *hbase_params)const {
18721879
ObHBaseParams *param =dynamic_cast<ObHBaseParams*>(hbase_params);
1873-
param->caching_ =caching_;
1880+
param->batch_ =batch_;
18741881
param->call_timeout_ = call_timeout_;
18751882
param->is_raw_ = is_raw_;
18761883
param->is_cache_block_ = is_cache_block_;
@@ -1920,8 +1927,12 @@ int64_t ObParams::get_serialize_size() const {
19201927
return ob_params_->get_serialize_size();
19211928
}
19221929

1923-
intObParams::deep_copy(ObParams &ob_params)const {
1930+
intObParams::deep_copy(ObIAllocator &allocator,ObParams &ob_params)const {
19241931
int ret = OB_SUCCESS;
1932+
ob_params.set_allocator(&allocator);
1933+
if (ob_params_ ==nullptr) {
1934+
return ret;
1935+
}
19251936
ob_params.ob_params_ = ob_params.get_ob_params(ob_params_->get_param_type());
19261937
ob_params_->deep_copy(ob_params.ob_params_);
19271938
return ret;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2026 Movatter.jp