Disclosure of Invention
The application aims to solve the technical problems that a large number of small files in Iceberg in the prior art easily cause performance reduction and operation and maintenance cost is high.
To achieve the above technical object, in one aspect, the present application provides a method for processing a large number of small files in Iceberg, the method comprising:
determining optimization parameters by scanning database and table information under a directory in Iceberg, wherein the optimization parameters comprise the type of established optimization tasks, the number of small files, the number of deleted files and the number of table snapshots;
establishing a current optimization task based on the optimization parameters;
determining the priority of the current optimization task according to the type of the current optimization task;
and determining an execution queue of all the optimization tasks based on the priority of the established optimization task and the priority of the current optimization task, and sequentially executing all the optimization tasks in the execution queue.
Further, the establishing a current optimization task based on the optimization parameters specifically includes:
if the number of the table snapshots exceeds the preset number of the snapshots, generating a current optimization task with the type of cleaning;
and if the number of the small files exceeds a first threshold or the number of the deleted files exceeds a second threshold, generating a current optimization task with the type of operation and maintenance optimization.
Further, if the established optimization task with the same type as the current optimization task exists, combining the established optimization task and the current optimization task to generate a combined task with the same type as the combination.
Further, the type is that the priority corresponding to merging is greater than the priority corresponding to the type is that the priority corresponding to operation and maintenance optimization is greater than the priority corresponding to cleaning.
Further, the step of sequentially executing all the optimization tasks in the execution queue specifically includes:
generating task acquisition threads according to the number of the optimized resources in the catalog;
and when available optimized resources exist under the catalog, sequentially acquiring optimized tasks from the execution queue by the task acquisition thread for execution.
Further, before determining the optimization parameters by scanning database and table information under the catalog in Iceberg, the method further includes isolating the optimization resources for running the optimization tasks in the execution queue.
Further, the optimized resource is specifically a kubernetes resource, and the isolation process specifically includes:
generating a name space according to the client interface of the optimized resource;
and binding the namespaces and the catalogs in a one-to-one correspondence mode.
In another aspect, the present application also provides a large number of small file processing apparatuses for use in Iceberg, the apparatus comprising:
the scanning module is used for scanning the database and the table information under the directory in the Iceberg to determine optimization parameters, wherein the optimization parameters comprise the type of the established optimization task, the number of small files, the number of deleted files and the number of table snapshots;
the establishing module is used for establishing a current optimization task based on the optimization parameters;
the priority module is used for determining the priority of the current optimization task according to the type of the current optimization task;
and the execution module is used for determining the execution queues of all the optimization tasks based on the priority of the established optimization task and the priority of the current optimization task, and executing all the optimization tasks in the execution queues in sequence.
Compared with the prior art, the method determines the optimization parameters by scanning the database and the table information under the catalog in the Iceberg, wherein the optimization parameters comprise the type of the established optimization task, the number of the small files, the number of the deleted files and the number of the table snapshots; establishing a current optimization task based on the optimization parameters; determining the priority of the current optimization task according to the type of the current optimization task; and determining execution queues of all the optimization tasks based on the priority of the established optimization task and the priority of the current optimization task, and sequentially executing all the optimization tasks in the execution queues, so that a large number of small files in the Iceberg are effectively optimized in time while the operation and maintenance cost is reduced, and the query performance of the Iceberg is maintained.
Detailed Description
In order that those skilled in the art will better understand the technical solutions in the present specification, the technical solutions in the embodiments of the present application will be clearly and completely described below with reference to the drawings in the embodiments of the present application, and it is apparent that the described embodiments are only some embodiments of the present application, but not all embodiments. All other embodiments, which can be made by those skilled in the art based on the embodiments of the application without making any inventive effort, are intended to be within the scope of the application.
While the present description provides the following embodiments or the method operation steps or apparatus structures shown in the drawings, more or fewer operation steps or module units may be included in the method or apparatus based on conventional or non-creative labor or after partial merging, and in steps or structures where there is no logically necessary causal relationship, the execution order of the steps or the module structures of the apparatus are not limited to those shown in the embodiments or the drawings of the present description. The described methods or module structures may be implemented in a sequential or parallel manner (e.g., in a parallel processor or multithreaded environment, or even in a distributed processing, server cluster implementation environment) in accordance with the method or module structures shown in the embodiments or figures when the actual device, server, or end product is in use.
The method for processing a large number of small files in Iceberg provided in the embodiment of the present disclosure may be applied to terminal devices such as a client and a server, as shown in fig. 1, and specifically includes the following steps:
and step S101, determining optimization parameters by scanning database and table information in the Iceberg under the directory, wherein the optimization parameters comprise the type of the established optimization task, the number of small files, the number of deleted files and the number of table snapshots.
Specifically, the database and table information under the directory in the Iceberg is scanned through the directory manager, and after the storage path of the table is taken, the interface of the Iceberg is called to obtain the optimization parameters, wherein the optimization parameters are parameters for determining whether to establish an optimization task or not, and specifically comprise the type of the established optimization task, the number of small files, the number of deleted files and the number of table snapshots.
In the embodiment of the present application, before determining the optimization parameters by scanning the database and table information under the catalogue in Iceberg, the method further includes isolating the optimization resources, which are specifically kubernetes resources, for running the optimization tasks in the execution queue, where the isolation process specifically includes:
generating a name space according to the client interface of the optimized resource;
and binding the namespaces and the catalogs in a one-to-one correspondence mode.
Specifically, in the prior art, the tasks of different tables of the compact cannot be isolated from resources, but in the scheme of the application, the corresponding or dedicated optimized resources are isolated from different tables, and the resource binding is performed through the catalogue of the tables.
As shown in fig. 2, which is a schematic flow chart of optimizing resource isolation in a specific application scenario, firstly, newly creating an optimized resource, namely, creating an optimized resource name or allocating a special id, if yes, ending the operation, if not, judging whether the optimized resource meets the requirement, and in the present application, the optimized resource is a kubernetes resource, namely, a k8s resource for short, such as Pod, deployment, service, node, and the like; if the requirements are not met, ending the operation, and if the requirements are met, performing optimized resource information persistence; creating a namespace of k8s, namely judging whether the namespace exists in k8s, if so, ending the operation, and if not, creating the namespace; allocating resource allowance to the Namespace, configuring the maximum available CPU and memory, deleting the Namespace and rolling back the transaction to recreate the Namespace if the configuration fails, adding the secret if the configuration is successful, rolling back the transaction to recreate the Namespace if the addition fails, creating configma if the addition is successful, rolling back the transaction to recreate the Namespace if the creation is unsuccessful, continuing to create hdlfs configuration configma in the spark job if the creation is successful, and ending the operation.
And step S102, establishing a current optimization task based on the optimization parameters.
In the embodiment of the present application, the establishing a current optimization task based on the optimization parameters specifically includes:
if the number of the table snapshots exceeds the preset number of the snapshots, generating a current optimization task with the type of cleaning;
and if the number of the small files exceeds a first threshold or the number of the deleted files exceeds a second threshold, generating a current optimization task with the type of operation and maintenance optimization.
Specifically, judging whether the optimal operation and maintenance task is required to be created currently according to factors such as the task type, the small file number, the deleted file number and the like, scoring the task priority, and performing priority scheduling:
if the current table has a new snapshot, the total snapshot data exceeds a threshold value, and the task queue does not have the same type of task, a current optimized task of a cleaning type is generated.
If the newly added data exists, the number of the small files exceeds a threshold value or the number of the deleted files exceeds a threshold value, and the task queue does not have the same type of tasks, merging tasks of a merging type are generated, or if the established optimizing tasks of the same type as the current optimizing tasks exist, merging the tasks to generate merging tasks of the merging type.
If the table state meets the merging task condition, an ordering field is configured in the configuration parameter, the duty ratio of the newly added data volume and the total data volume exceeds a threshold value, and the task queue does not have the same type of task, an ordering task is generated, the ordering task is specifically a task ordered according to the user-specified table field, the ordering task is also one of the tasks to be executed, and the priority of the ordering task is self-defined.
And step S103, determining the priority of the current optimization task according to the type of the current optimization task.
In the embodiment of the application, the priority corresponding to the type merging is larger than the priority corresponding to the type operation and maintenance optimization, and the priority corresponding to the type operation and maintenance optimization is larger than the priority corresponding to the type cleaning.
Specifically, the priority starting score of the merging type task and the sorting type task is higher than that of the cleaning task, namely, the priority is higher. The priority of the cleaning task can be increased in a gradient mode according to the snapshot data quantity, and the priority can be increased according to a preset gradient threshold. And increasing the priorities of the merging type tasks and the sorting type tasks according to the number of the deleted files and the number of the small files in a gradient manner. If the task in the queue does not run for a long time, the priority is automatically raised.
And step S104, determining an execution queue of all the optimized tasks based on the priority of the established optimized task and the priority of the current optimized task, and executing all the optimized tasks in the execution queue in sequence.
Determining the priority of all the optimization tasks according to the method for determining the priority, and sequentially placing the optimization tasks into an execution queue, wherein the step of sequentially executing all the optimization tasks in the execution queue specifically comprises the following steps:
generating task acquisition threads according to the number of the optimized resources in the catalog, namely generating task acquisition threads according to the number of the optimized resources bound with the catalog;
and when available optimized resources exist under the catalog, sequentially acquiring optimized tasks from the execution queue by the task acquisition thread for execution.
Specifically, the optimized resource refers to a k8s resource, specifically refers to a k8s resource isolated by cpu and memory, and is specially used for running an optimized task, the resource of the Spark task is set by a k8s Spark (Spark is a general distributed data processing engine) operator, that is, an operator, and the Spark task is initiated by running jar packets of the Spark task. And updating the task state through a callback task in the spark task, monitoring and alarming, and judging whether the resources required by the task are larger than available optimized resources or not when the optimized task in the execution queue is executed, if so, stopping the corresponding task or dividing the corresponding task into executable sizes for execution.
As shown in fig. 3, a flow chart of an optimization task running in a specific application scenario specifically includes:
querying a lake bin directory associated with the optimized resource, wherein the lake bin directory is a table directory in the Iceberg data lake; acquiring a task queue, namely an execution queue, according to the lake bin catalog; the method comprises the steps of obtaining optimization parameters of a task, wherein the optimization parameters of the task comprise the maximum file number, the minimum file size, the maximum file size, a sequencing field, a file deletion threshold value, snapshot preservation time and the like; then judging whether the cluster resources meet the requirements or not, wherein the cluster resources are the optimized resources; if not, ending the operation, if so, submitting the task to the resource cluster, and executing the task, namely, the optimization task in the queue; task monitoring is carried out, wherein the task monitoring is specifically to update the task state through a callback task in the spark task and carry out monitoring alarm; ending the operation.
After the processing, the number of small files, the number of deleted files and the like of the table can be reduced through table optimization, and the query performance of the table is improved through rewriting the table structure; the file structuring operation can be continuously carried out in the background, so that the operation and maintenance workload is reduced, and the operation and maintenance cost is reduced.
Based on the above-described method for processing a large number of small files in Iceberg, one or more embodiments of the present disclosure further provide a platform, a terminal, or a platform for processing a large number of small files in Iceberg, where the platform or the terminal may include a device, software, a module, a plug-in, a server, a client, etc. using the method described in the embodiments of the present disclosure in combination with a device for implementing hardware, where the system in one or more embodiments provided in the embodiments of the present disclosure is based on the same innovative concept, as described in the following embodiments, since the implementation of the system to solve the problem is similar to the method, the implementation of the specific system in the embodiments of the present disclosure may refer to the implementation of the foregoing method, and the terms "unit" or "module" used in the following may implement a combination of software and/or hardware for the predetermined function, which will not be repeated. While the system described in the following embodiments is preferably implemented in software, hardware, implementation of a combination of hardware and software is also possible and contemplated.
Specifically, fig. 4 is a schematic block diagram of an embodiment of a large-size small-file processing apparatus for use in Iceberg provided in the present specification, and as shown in fig. 4, the large-size small-file processing apparatus for use in Iceberg provided in the present specification includes:
the scanning module 401 is configured to scan the database and the table information under the directory in the Iceberg to determine optimization parameters, where the optimization parameters include an established optimization task type, the number of small files, the number of deleted files, and the number of table snapshots;
a building module 402, configured to build a current optimization task based on the optimization parameters;
a priority module 403, configured to determine a priority of the current optimization task according to a type of the current optimization task;
and the execution module 404 is configured to determine an execution queue of all optimization tasks based on the priority of the established optimization task and the priority of the current optimization task, and execute all optimization tasks in the execution queue in sequence.
It should be noted that, the description of the above system according to the corresponding method embodiment may further include other embodiments, and specific implementation manner may refer to the description of the above corresponding method embodiment, which is not described herein in detail.
The embodiment of the application also provides electronic equipment, which comprises:
a processor;
a memory for storing the processor-executable instructions;
the processor is configured to perform the method as provided in the above embodiments.
According to the electronic equipment provided by the embodiment of the application, the executable instructions of the processor are stored through the memory, and when the processor executes the executable instructions, the optimization parameters can be determined by scanning the database and the table information under the directory in the Iceberg, wherein the optimization parameters comprise the type of the established optimization task, the number of small files, the number of deleted files and the number of table snapshots; establishing a current optimization task based on the optimization parameters; determining the priority of the current optimization task according to the type of the current optimization task; and determining an execution queue of all the optimization tasks based on the priority of the established optimization task and the priority of the current optimization task, and sequentially executing all the optimization tasks in the execution queue.
The method embodiments provided in the embodiments of the present specification may be performed in a mobile terminal, a computer terminal, a server, or similar computing device. Taking the example of running on a server, fig. 5 is a block diagram of a hardware structure of a large number of small file processing servers used in Iceberg in one embodiment of the present specification, and the computer terminal may be the large number of small file processing servers used in Iceberg or the large number of small file processing apparatuses used in Iceberg in the above embodiment. One or more (only one is shown in the figure) processors 100 (the processor 100 may include, but is not limited to, a processing means such as a microprocessor mcu or a programmable logic device fpga), a nonvolatile memory 200 for storing data, and a transmission module 300 for communication functions.
The non-volatile memory 200 may be used to store software programs and modules of application software, such as program instructions/modules corresponding to the numerous small file handling methods used in Iceberg in the present embodiment, and the processor 100 executes various functional applications and resource data updates by running the software programs and modules stored in the non-volatile memory 200. The non-volatile memory 200 may include high speed random access memory, and may also include non-volatile memory, such as one or more magnetic storage devices, flash memory, or other non-volatile solid state memory. In some examples, the non-volatile memory 200 may further include memory located remotely from the processor 100, which may be connected to the computer terminal via a network. Examples of such networks include, but are not limited to, the internet, intranets, local area networks, mobile communication networks, and combinations thereof.
The transmission module 300 is used to receive or transmit data via a network. Specific examples of the network described above may include a wireless network provided by a communication provider of a computer terminal. In one example, the transmission module 300 includes a network adapter (network interface controller, nic) that can connect to other network devices through a base station to communicate with the internet. In one example, the transmission module 300 may be a radio frequency (rf) module for communicating with the internet wirelessly.
The foregoing describes specific embodiments of the present disclosure. Other embodiments are within the scope of the following claims. In some cases, the actions or steps recited in the claims can be performed in a different order than in the embodiments and still achieve desirable results. In addition, the processes depicted in the accompanying figures do not necessarily require the particular order shown, or sequential order, to achieve desirable results. In some embodiments, multitasking and parallel processing are also possible or may be advantageous.
The method or apparatus according to the foregoing embodiments provided in the present specification may implement service logic through a computer program and be recorded on a storage medium, where the storage medium may be read and executed by a computer, to implement effects of the solutions described in the embodiments of the present specification, for example:
determining optimization parameters by scanning database and table information under a directory in Iceberg, wherein the optimization parameters comprise the type of established optimization tasks, the number of small files, the number of deleted files and the number of table snapshots;
establishing a current optimization task based on the optimization parameters;
determining the priority of the current optimization task according to the type of the current optimization task;
and determining an execution queue of all the optimization tasks based on the priority of the established optimization task and the priority of the current optimization task, and sequentially executing all the optimization tasks in the execution queue.
The storage medium may include physical means for storing information, typically by digitizing the information before storing it in an electronic, magnetic, or optical medium. The storage medium may include: devices for storing information by using electric energy, such as various memories, e.g. ram, rom, etc.; devices for storing information using magnetic energy such as hard disk, floppy disk, magnetic tape, magnetic core memory, bubble memory, and u-disk; means for optically storing information, such as cd or dvd. Of course, there are other ways of readable storage medium, such as quantum memory, graphene memory, etc.
Embodiments of the present description are not limited to situations in which industry communication standards, standard computer resource data updates, and data storage rules must be met or described in one or more embodiments of the present description. Some industry standards or embodiments modified slightly based on the implementation described by the custom manner or examples can also realize the same, equivalent or similar or predictable implementation effect after modification of the above examples. Examples of data acquisition, storage, judgment, processing, etc., using these modifications or variations may still fall within the scope of alternative implementations of the examples of this specification.
The controller may be implemented in any suitable manner, for example, the controller may take the form of, for example, a microprocessor or processor and a computer readable medium storing computer readable program code (e.g., software or firmware) executable by the (micro) processor, logic gates, switches, application specific integrated circuits (application specific integrated circuit, asics), programmable logic controllers, and embedded microcontrollers, examples of which include, but are not limited to, the following microcontrollers: the arc 625d, atm at91sam, microchip pic18f26k20, and silicone labs c8051f320, the memory controller may also be implemented as part of the control logic of the memory. Those skilled in the art will also appreciate that, in addition to implementing the controller in a pure computer readable program code, it is well possible to implement the same functionality by logically programming the method steps such that the controller is in the form of logic gates, switches, application specific integrated circuits, programmable logic controllers, embedded microcontrollers, etc. Such a controller may thus be regarded as a kind of hardware component, and means for performing various functions included therein may also be regarded as structures within the hardware component. Or even means for achieving the various functions may be regarded as either software modules implementing the methods or structures within hardware components.
The above-described apparatus embodiments are merely illustrative, and for example, the division of the units is merely a logical function division, and there may be additional divisions in actual implementation, for example, multiple units or plug-ins may be combined or integrated into another system, or some features may be omitted or not performed. Alternatively, the coupling or direct coupling or communication connection shown or discussed with each other may be an indirect coupling or communication connection via some interfaces, devices or units, which may be in electrical, mechanical or other form.
These computer program instructions may also be loaded onto a computer or other programmable resource data updating apparatus to cause a series of operational steps to be performed on the computer or other programmable apparatus to produce a computer implemented process such that the instructions which execute on the computer or other programmable apparatus provide steps for implementing the functions specified in the flowchart flow or flows and/or block diagram block or blocks.
In this specification, each embodiment is described in a progressive manner, and identical and similar parts of each embodiment are referred to each other, and each embodiment is mainly described in a different manner from other embodiments. In particular, for system embodiments, the description is relatively simple as it is substantially similar to method embodiments, and reference is made to the section of the method embodiments where relevant. In the description of the present specification, a description referring to terms "one embodiment," "some embodiments," "examples," "specific examples," or "some examples," etc., means that a particular feature, structure, material, or characteristic described in connection with the embodiment or example is included in at least one embodiment or example of the present specification. In this specification, schematic representations of the above terms are not necessarily directed to the same embodiment or example. Furthermore, the particular features, structures, materials, or characteristics described may be combined in any suitable manner in any one or more embodiments or examples. Furthermore, the various embodiments or examples described in this specification and the features of the various embodiments or examples may be combined and combined by those skilled in the art without conflict
Those of ordinary skill in the art will recognize that the embodiments described herein are for the purpose of aiding the reader in understanding the principles of the present application and should be understood that the scope of the application is not limited to such specific statements and embodiments. Those of ordinary skill in the art can make various other specific modifications and combinations from the teachings of the present disclosure without departing from the spirit thereof, and such modifications and combinations remain within the scope of the present disclosure.