Whamcloud - gitweb
LU-4871 newline: Correct missing newline
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #ifdef __KERNEL__
39 #include <linux/slab.h>
40 #include <linux/module.h>
41 #include <linux/init.h>
42 #include <linux/slab.h>
43 #include <linux/pagemap.h>
44 #include <linux/mm.h>
45 #include <linux/math64.h>
46 #include <linux/seq_file.h>
47 #include <linux/namei.h>
48 #else
49 #include <liblustre.h>
50 #endif
51
52 #include <lustre/lustre_idl.h>
53 #include <obd_support.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <obd_class.h>
57 #include <lustre_lmv.h>
58 #include <lprocfs_status.h>
59 #include <cl_object.h>
60 #include <lclient.h>
61 #include <lustre_lite.h>
62 #include <lustre_fid.h>
63 #include <lustre_ioctl.h>
64 #include "lmv_internal.h"
65
/*
 * Testing-only hash: add up every byte of \a name (taken as unsigned char)
 * and reduce the sum modulo \a count.
 *
 * \param[in] count    number of buckets to map the name onto
 * \param[in] name     bytes to hash; does not need to be NUL-terminated
 * \param[in] namelen  number of bytes of \a name to hash
 *
 * \retval bucket index in [0, count), or 0 when \a count is 0
 */
