Whamcloud - gitweb
fix issue with use-after-free if commit occurs while journal_stop()
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #include <lustre_log.h>
38 #endif
39 #include <linux/ext2_fs.h>
40
41 #include <lustre/lustre_idl.h>
42 #include <obd_support.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_lite.h>
48 #include "lmv_internal.h"
49
/* ATOMIC_INIT is not defined for liblustre building; provide a fallback
 * that expands to a brace initializer wrapping the value. */
#if !defined(ATOMIC_INIT)
#define ATOMIC_INIT(val) { (val) }
#endif

/* Object cache and an atomic counter that starts at zero.
 * NOTE(review): presumably both are owned by the LMV object manager and the
 * counter tracks live cached objects; their users are not in this part of
 * the file, so confirm there. */
kmem_cache_t *obj_cache;
atomic_t obj_cache_count = ATOMIC_INIT(0);
58
59 static void lmv_activate_target(struct lmv_obd *lmv,
60                                 struct lmv_tgt_desc *tgt,
61                                 int activate)
62 {
63         if (tgt->active == activate)
64                 return;
65
66         tgt->active = activate;
67         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
68 }
69
70 /* Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77                               int activate)
78 {
79         struct lmv_tgt_desc *tgt;
80         struct obd_device *obd;
81         int i, rc = 0;
82         ENTRY;
83
84         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85                lmv, uuid->uuid, activate);
86
87         spin_lock(&lmv->lmv_lock);
88         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
89                 if (tgt->ltd_exp == NULL)
90                         continue;
91
92                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
93                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
94
95                 if (obd_uuid_equals(uuid, &tgt->uuid))
96                         break;
97         }
98
99         if (i == lmv->desc.ld_tgt_count)
100                 GOTO(out_lmv_lock, rc = -EINVAL);
101
102         obd = class_exp2obd(tgt->ltd_exp);
103         if (obd == NULL)
104                 GOTO(out_lmv_lock, rc = -ENOTCONN);
105
106         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
107                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
108                obd->obd_type->typ_name, i);
109         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
110
111         if (tgt->active == activate) {
112                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
113                        activate ? "" : "in");
114                 GOTO(out_lmv_lock, rc);
115         }
116
117         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
118                obd, activate ? "" : "in");
119
120         lmv_activate_target(lmv, tgt, activate);
121
122         EXIT;
123
124  out_lmv_lock:
125         spin_unlock(&lmv->lmv_lock);
126         return rc;
127 }
128
129 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
130                       enum obd_notify_event ev, void *data)
131 {
132         struct obd_uuid *uuid;
133         int rc;
134         ENTRY;
135
136         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
137                 CERROR("unexpected notification of %s %s!\n",
138                        watched->obd_type->typ_name,
139                        watched->obd_name);
140                 RETURN(-EINVAL);
141         }
142         uuid = &watched->u.cli.cl_target_uuid;
143
144         /* Set MDC as active before notifying the observer, so the observer can
145          * use the MDC normally. */
146         rc = lmv_set_mdc_active(&obd->u.lmv, uuid,
147                                 ev == OBD_NOTIFY_ACTIVE);
148         if (rc) {
149                 CERROR("%sactivation of %s failed: %d\n",
150                        ev == OBD_NOTIFY_ACTIVE ? "" : "de",
151                        uuid->uuid, rc);
152                 RETURN(rc);
153         }
154
155         if (obd->obd_observer)
156                 /* pass the notification up the chain. */
157                 rc = obd_notify(obd->obd_observer, watched, ev, data);
158
159         RETURN(rc);
160 }
161
162 /* this is fake connect function. Its purpose is to initialize lmv and say
163  * caller that everything is okay. Real connection will be performed later. */
164 static int lmv_connect(const struct lu_context *ctx,
165                        struct lustre_handle *conn, struct obd_device *obd,
166                        struct obd_uuid *cluuid, struct obd_connect_data *data)
167 {
168 #ifdef __KERNEL__
169         struct proc_dir_entry *lmv_proc_dir;
170 #endif
171         struct lmv_obd *lmv = &obd->u.lmv;
172         struct obd_export *exp;
173         int rc = 0;
174         ENTRY;
175
176         rc = class_connect(conn, obd, cluuid);
177         if (rc) {
178                 CERROR("class_connection() returned %d\n", rc);
179                 RETURN(rc);
180         }
181
182         exp = class_conn2export(conn);
183
184         /* we don't want to actually do the underlying connections more than
185          * once, so keep track. */
186         lmv->refcount++;
187         if (lmv->refcount > 1) {
188                 class_export_put(exp);
189                 RETURN(0);
190         }
191
192         lmv->exp = exp;
193         lmv->connected = 0;
194         lmv->cluuid = *cluuid;
195
196         /* saving */
197         if (data)
198                 memcpy(&lmv->conn_data, data, sizeof(*data));
199
200 #ifdef __KERNEL__
201         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
202                                         NULL, NULL);
203         if (IS_ERR(lmv_proc_dir)) {
204                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
205                        obd->obd_type->typ_name, obd->obd_name);
206                 lmv_proc_dir = NULL;
207         }
208 #endif
209
210         /* all real clients should perform actual connection right away, because
211          * it is possible, that LMV will not have opportunity to connect targets
212          * and MDC stuff will be called directly, for instance while reading
213          * ../mdc/../kbytesfree procfs file, etc. */
214         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
215                 rc = lmv_check_connect(obd);
216
217 #ifdef __KERNEL__
218         if (rc) {
219                 if (lmv_proc_dir)
220                         lprocfs_remove(lmv_proc_dir);
221         }
222 #endif
223
224         RETURN(rc);
225 }
226
227 static void lmv_set_timeouts(struct obd_device *obd)
228 {
229         struct lmv_tgt_desc *tgts;
230         struct lmv_obd *lmv;
231         int i;
232
233         lmv = &obd->u.lmv;
234         if (lmv->server_timeout == 0)
235                 return;
236
237         if (lmv->connected == 0)
238                 return;
239
240         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
241                 if (tgts->ltd_exp == NULL)
242                         continue;
243
244                 obd_set_info_async(tgts->ltd_exp, strlen("inter_mds"),
245                                    "inter_mds", 0, NULL, NULL);
246         }
247 }
248
249 static int lmv_init_ea_size(struct obd_export *exp, int easize,
250                             int def_easize, int cookiesize)
251 {
252         struct obd_device *obd = exp->exp_obd;
253         struct lmv_obd *lmv = &obd->u.lmv;
254         int i, rc = 0, change = 0;
255         ENTRY;
256
257         if (lmv->max_easize < easize) {
258                 lmv->max_easize = easize;
259                 change = 1;
260         }
261         if (lmv->max_def_easize < def_easize) {
262                 lmv->max_def_easize = def_easize;
263                 change = 1;
264         }
265         if (lmv->max_cookiesize < cookiesize) {
266                 lmv->max_cookiesize = cookiesize;
267                 change = 1;
268         }
269         if (change == 0)
270                 RETURN(0);
271
272         if (lmv->connected == 0)
273                 RETURN(0);
274
275         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
276                 if (lmv->tgts[i].ltd_exp == NULL) {
277                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
278                         continue;
279                 }
280
281                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
282                                      cookiesize);
283                 if (rc) {
284                         CERROR("obd_init_ea_size() failed on MDT target %d, "
285                                "error %d.\n", i, rc);
286                         break;
287                 }
288         }
289         RETURN(rc);
290 }
291
292 #define MAX_STRING_SIZE 128
293
294 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
295 {
296         struct lmv_obd *lmv = &obd->u.lmv;
297         struct obd_uuid *cluuid = &lmv->cluuid;
298         struct obd_connect_data *mdc_data = NULL;
299         struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
300         struct lustre_handle conn = {0, };
301         struct obd_device *mdc_obd;
302         struct obd_export *mdc_exp;
303         int rc;
304 #ifdef __KERNEL__
305         struct proc_dir_entry *lmv_proc_dir;
306 #endif
307         ENTRY;
308
309         /* for MDS: don't connect to yourself */
310         if (obd_uuid_equals(&tgt->uuid, cluuid)) {
311                 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
312                 /* XXX - the old code didn't increment active tgt count.
313                  *       should we ? */
314                 RETURN(0);
315         }
316
317         mdc_obd = class_find_client_obd(&tgt->uuid, LUSTRE_MDC_NAME,
318                                         &obd->obd_uuid);
319         if (!mdc_obd) {
320                 CERROR("target %s not attached\n", tgt->uuid.uuid);
321                 RETURN(-EINVAL);
322         }
323
324         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
325                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
326                 tgt->uuid.uuid, obd->obd_uuid.uuid,
327                 cluuid->uuid);
328
329         if (!mdc_obd->obd_set_up) {
330                 CERROR("target %s not set up\n", tgt->uuid.uuid);
331                 RETURN(-EINVAL);
332         }
333
334         rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid, &lmv->conn_data);
335         if (rc) {
336                 CERROR("target %s connect error %d\n", tgt->uuid.uuid, rc);
337                 RETURN(rc);
338         }
339
340         mdc_exp = class_conn2export(&conn);
341         fld_client_add_target(&lmv->lmv_fld, mdc_exp);
342
343         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
344
345         rc = obd_register_observer(mdc_obd, obd);
346         if (rc) {
347                 obd_disconnect(mdc_exp);
348                 CERROR("target %s register_observer error %d\n",
349                        tgt->uuid.uuid, rc);
350                 RETURN(rc);
351         }
352
353         if (obd->obd_observer) {
354                 /* tell the mds_lmv about the new target */
355                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
356                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
357                 if (rc) {
358                         obd_disconnect(mdc_exp);
359                         RETURN(rc);
360                 }
361         }
362
363         tgt->active = 1;
364         tgt->ltd_exp = mdc_exp;
365         lmv->desc.ld_active_tgt_count++;
366
367         /* copy connect data, it may be used later */
368         lmv->datas[tgt->idx] = *mdc_data;
369
370         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
371                         lmv->max_def_easize, lmv->max_cookiesize);
372
373         CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
374                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
375                 atomic_read(&obd->obd_refcount));
376
377 #ifdef __KERNEL__
378         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
379         if (lmv_proc_dir) {
380                 struct proc_dir_entry *mdc_symlink;
381                 char name[MAX_STRING_SIZE + 1];
382
383                 LASSERT(mdc_obd->obd_type != NULL);
384                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
385                 name[MAX_STRING_SIZE] = '\0';
386                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
387                          mdc_obd->obd_type->typ_name,
388                          mdc_obd->obd_name);
389                 mdc_symlink = proc_symlink(mdc_obd->obd_name,
390                                            lmv_proc_dir, name);
391                 if (mdc_symlink == NULL) {
392                         CERROR("could not register LMV target "
393                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
394                                obd->obd_type->typ_name, obd->obd_name,
395                                mdc_obd->obd_name);
396                         lprocfs_remove(lmv_proc_dir);
397                         lmv_proc_dir = NULL;
398                 }
399         }
400 #endif
401         RETURN(0);
402 }
403
404 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
405 {
406         struct lmv_obd *lmv = &obd->u.lmv;
407         struct lmv_tgt_desc *tgt;
408         int rc = 0;
409         ENTRY;
410
411         CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
412
413         lmv_init_lock(lmv);
414
415         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
416                 lmv_init_unlock(lmv);
417                 CERROR("can't add %s, LMV module compiled for %d MDCs. "
418                        "That many MDCs already configured.\n",
419                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
420                 RETURN(-EINVAL);
421         }
422         if (lmv->desc.ld_tgt_count == 0) {
423                 struct obd_device *mdc_obd;
424
425                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
426                                                 &obd->obd_uuid);
427                 if (!mdc_obd) {
428                         lmv_init_unlock(lmv);
429                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
430                         RETURN(-EINVAL);
431                 }
432
433                 rc = obd_llog_init(obd, mdc_obd, 0, NULL);
434                 if (rc) {
435                         lmv_init_unlock(lmv);
436                         CERROR("lmv failed to setup llogging subsystems\n");
437                 }
438         }
439         spin_lock(&lmv->lmv_lock);
440         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
441         tgt->uuid = *tgt_uuid;
442         spin_unlock(&lmv->lmv_lock);
443
444         if (lmv->connected) {
445                 rc = lmv_connect_mdc(obd, tgt);
446                 if (rc) {
447                         spin_lock(&lmv->lmv_lock);
448                         lmv->desc.ld_tgt_count--;
449                         memset(tgt, 0, sizeof(*tgt));
450                         spin_unlock(&lmv->lmv_lock);
451                 } else {
452                         int easize = sizeof(struct lmv_stripe_md) +
453                                      lmv->desc.ld_tgt_count *
454                                      sizeof(struct lu_fid);
455                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
456                 }
457         }
458
459         lmv_init_unlock(lmv);
460         RETURN(rc);
461 }
462
463 /* performs a check if passed obd is connected. If no - connect it. */
464 int lmv_check_connect(struct obd_device *obd)
465 {
466         struct lmv_obd *lmv = &obd->u.lmv;
467         struct lmv_tgt_desc *tgt;
468         int i, rc, easize;
469         ENTRY;
470
471         if (lmv->connected)
472                 RETURN(0);
473
474         lmv_init_lock(lmv);
475         if (lmv->connected) {
476                 lmv_init_unlock(lmv);
477                 RETURN(0);
478         }
479
480         if (lmv->desc.ld_tgt_count == 0) {
481                 CERROR("%s: no targets configured.\n", obd->obd_name);
482                 RETURN(-EINVAL);
483         }
484
485         CDEBUG(D_CONFIG, "time to connect %s to %s\n",
486                lmv->cluuid.uuid, obd->obd_name);
487
488         LASSERT(lmv->tgts != NULL);
489
490         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
491                 rc = lmv_connect_mdc(obd, tgt);
492                 if (rc)
493                         GOTO(out_disc, rc);
494         }
495
496         lmv_set_timeouts(obd);
497         class_export_put(lmv->exp);
498         lmv->connected = 1;
499         easize = lmv->desc.ld_tgt_count * sizeof(struct lu_fid) +
500                  sizeof(struct lmv_stripe_md);
501         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
502         lmv_init_unlock(lmv);
503         RETURN(0);
504
505  out_disc:
506         while (i-- > 0) {
507                 int rc2;
508                 --tgt;
509                 tgt->active = 0;
510                 if (tgt->ltd_exp) {
511                         --lmv->desc.ld_active_tgt_count;
512                         rc2 = obd_disconnect(tgt->ltd_exp);
513                         if (rc2) {
514                                 CERROR("error: LMV target %s disconnect on "
515                                        "MDC idx %d: error %d\n",
516                                        tgt->uuid.uuid, i, rc2);
517                         }
518                 }
519         }
520         class_disconnect(lmv->exp);
521         lmv_init_unlock(lmv);
522         RETURN(rc);
523 }
524
525 static int lmv_disconnect(struct obd_export *exp)
526 {
527         struct obd_device *obd = class_exp2obd(exp);
528         struct lmv_obd *lmv = &obd->u.lmv;
529
530 #ifdef __KERNEL__
531         struct proc_dir_entry *lmv_proc_dir;
532 #endif
533         int rc, i;
534         ENTRY;
535
536         if (!lmv->tgts)
537                 goto out_local;
538
539         /* Only disconnect the underlying layers on the final disconnect. */
540         lmv->refcount--;
541         if (lmv->refcount != 0)
542                 goto out_local;
543
544 #ifdef __KERNEL__
545         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
546 #endif
547
548         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
549                 struct obd_device *mdc_obd;
550
551                 if (lmv->tgts[i].ltd_exp == NULL)
552                         continue;
553
554                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
555
556                 if (mdc_obd)
557                         mdc_obd->obd_no_recov = obd->obd_no_recov;
558
559 #ifdef __KERNEL__
560                 if (lmv_proc_dir) {
561                         struct proc_dir_entry *mdc_symlink;
562
563                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
564                         if (mdc_symlink) {
565                                 lprocfs_remove(mdc_symlink);
566                         } else {
567                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
568                                        obd->obd_type->typ_name, obd->obd_name,
569                                        mdc_obd->obd_name);
570                         }
571                 }
572 #endif
573                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
574                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
575                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
576
577                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
578                 rc = obd_disconnect(lmv->tgts[i].ltd_exp);
579                 if (rc) {
580                         if (lmv->tgts[i].active) {
581                                 CERROR("Target %s disconnect error %d\n",
582                                        lmv->tgts[i].uuid.uuid, rc);
583                         }
584                         rc = 0;
585                 }
586
587                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
588                 lmv->tgts[i].ltd_exp = NULL;
589         }
590
591 #ifdef __KERNEL__
592         if (lmv_proc_dir) {
593                 lprocfs_remove(lmv_proc_dir);
594         } else {
595                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
596                        obd->obd_type->typ_name, obd->obd_name);
597         }
598 #endif
599
600 out_local:
601         /* this is the case when no real connection is established by
602          * lmv_check_connect(). */
603         if (!lmv->connected)
604                 class_export_put(exp);
605         rc = class_disconnect(exp);
606         if (lmv->refcount == 0)
607                 lmv->connected = 0;
608         RETURN(rc);
609 }
610
611 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
612                          int len, void *karg, void *uarg)
613 {
614         struct obd_device *obddev = class_exp2obd(exp);
615         struct lmv_obd *lmv = &obddev->u.lmv;
616         int i, rc = 0, set = 0;
617         ENTRY;
618
619         if (lmv->desc.ld_tgt_count == 0)
620                 RETURN(-ENOTTY);
621
622         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
623                 int err;
624
625                 if (lmv->tgts[i].ltd_exp == NULL)
626                         continue;
627
628                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
629                 if (err) {
630                         if (lmv->tgts[i].active) {
631                                 CERROR("error: iocontrol MDC %s on MDT"
632                                        "idx %d: err = %d\n",
633                                        lmv->tgts[i].uuid.uuid, i, err);
634                                 if (!rc)
635                                         rc = err;
636                         }
637                 } else
638                         set = 1;
639         }
640         if (!set && !rc)
641                 rc = -EIO;
642
643         RETURN(rc);
644 }
645
646 static int lmv_fids_balanced(struct obd_device *obd)
647 {
648         ENTRY;
649         RETURN(0);
650 }
651
652 /* returns number of target where new fid should be allocated using passed @hint
653  * as input data for making decision. */
654 static int lmv_placement_policy(struct obd_device *obd,
655                                 struct lu_placement_hint *hint)
656 {
657         struct lmv_obd *lmv = &obd->u.lmv;
658         ENTRY;
659
660         /* here are some policies to allocate new fid */
661         if (hint->ph_cname && lmv_fids_balanced(obd)) {
662                 /* allocate new fid basing on its name in the case fids are
663                  * balanced, that is all sequences have more or less equal
664                  * number of objects created. */
665         } else {
666                 /* sequences among all tgts are not well balanced, allocate new
667                  * fid taking this into account to balance them. */
668         }
669         //stub to place new dir on second MDS
670         if (hint->ph_opc == LUSTRE_OPC_MKDIR)
671                 RETURN(lmv->desc.ld_tgt_count - 1);
672
673         RETURN(0);
674 }
675
676 static int lmv_fid_init(struct obd_export *exp)
677 {
678         struct obd_device *obd = class_exp2obd(exp);
679         struct lmv_obd *lmv = &obd->u.lmv;
680         int i, rc = 0;
681         ENTRY;
682
683         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
684                 if (lmv->tgts[i].ltd_exp == NULL)
685                         continue;
686
687                 rc = obd_fid_init(lmv->tgts[i].ltd_exp);
688                 if (rc)
689                         RETURN(rc);
690         }
691         RETURN(rc);
692 }
693
694 static int lmv_fid_fini(struct obd_export *exp)
695 {
696         struct obd_device *obd = class_exp2obd(exp);
697         struct lmv_obd *lmv = &obd->u.lmv;
698         int i, rc = 0;
699         ENTRY;
700
701         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
702                 if (lmv->tgts[i].ltd_exp == NULL)
703                         continue;
704
705                 rc = obd_fid_fini(lmv->tgts[i].ltd_exp);
706                 if (rc)
707                         break;
708         }
709         RETURN(rc);
710 }
711
712 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
713                          struct lu_placement_hint *hint)
714 {
715         struct obd_device *obd = class_exp2obd(exp);
716         struct lmv_obd *lmv = &obd->u.lmv;
717         int rc = 0, mds;
718         ENTRY;
719
720         LASSERT(fid != NULL);
721         LASSERT(hint != NULL);
722
723         mds = lmv_placement_policy(obd, hint);
724         if (mds < 0 || mds >= lmv->desc.ld_tgt_count) {
725                 CERROR("can't get target for allocating fid\n");
726                 RETURN(-EINVAL);
727         }
728
729         /* asking underlaying tgt layer to allocate new fid */
730         rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
731
732         /* client switches to new sequence, setup fld */
733         if (rc > 0) {
734                 LASSERT(fid_is_sane(fid));
735                 
736                 rc = fld_client_create(&lmv->lmv_fld,
737                                        fid_seq(fid),
738                                        mds);
739                 if (rc) {
740                         CERROR("can't create fld entry, "
741                                "rc %d\n", rc);
742                 }
743         }
744
745         RETURN(rc);
746 }
747
748 static int lmv_fid_delete(struct obd_export *exp, struct lu_fid *fid)
749 {
750         ENTRY;
751
752         LASSERT(exp && fid);
753         if (lmv_obj_delete(exp, fid)) {
754                 CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
755                        PFID(fid));
756         }
757         RETURN(0);
758 }
759
760 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
761 {
762         struct lmv_obd *lmv = &obd->u.lmv;
763         struct lprocfs_static_vars lvars;
764         struct lmv_desc *desc;
765         int rc, i = 0;
766         ENTRY;
767
768         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
769                 CERROR("LMV setup requires a descriptor\n");
770                 RETURN(-EINVAL);
771         }
772
773         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
774         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
775                 CERROR("descriptor size wrong: %d > %d\n",
776                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
777                 RETURN(-EINVAL);
778         }
779
780         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
781
782         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
783         if (lmv->tgts == NULL)
784                 RETURN(-ENOMEM);
785
786         for (i = 0; i < LMV_MAX_TGT_COUNT; i++)
787                 lmv->tgts[i].idx = i;
788
789         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
790
791         OBD_ALLOC(lmv->datas, lmv->datas_size);
792         if (lmv->datas == NULL)
793                 GOTO(out_free_tgts, rc = -ENOMEM);
794
795         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
796         lmv->desc.ld_tgt_count = 0;
797         lmv->desc.ld_active_tgt_count = 0;
798         lmv->max_cookiesize = 0;
799         lmv->max_def_easize = 0;
800         lmv->max_easize = 0;
801
802         spin_lock_init(&lmv->lmv_lock);
803         sema_init(&lmv->init_sem, 1);
804
805         rc = lmv_mgr_setup(obd);
806         if (rc) {
807                 CERROR("Can't setup LMV object manager, "
808                        "error %d.\n", rc);
809                 GOTO(out_free_datas, rc);
810         }
811
812         lprocfs_init_vars(lmv, &lvars);
813         lprocfs_obd_setup(obd, lvars.obd_vars);
814 #ifdef LPROCFS
815         {
816                 struct proc_dir_entry *entry;
817
818                 entry = create_proc_entry("target_obd_status", 0444,
819                                           obd->obd_proc_entry);
820                 if (entry != NULL) {
821                         entry->proc_fops = &lmv_proc_target_fops;
822                         entry->data = obd;
823                 }
824        }
825 #endif
826         rc = fld_client_init(&lmv->lmv_fld,
827                              "LMV_UUID", LUSTRE_CLI_FLD_HASH_RRB);
828         if (rc) {
829                 CERROR("can't init FLD, err %d\n",
830                        rc);
831                 GOTO(out_free_datas, rc);
832         }
833
834         RETURN(0);
835
836 out_free_datas:
837         OBD_FREE(lmv->datas, lmv->datas_size);
838         lmv->datas = NULL;
839 out_free_tgts:
840         OBD_FREE(lmv->tgts, lmv->tgts_size);
841         lmv->tgts = NULL;
842         return rc;
843 }
844
845 static int lmv_cleanup(struct obd_device *obd)
846 {
847         struct lmv_obd *lmv = &obd->u.lmv;
848         ENTRY;
849
850         lprocfs_obd_cleanup(obd);
851         lmv_mgr_cleanup(obd);
852         fld_client_fini(&lmv->lmv_fld);
853         OBD_FREE(lmv->datas, lmv->datas_size);
854         OBD_FREE(lmv->tgts, lmv->tgts_size);
855
856         RETURN(0);
857 }
858
859 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
860 {
861         struct lustre_cfg *lcfg = buf;
862         struct obd_uuid tgt_uuid;
863         int rc;
864         ENTRY;
865
866         switch(lcfg->lcfg_command) {
867         case LCFG_ADD_MDC:
868                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
869                         GOTO(out, rc = -EINVAL);
870
871                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
872                 rc = lmv_add_target(obd, &tgt_uuid);
873                 GOTO(out, rc);
874         default: {
875                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
876                 GOTO(out, rc = -EINVAL);
877         }
878         }
879 out:
880         RETURN(rc);
881 }
882
883 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
884                       unsigned long max_age)
885 {
886         struct lmv_obd *lmv = &obd->u.lmv;
887         struct obd_statfs *temp;
888         int rc = 0, i;
889         ENTRY;
890
891         rc = lmv_check_connect(obd);
892         if (rc)
893                 RETURN(rc);
894
895         OBD_ALLOC(temp, sizeof(*temp));
896         if (temp == NULL)
897                 RETURN(-ENOMEM);
898
899         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
900                 if (lmv->tgts[i].ltd_exp == NULL)
901                         continue;
902
903                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
904                 if (rc) {
905                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
906                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
907                                rc);
908                         GOTO(out_free_temp, rc);
909                 }
910                 if (i == 0) {
911                         memcpy(osfs, temp, sizeof(*temp));
912                 } else {
913                         osfs->os_bavail += temp->os_bavail;
914                         osfs->os_blocks += temp->os_blocks;
915                         osfs->os_ffree += temp->os_ffree;
916                         osfs->os_files += temp->os_files;
917                 }
918         }
919
920         EXIT;
921 out_free_temp:
922         OBD_FREE(temp, sizeof(*temp));
923         return rc;
924 }
925
926 static int lmv_getstatus(struct obd_export *exp, struct lu_fid *fid)
927 {
928         struct obd_device *obd = exp->exp_obd;
929         struct lmv_obd *lmv = &obd->u.lmv;
930         int rc;
931         ENTRY;
932
933         rc = lmv_check_connect(obd);
934         if (rc)
935                 RETURN(rc);
936
937         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid);
938
939         RETURN(rc);
940 }
941 static int lmv_getxattr(struct obd_export *exp, struct lu_fid *fid,
942                         obd_valid valid, const char *name,
943                         const char *input, int input_size,
944                         int output_size, int flags,
945                         struct ptlrpc_request **request)
946 {
947         struct obd_device *obd = exp->exp_obd;
948         struct lmv_obd *lmv = &obd->u.lmv;
949         int rc, i;
950         ENTRY;
951
952         rc = lmv_check_connect(obd);
953         if (rc)
954                 RETURN(rc);
955
956         i = lmv_fld_lookup(obd, fid);
957         if (i < 0)
958                 RETURN(i);
959
960         LASSERT(i < lmv->desc.ld_tgt_count);
961
962         rc = md_getxattr(lmv->tgts[i].ltd_exp, fid, valid, name,
963                          input, input_size, output_size, flags, request);
964
965         RETURN(rc);
966 }
967
968 static int lmv_setxattr(struct obd_export *exp, struct lu_fid *fid,
969                         obd_valid valid, const char *name,
970                         const char *input, int input_size,
971                         int output_size, int flags,
972                         struct ptlrpc_request **request)
973 {
974         struct obd_device *obd = exp->exp_obd;
975         struct lmv_obd *lmv = &obd->u.lmv;
976         int rc, i;
977         ENTRY;
978
979         rc = lmv_check_connect(obd);
980         if (rc)
981                 RETURN(rc);
982
983         i = lmv_fld_lookup(obd, fid);
984         if (i < 0)
985                 RETURN(i);
986
987         LASSERT(i < lmv->desc.ld_tgt_count);
988
989         rc = md_setxattr(lmv->tgts[i].ltd_exp, fid, valid, name,
990                          input, input_size, output_size, flags, request);
991         
992         RETURN(rc);
993 }
994
995 static int lmv_getattr(struct obd_export *exp, struct lu_fid *fid,
996                        obd_valid valid, int ea_size,
997                        struct ptlrpc_request **request)
998 {
999         struct obd_device *obd = exp->exp_obd;
1000         struct lmv_obd *lmv = &obd->u.lmv;
1001         struct lmv_obj *obj;
1002         int rc, i;
1003         ENTRY;
1004
1005         rc = lmv_check_connect(obd);
1006         if (rc)
1007                 RETURN(rc);
1008
1009         i = lmv_fld_lookup(obd, fid);
1010         if (i < 0)
1011                 RETURN(i);
1012
1013         LASSERT(i < lmv->desc.ld_tgt_count);
1014
1015         rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid,
1016                         ea_size, request);
1017         if (rc)
1018                 RETURN(rc);
1019
1020         obj = lmv_obj_grab(obd, fid);
1021
1022         CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
1023                PFID(fid), obj ? "(splitted)" : "");
1024
1025         /* if object is splitted, then we loop over all the slaves and gather
1026          * size attribute. In ideal world we would have to gather also mds field
1027          * from all slaves, as object is spread over the cluster and this is
1028          * definitely interesting information and it is not good to loss it,
1029          * but... */
1030         if (obj) {
1031                 struct mdt_body *body;
1032
1033                 if (*request == NULL) {
1034                         lmv_obj_put(obj);
1035                         RETURN(rc);
1036                 }
1037
1038                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
1039                                       sizeof(*body));
1040                 LASSERT(body != NULL);
1041
1042                 lmv_obj_lock(obj);
1043
1044                 for (i = 0; i < obj->lo_objcount; i++) {
1045
1046                         if (lmv->tgts[i].ltd_exp == NULL) {
1047                                 CWARN("%s: NULL export for %d\n",
1048                                       obd->obd_name, i);
1049                                 continue;
1050                         }
1051
1052                         /* skip master obj. */
1053                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
1054                                 continue;
1055
1056                         body->size += obj->lo_inodes[i].li_size;
1057                 }
1058
1059                 lmv_obj_unlock(obj);
1060                 lmv_obj_put(obj);
1061         }
1062
1063         RETURN(rc);
1064 }
1065
1066 static int lmv_change_cbdata(struct obd_export *exp,
1067                              struct lu_fid *fid,
1068                              ldlm_iterator_t it,
1069                              void *data)
1070 {
1071         struct obd_device *obd = exp->exp_obd;
1072         struct lmv_obd *lmv = &obd->u.lmv;
1073         int i, rc;
1074         ENTRY;
1075
1076         rc = lmv_check_connect(obd);
1077         if (rc)
1078                 RETURN(rc);
1079
1080         CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
1081
1082         /* with CMD every object can have two locks in different namespaces:
1083          * lookup lock in space of mds storing direntry and update/open lock in
1084          * space of mds storing inode */
1085         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1086                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1087
1088         RETURN(0);
1089 }
1090
1091 static int lmv_close(struct obd_export *exp,
1092                      struct md_op_data *op_data,
1093                      struct obd_client_handle *och,
1094                      struct ptlrpc_request **request)
1095 {
1096         struct obd_device *obd = exp->exp_obd;
1097         struct lmv_obd *lmv = &obd->u.lmv;
1098         int rc, i;
1099         ENTRY;
1100
1101         rc = lmv_check_connect(obd);
1102         if (rc)
1103                 RETURN(rc);
1104
1105         i = lmv_fld_lookup(obd, &op_data->fid1);
1106         if (i < 0)
1107                 RETURN(i);
1108
1109         LASSERT(i < lmv->desc.ld_tgt_count);
1110         CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->fid1));
1111         rc = md_close(lmv->tgts[i].ltd_exp, op_data, och, request);
1112         RETURN(rc);
1113 }
1114
1115 /* called in the case MDS returns -ERESTART on create on open, what means that
1116  * directory is splitted and its LMV presentation object has to be updated. */
1117 int lmv_handle_split(struct obd_export *exp, struct lu_fid *fid)
1118 {
1119         struct obd_device *obd = exp->exp_obd;
1120         struct lmv_obd *lmv = &obd->u.lmv;
1121         struct ptlrpc_request *req = NULL;
1122         struct lmv_obj *obj;
1123         struct lustre_md md;
1124         int mealen, rc, i;
1125         __u64 valid;
1126         ENTRY;
1127
1128         md.mea = NULL;
1129         mealen = MEA_SIZE_LMV(lmv);
1130
1131         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1132
1133         i = lmv_fld_lookup(obd, fid);
1134         if (i < 0)
1135                 RETURN(i);
1136
1137         LASSERT(i < lmv->desc.ld_tgt_count);
1138
1139         /* time to update mea of parent fid */
1140         rc = md_getattr(lmv->tgts[i].ltd_exp, fid, valid,
1141                         mealen, &req);
1142         if (rc) {
1143                 CERROR("md_getattr() failed, error %d\n", rc);
1144                 GOTO(cleanup, rc);
1145         }
1146
1147         rc = md_get_lustre_md(lmv->tgts[i].ltd_exp, req, 0,
1148                               NULL, &md);
1149         if (rc) {
1150                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
1151                 GOTO(cleanup, rc);
1152         }
1153
1154         if (md.mea == NULL)
1155                 GOTO(cleanup, rc = -ENODATA);
1156
1157         obj = lmv_obj_create(exp, fid, md.mea);
1158         if (IS_ERR(obj))
1159                 rc = PTR_ERR(obj);
1160         else
1161                 lmv_obj_put(obj);
1162
1163         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1164
1165         EXIT;
1166 cleanup:
1167         if (req)
1168                 ptlrpc_req_finished(req);
1169         return rc;
1170 }
1171
1172 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1173                const void *data, int datalen, int mode, __u32 uid,
1174                __u32 gid, __u32 cap_effective,  __u64 rdev,
1175                struct ptlrpc_request **request)
1176 {
1177         struct obd_device *obd = exp->exp_obd;
1178         struct lmv_obd *lmv = &obd->u.lmv;
1179         struct mdt_body *body;
1180         struct lmv_obj *obj;
1181         int rc, mds, loop = 0;
1182         ENTRY;
1183
1184         rc = lmv_check_connect(obd);
1185         if (rc)
1186                 RETURN(rc);
1187
1188         if (!lmv->desc.ld_active_tgt_count)
1189                 RETURN(-EIO);
1190 repeat:
1191         LASSERT(++loop <= 2);
1192         obj = lmv_obj_grab(obd, &op_data->fid1);
1193         if (obj) {
1194                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1195                                    op_data->name, op_data->namelen);
1196                 op_data->fid1 = obj->lo_inodes[mds].li_fid;
1197                 lmv_obj_put(obj);
1198         }
1199
1200         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->namelen,
1201                op_data->name, PFID(&op_data->fid1));
1202
1203         mds = lmv_fld_lookup(obd, &op_data->fid1);
1204         if (mds < 0)
1205                 RETURN(mds);
1206
1207         rc = md_create(lmv->tgts[mds].ltd_exp, op_data, data, datalen,
1208                        mode, uid, gid, cap_effective, rdev, request);
1209         if (rc == 0) {
1210                 if (*request == NULL)
1211                         RETURN(rc);
1212
1213                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
1214                                       sizeof(*body));
1215                 if (body == NULL)
1216                         RETURN(-ENOMEM);
1217
1218                 CDEBUG(D_OTHER, "created. "DFID"\n", PFID(&op_data->fid1));
1219         } else if (rc == -ERESTART) {
1220                 /* directory got splitted. time to update local object and
1221                  * repeat the request with proper MDS. */
1222                 rc = lmv_handle_split(exp, &op_data->fid1);
1223                 if (rc == 0) {
1224                         ptlrpc_req_finished(*request);
1225                         goto repeat;
1226                 }
1227         }
1228         RETURN(rc);
1229 }
1230
1231 static int lmv_done_writing(struct obd_export *exp,
1232                             struct md_op_data *op_data)
1233 {
1234         struct obd_device *obd = exp->exp_obd;
1235         struct lmv_obd *lmv = &obd->u.lmv;
1236         int rc, mds;
1237         ENTRY;
1238
1239         rc = lmv_check_connect(obd);
1240         if (rc)
1241                 RETURN(rc);
1242
1243         mds = lmv_fld_lookup(obd, &op_data->fid1);
1244         if (mds < 0)
1245                 RETURN(mds);
1246         rc = md_done_writing(lmv->tgts[mds].ltd_exp, op_data);
1247         RETURN(rc);
1248 }
1249
1250 static int
1251 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1252                    struct lookup_intent *it, int lockmode,
1253                    struct md_op_data *op_data, struct lustre_handle *lockh,
1254                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1255                    ldlm_blocking_callback cb_blocking, void *cb_data)
1256 {
1257         struct obd_device *obd = exp->exp_obd;
1258         struct lmv_obd *lmv = &obd->u.lmv;
1259         struct lmv_stripe_md *mea = op_data->mea1;
1260         struct md_op_data *op_data2;
1261         int i, rc = 0, mds;
1262         ENTRY;
1263
1264         OBD_ALLOC_PTR(op_data2);
1265         if (op_data2 == NULL)
1266                 RETURN(-ENOMEM);
1267
1268         LASSERT(mea != NULL);
1269         for (i = 0; i < mea->mea_count; i++) {
1270                 memset(op_data2, 0, sizeof(*op_data2));
1271                 op_data2->fid1 = mea->mea_ids[i];
1272                 mds = lmv_fld_lookup(obd, &op_data2->fid1);
1273                 if (mds < 0)
1274                         GOTO(cleanup, rc = mds);
1275
1276                 if (lmv->tgts[mds].ltd_exp == NULL)
1277                         continue;
1278
1279                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it,
1280                                 lockmode, op_data2, lockh + i, lmm, lmmsize,
1281                                 cb_compl, cb_blocking, cb_data, 0);
1282
1283                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
1284                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
1285                 if (rc)
1286                         GOTO(cleanup, rc);
1287                 if (it->d.lustre.it_data) {
1288                         struct ptlrpc_request *req;
1289                         req = (struct ptlrpc_request *)it->d.lustre.it_data;
1290                         ptlrpc_req_finished(req);
1291                 }
1292
1293                 if (it->d.lustre.it_status)
1294                         GOTO(cleanup, rc = it->d.lustre.it_status);
1295         }
1296
1297         EXIT;
1298 cleanup:
1299         OBD_FREE_PTR(op_data2);
1300
1301         if (rc != 0) {
1302                 /* drop all taken locks */
1303                 while (--i >= 0) {
1304                         if (lockh[i].cookie)
1305                                 ldlm_lock_decref(lockh + i, lockmode);
1306                         lockh[i].cookie = 0;
1307                 }
1308         }
1309         return rc;
1310 }
1311
1312 static int
1313 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1314                    struct lookup_intent *it, int lock_mode,
1315                    struct md_op_data *op_data, struct lustre_handle *lockh,
1316                    void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1317                    ldlm_blocking_callback cb_blocking, void *cb_data,
1318                    int extra_lock_flags)
1319 {
1320         struct ptlrpc_request *req = it->d.lustre.it_data;
1321         struct obd_device *obd = exp->exp_obd;
1322         struct lmv_obd *lmv = &obd->u.lmv;
1323         struct mdt_body *body = NULL;
1324         struct lustre_handle plock;
1325         struct md_op_data *rdata;
1326         int i, rc = 0, pmode;
1327         ENTRY;
1328
1329         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
1330         LASSERT(body != NULL);
1331
1332         if (!(body->valid & OBD_MD_MDS))
1333                 RETURN(0);
1334
1335         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
1336                LL_IT2STR(it), PFID(&op_data->fid1), PFID(&body->fid1));
1337
1338         /* we got LOOKUP lock, but we really need attrs */
1339         pmode = it->d.lustre.it_lock_mode;
1340         LASSERT(pmode != 0);
1341         memcpy(&plock, lockh, sizeof(plock));
1342         it->d.lustre.it_lock_mode = 0;
1343         it->d.lustre.it_data = NULL;
1344
1345         OBD_ALLOC_PTR(rdata);
1346         if (rdata == NULL)
1347                 RETURN(-ENOMEM);
1348         rdata->fid1 = body->fid1;
1349         rdata->name = NULL;
1350         rdata->namelen = 0;
1351
1352         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1353         ptlrpc_req_finished(req);
1354
1355         i = lmv_fld_lookup(obd, &rdata->fid1);
1356         if (i < 0)
1357                 GOTO(out_free_rdata, rc = i);
1358         rc = md_enqueue(lmv->tgts[i].ltd_exp,
1359                         lock_type, it, lock_mode, rdata, lockh, lmm,
1360                         lmmsize, cb_compl, cb_blocking, cb_data,
1361                         extra_lock_flags);
1362         ldlm_lock_decref(&plock, pmode);
1363
1364         EXIT;
1365 out_free_rdata:
1366         OBD_FREE_PTR(rdata);
1367         return rc;
1368 }
1369
1370 static int
1371 lmv_enqueue(struct obd_export *exp, int lock_type,
1372             struct lookup_intent *it, int lock_mode,
1373             struct md_op_data *op_data, struct lustre_handle *lockh,
1374             void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1375             ldlm_blocking_callback cb_blocking, void *cb_data,
1376             int extra_lock_flags)
1377 {
1378         struct obd_device *obd = exp->exp_obd;
1379         struct lmv_obd *lmv = &obd->u.lmv;
1380         struct lmv_obj *obj;
1381         int rc, mds;
1382         ENTRY;
1383
1384         rc = lmv_check_connect(obd);
1385         if (rc)
1386                 RETURN(rc);
1387
1388         if (op_data->mea1 && it->it_op == IT_UNLINK) {
1389                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1390                                         op_data, lockh, lmm, lmmsize,
1391                                         cb_compl, cb_blocking, cb_data);
1392                 RETURN(rc);
1393         }
1394
1395         if (op_data->namelen) {
1396                 obj = lmv_obj_grab(obd, &op_data->fid1);
1397                 if (obj) {
1398                         /* directory is splitted. look for right mds for this
1399                          * name */
1400                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1401                                            (char *)op_data->name, op_data->namelen);
1402                         op_data->fid1 = obj->lo_inodes[mds].li_fid;
1403                         lmv_obj_put(obj);
1404                 }
1405         }
1406         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
1407                PFID(&op_data->fid1));
1408
1409         mds = lmv_fld_lookup(obd, &op_data->fid1);
1410         if (mds < 0)
1411                 RETURN(mds);
1412         rc = md_enqueue(lmv->tgts[mds].ltd_exp,
1413                         lock_type, it, lock_mode, op_data, lockh, lmm,
1414                         lmmsize, cb_compl, cb_blocking, cb_data,
1415                         extra_lock_flags);
1416         if (rc == 0 && it->it_op == IT_OPEN)
1417                 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1418                                         op_data, lockh, lmm, lmmsize,
1419                                         cb_compl, cb_blocking, cb_data,
1420                                         extra_lock_flags);
1421         RETURN(rc);
1422 }
1423
1424 static int
1425 lmv_getattr_name(struct obd_export *exp, struct lu_fid *fid,
1426                  const char *filename, int namelen, obd_valid valid,
1427                  int ea_size, struct ptlrpc_request **request)
1428 {
1429         struct obd_device *obd = exp->exp_obd;
1430         struct lmv_obd *lmv = &obd->u.lmv;
1431         struct lu_fid rid = *fid;
1432         int rc, mds, loop = 0;
1433         struct mdt_body *body;
1434         struct lmv_obj *obj;
1435         ENTRY;
1436
1437         rc = lmv_check_connect(obd);
1438         if (rc)
1439                 RETURN(rc);
1440
1441         mds = lmv_fld_lookup(obd, fid);
1442         if (mds < 0)
1443                 RETURN(mds);
1444 repeat:
1445         LASSERT(++loop <= 2);
1446         obj = lmv_obj_grab(obd, fid);
1447         if (obj) {
1448                 /* directory is splitted. look for right mds for this name */
1449                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1450                                    filename, namelen - 1);
1451                 rid = obj->lo_inodes[mds].li_fid;
1452                 lmv_obj_put(obj);
1453         }
1454
1455         CDEBUG(D_OTHER, "getattr_lock for %*s on "DFID" -> "DFID"\n",
1456                namelen, filename, PFID(fid), PFID(&rid));
1457
1458         mds = lmv_fld_lookup(obd, &rid);
1459         if (mds < 0)
1460                 RETURN(mds);
1461
1462         rc = md_getattr_name(lmv->tgts[mds].ltd_exp,
1463                              &rid, filename, namelen,
1464                              valid, ea_size, request);
1465         if (rc == 0) {
1466                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
1467                 LASSERT(body != NULL);
1468
1469                 if (body->valid & OBD_MD_MDS) {
1470                         struct ptlrpc_request *req = NULL;
1471
1472                         rid = body->fid1;
1473                         CDEBUG(D_OTHER, "request attrs for "DFID"\n", PFID(&rid));
1474
1475                         /*
1476                          * XXX check for error.
1477                          */
1478                         mds = lmv_fld_lookup(obd, &rid);
1479                         rc = md_getattr_name(lmv->tgts[mds].ltd_exp,
1480                                              &rid, NULL, 1, valid, ea_size, &req);
1481                         ptlrpc_req_finished(*request);
1482                         *request = req;
1483                 }
1484         } else if (rc == -ERESTART) {
1485                 /* directory got splitted. time to update local object and
1486                  * repeat the request with proper MDS */
1487                 rc = lmv_handle_split(exp, &rid);
1488                 if (rc == 0) {
1489                         ptlrpc_req_finished(*request);
1490                         goto repeat;
1491                 }
1492         }
1493         RETURN(rc);
1494 }
1495
1496 /*
1497  * llite passes fid of an target inode in op_data->fid1 and id of directory in
1498  * op_data->fid2
1499  */
1500 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1501                     struct ptlrpc_request **request)
1502 {
1503         struct obd_device *obd = exp->exp_obd;
1504         struct lmv_obd *lmv = &obd->u.lmv;
1505         struct lmv_obj *obj;
1506         int rc, mds;
1507         ENTRY;
1508
1509         rc = lmv_check_connect(obd);
1510         if (rc)
1511                 RETURN(rc);
1512
1513         if (op_data->namelen != 0) {
1514                 /* usual link request */
1515                 obj = lmv_obj_grab(obd, &op_data->fid2);
1516                 if (obj) {
1517                         rc = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1518                                           op_data->name, op_data->namelen);
1519                         op_data->fid2 = obj->lo_inodes[rc].li_fid;
1520                         lmv_obj_put(obj);
1521                 }
1522
1523                 mds = lmv_fld_lookup(obd, &op_data->fid2);
1524                 if (mds < 0)
1525                         RETURN(mds);
1526
1527                 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
1528                        PFID(&op_data->fid2), op_data->namelen,
1529                        op_data->name, PFID(&op_data->fid1));
1530         } else {
1531                 mds = lmv_fld_lookup(obd, &op_data->fid1);
1532                 if (mds < 0)
1533                         RETURN(mds);
1534
1535                 /* request from MDS to acquire i_links for inode by fid1 */
1536                 CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
1537                        PFID(&op_data->fid1));
1538         }
1539
1540         CDEBUG(D_OTHER, "forward to MDS #%u ("DFID")\n",
1541                mds, PFID(&op_data->fid1));
1542         rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
1543
1544         RETURN(rc);
1545 }
1546
1547 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1548                       const char *old, int oldlen, const char *new, int newlen,
1549                       struct ptlrpc_request **request)
1550 {
1551         struct obd_device *obd = exp->exp_obd;
1552         struct lmv_obd *lmv = &obd->u.lmv;
1553         struct lmv_obj *obj;
1554         int rc, mds, mds2;
1555         ENTRY;
1556
1557         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
1558                oldlen, old, PFID(&op_data->fid1), newlen, new,
1559                PFID(&op_data->fid2));
1560
1561         rc = lmv_check_connect(obd);
1562         if (rc)
1563                 RETURN(rc);
1564
1565         if (oldlen == 0) {
1566                 /*
1567                  * MDS with old dir entry is asking another MDS to create name
1568                  * there.
1569                  */
1570                 CDEBUG(D_OTHER,
1571                        "create %*s(%d/%d) in "DFID" pointing "
1572                        "to "DFID"\n", newlen, new, oldlen, newlen,
1573                        PFID(&op_data->fid2), PFID(&op_data->fid1));
1574
1575                 mds = lmv_fld_lookup(obd, &op_data->fid2);
1576                 if (mds < 0)
1577                         RETURN(mds);
1578
1579                 /*
1580                  * target directory can be splitted, sowe should forward request
1581                  * to the right MDS.
1582                  */
1583                 obj = lmv_obj_grab(obd, &op_data->fid2);
1584                 if (obj) {
1585                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1586                                            (char *)new, newlen);
1587                         op_data->fid2 = obj->lo_inodes[mds].li_fid;
1588                         CDEBUG(D_OTHER, "forward to MDS #%u ("DFID")\n", mds,
1589                                PFID(&op_data->fid2));
1590                         lmv_obj_put(obj);
1591                 }
1592                 goto request;
1593         }
1594
1595         obj = lmv_obj_grab(obd, &op_data->fid1);
1596         if (obj) {
1597                 /*
1598                  * directory is already splitted, so we have to forward request
1599                  * to the right MDS.
1600                  */
1601                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1602                                    (char *)old, oldlen);
1603                 op_data->fid1 = obj->lo_inodes[mds].li_fid;
1604                 CDEBUG(D_OTHER, "forward to MDS #%u ("DFID")\n", mds,
1605                        PFID(&op_data->fid1));
1606                 lmv_obj_put(obj);
1607         }
1608
1609         obj = lmv_obj_grab(obd, &op_data->fid2);
1610         if (obj) {
1611                 /*
1612                  * directory is already splitted, so we have to forward request
1613                  * to the right MDS.
1614                  */
1615                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1616                                    (char *)new, newlen);
1617
1618                 op_data->fid2 = obj->lo_inodes[mds].li_fid;
1619                 CDEBUG(D_OTHER, "forward to MDS #%u ("DFID")\n", mds,
1620                        PFID(&op_data->fid2));
1621                 lmv_obj_put(obj);
1622         }
1623
1624         mds = lmv_fld_lookup(obd, &op_data->fid1);
1625         if (mds < 0)
1626                 RETURN(mds);
1627
1628
1629 request:
1630         mds2 = lmv_fld_lookup(obd, &op_data->fid2);
1631         if (mds2 < 0)
1632                 RETURN(mds2);
1633
1634         if (mds != mds2) {
1635                 CDEBUG(D_OTHER,"cross-node rename "DFID"/%*s to "DFID"/%*s\n",
1636                        PFID(&op_data->fid1), oldlen, old, PFID(&op_data->fid2),
1637                        newlen, new);
1638         }
1639
1640         rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
1641                        new, newlen, request);
1642         RETURN(rc);
1643 }
1644
1645 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1646                        struct iattr *iattr, void *ea, int ealen, void *ea2,
1647                        int ea2len, struct ptlrpc_request **request)
1648 {
1649         struct obd_device *obd = exp->exp_obd;
1650         struct lmv_obd *lmv = &obd->u.lmv;
1651         struct ptlrpc_request *req;
1652         struct mdt_body *body;
1653         struct lmv_obj *obj;
1654         int rc = 0, i, mds;
1655         ENTRY;
1656
1657         rc = lmv_check_connect(obd);
1658         if (rc)
1659                 RETURN(rc);
1660
1661         obj = lmv_obj_grab(obd, &op_data->fid1);
1662
1663         CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
1664                PFID(&op_data->fid1), iattr->ia_valid, obj ? ", splitted" : "");
1665
1666         if (obj) {
1667                 for (i = 0; i < obj->lo_objcount; i++) {
1668                         op_data->fid1 = obj->lo_inodes[i].li_fid;
1669
1670                         mds = lmv_fld_lookup(obd, &op_data->fid1);
1671                         if (mds < 0) {
1672                                 rc = mds;
1673                                 break;
1674                         }
1675
1676                         rc = md_setattr(lmv->tgts[mds].ltd_exp,
1677                                         op_data, iattr, ea, ealen, ea2,
1678                                         ea2len, &req);
1679
1680                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
1681                                 /*
1682                                  * this is master object and this request should
1683                                  * be returned back to llite.
1684                                  */
1685                                 *request = req;
1686                         } else {
1687                                 ptlrpc_req_finished(req);
1688                         }
1689
1690                         if (rc)
1691                                 break;
1692                 }
1693                 lmv_obj_put(obj);
1694         } else {
1695                 mds = lmv_fld_lookup(obd, &op_data->fid1);
1696                 if (mds < 0)
1697                         RETURN(mds);
1698                 LASSERT(mds < lmv->desc.ld_tgt_count);
1699                 rc = md_setattr(lmv->tgts[mds].ltd_exp, op_data, iattr, ea,
1700                                 ealen, ea2, ea2len, request);
1701                 if (rc == 0) {
1702                         body = lustre_msg_buf((*request)->rq_repmsg, 0,
1703                                               sizeof(*body));
1704                         LASSERT(body != NULL);
1705                 }
1706         }
1707         RETURN(rc);
1708 }
1709
1710 static int lmv_sync(struct obd_export *exp, struct lu_fid *fid,
1711                     struct ptlrpc_request **request)
1712 {
1713         struct obd_device *obd = exp->exp_obd;
1714         struct lmv_obd *lmv = &obd->u.lmv;
1715         int i, rc;
1716         ENTRY;
1717
1718         rc = lmv_check_connect(obd);
1719         if (rc)
1720                 RETURN(rc);
1721
1722         i = lmv_fld_lookup(obd, fid);
1723         if (i < 0)
1724                 RETURN(i);
1725         rc = md_sync(lmv->tgts[i].ltd_exp,
1726                      fid, request);
1727         RETURN(rc);
1728 }
1729
1730 /* main purpose of LMV blocking ast is to remove splitted directory
1731  * LMV presentation object (struct lmv_obj) attached to the lock
1732  * being revoked. */
1733 int lmv_blocking_ast(struct ldlm_lock *lock,
1734                      struct ldlm_lock_desc *desc,
1735                      void *data, int flag)
1736 {
1737         struct lustre_handle lockh;
1738         struct lmv_obj *obj;
1739         int rc;
1740         ENTRY;
1741
1742         switch (flag) {
1743         case LDLM_CB_BLOCKING:
1744                 ldlm_lock2handle(lock, &lockh);
1745                 rc = ldlm_cli_cancel(&lockh);
1746                 if (rc < 0) {
1747                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1748                         RETURN(rc);
1749                 }
1750                 break;
1751         case LDLM_CB_CANCELING:
1752                 /* time to drop cached attrs for dirobj */
1753                 obj = lock->l_ast_data;
1754                 if (obj) {
1755                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1756                                ", master "DFID"\n",
1757                                lock->l_resource->lr_name.name[3] == 1 ?
1758                                "LOOKUP" : "UPDATE",
1759                                lock->l_resource->lr_name.name[0],
1760                                lock->l_resource->lr_name.name[1],
1761                                PFID(&obj->lo_fid));
1762                         lmv_obj_put(obj);
1763                 }
1764                 break;
1765         default:
1766                 LBUG();
1767         }
1768         RETURN(0);
1769 }
1770
1771 static void lmv_remove_dots(struct page *page)
1772 {
1773         unsigned limit = PAGE_CACHE_SIZE;
1774         char *kaddr = page_address(page);
1775         struct ext2_dir_entry_2 *p;
1776         unsigned offs, rec_len;
1777
1778         for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
1779                 p = (struct ext2_dir_entry_2 *)(kaddr + offs);
1780                 rec_len = le16_to_cpu(p->rec_len);
1781
1782                 if ((p->name_len == 1 && p->name[0] == '.') ||
1783                     (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
1784                         p->inode = 0;
1785         }
1786 }
1787
1788 static int lmv_readpage(struct obd_export *exp, struct lu_fid *fid,
1789                         __u64 offset, struct page *page,
1790                         struct ptlrpc_request **request)
1791 {
1792         struct obd_device *obd = exp->exp_obd;
1793         struct lmv_obd *lmv = &obd->u.lmv;
1794         struct lu_fid rid = *fid;
1795         struct lmv_obj *obj;
1796         int rc, i;
1797         ENTRY;
1798
1799         rc = lmv_check_connect(obd);
1800         if (rc)
1801                 RETURN(rc);
1802
1803         i = lmv_fld_lookup(obd, fid);
1804         if (i < 0)
1805                 RETURN(i);
1806         LASSERT(i < lmv->desc.ld_tgt_count);
1807         CDEBUG(D_OTHER, "READPAGE at %llu from "DFID"\n",
1808                offset, PFID(&rid));
1809
1810         obj = lmv_obj_grab(obd, fid);
1811         if (obj) {
1812                 lmv_obj_lock(obj);
1813
1814                 /* find dirobj containing page with requested offset. */
1815                 for (i = 0; i < obj->lo_objcount; i++) {
1816                         if (offset < obj->lo_inodes[i].li_size)
1817                                 break;
1818                         offset -= obj->lo_inodes[i].li_size;
1819                 }
1820                 rid = obj->lo_inodes[i].li_fid;
1821
1822                 lmv_obj_unlock(obj);
1823                 lmv_obj_put(obj);
1824
1825                 CDEBUG(D_OTHER, "forward to "DFID" with offset %lu\n",
1826                        PFID(&rid), (unsigned long)offset);
1827         }
1828         i = lmv_fld_lookup(obd, &rid);
1829         if (i < 0)
1830                 RETURN(i);
1831         rc = md_readpage(lmv->tgts[i].ltd_exp, &rid,
1832                          offset, page, request);
1833
1834         if (0 && rc == 0 && !lu_fid_eq(&rid, fid))
1835                 /* this page isn't from master object. To avoid "." and ".."
1836                  * duplication in directory, we have to remove them from all
1837                  * slave objects
1838                  *
1839                  * XXX this is not needed for cmd3 readdir, because only
1840                  * master directory has dot and dotdot.
1841                  */
1842                 lmv_remove_dots(page);
1843
1844         RETURN(rc);
1845 }
1846
1847 static int lmv_unlink_slaves(struct obd_export *exp,
1848                              struct md_op_data *op_data,
1849                              struct ptlrpc_request **req)
1850 {
1851         struct obd_device *obd = exp->exp_obd;
1852         struct lmv_obd *lmv = &obd->u.lmv;
1853         struct lmv_stripe_md *mea = op_data->mea1;
1854         struct md_op_data *op_data2;
1855         int i, mds, rc = 0;
1856         ENTRY;
1857
1858         OBD_ALLOC_PTR(op_data2);
1859         if (op_data2 == NULL)
1860                 RETURN(-ENOMEM);
1861
1862         LASSERT(mea != NULL);
1863         for (i = 0; i < mea->mea_count; i++) {
1864                 memset(op_data2, 0, sizeof(*op_data2));
1865                 op_data2->fid1 = mea->mea_ids[i];
1866                 op_data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1867
1868                 mds = lmv_fld_lookup(obd, &op_data2->fid1);
1869                 if (mds < 0)
1870                         GOTO(out_free_op_data2, rc = mds);
1871                 if (lmv->tgts[mds].ltd_exp == NULL)
1872                         continue;
1873
1874                 rc = md_unlink(lmv->tgts[mds].ltd_exp,
1875                                op_data2, req);
1876
1877                 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
1878                        PFID(&mea->mea_ids[i]), rc);
1879
1880                 if (*req) {
1881                         ptlrpc_req_finished(*req);
1882                         *req = NULL;
1883                 }
1884                 if (rc)
1885                         GOTO(out_free_op_data2, rc);
1886         }
1887
1888         EXIT;
1889 out_free_op_data2:
1890         OBD_FREE_PTR(op_data2);
1891         return rc;
1892 }
1893
1894 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
1895                       struct ptlrpc_request **request)
1896 {
1897         struct obd_device *obd = exp->exp_obd;
1898         struct lmv_obd *lmv = &obd->u.lmv;
1899         int rc, i = 0;
1900         ENTRY;
1901
1902         rc = lmv_check_connect(obd);
1903         if (rc)
1904                 RETURN(rc);
1905
1906         if (op_data->namelen == 0 && op_data->mea1 != NULL) {
1907                 /* mds asks to remove slave objects */
1908                 rc = lmv_unlink_slaves(exp, op_data, request);
1909                 RETURN(rc);
1910         }
1911
1912         if (op_data->namelen != 0) {
1913                 struct lmv_obj *obj;
1914
1915                 obj = lmv_obj_grab(obd, &op_data->fid1);
1916                 if (obj) {
1917                         i = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1918                                          op_data->name, op_data->namelen);
1919                         op_data->fid1 = obj->lo_inodes[i].li_fid;
1920                         lmv_obj_put(obj);
1921                 }
1922                 CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
1923                        op_data->namelen, op_data->name, PFID(&op_data->fid1),
1924                        i);
1925         } else {
1926                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
1927                        PFID(&op_data->fid1));
1928         }
1929         i = lmv_fld_lookup(obd, &op_data->fid1);
1930         if (i < 0)
1931                 RETURN(i);
1932         rc = md_unlink(lmv->tgts[i].ltd_exp, op_data, request);
1933         RETURN(rc);
1934 }
1935
1936 static int lmv_llog_init(struct obd_device *obd, struct obd_device *tgt,
1937                          int count, struct llog_catid *logid)
1938 {
1939         struct llog_ctxt *ctxt;
1940         int rc;
1941         ENTRY;
1942
1943         rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1944                         &llog_client_ops);
1945         if (rc == 0) {
1946                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1947                 ctxt->loc_imp = tgt->u.cli.cl_import;
1948         }
1949
1950         RETURN(rc);
1951 }
1952
1953 static int lmv_llog_finish(struct obd_device *obd, int count)
1954 {
1955         int rc;
1956         ENTRY;
1957
1958         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
1959         RETURN(rc);
1960 }
1961
1962 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1963 {
1964         int rc = 0;
1965
1966         switch (stage) {
1967         case OBD_CLEANUP_EARLY:
1968                 /* XXX: here should be calling obd_precleanup() down to
1969                  * stack. */
1970                 break;
1971         case OBD_CLEANUP_SELF_EXP:
1972                 rc = obd_llog_finish(obd, 0);
1973                 if (rc != 0)
1974                         CERROR("failed to cleanup llogging subsystems\n");
1975                 break;
1976         default:
1977                 break;
1978         }
1979         RETURN(rc);
1980 }
1981
1982 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
1983                         void *key, __u32 *vallen, void *val)
1984 {
1985         struct obd_device *obd;
1986         struct lmv_obd *lmv;
1987         int rc = 0;
1988         ENTRY;
1989
1990         obd = class_exp2obd(exp);
1991         if (obd == NULL) {
1992                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1993                        exp->exp_handle.h_cookie);
1994                 RETURN(-EINVAL);
1995         }
1996
1997         lmv = &obd->u.lmv;
1998         if (keylen == strlen("mdsize") && !strcmp(key, "mdsize")) {
1999                 __u32 *mdsize = val;
2000                 *vallen = sizeof(__u32);
2001                 *mdsize = sizeof(struct lu_fid) * lmv->desc.ld_tgt_count
2002                        + sizeof(struct lmv_stripe_md);
2003                 RETURN(0);
2004         } else if (keylen == strlen("mdsnum") && !strcmp(key, "mdsnum")) {
2005                 struct obd_uuid *cluuid = &lmv->cluuid;
2006                 struct lmv_tgt_desc *tgts;
2007                 __u32 *mdsnum = val;
2008                 int i;
2009
2010                 tgts = lmv->tgts;
2011                 for (i = 0; i < lmv->desc.ld_tgt_count; i++, tgts++) {
2012                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
2013                                 *vallen = sizeof(__u32);
2014                                 *mdsnum = i;
2015                                 RETURN(0);
2016                         }
2017                 }
2018                 LASSERT(0);
2019         } else if (keylen == strlen("rootid") && !strcmp(key, "rootid")) {
2020                 rc = lmv_check_connect(obd);
2021                 if (rc)
2022                         RETURN(rc);
2023
2024                 /* getting rootid from first MDS. */
2025                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2026                                   vallen, val);
2027                 RETURN(rc);
2028         } else if (keylen >= strlen("lmvdesc") && !strcmp(key, "lmvdesc")) {
2029                 struct lmv_desc *desc_ret = val;
2030                 *desc_ret = lmv->desc;
2031                 RETURN(0);
2032         } else if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2033                 struct lmv_tgt_desc *tgts;
2034                 int i;
2035
2036                 rc = lmv_check_connect(obd);
2037                 if (rc)
2038                         RETURN(rc);
2039
2040                 LASSERT(*vallen == sizeof(__u32));
2041                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2042                      i++, tgts++) {
2043
2044                         /* all tgts should be connected when this get called. */
2045                         if (!tgts || !tgts->ltd_exp) {
2046                                 CERROR("target not setup?\n");
2047                                 continue;
2048                         }
2049
2050                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
2051                                           vallen, val))
2052                                 RETURN(0);
2053                 }
2054                 RETURN(-EINVAL);
2055         } else if ((keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) ||
2056                    (keylen >= strlen("max_easize") && !strcmp(key, "max_easize"))) {
2057                 
2058                 rc = lmv_check_connect(obd);
2059                 if (rc)
2060                         RETURN(rc);
2061
2062                 /* forwarding this request to first MDS, it should know LOV
2063                  * desc. */
2064                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2065                                   vallen, val);
2066                 RETURN(rc);
2067         } /* else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
2068                 struct lmv_tgt_desc *tgts;
2069                 int i;
2070
2071                 rc = lmv_check_connect(obd);
2072                 if (rc)
2073                         RETURN(rc);
2074
2075                 LASSERT(*vallen == sizeof(struct fid_extent));
2076                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2077                      i++, tgts++) {
2078
2079                         if (!tgts || !tgts->ltd_exp) {
2080                                 CERROR("target not setup?\n");
2081                                 continue;
2082                         }
2083
2084                         rc = obd_get_info(tgts->ltd_exp, keylen, key,
2085                                           vallen, val);
2086                         if (rc)
2087                                 RETURN(rc);
2088                 }
2089                 RETURN(0);
2090         }*/
2091
2092         CDEBUG(D_IOCTL, "invalid key\n");
2093         RETURN(-EINVAL);
2094 }
2095
2096 int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
2097                        void *key, obd_count vallen, void *val,
2098                        struct ptlrpc_request_set *set)
2099 {
2100         struct lmv_tgt_desc    *tgt;
2101         struct obd_device      *obd;
2102         struct lmv_obd         *lmv;
2103         int rc = 0;
2104         ENTRY;
2105
2106         obd = class_exp2obd(exp);
2107         if (obd == NULL) {
2108                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2109                        exp->exp_handle.h_cookie);
2110                 RETURN(-EINVAL);
2111         }
2112         lmv = &obd->u.lmv;
2113
2114         if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
2115                 lmv->server_timeout = 1;
2116                 lmv_set_timeouts(obd);
2117                 RETURN(0);
2118         }
2119
2120         /* maybe this could be default */
2121         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
2122             (keylen == strlen("sec_flags") && strcmp(key, "sec_flags") == 0) ||
2123             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
2124                 struct obd_export *exp;
2125                 int err, i;
2126
2127                 spin_lock(&lmv->lmv_lock);
2128                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2129                      i++, tgt++) {
2130                         exp = tgt->ltd_exp;
2131                         /* during setup time the connections to mdc might
2132                          * haven't been established.
2133                          */
2134                         if (exp == NULL) {
2135                                 struct obd_device *tgt_obd;
2136
2137                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2138                                                                 LUSTRE_MDC_NAME,
2139                                                                 &obd->obd_uuid);
2140                                 if (!tgt_obd) {
2141                                         CERROR("can't set info %s, "
2142                                                "device %s not attached?\n",
2143                                                 (char *) key, tgt->uuid.uuid);
2144                                         rc = -EINVAL;
2145                                         continue;
2146                                 }
2147                                 exp = tgt_obd->obd_self_export;
2148                         }
2149
2150                         err = obd_set_info_async(exp, keylen, key, vallen, val, set);
2151                         if (!rc)
2152                                 rc = err;
2153                 }
2154                 spin_unlock(&lmv->lmv_lock);
2155
2156                 RETURN(rc);
2157         }
2158         if (((keylen == strlen("flush_cred") &&
2159              strcmp(key, "flush_cred") == 0)) ||
2160              ((keylen == strlen("crypto_type") &&
2161              strcmp(key, "crypto_type") == 0))) {
2162                 int i;
2163
2164                 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
2165                      i++, tgt++) {
2166                         if (!tgt->ltd_exp)
2167                                 continue;
2168                         rc = obd_set_info_async(tgt->ltd_exp,
2169                                                 keylen, key, vallen,
2170                                                 val, set);
2171                         if (rc)
2172                                 RETURN(rc);
2173                 }
2174
2175                 RETURN(0);
2176         }
2177
2178         if (keylen == strlen("ids") && memcmp(key, "ids", keylen) == 0) {
2179                 struct lu_fid *fid = (struct lu_fid *)val;
2180                 int i;
2181
2182                 rc = lmv_check_connect(obd);
2183                 if (rc)
2184                         RETURN(rc);
2185
2186                 i = lmv_fld_lookup(obd, fid);
2187                 if (i < 0)
2188                         RETURN(i);
2189                 rc = obd_set_info_async(lmv->tgts[i].ltd_exp,
2190                                         keylen, key, vallen, val,
2191                                         set);
2192                 RETURN(rc);
2193         }
2194
2195         if (keylen == strlen("chkconnect") &&
2196             memcmp(key, "chkconnect", keylen) == 0) {
2197                 rc = lmv_check_connect(obd);
2198                 RETURN(rc);
2199         }
2200
2201         RETURN(-EINVAL);
2202 }
2203
2204 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2205                struct lov_stripe_md *lsm)
2206 {
2207         struct obd_device *obd = class_exp2obd(exp);
2208         struct lmv_obd *lmv = &obd->u.lmv;
2209         struct lmv_stripe_md *meap, *lsmp;
2210         int mea_size, i;
2211         ENTRY;
2212
2213         mea_size = (sizeof(struct lu_fid) *
2214                     lmv->desc.ld_tgt_count) + sizeof(struct lmv_stripe_md);
2215         if (!lmmp)
2216                 RETURN(mea_size);
2217
2218         if (*lmmp && !lsm) {
2219                 OBD_FREE(*lmmp, mea_size);
2220                 *lmmp = NULL;
2221                 RETURN(0);
2222         }
2223
2224         if (*lmmp == NULL) {
2225                 OBD_ALLOC(*lmmp, mea_size);
2226                 if (*lmmp == NULL)
2227                         RETURN(-ENOMEM);
2228         }
2229
2230         if (!lsm)
2231                 RETURN(mea_size);
2232
2233         lsmp = (struct lmv_stripe_md *)lsm;
2234         meap = (struct lmv_stripe_md *)*lmmp;
2235
2236         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2237             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2238                 RETURN(-EINVAL);
2239
2240         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2241         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2242         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2243
2244         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2245                 meap->mea_ids[i] = meap->mea_ids[i];
2246                 fid_cpu_to_le(&meap->mea_ids[i]);
2247         }
2248
2249         RETURN(mea_size);
2250 }
2251
2252 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2253                  struct lov_mds_md *lmm, int lmm_size)
2254 {
2255         struct obd_device *obd = class_exp2obd(exp);
2256         struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
2257         struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
2258         struct lmv_obd *lmv = &obd->u.lmv;
2259         int mea_size, i;
2260         __u32 magic;
2261         ENTRY;
2262
2263         mea_size = sizeof(struct lu_fid) *
2264                 lmv->desc.ld_tgt_count + sizeof(struct lmv_stripe_md);
2265
2266         if (lsmp == NULL)
2267                 return mea_size;
2268
2269         if (*lsmp != NULL && lmm == NULL) {
2270                 OBD_FREE(*tmea, mea_size);
2271                 RETURN(0);
2272         }
2273
2274         LASSERT(mea_size == lmm_size);
2275
2276         OBD_ALLOC(*tmea, mea_size);
2277         if (*tmea == NULL)
2278                 RETURN(-ENOMEM);
2279
2280         if (!lmm)
2281                 RETURN(mea_size);
2282
2283         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2284             mea->mea_magic == MEA_MAGIC_ALL_CHARS)
2285         {
2286                 magic = le32_to_cpu(mea->mea_magic);
2287         } else {
2288                 /* old mea isnot handled here */
2289                 LBUG();
2290         }
2291
2292         (*tmea)->mea_magic = magic;
2293         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2294         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2295
2296         for (i = 0; i < (*tmea)->mea_count; i++) {
2297                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2298                 fid_le_to_cpu(&(*tmea)->mea_ids[i]);
2299         }
2300         RETURN(mea_size);
2301 }
2302
2303 #if 0
/* lmv_create() and lmv_brw() are not needed anymore, as they are purely
 * server-side code and lmv is going to be used only on the client. */
