Whamcloud - gitweb
LU-1187 lmv: remove obsolete lmv object.
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #ifdef __KERNEL__
39 #include <linux/slab.h>
40 #include <linux/module.h>
41 #include <linux/init.h>
42 #include <linux/slab.h>
43 #include <linux/pagemap.h>
44 #include <linux/mm.h>
45 #include <asm/div64.h>
46 #include <linux/seq_file.h>
47 #include <linux/namei.h>
48 #else
49 #include <liblustre.h>
50 #endif
51
52 #include <lustre/lustre_idl.h>
53 #include <obd_support.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <obd_class.h>
57 #include <lprocfs_status.h>
58 #include <lustre_lite.h>
59 #include <lustre_fid.h>
60 #include "lmv_internal.h"
61
62 static void lmv_activate_target(struct lmv_obd *lmv,
63                                 struct lmv_tgt_desc *tgt,
64                                 int activate)
65 {
66         if (tgt->ltd_active == activate)
67                 return;
68
69         tgt->ltd_active = activate;
70         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
71 }
72
73 /**
74  * Error codes:
75  *
76  *  -EINVAL  : UUID can't be found in the LMV's target list
77  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
78  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
79  */
80 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
81                               int activate)
82 {
83         struct lmv_tgt_desc    *tgt;
84         struct obd_device      *obd;
85         int                     i;
86         int                     rc = 0;
87         ENTRY;
88
89         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
90                lmv, uuid->uuid, activate);
91
92         spin_lock(&lmv->lmv_lock);
93         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
94                 if (tgt->ltd_exp == NULL)
95                         continue;
96
97                 CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n",
98                        i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
99
100                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
101                         break;
102         }
103
104         if (i == lmv->desc.ld_tgt_count)
105                 GOTO(out_lmv_lock, rc = -EINVAL);
106
107         obd = class_exp2obd(tgt->ltd_exp);
108         if (obd == NULL)
109                 GOTO(out_lmv_lock, rc = -ENOTCONN);
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, i);
114         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115
116         if (tgt->ltd_active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 GOTO(out_lmv_lock, rc);
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123                activate ? "" : "in");
124         lmv_activate_target(lmv, tgt, activate);
125         EXIT;
126
127  out_lmv_lock:
128         spin_unlock(&lmv->lmv_lock);
129         return rc;
130 }
131
132 static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
133                             struct obd_connect_data *data)
134 {
135         struct lmv_tgt_desc    *tgt;
136         int                     i;
137         ENTRY;
138
139         LASSERT(data != NULL);
140
141         spin_lock(&lmv->lmv_lock);
142         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
143                 if (tgt->ltd_exp == NULL)
144                         continue;
145
146                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
147                         lmv->datas[tgt->ltd_idx] = *data;
148                         break;
149                 }
150         }
151         spin_unlock(&lmv->lmv_lock);
152         RETURN(0);
153 }
154
155 struct obd_uuid *lmv_get_uuid(struct obd_export *exp) {
156         struct obd_device *obd = exp->exp_obd;
157         struct lmv_obd *lmv = &obd->u.lmv;
158         return obd_get_uuid(lmv->tgts[0].ltd_exp);
159 }
160
161 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
162                       enum obd_notify_event ev, void *data)
163 {
164         struct obd_connect_data *conn_data;
165         struct lmv_obd          *lmv = &obd->u.lmv;
166         struct obd_uuid         *uuid;
167         int                      rc = 0;
168         ENTRY;
169
170         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
171                 CERROR("unexpected notification of %s %s!\n",
172                        watched->obd_type->typ_name,
173                        watched->obd_name);
174                 RETURN(-EINVAL);
175         }
176
177         uuid = &watched->u.cli.cl_target_uuid;
178         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
179                 /*
180                  * Set MDC as active before notifying the observer, so the
181                  * observer can use the MDC normally.
182                  */
183                 rc = lmv_set_mdc_active(lmv, uuid,
184                                         ev == OBD_NOTIFY_ACTIVE);
185                 if (rc) {
186                         CERROR("%sactivation of %s failed: %d\n",
187                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
188                                uuid->uuid, rc);
189                         RETURN(rc);
190                 }
191         } else if (ev == OBD_NOTIFY_OCD) {
192                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
193
194                 /*
195                  * Set connect data to desired target, update exp_connect_flags.
196                  */
197                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
198                 if (rc) {
199                         CERROR("can't set connect data to target %s, rc %d\n",
200                                uuid->uuid, rc);
201                         RETURN(rc);
202                 }
203
204                 /*
205                  * XXX: Make sure that ocd_connect_flags from all targets are
206                  * the same. Otherwise one of MDTs runs wrong version or
207                  * something like this.  --umka
208                  */
209                 obd->obd_self_export->exp_connect_flags =
210                         conn_data->ocd_connect_flags;
211         }
212 #if 0
213         else if (ev == OBD_NOTIFY_DISCON) {
214                 /*
215                  * For disconnect event, flush fld cache for failout MDS case.
216                  */
217                 fld_client_flush(&lmv->lmv_fld);
218         }
219 #endif
220         /*
221          * Pass the notification up the chain.
222          */
223         if (obd->obd_observer)
224                 rc = obd_notify(obd->obd_observer, watched, ev, data);
225
226         RETURN(rc);
227 }
228
229 /**
230  * This is fake connect function. Its purpose is to initialize lmv and say
231  * caller that everything is okay. Real connection will be performed later.
232  */
233 static int lmv_connect(const struct lu_env *env,
234                        struct obd_export **exp, struct obd_device *obd,
235                        struct obd_uuid *cluuid, struct obd_connect_data *data,
236                        void *localdata)
237 {
238 #ifdef __KERNEL__
239         struct proc_dir_entry *lmv_proc_dir;
240 #endif
241         struct lmv_obd        *lmv = &obd->u.lmv;
242         struct lustre_handle  conn = { 0 };
243         int                    rc = 0;
244         ENTRY;
245
246         /*
247          * We don't want to actually do the underlying connections more than
248          * once, so keep track.
249          */
250         lmv->refcount++;
251         if (lmv->refcount > 1) {
252                 *exp = NULL;
253                 RETURN(0);
254         }
255
256         rc = class_connect(&conn, obd, cluuid);
257         if (rc) {
258                 CERROR("class_connection() returned %d\n", rc);
259                 RETURN(rc);
260         }
261
262         *exp = class_conn2export(&conn);
263         class_export_get(*exp);
264
265         lmv->exp = *exp;
266         lmv->connected = 0;
267         lmv->cluuid = *cluuid;
268
269         if (data)
270                 lmv->conn_data = *data;
271
272 #ifdef __KERNEL__
273         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
274                                         NULL, NULL);
275         if (IS_ERR(lmv_proc_dir)) {
276                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
277                        obd->obd_type->typ_name, obd->obd_name);
278                 lmv_proc_dir = NULL;
279         }
280 #endif
281
282         /*
283          * All real clients should perform actual connection right away, because
284          * it is possible, that LMV will not have opportunity to connect targets
285          * and MDC stuff will be called directly, for instance while reading
286          * ../mdc/../kbytesfree procfs file, etc.
287          */
288         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
289                 rc = lmv_check_connect(obd);
290
291 #ifdef __KERNEL__
292         if (rc) {
293                 if (lmv_proc_dir)
294                         lprocfs_remove(&lmv_proc_dir);
295         }
296 #endif
297
298         RETURN(rc);
299 }
300
301 static void lmv_set_timeouts(struct obd_device *obd)
302 {
303         struct lmv_tgt_desc   *tgts;
304         struct lmv_obd        *lmv;
305         int                    i;
306
307         lmv = &obd->u.lmv;
308         if (lmv->server_timeout == 0)
309                 return;
310
311         if (lmv->connected == 0)
312                 return;
313
314         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
315                 if (tgts->ltd_exp == NULL)
316                         continue;
317
318                 obd_set_info_async(NULL, tgts->ltd_exp, sizeof(KEY_INTERMDS),
319                                    KEY_INTERMDS, 0, NULL, NULL);
320         }
321 }
322
323 static int lmv_init_ea_size(struct obd_export *exp, int easize,
324                             int def_easize, int cookiesize)
325 {
326         struct obd_device   *obd = exp->exp_obd;
327         struct lmv_obd      *lmv = &obd->u.lmv;
328         int                  i;
329         int                  rc = 0;
330         int                  change = 0;
331         ENTRY;
332
333         if (lmv->max_easize < easize) {
334                 lmv->max_easize = easize;
335                 change = 1;
336         }
337         if (lmv->max_def_easize < def_easize) {
338                 lmv->max_def_easize = def_easize;
339                 change = 1;
340         }
341         if (lmv->max_cookiesize < cookiesize) {
342                 lmv->max_cookiesize = cookiesize;
343                 change = 1;
344         }
345         if (change == 0)
346                 RETURN(0);
347
348         if (lmv->connected == 0)
349                 RETURN(0);
350
351         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
352                 if (lmv->tgts[i].ltd_exp == NULL) {
353                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
354                         continue;
355                 }
356
357                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
358                                      cookiesize);
359                 if (rc) {
360                         CERROR("obd_init_ea_size() failed on MDT target %d, "
361                                "error %d.\n", i, rc);
362                         break;
363                 }
364         }
365         RETURN(rc);
366 }
367
368 #define MAX_STRING_SIZE 128
369
370 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
371 {
372 #ifdef __KERNEL__
373         struct proc_dir_entry   *lmv_proc_dir;
374 #endif
375         struct lmv_obd          *lmv = &obd->u.lmv;
376         struct obd_uuid         *cluuid = &lmv->cluuid;
377         struct obd_connect_data *mdc_data = NULL;
378         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
379         struct obd_device       *mdc_obd;
380         struct obd_export       *mdc_exp;
381         struct lu_fld_target     target;
382         int                      rc;
383         ENTRY;
384
385         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
386                                         &obd->obd_uuid);
387         if (!mdc_obd) {
388                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
389                 RETURN(-EINVAL);
390         }
391
392         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
393                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
394                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
395                 cluuid->uuid);
396
397         if (!mdc_obd->obd_set_up) {
398                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
399                 RETURN(-EINVAL);
400         }
401
402         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
403                          &lmv->conn_data, NULL);
404         if (rc) {
405                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
406                 RETURN(rc);
407         }
408
409         /*
410          * Init fid sequence client for this mdc and add new fld target.
411          */
412         rc = obd_fid_init(mdc_exp, LUSTRE_SEQ_METADATA);
413         if (rc)
414                 RETURN(rc);
415
416         target.ft_srv = NULL;
417         target.ft_exp = mdc_exp;
418         target.ft_idx = tgt->ltd_idx;
419
420         fld_client_add_target(&lmv->lmv_fld, &target);
421
422         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
423
424         rc = obd_register_observer(mdc_obd, obd);
425         if (rc) {
426                 obd_disconnect(mdc_exp);
427                 CERROR("target %s register_observer error %d\n",
428                        tgt->ltd_uuid.uuid, rc);
429                 RETURN(rc);
430         }
431
432         if (obd->obd_observer) {
433                 /*
434                  * Tell the observer about the new target.
435                  */
436                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
437                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
438                 if (rc) {
439                         obd_disconnect(mdc_exp);
440                         RETURN(rc);
441                 }
442         }
443
444         tgt->ltd_active = 1;
445         tgt->ltd_exp = mdc_exp;
446         lmv->desc.ld_active_tgt_count++;
447
448         /*
449          * Copy connect data, it may be used later.
450          */
451         lmv->datas[tgt->ltd_idx] = *mdc_data;
452
453         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
454                         lmv->max_def_easize, lmv->max_cookiesize);
455
456         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
457                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
458                 cfs_atomic_read(&obd->obd_refcount));
459
460 #ifdef __KERNEL__
461         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
462         if (lmv_proc_dir) {
463                 struct proc_dir_entry *mdc_symlink;
464
465                 LASSERT(mdc_obd->obd_type != NULL);
466                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
467                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
468                                                   lmv_proc_dir,
469                                                   "../../../%s/%s",
470                                                   mdc_obd->obd_type->typ_name,
471                                                   mdc_obd->obd_name);
472                 if (mdc_symlink == NULL) {
473                         CERROR("Could not register LMV target "
474                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
475                                obd->obd_type->typ_name, obd->obd_name,
476                                mdc_obd->obd_name);
477                         lprocfs_remove(&lmv_proc_dir);
478                         lmv_proc_dir = NULL;
479                 }
480         }
481 #endif
482         RETURN(0);
483 }
484
485 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
486 {
487         struct lmv_obd      *lmv = &obd->u.lmv;
488         struct lmv_tgt_desc *tgt;
489         int                  rc = 0;
490         ENTRY;
491
492         CDEBUG(D_CONFIG, "Target uuid: %s.\n", tgt_uuid->uuid);
493
494         lmv_init_lock(lmv);
495
496         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
497                 lmv_init_unlock(lmv);
498                 CERROR("Can't add %s, LMV module compiled for %d MDCs. "
499                        "That many MDCs already configured.\n",
500                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
501                 RETURN(-EINVAL);
502         }
503         if (lmv->desc.ld_tgt_count == 0) {
504                 struct obd_device *mdc_obd;
505
506                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
507                                                 &obd->obd_uuid);
508                 if (!mdc_obd) {
509                         lmv_init_unlock(lmv);
510                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
511                         RETURN(-EINVAL);
512                 }
513         }
514         spin_lock(&lmv->lmv_lock);
515         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
516         tgt->ltd_uuid = *tgt_uuid;
517         spin_unlock(&lmv->lmv_lock);
518
519         if (lmv->connected) {
520                 rc = lmv_connect_mdc(obd, tgt);
521                 if (rc) {
522                         spin_lock(&lmv->lmv_lock);
523                         lmv->desc.ld_tgt_count--;
524                         memset(tgt, 0, sizeof(*tgt));
525                         spin_unlock(&lmv->lmv_lock);
526                 } else {
527                         int easize = sizeof(struct lmv_stripe_md) +
528                                      lmv->desc.ld_tgt_count *
529                                      sizeof(struct lu_fid);
530                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
531                 }
532         }
533
534         lmv_init_unlock(lmv);
535         RETURN(rc);
536 }
537
538 int lmv_check_connect(struct obd_device *obd)
539 {
540         struct lmv_obd       *lmv = &obd->u.lmv;
541         struct lmv_tgt_desc  *tgt;
542         int                   i;
543         int                   rc;
544         int                   easize;
545         ENTRY;
546
547         if (lmv->connected)
548                 RETURN(0);
549
550         lmv_init_lock(lmv);
551         if (lmv->connected) {
552                 lmv_init_unlock(lmv);
553                 RETURN(0);
554         }
555
556         if (lmv->desc.ld_tgt_count == 0) {
557                 lmv_init_unlock(lmv);
558                 CERROR("%s: no targets configured.\n", obd->obd_name);
559                 RETURN(-EINVAL);
560         }
561
562         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
563                lmv->cluuid.uuid, obd->obd_name);
564
565         LASSERT(lmv->tgts != NULL);
566
567         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
568                 rc = lmv_connect_mdc(obd, tgt);
569                 if (rc)
570                         GOTO(out_disc, rc);
571         }
572
573         lmv_set_timeouts(obd);
574         class_export_put(lmv->exp);
575         lmv->connected = 1;
576         easize = lmv_get_easize(lmv);
577         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
578         lmv_init_unlock(lmv);
579         RETURN(0);
580
581  out_disc:
582         while (i-- > 0) {
583                 int rc2;
584                 --tgt;
585                 tgt->ltd_active = 0;
586                 if (tgt->ltd_exp) {
587                         --lmv->desc.ld_active_tgt_count;
588                         rc2 = obd_disconnect(tgt->ltd_exp);
589                         if (rc2) {
590                                 CERROR("LMV target %s disconnect on "
591                                        "MDC idx %d: error %d\n",
592                                        tgt->ltd_uuid.uuid, i, rc2);
593                         }
594                 }
595         }
596         class_disconnect(lmv->exp);
597         lmv_init_unlock(lmv);
598         RETURN(rc);
599 }
600
601 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
602 {
603 #ifdef __KERNEL__
604         struct proc_dir_entry  *lmv_proc_dir;
605 #endif
606         struct lmv_obd         *lmv = &obd->u.lmv;
607         struct obd_device      *mdc_obd;
608         int                     rc;
609         ENTRY;
610
611         LASSERT(tgt != NULL);
612         LASSERT(obd != NULL);
613
614         mdc_obd = class_exp2obd(tgt->ltd_exp);
615
616         if (mdc_obd) {
617                 mdc_obd->obd_force = obd->obd_force;
618                 mdc_obd->obd_fail = obd->obd_fail;
619                 mdc_obd->obd_no_recov = obd->obd_no_recov;
620         }
621
622 #ifdef __KERNEL__
623         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
624         if (lmv_proc_dir) {
625                 struct proc_dir_entry *mdc_symlink;
626
627                 mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
628                 if (mdc_symlink) {
629                         lprocfs_remove(&mdc_symlink);
630                 } else {
631                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
632                                obd->obd_type->typ_name, obd->obd_name,
633                                mdc_obd->obd_name);
634                 }
635         }
636 #endif
637         rc = obd_fid_fini(tgt->ltd_exp);
638         if (rc)
639                 CERROR("Can't finanize fids factory\n");
640
641         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
642                tgt->ltd_exp->exp_obd->obd_name,
643                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
644
645         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
646         rc = obd_disconnect(tgt->ltd_exp);
647         if (rc) {
648                 if (tgt->ltd_active) {
649                         CERROR("Target %s disconnect error %d\n",
650                                tgt->ltd_uuid.uuid, rc);
651                 }
652         }
653
654         lmv_activate_target(lmv, tgt, 0);
655         tgt->ltd_exp = NULL;
656         RETURN(0);
657 }
658
659 static int lmv_disconnect(struct obd_export *exp)
660 {
661         struct obd_device     *obd = class_exp2obd(exp);
662 #ifdef __KERNEL__
663         struct proc_dir_entry *lmv_proc_dir;
664 #endif
665         struct lmv_obd        *lmv = &obd->u.lmv;
666         int                    rc;
667         int                    i;
668         ENTRY;
669
670         if (!lmv->tgts)
671                 goto out_local;
672
673         /*
674          * Only disconnect the underlying layers on the final disconnect.
675          */
676         lmv->refcount--;
677         if (lmv->refcount != 0)
678                 goto out_local;
679
680         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
681                 if (lmv->tgts[i].ltd_exp == NULL)
682                         continue;
683                 lmv_disconnect_mdc(obd, &lmv->tgts[i]);
684         }
685
686 #ifdef __KERNEL__
687         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
688         if (lmv_proc_dir) {
689                 lprocfs_remove(&lmv_proc_dir);
690         } else {
691                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
692                        obd->obd_type->typ_name, obd->obd_name);
693         }
694 #endif
695
696 out_local:
697         /*
698          * This is the case when no real connection is established by
699          * lmv_check_connect().
700          */
701         if (!lmv->connected)
702                 class_export_put(exp);
703         rc = class_disconnect(exp);
704         if (lmv->refcount == 0)
705                 lmv->connected = 0;
706         RETURN(rc);
707 }
708
709 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
710                          int len, void *karg, void *uarg)
711 {
712         struct obd_device    *obddev = class_exp2obd(exp);
713         struct lmv_obd       *lmv = &obddev->u.lmv;
714         int                   i = 0;
715         int                   rc = 0;
716         int                   set = 0;
717         int                   count = lmv->desc.ld_tgt_count;
718         ENTRY;
719
720         if (count == 0)
721                 RETURN(-ENOTTY);
722
723         switch (cmd) {
724         case IOC_OBD_STATFS: {
725                 struct obd_ioctl_data *data = karg;
726                 struct obd_device *mdc_obd;
727                 struct obd_statfs stat_buf = {0};
728                 __u32 index;
729
730                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
731                 if ((index >= count))
732                         RETURN(-ENODEV);
733
734                 if (!lmv->tgts[index].ltd_active)
735                         RETURN(-ENODATA);
736
737                 mdc_obd = class_exp2obd(lmv->tgts[index].ltd_exp);
738                 if (!mdc_obd)
739                         RETURN(-EINVAL);
740
741                 /* copy UUID */
742                 if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
743                                      min((int) data->ioc_plen2,
744                                          (int) sizeof(struct obd_uuid))))
745                         RETURN(-EFAULT);
746
747                 rc = obd_statfs(NULL, lmv->tgts[index].ltd_exp, &stat_buf,
748                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
749                                 0);
750                 if (rc)
751                         RETURN(rc);
752                 if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
753                                      min((int) data->ioc_plen1,
754                                          (int) sizeof(stat_buf))))
755                         RETURN(-EFAULT);
756                 break;
757         }
758         case OBD_IOC_QUOTACTL: {
759                 struct if_quotactl *qctl = karg;
760                 struct lmv_tgt_desc *tgt = NULL;
761                 struct obd_quotactl *oqctl;
762
763                 if (qctl->qc_valid == QC_MDTIDX) {
764                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
765                                 RETURN(-EINVAL);
766
767                         tgt = &lmv->tgts[qctl->qc_idx];
768                         if (!tgt->ltd_exp)
769                                 RETURN(-EINVAL);
770                 } else if (qctl->qc_valid == QC_UUID) {
771                         for (i = 0; i < count; i++) {
772                                 tgt = &lmv->tgts[i];
773                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
774                                                      &qctl->obd_uuid))
775                                         continue;
776
777                                 if (tgt->ltd_exp == NULL)
778                                         RETURN(-EINVAL);
779
780                                 break;
781                         }
782                 } else {
783                         RETURN(-EINVAL);
784                 }
785
786                 if (i >= count)
787                         RETURN(-EAGAIN);
788
789                 LASSERT(tgt && tgt->ltd_exp);
790                 OBD_ALLOC_PTR(oqctl);
791                 if (!oqctl)
792                         RETURN(-ENOMEM);
793
794                 QCTL_COPY(oqctl, qctl);
795                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
796                 if (rc == 0) {
797                         QCTL_COPY(qctl, oqctl);
798                         qctl->qc_valid = QC_MDTIDX;
799                         qctl->obd_uuid = tgt->ltd_uuid;
800                 }
801                 OBD_FREE_PTR(oqctl);
802                 break;
803         }
804         case OBD_IOC_CHANGELOG_SEND:
805         case OBD_IOC_CHANGELOG_CLEAR: {
806                 struct ioc_changelog *icc = karg;
807
808                 if (icc->icc_mdtindex >= count)
809                         RETURN(-ENODEV);
810
811                 rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp,
812                                    sizeof(*icc), icc, NULL);
813                 break;
814         }
815         case LL_IOC_GET_CONNECT_FLAGS: {
816                 rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg);
817                 break;
818         }
819
820         default : {
821                 for (i = 0; i < count; i++) {
822                         int err;
823                         struct obd_device *mdc_obd;
824
825                         if (lmv->tgts[i].ltd_exp == NULL)
826                                 continue;
827                         /* ll_umount_begin() sets force flag but for lmv, not
828                          * mdc. Let's pass it through */
829                         mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
830                         mdc_obd->obd_force = obddev->obd_force;
831                         err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
832                                             karg, uarg);
833                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
834                                 RETURN(err);
835                         } else if (err) {
836                                 if (lmv->tgts[i].ltd_active) {
837                                         CERROR("error: iocontrol MDC %s on MDT"
838                                                "idx %d cmd %x: err = %d\n",
839                                                 lmv->tgts[i].ltd_uuid.uuid,
840                                                 i, cmd, err);
841                                         if (!rc)
842                                                 rc = err;
843                                 }
844                         } else
845                                 set = 1;
846                 }
847                 if (!set && !rc)
848                         rc = -EIO;
849         }
850         }
851         RETURN(rc);
852 }
853
854 #if 0
/*
 * Name-based placement: add up the first @len characters of @name and
 * reduce the total modulo @count to obtain a target index.
 */
