Akka-Stream implementation slower than single threaded implementation

Akka Streams is using asynchronous message passing between Actors to implement stream processing stages. Passing data across an asynchronous boundary has an overhead that you are seeing here: your computation seems to take only about 160ns (derived from the single-threaded measurement) while the streaming solution takes roughly 1µs per element, which is dominated by the … Read more

Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef

You need a Flow: import akka.stream.OverflowStrategy.fail import akka.stream.scaladsl.Source import akka.stream.scaladsl.{Sink, Flow} case class Weather(zip : String, temp : Double, raining : Boolean) val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail) val sunnySource = weatherSource.filter(!_.raining) val ref = Flow[Weather] .to(Sink.ignore) .runWith(sunnySource) ref ! Weather(“02139”, 32.0, true) Remember this is all experimental and may change!

How to create a Source that can receive elements later via a method call?

There are three ways this can be achieved: 1. Post Materialization with SourceQueue You can use Source.queue that materializes the Flow into a SourceQueue: case class Weather(zipCode : String, temperature : Double, raining : Boolean) val bufferSize = 100 //if the buffer fills up then this strategy drops the oldest elements //upon the arrival of … Read more