static inline unsigned int
lmv_hash_all_chars(unsigned int count, const char *name, int namelen)
{
        unsigned int c = 0;
        const unsigned char *p = (const unsigned char *)name;

        /* A modulo by zero is undefined; map an empty bucket set to 0. */
        if (count == 0)
                return 0;

        while (--namelen >= 0)
                c += p[namelen];

        return c % count;
}
80
81 static inline unsigned int
82 lmv_hash_fnv1a(unsigned int count, const char *name, int namelen)
83 {
84         __u64   hash;
85
86         hash = lustre_hash_fnv_1a_64(name, namelen);
87
88         hash = hash % count;
89
90         return hash;
91 }
92
93 int lmv_name_to_stripe_index(__u32 lmv_hash_type, unsigned int stripe_count,
94                              const char *name, int namelen)
95 {
96         int     idx;
97         __u32   hash_type = lmv_hash_type & LMV_HASH_TYPE_MASK;
98
99         LASSERT(namelen > 0);
100         if (stripe_count <= 1)
101                 return 0;
102
103         /* for migrating object, always start from 0 stripe */
104         if (lmv_hash_type & LMV_HASH_FLAG_MIGRATION)
105                 return 0;
106
107         switch (hash_type) {
108         case LMV_HASH_TYPE_ALL_CHARS:
109                 idx = lmv_hash_all_chars(stripe_count, name, namelen);
110                 break;
111         case LMV_HASH_TYPE_FNV_1A_64:
112                 idx = lmv_hash_fnv1a(stripe_count, name, namelen);
113                 break;
114         default:
115                 idx = -EBADFD;
116                 break;
117         }
118
119         CDEBUG(D_INFO, "name %.*s hash_type %d idx %d\n", namelen, name,
120                hash_type, idx);
121
122         return idx;
123 }
124
125 static void lmv_activate_target(struct lmv_obd *lmv,
126                                 struct lmv_tgt_desc *tgt,
127                                 int activate)
128 {
129         if (tgt->ltd_active == activate)
130                 return;
131
132         tgt->ltd_active = activate;
133         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
134 }
135
136 /**
137  * Error codes:
138  *
139  *  -EINVAL  : UUID can't be found in the LMV's target list
140  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
141  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
142  */
143 static int lmv_set_mdc_active(struct lmv_obd *lmv,
144                               const struct obd_uuid *uuid,
145                               int activate)
146 {
147         struct lmv_tgt_desc     *tgt = NULL;
148         struct obd_device       *obd;
149         __u32                    i;
150         int                      rc = 0;
151         ENTRY;
152
153         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
154                         lmv, uuid->uuid, activate);
155
156         spin_lock(&lmv->lmv_lock);
157         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
158                 tgt = lmv->tgts[i];
159                 if (tgt == NULL || tgt->ltd_exp == NULL)
160                         continue;
161
162                 CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n", i,
163                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
164
165                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
166                         break;
167         }
168
169         if (i == lmv->desc.ld_tgt_count)
170                 GOTO(out_lmv_lock, rc = -EINVAL);
171
172         obd = class_exp2obd(tgt->ltd_exp);
173         if (obd == NULL)
174                 GOTO(out_lmv_lock, rc = -ENOTCONN);
175
176         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
177                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
178                obd->obd_type->typ_name, i);
179         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
180
181         if (tgt->ltd_active == activate) {
182                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
183                        activate ? "" : "in");
184                 GOTO(out_lmv_lock, rc);
185         }
186
187         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
188                activate ? "" : "in");
189         lmv_activate_target(lmv, tgt, activate);
190         EXIT;
191
192  out_lmv_lock:
193         spin_unlock(&lmv->lmv_lock);
194         return rc;
195 }
196
197 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
198 {
199         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
200         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
201
202         return (tgt == NULL) ? NULL : obd_get_uuid(tgt->ltd_exp);
203 }
204
205 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
206                       enum obd_notify_event ev, void *data)
207 {
208         struct obd_connect_data *conn_data;
209         struct lmv_obd          *lmv = &obd->u.lmv;
210         struct obd_uuid         *uuid;
211         int                      rc = 0;
212         ENTRY;
213
214         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
215                 CERROR("unexpected notification of %s %s!\n",
216                        watched->obd_type->typ_name,
217                        watched->obd_name);
218                 RETURN(-EINVAL);
219         }
220
221         uuid = &watched->u.cli.cl_target_uuid;
222         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
223                 /*
224                  * Set MDC as active before notifying the observer, so the
225                  * observer can use the MDC normally.
226                  */
227                 rc = lmv_set_mdc_active(lmv, uuid,
228                                         ev == OBD_NOTIFY_ACTIVE);
229                 if (rc) {
230                         CERROR("%sactivation of %s failed: %d\n",
231                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
232                                uuid->uuid, rc);
233                         RETURN(rc);
234                 }
235         } else if (ev == OBD_NOTIFY_OCD) {
236                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
237                 /*
238                  * XXX: Make sure that ocd_connect_flags from all targets are
239                  * the same. Otherwise one of MDTs runs wrong version or
240                  * something like this.  --umka
241                  */
242                 obd->obd_self_export->exp_connect_data = *conn_data;
243         }
244 #if 0
245         else if (ev == OBD_NOTIFY_DISCON) {
246                 /*
247                  * For disconnect event, flush fld cache for failout MDS case.
248                  */
249                 fld_client_flush(&lmv->lmv_fld);
250         }
251 #endif
252         /*
253          * Pass the notification up the chain.
254          */
255         if (obd->obd_observer)
256                 rc = obd_notify(obd->obd_observer, watched, ev, data);
257
258         RETURN(rc);
259 }
260
261 /**
262  * This is fake connect function. Its purpose is to initialize lmv and say
263  * caller that everything is okay. Real connection will be performed later.
264  */
265 static int lmv_connect(const struct lu_env *env,
266                        struct obd_export **exp, struct obd_device *obd,
267                        struct obd_uuid *cluuid, struct obd_connect_data *data,
268                        void *localdata)
269 {
270         struct lmv_obd        *lmv = &obd->u.lmv;
271         struct lustre_handle  conn = { 0 };
272         int                    rc = 0;
273         ENTRY;
274
275         /*
276          * We don't want to actually do the underlying connections more than
277          * once, so keep track.
278          */
279         lmv->refcount++;
280         if (lmv->refcount > 1) {
281                 *exp = NULL;
282                 RETURN(0);
283         }
284
285         rc = class_connect(&conn, obd, cluuid);
286         if (rc) {
287                 CERROR("class_connection() returned %d\n", rc);
288                 RETURN(rc);
289         }
290
291         *exp = class_conn2export(&conn);
292         class_export_get(*exp);
293
294         lmv->exp = *exp;
295         lmv->connected = 0;
296         lmv->cluuid = *cluuid;
297
298         if (data)
299                 lmv->conn_data = *data;
300
301         if (lmv->targets_proc_entry == NULL) {
302                 lmv->targets_proc_entry = lprocfs_seq_register("target_obds",
303                                                         obd->obd_proc_entry,
304                                                         NULL, NULL);
305                 if (IS_ERR(lmv->targets_proc_entry)) {
306                         CERROR("%s: cannot register "
307                                "/proc/fs/lustre/%s/%s/target_obds\n",
308                                obd->obd_name, obd->obd_type->typ_name,
309                                obd->obd_name);
310                         lmv->targets_proc_entry = NULL;
311                 }
312         }
313
314         /*
315          * All real clients should perform actual connection right away, because
316          * it is possible, that LMV will not have opportunity to connect targets
317          * and MDC stuff will be called directly, for instance while reading
318          * ../mdc/../kbytesfree procfs file, etc.
319          */
320         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_REAL))
321                 rc = lmv_check_connect(obd);
322
323         if (rc && lmv->targets_proc_entry != NULL)
324                 lprocfs_remove(&lmv->targets_proc_entry);
325         RETURN(rc);
326 }
327
328 static void lmv_set_timeouts(struct obd_device *obd)
329 {
330         struct lmv_obd          *lmv;
331         __u32                    i;
332
333         lmv = &obd->u.lmv;
334         if (lmv->server_timeout == 0)
335                 return;
336
337         if (lmv->connected == 0)
338                 return;
339
340         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
341                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
342
343                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
344                         continue;
345
346                 obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
347                                    KEY_INTERMDS, 0, NULL, NULL);
348         }
349 }
350
351 static int lmv_init_ea_size(struct obd_export *exp, int easize,
352                             int def_easize, int cookiesize, int def_cookiesize)
353 {
354         struct obd_device       *obd = exp->exp_obd;
355         struct lmv_obd          *lmv = &obd->u.lmv;
356         __u32                    i;
357         int                      rc = 0;
358         int                      change = 0;
359         ENTRY;
360
361         if (lmv->max_easize < easize) {
362                 lmv->max_easize = easize;
363                 change = 1;
364         }
365         if (lmv->max_def_easize < def_easize) {
366                 lmv->max_def_easize = def_easize;
367                 change = 1;
368         }
369         if (lmv->max_cookiesize < cookiesize) {
370                 lmv->max_cookiesize = cookiesize;
371                 change = 1;
372         }
373         if (lmv->max_def_cookiesize < def_cookiesize) {
374                 lmv->max_def_cookiesize = def_cookiesize;
375                 change = 1;
376         }
377         if (change == 0)
378                 RETURN(0);
379
380         if (lmv->connected == 0)
381                 RETURN(0);
382
383         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
384                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
385
386                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
387                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
388                         continue;
389                 }
390
391                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize,
392                                      cookiesize, def_cookiesize);
393                 if (rc) {
394                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
395                                " rc = %d\n", obd->obd_name, i, rc);
396                         break;
397                 }
398         }
399         RETURN(rc);
400 }
401
402 #define MAX_STRING_SIZE 128
403
404 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
405 {
406         struct lmv_obd          *lmv = &obd->u.lmv;
407         struct obd_uuid         *cluuid = &lmv->cluuid;
408         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
409         struct obd_device       *mdc_obd;
410         struct obd_export       *mdc_exp;
411         struct lu_fld_target     target;
412         int                      rc;
413         ENTRY;
414
415         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
416                                         &obd->obd_uuid);
417         if (!mdc_obd) {
418                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
419                 RETURN(-EINVAL);
420         }
421
422         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
423                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
424                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
425                 cluuid->uuid);
426
427         if (!mdc_obd->obd_set_up) {
428                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
429                 RETURN(-EINVAL);
430         }
431
432         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
433                          &lmv->conn_data, NULL);
434         if (rc) {
435                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
436                 RETURN(rc);
437         }
438
439         /*
440          * Init fid sequence client for this mdc and add new fld target.
441          */
442         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
443         if (rc)
444                 RETURN(rc);
445
446         target.ft_srv = NULL;
447         target.ft_exp = mdc_exp;
448         target.ft_idx = tgt->ltd_idx;
449
450         fld_client_add_target(&lmv->lmv_fld, &target);
451
452         rc = obd_register_observer(mdc_obd, obd);
453         if (rc) {
454                 obd_disconnect(mdc_exp);
455                 CERROR("target %s register_observer error %d\n",
456                        tgt->ltd_uuid.uuid, rc);
457                 RETURN(rc);
458         }
459
460         if (obd->obd_observer) {
461                 /*
462                  * Tell the observer about the new target.
463                  */
464                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
465                                 OBD_NOTIFY_ACTIVE,
466                                 (void *)(tgt - lmv->tgts[0]));
467                 if (rc) {
468                         obd_disconnect(mdc_exp);
469                         RETURN(rc);
470                 }
471         }
472
473         tgt->ltd_active = 1;
474         tgt->ltd_exp = mdc_exp;
475         lmv->desc.ld_active_tgt_count++;
476
477         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize,
478                         lmv->max_cookiesize, lmv->max_def_cookiesize);
479
480         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
481                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
482                 atomic_read(&obd->obd_refcount));
483
484         if (lmv->targets_proc_entry != NULL) {
485                 struct proc_dir_entry *mdc_symlink;
486
487                 LASSERT(mdc_obd->obd_type != NULL);
488                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
489                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
490                                                   lmv->targets_proc_entry,
491                                                   "../../../%s/%s",
492                                                   mdc_obd->obd_type->typ_name,
493                                                   mdc_obd->obd_name);
494                 if (mdc_symlink == NULL) {
495                         CERROR("cannot register LMV target "
496                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
497                                obd->obd_type->typ_name, obd->obd_name,
498                                mdc_obd->obd_name);
499                 }
500         }
501         RETURN(0);
502 }
503
504 static void lmv_del_target(struct lmv_obd *lmv, int index)
505 {
506         if (lmv->tgts[index] == NULL)
507                 return;
508
509         OBD_FREE_PTR(lmv->tgts[index]);
510         lmv->tgts[index] = NULL;
511         return;
512 }
513
514 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
515                            __u32 index, int gen)
516 {
517         struct lmv_obd      *lmv = &obd->u.lmv;
518         struct lmv_tgt_desc *tgt;
519         int                  orig_tgt_count = 0;
520         int                  rc = 0;
521         ENTRY;
522
523         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
524
525         lmv_init_lock(lmv);
526
527         if (lmv->desc.ld_tgt_count == 0) {
528                 struct obd_device *mdc_obd;
529
530                 mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
531                                                 &obd->obd_uuid);
532                 if (!mdc_obd) {
533                         lmv_init_unlock(lmv);
534                         CERROR("%s: Target %s not attached: rc = %d\n",
535                                obd->obd_name, uuidp->uuid, -EINVAL);
536                         RETURN(-EINVAL);
537                 }
538         }
539
540         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
541                 tgt = lmv->tgts[index];
542                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
543                        " rc = %d\n", obd->obd_name,
544                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
545                 lmv_init_unlock(lmv);
546                 RETURN(-EEXIST);
547         }
548
549         if (index >= lmv->tgts_size) {
550                 /* We need to reallocate the lmv target array. */
551                 struct lmv_tgt_desc **newtgts, **old = NULL;
552                 __u32 newsize = 1;
553                 __u32 oldsize = 0;
554
555                 while (newsize < index + 1)
556                         newsize = newsize << 1;
557                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
558                 if (newtgts == NULL) {
559                         lmv_init_unlock(lmv);
560                         RETURN(-ENOMEM);
561                 }
562
563                 if (lmv->tgts_size) {
564                         memcpy(newtgts, lmv->tgts,
565                                sizeof(*newtgts) * lmv->tgts_size);
566                         old = lmv->tgts;
567                         oldsize = lmv->tgts_size;
568                 }
569
570                 lmv->tgts = newtgts;
571                 lmv->tgts_size = newsize;
572                 smp_rmb();
573                 if (old)
574                         OBD_FREE(old, sizeof(*old) * oldsize);
575
576                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
577                        lmv->tgts_size);
578         }
579
580         OBD_ALLOC_PTR(tgt);
581         if (!tgt) {
582                 lmv_init_unlock(lmv);
583                 RETURN(-ENOMEM);
584         }
585
586         mutex_init(&tgt->ltd_fid_mutex);
587         tgt->ltd_idx = index;
588         tgt->ltd_uuid = *uuidp;
589         tgt->ltd_active = 0;
590         lmv->tgts[index] = tgt;
591         if (index >= lmv->desc.ld_tgt_count) {
592                 orig_tgt_count = lmv->desc.ld_tgt_count;
593                 lmv->desc.ld_tgt_count = index + 1;
594         }
595
596         if (lmv->connected) {
597                 rc = lmv_connect_mdc(obd, tgt);
598                 if (rc != 0) {
599                         spin_lock(&lmv->lmv_lock);
600                         if (lmv->desc.ld_tgt_count == index + 1)
601                                 lmv->desc.ld_tgt_count = orig_tgt_count;
602                         memset(tgt, 0, sizeof(*tgt));
603                         spin_unlock(&lmv->lmv_lock);
604                 } else {
605                         int easize = sizeof(struct lmv_stripe_md) +
606                                 lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
607                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
608                 }
609         }
610
611         lmv_init_unlock(lmv);
612         RETURN(rc);
613 }
614
615 int lmv_check_connect(struct obd_device *obd)
616 {
617         struct lmv_obd          *lmv = &obd->u.lmv;
618         struct lmv_tgt_desc     *tgt;
619         __u32                    i;
620         int                      rc;
621         int                      easize;
622         ENTRY;
623
624         if (lmv->connected)
625                 RETURN(0);
626
627         lmv_init_lock(lmv);
628         if (lmv->connected) {
629                 lmv_init_unlock(lmv);
630                 RETURN(0);
631         }
632
633         if (lmv->desc.ld_tgt_count == 0) {
634                 lmv_init_unlock(lmv);
635                 CERROR("%s: no targets configured.\n", obd->obd_name);
636                 RETURN(-EINVAL);
637         }
638
639         LASSERT(lmv->tgts != NULL);
640
641         if (lmv->tgts[0] == NULL) {
642                 lmv_init_unlock(lmv);
643                 CERROR("%s: no target configured for index 0.\n",
644                        obd->obd_name);
645                 RETURN(-EINVAL);
646         }
647
648         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
649                lmv->cluuid.uuid, obd->obd_name);
650
651         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
652                 tgt = lmv->tgts[i];
653                 if (tgt == NULL)
654                         continue;
655                 rc = lmv_connect_mdc(obd, tgt);
656                 if (rc)
657                         GOTO(out_disc, rc);
658         }
659
660         lmv_set_timeouts(obd);
661         class_export_put(lmv->exp);
662         lmv->connected = 1;
663         easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC);
664         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
665         lmv_init_unlock(lmv);
666         RETURN(0);
667
668  out_disc:
669         while (i-- > 0) {
670                 int rc2;
671                 tgt = lmv->tgts[i];
672                 if (tgt == NULL)
673                         continue;
674                 tgt->ltd_active = 0;
675                 if (tgt->ltd_exp) {
676                         --lmv->desc.ld_active_tgt_count;
677                         rc2 = obd_disconnect(tgt->ltd_exp);
678                         if (rc2) {
679                                 CERROR("LMV target %s disconnect on "
680                                        "MDC idx %d: error %d\n",
681                                        tgt->ltd_uuid.uuid, i, rc2);
682                         }
683                 }
684         }
685         class_disconnect(lmv->exp);
686         lmv_init_unlock(lmv);
687         RETURN(rc);
688 }
689
690 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
691 {
692         struct lmv_obd         *lmv = &obd->u.lmv;
693         struct obd_device      *mdc_obd;
694         int                     rc;
695         ENTRY;
696
697         LASSERT(tgt != NULL);
698         LASSERT(obd != NULL);
699
700         mdc_obd = class_exp2obd(tgt->ltd_exp);
701
702         if (mdc_obd) {
703                 mdc_obd->obd_force = obd->obd_force;
704                 mdc_obd->obd_fail = obd->obd_fail;
705                 mdc_obd->obd_no_recov = obd->obd_no_recov;
706         }
707
708         if (lmv->targets_proc_entry != NULL)
709                 lprocfs_remove_proc_entry(mdc_obd->obd_name,
710                                           lmv->targets_proc_entry);
711
712         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
713         if (rc)
714                 CERROR("Can't finanize fids factory\n");
715
716         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
717                tgt->ltd_exp->exp_obd->obd_name,
718                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
719
720         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
721         rc = obd_disconnect(tgt->ltd_exp);
722         if (rc) {
723                 if (tgt->ltd_active) {
724                         CERROR("Target %s disconnect error %d\n",
725                                tgt->ltd_uuid.uuid, rc);
726                 }
727         }
728
729         lmv_activate_target(lmv, tgt, 0);
730         tgt->ltd_exp = NULL;
731         RETURN(0);
732 }
733
734 static int lmv_disconnect(struct obd_export *exp)
735 {
736         struct obd_device       *obd = class_exp2obd(exp);
737         struct lmv_obd          *lmv = &obd->u.lmv;
738         int                      rc;
739         __u32                    i;
740         ENTRY;
741
742         if (!lmv->tgts)
743                 goto out_local;
744
745         /*
746          * Only disconnect the underlying layers on the final disconnect.
747          */
748         lmv->refcount--;
749         if (lmv->refcount != 0)
750                 goto out_local;
751
752         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
753                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
754                         continue;
755
756                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
757         }
758
759         if (lmv->targets_proc_entry != NULL)
760                 lprocfs_remove(&lmv->targets_proc_entry);
761         else
762                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
763                        obd->obd_type->typ_name, obd->obd_name);
764
765 out_local:
766         /*
767          * This is the case when no real connection is established by
768          * lmv_check_connect().
769          */
770         if (!lmv->connected)
771                 class_export_put(exp);
772         rc = class_disconnect(exp);
773         if (lmv->refcount == 0)
774                 lmv->connected = 0;
775         RETURN(rc);
776 }
777
778 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
779 {
780         struct obd_device       *obddev = class_exp2obd(exp);
781         struct lmv_obd          *lmv = &obddev->u.lmv;
782         struct getinfo_fid2path *gf;
783         struct lmv_tgt_desc     *tgt;
784         struct getinfo_fid2path *remote_gf = NULL;
785         int                     remote_gf_size = 0;
786         int                     rc;
787
788         gf = (struct getinfo_fid2path *)karg;
789         tgt = lmv_find_target(lmv, &gf->gf_fid);
790         if (IS_ERR(tgt))
791                 RETURN(PTR_ERR(tgt));
792
793 repeat_fid2path:
794         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
795         if (rc != 0 && rc != -EREMOTE)
796                 GOTO(out_fid2path, rc);
797
798         /* If remote_gf != NULL, it means just building the
799          * path on the remote MDT, copy this path segement to gf */
800         if (remote_gf != NULL) {
801                 struct getinfo_fid2path *ori_gf;
802                 char *ptr;
803
804                 ori_gf = (struct getinfo_fid2path *)karg;
805                 if (strlen(ori_gf->gf_path) +
806                     strlen(gf->gf_path) > ori_gf->gf_pathlen)
807                         GOTO(out_fid2path, rc = -EOVERFLOW);
808
809                 ptr = ori_gf->gf_path;
810
811                 memmove(ptr + strlen(gf->gf_path) + 1, ptr,
812                         strlen(ori_gf->gf_path));
813
814                 strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
815                 ptr += strlen(gf->gf_path);
816                 *ptr = '/';
817         }
818
819         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
820                tgt->ltd_exp->exp_obd->obd_name,
821                gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
822                gf->gf_linkno);
823
824         if (rc == 0)
825                 GOTO(out_fid2path, rc);
826
827         /* sigh, has to go to another MDT to do path building further */
828         if (remote_gf == NULL) {
829                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
830                 OBD_ALLOC(remote_gf, remote_gf_size);
831                 if (remote_gf == NULL)
832                         GOTO(out_fid2path, rc = -ENOMEM);
833                 remote_gf->gf_pathlen = PATH_MAX;
834         }
835
836         if (!fid_is_sane(&gf->gf_fid)) {
837                 CERROR("%s: invalid FID "DFID": rc = %d\n",
838                        tgt->ltd_exp->exp_obd->obd_name,
839                        PFID(&gf->gf_fid), -EINVAL);
840                 GOTO(out_fid2path, rc = -EINVAL);
841         }
842
843         tgt = lmv_find_target(lmv, &gf->gf_fid);
844         if (IS_ERR(tgt))
845                 GOTO(out_fid2path, rc = -EINVAL);
846
847         remote_gf->gf_fid = gf->gf_fid;
848         remote_gf->gf_recno = -1;
849         remote_gf->gf_linkno = -1;
850         memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
851         gf = remote_gf;
852         goto repeat_fid2path;
853
854 out_fid2path:
855         if (remote_gf != NULL)
856                 OBD_FREE(remote_gf, remote_gf_size);
857         RETURN(rc);
858 }
859
860 static int lmv_hsm_req_count(struct lmv_obd *lmv,
861                              const struct hsm_user_request *hur,
862                              const struct lmv_tgt_desc *tgt_mds)
863 {
864         __u32                    i;
865         int                      nr = 0;
866         struct lmv_tgt_desc     *curr_tgt;
867
868         /* count how many requests must be sent to the given target */
869         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
870                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
871                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
872                         nr++;
873         }
874         return nr;
875 }
876
877 static void lmv_hsm_req_build(struct lmv_obd *lmv,
878                               struct hsm_user_request *hur_in,
879                               const struct lmv_tgt_desc *tgt_mds,
880                               struct hsm_user_request *hur_out)
881 {
882         __u32                    i, nr_out;
883         struct lmv_tgt_desc     *curr_tgt;
884
885         /* build the hsm_user_request for the given target */
886         hur_out->hur_request = hur_in->hur_request;
887         nr_out = 0;
888         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
889                 curr_tgt = lmv_find_target(lmv,
890                                            &hur_in->hur_user_item[i].hui_fid);
891                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
892                         hur_out->hur_user_item[nr_out] =
893                                                 hur_in->hur_user_item[i];
894                         nr_out++;
895                 }
896         }
897         hur_out->hur_request.hr_itemcount = nr_out;
898         memcpy(hur_data(hur_out), hur_data(hur_in),
899                hur_in->hur_request.hr_data_len);
900 }
901
902 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
903                                  struct lustre_kernelcomm *lk, void *uarg)
904 {
905         __u32                    i;
906         int                      rc;
907         struct kkuc_ct_data     *kcd = NULL;
908         ENTRY;
909
910         /* unregister request (call from llapi_hsm_copytool_fini) */
911         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
912                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
913
914                 if (tgt == NULL || tgt->ltd_exp == NULL)
915                         continue;
916                 /* best effort: try to clean as much as possible
917                  * (continue on error) */
918                 obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
919         }
920
921         /* Whatever the result, remove copytool from kuc groups.
922          * Unreached coordinators will get EPIPE on next requests
923          * and will unregister automatically.
924          */
925         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group, (void **)&kcd);
926         if (kcd != NULL)
927                 OBD_FREE_PTR(kcd);
928
929         RETURN(rc);
930 }
931
932 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
933                                struct lustre_kernelcomm *lk, void *uarg)
934 {
935         struct file             *filp;
936         __u32                    i, j;
937         int                      err, rc;
938         bool                     any_set = false;
939         struct kkuc_ct_data     *kcd;
940         ENTRY;
941
942         /* All or nothing: try to register to all MDS.
943          * In case of failure, unregister from previous MDS,
944          * except if it because of inactive target. */
945         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
946                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
947
948                 if (tgt == NULL || tgt->ltd_exp == NULL)
949                         continue;
950                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
951                 if (err) {
952                         if (tgt->ltd_active) {
953                                 /* permanent error */
954                                 CERROR("%s: iocontrol MDC %s on MDT"
955                                        " idx %d cmd %x: err = %d\n",
956                                        class_exp2obd(lmv->exp)->obd_name,
957                                        tgt->ltd_uuid.uuid, i, cmd, err);
958                                 rc = err;
959                                 lk->lk_flags |= LK_FLG_STOP;
960                                 /* unregister from previous MDS */
961                                 for (j = 0; j < i; j++) {
962                                         tgt = lmv->tgts[j];
963                                         if (tgt == NULL || tgt->ltd_exp == NULL)
964                                                 continue;
965                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
966                                                       lk, uarg);
967                                 }
968                                 RETURN(rc);
969                         }
970                         /* else: transient error.
971                          * kuc will register to the missing MDT
972                          * when it is back */
973                 } else {
974                         any_set = true;
975                 }
976         }
977
978         if (!any_set)
979                 /* no registration done: return error */
980                 RETURN(-ENOTCONN);
981
982         /* at least one registration done, with no failure */
983         filp = fget(lk->lk_wfd);
984         if (filp == NULL)
985                 RETURN(-EBADF);
986
987         OBD_ALLOC_PTR(kcd);
988         if (kcd == NULL) {
989                 fput(filp);
990                 RETURN(-ENOMEM);
991         }
992         kcd->kcd_magic = KKUC_CT_DATA_MAGIC;
993         kcd->kcd_uuid = lmv->cluuid;
994         kcd->kcd_archive = lk->lk_data;
995
996         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, kcd);
997         if (rc != 0) {
998                 if (filp != NULL)
999                         fput(filp);
1000                 OBD_FREE_PTR(kcd);
1001         }
1002
1003         RETURN(rc);
1004 }
1005
1006
1007
1008
/*
 * ioctl entry point of the LMV device: route \a cmd to the export(s) of the
 * target(s) it concerns.
 *
 * Returns -ENOTTY when no target is configured (ld_tgt_count == 0).
 * Most commands with a dedicated case go to one target selected by an index,
 * a UUID or a FID taken from \a karg. LL_IOC_HSM_REQUEST may be split into one
 * request per target, and LL_IOC_HSM_CT_START is handed to the copytool
 * register/unregister helpers. Every other command is sent to each target
 * that has an export.
 */
