Go Pipeline

One of the most powerful things about Go is the ability to run parts of your application concurrently. This lets the application run in parallel at times (depending on the number of cores) or keep working while one part waits on a blocking operation. There are a number of concurrency patterns that let you structure your applications for a performance boost.

Recently I stumbled across the “Pipeline” pattern and was able to use it to transform incoming data and aggregate the results. Imagine a water treatment plant. Dirty water comes in, clean water comes out. The water goes through several phases before it is clean. First you may filter out large chunks of sediment, then smaller chunks, until you are screening microscopic pieces of debris. Then you put it through a series of chemical cleanings and finally test the water quality. Water flows through these various phases. At no point does the water wait for an earlier step to fully complete before moving on. Instead the water is processed as it flows, ultimately becoming clean.

The pipeline pattern is similar: it processes data through different stages as it arrives instead of handling everything iteratively, so no single part of the process blocks the rest. However, like most concurrency patterns, there is no guarantee of the order in which results are returned. The stages of the pipeline are connected by channels, so as data flows from one channel to the next, the goroutines can run concurrently instead of sitting idle waiting for data.
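In its simplest form, each stage receives values from an upstream channel, transforms them, and sends the results downstream. Here is a minimal sketch (a toy example with numbers, separate from the weather service we will build below):

```go
package main

import "fmt"

// gen emits each number on a channel, then closes it to signal completion.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// sq is one pipeline stage: it squares whatever arrives upstream.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func main() {
	// Stages are composed simply by passing one channel into the next.
	for v := range sq(gen(2, 3, 4)) {
		fmt.Println(v)
	}
}
```

Closing the source channel is what shuts the whole pipeline down: each stage's `range` loop ends when its input closes, and the stage closes its own output in turn.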

Better yet, a sub-pattern of pipelining is called Fan Out: several goroutines are started at once, all feeding data into the same stage of the pipeline. This is especially beneficial for I/O operations that can block a system, API calls for example.

Let’s dive into some code to see how this pattern works.

Example

I found a great free weather API called MetaWeather. The only problem is all of the data is in metric! So let’s build a service that gathers weather data from several different cities and converts the temperatures from Celsius to Fahrenheit.

First I will define some structs to help with the parsing:

type WeatherData struct {
	ID       string
	Country  CountryData   `json:"parent"`
	City     string        `json:"title"`
	Forecast []*WeatherDay `json:"consolidated_weather"`
	Error    error
}

type WeatherDay struct {
	Type    string  `json:"weather_state_name"`
	Date    string  `json:"applicable_date"`
	MinTemp float64 `json:"min_temp"`
	MaxTemp float64 `json:"max_temp"`
}

type CountryData struct {
	Name string `json:"title"`
}

Next we’ll define the functions. Each stage consumes a channel, produces one, or both, which is what allows the data to flow. When the source is done it closes the first channel, which in turn causes each downstream channel to close.

We will accept a list of “city codes” that identify different cities around the world, then spawn an individual goroutine to fetch the data for each one. This makes our API calls concurrent. Each result is parsed and pushed onto a channel. We need a WaitGroup so we can close that channel once all of the city data has been downloaded. WaitGroup is part of the sync package and lets an application wait for a set of goroutines to finish before proceeding. We set the initial WaitGroup counter to the length of the incoming slice and spawn a goroutine per download. Each goroutine calls wg.Done() when it finishes, decrementing the counter by one. Once every goroutine has completed, wg.Wait() unblocks and we can close the channel.

The actual work inside each goroutine is a simple fetch from the API followed by decoding the JSON response into a new struct. The struct is then pushed onto the channel for the next stage to pick up.

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"sync"
)

const baseURL = "https://www.metaweather.com/api/location/%s/"

func getWeatherData(ids ...string) <-chan WeatherData {
	var wg sync.WaitGroup
	out := make(chan WeatherData)
	wg.Add(len(ids))

	for _, id := range ids {
		go func(cityID string) {
			// Decrement the counter no matter which path exits the goroutine.
			defer wg.Done()

			data := WeatherData{ID: cityID}

			url := fmt.Sprintf(baseURL, cityID)
			req, err := http.NewRequest("GET", url, nil)
			if err != nil {
				data.Error = errors.New("error making request")
				out <- data
				return
			}

			client := &http.Client{}

			resp, err := client.Do(req)
			if err != nil {
				data.Error = errors.New("error calling client")
				out <- data
				return
			}
			defer resp.Body.Close()

			if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
				data.Error = errors.New("error decoding message")
				out <- data
				return
			}
			out <- data
		}(id)
	}

	// Close the channel once every download goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

Once the data has been pushed onto the channel, the consuming function can begin processing. The function here is simple: it iterates over the days of the forecast and converts the temperatures to Fahrenheit. After processing, the struct is pushed onto a channel again for the last step, which is printing the information.

func convertWeatherData(weatherData <-chan WeatherData) <-chan WeatherData {
	out := make(chan WeatherData)

	go func() {
		for data := range weatherData {
			// This could be broken out into even more stages
			for _, day := range data.Forecast {
				day.MaxTemp = conversionCtoF(day.MaxTemp)
				day.MinTemp = conversionCtoF(day.MinTemp)
			}
			out <- data
		}
		// The upstream channel is closed and drained, so close ours too.
		close(out)
	}()

	return out
}

func conversionCtoF(temp float64) float64 {
	return temp*1.8 + 32
}
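As a quick sanity check, the formula agrees with the two well-known fixed points of the Celsius scale:

```go
package main

import "fmt"

func conversionCtoF(temp float64) float64 {
	return temp*1.8 + 32
}

func main() {
	fmt.Println(conversionCtoF(0))   // freezing point of water
	fmt.Println(conversionCtoF(100)) // boiling point of water
}
```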

After the data has been converted we consume the values from the channel as they arrive and print the results to the console. Once the channel is closed we exit the application.

func main() {
	cities := []string{"44418", "2358820", "2471217", "2459115", "4118", "2372071", "615702", "968019", "727232", "650272"}

	fmt.Printf("Gathering weather information for %d cities\n", len(cities))
	fmt.Printf("\n#######################\n\n")

	results := convertWeatherData(getWeatherData(cities...))
	for data := range results {
		if data.Error != nil {
			fmt.Printf("Error fetching weather data for city id: %s\n", data.ID)
			continue
		}

		fmt.Printf("Weather Forecast for %s, %s\n", data.City, data.Country.Name)

		for _, day := range data.Forecast {
			fmt.Printf("\tDate: %s\n", day.Date)
			fmt.Printf("\t\t%s, High of %.2f℉, Low of %.2f℉\n\n", day.Type, day.MaxTemp, day.MinTemp)
		}
		fmt.Printf("\n#######################\n\n")
	}

	fmt.Println("Data fetch complete!")
}

If you run the application several times, you will see that the cities come back in a different order each time. This is because they are being processed concurrently. Imagine how long it would take if you had to wait for each city to be downloaded sequentially.

This flow has changed the way I write my Go code. Concurrency is such a huge part of the language, and yet people don’t use it as much as they should. Take a look and see what other patterns you can find; hopefully they will make your code run a little bit faster.

Source code can be found here