static int lmv_all_chars_policy(int count, const char *name,
                                int len)
{
        unsigned int sum = 0;
        int          pos;

        /* unsigned addition wraps, so the visiting order does not matter */
        for (pos = 0; pos < len; pos++)
                sum += name[pos];

        return sum % count;
}
865
866 static int lmv_nid_policy(struct lmv_obd *lmv)
867 {
868         struct obd_import *imp;
869         __u32              id;
870
871         /*
872          * XXX: To get nid we assume that underlying obd device is mdc.
873          */
874         imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
875         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
876         return id % lmv->desc.ld_tgt_count;
877 }
878
879 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
880                           placement_policy_t placement)
881 {
882         switch (placement) {
883         case PLACEMENT_CHAR_POLICY:
884                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
885                                             op_data->op_name,
886                                             op_data->op_namelen);
887         case PLACEMENT_NID_POLICY:
888                 return lmv_nid_policy(lmv);
889
890         default:
891                 break;
892         }
893
894         CERROR("Unsupported placement policy %x\n", placement);
895         return -EINVAL;
896 }
897 #endif
898
899 /**
900  * This is _inode_ placement policy function (not name).
901  */
902 static int lmv_placement_policy(struct obd_device *obd,
903                                 struct md_op_data *op_data,
904                                 mdsno_t *mds)
905 {
906         LASSERT(mds != NULL);
907
908         /* Allocate new fid on target according to to different
909          * QOS policy. In DNE phase I, llite should always tell
910          * which MDT where the dir will be located */
911         *mds = op_data->op_mds;
912
913         RETURN(0);
914 }
915
916 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
917                     mdsno_t mds)
918 {
919         struct lmv_tgt_desc *tgt;
920         int                  rc;
921         ENTRY;
922
923         tgt = lmv_get_target(lmv, mds);
924
925         /*
926          * New seq alloc and FLD setup should be atomic. Otherwise we may find
927          * on server that seq in new allocated fid is not yet known.
928          */
929         mutex_lock(&tgt->ltd_fid_mutex);
930
931         if (!tgt->ltd_active)
932                 GOTO(out, rc = -ENODEV);
933
934         /*
935          * Asking underlaying tgt layer to allocate new fid.
936          */
937         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
938         if (rc > 0) {
939                 LASSERT(fid_is_sane(fid));
940                 rc = 0;
941         }
942
943         EXIT;
944 out:
945         mutex_unlock(&tgt->ltd_fid_mutex);
946         return rc;
947 }
948
949 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
950                   struct md_op_data *op_data)
951 {
952         struct obd_device     *obd = class_exp2obd(exp);
953         struct lmv_obd        *lmv = &obd->u.lmv;
954         mdsno_t                mds = 0;
955         int                    rc;
956         ENTRY;
957
958         LASSERT(op_data != NULL);
959         LASSERT(fid != NULL);
960
961         rc = lmv_placement_policy(obd, op_data, &mds);
962         if (rc) {
963                 CERROR("Can't get target for allocating fid, "
964                        "rc %d\n", rc);
965                 RETURN(rc);
966         }
967
968         rc = __lmv_fid_alloc(lmv, fid, mds);
969         if (rc) {
970                 CERROR("Can't alloc new fid, rc %d\n", rc);
971                 RETURN(rc);
972         }
973
974         RETURN(rc);
975 }
976
977 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
978 {
979         struct lmv_obd             *lmv = &obd->u.lmv;
980         struct lprocfs_static_vars  lvars;
981         struct lmv_desc            *desc;
982         int                         rc;
983         int                         i = 0;
984         ENTRY;
985
986         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
987                 CERROR("LMV setup requires a descriptor\n");
988                 RETURN(-EINVAL);
989         }
990
991         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
992         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
993                 CERROR("Lmv descriptor size wrong: %d > %d\n",
994                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
995                 RETURN(-EINVAL);
996         }
997
998         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
999
1000         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
1001         if (lmv->tgts == NULL)
1002                 RETURN(-ENOMEM);
1003
1004         for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
1005                 mutex_init(&lmv->tgts[i].ltd_fid_mutex);
1006                 lmv->tgts[i].ltd_idx = i;
1007         }
1008
1009         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
1010
1011         OBD_ALLOC(lmv->datas, lmv->datas_size);
1012         if (lmv->datas == NULL)
1013                 GOTO(out_free_tgts, rc = -ENOMEM);
1014
1015         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1016         lmv->desc.ld_tgt_count = 0;
1017         lmv->desc.ld_active_tgt_count = 0;
1018         lmv->max_cookiesize = 0;
1019         lmv->max_def_easize = 0;
1020         lmv->max_easize = 0;
1021         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1022
1023         spin_lock_init(&lmv->lmv_lock);
1024         mutex_init(&lmv->init_mutex);
1025
1026         lprocfs_lmv_init_vars(&lvars);
1027         lprocfs_obd_setup(obd, lvars.obd_vars);
1028 #ifdef LPROCFS
1029         {
1030                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1031                                         0444, &lmv_proc_target_fops, obd);
1032                 if (rc)
1033                         CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1034                                obd->obd_name, rc);
1035        }
1036 #endif
1037         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1038                              LUSTRE_CLI_FLD_HASH_DHT);
1039         if (rc) {
1040                 CERROR("Can't init FLD, err %d\n", rc);
1041                 GOTO(out_free_datas, rc);
1042         }
1043
1044         RETURN(0);
1045
1046 out_free_datas:
1047         OBD_FREE(lmv->datas, lmv->datas_size);
1048         lmv->datas = NULL;
1049 out_free_tgts:
1050         OBD_FREE(lmv->tgts, lmv->tgts_size);
1051         lmv->tgts = NULL;
1052         return rc;
1053 }
1054
1055 static int lmv_cleanup(struct obd_device *obd)
1056 {
1057         struct lmv_obd   *lmv = &obd->u.lmv;
1058         ENTRY;
1059
1060         fld_client_fini(&lmv->lmv_fld);
1061         OBD_FREE(lmv->datas, lmv->datas_size);
1062         OBD_FREE(lmv->tgts, lmv->tgts_size);
1063
1064         RETURN(0);
1065 }
1066
1067 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1068 {
1069         struct lustre_cfg     *lcfg = buf;
1070         struct obd_uuid        tgt_uuid;
1071         int                    rc;
1072         ENTRY;
1073
1074         switch(lcfg->lcfg_command) {
1075         case LCFG_ADD_MDC:
1076                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
1077                         GOTO(out, rc = -EINVAL);
1078
1079                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
1080                 rc = lmv_add_target(obd, &tgt_uuid);
1081                 GOTO(out, rc);
1082         default: {
1083                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1084                 GOTO(out, rc = -EINVAL);
1085         }
1086         }
1087 out:
1088         RETURN(rc);
1089 }
1090
1091 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1092                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1093 {
1094         struct obd_device     *obd = class_exp2obd(exp);
1095         struct lmv_obd        *lmv = &obd->u.lmv;
1096         struct obd_statfs     *temp;
1097         int                    rc = 0;
1098         int                    i;
1099         ENTRY;
1100
1101         rc = lmv_check_connect(obd);
1102         if (rc)
1103                 RETURN(rc);
1104
1105         OBD_ALLOC(temp, sizeof(*temp));
1106         if (temp == NULL)
1107                 RETURN(-ENOMEM);
1108
1109         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1110                 if (lmv->tgts[i].ltd_exp == NULL)
1111                         continue;
1112
1113                 rc = obd_statfs(env, lmv->tgts[i].ltd_exp, temp,
1114                                 max_age, flags);
1115                 if (rc) {
1116                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1117                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
1118                                rc);
1119                         GOTO(out_free_temp, rc);
1120                 }
1121                 if (i == 0) {
1122                         *osfs = *temp;
1123                 } else {
1124                         osfs->os_bavail += temp->os_bavail;
1125                         osfs->os_blocks += temp->os_blocks;
1126                         osfs->os_ffree += temp->os_ffree;
1127                         osfs->os_files += temp->os_files;
1128                 }
1129         }
1130
1131         EXIT;
1132 out_free_temp:
1133         OBD_FREE(temp, sizeof(*temp));
1134         return rc;
1135 }
1136
1137 static int lmv_getstatus(struct obd_export *exp,
1138                          struct lu_fid *fid,
1139                          struct obd_capa **pc)
1140 {
1141         struct obd_device    *obd = exp->exp_obd;
1142         struct lmv_obd       *lmv = &obd->u.lmv;
1143         int                   rc;
1144         ENTRY;
1145
1146         rc = lmv_check_connect(obd);
1147         if (rc)
1148                 RETURN(rc);
1149
1150         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
1151         RETURN(rc);
1152 }
1153
1154 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1155                         struct obd_capa *oc, obd_valid valid, const char *name,
1156                         const char *input, int input_size, int output_size,
1157                         int flags, struct ptlrpc_request **request)
1158 {
1159         struct obd_device      *obd = exp->exp_obd;
1160         struct lmv_obd         *lmv = &obd->u.lmv;
1161         struct lmv_tgt_desc    *tgt;
1162         int                     rc;
1163         ENTRY;
1164
1165         rc = lmv_check_connect(obd);
1166         if (rc)
1167                 RETURN(rc);
1168
1169         tgt = lmv_find_target(lmv, fid);
1170         if (IS_ERR(tgt))
1171                 RETURN(PTR_ERR(tgt));
1172
1173         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1174                          input_size, output_size, flags, request);
1175
1176         RETURN(rc);
1177 }
1178
1179 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1180                         struct obd_capa *oc, obd_valid valid, const char *name,
1181                         const char *input, int input_size, int output_size,
1182                         int flags, __u32 suppgid,
1183                         struct ptlrpc_request **request)
1184 {
1185         struct obd_device      *obd = exp->exp_obd;
1186         struct lmv_obd         *lmv = &obd->u.lmv;
1187         struct lmv_tgt_desc    *tgt;
1188         int                     rc;
1189         ENTRY;
1190
1191         rc = lmv_check_connect(obd);
1192         if (rc)
1193                 RETURN(rc);
1194
1195         tgt = lmv_find_target(lmv, fid);
1196         if (IS_ERR(tgt))
1197                 RETURN(PTR_ERR(tgt));
1198
1199         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1200                          input_size, output_size, flags, suppgid,
1201                          request);
1202
1203         RETURN(rc);
1204 }
1205
1206 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1207                        struct ptlrpc_request **request)
1208 {
1209         struct obd_device       *obd = exp->exp_obd;
1210         struct lmv_obd          *lmv = &obd->u.lmv;
1211         struct lmv_tgt_desc     *tgt;
1212         int                      rc;
1213         ENTRY;
1214
1215         rc = lmv_check_connect(obd);
1216         if (rc)
1217                 RETURN(rc);
1218
1219         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1220         if (IS_ERR(tgt))
1221                 RETURN(PTR_ERR(tgt));
1222
1223         if (op_data->op_flags & MF_GET_MDT_IDX) {
1224                 op_data->op_mds = tgt->ltd_idx;
1225                 RETURN(0);
1226         }
1227
1228         rc = md_getattr(tgt->ltd_exp, op_data, request);
1229
1230         RETURN(rc);
1231 }
1232
1233 static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1234                              ldlm_iterator_t it, void *data)
1235 {
1236         struct obd_device   *obd = exp->exp_obd;
1237         struct lmv_obd      *lmv = &obd->u.lmv;
1238         int                  i;
1239         int                  rc;
1240         ENTRY;
1241
1242         rc = lmv_check_connect(obd);
1243         if (rc)
1244                 RETURN(rc);
1245
1246         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1247
1248         /*
1249          * With CMD every object can have two locks in different namespaces:
1250          * lookup lock in space of mds storing direntry and update/open lock in
1251          * space of mds storing inode.
1252          */
1253         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1254                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1255
1256         RETURN(0);
1257 }
1258
1259 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1260                            ldlm_iterator_t it, void *data)
1261 {
1262         struct obd_device   *obd = exp->exp_obd;
1263         struct lmv_obd      *lmv = &obd->u.lmv;
1264         int                  i;
1265         int                  rc;
1266         ENTRY;
1267
1268         rc = lmv_check_connect(obd);
1269         if (rc)
1270                 RETURN(rc);
1271
1272         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1273
1274         /*
1275          * With CMD every object can have two locks in different namespaces:
1276          * lookup lock in space of mds storing direntry and update/open lock in
1277          * space of mds storing inode.
1278          */
1279         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1280                 rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1281                 if (rc)
1282                         RETURN(rc);
1283         }
1284
1285         RETURN(rc);
1286 }
1287
1288
1289 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1290                      struct md_open_data *mod, struct ptlrpc_request **request)
1291 {
1292         struct obd_device     *obd = exp->exp_obd;
1293         struct lmv_obd        *lmv = &obd->u.lmv;
1294         struct lmv_tgt_desc   *tgt;
1295         int                    rc;
1296         ENTRY;
1297
1298         rc = lmv_check_connect(obd);
1299         if (rc)
1300                 RETURN(rc);
1301
1302         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1303         if (IS_ERR(tgt))
1304                 RETURN(PTR_ERR(tgt));
1305
1306         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1307         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1308         RETURN(rc);
1309 }
1310
1311 struct lmv_tgt_desc
1312 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1313                 struct lu_fid *fid)
1314 {
1315         struct lmv_tgt_desc *tgt;
1316
1317         tgt = lmv_find_target(lmv, fid);
1318         op_data->op_mds = tgt->ltd_idx;
1319
1320         return tgt;
1321 }
1322
1323 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1324                const void *data, int datalen, int mode, __u32 uid,
1325                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1326                struct ptlrpc_request **request)
1327 {
1328         struct obd_device       *obd = exp->exp_obd;
1329         struct lmv_obd          *lmv = &obd->u.lmv;
1330         struct lmv_tgt_desc     *tgt;
1331         int                      rc;
1332         ENTRY;
1333
1334         rc = lmv_check_connect(obd);
1335         if (rc)
1336                 RETURN(rc);
1337
1338         if (!lmv->desc.ld_active_tgt_count)
1339                 RETURN(-EIO);
1340
1341         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1342         if (IS_ERR(tgt))
1343                 RETURN(PTR_ERR(tgt));
1344
1345         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1346         if (rc)
1347                 RETURN(rc);
1348
1349         CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1350                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1351                op_data->op_mds);
1352
1353         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1354         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1355                        cap_effective, rdev, request);
1356
1357         if (rc == 0) {
1358                 if (*request == NULL)
1359                         RETURN(rc);
1360                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1361         }
1362         RETURN(rc);
1363 }
1364
1365 static int lmv_done_writing(struct obd_export *exp,
1366                             struct md_op_data *op_data,
1367                             struct md_open_data *mod)
1368 {
1369         struct obd_device     *obd = exp->exp_obd;
1370         struct lmv_obd        *lmv = &obd->u.lmv;
1371         struct lmv_tgt_desc   *tgt;
1372         int                    rc;
1373         ENTRY;
1374
1375         rc = lmv_check_connect(obd);
1376         if (rc)
1377                 RETURN(rc);
1378
1379         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1380         if (IS_ERR(tgt))
1381                 RETURN(PTR_ERR(tgt));
1382
1383         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1384         RETURN(rc);
1385 }
1386
1387 static int
1388 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1389                    struct lookup_intent *it, struct md_op_data *op_data,
1390                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1391                    int extra_lock_flags)
1392 {
1393         struct ptlrpc_request      *req = it->d.lustre.it_data;
1394         struct obd_device          *obd = exp->exp_obd;
1395         struct lmv_obd             *lmv = &obd->u.lmv;
1396         struct lustre_handle        plock;
1397         struct lmv_tgt_desc        *tgt;
1398         struct md_op_data          *rdata;
1399         struct lu_fid               fid1;
1400         struct mdt_body            *body;
1401         int                         rc = 0;
1402         int                         pmode;
1403         ENTRY;
1404
1405         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1406         LASSERT(body != NULL);
1407
1408         if (!(body->valid & OBD_MD_MDS))
1409                 RETURN(0);
1410
1411         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1412                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1413
1414         /*
1415          * We got LOOKUP lock, but we really need attrs.
1416          */
1417         pmode = it->d.lustre.it_lock_mode;
1418         LASSERT(pmode != 0);
1419         memcpy(&plock, lockh, sizeof(plock));
1420         it->d.lustre.it_lock_mode = 0;
1421         it->d.lustre.it_data = NULL;
1422         fid1 = body->fid1;
1423
1424         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1425         ptlrpc_req_finished(req);
1426
1427         tgt = lmv_find_target(lmv, &fid1);
1428         if (IS_ERR(tgt))
1429                 GOTO(out, rc = PTR_ERR(tgt));
1430
1431         OBD_ALLOC_PTR(rdata);
1432         if (rdata == NULL)
1433                 GOTO(out, rc = -ENOMEM);
1434
1435         rdata->op_fid1 = fid1;
1436         rdata->op_bias = MDS_CROSS_REF;
1437
1438         rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1439                         lmm, lmmsize, NULL, extra_lock_flags);
1440         OBD_FREE_PTR(rdata);
1441         EXIT;
1442 out:
1443         ldlm_lock_decref(&plock, pmode);
1444         return rc;
1445 }
1446
1447 static int
1448 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1449             struct lookup_intent *it, struct md_op_data *op_data,
1450             struct lustre_handle *lockh, void *lmm, int lmmsize,
1451             struct ptlrpc_request **req, __u64 extra_lock_flags)
1452 {
1453         struct obd_device        *obd = exp->exp_obd;
1454         struct lmv_obd           *lmv = &obd->u.lmv;
1455         struct lmv_tgt_desc      *tgt;
1456         int                       rc;
1457         ENTRY;
1458
1459         rc = lmv_check_connect(obd);
1460         if (rc)
1461                 RETURN(rc);
1462
1463         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1464                LL_IT2STR(it), PFID(&op_data->op_fid1));
1465
1466         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1467         if (IS_ERR(tgt))
1468                 RETURN(PTR_ERR(tgt));
1469
1470         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1471                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1472
1473         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1474                         lmm, lmmsize, req, extra_lock_flags);
1475
1476         if (rc == 0 && it && it->it_op == IT_OPEN) {
1477                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1478                                         lmm, lmmsize, extra_lock_flags);
1479         }
1480         RETURN(rc);
1481 }
1482
1483 static int
1484 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1485                  struct ptlrpc_request **request)
1486 {
1487         struct ptlrpc_request   *req = NULL;
1488         struct obd_device       *obd = exp->exp_obd;
1489         struct lmv_obd          *lmv = &obd->u.lmv;
1490         struct lmv_tgt_desc     *tgt;
1491         struct mdt_body         *body;
1492         int                      rc;
1493         ENTRY;
1494
1495         rc = lmv_check_connect(obd);
1496         if (rc)
1497                 RETURN(rc);
1498
1499         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1500         if (IS_ERR(tgt))
1501                 RETURN(PTR_ERR(tgt));
1502
1503         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1504                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1505                tgt->ltd_idx);
1506
1507         rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1508         if (rc != 0)
1509                 RETURN(rc);
1510
1511         body = req_capsule_server_get(&(*request)->rq_pill,
1512                                       &RMF_MDT_BODY);
1513         LASSERT(body != NULL);
1514
1515         if (body->valid & OBD_MD_MDS) {
1516                 struct lu_fid rid = body->fid1;
1517                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1518                        PFID(&rid));
1519
1520                 tgt = lmv_find_target(lmv, &rid);
1521                 if (IS_ERR(tgt)) {
1522                         ptlrpc_req_finished(*request);
1523                         RETURN(PTR_ERR(tgt));
1524                 }
1525
1526                 op_data->op_fid1 = rid;
1527                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1528                 op_data->op_namelen = 0;
1529                 op_data->op_name = NULL;
1530                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1531                 ptlrpc_req_finished(*request);
1532                 *request = req;
1533         }
1534
1535         RETURN(rc);
1536 }
1537
/*
 * Map one MF_MDC_CANCEL_FIDn flag to the address of the matching op_fidN
 * member of @op_data; any other value of @fl yields NULL.
 * Both arguments are parenthesized so that compound expressions can be
 * passed safely.  @fl may be evaluated up to four times.
 */
