From: wangdi Date: Fri, 14 Jul 2006 07:58:22 +0000 (+0000) Subject: Branch: b_new_cmd X-Git-Tag: v1_8_0_110~486^2~1426 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=4ccd21cbc3143372679591fe4a1e7ddba61c04c6;p=fs%2Flustre-release.git Branch: b_new_cmd 1)reorganize MDS to share mds lov api with mdd 2)serval fixes about mdd lov --- diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h index edbf088..b0ef627 100644 --- a/lustre/include/lustre_mds.h +++ b/lustre/include/lustre_mds.h @@ -81,6 +81,15 @@ struct mds_file_data { int mds_reint_rec(struct mds_update_record *r, int offset, struct ptlrpc_request *req, struct lustre_handle *); +int md_lov_connect(struct obd_device *obd, struct md_lov_info *mli, + char *lov_name, struct obd_uuid *uuid, + struct md_lov_ops *mlo, const void *ctxt); +int md_lov_notity_pre(struct obd_device *obd, struct md_lov_info *mli, + struct obd_device *watched, enum obd_notify_event ev, + void *data); +int md_lov_start_synchronize(struct obd_device *obd, struct md_lov_info *mli, + struct obd_device *watched, + void *data, int nonblock, const void *ctxt); /* ioctls for trying requests */ #define IOC_REQUEST_TYPE 'f' #define IOC_REQUEST_MIN_NR 30 diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 83fa845..6838b97 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -402,31 +402,47 @@ struct mgs_obd { struct semaphore mgs_sem; }; +struct md_lov_info; + +struct md_lov_ops { + int (*ml_read_objids)(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt); + int (*ml_write_objids)(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt); +}; struct md_lov_info { - struct obd_device *md_osc_obd; /* XXX lov_obd */ + struct obd_device *md_lov_obd; /* XXX lov_obd */ struct obd_uuid md_lov_uuid; - struct obd_export *md_osc_exp; /* XXX lov_exp */ + struct obd_export *md_lov_exp; /* XXX lov_exp */ struct lov_desc md_lov_desc; obd_id *md_lov_objids; int md_lov_objids_size; __u32 md_lov_objids_in_file; unsigned int md_lov_objids_dirty:1; int md_lov_nextid_set; - struct file *md_lov_objid_filp; + void *md_lov_objid_obj; + struct lu_fid md_lov_objid_fid; unsigned long md_lov_objids_valid:1; + int md_lov_max_mdsize; + int md_lov_max_cookiesize; + struct semaphore md_lov_orphan_recovery_sem; + struct md_lov_ops *md_lov_ops; }; -#define mds_osc_obd mds_lov_info.md_osc_obd +#define mds_osc_obd mds_lov_info.md_lov_obd #define mds_lov_uuid mds_lov_info.md_lov_uuid -#define mds_osc_exp mds_lov_info.md_osc_exp +#define mds_osc_exp mds_lov_info.md_lov_exp #define mds_lov_desc mds_lov_info.md_lov_desc #define mds_lov_objids mds_lov_info.md_lov_objids #define mds_lov_objids_size mds_lov_info.md_lov_objids_size #define mds_lov_objids_in_file mds_lov_info.md_lov_objids_in_file #define mds_lov_objids_dirty mds_lov_info.md_lov_objids_dirty #define mds_lov_nextid_set mds_lov_info.md_lov_nextid_set -#define mds_lov_objid_filp mds_lov_info.md_lov_objid_filp +#define mds_lov_objid_filp mds_lov_info.md_lov_objid_obj #define mds_lov_objids_valid mds_lov_info.md_lov_objids_valid +#define mds_max_mdsize mds_lov_info.md_lov_max_mdsize +#define mds_max_cookiesize mds_lov_info.md_lov_max_cookiesize +#define mds_orphan_recovery_sem mds_lov_info.md_lov_orphan_recovery_sem struct mds_obd { /* NB this field MUST be first */ @@ -436,8 +452,6 @@ struct mds_obd { struct ptlrpc_service *mds_readpage_service; struct vfsmount *mds_vfsmnt; cfs_dentry_t *mds_fid_de; - int mds_max_mdsize; - int mds_max_cookiesize; struct file *mds_rcvd_filp; spinlock_t mds_transno_lock; __u64 mds_last_transno; @@ -456,7 +470,6 @@ struct mds_obd { char *mds_profile; struct file *mds_health_check_filp; unsigned long *mds_client_bitmap; - struct semaphore mds_orphan_recovery_sem; struct upcall_cache *mds_group_hash; struct lustre_quota_info mds_quota_info; diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 7a9844b..f66f939 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -32,26 +32,11 @@ struct dt_device; -struct mdd_lov_info { - struct obd_device *mdd_lov_obd; - struct obd_uuid mdd_lov_uuid; - struct lov_desc mdd_lov_desc; - obd_id *mdd_lov_objids; - int mdd_lov_objids_size; - __u32 mdd_lov_objids_in_file; - int mdd_lov_nextid_set; - struct lu_fid mdd_lov_objid_fid; - struct dt_object *mdd_lov_objid_obj; - unsigned int mdd_lov_objids_dirty:1; -}; - struct mdd_device { struct md_device mdd_md_dev; struct dt_device *mdd_child; - struct mdd_lov_info mdd_lov_info; + struct md_lov_info mdd_lov_info; struct dt_device mdd_lov_dev; - int mdd_max_mdsize; - int mdd_max_cookiesize; struct lu_fid mdd_root_fid; }; diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 2f190d3..6a89c13 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -44,23 +44,23 @@ #include #include #include +#include #include "mdd_internal.h" static const char mdd_lov_objid_name[] = "lov_objid"; -static int mdd_lov_read_objids(const struct lu_context *ctxt, - struct mdd_device *mdd) +static int mdd_lov_read_objids(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt) { - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; - struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj; + struct dt_object *obj_ids = mli->md_lov_objid_obj; struct lu_attr *lu_attr = &mdd_ctx_info(ctxt)->mti_attr; obd_id *ids; int i, rc; ENTRY; - LASSERT(!lov_info->mdd_lov_objids_size); - LASSERT(!lov_info->mdd_lov_objids_dirty); + LASSERT(!mli->md_lov_objids_size); + LASSERT(!mli->md_lov_objids_dirty); /* Read everything in the file, even if our current lov desc has fewer targets. Old targets not in the lov descriptor @@ -77,8 +77,8 @@ static int mdd_lov_read_objids(const struct lu_context *ctxt, if (ids == NULL) RETURN(-ENOMEM); - lov_info->mdd_lov_objids = ids; - lov_info->mdd_lov_objids_size = lu_attr->la_size; + mli->md_lov_objids = ids; + mli->md_lov_objids_size = lu_attr->la_size; #if 0 rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids, @@ -88,475 +88,101 @@ static int mdd_lov_read_objids(const struct lu_context *ctxt, RETURN(rc); } #endif - lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids); + mli->md_lov_objids_in_file = lu_attr->la_size / sizeof(*ids); - for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) { + for (i = 0; i < mli->md_lov_objids_in_file; i++) { CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n", - lov_info->mdd_lov_objids[i], i); + mli->md_lov_objids[i], i); } out: RETURN(0); } -/* Update the lov desc for a new size lov. */ -static int mdd_lov_update_desc(const struct lu_context *ctxt, - struct mdd_device *mdd) -{ - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; - __u32 size, stripes, valsize = sizeof(lov_info->mdd_lov_desc); - struct lov_desc *ld; - struct obd_device *lov_obd = lov_info->mdd_lov_obd; - int rc = 0; - ENTRY; - - ld = &mdd_ctx_info(ctxt)->mti_ld; - - rc = obd_get_info(lov_obd->obd_self_export, strlen(KEY_LOVDESC) + 1, - KEY_LOVDESC, &valsize, ld); - if (rc) - GOTO(out, rc); - - /* The size of the LOV target table may have increased. */ - size = ld->ld_tgt_count * sizeof(obd_id); - if ((lov_info->mdd_lov_objids_size == 0) || - (size > lov_info->mdd_lov_objids_size)) { - obd_id *ids; - - /* add room by powers of 2 */ - size = 1; - while (size < ld->ld_tgt_count) - size = size << 1; - size = size * sizeof(obd_id); - - OBD_ALLOC(ids, size); - if (ids == NULL) - GOTO(out, rc = -ENOMEM); - memset(ids, 0, size); - if (lov_info->mdd_lov_objids_size) { - obd_id *old_ids = lov_info->mdd_lov_objids; - memcpy(ids, lov_info->mdd_lov_objids, - lov_info->mdd_lov_objids_size); - lov_info->mdd_lov_objids = ids; - OBD_FREE(old_ids, lov_info->mdd_lov_objids_size); - } - lov_info->mdd_lov_objids = ids; - lov_info->mdd_lov_objids_size = size; - } - - /* Don't change the mds_lov_desc until the objids size matches the - count (paranoia) */ - lov_info->mdd_lov_desc = *ld; - CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n", - lov_info->mdd_lov_desc.ld_tgt_count); - - stripes = min((__u32)LOV_MAX_STRIPE_COUNT, - max(lov_info->mdd_lov_desc.ld_tgt_count, - lov_info->mdd_lov_objids_in_file)); - mdd->mdd_max_mdsize = lov_mds_md_size(stripes); - mdd->mdd_max_cookiesize = stripes * sizeof(struct llog_cookie); - CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n", - mdd->mdd_max_mdsize, mdd->mdd_max_cookiesize); -out: - RETURN(rc); -} - -int mdd_lov_write_objids(const struct lu_context *ctxt, - struct mdd_lov_info *lov_info) +int mdd_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt) { int i, rc = 0, tgts; ENTRY; - if (!lov_info->mdd_lov_objids_dirty) + if (!mli->md_lov_objids_dirty) RETURN(0); - tgts = max(lov_info->mdd_lov_desc.ld_tgt_count, - lov_info->mdd_lov_objids_in_file); + tgts = max(mli->md_lov_desc.ld_tgt_count, + mli->md_lov_objids_in_file); if (!tgts) RETURN(0); for (i = 0; i < tgts; i++) CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n", - lov_info->mdd_lov_objids[i], i); + mli->md_lov_objids[i], i); #if 0 rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj, - lov_info->mdd_lov_objids, + mli->mdd_lov_objids, tgts * sizeof(obd_id), &off); if (rc >= 0) { - lov_info->mdd_lov_objids_dirty = 0; + mli->mdd_lov_objids_dirty = 0; rc = 0; } #endif RETURN(rc); } -static int mdd_lov_connect(const struct lu_context *ctxt, - struct mdd_device *mdd, char *lov_name) -{ - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; - struct lustre_handle conn = {0,}; - struct obd_connect_data *data; - int rc = 0; - ENTRY; - - /*connect to obd*/ - OBD_ALLOC(data, sizeof(*data)); - if (data == NULL) - RETURN(-ENOMEM); - data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX | - OBD_CONNECT_REQPORTAL; - data->ocd_version = LUSTRE_VERSION_CODE; - /* NB: lov_connect() needs to fill in .ocd_index for each OST */ - rc = obd_connect(&conn, lov_info->mdd_lov_obd, &lov_info->mdd_lov_uuid, - data); - OBD_FREE(data, sizeof(*data)); - if (rc) { - CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc); - lov_info->mdd_lov_obd = ERR_PTR(rc); - RETURN(rc); - } - - /* open and test the lov objd file */ - - rc = mdd_lov_read_objids(ctxt, mdd); - if (rc) { - CERROR("cannot read %s: rc = %d\n", "lov_objids", rc); - GOTO(out, rc); - } - - rc = mdd_lov_update_desc(ctxt, mdd); - if (rc) - GOTO(out, rc); -#if 0 - /* tgt_count may be 0! */ - rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count); - if (rc) { - CERROR("failed to initialize catalog %d\n", rc); - GOTO(err_reg, rc); - } -#endif - /* If we're mounting this code for the first time on an existing FS, - * we need to populate the objids array from the real OST values */ - if (lov_info->mdd_lov_desc.ld_tgt_count > - lov_info->mdd_lov_objids_in_file) { - int size = sizeof(obd_id) * lov_info->mdd_lov_desc.ld_tgt_count; - int i; - - rc = obd_get_info(lov_info->mdd_lov_obd->obd_self_export, - strlen("last_id"), "last_id", &size, - lov_info->mdd_lov_objids); - if (!rc) { - for (i = 0; i < lov_info->mdd_lov_desc.ld_tgt_count; i++) - CWARN("got last object "LPU64" from OST %d\n", - lov_info->mdd_lov_objids[i], i); - lov_info->mdd_lov_objids_dirty = 1; - rc = mdd_lov_write_objids(ctxt, lov_info); - if (rc) - CERROR("got last objids from OSTs, but error " - "writing objids file: %d\n", rc); - } - } - /* I want to see a callback happen when the OBD moves to a - * "For General Use" state, and that's when we'll call - * set_nextid(). The class driver can help us here, because - * it can use the obd_recovering flag to determine when the - * the OBD is full available. */ -#if 0 - if (!obd->obd_recovering) - rc = mds_postrecov(obd); -#endif -out: - if (rc) - obd_disconnect(lov_info->mdd_lov_obd->obd_self_export); - RETURN(rc); -} +struct md_lov_ops mdd_lov_ops = { + .ml_read_objids = mdd_lov_read_objids, + .ml_write_objids = mdd_lov_write_objids, +}; int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd) { - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; + struct md_lov_info *mli = &mdd->mdd_lov_info; - dt_object_fini(lov_info->mdd_lov_objid_obj); + obd_register_observer(mli->md_lov_obd, NULL); + + if (mli->md_lov_exp) { + obd_disconnect(mli->md_lov_exp); + mli->md_lov_exp = NULL; + } + + dt_object_fini(mli->md_lov_objid_obj); return 0; } int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd, struct lustre_cfg *cfg) { - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; + struct md_lov_info *lov_info = &mdd->mdd_lov_info; struct dt_object *obj_id; struct obd_device *obd = NULL; char *lov_name = NULL, *srv = NULL; int rc = 0; ENTRY; - if (IS_ERR(lov_info->mdd_lov_obd)) - RETURN(PTR_ERR(lov_info->mdd_lov_obd)); + if (IS_ERR(lov_info->md_lov_obd)) + RETURN(PTR_ERR(lov_info->md_lov_obd)); lov_name = lustre_cfg_string(cfg, 3); LASSERTF(lov_name != NULL, "MDD need lov \n"); - lov_info->mdd_lov_obd = class_name2obd(lov_name); - if (!lov_info->mdd_lov_obd) { - CERROR("MDS cannot locate LOV %s\n", lov_name); - lov_info->mdd_lov_obd = ERR_PTR(-ENOTCONN); - RETURN(-ENOTCONN); - } obj_id = dt_store_open(ctxt, mdd->mdd_child, mdd_lov_objid_name, - &lov_info->mdd_lov_objid_fid); + &lov_info->md_lov_objid_fid); if (IS_ERR(obj_id)){ rc = PTR_ERR(obj_id); RETURN(rc); } LASSERT(obj_id != NULL); - lov_info->mdd_lov_objid_obj = obj_id; - - obd_str2uuid(&lov_info->mdd_lov_uuid, lustre_cfg_string(cfg, 1)); + lov_info->md_lov_objid_obj = obj_id; - rc = mdd_lov_connect(ctxt, mdd, lov_name); - if (rc) - GOTO(out, rc); - - /*register the obd server for lov*/ srv = lustre_cfg_string(cfg, 0); obd = class_name2obd(srv); if (obd == NULL) { CERROR("No such OBD %s\n", srv); LBUG(); } - rc = obd_register_observer(lov_info->mdd_lov_obd, obd); - if (rc) { - CERROR("MDS cannot register as observer of LOV %s (%d)\n", - lov_name, rc); - GOTO(out, rc); - } - EXIT; -out: + rc = md_lov_connect(obd, lov_info, lov_name, + &obd->obd_uuid, &mdd_lov_ops, ctxt); if (rc) mdd_lov_fini(ctxt, mdd); - return rc; -} - -/* update the LOV-OSC knowledge of the last used object id's */ -int mdd_lov_set_nextid(struct mdd_device *mdd) -{ - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; - int rc; - ENTRY; - - LASSERT(lov_info->mdd_lov_objids != NULL); - - rc = obd_set_info_async(lov_info->mdd_lov_obd->obd_self_export, - strlen(KEY_NEXT_ID), KEY_NEXT_ID, - lov_info->mdd_lov_desc.ld_tgt_count, - lov_info->mdd_lov_objids, NULL); - - if (rc) - CERROR ("mdd_lov_set_nextid failed (%d)\n", rc); - - RETURN(rc); -} - -struct mdd_lov_sync_info { - struct lu_context *mlsi_ctxt; - struct lu_device *mlsi_ld; /* the lov device to sync */ - struct obd_device *mlsi_watched; /* target osc */ - __u32 mlsi_index; /* index of target */ -}; - -#define MDSLOV_NO_INDEX -1 - -/* Inform MDS about new/updated target */ -static int mdd_lov_update_mds(struct lu_context *ctxt, - struct lu_device *ld, - struct obd_device *watched, - __u32 idx) -{ - struct mdd_device *mdd = lu2mdd_dev(ld); - struct mdd_lov_info *lov_info = &mdd->mdd_lov_info; - int old_count; - int rc = 0; - ENTRY; - - old_count = lov_info->mdd_lov_desc.ld_tgt_count; - rc = mdd_lov_update_desc(ctxt, mdd); - if (rc) - RETURN(rc); - - /* - * idx is set as data from lov_notify. - * XXX did not consider recovery here - */ - if (idx != MDSLOV_NO_INDEX) { - if (idx >= lov_info->mdd_lov_desc.ld_tgt_count) { - CERROR("index %d > count %d!\n", idx, - lov_info->mdd_lov_desc.ld_tgt_count); - RETURN(-EINVAL); - } - - if (idx >= lov_info->mdd_lov_objids_in_file) { - /* We never read this lastid; ask the osc */ - obd_id lastid; - __u32 size = sizeof(lastid); - rc = obd_get_info(watched->obd_self_export, - strlen("last_id"), - "last_id", &size, &lastid); - if (rc) - RETURN(rc); - lov_info->mdd_lov_objids[idx] = lastid; - lov_info->mdd_lov_objids_dirty = 1; - mdd_lov_write_objids(ctxt, lov_info); - } else { - /* We have read this lastid from disk; tell the osc. - Don't call this during recovery. */ - rc = mdd_lov_set_nextid(mdd); - } - - CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n", - lov_info->mdd_lov_objids[idx], idx); - } - - RETURN(rc); -} - -static int mdd_lov_clear_orphans(struct mdd_lov_info *mli, - struct obd_uuid *ost_uuid) -{ - int rc; - struct obdo oa; - struct obd_trans_info oti = {0}; - struct lov_stripe_md *empty_ea = NULL; - ENTRY; - - LASSERT(mli->mdd_lov_objids != NULL); - - /* This create will in fact either create or destroy: If the OST is - * missing objects below this ID, they will be created. If it finds - * objects above this ID, they will be removed. */ - memset(&oa, 0, sizeof(oa)); - oa.o_valid = OBD_MD_FLFLAGS; - oa.o_flags = OBD_FL_DELORPHAN; - if (ost_uuid != NULL) { - memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid)); - oa.o_valid |= OBD_MD_FLINLINE; - } - rc = obd_create(mli->mdd_lov_obd->obd_self_export, &oa, - &empty_ea, &oti); - - RETURN(rc); -} - -/* We only sync one osc at a time, so that we don't have to hold - any kind of lock on the whole mds_lov_desc, which may change - (grow) as a result of mds_lov_add_ost. This also avoids any - kind of mismatch between the lov_desc and the mds_lov_desc, - which are not in lock-step during lov_add_obd */ -static int __mdd_lov_synchronize(void *data) -{ - struct mdd_lov_sync_info *mlsi = data; - struct lu_device *ld = mlsi->mlsi_ld; - struct obd_device *watched = mlsi->mlsi_watched; - struct lu_context *ctxt = mlsi->mlsi_ctxt; - struct mdd_device *mdd = lu2mdd_dev(ld); - struct obd_uuid *uuid; - __u32 idx = mlsi->mlsi_index; - int rc = 0; - ENTRY; - - OBD_FREE(mlsi, sizeof(*mlsi)); - - LASSERT(ld); - LASSERT(watched); - uuid = &watched->u.cli.cl_target_uuid; - LASSERT(uuid); - - rc = mdd_lov_update_mds(ctxt, ld, watched, idx); - if (rc != 0) - GOTO(out, rc); - - rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export, - strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid, - NULL); - if (rc != 0) { - CERROR("failed at obd_set_info_async: %d\n", rc); - GOTO(out, rc); - } - - rc = mdd_lov_clear_orphans(&mdd->mdd_lov_info, uuid); - if (rc != 0) { - CERROR("failed at mds_lov_clear_orphans: %d\n", rc); - GOTO(out, rc); - } -out: - EXIT; - lu_device_put(ld); - return rc; -} - -int mdd_lov_synchronize(void *data) -{ - struct mdd_lov_sync_info *mlsi = data; - char name[20]; - - sprintf(name, "ll_mlov_sync_%02u", mlsi->mlsi_index); - ptlrpc_daemonize(name); - - RETURN(__mdd_lov_synchronize(data)); -} - -int mdd_lov_start_synchronize(const struct lu_context *ctxt, - struct lu_device *ld, - struct obd_device *watched, - void *data, int nonblock) -{ - struct mdd_lov_sync_info *mlsi; - int rc; - - ENTRY; - - LASSERT(watched); - - OBD_ALLOC(mlsi, sizeof(*mlsi)); - if (mlsi == NULL) - RETURN(-ENOMEM); - - mlsi->mlsi_ctxt = (struct lu_context *)ctxt; - mlsi->mlsi_ld = ld; - mlsi->mlsi_watched = watched; - if (data) - mlsi->mlsi_index = *(__u32 *)data; - else - mlsi->mlsi_index = MDSLOV_NO_INDEX; - - /* Although class_export_get(obd->obd_self_export) would lock - the MDS in place, since it's only a self-export - it doesn't lock the LOV in place. The LOV can be disconnected - during MDS precleanup, leaving nothing for __mdd_lov_synchronize. - Simply taking an export ref on the LOV doesn't help, because it's - still disconnected. Taking an obd reference insures that we don't - disconnect the LOV. This of course means a cleanup won't - finish for as long as the sync is blocking. */ - lu_device_get(ld); -#if 0 - if (nonblock) { - /* Synchronize in the background */ - rc = cfs_kernel_thread(mdd_lov_synchronize, mlsi, - CLONE_VM | CLONE_FILES); - if (rc < 0) { - CERROR("error starting mdd_lov_synchronize: %d\n", rc); - lu_device_put(ld); - } else { - CDEBUG(D_HA, "mdd_lov_synchronize idx=%d thread=%d\n", - mlsi->mlsi_index, rc); - rc = 0; - } - } else { - rc = __mdd_lov_synchronize((void *)mlsi); - } -#else - /*FIXME: Did not implement the nonblock lov sync here. because ctxt can not - * be shared, maybe we need ref_count for ctxt */ - rc = __mdd_lov_synchronize((void *)mlsi); -#endif RETURN(rc); } @@ -569,38 +195,16 @@ int mdd_notify(const struct lu_context *ctxt, struct lu_device *ld, int rc = 0; ENTRY; - switch (ev) { - /* We only handle these: */ - case OBD_NOTIFY_ACTIVE: - case OBD_NOTIFY_SYNC: - case OBD_NOTIFY_SYNC_NONBLOCK: - break; - default: - RETURN(0); - } - - CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev); - - if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) { - CERROR("unexpected notification of %s %s!\n", - watched->obd_type->typ_name, watched->obd_name); - RETURN(-EINVAL); - } - - /*FIXME later, Recovery stuff still not be designed */ - if (obd->obd_recovering) { - CWARN("MDS %s: in recovery, not resetting orphans on %s\n", - obd->obd_name, - obd_uuid2str(&watched->u.cli.cl_target_uuid)); - /* We still have to fix the lov descriptor for ost's added - after the mdt in the config log. They didn't make it into - mds_lov_connect. */ - rc = mdd_lov_update_desc(ctxt, mdd); + rc = md_lov_notity_pre(obd, &mdd->mdd_lov_info, watched, ev, data); + if (rc) { + if (rc == -ENOENT || rc == -EBUSY) + rc = 0; RETURN(rc); } - rc = mdd_lov_start_synchronize(ctxt, ld, watched, data, - !(ev == OBD_NOTIFY_SYNC)); + rc = md_lov_start_synchronize(obd, &mdd->mdd_lov_info, watched, data, + !(ev == OBD_NOTIFY_SYNC), ctxt); + RETURN(rc); } @@ -651,7 +255,7 @@ int mdd_lov_set_md(const struct lu_context *ctxt, struct md_object *pobj, int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, struct mdd_object *child) { - struct mdd_lov_info *mli = &mdd->mdd_lov_info; + struct md_lov_info *mli = &mdd->mdd_lov_info; struct obdo *oa; struct lov_mds_md *lmm = NULL; struct lov_stripe_md *lsm = NULL; @@ -667,19 +271,18 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID; oa->o_size = 0; - rc = obd_create(mli->mdd_lov_obd->obd_self_export, oa, &lsm, NULL); + rc = obd_create(mli->md_lov_exp, oa, &lsm, NULL); if (rc) GOTO(out_oa, rc); - rc = obd_packmd(mli->mdd_lov_obd->obd_self_export, &lmm, lsm); + rc = obd_packmd(mli->md_lov_exp, &lmm, lsm); if (rc < 0) { CERROR("cannot pack lsm, err = %d\n", rc); GOTO(out_oa, rc); } lmm_size = rc; - - rc = mdd_xattr_set(ctxt, &child->mod_obj, lmm, lmm_size, - MDS_LOV_MD_NAME); + rc = 0; + /*FIXME: did not set MD here */ out_oa: obdo_free(oa); RETURN(rc); diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 124d178..82214b0 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -555,7 +555,8 @@ err_health_check: filp_close(mds->mds_health_check_filp, 0)) CERROR("can't close %s after error\n", HEALTH_CHECK); err_lov_objid: - if (mds->mds_lov_objid_filp && filp_close(mds->mds_lov_objid_filp, 0)) + if (mds->mds_lov_objid_filp && + filp_close((struct file *)mds->mds_lov_objid_filp, 0)) CERROR("can't close %s after error\n", LOV_OBJID); err_client: class_disconnect_exports(obd); @@ -595,7 +596,7 @@ int mds_fs_cleanup(struct obd_device *obd) CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc); } if (mds->mds_lov_objid_filp) { - rc = filp_close(mds->mds_lov_objid_filp, 0); + rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0); mds->mds_lov_objid_filp = NULL; if (rc) CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc); diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index f67a19f..3a61aa9 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -183,9 +183,10 @@ int mds_llog_finish(struct obd_device *obd, int count); /* mds/mds_lov.c */ int mds_lov_connect(struct obd_device *obd, char * lov_name); int mds_lov_disconnect(struct obd_device *obd); -int mds_lov_write_objids(struct obd_device *obd); -void mds_lov_update_objids(struct obd_device *obd, obd_id *ids); +int mds_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt); int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid); +void mds_lov_update_objids(struct obd_device *obd, obd_id *ids); int mds_lov_set_nextid(struct obd_device *obd); int mds_lov_start_synchronize(struct obd_device *obd, struct obd_device *watched, diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index ed4296b..a9b2c32 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -53,7 +53,6 @@ void md_lov_info_update_objids(struct md_lov_info *mli, obd_id *ids) } unlock_kernel(); } -EXPORT_SYMBOL(md_lov_info_update_objids); void mds_lov_update_objids(struct obd_device *obd, obd_id *ids) { @@ -62,77 +61,83 @@ void mds_lov_update_objids(struct obd_device *obd, obd_id *ids) md_lov_info_update_objids(&mds->mds_lov_info, ids); } -static int mds_lov_read_objids(struct obd_device *obd) +static int mds_lov_read_objids(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt) { - struct mds_obd *mds = &obd->u.mds; + struct file *filp = (struct file *)mli->md_lov_objid_obj; obd_id *ids; loff_t off = 0; int i, rc, size; ENTRY; - LASSERT(!mds->mds_lov_objids_size); - LASSERT(!mds->mds_lov_objids_dirty); + LASSERT(!mli->md_lov_objids_size); + LASSERT(!mli->md_lov_objids_dirty); /* Read everything in the file, even if our current lov desc has fewer targets. Old targets not in the lov descriptor during mds setup may still have valid objids. */ - size = mds->mds_lov_objid_filp->f_dentry->d_inode->i_size; + size = filp->f_dentry->d_inode->i_size; if (size == 0) RETURN(0); OBD_ALLOC(ids, size); if (ids == NULL) RETURN(-ENOMEM); - mds->mds_lov_objids = ids; - mds->mds_lov_objids_size = size; + mli->md_lov_objids = ids; + mli->md_lov_objids_size = size; - rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off); + rc = fsfilt_read_record(obd, filp, ids, size, &off); if (rc < 0) { CERROR("Error reading objids %d\n", rc); RETURN(rc); } - mds->mds_lov_objids_in_file = size / sizeof(*ids); + mli->md_lov_objids_in_file = size / sizeof(*ids); - for (i = 0; i < mds->mds_lov_objids_in_file; i++) { + for (i = 0; i < mli->md_lov_objids_in_file; i++) { CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n", - mds->mds_lov_objids[i], i); + mli->md_lov_objids[i], i); } RETURN(0); } -int mds_lov_write_objids(struct obd_device *obd) +int mds_lov_write_objids(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt) { - struct mds_obd *mds = &obd->u.mds; + struct file *filp = (struct file *)mli->md_lov_objid_obj; loff_t off = 0; int i, rc, tgts; ENTRY; - if (!mds->mds_lov_objids_dirty) + if (!mli->md_lov_objids_dirty) RETURN(0); - tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file); + tgts = max(mli->md_lov_desc.ld_tgt_count, mli->md_lov_objids_in_file); if (!tgts) RETURN(0); for (i = 0; i < tgts; i++) CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n", - mds->mds_lov_objids[i], i); + mli->md_lov_objids[i], i); - rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, - mds->mds_lov_objids, tgts * sizeof(obd_id), + rc = fsfilt_write_record(obd, filp, mli->md_lov_objids, + tgts * sizeof(obd_id), &off, 0); if (rc >= 0) { - mds->mds_lov_objids_dirty = 0; + mli->md_lov_objids_dirty = 0; rc = 0; } RETURN(rc); } -int md_lov_info_clear_orphans(struct md_lov_info *mli, - struct obd_uuid *ost_uuid) +struct md_lov_ops mli_ops = { + .ml_read_objids = mds_lov_read_objids, + .ml_write_objids = mds_lov_write_objids, +}; + +int md_lov_clear_orphans(struct md_lov_info *mli, struct obd_uuid *ost_uuid) { int rc; struct obdo oa; @@ -152,54 +157,44 @@ int md_lov_info_clear_orphans(struct md_lov_info *mli, memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid)); oa.o_valid |= OBD_MD_FLINLINE; } - rc = obd_create(mli->md_osc_exp, &oa, &empty_ea, &oti); + rc = obd_create(mli->md_lov_exp, &oa, &empty_ea, &oti); RETURN(rc); } -EXPORT_SYMBOL(md_lov_info_clear_orphans); int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) { - return md_lov_info_clear_orphans(&mds->mds_lov_info, ost_uuid); + return md_lov_clear_orphans(&mds->mds_lov_info, ost_uuid); } -int md_lov_info_set_nextid(struct md_lov_info *mli) +/* update the LOV-OSC knowledge of the last used object id's */ +static int md_lov_info_set_nextid(struct obd_device *obd, + struct md_lov_info *mli) { int rc; ENTRY; - + + LASSERT(!obd->obd_recovering); LASSERT(mli->md_lov_objids != NULL); - rc = obd_set_info_async(mli->md_osc_exp, strlen(KEY_NEXT_ID), + rc = obd_set_info_async(mli->md_lov_exp, strlen(KEY_NEXT_ID), KEY_NEXT_ID, mli->md_lov_desc.ld_tgt_count, mli->md_lov_objids, NULL); - RETURN(rc); - } -EXPORT_SYMBOL(md_lov_info_set_nextid); -/* update the LOV-OSC knowledge of the last used object id's */ int mds_lov_set_nextid(struct obd_device *obd) { - struct mds_obd *mds = &obd->u.mds; - int rc; - - LASSERT(!obd->obd_recovering); - - rc = md_lov_info_set_nextid(&mds->mds_lov_info); - if (rc) - CERROR ("%s: mds_lov_set_nextid failed (%d)\n", - obd->obd_name, rc); - RETURN(rc); + struct md_lov_info *mli = &obd->u.mds.mds_lov_info; + return md_lov_info_set_nextid(obd, mli); } int md_lov_info_update_desc(struct md_lov_info *mli, struct obd_export *lov) { struct lov_desc *ld; - __u32 size, valsize = sizeof(mli->md_lov_desc); + __u32 size, stripes, valsize = sizeof(mli->md_lov_desc); int rc = 0; ENTRY; @@ -244,65 +239,50 @@ int md_lov_info_update_desc(struct md_lov_info *mli, struct obd_export *lov) mli->md_lov_desc = *ld; CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n", mli->md_lov_desc.ld_tgt_count); -out: - OBD_FREE(ld, sizeof(*ld)); - RETURN(rc); -} - -/* Update the lov desc for a new size lov. */ -static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov) -{ - struct mds_obd *mds = &obd->u.mds; - __u32 stripes; - int rc = 0; - ENTRY; - rc = md_lov_info_update_desc(&mds->mds_lov_info, lov); - if (rc) - GOTO(out, rc); - stripes = min((__u32)LOV_MAX_STRIPE_COUNT, - max(mds->mds_lov_desc.ld_tgt_count, - mds->mds_lov_objids_in_file)); - mds->mds_max_mdsize = lov_mds_md_size(stripes); - mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie); + max(mli->md_lov_desc.ld_tgt_count, + mli->md_lov_objids_in_file)); + + mli->md_lov_max_mdsize = lov_mds_md_size(stripes); + mli->md_lov_max_cookiesize = stripes * sizeof(struct llog_cookie); CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n", - mds->mds_max_mdsize, mds->mds_max_cookiesize); + mli->md_lov_max_mdsize, mli->md_lov_max_cookiesize); + out: + OBD_FREE(ld, sizeof(*ld)); RETURN(rc); } - #define MDSLOV_NO_INDEX -1 - /* Inform MDS about new/updated target */ -static int mds_lov_update_mds(struct obd_device *obd, +static int mds_lov_update_mds(struct obd_device *obd, + struct md_lov_info *mli, struct obd_device *watched, - __u32 idx) + __u32 idx, const void *ctxt) { - struct mds_obd *mds = &obd->u.mds; int old_count; - int rc = 0; + int rc; ENTRY; - old_count = mds->mds_lov_desc.ld_tgt_count; - rc = mds_lov_update_desc(obd, mds->mds_osc_exp); + old_count = mli->md_lov_desc.ld_tgt_count; + rc = md_lov_info_update_desc(mli, mli->md_lov_exp); if (rc) RETURN(rc); CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n", - idx, obd->obd_recovering, obd->obd_async_recov, old_count, - mds->mds_lov_desc.ld_tgt_count); + idx, obd->obd_recovering, obd->obd_async_recov, old_count, + mli->md_lov_desc.ld_tgt_count); /* idx is set as data from lov_notify. */ if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) { - if (idx >= mds->mds_lov_desc.ld_tgt_count) { + if (idx >= mli->md_lov_desc.ld_tgt_count) { CERROR("index %d > count %d!\n", idx, - mds->mds_lov_desc.ld_tgt_count); + mli->md_lov_desc.ld_tgt_count); RETURN(-EINVAL); } - if (idx >= mds->mds_lov_objids_in_file) { + if (idx >= mli->md_lov_objids_in_file) { /* We never read this lastid; ask the osc */ obd_id lastid; __u32 size = sizeof(lastid); @@ -311,113 +291,143 @@ static int mds_lov_update_mds(struct obd_device *obd, "last_id", &size, &lastid); if (rc) RETURN(rc); - mds->mds_lov_objids[idx] = lastid; - mds->mds_lov_objids_dirty = 1; - mds_lov_write_objids(obd); + mli->md_lov_objids[idx] = lastid; + mli->md_lov_objids_dirty = 1; + mli->md_lov_ops->ml_write_objids(obd, mli, ctxt); } else { /* We have read this lastid from disk; tell the osc. Don't call this during recovery. */ - rc = mds_lov_set_nextid(obd); + rc = md_lov_info_set_nextid(obd, mli); } CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n", - mds->mds_lov_objids[idx], idx); + mli->md_lov_objids[idx], idx); } - +#if 0 + /*FIXME: Do not support llog in mdd, so disable this temporarily*/ /* If we added a target we have to reconnect the llogs */ /* Only do this at first add (idx), or the first time after recovery */ if (idx != MDSLOV_NO_INDEX || 1/*FIXME*/) { CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx); /* These two must be atomic */ - down(&mds->mds_orphan_recovery_sem); + down(&mli->md_lov_orphan_recovery_sem); obd_llog_finish(obd, old_count); - llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count); - up(&mds->mds_orphan_recovery_sem); + llog_cat_initialize(obd, mli->md_lov_desc.ld_tgt_count); + up(&mli->md_lov_orphan_recovery_sem); } - +#endif RETURN(rc); } -/* update the LOV-OSC knowledge of the last used object id's */ -int mds_lov_connect(struct obd_device *obd, char * lov_name) +int md_lov_connect(struct obd_device *obd, struct md_lov_info *mli, + char *lov_name, struct obd_uuid *uuid, + struct md_lov_ops *mlo, const void *ctxt) { - struct mds_obd *mds = &obd->u.mds; struct lustre_handle conn = {0,}; struct obd_connect_data *data; - int rc, i; - ENTRY; + int rc; - if (IS_ERR(mds->mds_osc_obd)) - RETURN(PTR_ERR(mds->mds_osc_obd)); + if (IS_ERR(mli->md_lov_obd)) + RETURN(PTR_ERR(mli->md_lov_obd)); - if (mds->mds_osc_obd) + if (mli->md_lov_obd) RETURN(0); - mds->mds_osc_obd = class_name2obd(lov_name); - if (!mds->mds_osc_obd) { + mli->md_lov_obd = class_name2obd(lov_name); + if (!mli->md_lov_obd) { CERROR("MDS cannot locate LOV %s\n", lov_name); - mds->mds_osc_obd = ERR_PTR(-ENOTCONN); + mli->md_lov_obd = ERR_PTR(-ENOTCONN); RETURN(-ENOTCONN); } + mli->md_lov_ops = mlo; + OBD_ALLOC(data, sizeof(*data)); if (data == NULL) RETURN(-ENOMEM); data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX | OBD_CONNECT_REQPORTAL; data->ocd_version = LUSTRE_VERSION_CODE; + /* NB: lov_connect() needs to fill in .ocd_index for each OST */ - rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data); + rc = obd_connect(&conn, mli->md_lov_obd, uuid, data); OBD_FREE(data, sizeof(*data)); if (rc) { CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc); - mds->mds_osc_obd = ERR_PTR(rc); - RETURN(rc); + GOTO(out, rc); } - mds->mds_osc_exp = class_conn2export(&conn); + mli->md_lov_exp = class_conn2export(&conn); - rc = obd_register_observer(mds->mds_osc_obd, obd); + rc = obd_register_observer(mli->md_lov_obd, obd); if (rc) { CERROR("MDS cannot register as observer of LOV %s (%d)\n", lov_name, rc); - GOTO(err_discon, rc); + GOTO(out, rc); } - - rc = mds_lov_read_objids(obd); + + rc = mli->md_lov_ops->ml_read_objids(obd, mli, ctxt); if (rc) { CERROR("cannot read %s: rc = %d\n", "lov_objids", rc); - GOTO(err_reg, rc); + GOTO(out, rc); } - rc = mds_lov_update_desc(obd, mds->mds_osc_exp); + rc = md_lov_info_update_desc(mli, mli->md_lov_exp); if (rc) - GOTO(err_reg, rc); + GOTO(out, rc); +out: + RETURN(rc); +} +EXPORT_SYMBOL(md_lov_connect); - /* tgt_count may be 0! */ - rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count); - if (rc) { - CERROR("failed to initialize catalog %d\n", rc); - GOTO(err_reg, rc); - } +int md_lov_update_objids(struct obd_device *obd, struct md_lov_info *mli, + const void *ctxt) +{ + int rc = 0, i; /* If we're mounting this code for the first time on an existing FS, * we need to populate the objids array from the real OST values */ - if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) { - int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count; - rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"), - "last_id", &size, mds->mds_lov_objids); + if (mli->md_lov_desc.ld_tgt_count > mli->md_lov_objids_in_file) { + int size = sizeof(obd_id) * mli->md_lov_desc.ld_tgt_count; + rc = obd_get_info(mli->md_lov_exp, strlen("last_id"), + "last_id", &size, mli->md_lov_objids); if (!rc) { - for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) + for (i = 0; i < mli->md_lov_desc.ld_tgt_count; i++) CWARN("got last object "LPU64" from OST %d\n", - mds->mds_lov_objids[i], i); - mds->mds_lov_objids_dirty = 1; - rc = mds_lov_write_objids(obd); + mli->md_lov_objids[i], i); + mli->md_lov_objids_dirty = 1; + rc = mli->md_lov_ops->ml_write_objids(obd, mli, ctxt); if (rc) CERROR("got last objids from OSTs, but error " "writing objids file: %d\n", rc); } } + return rc; +} + +/* update the LOV-OSC knowledge of the last used object id's */ +int mds_lov_connect(struct obd_device *obd, char * lov_name) +{ + struct mds_obd *mds = &obd->u.mds; + struct md_lov_info *mli = &mds->mds_lov_info; + int rc; + ENTRY; + + rc = md_lov_connect(obd, mli, lov_name, &obd->obd_uuid, &mli_ops, + NULL); + if (rc) + GOTO(err_reg, rc); + /* tgt_count may be 0! */ + rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count); + if (rc) { + CERROR("failed to initialize catalog %d\n", rc); + GOTO(err_reg, rc); + } + + /* If we're mounting this code for the first time on an existing FS, + * we need to populate the objids array from the real OST values */ + rc = md_lov_update_objids(obd, mli, NULL); + /* I want to see a callback happen when the OBD moves to a * "For General Use" state, and that's when we'll call * set_nextid(). The class driver can help us here, because @@ -429,10 +439,11 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) err_reg: obd_register_observer(mds->mds_osc_obd, NULL); -err_discon: - obd_disconnect(mds->mds_osc_exp); - mds->mds_osc_exp = NULL; - mds->mds_osc_obd = ERR_PTR(rc); + if (mli->md_lov_exp) { + obd_disconnect(mli->md_lov_exp); + mli->md_lov_exp = NULL; + } + mli->md_lov_obd = ERR_PTR(rc); RETURN(rc); } @@ -654,9 +665,11 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, } struct mds_lov_sync_info { - struct obd_device *mlsi_obd; /* the lov device to sync */ - struct obd_device *mlsi_watched; /* target osc */ - __u32 mlsi_index; /* index of target */ + struct obd_device *mlsi_obd; /* the lov device to sync */ + struct md_lov_info *mlsi_mli; + struct obd_device *mlsi_watched; /* target osc */ + __u32 mlsi_index; /* index of target */ + const void *mlsi_ctxt; }; @@ -670,7 +683,8 @@ static int __mds_lov_synchronize(void *data) struct mds_lov_sync_info *mlsi = data; struct obd_device *obd = mlsi->mlsi_obd; struct obd_device *watched = mlsi->mlsi_watched; - struct mds_obd *mds = &obd->u.mds; + struct md_lov_info *mli = mlsi->mlsi_mli; + const void *ctxt = mlsi->mlsi_ctxt; struct obd_uuid *uuid; __u32 idx = mlsi->mlsi_index; int rc = 0; @@ -683,15 +697,16 @@ static int __mds_lov_synchronize(void *data) uuid = &watched->u.cli.cl_target_uuid; LASSERT(uuid); - rc = mds_lov_update_mds(obd, watched, idx); + rc = mds_lov_update_mds(obd, mli, watched, idx, ctxt); if (rc != 0) GOTO(out, rc); - rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN), + rc = obd_set_info_async(mli->md_lov_exp, strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid, NULL); if (rc != 0) GOTO(out, rc); - +#if 0 + /*disable for not support llog in mdd*/ rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT), mds->mds_lov_desc.ld_tgt_count, NULL, NULL, uuid); @@ -701,16 +716,16 @@ static int __mds_lov_synchronize(void *data) obd->obd_name, rc); GOTO(out, rc); } - +#endif LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n", obd->obd_name, obd_uuid2str(uuid)); if (obd->obd_stopping) GOTO(out, rc = -ENODEV); - rc = mds_lov_clear_orphans(mds, uuid); + rc = md_lov_clear_orphans(mli, uuid); if (rc != 0) { - CERROR("%s: failed at mds_lov_clear_orphans: %d\n", + CERROR("%s: failed at md_lov_clear_orphans: %d\n", obd->obd_name, rc); GOTO(out, rc); } @@ -731,9 +746,9 @@ int mds_lov_synchronize(void *data) RETURN(__mds_lov_synchronize(data)); } -int mds_lov_start_synchronize(struct obd_device *obd, - struct obd_device *watched, - void *data, int nonblock) +int md_lov_start_synchronize(struct obd_device *obd, struct md_lov_info *mli, + struct obd_device *watched, + void *data, int nonblock, const void *ctxt) { struct mds_lov_sync_info *mlsi; int rc; @@ -748,6 +763,8 @@ int mds_lov_start_synchronize(struct obd_device *obd, mlsi->mlsi_obd = obd; mlsi->mlsi_watched = watched; + mlsi->mlsi_mli = mli; + mlsi->mlsi_ctxt = ctxt; if (data) mlsi->mlsi_index = *(__u32 *)data; else @@ -762,7 +779,7 @@ int mds_lov_start_synchronize(struct obd_device *obd, disconnect the LOV. This of course means a cleanup won't finish for as long as the sync is blocking. */ class_incref(obd); - +#if 0 if (nonblock) { /* Synchronize in the background */ rc = cfs_kernel_thread(mds_lov_synchronize, mlsi, @@ -780,15 +797,26 @@ int mds_lov_start_synchronize(struct obd_device *obd, } else { rc = __mds_lov_synchronize((void *)mlsi); } - +#else + rc = __mds_lov_synchronize((void *)mlsi); +#endif RETURN(rc); } +EXPORT_SYMBOL(md_lov_start_synchronize); -int mds_notify(struct obd_device *obd, struct obd_device *watched, - enum obd_notify_event ev, void *data) +int mds_lov_start_synchronize(struct obd_device *obd, + struct obd_device *watched, + void *data, int nonblock) +{ + return md_lov_start_synchronize(obd, &obd->u.mds.mds_lov_info, + watched, data, nonblock, NULL); +} + +int md_lov_notity_pre(struct obd_device *obd, struct md_lov_info *mli, + struct obd_device *watched, enum obd_notify_event ev, + void *data) { int rc = 0; - ENTRY; switch (ev) { /* We only handle these: */ @@ -797,7 +825,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, case OBD_NOTIFY_SYNC_NONBLOCK: break; default: - RETURN(0); + RETURN(-ENOENT); } CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev); @@ -815,16 +843,33 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, /* We still have to fix the lov descriptor for ost's added after the mdt in the config log. They didn't make it into mds_lov_connect. */ - rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp); + rc = md_lov_info_update_desc(mli, mli->md_lov_exp); + RETURN(-EBUSY); + } + RETURN(rc); +} +EXPORT_SYMBOL(md_lov_notity_pre); + +int mds_notify(struct obd_device *obd, struct obd_device *watched, + enum obd_notify_event ev, void *data) +{ + int rc = 0; + struct md_lov_info *mli = &obd->u.mds.mds_lov_info; + ENTRY; + + rc = md_lov_notity_pre(obd, mli, watched, ev, data); + if (rc) { + if (rc == -ENOENT || rc == -EBUSY) + rc = 0; RETURN(rc); } LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL); rc = mds_lov_start_synchronize(obd, watched, data, !(ev == OBD_NOTIFY_SYNC)); - + lquota_recovery(quota_interface, obd); - + RETURN(rc); } diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 43e1ba3..02ad8b7 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -186,7 +186,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle, "wrote trans #"LPU64" rc %d client %s at idx %u: err = %d", transno, rc, mcd->mcd_uuid, med->med_lr_idx, err); - err = mds_lov_write_objids(obd); + err = mds_lov_write_objids(obd, &mds->mds_lov_info, NULL); if (err) { log_pri = D_ERROR; if (rc == 0)