Aggregating real-time tick data into OHLCV

Hello

I’ve been wondering how to go about aggregating real-time stock data from the New York Stock Exchange. This is an example of the tick feed:

{"timestamp": "1587861684083", "stock": "AAPL", "price": 234.56, "size": 800}
{"timestamp": "1587861684083", "stock": "AAPL", "price": 234.57, "size": 200}
{"timestamp": "1587861684083", "stock": "AAPL", "price": 234.58, "size": 600}
{"timestamp": "1587861684087", "stock": "AAPL", "price": 234.50, "size": 400}
{"timestamp": "1587861684093", "stock": "MSFT", "price": 1234.56, "size": 10000}

Aggregating it into OHLCV with a time bucket of 1 second, grouped by stock, as an example:
O = first( price ) // first tick inside this time bucket
H = max( price ) // highest tick inside this time bucket
L = min( price ) // lowest tick inside this time bucket
C = last( price ) // last tick inside this time bucket
V = sum( size ) // total volume of shares traded inside this time bucket

We get the time bucket by:
1587861684083 - 1587861684083 % 1000 -> 1587861684000
(it’s not really important whether the time bucket timestamp is “floored” or “ceiled”)

For AAPL:
{"timestamp": "1587861684000", "O": 234.56, "H": 234.58, "L": 234.50, "C": 234.50, "V": 2000}
For MSFT:
{"timestamp": "1587861684000", "O": 1234.56, "H": 1234.56, "L": 1234.56, "C": 1234.56, "V": 10000}
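To make the expected result concrete, here is a minimal plain-Python sketch of the aggregation described above, run over the five sample ticks. The function names (bucket_start, aggregate_ticks) are only illustrative, and it assumes ticks arrive in timestamp order and that buckets are floored; it is not tied to Redis in any way.

```python
def bucket_start(timestamp_ms, bucket_ms=1000):
    """Floor a millisecond timestamp to the start of its time bucket."""
    return timestamp_ms - timestamp_ms % bucket_ms


def aggregate_ticks(ticks, bucket_ms=1000):
    """Fold ticks (assumed to be in timestamp order) into OHLCV bars.

    Returns a dict keyed by (stock, bucket start in ms).
    Buckets without ticks never get an entry.
    """
    bars = {}
    for tick in ticks:
        key = (tick["stock"], bucket_start(int(tick["timestamp"]), bucket_ms))
        price, size = tick["price"], tick["size"]
        bar = bars.get(key)
        if bar is None:
            # First tick of the bucket sets every price field.
            bars[key] = {"O": price, "H": price, "L": price, "C": price, "V": size}
        else:
            bar["H"] = max(bar["H"], price)
            bar["L"] = min(bar["L"], price)
            bar["C"] = price  # last tick seen so far
            bar["V"] += size
    return bars


ticks = [
    {"timestamp": "1587861684083", "stock": "AAPL", "price": 234.56, "size": 800},
    {"timestamp": "1587861684083", "stock": "AAPL", "price": 234.57, "size": 200},
    {"timestamp": "1587861684083", "stock": "AAPL", "price": 234.58, "size": 600},
    {"timestamp": "1587861684087", "stock": "AAPL", "price": 234.50, "size": 400},
    {"timestamp": "1587861684093", "stock": "MSFT", "price": 1234.56, "size": 10000},
]
bars = aggregate_ticks(ticks)
for (stock, bucket), bar in bars.items():
    print(stock, bucket, bar)
```

Both bars land in the 1587861684000 bucket, matching the AAPL and MSFT results above.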

The end goal is to fire an event in pub/sub and store the produced aggregation in Redis Timeseries upon every final aggregation (in this case every second). It’s also important not to produce aggregations for time buckets in which there were no ticks, but it’s clear to me how to do that after I create the aggregate. What I need help with is the actual aggregation event.

Let me know if anything is unclear,
Cheers

There are a couple of options I can think of here. For the aggregation, I think you can take a look at how the avg is implemented; the others (max, min, sum, first, last) should be very similar:

The question is how the data arrives (stream/hashes?). Can you assume it arrives ordered by timestamp, so that you will know when to trigger the aggregation?

Yes, it’s a continuous stream of real-time data that arrives through a WebSocket, and my best guess was to keep pushing the events into a stream as they arrive. Yes, I can assume it comes in timestamp order.

Also, any recommendations for ingesting the WebSocket into Redis in the most optimal way? (Or does it not matter that much?)

I think pushing to a stream (Redis Streams, https://redis.io/topics/streams-intro) is a good idea. You can register on this stream and maintain a hash per timestamp per stock that aggregates the data (with a field per aggregated value, such as first, max, min, …). Then, when you see the next timestamp, you can publish a message and write to Timeseries (and maybe also delete the hash so memory does not explode). You can see this small example of registering for messages on a stream:
https://oss.redislabs.com/redisgears/examples.html#basic-redis-stream-processing
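The "emit when you see the next timestamp" logic can be sketched in plain Python as below. This is only an in-memory stand-in, not RedisGears code: the open_bars dict plays the role of the per-stock hash, BarBuilder and on_tick are made-up names, and the third tick (in the next second) is invented for the demo. In a Gears script, something like on_tick would be the function called for each stream record.

```python
class BarBuilder:
    """Keeps one open bar per stock and emits it when a later bucket starts."""

    def __init__(self, bucket_ms=1000):
        self.bucket_ms = bucket_ms
        self.open_bars = {}  # stock -> bar dict (stands in for the Redis hash)

    def on_tick(self, tick):
        """Apply one tick; return the finished bar for this stock, or None."""
        ts = int(tick["timestamp"])
        bucket = ts - ts % self.bucket_ms
        stock, price, size = tick["stock"], tick["price"], tick["size"]

        bar = self.open_bars.get(stock)
        finished = None
        if bar is not None and bar["timestamp"] != bucket:
            finished = bar  # a tick from a later bucket closes the old bar
            bar = None

        if bar is None:
            # First tick of a new bucket: replaces (i.e. "deletes") the old bar.
            self.open_bars[stock] = {
                "stock": stock, "timestamp": bucket,
                "O": price, "H": price, "L": price, "C": price, "V": size,
            }
        else:
            bar["H"] = max(bar["H"], price)
            bar["L"] = min(bar["L"], price)
            bar["C"] = price
            bar["V"] += size
        return finished


builder = BarBuilder()
ticks = [
    {"timestamp": "1587861684083", "stock": "AAPL", "price": 234.56, "size": 800},
    {"timestamp": "1587861684087", "stock": "AAPL", "price": 234.50, "size": 400},
    # Hypothetical tick in the next second, only here to trigger the emit.
    {"timestamp": "1587861685010", "stock": "AAPL", "price": 234.60, "size": 100},
]
emitted = []
for tick in ticks:
    bar = builder.on_tick(tick)
    if bar is not None:
        emitted.append(bar)  # here you would publish and write to Timeseries
print(emitted)
```

Note that with this approach a bar is only emitted once a later tick for the same stock shows up.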

That was my initial idea, but I held on to it so that I wouldn’t influence anyone’s creativity. However, there is one small problem with it: I am basically relying on the arrival of a timestamp from the next time bucket to trigger an aggregation “event”, which is not guaranteed, or can be late in the best-case scenario.

1587861684083
15xxxxxxx0010 <- first tick inside this time bucket (1000 ms)
15xxxxxxx0023 <- another possible tick (not guaranteed)
.
.
.
15xxxxxxx1000 <- this is where I would like to emit an aggregate for the ticks that happened in the past 1000 ms, but it’s not guaranteed that I will have a tick at this timestamp.

Would it be possible to set up a timer on the first tick that will trigger an aggregate event 990 ms after the first tick in this case? In general, it would trigger 1000 - (timestamp % 1000) ms after the first tick. Maybe I could receive expire events on Redis keys or something like that. I wonder if setting a Python timer/sleep on the first tick is a good idea.

Yes, I got your problem. As a workaround you can definitely create another key called, for example, <time>_expire and set an expiration on it. Then you need to register on changes to this key and you will get the expire event. It’s not the best solution, but it will work (notice that there might be a small delay in the expire event, because expiration is lazy, so you will get it on the Redis cron job). I do plan to support a timer-like API in future Gears releases.

Correct me if I got it wrong: I need to enable CONFIG SET notify-keyspace-events Ex. Every time an event arrives in the stream, I process it with RedisGears, where I extract the time bucket from the timestamp (15xxxxxxx0023 -> 15xxxxxxx1000). I check if a key <15xxxxxxx1000>_expire exists. If it doesn’t, I create it with an expiration time of PX 1000 - (15xxxxxxx0023 % 1000) = 977 (maybe adding 1-5 ms extra just for safety). I also create a hash, for example <15xxxxxxx1000>_hash, and set the fields O, H, L, C, V. On the next stream events I keep aggregating on top of those fields. When the key expires, it will publish to the pub/sub channel, so I need to write a client that listens on the channel and, for every expiration, fetches the data from the hash key <15xxxxxxx1000>_hash and pushes it into a secondary stream. I can then process that stream with a separate RedisGears script that takes the events and saves them to RedisTimeseries, and maybe also does some data enrichment if I need it to. Or can all of this be done with RedisGears, skipping the pub/sub client and receiving the expiration events in RedisGears?

So yes, it can all be done with RedisGears (as close as possible to the data, and there is also no need for CONFIG SET notify-keyspace-events Ex). You can get the expire event in RedisGears, read the _hash and put it into RedisTimeseries (the enrichment can also be done with RedisGears; maybe you want to search some indexed data with RediSearch and enrich according to the results?).

Regarding the creation of the expire key: yes, you need to check if it exists and, if not, create it with an expiration (as I said, this is a workaround for now; I hope that in the next RedisGears versions you will have the timer API). Notice that you can do it with a single command to Redis using the NX and expire arguments (https://redis.io/commands/set).
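As a rough sketch of that single command: the helper below computes how long is left until the end of the tick's bucket and issues one SET with NX and PX. The function names, the key argument and the 5 ms safety margin are assumptions for illustration; client is assumed to be anything with a redis-py style set(name, value, nx=..., px=...) method.

```python
def ms_until_bucket_end(timestamp_ms, bucket_ms=1000):
    """Milliseconds from this tick to the end of its time bucket."""
    return bucket_ms - timestamp_ms % bucket_ms


def arm_bucket_timer(client, key, timestamp_ms, bucket_ms=1000, safety_ms=5):
    """Create the expiring marker key unless it already exists.

    Uses one SET ... NX PX call, so there is no separate EXISTS check.
    Returns True if this call created the key (first tick of the bucket),
    False if the key was already there.
    """
    ttl_ms = ms_until_bucket_end(timestamp_ms, bucket_ms) + safety_ms
    # redis-py returns True when the key was set and None when NX blocked it.
    created = client.set(key, "1", nx=True, px=ttl_ms)
    return bool(created)


print(ms_until_bucket_end(1587861684023))  # 977
print(ms_until_bucket_end(1587861684010))  # 990
```

With redis-py you could pass redis.Redis() as client; inside a Gears script the same thing would presumably be a single execute('SET', key, '1', 'NX', 'PX', ttl_ms) call.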

Some suggestions:

  1. Regarding the key names: instead of putting _expire as a suffix, put it as a prefix. Then you can create a Gears registration that fires on the expiration of keys with this prefix.

  2. If you are planning to move to a Redis Cluster one day to distribute the work, you need to make sure the keys you create are located correctly on the shards (otherwise, when you create a key, you will get a MOVED reply from Redis Cluster). RedisGears gives you all you need in order to do it. One way is to read from the original stream and reshuffle so the data is located correctly, but reshuffling takes time because it goes over the network (and it’s not really needed here). The better approach is to use the hashtag function (https://oss.redislabs.com/redisgears/runtime.html#hashtag), which returns a string that belongs to the shard you are currently running on. This way you can create the keys like this: ‘expire_<15xxxxxxx1000>_{%s}’ % hashtag(), and you will know for sure that you will not get any MOVED reply. Take a look at how we did it in rgsync (https://github.com/RedisGears/rgsync/blob/b24ebb5e22d66252d51b57bdf7f3b9d8ffe6c001/rgsync/common.py#L24)
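Putting the two suggestions together, the key naming could look roughly like the sketch below. The exact layout (bucket, then stock, then the tag) and the helper names are assumptions, as is the sample tag "06S"; in a Gears script shard_tag would come from hashtag(), and here it is passed in as a plain string so the sketch runs outside Gears. It also assumes stock symbols contain no underscore.

```python
EXPIRE_PREFIX = "expire_"  # a registration could then match keys by this prefix
HASH_PREFIX = "hash_"


def make_expire_key(bucket_end_ms, stock, shard_tag):
    """Marker key whose expiration signals the end of the bucket."""
    return "%s%d_%s_{%s}" % (EXPIRE_PREFIX, bucket_end_ms, stock, shard_tag)


def make_hash_key(bucket_end_ms, stock, shard_tag):
    """Hash holding O/H/L/C/V; same {tag}, so same cluster slot as the marker."""
    return "%s%d_%s_{%s}" % (HASH_PREFIX, bucket_end_ms, stock, shard_tag)


def parse_expire_key(key):
    """Recover (bucket_end_ms, stock, shard_tag) from an expired marker key."""
    if not key.startswith(EXPIRE_PREFIX):
        raise ValueError("not an expire key: %r" % key)
    bucket_end, stock, tag = key[len(EXPIRE_PREFIX):].split("_", 2)
    return int(bucket_end), stock, tag.strip("{}")


expire_key = make_expire_key(1587861685000, "AAPL", "06S")
bucket_end, stock, tag = parse_expire_key(expire_key)
hash_key = make_hash_key(bucket_end, stock, tag)
print(expire_key, "->", hash_key)
```

Because Redis Cluster only hashes the part inside the braces, the marker key and the hash end up in the same slot, so the expire handler can read the hash locally.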

Let me know if you need any other help.

Thank you so much for this. I will read through your instructions now and try to implement them. As for the data enrichment, all I need for now is to store a single number in a key, so that with the aggregation I can also send some metadata (number / C), which will give me a stock yield calculation, for example. As of right now I am going to be running on a single machine. Btw, what sort of delay should I expect from the expire notification that you mentioned?

@Damian_Danev the delay really depends on the number of such expire keys you will have. Redis has two expiration techniques: one is on access, and the other, called active expire, happens in the background on the Redis cron job. So if you have a lot of such expire keys that you do not touch, the active expire might decide that it has been running too long and should stop and continue on the next cron job cycle. The good news is that you can configure the amount of effort to put into the active expire phase:

Redis reclaims expired keys in two ways: upon access when those keys are
found to be expired, and also in background, in what is called the
“active expire key”. The key space is slowly and interactively scanned
looking for expired keys to reclaim, so that it is possible to free memory
of keys that are expired and will never be accessed again in a short time.
The default effort of the expire cycle will try to avoid having more than
ten percent of expired keys still in memory, and will try to avoid consuming
more than 25% of total memory and to add latency to the system. However
it is possible to increase the expire “effort” that is normally set to
“1”, to a greater value, up to the value “10”. At its maximum value the
system will use more CPU, longer cycles (and technically may introduce
more latency), and will tolerate less already expired keys still present
in the system. It’s a tradeoff between memory, CPU and latency.
active-expire-effort 1

You said that the events are ordered by timestamp, so most of the time the next timestamp would be the trigger to continue processing. So IIUC there will be a relatively small number of times where you rely on this expire event: only when data stops arriving might there be a key per stock where you need to rely on its expiration (is that correct?). How many stocks are we talking about?