#define md_op_data_fid(op_data, fl)                         \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 : \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 : \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 : \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 : \
         NULL)
1544
1545 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1546                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
1547 {
1548         struct lu_fid          *fid = md_op_data_fid(op_data, flag);
1549         struct obd_device      *obd = exp->exp_obd;
1550         struct lmv_obd         *lmv = &obd->u.lmv;
1551         struct lmv_tgt_desc    *tgt;
1552         ldlm_policy_data_t      policy = {{0}};
1553         int                     rc = 0;
1554         ENTRY;
1555
1556         if (!fid_is_sane(fid))
1557                 RETURN(0);
1558
1559         tgt = lmv_find_target(lmv, fid);
1560         if (IS_ERR(tgt))
1561                 RETURN(PTR_ERR(tgt));
1562
1563         if (tgt->ltd_idx != op_tgt) {
1564                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1565                 policy.l_inodebits.bits = bits;
1566                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1567                                       mode, LCF_ASYNC, NULL);
1568         } else {
1569                 CDEBUG(D_INODE,
1570                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1571                        op_tgt, PFID(fid));
1572                 op_data->op_flags |= flag;
1573                 rc = 0;
1574         }
1575
1576         RETURN(rc);
1577 }
1578
1579 /*
1580  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1581  * op_data->op_fid2
1582  */
1583 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1584                     struct ptlrpc_request **request)
1585 {
1586         struct obd_device       *obd = exp->exp_obd;
1587         struct lmv_obd          *lmv = &obd->u.lmv;
1588         struct lmv_tgt_desc     *tgt;
1589         int                      rc;
1590         ENTRY;
1591
1592         rc = lmv_check_connect(obd);
1593         if (rc)
1594                 RETURN(rc);
1595
1596         LASSERT(op_data->op_namelen != 0);
1597
1598         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1599                PFID(&op_data->op_fid2), op_data->op_namelen,
1600                op_data->op_name, PFID(&op_data->op_fid1));
1601
1602         op_data->op_fsuid = cfs_curproc_fsuid();
1603         op_data->op_fsgid = cfs_curproc_fsgid();
1604         op_data->op_cap = cfs_curproc_cap_pack();
1605         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1606         if (IS_ERR(tgt))
1607                 RETURN(PTR_ERR(tgt));
1608
1609         /*
1610          * Cancel UPDATE lock on child (fid1).
1611          */
1612         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1613         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1614                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1615         if (rc != 0)
1616                 RETURN(rc);
1617
1618         rc = md_link(tgt->ltd_exp, op_data, request);
1619
1620         RETURN(rc);
1621 }
1622
1623 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1624                       const char *old, int oldlen, const char *new, int newlen,
1625                       struct ptlrpc_request **request)
1626 {
1627         struct obd_device       *obd = exp->exp_obd;
1628         struct lmv_obd          *lmv = &obd->u.lmv;
1629         struct lmv_tgt_desc     *src_tgt;
1630         struct lmv_tgt_desc     *tgt_tgt;
1631         int                     rc;
1632         ENTRY;
1633
1634         LASSERT(oldlen != 0);
1635
1636         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1637                oldlen, old, PFID(&op_data->op_fid1),
1638                newlen, new, PFID(&op_data->op_fid2));
1639
1640         rc = lmv_check_connect(obd);
1641         if (rc)
1642                 RETURN(rc);
1643
1644         op_data->op_fsuid = cfs_curproc_fsuid();
1645         op_data->op_fsgid = cfs_curproc_fsgid();
1646         op_data->op_cap = cfs_curproc_cap_pack();
1647         src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1648         if (IS_ERR(src_tgt))
1649                 RETURN(PTR_ERR(src_tgt));
1650
1651         tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1652         if (IS_ERR(tgt_tgt))
1653                 RETURN(PTR_ERR(tgt_tgt));
1654         /*
1655          * LOOKUP lock on src child (fid3) should also be cancelled for
1656          * src_tgt in mdc_rename.
1657          */
1658         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1659
1660         /*
1661          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1662          * own target.
1663          */
1664         rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1665                               LCK_EX, MDS_INODELOCK_UPDATE,
1666                               MF_MDC_CANCEL_FID2);
1667
1668         /*
1669          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1670          */
1671         if (rc == 0) {
1672                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1673                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1674                                       MF_MDC_CANCEL_FID4);
1675         }
1676
1677         /*
1678          * Cancel all the locks on tgt child (fid4).
1679          */
1680         if (rc == 0)
1681                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1682                                       LCK_EX, MDS_INODELOCK_FULL,
1683                                       MF_MDC_CANCEL_FID4);
1684
1685         if (rc == 0)
1686                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
1687                                new, newlen, request);
1688         RETURN(rc);
1689 }
1690
1691 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1692                        void *ea, int ealen, void *ea2, int ea2len,
1693                        struct ptlrpc_request **request,
1694                        struct md_open_data **mod)
1695 {
1696         struct obd_device       *obd = exp->exp_obd;
1697         struct lmv_obd          *lmv = &obd->u.lmv;
1698         struct lmv_tgt_desc     *tgt;
1699         int                      rc = 0;
1700         ENTRY;
1701
1702         rc = lmv_check_connect(obd);
1703         if (rc)
1704                 RETURN(rc);
1705
1706         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
1707                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
1708
1709         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1710         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1711         if (IS_ERR(tgt))
1712                 RETURN(PTR_ERR(tgt));
1713
1714         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
1715                         ea2len, request, mod);
1716
1717         RETURN(rc);
1718 }
1719
1720 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1721                     struct obd_capa *oc, struct ptlrpc_request **request)
1722 {
1723         struct obd_device         *obd = exp->exp_obd;
1724         struct lmv_obd            *lmv = &obd->u.lmv;
1725         struct lmv_tgt_desc       *tgt;
1726         int                        rc;
1727         ENTRY;
1728
1729         rc = lmv_check_connect(obd);
1730         if (rc)
1731                 RETURN(rc);
1732
1733         tgt = lmv_find_target(lmv, fid);
1734         if (IS_ERR(tgt))
1735                 RETURN(PTR_ERR(tgt));
1736
1737         rc = md_sync(tgt->ltd_exp, fid, oc, request);
1738         RETURN(rc);
1739 }
1740
#if 0
/*
 * Disabled helpers for rank-based readdir hash adjustment; nothing in the
 * compiled code calls them.
 */

