We recently had a requirement to compute pv and uv in real time and display the results by date, hour, pv, uv. The statistics are kept per day and start over the next day. In practice the counts also have to be broken down by a type field, i.e. displayed as date, hour, pv, uv, type, but this post covers the basic pv/uv case.
id | uv | pv | date | hour
---|---|---|---|---
1 | 155599 | 306053 | 2018-07-27 | 00
2 | 255496 | 596223 | 2018-07-27 | 01
... | ... | ... | ... | ...
10 | 10490270 | 12927245 | 2018-07-27 | 10
For an explanation of what pv and uv are (pv: page views, the total number of hits; uv: unique visitors, deduplicated by guid), see https://blog.csdn.net/petermsh/article/details/78652246
1. Project pipeline
Log data is collected by flume and lands on hdfs for other offline jobs; it is also sunk to kafka. sparkStreaming pulls the data from kafka and computes pv and uv, using a redis set to deduplicate for uv, and finally writes the results into a mysql database for the front end to display.
2. Implementation details
1) Computing pv
Data can be pulled from Kafka in two ways, receiver-based or direct; the direct approach is used here. State is kept with the mapWithState operator, which plays the same role as updateStateByKey but performs better. In practice, of course, the incoming data has to be cleaned and filtered before it can be used.
First define a state update function:
// real-time traffic state update function
val mapFunction = (datehour:String, pv:Option[Long], state:State[Long]) => {
val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
val output = (datehour,accuSum)
state.update(accuSum)
output
}
Compute pv:
val stateSpec = StateSpec.function(mapFunction)
val helper_count_all = helper_data.map(x => (x._1,1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
That is all it takes to compute pv.
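For comparison, here is a sketch of the same running total written with the older updateStateByKey API (the project itself uses mapWithState): updateStateByKey re-evaluates the state of every key on every batch, whereas mapWithState only touches the keys that actually appear in the current batch, which is where the performance difference comes from.
// Equivalent cumulative pv with updateStateByKey -- sketch, for comparison only.
val updateFunction: (Seq[Long], Option[Long]) => Option[Long] =
  (newPvs, state) => Some(newPvs.sum + state.getOrElse(0L))
// usage (same (datehour, 1L) pair stream as above):
// helper_data.map(x => (x._1, 1L)).updateStateByKey(updateFunction)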
2) Computing uv
uv has to be deduplicated over the whole day. Deduplicating every incoming batch with plain reduceByKey or groupByKey demands too much hardware, and our cluster was modestly sized, so we requested a 93G redis instance for deduplication. The idea: for every incoming record, use the date as the key and add the guid to a set; every 20 seconds, take the size of that set and use it to update the database.
helper_data_dis.foreachRDD(rdd => {
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
try {
jedis = getJedis
eachPartition.foreach(x => {
val arr = x._2.split("\t")
val date: String = arr(0).split(":")(0)
// helper stats
val key0 = "helper_" + date
jedis.sadd(key0, x._1)
jedis.expire(key0, ConfigFactory.rediskeyexists)
// per-helperversion stats
val key = date + "_" + arr(1)
jedis.sadd(key, x._1)
jedis.expire(key, ConfigFactory.rediskeyexists)
})
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
}
})
})
// borrow a jedis connection from the pool
def getJedis: Jedis = {
val jedis = RedisPoolUtil.getPool.getResource
jedis
}
// return the jedis connection to the pool
def closeJedis(jedis: Jedis): Unit = {
RedisPoolUtil.getPool.returnResource(jedis)
}
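Reading the uv back out is just a SCARD on the day's key; the real read happens inside insertHelper below, but in isolation it looks like this (the date literal here is only an example):
// Sketch: read today's uv from Redis. scard returns the set's cardinality,
// i.e. the number of distinct guids added with sadd for that day.
val jedis = getJedis
try {
  val uvToday: Long = jedis.scard("helper_" + "2018-07-27") // same key layout as key0 above
  println(uvToday)
} finally {
  closeJedis(jedis)
}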
Redis connection pool code, RedisPoolUtil.scala:
package com.js.ipflow.utils
import com.js.ipflow.start.ConfigFactory
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
/**
* redis connection pool utility
* @author keguang
*/
object RedisPoolUtil extends Serializable{
@transient private var pool: JedisPool = null
/**
* Read the jedis configuration and trigger pool initialization
*/
def initJedis: Unit ={
ConfigFactory.initConfig()
val maxTotal = 50
val maxIdle = 30
val minIdle = 10
val redisHost = ConfigFactory.redishost
val redisPort = ConfigFactory.redisport
val redisTimeout = ConfigFactory.redistimeout
val redisPassword = ConfigFactory.redispassword
makePool(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle)
}
def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,redisPassword:String, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
init(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle, true, false, 10000)
}
/**
* Initialize the jedis connection pool
* @param redisHost host
* @param redisPort port
* @param redisTimeout redis connection timeout
* @param redisPassword redis password
* @param maxTotal maximum number of connections
* @param maxIdle maximum number of idle connections
* @param minIdle minimum number of idle connections
* @param testOnBorrow
* @param testOnReturn
* @param maxWaitMillis
*/
def init(redisHost: String, redisPort: Int, redisTimeout: Int,redisPassword:String, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if (pool == null) {
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
poolConfig.setTestOnBorrow(testOnBorrow)
poolConfig.setTestOnReturn(testOnReturn)
poolConfig.setMaxWaitMillis(maxWaitMillis)
pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout,redisPassword)
val hook = new Thread {
override def run = pool.destroy()
}
sys.addShutdownHook(hook.run)
}
}
def getPool: JedisPool = {
if(pool == null){
initJedis
}
pool
}
}
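A small side note on the pool API: in more recent Jedis releases JedisPool.returnResource is deprecated, and a borrowed Jedis is simply close()d, which hands it back to the pool. A minimal sketch of that variant, assuming such a Jedis version (key and member values are examples only):
// Sketch (newer Jedis): close() returns the pooled instance instead of returnResource.
val jedis = RedisPoolUtil.getPool.getResource
try {
  jedis.sadd("helper_2018-07-27", "some-guid") // example key/member
} finally {
  jedis.close()
}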
3) Saving the results to the database
The results are saved to MySQL and the database is refreshed every 20 seconds; each time the front-end display refreshes it queries the database again, which is what gives the real-time pv/uv view.
/**
* Insert the results
*
* @param data (addTab(datehour)+helperversion)
* @param tbName
* @param colNames
*/
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
data.foreachRDD(rdd => {
val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
if (!rdd.isEmpty()) {
val hour_now = tmp_rdd.max() // the latest hour present in this batch's results; useful when recovering data
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
var conn: Connection = null
try {
jedis = getJedis
conn = MysqlPoolUtil.getConnection()
conn.setAutoCommit(false)
val stmt = conn.createStatement()
eachPartition.foreach(x => {
if (colNames.length == 7) {
val datehour = x._1.split("\t")(0)
val helperversion = x._1.split("\t")(1)
val date_hour = datehour.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // count_all
val colName3 = colNames(3) // count
val colName4 = colNames(4) // helperversion
val colName5 = colNames(5) // datehour
val colName6 = colNames(6) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard(date + "_" + helperversion) // // 2018-07-08_10.0.1.22
val colValue4 = addYin(helperversion)
var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv is only updated for the current hour
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
} /* else {
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
}*/
} else if (colNames.length == 5) {
val date_hour = x._1.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // helper_count_all
val colName3 = colNames(3) // helper_count
val colName4 = colNames(4) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard("helper_" + date) // // helper_2018-07-08
val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv is only updated for the current hour
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
}
}
})
stmt.executeBatch() // execute the batched sql statements
conn.commit()
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
if(conn != null){
conn.close()
}
}
})
}
})
}
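insertHelper above builds its SQL by string interpolation, with addYin supplying the quotes. As a hedged alternative sketch, the same 5-column upsert could be written with a PreparedStatement, which avoids hand-quoting values; the column names follow the insertHelper call for the helper-count table, and upsertHourlyRow is an illustrative helper rather than part of the project:
// Sketch: parameterised version of the "insert ... on duplicate key update" statement.
import java.sql.Connection

def upsertHourlyRow(conn: Connection, tbName: String,
                    date: String, hour: Int, pv: Int, uv: Long, dh: String): Unit = {
  val sql = s"insert into ${tbName}(date, hour, helper_count_all, helper_count, dh) " +
    "values(?, ?, ?, ?, ?) on duplicate key update helper_count_all = ?, helper_count = ?"
  val ps = conn.prepareStatement(sql)
  try {
    ps.setString(1, date)   // e.g. 2018-07-27
    ps.setInt(2, hour)      // 0..23
    ps.setInt(3, pv)        // cumulative pv for the hour
    ps.setLong(4, uv)       // the jedis.scard(...) result
    ps.setString(5, dh)     // e.g. 2018-07-27 10:00
    ps.setInt(6, pv)
    ps.setLong(7, uv)
    ps.executeUpdate()
  } finally {
    ps.close()
  }
}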
// milliseconds from now until the end of today (just before next midnight)
def resetTime = {
val now = new Date()
val todayEnd = Calendar.getInstance
todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR would be the 12-hour clock
todayEnd.set(Calendar.MINUTE, 59)
todayEnd.set(Calendar.SECOND, 59)
todayEnd.set(Calendar.MILLISECOND, 999)
todayEnd.getTimeInMillis - now.getTime
}
MySQL connection pool code, MysqlPoolUtil.scala:
package com.js.ipflow.utils
import java.sql.{Connection, PreparedStatement, ResultSet}
import com.js.ipflow.start.ConfigFactory
import org.apache.commons.dbcp.BasicDataSource
import org.apache.logging.log4j.LogManager
/**
* jdbc mysql connection pool utility
* @author keguang
*/
object MysqlPoolUtil {
val logger = LogManager.getLogger(MysqlPoolUtil.getClass.getSimpleName)
private var bs:BasicDataSource = null
/**
* Create the data source
* @return
*/
def getDataSource():BasicDataSource={
if(bs==null){
ConfigFactory.initConfig()
bs = new BasicDataSource()
bs.setDriverClassName("com.mysql.jdbc.Driver")
bs.setUrl(ConfigFactory.mysqlurl)
bs.setUsername(ConfigFactory.mysqlusername)
bs.setPassword(ConfigFactory.mysqlpassword)
bs.setMaxActive(50) // maximum number of concurrent connections
bs.setInitialSize(20) // number of connections created when the pool is initialized
bs.setMinIdle(20) // minimum number of idle connections kept without creating new ones
bs.setMaxIdle(20) // maximum number of idle connections kept in the pool; 0 means no limit
bs.setMaxWait(5000) // how long (ms) to wait for a connection when none is available before throwing; -1 means wait forever
bs.setMinEvictableIdleTimeMillis(10*1000) // idle connections are released after 10 seconds
bs.setTimeBetweenEvictionRunsMillis(1*60*1000) // check once a minute for dead connections
bs.setTestOnBorrow(true)
}
bs
}
/**
* Shut down the data source
*/
def shutDownDataSource(){
if(bs!=null){
bs.close()
}
}
/**
* Get a database connection
* @return
*/
def getConnection():Connection={
var con:Connection = null
try {
if(bs!=null){
con = bs.getConnection()
}else{
con = getDataSource().getConnection()
}
} catch{
case e:Exception => logger.error(e)
}
con
}
/**
* Close the result set, statement and connection
*/
def closeCon(rs:ResultSet ,ps:PreparedStatement,con:Connection){
if(rs!=null){
try {
rs.close()
} catch{
case e:Exception => println(e.getMessage)
}
}
if(ps!=null){
try {
ps.close()
} catch{
case e:Exception => println(e.getMessage)
}
}
if(con!=null){
try {
con.close()
} catch{
case e:Exception => println(e.getMessage)
}
}
}
}
4) Fault tolerance
Any stream job consuming Kafka has to think about data loss. Offsets and intermediate state can generally be saved to any storage system, including mysql, hdfs, hbase, redis or zookeeper. Here we use Spark Streaming's built-in checkpoint mechanism to recover data when the application restarts.
checkpoint
With the checkpoint mechanism, after a restart (or a restart following a failure) the job can pick up the tasks that were not finished last time and resume reading from the corresponding Kafka offsets.
// initialize the configuration file
ConfigFactory.initConfig()
val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
conf.set("spark.streaming.kafka.maxRatePerPartition",consumeRate)
conf.set("spark.default.parallelism","24")
val sc = new SparkContext(conf)
while (true){
val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0),getStreamingContext _ )
ssc.start()
ssc.awaitTerminationOrTimeout(resetTime)
ssc.stop(false,true)
}
There is one checkpoint directory per day; shortly after midnight the StreamingContext object is destroyed on schedule and the pv/uv statistics start over for the new day.
Note
ssc.stop(false,true) destroys the StreamingContext object gracefully but not the SparkContext object; ssc.stop(true,true) would also stop the SparkContext, and the whole program would simply exit.
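Spelled out with named parameters (the same call, shown only to make clear which flag is which):
// stopSparkContext = false keeps the shared SparkContext alive for the next day's
// StreamingContext; stopGracefully = true lets in-flight batches finish first.
ssc.stop(stopSparkContext = false, stopGracefully = true)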
Application migration and upgrades
At some point we upgraded the application: a feature was incomplete or had a logic error, so the code had to be changed and a new jar built. If you simply stop the job at that point, the new application will still read the old checkpoint, which can cause two problems:
- it still runs the previous version of the program, because the checkpoint also contains serialized code;
- or it fails outright with a deserialization error.
Sometimes a code change does take effect without deleting the checkpoint, but after a lot of testing I found that changing the data-filtering logic, or changing how state is saved, also makes the restart fail; the only fix is to delete the checkpoint. In practice, though, deleting the checkpoint loses the last unfinished tasks and the consumed Kafka offsets, which means losing data. In that situation I usually do the following.
Run the new version on another cluster, or simply point it at a different checkpoint directory; since our code and configuration are separated, changing the checkpoint location in the config file is easy. Then run both programs side by side: apart from the checkpoint directories (the new one is created from scratch) they are identical and insert into the same database. After they have run in parallel for a while, stop the old program. I had read this advice on the official site before, but it only really sinks in once you have to work out for yourself how to keep the data accurate.
5) Logging
Logging uses log4j2; a copy is kept locally, and ERROR-level logs are sent to a mailbox by email.
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// level=error logs are sent by email
val logger2 = LogManager.getLogger("email")
3. Main code
The Maven dependencies needed (the code below additionally imports the Spark-Kafka integration, jedis, fastjson, dom4j and log4j2, which need their own artifacts):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.40</version>
</dependency>
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
    <scope>provided</scope>
</dependency>
Configuration reader code, ConfigFactory.java:
package com.js.ipflow.start;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import java.io.File;
public class ConfigFactory {
private final static Logger log = LogManager.getLogger("email");
public static String kafkaipport;
public static String kafkazookeeper;
public static String kafkatopic;
public static String kafkagroupid;
public static String mysqlurl;
public static String mysqlusername;
public static String mysqlpassword;
public static String redishost;
public static int redisport;
public static String redispassword;
public static int redistimeout;
public static int rediskeyexists;
public static String sparkstreamname;
public static int sparkstreamseconds;
public static String sparkstreammaster = "spark://qcloud-spark01:7077"; // for local testing only
public static String localpath;
public static String checkpointdir;
// public static String gracestopfile; // flag file for killing the job gracefully
public static String keydeserilizer;
public static String valuedeserilizer;
/**
* Initialize all common configuration
*/
public static void initConfig(){readCommons();}
/**
* Read the commons.xml file
*/
private static void readCommons(){
SAXReader reader = new SAXReader(); // build the xml parser
Document document = null;
try{
document = reader.read(Resources.getResource("commons.xml"));
}catch (DocumentException e){
log.error("ConfigFactory.readCommons",e);
}
if(document != null){
Element root = document.getRootElement();
Element kafkaElement = root.element("kafka");
kafkaipport = kafkaElement.element("ipport").getText();
kafkazookeeper = kafkaElement.element("zookeeper").getText();
kafkatopic = kafkaElement.element("topic").getText();
kafkagroupid = kafkaElement.element("groupid").getText();
keydeserilizer=kafkaElement.element("keySer").getText();
valuedeserilizer=kafkaElement.element("valSer").getText();
Element mysqlElement = root.element("mysql");
mysqlurl = mysqlElement.element("url").getText();
mysqlusername = mysqlElement.element("username").getText();
mysqlpassword = mysqlElement.element("password").getText();
Element redisElement = root.element("redis");
redishost = redisElement.element("host").getText();
redisport = Integer.valueOf(redisElement.element("port").getText());
redispassword = redisElement.element("password").getText();
redistimeout = Integer.valueOf(redisElement.element("timeout").getText());
rediskeyexists = Integer.valueOf(redisElement.element("keyexists").getText());
Element sparkElement = root.element("spark");
// sparkstreammaster = sparkElement.element("streammaster").getText();
sparkstreamname = sparkElement.element("streamname").getText();
sparkstreamseconds = Integer.valueOf(sparkElement.element("seconds").getText());
Element pathElement = root.element("path");
localpath = pathElement.element("localpath").getText();
checkpointdir = pathElement.element("checkpointdir").getText();
// gracestopfile = pathElement.element("gracestopfile").getText();
}else {
log.warn("failed to read the commons.xml configuration file...");
}
}
}
The main business code is as follows:
package com.js.ipflow.flash.helper
import java.sql.Connection
import java.util.{Calendar, Date}
import com.alibaba.fastjson.JSON
import com.js.ipflow.start.ConfigFactory
import com.js.ipflow.utils.{DateUtil, MysqlPoolUtil, RedisPoolUtil}
import kafka.serializer.StringDecoder
import org.apache.logging.log4j.LogManager
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis
object HelperHandle {
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// level=error logs are sent by email
val logger2 = LogManager.getLogger("email")
def main(args: Array[String]): Unit = {
helperHandle(args(0))
}
def helperHandle(consumeRate: String): Unit = {
// initialize the configuration file
ConfigFactory.initConfig()
val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "30")
val sc = new SparkContext(conf)
while (true) {
val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
ssc.start()
ssc.awaitTerminationOrTimeout(resetTime)
ssc.stop(false, true)
}
def getStreamingContext(): StreamingContext = {
val stateSpec = StateSpec.function(mapFunction)
val ssc = new StreamingContext(sc, Seconds(ConfigFactory.sparkstreamseconds))
ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))
val zkQuorm = ConfigFactory.kafkazookeeper
val topics = ConfigFactory.kafkatopic
val topicSet = Set(topics)
val kafkaParams = Map[String, String](
"metadata.broker.list" -> (ConfigFactory.kafkaipport)
, "group.id" -> (ConfigFactory.kafkagroupid)
, "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString
)
val rmessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
)
// helper data (dateHour, guid, helperversion)
val helper_data = FilterHelper.getHelperData(rmessage.map(x => {
val message = JSON.parseObject(x._2).getString("message")
JSON.parseObject(message)
})).repartition(60).cache()
// (guid, datehour + helperversion)
val helper_data_dis = helper_data.map(x => (x._2, addTab(x._1) + x._3)).reduceByKey((x, y) => y)
// pv,uv
val helper_count = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
// helperversion
val helper_helperversion_count = helper_data.map(x => (addTab(x._1) + x._3, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
helper_data_dis.foreachRDD(rdd => {
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
try {
jedis = getJedis
eachPartition.foreach(x => {
val arr = x._2.split("\t")
val date: String = arr(0).split(":")(0)
// helper stats
val key0 = "helper_" + date
jedis.sadd(key0, x._1)
jedis.expire(key0, ConfigFactory.rediskeyexists)
// per-helperversion stats
val key = date + "_" + arr(1)
jedis.sadd(key, x._1)
jedis.expire(key, ConfigFactory.rediskeyexists)
})
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
}
})
})
insertHelper(helper_helperversion_count, "statistic_realtime_flash_helper", "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")
insertHelper(helper_count, "statistic_realtime_helper_count", "date", "hour", "helper_count_all", "helper_count", "dh")
ssc
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// milliseconds from now until the end of today (just before next midnight)
def resetTime = {
val now = new Date()
val todayEnd = Calendar.getInstance
todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR would be the 12-hour clock
todayEnd.set(Calendar.MINUTE, 59)
todayEnd.set(Calendar.SECOND, 59)
todayEnd.set(Calendar.MILLISECOND, 999)
todayEnd.getTimeInMillis - now.getTime
}
/**
* Insert the results
*
* @param data (addTab(datehour)+helperversion)
* @param tbName
* @param colNames
*/
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
data.foreachRDD(rdd => {
val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
if (!rdd.isEmpty()) {
val hour_now = tmp_rdd.max() // the latest hour present in this batch's results; useful when recovering data
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
var conn: Connection = null
try {
jedis = getJedis
conn = MysqlPoolUtil.getConnection()
conn.setAutoCommit(false)
val stmt = conn.createStatement()
eachPartition.foreach(x => {
if (colNames.length == 7) {
val datehour = x._1.split("\t")(0)
val helperversion = x._1.split("\t")(1)
val date_hour = datehour.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // count_all
val colName3 = colNames(3) // count
val colName4 = colNames(4) // helperversion
val colName5 = colNames(5) // datehour
val colName6 = colNames(6) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard(date + "_" + helperversion) // // 2018-07-08_10.0.1.22
val colValue4 = addYin(helperversion)
var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv is only updated for the current hour
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
} /* else {
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
}*/
} else if (colNames.length == 5) {
val date_hour = x._1.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // helper_count_all
val colName3 = colNames(3) // helper_count
val colName4 = colNames(4) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard("helper_" + date) // // helper_2018-07-08
val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv is only updated for the current hour
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
}
}
})
stmt.executeBatch() // execute the batched sql statements
conn.commit()
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
if(conn != null){
conn.close()
}
}
})
}
})
}
def addYin(str: String): String = {
"'" + str + "'"
}
// formatting helper: append a tab to a string
def addTab(str: String): String = {
str + "\t";
}
// real-time traffic state update function
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
val output = (datehour, accuSum)
state.update(accuSum)
output
}
// borrow a jedis connection from the pool
def getJedis: Jedis = {
val jedis = RedisPoolUtil.getPool.getResource
jedis
}
// return the jedis connection to the pool
def closeJedis(jedis: Jedis): Unit = {
RedisPoolUtil.getPool.returnResource(jedis)
}
}