Elastic Logstash

Meter Ingestion via Logstash

Integrating metering data into Amberflo can be done with open source Logstash.
The integration uses the Logstash S3 output plugin and writes the metering records to the Amberflo S3 bucket.
You can use Logstash's rich language for parsing the metering data out of your logs/repositories.
Logstash can extract data from files, Elasticsearch, JDBC, S3, MongoDB and other systems using various input plugins. https://www.elastic.co/guide/en/logstash/current/input-plugins.html

Example - Logstash configuration

In the example below we will share a sample Logstash configuration that reads log lines from a file and writes the relevant metering records to Amberflo's S3 bucket.
We will tail new files locally from /Users/demoaccount/input/*
the sample file format
log line 1
log line 2
myMeter 2 myCustomerId amberflo_meter
log line 3

Logstash will read any new file and transform them using the mutate logic into metering records.
In this example we ignore any line without the string: amberflo_meter. We splits based on the space character " ", and treat the meterApiName, meterValue, cusomterId as the first, second and third object accordingly.

input { 
   file {  
      path => "/Users/demoaccount/input/*" 
    } 
} 

filter {
    if "amberflo_meter" in [message]{
    ruby { code => "event.set('time', ((event.get('@timestamp').to_f*1000).to_i).to_s)" } 
     mutate {
            split => { "message" => " " }
            add_field => { "meterApiName" => "%{[message][0]}" }
            add_field => { "meterValue" => "%{[message][1]}" }
            add_field => { "customerId" => "%{[message][2]}" }
            remove_field => ["host","message","@version","@timestamp"]
        } 
    }else{
    drop {}
    }

}

output {
   s3{
     access_key_id => "XXXXXX"             
     secret_access_key => "YYYYYYYYYYYYYY" 
     region => "us-west-2"                    
     bucket => "demo-amberflo"               
     size_file => 2048                        
     time_file => 5                          
     codec => "json"                        
     canned_acl => "bucket-owner-full-control"                
   }

}

The resulting file (meter record) will be:

{
	"meterApiName": "myMeter",
	"meterValue": "2",
	"customerId": "myCustomerId",
	"time": "1621619742810",
	"path": "/Users/demoaccount/input/sample.txt"
}

JDBC source

You can use the Logstash JDBC source to extract the metering info from your repository.
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    schedule => "* * * * *"
    # ... other configuration bits
  }
}

Elasticsearch input

You can use the Elasticsearch input if you have metering data in your logs.
An example input plugin looks like:

input {
elasticsearch {
        hosts => "search-myes-cluster.us-west-2.es.amazonaws.com:443"
        index => "cwl--aws-lambda-meter-definition-api-lambda-2021.07"
        query => '{ "query": {"bool": {"filter": [{"range": {"@timestamp": {"from": "now-1d/d", "to": "now/d", "include_lower": true, "include_upper": true, "format": "epoch_millis", "boost": 1 } } }, {"query_string": {"query": "*amberflo_meter*", "default_field": "@message", "fields": [], "type": "best_fields", "default_operator": "or", "max_determinized_states": 10000, "enable_position_increments": true, "fuzziness": "AUTO", "fuzzy_prefix_length": 0, "fuzzy_max_expansions": 50, "phrase_slop": 0, "escape": false, "auto_generate_synonyms_phrase_query": true, "fuzzy_transpositions": true, "boost": 1 } } ], "adjust_pure_negative": true, "boost": 1 } }, "aggregations": {} }'
        size => 500
        scroll => "5m"
        docinfo => true
        ssl => true
        user => "myuser"
        password => "mypwd"
        #schedule => "*/1 * * * *"
      }

}