Guides
...
Usage Metering
Backfill Meters and Customers
Java sample to backfill meters from Postgres
2min
This sample illustrates how to backfill meters from historic data stored in Postgres using the JAVA SDK.
The sample performs the following tasks:
- Read historical data from a Postgres table of tasks. The sample uses tasks sent and accepted by customers as meters.
- For each row, ingest the task sent or accepted meters into Amberflo depending on the task status. The
Maven dependencies (check for latest versions):
XML
1<dependencies>
2 <dependency>
3 <groupId>io.amberflo</groupId>
4 <artifactId>metering-java-client</artifactId>
5 <version>1.3.6</version>
6 </dependency>
7 <dependency>
8 <groupId>org.postgresql</groupId>
9 <artifactId>postgresql</artifactId>
10 <version>42.3.1</version>
11 </dependency>
12 <dependency>
13 <groupId>com.stripe</groupId>
14 <artifactId>stripe-java</artifactId>
15 <version>20.52.0</version>
16 </dependency>
17</dependencies>
Java code:
Java
1import com.amberflo.metering.ingest.MeteringContext;
2import com.amberflo.metering.ingest.meter_message.Domain;
3import com.amberflo.metering.ingest.meter_message.MeterMessage;
4import com.amberflo.metering.ingest.meter_message.MeterMessageBuilder;
5import com.amberflo.metering.ingest.meter_message.Region;
6
7import java.sql.Connection;
8import java.sql.DriverManager;
9import java.sql.ResultSet;
10import java.sql.Statement;
11import java.time.LocalDateTime;
12import java.time.ZoneId;
13import java.util.Date;
14import java.util.HashMap;
15import java.util.Map;
16
17import static com.amberflo.metering.ingest.MeteringContext.metering;
18
19public class TasksPostgresBackfill {
20 //TODO: set the database connection, queries, and Amberflo info
21 final static String DB_HOST = "sample-host.us-west-2.rds.amazonaws.com";
22 final static String DB_NAME = "sampleDb";
23 final static String DB_PASS = "sample-pass";
24 final static String DB_USER = "sample-user";
25 final static String SCHEMA = "sampleSchema";
26 final static String COUNT_QUERY = "SELECT count(*) AS total FROM tasks where status in ('sent', 'accepted');";
27 final static String SELECT_QUERY = "SELECT id, tasks_type, user_id, status, email_sent_date, accepted_date FROM tasks where status in ('sent', 'accepted');";
28 final static String AMBERFLO_API_KEY = "amberflo-api-key";
29 //END TODO
30
31 final static String TASKS_SENT_METER = "tasks_sent";
32 final static String TASKS_ACCEPTED_METER = "tasks_accepted";
33 static int counter_sent = 0;
34 static int counter_accepted = 0;
35
36 public static void main(final String[] args) throws Exception {
37 Statement stmt = null;
38 Connection con = null;
39 ResultSet rs = null;
40 try {
41 MeteringContext
42 .createOrReplaceContext(
43 AMBERFLO_API_KEY,
44 "tasksManager",
45 Domain.Prod,
46 Region.US_West,
47 1,
48 10
49 );
50
51 final String url = String.format("jdbc:postgresql://%s:5432/%s?currentSchema=%s", DB_HOST, DB_NAME, SCHEMA);
52 con = DriverManager.getConnection(url, DB_USER, DB_PASS);
53 stmt = con.createStatement();
54 rs = stmt.executeQuery(COUNT_QUERY);
55 rs.next();
56 int total = rs.getInt("total");
57 System.out.println("Total tasks sent and accepted = " + total);
58
59 rs = stmt.executeQuery(SELECT_QUERY);
60 while (rs.next()) {
61 final int tasksId = rs.getInt("id");
62 final String customerId = rs.getString("user_id");
63 final String tasksType = rs.getString("tasks_type");
64 final String status = rs.getString("status");
65 final Date emailSentDate = rs.getDate("email_sent_date");
66 final Date acceptedDate = rs.getDate("accepted_date");
67 System.out.println(String.format("tasksId=%s, status=%s, customerId=%s, tasksType=%s, emailSentDate=%s, acceptedDate=%s",
68 tasksId, status, customerId, tasksType, emailSentDate, acceptedDate));
69 System.out.println();
70 if ("sent".equals(status)) {
71 ingestMeterForTask(Integer.toString(tasksId), customerId, tasksType, TASKS_SENT_METER, emailSentDate);
72 counter_sent++;
73 if (counter_sent % 100 == 0 && counter_sent > 0) {
74 Thread.sleep(5000);
75 }
76 } else if ("accepted".equals(status)) {
77 ingestMeterForTask(Integer.toString(tasksId), customerId, tasksType, TASKS_ACCEPTED_METER, acceptedDate);
78 counter_accepted++;
79 if (counter_accepted % 100 == 0 && counter_accepted > 0) {
80 Thread.sleep(5000);
81 }
82 }
83 }
84 System.out.println("DONE!!! total tasks sent ingested = " + counter_sent);
85 System.out.println("DONE!!! total tasks accepted ingested = " + counter_accepted);
86 } catch (Exception ex) {
87 ex.printStackTrace();
88 } finally {
89 if (rs != null) rs.close();
90 if (stmt != null) stmt.close();
91 if (con != null) con.close();
92 MeteringContext.flushAndClose();
93 }
94 }
95
96 public static void ingestMeterForTask(
97 final String tasksId,
98 final String customerId,
99 final String tasksType,
100 final String meterName,
101 final Date eventDate) {
102 try {
103
104 //java.sql.Date supports only Date components
105 final LocalDateTime eventTime =
106 new java.util.Date(eventDate.getTime())
107 .toInstant()
108 .atZone(ZoneId.of("UTC"))
109 .toLocalDateTime();
110
111 final Map<String, String> dimensions = new HashMap();
112 dimensions.put("tasksType", tasksType);
113
114 //set uniqueId to prevent duplicate ingested records in case this code
115 //is run multiple times
116 final MeterMessage meterMessage = MeterMessageBuilder
117 .createInstance(meterName, eventTime, customerId)
118 .setMeterValue(1)
119 .setUniqueId(tasksId)
120 .setDimensionsMap(dimensions)
121 .build();
122 metering().meter(meterMessage);
123
124 System.out.println(String.format("Amberflo ingestion: meter %s, task id %s, customerId %s, tasksType %s, eventTime %s",
125 meterName, tasksId, customerId, dimensions.get("tasksType"), eventTime));
126 } catch (Exception ex) {
127 ex.printStackTrace();
128 }
129 }
130}
Updated 31 Jul 2024
Did this page help you?