Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_LOV
33
34 #include <linux/delay.h>
35 #include <libcfs/libcfs.h>
36
37 #include <obd_class.h>
38 #include "lov_internal.h"
39
40 static void lov_init_set(struct lov_request_set *set)
41 {
42         set->set_count = 0;
43         atomic_set(&set->set_completes, 0);
44         atomic_set(&set->set_success, 0);
45         INIT_LIST_HEAD(&set->set_list);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct lov_request *req;
51
52         ENTRY;
53         LASSERT(set != NULL);
54         while ((req = list_first_entry_or_null(&set->set_list,
55                                                struct lov_request,
56                                                rq_link)) != NULL) {
57                 list_del_init(&req->rq_link);
58                 if (req->rq_oi.oi_osfs)
59                         OBD_FREE_PTR(req->rq_oi.oi_osfs);
60                 OBD_FREE_PTR(req);
61         }
62
63         OBD_FREE_PTR(set);
64         EXIT;
65 }
66
67 static void
68 lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc)
69 {
70         atomic_inc(&set->set_completes);
71         if (rc == 0)
72                 atomic_inc(&set->set_success);
73 }
74
75 static void
76 lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
77 {
78         list_add_tail(&req->rq_link, &set->set_list);
79         set->set_count++;
80         req->rq_rqset = set;
81 }
82
83 static int lov_check_set(struct lov_obd *lov, int idx)
84 {
85         int rc = 0;
86
87         mutex_lock(&lov->lov_lock);
88
89         if (!lov->lov_tgts[idx] || lov->lov_tgts[idx]->ltd_active ||
90             (lov->lov_tgts[idx]->ltd_exp &&
91              class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
92                 rc = 1;
93
94         mutex_unlock(&lov->lov_lock);
95         return rc;
96 }
97
98 /*
99  * Check if the OSC connection exists and is active.
100  * If the OSC has not yet had a chance to connect to the OST the first time,
101  * wait once for it to connect instead of returning an error.
102  */
103 static int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
104 {
105         struct lov_tgt_desc *tgt;
106         struct obd_import *imp = NULL;
107         int rc = 0;
108         int cnt;
109
110         mutex_lock(&lov->lov_lock);
111
112         tgt = lov->lov_tgts[ost_idx];
113
114         if (unlikely(!tgt))
115                 GOTO(out, rc = 0);
116
117         if (likely(tgt->ltd_active))
118                 GOTO(out, rc = 1);
119
120         if (tgt->ltd_exp)
121                 imp = class_exp2cliimp(tgt->ltd_exp);
122         if (imp && imp->imp_connect_tried)
123                 GOTO(out, rc = 0);
124         if (imp && imp->imp_state == LUSTRE_IMP_IDLE)
125                 GOTO(out, rc = 0);
126
127         mutex_unlock(&lov->lov_lock);
128
129         cnt = obd_timeout;
130         while (cnt > 0 &&
131                !lov_check_set(lov, ost_idx)) {
132                 ssleep(1);
133                 cnt -= 1;
134         }
135         if (tgt->ltd_active)
136                 return 1;
137
138         return 0;
139
140 out:
141         mutex_unlock(&lov->lov_lock);
142         return rc;
143 }
144
145 static int
146 lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs, int success)
147 {
148         ENTRY;
149
150         if (success) {
151                 __u32 expected_stripes = lov_get_stripe_count(&obd->u.lov,
152                                                               LOV_MAGIC, 0);
153                 if (osfs->os_files != U64_MAX)
154                         do_div(osfs->os_files, expected_stripes);
155                 if (osfs->os_ffree != U64_MAX)
156                         do_div(osfs->os_ffree, expected_stripes);
157
158                 spin_lock(&obd->obd_osfs_lock);
159                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
160                 obd->obd_osfs_age = ktime_get_seconds();
161                 spin_unlock(&obd->obd_osfs_lock);
162                 RETURN(0);
163         }
164
165         RETURN(-EIO);
166 }
167
168 int lov_fini_statfs_set(struct lov_request_set *set)
169 {
170         int rc = 0;
171         ENTRY;
172
173         if (!set)
174                 RETURN(0);
175
176         if (atomic_read(&set->set_completes)) {
177                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
178                                      atomic_read(&set->set_success));
179         }
180
181         lov_finish_set(set);
182
183         RETURN(rc);
184 }
185
186 static void
187 lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
188                   int success)
189 {
190         int shift = 0, quit = 0;
191         __u64 tmp;
192
193         if (success == 0) {
194                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
195         } else {
196                 if (osfs->os_bsize != lov_sfs->os_bsize) {
197                         /* assume all block sizes are always powers of 2 */
198                         /* get the bits difference */
199                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
200                         for (shift = 0; shift < 32; shift++) {
201                                 if (tmp & 1) {
202                                         if (quit)
203                                                 break;
204                                         quit = 1;
205                                         shift = 0;
206                                 }
207                                 tmp >>= 1;
208                         }
209                 }
210
211                 if (osfs->os_bsize < lov_sfs->os_bsize) {
212                         osfs->os_bsize = lov_sfs->os_bsize;
213
214                         osfs->os_bfree  >>= shift;
215                         osfs->os_bavail >>= shift;
216                         osfs->os_blocks >>= shift;
217                 } else if (shift != 0) {
218                         lov_sfs->os_bfree  >>= shift;
219                         lov_sfs->os_bavail >>= shift;
220                         lov_sfs->os_blocks >>= shift;
221                 }
222 #ifdef MIN_DF
223                 /*
224                  * Sandia requested that df (and so, statfs) only
225                  * returned minimal available space on
226                  * a single OST, so people would be able to
227                  * write this much data guaranteed.
228                  */
229                 if (osfs->os_bavail > lov_sfs->os_bavail) {
230                         /*
231                          * Presumably if new bavail is smaller,
232                          * new bfree is bigger as well
233                          */
234                         osfs->os_bfree = lov_sfs->os_bfree;
235                         osfs->os_bavail = lov_sfs->os_bavail;
236                 }
237 #else
238                 osfs->os_bfree += lov_sfs->os_bfree;
239                 osfs->os_bavail += lov_sfs->os_bavail;
240 #endif
241                 osfs->os_blocks += lov_sfs->os_blocks;
242                 /*
243                  * XXX not sure about this one - depends on policy.
244                  *   - could be minimum if we always stripe on all OBDs
245                  *     (but that would be wrong for any other policy,
246                  *     if one of the OBDs has no more objects left)
247                  *   - could be sum if we stripe whole objects
248                  *   - could be average, just to give a nice number
249                  *
250                  * Currently using the sum capped at U64_MAX.
251                  */
252                 osfs->os_files = osfs->os_files + lov_sfs->os_files < osfs->os_files ?
253                         U64_MAX : osfs->os_files + lov_sfs->os_files;
254                 osfs->os_ffree = osfs->os_ffree + lov_sfs->os_ffree < osfs->os_ffree ?
255                         U64_MAX : osfs->os_ffree + lov_sfs->os_ffree;
256         }
257 }
258
259 /*
260  * The callback for osc_statfs_async that finilizes a request info when a
261  * response is received.
262  */
263 static int cb_statfs_update(void *cookie, int rc)
264 {
265         struct obd_info *oinfo = cookie;
266         struct lov_request *lovreq;
267         struct lov_request_set *set;
268         struct obd_statfs *osfs, *lov_sfs;
269         struct lov_obd *lov;
270         struct lov_tgt_desc *tgt;
271         struct obd_device *lovobd, *tgtobd;
272         int success;
273
274         ENTRY;
275
276         lovreq = container_of(oinfo, struct lov_request, rq_oi);
277         set = lovreq->rq_rqset;
278         lovobd = set->set_obd;
279         lov = &lovobd->u.lov;
280         osfs = set->set_oi->oi_osfs;
281         lov_sfs = oinfo->oi_osfs;
282         success = atomic_read(&set->set_success);
283         /*
284          * XXX: the same is done in lov_update_common_set, however
285          * lovset->set_exp is not initialized.
286          */
287         lov_update_set(set, lovreq, rc);
288         if (rc)
289                 GOTO(out, rc);
290
291         lov_tgts_getref(lovobd);
292         tgt = lov->lov_tgts[lovreq->rq_idx];
293         if (!tgt || !tgt->ltd_active)
294                 GOTO(out_update, rc);
295
296         tgtobd = class_exp2obd(tgt->ltd_exp);
297         spin_lock(&tgtobd->obd_osfs_lock);
298         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
299         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
300                 tgtobd->obd_osfs_age = ktime_get_seconds();
301         spin_unlock(&tgtobd->obd_osfs_lock);
302
303 out_update:
304         lov_update_statfs(osfs, lov_sfs, success);
305         lov_tgts_putref(lovobd);
306 out:
307         RETURN(0);
308 }
309
310 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
311                         struct lov_request_set **reqset)
312 {
313         struct lov_request_set *set;
314         struct lov_obd *lov = &obd->u.lov;
315         int rc = 0, i;
316
317         ENTRY;
318
319         OBD_ALLOC(set, sizeof(*set));
320         if (!set)
321                 RETURN(-ENOMEM);
322         lov_init_set(set);
323
324         set->set_obd = obd;
325         set->set_oi = oinfo;
326
327         /* We only get block data from the OBD */
328         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
329                 struct lov_tgt_desc *ltd = lov->lov_tgts[i];
330                 struct lov_request *req;
331
332                 if (!ltd) {
333                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
334                         continue;
335                 }
336
337                 /*
338                  * skip targets that have been explicitely disabled by the
339                  * administrator
340                  */
341                 if (!ltd->ltd_exp) {
342                         CDEBUG(D_HA, "lov idx %d administratively disabled\n",
343                                i);
344                         continue;
345                 }
346
347                 if (oinfo->oi_flags & OBD_STATFS_NODELAY &&
348                     class_exp2cliimp(ltd->ltd_exp)->imp_state !=
349                     LUSTRE_IMP_IDLE && !ltd->ltd_active) {
350                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
351                         continue;
352                 }
353
354                 if (!ltd->ltd_active)
355                         lov_check_and_wait_active(lov, i);
356
357                 OBD_ALLOC(req, sizeof(*req));
358                 if (!req)
359                         GOTO(out_set, rc = -ENOMEM);
360
361                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
362                 if (!req->rq_oi.oi_osfs) {
363                         OBD_FREE(req, sizeof(*req));
364                         GOTO(out_set, rc = -ENOMEM);
365                 }
366
367                 req->rq_idx = i;
368                 req->rq_oi.oi_cb_up = cb_statfs_update;
369                 req->rq_oi.oi_flags = oinfo->oi_flags;
370
371                 lov_set_add_req(req, set);
372         }
373         if (!set->set_count)
374                 GOTO(out_set, rc = -EIO);
375         *reqset = set;
376         RETURN(rc);
377 out_set:
378         lov_fini_statfs_set(set);
379         RETURN(rc);
380 }