From: alex Date: Mon, 4 Jul 2005 06:28:56 +0000 (+0000) Subject: - CROW-related fixes from b_hd_mdref X-Git-Tag: v1_7_100~1128 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=87c86d444e61e38d6454bba5700ba966dc1ac83d;hp=25c97ec86f471de50556673b52d1e1b7713063af - CROW-related fixes from b_hd_mdref --- diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index 8f9b829..1fc3263 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -33,6 +33,8 @@ struct osc_creator { spinlock_t oscc_lock; struct obd_device *oscc_obd; int oscc_flags; + obd_id oscc_next_id; + wait_queue_head_t oscc_waitq; }; struct ldlm_export_data { diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 23f46e6..104acc0 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -553,8 +553,6 @@ struct niobuf_local { /* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */ #define N_LOCAL_TEMP_PAGE 0x10000000 -typedef int (*obd_obj_alloc_func_t)(obd_id *objid); - struct obd_trans_info { __u64 oti_transno; __u64 *oti_objid; @@ -569,7 +567,6 @@ struct obd_trans_info { struct llog_cookie *oti_logcookies; int oti_numcookies; int oti_flags; - obd_obj_alloc_func_t oti_obj_alloc; }; static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies) diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 152d5d7..3186999 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -763,21 +763,9 @@ lov_create(struct obd_export *exp, struct obdo *src_oa, struct lov_request *req = list_entry(pos, struct lov_request, rq_link); - obd_id *objids = oti->oti_objid; - - if (oti->oti_obj_alloc) { - __u64 next_id; - - /* - * allocating new objid. Here it is delegated to caller, - * that is MDS in CROW case. - */ - next_id = oti->oti_obj_alloc(&objids[req->rq_idx]); - req->rq_oa->o_id = next_id; - } else { - /* and here is default "allocator" */ - req->rq_oa->o_id = ++objids[req->rq_idx]; - } + /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ + rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, + req->rq_oa, NULL, 0, &req->rq_md, oti); lov_update_create_set(set, req, rc); } rc = lov_fini_create_set(set, ea); diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index ced694c..2270fd7 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -323,6 +323,7 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name) "writing objids file: %d\n", rc); } } + /* * I want to see a callback happen when the OBD moves to a "For General * Use" state, and that's when we'll call set_nextid(). The class driver @@ -659,11 +660,15 @@ int mds_dt_synchronize(void *data) if (rc) GOTO(cleanup, rc); + /* we don't set next id manually, instead OSCs will set them + * during own recovery from DELORPHAN reply -bzzz */ +#if 0 vals[0] = index; rc = mds_dt_set_info(obd->obd_self_export, strlen("next_id"), "next_id", 2, vals); if (rc) GOTO(cleanup, rc); +#endif obd_llog_finish(obd, &obd->obd_llogs, old_count); obd_llog_cat_initialize(obd, &obd->obd_llogs, count, name); diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index a23df76..e76ff94 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -297,14 +297,6 @@ cleanup_dentry: return ERR_PTR(rc); } -/* this is object id allocation callback */ -static int mds_obj_alloc(obd_id *objid) -{ - ENTRY; - LASSERT(objid != NULL); - RETURN(++(*objid)); -} - static inline void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm, struct lov_desc *desc) @@ -319,10 +311,10 @@ mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm, /* must be called with i_sem held */ int -mds_create_object(struct obd_device *obd, struct ptlrpc_request *req, - int offset, struct mds_update_record *rec, - struct dentry *dchild, void **handle, - obd_id *ids) +mds_create_objects(struct obd_device *obd, struct ptlrpc_request *req, + int offset, struct mds_update_record *rec, + struct dentry *dchild, void **handle, + obd_id **ids) { struct inode *inode = dchild->d_inode; struct mds_obd *mds = &obd->u.mds; @@ -346,7 +338,10 @@ mds_create_object(struct obd_device *obd, struct ptlrpc_request *req, if (body->valid & OBD_MD_FLEASIZE) RETURN(0); - oti.oti_objid = ids; + OBD_ALLOC(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids)); + if (*ids == NULL) + RETURN(-ENOMEM); + oti.oti_objid = *ids; /* replay case */ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { @@ -364,15 +359,10 @@ mds_create_object(struct obd_device *obd, struct ptlrpc_request *req, RETURN(rc); } - /* - * FIXME: this is evil layering violation, all things related to - * stripping should be done by LOV. --umka. - */ - mds_objids_from_lmm(ids, lmm, &mds->mds_dt_desc); + mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc); lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0); lmm_bufsize = req->rq_repmsg->buflens[offset]; - LASSERT(lmm_buf != NULL); LASSERT(lmm_bufsize >= lmm_size); @@ -428,14 +418,8 @@ mds_create_object(struct obd_device *obd, struct ptlrpc_request *req, GOTO(out_oa, rc); } - /* - * create with CROW flag and base ids for allocating new ids on - * them. - */ - oti.oti_flags |= OBD_MODE_CROW; - oti.oti_obj_alloc = mds_obj_alloc; - LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS); + oti.oti_flags |= OBD_MODE_CROW; rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti); if (rc) { @@ -741,7 +725,6 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, } if (rec != NULL) { - /* no EA: create objects */ if ((body->valid & OBD_MD_FLEASIZE) && (rec->ur_flags & MDS_OPEN_HAS_EA)) { up(&dchild->d_inode->i_sem); @@ -749,45 +732,14 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, } if (!(body->valid & OBD_MD_FLEASIZE)) { - int ids_size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids); - - OBD_ALLOC(ids, ids_size); - if (ids == NULL) { - up(&dchild->d_inode->i_sem); - RETURN(-ENOMEM); - } - - /* - * synchronizing object creating to prevent another - * threads take the same base objid values. - */ - down(&mds->mds_create_sem); - - /* preparing base ids */ - mds_dt_save_objids(obd, ids); - - /* - * create objects, @ids will contain new allocated obj - * ids. - */ - rc = mds_create_object(obd, req, 2, rec, - dchild, handle, ids); + /* no EA: create objects */ + rc = mds_create_objects(obd, req, 2, rec, + dchild, handle, &ids); if (rc) { CERROR("mds_create_object: rc = %d\n", rc); - up(&mds->mds_create_sem); up(&dchild->d_inode->i_sem); - OBD_FREE(ids, ids_size); RETURN(rc); } - - /* - * update MDS objids by new ones allocated in - * mds_create_object(). - */ - mds_dt_update_objids(obd, ids); - OBD_FREE(ids, ids_size); - - up(&mds->mds_create_sem); } if (S_ISREG(dchild->d_inode->i_mode) && @@ -827,6 +779,12 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd, mfd->mfd_handle.h_cookie); + + if (ids != NULL) { + mds_dt_update_objids(obd, ids); + OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count); + } + RETURN(rc); } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index c6ce804..80a55f7 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2509,6 +2509,9 @@ filter_clear_orphans(struct obd_export *exp, struct obdo *oa) CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = " LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id); + /* return next free id to be used as a new start of sequence -bzzz */ + oa->o_id = last + 1; + filter_set_last_id(filter, oa->o_gr, oa->o_id); clear_bit(doa->o_gr, &filter->fo_destroys_in_progress); up(&filter->fo_create_locks[oa->o_gr]); diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index 722cfc7..3a25414 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -56,13 +56,24 @@ #include #include "osc_internal.h" +int oscc_recovering(struct osc_creator *oscc) +{ + int recov = 0; + + spin_lock(&oscc->oscc_lock); + recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING; + spin_unlock(&oscc->oscc_lock); + + return recov; +} + /* this only is used now for deleting orphanes */ int osc_create(struct obd_export *exp, struct obdo *oa, void *acl, int acl_size, struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; - int rc = 0; + int rc = 0, try_again = 1; ENTRY; LASSERT(oa); @@ -117,8 +128,51 @@ int osc_create(struct obd_export *exp, struct obdo *oa, RETURN(rc); } - LBUG(); - RETURN(0); + while (try_again) { + /* If orphans are being recovered, then we must wait until + it is finished before we can continue with create. */ + if (oscc_recovering(oscc)) { + struct l_wait_info lwi; + + CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n", + oscc); + + lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL); + rc = l_wait_event(oscc->oscc_waitq, + !oscc_recovering(oscc), &lwi); + LASSERT(rc == 0 || rc == -ETIMEDOUT); + if (rc == -ETIMEDOUT) { + CDEBUG(D_HA,"%p: timeout waiting on recovery\n", + oscc); + RETURN(rc); + } + CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", + oscc); + } + + spin_lock(&oscc->oscc_lock); + if (oscc->oscc_flags & OSCC_FLAG_EXITING) { + spin_unlock(&oscc->oscc_lock); + break; + } + + if (oscc->oscc_flags & OSCC_FLAG_NOSPC) { + rc = -ENOSPC; + spin_unlock(&oscc->oscc_lock); + break; + } + + oscc->oscc_next_id++; + oa->o_id = oscc->oscc_next_id; + try_again = 0; + spin_unlock(&oscc->oscc_lock); + + CDEBUG(D_HA, "%s: returning objid "LPU64"\n", + oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid, + oa->o_id); + } + + RETURN(rc); } void oscc_init(struct obd_device *obd) @@ -134,4 +188,5 @@ void oscc_init(struct obd_device *obd) oscc->oscc_obd = obd; spin_lock_init(&oscc->oscc_lock); oscc->oscc_flags |= OSCC_FLAG_RECOVERING; + init_waitqueue_head(&oscc->oscc_waitq); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 12a42e5..2d565f6 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -292,6 +292,7 @@ out: int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { + struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; struct ptlrpc_request *request; struct ost_body *body; struct lov_stripe_md *lsm; @@ -337,6 +338,16 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, GOTO (out_req, rc = -EPROTO); } + if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) { + struct obd_import *imp = class_exp2cliimp(exp); + /* MDS declares last known object, OSS responses + * with next possible object -bzzz */ + spin_lock(&oscc->oscc_lock); + oscc->oscc_next_id = body->oa.o_id; + spin_unlock(&oscc->oscc_lock); + CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n", + imp->imp_target_uuid.uuid, oa->o_id); + } memcpy(oa, &body->oa, sizeof(*oa)); /* This should really be sent by the OST */