Whamcloud - gitweb
LU-12072 lov: remove KEY_CACHE_SET to simplify the code
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lov/lov_obd.c
33  *
34  * Author: Phil Schwan <phil@clusterfs.com>
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Mike Shaver <shaver@clusterfs.com>
37  * Author: Nathan Rutman <nathan@clusterfs.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_LOV
41 #include <libcfs/libcfs.h>
42
43 #include <cl_object.h>
44 #include <lustre_dlm.h>
45 #include <lustre_fid.h>
46 #include <uapi/linux/lustre/lustre_ioctl.h>
47 #include <lustre_lib.h>
48 #include <lustre_mds.h>
49 #include <lustre_net.h>
50 #include <uapi/linux/lustre/lustre_param.h>
51 #include <lustre_swab.h>
52 #include <lprocfs_status.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55
56 #include "lov_internal.h"
57
58 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
59    Any function that expects lov_tgts to remain stationary must take a ref. */
60 void lov_tgts_getref(struct obd_device *obd)
61 {
62         struct lov_obd *lov = &obd->u.lov;
63
64         /* nobody gets through here until lov_putref is done */
65         mutex_lock(&lov->lov_lock);
66         atomic_inc(&lov->lov_refcount);
67         mutex_unlock(&lov->lov_lock);
68         return;
69 }
70
/* Forward declaration: lov_tgts_putref() calls this helper, which is defined
 * later in this file. */
static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt);
72
73 void lov_tgts_putref(struct obd_device *obd)
74 {
75         struct lov_obd *lov = &obd->u.lov;
76
77         mutex_lock(&lov->lov_lock);
78         /* ok to dec to 0 more than once -- ltd_exp's will be null */
79         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
80                 struct list_head kill = LIST_HEAD_INIT(kill);
81                 struct lov_tgt_desc *tgt, *n;
82                 int i;
83
84                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
85                        lov->lov_death_row);
86                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
87                         tgt = lov->lov_tgts[i];
88
89                         if (!tgt || !tgt->ltd_reap)
90                                 continue;
91                         list_add(&tgt->ltd_kill, &kill);
92                         /* XXX - right now there is a dependency on ld_tgt_count
93                          * being the maximum tgt index for computing the
94                          * mds_max_easize. So we can't shrink it. */
95                         lov_ost_pool_remove(&lov->lov_packed, i);
96                         lov->lov_tgts[i] = NULL;
97                         lov->lov_death_row--;
98                 }
99                 mutex_unlock(&lov->lov_lock);
100
101                 list_for_each_entry_safe(tgt, n, &kill, ltd_kill) {
102                         list_del(&tgt->ltd_kill);
103                         /* Disconnect */
104                         __lov_del_obd(obd, tgt);
105                 }
106         } else {
107                 mutex_unlock(&lov->lov_lock);
108         }
109 }
110
/* Forward declarations: both functions are defined later in this file;
 * lov_connect() calls lov_notify() ahead of its definition. */
static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
                              enum obd_notify_event ev);
