X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=932564d62bcec5f7b1c8a4af35bc7167a7d807f8;hb=218d9b7631b25e9e2ea4e2c4e8b752aa13749c18;hp=82ad397b0da14dcba90b20bae70142782b9ac8ab;hpb=f50d3d56f80184bc21f729e1640422d6a2a31158;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 82ad397..932564d 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -146,22 +146,20 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) RETURN(0); } -int mdc_lock_match(struct obd_export *exp, int flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh) { struct ldlm_res_id res_id = { .name = {fid_seq(fid), fid_oid(fid), fid_ver(fid)} }; - struct obd_device *obd = class_exp2obd(exp); - int rc; + ldlm_mode_t rc; ENTRY; - rc = ldlm_lock_match(obd->obd_namespace, flags, + rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags, &res_id, type, policy, mode, lockh); - RETURN(rc); } @@ -228,11 +226,11 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body, int size[9]) + struct mdt_body *body) { int rc; - ENTRY; + /* FIXME: remove this explicit offset. */ rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, body->eadatasize); if (rc) { @@ -241,193 +239,267 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, body->valid &= ~OBD_MD_FLEASIZE; body->eadatasize = 0; } - EXIT; } -/* We always reserve enough space in the reply packet for a stripe MD, because - * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags) +static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data, + void *lmm, int lmmsize, + void *cb_data) { struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id = - { .name = {fid_seq(&op_data->op_fid1), - fid_oid(&op_data->op_fid1), - fid_ver(&op_data->op_fid1)} }; - ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; - struct ldlm_request *lockreq; - struct ldlm_intent *lit; - struct ldlm_reply *lockrep; - int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(*lockreq), - [DLM_INTENT_IT_OFF] = sizeof(*lit), - 0, 0, 0, 0, 0, 0 }; - int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), - [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body), - [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize, - 0, 0, 0 }; - int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; - int repbufcnt = 4, rc; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_intent *lit; + int joinfile = !!((it->it_flags & O_JOIN_FILE) && + op_data->op_data); + CFS_LIST_HEAD(cancels); + int count = 0; + int mode; + int rc; ENTRY; - LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); + it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; - if (it->it_op & IT_OPEN) { - int do_join = !!(it->it_flags & O_JOIN_FILE); - CFS_LIST_HEAD(cancels); - int count = 0; - int mode; - - it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; - - size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create); - /* parent capability */ - size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; - /* child capability, used for replay only */ - size[DLM_INTENT_REC_OFF + 2] = sizeof(struct lustre_capa); - size[DLM_INTENT_REC_OFF + 3] = op_data->op_namelen + 1; - /* As an optimization, we allocate an RPC request buffer for - * at least a default-sized LOV EA even if we aren't sending - * one. - */ - size[DLM_INTENT_REC_OFF + 4] = max(lmmsize, - obddev->u.cli.cl_default_mds_easize); - - /* XXX: openlock is not cancelled for cross-refs. */ - /* If inode is known, cancel conflicting OPEN locks. */ - if (fid_is_sane(&op_data->op_fid2)) { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) - mode = LCK_CW; + /* XXX: openlock is not cancelled for cross-refs. */ + /* If inode is known, cancel conflicting OPEN locks. */ + if (fid_is_sane(&op_data->op_fid2)) { + if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + mode = LCK_CW; #ifdef FMODE_EXEC - else if (it->it_flags & FMODE_EXEC) - mode = LCK_PR; + else if (it->it_flags & FMODE_EXEC) + mode = LCK_PR; #endif - else - mode = LCK_CR; - count = mdc_resource_get_unused(exp, &op_data->op_fid2, - &cancels, mode, - MDS_INODELOCK_OPEN); - } - - /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ - if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) - mode = LCK_EX; else mode = LCK_CR; - count += mdc_resource_get_unused(exp, &op_data->op_fid1, - &cancels, mode, - MDS_INODELOCK_UPDATE); + count = mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, mode, + MDS_INODELOCK_OPEN); + } - if (do_join) - size[DLM_INTENT_REC_OFF + 5] = - sizeof(struct mdt_rec_join); + /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT || joinfile) + mode = LCK_EX; + else + mode = LCK_CR; + count += mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, mode, + MDS_INODELOCK_UPDATE); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_OPEN); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); + RETURN(ERR_PTR(-ENOMEM)); + } - req = ldlm_prep_enqueue_req(exp, 8 + do_join, size, &cancels, - count); - if (!req) - RETURN(-ENOMEM); + /* parent capability */ + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + /* child capability, reserve the size according to parent capa, it will + * be filled after we get the reply */ + mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1); + + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + max(lmmsize, obddev->u.cli.cl_default_mds_easize)); + if (!joinfile) { + req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE, + RCL_CLIENT, 0); + } - if (do_join) { - /* join is like an unlink of the tail */ - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data, - (*(__u64 *)op_data->op_data)); - } + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + return NULL; + } - spin_lock(&req->rq_lock); - req->rq_replay = 1; - spin_unlock(&req->rq_lock); + if (joinfile) { + __u64 head_size = *(__u64 *)op_data->op_data; + mdc_join_pack(req, op_data, head_size); + } - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, - sizeof(*lit)); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data, - it->it_create_mode, 0, it->it_flags, - lmm, lmmsize); - - /* for remote client, fetch remote perm for current user */ - repsize[repbufcnt++] = client_is_remote(exp) ? - sizeof(struct mdt_remote_perm) : - LUSTRE_POSIX_ACL_MAX_SIZE; - repsize[repbufcnt++] = sizeof(struct lustre_capa); - repsize[repbufcnt++] = sizeof(struct lustre_capa); - } else if (it->it_op & IT_UNLINK) { - size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_unlink); - size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; - size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0); - if (!req) - RETURN(-ENOMEM); - - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, - sizeof(*lit)); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data); - - repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; - } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | - OBD_MD_FLMDSCAPA | OBD_MD_MEA; - valid |= client_is_remote(exp) ? OBD_MD_FLRMTPERM : - OBD_MD_FLACL; - size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_body); - size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; - size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; - - if (it->it_op & IT_GETATTR) - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + spin_lock(&req->rq_lock); + req->rq_replay = 1; + spin_unlock(&req->rq_lock); + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; + + /* pack the intended request */ + mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm, + lmmsize); + + /* for remote client, fetch remote perm for current user */ + if (client_is_remote(exp)) + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + sizeof(struct mdt_remote_perm)); + ptlrpc_request_set_replen(req); + return req; +} + +static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data) +{ + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_intent *lit; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_UNLINK); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; + + /* pack the intended request */ + mdc_unlink_pack(req, op_data); + + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obddev->u.cli.cl_max_mds_easize); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + obddev->u.cli.cl_max_mds_cookiesize); + ptlrpc_request_set_replen(req); + RETURN(req); +} + +static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data) +{ + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | + OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | + OBD_MD_FLMDSCAPA | OBD_MD_MEA | + (client_is_remote(exp) ? + OBD_MD_FLRMTPERM : OBD_MD_FLACL); + struct ldlm_intent *lit; + int rc; + ENTRY; - req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0); - if (!req) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_GETATTR); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, - sizeof(*lit)); - lit->opc = (__u64)it->it_op; + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); - /* pack the intended request */ - mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, - it->it_flags, op_data); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; + + /* pack the intended request */ + mdc_getattr_pack(req, valid, it->it_flags, op_data); + + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obddev->u.cli.cl_max_mds_easize); + if (client_is_remote(exp)) + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + sizeof(struct mdt_remote_perm)); + ptlrpc_request_set_replen(req); + RETURN(req); +} + +static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + ptlrpc_request_set_replen(req); + RETURN(req); +} - repsize[repbufcnt++] = client_is_remote(exp) ? - sizeof(struct mdt_remote_perm) : - LUSTRE_POSIX_ACL_MAX_SIZE; - repsize[repbufcnt++] = sizeof(struct lustre_capa); - } else if (it->it_op == IT_READDIR) { +/* We always reserve enough space in the reply packet for a stripe MD, because + * we don't know in advance the file type. */ +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + int extra_lock_flags) +{ + struct obd_device *obddev = class_exp2obd(exp); + struct ptlrpc_request *req; + struct req_capsule *pill; + struct ldlm_request *lockreq; + struct ldlm_reply *lockrep; + int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; + int rc; + struct ldlm_res_id res_id = + { .name = {fid_seq(&op_data->op_fid1), + fid_oid(&op_data->op_fid1), + fid_ver(&op_data->op_fid1)} }; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; + ENTRY; + + LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); + + if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); - if (!req) - RETURN(-ENOMEM); - repbufcnt = 2; - } else { + if (it->it_op & IT_OPEN) { + int joinfile = !!((it->it_flags & O_JOIN_FILE) && + op_data->op_data); + + req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize, + einfo->ei_cbdata); + if (!joinfile) { + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + einfo->ei_cbdata = NULL; + lmm = NULL; + } else + it->it_flags &= ~O_JOIN_FILE; + } else if (it->it_op & IT_UNLINK) + req = mdc_intent_unlink_pack(exp, it, op_data); + else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) + req = mdc_intent_getattr_pack(exp, it, op_data); + else if (it->it_op == IT_READDIR) + req = ldlm_enqueue_pack(exp); + else { LBUG(); RETURN(-EINVAL); } - /* get ready for the reply */ - ptlrpc_req_set_repsize(req, repbufcnt, repsize); + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); + pill = &req->rq_pill; - /* It is important to obtain rpc_lock first (if applicable), so that - * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter */ + /* It is important to obtain rpc_lock first (if applicable), so that + * threads that are serialised with rpc_lock are not polluting our + * rpcs in flight counter */ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); mdc_enter_request(&obddev->u.cli); rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, @@ -438,8 +510,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, /* Similarly, if we're going to replay this request, we don't want to * actually get a lock, just perform the intent. */ if (req->rq_transno || req->rq_replay) { - lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, - sizeof(*lockreq)); + lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ); lockreq->lock_flags |= LDLM_FL_INTENT_ONLY; } @@ -467,11 +538,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, LDLM_LOCK_PUT(lock); } - lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, - sizeof(*lockrep)); + lockrep = req_capsule_server_get(pill, &RMF_DLM_REP); LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ - /* swabbed by ldlm_cli_enqueue() */ - LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF)); it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1; it->d.lustre.it_status = (int)lockrep->lock_policy_res2; @@ -496,13 +564,10 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status); /* We know what to expect, so we do any byte flipping required here */ - LASSERT(repbufcnt == 7 || repbufcnt == 6 || repbufcnt == 2); - if (repbufcnt >= 6) { - int reply_off = DLM_REPLY_REC_OFF; + if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { struct mdt_body *body; - body = lustre_swab_repbuf(req, reply_off++, sizeof(*body), - lustre_swab_mdt_body); + body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (body == NULL) { CERROR ("Can't swab mdt_body\n"); RETURN (-EPROTO); @@ -526,12 +591,11 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, * The eadata is opaque; just check that it is there. * Eventually, obd_unpackmd() will check the contents. */ - eadata = lustre_swab_repbuf(req, reply_off++, - body->eadatasize, NULL); - if (eadata == NULL) { - CERROR("Missing/short eadata\n"); + eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + body->eadatasize); + if (eadata == NULL) RETURN(-EPROTO); - } + if (body->valid & OBD_MD_FLMODEASIZE) { if (obddev->u.cli.cl_max_mds_easize < body->max_mdsize) { @@ -559,46 +623,39 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, * (for example error one). */ if ((it->it_op & IT_OPEN) && req->rq_replay) { - if (lustre_msg_buflen(req->rq_reqmsg, - DLM_INTENT_REC_OFF + 4) < - body->eadatasize) - mdc_realloc_openmsg(req, body, size); - - lmm = lustre_msg_buf(req->rq_reqmsg, - DLM_INTENT_REC_OFF + 4, - body->eadatasize); + if (req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT) < + body->eadatasize) { + mdc_realloc_openmsg(req, body); + req_capsule_set_size(pill, &RMF_EADATA, + RCL_CLIENT, + body->eadatasize); + } + lmm = req_capsule_client_get(pill, &RMF_EADATA); if (lmm) memcpy(lmm, eadata, body->eadatasize); } } + if (body->valid & OBD_MD_FLRMTPERM) { struct mdt_remote_perm *perm; LASSERT(client_is_remote(exp)); - perm = lustre_swab_repbuf(req, reply_off++, - sizeof(*perm), - lustre_swab_mdt_remote_perm); - if (perm == NULL) { - CERROR("missing remote permission!\n"); + perm = req_capsule_server_swab_get(pill, &RMF_ACL, + lustre_swab_mdt_remote_perm); + if (perm == NULL) RETURN(-EPROTO); - } - } else if ((body->valid & OBD_MD_FLACL) && body->aclsize) { - reply_off++; } if (body->valid & OBD_MD_FLMDSCAPA) { struct lustre_capa *capa, *p; - capa = lustre_unpack_capa(req->rq_repmsg, reply_off++); - if (capa == NULL) { - CERROR("Missing/short MDS capability\n"); + capa = req_capsule_server_get(pill, &RMF_CAPA1); + if (capa == NULL) RETURN(-EPROTO); - } if (it->it_op & IT_OPEN) { /* client fid capa will be checked in replay */ - p = lustre_msg_buf(req->rq_reqmsg, - DLM_INTENT_REC_OFF + 2, - sizeof(*p)); + p = req_capsule_client_get(pill, &RMF_CAPA2); LASSERT(p); *p = *capa; } @@ -606,11 +663,9 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, if (body->valid & OBD_MD_FLOSSCAPA) { struct lustre_capa *capa; - capa = lustre_unpack_capa(req->rq_repmsg, reply_off++); - if (capa == NULL) { - CERROR("Missing/short OSS capability\n"); + capa = req_capsule_server_get(pill, &RMF_CAPA2); + if (capa == NULL) RETURN(-EPROTO); - } } } @@ -669,11 +724,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ - struct ldlm_res_id res_id = { .name = { fid_seq(&op_data->op_fid2), - fid_oid(&op_data->op_fid2), - fid_ver(&op_data->op_fid2) } }; ldlm_policy_data_t policy; - ldlm_mode_t mode = LCK_CR; + ldlm_mode_t mode; /* As not all attributes are kept under update lock, e.g. owner/group/acls are under lookup lock, we need both @@ -686,30 +738,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - if (!rc) { - mode = LCK_CW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - if (!rc) { - mode = LCK_PR; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - - if (!rc) { - mode = LCK_PW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - - if (rc) { + mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, + &op_data->op_fid2, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh); + if (mode) { memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); it->d.lustre.it_lock_mode = mode; @@ -717,8 +749,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, /* Only return failure if it was not GETATTR by cfid (from inode_revalidate) */ - if (rc || op_data->op_namelen != 0) - RETURN(rc); + if (mode || op_data->op_namelen != 0) + RETURN(!!mode); } /* lookup_it may be called only after revalidate_it has run, because @@ -770,22 +802,28 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, if (rc) RETURN(rc); - mdt_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF, - sizeof(*mdt_body)); - /* mdc_enqueue checked */ - LASSERT(mdt_body != NULL); - /* mdc_enqueue swabbed */ - LASSERT(lustre_rep_swabbed(request, 1)); + mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); + LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ /* If we were revalidating a fid/name pair, mark the intent in * case we fail and get called again from lookup */ - if (fid_is_sane(&op_data->op_fid2) && (it->it_flags & O_CHECK_STALE) && - (it->it_op != IT_GETATTR)) { + if (fid_is_sane(&op_data->op_fid2) && + (it->it_flags & O_CHECK_STALE) && + it->it_op != IT_GETATTR) { it_set_disposition(it, DISP_ENQ_COMPLETE); /* Also: did we find the same inode? */ - if (!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) + /* sever can return one of two fids: + * op_fid2 - new allocated fid - if file is created. + * op_fid3 - existent fid - if file only open. + * op_fid3 is saved in lmv_intent_open */ + if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) && + (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) { + CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID + "\n", PFID(&op_data->op_fid2), + PFID(&op_data->op_fid2), PFID(&mdt_body->fid1)); RETURN(-ESTALE); + } } rc = it_open_error(DISP_LOOKUP_EXECD, it);