Whamcloud - gitweb
LU-3531 llite: move dir cache to MDC layer
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #ifdef __KERNEL__
39 #include <linux/slab.h>
40 #include <linux/module.h>
41 #include <linux/init.h>
42 #include <linux/slab.h>
43 #include <linux/pagemap.h>
44 #include <linux/mm.h>
45 #include <asm/div64.h>
46 #include <linux/seq_file.h>
47 #include <linux/namei.h>
48 #else
49 #include <liblustre.h>
50 #endif
51
52 #include <lustre/lustre_idl.h>
53 #include <obd_support.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <obd_class.h>
57 #include <lustre_lmv.h>
58 #include <lprocfs_status.h>
59 #include <cl_object.h>
60 #include <lclient.h>
61 #include <lustre_lite.h>
62 #include <lustre_fid.h>
63 #include "lmv_internal.h"
64
65 int raw_name2idx(int hashtype, int count, const char *name, int namelen)
66 {
67         unsigned int    c = 0;
68         int             idx;
69
70         LASSERT(namelen > 0);
71
72         if (filename_is_volatile(name, namelen, &idx)) {
73                 if (idx >= 0 && idx < count)
74                         return idx;
75                 goto choose_hash;
76         }
77
78         if (count <= 1)
79                 return 0;
80
81 choose_hash:
82         switch (hashtype) {
83         case MEA_MAGIC_LAST_CHAR:
84                 c = mea_last_char_hash(count, name, namelen);
85                 break;
86         case MEA_MAGIC_ALL_CHARS:
87                 c = mea_all_chars_hash(count, name, namelen);
88                 break;
89         case MEA_MAGIC_HASH_SEGMENT:
90                 CERROR("Unsupported hash type MEA_MAGIC_HASH_SEGMENT\n");
91                 break;
92         default:
93                 CERROR("Unknown hash type 0x%x\n", hashtype);
94         }
95
96         LASSERT(c < count);
97         return c;
98 }
99
100 static void lmv_activate_target(struct lmv_obd *lmv,
101                                 struct lmv_tgt_desc *tgt,
102                                 int activate)
103 {
104         if (tgt->ltd_active == activate)
105                 return;
106
107         tgt->ltd_active = activate;
108         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
109 }
110
111 /**
112  * Error codes:
113  *
114  *  -EINVAL  : UUID can't be found in the LMV's target list
115  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
116  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
117  */
118 static int lmv_set_mdc_active(struct lmv_obd *lmv,
119                               const struct obd_uuid *uuid,
120                               int activate)
121 {
122         struct lmv_tgt_desc     *tgt = NULL;
123         struct obd_device       *obd;
124         __u32                    i;
125         int                      rc = 0;
126         ENTRY;
127
128         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
129                         lmv, uuid->uuid, activate);
130
131         spin_lock(&lmv->lmv_lock);
132         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
133                 tgt = lmv->tgts[i];
134                 if (tgt == NULL || tgt->ltd_exp == NULL)
135                         continue;
136
137                 CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n", i,
138                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
139
140                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
141                         break;
142         }
143
144         if (i == lmv->desc.ld_tgt_count)
145                 GOTO(out_lmv_lock, rc = -EINVAL);
146
147         obd = class_exp2obd(tgt->ltd_exp);
148         if (obd == NULL)
149                 GOTO(out_lmv_lock, rc = -ENOTCONN);
150
151         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
152                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
153                obd->obd_type->typ_name, i);
154         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
155
156         if (tgt->ltd_active == activate) {
157                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
158                        activate ? "" : "in");
159                 GOTO(out_lmv_lock, rc);
160         }
161
162         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
163                activate ? "" : "in");
164         lmv_activate_target(lmv, tgt, activate);
165         EXIT;
166
167  out_lmv_lock:
168         spin_unlock(&lmv->lmv_lock);
169         return rc;
170 }
171
172 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
173 {
174         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
175         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
176
177         return (tgt == NULL) ? NULL : obd_get_uuid(tgt->ltd_exp);
178 }
179
180 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
181                       enum obd_notify_event ev, void *data)
182 {
183         struct obd_connect_data *conn_data;
184         struct lmv_obd          *lmv = &obd->u.lmv;
185         struct obd_uuid         *uuid;
186         int                      rc = 0;
187         ENTRY;
188
189         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
190                 CERROR("unexpected notification of %s %s!\n",
191                        watched->obd_type->typ_name,
192                        watched->obd_name);
193                 RETURN(-EINVAL);
194         }
195
196         uuid = &watched->u.cli.cl_target_uuid;
197         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
198                 /*
199                  * Set MDC as active before notifying the observer, so the
200                  * observer can use the MDC normally.
201                  */
202                 rc = lmv_set_mdc_active(lmv, uuid,
203                                         ev == OBD_NOTIFY_ACTIVE);
204                 if (rc) {
205                         CERROR("%sactivation of %s failed: %d\n",
206                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
207                                uuid->uuid, rc);
208                         RETURN(rc);
209                 }
210         } else if (ev == OBD_NOTIFY_OCD) {
211                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
212                 /*
213                  * XXX: Make sure that ocd_connect_flags from all targets are
214                  * the same. Otherwise one of MDTs runs wrong version or
215                  * something like this.  --umka
216                  */
217                 obd->obd_self_export->exp_connect_data = *conn_data;
218         }
219 #if 0
220         else if (ev == OBD_NOTIFY_DISCON) {
221                 /*
222                  * For disconnect event, flush fld cache for failout MDS case.
223                  */
224                 fld_client_flush(&lmv->lmv_fld);
225         }
226 #endif
227         /*
228          * Pass the notification up the chain.
229          */
230         if (obd->obd_observer)
231                 rc = obd_notify(obd->obd_observer, watched, ev, data);
232
233         RETURN(rc);
234 }
235
236 /**
237  * This is fake connect function. Its purpose is to initialize lmv and say
238  * caller that everything is okay. Real connection will be performed later.
239  */
240 static int lmv_connect(const struct lu_env *env,
241                        struct obd_export **exp, struct obd_device *obd,
242                        struct obd_uuid *cluuid, struct obd_connect_data *data,
243                        void *localdata)
244 {
245 #ifdef __KERNEL__
246         struct proc_dir_entry *lmv_proc_dir;
247 #endif
248         struct lmv_obd        *lmv = &obd->u.lmv;
249         struct lustre_handle  conn = { 0 };
250         int                    rc = 0;
251         ENTRY;
252
253         /*
254          * We don't want to actually do the underlying connections more than
255          * once, so keep track.
256          */
257         lmv->refcount++;
258         if (lmv->refcount > 1) {
259                 *exp = NULL;
260                 RETURN(0);
261         }
262
263         rc = class_connect(&conn, obd, cluuid);
264         if (rc) {
265                 CERROR("class_connection() returned %d\n", rc);
266                 RETURN(rc);
267         }
268
269         *exp = class_conn2export(&conn);
270         class_export_get(*exp);
271
272         lmv->exp = *exp;
273         lmv->connected = 0;
274         lmv->cluuid = *cluuid;
275
276         if (data)
277                 lmv->conn_data = *data;
278
279 #ifdef __KERNEL__
280         if (obd->obd_proc_private != NULL) {
281                 lmv_proc_dir = obd->obd_proc_private;
282         } else {
283                 lmv_proc_dir = lprocfs_seq_register("target_obds",
284                                                     obd->obd_proc_entry,
285                                                     NULL, NULL);
286                 if (IS_ERR(lmv_proc_dir)) {
287                         CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
288                                obd->obd_type->typ_name, obd->obd_name);
289                         lmv_proc_dir = NULL;
290                 }
291                 obd->obd_proc_private = lmv_proc_dir;
292         }
293 #endif
294
295         /*
296          * All real clients should perform actual connection right away, because
297          * it is possible, that LMV will not have opportunity to connect targets
298          * and MDC stuff will be called directly, for instance while reading
299          * ../mdc/../kbytesfree procfs file, etc.
300          */
301         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_REAL))
302                 rc = lmv_check_connect(obd);
303
304 #ifdef __KERNEL__
305         if (rc && lmv_proc_dir) {
306                 lprocfs_remove(&lmv_proc_dir);
307                 obd->obd_proc_private = NULL;
308         }
309 #endif
310         RETURN(rc);
311 }
312
313 static void lmv_set_timeouts(struct obd_device *obd)
314 {
315         struct lmv_obd          *lmv;
316         __u32                    i;
317
318         lmv = &obd->u.lmv;
319         if (lmv->server_timeout == 0)
320                 return;
321
322         if (lmv->connected == 0)
323                 return;
324
325         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
326                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
327
328                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
329                         continue;
330
331                 obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
332                                    KEY_INTERMDS, 0, NULL, NULL);
333         }
334 }
335
336 static int lmv_init_ea_size(struct obd_export *exp, int easize,
337                             int def_easize, int cookiesize)
338 {
339         struct obd_device       *obd = exp->exp_obd;
340         struct lmv_obd          *lmv = &obd->u.lmv;
341         __u32                    i;
342         int                      rc = 0;
343         int                      change = 0;
344         ENTRY;
345
346         if (lmv->max_easize < easize) {
347                 lmv->max_easize = easize;
348                 change = 1;
349         }
350         if (lmv->max_def_easize < def_easize) {
351                 lmv->max_def_easize = def_easize;
352                 change = 1;
353         }
354         if (lmv->max_cookiesize < cookiesize) {
355                 lmv->max_cookiesize = cookiesize;
356                 change = 1;
357         }
358         if (change == 0)
359                 RETURN(0);
360
361         if (lmv->connected == 0)
362                 RETURN(0);
363
364         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
365                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
366
367                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
368                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
369                         continue;
370                 }
371
372                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize,
373                                      cookiesize);
374                 if (rc) {
375                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
376                                " rc = %d.\n", obd->obd_name, i, rc);
377                         break;
378                 }
379         }
380         RETURN(rc);
381 }
382
383 #define MAX_STRING_SIZE 128
384
385 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
386 {
387 #ifdef __KERNEL__
388         struct proc_dir_entry   *lmv_proc_dir;
389 #endif
390         struct lmv_obd          *lmv = &obd->u.lmv;
391         struct obd_uuid         *cluuid = &lmv->cluuid;
392         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
393         struct obd_device       *mdc_obd;
394         struct obd_export       *mdc_exp;
395         struct lu_fld_target     target;
396         int                      rc;
397         ENTRY;
398
399         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
400                                         &obd->obd_uuid);
401         if (!mdc_obd) {
402                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
403                 RETURN(-EINVAL);
404         }
405
406         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
407                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
408                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
409                 cluuid->uuid);
410
411         if (!mdc_obd->obd_set_up) {
412                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
413                 RETURN(-EINVAL);
414         }
415
416         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
417                          &lmv->conn_data, NULL);
418         if (rc) {
419                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
420                 RETURN(rc);
421         }
422
423         /*
424          * Init fid sequence client for this mdc and add new fld target.
425          */
426         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
427         if (rc)
428                 RETURN(rc);
429
430         target.ft_srv = NULL;
431         target.ft_exp = mdc_exp;
432         target.ft_idx = tgt->ltd_idx;
433
434         fld_client_add_target(&lmv->lmv_fld, &target);
435
436         rc = obd_register_observer(mdc_obd, obd);
437         if (rc) {
438                 obd_disconnect(mdc_exp);
439                 CERROR("target %s register_observer error %d\n",
440                        tgt->ltd_uuid.uuid, rc);
441                 RETURN(rc);
442         }
443
444         if (obd->obd_observer) {
445                 /*
446                  * Tell the observer about the new target.
447                  */
448                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
449                                 OBD_NOTIFY_ACTIVE,
450                                 (void *)(tgt - lmv->tgts[0]));
451                 if (rc) {
452                         obd_disconnect(mdc_exp);
453                         RETURN(rc);
454                 }
455         }
456
457         tgt->ltd_active = 1;
458         tgt->ltd_exp = mdc_exp;
459         lmv->desc.ld_active_tgt_count++;
460
461         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
462                         lmv->max_def_easize, lmv->max_cookiesize);
463
464         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
465                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
466                 cfs_atomic_read(&obd->obd_refcount));
467
468 #ifdef __KERNEL__
469         lmv_proc_dir = obd->obd_proc_private;
470         if (lmv_proc_dir) {
471                 struct proc_dir_entry *mdc_symlink;
472
473                 LASSERT(mdc_obd->obd_type != NULL);
474                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
475                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
476                                                   lmv_proc_dir,
477                                                   "../../../%s/%s",
478                                                   mdc_obd->obd_type->typ_name,
479                                                   mdc_obd->obd_name);
480                 if (mdc_symlink == NULL) {
481                         CERROR("Could not register LMV target "
482                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
483                                obd->obd_type->typ_name, obd->obd_name,
484                                mdc_obd->obd_name);
485                         lprocfs_remove(&lmv_proc_dir);
486                         obd->obd_proc_private = NULL;
487                 }
488         }
489 #endif
490         RETURN(0);
491 }
492
493 static void lmv_del_target(struct lmv_obd *lmv, int index)
494 {
495         if (lmv->tgts[index] == NULL)
496                 return;
497
498         OBD_FREE_PTR(lmv->tgts[index]);
499         lmv->tgts[index] = NULL;
500         return;
501 }
502
503 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
504                            __u32 index, int gen)
505 {
506         struct lmv_obd      *lmv = &obd->u.lmv;
507         struct lmv_tgt_desc *tgt;
508         int                  rc = 0;
509         ENTRY;
510
511         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
512
513         lmv_init_lock(lmv);
514
515         if (lmv->desc.ld_tgt_count == 0) {
516                 struct obd_device *mdc_obd;
517
518                 mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
519                                                 &obd->obd_uuid);
520                 if (!mdc_obd) {
521                         lmv_init_unlock(lmv);
522                         CERROR("%s: Target %s not attached: rc = %d\n",
523                                obd->obd_name, uuidp->uuid, -EINVAL);
524                         RETURN(-EINVAL);
525                 }
526         }
527
528         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
529                 tgt = lmv->tgts[index];
530                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
531                        " rc = %d\n", obd->obd_name,
532                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
533                 lmv_init_unlock(lmv);
534                 RETURN(-EEXIST);
535         }
536
537         if (index >= lmv->tgts_size) {
538                 /* We need to reallocate the lmv target array. */
539                 struct lmv_tgt_desc **newtgts, **old = NULL;
540                 __u32 newsize = 1;
541                 __u32 oldsize = 0;
542
543                 while (newsize < index + 1)
544                         newsize = newsize << 1;
545                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
546                 if (newtgts == NULL) {
547                         lmv_init_unlock(lmv);
548                         RETURN(-ENOMEM);
549                 }
550
551                 if (lmv->tgts_size) {
552                         memcpy(newtgts, lmv->tgts,
553                                sizeof(*newtgts) * lmv->tgts_size);
554                         old = lmv->tgts;
555                         oldsize = lmv->tgts_size;
556                 }
557
558                 lmv->tgts = newtgts;
559                 lmv->tgts_size = newsize;
560                 smp_rmb();
561                 if (old)
562                         OBD_FREE(old, sizeof(*old) * oldsize);
563
564                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
565                        lmv->tgts_size);
566         }
567
568         OBD_ALLOC_PTR(tgt);
569         if (!tgt) {
570                 lmv_init_unlock(lmv);
571                 RETURN(-ENOMEM);
572         }
573
574         mutex_init(&tgt->ltd_fid_mutex);
575         tgt->ltd_idx = index;
576         tgt->ltd_uuid = *uuidp;
577         tgt->ltd_active = 0;
578         lmv->tgts[index] = tgt;
579         if (index >= lmv->desc.ld_tgt_count)
580                 lmv->desc.ld_tgt_count = index + 1;
581
582         if (lmv->connected) {
583                 rc = lmv_connect_mdc(obd, tgt);
584                 if (rc) {
585                         spin_lock(&lmv->lmv_lock);
586                         lmv->desc.ld_tgt_count--;
587                         memset(tgt, 0, sizeof(*tgt));
588                         spin_unlock(&lmv->lmv_lock);
589                 } else {
590                         int easize = sizeof(struct lmv_stripe_md) +
591                                      lmv->desc.ld_tgt_count *
592                                      sizeof(struct lu_fid);
593                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
594                 }
595         }
596
597         lmv_init_unlock(lmv);
598         RETURN(rc);
599 }
600
601 int lmv_check_connect(struct obd_device *obd)
602 {
603         struct lmv_obd          *lmv = &obd->u.lmv;
604         struct lmv_tgt_desc     *tgt;
605         __u32                    i;
606         int                      rc;
607         int                      easize;
608         ENTRY;
609
610         if (lmv->connected)
611                 RETURN(0);
612
613         lmv_init_lock(lmv);
614         if (lmv->connected) {
615                 lmv_init_unlock(lmv);
616                 RETURN(0);
617         }
618
619         if (lmv->desc.ld_tgt_count == 0) {
620                 lmv_init_unlock(lmv);
621                 CERROR("%s: no targets configured.\n", obd->obd_name);
622                 RETURN(-EINVAL);
623         }
624
625         LASSERT(lmv->tgts != NULL);
626
627         if (lmv->tgts[0] == NULL) {
628                 lmv_init_unlock(lmv);
629                 CERROR("%s: no target configured for index 0.\n",
630                        obd->obd_name);
631                 RETURN(-EINVAL);
632         }
633
634         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
635                lmv->cluuid.uuid, obd->obd_name);
636
637         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
638                 tgt = lmv->tgts[i];
639                 if (tgt == NULL)
640                         continue;
641                 rc = lmv_connect_mdc(obd, tgt);
642                 if (rc)
643                         GOTO(out_disc, rc);
644         }
645
646         lmv_set_timeouts(obd);
647         class_export_put(lmv->exp);
648         lmv->connected = 1;
649         easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC);
650         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
651         lmv_init_unlock(lmv);
652         RETURN(0);
653
654  out_disc:
655         while (i-- > 0) {
656                 int rc2;
657                 tgt = lmv->tgts[i];
658                 if (tgt == NULL)
659                         continue;
660                 tgt->ltd_active = 0;
661                 if (tgt->ltd_exp) {
662                         --lmv->desc.ld_active_tgt_count;
663                         rc2 = obd_disconnect(tgt->ltd_exp);
664                         if (rc2) {
665                                 CERROR("LMV target %s disconnect on "
666                                        "MDC idx %d: error %d\n",
667                                        tgt->ltd_uuid.uuid, i, rc2);
668                         }
669                 }
670         }
671         class_disconnect(lmv->exp);
672         lmv_init_unlock(lmv);
673         RETURN(rc);
674 }
675
676 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
677 {
678 #ifdef __KERNEL__
679         struct proc_dir_entry  *lmv_proc_dir;
680 #endif
681         struct lmv_obd         *lmv = &obd->u.lmv;
682         struct obd_device      *mdc_obd;
683         int                     rc;
684         ENTRY;
685
686         LASSERT(tgt != NULL);
687         LASSERT(obd != NULL);
688
689         mdc_obd = class_exp2obd(tgt->ltd_exp);
690
691         if (mdc_obd) {
692                 mdc_obd->obd_force = obd->obd_force;
693                 mdc_obd->obd_fail = obd->obd_fail;
694                 mdc_obd->obd_no_recov = obd->obd_no_recov;
695         }
696
697 #ifdef __KERNEL__
698         lmv_proc_dir = obd->obd_proc_private;
699         if (lmv_proc_dir)
700                 lprocfs_remove_proc_entry(mdc_obd->obd_name, lmv_proc_dir);
701 #endif
702         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
703         if (rc)
704                 CERROR("Can't finanize fids factory\n");
705
706         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
707                tgt->ltd_exp->exp_obd->obd_name,
708                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
709
710         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
711         rc = obd_disconnect(tgt->ltd_exp);
712         if (rc) {
713                 if (tgt->ltd_active) {
714                         CERROR("Target %s disconnect error %d\n",
715                                tgt->ltd_uuid.uuid, rc);
716                 }
717         }
718
719         lmv_activate_target(lmv, tgt, 0);
720         tgt->ltd_exp = NULL;
721         RETURN(0);
722 }
723
724 static int lmv_disconnect(struct obd_export *exp)
725 {
726         struct obd_device       *obd = class_exp2obd(exp);
727         struct lmv_obd          *lmv = &obd->u.lmv;
728         int                      rc;
729         __u32                    i;
730         ENTRY;
731
732         if (!lmv->tgts)
733                 goto out_local;
734
735         /*
736          * Only disconnect the underlying layers on the final disconnect.
737          */
738         lmv->refcount--;
739         if (lmv->refcount != 0)
740                 goto out_local;
741
742         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
743                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
744                         continue;
745
746                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
747         }
748
749 #ifdef __KERNEL__
750         if (obd->obd_proc_private)
751                 lprocfs_remove((struct proc_dir_entry **)&obd->obd_proc_private);
752         else
753                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
754                        obd->obd_type->typ_name, obd->obd_name);
755 #endif
756
757 out_local:
758         /*
759          * This is the case when no real connection is established by
760          * lmv_check_connect().
761          */
762         if (!lmv->connected)
763                 class_export_put(exp);
764         rc = class_disconnect(exp);
765         if (lmv->refcount == 0)
766                 lmv->connected = 0;
767         RETURN(rc);
768 }
769
770 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
771 {
772         struct obd_device       *obddev = class_exp2obd(exp);
773         struct lmv_obd          *lmv = &obddev->u.lmv;
774         struct getinfo_fid2path *gf;
775         struct lmv_tgt_desc     *tgt;
776         struct getinfo_fid2path *remote_gf = NULL;
777         int                     remote_gf_size = 0;
778         int                     rc;
779
780         gf = (struct getinfo_fid2path *)karg;
781         tgt = lmv_find_target(lmv, &gf->gf_fid);
782         if (IS_ERR(tgt))
783                 RETURN(PTR_ERR(tgt));
784
785 repeat_fid2path:
786         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
787         if (rc != 0 && rc != -EREMOTE)
788                 GOTO(out_fid2path, rc);
789
790         /* If remote_gf != NULL, it means just building the
791          * path on the remote MDT, copy this path segement to gf */
792         if (remote_gf != NULL) {
793                 struct getinfo_fid2path *ori_gf;
794                 char *ptr;
795
796                 ori_gf = (struct getinfo_fid2path *)karg;
797                 if (strlen(ori_gf->gf_path) +
798                     strlen(gf->gf_path) > ori_gf->gf_pathlen)
799                         GOTO(out_fid2path, rc = -EOVERFLOW);
800
801                 ptr = ori_gf->gf_path;
802
803                 memmove(ptr + strlen(gf->gf_path) + 1, ptr,
804                         strlen(ori_gf->gf_path));
805
806                 strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
807                 ptr += strlen(gf->gf_path);
808                 *ptr = '/';
809         }
810
811         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
812                tgt->ltd_exp->exp_obd->obd_name,
813                gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
814                gf->gf_linkno);
815
816         if (rc == 0)
817                 GOTO(out_fid2path, rc);
818
819         /* sigh, has to go to another MDT to do path building further */
820         if (remote_gf == NULL) {
821                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
822                 OBD_ALLOC(remote_gf, remote_gf_size);
823                 if (remote_gf == NULL)
824                         GOTO(out_fid2path, rc = -ENOMEM);
825                 remote_gf->gf_pathlen = PATH_MAX;
826         }
827
828         if (!fid_is_sane(&gf->gf_fid)) {
829                 CERROR("%s: invalid FID "DFID": rc = %d\n",
830                        tgt->ltd_exp->exp_obd->obd_name,
831                        PFID(&gf->gf_fid), -EINVAL);
832                 GOTO(out_fid2path, rc = -EINVAL);
833         }
834
835         tgt = lmv_find_target(lmv, &gf->gf_fid);
836         if (IS_ERR(tgt))
837                 GOTO(out_fid2path, rc = -EINVAL);
838
839         remote_gf->gf_fid = gf->gf_fid;
840         remote_gf->gf_recno = -1;
841         remote_gf->gf_linkno = -1;
842         memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
843         gf = remote_gf;
844         goto repeat_fid2path;
845
846 out_fid2path:
847         if (remote_gf != NULL)
848                 OBD_FREE(remote_gf, remote_gf_size);
849         RETURN(rc);
850 }
851
852 static int lmv_hsm_req_count(struct lmv_obd *lmv,
853                              const struct hsm_user_request *hur,
854                              const struct lmv_tgt_desc *tgt_mds)
855 {
856         __u32                    i;
857         int                      nr = 0;
858         struct lmv_tgt_desc     *curr_tgt;
859
860         /* count how many requests must be sent to the given target */
861         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
862                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
863                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
864                         nr++;
865         }
866         return nr;
867 }
868
869 static void lmv_hsm_req_build(struct lmv_obd *lmv,
870                               struct hsm_user_request *hur_in,
871                               const struct lmv_tgt_desc *tgt_mds,
872                               struct hsm_user_request *hur_out)
873 {
874         __u32                    i, nr_out;
875         struct lmv_tgt_desc     *curr_tgt;
876
877         /* build the hsm_user_request for the given target */
878         hur_out->hur_request = hur_in->hur_request;
879         nr_out = 0;
880         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
881                 curr_tgt = lmv_find_target(lmv,
882                                            &hur_in->hur_user_item[i].hui_fid);
883                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
884                         hur_out->hur_user_item[nr_out] =
885                                                 hur_in->hur_user_item[i];
886                         nr_out++;
887                 }
888         }
889         hur_out->hur_request.hr_itemcount = nr_out;
890         memcpy(hur_data(hur_out), hur_data(hur_in),
891                hur_in->hur_request.hr_data_len);
892 }
893
894 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
895                                  struct lustre_kernelcomm *lk, void *uarg)
896 {
897         __u32                    i;
898         int                      rc;
899         struct kkuc_ct_data     *kcd = NULL;
900         ENTRY;
901
902         /* unregister request (call from llapi_hsm_copytool_fini) */
903         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
904                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
905
906                 if (tgt == NULL || tgt->ltd_exp == NULL)
907                         continue;
908                 /* best effort: try to clean as much as possible
909                  * (continue on error) */
910                 obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
911         }
912
913         /* Whatever the result, remove copytool from kuc groups.
914          * Unreached coordinators will get EPIPE on next requests
915          * and will unregister automatically.
916          */
917         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group, (void **)&kcd);
918         if (kcd != NULL)
919                 OBD_FREE_PTR(kcd);
920
921         RETURN(rc);
922 }
923
924 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
925                                struct lustre_kernelcomm *lk, void *uarg)
926 {
927         struct file             *filp;
928         __u32                    i, j;
929         int                      err, rc;
930         bool                     any_set = false;
931         struct kkuc_ct_data     *kcd;
932         ENTRY;
933
934         /* All or nothing: try to register to all MDS.
935          * In case of failure, unregister from previous MDS,
936          * except if it because of inactive target. */
937         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
938                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
939
940                 if (tgt == NULL || tgt->ltd_exp == NULL)
941                         continue;
942                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
943                 if (err) {
944                         if (tgt->ltd_active) {
945                                 /* permanent error */
946                                 CERROR("%s: iocontrol MDC %s on MDT"
947                                        " idx %d cmd %x: err = %d\n",
948                                        class_exp2obd(lmv->exp)->obd_name,
949                                        tgt->ltd_uuid.uuid, i, cmd, err);
950                                 rc = err;
951                                 lk->lk_flags |= LK_FLG_STOP;
952                                 /* unregister from previous MDS */
953                                 for (j = 0; j < i; j++) {
954                                         tgt = lmv->tgts[j];
955                                         if (tgt == NULL || tgt->ltd_exp == NULL)
956                                                 continue;
957                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
958                                                       lk, uarg);
959                                 }
960                                 RETURN(rc);
961                         }
962                         /* else: transient error.
963                          * kuc will register to the missing MDT
964                          * when it is back */
965                 } else {
966                         any_set = true;
967                 }
968         }
969
970         if (!any_set)
971                 /* no registration done: return error */
972                 RETURN(-ENOTCONN);
973
974         /* at least one registration done, with no failure */
975         filp = fget(lk->lk_wfd);
976         if (filp == NULL)
977                 RETURN(-EBADF);
978
979         OBD_ALLOC_PTR(kcd);
980         if (kcd == NULL) {
981                 fput(filp);
982                 RETURN(-ENOMEM);
983         }
984         kcd->kcd_magic = KKUC_CT_DATA_MAGIC;
985         kcd->kcd_uuid = lmv->cluuid;
986         kcd->kcd_archive = lk->lk_data;
987
988         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, kcd);
989         if (rc != 0) {
990                 if (filp != NULL)
991                         fput(filp);
992                 OBD_FREE_PTR(kcd);
993         }
994
995         RETURN(rc);
996 }
997
998
999
1000
1001 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
1002                          int len, void *karg, void *uarg)
1003 {
1004         struct obd_device       *obddev = class_exp2obd(exp);
1005         struct lmv_obd          *lmv = &obddev->u.lmv;
1006         struct lmv_tgt_desc     *tgt = NULL;
1007         __u32                    i = 0;
1008         int                      rc = 0;
1009         int                      set = 0;
1010         __u32                    count = lmv->desc.ld_tgt_count;
1011         ENTRY;
1012
1013         if (count == 0)
1014                 RETURN(-ENOTTY);
1015
1016         switch (cmd) {
1017         case IOC_OBD_STATFS: {
1018                 struct obd_ioctl_data *data = karg;
1019                 struct obd_device *mdc_obd;
1020                 struct obd_statfs stat_buf = {0};
1021                 __u32 index;
1022
1023                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
1024                 if ((index >= count))
1025                         RETURN(-ENODEV);
1026
1027                 tgt = lmv->tgts[index];
1028                 if (tgt == NULL || !tgt->ltd_active)
1029                         RETURN(-ENODATA);
1030
1031                 mdc_obd = class_exp2obd(tgt->ltd_exp);
1032                 if (!mdc_obd)
1033                         RETURN(-EINVAL);
1034
1035                 /* copy UUID */
1036                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
1037                                  min((int) data->ioc_plen2,
1038                                      (int) sizeof(struct obd_uuid))))
1039                         RETURN(-EFAULT);
1040
1041                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
1042                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1043                                 0);
1044                 if (rc)
1045                         RETURN(rc);
1046                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1047                                  min((int) data->ioc_plen1,
1048                                      (int) sizeof(stat_buf))))
1049                         RETURN(-EFAULT);
1050                 break;
1051         }
1052         case OBD_IOC_QUOTACTL: {
1053                 struct if_quotactl *qctl = karg;
1054                 struct obd_quotactl *oqctl;
1055
1056                 if (qctl->qc_valid == QC_MDTIDX) {
1057                         if (count <= qctl->qc_idx)
1058                                 RETURN(-EINVAL);
1059
1060                         tgt = lmv->tgts[qctl->qc_idx];
1061                         if (tgt == NULL || tgt->ltd_exp == NULL)
1062                                 RETURN(-EINVAL);
1063                 } else if (qctl->qc_valid == QC_UUID) {
1064                         for (i = 0; i < count; i++) {
1065                                 tgt = lmv->tgts[i];
1066                                 if (tgt == NULL)
1067                                         continue;
1068                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
1069                                                      &qctl->obd_uuid))
1070                                         continue;
1071
1072                                 if (tgt->ltd_exp == NULL)
1073                                         RETURN(-EINVAL);
1074
1075                                 break;
1076                         }
1077                 } else {
1078                         RETURN(-EINVAL);
1079                 }
1080
1081                 if (i >= count)
1082                         RETURN(-EAGAIN);
1083
1084                 LASSERT(tgt != NULL && tgt->ltd_exp != NULL);
1085                 OBD_ALLOC_PTR(oqctl);
1086                 if (!oqctl)
1087                         RETURN(-ENOMEM);
1088
1089                 QCTL_COPY(oqctl, qctl);
1090                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
1091                 if (rc == 0) {
1092                         QCTL_COPY(qctl, oqctl);
1093                         qctl->qc_valid = QC_MDTIDX;
1094                         qctl->obd_uuid = tgt->ltd_uuid;
1095                 }
1096                 OBD_FREE_PTR(oqctl);
1097                 break;
1098         }
1099         case OBD_IOC_CHANGELOG_SEND:
1100         case OBD_IOC_CHANGELOG_CLEAR: {
1101                 struct ioc_changelog *icc = karg;
1102
1103                 if (icc->icc_mdtindex >= count)
1104                         RETURN(-ENODEV);
1105
1106                 tgt = lmv->tgts[icc->icc_mdtindex];
1107                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
1108                         RETURN(-ENODEV);
1109                 rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
1110                 break;
1111         }
1112         case LL_IOC_GET_CONNECT_FLAGS: {
1113                 tgt = lmv->tgts[0];
1114                 if (tgt == NULL || tgt->ltd_exp == NULL)
1115                         RETURN(-ENODATA);
1116                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1117                 break;
1118         }
1119         case OBD_IOC_FID2PATH: {
1120                 rc = lmv_fid2path(exp, len, karg, uarg);
1121                 break;
1122         }
1123         case LL_IOC_HSM_STATE_GET:
1124         case LL_IOC_HSM_STATE_SET:
1125         case LL_IOC_HSM_ACTION: {
1126                 struct md_op_data       *op_data = karg;
1127
1128                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1129                 if (IS_ERR(tgt))
1130                         RETURN(PTR_ERR(tgt));
1131
1132                 if (tgt->ltd_exp == NULL)
1133                         RETURN(-EINVAL);
1134
1135                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1136                 break;
1137         }
1138         case LL_IOC_HSM_PROGRESS: {
1139                 const struct hsm_progress_kernel *hpk = karg;
1140
1141                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1142                 if (IS_ERR(tgt))
1143                         RETURN(PTR_ERR(tgt));
1144                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1145                 break;
1146         }
1147         case LL_IOC_HSM_REQUEST: {
1148                 struct hsm_user_request *hur = karg;
1149                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1150
1151                 if (reqcount == 0)
1152                         RETURN(0);
1153
1154                 /* if the request is about a single fid
1155                  * or if there is a single MDS, no need to split
1156                  * the request. */
1157                 if (reqcount == 1 || count == 1) {
1158                         tgt = lmv_find_target(lmv,
1159                                               &hur->hur_user_item[0].hui_fid);
1160                         if (IS_ERR(tgt))
1161                                 RETURN(PTR_ERR(tgt));
1162                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1163                 } else {
1164                         /* split fid list to their respective MDS */
1165                         for (i = 0; i < count; i++) {
1166                                 unsigned int            nr, reqlen;
1167                                 int                     rc1;
1168                                 struct hsm_user_request *req;
1169
1170                                 tgt = lmv->tgts[i];
1171                                 if (tgt == NULL || tgt->ltd_exp == NULL)
1172                                         continue;
1173
1174                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
1175                                 if (nr == 0) /* nothing for this MDS */
1176                                         continue;
1177
1178                                 /* build a request with fids for this MDS */
1179                                 reqlen = offsetof(typeof(*hur),
1180                                                   hur_user_item[nr])
1181                                                 + hur->hur_request.hr_data_len;
1182                                 OBD_ALLOC_LARGE(req, reqlen);
1183                                 if (req == NULL)
1184                                         RETURN(-ENOMEM);
1185
1186                                 lmv_hsm_req_build(lmv, hur, tgt, req);
1187
1188                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1189                                                     req, uarg);
1190                                 if (rc1 != 0 && rc == 0)
1191                                         rc = rc1;
1192                                 OBD_FREE_LARGE(req, reqlen);
1193                         }
1194                 }
1195                 break;
1196         }
1197         case LL_IOC_LOV_SWAP_LAYOUTS: {
1198                 struct md_op_data       *op_data = karg;
1199                 struct lmv_tgt_desc     *tgt1, *tgt2;
1200
1201                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1202                 if (IS_ERR(tgt1))
1203                         RETURN(PTR_ERR(tgt1));
1204
1205                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1206                 if (IS_ERR(tgt2))
1207                         RETURN(PTR_ERR(tgt2));
1208
1209                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1210                         RETURN(-EINVAL);
1211
1212                 /* only files on same MDT can have their layouts swapped */
1213                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1214                         RETURN(-EPERM);
1215
1216                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1217                 break;
1218         }
1219         case LL_IOC_HSM_CT_START: {
1220                 struct lustre_kernelcomm *lk = karg;
1221                 if (lk->lk_flags & LK_FLG_STOP)
1222                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1223                 else
1224                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1225                 break;
1226         }
1227         default:
1228                 for (i = 0; i < count; i++) {
1229                         struct obd_device *mdc_obd;
1230                         int err;
1231
1232                         tgt = lmv->tgts[i];
1233                         if (tgt == NULL || tgt->ltd_exp == NULL)
1234                                 continue;
1235                         /* ll_umount_begin() sets force flag but for lmv, not
1236                          * mdc. Let's pass it through */
1237                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1238                         mdc_obd->obd_force = obddev->obd_force;
1239                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1240                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1241                                 RETURN(err);
1242                         } else if (err) {
1243                                 if (tgt->ltd_active) {
1244                                         CERROR("error: iocontrol MDC %s on MDT"
1245                                                " idx %d cmd %x: err = %d\n",
1246                                                tgt->ltd_uuid.uuid, i, cmd, err);
1247                                         if (!rc)
1248                                                 rc = err;
1249                                 }
1250                         } else
1251                                 set = 1;
1252                 }
1253                 if (!set && !rc)
1254                         rc = -EIO;
1255         }
1256         RETURN(rc);
1257 }
1258
#if 0
/*
 * Disabled name/NID based placement helpers.  They are compiled out and
 * nothing in the visible part of this file calls them.
 */