1009 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
1010                          int len, void *karg, void *uarg)
1011 {
1012         struct obd_device       *obddev = class_exp2obd(exp);
1013         struct lmv_obd          *lmv = &obddev->u.lmv;
1014         struct lmv_tgt_desc     *tgt = NULL;
1015         __u32                    i = 0;
1016         int                      rc = 0;
1017         int                      set = 0;
1018         __u32                    count = lmv->desc.ld_tgt_count;
1019         ENTRY;
1020
1021         if (count == 0)
1022                 RETURN(-ENOTTY);
1023
1024         switch (cmd) {
             /* statfs of one target: the index is read from ioc_inlbuf2, the
              * target UUID is copied out to ioc_pbuf2 and the statfs data to
              * ioc_pbuf1, each bounded by the matching ioc_plen */
1025         case IOC_OBD_STATFS: {
1026                 struct obd_ioctl_data *data = karg;
1027                 struct obd_device *mdc_obd;
1028                 struct obd_statfs stat_buf = {0};
1029                 __u32 index;
1030
1031                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
1032                 if ((index >= count))
1033                         RETURN(-ENODEV);
1034
1035                 tgt = lmv->tgts[index];
1036                 if (tgt == NULL || !tgt->ltd_active)
1037                         RETURN(-ENODATA);
1038
1039                 mdc_obd = class_exp2obd(tgt->ltd_exp);
1040                 if (!mdc_obd)
1041                         RETURN(-EINVAL);
1042
1043                 /* copy UUID */
1044                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
1045                                  min((int) data->ioc_plen2,
1046                                      (int) sizeof(struct obd_uuid))))
1047                         RETURN(-EFAULT);
1048
1049                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
1050                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1051                                 0);
1052                 if (rc)
1053                         RETURN(rc);
1054                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1055                                  min((int) data->ioc_plen1,
1056                                      (int) sizeof(stat_buf))))
1057                         RETURN(-EFAULT);
1058                 break;
1059         }
             /* quotactl on one target, selected either by index (QC_MDTIDX)
              * or by UUID (QC_UUID); any other qc_valid is -EINVAL */
1060         case OBD_IOC_QUOTACTL: {
1061                 struct if_quotactl *qctl = karg;
1062                 struct obd_quotactl *oqctl;
1063
1064                 if (qctl->qc_valid == QC_MDTIDX) {
1065                         if (count <= qctl->qc_idx)
1066                                 RETURN(-EINVAL);
1067
1068                         tgt = lmv->tgts[qctl->qc_idx];
1069                         if (tgt == NULL || tgt->ltd_exp == NULL)
1070                                 RETURN(-EINVAL);
1071                 } else if (qctl->qc_valid == QC_UUID) {
1072                         for (i = 0; i < count; i++) {
1073                                 tgt = lmv->tgts[i];
1074                                 if (tgt == NULL)
1075                                         continue;
1076                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
1077                                                      &qctl->obd_uuid))
1078                                         continue;
1079
1080                                 if (tgt->ltd_exp == NULL)
1081                                         RETURN(-EINVAL);
1082
1083                                 break;
1084                         }
1085                 } else {
1086                         RETURN(-EINVAL);
1087                 }
1088
                     /* only the QC_UUID search can end with i == count (no
                      * target matched); on the QC_MDTIDX path i is still 0 */
1089                 if (i >= count)
1090                         RETURN(-EAGAIN);
1091
1092                 LASSERT(tgt != NULL && tgt->ltd_exp != NULL);
1093                 OBD_ALLOC_PTR(oqctl);
1094                 if (!oqctl)
1095                         RETURN(-ENOMEM);
1096
1097                 QCTL_COPY(oqctl, qctl);
1098                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
1099                 if (rc == 0) {
                             /* hand the result back and record which target
                              * answered */
1100                         QCTL_COPY(qctl, oqctl);
1101                         qctl->qc_valid = QC_MDTIDX;
1102                         qctl->obd_uuid = tgt->ltd_uuid;
1103                 }
1104                 OBD_FREE_PTR(oqctl);
1105                 break;
1106         }
             /* changelog commands go to the target named by icc_mdtindex,
              * which must exist, have an export and be active */
1107         case OBD_IOC_CHANGELOG_SEND:
1108         case OBD_IOC_CHANGELOG_CLEAR: {
1109                 struct ioc_changelog *icc = karg;
1110
1111                 if (icc->icc_mdtindex >= count)
1112                         RETURN(-ENODEV);
1113
1114                 tgt = lmv->tgts[icc->icc_mdtindex];
1115                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
1116                         RETURN(-ENODEV);
1117                 rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
1118                 break;
1119         }
             /* connect flags are always asked of the first target slot */
1120         case LL_IOC_GET_CONNECT_FLAGS: {
1121                 tgt = lmv->tgts[0];
1122                 if (tgt == NULL || tgt->ltd_exp == NULL)
1123                         RETURN(-ENODATA);
1124                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1125                 break;
1126         }
1127         case OBD_IOC_FID2PATH: {
1128                 rc = lmv_fid2path(exp, len, karg, uarg);
1129                 break;
1130         }
             /* per-file HSM commands: the target is the one that op_fid1
              * resolves to */
1131         case LL_IOC_HSM_STATE_GET:
1132         case LL_IOC_HSM_STATE_SET:
1133         case LL_IOC_HSM_ACTION: {
1134                 struct md_op_data       *op_data = karg;
1135
1136                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1137                 if (IS_ERR(tgt))
1138                         RETURN(PTR_ERR(tgt));
1139
1140                 if (tgt->ltd_exp == NULL)
1141                         RETURN(-EINVAL);
1142
1143                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1144                 break;
1145         }
             /* HSM progress report: the target is the one hpk_fid resolves to */
1146         case LL_IOC_HSM_PROGRESS: {
1147                 const struct hsm_progress_kernel *hpk = karg;
1148
1149                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1150                 if (IS_ERR(tgt))
1151                         RETURN(PTR_ERR(tgt));
1152                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1153                 break;
1154         }
1155         case LL_IOC_HSM_REQUEST: {
1156                 struct hsm_user_request *hur = karg;
1157                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1158
1159                 if (reqcount == 0)
1160                         RETURN(0);
1161
1162                 /* if the request is about a single fid
1163                  * or if there is a single MDS, no need to split
1164                  * the request. */
1165                 if (reqcount == 1 || count == 1) {
1166                         tgt = lmv_find_target(lmv,
1167                                               &hur->hur_user_item[0].hui_fid);
1168                         if (IS_ERR(tgt))
1169                                 RETURN(PTR_ERR(tgt));
1170                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1171                 } else {
1172                         /* split fid list to their respective MDS */
1173                         for (i = 0; i < count; i++) {
1174                                 unsigned int            nr, reqlen;
1175                                 int                     rc1;
1176                                 struct hsm_user_request *req;
1177
1178                                 tgt = lmv->tgts[i];
1179                                 if (tgt == NULL || tgt->ltd_exp == NULL)
1180                                         continue;
1181
1182                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
1183                                 if (nr == 0) /* nothing for this MDS */
1184                                         continue;
1185
1186                                 /* build a request with fids for this MDS */
1187                                 reqlen = offsetof(typeof(*hur),
1188                                                   hur_user_item[nr])
1189                                                 + hur->hur_request.hr_data_len;
1190                                 OBD_ALLOC_LARGE(req, reqlen);
1191                                 if (req == NULL)
1192                                         RETURN(-ENOMEM);
1193
1194                                 lmv_hsm_req_build(lmv, hur, tgt, req);
1195
1196                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1197                                                     req, uarg);
                                     /* keep only the first error, but still
                                      * send to the remaining targets */
1198                                 if (rc1 != 0 && rc == 0)
1199                                         rc = rc1;
1200                                 OBD_FREE_LARGE(req, reqlen);
1201                         }
1202                 }
1203                 break;
1204         }
1205         case LL_IOC_LOV_SWAP_LAYOUTS: {
1206                 struct md_op_data       *op_data = karg;
1207                 struct lmv_tgt_desc     *tgt1, *tgt2;
1208
1209                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1210                 if (IS_ERR(tgt1))
1211                         RETURN(PTR_ERR(tgt1));
1212
1213                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1214                 if (IS_ERR(tgt2))
1215                         RETURN(PTR_ERR(tgt2));
1216
1217                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1218                         RETURN(-EINVAL);
1219
1220                 /* only files on same MDT can have their layouts swapped */
1221                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1222                         RETURN(-EPERM);
1223
1224                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1225                 break;
1226         }
             /* copytool start/stop: LK_FLG_STOP in lk_flags selects the
              * unregister path, otherwise the copytool is registered */
1227         case LL_IOC_HSM_CT_START: {
1228                 struct lustre_kernelcomm *lk = karg;
1229                 if (lk->lk_flags & LK_FLG_STOP)
1230                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1231                 else
1232                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1233                 break;
1234         }
             /* anything else is sent to every target that has an export.
              * rc keeps the first error reported by an active target; if no
              * target succeeded and no such error was recorded, the result
              * is -EIO */
1235         default:
1236                 for (i = 0; i < count; i++) {
1237                         struct obd_device *mdc_obd;
1238                         int err;
1239
1240                         tgt = lmv->tgts[i];
1241                         if (tgt == NULL || tgt->ltd_exp == NULL)
1242                                 continue;
1243                         /* ll_umount_begin() sets force flag but for lmv, not
1244                          * mdc. Let's pass it through */
1245                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1246                         mdc_obd->obd_force = obddev->obd_force;
1247                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
                             /* -ENODATA from OBD_IOC_POLL_QUOTACHECK ends the
                              * loop at once and is returned as is */
1248                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1249                                 RETURN(err);
1250                         } else if (err) {
                                     /* errors from inactive targets are
                                      * neither logged nor recorded */
1251                                 if (tgt->ltd_active) {
1252                                         CERROR("error: iocontrol MDC %s on MDT"
1253                                                " idx %d cmd %x: err = %d\n",
1254                                                tgt->ltd_uuid.uuid, i, cmd, err);
1255                                         if (!rc)
1256                                                 rc = err;
1257                                 }
1258                         } else
1259                                 set = 1;
1260                 }
1261                 if (!set && !rc)
1262                         rc = -EIO;
1263         }
1264         RETURN(rc);
1265 }
1266
/*
 * Compiled out: name-based and NID-based placement helpers. Nothing between
 * the #if 0 below and its #endif is built.
 * NOTE(review): lmv_nid_policy() reads lmv->tgts[0].ltd_exp, whereas the code
 * that is built accesses lmv->tgts[i]->ltd_exp, so this region would likely
 * need updating before it could be re-enabled.
 */
1267 #if 0
     /* Sum the first len bytes of name and reduce the sum modulo count. */
1268 static int lmv_all_chars_policy(int count, const char *name,
1269                                 int len)
1270 {
1271         unsigned int c = 0;
1272
1273         while (len > 0)
1274                 c += name[--len];
1275         c = c % count;
1276         return c;
1277 }
1278
     /* Derive a target index from c_self of the first target's import
      * connection (c_self XORed with its own upper 32 bits), modulo the
      * target count. */
1279 static int lmv_nid_policy(struct lmv_obd *lmv)
1280 {
1281         struct obd_import *imp;
1282         __u32              id;
1283
1284         /*
1285          * XXX: To get nid we assume that underlying obd device is mdc.
1286          */
1287         imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
1288         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
1289         return id % lmv->desc.ld_tgt_count;
1290 }
1291
     /* Dispatch on the placement policy; returns the chosen index, or -EINVAL
      * (after logging) for a policy not handled here. */
