Guides
...
Usage Metering
Backfill Meters and Customers
Java sample to backfill meters from MongoDB
2min
Java sample to backfill meters from historical data stored in MongoDB into Amberflo.
The sample will perform the following tasks:
- Read entries from MongoDB collection. In this sample, events are ingested as meters into Amberflo.
- For each entry, ingest an event meter into Amberflo. Ensure the create time is used as ingested event time in UTC. Use the entry id as uniqueId of the ingested meter to prevent duplicated ingested records if the code were to run multiple times.
Maven dependencies (check for latest versions):
XML
1<dependencies>
2 <dependency>
3 <groupId>io.amberflo</groupId>
4 <artifactId>metering-java-client</artifactId>
5 <version>1.3.6</version>
6 </dependency>
7 <dependency>
8 <groupId>org.mongodb</groupId>
9 <artifactId>mongo-java-driver</artifactId>
10 <version>3.5.0</version>
11 </dependency>
12 <dependency>
13 <groupId>com.stripe</groupId>
14 <artifactId>stripe-java</artifactId>
15 <version>20.52.0</version>
16 </dependency>
17</dependencies>
Java
1package com.amberflo.backfill;
2
3import com.amberflo.metering.ingest.MeteringContext;
4import com.amberflo.metering.ingest.meter_message.Domain;
5import com.amberflo.metering.ingest.meter_message.MeterMessage;
6import com.amberflo.metering.ingest.meter_message.MeterMessageBuilder;
7import com.amberflo.metering.ingest.meter_message.Region;
8import com.mongodb.MongoClient;
9import com.mongodb.MongoClientURI;
10import com.mongodb.client.FindIterable;
11import com.mongodb.client.MongoCollection;
12import com.mongodb.client.MongoCursor;
13import com.mongodb.client.MongoDatabase;
14import org.bson.Document;
15
16import java.time.Instant;
17import java.time.LocalDateTime;
18import java.time.ZoneId;
19import java.util.HashMap;
20import java.util.Map;
21
22import static com.amberflo.metering.ingest.MeteringContext.metering;
23
24public class BackfillMongoDbData {
25 //TODO: set the database connection, queries, and Amberflo info
26 private static final String MONGO_HOST = "sample-db.mongo.host";
27 private static final int MONGO_PORT = 27017;
28 private static final String MONGO_USER = "sample-user";
29 private static final String MONGO_PASS = "pass";
30 private static final String DB_NAME = "sampleDb";
31 private final static String AMBERFLO_API_KEY = "amberflo-api-key";
32 //END TODO
33
34 private final static String EVENT_CREATE = "event_create";
35
36 public static void main(final String[] args) throws Exception {
37 MeteringContext
38 .createOrReplaceContext(
39 AMBERFLO_API_KEY,
40 "events-api",
41 Domain.Prod,
42 Region.US_West,
43 1,
44 10
45 );
46
47 // Mongodb connection string.
48 final String connectionString = String.format("mongodb://%s:%s@%s:%s", MONGO_USER, MONGO_PASS, MONGO_HOST, MONGO_PORT);
49 final MongoClientURI uri = new MongoClientURI(connectionString);
50
51 // Connecting to the mongodb server using the given client uri.
52 final MongoClient mongoClient = new MongoClient(uri);
53
54 // Fetching the database from the mongodb.
55 final MongoDatabase db = mongoClient.getDatabase(DB_NAME);
56
57 // Fetching the collection from the mongodb.
58 final MongoCollection<Document> col = db.getCollection("events");
59
60 // Performing a read operation on the collection.
61 final FindIterable<Document> fi = col.find();
62 final MongoCursor<Document> cursor = fi.iterator();
63 int counter = 1;
64 try {
65 while (cursor.hasNext()) {
66 final Document document = cursor.next();
67 if (document == null) {
68 continue;
69 }
70 //example of a nested document
71 final Document eventData = (Document) document.get("eventData");
72 final String eventSourceType = eventData.get("event_source_type").toString();
73 final String eventType = eventData.get("event_type").toString();
74 final String meterName = EVENT_CREATE;
75 final String customerId = eventData.get("customer_id").toString();
76 final String id = document.get("_id").toString();
77
78 //Converting timestamp stored as double epoch
79 final Long eventTimeEpoch = ((Double) Double.parseDouble(document.get("created_time_epoch_utc").toString())).longValue();
80 final LocalDateTime eventTime = Instant.ofEpochMilli(eventTimeEpoch).atZone(ZoneId.of("UTC")).toLocalDateTime();
81
82 System.out.println(String.format("Ingesting: event_source_type=%s, customerId=%s, id=%s, eventTime=%s, eventType=%s, meterName %s",
83 eventSourceType, customerId, id, eventTime, eventType, meterName));
84 System.out.println(document.toJson());
85
86 final Map<String, String> dimensions = new HashMap();
87 dimensions.put("event_source_type", eventSourceType);
88 dimensions.put("event_type", eventType);
89
90 final MeterMessage meterMessage = MeterMessageBuilder
91 .createInstance(meterName, eventTime, customerId)
92 .setMeterValue(1)
93 .setUniqueId(id)
94 .setDimensionsMap(dimensions)
95 .build();
96
97 metering().meter(meterMessage);
98
99 if (counter % 100 == 0) {
100 Thread.sleep(5000);
101 }
102 counter++;
103 }
104 System.out.println("Total meters ingested: " + counter);
105 } finally {
106 cursor.close();
107 mongoClient.close();
108 MeteringContext.flushAndClose();
109 System.out.println("done!");
110 }
111 }
112}
Updated 31 Jul 2024
Did this page help you?