Spark: implementing the same SQL operations with RDD and Dataset


Note: This article is reposted from https://my.oschina.net/penngo/blog/1797168 in order to share more information, for learning and exchange only. If there is any infringement, please contact me and I will remove it promptly.

The data file to analyze is an nginx access log that has already been pre-processed into a txt file recording only the IP and the URL of each request, in the following format:
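(The sample lines themselves did not survive in this copy of the post. Judging from the parsing code below, each line holds an IP and a URL separated by a single space; the values shown here are illustrative only.)

    115.217.254.106 /index.html
    115.217.254.106 /blog/1797168
    183.136.190.62 /favicon.ico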

Reading the txt file with an RDD:

    public static void rdd() {
        SparkConf conf = new SparkConf().setAppName("name_rdd").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logRdd = sc.textFile(textFile);

        JavaRDD<LogInfo> infoRdd = logRdd.map(new Function<String, LogInfo>() {
            public LogInfo call(String line) throws Exception {
                String[] strs = line.split(" ");
                LogInfo log = new LogInfo();
                if (strs.length == 2) {
                    log.setIp(strs[0]);
                    log.setUrl(strs[1]);
                } else {
                    log.setIp("");
                    log.setUrl("");
                }
                return log;
            }
        });

//        rdd1(infoRdd);
//        rdd2(infoRdd);
//        rdd3(infoRdd);
    }

Reading the txt file with a Dataset:

    public static void dataset() {
        spark = SparkSession
                .builder()
                .appName("name_dataset")
                .master("local[4]")
                .getOrCreate();
        Dataset<String> logDataset = spark.read().textFile(textFile);
        Dataset<LogInfo> mapDataset = logDataset.map(new MapFunction<String, LogInfo>() {
            public LogInfo call(String line) throws Exception {
                String[] strs = line.split(" ");
                LogInfo log = new LogInfo();
                if (strs.length == 2) {
                    log.setIp(strs[0]);
                    log.setUrl(strs[1]);
                }
                return log;
            }
        }, Encoders.bean(LogInfo.class));

//        dataset1(mapDataset);
//        dataset2(mapDataset);
//        dataset3(mapDataset);
    }
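Both snippets above rely on a textFile path, a spark field, and a LogInfo bean that the post itself never shows. A minimal sketch of what they might look like (the field names and path are assumptions, not taken from the original):

    // Sketch only: the JavaBean assumed by both read methods. Encoders.bean()
    // needs a public no-arg constructor plus getters/setters, and RDD closures
    // need it to be Serializable.
    public class LogInfo implements java.io.Serializable {
        private String ip;    // first field of a log line
        private String url;   // second field of a log line
        private Long count;   // filled in by rdd2 with the per-URL count

        public String getIp() { return ip; }
        public void setIp(String ip) { this.ip = ip; }
        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }
        public Long getCount() { return count; }
        public void setCount(Long count) { this.count = count; }
    }

    // Assumed fields of the driver class; the actual log path is not given in the post.
    private static String textFile = "data/access.log.txt";  // hypothetical path
    private static SparkSession spark;                        // created in dataset()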

1. SQL query: select * from visit_info where ip = '115.217.254.106'

RDD implementation:

    public static void rdd1(JavaRDD<LogInfo> infoRdd) {
        System.out.println("infoRdd.count()==================" + infoRdd.count());
        JavaRDD<LogInfo> filterRdd = infoRdd.filter(new Function<LogInfo, Boolean>() {
            public Boolean call(LogInfo log) throws Exception {
                if (log.getIp() != null && log.getIp().equals("115.217.254.106")) {
                    return true;
                }
                return false;
            }
        });
        System.out.println("filterRdd.count()==================" + filterRdd.count());
    }

Dataset implementation:

    public static void dataset1(Dataset<LogInfo> mapDataset) {
        // Approach 1: typed filter with a FilterFunction
        Dataset<LogInfo> filter1Dataset = mapDataset.filter(new FilterFunction<LogInfo>() {
            public boolean call(LogInfo log) throws Exception {
                if (log.getIp() != null && log.getIp().equals("115.217.254.106")) {
                    return true;
                }
                return false;
            }
        });
        filter1Dataset.show();
        // Approach 2: filter with a SQL condition string
        Dataset<LogInfo> filter2Dataset = mapDataset.filter(" ip = '115.217.254.106' ");
        filter2Dataset.show();
        // Approach 3: where() with a SQL condition string
        Dataset<LogInfo> selectDataset = mapDataset.where(" ip = '115.217.254.106' ");
        selectDataset.show();
        // Approach 4: register a temp view and run the SQL directly
        mapDataset.createOrReplaceTempView("visit_info");
        Dataset<Row> sqlDataset = spark.sql("select * from visit_info where ip = '115.217.254.106' ");
        sqlDataset.show();
    }

2. SQL query: select url,count(1) as co from visit_info group by url order by co desc limit 0,10

RDD implementation:

    public static void rdd2(JavaRDD<LogInfo> infoRdd) {
        JavaPairRDD<String, Long> pairRDD = infoRdd.mapToPair(new PairFunction<LogInfo, String, Long>() {
            public Tuple2<String, Long> call(LogInfo t) throws Exception {
                return new Tuple2<String, Long>(t.getUrl(), 1L);
            }
        });

        JavaPairRDD<String, Long> reduceRDD = pairRDD.reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

        JavaRDD<LogInfo> countRdd = reduceRDD.map((t) -> {
            LogInfo log = new LogInfo();
            log.setUrl(t._1);
            log.setCount(t._2);
            return log;
        });

        JavaRDD<LogInfo> sortRdd = countRdd.sortBy(log -> {
            return log.getCount();
        }, false, countRdd.getNumPartitions());
        sortRdd.take(10).forEach(t -> {
            System.out.println("sortRdd==================" + t.getUrl() + " " + t.getCount());
        });
    }

Dataset implementation:

    public static void dataset2(Dataset<LogInfo> mapDataset) {
        // Approach 1: DataFrame API with groupBy/agg/orderBy
        mapDataset.select("url").groupBy("url").agg(count("url").as("co")).orderBy(functions.desc("co")).show();
        // Approach 2: register a temp view and run the SQL directly
        mapDataset.createOrReplaceTempView("visit_info");
        Dataset<Row> sqlDataset = spark.sql(" select url,count(1) as co from visit_info group by url order by co desc ");
        sqlDataset.show();
    }
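Note that the SQL in the heading also has limit 0,10, which both approaches above drop (show() merely prints the first 20 rows). A sketch of one way to cap the result at the top 10, assuming the same mapDataset as above:

    // Sketch: keep only the 10 most-visited URLs, matching the "limit 0,10"
    // in the heading. mapDataset is the same Dataset<LogInfo> as above.
    mapDataset.groupBy("url")
            .agg(functions.count("url").as("co"))
            .orderBy(functions.desc("co"))
            .limit(10)
            .show();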

3. SQL query: select distinct url from visit_info

RDD implementation:

    public static void rdd3(JavaRDD<LogInfo> infoRdd) {
        JavaPairRDD<String, Long> pairRDD = infoRdd.mapToPair(new PairFunction<LogInfo, String, Long>() {
            public Tuple2<String, Long> call(LogInfo t) throws Exception {
                return new Tuple2<String, Long>(t.getUrl(), 1L);
            }
        });
        pairRDD.distinct().collect().forEach(t -> {
            System.out.println("distinct==================" + t._1 + " " + t._2);
        });
    }
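Since the value side of the pair is always 1L, the distinct() above effectively deduplicates by URL alone. A slightly simpler sketch that skips the pair step entirely (same infoRdd as above):

    // Sketch: deduplicate the URLs directly instead of building (url, 1L) pairs.
    infoRdd.map(LogInfo::getUrl)
           .distinct()
           .collect()
           .forEach(url -> System.out.println("distinct==================" + url));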

Dataset implementation:
 

    public static void dataset3(Dataset<LogInfo> mapDataset) {
        // Approach 1: distinct() on the selected column
        mapDataset.select("url").distinct().show();
        // Approach 2: register a temp view and run the SQL directly
        mapDataset.createOrReplaceTempView("visit_info");
        Dataset<Row> sqlDataset = spark.sql(" select distinct url from visit_info ");
        sqlDataset.show();
    }
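The post never shows how these methods are wired together; the rdd1/rdd2/rdd3 and dataset1/dataset2/dataset3 calls are left commented out inside rdd() and dataset(). A minimal driver sketch, assuming everything lives in one class with the fields sketched earlier:

    // Sketch of a driver; which calls to uncomment inside rdd()/dataset() is up to you.
    public static void main(String[] args) {
        rdd();        // RDD versions of the three queries
        dataset();    // Dataset versions of the three queries
        spark.stop(); // shut down the SparkSession created in dataset()
    }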

 

Published on April 18, 2018 at 22:38.