国产chinesehdxxxx野外,国产av无码专区亚洲av琪琪,播放男人添女人下边视频,成人国产精品一区二区免费看,chinese丰满人妻videos

Scala 并發(fā)編程

2018-02-24 15:49 更新

Runnable/Callable

Runnable接口只有一個(gè)沒有返回值的方法。

trait Runnable {
  def run(): Unit
}

Callable與之類似,除了它有一個(gè)返回值

trait Callable[V] {
  def call(): V
}

線程

Scala并發(fā)是建立在Java并發(fā)模型基礎(chǔ)上的。

在Sun JVM上,對(duì)IO密集的任務(wù),我們可以在一臺(tái)機(jī)器運(yùn)行成千上萬個(gè)線程。

一個(gè)線程需要一個(gè)Runnable。你必須調(diào)用線程的?start?方法來運(yùn)行Runnable。

scala> val hello = new Thread(new Runnable {
  def run() {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start
hello world

當(dāng)你看到一個(gè)類實(shí)現(xiàn)了Runnable接口,你就知道它的目的是運(yùn)行在一個(gè)線程中。

單線程代碼

這里有一個(gè)可以工作但有問題的代碼片斷。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)

  def run() {
    while (true) {
      // This will block until a connection comes in.
      val socket = serverSocket.accept()
      (new Handler(socket)).run()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

每個(gè)請(qǐng)求都會(huì)回應(yīng)當(dāng)前線程的名稱,所以結(jié)果始終是?main?。

這段代碼的主要缺點(diǎn)是在同一時(shí)間,只有一個(gè)請(qǐng)求可以被相應(yīng)!

你可以把每個(gè)請(qǐng)求放入一個(gè)線程中處理。只要簡單改變

(new Handler(socket)).run()

(new Thread(new Handler(socket))).start()

但如果你想重用線程或者對(duì)線程的行為有其他策略呢?

Executors

隨著Java 5的發(fā)布,它決定提供一個(gè)針對(duì)線程的更抽象的接口。

你可以通過?Executors?對(duì)象的靜態(tài)方法得到一個(gè)?ExecutorService?對(duì)象。這些方法為你提供了可以通過各種政策配置的?ExecutorService?,如線程池。

下面改寫我們之前的阻塞式網(wǎng)絡(luò)服務(wù)器來允許并發(fā)請(qǐng)求。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run() {
    try {
      while (true) {
        // This will block until a connection comes in.
        val socket = serverSocket.accept()
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

這里有一個(gè)連接腳本展示了內(nèi)部線程是如何重用的。

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

Futures

Future?代表異步計(jì)算。你可以把你的計(jì)算包裝在Future中,當(dāng)你需要計(jì)算結(jié)果的時(shí)候,你只需調(diào)用一個(gè)阻塞的?get()?方法就可以了。一個(gè)?Executor?返回一個(gè)?Future?。如果使用Finagle RPC系統(tǒng),你可以使用?Future?實(shí)例持有可能尚未到達(dá)的結(jié)果。

一個(gè)?FutureTask?是一個(gè)Runnable實(shí)現(xiàn),就是被設(shè)計(jì)為由?Executor?運(yùn)行的

val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target);
}})
executor.execute(future)

現(xiàn)在我需要結(jié)果,所以阻塞直到其完成。

val blockingResult = future.get()

參考?Scala School的Finagle介紹中大量使用了Future,包括一些把它們結(jié)合起來的不錯(cuò)的方法。以及 Effective Scala 對(duì)Futures的意見。

線程安全問題

class Person(var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

這個(gè)程序在多線程環(huán)境中是不安全的。如果有兩個(gè)線程有引用到同一個(gè)Person實(shí)例,并調(diào)用?set?,你不能預(yù)測(cè)兩個(gè)調(diào)用結(jié)束后?name?的結(jié)果。

在Java內(nèi)存模型中,允許每個(gè)處理器把值緩存在L1或L2緩存中,所以在不同處理器上運(yùn)行的兩個(gè)線程都可以有自己的數(shù)據(jù)視圖。

讓我們來討論一些工具,來使線程保持一致的數(shù)據(jù)視圖。

三種工具

同步

互斥鎖(Mutex)提供所有權(quán)語義。當(dāng)你進(jìn)入一個(gè)互斥體,你擁有它。同步是JVM中使用互斥鎖最常見的方式。在這個(gè)例子中,我們會(huì)同步Person。

在JVM中,你可以同步任何不為null的實(shí)例。

class Person(var name: String) {
  def set(changedName: String) {
    this.synchronized {
      name = changedName
    }
  }
}

volatile

隨著Java 5內(nèi)存模型的變化,volatile和synchronized基本上是相同的,除了volatile允許空值。

synchronized?允許更細(xì)粒度的鎖。 而?volatile?則對(duì)每次訪問同步。

class Person(@volatile var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

AtomicReference

此外,在Java 5中還添加了一系列低級(jí)別的并發(fā)原語。?AtomicReference?類是其中之一

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {
  def set(changedName: String) {
    name.set(changedName)
  }
}

這個(gè)成本是什么?

AtomicReference?是這兩種選擇中最昂貴的,因?yàn)槟惚仨毴ネㄟ^方法調(diào)度(method dispatch)來訪問值。

volatile?和?synchronized?是建立在Java的內(nèi)置監(jiān)視器基礎(chǔ)上的。如果沒有資源爭用,監(jiān)視器的成本很小。由于?synchronized?允許你進(jìn)行更細(xì)粒度的控制權(quán),從而會(huì)有更少的爭奪,所以?synchronized?往往是最好的選擇。

當(dāng)你進(jìn)入同步點(diǎn),訪問volatile引用,或去掉AtomicReferences引用時(shí), Java會(huì)強(qiáng)制處理器刷新其緩存線從而提供了一致的數(shù)據(jù)視圖。

如果我錯(cuò)了,請(qǐng)大家指正。這是一個(gè)復(fù)雜的課題,我敢肯定要弄清楚這一點(diǎn)需要一個(gè)漫長的課堂討論。

Java5的其他靈巧的工具

正如前面提到的?AtomicReference?,Java5帶來了許多很棒的工具。

CountDownLatch

CountDownLatch?是一個(gè)簡單的多線程互相通信的機(jī)制。

val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)

doneSignal.await()
println("both workers finished!")

先不說別的,這是一個(gè)優(yōu)秀的單元測(cè)試。比方說,你正在做一些異步工作,并要確保功能完成。你的函數(shù)只需要?倒數(shù)計(jì)數(shù)(countDown)?并在測(cè)試中?等待(await)?就可以了。

AtomicInteger/Long

由于對(duì)Int和Long遞增是一個(gè)經(jīng)常用到的任務(wù),所以增加了?AtomicInteger?和?AtomicLong?。

AtomicBoolean

我可能不需要解釋這是什么。

ReadWriteLocks

讀寫鎖(ReadWriteLock)?使你擁有了讀線程和寫線程的鎖控制。當(dāng)寫線程獲取鎖的時(shí)候讀線程只能等待。

讓我們構(gòu)建一個(gè)不安全的搜索引擎

下面是一個(gè)簡單的倒排索引,它不是線程安全的。我們的倒排索引按名字映射到一個(gè)給定的用戶。

這里的代碼天真地假設(shè)只有單個(gè)線程來訪問。

注意使用了?mutable.HashMap?替代了默認(rèn)的構(gòu)造函數(shù)?this()

import scala.collection.mutable

case class User(name: String, id: Int)

class InvertedIndex(val userMap: mutable.Map[String, User]) {

  def this() = this(new mutable.HashMap[String, User])

  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }

  def add(term: String, user: User) {
    userMap += term -> user
  }

  def add(user: User) {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

這里沒有寫如何從索引中獲取用戶。稍后我們會(huì)補(bǔ)充。

讓我們把它變?yōu)榫€程安全

在上面的倒排索引例子中,userMap不能保證是線程安全的。多個(gè)客戶端可以同時(shí)嘗試添加項(xiàng)目,并有可能出現(xiàn)前面?Person?例子中的視圖錯(cuò)誤。

由于userMap不是線程安全的,那我們?cè)鯓颖3衷谕粋€(gè)時(shí)間只有一個(gè)線程能改變它呢?

你可能會(huì)考慮在做添加操作時(shí)鎖定userMap。

def add(user: User) {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

不幸的是,這個(gè)粒度太粗了。一定要試圖在互斥鎖以外做盡可能多的耗時(shí)的工作。還記得我說過如果不存在資源爭奪,鎖開銷就會(huì)很小嗎。如果在鎖代碼塊里面做的工作越少,爭奪就會(huì)越少。

def add(user: User) {
  // tokenizeName was measured to be the most expensive operation.
  val tokens = tokenizeName(user.name)

  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

SynchronizedMap

我們可以通過SynchronizedMap特質(zhì)將同步混入一個(gè)可變的HashMap。

我們可以擴(kuò)展現(xiàn)有的InvertedIndex,提供給用戶一個(gè)簡單的方式來構(gòu)建同步索引。

import scala.collection.mutable.SynchronizedMap

class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

如果你看一下其實(shí)現(xiàn),你就會(huì)意識(shí)到,它只是在每個(gè)方法上加同步鎖來保證其安全性,所以它很可能沒有你希望的性能。

Java ConcurrentHashMap

Java有一個(gè)很好的線程安全的ConcurrentHashMap。值得慶幸的是,我們可以通過JavaConverters獲得不錯(cuò)的Scala語義。

事實(shí)上,我們可以通過擴(kuò)展老的不安全的代碼,來無縫地接入新的線程安全I(xiàn)nvertedIndex。

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User] asScala)
}

讓我們加載InvertedIndex

原始方式

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class FileRecordProducer(path: String) extends UserMaker {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      index.add(makeUser(line))
    }
  }
}