/* Hash a name by summing its characters, reduced modulo \a count. */
static int lmv_all_chars_policy(int count, const char *name,
                                int len)
{
        unsigned int c = 0;

        while (len > 0)
                c += name[--len];
        c = c % count;
        return c;
}

/* Pick a target index from the local NID of the first target's connection. */
static int lmv_nid_policy(struct lmv_obd *lmv)
{
        struct obd_import *imp;
        __u32              id;

        /*
         * XXX: To get nid we assume that underlying obd device is mdc.
         */
        /* lmv->tgts is an array of pointers to target descriptors (as every
         * enabled user in this file treats it), hence "->" and not "." */
        imp = class_exp2cliimp(lmv->tgts[0]->ltd_exp);
        id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
        return id % lmv->desc.ld_tgt_count;
}

/*
 * Select a target index according to \a placement.
 * Returns the index, or -EINVAL for an unsupported policy.
 */
static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
                          placement_policy_t placement)
{
        switch (placement) {
        case PLACEMENT_CHAR_POLICY:
                return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
                                            op_data->op_name,
                                            op_data->op_namelen);
        case PLACEMENT_NID_POLICY:
                return lmv_nid_policy(lmv);

        default:
                break;
        }

        CERROR("Unsupported placement policy %x\n", placement);
        return -EINVAL;
}
#endif
1303
1304 /**
1305  * This is _inode_ placement policy function (not name).
1306  */
1307 static int lmv_placement_policy(struct obd_device *obd,
1308                                 struct md_op_data *op_data,
1309                                 mdsno_t *mds)
1310 {
1311         struct lmv_obd          *lmv = &obd->u.lmv;
1312         ENTRY;
1313
1314         LASSERT(mds != NULL);
1315
1316         if (lmv->desc.ld_tgt_count == 1) {
1317                 *mds = 0;
1318                 RETURN(0);
1319         }
1320
1321         /**
1322          * If stripe_offset is provided during setdirstripe
1323          * (setdirstripe -i xx), xx MDS will be choosen.
1324          */
1325         if (op_data->op_cli_flags & CLI_SET_MEA) {
1326                 struct lmv_user_md *lum;
1327
1328                 lum = (struct lmv_user_md *)op_data->op_data;
1329                 if (lum->lum_type == LMV_STRIPE_TYPE &&
1330                     lum->lum_stripe_offset != -1) {
1331                         if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1332                                 CERROR("%s: Stripe_offset %d > MDT count %d:"
1333                                        " rc = %d\n", obd->obd_name,
1334                                        lum->lum_stripe_offset,
1335                                        lmv->desc.ld_tgt_count, -ERANGE);
1336                                 RETURN(-ERANGE);
1337                         }
1338                         *mds = lum->lum_stripe_offset;
1339                         RETURN(0);
1340                 }
1341         }
1342
1343         /* Allocate new fid on target according to operation type and parent
1344          * home mds. */
1345         *mds = op_data->op_mds;
1346         RETURN(0);
1347 }
1348
1349 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
1350                     mdsno_t mds)
1351 {
1352         struct lmv_tgt_desc     *tgt;
1353         int                      rc;
1354         ENTRY;
1355
1356         tgt = lmv_get_target(lmv, mds);
1357         if (IS_ERR(tgt))
1358                 RETURN(PTR_ERR(tgt));
1359
1360         /*
1361          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1362          * on server that seq in new allocated fid is not yet known.
1363          */
1364         mutex_lock(&tgt->ltd_fid_mutex);
1365
1366         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1367                 GOTO(out, rc = -ENODEV);
1368
1369         /*
1370          * Asking underlaying tgt layer to allocate new fid.
1371          */
1372         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
1373         if (rc > 0) {
1374                 LASSERT(fid_is_sane(fid));
1375                 rc = 0;
1376         }
1377
1378         EXIT;
1379 out:
1380         mutex_unlock(&tgt->ltd_fid_mutex);
1381         return rc;
1382 }
1383
1384 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1385                   struct md_op_data *op_data)
1386 {
1387         struct obd_device     *obd = class_exp2obd(exp);
1388         struct lmv_obd        *lmv = &obd->u.lmv;
1389         mdsno_t                mds = 0;
1390         int                    rc;
1391         ENTRY;
1392
1393         LASSERT(op_data != NULL);
1394         LASSERT(fid != NULL);
1395
1396         rc = lmv_placement_policy(obd, op_data, &mds);
1397         if (rc) {
1398                 CERROR("Can't get target for allocating fid, "
1399                        "rc %d\n", rc);
1400                 RETURN(rc);
1401         }
1402
1403         rc = __lmv_fid_alloc(lmv, fid, mds);
1404         if (rc) {
1405                 CERROR("Can't alloc new fid, rc %d\n", rc);
1406                 RETURN(rc);
1407         }
1408
1409         RETURN(rc);
1410 }
1411
1412 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1413 {
1414         struct lmv_obd  *lmv = &obd->u.lmv;
1415         struct lmv_desc *desc;
1416         int             rc;
1417         ENTRY;
1418
1419         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1420                 CERROR("LMV setup requires a descriptor\n");
1421                 RETURN(-EINVAL);
1422         }
1423
1424         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1425         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1426                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1427                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1428                 RETURN(-EINVAL);
1429         }
1430
1431         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
1432         if (lmv->tgts == NULL)
1433                 RETURN(-ENOMEM);
1434         lmv->tgts_size = 32;
1435
1436         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1437         lmv->desc.ld_tgt_count = 0;
1438         lmv->desc.ld_active_tgt_count = 0;
1439         lmv->max_cookiesize = 0;
1440         lmv->max_def_easize = 0;
1441         lmv->max_easize = 0;
1442         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1443
1444         spin_lock_init(&lmv->lmv_lock);
1445         mutex_init(&lmv->init_mutex);
1446
1447 #ifdef LPROCFS
1448         obd->obd_vars = lprocfs_lmv_obd_vars;
1449         lprocfs_seq_obd_setup(obd);
1450         lprocfs_alloc_md_stats(obd, 0);
1451         rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1452                                 0444, &lmv_proc_target_fops, obd);
1453         if (rc)
1454                 CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1455                       obd->obd_name, rc);
1456 #endif
1457         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1458                              LUSTRE_CLI_FLD_HASH_DHT);
1459         if (rc) {
1460                 CERROR("Can't init FLD, err %d\n", rc);
1461                 GOTO(out, rc);
1462         }
1463
1464         RETURN(0);
1465
1466 out:
1467         return rc;
1468 }
1469
1470 static int lmv_cleanup(struct obd_device *obd)
1471 {
1472         struct lmv_obd   *lmv = &obd->u.lmv;
1473         ENTRY;
1474
1475         fld_client_fini(&lmv->lmv_fld);
1476         if (lmv->tgts != NULL) {
1477                 int i;
1478                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1479                         if (lmv->tgts[i] == NULL)
1480                                 continue;
1481                         lmv_del_target(lmv, i);
1482                 }
1483                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1484                 lmv->tgts_size = 0;
1485         }
1486         RETURN(0);
1487 }
1488
1489 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1490 {
1491         struct lustre_cfg       *lcfg = buf;
1492         struct obd_uuid         obd_uuid;
1493         int                     gen;
1494         __u32                   index;
1495         int                     rc;
1496         ENTRY;
1497
1498         switch (lcfg->lcfg_command) {
1499         case LCFG_ADD_MDC:
1500                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1501                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1502                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1503                         GOTO(out, rc = -EINVAL);
1504
1505                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1506
1507                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1508                         GOTO(out, rc = -EINVAL);
1509                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1510                         GOTO(out, rc = -EINVAL);
1511                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1512                 GOTO(out, rc);
1513         default:
1514                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1515                 GOTO(out, rc = -EINVAL);
1516         }
1517 out:
1518         RETURN(rc);
1519 }
1520
1521 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1522                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1523 {
1524         struct obd_device       *obd = class_exp2obd(exp);
1525         struct lmv_obd          *lmv = &obd->u.lmv;
1526         struct obd_statfs       *temp;
1527         int                      rc = 0;
1528         __u32                    i;
1529         ENTRY;
1530
1531         rc = lmv_check_connect(obd);
1532         if (rc)
1533                 RETURN(rc);
1534
1535         OBD_ALLOC(temp, sizeof(*temp));
1536         if (temp == NULL)
1537                 RETURN(-ENOMEM);
1538
1539         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1540                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1541                         continue;
1542
1543                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1544                                 max_age, flags);
1545                 if (rc) {
1546                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1547                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1548                                rc);
1549                         GOTO(out_free_temp, rc);
1550                 }
1551
1552                 if (i == 0) {
1553                         *osfs = *temp;
1554                         /* If the statfs is from mount, it will needs
1555                          * retrieve necessary information from MDT0.
1556                          * i.e. mount does not need the merged osfs
1557                          * from all of MDT.
1558                          * And also clients can be mounted as long as
1559                          * MDT0 is in service*/
1560                         if (flags & OBD_STATFS_FOR_MDT0)
1561                                 GOTO(out_free_temp, rc);
1562                 } else {
1563                         osfs->os_bavail += temp->os_bavail;
1564                         osfs->os_blocks += temp->os_blocks;
1565                         osfs->os_ffree += temp->os_ffree;
1566                         osfs->os_files += temp->os_files;
1567                 }
1568         }
1569
1570         EXIT;
1571 out_free_temp:
1572         OBD_FREE(temp, sizeof(*temp));
1573         return rc;
1574 }
1575
1576 static int lmv_getstatus(struct obd_export *exp,
1577                          struct lu_fid *fid,
1578                          struct obd_capa **pc)
1579 {
1580         struct obd_device    *obd = exp->exp_obd;
1581         struct lmv_obd       *lmv = &obd->u.lmv;
1582         int                   rc;
1583         ENTRY;
1584
1585         rc = lmv_check_connect(obd);
1586         if (rc)
1587                 RETURN(rc);
1588
1589         rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
1590         RETURN(rc);
1591 }
1592
1593 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1594                         struct obd_capa *oc, obd_valid valid, const char *name,
1595                         const char *input, int input_size, int output_size,
1596                         int flags, struct ptlrpc_request **request)
1597 {
1598         struct obd_device      *obd = exp->exp_obd;
1599         struct lmv_obd         *lmv = &obd->u.lmv;
1600         struct lmv_tgt_desc    *tgt;
1601         int                     rc;
1602         ENTRY;
1603
1604         rc = lmv_check_connect(obd);
1605         if (rc)
1606                 RETURN(rc);
1607
1608         tgt = lmv_find_target(lmv, fid);
1609         if (IS_ERR(tgt))
1610                 RETURN(PTR_ERR(tgt));
1611
1612         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1613                          input_size, output_size, flags, request);
1614
1615         RETURN(rc);
1616 }
1617
1618 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1619                         struct obd_capa *oc, obd_valid valid, const char *name,
1620                         const char *input, int input_size, int output_size,
1621                         int flags, __u32 suppgid,
1622                         struct ptlrpc_request **request)
1623 {
1624         struct obd_device      *obd = exp->exp_obd;
1625         struct lmv_obd         *lmv = &obd->u.lmv;
1626         struct lmv_tgt_desc    *tgt;
1627         int                     rc;
1628         ENTRY;
1629
1630         rc = lmv_check_connect(obd);
1631         if (rc)
1632                 RETURN(rc);
1633
1634         tgt = lmv_find_target(lmv, fid);
1635         if (IS_ERR(tgt))
1636                 RETURN(PTR_ERR(tgt));
1637
1638         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1639                          input_size, output_size, flags, suppgid,
1640                          request);
1641
1642         RETURN(rc);
1643 }
1644
1645 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1646                        struct ptlrpc_request **request)
1647 {
1648         struct obd_device       *obd = exp->exp_obd;
1649         struct lmv_obd          *lmv = &obd->u.lmv;
1650         struct lmv_tgt_desc     *tgt;
1651         int                      rc;
1652         ENTRY;
1653
1654         rc = lmv_check_connect(obd);
1655         if (rc)
1656                 RETURN(rc);
1657
1658         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1659         if (IS_ERR(tgt))
1660                 RETURN(PTR_ERR(tgt));
1661
1662         if (op_data->op_flags & MF_GET_MDT_IDX) {
1663                 op_data->op_mds = tgt->ltd_idx;
1664                 RETURN(0);
1665         }
1666
1667         rc = md_getattr(tgt->ltd_exp, op_data, request);
1668
1669         RETURN(rc);
1670 }
1671
1672 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1673 {
1674         struct obd_device   *obd = exp->exp_obd;
1675         struct lmv_obd      *lmv = &obd->u.lmv;
1676         __u32                i;
1677         int                  rc;
1678         ENTRY;
1679
1680         rc = lmv_check_connect(obd);
1681         if (rc)
1682                 RETURN(rc);
1683
1684         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1685
1686         /*
1687          * With DNE every object can have two locks in different namespaces:
1688          * lookup lock in space of MDT storing direntry and update/open lock in
1689          * space of MDT storing inode.
1690          */
1691         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1692                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1693                         continue;
1694                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1695         }
1696
1697         RETURN(0);
1698 }
1699
1700 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1701                            ldlm_iterator_t it, void *data)
1702 {
1703         struct obd_device   *obd = exp->exp_obd;
1704         struct lmv_obd      *lmv = &obd->u.lmv;
1705         __u32                i;
1706         int                  rc;
1707         ENTRY;
1708
1709         rc = lmv_check_connect(obd);
1710         if (rc)
1711                 RETURN(rc);
1712
1713         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1714
1715         /*
1716          * With DNE every object can have two locks in different namespaces:
1717          * lookup lock in space of MDT storing direntry and update/open lock in
1718          * space of MDT storing inode.
1719          */
1720         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1721                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1722                         continue;
1723                 rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
1724                 if (rc)
1725                         RETURN(rc);
1726         }
1727
1728         RETURN(rc);
1729 }
1730
1731
1732 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1733                      struct md_open_data *mod, struct ptlrpc_request **request)
1734 {
1735         struct obd_device     *obd = exp->exp_obd;
1736         struct lmv_obd        *lmv = &obd->u.lmv;
1737         struct lmv_tgt_desc   *tgt;
1738         int                    rc;
1739         ENTRY;
1740
1741         rc = lmv_check_connect(obd);
1742         if (rc)
1743                 RETURN(rc);
1744
1745         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1746         if (IS_ERR(tgt))
1747                 RETURN(PTR_ERR(tgt));
1748
1749         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1750         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1751         RETURN(rc);
1752 }
1753
1754 struct lmv_tgt_desc
1755 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1756                 struct lu_fid *fid)
1757 {
1758         struct lmv_tgt_desc *tgt;
1759
1760         tgt = lmv_find_target(lmv, fid);
1761         if (IS_ERR(tgt))
1762                 return tgt;
1763
1764         op_data->op_mds = tgt->ltd_idx;
1765
1766         return tgt;
1767 }
1768
1769 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1770                const void *data, int datalen, int mode, __u32 uid,
1771                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1772                struct ptlrpc_request **request)
1773 {
1774         struct obd_device       *obd = exp->exp_obd;
1775         struct lmv_obd          *lmv = &obd->u.lmv;
1776         struct lmv_tgt_desc     *tgt;
1777         int                      rc;
1778         ENTRY;
1779
1780         rc = lmv_check_connect(obd);
1781         if (rc)
1782                 RETURN(rc);
1783
1784         if (!lmv->desc.ld_active_tgt_count)
1785                 RETURN(-EIO);
1786
1787         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1788         if (IS_ERR(tgt))
1789                 RETURN(PTR_ERR(tgt));
1790
1791         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1792         if (rc)
1793                 RETURN(rc);
1794
1795         CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1796                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1797                op_data->op_mds);
1798
1799         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1800         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1801                        cap_effective, rdev, request);
1802
1803         if (rc == 0) {
1804                 if (*request == NULL)
1805                         RETURN(rc);
1806                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1807         }
1808         RETURN(rc);
1809 }
1810
1811 static int lmv_done_writing(struct obd_export *exp,
1812                             struct md_op_data *op_data,
1813                             struct md_open_data *mod)
1814 {
1815         struct obd_device     *obd = exp->exp_obd;
1816         struct lmv_obd        *lmv = &obd->u.lmv;
1817         struct lmv_tgt_desc   *tgt;
1818         int                    rc;
1819         ENTRY;
1820
1821         rc = lmv_check_connect(obd);
1822         if (rc)
1823                 RETURN(rc);
1824
1825         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1826         if (IS_ERR(tgt))
1827                 RETURN(PTR_ERR(tgt));
1828
1829         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1830         RETURN(rc);
1831 }
1832
1833 static int
1834 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1835                    struct lookup_intent *it, struct md_op_data *op_data,
1836                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1837                    __u64 extra_lock_flags)
1838 {
1839         struct ptlrpc_request      *req = it->d.lustre.it_data;
1840         struct obd_device          *obd = exp->exp_obd;
1841         struct lmv_obd             *lmv = &obd->u.lmv;
1842         struct lustre_handle        plock;
1843         struct lmv_tgt_desc        *tgt;
1844         struct md_op_data          *rdata;
1845         struct lu_fid               fid1;
1846         struct mdt_body            *body;
1847         int                         rc = 0;
1848         int                         pmode;
1849         ENTRY;
1850
1851         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1852         LASSERT(body != NULL);
1853
1854         if (!(body->valid & OBD_MD_MDS))
1855                 RETURN(0);
1856
1857         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1858                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1859
1860         /*
1861          * We got LOOKUP lock, but we really need attrs.
1862          */
1863         pmode = it->d.lustre.it_lock_mode;
1864         LASSERT(pmode != 0);
1865         memcpy(&plock, lockh, sizeof(plock));
1866         it->d.lustre.it_lock_mode = 0;
1867         it->d.lustre.it_data = NULL;
1868         fid1 = body->fid1;
1869
1870         ptlrpc_req_finished(req);
1871
1872         tgt = lmv_find_target(lmv, &fid1);
1873         if (IS_ERR(tgt))
1874                 GOTO(out, rc = PTR_ERR(tgt));
1875
1876         OBD_ALLOC_PTR(rdata);
1877         if (rdata == NULL)
1878                 GOTO(out, rc = -ENOMEM);
1879
1880         rdata->op_fid1 = fid1;
1881         rdata->op_bias = MDS_CROSS_REF;
1882
1883         rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1884                         lmm, lmmsize, NULL, extra_lock_flags);
1885         OBD_FREE_PTR(rdata);
1886         EXIT;
1887 out:
1888         ldlm_lock_decref(&plock, pmode);
1889         return rc;
1890 }
1891
1892 static int
1893 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1894             struct lookup_intent *it, struct md_op_data *op_data,
1895             struct lustre_handle *lockh, void *lmm, int lmmsize,
1896             struct ptlrpc_request **req, __u64 extra_lock_flags)
1897 {
1898         struct obd_device        *obd = exp->exp_obd;
1899         struct lmv_obd           *lmv = &obd->u.lmv;
1900         struct lmv_tgt_desc      *tgt;
1901         int                       rc;
1902         ENTRY;
1903
1904         rc = lmv_check_connect(obd);
1905         if (rc)
1906                 RETURN(rc);
1907
1908         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1909                LL_IT2STR(it), PFID(&op_data->op_fid1));
1910
1911         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1912         if (IS_ERR(tgt))
1913                 RETURN(PTR_ERR(tgt));
1914
1915         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1916                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1917
1918         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1919                         lmm, lmmsize, req, extra_lock_flags);
1920
1921         if (rc == 0 && it && it->it_op == IT_OPEN) {
1922                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1923                                         lmm, lmmsize, extra_lock_flags);
1924         }
1925         RETURN(rc);
1926 }
1927
1928 static int
1929 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1930                  struct ptlrpc_request **request)
1931 {
1932         struct ptlrpc_request   *req = NULL;
1933         struct obd_device       *obd = exp->exp_obd;
1934         struct lmv_obd          *lmv = &obd->u.lmv;
1935         struct lmv_tgt_desc     *tgt;
1936         struct mdt_body         *body;
1937         int                      rc;
1938         ENTRY;
1939
1940         rc = lmv_check_connect(obd);
1941         if (rc)
1942                 RETURN(rc);
1943
1944         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1945         if (IS_ERR(tgt))
1946                 RETURN(PTR_ERR(tgt));
1947
1948         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1949                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1950                tgt->ltd_idx);
1951
1952         rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1953         if (rc != 0)
1954                 RETURN(rc);
1955
1956         body = req_capsule_server_get(&(*request)->rq_pill,
1957                                       &RMF_MDT_BODY);
1958         LASSERT(body != NULL);
1959
1960         if (body->valid & OBD_MD_MDS) {
1961                 struct lu_fid rid = body->fid1;
1962                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1963                        PFID(&rid));
1964
1965                 tgt = lmv_find_target(lmv, &rid);
1966                 if (IS_ERR(tgt)) {
1967                         ptlrpc_req_finished(*request);
1968                         RETURN(PTR_ERR(tgt));
1969                 }
1970
1971                 op_data->op_fid1 = rid;
1972                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1973                 op_data->op_namelen = 0;
1974                 op_data->op_name = NULL;
1975                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1976                 ptlrpc_req_finished(*request);
1977                 *request = req;
1978         }
1979
1980         RETURN(rc);
1981 }
1982
/*
 * Map one MF_MDC_CANCEL_FIDn flag to the address of the matching op_fidN
 * member of @op_data; any other value yields NULL.
 *
 * Both arguments are parenthesized so that expressions such as "&obj" or
 * "flags & mask" expand correctly.  @fl is evaluated up to four times, so
 * it must not have side effects.
 */
