Whamcloud - gitweb
Land b_hd_capa onto HEAD (20050809_1942)
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38 #include <linux/ext2_fs.h>
39
40 #include <linux/obd_support.h>
41 #include <linux/lustre_lib.h>
42 #include <linux/lustre_net.h>
43 #include <linux/lustre_idl.h>
44 #include <linux/lustre_dlm.h>
45 #include <linux/lustre_mds.h>
46 #include <linux/obd_class.h>
47 #include <linux/obd_ost.h>
48 #include <linux/lprocfs_status.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/obd_lmv.h>
51 #include <linux/lustre_lite.h>
52 #include <linux/lustre_audit.h>
53 #include "lmv_internal.h"
54
55 /* not defined for liblustre building */
56 #if !defined(ATOMIC_INIT)
57 #define ATOMIC_INIT(val) { (val) }
58 #endif
59
60 /* object cache. */
61 kmem_cache_t *obj_cache;
62 atomic_t obj_cache_count = ATOMIC_INIT(0);
63
64 static void lmv_activate_target(struct lmv_obd *lmv,
65                                 struct lmv_tgt_desc *tgt,
66                                 int activate)
67 {
68         if (tgt->active == activate)
69                 return;
70         
71         tgt->active = activate;
72         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
73 }
74
75 /* Error codes:
76  *
77  *  -EINVAL  : UUID can't be found in the LMV's target list
78  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
79  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
80  */
81 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
82                               int activate)
83 {
84         struct lmv_tgt_desc *tgt;
85         struct obd_device *obd;
86         int i, rc = 0;
87         ENTRY;
88
89         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
90                lmv, uuid->uuid, activate);
91
92         spin_lock(&lmv->lmv_lock);
93         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
94                 if (tgt->ltd_exp == NULL)
95                         continue;
96
97                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
98                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
99
100                 if (obd_uuid_equals(uuid, &tgt->uuid))
101                         break;
102         }
103
104         if (i == lmv->desc.ld_tgt_count)
105                 GOTO(out_lmv_lock, rc = -EINVAL);
106
107         obd = class_exp2obd(tgt->ltd_exp);
108         if (obd == NULL)
109                 GOTO(out_lmv_lock, rc = -ENOTCONN);
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, i);
114         LASSERT(strcmp(obd->obd_type->typ_name, OBD_MDC_DEVICENAME) == 0);
115
116         if (tgt->active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 GOTO(out_lmv_lock, rc);
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
123                obd, activate ? "" : "in");
124
125         lmv_activate_target(lmv, tgt, activate);
126
127         EXIT;
128         
129  out_lmv_lock:
130         spin_unlock(&lmv->lmv_lock);
131         return rc;
132 }
133
134 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
135                       int active, void *data)
136 {
137         struct obd_uuid *uuid;
138         int rc;
139         ENTRY;
140
141         if (strcmp(watched->obd_type->typ_name, OBD_MDC_DEVICENAME)) {
142                 CERROR("unexpected notification of %s %s!\n",
143                        watched->obd_type->typ_name,
144                        watched->obd_name);
145                 RETURN(-EINVAL);
146         }
147         uuid = &watched->u.cli.cl_import->imp_target_uuid;
148
149         /* Set MDC as active before notifying the observer, so the observer can
150          * use the MDC normally.
151          */
152         rc = lmv_set_mdc_active(&obd->u.lmv, uuid, active);
153         if (rc) {
154                 CERROR("%sactivation of %s failed: %d\n",
155                        active ? "" : "de", uuid->uuid, rc);
156                 RETURN(rc);
157         }
158
159         if (obd->obd_observer)
160                 /* Pass the notification up the chain. */
161                 rc = obd_notify(obd->obd_observer, watched, active, data);
162
163         RETURN(rc);
164 }
165
166 static int lmv_attach(struct obd_device *dev, obd_count len, void *data)
167 {
168         struct lprocfs_static_vars lvars;
169         int rc;
170         ENTRY;
171
172         lprocfs_init_vars(lmv, &lvars);
173         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
174 #ifdef __KERNEL__
175         if (rc == 0) {
176                 struct proc_dir_entry *entry;
177                 
178                 entry = create_proc_entry("target_obd_status", 0444, 
179                                            dev->obd_proc_entry);
180                 if (entry == NULL)
181                         RETURN(-ENOMEM);
182                 entry->proc_fops = &lmv_proc_target_fops; 
183                 entry->data = dev;
184        }
185 #endif
186         RETURN (rc);
187 }
188
/* Detach handler: hands the device to lprocfs_obd_detach() and returns
 * its result. */