1292 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1293                           placement_policy_t placement)
1294 {
1295         switch (placement) {
1296         case PLACEMENT_CHAR_POLICY:
1297                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
1298                                             op_data->op_name,
1299                                             op_data->op_namelen);
1300         case PLACEMENT_NID_POLICY:
1301                 return lmv_nid_policy(lmv);
1302
1303         default:
1304                 break;
1305         }
1306
1307         CERROR("Unsupported placement policy %x\n", placement);
1308         return -EINVAL;
1309 }
1310 #endif
1311
1312 /**
1313  * This is _inode_ placement policy function (not name).
1314  */
1315 static int lmv_placement_policy(struct obd_device *obd,
1316                                 struct md_op_data *op_data,
1317                                 mdsno_t *mds)
1318 {
1319         struct lmv_obd          *lmv = &obd->u.lmv;
1320         ENTRY;
1321
1322         LASSERT(mds != NULL);
1323
1324         if (lmv->desc.ld_tgt_count == 1) {
1325                 *mds = 0;
1326                 RETURN(0);
1327         }
1328
1329         /**
1330          * If stripe_offset is provided during setdirstripe
1331          * (setdirstripe -i xx), xx MDS will be choosen.
1332          */
1333         if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data != NULL) {
1334                 struct lmv_user_md *lum;
1335
1336                 lum = op_data->op_data;
1337
1338                 if (le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) {
1339                         *mds = le32_to_cpu(lum->lum_stripe_offset);
1340                 } else {
1341                         /* -1 means default, which will be in the same MDT with
1342                          * the stripe */
1343                         *mds = op_data->op_mds;
1344                         lum->lum_stripe_offset = cpu_to_le32(op_data->op_mds);
1345                 }
1346         } else {
1347                 /* Allocate new fid on target according to operation type and
1348                  * parent home mds. */
1349                 *mds = op_data->op_mds;
1350         }
1351
1352         RETURN(0);
1353 }
1354
1355 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
1356                     mdsno_t mds)
1357 {
1358         struct lmv_tgt_desc     *tgt;
1359         int                      rc;
1360         ENTRY;
1361
1362         tgt = lmv_get_target(lmv, mds, NULL);
1363         if (IS_ERR(tgt))
1364                 RETURN(PTR_ERR(tgt));
1365
1366         /*
1367          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1368          * on server that seq in new allocated fid is not yet known.
1369          */
1370         mutex_lock(&tgt->ltd_fid_mutex);
1371
1372         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1373                 GOTO(out, rc = -ENODEV);
1374
1375         /*
1376          * Asking underlying tgt layer to allocate new fid.
1377          */
1378         rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1379         if (rc > 0) {
1380                 LASSERT(fid_is_sane(fid));
1381                 rc = 0;
1382         }
1383
1384         EXIT;
1385 out:
1386         mutex_unlock(&tgt->ltd_fid_mutex);
1387         return rc;
1388 }
1389
1390 int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1391                   struct lu_fid *fid, struct md_op_data *op_data)
1392 {
1393         struct obd_device     *obd = class_exp2obd(exp);
1394         struct lmv_obd        *lmv = &obd->u.lmv;
1395         mdsno_t                mds = 0;
1396         int                    rc;
1397         ENTRY;
1398
1399         LASSERT(op_data != NULL);
1400         LASSERT(fid != NULL);
1401
1402         rc = lmv_placement_policy(obd, op_data, &mds);
1403         if (rc) {
1404                 CERROR("Can't get target for allocating fid, "
1405                        "rc %d\n", rc);
1406                 RETURN(rc);
1407         }
1408
1409         rc = __lmv_fid_alloc(lmv, fid, mds);
1410         if (rc) {
1411                 CERROR("Can't alloc new fid, rc %d\n", rc);
1412                 RETURN(rc);
1413         }
1414
1415         RETURN(rc);
1416 }
1417
1418 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1419 {
1420         struct lmv_obd  *lmv = &obd->u.lmv;
1421         struct lmv_desc *desc;
1422         int             rc;
1423         ENTRY;
1424
1425         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1426                 CERROR("LMV setup requires a descriptor\n");
1427                 RETURN(-EINVAL);
1428         }
1429
1430         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1431         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1432                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1433                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1434                 RETURN(-EINVAL);
1435         }
1436
1437         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
1438         if (lmv->tgts == NULL)
1439                 RETURN(-ENOMEM);
1440         lmv->tgts_size = 32;
1441
1442         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1443         lmv->desc.ld_tgt_count = 0;
1444         lmv->desc.ld_active_tgt_count = 0;
1445         lmv->max_cookiesize = 0;
1446         lmv->max_def_easize = 0;
1447         lmv->max_easize = 0;
1448         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1449
1450         spin_lock_init(&lmv->lmv_lock);
1451         mutex_init(&lmv->init_mutex);
1452
1453 #ifdef LPROCFS
1454         obd->obd_vars = lprocfs_lmv_obd_vars;
1455         lprocfs_seq_obd_setup(obd);
1456         lprocfs_alloc_md_stats(obd, 0);
1457         rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1458                                 0444, &lmv_proc_target_fops, obd);
1459         if (rc)
1460                 CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1461                       obd->obd_name, rc);
1462 #endif
1463         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1464                              LUSTRE_CLI_FLD_HASH_DHT);
1465         if (rc) {
1466                 CERROR("Can't init FLD, err %d\n", rc);
1467                 GOTO(out, rc);
1468         }
1469
1470         RETURN(0);
1471
1472 out:
1473         return rc;
1474 }
1475
1476 static int lmv_cleanup(struct obd_device *obd)
1477 {
1478         struct lmv_obd   *lmv = &obd->u.lmv;
1479         ENTRY;
1480
1481         fld_client_fini(&lmv->lmv_fld);
1482         if (lmv->tgts != NULL) {
1483                 int i;
1484                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1485                         if (lmv->tgts[i] == NULL)
1486                                 continue;
1487                         lmv_del_target(lmv, i);
1488                 }
1489                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1490                 lmv->tgts_size = 0;
1491         }
1492         RETURN(0);
1493 }
1494
1495 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1496 {
1497         struct lustre_cfg       *lcfg = buf;
1498         struct obd_uuid         obd_uuid;
1499         int                     gen;
1500         __u32                   index;
1501         int                     rc;
1502         ENTRY;
1503
1504         switch (lcfg->lcfg_command) {
1505         case LCFG_ADD_MDC:
1506                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1507                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1508                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1509                         GOTO(out, rc = -EINVAL);
1510
1511                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1512
1513                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1)
1514                         GOTO(out, rc = -EINVAL);
1515                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1516                         GOTO(out, rc = -EINVAL);
1517                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1518                 GOTO(out, rc);
1519         default:
1520                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1521                 GOTO(out, rc = -EINVAL);
1522         }
1523 out:
1524         RETURN(rc);
1525 }
1526
1527 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1528                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1529 {
1530         struct obd_device       *obd = class_exp2obd(exp);
1531         struct lmv_obd          *lmv = &obd->u.lmv;
1532         struct obd_statfs       *temp;
1533         int                      rc = 0;
1534         __u32                    i;
1535         ENTRY;
1536
1537         rc = lmv_check_connect(obd);
1538         if (rc)
1539                 RETURN(rc);
1540
1541         OBD_ALLOC(temp, sizeof(*temp));
1542         if (temp == NULL)
1543                 RETURN(-ENOMEM);
1544
1545         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1546                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1547                         continue;
1548
1549                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1550                                 max_age, flags);
1551                 if (rc) {
1552                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1553                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1554                                rc);
1555                         GOTO(out_free_temp, rc);
1556                 }
1557
1558                 if (i == 0) {
1559                         *osfs = *temp;
1560                         /* If the statfs is from mount, it will needs
1561                          * retrieve necessary information from MDT0.
1562                          * i.e. mount does not need the merged osfs
1563                          * from all of MDT.
1564                          * And also clients can be mounted as long as
1565                          * MDT0 is in service*/
1566                         if (flags & OBD_STATFS_FOR_MDT0)
1567                                 GOTO(out_free_temp, rc);
1568                 } else {
1569                         osfs->os_bavail += temp->os_bavail;
1570                         osfs->os_blocks += temp->os_blocks;
1571                         osfs->os_ffree += temp->os_ffree;
1572                         osfs->os_files += temp->os_files;
1573                 }
1574         }
1575
1576         EXIT;
1577 out_free_temp:
1578         OBD_FREE(temp, sizeof(*temp));
1579         return rc;
1580 }
1581
1582 static int lmv_getstatus(struct obd_export *exp,
1583                          struct lu_fid *fid,
1584                          struct obd_capa **pc)
1585 {
1586         struct obd_device    *obd = exp->exp_obd;
1587         struct lmv_obd       *lmv = &obd->u.lmv;
1588         int                   rc;
1589         ENTRY;
1590
1591         rc = lmv_check_connect(obd);
1592         if (rc)
1593                 RETURN(rc);
1594
1595         rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
1596         RETURN(rc);
1597 }
1598
1599 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1600                         struct obd_capa *oc, obd_valid valid, const char *name,
1601                         const char *input, int input_size, int output_size,
1602                         int flags, struct ptlrpc_request **request)
1603 {
1604         struct obd_device      *obd = exp->exp_obd;
1605         struct lmv_obd         *lmv = &obd->u.lmv;
1606         struct lmv_tgt_desc    *tgt;
1607         int                     rc;
1608         ENTRY;
1609
1610         rc = lmv_check_connect(obd);
1611         if (rc)
1612                 RETURN(rc);
1613
1614         tgt = lmv_find_target(lmv, fid);
1615         if (IS_ERR(tgt))
1616                 RETURN(PTR_ERR(tgt));
1617
1618         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1619                          input_size, output_size, flags, request);
1620
1621         RETURN(rc);
1622 }
1623
1624 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1625                         struct obd_capa *oc, obd_valid valid, const char *name,
1626                         const char *input, int input_size, int output_size,
1627                         int flags, __u32 suppgid,
1628                         struct ptlrpc_request **request)
1629 {
1630         struct obd_device      *obd = exp->exp_obd;
1631         struct lmv_obd         *lmv = &obd->u.lmv;
1632         struct lmv_tgt_desc    *tgt;
1633         int                     rc;
1634         ENTRY;
1635
1636         rc = lmv_check_connect(obd);
1637         if (rc)
1638                 RETURN(rc);
1639
1640         tgt = lmv_find_target(lmv, fid);
1641         if (IS_ERR(tgt))
1642                 RETURN(PTR_ERR(tgt));
1643
1644         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1645                          input_size, output_size, flags, suppgid,
1646                          request);
1647
1648         RETURN(rc);
1649 }
1650
1651 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1652                        struct ptlrpc_request **request)
1653 {
1654         struct obd_device       *obd = exp->exp_obd;
1655         struct lmv_obd          *lmv = &obd->u.lmv;
1656         struct lmv_tgt_desc     *tgt;
1657         int                      rc;
1658         ENTRY;
1659
1660         rc = lmv_check_connect(obd);
1661         if (rc)
1662                 RETURN(rc);
1663
1664         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1665         if (IS_ERR(tgt))
1666                 RETURN(PTR_ERR(tgt));
1667
1668         if (op_data->op_flags & MF_GET_MDT_IDX) {
1669                 op_data->op_mds = tgt->ltd_idx;
1670                 RETURN(0);
1671         }
1672
1673         rc = md_getattr(tgt->ltd_exp, op_data, request);
1674
1675         RETURN(rc);
1676 }
1677
1678 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1679 {
1680         struct obd_device   *obd = exp->exp_obd;
1681         struct lmv_obd      *lmv = &obd->u.lmv;
1682         __u32                i;
1683         int                  rc;
1684         ENTRY;
1685
1686         rc = lmv_check_connect(obd);
1687         if (rc)
1688                 RETURN(rc);
1689
1690         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1691
1692         /*
1693          * With DNE every object can have two locks in different namespaces:
1694          * lookup lock in space of MDT storing direntry and update/open lock in
1695          * space of MDT storing inode.
1696          */
1697         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1698                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1699                         continue;
1700                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1701         }
1702
1703         RETURN(0);
1704 }
1705
1706 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1707                            ldlm_iterator_t it, void *data)
1708 {
1709         struct obd_device       *obd = exp->exp_obd;
1710         struct lmv_obd          *lmv = &obd->u.lmv;
1711         int                     i;
1712         int                     tgt;
1713         int                     rc;
1714         ENTRY;
1715
1716         rc = lmv_check_connect(obd);
1717         if (rc)
1718                 RETURN(rc);
1719
1720         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1721
1722         /*
1723          * With DNE every object can have two locks in different namespaces:
1724          * lookup lock in space of MDT storing direntry and update/open lock in
1725          * space of MDT storing inode.  Try the MDT that the FID maps to first,
1726          * since this can be easily found, and only try others if that fails.
1727          */
1728         for (i = 0, tgt = lmv_find_target_index(lmv, fid);
1729              i < lmv->desc.ld_tgt_count;
1730              i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
1731                 if (tgt < 0) {
1732                         CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
1733                                obd->obd_name, PFID(fid), tgt);
1734                         tgt = 0;
1735                 }
1736
1737                 if (lmv->tgts[tgt] == NULL ||
1738                     lmv->tgts[tgt]->ltd_exp == NULL)
1739                         continue;
1740
1741                 rc = md_find_cbdata(lmv->tgts[tgt]->ltd_exp, fid, it, data);
1742                 if (rc)
1743                         RETURN(rc);
1744         }
1745
1746         RETURN(rc);
1747 }
1748
1749
1750 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1751                      struct md_open_data *mod, struct ptlrpc_request **request)
1752 {
1753         struct obd_device     *obd = exp->exp_obd;
1754         struct lmv_obd        *lmv = &obd->u.lmv;
1755         struct lmv_tgt_desc   *tgt;
1756         int                    rc;
1757         ENTRY;
1758
1759         rc = lmv_check_connect(obd);
1760         if (rc)
1761                 RETURN(rc);
1762
1763         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1764         if (IS_ERR(tgt))
1765                 RETURN(PTR_ERR(tgt));
1766
1767         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1768         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1769         RETURN(rc);
1770 }
1771
1772 /**
1773  * Choosing the MDT by name or FID in @op_data.
1774  * For non-striped directory, it will locate MDT by fid.
1775  * For striped-directory, it will locate MDT by name. And also
1776  * it will reset op_fid1 with the FID of the choosen stripe.
1777  **/
1778 struct lmv_tgt_desc *
1779 lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1780                            const char *name, int namelen, struct lu_fid *fid,
1781                            mdsno_t *mds)
1782 {
1783         struct lmv_tgt_desc     *tgt;
1784         const struct lmv_oinfo  *oinfo;
1785
1786         oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
1787         if (IS_ERR(oinfo))
1788                 RETURN(ERR_CAST(oinfo));
1789         *fid = oinfo->lmo_fid;
1790         *mds = oinfo->lmo_mds;
1791         tgt = lmv_get_target(lmv, *mds, NULL);
1792
1793         CDEBUG(D_INFO, "locate on mds %u "DFID"\n", *mds, PFID(fid));
1794         return tgt;
1795 }
1796
1797 /**
1798  * Locate mds by fid or name
1799  *
1800  * For striped directory (lsm != NULL), it will locate the stripe
1801  * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type
1802  * is unknown, it will return -EBADFD, and lmv_intent_lookup might need
1803  * walk through all of stripes to locate the entry.
1804  *
1805  * For normal direcotry, it will locate MDS by FID directly.
1806  * \param[in] lmv       LMV device
1807  * \param[in] op_data   client MD stack parameters, name, namelen
1808  *                      mds_num etc.
1809  * \param[in] fid       object FID used to locate MDS.
1810  *
1811  * retval               pointer to the lmv_tgt_desc if succeed.
1812  *                      ERR_PTR(errno) if failed.
1813  */
1814 struct lmv_tgt_desc
1815 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1816                 struct lu_fid *fid)
1817 {
1818         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1819         struct lmv_tgt_desc     *tgt;
1820
1821         if (lsm == NULL || op_data->op_namelen == 0) {
1822                 tgt = lmv_find_target(lmv, fid);
1823                 if (IS_ERR(tgt))
1824                         return tgt;
1825
1826                 op_data->op_mds = tgt->ltd_idx;
1827                 return tgt;
1828         }
1829
1830         return lmv_locate_target_for_name(lmv, lsm, op_data->op_name,
1831                                           op_data->op_namelen, fid,
1832                                           &op_data->op_mds);
1833 }
1834
1835 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1836                const void *data, int datalen, int mode, __u32 uid,
1837                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1838                struct ptlrpc_request **request)
1839 {
1840         struct obd_device       *obd = exp->exp_obd;
1841         struct lmv_obd          *lmv = &obd->u.lmv;
1842         struct lmv_tgt_desc     *tgt;
1843         int                      rc;
1844         ENTRY;
1845
1846         rc = lmv_check_connect(obd);
1847         if (rc)
1848                 RETURN(rc);
1849
1850         if (!lmv->desc.ld_active_tgt_count)
1851                 RETURN(-EIO);
1852
1853         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1854         if (IS_ERR(tgt))
1855                 RETURN(PTR_ERR(tgt));
1856
1857         CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n",
1858                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1859                op_data->op_mds);
1860
1861         rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1862         if (rc)
1863                 RETURN(rc);
1864
1865         /* Send the create request to the MDT where the object
1866          * will be located */
1867         tgt = lmv_find_target(lmv, &op_data->op_fid2);
1868         if (IS_ERR(tgt))
1869                 RETURN(PTR_ERR(tgt));
1870
1871         op_data->op_mds = tgt->ltd_idx;
1872
1873         CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n",
1874                PFID(&op_data->op_fid2), op_data->op_mds);
1875
1876         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1877         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1878                        cap_effective, rdev, request);
1879         if (rc == 0) {
1880                 if (*request == NULL)
1881                         RETURN(rc);
1882                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1883         }
1884         RETURN(rc);
1885 }
1886
1887 static int lmv_done_writing(struct obd_export *exp,
1888                             struct md_op_data *op_data,
1889                             struct md_open_data *mod)
1890 {
1891         struct obd_device     *obd = exp->exp_obd;
1892         struct lmv_obd        *lmv = &obd->u.lmv;
1893         struct lmv_tgt_desc   *tgt;
1894         int                    rc;
1895         ENTRY;
1896
1897         rc = lmv_check_connect(obd);
1898         if (rc)
1899                 RETURN(rc);
1900
1901         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1902         if (IS_ERR(tgt))
1903                 RETURN(PTR_ERR(tgt));
1904
1905         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1906         RETURN(rc);
1907 }
1908
1909 static int
1910 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1911             const union ldlm_policy_data *policy,
1912             struct lookup_intent *it, struct md_op_data *op_data,
1913             struct lustre_handle *lockh, __u64 extra_lock_flags)
1914 {
1915         struct obd_device        *obd = exp->exp_obd;
1916         struct lmv_obd           *lmv = &obd->u.lmv;
1917         struct lmv_tgt_desc      *tgt;
1918         int                       rc;
1919         ENTRY;
1920
1921         rc = lmv_check_connect(obd);
1922         if (rc)
1923                 RETURN(rc);
1924
1925         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1926                LL_IT2STR(it), PFID(&op_data->op_fid1));
1927
1928         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1929         if (IS_ERR(tgt))
1930                 RETURN(PTR_ERR(tgt));
1931
1932         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1933                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1934
1935         rc = md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh,
1936                         extra_lock_flags);
1937
1938         RETURN(rc);
1939 }
1940
1941 static int
1942 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1943                  struct ptlrpc_request **preq)
1944 {
1945         struct ptlrpc_request   *req = NULL;
1946         struct obd_device       *obd = exp->exp_obd;
1947         struct lmv_obd          *lmv = &obd->u.lmv;
1948         struct lmv_tgt_desc     *tgt;
1949         struct mdt_body         *body;
1950         int                      rc;
1951         ENTRY;
1952
1953         rc = lmv_check_connect(obd);
1954         if (rc)
1955                 RETURN(rc);
1956
1957         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1958         if (IS_ERR(tgt))
1959                 RETURN(PTR_ERR(tgt));
1960
1961         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1962                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1963                tgt->ltd_idx);
1964
1965         rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
1966         if (rc != 0)
1967                 RETURN(rc);
1968
1969         body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
1970         LASSERT(body != NULL);
1971
1972         if (body->mbo_valid & OBD_MD_MDS) {
1973                 struct lu_fid rid = body->mbo_fid1;
1974                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1975                        PFID(&rid));
1976
1977                 tgt = lmv_find_target(lmv, &rid);
1978                 if (IS_ERR(tgt)) {
1979                         ptlrpc_req_finished(*preq);
1980                         preq = NULL;
1981                         RETURN(PTR_ERR(tgt));
1982                 }
1983
1984                 op_data->op_fid1 = rid;
1985                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1986                 op_data->op_namelen = 0;
1987                 op_data->op_name = NULL;
1988                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1989                 ptlrpc_req_finished(*preq);
1990                 *preq = req;
1991         }
1992
1993         RETURN(rc);
1994 }
1995
/*
 * Map one MF_MDC_CANCEL_FIDn flag to the address of the matching op_fidN
 * member of \a op_data; evaluates to NULL for any other value of \a fl.
 * Both arguments are parenthesized so that expressions may be passed;
 * note that \a fl may be evaluated up to four times.
 */
