标签:task 工厂 保存文件 dna 压缩 inter async executor direct
#文件上传
1、存在形式:web服务,可以跨平台部署
2、文件监控:使用apache下commons-io.jar包,继承FileAlterationListenerAdaptor类定义一个监听器,创建FileAlterationObserver观察者,将监听器注入到观察者,当文件发生变化,观察者会调用监听器的方法。
FileAlterationListenerAdaptor和FileAlterationObserver都属于org.apache.commons.io.monitor 包下的类,这个包的作用是监控指定目录下的文件状态,它使用观察者设计模式设计这些类的关系。
a、可设置监听路径
b、可设置监听间隔
c、可设置监听指定格式,支持开启和关闭配置,如果开启了指定格式,最终文件只上传指定格式的文件
3、扫描功能:为了解决历史文件问题
a、可以设置扫描频率,即每隔多久扫一次,只适合移动文件模式
b、支持开启和关闭配置
c、扫描到的文件采用并行流(利用CPU的多核)的方式,将任务交给线程池去执行
4、文件上传:
a、可以设置上传时间段
b、上传模式[0-复制文件 1-移动文件]
c、采用feign+okhttp完成,稳定高效,通过连接池来减小响应延迟,还有透明的GZIP压缩,请求缓存等优势。
d、使用线程池,异步多线程去执行,提高执行效率
#pom配置
<!-- 添加依赖 --> <dependencies> <!--部署成war包时开启↓↓↓↓--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <!--部署成war包时开启↑↑↑↑--> </dependencies>
#配置文件
server.port=8000 server.servlet.context-path=/nb-file-server spring.servlet.multipart.max-request-size=1024MB spring.servlet.multipart.max-file-size=1024MB file.upload.path=C:\\test\\tmp3
#控制层
@RestController @RequestMapping("/file") public class FileController { @Value("${file.upload.path}") private String path; @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public boolean upload(@RequestPart("file") MultipartFile file) { if (file == null) { return false; } try { File parent = new File(path); if (!parent.exists()) { boolean mkdirs = parent.mkdirs(); if (!mkdirs) { return false; } } // 保存文件 file.transferTo(new File(parent.getAbsolutePath() + File.separator + file.getOriginalFilename())); return true; } catch (Exception e) { e.printStackTrace(); } return false; } }
#启动类
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class) public class FileServerApplication extends SpringBootServletInitializer { public static void main(String[] args) { SpringApplication.run(FileServerApplication.class, args); } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(FileServerApplication.class); } }
#文件接收
1、存在形式:web服务,可以跨平台部署
2、自定义配置
a、可设置文件保存路径
b、可设置单个文件大小
c、可设置单次请求的文件总大小
#pom配置
<!-- 添加依赖 --> <dependencies> <!--部署成war包时开启↓↓↓↓--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <!--部署成war包时开启↑↑↑↑--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!-- feign底层采用的http请求方式 不加则默认使用JDK的HttpURLConnection --> <dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-okhttp</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> </dependency> </dependencies>
#配置文件
server.port=8001 feign.okhttp.enabled=true logging.level.com.nubomed.apiservice.client=debug #文件上传目标服务器地址 endpoint.file.server=http://192.168.1.220:8000/nb-file-server #监听路径 monitor.dir=C:\\test\\tmp #监听间隔,单位毫秒 monitor.interval=5000 #是否监听指定文件格式 monitor.file.suffix.enable=false #监听文件格式 monitor.file.suffix=.txt #上传文件时间段 file.upload.time.slot=00:00-08:00 #0-复制文件 1-移动文件 file.operate.mode=1 #是否开启主动扫描功能,[0-复制文件]模式下,不会执行文件上传操作 scanner.enable=true #扫描频率,多少分钟执行一次 scanner.rate=1
#配置类-线程池
@Configuration @EnableAsync public class ExecutorConfig { @Bean public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(5); //配置最大线程数 executor.setMaxPoolSize(10); //配置队列大小 executor.setQueueCapacity(1000); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("nb-file-client-async-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
#配置类-feign
@Configuration public class FeignConfig { @Bean Logger.Level feignLoggerLevel() { //记录请求和响应的标头,正文和元数据 return Logger.Level.FULL; } }
#文件监听器
public class FileListener extends FileAlterationListenerAdaptor { private ListenerService listenerService; public FileListener(ListenerService listenerService) { this.listenerService = listenerService; } @Override public void onFileCreate(File file) { listenerService.handleFileCreate(file); } @Override public void onFileChange(File file) { listenerService.handleFileChange(file); } @Override public void onFileDelete(File file) { } @Override public void onDirectoryCreate(File directory) { } @Override public void onDirectoryChange(File directory) { } @Override public void onDirectoryDelete(File directory) { } @Override public void onStart(FileAlterationObserver observer) { } @Override public void onStop(FileAlterationObserver observer) { } }
#文件监听工厂类
@Component public class FileListenerFactory { @Value("${monitor.dir}") private String monitorDir; @Value("${monitor.interval}") private long interval; @Value("${monitor.file.suffix.enable}") private boolean enable; @Value("${monitor.file.suffix}") private String suffix; @Autowired private ListenerService listenerService; public FileAlterationMonitor getMonitor() { FileAlterationObserver observer = null; // 创建过滤器 if (enable) { IOFileFilter directories = FileFilterUtils.and(FileFilterUtils.directoryFileFilter(), HiddenFileFilter.VISIBLE); IOFileFilter files = FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(suffix)); IOFileFilter filter = FileFilterUtils.or(directories, files); //装配过滤器 observer = new FileAlterationObserver(new File(monitorDir), filter); } else { observer = new FileAlterationObserver(new File(monitorDir)); } // 向监听者添加监听器,并注入业务服务 observer.addListener(new FileListener(listenerService)); // 返回监听者 return new FileAlterationMonitor(interval, observer); } }
#文件监听服务接口
public interface ListenerService { /** * 监听文件创建 * * @param file 创建的文件 */ void handleFileCreate(File file); /** * 监听文件修改 * * @param file 修改的文件 */ void handleFileChange(File file); /** * 执行文件扫描 */ void handleScanner(); }
#文件监听服务接口实现
@Service @Slf4j public class ListenerServiceImpl implements ListenerService { @Resource private AsyncService asyncService; @Value("${file.operate.mode}") private Integer mode; @Value("${file.upload.time.slot}") private String timeSlot; @Value("${monitor.dir}") private String dir; @Value("${monitor.file.suffix.enable}") private boolean enableSuffix; @Value("${monitor.file.suffix}") private String suffix; @Override public void handleFileCreate(File file) { log.info("发现新的文件[{}]...", file.getName()); if (isHandleTimeSlot()) { //asyncService.handleFileUpload(file); } } @Override public void handleFileChange(File file) { log.info("发现文件[{}]修改了...", file.getName()); if (isHandleTimeSlot()) { asyncService.handleFileUpload(file); } } @Override public void handleScanner() { if (mode == 0) { log.info("[复制文件模式]不执行扫描操作!"); return; } log.info("开始扫描目录[{}]文件...", dir); File file = new File(dir); File[] files = file.listFiles(); if (files == null || files.length == 0) { log.info("目录[{}]下没有发现文件!", dir); return; } log.info("已扫描到[{}]个文件", files.length); if (enableSuffix) { log.info("已开启扫描[{}]格式文件", suffix); List<File> fileList = Arrays.stream(files) .filter(file1 -> file1.getName().contains(suffix)).collect(Collectors.toList()); log.info("[{}]格式文件有[{}]个", suffix, fileList.size()); fileList.parallelStream().forEach(file12 -> asyncService.handleFileUpload(file12)); return; } Arrays.stream(files).parallel().forEach(file13 -> asyncService.handleFileUpload(file13)); } private LocalDateTime parseStringToDateTime(String time) { String[] split = time.split(":"); return LocalDateTime.of(LocalDate.now(), LocalTime.of(Integer.valueOf(split[0]), Integer.valueOf(split[1]), 0)); } private boolean isHandleTimeSlot() { String[] split = timeSlot.split("-"); LocalDateTime startTime = parseStringToDateTime(split[0]); LocalDateTime endTime = parseStringToDateTime(split[1]); LocalDateTime now = LocalDateTime.now(); if (now.isBefore(startTime) || now.isAfter(endTime)) { log.info("文件上传的时间段为[{}]", timeSlot); return false; } return true; } }
#文件上传服务接口
public interface AsyncService { /** * 执行异步任务-上传文件 * * @param file 目标文件 */ void handleFileUpload(File file); }
#文件上传服务实现
@Service @Slf4j public class AsyncServiceImpl implements AsyncService { private static final Map<String, Boolean> PROCESS_MAP = new ConcurrentHashMap<>(); @Resource private FileUploadClient fileUploadClient; @Value("${file.operate.mode}") private Integer mode; @Override @Async("asyncServiceExecutor") public void handleFileUpload(File file) { log.info("当前线程[{}]", Thread.currentThread().getName()); if (PROCESS_MAP.get(file.getName()) != null && PROCESS_MAP.get(file.getName())) { log.info("文件[{}]正在被处理中...", file.getName()); return; } PROCESS_MAP.put(file.getName(), true); if (!file.exists()) { log.info("文件[{}]已被处理!", file.getName()); return; } long start = System.currentTimeMillis(); log.info("文件[{}]正在上传...", file.getName()); boolean result = false; try { MultipartFile multipartFile = new MockMultipartFile("file", file.getName(), MediaType.MULTIPART_FORM_DATA_VALUE, new FileInputStream(file)); result = fileUploadClient.upload(multipartFile); if (result) { //移动文件 if (mode == 1) { log.info("开始删除文件[{}]...", file.getName()); boolean delete = file.delete(); if (delete) { log.info("文件[{}]删除成功!", file.getName()); } else { log.error("文件[{}]删除失败!", file.getName()); } } } } catch (Exception e) { e.printStackTrace(); } long end = System.currentTimeMillis(); long cost = end - start; if (result) { log.info("文件[{}]上传成功!耗时[{}]ms", file.getName(), cost); } else { log.error("文件[{}]上传失败!耗时[{}]ms", file.getName(), cost); } PROCESS_MAP.remove(file.getName()); } }
#启动类
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class) @EnableFeignClients @EnableScheduling @Slf4j public class FileClientApplication extends SpringBootServletInitializer implements CommandLineRunner { @Autowired private FileListenerFactory fileListenerFactory; @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Autowired private ListenerService listenerService; @Value("${scanner.enable}") private boolean enable; @Value("${scanner.rate}") private String rate; public static void main(String[] args) { SpringApplication.run(FileClientApplication.class, args); } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(FileClientApplication.class); } @Override public void run(String... args) throws Exception { try { // 创建监听者 FileAlterationMonitor fileAlterationMonitor = fileListenerFactory.getMonitor(); fileAlterationMonitor.start(); if (enable) { log.info("启动主动扫描功能,扫描频率[{}]分钟执行一次", rate); String cron = String.format("0/59 0/%s * * * ?", rate); threadPoolTaskScheduler.schedule(() -> listenerService.handleScanner(), new CronTrigger(cron)); } else { log.info("未启动主动扫描功能,原因[scanner.enable={}]", enable); } } catch (Exception e) { e.printStackTrace(); } } }
标签:task 工厂 保存文件 dna 压缩 inter async executor direct
原文地址:https://www.cnblogs.com/lushichao/p/13041876.html