/**
 * Convenience overload of `fromPath` for callers that do not supply a start position.
 *
 * Forwards to the three-argument `fromPath` with `startPosition = 0`.
 *
 * @param f         path handed unchanged to the three-argument overload
 * @param chunkSize chunk size handed unchanged to the three-argument overload (default 8192)
 * @return whatever the three-argument overload returns
 */
def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = {
  fromPath(f, chunkSize, startPosition = 0)
}
/**
 * Convenience overload of `toPath` for callers that do not supply a start position.
 *
 * Forwards to the three-argument `toPath` with `startPosition = 0`.
 *
 * @param f       path handed unchanged to the three-argument overload
 * @param options open options handed unchanged to the three-argument overload;
 *                defaults to `WRITE`, `TRUNCATE_EXISTING` and `CREATE`
 * @return whatever the three-argument overload returns
 */
def toPath(
    f: Path,
    options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)
): Sink[ByteString, Future[IOResult]] = {
  toPath(f, options, startPosition = 0)
}
package com.datatech.restapi
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file._
import akka.util._
object FileStreaming {
def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = {
def loadFile = {
// implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val file = Paths.get(filePath)
if (dispatcherName != "")
FileIO.fromPath(file, chunkSize)
FileIO.fromPath(file, chunkSize)
akka {
http {
blocking-ops-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
// or in Akka 2.4.2+
fixed-pool-size = 16
throughput = 100
package com.datatech.restapi
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
import java.nio.file._
import FileStreaming._
import AuthBase._
case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) {
val destPath = "/users/tiger-macpro/cert4/meme.jpg"
implicit val mat = ActorMaterializer()
val route = pathPrefix("file") {
val privatePath = auth.tempDirFromJwt(jwt)
if (privatePath.length == 0)
(get & path(Remaining)) { filename =>
withoutSizeLimit {
encodeResponseWith(Gzip) {
fileStreamSource(privatePath+"/download/"+filename, 1024))
} ~
(post & parameters(‘filename)) { filename =>
withoutSizeLimit {
decodeRequest {
extractDataBytes { bytes =>
val fut = bytes.runWith(FileIO.toPath(Paths.get(privatePath+"/upload/"+filename)))
onComplete(fut) { _ => complete(StatusCodes.OK)}
package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {
case class User(username: String, password: String, userInfo: UserInfo)
val validUsers = Seq(User("johnny", "p4ssw0rd",
Map("shopid" -> "1101", "userid" -> "101", "tmpdir" ->"/users/tiger-macpro/1101101"))
,User("tiger", "secret",
Map("shopid" -> "1101" , "userid" -> "102", "tmpdir" ->"/users/tiger-macpro/1101102")))
def getValidUser(credentials: Credentials): Option[UserInfo] =
credentials match {
case p @ Credentials.Provided(_) =>
validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
case Some(user) => Some(user.userInfo)
case _ => None
case _ => None
def tempDirFromJwt(jwt: String): String = {
val optUserInfo = getUserInfo(jwt)
val dir: String = optUserInfo match {
case Some(m) =>
try {
} catch {case err: Throwable => ""}
case None => ""
val route =
path("auth") {
authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
post { complete(authenticator.issueJwt(userinfo))}
} ~
pathPrefix("api") {
authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
(path("hello") & get) {
complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
} ~
(path("how are you") & get) {
complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
} ~
// ~ ...
import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.restapi.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._
case class FileUtil(implicit sys: ActorSystem) {
import sys.dispatcher
implicit val mat = ActorMaterializer()
def createEntity(file: File): RequestEntity = {
val formData =
HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
Map("filename" -> file.getName))))
Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
implicit val mat = ActorMaterializer()
import sys.dispatcher
val futResp = Http(sys).singleRequest(
// Gzip.encodeMessage(
request.copy(entity = dataEntity) //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
// )
.andThen {
case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
case Success(r@HttpResponse(code, _, _, _)) =>
println(s"Upload request failed, response code: $code")
case Success(_) => println("Unable to Upload file!")
case Failure(err) => println(s"Upload failed: ${err.getMessage}")
def downloadFileTo(request: HttpRequest, destPath: String) = {
// val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
val futResp = Http(sys).singleRequest(request) //.map(Gzip.decodeMessage(_))
.andThen {
case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
.onComplete { case _ => println(s"Download file saved to: $destPath") }
case Success(r@HttpResponse(code, _, _, _)) =>
println(s"Download request failed, response code: $code")
case Success(_) => println("Unable to download file!")
case Failure(err) => println(s"Download failed: ${err.getMessage}")
object TestFileClient {
type UserInfo = Map[String,Any]
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val helloRequest = HttpRequest(uri = "")
val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
val authRequest = HttpRequest(
uri = "",
headers = List(authorization)
val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)
val respToken = for {
resp <- futToken
jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
} yield jstr
val jstr = Await.result[String](respToken,2 seconds)
val authentication = headers.Authorization(OAuth2BearerToken(jstr))
val entity = HttpEntity(
val chunked = HttpEntity.Chunked.fromData(
val multipart = FileUtil().createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg"))
val uploadRequest = HttpRequest(
uri = "",
//upload file
//Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
//Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
Await.ready(FileUtil().uploadFile(uploadRequest,multipart),2 seconds)
val dlRequest = HttpRequest(
uri = "",
FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")
在测试文件上传(upload)时,分别试过用 entity、chunked、multipart 三种方式构建 request entity,服务端都能正确处理。印象中很多 Java 的 HttpClient 图片上传示例用的都是 multipart entity,现在这个服务端同样能正确处理。当然,服务端同样可以用 multipart 方式提供文件下载服务,这里就不实现了,不过可以提供一段示范代码:
import akka.actor._
import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.util.ByteString
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.Future
object TestMultipartFileUpload extends App {
val testConf: Config = ConfigFactory.parseString("""
akka.loglevel = INFO
akka.log-dead-letters = off""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
implicit val materializer = ActorMaterializer()
val testFile: File = new File("/users/tiger-macpro/downloads/uploadFileDemo.scala") //args(0))
def startTestServer(): Future[ServerBinding] = {
import akka.http.scaladsl.server.Directives._
val route: Route =
path("upload") {
entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) ⇒
val fileNamesFuture = formdata.parts.mapAsync(1) { p ⇒
println(s"Got part. name: ${p.name} filename: ${p.filename}")
println("Counting size...")
@volatile var lastReport = System.currentTimeMillis()
@volatile var lastSize = 0L
def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = {
val (oldSize, oldChunks) = counter
val newSize = oldSize + chunk.size
val newChunks = oldChunks + 1
val now = System.currentTimeMillis()
if (now > lastReport + 1000) {
val lastedTotal = now - lastReport
val bytesSinceLast = newSize - lastSize
val speedMBPS = bytesSinceLast.toDouble / 1000000 /* bytes per MB */ / lastedTotal * 1000 /* millis per second */
println(f"Already got $newChunks%7d chunks with total size $newSize%11d bytes avg chunksize ${newSize / newChunks}%7d bytes/chunk speed: $speedMBPS%6.2f MB/s")
lastReport = now
lastSize = newSize
(newSize, newChunks)
p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map {
case (size, numChunks) ⇒
println(s"Size is $size")
(p.name, p.filename, size)
}.runFold(Seq.empty[(String, Option[String], Long)])(_ :+ _).map(_.mkString(", "))
complete {
Http().bindAndHandle(route, interface = "localhost", port = 0)
def createEntity(file: File): Future[RequestEntity] = {
val formData =
HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
Map("filename" -> file.getName))))
def createRequest(target: Uri, file: File): Future[HttpRequest] =
for {
e ← createEntity(file)
} yield HttpRequest(HttpMethods.POST, uri = target, entity = e)
try {
val result =
for {
ServerBinding(address) ← startTestServer()
_ = println(s"Server up at $address")
port = address.getPort
target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload"))
req ← createRequest(target, testFile)
_ = println(s"Running request, uploading test file of size ${testFile.length} bytes")
response ← Http().singleRequest(req)
responseBodyAsString ← Unmarshal(response).to[String]
} yield responseBodyAsString
result.onComplete { res ⇒
println(s"The result was $res")
system.scheduler.scheduleOnce(60.seconds) {
println("Shutting down after timeout...")
} catch {
case _: Throwable ⇒ system.terminate()
// sbt build definition for the "restapi" example project.
name := "restapi"

version := "0.1"

scalaVersion := "2.12.8"

// NOTE(review): json4s-native is pinned to 3.6.1 while json4s-jackson and json4s-ext are
// pinned to 3.6.7 — confirm whether the version skew is intentional.
libraryDependencies ++= Seq(
  "com.typesafe.akka"          %% "akka-http"            % "10.1.8",
  "com.typesafe.akka"          %% "akka-stream"          % "2.5.23",
  "com.pauldijou"              %% "jwt-core"             % "3.0.1",
  "de.heikoseeberger"          %% "akka-http-json4s"     % "1.22.0",
  "org.json4s"                 %% "json4s-native"        % "3.6.1",
  "com.typesafe.akka"          %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging"        % "3.9.0",
  "org.slf4j"                  %  "slf4j-simple"         % "1.7.25",
  "org.json4s"                 %% "json4s-jackson"       % "3.6.7",
  "org.json4s"                 %% "json4s-ext"           % "3.6.7"
)
package com.datatech.restapi

import akka.http.scaladsl.server.directives.Credentials
import pdi.jwt._
import org.json4s.native.Json
import org.json4s._
import org.json4s.jackson.JsonMethods._
import pdi.jwt.algorithms._
import scala.util._

/** JWT issuing and validation helpers shared by the REST API routes. */
object AuthBase {

  /** Free-form user attributes stored in the token under the "userinfo" claim. */
  type UserInfo = Map[String, Any]

  /**
   * Immutable JWT configuration plus the operations built on top of it.
   *
   * @param algorithm   algorithm used to sign and verify tokens
   * @param secret      signing key. NOTE(review): the default is a hard-coded demo value;
   *                    supply a real key through [[withSecretKey]] outside of demos.
   * @param getUserInfo credential check used for the initial (basic-auth) login; the default
   *                    rejects every credential instead of being `null`
   */
  case class AuthBase(
      algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
      secret: String = "OpenSesame",
      getUserInfo: Credentials => Option[UserInfo] = _ => None) { ctx =>

    /** Returns a copy of this configuration that uses `algo`. */
    def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm = algo)

    /** Returns a copy of this configuration that uses `key` as the signing secret. */
    def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)

    /** Returns a copy of this configuration that uses `f` to check login credentials. */
    def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)

    /**
     * OAuth2-style authenticator.
     *
     * @return `Some(token)` when credentials were provided and the token validates against
     *         the configured secret and algorithm, otherwise `None`
     */
    def authenticateToken(credentials: Credentials): Option[String] =
      credentials match {
        case Credentials.Provided(token) if isValidToken(token) => Some(token)
        case _                                                  => None
      }

    /**
     * Extracts the "userinfo" claim from `token`.
     *
     * @return `None` when the token cannot be decoded, its claim is not valid JSON, or the
     *         claim holds no "userinfo" JSON object
     */
    def getUserInfo(token: String): Option[UserInfo] =
      decodeParts(token).toOption.flatMap { case (_, claim, _) =>
        // Match on the JSON shape instead of casting, so a missing or non-object
        // "userinfo" field yields None rather than a ClassCastException.
        Try(parse(claim) \ "userinfo").toOption.collect { case obj: JObject => obj.values }
      }

    /** Issues a signed token whose claim carries `userinfo` under the "userinfo" key. */
    def issueJwt(userinfo: UserInfo): String = {
      val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))
      Jwt.encode(claims, secret, algorithm)
    }

    /**
     * Looks up the "tmpdir" entry of the user info carried by `jwt`.
     *
     * @return the entry rendered with `toString`, or `""` when the token is unusable or the
     *         entry is absent or null
     */
    def tempDirFromJwt(jwt: String): String =
      getUserInfo(jwt)
        .flatMap(_.get("tmpdir"))
        .flatMap(value => Option(value)) // a JSON null surfaces as a null value
        .fold("")(_.toString)

    /** Validates `token` with the overload matching the configured algorithm family. */
    private def isValidToken(token: String): Boolean =
      algorithm match {
        case algo: JwtAsymmetricAlgorithm =>
          Jwt.isValid(token, secret, Seq[JwtAsymmetricAlgorithm](algo))
        case algo: JwtHmacAlgorithm =>
          Jwt.isValid(token, secret, Seq[JwtHmacAlgorithm](algo))
        case _ => false
      }

    /** Decodes `token` into its raw parts with the overload matching the algorithm family. */
    private def decodeParts(token: String): Try[(String, String, String)] =
      algorithm match {
        case algo: JwtAsymmetricAlgorithm =>
          Jwt.decodeRawAll(token, secret, Seq[JwtAsymmetricAlgorithm](algo))
        case algo: JwtHmacAlgorithm =>
          Jwt.decodeRawAll(token, secret, Seq[JwtHmacAlgorithm](algo))
        case other =>
          Failure(new IllegalArgumentException(s"Unsupported JWT algorithm: $other"))
      }
  }
}
package com.datatech.restapi

import akka.stream._
import akka.stream.scaladsl._
import java.nio.file._
import akka.util._

/** Helpers for exposing files as Akka Streams sources. */
object FileStreaming {

  /**
   * Builds a source that streams the file at `filePath` in chunks.
   *
   * @param filePath       path of the file to read
   * @param chunkSize      chunk size passed to `FileIO.fromPath`
   * @param dispatcherName name of a configured dispatcher to run the source on (for example a
   *                       dedicated blocking-I/O dispatcher); when empty the source is returned
   *                       with the attributes `FileIO.fromPath` gives it
   * @return the file source
   */
  def fileStreamSource(
      filePath: String,
      chunkSize: Int = 1024,
      dispatcherName: String = ""): Source[ByteString, Any] = {
    val fileSource = FileIO.fromPath(Paths.get(filePath), chunkSize)
    if (dispatcherName.nonEmpty)
      fileSource.withAttributes(ActorAttributes.dispatcher(dispatcherName))
    else
      fileSource
  }
}
package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
import java.nio.file._
import scala.util.{Success, Try}
import FileStreaming._
import AuthBase._

/**
 * File download/upload routes rooted at the directory that the caller's JWT maps to.
 *
 *  - `GET  file/<name>`          streams `<dir>/download/<name>`, gzip-encoded
 *  - `POST file?filename=<name>` stores the request body as `<dir>/upload/<name>`
 *
 * @param jwt  token of the authenticated caller
 * @param auth resolves the caller's directory through `tempDirFromJwt`
 * @param sys  actor system backing [[mat]]
 */
case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) {

  val destPath = "/users/tiger-macpro/cert4/meme.jpg"

  // Lazy: the routes below run on the request's own materializer, so this one is only
  // created if an outside caller asks for it. An eager materializer would be allocated
  // for every FileRoute instance and never shut down.
  implicit lazy val mat: ActorMaterializer = ActorMaterializer()

  val route: Route = pathPrefix("file") {
    val privatePath = auth.tempDirFromJwt(jwt)
    if (privatePath.isEmpty) {
      // No directory for this token: answer 404 and stop. Without the else branch the
      // response would be discarded and the routes would run against an empty base path.
      complete(StatusCodes.NotFound)
    } else {
      downloadRoute(privatePath) ~ uploadRoute(privatePath)
    }
  }

  /** Streams a file from `<privatePath>/download`; 404 when missing or outside that directory. */
  private def downloadRoute(privatePath: String): Route =
    (get & path(Remaining)) { filename =>
      resolveInside(privatePath, "download", filename) match {
        case Some(file) if Files.isRegularFile(file) =>
          withoutSizeLimit {
            encodeResponseWith(Gzip) {
              complete(
                HttpEntity(
                  ContentTypes.`application/octet-stream`,
                  fileStreamSource(file.toString, 1024)))
            }
          }
        case _ =>
          complete(StatusCodes.NotFound)
      }
    }

  /** Writes the request body to `<privatePath>/upload/<filename>` and reports the outcome. */
  private def uploadRoute(privatePath: String): Route =
    (post & parameter("filename")) { filename =>
      resolveInside(privatePath, "upload", filename) match {
        case Some(target) =>
          withoutSizeLimit {
            decodeRequest {
              extractDataBytes { bytes =>
                extractMaterializer { requestMat =>
                  val written = bytes.runWith(FileIO.toPath(target))(requestMat)
                  onComplete(written) {
                    // The future can succeed while the IOResult carries a write failure.
                    case Success(result) if result.status.isSuccess =>
                      complete(StatusCodes.OK)
                    case _ =>
                      complete(StatusCodes.InternalServerError)
                  }
                }
              }
            }
          }
        case None =>
          complete(StatusCodes.BadRequest)
      }
    }

  /**
   * Resolves a client-supplied `filename` below `<root>/<subDir>`.
   *
   * @return the normalized path, or `None` when the name is not a valid path or would
   *         escape the directory (for example through `..` segments or an absolute path)
   */
  private def resolveInside(
      root: String,
      subDir: String,
      filename: String): Option[java.nio.file.Path] =
    Try {
      val base      = Paths.get(root, subDir).toAbsolutePath.normalize()
      val candidate = base.resolve(filename).normalize()
      (base, candidate)
    }.toOption.collect {
      case (base, candidate) if candidate.startsWith(base) && candidate != base => candidate
    }
}
package com.datatech.restapi

import akka.http.scaladsl.server.directives.Credentials
import AuthBase._

/** In-memory user directory used to check login credentials in the demo server. */
object MockUserAuthService {

  /** A known user together with the user info that is returned on a successful login. */
  case class User(username: String, password: String, userInfo: UserInfo)

  /** The fixed set of accounts that may log in. */
  val validUsers = Seq(
    User(
      "johnny",
      "p4ssw0rd",
      Map("shopid" -> "1101", "userid" -> "101", "tmpdir" -> "/users/tiger-macpro/1101101")),
    User(
      "tiger",
      "secret",
      Map("shopid" -> "1101", "userid" -> "102", "tmpdir" -> "/users/tiger-macpro/1101102"))
  )

  /**
   * Checks the supplied credentials against [[validUsers]].
   *
   * @return the user info of the first account whose name equals the credential identifier
   *         and whose password verifies, or `None` when no account matches or no
   *         credentials were provided
   */
  def getValidUser(credentials: Credentials): Option[UserInfo] =
    credentials match {
      case provided: Credentials.Provided =>
        validUsers.collectFirst {
          case account
              if account.username == provided.identifier && provided.verify(account.password) =>
            account.userInfo
        }
      case _ =>
        None
    }
}
package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

/**
 * Demo HTTP server.
 *
 *  - `POST auth`  basic authentication; answers with a freshly issued JWT
 *  - `api/...`    bearer-token protected routes: two greeting endpoints and the file routes
 *
 * Runs until a line is read from standard input, then unbinds and stops the actor system.
 */
object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  // NOTE(review): the signing secret is hard-coded here — confirm this is demo-only.
  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
    path("auth") {
      authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
        post { complete(authenticator.issueJwt(userinfo)) }
      }
    } ~
      pathPrefix("api") {
        authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
          (path("hello") & get) {
            complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
          } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            FileRoute(validToken).route
          // ~ ...
        }
      }

  // NOTE(review): host is an empty string in the listing — confirm the intended interface.
  val (port, host) = (50081, "")

  val bindingFuture = Http().bindAndHandle(route, host, port)

  // Report a failed bind; otherwise the message below is the only output and is misleading.
  bindingFuture.failed.foreach { err =>
    println(s"Failed to bind to $host $port: ${err.getMessage}")
  }

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())
}
import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.restapi.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import scala.util._

/**
 * Client-side helpers for uploading and downloading files over HTTP.
 *
 * @param sys actor system used for the HTTP client, the materializer and the dispatcher
 */
case class FileUtil(implicit sys: ActorSystem) {
  import sys.dispatcher

  implicit val mat: ActorMaterializer = ActorMaterializer()

  /**
   * Wraps `file` in a single-part multipart/form-data entity.
   *
   * The part is named "test" and carries the file name as its "filename" parameter.
   * Blocks for up to 3 seconds while the form is marshalled.
   *
   * @throws IllegalArgumentException when `file` does not exist
   */
  def createEntity(file: File): RequestEntity = {
    require(file.exists(), s"File not found: ${file.getPath}")
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(
              MediaTypes.`application/octet-stream`,
              file.length(),
              // the chunk size here is currently critical for performance
              FileIO.fromPath(file.toPath, chunkSize = 100000)),
            Map("filename" -> file.getName))))
    Await.result(Marshal(formData).to[RequestEntity], 3.seconds)
  }

  /**
   * Sends `request` with `dataEntity` as its body and logs the outcome to standard output.
   *
   * Uses the class-level materializer; the original created a new one on every call.
   *
   * @return the response future; it completes when the response arrives, independently of
   *         the logging side effects
   */
  def uploadFile(request: HttpRequest, dataEntity: RequestEntity): Future[HttpResponse] =
    Http(sys)
      // To gzip the upload, wrap the request in Gzip.encodeMessage(...).
      .singleRequest(request.copy(entity = dataEntity))
      .andThen {
        case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r @ HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Failure(err) =>
          println(s"Upload failed: ${err.getMessage}")
      }

  /**
   * Sends `request` and, on a 200 response, writes the body to `destPath`.
   *
   * The outcome of the write is logged to standard output: success is only reported when
   * the file was actually written.
   *
   * @return the response future; it completes when the response arrives, NOT when the
   *         file has been written
   */
  def downloadFileTo(request: HttpRequest, destPath: String): Future[HttpResponse] =
    Http(sys)
      // To accept gzip, map the response through Gzip.decodeMessage(_).
      .singleRequest(request)
      .andThen {
        case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity
            .withoutSizeLimit()
            .dataBytes
            .runWith(FileIO.toPath(Paths.get(destPath)))
            .map(_.status) // the future may succeed while the IOResult holds a failure
            .onComplete {
              case Success(Success(_)) =>
                println(s"Download file saved to: $destPath")
              case Success(Failure(err)) =>
                println(s"Download failed: ${err.getMessage}")
              case Failure(err) =>
                println(s"Download failed: ${err.getMessage}")
            }
        case Success(r @ HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Failure(err) =>
          println(s"Download failed: ${err.getMessage}")
      }
}

/** Command-line demo: logs in, uploads a file and downloads one back. */
object TestFileClient {

  type UserInfo = Map[String, Any]

  // NOTE(review): every request below has an empty `uri` in the listing; the real endpoint
  // URLs are not available here and must be filled in before this client can run.
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    // Terminate the actor system even when a request or an Await fails, so the JVM can exit.
    try {
      val fileUtil = FileUtil()

      val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
      val authRequest = HttpRequest(
        HttpMethods.POST,
        uri = "",
        headers = List(authorization)
      )

      val respToken: Future[String] =
        for {
          resp <- Http().singleRequest(authRequest)
          // Concatenate the raw bytes first: decoding chunk by chunk could split a
          // multi-byte UTF-8 character across two chunks.
          body <- resp.entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
        } yield body.utf8String

      val jstr = Await.result(respToken, 2.seconds)
      println(jstr)

      scala.io.StdIn.readLine()

      val authentication = headers.Authorization(OAuth2BearerToken(jstr))

      // Three interchangeable ways of building the upload body; only `multipart` is sent.
      val entity = HttpEntity(
        ContentTypes.`application/octet-stream`,
        fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg", 1024)
      )
      val chunked = HttpEntity.Chunked.fromData(
        ContentTypes.`application/octet-stream`,
        fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg", 1024)
      )
      val multipart = fileUtil.createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg"))

      val uploadRequest = HttpRequest(
        HttpMethods.POST,
        uri = ""
      ).addHeader(authentication)

      // upload file
      //Await.ready(fileUtil.uploadFile(uploadRequest, entity), 2.seconds)
      //Await.ready(fileUtil.uploadFile(uploadRequest, chunked), 2.seconds)
      Await.ready(fileUtil.uploadFile(uploadRequest, multipart), 2.seconds)

      val dlRequest = HttpRequest(
        HttpMethods.GET,
        uri = ""
      ).addHeader(authentication)

      fileUtil.downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

      scala.io.StdIn.readLine()
    } finally {
      system.terminate()
    }
  }
}