Movatterモバイル変換


[0]ホーム

URL:


CN114721843A - Method and system for processing market quotations of multiple American stocks - Google Patents

Method and system for processing market quotations of multiple American stocks
Download PDF

Info

Publication number
CN114721843A
CN114721843ACN202210217495.8ACN202210217495ACN114721843ACN 114721843 ACN114721843 ACN 114721843ACN 202210217495 ACN202210217495 ACN 202210217495ACN 114721843 ACN114721843 ACN 114721843A
Authority
CN
China
Prior art keywords
market
quotation
time
thread
real
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Pending
Application number
CN202210217495.8A
Other languages
Chinese (zh)
Inventor
王伏根
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Yingli Shuzhi Technology Shenzhen Co ltd
Original Assignee
Yingli Shuzhi Technology Shenzhen Co ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Yingli Shuzhi Technology Shenzhen Co ltdfiledCriticalYingli Shuzhi Technology Shenzhen Co ltd
Priority to CN202210217495.8ApriorityCriticalpatent/CN114721843A/en
Publication of CN114721843ApublicationCriticalpatent/CN114721843A/en
Pendinglegal-statusCriticalCurrent

Links

Images

Classifications

Landscapes

Abstract

The invention provides a method and a system for processing the market quotation of a partnered multi-exchange, wherein the method comprises the following steps of S1, generating an asynchronous event by a timing thread; s2, executing asynchronous events by the main thread, and subscribing stock code quotations in batches; s3, executing the quotation message processing logic by the quotation callback processing thread, and putting the quotation message into a corresponding message queue; s4, analyzing the quotation message by the quotation decoding thread, acquiring real-time quotation and delayed quotation, putting the delayed quotation into a cache queue, judging whether the real-time quotation is the quotation in a disk, if the real-time quotation is the quotation in the disk, putting the real-time quotation into the cache queue, and if the real-time quotation is the quotation before the disk or after the disk, putting the real-time quotation into the quotation message queue; s5, merging the real-time quotations and the delayed quotations in the cache queue by the quotation merging thread, and putting the merged results into a quotation message queue; and S6, converting the messages in the market message queue into protobuf format by the market sending thread, and sending the converted messages to the kafka streaming system for downstream consumers.

Description

Translated fromChinese
一种美股多交易所行情处理方法及系统A method and system for processing U.S. stock multi-exchange quotations

技术领域technical field

本发明涉及行情数据处理技术领域,具体的,本发明涉及一种美股多交易所行情处理方法及系统。The present invention relates to the technical field of market data processing, and in particular, the present invention relates to a method and system for processing quotations of U.S. stocks and multiple exchanges.

背景技术Background technique

美股行情权限包含延时行情、Level1高级行情和Level2高级行情。其中Level1高级行情是盘前盘后展示来自纳斯达克NB市场实时的流式行情,而盘中则同时接收美股几个主要的交易所延迟行情(包括纳斯达克综合延迟行情,ARCA/AMEX/NYSE交易所延迟行情等,通常延迟时间为15分钟),对于下游行情订阅服务来说,如何修正盘中行情数据,以尽可能准确的反映美股市场行情,成为亟需解决的问题,本发明提供了一种通过将实时行情和延时行情合并的方案,以修正盘中行情数据。The U.S. stock market authority includes delayed market, Level1 advanced market and Level2 advanced market. Among them, the Level 1 advanced market is to display the real-time streaming market from the Nasdaq NB market before and after the market, and at the same time receive the delayed market information of several major exchanges in the US stock market (including the Nasdaq comprehensive delayed market, ARCA/ AMEX/NYSE exchanges delay quotations, etc., usually the delay time is 15 minutes). For downstream quotation subscription services, how to correct the intraday quotation data to reflect the US stock market conditions as accurately as possible has become an urgent problem to be solved. The invention provides a solution of merging real-time quotations and delayed quotations to correct intraday quotation data.

发明内容SUMMARY OF THE INVENTION

为了克服现有技术的不足,本发明提供了一种美股多交易所行情处理方法及系统,以解决上述的技术问题。In order to overcome the deficiencies of the prior art, the present invention provides a method and system for processing quotations of U.S. stocks on multiple exchanges to solve the above-mentioned technical problems.

本发明解决其技术问题所采用的技术方法是:一种美股多交易所行情处理方法,其改进之处在于:包括以下的步骤:S1、定时线程生成异步事件,异步事件包括针对交易所以及美股市场的事件;S2、主线程执行异步事件,批量订阅证券代码行情;S3、行情回调处理线程执行行情消息处理逻辑,将行情消息放入对应的消息队列,行情消息根据代码类型分为链代码行情和证券代码行情;S4、行情解码线程解析行情消息,获取实时行情和延时行情,延时行情放入缓存队列,并判断该实时行情是否为盘中行情,若是盘中行情,则将该实时行情放入缓存队列,若是盘前或盘后行情,则将该实时行情放入行情消息队列,并跳转至步骤S6;S5、行情合并线程对缓存队列中的实时行情和延时行情进行合并,合并后的结果放入行情消息队列;S6、行情发送线程将行情消息队列中的消息转换成protobuf格式,并发送到kafka流式系统,供下游消费者使用。The technical method adopted by the present invention to solve the technical problem is: a method for processing quotations of U.S. stocks on multiple exchanges. Market events; S2, the main thread executes asynchronous events, and subscribes to stock code quotes in batches; S3, the market callback processing thread executes the market message processing logic, and puts the market message into the corresponding message queue. The market message is divided into chain code quotes according to the code type. And the stock code quotation; S4, the quotation decoding thread parses the quotation information, obtains the real-time quotation and the delayed quotation, puts the delayed quotation into the cache queue, and judges whether the real-time quotation is an intraday quotation. The quotation is put into the cache queue. If it is a pre-market or after-market quotation, the real-time quotation is put into the quotation message queue, and jumps to step S6; S5, the quotation merging thread merges the real-time quotation and the delayed quotation in the cache queue. , the combined result is put into the market message queue; S6, the market sending thread converts the message in the market message queue into protobuf format, and sends it to the kafka streaming system for downstream consumers to use.

在上述方法中,所述步骤S1,包括以下的步骤:In the above method, the step S1 includes the following steps:

