I Built a Go Plugin for Alpaca’s MarketStore as a College Intern
Hey all! I’m Ethan, and I recently started working at Alpaca as a Software Engineering Intern! For my first task, I created a Go plugin for Alpaca’s open source MarketStore server that fetches minute-level data from Binance and writes it to MarketStore.
You might be wondering: what is MarketStore? MarketStore is a database server written in Go that helps users handle large amounts of financial data. MarketStore also supports Go plugins, which let users gather important financial and crypto data from third-party sources.
For this blog post, I’ll go over how I created the plugin from start to finish in four sections: installing MarketStore, understanding MarketStore’s plugin structure, creating the Go plugin, and installing the Go plugin.
Experience Installing and Running MarketStore Locally
First, I set up MarketStore locally. I installed the latest version of Go and started going through the installation process outlined in MarketStore’s README. All the installation commands worked swimmingly, but when I tried to run MarketStore using:
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ marketstore -config mkts.yml
I got this weird error:
/usr/local/go/src/fmt/print.go:597:CreateFile/go/src/github.com/alpacahq/marketstore/executor/wal.go:87open /project/data/mktsdb/WALFile.1529203211246361858.walfile: no such file or directory: Error Creating WAL File
I was super confused and couldn’t find any other examples of this error online. After checking and changing permissions in the directory, I realized that the root_directory setting in my mkts.yml configuration file was incorrect: it pointed to a directory that didn’t exist. To resolve this, I changed mkts.yml from:
root_directory: /project/data/mktsdb
to:
root_directory: /home/ethanc/go/bin/src/github.com/alpacahq/marketstore/project/data/mktsdb
and reran
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ marketstore -config mkts.yml
...
I0621 11:37:52.067803 27660 log.go:14] Launching heartbeat service…
I0621 11:37:52.067803 27660 log.go:14] Enabling Query Access…
I0621 11:37:52.067803 27660 log.go:14] Launching tcp listener for all services
...
To enable the gdaxfeeder plugin, which fetches cryptocurrency price data from the GDAX exchange, I uncommented these lines in the mkts.yml file:
bgworkers:
  - module: gdaxfeeder.so
    name: GdaxFetcher
    config:
      query_start: "2017-09-01 00:00"
and reran
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ marketstore -config mkts.yml
which yielded:
I0621 11:44:27.248433 28089 log.go:14] Enabling Query Access…
I0621 11:44:27.248448 28089 log.go:14] Launching tcp listener for all services…
I0621 11:44:27.254118 28089 gdaxfeeder.go:123] lastTimestamp for BTC = 2017-09-01 04:59:00 +0000 UTC
I0621 11:44:27.254189 28089 gdaxfeeder.go:123] lastTimestamp for ETH = 0001-01-01 00:00:00 +0000 UTC
I0621 11:44:27.254242 28089 gdaxfeeder.go:123] lastTimestamp for LTC = 0001-01-01 00:00:00 +0000 UTC
I0621 11:44:27.254266 28089 gdaxfeeder.go:123] lastTimestamp for BCH = 0001-01-01 00:00:00 +0000 UTC
I0621 11:44:27.254283 28089 gdaxfeeder.go:144] Requesting BTC 2017-09-01 04:59:00 +0000 UTC - 2017-09-01 09:59:00 +0000 UTC
...
Now that I had MarketStore running, I used Jupyter notebooks to try out the commands listed in this Alpaca tutorial and got the same results. You can read more about how to run MarketStore in MarketStore’s README, Alpaca’s tutorial, and this thread.
Understanding how MarketStore Plugins work
After installing, I wanted to understand how the MarketStore repository is organized and how its existing Go plugins work. Before joining Alpaca, I didn’t have any experience with the Go programming language, so I completed the official “A Tour of Go” tutorial to get a general feel for the language. Having some experience with C++ and Python, I saw a lot of similarities and found that Go wasn’t as difficult as I thought it would be.
Creating a MarketStore Plugin
To get started, I read the MarketStore Plugin README. To summarize at a very high level, MarketStore plugins come in two kinds: Triggers and BgWorkers. You use a trigger when you want your plugin to respond when certain types of data are written to your MarketStore database. You use a BgWorker when you want your plugin to run in the background.
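As a rough sketch of the BgWorker side of that contract: a background worker is a type with a Run() method, and the plugin exposes a NewBgWorker constructor that receives the plugin’s config block from mkts.yml as a generic map. The snippet below defines its own stand-in BgWorker interface and a hypothetical tickerWorker so it runs without MarketStore; the real interface lives in MarketStore’s plugins/bgworker package and may differ in detail.

```go
package main

import "fmt"

// BgWorker is a local stand-in for MarketStore's bgworker.BgWorker:
// the host only needs a Run method it can call in the background.
type BgWorker interface {
	Run()
}

// tickerWorker is a hypothetical worker used only for illustration.
// A real feeder would loop forever fetching and writing data.
type tickerWorker struct {
	name  string
	ticks int
	out   chan string
}

// Run emits a few messages and then closes its channel.
func (w *tickerWorker) Run() {
	for i := 0; i < w.ticks; i++ {
		w.out <- fmt.Sprintf("%s tick %d", w.name, i)
	}
	close(w.out)
}

// NewBgWorker mirrors the constructor shape a bgworker plugin exports:
// it receives the plugin's config block as a generic map.
func NewBgWorker(conf map[string]interface{}) (BgWorker, error) {
	name, ok := conf["name"].(string)
	if !ok {
		return nil, fmt.Errorf("config is missing a string %q", "name")
	}
	return &tickerWorker{name: name, ticks: 3, out: make(chan string)}, nil
}

func main() {
	w, err := NewBgWorker(map[string]interface{}{"name": "demo"})
	if err != nil {
		panic(err)
	}
	// Start the worker in its own goroutine, as a host process would.
	go w.Run()
	for msg := range w.(*tickerWorker).out {
		fmt.Println(msg)
	}
}
```

Returning the interface from a constructor keeps the host decoupled from each plugin’s concrete type: the host only calls Run.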
I only needed to use the BgWorker feature because my plugin’s goal is to collect data outlined by the user in the mkts.yml configuration file.
Next, I read the code of the gdaxfeeder plugin, which is quite similar to what I wanted to do, except that I wanted to fetch and write data from the Binance exchange instead of the GDAX exchange.
I noticed that the gdaxfeeder used a GDAX Go wrapper to access GDAX’s public historical price data endpoint. Luckily, I found a Go wrapper for Binance created by adshao that covers the endpoints for retrieving the currently supported symbols as well as Open, High, Low, Close, Volume (OHLCV) data for any symbol(s), interval, and time range set as parameters.
To start, I created a folder called binancefeeder and then a file called binancefeeder.go inside it. I first tested the Go wrapper for Binance to see how to create a client and talk to the Binance API’s Kline endpoint to get data:
package main
import (
	"context"
	"fmt"
	binance "github.com/adshao/go-binance"
)
func main() {
	symbol := "BTCUSDT"
	interval := "1m"
	//Public market-data endpoints need no API key or secret
	client := binance.NewClient("", "")
	//Request 1-minute klines (OHLCV candles) for BTC/USDT
	klines, err := client.NewKlinesService().Symbol(symbol).
		Interval(interval).Do(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, k := range klines {
		fmt.Println(k)
	}
}
I then ran this command in my root directory:
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ go run binancefeeder.go
and received the following response with Binance data:
&{1529553060000 6769.28000000 6773.91000000 6769.17000000 6771.34000000 32.95342700 1529553119999 223100.99470354 68 20.58056800 139345.00899491}
&{1529553120000 6771.33000000 6774.00000000 6769.66000000 6774.00000000 36.43794400 1529553179999 246732.39415947 93 20.42194600 138288.41850603}
...
So, it turns out that the Go Wrapper worked!
Next, I started brainstorming how I wanted to configure the Binance Go plugin. I ultimately chose symbols, queryStart, queryEnd, and baseTimeframe as my parameters, since I wanted the user to be able to query any specific symbol(s), start time, end time, and timeframe (e.g., 1Min). Then, right after my imports, I defined the configuration struct and the BinanceFetcher struct that a MarketStore plugin needs:
//FetcherConfig holds the settings a user can set in their config file (e.g., mkts.yml)
type FetcherConfig struct {
	Symbols       []string `json:"symbols"`
	QueryStart    string   `json:"query_start"`
	QueryEnd      string   `json:"query_end"`
	BaseTimeframe string   `json:"base_timeframe"`
}
//BinanceFetcher is the background worker; it implements bgworker.BgWorker via its Run method
type BinanceFetcher struct {
	config        map[string]interface{}
	symbols       []string
	queryStart    time.Time
	queryEnd      time.Time
	baseTimeframe *utils.Timeframe
}
The FetcherConfig’s members are the settings the user can configure in their configuration file (e.g., mkts.yml) to start the plugin. The BinanceFetcher’s members hold the same settings in parsed form (time.Time values and a *utils.Timeframe instead of strings), plus the raw config map. The Run function operates on a BinanceFetcher later.
After creating those structures, I started writing the NewBgWorker function, which constructs the background worker. To set it up, I created the necessary variables inside NewBgWorker and copied the recast function from the gdaxfeeder. The recast function uses Go’s json.Marshal function to encode the received config map as JSON, creates an empty FetcherConfig struct called ret, and then uses json.Unmarshal to decode the JSON into ret, which it returns as a pointer:
//recast converts the generic config map into a typed FetcherConfig
func recast(config map[string]interface{}) *FetcherConfig {
	//Encode the map as JSON...
	data, _ := json.Marshal(config)
	ret := FetcherConfig{}
	//...then decode it into the struct using its json tags
	json.Unmarshal(data, &ret)
	return &ret
}
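The recast trick can be tried in isolation: marshaling a generic map to JSON and unmarshaling it into a struct lets the json tags do the field mapping. A stdlib-only sketch, with a made-up sample map that uses the mkts.yml keys from this post; unlike the plugin’s version, it returns the errors that the original discards, which catches a config value of the wrong type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FetcherConfig matches the json tags used by the Binance plugin.
type FetcherConfig struct {
	Symbols       []string `json:"symbols"`
	QueryStart    string   `json:"query_start"`
	QueryEnd      string   `json:"query_end"`
	BaseTimeframe string   `json:"base_timeframe"`
}

// recast converts a generic config map into a typed FetcherConfig
// by round-tripping it through JSON.
func recast(config map[string]interface{}) (*FetcherConfig, error) {
	data, err := json.Marshal(config)
	if err != nil {
		return nil, err
	}
	ret := FetcherConfig{}
	if err := json.Unmarshal(data, &ret); err != nil {
		return nil, err
	}
	return &ret, nil
}

func main() {
	// Hypothetical sample values mirroring the mkts.yml keys.
	conf := map[string]interface{}{
		"symbols":        []interface{}{"ETH"},
		"base_timeframe": "1Min",
		"query_start":    "2018-01-01 00:00",
	}
	cfg, err := recast(conf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *cfg)
}
```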
Then I wrote the NewBgWorker function itself, along with two helpers: QueryTime, which parses the user’s time strings into time.Time values, and GetAllSymbols, which fetches the currently tradable symbols from Binance. If no symbols are set, the background worker by default retrieves all valid cryptocurrencies and uses them as its symbols. The timeframe defaults to 1Min if none is given, and start and end times that are missing or unparseable become zero time.Time values, which Run handles later. At the end, NewBgWorker returns a pointer to a BinanceFetcher as the bgworker.BgWorker:
package main
import (
	"context"
	"encoding/json"
	"regexp"
	"strconv"
	"time"
	binance "github.com/adshao/go-binance"
	"github.com/alpacahq/marketstore/executor"
	"github.com/alpacahq/marketstore/plugins/bgworker"
	"github.com/alpacahq/marketstore/utils"
	"github.com/alpacahq/marketstore/utils/io"
	"github.com/golang/glog"
)
//FetcherConfig holds the settings a user can set in mkts.yml
type FetcherConfig struct {
	Symbols       []string `json:"symbols"`
	QueryStart    string   `json:"query_start"`
	QueryEnd      string   `json:"query_end"`
	BaseTimeframe string   `json:"base_timeframe"`
}
//BinanceFetcher is the main worker for Binance
type BinanceFetcher struct {
	config        map[string]interface{}
	symbols       []string
	queryStart    time.Time
	queryEnd      time.Time
	baseTimeframe *utils.Timeframe
}
func recast(config map[string]interface{}) *FetcherConfig {
	data, _ := json.Marshal(config)
	ret := FetcherConfig{}
	json.Unmarshal(data, &ret)
	return &ret
}
//QueryTime checks the time string against several layouts and returns the parsed time
//The layouts use "15" (24-hour clock); "03" would reject hours after 12
func QueryTime(query string) time.Time {
	trials := []string{
		"2006-01-02 15:04:05",
		"2006-01-02T15:04:05",
		"2006-01-02 15:04",
		"2006-01-02T15:04",
		"2006-01-02",
	}
	for _, layout := range trials {
		qs, err := time.Parse(layout, query)
		if err == nil {
			//Return the time in MarketStore's configured timezone once a layout matches
			return qs.In(utils.InstanceConfig.Timezone)
		}
	}
	//Return the zero time if no layout matches
	return time.Time{}
}
//GetAllSymbols returns the base assets of all USDT pairs currently trading on Binance
func GetAllSymbols() []string {
	client := binance.NewClient("", "")
	exchangeinfo, err := client.NewExchangeInfoService().Do(context.Background())
	if err != nil {
		glog.Errorf("Failed to get exchange info: %v", err)
		//Fall back to a fixed list of common base assets
		return []string{"BTC", "EOS", "ETH", "BNB", "TRX", "ONT", "XRP", "ADA",
			"LTC", "BCC", "TUSD", "IOTA", "ETC", "ICX", "NEO", "XLM", "QTUM", "BCH"}
	}
	validSymbols := make([]string, 0)
	for _, info := range exchangeinfo.Symbols {
		//Run requests symbol + "USDT", so keep only trading USDT pairs and store the base asset
		if info.Status == "TRADING" && info.QuoteAsset == "USDT" {
			validSymbols = append(validSymbols, info.BaseAsset)
		}
	}
	return validSymbols
}
//NewBgWorker registers the new background worker
func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
	config := recast(conf)
	timeframeStr := "1Min"
	var symbols []string
	//First see if config has symbols; if not, retrieve all from Binance as the default
	if len(config.Symbols) > 0 {
		symbols = config.Symbols
	} else {
		symbols = GetAllSymbols()
	}
	if config.BaseTimeframe != "" {
		timeframeStr = config.BaseTimeframe
	}
	//Missing or invalid times become zero values, which Run handles
	queryStart := QueryTime(config.QueryStart)
	queryEnd := QueryTime(config.QueryEnd)
	return &BinanceFetcher{
		config:        conf,
		symbols:       symbols,
		queryStart:    queryStart,
		queryEnd:      queryEnd,
		baseTimeframe: utils.NewTimeframe(timeframeStr),
	}, nil
}
Then, I started writing the Run function, which is the method the bgworker.BgWorker interface requires (see bgworker.go for more details). To get a better sense of how to handle errors and write modular code in Go, I read the code of the gdaxfeeder and polygon plugins. Run is defined on a pointer receiver (*BinanceFetcher), which is why NewBgWorker returns &BinanceFetcher{...} as the bgworker.BgWorker. The goal of Run is to call the Binance API’s OHLCV endpoint with the given parameters, retrieve the data, and write it to MarketStore’s database.
I first created a new Binance client with no API key or secret, since I’m only using the API’s public endpoints.
Then, to make sure that the BinanceFetcher doesn’t make any incorrectly formatted API calls, I wrote code that checks the timeframe string with regular expressions and converts it to Binance’s interval format. This conversion is needed because MarketStore’s utils.Timeframe, which has a lot of helpful functions, names timeframes differently from Binance (e.g., “1Min” vs. “1m”). If the user gives an unrecognizable timeframe, the plugin falls back to a 1-minute interval:
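//suffixBinanceDefs maps MarketStore timeframe suffixes to Binance interval suffixes (assumed mapping)
var suffixBinanceDefs = map[string]string{
	"Min": "m",
	"H":   "h",
	"D":   "d",
	"W":   "w",
}
//Run implements bgworker.BgWorker
func (bn *BinanceFetcher) Run() {
	symbols := bn.symbols
	client := binance.NewClient("", "")
	timeStart := time.Time{}
	finalTime := bn.queryEnd
	originalInterval := bn.baseTimeframe.String
	re := regexp.MustCompile("[0-9]+")
	re2 := regexp.MustCompile("[a-zA-Z]+")
	//Strip the digits to get the unit (e.g., "Min") and the letters to get the count (e.g., "1")
	timeIntervalLettersOnly := re.ReplaceAllString(originalInterval, "")
	timeIntervalNumsOnly := re2.ReplaceAllString(originalInterval, "")
	correctIntervalSymbol := suffixBinanceDefs[timeIntervalLettersOnly]
	//If the interval is formatted incorrectly, fall back to 1 minute
	if len(correctIntervalSymbol) <= 0 {
		glog.Errorf("Interval Symbol Format Incorrect. Setting time interval to default '1Min'")
		timeIntervalNumsOnly = "1"
		correctIntervalSymbol = "m"
		bn.baseTimeframe = utils.NewTimeframe("1Min")
	}
	//Binance interval string, e.g. "1m"
	timeInterval := timeIntervalNumsOnly + correctIntervalSymbol
	//...the request loop continues here
}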
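The interval conversion can be checked without the plugin. A sketch, assuming MarketStore timeframe strings look like “1Min”, “4H”, or “1D” and that Binance intervals use lowercase suffixes (“1m”, “4h”, “1d”); the suffix map is illustrative, and toBinanceInterval is a hypothetical helper rather than part of the plugin. It uses one anchored regular expression with capture groups instead of two ReplaceAllString calls, so a string like “5Min5” is rejected instead of being silently reassembled:

```go
package main

import (
	"fmt"
	"regexp"
)

// suffixes maps MarketStore-style timeframe suffixes to Binance interval suffixes.
var suffixes = map[string]string{"Min": "m", "H": "h", "D": "d", "W": "w"}

// timeframeRe captures the count and the unit, e.g. "5Min" -> "5", "Min".
var timeframeRe = regexp.MustCompile(`^([0-9]+)([a-zA-Z]+)$`)

// toBinanceInterval converts e.g. "5Min" to "5m". It falls back to "1m"
// and reports false when the string is not recognized.
func toBinanceInterval(tf string) (string, bool) {
	m := timeframeRe.FindStringSubmatch(tf)
	if m == nil {
		return "1m", false
	}
	suffix, ok := suffixes[m[2]]
	if !ok {
		return "1m", false
	}
	return m[1] + suffix, true
}

func main() {
	for _, tf := range []string{"1Min", "5Min", "4H", "1D", "1Sec", "abc"} {
		iv, ok := toBinanceInterval(tf)
		fmt.Printf("%-5s -> %s (recognized: %v)\n", tf, iv, ok)
	}
}
```

Binance only accepts specific count and unit combinations, so a stricter version would also check the result against Binance’s list of supported intervals.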
The start and end times were already parsed in NewBgWorker, and QueryTime returns a zero time.Time value if a string is invalid. So in Run, I only have to check whether the start time is zero and, if so, default it to one hour before the current time. The end time isn’t checked here since it is ignored if invalid, as explained later:
//First iteration: use the user's start time, or one hour ago if none was given
if timeStart.IsZero() {
	if !bn.queryStart.IsZero() {
		timeStart = bn.queryStart
	} else {
		timeStart = time.Now().UTC().Add(-time.Hour)
	}
} else {
	//Later iterations: advance by one window of 300 bars
	timeStart = timeStart.Add(bn.baseTimeframe.Duration * 300)
}
Now that the BinanceFetcher checks its parameters and falls back to defaults when they are invalid, I moved on to calling the Binance API.
To make sure we don’t call the Binance API too often and get IP banned, I fetch the data in windows inside a for loop. On the first iteration, timeStart is set to the user’s start time (or the default above), and timeEnd is set to timeStart plus 300 times the timeframe duration, so each request covers 300 bars. On every later iteration, timeStart advances by the same 300 durations, which makes it equal to the previous timeEnd, and timeEnd is recomputed from it:
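for {
	if timeStart.IsZero() {
		if !bn.queryStart.IsZero() {
			timeStart = bn.queryStart
		} else {
			timeStart = time.Now().UTC().Add(-time.Hour)
		}
	} else {
		//The new start equals the previous window's end
		timeStart = timeStart.Add(bn.baseTimeframe.Duration * 300)
	}
	//Each request covers 300 bars of the base timeframe
	timeEnd := timeStart.Add(bn.baseTimeframe.Duration * 300)
	//...requests for this window follow
}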
When the loop reaches the end time given by the user, it simply alerts the user through glog and continues onward, since a background worker needs to keep running in the background. For each window, the plugin converts the retrieved data and writes it to the MarketStore database. If any value fails to convert, the plugin stops, because I don’t want to write garbage values to the database. These two helper functions handle the conversions, followed by the main loop:
//errorsConversion collects conversion errors; the main loop checks it after each row
var errorsConversion []error
//ConvertStringToFloat parses Binance's decimal strings (e.g., "6769.28000000")
func ConvertStringToFloat(str string) float64 {
	convertedString, err := strconv.ParseFloat(str, 64)
	//Store the error so the main loop can check whether it needs to stop
	if err != nil {
		glog.Errorf("String to float error: %v", err)
		errorsConversion = append(errorsConversion, err)
	}
	return convertedString
}
//ConvertMillToTime converts a Unix timestamp in milliseconds to a time.Time
func ConvertMillToTime(originalTime int64) time.Time {
	i := time.Unix(0, originalTime*int64(time.Millisecond))
	return i
}
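A design note on these helpers: collecting errors in a package-level slice works, but the slice has to be reset by hand and is not safe if several goroutines convert at once. A common Go alternative is to return the error alongside the values and let the caller decide. A stdlib-only sketch of that style; parseFloats is a hypothetical helper, not part of the plugin:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseFloats converts every string or returns the first error, so the
// caller can skip or abort a whole row instead of writing partial values.
func parseFloats(strs ...string) ([]float64, error) {
	out := make([]float64, 0, len(strs))
	for _, s := range strs {
		v, err := strconv.ParseFloat(s, 64)
		if err != nil {
			return nil, fmt.Errorf("parsing %q: %w", s, err)
		}
		out = append(out, v)
	}
	return out, nil
}

func main() {
	vals, err := parseFloats("6769.28000000", "6773.91000000", "32.95342700")
	fmt.Println(vals, err)

	_, err = parseFloats("6769.28", "not-a-number")
	fmt.Println(err)
}
```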
for {
	...
	//Binance expects start and end times in Unix milliseconds
	timeStartM := timeStart.UnixNano() / int64(time.Millisecond)
	timeEndM := timeEnd.UnixNano() / int64(time.Millisecond)
	for _, symbol := range symbols {
		glog.Infof("Requesting %s %v - %v", symbol, timeStart, timeEnd)
		rates, err := client.NewKlinesService().Symbol(symbol + "USDT").Interval(timeInterval).StartTime(timeStartM).EndTime(timeEndM).Do(context.Background())
		if err != nil {
			glog.Errorf("Response error: %v", err)
			//Back off for a minute before trying the next symbol
			time.Sleep(time.Minute)
			continue
		}
		if len(rates) == 0 {
			glog.Info("len(rates) == 0")
			continue
		}
		openTime := make([]int64, 0)
		open := make([]float64, 0)
		high := make([]float64, 0)
		low := make([]float64, 0)
		close := make([]float64, 0)
		volume := make([]float64, 0)
		for _, rate := range rates {
			errorsConversion = errorsConversion[:0]
			openTime = append(openTime, ConvertMillToTime(rate.OpenTime).Unix())
			open = append(open, ConvertStringToFloat(rate.Open))
			high = append(high, ConvertStringToFloat(rate.High))
			low = append(low, ConvertStringToFloat(rate.Low))
			close = append(close, ConvertStringToFloat(rate.Close))
			volume = append(volume, ConvertStringToFloat(rate.Volume))
			//Stop the worker rather than write garbage values
			for _, e := range errorsConversion {
				if e != nil {
					return
				}
			}
		}
		cs := io.NewColumnSeries()
		cs.AddColumn("Epoch", openTime)
		cs.AddColumn("Open", open)
		cs.AddColumn("High", high)
		cs.AddColumn("Low", low)
		cs.AddColumn("Close", close)
		cs.AddColumn("Volume", volume)
		// glog.Infof("%s: %d rates between %v - %v", symbol, len(rates),
		// timeStart.String(), timeEnd.String())
		csm := io.NewColumnSeriesMap()
		//Has to be in this format for the data to be written correctly!
		tbk := io.NewTimeBucketKey(symbol + "/" + bn.baseTimeframe.String + "/OHLCV")
		csm.AddColumnSeries(*tbk, cs)
		executor.WriteCSM(csm, false)
	}
	//Sleep for a second before the next batch of requests
	time.Sleep(time.Second)
}
Installing Go Plugin
To install, I simply changed back to the root directory and ran:
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ make plugins
Then, to configure MarketStore to load my plugin, I changed my config file, mkts.yml, to the following:
root_directory: /home/ethanc/go/bin/src/github.com/cryptoscan/project/data/mktsdb
listen_port: 5993
# timezone: "America/New_York"
log_level: info
queryable: true
stop_grace_period: 0
wal_rotate_interval: 5
enable_add: true
enable_remove: false
enable_last_known: false
bgworkers:
  - module: binancefeeder.so
    name: BinanceFetcher
    config:
      symbols:
        - ETH
      base_timeframe: "1Min"
      query_start: "2018-01-01 00:00"
      query_end: "2018-01-02 00:00"
Then, I ran MarketStore:
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ marketstore -config mkts.yml
And got the following:
I0621 14:48:46.944709 6391 plugins.go:42] InitializeBgWorkers
I0621 14:48:46.944801 6391 plugins.go:45] bgWorkerSetting = &{binancefeeder.so BinanceFetcher map[base_timeframe:1Min query_start:2018-01-01 00:00 query_end:2018-01-02 00:00 symbols:[ETH]]}
I0621 14:48:46.952424 6391 log.go:14] Trying to load module from path:/home/ethanc/go/bin/bin/binancefeeder.so…
I0621 14:48:47.650619 6391 log.go:14] Success loading module /home/ethanc/go/bin/bin/binancefeeder.so.
I0621 14:48:47.651571 6391 plugins.go:51] Start running BgWorker BinanceFetcher…
I0621 14:48:47.651633 6391 log.go:14] Launching heartbeat service…
I0621 14:48:47.651679 6391 log.go:14] Enabling Query Access…
14:48:47.651749 6391 log.go:14] Launching tcp listener for all services…
14:48:47.654961 6391 binancefeeder.go:198] Requesting ETH 2018-01-01 00:00:00 +0000 UTC - 2018-01-01 05:00:00 +0000
...
Testing
When I was editing my plugin and debugging, I often ran the binancefeeder.go file:
ethanc@ethanc-Inspiron-5559: ~/go/bin/src/github.com/alpacahq/marketstore$ go run binancefeeder.go
If I ran into an issue I couldn’t resolve, I added print statements using Go’s fmt package (e.g., fmt.Println). When there was an issue while running the plugin as part of MarketStore via the marketstore -config mkts.yml command, I used glog.Infof() or glog.Errorf() to output the corresponding error or incorrect data value.
Lastly, I copied the gdaxfeeder’s Go test file and modified it to test the binancefeeder.
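One way to make logic like GetAllSymbols easier to test is to separate the filtering from the network call, so a test can feed in fixed exchange-info rows. A sketch with a hypothetical symbolInfo type (its fields are modeled on, not copied from, the wrapper’s exchange-info symbols) and a hypothetical tradableBases function, checked against a small table of cases in main:

```go
package main

import "fmt"

// symbolInfo is a hypothetical, trimmed view of one exchange-info entry.
type symbolInfo struct {
	BaseAsset  string
	QuoteAsset string
	Status     string
}

// tradableBases returns the base assets quoted in `quote` that are
// currently trading, without duplicates, in input order.
func tradableBases(infos []symbolInfo, quote string) []string {
	seen := map[string]bool{}
	var out []string
	for _, info := range infos {
		if info.QuoteAsset != quote || info.Status != "TRADING" || seen[info.BaseAsset] {
			continue
		}
		seen[info.BaseAsset] = true
		out = append(out, info.BaseAsset)
	}
	return out
}

func main() {
	infos := []symbolInfo{
		{"ETH", "USDT", "TRADING"},
		{"ETH", "BTC", "TRADING"}, // different quote asset
		{"BCC", "USDT", "BREAK"},  // not trading
		{"BTC", "USDT", "TRADING"},
	}
	cases := []struct {
		quote string
		want  int
	}{
		{"USDT", 2},
		{"BTC", 1},
		{"BNB", 0},
	}
	for _, c := range cases {
		got := tradableBases(infos, c.quote)
		fmt.Printf("quote=%s got=%v ok=%v\n", c.quote, got, len(got) == c.want)
	}
}
```

In a real _test.go file, the same table would live in a func TestTradableBases(t *testing.T) and report failures with t.Errorf.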
You’ve made it to the end of the blog post! Here’s the link to the Binance plugin if you want to see the complete code. If you want to see all of MarketStore’s plugins, check out this folder.
To summarize, if you want to create a Go extension for any open source repository, I would first read the existing documentation, whether it is a README.md or a dedicated documentation website. Then, I would experiment with the repository’s code by changing certain parts of it and seeing which functions correspond to which actions. Lastly, I would look over previous extensions and adapt an existing one that is close to your plugin idea.
Thanks for reading! I hope you take a look at the MarketStore repository and test it out. If you have any questions, feel free to comment below and I’ll try to answer!
Special thanks to Hitoshi, Sho, Chris, and the rest of Alpaca’s Engineering team for their code reviews and help, as well as Yoshi and Rao for providing feedback on this post.
By: Ethan Chiu