1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LMV
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
33 #include <asm/div64.h>
34 #include <linux/seq_file.h>
35 #include <linux/namei.h>
37 #include <liblustre.h>
39 #include <linux/ext2_fs.h>
41 #include <lustre/lustre_idl.h>
42 #include <lustre_log.h>
43 #include <obd_support.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48 #include <lustre_lite.h>
49 #include <lustre_fid.h>
50 #include "lmv_internal.h"
52 /* not defined for liblustre building */
53 #if !defined(ATOMIC_INIT)
54 #define ATOMIC_INIT(val) { (val) }
58 kmem_cache_t *obj_cache;
59 atomic_t obj_cache_count = ATOMIC_INIT(0);
61 static void lmv_activate_target(struct lmv_obd *lmv,
62 struct lmv_tgt_desc *tgt,
65 if (tgt->ltd_active == activate)
68 tgt->ltd_active = activate;
69 lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
74 * -EINVAL : UUID can't be found in the LMV's target list
75 * -ENOTCONN: The UUID is found, but the target connection is bad (!)
76 * -EBADF : The UUID is found, but the OBD of the wrong type (!)
78 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
81 struct lmv_tgt_desc *tgt;
82 struct obd_device *obd;
86 CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
87 lmv, uuid->uuid, activate);
89 spin_lock(&lmv->lmv_lock);
90 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
91 if (tgt->ltd_exp == NULL)
94 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
95 i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
97 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
101 if (i == lmv->desc.ld_tgt_count)
102 GOTO(out_lmv_lock, rc = -EINVAL);
104 obd = class_exp2obd(tgt->ltd_exp);
106 GOTO(out_lmv_lock, rc = -ENOTCONN);
108 CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
109 obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
110 obd->obd_type->typ_name, i);
111 LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
113 if (tgt->ltd_active == activate) {
114 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
115 activate ? "" : "in");
116 GOTO(out_lmv_lock, rc);
119 CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
120 obd, activate ? "" : "in");
122 lmv_activate_target(lmv, tgt, activate);
127 spin_unlock(&lmv->lmv_lock);
131 static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
132 struct obd_connect_data *data)
134 struct lmv_tgt_desc *tgt;
138 LASSERT(data != NULL);
140 spin_lock(&lmv->lmv_lock);
141 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
142 if (tgt->ltd_exp == NULL)
145 if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
146 lmv->datas[tgt->ltd_idx] = *data;
150 spin_unlock(&lmv->lmv_lock);
154 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
155 enum obd_notify_event ev, void *data)
157 struct lmv_obd *lmv = &obd->u.lmv;
158 struct obd_uuid *uuid;
162 if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
163 CERROR("unexpected notification of %s %s!\n",
164 watched->obd_type->typ_name,
169 uuid = &watched->u.cli.cl_target_uuid;
170 if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
172 * Set MDC as active before notifying the observer, so the
173 * observer can use the MDC normally.
175 rc = lmv_set_mdc_active(lmv, uuid,
176 ev == OBD_NOTIFY_ACTIVE);
178 CERROR("%sactivation of %s failed: %d\n",
179 ev == OBD_NOTIFY_ACTIVE ? "" : "de",
185 if (ev == OBD_NOTIFY_OCD) {
186 struct obd_connect_data *conn_data =
187 &watched->u.cli.cl_import->imp_connect_data;
189 /* Set connect data to desired target, update
190 * exp_connect_flags. */
191 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
193 CERROR("can't set connect data to target %s, rc %d\n",
199 * XXX: Make sure that ocd_connect_flags from all targets are
200 * the same. Otherwise one of MDTs runs wrong version or
201 * something like this. --umka
203 obd->obd_self_export->exp_connect_flags =
204 conn_data->ocd_connect_flags;
207 /* Pass the notification up the chain. */
208 if (obd->obd_observer)
209 rc = obd_notify(obd->obd_observer, watched, ev, data);
214 /* this is fake connect function. Its purpose is to initialize lmv and say
215 * caller that everything is okay. Real connection will be performed later. */
216 static int lmv_connect(const struct lu_env *env,
217 struct lustre_handle *conn, struct obd_device *obd,
218 struct obd_uuid *cluuid, struct obd_connect_data *data)
221 struct proc_dir_entry *lmv_proc_dir;
223 struct lmv_obd *lmv = &obd->u.lmv;
224 struct obd_export *exp;
228 rc = class_connect(conn, obd, cluuid);
230 CERROR("class_connection() returned %d\n", rc);
234 exp = class_conn2export(conn);
236 /* we don't want to actually do the underlying connections more than
237 * once, so keep track. */
239 if (lmv->refcount > 1) {
240 class_export_put(exp);
246 lmv->cluuid = *cluuid;
249 lmv->conn_data = *data;
252 lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
254 if (IS_ERR(lmv_proc_dir)) {
255 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
256 obd->obd_type->typ_name, obd->obd_name);
261 /* all real clients should perform actual connection right away, because
262 * it is possible, that LMV will not have opportunity to connect targets
263 * and MDC stuff will be called directly, for instance while reading
264 * ../mdc/../kbytesfree procfs file, etc. */
265 if (data->ocd_connect_flags & OBD_CONNECT_REAL)
266 rc = lmv_check_connect(obd);
271 lprocfs_remove(lmv_proc_dir);
278 static void lmv_set_timeouts(struct obd_device *obd)
280 struct lmv_tgt_desc *tgts;
285 if (lmv->server_timeout == 0)
288 if (lmv->connected == 0)
291 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
292 if (tgts->ltd_exp == NULL)
295 obd_set_info_async(tgts->ltd_exp, strlen("inter_mds"),
296 "inter_mds", 0, NULL, NULL);
300 static int lmv_init_ea_size(struct obd_export *exp, int easize,
301 int def_easize, int cookiesize)
303 struct obd_device *obd = exp->exp_obd;
304 struct lmv_obd *lmv = &obd->u.lmv;
305 int i, rc = 0, change = 0;
308 if (lmv->max_easize < easize) {
309 lmv->max_easize = easize;
312 if (lmv->max_def_easize < def_easize) {
313 lmv->max_def_easize = def_easize;
316 if (lmv->max_cookiesize < cookiesize) {
317 lmv->max_cookiesize = cookiesize;
323 if (lmv->connected == 0)
326 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
327 if (lmv->tgts[i].ltd_exp == NULL) {
328 CWARN("%s: NULL export for %d\n", obd->obd_name, i);
332 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
335 CERROR("obd_init_ea_size() failed on MDT target %d, "
336 "error %d.\n", i, rc);
343 #define MAX_STRING_SIZE 128
345 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
347 struct lmv_obd *lmv = &obd->u.lmv;
348 struct obd_uuid *cluuid = &lmv->cluuid;
349 struct obd_connect_data *mdc_data = NULL;
350 struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
351 struct lustre_handle conn = {0, };
352 struct obd_device *mdc_obd;
353 struct obd_export *mdc_exp;
354 struct lu_fld_target target;
357 struct proc_dir_entry *lmv_proc_dir;
361 /* for MDS: don't connect to yourself */
362 if (obd_uuid_equals(&tgt->ltd_uuid, cluuid)) {
363 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
364 /* XXX - the old code didn't increment active tgt count.
369 mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
372 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
376 CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
377 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
378 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
381 if (!mdc_obd->obd_set_up) {
382 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
386 rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
389 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
393 mdc_exp = class_conn2export(&conn);
395 /* Init fid sequence client for this mdc. */
396 rc = obd_fid_init(mdc_exp);
400 /* Add new FLD target. */
401 target.ft_srv = NULL;
402 target.ft_exp = mdc_exp;
403 target.ft_idx = tgt->ltd_idx;
405 fld_client_add_target(&lmv->lmv_fld, &target);
407 mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
409 rc = obd_register_observer(mdc_obd, obd);
411 obd_disconnect(mdc_exp);
412 CERROR("target %s register_observer error %d\n",
413 tgt->ltd_uuid.uuid, rc);
417 if (obd->obd_observer) {
418 /* tell the mds_lmv about the new target */
419 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
420 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
422 obd_disconnect(mdc_exp);
428 tgt->ltd_exp = mdc_exp;
429 lmv->desc.ld_active_tgt_count++;
431 /* copy connect data, it may be used later */
432 lmv->datas[tgt->ltd_idx] = *mdc_data;
434 md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
435 lmv->max_def_easize, lmv->max_cookiesize);
437 CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
438 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
439 atomic_read(&obd->obd_refcount));
442 lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
444 struct proc_dir_entry *mdc_symlink;
445 char name[MAX_STRING_SIZE + 1];
447 LASSERT(mdc_obd->obd_type != NULL);
448 LASSERT(mdc_obd->obd_type->typ_name != NULL);
449 name[MAX_STRING_SIZE] = '\0';
450 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
451 mdc_obd->obd_type->typ_name,
453 mdc_symlink = proc_symlink(mdc_obd->obd_name,
455 if (mdc_symlink == NULL) {
456 CERROR("could not register LMV target "
457 "/proc/fs/lustre/%s/%s/target_obds/%s.",
458 obd->obd_type->typ_name, obd->obd_name,
460 lprocfs_remove(lmv_proc_dir);
468 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
470 struct lmv_obd *lmv = &obd->u.lmv;
471 struct lmv_tgt_desc *tgt;
475 CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
479 if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
480 lmv_init_unlock(lmv);
481 CERROR("can't add %s, LMV module compiled for %d MDCs. "
482 "That many MDCs already configured.\n",
483 tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
486 if (lmv->desc.ld_tgt_count == 0) {
487 struct obd_device *mdc_obd;
489 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
492 lmv_init_unlock(lmv);
493 CERROR("Target %s not attached\n", tgt_uuid->uuid);
497 rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
499 lmv_init_unlock(lmv);
500 CERROR("lmv failed to setup llogging subsystems\n");
503 spin_lock(&lmv->lmv_lock);
504 tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
505 tgt->ltd_uuid = *tgt_uuid;
506 spin_unlock(&lmv->lmv_lock);
508 if (lmv->connected) {
509 rc = lmv_connect_mdc(obd, tgt);
511 spin_lock(&lmv->lmv_lock);
512 lmv->desc.ld_tgt_count--;
513 memset(tgt, 0, sizeof(*tgt));
514 spin_unlock(&lmv->lmv_lock);
516 int easize = sizeof(struct lmv_stripe_md) +
517 lmv->desc.ld_tgt_count *
518 sizeof(struct lu_fid);
519 lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
523 lmv_init_unlock(lmv);
527 /* performs a check if passed obd is connected. If no - connect it. */
528 int lmv_check_connect(struct obd_device *obd)
530 struct lmv_obd *lmv = &obd->u.lmv;
531 struct lmv_tgt_desc *tgt;
539 if (lmv->connected) {
540 lmv_init_unlock(lmv);
544 if (lmv->desc.ld_tgt_count == 0) {
545 CERROR("%s: no targets configured.\n", obd->obd_name);
549 CDEBUG(D_CONFIG, "time to connect %s to %s\n",
550 lmv->cluuid.uuid, obd->obd_name);
552 LASSERT(lmv->tgts != NULL);
554 for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
555 rc = lmv_connect_mdc(obd, tgt);
560 lmv_set_timeouts(obd);
561 class_export_put(lmv->exp);
563 easize = lmv_get_easize(lmv);
564 lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
565 lmv_init_unlock(lmv);
574 --lmv->desc.ld_active_tgt_count;
575 rc2 = obd_disconnect(tgt->ltd_exp);
577 CERROR("error: LMV target %s disconnect on "
578 "MDC idx %d: error %d\n",
579 tgt->ltd_uuid.uuid, i, rc2);
583 class_disconnect(lmv->exp);
584 lmv_init_unlock(lmv);
588 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
591 struct proc_dir_entry *lmv_proc_dir;
593 struct lmv_obd *lmv = &obd->u.lmv;
594 struct obd_device *mdc_obd;
598 LASSERT(tgt != NULL);
599 LASSERT(obd != NULL);
601 mdc_obd = class_exp2obd(tgt->ltd_exp);
604 mdc_obd->obd_no_recov = obd->obd_no_recov;
607 lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
609 struct proc_dir_entry *mdc_symlink;
611 mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
613 lprocfs_remove(mdc_symlink);
615 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
616 obd->obd_type->typ_name, obd->obd_name,
621 rc = obd_fid_fini(tgt->ltd_exp);
623 CERROR("Can't finanize fids factory\n");
625 CDEBUG(D_OTHER, "Disconnected from %s(%s) successfully\n",
626 tgt->ltd_exp->exp_obd->obd_name,
627 tgt->ltd_exp->exp_obd->obd_uuid.uuid);
629 obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
630 rc = obd_disconnect(tgt->ltd_exp);
632 if (tgt->ltd_active) {
633 CERROR("Target %s disconnect error %d\n",
634 tgt->ltd_uuid.uuid, rc);
638 lmv_activate_target(lmv, tgt, 0);
643 static int lmv_disconnect(struct obd_export *exp)
645 struct obd_device *obd = class_exp2obd(exp);
647 struct proc_dir_entry *lmv_proc_dir;
649 struct lmv_obd *lmv = &obd->u.lmv;
656 /* Only disconnect the underlying layers on the final disconnect. */
658 if (lmv->refcount != 0)
661 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
662 if (lmv->tgts[i].ltd_exp == NULL)
664 lmv_disconnect_mdc(obd, &lmv->tgts[i]);
668 lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
670 lprocfs_remove(lmv_proc_dir);
672 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
673 obd->obd_type->typ_name, obd->obd_name);
679 * This is the case when no real connection is established by
680 * lmv_check_connect().
683 class_export_put(exp);
684 rc = class_disconnect(exp);
685 if (lmv->refcount == 0)
690 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
691 int len, void *karg, void *uarg)
693 struct obd_device *obddev = class_exp2obd(exp);
694 struct lmv_obd *lmv = &obddev->u.lmv;
695 int i, rc = 0, set = 0;
698 if (lmv->desc.ld_tgt_count == 0)
701 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
704 if (lmv->tgts[i].ltd_exp == NULL)
707 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
709 if (lmv->tgts[i].ltd_active) {
710 CERROR("error: iocontrol MDC %s on MDT"
711 "idx %d: err = %d\n",
712 lmv->tgts[i].ltd_uuid.uuid, i, err);
725 /* assume all is balanced for now */
726 static int lmv_fids_balanced(struct obd_device *obd)
732 static int lmv_all_chars_policy(int count, struct qstr *name)
735 unsigned int len = name->len;
738 c += name->name[-- len];
743 static int lmv_placement_policy(struct obd_device *obd,
744 struct lu_placement_hint *hint,
747 struct lmv_obd *lmv = &obd->u.lmv;
752 LASSERT(mds != NULL);
754 /* Here are some policies to allocate new fid */
755 if (lmv_fids_balanced(obd)) {
757 * Allocate new fid basing on its name in the case fids are
758 * balanced, that is all sequences have more or less equal
759 * number of objects created.
761 obj = lmv_obj_grab(obd, hint->ph_pfid);
767 * If the dir got split, alloc fid according to its
768 * hash. No matter what we create, object create should
771 mea_idx = raw_name2idx(obj->lo_hashtype,
773 hint->ph_cname->name,
774 hint->ph_cname->len);
775 rpid = &obj->lo_inodes[mea_idx].li_fid;
776 *mds = obj->lo_inodes[mea_idx].li_mds;
780 CDEBUG(D_INODE, "The obj "DFID" has been split, got "
781 "MDS at "LPU64" by name %s\n", PFID(hint->ph_pfid),
782 *mds, hint->ph_cname->name);
783 } else if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
784 /* Default policy for directories. */
785 *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
790 * Default policy for others is to use parent MDS.
791 * ONLY directories can be cross-ref during creation.
793 rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
797 * Sequences among all tgts are not well balanced, allocate new
798 * fid taking this into account to balance them. Not implemented
806 CERROR("Can't choose MDS, err = %d\n", rc);
808 LASSERT(*mds < lmv->desc.ld_tgt_count);
814 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
817 struct lmv_tgt_desc *tgt = &lmv->tgts[mds];
821 /* New seq alloc and FLD setup should be atomic. */
822 down(&tgt->ltd_fid_sem);
824 /* Asking underlaying tgt layer to allocate new fid. */
825 rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
827 LASSERT(fid_is_sane(fid));
829 /* Client switches to new sequence, setup FLD. */
830 rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
833 CERROR("Can't create fld entry, rc %d\n", rc);
837 up(&tgt->ltd_fid_sem);
841 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
842 struct lu_placement_hint *hint)
844 struct obd_device *obd = class_exp2obd(exp);
845 struct lmv_obd *lmv = &obd->u.lmv;
850 LASSERT(hint != NULL);
851 LASSERT(fid != NULL);
853 rc = lmv_placement_policy(obd, hint, &mds);
855 CERROR("Can't get target for allocating fid, rc %d\n", rc);
859 rc = __lmv_fid_alloc(lmv, fid, mds);
861 CERROR("Can't alloc new fid, rc %d\n", rc);
868 static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
873 if (lmv_obj_delete(exp, fid)) {
874 CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
880 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
882 struct lmv_obd *lmv = &obd->u.lmv;
883 struct lprocfs_static_vars lvars;
884 struct lmv_desc *desc;
888 if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
889 CERROR("LMV setup requires a descriptor\n");
893 desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
894 if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
895 CERROR("descriptor size wrong: %d > %d\n",
896 (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
900 lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
902 OBD_ALLOC(lmv->tgts, lmv->tgts_size);
903 if (lmv->tgts == NULL)
906 for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
907 sema_init(&lmv->tgts[i].ltd_fid_sem, 1);
908 lmv->tgts[i].ltd_idx = i;
911 lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
913 OBD_ALLOC(lmv->datas, lmv->datas_size);
914 if (lmv->datas == NULL)
915 GOTO(out_free_tgts, rc = -ENOMEM);
917 obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
918 lmv->desc.ld_tgt_count = 0;
919 lmv->desc.ld_active_tgt_count = 0;
920 lmv->max_cookiesize = 0;
921 lmv->max_def_easize = 0;
924 spin_lock_init(&lmv->lmv_lock);
925 sema_init(&lmv->init_sem, 1);
927 rc = lmv_obj_setup(obd);
929 CERROR("Can't setup LMV object manager, "
931 GOTO(out_free_datas, rc);
934 lprocfs_init_vars(lmv, &lvars);
935 lprocfs_obd_setup(obd, lvars.obd_vars);
938 struct proc_dir_entry *entry;
940 entry = create_proc_entry("target_obd_status", 0444,
941 obd->obd_proc_entry);
943 entry->proc_fops = &lmv_proc_target_fops;
948 rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
949 LUSTRE_CLI_FLD_HASH_DHT);
951 CERROR("can't init FLD, err %d\n",
953 GOTO(out_free_datas, rc);
959 OBD_FREE(lmv->datas, lmv->datas_size);
962 OBD_FREE(lmv->tgts, lmv->tgts_size);
967 static int lmv_cleanup(struct obd_device *obd)
969 struct lmv_obd *lmv = &obd->u.lmv;
972 fld_client_fini(&lmv->lmv_fld);
973 lprocfs_obd_cleanup(obd);
974 lmv_obj_cleanup(obd);
975 OBD_FREE(lmv->datas, lmv->datas_size);
976 OBD_FREE(lmv->tgts, lmv->tgts_size);
981 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
983 struct lustre_cfg *lcfg = buf;
984 struct obd_uuid tgt_uuid;
988 switch(lcfg->lcfg_command) {
990 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
991 GOTO(out, rc = -EINVAL);
993 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
994 rc = lmv_add_target(obd, &tgt_uuid);
997 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
998 GOTO(out, rc = -EINVAL);
1005 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1008 struct lmv_obd *lmv = &obd->u.lmv;
1009 struct obd_statfs *temp;
1013 rc = lmv_check_connect(obd);
1017 OBD_ALLOC(temp, sizeof(*temp));
1021 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1022 if (lmv->tgts[i].ltd_exp == NULL)
1025 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp, max_age);
1027 CERROR("can't stat MDS #%d (%s), error %d\n", i,
1028 lmv->tgts[i].ltd_exp->exp_obd->obd_name,
1030 GOTO(out_free_temp, rc);
1035 osfs->os_bavail += temp->os_bavail;
1036 osfs->os_blocks += temp->os_blocks;
1037 osfs->os_ffree += temp->os_ffree;
1038 osfs->os_files += temp->os_files;
1044 OBD_FREE(temp, sizeof(*temp));
1048 static int lmv_getstatus(struct obd_export *exp,
1050 struct obd_capa **pc)
1052 struct obd_device *obd = exp->exp_obd;
1053 struct lmv_obd *lmv = &obd->u.lmv;
1057 rc = lmv_check_connect(obd);
1061 rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
1066 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1067 struct obd_capa *oc, obd_valid valid, const char *name,
1068 const char *input, int input_size, int output_size,
1069 int flags, struct ptlrpc_request **request)
1071 struct obd_device *obd = exp->exp_obd;
1072 struct lmv_obd *lmv = &obd->u.lmv;
1073 struct obd_export *tgt_exp;
1077 rc = lmv_check_connect(obd);
1081 tgt_exp = lmv_find_export(lmv, fid);
1082 if (IS_ERR(tgt_exp))
1083 RETURN(PTR_ERR(tgt_exp));
1085 rc = md_getxattr(tgt_exp, fid, oc, valid, name, input, input_size,
1086 output_size, flags, request);
1091 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1092 struct obd_capa *oc, obd_valid valid, const char *name,
1093 const char *input, int input_size, int output_size,
1094 int flags, struct ptlrpc_request **request)
1096 struct obd_device *obd = exp->exp_obd;
1097 struct lmv_obd *lmv = &obd->u.lmv;
1098 struct obd_export *tgt_exp;
1102 rc = lmv_check_connect(obd);
1106 tgt_exp = lmv_find_export(lmv, fid);
1107 if (IS_ERR(tgt_exp))
1108 RETURN(PTR_ERR(tgt_exp));
1110 rc = md_setxattr(tgt_exp, fid, oc, valid, name,
1111 input, input_size, output_size, flags, request);
1116 static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
1117 struct obd_capa *oc, obd_valid valid, int ea_size,
1118 struct ptlrpc_request **request)
1120 struct obd_device *obd = exp->exp_obd;
1121 struct lmv_obd *lmv = &obd->u.lmv;
1122 struct obd_export *tgt_exp;
1123 struct lmv_obj *obj;
1127 rc = lmv_check_connect(obd);
1131 tgt_exp = lmv_find_export(lmv, fid);
1132 if (IS_ERR(tgt_exp))
1133 RETURN(PTR_ERR(tgt_exp));
1135 rc = md_getattr(tgt_exp, fid, oc, valid, ea_size, request);
1139 obj = lmv_obj_grab(obd, fid);
1141 CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
1142 PFID(fid), obj ? "(split)" : "");
1144 /* if object is split, then we loop over all the slaves and gather size
1145 * attribute. In ideal world we would have to gather also mds field from
1146 * all slaves, as object is spread over the cluster and this is
1147 * definitely interesting information and it is not good to loss it,
1150 struct mdt_body *body;
1152 if (*request == NULL) {
1157 body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
1159 LASSERT(body != NULL);
1163 for (i = 0; i < obj->lo_objcount; i++) {
1164 if (lmv->tgts[i].ltd_exp == NULL) {
1165 CWARN("%s: NULL export for %d\n",
1170 /* skip master obj. */
1171 if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
1174 body->size += obj->lo_inodes[i].li_size;
1177 lmv_obj_unlock(obj);
1184 static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1185 ldlm_iterator_t it, void *data)
1187 struct obd_device *obd = exp->exp_obd;
1188 struct lmv_obd *lmv = &obd->u.lmv;
1192 rc = lmv_check_connect(obd);
1196 CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
1198 /* with CMD every object can have two locks in different namespaces:
1199 * lookup lock in space of mds storing direntry and update/open lock in
1200 * space of mds storing inode */
1201 for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1202 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1207 static int lmv_close(struct obd_export *exp,
1208 struct md_op_data *op_data,
1209 struct obd_client_handle *och,
1210 struct ptlrpc_request **request)
1212 struct obd_device *obd = exp->exp_obd;
1213 struct lmv_obd *lmv = &obd->u.lmv;
1214 struct obd_export *tgt_exp;
1218 rc = lmv_check_connect(obd);
1222 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1223 if (IS_ERR(tgt_exp))
1224 RETURN(PTR_ERR(tgt_exp));
1226 CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1227 rc = md_close(tgt_exp, op_data, och, request);
1232 * Called in the case MDS returns -ERESTART on create on open, what means that
1233 * directory is split and its LMV presentation object has to be updated.
1235 int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
1237 struct obd_device *obd = exp->exp_obd;
1238 struct lmv_obd *lmv = &obd->u.lmv;
1239 struct ptlrpc_request *req = NULL;
1240 struct obd_export *tgt_exp;
1241 struct lmv_obj *obj;
1242 struct lustre_md md;
1248 mealen = lmv_get_easize(lmv);
1250 valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1252 tgt_exp = lmv_find_export(lmv, fid);
1253 if (IS_ERR(tgt_exp))
1254 RETURN(PTR_ERR(tgt_exp));
1256 /* time to update mea of parent fid */
1257 rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
1259 CERROR("md_getattr() failed, error %d\n", rc);
1263 rc = md_get_lustre_md(tgt_exp, req, 1, NULL, exp, &md);
1265 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
1270 GOTO(cleanup, rc = -ENODATA);
1272 obj = lmv_obj_create(exp, fid, md.mea);
1278 obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1283 ptlrpc_req_finished(req);
1287 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1288 const void *data, int datalen, int mode, __u32 uid,
1289 __u32 gid, __u32 cap_effective, __u64 rdev,
1290 struct ptlrpc_request **request)
1292 struct obd_device *obd = exp->exp_obd;
1293 struct lmv_obd *lmv = &obd->u.lmv;
1294 struct obd_export *tgt_exp;
1295 struct lmv_obj *obj;
1299 rc = lmv_check_connect(obd);
1303 if (!lmv->desc.ld_active_tgt_count)
1306 LASSERT(++loop <= 2);
1307 obj = lmv_obj_grab(obd, &op_data->op_fid1);
1311 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1312 op_data->op_name, op_data->op_namelen);
1313 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1314 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1317 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1320 if (IS_ERR(tgt_exp))
1321 RETURN(PTR_ERR(tgt_exp));
1323 CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
1324 op_data->op_name, PFID(&op_data->op_fid1));
1326 rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
1327 cap_effective, rdev, request);
1329 if (*request == NULL)
1331 CDEBUG(D_OTHER, "created. "DFID"\n", PFID(&op_data->op_fid1));
1332 } else if (rc == -ERESTART) {
1333 LASSERT(*request != NULL);
1334 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1335 "Got -ERESTART during create!\n");
1336 ptlrpc_req_finished(*request);
1339 * Directory got split. Time to update local object and repeat
1340 * the request with proper MDS.
1342 rc = lmv_handle_split(exp, &op_data->op_fid1);
1344 rc = lmv_alloc_slave_fids(obd, &op_data->op_fid1,
1345 op_data, &op_data->op_fid2);
1354 static int lmv_done_writing(struct obd_export *exp,
1355 struct md_op_data *op_data,
1356 struct obd_client_handle *och)
1358 struct obd_device *obd = exp->exp_obd;
1359 struct lmv_obd *lmv = &obd->u.lmv;
1360 struct obd_export *tgt_exp;
1364 rc = lmv_check_connect(obd);
1368 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1369 if (IS_ERR(tgt_exp))
1370 RETURN(PTR_ERR(tgt_exp));
1372 rc = md_done_writing(tgt_exp, op_data, och);
1377 lmv_enqueue_slaves(struct obd_export *exp, int locktype,
1378 struct lookup_intent *it, int lockmode,
1379 struct md_op_data *op_data, struct lustre_handle *lockh,
1380 void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1381 ldlm_blocking_callback cb_blocking, void *cb_data)
1383 struct obd_device *obd = exp->exp_obd;
1384 struct lmv_obd *lmv = &obd->u.lmv;
1385 struct lmv_stripe_md *mea = op_data->op_mea1;
1386 struct md_op_data *op_data2;
1387 struct obd_export *tgt_exp;
1391 OBD_ALLOC_PTR(op_data2);
1392 if (op_data2 == NULL)
1395 LASSERT(mea != NULL);
1396 for (i = 0; i < mea->mea_count; i++) {
1397 memset(op_data2, 0, sizeof(*op_data2));
1398 op_data2->op_fid1 = mea->mea_ids[i];
1400 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
1401 if (IS_ERR(tgt_exp))
1402 GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1404 if (tgt_exp == NULL)
1407 rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
1408 lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
1411 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
1412 PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
1417 if (it->d.lustre.it_data) {
1418 struct ptlrpc_request *req;
1419 req = (struct ptlrpc_request *)it->d.lustre.it_data;
1420 ptlrpc_req_finished(req);
1423 if (it->d.lustre.it_status)
1424 GOTO(cleanup, rc = it->d.lustre.it_status);
1429 OBD_FREE_PTR(op_data2);
1432 /* drop all taken locks */
1434 if (lockh[i].cookie)
1435 ldlm_lock_decref(lockh + i, lockmode);
1436 lockh[i].cookie = 0;
1443 lmv_enqueue_remote(struct obd_export *exp, int lock_type,
1444 struct lookup_intent *it, int lock_mode,
1445 struct md_op_data *op_data, struct lustre_handle *lockh,
1446 void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1447 ldlm_blocking_callback cb_blocking, void *cb_data,
1448 int extra_lock_flags)
1450 struct ptlrpc_request *req = it->d.lustre.it_data;
1451 struct obd_device *obd = exp->exp_obd;
1452 struct lmv_obd *lmv = &obd->u.lmv;
1453 struct mdt_body *body = NULL;
1454 struct lustre_handle plock;
1455 struct obd_export *tgt_exp;
1456 struct md_op_data *rdata;
1458 struct lu_fid fid_copy;
1461 body = lustre_msg_buf(req->rq_repmsg,
1462 DLM_REPLY_REC_OFF, sizeof(*body));
1463 LASSERT(body != NULL);
1465 if (!(body->valid & OBD_MD_MDS))
1468 CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
1469 LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1471 /* We got LOOKUP lock, but we really need attrs */
1472 pmode = it->d.lustre.it_lock_mode;
1473 LASSERT(pmode != 0);
1474 memcpy(&plock, lockh, sizeof(plock));
1475 it->d.lustre.it_lock_mode = 0;
1476 it->d.lustre.it_data = NULL;
1477 fid_copy = body->fid1;
1479 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1480 ptlrpc_req_finished(req);
1482 tgt_exp = lmv_find_export(lmv, &fid_copy);
1483 if (IS_ERR(tgt_exp))
1484 GOTO(out, rc = PTR_ERR(tgt_exp));
1486 OBD_ALLOC_PTR(rdata);
1488 GOTO(out, rc = -ENOMEM);
1490 rdata->op_fid1 = fid_copy;
1491 rdata->op_name = NULL;
1492 rdata->op_namelen = 0;
1494 rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
1495 lockh, lmm, lmmsize, cb_compl, cb_blocking,
1496 cb_data, extra_lock_flags);
1497 OBD_FREE_PTR(rdata);
1500 ldlm_lock_decref(&plock, pmode);
1505 lmv_enqueue(struct obd_export *exp, int lock_type,
1506 struct lookup_intent *it, int lock_mode,
1507 struct md_op_data *op_data, struct lustre_handle *lockh,
1508 void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
1509 ldlm_blocking_callback cb_blocking, void *cb_data,
1510 int extra_lock_flags)
1512 struct obd_device *obd = exp->exp_obd;
1513 struct lmv_obd *lmv = &obd->u.lmv;
1514 struct obd_export *tgt_exp = NULL;
1515 struct lmv_obj *obj;
1519 rc = lmv_check_connect(obd);
1523 if (op_data->op_mea1 && it->it_op == IT_UNLINK) {
1524 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
1525 op_data, lockh, lmm, lmmsize,
1526 cb_compl, cb_blocking, cb_data);
1530 if (op_data->op_namelen) {
1531 obj = lmv_obj_grab(obd, &op_data->op_fid1);
1535 /* directory is split. look for right mds for this
1537 mea_idx = raw_name2idx(obj->lo_hashtype,
1539 (char *)op_data->op_name,
1540 op_data->op_namelen);
1541 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1542 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1547 if (tgt_exp == NULL)
1548 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1549 if (IS_ERR(tgt_exp))
1550 RETURN(PTR_ERR(tgt_exp));
1552 CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
1553 PFID(&op_data->op_fid1));
1555 rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
1556 lmm, lmmsize, cb_compl, cb_blocking, cb_data,
1559 if (rc == 0 && it->it_op == IT_OPEN)
1560 rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
1561 op_data, lockh, lmm, lmmsize,
1562 cb_compl, cb_blocking, cb_data,
1568 lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
1569 struct obd_capa *oc, const char *filename, int namelen,
1570 obd_valid valid, int ea_size, struct ptlrpc_request **request)
1572 struct obd_device *obd = exp->exp_obd;
1573 struct lmv_obd *lmv = &obd->u.lmv;
1574 struct lu_fid rid = *fid;
1575 struct obd_export *tgt_exp;
1576 struct mdt_body *body;
1577 struct lmv_obj *obj;
1581 rc = lmv_check_connect(obd);
1586 LASSERT(++loop <= 2);
1587 obj = lmv_obj_grab(obd, &rid);
1590 /* directory is split. look for right mds for this name */
1591 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1592 filename, namelen - 1);
1593 rid = obj->lo_inodes[mea_idx].li_fid;
1594 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1597 tgt_exp = lmv_find_export(lmv, &rid);
1599 if (IS_ERR(tgt_exp))
1600 RETURN(PTR_ERR(tgt_exp));
1602 CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
1603 namelen, filename, PFID(fid), PFID(&rid));
1605 rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
1608 body = lustre_msg_buf((*request)->rq_repmsg,
1609 REQ_REC_OFF, sizeof(*body));
1610 LASSERT(body != NULL);
1612 if (body->valid & OBD_MD_MDS) {
1613 struct ptlrpc_request *req = NULL;
1616 CDEBUG(D_OTHER, "request attrs for "DFID"\n",
1619 tgt_exp = lmv_find_export(lmv, &rid);
1620 if (IS_ERR(tgt_exp)) {
1621 ptlrpc_req_finished(*request);
1622 RETURN(PTR_ERR(tgt_exp));
1625 rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1,
1626 valid, ea_size, &req);
1627 ptlrpc_req_finished(*request);
1630 } else if (rc == -ERESTART) {
1631 LASSERT(*request != NULL);
1632 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1633 "Got -ERESTART during getattr!\n");
1634 ptlrpc_req_finished(*request);
1638 * Directory got split. Time to update local object and repeat
1639 * the request with proper MDS.
1641 rc = lmv_handle_split(exp, &rid);
1649 * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1652 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1653 struct ptlrpc_request **request)
1655 struct obd_device *obd = exp->exp_obd;
1656 struct lmv_obd *lmv = &obd->u.lmv;
1657 struct lmv_obj *obj;
1662 rc = lmv_check_connect(obd);
1667 LASSERT(++loop <= 2);
1668 if (op_data->op_namelen != 0) {
1671 /* Usual link request */
1672 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1674 mea_idx = raw_name2idx(obj->lo_hashtype,
1677 op_data->op_namelen);
1678 op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1679 mds = obj->lo_inodes[mea_idx].li_mds;
1682 rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
1687 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
1688 PFID(&op_data->op_fid2), op_data->op_namelen,
1689 op_data->op_name, PFID(&op_data->op_fid1));
1691 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
1695 /* request from MDS to acquire i_links for inode by fid1 */
1696 CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
1697 PFID(&op_data->op_fid1));
1700 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
1701 mds, PFID(&op_data->op_fid1));
1703 op_data->op_fsuid = current->fsuid;
1704 op_data->op_fsgid = current->fsgid;
1705 op_data->op_cap = current->cap_effective;
1707 rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
1708 if (rc == -ERESTART) {
1709 LASSERT(*request != NULL);
1710 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1711 "Got -ERESTART during link!\n");
1712 ptlrpc_req_finished(*request);
1716 * Directory got split. Time to update local object and repeat
1717 * the request with proper MDS.
1719 rc = lmv_handle_split(exp, &op_data->op_fid2);
1727 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1728 const char *old, int oldlen, const char *new, int newlen,
1729 struct ptlrpc_request **request)
1731 struct obd_device *obd = exp->exp_obd;
1732 struct lmv_obd *lmv = &obd->u.lmv;
1733 int rc, mea_idx, loop = 0;
1734 struct lmv_obj *obj;
1738 CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
1739 oldlen, old, PFID(&op_data->op_fid1),
1740 newlen, new, PFID(&op_data->op_fid2));
1742 rc = lmv_check_connect(obd);
1748 * MDS with old dir entry is asking another MDS to create name
1752 "create %*s(%d/%d) in "DFID" pointing "
1753 "to "DFID"\n", newlen, new, oldlen, newlen,
1754 PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
1756 rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
1761 * Target directory can be split, sowe should forward request to
1764 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1766 mea_idx = raw_name2idx(obj->lo_hashtype,
1768 (char *)new, newlen);
1769 op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1770 CDEBUG(D_OTHER, "Parent obj "DFID"\n",
1771 PFID(&op_data->op_fid2));
1778 LASSERT(++loop <= 2);
1779 obj = lmv_obj_grab(obd, &op_data->op_fid1);
1782 * directory is already split, so we have to forward request to
1785 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1786 (char *)old, oldlen);
1787 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1788 mds = obj->lo_inodes[mea_idx].li_mds;
1789 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
1792 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
1797 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1800 * Directory is already split, so we have to forward request to
1803 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1804 (char *)new, newlen);
1806 op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1807 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
1812 op_data->op_fsuid = current->fsuid;
1813 op_data->op_fsgid = current->fsgid;
1814 op_data->op_cap = current->cap_effective;
1816 rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
1817 new, newlen, request);
1818 if (rc == -ERESTART) {
1819 LASSERT(*request != NULL);
1820 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1821 "Got -ERESTART during rename!\n");
1822 ptlrpc_req_finished(*request);
1826 * Directory got split. Time to update local object and repeat
1827 * the request with proper MDS.
1829 rc = lmv_handle_split(exp, &op_data->op_fid1);
1836 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1837 void *ea, int ealen, void *ea2, int ea2len,
1838 struct ptlrpc_request **request)
1840 struct obd_device *obd = exp->exp_obd;
1841 struct lmv_obd *lmv = &obd->u.lmv;
1842 struct ptlrpc_request *req;
1843 struct obd_export *tgt_exp;
1844 struct lmv_obj *obj;
1848 rc = lmv_check_connect(obd);
1852 obj = lmv_obj_grab(obd, &op_data->op_fid1);
1854 CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
1855 PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
1856 obj ? ", split" : "");
1859 for (i = 0; i < obj->lo_objcount; i++) {
1860 op_data->op_fid1 = obj->lo_inodes[i].li_fid;
1862 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
1863 if (IS_ERR(tgt_exp)) {
1864 rc = PTR_ERR(tgt_exp);
1868 rc = md_setattr(tgt_exp, op_data, ea, ealen,
1871 if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
1873 * this is master object and this request should
1874 * be returned back to llite.
1878 ptlrpc_req_finished(req);
1886 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1887 if (IS_ERR(tgt_exp))
1888 RETURN(PTR_ERR(tgt_exp));
1890 rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2,
1896 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
1897 struct obd_capa *oc, struct ptlrpc_request **request)
1899 struct obd_device *obd = exp->exp_obd;
1900 struct lmv_obd *lmv = &obd->u.lmv;
1901 struct obd_export *tgt_exp;
1905 rc = lmv_check_connect(obd);
1909 tgt_exp = lmv_find_export(lmv, fid);
1910 if (IS_ERR(tgt_exp))
1911 RETURN(PTR_ERR(tgt_exp));
1913 rc = md_sync(tgt_exp, fid, oc, request);
1917 /* main purpose of LMV blocking ast is to remove split directory LMV
1918 * presentation object (struct lmv_obj) attached to the lock being revoked. */
1919 int lmv_blocking_ast(struct ldlm_lock *lock,
1920 struct ldlm_lock_desc *desc,
1921 void *data, int flag)
1923 struct lustre_handle lockh;
1924 struct lmv_obj *obj;
1929 case LDLM_CB_BLOCKING:
1930 ldlm_lock2handle(lock, &lockh);
1931 rc = ldlm_cli_cancel(&lockh);
1933 CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1937 case LDLM_CB_CANCELING:
1938 /* time to drop cached attrs for dirobj */
1939 obj = lock->l_ast_data;
1941 CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1942 ", master "DFID"\n",
1943 lock->l_resource->lr_name.name[3] == 1 ?
1944 "LOOKUP" : "UPDATE",
1945 lock->l_resource->lr_name.name[0],
1946 lock->l_resource->lr_name.name[1],
1947 PFID(&obj->lo_fid));
1957 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
1958 struct obd_capa *oc, __u64 offset, struct page *page,
1959 struct ptlrpc_request **request)
1961 struct obd_device *obd = exp->exp_obd;
1962 struct lmv_obd *lmv = &obd->u.lmv;
1963 struct obd_export *tgt_exp;
1964 struct lu_fid rid = *fid;
1965 struct lmv_obj *obj;
1969 rc = lmv_check_connect(obd);
1973 CDEBUG(D_INFO, "READPAGE at %llu from "DFID"\n", offset, PFID(&rid));
1975 obj = lmv_obj_grab(obd, fid);
1977 __u64 index = offset;
1978 __u64 seg = MAX_HASH_SIZE;
1981 LASSERT(obj->lo_objcount > 0);
1982 do_div(seg, obj->lo_objcount);
1983 do_div(index, (__u32)seg);
1985 rid = obj->lo_inodes[i].li_fid;
1986 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
1988 lmv_obj_unlock(obj);
1990 CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
1991 PFID(&rid), (unsigned long)offset, i);
1993 tgt_exp = lmv_find_export(lmv, &rid);
1996 if (IS_ERR(tgt_exp))
1997 GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1999 rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
2002 if (obj && i < obj->lo_objcount - 1) {
2003 struct lu_dirpage *dp;
2006 dp = cfs_page_address(page);
2007 end = le32_to_cpu(dp->ldp_hash_end);
2009 __u64 max_hash = MAX_HASH_SIZE;
2011 do_div(max_hash, obj->lo_objcount);
2012 dp->ldp_hash_end = (__u32)max_hash * (i + 1);
2013 CDEBUG(D_INFO, ""DFID" reset end %lu i %d\n", PFID(&rid),
2014 (unsigned long)dp->ldp_hash_end, i);
2018 * Here we could remove "." and ".." from all pages which at not from
2019 * master. But MDS has only "." and ".." for master dir.
2028 static int lmv_unlink_slaves(struct obd_export *exp,
2029 struct md_op_data *op_data,
2030 struct ptlrpc_request **req)
2032 struct obd_device *obd = exp->exp_obd;
2033 struct lmv_obd *lmv = &obd->u.lmv;
2034 struct lmv_stripe_md *mea = op_data->op_mea1;
2035 struct md_op_data *op_data2;
2036 struct obd_export *tgt_exp;
2040 OBD_ALLOC_PTR(op_data2);
2041 if (op_data2 == NULL)
2044 LASSERT(mea != NULL);
2045 for (i = 0; i < mea->mea_count; i++) {
2046 memset(op_data2, 0, sizeof(*op_data2));
2047 op_data2->op_fid1 = mea->mea_ids[i];
2048 op_data2->op_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
2049 op_data2->op_fsuid = current->fsuid;
2050 op_data2->op_fsgid = current->fsgid;
2051 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
2052 if (IS_ERR(tgt_exp))
2053 GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
2055 if (tgt_exp == NULL)
2058 rc = md_unlink(tgt_exp, op_data2, req);
2060 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
2061 PFID(&mea->mea_ids[i]), rc);
2064 ptlrpc_req_finished(*req);
2068 GOTO(out_free_op_data2, rc);
2073 OBD_FREE_PTR(op_data2);
2077 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2078 struct ptlrpc_request **request)
2080 struct obd_device *obd = exp->exp_obd;
2081 struct lmv_obd *lmv = &obd->u.lmv;
2082 struct obd_export *tgt_exp = NULL;
2086 rc = lmv_check_connect(obd);
2090 if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) {
2091 /* mds asks to remove slave objects */
2092 rc = lmv_unlink_slaves(exp, op_data, request);
2097 LASSERT(++loop <= 2);
2098 if (op_data->op_namelen != 0) {
2099 struct lmv_obj *obj;
2102 obj = lmv_obj_grab(obd, &op_data->op_fid1);
2104 mea_idx = raw_name2idx(obj->lo_hashtype,
2107 op_data->op_namelen);
2108 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
2109 tgt_exp = lmv_get_export(lmv,
2110 obj->lo_inodes[mea_idx].li_mds);
2112 CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
2113 op_data->op_namelen, op_data->op_name,
2114 PFID(&op_data->op_fid1), mea_idx);
2117 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
2118 PFID(&op_data->op_fid1));
2120 if (tgt_exp == NULL)
2121 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
2122 if (IS_ERR(tgt_exp))
2123 RETURN(PTR_ERR(tgt_exp));
2125 op_data->op_fsuid = current->fsuid;
2126 op_data->op_fsgid = current->fsgid;
2127 op_data->op_cap = current->cap_effective;
2128 rc = md_unlink(tgt_exp, op_data, request);
2129 if (rc == -ERESTART) {
2130 LASSERT(*request != NULL);
2131 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
2132 "Got -ERESTART during unlink!\n");
2133 ptlrpc_req_finished(*request);
2137 * Directory got split. Time to update local object and repeat
2138 * the request with proper MDS.
2140 rc = lmv_handle_split(exp, &op_data->op_fid1);
2147 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs,
2148 struct obd_device *tgt, int count,
2149 struct llog_catid *logid, struct obd_uuid *uuid)
2151 struct llog_ctxt *ctxt;
2155 rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
2158 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
2159 ctxt->loc_imp = tgt->u.cli.cl_import;
2165 static int lmv_llog_finish(struct obd_device *obd, int count)
2170 rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
2174 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2179 case OBD_CLEANUP_EARLY:
2180 /* XXX: here should be calling obd_precleanup() down to
2183 case OBD_CLEANUP_SELF_EXP:
2184 rc = obd_llog_finish(obd, 0);
2186 CERROR("failed to cleanup llogging subsystems\n");
2194 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
2195 void *key, __u32 *vallen, void *val)
2197 struct obd_device *obd;
2198 struct lmv_obd *lmv;
2202 obd = class_exp2obd(exp);
2204 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2205 exp->exp_handle.h_cookie);
2210 if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2211 struct lmv_tgt_desc *tgts;
2214 rc = lmv_check_connect(obd);
2218 LASSERT(*vallen == sizeof(__u32));
2219 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2222 /* all tgts should be connected when this get called. */
2223 if (!tgts || !tgts->ltd_exp) {
2224 CERROR("target not setup?\n");
2228 if (!obd_get_info(tgts->ltd_exp, keylen, key,
2233 } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2234 rc = lmv_check_connect(obd);
2238 /* forwarding this request to first MDS, it should know LOV
2240 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2245 CDEBUG(D_IOCTL, "invalid key\n");
2249 int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
2250 void *key, obd_count vallen, void *val,
2251 struct ptlrpc_request_set *set)
2253 struct lmv_tgt_desc *tgt;
2254 struct obd_device *obd;
2255 struct lmv_obd *lmv;
2259 obd = class_exp2obd(exp);
2261 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2262 exp->exp_handle.h_cookie);
2267 if (KEY_IS(KEY_FLUSH_CTX)) {
2270 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2271 tgt = &lmv->tgts[i];
2276 err = obd_set_info_async(tgt->ltd_exp,
2277 keylen, key, vallen, val, set);
2284 if (KEY_IS("pag")) {
2285 struct obd_export *exp;
2288 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2289 exp = lmv->tgts[i].ltd_exp;
2292 struct obd_device *tgt_obd;
2294 tgt_obd = class_find_client_obd(
2295 &lmv->tgts[i].ltd_uuid,
2298 if (tgt_obd == NULL) {
2299 CERROR("can't find obd %s\n",
2300 lmv->tgts[i].ltd_uuid.uuid);
2303 exp = tgt_obd->obd_self_export;
2306 err = obd_set_info_async(exp, keylen, key, vallen,
2318 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2319 struct lov_stripe_md *lsm)
2321 struct obd_device *obd = class_exp2obd(exp);
2322 struct lmv_obd *lmv = &obd->u.lmv;
2323 struct lmv_stripe_md *meap, *lsmp;
2327 mea_size = lmv_get_easize(lmv);
2331 if (*lmmp && !lsm) {
2332 OBD_FREE(*lmmp, mea_size);
2337 if (*lmmp == NULL) {
2338 OBD_ALLOC(*lmmp, mea_size);
2346 lsmp = (struct lmv_stripe_md *)lsm;
2347 meap = (struct lmv_stripe_md *)*lmmp;
2349 if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2350 lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2353 meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2354 meap->mea_count = cpu_to_le32(lsmp->mea_count);
2355 meap->mea_master = cpu_to_le32(lsmp->mea_master);
2357 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2358 meap->mea_ids[i] = meap->mea_ids[i];
2359 fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
2365 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2366 struct lov_mds_md *lmm, int lmm_size)
2368 struct obd_device *obd = class_exp2obd(exp);
2369 struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
2370 struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
2371 struct lmv_obd *lmv = &obd->u.lmv;
2376 mea_size = lmv_get_easize(lmv);
2380 if (*lsmp != NULL && lmm == NULL) {
2381 OBD_FREE(*tmea, mea_size);
2385 LASSERT(mea_size == lmm_size);
2387 OBD_ALLOC(*tmea, mea_size);
2394 if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2395 mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2396 mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2398 magic = le32_to_cpu(mea->mea_magic);
2400 /* old mea is not handled here */
2404 (*tmea)->mea_magic = magic;
2405 (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2406 (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2408 for (i = 0; i < (*tmea)->mea_count; i++) {
2409 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2410 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2415 static int lmv_cancel_unused(struct obd_export *exp,
2416 const struct lu_fid *fid,
2417 int flags, void *opaque)
2419 struct obd_device *obd = exp->exp_obd;
2420 struct lmv_obd *lmv = &obd->u.lmv;
2424 LASSERT(fid != NULL);
2426 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2427 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
2430 err = md_cancel_unused(lmv->tgts[i].ltd_exp,
2431 fid, flags, opaque);
2438 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
2440 struct obd_device *obd = exp->exp_obd;
2441 struct lmv_obd *lmv = &obd->u.lmv;
2444 RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
2447 int lmv_lock_match(struct obd_export *exp, int flags,
2448 const struct lu_fid *fid, ldlm_type_t type,
2449 ldlm_policy_data_t *policy, ldlm_mode_t mode,
2450 struct lustre_handle *lockh)
2452 struct obd_device *obd = exp->exp_obd;
2453 struct lmv_obd *lmv = &obd->u.lmv;
2457 CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
2459 /* with CMD every object can have two locks in different namespaces:
2460 * lookup lock in space of mds storing direntry and update/open lock in
2461 * space of mds storing inode. Thus we check all targets, not only that
2462 * one fid was created in. */
2463 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2464 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2465 type, policy, mode, lockh);
2473 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2474 int offset, struct obd_export *dt_exp,
2475 struct obd_export *md_exp, struct lustre_md *md)
2477 struct obd_device *obd = exp->exp_obd;
2478 struct lmv_obd *lmv = &obd->u.lmv;
2482 rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, offset, dt_exp, md_exp,
2487 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2489 struct obd_device *obd = exp->exp_obd;
2490 struct lmv_obd *lmv = &obd->u.lmv;
2493 RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2496 int lmv_set_open_replay_data(struct obd_export *exp,
2497 struct obd_client_handle *och,
2498 struct ptlrpc_request *open_req)
2500 struct obd_device *obd = exp->exp_obd;
2501 struct lmv_obd *lmv = &obd->u.lmv;
2502 struct obd_export *tgt_exp;
2506 tgt_exp = lmv_find_export(lmv, &och->och_fid);
2507 if (IS_ERR(tgt_exp))
2508 RETURN(PTR_ERR(tgt_exp));
2510 RETURN(md_set_open_replay_data(tgt_exp, och, open_req));
2513 int lmv_clear_open_replay_data(struct obd_export *exp,
2514 struct obd_client_handle *och)
2516 struct obd_device *obd = exp->exp_obd;
2517 struct lmv_obd *lmv = &obd->u.lmv;
2518 struct obd_export *tgt_exp;
2521 tgt_exp = lmv_find_export(lmv, &och->och_fid);
2522 if (IS_ERR(tgt_exp))
2523 RETURN(PTR_ERR(tgt_exp));
2525 RETURN(md_clear_open_replay_data(tgt_exp, och));
2528 static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2529 struct obd_capa *oc,
2530 struct ptlrpc_request **request)
2532 struct obd_device *obd = exp->exp_obd;
2533 struct lmv_obd *lmv = &obd->u.lmv;
2534 struct obd_export *tgt_exp;
2539 rc = lmv_check_connect(obd);
2543 tgt_exp = lmv_find_export(lmv, fid);
2544 if (IS_ERR(tgt_exp))
2545 RETURN(PTR_ERR(tgt_exp));
2547 rc = md_get_remote_perm(tgt_exp, fid, oc, request);
2552 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2555 struct obd_device *obd = exp->exp_obd;
2556 struct lmv_obd *lmv = &obd->u.lmv;
2557 struct obd_export *tgt_exp;
2561 rc = lmv_check_connect(obd);
2565 tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid);
2566 if (IS_ERR(tgt_exp))
2567 RETURN(PTR_ERR(tgt_exp));
2569 rc = md_renew_capa(tgt_exp, oc, cb);
2573 struct obd_ops lmv_obd_ops = {
2574 .o_owner = THIS_MODULE,
2575 .o_setup = lmv_setup,
2576 .o_cleanup = lmv_cleanup,
2577 .o_precleanup = lmv_precleanup,
2578 .o_process_config = lmv_process_config,
2579 .o_connect = lmv_connect,
2580 .o_disconnect = lmv_disconnect,
2581 .o_statfs = lmv_statfs,
2582 .o_llog_init = lmv_llog_init,
2583 .o_llog_finish = lmv_llog_finish,
2584 .o_get_info = lmv_get_info,
2585 .o_set_info_async = lmv_set_info_async,
2586 .o_packmd = lmv_packmd,
2587 .o_unpackmd = lmv_unpackmd,
2588 .o_notify = lmv_notify,
2589 .o_fid_alloc = lmv_fid_alloc,
2590 .o_fid_delete = lmv_fid_delete,
2591 .o_iocontrol = lmv_iocontrol
2594 struct md_ops lmv_md_ops = {
2595 .m_getstatus = lmv_getstatus,
2596 .m_change_cbdata = lmv_change_cbdata,
2597 .m_close = lmv_close,
2598 .m_create = lmv_create,
2599 .m_done_writing = lmv_done_writing,
2600 .m_enqueue = lmv_enqueue,
2601 .m_getattr = lmv_getattr,
2602 .m_getxattr = lmv_getxattr,
2603 .m_getattr_name = lmv_getattr_name,
2604 .m_intent_lock = lmv_intent_lock,
2606 .m_rename = lmv_rename,
2607 .m_setattr = lmv_setattr,
2608 .m_setxattr = lmv_setxattr,
2610 .m_readpage = lmv_readpage,
2611 .m_unlink = lmv_unlink,
2612 .m_init_ea_size = lmv_init_ea_size,
2613 .m_cancel_unused = lmv_cancel_unused,
2614 .m_set_lock_data = lmv_set_lock_data,
2615 .m_lock_match = lmv_lock_match,
2616 .m_get_lustre_md = lmv_get_lustre_md,
2617 .m_free_lustre_md = lmv_free_lustre_md,
2618 .m_set_open_replay_data = lmv_set_open_replay_data,
2619 .m_clear_open_replay_data = lmv_clear_open_replay_data,
2620 .m_get_remote_perm = lmv_get_remote_perm,
2621 .m_renew_capa = lmv_renew_capa
2624 int __init lmv_init(void)
2626 struct lprocfs_static_vars lvars;
2629 obj_cache = kmem_cache_create("lmv_objects",
2630 sizeof(struct lmv_obj),
2633 CERROR("error allocating lmv objects cache\n");
2637 lprocfs_init_vars(lmv, &lvars);
2638 rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2639 lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2641 kmem_cache_destroy(obj_cache);
2647 static void lmv_exit(void)
2649 class_unregister_type(LUSTRE_LMV_NAME);
2651 LASSERTF(kmem_cache_destroy(obj_cache) == 0,
2652 "can't free lmv objects cache, %d object(s)"
2653 "still in use\n", atomic_read(&obj_cache_count));
2656 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2657 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2658 MODULE_LICENSE("GPL");
2660 module_init(lmv_init);
2661 module_exit(lmv_exit);