数据迁移方案 + Elasticsearch在综合搜索列表实现

栏目: 后端 · 发布时间: 6年前

数据迁移方案 + Elasticsearch在综合搜索列表实现

2 迁移实现

2.1 从数据库读取进件id存到文件中

/**
 * HTTP GET entry point that triggers writing piece ids to files.
 * When no id is supplied the no-argument export is run; otherwise the id is
 * forwarded to the service.
 *
 * @param id optional request parameter; may be {@code null}
 */
@RequestMapping(value = "***", method = {RequestMethod.GET})
    @ResponseBody
    public void writeFile(Integer id) {
        if (id != null) {
            pieceSearchService.writePieceFile(id);
            return;
        }
        pieceSearchService.writePieceFile();
    }
    
    // PieceSearchServiceImpl
    /** Starts the id-export run by delegating to {@code dataMoveManager.startWriteFile()}. */
    public void writePieceFile() {
        dataMoveManager.startWriteFile();
    }

复制代码
// AbstractTemplate
    /** Public entry point of the export step; simply invokes {@code readDataAndWriteFile()}. */
    public void startWriteFile() {
        readDataAndWriteFile();
    }
    // DataMoveManager
    /** Runs the export by calling {@code start()} on {@code pieceInfoWriteFileTask}. */
    protected void readDataAndWriteFile() {
        pieceInfoWriteFileTask.start();
    }
    
    // AbstractTaskTemplate
    /**
     * Runs one full task cycle: builds the tasks, creates the executor, executes
     * the tasks while collecting their results, and finally passes the results
     * to {@code writeLogFile}.
     */
    public void start() {
        // Tasks are built first, the executor second; the call order is significant.
        final List<? extends Callable<String>> pendingTasks = createTask();
        final ExecutorService pool = createExecutorService();
        final List outcome = runTaskForResult(pool, pendingTasks);
        writeLogFile(outcome);
    }
    
    // PieceInfoWriteFileTask
    /**
     * Builds the export tasks: loads all piece records through
     * {@code pieceSearchDao.queryAllPieceInfo()} and groups them into tasks via
     * {@code getTaskFromPieceId}.
     *
     * @return the tasks to execute
     */
    protected List<? extends Callable<String>> createTask() {
        // According to the original author's note this query returns 5,000,000+ rows.
        final List<PieceIdAndIdDTO> allPieces = pieceSearchDao.queryAllPieceInfo();
        return this.getTaskFromPieceId(allPieces);
    }
    
    // PieceInfoWriteFileTask
    /**
     * Splits the given records into consecutive groups of {@code FILE_MAX_NUM}
     * entries and wraps each group in a {@link MyTask}. A final, smaller group is
     * created for any remainder. Tasks are given the file names "1", "2", ... in
     * group order.
     *
     * @param pieceIdAndIdDTOS records to distribute; {@code null} yields no tasks
     * @return the created tasks, possibly empty
     */
    List<MyTask> getTaskFromPieceId(List<PieceIdAndIdDTO> pieceIdAndIdDTOS) {
        List<MyTask> result = Lists.newArrayList();
        if (pieceIdAndIdDTOS == null) {
            return result;
        }
        // Primitive counter: a boxed Integer would be re-boxed on every increment.
        int index = 1;
        int fileName = 1;
        List<PieceIdAndIdDTO> ids = Lists.newArrayList();
        for (PieceIdAndIdDTO pieceIdAndIdDTO : pieceIdAndIdDTOS) {
            ids.add(pieceIdAndIdDTO);
            // Every FILE_MAX_NUM records become one task.
            if (index % FILE_MAX_NUM == 0) {
                result.add(newFileTask(ids, fileName));
                // A fresh list is required: the task keeps a reference to the previous one.
                ids = Lists.newArrayList();
                fileName++;
            }
            index++;
        }
        // Remainder that did not fill a complete group.
        if (!ids.isEmpty()) {
            result.add(newFileTask(ids, fileName));
        }
        return result;
    }

    /** Creates a task that holds the given records and uses the number as its file name. */
    private MyTask newFileTask(List<PieceIdAndIdDTO> ids, int fileNumber) {
        MyTask myTask = new MyTask();
        myTask.setIds(ids);
        myTask.setFileName(Integer.toString(fileNumber));
        return myTask;
    }
    // PieceInfoWriteFileTask
    /**
     * Creates the executor used to run the export tasks.
     *
     * @return a fixed-size thread pool with {@code MAX_THREAD_NUM} threads
     */
    protected ExecutorService createExecutorService() {
        return Executors.newFixedThreadPool(MAX_THREAD_NUM);
    }
    
    // AbstractTaskTemplate
    /**
     * Executes all tasks on the given executor, waits for them to finish and
     * collects their string results. The executor is shut down in every case.
     *
     * @param executorService executor to run the tasks on; shut down before returning
     * @param tasks tasks to run; {@code null} or empty yields an empty result
     * @return the results of the tasks that completed normally, in task order
     */
    private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) {
        List<String> results = new ArrayList<>();
        // invokeAll(null) would throw NullPointerException and leave the pool running.
        if (tasks == null || tasks.isEmpty()) {
            executorService.shutdown();
            return results;
        }
        List<Future<String>> futures = new ArrayList<>();
        try {
            futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // A failed task contributes no result line.
                e.printStackTrace();
            }
        }
        return results;
    }
    
    // PieceInfoWriteFileTask
    /**
     * Writes the task result lines to {@code writeFile.log} under
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH}. A write failure is logged and not
     * propagated.
     *
     * @param results result lines produced by the tasks
     */
    protected void writeLogFile(List results) {
        String fileName = "writeFile.log";
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;
        try {
            FileKit.writeLines(results, filePath, "UTF-8");
        } catch (IOException e) {
            // Pass the exception itself so the stack trace ends up in the log.
            log.error("class:PieceInfoWriteFileTask  method: writeLogFile()" +
                " 日志信息写出失败, 失败原因:" + e.getMessage(), e);
        }
    }