static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                      enum obd_notify_event ev);
115
116 int lov_connect_osc(struct obd_device *obd, u32 index, int activate,
117                     struct obd_connect_data *data)
118 {
119         struct lov_obd *lov = &obd->u.lov;
120         struct obd_uuid *tgt_uuid;
121         struct obd_device *tgt_obd;
122         static struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
123         struct obd_import *imp;
124         int rc;
125         ENTRY;
126
127         if (lov->lov_tgts[index] == NULL)
128                 RETURN(-EINVAL);
129
130         tgt_uuid = &lov->lov_tgts[index]->ltd_uuid;
131         tgt_obd = lov->lov_tgts[index]->ltd_obd;
132
133         if (!tgt_obd->obd_set_up) {
134                 CERROR("Target %s not set up\n", obd_uuid2str(tgt_uuid));
135                 RETURN(-EINVAL);
136         }
137
138         /* override the sp_me from lov */
139         tgt_obd->u.cli.cl_sp_me = lov->lov_sp_me;
140
141         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
142                 data->ocd_index = index;
143
144         /*
145          * Divine LOV knows that OBDs under it are OSCs.
146          */
147         imp = tgt_obd->u.cli.cl_import;
148
149         if (activate) {
150                 tgt_obd->obd_no_recov = 0;
151                 /* FIXME this is probably supposed to be
152                    ptlrpc_set_import_active.  Horrible naming. */
153                 ptlrpc_activate_import(imp);
154         }
155
156         rc = obd_register_observer(tgt_obd, obd);
157         if (rc) {
158                 CERROR("Target %s register_observer error %d\n",
159                        obd_uuid2str(tgt_uuid), rc);
160                 RETURN(rc);
161         }
162
163         if (imp->imp_invalid) {
164                 CDEBUG(D_CONFIG, "%s: not connecting - administratively disabled\n",
165                        obd_uuid2str(tgt_uuid));
166                 RETURN(0);
167         }
168
169         rc = obd_connect(NULL, &lov->lov_tgts[index]->ltd_exp, tgt_obd,
170                          &lov_osc_uuid, data, lov->lov_cache);
171         if (rc || !lov->lov_tgts[index]->ltd_exp) {
172                 CERROR("Target %s connect error %d\n",
173                        obd_uuid2str(tgt_uuid), rc);
174                 RETURN(-ENODEV);
175         }
176
177         lov->lov_tgts[index]->ltd_reap = 0;
178
179         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
180                obd_uuid2str(tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
181
182         if (lov->lov_tgts_kobj) {
183                 /* Even if we failed, that's ok */
184                 rc = sysfs_create_link(lov->lov_tgts_kobj,
185                                        &tgt_obd->obd_kset.kobj,
186                                        tgt_obd->obd_name);
187                 if (rc) {
188                         CERROR("%s: can't register LOV target /sys/fs/lustre/%s/%s/target_obds/%s : rc = %d\n",
189                                obd->obd_name, obd->obd_type->typ_name,
190                                obd->obd_name,
191                                lov->lov_tgts[index]->ltd_exp->exp_obd->obd_name,
192                                rc);
193                 }
194         }
195         RETURN(0);
196 }
197
198 static int lov_connect(const struct lu_env *env,
199                        struct obd_export **exp, struct obd_device *obd,
200                        struct obd_uuid *cluuid, struct obd_connect_data *data,
201                        void *localdata)
202 {
203         struct lov_obd *lov = &obd->u.lov;
204         struct lov_tgt_desc *tgt;
205         struct lustre_handle conn;
206         int i, rc;
207         ENTRY;
208
209         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
210
211         rc = class_connect(&conn, obd, cluuid);
212         if (rc)
213                 RETURN(rc);
214
215         *exp = class_conn2export(&conn);
216
217         /* Why should there ever be more than 1 connect? */
218         lov->lov_connects++;
219         LASSERT(lov->lov_connects == 1);
220
221         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
222         if (data)
223                 lov->lov_ocd = *data;
224
225         lov_tgts_getref(obd);
226
227         if (localdata) {
228                 lov->lov_cache = localdata;
229                 cl_cache_incref(lov->lov_cache);
230         }
231
232         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
233                 tgt = lov->lov_tgts[i];
234                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
235                         continue;
236                 /* Flags will be lowest common denominator */
237                 rc = lov_connect_osc(obd, i, tgt->ltd_activate, &lov->lov_ocd);
238                 if (rc) {
239                         CERROR("%s: lov connect tgt %d failed: %d\n",
240                                obd->obd_name, i, rc);
241                         continue;
242                 }
243                 /* connect to administrative disabled ost */
244                 if (!lov->lov_tgts[i]->ltd_exp)
245                         continue;
246
247                 rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
248                                 OBD_NOTIFY_CONNECT);
249                 if (rc) {
250                         CERROR("%s error sending notify %d\n",
251                                obd->obd_name, rc);
252                 }
253         }
254
255         lov_tgts_putref(obd);
256
257         RETURN(0);
258 }
259
260 static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
261 {
262         struct lov_obd *lov = &obd->u.lov;
263         struct obd_device *osc_obd;
264         int rc;
265         ENTRY;
266
267         osc_obd = class_exp2obd(tgt->ltd_exp);
268         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
269                obd->obd_name, osc_obd->obd_name);
270
271         if (tgt->ltd_active) {
272                 tgt->ltd_active = 0;
273                 lov->desc.ld_active_tgt_count--;
274                 tgt->ltd_exp->exp_obd->obd_inactive = 1;
275         }
276
277         if (osc_obd) {
278                 if (lov->lov_tgts_kobj)
279                         sysfs_remove_link(lov->lov_tgts_kobj,
280                                           osc_obd->obd_name);
281
282                 /* Pass it on to our clients.
283                  * XXX This should be an argument to disconnect,
284                  * XXX not a back-door flag on the OBD.  Ah well.
285                  */
286                 osc_obd->obd_force = obd->obd_force;
287                 osc_obd->obd_fail = obd->obd_fail;
288                 osc_obd->obd_no_recov = obd->obd_no_recov;
289
290                 if (lov->targets_proc_entry != NULL)
291                         lprocfs_remove_proc_entry(osc_obd->obd_name,
292                                                   lov->targets_proc_entry);
293         }
294
295         obd_register_observer(osc_obd, NULL);
296
297         rc = obd_disconnect(tgt->ltd_exp);
298         if (rc) {
299                 CERROR("Target %s disconnect error %d\n",
300                        tgt->ltd_uuid.uuid, rc);
301                 rc = 0;
302         }
303
304         tgt->ltd_exp = NULL;
305         RETURN(0);
306 }
307
308 static int lov_disconnect(struct obd_export *exp)
309 {
310         struct obd_device *obd = class_exp2obd(exp);
311         struct lov_obd *lov = &obd->u.lov;
312         u32 index;
313         int rc;
314
315         ENTRY;
316         if (!lov->lov_tgts)
317                 goto out;
318
319         /* Only disconnect the underlying layers on the final disconnect. */
320         lov->lov_connects--;
321         if (lov->lov_connects != 0) {
322                 /* why should there be more than 1 connect? */
323                 CWARN("%s: unexpected disconnect #%d\n",
324                       obd->obd_name, lov->lov_connects);
325                 goto out;
326         }
327
328         /* hold another ref so lov_del_obd() doesn't spin in putref each time */
329         lov_tgts_getref(obd);
330
331         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
332                 if (lov->lov_tgts[index] && lov->lov_tgts[index]->ltd_exp) {
333                         /* Disconnection is the last we know about an OBD */
334                         lov_del_target(obd, index, NULL,
335                                        lov->lov_tgts[index]->ltd_gen);
336                 }
337         }
338         lov_tgts_putref(obd);
339
340 out:
341         rc = class_disconnect(exp); /* bz 9811 */
342         RETURN(rc);
343 }
344
345 /* Error codes:
346  *
347  *  -EINVAL  : UUID can't be found in the LOV's target list
348  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
349  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
350  *  any >= 0 : is log target index
351  */
352 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
353                               enum obd_notify_event ev)
354 {
355         struct lov_obd *lov = &obd->u.lov;
356         struct lov_tgt_desc *tgt;
357         int index, activate, active;
358         ENTRY;
359
360         CDEBUG(D_INFO, "Searching in lov %p for uuid %s event(%d)\n",
361                lov, uuid->uuid, ev);
362
363         lov_tgts_getref(obd);
364         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
365                 tgt = lov->lov_tgts[index];
366                 if (!tgt)
367                         continue;
368                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
369                         break;
370         }
371
372         if (index == lov->desc.ld_tgt_count)
373                 GOTO(out, index = -EINVAL);
374
375         if (ev == OBD_NOTIFY_DEACTIVATE || ev == OBD_NOTIFY_ACTIVATE) {
376                 activate = (ev == OBD_NOTIFY_ACTIVATE) ? 1 : 0;
377
378                 /*
379                  * LU-642, initially inactive OSC could miss the obd_connect,
380                  * we make up for it here.
381                  */
382                 if (activate && !tgt->ltd_exp) {
383                         int rc;
384                         struct obd_uuid lov_osc_uuid = {"LOV_OSC_UUID"};
385
386                         rc = obd_connect(NULL, &tgt->ltd_exp, tgt->ltd_obd,
387                                          &lov_osc_uuid, &lov->lov_ocd,
388                                          lov->lov_cache);
389                         if (rc || tgt->ltd_exp == NULL)
390                                 GOTO(out, index = rc);
391                 }
392
393                 if (lov->lov_tgts[index]->ltd_activate == activate) {
394                         CDEBUG(D_INFO, "OSC %s already %sactivate!\n",
395                                uuid->uuid, activate ? "" : "de");
396                 } else {
397                         lov->lov_tgts[index]->ltd_activate = activate;
398                         CDEBUG(D_CONFIG, "%sactivate OSC %s\n",
399                                activate ? "" : "de", obd_uuid2str(uuid));
400                 }
401
402         } else if (ev == OBD_NOTIFY_INACTIVE || ev == OBD_NOTIFY_ACTIVE) {
403                 active = (ev == OBD_NOTIFY_ACTIVE) ? 1 : 0;
404
405                 if (lov->lov_tgts[index]->ltd_active == active) {
406                         CDEBUG(D_INFO, "OSC %s already %sactive!\n",
407                                uuid->uuid, active ? "" : "in");
408                         GOTO(out, index);
409                 } else {
410                         CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n",
411                                obd_uuid2str(uuid), active ? "" : "in");
412                 }
413
414                 lov->lov_tgts[index]->ltd_active = active;
415                 if (active) {
416                         lov->desc.ld_active_tgt_count++;
417                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
418                 } else {
419                         lov->desc.ld_active_tgt_count--;
420                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
421                 }
422         } else {
423                 CERROR("%s: unknown event %d for uuid %s\n", obd->obd_name,
424                        ev, uuid->uuid);
425         }
426
427         if (tgt->ltd_exp)
428                 CDEBUG(D_INFO, "%s: lov idx %d conn %llx\n", obd_uuid2str(uuid),
429                        index, tgt->ltd_exp->exp_handle.h_cookie);
430
431  out:
432         lov_tgts_putref(obd);
433         RETURN(index);
434 }
435
436 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
437                       enum obd_notify_event ev)
438 {
439         int rc = 0;
440         struct lov_obd *lov = &obd->u.lov;
441         ENTRY;
442
443         down_read(&lov->lov_notify_lock);
444         if (!lov->lov_connects)
445                 GOTO(out_notify_lock, rc = 0);
446
447         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE ||
448             ev == OBD_NOTIFY_ACTIVATE || ev == OBD_NOTIFY_DEACTIVATE) {
449                 struct obd_uuid *uuid;
450
451                 LASSERT(watched);
452
453                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
454                         CERROR("unexpected notification of %s %s\n",
455                                watched->obd_type->typ_name, watched->obd_name);
456                         GOTO(out_notify_lock, rc = -EINVAL);
457                 }
458
459                 uuid = &watched->u.cli.cl_target_uuid;
460
461                 /* Set OSC as active before notifying the observer, so the
462                  * observer can use the OSC normally.
463                  */
464                 rc = lov_set_osc_active(obd, uuid, ev);
465                 if (rc < 0) {
466                         CERROR("%s: event %d failed: rc = %d\n", obd->obd_name,
467                                ev, rc);
468                         GOTO(out_notify_lock, rc);
469                 }
470         }
471
472         /* Pass the notification up the chain. */
473         rc = obd_notify_observer(obd, watched, ev);
474
475 out_notify_lock:
476         up_read(&lov->lov_notify_lock);
477
478         RETURN(rc);
479 }
480
481 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
482                           u32 index, int gen, int active)
483 {
484         struct lov_obd *lov = &obd->u.lov;
485         struct lov_tgt_desc *tgt;
486         struct obd_device *tgt_obd;
487         int rc;
488
489         ENTRY;
490         CDEBUG(D_CONFIG, "uuid:%s idx:%u gen:%d active:%d\n",
491                uuidp->uuid, index, gen, active);
492
493         if (gen <= 0) {
494                 CERROR("%s: request to add '%s' with invalid generation: %d\n",
495                        obd->obd_name, uuidp->uuid, gen);
496                 RETURN(-EINVAL);
497         }
498
499         tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME, &obd->obd_uuid);
500         if (tgt_obd == NULL)
501                 RETURN(-EINVAL);
502
503         mutex_lock(&lov->lov_lock);
504
505         if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
506                 tgt = lov->lov_tgts[index];
507                 rc = -EEXIST;
508                 CERROR("%s: UUID %s already assigned at index %d: rc = %d\n",
509                        obd->obd_name, obd_uuid2str(&tgt->ltd_uuid), index, rc);
510                 mutex_unlock(&lov->lov_lock);
511                 RETURN(rc);
512         }
513
514         if (index >= lov->lov_tgt_size) {
515                 /* We need to reallocate the lov target array. */
516                 struct lov_tgt_desc **newtgts, **old = NULL;
517                 __u32 newsize, oldsize = 0;
518
519                 newsize = max(lov->lov_tgt_size, (__u32)2);
520                 while (newsize < index + 1)
521                         newsize = newsize << 1;
522                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
523                 if (newtgts == NULL) {
524                         mutex_unlock(&lov->lov_lock);
525                         RETURN(-ENOMEM);
526                 }
527
528                 if (lov->lov_tgt_size) {
529                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
530                                lov->lov_tgt_size);
531                         old = lov->lov_tgts;
532                         oldsize = lov->lov_tgt_size;
533                 }
534
535                 lov->lov_tgts = newtgts;
536                 lov->lov_tgt_size = newsize;
537                 smp_rmb();
538                 if (old)
539                         OBD_FREE(old, sizeof(*old) * oldsize);
540
541                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
542                        lov->lov_tgts, lov->lov_tgt_size);
543         }
544
545         OBD_ALLOC_PTR(tgt);
546         if (!tgt) {
547                 mutex_unlock(&lov->lov_lock);
548                 RETURN(-ENOMEM);
549         }
550
551         rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size);
552         if (rc) {
553                 mutex_unlock(&lov->lov_lock);
554                 OBD_FREE_PTR(tgt);
555                 RETURN(rc);
556         }
557
558         tgt->ltd_uuid = *uuidp;
559         tgt->ltd_obd = tgt_obd;
560         /* XXX - add a sanity check on the generation number. */
561         tgt->ltd_gen = gen;
562         tgt->ltd_index = index;
563         tgt->ltd_activate = active;
564         lov->lov_tgts[index] = tgt;
565         if (index >= lov->desc.ld_tgt_count)
566                 lov->desc.ld_tgt_count = index + 1;
567
568         mutex_unlock(&lov->lov_lock);
569
570         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
571                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
572
573         if (lov->lov_connects == 0) {
574                 /* lov_connect hasn't been called yet. We'll do the
575                    lov_connect_osc on this target when that fn first runs,
576                    because we don't know the connect flags yet. */
577                 RETURN(0);
578         }
579
580         lov_tgts_getref(obd);
581
582         rc = lov_connect_osc(obd, index, active, &lov->lov_ocd);
583         if (rc)
584                 GOTO(out, rc);
585
586         /* connect to administrative disabled ost */
587         if (!tgt->ltd_exp)
588                 GOTO(out, rc = 0);
589
590         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
591                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE);
592
593 out:
594         if (rc) {
595                 CERROR("%s: add failed, deleting %s: rc = %d\n",
596                        obd->obd_name, obd_uuid2str(&tgt->ltd_uuid), rc);
597                 lov_del_target(obd, index, NULL, 0);
598         }
599         lov_tgts_putref(obd);
600         RETURN(rc);
601 }
602
603 /* Schedule a target for deletion */
604 int lov_del_target(struct obd_device *obd, u32 index,
605                    struct obd_uuid *uuidp, int gen)
606 {
607         struct lov_obd *lov = &obd->u.lov;
608         int count = lov->desc.ld_tgt_count;
609         int rc = 0;
610         ENTRY;
611
612         if (index >= count) {
613                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
614                        index, count);
615                 RETURN(-EINVAL);
616         }
617
618         /* to make sure there's no ongoing lov_notify() now */
619         down_write(&lov->lov_notify_lock);
620         lov_tgts_getref(obd);
621
622         if (!lov->lov_tgts[index]) {
623                 CERROR("LOV target at index %d is not setup.\n", index);
624                 GOTO(out, rc = -EINVAL);
625         }
626
627         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
628                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
629                        lov_uuid2str(lov, index), index,
630                        obd_uuid2str(uuidp));
631                 GOTO(out, rc = -EINVAL);
632         }
633
634         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
635                lov_uuid2str(lov, index), index,
636                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
637                lov->lov_tgts[index]->ltd_active);
638
639         lov->lov_tgts[index]->ltd_reap = 1;
640         lov->lov_death_row++;
641         /* we really delete it from lov_tgts_putref() */
642 out:
643         lov_tgts_putref(obd);
644         up_write(&lov->lov_notify_lock);
645
646         RETURN(rc);
647 }
648
649 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
650 {
651         struct obd_device *osc_obd;
652
653         LASSERT(tgt);
654         LASSERT(tgt->ltd_reap);
655
656         osc_obd = class_exp2obd(tgt->ltd_exp);
657
658         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
659                tgt->ltd_uuid.uuid,
660                osc_obd ? osc_obd->obd_name : "<no obd>");
661
662         if (tgt->ltd_exp)
663                 lov_disconnect_obd(obd, tgt);
664
665         OBD_FREE_PTR(tgt);
666
667         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
668            do it ourselves. And we can't do it from lov_cleanup,
669            because we just lost our only reference to it. */
670         if (osc_obd)
671                 class_manual_cleanup(osc_obd);
672 }
673
674 void lov_fix_desc_stripe_size(__u64 *val)
675 {
676         if (*val < LOV_MIN_STRIPE_SIZE) {
677                 if (*val != 0)
678                         LCONSOLE_INFO("Increasing default stripe size to "
679                                       "minimum %u\n",
680                                       LOV_DESC_STRIPE_SIZE_DEFAULT);
681                 *val = LOV_DESC_STRIPE_SIZE_DEFAULT;
682         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
683                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
684                 LCONSOLE_WARN("Changing default stripe size to %llu (a "
685                               "multiple of %u)\n",
686                               *val, LOV_MIN_STRIPE_SIZE);
687         }
688 }
689
/* Replace a default stripe count of zero with one; leave others untouched. */
void lov_fix_desc_stripe_count(__u32 *val)
{
	if (!*val)
		*val = 1;
}
695
696 void lov_fix_desc_pattern(__u32 *val)
697 {
698         /* from lov_setstripe */
699         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
700                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
701                 *val = 0;
702         }
703 }
704
705 void lov_fix_desc_qos_maxage(__u32 *val)
706 {
707         if (*val == 0)
708                 *val = LOV_DESC_QOS_MAXAGE_DEFAULT;
709 }
710
711 void lov_fix_desc(struct lov_desc *desc)
712 {
713         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
714         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
715         lov_fix_desc_pattern(&desc->ld_pattern);
716         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
717 }
718
719 int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
720 {
721         struct lov_desc *desc;
722         struct lov_obd *lov = &obd->u.lov;
723         int rc;
724         ENTRY;
725
726         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
727                 CERROR("LOV setup requires a descriptor\n");
728                 RETURN(-EINVAL);
729         }
730
731         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
732
733         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
734                 CERROR("descriptor size wrong: %d > %d\n",
735                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
736                 RETURN(-EINVAL);
737         }
738
739         if (desc->ld_magic != LOV_DESC_MAGIC) {
740                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
741                             CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
742                                    obd->obd_name, desc);
743                             lustre_swab_lov_desc(desc);
744                 } else {
745                         CERROR("%s: Bad lov desc magic: %#x\n",
746                                obd->obd_name, desc->ld_magic);
747                         RETURN(-EINVAL);
748                 }
749         }
750
751         lov_fix_desc(desc);
752
753         desc->ld_active_tgt_count = 0;
754         lov->desc = *desc;
755         lov->lov_tgt_size = 0;
756
757         mutex_init(&lov->lov_lock);
758         atomic_set(&lov->lov_refcount, 0);
759         lov->lov_sp_me = LUSTRE_SP_CLI;
760
761         init_rwsem(&lov->lov_notify_lock);
762
763         lov->lov_pools_hash_body = cfs_hash_create("POOLS", HASH_POOLS_CUR_BITS,
764                                                    HASH_POOLS_MAX_BITS,
765                                                    HASH_POOLS_BKT_BITS, 0,
766                                                    CFS_HASH_MIN_THETA,
767                                                    CFS_HASH_MAX_THETA,
768                                                    &pool_hash_operations,
769                                                    CFS_HASH_DEFAULT);
770         INIT_LIST_HEAD(&lov->lov_pool_list);
771         lov->lov_pool_count = 0;
772         rc = lov_ost_pool_init(&lov->lov_packed, 0);
773         if (rc)
774                 GOTO(out, rc);
775
776         rc = lov_tunables_init(obd);
777         if (rc)
778                 GOTO(out, rc);
779
780         lov->lov_tgts_kobj = kobject_create_and_add("target_obds",
781                                                     &obd->obd_kset.kobj);
782
783 out:
784         return rc;
785 }
786
787 static int lov_cleanup(struct obd_device *obd)
788 {
789         struct lov_obd *lov = &obd->u.lov;
790         struct list_head *pos, *tmp;
791         struct pool_desc *pool;
792         ENTRY;
793
794         if (lov->lov_tgts_kobj) {
795                 kobject_put(lov->lov_tgts_kobj);
796                 lov->lov_tgts_kobj = NULL;
797         }
798
799         list_for_each_safe(pos, tmp, &lov->lov_pool_list) {
800                 pool = list_entry(pos, struct pool_desc, pool_list);
801                 /* free pool structs */
802                 CDEBUG(D_INFO, "delete pool %p\n", pool);
803                 /* In the function below, .hs_keycmp resolves to
804                  * pool_hashkey_keycmp() */
805                 /* coverity[overrun-buffer-val] */
806                 lov_pool_del(obd, pool->pool_name);
807         }
808         cfs_hash_putref(lov->lov_pools_hash_body);
809         lov_ost_pool_free(&lov->lov_packed);
810
811         lprocfs_obd_cleanup(obd);
812         if (lov->lov_tgts) {
813                 int i;
814                 lov_tgts_getref(obd);
815                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
816                         if (!lov->lov_tgts[i])
817                                 continue;
818
819                         /* Inactive targets may never have connected */
820                         if (lov->lov_tgts[i]->ltd_active ||
821                             atomic_read(&lov->lov_refcount))
822                                 /* We should never get here - these
823                                  * should have been removed in the
824                                  * disconnect. */
825                                 CERROR("%s: lov tgt %d not cleaned! "
826                                        "deathrow=%d, lovrc=%d\n",
827                                        obd->obd_name, i, lov->lov_death_row,
828                                        atomic_read(&lov->lov_refcount));
829                         lov_del_target(obd, i, NULL, 0);
830                 }
831                 lov_tgts_putref(obd);
832                 OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
833                          lov->lov_tgt_size);
834                 lov->lov_tgt_size = 0;
835         }
836
837         if (lov->lov_cache != NULL) {
838                 cl_cache_decref(lov->lov_cache);
839                 lov->lov_cache = NULL;
840         }
841
842         RETURN(0);
843 }
844
845 int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
846                             u32 *indexp, int *genp)
847 {
848         struct obd_uuid obd_uuid;
849         int cmd;
850         int rc = 0;
851
852         ENTRY;
853         switch (cmd = lcfg->lcfg_command) {
854         case LCFG_ADD_MDC:
855         case LCFG_DEL_MDC:
856                 break;
857         case LCFG_LOV_ADD_OBD:
858         case LCFG_LOV_ADD_INA:
859         case LCFG_LOV_DEL_OBD: {
860                 u32 index;
861                 int gen;
862
863                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
864                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
865                         GOTO(out, rc = -EINVAL);
866
867                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
868
869                 rc = kstrtou32(lustre_cfg_buf(lcfg, 2), 10, indexp);
870                 if (rc)
871                         GOTO(out, rc);
872                 rc = kstrtoint(lustre_cfg_buf(lcfg, 3), 10, genp);
873                 if (rc)
874                         GOTO(out, rc);
875                 index = *indexp;
876                 gen = *genp;
877                 if (cmd == LCFG_LOV_ADD_OBD)
878                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
879                 else if (cmd == LCFG_LOV_ADD_INA)
880                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
881                 else
882                         rc = lov_del_target(obd, index, &obd_uuid, gen);
883
884                 GOTO(out, rc);
885         }
886         case LCFG_PARAM: {
887                 struct lov_desc *desc = &(obd->u.lov.desc);
888                 ssize_t count;
889
890                 if (!desc)
891                         GOTO(out, rc = -EINVAL);
892
893                 count = class_modify_config(lcfg, PARAM_LOV,
894                                             &obd->obd_kset.kobj);
895                 GOTO(out, rc = count < 0 ? count : 0);
896         }
897         case LCFG_POOL_NEW:
898         case LCFG_POOL_ADD:
899         case LCFG_POOL_DEL:
900         case LCFG_POOL_REM:
901                 GOTO(out, rc);
902
903         default: {
904                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
905                 GOTO(out, rc = -EINVAL);
906
907         }
908         }
909 out:
910         RETURN(rc);
911 }
912
913 static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
914                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
915 {
916         struct obd_device *obd = class_exp2obd(exp);
917         struct lov_obd *lov = &obd->u.lov;
918         struct obd_info oinfo = {
919                 .oi_osfs = osfs,
920                 .oi_flags = flags,
921         };
922         struct ptlrpc_request_set *rqset;
923         struct lov_request_set *set = NULL;
924         struct lov_request *req;
925         int rc = 0;
926         int rc2;
927
928         ENTRY;
929
930         rqset = ptlrpc_prep_set();
931         if (rqset == NULL)
932                 RETURN(-ENOMEM);
933
934         rc = lov_prep_statfs_set(obd, &oinfo, &set);
935         if (rc < 0)
936                 GOTO(out_rqset, rc);
937
938         list_for_each_entry(req, &set->set_list, rq_link) {
939                 rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
940                                       &req->rq_oi, max_age, rqset);
941                 if (rc < 0)
942                         GOTO(out_set, rc);
943         }
944
945         rc = ptlrpc_set_wait(env, rqset);
946
947 out_set:
948         if (rc < 0)
949                 atomic_set(&set->set_completes, 0);
950
951         rc2 = lov_fini_statfs_set(set);
952         if (rc == 0)
953                 rc = rc2;
954
955 out_rqset:
956         ptlrpc_set_destroy(rqset);
957
958         RETURN(rc);
959 }
960
961 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
962                          void *karg, void __user *uarg)
963 {
964         struct obd_device *obd = class_exp2obd(exp);
965         struct lov_obd *lov = &obd->u.lov;
966         int i = 0, rc = 0, count = lov->desc.ld_tgt_count;
967         struct obd_uuid *uuidp;
968
969         ENTRY;
970         switch (cmd) {
971         case IOC_OBD_STATFS: {
972                 struct obd_ioctl_data *data = karg;
973                 struct obd_device *osc_obd;
974                 struct obd_statfs stat_buf = {0};
975                 struct obd_import *imp;
976                 __u32 index;
977                 __u32 flags;
978
979                 memcpy(&index, data->ioc_inlbuf2, sizeof(index));
980                 if (index >= count)
981                         RETURN(-ENODEV);
982
983                 if (!lov->lov_tgts[index])
984                         /* Try again with the next index */
985                         RETURN(-EAGAIN);
986
987                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
988                 if (!osc_obd)
989                         RETURN(-EINVAL);
990
991                 imp = osc_obd->u.cli.cl_import;
992                 if (!lov->lov_tgts[index]->ltd_active &&
993                     imp->imp_state != LUSTRE_IMP_IDLE)
994                         RETURN(-ENODATA);
995
996                 /* copy UUID */
997                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
998                                  min_t(unsigned long, data->ioc_plen2,
999                                        sizeof(struct obd_uuid))))
1000                         RETURN(-EFAULT);
1001
1002                 memcpy(&flags, data->ioc_inlbuf1, sizeof(flags));
1003                 flags = flags & LL_STATFS_NODELAY ? OBD_STATFS_NODELAY : 0;
1004
1005                 /* got statfs data */
1006                 rc = obd_statfs(NULL, lov->lov_tgts[index]->ltd_exp, &stat_buf,
1007                                 ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS,
1008                                 flags);
1009                 if (rc)
1010                         RETURN(rc);
1011                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1012                                  min_t(unsigned long, data->ioc_plen1,
1013                                        sizeof(struct obd_statfs))))
1014                         RETURN(-EFAULT);
1015                 break;
1016         }
1017         case OBD_IOC_LOV_GET_CONFIG: {
1018                 struct obd_ioctl_data *data;
1019                 struct lov_desc *desc;
1020                 char *buf = NULL;
1021                 __u32 *genp;
1022
1023                 len = 0;
1024                 if (obd_ioctl_getdata(&buf, &len, uarg))
1025                         RETURN(-EINVAL);
1026
1027                 data = (struct obd_ioctl_data *)buf;
1028
1029                 if (sizeof(*desc) > data->ioc_inllen1) {
1030                         OBD_FREE_LARGE(buf, len);
1031                         RETURN(-EINVAL);
1032                 }
1033
1034                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1035                         OBD_FREE_LARGE(buf, len);
1036                         RETURN(-EINVAL);
1037                 }
1038
1039                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1040                         OBD_FREE_LARGE(buf, len);
1041                         RETURN(-EINVAL);
1042                 }
1043
1044                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1045                 memcpy(desc, &(lov->desc), sizeof(*desc));
1046
1047                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1048                 genp = (__u32 *)data->ioc_inlbuf3;
1049                 /* the uuid will be empty for deleted OSTs */
1050                 for (i = 0; i < count; i++, uuidp++, genp++) {
1051                         if (!lov->lov_tgts[i])
1052                                 continue;
1053                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
1054                         *genp = lov->lov_tgts[i]->ltd_gen;
1055                 }
1056
1057                 if (copy_to_user(uarg, buf, len))
1058                         rc = -EFAULT;
1059                 OBD_FREE_LARGE(buf, len);
1060                 break;
1061         }
1062         case OBD_IOC_QUOTACTL: {
1063                 struct if_quotactl *qctl = karg;
1064                 struct lov_tgt_desc *tgt = NULL;
1065                 struct obd_quotactl *oqctl;
1066
1067                 if (qctl->qc_valid == QC_OSTIDX) {
1068                         if (count <= qctl->qc_idx)
1069                                 RETURN(-EINVAL);
1070
1071                         tgt = lov->lov_tgts[qctl->qc_idx];
1072                         if (!tgt || !tgt->ltd_exp)
1073                                 RETURN(-EINVAL);
1074                 } else if (qctl->qc_valid == QC_UUID) {
1075                         for (i = 0; i < count; i++) {
1076                                 tgt = lov->lov_tgts[i];
1077                                 if (!tgt ||
1078                                     !obd_uuid_equals(&tgt->ltd_uuid,
1079                                                      &qctl->obd_uuid))
1080                                         continue;
1081
1082                                 if (tgt->ltd_exp == NULL)
1083                                         RETURN(-EINVAL);
1084
1085                                 break;
1086                         }
1087                 } else {
1088                         RETURN(-EINVAL);
1089                 }
1090
1091                 if (i >= count)
1092                         RETURN(-EAGAIN);
1093
1094                 LASSERT(tgt && tgt->ltd_exp);
1095                 OBD_ALLOC_PTR(oqctl);
1096                 if (!oqctl)
1097                         RETURN(-ENOMEM);
1098
1099                 QCTL_COPY(oqctl, qctl);
1100                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
1101                 if (rc == 0) {
1102                         QCTL_COPY(qctl, oqctl);
1103                         qctl->qc_valid = QC_OSTIDX;
1104                         qctl->obd_uuid = tgt->ltd_uuid;
1105                 }
1106                 OBD_FREE_PTR(oqctl);
1107                 break;
1108         }
1109         default: {
1110                 int set = 0;
1111
1112                 if (count == 0)
1113                         RETURN(-ENOTTY);
1114
1115                 for (i = 0; i < count; i++) {
1116                         int err;
1117                         struct obd_device *osc_obd;
1118
1119                         /* OST was disconnected */
1120                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
1121                                 continue;
1122
1123                         /* ll_umount_begin() sets force on lov, pass to osc */
1124                         osc_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
1125                         osc_obd->obd_force = obd->obd_force;
1126                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
1127                                             len, karg, uarg);
1128                         if (err) {
1129                                 if (lov->lov_tgts[i]->ltd_active) {
1130                                         CDEBUG(err == -ENOTTY ?
1131                                                D_IOCTL : D_WARNING,
1132                                                "iocontrol OSC %s on OST "
1133                                                "idx %d cmd %x: err = %d\n",
1134                                                lov_uuid2str(lov, i),
1135                                                i, cmd, err);
1136                                         if (!rc)
1137                                                 rc = err;
1138                                 }
1139                         } else {
1140                                 set = 1;
1141                         }
1142                 }
1143                 if (!set && !rc)
1144                         rc = -EIO;
1145         }
1146         }
1147
1148         RETURN(rc);
1149 }
1150
1151 static int lov_get_info(const struct lu_env *env, struct obd_export *exp,
1152                         __u32 keylen, void *key, __u32 *vallen, void *val)
1153 {
1154         struct obd_device *obddev = class_exp2obd(exp);
1155         struct lov_obd *lov = &obddev->u.lov;
1156         struct lov_desc *ld = &lov->desc;
1157         int rc = 0;
1158         ENTRY;
1159
1160         if (vallen == NULL || val == NULL)
1161                 RETURN(-EFAULT);
1162
1163         lov_tgts_getref(obddev);
1164
1165         if (KEY_IS(KEY_MAX_EASIZE)) {
1166                 u32 max_stripe_count = min_t(u32, ld->ld_active_tgt_count,
1167                                              LOV_MAX_STRIPE_COUNT);
1168
1169                 *((u32 *)val) = lov_mds_md_size(max_stripe_count, LOV_MAGIC_V3);
1170         } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
1171                 u32 def_stripe_count = min_t(u32, ld->ld_default_stripe_count,
1172                                              LOV_MAX_STRIPE_COUNT);
1173
1174                 *((u32 *)val) = lov_mds_md_size(def_stripe_count, LOV_MAGIC_V3);
1175         } else if (KEY_IS(KEY_TGT_COUNT)) {
1176                 *((int *)val) = lov->desc.ld_tgt_count;
1177         } else {
1178                 rc = -EINVAL;
1179         }
1180
1181         lov_tgts_putref(obddev);
1182
1183         RETURN(rc);
1184 }
1185
1186 static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
1187                               __u32 keylen, void *key,
1188                               __u32 vallen, void *val,
1189                               struct ptlrpc_request_set *set)
1190 {
1191         struct obd_device *obddev = class_exp2obd(exp);
1192         struct lov_obd *lov = &obddev->u.lov;
1193         struct lov_tgt_desc *tgt;
1194         bool do_inactive = false, no_set = false;
1195         u32 i;
1196         int rc = 0;
1197         int err;
1198
1199         ENTRY;
1200
1201         if (set == NULL) {
1202                 no_set = true;
1203                 set = ptlrpc_prep_set();
1204                 if (!set)
1205                         RETURN(-ENOMEM);
1206         }
1207
1208         lov_tgts_getref(obddev);
1209
1210         if (KEY_IS(KEY_CHECKSUM))
1211                 do_inactive = true;
1212
1213         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1214                 tgt = lov->lov_tgts[i];
1215
1216                 /* OST was disconnected */
1217                 if (tgt == NULL || tgt->ltd_exp == NULL)
1218                         continue;
1219
1220                 /* OST is inactive and we don't want inactive OSCs */
1221                 if (!tgt->ltd_active && !do_inactive)
1222                         continue;
1223
1224                 err = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
1225                                          vallen, val, set);
1226
1227                 if (rc == 0)
1228                         rc = err;
1229         }
1230
1231         /* cycle through MDC target for Data-on-MDT */
1232         for (i = 0; i < LOV_MDC_TGT_MAX; i++) {
1233                 struct obd_device *mdc;
1234
1235                 mdc = lov->lov_mdc_tgts[i].lmtd_mdc;
1236                 if (mdc == NULL)
1237                         continue;
1238
1239                 err = obd_set_info_async(env, mdc->obd_self_export,
1240                                          keylen, key, vallen, val, set);
1241                 if (rc == 0)
1242                         rc = err;
1243         }
1244
1245         lov_tgts_putref(obddev);
1246         if (no_set) {
1247                 err = ptlrpc_set_wait(env, set);
1248                 if (rc == 0)
1249                         rc = err;
1250                 ptlrpc_set_destroy(set);
1251         }
1252         RETURN(rc);
1253 }
1254
/*
 * Take md->lsm_lock and record the calling task as its owner.
 *
 * Asserts beforehand that the caller is not already the recorded owner,
 * and, once the spinlock is held, that no owner is recorded.
 */