S11、主线程启动后读取配置文件,将所有定时事件时间设置添加到定时线程;S11. After the main thread is started, read the configuration file, and add all timed event time settings to the timed thread;

S12、定时线程定时检查当前时间是否满足任一定时任务时间设置,如果满足,则将针对交易所的事件类型和交易所标识写入至事件消息队列,如果不满足,则定时线程休眠一段时间后再重新检查,直至满足任一定时任务时间设置。S12. The timing thread regularly checks whether the current time satisfies any timing task time setting, and if so, writes the event type and exchange ID for the exchange to the event message queue, if not, the timing thread sleeps for a period of time after Check again until any of the timing task time settings are met.

在上述方法中,所述步骤S2,包括以下的步骤:In the above method, the step S2 includes the following steps:

S21、主线程接收到该事件,向路透TREP服务器发起链代码订阅请求,即请求获取该市场的所有证券代码;S21. The main thread receives the event and initiates a chain code subscription request to the Reuters TREP server, that is, requests to obtain all securities codes in the market;

S22、批量订阅证券代码行情。S22. Subscribing securities code quotations in batches.

在上述方法中,所述步骤S3中,链代码行情包括链代码快照行情和链代码更新行情。In the above method, in the step S3, the chain code quotation includes the chain code snapshot quotation and the chain code update quotation.

在上述方法中,所述步骤S4中,所述行情解码线程解析行情消息,获取实时行情和延时行情,包括以下的步骤:In the above method, in the step S4, the quotation decoding thread parses the quotation information to obtain real-time quotations and delayed quotations, including the following steps:

S401、针对每个交易所,内部创建对应消息队列和缓存数据结构;S401. For each exchange, create a corresponding message queue and cache data structure internally;

S402、行情接收线程将数据发送到消息队列后取出消息,行情解码线程解析出证券代码;S402, the quotation receiving thread sends the data to the message queue and then retrieves the message, and the quotation decoding thread parses the securities code;

S403、根据证券代码从行情缓存中查找该支证券代码的缓存数据,解析行情消息每个字段,更新内部缓存数据,获取实时行情和延时行情。S403. Search the cached data of the securities code from the market information cache according to the securities code, parse each field of the market information, update the internal cached data, and obtain real-time and delayed market information.

在上述方法中,所述步骤S4中,所述判断该实时行情是否为盘中行情,是基于最新时间和交易日历来判断。In the above method, in the step S4, the judging whether the real-time market price is the intraday market price is based on the latest time and the transaction calendar.

在上述方法中,所述步骤S5,包括以下的步骤:In the above method, the step S5 includes the following steps:

S51、行情合并线程从缓存队列中取出消息;S51, the market merging thread takes out the message from the cache queue;

S52、在行情缓存中查找是否有相反状态的行情数据,若有,则基于特定规则对实时行情和延时行情进行合并,若没有,则直接取缓存队列中的行情数据,进行合并;S52. Find out whether there is market data in the opposite state in the market cache. If so, combine the real-time market and the delayed market based on specific rules. If not, directly fetch the market data in the cache queue and merge them;

S53、更新行情缓存中该证券代码的最新数据,并将合并后的结果放入行情消息队列。S53 , update the latest data of the security code in the quotation cache, and put the combined result into the quotation message queue.

在上述方法中,所述步骤S52中,所述特定规则,包括:In the above method, in step S52, the specific rule includes:

(1)最新价:根据行情时间判断,取实时行情和延时行情中行情时间大的最新价;(1) Latest price: Judging by the market time, take the latest price with a longer market time in the real-time market and the delayed market;

(2)最高价:取实时行情和延时行情中最高价的最高值;(2) Highest price: take the highest value of the highest price in the real-time market and the delayed market;

(3)最低价:取实时行情和延时行情中最低价的最低值;(3) Lowest price: take the lowest value of the lowest price in the real-time market and the delayed market;

(4)开盘价:以延迟行情开盘价为准,如果延迟行情没有开盘价则使用实时行情开盘价;(4) Opening price: The opening price of the delayed market shall prevail. If there is no opening price of the delayed market, the opening price of the real-time market shall be used;

(5)昨收价:以延时行情昨收价为准,如果延迟行情没有昨收价则使用实时行情昨收价;(5) Yesterday's closing price: The yesterday's closing price of the delayed market shall prevail. If the delayed market does not have yesterday's closing price, the real-time market yesterday's closing price shall be used;

(6)收盘价:以延时行情收盘价为准,如果延迟行情没有收盘价则使用实时行情收盘价。(6) Closing price: The closing price of the delayed market shall prevail. If there is no closing price of the delayed market, the closing price of the real-time market shall be used.

在上述方法中,所述步骤S6,包括以下的步骤:In the above method, the step S6 includes the following steps:

S61、行情发送线程从行情消息队列中逐条取出待发送的原始缓存消息;S61, the quotation sending thread takes out the original cached messages to be sent one by one from the quotation message queue;

S62、在内部转换成protobuf格式的消息,根据行情类型以及市场状态,将其发送到kafka流式系统中对应的主题中,供下游消费使用,行情类型包括行情、逐笔或盘口,市场状态包括盘前、盘中或盘后。S62. Internally convert the message into protobuf format, and send it to the corresponding topic in the kafka streaming system according to the market type and market status for downstream consumption. Including pre-market, intra-day or after-hours.

本发明还提供了一种美股多交易所行情处理系统,包括定时线程、主线程、行情回调处理线程、行情解码线程、行情合并线程和行情发送线程,The present invention also provides a quotation processing system for US stocks and multiple exchanges, including a timing thread, a main thread, a quotation callback processing thread, a quotation decoding thread, a quotation merging thread and a quotation sending thread,

定时线程用于生成异步事件,异步事件包括针对交易所以及美股市场的事件;Timing threads are used to generate asynchronous events, including events for exchanges and the US stock market;

主线程用于执行异步事件,批量订阅证券代码行情;The main thread is used to execute asynchronous events and subscribe to stock code quotes in batches;