2306 static int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
2307                                  struct lov_stripe_md **ea,
2308                                  struct obd_trans_info *oti)
2309 {
2310         struct obd_device *obd = exp->exp_obd;
2311         struct lmv_obd *lmv = &obd->u.lmv;
2312         struct lov_stripe_md obj_md;
2313         struct lov_stripe_md *obj_mdp = &obj_md;
2314         int rc = 0;
2315         ENTRY;
2316
2317         LASSERT(ea == NULL);
2318         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
2319
2320         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp,
2321                         oa, &obj_mdp, oti);
2322
2323         RETURN(rc);
2324 }
2325
2326 /*
2327  * to be called from MDS only. @oa should have correct store cookie and o_fid
2328  * values for "master" object, as it will be used.
2329  */
2330 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
2331                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
2332 {
2333         struct obd_device *obd = exp->exp_obd;
2334         struct lmv_obd *lmv = &obd->u.lmv;
2335         struct lmv_stripe_md *mea;
2336         struct lu_fid mid;
2337         int i, c, rc = 0;
2338         ENTRY;
2339
2340         rc = lmv_check_connect(obd);
2341         if (rc)
2342                 RETURN(rc);
2343
2344         LASSERT(oa != NULL);
2345
2346         if (ea == NULL) {
2347                 rc = lmv_obd_create_single(exp, oa, NULL, oti);
2348                 if (rc)
2349                         CERROR("Can't create object, rc = %d\n", rc);
2350                 RETURN(rc);
2351         }
2352
2353         if (*ea == NULL) {
2354                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
2355                 if (rc < 0) {
2356                         CERROR("obd_alloc_diskmd() failed, error %d\n",
2357                                rc);
2358                         RETURN(rc);
2359                 } else
2360                         rc = 0;
2361
2362                 if (*ea == NULL)
2363                         RETURN(-ENOMEM);
2364         }
2365
2366         /* here we should take care about splitted dir, so store cookie and fid
2367          * for "master" object should already be allocated and passed in @oa. */
2368         LASSERT(oa->o_id != 0);
2369         LASSERT(oa->o_fid != 0);
2370
2371         /* save "master" object fid */
2372         obdo2fid(oa, &mid);
2373
2374         mea = (struct lmv_stripe_md *)*ea;
2375         mea->mea_master = -1;
2376         mea->mea_magic = MEA_MAGIC_ALL_CHARS;
2377
2378         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
2379                 mea->mea_count = lmv->desc.ld_tgt_count;
2380
2381         for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
2382                 struct lov_stripe_md obj_md;
2383                 struct lov_stripe_md *obj_mdp = &obj_md;
2384
2385                 if (lmv->tgts[i].ltd_exp == NULL) {
2386                         /* this is "master" MDS */
2387                         mea->mea_master = i;
2388                         mea->mea_ids[c] = mid;
2389                         c++;
2390                         continue;
2391                 }
2392
2393                 /*
2394                  * "master" MDS should always be part of stripped dir,
2395                  * so scan for it.
2396                  */
2397                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
2398                         continue;
2399
2400                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
2401                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
2402
2403                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
2404                 if (rc) {
2405                         CERROR("obd_create() failed on MDT target %d, "
2406                                "error %d\n", c, rc);
2407                         RETURN(rc);
2408                 }
2409
2410                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
2411                        i, oa->o_id, oa->o_generation);
2412
2413
2414                 /*
2415                  * here, when object is created (or it is master and was passed
2416                  * from caller) on desired MDS we save its fid to local mea_ids.
2417                  */
2418                 LASSERT(oa->o_fid);
2419
2420                 /*
2421                  * store cookie should be defined here for both cases (master
2422                  * object and not master), because master is already created.
2423                  */
2424                 LASSERT(oa->o_id);
2425
2426                 /* fill mea by store cookie and fid */
2427                 obdo2fid(oa, &mea->mea_ids[c]);
2428                 c++;
2429         }
2430         LASSERT(c == mea->mea_count);
2431
2432         CDEBUG(D_OTHER, "%d dirobjects created\n",
2433                (int)mea->mea_count);
2434
2435         RETURN(rc);
2436 }
2437
2438 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
2439             struct lov_stripe_md *ea, obd_count oa_bufs,
2440             struct brw_page *pgarr, struct obd_trans_info *oti)
2441 {
2442         /* splitting is not needed in lmv */
2443         struct obd_device *obd = exp->exp_obd;
2444         struct lmv_obd *lmv = &obd->u.lmv;
2445         struct lmv_stripe_md *mea = (struct lmv_stripe_md *) ea;
2446         int err;
2447
2448         LASSERT(oa != NULL);
2449         LASSERT(ea != NULL);
2450         LASSERT(pgarr != NULL);
2451         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
2452
2453         oa->o_gr = id_gen(&mea->mea_ids[oa->o_mds]);
2454         oa->o_id = id_ino(&mea->mea_ids[oa->o_mds]);
2455         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2456
2457         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
2458                       oa, NULL, oa_bufs, pgarr, oti);
2459         RETURN(err);
2460 }
2461 #endif
2462
2463 static int lmv_cancel_unused(struct obd_export *exp,
2464                              struct lu_fid *fid,
2465                              int flags, void *opaque)
2466 {
2467         struct obd_device *obd = exp->exp_obd;
2468         struct lmv_obd *lmv = &obd->u.lmv;
2469         int rc = 0, err, i;
2470         ENTRY;
2471
2472         LASSERT(fid != NULL);
2473
2474         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2475                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].active)
2476                         continue;
2477
2478                 err = md_cancel_unused(lmv->tgts[i].ltd_exp,
2479                                        fid, flags, opaque);
2480                 if (!rc)
2481                         rc = err;
2482         }
2483         RETURN(rc);
2484 }
2485
2486 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
2487 {
2488         struct obd_device *obd = exp->exp_obd;
2489         struct lmv_obd *lmv = &obd->u.lmv;
2490
2491         ENTRY;
2492         RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
2493 }
2494
2495 int lmv_lock_match(struct obd_export *exp, int flags,
2496                    struct lu_fid *fid, ldlm_type_t type,
2497                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
2498                    struct lustre_handle *lockh)
2499 {
2500         struct obd_device *obd = exp->exp_obd;
2501         struct lmv_obd *lmv = &obd->u.lmv;
2502         int i, rc = 0;
2503         ENTRY;
2504
2505         CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
2506
2507         /* with CMD every object can have two locks in different namespaces:
2508          * lookup lock in space of mds storing direntry and update/open lock in
2509          * space of mds storing inode. Thus we check all targets, not only that
2510          * one fid was created in. */
2511         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2512                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2513                                    type, policy, mode, lockh);
2514                 if (rc)
2515                         RETURN(1);
2516         }
2517
2518         RETURN(rc);
2519 }
2520
2521 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2522                       int offset, struct obd_export *dt_exp, struct lustre_md *md)
2523 {
2524         struct obd_device *obd = exp->exp_obd;
2525         struct lmv_obd *lmv = &obd->u.lmv;
2526         int rc;
2527
2528         ENTRY;
2529         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md);
2530         RETURN(rc);
2531 }
2532
2533 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2534 {
2535         struct obd_device *obd = exp->exp_obd;
2536         struct lmv_obd *lmv = &obd->u.lmv;
2537
2538         ENTRY;
2539         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2540 }
2541
2542 int lmv_set_open_replay_data(struct obd_export *exp,
2543                              struct obd_client_handle *och,
2544                              struct ptlrpc_request *open_req)
2545 {
2546         struct obd_device *obd = exp->exp_obd;
2547         struct lmv_obd *lmv = &obd->u.lmv;
2548
2549         ENTRY;
2550         RETURN(md_set_open_replay_data(lmv->tgts[0].ltd_exp,
2551                                        och, open_req));
2552 }
2553
2554 int lmv_clear_open_replay_data(struct obd_export *exp,
2555                                struct obd_client_handle *och)
2556 {
2557         struct obd_device *obd = exp->exp_obd;
2558         struct lmv_obd *lmv = &obd->u.lmv;
2559
2560         ENTRY;
2561         RETURN(md_clear_open_replay_data(lmv->tgts[0].ltd_exp, och));
2562 }
2563
2564 struct obd_ops lmv_obd_ops = {
2565         .o_owner                = THIS_MODULE,
2566         .o_setup                = lmv_setup,
2567         .o_cleanup              = lmv_cleanup,
2568         .o_precleanup           = lmv_precleanup,
2569         .o_process_config       = lmv_process_config,
2570         .o_connect              = lmv_connect,
2571         .o_disconnect           = lmv_disconnect,
2572         .o_statfs               = lmv_statfs,
2573         .o_llog_init            = lmv_llog_init,
2574         .o_llog_finish          = lmv_llog_finish,
2575         .o_get_info             = lmv_get_info,
2576         .o_set_info_async       = lmv_set_info_async,
2577         .o_packmd               = lmv_packmd,
2578         .o_unpackmd             = lmv_unpackmd,
2579         .o_notify               = lmv_notify,
2580         .o_fid_init             = lmv_fid_init,
2581         .o_fid_fini             = lmv_fid_fini,
2582         .o_fid_alloc            = lmv_fid_alloc,
2583         .o_fid_delete           = lmv_fid_delete,
2584         .o_iocontrol            = lmv_iocontrol
2585 };
2586
2587 struct md_ops lmv_md_ops = {
2588         .m_getstatus            = lmv_getstatus,
2589         .m_change_cbdata        = lmv_change_cbdata,
2590         .m_close                = lmv_close,
2591         .m_create               = lmv_create,
2592         .m_done_writing         = lmv_done_writing,
2593         .m_enqueue              = lmv_enqueue,
2594         .m_getattr              = lmv_getattr,
2595         .m_getxattr             = lmv_getxattr,
2596         .m_getattr_name         = lmv_getattr_name,
2597         .m_intent_lock          = lmv_intent_lock,
2598         .m_link                 = lmv_link,
2599         .m_rename               = lmv_rename,
2600         .m_setattr              = lmv_setattr,
2601         .m_setxattr              = lmv_setxattr,
2602         .m_sync                 = lmv_sync,
2603         .m_readpage             = lmv_readpage,
2604         .m_unlink               = lmv_unlink,
2605         .m_init_ea_size         = lmv_init_ea_size,
2606         .m_cancel_unused        = lmv_cancel_unused,
2607         .m_set_lock_data        = lmv_set_lock_data,
2608         .m_lock_match           = lmv_lock_match,
2609         .m_get_lustre_md        = lmv_get_lustre_md,
2610         .m_free_lustre_md       = lmv_free_lustre_md,
2611         .m_set_open_replay_data = lmv_set_open_replay_data,
2612         .m_clear_open_replay_data = lmv_clear_open_replay_data
2613 };
2614
2615 int __init lmv_init(void)
2616 {
2617         struct lprocfs_static_vars lvars;
2618         int rc;
2619
2620         obj_cache = kmem_cache_create("lmv_objects",
2621                                       sizeof(struct lmv_obj),
2622                                       0, 0, NULL, NULL);
2623         if (!obj_cache) {
2624                 CERROR("error allocating lmv objects cache\n");
2625                 return -ENOMEM;
2626         }
2627
2628         lprocfs_init_vars(lmv, &lvars);
2629         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2630                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2631         if (rc)
2632                 kmem_cache_destroy(obj_cache);
2633
2634         return rc;
2635 }
2636
2637 #ifdef __KERNEL__
2638 static void lmv_exit(void)
2639 {
2640         class_unregister_type(LUSTRE_LMV_NAME);
2641
2642         LASSERTF(kmem_cache_destroy(obj_cache) == 0,
2643                  "can't free lmv objects cache, %d object(s)"
2644                  "still in use\n", atomic_read(&obj_cache_count));
2645 }
2646
2647 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2648 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2649 MODULE_LICENSE("GPL");
2650
2651 module_init(lmv_init);
2652 module_exit(lmv_exit);
2653 #endif