@@ -635,13 +635,36 @@ ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query)
635635 cell_count_(0 ),
636636 count_per_row_(0 ),
637637 has_more_cells_(true ),
638- is_first_result_(true )
639- {}
638+ is_first_result_(true ),
639+ hbase_params_(),
640+ scanner_context_(NULL )
641+ {
642+ init_ob_params (query);
643+ }
640644
641645ObHTableRowIterator::~ObHTableRowIterator ()
642646{
643647}
644648
649+ int ObHTableRowIterator::init_ob_params (const ObTableQuery &query)
650+ {
651+ int ret = OB_SUCCESS;
652+ if (query.get_ob_params ().ob_params_ !=nullptr && query.get_ob_params ().ob_params_ ->get_param_type () == ParamType::HBase) {
653+ ObHBaseParams* hbase_params =static_cast <ObHBaseParams*>(query.get_ob_params ().ob_params_ );
654+ if (hbase_params ==nullptr || hbase_params->get_param_type () != ParamType::HBase) {
655+ ret = OB_ERR_UNEXPECTED;
656+ LOG_WARN (" unexpected ob_params" ,K (ret),KPC (hbase_params));
657+ }else {// only need the following two params;
658+ hbase_params_.allow_partial_results_ = hbase_params->allow_partial_results_ ;
659+ hbase_params_.is_cache_block_ = hbase_params->is_cache_block_ ;
660+ }
661+ }else {
662+ ret = OB_BAD_NULL_ERROR;
663+ LOG_WARN (" unexpected ob_params_ nullptr" ,K (ret));
664+ }
665+ return ret;
666+ }
667+
645668int ObHTableRowIterator::next_cell ()
646669{
647670 ObNewRow *ob_row =NULL ;
@@ -736,11 +759,15 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
736759 one_hbase_row_.reset ();
737760 ObIArray<common::ObNewRow>& same_kq_cells =get_same_kq_cells ();
738761 ObHTableMatchCode match_code = ObHTableMatchCode::DONE_SCAN;// initialize
762+ bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row ());
739763if (ObQueryFlag::Reverse == scan_order_ && (-1 != limit_per_row_per_cf_ ||0 != offset_per_row_per_cf_)) {
740764 ret = OB_NOT_SUPPORTED;
741765LOG_USER_ERROR (OB_NOT_SUPPORTED," set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan" );
742766LOG_WARN (" server don't support set limit_per_row_per_cf_ and offset_per_row_per_cf_ in reverse scan yet" ,
743767K (ret),K (scan_order_),K (limit_per_row_per_cf_),K (offset_per_row_per_cf_));
768+ }else if (scanner_context_ ==nullptr ) {
769+ ret = OB_ERR_UNEXPECTED;
770+ LOG_WARN (" scanner_context_ meet unexecpted nullptr" ,K (ret));
744771 }
745772if (OB_SUCC (ret) &&NULL == column_tracker_) {
746773// first iteration
@@ -772,6 +799,15 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
772799 count_per_row_ =0 ;
773800 ret = matcher_->set_to_new_row (curr_cell_);
774801 }
802+ if (OB_SUCC (ret)) {
803+ if (has_filter_row) {
804+ scanner_context_->limits_ .set_size_scope (LimitScope::Scope::BETWEEN_ROWS);
805+ scanner_context_->limits_ .set_time_scope (LimitScope::Scope::BETWEEN_ROWS);
806+ }else if (hbase_params_.allow_partial_results_ ) {
807+ scanner_context_->limits_ .set_size_scope (LimitScope::Scope::BETWEEN_CELLS);
808+ scanner_context_->limits_ .set_time_scope (LimitScope::Scope::BETWEEN_CELLS);
809+ }
810+ }
775811bool loop =true ;
776812if (OB_SUCC (ret)) {
777813if (NULL == matcher_->get_curr_row ()) {
@@ -865,6 +901,8 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
865901if (OB_FAIL (out_result->add_row (*(curr_cell_.get_ob_row ())))) {
866902LOG_WARN (" failed to add row to result" ,K (ret));
867903 }else {
904+ scanner_context_->increment_batch_progress (1 );
905+ scanner_context_->increment_size_progress (curr_cell_.get_ob_row ()->get_serialize_size ());
868906 ++cell_count_;
869907LOG_DEBUG (" [yzfdebug] add cell" ,K_ (cell_count),K_ (curr_cell),
870908K_ (count_per_row),K_ (offset_per_row_per_cf));
@@ -909,7 +947,7 @@ int ObHTableRowIterator::get_next_result(ObTableQueryResult *&out_result)
909947 }
910948 }
911949if (OB_SUCC (ret)) {
912- if (reach_batch_limit () || reach_size_limit ( )) {
950+ if (scanner_context_-> check_any_limit (LimitScope::Scope::BETWEEN_CELLS )) {
913951 loop =false ;
914952 }
915953 }else if (OB_ITER_END == ret) {
@@ -1037,20 +1075,50 @@ void ObHTableRowIterator::set_ttl(int32_t ttl_value)
10371075ObHTableFilterOperator::ObHTableFilterOperator (const ObTableQuery &query,
10381076 table::ObTableQueryResult &one_result)
10391077 : ObTableQueryResultIterator(&query),
1078+ is_inited_(false ),
10401079 row_iterator_(query),
10411080 one_result_(&one_result),
10421081 hfilter_(NULL ),
1043- batch_size_(query.get_batch()),
1082+ ob_kv_params_(query.get_ob_params()),
1083+ caching_(-1 ),
1084+ batch_(query.get_batch()),
10441085 max_result_size_(std::min(query.get_max_result_size(),
10451086 static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024))),
1046- is_first_result_(true )
1087+ is_first_result_(true ),
1088+ check_existence_only_(false ),
1089+ scanner_context_()
1090+ {
1091+ }
1092+
1093+ int ObHTableFilterOperator::init_ob_params (){
1094+ int ret = OB_SUCCESS;
1095+ const ObHBaseParams* hbase_params =nullptr ;
1096+ if (ob_kv_params_.allocator_ !=nullptr && !is_inited_) {
1097+ if (OB_FAIL (ob_kv_params_.init_ob_params_for_hfilter (hbase_params))) {
1098+ LOG_WARN (" init_ob_params fail" ,K (ret));
1099+ }else {
1100+ caching_ = hbase_params->caching_ ;
1101+ scanner_context_.limits_ .set_fields (batch_, max_result_size_, hbase_params->call_timeout_ ,LimitScope (LimitScope::Scope::BETWEEN_ROWS));
1102+ check_existence_only_ = hbase_params->check_existence_only_ ;
1103+ is_inited_ =true ;
1104+ }
1105+ }
1106+ row_iterator_.set_scanner_context (&scanner_context_);
1107+ return ret;
1108+ }
1109+
1110+ bool ObHTableFilterOperator::reach_caching_limit (int num_of_row)
10471111{
1112+ return caching_ >=0 && num_of_row >= caching_;
10481113}
10491114
10501115// @param one_result for one batch
10511116int ObHTableFilterOperator::get_next_result (ObTableQueryResult *&next_result)
10521117{
10531118int ret = OB_SUCCESS;
1119+ if (OB_FAIL (init_ob_params ())) {
1120+ LOG_WARN (" init ob params fail" ,K (ret),K (ob_kv_params_));
1121+ }
10541122 one_result_->reset_except_property ();
10551123bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row ());
10561124 next_result = one_result_;
@@ -1059,8 +1127,12 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
10591127// ObObj first_entity_cells[4];
10601128// first_entity.cells_ = first_entity_cells;
10611129// first_entity.count_ = 4;
1130+ int num_of_row =0 ;
1131+ scanner_context_.progress_ .reset ();
10621132while (OB_SUCC (ret) && row_iterator_.has_more_result ()
1063- &&OB_SUCC (row_iterator_.get_next_result (htable_row))) {
1133+ && !reach_caching_limit (num_of_row)
1134+ &&OB_SUCC (row_iterator_.get_next_result (htable_row)
1135+ )) {
10641136LOG_DEBUG (" [yzfdebug] got one row" ," cells_count" , htable_row->get_row_count ());
10651137bool is_empty_row = (htable_row->get_row_count () ==0 );
10661138if (is_empty_row) {
@@ -1107,14 +1179,20 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
11071179 }
11081180/* @todo check batch limit and size limit*/
11091181// We have got one hbase row, store it to this batch
1182+ if (check_existence_only_) {
1183+ one_result_->save_row_count_only (1 );
1184+ break ;
1185+ }
11101186if (OB_FAIL (one_result_->add_all_row (*htable_row))) {
11111187LOG_WARN (" failed to add cells to row" ,K (ret));
11121188 }
11131189if (NULL != hfilter_) {
11141190 hfilter_->reset ();
11151191 }
11161192if (OB_SUCC (ret)) {
1117- if (one_result_->reach_batch_size_or_result_size (batch_size_, max_result_size_)) {
1193+ num_of_row +=1 ;
1194+ scanner_context_.increment_size_progress (htable_row->get_serialize_size_ ());
1195+ if (scanner_context_.check_any_limit (LimitScope::Scope::BETWEEN_ROWS)) {
11181196break ;
11191197 }
11201198 }
@@ -1152,3 +1230,68 @@ int ObHTableFilterOperator::parse_filter_string(common::ObIAllocator* allocator)
11521230 }
11531231return ret;
11541232}
1233+
1234+ void ObHTableFilterOperator::init_properties (const ObHBaseParams* params,const ObTableQuery &query)
1235+ {
1236+ if (params !=nullptr ) {
1237+ caching_ = params->caching_ ;
1238+ scanner_context_.limits_ .set_fields (query.get_batch (), max_result_size_, params->call_timeout_ ,LimitScope (LimitScope::Scope::BETWEEN_ROWS));
1239+ check_existence_only_ = params->check_existence_only_ ;
1240+ }
1241+ row_iterator_.set_scanner_context (&scanner_context_);
1242+ }
1243+
1244+ ScannerContext::ScannerContext ()
1245+ {
1246+ limits_.set_fields (LIMIT_DEFAULT_VALUE, LIMIT_DEFAULT_VALUE, LIMIT_DEFAULT_VALUE, LimitScope::Scope::BETWEEN_ROWS);
1247+ progress_.set_fields (PROGRESS_DEFAULT_VALUE, PROGRESS_DEFAULT_VALUE, PROGRESS_DEFAULT_VALUE, LimitScope::Scope::BETWEEN_ROWS);
1248+ }
1249+
1250+ ScannerContext::ScannerContext (int32_t batch,int64_t size,int64_t time, LimitScope limit_scope)
1251+ {
1252+ limits_.set_fields (batch, size, time, limit_scope);
1253+ }
1254+
1255+ void ScannerContext::increment_batch_progress (int32_t batch)
1256+ {
1257+ int32_t current_batch = progress_.get_batch ();
1258+ progress_.set_batch (current_batch + batch);
1259+ }
1260+
1261+ void ScannerContext::increment_size_progress (int64_t size)
1262+ {
1263+ int64_t current_size = progress_.get_size ();
1264+ progress_.set_size (current_size + size);
1265+ }
1266+
1267+ bool ScannerContext::check_batch_limit (LimitScope checker_scope)
1268+ {
1269+ bool ret =false ;
1270+ if (limits_.can_enforce_batch_from_scope (checker_scope) && limits_.get_batch () >0 ) {
1271+ ret = progress_.get_batch () >= limits_.get_batch ();
1272+ }
1273+ return ret;
1274+ }
1275+
1276+ bool ScannerContext::check_size_limit (LimitScope checker_scope)
1277+ {
1278+ bool ret =false ;
1279+ if (limits_.can_enforce_size_from_scope (checker_scope) && limits_.get_size () >0 ) {
1280+ ret = progress_.get_size () >= limits_.get_size ();
1281+ }
1282+ return ret;
1283+ }
1284+
1285+ bool ScannerContext::check_time_limit (LimitScope checker_scope)
1286+ {
1287+ bool ret =false ;
1288+ if (limits_.can_enforce_time_from_scope (checker_scope) && limits_.get_time () >0 ) {
1289+ ret = progress_.get_time () >= limits_.get_time ();
1290+ }
1291+ return ret;
1292+ }
1293+
1294+ bool ScannerContext::check_any_limit (LimitScope checker_scope)
1295+ {
1296+ return check_batch_limit (checker_scope) ||check_size_limit (checker_scope) ||check_time_limit (checker_scope);
1297+ }