复制代码
// PieceInfoWriteFileTask
    /**
     * Task that writes one group of piece records to a file via {@code writeFile}.
     * NOTE(review): {@code setIds}/{@code setFileName} are called by the task builder
     * but are not declared here; presumably generated (e.g. by Lombok) — confirm.
     */
    class MyTask implements Callable<String> {

        /** Records to write. */
        private List<PieceIdAndIdDTO> ids;

        /** File name without extension. */
        private String fileName;

        /**
         * Writes the file and reports the outcome.
         *
         * @return the message returned by {@code writeFile}, or a failure message
         *         with the exception text if an unexpected exception occurs
         */
        @Override
        public String call() {
            String message = "文件生成失败";
            try {
                message = writeFile(ids, fileName);
            } catch (Exception e) {
                return message + "----失败原因:" + e.getMessage();
            }
            return message;
        }
    }
    
    // PieceInfoWriteFileTask
    /**
     * Writes the given records line by line to
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt"}.
     *
     * @param pieceIdAndId records to write
     * @param fileName target file name without extension
     * @return a success message, or a failure message containing the I/O error text
     */
    private String writeFile(List<PieceIdAndIdDTO> pieceIdAndId, String fileName) {
        final String target = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt";
        String outcome;
        try {
            log.info("文件输出路径:" + target);
            FileKit.writeLines(pieceIdAndId, target, "UTF-8");
            outcome = "此文件生成成功:" + fileName;
        } catch (IOException e) {
            outcome = "此文件生成失败:" + fileName + "失败原因:" + e.getMessage();
        }
        return outcome;
    }
复制代码

2.2 数据迁移

2.2.1 从文件读取id存到队列中

/** HTTP GET entry point that starts the migration by calling {@code pieceSearchService.pieceDataMove()}. */
@RequestMapping(value = "***", method = {RequestMethod.GET})
    @ResponseBody
    public void dataMove() {
        pieceSearchService.pieceDataMove();
    }
    
    // PieceSearchServiceImpl
    /** Starts the migration by delegating to {@code dataMoveManager.startReadForDb()}. */
    public void pieceDataMove() {
        dataMoveManager.startReadForDb();
    }
    
    
复制代码
// AbstractTemplate
    /** Public entry point of the file-reading step; simply invokes {@code readFileForDb()}. */
    public void startReadForDb() {
        readFileForDb();
    }
    // DataMoveManager
    /** Runs the file-reading step by calling {@code start()} on {@code pieceInfoDataMoveTask}. */
    protected void readFileForDb() {
        pieceInfoDataMoveTask.start();
    }
    
    // AbstractTaskTemplate
    /**
     * Template method for one task cycle: create the tasks, create the executor,
     * run the tasks and gather their results, then give the results to
     * {@code writeLogFile}.
     */
    public void start() {
        // Keep this order: tasks are created before the executor.
        final List<? extends Callable<String>> jobs = createTask();
        final ExecutorService workerPool = createExecutorService();
        final List jobResults = runTaskForResult(workerPool, jobs);
        writeLogFile(jobResults);
    }
    
    // PieceInfoDataMoveTask
    /**
     * Builds one task per {@code .txt} file found under
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH}.
     *
     * @return the tasks to execute; empty when the directory cannot be listed
     */
    protected List<? extends Callable<String>> createTask() {
        List<File> files;
        try {
            files = FileKit.listFilesWithSuffix(new File(ABSOLUTE_PATH + FILE_ROOT_PATH), ".txt");
        } catch (Exception e) {
            log.error("class:PieceInfoDataMoveTask  method: createTask()" +
                " 创建任务组失败, 失败原因:" + e.getMessage(), e);
            // Continuing with a null file list would fail with NullPointerException.
            return new ArrayList<MyTask>();
        }
        if (files == null) {
            return new ArrayList<MyTask>();
        }
        // One task per file.
        return getTaskWithFile(files);
    }
    
    // PieceInfoDataMoveTask
     /**
      * Wraps every file in its own {@link MyTask}.
      *
      * @param files files to process; {@code null} is treated as no files
      * @return one task per file, in the given order
      */
     private List<MyTask> getTaskWithFile(List<File> files) {
        List<MyTask> tasks = Lists.newArrayList();
        if (files == null) {
            return tasks;
        }
        for (File file : files) {
            MyTask myTask = new MyTask();
            myTask.setFile(file);
            tasks.add(myTask);
        }
        return tasks;
    }
    
    // PieceInfoDataMoveTask
    /**
     * Reads one id file line by line and puts the parsed ids onto
     * {@code QueueUtil.QUEUE_IDS} in batches of {@code MAX_SAVE_NUM}.
     * NOTE(review): {@code setFile} is called by the task builder but is not declared
     * here; presumably generated (e.g. by Lombok) — confirm.
     */
    private class MyTask implements Callable<String> {

        /** File to read. */
        private File file;

        /**
         * Processes the file.
         *
         * @return a summary with elapsed seconds and line count, or a failure
         *         message containing the exception text
         * @throws Exception if enqueueing the trailing batch fails
         */
        @Override
        public String call() throws Exception {
            // Ids collected for the batch currently being filled.
            List<String> ids = new ArrayList<>();
            long beginTime = System.currentTimeMillis();
            String fileName = file.getName();
            BufferedReader reader = null;
            // 1-based number of the line being processed.
            int lineNo = 1;
            try {
                reader = FileKit.getReader(file, "UTF-8");
                while (true) {
                    String line = reader.readLine();
                    // Stops at end of file; presumably also at the first empty line — confirm Func.isEmpty.
                    if (Func.isEmpty(line)) {
                        break;
                    }
                    // Line format: "id:1121  pieceId:ZPTE1213"; only the id value is kept.
                    String id = line.split("\\s+")[0].split(":")[1];
                    ids.add(id);
                    // A full batch is handed to the queue and a new batch is started.
                    if (lineNo % MAX_SAVE_NUM == 0) {
                        QueueUtil.QUEUE_IDS.put(ids);
                        ids = new ArrayList<>();
                    }
                    lineNo++;
                }
            } catch (Exception e) {
                return "读取" + fileName + "文件失败原因:" + e.getMessage();
            } finally {
                try {
                    // Trailing partial batch (also enqueued when reading failed midway).
                    if (!ids.isEmpty()) {
                        QueueUtil.QUEUE_IDS.put(ids);
                    }
                } finally {
                    // Closed in a nested finally so a failing put cannot leak the reader.
                    IoKit.close(reader);
                }
            }
            long useTime = (System.currentTimeMillis() - beginTime) / 1000;
            return "读取" + fileName + "文件完毕,耗时:" + useTime + "秒 , 文件共:" + (lineNo - 1) + "条数据";
        }
    }
    
    // QueueUtil
    /**
     * Holder of the shared in-memory queues that connect the migration stages.
     * All queues are created without a capacity argument.
     */
    public class QueueUtil {

        /** Queue 1: batches of ids read from the id files. */
        public static final LinkedBlockingQueue<List<String>> QUEUE_IDS = new LinkedBlockingQueue<>();

        /** Queue 2: batches of assembled JSON objects waiting for validation. */
        public static final LinkedBlockingQueue<List<JSONObject>> QUEUE_JSON_OBJECTS = new LinkedBlockingQueue<>();

        /** Queue 3: batches of ids that failed assembly or validation. */
        public static final LinkedBlockingQueue<List<String>> QUEUE_FAIL_IDS = new LinkedBlockingQueue<>();

        /** Queue 4: validated data waiting to be saved to MySQL. */
        public static final LinkedBlockingQueue<SaveToMysqlDTO> QUEUE_SAVETOMYSQL = new LinkedBlockingQueue<>();

        /** Queue 5: validated documents waiting to be saved to MongoDB. */
        public static final LinkedBlockingQueue<List<Document>> QUEUE_SAVETOMONGO = new LinkedBlockingQueue<>();

        /** Queue 6: validated data, keyed by id, waiting to be indexed in Elasticsearch. */
        public static final LinkedBlockingQueue<Map<Long, PieceJsonDTO>> QUEUE_SAVETOES = new LinkedBlockingQueue<>();
    }

