使用SseEmitter进行日志推送时,后台出现 broken pipe 错误,排查发现是浏览器在订阅打开SseEmitter请求连接时,会在一定时间内多次发起请求,但由于代码中new SseEmitter后将实例保存起来,请求会查询实例是否存在,如果存在则取出,但下次请求浏览器传递的是新的 EventSource,但后台存的是旧实例,导致406错误。
解决办法:每次订阅都将保存的实例移除后重新创建;
问题:
org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:299) at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:262) at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:118) at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:297) at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) at org.springframework.util.StreamUtils.copy(StreamUtils.java:124) at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:110) at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44) at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:227) at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:191) at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:184) at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:189) at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:183) at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:133) at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:116) at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:95) at com.southgis.ispatial.dag.controller.DagController.subscribe(DagController.java:771) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897) at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:836) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)
代码:
@GetMapping(value = "xxx/subscribe") public SseEmitter subscribe(@RequestParam(name = "instId", required = false) Long instId) { Map<Long, SseEmitter> sseCache = SseParamsUtil.getSseCache(); sseCache.remove(instId); SseEmitter sseEmitter = new SseEmitter(0L); sseCache.put(instId, sseEmitter); try { if (sseEmitter != null) { sseEmitter.send("EventSource start"); } } catch (IOException e) { e.printStackTrace(); } // 超时回调 触发 sseEmitter.onTimeout(() -> sseCache.remove(instId)); sseEmitter.onError((throwable -> System.out.println(throwable))); // 结束之后的回调触发 sseEmitter.onCompletion(() -> System.out.println("sss推送结束!!!")); return sseEmitter; }