0001
0002 #include <linux/ceph/ceph_debug.h>
0003
0004 #include <linux/types.h>
0005 #include <linux/percpu_counter.h>
0006 #include <linux/math64.h>
0007
0008 #include "metric.h"
0009 #include "mds_client.h"
0010
0011 static void ktime_to_ceph_timespec(struct ceph_timespec *ts, ktime_t val)
0012 {
0013 struct timespec64 t = ktime_to_timespec64(val);
0014 ceph_encode_timespec64(ts, &t);
0015 }
0016
0017 static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
0018 struct ceph_mds_session *s)
0019 {
0020 struct ceph_metric_head *head;
0021 struct ceph_metric_cap *cap;
0022 struct ceph_metric_read_latency *read;
0023 struct ceph_metric_write_latency *write;
0024 struct ceph_metric_metadata_latency *meta;
0025 struct ceph_metric_dlease *dlease;
0026 struct ceph_opened_files *files;
0027 struct ceph_pinned_icaps *icaps;
0028 struct ceph_opened_inodes *inodes;
0029 struct ceph_read_io_size *rsize;
0030 struct ceph_write_io_size *wsize;
0031 struct ceph_client_metric *m = &mdsc->metric;
0032 u64 nr_caps = atomic64_read(&m->total_caps);
0033 u32 header_len = sizeof(struct ceph_metric_header);
0034 struct ceph_msg *msg;
0035 s64 sum;
0036 s32 items = 0;
0037 s32 len;
0038
0039 len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
0040 + sizeof(*meta) + sizeof(*dlease) + sizeof(*files)
0041 + sizeof(*icaps) + sizeof(*inodes) + sizeof(*rsize)
0042 + sizeof(*wsize);
0043
0044 msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
0045 if (!msg) {
0046 pr_err("send metrics to mds%d, failed to allocate message\n",
0047 s->s_mds);
0048 return false;
0049 }
0050
0051 head = msg->front.iov_base;
0052
0053
0054 cap = (struct ceph_metric_cap *)(head + 1);
0055 cap->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
0056 cap->header.ver = 1;
0057 cap->header.compat = 1;
0058 cap->header.data_len = cpu_to_le32(sizeof(*cap) - header_len);
0059 cap->hit = cpu_to_le64(percpu_counter_sum(&m->i_caps_hit));
0060 cap->mis = cpu_to_le64(percpu_counter_sum(&m->i_caps_mis));
0061 cap->total = cpu_to_le64(nr_caps);
0062 items++;
0063
0064
0065 read = (struct ceph_metric_read_latency *)(cap + 1);
0066 read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
0067 read->header.ver = 2;
0068 read->header.compat = 1;
0069 read->header.data_len = cpu_to_le32(sizeof(*read) - header_len);
0070 sum = m->metric[METRIC_READ].latency_sum;
0071 ktime_to_ceph_timespec(&read->lat, sum);
0072 ktime_to_ceph_timespec(&read->avg, m->metric[METRIC_READ].latency_avg);
0073 read->sq_sum = cpu_to_le64(m->metric[METRIC_READ].latency_sq_sum);
0074 read->count = cpu_to_le64(m->metric[METRIC_READ].total);
0075 items++;
0076
0077
0078 write = (struct ceph_metric_write_latency *)(read + 1);
0079 write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
0080 write->header.ver = 2;
0081 write->header.compat = 1;
0082 write->header.data_len = cpu_to_le32(sizeof(*write) - header_len);
0083 sum = m->metric[METRIC_WRITE].latency_sum;
0084 ktime_to_ceph_timespec(&write->lat, sum);
0085 ktime_to_ceph_timespec(&write->avg, m->metric[METRIC_WRITE].latency_avg);
0086 write->sq_sum = cpu_to_le64(m->metric[METRIC_WRITE].latency_sq_sum);
0087 write->count = cpu_to_le64(m->metric[METRIC_WRITE].total);
0088 items++;
0089
0090
0091 meta = (struct ceph_metric_metadata_latency *)(write + 1);
0092 meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
0093 meta->header.ver = 2;
0094 meta->header.compat = 1;
0095 meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len);
0096 sum = m->metric[METRIC_METADATA].latency_sum;
0097 ktime_to_ceph_timespec(&meta->lat, sum);
0098 ktime_to_ceph_timespec(&meta->avg, m->metric[METRIC_METADATA].latency_avg);
0099 meta->sq_sum = cpu_to_le64(m->metric[METRIC_METADATA].latency_sq_sum);
0100 meta->count = cpu_to_le64(m->metric[METRIC_METADATA].total);
0101 items++;
0102
0103
0104 dlease = (struct ceph_metric_dlease *)(meta + 1);
0105 dlease->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE);
0106 dlease->header.ver = 1;
0107 dlease->header.compat = 1;
0108 dlease->header.data_len = cpu_to_le32(sizeof(*dlease) - header_len);
0109 dlease->hit = cpu_to_le64(percpu_counter_sum(&m->d_lease_hit));
0110 dlease->mis = cpu_to_le64(percpu_counter_sum(&m->d_lease_mis));
0111 dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries));
0112 items++;
0113
0114 sum = percpu_counter_sum(&m->total_inodes);
0115
0116
0117 files = (struct ceph_opened_files *)(dlease + 1);
0118 files->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES);
0119 files->header.ver = 1;
0120 files->header.compat = 1;
0121 files->header.data_len = cpu_to_le32(sizeof(*files) - header_len);
0122 files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files));
0123 files->total = cpu_to_le64(sum);
0124 items++;
0125
0126
0127 icaps = (struct ceph_pinned_icaps *)(files + 1);
0128 icaps->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS);
0129 icaps->header.ver = 1;
0130 icaps->header.compat = 1;
0131 icaps->header.data_len = cpu_to_le32(sizeof(*icaps) - header_len);
0132 icaps->pinned_icaps = cpu_to_le64(nr_caps);
0133 icaps->total = cpu_to_le64(sum);
0134 items++;
0135
0136
0137 inodes = (struct ceph_opened_inodes *)(icaps + 1);
0138 inodes->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES);
0139 inodes->header.ver = 1;
0140 inodes->header.compat = 1;
0141 inodes->header.data_len = cpu_to_le32(sizeof(*inodes) - header_len);
0142 inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes));
0143 inodes->total = cpu_to_le64(sum);
0144 items++;
0145
0146
0147 rsize = (struct ceph_read_io_size *)(inodes + 1);
0148 rsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_IO_SIZES);
0149 rsize->header.ver = 1;
0150 rsize->header.compat = 1;
0151 rsize->header.data_len = cpu_to_le32(sizeof(*rsize) - header_len);
0152 rsize->total_ops = cpu_to_le64(m->metric[METRIC_READ].total);
0153 rsize->total_size = cpu_to_le64(m->metric[METRIC_READ].size_sum);
0154 items++;
0155
0156
0157 wsize = (struct ceph_write_io_size *)(rsize + 1);
0158 wsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_IO_SIZES);
0159 wsize->header.ver = 1;
0160 wsize->header.compat = 1;
0161 wsize->header.data_len = cpu_to_le32(sizeof(*wsize) - header_len);
0162 wsize->total_ops = cpu_to_le64(m->metric[METRIC_WRITE].total);
0163 wsize->total_size = cpu_to_le64(m->metric[METRIC_WRITE].size_sum);
0164 items++;
0165
0166 put_unaligned_le32(items, &head->num);
0167 msg->front.iov_len = len;
0168 msg->hdr.version = cpu_to_le16(1);
0169 msg->hdr.compat_version = cpu_to_le16(1);
0170 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
0171 ceph_con_send(&s->s_con, msg);
0172
0173 return true;
0174 }
0175
0176
0177 static void metric_get_session(struct ceph_mds_client *mdsc)
0178 {
0179 struct ceph_mds_session *s;
0180 int i;
0181
0182 mutex_lock(&mdsc->mutex);
0183 for (i = 0; i < mdsc->max_sessions; i++) {
0184 s = __ceph_lookup_mds_session(mdsc, i);
0185 if (!s)
0186 continue;
0187
0188
0189
0190
0191
0192
0193 if (check_session_state(s) &&
0194 test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &s->s_features)) {
0195 mdsc->metric.session = s;
0196 break;
0197 }
0198
0199 ceph_put_mds_session(s);
0200 }
0201 mutex_unlock(&mdsc->mutex);
0202 }
0203
0204 static void metric_delayed_work(struct work_struct *work)
0205 {
0206 struct ceph_client_metric *m =
0207 container_of(work, struct ceph_client_metric, delayed_work.work);
0208 struct ceph_mds_client *mdsc =
0209 container_of(m, struct ceph_mds_client, metric);
0210
0211 if (mdsc->stopping)
0212 return;
0213
0214 if (!m->session || !check_session_state(m->session)) {
0215 if (m->session) {
0216 ceph_put_mds_session(m->session);
0217 m->session = NULL;
0218 }
0219 metric_get_session(mdsc);
0220 }
0221 if (m->session) {
0222 ceph_mdsc_send_metrics(mdsc, m->session);
0223 metric_schedule_delayed(m);
0224 }
0225 }
0226
0227 int ceph_metric_init(struct ceph_client_metric *m)
0228 {
0229 struct ceph_metric *metric;
0230 int ret, i;
0231
0232 if (!m)
0233 return -EINVAL;
0234
0235 atomic64_set(&m->total_dentries, 0);
0236 ret = percpu_counter_init(&m->d_lease_hit, 0, GFP_KERNEL);
0237 if (ret)
0238 return ret;
0239
0240 ret = percpu_counter_init(&m->d_lease_mis, 0, GFP_KERNEL);
0241 if (ret)
0242 goto err_d_lease_mis;
0243
0244 atomic64_set(&m->total_caps, 0);
0245 ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
0246 if (ret)
0247 goto err_i_caps_hit;
0248
0249 ret = percpu_counter_init(&m->i_caps_mis, 0, GFP_KERNEL);
0250 if (ret)
0251 goto err_i_caps_mis;
0252
0253 for (i = 0; i < METRIC_MAX; i++) {
0254 metric = &m->metric[i];
0255 spin_lock_init(&metric->lock);
0256 metric->size_sum = 0;
0257 metric->size_min = U64_MAX;
0258 metric->size_max = 0;
0259 metric->total = 0;
0260 metric->latency_sum = 0;
0261 metric->latency_avg = 0;
0262 metric->latency_sq_sum = 0;
0263 metric->latency_min = KTIME_MAX;
0264 metric->latency_max = 0;
0265 }
0266
0267 atomic64_set(&m->opened_files, 0);
0268 ret = percpu_counter_init(&m->opened_inodes, 0, GFP_KERNEL);
0269 if (ret)
0270 goto err_opened_inodes;
0271 ret = percpu_counter_init(&m->total_inodes, 0, GFP_KERNEL);
0272 if (ret)
0273 goto err_total_inodes;
0274
0275 m->session = NULL;
0276 INIT_DELAYED_WORK(&m->delayed_work, metric_delayed_work);
0277
0278 return 0;
0279
0280 err_total_inodes:
0281 percpu_counter_destroy(&m->opened_inodes);
0282 err_opened_inodes:
0283 percpu_counter_destroy(&m->i_caps_mis);
0284 err_i_caps_mis:
0285 percpu_counter_destroy(&m->i_caps_hit);
0286 err_i_caps_hit:
0287 percpu_counter_destroy(&m->d_lease_mis);
0288 err_d_lease_mis:
0289 percpu_counter_destroy(&m->d_lease_hit);
0290
0291 return ret;
0292 }
0293
0294 void ceph_metric_destroy(struct ceph_client_metric *m)
0295 {
0296 if (!m)
0297 return;
0298
0299 cancel_delayed_work_sync(&m->delayed_work);
0300
0301 percpu_counter_destroy(&m->total_inodes);
0302 percpu_counter_destroy(&m->opened_inodes);
0303 percpu_counter_destroy(&m->i_caps_mis);
0304 percpu_counter_destroy(&m->i_caps_hit);
0305 percpu_counter_destroy(&m->d_lease_mis);
0306 percpu_counter_destroy(&m->d_lease_hit);
0307
0308 ceph_put_mds_session(m->session);
0309 }
0310
/*
 * Fold a new sample into a running minimum/maximum pair.
 *
 * @min and @max must be assignable lvalues; @new is the sample.  All three
 * arguments are evaluated more than once, so they must not have side
 * effects.
 *
 * The body is wrapped in do { } while (0) so the macro behaves like a single
 * statement (safe in an unbraced if/else), and every argument is
 * parenthesized so that low-precedence expressions expand correctly.
 */
