Spring Boot 中 OkHttp 和压缩文件的使用

OkHttp 和压缩文件实战

一、发起请求处理

import okhttp3.*;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Map;

public class ApiServiceCaller {
    private static final ExecutorService executor = Executors.newFixedThreadPool(10, runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("ApiServiceCaller-Thread");
        thread.setDaemon(true);
        return thread;
    });
    private static final Logger logger = Logger.getLogger(ApiServiceCaller.class.getName());
    private static final OkHttpClient client = new OkHttpClient.Builder()
            .connectTimeout(5, TimeUnit.SECONDS)
            .readTimeout(5, TimeUnit.SECONDS)
            .connectionPool(new ConnectionPool(10, 5, TimeUnit.MINUTES))
            .retryOnConnectionFailure(true)
            .build();

    // 异步调用外部系统 API 的方法
    public CompletableFuture<String> callExternalApi(String url, Map<String, String> params, String method) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Request request = buildRequest(url, params, method);
                return executeRequest(request);
            } catch (Exception e) {
                logger.log(Level.SEVERE, "构建请求或执行请求时出错", e);
                throw new RuntimeException("调用 API 时出错: " + url, e);
            }
        }, executor);
    }

    // 构建 GET 请求
    private Request buildGetRequest(String url, Map<String, String> params) {
        HttpUrl.Builder httpBuilder = HttpUrl.parse(url).newBuilder();
        if (params != null && !params.isEmpty()) {
            params.forEach(httpBuilder::addQueryParameter);
        }
        return new Request.Builder().url(httpBuilder.build()).get().build();
    }

    // 构建 POST 请求
    private Request buildPostRequest(String url, Map<String, String> params) throws IOException {
        RequestBody body = RequestBody.create(
                MediaType.parse("application/json"),
                new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(params)
        );
        return new Request.Builder().url(url).post(body).build();
    }

    // 通用请求构建方法
    private Request buildRequest(String url, Map<String, String> params, String method) throws IOException {
        if ("GET".equalsIgnoreCase(method)) {
            return buildGetRequest(url, params);
        } else if ("POST".equalsIgnoreCase(method)) {
            return buildPostRequest(url, params);
        } else {
            throw new IllegalArgumentException("不支持的方法: " + method);
        }
    }

    // 执行请求并处理响应
    private String executeRequest(Request request) throws IOException {
        try (Response response = client.newCall(request).execute()) {
            if (response.isSuccessful() && response.body() != null) {
                String responseBody = response.body().string();
                logger.info("收到响应: " + responseBody);
                return responseBody;
            } else {
                logger.warning("收到非正常响应码: " + response.code());
                throw new RuntimeException("调用 API 失败,响应码: " + response.code());
            }
        }
    }

    // 处理多个不同 URL 和参数的 API 调用的方法
    public List<CompletableFuture<String>> callMultipleApis(List<ApiRequest> apiRequests) {
        logger.info("正在调用多个 API...");
        return apiRequests.stream()
                .map(request -> callExternalApi(request.getUrl(), request.getParams(), request.getMethod()))
                .collect(Collectors.toList());
    }

    // 高效处理 CompletableFuture 结果的方法
    public void processApiResponses(List<CompletableFuture<String>> futures) {
        CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allOf.thenAccept(v -> futures.forEach(future -> {
            future.handle((response, throwable) -> {
                if (throwable != null) {
                    logger.log(Level.SEVERE, "处理 future 出错", throwable);
                    System.err.println("处理 future 出错: " + throwable.getMessage());
                } else {
                    logger.info("处理响应: " + response);
                    System.out.println(response);
                }
                return null;
            });
        }));
    }

    // 主函数,调用 API
    public static void main(String[] args) {
        ApiServiceCaller apiServiceCaller = new ApiServiceCaller();
        List<ApiRequest> apiRequests = new ArrayList<>();
        apiRequests.add(new ApiRequest("http://example.com/api1", Map.of("param1", "value1"), "GET"));
        apiRequests.add(new ApiRequest("http://example.com/api2", Map.of("key", "value"), "POST"));
        apiRequests.add(new ApiRequest("http://example.com/api3", Map.of("param3", "value3"), "GET"));

        logger.info("开始调用 API...");
        List<CompletableFuture<String>> apiCalls = apiServiceCaller.callMultipleApis(apiRequests);
        apiServiceCaller.processApiResponses(apiCalls);
    }

    // ApiServiceCaller 的单元测试
    public static class ApiServiceCallerTest {

        @Test
        public void testCallExternalApi_getRequest() {
            ApiServiceCaller caller = new ApiServiceCaller();
            CompletableFuture<String> responseFuture = caller.callExternalApi("http://example.com/api1", Map.of("param", "value"), "GET");
            Assertions.assertDoesNotThrow(() -> {
                String response = responseFuture.get(10, TimeUnit.SECONDS);
                Assertions.assertNotNull(response);
            });
        }

        @Test
        public void testCallExternalApi_postRequest() {
            ApiServiceCaller caller = new ApiServiceCaller();
            CompletableFuture<String> responseFuture = caller.callExternalApi("http://example.com/api1", Map.of("key", "value"), "POST");
            Assertions.assertDoesNotThrow(() -> {
                String response = responseFuture.get(10, TimeUnit.SECONDS);
                Assertions.assertNotNull(response);
            });
        }

        @Test
        public void testCallMultipleApis() {
            ApiServiceCaller caller = new ApiServiceCaller();
            List<ApiRequest> apiRequests = new ArrayList<>();
            apiRequests.add(new ApiRequest("http://example.com/api1", Map.of("param1", "value1"), "GET"));
            apiRequests.add(new ApiRequest("http://example.com/api2", Map.of("key", "value"), "POST"));
            List<CompletableFuture<String>> responseFutures = caller.callMultipleApis(apiRequests);
            Assertions.assertEquals(2, responseFutures.size());
            responseFutures.forEach(future -> Assertions.assertDoesNotThrow(() -> {
                String response = future.get(10, TimeUnit.SECONDS);
                Assertions.assertNotNull(response);
            }));
        }
    }

    // 用于保存 API 请求详情的类
    public static class ApiRequest {
        private final String url;
        private final Map<String, String> params;
        private final String method;

        public ApiRequest(String url, Map<String, String> params, String method) {
            this.url = url;
            this.params = params;
            this.method = method;
        }

        public String getUrl() {
            return url;
        }

        public Map<String, String> getParams() {
            return params;
        }

        public String getMethod() {
            return method;
        }
    }
}

// 确保执行器的优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        logger.info("正在关闭执行器...");
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            logger.warning("执行器未在指定时间内终止。");
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        logger.log(Level.SEVERE, "关闭过程中断", e);
        executor.shutdownNow();
    }
}));

二、压缩文件

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;

import java.io.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import java.util.concurrent.TimeUnit;

public class S3DownloadAndCompress {

    private final AmazonS3 s3Client;
    private final ExecutorService executorService;

    public S3DownloadAndCompress(int threadPoolSize) {
        System.out.println("初始化 S3 客户端和执行器服务...");
        this.s3Client = AmazonS3ClientBuilder.standard().build();
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
    }

    public ByteArrayOutputStream getCompressedFileStream(List<String> fileKeys, String bucketName) {
        System.out.println("开始下载和压缩过程..."
上一篇:大数据-174 Elasticsearch Query DSL - 全文检索 full-text query 匹配、短语、多字段 详细操作


下一篇:机器学习基础——概述