Solution :
Step 1 : Import each table individually.
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username=retail_dba --password=cloudera --table=orders --target-dir=p92_orders -m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username=retail_dba --password=cloudera --table=order_items --target-dir=p92_order_items -m 1
Note : Make sure there is no space before or after the '=' sign. Sqoop uses the MapReduce framework to copy data from the RDBMS to HDFS.
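To illustrate the spacing rule in the note, the import command can be assembled as a list of arguments in which each option=value pair is a single token. This is only a sketch in plain Python: the helper name build_sqoop_import is hypothetical, and the connection values are simply the ones used in Step 1.

```python
def build_sqoop_import(table, target_dir, mappers=1):
    """Return the sqoop import command as an argument list (hypothetical helper)."""
    return [
        "sqoop", "import",
        "--connect", "jdbc:mysql://quickstart:3306/retail_db",
        "--username=retail_dba",
        "--password=cloudera",
        "--table=" + table,
        "--target-dir=" + target_dir,
        "-m", str(mappers),
    ]

cmd = build_sqoop_import("orders", "p92_orders")
# Each option=value pair is one token, so no space can slip in around '='.
print(" ".join(cmd))
```

Passing such a list to a process launcher would avoid shell quoting issues, but typing the command by hand as in Step 1 works just as well.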
Step 2 : Read the data from one of the partitions created by the commands above.
hadoop fs -cat p92_orders/part-m-00000
hadoop fs -cat p92_order_items/part-m-00000
Step 3 : Load the two directories above as RDDs using Spark and Python (open a pyspark terminal and run the following).
orders = sc.textFile("p92_orders")
orderItems = sc.textFile("p92_order_items")
Step 4 : Convert each RDD into key-value pairs (order_id as the key and the whole line as the value).
# In orders, the first field is order_id
ordersKeyValue = orders.map(lambda line: (int(line.split(",")[0]), line))
# In order_items, the second field is order_id
orderItemsKeyValue = orderItems.map(lambda line: (int(line.split(",")[1]), line))
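The keying logic of Step 4 can be tried without a cluster. The sketch below applies the same split-and-index idea to plain Python lists; the sample rows are made up for illustration (they only mimic the comma-separated layout of the imported files), and key_by_field is a hypothetical helper, not a Spark function.

```python
# Made-up sample rows in the comma-separated layout of the imported files.
order_lines = [
    "1,2013-07-25 00:00:00.0,11599,CLOSED",
    "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT",
]
order_item_lines = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
    "3,2,502,5,250.0,50.0",
]

def key_by_field(lines, index):
    """Pair each line with the integer found at the given field index."""
    return [(int(line.split(",")[index]), line) for line in lines]

orders_kv = key_by_field(order_lines, 0)            # order_id is field 0
order_items_kv = key_by_field(order_item_lines, 1)  # order_id is field 1

print(orders_kv[0])
print(order_items_kv[2])
```

The key is converted to int on both sides so that the later join compares like with like.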
Step 5 : Join both RDDs on order_id.
joinedData = orderItemsKeyValue.join(ordersKeyValue)
# Print the joined data
for line in joinedData.collect():
    print(line)
# Format of each joinedData record:
# (order_id, ('all columns from orderItemsKeyValue', 'all columns from ordersKeyValue'))
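To make the shape of the joined records concrete, here is a small plain-Python emulation of an inner join on the key. It is a sketch of the semantics only, not how Spark implements join; inner_join is a hypothetical helper and the rows are made up.

```python
from collections import defaultdict

def inner_join(left, right):
    """Emulate an inner join of two lists of (key, value) pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit (key, (left_value, right_value)) for every matching pair.
    return [(key, (lval, rval))
            for key, lval in left
            for rval in right_by_key.get(key, [])]

# Made-up keyed rows; order 9 has no matching order, so it is dropped.
order_items_kv = [
    (1, "1,1,957,1,299.98,299.98"),
    (2, "2,2,1073,1,199.99,199.99"),
    (9, "3,9,502,5,250.0,50.0"),
]
orders_kv = [
    (1, "1,2013-07-25 00:00:00.0,11599,CLOSED"),
    (2, "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT"),
]

joined = inner_join(order_items_kv, orders_kv)
for record in joined:
    print(record)
```

Because the order items are on the left side of the join, record[1][0] is the order item line and record[1][1] is the order line.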
# Map to ((date, customer_id), amount), taking the amount from the fifth field of the order item line, then sum per key
ordersPerDatePerCustomer = joinedData.map(lambda line: ((line[1][1].split(",")[1], line[1][1].split(",")[2]), float(line[1][0].split(",")[4])))
amountCollectedPerDayPerCustomer = ordersPerDatePerCustomer.reduceByKey(lambda runningSum, amount: runningSum + amount)
# Output record format is ((date, customer_id), totalAmount)
for line in amountCollectedPerDayPerCustomer.collect():
    print(line)
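The per-key summation can be checked on a few records in plain Python. The sketch below folds values per key in the spirit of reduceByKey; reduce_by_key and to_date_customer_amount are hypothetical helpers, and the joined records are made up.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key with func, similar in spirit to reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

def to_date_customer_amount(record):
    """Turn a joined record into ((date, customer_id), amount)."""
    item_fields = record[1][0].split(",")
    order_fields = record[1][1].split(",")
    return ((order_fields[1], order_fields[2]), float(item_fields[4]))

# Made-up joined records: (order_id, (order_item_line, order_line)).
joined = [
    (1, ("1,1,957,1,299.98,299.98", "1,2013-07-25 00:00:00.0,11599,CLOSED")),
    (2, ("2,2,1073,1,199.99,199.99", "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT")),
    (2, ("3,2,502,5,250.0,50.0", "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT")),
]

totals = reduce_by_key(map(to_date_customer_amount, joined), lambda a, b: a + b)
for key, total in totals.items():
    print(key, total)
```

Note that customer_id stays a string here, exactly as in the lambda above, since only the amount is converted to float.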
# Now change the record format to (date, (customer_id, total_amount))
revenuePerDatePerCustomerRDD = amountCollectedPerDayPerCustomer.map(lambda threeElementTuple: (threeElementTuple[0][0], (threeElementTuple[0][1], threeElementTuple[1])))
for line in revenuePerDatePerCustomerRDD.collect():
    print(line)
# Calculate the maximum amount collected by a customer for each day
perDateMaxAmountCollectedByCustomer = revenuePerDatePerCustomerRDD.reduceByKey(lambda runningAmountTuple, newAmountTuple: (runningAmountTuple if runningAmountTuple[1] >= newAmountTuple[1] else newAmountTuple))
for line in perDateMaxAmountCollectedByCustomer.sortByKey().collect():
    print(line)
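The last two transformations (reshaping the records, then keeping the largest total per date) can be emulated in plain Python as a sanity check. This is a sketch on made-up totals with shortened dates, not Spark code; it keeps the running tuple on ties, as the lambda above does.

```python
# Made-up totals in the ((date, customer_id), total_amount) format.
totals = {
    ("2013-07-25", "11599"): 299.98,
    ("2013-07-25", "256"): 449.99,
    ("2013-07-26", "7562"): 129.99,
    ("2013-07-26", "8827"): 679.95,
}

# Reshape ((date, customer_id), total) into (date, (customer_id, total)).
by_date = [(date, (customer, amount)) for (date, customer), amount in totals.items()]

best_per_date = {}
for date, candidate in by_date:
    current = best_per_date.get(date)
    # Keep the running tuple unless the new one has a strictly larger total.
    if current is None or candidate[1] > current[1]:
        best_per_date[date] = candidate

# Print in date order, like sortByKey() followed by collect().
for date in sorted(best_per_date):
    print(date, best_per_date[date])
```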