Scala - Process Management


The scala.sys.process package lets you manage external processes. You can execute commands, chain them, pipe the output of one command into another, and redirect input and output, much as you would in a Unix shell. The package is built on the Java Process and ProcessBuilder classes.

Typical uses include automating system operations, integrating with other software, and running utility commands. When several processes need to run at the same time, you can combine this package with Scala concurrency tools such as Futures and the Akka actor system. Checking exit codes and capturing output also gives you a basis for error handling and logging.

Core Concepts

ProcessBuilder

The ProcessBuilder trait is the cornerstone of process control in Scala. It represents a sequence of one or more external processes that can be executed. You can create ProcessBuilder instances with the Process object or through implicit conversions from String and Seq[String].
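
As a rough illustration, the sketch below shows a few ways a ProcessBuilder might be created. It assumes a Unix-like system where echo and sh are available; the object name BuilderForms and the GREETING variable are made up for this example.

```scala
import scala.sys.process._

// Hypothetical example object; assumes `echo` and `sh` exist on the PATH.
object BuilderForms {
  // Explicit construction from a command string (split on whitespace)
  val fromString: ProcessBuilder = Process("echo hello")

  // Explicit construction from a sequence: one element per argument
  val fromSeq: ProcessBuilder = Process(Seq("echo", "hello world"))

  // Implicit conversions provided by the scala.sys.process package object
  val viaImplicitString: ProcessBuilder = "echo hello"
  val viaImplicitSeq: ProcessBuilder = Seq("echo", "hello")

  // Construction with an extra environment variable and the default working directory
  val withEnv: ProcessBuilder =
    Process(Seq("sh", "-c", "echo $GREETING"), None, "GREETING" -> "hi")

  def main(args: Array[String]): Unit = {
    // Nothing has run so far; !! executes a builder and returns its output
    println(fromSeq.!!.trim)
    println(withEnv.!!.trim)
  }
}
```

Creating a builder does not start anything. The command only runs when one of the execution methods is called on it.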

Process Execution

You can execute a process in several ways. Some of them are as follows -

  • !: It blocks until the process exits and returns the exit code.
  • !!: It blocks until the process exits and returns the output as a String. It throws an exception if the exit code is non-zero.
  • lazyLines: It returns a LazyList[String] that allows lazy reading of the process output.
  • run: It executes the process concurrently and returns a Process object for control.
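
The four methods above can be compared side by side. This is a minimal sketch, assuming a Unix-like system with echo, printf, true and sleep on the PATH; the object ExecutionMethods and its helper names are hypothetical.

```scala
import scala.sys.process._

// Hypothetical helper object; assumes `echo`, `printf`, `true` and `sleep` exist on the PATH.
object ExecutionMethods {
  // ! blocks and returns the exit code
  def exitCodeOf(cmd: Seq[String]): Int = Process(cmd).!

  // !! blocks and returns standard output (throws if the exit code is non-zero)
  def outputOf(cmd: Seq[String]): String = Process(cmd).!!

  // lazyLines exposes the output line by line as a LazyList
  def linesOf(cmd: Seq[String]): List[String] = Process(cmd).lazyLines.toList

  // run starts the process without blocking; exitValue() waits for it later
  def startInBackground(cmd: Seq[String]): Process = Process(cmd).run()

  def main(args: Array[String]): Unit = {
    println(exitCodeOf(Seq("true")))
    println(outputOf(Seq("echo", "hello")).trim)
    println(linesOf(Seq("printf", "a\\nb\\n")))

    val background = startInBackground(Seq("sleep", "1"))
    println("sleep is running in the background")
    println(background.exitValue())
  }
}
```

Use ! when only success or failure matters, !! or lazyLines when you need the output, and run when the caller should keep working while the process executes.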

Process Composition

You can compose processes much like Unix shell commands. Some of the operators are as follows -

  • #|: It pipes the output of one process as the input to another.
  • ###: It executes processes sequentially.
  • #&&: It executes the second process only if the first succeeds.
  • #||: It executes the second process only if the first fails.
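
Each operator above can be tried in isolation. The following is a sketch only, assuming a Unix-like system with echo, tr, true and false on the PATH; the object Composition and its method names are invented for illustration.

```scala
import scala.sys.process._

// Hypothetical example object; assumes `echo`, `tr`, `true` and `false` exist on the PATH.
object Composition {
  // #| : the output of echo becomes the input of tr
  def piped(): String =
    (Process(Seq("echo", "hello")) #| Process(Seq("tr", "a-z", "A-Z"))).!!.trim

  // ### : run both commands one after the other
  def sequential(): String =
    (Process(Seq("echo", "one")) ### Process(Seq("echo", "two"))).!!.trim

  // #&& : the second command runs because `true` exits with 0
  def onSuccess(): String =
    (Process("true") #&& Process(Seq("echo", "ok"))).!!.trim

  // #|| : the second command runs because `false` exits with a non-zero code
  def onFailure(): String =
    (Process("false") #|| Process(Seq("echo", "fallback"))).!!.trim

  def main(args: Array[String]): Unit = {
    println(piped())
    println(sequential())
    println(onSuccess())
    println(onFailure())
  }
}
```

The composed expression is itself a ProcessBuilder, so it can be executed with any of the execution methods or composed further.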

Example

import scala.sys.process._

object Demo extends App {
  // Execute `ls` and pipe its output to `grep`
  val command = Seq("ls") #| Seq("grep", ".scala")
  val outputBuffer = new StringBuffer()

  // Collect each output line, restoring the line break that ProcessLogger strips
  val exitCode = command ! ProcessLogger(line => outputBuffer.append(line).append('\n'))

  // Check the exit code (grep returns 1 when no line matches)
  if (exitCode == 0) {
    println(outputBuffer.toString)
  } else {
    println(s"Command failed with exit code $exitCode")
  }
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

Output

Command failed with exit code 1

Handling Input/Output

Handling input and output well is an important part of process control. Scala provides several abstractions -

  • ProcessIO: It offers low-level control over process input and output streams.
  • ProcessLogger: It is a higher-level abstraction to capture and log process output.
  • BasicIO: It provides helper methods for creating ProcessIO instances.
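
To make the ProcessLogger abstraction concrete, here is a minimal sketch that keeps standard output and standard error apart. It assumes a Unix-like system with sh on the PATH; the object LoggerDemo and the method capture are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer
import scala.sys.process._

// Hypothetical example object; assumes `sh` exists on the PATH.
object LoggerDemo {
  // Runs a command and returns (exit code, stdout lines, stderr lines)
  def capture(cmd: Seq[String]): (Int, List[String], List[String]) = {
    val out = ListBuffer.empty[String]
    val err = ListBuffer.empty[String]

    // The first function receives stdout lines, the second receives stderr lines
    val logger = ProcessLogger(line => out += line, line => err += line)
    val exitCode = Process(cmd) ! logger

    (exitCode, out.toList, err.toList)
  }

  def main(args: Array[String]): Unit = {
    val (code, out, err) = capture(Seq("sh", "-c", "echo out; echo err 1>&2; exit 3"))
    println(s"exit=$code stdout=$out stderr=$err")
  }
}
```

Each stream is written to its own buffer, so the two reader threads never share mutable state.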

Example

import java.io.InputStream
import scala.sys.process._

object Demo extends App {
  // Runs `cat` on the given file and returns the number of bytes it wrote to stdout
  def customProcessHandling(name: String): Int = {
    val cat = Seq("cat", name)
    var count = 0

    def byteCounter(input: InputStream): Unit = {
      while (input.read() != -1) count += 1
      input.close()
    }

    // Close stdin immediately, count the stdout bytes, and ignore stderr
    val p = cat run new ProcessIO(_.close(), byteCounter, _.close())
    p.exitValue() // Block until the process exits and its output has been consumed
    count
  }

  println(s"Byte count: ${customProcessHandling("example.txt")}")
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

Output

Byte count: <size of example.txt in bytes>

Advanced I/O Handling

You can also redirect process I/O to and from files and URLs directly, using operators such as #> and #<.

Example

import java.io.File
import java.net.URL
import scala.sys.process._

object Demo extends App {
  // Define the URL and the output file
  val url = new URL("http://www.scala-lang.org/")
  val file = new File("scala-lang.html")

  // Build a process that downloads the webpage and saves it to the file
  val process = url #> file

  // Execute the process and wait for it to finish
  process.!
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

It will execute successfully and save the downloaded page to scala-lang.html.
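
The same redirection operators also work with local files. The sketch below, which assumes a Unix-like system with echo and cat on the PATH, writes to a temporary file with #>, appends with #>>, and reads it back with #<. The object FileRedirect and the method roundTrip are hypothetical names.

```scala
import java.io.File
import scala.sys.process._

// Hypothetical example object; assumes `echo` and `cat` exist on the PATH.
object FileRedirect {
  // Writes two lines to a temporary file through redirection, then reads them back
  def roundTrip(first: String, second: String): String = {
    val tmp = File.createTempFile("process-demo", ".txt")
    try {
      val write = Process(Seq("echo", first)) #> tmp    // overwrite the file
      val append = Process(Seq("echo", second)) #>> tmp // append to the file
      val read = Process(Seq("cat")) #< tmp             // use the file as standard input

      write.!
      append.!
      read.!!
    } finally tmp.delete()
  }

  def main(args: Array[String]): Unit =
    print(roundTrip("first line", "second line"))
}
```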

Concurrency is an important aspect of process control. Scala has several tools for managing concurrent tasks.

Controlling Concurrency with Futures

You can control concurrency with the Scala Future API and an ExecutionContext.

Example

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Demo extends App {
  // Execution context backed by a fixed pool of 10 threads
  val pool = Executors.newFixedThreadPool(10)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Futures for concurrent tasks
  val futures = (1 to 10).map { i =>
    Future {
      // Simulate processing
      Thread.sleep(10)
      i
    }
  }

  // Wait for all futures to complete, then print the combined results
  val results = Await.result(Future.sequence(futures), 5.seconds)
  println(s"Processed results: $results")

  // Shut down the pool so that the JVM can exit
  pool.shutdown()
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

Output

Processed results: Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
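
The example above simulates work with Thread.sleep. As a sketch of how the same pattern might wrap external commands, the code below runs several commands on a bounded thread pool and collects their output in order. It assumes a Unix-like system with echo on the PATH; the object ConcurrentCommands, the method runAll, and the 30-second timeout are choices made for this illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.sys.process._

// Hypothetical example object; assumes `echo` exists on the PATH.
object ConcurrentCommands {
  // Runs every command on a pool of `parallelism` threads and returns the trimmed outputs
  def runAll(commands: Seq[Seq[String]], parallelism: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Each Future blocks one pool thread until its command exits
      val futures = commands.map(cmd => Future(Process(cmd).!!.trim))
      // Future.sequence keeps the results in the order of the input commands
      Await.result(Future.sequence(futures), 30.seconds)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val commands = Seq(Seq("echo", "one"), Seq("echo", "two"), Seq("echo", "three"))
    runAll(commands, parallelism = 2).foreach(println)
  }
}
```

Because !! blocks its thread, the pool size effectively caps how many external processes run at the same time.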

Actors for Concurrent Processing

Actors provide a lightweight model for concurrent processing. You can use the Akka library for actor-based concurrency in Scala.

Note that you must have the following dependency in your build.sbt file to run the example below -

ThisBuild / scalaVersion := "2.13.14"

lazy val root = (project in file("."))
  .settings(
    name := "Demo",
    version := "0.1",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % "2.6.14"
    )
  )

Example

import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._
import scala.concurrent.Await

case class ProcessEvent(data: String)

class EventProcessor extends Actor {
  def receive = {
    case ProcessEvent(data) =>
      // Simulate processing
      println(s"Processing: $data")
  }
}

object Main extends App {
  // Set up the actor system
  val system = ActorSystem("ProcessSystem")
  val processor = system.actorOf(Props[EventProcessor], "processor")

  // Send a message to the actor
  processor ! ProcessEvent("Sample data")

  // Give the actor a moment to process the message, then terminate the system
  Thread.sleep(500)
  Await.result(system.terminate(), 5.seconds)
}

Save the above program in src/main/scala/Demo.scala inside the sbt project that contains the build.sbt file shown above. Use the following command to compile and execute this program.

Command

> sbt run

Output

Processing: Sample data

Future and ExecutionContext

You can control the number of concurrent tasks through the ExecutionContext, for example by backing it with a fixed-size thread pool.

Example

import scala.concurrent.{ExecutionContext, Future}
import java.util.concurrent.Executors
import scala.util.{Success, Failure}

object Demo extends App {
  // Execution context with a fixed thread pool: at most 5 tasks run at once
  val pool = Executors.newFixedThreadPool(5)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Tasks for concurrent processing
  val tasks = (1 to 20).map { i =>
    Future {
      // Simulate some computation
      Thread.sleep(200)
      i * i
    }
  }

  // Handle the results of the tasks
  val aggregatedResult = Future.sequence(tasks)
  aggregatedResult.onComplete {
    case Success(result) => println(s"Results: $result")
    case Failure(e) => println(s"Error occurred: $e")
  }

  // Keep the application running to allow async processing to complete
  Thread.sleep(5000)

  // Shut down the pool so that the JVM can exit
  pool.shutdown()
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

Output

Results: Vector(1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400)

Using ProcessLogger for Enhanced Output Handling

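You can use ProcessLogger to capture and handle process output. The example below runs the Windows command cmd /c dir; on a Unix-like system, substitute a command such as ls.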

Example

import scala.sys.process._

object Demo extends App {
  val buffer = new StringBuffer()
  // Append each output line, restoring the line break that ProcessLogger strips
  val logger = ProcessLogger(line => buffer.append(line).append('\n'))

  // Execute a command (Windows-specific) and log its output
  "cmd /c dir" ! logger

  println(s"Process output: ${buffer.toString}")
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

It will execute successfully and print the directory listing.

Combining Processes

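You can combine multiple processes to create complex workflows. The example below does the chaining inside a single Windows cmd invocation, using the shell's own &&, | and || operators.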

Example

import scala.sys.process._

object Demo extends App {
  val buffer = new StringBuffer()
  val logger = ProcessLogger(buffer.append(_))

  // Combine the commands using cmd /c and findstr for Windows
  val combined = Seq("cmd", "/c", "echo Hello && echo Hello | findstr Hello && echo Success || echo Failure")

  // Execute the combined process
  combined ! logger

  println(s"Process output: ${buffer.toString}")
}

Save the above program in Demo.scala. Use the following commands to compile and execute this program.

Command

> scalac Demo.scala
> scala Demo

Output

Process output: Hello Hello Success

Notes

  • You can manage external processes using the scala.sys.process package. It lets you execute, chain, and manage commands much as you would in a Unix shell.
  • It is built on Java Process and ProcessBuilder classes.
  • You can pair it with Scala concurrency tools (Futures and the Akka actor system) to run processes concurrently, and use exit codes and captured output for error handling and logging.
  • You can execute a process using various methods, such as ! to block until exit and return the exit code, !! to capture the output as a string, and run for concurrent execution.
  • You can compose processes similarly to Unix shell commands using operators like #| for piping, ### for sequential execution, and #&& for conditional execution.
  • There are abstractions for handling I/O, such as ProcessIO for low-level control, ProcessLogger for capturing and logging output, and BasicIO for helper methods.
  • You can use concurrency tools like Future and ExecutionContext for managing concurrent tasks and Akka actors for lightweight concurrent processing models.