Format of the data file to be analyzed: it contains nginx access-log records that have already been preprocessed into a plain txt file recording only the client ip and the requested url, one record per line, separated by a single space.
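For illustration, lines of the following shape would parse correctly (hypothetical values, apart from the ip that the queries below search for):

    115.217.254.106 /index.html
    203.0.113.7 /api/login
    203.0.113.7 /static/logo.png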

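All of the snippets below work with a LogInfo JavaBean; a minimal sketch consistent with how it is used (only the three properties the code touches; it must be Serializable so Spark can ship instances to executors, and a public no-arg bean is what Encoders.bean expects):

    import java.io.Serializable;

    // Sketch of the LogInfo bean assumed by the snippets below.
    public class LogInfo implements Serializable {
        private String ip;
        private String url;
        private Long count; // used when aggregating per-url hit counts

        public String getIp() { return ip; }
        public void setIp(String ip) { this.ip = ip; }
        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }
        public Long getCount() { return count; }
        public void setCount(Long count) { this.count = count; }
    }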
Reading the txt file as an RDD:
public static void rdd() {
    SparkConf conf = new SparkConf().setAppName("name_rdd").setMaster("local[4]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logRdd = sc.textFile(textFile);
    // Parse each "ip url" line into a LogInfo bean; malformed lines become empty records.
    JavaRDD<LogInfo> infoRdd = logRdd.map(new Function<String, LogInfo>() {
        public LogInfo call(String line) throws Exception {
            String[] strs = line.split(" ");
            LogInfo log = new LogInfo();
            if (strs.length == 2) {
                log.setIp(strs[0]);
                log.setUrl(strs[1]);
            } else {
                log.setIp("");
                log.setUrl("");
            }
            return log;
        }
    });
    // rdd1(infoRdd);
    // rdd2(infoRdd);
    // rdd3(infoRdd);
}
Reading the txt file as a Dataset:
public static void dataset() {
    spark = SparkSession
            .builder()
            .appName("name_dataset")
            .master("local[4]")
            .getOrCreate();
    Dataset<String> logDataset = spark.read().textFile(textFile);
    // Parse each "ip url" line into a LogInfo bean; the bean encoder gives a typed Dataset.
    Dataset<LogInfo> mapDataset = logDataset.map(new MapFunction<String, LogInfo>() {
        public LogInfo call(String line) throws Exception {
            String[] strs = line.split(" ");
            LogInfo log = new LogInfo();
            if (strs.length == 2) {
                log.setIp(strs[0]);
                log.setUrl(strs[1]);
            }
            return log;
        }
    }, Encoders.bean(LogInfo.class));
    // dataset1(mapDataset);
    // dataset2(mapDataset);
    // dataset3(mapDataset);
}
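Both readers also reference two statics, textFile and spark, that are declared elsewhere; a plausible class skeleton with the imports the snippets rely on (the class name and file path are placeholders):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;
    import scala.Tuple2;

    public class LogAnalysis { // hypothetical class name
        // Placeholder path; point this at the preprocessed nginx log file.
        private static String textFile = "/path/to/access_log.txt";
        private static SparkSession spark;

        public static void main(String[] args) {
            rdd();
            dataset();
        }

        // rdd(), dataset(), and rdd1() ... dataset3() from this post go here.
    }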
1. SQL query: select * from visit_info where ip = '115.217.254.106'
RDD implementation:
public static void rdd1(JavaRDD<LogInfo> infoRdd) {
    System.out.println("infoRdd.count()==================" + infoRdd.count());
    JavaRDD<LogInfo> filterRdd = infoRdd.filter(new Function<LogInfo, Boolean>() {
        public Boolean call(LogInfo log) throws Exception {
            return log.getIp() != null && log.getIp().equals("115.217.254.106");
        }
    });
    System.out.println("filterRdd.count()==================" + filterRdd.count());
}
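Since org.apache.spark.api.java.function.Function is a functional interface, the same filter can also be written as a Java 8 lambda; an equivalent, more compact form:

    JavaRDD<LogInfo> filterRdd = infoRdd.filter(
            log -> log.getIp() != null && log.getIp().equals("115.217.254.106"));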
Dataset implementation:
public static void dataset1(Dataset<LogInfo> mapDataset) {
    // Option 1: typed filter with a FilterFunction
    Dataset<LogInfo> filter1Dataset = mapDataset.filter(new FilterFunction<LogInfo>() {
        public boolean call(LogInfo log) throws Exception {
            return log.getIp() != null && log.getIp().equals("115.217.254.106");
        }
    });
    filter1Dataset.show();
    // Option 2: filter with a SQL expression string
    Dataset<LogInfo> filter2Dataset = mapDataset.filter(" ip = '115.217.254.106' ");
    filter2Dataset.show();
    // Option 3: where() with a SQL expression string (an alias of filter)
    Dataset<LogInfo> selectDataset = mapDataset.where(" ip = '115.217.254.106' ");
    selectDataset.show();
    // Option 4: register a temp view and run the SQL statement directly
    mapDataset.createOrReplaceTempView("visit_info");
    Dataset<Row> sqlDataset = spark.sql("select * from visit_info where ip = '115.217.254.106' ");
    sqlDataset.show();
}
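A fifth variant, not listed above, is the Column API, which builds the predicate from column expressions rather than a SQL string:

    // Column-expression form of the same predicate
    Dataset<LogInfo> colDataset = mapDataset.where(functions.col("ip").equalTo("115.217.254.106"));
    colDataset.show();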
2. SQL query: select url, count(1) as co from visit_info group by url order by co desc limit 10 (Spark SQL does not accept MySQL's limit 0,10 offset syntax, so the plain limit 10 form is used)
RDD implementation:
public static void rdd2(JavaRDD<LogInfo> infoRdd) {
    // Map each record to a (url, 1) pair
    JavaPairRDD<String, Long> pairRDD = infoRdd.mapToPair(new PairFunction<LogInfo, String, Long>() {
        public Tuple2<String, Long> call(LogInfo t) throws Exception {
            return new Tuple2<>(t.getUrl(), 1L);
        }
    });
    // Sum the counts per url
    JavaPairRDD<String, Long> reduceRDD = pairRDD.reduceByKey(new Function2<Long, Long, Long>() {
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    });
    JavaRDD<LogInfo> countRdd = reduceRDD.map(t -> {
        LogInfo log = new LogInfo();
        log.setUrl(t._1);
        log.setCount(t._2);
        return log;
    });
    // Sort by count descending, then take the top 10
    JavaRDD<LogInfo> sortRdd = countRdd.sortBy(log -> log.getCount(), false, countRdd.getNumPartitions());
    sortRdd.take(10).forEach(t -> {
        System.out.println("sortRdd==================" + t.getUrl() + " " + t.getCount());
    });
}
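When only the top N records are needed, takeOrdered avoids materializing a fully sorted RDD; a sketch reusing reduceRDD from rdd2 above (CountDesc is a hypothetical helper, and it must be Serializable so Spark can ship it to executors):

    // Comparator ordering (url, count) pairs by count, descending.
    class CountDesc implements java.util.Comparator<Tuple2<String, Long>>, java.io.Serializable {
        public int compare(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            return b._2.compareTo(a._2);
        }
    }

    java.util.List<Tuple2<String, Long>> top10 = reduceRDD.takeOrdered(10, new CountDesc());
    top10.forEach(t -> System.out.println("takeOrdered==================" + t._1 + " " + t._2));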
Dataset implementation:
public static void dataset2(Dataset<LogInfo> mapDataset) {
    // Option 1: DataFrame API
    mapDataset.select("url")
              .groupBy("url")
              .agg(functions.count("url").as("co"))
              .orderBy(functions.desc("co"))
              .limit(10)
              .show();
    // Option 2: temp view plus SQL
    mapDataset.createOrReplaceTempView("visit_info");
    Dataset<Row> sqlDataset = spark.sql(" select url, count(1) as co from visit_info group by url order by co desc limit 10 ");
    sqlDataset.show();
}
3. SQL query: select distinct url from visit_info
RDD implementation:
public static void rdd3(JavaRDD<LogInfo> infoRdd) {
    JavaPairRDD<String, Long> pairRDD = infoRdd.mapToPair(new PairFunction<LogInfo, String, Long>() {
        public Tuple2<String, Long> call(LogInfo t) throws Exception {
            return new Tuple2<>(t.getUrl(), 1L);
        }
    });
    // Since the value is the constant 1L, distinct() here effectively deduplicates on url.
    pairRDD.distinct().collect().forEach(t -> {
        System.out.println("distinct==================" + t._1 + " " + t._2);
    });
}
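For a pure url deduplication the pair step is unnecessary; mapping straight to the url yields the same distinct set:

    infoRdd.map(LogInfo::getUrl)
           .distinct()
           .collect()
           .forEach(url -> System.out.println("distinct==================" + url));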
Dataset implementation:
public static void dataset3(Dataset<LogInfo> mapDataset) {
    // Option 1: DataFrame API
    mapDataset.select("url").distinct().show();
    // Option 2: temp view plus SQL
    mapDataset.createOrReplaceTempView("visit_info");
    Dataset<Row> sqlDataset = spark.sql(" select distinct url from visit_info ");
    sqlDataset.show();
}
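dropDuplicates is one more Dataset option; unlike select("url").distinct() it stays typed and keeps one whole LogInfo row per url (which row survives is arbitrary):

    Dataset<LogInfo> dedupDataset = mapDataset.dropDuplicates(new String[]{"url"});
    dedupDataset.show();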