Solution :
Step 1 : Import single table.
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username=retail_dba --password=cloudera --table=products --target-dir=p93_products
Note : Please check that there is no space before or after the '=' sign. Sqoop uses the MapReduce framework to copy data from the RDBMS to HDFS.
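Optionally, you can verify that the import landed in HDFS before reading it; Sqoop writes one part-m-xxxxx file per mapper under the target directory.
hadoop fs -ls p93_products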
Step 2 : Read the data from one of the partitions created by the above command.
hadoop fs -cat p93_products/part-m-00000
Step 3 : Load this directory as an RDD using Spark and Python (open the pyspark terminal and do the following).
productsRDD = sc.textFile("p93_products")
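Before filtering, it can help to eyeball one raw record and the row count; the field positions used in the following steps (0 = product id, 1 = category id, 2 = name, 4 = price) assume the default column order of the retail_db products table.
# inspect a single raw line and the total record count
print(productsRDD.first())
print(productsRDD.count())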
Step 4 : Filter out empty prices, if any exist.
# filter out lines with an empty price
nonempty_lines = productsRDD.filter(lambda x: len(x.split(",")[4]) > 0)
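As an optional, slightly more defensive sketch (not required by the question), you can also skip malformed lines that have fewer than five fields before checking the price:
# keep only lines that have at least 5 fields and a non-empty price
nonempty_lines = productsRDD.filter(lambda x: len(x.split(",")) > 4 and len(x.split(",")[4]) > 0)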
Step 5 : Create a data set like (categoryId, (id, name, price)).
mappedRDD = nonempty_lines.map(lambda line: (line.split(",")[1], (line.split(",")[0], line.split(",")[2], float(line.split(",")[4]))))
for line in mappedRDD.collect(): print(line)
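Note that collect() brings the entire RDD to the driver; for a quick preview on a larger data set you could instead print just a few records:
for line in mappedRDD.take(3): print(line)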
Step 6 : Now group all the records by categoryId, which is the key of mappedRDD. It will produce output like (categoryId, iterable of all lines for that key/categoryId).
groupByCategoryId = mappedRDD.groupByKey()
for line in groupByCategoryId.collect(): print(line)
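groupByKey() wraps each group in a ResultIterable, which does not print its contents directly; to preview the grouped values you can materialize them as a list:
for categoryId, values in groupByCategoryId.take(3): print(categoryId, list(values))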
Step 7 : Now sort the data in each category based on price in ascending order.
# sorted is a built-in function that sorts an iterable; we can also specify the key on which to sort, in this case the price (index 2 of each tuple).
groupByCategoryId.map(lambda tuple: sorted(tuple[1], key=lambda tupleValue: tupleValue[2])).take(5)
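The map above returns only the sorted value lists and drops the category id; if you want to keep the key alongside its sorted products (an optional variant), return a (categoryId, sortedList) pair instead:
groupByCategoryId.map(lambda kv: (kv[0], sorted(kv[1], key=lambda t: t[2]))).take(5)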
Step 8 : Now sort the data in each category based on price in descending order.
# sorted is a built-in function that sorts an iterable; we can also specify the key on which to sort, in this case the price, with reverse=True for descending order.
groupByCategoryId.map(lambda tuple: sorted(tuple[1], key=lambda tupleValue: tupleValue[2], reverse=True)).take(5)
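An equivalent way to sort in descending order (just an alternative sketch) is to negate the numeric key instead of passing reverse=True:
groupByCategoryId.map(lambda kv: sorted(kv[1], key=lambda t: -t[2])).take(5)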