行情回调处理线程用于执行行情消息处理逻辑,将行情消息放入对应的消息队列,行情消息根据代码类型分为链代码行情和证券代码行情;The market callback processing thread is used to execute the market message processing logic and put the market message into the corresponding message queue. The market message is divided into chain code market and securities code market according to the code type;

行情解码线程用于解析行情消息,获取实时行情和延时行情,延时行情放入缓存队列,并判断该实时行情是否为盘中行情,若是盘中行情,则将该实时行情放入缓存队列,若是盘前或盘后行情,则将该实时行情放入行情消息队列,并由行情发送线程作进一步处理;The market decoding thread is used to parse the market information, obtain the real-time market and delayed market, put the delayed market into the cache queue, and judge whether the real-time market is intraday market, if it is intraday, put the real-time market into the cache queue , if the market is pre-market or after-market, the real-time market will be put into the market message queue, and will be further processed by the market sending thread;

行情合并线程用于对缓存队列中的实时行情和延时行情进行合并,合并后的结果放入行情消息队列;The quotation merging thread is used to merge the real-time quotations and delayed quotations in the cache queue, and the combined result is put into the quotation message queue;

行情发送线程用于将行情消息队列中的消息转换成protobuf格式,并发送到kafka流式系统,供下游消费者使用。The market sending thread is used to convert the messages in the market message queue into protobuf format and send them to the kafka streaming system for downstream consumers to use.

本发明的有益效果是:实现了通过配置文件定义每个交易所的开盘和收盘时间,自动订阅和取消订阅该市场行情;通过独立的线程处理来自各个交易所的行情;并且基于特定的规则完成盘中实时行情和延时行情的数据合并,保证数据准确性。The beneficial effects of the present invention are: to define the opening and closing time of each exchange through the configuration file, to automatically subscribe and unsubscribe the market quotations; to process the quotations from each exchange through independent threads; and to complete the process based on specific rules The data of the real-time market and the delayed market in the intraday market are merged to ensure the accuracy of the data.

附图说明Description of drawings

附图1为本发明的一种美股多交易所行情处理方法及系统的逻辑图。FIG. 1 is a logic diagram of a method and system for processing quotations of U.S. stocks on multiple exchanges according to the present invention.

附图2为本发明的一种美股多交易所行情处理方法的流程图。FIG. 2 is a flow chart of a method for processing quotations of U.S. stocks on multiple exchanges according to the present invention.

具体实施方式Detailed ways

下面结合附图和实施例对本发明进一步说明。The present invention will be further described below in conjunction with the accompanying drawings and embodiments.

以下将结合实施例和附图对本发明的构思、具体结构及产生的技术效果进行清楚、完整地描述,以充分地理解本发明的目的、特征和效果。显然,所描述的实施例只是本发明的一部分实施例,而不是全部实施例,基于本发明的实施例,本领域的技术人员在不付出创造性劳动的前提下所获得的其他实施例,均属于本发明保护的范围。另外,专利中涉及到的所有联接/连接关系,并非单指构件直接相接,而是指可根据具体实施情况,通过添加或减少联接辅件,来组成更优的联接结构。本发明创造中的各个技术特征,在不互相矛盾冲突的前提下可以交互组合。The concept, specific structure and technical effects of the present invention will be clearly and completely described below with reference to the embodiments and accompanying drawings, so as to fully understand the purpose, characteristics and effects of the present invention. Obviously, the described embodiments are only a part of the embodiments of the present invention, rather than all the embodiments. Based on the embodiments of the present invention, other embodiments obtained by those skilled in the art without creative efforts are all within the scope of The scope of protection of the present invention. In addition, all the coupling/connection relationships involved in the patent do not mean that the components are directly connected, but refer to a better coupling structure by adding or reducing coupling accessories according to the specific implementation. Various technical features in the present invention can be combined interactively on the premise of not contradicting each other.

参照图1和图2,一种美股多交易所行情处理方法,包括以下的步骤:Referring to Figure 1 and Figure 2, a method for processing quotations of U.S. stocks on multiple exchanges includes the following steps:

S1、定时线程生成异步事件,异步事件包括针对交易所以及美股市场的事件,其中,针对交易所的事件包括开盘、清盘和收市,针对美股市场的事件包括交易日历获取、设置冬令时或者夏令时(美股采用美东时间)。所有这些事件都由定时器线程产生,并由主线程异步执行,实现通过配置文件方式定义多个交易所的事件触发时间,由定时线程根据配置时间和当前时间产生异步事件通知主线程执行;S1. The timing thread generates asynchronous events. The asynchronous events include events for the exchange and the US stock market. Among them, the events for the exchange include the opening, liquidation and closing, and the events for the US stock market include the acquisition of the trading calendar and the setting of winter time or summer time. (U.S. stocks use Eastern Time). All these events are generated by the timer thread and executed asynchronously by the main thread. The event trigger time of multiple exchanges is defined through the configuration file, and the asynchronous event is generated by the timer thread according to the configuration time and the current time to notify the main thread to execute;

具体的,包括如下步骤:Specifically, it includes the following steps:

S11、主线程启动后读取配置文件,将所有定时事件时间设置添加到定时线程;S11. After the main thread is started, read the configuration file, and add all timed event time settings to the timed thread;

S12、定时线程定时(例如每500ms)检查当前时间是否满足任一定时任务时间设置,如果满足,则将针对交易所的事件类型和交易所标识写入至事件消息队列,如果不满足,则定时线程休眠一段时间(例如休眠500ms)后再重新检查,直至满足任一定时任务时间设置;S12. Timing thread timing (for example, every 500ms) to check whether the current time satisfies any timing task time setting. If so, write the event type and exchange identification for the exchange to the event message queue. If not, then schedule the time. The thread sleeps for a period of time (for example, sleeps for 500ms) and then rechecks it until it meets the time setting of any timed task;

在此提供一个配置文件的例子,supported_exchange配置项包含订阅的交易所列表,针对每一个交易所会设置refresh_time(行情订阅时间),clear_time(清盘行情发送时间),close_time(收市时间),即可在相应的美东时间开始订阅行情,发送清盘行情,收盘取消行情订阅An example of a configuration file is provided here. The supported_exchange configuration item contains a list of subscribed exchanges. For each exchange, refresh_time (market subscription time), clear_time (clearing market sending time), close_time (closing time) will be set. The corresponding EST starts to subscribe to the market, sends the liquidation market, and cancels the market subscription when the market closes.

