MetricOutputDAO.java 8.15 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
 * Copyright (c) 2005-2010 KOM – Multimedia Communications Lab
 *
 * This file is part of PeerfactSim.KOM.
 * 
 * PeerfactSim.KOM is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * any later version.
 * 
 * PeerfactSim.KOM is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with PeerfactSim.KOM.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

package de.tud.kom.p2psim.impl.analyzer.metric.output;

23
24
25
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
26
import java.util.List;
27
import java.util.Map;
28
import java.util.Map.Entry;
29
30
31
import java.util.Set;

import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

import de.tud.kom.p2psim.api.common.SimHost;
import de.tud.kom.p2psim.impl.util.db.dao.DAO;
import de.tud.kom.p2psim.impl.util.db.dao.metric.MeasurementDAO;
import de.tud.kom.p2psim.impl.util.db.metric.MetricDescription;
import de.tud.kom.p2psim.impl.util.oracle.GlobalOracle;
import de.tudarmstadt.maki.simonstrator.api.Time;
import de.tudarmstadt.maki.simonstrator.api.common.metric.ActiveMetric;
import de.tudarmstadt.maki.simonstrator.api.common.metric.ActiveMetric.ActiveMetricListener;
import de.tudarmstadt.maki.simonstrator.api.common.metric.Metric;
import de.tudarmstadt.maki.simonstrator.api.common.metric.Metric.MetricValue;
import de.tudarmstadt.maki.simonstrator.api.util.XMLConfigurableConstructor;

/**
 * This class maps {@link Metric}s to calls to the DAO on regular intervals or
 * special actions.
 * 
 * @author Bjoern Richerzhagen
 * @version 1.0, 13.08.2012
 */
public class MetricOutputDAO extends AbstractOutput {

	protected long timeEnableDao = 0;

	protected long timeStopDao = Long.MAX_VALUE;

58
59
	protected Set<String> metricsToAggregate = new LinkedHashSet<>();

60
61
	protected List<MetricDaoAdapter> daoAdapters = new LinkedList<>();

62
63
64
65
66
67
	/**
	 * 
	 * @param table
	 */
	@XMLConfigurableConstructor({ "table" })
	public MetricOutputDAO(String table) {
68
		DAO.database = table;
69
70
71
	}

	public void setUser(String user) {
72
		DAO.username = user;
73
74
75
	}

	public void setPassword(String password) {
76
		DAO.password = password;
77
78
79
80
81
82
83
84
85
86
	}

	public void setTimeEnableDao(long timeEnableDao) {
		this.timeEnableDao = timeEnableDao;
	}

	public void setTimeStopDao(long timeStopDao) {
		this.timeStopDao = timeStopDao;
	}

87
88
89
90
91
92
93
94
95
96
97
98
	/**
	 * A list of PER-HOST metrics that are NOT written individually but instead
	 * statistical figures for a group of hosts is written. The group of hosts
	 * is defined by the hostGroup from the XML-config.
	 * 
	 * @param metricsToAggregate
	 */
	public void setToAggregate(String[] metricsToAggregate) {
		for (String metric : metricsToAggregate) {
			this.metricsToAggregate.add(metric);
		}
	}
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
	
	public boolean isToAggregate(Metric metric) {
		for (String string : metricsToAggregate) {
			if (metric.getName().equals(string)) {
				return true;
			}
			if (string.endsWith("*")) {
				// prefix matching
				String mName = metric.getName();
				if (mName.startsWith(string.substring(0, string.length()-1))) {
					return true;
				}
			}
			if (string.startsWith("*")) {
				// postfix matching
				String mName = metric.getName();
				if (mName.endsWith(string.substring(1, string.length()))) {
					return true;
				}
			}
		}
		return false;
	}
122
123
124
125
126
127
128
129
130
131

	@Override
	public void onInitialize(List<Metric> metrics) {
		for (Metric metric : metrics) {
			/*
			 * Only active metrics are allowed. We register as a listener and
			 * wait for our call.
			 */
			if (metric instanceof ActiveMetric) {
				ActiveMetric am = (ActiveMetric) metric;
132
133
134
				MetricDaoAdapter adapter = new MetricDaoAdapter(am);
				am.addActiveMetricListener(adapter);
				daoAdapters.add(adapter);
135
136
137
138
139
140
			}
		}
	}

	@Override
	public void onStop() {
141
142
143
		for (MetricDaoAdapter adapter : daoAdapters) {
			adapter.onStop();
		}
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
		/*
		 * Commit missing values
		 */
		DAO.commitQueue();
	}

	/**
	 * This class helps in persisting a metric using the {@link MeasurementDAO}
	 * 
	 * @author Bjoern Richerzhagen
	 * @version 1.0, 13.08.2012
	 */
	private class MetricDaoAdapter implements ActiveMetricListener {