#define md_op_data_fid(op_data, fl)                             \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :     \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :     \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :     \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :     \
         NULL)
2002
2003 static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
2004                             struct md_op_data *op_data,
2005                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
2006 {
2007         struct lu_fid          *fid = md_op_data_fid(op_data, flag);
2008         struct obd_device      *obd = exp->exp_obd;
2009         struct lmv_obd         *lmv = &obd->u.lmv;
2010         ldlm_policy_data_t      policy = {{ 0 }};
2011         int                     rc = 0;
2012         ENTRY;
2013
2014         if (!fid_is_sane(fid))
2015                 RETURN(0);
2016
2017         if (tgt == NULL) {
2018                 tgt = lmv_find_target(lmv, fid);
2019                 if (IS_ERR(tgt))
2020                         RETURN(PTR_ERR(tgt));
2021         }
2022
2023         if (tgt->ltd_idx != op_tgt) {
2024                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
2025                 policy.l_inodebits.bits = bits;
2026                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
2027                                       mode, LCF_ASYNC, NULL);
2028         } else {
2029                 CDEBUG(D_INODE,
2030                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
2031                        op_tgt, PFID(fid));
2032                 op_data->op_flags |= flag;
2033                 rc = 0;
2034         }
2035
2036         RETURN(rc);
2037 }
2038
2039 /*
2040  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
2041  * op_data->op_fid2
2042  */
2043 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
2044                     struct ptlrpc_request **request)
2045 {
2046         struct obd_device       *obd = exp->exp_obd;
2047         struct lmv_obd          *lmv = &obd->u.lmv;
2048         struct lmv_tgt_desc     *tgt;
2049         int                      rc;
2050         ENTRY;
2051
2052         rc = lmv_check_connect(obd);
2053         if (rc)
2054                 RETURN(rc);
2055
2056         LASSERT(op_data->op_namelen != 0);
2057
2058         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
2059                PFID(&op_data->op_fid2), op_data->op_namelen,
2060                op_data->op_name, PFID(&op_data->op_fid1));
2061
2062         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2063         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2064         op_data->op_cap = cfs_curproc_cap_pack();
2065         if (op_data->op_mea2 != NULL) {
2066                 struct lmv_stripe_md    *lsm = op_data->op_mea2;
2067                 const struct lmv_oinfo  *oinfo;
2068
2069                 oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
2070                                                 op_data->op_namelen);
2071                 if (IS_ERR(oinfo))
2072                         RETURN(PTR_ERR(oinfo));
2073
2074                 op_data->op_fid2 = oinfo->lmo_fid;
2075         }
2076
2077         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2078         if (IS_ERR(tgt))
2079                 RETURN(PTR_ERR(tgt));
2080
2081         /*
2082          * Cancel UPDATE lock on child (fid1).
2083          */
2084         op_data->op_flags |= MF_MDC_CANCEL_FID2;
2085         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
2086                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2087         if (rc != 0)
2088                 RETURN(rc);
2089
2090         rc = md_link(tgt->ltd_exp, op_data, request);
2091
2092         RETURN(rc);
2093 }
2094
2095 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
2096                       const char *old, int oldlen, const char *new, int newlen,
2097                       struct ptlrpc_request **request)
2098 {
2099         struct obd_device       *obd = exp->exp_obd;
2100         struct lmv_obd          *lmv = &obd->u.lmv;
2101         struct lmv_tgt_desc     *src_tgt;
2102         int                     rc;
2103         ENTRY;
2104
2105         LASSERT(oldlen != 0);
2106
2107         CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
2108                oldlen, old, PFID(&op_data->op_fid1),
2109                op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
2110                newlen, new, PFID(&op_data->op_fid2),
2111                op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
2112
2113         rc = lmv_check_connect(obd);
2114         if (rc)
2115                 RETURN(rc);
2116
2117         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2118         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2119         op_data->op_cap = cfs_curproc_cap_pack();
2120         if (op_data->op_cli_flags & CLI_MIGRATE) {
2121                 LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
2122                          PFID(&op_data->op_fid3));
2123                 rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
2124                 if (rc)
2125                         RETURN(rc);
2126                 src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid3);
2127         } else {
2128                 if (op_data->op_mea1 != NULL) {
2129                         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2130
2131                         src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
2132                                                              oldlen,
2133                                                              &op_data->op_fid1,
2134                                                              &op_data->op_mds);
2135                         if (IS_ERR(src_tgt))
2136                                 RETURN(PTR_ERR(src_tgt));
2137                 } else {
2138                         src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
2139                         if (IS_ERR(src_tgt))
2140                                 RETURN(PTR_ERR(src_tgt));
2141
2142                         op_data->op_mds = src_tgt->ltd_idx;
2143                 }
2144
2145                 if (op_data->op_mea2) {
2146                         struct lmv_stripe_md    *lsm = op_data->op_mea2;
2147                         const struct lmv_oinfo  *oinfo;
2148
2149                         oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
2150                         if (IS_ERR(oinfo))
2151                                 RETURN(PTR_ERR(oinfo));
2152
2153                         op_data->op_fid2 = oinfo->lmo_fid;
2154                 }
2155         }
2156         if (IS_ERR(src_tgt))
2157                 RETURN(PTR_ERR(src_tgt));
2158
2159         /*
2160          * LOOKUP lock on src child (fid3) should also be cancelled for
2161          * src_tgt in mdc_rename.
2162          */
2163         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2164
2165         /*
2166          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
2167          * own target.
2168          */
2169         rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
2170                               LCK_EX, MDS_INODELOCK_UPDATE,
2171                               MF_MDC_CANCEL_FID2);
2172
2173         if (rc != 0)
2174                 RETURN(rc);
2175         /*
2176          * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
2177          */
2178         if (fid_is_sane(&op_data->op_fid3)) {
2179                 struct lmv_tgt_desc *tgt;
2180
2181                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
2182                 if (IS_ERR(tgt))
2183                         RETURN(PTR_ERR(tgt));
2184
2185                 /* Cancel LOOKUP lock on its parent */
2186                 rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
2187                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2188                                       MF_MDC_CANCEL_FID3);
2189                 if (rc != 0)
2190                         RETURN(rc);
2191
2192                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
2193                                       LCK_EX, MDS_INODELOCK_FULL,
2194                                       MF_MDC_CANCEL_FID3);
2195                 if (rc != 0)
2196                         RETURN(rc);
2197         }
2198
2199         /*
2200          * Cancel all the locks on tgt child (fid4).
2201          */
2202         if (fid_is_sane(&op_data->op_fid4))
2203                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
2204                                       LCK_EX, MDS_INODELOCK_FULL,
2205                                       MF_MDC_CANCEL_FID4);
2206
2207         CDEBUG(D_INODE, DFID":m%d to "DFID"\n", PFID(&op_data->op_fid1),
2208                op_data->op_mds, PFID(&op_data->op_fid2));
2209
2210         rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, new, newlen,
2211                        request);
2212
2213         RETURN(rc);
2214 }
2215
2216 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2217                        void *ea, int ealen, void *ea2, int ea2len,
2218                        struct ptlrpc_request **request,
2219                        struct md_open_data **mod)
2220 {
2221         struct obd_device       *obd = exp->exp_obd;
2222         struct lmv_obd          *lmv = &obd->u.lmv;
2223         struct lmv_tgt_desc     *tgt;
2224         int                      rc = 0;
2225         ENTRY;
2226
2227         rc = lmv_check_connect(obd);
2228         if (rc)
2229                 RETURN(rc);
2230
2231         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2232                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2233
2234         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2235         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2236         if (IS_ERR(tgt))
2237                 RETURN(PTR_ERR(tgt));
2238
2239         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2240                         ea2len, request, mod);
2241
2242         RETURN(rc);
2243 }
2244
2245 static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
2246                      struct obd_capa *oc, struct ptlrpc_request **request)
2247 {
2248         struct obd_device       *obd = exp->exp_obd;
2249         struct lmv_obd          *lmv = &obd->u.lmv;
2250         struct lmv_tgt_desc     *tgt;
2251         int                      rc;
2252         ENTRY;
2253
2254         rc = lmv_check_connect(obd);
2255         if (rc != 0)
2256                 RETURN(rc);
2257
2258         tgt = lmv_find_target(lmv, fid);
2259         if (IS_ERR(tgt))
2260                 RETURN(PTR_ERR(tgt));
2261
2262         rc = md_fsync(tgt->ltd_exp, fid, oc, request);
2263         RETURN(rc);
2264 }
2265
2266 /*
2267  * Adjust a set of pages, each page containing an array of lu_dirpages,
2268  * so that each page can be used as a single logical lu_dirpage.
2269  *
2270  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2271  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2272  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2273  * value is used as a cookie to request the next lu_dirpage in a
2274  * directory listing that spans multiple pages (two in this example):
2275  *   ________
2276  *  |        |
2277  * .|--------v-------   -----.
2278  * |s|e|f|p|ent|ent| ... |ent|
2279  * '--|--------------   -----'   Each CFS_PAGE contains a single
2280  *    '------.                   lu_dirpage.
2281  * .---------v-------   -----.
2282  * |s|e|f|p|ent| 0 | ... | 0 |
2283  * '-----------------   -----'
2284  *
2285  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2286  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2287  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2288  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2289  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2290  * in general e0==s1, e1==s2, etc.):
2291  *
2292  * .--------------------   -----.
2293  * |s0|e0|f0|p|ent|ent| ... |ent|
2294  * |---v----------------   -----|
2295  * |s1|e1|f1|p|ent|ent| ... |ent|
2296  * |---v----------------   -----|  Here, each CFS_PAGE contains
2297  *             ...                 multiple lu_dirpages.
2298  * |---v----------------   -----|
2299  * |s'|e'|f'|p|ent|ent| ... |ent|
2300  * '---|----------------   -----'
2301  *     v
2302  * .----------------------------.
2303  * |        next CFS_PAGE       |
2304  *
2305  * This structure is transformed into a single logical lu_dirpage as follows:
2306  *
2307  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2308  *   labeled 'next CFS_PAGE'.
2309  *
2310  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2311  *   a hash collision with the next page exists.
2312  *
2313  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2314  *   to the first entry of the next lu_dirpage.
2315  */
#if PAGE_CACHE_SIZE > LU_PAGE_SIZE
/*
 * Merge the lu_dirpages packed into each host page into one logical
 * lu_dirpage per host page.
 *
 * pages    array of host pages to adjust; each is kmap()ed while it is
 *          being processed
 * ncfspgs  number of entries in \a pages
 * nlupgs   total number of lu_dirpages spread over those pages; it is
 *          counted down as lu_dirpages are consumed and asserted to reach
 *          zero at the end
 *
 * This variant is compiled only when PAGE_CACHE_SIZE > LU_PAGE_SIZE;
 * otherwise the name is a no-op macro.
 */
static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
{
        int i;

        for (i = 0; i < ncfspgs; i++) {
                struct lu_dirpage       *dp = kmap(pages[i]);
                struct lu_dirpage       *first = dp;
                struct lu_dirent        *end_dirent = NULL;
                struct lu_dirent        *ent;
                __u64                   hash_end = dp->ldp_hash_end;
                __u32                   flags = dp->ldp_flags;

                while (--nlupgs > 0) {
                        /* Walk to the last entry of this lu_dirpage; the
                         * loop body is intentionally empty. */
                        ent = lu_dirent_start(dp);
                        for (end_dirent = ent; ent != NULL;
                             end_dirent = ent, ent = lu_dirent_next(ent));

                        /* Advance dp to next lu_dirpage. */
                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);

                        /* Check if we've reached the end of the CFS_PAGE. */
                        if (!((unsigned long)dp & ~CFS_PAGE_MASK))
                                break;

                        /* Save the hash and flags of this lu_dirpage. */
                        hash_end = dp->ldp_hash_end;
                        flags = dp->ldp_flags;

                        /* Check if lu_dirpage contains no entries. */
                        if (!end_dirent)
                                break;

                        /* Enlarge the end entry lde_reclen from 0 to
                         * first entry of next lu_dirpage. */
                        LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
                        end_dirent->lde_reclen =
                                cpu_to_le16((char *)(dp->ldp_entries) -
                                            (char *)end_dirent);
                }

                /* The first lu_dirpage now stands for the whole host page:
                 * give it the hash end and LDF_COLLIDE flag of the last
                 * lu_dirpage visited. */
                first->ldp_hash_end = hash_end;
                first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
                first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);

                kunmap(pages[i]);
        }
        /* Debug messages must end with a newline. */
        LASSERTF(nlupgs == 0, "left = %d\n", nlupgs);
}
#else
#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
#endif  /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2368
/**
 * Get current minimum entry from striped directory
 *
 * This function will search for the dir entry whose hash value is the
 * closest (>=) to @hash_offset among all of the sub-stripes, and it is
 * only called for a striped directory.
 *
 * \param[in] exp               export of LMV
 * \param[in] op_data           parameters transferred between client MD
 *                              stack layers; the stripe information is
 *                              included in this parameter
 * \param[in] cb_op             ldlm callback being used in enqueue in
 *                              mdc_read_page
 * \param[in] hash_offset       the hash value, which is used to locate
 *                              the minimum (closest) dir entry
 * \param[in,out] stripe_offset the caller uses this to indicate the stripe
 *                              index of the last entry, so as to avoid hash
 *                              conflicts between stripes. It will also be
 *                              used to return the stripe index of the
 *                              current dir entry.
 * \param[in,out] entp          the minimum entry; it is also used to pass
 *                              in the last dir entry to resolve hash
 *                              conflicts
 *
 * \param[out] ppage            the page which holds the minimum entry
 *
 * \retval                      = 0 get the entry successfully
 *                              negative errno (< 0) does not get the entry
 */