/**
 * Subtract \a hash_adj from the little-endian hash stored at \a hash,
 * wrapping by MAX_HASH_SIZE when the stored value is smaller than the
 * adjustment.  The stored hash is left untouched when the (possibly
 * wrapped) value equals MDS_DIR_END_OFF.
 */
static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
{
        __u64         cpu_hash = le64_to_cpu(*hash);

        if (cpu_hash < hash_adj)
                cpu_hash += MAX_HASH_SIZE;

        if (cpu_hash == MDS_DIR_END_OFF)
                return;

        *hash = cpu_to_le64(cpu_hash - hash_adj);
}

/**
 * Derive a rank from the import's c_self value and the flattened \a fid:
 * the two are summed and the upper 32 bits are XOR-folded into the lower
 * ones.  The result is truncated to __u32 on return.
 */
static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
{
        struct obd_import *import;
        __u64              seed;
        __u64              folded;

        /*
         * XXX: to get nid we assume that underlying obd device is mdc.
         */
        import = class_exp2cliimp(exp);
        seed   = import->imp_connection->c_self + fid_flatten(fid);
        folded = seed ^ (seed >> 32);

        CDEBUG(D_INODE, "Readpage node rank: "LPX64" "DFID" "LPX64" "LPX64"\n",
               import->imp_connection->c_self, PFID(fid), seed, folded);

        return folded;
}
#endif
1770
1771 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
1772                         struct page **pages, struct ptlrpc_request **request)
1773 {
1774         struct obd_device       *obd = exp->exp_obd;
1775         struct lmv_obd          *lmv = &obd->u.lmv;
1776         __u64                    offset = op_data->op_offset;
1777         int                      rc;
1778         int                      i;
1779         /* number of pages read, in CFS_PAGE_SIZE */
1780         int                      nrdpgs;
1781         /* number of pages transferred in LU_PAGE_SIZE */
1782         int                      nlupgs;
1783         struct lmv_tgt_desc     *tgt;
1784         struct lu_dirpage       *dp;
1785         struct lu_dirent        *ent;
1786         ENTRY;
1787
1788         rc = lmv_check_connect(obd);
1789         if (rc)
1790                 RETURN(rc);
1791
1792         CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
1793                offset, PFID(&op_data->op_fid1));
1794
1795         /*
1796          * This case handle directory lookup in clustered metadata case (i.e.
1797          * split directory is located on multiple md servers.)
1798          * each server keeps directory entries for certain range of hashes.
1799          * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
1800          * first server will keep records with hashes [ 0 ... MAX_HASH /N  - 1],
1801          * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
1802          * so on....
1803          *      readdir can simply start reading entries from 0 - N server in
1804          * order but that will not scale well as all client will request dir in
1805          * to server in same order.
1806          * Following algorithm does optimization:
1807          * Instead of doing readdir in 1, 2, ...., N order, client with a
1808          * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
1809          * (every client has rank R)
1810          *      But ll_readdir() expect offset range [0 to MAX_HASH/N) but
1811          * since client ask dir from MDS{R} client has pages with offsets
1812          * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
1813          * on hash  values that we get.
1814         if (0) {
1815                 LASSERT(nr > 0);
1816                 seg_size = MAX_HASH_SIZE;
1817                 do_div(seg_size, nr);
1818                 los      = obj->lo_stripes;
1819                 tgt      = lmv_get_target(lmv, los[0].ls_mds);
1820                 rank     = lmv_node_rank(tgt->ltd_exp, fid) % nr;
1821                 tgt_tmp  = offset;
1822                 do_div(tgt_tmp, seg_size);
1823                 tgt0_idx = do_div(tgt_tmp,  nr);
1824                 tgt_idx  = (tgt0_idx + rank) % nr;
1825
1826                 if (tgt_idx < tgt0_idx)
1827                          * Wrap around.
1828                          *
1829                          * Last segment has unusual length due to division
1830                          * rounding.
1831                         hash_adj = MAX_HASH_SIZE - seg_size * nr;
1832                 else
1833                         hash_adj = 0;
1834
1835                 hash_adj += rank * seg_size;
1836
1837                 CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
1838                        LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
1839                        offset, tgt0_idx, offset + hash_adj, tgt_idx);
1840
1841                 offset = (offset + hash_adj) & MAX_HASH_SIZE;
1842                 rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
1843                 tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
1844
1845                 CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
1846                        PFID(&rid), (unsigned long)offset, tgt_idx);
1847         }
1848         */
1849         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1850         if (IS_ERR(tgt))
1851                 RETURN(PTR_ERR(tgt));
1852
1853         rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
1854         if (rc != 0)
1855                 RETURN(rc);
1856
1857         nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
1858                  >> CFS_PAGE_SHIFT;
1859         nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
1860         LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
1861         LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
1862
1863         CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
1864                op_data->op_npages);
1865
1866         for (i = 0; i < nrdpgs; i++) {
1867 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
1868                 struct lu_dirpage *first;
1869                 __u64 hash_end = 0;
1870                 __u32 flags = 0;
1871 #endif
1872                 struct lu_dirent *tmp = NULL;
1873
1874                 dp = cfs_kmap(pages[i]);
1875                 ent = lu_dirent_start(dp);
1876 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
1877                 first = dp;
1878                 hash_end = dp->ldp_hash_end;
1879 repeat:
1880 #endif
1881                 nlupgs--;
1882
1883                 for (tmp = ent; ent != NULL;
1884                      tmp = ent, ent = lu_dirent_next(ent));
1885 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
1886                 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
1887                 if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
1888                         ent = lu_dirent_start(dp);
1889
1890                         if (tmp) {
1891                                 /* enlarge the end entry lde_reclen from 0 to
1892                                  * first entry of next lu_dirpage, in this way
1893                                  * several lu_dirpages can be stored into one
1894                                  * client page on client. */
1895                                 tmp = ((void *)tmp) +
1896                                       le16_to_cpu(tmp->lde_reclen);
1897                                 tmp->lde_reclen =
1898                                         cpu_to_le16((char *)(dp->ldp_entries) -
1899                                                     (char *)tmp);
1900                                 goto repeat;
1901                         }
1902                 }
1903                 first->ldp_hash_end = hash_end;
1904                 first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
1905                 first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
1906 #else
1907                 SET_BUT_UNUSED(tmp);
1908 #endif
1909                 cfs_kunmap(pages[i]);
1910         }
1911         RETURN(rc);
1912 }
1913
1914 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
1915                       struct ptlrpc_request **request)
1916 {
1917         struct obd_device       *obd = exp->exp_obd;
1918         struct lmv_obd          *lmv = &obd->u.lmv;
1919         struct lmv_tgt_desc     *tgt = NULL;
1920         int                      rc;
1921         ENTRY;
1922
1923         rc = lmv_check_connect(obd);
1924         if (rc)
1925                 RETURN(rc);
1926
1927         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1928         if (IS_ERR(tgt))
1929                 RETURN(PTR_ERR(tgt));
1930
1931         op_data->op_fsuid = cfs_curproc_fsuid();
1932         op_data->op_fsgid = cfs_curproc_fsgid();
1933         op_data->op_cap = cfs_curproc_cap_pack();
1934
1935         /*
1936          * If child's fid is given, cancel unused locks for it if it is from
1937          * another export than parent.
1938          *
1939          * LOOKUP lock for child (fid3) should also be cancelled on parent
1940          * tgt_tgt in mdc_unlink().
1941          */
1942         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1943
1944         /*
1945          * Cancel FULL locks on child (fid3).
1946          */
1947         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1948                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
1949
1950         if (rc != 0)
1951                 RETURN(rc);
1952
1953         rc = md_unlink(tgt->ltd_exp, op_data, request);
1954
1955         RETURN(rc);
1956 }
1957
1958 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1959 {
1960         struct lmv_obd *lmv = &obd->u.lmv;
1961         int rc = 0;
1962
1963         switch (stage) {
1964         case OBD_CLEANUP_EARLY:
1965                 /* XXX: here should be calling obd_precleanup() down to
1966                  * stack. */
1967                 break;
1968         case OBD_CLEANUP_EXPORTS:
1969                 fld_client_proc_fini(&lmv->lmv_fld);
1970                 lprocfs_obd_cleanup(obd);
1971                 break;
1972         default:
1973                 break;
1974         }
1975         RETURN(rc);
1976 }
1977
1978 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
1979                         __u32 keylen, void *key, __u32 *vallen, void *val,
1980                         struct lov_stripe_md *lsm)
1981 {
1982         struct obd_device       *obd;
1983         struct lmv_obd          *lmv;
1984         int                      rc = 0;
1985         ENTRY;
1986
1987         obd = class_exp2obd(exp);
1988         if (obd == NULL) {
1989                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
1990                        exp->exp_handle.h_cookie);
1991                 RETURN(-EINVAL);
1992         }
1993
1994         lmv = &obd->u.lmv;
1995         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
1996                 struct lmv_tgt_desc *tgts;
1997                 int i;
1998
1999                 rc = lmv_check_connect(obd);
2000                 if (rc)
2001                         RETURN(rc);
2002
2003                 LASSERT(*vallen == sizeof(__u32));
2004                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2005                      i++, tgts++) {
2006
2007                         /*
2008                          * All tgts should be connected when this gets called.
2009                          */
2010                         if (!tgts || !tgts->ltd_exp) {
2011                                 CERROR("target not setup?\n");
2012                                 continue;
2013                         }
2014
2015                         if (!obd_get_info(env, tgts->ltd_exp, keylen, key,
2016                                           vallen, val, NULL))
2017                                 RETURN(0);
2018                 }
2019                 RETURN(-EINVAL);
2020         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2021                 rc = lmv_check_connect(obd);
2022                 if (rc)
2023                         RETURN(rc);
2024
2025                 /*
2026                  * Forwarding this request to first MDS, it should know LOV
2027                  * desc.
2028                  */
2029                 rc = obd_get_info(env, lmv->tgts[0].ltd_exp, keylen, key,
2030                                   vallen, val, NULL);
2031                 if (!rc && KEY_IS(KEY_CONN_DATA)) {
2032                         exp->exp_connect_flags =
2033                         ((struct obd_connect_data *)val)->ocd_connect_flags;
2034                 }
2035                 RETURN(rc);
2036         } else if (KEY_IS(KEY_TGT_COUNT)) {
2037                 *((int *)val) = lmv->desc.ld_tgt_count;
2038                 RETURN(0);
2039         }
2040
2041         CDEBUG(D_IOCTL, "Invalid key\n");
2042         RETURN(-EINVAL);
2043 }
2044
2045 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2046                        obd_count keylen, void *key, obd_count vallen,
2047                        void *val, struct ptlrpc_request_set *set)
2048 {
2049         struct lmv_tgt_desc    *tgt;
2050         struct obd_device      *obd;
2051         struct lmv_obd         *lmv;
2052         int rc = 0;
2053         ENTRY;
2054
2055         obd = class_exp2obd(exp);
2056         if (obd == NULL) {
2057                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2058                        exp->exp_handle.h_cookie);
2059                 RETURN(-EINVAL);
2060         }
2061         lmv = &obd->u.lmv;
2062
2063         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2064                 int i, err = 0;
2065
2066                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2067                         tgt = &lmv->tgts[i];
2068
2069                         if (!tgt->ltd_exp)
2070                                 continue;
2071
2072                         err = obd_set_info_async(env, tgt->ltd_exp,
2073                                                  keylen, key, vallen, val, set);
2074                         if (err && rc == 0)
2075                                 rc = err;
2076                 }
2077
2078                 RETURN(rc);
2079         }
2080
2081         RETURN(-EINVAL);
2082 }
2083
2084 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2085                struct lov_stripe_md *lsm)
2086 {
2087         struct obd_device         *obd = class_exp2obd(exp);
2088         struct lmv_obd            *lmv = &obd->u.lmv;
2089         struct lmv_stripe_md      *meap;
2090         struct lmv_stripe_md      *lsmp;
2091         int                        mea_size;
2092         int                        i;
2093         ENTRY;
2094
2095         mea_size = lmv_get_easize(lmv);
2096         if (!lmmp)
2097                 RETURN(mea_size);
2098
2099         if (*lmmp && !lsm) {
2100                 OBD_FREE_LARGE(*lmmp, mea_size);
2101                 *lmmp = NULL;
2102                 RETURN(0);
2103         }
2104
2105         if (*lmmp == NULL) {
2106                 OBD_ALLOC_LARGE(*lmmp, mea_size);
2107                 if (*lmmp == NULL)
2108                         RETURN(-ENOMEM);
2109         }
2110
2111         if (!lsm)
2112                 RETURN(mea_size);
2113
2114         lsmp = (struct lmv_stripe_md *)lsm;
2115         meap = (struct lmv_stripe_md *)*lmmp;
2116
2117         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2118             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2119                 RETURN(-EINVAL);
2120
2121         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2122         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2123         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2124
2125         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2126                 meap->mea_ids[i] = meap->mea_ids[i];
2127                 fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
2128         }
2129
2130         RETURN(mea_size);
2131 }
2132
2133 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2134                  struct lov_mds_md *lmm, int lmm_size)
2135 {
2136         struct obd_device          *obd = class_exp2obd(exp);
2137         struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2138         struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2139         struct lmv_obd             *lmv = &obd->u.lmv;
2140         int                         mea_size;
2141         int                         i;
2142         __u32                       magic;
2143         ENTRY;
2144
2145         mea_size = lmv_get_easize(lmv);
2146         if (lsmp == NULL)
2147                 return mea_size;
2148
2149         if (*lsmp != NULL && lmm == NULL) {
2150                 OBD_FREE_LARGE(*tmea, mea_size);
2151                 *lsmp = NULL;
2152                 RETURN(0);
2153         }
2154
2155         LASSERT(mea_size == lmm_size);
2156
2157         OBD_ALLOC_LARGE(*tmea, mea_size);
2158         if (*tmea == NULL)
2159                 RETURN(-ENOMEM);
2160
2161         if (!lmm)
2162                 RETURN(mea_size);
2163
2164         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2165             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2166             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2167         {
2168                 magic = le32_to_cpu(mea->mea_magic);
2169         } else {
2170                 /*
2171                  * Old mea is not handled here.
2172                  */
2173                 CERROR("Old not supportable EA is found\n");
2174                 LBUG();
2175         }
2176
2177         (*tmea)->mea_magic = magic;
2178         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2179         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2180
2181         for (i = 0; i < (*tmea)->mea_count; i++) {
2182                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2183                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2184         }
2185         RETURN(mea_size);
2186 }
2187
2188 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2189                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
2190                              ldlm_cancel_flags_t flags, void *opaque)
2191 {
2192         struct obd_device       *obd = exp->exp_obd;
2193         struct lmv_obd          *lmv = &obd->u.lmv;
2194         int                      rc = 0;
2195         int                      err;
2196         int                      i;
2197         ENTRY;
2198
2199         LASSERT(fid != NULL);
2200
2201         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2202                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
2203                         continue;
2204
2205                 err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid,
2206                                        policy, mode, flags, opaque);
2207                 if (!rc)
2208                         rc = err;
2209         }
2210         RETURN(rc);
2211 }
2212
2213 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2214                       __u64 *bits)
2215 {
2216         struct obd_device       *obd = exp->exp_obd;
2217         struct lmv_obd          *lmv = &obd->u.lmv;
2218         int                      rc;
2219         ENTRY;
2220
2221         rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits);
2222         RETURN(rc);
2223 }
2224
2225 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2226                            const struct lu_fid *fid, ldlm_type_t type,
2227                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2228                            struct lustre_handle *lockh)
2229 {
2230         struct obd_device       *obd = exp->exp_obd;
2231         struct lmv_obd          *lmv = &obd->u.lmv;
2232         ldlm_mode_t              rc;
2233         int                      i;
2234         ENTRY;
2235
2236         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2237
2238         /*
2239          * With CMD every object can have two locks in different namespaces:
2240          * lookup lock in space of mds storing direntry and update/open lock in
2241          * space of mds storing inode. Thus we check all targets, not only that
2242          * one fid was created in.
2243          */
2244         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2245                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2246                                    type, policy, mode, lockh);
2247                 if (rc)
2248                         RETURN(rc);
2249         }
2250
2251         RETURN(0);
2252 }
2253
2254 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2255                       struct obd_export *dt_exp, struct obd_export *md_exp,
2256                       struct lustre_md *md)
2257 {
2258         struct obd_device       *obd = exp->exp_obd;
2259         struct lmv_obd          *lmv = &obd->u.lmv;
2260         int                      rc;
2261         ENTRY;
2262         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md);
2263         RETURN(rc);
2264 }
2265
2266 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2267 {
2268         struct obd_device       *obd = exp->exp_obd;
2269         struct lmv_obd          *lmv = &obd->u.lmv;
2270         ENTRY;
2271
2272         if (md->mea)
2273                 obd_free_memmd(exp, (void *)&md->mea);
2274         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2275 }
2276
2277 int lmv_set_open_replay_data(struct obd_export *exp,
2278                              struct obd_client_handle *och,
2279                              struct ptlrpc_request *open_req)
2280 {
2281         struct obd_device       *obd = exp->exp_obd;
2282         struct lmv_obd          *lmv = &obd->u.lmv;
2283         struct lmv_tgt_desc     *tgt;
2284         ENTRY;
2285
2286         tgt = lmv_find_target(lmv, &och->och_fid);
2287         if (IS_ERR(tgt))
2288                 RETURN(PTR_ERR(tgt));
2289
2290         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, open_req));
2291 }
2292
2293 int lmv_clear_open_replay_data(struct obd_export *exp,
2294                                struct obd_client_handle *och)
2295 {
2296         struct obd_device       *obd = exp->exp_obd;
2297         struct lmv_obd          *lmv = &obd->u.lmv;
2298         struct lmv_tgt_desc     *tgt;
2299         ENTRY;
2300
2301         tgt = lmv_find_target(lmv, &och->och_fid);
2302         if (IS_ERR(tgt))
2303                 RETURN(PTR_ERR(tgt));
2304
2305         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
2306 }
2307
2308 static int lmv_get_remote_perm(struct obd_export *exp,
2309                                const struct lu_fid *fid,
2310                                struct obd_capa *oc, __u32 suppgid,
2311                                struct ptlrpc_request **request)
2312 {
2313         struct obd_device       *obd = exp->exp_obd;
2314         struct lmv_obd          *lmv = &obd->u.lmv;
2315         struct lmv_tgt_desc     *tgt;
2316         int                      rc;
2317         ENTRY;
2318
2319         rc = lmv_check_connect(obd);
2320         if (rc)
2321                 RETURN(rc);
2322
2323         tgt = lmv_find_target(lmv, fid);
2324         if (IS_ERR(tgt))
2325                 RETURN(PTR_ERR(tgt));
2326
2327         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
2328         RETURN(rc);
2329 }
2330
2331 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2332                           renew_capa_cb_t cb)
2333 {
2334         struct obd_device       *obd = exp->exp_obd;
2335         struct lmv_obd          *lmv = &obd->u.lmv;
2336         struct lmv_tgt_desc     *tgt;
2337         int                      rc;
2338         ENTRY;
2339
2340         rc = lmv_check_connect(obd);
2341         if (rc)
2342                 RETURN(rc);
2343
2344         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
2345         if (IS_ERR(tgt))
2346                 RETURN(PTR_ERR(tgt));
2347
2348         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
2349         RETURN(rc);
2350 }
2351
2352 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2353                     const struct req_msg_field *field, struct obd_capa **oc)
2354 {
2355         struct obd_device *obd = exp->exp_obd;
2356         struct lmv_obd *lmv = &obd->u.lmv;
2357         int rc;
2358
2359         ENTRY;
2360         rc = md_unpack_capa(lmv->tgts[0].ltd_exp, req, field, oc);
2361         RETURN(rc);
2362 }
2363
2364 int lmv_intent_getattr_async(struct obd_export *exp,
2365                              struct md_enqueue_info *minfo,
2366                              struct ldlm_enqueue_info *einfo)
2367 {
2368         struct md_op_data       *op_data = &minfo->mi_data;
2369         struct obd_device       *obd = exp->exp_obd;
2370         struct lmv_obd          *lmv = &obd->u.lmv;
2371         struct lmv_tgt_desc     *tgt = NULL;
2372         int                      rc;
2373         ENTRY;
2374
2375         rc = lmv_check_connect(obd);
2376         if (rc)
2377                 RETURN(rc);
2378
2379         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2380         if (IS_ERR(tgt))
2381                 RETURN(PTR_ERR(tgt));
2382
2383         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2384         RETURN(rc);
2385 }
2386
2387 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2388                         struct lu_fid *fid, __u64 *bits)
2389 {
2390         struct obd_device       *obd = exp->exp_obd;
2391         struct lmv_obd          *lmv = &obd->u.lmv;
2392         struct lmv_tgt_desc     *tgt;
2393         int                      rc;
2394         ENTRY;
2395
2396         rc = lmv_check_connect(obd);
2397         if (rc)
2398                 RETURN(rc);
2399
2400         tgt = lmv_find_target(lmv, fid);
2401         if (IS_ERR(tgt))
2402                 RETURN(PTR_ERR(tgt));
2403
2404         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2405         RETURN(rc);
2406 }
2407
2408 /**
2409  * For lmv, only need to send request to master MDT, and the master MDT will
2410  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2411  * we directly fetch data from the slave MDTs.
2412  */
2413 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2414                  struct obd_quotactl *oqctl)
2415 {
2416         struct obd_device   *obd = class_exp2obd(exp);
2417         struct lmv_obd      *lmv = &obd->u.lmv;
2418         struct lmv_tgt_desc *tgt = &lmv->tgts[0];
2419         int                  rc = 0, i;
2420         __u64                curspace, curinodes;
2421         ENTRY;
2422
2423         if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2424                 CERROR("master lmv inactive\n");
2425                 RETURN(-EIO);
2426         }
2427
2428         if (oqctl->qc_cmd != Q_GETOQUOTA) {
2429                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
2430                 RETURN(rc);
2431         }
2432
2433         curspace = curinodes = 0;
2434         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2435                 int err;
2436                 tgt = &lmv->tgts[i];
2437
2438                 if (tgt->ltd_exp == NULL)
2439                         continue;
2440                 if (!tgt->ltd_active) {
2441                         CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2442                         continue;
2443                 }
2444
2445                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2446                 if (err) {
2447                         CERROR("getquota on mdt %d failed. %d\n", i, err);
2448                         if (!rc)
2449                                 rc = err;
2450                 } else {
2451                         curspace += oqctl->qc_dqblk.dqb_curspace;
2452                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
2453                 }
2454         }
2455         oqctl->qc_dqblk.dqb_curspace = curspace;
2456         oqctl->qc_dqblk.dqb_curinodes = curinodes;
2457
2458         RETURN(rc);
2459 }
2460
2461 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2462                    struct obd_quotactl *oqctl)
2463 {
2464         struct obd_device   *obd = class_exp2obd(exp);
2465         struct lmv_obd      *lmv = &obd->u.lmv;
2466         struct lmv_tgt_desc *tgt;
2467         int                  i, rc = 0;
2468         ENTRY;
2469
2470         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
2471                 int err;
2472
2473                 if (!tgt->ltd_active) {
2474                         CERROR("lmv idx %d inactive\n", i);
2475                         RETURN(-EIO);
2476                 }
2477
2478                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
2479                 if (err && !rc)
2480                         rc = err;
2481         }
2482
2483         RETURN(rc);
2484 }
2485
2486 struct obd_ops lmv_obd_ops = {
2487         .o_owner                = THIS_MODULE,
2488         .o_setup                = lmv_setup,
2489         .o_cleanup              = lmv_cleanup,
2490         .o_precleanup           = lmv_precleanup,
2491         .o_process_config       = lmv_process_config,
2492         .o_connect              = lmv_connect,
2493         .o_disconnect           = lmv_disconnect,
2494         .o_statfs               = lmv_statfs,
2495         .o_get_info             = lmv_get_info,
2496         .o_set_info_async       = lmv_set_info_async,
2497         .o_packmd               = lmv_packmd,
2498         .o_unpackmd             = lmv_unpackmd,
2499         .o_notify               = lmv_notify,
2500         .o_get_uuid             = lmv_get_uuid,
2501         .o_iocontrol            = lmv_iocontrol,
2502         .o_quotacheck           = lmv_quotacheck,
2503         .o_quotactl             = lmv_quotactl
2504 };
2505
2506 struct md_ops lmv_md_ops = {
2507         .m_getstatus            = lmv_getstatus,
2508         .m_change_cbdata        = lmv_change_cbdata,
2509         .m_find_cbdata          = lmv_find_cbdata,
2510         .m_close                = lmv_close,
2511         .m_create               = lmv_create,
2512         .m_done_writing         = lmv_done_writing,
2513         .m_enqueue              = lmv_enqueue,
2514         .m_getattr              = lmv_getattr,
2515         .m_getxattr             = lmv_getxattr,
2516         .m_getattr_name         = lmv_getattr_name,
2517         .m_intent_lock          = lmv_intent_lock,
2518         .m_link                 = lmv_link,
2519         .m_rename               = lmv_rename,
2520         .m_setattr              = lmv_setattr,
2521         .m_setxattr             = lmv_setxattr,
2522         .m_sync                 = lmv_sync,
2523         .m_readpage             = lmv_readpage,
2524         .m_unlink               = lmv_unlink,
2525         .m_init_ea_size         = lmv_init_ea_size,
2526         .m_cancel_unused        = lmv_cancel_unused,
2527         .m_set_lock_data        = lmv_set_lock_data,
2528         .m_lock_match           = lmv_lock_match,
2529         .m_get_lustre_md        = lmv_get_lustre_md,
2530         .m_free_lustre_md       = lmv_free_lustre_md,
2531         .m_set_open_replay_data = lmv_set_open_replay_data,
2532         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2533         .m_renew_capa           = lmv_renew_capa,
2534         .m_unpack_capa          = lmv_unpack_capa,
2535         .m_get_remote_perm      = lmv_get_remote_perm,
2536         .m_intent_getattr_async = lmv_intent_getattr_async,
2537         .m_revalidate_lock      = lmv_revalidate_lock
2538 };
2539
2540 int __init lmv_init(void)
2541 {
2542         struct lprocfs_static_vars lvars;
2543         int                        rc;
2544
2545         lprocfs_lmv_init_vars(&lvars);
2546
2547         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2548                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2549         return rc;
2550 }
2551
#ifdef __KERNEL__
/**
 * Module exit: unregister the obd type that lmv_init() registered.
 */
static void lmv_exit(void)
{
        class_unregister_type(LUSTRE_LMV_NAME);
}

/* kernel module entry and exit points */
module_init(lmv_init);
module_exit(lmv_exit);

/* kernel module metadata */
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
#endif