Class Experiment10_PrimeStreams
Experiment 3 - Prime Numbers
Calculating Prime Numbers is always fun, and it's most fun when we try to compute prime numbers as fast as possible, or as many at once as we can. As we will see, Project Loom offers us no benefits in this use case, but it's interesting to experiment and see why.
As I have mentioned before, with Concurrent Programming, higher levels of abstraction are generally better and safer because they have been designed and implemented by experts, and it's better to stand on the shoulders of giants. Generally, for CPU Bound use cases, the Stream Interface is a good place to start, as it has been specifically designed for such use cases.
Streams
The Streams Architecture has three important phases:
- Source
  - Something that generates a stream of objects, such as a range of numbers; in our case, numbers that could be prime.
- Pipeline
  - A sequence of Functions applied to each object in the stream; in our case, testing if a number is prime, which leverages the Stream.filter(Predicate) function.
- Sink
  - Ultimately, we need somewhere to Collect the Stream of objects. This is also known as *Terminating* the Stream.
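As a minimal sketch of the three phases, assuming a simple trial-division `isPrime` helper (the class name and helper here are hypothetical and are not this class's own code):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class StreamPhasesSketch {

    // Hypothetical helper: trial division by odd divisors up to sqrt(n).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    static List<Long> primesUpTo(long limit) {
        return LongStream.rangeClosed(2, limit)          // Source: candidate numbers
                .filter(StreamPhasesSketch::isPrime)     // Pipeline: keep only the primes
                .boxed()                                 // Pipeline: long -> Long
                .collect(Collectors.toList());           // Sink: terminate and collect
    }

    public static void main(String[] args) {
        System.out.println(primesUpTo(30));
    }
}
```

Nothing runs until the terminal `collect` call; the source and pipeline stages only describe the work.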
Flow is more akin to Akka Streams; in fact, Akka Streams actually uses the term Flow. Hopefully later we can see the effects of running Java Flows with Project Loom...
"There are only two hard things in Computer Science: cache invalidation and naming things." (Phil Karlton)
As we can see in serialPrimes(long), it is very easy to express taking a range of numbers and filtering out the primes. From parallelPrimes(long) we can see how easy it is to change this to parallel computation. However, with some experimentation, we can easily see that unless we have a large set of computations, 10,000 or more, making the computations parallel does not really buy us much. This is because the overhead of Concurrent Operation can easily overwhelm any benefits of parallelism.
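The difference between the serial and parallel forms can be sketched as follows. This is an illustration under assumptions, not the source of serialPrimes(long) or parallelPrimes(long): the class name, the `oddCandidates` helper and the trial-division `isPrime` are hypothetical, and both methods return an array here so the results can be compared.

```java
import java.util.Arrays;
import java.util.stream.LongStream;

class SerialVsParallelSketch {

    // Hypothetical helper: trial division by odd divisors up to sqrt(n).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Odd candidates 3, 5, 7, ... up to and including limit.
    static LongStream oddCandidates(long limit) {
        return LongStream.rangeClosed(1, (limit - 1) / 2).map(i -> 2 * i + 1);
    }

    static long[] serialPrimes(long limit) {
        return oddCandidates(limit)
                .filter(SerialVsParallelSketch::isPrime)
                .toArray();
    }

    // The only change from the serial version is the call to parallel().
    static long[] parallelPrimes(long limit) {
        return oddCandidates(limit)
                .parallel()
                .filter(SerialVsParallelSketch::isPrime)
                .toArray();
    }

    public static void main(String[] args) {
        long[] serial = serialPrimes(10_000);
        long[] parallel = parallelPrimes(10_000);
        System.out.println(serial.length + " odd primes, same result: "
                + Arrays.equals(serial, parallel));
    }
}
```

Because the stream is ordered, `toArray()` returns the primes in the same order in both versions; only the scheduling of the `filter` work differs.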
Threads
For the rest of the experiments, we can conclude that attempts to use Threads to do better than basic BaseStream.parallel() generally fail. We can also see that the code is substantially more complicated...
However, where Threads really shine is when we are dealing with concurrency where there are a lot of blocking operations, such as transaction processing, network access, etc. Every blocking operation is an opportunity for some other task to run and progress. From the Structured Concurrency JEP:
Structured concurrency is a counterpart to parallel streams and (their underlying mechanism) ForkJoinPool. Those are concerned with data-parallelism and computation, and ForkJoinPool also employs "structured parallelism" where forks are followed by joins. But as concurrency focuses more on interaction – through I/O and/or message passing – than pure data processing, structured concurrency places a special emphasis on handling cancellation and partial failures, and its goal isn't just to assist in writing a correct algorithm, but also to express the application's logical unit in a manner that is reflected both in the code's structure as well as in runtime observation with various service tools.
In particular, Parallel Streams are intended for simpler internal processing on hardware cores/threads, but are not well suited for external processing, such as HTTP REST calls. For Distributed Parallel Computation, Flow might be a better tool to use, as it is more similar to Parallel Streams. However, Flow is a Reactive design, and with Virtual Threads there might be better solutions that don't add the complexity of Reactive design. From Reactive Programming with Java 9's Flow:
Reactive Programming is not the new hype to replace Functional Programming. Both are compatible and work perfectly together. While the Streams API introduced in Java 8 is perfect to process data streams (map, reduce and all the variants), the Flow API shines on the communication side (request, slow down, drop, block, etc.). You can use Streams as data sources for Publisher, blocking them or dropping items as needed. You can also use them on your Subscriber’s side, for example, to perform aggregations after receiving some items. Not to mention all the rest of programming logic in which reactive streams doesn’t fit but yet it can be written in a functional style and be ten times more readable and easier to maintain than in imperative programming.
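To make the Flow side of this concrete, here is a small sketch that pushes candidate numbers through a `SubmissionPublisher` to a subscriber that requests one item at a time. The class names and the `isPrime` helper are hypothetical; only `java.util.concurrent.Flow` and `SubmissionPublisher` are standard JDK APIs (Java 9 and later).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowPrimesSketch {

    // Hypothetical helper: trial division by odd divisors up to sqrt(n).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Subscriber that asks for one candidate at a time (back pressure).
    static class PrimeSubscriber implements Flow.Subscriber<Long> {
        final List<Long> primes = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }
        @Override public void onNext(Long candidate) {
            if (isPrime(candidate)) primes.add(candidate);
            subscription.request(1);          // ready for the next item
        }
        @Override public void onError(Throwable throwable) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<Long> primesViaFlow(long limit) {
        PrimeSubscriber subscriber = new PrimeSubscriber();
        // close() at the end of the try block signals onComplete to the subscriber.
        try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (long n = 2; n <= limit; n++) {
                publisher.submit(n);          // blocks while the buffer is full
            }
        }
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.primes;
    }

    public static void main(String[] args) {
        System.out.println(primesViaFlow(30));
    }
}
```

Compared with the Stream version, the extra code is almost entirely about communication (subscription, demand, completion), which is the complexity the Reactive design brings with it.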
For these experiments we wrap our primes experiments into a pseudo networking application, where we simulate farming isPrime() out to HTTP Endpoints. This simulation basically wraps the isPrime(long, long, long, AtomicLong, AtomicLong) calculation with network latency via two calls to Thread.sleep(long), one to simulate Request Latency, another to simulate Response Latency.
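A rough sketch of that wrapping is shown below. It is not the isPrime(long, long, long, AtomicLong, AtomicLong) method itself: the two AtomicLong parameters are omitted because their role is not described here, and the names `remoteIsPrime` and `simulateLag` are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

class SimulatedLatencySketch {

    // Hypothetical helper: trial division by odd divisors up to sqrt(n).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Sleeps for a random time between minimumLag and maximumLag milliseconds (inclusive).
    static void simulateLag(long minimumLag, long maximumLag) {
        long lag = ThreadLocalRandom.current().nextLong(minimumLag, maximumLag + 1);
        try {
            Thread.sleep(lag);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for callers
        }
    }

    // Pretends the prime test is a remote call: lag, compute, lag.
    static boolean remoteIsPrime(long candidate, long minimumLag, long maximumLag) {
        simulateLag(minimumLag, maximumLag);      // simulated Request Latency
        boolean result = isPrime(candidate);
        simulateLag(minimumLag, maximumLag);      // simulated Response Latency
        return result;
    }

    public static void main(String[] args) {
        System.out.println("97 is prime: " + remoteIsPrime(97, 10, 30));
    }
}
```

Each call blocks twice, so a single thread spends most of its time asleep; that idle time is what a scheduler can hand to other tasks.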
The spirit of the experiments here is that we might have some application that is computationally expensive and takes a long time. While BaseStream.parallel() works just fine on a limited scale, we might imagine a magic cloud that has some /isPrime endpoints where the work can be done faster, more efficiently, etc. Indeed, this is a classic MapReduce model. To be sure, if we really wanted to generate prime numbers in a High-Performance Computing style, we might use something like Apache Spark, but here we are trying to keep things simple, and make a point, not a perfect application.
Benchmarking
When trying to compare the performance of different Concurrent and Parallel approaches, it's important to conduct benchmarks, and one tool to use is the Java Microbenchmark Harness (JMH). However, it can take quite a long time for JMH to run benchmarks. By default, recent JMH versions run 5 forks with 5 warmup iterations and 5 measurement iterations each, for a total of 25 warmups and 25 measurements (the Cnt column in the results below), then average the results. For quicker results, the experiments in this code take a rather casual approach to get a feel for things, leaving the heavy lifting to JMH.
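For illustration, a casual measurement of this kind might look like the sketch below. The `averageMicros` helper is hypothetical and is not the timing code used by these experiments. Unlike JMH, it does nothing to protect against JIT effects such as dead-code elimination, so its numbers are only a rough guide.

```java
import java.util.stream.LongStream;

class CasualTimingSketch {

    // Runs a few untimed warmup rounds, then returns the average
    // number of microseconds per timed round.
    static double averageMicros(Runnable task, int warmups, int rounds) {
        for (int i = 0; i < warmups; i++) {
            task.run();
        }
        long start = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            task.run();
        }
        long elapsedNanos = System.nanoTime() - start;
        return elapsedNanos / 1_000.0 / rounds;
    }

    // Hypothetical workload: count odd primes up to limit by trial division.
    static long countOddPrimes(long limit) {
        return LongStream.rangeClosed(1, (limit - 1) / 2)
                .map(i -> 2 * i + 1)
                .filter(n -> {
                    for (long d = 3; d * d <= n; d += 2) {
                        if (n % d == 0) return false;
                    }
                    return true;
                })
                .count();
    }

    public static void main(String[] args) {
        double micros = averageMicros(() -> countOddPrimes(10_000), 5, 20);
        System.out.println("average microseconds per run: " + micros);
    }
}
```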
Streams
See benchmarks/PrimeNumbers for the code that runs these JMH Benchmarks.
Benchmark                                 Mode  Cnt        Score        Error
PrimeNumbers.parallelPrimesTo_1000        avgt   25       46.620 ±      1.549
PrimeNumbers.parallelPrimesTo_10_000      avgt   25      379.036 ±      7.604
PrimeNumbers.parallelPrimesTo_10_000_000  avgt   25  1360362.823 ±  14723.084
PrimeNumbers.serialPrimesTo_1000          avgt   25       32.998 ±      0.315
PrimeNumbers.serialPrimesTo_10_000        avgt   25      644.467 ±     16.160
PrimeNumbers.serialPrimesTo_10_000_000    avgt   25  8199848.272 ±  84514.067
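where the Score is the average time in microseconds (μs) to complete the run. Given that we only test odd numbers, about half of the numbers up to each limit are tested for primality. From this we can derive the following, where throughput = tests per μs, and ratio is that throughput divided by the throughput of the corresponding serial or parallel benchmark for the same limit: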
Benchmark                                 tested     throughput  ratio
PrimeNumbers.parallelPrimesTo_1000        500        10.725      0.708
PrimeNumbers.parallelPrimesTo_10_000      5,000      13.191      1.700
PrimeNumbers.parallelPrimesTo_10_000_000  5,000,000   3.675      6.025
PrimeNumbers.serialPrimesTo_1000          500        15.152      1.413
PrimeNumbers.serialPrimesTo_10_000        5,000       7.758      0.588
PrimeNumbers.serialPrimesTo_10_000_000    5,000,000   0.610      0.166
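The throughput and ratio figures follow directly from the JMH scores. As a worked example, the sketch below recomputes the two 10,000 rows; the class and method names are only illustrative.

```java
class ThroughputSketch {

    // Tests per unit of time: number of candidates tested divided by the JMH score.
    static double throughput(long tested, double score) {
        return tested / score;
    }

    // Relative throughput of one benchmark against its counterpart.
    static double ratio(double throughput, double counterpartThroughput) {
        return throughput / counterpartThroughput;
    }

    public static void main(String[] args) {
        // Scores in microseconds, taken from the JMH results above.
        double parallel = throughput(5_000, 379.036);   // parallelPrimesTo_10_000
        double serial = throughput(5_000, 644.467);     // serialPrimesTo_10_000

        System.out.println("parallel tests per microsecond: " + parallel);
        System.out.println("serial tests per microsecond:   " + serial);
        System.out.println("parallel / serial ratio:        " + ratio(parallel, serial));
        System.out.println("serial / parallel ratio:        " + ratio(serial, parallel));
    }
}
```

A ratio above 1 means the benchmark in that row is faster than its counterpart, so the parallel version wins at 10,000 and 10,000,000 but loses at 1,000.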
From this we can conclude
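As an aside, these counts are one less than the usual prime counts (168, 1229 and 664579), presumably because only odd numbers are tested and so 2 is excluded. Computing primes up to
- 1,000, there are 167 odd primes
- 10,000, there are 1228 odd primes
- 10,000,000, there are 664578 odd primes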
Project Loom
Pure Computation
When I first started playing with these experiments I naïvely thought that using Project Loom could improve upon the previous benchmarks. Even though I had already watched Ron Pressler - Loom: Bringing Lightweight Threads and Delimited Continuations to the JVM, I had not really appreciated or internalized that knowledge. But, by naïvely concocting my own experiments, I had empirical evidence that supported what Ron Pressler had already stated.
Without using JMH, playing around here I discovered:
- BaseStream.parallel() within a VirtualThread ExecutorService context did not improve throughput. In most cases it was a little worse.
- ExecutorService.invokeAll(Collection) has about 1/3 the throughput of Stream.parallel() when testing candidates up to 10,000,000.
- ExecutorService.submit(Callable) has about 1/2 the throughput of Stream.parallel(), which is better than ExecutorService.invokeAll(Collection).
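For reference, the two ExecutorService styles compared above can be sketched like this. The code is an illustration with hypothetical names, not the experiment's own implementation. It accepts any ExecutorService, so on a JDK with virtual threads (Java 21 or later) an executor from `Executors.newVirtualThreadPerTaskExecutor()` could be passed in instead of the fixed pool used in `main`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorPrimesSketch {

    // Hypothetical helper: trial division by odd divisors up to sqrt(n).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // One task per odd candidate 3, 5, 7, ... up to limit.
    static List<Callable<Boolean>> tasks(long limit) {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (long n = 3; n <= limit; n += 2) {
            final long candidate = n;
            tasks.add(() -> isPrime(candidate));
        }
        return tasks;
    }

    // Waits for every future and counts those that returned true.
    static long countTrue(List<Future<Boolean>> futures) {
        long count = 0;
        try {
            for (Future<Boolean> future : futures) {
                if (future.get()) count++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return count;
    }

    // invokeAll: hand over the whole collection and block until all tasks finish.
    static long countWithInvokeAll(long limit, ExecutorService executor) {
        try {
            return countTrue(executor.invokeAll(tasks(limit)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // submit: hand over tasks one by one, then collect the futures.
    static long countWithSubmit(long limit, ExecutorService executor) {
        List<Future<Boolean>> futures = new ArrayList<>();
        for (Callable<Boolean> task : tasks(limit)) {
            futures.add(executor.submit(task));
        }
        return countTrue(futures);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println("invokeAll: " + countWithInvokeAll(10_000, executor));
            System.out.println("submit:    " + countWithSubmit(10_000, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Both styles create one task object and one Future per candidate, which is the kind of per-task overhead a parallel stream avoids by splitting the range into larger chunks.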
Simulated Networking
The bottom line is that for raw computation, stick with the Java Streams API and Parallel Streams. However, how does Project Loom compare when simulating a Network Application, where we might be farming prime computations out to some HTTP Endpoint?
We use the same isPrime(long, long, long, AtomicLong, AtomicLong) code as the previous benchmarks, calling isPrime(candidate, minimumLag, maximumLag) with minimumLag = 10 ms and maximumLag = 30 ms. We simulate some network blocking overhead by using Thread.sleep(long) before the prime test and after the prime test, to simulate the HTTP Request overhead and the HTTP Response overhead. The actual lag of each sleep is random within that range, so the two sleeps together add a total of 20 ms minimum and 60 ms maximum. More importantly, these two sleep() calls give the Thread schedulers a chance to let other tasks run. Implicitly, there is also the actual lag of the isPrime(candidate) computation itself, which leads me to believe that this is actually a pretty good simulation.
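A sketch of how such a run might be wired together is shown below, with one blocking task per candidate on a pool built from a ThreadFactory. All names are hypothetical and the AtomicLong bookkeeping of the real isPrime is left out. `main` uses the default platform thread factory so the code runs on older JDKs; on Java 21 or later, `Thread.ofVirtual().factory()` could be passed instead to try virtual threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;

class LatencyPoolSketch {

    // Hypothetical helper: trial division by odd divisors up to sqrt(n).
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Prime test wrapped in simulated request and response latency.
    static boolean remoteIsPrime(long candidate, long minLag, long maxLag)
            throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextLong(minLag, maxLag + 1));
        boolean result = isPrime(candidate);
        Thread.sleep(ThreadLocalRandom.current().nextLong(minLag, maxLag + 1));
        return result;
    }

    // Submits one blocking task per odd candidate and counts the primes found.
    static long countPrimes(long limit, long minLag, long maxLag, ThreadFactory factory) {
        ExecutorService executor = Executors.newCachedThreadPool(factory);
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (long n = 3; n <= limit; n += 2) {
                final long candidate = n;
                Callable<Boolean> task = () -> remoteIsPrime(candidate, minLag, maxLag);
                futures.add(executor.submit(task));
            }
            long count = 0;
            for (Future<Boolean> future : futures) {
                if (future.get()) count++;
            }
            return count;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long primes = countPrimes(1_000, 10, 30, Executors.defaultThreadFactory());
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(primes + " odd primes in " + millis + " ms");
    }
}
```

Since every task spends nearly all of its time sleeping, the elapsed time depends mostly on how many tasks can be blocked at once, which is where cheap threads have the advantage.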
Benchmark                                 Mode  Cnt        Score         Error
PrimeThreads.platformPrimesTo_1000        avgt   25      122.004 ±       4.888
PrimeThreads.platformPrimesTo_10_000      avgt   25      986.577 ±      55.197
PrimeThreads.platformPrimesTo_10_000_000  avgt   25  1043651.917 ±  139003.151
PrimeThreads.virtualPrimesTo_1000         avgt   25       33.420 ±       1.142
PrimeThreads.virtualPrimesTo_10_000       avgt   25       53.476 ±       0.610
PrimeThreads.virtualPrimesTo_10_000_000   avgt   25    33058.254 ±    1171.181

Benchmark                                 tested     throughput  ratio
PrimeThreads.platformPrimesTo_1000        500          4.098      0.274
PrimeThreads.platformPrimesTo_10_000      5,000        5.068      0.054
PrimeThreads.platformPrimesTo_10_000_000  5,000,000    4.791      0.032
PrimeThreads.virtualPrimesTo_1000         500         14.961      3.651
PrimeThreads.virtualPrimesTo_10_000       5,000       93.500     18.449
PrimeThreads.virtualPrimesTo_10_000_000   5,000,000  151.248     31.569
These are the JMH results (after a time of 14:56:54), where we are using units of Milliseconds.
Many thanks to Ron Pressler who responded to my email regarding benchmarking Project Loom, and patiently clarified my thinking on how to benchmark, and more importantly how to interpret and explain benchmarks.
Constructor Summary
Method Summary
Modifier and Type  Method                                                      Description
static void        futurePrimes1(long limit, ThreadFactory threadFactory)
static void        futurePrimes12(long limit, ThreadFactory threadFactory)
static void        futurePrimes2(long limit, ThreadFactory threadFactory)
static void        futurePrimes22(long limit, ThreadFactory threadFactory)
static void        futurePrimes33(long limit, ThreadFactory threadFactory)
static long[]      getPrimes
static void        main
static void        parallelPrimes(long limit)
static void        parallelPrimes2(long limit)
                   primeThreads(long limit, ExecutorService executorService)
static long[]      serialPrimes(long limit)
static long[]      serialPrimes2(long limit)
static void        suite1()
static void        suite2()
static void        suite3(long limit)
static void        virtualPrimes(long limit, ThreadFactory threadFactory)
static void        virtualPrimes2(long limit, ThreadFactory threadFactory)
Constructor Details
Experiment10_PrimeStreams
public Experiment10_PrimeStreams()
Method Details
main
suite1
public static void suite1()
suite2
public static void suite2()
getPrimes
suite3
public static void suite3(long limit)

primeThreads: threadMaximum = 499
primeThreads: threadMaximum = 499
primeThreads: threadMaximum = 256
primeThreads: threadMaximum = 149

                               to 1,000                                   to 10,000                                  to 10,000,000
                               tasks                    op/ms    ratio    tasks                    op/ms    ratio    tasks                             op/ms    ratio
virtualCachedThreadPool        499    79, 74, 5         6.329    3.224    3711   114, 113, 1       34.722   1.625    3103283  43308, 43128, 180        115.452  0.834
virtualThreadPerTaskExecutor   499    57, 57, 0         8.772    2.599    4906   76, 76, 0         65.789   13.894   3454111  36032, 35866, 166        138.765  31.616
platformCachedThreadPool       256    81, 80, 1         1.953    0.310    828    234, 233, 1       21.367   0.615    3690     36133, 35641, 492        138.378  1.198
platformThreadPerTaskExecutor  149    115, 115, 0       3.356    0.385    140    1049, 1048, 1     4.766    0.072    186      1139449, 1139315, 134    4.389    0.032

Prime numbers                  to 1,000                 to 10,000                  to 10,000,000
                               tasks  op/ms    ratio    tasks  op/ms    ratio     tasks    op/ms    ratio
virtualCachedThreadPool        499    6.329    3.224    3711   34.722   1.625     3103283  115.452  0.834
virtualThreadPerTaskExecutor   499    8.772    2.599    4906   65.789   3.894     3454111  138.765  31.616
platformCachedThreadPool       256    1.953    0.310    828    21.367   0.615     3690     138.378  1.198
platformThreadPerTaskExecutor  149    3.356    0.385    140    4.766    0.072     186      4.389    0.032

primeThreads: threadMaximum = 3711
primeThreads: threadMaximum = 4906
primeThreads: threadMaximum = 828
primeThreads: threadMaximum = 140
virtualCachedThreadPool        114, 113, 1
virtualThreadPerTaskExecutor   76, 76, 0
platformCachedThreadPool       234, 233, 1
platformThreadPerTaskExecutor  1049, 1048, 1

primeThreads: threadMaximum = 3103283
primeThreads: threadMaximum = 3454111
primeThreads: threadMaximum = 3690
primeThreads: threadMaximum = 186
virtualCachedThreadPool        43308, 43128, 180
virtualThreadPerTaskExecutor   36032, 35866, 166
platformCachedThreadPool       36133, 35641, 492
platformThreadPerTaskExecutor  1139449, 1139315, 134
Parameters:
limit -
primeThreads
serialPrimes
public static long[] serialPrimes(long limit)
serialPrimes2
public static long[] serialPrimes2(long limit)
parallelPrimes
public static void parallelPrimes(long limit)
parallelPrimes2
public static void parallelPrimes2(long limit)
virtualPrimes
virtualPrimes2
futurePrimes1
futurePrimes12
futurePrimes2
futurePrimes22
futurePrimes33