Usage Metering
Backfill Meters and Customers
Java sample to backfill meters from Postgres
2min
this sample demonstrates how to use the amberflo java sdk to backfill historical meter data stored in a postgres database it’s designed to help you populate amberflo with meaningful usage events derived from previously recorded customer activity the sample performs the following tasks reads historical records from a postgres table of tasks for each row determines the task status (e g , sent, accepted) ingests the appropriate meter event into amberflo based on that status "task sent" → ingest as a task sent meter "task accepted" → ingest as a task accepted meter this pattern enables teams to retroactively meter usage from business logic or transaction logs, ensuring billing accuracy and complete usage history in amberflo the maven dependencies (check for latest versions) \<dependencies> \<dependency> \<groupid>io amberflo\</groupid> \<artifactid>metering java client\</artifactid> \<version>1 3 6\</version> \</dependency> \<dependency> \<groupid>org postgresql\</groupid> \<artifactid>postgresql\</artifactid> \<version>42 3 1\</version> \</dependency> \<dependency> \<groupid>com stripe\</groupid> \<artifactid>stripe java\</artifactid> \<version>20 52 0\</version> \</dependency> \</dependencies> java code import com amberflo metering ingest meteringcontext; import com amberflo metering ingest meter message domain; import com amberflo metering ingest meter message metermessage; import com amberflo metering ingest meter message metermessagebuilder; import com amberflo metering ingest meter message region; import java sql connection; import java sql drivermanager; import java sql resultset; import java sql statement; import java time localdatetime; import java time zoneid; import java util date; import java util hashmap; import java util map; import static com amberflo metering ingest meteringcontext metering; public class taskspostgresbackfill { //todo set the database connection, queries, and amberflo info final static string db host = "sample host us west 2 rds amazonaws com"; final static string db name = "sampledb"; final static string db pass = "sample pass"; final static string db user = "sample user"; final static string schema = "sampleschema"; final static string count query = "select count( ) as total from tasks where status in ('sent', 'accepted');"; final static string select query = "select id, tasks type, user id, status, email sent date, accepted date from tasks where status in ('sent', 'accepted');"; final static string amberflo api key = "amberflo api key"; //end todo final static string tasks sent meter = "tasks sent"; final static string tasks accepted meter = "tasks accepted"; static int counter sent = 0; static int counter accepted = 0; public static void main(final string\[] args) throws exception { statement stmt = null; connection con = null; resultset rs = null; try { meteringcontext createorreplacecontext( amberflo api key, "tasksmanager", domain prod, region us west, 1, 10 ); final string url = string format("jdbc\ postgresql //%s 5432/%s?currentschema=%s", db host, db name, schema); con = drivermanager getconnection(url, db user, db pass); stmt = con createstatement(); rs = stmt executequery(count query); rs next(); int total = rs getint("total"); system out println("total tasks sent and accepted = " + total); rs = stmt executequery(select query); while (rs next()) { final int tasksid = rs getint("id"); final string customerid = rs getstring("user id"); final string taskstype = rs getstring("tasks type"); final string status = rs getstring("status"); final date emailsentdate = rs getdate("email sent date"); final date accepteddate = rs getdate("accepted date"); system out println(string format("tasksid=%s, status=%s, customerid=%s, taskstype=%s, emailsentdate=%s, accepteddate=%s", tasksid, status, customerid, taskstype, emailsentdate, accepteddate)); system out println(); if ("sent" equals(status)) { ingestmeterfortask(integer tostring(tasksid), customerid, taskstype, tasks sent meter, emailsentdate); counter sent++; if (counter sent % 100 == 0 && counter sent > 0) { thread sleep(5000); } } else if ("accepted" equals(status)) { ingestmeterfortask(integer tostring(tasksid), customerid, taskstype, tasks accepted meter, accepteddate); counter accepted++; if (counter accepted % 100 == 0 && counter accepted > 0) { thread sleep(5000); } } } system out println("done!!! total tasks sent ingested = " + counter sent); system out println("done!!! total tasks accepted ingested = " + counter accepted); } catch (exception ex) { ex printstacktrace(); } finally { if (rs != null) rs close(); if (stmt != null) stmt close(); if (con != null) con close(); meteringcontext flushandclose(); } } public static void ingestmeterfortask( final string tasksid, final string customerid, final string taskstype, final string metername, final date eventdate) { try { //java sql date supports only date components final localdatetime eventtime = new java util date(eventdate gettime()) toinstant() atzone(zoneid of("utc")) tolocaldatetime(); final map\<string, string> dimensions = new hashmap(); dimensions put("taskstype", taskstype); //set uniqueid to prevent duplicate ingested records in case this code //is run multiple times final metermessage metermessage = metermessagebuilder createinstance(metername, eventtime, customerid) setmetervalue(1) setuniqueid(tasksid) setdimensionsmap(dimensions) build(); metering() meter(metermessage); system out println(string format("amberflo ingestion meter %s, task id %s, customerid %s, taskstype %s, eventtime %s", metername, tasksid, customerid, dimensions get("taskstype"), eventtime)); } catch (exception ex) { ex printstacktrace(); } } }