對(duì)于文件中的每一行,我們可以調(diào)用?makeUser?然后?add?到 InvertedIndex中。如果我們使用并發(fā)InvertedIndex,我們可以并行調(diào)用add因?yàn)閙akeUser沒有副作用,所以我們的代碼已經(jīng)是線程安全的了。

我們不能并行讀取文件,但我們?可以?并行構(gòu)造用戶并且把它添加到索引中。

一個(gè)解決方案:生產(chǎn)者/消費(fèi)者

異步計(jì)算的一個(gè)常見模式是把消費(fèi)者和生產(chǎn)者分開,讓他們只能通過?隊(duì)列(Queue)?溝通。讓我們看看如何將這個(gè)模式應(yīng)用在我們的搜索引擎索引中。

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      queue.put(line)
    }
  }
}

// Abstract consumer
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
  def run() {
    while (true) {
      val item = queue.take()
      consume(item)
    }
  }

  def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()

// One thread for the producer
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String) = index.add(makeUser(t))
}

// Let's pretend we have 8 cores on this machine.
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

// Submit one consumer per core.
for (i <- i to cores) {
  pool.submit(new IndexerConsumer[String](index, q))
}

Built at?@twitter?by?@stevej,?@marius, and?@lahosken?with much help from?@evanm,?@sprsquish,?@kevino,?@zuercher,?@timtrueman,?@wickman, and@mccv; Russian translation by?appigram; Chinese simple translation by?jasonqu; Korean translation by?enshahar;

Licensed under the?Apache License v2.0.

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)