S2、主线程执行异步事件,批量订阅证券代码行情;S2. The main thread executes asynchronous events and subscribes to stock code quotes in batches;

具体的,步骤S2包括的如下步骤:Specifically, step S2 includes the following steps:

S21、主线程接收到该事件,向路透TREP服务器发起链代码订阅请求,即请求获取该市场的所有证券代码;S21. The main thread receives the event and initiates a chain code subscription request to the Reuters TREP server, that is, requests to obtain all securities codes in the market;

S22、批量订阅证券代码行情;S22. Subscribing to securities code quotations in batches;

主线程启动后主要用于执行异步事件,由于服务可能存在异常重启,因此需要考虑如何实现快速行情恢复。正常情况下,定时线程根据开盘时间产生开盘事件,主线程接收到该事件后向路透TREP服务器发起链代码订阅请求,即请求获取该市场的所有证券代码,后续所有订阅消息都由行情回调线程异步处理。在服务重启后,根据时间判断可能在开盘之后,此时不能依赖定时线程产生开盘事件,而是执行重放操作,即通过读取行情回调线程写入的证券代码文件得到该交易所的全量证券代码,再批量订阅证券代码行情,实现行情数据的快速恢复,由回调线程异步处理。After the main thread is started, it is mainly used to execute asynchronous events. Since the service may restart abnormally, it is necessary to consider how to achieve rapid market recovery. Under normal circumstances, the timing thread generates an opening event according to the opening time. After receiving the event, the main thread initiates a chain code subscription request to the Reuters TREP server, that is, requests to obtain all securities codes in the market, and all subsequent subscription messages are asynchronously executed by the market callback thread. deal with. After the service is restarted, according to the time judgment, it may be after the market opening. At this time, you cannot rely on the timing thread to generate the opening event, but perform the replay operation, that is, obtain the full amount of securities of the exchange by reading the securities code file written by the market callback thread. code, and then subscribe to securities code quotations in batches to achieve rapid recovery of quotation data, which is asynchronously processed by the callback thread.

S3、行情回调处理线程执行行情消息处理逻辑,将行情消息放入对应的消息队列,行情消息根据代码类型分为链代码行情和证券代码行情;进一步地,链代码行情包括链代码快照行情和链代码更新行情。不同类别的行情消息处理逻辑分别如下:S3. The market callback processing thread executes the market message processing logic, and puts the market message into the corresponding message queue. The market message is divided into chain code market and securities code market according to the code type; further, the chain code market includes the chain code snapshot market and the chain code market. Code update quotes. The processing logic of different types of market news is as follows:

(1)链代码快照行情(1) Chaincode snapshot market

主线程订阅链代码后首先收到链代码快照行情,由于单个交易所证券代码比较多,因此可能通过多个快照消息才能够获取到所有证券代码,当所有证券代码都获取到本地后,回调线程以2000支代码为一批次开始订阅行情。由于获取全量证券代码比较耗时,通常以若干秒计,为减少服务重启后的订阅延迟导致的行情丢失,行情回调处理线程将所有证券代码按交易所写入至系统文件中,供主线程恢复订阅使用。After the main thread subscribes to the chain code, it first receives the chain code snapshot market. Since there are many securities codes on a single exchange, it may be possible to obtain all the securities codes through multiple snapshot messages. When all the securities codes are obtained locally, the callback thread Start subscribing to the market in batches of 2000 codes. Since it is time-consuming to obtain the full amount of securities codes, usually measured in several seconds, in order to reduce the loss of market quotations caused by subscription delay after service restart, the quotation callback processing thread writes all securities codes to the system file according to the exchange for the main thread to resume. Subscribe to use.

(2)链代码更新行情(2) Chaincode update market

这种行情较少,用于开盘后发生代码变化时推送更新,例如特殊情况下开盘后有新的代码可以开始交易。收到这种行情时,需要将每条更新消息中包含的代码列表与之前收到的全量代码列表进行比较,确认是否新增,对于新增代码即时订阅,并更新代码文件。This kind of market is rare, and is used to push updates when the code changes after the opening. For example, in special cases, there is a new code to start trading after the opening. When receiving this kind of market, it is necessary to compare the code list contained in each update message with the full code list received before to confirm whether it is newly added, subscribe to the newly added code immediately, and update the code file.

(3)证券代码行情(3) Securities code market

回调线程将证券代码行情消息在本地创建副本,根据不同的交易所,将该消息副本放入对应的消息队列,供行情解码线程处理。The callback thread creates a copy of the stock code quotation message locally, and according to different exchanges, puts the copy of the message into the corresponding message queue for processing by the quotation decoding thread.

S4、行情解码线程解析行情消息,获取实时行情和延时行情,延时行情放入缓存队列(用作数据修正,因此盘中状态的延时行情直接放入缓存队列),同时,基于最新时间和交易日历来判断该实时行情是否为盘中行情,若是盘中行情,则将该实时行情放入缓存队列,并跳转至下一步S5;若是盘前或盘后行情,则将该实时行情放入行情消息队列,并跳转至步骤S6;S4. The market decoding thread parses the market information, obtains the real-time market and the delayed market, and puts the delayed market into the cache queue (used for data correction, so the delayed market in the disk state is directly put into the cache queue), and at the same time, based on the latest time and trading calendar to determine whether the real-time market is intraday, if it is intraday, put the real-time market in the cache queue, and jump to the next step S5; if it is pre-market or after-market, then the real-time market Put into the market message queue, and jump to step S6;

所述行情解码线程解析行情消息,获取实时行情和延时行情,包括以下的步骤:The market decoding thread parses the market information to obtain real-time and delayed market information, including the following steps:

S401、针对每个交易所,内部创建对应消息队列和缓存数据结构;S401. For each exchange, create a corresponding message queue and cache data structure internally;

S402、行情接收线程将数据发送到消息队列后取出消息,行情解码线程解析出证券代码;S402, the quotation receiving thread sends the data to the message queue and then retrieves the message, and the quotation decoding thread parses the securities code;

