Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit00a1d56

Browse files
committed
HBase compatible
1 parent7cbec9c commit00a1d56

File tree

4 files changed

+375
-163
lines changed

4 files changed

+375
-163
lines changed

‎src/observer/table/ob_htable_filter_operator.cpp‎

Lines changed: 150 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -635,13 +635,36 @@ ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query)
635635
cell_count_(0),
636636
count_per_row_(0),
637637
has_more_cells_(true),
638-
is_first_result_(true)
639-
{}
638+
is_first_result_(true),
639+
hbase_params_(),
640+
scanner_context_(NULL)
641+
{
642+
init_ob_params(query);
643+
}
640644

641645
ObHTableRowIterator::~ObHTableRowIterator()
642646
{
643647
}
644648

649+
intObHTableRowIterator::init_ob_params(const ObTableQuery &query)
650+
{
651+
int ret = OB_SUCCESS;
652+
if (query.get_ob_params().ob_params_ !=nullptr && query.get_ob_params().ob_params_->get_param_type() == ParamType::HBase) {
653+
ObHBaseParams* hbase_params =static_cast<ObHBaseParams*>(query.get_ob_params().ob_params_);
654+
if (hbase_params ==nullptr || hbase_params->get_param_type() != ParamType::HBase) {
655+
ret = OB_ERR_UNEXPECTED;
656+
LOG_WARN("unexpected ob_params",K(ret),KPC(hbase_params));
657+
}else {// only need the following two params;
658+
hbase_params_.allow_partial_results_ = hbase_params->allow_partial_results_;
659+
hbase_params_.is_cache_block_ = hbase_params->is_cache_block_;
660+
}
661+
}else {
662+
ret = OB_BAD_NULL_ERROR;
663+
LOG_WARN("unexpected ob_params_ nullptr",K(ret));
664+
}
665+
return ret;
666+
}
667+
645668
intObHTableRowIterator::next_cell()
646669
{
647670
ObNewRow *ob_row =NULL;
@@ -736,11 +759,15 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
736759
one_hbase_row_.reset();
737760
ObIArray<common::ObNewRow>& same_kq_cells =get_same_kq_cells();
738761
ObHTableMatchCode match_code = ObHTableMatchCode::DONE_SCAN;// initialize
762+
bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row());
739763
if (ObQueryFlag::Reverse == scan_order_ && (-1 != limit_per_row_per_cf_ ||0 != offset_per_row_per_cf_)) {
740764
ret = OB_NOT_SUPPORTED;
741765
LOG_USER_ERROR(OB_NOT_SUPPORTED,"set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan");
742766
LOG_WARN("server don't support set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan yet",
743767
K(ret),K(scan_order_),K(limit_per_row_per_cf_),K(offset_per_row_per_cf_));
768+
}elseif (scanner_context_ ==nullptr) {
769+
ret = OB_ERR_UNEXPECTED;
770+
LOG_WARN("scanner_context_ meet unexecpted nullptr",K(ret));
744771
}
745772
if (OB_SUCC(ret) &&NULL == column_tracker_) {
746773
// first iteration
@@ -772,6 +799,15 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
772799
count_per_row_ =0;
773800
ret = matcher_->set_to_new_row(curr_cell_);
774801
}
802+
if (OB_SUCC(ret)) {
803+
if (has_filter_row) {
804+
scanner_context_->limits_.set_size_scope(LimitScope::Scope::BETWEEN_ROWS);
805+
scanner_context_->limits_.set_time_scope(LimitScope::Scope::BETWEEN_ROWS);
806+
}elseif (hbase_params_.allow_partial_results_) {
807+
scanner_context_->limits_.set_size_scope(LimitScope::Scope::BETWEEN_CELLS);
808+
scanner_context_->limits_.set_time_scope(LimitScope::Scope::BETWEEN_CELLS);
809+
}
810+
}
775811
bool loop =true;
776812
if (OB_SUCC(ret)) {
777813
if (NULL == matcher_->get_curr_row()) {
@@ -865,6 +901,8 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
865901
if (OB_FAIL(out_result->add_row(*(curr_cell_.get_ob_row())))) {
866902
LOG_WARN("failed to add row to result",K(ret));
867903
}else {
904+
scanner_context_->increment_batch_progress(1);
905+
scanner_context_->increment_size_progress(curr_cell_.get_ob_row()->get_serialize_size());
868906
++cell_count_;
869907
LOG_DEBUG("[yzfdebug] add cell",K_(cell_count),K_(curr_cell),
870908
K_(count_per_row),K_(offset_per_row_per_cf));
@@ -909,7 +947,7 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
909947
}
910948
}
911949
if (OB_SUCC(ret)) {
912-
if (reach_batch_limit() ||reach_size_limit()) {
950+
if (scanner_context_->check_any_limit(LimitScope::Scope::BETWEEN_CELLS)) {
913951
loop =false;
914952
}
915953
}elseif (OB_ITER_END == ret) {
@@ -1037,20 +1075,50 @@ void ObHTableRowIterator::set_ttl(int32_t ttl_value)
10371075
ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
10381076
table::ObTableQueryResult &one_result)
10391077
: ObTableQueryResultIterator(&query),
1078+
is_inited_(false),
10401079
row_iterator_(query),
10411080
one_result_(&one_result),
10421081
hfilter_(NULL),
1043-
batch_size_(query.get_batch()),
1082+
ob_kv_params_(query.get_ob_params()),
1083+
caching_(-1),
1084+
batch_(query.get_batch()),
10441085
max_result_size_(std::min(query.get_max_result_size(),
10451086
static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024))),
1046-
is_first_result_(true)
1087+
is_first_result_(true),
1088+
check_existence_only_(false),
1089+
scanner_context_()
1090+
{
1091+
}
1092+
1093+
intObHTableFilterOperator::init_ob_params(){
1094+
int ret = OB_SUCCESS;
1095+
const ObHBaseParams* hbase_params =nullptr;
1096+
if (ob_kv_params_.allocator_ !=nullptr && !is_inited_) {
1097+
if (OB_FAIL(ob_kv_params_.init_ob_params_for_hfilter(hbase_params))) {
1098+
LOG_WARN("init_ob_params fail",K(ret));
1099+
}else {
1100+
caching_ = hbase_params->caching_;
1101+
scanner_context_.limits_.set_fields(batch_, max_result_size_, hbase_params->call_timeout_,LimitScope(LimitScope::Scope::BETWEEN_ROWS));
1102+
check_existence_only_ = hbase_params->check_existence_only_;
1103+
is_inited_ =true;
1104+
}
1105+
}
1106+
row_iterator_.set_scanner_context(&scanner_context_);
1107+
return ret;
1108+
}
1109+
1110+
boolObHTableFilterOperator::reach_caching_limit(int num_of_row)
10471111
{
1112+
return caching_ >=0 && num_of_row >= caching_;
10481113
}
10491114

10501115
// @param one_result for one batch
10511116
intObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
10521117
{
10531118
int ret = OB_SUCCESS;
1119+
if (OB_FAIL(init_ob_params())) {
1120+
LOG_WARN("init ob params fail",K(ret),K(ob_kv_params_));
1121+
}
10541122
one_result_->reset_except_property();
10551123
bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row());
10561124
next_result = one_result_;
@@ -1059,8 +1127,12 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
10591127
// ObObj first_entity_cells[4];
10601128
// first_entity.cells_ = first_entity_cells;
10611129
// first_entity.count_ = 4;
1130+
int num_of_row =0;
1131+
scanner_context_.progress_.reset();
10621132
while (OB_SUCC(ret) && row_iterator_.has_more_result()
1063-
&&OB_SUCC(row_iterator_.get_next_result(htable_row))) {
1133+
&& !reach_caching_limit(num_of_row)
1134+
&&OB_SUCC(row_iterator_.get_next_result(htable_row)
1135+
)) {
10641136
LOG_DEBUG("[yzfdebug] got one row","cells_count", htable_row->get_row_count());
10651137
bool is_empty_row = (htable_row->get_row_count() ==0);
10661138
if (is_empty_row) {
@@ -1107,14 +1179,20 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
11071179
}
11081180
/* @todo check batch limit and size limit*/
11091181
// We have got one hbase row, store it to this batch
1182+
if (check_existence_only_) {
1183+
one_result_->save_row_count_only(1);
1184+
break;
1185+
}
11101186
if (OB_FAIL(one_result_->add_all_row(*htable_row))) {
11111187
LOG_WARN("failed to add cells to row",K(ret));
11121188
}
11131189
if (NULL != hfilter_) {
11141190
hfilter_->reset();
11151191
}
11161192
if (OB_SUCC(ret)) {
1117-
if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) {
1193+
num_of_row +=1;
1194+
scanner_context_.increment_size_progress(htable_row->get_serialize_size_());
1195+
if (scanner_context_.check_any_limit(LimitScope::Scope::BETWEEN_ROWS)) {
11181196
break;
11191197
}
11201198
}
@@ -1152,3 +1230,68 @@ int ObHTableFilterOperator::parse_filter_string(common::ObIAllocator* allocator)
11521230
}
11531231
return ret;
11541232
}
1233+
1234+
voidObHTableFilterOperator::init_properties(const ObHBaseParams* params,const ObTableQuery &query)
1235+
{
1236+
if (params !=nullptr) {
1237+
caching_ = params->caching_;
1238+
scanner_context_.limits_.set_fields(query.get_batch(), max_result_size_, params->call_timeout_,LimitScope(LimitScope::Scope::BETWEEN_ROWS));
1239+
check_existence_only_ = params->check_existence_only_;
1240+
}
1241+
row_iterator_.set_scanner_context(&scanner_context_);
1242+
}
1243+
1244+
ScannerContext::ScannerContext()
1245+
{
1246+
limits_.set_fields(LIMIT_DEFAULT_VALUE, LIMIT_DEFAULT_VALUE, LIMIT_DEFAULT_VALUE, LimitScope::Scope::BETWEEN_ROWS);
1247+
progress_.set_fields(PROGRESS_DEFAULT_VALUE, PROGRESS_DEFAULT_VALUE, PROGRESS_DEFAULT_VALUE, LimitScope::Scope::BETWEEN_ROWS);
1248+
}
1249+
1250+
ScannerContext::ScannerContext(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope)
1251+
{
1252+
limits_.set_fields(batch, size, time, limit_scope);
1253+
}
1254+
1255+
voidScannerContext::increment_batch_progress(int32_t batch)
1256+
{
1257+
int32_t current_batch = progress_.get_batch();
1258+
progress_.set_batch(current_batch + batch);
1259+
}
1260+
1261+
voidScannerContext::increment_size_progress(int64_t size)
1262+
{
1263+
int64_t current_size = progress_.get_size();
1264+
progress_.set_size(current_size + size);
1265+
}
1266+
1267+
boolScannerContext::check_batch_limit(LimitScope checker_scope)
1268+
{
1269+
bool ret =false;
1270+
if (limits_.can_enforce_batch_from_scope(checker_scope) && limits_.get_batch() >0) {
1271+
ret = progress_.get_batch() >= limits_.get_batch();
1272+
}
1273+
return ret;
1274+
}
1275+
1276+
boolScannerContext::check_size_limit(LimitScope checker_scope)
1277+
{
1278+
bool ret =false;
1279+
if (limits_.can_enforce_size_from_scope(checker_scope) && limits_.get_size() >0 ) {
1280+
ret = progress_.get_size() >= limits_.get_size();
1281+
}
1282+
return ret;
1283+
}
1284+
1285+
boolScannerContext::check_time_limit(LimitScope checker_scope)
1286+
{
1287+
bool ret =false;
1288+
if (limits_.can_enforce_time_from_scope(checker_scope) && limits_.get_time() >0) {
1289+
ret = progress_.get_time() >= limits_.get_time();
1290+
}
1291+
return ret;
1292+
}
1293+
1294+
boolScannerContext::check_any_limit(LimitScope checker_scope)
1295+
{
1296+
returncheck_batch_limit(checker_scope) ||check_size_limit(checker_scope) ||check_time_limit(checker_scope);
1297+
}

