运维开发网

如何控制scala中future.sequence的并发性?

运维开发网 https://www.qedev.com 2020-07-15 11:07 出处:网络 作者:运维开发网整理
我知道我可以将Seq [Future [T]]转换为Future [Seq [T]] via val seqFuture = Future.sequence(seqOfFutures) seqFuture.map((seqT: Seq[T]) => {...}) 我现在的问题是,我在那个序列中有700个期货,我希望能够控制它们中有多少并行解决,因为每个未来将调用内部休息api,并且同时有700
我知道我可以将Seq [Future [T]]转换为Future [Seq [T]] via

val seqFuture = Future.sequence(seqOfFutures)
  seqFuture.map((seqT: Seq[T]) => {...})

我现在的问题是,我在那个序列中有700个期货,我希望能够控制它们中有多少并行解决,因为每个未来将调用内部休息api,并且同时有700个请求就像开火一样针对该服务器的dos攻击.

我宁愿一次只解决10个期货的问题.

我怎样才能做到这一点?

尝试pamu’s answer我看到错误:

[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error]         val batch = Future.sequence(c.map(_()))
[error]                                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : List[Nothing]
[error]  required: ?M[scala.concurrent.Future[?A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error]  found   : List[Nothing]
[error]  required: M[scala.concurrent.Future[A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                                          ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error]         val batch = Future.sequence(c.map(_()))
[error]                                    ^
[error] four errors found
FoldLeft

简单的foldLeft可用于控制一次同时运行的期货数量.

首先,让我们创建一个名为LazyFuture的案例类

case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply() = f()
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

  def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
}

LazyFuture阻止未来立即运行

val list: List[LazyFuture[A]] = ...


list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
  val batch = Future.sequence(c.map(_()))
  batch.flatMap(values => r.map(rs => rs ++ values))
}

相应地更改concurFactor以同时运行多个期货.

concurFactor of 1将同时运行一个未来

concurFactor of 2将同时运行两个期货

等等 …

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) =
   list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
      val batch = Future.sequence(c.map(_()))
      r.flatMap(rs => batch.map(values => rs ++ values))
    }

完整的代码

case class LazyFuture[+A](f: Unit => Future[A]) {
    def apply() = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

    def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      val batch = Future.sequence(c.map(_ ()))
      r.flatMap(rs => batch.map(values => rs ++ values))
    }

限制执行上下文

您还可以通过限制执行池中的线程数来限制计算资源.但是,这种解决方案并不灵活.就个人而言,我不喜欢它.

val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

您必须记住传递正确的执行上下文,这是一个隐式值.有时我们不知道哪个隐含在范围内.这是马车

警告

当未来如下构建时

val foo = Future {
     1 + 2
} // future starts executing

LazyFuture(foo) // Not a right way

foo已经开始执行,无法控制.

构建LazyFuture的正确方法

val foo = LazyFuture {
  1 + 2
}

要么

val foo = LazyFuture {
  Future {
   1 + 2
  }
}

工作实例

package main

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object Main {

  case class LazyFuture[A](f: Unit => Future[A]) {
    def apply(): Future[A] = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
    (implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      val batch = Future.sequence(c.map(_ ()))
      r.flatMap(rs => r.map(values=> rs ++ values))
    }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global


    val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
      LazyFuture {
        println(s"value: $value started")
        Thread.sleep(value * 200)
        println(s"value: $value stopped")
        value
      }
    }
    val f = executeBatch(futures.toList)(2)
    Await.result(f, Duration.Inf)
  }

}
0

精彩评论

暂无评论...
验证码 换一张
取 消