S403、根据证券代码从行情缓存中查找该支证券代码的缓存数据,解析行情消息每个字段,更新内部缓存数据,获取实时行情和延时行情。S403. Search the cached data of the securities code from the market information cache according to the securities code, parse each field of the market information, update the internal cached data, and obtain real-time and delayed market information.

S5、行情合并线程对缓存队列中的盘中阶段的实时行情和延时行情进行合并,合并后的结果放入行情消息队列;S5. The quotation merging thread merges the real-time quotations and delayed quotations in the intraday stage in the cache queue, and the combined result is put into the quotation message queue;

具体的,步骤S5,包括以下的步骤:Specifically, step S5 includes the following steps:

S51、独立的行情合并线程从缓存队列中取出一条消息;S51. An independent market merging thread takes out a message from the cache queue;

S52、在行情缓存中查找是否有相反状态的行情数据(如果缓存队列的消息是实时行情,则从行情缓存中查找该代码的延时行情,如果缓存队列的消息是延时行情,则从行情缓存中查找该证券代码的实时行情),若有,则基于特定规则对实时行情和延时行情进行合并,若没有(例如只有缓存队列的实时行情,行情缓存中没有延时行情,或者只有缓存队列的延时行情,行情缓存中没有实时行情),则合并字段值直接取缓存队列中的行情数据,进行合并;S52. Check whether there is market data in the opposite state in the market cache (if the message in the cache queue is a real-time market, then look up the delayed market of the code from the market cache, if the message in the cache queue is a delayed market, then from the market Find the real-time quotation of the security code in the cache), if so, combine the real-time quotation and delayed quotation based on specific rules, if not (for example, only the real-time quotation of the cache queue, there is no delayed quotation in the quotation cache, or only the cache The delayed quotation of the queue, there is no real-time quotation in the quotation cache), the merged field value directly takes the quotation data in the cache queue and merges it;

所述特定规则,包括:The specific rules include:

(1)最新价:根据行情时间判断,取实时行情和延时行情中行情时间大的最新价;(1) Latest price: Judging by the market time, take the latest price with a longer market time in the real-time market and the delayed market;

(2)最高价:取实时行情和延时行情中最高价的最高值;(2) Highest price: take the highest value of the highest price in the real-time market and the delayed market;

(3)最低价:取实时行情和延时行情中最低价的最低值;(3) Lowest price: take the lowest value of the lowest price in the real-time market and the delayed market;

(4)开盘价:以延迟行情开盘价为准,如果延迟行情没有开盘价则使用实时行情开盘价;(4) Opening price: The opening price of the delayed market shall prevail. If there is no opening price of the delayed market, the opening price of the real-time market shall be used;

(5)昨收价:以延时行情昨收价为准,如果延迟行情没有昨收价则使用实时行情昨收价;(5) Yesterday's closing price: The yesterday's closing price of the delayed market shall prevail. If the delayed market does not have yesterday's closing price, the real-time market yesterday's closing price shall be used;

(6)收盘价:以延时行情收盘价为准,如果延迟行情没有收盘价则使用实时行情收盘价;(6) Closing price: The closing price of the delayed market shall prevail. If there is no closing price of the delayed market, the closing price of the real-time market shall be used;

S53、合并完成后,更新行情缓存中该证券代码的最新数据,并将合并后的结果放入行情消息队列。S53. After the merging is completed, update the latest data of the security code in the quotation cache, and put the merged result into the quotation message queue.

S6、行情发送线程将行情消息队列中的消息转换成protobuf格式,并发送到kafka流式系统,供下游消费者使用,protobuf是Google开源的实现数据自动序列化和反序列化的机制,kafka是一种高吞吐量的分布式发布订阅消息系统;S6. The market sending thread converts the messages in the market message queue into protobuf format and sends them to the kafka streaming system for downstream consumers to use. protobuf is Google's open source mechanism for automatic data serialization and deserialization, and kafka is A high-throughput distributed publish-subscribe messaging system;

所述步骤S6,包括以下的步骤:The step S6 includes the following steps:

S61、独立的行情发送线程从行情消息队列中逐条取出待发送的原始缓存消息;S61, the independent market quotation sending thread takes out the original cached messages to be sent one by one from the quotation message queue;

S62、在内部转换成protobuf格式的消息,根据行情类型以及市场状态,将其发送到kafka流式系统中对应的主题中,供下游消费使用,行情类型包括行情、逐笔或盘口,市场状态包括盘前、盘中或盘后。S62. Internally convert the message in protobuf format, and send it to the corresponding topic in the kafka streaming system according to the market type and market status for downstream consumption. Including pre-market, intra-day or after-hours.

通过以上方法,实现了支持交易所证券代码列表落地文件,实现服务异常重启时主线程批量订阅行情实现快速数据恢复;行情接收线程通过多个行情消息队列降低接收延迟,同时通过多个行情解码线程异步解析行情消息,实现行情高并发处理;独立的行情合并线程完成实时行情和延迟行情消息合并,提高数据准确性;独立的行情发送线程将protobuf格式的数据存储至流式系统,供下游行情消费者使用;将行情的订阅、接收、解码、合并、存储均通过独立线程实现,彼此间通过服务进程内部的队列解耦,提升整体消息处理能力。Through the above methods, it is possible to support the landing file of the stock code list of the exchange, and realize the batch subscription of quotations by the main thread to achieve fast data recovery when the service is restarted abnormally; the quotation receiving thread reduces the receiving delay through multiple quotation message queues, and simultaneously uses multiple quotation decoding threads. Asynchronously parses market information to achieve high concurrent market processing; an independent market merging thread completes the merging of real-time and delayed market information to improve data accuracy; an independent market sending thread stores data in protobuf format to the streaming system for downstream market consumption Users can use it; the subscription, reception, decoding, merging, and storage of market quotations are implemented through independent threads, and they are decoupled from each other through the queue inside the service process to improve the overall message processing capability.

本发明还提供了一种美股多交易所行情处理系统,包括定时线程、主线程、行情回调处理线程、行情解码线程、行情合并线程和行情发送线程,The present invention also provides a quotation processing system for US stocks and multiple exchanges, including a timing thread, a main thread, a quotation callback processing thread, a quotation decoding thread, a quotation merging thread and a quotation sending thread,

