Whamcloud - gitweb
- add libiam.a (if it exists) to the rpm; this is needed for testing our branch;
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38 #include <linux/ext2_fs.h>
39
40 #include <lustre/lustre_idl.h>
41 #include <lustre_log.h>
42 #include <obd_support.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_lite.h>
48 #include "lmv_internal.h"
49
50 /* not defined for liblustre building */
51 #if !defined(ATOMIC_INIT)
52 #define ATOMIC_INIT(val) { (val) }
53 #endif
54
55 /* Object cache: a slab cache handle and a counter that starts at zero.
    * NOTE(review): presumably both are used by the LMV object manager to
    * allocate and account for its cached objects -- confirm against the
    * object manager code, which is not visible here. */
56 kmem_cache_t *obj_cache;
57 atomic_t obj_cache_count = ATOMIC_INIT(0);
58
59 static void lmv_activate_target(struct lmv_obd *lmv,
60                                 struct lmv_tgt_desc *tgt,
61                                 int activate)
62 {
63         if (tgt->active == activate)
64                 return;
65
66         tgt->active = activate;
67         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
68 }
69
70 /* Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77                               int activate)
78 {
79         struct lmv_tgt_desc *tgt;
80         struct obd_device *obd;
81         int i, rc = 0;
82         ENTRY;
83
84         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85                lmv, uuid->uuid, activate);
86
87         spin_lock(&lmv->lmv_lock);
88         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
89                 if (tgt->ltd_exp == NULL)
90                         continue;
91
92                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
93                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
94
95                 if (obd_uuid_equals(uuid, &tgt->uuid))
96                         break;
97         }
98
99         if (i == lmv->desc.ld_tgt_count)
100                 GOTO(out_lmv_lock, rc = -EINVAL);
101
102         obd = class_exp2obd(tgt->ltd_exp);
103         if (obd == NULL)
104                 GOTO(out_lmv_lock, rc = -ENOTCONN);
105
106         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
107                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
108                obd->obd_type->typ_name, i);
109         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
110
111         if (tgt->active == activate) {
112                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
113                        activate ? "" : "in");
114                 GOTO(out_lmv_lock, rc);
115         }
116
117         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
118                obd, activate ? "" : "in");
119
120         lmv_activate_target(lmv, tgt, activate);
121
122         EXIT;
123
124  out_lmv_lock:
125         spin_unlock(&lmv->lmv_lock);
126         return rc;
127 }
128
129 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
130                       enum obd_notify_event ev, void *data)
131 {
132         struct obd_uuid *uuid;
133         int rc;
134         ENTRY;
135
136         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
137                 CERROR("unexpected notification of %s %s!\n",
138                        watched->obd_type->typ_name,
139                        watched->obd_name);
140                 RETURN(-EINVAL);
141         }
142         uuid = &watched->u.cli.cl_target_uuid;
143
144         /* Set MDC as active before notifying the observer, so the observer can
145          * use the MDC normally. */
146         rc = lmv_set_mdc_active(&obd->u.lmv, uuid,
147                                 ev == OBD_NOTIFY_ACTIVE);
148         if (rc) {
149                 CERROR("%sactivation of %s failed: %d\n",
150                        ev == OBD_NOTIFY_ACTIVE ? "" : "de",
151                        uuid->uuid, rc);
152                 RETURN(rc);
153         }
154
155         if (obd->obd_observer)
156                 /* pass the notification up the chain. */
157                 rc = obd_notify(obd->obd_observer, watched, ev, data);
158
159         RETURN(rc);
160 }
161
162 /* this is fake connect function. Its purpose is to initialize lmv and say
163  * caller that everything is okay. Real connection will be performed later. */
164 static int lmv_connect(const struct lu_context *ctx,
165                        struct lustre_handle *conn, struct obd_device *obd,
166                        struct obd_uuid *cluuid, struct obd_connect_data *data)
167 {
168 #ifdef __KERNEL__
169         struct proc_dir_entry *lmv_proc_dir;
170 #endif
171         struct lmv_obd *lmv = &obd->u.lmv;
172         struct obd_export *exp;
173         int rc = 0;
174         ENTRY;
175
176         rc = class_connect(conn, obd, cluuid);
177         if (rc) {
178                 CERROR("class_connection() returned %d\n", rc);
179                 RETURN(rc);
180         }
181
182         exp = class_conn2export(conn);
183
184         /* we don't want to actually do the underlying connections more than
185          * once, so keep track. */
186         lmv->refcount++;
187         if (lmv->refcount > 1) {
188                 class_export_put(exp);
189                 RETURN(0);
190         }
191
192         lmv->exp = exp;
193         lmv->connected = 0;
194         lmv->cluuid = *cluuid;
195
196         if (data)
197                 lmv->conn_data = *data;
198
199 #ifdef __KERNEL__
200         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
201                                         NULL, NULL);
202         if (IS_ERR(lmv_proc_dir)) {
203                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
204                        obd->obd_type->typ_name, obd->obd_name);
205                 lmv_proc_dir = NULL;
206         }
207 #endif
208
209         /* all real clients should perform actual connection right away, because
210          * it is possible, that LMV will not have opportunity to connect targets
211          * and MDC stuff will be called directly, for instance while reading
212          * ../mdc/../kbytesfree procfs file, etc. */
213         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
214                 rc = lmv_check_connect(obd);
215
216 #ifdef __KERNEL__
217         if (rc) {
218                 if (lmv_proc_dir)
219                         lprocfs_remove(lmv_proc_dir);
220         }
221 #endif
222
223         RETURN(rc);
224 }
225
226 static void lmv_set_timeouts(struct obd_device *obd)
227 {
228         struct lmv_tgt_desc *tgts;
229         struct lmv_obd *lmv;
230         int i;
231
232         lmv = &obd->u.lmv;
233         if (lmv->server_timeout == 0)
234                 return;
235
236         if (lmv->connected == 0)
237                 return;
238
239         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
240                 if (tgts->ltd_exp == NULL)
241                         continue;
242
243                 obd_set_info_async(tgts->ltd_exp, strlen("inter_mds"),
244                                    "inter_mds", 0, NULL, NULL);
245         }
246 }
247
248 static int lmv_init_ea_size(struct obd_export *exp, int easize,
249                             int def_easize, int cookiesize)
250 {
251         struct obd_device *obd = exp->exp_obd;
252         struct lmv_obd *lmv = &obd->u.lmv;
253         int i, rc = 0, change = 0;
254         ENTRY;
255
256         if (lmv->max_easize < easize) {
257                 lmv->max_easize = easize;
258                 change = 1;
259         }
260         if (lmv->max_def_easize < def_easize) {
261                 lmv->max_def_easize = def_easize;
262                 change = 1;
263         }
264         if (lmv->max_cookiesize < cookiesize) {
265                 lmv->max_cookiesize = cookiesize;
266                 change = 1;
267         }
268         if (change == 0)
269                 RETURN(0);
270
271         if (lmv->connected == 0)
272                 RETURN(0);
273
274         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
275                 if (lmv->tgts[i].ltd_exp == NULL) {
276                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
277                         continue;
278                 }
279
280                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
281                                      cookiesize);
282                 if (rc) {
283                         CERROR("obd_init_ea_size() failed on MDT target %d, "
284                                "error %d.\n", i, rc);
285                         break;
286                 }
287         }
288         RETURN(rc);
289 }
290
291 #define MAX_STRING_SIZE 128
292
/* Connect the LMV device @obd to the single MDC target described by @tgt.
 *
 * The steps, in order: look up the MDC device for tgt->uuid, connect to it,
 * add the resulting export to the LMV's FLD client, register @obd as the
 * MDC's observer, notify @obd's own observer (if any) that the target is
 * active, then record the export and connect data in the target slot.
 *
 * Returns 0 on success and also when the target is this node itself (uuid
 * equals the client uuid, nothing is connected in that case); -EINVAL when
 * the MDC device is not attached or not set up; otherwise the error from
 * obd_connect(), obd_register_observer() or obd_notify(). A failure to create
 * the proc symlink is only logged. */
293 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
294 {
295         struct lmv_obd *lmv = &obd->u.lmv;
296         struct obd_uuid *cluuid = &lmv->cluuid;
297         struct obd_connect_data *mdc_data = NULL;
298         struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
299         struct lustre_handle conn = {0, };
300         struct obd_device *mdc_obd;
301         struct obd_export *mdc_exp;
302         int rc;
303 #ifdef __KERNEL__
304         struct proc_dir_entry *lmv_proc_dir;
305 #endif
306         ENTRY;
307
308         /* for MDS: don't connect to yourself */
309         if (obd_uuid_equals(&tgt->uuid, cluuid)) {
310                 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
311                 /* XXX - the old code didn't increment active tgt count.
312                  *       should we ? */
313                 RETURN(0);
314         }
315
316         mdc_obd = class_find_client_obd(&tgt->uuid, LUSTRE_MDC_NAME,
317                                         &obd->obd_uuid);
318         if (!mdc_obd) {
319                 CERROR("target %s not attached\n", tgt->uuid.uuid);
320                 RETURN(-EINVAL);
321         }
322
323         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
324                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
325                 tgt->uuid.uuid, obd->obd_uuid.uuid,
326                 cluuid->uuid);
327
328         if (!mdc_obd->obd_set_up) {
329                 CERROR("target %s not set up\n", tgt->uuid.uuid);
330                 RETURN(-EINVAL);
331         }
332
            /* The connect data saved by lmv_connect() is reused for every
             * target. */
333         rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid, &lmv->conn_data);
334         if (rc) {
335                 CERROR("target %s connect error %d\n", tgt->uuid.uuid, rc);
336                 RETURN(rc);
337         }
338
339         mdc_exp = class_conn2export(&conn);
340         fld_client_add_target(&lmv->lmv_fld, mdc_exp);
341
            /* Points into the MDC's import; it is copied by value further
             * down. */
342         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
343
            /* NOTE(review): the two failure paths below disconnect the export
             * but nothing visible here removes it from the FLD client again;
             * the second one also leaves the observer registration in place.
             * Confirm whether obd_disconnect() takes care of that. */
344         rc = obd_register_observer(mdc_obd, obd);
345         if (rc) {
346                 obd_disconnect(mdc_exp);
347                 CERROR("target %s register_observer error %d\n",
348                        tgt->uuid.uuid, rc);
349                 RETURN(rc);
350         }
351
352         if (obd->obd_observer) {
353                 /* tell the mds_lmv about the new target; the notify data is
                     * the target's index in lmv->tgts, passed in a pointer-sized
                     * value rather than as a real pointer. */
354                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
355                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
356                 if (rc) {
357                         obd_disconnect(mdc_exp);
358                         RETURN(rc);
359                 }
360         }
361
362         tgt->active = 1;
363         tgt->ltd_exp = mdc_exp;
364         lmv->desc.ld_active_tgt_count++;
365
366         /* copy connect data, it may be used later */
367         lmv->datas[tgt->idx] = *mdc_data;
368
            /* Bring the new target up to the sizes already known to the LMV;
             * the return value is not checked. */
