怎么使用SpringBoot定时任务实现数据同步


这篇文章主要介绍“怎么使用SpringBoot定时任务实现数据同步”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“怎么使用SpringBoot定时任务实现数据同步”文章能帮助大家解决问题。

前言

业务的需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步。

方案一:通过轮询接口的方式执行 pullData() 方法实现数据同步

该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询数据时候,某一时刻,数据全部删除,还没及时插入的时候。数据可能有异常。

方案二:通过轮询接口的方式执行 pullDataNew() 方法实现数据同步

该方式的原理是先查询数据库,已有数据,然后和通过api调用获取的最新数据进行比对,找出数据中增量、减量和变量,进行同步更新。该方法的优点,减少对数据库的频繁操作,提升性能。缺点:无发现明显缺点。

package&nbsp.hxtx.spacedata.task;import&nbsp.alibaba.fastjson.JSON;import&nbsp.alibaba.fastjson.JSONArray;import&nbsp.alibaba.fastjson.JSONObject;import&nbsp.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import&nbsp.baomidou.mybatisplus.core.toolkit.CollectionUtils;import&nbsp.google.api.client.util.Lists;import&nbsp.hxtx.spacedatamon.domain.ResponseDTO;import&nbsp.hxtx.spacedata.config.SpringContextUtil;import&nbsp.hxtx.spacedata.controller.file.FilesMinioController;import&nbsp.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity;import&nbsp.hxtx.spacedata.service.entityconfig.EntityPointService;import&nbsp.hxtx.spacedata.util.HttpProxyUtil;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotypeponent;importorg.springframework.transaction.annotation.Transactional;importjava.util.List;importjava.util.Map;importjava.util.Objects;importjava.util.stream.Collectors;/***中台设备数据定时任务执行**@authorTarzanLiu*@version1.0.0*@description*@date2023/12/07*/ponent@Slf4jpublicclassEntityPointTask{@AutowiredprivateEntityPointServiceentityPointService;@Value("${middleGround.server.host}")privateStringhost;@Value("${middleGround.server.port}")privateStringport;privatestaticFilesMinioControllerfilesMinioController=SpringContextUtil.getBean(FilesMinioController.class);/***设备定义点数据拉取**@authortarzanLiu*@date2023/12/2*/@Scheduled(cron="0/30****?")//30秒校验一次publicvoidpullDataTaskByCorn(){Stringresult=HttpProxyUtil.sendGet(""+host+":"+port+"/interface/system/list");JSONObjectjsonObject=JSON.parseObject(result);if(Objects.nonNull(jsonObject)){JSONArrayarray=jsonObject.getJSONArray("data");if(array!=null&&array.size()!=0){for(inti=0;i<array.size();i++){JSONObjectobj=array.getJSONObject(i);StringsystemId=obj.getString("id");pullDataNew(systemId);}}}}@Transactional(rollbackFor=Throwable.class)publicResponseDTO<String>pullData(Stringcode){List<EntityPointEntity>list=Lists.newArrayList();Stringresult=HttpProxyUtil.sendGet(""+host+":"+port+"/interface/defintionView/listBySystemId/"+code);JSONObjectjsonObject=JSON.parseObject(result);if(Objects.nonNull(jsonObject)){JSONArrayarray=jsonObject.getJSONArray("data");if(array!=null&&array.size()!=0){for(inti=0;i<array.size();i++){JSONObjectobj=array.getJSONObject(i);StringpointId=obj.getString("pointId");Stringname=obj.getString("name");list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());}List<EntityPointEntity>existList=entityPointService.list(newLambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode,code).isNotNull(EntityPointEntity::getValue));if(CollectionUtils.isNotEmpty(existList)){Map<String,String>existMap=existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId,EntityPointEntity::getValue));list.forEach(e->{Stringvalue=existMap.get(e.getPointId());if(value!=null){e.setValue(value);}});}entityPointService.remove(newLambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode,code));entityPointService.saveBatch(list);}}returnResponseDTO.succ();}@Transactional(rollbackFor=Throwable.class)publicResponseDTO<String>pullDataNew(Stringcode){Stringresult=HttpProxyUtil.sendGet(""+host+":"+port+"/interface/defintionView/listBySystemId/"+code);JSONObjectjsonObject=JSON.parseObject(result);if(Objects.nonNull(jsonObject)){JSONArraydata=jsonObject.getJSONArray("data");List<EntityPointEntity>list=data.toJavaList(EntityPointEntity.class);if(CollectionUtils.isNotEmpty(list)){list.forEach(e->e.setCode(code));List<EntityPointEntity>existList=entityPointService.list(newLambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode,code));if(CollectionUtils.isNotEmpty(existList)){//存在mapMap<String,String>existMap=existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId,EntityPointEntity::getName));//传输mapMap<String,String>dataMap=list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId,EntityPointEntity::getName));//增量List<EntityPointEntity>increment=list.stream().filter(e->existMap.get(e.getPointId())==null).collect(Collectors.toList());if(CollectionUtils.isNotEmpty(increment)){entityPointService.saveBatch(increment);}//减量List<EntityPointEntity>decrement=existList.stream().filter(e->dataMap.get(e.getPointId())==null).collect(Collectors.toList());if(CollectionUtils.isNotEmpty(decrement)){entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));}//变量List<EntityPointEntity>variable=existList.stream().filter(e->dataMap.get(e.getPointId())!=null&&!dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());if(CollectionUtils.isNotEmpty(variable)){variable.forEach(e->{e.setName(dataMap.get(e.getPointId()));});entityPointService.updateBatchById(variable);}}else{entityPointService.saveBatch(list);}}}returnResponseDTO.succ();}}

数据库对应实体类

import&nbsp.baomidou.mybatisplus.annotation.IdType;import&nbsp.baomidou.mybatisplus.annotation.TableId;import&nbsp.baomidou.mybatisplus.annotation.TableName;importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;importjava.io.Serializable;importjava.util.Date;@Builder@NoArgsConstructor@AllArgsConstructor@Data@TableName(value="t_entity_point")publicclassEntityPointEntityimplementsSerializable{privatestaticfinallongserialVersionUID=2181036545424452651L;/***定义点id*/@TableId(value="id",type=IdType.ASSIGN_ID)privateLongid;/***定义点id*/privateStringpointId;/***名称*/privateStringname;/***绘制数据*/privateStringvalue;/***编码*/privateStringcode;/***创建时间*/privateDatecreateTime;}

HTTP请求代理工具类

importlombok.extern.slf4j.Slf4j;importorg.apache.http.Consts;importorg.apache.http.HttpEntity;importorg.apache.http.HttpStatus;importorg.apache.http.NameValuePair;importorg.apache.http.client.config.RequestConfig;importorg.apache.http.client.entity.UrlEncodedFormEntity;importorg.apache.http.client.methods.CloseableHttpResponse;importorg.apache.http.client.methods.HttpPost;importorg.apache.http.conn.ssl.SSLConnectionSocketFactory;importorg.apache.http.conn.ssl.TrustStrategy;importorg.apache.http.impl.client.CloseableHttpClient;importorg.apache.http.impl.client.HttpClients;importorg.apache.http.message.BasameValuePair;importorg.apache.http.ssl.SSLContextBuilder;importorg.apache.http.util.EntityUtils;importjavax.net.ssl.SSLContext;importjava.io.BufferedReader;importjava.io.InputStreamReader;importjava.io.PrintWriter;importjava.net.URL;importjava.net.URLConnection;importjava.security.cert.CertificateException;importjava.security.cert.X509Certificate;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;/***HTTP请求代理类**@authortarzanLiu*@description发送GetPost请求*/@Slf4jpublicclassHttpProxyUtil{/***使用URLConnection进行GET请求**@paramapi_url*@return*/publicstaticStringsendGet(Stringapi_url){returnsendGet(api_url,"","utf-8");}/***使用URLConnection进行GET请求**@paramapi_url*@paramparam*@return*/publicstaticStringsendGet(Stringapi_url,Stringparam){returnsendGet(api_url,param,"utf-8");}/***使用URLConnection进行GET请求**@paramapi_url请求路径*@paramparam请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,可以为空*@paramcharset字符集*@return*/publicstaticStringsendGet(Stringapi_url,Stringparam,Stringcharset){StringBufferbuffer=newStringBuffer();try{//判断有无参数,若是拼接好的url,就不必再拼接了if(param!=null&&!"".equals(param)){api_url=api_url+"?"+param;}log.info("请求的路径是:"+api_url);URLrealUrl=newURL(api_url);//打开联接URLConnectionconn=realUrl.openConnection();//设置通用的请求属性conn.setRequestProperty("accept","*/*");conn.setRequestProperty("connection","Keep-Alive");conn.setRequestProperty("user-agent","Mozilla/4.0patible;MSIE6.0;WindowsNT5.1;SV1)");conn.setConnectTimeout(12000);//设置连接主机超时(单位:毫秒)conn.setReadTimeout(12000);//设置从主机读取数据超时(单位:毫秒)conn.connect();//建立实际的联接//定义BufferedReader输入流来读取URL的相应try(BufferedReaderin=newBufferedReader(newInputStreamReader(conn.getInputStream(),charset))){Stringline;while((line=in.readLine())!=null){//buffer.append("\n"+line);buffer.append(line);}}}catch(Exceptione){log.error("发送GET请求出现异常!"+e.getMessage());returnnull;}//log.info("响应返回数据:"+buffer.toString());returnbuffer.toString();}/***使用URLConnection进行POST请求**@paramapi_url请求路径*@paramparam请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空*@return*/publicstaticStringsendPost(Stringapi_url,Stringparam){returnsendPost(api_url,param,"utf-8");}/***使用URLConnection进行POST请求**@paramapi_url请求路径*@paramparam请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空*@paramcharset字符集*@return*/publicstaticStringsendPost(Stringapi_url,Stringparam,Stringcharset){StringBufferbuffer=newStringBuffer();try{log.info("请求的路径是:"+api_url+",参数是:"+param);URLrealUrl=newURL(api_url);//打开联接URLConnectionconn=realUrl.openConnection();//设置通用的请求属性conn.setRequestProperty("accept","*/*");conn.setRequestProperty("connection","Keep-Alive");conn.setRequestProperty("user-agent","Mozilla/4.0patible;MSIE6.0;WindowsNT5.1;SV1)");conn.setConnectTimeout(12000);//设置连接主机超时(单位:毫秒)conn.setReadTimeout(12000);//设置从主机读取数据超时(单位:毫秒)//发送POST请求必须设置如下两行conn.setDoOutput(true);conn.setDoInput(true);//获取URLConnection对象对应的输出流try(PrintWriterout=newPrintWriter(conn.getOutputStream())){out.print(param);//发送请求参数out.flush();//flush输出流的缓冲}//定义BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别try(BufferedReaderin=newBufferedReader(newInputStreamReader(conn.getInputStream(),charset))){Stringline;while((line=in.readLine())!=null){//buffer.append("\n"+line);buffer.append(line);}}}catch(Exceptione){log.error("发送POST请求出现异常!"+e.getMessage());e.printStackTrace();}log.info("响应返回数据:"+buffer.toString());returnbuffer.toString();}publicstaticCloseableHttpClientcreateSSLClientDefault()throwsException{SSLContextsslContext=newSSLContextBuilder().loadTrustMaterial(null,newAllTrustStrategy()).build();SSLConnectionSocketFactorysslSf=newSSLConnectionSocketFactory(sslContext);returnHttpClients.custom().setSSLSocketFactory(sslSf).build();}//加载证书privatestaticclassAllTrustStrategyimplementsTrustStrategy{publicbooleanisTrusted(X509Certificate[]x509Certificates,Strings)throwsCertificateException{returntrue;}}/***支持http请求**@paramurl*@paramparam*@return*@throwsException*/publicstaticStringsendHttpClientPost(Stringurl,Map<String,String>param)throwsException{CloseableHttpClienthttpClient=createSSLClientDefault();HttpPosthttpPost=null;CloseableHttpResponseresponse=null;Stringresult="";try{//发起HTTP的POST请求httpPost=newHttpPost(url);List<NameValuePair>paramList=newArrayList<NameValuePair>();for(Stringkey:param.keySet()){paramList.add(newBasameValuePair(key,param.get(key)));}log.info("http请求地址:"+url+",参数:"+paramList.toString());//UTF8+URL编码httpPost.setEntity(newUrlEncodedFormEntity(paramList,Consts.UTF_8));httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());response=httpClient.execute(httpPost);HttpEntityentity=response.getEntity();intstatusCode=response.getStatusLine().getStatusCode();if(HttpStatus.SC_OK==statusCode){//如果响应码是200}result=EntityUtils.toString(entity);log.info("状态码:"+statusCode+",响应信息:"+result);}finally{if(response!=null){response.close();}if(httpPost!=null){httpPost.releaseConnection();}httpClient.close();}returnresult;}}

关于“怎么使用SpringBoot定时任务实现数据同步”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注主机评测网行业资讯频道,小编每天都会为大家更新不同的知识点。


上一篇:怎么使用Python数据可视化制作全球地震散点图

下一篇:如何使用Swift?Package插件生成代码


Copyright © 2002-2019 测速网 www.inhv.cn 皖ICP备2023010105号
测速城市 测速地区 测速街道 网速测试城市 网速测试地区 网速测试街道
温馨提示:部分文章图片数据来源与网络,仅供参考!版权归原作者所有,如有侵权请联系删除!

热门搜索 城市网站建设 地区网站制作 街道网页设计 大写数字 热点城市 热点地区 热点街道 热点时间 房贷计算器