Whamcloud - gitweb
LU-1187 lmv: Locate right MDT in lmv.
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #ifdef __KERNEL__
39 #include <linux/slab.h>
40 #include <linux/module.h>
41 #include <linux/init.h>
42 #include <linux/slab.h>
43 #include <linux/pagemap.h>
44 #include <linux/mm.h>
45 #include <asm/div64.h>
46 #include <linux/seq_file.h>
47 #include <linux/namei.h>
48 #else
49 #include <liblustre.h>
50 #endif
51
52 #include <lustre/lustre_idl.h>
53 #include <obd_support.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <obd_class.h>
57 #include <lprocfs_status.h>
58 #include <lustre_lite.h>
59 #include <lustre_fid.h>
60 #include "lmv_internal.h"
61
62 static void lmv_activate_target(struct lmv_obd *lmv,
63                                 struct lmv_tgt_desc *tgt,
64                                 int activate)
65 {
66         if (tgt->ltd_active == activate)
67                 return;
68
69         tgt->ltd_active = activate;
70         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
71 }
72
73 /**
74  * Error codes:
75  *
76  *  -EINVAL  : UUID can't be found in the LMV's target list
77  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
78  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
79  */
80 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
81                               int activate)
82 {
83         struct lmv_tgt_desc    *tgt;
84         struct obd_device      *obd;
85         int                     i;
86         int                     rc = 0;
87         ENTRY;
88
89         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
90                lmv, uuid->uuid, activate);
91
92         spin_lock(&lmv->lmv_lock);
93         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
94                 if (tgt->ltd_exp == NULL)
95                         continue;
96
97                 CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n",
98                        i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
99
100                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
101                         break;
102         }
103
104         if (i == lmv->desc.ld_tgt_count)
105                 GOTO(out_lmv_lock, rc = -EINVAL);
106
107         obd = class_exp2obd(tgt->ltd_exp);
108         if (obd == NULL)
109                 GOTO(out_lmv_lock, rc = -ENOTCONN);
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, i);
114         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115
116         if (tgt->ltd_active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 GOTO(out_lmv_lock, rc);
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123                activate ? "" : "in");
124         lmv_activate_target(lmv, tgt, activate);
125         EXIT;
126
127  out_lmv_lock:
128         spin_unlock(&lmv->lmv_lock);
129         return rc;
130 }
131
132 static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
133                             struct obd_connect_data *data)
134 {
135         struct lmv_tgt_desc    *tgt;
136         int                     i;
137         ENTRY;
138
139         LASSERT(data != NULL);
140
141         spin_lock(&lmv->lmv_lock);
142         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
143                 if (tgt->ltd_exp == NULL)
144                         continue;
145
146                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
147                         lmv->datas[tgt->ltd_idx] = *data;
148                         break;
149                 }
150         }
151         spin_unlock(&lmv->lmv_lock);
152         RETURN(0);
153 }
154
155 struct obd_uuid *lmv_get_uuid(struct obd_export *exp) {
156         struct obd_device *obd = exp->exp_obd;
157         struct lmv_obd *lmv = &obd->u.lmv;
158         return obd_get_uuid(lmv->tgts[0].ltd_exp);
159 }
160
161 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
162                       enum obd_notify_event ev, void *data)
163 {
164         struct obd_connect_data *conn_data;
165         struct lmv_obd          *lmv = &obd->u.lmv;
166         struct obd_uuid         *uuid;
167         int                      rc = 0;
168         ENTRY;
169
170         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
171                 CERROR("unexpected notification of %s %s!\n",
172                        watched->obd_type->typ_name,
173                        watched->obd_name);
174                 RETURN(-EINVAL);
175         }
176
177         uuid = &watched->u.cli.cl_target_uuid;
178         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
179                 /*
180                  * Set MDC as active before notifying the observer, so the
181                  * observer can use the MDC normally.
182                  */
183                 rc = lmv_set_mdc_active(lmv, uuid,
184                                         ev == OBD_NOTIFY_ACTIVE);
185                 if (rc) {
186                         CERROR("%sactivation of %s failed: %d\n",
187                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
188                                uuid->uuid, rc);
189                         RETURN(rc);
190                 }
191         } else if (ev == OBD_NOTIFY_OCD) {
192                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
193
194                 /*
195                  * Set connect data to desired target, update exp_connect_flags.
196                  */
197                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
198                 if (rc) {
199                         CERROR("can't set connect data to target %s, rc %d\n",
200                                uuid->uuid, rc);
201                         RETURN(rc);
202                 }
203
204                 /*
205                  * XXX: Make sure that ocd_connect_flags from all targets are
206                  * the same. Otherwise one of MDTs runs wrong version or
207                  * something like this.  --umka
208                  */
209                 obd->obd_self_export->exp_connect_flags =
210                         conn_data->ocd_connect_flags;
211         }
212 #if 0
213         else if (ev == OBD_NOTIFY_DISCON) {
214                 /*
215                  * For disconnect event, flush fld cache for failout MDS case.
216                  */
217                 fld_client_flush(&lmv->lmv_fld);
218         }
219 #endif
220         /*
221          * Pass the notification up the chain.
222          */
223         if (obd->obd_observer)
224                 rc = obd_notify(obd->obd_observer, watched, ev, data);
225
226         RETURN(rc);
227 }
228
229 /**
230  * This is fake connect function. Its purpose is to initialize lmv and say
231  * caller that everything is okay. Real connection will be performed later.
232  */
233 static int lmv_connect(const struct lu_env *env,
234                        struct obd_export **exp, struct obd_device *obd,
235                        struct obd_uuid *cluuid, struct obd_connect_data *data,
236                        void *localdata)
237 {
238 #ifdef __KERNEL__
239         struct proc_dir_entry *lmv_proc_dir;
240 #endif
241         struct lmv_obd        *lmv = &obd->u.lmv;
242         struct lustre_handle  conn = { 0 };
243         int                    rc = 0;
244         ENTRY;
245
246         /*
247          * We don't want to actually do the underlying connections more than
248          * once, so keep track.
249          */
250         lmv->refcount++;
251         if (lmv->refcount > 1) {
252                 *exp = NULL;
253                 RETURN(0);
254         }
255
256         rc = class_connect(&conn, obd, cluuid);
257         if (rc) {
258                 CERROR("class_connection() returned %d\n", rc);
259                 RETURN(rc);
260         }
261
262         *exp = class_conn2export(&conn);
263         class_export_get(*exp);
264
265         lmv->exp = *exp;
266         lmv->connected = 0;
267         lmv->cluuid = *cluuid;
268
269         if (data)
270                 lmv->conn_data = *data;
271
272 #ifdef __KERNEL__
273         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
274                                         NULL, NULL);
275         if (IS_ERR(lmv_proc_dir)) {
276                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
277                        obd->obd_type->typ_name, obd->obd_name);
278                 lmv_proc_dir = NULL;
279         }
280 #endif
281
282         /*
283          * All real clients should perform actual connection right away, because
284          * it is possible, that LMV will not have opportunity to connect targets
285          * and MDC stuff will be called directly, for instance while reading
286          * ../mdc/../kbytesfree procfs file, etc.
287          */
288         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
289                 rc = lmv_check_connect(obd);
290
291 #ifdef __KERNEL__
292         if (rc) {
293                 if (lmv_proc_dir)
294                         lprocfs_remove(&lmv_proc_dir);
295         }
296 #endif
297
298         RETURN(rc);
299 }
300
301 static void lmv_set_timeouts(struct obd_device *obd)
302 {
303         struct lmv_tgt_desc   *tgts;
304         struct lmv_obd        *lmv;
305         int                    i;
306
307         lmv = &obd->u.lmv;
308         if (lmv->server_timeout == 0)
309                 return;
310
311         if (lmv->connected == 0)
312                 return;
313
314         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
315                 if (tgts->ltd_exp == NULL)
316                         continue;
317
318                 obd_set_info_async(NULL, tgts->ltd_exp, sizeof(KEY_INTERMDS),
319                                    KEY_INTERMDS, 0, NULL, NULL);
320         }
321 }
322
323 static int lmv_init_ea_size(struct obd_export *exp, int easize,
324                             int def_easize, int cookiesize)
325 {
326         struct obd_device   *obd = exp->exp_obd;
327         struct lmv_obd      *lmv = &obd->u.lmv;
328         int                  i;
329         int                  rc = 0;
330         int                  change = 0;
331         ENTRY;
332
333         if (lmv->max_easize < easize) {
334                 lmv->max_easize = easize;
335                 change = 1;
336         }
337         if (lmv->max_def_easize < def_easize) {
338                 lmv->max_def_easize = def_easize;
339                 change = 1;
340         }
341         if (lmv->max_cookiesize < cookiesize) {
342                 lmv->max_cookiesize = cookiesize;
343                 change = 1;
344         }
345         if (change == 0)
346                 RETURN(0);
347
348         if (lmv->connected == 0)
349                 RETURN(0);
350
351         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
352                 if (lmv->tgts[i].ltd_exp == NULL) {
353                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
354                         continue;
355                 }
356
357                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
358                                      cookiesize);
359                 if (rc) {
360                         CERROR("obd_init_ea_size() failed on MDT target %d, "
361                                "error %d.\n", i, rc);
362                         break;
363                 }
364         }
365         RETURN(rc);
366 }
367
368 #define MAX_STRING_SIZE 128
369
370 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
371 {
372 #ifdef __KERNEL__
373         struct proc_dir_entry   *lmv_proc_dir;
374 #endif
375         struct lmv_obd          *lmv = &obd->u.lmv;
376         struct obd_uuid         *cluuid = &lmv->cluuid;
377         struct obd_connect_data *mdc_data = NULL;
378         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
379         struct obd_device       *mdc_obd;
380         struct obd_export       *mdc_exp;
381         struct lu_fld_target     target;
382         int                      rc;
383         ENTRY;
384
385         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
386                                         &obd->obd_uuid);
387         if (!mdc_obd) {
388                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
389                 RETURN(-EINVAL);
390         }
391
392         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
393                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
394                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
395                 cluuid->uuid);
396
397         if (!mdc_obd->obd_set_up) {
398                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
399                 RETURN(-EINVAL);
400         }
401
402         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
403                          &lmv->conn_data, NULL);
404         if (rc) {
405                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
406                 RETURN(rc);
407         }
408
409         /*
410          * Init fid sequence client for this mdc and add new fld target.
411          */
412         rc = obd_fid_init(mdc_exp, LUSTRE_SEQ_METADATA);
413         if (rc)
414                 RETURN(rc);
415
416         target.ft_srv = NULL;
417         target.ft_exp = mdc_exp;
418         target.ft_idx = tgt->ltd_idx;
419
420         fld_client_add_target(&lmv->lmv_fld, &target);
421
422         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
423
424         rc = obd_register_observer(mdc_obd, obd);
425         if (rc) {
426                 obd_disconnect(mdc_exp);
427                 CERROR("target %s register_observer error %d\n",
428                        tgt->ltd_uuid.uuid, rc);
429                 RETURN(rc);
430         }
431
432         if (obd->obd_observer) {
433                 /*
434                  * Tell the observer about the new target.
435                  */
436                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
437                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
438                 if (rc) {
439                         obd_disconnect(mdc_exp);
440                         RETURN(rc);
441                 }
442         }
443
444         tgt->ltd_active = 1;
445         tgt->ltd_exp = mdc_exp;
446         lmv->desc.ld_active_tgt_count++;
447
448         /*
449          * Copy connect data, it may be used later.
450          */
451         lmv->datas[tgt->ltd_idx] = *mdc_data;
452
453         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
454                         lmv->max_def_easize, lmv->max_cookiesize);
455
456         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
457                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
458                 cfs_atomic_read(&obd->obd_refcount));
459
460 #ifdef __KERNEL__
461         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
462         if (lmv_proc_dir) {
463                 struct proc_dir_entry *mdc_symlink;
464
465                 LASSERT(mdc_obd->obd_type != NULL);
466                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
467                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
468                                                   lmv_proc_dir,
469                                                   "../../../%s/%s",
470                                                   mdc_obd->obd_type->typ_name,
471                                                   mdc_obd->obd_name);
472                 if (mdc_symlink == NULL) {
473                         CERROR("Could not register LMV target "
474                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
475                                obd->obd_type->typ_name, obd->obd_name,
476                                mdc_obd->obd_name);
477                         lprocfs_remove(&lmv_proc_dir);
478                         lmv_proc_dir = NULL;
479                 }
480         }
481 #endif
482         RETURN(0);
483 }
484
485 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
486 {
487         struct lmv_obd      *lmv = &obd->u.lmv;
488         struct lmv_tgt_desc *tgt;
489         int                  rc = 0;
490         ENTRY;
491
492         CDEBUG(D_CONFIG, "Target uuid: %s.\n", tgt_uuid->uuid);
493
494         lmv_init_lock(lmv);
495
496         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
497                 lmv_init_unlock(lmv);
498                 CERROR("Can't add %s, LMV module compiled for %d MDCs. "
499                        "That many MDCs already configured.\n",
500                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
501                 RETURN(-EINVAL);
502         }
503         if (lmv->desc.ld_tgt_count == 0) {
504                 struct obd_device *mdc_obd;
505
506                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
507                                                 &obd->obd_uuid);
508                 if (!mdc_obd) {
509                         lmv_init_unlock(lmv);
510                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
511                         RETURN(-EINVAL);
512                 }
513         }
514         spin_lock(&lmv->lmv_lock);
515         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
516         tgt->ltd_uuid = *tgt_uuid;
517         spin_unlock(&lmv->lmv_lock);
518
519         if (lmv->connected) {
520                 rc = lmv_connect_mdc(obd, tgt);
521                 if (rc) {
522                         spin_lock(&lmv->lmv_lock);
523                         lmv->desc.ld_tgt_count--;
524                         memset(tgt, 0, sizeof(*tgt));
525                         spin_unlock(&lmv->lmv_lock);
526                 } else {
527                         int easize = sizeof(struct lmv_stripe_md) +
528                                      lmv->desc.ld_tgt_count *
529                                      sizeof(struct lu_fid);
530                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
531                 }
532         }
533
534         lmv_init_unlock(lmv);
535         RETURN(rc);
536 }
537
538 int lmv_check_connect(struct obd_device *obd)
539 {
540         struct lmv_obd       *lmv = &obd->u.lmv;
541         struct lmv_tgt_desc  *tgt;
542         int                   i;
543         int                   rc;
544         int                   easize;
545         ENTRY;
546
547         if (lmv->connected)
548                 RETURN(0);
549
550         lmv_init_lock(lmv);
551         if (lmv->connected) {
552                 lmv_init_unlock(lmv);
553                 RETURN(0);
554         }
555
556         if (lmv->desc.ld_tgt_count == 0) {
557                 lmv_init_unlock(lmv);
558                 CERROR("%s: no targets configured.\n", obd->obd_name);
559                 RETURN(-EINVAL);
560         }
561
562         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
563                lmv->cluuid.uuid, obd->obd_name);
564
565         LASSERT(lmv->tgts != NULL);
566
567         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
568                 rc = lmv_connect_mdc(obd, tgt);
569                 if (rc)
570                         GOTO(out_disc, rc);
571         }
572
573         lmv_set_timeouts(obd);
574         class_export_put(lmv->exp);
575         lmv->connected = 1;
576         easize = lmv_get_easize(lmv);
577         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
578         lmv_init_unlock(lmv);
579         RETURN(0);
580
581  out_disc:
582         while (i-- > 0) {
583                 int rc2;
584                 --tgt;
585                 tgt->ltd_active = 0;
586                 if (tgt->ltd_exp) {
587                         --lmv->desc.ld_active_tgt_count;
588                         rc2 = obd_disconnect(tgt->ltd_exp);
589                         if (rc2) {
590                                 CERROR("LMV target %s disconnect on "
591                                        "MDC idx %d: error %d\n",
592                                        tgt->ltd_uuid.uuid, i, rc2);
593                         }
594                 }
595         }
596         class_disconnect(lmv->exp);
597         lmv_init_unlock(lmv);
598         RETURN(rc);
599 }
600
601 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
602 {
603 #ifdef __KERNEL__
604         struct proc_dir_entry  *lmv_proc_dir;
605 #endif
606         struct lmv_obd         *lmv = &obd->u.lmv;
607         struct obd_device      *mdc_obd;
608         int                     rc;
609         ENTRY;
610
611         LASSERT(tgt != NULL);
612         LASSERT(obd != NULL);
613
614         mdc_obd = class_exp2obd(tgt->ltd_exp);
615
616         if (mdc_obd) {
617                 mdc_obd->obd_force = obd->obd_force;
618                 mdc_obd->obd_fail = obd->obd_fail;
619                 mdc_obd->obd_no_recov = obd->obd_no_recov;
620         }
621
622 #ifdef __KERNEL__
623         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
624         if (lmv_proc_dir) {
625                 struct proc_dir_entry *mdc_symlink;
626
627                 mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
628                 if (mdc_symlink) {
629                         lprocfs_remove(&mdc_symlink);
630                 } else {
631                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
632                                obd->obd_type->typ_name, obd->obd_name,
633                                mdc_obd->obd_name);
634                 }
635         }
636 #endif
637         rc = obd_fid_fini(tgt->ltd_exp);
638         if (rc)
639                 CERROR("Can't finanize fids factory\n");
640
641         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
642                tgt->ltd_exp->exp_obd->obd_name,
643                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
644
645         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
646         rc = obd_disconnect(tgt->ltd_exp);
647         if (rc) {
648                 if (tgt->ltd_active) {
649                         CERROR("Target %s disconnect error %d\n",
650                                tgt->ltd_uuid.uuid, rc);
651                 }
652         }
653
654         lmv_activate_target(lmv, tgt, 0);
655         tgt->ltd_exp = NULL;
656         RETURN(0);
657 }
658
659 static int lmv_disconnect(struct obd_export *exp)
660 {
661         struct obd_device     *obd = class_exp2obd(exp);
662 #ifdef __KERNEL__
663         struct proc_dir_entry *lmv_proc_dir;
664 #endif
665         struct lmv_obd        *lmv = &obd->u.lmv;
666         int                    rc;
667         int                    i;
668         ENTRY;
669
670         if (!lmv->tgts)
671                 goto out_local;
672
673         /*
674          * Only disconnect the underlying layers on the final disconnect.
675          */
676         lmv->refcount--;
677         if (lmv->refcount != 0)
678                 goto out_local;
679
680         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
681                 if (lmv->tgts[i].ltd_exp == NULL)
682                         continue;
683                 lmv_disconnect_mdc(obd, &lmv->tgts[i]);
684         }
685
686 #ifdef __KERNEL__
687         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
688         if (lmv_proc_dir) {
689                 lprocfs_remove(&lmv_proc_dir);
690         } else {
691                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
692                        obd->obd_type->typ_name, obd->obd_name);
693         }
694 #endif
695
696 out_local:
697         /*
698          * This is the case when no real connection is established by
699          * lmv_check_connect().
700          */
701         if (!lmv->connected)
702                 class_export_put(exp);
703         rc = class_disconnect(exp);
704         if (lmv->refcount == 0)
705                 lmv->connected = 0;
706         RETURN(rc);
707 }
708
709 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
710                          int len, void *karg, void *uarg)
711 {
712         struct obd_device    *obddev = class_exp2obd(exp);
713         struct lmv_obd       *lmv = &obddev->u.lmv;
714         int                   i = 0;
715         int                   rc = 0;
716         int                   set = 0;
717         int                   count = lmv->desc.ld_tgt_count;
718         ENTRY;
719
720         if (count == 0)
721                 RETURN(-ENOTTY);
722
723         switch (cmd) {
724         case IOC_OBD_STATFS: {
725                 struct obd_ioctl_data *data = karg;
726                 struct obd_device *mdc_obd;
727                 struct obd_statfs stat_buf = {0};
728                 __u32 index;
729
730                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
731                 if ((index >= count))
732                         RETURN(-ENODEV);
733
734                 if (!lmv->tgts[index].ltd_active)
735                         RETURN(-ENODATA);
736
737                 mdc_obd = class_exp2obd(lmv->tgts[index].ltd_exp);
738                 if (!mdc_obd)
739                         RETURN(-EINVAL);
740
741                 /* copy UUID */
742                 if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
743                                      min((int) data->ioc_plen2,
744                                          (int) sizeof(struct obd_uuid))))
745                         RETURN(-EFAULT);
746
747                 rc = obd_statfs(NULL, lmv->tgts[index].ltd_exp, &stat_buf,
748                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
749                                 0);
750                 if (rc)
751                         RETURN(rc);
752                 if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
753                                      min((int) data->ioc_plen1,
754                                          (int) sizeof(stat_buf))))
755                         RETURN(-EFAULT);
756                 break;
757         }
758         case OBD_IOC_QUOTACTL: {
759                 struct if_quotactl *qctl = karg;
760                 struct lmv_tgt_desc *tgt = NULL;
761                 struct obd_quotactl *oqctl;
762
763                 if (qctl->qc_valid == QC_MDTIDX) {
764                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
765                                 RETURN(-EINVAL);
766
767                         tgt = &lmv->tgts[qctl->qc_idx];
768                         if (!tgt->ltd_exp)
769                                 RETURN(-EINVAL);
770                 } else if (qctl->qc_valid == QC_UUID) {
771                         for (i = 0; i < count; i++) {
772                                 tgt = &lmv->tgts[i];
773                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
774                                                      &qctl->obd_uuid))
775                                         continue;
776
777                                 if (tgt->ltd_exp == NULL)
778                                         RETURN(-EINVAL);
779
780                                 break;
781                         }
782                 } else {
783                         RETURN(-EINVAL);
784                 }
785
786                 if (i >= count)
787                         RETURN(-EAGAIN);
788
789                 LASSERT(tgt && tgt->ltd_exp);
790                 OBD_ALLOC_PTR(oqctl);
791                 if (!oqctl)
792                         RETURN(-ENOMEM);
793
794                 QCTL_COPY(oqctl, qctl);
795                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
796                 if (rc == 0) {
797                         QCTL_COPY(qctl, oqctl);
798                         qctl->qc_valid = QC_MDTIDX;
799                         qctl->obd_uuid = tgt->ltd_uuid;
800                 }
801                 OBD_FREE_PTR(oqctl);
802                 break;
803         }
804         case OBD_IOC_CHANGELOG_SEND:
805         case OBD_IOC_CHANGELOG_CLEAR: {
806                 struct ioc_changelog *icc = karg;
807
808                 if (icc->icc_mdtindex >= count)
809                         RETURN(-ENODEV);
810
811                 rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex].ltd_exp,
812                                    sizeof(*icc), icc, NULL);
813                 break;
814         }
815         case LL_IOC_GET_CONNECT_FLAGS: {
816                 rc = obd_iocontrol(cmd, lmv->tgts[0].ltd_exp, len, karg, uarg);
817                 break;
818         }
819         case OBD_IOC_FID2PATH: {
820                 struct getinfo_fid2path *gf;
821                 struct lmv_tgt_desc     *tgt;
822
823                 gf = (struct getinfo_fid2path *)karg;
824                 tgt = lmv_find_target(lmv, &gf->gf_fid);
825                 if (IS_ERR(tgt))
826                         RETURN(PTR_ERR(tgt));
827                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
828                 break;
829         }
830         default : {
831                 for (i = 0; i < count; i++) {
832                         int err;
833                         struct obd_device *mdc_obd;
834
835                         if (lmv->tgts[i].ltd_exp == NULL)
836                                 continue;
837                         /* ll_umount_begin() sets force flag but for lmv, not
838                          * mdc. Let's pass it through */
839                         mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
840                         mdc_obd->obd_force = obddev->obd_force;
841                         err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
842                                             karg, uarg);
843                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
844                                 RETURN(err);
845                         } else if (err) {
846                                 if (lmv->tgts[i].ltd_active) {
847                                         CERROR("error: iocontrol MDC %s on MDT"
848                                                "idx %d cmd %x: err = %d\n",
849                                                 lmv->tgts[i].ltd_uuid.uuid,
850                                                 i, cmd, err);
851                                         if (!rc)
852                                                 rc = err;
853                                 }
854                         } else
855                                 set = 1;
856                 }
857                 if (!set && !rc)
858                         rc = -EIO;
859         }
860         }
861         RETURN(rc);
862 }
863
864 #if 0
865 static int lmv_all_chars_policy(int count, const char *name,
866                                 int len)
867 {
868         unsigned int c = 0;
869
870         while (len > 0)
871                 c += name[--len];
872         c = c % count;
873         return c;
874 }
875
876 static int lmv_nid_policy(struct lmv_obd *lmv)
877 {
878         struct obd_import *imp;
879         __u32              id;
880
881         /*
882          * XXX: To get nid we assume that underlying obd device is mdc.
883          */
884         imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
885         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
886         return id % lmv->desc.ld_tgt_count;
887 }
888
889 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
890                           placement_policy_t placement)
891 {
892         switch (placement) {
893         case PLACEMENT_CHAR_POLICY:
894                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
895                                             op_data->op_name,
896                                             op_data->op_namelen);
897         case PLACEMENT_NID_POLICY:
898                 return lmv_nid_policy(lmv);
899
900         default:
901                 break;
902         }
903
904         CERROR("Unsupported placement policy %x\n", placement);
905         return -EINVAL;
906 }
907 #endif
908
909 /**
910  * This is _inode_ placement policy function (not name).
911  */
912 static int lmv_placement_policy(struct obd_device *obd,
913                                 struct md_op_data *op_data,
914                                 mdsno_t *mds)
915 {
916         LASSERT(mds != NULL);
917
918         /* Allocate new fid on target according to to different
919          * QOS policy. In DNE phase I, llite should always tell
920          * which MDT where the dir will be located */
921         *mds = op_data->op_mds;
922
923         RETURN(0);
924 }
925
926 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
927                     mdsno_t mds)
928 {
929         struct lmv_tgt_desc *tgt;
930         int                  rc;
931         ENTRY;
932
933         tgt = lmv_get_target(lmv, mds);
934
935         /*
936          * New seq alloc and FLD setup should be atomic. Otherwise we may find
937          * on server that seq in new allocated fid is not yet known.
938          */
939         mutex_lock(&tgt->ltd_fid_mutex);
940
941         if (!tgt->ltd_active)
942                 GOTO(out, rc = -ENODEV);
943
944         /*
945          * Asking underlaying tgt layer to allocate new fid.
946          */
947         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
948         if (rc > 0) {
949                 LASSERT(fid_is_sane(fid));
950                 rc = 0;
951         }
952
953         EXIT;
954 out:
955         mutex_unlock(&tgt->ltd_fid_mutex);
956         return rc;
957 }
958
959 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
960                   struct md_op_data *op_data)
961 {
962         struct obd_device     *obd = class_exp2obd(exp);
963         struct lmv_obd        *lmv = &obd->u.lmv;
964         mdsno_t                mds = 0;
965         int                    rc;
966         ENTRY;
967
968         LASSERT(op_data != NULL);
969         LASSERT(fid != NULL);
970
971         rc = lmv_placement_policy(obd, op_data, &mds);
972         if (rc) {
973                 CERROR("Can't get target for allocating fid, "
974                        "rc %d\n", rc);
975                 RETURN(rc);
976         }
977
978         rc = __lmv_fid_alloc(lmv, fid, mds);
979         if (rc) {
980                 CERROR("Can't alloc new fid, rc %d\n", rc);
981                 RETURN(rc);
982         }
983
984         RETURN(rc);
985 }
986
987 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
988 {
989         struct lmv_obd             *lmv = &obd->u.lmv;
990         struct lprocfs_static_vars  lvars;
991         struct lmv_desc            *desc;
992         int                         rc;
993         int                         i = 0;
994         ENTRY;
995
996         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
997                 CERROR("LMV setup requires a descriptor\n");
998                 RETURN(-EINVAL);
999         }
1000
1001         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1002         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1003                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1004                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1005                 RETURN(-EINVAL);
1006         }
1007
1008         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
1009
1010         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
1011         if (lmv->tgts == NULL)
1012                 RETURN(-ENOMEM);
1013
1014         for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
1015                 mutex_init(&lmv->tgts[i].ltd_fid_mutex);
1016                 lmv->tgts[i].ltd_idx = i;
1017         }
1018
1019         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
1020
1021         OBD_ALLOC(lmv->datas, lmv->datas_size);
1022         if (lmv->datas == NULL)
1023                 GOTO(out_free_tgts, rc = -ENOMEM);
1024
1025         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1026         lmv->desc.ld_tgt_count = 0;
1027         lmv->desc.ld_active_tgt_count = 0;
1028         lmv->max_cookiesize = 0;
1029         lmv->max_def_easize = 0;
1030         lmv->max_easize = 0;
1031         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1032
1033         spin_lock_init(&lmv->lmv_lock);
1034         mutex_init(&lmv->init_mutex);
1035
1036         lprocfs_lmv_init_vars(&lvars);
1037         lprocfs_obd_setup(obd, lvars.obd_vars);
1038 #ifdef LPROCFS
1039         {
1040                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1041                                         0444, &lmv_proc_target_fops, obd);
1042                 if (rc)
1043                         CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1044                                obd->obd_name, rc);
1045        }
1046 #endif
1047         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1048                              LUSTRE_CLI_FLD_HASH_DHT);
1049         if (rc) {
1050                 CERROR("Can't init FLD, err %d\n", rc);
1051                 GOTO(out_free_datas, rc);
1052         }
1053
1054         RETURN(0);
1055
1056 out_free_datas:
1057         OBD_FREE(lmv->datas, lmv->datas_size);
1058         lmv->datas = NULL;
1059 out_free_tgts:
1060         OBD_FREE(lmv->tgts, lmv->tgts_size);
1061         lmv->tgts = NULL;
1062         return rc;
1063 }
1064
1065 static int lmv_cleanup(struct obd_device *obd)
1066 {
1067         struct lmv_obd   *lmv = &obd->u.lmv;
1068         ENTRY;
1069
1070         fld_client_fini(&lmv->lmv_fld);
1071         OBD_FREE(lmv->datas, lmv->datas_size);
1072         OBD_FREE(lmv->tgts, lmv->tgts_size);
1073
1074         RETURN(0);
1075 }
1076
1077 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1078 {
1079         struct lustre_cfg     *lcfg = buf;
1080         struct obd_uuid        tgt_uuid;
1081         int                    rc;
1082         ENTRY;
1083
1084         switch(lcfg->lcfg_command) {
1085         case LCFG_ADD_MDC:
1086                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
1087                         GOTO(out, rc = -EINVAL);
1088
1089                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
1090                 rc = lmv_add_target(obd, &tgt_uuid);
1091                 GOTO(out, rc);
1092         default: {
1093                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1094                 GOTO(out, rc = -EINVAL);
1095         }
1096         }
1097 out:
1098         RETURN(rc);
1099 }
1100
1101 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1102                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1103 {
1104         struct obd_device     *obd = class_exp2obd(exp);
1105         struct lmv_obd        *lmv = &obd->u.lmv;
1106         struct obd_statfs     *temp;
1107         int                    rc = 0;
1108         int                    i;
1109         ENTRY;
1110
1111         rc = lmv_check_connect(obd);
1112         if (rc)
1113                 RETURN(rc);
1114
1115         OBD_ALLOC(temp, sizeof(*temp));
1116         if (temp == NULL)
1117                 RETURN(-ENOMEM);
1118
1119         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1120                 if (lmv->tgts[i].ltd_exp == NULL)
1121                         continue;
1122
1123                 rc = obd_statfs(env, lmv->tgts[i].ltd_exp, temp,
1124                                 max_age, flags);
1125                 if (rc) {
1126                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1127                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
1128                                rc);
1129                         GOTO(out_free_temp, rc);
1130                 }
1131
1132                 if (i == 0) {
1133                         *osfs = *temp;
1134                         /* If the statfs is from mount, it will needs
1135                          * retrieve necessary information from MDT0.
1136                          * i.e. mount does not need the merged osfs
1137                          * from all of MDT.
1138                          * And also clients can be mounted as long as
1139                          * MDT0 is in service*/
1140                         if (flags & OBD_STATFS_FOR_MDT0)
1141                                 GOTO(out_free_temp, rc);
1142                 } else {
1143                         osfs->os_bavail += temp->os_bavail;
1144                         osfs->os_blocks += temp->os_blocks;
1145                         osfs->os_ffree += temp->os_ffree;
1146                         osfs->os_files += temp->os_files;
1147                 }
1148         }
1149
1150         EXIT;
1151 out_free_temp:
1152         OBD_FREE(temp, sizeof(*temp));
1153         return rc;
1154 }
1155
1156 static int lmv_getstatus(struct obd_export *exp,
1157                          struct lu_fid *fid,
1158                          struct obd_capa **pc)
1159 {
1160         struct obd_device    *obd = exp->exp_obd;
1161         struct lmv_obd       *lmv = &obd->u.lmv;
1162         int                   rc;
1163         ENTRY;
1164
1165         rc = lmv_check_connect(obd);
1166         if (rc)
1167                 RETURN(rc);
1168
1169         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
1170         RETURN(rc);
1171 }
1172
1173 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1174                         struct obd_capa *oc, obd_valid valid, const char *name,
1175                         const char *input, int input_size, int output_size,
1176                         int flags, struct ptlrpc_request **request)
1177 {
1178         struct obd_device      *obd = exp->exp_obd;
1179         struct lmv_obd         *lmv = &obd->u.lmv;
1180         struct lmv_tgt_desc    *tgt;
1181         int                     rc;
1182         ENTRY;
1183
1184         rc = lmv_check_connect(obd);
1185         if (rc)
1186                 RETURN(rc);
1187
1188         tgt = lmv_find_target(lmv, fid);
1189         if (IS_ERR(tgt))
1190                 RETURN(PTR_ERR(tgt));
1191
1192         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1193                          input_size, output_size, flags, request);
1194
1195         RETURN(rc);
1196 }
1197
1198 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1199                         struct obd_capa *oc, obd_valid valid, const char *name,
1200                         const char *input, int input_size, int output_size,
1201                         int flags, __u32 suppgid,
1202                         struct ptlrpc_request **request)
1203 {
1204         struct obd_device      *obd = exp->exp_obd;
1205         struct lmv_obd         *lmv = &obd->u.lmv;
1206         struct lmv_tgt_desc    *tgt;
1207         int                     rc;
1208         ENTRY;
1209
1210         rc = lmv_check_connect(obd);
1211         if (rc)
1212                 RETURN(rc);
1213
1214         tgt = lmv_find_target(lmv, fid);
1215         if (IS_ERR(tgt))
1216                 RETURN(PTR_ERR(tgt));
1217
1218         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1219                          input_size, output_size, flags, suppgid,
1220                          request);
1221
1222         RETURN(rc);
1223 }
1224
1225 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1226                        struct ptlrpc_request **request)
1227 {
1228         struct obd_device       *obd = exp->exp_obd;
1229         struct lmv_obd          *lmv = &obd->u.lmv;
1230         struct lmv_tgt_desc     *tgt;
1231         int                      rc;
1232         ENTRY;
1233
1234         rc = lmv_check_connect(obd);
1235         if (rc)
1236                 RETURN(rc);
1237
1238         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1239         if (IS_ERR(tgt))
1240                 RETURN(PTR_ERR(tgt));
1241
1242         if (op_data->op_flags & MF_GET_MDT_IDX) {
1243                 op_data->op_mds = tgt->ltd_idx;
1244                 RETURN(0);
1245         }
1246
1247         rc = md_getattr(tgt->ltd_exp, op_data, request);
1248
1249         RETURN(rc);
1250 }
1251
1252 static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1253                              ldlm_iterator_t it, void *data)
1254 {
1255         struct obd_device   *obd = exp->exp_obd;
1256         struct lmv_obd      *lmv = &obd->u.lmv;
1257         int                  i;
1258         int                  rc;
1259         ENTRY;
1260
1261         rc = lmv_check_connect(obd);
1262         if (rc)
1263                 RETURN(rc);
1264
1265         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1266
1267         /*
1268          * With CMD every object can have two locks in different namespaces:
1269          * lookup lock in space of mds storing direntry and update/open lock in
1270          * space of mds storing inode.
1271          */
1272         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1273                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1274
1275         RETURN(0);
1276 }
1277
1278 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1279                            ldlm_iterator_t it, void *data)
1280 {
1281         struct obd_device   *obd = exp->exp_obd;
1282         struct lmv_obd      *lmv = &obd->u.lmv;
1283         int                  i;
1284         int                  rc;
1285         ENTRY;
1286
1287         rc = lmv_check_connect(obd);
1288         if (rc)
1289                 RETURN(rc);
1290
1291         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1292
1293         /*
1294          * With CMD every object can have two locks in different namespaces:
1295          * lookup lock in space of mds storing direntry and update/open lock in
1296          * space of mds storing inode.
1297          */
1298         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1299                 rc = md_find_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1300                 if (rc)
1301                         RETURN(rc);
1302         }
1303
1304         RETURN(rc);
1305 }
1306
1307
1308 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1309                      struct md_open_data *mod, struct ptlrpc_request **request)
1310 {
1311         struct obd_device     *obd = exp->exp_obd;
1312         struct lmv_obd        *lmv = &obd->u.lmv;
1313         struct lmv_tgt_desc   *tgt;
1314         int                    rc;
1315         ENTRY;
1316
1317         rc = lmv_check_connect(obd);
1318         if (rc)
1319                 RETURN(rc);
1320
1321         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1322         if (IS_ERR(tgt))
1323                 RETURN(PTR_ERR(tgt));
1324
1325         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1326         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1327         RETURN(rc);
1328 }
1329
1330 struct lmv_tgt_desc
1331 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1332                 struct lu_fid *fid)
1333 {
1334         struct lmv_tgt_desc *tgt;
1335
1336         tgt = lmv_find_target(lmv, fid);
1337         op_data->op_mds = tgt->ltd_idx;
1338
1339         return tgt;
1340 }
1341
1342 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1343                const void *data, int datalen, int mode, __u32 uid,
1344                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1345                struct ptlrpc_request **request)
1346 {
1347         struct obd_device       *obd = exp->exp_obd;
1348         struct lmv_obd          *lmv = &obd->u.lmv;
1349         struct lmv_tgt_desc     *tgt;
1350         int                      rc;
1351         ENTRY;
1352
1353         rc = lmv_check_connect(obd);
1354         if (rc)
1355                 RETURN(rc);
1356
1357         if (!lmv->desc.ld_active_tgt_count)
1358                 RETURN(-EIO);
1359
1360         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1361         if (IS_ERR(tgt))
1362                 RETURN(PTR_ERR(tgt));
1363
1364         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1365         if (rc)
1366                 RETURN(rc);
1367
1368         CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1369                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1370                op_data->op_mds);
1371
1372         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1373         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1374                        cap_effective, rdev, request);
1375
1376         if (rc == 0) {
1377                 if (*request == NULL)
1378                         RETURN(rc);
1379                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1380         }
1381         RETURN(rc);
1382 }
1383
1384 static int lmv_done_writing(struct obd_export *exp,
1385                             struct md_op_data *op_data,
1386                             struct md_open_data *mod)
1387 {
1388         struct obd_device     *obd = exp->exp_obd;
1389         struct lmv_obd        *lmv = &obd->u.lmv;
1390         struct lmv_tgt_desc   *tgt;
1391         int                    rc;
1392         ENTRY;
1393
1394         rc = lmv_check_connect(obd);
1395         if (rc)
1396                 RETURN(rc);
1397
1398         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1399         if (IS_ERR(tgt))
1400                 RETURN(PTR_ERR(tgt));
1401
1402         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1403         RETURN(rc);
1404 }
1405
1406 static int
1407 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1408                    struct lookup_intent *it, struct md_op_data *op_data,
1409                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1410                    int extra_lock_flags)
1411 {
1412         struct ptlrpc_request      *req = it->d.lustre.it_data;
1413         struct obd_device          *obd = exp->exp_obd;
1414         struct lmv_obd             *lmv = &obd->u.lmv;
1415         struct lustre_handle        plock;
1416         struct lmv_tgt_desc        *tgt;
1417         struct md_op_data          *rdata;
1418         struct lu_fid               fid1;
1419         struct mdt_body            *body;
1420         int                         rc = 0;
1421         int                         pmode;
1422         ENTRY;
1423
1424         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1425         LASSERT(body != NULL);
1426
1427         if (!(body->valid & OBD_MD_MDS))
1428                 RETURN(0);
1429
1430         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1431                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1432
1433         /*
1434          * We got LOOKUP lock, but we really need attrs.
1435          */
1436         pmode = it->d.lustre.it_lock_mode;
1437         LASSERT(pmode != 0);
1438         memcpy(&plock, lockh, sizeof(plock));
1439         it->d.lustre.it_lock_mode = 0;
1440         it->d.lustre.it_data = NULL;
1441         fid1 = body->fid1;
1442
1443         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1444         ptlrpc_req_finished(req);
1445
1446         tgt = lmv_find_target(lmv, &fid1);
1447         if (IS_ERR(tgt))
1448                 GOTO(out, rc = PTR_ERR(tgt));
1449
1450         OBD_ALLOC_PTR(rdata);
1451         if (rdata == NULL)
1452                 GOTO(out, rc = -ENOMEM);
1453
1454         rdata->op_fid1 = fid1;
1455         rdata->op_bias = MDS_CROSS_REF;
1456
1457         rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1458                         lmm, lmmsize, NULL, extra_lock_flags);
1459         OBD_FREE_PTR(rdata);
1460         EXIT;
1461 out:
1462         ldlm_lock_decref(&plock, pmode);
1463         return rc;
1464 }
1465
1466 static int
1467 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1468             struct lookup_intent *it, struct md_op_data *op_data,
1469             struct lustre_handle *lockh, void *lmm, int lmmsize,
1470             struct ptlrpc_request **req, __u64 extra_lock_flags)
1471 {
1472         struct obd_device        *obd = exp->exp_obd;
1473         struct lmv_obd           *lmv = &obd->u.lmv;
1474         struct lmv_tgt_desc      *tgt;
1475         int                       rc;
1476         ENTRY;
1477
1478         rc = lmv_check_connect(obd);
1479         if (rc)
1480                 RETURN(rc);
1481
1482         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1483                LL_IT2STR(it), PFID(&op_data->op_fid1));
1484
1485         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1486         if (IS_ERR(tgt))
1487                 RETURN(PTR_ERR(tgt));
1488
1489         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1490                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1491
1492         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1493                         lmm, lmmsize, req, extra_lock_flags);
1494
1495         if (rc == 0 && it && it->it_op == IT_OPEN) {
1496                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1497                                         lmm, lmmsize, extra_lock_flags);
1498         }
1499         RETURN(rc);
1500 }
1501
1502 static int
1503 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1504                  struct ptlrpc_request **request)
1505 {
1506         struct ptlrpc_request   *req = NULL;
1507         struct obd_device       *obd = exp->exp_obd;
1508         struct lmv_obd          *lmv = &obd->u.lmv;
1509         struct lmv_tgt_desc     *tgt;
1510         struct mdt_body         *body;
1511         int                      rc;
1512         ENTRY;
1513
1514         rc = lmv_check_connect(obd);
1515         if (rc)
1516                 RETURN(rc);
1517
1518         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1519         if (IS_ERR(tgt))
1520                 RETURN(PTR_ERR(tgt));
1521
1522         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1523                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1524                tgt->ltd_idx);
1525
1526         rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1527         if (rc != 0)
1528                 RETURN(rc);
1529
1530         body = req_capsule_server_get(&(*request)->rq_pill,
1531                                       &RMF_MDT_BODY);
1532         LASSERT(body != NULL);
1533
1534         if (body->valid & OBD_MD_MDS) {
1535                 struct lu_fid rid = body->fid1;
1536                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1537                        PFID(&rid));
1538
1539                 tgt = lmv_find_target(lmv, &rid);
1540                 if (IS_ERR(tgt)) {
1541                         ptlrpc_req_finished(*request);
1542                         RETURN(PTR_ERR(tgt));
1543                 }
1544
1545                 op_data->op_fid1 = rid;
1546                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1547                 op_data->op_namelen = 0;
1548                 op_data->op_name = NULL;
1549                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1550                 ptlrpc_req_finished(*request);
1551                 *request = req;
1552         }
1553
1554         RETURN(rc);
1555 }
1556
1557 #define md_op_data_fid(op_data, fl)                     \
1558         (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
1559          fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
1560          fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
1561          fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
1562          NULL)
1563
1564 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1565                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
1566 {
1567         struct lu_fid          *fid = md_op_data_fid(op_data, flag);
1568         struct obd_device      *obd = exp->exp_obd;
1569         struct lmv_obd         *lmv = &obd->u.lmv;
1570         struct lmv_tgt_desc    *tgt;
1571         ldlm_policy_data_t      policy = {{0}};
1572         int                     rc = 0;
1573         ENTRY;
1574
1575         if (!fid_is_sane(fid))
1576                 RETURN(0);
1577
1578         tgt = lmv_find_target(lmv, fid);
1579         if (IS_ERR(tgt))
1580                 RETURN(PTR_ERR(tgt));
1581
1582         if (tgt->ltd_idx != op_tgt) {
1583                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1584                 policy.l_inodebits.bits = bits;
1585                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1586                                       mode, LCF_ASYNC, NULL);
1587         } else {
1588                 CDEBUG(D_INODE,
1589                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1590                        op_tgt, PFID(fid));
1591                 op_data->op_flags |= flag;
1592                 rc = 0;
1593         }
1594
1595         RETURN(rc);
1596 }
1597
1598 /*
1599  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1600  * op_data->op_fid2
1601  */
1602 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1603                     struct ptlrpc_request **request)
1604 {
1605         struct obd_device       *obd = exp->exp_obd;
1606         struct lmv_obd          *lmv = &obd->u.lmv;
1607         struct lmv_tgt_desc     *tgt;
1608         int                      rc;
1609         ENTRY;
1610
1611         rc = lmv_check_connect(obd);
1612         if (rc)
1613                 RETURN(rc);
1614
1615         LASSERT(op_data->op_namelen != 0);
1616
1617         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1618                PFID(&op_data->op_fid2), op_data->op_namelen,
1619                op_data->op_name, PFID(&op_data->op_fid1));
1620
1621         op_data->op_fsuid = cfs_curproc_fsuid();
1622         op_data->op_fsgid = cfs_curproc_fsgid();
1623         op_data->op_cap = cfs_curproc_cap_pack();
1624         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1625         if (IS_ERR(tgt))
1626                 RETURN(PTR_ERR(tgt));
1627
1628         /*
1629          * Cancel UPDATE lock on child (fid1).
1630          */
1631         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1632         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1633                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1634         if (rc != 0)
1635                 RETURN(rc);
1636
1637         rc = md_link(tgt->ltd_exp, op_data, request);
1638
1639         RETURN(rc);
1640 }
1641
1642 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1643                       const char *old, int oldlen, const char *new, int newlen,
1644                       struct ptlrpc_request **request)
1645 {
1646         struct obd_device       *obd = exp->exp_obd;
1647         struct lmv_obd          *lmv = &obd->u.lmv;
1648         struct lmv_tgt_desc     *src_tgt;
1649         struct lmv_tgt_desc     *tgt_tgt;
1650         int                     rc;
1651         ENTRY;
1652
1653         LASSERT(oldlen != 0);
1654
1655         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1656                oldlen, old, PFID(&op_data->op_fid1),
1657                newlen, new, PFID(&op_data->op_fid2));
1658
1659         rc = lmv_check_connect(obd);
1660         if (rc)
1661                 RETURN(rc);
1662
1663         op_data->op_fsuid = cfs_curproc_fsuid();
1664         op_data->op_fsgid = cfs_curproc_fsgid();
1665         op_data->op_cap = cfs_curproc_cap_pack();
1666         src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1667         if (IS_ERR(src_tgt))
1668                 RETURN(PTR_ERR(src_tgt));
1669
1670         tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1671         if (IS_ERR(tgt_tgt))
1672                 RETURN(PTR_ERR(tgt_tgt));
1673         /*
1674          * LOOKUP lock on src child (fid3) should also be cancelled for
1675          * src_tgt in mdc_rename.
1676          */
1677         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1678
1679         /*
1680          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1681          * own target.
1682          */
1683         rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1684                               LCK_EX, MDS_INODELOCK_UPDATE,
1685                               MF_MDC_CANCEL_FID2);
1686
1687         /*
1688          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1689          */
1690         if (rc == 0) {
1691                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1692                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1693                                       MF_MDC_CANCEL_FID4);
1694         }
1695
1696         /*
1697          * Cancel all the locks on tgt child (fid4).
1698          */
1699         if (rc == 0)
1700                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1701                                       LCK_EX, MDS_INODELOCK_FULL,
1702                                       MF_MDC_CANCEL_FID4);
1703
1704         if (rc == 0)
1705                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
1706                                new, newlen, request);
1707         RETURN(rc);
1708 }
1709
1710 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1711                        void *ea, int ealen, void *ea2, int ea2len,
1712                        struct ptlrpc_request **request,
1713                        struct md_open_data **mod)
1714 {
1715         struct obd_device       *obd = exp->exp_obd;
1716         struct lmv_obd          *lmv = &obd->u.lmv;
1717         struct lmv_tgt_desc     *tgt;
1718         int                      rc = 0;
1719         ENTRY;
1720
1721         rc = lmv_check_connect(obd);
1722         if (rc)
1723                 RETURN(rc);
1724
1725         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
1726                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
1727
1728         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1729         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1730         if (IS_ERR(tgt))
1731                 RETURN(PTR_ERR(tgt));
1732
1733         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
1734                         ea2len, request, mod);
1735
1736         RETURN(rc);
1737 }
1738
1739 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1740                     struct obd_capa *oc, struct ptlrpc_request **request)
1741 {
1742         struct obd_device         *obd = exp->exp_obd;
1743         struct lmv_obd            *lmv = &obd->u.lmv;
1744         struct lmv_tgt_desc       *tgt;
1745         int                        rc;
1746         ENTRY;
1747
1748         rc = lmv_check_connect(obd);
1749         if (rc)
1750                 RETURN(rc);
1751
1752         tgt = lmv_find_target(lmv, fid);
1753         if (IS_ERR(tgt))
1754                 RETURN(PTR_ERR(tgt));
1755
1756         rc = md_sync(tgt->ltd_exp, fid, oc, request);
1757         RETURN(rc);
1758 }
1759
1760 #if 0
1761 static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
1762 {
1763         __u64         val;
1764
1765         val = le64_to_cpu(*hash);
1766         if (val < hash_adj)
1767                 val += MAX_HASH_SIZE;
1768         if (val != MDS_DIR_END_OFF)
1769                 *hash = cpu_to_le64(val - hash_adj);
1770 }
1771
1772 static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
1773 {
1774         __u64              id;
1775         struct obd_import *imp;
1776
1777         /*
1778          * XXX: to get nid we assume that underlying obd device is mdc.
1779          */
1780         imp  = class_exp2cliimp(exp);
1781         id   = imp->imp_connection->c_self + fid_flatten(fid);
1782
1783         CDEBUG(D_INODE, "Readpage node rank: "LPX64" "DFID" "LPX64" "LPX64"\n",
1784                imp->imp_connection->c_self, PFID(fid), id, id ^ (id >> 32));
1785
1786         return id ^ (id >> 32);
1787 }
1788 #endif
1789
1790 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
1791                         struct page **pages, struct ptlrpc_request **request)
1792 {
1793         struct obd_device       *obd = exp->exp_obd;
1794         struct lmv_obd          *lmv = &obd->u.lmv;
1795         __u64                    offset = op_data->op_offset;
1796         int                      rc;
1797         int                      i;
1798         /* number of pages read, in CFS_PAGE_SIZE */
1799         int                      nrdpgs;
1800         /* number of pages transferred in LU_PAGE_SIZE */
1801         int                      nlupgs;
1802         struct lmv_tgt_desc     *tgt;
1803         struct lu_dirpage       *dp;
1804         struct lu_dirent        *ent;
1805         ENTRY;
1806
1807         rc = lmv_check_connect(obd);
1808         if (rc)
1809                 RETURN(rc);
1810
1811         CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
1812                offset, PFID(&op_data->op_fid1));
1813
1814         /*
1815          * This case handle directory lookup in clustered metadata case (i.e.
1816          * split directory is located on multiple md servers.)
1817          * each server keeps directory entries for certain range of hashes.
1818          * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
1819          * first server will keep records with hashes [ 0 ... MAX_HASH /N  - 1],
1820          * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
1821          * so on....
1822          *      readdir can simply start reading entries from 0 - N server in
1823          * order but that will not scale well as all client will request dir in
1824          * to server in same order.
1825          * Following algorithm does optimization:
1826          * Instead of doing readdir in 1, 2, ...., N order, client with a
1827          * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
1828          * (every client has rank R)
1829          *      But ll_readdir() expect offset range [0 to MAX_HASH/N) but
1830          * since client ask dir from MDS{R} client has pages with offsets
1831          * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
1832          * on hash  values that we get.
1833         if (0) {
1834                 LASSERT(nr > 0);
1835                 seg_size = MAX_HASH_SIZE;
1836                 do_div(seg_size, nr);
1837                 los      = obj->lo_stripes;
1838                 tgt      = lmv_get_target(lmv, los[0].ls_mds);
1839                 rank     = lmv_node_rank(tgt->ltd_exp, fid) % nr;
1840                 tgt_tmp  = offset;
1841                 do_div(tgt_tmp, seg_size);
1842                 tgt0_idx = do_div(tgt_tmp,  nr);
1843                 tgt_idx  = (tgt0_idx + rank) % nr;
1844
1845                 if (tgt_idx < tgt0_idx)
1846                          * Wrap around.
1847                          *
1848                          * Last segment has unusual length due to division
1849                          * rounding.
1850                         hash_adj = MAX_HASH_SIZE - seg_size * nr;
1851                 else
1852                         hash_adj = 0;
1853
1854                 hash_adj += rank * seg_size;
1855
1856                 CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
1857                        LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
1858                        offset, tgt0_idx, offset + hash_adj, tgt_idx);
1859
1860                 offset = (offset + hash_adj) & MAX_HASH_SIZE;
1861                 rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
1862                 tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
1863
1864                 CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
1865                        PFID(&rid), (unsigned long)offset, tgt_idx);
1866         }
1867         */
1868         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1869         if (IS_ERR(tgt))
1870                 RETURN(PTR_ERR(tgt));
1871
1872         rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
1873         if (rc != 0)
1874                 RETURN(rc);
1875
1876         nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
1877                  >> CFS_PAGE_SHIFT;
1878         nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
1879         LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
1880         LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
1881
1882         CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
1883                op_data->op_npages);
1884
1885         for (i = 0; i < nrdpgs; i++) {
1886 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
1887                 struct lu_dirpage *first;
1888                 __u64 hash_end = 0;
1889                 __u32 flags = 0;
1890 #endif
1891                 struct lu_dirent *tmp = NULL;
1892
1893                 dp = cfs_kmap(pages[i]);
1894                 ent = lu_dirent_start(dp);
1895 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
1896                 first = dp;
1897                 hash_end = dp->ldp_hash_end;
1898 repeat:
1899 #endif
1900                 nlupgs--;
1901
1902                 for (tmp = ent; ent != NULL;
1903                      tmp = ent, ent = lu_dirent_next(ent));
1904 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
1905                 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
1906                 if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
1907                         ent = lu_dirent_start(dp);
1908
1909                         if (tmp) {
1910                                 /* enlarge the end entry lde_reclen from 0 to
1911                                  * first entry of next lu_dirpage, in this way
1912                                  * several lu_dirpages can be stored into one
1913                                  * client page on client. */
1914                                 tmp = ((void *)tmp) +
1915                                       le16_to_cpu(tmp->lde_reclen);
1916                                 tmp->lde_reclen =
1917                                         cpu_to_le16((char *)(dp->ldp_entries) -
1918                                                     (char *)tmp);
1919                                 goto repeat;
1920                         }
1921                 }
1922                 first->ldp_hash_end = hash_end;
1923                 first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
1924                 first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
1925 #else
1926                 SET_BUT_UNUSED(tmp);
1927 #endif
1928                 cfs_kunmap(pages[i]);
1929         }
1930         RETURN(rc);
1931 }
1932
1933 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
1934                       struct ptlrpc_request **request)
1935 {
1936         struct obd_device       *obd = exp->exp_obd;
1937         struct lmv_obd          *lmv = &obd->u.lmv;
1938         struct lmv_tgt_desc     *tgt = NULL;
1939         int                      rc;
1940         ENTRY;
1941
1942         rc = lmv_check_connect(obd);
1943         if (rc)
1944                 RETURN(rc);
1945
1946         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1947         if (IS_ERR(tgt))
1948                 RETURN(PTR_ERR(tgt));
1949
1950         op_data->op_fsuid = cfs_curproc_fsuid();
1951         op_data->op_fsgid = cfs_curproc_fsgid();
1952         op_data->op_cap = cfs_curproc_cap_pack();
1953
1954         /*
1955          * If child's fid is given, cancel unused locks for it if it is from
1956          * another export than parent.
1957          *
1958          * LOOKUP lock for child (fid3) should also be cancelled on parent
1959          * tgt_tgt in mdc_unlink().
1960          */
1961         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1962
1963         /*
1964          * Cancel FULL locks on child (fid3).
1965          */
1966         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1967                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
1968
1969         if (rc != 0)
1970                 RETURN(rc);
1971
1972         rc = md_unlink(tgt->ltd_exp, op_data, request);
1973
1974         RETURN(rc);
1975 }
1976
1977 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1978 {
1979         struct lmv_obd *lmv = &obd->u.lmv;
1980         int rc = 0;
1981
1982         switch (stage) {
1983         case OBD_CLEANUP_EARLY:
1984                 /* XXX: here should be calling obd_precleanup() down to
1985                  * stack. */
1986                 break;
1987         case OBD_CLEANUP_EXPORTS:
1988                 fld_client_proc_fini(&lmv->lmv_fld);
1989                 lprocfs_obd_cleanup(obd);
1990                 break;
1991         default:
1992                 break;
1993         }
1994         RETURN(rc);
1995 }
1996
1997 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
1998                         __u32 keylen, void *key, __u32 *vallen, void *val,
1999                         struct lov_stripe_md *lsm)
2000 {
2001         struct obd_device       *obd;
2002         struct lmv_obd          *lmv;
2003         int                      rc = 0;
2004         ENTRY;
2005
2006         obd = class_exp2obd(exp);
2007         if (obd == NULL) {
2008                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2009                        exp->exp_handle.h_cookie);
2010                 RETURN(-EINVAL);
2011         }
2012
2013         lmv = &obd->u.lmv;
2014         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2015                 struct lmv_tgt_desc *tgts;
2016                 int i;
2017
2018                 rc = lmv_check_connect(obd);
2019                 if (rc)
2020                         RETURN(rc);
2021
2022                 LASSERT(*vallen == sizeof(__u32));
2023                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2024                      i++, tgts++) {
2025
2026                         /*
2027                          * All tgts should be connected when this gets called.
2028                          */
2029                         if (!tgts || !tgts->ltd_exp) {
2030                                 CERROR("target not setup?\n");
2031                                 continue;
2032                         }
2033
2034                         if (!obd_get_info(env, tgts->ltd_exp, keylen, key,
2035                                           vallen, val, NULL))
2036                                 RETURN(0);
2037                 }
2038                 RETURN(-EINVAL);
2039         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2040                 rc = lmv_check_connect(obd);
2041                 if (rc)
2042                         RETURN(rc);
2043
2044                 /*
2045                  * Forwarding this request to first MDS, it should know LOV
2046                  * desc.
2047                  */
2048                 rc = obd_get_info(env, lmv->tgts[0].ltd_exp, keylen, key,
2049                                   vallen, val, NULL);
2050                 if (!rc && KEY_IS(KEY_CONN_DATA)) {
2051                         exp->exp_connect_flags =
2052                         ((struct obd_connect_data *)val)->ocd_connect_flags;
2053                 }
2054                 RETURN(rc);
2055         } else if (KEY_IS(KEY_TGT_COUNT)) {
2056                 *((int *)val) = lmv->desc.ld_tgt_count;
2057                 RETURN(0);
2058         }
2059
2060         CDEBUG(D_IOCTL, "Invalid key\n");
2061         RETURN(-EINVAL);
2062 }
2063
2064 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2065                        obd_count keylen, void *key, obd_count vallen,
2066                        void *val, struct ptlrpc_request_set *set)
2067 {
2068         struct lmv_tgt_desc    *tgt;
2069         struct obd_device      *obd;
2070         struct lmv_obd         *lmv;
2071         int rc = 0;
2072         ENTRY;
2073
2074         obd = class_exp2obd(exp);
2075         if (obd == NULL) {
2076                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2077                        exp->exp_handle.h_cookie);
2078                 RETURN(-EINVAL);
2079         }
2080         lmv = &obd->u.lmv;
2081
2082         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2083                 int i, err = 0;
2084
2085                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2086                         tgt = &lmv->tgts[i];
2087
2088                         if (!tgt->ltd_exp)
2089                                 continue;
2090
2091                         err = obd_set_info_async(env, tgt->ltd_exp,
2092                                                  keylen, key, vallen, val, set);
2093                         if (err && rc == 0)
2094                                 rc = err;
2095                 }
2096
2097                 RETURN(rc);
2098         }
2099
2100         RETURN(-EINVAL);
2101 }
2102
2103 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2104                struct lov_stripe_md *lsm)
2105 {
2106         struct obd_device         *obd = class_exp2obd(exp);
2107         struct lmv_obd            *lmv = &obd->u.lmv;
2108         struct lmv_stripe_md      *meap;
2109         struct lmv_stripe_md      *lsmp;
2110         int                        mea_size;
2111         int                        i;
2112         ENTRY;
2113
2114         mea_size = lmv_get_easize(lmv);
2115         if (!lmmp)
2116                 RETURN(mea_size);
2117
2118         if (*lmmp && !lsm) {
2119                 OBD_FREE_LARGE(*lmmp, mea_size);
2120                 *lmmp = NULL;
2121                 RETURN(0);
2122         }
2123
2124         if (*lmmp == NULL) {
2125                 OBD_ALLOC_LARGE(*lmmp, mea_size);
2126                 if (*lmmp == NULL)
2127                         RETURN(-ENOMEM);
2128         }
2129
2130         if (!lsm)
2131                 RETURN(mea_size);
2132
2133         lsmp = (struct lmv_stripe_md *)lsm;
2134         meap = (struct lmv_stripe_md *)*lmmp;
2135
2136         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2137             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2138                 RETURN(-EINVAL);
2139
2140         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2141         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2142         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2143
2144         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2145                 meap->mea_ids[i] = meap->mea_ids[i];
2146                 fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
2147         }
2148
2149         RETURN(mea_size);
2150 }
2151
2152 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2153                  struct lov_mds_md *lmm, int lmm_size)
2154 {
2155         struct obd_device          *obd = class_exp2obd(exp);
2156         struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2157         struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2158         struct lmv_obd             *lmv = &obd->u.lmv;
2159         int                         mea_size;
2160         int                         i;
2161         __u32                       magic;
2162         ENTRY;
2163
2164         mea_size = lmv_get_easize(lmv);
2165         if (lsmp == NULL)
2166                 return mea_size;
2167
2168         if (*lsmp != NULL && lmm == NULL) {
2169                 OBD_FREE_LARGE(*tmea, mea_size);
2170                 *lsmp = NULL;
2171                 RETURN(0);
2172         }
2173
2174         LASSERT(mea_size == lmm_size);
2175
2176         OBD_ALLOC_LARGE(*tmea, mea_size);
2177         if (*tmea == NULL)
2178                 RETURN(-ENOMEM);
2179
2180         if (!lmm)
2181                 RETURN(mea_size);
2182
2183         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2184             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2185             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2186         {
2187                 magic = le32_to_cpu(mea->mea_magic);
2188         } else {
2189                 /*
2190                  * Old mea is not handled here.
2191                  */
2192                 CERROR("Old not supportable EA is found\n");
2193                 LBUG();
2194         }
2195
2196         (*tmea)->mea_magic = magic;
2197         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2198         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2199
2200         for (i = 0; i < (*tmea)->mea_count; i++) {
2201                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2202                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2203         }
2204         RETURN(mea_size);
2205 }
2206
2207 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2208                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
2209                              ldlm_cancel_flags_t flags, void *opaque)
2210 {
2211         struct obd_device       *obd = exp->exp_obd;
2212         struct lmv_obd          *lmv = &obd->u.lmv;
2213         int                      rc = 0;
2214         int                      err;
2215         int                      i;
2216         ENTRY;
2217
2218         LASSERT(fid != NULL);
2219
2220         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2221                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
2222                         continue;
2223
2224                 err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid,
2225                                        policy, mode, flags, opaque);
2226                 if (!rc)
2227                         rc = err;
2228         }
2229         RETURN(rc);
2230 }
2231
2232 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2233                       __u64 *bits)
2234 {
2235         struct obd_device       *obd = exp->exp_obd;
2236         struct lmv_obd          *lmv = &obd->u.lmv;
2237         int                      rc;
2238         ENTRY;
2239
2240         rc =  md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data, bits);
2241         RETURN(rc);
2242 }
2243
2244 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2245                            const struct lu_fid *fid, ldlm_type_t type,
2246                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2247                            struct lustre_handle *lockh)
2248 {
2249         struct obd_device       *obd = exp->exp_obd;
2250         struct lmv_obd          *lmv = &obd->u.lmv;
2251         ldlm_mode_t              rc;
2252         int                      i;
2253         ENTRY;
2254
2255         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2256
2257         /*
2258          * With CMD every object can have two locks in different namespaces:
2259          * lookup lock in space of mds storing direntry and update/open lock in
2260          * space of mds storing inode. Thus we check all targets, not only that
2261          * one fid was created in.
2262          */
2263         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2264                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2265                                    type, policy, mode, lockh);
2266                 if (rc)
2267                         RETURN(rc);
2268         }
2269
2270         RETURN(0);
2271 }
2272
2273 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2274                       struct obd_export *dt_exp, struct obd_export *md_exp,
2275                       struct lustre_md *md)
2276 {
2277         struct obd_device       *obd = exp->exp_obd;
2278         struct lmv_obd          *lmv = &obd->u.lmv;
2279         int                      rc;
2280         ENTRY;
2281         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md);
2282         RETURN(rc);
2283 }
2284
2285 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2286 {
2287         struct obd_device       *obd = exp->exp_obd;
2288         struct lmv_obd          *lmv = &obd->u.lmv;
2289         ENTRY;
2290
2291         if (md->mea)
2292                 obd_free_memmd(exp, (void *)&md->mea);
2293         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2294 }
2295
2296 int lmv_set_open_replay_data(struct obd_export *exp,
2297                              struct obd_client_handle *och,
2298                              struct ptlrpc_request *open_req)
2299 {
2300         struct obd_device       *obd = exp->exp_obd;
2301         struct lmv_obd          *lmv = &obd->u.lmv;
2302         struct lmv_tgt_desc     *tgt;
2303         ENTRY;
2304
2305         tgt = lmv_find_target(lmv, &och->och_fid);
2306         if (IS_ERR(tgt))
2307                 RETURN(PTR_ERR(tgt));
2308
2309         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, open_req));
2310 }
2311
2312 int lmv_clear_open_replay_data(struct obd_export *exp,
2313                                struct obd_client_handle *och)
2314 {
2315         struct obd_device       *obd = exp->exp_obd;
2316         struct lmv_obd          *lmv = &obd->u.lmv;
2317         struct lmv_tgt_desc     *tgt;
2318         ENTRY;
2319
2320         tgt = lmv_find_target(lmv, &och->och_fid);
2321         if (IS_ERR(tgt))
2322                 RETURN(PTR_ERR(tgt));
2323
2324         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
2325 }
2326
2327 static int lmv_get_remote_perm(struct obd_export *exp,
2328                                const struct lu_fid *fid,
2329                                struct obd_capa *oc, __u32 suppgid,
2330                                struct ptlrpc_request **request)
2331 {
2332         struct obd_device       *obd = exp->exp_obd;
2333         struct lmv_obd          *lmv = &obd->u.lmv;
2334         struct lmv_tgt_desc     *tgt;
2335         int                      rc;
2336         ENTRY;
2337
2338         rc = lmv_check_connect(obd);
2339         if (rc)
2340                 RETURN(rc);
2341
2342         tgt = lmv_find_target(lmv, fid);
2343         if (IS_ERR(tgt))
2344                 RETURN(PTR_ERR(tgt));
2345
2346         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
2347         RETURN(rc);
2348 }
2349
2350 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2351                           renew_capa_cb_t cb)
2352 {
2353         struct obd_device       *obd = exp->exp_obd;
2354         struct lmv_obd          *lmv = &obd->u.lmv;
2355         struct lmv_tgt_desc     *tgt;
2356         int                      rc;
2357         ENTRY;
2358
2359         rc = lmv_check_connect(obd);
2360         if (rc)
2361                 RETURN(rc);
2362
2363         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
2364         if (IS_ERR(tgt))
2365                 RETURN(PTR_ERR(tgt));
2366
2367         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
2368         RETURN(rc);
2369 }
2370
2371 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2372                     const struct req_msg_field *field, struct obd_capa **oc)
2373 {
2374         struct obd_device *obd = exp->exp_obd;
2375         struct lmv_obd *lmv = &obd->u.lmv;
2376         int rc;
2377
2378         ENTRY;
2379         rc = md_unpack_capa(lmv->tgts[0].ltd_exp, req, field, oc);
2380         RETURN(rc);
2381 }
2382
2383 int lmv_intent_getattr_async(struct obd_export *exp,
2384                              struct md_enqueue_info *minfo,
2385                              struct ldlm_enqueue_info *einfo)
2386 {
2387         struct md_op_data       *op_data = &minfo->mi_data;
2388         struct obd_device       *obd = exp->exp_obd;
2389         struct lmv_obd          *lmv = &obd->u.lmv;
2390         struct lmv_tgt_desc     *tgt = NULL;
2391         int                      rc;
2392         ENTRY;
2393
2394         rc = lmv_check_connect(obd);
2395         if (rc)
2396                 RETURN(rc);
2397
2398         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2399         if (IS_ERR(tgt))
2400                 RETURN(PTR_ERR(tgt));
2401
2402         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2403         RETURN(rc);
2404 }
2405
2406 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2407                         struct lu_fid *fid, __u64 *bits)
2408 {
2409         struct obd_device       *obd = exp->exp_obd;
2410         struct lmv_obd          *lmv = &obd->u.lmv;
2411         struct lmv_tgt_desc     *tgt;
2412         int                      rc;
2413         ENTRY;
2414
2415         rc = lmv_check_connect(obd);
2416         if (rc)
2417                 RETURN(rc);
2418
2419         tgt = lmv_find_target(lmv, fid);
2420         if (IS_ERR(tgt))
2421                 RETURN(PTR_ERR(tgt));
2422
2423         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2424         RETURN(rc);
2425 }
2426
2427 /**
2428  * For lmv, only need to send request to master MDT, and the master MDT will
2429  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2430  * we directly fetch data from the slave MDTs.
2431  */
2432 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2433                  struct obd_quotactl *oqctl)
2434 {
2435         struct obd_device   *obd = class_exp2obd(exp);
2436         struct lmv_obd      *lmv = &obd->u.lmv;
2437         struct lmv_tgt_desc *tgt = &lmv->tgts[0];
2438         int                  rc = 0, i;
2439         __u64                curspace, curinodes;
2440         ENTRY;
2441
2442         if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2443                 CERROR("master lmv inactive\n");
2444                 RETURN(-EIO);
2445         }
2446
2447         if (oqctl->qc_cmd != Q_GETOQUOTA) {
2448                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
2449                 RETURN(rc);
2450         }
2451
2452         curspace = curinodes = 0;
2453         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2454                 int err;
2455                 tgt = &lmv->tgts[i];
2456
2457                 if (tgt->ltd_exp == NULL)
2458                         continue;
2459                 if (!tgt->ltd_active) {
2460                         CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2461                         continue;
2462                 }
2463
2464                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2465                 if (err) {
2466                         CERROR("getquota on mdt %d failed. %d\n", i, err);
2467                         if (!rc)
2468                                 rc = err;
2469                 } else {
2470                         curspace += oqctl->qc_dqblk.dqb_curspace;
2471                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
2472                 }
2473         }
2474         oqctl->qc_dqblk.dqb_curspace = curspace;
2475         oqctl->qc_dqblk.dqb_curinodes = curinodes;
2476
2477         RETURN(rc);
2478 }
2479
2480 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2481                    struct obd_quotactl *oqctl)
2482 {
2483         struct obd_device   *obd = class_exp2obd(exp);
2484         struct lmv_obd      *lmv = &obd->u.lmv;
2485         struct lmv_tgt_desc *tgt;
2486         int                  i, rc = 0;
2487         ENTRY;
2488
2489         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
2490                 int err;
2491
2492                 if (!tgt->ltd_active) {
2493                         CERROR("lmv idx %d inactive\n", i);
2494                         RETURN(-EIO);
2495                 }
2496
2497                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
2498                 if (err && !rc)
2499                         rc = err;
2500         }
2501
2502         RETURN(rc);
2503 }
2504
2505 struct obd_ops lmv_obd_ops = {
2506         .o_owner                = THIS_MODULE,
2507         .o_setup                = lmv_setup,
2508         .o_cleanup              = lmv_cleanup,
2509         .o_precleanup           = lmv_precleanup,
2510         .o_process_config       = lmv_process_config,
2511         .o_connect              = lmv_connect,
2512         .o_disconnect           = lmv_disconnect,
2513         .o_statfs               = lmv_statfs,
2514         .o_get_info             = lmv_get_info,
2515         .o_set_info_async       = lmv_set_info_async,
2516         .o_packmd               = lmv_packmd,
2517         .o_unpackmd             = lmv_unpackmd,
2518         .o_notify               = lmv_notify,
2519         .o_get_uuid             = lmv_get_uuid,
2520         .o_iocontrol            = lmv_iocontrol,
2521         .o_quotacheck           = lmv_quotacheck,
2522         .o_quotactl             = lmv_quotactl
2523 };
2524
2525 struct md_ops lmv_md_ops = {
2526         .m_getstatus            = lmv_getstatus,
2527         .m_change_cbdata        = lmv_change_cbdata,
2528         .m_find_cbdata          = lmv_find_cbdata,
2529         .m_close                = lmv_close,
2530         .m_create               = lmv_create,
2531         .m_done_writing         = lmv_done_writing,
2532         .m_enqueue              = lmv_enqueue,
2533         .m_getattr              = lmv_getattr,
2534         .m_getxattr             = lmv_getxattr,
2535         .m_getattr_name         = lmv_getattr_name,
2536         .m_intent_lock          = lmv_intent_lock,
2537         .m_link                 = lmv_link,
2538         .m_rename               = lmv_rename,
2539         .m_setattr              = lmv_setattr,
2540         .m_setxattr             = lmv_setxattr,
2541         .m_sync                 = lmv_sync,
2542         .m_readpage             = lmv_readpage,
2543         .m_unlink               = lmv_unlink,
2544         .m_init_ea_size         = lmv_init_ea_size,
2545         .m_cancel_unused        = lmv_cancel_unused,
2546         .m_set_lock_data        = lmv_set_lock_data,
2547         .m_lock_match           = lmv_lock_match,
2548         .m_get_lustre_md        = lmv_get_lustre_md,
2549         .m_free_lustre_md       = lmv_free_lustre_md,
2550         .m_set_open_replay_data = lmv_set_open_replay_data,
2551         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2552         .m_renew_capa           = lmv_renew_capa,
2553         .m_unpack_capa          = lmv_unpack_capa,
2554         .m_get_remote_perm      = lmv_get_remote_perm,
2555         .m_intent_getattr_async = lmv_intent_getattr_async,
2556         .m_revalidate_lock      = lmv_revalidate_lock
2557 };
2558
2559 int __init lmv_init(void)
2560 {
2561         struct lprocfs_static_vars lvars;
2562         int                        rc;
2563
2564         lprocfs_lmv_init_vars(&lvars);
2565
2566         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2567                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2568         return rc;
2569 }
2570
2571 #ifdef __KERNEL__
2572 static void lmv_exit(void)
2573 {
2574         class_unregister_type(LUSTRE_LMV_NAME);
2575 }
2576
2577 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2578 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2579 MODULE_LICENSE("GPL");
2580
2581 module_init(lmv_init);
2582 module_exit(lmv_exit);
2583 #endif