static int lmv_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
193
194 /* this is fake connect function. Its purpose is to initialize lmv and say
195  * caller that everything is okay. Real connection will be performed later. */
196 static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
197                        struct obd_uuid *cluuid, struct obd_connect_data *data,
198                        unsigned long flags)
199 {
200 #ifdef __KERNEL__
201         struct proc_dir_entry *lmv_proc_dir;
202 #endif
203         struct lmv_obd *lmv = &obd->u.lmv;
204         struct obd_export *exp;
205         int rc = 0;
206         ENTRY;
207
208         rc = class_connect(conn, obd, cluuid);
209         if (rc) {
210                 CERROR("class_connection() returned %d\n", rc);
211                 RETURN(rc);
212         }
213
214         exp = class_conn2export(conn);
215         
216         /* we don't want to actually do the underlying connections more than
217          * once, so keep track. */
218         lmv->refcount++;
219         if (lmv->refcount > 1) {
220                 class_export_put(exp);
221                 RETURN(0);
222         }
223
224         lmv->exp = exp;
225         lmv->connected = 0;
226         lmv->cluuid = *cluuid;
227         lmv->connect_flags = flags;
228         sema_init(&lmv->init_sem, 1);
229         if (data)
230                 memcpy(&lmv->conn_data, data, sizeof(*data));
231
232 #ifdef __KERNEL__
233         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
234                                         NULL, NULL);
235         if (IS_ERR(lmv_proc_dir)) {
236                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
237                        obd->obd_type->typ_name, obd->obd_name);
238                 lmv_proc_dir = NULL;
239         }
240 #endif
241
242         /* 
243          * all real clients shouls perform actual connection rightaway, because
244          * it is possible, that LMV will not have opportunity to connect
245          * targets, as MDC stuff will bit called directly, for instance while
246          * reading ../mdc/../kbytesfree procfs file, etc.
247          */
248         if (flags & OBD_OPT_REAL_CLIENT)
249                 rc = lmv_check_connect(obd);
250
251 #ifdef __KERNEL__
252         if (rc) {
253                 if (lmv_proc_dir)
254                         lprocfs_remove(lmv_proc_dir);
255         }
256 #endif
257
258         RETURN(rc);
259 }
260
261 static void lmv_set_timeouts(struct obd_device *obd)
262 {
263         struct lmv_tgt_desc *tgts;
264         struct lmv_obd *lmv;
265         int i;
266
267         lmv = &obd->u.lmv;
268         if (lmv->server_timeout == 0)
269                 return;
270
271         if (lmv->connected == 0)
272                 return;
273
274         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
275                 if (tgts->ltd_exp == NULL)
276                         continue;
277                 
278                 obd_set_info(tgts->ltd_exp, strlen("inter_mds"),
279                              "inter_mds", 0, NULL);
280         }
281 }
282
283 #define MAX_STRING_SIZE 128
284
285 /* performs a check if passed obd is connected. If no - connect it. */
286 int lmv_check_connect(struct obd_device *obd)
287 {
288 #ifdef __KERNEL__
289         struct proc_dir_entry *lmv_proc_dir;
290 #endif
291         struct lmv_obd *lmv = &obd->u.lmv;
292         struct lmv_tgt_desc *tgts;
293         struct obd_uuid *cluuid;
294         struct obd_export *exp;
295         int rc, rc2, i;
296         ENTRY;
297
298         if (lmv->connected)
299                 RETURN(0);
300         
301         down(&lmv->init_sem);
302         if (lmv->connected) {
303                 up(&lmv->init_sem);
304                 RETURN(0);
305         }
306
307         cluuid = &lmv->cluuid;
308         exp = lmv->exp;
309         
310         CDEBUG(D_OTHER, "time to connect %s to %s\n",
311                cluuid->uuid, obd->obd_name);
312
313         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
314                 struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
315                 struct lustre_handle conn = {0, };
316                 struct obd_device *tgt_obd;
317
318                 LASSERT(tgts != NULL);
319
320                 tgt_obd = class_find_client_obd(&tgts->uuid, OBD_MDC_DEVICENAME, 
321                                                 &obd->obd_uuid);
322                 if (!tgt_obd) {
323                         CERROR("target %s not attached\n", tgts->uuid.uuid);
324                         GOTO(out_disc, rc = -EINVAL);
325                 }
326
327                 /* for MDS: don't connect to yourself */
328                 if (obd_uuid_equals(&tgts->uuid, cluuid)) {
329                         CDEBUG(D_OTHER, "don't connect back to %s\n",
330                                cluuid->uuid);
331                         tgts->ltd_exp = NULL;
332                         continue;
333                 }
334
335                 CDEBUG(D_OTHER, "connect to %s(%s) - %s, %s FOR %s\n",
336                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
337                         tgts->uuid.uuid, obd->obd_uuid.uuid,
338                         cluuid->uuid);
339
340                 if (!tgt_obd->obd_set_up) {
341                         CERROR("target %s not set up\n", tgts->uuid.uuid);
342                         GOTO(out_disc, rc = -EINVAL);
343                 }
344                 
345                 rc = obd_connect(&conn, tgt_obd, &lmv_mdc_uuid, &lmv->conn_data,
346                                  lmv->connect_flags);
347                 if (rc) {
348                         CERROR("target %s connect error %d\n",
349                                 tgts->uuid.uuid, rc);
350                         GOTO(out_disc, rc);
351                 }
352                 tgts->ltd_exp = class_conn2export(&conn);
353
354                 obd_init_ea_size(tgts->ltd_exp, lmv->max_easize,
355                                  lmv->max_cookiesize);
356
357                 rc = obd_register_observer(tgt_obd, obd);
358                 if (rc) {
359                         CERROR("target %s register_observer error %d\n",
360                                tgts->uuid.uuid, rc);
361                         obd_disconnect(tgts->ltd_exp, 0);
362                         GOTO(out_disc, rc);
363                 }
364
365                 lmv->desc.ld_active_tgt_count++;
366                 tgts->active = 1;
367
368                 CDEBUG(D_OTHER, "connected to %s(%s) successfully (%d)\n",
369                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
370                         atomic_read(&obd->obd_refcount));
371
372 #ifdef __KERNEL__
373                 lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
374                 if (lmv_proc_dir) {
375                         struct obd_device *mdc_obd = class_conn2obd(&conn);
376                         struct proc_dir_entry *mdc_symlink;
377                         char name[MAX_STRING_SIZE + 1];
378
379                         LASSERT(mdc_obd != NULL);
380                         LASSERT(mdc_obd->obd_type != NULL);
381                         LASSERT(mdc_obd->obd_type->typ_name != NULL);
382                         name[MAX_STRING_SIZE] = '\0';
383                         snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
384                                  mdc_obd->obd_type->typ_name,
385                                  mdc_obd->obd_name);
386                         mdc_symlink = proc_symlink(mdc_obd->obd_name,
387                                                    lmv_proc_dir, name);
388                         if (mdc_symlink == NULL) {
389                                 CERROR("could not register LMV target "
390                                        "/proc/fs/lustre/%s/%s/target_obds/%s.",
391                                        obd->obd_type->typ_name, obd->obd_name,
392                                        mdc_obd->obd_name);
393                                 lprocfs_remove(lmv_proc_dir);
394                                 lmv_proc_dir = NULL;
395                         }
396                 }
397 #endif
398         }
399
400         lmv_set_timeouts(obd);
401         class_export_put(exp);
402         lmv->connected = 1;
403         up(&lmv->init_sem);
404         RETURN(0);
405
406  out_disc:
407         while (i-- > 0) {
408                 struct obd_uuid uuid;
409                 --tgts;
410                 --lmv->desc.ld_active_tgt_count;
411                 tgts->active = 0;
412                 /* save for CERROR below; (we know it's terminated) */
413                 uuid = tgts->uuid;
414                 rc2 = obd_disconnect(tgts->ltd_exp, 0);
415                 if (rc2)
416                         CERROR("error: LMV target %s disconnect on MDC idx %d: "
417                                "error %d\n", uuid.uuid, i, rc2);
418         }
419         class_disconnect(exp, 0);
420         up(&lmv->init_sem);
421         return rc;
422 }
423
424 static int lmv_disconnect(struct obd_export *exp, unsigned long flags)
425 {
426         struct obd_device *obd = class_exp2obd(exp);
427         struct lmv_obd *lmv = &obd->u.lmv;
428
429 #ifdef __KERNEL__
430         struct proc_dir_entry *lmv_proc_dir;
431 #endif
432         int rc, i;
433         ENTRY;
434
435         if (!lmv->tgts)
436                 goto out_local;
437
438         /* Only disconnect the underlying layers on the final disconnect. */
439         lmv->refcount--;
440         if (lmv->refcount != 0)
441                 goto out_local;
442
443 #ifdef __KERNEL__
444         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
445 #endif
446
447         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
448                 struct obd_device *mdc_obd; 
449                 
450                 if (lmv->tgts[i].ltd_exp == NULL)
451                         continue;
452
453                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
454
455                 if (mdc_obd)
456                         mdc_obd->obd_no_recov = obd->obd_no_recov;
457
458 #ifdef __KERNEL__
459                 if (lmv_proc_dir) {
460                         struct proc_dir_entry *mdc_symlink;
461
462                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
463                         if (mdc_symlink) {
464                                 lprocfs_remove(mdc_symlink);
465                         } else {
466                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
467                                        obd->obd_type->typ_name, obd->obd_name,
468                                        mdc_obd->obd_name);
469                         }
470                 }
471 #endif
472                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
473                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
474                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
475
476                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
477                 rc = obd_disconnect(lmv->tgts[i].ltd_exp, flags);
478                 if (rc) {
479                         if (lmv->tgts[i].active) {
480                                 CERROR("Target %s disconnect error %d\n",
481                                        lmv->tgts[i].uuid.uuid, rc);
482                         }
483                         rc = 0;
484                 }
485                 
486                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
487                 lmv->tgts[i].ltd_exp = NULL;
488         }
489
490 #ifdef __KERNEL__
491         if (lmv_proc_dir) {
492                 lprocfs_remove(lmv_proc_dir);
493         } else {
494                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
495                        obd->obd_type->typ_name, obd->obd_name);
496         }
497 #endif
498
499 out_local:
500         /* this is the case when no real connection is established by
501          * lmv_check_connect(). */
502         if (!lmv->connected)
503                 class_export_put(exp);
504         rc = class_disconnect(exp, 0);
505         if (lmv->refcount == 0)
506                 lmv->connected = 0;
507         RETURN(rc);
508 }
509
510 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
511                          int len, void *karg, void *uarg)
512 {
513         struct obd_device *obddev = class_exp2obd(exp);
514         struct lmv_obd *lmv = &obddev->u.lmv;
515         int i, rc = 0, set = 0;
516         ENTRY;
517
518         if (lmv->desc.ld_tgt_count == 0)
519                 RETURN(-ENOTTY);
520         
521         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
522                 int err;
523
524                 if (lmv->tgts[i].ltd_exp == NULL)
525                         continue;
526
527                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
528                 if (err) {
529                         if (lmv->tgts[i].active) {
530                                 CERROR("error: iocontrol MDC %s on MDT"
531                                        "idx %d: err = %d\n",
532                                        lmv->tgts[i].uuid.uuid, i, err);
533                                 if (!rc)
534                                         rc = err;
535                         }
536                 } else
537                         set = 1;
538         }
539         if (!set && !rc)
540                 rc = -EIO;
541
542         RETURN(rc);
543 }
544
545 static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
546 {
547         int i, rc = 0;
548         struct lmv_desc *desc;
549         struct obd_uuid *uuids;
550         struct lmv_tgt_desc *tgts;
551         struct obd_device *tgt_obd;
552         struct lustre_cfg *lcfg = buf;
553         struct lmv_obd *lmv = &obd->u.lmv;
554         ENTRY;
555
556         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
557                 CERROR("LMV setup requires a descriptor\n");
558                 RETURN(-EINVAL);
559         }
560
561         if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) {
562                 CERROR("LMV setup requires an MDT UUID list\n");
563                 RETURN(-EINVAL);
564         }
565
566         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
567         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
568                 CERROR("descriptor size wrong: %d > %d\n",
569                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
570                 RETURN(-EINVAL);
571         }
572
573         uuids = (struct obd_uuid *)lustre_cfg_buf(lcfg, 2);
574         if (sizeof(*uuids) * desc->ld_tgt_count != LUSTRE_CFG_BUFLEN(lcfg, 2)) {
575                 CERROR("UUID array size wrong: %u * %u != %u\n",
576                        sizeof(*uuids), desc->ld_tgt_count, LUSTRE_CFG_BUFLEN(lcfg, 2));
577                 RETURN(-EINVAL);
578         }
579
580         lmv->tgts_size = sizeof(struct lmv_tgt_desc) * desc->ld_tgt_count;
581         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
582         if (lmv->tgts == NULL) {
583                 CERROR("Out of memory\n");
584                 RETURN(-ENOMEM);
585         }
586
587         lmv->desc = *desc;
588         spin_lock_init(&lmv->lmv_lock);
589         
590         for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++)
591                 tgts->uuid = uuids[i];
592         
593         lmv->max_cookiesize = 0;
594
595         lmv->max_easize = sizeof(struct lustre_id) *
596                 desc->ld_tgt_count + sizeof(struct mea);
597         
598         rc = lmv_setup_mgr(obd);
599         if (rc) {
600                 CERROR("Can't setup LMV object manager, "
601                        "error %d.\n", rc);
602                 OBD_FREE(lmv->tgts, lmv->tgts_size);
603         }
604
605         tgt_obd = class_find_client_obd(&lmv->tgts->uuid, OBD_MDC_DEVICENAME,
606                                         &obd->obd_uuid);
607         if (!tgt_obd) {
608                 CERROR("Target %s not attached\n", lmv->tgts->uuid.uuid);
609                 RETURN(-EINVAL);
610         }
611
612         RETURN(rc);
613 }
614
615 static int lmv_cleanup(struct obd_device *obd, int flags) 
616 {
617         struct lmv_obd *lmv = &obd->u.lmv;
618         ENTRY;
619
620         lmv_cleanup_mgr(obd);
621         OBD_FREE(lmv->tgts, lmv->tgts_size);
622         
623         RETURN(0);
624 }
625
626 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
627                       unsigned long max_age)
628 {
629         struct lmv_obd *lmv = &obd->u.lmv;
630         struct obd_statfs *temp;
631         int rc = 0, i;
632         ENTRY;
633         
634         rc = lmv_check_connect(obd);
635         if (rc)
636                 RETURN(rc);
637
638         OBD_ALLOC(temp, sizeof(*temp));
639         if (temp == NULL)
640                 RETURN(-ENOMEM);
641                 
642         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
643                 if (lmv->tgts[i].ltd_exp == NULL)
644                         continue;
645
646                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
647                 if (rc) {
648                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
649                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
650                                rc);
651                         GOTO(out_free_temp, rc);
652                 }
653                 if (i == 0) {
654                         memcpy(osfs, temp, sizeof(*temp));
655                 } else {
656                         osfs->os_bavail += temp->os_bavail;
657                         osfs->os_blocks += temp->os_blocks;
658                         osfs->os_ffree += temp->os_ffree;
659                         osfs->os_files += temp->os_files;
660                 }
661         }
662
663         EXIT;
664 out_free_temp:
665         OBD_FREE(temp, sizeof(*temp));
666         return rc;
667 }
668
669 static int lmv_getstatus(struct obd_export *exp, struct lustre_id *id)
670 {
671         struct obd_device *obd = exp->exp_obd;
672         struct lmv_obd *lmv = &obd->u.lmv;
673         int rc;
674         ENTRY;
675
676         rc = lmv_check_connect(obd);
677         if (rc)
678                 RETURN(rc);
679
680         rc = md_getstatus(lmv->tgts[0].ltd_exp, id);
681         id_group(id) = 0;
682         
683         RETURN(rc);
684 }
685
686 static int lmv_getattr(struct obd_export *exp, struct lustre_id *id,
687                        __u64 valid, const char *xattr_name,
688                        const void *xattr_data, unsigned int xattr_datalen,
689                        unsigned int ea_size, struct obd_capa *ocapa,
690                        struct ptlrpc_request **request)
691 {
692         struct obd_device *obd = exp->exp_obd;
693         struct lmv_obd *lmv = &obd->u.lmv;
694         int rc, i = id_group(id);
695         struct lmv_obj *obj;
696         ENTRY;
697
698         rc = lmv_check_connect(obd);
699         if (rc)
700                 RETURN(rc);
701
702         LASSERT(i < lmv->desc.ld_tgt_count);
703
704
705         rc = md_getattr(lmv->tgts[i].ltd_exp, id, valid,
706                         xattr_name, xattr_data, xattr_datalen,
707                         ea_size, ocapa, request);
708         if (rc)
709                 RETURN(rc);
710         
711         obj = lmv_grab_obj(obd, id);
712         
713         CDEBUG(D_OTHER, "GETATTR for "DLID4" %s\n",
714                OLID4(id), obj ? "(splitted)" : "");
715
716         /*
717          * if object is splitted, then we loop over all the slaves and gather
718          * size attribute. In ideal world we would have to gather also mds field
719          * from all slaves, as object is spread over the cluster and this is
720          * definitely interesting information and it is not good to loss it,
721          * but...
722          */
723         if (obj) {
724                 struct mds_body *body;
725
726                 if (*request == NULL) {
727                         lmv_put_obj(obj);
728                         RETURN(rc);
729                 }
730                         
731                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
732                                       sizeof(*body));
733                 LASSERT(body != NULL);
734
735                 lmv_lock_obj(obj);
736         
737                 for (i = 0; i < obj->objcount; i++) {
738
739                         if (lmv->tgts[i].ltd_exp == NULL) {
740                                 CWARN("%s: NULL export for %d\n",
741                                       obd->obd_name, i);
742                                 continue;
743                         }
744
745                         /* skip master obj. */
746                         if (id_equal_fid(&obj->id, &obj->objs[i].id))
747                                 continue;
748                         
749                         body->size += obj->objs[i].size;
750                 }
751
752                 lmv_unlock_obj(obj);
753                 lmv_put_obj(obj);
754         }
755         
756         RETURN(rc);
757 }
758
759 static int lmv_access_check(struct obd_export *exp,
760                             struct lustre_id *id,
761                             struct ptlrpc_request **request)
762 {
763         struct obd_device *obd = exp->exp_obd;
764         struct lmv_obd *lmv = &obd->u.lmv;
765         int rc, i = id_group(id);
766         ENTRY;
767
768         rc = lmv_check_connect(obd);
769         if (rc)
770                 RETURN(rc);
771
772         LASSERT(i < lmv->desc.ld_tgt_count);
773         rc = md_access_check(lmv->tgts[i].ltd_exp, id, request);
774         RETURN(rc);
775 }
776
777 static int lmv_change_cbdata(struct obd_export *exp,
778                              struct lustre_id *id, 
779                              ldlm_iterator_t it,
780                              void *data)
781 {
782         struct obd_device *obd = exp->exp_obd;
783         struct lmv_obd *lmv = &obd->u.lmv;
784         int i, rc;
785         ENTRY;
786         
787         rc = lmv_check_connect(obd);
788         if (rc)
789                 RETURN(rc);
790         
791         CDEBUG(D_OTHER, "CBDATA for "DLID4"\n", OLID4(id));
792         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
793
794         /* with CMD every object can have two locks in different
795          * namespaces: lookup lock in space of mds storing direntry
796          * and update/open lock in space of mds storing inode */
797         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
798                 md_change_cbdata(lmv->tgts[i].ltd_exp, id, it, data);
799         
800         RETURN(0);
801 }
802
803 static int lmv_change_cbdata_name(struct obd_export *exp,
804                                   struct lustre_id *pid,
805                                   char *name, int len,
806                                   struct lustre_id *cid,
807                                   ldlm_iterator_t it,
808                                   void *data)
809 {
810         struct obd_device *obd = exp->exp_obd;
811         struct lmv_obd *lmv = &obd->u.lmv;
812         struct lustre_id rcid = *cid;
813         struct lmv_obj *obj;
814         int rc = 0, mds;
815         ENTRY;
816
817         rc = lmv_check_connect(obd);
818         if (rc)
819                 RETURN(rc);
820
821         LASSERT(id_group(pid) < lmv->desc.ld_tgt_count);
822         LASSERT(id_group(cid) < lmv->desc.ld_tgt_count);
823         
824         CDEBUG(D_OTHER, "CBDATA for "DLID4":%*s -> "DLID4"\n",
825                OLID4(pid), len, name, OLID4(cid));
826
827         /* this is default mds for directory name belongs to. */
828         mds = id_group(pid);
829         obj = lmv_grab_obj(obd, pid);
830         if (obj) {
831                 /* directory is splitted. look for right mds for this name. */
832                 mds = raw_name2idx(obj->hashtype, obj->objcount, name, len);
833                 rcid = obj->objs[mds].id;
834                 mds = id_group(&rcid);
835                 lmv_put_obj(obj);
836         }
837         rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, &rcid, it, data);
838         RETURN(rc);
839 }
840
841 static int lmv_valid_attrs(struct obd_export *exp, struct lustre_id *id) 
842 {
843         struct obd_device *obd = exp->exp_obd;
844         struct lmv_obd *lmv = &obd->u.lmv;
845         int rc = 0;
846         ENTRY;
847
848         rc = lmv_check_connect(obd);
849         if (rc)
850                 RETURN(rc);
851
852         CDEBUG(D_OTHER, "validate "DLID4"\n", OLID4(id));
853         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
854         rc = md_valid_attrs(lmv->tgts[id_group(id)].ltd_exp, id);
855         RETURN(rc);
856 }
857
858 static int lmv_close(struct obd_export *exp, struct obdo *obdo,
859                      struct obd_client_handle *och,
860                      struct ptlrpc_request **request)
861 {
862         struct obd_device *obd = exp->exp_obd;
863         struct lmv_obd *lmv = &obd->u.lmv;
864         int rc, i = obdo->o_mds;
865         ENTRY;
866         
867         rc = lmv_check_connect(obd);
868         if (rc)
869                 RETURN(rc);
870
871         LASSERT(i < lmv->desc.ld_tgt_count);
872         CDEBUG(D_OTHER, "CLOSE %lu/%lu/%lu\n", (unsigned long)obdo->o_mds,
873                (unsigned long)obdo->o_id, (unsigned long)obdo->o_generation);
874         rc = md_close(lmv->tgts[i].ltd_exp, obdo, och, request);
875         RETURN(rc);
876 }
877
878 int lmv_get_mea_and_update_object(struct obd_export *exp, 
879                                   struct lustre_id *id)
880 {
881         struct obd_device *obd = exp->exp_obd;
882         struct lmv_obd *lmv = &obd->u.lmv;
883         struct ptlrpc_request *req = NULL;
884         struct lmv_obj *obj;
885         struct lustre_md md;
886         int mealen, rc;
887         __u64 valid;
888         ENTRY;
889
890         md.mea = NULL;
891         mealen = MEA_SIZE_LMV(lmv);
892         
893         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
894
895         /* time to update mea of parent id */
896         rc = md_getattr(lmv->tgts[id_group(id)].ltd_exp,
897                         id, valid, NULL, NULL, 0, mealen, NULL, &req);
898         if (rc) {
899                 CERROR("md_getattr() failed, error %d\n", rc);
900                 GOTO(cleanup, rc);
901         }
902
903         rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
904         if (rc) {
905                 CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
906                 GOTO(cleanup, rc);
907         }
908
909         if (md.mea == NULL)
910                 GOTO(cleanup, rc = -ENODATA);
911
912         obj = lmv_create_obj(exp, id, md.mea);
913         if (IS_ERR(obj))
914                 rc = PTR_ERR(obj);
915         else
916                 lmv_put_obj(obj);
917
918         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
919
920         EXIT;
921 cleanup:
922         if (req)
923                 ptlrpc_req_finished(req);
924         return rc;
925 }
926
927 int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
928                const void *data, int datalen, int mode, __u32 uid,
929                __u32 gid, __u64 rdev, struct ptlrpc_request **request)
930 {
931         struct obd_device *obd = exp->exp_obd;
932         struct lmv_obd *lmv = &obd->u.lmv;
933         struct mds_body *body;
934         struct lmv_obj *obj;
935         int rc, mds, loop = 0;
936         ENTRY;
937
938         rc = lmv_check_connect(obd);
939         if (rc)
940                 RETURN(rc);
941
942         if (!lmv->desc.ld_active_tgt_count)
943                 RETURN(-EIO);
944 repeat:
945         LASSERT(++loop <= 2);
946         obj = lmv_grab_obj(obd, &op_data->id1);
947         if (obj) {
948                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
949                                    op_data->name, op_data->namelen);
950                 op_data->id1 = obj->objs[mds].id;
951                 lmv_put_obj(obj);
952         }
953
954         CDEBUG(D_OTHER, "CREATE '%*s' on "DLID4"\n", op_data->namelen,
955                op_data->name, OLID4(&op_data->id1));
956         
957         rc = md_create(lmv->tgts[id_group(&op_data->id1)].ltd_exp, 
958                        op_data, data, datalen, mode, uid, gid, rdev,
959                        request);
960         if (rc == 0) {
961                 if (*request == NULL)
962                         RETURN(rc);
963
964                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
965                                       sizeof(*body));
966                 if (body == NULL)
967                         RETURN(-ENOMEM);
968                 
969                 CDEBUG(D_OTHER, "created. "DLID4"\n", OLID4(&op_data->id1));
970         } else if (rc == -ERESTART) {
971                 /*
972                  * directory got splitted. time to update local object and
973                  * repeat the request with proper MDS.
974                  */
975                 rc = lmv_get_mea_and_update_object(exp, &op_data->id1);
976                 if (rc == 0) {
977                         ptlrpc_req_finished(*request);
978                         goto repeat;
979                 }
980         }
981         RETURN(rc);
982 }
983
984 static int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
985 {
986         struct obd_device *obd = exp->exp_obd;
987         struct lmv_obd *lmv = &obd->u.lmv;
988         int rc;
989         ENTRY;
990         
991         rc = lmv_check_connect(obd);
992         if (rc)
993                 RETURN(rc);
994
995         /* FIXME: choose right MDC here */
996         CWARN("this method isn't implemented yet\n");
997         rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
998         RETURN(rc);
999 }
1000
1001 static int
1002 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1003                    struct lookup_intent *it, int lockmode,
1004                    struct mdc_op_data *data, struct lustre_handle *lockh,
1005                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1006                    ldlm_blocking_callback cb_blocking, void *cb_data)
1007 {
1008         struct obd_device *obd = exp->exp_obd;
1009         struct lmv_obd *lmv = &obd->u.lmv;
1010         struct mea *mea = data->mea1;
1011         struct mdc_op_data *data2;
1012         int i, rc, mds;
1013         ENTRY;
1014
1015         OBD_ALLOC(data2, sizeof(*data2));
1016         if (data2 == NULL)
1017                 RETURN(-ENOMEM);
1018         
1019         LASSERT(mea != NULL);
1020         for (i = 0; i < mea->mea_count; i++) {
1021                 memset(data2, 0, sizeof(*data2));
1022                 data2->id1 = mea->mea_ids[i];
1023                 mds = id_group(&data2->id1);
1024                 
1025                 if (lmv->tgts[mds].ltd_exp == NULL)
1026                         continue;
1027
1028                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, 
1029                                 lockmode, data2, lockh + i, lmm, lmmsize, 
1030                                 cb_compl, cb_blocking, cb_data);
1031                 
1032                 CDEBUG(D_OTHER, "take lock on slave "DLID4" -> %d/%d\n",
1033                        OLID4(&mea->mea_ids[i]), rc, LUSTRE_IT(it)->it_status);
1034                 if (rc)
1035                         GOTO(cleanup, rc);
1036                 if (LUSTRE_IT(it)->it_data) {
1037                         struct ptlrpc_request *req;
1038                         req = (struct ptlrpc_request *) LUSTRE_IT(it)->it_data;
1039                         ptlrpc_req_finished(req);
1040                 }
1041                 
1042                 if (LUSTRE_IT(it)->it_status)
1043                         GOTO(cleanup, rc = LUSTRE_IT(it)->it_status);
1044         }
1045         
1046         OBD_FREE(data2, sizeof(*data2));
1047         RETURN(0);
1048 cleanup:
1049         OBD_FREE(data2, sizeof(*data2));
1050         
1051         /* drop all taken locks */
1052         while (--i >= 0) {
1053                 if (lockh[i].cookie)
1054                         ldlm_lock_decref(lockh + i, lockmode);
1055                 lockh[i].cookie = 0;
1056         }
1057         return rc;
1058 }
1059
1060 static int
1061 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1062                    struct lookup_intent *it, int lock_mode,
1063                    struct mdc_op_data *data, struct lustre_handle *lockh,
1064                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1065                    ldlm_blocking_callback cb_blocking, void *cb_data)
1066 {
1067         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
1068         struct obd_device *obd = exp->exp_obd;
1069         struct lmv_obd *lmv = &obd->u.lmv;
1070         struct lustre_handle plock;
1071         struct mdc_op_data rdata;
1072         struct mds_body *body = NULL;
1073         int rc = 0, pmode;
1074         ENTRY;
1075
1076         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
1077         LASSERT(body != NULL);
1078
1079         if (!(body->valid & OBD_MD_MDS))
1080                 RETURN(0);
1081
1082         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4" -> "DLID4"\n",
1083                LL_IT2STR(it), OLID4(&data->id1), OLID4(&body->id1));
1084
1085         /* we got LOOKUP lock, but we really need attrs */
1086         pmode = LUSTRE_IT(it)->it_lock_mode;
1087         LASSERT(pmode != 0);
1088         memcpy(&plock, lockh, sizeof(plock));
1089         LUSTRE_IT(it)->it_lock_mode = 0;
1090         LUSTRE_IT(it)->it_data = NULL;
1091         LASSERT((body->valid & OBD_MD_FID) != 0);
1092
1093         memcpy(&rdata, data, sizeof(rdata));
1094         rdata.id1 = body->id1;
1095         rdata.name = NULL;
1096         rdata.namelen = 0;
1097
1098         LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
1099         ptlrpc_req_finished(req);
1100
1101         rc = md_enqueue(lmv->tgts[id_group(&rdata.id1)].ltd_exp, 
1102                         lock_type, it, lock_mode, &rdata, lockh, lmm, 
1103                         lmmsize, cb_compl, cb_blocking, cb_data);
1104         ldlm_lock_decref(&plock, pmode);
1105         RETURN(rc);
1106 }
1107
1108 static int
1109 lmv_enqueue(struct obd_export *exp, int lock_type,
1110             struct lookup_intent *it, int lock_mode,
1111             struct mdc_op_data *data, struct lustre_handle *lockh,
1112             void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1113             ldlm_blocking_callback cb_blocking, void *cb_data)
1114 {
1115         struct obd_device *obd = exp->exp_obd;
1116         struct lmv_obd *lmv = &obd->u.lmv;
1117         struct lmv_obj *obj;
1118         int rc, mds;
1119         ENTRY;
1120
1121         rc = lmv_check_connect(obd);
1122         if (rc)
1123                 RETURN(rc);
1124
1125         if (data->mea1 && it->it_op == IT_UNLINK) {
1126                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1127                                         data, lockh, lmm, lmmsize,
1128                                         cb_compl, cb_blocking, cb_data);
1129                 RETURN(rc);
1130         }
1131
1132         if (data->namelen) {
1133                 obj = lmv_grab_obj(obd, &data->id1);
1134                 if (obj) {
1135                         /* directory is splitted. look for right mds for this
1136                          * name */
1137                         mds = raw_name2idx(obj->hashtype, obj->objcount,
1138                                            (char *)data->name, data->namelen);
1139                         data->id1 = obj->objs[mds].id;
1140                         lmv_put_obj(obj);
1141                 }
1142         }
1143         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4"\n", LL_IT2STR(it),
1144                OLID4(&data->id1));
1145         
1146         rc = md_enqueue(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1147                         lock_type, it, lock_mode, data, lockh, lmm, 
1148                         lmmsize, cb_compl, cb_blocking, cb_data);
1149         if (rc == 0 && it->it_op == IT_OPEN)
1150                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1151                                         data, lockh, lmm, lmmsize,
1152                                         cb_compl, cb_blocking, cb_data);
1153         RETURN(rc);
1154 }
1155
1156 static int
1157 lmv_getattr_lock(struct obd_export *exp, struct lustre_id *id,
1158                  char *filename, int namelen, __u64 valid,
1159                  unsigned int ea_size, struct ptlrpc_request **request)
1160 {
1161         int rc, mds = id_group(id), loop = 0;
1162         struct obd_device *obd = exp->exp_obd;
1163         struct lmv_obd *lmv = &obd->u.lmv;
1164         struct lustre_id rid = *id;
1165         struct mds_body *body;
1166         struct lmv_obj *obj;
1167         ENTRY;
1168         
1169         rc = lmv_check_connect(obd);
1170         if (rc)
1171                 RETURN(rc);
1172 repeat:
1173         LASSERT(++loop <= 2);
1174         obj = lmv_grab_obj(obd, id);
1175         if (obj) {
1176                 /* directory is splitted. look for right mds for this name */
1177                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1178                                    filename, namelen - 1);
1179                 rid = obj->objs[mds].id;
1180                 lmv_put_obj(obj);
1181         }
1182         
1183         CDEBUG(D_OTHER, "getattr_lock for %*s on "DLID4" -> "DLID4"\n",
1184                namelen, filename, OLID4(id), OLID4(&rid));
1185
1186         rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp,
1187                              &rid, filename, namelen,
1188                              valid == OBD_MD_FLID ? valid : valid | OBD_MD_FID,
1189                              ea_size, request);
1190         if (rc == 0) {
1191                 /*
1192                  * this could be cross-node reference. in this case all we have
1193                  * right now is lustre_id triple. we'd like to find other
1194                  * attributes.
1195                  */
1196                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
1197                 LASSERT(body != NULL);
1198                 LASSERT((body->valid & OBD_MD_FID) != 0
1199                                 || body->valid == OBD_MD_FLID);
1200
1201                 if (body->valid & OBD_MD_MDS) {
1202                         struct ptlrpc_request *req = NULL;
1203                         
1204                         rid = body->id1;
1205                         CDEBUG(D_OTHER, "request attrs for "DLID4"\n", OLID4(&rid));
1206
1207                         rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, 
1208                                              &rid, NULL, 1, valid, ea_size, &req);
1209                         ptlrpc_req_finished(*request);
1210                         *request = req;
1211                 }
1212         } else if (rc == -ERESTART) {
1213                 /* directory got splitted. time to update local object and
1214                  * repeat the request with proper MDS */
1215                 rc = lmv_get_mea_and_update_object(exp, &rid);
1216                 if (rc == 0) {
1217                         ptlrpc_req_finished(*request);
1218                         goto repeat;
1219                 }
1220         }
1221         RETURN(rc);
1222 }
1223
1224 /*
1225  * llite passes id of an target inode in data->id1 and id of directory in
1226  * data->id2
1227  */
1228 static int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
1229                     struct ptlrpc_request **request)
1230 {
1231         struct obd_device *obd = exp->exp_obd;
1232         struct lmv_obd *lmv = &obd->u.lmv;
1233         struct lmv_obj *obj;
1234         int rc, mds;
1235         ENTRY;
1236         
1237         rc = lmv_check_connect(obd);
1238         if (rc)
1239                 RETURN(rc);
1240
1241         if (data->namelen != 0) {
1242                 /* usual link request */
1243                 obj = lmv_grab_obj(obd, &data->id2);
1244                 if (obj) {
1245                         rc = raw_name2idx(obj->hashtype, obj->objcount, 
1246                                           data->name, data->namelen);
1247                         data->id2 = obj->objs[rc].id;
1248                         lmv_put_obj(obj);
1249                 }
1250
1251                 mds = id_group(&data->id2);
1252                 
1253                 CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
1254                        OLID4(&data->id2), data->namelen, data->name,
1255                        OLID4(&data->id1));
1256         } else {
1257                 mds = id_group(&data->id1);
1258                 
1259                 /* request from MDS to acquire i_links for inode by id1 */
1260                 CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
1261                        OLID4(&data->id1));
1262         }
1263
1264         CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n",
1265                mds, OLID4(&data->id1));
1266         rc = md_link(lmv->tgts[mds].ltd_exp, data, request);
1267         
1268         RETURN(rc);
1269 }
1270
1271 static int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
1272                       const char *old, int oldlen, const char *new, int newlen,
1273                       struct ptlrpc_request **request)
1274 {
1275         struct obd_device *obd = exp->exp_obd;
1276         struct lmv_obd *lmv = &obd->u.lmv;
1277         struct lmv_obj *obj;
1278         int rc, mds;
1279         ENTRY;
1280
1281         CDEBUG(D_OTHER, "rename %*s in "DLID4" to %*s in "DLID4"\n",
1282                oldlen, old, OLID4(&data->id1), newlen, new,
1283                OLID4(&data->id2));
1284
1285         rc = lmv_check_connect(obd);
1286         if (rc)
1287                 RETURN(rc);
1288
1289         if (oldlen == 0) {
1290                 /*
1291                  * MDS with old dir entry is asking another MDS to create name
1292                  * there.
1293                  */
1294                 CDEBUG(D_OTHER,
1295                        "create %*s(%d/%d) in "DLID4" pointing "
1296                        "to "DLID4"\n", newlen, new, oldlen, newlen,
1297                        OLID4(&data->id2), OLID4(&data->id1));
1298
1299                 mds = id_group(&data->id2);
1300
1301                 /* 
1302                  * target directory can be splitted, sowe should forward request
1303                  * to the right MDS.
1304                  */
1305                 obj = lmv_grab_obj(obd, &data->id2);
1306                 if (obj) {
1307                         mds = raw_name2idx(obj->hashtype, obj->objcount, 
1308                                            (char *)new, newlen);
1309                         data->id2 = obj->objs[mds].id;
1310                         CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1311                                OLID4(&data->id2));
1312                         lmv_put_obj(obj);
1313                 }
1314                 goto request;
1315         }
1316
1317         obj = lmv_grab_obj(obd, &data->id1);
1318         if (obj) {
1319                 /*
1320                  * directory is already splitted, so we have to forward request
1321                  * to the right MDS.
1322                  */
1323                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1324                                    (char *)old, oldlen);
1325                 data->id1 = obj->objs[mds].id;
1326                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1327                        OLID4(&data->id1));
1328                 lmv_put_obj(obj);
1329         }
1330
1331         obj = lmv_grab_obj(obd, &data->id2);
1332         if (obj) {
1333                 /*
1334                  * directory is already splitted, so we have to forward request
1335                  * to the right MDS.
1336                  */
1337                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1338                                    (char *)new, newlen);
1339                 
1340                 data->id2 = obj->objs[mds].id;
1341                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1342                        OLID4(&data->id2));
1343                 lmv_put_obj(obj);
1344         }
1345         
1346         mds = id_group(&data->id1);
1347
1348 request:
1349         if (id_group(&data->id1) != id_group(&data->id2)) {
1350                 CDEBUG(D_OTHER,"cross-node rename "DLID4"/%*s to "DLID4"/%*s\n",
1351                        OLID4(&data->id1), oldlen, old, OLID4(&data->id2),
1352                        newlen, new);
1353         }
1354
1355         rc = md_rename(lmv->tgts[mds].ltd_exp, data, old, oldlen,
1356                        new, newlen, request); 
1357         RETURN(rc);
1358 }
1359
1360 static int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
1361                        struct iattr *iattr, void *ea, int ealen, void *ea2,
1362                        int ea2len, void *ea3, int ea3len, 
1363                        struct ptlrpc_request **request)
1364 {
1365         struct obd_device *obd = exp->exp_obd;
1366         struct lmv_obd *lmv = &obd->u.lmv;
1367         struct ptlrpc_request *req;
1368         struct mds_body *body;
1369         struct lmv_obj *obj;
1370         int rc = 0, i;
1371         ENTRY;
1372
1373         rc = lmv_check_connect(obd);
1374         if (rc)
1375                 RETURN(rc);
1376
1377         obj = lmv_grab_obj(obd, &data->id1);
1378         
1379         CDEBUG(D_OTHER, "SETATTR for "DLID4", valid 0x%x%s\n",
1380                OLID4(&data->id1), iattr->ia_valid, obj ? ", splitted" : "");
1381         
1382         if (obj) {
1383                 for (i = 0; i < obj->objcount; i++) {
1384                         data->id1 = obj->objs[i].id;
1385                         
1386                         rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1387                                         data, iattr, ea, ealen, ea2, ea2len, 
1388                                         ea3, ea3len, &req);
1389
1390                         if (id_equal_fid(&obj->id, &obj->objs[i].id)) {
1391                                 /*
1392                                  * this is master object and this request should
1393                                  * be returned back to llite.
1394                                  */
1395                                 *request = req;
1396                         } else {
1397                                 ptlrpc_req_finished(req);
1398                         }
1399
1400                         if (rc)
1401                                 break;
1402                 }
1403                 lmv_put_obj(obj);
1404         } else {
1405                 LASSERT(id_group(&data->id1) < lmv->desc.ld_tgt_count);
1406                 rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp,
1407                                 data, iattr, ea, ealen, ea2, ea2len, ea3,
1408                                 ea3len, request); 
1409                 if (rc == 0) {
1410                         body = lustre_msg_buf((*request)->rq_repmsg, 0,
1411                                               sizeof(*body));
1412                         LASSERT(body != NULL);
1413                         LASSERT((body->valid & OBD_MD_FID) != 0);
1414                         LASSERT(id_group(&body->id1) == id_group(&data->id1));
1415                 }
1416         }
1417         RETURN(rc);
1418 }
1419
1420 static int lmv_sync(struct obd_export *exp, struct lustre_id *id,
1421                     struct ptlrpc_request **request)
1422 {
1423         struct obd_device *obd = exp->exp_obd;
1424         struct lmv_obd *lmv = &obd->u.lmv;
1425         int rc;
1426         ENTRY;
1427
1428         rc = lmv_check_connect(obd);
1429         if (rc)
1430                 RETURN(rc);
1431
1432         rc = md_sync(lmv->tgts[id_group(id)].ltd_exp, 
1433                      id, request);
1434         RETURN(rc);
1435 }
1436
1437 int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, 
1438                             struct ldlm_lock_desc *desc,
1439                             void *data, int flag)
1440 {
1441         struct lustre_handle lockh;
1442         struct lmv_obj *obj;
1443         int rc;
1444         ENTRY;
1445
1446         switch (flag) {
1447         case LDLM_CB_BLOCKING:
1448                 ldlm_lock2handle(lock, &lockh);
1449                 rc = ldlm_cli_cancel(&lockh);
1450                 if (rc < 0) {
1451                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1452                         RETURN(rc);
1453                 }
1454                 break;
1455         case LDLM_CB_CANCELING:
1456                 /* time to drop cached attrs for dirobj */
1457                 obj = lock->l_ast_data;
1458                 if (obj) {
1459                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1460                                ", master "DLID4"\n",
1461                                lock->l_resource->lr_name.name[3] == 1 ?
1462                                "LOOKUP" : "UPDATE",
1463                                lock->l_resource->lr_name.name[0],
1464                                lock->l_resource->lr_name.name[1], 
1465                                OLID4(&obj->id));
1466                         lmv_put_obj(obj);
1467                 }
1468                 break;
1469         default:
1470                 LBUG();
1471         }
1472         RETURN(0);
1473 }
1474
1475 static void lmv_remove_dots(struct page *page)
1476 {
1477         unsigned limit = PAGE_CACHE_SIZE;
1478         char *kaddr = page_address(page);
1479         struct ext2_dir_entry_2 *p;
1480         unsigned offs, rec_len;
1481
1482         for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
1483                 p = (struct ext2_dir_entry_2 *)(kaddr + offs);
1484                 rec_len = le16_to_cpu(p->rec_len);
1485
1486                 if ((p->name_len == 1 && p->name[0] == '.') ||
1487                     (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
1488                         p->inode = 0;
1489         }
1490 }
1491
1492 static int lmv_readpage(struct obd_export *exp, struct lustre_id *id,
1493                         __u64 offset, struct page *page,
1494                         struct ptlrpc_request **request)
1495 {
1496         struct obd_device *obd = exp->exp_obd;
1497         struct lmv_obd *lmv = &obd->u.lmv;
1498         struct lustre_id rid = *id;
1499         struct lmv_obj *obj;
1500         int rc, i;
1501         ENTRY;
1502
1503 #warning "we need well-desgined readdir() implementation"
1504         rc = lmv_check_connect(obd);
1505         if (rc)
1506                 RETURN(rc);
1507
1508         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
1509         CDEBUG(D_OTHER, "READPAGE at %llu from "DLID4"\n",
1510                offset, OLID4(&rid));
1511
1512         obj = lmv_grab_obj(obd, id);
1513         if (obj) {
1514                 lmv_lock_obj(obj);
1515
1516                 /* find dirobj containing page with requested offset. */
1517                 for (i = 0; i < obj->objcount; i++) {
1518                         if (offset < obj->objs[i].size)
1519                                 break;
1520                         offset -= obj->objs[i].size;
1521                 }
1522                 rid = obj->objs[i].id;
1523                 
1524                 lmv_unlock_obj(obj);
1525                 lmv_put_obj(obj);
1526                 
1527                 CDEBUG(D_OTHER, "forward to "DLID4" with offset %lu\n",
1528                        OLID4(&rid), (unsigned long)offset);
1529         }
1530         rc = md_readpage(lmv->tgts[id_group(&rid)].ltd_exp, &rid, 
1531                          offset, page, request);
1532         
1533         if (rc == 0 && !id_equal_fid(&rid, id))
1534                 /* this page isn't from master object. To avoid "." and ".." 
1535                  * duplication in directory, we have to remove them from all
1536                  * slave objects */
1537                 lmv_remove_dots(page);
1538         
1539         RETURN(rc);
1540 }
1541
1542 static int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data,
1543                              struct ptlrpc_request **req)
1544 {
1545         struct obd_device *obd = exp->exp_obd;
1546         struct lmv_obd *lmv = &obd->u.lmv;
1547         struct mea *mea = data->mea1;
1548         struct mdc_op_data *data2;
1549         int i, rc = 0;
1550         ENTRY;
1551
1552         OBD_ALLOC(data2, sizeof(*data2));
1553         if (data2 == NULL)
1554                 RETURN(-ENOMEM);
1555         
1556         LASSERT(mea != NULL);
1557         for (i = 0; i < mea->mea_count; i++) {
1558                 memset(data2, 0, sizeof(*data2));
1559                 data2->id1 = mea->mea_ids[i];
1560                 data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1561                 
1562                 if (lmv->tgts[id_group(&data2->id1)].ltd_exp == NULL)
1563                         continue;
1564
1565                 rc = md_unlink(lmv->tgts[id_group(&data2->id1)].ltd_exp,
1566                                data2, req);
1567                 
1568                 CDEBUG(D_OTHER, "unlink slave "DLID4" -> %d\n",
1569                        OLID4(&mea->mea_ids[i]), rc);
1570                 
1571                 if (*req) {
1572                         ptlrpc_req_finished(*req);
1573                         *req = NULL;
1574                 }
1575                 if (rc)
1576                         RETURN(rc);
1577         }
1578         OBD_FREE(data2, sizeof(*data2));
1579         RETURN(rc);
1580 }
1581
1582 static int lmv_delete_inode(struct obd_export *exp, struct lustre_id *id)
1583 {
1584         ENTRY;
1585
1586         LASSERT(exp && id);
1587         if (lmv_delete_obj(exp, id)) {
1588                 CDEBUG(D_OTHER, "lmv object "DLID4" is destroyed.\n",
1589                        OLID4(id));
1590         }
1591         RETURN(0);
1592 }
1593
1594 static int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
1595                       struct ptlrpc_request **request)
1596 {
1597         struct obd_device *obd = exp->exp_obd;
1598         struct lmv_obd *lmv = &obd->u.lmv;
1599         int rc, i = 0;
1600         ENTRY;
1601         
1602         rc = lmv_check_connect(obd);
1603         if (rc)
1604                 RETURN(rc);
1605
1606         if (data->namelen == 0 && data->mea1 != NULL) {
1607                 /* mds asks to remove slave objects */
1608                 rc = lmv_unlink_slaves(exp, data, request);
1609                 RETURN(rc);
1610         }
1611
1612         if (data->namelen != 0) {
1613                 struct lmv_obj *obj;
1614                 
1615                 obj = lmv_grab_obj(obd, &data->id1);
1616                 if (obj) {
1617                         i = raw_name2idx(obj->hashtype, obj->objcount,
1618                                          data->name, data->namelen);
1619                         data->id1 = obj->objs[i].id;
1620                         lmv_put_obj(obj);
1621                 }
1622                 CDEBUG(D_OTHER, "unlink '%*s' in "DLID4" -> %u\n",
1623                        data->namelen, data->name, OLID4(&data->id1),
1624                        i);
1625         } else {
1626                 CDEBUG(D_OTHER, "drop i_nlink on "DLID4"\n",
1627                        OLID4(&data->id1));
1628         }
1629         rc = md_unlink(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1630                        data, request);
1631         RETURN(rc);
1632 }
1633
1634 static struct obd_device *lmv_get_real_obd(struct obd_export *exp,
1635                                            struct lustre_id *id)
1636 {
1637         struct obd_device *obd = exp->exp_obd;
1638         struct lmv_obd *lmv = &obd->u.lmv;
1639         int rc;
1640         ENTRY;
1641
1642         rc = lmv_check_connect(obd);
1643         if (rc)
1644                 RETURN(ERR_PTR(rc));
1645         obd = lmv->tgts[id_group(id)].ltd_exp->exp_obd;
1646         EXIT;
1647         
1648         return obd;
1649 }
1650
1651 static int lmv_init_ea_size(struct obd_export *exp, int easize,
1652                             int cookiesize)
1653 {
1654         struct obd_device *obd = exp->exp_obd;
1655         struct lmv_obd *lmv = &obd->u.lmv;
1656         int i, rc = 0, change = 0;
1657         ENTRY;
1658
1659         if (lmv->max_easize < easize) {
1660                 lmv->max_easize = easize;
1661                 change = 1;
1662         }
1663         if (lmv->max_cookiesize < cookiesize) {
1664                 lmv->max_cookiesize = cookiesize;
1665                 change = 1;
1666         }
1667         if (change == 0)
1668                 RETURN(0);
1669         
1670         if (lmv->connected == 0)
1671                 RETURN(0);
1672
1673         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1674                 if (lmv->tgts[i].ltd_exp == NULL) {
1675                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
1676                         continue;
1677                 }
1678
1679                 rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
1680                 if (rc) {
1681                         CERROR("obd_init_ea_size() failed on MDT target %d, "
1682                                "error %d.\n", i, rc);
1683                         break;
1684                 }
1685         }
1686         RETURN(rc);
1687 }
1688
1689 static int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
1690                                  void *acl, int acl_size,
1691                                  struct lov_stripe_md **ea,
1692                                  struct obd_trans_info *oti)
1693 {
1694         struct obd_device *obd = exp->exp_obd;
1695         struct lmv_obd *lmv = &obd->u.lmv;
1696         struct lov_stripe_md obj_md;
1697         struct lov_stripe_md *obj_mdp = &obj_md;
1698         int rc = 0;
1699         ENTRY;
1700
1701         LASSERT(ea == NULL);
1702         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1703
1704         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa,
1705                         acl, acl_size, &obj_mdp, oti);
1706
1707         RETURN(rc);
1708 }
1709
1710 /*
1711  * to be called from MDS only. @oa should have correct store cookie and o_fid
1712  * values for "master" object, as it will be used.
1713  */
1714 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
1715                    void *acl, int acl_size,
1716                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1717 {
1718         struct obd_device *obd = exp->exp_obd;
1719         struct lmv_obd *lmv = &obd->u.lmv;
1720         struct lustre_id mid;
1721         int i, c, rc = 0;
1722         struct mea *mea;
1723         ENTRY;
1724
1725         rc = lmv_check_connect(obd);
1726         if (rc)
1727                 RETURN(rc);
1728
1729         LASSERT(oa != NULL);
1730         
1731         if (ea == NULL) {
1732                 rc = lmv_obd_create_single(exp, oa, acl, acl_size, NULL, oti);
1733                 if (rc)
1734                         CERROR("Can't create object, rc = %d\n", rc);
1735                 RETURN(rc);
1736         }
1737
1738         /* acl is only suppied when mds create single remote obj */
1739         LASSERT(acl == NULL && acl_size == 0);
1740
1741         if (*ea == NULL) {
1742                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
1743                 if (rc < 0) {
1744                         CERROR("obd_alloc_diskmd() failed, error %d\n",
1745                                rc);
1746                         RETURN(rc);
1747                 } else
1748                         rc = 0;
1749                 
1750                 if (*ea == NULL)
1751                         RETURN(-ENOMEM);
1752         }
1753
1754         /* 
1755          * here we should take care about splitted dir, so store cookie and fid
1756          * for "master" object should already be allocated and passed in @oa.
1757          */
1758         LASSERT(oa->o_id != 0);
1759         LASSERT(oa->o_fid != 0);
1760
1761         /* save "master" object id */
1762         obdo2id(&mid, oa);
1763
1764         mea = (struct mea *)*ea;
1765         mea->mea_master = -1;
1766         mea->mea_magic = MEA_MAGIC_ALL_CHARS;
1767
1768         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
1769                 mea->mea_count = lmv->desc.ld_tgt_count;
1770
1771         for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
1772                 struct lov_stripe_md obj_md;
1773                 struct lov_stripe_md *obj_mdp = &obj_md;
1774                
1775                 if (lmv->tgts[i].ltd_exp == NULL) {
1776                         /* this is "master" MDS */
1777                         mea->mea_master = i;
1778                         mea->mea_ids[c] = mid;
1779                         c++;
1780                         continue;
1781                 }
1782
1783                 /*
1784                  * "master" MDS should always be part of stripped dir,
1785                  * so scan for it.
1786                  */
1787                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
1788                         continue;
1789
1790                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
1791                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
1792
1793                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, NULL, 0,
1794                                 &obj_mdp, oti);
1795                 if (rc) {
1796                         CERROR("obd_create() failed on MDT target %d, "
1797                                "error %d\n", c, rc);
1798                         RETURN(rc);
1799                 }
1800
1801                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
1802                        i, oa->o_id, oa->o_generation);
1803
1804
1805                 /*
1806                  * here, when object is created (or it is master and was passed
1807                  * from caller) on desired MDS we save its fid to local mea_ids.
1808                  */
1809                 LASSERT(oa->o_fid);
1810
1811                 /* 
1812                  * store cookie should be defined here for both cases (master
1813                  * object and not master), because master is already created.
1814                  */
1815                 LASSERT(oa->o_id);
1816
1817                 /* fill mea by store cookie and fid */
1818                 obdo2id(&mea->mea_ids[c], oa);
1819                 c++;
1820         }
1821         LASSERT(c == mea->mea_count);
1822
1823         CDEBUG(D_OTHER, "%d dirobjects created\n",
1824                (int)mea->mea_count);
1825         
1826         RETURN(rc);
1827 }
1828
1829 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
1830                          struct obd_device *tgt, int count,
1831                          struct llog_catid *logid)
1832 {
1833         struct llog_ctxt *ctxt;
1834         int rc;
1835         ENTRY;
1836
1837         rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1838                             &llog_client_ops);
1839         if (rc == 0) {
1840                 ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
1841                 ctxt->loc_imp = tgt->u.cli.cl_import;
1842         }
1843
1844         RETURN(rc);
1845 }
1846
1847 static int lmv_llog_finish(struct obd_device *obd,
1848                            struct obd_llogs *llogs, int count)
1849 {
1850         int rc;
1851         ENTRY;
1852
1853         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
1854         RETURN(rc);
1855 }
1856
1857 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
1858                         void *key, __u32 *vallen, void *val)
1859 {
1860         struct obd_device *obd;
1861         struct lmv_obd *lmv;
1862         int rc = 0;
1863         ENTRY;
1864
1865         obd = class_exp2obd(exp);
1866         if (obd == NULL) {
1867                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1868                        exp->exp_handle.h_cookie);
1869                 RETURN(-EINVAL);
1870         }
1871
1872         lmv = &obd->u.lmv;
1873         if (keylen == strlen("mdsize") && !strcmp(key, "mdsize")) {
1874                 __u32 *mdsize = val;
1875                 *vallen = sizeof(__u32);
1876                 *mdsize = sizeof(struct lustre_id) * lmv->desc.ld_tgt_count
1877                         + sizeof(struct mea);
1878                 RETURN(0);
1879         } else if (keylen == strlen("mdsnum") && !strcmp(key, "mdsnum")) {
1880                 struct obd_uuid *cluuid = &lmv->cluuid;
1881                 struct lmv_tgt_desc *tgts;
1882                 __u32 *mdsnum = val;
1883                 int i;
1884
1885                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
1886                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
1887                                 *vallen = sizeof(__u32);
1888                                 *mdsnum = i;
1889                                 RETURN(0);
1890                         }
1891                 }
1892                 LASSERT(0);
1893         } else if (keylen == strlen("rootid") && !strcmp(key, "rootid")) {
1894                 rc = lmv_check_connect(obd);
1895                 if (rc)
1896                         RETURN(rc);
1897                 
1898                 /* getting rootid from first MDS. */
1899                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
1900                                   vallen, val);
1901                 RETURN(rc);
1902         } else if (keylen >= strlen("lmvdesc") && !strcmp(key, "lmvdesc")) {
1903                 struct lmv_desc *desc_ret = val;
1904                 *desc_ret = lmv->desc;
1905                 RETURN(0);
1906         } else if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
1907                 struct lmv_tgt_desc *tgts;
1908                 int i;
1909
1910                 rc = lmv_check_connect(obd);
1911                 if (rc)
1912                         RETURN(rc);
1913                 
1914                 LASSERT(*vallen == sizeof(__u32));
1915                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
1916                      i++, tgts++) {
1917
1918                         /* all tgts should be connected when this get called. */
1919                         if (!tgts || !tgts->ltd_exp) {
1920                                 CERROR("target not setup?\n");
1921                                 continue;
1922                         }
1923
1924                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
1925                                           vallen, val))
1926                                 RETURN(0);
1927                 }
1928                 RETURN(-EINVAL);
1929         } else if (keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) {
1930                 rc = lmv_check_connect(obd);
1931                 if (rc)
1932                         RETURN(rc);
1933
1934                 /* forwarding this request to first MDS, it should know LOV
1935                  * desc. */
1936                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
1937                                   vallen, val);
1938                 RETURN(rc);
1939         } else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
1940                 struct lmv_tgt_desc *tgts;
1941                 int i;
1942
1943                 rc = lmv_check_connect(obd);
1944                 if (rc)
1945                         RETURN(rc);
1946
1947                 LASSERT(*vallen == sizeof(struct fid_extent));
1948                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
1949                      i++, tgts++) {
1950
1951                         /* all tgts should be connected when this get called. */
1952                         if (!tgts || !tgts->ltd_exp) {
1953                                 CERROR("target not setup?\n");
1954                                 continue;
1955                         }
1956
1957                         rc = obd_get_info(tgts->ltd_exp, keylen, key,
1958                                           vallen, val);
1959                         if (rc)
1960                                 RETURN(rc);
1961                 }
1962                 RETURN(0);
1963         }
1964
1965         CDEBUG(D_IOCTL, "invalid key\n");
1966         RETURN(-EINVAL);
1967 }
1968
1969 int lmv_set_info(struct obd_export *exp, obd_count keylen,
1970                  void *key, obd_count vallen, void *val)
1971 {
1972         struct lmv_tgt_desc    *tgt;
1973         struct obd_device      *obd;
1974         struct lmv_obd         *lmv;
1975         ENTRY;
1976
1977         obd = class_exp2obd(exp);
1978         if (obd == NULL) {
1979                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1980                        exp->exp_handle.h_cookie);
1981                 RETURN(-EINVAL);
1982         }
1983         lmv = &obd->u.lmv;
1984
1985         if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
1986                 lmv->server_timeout = 1;
1987                 lmv_set_timeouts(obd);
1988                 RETURN(0);
1989         }
1990
1991         /* maybe this could be default */
1992         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
1993             (keylen == strlen("sec_flags") && strcmp(key, "sec_flags") == 0) ||
1994             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
1995                 struct obd_export *exp;
1996                 int rc = 0, err, i;
1997
1998                 spin_lock(&lmv->lmv_lock);
1999                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2000                      i++, tgt++) {
2001                         exp = tgt->ltd_exp;
2002                         /* during setup time the connections to mdc might
2003                          * haven't been established.
2004                          */
2005                         if (exp == NULL) {
2006                                 struct obd_device *tgt_obd;
2007
2008                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2009                                                                 OBD_MDC_DEVICENAME,
2010                                                                 &obd->obd_uuid);
2011                                 if (!tgt_obd) {
2012                                         CERROR("can't set info %s, "
2013                                                "device %s not attached?\n",
2014                                                 (char *) key, tgt->uuid.uuid);
2015                                         rc = -EINVAL;
2016                                         continue;
2017                                 }
2018                                 exp = tgt_obd->obd_self_export;
2019                         }
2020
2021                         err = obd_set_info(exp, keylen, key, vallen, val);
2022                         if (!rc)
2023                                 rc = err;
2024                 }
2025                 spin_unlock(&lmv->lmv_lock);
2026
2027                 RETURN(rc);
2028         }
2029         if (keylen == 5 && strcmp(key, "audit") == 0) {
2030                 struct audit_attr_msg * msg = val;
2031                 int mds = id_group(&msg->id);
2032                 int i, rc = 0;
2033                 LASSERT(mds < lmv->desc.ld_tgt_count);
2034                 
2035                 if (IS_AUDIT_OP(msg->attr, AUDIT_FS)) {
2036                         //FS audit, send message to all mds
2037                         for (i = 0; i < lmv->desc.ld_tgt_count;i++) {
2038                                 obd_set_info(lmv->tgts[i].ltd_exp, 
2039                                                   keylen, key, vallen, val);
2040                         }
2041                 }
2042                 else if (IS_AUDIT_OP(msg->attr, AUDIT_DIR)) {
2043                         //audit for dir.
2044                         //if dir is splitted, send RPC to all mds involved
2045                         struct lmv_obj *obj;
2046                         struct lustre_id rid;
2047                         int i;
2048                         
2049                         obj = lmv_grab_obj(obd, &msg->id);
2050                         if (obj) {
2051                                 lmv_lock_obj(obj);
2052                                 for (i = 0; i < obj->objcount; i++) {
2053                                         rid = obj->objs[i].id;
2054                                         mds = id_group(&rid);
2055                                         obd_set_info(lmv->tgts[mds].ltd_exp,
2056                                                           keylen, key,
2057                                                           vallen, val);
2058                                 }
2059                                 lmv_unlock_obj(obj);
2060                                 lmv_put_obj(obj);
2061                         }
2062                         else {
2063                                 rc = obd_set_info(lmv->tgts[mds].ltd_exp,
2064                                                  keylen, key, vallen, val);
2065                         }
2066                 }
2067                 else {
2068                         //set audit for file
2069                         rc = obd_set_info(lmv->tgts[mds].ltd_exp,
2070                                           keylen, key, vallen, val);                        
2071                 }
2072                 RETURN(rc);
2073         }
2074         if (((keylen == strlen("flush_cred") &&
2075              strcmp(key, "flush_cred") == 0)) || 
2076              ((keylen == strlen("crypto_type") &&
2077              strcmp(key, "crypto_type") == 0))) {
2078                 int rc = 0, i;
2079
2080                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2081                      i++, tgt++) {
2082                         if (!tgt->ltd_exp)
2083                                 continue;
2084                         rc = obd_set_info(tgt->ltd_exp,
2085                                           keylen, key, vallen, val);
2086                         if (rc)
2087                                 RETURN(rc);
2088                 }
2089
2090                 RETURN(0);
2091         }
2092
2093         RETURN(-EINVAL);
2094 }
2095
2096 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2097                struct lov_stripe_md *lsm)
2098 {
2099         struct obd_device *obd = class_exp2obd(exp);
2100         struct lmv_obd *lmv = &obd->u.lmv;
2101         struct mea *meap, *lsmp;
2102         int mea_size, i;
2103         ENTRY;
2104
2105         mea_size = (sizeof(struct lustre_id) * 
2106                     lmv->desc.ld_tgt_count) + sizeof(struct mea);
2107         if (!lmmp)
2108                 RETURN(mea_size);
2109
2110         if (*lmmp && !lsm) {
2111                 OBD_FREE(*lmmp, mea_size);
2112                 *lmmp = NULL;
2113                 RETURN(0);
2114         }
2115
2116         if (*lmmp == NULL) {
2117                 OBD_ALLOC(*lmmp, mea_size);
2118                 if (*lmmp == NULL)
2119                         RETURN(-ENOMEM);
2120         }
2121
2122         if (!lsm)
2123                 RETURN(mea_size);
2124
2125         lsmp = (struct mea *)lsm;
2126         meap = (struct mea *)*lmmp;
2127
2128         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2129             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2130                 RETURN(-EINVAL);
2131
2132         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2133         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2134         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2135
2136         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2137                 meap->mea_ids[i] = meap->mea_ids[i];
2138                 id_cpu_to_le(&meap->mea_ids[i]);
2139         }
2140
2141         RETURN(mea_size);
2142 }
2143
2144 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2145                  struct lov_mds_md *lmm, int lmm_size)
2146 {
2147         struct obd_device *obd = class_exp2obd(exp);
2148         struct mea **tmea = (struct mea **)lsmp;
2149         struct mea *mea = (struct mea *)lmm;
2150         struct lmv_obd *lmv = &obd->u.lmv;
2151         int mea_size, i, rc = 0;
2152         __u32 magic;
2153         ENTRY;
2154
2155         mea_size = sizeof(struct lustre_id) * 
2156                 lmv->desc.ld_tgt_count + sizeof(struct mea);
2157
2158         if (lsmp == NULL)
2159                 return mea_size;
2160
2161         if (*lsmp != NULL && lmm == NULL) {
2162                 OBD_FREE(*tmea, mea_size);
2163                 RETURN(0);
2164         }
2165
2166         LASSERT(mea_size == lmm_size);
2167
2168         OBD_ALLOC(*tmea, mea_size);
2169         if (*tmea == NULL)
2170                 RETURN(-ENOMEM);
2171
2172         if (!lmm)
2173                 RETURN(mea_size);
2174
2175         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2176             mea->mea_magic == MEA_MAGIC_ALL_CHARS)
2177         {
2178                 magic = le32_to_cpu(mea->mea_magic);
2179         } else {
2180                 struct mea_old *old = (struct mea_old *)lmm;
2181         
2182                 mea_size = sizeof(struct lustre_id) * old->mea_count + 
2183                         sizeof(struct mea_old);
2184         
2185                 if (old->mea_count > 256 || old->mea_master > 256 ||
2186                     lmm_size < mea_size || old->mea_master > old->mea_count) {
2187                         CWARN("bad MEA: count %u, master %u, size %u\n",
2188                               old->mea_count, old->mea_master, mea_size);
2189                         GOTO(out_free_mea, rc = -EINVAL);
2190                 }
2191                 magic = MEA_MAGIC_LAST_CHAR;
2192         }
2193
2194         (*tmea)->mea_magic = magic;
2195         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2196         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2197
2198         for (i = 0; i < (*tmea)->mea_count; i++) {
2199                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2200                 id_le_to_cpu(&(*tmea)->mea_ids[i]);
2201         }
2202         RETURN(mea_size);
2203
2204 out_free_mea:
2205         OBD_FREE(*tmea, mea_size);
2206         return rc;
2207 }
2208
2209 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
2210             struct lov_stripe_md *ea, obd_count oa_bufs,
2211             struct brw_page *pgarr, struct obd_trans_info *oti)
2212 {
2213         struct obd_device *obd = exp->exp_obd;
2214         struct lmv_obd *lmv = &obd->u.lmv;
2215         struct mea *mea = (struct mea *) ea;
2216         int err;
2217       
2218         LASSERT(oa != NULL);
2219         LASSERT(ea != NULL);
2220         LASSERT(pgarr != NULL);
2221         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
2222
2223         oa->o_gr = id_gen(&mea->mea_ids[oa->o_mds]);
2224         oa->o_id = id_ino(&mea->mea_ids[oa->o_mds]);
2225         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2226         
2227         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
2228                       oa, NULL, oa_bufs, pgarr, oti);
2229         RETURN(err);
2230 }
2231
2232 static int lmv_cancel_unused(struct obd_export *exp,
2233                              struct lov_stripe_md *lsm, 
2234                              int flags, void *opaque)
2235 {
2236         struct obd_device *obd = exp->exp_obd;
2237         struct lmv_obd *lmv = &obd->u.lmv;
2238         int rc = 0, err, i;
2239         ENTRY;
2240
2241         LASSERT(lsm == NULL);
2242         
2243         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2244                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active)
2245                         continue;
2246                 
2247                 err = obd_cancel_unused(lmv->tgts[i].ltd_exp,
2248                                         NULL, flags, opaque);
2249                 if (!rc)
2250                         rc = err;
2251         }
2252         RETURN(rc);
2253 }
2254
2255 struct obd_ops lmv_obd_ops = {
2256         .o_owner                = THIS_MODULE,
2257         .o_attach               = lmv_attach,
2258         .o_detach               = lmv_detach,
2259         .o_setup                = lmv_setup,
2260         .o_cleanup              = lmv_cleanup,
2261         .o_connect              = lmv_connect,
2262         .o_disconnect           = lmv_disconnect,
2263         .o_statfs               = lmv_statfs,
2264         .o_llog_init            = lmv_llog_init,
2265         .o_llog_finish          = lmv_llog_finish,
2266         .o_get_info             = lmv_get_info,
2267         .o_set_info             = lmv_set_info,
2268         .o_create               = lmv_obd_create,
2269         .o_packmd               = lmv_packmd,
2270         .o_unpackmd             = lmv_unpackmd,
2271         .o_brw                  = lmv_brw,
2272         .o_init_ea_size         = lmv_init_ea_size,
2273         .o_notify               = lmv_notify,
2274         .o_iocontrol            = lmv_iocontrol,
2275         .o_cancel_unused        = lmv_cancel_unused,
2276 };
2277
2278 struct md_ops lmv_md_ops = {
2279         .m_getstatus           = lmv_getstatus,
2280         .m_getattr             = lmv_getattr,
2281         .m_change_cbdata       = lmv_change_cbdata,
2282         .m_change_cbdata_name  = lmv_change_cbdata_name,
2283         .m_close               = lmv_close,
2284         .m_create              = lmv_create,
2285         .m_done_writing        = lmv_done_writing,
2286         .m_enqueue             = lmv_enqueue,
2287         .m_getattr_lock        = lmv_getattr_lock,
2288         .m_intent_lock         = lmv_intent_lock,
2289         .m_link                = lmv_link,
2290         .m_rename              = lmv_rename,
2291         .m_setattr             = lmv_setattr,
2292         .m_sync                = lmv_sync,
2293         .m_readpage            = lmv_readpage,
2294         .m_unlink              = lmv_unlink,
2295         .m_get_real_obd        = lmv_get_real_obd,
2296         .m_valid_attrs         = lmv_valid_attrs,
2297         .m_delete_inode        = lmv_delete_inode,
2298         .m_access_check        = lmv_access_check,
2299 };
2300
2301 int __init lmv_init(void)
2302 {
2303         struct lprocfs_static_vars lvars;
2304         int rc;
2305
2306         obj_cache = kmem_cache_create("lmv_objects",
2307                                       sizeof(struct lmv_obj),
2308                                       0, 0, NULL, NULL);
2309         if (!obj_cache) {
2310                 CERROR("error allocating lmv objects cache\n");
2311                 return -ENOMEM;
2312         }
2313
2314         lprocfs_init_vars(lmv, &lvars);
2315         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2316                                  lvars.module_vars,
2317                                  OBD_LMV_DEVICENAME);
2318         if (rc)
2319                 kmem_cache_destroy(obj_cache);
2320         
2321         return rc;
2322 }
2323
#ifdef __KERNEL__
/*
 * Module exit: unregister the LMV device type and destroy the lmv object
 * cache.  Destroying the cache is required to succeed.
 */
static void lmv_exit(void)
{
        int rc;

        class_unregister_type(OBD_LMV_DEVICENAME);

        /*
         * Destroy the cache outside of the assertion so that the call does
         * not depend on the assertion expression being evaluated.
         */
        rc = kmem_cache_destroy(obj_cache);
        LASSERTF(rc == 0,
                 "can't free lmv objects cache, %d object(s) "
                 "still in use\n", atomic_read(&obj_cache_count));
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lmv_init);
module_exit(lmv_exit);
#endif