		private final ActiveMetric metric;

		private final MetricDescription md;

		private final MeasurementDAO dao = new MeasurementDAO();

		private final List<SimHost> hosts;

166
167
168
		private final boolean writeAggregates;

		private final Map<String, List<SimHost>> hostsByGroup;
169
170
171

		private final Map<String, DescriptiveStatistics> globalStatsByGroup;

172
		private long timestampLastEvent = -1;
173

174
175
176
		public MetricDaoAdapter(ActiveMetric metric) {
			this.metric = metric;
			this.md = new MetricDescription(MetricOutputDAO.class.getName(),
177
178
					metric.getName(), metric.getDescription(),
					metric.getUnit().toString());
179
			this.hosts = GlobalOracle.getHosts();
180
			this.writeAggregates = isToAggregate(metric);
181
			this.hostsByGroup = new LinkedHashMap<>();
182
			this.globalStatsByGroup = new LinkedHashMap<>();
183
184
185
186
			for (SimHost simHost : hosts) {
				String groupId = simHost.getProperties().getGroupID();
				if (!this.hostsByGroup.containsKey(groupId)) {
					this.hostsByGroup.put(groupId, new LinkedList<>());
187
188
					this.globalStatsByGroup.put(groupId,
							new DescriptiveStatistics());
189
190
191
				}
				this.hostsByGroup.get(groupId).add(simHost);
			}
192
193
		}

194
195
196
197
198
199
200
201
202
203
204
205
206
		public void onStop() {
			if (writeAggregates) {
				for (Entry<String, DescriptiveStatistics> groupData : globalStatsByGroup
						.entrySet()) {
					MeasurementDAO.storeGroupStatisticsMeasurement(md,
							groupData.getKey(), Time.getCurrentTime(),
							groupData.getValue(),
							Time.getCurrentTime() - timeEnableDao, true);
				}
				globalStatsByGroup.clear();
			}
		}

207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
		@Override
		public void onMetricUpdate(ActiveMetric metric) {
			long time = Time.getCurrentTime();

			if (time < timeEnableDao || time > timeStopDao) {
				return;
			}

			if (metric.isOverallMetric()) {
				// global
				MetricValue mv = metric.getOverallMetric();
				Object val = mv.getValue();
				if (mv.isValid()) {
					if (val instanceof Number) {
						double vd = ((Number) val).doubleValue();
						dao.storeGlobalSingleMeasurement(md, time, vd);
					}
				}
			} else {
				// per-host metric
227
228
229
230
231
232
233
234
235
				if (writeAggregates) {
					/*
					 * Write aggregates instead of individual metric values.
					 * This can be improved w.r.t. performance, but currently we
					 * do not really care.
					 */
					// Iterate over groups
					for (String group : hostsByGroup.keySet()) {
						DescriptiveStatistics stats = new DescriptiveStatistics();
236
237
						DescriptiveStatistics globalStats = globalStatsByGroup
								.get(group);
238
239
240
241
242
243
244
245
246
247
248
249
250
251
						for (SimHost host : hostsByGroup.get(group)) {
							MetricValue mv = metric
									.getPerHostMetric(host.getId());
							if (mv != null) {
								Object val = mv.getValue();
								if (mv.isValid()) {
									if (val instanceof Number) {
										double vd = ((Number) val)
												.doubleValue();
										if (Double.isNaN(vd)) {
											continue;
										}
										// Add value
										stats.addValue(vd);
252
										globalStats.addValue(vd);
253
254
255
256
257
									}
								}
							}
						}
						// Write Group stats
258
259
						long observationDuration = Time.getCurrentTime()
								- timestampLastEvent;
260
						if (timestampLastEvent == -1) {
261
262
							observationDuration = Time.getCurrentTime()
									- timeEnableDao;
263
						}
264
						MeasurementDAO.storeGroupStatisticsMeasurement(md,
265
								group, time, stats, observationDuration, false);
266
					}
267
					timestampLastEvent = Time.getCurrentTime();
268
269
270
271
272
273
274
275
276
277
278
279
280
				} else {
					for (SimHost host : hosts) {
						MetricValue mv = metric.getPerHostMetric(host.getId());
						if (mv != null) {
							Object val = mv.getValue();
							if (mv.isValid()) {
								if (val instanceof Number) {
									double vd = ((Number) val).doubleValue();
									if (Double.isNaN(vd)) {
										continue;
									}
									dao.storeSingleMeasurement(md,
											host.getHostId(), time, vd);
281
282
283
284
285
286
287
288
289
290
291
								}
							}
						}
					}
				}
			}
		}

	}

}