‎src/observer/table/ob_htable_filter_operator.h‎

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,86 @@ class ObHTableWildcardColumnTracker: public ObHTableColumnTracker
166166
int32_t current_count_;
167167
};
168168

169+
170+
classLimitScope
171+
{
172+
public:
173+
enumclassScope
174+
{
175+
BETWEEN_ROWS =0,
176+
BETWEEN_CELLS =1
177+
};
178+
LimitScope() : scope_(Scope::BETWEEN_ROWS), depth_(0) {}
179+
LimitScope(Scope scope) : scope_(scope), depth_(static_cast<int>(scope)) {}
180+
~LimitScope() {};
181+
OB_INLINEintdepth()const {return depth_; }
182+
OB_INLINEvoidset_scope(Scope scope) {scope_ = scope; depth_ =static_cast<int>(scope);}
183+
OB_INLINEboolcan_enforce_limit_from_scope(const LimitScope& checker_scope)const {return checker_scope.depth() <= depth_; }
184+
TO_STRING_KV(K_(scope), K_(depth));
185+
private:
186+
Scope scope_;
187+
int depth_;
188+
};
189+
190+
classLimitFields
191+
{
192+
public:
193+
LimitFields(): batch_(-1), size_(-1), time_(-1), size_scope_(LimitScope::Scope::BETWEEN_ROWS), time_scope_(LimitScope::Scope::BETWEEN_ROWS) {}
194+
LimitFields(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope) {set_fields(batch, size, time, limit_scope); }
195+
voidset_fields(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope) {
196+
set_batch(batch);
197+
set_size_scope(limit_scope);
198+
set_time_scope(limit_scope);
199+
set_size(size);
200+
set_time(time);
201+
}
202+
~LimitFields() {};
203+
voidreset() { batch_ =0; size_ =0; time_ =0; size_scope_.set_scope(LimitScope::Scope::BETWEEN_ROWS);}
204+
voidset_batch(int32_t batch) { batch_ = batch; }
205+
voidset_size(int64_t size) { size_ = size; }
206+
voidset_time(int64_t time) { time_ = time; }
207+
voidset_time_scope(LimitScope scope) { time_scope_ = scope; }
208+
voidset_size_scope(LimitScope scope) { size_scope_ = scope; }
209+
int32_tget_batch() {return batch_; }
210+
int64_tget_size() {return size_; }
211+
int64_tget_time() {return time_; }
212+
LimitScopeget_size_scope() {return size_scope_; }
213+
LimitScopeget_time_scope() {return time_scope_; }
214+
boolcan_enforce_batch_from_scope(LimitScope checker_scope) {returnLimitScope(LimitScope::Scope::BETWEEN_CELLS).can_enforce_limit_from_scope(checker_scope);}
215+
boolcan_enforce_size_from_scope(LimitScope checker_scope) {return size_scope_.can_enforce_limit_from_scope(checker_scope); }
216+
boolcan_enforce_time_from_scope(LimitScope checker_scope) {return time_scope_.can_enforce_limit_from_scope(checker_scope); }
217+
TO_STRING_KV(K_(batch), K_(size), K_(time), K_(size_scope), K_(time_scope));
218+
private:
219+
int32_t batch_;
220+
int64_t size_;
221+
int64_t time_;
222+
LimitScope size_scope_;
223+
LimitScope time_scope_;
224+
};
225+
226+
classScannerContext
227+
{
228+
public:
229+
ScannerContext();
230+
~ScannerContext() {};
231+
ScannerContext(int32_t batch,int64_t size,int64_t time, LimitScope limit_scope);
232+
voidincrement_batch_progress(int32_t batch);
233+
voidincrement_size_progress(int64_t size);
234+
voidupdate_time_progress() { progress_.set_time(ObTimeUtility::current_time()); }
235+
boolcheck_batch_limit(LimitScope checker_scope);
236+
boolcheck_size_limit(LimitScope checker_scope);
237+
boolcheck_time_limit(LimitScope checker_scope);
238+
boolcheck_any_limit(LimitScope checker_scope);
239+
240+
LimitFields limits_;
241+
LimitFields progress_;
242+
TO_STRING_KV(K_(limits),
243+
K_(progress));
244+
private:
245+
staticconstint LIMIT_DEFAULT_VALUE = -1;
246+
staticconstint PROGRESS_DEFAULT_VALUE =0;
247+
};
248+
169249
classObHTableScanMatcher
170250
{
171251
public:
@@ -217,13 +297,15 @@ class ObHTableRowIterator: public ObTableQueryResultIterator
217297
{
218298
child_op_ = scan_result;
219299
}
300+
intinit_ob_params(const ObTableQuery &query);
220301
boolhas_more_result()const {return has_more_cells_; }
221302
voidset_hfilter(table::hfilter::Filter *hfilter);
222303
voidset_ttl(int32_t ttl_value);
223304
intadd_same_kq_to_res(ObIArray<common::ObNewRow> &same_kq_cells,
224305
ObTableQueryResult *&out_result);
225306
ObIArray<common::ObNewRow> &get_same_kq_cells() {return same_kq_cells_; }
226307
voidset_max_version(int32_t max_version) { max_version_ = max_version; }
308+
voidset_scanner_context(ScannerContext *scanner_context) { scanner_context_ = scanner_context; }
227309
private:
228310
intnext_cell();
229311
intreverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells,
@@ -257,6 +339,8 @@ class ObHTableRowIterator: public ObTableQueryResultIterator
257339
int32_t count_per_row_;
258340
bool has_more_cells_;
259341
bool is_first_result_;
342+
ObHBaseParams hbase_params_;
343+
ScannerContext *scanner_context_;
260344
};
261345

