2 迁移实现
2.1 从数据库读取进件id存到文件中
@RequestMapping(value = "***", method = {RequestMethod.GET}) @ResponseBody public void writeFile(Integer id) { if (null == id) { pieceSearchService.writePieceFile(); } else { pieceSearchService.writePieceFile(id); } } // PieceSearchServiceImpl public void writePieceFile() { dataMoveManager.startWriteFile(); } 复制代码
// AbstractTemplate public void startWriteFile() { readDataAndWriteFile(); } // DataMoveManager protected void readDataAndWriteFile() { pieceInfoWriteFileTask.start(); } // AbstractTaskTemplate public void start() { List<? extends Callable<String>> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoWriteFileTask protected List<? extends Callable<String>> createTask() { //获取进件起始id 并构建任务 //查询所有进件数据(500W+) List<PieceIdAndIdDTO> pieceIdAndIdDTOS = pieceSearchDao.queryAllPieceInfo(); List<MyTask> tasks = this.getTaskFromPieceId(pieceIdAndIdDTOS); return tasks; } // PieceInfoWriteFileTask List<MyTask> getTaskFromPieceId(List<PieceIdAndIdDTO> pieceIdAndIdDTOS) { //任务集合 List<MyTask> result = Lists.newArrayList(); Integer index = 1; int fileName = 1; List<PieceIdAndIdDTO> ids = Lists.newArrayList(); for (PieceIdAndIdDTO pieceIdAndIdDTO : pieceIdAndIdDTOS) { ids.add(pieceIdAndIdDTO); //每 条创建一个任务 if (index % FILE_MAX_NUM == 0) { //创建任务 MyTask myTask = new MyTask(); myTask.setIds(ids); myTask.setFileName(Integer.toString(fileName)); result.add(myTask); ids = Lists.newArrayList(); //一定要new fileName++; } index++; } if (!ids.isEmpty()) { MyTask myTask = new MyTask(); myTask.setIds(ids); myTask.setFileName(Integer.toString(fileName)); result.add(myTask); } return result; } // PieceInfoWriteFileTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) { //线程池执行任务 List<Future<String>> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List<String> results = Lists.newArrayList(); for (Future<String> future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoWriteFileTask protected void writeLogFile(List results) { //日志文件名称 String fileName = "writeFile.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行写出 try { FileKit.writeLines(results, filePath, "UTF-8"); } catch (IOException e) { e.printStackTrace(); log.error("class:PieceInfoWriteFileTask method: writeLogFile()" + " 日志信息写出失败, 失败原因:" + e.getMessage()); } } 复制代码
// PieceInfoWriteFileTask class MyTask implements Callable<String> { private List<PieceIdAndIdDTO> ids; private String fileName; @Override public String call() { String message = "文件生成失败"; try { message = writeFile(ids, fileName); long l2 = System.currentTimeMillis(); } catch (Exception e) { return message + "----失败原因:" + e.getMessage(); } return message; } } // PieceInfoWriteFileTask private String writeFile(List<PieceIdAndIdDTO> pieceIdAndId, String fileName) { //文件路径+文件名 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt"; try { log.info("文件输出路径:" + filePath); //输出文件 FileKit.writeLines(pieceIdAndId, filePath, "UTF-8"); } catch (IOException e) { return "此文件生成失败:" + fileName + "失败原因:" + e.getMessage(); } return "此文件生成成功:" + fileName; } 复制代码
2.2 数据迁移
2.2.1 从文件读取id存到队列中
@RequestMapping(value = "***", method = {RequestMethod.GET}) @ResponseBody public void dataMove() { pieceSearchService.pieceDataMove(); } // PieceSearchServiceImpl public void pieceDataMove() { dataMoveManager.startReadForDb(); } 复制代码
// AbstractTemplate public void startReadForDb() { readFileForDb(); } // DataMoveManager protected void readFileForDb() { pieceInfoDataMoveTask.start(); } // AbstractTaskTemplate public void start() { List<? extends Callable<String>> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoDataMoveTask protected List<? extends Callable<String>> createTask() { //获取此路径下的所有.txt文件 List<File> files = null; try { files = FileKit.listFilesWithSuffix(new File(ABSOLUTE_PATH + FILE_ROOT_PATH), ".txt"); } catch (Exception e) { e.printStackTrace(); log.error("class:PieceInfoDataMoveTask method: createTask()" + " 创建任务组失败, 失败原因:" + e.getMessage()); } //根据文件构建任务 List<MyTask> tasks = getTaskWithFile(files); return tasks; } // PieceInfoDataMoveTask private List<MyTask> getTaskWithFile(List<File> files) { ArrayList<MyTask> tasks = Lists.newArrayList(); for (File file : files) { MyTask myTask = new MyTask(); myTask.setFile(file); tasks.add(myTask); } return tasks; } // PieceInfoDataMoveTask private class MyTask implements Callable<String> { private File file; @Override public String call() throws Exception { //每次保存的进件Id ArrayList<String> ids = new ArrayList<>(); //开始时间 long beginTime = System.currentTimeMillis(); //文件名称 String fileName = file.getName(); BufferedReader reader = null; //行数据 String line; int a = 1; try { reader = FileKit.getReader(file, "UTF-8"); while (true) { line = reader.readLine(); if (Func.isEmpty(line)) { break; } //每行数据格式为 id:1121 pieceId:ZPTE1213 String id = line.split("\\s+")[0].split(":")[1]; ids.add(id); //每多少行 执行插入操作 if (a % MAX_SAVE_NUM == 0) { //存入队列 QueueUtil.QUEUE_IDS.put(ids); ids = Lists.newArrayList(); } a++; } } catch (Exception e) { return "读取" + fileName + "文件失败原因:" + e.getMessage(); } finally { if (!ids.isEmpty()) { //存入队列 QueueUtil.QUEUE_IDS.put(ids); } IoKit.close(reader); } long useTime = (System.currentTimeMillis() - beginTime) / 1000; return "读取" + fileName + "文件完毕,耗时:" + useTime + "秒 , 文件共:" + (a - 1) + "条数据"; } } // QueueUtil public class QueueUtil { /** *队列一:存放从文件中读取的id集合 */ public static final LinkedBlockingQueue<List<String>> QUEUE_IDS = new LinkedBlockingQueue<List<String>>(); /** * 队列二:存放组装数据过后的List<JsonObject> */ public static final LinkedBlockingQueue<List<JSONObject>> QUEUE_JSON_OBJECTS = new LinkedBlockingQueue<List<JSONObject>>(); /** *队列三:存放所有失败id集合 */ public static final LinkedBlockingQueue<List<String>> QUEUE_FAIL_IDS = new LinkedBlockingQueue<List<String>>(); /** *队列四:存放所有校验过的集合 */ public static final LinkedBlockingQueue<SaveToMysqlDTO> QUEUE_SAVETOMYSQL = new LinkedBlockingQueue<SaveToMysqlDTO>(); /** *队列五:存放所有校验过的集合 */ public static final LinkedBlockingQueue<List<Document>> QUEUE_SAVETOMONGO = new LinkedBlockingQueue<List<Document>>(); /** *队列六:存放所有校验过的集合 */ public static final LinkedBlockingQueue<Map<Long,PieceJsonDTO>> QUEUE_SAVETOES = new LinkedBlockingQueue< Map<Long,PieceJsonDTO>>(); } 复制代码
// PieceInfoDataMoveTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM)); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) { //线程池执行任务 List<Future<String>> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List<String> results = Lists.newArrayList(); for (Future<String> future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoDataMoveTask protected void writeLogFile(List results) { } 复制代码
2.2.2 消费进件id队列(QUEUE_IDS)
//spring 配置文件加载bean 调用init() 方法 <bean class="com.migration.eagle.manager.PieceSearchManager" init-method="init" /> /** * 启动线程 */ // PieceSearchManager private void init() { //组装JsonObject readQueueForAssembleJsonThread.start(); log.info("线程1启动完毕,正在读取ids队列!!!!"); //校验数据 readQueueForValidateThread.start(); log.info("线程2启动完毕,正在读取List<JsonObject>队列!!!!"); //入库数据 readQueueForSaveMysqlThread.start(); log.info("线程3启动完毕,正在读取saveToMysql队列!!!!"); //入库Mongo readQueueForSaveMongoThread.start(); log.info("线程4启动完毕,正在读取saveToMongo队列!!!!"); //入库es readQueueForSaveESThread.start(); log.info("线程5启动完毕,正在读取saveToES队列!!!!"); //写出失败日志 readQueueForWriteLogThread.start(); log.info("线程6启动完毕,正在读取失败ids队列!!!!"); } 复制代码
public class ReadQueueForAssembleJsonThread extends Thread { @Autowired PieceAssembleManager pieceAssembleManager; @Override public void run() { //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxAssembleJsonThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { while (true) { try { //读取队列中的ids List<String> ids = QueueUtil.QUEUE_IDS.take(); if (null != ids) { long l1 = System.currentTimeMillis(); //组装数据 Map<String, List> pieceJson_map = pieceAssembleManager.getPieceJson(ids); //存入队列 QueueUtil.QUEUE_JSON_OBJECTS.put(pieceJson_map.get("success")); //存入失败ids队列 QueueUtil.QUEUE_FAIL_IDS.put(pieceJson_map.get("fail")); long l2 = System.currentTimeMillis(); log.info("组装JSON耗时:" + (l2 - l1)," ,队列大小: "+QueueUtil.QUEUE_IDS.size()); ids=null; pieceJson_map=null; } } catch (Exception e) { log.error("组装数据失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.3 消费JsonObject队列(QUEUE_JSON_OBJECTS)
public class ReadQueueForValidateThread extends Thread { @Autowired PieceSearchManager pieceSearchManager; @Override public void run() { //创建线程池 int countThread = Integer.valueOf(new Config().getMaxValidateThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); List<JSONObject> jsonObjects = Lists.newArrayList(); while (true) { try { //从队列中读取List<JsonObject> List<JSONObject> poll = QueueUtil.QUEUE_JSON_OBJECTS.take(); if (null != poll) { //创建任务 for (int i = 1; i <= poll.size(); i++) { jsonObjects.add(poll.get(i - 1)); if (i % 100 == 0) { MyTask myTask = new MyTask(); myTask.setJsonObjects(jsonObjects); executorService.execute(myTask); jsonObjects = Lists.newArrayList(); } } if (!jsonObjects.isEmpty()) { MyTask myTask = new MyTask(); myTask.setJsonObjects(jsonObjects); executorService.execute(myTask); jsonObjects = Lists.newArrayList(); } log.info("校验队列大小: "+QueueUtil.QUEUE_JSON_OBJECTS.size()); } } catch (Exception e) { e.printStackTrace(); } } } @Data class MyTask implements Runnable { private List<JSONObject> jsonObjects; @Override public void run() { long l = System.currentTimeMillis(); SaveToMysqlDTO dto = new SaveToMysqlDTO(); //mongo队列所需数据 List<Document> toMongos = Lists.newArrayList(); //es队列所需数据 Map<Long,PieceJsonDTO> id_PieceJsonDTO =Maps.newHashMap(); List<InvestigatePieceEntity> investigatePieceDTOS = Lists.newArrayList(); List<PieceDetailEntity> pieceDetailDTOS = Lists.newArrayList(); List<PieceContactEntity> pieceContactDTOS = Lists.newArrayList(); //失败结果 List<String> failedJson = Lists.newArrayList(); try { for (JSONObject finalObj : jsonObjects) { Map<String, JSONObject> map = pieceSearchManager.validatePiece(finalObj); if (map.containsKey("successData")) { pieceSearchManager.savePiecesLast(map.get("successData"), investigatePieceDTOS, pieceContactDTOS, pieceDetailDTOS,id_PieceJsonDTO); //增加id(存入mongo使用) JsonObject jsonObject = JsonKit.parseJsonObject(finalObj.toString()); JsonElement appkey = JsonKit.getValueByPath(jsonObject, MongoPieceManagerImpl.PIECE_CODE_JSON_PATH); jsonObject.add(MongoPieceManagerImpl.MONGO_ID, appkey); toMongos.add(Document.parse(jsonObject.toString())); } else { failedJson.add(((Map) ((Map) map.get("failed").get("data")).get("applicant")).get("intpc_id").toString()); } map=null; } dto.setInvestigatePieceDTOS(investigatePieceDTOS); dto.setPieceContactDTOS(pieceContactDTOS); dto.setPieceDetailDTOS(pieceDetailDTOS); //存入失败队列 QueueUtil.QUEUE_FAIL_IDS.put(failedJson); //存入入库队列 QueueUtil.QUEUE_SAVETOMYSQL.put(dto); QueueUtil.QUEUE_SAVETOMONGO.put(toMongos); QueueUtil.QUEUE_SAVETOES.put(id_PieceJsonDTO); long l1 = System.currentTimeMillis(); log.info("校验耗时:" + (l1 - l)); dto=null; toMongos=null; id_PieceJsonDTO=null; failedJson=null; jsonObjects=null; } catch (Exception e) { log.error("校验失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } 复制代码
2.2.4 消费保存到数据库数据队列(QUEUE_SAVETOMYSQL)
public class ReadQueueForSaveMysqlThread extends Thread { @Autowired PieceSearchManager pieceSearchManager; @Override public void run() { //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxSaveMysqlThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { while (true) { try { //从队列中读取校验过后的数据 SaveToMysqlDTO dto = QueueUtil.QUEUE_SAVETOMYSQL.take(); if (null != dto) { //数据入库 long l = System.currentTimeMillis(); pieceSearchManager.savePieceToSql(dto); long l1 = System.currentTimeMillis(); log.info("入库 Mysql 耗时:" + (l1 - l)+" , 队列大小: "+QueueUtil.QUEUE_SAVETOMYSQL.size()); } } catch (InterruptedException e) { log.error("插入Mysql失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.5 消费保存到 mongoDB 数据队列(QUEUE_SAVETOMONGO)
public class ReadQueueForSaveMongoThread extends Thread { @Autowired MongoUtil mongoUtil; private MongoCollection mongoCollection; @Override public void run() { //获取mongo 链接 mongoCollection = mongoUtil.mongoClient(); //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxSaveMongoThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { while (true) { try { //从队列中读取校验过后的数据 List<Document> documents = QueueUtil.QUEUE_SAVETOMONGO.take(); if (null != documents) { //数据入库 long l = System.currentTimeMillis(); mongoCollection.insertMany(documents); long l1 = System.currentTimeMillis(); log.info("Mongo耗时:" + (l1 - l)+", 队列大小: "+ QueueUtil.QUEUE_SAVETOMONGO.size()); } } catch (Exception e) { log.error("插入mongo失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.6 消费保存到Elasticsearch数据队列(QUEUE_SAVETOES)
public class ReadQueueForSaveESThread extends Thread { @Autowired ElasticsearchManager elasticsearchManager; @Override public void run() { //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxSaveESThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { BulkProcessor bulkProcessor = elasticsearchManager.createBulkProcessor(); while (true) { try { //从队列中读取校验过后的数据 Map<Long, PieceJsonDTO> map = QueueUtil.QUEUE_SAVETOES.take(); if (null != map) { for (Long id : map.keySet()) { //创建request blukprocessor PieceJsonDTO pieceJsonDTO = map.get(id); String s = JsonKit.toJsonSerializeNulls(pieceJsonDTO); bulkProcessor.add(elasticsearchManager.createPieceIndexRequest(Long.toString(id),s)); pieceJsonDTO=null; } map = null; log.info("es队列大小:"+QueueUtil.QUEUE_SAVETOES.size()); } } catch (Exception e) { log.error("插入es失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.7 消费失败队列保存到文件中(QUEUE_FAIL_IDS)
public class ReadQueueForWriteLogThread extends Thread { /** * 文件根路径 */ private final static String FILE_ROOT_PATH = new Config().getFileRootPath(); /** * 项目路径 */ private final static String ABSOLUTE_PATH = FileKit.getAbsolutePath(""); @Override public void run() { //日志文件名称 String fileName = "datamove.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; while (true) { try { List<String> fail_ids = QueueUtil.QUEUE_FAIL_IDS.take(); if (null != fail_ids) { ArrayList<String> result = Lists.newArrayList(); String s = "失败个数:" + fail_ids.size() + " , 失败id: " + fail_ids; result.add(s); //按行写出 FileKit.appendLines(result, filePath, "UTF-8"); } } catch (Exception e) { log.error("写出失败ids失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } 复制代码
2.3 失败数据处理
@RequestMapping(value = "***", method = {RequestMethod.GET}) @ResponseBody public void readLog() { pieceSearchService.pieceLogFile(); } // PieceSearchServiceImpl public void pieceLogFile() { dataMoveManager.startReadLogForDb(); } // AbstractTemplate public void startReadLogForDb() { readLogFileForDb(); } // DataMoveManager protected void readLogFileForDb() { pieceInfoLogFileTask.start(); } // AbstractTaskTemplate public void start() { List<? extends Callable<String>> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoLogFileTask protected List<? extends Callable<String>> createTask() { //获取 datamove.log路径 String fileName = "datamove.log"; String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行读取 获取文件中 失败id: List<String> ids = Lists.newArrayList(); try { ids = readLines(new File(filePath), "UTF-8", ids); } catch (IOException e) { e.printStackTrace(); } //创建任务组 if (!Func.isEmpty(ids)) { List<MyTask> taskWithIds = getTaskWithIds(ids); return taskWithIds; } return null; } // PieceInfoLogFileTask private List<MyTask> getTaskWithIds(List<String> ids) { //一百个ids 起一个任务 ArrayList<MyTask> tasks = Lists.newArrayList(); int a = 1; int beginIndex = 0; while (true) { if (a * 100 >= ids.size()) { MyTask myTask = new MyTask(); myTask.setIds(ids.subList(beginIndex, ids.size())); tasks.add(myTask); break; } MyTask myTask = new MyTask(); //subList() 包左不包右 myTask.setIds(ids.subList(beginIndex, beginIndex + 100)); tasks.add(myTask); beginIndex += 100; a++; } return tasks; } private class MyTask implements Callable<String> { private List<String> ids; @Override public String call() throws Exception { //每次保存的进件Id ArrayList<String> pieceIds = new ArrayList<>(); //开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < ids.size(); i++) { pieceIds.add(ids.get(i)); //每多少行 执行插入操作 if (i % MAX_SAVE_NUM == 0) { //执行查询保存操作 System.out.println(ids); pieceIds.clear(); } } if (!pieceIds.isEmpty()) { //执行查询保存操作 } long useTime = (System.currentTimeMillis() - beginTime) / 1000; return "日志文件失败进件重新入库完毕,耗时:" + useTime + "秒 , 文件共:" + ids.size() + "条数据, 失败个数: , 失败id:"; } } // PieceInfoLogFileTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM)); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) { //线程池执行任务 List<Future<String>> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List<String> results = Lists.newArrayList(); for (Future<String> future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoLogFileTask protected void writeLogFile(List results) { //日志文件名称 String fileName = "datamove.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行写出 try { FileKit.writeLines(results, filePath, "UTF-8"); } catch (IOException e) { e.printStackTrace(); log.error("class:PieceInfoDataMoveTask method: writeLogFile()" + " 日志信息写出失败, 失败原因:" + e.getMessage()); } } 复制代码
2.4 数据迁移流程图
3 Elasticsearch在综合搜索列表实现
public Page<InvestigatePiecePageResDTO> pagination(final InvestigatePiecePageReqDTO investigatepiecePageReqDTO, final Integer pageNo, final Integer pageSize) { //创建es组合查询器 BoolQueryBuilder mustQuery = QueryBuilders.boolQuery(); //进件编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPieceCode())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceCode.keyword", "*" + investigatepiecePageReqDTO.getPieceCode() + "*")); } //手机号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getMobile())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceContacts.mobile.keyword", "*" + investigatepiecePageReqDTO.getMobile() + "*")); } //身份证号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getIdCard())) { mustQuery.must(QueryBuilders.wildcardQuery("idCard.keyword", "*" + investigatepiecePageReqDTO.getIdCard() + "*")); } //服务网点 if (Func.isNotEmpty(investigatepiecePageReqDTO.getServicePoint())) { mustQuery.must(QueryBuilders.wildcardQuery("servicePoint.keyword", "*" + investigatepiecePageReqDTO.getServicePoint() + "*")); } //座机号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPhone())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceContacts.phone.keyword", "*" + investigatepiecePageReqDTO.getPhone() + "*")); } //单位名称 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCompanyName())) { mustQuery.must(QueryBuilders.wildcardQuery("companyName.keyword", "*" + investigatepiecePageReqDTO.getCompanyName() + "*")); } //工单编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getOrderCode())) { mustQuery.must(QueryBuilders.wildcardQuery("orderCode.keyword", "*" + investigatepiecePageReqDTO.getOrderCode() + "*")); } //案件编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCaseCode())) { mustQuery.must(QueryBuilders.wildcardQuery("caseCode.keyword", "*" + investigatepiecePageReqDTO.getCaseCode() + "*")); } //规则搜索 if (Func.isNotEmpty(investigatepiecePageReqDTO.getRuleCode())) { mustQuery.must(QueryBuilders.wildcardQuery("ruleName.keyword", "*" + investigatepiecePageReqDTO.getRuleCode() + "*")); } //客户经理 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerManager())) { mustQuery.must(QueryBuilders.wildcardQuery("customerManager.keyword", "*" + investigatepiecePageReqDTO.getCustomerManager() + "*")); } //进件大区 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPieceFromArea())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceFromArea.keyword", "*" + investigatepiecePageReqDTO.getPieceFromArea() + "*")); } //进件状态 if (Func.isNotEmpty(investigatepiecePageReqDTO.getBusinessStatus())) { mustQuery.must(QueryBuilders.termQuery("businessStatus.keyword", investigatepiecePageReqDTO.getBusinessStatus())); } //产品类型 if (Func.isNotEmpty(investigatepiecePageReqDTO.getProductCode())) { mustQuery.must(QueryBuilders.termQuery("productName.keyword", investigatepiecePageReqDTO.getProductCode())); } else { //默认查询当前用户权限下的所有产品类型 List<String> productCodes = getProductCodeByCurrentUser(); mustQuery.must(QueryBuilders.termsQuery("productName.keyword", productCodes)); } //反欺诈处理人 if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionMaker())) { mustQuery.must(QueryBuilders.wildcardQuery("decisionMaker.keyword", "*" + investigatepiecePageReqDTO.getDecisionMaker() + "*")); } //单位地址 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCompanyAddress())) { mustQuery.must(QueryBuilders.wildcardQuery("companyAddress.keyword", "*" + investigatepiecePageReqDTO.getCompanyAddress() + "*")); } //欺诈报警 if (Func.isNotEmpty(investigatepiecePageReqDTO.getFraudAlarmLevelCode())) { DictionaryDTO dictionaryDTO = DictionaryKit.QZJGLB_MAP().get(investigatepiecePageReqDTO.getFraudAlarmLevelCode()); mustQuery.must(QueryBuilders.termQuery("fraudAlarmLevelName.keyword", dictionaryDTO.getName())); } //决策结果 if (Func.isNotEmpty(investigatepiecePageReqDTO.getResultCode())) { DictionaryDTO dictionaryDTO = DictionaryKit.JCJG_MAP().get(investigatepiecePageReqDTO.getResultCode()); mustQuery.must(QueryBuilders.termQuery("decisionResult.keyword", dictionaryDTO.getName())); } //反欺诈处理状态 if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionStatus())) { DictionaryDTO dictionaryDTO = DictionaryKit.JCZT_MAP().get(investigatepiecePageReqDTO.getDecisionStatus()); mustQuery.must(QueryBuilders.termQuery("decisionStatus.keyword", dictionaryDTO.getName())); } //客服人员 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerService())) { mustQuery.must(QueryBuilders.wildcardQuery("serviceUserName.keyword", "*" + investigatepiecePageReqDTO.getCustomerService() + "*")); } //客户姓名 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerName())) { mustQuery.must(QueryBuilders.wildcardQuery("customerName.keyword", "*" + investigatepiecePageReqDTO.getCustomerName() + "*")); } //团队经理 if (Func.isNotEmpty(investigatepiecePageReqDTO.getTeamManager())) { mustQuery.must(QueryBuilders.wildcardQuery("teamManager.keyword", "*" + investigatepiecePageReqDTO.getTeamManager() + "*")); } //进件时间 try { if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeStart()) && Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeStart()).getTime()).to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeEnd()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeStart())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeStart()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeEnd()).getTime())); } } catch (Exception e) { e.printStackTrace(); } //决策时间 try { if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeStart()) && Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeStart()).getTime()).to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeEnd()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeStart())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeStart()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeEnd()).getTime())); } } catch (Exception e) { e.printStackTrace(); } //是否初始化 if (Boolean.valueOf(investigatepiecePageReqDTO.getIfInit())) { //第一次进入,设置时间为今天 mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(getTodayStartTime()).to(getTodayEndTime())); } //组装分页 排序 条件 SearchSourceBuilder source = new SearchSourceBuilder(); source.query(mustQuery); //排序字段 if (Func.isNotEmpty(investigatepiecePageReqDTO.getSortChangeKey())) { SortOrder sortOrder = "asc".equals(investigatepiecePageReqDTO.getSortChangeWay()) ? SortOrder.ASC : SortOrder.DESC; if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_ENTER_CREDIT_TIME)) { source.sort("enterCreditTime", sortOrder); } else if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_TIME)) { source.sort("decisionTime", sortOrder); } else if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_MAKER)) { source.sort("decisionMaker.keyword", sortOrder); } else { source.sort("enterCreditTime", sortOrder); } } else { source.sort("enterCreditTime", SortOrder.DESC); } //设置分页 注意:es分页默认从0页开始 source.from(pageNo - 1); source.size(pageSize); //查询结果 Page<InvestigatePiecePageResDTO> page = getResponseFromEs(source); return page; } 复制代码
private Page<InvestigatePiecePageResDTO> getResponseFromEs(SearchSourceBuilder source) { //创建查询es请求 SearchHits hits; try { SearchResponse response = elasticsearchManager.getPieceResponseWithBuilder(source); hits = response.getHits(); } catch (Exception e) { throw new ServiceException(500, "es搜索引擎连接失败"); } //获取命中数据 SearchHit[] hitsHits = hits.getHits(); //总命中数 Long totalHits = hits.getTotalHits(); //组装前端返回DTO List<InvestigatePiecePageResDTO> result = Lists.newArrayList(); for (SearchHit hitsHit : hitsHits) { InvestigatePiecePageResDTO investigatePiecePageResDTO = new InvestigatePiecePageResDTO(); Map<String, Object> sourceAsMap = hitsHit.getSourceAsMap(); investigatePiecePageResDTO.setId(Long.valueOf(hitsHit.getId())); investigatePiecePageResDTO.setPieceCode((String) sourceAsMap.get("pieceCode")); investigatePiecePageResDTO.setAppKey((String) sourceAsMap.get("appKey")); Integer decisionId = (Integer) sourceAsMap.get("decisionId"); if (null != decisionId) { investigatePiecePageResDTO.setDecisionId(decisionId.longValue()); } List<String> orderCode = (List<String>) sourceAsMap.get("orderCode"); if (Func.isNotEmpty(orderCode)) { investigatePiecePageResDTO.setOrderCode(((List<String>) sourceAsMap.get("orderCode")).get(0)); } investigatePiecePageResDTO.setCustomerName((String) sourceAsMap.get("customerName")); investigatePiecePageResDTO.setIdCard((String) sourceAsMap.get("idCard")); investigatePiecePageResDTO.setProductName(DictionaryKit.CPLX_MAP().get(sourceAsMap.get("productName")).getName()); investigatePiecePageResDTO.setEnterCreditTime(SIMPLE_DATE_FORMAT.format(new Date((Long) sourceAsMap.get("enterCreditTime")))); investigatePiecePageResDTO.setBusinessStatus((String) sourceAsMap.get("businessStatus")); Long decisionTime = (Long) sourceAsMap.get("decisionTime"); if (null != decisionTime) { investigatePiecePageResDTO.setDecisionTime(SIMPLE_DATE_FORMAT.format(new Date(decisionTime))); } investigatePiecePageResDTO.setDecisionResult((String) sourceAsMap.get("decisionStatus")); investigatePiecePageResDTO.setFraudAlarmLevelName((String) sourceAsMap.get("fraudAlarmLevelName")); investigatePiecePageResDTO.setDecisionMaker((String) sourceAsMap.get("decisionMaker")); investigatePiecePageResDTO.setServicePoint((String) sourceAsMap.get("servicePoint")); String decisionStatus = (String) sourceAsMap.get("decisionStatus"); if ("已决策".equals(decisionStatus)) { investigatePiecePageResDTO.set_disabled(false); } else { investigatePiecePageResDTO.set_disabled(true); } result.add(investigatePiecePageResDTO); } //组装前端分页DTO Page<InvestigatePiecePageResDTO> page = new Page<>(); page.setRecords(result); page.setTotal(totalHits.intValue()); return page; } 复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 实现业务数据的同步迁移 · 思路一
- 通过迁移学习实现OCT图像识别
- 快速实现地图迁移数据可视化
- 用TensorFlow实现神经网络画风迁移
- 使用云和虚拟化实现数据迁移的最佳实践
- vSPhere使用vMotion实现虚拟机热迁移详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。