369         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
370                         lmv->max_def_easize, lmv->max_cookiesize);
371
372         CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
373                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
374                 atomic_read(&obd->obd_refcount));
375
376 #ifdef __KERNEL__
            /* Publish the target as a symlink under our "target_obds" proc
             * directory, pointing at the MDC's own proc directory. */
377         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
378         if (lmv_proc_dir) {
379                 struct proc_dir_entry *mdc_symlink;
380                 char name[MAX_STRING_SIZE + 1];
381
382                 LASSERT(mdc_obd->obd_type != NULL);
383                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
384                 name[MAX_STRING_SIZE] = '\0';
385                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
386                          mdc_obd->obd_type->typ_name,
387                          mdc_obd->obd_name);
388                 mdc_symlink = proc_symlink(mdc_obd->obd_name,
389                                            lmv_proc_dir, name);
390                 if (mdc_symlink == NULL) {
                            /* Not fatal for the connection: the whole
                             * "target_obds" directory is removed and 0 is
                             * still returned. */
391                         CERROR("could not register LMV target "
392                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
393                                obd->obd_type->typ_name, obd->obd_name,
394                                mdc_obd->obd_name);
395                         lprocfs_remove(lmv_proc_dir);
396                         lmv_proc_dir = NULL;
397                 }
398         }
399 #endif
400         RETURN(0);
401 }
402
403 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
404 {
405         struct lmv_obd *lmv = &obd->u.lmv;
406         struct lmv_tgt_desc *tgt;
407         int rc = 0;
408         ENTRY;
409
410         CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
411
412         lmv_init_lock(lmv);
413
414         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
415                 lmv_init_unlock(lmv);
416                 CERROR("can't add %s, LMV module compiled for %d MDCs. "
417                        "That many MDCs already configured.\n",
418                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
419                 RETURN(-EINVAL);
420         }
421         if (lmv->desc.ld_tgt_count == 0) {
422                 struct obd_device *mdc_obd;
423
424                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
425                                                 &obd->obd_uuid);
426                 if (!mdc_obd) {
427                         lmv_init_unlock(lmv);
428                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
429                         RETURN(-EINVAL);
430                 }
431
432                 rc = obd_llog_init(obd, mdc_obd, 0, NULL);
433                 if (rc) {
434                         lmv_init_unlock(lmv);
435                         CERROR("lmv failed to setup llogging subsystems\n");
436                 }
437         }
438         spin_lock(&lmv->lmv_lock);
439         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
440         tgt->uuid = *tgt_uuid;
441         spin_unlock(&lmv->lmv_lock);
442
443         if (lmv->connected) {
444                 rc = lmv_connect_mdc(obd, tgt);
445                 if (rc) {
446                         spin_lock(&lmv->lmv_lock);
447                         lmv->desc.ld_tgt_count--;
448                         memset(tgt, 0, sizeof(*tgt));
449                         spin_unlock(&lmv->lmv_lock);
450                 } else {
451                         int easize = sizeof(struct lmv_stripe_md) +
452                                      lmv->desc.ld_tgt_count *
453                                      sizeof(struct lu_fid);
454                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
455                 }
456         }
457
458         lmv_init_unlock(lmv);
459         RETURN(rc);
460 }
461
462 /* performs a check if passed obd is connected. If no - connect it. */
463 int lmv_check_connect(struct obd_device *obd)
464 {
465         struct lmv_obd *lmv = &obd->u.lmv;
466         struct lmv_tgt_desc *tgt;
467         int i, rc, easize;
468         ENTRY;
469
470         if (lmv->connected)
471                 RETURN(0);
472
473         lmv_init_lock(lmv);
474         if (lmv->connected) {
475                 lmv_init_unlock(lmv);
476                 RETURN(0);
477         }
478
479         if (lmv->desc.ld_tgt_count == 0) {
480                 CERROR("%s: no targets configured.\n", obd->obd_name);
481                 RETURN(-EINVAL);
482         }
483
484         CDEBUG(D_CONFIG, "time to connect %s to %s\n",
485                lmv->cluuid.uuid, obd->obd_name);
486
487         LASSERT(lmv->tgts != NULL);
488
489         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
490                 rc = lmv_connect_mdc(obd, tgt);
491                 if (rc)
492                         GOTO(out_disc, rc);
493         }
494
495         lmv_set_timeouts(obd);
496         class_export_put(lmv->exp);
497         lmv->connected = 1;
498         easize = lmv_get_easize(lmv);
499         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
500         lmv_init_unlock(lmv);
501         RETURN(0);
502
503  out_disc:
504         while (i-- > 0) {
505                 int rc2;
506                 --tgt;
507                 tgt->active = 0;
508                 if (tgt->ltd_exp) {
509                         --lmv->desc.ld_active_tgt_count;
510                         rc2 = obd_disconnect(tgt->ltd_exp);
511                         if (rc2) {
512                                 CERROR("error: LMV target %s disconnect on "
513                                        "MDC idx %d: error %d\n",
514                                        tgt->uuid.uuid, i, rc2);
515                         }
516                 }
517         }
518         class_disconnect(lmv->exp);
519         lmv_init_unlock(lmv);
520         RETURN(rc);
521 }
522
/* Disconnect one user of the LMV device.
 *
 * The target connections are shared between all users, so they are only torn
 * down when lmv->refcount drops to zero: for every target that has an export
 * the proc symlink is removed, the observer registration is cleared, the
 * export is disconnected and the slot is marked inactive. Afterwards the
 * "target_obds" proc directory is removed. In every case the caller's own
 * export is disconnected with class_disconnect().
 *
 * Returns the result of class_disconnect(); errors from disconnecting the
 * individual targets are logged (for active targets only) and dropped. */
523 static int lmv_disconnect(struct obd_export *exp)
524 {
525         struct obd_device *obd = class_exp2obd(exp);
526         struct lmv_obd *lmv = &obd->u.lmv;
527
528 #ifdef __KERNEL__
529         struct proc_dir_entry *lmv_proc_dir;
530 #endif
531         int rc, i;
532         ENTRY;
533
            /* No target table: only the local export needs handling. Note
             * that refcount is not decremented on this path. */
534         if (!lmv->tgts)
535                 goto out_local;
536
537         /* Only disconnect the underlying layers on the final disconnect. */
538         lmv->refcount--;
539         if (lmv->refcount != 0)
540                 goto out_local;
541
542 #ifdef __KERNEL__
543         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
544 #endif
545
546         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
547                 struct obd_device *mdc_obd;
548
549                 if (lmv->tgts[i].ltd_exp == NULL)
550                         continue;
551
552                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
553
                    /* Propagate our no-recovery setting to the MDC before
                     * disconnecting it.
                     * NOTE(review): mdc_obd is NULL-checked here but is
                     * dereferenced without a check in the proc code below,
                     * and exp_obd is dereferenced unconditionally further
                     * down -- either the check is unnecessary or the later
                     * uses need one too. */
554                 if (mdc_obd)
555                         mdc_obd->obd_no_recov = obd->obd_no_recov;
556
557 #ifdef __KERNEL__
558                 if (lmv_proc_dir) {
559                         struct proc_dir_entry *mdc_symlink;
560
561                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
562                         if (mdc_symlink) {
563                                 lprocfs_remove(mdc_symlink);
564                         } else {
565                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
566                                        obd->obd_type->typ_name, obd->obd_name,
567                                        mdc_obd->obd_name);
568                         }
569                 }
570 #endif
                    /* This message is emitted before obd_disconnect() is
                     * actually called below. */
571                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
572                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
573                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
574
                    /* Stop observing the MDC, then drop the connection. */
575                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
576                 rc = obd_disconnect(lmv->tgts[i].ltd_exp);
577                 if (rc) {
                            /* Errors are only reported for targets that were
                             * still marked active, and never fail the
                             * disconnect as a whole. */
578                         if (lmv->tgts[i].active) {
579                                 CERROR("Target %s disconnect error %d\n",
580                                        lmv->tgts[i].uuid.uuid, rc);
581                         }
582                         rc = 0;
583                 }
584
585                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
586                 lmv->tgts[i].ltd_exp = NULL;
587         }
588
589 #ifdef __KERNEL__
590         if (lmv_proc_dir) {
591                 lprocfs_remove(lmv_proc_dir);
592         } else {
593                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
594                        obd->obd_type->typ_name, obd->obd_name);
595         }
596 #endif
597
598 out_local:
599         /* this is the case when no real connection is established by
600          * lmv_check_connect().
             * NOTE(review): presumably this releases the export reference
             * that lmv_check_connect() would otherwise have dropped on a
             * successful connect -- confirm. */
601         if (!lmv->connected)
602                 class_export_put(exp);
603         rc = class_disconnect(exp);
            /* After the last user is gone a future connect starts from
             * scratch. */