定时线程用于生成异步事件,异步事件包括针对交易所以及美股市场的事件;Timing threads are used to generate asynchronous events, including events for exchanges and the US stock market;

主线程用于执行异步事件,批量订阅证券代码行情;The main thread is used to execute asynchronous events and subscribe to stock code quotes in batches;

行情回调处理线程用于执行行情消息处理逻辑,将行情消息放入对应的消息队列,行情消息根据代码类型分为链代码行情和证券代码行情;The market callback processing thread is used to execute the market message processing logic and put the market message into the corresponding message queue. The market message is divided into chain code market and securities code market according to the code type;

行情解码线程用于解析行情消息,获取实时行情和延时行情,延时行情放入缓存队列,并判断该实时行情是否为盘中行情,若是盘中行情,则将该实时行情放入缓存队列,若是盘前或盘后行情,则将该实时行情放入行情消息队列,并由行情发送线程作进一步处理;The market decoding thread is used to parse the market information, obtain the real-time market and delayed market, put the delayed market into the cache queue, and judge whether the real-time market is intraday market, if it is intraday, put the real-time market into the cache queue , if the market is pre-market or after-market, the real-time market will be put into the market message queue, and will be further processed by the market sending thread;

行情合并线程用于对缓存队列中的实时行情和延时行情进行合并,合并后的结果放入行情消息队列;The quotation merging thread is used to merge the real-time quotations and delayed quotations in the cache queue, and the combined result is put into the quotation message queue;

行情发送线程用于将行情消息队列中的消息转换成protobuf格式,并发送到kafka流式系统,供下游消费者使用。The market sending thread is used to convert the messages in the market message queue into protobuf format and send them to the kafka streaming system for downstream consumers to use.

本发明实现了通过配置文件定义每个交易所的开盘和收盘时间,自动订阅和取消订阅该市场行情;通过独立的线程处理来自各个交易所的行情;并且基于特定的规则完成盘中实时行情和延时行情的数据合并,保证数据准确性。The invention realizes defining the opening and closing time of each exchange through configuration files, and automatically subscribes and unsubscribes to the market quotations; processes quotations from various exchanges through independent threads; and completes intraday real-time quotations and quotations based on specific rules. Data merging of delayed market quotations ensures data accuracy.

以上是对本发明的较佳实施进行了具体说明,但本发明创造并不限于所述实施例,熟悉本领域的技术人员在不违背本发明精神的前提下还可做出种种的等同变形或替换,这些等同的变形或替换均包含在本申请权利要求所限定的范围内。The above is a specific description of the preferred implementation of the present invention, but the present invention is not limited to the described embodiments, and those skilled in the art can also make various equivalent deformations or replacements on the premise that does not violate the spirit of the present invention , these equivalent modifications or substitutions are all included within the scope defined by the claims of the present application.

Claims (10)

