Concurrency Abstractions in Go: Queues, Tasks, and Actors

go concurrency actor-model
Shaffer Dehmlow

Introduction

Concurrency is built into the Go programming language and is one of its most powerful features. There are many great resources that explain how concurrency works out of the box in Go. This post gives a basic overview of how concurrency works in Go, then explores some concurrency abstractions that are common in other languages and how they can be implemented in Go. It also covers some basic usage of generics and managing state with closures.

Note that one characteristic of Go is that it relies on fewer abstractions than other languages, and more direct approaches are common. The patterns presented here are meant for learning and for comparison with other languages rather than for production use. Error handling is not directly addressed in any of these examples either.

Code for this post can be found at github.com/sdehm/go-concurrency-abstractions.

Go Concurrency Fundamentals

Go has two primary features for handling concurrency: goroutines and channels.

Goroutines

A goroutine is a lightweight, virtual thread of execution that is managed by the scheduler built into the Go runtime. Goroutines are lighter weight than threads in other languages and are created by placing the go keyword in front of a function call. For example:

func main() {
  go func() {
    fmt.Println("Hello, World!")
  }()
}

This code snippet starts a new goroutine whose job is to print “Hello, World!” to the console in the background. It is important to note that goroutines on their own do not provide any way to wait for execution to complete or to communicate with other goroutines. This is where channels come in.

Channels

A channel in Go is a typed data structure that behaves like a queue and can be safely accessed by multiple goroutines. Items can be either sent into the channel or received from it. Channels let goroutines communicate with each other: data can be passed in and out of goroutines, and a goroutine can wait for data to become available. They also allow goroutines to be synchronized so that steps happen in a particular order. If the example above were run as the whole program it would usually print nothing, since the main goroutine would exit before the goroutine that prints “Hello, World!” could finish. Channels can be used to wait for the goroutine to finish before exiting the program.

// helloworld.go
func helloworld() {
  c := make(chan struct{})
  go func() {
    fmt.Println("Hello, World!")
    c <- struct{}{}
  }()
  <-c
}

The struct{} type is used here because it occupies zero bytes. Another popular option for a signal channel like this is a bool.

In this example the data in the channel does not matter. Instead we rely on the fact that a receive operation blocks until data is available. When the empty struct is sent into the channel in the goroutine, the receive operation in the calling goroutine unblocks and the function returns, so the program can exit having printed the greeting.

Channels can also be closed, which signals that no more data will be sent into the channel and causes any blocked receive operation to unblock. In our example here this is not necessary since we are only sending a single item into the channel and only waiting for one item to be received.
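As a minimal sketch of closing, the earlier example could signal completion with close instead of a send. The function name helloClosed is hypothetical and is not taken from the post's repository; it only illustrates that a receive on a closed channel returns immediately, any number of times.

```go
package main

import "fmt"

// helloClosed is a hypothetical variant of helloworld that signals
// completion by closing the channel instead of sending a value.
func helloClosed() string {
	done := make(chan struct{})
	var msg string
	go func() {
		msg = "Hello, World!"
		close(done) // wakes every receiver, now and in the future
	}()
	<-done // blocks until the goroutine closes the channel
	<-done // a closed channel never blocks, so this returns at once
	return msg
}

func main() {
	fmt.Println(helloClosed())

	// Receiving from a closed channel yields the zero value and ok == false.
	c := make(chan int)
	close(c)
	v, ok := <-c
	fmt.Println(v, ok)
}
```

Because close releases every receiver, it also works when several goroutines wait on the same signal, which a single send cannot do.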

Another way to receive data from a channel is to use a for loop with the range keyword, which keeps receiving until the channel is closed. Channels can also be buffered; see Go by Example for more details.
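The following sketch combines both ideas under simple assumptions: a buffered channel is filled without any receiver running, then closed, then drained with range. The helper name collect is made up for illustration.

```go
package main

import "fmt"

// collect is a hypothetical helper that sends n integers into a buffered
// channel and then reads them back with a range loop.
func collect(n int) []int {
	c := make(chan int, n) // buffered: up to n sends succeed with no receiver
	for i := 0; i < n; i++ {
		c <- i
	}
	close(c) // without this the range loop below would block forever

	var out []int
	for v := range c { // receives until the channel is closed and empty
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(collect(3))
}
```

With an unbuffered channel the first send would block, because nothing is receiving yet; the buffer is what allows sending and receiving to happen in the same goroutine here.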

Work Queues

Channels in Go are essentially queues, so a work queue is a natural concurrency abstraction to build. A work queue is a queue of tasks that are executed by a pool of workers. The number of workers should be configurable; for CPU-bound work the number of CPU cores is typically a sensible choice. An example work queue implementation is shown below.

// workers.go
package workers

import "sync"

// Workers runs submitted functions on a fixed pool of goroutines.
type Workers[T any] struct {
	Work    chan func() T // tasks to execute
	Results chan T        // return values of completed tasks
	wg      sync.WaitGroup
}

// New starts numWorkers goroutines that read from Work until it is closed.
func New[T any](numWorkers int) *Workers[T] {
	w := &Workers[T]{
		Work:    make(chan func() T),
		Results: make(chan T),
	}

	for i := 0; i < numWorkers; i++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for f := range w.Work {
				w.Results <- f()
			}
		}()
	}

	// Close the results channel when the work is done.
	go func() {
		w.wg.Wait()
		close(w.Results)
	}()

	return w
}

In this example we create a Workers struct that has a Work channel for tasks and a Results channel for results. The struct has a generic type parameter to allow a variety of result types. We could allow for input arguments explicitly here, but since the work item type is a function, callers can capture any inputs in a closure instead. The struct also stores a wait group to manage synchronization of the workers so that we can close the results channel when all the work is done. The constructor accepts the number of workers to create and starts a goroutine for each worker that listens on the work channel and takes on work as it comes in. As work is performed the result is sent to the results channel. The following shows how this can be used.

w := workers.New[string](2)
go func() {
  for i := 0; i < 10; i++ {
    i := i
    w.Work <- func() string {
      return fmt.Sprintf("%d", i)
    }
  }
  close(w.Work)
}()
for r := range w.Results {
  fmt.Println(r)
}

Note that the Workers struct exposes a basic Work channel that is not buffered. Since a send on this channel blocks until a worker is ready to receive, we must send our work items from a separate goroutine, which closes the channel once all work has been added. Results can arrive in any order because the workers run concurrently. If we knew ahead of time how much work we would have, we could create a buffered channel of the appropriate size.
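The buffered variant can be sketched as follows. This is not the Workers type from above but a hypothetical standalone helper, runAll, which assumes the full list of tasks is known up front. Both channels are sized to the number of tasks, so no extra goroutine is needed for sending, and runtime.NumCPU is used as one reasonable worker count for CPU-bound work.

```go
package main

import (
	"fmt"
	"runtime"
	"sort"
	"sync"
)

// runAll is a hypothetical helper that executes every task on numWorkers
// goroutines and returns the results in completion order.
func runAll[T any](tasks []func() T, numWorkers int) []T {
	work := make(chan func() T, len(tasks))
	results := make(chan T, len(tasks))
	for _, t := range tasks {
		work <- t // never blocks: the buffer has room for every task
	}
	close(work)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for f := range work {
				results <- f() // never blocks: results is sized the same way
			}
		}()
	}
	wg.Wait()
	close(results)

	out := make([]T, 0, len(tasks))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	tasks := make([]func() int, 0, 10)
	for i := 0; i < 10; i++ {
		n := i
		tasks = append(tasks, func() int { return n * n })
	}
	results := runAll(tasks, runtime.NumCPU())
	sort.Ints(results) // completion order is not deterministic
	fmt.Println(results)
}
```

The trade-off is memory: the buffers hold every task and every result at once, whereas the unbuffered version streams them through.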

Tasks

Other languages provide a Task concurrency abstraction that allows for asynchronous execution of code with explicit synchronization or “awaiting”. Sometimes this abstraction is called a promise or future.

Basic Task Implementation

The following is an implementation of a Task abstraction in Go based loosely on the .NET Task.

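// task.go

// Task wraps a function so that it can be run in the background and awaited.
type Task struct {
	f       func()
	awaiter chan struct{}
}

// New creates a task for f without starting it.
func New(f func()) *Task {
	return &Task{
		f:       f,
		awaiter: make(chan struct{}),
	}
}

// Start runs the task's function in a new goroutine.
func (t *Task) Start() {
	go func() {
		t.f()
		t.awaiter <- struct{}{}
	}()
}

// Wait blocks until the task's function has returned.
func (t *Task) Wait() {
	<-t.awaiter
}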

To use this Task abstraction we can create a new Task and call Start to start the Task in a new goroutine. We then call Wait to wait for the Task to complete. The awaiter channel is used to block the Wait call until the provided function has completed.

t := New(func() {
  fmt.Println("Hello, World!")
})
t.Start()
t.Wait()

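This implementation is very simple and does not provide any error handling or cancellation. It also only supports functions that take no arguments and return no values. Wait should be called exactly once per task, since the goroutine sends a single value on the unbuffered awaiter channel.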

Task with Input Arguments

If we want our Task to run a function that needs arguments, we can use a closure to capture them. This example uses a similar function, except that the greeting to print is defined in the outer scope and captured by the closure.

greeting := "Hello from the closure!"
t := New(func() {
  fmt.Println(greeting)
})
t.Start()
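t.Wait()

Note that the usual caveats about closures apply here: variables declared by a loop should be used with care and copied into local variables when necessary. Go 1.22 changed loop semantics so that each iteration gets its own copy of the loop variable, which removes the most common form of this pitfall.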
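As a small illustration of the loop variable caveat, the sketch below starts one goroutine per iteration and copies the loop variable before capturing it. The function name squares is hypothetical. Before Go 1.22, omitting the copy meant every closure shared a single variable; from Go 1.22 onward the copy is redundant but harmless.

```go
package main

import (
	"fmt"
	"sync"
)

// squares is a hypothetical helper that computes i*i for each index in
// its own goroutine, capturing the index through a closure.
func squares(n int) []int {
	out := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		i := i // per-iteration copy: required before Go 1.22, harmless after
		wg.Add(1)
		go func() {
			defer wg.Done()
			out[i] = i * i // each goroutine writes a distinct element
		}()
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(squares(5))
}
```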

Task with Output Values

If we want to return a value from the Task we can also use a closure to capture the return value. This example defines a variable in the outside scope and modifies it in the Task function.

var greeting string
t := New(func() {
  greeting = "Hello from the closure!"
})
t.Start()
t.Wait()
fmt.Println(greeting)

Using closures like this works but it can be hard to determine what the actual inputs and outputs of the Task are. We also need to pay more attention to the scoping of our variables than seems necessary.

Refactoring with Generic Helpers

We can update our Task implementation to use some generic helpers to make the code more readable and easier to use. Since we have already shown that the closure method works for inputs and outputs, we can build on it without modifying the existing code. To allow the creation of a task with a single input argument we can create the following helper function.

// Creates a new task with a single input argument.
func NewWithInput[T any](f func(T), input T) *Task {
	fun := func() {
		f(input)
	}
	return New(fun)
}

// usage
t := NewWithInput(func(i string) {
  fmt.Println(i)
}, "Hello with generics!")
t.Start()
t.Wait()

This helper function takes a function that takes a single argument and a value to pass into that function.

To allow a task to return a value we must slightly modify the Task struct to store the return value. We create a new constructor function to simplify creation of the new struct and a getter function to get the result after blocking on the execution of the task.

// Task struct that stores a single result value.
type TaskWithResult[T any] struct {
	Task
	result T
}

// Creates a new task that stores a single result value.
func NewWithResult[T any](f func() T) *TaskWithResult[T] {
	t := &TaskWithResult[T]{
		Task: *New(nil),
	}
	t.f = func() {
		t.result = f()
	}
	return t
}

// Returns the result value after waiting for the task to finish.
func (t *TaskWithResult[T]) GetResult() T {
	t.Wait()
	return t.result
}

This new constructor closes over its own container struct in order to store the result value. Using these new features is straightforward, as the following example shows. GetResult is called rather than Wait so that we get the result value after the task has finished.

t := NewWithResult(func() string {
  return "Hello with generic output!"
})
t.Start()
fmt.Println(t.GetResult())

These approaches can be combined based on the inputs and outputs that the application’s tasks need. Unfortunately we would need to create a new version of these functions and wrapper structs for every variation. If we were building a library we would not want to write a new function for every possible number of arguments. Code generation and reflection could help solve this but we won’t explore that now.

Actor Model

The actor model is a concurrency abstraction built around actors that communicate with each other via messages. An actor can only do three things when receiving a message: mutate internal state, send a message to another actor, and create new actors. [1] Message sending is asynchronous and non-blocking by default, so the order of messages is not guaranteed. The actor is the base unit of computation, and actors compose to form larger systems. One of the interesting properties of an actor model based system is that there is no shared state between actors. This allows for greater fault tolerance and easier reasoning about the system.

This pattern has been implemented in many languages and is a proven approach to distributed systems. Erlang and other BEAM languages such as Elixir are built around this concept. BEAM languages use an abstraction on top of the actor model known as OTP (Open Telecom Platform) [2] that forms a powerful framework for building distributed systems. For a Go implementation of OTP see https://github.com/ergo-services/ergo. This example implementation of the actor model is more basic.

An extra benefit of the actor model is that it can be easily scaled out to a distributed system where the actors are running on different machines though we won’t cover that case here.

Actors

In Go, goroutines make a great foundation for actors, just as Erlang processes do for BEAM languages. On top of this virtualized, lightweight unit of computation we only need to add a message queue to create an actor. Some synchronization is also necessary to improve the usability of the actor.

This example creates a simple Actor struct that wraps up a configurable actor implementation with a message queue and a wait group for synchronization. The handling of messages is configured at construction via a function that is stored in the struct.

// actor.go
package actor

import "sync"

// Actor processes messages of type T one at a time on its own goroutine.
type Actor[T any] struct {
	messages chan T
	handler  func(T)
	wg       sync.WaitGroup // counts messages sent but not yet handled
}

// Creates a new actor with the given handler function.
func New[T any](handler func(T)) *Actor[T] {
	a := &Actor[T]{
		messages: make(chan T),
		handler:  handler,
	}
	go func() {
		for m := range a.messages {
			a.handler(m)
			a.wg.Done()
		}
	}()
	return a
}

// Sends a message to the actor without blocking the caller.
func (a *Actor[T]) Send(m T) {
	a.wg.Add(1)
	go func() {
		a.messages <- m
	}()
}

// Waits for all messages to be finished and closes the channel.
func (a *Actor[T]) Stop() {
	a.wg.Wait()
	close(a.messages)
}

The Actor struct has a generic type parameter to allow for a variety of message types. It implements a Send method that increments the wait group and then adds the message to the message queue. The send is done asynchronously so that the caller does not block, while the wait group ensures that the actor drains all pending messages when we stop it. Asynchronous message sending is expected in an actor model based system. The Stop method waits for all messages to be processed and then closes the message queue.

Note that the messages channel is not buffered. We are still able to accept multiple messages because each call to Send starts its own goroutine, which blocks until the actor's processing loop is ready to receive that message.

See the following snippet for usage of this struct. In this example we create a simple actor that prints out the message it receives.

a := actor.New(func(s string) {
    fmt.Println(s)
})
a.Send("Hello, World!")
a.Send("Hello again, World!")
a.Stop()

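The messages will not always be printed in the order they were sent: each Send hands its message to a separate goroutine, and the scheduler decides which of those goroutines delivers first. This is consistent with the actor model, where message delivery is asynchronous and non-deterministic by default.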
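If ordering from a single sender does matter, one alternative is to send directly into a buffered channel instead of starting a goroutine per message. The sketch below is a hypothetical variant, not the Actor type above: OrderedActor and NewOrdered are invented names, and Send blocks once the buffer is full, which is a deliberate departure from fully non-blocking sends.

```go
package main

import "fmt"

// OrderedActor is a hypothetical actor that preserves the order of
// messages sent from a single goroutine.
type OrderedActor[T any] struct {
	messages chan T
	done     chan struct{}
}

// NewOrdered starts the processing loop with a mailbox of the given size.
func NewOrdered[T any](buffer int, handler func(T)) *OrderedActor[T] {
	a := &OrderedActor[T]{
		messages: make(chan T, buffer),
		done:     make(chan struct{}),
	}
	go func() {
		defer close(a.done)
		for m := range a.messages {
			handler(m)
		}
	}()
	return a
}

// Send enqueues m in order; it blocks only when the mailbox is full.
func (a *OrderedActor[T]) Send(m T) {
	a.messages <- m
}

// Stop closes the mailbox and waits for the remaining messages to be handled.
func (a *OrderedActor[T]) Stop() {
	close(a.messages)
	<-a.done
}

func main() {
	a := NewOrdered(8, func(s string) {
		fmt.Println(s)
	})
	a.Send("Hello, World!")
	a.Send("Hello again, World!")
	a.Stop()
}
```

A channel delivers values in the order they were sent, so messages from one sender are handled in sequence; messages from different senders can still interleave.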

Specialized Actors

The Actor struct we built above is a generic implementation that can be expanded upon to create more specialized actors. In this example we create a specialized actor that prints every message (of type string) it receives.

// printer.go
type Printer struct {
	*Actor[string]
}

func NewPrinter() *Printer {
	p := &Printer{}
	p.Actor = New(func(s string) {
		fmt.Println(s)
	})
	return p
}

func (p *Printer) Print(s string) {
	p.Send(s)
}

All we are doing to the generic Actor is creating a custom constructor and wrapping the Send method with a more specific name. The following snippet shows how this would be used to perform the same task as the generic Actor example above.

p := actor.NewPrinter()
p.Print("Hello, World!")
p.Print("Hello again, World!")
p.Stop()

The output is the same but we get a little bit more reusability and better readability with our specialized actors.

Multiple Actors

Actors are rarely useful on their own. Instead, they are typically composed into larger systems that accomplish more complex tasks. In this example we create two types of actors: a ChatRoom and a Client. The ChatRoom is responsible for printing any message it receives along with the sender’s name. The Client is responsible for sending messages to the ChatRoom.

// chat_room.go
type ChatRoom struct {
	*Actor[message]
}

// Creates a new chat room that prints messages that come in.
func NewChatRoom() *ChatRoom {
	c := &ChatRoom{}
	c.Actor = New(func(m message) {
		fmt.Printf("%s: \t %s\n", m.sender.name, m.text)
	})
	return c
}

type Client struct {
	*Actor[string]
	name string
}

// Creates a new client that sends messages to the given chat room.
func NewClient(name string, room *ChatRoom) *Client {
	c := &Client{
		name: name,
	}
	c.Actor = New(func(t string) {
		m := message{
			sender: c,
			text:   t,
		}
		room.Send(m)
	})
	return c
}

type message struct {
	sender *Client
	text   string
}

All we need to do to implement this is create custom types and constructors for each actor type that set their message handling behavior, and create a small custom struct to serve as the message type. This snippet shows these actors in action.

// create chat room
chatRoom := actor.NewChatRoom()

// create clients
alice := actor.NewClient("Alice", chatRoom)
bob := actor.NewClient("Bob", chatRoom)

// send messages
alice.Send("Hello, Bob!")
bob.Send("Hello, Alice!")

// stop actors and wait for them to finish
alice.Stop()
bob.Stop()
chatRoom.Stop()

The chat room actor is constructed, then two clients are created with the chat room pointer as an input so that their handlers know which actor to send messages to. Each client is then sent a line of text, which its handler wraps in a message and forwards to the chat room, and the chat room prints it. Finally, all actors are stopped and the program exits.

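Note that the actors must be stopped in this example so that we wait for all messages to be processed, since they are passed asynchronously. The order matters: stopping the clients first ensures that every forwarded message has been registered with the chat room before chatRoom.Stop waits on it.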

More advanced actors might accept multiple types of messages and have more complex message handling logic. They may also store more complex state. Supervisor actors that manage other actors and enhance fault tolerance are also common.
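As a rough sketch of an actor with internal state and more than one message type, the example below uses a type switch over hypothetical add and get messages. It is deliberately independent of the Actor struct above and uses a plain channel as the mailbox; all names are invented for illustration.

```go
package main

import "fmt"

// add and get are hypothetical message types for a counter actor.
type add struct{ n int }
type get struct{ reply chan int }

// newCounter starts an actor goroutine and returns its mailbox.
func newCounter() chan<- any {
	inbox := make(chan any)
	go func() {
		total := 0 // state owned by this goroutine only, never shared
		for m := range inbox {
			switch msg := m.(type) {
			case add:
				total += msg.n
			case get:
				msg.reply <- total // answer on the channel the sender provided
			}
		}
	}()
	return inbox
}

// sum feeds the values to a fresh counter actor and asks for the total.
func sum(values ...int) int {
	counter := newCounter()
	defer close(counter) // lets the actor goroutine exit
	for _, v := range values {
		counter <- add{n: v}
	}
	reply := make(chan int)
	counter <- get{reply: reply}
	return <-reply
}

func main() {
	fmt.Println(sum(2, 3))
}
```

Carrying a reply channel inside the message is one common way to get an answer back from an actor without exposing its state.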

Conclusions

The fundamental building blocks of Go allow for some expressive and powerful concurrency patterns. Although most real-world uses of concurrency would be more bespoke, the basic tools of Go allow for a lot of flexibility.

Some future exploration could include:

  • More complex actor systems and a more thorough implementation of the actor model
  • Event based concurrency patterns
  • Performance benchmarking of the different approaches and comparison with other languages
  • Error handling and fault tolerance
  • Using buffering to improve performance and change behavior of the channels

Further Reading

The Go Programming Language by Alan A. A. Donovan and Brian W. Kernighan is a great resource for understanding Go and has thorough descriptions of concurrency in Go.

Go By Example has useful examples of Go features including those for concurrency.


  1. See https://www.brianstorti.com/the-actor-model/ for a good introduction to the actor model as well as https://en.wikipedia.org/wiki/Actor_model for more history and detail.

  2. See https://github.com/erlang/otp and https://erlang.org/download/armstrong_thesis_2003.pdf for more information on OTP.