Whamcloud - gitweb
land lustre part of b_hd_sec on HEAD.
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38 #include <linux/ext2_fs.h>
39
40 #include <linux/obd_support.h>
41 #include <linux/lustre_lib.h>
42 #include <linux/lustre_net.h>
43 #include <linux/lustre_idl.h>
44 #include <linux/lustre_dlm.h>
45 #include <linux/lustre_mds.h>
46 #include <linux/obd_class.h>
47 #include <linux/obd_ost.h>
48 #include <linux/lprocfs_status.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/obd_lmv.h>
51 #include <linux/lustre_lite.h>
52 #include "lmv_internal.h"
53
54 /* object cache. */
55 kmem_cache_t *obj_cache;
56 atomic_t obj_cache_count = ATOMIC_INIT(0);
57
58 static void lmv_activate_target(struct lmv_obd *lmv,
59                                 struct lmv_tgt_desc *tgt,
60                                 int activate)
61 {
62         if (tgt->active == activate)
63                 return;
64         
65         tgt->active = activate;
66         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
67 }
68
69 /* Error codes:
70  *
71  *  -EINVAL  : UUID can't be found in the LMV's target list
72  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
73  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
74  */
75 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
76                               int activate)
77 {
78         struct lmv_tgt_desc *tgt;
79         struct obd_device *obd;
80         int i, rc = 0;
81         ENTRY;
82
83         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
84                lmv, uuid->uuid, activate);
85
86         spin_lock(&lmv->lmv_lock);
87         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
88                 if (tgt->ltd_exp == NULL)
89                         continue;
90
91                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
92                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
93
94                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
95                         break;
96         }
97
98         if (i == lmv->desc.ld_tgt_count)
99                 GOTO(out_lmv_lock, rc = -EINVAL);
100
101         obd = class_exp2obd(tgt->ltd_exp);
102         if (obd == NULL)
103                 GOTO(out_lmv_lock, rc = -ENOTCONN);
104
105         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
106                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
107                obd->obd_type->typ_name, i);
108         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
109
110         if (tgt->active == activate) {
111                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
112                        activate ? "" : "in");
113                 GOTO(out_lmv_lock, rc);
114         }
115
116         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
117                obd, activate ? "" : "in");
118
119         lmv_activate_target(lmv, tgt, activate);
120
121         EXIT;
122         
123  out_lmv_lock:
124         spin_unlock(&lmv->lmv_lock);
125         return rc;
126 }
127
128 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
129                       int active, void *data)
130 {
131         struct obd_uuid *uuid;
132         int rc;
133         ENTRY;
134
135         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
136                 CERROR("unexpected notification of %s %s!\n",
137                        watched->obd_type->typ_name,
138                        watched->obd_name);
139                 RETURN(-EINVAL);
140         }
141         uuid = &watched->u.cli.cl_import->imp_target_uuid;
142
143         /* Set MDC as active before notifying the observer, so the observer can
144          * use the MDC normally.
145          */
146         rc = lmv_set_mdc_active(&obd->u.lmv, uuid, active);
147         if (rc) {
148                 CERROR("%sactivation of %s failed: %d\n",
149                        active ? "" : "de", uuid->uuid, rc);
150                 RETURN(rc);
151         }
152
153         if (obd->obd_observer)
154                 /* Pass the notification up the chain. */
155                 rc = obd_notify(obd->obd_observer, watched, active, data);
156
157         RETURN(rc);
158 }
159
160 int lmv_attach(struct obd_device *dev, obd_count len, void *data)
161 {
162         struct lprocfs_static_vars lvars;
163         int rc;
164         ENTRY;
165
166         lprocfs_init_vars(lmv, &lvars);
167         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
168 #ifdef __KERNEL__
169         if (rc == 0) {
170                 struct proc_dir_entry *entry;
171                 
172                 entry = create_proc_entry("target_obd_status", 0444, 
173                                            dev->obd_proc_entry);
174                 if (entry == NULL)
175                         RETURN(-ENOMEM);
176                 entry->proc_fops = &lmv_proc_target_fops; 
177                 entry->data = dev;
178        }
179 #endif
180         RETURN (rc);
181 }
182
183 int lmv_detach(struct obd_device *dev)
184 {
185         return lprocfs_obd_detach(dev);
186 }
187
188 /* this is fake connect function. Its purpose is to initialize lmv and say
189  * caller that everything is okay. Real connection will be performed later. */
190 static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
191                        struct obd_uuid *cluuid, unsigned long flags)
192 {
193 #ifdef __KERNEL__
194         struct proc_dir_entry *lmv_proc_dir;
195 #endif
196         struct lmv_obd *lmv = &obd->u.lmv;
197         struct obd_export *exp;
198         int rc = 0;
199         ENTRY;
200
201         rc = class_connect(conn, obd, cluuid);
202         if (rc) {
203                 CERROR("class_connection() returned %d\n", rc);
204                 RETURN(rc);
205         }
206
207         exp = class_conn2export(conn);
208         
209         /* we don't want to actually do the underlying connections more than
210          * once, so keep track. */
211         lmv->refcount++;
212         if (lmv->refcount > 1) {
213                 class_export_put(exp);
214                 RETURN(0);
215         }
216
217         lmv->exp = exp;
218         lmv->connected = 0;
219         lmv->cluuid = *cluuid;
220         lmv->connect_flags = flags;
221         sema_init(&lmv->init_sem, 1);
222
223 #ifdef __KERNEL__
224         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
225                                         NULL, NULL);
226         if (IS_ERR(lmv_proc_dir)) {
227                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
228                        obd->obd_type->typ_name, obd->obd_name);
229                 lmv_proc_dir = NULL;
230         }
231 #endif
232
233         /* 
234          * all real clients shouls perform actual connection rightaway, because
235          * it is possible, that LMV will not have opportunity to connect
236          * targets, as MDC stuff will bit called directly, for instance while
237          * reading ../mdc/../kbytesfree procfs file, etc.
238          */
239         if (flags & OBD_OPT_REAL_CLIENT)
240                 rc = lmv_check_connect(obd);
241
242 #ifdef __KERNEL__
243         if (rc) {
244                 if (lmv_proc_dir)
245                         lprocfs_remove(lmv_proc_dir);
246         }
247 #endif
248
249         RETURN(rc);
250 }
251
252 void lmv_set_timeouts(struct obd_device *obd)
253 {
254         struct lmv_tgt_desc *tgts;
255         struct lmv_obd *lmv;
256         int i;
257
258         lmv = &obd->u.lmv;
259         if (lmv->server_timeout == 0)
260                 return;
261
262         if (lmv->connected == 0)
263                 return;
264
265         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
266                 if (tgts->ltd_exp == NULL)
267                         continue;
268                 
269                 obd_set_info(tgts->ltd_exp, strlen("inter_mds"),
270                              "inter_mds", 0, NULL);
271         }
272 }
273
274 #define MAX_STRING_SIZE 128
275
276 /* performs a check if passed obd is connected. If no - connect it. */
277 int lmv_check_connect(struct obd_device *obd)
278 {
279 #ifdef __KERNEL__
280         struct proc_dir_entry *lmv_proc_dir;
281 #endif
282         struct lmv_obd *lmv = &obd->u.lmv;
283         struct lmv_tgt_desc *tgts;
284         struct obd_uuid *cluuid;
285         struct obd_export *exp;
286         int rc, rc2, i;
287         ENTRY;
288
289         if (lmv->connected)
290                 RETURN(0);
291         
292         down(&lmv->init_sem);
293         if (lmv->connected) {
294                 up(&lmv->init_sem);
295                 RETURN(0);
296         }
297
298         cluuid = &lmv->cluuid;
299         exp = lmv->exp;
300         
301         CDEBUG(D_OTHER, "time to connect %s to %s\n",
302                cluuid->uuid, obd->obd_name);
303
304         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
305                 struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
306                 struct lustre_handle conn = {0, };
307                 struct obd_device *tgt_obd;
308
309                 LASSERT(tgts != NULL);
310
311                 tgt_obd = class_find_client_obd(&tgts->uuid, LUSTRE_MDC_NAME, 
312                                                 &obd->obd_uuid);
313                 if (!tgt_obd) {
314                         CERROR("target %s not attached\n", tgts->uuid.uuid);
315                         GOTO(out_disc, rc = -EINVAL);
316                 }
317
318                 /* for MDS: don't connect to yourself */
319                 if (obd_uuid_equals(&tgts->uuid, cluuid)) {
320                         CDEBUG(D_OTHER, "don't connect back to %s\n",
321                                cluuid->uuid);
322                         tgts->ltd_exp = NULL;
323                         continue;
324                 }
325
326                 CDEBUG(D_OTHER, "connect to %s(%s) - %s, %s FOR %s\n",
327                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
328                         tgts->uuid.uuid, obd->obd_uuid.uuid,
329                         cluuid->uuid);
330
331                 if (!tgt_obd->obd_set_up) {
332                         CERROR("target %s not set up\n", tgts->uuid.uuid);
333                         GOTO(out_disc, rc = -EINVAL);
334                 }
335                 
336                 rc = obd_connect(&conn, tgt_obd, &lmv_mdc_uuid,
337                                  lmv->connect_flags);
338                 if (rc) {
339                         CERROR("target %s connect error %d\n",
340                                 tgts->uuid.uuid, rc);
341                         GOTO(out_disc, rc);
342                 }
343                 tgts->ltd_exp = class_conn2export(&conn);
344
345                 obd_init_ea_size(tgts->ltd_exp, lmv->max_easize,
346                                  lmv->max_cookiesize);
347
348                 rc = obd_register_observer(tgt_obd, obd);
349                 if (rc) {
350                         CERROR("target %s register_observer error %d\n",
351                                tgts->uuid.uuid, rc);
352                         obd_disconnect(tgts->ltd_exp, 0);
353                         GOTO(out_disc, rc);
354                 }
355
356                 lmv->desc.ld_active_tgt_count++;
357                 tgts->active = 1;
358
359                 CDEBUG(D_OTHER, "connected to %s(%s) successfully (%d)\n",
360                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
361                         atomic_read(&obd->obd_refcount));
362
363 #ifdef __KERNEL__
364                 lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
365                 if (lmv_proc_dir) {
366                         struct obd_device *mdc_obd = class_conn2obd(&conn);
367                         struct proc_dir_entry *mdc_symlink;
368                         char name[MAX_STRING_SIZE + 1];
369
370                         LASSERT(mdc_obd != NULL);
371                         LASSERT(mdc_obd->obd_type != NULL);
372                         LASSERT(mdc_obd->obd_type->typ_name != NULL);
373                         name[MAX_STRING_SIZE] = '\0';
374                         snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
375                                  mdc_obd->obd_type->typ_name,
376                                  mdc_obd->obd_name);
377                         mdc_symlink = proc_symlink(mdc_obd->obd_name,
378                                                    lmv_proc_dir, name);
379                         if (mdc_symlink == NULL) {
380                                 CERROR("could not register LMV target "
381                                        "/proc/fs/lustre/%s/%s/target_obds/%s.",
382                                        obd->obd_type->typ_name, obd->obd_name,
383                                        mdc_obd->obd_name);
384                                 lprocfs_remove(lmv_proc_dir);
385                                 lmv_proc_dir = NULL;
386                         }
387                 }
388 #endif
389         }
390
391         lmv_set_timeouts(obd);
392         class_export_put(exp);
393         lmv->connected = 1;
394         up(&lmv->init_sem);
395         RETURN(0);
396
397  out_disc:
398         while (i-- > 0) {
399                 struct obd_uuid uuid;
400                 --tgts;
401                 --lmv->desc.ld_active_tgt_count;
402                 tgts->active = 0;
403                 /* save for CERROR below; (we know it's terminated) */
404                 uuid = tgts->uuid;
405                 rc2 = obd_disconnect(tgts->ltd_exp, 0);
406                 if (rc2)
407                         CERROR("error: LMV target %s disconnect on MDC idx %d: "
408                                "error %d\n", uuid.uuid, i, rc2);
409         }
410         class_disconnect(exp, 0);
411         up(&lmv->init_sem);
412         return rc;
413 }
414
415 static int lmv_disconnect(struct obd_export *exp, unsigned long flags)
416 {
417         struct obd_device *obd = class_exp2obd(exp);
418         struct lmv_obd *lmv = &obd->u.lmv;
419
420 #ifdef __KERNEL__
421         struct proc_dir_entry *lmv_proc_dir;
422 #endif
423         int rc, i;
424         ENTRY;
425
426         if (!lmv->tgts)
427                 goto out_local;
428
429         /* Only disconnect the underlying layers on the final disconnect. */
430         lmv->refcount--;
431         if (lmv->refcount != 0)
432                 goto out_local;
433
434 #ifdef __KERNEL__
435         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
436 #endif
437
438         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
439                 struct obd_device *mdc_obd; 
440                 
441                 if (lmv->tgts[i].ltd_exp == NULL)
442                         continue;
443
444                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
445
446                 if (mdc_obd)
447                         mdc_obd->obd_no_recov = obd->obd_no_recov;
448
449 #ifdef __KERNEL__
450                 if (lmv_proc_dir) {
451                         struct proc_dir_entry *mdc_symlink;
452
453                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
454                         if (mdc_symlink) {
455                                 lprocfs_remove(mdc_symlink);
456                         } else {
457                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
458                                        obd->obd_type->typ_name, obd->obd_name,
459                                        mdc_obd->obd_name);
460                         }
461                 }
462 #endif
463                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
464                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
465                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
466
467                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
468                 rc = obd_disconnect(lmv->tgts[i].ltd_exp, flags);
469                 if (rc) {
470                         if (lmv->tgts[i].active) {
471                                 CERROR("Target %s disconnect error %d\n",
472                                        lmv->tgts[i].uuid.uuid, rc);
473                         }
474                         rc = 0;
475                 }
476                 
477                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
478                 lmv->tgts[i].ltd_exp = NULL;
479         }
480
481 #ifdef __KERNEL__
482         if (lmv_proc_dir) {
483                 lprocfs_remove(lmv_proc_dir);
484         } else {
485                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
486                        obd->obd_type->typ_name, obd->obd_name);
487         }
488 #endif
489
490 out_local:
491         /* this is the case when no real connection is established by
492          * lmv_check_connect(). */
493         if (!lmv->connected)
494                 class_export_put(exp);
495         rc = class_disconnect(exp, 0);
496         if (lmv->refcount == 0)
497                 lmv->connected = 0;
498         RETURN(rc);
499 }
500
501 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
502                          int len, void *karg, void *uarg)
503 {
504         struct obd_device *obddev = class_exp2obd(exp);
505         struct lmv_obd *lmv = &obddev->u.lmv;
506         int i, rc = 0, set = 0;
507         ENTRY;
508
509         if (lmv->desc.ld_tgt_count == 0)
510                 RETURN(-ENOTTY);
511         
512         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
513                 int err;
514
515                 if (lmv->tgts[i].ltd_exp == NULL)
516                         continue;
517
518                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
519                 if (err) {
520                         if (lmv->tgts[i].active) {
521                                 CERROR("error: iocontrol MDC %s on MDT"
522                                        "idx %d: err = %d\n",
523                                        lmv->tgts[i].uuid.uuid, i, err);
524                                 if (!rc)
525                                         rc = err;
526                         }
527                 } else
528                         set = 1;
529         }
530         if (!set && !rc)
531                 rc = -EIO;
532
533         RETURN(rc);
534 }
535
536 static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
537 {
538         int i, rc = 0;
539         struct lmv_desc *desc;
540         struct obd_uuid *uuids;
541         struct lmv_tgt_desc *tgts;
542         struct obd_device *tgt_obd;
543         struct lustre_cfg *lcfg = buf;
544         struct lmv_obd *lmv = &obd->u.lmv;
545         ENTRY;
546
547         if (lcfg->lcfg_inllen1 < 1) {
548                 CERROR("LMV setup requires a descriptor\n");
549                 RETURN(-EINVAL);
550         }
551
552         if (lcfg->lcfg_inllen2 < 1) {
553                 CERROR("LMV setup requires an MDT UUID list\n");
554                 RETURN(-EINVAL);
555         }
556
557         desc = (struct lmv_desc *)lcfg->lcfg_inlbuf1;
558         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
559                 CERROR("descriptor size wrong: %d > %d\n",
560                        (int)sizeof(*desc), lcfg->lcfg_inllen1);
561                 RETURN(-EINVAL);
562         }
563
564         uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
565         if (sizeof(*uuids) * desc->ld_tgt_count != lcfg->lcfg_inllen2) {
566                 CERROR("UUID array size wrong: %u * %u != %u\n",
567                        sizeof(*uuids), desc->ld_tgt_count, lcfg->lcfg_inllen2);
568                 RETURN(-EINVAL);
569         }
570
571         lmv->tgts_size = sizeof(struct lmv_tgt_desc) * desc->ld_tgt_count;
572         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
573         if (lmv->tgts == NULL) {
574                 CERROR("Out of memory\n");
575                 RETURN(-ENOMEM);
576         }
577
578         lmv->desc = *desc;
579         spin_lock_init(&lmv->lmv_lock);
580         
581         for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++)
582                 tgts->uuid = uuids[i];
583         
584         lmv->max_cookiesize = 0;
585
586         lmv->max_easize = sizeof(struct lustre_id) *
587                 desc->ld_tgt_count + sizeof(struct mea);
588         
589         rc = lmv_setup_mgr(obd);
590         if (rc) {
591                 CERROR("Can't setup LMV object manager, "
592                        "error %d.\n", rc);
593                 OBD_FREE(lmv->tgts, lmv->tgts_size);
594         }
595
596         tgt_obd = class_find_client_obd(&lmv->tgts->uuid, LUSTRE_MDC_NAME,
597                                         &obd->obd_uuid);
598         if (!tgt_obd) {
599                 CERROR("Target %s not attached\n", lmv->tgts->uuid.uuid);
600                 RETURN(-EINVAL);
601         }
602
603         RETURN(rc);
604 }
605
606 static int lmv_cleanup(struct obd_device *obd, int flags) 
607 {
608         struct lmv_obd *lmv = &obd->u.lmv;
609         ENTRY;
610
611         lmv_cleanup_mgr(obd);
612         OBD_FREE(lmv->tgts, lmv->tgts_size);
613         
614         RETURN(0);
615 }
616
617 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
618                       unsigned long max_age)
619 {
620         struct lmv_obd *lmv = &obd->u.lmv;
621         struct obd_statfs *temp;
622         int rc = 0, i;
623         ENTRY;
624         
625         rc = lmv_check_connect(obd);
626         if (rc)
627                 RETURN(rc);
628
629         OBD_ALLOC(temp, sizeof(*temp));
630         if (temp == NULL)
631                 RETURN(-ENOMEM);
632                 
633         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
634                 if (lmv->tgts[i].ltd_exp == NULL) {
635                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
636                         continue;
637                 }
638
639                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
640                 if (rc) {
641                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
642                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
643                                rc);
644                         GOTO(out_free_temp, rc);
645                 }
646                 if (i == 0) {
647                         memcpy(osfs, temp, sizeof(*temp));
648                 } else {
649                         osfs->os_bavail += temp->os_bavail;
650                         osfs->os_blocks += temp->os_blocks;
651                         osfs->os_ffree += temp->os_ffree;
652                         osfs->os_files += temp->os_files;
653                 }
654         }
655
656         EXIT;
657 out_free_temp:
658         OBD_FREE(temp, sizeof(*temp));
659         return rc;
660 }
661
662 static int lmv_getstatus(struct obd_export *exp, struct lustre_id *id)
663 {
664         struct obd_device *obd = exp->exp_obd;
665         struct lmv_obd *lmv = &obd->u.lmv;
666         int rc;
667         ENTRY;
668
669         rc = lmv_check_connect(obd);
670         if (rc)
671                 RETURN(rc);
672
673         rc = md_getstatus(lmv->tgts[0].ltd_exp, id);
674         id_group(id) = 0;
675         
676         RETURN(rc);
677 }
678
679 static int lmv_getattr(struct obd_export *exp, struct lustre_id *id,
680                        __u64 valid, const char *ea_name, int ea_namelen,
681                        unsigned int ea_size, struct ptlrpc_request **request)
682 {
683         struct obd_device *obd = exp->exp_obd;
684         struct lmv_obd *lmv = &obd->u.lmv;
685         int rc, i = id_group(id);
686         struct lmv_obj *obj;
687         ENTRY;
688
689         rc = lmv_check_connect(obd);
690         if (rc)
691                 RETURN(rc);
692
693         LASSERT(i < lmv->desc.ld_tgt_count);
694
695
696         rc = md_getattr(lmv->tgts[i].ltd_exp, id, valid,
697                         ea_name, ea_namelen, ea_size, request);
698         if (rc)
699                 RETURN(rc);
700         
701         obj = lmv_grab_obj(obd, id);
702         
703         CDEBUG(D_OTHER, "GETATTR for "DLID4" %s\n",
704                OLID4(id), obj ? "(splitted)" : "");
705
706         /*
707          * if object is splitted, then we loop over all the slaves and gather
708          * size attribute. In ideal world we would have to gather also mds field
709          * from all slaves, as object is spread over the cluster and this is
710          * definitely interesting information and it is not good to loss it,
711          * but...
712          */
713         if (obj) {
714                 struct mds_body *body;
715
716                 if (*request == NULL) {
717                         lmv_put_obj(obj);
718                         RETURN(rc);
719                 }
720                         
721                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
722                                       sizeof(*body));
723                 LASSERT(body != NULL);
724
725                 lmv_lock_obj(obj);
726         
727                 for (i = 0; i < obj->objcount; i++) {
728
729                         if (lmv->tgts[i].ltd_exp == NULL) {
730                                 CWARN("%s: NULL export for %d\n",
731                                       obd->obd_name, i);
732                                 continue;
733                         }
734
735                         /* skip master obj. */
736                         if (id_equal_fid(&obj->id, &obj->objs[i].id))
737                                 continue;
738                         
739                         body->size += obj->objs[i].size;
740                 }
741
742                 lmv_unlock_obj(obj);
743                 lmv_put_obj(obj);
744         }
745         
746         RETURN(rc);
747 }
748
749 static int lmv_change_cbdata(struct obd_export *exp,
750                              struct lustre_id *id, 
751                              ldlm_iterator_t it,
752                              void *data)
753 {
754         struct obd_device *obd = exp->exp_obd;
755         struct lmv_obd *lmv = &obd->u.lmv;
756         int rc = 0;
757         ENTRY;
758         
759         rc = lmv_check_connect(obd);
760         if (rc)
761                 RETURN(rc);
762         
763         CDEBUG(D_OTHER, "CBDATA for "DLID4"\n", OLID4(id));
764         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
765
766         rc = md_change_cbdata(lmv->tgts[id_group(id)].ltd_exp,
767                               id, it, data);
768         
769         RETURN(rc);
770 }
771
772 static int lmv_change_cbdata_name(struct obd_export *exp,
773                                   struct lustre_id *pid,
774                                   char *name, int len,
775                                   struct lustre_id *cid,
776                                   ldlm_iterator_t it,
777                                   void *data)
778 {
779         struct obd_device *obd = exp->exp_obd;
780         struct lmv_obd *lmv = &obd->u.lmv;
781         struct lustre_id rcid = *cid;
782         struct lmv_obj *obj;
783         int rc = 0, mds;
784         ENTRY;
785
786         rc = lmv_check_connect(obd);
787         if (rc)
788                 RETURN(rc);
789
790         LASSERT(id_group(pid) < lmv->desc.ld_tgt_count);
791         LASSERT(id_group(cid) < lmv->desc.ld_tgt_count);
792         
793         CDEBUG(D_OTHER, "CBDATA for "DLID4":%*s -> "DLID4"\n",
794                OLID4(pid), len, name, OLID4(cid));
795
796         /* this is default mds for directory name belongs to. */
797         mds = id_group(pid);
798         obj = lmv_grab_obj(obd, pid);
799         if (obj) {
800                 /* directory is splitted. look for right mds for this name. */
801                 mds = raw_name2idx(obj->hashtype, obj->objcount, name, len);
802                 rcid = obj->objs[mds].id;
803                 mds = id_group(&rcid);
804                 lmv_put_obj(obj);
805         }
806         rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, &rcid, it, data);
807         RETURN(rc);
808 }
809
810 static int lmv_valid_attrs(struct obd_export *exp, struct lustre_id *id) 
811 {
812         struct obd_device *obd = exp->exp_obd;
813         struct lmv_obd *lmv = &obd->u.lmv;
814         int rc = 0;
815         ENTRY;
816
817         rc = lmv_check_connect(obd);
818         if (rc)
819                 RETURN(rc);
820
821         CDEBUG(D_OTHER, "validate "DLID4"\n", OLID4(id));
822         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
823         rc = md_valid_attrs(lmv->tgts[id_group(id)].ltd_exp, id);
824         RETURN(rc);
825 }
826
827 int lmv_close(struct obd_export *exp, struct obdo *obdo,
828               struct obd_client_handle *och,
829               struct ptlrpc_request **request)
830 {
831         struct obd_device *obd = exp->exp_obd;
832         struct lmv_obd *lmv = &obd->u.lmv;
833         int rc, i = obdo->o_mds;
834         ENTRY;
835         
836         rc = lmv_check_connect(obd);
837         if (rc)
838                 RETURN(rc);
839
840         LASSERT(i < lmv->desc.ld_tgt_count);
841         CDEBUG(D_OTHER, "CLOSE %lu/%lu/%lu\n", (unsigned long)obdo->o_mds,
842                (unsigned long)obdo->o_id, (unsigned long)obdo->o_generation);
843         rc = md_close(lmv->tgts[i].ltd_exp, obdo, och, request);
844         RETURN(rc);
845 }
846
847 int lmv_get_mea_and_update_object(struct obd_export *exp, 
848                                   struct lustre_id *id)
849 {
850         struct obd_device *obd = exp->exp_obd;
851         struct lmv_obd *lmv = &obd->u.lmv;
852         struct ptlrpc_request *req = NULL;
853         struct lmv_obj *obj;
854         struct lustre_md md;
855         int mealen, rc;
856         __u64 valid;
857         ENTRY;
858
859         md.mea = NULL;
860         mealen = MEA_SIZE_LMV(lmv);
861         
862         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
863
864         /* time to update mea of parent id */
865         rc = md_getattr(lmv->tgts[id_group(id)].ltd_exp,
866                         id, valid, NULL, 0, mealen, &req);
867         if (rc) {
868                 CERROR("md_getattr() failed, error %d\n", rc);
869                 GOTO(cleanup, rc);
870         }
871
872         rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
873         if (rc) {
874                 CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
875                 GOTO(cleanup, rc);
876         }
877
878         if (md.mea == NULL)
879                 GOTO(cleanup, rc = -ENODATA);
880
881         obj = lmv_create_obj(exp, id, md.mea);
882         if (IS_ERR(obj))
883                 rc = PTR_ERR(obj);
884         
885         lmv_put_obj(obj);
886         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
887
888         EXIT;
889 cleanup:
890         if (req)
891                 ptlrpc_req_finished(req);
892         return rc;
893 }
894
895 int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
896                const void *data, int datalen, int mode, __u32 uid,
897                __u32 gid, __u64 rdev, struct ptlrpc_request **request)
898 {
899         struct obd_device *obd = exp->exp_obd;
900         struct lmv_obd *lmv = &obd->u.lmv;
901         struct mds_body *body;
902         struct lmv_obj *obj;
903         int rc, mds, loop = 0;
904         ENTRY;
905
906         rc = lmv_check_connect(obd);
907         if (rc)
908                 RETURN(rc);
909
910         if (!lmv->desc.ld_active_tgt_count)
911                 RETURN(-EIO);
912 repeat:
913         LASSERT(++loop <= 2);
914         obj = lmv_grab_obj(obd, &op_data->id1);
915         if (obj) {
916                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
917                                    op_data->name, op_data->namelen);
918                 op_data->id1 = obj->objs[mds].id;
919                 lmv_put_obj(obj);
920         }
921
922         CDEBUG(D_OTHER, "CREATE '%*s' on "DLID4"\n", op_data->namelen,
923                op_data->name, OLID4(&op_data->id1));
924         
925         rc = md_create(lmv->tgts[id_group(&op_data->id1)].ltd_exp, 
926                        op_data, data, datalen, mode, uid, gid, rdev,
927                        request);
928         if (rc == 0) {
929                 if (*request == NULL)
930                         RETURN(rc);
931
932                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
933                                       sizeof(*body));
934                 if (body == NULL)
935                         RETURN(-ENOMEM);
936                 
937                 CDEBUG(D_OTHER, "created. "DLID4"\n", OLID4(&op_data->id1));
938         } else if (rc == -ERESTART) {
939                 /*
940                  * directory got splitted. time to update local object and
941                  * repeat the request with proper MDS.
942                  */
943                 rc = lmv_get_mea_and_update_object(exp, &op_data->id1);
944                 if (rc == 0) {
945                         ptlrpc_req_finished(*request);
946                         goto repeat;
947                 }
948         }
949         RETURN(rc);
950 }
951
952 int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
953 {
954         struct obd_device *obd = exp->exp_obd;
955         struct lmv_obd *lmv = &obd->u.lmv;
956         int rc;
957         ENTRY;
958         
959         rc = lmv_check_connect(obd);
960         if (rc)
961                 RETURN(rc);
962
963         /* FIXME: choose right MDC here */
964         CWARN("this method isn't implemented yet\n");
965         rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
966         RETURN(rc);
967 }
968
969 int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
970                        struct lookup_intent *it, int lockmode,
971                        struct mdc_op_data *data, struct lustre_handle *lockh,
972                        void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
973                        ldlm_blocking_callback cb_blocking, void *cb_data)
974 {
975         struct obd_device *obd = exp->exp_obd;
976         struct lmv_obd *lmv = &obd->u.lmv;
977         struct mea *mea = data->mea1;
978         struct mdc_op_data *data2;
979         int i, rc, mds;
980         ENTRY;
981
982         OBD_ALLOC(data2, sizeof(*data2));
983         if (data2 == NULL)
984                 RETURN(-ENOMEM);
985         
986         LASSERT(mea != NULL);
987         for (i = 0; i < mea->mea_count; i++) {
988                 memset(data2, 0, sizeof(*data2));
989                 data2->id1 = mea->mea_ids[i];
990                 mds = id_group(&data2->id1);
991                 
992                 if (lmv->tgts[mds].ltd_exp == NULL)
993                         continue;
994
995                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, 
996                                 lockmode, data2, lockh + i, lmm, lmmsize, 
997                                 cb_compl, cb_blocking, cb_data);
998                 
999                 CDEBUG(D_OTHER, "take lock on slave "DLID4" -> %d/%d\n",
1000                        OLID4(&mea->mea_ids[i]), rc, LUSTRE_IT(it)->it_status);
1001                 if (rc)
1002                         GOTO(cleanup, rc);
1003                 if (LUSTRE_IT(it)->it_data) {
1004                         struct ptlrpc_request *req;
1005                         req = (struct ptlrpc_request *) LUSTRE_IT(it)->it_data;
1006                         ptlrpc_req_finished(req);
1007                 }
1008                 
1009                 if (LUSTRE_IT(it)->it_status)
1010                         GOTO(cleanup, rc = LUSTRE_IT(it)->it_status);
1011         }
1012         
1013         OBD_FREE(data2, sizeof(*data2));
1014         RETURN(0);
1015 cleanup:
1016         OBD_FREE(data2, sizeof(*data2));
1017         
1018         /* drop all taken locks */
1019         while (--i >= 0) {
1020                 if (lockh[i].cookie)
1021                         ldlm_lock_decref(lockh + i, lockmode);
1022                 lockh[i].cookie = 0;
1023         }
1024         return rc;
1025 }
1026
1027 int lmv_enqueue(struct obd_export *exp, int lock_type,
1028                 struct lookup_intent *it, int lock_mode,
1029                 struct mdc_op_data *data, struct lustre_handle *lockh,
1030                 void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1031                 ldlm_blocking_callback cb_blocking, void *cb_data)
1032 {
1033         struct obd_device *obd = exp->exp_obd;
1034         struct lmv_obd *lmv = &obd->u.lmv;
1035         struct lmv_obj *obj;
1036         int rc, mds;
1037         ENTRY;
1038
1039         rc = lmv_check_connect(obd);
1040         if (rc)
1041                 RETURN(rc);
1042
1043         if (data->mea1 && it->it_op == IT_UNLINK) {
1044                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1045                                         data, lockh, lmm, lmmsize,
1046                                         cb_compl, cb_blocking, cb_data);
1047                 RETURN(rc);
1048         }
1049
1050         if (data->namelen) {
1051                 obj = lmv_grab_obj(obd, &data->id1);
1052                 if (obj) {
1053                         /* directory is splitted. look for right mds for this
1054                          * name */
1055                         mds = raw_name2idx(obj->hashtype, obj->objcount,
1056                                            (char *)data->name, data->namelen);
1057                         data->id1 = obj->objs[mds].id;
1058                         lmv_put_obj(obj);
1059                 }
1060         }
1061         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4"\n", LL_IT2STR(it),
1062                OLID4(&data->id1));
1063         
1064         rc = md_enqueue(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1065                         lock_type, it, lock_mode, data, lockh, lmm, 
1066                         lmmsize, cb_compl, cb_blocking, cb_data);
1067         RETURN(rc);
1068 }
1069
1070 int lmv_getattr_lock(struct obd_export *exp, struct lustre_id *id,
1071                      char *filename, int namelen, __u64 valid,
1072                      unsigned int ea_size, struct ptlrpc_request **request)
1073 {
1074         int rc, mds = id_group(id), loop = 0;
1075         struct obd_device *obd = exp->exp_obd;
1076         struct lmv_obd *lmv = &obd->u.lmv;
1077         struct lustre_id rid = *id;
1078         struct mds_body *body;
1079         struct lmv_obj *obj;
1080         ENTRY;
1081         
1082         rc = lmv_check_connect(obd);
1083         if (rc)
1084                 RETURN(rc);
1085 repeat:
1086         LASSERT(++loop <= 2);
1087         obj = lmv_grab_obj(obd, id);
1088         if (obj) {
1089                 /* directory is splitted. look for right mds for this name */
1090                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1091                                    filename, namelen - 1);
1092                 rid = obj->objs[mds].id;
1093                 lmv_put_obj(obj);
1094         }
1095         
1096         CDEBUG(D_OTHER, "getattr_lock for %*s on "DLID4" -> "DLID4"\n",
1097                namelen, filename, OLID4(id), OLID4(&rid));
1098
1099         rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, &rid,
1100                              filename, namelen, (valid | OBD_MD_FID),
1101                              ea_size, request);
1102         if (rc == 0) {
1103                 /*
1104                  * this could be cross-node reference. in this case all we have
1105                  * right now is lustre_id triple. we'd like to find other
1106                  * attributes.
1107                  */
1108                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
1109                 LASSERT(body != NULL);
1110                 LASSERT((body->valid & OBD_MD_FID) != 0);
1111
1112                 if (body->valid & OBD_MD_MDS) {
1113                         struct ptlrpc_request *req = NULL;
1114                         
1115                         rid = body->id1;
1116                         CDEBUG(D_OTHER, "request attrs for "DLID4"\n", OLID4(&rid));
1117
1118                         rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, 
1119                                              &rid, NULL, 1, valid, ea_size, &req);
1120                         ptlrpc_req_finished(*request);
1121                         *request = req;
1122                 }
1123         } else if (rc == -ERESTART) {
1124                 /* directory got splitted. time to update local object and
1125                  * repeat the request with proper MDS */
1126                 rc = lmv_get_mea_and_update_object(exp, &rid);
1127                 if (rc == 0) {
1128                         ptlrpc_req_finished(*request);
1129                         goto repeat;
1130                 }
1131         }
1132         RETURN(rc);
1133 }
1134
1135 /*
1136  * llite passes id of an target inode in data->id1 and id of directory in
1137  * data->id2
1138  */
1139 int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
1140              struct ptlrpc_request **request)
1141 {
1142         struct obd_device *obd = exp->exp_obd;
1143         struct lmv_obd *lmv = &obd->u.lmv;
1144         struct lmv_obj *obj;
1145         int rc;
1146         ENTRY;
1147         
1148         rc = lmv_check_connect(obd);
1149         if (rc)
1150                 RETURN(rc);
1151
1152         if (data->namelen != 0) {
1153                 /* usual link request */
1154                 obj = lmv_grab_obj(obd, &data->id1);
1155                 if (obj) {
1156                         rc = raw_name2idx(obj->hashtype, obj->objcount, 
1157                                           data->name, data->namelen);
1158                         data->id1 = obj->objs[rc].id;
1159                         lmv_put_obj(obj);
1160                 }
1161                 
1162                 CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
1163                        OLID4(&data->id2), data->namelen, data->name,
1164                        OLID4(&data->id1));
1165         } else {
1166                 /* request from MDS to acquire i_links for inode by id1 */
1167                 CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
1168                        OLID4(&data->id1));
1169         }
1170                         
1171         rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1172                      data, request);
1173         RETURN(rc);
1174 }
1175
1176 int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
1177                const char *old, int oldlen, const char *new, int newlen,
1178                struct ptlrpc_request **request)
1179 {
1180         struct obd_device *obd = exp->exp_obd;
1181         struct lmv_obd *lmv = &obd->u.lmv;
1182         struct lmv_obj *obj;
1183         int rc, mds;
1184         ENTRY;
1185
1186         CDEBUG(D_OTHER, "rename %*s in "DLID4" to %*s in "DLID4"\n",
1187                oldlen, old, OLID4(&data->id1), newlen, new,
1188                OLID4(&data->id2));
1189
1190         rc = lmv_check_connect(obd);
1191         if (rc)
1192                 RETURN(rc);
1193
1194         if (oldlen == 0) {
1195                 /*
1196                  * MDS with old dir entry is asking another MDS to create name
1197                  * there.
1198                  */
1199                 CDEBUG(D_OTHER,
1200                        "create %*s(%d/%d) in "DLID4" pointing "
1201                        "to "DLID4"\n", newlen, new, oldlen, newlen,
1202                        OLID4(&data->id2), OLID4(&data->id1));
1203
1204                 mds = id_group(&data->id2);
1205
1206                 /* 
1207                  * target directory can be splitted, sowe should forward request
1208                  * to the right MDS.
1209                  */
1210                 obj = lmv_grab_obj(obd, &data->id2);
1211                 if (obj) {
1212                         mds = raw_name2idx(obj->hashtype, obj->objcount, 
1213                                            (char *)new, newlen);
1214                         data->id2 = obj->objs[mds].id;
1215                         CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1216                                OLID4(&data->id2));
1217                         lmv_put_obj(obj);
1218                 }
1219                 goto request;
1220         }
1221
1222         obj = lmv_grab_obj(obd, &data->id1);
1223         if (obj) {
1224                 /*
1225                  * directory is already splitted, so we have to forward request
1226                  * to the right MDS.
1227                  */
1228                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1229                                    (char *)old, oldlen);
1230                 data->id1 = obj->objs[mds].id;
1231                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1232                        OLID4(&data->id1));
1233                 lmv_put_obj(obj);
1234         }
1235
1236         obj = lmv_grab_obj(obd, &data->id2);
1237         if (obj) {
1238                 /*
1239                  * directory is already splitted, so we have to forward request
1240                  * to the right MDS.
1241                  */
1242                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1243                                    (char *)new, newlen);
1244                 
1245                 data->id2 = obj->objs[mds].id;
1246                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1247                        OLID4(&data->id2));
1248                 lmv_put_obj(obj);
1249         }
1250         
1251         mds = id_group(&data->id1);
1252
1253 request:
1254         if (id_group(&data->id1) != id_group(&data->id2)) {
1255                 CDEBUG(D_OTHER,"cross-node rename "DLID4"/%*s to "DLID4"/%*s\n",
1256                        OLID4(&data->id1), oldlen, old, OLID4(&data->id2),
1257                        newlen, new);
1258         }
1259
1260         rc = md_rename(lmv->tgts[mds].ltd_exp, data, old, oldlen,
1261                        new, newlen, request); 
1262         RETURN(rc);
1263 }
1264
1265 int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
1266                 struct iattr *iattr, void *ea, int ealen, void *ea2,
1267                 int ea2len, struct ptlrpc_request **request)
1268 {
1269         struct obd_device *obd = exp->exp_obd;
1270         struct lmv_obd *lmv = &obd->u.lmv;
1271         struct ptlrpc_request *req;
1272         struct mds_body *body;
1273         struct lmv_obj *obj;
1274         int rc = 0, i;
1275         ENTRY;
1276
1277         rc = lmv_check_connect(obd);
1278         if (rc)
1279                 RETURN(rc);
1280
1281         obj = lmv_grab_obj(obd, &data->id1);
1282         
1283         CDEBUG(D_OTHER, "SETATTR for "DLID4", valid 0x%x%s\n",
1284                OLID4(&data->id1), iattr->ia_valid, obj ? ", splitted" : "");
1285         
1286         if (obj) {
1287                 for (i = 0; i < obj->objcount; i++) {
1288                         data->id1 = obj->objs[i].id;
1289                         
1290                         rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1291                                         data, iattr, ea, ealen, ea2, ea2len, &req);
1292
1293                         if (id_equal_fid(&obj->id, &obj->objs[i].id)) {
1294                                 /*
1295                                  * this is master object and this request should
1296                                  * be returned back to llite.
1297                                  */
1298                                 *request = req;
1299                         } else {
1300                                 ptlrpc_req_finished(req);
1301                         }
1302
1303                         if (rc)
1304                                 break;
1305                 }
1306                 lmv_put_obj(obj);
1307         } else {
1308                 LASSERT(id_group(&data->id1) < lmv->desc.ld_tgt_count);
1309                 rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp,
1310                                 data, iattr, ea, ealen, ea2, ea2len, request); 
1311                 if (rc == 0) {
1312                         body = lustre_msg_buf((*request)->rq_repmsg, 0,
1313                                               sizeof(*body));
1314                         LASSERT(body != NULL);
1315                         LASSERT((body->valid & OBD_MD_FID) != 0);
1316                         LASSERT(id_group(&body->id1) == id_group(&data->id1));
1317                 }
1318         }
1319         RETURN(rc);
1320 }
1321
1322 int lmv_sync(struct obd_export *exp, struct lustre_id *id,
1323              struct ptlrpc_request **request)
1324 {
1325         struct obd_device *obd = exp->exp_obd;
1326         struct lmv_obd *lmv = &obd->u.lmv;
1327         int rc;
1328         ENTRY;
1329
1330         rc = lmv_check_connect(obd);
1331         if (rc)
1332                 RETURN(rc);
1333
1334         rc = md_sync(lmv->tgts[id_group(id)].ltd_exp, 
1335                      id, request);
1336         RETURN(rc);
1337 }
1338
1339 int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, 
1340                             struct ldlm_lock_desc *desc,
1341                             void *data, int flag)
1342 {
1343         struct lustre_handle lockh;
1344         struct lmv_obj *obj;
1345         int rc;
1346         ENTRY;
1347
1348         switch (flag) {
1349         case LDLM_CB_BLOCKING:
1350                 ldlm_lock2handle(lock, &lockh);
1351                 rc = ldlm_cli_cancel(&lockh);
1352                 if (rc < 0) {
1353                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1354                         RETURN(rc);
1355                 }
1356                 break;
1357         case LDLM_CB_CANCELING:
1358                 /* time to drop cached attrs for dirobj */
1359                 obj = lock->l_ast_data;
1360                 if (obj) {
1361                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1362                                ", master "DLID4"\n",
1363                                lock->l_resource->lr_name.name[3] == 1 ?
1364                                "LOOKUP" : "UPDATE",
1365                                lock->l_resource->lr_name.name[0],
1366                                lock->l_resource->lr_name.name[1], 
1367                                OLID4(&obj->id));
1368                         lmv_put_obj(obj);
1369                 }
1370                 break;
1371         default:
1372                 LBUG();
1373         }
1374         RETURN(0);
1375 }
1376
1377 void lmv_remove_dots(struct page *page)
1378 {
1379         unsigned limit = PAGE_CACHE_SIZE;
1380         char *kaddr = page_address(page);
1381         struct ext2_dir_entry_2 *p;
1382         unsigned offs, rec_len;
1383
1384         for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
1385                 p = (struct ext2_dir_entry_2 *)(kaddr + offs);
1386                 rec_len = le16_to_cpu(p->rec_len);
1387
1388                 if ((p->name_len == 1 && p->name[0] == '.') ||
1389                     (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
1390                         p->inode = 0;
1391         }
1392 }
1393
1394 int lmv_readpage(struct obd_export *exp, struct lustre_id *id,
1395                  __u64 offset, struct page *page,
1396                  struct ptlrpc_request **request)
1397 {
1398         struct obd_device *obd = exp->exp_obd;
1399         struct lmv_obd *lmv = &obd->u.lmv;
1400         struct lustre_id rid = *id;
1401         struct lmv_obj *obj;
1402         int rc, i;
1403         ENTRY;
1404
1405 #warning "we need well-desgined readdir() implementation"
1406         rc = lmv_check_connect(obd);
1407         if (rc)
1408                 RETURN(rc);
1409
1410         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
1411         CDEBUG(D_OTHER, "READPAGE at %llu from "DLID4"\n",
1412                offset, OLID4(&rid));
1413
1414         obj = lmv_grab_obj(obd, id);
1415         if (obj) {
1416                 lmv_lock_obj(obj);
1417
1418                 /* find dirobj containing page with requested offset. */
1419                 for (i = 0; i < obj->objcount; i++) {
1420                         if (offset < obj->objs[i].size)
1421                                 break;
1422                         offset -= obj->objs[i].size;
1423                 }
1424                 rid = obj->objs[i].id;
1425                 
1426                 lmv_unlock_obj(obj);
1427                 lmv_put_obj(obj);
1428                 
1429                 CDEBUG(D_OTHER, "forward to "DLID4" with offset %lu\n",
1430                        OLID4(&rid), (unsigned long)offset);
1431         }
1432         rc = md_readpage(lmv->tgts[id_group(&rid)].ltd_exp, &rid, 
1433                          offset, page, request);
1434         
1435         if (rc == 0 && !id_equal_fid(&rid, id))
1436                 /* this page isn't from master object. To avoid "." and ".." 
1437                  * duplication in directory, we have to remove them from all
1438                  * slave objects */
1439                 lmv_remove_dots(page);
1440         
1441         RETURN(rc);
1442 }
1443
1444 int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data,
1445                       struct ptlrpc_request **req)
1446 {
1447         struct obd_device *obd = exp->exp_obd;
1448         struct lmv_obd *lmv = &obd->u.lmv;
1449         struct mea *mea = data->mea1;
1450         struct mdc_op_data *data2;
1451         int i, rc = 0;
1452         ENTRY;
1453
1454         OBD_ALLOC(data2, sizeof(*data2));
1455         if (data2 == NULL)
1456                 RETURN(-ENOMEM);
1457         
1458         LASSERT(mea != NULL);
1459         for (i = 0; i < mea->mea_count; i++) {
1460                 memset(data2, 0, sizeof(*data2));
1461                 data2->id1 = mea->mea_ids[i];
1462                 data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1463                 
1464                 if (lmv->tgts[id_group(&data2->id1)].ltd_exp == NULL)
1465                         continue;
1466
1467                 rc = md_unlink(lmv->tgts[id_group(&data2->id1)].ltd_exp,
1468                                data2, req);
1469                 
1470                 CDEBUG(D_OTHER, "unlink slave "DLID4" -> %d\n",
1471                        OLID4(&mea->mea_ids[i]), rc);
1472                 
1473                 if (*req) {
1474                         ptlrpc_req_finished(*req);
1475                         *req = NULL;
1476                 }
1477                 if (rc)
1478                         RETURN(rc);
1479         }
1480         OBD_FREE(data2, sizeof(*data2));
1481         RETURN(rc);
1482 }
1483
1484 int lmv_delete_inode(struct obd_export *exp, struct lustre_id *id)
1485 {
1486         ENTRY;
1487
1488         LASSERT(exp && id);
1489         if (lmv_delete_obj(exp, id)) {
1490                 CDEBUG(D_OTHER, "lmv object "DLID4" is destroyed.\n",
1491                        OLID4(id));
1492         }
1493         RETURN(0);
1494 }
1495
1496 int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
1497                struct ptlrpc_request **request)
1498 {
1499         struct obd_device *obd = exp->exp_obd;
1500         struct lmv_obd *lmv = &obd->u.lmv;
1501         int rc, i = 0;
1502         ENTRY;
1503         
1504         rc = lmv_check_connect(obd);
1505         if (rc)
1506                 RETURN(rc);
1507
1508         if (data->namelen == 0 && data->mea1 != NULL) {
1509                 /* mds asks to remove slave objects */
1510                 rc = lmv_unlink_slaves(exp, data, request);
1511                 RETURN(rc);
1512         }
1513
1514         if (data->namelen != 0) {
1515                 struct lmv_obj *obj;
1516                 
1517                 obj = lmv_grab_obj(obd, &data->id1);
1518                 if (obj) {
1519                         i = raw_name2idx(obj->hashtype, obj->objcount,
1520                                          data->name, data->namelen);
1521                         data->id1 = obj->objs[i].id;
1522                         lmv_put_obj(obj);
1523                 }
1524                 CDEBUG(D_OTHER, "unlink '%*s' in "DLID4" -> %u\n",
1525                        data->namelen, data->name, OLID4(&data->id1),
1526                        i);
1527         } else {
1528                 CDEBUG(D_OTHER, "drop i_nlink on "DLID4"\n",
1529                        OLID4(&data->id1));
1530         }
1531         rc = md_unlink(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1532                        data, request);
1533         RETURN(rc);
1534 }
1535
1536 struct obd_device *lmv_get_real_obd(struct obd_export *exp,
1537                                     struct lustre_id *id)
1538 {
1539         struct obd_device *obd = exp->exp_obd;
1540         struct lmv_obd *lmv = &obd->u.lmv;
1541         int rc;
1542         ENTRY;
1543
1544         rc = lmv_check_connect(obd);
1545         if (rc)
1546                 RETURN(ERR_PTR(rc));
1547         obd = lmv->tgts[id_group(id)].ltd_exp->exp_obd;
1548         EXIT;
1549         
1550         return obd;
1551 }
1552
1553 int lmv_init_ea_size(struct obd_export *exp, int easize,
1554                      int cookiesize)
1555 {
1556         struct obd_device *obd = exp->exp_obd;
1557         struct lmv_obd *lmv = &obd->u.lmv;
1558         int i, rc = 0, change = 0;
1559         ENTRY;
1560
1561         if (lmv->max_easize < easize) {
1562                 lmv->max_easize = easize;
1563                 change = 1;
1564         }
1565         if (lmv->max_cookiesize < cookiesize) {
1566                 lmv->max_cookiesize = cookiesize;
1567                 change = 1;
1568         }
1569         if (change == 0)
1570                 RETURN(0);
1571         
1572         if (lmv->connected == 0)
1573                 RETURN(0);
1574
1575         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1576                 if (lmv->tgts[i].ltd_exp == NULL) {
1577                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
1578                         continue;
1579                 }
1580
1581                 rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
1582                 if (rc) {
1583                         CERROR("obd_init_ea_size() failed on MDT target %d, "
1584                                "error %d.\n", i, rc);
1585                         break;
1586                 }
1587         }
1588         RETURN(rc);
1589 }
1590
1591 int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
1592                           struct lov_stripe_md **ea, struct obd_trans_info *oti)
1593 {
1594         struct obd_device *obd = exp->exp_obd;
1595         struct lmv_obd *lmv = &obd->u.lmv;
1596         struct lov_stripe_md obj_md;
1597         struct lov_stripe_md *obj_mdp = &obj_md;
1598         int rc = 0;
1599         ENTRY;
1600
1601         LASSERT(ea == NULL);
1602         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1603
1604         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp,
1605                         oa, &obj_mdp, oti);
1606
1607         RETURN(rc);
1608 }
1609
1610 int lmv_getready(struct obd_export *exp)
1611 {
1612         struct obd_device *obd = exp->exp_obd;
1613         int rc = 0;
1614         
1615         ENTRY;
1616         rc = lmv_check_connect(obd);
1617         RETURN(rc);
1618 }
1619
1620 /*
1621  * to be called from MDS only. @oa should have correct store cookie and o_fid
1622  * values for "master" object, as it will be used.
1623  */
1624 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
1625                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1626 {
1627         struct obd_device *obd = exp->exp_obd;
1628         struct lmv_obd *lmv = &obd->u.lmv;
1629         struct lustre_id mid;
1630         int i, c, rc = 0;
1631         struct mea *mea;
1632         ENTRY;
1633
1634         rc = lmv_check_connect(obd);
1635         if (rc)
1636                 RETURN(rc);
1637
1638         LASSERT(oa != NULL);
1639         
1640         if (ea == NULL) {
1641                 rc = lmv_obd_create_single(exp, oa, NULL, oti);
1642                 if (rc)
1643                         CERROR("Can't create object, rc = %d\n", rc);
1644                 RETURN(rc);
1645         }
1646
1647         if (*ea == NULL) {
1648                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
1649                 if (rc < 0) {
1650                         CERROR("obd_alloc_diskmd() failed, error %d\n",
1651                                rc);
1652                         RETURN(rc);
1653                 } else
1654                         rc = 0;
1655                 
1656                 if (*ea == NULL)
1657                         RETURN(-ENOMEM);
1658         }
1659
1660         /* 
1661          * here we should take care about splitted dir, so store cookie and fid
1662          * for "master" object should already be allocated and passed in @oa.
1663          */
1664         LASSERT(oa->o_id != 0);
1665         LASSERT(oa->o_fid != 0);
1666
1667         /* save "master" object id */
1668         obdo2id(&mid, oa);
1669
1670         mea = (struct mea *)*ea;
1671         mea->mea_master = -1;
1672         mea->mea_magic = MEA_MAGIC_ALL_CHARS;
1673
1674         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
1675                 mea->mea_count = lmv->desc.ld_tgt_count;
1676
1677         for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
1678                 struct lov_stripe_md obj_md;
1679                 struct lov_stripe_md *obj_mdp = &obj_md;
1680                
1681                 if (lmv->tgts[i].ltd_exp == NULL) {
1682                         /* this is "master" MDS */
1683                         mea->mea_master = i;
1684                         mea->mea_ids[c] = mid;
1685                         c++;
1686                         continue;
1687                 }
1688
1689                 /*
1690                  * "master" MDS should always be part of stripped dir,
1691                  * so scan for it.
1692                  */
1693                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
1694                         continue;
1695
1696                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
1697                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
1698
1699                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
1700                 if (rc) {
1701                         CERROR("obd_create() failed on MDT target %d, "
1702                                "error %d\n", c, rc);
1703                         RETURN(rc);
1704                 }
1705
1706                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
1707                        i, oa->o_id, oa->o_generation);
1708
1709
1710                 /*
1711                  * here, when object is created (or it is master and was passed
1712                  * from caller) on desired MDS we save its fid to local mea_ids.
1713                  */
1714                 LASSERT(oa->o_fid);
1715
1716                 /* 
1717                  * store cookie should be defined here for both cases (master
1718                  * object and not master), because master is already created.
1719                  */
1720                 LASSERT(oa->o_id);
1721
1722                 /* fill mea by store cookie and fid */
1723                 obdo2id(&mea->mea_ids[c], oa);
1724                 c++;
1725         }
1726         LASSERT(c == mea->mea_count);
1727
1728         CDEBUG(D_OTHER, "%d dirobjects created\n",
1729                (int)mea->mea_count);
1730         
1731         RETURN(rc);
1732 }
1733
1734 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
1735                          struct obd_device *tgt, int count,
1736                          struct llog_catid *logid)
1737 {
1738         struct llog_ctxt *ctxt;
1739         int rc;
1740         ENTRY;
1741
1742         rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1743                             &llog_client_ops);
1744         if (rc == 0) {
1745                 ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
1746                 ctxt->loc_imp = tgt->u.cli.cl_import;
1747         }
1748
1749         RETURN(rc);
1750 }
1751
1752 static int lmv_llog_finish(struct obd_device *obd,
1753                            struct obd_llogs *llogs, int count)
1754 {
1755         int rc;
1756         ENTRY;
1757
1758         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
1759         RETURN(rc);
1760 }
1761
1762 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
1763                         void *key, __u32 *vallen, void *val)
1764 {
1765         struct obd_device *obd;
1766         struct lmv_obd *lmv;
1767         int rc = 0;
1768         ENTRY;
1769
1770         obd = class_exp2obd(exp);
1771         if (obd == NULL) {
1772                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1773                        exp->exp_handle.h_cookie);
1774                 RETURN(-EINVAL);
1775         }
1776
1777         lmv = &obd->u.lmv;
1778         if (keylen == 6 && memcmp(key, "mdsize", 6) == 0) {
1779                 __u32 *mdsize = val;
1780                 *vallen = sizeof(__u32);
1781                 *mdsize = sizeof(struct lustre_id) * lmv->desc.ld_tgt_count
1782                         + sizeof(struct mea);
1783                 RETURN(0);
1784         } else if (keylen == 6 && memcmp(key, "mdsnum", 6) == 0) {
1785                 struct obd_uuid *cluuid = &lmv->cluuid;
1786                 struct lmv_tgt_desc *tgts;
1787                 __u32 *mdsnum = val;
1788                 int i;
1789
1790                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
1791                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
1792                                 *vallen = sizeof(__u32);
1793                                 *mdsnum = i;
1794                                 RETURN(0);
1795                         }
1796                 }
1797                 LASSERT(0);
1798         } else if (keylen == 6 && memcmp(key, "rootid", 6) == 0) {
1799                 /* getting rootid from first MDS. */
1800                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
1801                                   vallen, val);
1802                 RETURN(rc);
1803         } else if (keylen >= strlen("lmvdesc") && strcmp(key, "lmvdesc") == 0) {
1804                 struct lmv_desc *desc_ret = val;
1805                 *desc_ret = lmv->desc;
1806                 RETURN(0);
1807         }
1808
1809         CDEBUG(D_IOCTL, "invalid key\n");
1810         RETURN(-EINVAL);
1811 }
1812
1813 int lmv_set_info(struct obd_export *exp, obd_count keylen,
1814                  void *key, obd_count vallen, void *val)
1815 {
1816         struct obd_device *obd;
1817         struct lmv_obd *lmv;
1818         ENTRY;
1819
1820         obd = class_exp2obd(exp);
1821         if (obd == NULL) {
1822                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1823                        exp->exp_handle.h_cookie);
1824                 RETURN(-EINVAL);
1825         }
1826         lmv = &obd->u.lmv;
1827
1828         if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
1829                 lmv->server_timeout = 1;
1830                 lmv_set_timeouts(obd);
1831                 RETURN(0);
1832         }
1833
1834         /* maybe this could be default */
1835         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
1836             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
1837                 struct lmv_tgt_desc *tgt;
1838                 struct obd_export *exp;
1839                 int rc = 0, err, i;
1840
1841                 spin_lock(&lmv->lmv_lock);
1842                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
1843                      i++, tgt++) {
1844                         exp = tgt->ltd_exp;
1845                         /* during setup time the connections to mdc might
1846                          * haven't been established.
1847                          */
1848                         if (exp == NULL) {
1849                                 struct obd_device *tgt_obd;
1850
1851                                 tgt_obd = class_find_client_obd(&tgt->uuid,
1852                                                                 LUSTRE_MDC_NAME,
1853                                                                 &obd->obd_uuid);
1854                                 if (!tgt_obd) {
1855                                         CERROR("can't set info %s, "
1856                                                "device %s not attached?\n",
1857                                                 (char *) key, tgt->uuid.uuid);
1858                                         rc = -EINVAL;
1859                                         continue;
1860                                 }
1861                                 exp = tgt_obd->obd_self_export;
1862                         }
1863
1864                         err = obd_set_info(exp, keylen, key, vallen, val);
1865                         if (!rc)
1866                                 rc = err;
1867                 }
1868                 spin_unlock(&lmv->lmv_lock);
1869
1870                 RETURN(rc);
1871         }
1872
1873         RETURN(-EINVAL);
1874 }
1875
1876 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
1877                struct lov_stripe_md *lsm)
1878 {
1879         struct obd_device *obd = class_exp2obd(exp);
1880         struct lmv_obd *lmv = &obd->u.lmv;
1881         struct mea *meap, *lsmp;
1882         int mea_size, i;
1883         ENTRY;
1884
1885         mea_size = (sizeof(struct lustre_id) * 
1886                     lmv->desc.ld_tgt_count) + sizeof(struct mea);
1887         if (!lmmp)
1888                 RETURN(mea_size);
1889
1890         if (*lmmp && !lsm) {
1891                 OBD_FREE(*lmmp, mea_size);
1892                 *lmmp = NULL;
1893                 RETURN(0);
1894         }
1895
1896         if (*lmmp == NULL) {
1897                 OBD_ALLOC(*lmmp, mea_size);
1898                 if (*lmmp == NULL)
1899                         RETURN(-ENOMEM);
1900         }
1901
1902         if (!lsm)
1903                 RETURN(mea_size);
1904
1905         lsmp = (struct mea *)lsm;
1906         meap = (struct mea *)*lmmp;
1907
1908         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
1909         meap->mea_count = cpu_to_le32(lsmp->mea_count);
1910         meap->mea_master = cpu_to_le32(lsmp->mea_master);
1911
1912         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1913                 meap->mea_ids[i] = meap->mea_ids[i];
1914                 id_cpu_to_le(&meap->mea_ids[i]);
1915         }
1916
1917         RETURN(mea_size);
1918 }
1919
1920 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt,
1921                  struct lov_mds_md *disk_src, int mdsize)
1922 {
1923         struct obd_device *obd = class_exp2obd(exp);
1924         struct mea **tmea = (struct mea **)mem_tgt;
1925         struct mea *mea = (struct mea *)disk_src;
1926         struct lmv_obd *lmv = &obd->u.lmv;
1927         int mea_size, i;
1928         ENTRY;
1929
1930         mea_size = sizeof(struct lustre_id) * 
1931                 lmv->desc.ld_tgt_count + sizeof(struct mea);
1932         if (mem_tgt == NULL)
1933                 return mea_size;
1934
1935         if (*mem_tgt != NULL && disk_src == NULL) {
1936                 OBD_FREE(*tmea, mea_size);
1937                 RETURN(0);
1938         }
1939
1940         LASSERT(mea_size == mdsize);
1941
1942         OBD_ALLOC(*tmea, mea_size);
1943         if (*tmea == NULL)
1944                 RETURN(-ENOMEM);
1945
1946         if (!disk_src)
1947                 RETURN(mea_size);
1948
1949         (*tmea)->mea_magic = le32_to_cpu(mea->mea_magic);
1950         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
1951         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
1952
1953         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1954                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
1955                 id_le_to_cpu(&(*tmea)->mea_ids[i]);
1956         }
1957
1958         RETURN(mea_size);
1959 }
1960
1961 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
1962             struct lov_stripe_md *ea, obd_count oa_bufs,
1963             struct brw_page *pgarr, struct obd_trans_info *oti)
1964 {
1965         struct obd_device *obd = exp->exp_obd;
1966         struct lmv_obd *lmv = &obd->u.lmv;
1967         struct mea *mea = (struct mea *) ea;
1968         int err;
1969       
1970         LASSERT(oa != NULL);
1971         LASSERT(ea != NULL);
1972         LASSERT(pgarr != NULL);
1973         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1974
1975         oa->o_gr = id_gen(&mea->mea_ids[oa->o_mds]);
1976         oa->o_id = id_ino(&mea->mea_ids[oa->o_mds]);
1977         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1978         
1979         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
1980                       oa, NULL, oa_bufs, pgarr, oti);
1981         RETURN(err);
1982 }
1983
1984 struct obd_ops lmv_obd_ops = {
1985         .o_owner                = THIS_MODULE,
1986         .o_attach               = lmv_attach,
1987         .o_detach               = lmv_detach,
1988         .o_setup                = lmv_setup,
1989         .o_cleanup              = lmv_cleanup,
1990         .o_connect              = lmv_connect,
1991         .o_disconnect           = lmv_disconnect,
1992         .o_statfs               = lmv_statfs,
1993         .o_llog_init            = lmv_llog_init,
1994         .o_llog_finish          = lmv_llog_finish,
1995         .o_get_info             = lmv_get_info,
1996         .o_set_info             = lmv_set_info,
1997         .o_create               = lmv_obd_create,
1998         .o_packmd               = lmv_packmd,
1999         .o_unpackmd             = lmv_unpackmd,
2000         .o_brw                  = lmv_brw,
2001         .o_init_ea_size         = lmv_init_ea_size,
2002         .o_notify               = lmv_notify,
2003         .o_iocontrol            = lmv_iocontrol,
2004         .o_getready             = lmv_getready,
2005 };
2006
2007 struct md_ops lmv_md_ops = {
2008         .m_getstatus           = lmv_getstatus,
2009         .m_getattr             = lmv_getattr,
2010         .m_change_cbdata       = lmv_change_cbdata,
2011         .m_change_cbdata_name  = lmv_change_cbdata_name,
2012         .m_close               = lmv_close,
2013         .m_create              = lmv_create,
2014         .m_done_writing        = lmv_done_writing,
2015         .m_enqueue             = lmv_enqueue,
2016         .m_getattr_lock        = lmv_getattr_lock,
2017         .m_intent_lock         = lmv_intent_lock,
2018         .m_link                = lmv_link,
2019         .m_rename              = lmv_rename,
2020         .m_setattr             = lmv_setattr,
2021         .m_sync                = lmv_sync,
2022         .m_readpage            = lmv_readpage,
2023         .m_unlink              = lmv_unlink,
2024         .m_get_real_obd        = lmv_get_real_obd,
2025         .m_valid_attrs         = lmv_valid_attrs,
2026         .m_delete_inode        = lmv_delete_inode,
2027 };
2028
2029 int __init lmv_init(void)
2030 {
2031         struct lprocfs_static_vars lvars;
2032         int rc;
2033
2034         obj_cache = kmem_cache_create("lmv_objects",
2035                                       sizeof(struct lmv_obj),
2036                                       0, 0, NULL, NULL);
2037         if (!obj_cache) {
2038                 CERROR("error allocating lmv objects cache\n");
2039                 return -ENOMEM;
2040         }
2041
2042         lprocfs_init_vars(lmv, &lvars);
2043         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2044                                  lvars.module_vars,
2045                                  OBD_LMV_DEVICENAME);
2046         if (rc)
2047                 kmem_cache_destroy(obj_cache);
2048         
2049         return rc;
2050 }
2051
2052 #ifdef __KERNEL__
2053 static void lmv_exit(void)
2054 {
2055         class_unregister_type(OBD_LMV_DEVICENAME);
2056
2057         LASSERTF(kmem_cache_destroy(obj_cache) == 0,
2058                  "can't free lmv objects cache, %d object(s)"
2059                  "still in use\n", atomic_read(&obj_cache_count));
2060 }
2061
2062 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2063 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2064 MODULE_LICENSE("GPL");
2065
2066 module_init(lmv_init);
2067 module_exit(lmv_exit);
2068 #endif