#define md_op_data_fid(op_data, fl)                             \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :     \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :     \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :     \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :     \
         NULL)
1989
1990 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1991                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
1992 {
1993         struct lu_fid          *fid = md_op_data_fid(op_data, flag);
1994         struct obd_device      *obd = exp->exp_obd;
1995         struct lmv_obd         *lmv = &obd->u.lmv;
1996         struct lmv_tgt_desc    *tgt;
1997         ldlm_policy_data_t      policy = {{0}};
1998         int                     rc = 0;
1999         ENTRY;
2000
2001         if (!fid_is_sane(fid))
2002                 RETURN(0);
2003
2004         tgt = lmv_find_target(lmv, fid);
2005         if (IS_ERR(tgt))
2006                 RETURN(PTR_ERR(tgt));
2007
2008         if (tgt->ltd_idx != op_tgt) {
2009                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
2010                 policy.l_inodebits.bits = bits;
2011                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
2012                                       mode, LCF_ASYNC, NULL);
2013         } else {
2014                 CDEBUG(D_INODE,
2015                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
2016                        op_tgt, PFID(fid));
2017                 op_data->op_flags |= flag;
2018                 rc = 0;
2019         }
2020
2021         RETURN(rc);
2022 }
2023
2024 /*
2025  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
2026  * op_data->op_fid2
2027  */
2028 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
2029                     struct ptlrpc_request **request)
2030 {
2031         struct obd_device       *obd = exp->exp_obd;
2032         struct lmv_obd          *lmv = &obd->u.lmv;
2033         struct lmv_tgt_desc     *tgt;
2034         int                      rc;
2035         ENTRY;
2036
2037         rc = lmv_check_connect(obd);
2038         if (rc)
2039                 RETURN(rc);
2040
2041         LASSERT(op_data->op_namelen != 0);
2042
2043         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
2044                PFID(&op_data->op_fid2), op_data->op_namelen,
2045                op_data->op_name, PFID(&op_data->op_fid1));
2046
2047         op_data->op_fsuid = current_fsuid();
2048         op_data->op_fsgid = current_fsgid();
2049         op_data->op_cap = cfs_curproc_cap_pack();
2050         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2051         if (IS_ERR(tgt))
2052                 RETURN(PTR_ERR(tgt));
2053
2054         /*
2055          * Cancel UPDATE lock on child (fid1).
2056          */
2057         op_data->op_flags |= MF_MDC_CANCEL_FID2;
2058         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2059                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2060         if (rc != 0)
2061                 RETURN(rc);
2062
2063         rc = md_link(tgt->ltd_exp, op_data, request);
2064
2065         RETURN(rc);
2066 }
2067
2068 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
2069                       const char *old, int oldlen, const char *new, int newlen,
2070                       struct ptlrpc_request **request)
2071 {
2072         struct obd_device       *obd = exp->exp_obd;
2073         struct lmv_obd          *lmv = &obd->u.lmv;
2074         struct lmv_tgt_desc     *src_tgt;
2075         struct lmv_tgt_desc     *tgt_tgt;
2076         int                     rc;
2077         ENTRY;
2078
2079         LASSERT(oldlen != 0);
2080
2081         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
2082                oldlen, old, PFID(&op_data->op_fid1),
2083                newlen, new, PFID(&op_data->op_fid2));
2084
2085         rc = lmv_check_connect(obd);
2086         if (rc)
2087                 RETURN(rc);
2088
2089         op_data->op_fsuid = current_fsuid();
2090         op_data->op_fsgid = current_fsgid();
2091         op_data->op_cap = cfs_curproc_cap_pack();
2092         src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2093         if (IS_ERR(src_tgt))
2094                 RETURN(PTR_ERR(src_tgt));
2095
2096         tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2097         if (IS_ERR(tgt_tgt))
2098                 RETURN(PTR_ERR(tgt_tgt));
2099         /*
2100          * LOOKUP lock on src child (fid3) should also be cancelled for
2101          * src_tgt in mdc_rename.
2102          */
2103         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2104
2105         /*
2106          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
2107          * own target.
2108          */
2109         rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2110                               LCK_EX, MDS_INODELOCK_UPDATE,
2111                               MF_MDC_CANCEL_FID2);
2112
2113         /*
2114          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
2115          */
2116         if (rc == 0) {
2117                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2118                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2119                                       MF_MDC_CANCEL_FID4);
2120         }
2121
2122         /*
2123          * Cancel all the locks on tgt child (fid4).
2124          */
2125         if (rc == 0)
2126                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2127                                       LCK_EX, MDS_INODELOCK_FULL,
2128                                       MF_MDC_CANCEL_FID4);
2129
2130         if (rc == 0)
2131                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
2132                                new, newlen, request);
2133         RETURN(rc);
2134 }
2135
2136 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2137                        void *ea, int ealen, void *ea2, int ea2len,
2138                        struct ptlrpc_request **request,
2139                        struct md_open_data **mod)
2140 {
2141         struct obd_device       *obd = exp->exp_obd;
2142         struct lmv_obd          *lmv = &obd->u.lmv;
2143         struct lmv_tgt_desc     *tgt;
2144         int                      rc = 0;
2145         ENTRY;
2146
2147         rc = lmv_check_connect(obd);
2148         if (rc)
2149                 RETURN(rc);
2150
2151         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2152                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2153
2154         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2155         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2156         if (IS_ERR(tgt))
2157                 RETURN(PTR_ERR(tgt));
2158
2159         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2160                         ea2len, request, mod);
2161
2162         RETURN(rc);
2163 }
2164
2165 static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
2166                      struct obd_capa *oc, struct ptlrpc_request **request)
2167 {
2168         struct obd_device       *obd = exp->exp_obd;
2169         struct lmv_obd          *lmv = &obd->u.lmv;
2170         struct lmv_tgt_desc     *tgt;
2171         int                      rc;
2172         ENTRY;
2173
2174         rc = lmv_check_connect(obd);
2175         if (rc != 0)
2176                 RETURN(rc);
2177
2178         tgt = lmv_find_target(lmv, fid);
2179         if (IS_ERR(tgt))
2180                 RETURN(PTR_ERR(tgt));
2181
2182         rc = md_fsync(tgt->ltd_exp, fid, oc, request);
2183         RETURN(rc);
2184 }
2185
2186 /*
2187  * Adjust a set of pages, each page containing an array of lu_dirpages,
2188  * so that each page can be used as a single logical lu_dirpage.
2189  *
2190  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2191  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2192  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2193  * value is used as a cookie to request the next lu_dirpage in a
2194  * directory listing that spans multiple pages (two in this example):
2195  *   ________
2196  *  |        |
2197  * .|--------v-------   -----.
2198  * |s|e|f|p|ent|ent| ... |ent|
2199  * '--|--------------   -----'   Each CFS_PAGE contains a single
2200  *    '------.                   lu_dirpage.
2201  * .---------v-------   -----.
2202  * |s|e|f|p|ent| 0 | ... | 0 |
2203  * '-----------------   -----'
2204  *
2205  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2206  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2207  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2208  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2209  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2210  * in general e0==s1, e1==s2, etc.):
2211  *
2212  * .--------------------   -----.
2213  * |s0|e0|f0|p|ent|ent| ... |ent|
2214  * |---v----------------   -----|
2215  * |s1|e1|f1|p|ent|ent| ... |ent|
2216  * |---v----------------   -----|  Here, each CFS_PAGE contains
2217  *             ...                 multiple lu_dirpages.
2218  * |---v----------------   -----|
2219  * |s'|e'|f'|p|ent|ent| ... |ent|
2220  * '---|----------------   -----'
2221  *     v
2222  * .----------------------------.
2223  * |        next CFS_PAGE       |
2224  *
2225  * This structure is transformed into a single logical lu_dirpage as follows:
2226  *
2227  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2228  *   labeled 'next CFS_PAGE'.
2229  *
2230  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2231  *   a hash collision with the next page exists.
2232  *
2233  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2234  *   to the first entry of the next lu_dirpage.
2235  */
#if PAGE_CACHE_SIZE > LU_PAGE_SIZE
/*
 * Turn every host page in @pages into a single logical lu_dirpage.
 *
 * @pages   array of host pages, each holding consecutive lu_dirpages
 * @ncfspgs number of host pages in @pages
 * @nlupgs  total number of lu_dirpages spread over those host pages
 *
 * For each host page the last entry of every lu_dirpage is chained to the
 * first entry of the following lu_dirpage, and the first lu_dirpage takes
 * over the hash end and the LDF_COLLIDE flag of the last lu_dirpage visited.
 * Asserts that exactly @nlupgs lu_dirpages were consumed.
 */