1255 void lov_stripe_lock(struct lov_stripe_md *md)
1256 __acquires(&md->lsm_lock)
1257 {
             /* checked before spinning on the lock */
1258         LASSERT(md->lsm_lock_owner != current_pid());
1259         spin_lock(&md->lsm_lock);
             /* the owner field is 0 whenever the lock is free */
1260         LASSERT(md->lsm_lock_owner == 0);
1261         md->lsm_lock_owner = current_pid();
1262 }
1263
/*
 * Release md->lsm_lock.
 *
 * Asserts that the calling task is the recorded owner, clears the owner
 * field while the lock is still held, then drops the spinlock.
 */
1264 void lov_stripe_unlock(struct lov_stripe_md *md)
1265 __releases(&md->lsm_lock)
1266 {
1267         LASSERT(md->lsm_lock_owner == current_pid());
1268         md->lsm_lock_owner = 0;
1269         spin_unlock(&md->lsm_lock);
1270 }
1271
1272 static int lov_quotactl(struct obd_device *obd, struct obd_export *exp,
1273                         struct obd_quotactl *oqctl)
1274 {
1275         struct lov_obd      *lov = &obd->u.lov;
1276         struct lov_tgt_desc *tgt;
1277         __u64                curspace = 0;
1278         __u64                bhardlimit = 0;
1279         int                  i, rc = 0;
1280         ENTRY;
1281
1282         if (oqctl->qc_cmd != Q_GETOQUOTA &&
1283             oqctl->qc_cmd != LUSTRE_Q_SETQUOTA) {
1284                 CERROR("%s: bad quota opc %x for lov obd\n",
1285                        obd->obd_name, oqctl->qc_cmd);
1286                 RETURN(-EFAULT);
1287         }
1288
1289         /* for lov tgt */
1290         lov_tgts_getref(obd);
1291         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1292                 int err;
1293
1294                 tgt = lov->lov_tgts[i];
1295
1296                 if (!tgt)
1297                         continue;
1298
1299                 if (!tgt->ltd_active || tgt->ltd_reap) {
1300                         if (oqctl->qc_cmd == Q_GETOQUOTA &&
1301                             lov->lov_tgts[i]->ltd_activate) {
1302                                 rc = -ENETDOWN;
1303                                 CERROR("ost %d is inactive\n", i);
1304                         } else {
1305                                 CDEBUG(D_HA, "ost %d is inactive\n", i);
1306                         }
1307                         continue;
1308                 }
1309
1310                 err = obd_quotactl(tgt->ltd_exp, oqctl);
1311                 if (err) {
1312                         if (tgt->ltd_active && !rc)
1313                                 rc = err;
1314                         continue;
1315                 }
1316
1317                 if (oqctl->qc_cmd == Q_GETOQUOTA) {
1318                         curspace += oqctl->qc_dqblk.dqb_curspace;
1319                         bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit;
1320                 }
1321         }
1322         lov_tgts_putref(obd);
1323
1324         if (oqctl->qc_cmd == Q_GETOQUOTA) {
1325                 oqctl->qc_dqblk.dqb_curspace = curspace;
1326                 oqctl->qc_dqblk.dqb_bhardlimit = bhardlimit;
1327         }
1328         RETURN(rc);
1329 }
1330
1331 static struct obd_ops lov_obd_ops = {
1332         .o_owner                = THIS_MODULE,
1333         .o_setup                = lov_setup,
1334         .o_cleanup              = lov_cleanup,
1335         .o_connect              = lov_connect,
1336         .o_disconnect           = lov_disconnect,
1337         .o_statfs               = lov_statfs,
1338         .o_iocontrol            = lov_iocontrol,
1339         .o_get_info             = lov_get_info,
1340         .o_set_info_async       = lov_set_info_async,
1341         .o_notify               = lov_notify,
1342         .o_pool_new             = lov_pool_new,
1343         .o_pool_rem             = lov_pool_remove,
1344         .o_pool_add             = lov_pool_add,
1345         .o_pool_del             = lov_pool_del,
1346         .o_quotactl             = lov_quotactl,
1347 };
1348
/* Slab cache sized for struct lov_oinfo; created in lov_init() and
 * destroyed in lov_exit(). */
