X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=1da0c78f11b25038cbdc537434ab5510ca72945a;hp=b9ec6d18b7c3270bcb857c9fdcae5dac9b4fe7f4;hb=1cecd6cc21fc408f6cbefd82569967cceb3c40fb;hpb=710e1a34bada840db26198d8333d6477e536ef86 diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index b9ec6d1..1da0c78 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -1,25 +1,37 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * GPL HEADER START * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ #ifndef EXPORT_SYMTAB @@ -36,7 +48,7 @@ # include #endif -#include +#include #include #include /* fid_res_name_eq() */ @@ -62,20 +74,6 @@ void it_clear_disposition(struct lookup_intent *it, int flag) } EXPORT_SYMBOL(it_clear_disposition); -static int it_to_lock_mode(struct lookup_intent *it) -{ - ENTRY; - - /* CREAT needs to be tested before open (both could be set) */ - if (it->it_op & IT_CREAT) - return LCK_PW; - else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) - return LCK_PR; - - LBUG(); - RETURN(-EINVAL); -} - int it_open_error(int phase, struct lookup_intent *it) { if (it_disposition(it, DISP_OPEN_OPEN)) { @@ -113,11 +111,15 @@ int it_open_error(int phase, struct lookup_intent *it) EXPORT_SYMBOL(it_open_error); /* this must be called on a lockh that is known to have a referenced lock */ -int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) +int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, + __u32 *bits) { struct ldlm_lock *lock; ENTRY; + if(bits) + *bits = 0; + if (!*lockh) { EXIT; RETURN(0); @@ -140,28 +142,27 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) } #endif lock->l_ast_data = data; + if (bits) + *bits = lock->l_policy_data.l_inodebits.bits; + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); RETURN(0); } -int mdc_lock_match(struct obd_export *exp, int flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh) { - struct ldlm_res_id res_id = - { .name = {fid_seq(fid), - fid_oid(fid), - fid_ver(fid)} }; - struct obd_device *obd = class_exp2obd(exp); - int rc; + struct ldlm_res_id res_id; + ldlm_mode_t rc; ENTRY; - rc = ldlm_lock_match(obd->obd_namespace, flags, - &res_id, type, policy, mode, lockh); - + fid_build_reg_res_name(fid, &res_id); + rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags, + &res_id, type, policy, mode, lockh, 0); RETURN(rc); } @@ -170,15 +171,13 @@ int mdc_cancel_unused(struct obd_export *exp, ldlm_policy_data_t *policy, ldlm_mode_t mode, int flags, void *opaque) { - struct ldlm_res_id res_id = - { .name = {fid_seq(fid), - fid_oid(fid), - fid_ver(fid)} }; + struct ldlm_res_id res_id; struct obd_device *obd = class_exp2obd(exp); int rc; ENTRY; + fid_build_reg_res_name(fid, &res_id); rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, policy, mode, flags, opaque); RETURN(rc); @@ -188,13 +187,10 @@ int mdc_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, ldlm_iterator_t it, void *data) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; ENTRY; - res_id.name[0] = fid_seq(fid); - res_id.name[1] = fid_oid(fid); - res_id.name[2] = fid_ver(fid); - + fid_build_reg_res_name(fid, &res_id); ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id, it, data); @@ -228,11 +224,11 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body, int size[9]) + struct mdt_body *body) { int rc; - ENTRY; + /* FIXME: remove this explicit offset. */ rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, body->eadatasize); if (rc) { @@ -241,205 +237,228 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, body->valid &= ~OBD_MD_FLEASIZE; body->eadatasize = 0; } - EXIT; } -/* We always reserve enough space in the reply packet for a stripe MD, because - * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags) +static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data, + void *lmm, int lmmsize, + void *cb_data) { struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id = - { .name = {fid_seq(&op_data->op_fid1), - fid_oid(&op_data->op_fid1), - fid_ver(&op_data->op_fid1)} }; - ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; - struct ldlm_request *lockreq; - struct ldlm_intent *lit; - struct ldlm_reply *lockrep; - int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(*lockreq), - [DLM_INTENT_IT_OFF] = sizeof(*lit), - 0, 0, 0, 0, 0, 0 }; - int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), - [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body), - [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize, - 0, 0, 0 }; - int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; - int repbufcnt = 4, rc; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_intent *lit; + int joinfile = !!((it->it_create_mode & M_JOIN_FILE) && + op_data->op_data); + CFS_LIST_HEAD(cancels); + int count = 0; + int mode; + int rc; ENTRY; - LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); - - if (it->it_op & IT_OPEN) { - int do_join = !!(it->it_flags & O_JOIN_FILE); - CFS_LIST_HEAD(cancels); - int count = 0; - int mode; - - it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; - - size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create); - /* parent capability */ - size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; - /* child capability, used for replay only */ - size[DLM_INTENT_REC_OFF + 2] = sizeof(struct lustre_capa); - size[DLM_INTENT_REC_OFF + 3] = op_data->op_namelen + 1; - /* As an optimization, we allocate an RPC request buffer for - * at least a default-sized LOV EA even if we aren't sending - * one. - */ - size[DLM_INTENT_REC_OFF + 4] = max(lmmsize, - obddev->u.cli.cl_default_mds_easize); - - /* XXX: openlock is not cancelled for cross-refs. */ - /* If inode is known, cancel conflicting OPEN locks. */ - if (fid_is_sane(&op_data->op_fid2)) { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) - mode = LCK_CW; + it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG; + + /* XXX: openlock is not cancelled for cross-refs. */ + /* If inode is known, cancel conflicting OPEN locks. */ + if (fid_is_sane(&op_data->op_fid2)) { + if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + mode = LCK_CW; #ifdef FMODE_EXEC - else if (it->it_flags & FMODE_EXEC) - mode = LCK_PR; + else if (it->it_flags & FMODE_EXEC) + mode = LCK_PR; #endif - else - mode = LCK_CR; - count = mdc_resource_get_unused(exp, &op_data->op_fid2, - &cancels, mode, - MDS_INODELOCK_OPEN); - } - - /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ - if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) - mode = LCK_EX; else mode = LCK_CR; - count += mdc_resource_get_unused(exp, &op_data->op_fid1, - &cancels, mode, - MDS_INODELOCK_UPDATE); + count = mdc_resource_get_unused(exp, &op_data->op_fid2, + &cancels, mode, + MDS_INODELOCK_OPEN); + } - if (do_join) - size[DLM_INTENT_REC_OFF + 5] = - sizeof(struct mdt_rec_join); + /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT || joinfile) + mode = LCK_EX; + else + mode = LCK_CR; + count += mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, mode, + MDS_INODELOCK_UPDATE); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_OPEN); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); + RETURN(ERR_PTR(-ENOMEM)); + } - req = ldlm_prep_enqueue_req(exp, 8 + do_join, size, &cancels, - count); - if (!req) - RETURN(-ENOMEM); + /* parent capability */ + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + /* child capability, reserve the size according to parent capa, it will + * be filled after we get the reply */ + mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1); + + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + max(lmmsize, obddev->u.cli.cl_default_mds_easize)); + if (!joinfile) { + req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE, + RCL_CLIENT, 0); + } - if (do_join) { - /* join is like an unlink of the tail */ - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data, - (*(__u64 *)op_data->op_data)); - } + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + return NULL; + } - spin_lock(&req->rq_lock); - req->rq_replay = 1; - spin_unlock(&req->rq_lock); + if (joinfile) { + __u64 head_size = *(__u64 *)op_data->op_data; + mdc_join_pack(req, op_data, head_size); + } - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, - sizeof(*lit)); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data, - it->it_create_mode, 0, it->it_flags, - lmm, lmmsize); - - /* for remote client, fetch remote perm for current user */ - repsize[repbufcnt++] = client_is_remote(exp) ? - sizeof(struct mdt_remote_perm) : - LUSTRE_POSIX_ACL_MAX_SIZE; - repsize[repbufcnt++] = sizeof(struct lustre_capa); - repsize[repbufcnt++] = sizeof(struct lustre_capa); - } else if (it->it_op & IT_UNLINK) { - size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_unlink); - size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; - size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0); - if (!req) - RETURN(-ENOMEM); - - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, - sizeof(*lit)); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data); - - repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; - } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | - OBD_MD_FLMDSCAPA | OBD_MD_MEA; - valid |= client_is_remote(exp) ? OBD_MD_FLRMTPERM : - OBD_MD_FLACL; - size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_body); - size[DLM_INTENT_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; - size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1; - - if (it->it_op & IT_GETATTR) - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + spin_lock(&req->rq_lock); + req->rq_replay = req->rq_import->imp_replayable; + spin_unlock(&req->rq_lock); - req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0); - if (!req) - RETURN(-ENOMEM); + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, - sizeof(*lit)); - lit->opc = (__u64)it->it_op; + /* pack the intended request */ + mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm, + lmmsize); - /* pack the intended request */ - mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, - it->it_flags, op_data); + /* for remote client, fetch remote perm for current user */ + if (client_is_remote(exp)) + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + sizeof(struct mdt_remote_perm)); + ptlrpc_request_set_replen(req); + return req; +} - repsize[repbufcnt++] = client_is_remote(exp) ? - sizeof(struct mdt_remote_perm) : - LUSTRE_POSIX_ACL_MAX_SIZE; - repsize[repbufcnt++] = sizeof(struct lustre_capa); - } else if (it->it_op == IT_READDIR) { - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); - if (!req) - RETURN(-ENOMEM); +static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data) +{ + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_intent *lit; + int rc; + ENTRY; - repbufcnt = 2; - } else { - LBUG(); - RETURN(-EINVAL); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_UNLINK); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); } - /* get ready for the reply */ - ptlrpc_req_set_repsize(req, repbufcnt, repsize); + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; - /* It is important to obtain rpc_lock first (if applicable), so that - * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter */ - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, NULL, lockh, 0); - mdc_exit_request(&obddev->u.cli); - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + /* pack the intended request */ + mdc_unlink_pack(req, op_data); + + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obddev->u.cli.cl_max_mds_easize); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + obddev->u.cli.cl_max_mds_cookiesize); + ptlrpc_request_set_replen(req); + RETURN(req); +} + +static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data) +{ + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | + OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | + OBD_MD_FLMDSCAPA | OBD_MD_MEA | + (client_is_remote(exp) ? + OBD_MD_FLRMTPERM : OBD_MD_FLACL); + struct ldlm_intent *lit; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_GETATTR); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; + + /* pack the intended request */ + mdc_getattr_pack(req, valid, it->it_flags, op_data); + + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obddev->u.cli.cl_max_mds_easize); + if (client_is_remote(exp)) + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + sizeof(struct mdt_remote_perm)); + ptlrpc_request_set_replen(req); + RETURN(req); +} + +static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + ptlrpc_request_set_replen(req); + RETURN(req); +} + +static int mdc_finish_enqueue(struct obd_export *exp, + struct ptlrpc_request *req, + struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, + struct lustre_handle *lockh, + int rc) +{ + struct req_capsule *pill = &req->rq_pill; + struct ldlm_request *lockreq; + struct ldlm_reply *lockrep; + ENTRY; + + LASSERT(rc >= 0); /* Similarly, if we're going to replay this request, we don't want to * actually get a lock, just perform the intent. */ if (req->rq_transno || req->rq_replay) { - lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, - sizeof(*lockreq)); + lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ); lockreq->lock_flags |= LDLM_FL_INTENT_ONLY; } @@ -447,12 +466,6 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, einfo->ei_mode = 0; memset(lockh, 0, sizeof(*lockh)); rc = 0; - } else if (rc != 0) { - CERROR("ldlm_cli_enqueue: %d\n", rc); - LASSERTF(rc < 0, "rc %d\n", rc); - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - RETURN(rc); } else { /* rc = 0 */ struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock); @@ -467,14 +480,13 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, LDLM_LOCK_PUT(lock); } - lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, - sizeof(*lockrep)); + lockrep = req_capsule_server_get(pill, &RMF_DLM_REP); LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ - LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */ it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1; it->d.lustre.it_status = (int)lockrep->lock_policy_res2; it->d.lustre.it_lock_mode = einfo->ei_mode; + it->d.lustre.it_lock_handle = lockh->cookie; it->d.lustre.it_data = req; if (it->d.lustre.it_status < 0 && req->rq_replay) @@ -495,19 +507,16 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status); /* We know what to expect, so we do any byte flipping required here */ - LASSERT(repbufcnt == 7 || repbufcnt == 6 || repbufcnt == 2); - if (repbufcnt >= 6) { - int reply_off = DLM_REPLY_REC_OFF; + if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { struct mdt_body *body; - body = lustre_swab_repbuf(req, reply_off++, sizeof(*body), - lustre_swab_mdt_body); + body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (body == NULL) { CERROR ("Can't swab mdt_body\n"); RETURN (-EPROTO); } - if (req->rq_replay && it_disposition(it, DISP_OPEN_OPEN) && + if (it_disposition(it, DISP_OPEN_OPEN) && !it_open_error(DISP_OPEN_OPEN, it)) { /* * If this is a successful OPEN request, we need to set @@ -525,28 +534,10 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, * The eadata is opaque; just check that it is there. * Eventually, obd_unpackmd() will check the contents. */ - eadata = lustre_swab_repbuf(req, reply_off++, - body->eadatasize, NULL); - if (eadata == NULL) { - CERROR("Missing/short eadata\n"); + eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + body->eadatasize); + if (eadata == NULL) RETURN(-EPROTO); - } - if (body->valid & OBD_MD_FLMODEASIZE) { - if (obddev->u.cli.cl_max_mds_easize < - body->max_mdsize) { - obddev->u.cli.cl_max_mds_easize = - body->max_mdsize; - CDEBUG(D_INFO, "maxeasize become %d\n", - body->max_mdsize); - } - if (obddev->u.cli.cl_max_mds_cookiesize < - body->max_cookiesize) { - obddev->u.cli.cl_max_mds_cookiesize = - body->max_cookiesize; - CDEBUG(D_INFO, "cookiesize become %d\n", - body->max_cookiesize); - } - } /* * We save the reply LOV EA in case we have to replay a @@ -558,46 +549,45 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, * (for example error one). */ if ((it->it_op & IT_OPEN) && req->rq_replay) { - if (lustre_msg_buflen(req->rq_reqmsg, - DLM_INTENT_REC_OFF + 4) < + void *lmm; + if (req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT) < body->eadatasize) - mdc_realloc_openmsg(req, body, size); - - lmm = lustre_msg_buf(req->rq_reqmsg, - DLM_INTENT_REC_OFF + 4, + mdc_realloc_openmsg(req, body); + else + req_capsule_shrink(pill, &RMF_EADATA, + body->eadatasize, + RCL_CLIENT); + + req_capsule_set_size(pill, &RMF_EADATA, + RCL_CLIENT, body->eadatasize); + + lmm = req_capsule_client_get(pill, &RMF_EADATA); if (lmm) memcpy(lmm, eadata, body->eadatasize); } } + if (body->valid & OBD_MD_FLRMTPERM) { struct mdt_remote_perm *perm; LASSERT(client_is_remote(exp)); - perm = lustre_swab_repbuf(req, reply_off++, - sizeof(*perm), - lustre_swab_mdt_remote_perm); - if (perm == NULL) { - CERROR("missing remote permission!\n"); + perm = req_capsule_server_swab_get(pill, &RMF_ACL, + lustre_swab_mdt_remote_perm); + if (perm == NULL) RETURN(-EPROTO); - } - } else if ((body->valid & OBD_MD_FLACL) && body->aclsize) { - reply_off++; } if (body->valid & OBD_MD_FLMDSCAPA) { struct lustre_capa *capa, *p; - capa = lustre_unpack_capa(req->rq_repmsg, reply_off++); - if (capa == NULL) { - CERROR("Missing/short MDS capability\n"); + capa = req_capsule_server_get(pill, &RMF_CAPA1); + if (capa == NULL) RETURN(-EPROTO); - } if (it->it_op & IT_OPEN) { /* client fid capa will be checked in replay */ - p = lustre_msg_buf(req->rq_reqmsg, - DLM_INTENT_REC_OFF + 2, - sizeof(*p)); + p = req_capsule_client_get(pill, &RMF_CAPA2); LASSERT(p); *p = *capa; } @@ -605,16 +595,230 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, if (body->valid & OBD_MD_FLOSSCAPA) { struct lustre_capa *capa; - capa = lustre_unpack_capa(req->rq_repmsg, reply_off++); - if (capa == NULL) { - CERROR("Missing/short OSS capability\n"); + capa = req_capsule_server_get(pill, &RMF_CAPA2); + if (capa == NULL) RETURN(-EPROTO); - } } } RETURN(rc); } + +/* We always reserve enough space in the reply packet for a stripe MD, because + * we don't know in advance the file type. */ +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + struct ptlrpc_request **reqp, int extra_lock_flags) +{ + struct obd_device *obddev = class_exp2obd(exp); + struct ptlrpc_request *req = NULL; + struct req_capsule *pill; + int flags = extra_lock_flags; + int rc; + struct ldlm_res_id res_id; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; + ENTRY; + + LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", + einfo->ei_type); + + fid_build_reg_res_name(&op_data->op_fid1, &res_id); + + if (it) + flags |= LDLM_FL_HAS_INTENT; + if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + + if (reqp) + req = *reqp; + + if (!it) { + /* The only way right now is FLOCK, in this case we hide flock + policy as lmm, but lmmsize is 0 */ + LASSERT(lmm && lmmsize == 0); + LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", + einfo->ei_type); + policy = *(ldlm_policy_data_t *)lmm; + res_id.name[3] = LDLM_FLOCK; + } else if (it->it_op & IT_OPEN) { + int joinfile = !!((it->it_create_mode & M_JOIN_FILE) && + op_data->op_data); + + req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize, + einfo->ei_cbdata); + if (!joinfile) { + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + einfo->ei_cbdata = NULL; + lmm = NULL; + } else + it->it_create_mode &= ~M_JOIN_FILE; + } else if (it->it_op & IT_UNLINK) + req = mdc_intent_unlink_pack(exp, it, op_data); + else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) + req = mdc_intent_getattr_pack(exp, it, op_data); + else if (it->it_op == IT_READDIR) + req = ldlm_enqueue_pack(exp); + else { + LBUG(); + RETURN(-EINVAL); + } + + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); + pill = &req->rq_pill; + + /* It is important to obtain rpc_lock first (if applicable), so that + * threads that are serialised with rpc_lock are not polluting our + * rpcs in flight counter. We do not do flock request limiting, though*/ + if (it) { + mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_enter_request(&obddev->u.cli); + } + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, + 0, NULL, lockh, 0); + if (reqp) + *reqp = req; + + if (it) { + mdc_exit_request(&obddev->u.cli); + mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + } + if (!it) { + /* For flock requests we immediatelly return without further + delay and let caller deal with the rest, since rest of + this function metadata processing makes no sense for flock + requests anyway */ + RETURN(rc); + } + + if (rc < 0) { + CERROR("ldlm_cli_enqueue: %d\n", rc); + mdc_clear_replay_flag(req, rc); + ptlrpc_req_finished(req); + RETURN(rc); + } + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); + + RETURN(rc); +} + +static int mdc_finish_intent_lock(struct obd_export *exp, + struct ptlrpc_request *request, + struct md_op_data *op_data, + struct lookup_intent *it, + struct lustre_handle *lockh) +{ + struct lustre_handle old_lock; + struct mdt_body *mdt_body; + struct ldlm_lock *lock; + int rc; + + + LASSERT(request != NULL); + LASSERT(request != LP_POISON); + LASSERT(request->rq_repmsg != LP_POISON); + + if (!it_disposition(it, DISP_IT_EXECD)) { + /* The server failed before it even started executing the + * intent, i.e. because it couldn't unpack the request. */ + LASSERT(it->d.lustre.it_status != 0); + RETURN(it->d.lustre.it_status); + } + rc = it_open_error(DISP_IT_EXECD, it); + if (rc) + RETURN(rc); + + mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); + LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ + + /* If we were revalidating a fid/name pair, mark the intent in + * case we fail and get called again from lookup */ + if (fid_is_sane(&op_data->op_fid2) && + it->it_create_mode & M_CHECK_STALE && + it->it_op != IT_GETATTR) { + it_set_disposition(it, DISP_ENQ_COMPLETE); + + /* Also: did we find the same inode? */ + /* sever can return one of two fids: + * op_fid2 - new allocated fid - if file is created. + * op_fid3 - existent fid - if file only open. + * op_fid3 is saved in lmv_intent_open */ + if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) && + (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) { + CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID + "\n", PFID(&op_data->op_fid2), + PFID(&op_data->op_fid2), PFID(&mdt_body->fid1)); + RETURN(-ESTALE); + } + } + + rc = it_open_error(DISP_LOOKUP_EXECD, it); + if (rc) + RETURN(rc); + + /* keep requests around for the multiple phases of the call + * this shows the DISP_XX must guarantee we make it into the call + */ + if (!it_disposition(it, DISP_ENQ_CREATE_REF) && + it_disposition(it, DISP_OPEN_CREATE) && + !it_open_error(DISP_OPEN_CREATE, it)) { + it_set_disposition(it, DISP_ENQ_CREATE_REF); + ptlrpc_request_addref(request); /* balanced in ll_create_node */ + } + if (!it_disposition(it, DISP_ENQ_OPEN_REF) && + it_disposition(it, DISP_OPEN_OPEN) && + !it_open_error(DISP_OPEN_OPEN, it)) { + it_set_disposition(it, DISP_ENQ_OPEN_REF); + ptlrpc_request_addref(request); /* balanced in ll_file_open */ + /* BUG 11546 - eviction in the middle of open rpc processing */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); + } + + if (it->it_op & IT_CREAT) { + /* XXX this belongs in ll_create_it */ + } else if (it->it_op == IT_OPEN) { + LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); + } else { + LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); + } + + /* If we already have a matching lock, then cancel the new + * one. We have to set the data here instead of in + * mdc_enqueue, because we need to use the child's inode as + * the l_ast_data to match, and that's not available until + * intent_finish has performed the iget().) */ + lock = ldlm_handle2lock(lockh); + if (lock) { + ldlm_policy_data_t policy = lock->l_policy_data; + LDLM_DEBUG(lock, "matching against this"); + + LASSERTF(fid_res_name_eq(&mdt_body->fid1, + &lock->l_resource->lr_name), + "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n", + (unsigned long)lock->l_resource->lr_name.name[0], + (unsigned long)lock->l_resource->lr_name.name[1], + (unsigned long)lock->l_resource->lr_name.name[2], + (unsigned long)fid_seq(&mdt_body->fid1), + (unsigned long)fid_oid(&mdt_body->fid1), + (unsigned long)fid_ver(&mdt_body->fid1)); + LDLM_LOCK_PUT(lock); + + memcpy(&old_lock, lockh, sizeof(*lockh)); + if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, + LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) { + ldlm_lock_decref_and_cancel(lockh, + it->d.lustre.it_lock_mode); + memcpy(lockh, &old_lock, sizeof(old_lock)); + it->d.lustre.it_lock_handle = lockh->cookie; + } + } + CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", + op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), + it->d.lustre.it_status, it->d.lustre.it_disposition, rc); + RETURN(rc); +} + /* * This long block is all about fixing up the lock and request state * so that it is correct as of the moment _before_ the operation was @@ -648,11 +852,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, int extra_lock_flags) { - struct ptlrpc_request *request; - struct lustre_handle old_lock; struct lustre_handle lockh; - struct mdt_body *mdt_body; - struct ldlm_lock *lock; int rc = 0; ENTRY; LASSERT(it); @@ -663,16 +863,14 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags); + lockh.cookie = 0; if (fid_is_sane(&op_data->op_fid2) && (it->it_op & (IT_LOOKUP | IT_GETATTR))) { /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ - struct ldlm_res_id res_id = { .name = { fid_seq(&op_data->op_fid2), - fid_oid(&op_data->op_fid2), - fid_ver(&op_data->op_fid2) } }; ldlm_policy_data_t policy; - ldlm_mode_t mode = LCK_CR; + ldlm_mode_t mode; /* As not all attributes are kept under update lock, e.g. owner/group/acls are under lookup lock, we need both @@ -685,39 +883,18 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - if (!rc) { - mode = LCK_CW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - if (!rc) { - mode = LCK_PR; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - - if (!rc) { - mode = LCK_PW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - - if (rc) { - memcpy(&it->d.lustre.it_lock_handle, &lockh, - sizeof(lockh)); + mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, + &op_data->op_fid2, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh); + if (mode) { + it->d.lustre.it_lock_handle = lockh.cookie; it->d.lustre.it_lock_mode = mode; } /* Only return failure if it was not GETATTR by cfid (from inode_revalidate) */ - if (rc || op_data->op_namelen != 0) - RETURN(rc); + if (mode || op_data->op_namelen != 0) + RETURN(!!mode); } /* lookup_it may be called only after revalidate_it has run, because @@ -731,7 +908,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, if (!it_disposition(it, DISP_ENQ_COMPLETE)) { struct ldlm_enqueue_info einfo = { LDLM_IBITS, it_to_lock_mode(it), cb_blocking, - ldlm_completion_ast, NULL, NULL }; + ldlm_completion_ast, NULL, NULL, NULL }; /* For case if upper layer did not alloc fid, do it now. */ if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) { @@ -742,113 +919,135 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, } } rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, - lmm, lmmsize, extra_lock_flags); + lmm, lmmsize, NULL, extra_lock_flags); if (rc < 0) RETURN(rc); - memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); } else if (!fid_is_sane(&op_data->op_fid2) || - !(it->it_flags & O_CHECK_STALE)) { + !(it->it_create_mode & M_CHECK_STALE)) { /* DISP_ENQ_COMPLETE set means there is extra reference on * request referenced from this intent, saved for subsequent * lookup. This path is executed when we proceed to this * lookup, so we clear DISP_ENQ_COMPLETE */ it_clear_disposition(it, DISP_ENQ_COMPLETE); } - request = *reqp = it->d.lustre.it_data; - LASSERT(request != NULL); - LASSERT(request != LP_POISON); - LASSERT(request->rq_repmsg != LP_POISON); + *reqp = it->d.lustre.it_data; + rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); + RETURN(rc); +} - if (!it_disposition(it, DISP_IT_EXECD)) { - /* The server failed before it even started executing the - * intent, i.e. because it couldn't unpack the request. */ - LASSERT(it->d.lustre.it_status != 0); - RETURN(it->d.lustre.it_status); - } - rc = it_open_error(DISP_IT_EXECD, it); - if (rc) - RETURN(rc); +static int mdc_intent_getattr_async_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *unused, int rc) +{ + struct obd_export *exp = req->rq_async_args.pointer_arg[0]; + struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1]; + struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2]; + struct lookup_intent *it; + struct lustre_handle *lockh; + struct obd_device *obddev; + int flags = LDLM_FL_HAS_INTENT; + ENTRY; - mdt_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF, - sizeof(*mdt_body)); - LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ - LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */ + it = &minfo->mi_it; + lockh = &minfo->mi_lockh; - /* If we were revalidating a fid/name pair, mark the intent in - * case we fail and get called again from lookup */ - if (fid_is_sane(&op_data->op_fid2) && (it->it_flags & O_CHECK_STALE) && - (it->it_op != IT_GETATTR)) { - it_set_disposition(it, DISP_ENQ_COMPLETE); + obddev = class_exp2obd(exp); - /* Also: did we find the same inode? */ - if (!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) - RETURN(-ESTALE); + mdc_exit_request(&obddev->u.cli); + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) + rc = -ETIMEDOUT; + + rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, + &flags, NULL, 0, NULL, lockh, rc); + if (rc < 0) { + CERROR("ldlm_cli_enqueue_fini: %d\n", rc); + mdc_clear_replay_flag(req, rc); + GOTO(out, rc); } - rc = it_open_error(DISP_LOOKUP_EXECD, it); + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc) - RETURN(rc); + GOTO(out, rc); - /* keep requests around for the multiple phases of the call - * this shows the DISP_XX must guarantee we make it into the call - */ - if (!it_disposition(it, DISP_ENQ_CREATE_REF) && - it_disposition(it, DISP_OPEN_CREATE) && - !it_open_error(DISP_OPEN_CREATE, it)) { - it_set_disposition(it, DISP_ENQ_CREATE_REF); - ptlrpc_request_addref(request); /* balanced in ll_create_node */ - } - if (!it_disposition(it, DISP_ENQ_OPEN_REF) && - it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - it_set_disposition(it, DISP_ENQ_OPEN_REF); - ptlrpc_request_addref(request); /* balanced in ll_file_open */ - /* BUG 11546 - eviction in the middle of open rpc processing */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); - } + rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh); + EXIT; - if (it->it_op & IT_CREAT) { - /* XXX this belongs in ll_create_it */ - } else if (it->it_op == IT_OPEN) { - LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); - } else { - LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); +out: + OBD_FREE_PTR(einfo); + minfo->mi_cb(req, minfo, rc); + return 0; +} + +int mdc_intent_getattr_async(struct obd_export *exp, + struct md_enqueue_info *minfo, + struct ldlm_enqueue_info *einfo) +{ + struct md_op_data *op_data = &minfo->mi_data; + struct lookup_intent *it = &minfo->mi_it; + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_res_id res_id; + ldlm_policy_data_t policy = { + .l_inodebits = { MDS_INODELOCK_LOOKUP } + }; + int rc; + int flags = LDLM_FL_HAS_INTENT; + ENTRY; + + CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n", + op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), + ldlm_it2str(it->it_op), it->it_flags); + + fid_build_reg_res_name(&op_data->op_fid1, &res_id); + req = mdc_intent_getattr_pack(exp, it, op_data); + if (!req) + RETURN(-ENOMEM); + + mdc_enter_request(&obddev->u.cli); + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, + 0, NULL, &minfo->mi_lockh, 1); + if (rc < 0) { + mdc_exit_request(&obddev->u.cli); + RETURN(rc); } - /* If we already have a matching lock, then cancel the new - * one. We have to set the data here instead of in - * mdc_enqueue, because we need to use the child's inode as - * the l_ast_data to match, and that's not available until - * intent_finish has performed the iget().) */ - lock = ldlm_handle2lock(&lockh); - if (lock) { - ldlm_policy_data_t policy = lock->l_policy_data; - LDLM_DEBUG(lock, "matching against this"); + req->rq_async_args.pointer_arg[0] = exp; + req->rq_async_args.pointer_arg[1] = minfo; + req->rq_async_args.pointer_arg[2] = einfo; + req->rq_interpret_reply = mdc_intent_getattr_async_interpret; + ptlrpcd_add_req(req, PSCOPE_OTHER); - LASSERTF(fid_res_name_eq(&mdt_body->fid1, - &lock->l_resource->lr_name), - "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n", - (unsigned long)lock->l_resource->lr_name.name[0], - (unsigned long)lock->l_resource->lr_name.name[1], - (unsigned long)lock->l_resource->lr_name.name[2], - (unsigned long)fid_seq(&mdt_body->fid1), - (unsigned long)fid_oid(&mdt_body->fid1), - (unsigned long)fid_ver(&mdt_body->fid1)); - LDLM_LOCK_PUT(lock); + RETURN(0); +} - memcpy(&old_lock, &lockh, sizeof(lockh)); - if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_IBITS, &policy, LCK_NL, &old_lock)) { - ldlm_lock_decref_and_cancel(&lockh, - it->d.lustre.it_lock_mode); - memcpy(&lockh, &old_lock, sizeof(old_lock)); - memcpy(&it->d.lustre.it_lock_handle, &lockh, - sizeof(lockh)); - } +int mdc_revalidate_lock(struct obd_export *exp, + struct lookup_intent *it, + struct lu_fid *fid) +{ + /* We could just return 1 immediately, but since we should only + * be called in revalidate_it if we already have a lock, let's + * verify that. */ + struct ldlm_res_id res_id; + struct lustre_handle lockh; + ldlm_policy_data_t policy; + ldlm_mode_t mode; + ENTRY; + + fid_build_reg_res_name(fid, &res_id); + /* As not all attributes are kept under update lock, e.g. + owner/group/acls are under lookup lock, we need both + ibits for GETATTR. */ + policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? + MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP : + MDS_INODELOCK_LOOKUP; + + mode = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, + &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0); + if (mode) { + it->d.lustre.it_lock_handle = lockh.cookie; + it->d.lustre.it_lock_mode = mode; } - CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", - op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), - it->d.lustre.it_status, it->d.lustre.it_disposition, rc); - RETURN(rc); + RETURN(!!mode); }