static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
{
        int pg;

        for (pg = 0; pg < ncfspgs; pg++) {
                struct lu_dirpage       *cur = kmap(pages[pg]);
                struct lu_dirpage       *head = cur;
                struct lu_dirent        *last_ent = NULL;
                struct lu_dirent        *ent;
                __u64                   last_hash_end = cur->ldp_hash_end;
                __u32                   last_flags = cur->ldp_flags;

                while (--nlupgs > 0) {
                        /* Walk to the final entry of this lu_dirpage. */
                        last_ent = NULL;
                        for (ent = lu_dirent_start(cur); ent != NULL;
                             ent = lu_dirent_next(ent))
                                last_ent = ent;

                        /* Step to the next lu_dirpage. */
                        cur = (struct lu_dirpage *)((char *)cur +
                                                    LU_PAGE_SIZE);

                        /* Stop once we have run off the host page. */
                        if (((unsigned long)cur & ~CFS_PAGE_MASK) == 0)
                                break;

                        /* Remember hash end and flags of this lu_dirpage. */
                        last_hash_end = cur->ldp_hash_end;
                        last_flags = cur->ldp_flags;

                        /* The previous lu_dirpage had no entries at all. */
                        if (last_ent == NULL)
                                break;

                        /* Grow the final entry's lde_reclen from 0 so that
                         * it reaches the first entry of the next
                         * lu_dirpage. */
                        LASSERT(le16_to_cpu(last_ent->lde_reclen) == 0);
                        last_ent->lde_reclen =
                                cpu_to_le16((char *)(cur->ldp_entries) -
                                            (char *)last_ent);
                }

                head->ldp_hash_end = last_hash_end;
                /* Replace only the LDF_COLLIDE bit, keep the other flags. */
                head->ldp_flags = (head->ldp_flags &
                                   ~cpu_to_le32(LDF_COLLIDE)) |
                                  (last_flags & cpu_to_le32(LDF_COLLIDE));

                kunmap(pages[pg]);
        }
        LASSERTF(nlupgs == 0, "left = %d", nlupgs);
}
#else
/* Host page and lu_dirpage have the same size: nothing to adjust. */
#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
#endif  /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2288
2289 #define NORMAL_MAX_STRIPES 4
2290 int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
2291                    struct md_callback *cb_op, struct lu_dirent **ldp)
2292 {
2293         struct obd_device       *obd = exp->exp_obd;
2294         struct lmv_obd          *lmv = &obd->u.lmv;
2295         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2296         struct lu_dirent        *tmp_ents[NORMAL_MAX_STRIPES];
2297         struct lu_dirent        **ents = NULL;
2298         int                     stripe_count;
2299         __u64                   min_hash;
2300         int                     min_idx = 0;
2301         int                     i;
2302         int                     rc;
2303         ENTRY;
2304
2305         rc = lmv_check_connect(obd);
2306         if (rc)
2307                 RETURN(rc);
2308
2309         if (lsm == NULL)
2310                 stripe_count = 1;
2311         else
2312                 stripe_count = lsm->lsm_md_stripe_count;
2313
2314         if (stripe_count > NORMAL_MAX_STRIPES) {
2315                 OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
2316                 if (ents == NULL)
2317                         GOTO(out, rc = -ENOMEM);
2318         } else {
2319                 ents = tmp_ents;
2320                 memset(ents, 0, sizeof(ents[0]) * stripe_count);
2321         }
2322
2323         min_hash = MDS_DIR_END_OFF;
2324         for (i = 0; i < stripe_count; i++) {
2325                 struct lmv_tgt_desc *tgt;
2326
2327                 if (likely(lsm == NULL)) {
2328                         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2329                         if (IS_ERR(tgt))
2330                                 GOTO(out, rc = PTR_ERR(tgt));
2331                         LASSERT(op_data->op_data != NULL);
2332                 } else {
2333                         tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
2334                         if (IS_ERR(tgt))
2335                                 GOTO(out, rc = PTR_ERR(tgt));
2336                         op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
2337                         op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
2338                         op_data->op_stripe_offset = i;
2339                 }
2340
2341                 rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i]);
2342                 if (rc != 0)
2343                         GOTO(out, rc);
2344
2345                 if (ents[i] != NULL &&
2346                     le64_to_cpu(ents[i]->lde_hash) <= min_hash) {
2347                         min_hash = le64_to_cpu(ents[i]->lde_hash);
2348                         min_idx = i;
2349                 }
2350         }
2351
2352         if (min_hash != MDS_DIR_END_OFF)
2353                 *ldp = ents[min_idx];
2354         else
2355                 *ldp = NULL;
2356 out:
2357         if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
2358                 OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
2359
2360         RETURN(rc);
2361 }
2362
2363 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2364                       struct ptlrpc_request **request)
2365 {
2366         struct obd_device       *obd = exp->exp_obd;
2367         struct lmv_obd          *lmv = &obd->u.lmv;
2368         struct lmv_tgt_desc     *tgt = NULL;
2369         struct mdt_body         *body;
2370         int                     rc;
2371         ENTRY;
2372
2373         rc = lmv_check_connect(obd);
2374         if (rc)
2375                 RETURN(rc);
2376 retry:
2377         /* Send unlink requests to the MDT where the child is located */
2378         if (likely(!fid_is_zero(&op_data->op_fid2)))
2379                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2380         else
2381                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2382         if (IS_ERR(tgt))
2383                 RETURN(PTR_ERR(tgt));
2384
2385         op_data->op_fsuid = current_fsuid();
2386         op_data->op_fsgid = current_fsgid();
2387         op_data->op_cap = cfs_curproc_cap_pack();
2388
2389         /*
2390          * If child's fid is given, cancel unused locks for it if it is from
2391          * another export than parent.
2392          *
2393          * LOOKUP lock for child (fid3) should also be cancelled on parent
2394          * tgt_tgt in mdc_unlink().
2395          */
2396         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2397
2398         /*
2399          * Cancel FULL locks on child (fid3).
2400          */
2401         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2402                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2403
2404         if (rc != 0)
2405                 RETURN(rc);
2406
2407         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2408                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2409
2410         rc = md_unlink(tgt->ltd_exp, op_data, request);
2411         if (rc != 0 && rc != -EREMOTE)
2412                 RETURN(rc);
2413
2414         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2415         if (body == NULL)
2416                 RETURN(-EPROTO);
2417
2418         /* Not cross-ref case, just get out of here. */
2419         if (likely(!(body->valid & OBD_MD_MDS)))
2420                 RETURN(0);
2421
2422         CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2423                exp->exp_obd->obd_name, PFID(&body->fid1));
2424
2425         /* This is a remote object, try remote MDT, Note: it may
2426          * try more than 1 time here, Considering following case
2427          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2428          * 1. Initially A does not know where remote1 is, it send
2429          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2430          *    resend unlink RPC to MDT1 (retry 1st time).
2431          *
2432          * 2. During the unlink RPC in flight,
2433          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2434          *    and create new remote1, but on MDT0
2435          *
2436          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2437          *    /mnt/lustre, then lookup get fid of remote1, and find
2438          *    it is remote dir again, and replay -EREMOTE again.
2439          *
2440          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2441          *
2442          * In theory, it might try unlimited time here, but it should
2443          * be very rare case.  */
2444         op_data->op_fid2 = body->fid1;
2445         ptlrpc_req_finished(*request);
2446         *request = NULL;
2447
2448         goto retry;
2449 }
2450
2451 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2452 {
2453         struct lmv_obd *lmv = &obd->u.lmv;
2454         int rc = 0;
2455
2456         switch (stage) {
2457         case OBD_CLEANUP_EARLY:
2458                 /* XXX: here should be calling obd_precleanup() down to
2459                  * stack. */
2460                 break;
2461         case OBD_CLEANUP_EXPORTS:
2462                 fld_client_proc_fini(&lmv->lmv_fld);
2463                 lprocfs_obd_cleanup(obd);
2464                 lprocfs_free_md_stats(obd);
2465                 break;
2466         default:
2467                 break;
2468         }
2469         RETURN(rc);
2470 }
2471
2472 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2473                         __u32 keylen, void *key, __u32 *vallen, void *val,
2474                         struct lov_stripe_md *lsm)
2475 {
2476         struct obd_device       *obd;
2477         struct lmv_obd          *lmv;
2478         int                      rc = 0;
2479         ENTRY;
2480
2481         obd = class_exp2obd(exp);
2482         if (obd == NULL) {
2483                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2484                        exp->exp_handle.h_cookie);
2485                 RETURN(-EINVAL);
2486         }
2487
2488         lmv = &obd->u.lmv;
2489         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2490                 int i;
2491
2492                 rc = lmv_check_connect(obd);
2493                 if (rc)
2494                         RETURN(rc);
2495
2496                 LASSERT(*vallen == sizeof(__u32));
2497                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2498                         struct lmv_tgt_desc *tgt = lmv->tgts[i];
2499                         /*
2500                          * All tgts should be connected when this gets called.
2501                          */
2502                         if (tgt == NULL || tgt->ltd_exp == NULL)
2503                                 continue;
2504
2505                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2506                                           vallen, val, NULL))
2507                                 RETURN(0);
2508                 }
2509                 RETURN(-EINVAL);
2510         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2511                 rc = lmv_check_connect(obd);
2512                 if (rc)
2513                         RETURN(rc);
2514
2515                 /*
2516                  * Forwarding this request to first MDS, it should know LOV
2517                  * desc.
2518                  */
2519                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2520                                   vallen, val, NULL);
2521                 if (!rc && KEY_IS(KEY_CONN_DATA))
2522                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2523                 RETURN(rc);
2524         } else if (KEY_IS(KEY_TGT_COUNT)) {
2525                 *((int *)val) = lmv->desc.ld_tgt_count;
2526                 RETURN(0);
2527         }
2528
2529         CDEBUG(D_IOCTL, "Invalid key\n");
2530         RETURN(-EINVAL);
2531 }
2532
2533 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2534                        obd_count keylen, void *key, obd_count vallen,
2535                        void *val, struct ptlrpc_request_set *set)
2536 {
2537         struct lmv_tgt_desc    *tgt = NULL;
2538         struct obd_device      *obd;
2539         struct lmv_obd         *lmv;
2540         int rc = 0;
2541         ENTRY;
2542
2543         obd = class_exp2obd(exp);
2544         if (obd == NULL) {
2545                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2546                        exp->exp_handle.h_cookie);
2547                 RETURN(-EINVAL);
2548         }
2549         lmv = &obd->u.lmv;
2550
2551         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2552                 int i, err = 0;
2553
2554                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2555                         tgt = lmv->tgts[i];
2556
2557                         if (tgt == NULL || tgt->ltd_exp == NULL)
2558                                 continue;
2559
2560                         err = obd_set_info_async(env, tgt->ltd_exp,
2561                                                  keylen, key, vallen, val, set);
2562                         if (err && rc == 0)
2563                                 rc = err;
2564                 }
2565
2566                 RETURN(rc);
2567         }
2568
2569         RETURN(-EINVAL);
2570 }
2571
2572 static int lmv_pack_md_v1(const struct lmv_stripe_md *lsm,
2573                           struct lmv_mds_md_v1 *lmm1)
2574 {
2575         int     cplen;
2576         int     i;
2577
2578         lmm1->lmv_magic = cpu_to_le32(lsm->lsm_md_magic);
2579         lmm1->lmv_stripe_count = cpu_to_le32(lsm->lsm_md_stripe_count);
2580         lmm1->lmv_master_mdt_index = cpu_to_le32(lsm->lsm_md_master_mdt_index);
2581         lmm1->lmv_hash_type = cpu_to_le32(lsm->lsm_md_hash_type);
2582         cplen = strlcpy(lmm1->lmv_pool_name, lsm->lsm_md_pool_name,
2583                         sizeof(lmm1->lmv_pool_name));
2584         if (cplen >= sizeof(lmm1->lmv_pool_name))
2585                 return -E2BIG;
2586
2587         for (i = 0; i < lsm->lsm_md_stripe_count; i++)
2588                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
2589                               &lsm->lsm_md_oinfo[i].lmo_fid);
2590         return 0;
2591 }
2592
2593 int lmv_pack_md(union lmv_mds_md **lmmp, const struct lmv_stripe_md *lsm,
2594                 int stripe_count)
2595 {
2596         int     lmm_size = 0;
2597         bool    allocated = false;
2598         int     rc = 0;
2599         ENTRY;
2600
2601         LASSERT(lmmp != NULL);
2602         /* Free lmm */
2603         if (*lmmp != NULL && lsm == NULL) {
2604                 int stripe_count;
2605
2606                 stripe_count = lmv_mds_md_stripe_count_get(*lmmp);
2607                 lmm_size = lmv_mds_md_size(stripe_count,
2608                                            le32_to_cpu((*lmmp)->lmv_magic));
2609                 if (lmm_size == 0)
2610                         RETURN(-EINVAL);
2611                 OBD_FREE(*lmmp, lmm_size);
2612                 *lmmp = NULL;
2613                 RETURN(0);
2614         }
2615
2616         /* Alloc lmm */
2617         if (*lmmp == NULL && lsm == NULL) {
2618                 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
2619                 LASSERT(lmm_size > 0);
2620                 OBD_ALLOC(*lmmp, lmm_size);
2621                 if (*lmmp == NULL)
2622                         RETURN(-ENOMEM);
2623                 lmv_mds_md_stripe_count_set(*lmmp, stripe_count);
2624                 (*lmmp)->lmv_magic = cpu_to_le32(LMV_MAGIC);
2625                 RETURN(lmm_size);
2626         }
2627
2628         /* pack lmm */
2629         LASSERT(lsm != NULL);
2630         lmm_size = lmv_mds_md_size(lsm->lsm_md_stripe_count, lsm->lsm_md_magic);
2631         if (*lmmp == NULL) {
2632                 OBD_ALLOC(*lmmp, lmm_size);
2633                 if (*lmmp == NULL)
2634                         RETURN(-ENOMEM);
2635                 allocated = true;
2636         }
2637
2638         switch (lsm->lsm_md_magic) {
2639         case LMV_MAGIC_V1:
2640                 rc = lmv_pack_md_v1(lsm, &(*lmmp)->lmv_md_v1);
2641                 break;
2642         default:
2643                 rc = -EINVAL;
2644                 break;
2645         }
2646
2647         if (rc != 0 && allocated) {
2648                 OBD_FREE(*lmmp, lmm_size);
2649                 *lmmp = NULL;
2650         }
2651
2652         RETURN(lmm_size);
2653 }
2654 EXPORT_SYMBOL(lmv_pack_md);
2655
2656 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
2657                             const struct lmv_mds_md_v1 *lmm1)
2658 {
2659         struct lmv_obd  *lmv = &exp->exp_obd->u.lmv;
2660         int             stripe_count;
2661         int             cplen;
2662         int             i;
2663         int             rc = 0;
2664         ENTRY;
2665
2666         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
2667         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2668         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
2669         lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
2670         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
2671         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
2672                         sizeof(lsm->lsm_md_pool_name));
2673
2674         if (cplen >= sizeof(lsm->lsm_md_pool_name))
2675                 RETURN(-E2BIG);
2676
2677         CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
2678                "layout_version %d\n", lsm->lsm_md_stripe_count,
2679                lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
2680                lsm->lsm_md_layout_version);
2681
2682         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2683         for (i = 0; i < le32_to_cpu(stripe_count); i++) {
2684                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
2685                               &lmm1->lmv_stripe_fids[i]);
2686                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
2687                                     &lsm->lsm_md_oinfo[i].lmo_mds);
2688                 if (rc != 0)
2689                         RETURN(rc);
2690                 CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
2691                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
2692         }
2693
2694         RETURN(rc);
2695 }
2696
2697 int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp,
2698                   const union lmv_mds_md *lmm, int stripe_count)
2699 {
2700         struct lmv_stripe_md     *lsm;
2701         int                      lsm_size;
2702         int                      rc;
2703         bool                     allocated = false;
2704         ENTRY;
2705
2706         LASSERT(lsmp != NULL);
2707
2708         lsm = *lsmp;
2709         /* Free memmd */
2710         if (lsm != NULL && lmm == NULL) {
2711 #ifdef __KERNEL__
2712                 int i;
2713                 for (i = 1; i < lsm->lsm_md_stripe_count; i++) {
2714                         if (lsm->lsm_md_oinfo[i].lmo_root != NULL)
2715                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
2716                 }
2717 #endif
2718                 lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
2719                 OBD_FREE(lsm, lsm_size);
2720                 *lsmp = NULL;
2721                 RETURN(0);
2722         }
2723
2724         /* Alloc memmd */
2725         if (lsm == NULL && lmm == NULL) {
2726                 lsm_size = lmv_stripe_md_size(stripe_count);
2727                 OBD_ALLOC(lsm, lsm_size);
2728                 if (lsm == NULL)
2729                         RETURN(-ENOMEM);
2730                 lsm->lsm_md_stripe_count = stripe_count;
2731                 *lsmp = lsm;
2732                 RETURN(0);
2733         }
2734
2735         /* Unpack memmd */
2736         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1) {
2737                 CERROR("%s: invalid magic %x.\n", exp->exp_obd->obd_name,
2738                        le32_to_cpu(lmm->lmv_magic));
2739                 RETURN(-EINVAL);
2740         }
2741
2742         lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2743         if (lsm == NULL) {
2744                 OBD_ALLOC(lsm, lsm_size);
2745                 if (lsm == NULL)
2746                         RETURN(-ENOMEM);
2747                 allocated = true;
2748                 *lsmp = lsm;
2749         }
2750
2751         switch (le32_to_cpu(lmm->lmv_magic)) {
2752         case LMV_MAGIC_V1:
2753                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
2754                 break;
2755         default:
2756                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
2757                        le32_to_cpu(lmm->lmv_magic));
2758                 rc = -EINVAL;
2759                 break;
2760         }
2761
2762         if (rc != 0 && allocated) {
2763                 OBD_FREE(lsm, lsm_size);
2764                 *lsmp = NULL;
2765                 lsm_size = rc;
2766         }
2767         RETURN(lsm_size);
2768 }
2769
2770 int lmv_alloc_memmd(struct lmv_stripe_md **lsmp, int stripes)
2771 {
2772         return lmv_unpack_md(NULL, lsmp, NULL, stripes);
2773 }
2774 EXPORT_SYMBOL(lmv_alloc_memmd);
2775
2776 void lmv_free_memmd(struct lmv_stripe_md *lsm)
2777 {
2778         lmv_unpack_md(NULL, &lsm, NULL, 0);
2779 }
2780 EXPORT_SYMBOL(lmv_free_memmd);
2781
/**
 * obd_ops-style adapter for lmv_unpack_md().
 *
 * Reinterprets the generic LOV-typed pointers as their LMV counterparts and
 * forwards \a disk_len in the stripe_count position of lmv_unpack_md().
 *
 * Returns whatever lmv_unpack_md() returns.
 */