复制代码
// PieceInfoDataMoveTask
    /**
     * Creates the executor used to run the file-reading tasks.
     *
     * @return a fixed-size thread pool sized by {@code MAX_THREAD_NUM}
     */
    protected ExecutorService createExecutorService() {
        final int poolSize = Integer.valueOf(MAX_THREAD_NUM);
        return Executors.newFixedThreadPool(poolSize);
    }
    
    // AbstractTaskTemplate
    /**
     * Executes all tasks on the given executor, waits for them to finish and
     * collects their string results. The executor is shut down in every case.
     *
     * @param executorService executor to run the tasks on; shut down before returning
     * @param tasks tasks to run; {@code null} or empty yields an empty result
     * @return the results of the tasks that completed normally, in task order
     */
    private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) {
        List<String> results = new ArrayList<>();
        // invokeAll(null) would throw NullPointerException and leave the pool running.
        if (tasks == null || tasks.isEmpty()) {
            executorService.shutdown();
            return results;
        }
        List<Future<String>> futures = new ArrayList<>();
        try {
            futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // A failed task contributes no result line.
                e.printStackTrace();
            }
        }
        return results;
    }
    
    // PieceInfoDataMoveTask
    /**
     * No-op: the body is empty, so the collected task results are not written anywhere.
     *
     * @param results result lines produced by the tasks (unused)
     */
    protected void writeLogFile(List results) {
    }

复制代码

2.2.2 消费进件id队列(QUEUE_IDS)

//spring 配置文件加载bean 调用init() 方法
    <bean  class="com.migration.eagle.manager.PieceSearchManager" init-method="init" />
    /**
     * Starts the six queue-consumer threads in pipeline order, logging after each start.
     * Referenced as {@code init-method} in the Spring bean definition.
     */
    // PieceSearchManager
    private void init() {
        // 1: assemble JSON objects from ids
        startAndLog(readQueueForAssembleJsonThread, "线程1启动完毕,正在读取ids队列!!!!");
        // 2: validate assembled data
        startAndLog(readQueueForValidateThread, "线程2启动完毕,正在读取List<JsonObject>队列!!!!");
        // 3: save to MySQL
        startAndLog(readQueueForSaveMysqlThread, "线程3启动完毕,正在读取saveToMysql队列!!!!");
        // 4: save to MongoDB
        startAndLog(readQueueForSaveMongoThread, "线程4启动完毕,正在读取saveToMongo队列!!!!");
        // 5: save to Elasticsearch
        startAndLog(readQueueForSaveESThread, "线程5启动完毕,正在读取saveToES队列!!!!");
        // 6: write failed ids to the log file
        startAndLog(readQueueForWriteLogThread, "线程6启动完毕,正在读取失败ids队列!!!!");
    }

    /** Starts the given thread and then logs the given message at info level. */
    private void startAndLog(Thread worker, String startedMessage) {
        worker.start();
        log.info(startedMessage);
    }
