I want to make sure I am not re-processing sentences in my NLP pipeline, because I calculate rank in the next step.
Example code:
def process_item(record):
    shard_id = hashtag()
    log(f"Matcher received {record['key']} on shard {shard_id}")
    for each_key in record['value']:
        sentence_key = record['key'] + f':{each_key}'
        tokens = set(record['value'][each_key].split(' '))
        # The {hashtag} pins the bookkeeping set to the local shard,
        # so SISMEMBER/SADD can be executed from this shard
        processed = execute('SISMEMBER', 'processed_docs_stage3{%s}' % hashtag(), sentence_key)
        log(f"Matcher thinks {processed}")
        log("Matcher: length of tokens " + str(len(tokens)))
        if not processed:
            execute('SADD', 'processed_docs_stage3{%s}' % hashtag(), sentence_key)
        else:
            log(f"Already processed {sentence_key}")
bg = GearsBuilder('KeysReader')
bg.foreach(process_item)
bg.count()
bg.run('sentence:*', mode="async_local")
If I run it twice, it produces the desired behaviour in the logs:
Matcher Already processed sentence:PMC5539802.xml:{06S}:53
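As a side note on the dedup step itself: Redis SADD returns 1 when the member is newly added and 0 when it already exists, so the SISMEMBER-then-SADD pair can be collapsed into a single call, which also removes the window between the check and the write. A minimal stand-alone sketch of that logic, using a hypothetical in-memory stand-in for the Redis set (no server needed, names are illustrative only):

```python
class FakeRedisSet:
    """Hypothetical in-memory stand-in mimicking Redis SADD semantics."""
    def __init__(self):
        self._members = set()

    def sadd(self, member):
        # Like Redis SADD: 1 if the member was newly added, 0 if it already existed
        if member in self._members:
            return 0
        self._members.add(member)
        return 1

processed_docs = FakeRedisSet()

def mark_once(sentence_key):
    # SADD doubles as the membership test, so no separate SISMEMBER is needed
    if processed_docs.sadd(sentence_key):
        return "processing"
    return "already processed"
```

In the Gears function this would mean replacing the SISMEMBER/SADD pair with a single `execute('SADD', ...)` and branching on its return value.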
Is this behaviour consistent? In other words, will the same hash-tagged keys always be allocated to the same shards, so that the Set operations stay valid even in a cluster configuration? Obviously, if I remove the hashtag the checks fail with a permission error, since the bookkeeping key may not live on the local shard.
I want to make sure I am not re-processing sentences regardless of how Gears allocates executions to shards.
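The colocation question above can be checked locally: Redis Cluster assigns a key to slot CRC16(key) mod 16384, and when the key contains a non-empty `{...}` section, only the substring inside the first such pair of braces is hashed. So any two keys sharing a hashtag land on the same slot, and therefore the same shard, deterministically. A self-contained sketch of the slot function (CRC16-XMODEM, as the cluster spec uses; written here for illustration, not taken from the Redis source):

```python
def crc16_xmodem(data: bytes) -> int:
    # CRC16-CCITT/XMODEM: poly 0x1021, init 0x0000, no reflection
    crc = 0
    for b in data:
        crc ^= b << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def hash_slot(key: str) -> int:
    # Only the substring inside the first non-empty {...} is hashed, if present
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end != -1 and end != start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key.encode()) % 16384
```

With this, `hash_slot('sentence:PMC5539802.xml:{06S}:53')` and `hash_slot('processed_docs_stage3{06S}')` both hash only `06S` and so always resolve to the same slot, independent of which shard runs the Gears execution.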