int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 struct lov_mds_md *lmm, int disk_len)
{
        struct lmv_stripe_md    **lmv_lsmp = (struct lmv_stripe_md **)lsmp;
        union lmv_mds_md         *lmv_lmm = (union lmv_mds_md *)lmm;

        return lmv_unpack_md(exp, lmv_lsmp, lmv_lmm, disk_len);
}
2788
2789 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2790                struct lov_stripe_md *lsm)
2791 {
2792         struct obd_device               *obd = exp->exp_obd;
2793         struct lmv_obd                  *lmv_obd = &obd->u.lmv;
2794         const struct lmv_stripe_md      *lmv = (struct lmv_stripe_md *)lsm;
2795         int                             stripe_count;
2796
2797         if (lmmp == NULL) {
2798                 if (lsm != NULL)
2799                         stripe_count = lmv->lsm_md_stripe_count;
2800                 else
2801                         stripe_count = lmv_obd->desc.ld_tgt_count;
2802
2803                 return lmv_mds_md_size(stripe_count, LMV_MAGIC_V1);
2804         }
2805
2806         return lmv_pack_md((union lmv_mds_md **)lmmp, lmv, 0);
2807 }
2808
2809 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2810                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
2811                              ldlm_cancel_flags_t flags, void *opaque)
2812 {
2813         struct obd_device       *obd = exp->exp_obd;
2814         struct lmv_obd          *lmv = &obd->u.lmv;
2815         int                      rc = 0;
2816         int                      err;
2817         __u32                    i;
2818         ENTRY;
2819
2820         LASSERT(fid != NULL);
2821
2822         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2823                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
2824
2825                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
2826                         continue;
2827
2828                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
2829                                        opaque);
2830                 if (!rc)
2831                         rc = err;
2832         }
2833         RETURN(rc);
2834 }
2835
2836 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2837                       __u64 *bits)
2838 {
2839         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2840         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2841         int                      rc;
2842         ENTRY;
2843
2844         if (tgt == NULL || tgt->ltd_exp == NULL)
2845                 RETURN(-EINVAL);
2846         rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
2847         RETURN(rc);
2848 }
2849
2850 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2851                            const struct lu_fid *fid, ldlm_type_t type,
2852                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2853                            struct lustre_handle *lockh)
2854 {
2855         struct obd_device       *obd = exp->exp_obd;
2856         struct lmv_obd          *lmv = &obd->u.lmv;
2857         ldlm_mode_t              rc;
2858         __u32                    i;
2859         ENTRY;
2860
2861         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2862
2863         /*
2864          * With CMD every object can have two locks in different namespaces:
2865          * lookup lock in space of mds storing direntry and update/open lock in
2866          * space of mds storing inode. Thus we check all targets, not only that
2867          * one fid was created in.
2868          */
2869         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2870                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
2871
2872                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
2873                         continue;
2874
2875                 rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode,
2876                                    lockh);
2877                 if (rc)
2878                         RETURN(rc);
2879         }
2880
2881         RETURN(0);
2882 }
2883
2884 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2885                       struct obd_export *dt_exp, struct obd_export *md_exp,
2886                       struct lustre_md *md)
2887 {
2888         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2889         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2890
2891         if (tgt == NULL || tgt->ltd_exp == NULL)
2892                 RETURN(-EINVAL);
2893
2894         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2895 }
2896
2897 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2898 {
2899         struct obd_device       *obd = exp->exp_obd;
2900         struct lmv_obd          *lmv = &obd->u.lmv;
2901         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2902         ENTRY;
2903
2904         if (md->lmv != NULL)
2905                 lmv_free_memmd(md->lmv);
2906         if (tgt == NULL || tgt->ltd_exp == NULL)
2907                 RETURN(-EINVAL);
2908         RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md));
2909 }
2910
2911 int lmv_set_open_replay_data(struct obd_export *exp,
2912                              struct obd_client_handle *och,
2913                              struct lookup_intent *it)
2914 {
2915         struct obd_device       *obd = exp->exp_obd;
2916         struct lmv_obd          *lmv = &obd->u.lmv;
2917         struct lmv_tgt_desc     *tgt;
2918         ENTRY;
2919
2920         tgt = lmv_find_target(lmv, &och->och_fid);
2921         if (IS_ERR(tgt))
2922                 RETURN(PTR_ERR(tgt));
2923
2924         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it));
2925 }
2926
2927 int lmv_clear_open_replay_data(struct obd_export *exp,
2928                                struct obd_client_handle *och)
2929 {
2930         struct obd_device       *obd = exp->exp_obd;
2931         struct lmv_obd          *lmv = &obd->u.lmv;
2932         struct lmv_tgt_desc     *tgt;
2933         ENTRY;
2934
2935         tgt = lmv_find_target(lmv, &och->och_fid);
2936         if (IS_ERR(tgt))
2937                 RETURN(PTR_ERR(tgt));
2938
2939         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
2940 }
2941
2942 static int lmv_get_remote_perm(struct obd_export *exp,
2943                                const struct lu_fid *fid,
2944                                struct obd_capa *oc, __u32 suppgid,
2945                                struct ptlrpc_request **request)
2946 {
2947         struct obd_device       *obd = exp->exp_obd;
2948         struct lmv_obd          *lmv = &obd->u.lmv;
2949         struct lmv_tgt_desc     *tgt;
2950         int                      rc;
2951         ENTRY;
2952
2953         rc = lmv_check_connect(obd);
2954         if (rc)
2955                 RETURN(rc);
2956
2957         tgt = lmv_find_target(lmv, fid);
2958         if (IS_ERR(tgt))
2959                 RETURN(PTR_ERR(tgt));
2960
2961         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
2962         RETURN(rc);
2963 }
2964
2965 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2966                           renew_capa_cb_t cb)
2967 {
2968         struct obd_device       *obd = exp->exp_obd;
2969         struct lmv_obd          *lmv = &obd->u.lmv;
2970         struct lmv_tgt_desc     *tgt;
2971         int                      rc;
2972         ENTRY;
2973
2974         rc = lmv_check_connect(obd);
2975         if (rc)
2976                 RETURN(rc);
2977
2978         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
2979         if (IS_ERR(tgt))
2980                 RETURN(PTR_ERR(tgt));
2981
2982         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
2983         RETURN(rc);
2984 }
2985
2986 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2987                     const struct req_msg_field *field, struct obd_capa **oc)
2988 {
2989         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2990         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2991
2992         if (tgt == NULL || tgt->ltd_exp == NULL)
2993                 RETURN(-EINVAL);
2994         return md_unpack_capa(tgt->ltd_exp, req, field, oc);
2995 }
2996
2997 int lmv_intent_getattr_async(struct obd_export *exp,
2998                              struct md_enqueue_info *minfo,
2999                              struct ldlm_enqueue_info *einfo)
3000 {
3001         struct md_op_data       *op_data = &minfo->mi_data;
3002         struct obd_device       *obd = exp->exp_obd;
3003         struct lmv_obd          *lmv = &obd->u.lmv;
3004         struct lmv_tgt_desc     *tgt = NULL;
3005         int                      rc;
3006         ENTRY;
3007
3008         rc = lmv_check_connect(obd);
3009         if (rc)
3010                 RETURN(rc);
3011
3012         tgt = lmv_find_target(lmv, &op_data->op_fid1);
3013         if (IS_ERR(tgt))
3014                 RETURN(PTR_ERR(tgt));
3015
3016         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
3017         RETURN(rc);
3018 }
3019
3020 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
3021                         struct lu_fid *fid, __u64 *bits)
3022 {
3023         struct obd_device       *obd = exp->exp_obd;
3024         struct lmv_obd          *lmv = &obd->u.lmv;
3025         struct lmv_tgt_desc     *tgt;
3026         int                      rc;
3027         ENTRY;
3028
3029         rc = lmv_check_connect(obd);
3030         if (rc)
3031                 RETURN(rc);
3032
3033         tgt = lmv_find_target(lmv, fid);
3034         if (IS_ERR(tgt))
3035                 RETURN(PTR_ERR(tgt));
3036
3037         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
3038         RETURN(rc);
3039 }
3040
3041 /**
3042  * For lmv, only need to send request to master MDT, and the master MDT will
3043  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
3044  * we directly fetch data from the slave MDTs.
3045  */
3046 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
3047                  struct obd_quotactl *oqctl)
3048 {
3049         struct obd_device   *obd = class_exp2obd(exp);
3050         struct lmv_obd      *lmv = &obd->u.lmv;
3051         struct lmv_tgt_desc *tgt = lmv->tgts[0];
3052         int                  rc = 0;
3053         __u32                i;
3054         __u64                curspace, curinodes;
3055         ENTRY;
3056
3057         if (tgt == NULL ||
3058             tgt->ltd_exp == NULL ||
3059             !tgt->ltd_active ||
3060             lmv->desc.ld_tgt_count == 0) {
3061                 CERROR("master lmv inactive\n");
3062                 RETURN(-EIO);
3063         }
3064
3065         if (oqctl->qc_cmd != Q_GETOQUOTA) {
3066                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
3067                 RETURN(rc);
3068         }
3069
3070         curspace = curinodes = 0;
3071         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3072                 int err;
3073                 tgt = lmv->tgts[i];
3074
3075                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
3076                         continue;
3077
3078                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3079                 if (err) {
3080                         CERROR("getquota on mdt %d failed. %d\n", i, err);
3081                         if (!rc)
3082                                 rc = err;
3083                 } else {
3084                         curspace += oqctl->qc_dqblk.dqb_curspace;
3085                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3086                 }
3087         }
3088         oqctl->qc_dqblk.dqb_curspace = curspace;
3089         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3090
3091         RETURN(rc);
3092 }
3093
3094 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
3095                    struct obd_quotactl *oqctl)
3096 {
3097         struct obd_device       *obd = class_exp2obd(exp);
3098         struct lmv_obd          *lmv = &obd->u.lmv;
3099         struct lmv_tgt_desc     *tgt;
3100         __u32                    i;
3101         int                      rc = 0;
3102         ENTRY;
3103
3104         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3105                 int err;
3106                 tgt = lmv->tgts[i];
3107                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
3108                         CERROR("lmv idx %d inactive\n", i);
3109                         RETURN(-EIO);
3110                 }
3111
3112                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
3113                 if (err && !rc)
3114                         rc = err;
3115         }
3116
3117         RETURN(rc);
3118 }
3119
3120 struct obd_ops lmv_obd_ops = {
3121         .o_owner                = THIS_MODULE,
3122         .o_setup                = lmv_setup,
3123         .o_cleanup              = lmv_cleanup,
3124         .o_precleanup           = lmv_precleanup,
3125         .o_process_config       = lmv_process_config,
3126         .o_connect              = lmv_connect,
3127         .o_disconnect           = lmv_disconnect,
3128         .o_statfs               = lmv_statfs,
3129         .o_get_info             = lmv_get_info,
3130         .o_set_info_async       = lmv_set_info_async,
3131         .o_packmd               = lmv_packmd,
3132         .o_unpackmd             = lmv_unpackmd,
3133         .o_notify               = lmv_notify,
3134         .o_get_uuid             = lmv_get_uuid,
3135         .o_iocontrol            = lmv_iocontrol,
3136         .o_quotacheck           = lmv_quotacheck,
3137         .o_quotactl             = lmv_quotactl
3138 };
3139
3140 struct md_ops lmv_md_ops = {
3141         .m_getstatus            = lmv_getstatus,
3142         .m_null_inode           = lmv_null_inode,
3143         .m_find_cbdata          = lmv_find_cbdata,
3144         .m_close                = lmv_close,
3145         .m_create               = lmv_create,
3146         .m_done_writing         = lmv_done_writing,
3147         .m_enqueue              = lmv_enqueue,
3148         .m_getattr              = lmv_getattr,
3149         .m_getxattr             = lmv_getxattr,
3150         .m_getattr_name         = lmv_getattr_name,
3151         .m_intent_lock          = lmv_intent_lock,
3152         .m_link                 = lmv_link,
3153         .m_rename               = lmv_rename,
3154         .m_setattr              = lmv_setattr,
3155         .m_setxattr             = lmv_setxattr,
3156         .m_fsync                = lmv_fsync,
3157         .m_read_entry           = lmv_read_entry,
3158         .m_unlink               = lmv_unlink,
3159         .m_init_ea_size         = lmv_init_ea_size,
3160         .m_cancel_unused        = lmv_cancel_unused,
3161         .m_set_lock_data        = lmv_set_lock_data,
3162         .m_lock_match           = lmv_lock_match,
3163         .m_get_lustre_md        = lmv_get_lustre_md,
3164         .m_free_lustre_md       = lmv_free_lustre_md,
3165         .m_set_open_replay_data = lmv_set_open_replay_data,
3166         .m_clear_open_replay_data = lmv_clear_open_replay_data,
3167         .m_renew_capa           = lmv_renew_capa,
3168         .m_unpack_capa          = lmv_unpack_capa,
3169         .m_get_remote_perm      = lmv_get_remote_perm,
3170         .m_intent_getattr_async = lmv_intent_getattr_async,
3171         .m_revalidate_lock      = lmv_revalidate_lock
3172 };
3173
3174 int __init lmv_init(void)
3175 {
3176         return class_register_type(&lmv_obd_ops, &lmv_md_ops, NULL,
3177 #ifndef HAVE_ONLY_PROCFS_SEQ
3178                                         NULL,
3179 #endif
3180                                         LUSTRE_LMV_NAME, NULL);
3181 }
3182
#ifdef __KERNEL__
/*
 * Module unload hook: unregister the LMV device type.  Marked __exit to
 * mirror the __init annotation on lmv_init().
 */
static void __exit lmv_exit(void)
{
        class_unregister_type(LUSTRE_LMV_NAME);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lmv_init);
module_exit(lmv_exit);
#endif