Whamcloud - gitweb
rename confusing mdc_uuid to tgt_uuid.
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38 #include <linux/ext2_fs.h>
39
40 #include <linux/obd_support.h>
41 #include <linux/lustre_lib.h>
42 #include <linux/lustre_net.h>
43 #include <linux/lustre_idl.h>
44 #include <linux/lustre_dlm.h>
45 #include <linux/lustre_mds.h>
46 #include <linux/obd_class.h>
47 #include <linux/obd_ost.h>
48 #include <linux/lprocfs_status.h>
49 #include <linux/lustre_fsfilt.h>
50 #include <linux/obd_lmv.h>
51 #include <linux/lustre_lite.h>
52 #include <linux/lustre_audit.h>
53 #include "lmv_internal.h"
54
55 /* not defined for liblustre building */
56 #if !defined(ATOMIC_INIT)
57 #define ATOMIC_INIT(val) { (val) }
58 #endif
59
60 /* object cache. */
61 kmem_cache_t *obj_cache;
62 atomic_t obj_cache_count = ATOMIC_INIT(0);
63
64 static void lmv_activate_target(struct lmv_obd *lmv,
65                                 struct lmv_tgt_desc *tgt,
66                                 int activate)
67 {
68         if (tgt->active == activate)
69                 return;
70         
71         tgt->active = activate;
72         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
73 }
74
75 /* Error codes:
76  *
77  *  -EINVAL  : UUID can't be found in the LMV's target list
78  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
79  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
80  */
81 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
82                               int activate)
83 {
84         struct lmv_tgt_desc *tgt;
85         struct obd_device *obd;
86         int i, rc = 0;
87         ENTRY;
88
89         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
90                lmv, uuid->uuid, activate);
91
92         spin_lock(&lmv->lmv_lock);
93         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
94                 if (tgt->ltd_exp == NULL)
95                         continue;
96
97                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
98                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
99
100                 if (obd_uuid_equals(uuid, &tgt->uuid))
101                         break;
102         }
103
104         if (i == lmv->desc.ld_tgt_count)
105                 GOTO(out_lmv_lock, rc = -EINVAL);
106
107         obd = class_exp2obd(tgt->ltd_exp);
108         if (obd == NULL)
109                 GOTO(out_lmv_lock, rc = -ENOTCONN);
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, i);
114         LASSERT(strcmp(obd->obd_type->typ_name, OBD_MDC_DEVICENAME) == 0);
115
116         if (tgt->active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 GOTO(out_lmv_lock, rc);
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
123                obd, activate ? "" : "in");
124
125         lmv_activate_target(lmv, tgt, activate);
126
127         EXIT;
128         
129  out_lmv_lock:
130         spin_unlock(&lmv->lmv_lock);
131         return rc;
132 }
133
134 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
135                       int active, void *data)
136 {
137         struct obd_uuid *uuid;
138         int rc;
139         ENTRY;
140
141         if (strcmp(watched->obd_type->typ_name, OBD_MDC_DEVICENAME)) {
142                 CERROR("unexpected notification of %s %s!\n",
143                        watched->obd_type->typ_name,
144                        watched->obd_name);
145                 RETURN(-EINVAL);
146         }
147         uuid = &watched->u.cli.cl_import->imp_target_uuid;
148
149         /* Set MDC as active before notifying the observer, so the observer can
150          * use the MDC normally.
151          */
152         rc = lmv_set_mdc_active(&obd->u.lmv, uuid, active);
153         if (rc) {
154                 CERROR("%sactivation of %s failed: %d\n",
155                        active ? "" : "de", uuid->uuid, rc);
156                 RETURN(rc);
157         }
158
159         if (obd->obd_observer)
160                 /* Pass the notification up the chain. */
161                 rc = obd_notify(obd->obd_observer, watched, active, data);
162
163         RETURN(rc);
164 }
165
166 static int lmv_attach(struct obd_device *dev, obd_count len, void *data)
167 {
168         struct lprocfs_static_vars lvars;
169         int rc;
170         ENTRY;
171
172         lprocfs_init_vars(lmv, &lvars);
173         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
174 #ifdef __KERNEL__
175         if (rc == 0) {
176                 struct proc_dir_entry *entry;
177                 
178                 entry = create_proc_entry("target_obd_status", 0444, 
179                                            dev->obd_proc_entry);
180                 if (entry == NULL)
181                         RETURN(-ENOMEM);
182                 entry->proc_fops = &lmv_proc_target_fops; 
183                 entry->data = dev;
184        }
185 #endif
186         RETURN (rc);
187 }
188
/*
 * Detach-time teardown: drop the lprocfs state of @dev and return the
 * result of lprocfs_obd_detach().
 */