Translated fromChinese
1.一种美股多交易所行情处理方法,其特征在于:包括以下的步骤:1. A method for processing quotations of U.S. stocks and multiple exchanges, characterized in that: comprising the following steps:S1、定时线程生成异步事件,异步事件包括针对交易所以及美股市场的事件;S1. Timed threads generate asynchronous events, including events for exchanges and the U.S. stock market;S2、主线程执行异步事件,批量订阅证券代码行情;S2. The main thread executes asynchronous events and subscribes to stock code quotes in batches;S3、行情回调处理线程执行行情消息处理逻辑,将行情消息放入对应的消息队列,行情消息根据代码类型分为链代码行情和证券代码行情;S3. The market callback processing thread executes the market message processing logic, and puts the market message into the corresponding message queue. The market message is divided into chain code market and securities code market according to the code type;S4、行情解码线程解析行情消息,获取实时行情和延时行情,延时行情放入缓存队列,并判断该实时行情是否为盘中行情,若是盘中行情,则将该实时行情放入缓存队列,若是盘前或盘后行情,则将该实时行情放入行情消息队列,并跳转至步骤S6;S4. The market decoding thread parses the market information, obtains the real-time market and the delayed market, puts the delayed market into the cache queue, and judges whether the real-time market is an intraday market. If it is an intraday market, put the real-time market into the cache queue. , if the market is pre-market or after-market, the real-time market is put into the market message queue, and jumps to step S6;S5、行情合并线程对缓存队列中的实时行情和延时行情进行合并,合并后的结果放入行情消息队列;S5. The quotation merging thread merges the real-time quotations and delayed quotations in the cache queue, and the combined result is put into the quotation message queue;S6、行情发送线程将行情消息队列中的消息转换成protobuf格式,并发送到kafka流式系统,供下游消费者使用。S6. The market sending thread converts the messages in the market message queue into protobuf format, and sends them to the kafka streaming system for downstream consumers to use.2.如权利要求1所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S1,包括以下的步骤:2. a kind of U.S. stock multi-exchange quotation processing method as claimed in claim 1, is characterized in that: described step S1, comprises the following steps:S11、主线程启动后读取配置文件,将所有定时事件时间设置添加到定时线程;S11. After the main thread is started, read the configuration file, and add all timed event time settings to the timed thread;S12、定时线程定时检查当前时间是否满足任一定时任务时间设置,如果满足,则将针对交易所的事件类型和交易所标识写入至事件消息队列,如果不满足,则定时线程休眠一段时间后再重新检查,直至满足任一定时任务时间设置。S12. The timing thread regularly checks whether the current time satisfies any timing task time setting, and if so, writes the event type and exchange ID for the exchange to the event message queue, if not, the timing thread sleeps for a period of time after Check again until any of the timing task time settings are met.3.如权利要求2所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S2,包括以下的步骤:3. a kind of U.S. stock multi-exchange quotation processing method as claimed in claim 2, is characterized in that: described step S2, comprises the following steps:S21、主线程接收到该事件,向路透TREP服务器发起链代码订阅请求,即请求获取该市场的所有证券代码;S21. The main thread receives the event and initiates a chain code subscription request to the Reuters TREP server, that is, requests to obtain all securities codes in the market;S22、批量订阅证券代码行情。S22. Subscribing securities code quotations in batches.4.如权利要求3所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S3中,链代码行情包括链代码快照行情和链代码更新行情。4 . The method for processing quotations of U.S. stocks on multiple exchanges as claimed in claim 3 , wherein in the step S3, the chaincode quotations include chaincode snapshot quotations and chaincode update quotations. 5 .5.如权利要求4所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S4中,所述行情解码线程解析行情消息,获取实时行情和延时行情,包括以下的步骤:5. a kind of U.S. stock multi-exchange quotation processing method as claimed in claim 4, is characterized in that: in described step S4, described quotation decoding thread parses quotation information, obtains real-time quotation and delayed quotation, comprises following steps :S401、针对每个交易所,内部创建对应消息队列和缓存数据结构;S401. For each exchange, create a corresponding message queue and cache data structure internally;S402、行情接收线程将数据发送到消息队列后取出消息,行情解码线程解析出证券代码;S402, the quotation receiving thread sends the data to the message queue and then retrieves the message, and the quotation decoding thread parses the securities code;S403、根据证券代码从行情缓存中查找该支证券代码的缓存数据,解析行情消息每个字段,更新内部缓存数据,获取实时行情和延时行情。S403. Search the cached data of the securities code from the market information cache according to the securities code, parse each field of the market information, update the internal cached data, and obtain real-time and delayed market information.6.如权利要求5所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S4中,所述判断该实时行情是否为盘中行情,是基于最新时间和交易日历来判断。6. a kind of U.S. stock multi-exchange quotation processing method as claimed in claim 5, is characterized in that: in described step S4, described judging whether this real-time quotation is intraday quotation, is to judge based on the latest time and trading calendar .7.如权利要求6所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S5,包括以下的步骤:7. a kind of U.S. stock multi-exchange quotation processing method as claimed in claim 6 is characterized in that: described step S5, comprises the following steps:S51、行情合并线程从缓存队列中取出消息;S51, the market merging thread takes out the message from the cache queue;S52、在行情缓存中查找是否有相反状态的行情数据,若有,则基于特定规则对实时行情和延时行情进行合并,若没有,则直接取缓存队列中的行情数据,进行合并;S52. Find out whether there is market data in the opposite state in the market cache. If so, combine the real-time market and the delayed market based on specific rules. If not, directly fetch the market data in the cache queue and merge them;S53、更新行情缓存中该证券代码的最新数据,并将合并后的结果放入行情消息队列。S53 , update the latest data of the security code in the quotation cache, and put the combined result into the quotation message queue.8.如权利要求7所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S52中,所述特定规则,包括:8. The method for processing quotations of U.S. stocks on multiple exchanges as claimed in claim 7, wherein in the step S52, the specific rules include:(1)最新价:根据行情时间判断,取实时行情和延时行情中行情时间大的最新价;(1) Latest price: Judging by the market time, take the latest price with a longer market time in the real-time market and the delayed market;(2)最高价:取实时行情和延时行情中最高价的最高值;(2) Highest price: take the highest value of the highest price in the real-time market and the delayed market;(3)最低价:取实时行情和延时行情中最低价的最低值;(3) Lowest price: take the lowest value of the lowest price in the real-time market and the delayed market;(4)开盘价:以延迟行情开盘价为准,如果延迟行情没有开盘价则使用实时行情开盘价;(4) Opening price: The opening price of the delayed market shall prevail. If there is no opening price of the delayed market, the opening price of the real-time market shall be used;(5)昨收价:以延时行情昨收价为准,如果延迟行情没有昨收价则使用实时行情昨收价;(5) Yesterday's closing price: The yesterday's closing price of the delayed market shall prevail. If the delayed market does not have yesterday's closing price, the real-time market yesterday's closing price shall be used;(6)收盘价:以延时行情收盘价为准,如果延迟行情没有收盘价则使用实时行情收盘价。(6) Closing price: The closing price of the delayed market shall prevail. If there is no closing price of the delayed market, the closing price of the real-time market shall be used.9.如权利要求8所述的一种美股多交易所行情处理方法,其特征在于:所述步骤S6,包括以下的步骤:9. a kind of U.S. stock multi-exchange quotation processing method as claimed in claim 8, is characterized in that: described step S6, comprises the following steps:S61、行情发送线程从行情消息队列中逐条取出待发送的原始缓存消息;S61, the quotation sending thread takes out the original cached messages to be sent one by one from the quotation message queue;S62、在内部转换成protobuf格式的消息,根据行情类型以及市场状态,将其发送到kafka流式系统中对应的主题中,供下游消费使用,行情类型包括行情、逐笔或盘口,市场状态包括盘前、盘中或盘后。S62. Internally convert the message in protobuf format, and send it to the corresponding topic in the kafka streaming system according to the market type and market status for downstream consumption. Including pre-market, intra-day or after-hours.10.一种美股多交易所行情处理系统,其特征在于:包括定时线程、主线程、行情回调处理线程、行情解码线程、行情合并线程和行情发送线程,10. A quotation processing system for U.S. stocks and multiple exchanges, characterized in that it comprises a timing thread, a main thread, a quotation callback processing thread, a quotation decoding thread, a quotation merging thread and a quotation sending thread,定时线程用于生成异步事件,异步事件包括针对交易所以及美股市场的事件;Timing threads are used to generate asynchronous events, including events for exchanges and the US stock market;主线程用于执行异步事件,批量订阅证券代码行情;The main thread is used to execute asynchronous events and subscribe to stock code quotes in batches;行情回调处理线程用于执行行情消息处理逻辑,将行情消息放入对应的消息队列,行情消息根据代码类型分为链代码行情和证券代码行情;The market callback processing thread is used to execute the market message processing logic and put the market message into the corresponding message queue. The market message is divided into chain code market and securities code market according to the code type;行情解码线程用于解析行情消息,获取实时行情和延时行情,延时行情放入缓存队列,并判断该实时行情是否为盘中行情,若是盘中行情,则将该实时行情放入缓存队列,若是盘前或盘后行情,则将该实时行情放入行情消息队列,并由行情发送线程作进一步处理;The market decoding thread is used to parse the market information, obtain the real-time market and delayed market, put the delayed market into the cache queue, and judge whether the real-time market is intraday market, if it is intraday, put the real-time market into the cache queue , if the market is pre-market or after-market, the real-time market will be put into the market message queue, and will be further processed by the market sending thread;行情合并线程用于对缓存队列中的实时行情和延时行情进行合并,合并后的结果放入行情消息队列;The quotation merging thread is used to merge the real-time quotations and delayed quotations in the cache queue, and the combined result is put into the quotation message queue;行情发送线程用于将行情消息队列中的消息转换成protobuf格式,并发送到kafka流式系统,供下游消费者使用。The market sending thread is used to convert the messages in the market message queue into protobuf format and send them to the kafka streaming system for downstream consumers to use.
CN202210217495.8A2022-03-072022-03-07Method and system for processing market quotations of multiple American stocksPendingCN114721843A (en)

