Class Experiment10_PrimeStreams

java.lang.Object
net.kolotyluk.loom.Experiment10_PrimeStreams

public class Experiment10_PrimeStreams extends Object
TODO - Experiment with Java Flow

Experiment 3 - Prime Numbers

Calculating Prime Numbers is always fun, and it's most fun when we try to compute prime numbers as fast as possible, or as many at once as we can. As we will see, Project Loom offers us no benefits for pure computation like this, but it's interesting to experiment and see why.

As I have mentioned before, with Concurrent Programming, higher levels of abstraction are generally better and safer because they have been designed and implemented by experts, and it's better to stand on the shoulders of giants. Generally, for CPU Bound use cases, the Stream Interface is a good place to start as it has been specifically designed for such use cases.

Streams

The Streams Architecture has three important phases:

Source
Something that generates a stream of objects, such as a range of numbers, in our case, numbers that could be prime
Pipeline
A sequence of Functions on each object in the stream, in our case, testing if a number is prime, which leverages the Stream.filter(Predicate) function.
Sink
Ultimately, we need somewhere to Collect the Stream of objects. This is also known as *Terminating* the Stream.
Note: Unlike Akka Streams, a Java Stream pipeline is not meant to run indefinitely. It must terminate, and indeed the Pipeline does not start until a terminal (collecting) operation is invoked. A source may be unbounded, for example Stream.iterate, but it then needs a short-circuiting operation such as limit for the terminal operation to complete. The Java Flow is more akin to Akka Streams, and in Akka Streams, they actually use the term Flow. Hopefully later we can see the effects of running Java Flows with Project Loom...
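The point that the Pipeline does not start until it is terminated can be observed directly. The following is a minimal sketch, not code from this class: the class name LazyStreamSketch and the visit counter are assumptions made purely for illustration. It records how many elements have passed through the pipeline before and after the terminal operation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

// Hypothetical demo class, not part of Experiment10_PrimeStreams.
class LazyStreamSketch {

    /** Returns {elements visited before the terminal operation, elements visited after it}. */
    static int[] visitedBeforeAndAfter() {
        AtomicInteger visited = new AtomicInteger();

        // Source and Pipeline only: building the stream evaluates nothing.
        LongStream pipeline = LongStream.rangeClosed(1, 10)
                .peek(n -> visited.incrementAndGet())
                .filter(n -> n % 2 == 1);
        int before = visited.get();

        // Sink: the terminal operation pulls every element through the pipeline.
        pipeline.toArray();
        int after = visited.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = visitedBeforeAndAfter();
        System.out.println("visited before: " + counts[0] + ", after: " + counts[1]);
    }
}
```

Before toArray() is called the counter is still 0; afterwards all 10 source elements have been visited.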
There are only two hard things in Computer Science: cache invalidation and naming things.

— Phil Karlton

As we can see in serialPrimes(long) it is very easy to express taking a range of numbers, and filtering out the primes. From parallelPrimes(long) we can see how easy it is to change this to parallel computation. However, with some experimentation, we can easily see that unless we have a large set of computations, 10,000 or more, making the computations parallel does not really buy us much. This is because the overhead of Concurrent Operation can easily overwhelm any benefits of parallelism.
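As a rough illustration of how little separates the serial and parallel forms, here is a minimal sketch. It is not the source of serialPrimes(long) or parallelPrimes(long): the class name PrimeStreamSketch, the trial-division isPrime, the oddCandidates helper and the long[] return type of the parallel variant are all assumptions made for this example.

```java
import java.util.stream.LongStream;

// Hypothetical sketch, not the actual Experiment10_PrimeStreams code.
class PrimeStreamSketch {

    // Assumed stand-in for the real prime test: plain trial division.
    static boolean isPrime(long candidate) {
        if (candidate < 2) return false;
        if (candidate % 2 == 0) return candidate == 2;
        for (long divisor = 3; divisor * divisor <= candidate; divisor += 2) {
            if (candidate % divisor == 0) return false;
        }
        return true;
    }

    // Source: the odd numbers 3, 5, 7, ... below limit.
    static LongStream oddCandidates(long limit) {
        return LongStream.range(1, limit / 2).map(i -> 2 * i + 1);
    }

    static long[] serialPrimes(long limit) {
        return oddCandidates(limit)
                .filter(PrimeStreamSketch::isPrime)   // Pipeline
                .toArray();                           // Sink
    }

    static long[] parallelPrimes(long limit) {
        return oddCandidates(limit)
                .parallel()                           // the only change
                .filter(PrimeStreamSketch::isPrime)
                .toArray();                           // encounter order is preserved
    }

    public static void main(String[] args) {
        System.out.println(serialPrimes(1_000).length + " odd primes below 1,000");
    }
}
```

A range-based source is used here because it splits evenly across worker threads; an iterate-based source would work for the serial case but parallelizes poorly.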

Threads

For the rest of the experiments, we can conclude that attempts to use Threads to do better than basic BaseStream.parallel() generally fail. We can also see that the code is substantially more complicated too...

However, where Threads really shine is when we are dealing with concurrency where there are a lot of blocking operations, such as transaction processing, network access, etc. Every blocking operation is an opportunity for some other task to run and progress. From the Structured Concurrency JEP:

Structured concurrency is a counterpart to parallel streams and (their underlying mechanism) ForkJoinPool. Those are concerned with data-parallelism and computation, and ForkJoinPool also employs "structured parallelism" where forks are followed by joins. But as concurrency focuses more on interaction – through I/O and/or message passing – than pure data processing, structured concurrency places a special emphasis on handling cancellation and partial failures, and its goal isn't just to assist in writing a correct algorithm, but also to express the application's logical unit in a manner that is reflected both in the code's structure as well as in runtime observation with various service tools.
In particular, Parallel Streams are intended for simpler internal processing on hardware cores/threads, but are not well suited for external processing, such as HTTP REST calls. For Distributed Parallel Computation, Flow might be a better tool, as it is more similar to Parallel Streams. However, Flow is a Reactive design, and with Virtual Threads there might be better solutions that don't add the complexity of Reactive design. From Reactive Programming with Java 9's Flow:
Reactive Programming is not the new hype to replace Functional Programming. Both are compatible and work perfectly together. While the Streams API introduced in Java 8 is perfect to process data streams (map, reduce and all the variants), the Flow API shines on the communication side (request, slow down, drop, block, etc.). You can use Streams as data sources for Publisher, blocking them or dropping items as needed. You can also use them on your Subscriber’s side, for example, to perform aggregations after receiving some items. Not to mention all the rest of programming logic in which reactive streams doesn’t fit but yet it can be written in a functional style and be ten times more readable and easier to maintain than in imperative programming.

For these experiments we wrap our primes experiments into a pseudo networking application, where we simulate farming isPrime() out to HTTP Endpoints. This simulation basically wraps the isPrime(long, long, long, AtomicLong, AtomicLong) calculation with network latency via Thread.sleep(long), one call to simulate Request Latency, another to simulate Response Latency.

The spirit of the experiments here is that we might have some application that is computationally expensive and takes a long time. While BaseStream.parallel() works just fine on limited scale, we might imagine a magic cloud that has some /isPrime endpoints where the work can be done faster, more efficiently, etc. Indeed, this is a classic MapReduce model. To be sure, if we really wanted to generate prime numbers in a High-Performance Computing style, we might use something like Apache Spark, but here we are trying to keep things simple, and make a point, not a perfect application.

Benchmarking

When trying to compare the performance of different Concurrent and Parallel approaches, it's important to conduct benchmarks, and one tool to use is the Java Microbenchmark Harness (JMH). However, it can take quite a long time for JMH to run benchmarks. By default, JMH runs 25 warmups, and 25 benchmarks, then averages the results. For quicker results, the experiments in this code take a rather casual approach to get a feel for things, leaving the heavy lifting to JMH.

Streams

See benchmarks/PrimeNumbers for the code that runs these JMH Benchmarks.

 Benchmark                                  Mode  Cnt        Score        Error
 PrimeNumbers.parallelPrimesTo_1000         avgt   25       46.620 ±      1.549
 PrimeNumbers.parallelPrimesTo_10_000       avgt   25      379.036 ±      7.604
 PrimeNumbers.parallelPrimesTo_10_000_000   avgt   25  1360362.823 ±  14723.084
 PrimeNumbers.serialPrimesTo_1000           avgt   25       32.998 ±      0.315
 PrimeNumbers.serialPrimesTo_10_000         avgt   25      644.467 ±     16.160
 PrimeNumbers.serialPrimesTo_10_000_000     avgt   25  8199848.272 ±  84514.067
 

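where the Score is the average time in microseconds (μs) to complete the run. Given that we only test odd numbers, the number of candidates tested is half the limit, from which we can derive the throughput in tests per μs (tested / Score). The ratio column compares each throughput with that of the other stream type for the same limit.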

 Benchmark                                     tested  throughput  ratio
 PrimeNumbers.parallelPrimesTo_1000               500      10.725  0.708
 PrimeNumbers.parallelPrimesTo_10_000           5,000      13.191  1.700
 PrimeNumbers.parallelPrimesTo_10_000_000   5,000,000       3.675  6.025
 PrimeNumbers.serialPrimesTo_1000                 500      15.152  1.413
 PrimeNumbers.serialPrimesTo_10_000             5,000       7.758  0.588
 PrimeNumbers.serialPrimesTo_10_000_000     5,000,000       0.610  0.166
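
The derived columns can be reproduced from the JMH scores with simple arithmetic. The following sketch is only a worked example: the class ThroughputSketch and its method names are hypothetical and are not part of the benchmark code.

```java
// Hypothetical helper, not part of the benchmark code.
class ThroughputSketch {

    /** Candidates tested per time unit of the JMH score (here, per microsecond). */
    static double throughput(long tested, double score) {
        return tested / score;
    }

    /** Throughput of one approach relative to the other for the same limit. */
    static double ratio(double throughput, double otherThroughput) {
        return throughput / otherThroughput;
    }

    public static void main(String[] args) {
        // Scores taken from the parallelPrimesTo_1000 and serialPrimesTo_1000 rows.
        double parallel = throughput(500, 46.620);
        double serial = throughput(500, 32.998);
        System.out.printf("parallel %.3f, serial %.3f, ratio %.3f%n",
                parallel, serial, ratio(parallel, serial));
    }
}
```

For the 1,000 limit this gives a parallel throughput of about 10.725, a serial throughput of about 15.152, and a parallel to serial ratio of about 0.708, matching the first row of the table.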
 

From this we can conclude

  • For smaller numbers of computations, O(1,000), Serial Stream has more throughput than Parallel Stream. This is because Parallel Stream requires Concurrency, and Concurrency has overhead.
  • For larger numbers of computations, O(10,000), Parallel Stream has more throughput than Serial Stream. This is because the power of parallelism overcomes the overhead of concurrency.
  • For really large numbers of computations, O(10,000,000), Parallel Stream really shines over Serial Stream. In this use case, it is computationally more expensive to test large numbers if they are prime, so the dominant factor is raw CPU, and in this case, computation has been spread across 12 CPUs.
  • For O(10,000,000) computations we can see the throughput of Parallel Streams is much lower than O(10,000), but still much better than O(10,000,000) Serial Stream, so the throughput sweet spot for Parallel Stream in this use case is somewhere between O(10,000) and O(10,000,000). This is explained because it takes longer to test larger numbers if they are prime, therefore throughput will begin decreasing. This is why it's important to benchmark real applications, and not synthetic experiments. As we will see later, however, synthetic experiments do have their place.
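  • As an aside, the number of primes found below each limit (presumably one fewer than the usual prime count because 2 is skipped when only odd candidates are tested)

    • 1,000: 167 primes (168 including 2)
    • 10,000: 1,228 primes (1,229 including 2)
    • 10,000,000: 664,578 primes (664,579 including 2)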

    Project Loom

    Pure Computation

    When I first started playing with these experiments I naïvely thought that using Project Loom could improve upon the previous benchmarks. Even though I had already watched Ron Pressler - Loom: Bringing Lightweight Threads and Delimited Continuations to the JVM, I had not really appreciated or internalized that knowledge. But, by naïvely concocting my own experiments, I had empirical evidence that supported what Ron Pressler had already stated.

    Without using JMH, playing around here I discovered

  • Running BaseStream.parallel() within a VirtualThread ExecutorService context did not improve throughput. In most cases it was a little worse.
  • Replacing Stream.parallel() with a VirtualThread ExecutorService that calls ExecutorService.invokeAll(Collection) has about 1/3 the throughput of Stream.parallel() for testing up to 10,000,000 primes.
  • Replacing Stream.parallel() with a VirtualThread ExecutorService that calls ExecutorService.submit(Callable) has about 1/2 the throughput of Stream.parallel(), which is better than ExecutorService.invokeAll(Collection)
  • Comparing Parallel Streams to Virtual Threads is sort of like comparing oranges to grapefruit, but in a sense they are both citrus. However, comparing ExecutorService.invokeAll(Collection) to ExecutorService.submit(Callable) is more like comparing navel oranges to mandarin oranges. While ExecutorService.invokeAll(Collection) looks more attractive, it might not perform as well as we would expect. This is still a naïve test, so it should be investigated further with more rigour and analysis.
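
    To make the invokeAll versus submit comparison concrete, here is a minimal sketch of the two styles. It is not the code of primeThreads or getPrimes: the class name PrimeTasksSketch, the NOT_PRIME marker value, and the one-task-per-candidate layout are assumptions made for illustration. The ExecutorService is passed in, so either a platform or a virtual thread executor can be used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch, not the actual Experiment10_PrimeStreams code.
class PrimeTasksSketch {
    static final long NOT_PRIME = -1L;   // assumed marker for "candidate was not prime"

    static boolean isPrime(long candidate) {
        if (candidate < 2) return false;
        if (candidate % 2 == 0) return candidate == 2;
        for (long divisor = 3; divisor * divisor <= candidate; divisor += 2) {
            if (candidate % divisor == 0) return false;
        }
        return true;
    }

    // One task per odd candidate below limit.
    static List<Callable<Long>> tasks(long limit) {
        List<Callable<Long>> tasks = new ArrayList<>();
        for (long n = 3; n < limit; n += 2) {
            final long candidate = n;
            tasks.add(() -> isPrime(candidate) ? candidate : NOT_PRIME);
        }
        return tasks;
    }

    // Style 1: hand over the whole collection and wait for all tasks to finish.
    static long[] viaInvokeAll(long limit, ExecutorService executor) {
        try {
            return collect(executor.invokeAll(tasks(limit)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Style 2: submit tasks one at a time, then gather the futures.
    static long[] viaSubmit(long limit, ExecutorService executor) {
        List<Future<Long>> futures = new ArrayList<>();
        for (Callable<Long> task : tasks(limit)) {
            futures.add(executor.submit(task));
        }
        return collect(futures);
    }

    static long[] collect(List<Future<Long>> futures) {
        return futures.stream().mapToLong(future -> {
            try {
                return future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }).filter(result -> result != NOT_PRIME).toArray();
    }
}
```

    On Java 21 or later, Executors.newVirtualThreadPerTaskExecutor() can be passed as the executor to run each task on its own virtual thread; any other ExecutorService works the same way.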
    Simulated Networking

    The bottom line is that for raw computation, stick with the Java Streams API, and Parallel Streams. However, how does Project Loom compare when simulating a Network Application, where we might be farming prime computations out to some HTTP Endpoint?

    This uses the same isPrime(long, long, long, AtomicLong, AtomicLong) code as the previous benchmarks, called as isPrime(candidate, minimumLag, maximumLag) with minimumLag = 10 ms and maximumLag = 30 ms. The lag is applied twice, using Thread.sleep(long) before the prime test to simulate the HTTP Request overhead and again after it to simulate the HTTP Response overhead, so each call blocks for a random total of between 20 ms and 60 ms. More importantly, these two sleep() calls give the Thread schedulers a chance to let other tasks run. Implicitly, there is also the actual time taken by the isPrime(candidate) computation itself, which leads me to believe that this is actually a pretty good simulation.
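
    The shape of that simulation can be sketched as follows. This is an assumed reconstruction, not the actual isPrime(long, long, long, AtomicLong, AtomicLong) method: the class name SimulatedEndpointSketch, the simulateLag helper, and the omission of the two AtomicLong parameters (whose purpose is not described here) are choices made for this example.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch, not the actual Experiment10_PrimeStreams code.
class SimulatedEndpointSketch {

    // Assumed stand-in for the real prime test: plain trial division.
    static boolean isPrime(long candidate) {
        if (candidate < 2) return false;
        if (candidate % 2 == 0) return candidate == 2;
        for (long divisor = 3; divisor * divisor <= candidate; divisor += 2) {
            if (candidate % divisor == 0) return false;
        }
        return true;
    }

    // Prime test wrapped in simulated request and response latency (milliseconds).
    static boolean isPrime(long candidate, long minimumLag, long maximumLag) {
        simulateLag(minimumLag, maximumLag);      // simulated HTTP Request overhead
        boolean result = isPrime(candidate);      // the real work
        simulateLag(minimumLag, maximumLag);      // simulated HTTP Response overhead
        return result;
    }

    // Blocks for a random time in [minimumLag, maximumLag], letting other threads run.
    private static void simulateLag(long minimumLag, long maximumLag) {
        long lag = ThreadLocalRandom.current().nextLong(minimumLag, maximumLag + 1);
        try {
            Thread.sleep(lag);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during simulated lag", e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean prime = isPrime(997, 10, 30);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("997 prime? " + prime + " after about " + elapsedMs + " ms");
    }
}
```

    Because each call spends nearly all of its time blocked in sleep, the number of calls that can be in flight at once, rather than raw CPU, determines overall throughput.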

     Benchmark                                 Mode  Cnt        Score        Error
     PrimeThreads.platformPrimesTo_1000        avgt   25      122.004 ±      4.888
     PrimeThreads.platformPrimesTo_10_000      avgt   25      986.577 ±     55.197
     PrimeThreads.platformPrimesTo_10_000_000  avgt   25  1043651.917 ± 139003.151
     PrimeThreads.virtualPrimesTo_1000         avgt   25       33.420 ±      1.142
     PrimeThreads.virtualPrimesTo_10_000       avgt   25       53.476 ±      0.610
     PrimeThreads.virtualPrimesTo_10_000_000   avgt   25    33058.254 ±   1171.181
    
     Benchmark                                     tested  throughput   ratio
     PrimeThreads.platformPrimesTo_1000               500       4.098   0.274
     PrimeThreads.platformPrimesTo_10_000           5,000       5.068   0.054
     PrimeThreads.platformPrimesTo_10_000_000   5,000,000       4.791   0.032
     PrimeThreads.virtualPrimesTo_1000                500      14.961   3.651
     PrimeThreads.virtualPrimesTo_10_000            5,000      93.500  18.449
     PrimeThreads.virtualPrimesTo_10_000_000    5,000,000     151.248  31.569
     

    Based on the JMH results (after a run time of 14:56:54), where the Score is in milliseconds and throughput is tests per millisecond,

  • I am really surprised I was able to have 5,000,000 Platform Threads running, but it took an incredibly long time
  • On a scale of 5,000,000 threads, Virtual Threads have 32 times the throughput of Platform Threads
  • Given how long my system was running with fans at full, I think Project Loom deserves bragging rights that it also can save energy. It would be interesting to run this benchmark again with my power meter to see how many Joules was consumed by each benchmark.
  • Many thanks to Ron Pressler who responded to my email regarding benchmarking Project Loom, and patiently clarified my thinking on how to benchmark, and more importantly how to interpret and explain benchmarks.


    Author:
    eric@kolotyluk.net
    • Constructor Summary

      Constructor
      Experiment10_PrimeStreams()
    • Method Summary

      Modifier and Type           Method
      static void                 futurePrimes1(long limit, ThreadFactory threadFactory)
      static void                 futurePrimes12(long limit, ThreadFactory threadFactory)
      static void                 futurePrimes2(long limit, ThreadFactory threadFactory)
      static void                 futurePrimes22(long limit, ThreadFactory threadFactory)
      static void                 futurePrimes33(long limit, ThreadFactory threadFactory)
      static long[]               getPrimes(List<Future<Long>> primes)
      static void                 main(String[] args)
      static void                 parallelPrimes(long limit)
      static void                 parallelPrimes2(long limit)
      static List<Future<Long>>   primeThreads(long limit, ExecutorService executorService)
      static long[]               serialPrimes(long limit)
      static long[]               serialPrimes2(long limit)
      static void                 suite1()
      static void                 suite2()
      static void                 suite3(long limit)
      static void                 virtualPrimes(long limit, ThreadFactory threadFactory)
      static void                 virtualPrimes2(long limit, ThreadFactory threadFactory)

      Methods inherited from class java.lang.Object

      clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Details

      • Experiment10_PrimeStreams

        public Experiment10_PrimeStreams()
    • Method Details

      • main

        public static void main(String[] args)
      • suite1

        public static void suite1()
      • suite2

        public static void suite2()
      • getPrimes

        public static long[] getPrimes(List<Future<Long>> primes)
      • suite3

        public static void suite3(long limit)
         primeThreads: threadMaximum = 499
         primeThreads: threadMaximum = 499
         primeThreads: threadMaximum = 256
         primeThreads: threadMaximum = 149
                                          to 1,000                            to 10,000                                 to 10,000,000
                                         tasks                 op/ms   ratio  tasks                  op/ms    ratio     tasks                               op/ms      ratio
         virtualCachedThreadPool          499   79, 74, 5      6.329   3.224  3711   114,  113, 1    34.722   1.625     3103283     43308,    43128, 180    115.452    0.834
         virtualThreadPerTaskExecutor     499   57, 57, 0      8.772   2.599  4906    76,   76, 0    65.789  13.894     3454111     36032,    35866, 166    138.765   31.616
         platformCachedThreadPool         256   81, 80, 1      1.953   0.310   828   234,  233, 1    21.367   0.615        3690     36133,    35641, 492    138.378    1.198
         platformThreadPerTaskExecutor    149   115, 115, 0    3.356   0.385   140  1049, 1048, 1     4.766   0.072         186   1139449,  1139315, 134      4.389    0.032
        
         Prime numbers                   to 1,000             to 10,000             to 10,000,000
                                         tasks op/ms  ratio   tasks  op/ms  ratio   tasks    op/ms   ratio
         virtualCachedThreadPool          499  6.329  3.224   3711  34.722  1.625   3103283 115.452  0.834
         virtualThreadPerTaskExecutor     499  8.772  2.599   4906  65.789 13.894   3454111 138.765 31.616
         platformCachedThreadPool         256  1.953  0.310    828  21.367  0.615      3690 138.378  1.198
         platformThreadPerTaskExecutor    149  3.356  0.385    140   4.766  0.072       186   4.389  0.032
        
        
        
         primeThreads: threadMaximum = 3711
         primeThreads: threadMaximum = 4906
         primeThreads: threadMaximum = 828
         primeThreads: threadMaximum = 140
         virtualCachedThreadPool          114, 113, 1
         virtualThreadPerTaskExecutor     76, 76, 0
         platformCachedThreadPool         234, 233, 1
         platformThreadPerTaskExecutor    1049, 1048, 1
        
        
        
         primeThreads: threadMaximum = 3103283
         primeThreads: threadMaximum = 3454111
         primeThreads: threadMaximum = 3690
         primeThreads: threadMaximum = 186
         virtualCachedThreadPool          43308, 43128, 180
         virtualThreadPerTaskExecutor     36032, 35866, 166
         platformCachedThreadPool         36133, 35641, 492
         platformThreadPerTaskExecutor    1139449, 1139315, 134
         
        Parameters:
        limit -
      • primeThreads

        public static List<Future<Long>> primeThreads(long limit, ExecutorService executorService)
      • serialPrimes

        public static long[] serialPrimes(long limit)
      • serialPrimes2

        public static long[] serialPrimes2(long limit)
      • parallelPrimes

        public static void parallelPrimes(long limit)
      • parallelPrimes2

        public static void parallelPrimes2(long limit)
      • virtualPrimes

        public static void virtualPrimes(long limit, ThreadFactory threadFactory)
      • virtualPrimes2

        public static void virtualPrimes2(long limit, ThreadFactory threadFactory)
      • futurePrimes1

        public static void futurePrimes1(long limit, ThreadFactory threadFactory)
      • futurePrimes12

        public static void futurePrimes12(long limit, ThreadFactory threadFactory)
      • futurePrimes2

        public static void futurePrimes2(long limit, ThreadFactory threadFactory)
      • futurePrimes22

        public static void futurePrimes22(long limit, ThreadFactory threadFactory)
      • futurePrimes33

        public static void futurePrimes33(long limit, ThreadFactory threadFactory)