2397 static int lmv_get_min_striped_entry(struct obd_export *exp,
2398                                      struct md_op_data *op_data,
2399                                      struct md_callback *cb_op,
2400                                      __u64 hash_offset, int *stripe_offset,
2401                                      struct lu_dirent **entp,
2402                                      struct page **ppage)
2403 {
2404         struct obd_device       *obd = exp->exp_obd;
2405         struct lmv_obd          *lmv = &obd->u.lmv;
2406         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2407         struct lmv_tgt_desc     *tgt;
2408         int                     stripe_count;
2409         struct lu_dirent        *min_ent = NULL;
2410         struct page             *min_page = NULL;
2411         int                     min_idx = 0;
2412         int                     i;
2413         int                     rc = 0;
2414         ENTRY;
2415
2416         stripe_count = lsm->lsm_md_stripe_count;
2417         for (i = 0; i < stripe_count; i++) {
2418                 struct lu_dirent        *ent = NULL;
2419                 struct page             *page = NULL;
2420                 struct lu_dirpage       *dp;
2421                 __u64                   stripe_hash = hash_offset;
2422
2423                 tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
2424                 if (IS_ERR(tgt))
2425                         GOTO(out, rc = PTR_ERR(tgt));
2426
2427                 /* op_data will be shared by each stripe, so we need
2428                  * reset these value for each stripe */
2429                 op_data->op_stripe_offset = i;
2430                 op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
2431                 op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
2432                 op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
2433 next:
2434                 rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
2435                                   &page);
2436                 if (rc != 0)
2437                         GOTO(out, rc);
2438
2439                 dp = page_address(page);
2440                 for (ent = lu_dirent_start(dp); ent != NULL;
2441                      ent = lu_dirent_next(ent)) {
2442                         /* Skip dummy entry */
2443                         if (le16_to_cpu(ent->lde_namelen) == 0)
2444                                 continue;
2445
2446                         if (le64_to_cpu(ent->lde_hash) < hash_offset)
2447                                 continue;
2448
2449                         if (le64_to_cpu(ent->lde_hash) == hash_offset &&
2450                             (*entp == ent || i < *stripe_offset))
2451                                 continue;
2452
2453                         /* skip . and .. for other stripes */
2454                         if (i != 0 &&
2455                             (strncmp(ent->lde_name, ".",
2456                                      le16_to_cpu(ent->lde_namelen)) == 0 ||
2457                              strncmp(ent->lde_name, "..",
2458                                      le16_to_cpu(ent->lde_namelen)) == 0))
2459                                 continue;
2460                         break;
2461                 }
2462
2463                 if (ent == NULL) {
2464                         stripe_hash = le64_to_cpu(dp->ldp_hash_end);
2465
2466                         kunmap(page);
2467                         page_cache_release(page);
2468                         page = NULL;
2469
2470                         /* reach the end of current stripe, go to next stripe */
2471                         if (stripe_hash == MDS_DIR_END_OFF)
2472                                 continue;
2473                         else
2474                                 goto next;
2475                 }
2476
2477                 if (min_ent != NULL) {
2478                         if (le64_to_cpu(min_ent->lde_hash) >
2479                             le64_to_cpu(ent->lde_hash)) {
2480                                 min_ent = ent;
2481                                 kunmap(min_page);
2482                                 page_cache_release(min_page);
2483                                 min_idx = i;
2484                                 min_page = page;
2485                         } else {
2486                                 kunmap(page);
2487                                 page_cache_release(page);
2488                                 page = NULL;
2489                         }
2490                 } else {
2491                         min_ent = ent;
2492                         min_page = page;
2493                         min_idx = i;
2494                 }
2495         }
2496
2497 out:
2498         if (*ppage != NULL) {
2499                 kunmap(*ppage);
2500                 page_cache_release(*ppage);
2501         }
2502         *stripe_offset = min_idx;
2503         *entp = min_ent;
2504         *ppage = min_page;
2505         RETURN(rc);
2506 }
2507
2508 /**
2509  * Build dir entry page from a striped directory
2510  *
2511  * This function gets one entry by @offset from a striped directory. It will
2512  * read entries from all of stripes, and choose one closest to the required
2513  * offset(&offset). A few notes
2514  * 1. skip . and .. for non-zero stripes, because there can only have one .
2515  * and .. in a directory.
2516  * 2. op_data will be shared by all of stripes, instead of allocating new
2517  * one, so need to restore before reusing.
2518  * 3. release the entry page if that is not being chosen.
2519  *
2520  * \param[in] exp       obd export refer to LMV
2521  * \param[in] op_data   hold those MD parameters of read_entry
2522  * \param[in] cb_op     ldlm callback being used in enqueue in mdc_read_entry
2523  * \param[out] ldp      the entry being read
2524  * \param[out] ppage    the page holding the entry. Note: because the entry
2525  *                      will be accessed in upper layer, so we need hold the
2526  *                      page until the usages of entry is finished, see
2527  *                      ll_dir_entry_next.
2528  *
2529  * retval               =0 if get entry successfully
2530  *                      <0 cannot get entry
2531  */
2532 static int lmv_read_striped_page(struct obd_export *exp,
2533                                  struct md_op_data *op_data,
2534                                  struct md_callback *cb_op,
2535                                  __u64 offset, struct page **ppage)
2536 {
2537         struct obd_device       *obd = exp->exp_obd;
2538         struct lu_fid           master_fid = op_data->op_fid1;
2539         struct inode            *master_inode = op_data->op_data;
2540         __u64                   hash_offset = offset;
2541         struct lu_dirpage       *dp;
2542         struct page             *min_ent_page = NULL;
2543         struct page             *ent_page = NULL;
2544         struct lu_dirent        *ent;
2545         void                    *area;
2546         int                     ent_idx = 0;
2547         struct lu_dirent        *min_ent = NULL;
2548         struct lu_dirent        *last_ent;
2549         int                     left_bytes;
2550         int                     rc;
2551         ENTRY;
2552
2553         rc = lmv_check_connect(obd);
2554         if (rc)
2555                 RETURN(rc);
2556
2557         /* Allocate a page and read entries from all of stripes and fill
2558          * the page by hash order */
2559         ent_page = alloc_page(GFP_KERNEL);
2560         if (ent_page == NULL)
2561                 RETURN(-ENOMEM);
2562
2563         /* Initialize the entry page */
2564         dp = kmap(ent_page);
2565         memset(dp, 0, sizeof(*dp));
2566         dp->ldp_hash_start = cpu_to_le64(offset);
2567         dp->ldp_flags |= LDF_COLLIDE;
2568
2569         area = dp + 1;
2570         left_bytes = PAGE_CACHE_SIZE - sizeof(*dp);
2571         ent = area;
2572         last_ent = ent;
2573         do {
2574                 __u16   ent_size;
2575
2576                 /* Find the minum entry from all sub-stripes */
2577                 rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
2578                                                &ent_idx, &min_ent,
2579                                                &min_ent_page);
2580                 if (rc != 0)
2581                         GOTO(out, rc);
2582
2583                 /* If it can not get minum entry, it means it already reaches
2584                  * the end of this directory */
2585                 if (min_ent == NULL) {
2586                         last_ent->lde_reclen = 0;
2587                         hash_offset = MDS_DIR_END_OFF;
2588                         GOTO(out, rc);
2589                 }
2590
2591                 ent_size = le16_to_cpu(min_ent->lde_reclen);
2592
2593                 /* the last entry lde_reclen is 0, but it might not
2594                  * the end of this entry of this temporay entry */
2595                 if (ent_size == 0)
2596                         ent_size = lu_dirent_calc_size(
2597                                         le16_to_cpu(min_ent->lde_namelen),
2598                                         le32_to_cpu(min_ent->lde_attrs));
2599                 if (ent_size > left_bytes) {
2600                         last_ent->lde_reclen = cpu_to_le16(0);
2601                         hash_offset = le64_to_cpu(min_ent->lde_hash);
2602                         GOTO(out, rc);
2603                 }
2604
2605                 memcpy(ent, min_ent, ent_size);
2606
2607                 /* Replace . with master FID and Replace .. with the parent FID
2608                  * of master object */
2609                 if (strncmp(ent->lde_name, ".",
2610                             le16_to_cpu(ent->lde_namelen)) == 0 &&
2611                     le16_to_cpu(ent->lde_namelen) == 1)
2612                         fid_cpu_to_le(&ent->lde_fid, &master_fid);
2613                 else if (strncmp(ent->lde_name, "..",
2614                                    le16_to_cpu(ent->lde_namelen)) == 0 &&
2615                            le16_to_cpu(ent->lde_namelen) == 2)
2616                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
2617
2618                 left_bytes -= ent_size;
2619                 ent->lde_reclen = cpu_to_le16(ent_size);
2620                 last_ent = ent;
2621                 ent = (void *)ent + ent_size;
2622                 hash_offset = le64_to_cpu(min_ent->lde_hash);
2623                 if (hash_offset == MDS_DIR_END_OFF) {
2624                         last_ent->lde_reclen = 0;
2625                         break;
2626                 }
2627         } while (1);
2628 out:
2629         if (min_ent_page != NULL) {
2630                 kunmap(min_ent_page);
2631                 page_cache_release(min_ent_page);
2632         }
2633
2634         if (unlikely(rc != 0)) {
2635                 __free_page(ent_page);
2636                 ent_page = NULL;
2637         } else {
2638                 if (ent == area)
2639                         dp->ldp_flags |= LDF_EMPTY;
2640                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2641                 dp->ldp_hash_end = cpu_to_le64(hash_offset);
2642         }
2643
2644         /* We do not want to allocate md_op_data during each
2645          * dir entry reading, so op_data will be shared by every stripe,
2646          * then we need to restore it back to original value before
2647          * return to the upper layer */
2648         op_data->op_fid1 = master_fid;
2649         op_data->op_fid2 = master_fid;
2650         op_data->op_data = master_inode;
2651
2652         *ppage = ent_page;
2653
2654         RETURN(rc);
2655 }
2656
2657 int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
2658                   struct md_callback *cb_op, __u64 offset,
2659                   struct page **ppage)
2660 {
2661         struct obd_device       *obd = exp->exp_obd;
2662         struct lmv_obd          *lmv = &obd->u.lmv;
2663         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2664         struct lmv_tgt_desc     *tgt;
2665         int                     rc;
2666         ENTRY;
2667
2668         rc = lmv_check_connect(obd);
2669         if (rc != 0)
2670                 RETURN(rc);
2671
2672         if (unlikely(lsm != NULL)) {
2673                 rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
2674                 RETURN(rc);
2675         }
2676
2677         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2678         if (IS_ERR(tgt))
2679                 RETURN(PTR_ERR(tgt));
2680
2681         rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
2682
2683         RETURN(rc);
2684 }
2685
2686 /**
2687  * Unlink a file/directory
2688  *
2689  * Unlink a file or directory under the parent dir. The unlink request
2690  * usually will be sent to the MDT where the child is located, but if
2691  * the client does not have the child FID then request will be sent to the
2692  * MDT where the parent is located.
2693  *
2694  * If the parent is a striped directory then it also needs to locate which
2695  * stripe the name of the child is located, and replace the parent FID
2696  * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
2697  * it will walk through all of sub-stripes until the child is being
2698  * unlinked finally.
2699  *
2700  * \param[in] exp       export refer to LMV
2701  * \param[in] op_data   different parameters transferred beween client
2702  *                      MD stacks, name, namelen, FIDs etc.
2703  *                      op_fid1 is the parent FID, op_fid2 is the child
2704  *                      FID.
2705  * \param[out] request  point to the request of unlink.
2706  *
2707  * retval               0 if succeed
2708  *                      negative errno if failed.
2709  */
2710 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2711                       struct ptlrpc_request **request)
2712 {
2713         struct obd_device       *obd = exp->exp_obd;
2714         struct lmv_obd          *lmv = &obd->u.lmv;
2715         struct lmv_tgt_desc     *tgt = NULL;
2716         struct lmv_tgt_desc     *parent_tgt = NULL;
2717         struct mdt_body         *body;
2718         int                     rc;
2719         int                     stripe_index = 0;
2720         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2721         ENTRY;
2722
2723         rc = lmv_check_connect(obd);
2724         if (rc)
2725                 RETURN(rc);
2726 retry_unlink:
2727         /* For striped dir, we need to locate the parent as well */
2728         if (lsm != NULL) {
2729                 struct lmv_tgt_desc *tmp;
2730
2731                 LASSERT(op_data->op_name != NULL &&
2732                         op_data->op_namelen != 0);
2733
2734                 tmp = lmv_locate_target_for_name(lmv, lsm,
2735                                                  op_data->op_name,
2736                                                  op_data->op_namelen,
2737                                                  &op_data->op_fid1,
2738                                                  &op_data->op_mds);
2739
2740                 /* return -EBADFD means unknown hash type, might
2741                  * need try all sub-stripe here */
2742                 if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD)
2743                         RETURN(PTR_ERR(tmp));
2744
2745                 /* Note: both migrating dir and unknown hash dir need to
2746                  * try all of sub-stripes, so we need start search the
2747                  * name from stripe 0, but migrating dir is already handled
2748                  * inside lmv_locate_target_for_name(), so we only check
2749                  * unknown hash type directory here */
2750                 if (!lmv_is_known_hash_type(lsm)) {
2751                         struct lmv_oinfo *oinfo;
2752
2753                         oinfo = &lsm->lsm_md_oinfo[stripe_index];
2754
2755                         op_data->op_fid1 = oinfo->lmo_fid;
2756                         op_data->op_mds = oinfo->lmo_mds;
2757                 }
2758         }
2759
2760 try_next_stripe:
2761         /* Send unlink requests to the MDT where the child is located */
2762         if (likely(!fid_is_zero(&op_data->op_fid2)))
2763                 tgt = lmv_find_target(lmv, &op_data->op_fid2);
2764         else if (lsm != NULL)
2765                 tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
2766         else
2767                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2768
2769         if (IS_ERR(tgt))
2770                 RETURN(PTR_ERR(tgt));
2771
2772         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2773         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2774         op_data->op_cap = cfs_curproc_cap_pack();
2775
2776         /*
2777          * If child's fid is given, cancel unused locks for it if it is from
2778          * another export than parent.
2779          *
2780          * LOOKUP lock for child (fid3) should also be cancelled on parent
2781          * tgt_tgt in mdc_unlink().
2782          */
2783         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2784
2785         /*
2786          * Cancel FULL locks on child (fid3).
2787          */
2788         parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
2789         if (IS_ERR(parent_tgt))
2790                 RETURN(PTR_ERR(parent_tgt));
2791
2792         if (parent_tgt != tgt) {
2793                 rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
2794                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2795                                       MF_MDC_CANCEL_FID3);
2796         }
2797
2798         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
2799                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2800         if (rc != 0)
2801                 RETURN(rc);
2802
2803         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2804                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2805
2806         rc = md_unlink(tgt->ltd_exp, op_data, request);
2807         if (rc != 0 && rc != -EREMOTE && rc != -ENOENT)
2808                 RETURN(rc);
2809
2810         /* Try next stripe if it is needed. */
2811         if (rc == -ENOENT && lsm != NULL && lmv_need_try_all_stripes(lsm)) {
2812                 struct lmv_oinfo *oinfo;
2813
2814                 stripe_index++;
2815                 if (stripe_index >= lsm->lsm_md_stripe_count)
2816                         RETURN(rc);
2817
2818                 oinfo = &lsm->lsm_md_oinfo[stripe_index];
2819
2820                 op_data->op_fid1 = oinfo->lmo_fid;
2821                 op_data->op_mds = oinfo->lmo_mds;
2822
2823                 ptlrpc_req_finished(*request);
2824                 *request = NULL;
2825
2826                 goto try_next_stripe;
2827         }
2828
2829         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2830         if (body == NULL)
2831                 RETURN(-EPROTO);
2832
2833         /* Not cross-ref case, just get out of here. */
2834         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2835                 RETURN(0);
2836
2837         CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2838                exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
2839
2840         /* This is a remote object, try remote MDT, Note: it may
2841          * try more than 1 time here, Considering following case
2842          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2843          * 1. Initially A does not know where remote1 is, it send
2844          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2845          *    resend unlink RPC to MDT1 (retry 1st time).
2846          *
2847          * 2. During the unlink RPC in flight,
2848          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2849          *    and create new remote1, but on MDT0
2850          *
2851          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2852          *    /mnt/lustre, then lookup get fid of remote1, and find
2853          *    it is remote dir again, and replay -EREMOTE again.
2854          *
2855          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2856          *
2857          * In theory, it might try unlimited time here, but it should
2858          * be very rare case.  */
2859         op_data->op_fid2 = body->mbo_fid1;
2860         ptlrpc_req_finished(*request);
2861         *request = NULL;
2862
2863         goto retry_unlink;
2864 }
2865
2866 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2867 {
2868         struct lmv_obd *lmv = &obd->u.lmv;
2869         int rc = 0;
2870
2871         switch (stage) {
2872         case OBD_CLEANUP_EARLY:
2873                 /* XXX: here should be calling obd_precleanup() down to
2874                  * stack. */
2875                 break;
2876         case OBD_CLEANUP_EXPORTS:
2877                 fld_client_proc_fini(&lmv->lmv_fld);
2878                 lprocfs_obd_cleanup(obd);
2879                 lprocfs_free_md_stats(obd);
2880                 break;
2881         default:
2882                 break;
2883         }
2884         RETURN(rc);
2885 }
2886
2887 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2888                         __u32 keylen, void *key, __u32 *vallen, void *val,
2889                         struct lov_stripe_md *lsm)
2890 {
2891         struct obd_device       *obd;
2892         struct lmv_obd          *lmv;
2893         int                      rc = 0;
2894         ENTRY;
2895
2896         obd = class_exp2obd(exp);
2897         if (obd == NULL) {
2898                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2899                        exp->exp_handle.h_cookie);
2900                 RETURN(-EINVAL);
2901         }
2902
2903         lmv = &obd->u.lmv;
2904         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2905                 int i;
2906
2907                 rc = lmv_check_connect(obd);
2908                 if (rc)
2909                         RETURN(rc);
2910
2911                 LASSERT(*vallen == sizeof(__u32));
2912                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2913                         struct lmv_tgt_desc *tgt = lmv->tgts[i];
2914                         /*
2915                          * All tgts should be connected when this gets called.
2916                          */
2917                         if (tgt == NULL || tgt->ltd_exp == NULL)
2918                                 continue;
2919
2920                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2921                                           vallen, val, NULL))
2922                                 RETURN(0);
2923                 }
2924                 RETURN(-EINVAL);
2925         } else if (KEY_IS(KEY_MAX_EASIZE) ||
2926                    KEY_IS(KEY_DEFAULT_EASIZE) ||
2927                    KEY_IS(KEY_MAX_COOKIESIZE) ||
2928                    KEY_IS(KEY_DEFAULT_COOKIESIZE) ||
2929                    KEY_IS(KEY_CONN_DATA)) {
2930                 rc = lmv_check_connect(obd);
2931                 if (rc)
2932                         RETURN(rc);
2933
2934                 /*
2935                  * Forwarding this request to first MDS, it should know LOV
2936                  * desc.
2937                  */
2938                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2939                                   vallen, val, NULL);
2940                 if (!rc && KEY_IS(KEY_CONN_DATA))
2941                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2942                 RETURN(rc);
2943         } else if (KEY_IS(KEY_TGT_COUNT)) {
2944                 *((int *)val) = lmv->desc.ld_tgt_count;
2945                 RETURN(0);
2946         }
2947
2948         CDEBUG(D_IOCTL, "Invalid key\n");
2949         RETURN(-EINVAL);
2950 }
2951
2952 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2953                        obd_count keylen, void *key, obd_count vallen,
2954                        void *val, struct ptlrpc_request_set *set)
2955 {
2956         struct lmv_tgt_desc    *tgt = NULL;
2957         struct obd_device      *obd;
2958         struct lmv_obd         *lmv;
2959         int rc = 0;
2960         ENTRY;
2961
2962         obd = class_exp2obd(exp);
2963         if (obd == NULL) {
2964                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2965                        exp->exp_handle.h_cookie);
2966                 RETURN(-EINVAL);
2967         }
2968         lmv = &obd->u.lmv;
2969
2970         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2971                 int i, err = 0;
2972
2973                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2974                         tgt = lmv->tgts[i];
2975
2976                         if (tgt == NULL || tgt->ltd_exp == NULL)
2977                                 continue;
2978
2979                         err = obd_set_info_async(env, tgt->ltd_exp,
2980                                                  keylen, key, vallen, val, set);
2981                         if (err && rc == 0)
2982                                 rc = err;
2983                 }
2984
2985                 RETURN(rc);
2986         }
2987
2988         RETURN(-EINVAL);
2989 }
2990
2991 static int lmv_pack_md_v1(const struct lmv_stripe_md *lsm,
2992                           struct lmv_mds_md_v1 *lmm1)
2993 {
2994         int     cplen;
2995         int     i;
2996
2997         lmm1->lmv_magic = cpu_to_le32(lsm->lsm_md_magic);
2998         lmm1->lmv_stripe_count = cpu_to_le32(lsm->lsm_md_stripe_count);
2999         lmm1->lmv_master_mdt_index = cpu_to_le32(lsm->lsm_md_master_mdt_index);
3000         lmm1->lmv_hash_type = cpu_to_le32(lsm->lsm_md_hash_type);
3001         cplen = strlcpy(lmm1->lmv_pool_name, lsm->lsm_md_pool_name,
3002                         sizeof(lmm1->lmv_pool_name));
3003         if (cplen >= sizeof(lmm1->lmv_pool_name))
3004                 return -E2BIG;
3005
3006         for (i = 0; i < lsm->lsm_md_stripe_count; i++)
3007                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
3008                               &lsm->lsm_md_oinfo[i].lmo_fid);
3009         return 0;
3010 }
3011
3012 int lmv_pack_md(union lmv_mds_md **lmmp, const struct lmv_stripe_md *lsm,
3013                 int stripe_count)
3014 {
3015         int     lmm_size = 0;
3016         bool    allocated = false;
3017         int     rc = 0;
3018         ENTRY;
3019
3020         LASSERT(lmmp != NULL);
3021         /* Free lmm */
3022         if (*lmmp != NULL && lsm == NULL) {
3023                 int stripe_count;
3024
3025                 stripe_count = lmv_mds_md_stripe_count_get(*lmmp);
3026                 lmm_size = lmv_mds_md_size(stripe_count,
3027                                            le32_to_cpu((*lmmp)->lmv_magic));
3028                 if (lmm_size == 0)
3029                         RETURN(-EINVAL);
3030                 OBD_FREE(*lmmp, lmm_size);
3031                 *lmmp = NULL;
3032                 RETURN(0);
3033         }
3034
3035         /* Alloc lmm */
3036         if (*lmmp == NULL && lsm == NULL) {
3037                 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
3038                 LASSERT(lmm_size > 0);
3039                 OBD_ALLOC(*lmmp, lmm_size);
3040                 if (*lmmp == NULL)
3041                         RETURN(-ENOMEM);
3042                 lmv_mds_md_stripe_count_set(*lmmp, stripe_count);
3043                 (*lmmp)->lmv_magic = cpu_to_le32(LMV_MAGIC);
3044                 RETURN(lmm_size);
3045         }
3046
3047         /* pack lmm */
3048         LASSERT(lsm != NULL);
3049         lmm_size = lmv_mds_md_size(lsm->lsm_md_stripe_count, lsm->lsm_md_magic);
3050         if (*lmmp == NULL) {
3051                 OBD_ALLOC(*lmmp, lmm_size);
3052                 if (*lmmp == NULL)
3053                         RETURN(-ENOMEM);
3054                 allocated = true;
3055         }
3056
3057         switch (lsm->lsm_md_magic) {
3058         case LMV_MAGIC_V1:
3059                 rc = lmv_pack_md_v1(lsm, &(*lmmp)->lmv_md_v1);
3060                 break;
3061         default:
3062                 rc = -EINVAL;
3063                 break;
3064         }
3065
3066         if (rc != 0 && allocated) {
3067                 OBD_FREE(*lmmp, lmm_size);
3068                 *lmmp = NULL;
3069         }
3070
3071         RETURN(lmm_size);
3072 }
3073 EXPORT_SYMBOL(lmv_pack_md);
3074
3075 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
3076                             const struct lmv_mds_md_v1 *lmm1)
3077 {
3078         struct lmv_obd  *lmv = &exp->exp_obd->u.lmv;
3079         int             stripe_count;
3080         int             cplen;
3081         int             i;
3082         int             rc = 0;
3083         ENTRY;
3084
3085         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
3086         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
3087         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
3088         if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE))
3089                 lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN;
3090         else
3091                 lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
3092         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
3093         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
3094                         sizeof(lsm->lsm_md_pool_name));
3095
3096         if (cplen >= sizeof(lsm->lsm_md_pool_name))
3097                 RETURN(-E2BIG);
3098
3099         CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
3100                "layout_version %d\n", lsm->lsm_md_stripe_count,
3101                lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
3102                lsm->lsm_md_layout_version);
3103
3104         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
3105         for (i = 0; i < le32_to_cpu(stripe_count); i++) {
3106                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
3107                               &lmm1->lmv_stripe_fids[i]);
3108                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
3109                                     &lsm->lsm_md_oinfo[i].lmo_mds);
3110                 if (rc != 0)
3111                         RETURN(rc);
3112                 CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
3113                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
3114         }
3115
3116         RETURN(rc);
3117 }
3118
3119 int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp,
3120                   const union lmv_mds_md *lmm, int stripe_count)
3121 {
3122         struct lmv_stripe_md     *lsm;
3123         int                      lsm_size;
3124         int                      rc;
3125         bool                     allocated = false;
3126         ENTRY;
3127
3128         LASSERT(lsmp != NULL);
3129
3130         lsm = *lsmp;
3131         /* Free memmd */
3132         if (lsm != NULL && lmm == NULL) {
3133 #ifdef __KERNEL__
3134                 int i;
3135                 for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3136                         /* For migrating inode, the master stripe and master
3137                          * object will be the same, so do not need iput, see
3138                          * ll_update_lsm_md */
3139                         if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
3140                               i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
3141                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
3142                 }
3143 #endif
3144                 lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
3145                 OBD_FREE(lsm, lsm_size);
3146                 *lsmp = NULL;
3147                 RETURN(0);
3148         }
3149
3150         /* Alloc memmd */
3151         if (lsm == NULL && lmm == NULL) {
3152                 lsm_size = lmv_stripe_md_size(stripe_count);
3153                 OBD_ALLOC(lsm, lsm_size);
3154                 if (lsm == NULL)
3155                         RETURN(-ENOMEM);
3156                 lsm->lsm_md_stripe_count = stripe_count;
3157                 *lsmp = lsm;
3158                 RETURN(0);
3159         }
3160
3161         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
3162                 RETURN(-EPERM);
3163
3164         /* Unpack memmd */
3165         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
3166             le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
3167                 CERROR("%s: invalid lmv magic %x: rc = %d\n",
3168                        exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
3169                        -EIO);
3170                 RETURN(-EIO);
3171         }
3172
3173         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
3174                 lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
3175         else
3176                 /**
3177                  * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
3178                  * stripecount should be 0 then.
3179                  */
3180                 lsm_size = lmv_stripe_md_size(0);
3181
3182         lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
3183         if (lsm == NULL) {
3184                 OBD_ALLOC(lsm, lsm_size);
3185                 if (lsm == NULL)
3186                         RETURN(-ENOMEM);
3187                 allocated = true;
3188                 *lsmp = lsm;
3189         }
3190
3191         switch (le32_to_cpu(lmm->lmv_magic)) {
3192         case LMV_MAGIC_V1:
3193                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
3194                 break;
3195         default:
3196                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
3197                        le32_to_cpu(lmm->lmv_magic));
3198                 rc = -EINVAL;
3199                 break;
3200         }
3201
3202         if (rc != 0 && allocated) {
3203                 OBD_FREE(lsm, lsm_size);
3204                 *lsmp = NULL;
3205                 lsm_size = rc;
3206         }
3207         RETURN(lsm_size);
3208 }
3209
3210 int lmv_alloc_memmd(struct lmv_stripe_md **lsmp, int stripes)
3211 {
3212         return lmv_unpack_md(NULL, lsmp, NULL, stripes);
3213 }
3214 EXPORT_SYMBOL(lmv_alloc_memmd);
3215
3216 void lmv_free_memmd(struct lmv_stripe_md *lsm)
3217 {
3218         lmv_unpack_md(NULL, &lsm, NULL, 0);
3219 }
3220 EXPORT_SYMBOL(lmv_free_memmd);
3221
/**
 * Generic unpack entry point (installed as .o_unpackmd in lmv_obd_ops).
 *
 * Re-types the LOV-flavoured arguments of the generic interface to their LMV
 * counterparts and hands them to lmv_unpack_md().
 *
 * NOTE(review): \a disk_len is forwarded in the argument slot that
 * lmv_alloc_memmd() uses for a stripe count -- confirm this is intended.
 */
