Usage Metering
Backfill Meters and Customers
Java sample to backfill meters from Postgres
2min
this sample illustrates how to backfill meters from historic data stored in postgres using the java sdk the sample performs the following tasks read historical data from a postgres table of tasks the sample uses tasks sent and accepted by customers as meters for each row, ingest the task sent or accepted meters into amberflo depending on the task status the maven dependencies (check for latest versions) \<dependencies> \<dependency> \<groupid>io amberflo\</groupid> \<artifactid>metering java client\</artifactid> \<version>1 3 6\</version> \</dependency> \<dependency> \<groupid>org postgresql\</groupid> \<artifactid>postgresql\</artifactid> \<version>42 3 1\</version> \</dependency> \<dependency> \<groupid>com stripe\</groupid> \<artifactid>stripe java\</artifactid> \<version>20 52 0\</version> \</dependency> \</dependencies> java code import com amberflo metering ingest meteringcontext; import com amberflo metering ingest meter message domain; import com amberflo metering ingest meter message metermessage; import com amberflo metering ingest meter message metermessagebuilder; import com amberflo metering ingest meter message region; import java sql connection; import java sql drivermanager; import java sql resultset; import java sql statement; import java time localdatetime; import java time zoneid; import java util date; import java util hashmap; import java util map; import static com amberflo metering ingest meteringcontext metering; public class taskspostgresbackfill { //todo set the database connection, queries, and amberflo info final static string db host = "sample host us west 2 rds amazonaws com"; final static string db name = "sampledb"; final static string db pass = "sample pass"; final static string db user = "sample user"; final static string schema = "sampleschema"; final static string count query = "select count( ) as total from tasks where status in ('sent', 'accepted');"; final static string select query = "select id, tasks type, user id, status, email sent date, accepted date from tasks where status in ('sent', 'accepted');"; final static string amberflo api key = "amberflo api key"; //end todo final static string tasks sent meter = "tasks sent"; final static string tasks accepted meter = "tasks accepted"; static int counter sent = 0; static int counter accepted = 0; public static void main(final string\[] args) throws exception { statement stmt = null; connection con = null; resultset rs = null; try { meteringcontext createorreplacecontext( amberflo api key, "tasksmanager", domain prod, region us west, 1, 10 ); final string url = string format("jdbc\ postgresql //%s 5432/%s?currentschema=%s", db host, db name, schema); con = drivermanager getconnection(url, db user, db pass); stmt = con createstatement(); rs = stmt executequery(count query); rs next(); int total = rs getint("total"); system out println("total tasks sent and accepted = " + total); rs = stmt executequery(select query); while (rs next()) { final int tasksid = rs getint("id"); final string customerid = rs getstring("user id"); final string taskstype = rs getstring("tasks type"); final string status = rs getstring("status"); final date emailsentdate = rs getdate("email sent date"); final date accepteddate = rs getdate("accepted date"); system out println(string format("tasksid=%s, status=%s, customerid=%s, taskstype=%s, emailsentdate=%s, accepteddate=%s", tasksid, status, customerid, taskstype, emailsentdate, accepteddate)); system out println(); if ("sent" equals(status)) { ingestmeterfortask(integer tostring(tasksid), customerid, taskstype, tasks sent meter, emailsentdate); counter sent++; if (counter sent % 100 == 0 && counter sent > 0) { thread sleep(5000); } } else if ("accepted" equals(status)) { ingestmeterfortask(integer tostring(tasksid), customerid, taskstype, tasks accepted meter, accepteddate); counter accepted++; if (counter accepted % 100 == 0 && counter accepted > 0) { thread sleep(5000); } } } system out println("done!!! total tasks sent ingested = " + counter sent); system out println("done!!! total tasks accepted ingested = " + counter accepted); } catch (exception ex) { ex printstacktrace(); } finally { if (rs != null) rs close(); if (stmt != null) stmt close(); if (con != null) con close(); meteringcontext flushandclose(); } } public static void ingestmeterfortask( final string tasksid, final string customerid, final string taskstype, final string metername, final date eventdate) { try { //java sql date supports only date components final localdatetime eventtime = new java util date(eventdate gettime()) toinstant() atzone(zoneid of("utc")) tolocaldatetime(); final map\<string, string> dimensions = new hashmap(); dimensions put("taskstype", taskstype); //set uniqueid to prevent duplicate ingested records in case this code //is run multiple times final metermessage metermessage = metermessagebuilder createinstance(metername, eventtime, customerid) setmetervalue(1) setuniqueid(tasksid) setdimensionsmap(dimensions) build(); metering() meter(metermessage); system out println(string format("amberflo ingestion meter %s, task id %s, customerid %s, taskstype %s, eventtime %s", metername, tasksid, customerid, dimensions get("taskstype"), eventtime)); } catch (exception ex) { ex printstacktrace(); } } }