Whamcloud - gitweb
942e6b16bd20ac301bc079889bcfdd119d0e897b
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <linux/mm.h>
33 #include <asm/div64.h>
34 #include <linux/seq_file.h>
35 #include <linux/namei.h>
36 #else
37 #include <liblustre.h>
38 #endif
39 #include <linux/ext2_fs.h>
40
41 #include <lustre/lustre_idl.h>
42 #include <lustre_log.h>
43 #include <obd_support.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48 #include <lustre_lite.h>
49 #include <lustre_fid.h>
50 #include "lmv_internal.h"
51
52 /* not defined for liblustre building */
53 #if !defined(ATOMIC_INIT)
54 #define ATOMIC_INIT(val) { (val) }
55 #endif
56
57 /* Object cache and its companion counter. NOTE(review): presumably a slab cache of LMV objects plus a count of live objects; confirm against the code that uses them. */
58 kmem_cache_t *obj_cache;
59 atomic_t obj_cache_count = ATOMIC_INIT(0);
60
61 static void lmv_activate_target(struct lmv_obd *lmv,
62                                 struct lmv_tgt_desc *tgt,
63                                 int activate)
64 {
65         if (tgt->ltd_active == activate)
66                 return;
67
68         tgt->ltd_active = activate;
69         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
70 }
71
72 /* Error codes:
73  *
74  *  -EINVAL  : UUID can't be found in the LMV's target list
75  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
76  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
77  */
78 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
79                               int activate)
80 {
81         struct lmv_tgt_desc *tgt;
82         struct obd_device *obd;
83         int i, rc = 0;
84         ENTRY;
85
86         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
87                lmv, uuid->uuid, activate);
88
89         spin_lock(&lmv->lmv_lock);
90         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
91                 if (tgt->ltd_exp == NULL)
92                         continue;
93
94                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
95                        i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
96
97                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
98                         break;
99         }
100
101         if (i == lmv->desc.ld_tgt_count)
102                 GOTO(out_lmv_lock, rc = -EINVAL);
103
104         obd = class_exp2obd(tgt->ltd_exp);
105         if (obd == NULL)
106                 GOTO(out_lmv_lock, rc = -ENOTCONN);
107
108         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
109                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
110                obd->obd_type->typ_name, i);
111         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
112
113         if (tgt->ltd_active == activate) {
114                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
115                        activate ? "" : "in");
116                 GOTO(out_lmv_lock, rc);
117         }
118
119         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
120                obd, activate ? "" : "in");
121
122         lmv_activate_target(lmv, tgt, activate);
123
124         EXIT;
125
126  out_lmv_lock:
127         spin_unlock(&lmv->lmv_lock);
128         return rc;
129 }
130
131 static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
132                             struct obd_connect_data *data)
133 {
134         struct lmv_tgt_desc *tgt;
135         int i;
136         ENTRY;
137
138         LASSERT(data != NULL);
139
140         spin_lock(&lmv->lmv_lock);
141         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
142                 if (tgt->ltd_exp == NULL)
143                         continue;
144
145                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
146                         lmv->datas[tgt->ltd_idx] = *data;
147                         break;
148                 }
149         }
150         spin_unlock(&lmv->lmv_lock);
151         RETURN(0);
152 }
153
154 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
155                       enum obd_notify_event ev, void *data)
156 {
157         struct lmv_obd *lmv = &obd->u.lmv;
158         struct obd_uuid *uuid;
159         int rc = 0;
160         ENTRY;
161
162         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
163                 CERROR("unexpected notification of %s %s!\n",
164                        watched->obd_type->typ_name,
165                        watched->obd_name);
166                 RETURN(-EINVAL);
167         }
168
169         uuid = &watched->u.cli.cl_target_uuid;
170         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
171                 /*
172                  * Set MDC as active before notifying the observer, so the
173                  * observer can use the MDC normally.
174                  */
175                 rc = lmv_set_mdc_active(lmv, uuid,
176                                         ev == OBD_NOTIFY_ACTIVE);
177                 if (rc) {
178                         CERROR("%sactivation of %s failed: %d\n",
179                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
180                                uuid->uuid, rc);
181                         RETURN(rc);
182                 }
183         }
184
185         if (ev == OBD_NOTIFY_OCD) {
186                 struct obd_connect_data *conn_data =
187                         &watched->u.cli.cl_import->imp_connect_data;
188                 
189                 /* Set connect data to desired target, update
190                  * exp_connect_flags. */
191                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
192                 if (rc) {
193                         CERROR("can't set connect data to target %s, rc %d\n",
194                                uuid->uuid, rc);
195                         RETURN(rc);
196                 }
197
198                 /*
199                  * XXX: Make sure that ocd_connect_flags from all targets are
200                  * the same. Otherwise one of MDTs runs wrong version or
201                  * something like this.  --umka
202                  */
203                 obd->obd_self_export->exp_connect_flags =
204                         conn_data->ocd_connect_flags;
205         }
206
207         /* Pass the notification up the chain. */
208         if (obd->obd_observer)
209                 rc = obd_notify(obd->obd_observer, watched, ev, data);
210
211         RETURN(rc);
212 }
213
214 /* this is fake connect function. Its purpose is to initialize lmv and say
215  * caller that everything is okay. Real connection will be performed later. */
216 static int lmv_connect(const struct lu_env *env,
217                        struct lustre_handle *conn, struct obd_device *obd,
218                        struct obd_uuid *cluuid, struct obd_connect_data *data)
219 {
220 #ifdef __KERNEL__
221         struct proc_dir_entry *lmv_proc_dir;
222 #endif
223         struct lmv_obd *lmv = &obd->u.lmv;
224         struct obd_export *exp;
225         int rc = 0;
226         ENTRY;
227
228         rc = class_connect(conn, obd, cluuid);
229         if (rc) {
230                 CERROR("class_connection() returned %d\n", rc);
231                 RETURN(rc);
232         }
233
234         exp = class_conn2export(conn);
235
236         /* we don't want to actually do the underlying connections more than
237          * once, so keep track. */
238         lmv->refcount++;
239         if (lmv->refcount > 1) {
240                 class_export_put(exp);
241                 RETURN(0);
242         }
243
244         lmv->exp = exp;
245         lmv->connected = 0;
246         lmv->cluuid = *cluuid;
247
248         if (data)
249                 lmv->conn_data = *data;
250
251 #ifdef __KERNEL__
252         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
253                                         NULL, NULL);
254         if (IS_ERR(lmv_proc_dir)) {
255                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
256                        obd->obd_type->typ_name, obd->obd_name);
257                 lmv_proc_dir = NULL;
258         }
259 #endif
260
261         /* all real clients should perform actual connection right away, because
262          * it is possible, that LMV will not have opportunity to connect targets
263          * and MDC stuff will be called directly, for instance while reading
264          * ../mdc/../kbytesfree procfs file, etc. */
265         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
266                 rc = lmv_check_connect(obd);
267
268 #ifdef __KERNEL__
269         if (rc) {
270                 if (lmv_proc_dir)
271                         lprocfs_remove(lmv_proc_dir);
272         }
273 #endif
274
275         RETURN(rc);
276 }
277
278 static void lmv_set_timeouts(struct obd_device *obd)
279 {
280         struct lmv_tgt_desc *tgts;
281         struct lmv_obd *lmv;
282         int i;
283
284         lmv = &obd->u.lmv;
285         if (lmv->server_timeout == 0)
286                 return;
287
288         if (lmv->connected == 0)
289                 return;
290
291         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
292                 if (tgts->ltd_exp == NULL)
293                         continue;
294
295                 obd_set_info_async(tgts->ltd_exp, strlen("inter_mds"),
296                                    "inter_mds", 0, NULL, NULL);
297         }
298 }
299
300 static int lmv_init_ea_size(struct obd_export *exp, int easize,
301                             int def_easize, int cookiesize)
302 {
303         struct obd_device *obd = exp->exp_obd;
304         struct lmv_obd *lmv = &obd->u.lmv;
305         int i, rc = 0, change = 0;
306         ENTRY;
307
308         if (lmv->max_easize < easize) {
309                 lmv->max_easize = easize;
310                 change = 1;
311         }
312         if (lmv->max_def_easize < def_easize) {
313                 lmv->max_def_easize = def_easize;
314                 change = 1;
315         }
316         if (lmv->max_cookiesize < cookiesize) {
317                 lmv->max_cookiesize = cookiesize;
318                 change = 1;
319         }
320         if (change == 0)
321                 RETURN(0);
322
323         if (lmv->connected == 0)
324                 RETURN(0);
325
326         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
327                 if (lmv->tgts[i].ltd_exp == NULL) {
328                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
329                         continue;
330                 }
331
332                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
333                                      cookiesize);
334                 if (rc) {
335                         CERROR("obd_init_ea_size() failed on MDT target %d, "
336                                "error %d.\n", i, rc);
337                         break;
338                 }
339         }
340         RETURN(rc);
341 }
342
343 #define MAX_STRING_SIZE 128
344
345 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
346 {
347         struct lmv_obd *lmv = &obd->u.lmv;
348         struct obd_uuid *cluuid = &lmv->cluuid;
349         struct obd_connect_data *mdc_data = NULL;
350         struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
351         struct lustre_handle conn = {0, };
352         struct obd_device *mdc_obd;
353         struct obd_export *mdc_exp;
354         struct lu_fld_target target;
355         int rc;
356 #ifdef __KERNEL__
357         struct proc_dir_entry *lmv_proc_dir;
358 #endif
359         ENTRY;
360
361         /* for MDS: don't connect to yourself */
362         if (obd_uuid_equals(&tgt->ltd_uuid, cluuid)) {
363                 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
364                 /* XXX - the old code didn't increment active tgt count.
365                  *       should we ? */
366                 RETURN(0);
367         }
368
369         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
370                                         &obd->obd_uuid);
371         if (!mdc_obd) {
372                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
373                 RETURN(-EINVAL);
374         }
375
376         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
377                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
378                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
379                 cluuid->uuid);
380
381         if (!mdc_obd->obd_set_up) {
382                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
383                 RETURN(-EINVAL);
384         }
385
386         rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
387                          &lmv->conn_data);
388         if (rc) {
389                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
390                 RETURN(rc);
391         }
392
393         mdc_exp = class_conn2export(&conn);
394
395         /* Init fid sequence client for this mdc. */
396         rc = obd_fid_init(mdc_exp);
397         if (rc)
398                 RETURN(rc);
399
400         /* Add new FLD target. */
401         target.ft_srv = NULL;
402         target.ft_exp = mdc_exp;
403         target.ft_idx = tgt->ltd_idx;
404
405         fld_client_add_target(&lmv->lmv_fld, &target);
406
407         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
408
409         rc = obd_register_observer(mdc_obd, obd);
410         if (rc) {
411                 obd_disconnect(mdc_exp);
412                 CERROR("target %s register_observer error %d\n",
413                        tgt->ltd_uuid.uuid, rc);
414                 RETURN(rc);
415         }
416
417         if (obd->obd_observer) {
418                 /* tell the mds_lmv about the new target */
419                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
420                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
421                 if (rc) {
422                         obd_disconnect(mdc_exp);
423                         RETURN(rc);
424                 }
425         }
426
427         tgt->ltd_active = 1;
428         tgt->ltd_exp = mdc_exp;
429         lmv->desc.ld_active_tgt_count++;
430
431         /* copy connect data, it may be used later */
432         lmv->datas[tgt->ltd_idx] = *mdc_data;
433
434         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
435                         lmv->max_def_easize, lmv->max_cookiesize);
436
437         CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
438                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
439                 atomic_read(&obd->obd_refcount));
440
441 #ifdef __KERNEL__
442         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
443         if (lmv_proc_dir) {
444                 struct proc_dir_entry *mdc_symlink;
445                 char name[MAX_STRING_SIZE + 1];
446
447                 LASSERT(mdc_obd->obd_type != NULL);
448                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
449                 name[MAX_STRING_SIZE] = '\0';
450                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
451                          mdc_obd->obd_type->typ_name,
452                          mdc_obd->obd_name);
453                 mdc_symlink = proc_symlink(mdc_obd->obd_name,
454                                            lmv_proc_dir, name);
455                 if (mdc_symlink == NULL) {
456                         CERROR("could not register LMV target "
457                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
458                                obd->obd_type->typ_name, obd->obd_name,
459                                mdc_obd->obd_name);
460                         lprocfs_remove(lmv_proc_dir);
461                         lmv_proc_dir = NULL;
462                 }
463         }
464 #endif
465         RETURN(0);
466 }
467
468 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
469 {
470         struct lmv_obd *lmv = &obd->u.lmv;
471         struct lmv_tgt_desc *tgt;
472         int rc = 0;
473         ENTRY;
474
475         CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
476
477         lmv_init_lock(lmv);
478
479         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
480                 lmv_init_unlock(lmv);
481                 CERROR("can't add %s, LMV module compiled for %d MDCs. "
482                        "That many MDCs already configured.\n",
483                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
484                 RETURN(-EINVAL);
485         }
486         if (lmv->desc.ld_tgt_count == 0) {
487                 struct obd_device *mdc_obd;
488
489                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
490                                                 &obd->obd_uuid);
491                 if (!mdc_obd) {
492                         lmv_init_unlock(lmv);
493                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
494                         RETURN(-EINVAL);
495                 }
496
497                 rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
498                 if (rc) {
499                         lmv_init_unlock(lmv);
500                         CERROR("lmv failed to setup llogging subsystems\n");
501                 }
502         }
503         spin_lock(&lmv->lmv_lock);
504         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
505         tgt->ltd_uuid = *tgt_uuid;
506         spin_unlock(&lmv->lmv_lock);
507
508         if (lmv->connected) {
509                 rc = lmv_connect_mdc(obd, tgt);
510                 if (rc) {
511                         spin_lock(&lmv->lmv_lock);
512                         lmv->desc.ld_tgt_count--;
513                         memset(tgt, 0, sizeof(*tgt));
514                         spin_unlock(&lmv->lmv_lock);
515                 } else {
516                         int easize = sizeof(struct lmv_stripe_md) +
517                                      lmv->desc.ld_tgt_count *
518                                      sizeof(struct lu_fid);
519                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
520                 }
521         }
522
523         lmv_init_unlock(lmv);
524         RETURN(rc);
525 }
526
527 /* performs a check if passed obd is connected. If no - connect it. */
528 int lmv_check_connect(struct obd_device *obd)
529 {
530         struct lmv_obd *lmv = &obd->u.lmv;
531         struct lmv_tgt_desc *tgt;
532         int i, rc, easize;
533         ENTRY;
534
535         if (lmv->connected)
536                 RETURN(0);
537
538         lmv_init_lock(lmv);
539         if (lmv->connected) {
540                 lmv_init_unlock(lmv);
541                 RETURN(0);
542         }
543
544         if (lmv->desc.ld_tgt_count == 0) {
545                 CERROR("%s: no targets configured.\n", obd->obd_name);
546                 RETURN(-EINVAL);
547         }
548
549         CDEBUG(D_CONFIG, "time to connect %s to %s\n",
550                lmv->cluuid.uuid, obd->obd_name);
551
552         LASSERT(lmv->tgts != NULL);
553
554         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
555                 rc = lmv_connect_mdc(obd, tgt);
556                 if (rc)
557                         GOTO(out_disc, rc);
558         }
559
560         lmv_set_timeouts(obd);
561         class_export_put(lmv->exp);
562         lmv->connected = 1;
563         easize = lmv_get_easize(lmv);
564         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
565         lmv_init_unlock(lmv);
566         RETURN(0);
567
568  out_disc:
569         while (i-- > 0) {
570                 int rc2;
571                 --tgt;
572                 tgt->ltd_active = 0;
573                 if (tgt->ltd_exp) {
574                         --lmv->desc.ld_active_tgt_count;
575                         rc2 = obd_disconnect(tgt->ltd_exp);
576                         if (rc2) {
577                                 CERROR("error: LMV target %s disconnect on "
578                                        "MDC idx %d: error %d\n",
579                                        tgt->ltd_uuid.uuid, i, rc2);
580                         }
581                 }
582         }
583         class_disconnect(lmv->exp);
584         lmv_init_unlock(lmv);
585         RETURN(rc);
586 }
587
588 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
589 {
590 #ifdef __KERNEL__
591         struct proc_dir_entry *lmv_proc_dir;
592 #endif
593         struct lmv_obd *lmv = &obd->u.lmv;
594         struct obd_device *mdc_obd;
595         int rc;
596         ENTRY;
597
598         LASSERT(tgt != NULL);
599         LASSERT(obd != NULL);
600
601         mdc_obd = class_exp2obd(tgt->ltd_exp);
602
603         if (mdc_obd)
604                 mdc_obd->obd_no_recov = obd->obd_no_recov;
605
606 #ifdef __KERNEL__
607         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
608         if (lmv_proc_dir) {
609                 struct proc_dir_entry *mdc_symlink;
610
611                 mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
612                 if (mdc_symlink) {
613                         lprocfs_remove(mdc_symlink);
614                 } else {
615                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
616                                obd->obd_type->typ_name, obd->obd_name,
617                                mdc_obd->obd_name);
618                 }
619         }
620 #endif
621         rc = obd_fid_fini(tgt->ltd_exp);
622         if (rc)
623                 CERROR("Can't finanize fids factory\n");
624
625         CDEBUG(D_OTHER, "Disconnected from %s(%s) successfully\n",
626                tgt->ltd_exp->exp_obd->obd_name,
627                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
628
629         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
630         rc = obd_disconnect(tgt->ltd_exp);
631         if (rc) {
632                 if (tgt->ltd_active) {
633                         CERROR("Target %s disconnect error %d\n",
634                                tgt->ltd_uuid.uuid, rc);
635                 }
636         }
637
638         lmv_activate_target(lmv, tgt, 0);
639         tgt->ltd_exp = NULL;
640         RETURN(0);
641 }
642
643 static int lmv_disconnect(struct obd_export *exp)
644 {
645         struct obd_device *obd = class_exp2obd(exp);
646 #ifdef __KERNEL__
647         struct proc_dir_entry *lmv_proc_dir;
648 #endif
649         struct lmv_obd *lmv = &obd->u.lmv;
650         int rc, i;
651         ENTRY;
652
653         if (!lmv->tgts)
654                 goto out_local;
655
656         /* Only disconnect the underlying layers on the final disconnect. */
657         lmv->refcount--;
658         if (lmv->refcount != 0)
659                 goto out_local;
660
661         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
662                 if (lmv->tgts[i].ltd_exp == NULL)
663                         continue;
664                 lmv_disconnect_mdc(obd, &lmv->tgts[i]);
665         }
666
667 #ifdef __KERNEL__
668         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
669         if (lmv_proc_dir) {
670                 lprocfs_remove(lmv_proc_dir);
671         } else {
672                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
673                        obd->obd_type->typ_name, obd->obd_name);
674         }
675 #endif
676
677 out_local:
678         /*
679          * This is the case when no real connection is established by
680          * lmv_check_connect().
681          */
682         if (!lmv->connected)
683                 class_export_put(exp);
684         rc = class_disconnect(exp);
685         if (lmv->refcount == 0)
686                 lmv->connected = 0;
687         RETURN(rc);
688 }
689
690 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
691                          int len, void *karg, void *uarg)
692 {
693         struct obd_device *obddev = class_exp2obd(exp);
694         struct lmv_obd *lmv = &obddev->u.lmv;
695         int i, rc = 0, set = 0;
696         ENTRY;
697
698         if (lmv->desc.ld_tgt_count == 0)
699                 RETURN(-ENOTTY);
700
701         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
702                 int err;
703
704                 if (lmv->tgts[i].ltd_exp == NULL)
705                         continue;
706
707                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
708                 if (err) {
709                         if (lmv->tgts[i].ltd_active) {
710                                 CERROR("error: iocontrol MDC %s on MDT"
711                                        "idx %d: err = %d\n",
712                                        lmv->tgts[i].ltd_uuid.uuid, i, err);
713                                 if (!rc)
714                                         rc = err;
715                         }
716                 } else
717                         set = 1;
718         }
719         if (!set && !rc)
720                 rc = -EIO;
721
722         RETURN(rc);
723 }
724
725 /* assume all is balanced for now */
726 static int lmv_fids_balanced(struct obd_device *obd)
727 {
728         ENTRY;
729         RETURN(1);
730 }
731
732 static int lmv_all_chars_policy(int count, struct qstr *name)
733 {
734         unsigned int c = 0;
735         unsigned int len = name->len;
736
737         while (len > 0)
738                 c += name->name[-- len];
739         c = c % count;
740         return c;
741 }
742
743 static int lmv_placement_policy(struct obd_device *obd,
744                                 struct lu_placement_hint *hint,
745                                 mdsno_t *mds)
746 {
747         struct lmv_obd *lmv = &obd->u.lmv;
748         struct lmv_obj *obj;
749         int rc;
750         ENTRY;
751
752         LASSERT(mds != NULL);
753
754         /* Here are some policies to allocate new fid */
755         if (lmv_fids_balanced(obd)) {
756                 /*
757                  * Allocate new fid basing on its name in the case fids are
758                  * balanced, that is all sequences have more or less equal
759                  * number of objects created.
760                  */
761                 obj = lmv_obj_grab(obd, hint->ph_pfid);
762                 if (obj) {
763                         struct lu_fid *rpid;
764                         int mea_idx;
765
766                         /*
767                          * If the dir got split, alloc fid according to its
768                          * hash. No matter what we create, object create should
769                          * go to correct MDS.
770                          */
771                         mea_idx = raw_name2idx(obj->lo_hashtype,
772                                                obj->lo_objcount,
773                                                hint->ph_cname->name,
774                                                hint->ph_cname->len);
775                         rpid = &obj->lo_inodes[mea_idx].li_fid;
776                         *mds = obj->lo_inodes[mea_idx].li_mds;
777                         lmv_obj_put(obj);
778                         rc = 0;
779
780                         CDEBUG(D_INODE, "The obj "DFID" has been split, got "
781                                "MDS at "LPU64" by name %s\n", PFID(hint->ph_pfid),
782                                *mds, hint->ph_cname->name);
783                 } else if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
784                         /* Default policy for directories. */
785                         *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
786                                                     hint->ph_cname);
787                         rc = 0;
788                 } else {
789                         /*
790                          * Default policy for others is to use parent MDS.
791                          * ONLY directories can be cross-ref during creation.
792                          */
793                         rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
794                 }
795         } else {
796                 /*
797                  * Sequences among all tgts are not well balanced, allocate new
798                  * fid taking this into account to balance them. Not implemented
799                  * yet!
800                  */
801                 *mds = 0;
802                 rc = -ENOSYS;
803         }
804
805         if (rc) {
806                 CERROR("Can't choose MDS, err = %d\n", rc);
807         } else {
808                 LASSERT(*mds < lmv->desc.ld_tgt_count);
809         }
810
811         RETURN(rc);
812 }
813
814 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
815                     mdsno_t mds)
816 {
817         struct lmv_tgt_desc *tgt = &lmv->tgts[mds];
818         int rc;
819         ENTRY;
820
821         /* New seq alloc and FLD setup should be atomic. */
822         down(&tgt->ltd_fid_sem);
823
824         /* Asking underlaying tgt layer to allocate new fid. */
825         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
826         if (rc > 0) {
827                 LASSERT(fid_is_sane(fid));
828
829                 /* Client switches to new sequence, setup FLD. */
830                 rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
831                                        mds, NULL);
832                 if (rc) {
833                         CERROR("Can't create fld entry, rc %d\n", rc);
834                 }
835         }
836
837         up(&tgt->ltd_fid_sem);
838         RETURN(rc);
839 }
840
841 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
842                          struct lu_placement_hint *hint)
843 {
844         struct obd_device *obd = class_exp2obd(exp);
845         struct lmv_obd *lmv = &obd->u.lmv;
846         mdsno_t mds;
847         int rc;
848         ENTRY;
849
850         LASSERT(hint != NULL);
851         LASSERT(fid != NULL);
852
853         rc = lmv_placement_policy(obd, hint, &mds);
854         if (rc) {
855                 CERROR("Can't get target for allocating fid, rc %d\n", rc);
856                 RETURN(rc);
857         }
858
859         rc = __lmv_fid_alloc(lmv, fid, mds);
860         if (rc) {
861                 CERROR("Can't alloc new fid, rc %d\n", rc);
862                 RETURN(rc);
863         }
864
865         RETURN(rc);
866 }
867
868 static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
869 {
870         ENTRY;
871
872         LASSERT(exp && fid);
873         if (lmv_obj_delete(exp, fid)) {
874                 CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
875                        PFID(fid));
876         }
877         RETURN(0);
878 }
879
880 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
881 {
882         struct lmv_obd *lmv = &obd->u.lmv;
883         struct lprocfs_static_vars lvars;
884         struct lmv_desc *desc;
885         int rc, i = 0;
886         ENTRY;
887
888         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
889                 CERROR("LMV setup requires a descriptor\n");
890                 RETURN(-EINVAL);
891         }
892
893         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
894         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
895                 CERROR("descriptor size wrong: %d > %d\n",
896                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
897                 RETURN(-EINVAL);
898         }
899
900         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
901
902         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
903         if (lmv->tgts == NULL)
904                 RETURN(-ENOMEM);
905
906         for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
907                 sema_init(&lmv->tgts[i].ltd_fid_sem, 1);
908                 lmv->tgts[i].ltd_idx = i;
909         }
910
911         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
912
913         OBD_ALLOC(lmv->datas, lmv->datas_size);
914         if (lmv->datas == NULL)
915                 GOTO(out_free_tgts, rc = -ENOMEM);
916
917         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
918         lmv->desc.ld_tgt_count = 0;
919         lmv->desc.ld_active_tgt_count = 0;
920         lmv->max_cookiesize = 0;
921         lmv->max_def_easize = 0;
922         lmv->max_easize = 0;
923
924         spin_lock_init(&lmv->lmv_lock);
925         sema_init(&lmv->init_sem, 1);
926
927         rc = lmv_obj_setup(obd);
928         if (rc) {
929                 CERROR("Can't setup LMV object manager, "
930                        "error %d.\n", rc);
931                 GOTO(out_free_datas, rc);
932         }
933
934         lprocfs_init_vars(lmv, &lvars);
935         lprocfs_obd_setup(obd, lvars.obd_vars);
936 #ifdef LPROCFS
937         {
938                 struct proc_dir_entry *entry;
939
940                 entry = create_proc_entry("target_obd_status", 0444,
941                                           obd->obd_proc_entry);
942                 if (entry != NULL) {
943                         entry->proc_fops = &lmv_proc_target_fops;
944                         entry->data = obd;
945                 }
946        }
947 #endif
948         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
949                              LUSTRE_CLI_FLD_HASH_DHT);
950         if (rc) {
951                 CERROR("can't init FLD, err %d\n",
952                        rc);
953                 GOTO(out_free_datas, rc);
954         }
955
956         RETURN(0);
957
958 out_free_datas:
959         OBD_FREE(lmv->datas, lmv->datas_size);
960         lmv->datas = NULL;
961 out_free_tgts:
962         OBD_FREE(lmv->tgts, lmv->tgts_size);
963         lmv->tgts = NULL;
964         return rc;
965 }
966
967 static int lmv_cleanup(struct obd_device *obd)
968 {
969         struct lmv_obd *lmv = &obd->u.lmv;
970         ENTRY;
971
972         fld_client_fini(&lmv->lmv_fld);
973         lprocfs_obd_cleanup(obd);
974         lmv_obj_cleanup(obd);
975         OBD_FREE(lmv->datas, lmv->datas_size);
976         OBD_FREE(lmv->tgts, lmv->tgts_size);
977
978         RETURN(0);
979 }
980
981 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
982 {
983         struct lustre_cfg *lcfg = buf;
984         struct obd_uuid tgt_uuid;
985         int rc;
986         ENTRY;
987
988         switch(lcfg->lcfg_command) {
989         case LCFG_ADD_MDC:
990                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
991                         GOTO(out, rc = -EINVAL);
992
993                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
994                 rc = lmv_add_target(obd, &tgt_uuid);
995                 GOTO(out, rc);
996         default: {
997                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
998                 GOTO(out, rc = -EINVAL);
999         }
1000         }
1001 out:
1002         RETURN(rc);
1003 }
1004
1005 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1006                       __u64 max_age)
1007 {
1008         struct lmv_obd *lmv = &obd->u.lmv;
1009         struct obd_statfs *temp;
1010         int rc = 0, i;
1011         ENTRY;
1012
1013         rc = lmv_check_connect(obd);
1014         if (rc)
1015                 RETURN(rc);
1016
1017         OBD_ALLOC(temp, sizeof(*temp));
1018         if (temp == NULL)
1019                 RETURN(-ENOMEM);
1020
1021         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1022                 if (lmv->tgts[i].ltd_exp == NULL)
1023                         continue;
1024
1025                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
1026                 if (rc) {
1027                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1028                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
1029                                rc);
1030                         GOTO(out_free_temp, rc);
1031                 }
1032                 if (i == 0) {
1033                         *osfs = *temp;
1034                 } else {
1035                         osfs->os_bavail += temp->os_bavail;
1036                         osfs->os_blocks += temp->os_blocks;
1037                         osfs->os_ffree += temp->os_ffree;
1038                         osfs->os_files += temp->os_files;
1039                 }
1040         }
1041
1042         EXIT;
1043 out_free_temp:
1044         OBD_FREE(temp, sizeof(*temp));
1045         return rc;
1046 }
1047
1048 static int lmv_getstatus(struct obd_export *exp,
1049                          struct lu_fid *fid,
1050                          struct obd_capa **pc)
1051 {
1052         struct obd_device *obd = exp->exp_obd;
1053         struct lmv_obd *lmv = &obd->u.lmv;
1054         int rc;
1055         ENTRY;
1056
1057         rc = lmv_check_connect(obd);
1058         if (rc)
1059                 RETURN(rc);
1060
1061         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
1062
1063         RETURN(rc);
1064 }
1065
1066 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1067                         struct obd_capa *oc, obd_valid valid, const char *name,
1068                         const char *input, int input_size, int output_size,
1069                         int flags, struct ptlrpc_request **request)
1070 {
1071         struct obd_device *obd = exp->exp_obd;
1072         struct lmv_obd *lmv = &obd->u.lmv;
1073         struct obd_export *tgt_exp;
1074         int rc;
1075         ENTRY;
1076
1077         rc = lmv_check_connect(obd);
1078         if (rc)
1079                 RETURN(rc);
1080
1081         tgt_exp = lmv_find_export(lmv, fid);
1082         if (IS_ERR(tgt_exp))
1083                 RETURN(PTR_ERR(tgt_exp));
1084
1085         rc = md_getxattr(tgt_exp, fid, oc, valid, name, input, input_size,
1086                          output_size, flags, request);
1087
1088         RETURN(rc);
1089 }
1090
1091 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1092                         struct obd_capa *oc, obd_valid valid, const char *name,
1093                         const char *input, int input_size, int output_size,
1094                         int flags, struct ptlrpc_request **request)
1095 {
1096         struct obd_device *obd = exp->exp_obd;
1097         struct lmv_obd *lmv = &obd->u.lmv;
1098         struct obd_export *tgt_exp;
1099         int rc;
1100         ENTRY;
1101
1102         rc = lmv_check_connect(obd);
1103         if (rc)
1104                 RETURN(rc);
1105
1106         tgt_exp = lmv_find_export(lmv, fid);
1107         if (IS_ERR(tgt_exp))
1108                 RETURN(PTR_ERR(tgt_exp));
1109
1110         rc = md_setxattr(tgt_exp, fid, oc, valid, name,
1111                          input, input_size, output_size, flags, request);
1112
1113         RETURN(rc);
1114 }
1115
1116 static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
1117                        struct obd_capa *oc, obd_valid valid, int ea_size,
1118                        struct ptlrpc_request **request)
1119 {
1120         struct obd_device *obd = exp->exp_obd;
1121         struct lmv_obd *lmv = &obd->u.lmv;
1122         struct obd_export *tgt_exp;
1123         struct lmv_obj *obj;
1124         int rc, i;
1125         ENTRY;
1126
1127         rc = lmv_check_connect(obd);
1128         if (rc)
1129                 RETURN(rc);
1130
1131         tgt_exp = lmv_find_export(lmv, fid);
1132         if (IS_ERR(tgt_exp))
1133                 RETURN(PTR_ERR(tgt_exp));
1134
1135         rc = md_getattr(tgt_exp, fid, oc, valid, ea_size, request);
1136         if (rc)
1137                 RETURN(rc);
1138
1139         obj = lmv_obj_grab(obd, fid);
1140
1141         CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
1142                PFID(fid), obj ? "(split)" : "");
1143
1144         /* if object is split, then we loop over all the slaves and gather size
1145          * attribute. In ideal world we would have to gather also mds field from
1146          * all slaves, as object is spread over the cluster and this is
1147          * definitely interesting information and it is not good to loss it,
1148          * but... */
1149         if (obj) {
1150                 struct mdt_body *body;
1151
1152                 if (*request == NULL) {
1153                         lmv_obj_put(obj);
1154                         RETURN(rc);
1155                 }
1156
1157                 body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
1158                                       sizeof(*body));
1159                 LASSERT(body != NULL);
1160
1161                 lmv_obj_lock(obj);
1162
1163                 for (i = 0; i < obj->lo_objcount; i++) {
1164                         if (lmv->tgts[i].ltd_exp == NULL) {
1165                                 CWARN("%s: NULL export for %d\n",
1166                                       obd->obd_name, i);
1167                                 continue;
1168                         }
1169
1170                         /* skip master obj. */
1171                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
1172                                 continue;
1173
1174                         body->size += obj->lo_inodes[i].li_size;
1175                 }
1176
1177                 lmv_obj_unlock(obj);
1178                 lmv_obj_put(obj);
1179         }
1180
1181         RETURN(rc);
1182 }
1183
1184 static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1185                              ldlm_iterator_t it, void *data)
1186 {
1187         struct obd_device *obd = exp->exp_obd;
1188         struct lmv_obd *lmv = &obd->u.lmv;
1189         int i, rc;
1190         ENTRY;
1191
1192         rc = lmv_check_connect(obd);
1193         if (rc)
1194                 RETURN(rc);
1195
1196         CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
1197
1198         /* with CMD every object can have two locks in different namespaces:
1199          * lookup lock in space of mds storing direntry and update/open lock in
1200          * space of mds storing inode */
1201         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1202                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1203
1204         RETURN(0);
1205 }
1206
1207 static int lmv_close(struct obd_export *exp,
1208                      struct md_op_data *op_data,
1209                      struct obd_client_handle *och,
1210                      struct ptlrpc_request **request)
1211 {
1212         struct obd_device *obd = exp->exp_obd;
1213         struct lmv_obd *lmv = &obd->u.lmv;
1214         struct obd_export *tgt_exp;
1215         int rc;
1216         ENTRY;
1217
1218         rc = lmv_check_connect(obd);
1219         if (rc)
1220                 RETURN(rc);
1221
1222         tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1223         if (IS_ERR(tgt_exp))
1224                 RETURN(PTR_ERR(tgt_exp));
1225
1226         CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1227         rc = md_close(tgt_exp, op_data, och, request);
1228         RETURN(rc);
1229 }
1230
1231 /*
1232  * Called in the case MDS returns -ERESTART on create on open, what means that
1233  * directory is split and its LMV presentation object has to be updated.
1234  */
1235 int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
1236 {
1237         struct obd_device *obd = exp->exp_obd;
1238         struct lmv_obd *lmv = &obd->u.lmv;
1239         struct ptlrpc_request *req = NULL;
1240         struct obd_export *tgt_exp;
1241         struct lmv_obj *obj;
1242         struct lustre_md md;
1243         int mealen, rc;
1244         __u64 valid;
1245         ENTRY;
1246
1247         md.mea = NULL;
1248         mealen = lmv_get_easize(lmv);
1249
1250         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1251
1252         tgt_exp = lmv_find_export(lmv, fid);
1253         if (IS_ERR(tgt_exp))
1254                 RETURN(PTR_ERR(tgt_exp));
1255
1256         /* time to update mea of parent fid */
1257         rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
1258         if (rc) {
1259                 CERROR("md_getattr() failed, error %d\n", rc);
1260                 GOTO(cleanup, rc);
1261         }
1262
1263         rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md);
1264         if (rc) {
1265                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
1266                 GOTO(cleanup, rc);
1267         }
1268
1269         if (md.mea == NULL)
1270                 GOTO(cleanup, rc = -ENODATA);
1271
1272         obj = lmv_obj_create(exp, fid, md.mea);
1273         if (IS_ERR(obj))
1274                 rc = PTR_ERR(obj);
1275         else
1276                 lmv_obj_put(obj);
1277
1278         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1279
1280         EXIT;
1281 cleanup:
1282         if (req)
1283                 ptlrpc_req_finished(req);
1284         return rc;
1285 }
1286
1287 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1288                const void *data, int datalen, int mode, __u32 uid,
1289                __u32 gid, __u32 cap_effective,  __u64 rdev,
1290                struct ptlrpc_request **request)
1291 {
1292         struct obd_device *obd = exp->exp_obd;
1293         struct lmv_obd *lmv = &obd->u.lmv;
1294         struct obd_export *tgt_exp;
1295         struct lmv_obj *obj;
1296         int rc, loop = 0;
1297         ENTRY;
1298
1299         rc = lmv_check_connect(obd);
1300         if (rc)
1301                 RETURN(rc);
1302
1303         if (!lmv->desc.ld_active_tgt_count)
1304                 RETURN(-EIO);
1305 repeat:
1306         LASSERT(++loop <= 2);
1307         obj = lmv_obj_grab(obd, &op_data->op_fid1);
1308         if (obj) {
1309                 int mea_idx;
1310
1311                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1312                                        op_data->op_name, op_data->op_namelen);
1313                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1314                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1315                 lmv_obj_put(obj);
1316         } else {
1317                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1318         }
1319
1320         if (IS_ERR(tgt_exp))
1321                 RETURN(PTR_ERR(tgt_exp));
1322
1323         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
1324                op_data->op_name, PFID(&op_data->op_fid1));
1325
1326         rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
1327                        cap_effective, rdev, request);
1328         if (rc == 0) {
1329                 if (*request == NULL)
1330                         RETURN(rc);
1331                 CDEBUG(D_OTHER, "created. "DFID"\n", PFID(&op_data->op_fid1));
1332         } else if (rc == -ERESTART) {
1333                 LASSERT(*request != NULL);
1334                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
1335                           "Got -ERESTART during create!\n");
1336                 ptlrpc_req_finished(*request);
1337                 *request = NULL;
1338                 /*
1339                  * Directory got split. Time to update local object and repeat
1340                  * the request with proper MDS.
1341                  */
1342                 rc = lmv_handle_split(exp, &op_data->op_fid1);
1343                 if (rc == 0) {
1344                         rc = lmv_alloc_slave_fids(obd, &op_data->op_fid1,
1345                                                   op_data, &op_data->op_fid2);
1346                         if (rc)
1347                                 RETURN(rc);
1348                         goto repeat;
1349                 }
1350         }
1351         RETURN(rc);
1352 }
1353
1354 static int lmv_done_writing(struct obd_export *exp,
1355                             struct md_op_data *op_data,
1356                             struct obd_client_handle *och)
1357 {
1358         struct obd_device *obd = exp->exp_obd;
1359         struct lmv_obd *lmv = &obd->u.lmv;
1360         struct obd_export *tgt_exp;
1361         int rc;
1362         ENTRY;
1363
1364         rc = lmv_check_connect(obd);
1365         if (rc)
1366                 RETURN(rc);
1367
1368         tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1369         if (IS_ERR(tgt_exp))
1370                 RETURN(PTR_ERR(tgt_exp));
1371
1372         rc = md_done_writing(tgt_exp, op_data, och);
1373         RETURN(rc);
1374 }
1375
1376 static int
1377 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1378                    struct lookup_intent *it, int lockmode,
1379                    struct md_op_data *op_data, struct lustre_handle *lockh,
1380                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1381                    ldlm_blocking_callback cb_blocking, void *cb_data)
1382 {
1383         struct obd_device *obd = exp->exp_obd;
1384         struct lmv_obd *lmv = &obd->u.lmv;
1385         struct lmv_stripe_md *mea = op_data->op_mea1;
1386         struct md_op_data *op_data2;
1387         struct obd_export *tgt_exp;
1388         int i, rc = 0;
1389         ENTRY;
1390
1391         OBD_ALLOC_PTR(op_data2);
1392         if (op_data2 == NULL)
1393                 RETURN(-ENOMEM);
1394
1395         LASSERT(mea != NULL);
1396         for (i = 0; i < mea->mea_count; i++) {
1397                 memset(op_data2, 0, sizeof(*op_data2));
1398                 op_data2->op_fid1 = mea->mea_ids[i];
1399
1400                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
1401                 if (IS_ERR(tgt_exp))
1402                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1403
1404                 if (tgt_exp == NULL)
1405                         continue;
1406
1407                 rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
1408                                 lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
1409                                 cb_data, 0);
1410
1411                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
1412                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
1413
1414                 if (rc)
1415                         GOTO(cleanup, rc);
1416
1417                 if (it->d.lustre.it_data) {
1418                         struct ptlrpc_request *req;
1419                         req = (struct ptlrpc_request *)it->d.lustre.it_data;
1420                         ptlrpc_req_finished(req);
1421                 }
1422
1423                 if (it->d.lustre.it_status)
1424                         GOTO(cleanup, rc = it->d.lustre.it_status);
1425         }
1426
1427         EXIT;
1428 cleanup:
1429         OBD_FREE_PTR(op_data2);
1430
1431         if (rc != 0) {
1432                 /* drop all taken locks */
1433                 while (--i >= 0) {
1434                         if (lockh[i].cookie)
1435                                 ldlm_lock_decref(lockh + i, lockmode);
1436                         lockh[i].cookie = 0;
1437                 }
1438         }
1439         return rc;
1440 }
1441
1442 static int
1443 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1444                    struct lookup_intent *it, int lock_mode,
1445                    struct md_op_data *op_data, struct lustre_handle *lockh,
1446                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1447                    ldlm_blocking_callback cb_blocking, void *cb_data,
1448                    int extra_lock_flags)
1449 {
1450         struct ptlrpc_request *req = it->d.lustre.it_data;
1451         struct obd_device *obd = exp->exp_obd;
1452         struct lmv_obd *lmv = &obd->u.lmv;
1453         struct mdt_body *body = NULL;
1454         struct lustre_handle plock;
1455         struct obd_export *tgt_exp;
1456         struct md_op_data *rdata;
1457         int rc = 0, pmode;
1458         struct lu_fid fid_copy;
1459         ENTRY;
1460
1461         body = lustre_msg_buf(req->rq_repmsg,
1462                               DLM_REPLY_REC_OFF, sizeof(*body));
1463         LASSERT(body != NULL);
1464
1465         if (!(body->valid & OBD_MD_MDS))
1466                 RETURN(0);
1467
1468         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
1469                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1470
1471         /* We got LOOKUP lock, but we really need attrs */
1472         pmode = it->d.lustre.it_lock_mode;
1473         LASSERT(pmode != 0);
1474         memcpy(&plock, lockh, sizeof(plock));
1475         it->d.lustre.it_lock_mode = 0;
1476         it->d.lustre.it_data = NULL;
1477         fid_copy = body->fid1;
1478
1479         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1480         ptlrpc_req_finished(req);
1481
1482         tgt_exp = lmv_find_export(lmv, &fid_copy);
1483         if (IS_ERR(tgt_exp))
1484                 GOTO(out, rc = PTR_ERR(tgt_exp));
1485
1486         OBD_ALLOC_PTR(rdata);
1487         if (rdata == NULL)
1488                 GOTO(out, rc = -ENOMEM);
1489
1490         rdata->op_fid1 = fid_copy;
1491         rdata->op_name = NULL;
1492         rdata->op_namelen = 0;
1493
1494         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
1495                         lockh, lmm, lmmsize, cb_compl, cb_blocking,
1496                         cb_data, extra_lock_flags);
1497         OBD_FREE_PTR(rdata);
1498         EXIT;
1499 out:
1500         ldlm_lock_decref(&plock, pmode);
1501         return rc;
1502 }
1503
1504 static int
1505 lmv_enqueue(struct obd_export *exp, int lock_type,
1506             struct lookup_intent *it, int lock_mode,
1507             struct md_op_data *op_data, struct lustre_handle *lockh,
1508             void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1509             ldlm_blocking_callback cb_blocking, void *cb_data,
1510             int extra_lock_flags)
1511 {
1512         struct obd_device *obd = exp->exp_obd;
1513         struct lmv_obd *lmv = &obd->u.lmv;
1514         struct obd_export *tgt_exp = NULL;
1515         struct lmv_obj *obj;
1516         int rc;
1517         ENTRY;
1518
1519         rc = lmv_check_connect(obd);
1520         if (rc)
1521                 RETURN(rc);
1522
1523         if (op_data->op_mea1 && it->it_op == IT_UNLINK) {
1524                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1525                                         op_data, lockh, lmm, lmmsize,
1526                                         cb_compl, cb_blocking, cb_data);
1527                 RETURN(rc);
1528         }
1529
1530         if (op_data->op_namelen) {
1531                 obj = lmv_obj_grab(obd, &op_data->op_fid1);
1532                 if (obj) {
1533                         int mea_idx;
1534
1535                         /* directory is split. look for right mds for this
1536                          * name */
1537                         mea_idx = raw_name2idx(obj->lo_hashtype,
1538                                                obj->lo_objcount,
1539                                                (char *)op_data->op_name,
1540                                                op_data->op_namelen);
1541                         op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1542                         tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1543                         lmv_obj_put(obj);
1544                 }
1545         }
1546
1547         if (tgt_exp == NULL)
1548                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1549         if (IS_ERR(tgt_exp))
1550                 RETURN(PTR_ERR(tgt_exp));
1551
1552         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
1553                PFID(&op_data->op_fid1));
1554
1555         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
1556                         lmm, lmmsize, cb_compl, cb_blocking, cb_data,
1557                         extra_lock_flags);
1558
1559         if (rc == 0 && it->it_op == IT_OPEN)
1560                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1561                                         op_data, lockh, lmm, lmmsize,
1562                                         cb_compl, cb_blocking, cb_data,
1563                                         extra_lock_flags);
1564         RETURN(rc);
1565 }
1566
1567 static int
1568 lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
1569                  struct obd_capa *oc, const char *filename, int namelen,
1570                  obd_valid valid, int ea_size, struct ptlrpc_request **request)
1571 {
1572         struct obd_device *obd = exp->exp_obd;
1573         struct lmv_obd *lmv = &obd->u.lmv;
1574         struct lu_fid rid = *fid;
1575         struct obd_export *tgt_exp;
1576         struct mdt_body *body;
1577         struct lmv_obj *obj;
1578         int rc, loop = 0;
1579         ENTRY;
1580
1581         rc = lmv_check_connect(obd);
1582         if (rc)
1583                 RETURN(rc);
1584
1585 repeat:
1586         LASSERT(++loop <= 2);
1587         obj = lmv_obj_grab(obd, &rid);
1588         if (obj) {
1589                 int mea_idx;
1590                 /* directory is split. look for right mds for this name */
1591                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1592                                        filename, namelen - 1);
1593                 rid = obj->lo_inodes[mea_idx].li_fid;
1594                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1595                 lmv_obj_put(obj);
1596         } else {
1597                 tgt_exp = lmv_find_export(lmv, &rid);
1598         }
1599         if (IS_ERR(tgt_exp))
1600                 RETURN(PTR_ERR(tgt_exp));
1601
1602         CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
1603                namelen, filename, PFID(fid), PFID(&rid));
1604
1605         rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
1606                              ea_size, request);
1607         if (rc == 0) {
1608                 body = lustre_msg_buf((*request)->rq_repmsg,
1609                                       REQ_REC_OFF, sizeof(*body));
1610                 LASSERT(body != NULL);
1611
1612                 if (body->valid & OBD_MD_MDS) {
1613                         struct ptlrpc_request *req = NULL;
1614
1615                         rid = body->fid1;
1616                         CDEBUG(D_OTHER, "request attrs for "DFID"\n",
1617                                PFID(&rid));
1618
1619                         tgt_exp = lmv_find_export(lmv, &rid);
1620                         if (IS_ERR(tgt_exp)) {
1621                                 ptlrpc_req_finished(*request);
1622                                 RETURN(PTR_ERR(tgt_exp));
1623                         }
1624
1625                         rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1,
1626                                              valid, ea_size, &req);
1627                         ptlrpc_req_finished(*request);
1628                         *request = req;
1629                 }
1630         } else if (rc == -ERESTART) {
1631                 LASSERT(*request != NULL);
1632                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
1633                           "Got -ERESTART during getattr!\n");
1634                 ptlrpc_req_finished(*request);
1635                 *request = NULL;
1636                 
1637                 /*
1638                  * Directory got split. Time to update local object and repeat
1639                  * the request with proper MDS.
1640                  */
1641                 rc = lmv_handle_split(exp, &rid);
1642                 if (rc == 0)
1643                         goto repeat;
1644         }
1645         RETURN(rc);
1646 }
1647
1648 /*
1649  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1650  * op_data->op_fid2
1651  */
1652 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1653                     struct ptlrpc_request **request)
1654 {
1655         struct obd_device *obd = exp->exp_obd;
1656         struct lmv_obd *lmv = &obd->u.lmv;
1657         struct lmv_obj *obj;
1658         int rc, loop = 0;
1659         mdsno_t mds;
1660         ENTRY;
1661
1662         rc = lmv_check_connect(obd);
1663         if (rc)
1664                 RETURN(rc);
1665
1666 repeat:
1667         LASSERT(++loop <= 2);
1668         if (op_data->op_namelen != 0) {
1669                 int mea_idx;
1670                 
1671                 /* Usual link request */
1672                 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1673                 if (obj) {
1674                         mea_idx = raw_name2idx(obj->lo_hashtype,
1675                                                obj->lo_objcount,
1676                                                op_data->op_name,
1677                                                op_data->op_namelen);
1678                         op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1679                         mds = obj->lo_inodes[mea_idx].li_mds;
1680                         lmv_obj_put(obj);
1681                 } else {
1682                         rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
1683                         if (rc)
1684                                 RETURN(rc);
1685                 }
1686
1687                 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
1688                        PFID(&op_data->op_fid2), op_data->op_namelen,
1689                        op_data->op_name, PFID(&op_data->op_fid1));
1690         } else {
1691                 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
1692                 if (rc)
1693                         RETURN(rc);
1694
1695                 /* request from MDS to acquire i_links for inode by fid1 */
1696                 CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
1697                        PFID(&op_data->op_fid1));
1698         }
1699
1700         CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
1701                mds, PFID(&op_data->op_fid1));
1702
1703         op_data->op_fsuid = current->fsuid;
1704         op_data->op_fsgid = current->fsgid;
1705         op_data->op_cap   = current->cap_effective;
1706
1707         rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
1708         if (rc == -ERESTART) {
1709                 LASSERT(*request != NULL);
1710                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
1711                           "Got -ERESTART during link!\n");
1712                 ptlrpc_req_finished(*request);
1713                 *request = NULL;
1714                 
1715                 /*
1716                  * Directory got split. Time to update local object and repeat
1717                  * the request with proper MDS.
1718                  */
1719                 rc = lmv_handle_split(exp, &op_data->op_fid2);
1720                 if (rc == 0)
1721                         goto repeat;
1722         }
1723
1724         RETURN(rc);
1725 }
1726
1727 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1728                       const char *old, int oldlen, const char *new, int newlen,
1729                       struct ptlrpc_request **request)
1730 {
1731         struct obd_device *obd = exp->exp_obd;
1732         struct lmv_obd *lmv = &obd->u.lmv;
1733         int rc, mea_idx, loop = 0;
1734         struct lmv_obj *obj;
1735         mdsno_t mds;
1736         ENTRY;
1737
1738         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
1739                oldlen, old, PFID(&op_data->op_fid1),
1740                newlen, new, PFID(&op_data->op_fid2));
1741
1742         rc = lmv_check_connect(obd);
1743         if (rc)
1744                 RETURN(rc);
1745
1746         if (oldlen == 0) {
1747                 /*
1748                  * MDS with old dir entry is asking another MDS to create name
1749                  * there.
1750                  */
1751                 CDEBUG(D_OTHER,
1752                        "create %*s(%d/%d) in "DFID" pointing "
1753                        "to "DFID"\n", newlen, new, oldlen, newlen,
1754                        PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
1755
1756                 rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
1757                 if (rc)
1758                         RETURN(rc);
1759
1760                 /*
1761                  * Target directory can be split, sowe should forward request to
1762                  * the right MDS.
1763                  */
1764                 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1765                 if (obj) {
1766                         mea_idx = raw_name2idx(obj->lo_hashtype,
1767                                                obj->lo_objcount,
1768                                                (char *)new, newlen);
1769                         op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1770                         CDEBUG(D_OTHER, "Parent obj "DFID"\n",
1771                                PFID(&op_data->op_fid2));
1772                         lmv_obj_put(obj);
1773                 }
1774                 goto request;
1775         }
1776
1777 repeat:
1778         LASSERT(++loop <= 2);
1779         obj = lmv_obj_grab(obd, &op_data->op_fid1);
1780         if (obj) {
1781                 /*
1782                  * directory is already split, so we have to forward request to
1783                  * the right MDS.
1784                  */
1785                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1786                                        (char *)old, oldlen);
1787                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1788                 mds = obj->lo_inodes[mea_idx].li_mds;
1789                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
1790                 lmv_obj_put(obj);
1791         } else {
1792                 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
1793                 if (rc)
1794                         RETURN(rc);
1795         }
1796
1797         obj = lmv_obj_grab(obd, &op_data->op_fid2);
1798         if (obj) {
1799                 /*
1800                  * Directory is already split, so we have to forward request to
1801                  * the right MDS.
1802                  */
1803                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1804                                        (char *)new, newlen);
1805
1806                 op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1807                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
1808                 lmv_obj_put(obj);
1809         }
1810         
1811 request:
1812         op_data->op_fsuid = current->fsuid;
1813         op_data->op_fsgid = current->fsgid;
1814         op_data->op_cap   = current->cap_effective;
1815
1816         rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
1817                        new, newlen, request);
1818         if (rc == -ERESTART) {
1819                 LASSERT(*request != NULL);
1820                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
1821                           "Got -ERESTART during rename!\n");
1822                 ptlrpc_req_finished(*request);
1823                 *request = NULL;
1824                 
1825                 /*
1826                  * Directory got split. Time to update local object and repeat
1827                  * the request with proper MDS.
1828                  */
1829                 rc = lmv_handle_split(exp, &op_data->op_fid1);
1830                 if (rc == 0)
1831                         goto repeat;
1832         }
1833         RETURN(rc);
1834 }
1835
1836 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1837                        void *ea, int ealen, void *ea2, int ea2len,
1838                        struct ptlrpc_request **request)
1839 {
1840         struct obd_device *obd = exp->exp_obd;
1841         struct lmv_obd *lmv = &obd->u.lmv;
1842         struct ptlrpc_request *req;
1843         struct obd_export *tgt_exp;
1844         struct lmv_obj *obj;
1845         int rc = 0, i;
1846         ENTRY;
1847
1848         rc = lmv_check_connect(obd);
1849         if (rc)
1850                 RETURN(rc);
1851
1852         obj = lmv_obj_grab(obd, &op_data->op_fid1);
1853
1854         CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
1855                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
1856                obj ? ", split" : "");
1857
1858         if (obj) {
1859                 for (i = 0; i < obj->lo_objcount; i++) {
1860                         op_data->op_fid1 = obj->lo_inodes[i].li_fid;
1861
1862                         tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
1863                         if (IS_ERR(tgt_exp)) {
1864                                 rc = PTR_ERR(tgt_exp);
1865                                 break;
1866                         }
1867
1868                         rc = md_setattr(tgt_exp, op_data, ea, ealen,
1869                                         ea2, ea2len, &req);
1870
1871                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
1872                                 /*
1873                                  * this is master object and this request should
1874                                  * be returned back to llite.
1875                                  */
1876                                 *request = req;
1877                         } else {
1878                                 ptlrpc_req_finished(req);
1879                         }
1880
1881                         if (rc)
1882                                 break;
1883                 }
1884                 lmv_obj_put(obj);
1885         } else {
1886                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1887                 if (IS_ERR(tgt_exp))
1888                         RETURN(PTR_ERR(tgt_exp));
1889
1890                 rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2,
1891                                 ea2len, request);
1892         }
1893         RETURN(rc);
1894 }
1895
1896 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1897                     struct obd_capa *oc, struct ptlrpc_request **request)
1898 {
1899         struct obd_device *obd = exp->exp_obd;
1900         struct lmv_obd *lmv = &obd->u.lmv;
1901         struct obd_export *tgt_exp;
1902         int rc;
1903         ENTRY;
1904
1905         rc = lmv_check_connect(obd);
1906         if (rc)
1907                 RETURN(rc);
1908
1909         tgt_exp = lmv_find_export(lmv, fid);
1910         if (IS_ERR(tgt_exp))
1911                 RETURN(PTR_ERR(tgt_exp));
1912
1913         rc = md_sync(tgt_exp, fid, oc, request);
1914         RETURN(rc);
1915 }
1916
1917 /* main purpose of LMV blocking ast is to remove split directory LMV
1918  * presentation object (struct lmv_obj) attached to the lock being revoked. */
1919 int lmv_blocking_ast(struct ldlm_lock *lock,
1920                      struct ldlm_lock_desc *desc,
1921                      void *data, int flag)
1922 {
1923         struct lustre_handle lockh;
1924         struct lmv_obj *obj;
1925         int rc;
1926         ENTRY;
1927
1928         switch (flag) {
1929         case LDLM_CB_BLOCKING:
1930                 ldlm_lock2handle(lock, &lockh);
1931                 rc = ldlm_cli_cancel(&lockh);
1932                 if (rc < 0) {
1933                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1934                         RETURN(rc);
1935                 }
1936                 break;
1937         case LDLM_CB_CANCELING:
1938                 /* time to drop cached attrs for dirobj */
1939                 obj = lock->l_ast_data;
1940                 if (obj) {
1941                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1942                                ", master "DFID"\n",
1943                                lock->l_resource->lr_name.name[3] == 1 ?
1944                                "LOOKUP" : "UPDATE",
1945                                lock->l_resource->lr_name.name[0],
1946                                lock->l_resource->lr_name.name[1],
1947                                PFID(&obj->lo_fid));
1948                         lmv_obj_put(obj);
1949                 }
1950                 break;
1951         default:
1952                 LBUG();
1953         }
1954         RETURN(0);
1955 }
1956
1957 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
1958                         struct obd_capa *oc, __u64 offset, struct page *page,
1959                         struct ptlrpc_request **request)
1960 {
1961         struct obd_device *obd = exp->exp_obd;
1962         struct lmv_obd *lmv = &obd->u.lmv;
1963         struct obd_export *tgt_exp;
1964         struct lu_fid rid = *fid;
1965         struct lmv_obj *obj;
1966         int i = 0, rc;
1967         ENTRY;
1968
1969         rc = lmv_check_connect(obd);
1970         if (rc)
1971                 RETURN(rc);
1972
1973         CDEBUG(D_INFO, "READPAGE at %llu from "DFID"\n", offset, PFID(&rid));
1974
1975         obj = lmv_obj_grab(obd, fid);
1976         if (obj) {
1977                 __u64 index = offset;
1978                 __u64 seg = MAX_HASH_SIZE;
1979                 lmv_obj_lock(obj);
1980
1981                 LASSERT(obj->lo_objcount > 0);
1982                 do_div(seg, obj->lo_objcount);
1983                 do_div(index, (__u32)seg);
1984                 i = (int)index;
1985                 rid = obj->lo_inodes[i].li_fid;
1986                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
1987
1988                 lmv_obj_unlock(obj);
1989
1990                 CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
1991                        PFID(&rid), (unsigned long)offset, i);
1992         } else {
1993                 tgt_exp = lmv_find_export(lmv, &rid);
1994         }
1995
1996         if (IS_ERR(tgt_exp))
1997                 GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1998
1999         rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
2000         if (rc)
2001                 GOTO(cleanup, rc);
2002         if (obj && i < obj->lo_objcount - 1) {
2003                 struct lu_dirpage *dp;
2004                 __u32 end;
2005                 (void)kmap(page);
2006                 dp = cfs_page_address(page);
2007                 end = le32_to_cpu(dp->ldp_hash_end);
2008                 if (end == ~0ul) {
2009                         __u64 max_hash = MAX_HASH_SIZE;
2010
2011                         do_div(max_hash, obj->lo_objcount);
2012                         dp->ldp_hash_end = (__u32)max_hash * (i + 1);
2013                         CDEBUG(D_INFO, ""DFID" reset end %lu i %d\n", PFID(&rid),
2014                                         (unsigned long)dp->ldp_hash_end, i);
2015                 }
2016         }
2017         /*
2018          * Here we could remove "." and ".." from all pages which at not from
2019          * master. But MDS has only "." and ".." for master dir.
2020          */
2021         EXIT;
2022 cleanup:
2023         if (obj)
2024                 lmv_obj_put(obj);
2025         return rc;
2026 }
2027
2028 static int lmv_unlink_slaves(struct obd_export *exp,
2029                              struct md_op_data *op_data,
2030                              struct ptlrpc_request **req)
2031 {
2032         struct obd_device *obd = exp->exp_obd;
2033         struct lmv_obd *lmv = &obd->u.lmv;
2034         struct lmv_stripe_md *mea = op_data->op_mea1;
2035         struct md_op_data *op_data2;
2036         struct obd_export *tgt_exp;
2037         int i, rc = 0;
2038         ENTRY;
2039
2040         OBD_ALLOC_PTR(op_data2);
2041         if (op_data2 == NULL)
2042                 RETURN(-ENOMEM);
2043
2044         LASSERT(mea != NULL);
2045         for (i = 0; i < mea->mea_count; i++) {
2046                 memset(op_data2, 0, sizeof(*op_data2));
2047                 op_data2->op_fid1 = mea->mea_ids[i];
2048                 op_data2->op_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
2049                 op_data2->op_fsuid = current->fsuid;
2050                 op_data2->op_fsgid = current->fsgid;
2051                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
2052                 if (IS_ERR(tgt_exp))
2053                         GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
2054
2055                 if (tgt_exp == NULL)
2056                         continue;
2057
2058                 rc = md_unlink(tgt_exp, op_data2, req);
2059
2060                 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
2061                        PFID(&mea->mea_ids[i]), rc);
2062
2063                 if (*req) {
2064                         ptlrpc_req_finished(*req);
2065                         *req = NULL;
2066                 }
2067                 if (rc)
2068                         GOTO(out_free_op_data2, rc);
2069         }
2070
2071         EXIT;
2072 out_free_op_data2:
2073         OBD_FREE_PTR(op_data2);
2074         return rc;
2075 }
2076
2077 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2078                       struct ptlrpc_request **request)
2079 {
2080         struct obd_device *obd = exp->exp_obd;
2081         struct lmv_obd *lmv = &obd->u.lmv;
2082         struct obd_export *tgt_exp = NULL;
2083         int rc, loop = 0;
2084         ENTRY;
2085
2086         rc = lmv_check_connect(obd);
2087         if (rc)
2088                 RETURN(rc);
2089
2090         if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) {
2091                 /* mds asks to remove slave objects */
2092                 rc = lmv_unlink_slaves(exp, op_data, request);
2093                 RETURN(rc);
2094         }
2095
2096 repeat:
2097         LASSERT(++loop <= 2);
2098         if (op_data->op_namelen != 0) {
2099                 struct lmv_obj *obj;
2100                 int mea_idx;
2101
2102                 obj = lmv_obj_grab(obd, &op_data->op_fid1);
2103                 if (obj) {
2104                         mea_idx = raw_name2idx(obj->lo_hashtype,
2105                                                obj->lo_objcount,
2106                                                op_data->op_name,
2107                                                op_data->op_namelen);
2108                         op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
2109                         tgt_exp = lmv_get_export(lmv,
2110                                                  obj->lo_inodes[mea_idx].li_mds);
2111                         lmv_obj_put(obj);
2112                         CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
2113                                op_data->op_namelen, op_data->op_name,
2114                                PFID(&op_data->op_fid1), mea_idx);
2115                 }
2116         } else {
2117                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
2118                        PFID(&op_data->op_fid1));
2119         }
2120         if (tgt_exp == NULL)
2121                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
2122         if (IS_ERR(tgt_exp))
2123                 RETURN(PTR_ERR(tgt_exp));
2124
2125         op_data->op_fsuid = current->fsuid;
2126         op_data->op_fsgid = current->fsgid;
2127         op_data->op_cap   = current->cap_effective;
2128         rc = md_unlink(tgt_exp, op_data, request);
2129         if (rc == -ERESTART) {
2130                 LASSERT(*request != NULL);
2131                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request, 
2132                           "Got -ERESTART during unlink!\n");
2133                 ptlrpc_req_finished(*request);
2134                 *request = NULL;
2135                 
2136                 /*
2137                  * Directory got split. Time to update local object and repeat
2138                  * the request with proper MDS.
2139                  */
2140                 rc = lmv_handle_split(exp, &op_data->op_fid1);
2141                 if (rc == 0)
2142                         goto repeat;
2143         }
2144         RETURN(rc);
2145 }
2146
2147 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs,
2148                          struct obd_device *tgt, int count,
2149                          struct llog_catid *logid, struct obd_uuid *uuid)
2150 {
2151         struct llog_ctxt *ctxt;
2152         int rc;
2153         ENTRY;
2154
2155         rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
2156                         &llog_client_ops);
2157         if (rc == 0) {
2158                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
2159                 ctxt->loc_imp = tgt->u.cli.cl_import;
2160         }
2161
2162         RETURN(rc);
2163 }
2164
2165 static int lmv_llog_finish(struct obd_device *obd, int count)
2166 {
2167         int rc;
2168         ENTRY;
2169
2170         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
2171         RETURN(rc);
2172 }
2173
2174 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2175 {
2176         int rc = 0;
2177
2178         switch (stage) {
2179         case OBD_CLEANUP_EARLY:
2180                 /* XXX: here should be calling obd_precleanup() down to
2181                  * stack. */
2182                 break;
2183         case OBD_CLEANUP_SELF_EXP:
2184                 rc = obd_llog_finish(obd, 0);
2185                 if (rc != 0)
2186                         CERROR("failed to cleanup llogging subsystems\n");
2187                 break;
2188         default:
2189                 break;
2190         }
2191         RETURN(rc);
2192 }
2193
2194 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
2195                         void *key, __u32 *vallen, void *val)
2196 {
2197         struct obd_device *obd;
2198         struct lmv_obd *lmv;
2199         int rc = 0;
2200         ENTRY;
2201
2202         obd = class_exp2obd(exp);
2203         if (obd == NULL) {
2204                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2205                        exp->exp_handle.h_cookie);
2206                 RETURN(-EINVAL);
2207         }
2208
2209         lmv = &obd->u.lmv;
2210         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2211                 struct lmv_tgt_desc *tgts;
2212                 int i;
2213
2214                 rc = lmv_check_connect(obd);
2215                 if (rc)
2216                         RETURN(rc);
2217
2218                 LASSERT(*vallen == sizeof(__u32));
2219                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2220                      i++, tgts++) {
2221
2222                         /* all tgts should be connected when this get called. */
2223                         if (!tgts || !tgts->ltd_exp) {
2224                                 CERROR("target not setup?\n");
2225                                 continue;
2226                         }
2227
2228                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
2229                                           vallen, val))
2230                                 RETURN(0);
2231                 }
2232                 RETURN(-EINVAL);
2233         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2234                 rc = lmv_check_connect(obd);
2235                 if (rc)
2236                         RETURN(rc);
2237
2238                 /* forwarding this request to first MDS, it should know LOV
2239                  * desc. */
2240                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2241                                   vallen, val);
2242                 RETURN(rc);
2243         }
2244
2245         CDEBUG(D_IOCTL, "invalid key\n");
2246         RETURN(-EINVAL);
2247 }
2248
2249 int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
2250                        void *key, obd_count vallen, void *val,
2251                        struct ptlrpc_request_set *set)
2252 {
2253         struct lmv_tgt_desc    *tgt;
2254         struct obd_device      *obd;
2255         struct lmv_obd         *lmv;
2256         int rc = 0;
2257         ENTRY;
2258
2259         obd = class_exp2obd(exp);
2260         if (obd == NULL) {
2261                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2262                        exp->exp_handle.h_cookie);
2263                 RETURN(-EINVAL);
2264         }
2265         lmv = &obd->u.lmv;
2266
2267         if (KEY_IS(KEY_FLUSH_CTX)) {
2268                 int i, err = 0;
2269
2270                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2271                         tgt = &lmv->tgts[i];
2272
2273                         if (!tgt->ltd_exp)
2274                                 continue;
2275
2276                         err = obd_set_info_async(tgt->ltd_exp,
2277                                                  keylen, key, vallen, val, set);
2278                         if (err && rc == 0)
2279                                 rc = err;
2280                 }
2281
2282                 RETURN(rc);
2283         }
2284         if (KEY_IS("pag")) {
2285                 struct obd_export *exp;
2286                 int i, err = 0;
2287
2288                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2289                         exp = lmv->tgts[i].ltd_exp;
2290
2291                         if (exp == NULL) {
2292                                 struct obd_device *tgt_obd;
2293
2294                                 tgt_obd = class_find_client_obd(
2295                                                         &lmv->tgts[i].ltd_uuid,
2296                                                         LUSTRE_MDC_NAME,
2297                                                         &obd->obd_uuid);
2298                                 if (tgt_obd == NULL) {
2299                                         CERROR("can't find obd %s\n",
2300                                                lmv->tgts[i].ltd_uuid.uuid);
2301                                         continue;
2302                                 }
2303                                 exp = tgt_obd->obd_self_export;
2304                         }
2305
2306                         err = obd_set_info_async(exp, keylen, key, vallen,
2307                                                  val, set);
2308                         if (err && rc == 0)
2309                                 rc = err;
2310                 }
2311
2312                 RETURN(rc);
2313         }
2314
2315         RETURN(-EINVAL);
2316 }
2317
2318 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2319                struct lov_stripe_md *lsm)
2320 {
2321         struct obd_device *obd = class_exp2obd(exp);
2322         struct lmv_obd *lmv = &obd->u.lmv;
2323         struct lmv_stripe_md *meap, *lsmp;
2324         int mea_size, i;
2325         ENTRY;
2326
2327         mea_size = lmv_get_easize(lmv);
2328         if (!lmmp)
2329                 RETURN(mea_size);
2330
2331         if (*lmmp && !lsm) {
2332                 OBD_FREE(*lmmp, mea_size);
2333                 *lmmp = NULL;
2334                 RETURN(0);
2335         }
2336
2337         if (*lmmp == NULL) {
2338                 OBD_ALLOC(*lmmp, mea_size);
2339                 if (*lmmp == NULL)
2340                         RETURN(-ENOMEM);
2341         }
2342
2343         if (!lsm)
2344                 RETURN(mea_size);
2345
2346         lsmp = (struct lmv_stripe_md *)lsm;
2347         meap = (struct lmv_stripe_md *)*lmmp;
2348
2349         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2350             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2351                 RETURN(-EINVAL);
2352
2353         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2354         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2355         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2356
2357         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2358                 meap->mea_ids[i] = meap->mea_ids[i];
2359                 fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
2360         }
2361
2362         RETURN(mea_size);
2363 }
2364
2365 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2366                  struct lov_mds_md *lmm, int lmm_size)
2367 {
2368         struct obd_device *obd = class_exp2obd(exp);
2369         struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
2370         struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
2371         struct lmv_obd *lmv = &obd->u.lmv;
2372         int mea_size, i;
2373         __u32 magic;
2374         ENTRY;
2375
2376         mea_size = lmv_get_easize(lmv);
2377         if (lsmp == NULL)
2378                 return mea_size;
2379
2380         if (*lsmp != NULL && lmm == NULL) {
2381                 OBD_FREE(*tmea, mea_size);
2382                 RETURN(0);
2383         }
2384
2385         LASSERT(mea_size == lmm_size);
2386
2387         OBD_ALLOC(*tmea, mea_size);
2388         if (*tmea == NULL)
2389                 RETURN(-ENOMEM);
2390
2391         if (!lmm)
2392                 RETURN(mea_size);
2393
2394         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2395             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2396             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2397         {
2398                 magic = le32_to_cpu(mea->mea_magic);
2399         } else {
2400                 /* old mea is not handled here */
2401                 LBUG();
2402         }
2403
2404         (*tmea)->mea_magic = magic;
2405         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2406         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2407
2408         for (i = 0; i < (*tmea)->mea_count; i++) {
2409                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2410                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2411         }
2412         RETURN(mea_size);
2413 }
2414
2415 static int lmv_cancel_unused(struct obd_export *exp,
2416                              const struct lu_fid *fid,
2417                              int flags, void *opaque)
2418 {
2419         struct obd_device *obd = exp->exp_obd;
2420         struct lmv_obd *lmv = &obd->u.lmv;
2421         int rc = 0, err, i;
2422         ENTRY;
2423
2424         LASSERT(fid != NULL);
2425
2426         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2427                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
2428                         continue;
2429
2430                 err = md_cancel_unused(lmv->tgts[i].ltd_exp,
2431                                        fid, flags, opaque);
2432                 if (!rc)
2433                         rc = err;
2434         }
2435         RETURN(rc);
2436 }
2437
2438 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
2439 {
2440         struct obd_device *obd = exp->exp_obd;
2441         struct lmv_obd *lmv = &obd->u.lmv;
2442
2443         ENTRY;
2444         RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
2445 }
2446
2447 int lmv_lock_match(struct obd_export *exp, int flags,
2448                    const struct lu_fid *fid, ldlm_type_t type,
2449                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
2450                    struct lustre_handle *lockh)
2451 {
2452         struct obd_device *obd = exp->exp_obd;
2453         struct lmv_obd *lmv = &obd->u.lmv;
2454         int i, rc = 0;
2455         ENTRY;
2456
2457         CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
2458
2459         /* with CMD every object can have two locks in different namespaces:
2460          * lookup lock in space of mds storing direntry and update/open lock in
2461          * space of mds storing inode. Thus we check all targets, not only that
2462          * one fid was created in. */
2463         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2464                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2465                                    type, policy, mode, lockh);
2466                 if (rc)
2467                         RETURN(1);
2468         }
2469
2470         RETURN(rc);
2471 }
2472
2473 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2474                       int offset, struct obd_export *dt_exp,
2475                       struct obd_export *md_exp, struct lustre_md *md)
2476 {
2477         struct obd_device *obd = exp->exp_obd;
2478         struct lmv_obd *lmv = &obd->u.lmv;
2479         int rc;
2480
2481         ENTRY;
2482         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp,
2483                               md);
2484         RETURN(rc);
2485 }
2486
2487 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2488 {
2489         struct obd_device *obd = exp->exp_obd;
2490         struct lmv_obd *lmv = &obd->u.lmv;
2491
2492         ENTRY;
2493         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2494 }
2495
2496 int lmv_set_open_replay_data(struct obd_export *exp,
2497                              struct obd_client_handle *och,
2498                              struct ptlrpc_request *open_req)
2499 {
2500         struct obd_device *obd = exp->exp_obd;
2501         struct lmv_obd *lmv = &obd->u.lmv;
2502         struct obd_export *tgt_exp;
2503
2504         ENTRY;
2505
2506         tgt_exp = lmv_find_export(lmv, &och->och_fid);
2507         if (IS_ERR(tgt_exp))
2508                 RETURN(PTR_ERR(tgt_exp));
2509
2510         RETURN(md_set_open_replay_data(tgt_exp, och, open_req));
2511 }
2512
2513 int lmv_clear_open_replay_data(struct obd_export *exp,
2514                                struct obd_client_handle *och)
2515 {
2516         struct obd_device *obd = exp->exp_obd;
2517         struct lmv_obd *lmv = &obd->u.lmv;
2518         struct obd_export *tgt_exp;
2519         ENTRY;
2520
2521         tgt_exp = lmv_find_export(lmv, &och->och_fid);
2522         if (IS_ERR(tgt_exp))
2523                 RETURN(PTR_ERR(tgt_exp));
2524
2525         RETURN(md_clear_open_replay_data(tgt_exp, och));
2526 }
2527
2528 static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2529                                struct obd_capa *oc,
2530                                struct ptlrpc_request **request)
2531 {
2532         struct obd_device *obd = exp->exp_obd;
2533         struct lmv_obd *lmv = &obd->u.lmv;
2534         struct obd_export *tgt_exp;
2535         int rc;
2536
2537         ENTRY;
2538
2539         rc = lmv_check_connect(obd);
2540         if (rc)
2541                 RETURN(rc);
2542
2543         tgt_exp = lmv_find_export(lmv, fid);
2544         if (IS_ERR(tgt_exp))
2545                 RETURN(PTR_ERR(tgt_exp));
2546
2547         rc = md_get_remote_perm(tgt_exp, fid, oc, request);
2548
2549         RETURN(rc);
2550 }
2551
2552 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2553                           renew_capa_cb_t cb)
2554 {
2555         struct obd_device *obd = exp->exp_obd;
2556         struct lmv_obd *lmv = &obd->u.lmv;
2557         struct obd_export *tgt_exp;
2558         int rc;
2559         ENTRY;
2560
2561         rc = lmv_check_connect(obd);
2562         if (rc)
2563                 RETURN(rc);
2564
2565         tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid);
2566         if (IS_ERR(tgt_exp))
2567                 RETURN(PTR_ERR(tgt_exp));
2568
2569         rc = md_renew_capa(tgt_exp, oc, cb);
2570         RETURN(rc);
2571 }
2572
2573 struct obd_ops lmv_obd_ops = {
2574         .o_owner                = THIS_MODULE,
2575         .o_setup                = lmv_setup,
2576         .o_cleanup              = lmv_cleanup,
2577         .o_precleanup           = lmv_precleanup,
2578         .o_process_config       = lmv_process_config,
2579         .o_connect              = lmv_connect,
2580         .o_disconnect           = lmv_disconnect,
2581         .o_statfs               = lmv_statfs,
2582         .o_llog_init            = lmv_llog_init,
2583         .o_llog_finish          = lmv_llog_finish,
2584         .o_get_info             = lmv_get_info,
2585         .o_set_info_async       = lmv_set_info_async,
2586         .o_packmd               = lmv_packmd,
2587         .o_unpackmd             = lmv_unpackmd,
2588         .o_notify               = lmv_notify,
2589         .o_fid_alloc            = lmv_fid_alloc,
2590         .o_fid_delete           = lmv_fid_delete,
2591         .o_iocontrol            = lmv_iocontrol
2592 };
2593
2594 struct md_ops lmv_md_ops = {
2595         .m_getstatus            = lmv_getstatus,
2596         .m_change_cbdata        = lmv_change_cbdata,
2597         .m_close                = lmv_close,
2598         .m_create               = lmv_create,
2599         .m_done_writing         = lmv_done_writing,
2600         .m_enqueue              = lmv_enqueue,
2601         .m_getattr              = lmv_getattr,
2602         .m_getxattr             = lmv_getxattr,
2603         .m_getattr_name         = lmv_getattr_name,
2604         .m_intent_lock          = lmv_intent_lock,
2605         .m_link                 = lmv_link,
2606         .m_rename               = lmv_rename,
2607         .m_setattr              = lmv_setattr,
2608         .m_setxattr             = lmv_setxattr,
2609         .m_sync                 = lmv_sync,
2610         .m_readpage             = lmv_readpage,
2611         .m_unlink               = lmv_unlink,
2612         .m_init_ea_size         = lmv_init_ea_size,
2613         .m_cancel_unused        = lmv_cancel_unused,
2614         .m_set_lock_data        = lmv_set_lock_data,
2615         .m_lock_match           = lmv_lock_match,
2616         .m_get_lustre_md        = lmv_get_lustre_md,
2617         .m_free_lustre_md       = lmv_free_lustre_md,
2618         .m_set_open_replay_data = lmv_set_open_replay_data,
2619         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2620         .m_get_remote_perm      = lmv_get_remote_perm,
2621         .m_renew_capa           = lmv_renew_capa
2622 };
2623
2624 int __init lmv_init(void)
2625 {
2626         struct lprocfs_static_vars lvars;
2627         int rc;
2628
2629         obj_cache = kmem_cache_create("lmv_objects",
2630                                       sizeof(struct lmv_obj),
2631                                       0, 0, NULL, NULL);
2632         if (!obj_cache) {
2633                 CERROR("error allocating lmv objects cache\n");
2634                 return -ENOMEM;
2635         }
2636
2637         lprocfs_init_vars(lmv, &lvars);
2638         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2639                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2640         if (rc)
2641                 kmem_cache_destroy(obj_cache);
2642
2643         return rc;
2644 }
2645
2646 #ifdef __KERNEL__
2647 static void lmv_exit(void)
2648 {
2649         class_unregister_type(LUSTRE_LMV_NAME);
2650
2651         LASSERTF(kmem_cache_destroy(obj_cache) == 0,
2652                  "can't free lmv objects cache, %d object(s)"
2653                  "still in use\n", atomic_read(&obj_cache_count));
2654 }
2655
2656 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2657 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2658 MODULE_LICENSE("GPL");
2659
2660 module_init(lmv_init);
2661 module_exit(lmv_exit);
2662 #endif