#define METRIC_UPDATE_MIN_MAX(min, max, new)	\
	do {					\
		if (unlikely((new) < (min)))	\
			(min) = (new);		\
		if (unlikely((new) > (max)))	\
			(max) = (new);		\
	} while (0)
0318
0319 static inline void __update_mean_and_stdev(ktime_t total, ktime_t *lavg,
0320 ktime_t *sq_sump, ktime_t lat)
0321 {
0322 ktime_t avg;
0323
0324 if (unlikely(total == 1)) {
0325 *lavg = lat;
0326 } else {
0327
0328 avg = *lavg + div64_s64(lat - *lavg, total);
0329 *sq_sump += (lat - *lavg)*(lat - avg);
0330 *lavg = avg;
0331 }
0332 }
0333
0334 void ceph_update_metrics(struct ceph_metric *m,
0335 ktime_t r_start, ktime_t r_end,
0336 unsigned int size, int rc)
0337 {
0338 ktime_t lat = ktime_sub(r_end, r_start);
0339 ktime_t total;
0340
0341 if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
0342 return;
0343
0344 spin_lock(&m->lock);
0345 total = ++m->total;
0346 m->size_sum += size;
0347 METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size);
0348 m->latency_sum += lat;
0349 METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat);
0350 __update_mean_and_stdev(total, &m->latency_avg, &m->latency_sq_sum,
0351 lat);
0352 spin_unlock(&m->lock);
0353 }