复制代码
public class ReadQueueForAssembleJsonThread extends Thread {

    @Autowired
    PieceAssembleManager pieceAssembleManager;

    @Override
    public void run() {
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxAssembleJsonThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    //读取队列中的ids
                    List<String> ids = QueueUtil.QUEUE_IDS.take();
                    if (null != ids) {
                        long l1 = System.currentTimeMillis();
                        //组装数据
                        Map<String, List> pieceJson_map = pieceAssembleManager.getPieceJson(ids);
                        //存入队列
                        QueueUtil.QUEUE_JSON_OBJECTS.put(pieceJson_map.get("success"));
                        //存入失败ids队列
                        QueueUtil.QUEUE_FAIL_IDS.put(pieceJson_map.get("fail"));
                        long l2 = System.currentTimeMillis();
                        log.info("组装JSON耗时:" + (l2 - l1)," ,队列大小: "+QueueUtil.QUEUE_IDS.size());
                        ids=null;
                        pieceJson_map=null;
                    }
                } catch (Exception e) {
                    log.error("组装数据失败,失败原因: "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }


}
复制代码

2.2.3 消费JsonObject队列(QUEUE_JSON_OBJECTS)

public class ReadQueueForValidateThread extends Thread {

    @Autowired
    PieceSearchManager pieceSearchManager;