Priority Applications (1)

Application NumberPriority DateFiling DateTitle
CN202210217495.8ACN114721843A (en)2022-03-072022-03-07Method and system for processing market quotations of multiple American stocks

Applications Claiming Priority (1)

Application NumberPriority DateFiling DateTitle
CN202210217495.8ACN114721843A (en)2022-03-072022-03-07Method and system for processing market quotations of multiple American stocks

Publications (1)

Publication NumberPublication Date
CN114721843Atrue CN114721843A (en)2022-07-08

Family

ID=82238619

Family Applications (1)

Application NumberTitlePriority DateFiling Date
CN202210217495.8APendingCN114721843A (en)2022-03-072022-03-07Method and system for processing market quotations of multiple American stocks

Country Status (1)

CountryLink
CN (1)CN114721843A (en)

Cited By (2)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN116433383A (en)*2023-06-122023-07-14宁波森浦融讯科技有限公司Data processing method, device, electronic equipment and computer readable storage medium
CN119919236A (en)*2025-04-022025-05-02高盈国际创新科技(深圳)有限公司 A distributed multi-channel market information processing method and system

Citations (4)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
US20100211519A1 (en)*2009-02-172010-08-19Parallel Trading Systems, Inc.Method and system for processing real-time, asynchronous financial market data events on a parallel computing platform
CN107067325A (en)*2017-04-182017-08-18湖南福米信息科技有限责任公司Filter method, filter and the supplying system of stock market data
CN111145024A (en)*2019-12-272020-05-12深圳市思迪信息技术股份有限公司Market data pushing method and device
CN111737622A (en)*2020-06-182020-10-02上海英方软件股份有限公司Method and device for subscribing market information flow

Patent Citations (4)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
US20100211519A1 (en)*2009-02-172010-08-19Parallel Trading Systems, Inc.Method and system for processing real-time, asynchronous financial market data events on a parallel computing platform
CN107067325A (en)*2017-04-182017-08-18湖南福米信息科技有限责任公司Filter method, filter and the supplying system of stock market data
CN111145024A (en)*2019-12-272020-05-12深圳市思迪信息技术股份有限公司Market data pushing method and device
CN111737622A (en)*2020-06-182020-10-02上海英方软件股份有限公司Method and device for subscribing market information flow

Cited By (3)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN116433383A (en)*2023-06-122023-07-14宁波森浦融讯科技有限公司Data processing method, device, electronic equipment and computer readable storage medium
CN116433383B (en)*2023-06-122023-10-31宁波森浦融讯科技有限公司Data processing method, device, electronic equipment and computer readable storage medium
CN119919236A (en)*2025-04-022025-05-02高盈国际创新科技(深圳)有限公司 A distributed multi-channel market information processing method and system

Similar Documents

PublicationPublication DateTitle
US9106480B2 (en)Performing computations in a distributed infrastructure
US20040213387A1 (en)Propagating commit times
CN113256355B (en)Method, device, medium, equipment and system for determining integral rights and interests in real time
CN108768826A (en)Based on the message route method under MQTT and Kafka high concurrent scenes
CN110377486B (en)Kafka-based method for realizing stable high-throughput asynchronous task processing
US20050108289A1 (en)Method of replicating data between computing devices which each use local clocks
WO2020259354A1 (en)Queue adjustment method, apparatus and device, and computer-readable storage medium
US7478130B2 (en)Message processing apparatus, method and program
CN111400011A (en)Real-time task scheduling method, system, equipment and readable storage medium
CN115994053A (en)Parallel playback method and device of database backup machine, electronic equipment and medium
CN114721843A (en)Method and system for processing market quotations of multiple American stocks
CN111125106A (en)Batch running task execution method, device, server and storage medium
CN115292012A (en) Thread pool management method and system, intelligent terminal and storage medium
CN114169997A (en) A debit method and device
CN114661490A (en) A method and system for market delay based on streaming system
WO2024103898A1 (en)Database cluster management method and apparatus
CN114491338A (en) A method and tool for realizing message real-time and persistence based on redis
US11574323B2 (en)Methods and systems for processing market data
CN115022263A (en) A delivery method, device, device and storage medium for delayed message
CN114331715A (en) An easy-to-control method and system for sending liquidation quotations
CN113392081A (en)Data processing system and method
CN115134320B (en)Transaction system for determining time sequence based on message distribution
CN118885308B (en) Message queue consumer processing method based on transaction group
CN115514678B (en)Continuity monitoring method for internet financial business
US20240193056A1 (en)Pauseless end-of-interval rollover with fault tolerance

Legal Events

DateCodeTitleDescription
PB01Publication
PB01Publication
SE01Entry into force of request for substantive examination
SE01Entry into force of request for substantive examination
CB02Change of applicant information

Country or region after:China

Address after:518100, Shenzhen City, Nanshan District, Shenzhen, Guangdong Province, China No. 18, Tzu Chi Road, Shuiwan Community, Shekou Street, Nanshan District, Shenzhen City, Guangdong Province Building 4C12K27, Haijing Plaza Building

Applicant after:Hong Kong Securities Software (Shenzhen) Co., Ltd.

Address before:Guangdong Province Shenzhen City Nanshan District Yu Hai Street Science and Technology Park Community Keyuan Road 15 KeXing Science Park B Building B4-1003

Applicant before:Yingli Shuzhi Technology (Shenzhen) Co., Ltd.

Country or region before:China

CB02Change of applicant information

[8]ページ先頭

©2009-2025 Movatter.jp