int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 struct lov_mds_md *lmm, int disk_len)
{
        struct lmv_stripe_md    **lmv_lsmp = (struct lmv_stripe_md **)lsmp;
        union lmv_mds_md         *lmv_lmm = (union lmv_mds_md *)lmm;

        return lmv_unpack_md(exp, lmv_lsmp, lmv_lmm, disk_len);
}
3228
3229 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
3230                struct lov_stripe_md *lsm)
3231 {
3232         struct obd_device               *obd = exp->exp_obd;
3233         struct lmv_obd                  *lmv_obd = &obd->u.lmv;
3234         const struct lmv_stripe_md      *lmv = (struct lmv_stripe_md *)lsm;
3235         int                             stripe_count;
3236
3237         if (lmmp == NULL) {
3238                 if (lsm != NULL)
3239                         stripe_count = lmv->lsm_md_stripe_count;
3240                 else
3241                         stripe_count = lmv_obd->desc.ld_tgt_count;
3242
3243                 return lmv_mds_md_size(stripe_count, LMV_MAGIC_V1);
3244         }
3245
3246         return lmv_pack_md((union lmv_mds_md **)lmmp, lmv, 0);
3247 }
3248
3249 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
3250                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
3251                              ldlm_cancel_flags_t flags, void *opaque)
3252 {
3253         struct obd_device       *obd = exp->exp_obd;
3254         struct lmv_obd          *lmv = &obd->u.lmv;
3255         int                      rc = 0;
3256         int                      err;
3257         __u32                    i;
3258         ENTRY;
3259
3260         LASSERT(fid != NULL);
3261
3262         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3263                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
3264
3265                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
3266                         continue;
3267
3268                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
3269                                        opaque);
3270                 if (!rc)
3271                         rc = err;
3272         }
3273         RETURN(rc);
3274 }
3275
3276 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
3277                       __u64 *bits)
3278 {
3279         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
3280         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
3281         int                      rc;
3282         ENTRY;
3283
3284         if (tgt == NULL || tgt->ltd_exp == NULL)
3285                 RETURN(-EINVAL);
3286         rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
3287         RETURN(rc);
3288 }
3289
3290 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
3291                            const struct lu_fid *fid, ldlm_type_t type,
3292                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
3293                            struct lustre_handle *lockh)
3294 {
3295         struct obd_device       *obd = exp->exp_obd;
3296         struct lmv_obd          *lmv = &obd->u.lmv;
3297         ldlm_mode_t             rc;
3298         int                     tgt;
3299         int                     i;
3300         ENTRY;
3301
3302         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
3303
3304         /*
3305          * With DNE every object can have two locks in different namespaces:
3306          * lookup lock in space of MDT storing direntry and update/open lock in
3307          * space of MDT storing inode.  Try the MDT that the FID maps to first,
3308          * since this can be easily found, and only try others if that fails.
3309          */
3310         for (i = 0, tgt = lmv_find_target_index(lmv, fid);
3311              i < lmv->desc.ld_tgt_count;
3312              i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
3313                 if (tgt < 0) {
3314                         CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
3315                                obd->obd_name, PFID(fid), tgt);
3316                         tgt = 0;
3317                 }
3318
3319                 if (lmv->tgts[tgt] == NULL ||
3320                     lmv->tgts[tgt]->ltd_exp == NULL ||
3321                     lmv->tgts[tgt]->ltd_active == 0)
3322                         continue;
3323
3324                 rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid,
3325                                    type, policy, mode, lockh);
3326                 if (rc)
3327                         RETURN(rc);
3328         }
3329
3330         RETURN(0);
3331 }
3332
3333 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
3334                       struct obd_export *dt_exp, struct obd_export *md_exp,
3335                       struct lustre_md *md)
3336 {
3337         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
3338         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
3339
3340         if (tgt == NULL || tgt->ltd_exp == NULL)
3341                 RETURN(-EINVAL);
3342
3343         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
3344 }
3345
3346 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
3347 {
3348         struct obd_device       *obd = exp->exp_obd;
3349         struct lmv_obd          *lmv = &obd->u.lmv;
3350         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
3351         ENTRY;
3352
3353         if (md->lmv != NULL) {
3354                 lmv_free_memmd(md->lmv);
3355                 md->lmv = NULL;
3356         }
3357         if (tgt == NULL || tgt->ltd_exp == NULL)
3358                 RETURN(-EINVAL);
3359         RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md));
3360 }
3361
3362 int lmv_set_open_replay_data(struct obd_export *exp,
3363                              struct obd_client_handle *och,
3364                              struct lookup_intent *it)
3365 {
3366         struct obd_device       *obd = exp->exp_obd;
3367         struct lmv_obd          *lmv = &obd->u.lmv;
3368         struct lmv_tgt_desc     *tgt;
3369         ENTRY;
3370
3371         tgt = lmv_find_target(lmv, &och->och_fid);
3372         if (IS_ERR(tgt))
3373                 RETURN(PTR_ERR(tgt));
3374
3375         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it));
3376 }
3377
3378 int lmv_clear_open_replay_data(struct obd_export *exp,
3379                                struct obd_client_handle *och)
3380 {
3381         struct obd_device       *obd = exp->exp_obd;
3382         struct lmv_obd          *lmv = &obd->u.lmv;
3383         struct lmv_tgt_desc     *tgt;
3384         ENTRY;
3385
3386         tgt = lmv_find_target(lmv, &och->och_fid);
3387         if (IS_ERR(tgt))
3388                 RETURN(PTR_ERR(tgt));
3389
3390         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
3391 }
3392
3393 static int lmv_get_remote_perm(struct obd_export *exp,
3394                                const struct lu_fid *fid,
3395                                struct obd_capa *oc, __u32 suppgid,
3396                                struct ptlrpc_request **request)
3397 {
3398         struct obd_device       *obd = exp->exp_obd;
3399         struct lmv_obd          *lmv = &obd->u.lmv;
3400         struct lmv_tgt_desc     *tgt;
3401         int                      rc;
3402         ENTRY;
3403
3404         rc = lmv_check_connect(obd);
3405         if (rc)
3406                 RETURN(rc);
3407
3408         tgt = lmv_find_target(lmv, fid);
3409         if (IS_ERR(tgt))
3410                 RETURN(PTR_ERR(tgt));
3411
3412         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
3413         RETURN(rc);
3414 }
3415
3416 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
3417                           renew_capa_cb_t cb)
3418 {
3419         struct obd_device       *obd = exp->exp_obd;
3420         struct lmv_obd          *lmv = &obd->u.lmv;
3421         struct lmv_tgt_desc     *tgt;
3422         int                      rc;
3423         ENTRY;
3424
3425         rc = lmv_check_connect(obd);
3426         if (rc)
3427                 RETURN(rc);
3428
3429         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
3430         if (IS_ERR(tgt))
3431                 RETURN(PTR_ERR(tgt));
3432
3433         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
3434         RETURN(rc);
3435 }
3436
3437 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
3438                     const struct req_msg_field *field, struct obd_capa **oc)
3439 {
3440         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
3441         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
3442
3443         if (tgt == NULL || tgt->ltd_exp == NULL)
3444                 RETURN(-EINVAL);
3445         return md_unpack_capa(tgt->ltd_exp, req, field, oc);
3446 }
3447
3448 int lmv_intent_getattr_async(struct obd_export *exp,
3449                              struct md_enqueue_info *minfo,
3450                              struct ldlm_enqueue_info *einfo)
3451 {
3452         struct md_op_data       *op_data = &minfo->mi_data;
3453         struct obd_device       *obd = exp->exp_obd;
3454         struct lmv_obd          *lmv = &obd->u.lmv;
3455         struct lmv_tgt_desc     *tgt = NULL;
3456         int                      rc;
3457         ENTRY;
3458
3459         rc = lmv_check_connect(obd);
3460         if (rc)
3461                 RETURN(rc);
3462
3463         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
3464         if (IS_ERR(tgt))
3465                 RETURN(PTR_ERR(tgt));
3466
3467         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
3468         RETURN(rc);
3469 }
3470
3471 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
3472                         struct lu_fid *fid, __u64 *bits)
3473 {
3474         struct obd_device       *obd = exp->exp_obd;
3475         struct lmv_obd          *lmv = &obd->u.lmv;
3476         struct lmv_tgt_desc     *tgt;
3477         int                      rc;
3478         ENTRY;
3479
3480         rc = lmv_check_connect(obd);
3481         if (rc)
3482                 RETURN(rc);
3483
3484         tgt = lmv_find_target(lmv, fid);
3485         if (IS_ERR(tgt))
3486                 RETURN(PTR_ERR(tgt));
3487
3488         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
3489         RETURN(rc);
3490 }
3491
3492 int lmv_get_fid_from_lsm(struct obd_export *exp,
3493                          const struct lmv_stripe_md *lsm,
3494                          const char *name, int namelen, struct lu_fid *fid)
3495 {
3496         const struct lmv_oinfo *oinfo;
3497
3498         LASSERT(lsm != NULL);
3499         oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
3500         if (IS_ERR(oinfo))
3501                 return PTR_ERR(oinfo);
3502
3503         *fid = oinfo->lmo_fid;
3504
3505         RETURN(0);
3506 }
3507
3508 /**
3509  * For lmv, only need to send request to master MDT, and the master MDT will
3510  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
3511  * we directly fetch data from the slave MDTs.
3512  */
3513 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
3514                  struct obd_quotactl *oqctl)
3515 {
3516         struct obd_device   *obd = class_exp2obd(exp);
3517         struct lmv_obd      *lmv = &obd->u.lmv;
3518         struct lmv_tgt_desc *tgt = lmv->tgts[0];
3519         int                  rc = 0;
3520         __u32                i;
3521         __u64                curspace, curinodes;
3522         ENTRY;
3523
3524         if (tgt == NULL ||
3525             tgt->ltd_exp == NULL ||
3526             !tgt->ltd_active ||
3527             lmv->desc.ld_tgt_count == 0) {
3528                 CERROR("master lmv inactive\n");
3529                 RETURN(-EIO);
3530         }
3531
3532         if (oqctl->qc_cmd != Q_GETOQUOTA) {
3533                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
3534                 RETURN(rc);
3535         }
3536
3537         curspace = curinodes = 0;
3538         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3539                 int err;
3540                 tgt = lmv->tgts[i];
3541
3542                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
3543                         continue;
3544
3545                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3546                 if (err) {
3547                         CERROR("getquota on mdt %d failed. %d\n", i, err);
3548                         if (!rc)
3549                                 rc = err;
3550                 } else {
3551                         curspace += oqctl->qc_dqblk.dqb_curspace;
3552                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3553                 }
3554         }
3555         oqctl->qc_dqblk.dqb_curspace = curspace;
3556         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3557
3558         RETURN(rc);
3559 }
3560
3561 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
3562                    struct obd_quotactl *oqctl)
3563 {
3564         struct obd_device       *obd = class_exp2obd(exp);
3565         struct lmv_obd          *lmv = &obd->u.lmv;
3566         struct lmv_tgt_desc     *tgt;
3567         __u32                    i;
3568         int                      rc = 0;
3569         ENTRY;
3570
3571         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3572                 int err;
3573                 tgt = lmv->tgts[i];
3574                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
3575                         CERROR("lmv idx %d inactive\n", i);
3576                         RETURN(-EIO);
3577                 }
3578
3579                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
3580                 if (err && !rc)
3581                         rc = err;
3582         }
3583
3584         RETURN(rc);
3585 }
3586
3587 int lmv_update_lsm_md(struct obd_export *exp, struct lmv_stripe_md *lsm,
3588                       struct mdt_body *body, ldlm_blocking_callback cb_blocking)
3589 {
3590         return lmv_revalidate_slaves(exp, body, lsm, cb_blocking, 0);
3591 }
3592
/**
 * Merge the attributes of all stripes of a striped directory into \a attr.
 *
 * For every stripe that has an inode attached:
 *  - nlink: stripe 0 sets cat_nlink; every other stripe adds its link count
 *    minus the two links for "." and "..";
 *  - size: added to cat_size;
 *  - atime/ctime/mtime: cat_* is raised to the stripe's value if that is
 *    newer.
 *
 * Stripes without an inode (lmo_root == NULL) are skipped rather than
 * dereferenced; the free path of lmv_unpack_md() likewise treats lmo_root as
 * possibly NULL.  If stripe 0 is skipped, cat_nlink keeps the caller's
 * initial value as the base.
 *
 * Outside the kernel build this is a no-op.  \a exp is not used.
 *
 * Always returns 0.
 */