604         if (lmv->refcount == 0)
605                 lmv->connected = 0;
606         RETURN(rc);
607 }
608
609 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
610                          int len, void *karg, void *uarg)
611 {
612         struct obd_device *obddev = class_exp2obd(exp);
613         struct lmv_obd *lmv = &obddev->u.lmv;
614         int i, rc = 0, set = 0;
615         ENTRY;
616
617         if (lmv->desc.ld_tgt_count == 0)
618                 RETURN(-ENOTTY);
619
620         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
621                 int err;
622
623                 if (lmv->tgts[i].ltd_exp == NULL)
624                         continue;
625
626                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
627                 if (err) {
628                         if (lmv->tgts[i].active) {
629                                 CERROR("error: iocontrol MDC %s on MDT"
630                                        "idx %d: err = %d\n",
631                                        lmv->tgts[i].uuid.uuid, i, err);
632                                 if (!rc)
633                                         rc = err;
634                         }
635                 } else
636                         set = 1;
637         }
638         if (!set && !rc)
639                 rc = -EIO;
640
641         RETURN(rc);
642 }
643
644 /* assume all is balanced for now */
645 static int lmv_fids_balanced(struct obd_device *obd)
646 {
647         ENTRY;
648         RETURN(1);
649 }
650
651 static int lmv_all_chars_policy(int count, struct qstr *name)
652 {
653         unsigned int c = 0;
654         unsigned int len = name->len;
655
656         while (len > 0)
657                 c += name->name[-- len];
658         c = c % count;
659         return c;
660 }
661
662 static int lmv_placement_policy(struct obd_device *obd,
663                                 struct lu_placement_hint *hint,
664                                 mdsno_t *mds)
665 {
666         struct lmv_obd *lmv = &obd->u.lmv;
667         int rc;
668         ENTRY;
669
670         LASSERT(mds != NULL);
671
672         /* here are some policies to allocate new fid */
673         if (lmv_fids_balanced(obd)) {
674                 /* allocate new fid basing on its name in the case fids are
675                  * balanced, that is all sequences have more or less equal
676                  * number of objects created. */
677                 if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
678                         *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
679                                                     hint->ph_cname);
680                         rc = 0;
681                 } else {
682                         /* default policy is to use parent MDS */
683                         LASSERT(fid_is_sane(hint->ph_pfid));
684                         rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
685                 }
686         } else {
687                 /* sequences among all tgts are not well balanced, allocate new
688                  * fid taking this into account to balance them. Not implemented
689                  * yet! */
690                 *mds = 0;
691                 rc = -EINVAL;
692         }
693
694         if (rc) {
695                 CERROR("cannot choose MDS, err = %d\n", rc);
696         } else {
697                 LASSERT(*mds < lmv->desc.ld_tgt_count);
698         }
699
700         RETURN(rc);
701 }
702
703 static int lmv_fid_init(struct obd_export *exp)
704 {
705         struct obd_device *obd = class_exp2obd(exp);
706         struct lmv_obd *lmv = &obd->u.lmv;
707         int i, rc = 0;
708         ENTRY;
709
710         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
711                 if (lmv->tgts[i].ltd_exp == NULL)
712                         continue;
713
714                 rc = obd_fid_init(lmv->tgts[i].ltd_exp);
715                 if (rc)
716                         RETURN(rc);
717         }
718         RETURN(rc);
719 }
720
721 static int lmv_fid_fini(struct obd_export *exp)
722 {
723         struct obd_device *obd = class_exp2obd(exp);
724         struct lmv_obd *lmv = &obd->u.lmv;
725         int i, rc = 0;
726         ENTRY;
727
728         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
729                 if (lmv->tgts[i].ltd_exp == NULL)
730                         continue;
731
732                 rc = obd_fid_fini(lmv->tgts[i].ltd_exp);
733                 if (rc)
734                         break;
735         }
736         RETURN(rc);
737 }
738
739 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
740                          struct lu_placement_hint *hint)
741 {
742         struct obd_device *obd = class_exp2obd(exp);
743         struct lmv_obd *lmv = &obd->u.lmv;
744         mdsno_t mds;
745         int rc;
746         ENTRY;
747
748         LASSERT(fid != NULL);
749         LASSERT(hint != NULL);
750
751         rc = lmv_placement_policy(obd, hint, &mds);
752         if (rc) {
753                 CERROR("can't get target for allocating fid, "
754                        "rc %d\n", rc);
755                 RETURN(rc);
756         }
757
758         /* asking underlaying tgt layer to allocate new fid */
759         rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
760
761         /* client switches to new sequence, setup fld */
762         if (rc > 0) {
763                 LASSERT(fid_is_sane(fid));
764                 
765                 rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
766                                        mds);
767                 if (rc) {
768                         CERROR("can't create fld entry, rc %d\n", rc);
769                         RETURN(rc);
770                 }
771         }
772
773         RETURN(rc);
774 }
775
776 static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
777 {
778         ENTRY;
779
780         LASSERT(exp && fid);
781         if (lmv_obj_delete(exp, fid)) {
782                 CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
783                        PFID(fid));
784         }
785         RETURN(0);
786 }
787
788 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
789 {
790         struct lmv_obd *lmv = &obd->u.lmv;
791         struct lprocfs_static_vars lvars;
792         struct lmv_desc *desc;
793         int rc, i = 0;
794         ENTRY;
795
796         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
797                 CERROR("LMV setup requires a descriptor\n");
798                 RETURN(-EINVAL);
799         }
800
801         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
802         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
803                 CERROR("descriptor size wrong: %d > %d\n",
804                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
805                 RETURN(-EINVAL);
806         }
807
808         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
809
810         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
811         if (lmv->tgts == NULL)
812                 RETURN(-ENOMEM);
813
814         for (i = 0; i < LMV_MAX_TGT_COUNT; i++)
815                 lmv->tgts[i].idx = i;
816
817         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
818
819         OBD_ALLOC(lmv->datas, lmv->datas_size);
820         if (lmv->datas == NULL)
821                 GOTO(out_free_tgts, rc = -ENOMEM);
822
823         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
824         lmv->desc.ld_tgt_count = 0;
825         lmv->desc.ld_active_tgt_count = 0;
826         lmv->max_cookiesize = 0;
827         lmv->max_def_easize = 0;
828         lmv->max_easize = 0;
829
830         spin_lock_init(&lmv->lmv_lock);
831         sema_init(&lmv->init_sem, 1);
832
833         rc = lmv_mgr_setup(obd);
834         if (rc) {
835                 CERROR("Can't setup LMV object manager, "
836                        "error %d.\n", rc);
837                 GOTO(out_free_datas, rc);
838         }
839
840         lprocfs_init_vars(lmv, &lvars);
841         lprocfs_obd_setup(obd, lvars.obd_vars);
842 #ifdef LPROCFS
843         {
844                 struct proc_dir_entry *entry;
845
846                 entry = create_proc_entry("target_obd_status", 0444,
847                                           obd->obd_proc_entry);
848                 if (entry != NULL) {
849                         entry->proc_fops = &lmv_proc_target_fops;
850                         entry->data = obd;
851                 }
852        }
853 #endif
854         rc = fld_client_init(&lmv->lmv_fld,
855                              "LMV_UUID", LUSTRE_CLI_FLD_HASH_RRB);
856         if (rc) {
857                 CERROR("can't init FLD, err %d\n",
858                        rc);
859                 GOTO(out_free_datas, rc);
860         }
861
862         RETURN(0);
863
864 out_free_datas:
865         OBD_FREE(lmv->datas, lmv->datas_size);
866         lmv->datas = NULL;
867 out_free_tgts:
868         OBD_FREE(lmv->tgts, lmv->tgts_size);
869         lmv->tgts = NULL;
870         return rc;
871 }
872
873 static int lmv_cleanup(struct obd_device *obd)
874 {
875         struct lmv_obd *lmv = &obd->u.lmv;
876         ENTRY;
877
878         fld_client_fini(&lmv->lmv_fld);
879         lprocfs_obd_cleanup(obd);
880         lmv_mgr_cleanup(obd);
881         OBD_FREE(lmv->datas, lmv->datas_size);
882         OBD_FREE(lmv->tgts, lmv->tgts_size);
883
884         RETURN(0);
885 }
886
887 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
888 {
889         struct lustre_cfg *lcfg = buf;
890         struct obd_uuid tgt_uuid;
891         int rc;
892         ENTRY;
893
894         switch(lcfg->lcfg_command) {
895         case LCFG_ADD_MDC:
896                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
897                         GOTO(out, rc = -EINVAL);
898
899                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
900                 rc = lmv_add_target(obd, &tgt_uuid);
901                 GOTO(out, rc);
902         default: {
903                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
904                 GOTO(out, rc = -EINVAL);
905         }
906         }
907 out:
908         RETURN(rc);
909 }
910
911 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
912                       __u64 max_age)
913 {
914         struct lmv_obd *lmv = &obd->u.lmv;
915         struct obd_statfs *temp;
916         int rc = 0, i;
917         ENTRY;
918
919         rc = lmv_check_connect(obd);
920         if (rc)
921                 RETURN(rc);
922
923         OBD_ALLOC(temp, sizeof(*temp));
924         if (temp == NULL)
925                 RETURN(-ENOMEM);
926
927         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
928                 if (lmv->tgts[i].ltd_exp == NULL)
929                         continue;
930
931                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
932                 if (rc) {
933                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
934                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
935                                rc);
936                         GOTO(out_free_temp, rc);
937                 }
938                 if (i == 0) {
939                         *osfs = *temp;
940                 } else {
941                         osfs->os_bavail += temp->os_bavail;
942                         osfs->os_blocks += temp->os_blocks;
943                         osfs->os_ffree += temp->os_ffree;
944                         osfs->os_files += temp->os_files;
945                 }
946         }
947
948         EXIT;
949 out_free_temp:
950         OBD_FREE(temp, sizeof(*temp));
951         return rc;
952 }
953
954 static int lmv_getstatus(struct obd_export *exp,
955                          struct lu_fid *fid)
956 {
957         struct obd_device *obd = exp->exp_obd;
958         struct lmv_obd *lmv = &obd->u.lmv;
959         int rc;
960         ENTRY;
961
962         rc = lmv_check_connect(obd);
963         if (rc)
964                 RETURN(rc);
965
966         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid);
967
968         RETURN(rc);
969 }
970
971 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
972                         obd_valid valid, const char *name, const char *input,
973                         int input_size, int output_size, int flags,
974                         struct ptlrpc_request **request)
975 {
976         struct obd_device *obd = exp->exp_obd;
977         struct lmv_obd *lmv = &obd->u.lmv;
978         struct obd_export *tgt_exp;
979         int rc;
980         ENTRY;
981
982         rc = lmv_check_connect(obd);
983         if (rc)
984                 RETURN(rc);
985
986         tgt_exp = lmv_get_export(lmv, fid);
987         if (IS_ERR(tgt_exp))
988                 RETURN(PTR_ERR(tgt_exp));
989
990         rc = md_getxattr(tgt_exp, fid, valid, name, input, input_size,
991                          output_size, flags, request);
992
993         RETURN(rc);
994 }
995
996 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
997                         obd_valid valid, const char *name, const char *input,
998                         int input_size, int output_size, int flags,
999                         struct ptlrpc_request **request)
1000 {
1001         struct obd_device *obd = exp->exp_obd;
1002         struct lmv_obd *lmv = &obd->u.lmv;
1003         struct obd_export *tgt_exp;
1004         int rc;
1005         ENTRY;
1006
1007         rc = lmv_check_connect(obd);
1008         if (rc)
1009                 RETURN(rc);
1010
1011         tgt_exp = lmv_get_export(lmv, fid);
1012         if (IS_ERR(tgt_exp))
1013                 RETURN(PTR_ERR(tgt_exp));
1014
1015         rc = md_setxattr(tgt_exp, fid, valid, name,
1016                          input, input_size, output_size, flags, request);
1017         
1018         RETURN(rc);
1019 }
1020
1021 static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
1022                        obd_valid valid, int ea_size,
1023                        struct ptlrpc_request **request)
1024 {
1025         struct obd_device *obd = exp->exp_obd;
1026         struct lmv_obd *lmv = &obd->u.lmv;
1027         struct obd_export *tgt_exp;
1028         struct lmv_obj *obj;
1029         int rc, i;
1030         ENTRY;
1031
1032         rc = lmv_check_connect(obd);
1033         if (rc)
1034                 RETURN(rc);
1035
1036         tgt_exp = lmv_get_export(lmv, fid);
1037         if (IS_ERR(tgt_exp))
1038                 RETURN(PTR_ERR(tgt_exp));
1039
1040         rc = md_getattr(tgt_exp, fid, valid, ea_size, request);
1041         if (rc)
1042                 RETURN(rc);
1043
1044         obj = lmv_obj_grab(obd, fid);
1045
1046         CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
1047                PFID(fid), obj ? "(split)" : "");
1048
1049         /* if object is split, then we loop over all the slaves and gather size
1050          * attribute. In ideal world we would have to gather also mds field from
1051          * all slaves, as object is spread over the cluster and this is
1052          * definitely interesting information and it is not good to loss it,
1053          * but... */
1054         if (obj) {
1055                 struct mdt_body *body;
1056
1057                 if (*request == NULL) {
1058                         lmv_obj_put(obj);
1059                         RETURN(rc);
1060                 }
1061
1062                 body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF,
1063                                       sizeof(*body));
1064                 LASSERT(body != NULL);
1065
1066                 lmv_obj_lock(obj);
1067
1068                 for (i = 0; i < obj->lo_objcount; i++) {
1069                         if (lmv->tgts[i].ltd_exp == NULL) {
1070                                 CWARN("%s: NULL export for %d\n",
1071                                       obd->obd_name, i);
1072                                 continue;
1073                         }
1074
1075                         /* skip master obj. */
1076                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
1077                                 continue;
1078
1079                         body->size += obj->lo_inodes[i].li_size;
1080                 }
1081
1082                 lmv_obj_unlock(obj);
1083                 lmv_obj_put(obj);
1084         }
1085
1086         RETURN(rc);
1087 }
1088
1089 static int lmv_change_cbdata(struct obd_export *exp,
1090                              const struct lu_fid *fid,
1091                              ldlm_iterator_t it,
1092                              void *data)
1093 {
1094         struct obd_device *obd = exp->exp_obd;
1095         struct lmv_obd *lmv = &obd->u.lmv;
1096         int i, rc;
1097         ENTRY;
1098
1099         rc = lmv_check_connect(obd);
1100         if (rc)
1101                 RETURN(rc);
1102
1103         CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
1104
1105         /* with CMD every object can have two locks in different namespaces:
1106          * lookup lock in space of mds storing direntry and update/open lock in
1107          * space of mds storing inode */
1108         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1109                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1110
1111         RETURN(0);
1112 }
1113
1114 static int lmv_close(struct obd_export *exp,
1115                      struct md_op_data *op_data,
1116                      struct obd_client_handle *och,
1117                      struct ptlrpc_request **request)
1118 {
1119         struct obd_device *obd = exp->exp_obd;
1120         struct lmv_obd *lmv = &obd->u.lmv;
1121         struct obd_export *tgt_exp;
1122         int rc;
1123         ENTRY;
1124
1125         rc = lmv_check_connect(obd);
1126         if (rc)
1127                 RETURN(rc);
1128
1129         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1130         if (IS_ERR(tgt_exp))
1131                 RETURN(PTR_ERR(tgt_exp));
1132
1133         CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->fid1));
1134         rc = md_close(tgt_exp, op_data, och, request);
1135         RETURN(rc);
1136 }
1137
1138 /* called in the case MDS returns -ERESTART on create on open, what means that
1139  * directory is split and its LMV presentation object has to be updated. */
1140 int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
1141 {
1142         struct obd_device *obd = exp->exp_obd;
1143         struct lmv_obd *lmv = &obd->u.lmv;
1144         struct ptlrpc_request *req = NULL;
1145         struct obd_export *tgt_exp;
1146         struct lmv_obj *obj;
1147         struct lustre_md md;
1148         int mealen, rc;
1149         __u64 valid;
1150         ENTRY;
1151
1152         md.mea = NULL;
1153         mealen = lmv_get_easize(lmv);
1154
1155         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1156
1157         tgt_exp = lmv_get_export(lmv, fid);
1158         if (IS_ERR(tgt_exp))
1159                 RETURN(PTR_ERR(tgt_exp));
1160
1161         /* time to update mea of parent fid */
1162         rc = md_getattr(tgt_exp, fid, valid, mealen, &req);
1163         if (rc) {
1164                 CERROR("md_getattr() failed, error %d\n", rc);
1165                 GOTO(cleanup, rc);
1166         }
1167
1168         rc = md_get_lustre_md(tgt_exp, req, 0, NULL, &md);
1169         if (rc) {
1170                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
1171                 GOTO(cleanup, rc);
1172         }
1173
1174         if (md.mea == NULL)
1175                 GOTO(cleanup, rc = -ENODATA);
1176
1177         obj = lmv_obj_create(exp, fid, md.mea);
1178         if (IS_ERR(obj))
1179                 rc = PTR_ERR(obj);
1180         else
1181                 lmv_obj_put(obj);
1182
1183         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1184
1185         EXIT;
1186 cleanup:
1187         if (req)
1188                 ptlrpc_req_finished(req);
1189         return rc;
1190 }
1191
1192 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1193                const void *data, int datalen, int mode, __u32 uid,
1194                __u32 gid, __u32 cap_effective,  __u64 rdev,
1195                struct ptlrpc_request **request)
1196 {
1197         struct obd_device *obd = exp->exp_obd;
1198         struct lmv_obd *lmv = &obd->u.lmv;
1199         struct obd_export *tgt_exp;
1200         struct mdt_body *body;
1201         struct lmv_obj *obj;
1202         int rc, loop = 0;
1203         ENTRY;
1204
1205         rc = lmv_check_connect(obd);
1206         if (rc)
1207                 RETURN(rc);
1208
1209         if (!lmv->desc.ld_active_tgt_count)
1210                 RETURN(-EIO);
1211 repeat:
1212         LASSERT(++loop <= 2);
1213         obj = lmv_obj_grab(obd, &op_data->fid1);
1214         if (obj) {
1215                 mdsno_t mds;
1216                 
1217                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1218                                    op_data->name, op_data->namelen);
1219                 op_data->fid1 = obj->lo_inodes[mds].li_fid;
1220                 lmv_obj_put(obj);
1221         }
1222
1223         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen,
1224                op_data->name, PFID(&op_data->fid1));
1225
1226         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1227         if (IS_ERR(tgt_exp))
1228                 RETURN(PTR_ERR(tgt_exp));
1229
1230         rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
1231                        cap_effective, rdev, request);
1232         if (rc == 0) {
1233                 if (*request == NULL)
1234                         RETURN(rc);
1235
1236                 body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF,
1237                                       sizeof(*body));
1238                 if (body == NULL)
1239                         RETURN(-ENOMEM);
1240
1241                 CDEBUG(D_OTHER, "created. "DFID"\n", PFID(&op_data->fid1));
1242         } else if (rc == -ERESTART) {
1243                 /* directory got split. time to update local object and repeat
1244                  * the request with proper MDS. */
1245                 rc = lmv_handle_split(exp, &op_data->fid1);
1246                 if (rc == 0) {
1247                         ptlrpc_req_finished(*request);
1248                         goto repeat;
1249                 }
1250         }
1251         RETURN(rc);
1252 }
1253
1254 static int lmv_done_writing(struct obd_export *exp,
1255                             struct md_op_data *op_data)
1256 {
1257         struct obd_device *obd = exp->exp_obd;
1258         struct lmv_obd *lmv = &obd->u.lmv;
1259         struct obd_export *tgt_exp;
1260         int rc;
1261         ENTRY;
1262
1263         rc = lmv_check_connect(obd);
1264         if (rc)
1265                 RETURN(rc);
1266
1267         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1268         if (IS_ERR(tgt_exp))
1269                 RETURN(PTR_ERR(tgt_exp));
1270
1271         rc = md_done_writing(tgt_exp, op_data);
1272         RETURN(rc);
1273 }
1274
1275 static int
1276 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1277                    struct lookup_intent *it, int lockmode,
1278                    struct md_op_data *op_data, struct lustre_handle *lockh,
1279                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1280                    ldlm_blocking_callback cb_blocking, void *cb_data)
1281 {
1282         struct obd_device *obd = exp->exp_obd;
1283         struct lmv_obd *lmv = &obd->u.lmv;
1284         struct lmv_stripe_md *mea = op_data->mea1;
1285         struct md_op_data *op_data2;
1286         struct obd_export *tgt_exp;
1287         int i, rc = 0;
1288         ENTRY;
1289
1290         OBD_ALLOC_PTR(op_data2);
1291         if (op_data2 == NULL)
1292                 RETURN(-ENOMEM);
1293
1294         LASSERT(mea != NULL);
1295         for (i = 0; i < mea->mea_count; i++) {
1296                 memset(op_data2, 0, sizeof(*op_data2));
1297                 op_data2->fid1 = mea->mea_ids[i];
1298
1299                 tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
1300                 if (IS_ERR(tgt_exp))
1301                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1302
1303                 if (tgt_exp == NULL)
1304                         continue;
1305
1306                 rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
1307                                 lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
1308                                 cb_data, 0);
1309
1310                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
1311                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
1312
1313                 if (rc)
1314                         GOTO(cleanup, rc);
1315                 
1316                 if (it->d.lustre.it_data) {
1317                         struct ptlrpc_request *req;
1318                         req = (struct ptlrpc_request *)it->d.lustre.it_data;
1319                         ptlrpc_req_finished(req);
1320                 }
1321
1322                 if (it->d.lustre.it_status)
1323                         GOTO(cleanup, rc = it->d.lustre.it_status);
1324         }
1325
1326         EXIT;
1327 cleanup:
1328         OBD_FREE_PTR(op_data2);
1329
1330         if (rc != 0) {
1331                 /* drop all taken locks */
1332                 while (--i >= 0) {
1333                         if (lockh[i].cookie)
1334                                 ldlm_lock_decref(lockh + i, lockmode);
1335                         lockh[i].cookie = 0;
1336                 }
1337         }
1338         return rc;
1339 }
1340
1341 static int
1342 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1343                    struct lookup_intent *it, int lock_mode,
1344                    struct md_op_data *op_data, struct lustre_handle *lockh,
1345                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1346                    ldlm_blocking_callback cb_blocking, void *cb_data,
1347                    int extra_lock_flags)
1348 {
1349         struct ptlrpc_request *req = it->d.lustre.it_data;
1350         struct obd_device *obd = exp->exp_obd;
1351         struct lmv_obd *lmv = &obd->u.lmv;
1352         struct mdt_body *body = NULL;
1353         struct lustre_handle plock;
1354         struct obd_export *tgt_exp;
1355         struct md_op_data *rdata;
1356         int rc = 0, pmode;
1357         ENTRY;
1358
1359         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
1360         LASSERT(body != NULL);
1361
1362         if (!(body->valid & OBD_MD_MDS))
1363                 RETURN(0);
1364
1365         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
1366                LL_IT2STR(it), PFID(&op_data->fid1), PFID(&body->fid1));
1367
1368         /* we got LOOKUP lock, but we really need attrs */
1369         pmode = it->d.lustre.it_lock_mode;
1370         LASSERT(pmode != 0);
1371         memcpy(&plock, lockh, sizeof(plock));
1372         it->d.lustre.it_lock_mode = 0;
1373         it->d.lustre.it_data = NULL;
1374
1375         OBD_ALLOC_PTR(rdata);
1376         if (rdata == NULL)
1377                 RETURN(-ENOMEM);
1378         rdata->fid1 = body->fid1;
1379         rdata->name = NULL;
1380         rdata->namelen = 0;
1381
1382         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1383         ptlrpc_req_finished(req);
1384
1385         tgt_exp = lmv_get_export(lmv, &rdata->fid1);
1386         if (IS_ERR(tgt_exp))
1387                 GOTO(out_free_rdata, rc = PTR_ERR(tgt_exp));
1388
1389         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
1390                         lockh, lmm, lmmsize, cb_compl, cb_blocking,
1391                         cb_data, extra_lock_flags);
1392         ldlm_lock_decref(&plock, pmode);
1393
1394         EXIT;
1395 out_free_rdata:
1396         OBD_FREE_PTR(rdata);
1397         return rc;
1398 }
1399
1400 static int
1401 lmv_enqueue(struct obd_export *exp, int lock_type,
1402             struct lookup_intent *it, int lock_mode,
1403             struct md_op_data *op_data, struct lustre_handle *lockh,
1404             void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1405             ldlm_blocking_callback cb_blocking, void *cb_data,
1406             int extra_lock_flags)
1407 {
1408         struct obd_device *obd = exp->exp_obd;
1409         struct lmv_obd *lmv = &obd->u.lmv;
1410         struct obd_export *tgt_exp;
1411         struct lmv_obj *obj;
1412         int rc;
1413         ENTRY;
1414
1415         rc = lmv_check_connect(obd);
1416         if (rc)
1417                 RETURN(rc);
1418
1419         if (op_data->mea1 && it->it_op == IT_UNLINK) {
1420                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1421                                         op_data, lockh, lmm, lmmsize,
1422                                         cb_compl, cb_blocking, cb_data);
1423                 RETURN(rc);
1424         }
1425
1426         if (op_data->namelen) {
1427                 obj = lmv_obj_grab(obd, &op_data->fid1);
1428                 if (obj) {
1429                         mdsno_t mds;
1430                         
1431                         /* directory is split. look for right mds for this
1432                          * name */
1433                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1434                                            (char *)op_data->name, op_data->namelen);
1435                         op_data->fid1 = obj->lo_inodes[mds].li_fid;
1436                         lmv_obj_put(obj);
1437                 }
1438         }
1439         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
1440                PFID(&op_data->fid1));
1441
1442         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1443         if (IS_ERR(tgt_exp))
1444                 RETURN(PTR_ERR(tgt_exp));
1445
1446         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
1447                         lmm, lmmsize, cb_compl, cb_blocking, cb_data,
1448                         extra_lock_flags);
1449         
1450         if (rc == 0 && it->it_op == IT_OPEN)
1451                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1452                                         op_data, lockh, lmm, lmmsize,
1453                                         cb_compl, cb_blocking, cb_data,
1454                                         extra_lock_flags);
1455         RETURN(rc);
1456 }
1457
1458 static int
1459 lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
1460                  const char *filename, int namelen, obd_valid valid,
1461                  int ea_size, struct ptlrpc_request **request)
1462 {
1463         struct obd_device *obd = exp->exp_obd;
1464         struct lmv_obd *lmv = &obd->u.lmv;
1465         struct obd_export *tgt_exp;
1466         struct lu_fid rid = *fid;
1467         struct mdt_body *body;
1468         struct lmv_obj *obj;
1469         int rc, loop = 0;
1470         mdsno_t mds;
1471         ENTRY;
1472
1473         rc = lmv_check_connect(obd);
1474         if (rc)
1475                 RETURN(rc);
1476
1477 repeat:
1478         LASSERT(++loop <= 2);
1479         obj = lmv_obj_grab(obd, fid);
1480         if (obj) {
1481                 /* directory is split. look for right mds for this name */
1482                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1483                                    filename, namelen - 1);
1484                 rid = obj->lo_inodes[mds].li_fid;
1485                 lmv_obj_put(obj);
1486         }
1487
1488         CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
1489                namelen, filename, PFID(fid), PFID(&rid));
1490
1491         tgt_exp = lmv_get_export(lmv, &rid);
1492         if (IS_ERR(tgt_exp))
1493                 RETURN(PTR_ERR(tgt_exp));
1494
1495         rc = md_getattr_name(tgt_exp, &rid, filename, namelen, valid,
1496                              ea_size, request);
1497         if (rc == 0) {
1498                 body = lustre_msg_buf((*request)->rq_repmsg,
1499                                       REQ_REC_OFF, sizeof(*body));
1500                 LASSERT(body != NULL);
1501
1502                 if (body->valid & OBD_MD_MDS) {
1503                         struct ptlrpc_request *req = NULL;
1504
1505                         rid = body->fid1;
1506                         CDEBUG(D_OTHER, "request attrs for "DFID"\n",
1507                                PFID(&rid));
1508
1509                         tgt_exp = lmv_get_export(lmv, &rid);
1510                         if (IS_ERR(tgt_exp)) {
1511                                 ptlrpc_req_finished(*request);
1512                                 RETURN(PTR_ERR(tgt_exp));
1513                         }
1514                         
1515                         rc = md_getattr_name(tgt_exp, &rid, NULL, 1, valid,
1516                                              ea_size, &req);
1517                         ptlrpc_req_finished(*request);
1518                         *request = req;
1519                 }
1520         } else if (rc == -ERESTART) {
1521                 /* directory got split. time to update local object and repeat
1522                  * the request with proper MDS */
1523                 rc = lmv_handle_split(exp, &rid);
1524                 if (rc == 0) {
1525                         ptlrpc_req_finished(*request);
1526                         goto repeat;
1527                 }
1528         }
1529         RETURN(rc);
1530 }
1531
1532 /*
1533  * llite passes fid of an target inode in op_data->fid1 and id of directory in
1534  * op_data->fid2
1535  */
1536 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1537                     struct ptlrpc_request **request)
1538 {
1539         struct obd_device *obd = exp->exp_obd;
1540         struct lmv_obd *lmv = &obd->u.lmv;
1541         struct lmv_obj *obj;
1542         mdsno_t mds;
1543         int rc;
1544         ENTRY;
1545
1546         rc = lmv_check_connect(obd);
1547         if (rc)
1548                 RETURN(rc);
1549
1550         if (op_data->namelen != 0) {
1551                 /* usual link request */
1552                 obj = lmv_obj_grab(obd, &op_data->fid2);
1553                 if (obj) {
1554                         rc = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1555                                           op_data->name, op_data->namelen);
1556                         op_data->fid2 = obj->lo_inodes[rc].li_fid;
1557                         lmv_obj_put(obj);
1558                 }
1559
1560                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
1561                 if (rc)
1562                         RETURN(rc);
1563
1564                 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
1565                        PFID(&op_data->fid2), op_data->namelen,
1566                        op_data->name, PFID(&op_data->fid1));
1567         } else {
1568                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
1569                 if (rc)
1570                         RETURN(rc);
1571
1572                 /* request from MDS to acquire i_links for inode by fid1 */
1573                 CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
1574                        PFID(&op_data->fid1));
1575         }
1576
1577         CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
1578                mds, PFID(&op_data->fid1));
1579         
1580         rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
1581
1582         RETURN(rc);
1583 }
1584
1585 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1586                       const char *old, int oldlen, const char *new, int newlen,
1587                       struct ptlrpc_request **request)
1588 {
1589         struct obd_device *obd = exp->exp_obd;
1590         struct lmv_obd *lmv = &obd->u.lmv;
1591         struct lmv_obj *obj;
1592         mdsno_t mds, mds2;
1593         int rc;
1594         ENTRY;
1595
1596         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
1597                oldlen, old, PFID(&op_data->fid1), newlen, new,
1598                PFID(&op_data->fid2));
1599
1600         rc = lmv_check_connect(obd);
1601         if (rc)
1602                 RETURN(rc);
1603
1604         if (oldlen == 0) {
1605                 /*
1606                  * MDS with old dir entry is asking another MDS to create name
1607                  * there.
1608                  */
1609                 CDEBUG(D_OTHER,
1610                        "create %*s(%d/%d) in "DFID" pointing "
1611                        "to "DFID"\n", newlen, new, oldlen, newlen,
1612                        PFID(&op_data->fid2), PFID(&op_data->fid1));
1613
1614                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
1615                 if (rc)
1616                         RETURN(rc);
1617
1618                 /*
1619                  * target directory can be split, sowe should forward request to
1620                  * the right MDS.
1621                  */
1622                 obj = lmv_obj_grab(obd, &op_data->fid2);
1623                 if (obj) {
1624                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1625                                            (char *)new, newlen);
1626                         op_data->fid2 = obj->lo_inodes[mds].li_fid;
1627                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", mds,
1628                                PFID(&op_data->fid2));
1629                         lmv_obj_put(obj);
1630                 }
1631                 goto request;
1632         }
1633
1634         obj = lmv_obj_grab(obd, &op_data->fid1);
1635         if (obj) {
1636                 /*
1637                  * directory is already split, so we have to forward request to
1638                  * the right MDS.
1639                  */
1640                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1641                                    (char *)old, oldlen);
1642                 op_data->fid1 = obj->lo_inodes[mds].li_fid;
1643                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", mds,
1644                        PFID(&op_data->fid1));
1645                 lmv_obj_put(obj);
1646         }
1647
1648         obj = lmv_obj_grab(obd, &op_data->fid2);
1649         if (obj) {
1650                 /*
1651                  * directory is already split, so we have to forward request to
1652                  * the right MDS.
1653                  */
1654                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1655                                    (char *)new, newlen);
1656
1657                 op_data->fid2 = obj->lo_inodes[mds].li_fid;
1658                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n", mds,
1659                        PFID(&op_data->fid2));
1660                 lmv_obj_put(obj);
1661         }
1662
1663         rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
1664         if (rc)
1665                 RETURN(rc);
1666
1667 request:
1668         rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds2);
1669         if (rc)
1670                 RETURN(rc);
1671
1672         if (mds != mds2) {
1673                 CDEBUG(D_OTHER,"cross-node rename "DFID"/%*s to "DFID"/%*s\n",
1674                        PFID(&op_data->fid1), oldlen, old, PFID(&op_data->fid2),
1675                        newlen, new);
1676         }
1677
1678         rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
1679                        new, newlen, request);
1680         RETURN(rc);
1681 }
1682
1683 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1684                        struct iattr *iattr, void *ea, int ealen, void *ea2,
1685                        int ea2len, struct ptlrpc_request **request)
1686 {
1687         struct obd_device *obd = exp->exp_obd;
1688         struct lmv_obd *lmv = &obd->u.lmv;
1689         struct ptlrpc_request *req;
1690         struct obd_export *tgt_exp;
1691         struct mdt_body *body;
1692         struct lmv_obj *obj;
1693         int rc = 0, i;
1694         ENTRY;
1695
1696         rc = lmv_check_connect(obd);
1697         if (rc)
1698                 RETURN(rc);
1699
1700         obj = lmv_obj_grab(obd, &op_data->fid1);
1701
1702         CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
1703                PFID(&op_data->fid1), iattr->ia_valid, obj ? ", split" : "");
1704
1705         if (obj) {
1706                 for (i = 0; i < obj->lo_objcount; i++) {
1707                         op_data->fid1 = obj->lo_inodes[i].li_fid;
1708
1709                         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1710                         if (IS_ERR(tgt_exp)) {
1711                                 rc = PTR_ERR(tgt_exp);
1712                                 break;
1713                         }
1714
1715                         rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen,
1716                                         ea2, ea2len, &req);
1717
1718                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
1719                                 /*
1720                                  * this is master object and this request should
1721                                  * be returned back to llite.
1722                                  */
1723                                 *request = req;
1724                         } else {
1725                                 ptlrpc_req_finished(req);
1726                         }
1727
1728                         if (rc)
1729                                 break;
1730                 }
1731                 lmv_obj_put(obj);
1732         } else {
1733                 tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1734                 if (IS_ERR(tgt_exp))
1735                         RETURN(PTR_ERR(tgt_exp));
1736
1737                 rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen, ea2,
1738                                 ea2len, request);
1739                 if (rc == 0) {
1740                         body = lustre_msg_buf((*request)->rq_repmsg, REQ_REC_OFF,
1741                                               sizeof(*body));
1742                         LASSERT(body != NULL);
1743                 }
1744         }
1745         RETURN(rc);
1746 }
1747
1748 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1749                     struct ptlrpc_request **request)
1750 {
1751         struct obd_device *obd = exp->exp_obd;
1752         struct lmv_obd *lmv = &obd->u.lmv;
1753         struct obd_export *tgt_exp;
1754         int rc;
1755         ENTRY;
1756
1757         rc = lmv_check_connect(obd);
1758         if (rc)
1759                 RETURN(rc);
1760
1761         tgt_exp = lmv_get_export(lmv, fid);
1762         if (IS_ERR(tgt_exp))
1763                 RETURN(PTR_ERR(tgt_exp));
1764
1765         rc = md_sync(tgt_exp, fid, request);
1766         RETURN(rc);
1767 }
1768
1769 /* main purpose of LMV blocking ast is to remove split directory LMV
1770  * presentation object (struct lmv_obj) attached to the lock being revoked. */
1771 int lmv_blocking_ast(struct ldlm_lock *lock,
1772                      struct ldlm_lock_desc *desc,
1773                      void *data, int flag)
1774 {
1775         struct lustre_handle lockh;
1776         struct lmv_obj *obj;
1777         int rc;
1778         ENTRY;
1779
1780         switch (flag) {
1781         case LDLM_CB_BLOCKING:
1782                 ldlm_lock2handle(lock, &lockh);
1783                 rc = ldlm_cli_cancel(&lockh);
1784                 if (rc < 0) {
1785                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1786                         RETURN(rc);
1787                 }
1788                 break;
1789         case LDLM_CB_CANCELING:
1790                 /* time to drop cached attrs for dirobj */
1791                 obj = lock->l_ast_data;
1792                 if (obj) {
1793                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1794                                ", master "DFID"\n",
1795                                lock->l_resource->lr_name.name[3] == 1 ?
1796                                "LOOKUP" : "UPDATE",
1797                                lock->l_resource->lr_name.name[0],
1798                                lock->l_resource->lr_name.name[1],
1799                                PFID(&obj->lo_fid));
1800                         lmv_obj_put(obj);
1801                 }
1802                 break;
1803         default:
1804                 LBUG();
1805         }
1806         RETURN(0);
1807 }
1808
#if 0
/* not needed for CMD3 because only dir on master has "." and ".." */
/*
 * Walk the ext2-style directory entries stored in @page and clear the inode
 * number of every "." and ".." entry.
 */
