OKHttp和压缩文件实战
一、发起请求处理
import okhttp3.*;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Map;
public class ApiServiceCaller {
private static final ExecutorService executor = Executors.newFixedThreadPool(10, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("ApiServiceCaller-Thread");
thread.setDaemon(true);
return thread;
});
private static final Logger logger = Logger.getLogger(ApiServiceCaller.class.getName());
private static final OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.connectionPool(new ConnectionPool(10, 5, TimeUnit.MINUTES))
.retryOnConnectionFailure(true)
.build();
public CompletableFuture<String> callExternalApi(String url, Map<String, String> params, String method) {
return CompletableFuture.supplyAsync(() -> {
try {
Request request = buildRequest(url, params, method);
return executeRequest(request);
} catch (Exception e) {
logger.log(Level.SEVERE, "构建请求或执行请求时出错", e);
throw new RuntimeException("调用 API 时出错: " + url, e);
}
}, executor);
}
private Request buildGetRequest(String url, Map<String, String> params) {
HttpUrl.Builder httpBuilder = HttpUrl.parse(url).newBuilder();
if (params != null && !params.isEmpty()) {
params.forEach(httpBuilder::addQueryParameter);
}
return new Request.Builder().url(httpBuilder.build()).get().build();
}
private Request buildPostRequest(String url, Map<String, String> params) throws IOException {
RequestBody body = RequestBody.create(
MediaType.parse("application/json"),
new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(params)
);
return new Request.Builder().url(url).post(body).build();
}
private Request buildRequest(String url, Map<String, String> params, String method) throws IOException {
if ("GET".equalsIgnoreCase(method)) {
return buildGetRequest(url, params);
} else if ("POST".equalsIgnoreCase(method)) {
return buildPostRequest(url, params);
} else {
throw new IllegalArgumentException("不支持的方法: " + method);
}
}
private String executeRequest(Request request) throws IOException {
try (Response response = client.newCall(request).execute()) {
if (response.isSuccessful() && response.body() != null) {
String responseBody = response.body().string();
logger.info("收到响应: " + responseBody);
return responseBody;
} else {
logger.warning("收到非正常响应码: " + response.code());
throw new RuntimeException("调用 API 失败,响应码: " + response.code());
}
}
}
public List<CompletableFuture<String>> callMultipleApis(List<ApiRequest> apiRequests) {
logger.info("正在调用多个 API...");
return apiRequests.stream()
.map(request -> callExternalApi(request.getUrl(), request.getParams(), request.getMethod()))
.collect(Collectors.toList());
}
public void processApiResponses(List<CompletableFuture<String>> futures) {
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.thenAccept(v -> futures.forEach(future -> {
future.handle((response, throwable) -> {
if (throwable != null) {
logger.log(Level.SEVERE, "处理 future 出错", throwable);
System.err.println("处理 future 出错: " + throwable.getMessage());
} else {
logger.info("处理响应: " + response);
System.out.println(response);
}
return null;
});
}));
}
public static void main(String[] args) {
ApiServiceCaller apiServiceCaller = new ApiServiceCaller();
List<ApiRequest> apiRequests = new ArrayList<>();
apiRequests.add(new ApiRequest("http://example.com/api1", Map.of("param1", "value1"), "GET"));
apiRequests.add(new ApiRequest("http://example.com/api2", Map.of("key", "value"), "POST"));
apiRequests.add(new ApiRequest("http://example.com/api3", Map.of("param3", "value3"), "GET"));
logger.info("开始调用 API...");
List<CompletableFuture<String>> apiCalls = apiServiceCaller.callMultipleApis(apiRequests);
apiServiceCaller.processApiResponses(apiCalls);
}
public static class ApiServiceCallerTest {
@Test
public void testCallExternalApi_getRequest() {
ApiServiceCaller caller = new ApiServiceCaller();
CompletableFuture<String> responseFuture = caller.callExternalApi("http://example.com/api1", Map.of("param", "value"), "GET");
Assertions.assertDoesNotThrow(() -> {
String response = responseFuture.get(10, TimeUnit.SECONDS);
Assertions.assertNotNull(response);
});
}
@Test
public void testCallExternalApi_postRequest() {
ApiServiceCaller caller = new ApiServiceCaller();
CompletableFuture<String> responseFuture = caller.callExternalApi("http://example.com/api1", Map.of("key", "value"), "POST");
Assertions.assertDoesNotThrow(() -> {
String response = responseFuture.get(10, TimeUnit.SECONDS);
Assertions.assertNotNull(response);
});
}
@Test
public void testCallMultipleApis() {
ApiServiceCaller caller = new ApiServiceCaller();
List<ApiRequest> apiRequests = new ArrayList<>();
apiRequests.add(new ApiRequest("http://example.com/api1", Map.of("param1", "value1"), "GET"));
apiRequests.add(new ApiRequest("http://example.com/api2", Map.of("key", "value"), "POST"));
List<CompletableFuture<String>> responseFutures = caller.callMultipleApis(apiRequests);
Assertions.assertEquals(2, responseFutures.size());
responseFutures.forEach(future -> Assertions.assertDoesNotThrow(() -> {
String response = future.get(10, TimeUnit.SECONDS);
Assertions.assertNotNull(response);
}));
}
}
public static class ApiRequest {
private final String url;
private final Map<String, String> params;
private final String method;
public ApiRequest(String url, Map<String, String> params, String method) {
this.url = url;
this.params = params;
this.method = method;
}
public String getUrl() {
return url;
}
public Map<String, String> getParams() {
return params;
}
public String getMethod() {
return method;
}
}
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
logger.info("正在关闭执行器...");
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warning("执行器未在指定时间内终止。");
executor.shutdownNow();
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "关闭过程中断", e);
executor.shutdownNow();
}
}));
二、压缩文件
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import java.io.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import java.util.concurrent.TimeUnit;
public class S3DownloadAndCompress {
private final AmazonS3 s3Client;
private final ExecutorService executorService;
public S3DownloadAndCompress(int threadPoolSize) {
System.out.println("初始化 S3 客户端和执行器服务...");
this.s3Client = AmazonS3ClientBuilder.standard().build();
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
}
public ByteArrayOutputStream getCompressedFileStream(List<String> fileKeys, String bucketName) {
System.out.println("开始下载和压缩过程..."