1349 struct kmem_cache *lov_oinfo_slab;
1350
1351 static int __init lov_init(void)
1352 {
1353         bool enable_proc = true;
1354         struct obd_type *type;
1355         int rc;
1356         ENTRY;
1357
1358         /* print an address of _any_ initialized kernel symbol from this
1359          * module, to allow debugging with gdb that doesn't support data
1360          * symbols from modules.*/
1361         CDEBUG(D_INFO, "Lustre LOV module (%p).\n", &lov_caches);
1362
1363         rc = lu_kmem_init(lov_caches);
1364         if (rc)
1365                 return rc;
1366
1367         lov_oinfo_slab = kmem_cache_create("lov_oinfo",
1368                                            sizeof(struct lov_oinfo), 0,
1369                                            SLAB_HWCACHE_ALIGN, NULL);
1370         if (lov_oinfo_slab == NULL) {
1371                 lu_kmem_fini(lov_caches);
1372                 return -ENOMEM;
1373         }
1374
1375         type = class_search_type(LUSTRE_LOD_NAME);
1376         if (type != NULL && type->typ_procsym != NULL)
1377                 enable_proc = false;
1378
1379         rc = class_register_type(&lov_obd_ops, NULL, enable_proc, NULL,
1380                                  LUSTRE_LOV_NAME, &lov_device_type);
1381
1382         if (rc) {
1383                 kmem_cache_destroy(lov_oinfo_slab);
1384                 lu_kmem_fini(lov_caches);
1385         }
1386
1387         RETURN(rc);
1388 }
1389
/*
 * Module exit for LOV: unregister the LOV obd type, then destroy the
 * lov_oinfo slab and release lov_caches, i.e. the reverse of the setup
 * order used in lov_init().
 */
1390 static void __exit lov_exit(void)
1391 {
1392         class_unregister_type(LUSTRE_LOV_NAME);
1393         kmem_cache_destroy(lov_oinfo_slab);
1394         lu_kmem_fini(lov_caches);
1395 }
1396
/* Kernel module metadata. */
1397 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1398 MODULE_DESCRIPTION("Lustre Logical Object Volume");
1399 MODULE_VERSION(LUSTRE_VERSION_STRING);
1400 MODULE_LICENSE("GPL");
1401
/* Hook lov_init()/lov_exit() up as the module load and unload handlers. */
1402 module_init(lov_init);
1403 module_exit(lov_exit);