X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=797485f81f4393c71fc07f0b8f24d7c4e63c8d9e;hb=d2d56f38da01001c92a09afc6b52b5acbd9bc13c;hp=618f43006b5ae7f273b50d40cd71c8d87f21f219;hpb=bf527ab7e56d4445f81223b23302b3cbf0dc5fb1;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 618f430..797485f 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -36,8 +36,11 @@ # include #endif +#include #include #include +/* fid_res_name_eq() */ +#include #include #include "mdc_internal.h" @@ -61,11 +64,13 @@ EXPORT_SYMBOL(it_clear_disposition); static int it_to_lock_mode(struct lookup_intent *it) { + ENTRY; + /* CREAT needs to be tested before open (both could be set) */ if (it->it_op & IT_CREAT) - return LCK_CW; + return LCK_PW; else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) - return LCK_CR; + return LCK_PR; LBUG(); RETURN(-EINVAL); @@ -108,18 +113,17 @@ int it_open_error(int phase, struct lookup_intent *it) EXPORT_SYMBOL(it_open_error); /* this must be called on a lockh that is known to have a referenced lock */ -void mdc_set_lock_data(__u64 *l, void *data) +int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) { struct ldlm_lock *lock; - struct lustre_handle *lockh = (struct lustre_handle *)l; ENTRY; - if (!*l) { + if (!*lockh) { EXIT; - return; + RETURN(0); } - lock = ldlm_handle2lock(lockh); + lock = ldlm_handle2lock((struct lustre_handle *)lockh); LASSERT(lock != NULL); lock_res_and_lock(lock); @@ -139,21 +143,59 @@ void mdc_set_lock_data(__u64 *l, void *data) unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); - EXIT; + RETURN(0); +} + +int mdc_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh) +{ + struct ldlm_res_id res_id = + { .name = {fid_seq(fid), + fid_oid(fid), + fid_ver(fid)} }; + struct obd_device *obd = class_exp2obd(exp); + int rc; + ENTRY; + + rc = ldlm_lock_match(obd->obd_namespace, flags, + &res_id, type, policy, mode, lockh); + + RETURN(rc); +} + +int mdc_cancel_unused(struct obd_export *exp, + const struct lu_fid *fid, + int flags, void *opaque) +{ + struct ldlm_res_id res_id = + { .name = {fid_seq(fid), + fid_oid(fid), + fid_ver(fid)} }; + struct obd_device *obd = class_exp2obd(exp); + int rc; + + ENTRY; + + rc = ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, + flags, opaque); + RETURN(rc); } -EXPORT_SYMBOL(mdc_set_lock_data); -int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, +int mdc_change_cbdata(struct obd_export *exp, + const struct lu_fid *fid, ldlm_iterator_t it, void *data) { struct ldlm_res_id res_id = { .name = {0} }; ENTRY; - res_id.name[0] = fid->id; - res_id.name[1] = fid->generation; + res_id.name[0] = fid_seq(fid); + res_id.name[1] = fid_oid(fid); + res_id.name[2] = fid_ver(fid); - ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id, - it, data); + ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, + &res_id, it, data); EXIT; return 0; @@ -173,16 +215,6 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) } } -static int round_up(int val) -{ - int ret = 1; - while (val) { - val >>= 1; - ret <<= 1; - } - return ret; -} - /* Save a large LOV EA into the request buffer so that it is available * for replay. We don't do this in the initial request because the * original request doesn't need this buffer (at most it sends just the @@ -195,36 +227,20 @@ static int round_up(int val) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mds_body *body, int size[6]) + struct mdt_body *body, int size[9]) { - int new_size, old_size; - struct lustre_msg *new_msg; - - /* save old size */ - old_size = lustre_msg_size(lustre_request_magic(req), 6, size); - - size[DLM_INTENT_REC_OFF + 2] = body->eadatasize; - new_size = lustre_msg_size(lustre_request_magic(req), 6, size); - OBD_ALLOC(new_msg, new_size); - if (new_msg != NULL) { - struct lustre_msg *old_msg = req->rq_reqmsg; - - DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n", - body->eadatasize); - memcpy(new_msg, old_msg, old_size); - lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2, - body->eadatasize); - - spin_lock(&req->rq_lock); - req->rq_reqmsg = new_msg; - req->rq_reqlen = new_size; - spin_unlock(&req->rq_lock); + int rc; + ENTRY; - OBD_FREE(old_msg, old_size); - } else { + rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, + body->eadatasize); + if (rc) { + CERROR("Can't enlarge segment %d size to %d\n", + DLM_INTENT_REC_OFF + 4, body->eadatasize); body->valid &= ~OBD_MD_FLEASIZE; body->eadatasize = 0; } + EXIT; } /* We always reserve enough space in the reply packet for a stripe MD, because @@ -233,7 +249,7 @@ int mdc_enqueue(struct obd_export *exp, int lock_type, struct lookup_intent *it, int lock_mode, - struct mdc_op_data *data, + struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, @@ -244,22 +260,23 @@ int mdc_enqueue(struct obd_export *exp, struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id = - { .name = {data->fid1.id, data->fid1.generation} }; + { .name = {fid_seq(&op_data->op_fid1), + fid_oid(&op_data->op_fid1), + fid_ver(&op_data->op_fid1)} }; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; struct ldlm_request *lockreq; struct ldlm_intent *lit; struct ldlm_reply *lockrep; - int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*lockreq), [DLM_INTENT_IT_OFF] = sizeof(*lit) }; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), - [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), + [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. cl_max_mds_easize }; int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int repbufcnt = 4, rc; - void *eadata; ENTRY; LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type); @@ -267,51 +284,46 @@ int mdc_enqueue(struct obd_export *exp, // ldlm_it2str(it->it_op), it_name, it_inode->i_ino); if (it->it_op & IT_OPEN) { - it->it_create_mode |= S_IFREG; + int do_join = !!(it->it_flags & O_JOIN_FILE); - size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; + + size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create); + /* parent capability */ + size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? + sizeof(struct lustre_capa) : 0; + /* child capability, used for replay only */ + size[DLM_INTENT_REC_OFF + 2] = sizeof(struct lustre_capa); + size[DLM_INTENT_REC_OFF + 3] = op_data->op_namelen + 1; /* As an optimization, we allocate an RPC request buffer for * at least a default-sized LOV EA even if we aren't sending - * one. We grow the whole request to the next power-of-two - * size since we get that much from a slab allocation anyways. - * This avoids an allocation below in the common case where - * we need to save a default-sized LOV EA for open replay. */ - size[DLM_INTENT_REC_OFF + 2] = max(lmmsize, - obddev->u.cli.cl_default_mds_easize); - rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, - size); - if (rc & (rc - 1)) - size[DLM_INTENT_REC_OFF + 2] = - min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc, - obddev->u.cli.cl_max_mds_easize); - - if (it->it_flags & O_JOIN_FILE) { + * one. + */ + size[DLM_INTENT_REC_OFF + 4] = max(lmmsize, + obddev->u.cli.cl_default_mds_easize); + if (do_join) + size[DLM_INTENT_REC_OFF + 5] = + sizeof(struct mdt_rec_join); + + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, + LDLM_ENQUEUE, 8 + do_join, size, NULL); + if (!req) + RETURN(-ENOMEM); + + if (do_join) { __u64 head_size = *(__u32*)cb_data; __u32 tsize = *(__u32*)lmm; /* join is like an unlink of the tail */ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - size[DLM_INTENT_REC_OFF + 3] = - sizeof(struct mds_rec_join); - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 7, size, NULL); /* when joining file, cb_data and lmm args together * indicate the head file size*/ - mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, + mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data, (head_size << 32) | tsize); cb_data = NULL; lmm = NULL; - } else { - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 6, size, NULL); } - if (!req) - RETURN(-ENOMEM); - spin_lock(&req->rq_lock); req->rq_replay = 1; spin_unlock(&req->rq_lock); @@ -322,16 +334,24 @@ int mdc_enqueue(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode, - 0, it->it_flags, lmm, lmmsize); - - repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; + mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data, + it->it_create_mode, 0, it->it_flags, + lmm, lmmsize); + + /* for remote client, fetch remote perm for current user */ + repsize[repbufcnt++] = client_is_remote(exp) ? + sizeof(struct mdt_remote_perm) : + LUSTRE_POSIX_ACL_MAX_SIZE; + repsize[repbufcnt++] = sizeof(struct lustre_capa); + repsize[repbufcnt++] = sizeof(struct lustre_capa); } else if (it->it_op & IT_UNLINK) { - size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_unlink); + size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? + sizeof(struct lustre_capa) : 0; + size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 5, size, NULL); + LDLM_ENQUEUE, 6, size, NULL); if (!req) RETURN(-ENOMEM); @@ -341,21 +361,25 @@ int mdc_enqueue(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data); + mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data); repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLACL | OBD_MD_FLMODEASIZE | - OBD_MD_FLDIREA; - size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | + OBD_MD_FLMDSCAPA | OBD_MD_MEA; + valid |= client_is_remote(exp) ? OBD_MD_FLRMTPERM : + OBD_MD_FLACL; + size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_body); + size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? + sizeof(struct lustre_capa) : 0; + size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; if (it->it_op & IT_GETATTR) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 5, size, NULL); + LDLM_ENQUEUE, 6, size, NULL); if (!req) RETURN(-ENOMEM); @@ -366,9 +390,12 @@ int mdc_enqueue(struct obd_export *exp, /* pack the intended request */ mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, - it->it_flags, data); + it->it_flags, op_data); - repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; + repsize[repbufcnt++] = client_is_remote(exp) ? + sizeof(struct mdt_remote_perm) : + LUSTRE_POSIX_ACL_MAX_SIZE; + repsize[repbufcnt++] = sizeof(struct lustre_capa); } else if (it->it_op == IT_READDIR) { policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, @@ -390,7 +417,7 @@ int mdc_enqueue(struct obd_export *exp, * rpcs in flight counter */ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy, + rc = ldlm_cli_enqueue(exp, &req, &res_id, lock_type, &policy, lock_mode, &flags, cb_blocking, cb_completion, NULL, cb_data, NULL, 0, NULL, lockh, 0); mdc_exit_request(&obddev->u.cli); @@ -458,74 +485,127 @@ int mdc_enqueue(struct obd_export *exp, it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status); /* We know what to expect, so we do any byte flipping required here */ - LASSERT(repbufcnt == 5 || repbufcnt == 2); - if (repbufcnt == 5) { - struct mds_body *body; + LASSERT(repbufcnt == 7 || repbufcnt == 6 || repbufcnt == 2); + if (repbufcnt >= 6) { + int reply_off = DLM_REPLY_REC_OFF; + struct mdt_body *body; - body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body), - lustre_swab_mds_body); + body = lustre_swab_repbuf(req, reply_off++, sizeof(*body), + lustre_swab_mdt_body); if (body == NULL) { - CERROR ("Can't swab mds_body\n"); + CERROR ("Can't swab mdt_body\n"); RETURN (-EPROTO); } - /* If this is a successful OPEN request, we need to set - replay handler and data early, so that if replay happens - immediately after swabbing below, new reply is swabbed - by that handler correctly */ - if (it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) - mdc_set_open_replay_data(NULL, req); - - if ((body->valid & OBD_MD_FLEASIZE) != 0) { - /* The eadata is opaque; just check that it is there. - * Eventually, obd_unpackmd() will check the contents */ - eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1, + if (req->rq_replay && it_disposition(it, DISP_OPEN_OPEN) && + !it_open_error(DISP_OPEN_OPEN, it)) { + /* + * If this is a successful OPEN request, we need to set + * replay handler and data early, so that if replay + * happens immediately after swabbing below, new reply + * is swabbed by that handler correctly. + */ + mdc_set_open_replay_data(NULL, NULL, req); + } + + if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) { + void *eadata; + + /* + * The eadata is opaque; just check that it is there. + * Eventually, obd_unpackmd() will check the contents. + */ + eadata = lustre_swab_repbuf(req, reply_off++, body->eadatasize, NULL); if (eadata == NULL) { - CERROR ("Missing/short eadata\n"); - RETURN (-EPROTO); + CERROR("Missing/short eadata\n"); + RETURN(-EPROTO); } if (body->valid & OBD_MD_FLMODEASIZE) { - if (obddev->u.cli.cl_max_mds_easize < - body->max_mdsize) { - obddev->u.cli.cl_max_mds_easize = + if (obddev->u.cli.cl_max_mds_easize < + body->max_mdsize) { + obddev->u.cli.cl_max_mds_easize = body->max_mdsize; CDEBUG(D_INFO, "maxeasize become %d\n", body->max_mdsize); } if (obddev->u.cli.cl_max_mds_cookiesize < - body->max_cookiesize) { + body->max_cookiesize) { obddev->u.cli.cl_max_mds_cookiesize = body->max_cookiesize; CDEBUG(D_INFO, "cookiesize become %d\n", body->max_cookiesize); } } - /* We save the reply LOV EA in case we have to replay - * a create for recovery. If we didn't allocate a - * large enough request buffer above we need to - * reallocate it here to hold the actual LOV EA. */ - if (it->it_op & IT_OPEN) { - int offset = DLM_INTENT_REC_OFF + 2; - if (lustre_msg_buflen(req->rq_reqmsg, offset) < + /* + * We save the reply LOV EA in case we have to replay a + * create for recovery. If we didn't allocate a large + * enough request buffer above we need to reallocate it + * here to hold the actual LOV EA. + * + * To not save LOV EA if request is not going to replay + * (for example error one). + */ + if ((it->it_op & IT_OPEN) && req->rq_replay) { + if (lustre_msg_buflen(req->rq_reqmsg, + DLM_INTENT_REC_OFF + 4) < body->eadatasize) mdc_realloc_openmsg(req, body, size); - lmm = lustre_msg_buf(req->rq_reqmsg, offset, + lmm = lustre_msg_buf(req->rq_reqmsg, + DLM_INTENT_REC_OFF + 4, body->eadatasize); if (lmm) memcpy(lmm, eadata, body->eadatasize); } } + if (body->valid & OBD_MD_FLRMTPERM) { + struct mdt_remote_perm *perm; + + LASSERT(client_is_remote(exp)); + perm = lustre_swab_repbuf(req, reply_off++, + sizeof(*perm), + lustre_swab_mdt_remote_perm); + if (perm == NULL) { + CERROR("missing remote permission!\n"); + RETURN(-EPROTO); + } + } else if ((body->valid & OBD_MD_FLACL) && body->aclsize) { + reply_off++; + } + if (body->valid & OBD_MD_FLMDSCAPA) { + struct lustre_capa *capa, *p; + + capa = lustre_unpack_capa(req->rq_repmsg, reply_off++); + if (capa == NULL) { + CERROR("Missing/short MDS capability\n"); + RETURN(-EPROTO); + } + + if (it->it_op & IT_OPEN) { + /* client fid capa will be checked in replay */ + p = lustre_msg_buf(req->rq_reqmsg, + DLM_INTENT_REC_OFF + 2, + sizeof(*p)); + LASSERT(p); + *p = *capa; + } + } + if (body->valid & OBD_MD_FLOSSCAPA) { + struct lustre_capa *capa; + + capa = lustre_unpack_capa(req->rq_repmsg, reply_off++); + if (capa == NULL) { + CERROR("Missing/short OSS capability\n"); + RETURN(-EPROTO); + } + } } RETURN(rc); } -EXPORT_SYMBOL(mdc_enqueue); - -/* +/* * This long block is all about fixing up the lock and request state * so that it is correct as of the moment _before_ the operation was * applied; that way, the VFS will think that everything is normal and @@ -552,57 +632,72 @@ EXPORT_SYMBOL(mdc_enqueue); * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the * child lookup. */ -int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, +int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, void *lmm, int lmmsize, struct lookup_intent *it, int lookup_flags, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking, int extra_lock_flags) + ldlm_blocking_callback cb_blocking, + int extra_lock_flags) { - struct lustre_handle lockh; struct ptlrpc_request *request; - int rc = 0; - struct mds_body *mds_body; struct lustre_handle old_lock; + struct lustre_handle lockh; + struct mdt_body *mdt_body; struct ldlm_lock *lock; + int rc = 0; ENTRY; LASSERT(it); - CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n", - op_data->namelen, op_data->name, op_data->fid1.id, - ldlm_it2str(it->it_op), it->it_flags); + CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID + ", intent: %s flags %#o\n", op_data->op_namelen, + op_data->op_name, PFID(&op_data->op_fid2), + PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), + it->it_flags); - if (op_data->fid2.id && - (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) { + if (fid_is_sane(&op_data->op_fid2) && + (it->it_op & (IT_LOOKUP | IT_GETATTR))) { /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ - struct ldlm_res_id res_id = {.name ={op_data->fid2.id, - op_data->fid2.generation}}; - struct lustre_handle lockh; + struct ldlm_res_id res_id = { .name = { fid_seq(&op_data->op_fid2), + fid_oid(&op_data->op_fid2), + fid_ver(&op_data->op_fid2) } }; ldlm_policy_data_t policy; - int mode = LCK_CR; + ldlm_mode_t mode = LCK_CR; - /* As not all attributes are kept under update lock, e.g. - owner/group/acls are under lookup lock, we need both + /* As not all attributes are kept under update lock, e.g. + owner/group/acls are under lookup lock, we need both ibits for GETATTR. */ + + /* For CMD, UPDATE lock and LOOKUP lock can not be got + * at the same for cross-object, so we can not match + * the 2 lock at the same time FIXME: but how to handle + * the above situation */ policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? - MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP : - MDS_INODELOCK_LOOKUP; + MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, LCK_CR, &lockh); + LDLM_IBITS, &policy, mode, &lockh); if (!rc) { mode = LCK_CW; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy,LCK_CW,&lockh); + LDLM_IBITS, &policy, mode, &lockh); } if (!rc) { mode = LCK_PR; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy,LCK_PR,&lockh); + LDLM_IBITS, &policy, mode, &lockh); + } + + if (!rc) { + mode = LCK_PW; + rc = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, + LDLM_IBITS, &policy, mode, &lockh); } + if (rc) { memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); @@ -611,7 +706,7 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, /* Only return failure if it was not GETATTR by cfid (from inode_revalidate) */ - if (rc || op_data->namelen != 0) + if (rc || op_data->op_namelen != 0) RETURN(rc); } @@ -624,7 +719,15 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, * this and use the request from revalidate. In this case, revalidate * never dropped its reference, so the refcounts are all OK */ if (!it_disposition(it, DISP_ENQ_COMPLETE)) { - + /* For case if upper layer did not alloc fid, do it now. */ + if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { + rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data); + if (rc < 0) { + CERROR("Can't alloc new fid, rc %d\n", rc); + RETURN(rc); + } + } + rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it), op_data, &lockh, lmm, lmmsize, ldlm_completion_ast, cb_blocking, NULL, @@ -632,7 +735,8 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, if (rc < 0) RETURN(rc); memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); - } else if (!op_data->fid2.id) { + } else if (!fid_is_sane(&op_data->op_fid2) || + !(it->it_flags & O_CHECK_STALE)) { /* DISP_ENQ_COMPLETE set means there is extra reference on * request referenced from this intent, saved for subsequent * lookup. This path is executed when we proceed to this @@ -654,19 +758,20 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, if (rc) RETURN(rc); - mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF, - sizeof(*mds_body)); - LASSERT(mds_body != NULL); /* mdc_enqueue checked */ + mdt_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF, + sizeof(*mdt_body)); + LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */ /* If we were revalidating a fid/name pair, mark the intent in * case we fail and get called again from lookup */ - if (op_data->fid2.id && (it->it_op != IT_GETATTR)) { + if (fid_is_sane(&op_data->op_fid2) && (it->it_flags & O_CHECK_STALE) && + (it->it_op != IT_GETATTR)) { it_set_disposition(it, DISP_ENQ_COMPLETE); + /* Also: did we find the same inode? */ - if (memcmp(&op_data->fid2, &mds_body->fid1, - sizeof(op_data->fid2))) - RETURN (-ESTALE); + if (!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) + RETURN(-ESTALE); } rc = it_open_error(DISP_LOOKUP_EXECD, it); @@ -708,7 +813,18 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, if (lock) { ldlm_policy_data_t policy = lock->l_policy_data; LDLM_DEBUG(lock, "matching against this"); + + LASSERTF(fid_res_name_eq(&mdt_body->fid1, + &lock->l_resource->lr_name), + "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n", + (unsigned long)lock->l_resource->lr_name.name[0], + (unsigned long)lock->l_resource->lr_name.name[1], + (unsigned long)lock->l_resource->lr_name.name[2], + (unsigned long)fid_seq(&mdt_body->fid1), + (unsigned long)fid_oid(&mdt_body->fid1), + (unsigned long)fid_ver(&mdt_body->fid1)); LDLM_LOCK_PUT(lock); + memcpy(&old_lock, &lockh, sizeof(lockh)); if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, LDLM_IBITS, &policy, LCK_NL, &old_lock)) { @@ -720,9 +836,8 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, } } CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", - op_data->namelen, op_data->name, ldlm_it2str(it->it_op), + op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), it->d.lustre.it_status, it->d.lustre.it_disposition, rc); RETURN(rc); } -EXPORT_SYMBOL(mdc_intent_lock);