X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fmds_lov.c;h=40d477ddb27f93d1f295bf0ba8d72809f24cb0af;hb=c9a360f3334f20a956021b78e5379460645c75d0;hp=ac9fa015d38ed7a49ea863ff2cb821388576c23c;hpb=6e3ec5812ebd1b5ecf7cae584f429b013ffe7431;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index ac9fa01..40d477d 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -26,7 +26,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -58,6 +58,9 @@ static void mds_lov_dump_objids(const char *label, struct obd_device *obd) struct mds_obd *mds = &obd->u.mds; unsigned int i=0, j; + if ((libcfs_debug & D_INFO) == 0) + return; + CDEBUG(D_INFO, "dump from %s\n", label); if (mds->mds_lov_page_dirty == NULL) { CERROR("NULL bitmap!\n"); @@ -217,6 +220,15 @@ static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index) return 0; } +static int mds_lov_objinit(struct mds_obd *mds, __u32 index) +{ + __u32 page = index / OBJID_PER_PAGE(); + __u32 off = index % OBJID_PER_PAGE(); + obd_id *data = mds->mds_lov_page_array[page]; + + return (data[off] > 0); +} + int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm) { struct lov_ost_data_v1 *data; @@ -271,7 +283,7 @@ static int mds_log_lost_precreated(struct obd_device *obd, ENTRY; if (*lsmp == NULL) { - rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm); + rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm); if (rc < 0) RETURN(rc); /* need only one stripe, save old value */ @@ -281,7 +293,7 @@ static int mds_log_lost_precreated(struct obd_device *obd, } lsm->lsm_oinfo[0]->loi_id = id; - lsm->lsm_oinfo[0]->loi_gr = mdt_to_obd_objgrp(obd->u.mds.mds_id); + lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id); lsm->lsm_oinfo[0]->loi_ost_idx = idx; rc = mds_log_op_orphan(obd, lsm, count); @@ -346,7 +358,7 @@ void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm) if (lsm) { /* restore stripes number */ lsm->lsm_stripe_count = stripes; - obd_free_memmd(mds->mds_osc_exp, &lsm); + obd_free_memmd(mds->mds_lov_exp, &lsm); } EXIT; return; @@ -395,7 +407,7 @@ static int mds_lov_read_objids(struct obd_device *obd) if (size == 0) RETURN(0); - page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1; + page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE; CDEBUG(D_INFO, "file size %lu pages %d\n", size, page); for (i = 0; i < page; i++) { obd_id *data; @@ -409,23 +421,22 @@ static int mds_lov_read_objids(struct obd_device *obd) data = mds->mds_lov_page_array[i]; rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data, - OBJID_PER_PAGE()*sizeof(obd_id), &off); + MDS_LOV_ALLOC_SIZE, &off); if (rc < 0) { CERROR("Error reading objids %d\n", rc); GOTO(out, rc); } + if (off == off_old) /* hole is read */ + off += MDS_LOV_ALLOC_SIZE; - count += (off - off_old) / sizeof(obd_id); + count = (off - off_old) / sizeof(obd_id); if (mds_lov_update_from_read(mds, data, count)) { CERROR("Can't update mds data\n"); GOTO(out, rc = -EIO); } - - if (off == off_old) - break; /* eof */ } - mds->mds_lov_objid_lastpage = i; - mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE(); + mds->mds_lov_objid_lastpage = page - 1; + mds->mds_lov_objid_lastidx = count - 1; CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count, mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx); @@ -448,7 +459,7 @@ int mds_lov_write_objids(struct obd_device *obd) cfs_foreach_bit(mds->mds_lov_page_dirty, i) { obd_id *data = mds->mds_lov_page_array[i]; - unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id); + unsigned int size = MDS_LOV_ALLOC_SIZE; loff_t off = i * size; LASSERT(data != NULL); @@ -479,24 +490,27 @@ static int mds_lov_get_objid(struct obd_device * obd, obd_id idx) { struct mds_obd *mds = &obd->u.mds; + struct obd_export *lov_exp = mds->mds_lov_exp; unsigned int page; unsigned int off; obd_id *data; + __u32 size; int rc = 0; ENTRY; page = idx / OBJID_PER_PAGE(); off = idx % OBJID_PER_PAGE(); data = mds->mds_lov_page_array[page]; + if (data[off] < 2) { /* We never read this lastid; ask the osc */ struct obd_id_info lastid; - __u32 size = sizeof(lastid); + size = sizeof(lastid); lastid.idx = idx; lastid.data = &data[off]; - rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID), - KEY_LAST_ID, &size, &lastid, NULL); + rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID, + &size, &lastid, NULL); if (rc) GOTO(out, rc); @@ -527,12 +541,12 @@ int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) * objects above this ID, they will be removed. */ memset(&oa, 0, sizeof(oa)); oa.o_flags = OBD_FL_DELORPHAN; - oa.o_gr = mdt_to_obd_objgrp(mds->mds_id); + oa.o_seq = mdt_to_obd_objseq(mds->mds_id); oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP; if (ost_uuid != NULL) oti.oti_ost_uuid = ost_uuid; - rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti); + rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti); RETURN(rc); } @@ -549,7 +563,7 @@ static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id) info.idx = idx; info.data = id; - rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID), + rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID), KEY_NEXT_ID, sizeof(info), &info, NULL); if (rc) CERROR ("%s: mds_lov_set_nextid failed (%d)\n", @@ -560,7 +574,7 @@ static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id) /* Update the lov desc for a new size lov. */ static int mds_lov_update_desc(struct obd_device *obd, int idx, - struct obd_uuid *uuid, enum obd_notify_event ev) + struct obd_uuid *uuid) { struct mds_obd *mds = &obd->u.mds; struct lov_desc *ld; @@ -572,7 +586,7 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx, if (!ld) RETURN(-ENOMEM); - rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC, + rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC, &valsize, ld, NULL); if (rc) GOTO(out, rc); @@ -596,13 +610,6 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx, if (rc) GOTO(out, rc); - /*XXX this notifies the MDD until lov handling use old mds code */ - if (obd->obd_upcall.onu_owner) { - LASSERT(obd->obd_upcall.onu_upcall != NULL); - rc = obd->obd_upcall.onu_upcall(obd, NULL, ev, - obd->obd_upcall.onu_owner, - &mds->mds_mount_count); - } out: OBD_FREE(ld, sizeof(*ld)); RETURN(rc); @@ -611,20 +618,16 @@ out: /* Inform MDS about new/updated target */ static int mds_lov_update_mds(struct obd_device *obd, struct obd_device *watched, - __u32 idx, enum obd_notify_event ev) + __u32 idx) { struct mds_obd *mds = &obd->u.mds; int rc = 0; int page; int off; obd_id *data; - ENTRY; - /* Don't let anyone else mess with mds_lov_objids now */ - rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev); - if (rc) - GOTO(out, rc); + LASSERT(mds_lov_objinit(mds, idx)); CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n", idx, obd->obd_recovering, obd->obd_async_recov, @@ -671,16 +674,16 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) int rc; ENTRY; - if (IS_ERR(mds->mds_osc_obd)) - RETURN(PTR_ERR(mds->mds_osc_obd)); + if (IS_ERR(mds->mds_lov_obd)) + RETURN(PTR_ERR(mds->mds_lov_obd)); - if (mds->mds_osc_obd) + if (mds->mds_lov_obd) RETURN(0); - mds->mds_osc_obd = class_name2obd(lov_name); - if (!mds->mds_osc_obd) { + mds->mds_lov_obd = class_name2obd(lov_name); + if (!mds->mds_lov_obd) { CERROR("MDS cannot locate LOV %s\n", lov_name); - mds->mds_osc_obd = ERR_PTR(-ENOTCONN); + mds->mds_lov_obd = ERR_PTR(-ENOTCONN); RETURN(-ENOTCONN); } @@ -692,19 +695,18 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) GOTO(err_exit, rc); } - rc = obd_register_observer(mds->mds_osc_obd, obd); + rc = obd_register_observer(mds->mds_lov_obd, obd); if (rc) { CERROR("MDS cannot register as observer of LOV %s (%d)\n", lov_name, rc); GOTO(err_exit, rc); } - /* try init too early */ - rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL); - if (rc) - GOTO(err_exit, rc); + /* ask lov to generate OBD_NOTIFY_CREATE events for already registered + * targets */ + obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL); - mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT; + mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT; OBD_ALLOC(data, sizeof(*data)); if (data == NULL) @@ -716,22 +718,22 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM | OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT | OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | - OBD_CONNECT_SOM; + OBD_CONNECT_SOM | OBD_CONNECT_FULL20; #ifdef HAVE_LRU_RESIZE_SUPPORT data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; #endif data->ocd_version = LUSTRE_VERSION_CODE; - data->ocd_group = mdt_to_obd_objgrp(mds->mds_id); + data->ocd_group = mdt_to_obd_objseq(mds->mds_id); /* send max bytes per rpc */ data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT; /* send the list of supported checksum types */ data->ocd_cksum_types = OBD_CKSUM_ALL; /* NB: lov_connect() needs to fill in .ocd_index for each OST */ - rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL); + rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL); OBD_FREE(data, sizeof(*data)); if (rc) { CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc); - mds->mds_osc_obd = ERR_PTR(rc); + mds->mds_lov_obd = ERR_PTR(rc); RETURN(rc); } @@ -747,8 +749,8 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) RETURN(rc); err_exit: - mds->mds_osc_exp = NULL; - mds->mds_osc_obd = ERR_PTR(rc); + mds->mds_lov_exp = NULL; + mds->mds_lov_obd = ERR_PTR(rc); RETURN(rc); } @@ -758,16 +760,16 @@ int mds_lov_disconnect(struct obd_device *obd) int rc = 0; ENTRY; - if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) { - obd_register_observer(mds->mds_osc_obd, NULL); + if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) { + obd_register_observer(mds->mds_lov_obd, NULL); /* The actual disconnect of the mds_lov will be called from * class_disconnect_exports from mds_lov_clean. So we have to * ensure that class_cleanup doesn't fail due to the extra ref * we're holding now. The mechanism to do that already exists - * the obd_force flag. We'll drop the final ref to the - * mds_osc_exp in mds_cleanup. */ - mds->mds_osc_obd->obd_force = 1; + * mds_lov_exp in mds_cleanup. */ + mds->mds_lov_obd->obd_force = 1; } RETURN(rc); @@ -777,7 +779,6 @@ struct mds_lov_sync_info { struct obd_device *mlsi_obd; /* the lov device to sync */ struct obd_device *mlsi_watched; /* target osc */ __u32 mlsi_index; /* index of target */ - enum obd_notify_event mlsi_ev; /* event type */ }; static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid) @@ -791,12 +792,13 @@ static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid) if (!mds->mds_capa_keys) RETURN(0); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5); for (i = 0; i < 2; i++) { key = &mds->mds_capa_keys[i]; DEBUG_CAPA_KEY(D_SEC, key, "propagate"); info.capa = key; - rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY), + rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY), KEY_CAPA_KEY, sizeof(info), &info, NULL); if (rc) { DEBUG_CAPA_KEY(D_ERROR, key, @@ -821,7 +823,6 @@ static int __mds_lov_synchronize(void *data) struct mds_obd *mds = &obd->u.mds; struct obd_uuid *uuid; __u32 idx = mlsi->mlsi_index; - enum obd_notify_event ev = mlsi->mlsi_ev; struct mds_group_info mgi; struct llog_ctxt *ctxt; int rc = 0; @@ -839,15 +840,15 @@ static int __mds_lov_synchronize(void *data) GOTO(out, rc = -ENODEV); OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE); - rc = mds_lov_update_mds(obd, watched, idx, ev); + rc = mds_lov_update_mds(obd, watched, idx); if (rc != 0) { CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc); GOTO(out, rc); } - mgi.group = mdt_to_obd_objgrp(mds->mds_id); + mgi.group = mdt_to_obd_objseq(mds->mds_id); mgi.uuid = uuid; - rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN), + rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN), KEY_MDS_CONN, sizeof(mgi), &mgi, NULL); if (rc != 0) GOTO(out, rc); @@ -871,6 +872,7 @@ static int __mds_lov_synchronize(void *data) LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n", obd->obd_name, obd_uuid2str(uuid)); + rc = mds_lov_clear_orphans(mds, uuid); if (rc != 0) { CERROR("%s failed at mds_lov_clear_orphans: %d\n", @@ -879,7 +881,7 @@ static int __mds_lov_synchronize(void *data) } #ifdef HAVE_QUOTA_SUPPORT - if (obd->obd_upcall.onu_owner) { + if (obd->obd_upcall.onu_owner) { /* * This is a hack for mds_notify->mdd_notify. When the mds obd * in mdd is removed, This hack should be removed. @@ -896,9 +898,9 @@ out: /* Deactivate it for safety */ CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid), rc); - if (!obd->obd_stopping && mds->mds_osc_obd && - !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping) - obd_notify(mds->mds_osc_obd, watched, + if (!obd->obd_stopping && mds->mds_lov_obd && + !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping) + obd_notify(mds->mds_lov_obd, watched, OBD_NOTIFY_INACTIVE, NULL); } @@ -937,7 +939,6 @@ int mds_lov_start_synchronize(struct obd_device *obd, mlsi->mlsi_obd = obd; mlsi->mlsi_watched = watched; mlsi->mlsi_index = *(__u32 *)data; - mlsi->mlsi_ev = ev; /* Although class_export_get(obd->obd_self_export) would lock the MDS in place, since it's only a self-export @@ -973,13 +974,38 @@ int mds_lov_start_synchronize(struct obd_device *obd, int mds_notify(struct obd_device *obd, struct obd_device *watched, enum obd_notify_event ev, void *data) { + struct mds_obd *mds = &obd->u.mds; int rc = 0; ENTRY; CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev); + if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) { + CERROR("unexpected notification of %s %s!\n", + watched->obd_type->typ_name, watched->obd_name); + RETURN(-EINVAL); + } + + /*XXX this notifies the MDD until lov handling use old mds code + * must non block! + */ + if (obd->obd_upcall.onu_owner) { + LASSERT(obd->obd_upcall.onu_upcall != NULL); + rc = obd->obd_upcall.onu_upcall(obd, NULL, ev, + obd->obd_upcall.onu_owner, + &mds->mds_obt.obt_mount_count); + } + switch (ev) { /* We only handle these: */ + case OBD_NOTIFY_CREATE: + CWARN("MDS %s: add target %s\n",obd->obd_name, + obd_uuid2str(&watched->u.cli.cl_target_uuid)); + /* We still have to fix the lov descriptor for ost's */ + LASSERT(data); + rc = mds_lov_update_desc(obd, *(__u32 *)data, + &watched->u.cli.cl_target_uuid); + RETURN(rc); case OBD_NOTIFY_ACTIVE: /* lov want one or more _active_ targets for work */ /* activate event should be pass lov idx as argument */ @@ -991,12 +1017,6 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, RETURN(0); } - if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) { - CERROR("unexpected notification of %s %s!\n", - watched->obd_type->typ_name, watched->obd_name); - RETURN(-EINVAL); - } - if (obd->obd_recovering) { CWARN("MDS %s: in recovery, not resetting orphans on %s\n", obd->obd_name, @@ -1005,7 +1025,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, after the mdt in the config log. They didn't make it into mds_lov_connect. */ rc = mds_lov_update_desc(obd, *(__u32 *)data, - &watched->u.cli.cl_target_uuid, ev); + &watched->u.cli.cl_target_uuid); } else { rc = mds_lov_start_synchronize(obd, watched, data, ev); }