Links
Quick Start
go get github.com/amberflo/metering-go/[email protected]
Ingesting meters
package main
import (
"fmt"
"time"
"github.com/amberflo/metering-go/v2"
"github.com/xtgo/uuid"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
//Optional ingest options
//Frequency at which queued data will be sent to API. Default is 1 second.
intervalSeconds := 30 * time.Second
//Number of messages posted to the API. Default is 100.
batchSize := 5
//Debug mode logging. Default is false.
debug := true
//Instantiate a new metering client
meteringClient := metering.NewMeteringClient(
apiKey,
metering.WithBatchSize(batchSize),
metering.WithIntervalSeconds(intervalSeconds),
metering.WithDebug(debug),
)
customerId := "dell-10"
//Define dimesions for your meters. Dimensions can be used as filters.
dimensions := make(map[string]string)
dimensions["region"] = "Midwest"
dimensions["customerType"] = "Tech"
for i := 0; i < 50; i++ {
utcMillis := time.Now().UnixNano() / int64(time.Millisecond)
//Queue meter messages for ingestion.
//Queue will be flushed asyncrhonously when Metering.BatchSize is exceeded
//or periodically at Metering.IntervalSeconds
//unique ID is optional, but setting it
//helps with de-dupe and revoking an ingested meter
uniqueId := uuid.NewRandom().String()
meteringError := meteringClient.Meter(&metering.MeterMessage{
UniqueId: uniqueId,
MeterApiName: "ApiCalls-From-Go",
CustomerId: customerId,
MeterValue: float64(i) + 234.0,
MeterTimeInMillis: utcMillis,
Dimensions: dimensions,
})
if meteringError != nil {
fmt.Println("Metering error: ", meteringError)
}
time.Sleep(500 * time.Millisecond)
}
//Perform graceful shutdown
//Flush all messages in the queue, stop the timer,
//close all channels, and shutdown the client
meteringClient.Shutdown()
}
Cancel an ingested meter
A meter can be cancelled by resending the same ingestion event and setting metering.CancelMeter dimension to "true". See code below.
dimensions[metering.CancelMeter] = "true"
//cancel an ingested meter
meteringError := Metering.Meter(&metering.MeterMessage{
UniqueId: uniqueId,
MeterApiName: "ApiCalls-From-Go",
CustomerId: customerId,
MeterValue: meterValue,
MeterTimeInMillis: utcMillis,
Dimensions: dimensions,
})
Query usage
package main
import (
"encoding/json"
"fmt"
"github.com/amberflo/metering-go/v2"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
customerId := "dell-8"
//initialize the usage client
usageClient := metering.NewUsageClient(
apiKey,
// metering.WithCustomLogger(customerLogger),
)
//set the start time of the time range in Epoch seconds
startTimeInSeconds := (time.Now().UnixNano() / int64(time.Second)) - (24 * 60 * 60)
timeRange := &metering.TimeRange{
StartTimeInSeconds: startTimeInSeconds,
}
//specify the limit and sort order
take := &metering.Take{
Limit: 10,
IsAscending: true,
}
// Example 1: group by customers for a specific meter and all customers
// setup usage query params
// visit following link for description of payload:
// https://amberflo.readme.io/reference#usage
usageResult, err := usageClient.GetUsage(&metering.UsagePayload{
MeterApiName: "ApiCalls-From-Go",
Aggregation: metering.Sum,
TimeGroupingInterval: metering.Day,
GroupBy: []string{"customerId"},
TimeRange: timeRange,
})
fmt.Println("Usage by meterApiName in json format")
printUsageData(*usageResult, err)
//Example 2: filter for a meter for specific customer
//setup usage query params
filter := make(map[string][]string)
filter["customerId"] = []string{customerId}
usageResult, err = usageClient.GetUsage(&metering.UsagePayload{
MeterApiName: "ApiCalls-From-Go",
Aggregation: metering.Sum,
TimeGroupingInterval: metering.Day,
GroupBy: []string{"customerId"},
TimeRange: timeRange,
Filter: filter,
})
fmt.Println("Usage for meter for specific customer in json format")
printUsageData(*usageResult, err)
}
func printUsageData(usageResult metering.DetailedMeterAggregation, err error) {
if err != nil {
fmt.Println("Usage error: ", err)
return
}
jsonString, err := json.MarshalIndent(usageResult, "", " ")
if err != nil {
fmt.Println("Usage error: ", err)
return
}
fmt.Println(string(jsonString))
}
Manage customers
package main
import (
"fmt"
"github.com/amberflo/metering-go/v2"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
customerId := "dell-8"
//Automatically create customer in Stripe
//and add stripeId to traits
createCustomerInStripe := true
//initialize the customer client
customerClient := metering.NewCustomerClient(
apiKey,
//metering.WithCustomLogger(customerLogger),
)
//check if customer exists
customer, err := customerClient.GetCustomer(customerId)
if err != nil {
fmt.Println("Error getting customer details: ", err)
}
//setup customer
if customer != nil {
//customer exists
//update properties
customer.CustomerName = "Dell 2"
} else {
//setup new customer
//Traits are optional. Traits can be used as filters or aggregation buckets.
traits := make(map[string]string)
traits["region"] = "us-west"
traits["customerType"] = "Tech"
//In case createCustomerInStripe is false, set the trait for stripeId
//traits[metering.StripeTraitKey] = "cus_LVxxpBQvyN3V49"
//Set the AWS marketplace ID trait
//traits[metering.AwsMarketPlaceTraitKey] = "aws_marketplace_id"
customer = &metering.Customer{
CustomerId: customerId,
CustomerName: "Dell",
CustomerEmail: "[email protected]",
Traits: traits,
Enabled: true,
}
}
customer, err = customerClient.AddorUpdateCustomer(customer, createCustomerInStripe)
if err != nil {
fmt.Println("Error creating customer details: ", err)
}
customerStatus := fmt.Sprintf("Stripe id for customer: %s", customer.Traits[metering.StripeTraitKey])
fmt.Println(customerStatus)
}
Custom logger
By default, metering-go uses the default GO logger. You can inject your own logger by implementing the following interface Logger:
type Logger interface {
Log(v ...interface{})
Logf(format string, v ...interface{})
}
Define the custom logger:
package main
import (
"fmt"
"os"
"github.com/rs/zerolog"
)
type CustomLogger struct {
logger *zerolog.Logger
}
func NewCustomLogger() *CustomLogger {
logLevel := zerolog.DebugLevel
zerolog.SetGlobalLevel(logLevel)
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
return &CustomLogger{logger: &logger}
}
func (l *CustomLogger) Log(args ...interface{}) {
msg := fmt.Sprintln(args...)
l.logger.Debug().Msg(msg)
}
func (l *CustomLogger) Logf(format string, args ...interface{}) {
l.logger.Debug().Msgf(format, args...)
}
Instantiate metering client with custom logger:
package main
import (
"github.com/amberflo/metering-go/v2"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
customerLogger := NewCustomLogger()
//Instantiate a new metering client with custom logger
meteringClient := metering.NewMeteringClient(
apiKey,
metering.WithLogger(customerLogger),
)
//initialize the usage client with custom logger
usageClient := metering.NewUsageClient(
apiKey,
metering.WithCustomLogger(customerLogger),
)
//initialize the usage cost client with custom logger
usageCostClient := metering.NewUsageCostClient(
apiKey,
metering.WithCustomLogger(customerLogger),
)
}
Query usage cost with paging
package main
import (
"encoding/json"
"fmt"
"github.com/amberflo/metering-go/v2"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
customerId := "dell-8"
//initialize the usage cost client
usageCostClient := metering.NewUsageCostClient(
apiKey,
// metering.WithCustomLogger(customerLogger),
)
//set the start time of the time range in Epoch seconds
startTimeInSeconds := (time.Now().UnixNano() / int64(time.Second)) - (24 * 60 * 60)
timeRange := &metering.TimeRange{
StartTimeInSeconds: startTimeInSeconds,
}
// paging
pageIndex := int64(1)
page := &metering.Page{
Number: pageIndex,
Size: 2,
}
// Example 1: group by product plan
// setup usage cost query params
// visit following link for description of payload:
// https://docs.amberflo.io/reference/post_payments-cost-usage-cost
for pageIndex < 5 {
usageCostResultForPage, err := usageCostClient.GetUsageCost(&metering.UsageCostsKey{
TimeGroupingInterval: metering.Day,
GroupBy: []string{"product_plan_id"},
TimeRange: timeRange,
Page: page,
})
fmt.Println("Usage Cost Result for page: ", pageIndex)
printUsageCostData(*usageCostResultForPage, err)
//increment the page number
pageIndex = pageIndex + 1
//obtain total pages from result and stop if limit reached
if usageCostResultForPage.PageInfo.TotalPages < pageIndex {
break
}
page.Number = pageIndex
//a token from a previous query page result to track pages and improve performance
pageToken := usageCostResultForPage.PageInfo.PageToken
page.Token = pageToken
}
}
func printUsageCostData(usageCostResult metering.UsageCosts, err error) {
if err != nil {
fmt.Println("Usage cost error: ", err)
return
}
jsonString, err := json.MarshalIndent(usageCostResult, "", " ")
if err != nil {
fmt.Println("Usage cost error: ", err)
return
}
fmt.Println(string(jsonString))
}
Pricing plans
package main
import (
"encoding/json"
"fmt"
"github.com/amberflo/metering-go/v2"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
customerId := "dell-8"
//Assign pricing plan to customer
productPlanId := "8e880691-1ae8-493b-b0a7-12a71e5dfcca"
customerPricingClient := metering.NewCustomerPricingPlanClient(apiKey)
customerPricingPlan, err := customerPricingClient.AddOrUpdate(&metering.CustomerProductPlan{
ProductPlanId: productPlanId,
CustomerId: customerId,
StartTimeInSeconds: (time.Now().UnixNano() / int64(time.Second)),
})
if err != nil {
fmt.Println("Error assigning customer plan: ", err)
}
pricingStatus := fmt.Sprintf("Customer pricing plan %s assigned to customer %s", customerPricingPlan.ProductPlanId, customerId)
fmt.Println(pricingStatus)
}
Prepaid Client
package main
import (
"fmt"
"time"
"github.com/amberflo/metering-go/v2"
"github.com/xtgo/uuid"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
customerId := "dell-1800"
startTimeInSeconds := (time.Now().UnixNano() / int64(time.Second)) - (1 * 24 * 60 * 60)
//*****************************************
//*****************************************
//Prepaid SDK
//initialize the prepaidClient
prepaidClient := metering.NewPrepaidClient(
apiKey,
//use a custom logger
//metering.WithCustomLogger(customerLogger), for custom logger
)
recurrenceFrequency := &metering.BillingPeriod{
Interval: metering.DAY,
IntervalsCount: 1,
}
prepaidOrder := &metering.CustomerPrepaid{
Id: uuid.NewRandom().String(),
CustomerId: customerId,
ExternalPayment: true,
StartTimeInSeconds: startTimeInSeconds,
PrepaidPrice: 123,
PrepaidOfferVersion: -1,
RecurrenceFrequency: recurrenceFrequency,
}
// Create a prepaid order
prepaidOrder, err := prepaidClient.CreatePrepaidOrder(prepaidOrder)
if err != nil {
fmt.Println("Prepaid API error: ", err)
return
}
// Get a list of all active prepaid orders
prepaidOrders, err := prepaidClient.GetActivePrepaidOrders(customerId)
if err != nil {
fmt.Println("Prepaid API error: ", err)
return
}
// Update the external payment status of a prepaid order
externalPrepaidPaymentStatus := &metering.ExternalPrepaidPaymentStatus{
PaymentStatus: metering.SETTLED,
SystemName: "Stripe",
PaymentId: "payment-id-1",
PaymentTimeInSeconds: (time.Now().UnixNano() / int64(time.Second)),
PrepaidUri: prepaidOrder.FirstInvoiceUri,
}
externalPrepaidPaymentStatus, err = prepaidClient.UpdateExternalPrepaidStatus(externalPrepaidPaymentStatus)
// Delete a prepaid order
err = prepaidClient.DeletePrepaidOrder(prepaidOrder.Id, customerId)
if err != nil {
fmt.Println("Prepaid API error: ", err)
return
}
}
Signals
package main
import (
"encoding/json"
"fmt"
"github.com/amberflo/metering-go/v2"
)
//obtain your Amberflo API Key
var apiKey = "my-api-key"
func main() {
signalsClient := metering.NewSignalsClient(
apiKey,
//use a custom logger
//metering.WithCustomLogger(customLogger),
)
invoiceAlert := &metering.Notification{
Name: "invoice-tracker-alert",
NotificationType: metering.Invoice,
Email: []string{"[email protected]"},
ThresholdValue: "200",
CustomerFilterMode: metering.PerCustomer,
Enabled: true,
}
//Create a new signal
invoiceAlert, err := signalsClient.CreateSignal(invoiceAlert)
if err != nil {
fmt.Println("API error: ", err)
return
}
//update an existing signal
invoiceAlert.ThresholdValue = "150"
invoiceAlert.Enabled = false //disable a signal without deleting
invoiceAlert, err = signalsClient.UpdateSignal(invoiceAlert)
if err != nil {
fmt.Println("API error: ", err)
return
}
//get an existing signal
invoiceAlert, err = signalsClient.GetSignal(invoiceAlert.Id)
if err != nil {
fmt.Println("API error: ", err)
return
}
//delete a signal
invoiceAlert, err = signalsClient.DeleteSignal(invoiceAlert.Id)
if err != nil {
fmt.Println("API error: ", err)
return
}
fmt.Println("signal with following id deleted: ", invoiceAlert.Id)
}
Batching Records
Amberflo.io libraries are built to support high throughput environments. That means you can safely send hundreds of meter records per second. For example, you can chose to deploy it on a web server that is serving hundreds of requests per second.
However, every call does not result in a HTTP request, but is queued in memory instead. Messages are batched and flushed in the background, allowing for much faster operation. The size of batch and rate of flush can be customized.
Defaults: The library will flush every 100 messages (configuration parameter: BatchSize ) or
if 1 second has passed since the last flush (configuration parameter: IntervalSeconds)
There is a maximum of 500KB per batch request and 32KB per call.
Flush on demand (Blocking call)
You can flush on demand. For example, at the end of your program, you’ll want to flush to make sure there’s nothing left in the queue. Just call the flush method:
metering.Flush()
Please note: Calling this method will block the calling thread until there are no messages left in the queue. So, you’ll want to use it as part of your cleanup scripts and avoid using it as part of the request lifecycle.
Updated about 1 year ago