int lmv_merge_attr(struct obd_export *exp, const struct lmv_stripe_md *lsm,
                   struct cl_attr *attr)
{
#ifdef __KERNEL__
        int i;

        for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
                struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;

                /* no inode attached to this stripe: nothing to merge */
                if (inode == NULL)
                        continue;

                CDEBUG(D_INFO, ""DFID" size %llu, nlink %u, atime %lu ctime"
                       " %lu, mtime %lu.\n", PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
                       i_size_read(inode), inode->i_nlink,
                       LTIME_S(inode->i_atime), LTIME_S(inode->i_ctime),
                       LTIME_S(inode->i_mtime));

                /* for slave stripe, it needs to subtract nlink for . and .. */
                if (i != 0)
                        attr->cat_nlink += inode->i_nlink - 2;
                else
                        attr->cat_nlink = inode->i_nlink;

                attr->cat_size += i_size_read(inode);

                if (attr->cat_atime < LTIME_S(inode->i_atime))
                        attr->cat_atime = LTIME_S(inode->i_atime);

                if (attr->cat_ctime < LTIME_S(inode->i_ctime))
                        attr->cat_ctime = LTIME_S(inode->i_ctime);

                if (attr->cat_mtime < LTIME_S(inode->i_mtime))
                        attr->cat_mtime = LTIME_S(inode->i_mtime);
        }
#endif
        return 0;
}
3628
3629 struct obd_ops lmv_obd_ops = {
3630         .o_owner                = THIS_MODULE,
3631         .o_setup                = lmv_setup,
3632         .o_cleanup              = lmv_cleanup,
3633         .o_precleanup           = lmv_precleanup,
3634         .o_process_config       = lmv_process_config,
3635         .o_connect              = lmv_connect,
3636         .o_disconnect           = lmv_disconnect,
3637         .o_statfs               = lmv_statfs,
3638         .o_get_info             = lmv_get_info,
3639         .o_set_info_async       = lmv_set_info_async,
3640         .o_packmd               = lmv_packmd,
3641         .o_unpackmd             = lmv_unpackmd,
3642         .o_notify               = lmv_notify,
3643         .o_get_uuid             = lmv_get_uuid,
3644         .o_iocontrol            = lmv_iocontrol,
3645         .o_quotacheck           = lmv_quotacheck,
3646         .o_quotactl             = lmv_quotactl
3647 };
3648
3649 struct md_ops lmv_md_ops = {
3650         .m_getstatus            = lmv_getstatus,
3651         .m_null_inode           = lmv_null_inode,
3652         .m_find_cbdata          = lmv_find_cbdata,
3653         .m_close                = lmv_close,
3654         .m_create               = lmv_create,
3655         .m_done_writing         = lmv_done_writing,
3656         .m_enqueue              = lmv_enqueue,
3657         .m_getattr              = lmv_getattr,
3658         .m_getxattr             = lmv_getxattr,
3659         .m_getattr_name         = lmv_getattr_name,
3660         .m_intent_lock          = lmv_intent_lock,
3661         .m_link                 = lmv_link,
3662         .m_rename               = lmv_rename,
3663         .m_setattr              = lmv_setattr,
3664         .m_setxattr             = lmv_setxattr,
3665         .m_fsync                = lmv_fsync,
3666         .m_read_page            = lmv_read_page,
3667         .m_unlink               = lmv_unlink,
3668         .m_init_ea_size         = lmv_init_ea_size,
3669         .m_cancel_unused        = lmv_cancel_unused,
3670         .m_set_lock_data        = lmv_set_lock_data,
3671         .m_lock_match           = lmv_lock_match,
3672         .m_get_lustre_md        = lmv_get_lustre_md,
3673         .m_free_lustre_md       = lmv_free_lustre_md,
3674         .m_update_lsm_md        = lmv_update_lsm_md,
3675         .m_merge_attr           = lmv_merge_attr,
3676         .m_set_open_replay_data = lmv_set_open_replay_data,
3677         .m_clear_open_replay_data = lmv_clear_open_replay_data,
3678         .m_renew_capa           = lmv_renew_capa,
3679         .m_unpack_capa          = lmv_unpack_capa,
3680         .m_get_remote_perm      = lmv_get_remote_perm,
3681         .m_intent_getattr_async = lmv_intent_getattr_async,
3682         .m_revalidate_lock      = lmv_revalidate_lock,
3683         .m_get_fid_from_lsm     = lmv_get_fid_from_lsm,
3684 };
3685
3686 int __init lmv_init(void)
3687 {
3688         return class_register_type(&lmv_obd_ops, &lmv_md_ops, true, NULL,
3689 #ifndef HAVE_ONLY_PROCFS_SEQ
3690                                    NULL,
3691 #endif
3692                                    LUSTRE_LMV_NAME, NULL);
3693 }
3694
#ifdef __KERNEL__
/**
 * Module exit point: unregister the device type registered by lmv_init().
 *
 * Marked __exit, the counterpart of __init on lmv_init(): the function is
 * only ever referenced through module_exit(), so the kernel may discard it
 * when the driver cannot be unloaded.
 */
static void __exit lmv_exit(void)
{
        class_unregister_type(LUSTRE_LMV_NAME);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lmv_init);
module_exit(lmv_exit);
#endif