From a3e4e0706455140cb27b3d184887090ceac864cf Mon Sep 17 00:00:00 2001 From: huanghua Date: Fri, 11 Jul 2008 05:22:17 +0000 Subject: [PATCH] Branch b1_8_gate b=11930 i=adilger i=nikita.danilov i=alex --- lustre/mdc/mdc_locks.c | 144 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 48 deletions(-) diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index d06c739..3438deb 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -134,12 +134,10 @@ EXPORT_SYMBOL(mdc_set_lock_data); int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, ldlm_iterator_t it, void *data) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; ENTRY; - res_id.name[0] = fid->id; - res_id.name[1] = fid->generation; - + fid_build_reg_res_name((struct lu_fid*)fid, &res_id); ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id, it, data); @@ -197,7 +195,7 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, OBD_ALLOC(new_msg, new_size); if (new_msg != NULL) { - DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n", + DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u", body->eadatasize); memcpy(new_msg, old_msg, old_size); @@ -222,7 +220,7 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, struct ptlrpc_request *req; struct ldlm_intent *lit; struct obd_device *obddev = class_exp2obd(exp); - int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), [DLM_INTENT_IT_OFF] = sizeof(*lit), [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create), @@ -236,7 +234,7 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, * default-sized LOV EA for open replay. */ [DLM_INTENT_REC_OFF+2]= max(lmmsize, obddev->u.cli.cl_default_mds_easize) }; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. @@ -245,18 +243,31 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, CFS_LIST_HEAD(cancels); int do_join = (it->it_flags & O_JOIN_FILE) && data->data; int count = 0; + int bufcount = 6; + int repbufcount = 5; int mode; int rc; + ENTRY; - it->it_create_mode |= S_IFREG; - - rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size); + it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; + if (mdc_exp_is_2_0_server(exp)) { + size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create); + size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2]; + size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1]; + size[DLM_INTENT_REC_OFF+2] = 0; /* capa */ + size[DLM_INTENT_REC_OFF+1] = 0; /* capa */ + bufcount = 8; + repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa); + repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa); + repbufcount = 7; + } + rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, + bufcount, size); if (rc & (rc - 1)) - size[DLM_INTENT_REC_OFF + 2] = - min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc, - obddev->u.cli.cl_max_mds_easize); + size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc, + obddev->u.cli.cl_max_mds_easize); - /* If inode is known, cancel conflicting OPEN locks. */ + /* If inode is known, cancel conflicting OPEN locks. */ if (data->fid2.id) { if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) mode = LCK_CW; @@ -279,14 +290,19 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, MDS_INODELOCK_UPDATE); if (do_join) { __u64 head_size = (*(__u64 *)data->data); - /* join is like an unlink of the tail */ - size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join); - req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count); + /* join is like an unlink of the tail */ + if (mdc_exp_is_2_0_server(exp)) { + size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join); + } else { + size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join); + } + bufcount++; + + req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count); if (req) - mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, - head_size); + mdc_join_pack(req, bufcount - 1, data, head_size); } else { - req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count); + req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count); it->it_flags &= ~O_JOIN_FILE; } @@ -305,9 +321,9 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, it->it_create_mode, 0, it->it_flags, lmm, lmmsize); - ptlrpc_req_set_repsize(req, 5, repsize); + ptlrpc_req_set_repsize(req, repbufcount, repsize); } - return req; + RETURN(req); } static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, @@ -320,7 +336,9 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), [DLM_INTENT_IT_OFF] = sizeof(*lit), - [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink), + [DLM_INTENT_REC_OFF] = mdc_exp_is_2_0_server(exp) ? + sizeof(struct mdt_rec_unlink) : + sizeof(struct mds_rec_unlink), [DLM_INTENT_REC_OFF+1]= data->namelen + 1 }; int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), @@ -329,6 +347,7 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, cl_max_mds_easize, [DLM_REPLY_REC_OFF+2] = obddev->u.cli. cl_max_mds_cookiesize }; + ENTRY; req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); if (req) { @@ -342,7 +361,7 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, ptlrpc_req_set_repsize(req, 5, repsize); } - return req; + RETURN(req); } static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp, @@ -352,21 +371,30 @@ static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp, struct ptlrpc_request *req; struct ldlm_intent *lit; struct obd_device *obddev = class_exp2obd(exp); - int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + int size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), [DLM_INTENT_IT_OFF] = sizeof(*lit), [DLM_INTENT_REC_OFF] = sizeof(struct mds_body), - [DLM_INTENT_REC_OFF+1]= data->namelen + 1 }; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_INTENT_REC_OFF+1]= data->namelen + 1, + [DLM_INTENT_REC_OFF+2]= 0 }; + int repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. cl_max_mds_easize, - [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE }; + [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE, + [DLM_REPLY_REC_OFF+3] = 0 }; obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL | OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA; + int bufcount = 5; + ENTRY; - req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); + if (mdc_exp_is_2_0_server(exp)) { + size[DLM_INTENT_REC_OFF+1] = 0; /* capa */ + size[DLM_INTENT_REC_OFF+2] = data->namelen + 1; + bufcount = 6; + } + req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0); if (req) { /* pack the intent */ lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, @@ -376,9 +404,9 @@ static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp, /* pack the intended request */ mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags, data); - ptlrpc_req_set_repsize(req, 5, repsize); + ptlrpc_req_set_repsize(req, bufcount, repsize); } - return req; + RETURN(req); } static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp) @@ -386,13 +414,15 @@ static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp) struct ptlrpc_request *req; int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) }; - int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) }; + int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), + [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) }; + ENTRY; req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req) - ptlrpc_req_set_repsize(req, 2, repsize); - return req; + ptlrpc_req_set_repsize(req, 3, repsize); + RETURN(req); } static int mdc_finish_enqueue(struct obd_export *exp, @@ -435,7 +465,7 @@ static int mdc_finish_enqueue(struct obd_export *exp, lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*lockrep)); - LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ + LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ /* swabbed by ldlm_cli_enqueue() */ LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF)); @@ -543,18 +573,28 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id = - { .name = {data->fid1.id, data->fid1.generation} }; + struct ldlm_res_id res_id; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int rc; ENTRY; + fid_build_reg_res_name((struct lu_fid*)&data->fid1, &res_id); LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; if (it->it_op & IT_OPEN) { + if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) { + struct client_obd *cli = &obddev->u.cli; + data->fid3 = data->fid2; + rc = mdc_fid_alloc(cli->cl_seq, + (struct lu_fid*)&data->fid2); + if (rc) { + CERROR("fid allocation result: %d\n", rc); + RETURN(rc); + } + } req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize); if (it->it_flags & O_JOIN_FILE) { policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; @@ -600,11 +640,13 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ - struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}}; + struct ldlm_res_id res_id; struct lustre_handle lockh; ldlm_policy_data_t policy; ldlm_mode_t mode; + ENTRY; + fid_build_reg_res_name((struct lu_fid*)fid, &res_id); /* As not all attributes are kept under update lock, e.g. owner/group/acls are under lookup lock, we need both ibits for GETATTR. */ @@ -620,7 +662,7 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, it->d.lustre.it_lock_mode = mode; } - return !!mode; + RETURN(!!mode); } EXPORT_SYMBOL(mdc_revalidate_lock); @@ -659,10 +701,15 @@ static int mdc_finish_intent_lock(struct obd_export *exp, /* If we were revalidating a fid/name pair, mark the intent in * case we fail and get called again from lookup */ - if (data->fid2.id && (it->it_op != IT_GETATTR)) { + + if (data->fid2.id && (it->it_op != IT_GETATTR) && + ( !mdc_exp_is_2_0_server(exp) || + (mdc_exp_is_2_0_server(exp) && (it->it_flags & O_CHECK_STALE)))) { it_set_disposition(it, DISP_ENQ_COMPLETE); + /* Also: did we find the same inode? */ - if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) + if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) && + memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3))) RETURN(-ESTALE); } @@ -762,8 +809,11 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, LASSERT(it); - CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n", - op_data->namelen, op_data->name, op_data->fid1.id, + CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), " + "intent: %s flags %#o\n", + op_data->namelen, op_data->name, + PFID(((struct lu_fid*)&op_data->fid2)), + PFID(((struct lu_fid*)&op_data->fid1)), ldlm_it2str(it->it_op), it->it_flags); lockh.cookie = 0; @@ -867,10 +917,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, struct lookup_intent *it = &minfo->mi_it; struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id = { - .name = {op_data->fid1.id, - op_data->fid1.generation} - }; + struct ldlm_res_id res_id; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; @@ -883,6 +930,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, op_data->namelen, op_data->name, op_data->fid1.id, ldlm_it2str(it->it_op), it->it_flags); + fid_build_reg_res_name((struct lu_fid*)&op_data->fid1, &res_id); req = mdc_intent_lookup_pack(exp, it, op_data); if (!req) RETURN(-ENOMEM); -- 1.8.3.1