AWS CloudWatch

Meter Ingestion via Amberflo serverless agent for AWS CloudWatch

Amberflo provides a serverless agent (as well as fully managed hosted collectors) that requires no download and can automatically extract records from an AWS CloudWatch repository.

This can be useful in two ways:

  1. To quickly test the Amberflo platform (without writing any code or importing a library).
    If you want to try out Amberflo without making changes to your code (and importing a new library), you can configure Amberflo's serverless agent for AWS CloudWatch to automatically extract data from CloudWatch into Amberflo as meters. You configure the agent (via AWS IAM policies) to attach to the CloudWatch system in your AWS VPC, then write meters as log lines carrying an Amberflo tag. The agent automatically extracts all logs with the Amberflo tag and ingests them as meters.

  2. To use your existing logging infrastructure to write meters.
    Once you've defined the meters (using the CLI or the Admin Console), you can simply write them to your standard log. This approach gives you the option of not importing the Amberflo library. The downside is that there may be an additional delay before Amberflo receives the meters. Additionally, (inadvertent) misconfiguration of AWS CloudWatch can result in meter loss, so Amberflo cannot guarantee 100% accuracy with this approach.
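To illustrate the second approach, a meter written this way is just a JSON log line carrying the amberflo_meter tag. The sketch below is an assumption-laden example, not an Amberflo requirement: the logger setup and the log_meter helper are made up for illustration, and the field names follow the sample record shown later on this page.

```python
import json
import logging
import time
import uuid

# Hypothetical logger setup; any logger that reaches CloudWatch works.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("metering")

def log_meter(customer_id, meter_api_name, meter_value, dimensions=None):
    """Emit a meter event as an 'amberflo_meter'-tagged JSON log line."""
    meter = {
        "customerId": customer_id,
        "meterApiName": meter_api_name,
        "meterValue": meter_value,
        "uniqueId": str(uuid.uuid4()),           # lets duplicates be detected
        "time": int(round(time.time() * 1000)),  # epoch milliseconds
    }
    if dimensions:
        meter["dimensions"] = dimensions
    line = json.dumps({"amberflo_meter": meter})
    logger.info(line)
    return line

log_meter("3", "UsageService_processing_time_millis", 478,
          {"class": "MetadataObjectService"})
```

Any log line in this shape that lands in CloudWatch can then be picked up by the subscription filter and Lambda described below.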

To learn more about our integration with AWS Cloudwatch, please contact us at: [email protected]


Please use the AWS guide "Using CloudWatch Logs subscription filters" (Amazon CloudWatch Logs) to set up the Kinesis stream and the CloudWatch subscription filter.

The Lambda function code is available at https://github.com/amberflo/metering-python-sample/blob/main/lambda_cloudwatch_amberflo.py

import base64
import gzip
import io
import json
import time

import metering

metering.app_key = 'amberfloAppKey'  # your Amberflo API key


def lambda_handler(records, context):
    num_records = len(records["Records"])
    print("INFO: NumRecords is {}".format(num_records))

    for record in records["Records"]:
        datablob = record["kinesis"]["data"]
        try:
            entry = decode(datablob)
        except Exception:
            print("ERROR decode Record: {}".format(str(record)))
            continue

        for log_event in entry['logEvents']:
            message = log_event['message']
            if 'amberflo_meter' not in message:
                continue

            # The meter can be embedded in two formats: a JSON object with an
            # "amberflo_meter" key, or a line prefixed with "amberflo_meter:".
            if '\"amberflo_meter\":' in message:
                meter = json.loads(message)['amberflo_meter']
            elif 'amberflo_meter:' in message:
                meter = json.loads(message.split('amberflo_meter:', 1)[1])
            else:
                continue

            # Normalize alternative customer identifier fields.
            if 'tenant' in meter:
                meter['customerId'] = meter['tenant']
            if 'tenant_id' in meter:
                meter['customerId'] = meter['tenant_id']
            if 'customerId' in meter:
                meter['customer'] = meter['customerId']
            if 'time' not in meter:
                meter['time'] = int(round(time.time() * 1000))
            if 'dimensions' in meter and meter['dimensions'] is None:
                del meter['dimensions']

            metering.meter(meter)

    metering.flush()
    return ''


def decode(datablob):
    # Kinesis delivers CloudWatch Logs data gzip-compressed and base64-encoded.
    try:
        data = base64.b64decode(datablob)
        with gzip.GzipFile(fileobj=io.BytesIO(data), mode="r") as f:
            return json.loads(f.read())
    except Exception as e:
        print("ERROR: Unable to decode kinesis datablob")
        print("ERROR: {}".format(e))
        raise

This code takes a log line like:

{"amberflo_meter":{"customerId":"3","meterApiName":"UsageService_processing_time_millis","meterValue":478,"uniqueId":"71a8545f-d197-45b9-b694-22d7b45ac336","dimensions":{"class":"MetadataObjectService"}}}

into a meter record that is written to the S3 bucket generated by Amberflo:

[{"amberflo_meter":{"customerId":"3","meterApiName":"UsageService_processing_time_millis","meterValue":478,"uniqueId":"71a8545f-d197-45b9-b694-22d7b45ac336","dimensions":{"class":"MetadataObjectService"}}}]

You can take the S3 details from the Onboarding screen in the Amberflo Console.
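To make the transformation concrete, here is a standalone sketch of the same pipeline. The sample meter values and the helper names (encode_cloudwatch_payload, extract_meter) are invented for illustration; the encode step mimics how Kinesis delivers CloudWatch Logs data (gzip-compressed, base64-encoded), and the parsing rules mirror the Lambda above.

```python
import base64
import gzip
import io
import json

def encode_cloudwatch_payload(payload):
    # Simulate Kinesis delivery: gzip-compress and base64-encode the payload.
    return base64.b64encode(gzip.compress(json.dumps(payload).encode()))

def decode(datablob):
    # Reverse the delivery encoding back into a Python dict.
    data = base64.b64decode(datablob)
    with gzip.GzipFile(fileobj=io.BytesIO(data), mode="r") as f:
        return json.loads(f.read())

def extract_meter(message):
    # The two formats the Lambda accepts: a JSON object with an
    # "amberflo_meter" key, or a line with an "amberflo_meter:" prefix.
    if '"amberflo_meter":' in message:
        return json.loads(message)["amberflo_meter"]
    if "amberflo_meter:" in message:
        return json.loads(message.split("amberflo_meter:", 1)[1])
    return None

# Synthetic CloudWatch Logs subscription payload with one tagged log event.
payload = {
    "logEvents": [
        {"message": json.dumps({"amberflo_meter": {
            "customerId": "3",
            "meterApiName": "UsageService_processing_time_millis",
            "meterValue": 478,
        }})}
    ]
}

entry = decode(encode_cloudwatch_payload(payload))
meter = extract_meter(entry["logEvents"][0]["message"])
print(meter["meterValue"])  # 478
```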


Create a Kinesis stream: if you have not done this yet, please use the AWS guide "Using CloudWatch Logs subscription filters" (Amazon CloudWatch Logs) to set up the Kinesis stream and the CloudWatch subscription filter.

To set up a subscription filter that sends the relevant logs from CloudWatch to Kinesis (in the following example we take only log lines containing the text "amberflo_meter"), run something like the following, substituting your own log group name, filter name, Kinesis stream ARN, and IAM role ARN for the placeholders:

aws logs put-subscription-filter \
    --log-group-name <your-log-group> \
    --filter-name amberflo-meters \
    --filter-pattern amberflo_meter \
    --destination-arn <your-kinesis-stream-arn> \
    --role-arn <your-iam-role-arn>

That's it. You can go to the Amberflo Console and see the metering events being created. If there are any problems, check the Lambda function's logs.
