Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <linux/mm.h>
33 #include <asm/div64.h>
34 #include <linux/seq_file.h>
35 #include <linux/namei.h>
36 #else
37 #include <liblustre.h>
38 #endif
39 #include <linux/ext2_fs.h>
40
41 #include <lustre/lustre_idl.h>
42 #include <lustre_log.h>
43 #include <obd_support.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48 #include <lustre_lite.h>
49 #include <lustre_fid.h>
50 #include "lmv_internal.h"
51
52 /* not defined for liblustre building */
53 #if !defined(ATOMIC_INIT)
54 #define ATOMIC_INIT(val) { (val) }
55 #endif
56
/*
 * Object cache: a slab cache handle plus an atomic counter that starts at
 * zero. Neither symbol is static, so both are visible to other translation
 * units; nothing in this part of the file allocates from the cache or
 * updates the counter (presumably the LMV object code does -- TODO confirm).
 */
kmem_cache_t *obj_cache;
atomic_t obj_cache_count = ATOMIC_INIT(0);
60
61 static void lmv_activate_target(struct lmv_obd *lmv,
62                                 struct lmv_tgt_desc *tgt,
63                                 int activate)
64 {
65         if (tgt->active == activate)
66                 return;
67
68         tgt->active = activate;
69         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
70 }
71
72 /* Error codes:
73  *
74  *  -EINVAL  : UUID can't be found in the LMV's target list
75  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
76  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
77  */
78 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
79                               int activate)
80 {
81         struct lmv_tgt_desc *tgt;
82         struct obd_device *obd;
83         int i, rc = 0;
84         ENTRY;
85
86         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
87                lmv, uuid->uuid, activate);
88
89         spin_lock(&lmv->lmv_lock);
90         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
91                 if (tgt->ltd_exp == NULL)
92                         continue;
93
94                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
95                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
96
97                 if (obd_uuid_equals(uuid, &tgt->uuid))
98                         break;
99         }
100
101         if (i == lmv->desc.ld_tgt_count)
102                 GOTO(out_lmv_lock, rc = -EINVAL);
103
104         obd = class_exp2obd(tgt->ltd_exp);
105         if (obd == NULL)
106                 GOTO(out_lmv_lock, rc = -ENOTCONN);
107
108         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
109                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
110                obd->obd_type->typ_name, i);
111         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
112
113         if (tgt->active == activate) {
114                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
115                        activate ? "" : "in");
116                 GOTO(out_lmv_lock, rc);
117         }
118
119         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
120                obd, activate ? "" : "in");
121
122         lmv_activate_target(lmv, tgt, activate);
123
124         EXIT;
125
126  out_lmv_lock:
127         spin_unlock(&lmv->lmv_lock);
128         return rc;
129 }
130
131 static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
132                             struct obd_connect_data *data)
133 {
134         struct lmv_tgt_desc *tgt;
135         int i;
136         ENTRY;
137
138         LASSERT(data != NULL);
139
140         spin_lock(&lmv->lmv_lock);
141         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
142                 if (tgt->ltd_exp == NULL)
143                         continue;
144
145                 if (obd_uuid_equals(uuid, &tgt->uuid)) {
146                         lmv->datas[tgt->idx] = *data;
147                         break;
148                 }
149         }
150         spin_unlock(&lmv->lmv_lock);
151         RETURN(0);
152 }
153
154 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
155                       enum obd_notify_event ev, void *data)
156 {
157         struct lmv_obd *lmv = &obd->u.lmv;
158         struct obd_uuid *uuid;
159         int rc = 0;
160         ENTRY;
161
162         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
163                 CERROR("unexpected notification of %s %s!\n",
164                        watched->obd_type->typ_name,
165                        watched->obd_name);
166                 RETURN(-EINVAL);
167         }
168
169         uuid = &watched->u.cli.cl_target_uuid;
170         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
171                 /*
172                  * Set MDC as active before notifying the observer, so the
173                  * observer can use the MDC normally.
174                  */
175                 rc = lmv_set_mdc_active(lmv, uuid,
176                                         ev == OBD_NOTIFY_ACTIVE);
177                 if (rc) {
178                         CERROR("%sactivation of %s failed: %d\n",
179                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
180                                uuid->uuid, rc);
181                         RETURN(rc);
182                 }
183         }
184
185         if (ev == OBD_NOTIFY_OCD) {
186                 struct obd_connect_data *conn_data =
187                         &watched->u.cli.cl_import->imp_connect_data;
188                 /*
189                  * Set connect data to desired target, update exp_connect_flags.
190                  */
191                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
192                 if (rc) {
193                         CERROR("can't set connect data to target %s, rc %d\n",
194                                uuid->uuid, rc);
195                         RETURN(rc);
196                 }
197
198                 /*
199                  * XXX: make sure that ocd_connect_flags from all targets are
200                  * the same. Otherwise one of MDTs runs wrong version or
201                  * something like this.  --umka
202                  */
203                 obd->obd_self_export->exp_connect_flags =
204                         conn_data->ocd_connect_flags;
205         }
206
207         /* Pass the notification up the chain. */
208         if (obd->obd_observer)
209                 rc = obd_notify(obd->obd_observer, watched, ev, data);
210
211         RETURN(rc);
212 }
213
214 /* this is fake connect function. Its purpose is to initialize lmv and say
215  * caller that everything is okay. Real connection will be performed later. */
216 static int lmv_connect(const struct lu_env *env,
217                        struct lustre_handle *conn, struct obd_device *obd,
218                        struct obd_uuid *cluuid, struct obd_connect_data *data)
219 {
220 #ifdef __KERNEL__
221         struct proc_dir_entry *lmv_proc_dir;
222 #endif
223         struct lmv_obd *lmv = &obd->u.lmv;
224         struct obd_export *exp;
225         int rc = 0;
226         ENTRY;
227
228         rc = class_connect(conn, obd, cluuid);
229         if (rc) {
230                 CERROR("class_connection() returned %d\n", rc);
231                 RETURN(rc);
232         }
233
234         exp = class_conn2export(conn);
235
236         /* we don't want to actually do the underlying connections more than
237          * once, so keep track. */
238         lmv->refcount++;
239         if (lmv->refcount > 1) {
240                 class_export_put(exp);
241                 RETURN(0);
242         }
243
244         lmv->exp = exp;
245         lmv->connected = 0;
246         lmv->cluuid = *cluuid;
247
248         if (data)
249                 lmv->conn_data = *data;
250
251 #ifdef __KERNEL__
252         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
253                                         NULL, NULL);
254         if (IS_ERR(lmv_proc_dir)) {
255                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
256                        obd->obd_type->typ_name, obd->obd_name);
257                 lmv_proc_dir = NULL;
258         }
259 #endif
260
261         /* all real clients should perform actual connection right away, because
262          * it is possible, that LMV will not have opportunity to connect targets
263          * and MDC stuff will be called directly, for instance while reading
264          * ../mdc/../kbytesfree procfs file, etc. */
265         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
266                 rc = lmv_check_connect(obd);
267
268 #ifdef __KERNEL__
269         if (rc) {
270                 if (lmv_proc_dir)
271                         lprocfs_remove(lmv_proc_dir);
272         }
273 #endif
274
275         RETURN(rc);
276 }
277
278 static void lmv_set_timeouts(struct obd_device *obd)
279 {
280         struct lmv_tgt_desc *tgts;
281         struct lmv_obd *lmv;
282         int i;
283
284         lmv = &obd->u.lmv;
285         if (lmv->server_timeout == 0)
286                 return;
287
288         if (lmv->connected == 0)
289                 return;
290
291         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
292                 if (tgts->ltd_exp == NULL)
293                         continue;
294
295                 obd_set_info_async(tgts->ltd_exp, strlen("inter_mds"),
296                                    "inter_mds", 0, NULL, NULL);
297         }
298 }
299
300 static int lmv_init_ea_size(struct obd_export *exp, int easize,
301                             int def_easize, int cookiesize)
302 {
303         struct obd_device *obd = exp->exp_obd;
304         struct lmv_obd *lmv = &obd->u.lmv;
305         int i, rc = 0, change = 0;
306         ENTRY;
307
308         if (lmv->max_easize < easize) {
309                 lmv->max_easize = easize;
310                 change = 1;
311         }
312         if (lmv->max_def_easize < def_easize) {
313                 lmv->max_def_easize = def_easize;
314                 change = 1;
315         }
316         if (lmv->max_cookiesize < cookiesize) {
317                 lmv->max_cookiesize = cookiesize;
318                 change = 1;
319         }
320         if (change == 0)
321                 RETURN(0);
322
323         if (lmv->connected == 0)
324                 RETURN(0);
325
326         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
327                 if (lmv->tgts[i].ltd_exp == NULL) {
328                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
329                         continue;
330                 }
331
332                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
333                                      cookiesize);
334                 if (rc) {
335                         CERROR("obd_init_ea_size() failed on MDT target %d, "
336                                "error %d.\n", i, rc);
337                         break;
338                 }
339         }
340         RETURN(rc);
341 }
342
343 #define MAX_STRING_SIZE 128
344
345 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
346 {
347         struct lmv_obd *lmv = &obd->u.lmv;
348         struct obd_uuid *cluuid = &lmv->cluuid;
349         struct obd_connect_data *mdc_data = NULL;
350         struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
351         struct lustre_handle conn = {0, };
352         struct obd_device *mdc_obd;
353         struct obd_export *mdc_exp;
354         struct lu_fld_target target;
355         int rc;
356 #ifdef __KERNEL__
357         struct proc_dir_entry *lmv_proc_dir;
358 #endif
359         ENTRY;
360
361         /* for MDS: don't connect to yourself */
362         if (obd_uuid_equals(&tgt->uuid, cluuid)) {
363                 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
364                 /* XXX - the old code didn't increment active tgt count.
365                  *       should we ? */
366                 RETURN(0);
367         }
368
369         mdc_obd = class_find_client_obd(&tgt->uuid, LUSTRE_MDC_NAME,
370                                         &obd->obd_uuid);
371         if (!mdc_obd) {
372                 CERROR("target %s not attached\n", tgt->uuid.uuid);
373                 RETURN(-EINVAL);
374         }
375
376         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
377                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
378                 tgt->uuid.uuid, obd->obd_uuid.uuid,
379                 cluuid->uuid);
380
381         if (!mdc_obd->obd_set_up) {
382                 CERROR("target %s is not set up\n", tgt->uuid.uuid);
383                 RETURN(-EINVAL);
384         }
385
386         rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
387                          &lmv->conn_data);
388         if (rc) {
389                 CERROR("target %s connect error %d\n", tgt->uuid.uuid, rc);
390                 RETURN(rc);
391         }
392
393         mdc_exp = class_conn2export(&conn);
394
395         target.ft_srv = NULL;
396         target.ft_exp = mdc_exp;
397         target.ft_idx = tgt->idx;
398
399         fld_client_add_target(&lmv->lmv_fld, &target);
400
401         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
402
403         rc = obd_register_observer(mdc_obd, obd);
404         if (rc) {
405                 obd_disconnect(mdc_exp);
406                 CERROR("target %s register_observer error %d\n",
407                        tgt->uuid.uuid, rc);
408                 RETURN(rc);
409         }
410
411         if (obd->obd_observer) {
412                 /* tell the mds_lmv about the new target */
413                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
414                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
415                 if (rc) {
416                         obd_disconnect(mdc_exp);
417                         RETURN(rc);
418                 }
419         }
420
421         tgt->active = 1;
422         tgt->ltd_exp = mdc_exp;
423         lmv->desc.ld_active_tgt_count++;
424
425         /* copy connect data, it may be used later */
426         lmv->datas[tgt->idx] = *mdc_data;
427
428         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
429                         lmv->max_def_easize, lmv->max_cookiesize);
430
431         CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
432                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
433                 atomic_read(&obd->obd_refcount));
434
435 #ifdef __KERNEL__
436         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
437         if (lmv_proc_dir) {
438                 struct proc_dir_entry *mdc_symlink;
439                 char name[MAX_STRING_SIZE + 1];
440
441                 LASSERT(mdc_obd->obd_type != NULL);
442                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
443                 name[MAX_STRING_SIZE] = '\0';
444                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
445                          mdc_obd->obd_type->typ_name,
446                          mdc_obd->obd_name);
447                 mdc_symlink = proc_symlink(mdc_obd->obd_name,
448                                            lmv_proc_dir, name);
449                 if (mdc_symlink == NULL) {
450                         CERROR("could not register LMV target "
451                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
452                                obd->obd_type->typ_name, obd->obd_name,
453                                mdc_obd->obd_name);
454                         lprocfs_remove(lmv_proc_dir);
455                         lmv_proc_dir = NULL;
456                 }
457         }
458 #endif
459         RETURN(0);
460 }
461
462 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
463 {
464         struct lmv_obd *lmv = &obd->u.lmv;
465         struct lmv_tgt_desc *tgt;
466         int rc = 0;
467         ENTRY;
468
469         CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
470
471         lmv_init_lock(lmv);
472
473         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
474                 lmv_init_unlock(lmv);
475                 CERROR("can't add %s, LMV module compiled for %d MDCs. "
476                        "That many MDCs already configured.\n",
477                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
478                 RETURN(-EINVAL);
479         }
480         if (lmv->desc.ld_tgt_count == 0) {
481                 struct obd_device *mdc_obd;
482
483                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
484                                                 &obd->obd_uuid);
485                 if (!mdc_obd) {
486                         lmv_init_unlock(lmv);
487                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
488                         RETURN(-EINVAL);
489                 }
490
491                 rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
492                 if (rc) {
493                         lmv_init_unlock(lmv);
494                         CERROR("lmv failed to setup llogging subsystems\n");
495                 }
496         }
497         spin_lock(&lmv->lmv_lock);
498         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
499         tgt->uuid = *tgt_uuid;
500         spin_unlock(&lmv->lmv_lock);
501
502         if (lmv->connected) {
503                 rc = lmv_connect_mdc(obd, tgt);
504                 if (rc) {
505                         spin_lock(&lmv->lmv_lock);
506                         lmv->desc.ld_tgt_count--;
507                         memset(tgt, 0, sizeof(*tgt));
508                         spin_unlock(&lmv->lmv_lock);
509                 } else {
510                         int easize = sizeof(struct lmv_stripe_md) +
511                                      lmv->desc.ld_tgt_count *
512                                      sizeof(struct lu_fid);
513                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
514                 }
515         }
516
517         lmv_init_unlock(lmv);
518         RETURN(rc);
519 }
520
521 /* performs a check if passed obd is connected. If no - connect it. */
522 int lmv_check_connect(struct obd_device *obd)
523 {
524         struct lmv_obd *lmv = &obd->u.lmv;
525         struct lmv_tgt_desc *tgt;
526         int i, rc, easize;
527         ENTRY;
528
529         if (lmv->connected)
530                 RETURN(0);
531
532         lmv_init_lock(lmv);
533         if (lmv->connected) {
534                 lmv_init_unlock(lmv);
535                 RETURN(0);
536         }
537
538         if (lmv->desc.ld_tgt_count == 0) {
539                 CERROR("%s: no targets configured.\n", obd->obd_name);
540                 RETURN(-EINVAL);
541         }
542
543         CDEBUG(D_CONFIG, "time to connect %s to %s\n",
544                lmv->cluuid.uuid, obd->obd_name);
545
546         LASSERT(lmv->tgts != NULL);
547
548         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
549                 rc = lmv_connect_mdc(obd, tgt);
550                 if (rc)
551                         GOTO(out_disc, rc);
552         }
553
554         lmv_set_timeouts(obd);
555         class_export_put(lmv->exp);
556         lmv->connected = 1;
557         easize = lmv_get_easize(lmv);
558         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
559         lmv_init_unlock(lmv);
560         RETURN(0);
561
562  out_disc:
563         while (i-- > 0) {
564                 int rc2;
565                 --tgt;
566                 tgt->active = 0;
567                 if (tgt->ltd_exp) {
568                         --lmv->desc.ld_active_tgt_count;
569                         rc2 = obd_disconnect(tgt->ltd_exp);
570                         if (rc2) {
571                                 CERROR("error: LMV target %s disconnect on "
572                                        "MDC idx %d: error %d\n",
573                                        tgt->uuid.uuid, i, rc2);
574                         }
575                 }
576         }
577         class_disconnect(lmv->exp);
578         lmv_init_unlock(lmv);
579         RETURN(rc);
580 }
581
582 static int lmv_disconnect(struct obd_export *exp)
583 {
584         struct obd_device *obd = class_exp2obd(exp);
585         struct lmv_obd *lmv = &obd->u.lmv;
586
587 #ifdef __KERNEL__
588         struct proc_dir_entry *lmv_proc_dir;
589 #endif
590         int rc, i;
591         ENTRY;
592
593         if (!lmv->tgts)
594                 goto out_local;
595
596         /* Only disconnect the underlying layers on the final disconnect. */
597         lmv->refcount--;
598         if (lmv->refcount != 0)
599                 goto out_local;
600
601 #ifdef __KERNEL__
602         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
603 #endif
604
605         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
606                 struct obd_device *mdc_obd;
607
608                 if (lmv->tgts[i].ltd_exp == NULL)
609                         continue;
610
611                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
612
613                 if (mdc_obd)
614                         mdc_obd->obd_no_recov = obd->obd_no_recov;
615
616 #ifdef __KERNEL__
617                 if (lmv_proc_dir) {
618                         struct proc_dir_entry *mdc_symlink;
619
620                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
621                         if (mdc_symlink) {
622                                 lprocfs_remove(mdc_symlink);
623                         } else {
624                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
625                                        obd->obd_type->typ_name, obd->obd_name,
626                                        mdc_obd->obd_name);
627                         }
628                 }
629 #endif
630                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
631                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
632                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
633
634                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
635                 rc = obd_disconnect(lmv->tgts[i].ltd_exp);
636                 if (rc) {
637                         if (lmv->tgts[i].active) {
638                                 CERROR("Target %s disconnect error %d\n",
639                                        lmv->tgts[i].uuid.uuid, rc);
640                         }
641                         rc = 0;
642                 }
643
644                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
645                 lmv->tgts[i].ltd_exp = NULL;
646         }
647
648 #ifdef __KERNEL__
649         if (lmv_proc_dir) {
650                 lprocfs_remove(lmv_proc_dir);
651         } else {
652                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
653                        obd->obd_type->typ_name, obd->obd_name);
654         }
655 #endif
656
657 out_local:
658         /* this is the case when no real connection is established by
659          * lmv_check_connect(). */
660         if (!lmv->connected)
661                 class_export_put(exp);
662         rc = class_disconnect(exp);
663         if (lmv->refcount == 0)
664                 lmv->connected = 0;
665         RETURN(rc);
666 }
667
668 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
669                          int len, void *karg, void *uarg)
670 {
671         struct obd_device *obddev = class_exp2obd(exp);
672         struct lmv_obd *lmv = &obddev->u.lmv;
673         int i, rc = 0, set = 0;
674         ENTRY;
675
676         if (lmv->desc.ld_tgt_count == 0)
677                 RETURN(-ENOTTY);
678
679         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
680                 int err;
681
682                 if (lmv->tgts[i].ltd_exp == NULL)
683                         continue;
684
685                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
686                 if (err) {
687                         if (lmv->tgts[i].active) {
688                                 CERROR("error: iocontrol MDC %s on MDT"
689                                        "idx %d: err = %d\n",
690                                        lmv->tgts[i].uuid.uuid, i, err);
691                                 if (!rc)
692                                         rc = err;
693                         }
694                 } else
695                         set = 1;
696         }
697         if (!set && !rc)
698                 rc = -EIO;
699
700         RETURN(rc);
701 }
702
703 /* assume all is balanced for now */
704 static int lmv_fids_balanced(struct obd_device *obd)
705 {
706         ENTRY;
707         RETURN(1);
708 }
709
710 #if 1
711 static int lmv_all_chars_policy(int count, struct qstr *name)
712 {
713         unsigned int c = 0;
714         unsigned int len = name->len;
715
716         while (len > 0)
717                 c += name->name[-- len];
718         c = c % count;
719         return c;
720 }
721 #endif
722
723 static int lmv_placement_policy(struct obd_device *obd,
724                                 struct lu_placement_hint *hint,
725                                 mdsno_t *mds)
726 {
727         struct lmv_obd *lmv = &obd->u.lmv;
728         int rc;
729         ENTRY;
730
731         LASSERT(mds != NULL);
732
733         /* here are some policies to allocate new fid */
734         if (lmv_fids_balanced(obd)) {
735                 /* allocate new fid basing on its name in the case fids are
736                  * balanced, that is all sequences have more or less equal
737                  * number of objects created. */
738                 if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
739 #if 1
740                         *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
741                                                     hint->ph_cname);
742                         rc = 0;
743 #else
744                         /* stress policy for tests - to use non-parent MDS */
745                         LASSERT(fid_is_sane(hint->ph_pfid));
746                         rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
747                         if (rc)
748                                 RETURN(rc);
749                         *mds = (int)(*mds + 1) % lmv->desc.ld_tgt_count;
750
751 #endif
752                 } else {
753                         struct lmv_obj *obj;
754                         LASSERT(fid_is_sane(hint->ph_pfid));
755
756                         obj = lmv_obj_grab(obd, hint->ph_pfid);
757                         if (obj) {
758                                 /* If the dir got split, alloc fid according
759                                  * to its hash
760                                  */
761                                 struct lu_fid *rpid;
762
763                                 *mds = raw_name2idx(obj->lo_hashtype,
764                                                     obj->lo_objcount,
765                                                     hint->ph_cname->name,
766                                                     hint->ph_cname->len);
767                                 rpid = &obj->lo_inodes[*mds].li_fid;
768                                 rc = lmv_fld_lookup(lmv, rpid, mds);
769                                 if (rc) {
770                                         lmv_obj_put(obj);
771                                         GOTO(exit, rc);
772                                 }
773                                 CDEBUG(D_INODE, "The obj "DFID" has been"
774                                        "split, got MDS at "LPU64" by name %s\n",
775                                        PFID(hint->ph_pfid), *mds,
776                                        hint->ph_cname->name);
777
778                                 rc = 0;
779                         } else {
780                                 /* default policy is to use parent MDS */
781                                 rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
782                         }
783
784                 }
785         } else {
786                 /* sequences among all tgts are not well balanced, allocate new
787                  * fid taking this into account to balance them. Not implemented
788                  * yet! */
789                 *mds = 0;
790                 rc = -EINVAL;
791         }
792 exit:
793         if (rc) {
794                 CERROR("cannot choose MDS, err = %d\n", rc);
795         } else {
796                 LASSERT(*mds < lmv->desc.ld_tgt_count);
797         }
798
799         RETURN(rc);
800 }
801
802 static int lmv_fid_init(struct obd_export *exp)
803 {
804         struct obd_device *obd = class_exp2obd(exp);
805         struct lmv_obd *lmv = &obd->u.lmv;
806         int i, rc = 0;
807         ENTRY;
808
809         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
810                 if (lmv->tgts[i].ltd_exp == NULL)
811                         continue;
812
813                 rc = obd_fid_init(lmv->tgts[i].ltd_exp);
814                 if (rc)
815                         RETURN(rc);
816         }
817         RETURN(rc);
818 }
819
820 static int lmv_fid_fini(struct obd_export *exp)
821 {
822         struct obd_device *obd = class_exp2obd(exp);
823         struct lmv_obd *lmv = &obd->u.lmv;
824         int i, rc = 0;
825         ENTRY;
826
827         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
828                 if (lmv->tgts[i].ltd_exp == NULL)
829                         continue;
830
831                 rc = obd_fid_fini(lmv->tgts[i].ltd_exp);
832                 if (rc)
833                         break;
834         }
835         RETURN(rc);
836 }
837
838 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
839                          struct lu_placement_hint *hint)
840 {
841         struct obd_device *obd = class_exp2obd(exp);
842         struct lmv_obd *lmv = &obd->u.lmv;
843         mdsno_t mds;
844         int rc;
845         ENTRY;
846
847         LASSERT(fid != NULL);
848         LASSERT(hint != NULL);
849
850         rc = lmv_placement_policy(obd, hint, &mds);
851         if (rc) {
852                 CERROR("can't get target for allocating fid, "
853                        "rc %d\n", rc);
854                 RETURN(rc);
855         }
856
857         /* asking underlaying tgt layer to allocate new fid */
858         rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
859
860         /* client switches to new sequence, setup fld */
861         if (rc > 0) {
862                 LASSERT(fid_is_sane(fid));
863
864                 rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
865                                        mds, NULL);
866                 if (rc) {
867                         CERROR("can't create fld entry, rc %d\n", rc);
868                         RETURN(rc);
869                 }
870         }
871
872         RETURN(rc);
873 }
874
875 static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
876 {
877         ENTRY;
878
879         LASSERT(exp && fid);
880         if (lmv_obj_delete(exp, fid)) {
881                 CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
882                        PFID(fid));
883         }
884         RETURN(0);
885 }
886
887 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
888 {
889         struct lmv_obd *lmv = &obd->u.lmv;
890         struct lprocfs_static_vars lvars;
891         struct lmv_desc *desc;
892         int rc, i = 0;
893         ENTRY;
894
895         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
896                 CERROR("LMV setup requires a descriptor\n");
897                 RETURN(-EINVAL);
898         }
899
900         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
901         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
902                 CERROR("descriptor size wrong: %d > %d\n",
903                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
904                 RETURN(-EINVAL);
905         }
906
907         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
908
909         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
910         if (lmv->tgts == NULL)
911                 RETURN(-ENOMEM);
912
913         for (i = 0; i < LMV_MAX_TGT_COUNT; i++)
914                 lmv->tgts[i].idx = i;
915
916         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
917
918         OBD_ALLOC(lmv->datas, lmv->datas_size);
919         if (lmv->datas == NULL)
920                 GOTO(out_free_tgts, rc = -ENOMEM);
921
922         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
923         lmv->desc.ld_tgt_count = 0;
924         lmv->desc.ld_active_tgt_count = 0;
925         lmv->max_cookiesize = 0;
926         lmv->max_def_easize = 0;
927         lmv->max_easize = 0;
928
929         spin_lock_init(&lmv->lmv_lock);
930         sema_init(&lmv->init_sem, 1);
931
932         rc = lmv_obj_setup(obd);
933         if (rc) {
934                 CERROR("Can't setup LMV object manager, "
935                        "error %d.\n", rc);
936                 GOTO(out_free_datas, rc);
937         }
938
939         lprocfs_init_vars(lmv, &lvars);
940         lprocfs_obd_setup(obd, lvars.obd_vars);
941 #ifdef LPROCFS
942         {
943                 struct proc_dir_entry *entry;
944
945                 entry = create_proc_entry("target_obd_status", 0444,
946                                           obd->obd_proc_entry);
947                 if (entry != NULL) {
948                         entry->proc_fops = &lmv_proc_target_fops;
949                         entry->data = obd;
950                 }
951        }
952 #endif
953         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
954                              LUSTRE_CLI_FLD_HASH_DHT);
955         if (rc) {
956                 CERROR("can't init FLD, err %d\n",
957                        rc);
958                 GOTO(out_free_datas, rc);
959         }
960
961         RETURN(0);
962
963 out_free_datas:
964         OBD_FREE(lmv->datas, lmv->datas_size);
965         lmv->datas = NULL;
966 out_free_tgts:
967         OBD_FREE(lmv->tgts, lmv->tgts_size);
968         lmv->tgts = NULL;
969         return rc;
970 }
971
972 static int lmv_cleanup(struct obd_device *obd)
973 {
974         struct lmv_obd *lmv = &obd->u.lmv;
975         ENTRY;
976
977         fld_client_fini(&lmv->lmv_fld);
978         lprocfs_obd_cleanup(obd);
979         lmv_obj_cleanup(obd);
980         OBD_FREE(lmv->datas, lmv->datas_size);
981         OBD_FREE(lmv->tgts, lmv->tgts_size);
982
983         RETURN(0);
984 }
985
986 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
987 {
988         struct lustre_cfg *lcfg = buf;
989         struct obd_uuid tgt_uuid;
990         int rc;
991         ENTRY;
992
993         switch(lcfg->lcfg_command) {
994         case LCFG_ADD_MDC:
995                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
996                         GOTO(out, rc = -EINVAL);
997
998                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
999                 rc = lmv_add_target(obd, &tgt_uuid);
1000                 GOTO(out, rc);
1001         default: {
1002                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1003                 GOTO(out, rc = -EINVAL);
1004         }
1005         }
1006 out:
1007         RETURN(rc);
1008 }
1009
1010 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1011                       __u64 max_age)
1012 {
1013         struct lmv_obd *lmv = &obd->u.lmv;
1014         struct obd_statfs *temp;
1015         int rc = 0, i;
1016         ENTRY;
1017
1018         rc = lmv_check_connect(obd);
1019         if (rc)
1020                 RETURN(rc);
1021
1022         OBD_ALLOC(temp, sizeof(*temp));
1023         if (temp == NULL)
1024                 RETURN(-ENOMEM);
1025
1026         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1027                 if (lmv->tgts[i].ltd_exp == NULL)
1028                         continue;
1029
1030                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
1031                 if (rc) {
1032                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1033                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
1034                                rc);
1035                         GOTO(out_free_temp, rc);
1036                 }
1037                 if (i == 0) {
1038                         *osfs = *temp;
1039                 } else {
1040                         osfs->os_bavail += temp->os_bavail;
1041                         osfs->os_blocks += temp->os_blocks;
1042                         osfs->os_ffree += temp->os_ffree;
1043                         osfs->os_files += temp->os_files;
1044                 }
1045         }
1046
1047         EXIT;
1048 out_free_temp:
1049         OBD_FREE(temp, sizeof(*temp));
1050         return rc;
1051 }
1052
1053 static int lmv_getstatus(struct obd_export *exp,
1054                          struct lu_fid *fid,
1055                          struct obd_capa **pc)
1056 {
1057         struct obd_device *obd = exp->exp_obd;
1058         struct lmv_obd *lmv = &obd->u.lmv;
1059         int rc;
1060         ENTRY;
1061
1062         rc = lmv_check_connect(obd);
1063         if (rc)
1064                 RETURN(rc);
1065
1066         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
1067
1068         RETURN(rc);
1069 }
1070
1071 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1072                         struct obd_capa *oc, obd_valid valid, const char *name,
1073                         const char *input, int input_size, int output_size,
1074                         int flags, struct ptlrpc_request **request)
1075 {
1076         struct obd_device *obd = exp->exp_obd;
1077         struct lmv_obd *lmv = &obd->u.lmv;
1078         struct obd_export *tgt_exp;
1079         int rc;
1080         ENTRY;
1081
1082         rc = lmv_check_connect(obd);
1083         if (rc)
1084                 RETURN(rc);
1085
1086         tgt_exp = lmv_get_export(lmv, fid);
1087         if (IS_ERR(tgt_exp))
1088                 RETURN(PTR_ERR(tgt_exp));
1089
1090         rc = md_getxattr(tgt_exp, fid, oc, valid, name, input, input_size,
1091                          output_size, flags, request);
1092
1093         RETURN(rc);
1094 }
1095
1096 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1097                         struct obd_capa *oc, obd_valid valid, const char *name,
1098                         const char *input, int input_size, int output_size,
1099                         int flags, struct ptlrpc_request **request)
1100 {
1101         struct obd_device *obd = exp->exp_obd;
1102         struct lmv_obd *lmv = &obd->u.lmv;
1103         struct obd_export *tgt_exp;
1104         int rc;
1105         ENTRY;
1106
1107         rc = lmv_check_connect(obd);
1108         if (rc)
1109                 RETURN(rc);
1110
1111         tgt_exp = lmv_get_export(lmv, fid);
1112         if (IS_ERR(tgt_exp))
1113                 RETURN(PTR_ERR(tgt_exp));
1114
1115         rc = md_setxattr(tgt_exp, fid, oc, valid, name,
1116                          input, input_size, output_size, flags, request);
1117
1118         RETURN(rc);
1119 }
1120
1121 static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
1122                        struct obd_capa *oc, obd_valid valid, int ea_size,
1123                        struct ptlrpc_request **request)
1124 {
1125         struct obd_device *obd = exp->exp_obd;
1126         struct lmv_obd *lmv = &obd->u.lmv;
1127         struct obd_export *tgt_exp;
1128         struct lmv_obj *obj;
1129         int rc, i;
1130         ENTRY;
1131
1132         rc = lmv_check_connect(obd);
1133         if (rc)
1134                 RETURN(rc);
1135
1136         tgt_exp = lmv_get_export(lmv, fid);
1137         if (IS_ERR(tgt_exp))
1138                 RETURN(PTR_ERR(tgt_exp));
1139
1140         rc = md_getattr(tgt_exp, fid, oc, valid, ea_size, request);
1141         if (rc)
1142                 RETURN(rc);
1143
1144         obj = lmv_obj_grab(obd, fid);
1145
1146         CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
1147                PFID(fid), obj ? "(split)" : "");
1148
1149         /* if object is split, then we loop over all the slaves and gather size
1150          * attribute. In ideal world we would have to gather also mds field from
1151          * all slaves, as object is spread over the cluster and this is
1152          * definitely interesting information and it is not good to loss it,
1153          * but... */
1154         if (obj) {
1155                 struct mdt_body *body;
1156
1157                 if (*request == NULL) {
1158                         lmv_obj_put(obj);
1159                         RETURN(rc);
1160                 }
1161
1162                 body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
1163                                       sizeof(*body));
1164                 LASSERT(body != NULL);
1165
1166                 lmv_obj_lock(obj);
1167
1168                 for (i = 0; i < obj->lo_objcount; i++) {
1169                         if (lmv->tgts[i].ltd_exp == NULL) {
1170                                 CWARN("%s: NULL export for %d\n",
1171                                       obd->obd_name, i);
1172                                 continue;
1173                         }
1174
1175                         /* skip master obj. */
1176                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
1177                                 continue;
1178
1179                         body->size += obj->lo_inodes[i].li_size;
1180                 }
1181
1182                 lmv_obj_unlock(obj);
1183                 lmv_obj_put(obj);
1184         }
1185
1186         RETURN(rc);
1187 }
1188
1189 static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1190                              ldlm_iterator_t it, void *data)
1191 {
1192         struct obd_device *obd = exp->exp_obd;
1193         struct lmv_obd *lmv = &obd->u.lmv;
1194         int i, rc;
1195         ENTRY;
1196
1197         rc = lmv_check_connect(obd);
1198         if (rc)
1199                 RETURN(rc);
1200
1201         CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
1202
1203         /* with CMD every object can have two locks in different namespaces:
1204          * lookup lock in space of mds storing direntry and update/open lock in
1205          * space of mds storing inode */
1206         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1207                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1208
1209         RETURN(0);
1210 }
1211
1212 static int lmv_close(struct obd_export *exp,
1213                      struct md_op_data *op_data,
1214                      struct obd_client_handle *och,
1215                      struct ptlrpc_request **request)
1216 {
1217         struct obd_device *obd = exp->exp_obd;
1218         struct lmv_obd *lmv = &obd->u.lmv;
1219         struct obd_export *tgt_exp;
1220         int rc;
1221         ENTRY;
1222
1223         rc = lmv_check_connect(obd);
1224         if (rc)
1225                 RETURN(rc);
1226
1227         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1228         if (IS_ERR(tgt_exp))
1229                 RETURN(PTR_ERR(tgt_exp));
1230
1231         CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->fid1));
1232         rc = md_close(tgt_exp, op_data, och, request);
1233         RETURN(rc);
1234 }
1235
1236 /*
1237  * Called in the case MDS returns -ERESTART on create on open, what means that
1238  * directory is split and its LMV presentation object has to be updated.
1239  */
1240 int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
1241 {
1242         struct obd_device *obd = exp->exp_obd;
1243         struct lmv_obd *lmv = &obd->u.lmv;
1244         struct ptlrpc_request *req = NULL;
1245         struct obd_export *tgt_exp;
1246         struct lmv_obj *obj;
1247         struct lustre_md md;
1248         int mealen, rc;
1249         __u64 valid;
1250         ENTRY;
1251
1252         md.mea = NULL;
1253         mealen = lmv_get_easize(lmv);
1254
1255         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1256
1257         tgt_exp = lmv_get_export(lmv, fid);
1258         if (IS_ERR(tgt_exp))
1259                 RETURN(PTR_ERR(tgt_exp));
1260
1261         /* time to update mea of parent fid */
1262         rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
1263         if (rc) {
1264                 CERROR("md_getattr() failed, error %d\n", rc);
1265                 GOTO(cleanup, rc);
1266         }
1267
1268         rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md);
1269         if (rc) {
1270                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
1271                 GOTO(cleanup, rc);
1272         }
1273
1274         if (md.mea == NULL)
1275                 GOTO(cleanup, rc = -ENODATA);
1276
1277         obj = lmv_obj_create(exp, fid, md.mea);
1278         if (IS_ERR(obj))
1279                 rc = PTR_ERR(obj);
1280         else
1281                 lmv_obj_put(obj);
1282
1283         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1284
1285         EXIT;
1286 cleanup:
1287         if (req)
1288                 ptlrpc_req_finished(req);
1289         return rc;
1290 }
1291
1292 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1293                const void *data, int datalen, int mode, __u32 uid,
1294                __u32 gid, __u32 cap_effective,  __u64 rdev,
1295                struct ptlrpc_request **request)
1296 {
1297         struct obd_device *obd = exp->exp_obd;
1298         struct lmv_obd *lmv = &obd->u.lmv;
1299         struct obd_export *tgt_exp;
1300         struct lmv_obj *obj;
1301         int rc, loop = 0;
1302         ENTRY;
1303
1304         rc = lmv_check_connect(obd);
1305         if (rc)
1306                 RETURN(rc);
1307
1308         if (!lmv->desc.ld_active_tgt_count)
1309                 RETURN(-EIO);
1310 repeat:
1311         LASSERT(++loop <= 2);
1312         obj = lmv_obj_grab(obd, &op_data->fid1);
1313         if (obj) {
1314                 mdsno_t mds;
1315
1316                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1317                                    op_data->name, op_data->namelen);
1318                 op_data->fid1 = obj->lo_inodes[mds].li_fid;
1319                 lmv_obj_put(obj);
1320         }
1321
1322         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen,
1323                op_data->name, PFID(&op_data->fid1));
1324
1325         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1326         if (IS_ERR(tgt_exp))
1327                 RETURN(PTR_ERR(tgt_exp));
1328
1329         rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
1330                        cap_effective, rdev, request);
1331         if (rc == 0) {
1332                 if (*request == NULL)
1333                         RETURN(rc);
1334                 CDEBUG(D_OTHER, "created. "DFID"\n", PFID(&op_data->fid1));
1335         } else if (rc == -ERESTART) {
1336                 /*
1337                  * Directory got split. Time to update local object and repeat
1338                  * the request with proper MDS.
1339                  */
1340                 rc = lmv_handle_split(exp, &op_data->fid1);
1341                 if (rc == 0) {
1342                         ptlrpc_req_finished(*request);
1343                         rc = lmv_alloc_fid_for_split(obd, &op_data->fid1,
1344                                                      op_data, &op_data->fid2);
1345                         if (rc)
1346                                 RETURN(rc);
1347                         goto repeat;
1348                 }
1349         }
1350         RETURN(rc);
1351 }
1352
1353 static int lmv_done_writing(struct obd_export *exp,
1354                             struct md_op_data *op_data,
1355                             struct obd_client_handle *och)
1356 {
1357         struct obd_device *obd = exp->exp_obd;
1358         struct lmv_obd *lmv = &obd->u.lmv;
1359         struct obd_export *tgt_exp;
1360         int rc;
1361         ENTRY;
1362
1363         rc = lmv_check_connect(obd);
1364         if (rc)
1365                 RETURN(rc);
1366
1367         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1368         if (IS_ERR(tgt_exp))
1369                 RETURN(PTR_ERR(tgt_exp));
1370
1371         rc = md_done_writing(tgt_exp, op_data, och);
1372         RETURN(rc);
1373 }
1374
1375 static int
1376 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1377                    struct lookup_intent *it, int lockmode,
1378                    struct md_op_data *op_data, struct lustre_handle *lockh,
1379                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1380                    ldlm_blocking_callback cb_blocking, void *cb_data)
1381 {
1382         struct obd_device *obd = exp->exp_obd;
1383         struct lmv_obd *lmv = &obd->u.lmv;
1384         struct lmv_stripe_md *mea = op_data->mea1;
1385         struct md_op_data *op_data2;
1386         struct obd_export *tgt_exp;
1387         int i, rc = 0;
1388         ENTRY;
1389
1390         OBD_ALLOC_PTR(op_data2);
1391         if (op_data2 == NULL)
1392                 RETURN(-ENOMEM);
1393
1394         LASSERT(mea != NULL);
1395         for (i = 0; i < mea->mea_count; i++) {
1396                 memset(op_data2, 0, sizeof(*op_data2));
1397                 op_data2->fid1 = mea->mea_ids[i];
1398
1399                 tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
1400                 if (IS_ERR(tgt_exp))
1401                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1402
1403                 if (tgt_exp == NULL)
1404                         continue;
1405
1406                 rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
1407                                 lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
1408                                 cb_data, 0);
1409
1410                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
1411                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
1412
1413                 if (rc)
1414                         GOTO(cleanup, rc);
1415
1416                 if (it->d.lustre.it_data) {
1417                         struct ptlrpc_request *req;
1418                         req = (struct ptlrpc_request *)it->d.lustre.it_data;
1419                         ptlrpc_req_finished(req);
1420                 }
1421
1422                 if (it->d.lustre.it_status)
1423                         GOTO(cleanup, rc = it->d.lustre.it_status);
1424         }
1425
1426         EXIT;
1427 cleanup:
1428         OBD_FREE_PTR(op_data2);
1429
1430         if (rc != 0) {
1431                 /* drop all taken locks */
1432                 while (--i >= 0) {
1433                         if (lockh[i].cookie)
1434                                 ldlm_lock_decref(lockh + i, lockmode);
1435                         lockh[i].cookie = 0;
1436                 }
1437         }
1438         return rc;
1439 }
1440
1441 static int
1442 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1443                    struct lookup_intent *it, int lock_mode,
1444                    struct md_op_data *op_data, struct lustre_handle *lockh,
1445                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1446                    ldlm_blocking_callback cb_blocking, void *cb_data,
1447                    int extra_lock_flags)
1448 {
1449         struct ptlrpc_request *req = it->d.lustre.it_data;
1450         struct obd_device *obd = exp->exp_obd;
1451         struct lmv_obd *lmv = &obd->u.lmv;
1452         struct mdt_body *body = NULL;
1453         struct lustre_handle plock;
1454         struct obd_export *tgt_exp;
1455         struct md_op_data *rdata;
1456         int rc = 0, pmode;
1457         ENTRY;
1458
1459         body = lustre_msg_buf(req->rq_repmsg,
1460                               DLM_REPLY_REC_OFF, sizeof(*body));
1461         LASSERT(body != NULL);
1462
1463         if (!(body->valid & OBD_MD_MDS))
1464                 RETURN(0);
1465
1466         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
1467                LL_IT2STR(it), PFID(&op_data->fid1), PFID(&body->fid1));
1468
1469         /* We got LOOKUP lock, but we really need attrs */
1470         pmode = it->d.lustre.it_lock_mode;
1471         LASSERT(pmode != 0);
1472         memcpy(&plock, lockh, sizeof(plock));
1473         it->d.lustre.it_lock_mode = 0;
1474         it->d.lustre.it_data = NULL;
1475
1476         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1477         ptlrpc_req_finished(req);
1478
1479         tgt_exp = lmv_get_export(lmv, &body->fid1);
1480         if (IS_ERR(tgt_exp))
1481                 GOTO(out, PTR_ERR(tgt_exp));
1482
1483         OBD_ALLOC_PTR(rdata);
1484         if (rdata == NULL)
1485                 GOTO(out, -ENOMEM);
1486
1487         rdata->fid1 = body->fid1;
1488         rdata->name = NULL;
1489         rdata->namelen = 0;
1490
1491         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
1492                         lockh, lmm, lmmsize, cb_compl, cb_blocking,
1493                         cb_data, extra_lock_flags);
1494         OBD_FREE_PTR(rdata);
1495         EXIT;
1496 out:
1497         ldlm_lock_decref(&plock, pmode);
1498         return rc;
1499 }
1500
1501 static int
1502 lmv_enqueue(struct obd_export *exp, int lock_type,
1503             struct lookup_intent *it, int lock_mode,
1504             struct md_op_data *op_data, struct lustre_handle *lockh,
1505             void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1506             ldlm_blocking_callback cb_blocking, void *cb_data,
1507             int extra_lock_flags)
1508 {
1509         struct obd_device *obd = exp->exp_obd;
1510         struct lmv_obd *lmv = &obd->u.lmv;
1511         struct obd_export *tgt_exp;
1512         struct lmv_obj *obj;
1513         int rc;
1514         ENTRY;
1515
1516         rc = lmv_check_connect(obd);
1517         if (rc)
1518                 RETURN(rc);
1519
1520         if (op_data->mea1 && it->it_op == IT_UNLINK) {
1521                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1522                                         op_data, lockh, lmm, lmmsize,
1523                                         cb_compl, cb_blocking, cb_data);
1524                 RETURN(rc);
1525         }
1526
1527         if (op_data->namelen) {
1528                 obj = lmv_obj_grab(obd, &op_data->fid1);
1529                 if (obj) {
1530                         mdsno_t mds;
1531
1532                         /* directory is split. look for right mds for this
1533                          * name */
1534                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1535                                            (char *)op_data->name, op_data->namelen);
1536                         op_data->fid1      = obj->lo_inodes[mds].li_fid;
1537                         lmv_obj_put(obj);
1538                 }
1539         }
1540         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
1541                PFID(&op_data->fid1));
1542
1543         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1544         if (IS_ERR(tgt_exp))
1545                 RETURN(PTR_ERR(tgt_exp));
1546
1547         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
1548                         lmm, lmmsize, cb_compl, cb_blocking, cb_data,
1549                         extra_lock_flags);
1550
1551         if (rc == 0 && it->it_op == IT_OPEN)
1552                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1553                                         op_data, lockh, lmm, lmmsize,
1554                                         cb_compl, cb_blocking, cb_data,
1555                                         extra_lock_flags);
1556         RETURN(rc);
1557 }
1558
1559 static int
1560 lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
1561                  struct obd_capa *oc, const char *filename, int namelen,
1562                  obd_valid valid, int ea_size, struct ptlrpc_request **request)
1563 {
1564         struct obd_device *obd = exp->exp_obd;
1565         struct lmv_obd *lmv = &obd->u.lmv;
1566         struct lu_fid rid = *fid;
1567         struct obd_export *tgt_exp;
1568         struct mdt_body *body;
1569         struct lmv_obj *obj;
1570         int rc, loop = 0;
1571         mdsno_t mds;
1572         ENTRY;
1573
1574         rc = lmv_check_connect(obd);
1575         if (rc)
1576                 RETURN(rc);
1577
1578 repeat:
1579         LASSERT(++loop <= 2);
1580         obj = lmv_obj_grab(obd, &rid);
1581         if (obj) {
1582                 /* directory is split. look for right mds for this name */
1583                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1584                                    filename, namelen - 1);
1585                 rid = obj->lo_inodes[mds].li_fid;
1586                 lmv_obj_put(obj);
1587         }
1588
1589         CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
1590                namelen, filename, PFID(fid), PFID(&rid));
1591
1592         tgt_exp = lmv_get_export(lmv, &rid);
1593         if (IS_ERR(tgt_exp))
1594                 RETURN(PTR_ERR(tgt_exp));
1595
1596         rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
1597                              ea_size, request);
1598         if (rc == 0) {
1599                 body = lustre_msg_buf((*request)->rq_repmsg,
1600                                       REQ_REC_OFF, sizeof(*body));
1601                 LASSERT(body != NULL);
1602
1603                 if (body->valid & OBD_MD_MDS) {
1604                         struct ptlrpc_request *req = NULL;
1605
1606                         rid = body->fid1;
1607                         CDEBUG(D_OTHER, "request attrs for "DFID"\n",
1608                                PFID(&rid));
1609
1610                         tgt_exp = lmv_get_export(lmv, &rid);
1611                         if (IS_ERR(tgt_exp)) {
1612                                 ptlrpc_req_finished(*request);
1613                                 RETURN(PTR_ERR(tgt_exp));
1614                         }
1615
1616                         rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1,
1617                                              valid, ea_size, &req);
1618                         ptlrpc_req_finished(*request);
1619                         *request = req;
1620                 }
1621         } else if (rc == -ERESTART) {
1622                 /* directory got split. time to update local object and repeat
1623                  * the request with proper MDS */
1624                 rc = lmv_handle_split(exp, &rid);
1625                 if (rc == 0) {
1626                         ptlrpc_req_finished(*request);
1627                         goto repeat;
1628                 }
1629         }
1630         RETURN(rc);
1631 }
1632
1633 /*
1634  * llite passes fid of an target inode in op_data->fid1 and id of directory in
1635  * op_data->fid2
1636  */
1637 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1638                     struct ptlrpc_request **request)
1639 {
1640         struct obd_device *obd = exp->exp_obd;
1641         struct lmv_obd *lmv = &obd->u.lmv;
1642         struct lmv_obj *obj;
1643         mdsno_t mds;
1644         int rc;
1645         ENTRY;
1646
1647         rc = lmv_check_connect(obd);
1648         if (rc)
1649                 RETURN(rc);
1650
1651         if (op_data->namelen != 0) {
1652                 /* usual link request */
1653                 obj = lmv_obj_grab(obd, &op_data->fid2);
1654                 if (obj) {
1655                         rc = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1656                                           op_data->name, op_data->namelen);
1657                         op_data->fid2      = obj->lo_inodes[rc].li_fid;
1658                         lmv_obj_put(obj);
1659                 }
1660
1661                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
1662                 if (rc)
1663                         RETURN(rc);
1664
1665                 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
1666                        PFID(&op_data->fid2), op_data->namelen,
1667                        op_data->name, PFID(&op_data->fid1));
1668         } else {
1669                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
1670                 if (rc)
1671                         RETURN(rc);
1672
1673                 /* request from MDS to acquire i_links for inode by fid1 */
1674                 CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
1675                        PFID(&op_data->fid1));
1676         }
1677
1678         CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
1679                mds, PFID(&op_data->fid1));
1680
1681         op_data->fsuid = current->fsuid;
1682         op_data->fsgid = current->fsgid;
1683         op_data->cap   = current->cap_effective;
1684         rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
1685
1686         RETURN(rc);
1687 }
1688
1689 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1690                       const char *old, int oldlen, const char *new, int newlen,
1691                       struct ptlrpc_request **request)
1692 {
1693         struct obd_device *obd = exp->exp_obd;
1694         struct lmv_obd *lmv = &obd->u.lmv;
1695         struct lmv_obj *obj;
1696         mdsno_t mds, mds2;
1697         int rc;
1698         ENTRY;
1699
1700         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
1701                oldlen, old, PFID(&op_data->fid1),
1702                newlen, new, PFID(&op_data->fid2));
1703
1704         rc = lmv_check_connect(obd);
1705         if (rc)
1706                 RETURN(rc);
1707
1708         if (oldlen == 0) {
1709                 /*
1710                  * MDS with old dir entry is asking another MDS to create name
1711                  * there.
1712                  */
1713                 CDEBUG(D_OTHER,
1714                        "create %*s(%d/%d) in "DFID" pointing "
1715                        "to "DFID"\n", newlen, new, oldlen, newlen,
1716                        PFID(&op_data->fid2), PFID(&op_data->fid1));
1717
1718                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
1719                 if (rc)
1720                         RETURN(rc);
1721
1722                 /*
1723                  * target directory can be split, sowe should forward request to
1724                  * the right MDS.
1725                  */
1726                 obj = lmv_obj_grab(obd, &op_data->fid2);
1727                 if (obj) {
1728                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1729                                            (char *)new, newlen);
1730                         op_data->fid2      = obj->lo_inodes[mds].li_fid;
1731                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", mds,
1732                                PFID(&op_data->fid2));
1733                         lmv_obj_put(obj);
1734                 }
1735                 goto request;
1736         }
1737
1738         obj = lmv_obj_grab(obd, &op_data->fid1);
1739         if (obj) {
1740                 /*
1741                  * directory is already split, so we have to forward request to
1742                  * the right MDS.
1743                  */
1744                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1745                                    (char *)old, oldlen);
1746                 op_data->fid1      = obj->lo_inodes[mds].li_fid;
1747                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", mds,
1748                        PFID(&op_data->fid1));
1749                 lmv_obj_put(obj);
1750         }
1751
1752         obj = lmv_obj_grab(obd, &op_data->fid2);
1753         if (obj) {
1754                 /*
1755                  * directory is already split, so we have to forward request to
1756                  * the right MDS.
1757                  */
1758                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1759                                    (char *)new, newlen);
1760
1761                 op_data->fid2 = obj->lo_inodes[mds].li_fid;
1762                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", mds,
1763                        PFID(&op_data->fid2));
1764                 lmv_obj_put(obj);
1765         }
1766
1767         rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
1768         if (rc)
1769                 RETURN(rc);
1770
1771 request:
1772         rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds2);
1773         if (rc)
1774                 RETURN(rc);
1775
1776         if (mds != mds2) {
1777                 CDEBUG(D_OTHER,"cross-node rename "DFID"/%*s to "DFID"/%*s\n",
1778                        PFID(&op_data->fid1), oldlen, old,
1779                        PFID(&op_data->fid2), newlen, new);
1780         }
1781         op_data->fsuid = current->fsuid;
1782         op_data->fsgid = current->fsgid;
1783         op_data->cap   = current->cap_effective;
1784         rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
1785                        new, newlen, request);
1786         RETURN(rc);
1787 }
1788
1789 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1790                        void *ea, int ealen, void *ea2, int ea2len,
1791                        struct ptlrpc_request **request)
1792 {
1793         struct obd_device *obd = exp->exp_obd;
1794         struct lmv_obd *lmv = &obd->u.lmv;
1795         struct ptlrpc_request *req;
1796         struct obd_export *tgt_exp;
1797         struct lmv_obj *obj;
1798         int rc = 0, i;
1799         ENTRY;
1800
1801         rc = lmv_check_connect(obd);
1802         if (rc)
1803                 RETURN(rc);
1804
1805         obj = lmv_obj_grab(obd, &op_data->fid1);
1806
1807         CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
1808                PFID(&op_data->fid1), op_data->attr.ia_valid,
1809                obj ? ", split" : "");
1810
1811         if (obj) {
1812                 for (i = 0; i < obj->lo_objcount; i++) {
1813                         op_data->fid1 = obj->lo_inodes[i].li_fid;
1814
1815                         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1816                         if (IS_ERR(tgt_exp)) {
1817                                 rc = PTR_ERR(tgt_exp);
1818                                 break;
1819                         }
1820
1821                         rc = md_setattr(tgt_exp, op_data, ea, ealen,
1822                                         ea2, ea2len, &req);
1823
1824                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
1825                                 /*
1826                                  * this is master object and this request should
1827                                  * be returned back to llite.
1828                                  */
1829                                 *request = req;
1830                         } else {
1831                                 ptlrpc_req_finished(req);
1832                         }
1833
1834                         if (rc)
1835                                 break;
1836                 }
1837                 lmv_obj_put(obj);
1838         } else {
1839                 tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1840                 if (IS_ERR(tgt_exp))
1841                         RETURN(PTR_ERR(tgt_exp));
1842
1843                 rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2,
1844                                 ea2len, request);
1845         }
1846         RETURN(rc);
1847 }
1848
1849 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1850                     struct obd_capa *oc, struct ptlrpc_request **request)
1851 {
1852         struct obd_device *obd = exp->exp_obd;
1853         struct lmv_obd *lmv = &obd->u.lmv;
1854         struct obd_export *tgt_exp;
1855         int rc;
1856         ENTRY;
1857
1858         rc = lmv_check_connect(obd);
1859         if (rc)
1860                 RETURN(rc);
1861
1862         tgt_exp = lmv_get_export(lmv, fid);
1863         if (IS_ERR(tgt_exp))
1864                 RETURN(PTR_ERR(tgt_exp));
1865
1866         rc = md_sync(tgt_exp, fid, oc, request);
1867         RETURN(rc);
1868 }
1869
1870 /* main purpose of LMV blocking ast is to remove split directory LMV
1871  * presentation object (struct lmv_obj) attached to the lock being revoked. */
1872 int lmv_blocking_ast(struct ldlm_lock *lock,
1873                      struct ldlm_lock_desc *desc,
1874                      void *data, int flag)
1875 {
1876         struct lustre_handle lockh;
1877         struct lmv_obj *obj;
1878         int rc;
1879         ENTRY;
1880
1881         switch (flag) {
1882         case LDLM_CB_BLOCKING:
1883                 ldlm_lock2handle(lock, &lockh);
1884                 rc = ldlm_cli_cancel(&lockh);
1885                 if (rc < 0) {
1886                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1887                         RETURN(rc);
1888                 }
1889                 break;
1890         case LDLM_CB_CANCELING:
1891                 /* time to drop cached attrs for dirobj */
1892                 obj = lock->l_ast_data;
1893                 if (obj) {
1894                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1895                                ", master "DFID"\n",
1896                                lock->l_resource->lr_name.name[3] == 1 ?
1897                                "LOOKUP" : "UPDATE",
1898                                lock->l_resource->lr_name.name[0],
1899                                lock->l_resource->lr_name.name[1],
1900                                PFID(&obj->lo_fid));
1901                         lmv_obj_put(obj);
1902                 }
1903                 break;
1904         default:
1905                 LBUG();
1906         }
1907         RETURN(0);
1908 }
1909
1910 static int lmv_reset_hash_seg_end (struct lmv_obd *lmv, struct lmv_obj *obj,
1911                                    const struct lu_fid *fid, int index,
1912                                    struct lu_dirpage *dp)
1913 {
1914         struct ptlrpc_request *tmp_req = NULL;
1915         struct page *page = NULL;
1916         struct lu_dirpage *next_dp;
1917         struct obd_export *tgt_exp;
1918         struct lu_fid rid;
1919         __u64 max_hash = MAX_HASH_SIZE;
1920         __u32 seg_end;
1921         int rc = 0;
1922         ENTRY;
1923         
1924         /*
1925          * We have reached the end of this hash segment, and the start offset of
1926          * next segment need to be gotten out from the next segment, set it to
1927          * the end of this segment. */
1928
1929         do_div(max_hash, obj->lo_objcount);
1930         seg_end = (__u32)max_hash * index;
1931
1932         /* Get start offset from next segment */
1933         rid = obj->lo_inodes[index].li_fid;
1934         tgt_exp = lmv_get_export(lmv, &rid);
1935         if (IS_ERR(tgt_exp))
1936                 GOTO(cleanup, PTR_ERR(tgt_exp));
1937
1938         /* Alloc a page to get next segment hash,
1939          * FIXME: should we try to page from cache first */
1940         page = alloc_pages(GFP_KERNEL, 0);
1941         if (!page)
1942                 GOTO(cleanup, rc = -ENOMEM);
1943
1944         rc = md_readpage(tgt_exp, &rid, NULL, seg_end, page, &tmp_req);
1945         if (rc) {
1946                 /* E2BIG means it already reached the end of the dir,
1947                  * no need reset the hash segment end */
1948                 if (rc == -E2BIG)
1949                        GOTO(cleanup, rc = 0);
1950                 if (rc != -ERANGE)
1951                        GOTO(cleanup, rc);
1952                 if (rc == -ERANGE)
1953                         rc = 0;
1954         }
1955         kmap(page);
1956         next_dp = cfs_page_address(page);
1957         LASSERT(le32_to_cpu(next_dp->ldp_hash_start) >= seg_end);
1958         dp->ldp_hash_end = next_dp->ldp_hash_start;
1959         kunmap(page);
1960         CDEBUG(D_INFO,"reset h_end %x for split obj"DFID"o_count %d index %d\n",
1961                le32_to_cpu(dp->ldp_hash_end), PFID(&rid), obj->lo_objcount,
1962                index);
1963 cleanup:
1964         if (tmp_req)
1965                 ptlrpc_req_finished(tmp_req);
1966         if (page)
1967                 __free_pages(page, 0);
1968         RETURN(rc);
1969 }
1970
1971 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
1972                         struct obd_capa *oc, __u64 offset, struct page *page,
1973                         struct ptlrpc_request **request)
1974 {
1975         struct obd_device *obd = exp->exp_obd;
1976         struct lmv_obd *lmv = &obd->u.lmv;
1977         struct obd_export *tgt_exp;
1978         struct lu_fid rid = *fid;
1979         struct lmv_obj *obj;
1980         int i = 0, rc;
1981         ENTRY;
1982
1983         rc = lmv_check_connect(obd);
1984         if (rc)
1985                 RETURN(rc);
1986
1987         CDEBUG(D_INFO, "READPAGE at %llu from "DFID"\n", offset, PFID(&rid));
1988
1989         obj = lmv_obj_grab(obd, fid);
1990         if (obj) {
1991                 __u64 index = offset;
1992                 __u64 seg = MAX_HASH_SIZE;
1993                 lmv_obj_lock(obj);
1994
1995                 LASSERT(obj->lo_objcount > 0);
1996                 do_div(seg, obj->lo_objcount);
1997                 do_div(index, (__u32)seg);
1998                 i = (int)index;
1999                 rid = obj->lo_inodes[i].li_fid;
2000
2001                 lmv_obj_unlock(obj);
2002
2003                 CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
2004                        PFID(&rid), (unsigned long)offset, i);
2005         }
2006
2007         tgt_exp = lmv_get_export(lmv, &rid);
2008         if (IS_ERR(tgt_exp))
2009                 GOTO(cleanup, PTR_ERR(tgt_exp));
2010
2011         rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
2012         if (rc)
2013                 GOTO(cleanup, rc);
2014
2015         if (obj && i < obj->lo_objcount - 1) {
2016                 struct lu_dirpage *dp;
2017                 __u32 end;
2018                 kmap(page);
2019                 dp = cfs_page_address(page);
2020                 end = le32_to_cpu(dp->ldp_hash_end);
2021                 CDEBUG(D_INFO, "get "DFID" with end %lu i %d\n",
2022                        PFID(&rid), (unsigned long)end, i);
2023                 if (end == ~0ul) {
2024                         do {
2025                                 rc = lmv_reset_hash_seg_end(lmv, obj, fid,
2026                                                             ++i, dp);
2027                                 if (i >= obj->lo_objcount - 1)
2028                                         break;
2029                                 /* if there are no entries in this segment 
2030                                  * and it is not the last hash segment */
2031                         } while (rc != -E2BIG);
2032                 }
2033                 kunmap(page);
2034         }
2035         /*
2036          * Here we could remove "." and ".." from all pages which at not from
2037          * master. But MDS has only "." and ".." for master dir.
2038          */
2039 cleanup:
2040         if (obj)
2041                 lmv_obj_put(obj);
2042         RETURN(rc);
2043 }
2044
2045 static int lmv_unlink_slaves(struct obd_export *exp,
2046                              struct md_op_data *op_data,
2047                              struct ptlrpc_request **req)
2048 {
2049         struct obd_device *obd = exp->exp_obd;
2050         struct lmv_obd *lmv = &obd->u.lmv;
2051         struct lmv_stripe_md *mea = op_data->mea1;
2052         struct md_op_data *op_data2;
2053         struct obd_export *tgt_exp;
2054         int i, rc = 0;
2055         ENTRY;
2056
2057         OBD_ALLOC_PTR(op_data2);
2058         if (op_data2 == NULL)
2059                 RETURN(-ENOMEM);
2060
2061         LASSERT(mea != NULL);
2062         for (i = 0; i < mea->mea_count; i++) {
2063                 memset(op_data2, 0, sizeof(*op_data2));
2064                 op_data2->fid1 = mea->mea_ids[i];
2065                 op_data2->mode = MDS_MODE_DONT_LOCK | S_IFDIR;
2066                 op_data2->fsuid = current->fsuid;
2067                 op_data2->fsgid = current->fsgid;
2068                 tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
2069                 if (IS_ERR(tgt_exp))
2070                         GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
2071
2072                 if (tgt_exp == NULL)
2073                         continue;
2074
2075                 rc = md_unlink(tgt_exp, op_data2, req);
2076
2077                 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
2078                        PFID(&mea->mea_ids[i]), rc);
2079
2080                 if (*req) {
2081                         ptlrpc_req_finished(*req);
2082                         *req = NULL;
2083                 }
2084                 if (rc)
2085                         GOTO(out_free_op_data2, rc);
2086         }
2087
2088         EXIT;
2089 out_free_op_data2:
2090         OBD_FREE_PTR(op_data2);
2091         return rc;
2092 }
2093
2094 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2095                       struct ptlrpc_request **request)
2096 {
2097         struct obd_device *obd = exp->exp_obd;
2098         struct lmv_obd *lmv = &obd->u.lmv;
2099         struct obd_export *tgt_exp;
2100         int rc, i;
2101         ENTRY;
2102
2103         rc = lmv_check_connect(obd);
2104         if (rc)
2105                 RETURN(rc);
2106
2107         if (op_data->namelen == 0 && op_data->mea1 != NULL) {
2108                 /* mds asks to remove slave objects */
2109                 rc = lmv_unlink_slaves(exp, op_data, request);
2110                 RETURN(rc);
2111         }
2112
2113         if (op_data->namelen != 0) {
2114                 struct lmv_obj *obj;
2115
2116                 obj = lmv_obj_grab(obd, &op_data->fid1);
2117                 if (obj) {
2118                         i = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
2119                                          op_data->name, op_data->namelen);
2120                         op_data->fid1      = obj->lo_inodes[i].li_fid;
2121                         lmv_obj_put(obj);
2122                         CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
2123                                op_data->namelen, op_data->name,
2124                                PFID(&op_data->fid1), i);
2125                 }
2126         } else {
2127                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
2128                        PFID(&op_data->fid1));
2129         }
2130         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
2131         if (IS_ERR(tgt_exp))
2132                 RETURN(PTR_ERR(tgt_exp));
2133         op_data->fsuid = current->fsuid;
2134         op_data->fsgid = current->fsgid;
2135         op_data->cap   = current->cap_effective;
2136         rc = md_unlink(tgt_exp, op_data, request);
2137         RETURN(rc);
2138 }
2139
2140 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs,
2141                          struct obd_device *tgt, int count,
2142                          struct llog_catid *logid, struct obd_uuid *uuid)
2143 {
2144         struct llog_ctxt *ctxt;
2145         int rc;
2146         ENTRY;
2147
2148         rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
2149                         &llog_client_ops);
2150         if (rc == 0) {
2151                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
2152                 ctxt->loc_imp = tgt->u.cli.cl_import;
2153         }
2154
2155         RETURN(rc);
2156 }
2157
2158 static int lmv_llog_finish(struct obd_device *obd, int count)
2159 {
2160         int rc;
2161         ENTRY;
2162
2163         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
2164         RETURN(rc);
2165 }
2166
2167 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2168 {
2169         int rc = 0;
2170
2171         switch (stage) {
2172         case OBD_CLEANUP_EARLY:
2173                 /* XXX: here should be calling obd_precleanup() down to
2174                  * stack. */
2175                 break;
2176         case OBD_CLEANUP_SELF_EXP:
2177                 rc = obd_llog_finish(obd, 0);
2178                 if (rc != 0)
2179                         CERROR("failed to cleanup llogging subsystems\n");
2180                 break;
2181         default:
2182                 break;
2183         }
2184         RETURN(rc);
2185 }
2186
2187 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
2188                         void *key, __u32 *vallen, void *val)
2189 {
2190         struct obd_device *obd;
2191         struct lmv_obd *lmv;
2192         int rc = 0;
2193         ENTRY;
2194
2195         obd = class_exp2obd(exp);
2196         if (obd == NULL) {
2197                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2198                        exp->exp_handle.h_cookie);
2199                 RETURN(-EINVAL);
2200         }
2201
2202         lmv = &obd->u.lmv;
2203         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2204                 struct lmv_tgt_desc *tgts;
2205                 int i;
2206
2207                 rc = lmv_check_connect(obd);
2208                 if (rc)
2209                         RETURN(rc);
2210
2211                 LASSERT(*vallen == sizeof(__u32));
2212                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2213                      i++, tgts++) {
2214
2215                         /* all tgts should be connected when this get called. */
2216                         if (!tgts || !tgts->ltd_exp) {
2217                                 CERROR("target not setup?\n");
2218                                 continue;
2219                         }
2220
2221                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
2222                                           vallen, val))
2223                                 RETURN(0);
2224                 }
2225                 RETURN(-EINVAL);
2226         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2227                 rc = lmv_check_connect(obd);
2228                 if (rc)
2229                         RETURN(rc);
2230
2231                 /* forwarding this request to first MDS, it should know LOV
2232                  * desc. */
2233                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2234                                   vallen, val);
2235                 RETURN(rc);
2236         }
2237
2238         CDEBUG(D_IOCTL, "invalid key\n");
2239         RETURN(-EINVAL);
2240 }
2241
2242 int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
2243                        void *key, obd_count vallen, void *val,
2244                        struct ptlrpc_request_set *set)
2245 {
2246         struct lmv_tgt_desc    *tgt;
2247         struct obd_device      *obd;
2248         struct lmv_obd         *lmv;
2249         int rc = 0;
2250         ENTRY;
2251
2252         obd = class_exp2obd(exp);
2253         if (obd == NULL) {
2254                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2255                        exp->exp_handle.h_cookie);
2256                 RETURN(-EINVAL);
2257         }
2258         lmv = &obd->u.lmv;
2259
2260         if (KEY_IS(KEY_FLUSH_CTX)) {
2261                 int i, err = 0;
2262
2263                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2264                         tgt = &lmv->tgts[i];
2265
2266                         if (!tgt->ltd_exp)
2267                                 continue;
2268
2269                         err = obd_set_info_async(tgt->ltd_exp,
2270                                                  keylen, key, vallen, val, set);
2271                         if (err && rc == 0)
2272                                 rc = err;
2273                 }
2274
2275                 RETURN(rc);
2276         }
2277
2278         RETURN(-EINVAL);
2279 }
2280
2281 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2282                struct lov_stripe_md *lsm)
2283 {
2284         struct obd_device *obd = class_exp2obd(exp);
2285         struct lmv_obd *lmv = &obd->u.lmv;
2286         struct lmv_stripe_md *meap, *lsmp;
2287         int mea_size, i;
2288         ENTRY;
2289
2290         mea_size = lmv_get_easize(lmv);
2291         if (!lmmp)
2292                 RETURN(mea_size);
2293
2294         if (*lmmp && !lsm) {
2295                 OBD_FREE(*lmmp, mea_size);
2296                 *lmmp = NULL;
2297                 RETURN(0);
2298         }
2299
2300         if (*lmmp == NULL) {
2301                 OBD_ALLOC(*lmmp, mea_size);
2302                 if (*lmmp == NULL)
2303                         RETURN(-ENOMEM);
2304         }
2305
2306         if (!lsm)
2307                 RETURN(mea_size);
2308
2309         lsmp = (struct lmv_stripe_md *)lsm;
2310         meap = (struct lmv_stripe_md *)*lmmp;
2311
2312         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2313             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2314                 RETURN(-EINVAL);
2315
2316         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2317         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2318         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2319
2320         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2321                 meap->mea_ids[i] = meap->mea_ids[i];
2322                 fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
2323         }
2324
2325         RETURN(mea_size);
2326 }
2327
2328 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2329                  struct lov_mds_md *lmm, int lmm_size)
2330 {
2331         struct obd_device *obd = class_exp2obd(exp);
2332         struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
2333         struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
2334         struct lmv_obd *lmv = &obd->u.lmv;
2335         int mea_size, i;
2336         __u32 magic;
2337         ENTRY;
2338
2339         mea_size = lmv_get_easize(lmv);
2340         if (lsmp == NULL)
2341                 return mea_size;
2342
2343         if (*lsmp != NULL && lmm == NULL) {
2344                 OBD_FREE(*tmea, mea_size);
2345                 RETURN(0);
2346         }
2347
2348         LASSERT(mea_size == lmm_size);
2349
2350         OBD_ALLOC(*tmea, mea_size);
2351         if (*tmea == NULL)
2352                 RETURN(-ENOMEM);
2353
2354         if (!lmm)
2355                 RETURN(mea_size);
2356
2357         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2358             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2359             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2360         {
2361                 magic = le32_to_cpu(mea->mea_magic);
2362         } else {
2363                 /* old mea isnot handled here */
2364                 LBUG();
2365         }
2366
2367         (*tmea)->mea_magic = magic;
2368         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2369         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2370
2371         for (i = 0; i < (*tmea)->mea_count; i++) {
2372                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2373                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2374         }
2375         RETURN(mea_size);
2376 }
2377
2378 static int lmv_cancel_unused(struct obd_export *exp,
2379                              const struct lu_fid *fid,
2380                              int flags, void *opaque)
2381 {
2382         struct obd_device *obd = exp->exp_obd;
2383         struct lmv_obd *lmv = &obd->u.lmv;
2384         int rc = 0, err, i;
2385         ENTRY;
2386
2387         LASSERT(fid != NULL);
2388
2389         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2390                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active)
2391                         continue;
2392
2393                 err = md_cancel_unused(lmv->tgts[i].ltd_exp,
2394                                        fid, flags, opaque);
2395                 if (!rc)
2396                         rc = err;
2397         }
2398         RETURN(rc);
2399 }
2400
2401 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
2402 {
2403         struct obd_device *obd = exp->exp_obd;
2404         struct lmv_obd *lmv = &obd->u.lmv;
2405
2406         ENTRY;
2407         RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
2408 }
2409
2410 int lmv_lock_match(struct obd_export *exp, int flags,
2411                    const struct lu_fid *fid, ldlm_type_t type,
2412                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
2413                    struct lustre_handle *lockh)
2414 {
2415         struct obd_device *obd = exp->exp_obd;
2416         struct lmv_obd *lmv = &obd->u.lmv;
2417         int i, rc = 0;
2418         ENTRY;
2419
2420         CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
2421
2422         /* with CMD every object can have two locks in different namespaces:
2423          * lookup lock in space of mds storing direntry and update/open lock in
2424          * space of mds storing inode. Thus we check all targets, not only that
2425          * one fid was created in. */
2426         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2427                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2428                                    type, policy, mode, lockh);
2429                 if (rc)
2430                         RETURN(1);
2431         }
2432
2433         RETURN(rc);
2434 }
2435
2436 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2437                       int offset, struct obd_export *dt_exp,
2438                       struct obd_export *md_exp, struct lustre_md *md)
2439 {
2440         struct obd_device *obd = exp->exp_obd;
2441         struct lmv_obd *lmv = &obd->u.lmv;
2442         int rc;
2443
2444         ENTRY;
2445         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp,
2446                               md);
2447         RETURN(rc);
2448 }
2449
2450 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2451 {
2452         struct obd_device *obd = exp->exp_obd;
2453         struct lmv_obd *lmv = &obd->u.lmv;
2454
2455         ENTRY;
2456         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2457 }
2458
2459 int lmv_set_open_replay_data(struct obd_export *exp,
2460                              struct obd_client_handle *och,
2461                              struct ptlrpc_request *open_req)
2462 {
2463         struct obd_device *obd = exp->exp_obd;
2464         struct lmv_obd *lmv = &obd->u.lmv;
2465         struct obd_export *tgt_exp;
2466
2467         ENTRY;
2468
2469         tgt_exp = lmv_get_export(lmv, och->och_fid);
2470         if (IS_ERR(tgt_exp))
2471                 RETURN(PTR_ERR(tgt_exp));
2472
2473         RETURN(md_set_open_replay_data(tgt_exp, och, open_req));
2474 }
2475
2476 int lmv_clear_open_replay_data(struct obd_export *exp,
2477                                struct obd_client_handle *och)
2478 {
2479         struct obd_device *obd = exp->exp_obd;
2480         struct lmv_obd *lmv = &obd->u.lmv;
2481         struct obd_export *tgt_exp;
2482         ENTRY;
2483
2484         tgt_exp = lmv_get_export(lmv, och->och_fid);
2485         if (IS_ERR(tgt_exp))
2486                 RETURN(PTR_ERR(tgt_exp));
2487
2488         RETURN(md_clear_open_replay_data(tgt_exp, och));
2489 }
2490
2491 static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2492                                struct obd_capa *oc,
2493                                struct ptlrpc_request **request)
2494 {
2495         struct obd_device *obd = exp->exp_obd;
2496         struct lmv_obd *lmv = &obd->u.lmv;
2497         struct obd_export *tgt_exp;
2498         int rc;
2499
2500         ENTRY;
2501
2502         rc = lmv_check_connect(obd);
2503         if (rc)
2504                 RETURN(rc);
2505
2506         tgt_exp = lmv_get_export(lmv, fid);
2507         if (IS_ERR(tgt_exp))
2508                 RETURN(PTR_ERR(tgt_exp));
2509
2510         rc = md_get_remote_perm(tgt_exp, fid, oc, request);
2511
2512         RETURN(rc);
2513 }
2514
2515 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2516                           renew_capa_cb_t cb)
2517 {
2518         struct obd_device *obd = exp->exp_obd;
2519         struct lmv_obd *lmv = &obd->u.lmv;
2520         struct obd_export *tgt_exp;
2521         int rc;
2522         ENTRY;
2523
2524         rc = lmv_check_connect(obd);
2525         if (rc)
2526                 RETURN(rc);
2527
2528         tgt_exp = lmv_get_export(lmv, &oc->c_capa.lc_fid);
2529         if (IS_ERR(tgt_exp))
2530                 RETURN(PTR_ERR(tgt_exp));
2531
2532         rc = md_renew_capa(tgt_exp, oc, cb);
2533         RETURN(rc);
2534 }
2535
2536 struct obd_ops lmv_obd_ops = {
2537         .o_owner                = THIS_MODULE,
2538         .o_setup                = lmv_setup,
2539         .o_cleanup              = lmv_cleanup,
2540         .o_precleanup           = lmv_precleanup,
2541         .o_process_config       = lmv_process_config,
2542         .o_connect              = lmv_connect,
2543         .o_disconnect           = lmv_disconnect,
2544         .o_statfs               = lmv_statfs,
2545         .o_llog_init            = lmv_llog_init,
2546         .o_llog_finish          = lmv_llog_finish,
2547         .o_get_info             = lmv_get_info,
2548         .o_set_info_async       = lmv_set_info_async,
2549         .o_packmd               = lmv_packmd,
2550         .o_unpackmd             = lmv_unpackmd,
2551         .o_notify               = lmv_notify,
2552         .o_fid_init             = lmv_fid_init,
2553         .o_fid_fini             = lmv_fid_fini,
2554         .o_fid_alloc            = lmv_fid_alloc,
2555         .o_fid_delete           = lmv_fid_delete,
2556         .o_iocontrol            = lmv_iocontrol
2557 };
2558
2559 struct md_ops lmv_md_ops = {
2560         .m_getstatus            = lmv_getstatus,
2561         .m_change_cbdata        = lmv_change_cbdata,
2562         .m_close                = lmv_close,
2563         .m_create               = lmv_create,
2564         .m_done_writing         = lmv_done_writing,
2565         .m_enqueue              = lmv_enqueue,
2566         .m_getattr              = lmv_getattr,
2567         .m_getxattr             = lmv_getxattr,
2568         .m_getattr_name         = lmv_getattr_name,
2569         .m_intent_lock          = lmv_intent_lock,
2570         .m_link                 = lmv_link,
2571         .m_rename               = lmv_rename,
2572         .m_setattr              = lmv_setattr,
2573         .m_setxattr             = lmv_setxattr,
2574         .m_sync                 = lmv_sync,
2575         .m_readpage             = lmv_readpage,
2576         .m_unlink               = lmv_unlink,
2577         .m_init_ea_size         = lmv_init_ea_size,
2578         .m_cancel_unused        = lmv_cancel_unused,
2579         .m_set_lock_data        = lmv_set_lock_data,
2580         .m_lock_match           = lmv_lock_match,
2581         .m_get_lustre_md        = lmv_get_lustre_md,
2582         .m_free_lustre_md       = lmv_free_lustre_md,
2583         .m_set_open_replay_data = lmv_set_open_replay_data,
2584         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2585         .m_get_remote_perm      = lmv_get_remote_perm,
2586         .m_renew_capa           = lmv_renew_capa
2587 };
2588
2589 int __init lmv_init(void)
2590 {
2591         struct lprocfs_static_vars lvars;
2592         int rc;
2593
2594         obj_cache = kmem_cache_create("lmv_objects",
2595                                       sizeof(struct lmv_obj),
2596                                       0, 0, NULL, NULL);
2597         if (!obj_cache) {
2598                 CERROR("error allocating lmv objects cache\n");
2599                 return -ENOMEM;
2600         }
2601
2602         lprocfs_init_vars(lmv, &lvars);
2603         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2604                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2605         if (rc)
2606                 kmem_cache_destroy(obj_cache);
2607
2608         return rc;
2609 }
2610
2611 #ifdef __KERNEL__
2612 static void lmv_exit(void)
2613 {
2614         class_unregister_type(LUSTRE_LMV_NAME);
2615
2616         LASSERTF(kmem_cache_destroy(obj_cache) == 0,
2617                  "can't free lmv objects cache, %d object(s)"
2618                  "still in use\n", atomic_read(&obj_cache_count));
2619 }
2620
2621 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2622 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2623 MODULE_LICENSE("GPL");
2624
2625 module_init(lmv_init);
2626 module_exit(lmv_exit);
2627 #endif