static int lmv_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
193
194 /* this is fake connect function. Its purpose is to initialize lmv and say
195  * caller that everything is okay. Real connection will be performed later. */
196 static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
197                        struct obd_uuid *cluuid, struct obd_connect_data *data,
198                        unsigned long flags)
199 {
200 #ifdef __KERNEL__
201         struct proc_dir_entry *lmv_proc_dir;
202 #endif
203         struct lmv_obd *lmv = &obd->u.lmv;
204         struct obd_export *exp;
205         int rc = 0;
206         ENTRY;
207
208         rc = class_connect(conn, obd, cluuid);
209         if (rc) {
210                 CERROR("class_connection() returned %d\n", rc);
211                 RETURN(rc);
212         }
213
214         exp = class_conn2export(conn);
215         
216         /* we don't want to actually do the underlying connections more than
217          * once, so keep track. */
218         lmv->refcount++;
219         if (lmv->refcount > 1) {
220                 class_export_put(exp);
221                 RETURN(0);
222         }
223
224         lmv->exp = exp;
225         lmv->connected = 0;
226         lmv->cluuid = *cluuid;
227         lmv->connect_flags = flags;
228         if (data)
229                 memcpy(&lmv->conn_data, data, sizeof(*data));
230
231 #ifdef __KERNEL__
232         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
233                                         NULL, NULL);
234         if (IS_ERR(lmv_proc_dir)) {
235                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
236                        obd->obd_type->typ_name, obd->obd_name);
237                 lmv_proc_dir = NULL;
238         }
239 #endif
240
241         /* 
242          * all real clients should perform actual connection rightaway, because
243          * it is possible, that LMV will not have opportunity to connect
244          * targets, as MDC stuff will bit called directly, for instance while
245          * reading ../mdc/../kbytesfree procfs file, etc.
246          */
247         if (flags & OBD_OPT_REAL_CLIENT)
248                 rc = lmv_check_connect(obd);
249
250 #ifdef __KERNEL__
251         if (rc) {
252                 if (lmv_proc_dir)
253                         lprocfs_remove(lmv_proc_dir);
254         }
255 #endif
256
257         RETURN(rc);
258 }
259
260 static void lmv_set_timeouts(struct obd_device *obd)
261 {
262         struct lmv_tgt_desc *tgts;
263         struct lmv_obd *lmv;
264         int i;
265
266         lmv = &obd->u.lmv;
267         if (lmv->server_timeout == 0)
268                 return;
269
270         if (lmv->connected == 0)
271                 return;
272
273         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
274                 if (tgts->ltd_exp == NULL)
275                         continue;
276                 
277                 obd_set_info(tgts->ltd_exp, strlen("inter_mds"),
278                              "inter_mds", 0, NULL);
279         }
280 }
281
282 static int lmv_init_ea_size(struct obd_export *exp, int easize,
283                             int cookiesize)
284 {
285         struct obd_device *obd = exp->exp_obd;
286         struct lmv_obd *lmv = &obd->u.lmv;
287         int i, rc = 0, change = 0;
288         ENTRY;
289
290         if (lmv->max_easize < easize) {
291                 lmv->max_easize = easize;
292                 change = 1;
293         }
294         if (lmv->max_cookiesize < cookiesize) {
295                 lmv->max_cookiesize = cookiesize;
296                 change = 1;
297         }
298         if (change == 0)
299                 RETURN(0);
300         
301         if (lmv->connected == 0)
302                 RETURN(0);
303
304         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
305                 if (lmv->tgts[i].ltd_exp == NULL) {
306                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
307                         continue;
308                 }
309
310                 rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
311                 if (rc) {
312                         CERROR("obd_init_ea_size() failed on MDT target %d, "
313                                "error %d.\n", i, rc);
314                         break;
315                 }
316         }
317         RETURN(rc);
318 }
319
320 #define MAX_STRING_SIZE 128
321
322 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
323 {
324         struct lmv_obd *lmv = &obd->u.lmv;
325         struct obd_uuid *cluuid = &lmv->cluuid;
326         struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
327         struct lustre_handle conn = {0, };
328         struct obd_device *mdc_obd;
329         struct obd_export *mdc_exp;
330         int rc;
331 #ifdef __KERNEL__
332         struct proc_dir_entry *lmv_proc_dir;
333 #endif
334         ENTRY;
335
336         /* for MDS: don't connect to yourself */
337         if (obd_uuid_equals(&tgt->uuid, cluuid)) {
338                 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
339                 /* XXX - the old code didn't increment active tgt count.
340                  *       should we ? */
341                 RETURN(0);
342         }
343
344         mdc_obd = class_find_client_obd(&tgt->uuid, OBD_MDC_DEVICENAME,
345                                         &obd->obd_uuid);
346         if (!mdc_obd) {
347                 CERROR("target %s not attached\n", tgt->uuid.uuid);
348                 RETURN(-EINVAL);
349         }
350
351         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
352                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
353                 tgt->uuid.uuid, obd->obd_uuid.uuid,
354                 cluuid->uuid);
355
356         if (!mdc_obd->obd_set_up) {
357                 CERROR("target %s not set up\n", tgt->uuid.uuid);
358                 RETURN(-EINVAL);
359         }
360         
361         rc = obd_connect(&conn, mdc_obd, &lmv_mdc_uuid, &lmv->conn_data,
362                          lmv->connect_flags);
363         if (rc) {
364                 CERROR("target %s connect error %d\n", tgt->uuid.uuid, rc);
365                 RETURN(rc);
366         }
367
368         mdc_exp = class_conn2export(&conn);
369
370         rc = obd_register_observer(mdc_obd, obd);
371         if (rc) {
372                 obd_disconnect(mdc_exp, 0);
373                 CERROR("target %s register_observer error %d\n",
374                        tgt->uuid.uuid, rc);
375                 RETURN(rc);
376         }
377
378         if (obd->obd_observer) {
379                 /* tell the mds_lmv about the new target */
380                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd, 1,
381                                 (void *)(tgt - lmv->tgts));
382                 if (rc) {
383                         obd_disconnect(mdc_exp, 0);
384                         RETURN(rc);
385                 }
386         }
387
388         tgt->ltd_exp = mdc_exp;
389         tgt->active = 1; 
390         lmv->desc.ld_active_tgt_count++;
391
392         obd_init_ea_size(tgt->ltd_exp, lmv->max_easize,
393                          lmv->max_cookiesize);
394         CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
395                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
396                 atomic_read(&obd->obd_refcount));
397
398 #ifdef __KERNEL__
399         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
400         if (lmv_proc_dir) {
401                 struct proc_dir_entry *mdc_symlink;
402                 char name[MAX_STRING_SIZE + 1];
403
404                 LASSERT(mdc_obd->obd_type != NULL);
405                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
406                 name[MAX_STRING_SIZE] = '\0';
407                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
408                          mdc_obd->obd_type->typ_name,
409                          mdc_obd->obd_name);
410                 mdc_symlink = proc_symlink(mdc_obd->obd_name,
411                                            lmv_proc_dir, name);
412                 if (mdc_symlink == NULL) {
413                         CERROR("could not register LMV target "
414                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
415                                obd->obd_type->typ_name, obd->obd_name,
416                                mdc_obd->obd_name);
417                         lprocfs_remove(lmv_proc_dir);
418                         lmv_proc_dir = NULL;
419                 }
420         }
421 #endif
422         RETURN(0);
423 }
424
425 int lmv_add_mdc(struct obd_device *obd, struct obd_uuid *tgt_uuid)
426 {
427         struct lmv_obd *lmv = &obd->u.lmv;
428         struct lmv_tgt_desc *tgt;
429         int rc = 0;
430         ENTRY;
431
432         CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
433
434         lmv_init_lock(lmv);
435
436         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
437                 lmv_init_unlock(lmv);
438                 CERROR("can't add %s, LMV module compiled for %d MDCs. "
439                        "That many MDCs already configured.\n",
440                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
441                 RETURN(-EINVAL);
442         }
443         if (lmv->desc.ld_tgt_count == 0) {
444                 struct obd_device *mdc_obd;
445
446                 mdc_obd = class_find_client_obd(tgt_uuid, OBD_MDC_DEVICENAME,
447                                                 &obd->obd_uuid);
448                 if (!mdc_obd) {
449                         lmv_init_unlock(lmv);
450                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
451                         RETURN(-EINVAL);
452                 }
453
454                 rc = obd_llog_init(obd, &obd->obd_llogs, mdc_obd, 0, NULL);
455                 if (rc) {
456                         lmv_init_unlock(lmv);
457                         CERROR("lmv failed to setup llogging subsystems\n");
458                 }
459         }
460         spin_lock(&lmv->lmv_lock);
461         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
462         tgt->uuid = *tgt_uuid;
463         spin_unlock(&lmv->lmv_lock);
464
465         if (lmv->connected) {
466                 rc = lmv_connect_mdc(obd, tgt);
467                 if (rc) {
468                         spin_lock(&lmv->lmv_lock);
469                         lmv->desc.ld_tgt_count--;
470                         memset(tgt, 0, sizeof(*tgt));
471                         spin_unlock(&lmv->lmv_lock);
472                 } else {
473                         int easize = sizeof(struct mea) +
474                                      lmv->desc.ld_tgt_count *
475                                      sizeof(struct lustre_id);
476                         lmv_init_ea_size(obd->obd_self_export, easize, 0);
477                 }
478         }
479
480         lmv_init_unlock(lmv);
481         RETURN(rc);
482 }
483
484 /* performs a check if passed obd is connected. If no - connect it. */
485 int lmv_check_connect(struct obd_device *obd)
486 {
487         struct lmv_obd *lmv = &obd->u.lmv;
488         struct lmv_tgt_desc *tgt;
489         int i, rc, easize;
490         ENTRY;
491
492         if (lmv->connected)
493                 RETURN(0);
494         
495         lmv_init_lock(lmv);
496         if (lmv->connected) {
497                 lmv_init_unlock(lmv);
498                 RETURN(0);
499         }
500
501         if (lmv->desc.ld_tgt_count == 0) {
502                 CERROR("%s: no targets configured.\n", obd->obd_name);
503                 RETURN(-EINVAL);
504         }
505
506         CDEBUG(D_CONFIG, "time to connect %s to %s\n",
507                lmv->cluuid.uuid, obd->obd_name);
508
509         LASSERT(lmv->tgts != NULL);
510
511         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
512                 rc = lmv_connect_mdc(obd, tgt);
513                 if (rc)
514                         GOTO(out_disc, rc);
515         }
516
517         lmv_set_timeouts(obd);
518         class_export_put(lmv->exp);
519         lmv->connected = 1;
520         easize = lmv->desc.ld_tgt_count * sizeof(struct lustre_id) +
521                  sizeof(struct mea);
522         lmv_init_ea_size(obd->obd_self_export, easize, 0);
523         lmv_init_unlock(lmv);
524         RETURN(0);
525
526  out_disc:
527         while (i-- > 0) {
528                 int rc2;
529                 --tgt;
530                 tgt->active = 0;
531                 if (tgt->ltd_exp) {
532                         --lmv->desc.ld_active_tgt_count;
533                         rc2 = obd_disconnect(tgt->ltd_exp, 0);
534                         if (rc2) {
535                                 CERROR("error: LMV target %s disconnect on "
536                                        "MDC idx %d: error %d\n",
537                                        tgt->uuid.uuid, i, rc2);
538                         }
539                 }
540         }
541         class_disconnect(lmv->exp, 0);
542         lmv_init_unlock(lmv);
543         RETURN(rc);
544 }
545
546 static int lmv_disconnect(struct obd_export *exp, unsigned long flags)
547 {
548         struct obd_device *obd = class_exp2obd(exp);
549         struct lmv_obd *lmv = &obd->u.lmv;
550
551 #ifdef __KERNEL__
552         struct proc_dir_entry *lmv_proc_dir;
553 #endif
554         int rc, i;
555         ENTRY;
556
557         if (!lmv->tgts)
558                 goto out_local;
559
560         /* Only disconnect the underlying layers on the final disconnect. */
561         lmv->refcount--;
562         if (lmv->refcount != 0)
563                 goto out_local;
564
565 #ifdef __KERNEL__
566         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
567 #endif
568
569         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
570                 struct obd_device *mdc_obd; 
571                 
572                 if (lmv->tgts[i].ltd_exp == NULL)
573                         continue;
574
575                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
576
577                 if (mdc_obd)
578                         mdc_obd->obd_no_recov = obd->obd_no_recov;
579
580 #ifdef __KERNEL__
581                 if (lmv_proc_dir) {
582                         struct proc_dir_entry *mdc_symlink;
583
584                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
585                         if (mdc_symlink) {
586                                 lprocfs_remove(mdc_symlink);
587                         } else {
588                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
589                                        obd->obd_type->typ_name, obd->obd_name,
590                                        mdc_obd->obd_name);
591                         }
592                 }
593 #endif
594                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
595                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
596                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
597
598                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
599                 rc = obd_disconnect(lmv->tgts[i].ltd_exp, flags);
600                 if (rc) {
601                         if (lmv->tgts[i].active) {
602                                 CERROR("Target %s disconnect error %d\n",
603                                        lmv->tgts[i].uuid.uuid, rc);
604                         }
605                         rc = 0;
606                 }
607                 
608                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
609                 lmv->tgts[i].ltd_exp = NULL;
610         }
611
612 #ifdef __KERNEL__
613         if (lmv_proc_dir) {
614                 lprocfs_remove(lmv_proc_dir);
615         } else {
616                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
617                        obd->obd_type->typ_name, obd->obd_name);
618         }
619 #endif
620
621 out_local:
622         /* this is the case when no real connection is established by
623          * lmv_check_connect(). */
624         if (!lmv->connected)
625                 class_export_put(exp);
626         rc = class_disconnect(exp, 0);
627         if (lmv->refcount == 0)
628                 lmv->connected = 0;
629         RETURN(rc);
630 }
631
632 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
633                          int len, void *karg, void *uarg)
634 {
635         struct obd_device *obddev = class_exp2obd(exp);
636         struct lmv_obd *lmv = &obddev->u.lmv;
637         int i, rc = 0, set = 0;
638         ENTRY;
639
640         if (lmv->desc.ld_tgt_count == 0)
641                 RETURN(-ENOTTY);
642         
643         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
644                 int err;
645
646                 if (lmv->tgts[i].ltd_exp == NULL)
647                         continue;
648
649                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
650                 if (err) {
651                         if (lmv->tgts[i].active) {
652                                 CERROR("error: iocontrol MDC %s on MDT"
653                                        "idx %d: err = %d\n",
654                                        lmv->tgts[i].uuid.uuid, i, err);
655                                 if (!rc)
656                                         rc = err;
657                         }
658                 } else
659                         set = 1;
660         }
661         if (!set && !rc)
662                 rc = -EIO;
663
664         RETURN(rc);
665 }
666
667 static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
668 {
669         struct lmv_obd *lmv = &obd->u.lmv;
670         struct lustre_cfg *lcfg = buf;
671         struct lmv_desc *desc;
672         int rc = 0;
673         ENTRY;
674
675         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
676                 CERROR("LMV setup requires a descriptor\n");
677                 RETURN(-EINVAL);
678         }
679
680         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
681         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
682                 CERROR("descriptor size wrong: %d > %d\n",
683                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
684                 RETURN(-EINVAL);
685         }
686
687         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
688
689         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
690         if (lmv->tgts == NULL) {
691                 CERROR("Out of memory\n");
692                 RETURN(-ENOMEM);
693         }
694
695         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
696         lmv->desc.ld_tgt_count = 0;
697         lmv->desc.ld_active_tgt_count = 0;
698         lmv->max_cookiesize = 0;
699         lmv->max_easize = 0;
700
701         spin_lock_init(&lmv->lmv_lock);
702         sema_init(&lmv->init_sem, 1);
703
704         rc = lmv_setup_mgr(obd);
705         if (rc) {
706                 CERROR("Can't setup LMV object manager, "
707                        "error %d.\n", rc);
708                 OBD_FREE(lmv->tgts, lmv->tgts_size);
709                 RETURN(rc);
710         }
711
712         RETURN(rc);
713 }
714
715 static int lmv_cleanup(struct obd_device *obd, int flags) 
716 {
717         struct lmv_obd *lmv = &obd->u.lmv;
718         ENTRY;
719
720         lmv_cleanup_mgr(obd);
721         OBD_FREE(lmv->tgts, lmv->tgts_size);
722         
723         RETURN(0);
724 }
725 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
726 {
727         struct lustre_cfg *lcfg = buf;
728         struct obd_uuid tgt_uuid;
729         int rc;
730         ENTRY;
731
732         switch(lcfg->lcfg_command) {
733         case LCFG_LMV_ADD_MDC:
734                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
735                         GOTO(out, rc = -EINVAL);
736
737                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
738                 rc = lmv_add_mdc(obd, &tgt_uuid);
739                 GOTO(out, rc);
740         default: {
741                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
742                 GOTO(out, rc = -EINVAL);
743         }
744         }
745 out:
746         RETURN(rc);
747 }
748
749 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
750                       unsigned long max_age)
751 {
752         struct lmv_obd *lmv = &obd->u.lmv;
753         struct obd_statfs *temp;
754         int rc = 0, i;
755         ENTRY;
756         
757         rc = lmv_check_connect(obd);
758         if (rc)
759                 RETURN(rc);
760
761         OBD_ALLOC(temp, sizeof(*temp));
762         if (temp == NULL)
763                 RETURN(-ENOMEM);
764                
765         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
766                 if (lmv->tgts[i].ltd_exp == NULL)
767                         continue;
768
769                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
770                 if (rc) {
771                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
772                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
773                                rc);
774                         GOTO(out_free_temp, rc);
775                 }
776                 if (i == 0) {
777                         memcpy(osfs, temp, sizeof(*temp));
778                 } else {
779                         osfs->os_bavail += temp->os_bavail;
780                         osfs->os_blocks += temp->os_blocks;
781                         osfs->os_ffree += temp->os_ffree;
782                         osfs->os_files += temp->os_files;
783                 }
784         }
785
786         EXIT;
787 out_free_temp:
788         OBD_FREE(temp, sizeof(*temp));
789         return rc;
790 }
791
792 static int lmv_getstatus(struct obd_export *exp, struct lustre_id *id)
793 {
794         struct obd_device *obd = exp->exp_obd;
795         struct lmv_obd *lmv = &obd->u.lmv;
796         int rc;
797         ENTRY;
798
799         rc = lmv_check_connect(obd);
800         if (rc)
801                 RETURN(rc);
802
803         rc = md_getstatus(lmv->tgts[0].ltd_exp, id);
804         id_group(id) = 0;
805         
806         RETURN(rc);
807 }
808
809 static int lmv_getattr(struct obd_export *exp, struct lustre_id *id,
810                        __u64 valid, const char *xattr_name,
811                        const void *xattr_data, unsigned int xattr_datalen,
812                        unsigned int ea_size, struct obd_capa *ocapa,
813                        struct ptlrpc_request **request)
814 {
815         struct obd_device *obd = exp->exp_obd;
816         struct lmv_obd *lmv = &obd->u.lmv;
817         int rc, i = id_group(id);
818         struct lmv_obj *obj;
819         ENTRY;
820
821         rc = lmv_check_connect(obd);
822         if (rc)
823                 RETURN(rc);
824
825         LASSERT(i < lmv->desc.ld_tgt_count);
826
827
828         rc = md_getattr(lmv->tgts[i].ltd_exp, id, valid,
829                         xattr_name, xattr_data, xattr_datalen,
830                         ea_size, ocapa, request);
831         if (rc)
832                 RETURN(rc);
833         
834         obj = lmv_grab_obj(obd, id);
835         
836         CDEBUG(D_OTHER, "GETATTR for "DLID4" %s\n",
837                OLID4(id), obj ? "(splitted)" : "");
838
839         /*
840          * if object is splitted, then we loop over all the slaves and gather
841          * size attribute. In ideal world we would have to gather also mds field
842          * from all slaves, as object is spread over the cluster and this is
843          * definitely interesting information and it is not good to loss it,
844          * but...
845          */
846         if (obj) {
847                 struct mds_body *body;
848
849                 if (*request == NULL) {
850                         lmv_put_obj(obj);
851                         RETURN(rc);
852                 }
853                         
854                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
855                                       sizeof(*body));
856                 LASSERT(body != NULL);
857
858                 lmv_lock_obj(obj);
859         
860                 for (i = 0; i < obj->objcount; i++) {
861
862                         if (lmv->tgts[i].ltd_exp == NULL) {
863                                 CWARN("%s: NULL export for %d\n",
864                                       obd->obd_name, i);
865                                 continue;
866                         }
867
868                         /* skip master obj. */
869                         if (id_equal_fid(&obj->id, &obj->objs[i].id))
870                                 continue;
871                         
872                         body->size += obj->objs[i].size;
873                 }
874
875                 lmv_unlock_obj(obj);
876                 lmv_put_obj(obj);
877         }
878         
879         RETURN(rc);
880 }
881
882 static int lmv_access_check(struct obd_export *exp,
883                             struct lustre_id *id,
884                             struct ptlrpc_request **request)
885 {
886         struct obd_device *obd = exp->exp_obd;
887         struct lmv_obd *lmv = &obd->u.lmv;
888         int rc, i = id_group(id);
889         ENTRY;
890
891         rc = lmv_check_connect(obd);
892         if (rc)
893                 RETURN(rc);
894
895         LASSERT(i < lmv->desc.ld_tgt_count);
896         rc = md_access_check(lmv->tgts[i].ltd_exp, id, request);
897         RETURN(rc);
898 }
899
900 static int lmv_change_cbdata(struct obd_export *exp,
901                              struct lustre_id *id, 
902                              ldlm_iterator_t it,
903                              void *data)
904 {
905         struct obd_device *obd = exp->exp_obd;
906         struct lmv_obd *lmv = &obd->u.lmv;
907         int i, rc;
908         ENTRY;
909         
910         rc = lmv_check_connect(obd);
911         if (rc)
912                 RETURN(rc);
913         
914         CDEBUG(D_OTHER, "CBDATA for "DLID4"\n", OLID4(id));
915         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
916
917         /* with CMD every object can have two locks in different
918          * namespaces: lookup lock in space of mds storing direntry
919          * and update/open lock in space of mds storing inode */
920         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
921                 md_change_cbdata(lmv->tgts[i].ltd_exp, id, it, data);
922         
923         RETURN(0);
924 }
925
926 static int lmv_change_cbdata_name(struct obd_export *exp,
927                                   struct lustre_id *pid,
928                                   char *name, int len,
929                                   struct lustre_id *cid,
930                                   ldlm_iterator_t it,
931                                   void *data)
932 {
933         struct obd_device *obd = exp->exp_obd;
934         struct lmv_obd *lmv = &obd->u.lmv;
935         struct lustre_id rcid = *cid;
936         struct lmv_obj *obj;
937         int rc = 0, mds;
938         ENTRY;
939
940         rc = lmv_check_connect(obd);
941         if (rc)
942                 RETURN(rc);
943
944         LASSERT(id_group(pid) < lmv->desc.ld_tgt_count);
945         LASSERT(id_group(cid) < lmv->desc.ld_tgt_count);
946         
947         CDEBUG(D_OTHER, "CBDATA for "DLID4":%*s -> "DLID4"\n",
948                OLID4(pid), len, name, OLID4(cid));
949
950         /* this is default mds for directory name belongs to. */
951         mds = id_group(pid);
952         obj = lmv_grab_obj(obd, pid);
953         if (obj) {
954                 /* directory is splitted. look for right mds for this name. */
955                 mds = raw_name2idx(obj->hashtype, obj->objcount, name, len);
956                 rcid = obj->objs[mds].id;
957                 mds = id_group(&rcid);
958                 lmv_put_obj(obj);
959         }
960         rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, &rcid, it, data);
961         RETURN(rc);
962 }
963
964 static int lmv_valid_attrs(struct obd_export *exp, struct lustre_id *id) 
965 {
966         struct obd_device *obd = exp->exp_obd;
967         struct lmv_obd *lmv = &obd->u.lmv;
968         int rc = 0;
969         ENTRY;
970
971         rc = lmv_check_connect(obd);
972         if (rc)
973                 RETURN(rc);
974
975         CDEBUG(D_OTHER, "validate "DLID4"\n", OLID4(id));
976         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
977         rc = md_valid_attrs(lmv->tgts[id_group(id)].ltd_exp, id);
978         RETURN(rc);
979 }
980
981 static int lmv_close(struct obd_export *exp, struct mdc_op_data *op_data,
982                      struct obd_client_handle *och,
983                      struct ptlrpc_request **request)
984 {
985         struct obd_device *obd = exp->exp_obd;
986         struct lmv_obd *lmv = &obd->u.lmv;
987         int rc, i = id_group(&op_data->id1);
988         ENTRY;
989         
990         rc = lmv_check_connect(obd);
991         if (rc)
992                 RETURN(rc);
993
994         LASSERT(i < lmv->desc.ld_tgt_count);
995         CDEBUG(D_OTHER, "CLOSE "DLID4"\n", OLID4(&op_data->id1));
996         rc = md_close(lmv->tgts[i].ltd_exp, op_data, och, request);
997         RETURN(rc);
998 }
999
1000 int lmv_get_mea_and_update_object(struct obd_export *exp, 
1001                                   struct lustre_id *id)
1002 {
1003         struct obd_device *obd = exp->exp_obd;
1004         struct lmv_obd *lmv = &obd->u.lmv;
1005         struct ptlrpc_request *req = NULL;
1006         struct lmv_obj *obj;
1007         struct lustre_md md;
1008         int mealen, rc;
1009         __u64 valid;
1010         ENTRY;
1011
1012         md.mea = NULL;
1013         mealen = MEA_SIZE_LMV(lmv);
1014         
1015         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1016
1017         /* time to update mea of parent id */
1018         rc = md_getattr(lmv->tgts[id_group(id)].ltd_exp,
1019                         id, valid, NULL, NULL, 0, mealen, NULL, &req);
1020         if (rc) {
1021                 CERROR("md_getattr() failed, error %d\n", rc);
1022                 GOTO(cleanup, rc);
1023         }
1024
1025         rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
1026         if (rc) {
1027                 CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
1028                 GOTO(cleanup, rc);
1029         }
1030
1031         if (md.mea == NULL)
1032                 GOTO(cleanup, rc = -ENODATA);
1033
1034         obj = lmv_create_obj(exp, id, md.mea);
1035         if (IS_ERR(obj))
1036                 rc = PTR_ERR(obj);
1037         else
1038                 lmv_put_obj(obj);
1039
1040         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1041
1042         EXIT;
1043 cleanup:
1044         if (req)
1045                 ptlrpc_req_finished(req);
1046         return rc;
1047 }
1048
1049 int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
1050                const void *data, int datalen, int mode, __u32 uid,
1051                __u32 gid, __u64 rdev, struct ptlrpc_request **request)
1052 {
1053         struct obd_device *obd = exp->exp_obd;
1054         struct lmv_obd *lmv = &obd->u.lmv;
1055         struct mds_body *body;
1056         struct lmv_obj *obj;
1057         int rc, mds, loop = 0;
1058         ENTRY;
1059
1060         rc = lmv_check_connect(obd);
1061         if (rc)
1062                 RETURN(rc);
1063
1064         if (!lmv->desc.ld_active_tgt_count)
1065                 RETURN(-EIO);
1066 repeat:
1067         LASSERT(++loop <= 2);
1068         obj = lmv_grab_obj(obd, &op_data->id1);
1069         if (obj) {
1070                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1071                                    op_data->name, op_data->namelen);
1072                 op_data->id1 = obj->objs[mds].id;
1073                 lmv_put_obj(obj);
1074         }
1075
1076         CDEBUG(D_OTHER, "CREATE '%*s' on "DLID4"\n", op_data->namelen,
1077                op_data->name, OLID4(&op_data->id1));
1078         
1079         rc = md_create(lmv->tgts[id_group(&op_data->id1)].ltd_exp, 
1080                        op_data, data, datalen, mode, uid, gid, rdev,
1081                        request);
1082         if (rc == 0) {
1083                 if (*request == NULL)
1084                         RETURN(rc);
1085
1086                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
1087                                       sizeof(*body));
1088                 if (body == NULL)
1089                         RETURN(-ENOMEM);
1090                 
1091                 CDEBUG(D_OTHER, "created. "DLID4"\n", OLID4(&op_data->id1));
1092         } else if (rc == -ERESTART) {
1093                 /*
1094                  * directory got splitted. time to update local object and
1095                  * repeat the request with proper MDS.
1096                  */
1097                 rc = lmv_get_mea_and_update_object(exp, &op_data->id1);
1098                 if (rc == 0) {
1099                         ptlrpc_req_finished(*request);
1100                         goto repeat;
1101                 }
1102         }
1103         RETURN(rc);
1104 }
1105
1106 static int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
1107 {
1108         struct obd_device *obd = exp->exp_obd;
1109         struct lmv_obd *lmv = &obd->u.lmv;
1110         int rc;
1111         ENTRY;
1112         
1113         rc = lmv_check_connect(obd);
1114         if (rc)
1115                 RETURN(rc);
1116
1117         /* FIXME: choose right MDC here */
1118         CWARN("this method isn't implemented yet\n");
1119         rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
1120         RETURN(rc);
1121 }
1122
1123 static int
1124 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1125                    struct lookup_intent *it, int lockmode,
1126                    struct mdc_op_data *data, struct lustre_handle *lockh,
1127                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1128                    ldlm_blocking_callback cb_blocking, void *cb_data)
1129 {
1130         struct obd_device *obd = exp->exp_obd;
1131         struct lmv_obd *lmv = &obd->u.lmv;
1132         struct mea *mea = data->mea1;
1133         struct mdc_op_data *data2;
1134         int i, rc, mds;
1135         ENTRY;
1136
1137         OBD_ALLOC(data2, sizeof(*data2));
1138         if (data2 == NULL)
1139                 RETURN(-ENOMEM);
1140         
1141         LASSERT(mea != NULL);
1142         for (i = 0; i < mea->mea_count; i++) {
1143                 memset(data2, 0, sizeof(*data2));
1144                 data2->id1 = mea->mea_ids[i];
1145                 mds = id_group(&data2->id1);
1146                 
1147                 if (lmv->tgts[mds].ltd_exp == NULL)
1148                         continue;
1149
1150                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, 
1151                                 lockmode, data2, lockh + i, lmm, lmmsize, 
1152                                 cb_compl, cb_blocking, cb_data);
1153                 
1154                 CDEBUG(D_OTHER, "take lock on slave "DLID4" -> %d/%d\n",
1155                        OLID4(&mea->mea_ids[i]), rc, LUSTRE_IT(it)->it_status);
1156                 if (rc)
1157                         GOTO(cleanup, rc);
1158                 if (LUSTRE_IT(it)->it_data) {
1159                         struct ptlrpc_request *req;
1160                         req = (struct ptlrpc_request *) LUSTRE_IT(it)->it_data;
1161                         ptlrpc_req_finished(req);
1162                 }
1163                 
1164                 if (LUSTRE_IT(it)->it_status)
1165                         GOTO(cleanup, rc = LUSTRE_IT(it)->it_status);
1166         }
1167         
1168         OBD_FREE(data2, sizeof(*data2));
1169         RETURN(0);
1170 cleanup:
1171         OBD_FREE(data2, sizeof(*data2));
1172         
1173         /* drop all taken locks */
1174         while (--i >= 0) {
1175                 if (lockh[i].cookie)
1176                         ldlm_lock_decref(lockh + i, lockmode);
1177                 lockh[i].cookie = 0;
1178         }
1179         return rc;
1180 }
1181
1182 static int
1183 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1184                    struct lookup_intent *it, int lock_mode,
1185                    struct mdc_op_data *data, struct lustre_handle *lockh,
1186                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1187                    ldlm_blocking_callback cb_blocking, void *cb_data)
1188 {
1189         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
1190         struct obd_device *obd = exp->exp_obd;
1191         struct lmv_obd *lmv = &obd->u.lmv;
1192         struct lustre_handle plock;
1193         struct mdc_op_data rdata;
1194         struct mds_body *body = NULL;
1195         int rc = 0, pmode;
1196         ENTRY;
1197
1198         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
1199         LASSERT(body != NULL);
1200
1201         if (!(body->valid & OBD_MD_MDS))
1202                 RETURN(0);
1203
1204         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4" -> "DLID4"\n",
1205                LL_IT2STR(it), OLID4(&data->id1), OLID4(&body->id1));
1206
1207         /* we got LOOKUP lock, but we really need attrs */
1208         pmode = LUSTRE_IT(it)->it_lock_mode;
1209         LASSERT(pmode != 0);
1210         memcpy(&plock, lockh, sizeof(plock));
1211         LUSTRE_IT(it)->it_lock_mode = 0;
1212         LUSTRE_IT(it)->it_data = NULL;
1213         LASSERT((body->valid & OBD_MD_FID) != 0);
1214
1215         memcpy(&rdata, data, sizeof(rdata));
1216         rdata.id1 = body->id1;
1217         rdata.name = NULL;
1218         rdata.namelen = 0;
1219
1220         LUSTRE_IT(it)->it_disposition &= ~DISP_ENQ_COMPLETE;
1221         ptlrpc_req_finished(req);
1222
1223         rc = md_enqueue(lmv->tgts[id_group(&rdata.id1)].ltd_exp, 
1224                         lock_type, it, lock_mode, &rdata, lockh, lmm, 
1225                         lmmsize, cb_compl, cb_blocking, cb_data);
1226         ldlm_lock_decref(&plock, pmode);
1227         RETURN(rc);
1228 }
1229
1230 static int
1231 lmv_enqueue(struct obd_export *exp, int lock_type,
1232             struct lookup_intent *it, int lock_mode,
1233             struct mdc_op_data *data, struct lustre_handle *lockh,
1234             void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1235             ldlm_blocking_callback cb_blocking, void *cb_data)
1236 {
1237         struct obd_device *obd = exp->exp_obd;
1238         struct lmv_obd *lmv = &obd->u.lmv;
1239         struct lmv_obj *obj;
1240         int rc, mds;
1241         ENTRY;
1242
1243         rc = lmv_check_connect(obd);
1244         if (rc)
1245                 RETURN(rc);
1246
1247         if (data->mea1 && it->it_op == IT_UNLINK) {
1248                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1249                                         data, lockh, lmm, lmmsize,
1250                                         cb_compl, cb_blocking, cb_data);
1251                 RETURN(rc);
1252         }
1253
1254         if (data->namelen) {
1255                 obj = lmv_grab_obj(obd, &data->id1);
1256                 if (obj) {
1257                         /* directory is splitted. look for right mds for this
1258                          * name */
1259                         mds = raw_name2idx(obj->hashtype, obj->objcount,
1260                                            (char *)data->name, data->namelen);
1261                         data->id1 = obj->objs[mds].id;
1262                         lmv_put_obj(obj);
1263                 }
1264         }
1265         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4"\n", LL_IT2STR(it),
1266                OLID4(&data->id1));
1267         
1268         rc = md_enqueue(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1269                         lock_type, it, lock_mode, data, lockh, lmm, 
1270                         lmmsize, cb_compl, cb_blocking, cb_data);
1271         if (rc == 0 && it->it_op == IT_OPEN)
1272                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1273                                         data, lockh, lmm, lmmsize,
1274                                         cb_compl, cb_blocking, cb_data);
1275         RETURN(rc);
1276 }
1277
1278 static int
1279 lmv_getattr_lock(struct obd_export *exp, struct lustre_id *id,
1280                  char *filename, int namelen, __u64 valid,
1281                  unsigned int ea_size, struct ptlrpc_request **request)
1282 {
1283         int rc, mds = id_group(id), loop = 0;
1284         struct obd_device *obd = exp->exp_obd;
1285         struct lmv_obd *lmv = &obd->u.lmv;
1286         struct lustre_id rid = *id;
1287         struct mds_body *body;
1288         struct lmv_obj *obj;
1289         ENTRY;
1290         
1291         rc = lmv_check_connect(obd);
1292         if (rc)
1293                 RETURN(rc);
1294 repeat:
1295         LASSERT(++loop <= 2);
1296         obj = lmv_grab_obj(obd, id);
1297         if (obj) {
1298                 /* directory is splitted. look for right mds for this name */
1299                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1300                                    filename, namelen - 1);
1301                 rid = obj->objs[mds].id;
1302                 lmv_put_obj(obj);
1303         }
1304         
1305         CDEBUG(D_OTHER, "getattr_lock for %*s on "DLID4" -> "DLID4"\n",
1306                namelen, filename, OLID4(id), OLID4(&rid));
1307
1308         rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp,
1309                              &rid, filename, namelen,
1310                              valid == OBD_MD_FLID ? valid : valid | OBD_MD_FID,
1311                              ea_size, request);
1312         if (rc == 0) {
1313                 /*
1314                  * this could be cross-node reference. in this case all we have
1315                  * right now is lustre_id triple. we'd like to find other
1316                  * attributes.
1317                  */
1318                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
1319                 LASSERT(body != NULL);
1320                 LASSERT((body->valid & OBD_MD_FID) != 0
1321                                 || body->valid == OBD_MD_FLID);
1322
1323                 if (body->valid & OBD_MD_MDS) {
1324                         struct ptlrpc_request *req = NULL;
1325                         
1326                         rid = body->id1;
1327                         CDEBUG(D_OTHER, "request attrs for "DLID4"\n", OLID4(&rid));
1328
1329                         rc = md_getattr_lock(lmv->tgts[id_group(&rid)].ltd_exp, 
1330                                              &rid, NULL, 1, valid, ea_size, &req);
1331                         ptlrpc_req_finished(*request);
1332                         *request = req;
1333                 }
1334         } else if (rc == -ERESTART) {
1335                 /* directory got splitted. time to update local object and
1336                  * repeat the request with proper MDS */
1337                 rc = lmv_get_mea_and_update_object(exp, &rid);
1338                 if (rc == 0) {
1339                         ptlrpc_req_finished(*request);
1340                         goto repeat;
1341                 }
1342         }
1343         RETURN(rc);
1344 }
1345
1346 /*
1347  * llite passes id of an target inode in data->id1 and id of directory in
1348  * data->id2
1349  */
1350 static int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
1351                     struct ptlrpc_request **request)
1352 {
1353         struct obd_device *obd = exp->exp_obd;
1354         struct lmv_obd *lmv = &obd->u.lmv;
1355         struct lmv_obj *obj;
1356         int rc, mds;
1357         ENTRY;
1358         
1359         rc = lmv_check_connect(obd);
1360         if (rc)
1361                 RETURN(rc);
1362
1363         if (data->namelen != 0) {
1364                 /* usual link request */
1365                 obj = lmv_grab_obj(obd, &data->id2);
1366                 if (obj) {
1367                         rc = raw_name2idx(obj->hashtype, obj->objcount, 
1368                                           data->name, data->namelen);
1369                         data->id2 = obj->objs[rc].id;
1370                         lmv_put_obj(obj);
1371                 }
1372
1373                 mds = id_group(&data->id2);
1374                 
1375                 CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
1376                        OLID4(&data->id2), data->namelen, data->name,
1377                        OLID4(&data->id1));
1378         } else {
1379                 mds = id_group(&data->id1);
1380                 
1381                 /* request from MDS to acquire i_links for inode by id1 */
1382                 CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
1383                        OLID4(&data->id1));
1384         }
1385
1386         CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n",
1387                mds, OLID4(&data->id1));
1388         rc = md_link(lmv->tgts[mds].ltd_exp, data, request);
1389         
1390         RETURN(rc);
1391 }
1392
1393 static int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
1394                       const char *old, int oldlen, const char *new, int newlen,
1395                       struct ptlrpc_request **request)
1396 {
1397         struct obd_device *obd = exp->exp_obd;
1398         struct lmv_obd *lmv = &obd->u.lmv;
1399         struct lmv_obj *obj;
1400         int rc, mds;
1401         ENTRY;
1402
1403         CDEBUG(D_OTHER, "rename %*s in "DLID4" to %*s in "DLID4"\n",
1404                oldlen, old, OLID4(&data->id1), newlen, new,
1405                OLID4(&data->id2));
1406
1407         rc = lmv_check_connect(obd);
1408         if (rc)
1409                 RETURN(rc);
1410
1411         if (oldlen == 0) {
1412                 /*
1413                  * MDS with old dir entry is asking another MDS to create name
1414                  * there.
1415                  */
1416                 CDEBUG(D_OTHER,
1417                        "create %*s(%d/%d) in "DLID4" pointing "
1418                        "to "DLID4"\n", newlen, new, oldlen, newlen,
1419                        OLID4(&data->id2), OLID4(&data->id1));
1420
1421                 mds = id_group(&data->id2);
1422
1423                 /* 
1424                  * target directory can be splitted, sowe should forward request
1425                  * to the right MDS.
1426                  */
1427                 obj = lmv_grab_obj(obd, &data->id2);
1428                 if (obj) {
1429                         mds = raw_name2idx(obj->hashtype, obj->objcount, 
1430                                            (char *)new, newlen);
1431                         data->id2 = obj->objs[mds].id;
1432                         CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1433                                OLID4(&data->id2));
1434                         lmv_put_obj(obj);
1435                 }
1436                 goto request;
1437         }
1438
1439         obj = lmv_grab_obj(obd, &data->id1);
1440         if (obj) {
1441                 /*
1442                  * directory is already splitted, so we have to forward request
1443                  * to the right MDS.
1444                  */
1445                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1446                                    (char *)old, oldlen);
1447                 data->id1 = obj->objs[mds].id;
1448                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1449                        OLID4(&data->id1));
1450                 lmv_put_obj(obj);
1451         }
1452
1453         obj = lmv_grab_obj(obd, &data->id2);
1454         if (obj) {
1455                 /*
1456                  * directory is already splitted, so we have to forward request
1457                  * to the right MDS.
1458                  */
1459                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1460                                    (char *)new, newlen);
1461                 
1462                 data->id2 = obj->objs[mds].id;
1463                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1464                        OLID4(&data->id2));
1465                 lmv_put_obj(obj);
1466         }
1467         
1468         mds = id_group(&data->id1);
1469
1470 request:
1471         if (id_group(&data->id1) != id_group(&data->id2)) {
1472                 CDEBUG(D_OTHER,"cross-node rename "DLID4"/%*s to "DLID4"/%*s\n",
1473                        OLID4(&data->id1), oldlen, old, OLID4(&data->id2),
1474                        newlen, new);
1475         }
1476
1477         rc = md_rename(lmv->tgts[mds].ltd_exp, data, old, oldlen,
1478                        new, newlen, request); 
1479         RETURN(rc);
1480 }
1481
1482 static int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
1483                        struct iattr *iattr, void *ea, int ealen, void *ea2,
1484                        int ea2len, void *ea3, int ea3len, 
1485                        struct ptlrpc_request **request)
1486 {
1487         struct obd_device *obd = exp->exp_obd;
1488         struct lmv_obd *lmv = &obd->u.lmv;
1489         struct ptlrpc_request *req;
1490         struct mds_body *body;
1491         struct lmv_obj *obj;
1492         int rc = 0, i;
1493         ENTRY;
1494
1495         rc = lmv_check_connect(obd);
1496         if (rc)
1497                 RETURN(rc);
1498
1499         obj = lmv_grab_obj(obd, &data->id1);
1500         
1501         CDEBUG(D_OTHER, "SETATTR for "DLID4", valid 0x%x%s\n",
1502                OLID4(&data->id1), iattr->ia_valid, obj ? ", splitted" : "");
1503         
1504         if (obj) {
1505                 for (i = 0; i < obj->objcount; i++) {
1506                         data->id1 = obj->objs[i].id;
1507                         
1508                         rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1509                                         data, iattr, ea, ealen, ea2, ea2len, 
1510                                         ea3, ea3len, &req);
1511
1512                         if (id_equal_fid(&obj->id, &obj->objs[i].id)) {
1513                                 /*
1514                                  * this is master object and this request should
1515                                  * be returned back to llite.
1516                                  */
1517                                 *request = req;
1518                         } else {
1519                                 ptlrpc_req_finished(req);
1520                         }
1521
1522                         if (rc)
1523                                 break;
1524                 }
1525                 lmv_put_obj(obj);
1526         } else {
1527                 LASSERT(id_group(&data->id1) < lmv->desc.ld_tgt_count);
1528                 rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp,
1529                                 data, iattr, ea, ealen, ea2, ea2len, ea3,
1530                                 ea3len, request); 
1531                 if (rc == 0) {
1532                         body = lustre_msg_buf((*request)->rq_repmsg, 0,
1533                                               sizeof(*body));
1534                         LASSERT(body != NULL);
1535                         LASSERT((body->valid & OBD_MD_FID) != 0);
1536                         LASSERT(id_group(&body->id1) == id_group(&data->id1));
1537                 }
1538         }
1539         RETURN(rc);
1540 }
1541
1542 static int lmv_sync(struct obd_export *exp, struct lustre_id *id,
1543                     struct ptlrpc_request **request)
1544 {
1545         struct obd_device *obd = exp->exp_obd;
1546         struct lmv_obd *lmv = &obd->u.lmv;
1547         int rc;
1548         ENTRY;
1549
1550         rc = lmv_check_connect(obd);
1551         if (rc)
1552                 RETURN(rc);
1553
1554         rc = md_sync(lmv->tgts[id_group(id)].ltd_exp, 
1555                      id, request);
1556         RETURN(rc);
1557 }
1558
1559 int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, 
1560                             struct ldlm_lock_desc *desc,
1561                             void *data, int flag)
1562 {
1563         struct lustre_handle lockh;
1564         struct lmv_obj *obj;
1565         int rc;
1566         ENTRY;
1567
1568         switch (flag) {
1569         case LDLM_CB_BLOCKING:
1570                 ldlm_lock2handle(lock, &lockh);
1571                 rc = ldlm_cli_cancel(&lockh);
1572                 if (rc < 0) {
1573                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1574                         RETURN(rc);
1575                 }
1576                 break;
1577         case LDLM_CB_CANCELING:
1578                 /* time to drop cached attrs for dirobj */
1579                 obj = lock->l_ast_data;
1580                 if (obj) {
1581                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1582                                ", master "DLID4"\n",
1583                                lock->l_resource->lr_name.name[3] == 1 ?
1584                                "LOOKUP" : "UPDATE",
1585                                lock->l_resource->lr_name.name[0],
1586                                lock->l_resource->lr_name.name[1], 
1587                                OLID4(&obj->id));
1588                         lmv_put_obj(obj);
1589                 }
1590                 break;
1591         default:
1592                 LBUG();
1593         }
1594         RETURN(0);
1595 }
1596
1597 static void lmv_remove_dots(struct page *page)
1598 {
1599         unsigned limit = PAGE_CACHE_SIZE;
1600         char *kaddr = page_address(page);
1601         struct ext2_dir_entry_2 *p;
1602         unsigned offs, rec_len;
1603
1604         for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
1605                 p = (struct ext2_dir_entry_2 *)(kaddr + offs);
1606                 rec_len = le16_to_cpu(p->rec_len);
1607
1608                 if ((p->name_len == 1 && p->name[0] == '.') ||
1609                     (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
1610                         p->inode = 0;
1611         }
1612 }
1613
1614 static int lmv_readpage(struct obd_export *exp, struct lustre_id *id,
1615                         __u64 offset, struct page *page,
1616                         struct ptlrpc_request **request)
1617 {
1618         struct obd_device *obd = exp->exp_obd;
1619         struct lmv_obd *lmv = &obd->u.lmv;
1620         struct lustre_id rid = *id;
1621         struct lmv_obj *obj;
1622         int rc, i;
1623         ENTRY;
1624
1625 #warning "we need well-desgined readdir() implementation"
1626         rc = lmv_check_connect(obd);
1627         if (rc)
1628                 RETURN(rc);
1629
1630         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
1631         CDEBUG(D_OTHER, "READPAGE at %llu from "DLID4"\n",
1632                offset, OLID4(&rid));
1633
1634         obj = lmv_grab_obj(obd, id);
1635         if (obj) {
1636                 lmv_lock_obj(obj);
1637
1638                 /* find dirobj containing page with requested offset. */
1639                 for (i = 0; i < obj->objcount; i++) {
1640                         if (offset < obj->objs[i].size)
1641                                 break;
1642                         offset -= obj->objs[i].size;
1643                 }
1644                 rid = obj->objs[i].id;
1645                 
1646                 lmv_unlock_obj(obj);
1647                 lmv_put_obj(obj);
1648                 
1649                 CDEBUG(D_OTHER, "forward to "DLID4" with offset %lu\n",
1650                        OLID4(&rid), (unsigned long)offset);
1651         }
1652         rc = md_readpage(lmv->tgts[id_group(&rid)].ltd_exp, &rid, 
1653                          offset, page, request);
1654         
1655         if (rc == 0 && !id_equal_fid(&rid, id))
1656                 /* this page isn't from master object. To avoid "." and ".." 
1657                  * duplication in directory, we have to remove them from all
1658                  * slave objects */
1659                 lmv_remove_dots(page);
1660         
1661         RETURN(rc);
1662 }
1663
1664 static int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data,
1665                              struct ptlrpc_request **req)
1666 {
1667         struct obd_device *obd = exp->exp_obd;
1668         struct lmv_obd *lmv = &obd->u.lmv;
1669         struct mea *mea = data->mea1;
1670         struct mdc_op_data *data2;
1671         int i, rc = 0;
1672         ENTRY;
1673
1674         OBD_ALLOC(data2, sizeof(*data2));
1675         if (data2 == NULL)
1676                 RETURN(-ENOMEM);
1677         
1678         LASSERT(mea != NULL);
1679         for (i = 0; i < mea->mea_count; i++) {
1680                 memset(data2, 0, sizeof(*data2));
1681                 data2->id1 = mea->mea_ids[i];
1682                 data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1683                 
1684                 if (lmv->tgts[id_group(&data2->id1)].ltd_exp == NULL)
1685                         continue;
1686
1687                 rc = md_unlink(lmv->tgts[id_group(&data2->id1)].ltd_exp,
1688                                data2, req);
1689                 
1690                 CDEBUG(D_OTHER, "unlink slave "DLID4" -> %d\n",
1691                        OLID4(&mea->mea_ids[i]), rc);
1692                 
1693                 if (*req) {
1694                         ptlrpc_req_finished(*req);
1695                         *req = NULL;
1696                 }
1697                 if (rc)
1698                         RETURN(rc);
1699         }
1700         OBD_FREE(data2, sizeof(*data2));
1701         RETURN(rc);
1702 }
1703
1704 static int lmv_delete_inode(struct obd_export *exp, struct lustre_id *id)
1705 {
1706         ENTRY;
1707
1708         LASSERT(exp && id);
1709         if (lmv_delete_obj(exp, id)) {
1710                 CDEBUG(D_OTHER, "lmv object "DLID4" is destroyed.\n",
1711                        OLID4(id));
1712         }
1713         RETURN(0);
1714 }
1715
1716 static int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
1717                       struct ptlrpc_request **request)
1718 {
1719         struct obd_device *obd = exp->exp_obd;
1720         struct lmv_obd *lmv = &obd->u.lmv;
1721         int rc, i = 0;
1722         ENTRY;
1723         
1724         rc = lmv_check_connect(obd);
1725         if (rc)
1726                 RETURN(rc);
1727
1728         if (data->namelen == 0 && data->mea1 != NULL) {
1729                 /* mds asks to remove slave objects */
1730                 rc = lmv_unlink_slaves(exp, data, request);
1731                 RETURN(rc);
1732         }
1733
1734         if (data->namelen != 0) {
1735                 struct lmv_obj *obj;
1736                 
1737                 obj = lmv_grab_obj(obd, &data->id1);
1738                 if (obj) {
1739                         i = raw_name2idx(obj->hashtype, obj->objcount,
1740                                          data->name, data->namelen);
1741                         data->id1 = obj->objs[i].id;
1742                         lmv_put_obj(obj);
1743                 }
1744                 CDEBUG(D_OTHER, "unlink '%*s' in "DLID4" -> %u\n",
1745                        data->namelen, data->name, OLID4(&data->id1),
1746                        i);
1747         } else {
1748                 CDEBUG(D_OTHER, "drop i_nlink on "DLID4"\n",
1749                        OLID4(&data->id1));
1750         }
1751         rc = md_unlink(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1752                        data, request);
1753         RETURN(rc);
1754 }
1755
1756 static struct obd_device *lmv_get_real_obd(struct obd_export *exp,
1757                                            struct lustre_id *id)
1758 {
1759         struct obd_device *obd = exp->exp_obd;
1760         struct lmv_obd *lmv = &obd->u.lmv;
1761         int rc;
1762         ENTRY;
1763
1764         rc = lmv_check_connect(obd);
1765         if (rc)
1766                 RETURN(ERR_PTR(rc));
1767         obd = lmv->tgts[id_group(id)].ltd_exp->exp_obd;
1768         EXIT;
1769         
1770         return obd;
1771 }
1772
1773 static int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
1774                                  void *acl, int acl_size,
1775                                  struct lov_stripe_md **ea,
1776                                  struct obd_trans_info *oti)
1777 {
1778         struct obd_device *obd = exp->exp_obd;
1779         struct lmv_obd *lmv = &obd->u.lmv;
1780         struct lov_stripe_md obj_md;
1781         struct lov_stripe_md *obj_mdp = &obj_md;
1782         int rc = 0;
1783         ENTRY;
1784
1785         LASSERT(ea == NULL);
1786         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1787
1788         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa,
1789                         acl, acl_size, &obj_mdp, oti);
1790
1791         RETURN(rc);
1792 }
1793
1794 /*
1795  * to be called from MDS only. @oa should have correct store cookie and o_fid
1796  * values for "master" object, as it will be used.
1797  */
1798 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
1799                    void *acl, int acl_size,
1800                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1801 {
1802         struct obd_device *obd = exp->exp_obd;
1803         struct lmv_obd *lmv = &obd->u.lmv;
1804         struct lustre_id mid;
1805         int i, c, rc = 0;
1806         struct mea *mea;
1807         ENTRY;
1808
1809         rc = lmv_check_connect(obd);
1810         if (rc)
1811                 RETURN(rc);
1812
1813         LASSERT(oa != NULL);
1814         
1815         if (ea == NULL) {
1816                 rc = lmv_obd_create_single(exp, oa, acl, acl_size, NULL, oti);
1817                 if (rc)
1818                         CERROR("Can't create object, rc = %d\n", rc);
1819                 RETURN(rc);
1820         }
1821
1822         /* acl is only suppied when mds create single remote obj */
1823         LASSERT(acl == NULL && acl_size == 0);
1824
1825         if (*ea == NULL) {
1826                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
1827                 if (rc < 0) {
1828                         CERROR("obd_alloc_diskmd() failed, error %d\n",
1829                                rc);
1830                         RETURN(rc);
1831                 } else
1832                         rc = 0;
1833                 
1834                 if (*ea == NULL)
1835                         RETURN(-ENOMEM);
1836         }
1837
1838         /* 
1839          * here we should take care about splitted dir, so store cookie and fid
1840          * for "master" object should already be allocated and passed in @oa.
1841          */
1842         LASSERT(oa->o_id != 0);
1843         LASSERT(oa->o_fid != 0);
1844
1845         /* save "master" object id */
1846         obdo2id(&mid, oa);
1847
1848         mea = (struct mea *)*ea;
1849         mea->mea_master = -1;
1850         mea->mea_magic = MEA_MAGIC_ALL_CHARS;
1851
1852         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
1853                 mea->mea_count = lmv->desc.ld_tgt_count;
1854
1855         for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
1856                 struct lov_stripe_md obj_md;
1857                 struct lov_stripe_md *obj_mdp = &obj_md;
1858                
1859                 if (lmv->tgts[i].ltd_exp == NULL) {
1860                         /* this is "master" MDS */
1861                         mea->mea_master = i;
1862                         mea->mea_ids[c] = mid;
1863                         c++;
1864                         continue;
1865                 }
1866
1867                 /*
1868                  * "master" MDS should always be part of stripped dir,
1869                  * so scan for it.
1870                  */
1871                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
1872                         continue;
1873
1874                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
1875                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
1876
1877                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, NULL, 0,
1878                                 &obj_mdp, oti);
1879                 if (rc) {
1880                         CERROR("obd_create() failed on MDT target %d, "
1881                                "error %d\n", c, rc);
1882                         RETURN(rc);
1883                 }
1884
1885                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
1886                        i, oa->o_id, oa->o_generation);
1887
1888
1889                 /*
1890                  * here, when object is created (or it is master and was passed
1891                  * from caller) on desired MDS we save its fid to local mea_ids.
1892                  */
1893                 LASSERT(oa->o_fid);
1894
1895                 /* 
1896                  * store cookie should be defined here for both cases (master
1897                  * object and not master), because master is already created.
1898                  */
1899                 LASSERT(oa->o_id);
1900
1901                 /* fill mea by store cookie and fid */
1902                 obdo2id(&mea->mea_ids[c], oa);
1903                 c++;
1904         }
1905         LASSERT(c == mea->mea_count);
1906
1907         CDEBUG(D_OTHER, "%d dirobjects created\n",
1908                (int)mea->mea_count);
1909         
1910         RETURN(rc);
1911 }
1912
1913 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
1914                          struct obd_device *tgt, int count,
1915                          struct llog_catid *logid)
1916 {
1917         struct llog_ctxt *ctxt;
1918         int rc;
1919         ENTRY;
1920
1921         rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1922                             &llog_client_ops);
1923         if (rc == 0) {
1924                 ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
1925                 ctxt->loc_imp = tgt->u.cli.cl_import;
1926         }
1927
1928         RETURN(rc);
1929 }
1930
1931 static int lmv_llog_finish(struct obd_device *obd,
1932                            struct obd_llogs *llogs, int count)
1933 {
1934         int rc;
1935         ENTRY;
1936
1937         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
1938         RETURN(rc);
1939 }
1940
1941 static int lmv_precleanup(struct obd_device *obd, int flags)
1942 {
1943         int rc = 0;
1944         
1945         rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
1946         if (rc != 0)
1947                 CERROR("failed to cleanup llogging subsystems\n");
1948
1949         RETURN(rc);
1950 }
1951
1952 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
1953                         void *key, __u32 *vallen, void *val)
1954 {
1955         struct obd_device *obd;
1956         struct lmv_obd *lmv;
1957         int rc = 0;
1958         ENTRY;
1959
1960         obd = class_exp2obd(exp);
1961         if (obd == NULL) {
1962                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1963                        exp->exp_handle.h_cookie);
1964                 RETURN(-EINVAL);
1965         }
1966
1967         lmv = &obd->u.lmv;
1968         if (keylen == strlen("mdsize") && !strcmp(key, "mdsize")) {
1969                 __u32 *mdsize = val;
1970                 *vallen = sizeof(__u32);
1971                 *mdsize = sizeof(struct lustre_id) * lmv->desc.ld_tgt_count
1972                        + sizeof(struct mea);
1973                 RETURN(0);
1974         } else if (keylen == strlen("mdsnum") && !strcmp(key, "mdsnum")) {
1975                 struct obd_uuid *cluuid = &lmv->cluuid;
1976                 struct lmv_tgt_desc *tgts;
1977                 __u32 *mdsnum = val;
1978                 int i;
1979
1980                 tgts = lmv->tgts;
1981                 for (i = 0; i < lmv->desc.ld_tgt_count; i++, tgts++) {
1982                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
1983                                 *vallen = sizeof(__u32);
1984                                 *mdsnum = i;
1985                                 RETURN(0);
1986                         }
1987                 }
1988                 LASSERT(0);
1989         } else if (keylen == strlen("rootid") && !strcmp(key, "rootid")) {
1990                 rc = lmv_check_connect(obd);
1991                 if (rc)
1992                         RETURN(rc);
1993                 
1994                 /* getting rootid from first MDS. */
1995                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
1996                                   vallen, val);
1997                 RETURN(rc);
1998         } else if (keylen >= strlen("lmvdesc") && !strcmp(key, "lmvdesc")) {
1999                 struct lmv_desc *desc_ret = val;
2000                 *desc_ret = lmv->desc;
2001                 RETURN(0);
2002         } else if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2003                 struct lmv_tgt_desc *tgts;
2004                 int i;
2005
2006                 rc = lmv_check_connect(obd);
2007                 if (rc)
2008                         RETURN(rc);
2009                 
2010                 LASSERT(*vallen == sizeof(__u32));
2011                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2012                      i++, tgts++) {
2013
2014                         /* all tgts should be connected when this get called. */
2015                         if (!tgts || !tgts->ltd_exp) {
2016                                 CERROR("target not setup?\n");
2017                                 continue;
2018                         }
2019
2020                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
2021                                           vallen, val))
2022                                 RETURN(0);
2023                 }
2024                 RETURN(-EINVAL);
2025         } else if (keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) {
2026                 rc = lmv_check_connect(obd);
2027                 if (rc)
2028                         RETURN(rc);
2029
2030                 /* forwarding this request to first MDS, it should know LOV
2031                  * desc. */
2032                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2033                                   vallen, val);
2034                 RETURN(rc);
2035         } else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
2036                 struct lmv_tgt_desc *tgts;
2037                 int i;
2038
2039                 rc = lmv_check_connect(obd);
2040                 if (rc)
2041                         RETURN(rc);
2042
2043                 LASSERT(*vallen == sizeof(struct fid_extent));
2044                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2045                      i++, tgts++) {
2046
2047                         /* all tgts should be connected when this get called. */
2048                         if (!tgts || !tgts->ltd_exp) {
2049                                 CERROR("target not setup?\n");
2050                                 continue;
2051                         }
2052
2053                         rc = obd_get_info(tgts->ltd_exp, keylen, key,
2054                                           vallen, val);
2055                         if (rc)
2056                                 RETURN(rc);
2057                 }
2058                 RETURN(0);
2059         }
2060
2061         CDEBUG(D_IOCTL, "invalid key\n");
2062         RETURN(-EINVAL);
2063 }
2064
2065 int lmv_set_info(struct obd_export *exp, obd_count keylen,
2066                  void *key, obd_count vallen, void *val)
2067 {
2068         struct lmv_tgt_desc    *tgt;
2069         struct obd_device      *obd;
2070         struct lmv_obd         *lmv;
2071         int rc = 0;
2072         ENTRY;
2073
2074         obd = class_exp2obd(exp);
2075         if (obd == NULL) {
2076                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2077                        exp->exp_handle.h_cookie);
2078                 RETURN(-EINVAL);
2079         }
2080         lmv = &obd->u.lmv;
2081
2082         if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
2083                 lmv->server_timeout = 1;
2084                 lmv_set_timeouts(obd);
2085                 RETURN(0);
2086         }
2087
2088         /* maybe this could be default */
2089         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
2090             (keylen == strlen("sec_flags") && strcmp(key, "sec_flags") == 0) ||
2091             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
2092                 struct obd_export *exp;
2093                 int err, i;
2094
2095                 spin_lock(&lmv->lmv_lock);
2096                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2097                      i++, tgt++) {
2098                         exp = tgt->ltd_exp;
2099                         /* during setup time the connections to mdc might
2100                          * haven't been established.
2101                          */
2102                         if (exp == NULL) {
2103                                 struct obd_device *tgt_obd;
2104
2105                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2106                                                                 OBD_MDC_DEVICENAME,
2107                                                                 &obd->obd_uuid);
2108                                 if (!tgt_obd) {
2109                                         CERROR("can't set info %s, "
2110                                                "device %s not attached?\n",
2111                                                 (char *) key, tgt->uuid.uuid);
2112                                         rc = -EINVAL;
2113                                         continue;
2114                                 }
2115                                 exp = tgt_obd->obd_self_export;
2116                         }
2117
2118                         err = obd_set_info(exp, keylen, key, vallen, val);
2119                         if (!rc)
2120                                 rc = err;
2121                 }
2122                 spin_unlock(&lmv->lmv_lock);
2123
2124                 RETURN(rc);
2125         }
2126         if (keylen == 5 && strcmp(key, "audit") == 0) {
2127                 struct audit_attr_msg * msg = val;
2128                 int mds = id_group(&msg->id);
2129                 int i;
2130                 LASSERT(mds < lmv->desc.ld_tgt_count);
2131                 
2132                 if (IS_AUDIT_OP(msg->attr, AUDIT_FS)) {
2133                         //FS audit, send message to all mds
2134                         for (i = 0; i < lmv->desc.ld_tgt_count;i++) {
2135                                 obd_set_info(lmv->tgts[i].ltd_exp, 
2136                                                   keylen, key, vallen, val);
2137                         }
2138                 }
2139                 else if (IS_AUDIT_OP(msg->attr, AUDIT_DIR)) {
2140                         //audit for dir.
2141                         //if dir is splitted, send RPC to all mds involved
2142                         struct lmv_obj *obj;
2143                         struct lustre_id rid;
2144                         int i;
2145                         
2146                         obj = lmv_grab_obj(obd, &msg->id);
2147                         if (obj) {
2148                                 lmv_lock_obj(obj);
2149                                 for (i = 0; i < obj->objcount; i++) {
2150                                         rid = obj->objs[i].id;
2151                                         mds = id_group(&rid);
2152                                         obd_set_info(lmv->tgts[mds].ltd_exp,
2153                                                           keylen, key,
2154                                                           vallen, val);
2155                                 }
2156                                 lmv_unlock_obj(obj);
2157                                 lmv_put_obj(obj);
2158                         }
2159                         else {
2160                                 rc = obd_set_info(lmv->tgts[mds].ltd_exp,
2161                                                  keylen, key, vallen, val);
2162                         }
2163                 }
2164                 else {
2165                         //set audit for file
2166                         rc = obd_set_info(lmv->tgts[mds].ltd_exp,
2167                                           keylen, key, vallen, val);                        
2168                 }
2169                 RETURN(rc);
2170         }
2171         if (((keylen == strlen("flush_cred") &&
2172              strcmp(key, "flush_cred") == 0)) || 
2173              ((keylen == strlen("crypto_type") &&
2174              strcmp(key, "crypto_type") == 0))) {
2175                 int i;
2176
2177                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2178                      i++, tgt++) {
2179                         if (!tgt->ltd_exp)
2180                                 continue;
2181                         rc = obd_set_info(tgt->ltd_exp,
2182                                           keylen, key, vallen, val);
2183                         if (rc)
2184                                 RETURN(rc);
2185                 }
2186
2187                 RETURN(0);
2188         }
2189         
2190         if (keylen == strlen("ids") && memcmp(key, "ids", keylen) == 0) {
2191                 struct lustre_id *id = (struct lustre_id *)val;
2192                 
2193                 rc = lmv_check_connect(obd);
2194                 if (rc)
2195                         RETURN(rc);
2196
2197                 rc = obd_set_info(lmv->tgts[id_group(id)].ltd_exp, 
2198                                   keylen, key, vallen, val); 
2199                 RETURN(rc);
2200         }
2201
2202         if (keylen == strlen("chkconnect") && 
2203             memcmp(key, "chkconnect", keylen) == 0) {
2204                 rc = lmv_check_connect(obd);
2205                 RETURN(rc);
2206         }
2207
2208         RETURN(-EINVAL);
2209 }
2210
2211 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2212                struct lov_stripe_md *lsm)
2213 {
2214         struct obd_device *obd = class_exp2obd(exp);
2215         struct lmv_obd *lmv = &obd->u.lmv;
2216         struct mea *meap, *lsmp;
2217         int mea_size, i;
2218         ENTRY;
2219
2220         mea_size = (sizeof(struct lustre_id) * 
2221                     lmv->desc.ld_tgt_count) + sizeof(struct mea);
2222         if (!lmmp)
2223                 RETURN(mea_size);
2224
2225         if (*lmmp && !lsm) {
2226                 OBD_FREE(*lmmp, mea_size);
2227                 *lmmp = NULL;
2228                 RETURN(0);
2229         }
2230
2231         if (*lmmp == NULL) {
2232                 OBD_ALLOC(*lmmp, mea_size);
2233                 if (*lmmp == NULL)
2234                         RETURN(-ENOMEM);
2235         }
2236
2237         if (!lsm)
2238                 RETURN(mea_size);
2239
2240         lsmp = (struct mea *)lsm;
2241         meap = (struct mea *)*lmmp;
2242
2243         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2244             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2245                 RETURN(-EINVAL);
2246
2247         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2248         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2249         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2250
2251         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2252                 meap->mea_ids[i] = meap->mea_ids[i];
2253                 id_cpu_to_le(&meap->mea_ids[i]);
2254         }
2255
2256         RETURN(mea_size);
2257 }
2258
2259 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2260                  struct lov_mds_md *lmm, int lmm_size)
2261 {
2262         struct obd_device *obd = class_exp2obd(exp);
2263         struct mea **tmea = (struct mea **)lsmp;
2264         struct mea *mea = (struct mea *)lmm;
2265         struct lmv_obd *lmv = &obd->u.lmv;
2266         int mea_size, i, rc = 0;
2267         __u32 magic;
2268         ENTRY;
2269
2270         mea_size = sizeof(struct lustre_id) * 
2271                 lmv->desc.ld_tgt_count + sizeof(struct mea);
2272
2273         if (lsmp == NULL)
2274                 return mea_size;
2275
2276         if (*lsmp != NULL && lmm == NULL) {
2277                 OBD_FREE(*tmea, mea_size);
2278                 RETURN(0);
2279         }
2280
2281         LASSERT(mea_size == lmm_size);
2282
2283         OBD_ALLOC(*tmea, mea_size);
2284         if (*tmea == NULL)
2285                 RETURN(-ENOMEM);
2286
2287         if (!lmm)
2288                 RETURN(mea_size);
2289
2290         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2291             mea->mea_magic == MEA_MAGIC_ALL_CHARS)
2292         {
2293                 magic = le32_to_cpu(mea->mea_magic);
2294         } else {
2295                 struct mea_old *old = (struct mea_old *)lmm;
2296         
2297                 mea_size = sizeof(struct lustre_id) * old->mea_count + 
2298                         sizeof(struct mea_old);
2299         
2300                 if (old->mea_count > 256 || old->mea_master > 256 ||
2301                     lmm_size < mea_size || old->mea_master > old->mea_count) {
2302                         CWARN("bad MEA: count %u, master %u, size %u\n",
2303                               old->mea_count, old->mea_master, mea_size);
2304                         GOTO(out_free_mea, rc = -EINVAL);
2305                 }
2306                 magic = MEA_MAGIC_LAST_CHAR;
2307         }
2308
2309         (*tmea)->mea_magic = magic;
2310         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2311         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2312
2313         for (i = 0; i < (*tmea)->mea_count; i++) {
2314                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2315                 id_le_to_cpu(&(*tmea)->mea_ids[i]);
2316         }
2317         RETURN(mea_size);
2318
2319 out_free_mea:
2320         OBD_FREE(*tmea, mea_size);
2321         return rc;
2322 }
2323
2324 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
2325             struct lov_stripe_md *ea, obd_count oa_bufs,
2326             struct brw_page *pgarr, struct obd_trans_info *oti)
2327 {
2328         struct obd_device *obd = exp->exp_obd;
2329         struct lmv_obd *lmv = &obd->u.lmv;
2330         struct mea *mea = (struct mea *) ea;
2331         int err;
2332       
2333         LASSERT(oa != NULL);
2334         LASSERT(ea != NULL);
2335         LASSERT(pgarr != NULL);
2336         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
2337
2338         oa->o_gr = id_gen(&mea->mea_ids[oa->o_mds]);
2339         oa->o_id = id_ino(&mea->mea_ids[oa->o_mds]);
2340         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2341         
2342         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
2343                       oa, NULL, oa_bufs, pgarr, oti);
2344         RETURN(err);
2345 }
2346
2347 static int lmv_cancel_unused(struct obd_export *exp,
2348                              struct lov_stripe_md *lsm, 
2349                              int flags, void *opaque)
2350 {
2351         struct obd_device *obd = exp->exp_obd;
2352         struct lmv_obd *lmv = &obd->u.lmv;
2353         int rc = 0, err, i;
2354         ENTRY;
2355
2356         LASSERT(lsm == NULL);
2357         
2358         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2359                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active)
2360                         continue;
2361                 
2362                 err = obd_cancel_unused(lmv->tgts[i].ltd_exp,
2363                                         NULL, flags, opaque);
2364                 if (!rc)
2365                         rc = err;
2366         }
2367         RETURN(rc);
2368 }
2369
2370 struct obd_ops lmv_obd_ops = {
2371         .o_owner                = THIS_MODULE,
2372         .o_attach               = lmv_attach,
2373         .o_detach               = lmv_detach,
2374         .o_setup                = lmv_setup,
2375         .o_cleanup              = lmv_cleanup,
2376         .o_precleanup           = lmv_precleanup,
2377         .o_process_config       = lmv_process_config,
2378         .o_connect              = lmv_connect,
2379         .o_disconnect           = lmv_disconnect,
2380         .o_statfs               = lmv_statfs,
2381         .o_llog_init            = lmv_llog_init,
2382         .o_llog_finish          = lmv_llog_finish,
2383         .o_get_info             = lmv_get_info,
2384         .o_set_info             = lmv_set_info,
2385         .o_create               = lmv_obd_create,
2386         .o_packmd               = lmv_packmd,
2387         .o_unpackmd             = lmv_unpackmd,
2388         .o_brw                  = lmv_brw,
2389         .o_init_ea_size         = lmv_init_ea_size,
2390         .o_notify               = lmv_notify,
2391         .o_iocontrol            = lmv_iocontrol,
2392         .o_cancel_unused        = lmv_cancel_unused,
2393 };
2394
2395 struct md_ops lmv_md_ops = {
2396         .m_getstatus           = lmv_getstatus,
2397         .m_getattr             = lmv_getattr,
2398         .m_change_cbdata       = lmv_change_cbdata,
2399         .m_change_cbdata_name  = lmv_change_cbdata_name,
2400         .m_close               = lmv_close,
2401         .m_create              = lmv_create,
2402         .m_done_writing        = lmv_done_writing,
2403         .m_enqueue             = lmv_enqueue,
2404         .m_getattr_lock        = lmv_getattr_lock,
2405         .m_intent_lock         = lmv_intent_lock,
2406         .m_link                = lmv_link,
2407         .m_rename              = lmv_rename,
2408         .m_setattr             = lmv_setattr,
2409         .m_sync                = lmv_sync,
2410         .m_readpage            = lmv_readpage,
2411         .m_unlink              = lmv_unlink,
2412         .m_get_real_obd        = lmv_get_real_obd,
2413         .m_valid_attrs         = lmv_valid_attrs,
2414         .m_delete_inode        = lmv_delete_inode,
2415         .m_access_check        = lmv_access_check,
2416 };
2417
2418 int __init lmv_init(void)
2419 {
2420         struct lprocfs_static_vars lvars;
2421         int rc;
2422
2423         obj_cache = kmem_cache_create("lmv_objects",
2424                                       sizeof(struct lmv_obj),
2425                                       0, 0, NULL, NULL);
2426         if (!obj_cache) {
2427                 CERROR("error allocating lmv objects cache\n");
2428                 return -ENOMEM;
2429         }
2430
2431         lprocfs_init_vars(lmv, &lvars);
2432         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2433                                  lvars.module_vars,
2434                                  OBD_LMV_DEVICENAME);
2435         if (rc)
2436                 kmem_cache_destroy(obj_cache);
2437         
2438         return rc;
2439 }
2440
#ifdef __KERNEL__
/*
 * Module exit: unregister the LMV obd type and destroy the lmv object cache.
 * The cache is expected to be empty by now; a failure to destroy it trips
 * an assertion that reports obj_cache_count.
 */
static void lmv_exit(void)
{
        int rc;

        class_unregister_type(OBD_LMV_DEVICENAME);

        /*
         * Destroy the cache outside of the assertion, so that the call is
         * still made when assertions are compiled out.
         */
        rc = kmem_cache_destroy(obj_cache);
        LASSERTF(rc == 0,
                 "can't free lmv objects cache, %d object(s) "
                 "still in use\n", atomic_read(&obj_cache_count));
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lmv_init);
module_exit(lmv_exit);
#endif