262346
// entry class
@@ -266,6 +350,8 @@ class ObHTableFilterOperator: public ObTableQueryResultIterator
266350
ObHTableFilterOperator(const ObTableQuery &query, table::ObTableQueryResult &one_result);
267351
virtual~ObHTableFilterOperator() {}
268352
/// Fetch next batch result
353+
boolreach_caching_limit(int num_of_row);
354+
intinit_ob_params();
269355
virtualintget_next_result(ObTableQueryResult *&one_result)override;
270356
virtualboolhas_more_result()constoverride {return row_iterator_.has_more_result(); }
271357
virtual table::ObTableQueryResult *get_one_result()override {return one_result_; }
@@ -278,15 +364,22 @@ class ObHTableFilterOperator: public ObTableQueryResultIterator
278364
voidset_max_version(int32_t max_version_value) { row_iterator_.set_max_version(max_version_value); }
279365
// parse the filter string
280366
intparse_filter_string(common::ObIAllocator* allocator);
367+
voidinit_properties(const ObHBaseParams* params,const ObTableQuery &query);
281368
OB_INLINE table::hfilter::Filter *get_hfiter() {return hfilter_; }
282369
private:
370+
bool is_inited_;
283371
ObHTableRowIterator row_iterator_;
284372
table::ObTableQueryResult *one_result_;
285373
table::ObHTableFilterParser filter_parser_;
286374
table::hfilter::Filter *hfilter_;
287-
int32_t batch_size_;
375+
const ObKVParams& ob_kv_params_;
376+
int32_t caching_;
377+
int32_t batch_;
378+
int time_limit_delta;
288379
int64_t max_result_size_;
289380
bool is_first_result_;
381+
bool check_existence_only_;
382+
ScannerContext scanner_context_;
290383
};
291384

292385
}// end namespace table

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2026 Movatter.jp