2 迁移实现
2.1 从数据库读取进件id存到文件中
@RequestMapping(value = "***", method = {RequestMethod.GET}) @ResponseBody public void writeFile(Integer id) { if (null == id) { pieceSearchService.writePieceFile(); } else { pieceSearchService.writePieceFile(id); } } // PieceSearchServiceImpl public void writePieceFile() { dataMoveManager.startWriteFile(); } 复制代码
// AbstractTemplate public void startWriteFile() { readDataAndWriteFile(); } // DataMoveManager protected void readDataAndWriteFile() { pieceInfoWriteFileTask.start(); } // AbstractTaskTemplate public void start() { List<? extends Callable<String>> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoWriteFileTask protected List<? extends Callable<String>> createTask() { //获取进件起始id 并构建任务 //查询所有进件数据(500W+) List<PieceIdAndIdDTO> pieceIdAndIdDTOS = pieceSearchDao.queryAllPieceInfo(); List<MyTask> tasks = this.getTaskFromPieceId(pieceIdAndIdDTOS); return tasks; } // PieceInfoWriteFileTask List<MyTask> getTaskFromPieceId(List<PieceIdAndIdDTO> pieceIdAndIdDTOS) { //任务集合 List<MyTask> result = Lists.newArrayList(); Integer index = 1; int fileName = 1; List<PieceIdAndIdDTO> ids = Lists.newArrayList(); for (PieceIdAndIdDTO pieceIdAndIdDTO : pieceIdAndIdDTOS) { ids.add(pieceIdAndIdDTO); //每 条创建一个任务 if (index % FILE_MAX_NUM == 0) { //创建任务 MyTask myTask = new MyTask(); myTask.setIds(ids); myTask.setFileName(Integer.toString(fileName)); result.add(myTask); ids = Lists.newArrayList(); //一定要new fileName++; } index++; } if (!ids.isEmpty()) { MyTask myTask = new MyTask(); myTask.setIds(ids); myTask.setFileName(Integer.toString(fileName)); result.add(myTask); } return result; } // PieceInfoWriteFileTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) { //线程池执行任务 List<Future<String>> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List<String> results = Lists.newArrayList(); for (Future<String> future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoWriteFileTask protected void writeLogFile(List results) { //日志文件名称 String fileName = "writeFile.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行写出 try { FileKit.writeLines(results, filePath, "UTF-8"); } catch (IOException e) { e.printStackTrace(); log.error("class:PieceInfoWriteFileTask method: writeLogFile()" + " 日志信息写出失败, 失败原因:" + e.getMessage()); } } 复制代码
// PieceInfoWriteFileTask class MyTask implements Callable<String> { private List<PieceIdAndIdDTO> ids; private String fileName; @Override public String call() { String message = "文件生成失败"; try { message = writeFile(ids, fileName); long l2 = System.currentTimeMillis(); } catch (Exception e) { return message + "----失败原因:" + e.getMessage(); } return message; } } // PieceInfoWriteFileTask private String writeFile(List<PieceIdAndIdDTO> pieceIdAndId, String fileName) { //文件路径+文件名 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt"; try { log.info("文件输出路径:" + filePath); //输出文件 FileKit.writeLines(pieceIdAndId, filePath, "UTF-8"); } catch (IOException e) { return "此文件生成失败:" + fileName + "失败原因:" + e.getMessage(); } return "此文件生成成功:" + fileName; } 复制代码
2.2 数据迁移
2.2.1 从文件读取id存到队列中
@RequestMapping(value = "***", method = {RequestMethod.GET}) @ResponseBody public void dataMove() { pieceSearchService.pieceDataMove(); } // PieceSearchServiceImpl public void pieceDataMove() { dataMoveManager.startReadForDb(); } 复制代码
// AbstractTemplate public void startReadForDb() { readFileForDb(); } // DataMoveManager protected void readFileForDb() { pieceInfoDataMoveTask.start(); } // AbstractTaskTemplate public void start() { List<? extends Callable<String>> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoDataMoveTask protected List<? extends Callable<String>> createTask() { //获取此路径下的所有.txt文件 List<File> files = null; try { files = FileKit.listFilesWithSuffix(new File(ABSOLUTE_PATH + FILE_ROOT_PATH), ".txt"); } catch (Exception e) { e.printStackTrace(); log.error("class:PieceInfoDataMoveTask method: createTask()" + " 创建任务组失败, 失败原因:" + e.getMessage()); } //根据文件构建任务 List<MyTask> tasks = getTaskWithFile(files); return tasks; } // PieceInfoDataMoveTask private List<MyTask> getTaskWithFile(List<File> files) { ArrayList<MyTask> tasks = Lists.newArrayList(); for (File file : files) { MyTask myTask = new MyTask(); myTask.setFile(file); tasks.add(myTask); } return tasks; } // PieceInfoDataMoveTask private class MyTask implements Callable<String> { private File file; @Override public String call() throws Exception { //每次保存的进件Id ArrayList<String> ids = new ArrayList<>(); //开始时间 long beginTime = System.currentTimeMillis(); //文件名称 String fileName = file.getName(); BufferedReader reader = null; //行数据 String line; int a = 1; try { reader = FileKit.getReader(file, "UTF-8"); while (true) { line = reader.readLine(); if (Func.isEmpty(line)) { break; } //每行数据格式为 id:1121 pieceId:ZPTE1213 String id = line.split("\\s+")[0].split(":")[1]; ids.add(id); //每多少行 执行插入操作 if (a % MAX_SAVE_NUM == 0) { //存入队列 QueueUtil.QUEUE_IDS.put(ids); ids = Lists.newArrayList(); } a++; } } catch (Exception e) { return "读取" + fileName + "文件失败原因:" + e.getMessage(); } finally { if (!ids.isEmpty()) { //存入队列 QueueUtil.QUEUE_IDS.put(ids); } IoKit.close(reader); } long useTime = (System.currentTimeMillis() - beginTime) / 1000; return "读取" + fileName + "文件完毕,耗时:" + useTime + "秒 , 文件共:" + (a - 1) + "条数据"; } } // QueueUtil public class QueueUtil { /** *队列一:存放从文件中读取的id集合 */ public static final LinkedBlockingQueue<List<String>> QUEUE_IDS = new LinkedBlockingQueue<List<String>>(); /** * 队列二:存放组装数据过后的List<JsonObject> */ public static final LinkedBlockingQueue<List<JSONObject>> QUEUE_JSON_OBJECTS = new LinkedBlockingQueue<List<JSONObject>>(); /** *队列三:存放所有失败id集合 */ public static final LinkedBlockingQueue<List<String>> QUEUE_FAIL_IDS = new LinkedBlockingQueue<List<String>>(); /** *队列四:存放所有校验过的集合 */ public static final LinkedBlockingQueue<SaveToMysqlDTO> QUEUE_SAVETOMYSQL = new LinkedBlockingQueue<SaveToMysqlDTO>(); /** *队列五:存放所有校验过的集合 */ public static final LinkedBlockingQueue<List<Document>> QUEUE_SAVETOMONGO = new LinkedBlockingQueue<List<Document>>(); /** *队列六:存放所有校验过的集合 */ public static final LinkedBlockingQueue<Map<Long,PieceJsonDTO>> QUEUE_SAVETOES = new LinkedBlockingQueue< Map<Long,PieceJsonDTO>>(); } 复制代码
// PieceInfoDataMoveTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM)); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) { //线程池执行任务 List<Future<String>> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List<String> results = Lists.newArrayList(); for (Future<String> future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoDataMoveTask protected void writeLogFile(List results) { } 复制代码
2.2.2 消费进件id队列(QUEUE_IDS)
//spring 配置文件加载bean 调用init() 方法 <bean class="com.migration.eagle.manager.PieceSearchManager" init-method="init" /> /** * 启动线程 */ // PieceSearchManager private void init() { //组装JsonObject readQueueForAssembleJsonThread.start(); log.info("线程1启动完毕,正在读取ids队列!!!!"); //校验数据 readQueueForValidateThread.start(); log.info("线程2启动完毕,正在读取List<JsonObject>队列!!!!"); //入库数据 readQueueForSaveMysqlThread.start(); log.info("线程3启动完毕,正在读取saveToMysql队列!!!!"); //入库Mongo readQueueForSaveMongoThread.start(); log.info("线程4启动完毕,正在读取saveToMongo队列!!!!"); //入库es readQueueForSaveESThread.start(); log.info("线程5启动完毕,正在读取saveToES队列!!!!"); //写出失败日志 readQueueForWriteLogThread.start(); log.info("线程6启动完毕,正在读取失败ids队列!!!!"); } 复制代码
public class ReadQueueForAssembleJsonThread extends Thread { @Autowired PieceAssembleManager pieceAssembleManager; @Override public void run() { //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxAssembleJsonThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { while (true) { try { //读取队列中的ids List<String> ids = QueueUtil.QUEUE_IDS.take(); if (null != ids) { long l1 = System.currentTimeMillis(); //组装数据 Map<String, List> pieceJson_map = pieceAssembleManager.getPieceJson(ids); //存入队列 QueueUtil.QUEUE_JSON_OBJECTS.put(pieceJson_map.get("success")); //存入失败ids队列 QueueUtil.QUEUE_FAIL_IDS.put(pieceJson_map.get("fail")); long l2 = System.currentTimeMillis(); log.info("组装JSON耗时:" + (l2 - l1)," ,队列大小: "+QueueUtil.QUEUE_IDS.size()); ids=null; pieceJson_map=null; } } catch (Exception e) { log.error("组装数据失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.3 消费JsonObject队列(QUEUE_JSON_OBJECTS)
public class ReadQueueForValidateThread extends Thread { @Autowired PieceSearchManager pieceSearchManager; @Override public void run() { //创建线程池 int countThread = Integer.valueOf(new Config().getMaxValidateThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); List<JSONObject> jsonObjects = Lists.newArrayList(); while (true) { try { //从队列中读取List<JsonObject> List<JSONObject> poll = QueueUtil.QUEUE_JSON_OBJECTS.take(); if (null != poll) { //创建任务 for (int i = 1; i <= poll.size(); i++) { jsonObjects.add(poll.get(i - 1)); if (i % 100 == 0) { MyTask myTask = new MyTask(); myTask.setJsonObjects(jsonObjects); executorService.execute(myTask); jsonObjects = Lists.newArrayList(); } } if (!jsonObjects.isEmpty()) { MyTask myTask = new MyTask(); myTask.setJsonObjects(jsonObjects); executorService.execute(myTask); jsonObjects = Lists.newArrayList(); } log.info("校验队列大小: "+QueueUtil.QUEUE_JSON_OBJECTS.size()); } } catch (Exception e) { e.printStackTrace(); } } } @Data class MyTask implements Runnable { private List<JSONObject> jsonObjects; @Override public void run() { long l = System.currentTimeMillis(); SaveToMysqlDTO dto = new SaveToMysqlDTO(); //mongo队列所需数据 List<Document> toMongos = Lists.newArrayList(); //es队列所需数据 Map<Long,PieceJsonDTO> id_PieceJsonDTO =Maps.newHashMap(); List<InvestigatePieceEntity> investigatePieceDTOS = Lists.newArrayList(); List<PieceDetailEntity> pieceDetailDTOS = Lists.newArrayList(); List<PieceContactEntity> pieceContactDTOS = Lists.newArrayList(); //失败结果 List<String> failedJson = Lists.newArrayList(); try { for (JSONObject finalObj : jsonObjects) { Map<String, JSONObject> map = pieceSearchManager.validatePiece(finalObj); if (map.containsKey("successData")) { pieceSearchManager.savePiecesLast(map.get("successData"), investigatePieceDTOS, pieceContactDTOS, pieceDetailDTOS,id_PieceJsonDTO); //增加id(存入mongo使用) JsonObject jsonObject = JsonKit.parseJsonObject(finalObj.toString()); JsonElement appkey = JsonKit.getValueByPath(jsonObject, MongoPieceManagerImpl.PIECE_CODE_JSON_PATH); jsonObject.add(MongoPieceManagerImpl.MONGO_ID, appkey); toMongos.add(Document.parse(jsonObject.toString())); } else { failedJson.add(((Map) ((Map) map.get("failed").get("data")).get("applicant")).get("intpc_id").toString()); } map=null; } dto.setInvestigatePieceDTOS(investigatePieceDTOS); dto.setPieceContactDTOS(pieceContactDTOS); dto.setPieceDetailDTOS(pieceDetailDTOS); //存入失败队列 QueueUtil.QUEUE_FAIL_IDS.put(failedJson); //存入入库队列 QueueUtil.QUEUE_SAVETOMYSQL.put(dto); QueueUtil.QUEUE_SAVETOMONGO.put(toMongos); QueueUtil.QUEUE_SAVETOES.put(id_PieceJsonDTO); long l1 = System.currentTimeMillis(); log.info("校验耗时:" + (l1 - l)); dto=null; toMongos=null; id_PieceJsonDTO=null; failedJson=null; jsonObjects=null; } catch (Exception e) { log.error("校验失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } 复制代码
2.2.4 消费保存到数据库数据队列(QUEUE_SAVETOMYSQL)
public class ReadQueueForSaveMysqlThread extends Thread { @Autowired PieceSearchManager pieceSearchManager; @Override public void run() { //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxSaveMysqlThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { while (true) { try { //从队列中读取校验过后的数据 SaveToMysqlDTO dto = QueueUtil.QUEUE_SAVETOMYSQL.take(); if (null != dto) { //数据入库 long l = System.currentTimeMillis(); pieceSearchManager.savePieceToSql(dto); long l1 = System.currentTimeMillis(); log.info("入库 Mysql 耗时:" + (l1 - l)+" , 队列大小: "+QueueUtil.QUEUE_SAVETOMYSQL.size()); } } catch (InterruptedException e) { log.error("插入Mysql失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.5 消费保存到 mongoDB 数据队列(QUEUE_SAVETOMONGO)
public class ReadQueueForSaveMongoThread extends Thread { @Autowired MongoUtil mongoUtil; private MongoCollection mongoCollection; @Override public void run() { //获取mongo 链接 mongoCollection = mongoUtil.mongoClient(); //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxSaveMongoThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { while (true) { try { //从队列中读取校验过后的数据 List<Document> documents = QueueUtil.QUEUE_SAVETOMONGO.take(); if (null != documents) { //数据入库 long l = System.currentTimeMillis(); mongoCollection.insertMany(documents); long l1 = System.currentTimeMillis(); log.info("Mongo耗时:" + (l1 - l)+", 队列大小: "+ QueueUtil.QUEUE_SAVETOMONGO.size()); } } catch (Exception e) { log.error("插入mongo失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.6 消费保存到Elasticsearch数据队列(QUEUE_SAVETOES)
public class ReadQueueForSaveESThread extends Thread { @Autowired ElasticsearchManager elasticsearchManager; @Override public void run() { //创建线程池 //任务数 int countThread = Integer.valueOf(new Config().getMaxSaveESThreadNum()); ExecutorService executorService = Executors.newFixedThreadPool(countThread); for (int i = 0; i < countThread; i++) { MyTask task = new MyTask(); executorService.execute(task); } executorService.shutdown(); } @Data class MyTask implements Runnable { @Override public void run() { BulkProcessor bulkProcessor = elasticsearchManager.createBulkProcessor(); while (true) { try { //从队列中读取校验过后的数据 Map<Long, PieceJsonDTO> map = QueueUtil.QUEUE_SAVETOES.take(); if (null != map) { for (Long id : map.keySet()) { //创建request blukprocessor PieceJsonDTO pieceJsonDTO = map.get(id); String s = JsonKit.toJsonSerializeNulls(pieceJsonDTO); bulkProcessor.add(elasticsearchManager.createPieceIndexRequest(Long.toString(id),s)); pieceJsonDTO=null; } map = null; log.info("es队列大小:"+QueueUtil.QUEUE_SAVETOES.size()); } } catch (Exception e) { log.error("插入es失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } } 复制代码
2.2.7 消费失败队列保存到文件中(QUEUE_FAIL_IDS)
public class ReadQueueForWriteLogThread extends Thread { /** * 文件根路径 */ private final static String FILE_ROOT_PATH = new Config().getFileRootPath(); /** * 项目路径 */ private final static String ABSOLUTE_PATH = FileKit.getAbsolutePath(""); @Override public void run() { //日志文件名称 String fileName = "datamove.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; while (true) { try { List<String> fail_ids = QueueUtil.QUEUE_FAIL_IDS.take(); if (null != fail_ids) { ArrayList<String> result = Lists.newArrayList(); String s = "失败个数:" + fail_ids.size() + " , 失败id: " + fail_ids; result.add(s); //按行写出 FileKit.appendLines(result, filePath, "UTF-8"); } } catch (Exception e) { log.error("写出失败ids失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } } 复制代码
2.3 失败数据处理
@RequestMapping(value = "***", method = {RequestMethod.GET}) @ResponseBody public void readLog() { pieceSearchService.pieceLogFile(); } // PieceSearchServiceImpl public void pieceLogFile() { dataMoveManager.startReadLogForDb(); } // AbstractTemplate public void startReadLogForDb() { readLogFileForDb(); } // DataMoveManager protected void readLogFileForDb() { pieceInfoLogFileTask.start(); } // AbstractTaskTemplate public void start() { List<? extends Callable<String>> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoLogFileTask protected List<? extends Callable<String>> createTask() { //获取 datamove.log路径 String fileName = "datamove.log"; String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行读取 获取文件中 失败id: List<String> ids = Lists.newArrayList(); try { ids = readLines(new File(filePath), "UTF-8", ids); } catch (IOException e) { e.printStackTrace(); } //创建任务组 if (!Func.isEmpty(ids)) { List<MyTask> taskWithIds = getTaskWithIds(ids); return taskWithIds; } return null; } // PieceInfoLogFileTask private List<MyTask> getTaskWithIds(List<String> ids) { //一百个ids 起一个任务 ArrayList<MyTask> tasks = Lists.newArrayList(); int a = 1; int beginIndex = 0; while (true) { if (a * 100 >= ids.size()) { MyTask myTask = new MyTask(); myTask.setIds(ids.subList(beginIndex, ids.size())); tasks.add(myTask); break; } MyTask myTask = new MyTask(); //subList() 包左不包右 myTask.setIds(ids.subList(beginIndex, beginIndex + 100)); tasks.add(myTask); beginIndex += 100; a++; } return tasks; } private class MyTask implements Callable<String> { private List<String> ids; @Override public String call() throws Exception { //每次保存的进件Id ArrayList<String> pieceIds = new ArrayList<>(); //开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < ids.size(); i++) { pieceIds.add(ids.get(i)); //每多少行 执行插入操作 if (i % MAX_SAVE_NUM == 0) { //执行查询保存操作 System.out.println(ids); pieceIds.clear(); } } if (!pieceIds.isEmpty()) { //执行查询保存操作 } long useTime = (System.currentTimeMillis() - beginTime) / 1000; return "日志文件失败进件重新入库完毕,耗时:" + useTime + "秒 , 文件共:" + ids.size() + "条数据, 失败个数: , 失败id:"; } } // PieceInfoLogFileTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM)); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) { //线程池执行任务 List<Future<String>> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List<String> results = Lists.newArrayList(); for (Future<String> future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoLogFileTask protected void writeLogFile(List results) { //日志文件名称 String fileName = "datamove.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行写出 try { FileKit.writeLines(results, filePath, "UTF-8"); } catch (IOException e) { e.printStackTrace(); log.error("class:PieceInfoDataMoveTask method: writeLogFile()" + " 日志信息写出失败, 失败原因:" + e.getMessage()); } } 复制代码
2.4 数据迁移流程图
3 Elasticsearch在综合搜索列表实现
public Page<InvestigatePiecePageResDTO> pagination(final InvestigatePiecePageReqDTO investigatepiecePageReqDTO, final Integer pageNo, final Integer pageSize) { //创建es组合查询器 BoolQueryBuilder mustQuery = QueryBuilders.boolQuery(); //进件编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPieceCode())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceCode.keyword", "*" + investigatepiecePageReqDTO.getPieceCode() + "*")); } //手机号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getMobile())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceContacts.mobile.keyword", "*" + investigatepiecePageReqDTO.getMobile() + "*")); } //身份证号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getIdCard())) { mustQuery.must(QueryBuilders.wildcardQuery("idCard.keyword", "*" + investigatepiecePageReqDTO.getIdCard() + "*")); } //服务网点 if (Func.isNotEmpty(investigatepiecePageReqDTO.getServicePoint())) { mustQuery.must(QueryBuilders.wildcardQuery("servicePoint.keyword", "*" + investigatepiecePageReqDTO.getServicePoint() + "*")); } //座机号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPhone())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceContacts.phone.keyword", "*" + investigatepiecePageReqDTO.getPhone() + "*")); } //单位名称 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCompanyName())) { mustQuery.must(QueryBuilders.wildcardQuery("companyName.keyword", "*" + investigatepiecePageReqDTO.getCompanyName() + "*")); } //工单编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getOrderCode())) { mustQuery.must(QueryBuilders.wildcardQuery("orderCode.keyword", "*" + investigatepiecePageReqDTO.getOrderCode() + "*")); } //案件编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCaseCode())) { mustQuery.must(QueryBuilders.wildcardQuery("caseCode.keyword", "*" + investigatepiecePageReqDTO.getCaseCode() + "*")); } //规则搜索 if (Func.isNotEmpty(investigatepiecePageReqDTO.getRuleCode())) { mustQuery.must(QueryBuilders.wildcardQuery("ruleName.keyword", "*" + investigatepiecePageReqDTO.getRuleCode() + "*")); } //客户经理 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerManager())) { mustQuery.must(QueryBuilders.wildcardQuery("customerManager.keyword", "*" + investigatepiecePageReqDTO.getCustomerManager() + "*")); } //进件大区 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPieceFromArea())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceFromArea.keyword", "*" + investigatepiecePageReqDTO.getPieceFromArea() + "*")); } //进件状态 if (Func.isNotEmpty(investigatepiecePageReqDTO.getBusinessStatus())) { mustQuery.must(QueryBuilders.termQuery("businessStatus.keyword", investigatepiecePageReqDTO.getBusinessStatus())); } //产品类型 if (Func.isNotEmpty(investigatepiecePageReqDTO.getProductCode())) { mustQuery.must(QueryBuilders.termQuery("productName.keyword", investigatepiecePageReqDTO.getProductCode())); } else { //默认查询当前用户权限下的所有产品类型 List<String> productCodes = getProductCodeByCurrentUser(); mustQuery.must(QueryBuilders.termsQuery("productName.keyword", productCodes)); } //反欺诈处理人 if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionMaker())) { mustQuery.must(QueryBuilders.wildcardQuery("decisionMaker.keyword", "*" + investigatepiecePageReqDTO.getDecisionMaker() + "*")); } //单位地址 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCompanyAddress())) { mustQuery.must(QueryBuilders.wildcardQuery("companyAddress.keyword", "*" + investigatepiecePageReqDTO.getCompanyAddress() + "*")); } //欺诈报警 if (Func.isNotEmpty(investigatepiecePageReqDTO.getFraudAlarmLevelCode())) { DictionaryDTO dictionaryDTO = DictionaryKit.QZJGLB_MAP().get(investigatepiecePageReqDTO.getFraudAlarmLevelCode()); mustQuery.must(QueryBuilders.termQuery("fraudAlarmLevelName.keyword", dictionaryDTO.getName())); } //决策结果 if (Func.isNotEmpty(investigatepiecePageReqDTO.getResultCode())) { DictionaryDTO dictionaryDTO = DictionaryKit.JCJG_MAP().get(investigatepiecePageReqDTO.getResultCode()); mustQuery.must(QueryBuilders.termQuery("decisionResult.keyword", dictionaryDTO.getName())); } //反欺诈处理状态 if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionStatus())) { DictionaryDTO dictionaryDTO = DictionaryKit.JCZT_MAP().get(investigatepiecePageReqDTO.getDecisionStatus()); mustQuery.must(QueryBuilders.termQuery("decisionStatus.keyword", dictionaryDTO.getName())); } //客服人员 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerService())) { mustQuery.must(QueryBuilders.wildcardQuery("serviceUserName.keyword", "*" + investigatepiecePageReqDTO.getCustomerService() + "*")); } //客户姓名 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerName())) { mustQuery.must(QueryBuilders.wildcardQuery("customerName.keyword", "*" + investigatepiecePageReqDTO.getCustomerName() + "*")); } //团队经理 if (Func.isNotEmpty(investigatepiecePageReqDTO.getTeamManager())) { mustQuery.must(QueryBuilders.wildcardQuery("teamManager.keyword", "*" + investigatepiecePageReqDTO.getTeamManager() + "*")); } //进件时间 try { if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeStart()) && Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeStart()).getTime()).to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeEnd()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeStart())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeStart()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeEnd()).getTime())); } } catch (Exception e) { e.printStackTrace(); } //决策时间 try { if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeStart()) && Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeStart()).getTime()).to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeEnd()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeStart())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeStart()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeEnd()).getTime())); } } catch (Exception e) { e.printStackTrace(); } //是否初始化 if (Boolean.valueOf(investigatepiecePageReqDTO.getIfInit())) { //第一次进入,设置时间为今天 mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(getTodayStartTime()).to(getTodayEndTime())); } //组装分页 排序 条件 SearchSourceBuilder source = new SearchSourceBuilder(); source.query(mustQuery); //排序字段 if (Func.isNotEmpty(investigatepiecePageReqDTO.getSortChangeKey())) { SortOrder sortOrder = "asc".equals(investigatepiecePageReqDTO.getSortChangeWay()) ? SortOrder.ASC : SortOrder.DESC; if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_ENTER_CREDIT_TIME)) { source.sort("enterCreditTime", sortOrder); } else if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_TIME)) { source.sort("decisionTime", sortOrder); } else if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_MAKER)) { source.sort("decisionMaker.keyword", sortOrder); } else { source.sort("enterCreditTime", sortOrder); } } else { source.sort("enterCreditTime", SortOrder.DESC); } //设置分页 注意:es分页默认从0页开始 source.from(pageNo - 1); source.size(pageSize); //查询结果 Page<InvestigatePiecePageResDTO> page = getResponseFromEs(source); return page; } 复制代码
private Page<InvestigatePiecePageResDTO> getResponseFromEs(SearchSourceBuilder source) { //创建查询es请求 SearchHits hits; try { SearchResponse response = elasticsearchManager.getPieceResponseWithBuilder(source); hits = response.getHits(); } catch (Exception e) { throw new ServiceException(500, "es搜索引擎连接失败"); } //获取命中数据 SearchHit[] hitsHits = hits.getHits(); //总命中数 Long totalHits = hits.getTotalHits(); //组装前端返回DTO List<InvestigatePiecePageResDTO> result = Lists.newArrayList(); for (SearchHit hitsHit : hitsHits) { InvestigatePiecePageResDTO investigatePiecePageResDTO = new InvestigatePiecePageResDTO(); Map<String, Object> sourceAsMap = hitsHit.getSourceAsMap(); investigatePiecePageResDTO.setId(Long.valueOf(hitsHit.getId())); investigatePiecePageResDTO.setPieceCode((String) sourceAsMap.get("pieceCode")); investigatePiecePageResDTO.setAppKey((String) sourceAsMap.get("appKey")); Integer decisionId = (Integer) sourceAsMap.get("decisionId"); if (null != decisionId) { investigatePiecePageResDTO.setDecisionId(decisionId.longValue()); } List<String> orderCode = (List<String>) sourceAsMap.get("orderCode"); if (Func.isNotEmpty(orderCode)) { investigatePiecePageResDTO.setOrderCode(((List<String>) sourceAsMap.get("orderCode")).get(0)); } investigatePiecePageResDTO.setCustomerName((String) sourceAsMap.get("customerName")); investigatePiecePageResDTO.setIdCard((String) sourceAsMap.get("idCard")); investigatePiecePageResDTO.setProductName(DictionaryKit.CPLX_MAP().get(sourceAsMap.get("productName")).getName()); investigatePiecePageResDTO.setEnterCreditTime(SIMPLE_DATE_FORMAT.format(new Date((Long) sourceAsMap.get("enterCreditTime")))); investigatePiecePageResDTO.setBusinessStatus((String) sourceAsMap.get("businessStatus")); Long decisionTime = (Long) sourceAsMap.get("decisionTime"); if (null != decisionTime) { investigatePiecePageResDTO.setDecisionTime(SIMPLE_DATE_FORMAT.format(new Date(decisionTime))); } investigatePiecePageResDTO.setDecisionResult((String) sourceAsMap.get("decisionStatus")); investigatePiecePageResDTO.setFraudAlarmLevelName((String) sourceAsMap.get("fraudAlarmLevelName")); investigatePiecePageResDTO.setDecisionMaker((String) sourceAsMap.get("decisionMaker")); investigatePiecePageResDTO.setServicePoint((String) sourceAsMap.get("servicePoint")); String decisionStatus = (String) sourceAsMap.get("decisionStatus"); if ("已决策".equals(decisionStatus)) { investigatePiecePageResDTO.set_disabled(false); } else { investigatePiecePageResDTO.set_disabled(true); } result.add(investigatePiecePageResDTO); } //组装前端分页DTO Page<InvestigatePiecePageResDTO> page = new Page<>(); page.setRecords(result); page.setTotal(totalHits.intValue()); return page; } 复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 实现业务数据的同步迁移 · 思路一
- 通过迁移学习实现OCT图像识别
- 快速实现地图迁移数据可视化
- 用TensorFlow实现神经网络画风迁移
- 使用云和虚拟化实现数据迁移的最佳实践
- vSPhere使用vMotion实现虚拟机热迁移详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
产品经理修炼之道
费杰 / 机械工业出版社华章公司 / 2012-7-30 / 59.00元
本书由资深产品经理、中国最大的产品经理沙龙Pmcaff创始人费杰亲自执笔,微软、腾讯、百度、新浪、搜狐、奇虎、阿里云、Evernote等国内外20余家大型互联网企业资深产品经理和技术专家联袂推荐。用系统化的方法论和丰富的实战案例解读了优秀产品经理所必须修炼的产品规划能力、产品设计能力、产品执行能力,以及思考、分析和解决问题的能力和方法,旨在为互联网产品经理打造核心竞争力提供实践指导。 全书一......一起来看看 《产品经理修炼之道》 这本书的介绍吧!