static void lmv_remove_dots(struct page *page)
{
        unsigned limit = PAGE_CACHE_SIZE;
        char *kaddr = cfs_page_address(page);
        struct ext2_dir_entry_2 *p;
        unsigned offs, rec_len;

        for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
                p = (struct ext2_dir_entry_2 *)(kaddr + offs);
                rec_len = le16_to_cpu(p->rec_len);

                /*
                 * A zero record length would never advance offs and the loop
                 * would spin forever on a damaged page; stop scanning.
                 */
                if (rec_len == 0)
                        break;

                if ((p->name_len == 1 && p->name[0] == '.') ||
                    (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
                        p->inode = 0;
        }
}
#endif
1828
1829 static int lmv_readpage(struct obd_export *exp,
1830                         const struct lu_fid *fid,
1831                         __u64 offset, struct page *page,
1832                         struct ptlrpc_request **request)
1833 {
1834         struct obd_device *obd = exp->exp_obd;
1835         struct lmv_obd *lmv = &obd->u.lmv;
1836         struct obd_export *tgt_exp;
1837         struct lu_fid rid = *fid;
1838         struct lmv_obj *obj;
1839         int i, rc;
1840         ENTRY;
1841
1842         rc = lmv_check_connect(obd);
1843         if (rc)
1844                 RETURN(rc);
1845
1846         CDEBUG(D_OTHER, "READPAGE at %llu from "DFID"\n",
1847                offset, PFID(&rid));
1848
1849         obj = lmv_obj_grab(obd, fid);
1850         if (obj) {
1851                 lmv_obj_lock(obj);
1852
1853                 /* find dirobj containing page with requested offset. */
1854                 for (i = 0; i < obj->lo_objcount; i++) {
1855                         if (offset < obj->lo_inodes[i].li_size)
1856                                 break;
1857                         offset -= obj->lo_inodes[i].li_size;
1858                 }
1859                 rid = obj->lo_inodes[i].li_fid;
1860
1861                 lmv_obj_unlock(obj);
1862                 lmv_obj_put(obj);
1863
1864                 CDEBUG(D_OTHER, "forward to "DFID" with offset %lu\n",
1865                        PFID(&rid), (unsigned long)offset);
1866         }
1867         
1868         tgt_exp = lmv_get_export(lmv, &rid);
1869         if (IS_ERR(tgt_exp))
1870                 RETURN(PTR_ERR(tgt_exp));
1871
1872         rc = md_readpage(tgt_exp, &rid, offset, page, request);
1873
1874 #if 0
1875         if (rc == 0 && !lu_fid_eq(&rid, fid))
1876                 /*
1877                  * This page isn't from master object. To avoid "." and ".."
1878                  * duplication in directory, we have to remove them from all
1879                  * slave objects
1880                  *
1881                  * XXX this is not needed for cmd3 readdir, because only master
1882                  * directory has dot and dotdot.
1883                  */
1884                 lmv_remove_dots(page);
1885 #endif
1886         
1887         RETURN(rc);
1888 }
1889
1890 static int lmv_unlink_slaves(struct obd_export *exp,
1891                              struct md_op_data *op_data,
1892                              struct ptlrpc_request **req)
1893 {
1894         struct obd_device *obd = exp->exp_obd;
1895         struct lmv_obd *lmv = &obd->u.lmv;
1896         struct lmv_stripe_md *mea = op_data->mea1;
1897         struct md_op_data *op_data2;
1898         struct obd_export *tgt_exp;
1899         int i, rc = 0;
1900         ENTRY;
1901
1902         OBD_ALLOC_PTR(op_data2);
1903         if (op_data2 == NULL)
1904                 RETURN(-ENOMEM);
1905
1906         LASSERT(mea != NULL);
1907         for (i = 0; i < mea->mea_count; i++) {
1908                 memset(op_data2, 0, sizeof(*op_data2));
1909                 op_data2->fid1 = mea->mea_ids[i];
1910                 op_data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1911
1912                 tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
1913                 if (IS_ERR(tgt_exp))
1914                         GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
1915
1916                 if (tgt_exp == NULL)
1917                         continue;
1918
1919                 rc = md_unlink(tgt_exp, op_data2, req);
1920
1921                 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
1922                        PFID(&mea->mea_ids[i]), rc);
1923
1924                 if (*req) {
1925                         ptlrpc_req_finished(*req);
1926                         *req = NULL;
1927                 }
1928                 if (rc)
1929                         GOTO(out_free_op_data2, rc);
1930         }
1931
1932         EXIT;
1933 out_free_op_data2:
1934         OBD_FREE_PTR(op_data2);
1935         return rc;
1936 }
1937
1938 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
1939                       struct ptlrpc_request **request)
1940 {
1941         struct obd_device *obd = exp->exp_obd;
1942         struct lmv_obd *lmv = &obd->u.lmv;
1943         struct obd_export *tgt_exp;
1944         int rc, i;
1945         ENTRY;
1946
1947         rc = lmv_check_connect(obd);
1948         if (rc)
1949                 RETURN(rc);
1950
1951         if (op_data->namelen == 0 && op_data->mea1 != NULL) {
1952                 /* mds asks to remove slave objects */
1953                 rc = lmv_unlink_slaves(exp, op_data, request);
1954                 RETURN(rc);
1955         }
1956
1957         if (op_data->namelen != 0) {
1958                 struct lmv_obj *obj;
1959
1960                 obj = lmv_obj_grab(obd, &op_data->fid1);
1961                 if (obj) {
1962                         i = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1963                                          op_data->name, op_data->namelen);
1964                         op_data->fid1 = obj->lo_inodes[i].li_fid;
1965                         lmv_obj_put(obj);
1966                 }
1967                 CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
1968                        op_data->namelen, op_data->name, PFID(&op_data->fid1),
1969                        i);
1970         } else {
1971                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
1972                        PFID(&op_data->fid1));
1973         }
1974         tgt_exp = lmv_get_export(lmv, &op_data->fid1);
1975         if (IS_ERR(tgt_exp))
1976                 RETURN(PTR_ERR(tgt_exp));
1977         
1978         rc = md_unlink(tgt_exp, op_data, request);
1979         RETURN(rc);
1980 }
1981
1982 static int lmv_llog_init(struct obd_device *obd, struct obd_device *tgt,
1983                          int count, struct llog_catid *logid)
1984 {
1985         struct llog_ctxt *ctxt;
1986         int rc;
1987         ENTRY;
1988
1989         rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1990                         &llog_client_ops);
1991         if (rc == 0) {
1992                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1993                 ctxt->loc_imp = tgt->u.cli.cl_import;
1994         }
1995
1996         RETURN(rc);
1997 }
1998
1999 static int lmv_llog_finish(struct obd_device *obd, int count)
2000 {
2001         int rc;
2002         ENTRY;
2003
2004         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
2005         RETURN(rc);
2006 }
2007
2008 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2009 {
2010         int rc = 0;
2011
2012         switch (stage) {
2013         case OBD_CLEANUP_EARLY:
2014                 /* XXX: here should be calling obd_precleanup() down to
2015                  * stack. */
2016                 break;
2017         case OBD_CLEANUP_SELF_EXP:
2018                 rc = obd_llog_finish(obd, 0);
2019                 if (rc != 0)
2020                         CERROR("failed to cleanup llogging subsystems\n");
2021                 break;
2022         default:
2023                 break;
2024         }
2025         RETURN(rc);
2026 }
2027
2028 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
2029                         void *key, __u32 *vallen, void *val)
2030 {
2031         struct obd_device *obd;
2032         struct lmv_obd *lmv;
2033         int rc = 0;
2034         ENTRY;
2035
2036         obd = class_exp2obd(exp);
2037         if (obd == NULL) {
2038                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2039                        exp->exp_handle.h_cookie);
2040                 RETURN(-EINVAL);
2041         }
2042
2043         lmv = &obd->u.lmv;
2044         if (keylen == strlen("mdsize") && !strcmp(key, "mdsize")) {
2045                 __u32 *mdsize = val;
2046                 *vallen = sizeof(__u32);
2047                 *mdsize = lmv_get_easize(lmv);
2048                 RETURN(0);
2049         } else if (keylen == strlen("mdsnum") && !strcmp(key, "mdsnum")) {
2050                 struct obd_uuid *cluuid = &lmv->cluuid;
2051                 struct lmv_tgt_desc *tgts;
2052                 __u32 *mdsnum = val;
2053                 int i;
2054
2055                 tgts = lmv->tgts;
2056                 for (i = 0; i < lmv->desc.ld_tgt_count; i++, tgts++) {
2057                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
2058                                 *vallen = sizeof(__u32);
2059                                 *mdsnum = i;
2060                                 RETURN(0);
2061                         }
2062                 }
2063                 LASSERT(0);
2064         } else if (keylen == strlen("rootid") && !strcmp(key, "rootid")) {
2065                 rc = lmv_check_connect(obd);
2066                 if (rc)
2067                         RETURN(rc);
2068
2069                 /* getting rootid from first MDS. */
2070                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2071                                   vallen, val);
2072                 RETURN(rc);
2073         } else if (keylen >= strlen("lmvdesc") && !strcmp(key, "lmvdesc")) {
2074                 struct lmv_desc *desc_ret = val;
2075                 *desc_ret = lmv->desc;
2076                 RETURN(0);
2077         } else if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2078                 struct lmv_tgt_desc *tgts;
2079                 int i;
2080
2081                 rc = lmv_check_connect(obd);
2082                 if (rc)
2083                         RETURN(rc);
2084
2085                 LASSERT(*vallen == sizeof(__u32));
2086                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2087                      i++, tgts++) {
2088
2089                         /* all tgts should be connected when this get called. */
2090                         if (!tgts || !tgts->ltd_exp) {
2091                                 CERROR("target not setup?\n");
2092                                 continue;
2093                         }
2094
2095                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
2096                                           vallen, val))
2097                                 RETURN(0);
2098                 }
2099                 RETURN(-EINVAL);
2100         } else if ((keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) ||
2101                    (keylen >= strlen("max_easize") && !strcmp(key, "max_easize"))) {
2102                 
2103                 rc = lmv_check_connect(obd);
2104                 if (rc)
2105                         RETURN(rc);
2106
2107                 /* forwarding this request to first MDS, it should know LOV
2108                  * desc. */
2109                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2110                                   vallen, val);
2111                 RETURN(rc);
2112         } /* else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
2113                 struct lmv_tgt_desc *tgts;
2114                 int i;
2115
2116                 rc = lmv_check_connect(obd);
2117                 if (rc)
2118                         RETURN(rc);
2119
2120                 LASSERT(*vallen == sizeof(struct fid_extent));
2121                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2122                      i++, tgts++) {
2123
2124                         if (!tgts || !tgts->ltd_exp) {
2125                                 CERROR("target not setup?\n");
2126                                 continue;
2127                         }
2128
2129                         rc = obd_get_info(tgts->ltd_exp, keylen, key,
2130                                           vallen, val);
2131                         if (rc)
2132                                 RETURN(rc);
2133                 }
2134                 RETURN(0);
2135         }*/
2136
2137         CDEBUG(D_IOCTL, "invalid key\n");
2138         RETURN(-EINVAL);
2139 }
2140
2141 int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
2142                        void *key, obd_count vallen, void *val,
2143                        struct ptlrpc_request_set *set)
2144 {
2145         struct lmv_tgt_desc    *tgt;
2146         struct obd_device      *obd;
2147         struct lmv_obd         *lmv;
2148         int rc = 0;
2149         ENTRY;
2150
2151         obd = class_exp2obd(exp);
2152         if (obd == NULL) {
2153                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2154                        exp->exp_handle.h_cookie);
2155                 RETURN(-EINVAL);
2156         }
2157         lmv = &obd->u.lmv;
2158
2159         if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
2160                 lmv->server_timeout = 1;
2161                 lmv_set_timeouts(obd);
2162                 RETURN(0);
2163         }
2164
2165         /* maybe this could be default */
2166         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
2167             (keylen == strlen("sec_flags") && strcmp(key, "sec_flags") == 0) ||
2168             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
2169                 struct obd_export *exp;
2170                 int err, i;
2171
2172                 spin_lock(&lmv->lmv_lock);
2173                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2174                      i++, tgt++) {
2175                         exp = tgt->ltd_exp;
2176                         /* during setup time the connections to mdc might
2177                          * haven't been established.
2178                          */
2179                         if (exp == NULL) {
2180                                 struct obd_device *tgt_obd;
2181
2182                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2183                                                                 LUSTRE_MDC_NAME,
2184                                                                 &obd->obd_uuid);
2185                                 if (!tgt_obd) {
2186                                         CERROR("can't set info %s, "
2187                                                "device %s not attached?\n",
2188                                                 (char *) key, tgt->uuid.uuid);
2189                                         rc = -EINVAL;
2190                                         continue;
2191                                 }
2192                                 exp = tgt_obd->obd_self_export;
2193                         }
2194
2195                         err = obd_set_info_async(exp, keylen, key, vallen, val, set);
2196                         if (!rc)
2197                                 rc = err;
2198                 }
2199                 spin_unlock(&lmv->lmv_lock);
2200
2201                 RETURN(rc);
2202         }
2203         if (((keylen == strlen("flush_cred") &&
2204              strcmp(key, "flush_cred") == 0)) ||
2205              ((keylen == strlen("crypto_type") &&
2206              strcmp(key, "crypto_type") == 0))) {
2207                 int i;
2208
2209                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2210                      i++, tgt++) {
2211                         if (!tgt->ltd_exp)
2212                                 continue;
2213                         rc = obd_set_info_async(tgt->ltd_exp,
2214                                                 keylen, key, vallen,
2215                                                 val, set);
2216                         if (rc)
2217                                 RETURN(rc);
2218                 }
2219
2220                 RETURN(0);
2221         }
2222
2223         if (keylen == strlen("ids") && memcmp(key, "ids", keylen) == 0) {
2224                 struct lu_fid *fid = (struct lu_fid *)val;
2225                 struct obd_export *tgt_exp;
2226
2227                 rc = lmv_check_connect(obd);
2228                 if (rc)
2229                         RETURN(rc);
2230
2231                 tgt_exp = lmv_get_export(lmv, fid);
2232                 if (IS_ERR(tgt_exp))
2233                         RETURN(PTR_ERR(tgt_exp));
2234
2235                 rc = obd_set_info_async(tgt_exp, keylen, key, vallen,
2236                                         val, set);
2237                 RETURN(rc);
2238         }
2239
2240         if (keylen == strlen("chkconnect") &&
2241             memcmp(key, "chkconnect", keylen) == 0) {
2242                 rc = lmv_check_connect(obd);
2243                 RETURN(rc);
2244         }
2245
2246         RETURN(-EINVAL);
2247 }
2248
2249 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2250                struct lov_stripe_md *lsm)
2251 {
2252         struct obd_device *obd = class_exp2obd(exp);
2253         struct lmv_obd *lmv = &obd->u.lmv;
2254         struct lmv_stripe_md *meap, *lsmp;
2255         int mea_size, i;
2256         ENTRY;
2257
2258         mea_size = lmv_get_easize(lmv);
2259         if (!lmmp)
2260                 RETURN(mea_size);
2261
2262         if (*lmmp && !lsm) {
2263                 OBD_FREE(*lmmp, mea_size);
2264                 *lmmp = NULL;
2265                 RETURN(0);
2266         }
2267
2268         if (*lmmp == NULL) {
2269                 OBD_ALLOC(*lmmp, mea_size);
2270                 if (*lmmp == NULL)
2271                         RETURN(-ENOMEM);
2272         }
2273
2274         if (!lsm)
2275                 RETURN(mea_size);
2276
2277         lsmp = (struct lmv_stripe_md *)lsm;
2278         meap = (struct lmv_stripe_md *)*lmmp;
2279
2280         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2281             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2282                 RETURN(-EINVAL);
2283
2284         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2285         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2286         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2287
2288         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2289                 meap->mea_ids[i] = meap->mea_ids[i];
2290                 fid_cpu_to_le(&meap->mea_ids[i]);
2291         }
2292
2293         RETURN(mea_size);
2294 }
2295
2296 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2297                  struct lov_mds_md *lmm, int lmm_size)
2298 {
2299         struct obd_device *obd = class_exp2obd(exp);
2300         struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
2301         struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
2302         struct lmv_obd *lmv = &obd->u.lmv;
2303         int mea_size, i;
2304         __u32 magic;
2305         ENTRY;
2306
2307         mea_size = lmv_get_easize(lmv);
2308         if (lsmp == NULL)
2309                 return mea_size;
2310
2311         if (*lsmp != NULL && lmm == NULL) {
2312                 OBD_FREE(*tmea, mea_size);
2313                 RETURN(0);
2314         }
2315
2316         LASSERT(mea_size == lmm_size);
2317
2318         OBD_ALLOC(*tmea, mea_size);
2319         if (*tmea == NULL)
2320                 RETURN(-ENOMEM);
2321
2322         if (!lmm)
2323                 RETURN(mea_size);
2324
2325         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2326             mea->mea_magic == MEA_MAGIC_ALL_CHARS)
2327         {
2328                 magic = le32_to_cpu(mea->mea_magic);
2329         } else {
2330                 /* old mea isnot handled here */
2331                 LBUG();
2332         }
2333
2334         (*tmea)->mea_magic = magic;
2335         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2336         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2337
2338         for (i = 0; i < (*tmea)->mea_count; i++) {
2339                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2340                 fid_le_to_cpu(&(*tmea)->mea_ids[i]);
2341         }
2342         RETURN(mea_size);
2343 }
2344
2345 #if 0
2346 /* lmv_create() and lmv_brw() is needed anymore as they purely server stuff and
2347  * lmv is going to use only on client. */
2348 static int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
2349                                  struct lov_stripe_md **ea,
2350                                  struct obd_trans_info *oti)
2351 {
2352         struct obd_device *obd = exp->exp_obd;
2353         struct lmv_obd *lmv = &obd->u.lmv;
2354         struct lov_stripe_md obj_md;
2355         struct lov_stripe_md *obj_mdp = &obj_md;
2356         int rc = 0;
2357         ENTRY;
2358
2359         LASSERT(ea == NULL);
2360         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
2361
2362         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp,
2363                         oa, &obj_mdp, oti);
2364
2365         RETURN(rc);
2366 }
2367
2368 /*
2369  * to be called from MDS only. @oa should have correct store cookie and o_fid
2370  * values for "master" object, as it will be used.
2371  */
2372 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
2373                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
2374 {
2375         struct obd_device *obd = exp->exp_obd;
2376         struct lmv_obd *lmv = &obd->u.lmv;
2377         struct lmv_stripe_md *mea;
2378         struct lu_fid mid;
2379         int i, c, rc = 0;
2380         ENTRY;
2381
2382         rc = lmv_check_connect(obd);
2383         if (rc)
2384                 RETURN(rc);
2385
2386         LASSERT(oa != NULL);
2387
2388         if (ea == NULL) {
2389                 rc = lmv_obd_create_single(exp, oa, NULL, oti);
2390                 if (rc)
2391                         CERROR("Can't create object, rc = %d\n", rc);
2392                 RETURN(rc);
2393         }
2394
2395         if (*ea == NULL) {
2396                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
2397                 if (rc < 0) {
2398                         CERROR("obd_alloc_diskmd() failed, error %d\n",
2399                                rc);
2400                         RETURN(rc);
2401                 } else
2402                         rc = 0;
2403
2404                 if (*ea == NULL)
2405                         RETURN(-ENOMEM);
2406         }
2407
2408         /* here we should take care about split dir, so store cookie and fid
2409          * for "master" object should already be allocated and passed in @oa. */
2410         LASSERT(oa->o_id != 0);
2411         LASSERT(oa->o_fid != 0);
2412
2413         /* save "master" object fid */
2414         obdo2fid(oa, &mid);
2415
2416         mea = (struct lmv_stripe_md *)*ea;
2417         mea->mea_master = -1;
2418         mea->mea_magic = MEA_MAGIC_ALL_CHARS;
2419
2420         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
2421                 mea->mea_count = lmv->desc.ld_tgt_count;
2422
2423         for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
2424                 struct lov_stripe_md obj_md;
2425                 struct lov_stripe_md *obj_mdp = &obj_md;
2426
2427                 if (lmv->tgts[i].ltd_exp == NULL) {
2428                         /* this is "master" MDS */
2429                         mea->mea_master = i;
2430                         mea->mea_ids[c] = mid;
2431                         c++;
2432                         continue;
2433                 }
2434
2435                 /*
2436                  * "master" MDS should always be part of stripped dir,
2437                  * so scan for it.
2438                  */
2439                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
2440                         continue;
2441
2442                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
2443                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
2444
2445                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
2446                 if (rc) {
2447                         CERROR("obd_create() failed on MDT target %d, "
2448                                "error %d\n", c, rc);
2449                         RETURN(rc);
2450                 }
2451
2452                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
2453                        i, oa->o_id, oa->o_generation);
2454
2455
2456                 /*
2457                  * here, when object is created (or it is master and was passed
2458                  * from caller) on desired MDS we save its fid to local mea_ids.
2459                  */
2460                 LASSERT(oa->o_fid);
2461
2462                 /*
2463                  * store cookie should be defined here for both cases (master
2464                  * object and not master), because master is already created.
2465                  */
2466                 LASSERT(oa->o_id);
2467
2468                 /* fill mea by store cookie and fid */
2469                 obdo2fid(oa, &mea->mea_ids[c]);
2470                 c++;
2471         }
2472         LASSERT(c == mea->mea_count);
2473
2474         CDEBUG(D_OTHER, "%d dirobjects created\n",
2475                (int)mea->mea_count);
2476
2477         RETURN(rc);
2478 }
2479
2480 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
2481             struct lov_stripe_md *ea, obd_count oa_bufs,
2482             struct brw_page *pgarr, struct obd_trans_info *oti)
2483 {
2484         /* splitting is not needed in lmv */
2485         struct obd_device *obd = exp->exp_obd;
2486         struct lmv_obd *lmv = &obd->u.lmv;
2487         struct lmv_stripe_md *mea = (struct lmv_stripe_md *) ea;
2488         int err;
2489
2490         LASSERT(oa != NULL);
2491         LASSERT(ea != NULL);
2492         LASSERT(pgarr != NULL);
2493         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
2494
2495         oa->o_gr = id_gen(&mea->mea_ids[oa->o_mds]);
2496         oa->o_id = id_ino(&mea->mea_ids[oa->o_mds]);
2497         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2498
2499         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
2500                       oa, NULL, oa_bufs, pgarr, oti);
2501         RETURN(err);
2502 }
2503 #endif
2504
2505 static int lmv_cancel_unused(struct obd_export *exp,
2506                              const struct lu_fid *fid,
2507                              int flags, void *opaque)
2508 {
2509         struct obd_device *obd = exp->exp_obd;
2510         struct lmv_obd *lmv = &obd->u.lmv;
2511         int rc = 0, err, i;
2512         ENTRY;
2513
2514         LASSERT(fid != NULL);
2515
2516         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2517                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active)
2518                         continue;
2519
2520                 err = md_cancel_unused(lmv->tgts[i].ltd_exp,
2521                                        fid, flags, opaque);
2522                 if (!rc)
2523                         rc = err;
2524         }
2525         RETURN(rc);
2526 }
2527
2528 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
2529 {
2530         struct obd_device *obd = exp->exp_obd;
2531         struct lmv_obd *lmv = &obd->u.lmv;
2532
2533         ENTRY;
2534         RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
2535 }
2536
2537 int lmv_lock_match(struct obd_export *exp, int flags,
2538                    const struct lu_fid *fid, ldlm_type_t type,
2539                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
2540                    struct lustre_handle *lockh)
2541 {
2542         struct obd_device *obd = exp->exp_obd;
2543         struct lmv_obd *lmv = &obd->u.lmv;
2544         int i, rc = 0;
2545         ENTRY;
2546
2547         CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
2548
2549         /* with CMD every object can have two locks in different namespaces:
2550          * lookup lock in space of mds storing direntry and update/open lock in
2551          * space of mds storing inode. Thus we check all targets, not only that
2552          * one fid was created in. */
2553         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2554                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2555                                    type, policy, mode, lockh);
2556                 if (rc)
2557                         RETURN(1);
2558         }
2559
2560         RETURN(rc);
2561 }
2562
2563 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2564                       int offset, struct obd_export *dt_exp, struct lustre_md *md)
2565 {
2566         struct obd_device *obd = exp->exp_obd;
2567         struct lmv_obd *lmv = &obd->u.lmv;
2568         int rc;
2569
2570         ENTRY;
2571         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md);
2572         RETURN(rc);
2573 }
2574
2575 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2576 {
2577         struct obd_device *obd = exp->exp_obd;
2578         struct lmv_obd *lmv = &obd->u.lmv;
2579
2580         ENTRY;
2581         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2582 }
2583
2584 int lmv_set_open_replay_data(struct obd_export *exp,
2585                              struct obd_client_handle *och,
2586                              struct ptlrpc_request *open_req)
2587 {
2588         struct obd_device *obd = exp->exp_obd;
2589         struct lmv_obd *lmv = &obd->u.lmv;
2590
2591         ENTRY;
2592         RETURN(md_set_open_replay_data(lmv->tgts[0].ltd_exp,
2593                                        och, open_req));
2594 }
2595
2596 int lmv_clear_open_replay_data(struct obd_export *exp,
2597                                struct obd_client_handle *och)
2598 {
2599         struct obd_device *obd = exp->exp_obd;
2600         struct lmv_obd *lmv = &obd->u.lmv;
2601
2602         ENTRY;
2603         RETURN(md_clear_open_replay_data(lmv->tgts[0].ltd_exp, och));
2604 }
2605
2606 struct obd_ops lmv_obd_ops = {
2607         .o_owner                = THIS_MODULE,
2608         .o_setup                = lmv_setup,
2609         .o_cleanup              = lmv_cleanup,
2610         .o_precleanup           = lmv_precleanup,
2611         .o_process_config       = lmv_process_config,
2612         .o_connect              = lmv_connect,
2613         .o_disconnect           = lmv_disconnect,
2614         .o_statfs               = lmv_statfs,
2615         .o_llog_init            = lmv_llog_init,
2616         .o_llog_finish          = lmv_llog_finish,
2617         .o_get_info             = lmv_get_info,
2618         .o_set_info_async       = lmv_set_info_async,
2619         .o_packmd               = lmv_packmd,
2620         .o_unpackmd             = lmv_unpackmd,
2621         .o_notify               = lmv_notify,
2622         .o_fid_init             = lmv_fid_init,
2623         .o_fid_fini             = lmv_fid_fini,
2624         .o_fid_alloc            = lmv_fid_alloc,
2625         .o_fid_delete           = lmv_fid_delete,
2626         .o_iocontrol            = lmv_iocontrol
2627 };
2628
2629 struct md_ops lmv_md_ops = {
2630         .m_getstatus            = lmv_getstatus,
2631         .m_change_cbdata        = lmv_change_cbdata,
2632         .m_close                = lmv_close,
2633         .m_create               = lmv_create,
2634         .m_done_writing         = lmv_done_writing,
2635         .m_enqueue              = lmv_enqueue,
2636         .m_getattr              = lmv_getattr,
2637         .m_getxattr             = lmv_getxattr,
2638         .m_getattr_name         = lmv_getattr_name,
2639         .m_intent_lock          = lmv_intent_lock,
2640         .m_link                 = lmv_link,
2641         .m_rename               = lmv_rename,
2642         .m_setattr              = lmv_setattr,
2643         .m_setxattr             = lmv_setxattr,
2644         .m_sync                 = lmv_sync,
2645         .m_readpage             = lmv_readpage,
2646         .m_unlink               = lmv_unlink,
2647         .m_init_ea_size         = lmv_init_ea_size,
2648         .m_cancel_unused        = lmv_cancel_unused,
2649         .m_set_lock_data        = lmv_set_lock_data,
2650         .m_lock_match           = lmv_lock_match,
2651         .m_get_lustre_md        = lmv_get_lustre_md,
2652         .m_free_lustre_md       = lmv_free_lustre_md,
2653         .m_set_open_replay_data = lmv_set_open_replay_data,
2654         .m_clear_open_replay_data = lmv_clear_open_replay_data
2655 };
2656
2657 int __init lmv_init(void)
2658 {
2659         struct lprocfs_static_vars lvars;
2660         int rc;
2661
2662         obj_cache = kmem_cache_create("lmv_objects",
2663                                       sizeof(struct lmv_obj),
2664                                       0, 0, NULL, NULL);
2665         if (!obj_cache) {
2666                 CERROR("error allocating lmv objects cache\n");
2667                 return -ENOMEM;
2668         }
2669
2670         lprocfs_init_vars(lmv, &lvars);
2671         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2672                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2673         if (rc)
2674                 kmem_cache_destroy(obj_cache);
2675
2676         return rc;
2677 }
2678
2679 #ifdef __KERNEL__
2680 static void lmv_exit(void)
2681 {
2682         class_unregister_type(LUSTRE_LMV_NAME);
2683
2684         LASSERTF(kmem_cache_destroy(obj_cache) == 0,
2685                  "can't free lmv objects cache, %d object(s)"
2686                  "still in use\n", atomic_read(&obj_cache_count));
2687 }
2688
2689 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2690 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2691 MODULE_LICENSE("GPL");
2692
2693 module_init(lmv_init);
2694 module_exit(lmv_exit);
2695 #endif