Spark-Redis connection

Hi,

I’m trying to connect Redis with Spark. I’d like to read a Redis HASH db into a Spark DataFrame. I think I’m close to getting it to work, but there don’t seem to be many code samples out there written for Python. After much effort, I’m finally able to run the code without error; however, it doesn’t return any results :frowning:

My data is located in Redis db 0. My code is as follows:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder \
    .appName("myApp") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "app/spark-apps/spark-redis-2.3.1-M2-jar-with-dependencies.jar") \
    .config("spark.redis.host", "redis-cache") \
    .config("spark.redis.port", "6379") \
    .config("spark.redis.dbNum", "0") \
    .getOrCreate()

sc = spark.sparkContext
sc.addPyFile("spark-apps/appx_wordcount_redis.zip")
sc.setLogLevel("ERROR")

data_schema = [StructField('id', StringType(), True), StructField('text', StringType(), True)]
final_struc = StructType(fields=data_schema)

df = spark.read.format("org.apache.spark.sql.redis")\
          .schema(final_struc)\
          .option("table", "0")\
          .option("key.column", "id").load()

df.show(10)
df.printSchema()

+---+----+
| id|text|
+---+----+
+---+----+

root
|-- id: string (nullable = true)
|-- text: string (nullable = true)

My Redis db0 HASH looks like this:

127.0.0.1:6379> HGETALL rec-25
1) "text"
2) "\"I PURCHASED A SET OF BRAKE PADS FROM \"\"CARQUEST AUTO PARTS\"\" IN BETHLEHEM"
3) "id"
4) "732996"

Any ideas why no results?

Thanks!

@rmk

Not sure what issue you are facing here. Remember that for every record spark-redis creates a new hash in Redis; based on your logic you will have multiple hashes, each with a text and an id field, with an incrementing number appended to the hash key.

My friend @LorisCro has written a nice blog post about Getting Started with Redis, Apache Spark and Python:

This should help you.

All the best.

The issue that I am facing is that I cannot write to Redis databases other than 0. The dbNum configuration option does not appear to work!

@rmk

Have you tried setting the configuration variable spark.redis.db?
spark.redis.db is the optional DB number. If you are using Redis in cluster mode, that may be the issue, since Redis Cluster only supports database 0.

Thanks for the suggestion. It doesn’t seem to make any difference.

I tried spark.redis.db vs. spark.redis.dbNum and neither worked. Documentation from different sources shows it both ways. I looked at the source code; even that’s confusing :slight_smile:

I also confirmed that this is running in client mode rather than cluster mode. Client mode is the default, but I added the parameter to spark-submit anyway and it made no difference either.

Hopefully I get to the bottom of this soon. Anyone else with a suggestion?

@rmk

Possibly this is a bug in the spark-redis library; someone has already logged it (don’t tell me it’s you :wink: )

Hello

I was able to make it work with spark.redis.db.

The issue is related to a mix between db and dbNum in the code, see here.

Can you test again with db?

I will work on a fix in parallel.

Regards
Tug


Hello Tug,

Yes, I see it. I did, however, try spark.redis.db and was still unable to get it to work in both places where I reference it.

I think what might be going on here is that the code references both db and dbNum throughout, which I think is the inconsistency you’re referring to. I set the default database first and then try to change from the default later in the code. I don’t know Scala, but maybe the default uses one parameter and the option to change from the default uses the other?

Can you tell me which is which?

Thanks,

Hello Tug,

I did confirm that spark.redis.db works. I also agree that the references to dbNum should be changed.

Finally, I was also able to resolve a separate but contributing issue that prevented me from switching the Redis db I write to within the same Spark session using PySpark. It turns out that you must redefine the Spark session using the .getOrCreate() method. As per the PySpark documentation (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html): "In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession." This is exactly what I needed to do. After that, it worked like a charm :slight_smile: Thanks for the assistance. I hope this helps others sort it out.