    @Override
    public void run() {
        //创建线程池
        int countThread = Integer.valueOf(new Config().getMaxValidateThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        List<JSONObject> jsonObjects = Lists.newArrayList();
        while (true) {
            try {
                //从队列中读取List<JsonObject>
                List<JSONObject> poll = QueueUtil.QUEUE_JSON_OBJECTS.take();
                if (null != poll) {
                    //创建任务
                    for (int i = 1; i <= poll.size(); i++) {
                        jsonObjects.add(poll.get(i - 1));
                        if (i % 100 == 0) {
                            MyTask myTask = new MyTask();
                            myTask.setJsonObjects(jsonObjects);
                            executorService.execute(myTask);
                            jsonObjects = Lists.newArrayList();
                        }
                    }
                    if (!jsonObjects.isEmpty()) {
                        MyTask myTask = new MyTask();
                        myTask.setJsonObjects(jsonObjects);
                        executorService.execute(myTask);
                        jsonObjects = Lists.newArrayList();
                    }
                    log.info("校验队列大小: "+QueueUtil.QUEUE_JSON_OBJECTS.size());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }


    }


    @Data
    class MyTask implements Runnable {
        private List<JSONObject> jsonObjects;

        @Override
        public void run() {
            long l = System.currentTimeMillis();
            SaveToMysqlDTO dto = new SaveToMysqlDTO();
            //mongo队列所需数据
            List<Document> toMongos = Lists.newArrayList();
            //es队列所需数据
            Map<Long,PieceJsonDTO>  id_PieceJsonDTO =Maps.newHashMap();

            List<InvestigatePieceEntity> investigatePieceDTOS = Lists.newArrayList();
            List<PieceDetailEntity> pieceDetailDTOS = Lists.newArrayList();
            List<PieceContactEntity> pieceContactDTOS = Lists.newArrayList();
            //失败结果
            List<String> failedJson = Lists.newArrayList();
            try {
                for (JSONObject finalObj : jsonObjects) {
                    Map<String, JSONObject> map = pieceSearchManager.validatePiece(finalObj);
                    if (map.containsKey("successData")) {
                        pieceSearchManager.savePiecesLast(map.get("successData"), investigatePieceDTOS, pieceContactDTOS, pieceDetailDTOS,id_PieceJsonDTO);
                        //增加id(存入mongo使用)
                        JsonObject jsonObject = JsonKit.parseJsonObject(finalObj.toString());
                        JsonElement appkey = JsonKit.getValueByPath(jsonObject, MongoPieceManagerImpl.PIECE_CODE_JSON_PATH);
                        jsonObject.add(MongoPieceManagerImpl.MONGO_ID, appkey);
                        toMongos.add(Document.parse(jsonObject.toString()));
                    } else {
                        failedJson.add(((Map) ((Map) map.get("failed").get("data")).get("applicant")).get("intpc_id").toString());
                    }
                    map=null;
                }
                dto.setInvestigatePieceDTOS(investigatePieceDTOS);
                dto.setPieceContactDTOS(pieceContactDTOS);
                dto.setPieceDetailDTOS(pieceDetailDTOS);

                //存入失败队列
                QueueUtil.QUEUE_FAIL_IDS.put(failedJson);
                //存入入库队列
                QueueUtil.QUEUE_SAVETOMYSQL.put(dto);
                QueueUtil.QUEUE_SAVETOMONGO.put(toMongos);
                QueueUtil.QUEUE_SAVETOES.put(id_PieceJsonDTO);

                long l1 = System.currentTimeMillis();
                log.info("校验耗时:" + (l1 - l));
                dto=null;
                toMongos=null;
                id_PieceJsonDTO=null;
                failedJson=null;
                jsonObjects=null;
            } catch (Exception e) {
                log.error("校验失败,失败原因:  "+e.getMessage());
                e.printStackTrace();
            }
        }
    }

}
复制代码

2.2.4 消费保存到数据库数据队列(QUEUE_SAVETOMYSQL)

public class ReadQueueForSaveMysqlThread extends Thread {

    @Autowired
    PieceSearchManager pieceSearchManager;

    @Override
    public void run() {
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxSaveMysqlThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    //从队列中读取校验过后的数据
                    SaveToMysqlDTO dto = QueueUtil.QUEUE_SAVETOMYSQL.take();
                    if (null != dto) {
                        //数据入库
                        long l = System.currentTimeMillis();
                        pieceSearchManager.savePieceToSql(dto);
                        long l1 = System.currentTimeMillis();
                        log.info("入库 Mysql 耗时:" + (l1 - l)+" , 队列大小: "+QueueUtil.QUEUE_SAVETOMYSQL.size());
                    }
                } catch (InterruptedException e) {
                    log.error("插入Mysql失败,失败原因:  "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }

}
复制代码

2.2.5 消费保存到 mongoDB 数据队列(QUEUE_SAVETOMONGO)

public class ReadQueueForSaveMongoThread extends Thread {

    @Autowired
    MongoUtil mongoUtil;

    private MongoCollection mongoCollection;

    @Override
    public void run() {
        //获取mongo 链接
        mongoCollection = mongoUtil.mongoClient();
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxSaveMongoThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    //从队列中读取校验过后的数据
                    List<Document> documents = QueueUtil.QUEUE_SAVETOMONGO.take();
                    if (null != documents) {
                        //数据入库
                        long l = System.currentTimeMillis();
                        mongoCollection.insertMany(documents);
                        long l1 = System.currentTimeMillis();
                        log.info("Mongo耗时:" + (l1 - l)+", 队列大小: "+ QueueUtil.QUEUE_SAVETOMONGO.size());
                    }
                } catch (Exception e) {
                    log.error("插入mongo失败,失败原因:  "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }

}

复制代码

2.2.6 消费保存到Elasticsearch数据队列(QUEUE_SAVETOES)

public class ReadQueueForSaveESThread extends Thread {

    @Autowired
    ElasticsearchManager elasticsearchManager;

    @Override
    public void run() {
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxSaveESThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            BulkProcessor bulkProcessor = elasticsearchManager.createBulkProcessor();
            while (true) {
                try {
                    //从队列中读取校验过后的数据
                    Map<Long, PieceJsonDTO> map = QueueUtil.QUEUE_SAVETOES.take();
                    if (null != map) {
                        for (Long id : map.keySet()) {
                            //创建request  blukprocessor
                            PieceJsonDTO pieceJsonDTO = map.get(id);
                            String s = JsonKit.toJsonSerializeNulls(pieceJsonDTO);
                            bulkProcessor.add(elasticsearchManager.createPieceIndexRequest(Long.toString(id),s));
                            pieceJsonDTO=null;
                        }
                        map = null;
                        log.info("es队列大小:"+QueueUtil.QUEUE_SAVETOES.size());
                    }
                } catch (Exception e) {
                    log.error("插入es失败,失败原因:  "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }

}
复制代码

2.2.7 消费失败队列保存到文件中(QUEUE_FAIL_IDS)

public class ReadQueueForWriteLogThread extends Thread {

    /**
     * 文件根路径
     */
    private final static String FILE_ROOT_PATH = new Config().getFileRootPath();
    /**
     * 项目路径
     */
    private final static String ABSOLUTE_PATH = FileKit.getAbsolutePath("");


    @Override
    public void run() {
        //日志文件名称
        String fileName = "datamove.log";
        //文件路径
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;
        while (true) {
            try {
                List<String> fail_ids = QueueUtil.QUEUE_FAIL_IDS.take();
                if (null != fail_ids) {
                    ArrayList<String> result = Lists.newArrayList();
                    String s = "失败个数:" + fail_ids.size() + " , 失败id: " + fail_ids;
                    result.add(s);
                    //按行写出
                    FileKit.appendLines(result, filePath, "UTF-8");
                }
            } catch (Exception e) {
                log.error("写出失败ids失败,失败原因:  "+e.getMessage());
                e.printStackTrace();
            }
        }
    }
}
复制代码

2.3 失败数据处理

/** HTTP GET entry point that starts failed-data processing by calling {@code pieceSearchService.pieceLogFile()}. */
@RequestMapping(value = "***", method = {RequestMethod.GET})
    @ResponseBody
    public void readLog() {
        pieceSearchService.pieceLogFile();
    }
    
    // PieceSearchServiceImpl
    /** Starts failed-data processing by delegating to {@code dataMoveManager.startReadLogForDb()}. */
    public void pieceLogFile() {
        dataMoveManager.startReadLogForDb();
    }
    
    // AbstractTemplate
    /** Public entry point of the log-reading step; simply invokes {@code readLogFileForDb()}. */
    public void startReadLogForDb() {
        readLogFileForDb();
    }
    
    // DataMoveManager
    /** Runs the log-reading step by calling {@code start()} on {@code pieceInfoLogFileTask}. */
    protected void readLogFileForDb() {
        pieceInfoLogFileTask.start();
    }
    
    // AbstractTaskTemplate
    /**
     * Drives one task cycle: tasks are created, an executor is created, the tasks
     * are executed and their results collected, and the results are finally
     * passed to {@code writeLogFile}.
     */
    public void start() {
        // Task creation deliberately precedes executor creation.
        final List<? extends Callable<String>> work = createTask();
        final ExecutorService threadPool = createExecutorService();
        final List collected = runTaskForResult(threadPool, work);
        writeLogFile(collected);
    }
    
    // PieceInfoLogFileTask
    /**
     * Reads the failed ids from {@code datamove.log} and groups them into retry tasks.
     *
     * @return the retry tasks; empty (never {@code null}) when no ids could be read
     */
    protected List<? extends Callable<String>> createTask() {
        String fileName = "datamove.log";
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;

        // Collect the failed ids from the log file.
        List<String> ids = Lists.newArrayList();
        try {
            ids = readLines(new File(filePath), "UTF-8", ids);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Returning null here would make invokeAll fail with NullPointerException.
        if (Func.isEmpty(ids)) {
            return new ArrayList<MyTask>();
        }
        return getTaskWithIds(ids);
    }
    
    // PieceInfoLogFileTask
    /**
     * Cuts the id list into consecutive slices of at most 100 ids and creates one
     * task per slice. At least one task is always created, even for an empty list.
     *
     * @param ids ids to distribute
     * @return the created tasks, each holding a {@code subList} view of {@code ids}
     */
    private List<MyTask> getTaskWithIds(List<String> ids) {
        ArrayList<MyTask> tasks = Lists.newArrayList();
        final int total = ids.size();
        int from = 0;
        do {
            // subList is start-inclusive, end-exclusive.
            final int to = Math.min(from + 100, total);
            MyTask slice = new MyTask();
            slice.setIds(ids.subList(from, to));
            tasks.add(slice);
            from = to;
        } while (from < total);
        return tasks;
    }
    
    /**
     * Retry task for one slice of failed ids. The ids are grouped into batches of
     * {@code MAX_SAVE_NUM}; the actual query-and-save step is only a placeholder here.
     * NOTE(review): {@code setIds} is called by the task builder but is not declared
     * here; presumably generated (e.g. by Lombok) — confirm.
     */
    private class MyTask implements Callable<String> {

        /** Failed ids handled by this task. */
        private List<String> ids;

        /**
         * Walks through the ids batch by batch.
         *
         * @return a summary line with elapsed seconds and the number of ids
         */
        @Override
        public String call() throws Exception {
            // Ids of the batch currently being filled.
            List<String> pieceIds = new ArrayList<>();
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < ids.size(); i++) {
                pieceIds.add(ids.get(i));
                // (i + 1): with a 0-based index, "i % n == 0" fired on the very first element.
                if ((i + 1) % MAX_SAVE_NUM == 0) {
                    // Placeholder for the query-and-save step of a full batch.
                    System.out.println(pieceIds);
                    pieceIds.clear();
                }
            }
            if (!pieceIds.isEmpty()) {
                // Placeholder for the query-and-save step of the trailing batch.

            }
            long useTime = (System.currentTimeMillis() - beginTime) / 1000;
            return "日志文件失败进件重新入库完毕,耗时:" + useTime + "秒 , 文件共:" + ids.size() + "条数据, 失败个数: , 失败id:";
        }
    }
    
    // PieceInfoLogFileTask
    /**
     * Creates the executor used to run the retry tasks.
     *
     * @return a fixed-size thread pool sized by {@code MAX_THREAD_NUM}
     */
    protected ExecutorService createExecutorService() {
        final int threads = Integer.valueOf(MAX_THREAD_NUM);
        return Executors.newFixedThreadPool(threads);
    }
    
    // AbstractTaskTemplate
    /**
     * Executes all tasks on the given executor, waits for them to finish and
     * collects their string results. The executor is shut down in every case.
     *
     * @param executorService executor to run the tasks on; shut down before returning
     * @param tasks tasks to run; {@code null} or empty yields an empty result
     * @return the results of the tasks that completed normally, in task order
     */
    private List runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) {
        List<String> results = new ArrayList<>();
        // invokeAll(null) would throw NullPointerException and leave the pool running.
        if (tasks == null || tasks.isEmpty()) {
            executorService.shutdown();
            return results;
        }
        List<Future<String>> futures = new ArrayList<>();
        try {
            futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // A failed task contributes no result line.
                e.printStackTrace();
            }
        }
        return results;
    }
    
    // PieceInfoLogFileTask
    /**
     * Writes the retry result lines to {@code datamove.log} under
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH}. A write failure is logged and not
     * propagated.
     * NOTE(review): this is the same file the failed ids were read from; whether
     * {@code FileKit.writeLines} replaces its content is not visible here — confirm.
     *
     * @param results result lines produced by the retry tasks
     */
    protected void writeLogFile(List results) {
        String fileName = "datamove.log";
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;
        try {
            FileKit.writeLines(results, filePath, "UTF-8");
        } catch (IOException e) {
            // The message names this class (it previously named PieceInfoDataMoveTask).
            log.error("class:PieceInfoLogFileTask  method: writeLogFile()" +
                " 日志信息写出失败, 失败原因:" + e.getMessage(), e);
        }
    }

复制代码

2.4 数据迁移流程图

(图:数据迁移流程图——数据迁移方案 + Elasticsearch在综合搜索列表实现)

3 Elasticsearch在综合搜索列表实现

/**
 * Builds an Elasticsearch bool query from the non-empty search criteria and returns the
 * requested page of results.
 *
 * <p>Free-text criteria are matched as substrings via wildcard queries on the {@code .keyword}
 * fields, status-like criteria are matched exactly via term queries, and the two time criteria
 * become range queries on epoch milliseconds. All clauses are combined with {@code must}.
 *
 * @param investigatepiecePageReqDTO search criteria; empty criteria are ignored
 * @param pageNo                     1-based number of the page to return
 * @param pageSize                   number of records per page
 * @return the page produced by {@code getResponseFromEs} for the assembled query
 */
public Page<InvestigatePiecePageResDTO> pagination(final InvestigatePiecePageReqDTO investigatepiecePageReqDTO, final Integer pageNo, final Integer pageSize) {
        final InvestigatePiecePageReqDTO req = investigatepiecePageReqDTO;
        // Every criterion below is added as a "must" clause of this bool query.
        BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();

        // Piece code
        addWildcardFilter(mustQuery, "pieceCode.keyword", req.getPieceCode());
        // Mobile number
        addWildcardFilter(mustQuery, "pieceContacts.mobile.keyword", req.getMobile());
        // ID card number
        addWildcardFilter(mustQuery, "idCard.keyword", req.getIdCard());
        // Service point
        addWildcardFilter(mustQuery, "servicePoint.keyword", req.getServicePoint());
        // Landline number
        addWildcardFilter(mustQuery, "pieceContacts.phone.keyword", req.getPhone());
        // Company name
        addWildcardFilter(mustQuery, "companyName.keyword", req.getCompanyName());
        // Work order code
        addWildcardFilter(mustQuery, "orderCode.keyword", req.getOrderCode());
        // Case code
        addWildcardFilter(mustQuery, "caseCode.keyword", req.getCaseCode());
        // Rule search (the rule criterion is matched against the rule name field)
        addWildcardFilter(mustQuery, "ruleName.keyword", req.getRuleCode());
        // Customer manager
        addWildcardFilter(mustQuery, "customerManager.keyword", req.getCustomerManager());
        // Region the piece came from
        addWildcardFilter(mustQuery, "pieceFromArea.keyword", req.getPieceFromArea());

        // Piece status (exact match)
        if (Func.isNotEmpty(req.getBusinessStatus())) {
            mustQuery.must(QueryBuilders.termQuery("businessStatus.keyword", req.getBusinessStatus()));
        }
        // Product type (exact match)
        if (Func.isNotEmpty(req.getProductCode())) {
            mustQuery.must(QueryBuilders.termQuery("productName.keyword", req.getProductCode()));
        } else {
            // No product selected: restrict to the product codes returned for the current user.
            List<String> productCodes = getProductCodeByCurrentUser();
            mustQuery.must(QueryBuilders.termsQuery("productName.keyword", productCodes));
        }

        // Anti-fraud handler
        addWildcardFilter(mustQuery, "decisionMaker.keyword", req.getDecisionMaker());
        // Company address
        addWildcardFilter(mustQuery, "companyAddress.keyword", req.getCompanyAddress());

        // The next three criteria arrive as dictionary codes; the index is queried by the
        // corresponding dictionary name.
        // NOTE(review): a code missing from the dictionary makes dictionaryDTO null and fails
        // with a NullPointerException - confirm whether codes are validated upstream.
        // Fraud alarm level
        if (Func.isNotEmpty(req.getFraudAlarmLevelCode())) {
            DictionaryDTO dictionaryDTO = DictionaryKit.QZJGLB_MAP().get(req.getFraudAlarmLevelCode());
            mustQuery.must(QueryBuilders.termQuery("fraudAlarmLevelName.keyword", dictionaryDTO.getName()));
        }
        // Decision result
        if (Func.isNotEmpty(req.getResultCode())) {
            DictionaryDTO dictionaryDTO = DictionaryKit.JCJG_MAP().get(req.getResultCode());
            mustQuery.must(QueryBuilders.termQuery("decisionResult.keyword", dictionaryDTO.getName()));
        }
        // Anti-fraud handling status
        if (Func.isNotEmpty(req.getDecisionStatus())) {
            DictionaryDTO dictionaryDTO = DictionaryKit.JCZT_MAP().get(req.getDecisionStatus());
            mustQuery.must(QueryBuilders.termQuery("decisionStatus.keyword", dictionaryDTO.getName()));
        }

        // Customer service staff
        addWildcardFilter(mustQuery, "serviceUserName.keyword", req.getCustomerService());
        // Customer name
        addWildcardFilter(mustQuery, "customerName.keyword", req.getCustomerName());
        // Team manager
        addWildcardFilter(mustQuery, "teamManager.keyword", req.getTeamManager());

        // Time the piece was entered
        addTimeRangeFilter(mustQuery, "enterCreditTime", req.getEnterCreditTimeStart(), req.getEnterCreditTimeEnd());
        // Decision time
        addTimeRangeFilter(mustQuery, "decisionTime", req.getDecisionTimeStart(), req.getDecisionTimeEnd());

        // Initial page load: restrict the list to pieces entered today.
        if (Boolean.valueOf(req.getIfInit())) {
            mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(getTodayStartTime()).to(getTodayEndTime()));
        }

        // Assemble query, sorting and paging.
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(mustQuery);

        if (Func.isNotEmpty(req.getSortChangeKey())) {
            SortOrder sortOrder = "asc".equals(req.getSortChangeWay()) ? SortOrder.ASC : SortOrder.DESC;
            // Any key other than decision time / decision maker sorts by entry time.
            String sortField = "enterCreditTime";
            if (Func.equals(req.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_TIME)) {
                sortField = "decisionTime";
            } else if (Func.equals(req.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_MAKER)) {
                sortField = "decisionMaker.keyword";
            }
            source.sort(sortField, sortOrder);
        } else {
            // Default: newest pieces first.
            source.sort("enterCreditTime", SortOrder.DESC);
        }

        // Elasticsearch's "from" is the offset of the first hit, not a page index, so the
        // 1-based page number has to be converted into a document offset.
        // NOTE(review): from + size is limited by index.max_result_window (10000 unless the
        // index overrides it); deeper pages are rejected by Elasticsearch.
        source.from((pageNo - 1) * pageSize);
        source.size(pageSize);

        return getResponseFromEs(source);
    }

    /**
     * Adds a "contains" filter for {@code keyword} on {@code field}; does nothing when the
     * keyword is empty.
     */
    private void addWildcardFilter(BoolQueryBuilder query, String field, String keyword) {
        if (Func.isNotEmpty(keyword)) {
            query.must(QueryBuilders.wildcardQuery(field, "*" + keyword + "*"));
        }
    }

    /**
     * Adds a range filter on {@code field} for whichever of the two bounds is non-empty. The
     * bounds are parsed with {@code SIMPLE_DATE_FORMAT} and compared as epoch milliseconds.
     *
     * <p>Best effort: when a bound cannot be parsed, the failure is printed and no filter is
     * added for this field.
     * NOTE(review): if {@code SIMPLE_DATE_FORMAT} is a shared {@code java.text.SimpleDateFormat}
     * it is not thread-safe - confirm at its declaration.
     */
    private void addTimeRangeFilter(BoolQueryBuilder query, String field, String start, String end) {
        boolean hasStart = Func.isNotEmpty(start);
        boolean hasEnd = Func.isNotEmpty(end);
        if (!hasStart && !hasEnd) {
            return;
        }
        try {
            if (hasStart && hasEnd) {
                long from = SIMPLE_DATE_FORMAT.parse(start).getTime();
                long to = SIMPLE_DATE_FORMAT.parse(end).getTime();
                query.must(QueryBuilders.rangeQuery(field).from(from).to(to));
            } else if (hasStart) {
                query.must(QueryBuilders.rangeQuery(field).from(SIMPLE_DATE_FORMAT.parse(start).getTime()));
            } else {
                query.must(QueryBuilders.rangeQuery(field).to(SIMPLE_DATE_FORMAT.parse(end).getTime()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
复制代码
/**
 * Executes the given search against Elasticsearch and converts the hits into a page of
 * {@code InvestigatePiecePageResDTO}.
 *
 * @param source the fully assembled search (query, sort, from/size)
 * @return a page whose records are the converted hits and whose total is the total hit count
 * @throws ServiceException with code 500 when the search request fails
 */
private Page<InvestigatePiecePageResDTO> getResponseFromEs(SearchSourceBuilder source) {
        SearchHits hits;
        try {
            SearchResponse response = elasticsearchManager.getPieceResponseWithBuilder(source);
            hits = response.getHits();
        } catch (Exception e) {
            // NOTE(review): the original cause is dropped here; attach or log it once a
            // suitable ServiceException constructor / logger is confirmed to be available.
            throw new ServiceException(500, "es搜索引擎连接失败");
        }
        // Hits of the requested page
        SearchHit[] hitsHits = hits.getHits();
        // Total number of documents matching the query
        Long totalHits = hits.getTotalHits();

        // Convert every hit into the DTO returned to the front end.
        List<InvestigatePiecePageResDTO> result = Lists.newArrayList();
        for (SearchHit hit : hitsHits) {
            InvestigatePiecePageResDTO dto = new InvestigatePiecePageResDTO();
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();

            dto.setId(Long.valueOf(hit.getId()));
            dto.setPieceCode((String) sourceAsMap.get("pieceCode"));
            dto.setAppKey((String) sourceAsMap.get("appKey"));

            // JSON numbers are parsed as Integer or Long depending on their magnitude, so read
            // them as Number instead of casting to one specific wrapper type.
            Number decisionId = (Number) sourceAsMap.get("decisionId");
            if (null != decisionId) {
                dto.setDecisionId(decisionId.longValue());
            }

            // Only the first order code is shown in the list.
            @SuppressWarnings("unchecked")
            List<String> orderCode = (List<String>) sourceAsMap.get("orderCode");
            if (Func.isNotEmpty(orderCode)) {
                dto.setOrderCode(orderCode.get(0));
            }

            dto.setCustomerName((String) sourceAsMap.get("customerName"));
            dto.setIdCard((String) sourceAsMap.get("idCard"));
            // NOTE(review): fails with a NullPointerException when the stored product value has
            // no dictionary entry - confirm that this cannot happen.
            dto.setProductName(DictionaryKit.CPLX_MAP().get(sourceAsMap.get("productName")).getName());

            // Timestamps are stored as epoch milliseconds; skip formatting when a value is absent.
            Number enterCreditTime = (Number) sourceAsMap.get("enterCreditTime");
            if (null != enterCreditTime) {
                dto.setEnterCreditTime(SIMPLE_DATE_FORMAT.format(new Date(enterCreditTime.longValue())));
            }
            dto.setBusinessStatus((String) sourceAsMap.get("businessStatus"));
            Number decisionTime = (Number) sourceAsMap.get("decisionTime");
            if (null != decisionTime) {
                dto.setDecisionTime(SIMPLE_DATE_FORMAT.format(new Date(decisionTime.longValue())));
            }

            // The decision result has its own index field (the one the search filters on); it
            // was previously filled from "decisionStatus".
            dto.setDecisionResult((String) sourceAsMap.get("decisionResult"));
            dto.setFraudAlarmLevelName((String) sourceAsMap.get("fraudAlarmLevelName"));
            dto.setDecisionMaker((String) sourceAsMap.get("decisionMaker"));
            dto.setServicePoint((String) sourceAsMap.get("servicePoint"));

            // The _disabled flag is false only for rows whose status is "已决策" (decided).
            String decisionStatus = (String) sourceAsMap.get("decisionStatus");
            dto.set_disabled(!"已决策".equals(decisionStatus));

            result.add(dto);
        }

        // Wrap the records and the total count into the paging DTO.
        Page<InvestigatePiecePageResDTO> page = new Page<>();
        page.setRecords(result);
        page.setTotal(totalHits.intValue());
        return page;
    }
复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持码农网。

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

产品经理修炼之道

产品经理修炼之道

费杰 / 机械工业出版社华章公司 / 2012-7-30 / 59.00元

本书由资深产品经理、中国最大的产品经理沙龙Pmcaff创始人费杰亲自执笔,微软、腾讯、百度、新浪、搜狐、奇虎、阿里云、Evernote等国内外20余家大型互联网企业资深产品经理和技术专家联袂推荐。用系统化的方法论和丰富的实战案例解读了优秀产品经理所必须修炼的产品规划能力、产品设计能力、产品执行能力,以及思考、分析和解决问题的能力和方法,旨在为互联网产品经理打造核心竞争力提供实践指导。 全书一......一起来看看 《产品经理修炼之道》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具