Correct and fastest way to send Gears script to background using gears-cli

Adding a repartition step didn’t help either:

gb.repartition(lambda x: int(len(x['value'])))

Logs:


==> 30003.log <==
13:M 20 May 2020 13:54:11.639 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:11.639 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:11.639 # <module> On ExecutionPlan_NotifyRun, execution aborted

==> 30001.log <==
9:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyReceived, execution aborted

==> 30002.log <==
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_NotifyRun, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted
11:M 20 May 2020 13:54:26.791 # <module> On ExecutionPlan_TeminateExecution, execution aborted

==> 30003.log <==
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_NotifyRun, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.115 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted
13:M 20 May 2020 13:54:27.116 # <module> On ExecutionPlan_TeminateExecution, execution aborted

==> 30001.log <==
9:M 20 May 2020 13:54:32.347 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:37.826 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:41.290 * Marking node 7e443986124535b40885896a5d14e3f345f28c40 as failing (quorum reached).
9:M 20 May 2020 13:54:41.290 # Cluster state changed: fail
9:M 20 May 2020 13:54:41.623 # <module> On ExecutionPlan_NotifyReceived, execution aborted

==> 30003.log <==
13:M 20 May 2020 13:54:41.290 * FAIL message received from 6fc86dc7c8184b038b3680d0813edf76aa453e3c about 7e443986124535b40885896a5d14e3f345f28c40
13:M 20 May 2020 13:54:41.290 # Cluster state changed: fail

==> 30001.log <==
9:M 20 May 2020 13:54:45.640 * Clear FAIL state for node 7e443986124535b40885896a5d14e3f345f28c40: is reachable again and nobody is serving its slots after some time.
9:M 20 May 2020 13:54:45.641 # Cluster state changed: ok
9:M 20 May 2020 13:54:45.641 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:49.369 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:52.765 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:55.637 # <module> On ExecutionPlan_NotifyReceived, execution aborted
9:M 20 May 2020 13:54:57.352 * Marking node effdf1f49518b3f80cdf91948522eab1d099d1c0 as failing (quorum reached).
9:M 20 May 2020 13:54:57.352 # Cluster state changed: fail

==> 30002.log <==
11:M 20 May 2020 13:54:57.352 * FAIL message received from 6fc86dc7c8184b038b3680d0813edf76aa453e3c about effdf1f49518b3f80cdf91948522eab1d099d1c0
11:M 20 May 2020 13:54:57.352 # Cluster state changed: fail

==> 30003.log <==
13:M 20 May 2020 13:54:57.732 * Clear FAIL state for node 7e443986124535b40885896a5d14e3f345f28c40: is reachable again and nobody is serving its slots after some time.
13:M 20 May 2020 13:54:57.732 # Cluster state changed: ok

==> 30001.log <==
9:M 20 May 2020 13:55:01.669 * Clear FAIL state for node effdf1f49518b3f80cdf91948522eab1d099d1c0: is reachable again and nobody is serving its slots after some time.
9:M 20 May 2020 13:55:01.669 # Cluster state changed: ok

==> 30002.log <==
11:M 20 May 2020 13:55:01.657 * Clear FAIL state for node effdf1f49518b3f80cdf91948522eab1d099d1c0: is reachable again and nobody is serving its slots after some time.
11:M 20 May 2020 13:55:01.657 # Cluster state changed: ok

@meirsh if you don’t want to download the Kaggle dataset, there is sample data inside the sample_data folder.
I am checking whether the behaviour is the same on the current edge single-instance Redis Docker image.

Checking, will reply soon.

@meirsh I checked with a single instance:

docker pull redislabs/redismod:edge
docker run -p 6379:6379 redislabs/redismod:edge

The code works as expected: the pipeline runs all the way to the spellchecker step. It seems to be a killer only for a RedisGears cluster.

@AlexMikhalev So it seems like there are a couple of issues here. Some of them are in the gear function itself, but one is a real RedisGears issue (which is funny, because I was already working on fixing it). It’s going to be a long reply, but I hope I can explain it well enough. And yes, basically you are right: it’s a cluster issue and it will not happen with one shard. So here is the explanation:

Let’s call the shard on which you executed RG.PYEXECUTE the initiator. The initiator gets the RG.PYEXECUTE command with the requirements list, then downloads and installs the requirements (before running the function). Then it runs the function and links the requirements to everything the function creates (registrations, runs, …). Now it needs to send the registrations to all the other shards. To do that, it serializes the registration together with the requirements and all the steps (Python functions) and sends it to all the shards. The serialized registration here is huge, not just because the requirements are very big, but because it also serializes the ‘nlp’ object (it is used by the ‘parse_paragraphs’ function), which is huge by itself. Up to here everything is OK; it should only happen once, so it’s not a big deal. The issue is that it does not happen only once… it happens each time the registration fires. Why? Because the registration mode is async, which means each execution is distributed across all the shards. So each time you write a key, Gears serializes and deserializes the registration to a buffer, and memory quickly explodes.

Now, I agree that the requirements should not be serialized and deserialized each time, nor should the execution itself; only the data it works on (the key, for example) should be sent. I am currently working on fixing this for the next version. Until then, I am going to suggest a “workaround” (which I also believe is the preferred way to do it regardless of the issue I described). The “workaround” triggers a local execution on the shard that got the event, without creating a global execution, which requires initialization time to send it to all the shards.

The first thing I would suggest is to initialize ‘nlp’ on each shard separately. This can be done with the ‘onRegistered’ callback, given when registering the function; the callback is called on each shard upon registration (https://oss.redislabs.com/redisgears/functions.html#register). The second suggestion is for each shard to write to local keys using {} and the ‘hashtag’ function. And the last suggestion is to change the execution mode to ‘async_local’. It will look something like this:

nlp = None

def OnRegistered():
    # called once on each shard upon registration, so the heavy spaCy
    # model is loaded locally and never serialized between shards
    global nlp
    import spacy
    nlp = spacy.load('en_core_web_md', disable=['ner', 'tagger'])
    nlp.max_length = 2000000

def remove_prefix(text, prefix):
    return text[text.startswith(prefix) and len(prefix):]

def parse_paragraphs(x):
    global nlp
    key_prefix = "en:paragraphs:"
    # make sure we only process English articles
    paragraphs = x['value']
    key = x['key']
    article_id = remove_prefix(key, key_prefix)
    try:
        doc = nlp(paragraphs)
        idx = 1
        for each_sent in doc.sents:
            # {key} keeps every sentence key in the same hash slot as the
            # original key, so the shard accepts the local write
            sentence_key = "sentences{%s}:%s:%s" % (key, article_id, idx)
            execute('SET', sentence_key, str(each_sent))
            idx += 1
        execute('SADD', 'processed_docs_stage2_sentence{%s}' % hashtag(), article_id)
        log("Successfully processed paragraphs " + str(article_id), level='notice')
    except Exception:
        execute('SADD', 'screw_ups{%s}' % hashtag(), x['key'])

GB().\
    foreach(parse_paragraphs).\
    count().\
    register('en:paragraphs:*', keyTypes=['string'], onRegistered=OnRegistered, mode="async_local")

Now, each time a key is written to a shard it will be processed locally on that shard. On failure the key is added to a local shard key ‘screw_ups{< string that maps to the shard’s hash slot >}’, and on success to ‘processed_docs_stage2_sentence{< same string >}’. Also, for each key, more keys are created with the prefix ‘sentences{< the original key >}’, and we know they sit correctly in the shard’s hash-slot mapping, again thanks to the ‘{}’.

Now you can use a batch Gears execution to collect all the ‘processed_docs_stage2_sentence{…}’ keys and analyze them, or get all the ‘screw_ups’. And you can easily get all the ‘sentences’ created for a given key.
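Something like this should work for such a batch execution (a sketch, not part of the registration above, using the ‘KeysOnlyReader’ from the docs):

GB('KeysOnlyReader').\
    flatmap(lambda key: execute('SMEMBERS', key)).\
    collect().\
    distinct().\
    run('processed_docs_stage2_sentence*')

It scans only key names, expands each per-shard set into its members, and gathers everything on the shard that initiated the run.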

One last thing to notice: Gears runs your function in the background, one execution at a time per event. If you write data faster than it can be processed, this backlog buffer will grow (you can watch it with RG.DUMPREGISTRATIONS: the difference between the number of triggered executions and the number of completed executions is the backlog :slight_smile: ). If you only have peaks, that is fine, because Gears will catch up after the peak ends. But if you always write faster than you process, the buffer will keep growing. One way to avoid this is to set the execution mode to ‘sync’, which means the function runs on the same thread as the write itself, and you only get the write reply after processing has finished. With this approach you know for sure the backlog is not growing, but you also increase the reply latency. This is a tradeoff you will have to choose depending on the full use case.
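If you want to check that backlog from a client, here is a minimal sketch with redis-py (counter and field names taken from the RG.DUMPREGISTRATIONS documentation; adjust to your RedisGears version):

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def to_dict(flat):
    # the reply is a flat [field, value, field, value, ...] array
    return dict(zip(flat[::2], flat[1::2]))

for reg in r.execute_command('RG.DUMPREGISTRATIONS'):
    info = to_dict(reg)
    data = to_dict(info['RegistrationData'])
    # triggered minus completed (success + failures) is the backlog
    backlog = int(data['numTriggered']) - int(data['numSuccess']) - int(data['numFailures'])
    print(info['id'], 'backlog:', backlog)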

Hope it’s clear. Please update us on the progress and if you have any other questions/issues.

Thank you @meirsh, I will try your recommendation. The serialisation/deserialisation of nlp and another similar object (scispacy) is exactly what I was fighting with dask.distributed and joblib.
I will rewrite using your sample. My next step in the process has even bigger memory requirements (a transformers BERT tokeniser), and I just managed to blow up a single instance feeding it the same pipeline.

@AlexMikhalev Notice that you can put the initialization of all those objects in the onRegistered callback so they will not be serialized/deserialized.
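A sketch of what that could look like for several heavy objects (the model names here are just illustrative, not from your pipeline):

models = {}

def OnRegistered():
    # runs once per shard upon registration, so none of these heavy
    # objects are ever serialized and shipped between shards
    global models
    import spacy
    from transformers import AutoTokenizer
    models['nlp'] = spacy.load('en_core_web_md', disable=['ner', 'tagger'])
    models['tokenizer'] = AutoTokenizer.from_pretrained('bert-base-uncased')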

Yes, thanks, doing it now.


@meirsh I managed to get this step working, at least twice.
I am struggling to understand the role of {}, and I didn’t find any mention of it in the documents.
Can you point me in the right direction?

Found the explanation of the hashing algorithm and the role of {} in https://redis.io/topics/cluster-spec
Really cool feature: for example, I can always make sure all sentences from the same article stay on the same shard by using {article_id} inside the key.
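For example (the keys here are hypothetical, the port is one from the cluster logs above), only the part inside {} is hashed, so both keys land in the same slot:

import redis

# connect to any one node of the cluster
r = redis.Redis(host='localhost', port=30001)

# only "article42", the substring inside {}, is hashed, so both
# commands print the same slot number
print(r.execute_command('CLUSTER', 'KEYSLOT', 'sentences:{article42}:1'))
print(r.execute_command('CLUSTER', 'KEYSLOT', 'tokens:{article42}:7'))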

@AlexMikhalev Yes, that is the idea: it makes sure you don’t try to create keys that don’t belong to the shard.

Sorry, this thread has gotten way too long, but it provides enough context. For the last several hours I have been struggling to get a simpler step working:

from langdetect import detect

def remove_prefix(text, prefix):
    return text[text.startswith(prefix) and len(prefix):]

def detect_language(record):
    # detect the language of the article
    value = record['value']
    value1000 = value[:1000]
    log("Value 1000 " + value1000)
    try:
        lang = detect(value1000)
    except Exception:
        lang = "empty"
    if lang == 'en':
        article_id = remove_prefix(record['key'], 'paragraphs:')
        paragraph_key = "langen:{%s}" % (article_id)
        log(f"Success lang {paragraph_key}", level='notice')
        log('Hashtag {%s}' % hashtag())
        execute('SET', paragraph_key, value)
        execute('SADD', 'successfull_lang{%s}' % hashtag(), paragraph_key)
    else:
        log("Failed to detect language: " + str(record['key']), level='notice')
        execute('SADD', 'articles_to_delete', record['key'])

gb = GB()
gb.foreach(detect_language)
gb.register('paragraphs:*', keyTypes=['string'], mode="sync")

execute('SET', paragraph_key, value) doesn’t return any errors, but it also doesn’t create a record.
I tried changing the key format and the mode, and logged everything around it, and there are still no records with the langen prefix, despite the log capturing Success lang langen:{0ec759a568cb64acd211d0977da4ee9b098a7dec}
and SMEMBERS successfull_lang{4MP} returning a list of correct keys.
Any attempt to query the keys via RedisInsight or redis-cli -c fails.

Log output example:

47:M 21 May 2020 08:36:38.924 * <module> GEARS: Value 1000 Infection by enveloped viruses, whose infectious particles are surrounded by a lipid bilayer, requires fusion of the host cell and viral membranes; this process is facilitated by one or more viral envelope glycoproteins [1] . Although details of this mechanism vary among viruses, viral glycoproteins typically consist of a surface subunit, which binds to a host cell receptor, and a transmembrane subunit responsible for drawing host and viral membranes together via the formation of a stable post-fusion conformation [2] . The "class I" viral fusion proteins, which include those of the human immunodeficiency viruses, influenza, Ebola viruses [exemplified by Ebola virus (EBOV) and Sudan virus (SUDV)] and Marburg virus (MARV), are defined by the formation of a core, trimeric α-helical bundle by the ectodomain of the transmembrane subunit during membrane fusion [1, 3] . The post-fusion conformations consist of a "trimer-of-hairpins" motif in which the ectodomain N-and C-terminal segments
47:M 21 May 2020 08:36:38.935 * <module> GEARS: Success lang langen:{0ec759a568cb64acd211d0977da4ee9b098a7dec}
47:M 21 May 2020 08:36:38.935 * <module> GEARS: Hashtag {4MP}

There is a cryptic message in “Executions”:

1) []
2) ["['Traceback (most recent call last):\\n', ' File \"<string>\", line 1, in <lambda>\\n', 'spam.error: execution plan does not exist\\r\\n\\n']\u0000"]

But I can’t relate it to the function.
Any hints? Only one function is registered; this is the first step in the pipeline.

@AlexMikhalev You do not see the key because it does not necessarily belong to the shard, so the shard refuses to create it. You are putting the ‘article_id’, and not record[‘key’], inside the {}. Try putting record[‘key’] there and it should work. I agree there should be a better error message here; I will see if I can fix it (basically it’s internal inside Redis, which returns NULL and not an error)…
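In code, the fix inside detect_language is just (a minimal sketch of the change):

# embed the whole original event key in the hash tag; the shard that
# received the event owns that slot, so the SET is accepted
paragraph_key = "langen:{%s}" % record['key']
execute('SET', paragraph_key, value)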

Regarding the cryptic error, where do you see it? (on which command?)

The cryptic error shows up on some events inside RedisInsight, under RedisGears, Executions.

Changing to record[‘key’] worked.
So if I manipulate the key, the shard will refuse to accept it? I will have to rethink my pipeline logic. My assumption was that I create keys with a prefix corresponding to each processing step: paragraphs:*, en:*, sentences:*, tokens:*. This makes sure each function only captures what it is intended to process. Is there a better way?

@AlexMikhalev Not sure I follow what the issue is; you can still create the keys with those prefixes, just make sure to add the {} with the original key name somewhere in the new key name so the shard will accept it. You can even keep the article_id; something like this should work: 'langen:%s:{%s}' % (article_id, record['key'])

The issue is that keys become less human-readable: instead of sentences:article_id, it will be sentence:article_id:paragraph:article_id, but I can live with it.
An error message would have saved me time; otherwise it looks like confirmed writes are lost inside the Redis Cluster.

Yes, I agree. I will raise an error when we get a NULL reply, with a hint about why the write might have been rejected…

Thank you for your help @meirsh. I should have read your first reply more carefully; you actually gave me the better example initially. My final version:

paragraph_key="en:%s:{%s}" % (article_id, hashtag())

Excellent discussion here. My data scientist ran into a similar issue, I believe. I’m going to point him to this discussion.

Feel free to PM for lessons learned.