X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_locks.c;h=23802974e218b2dd397aecd12b607951182239e1;hb=5516349042147d5553e5861a6f61739771d5f90d;hp=c2937bbfdc3b6377ef0982044cff9bffc7ad05fc;hpb=113303973ec9f8484eb2355a1a6ef3c4c7fd6a56;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index c2937bb..2380297 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -229,48 +229,46 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, - int lock_type, - struct lookup_intent *it, - int lock_mode, - struct mdc_op_data *data, - struct lustre_handle *lockh, - void *lmm, - int lmmsize, - ldlm_completion_callback cb_completion, - ldlm_blocking_callback cb_blocking, - void *cb_data, int extra_lock_flags) +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct mdc_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + int extra_lock_flags) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id = - { .name = {data->fid1.id, data->fid1.generation} }; + { .name = {op_data->fid1.id, op_data->fid1.generation} }; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; struct ldlm_request *lockreq; struct ldlm_intent *lit; struct ldlm_reply *lockrep; int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*lockreq), - [DLM_INTENT_IT_OFF] = sizeof(*lit) }; + [DLM_INTENT_IT_OFF] = sizeof(*lit), + 0, 0, 0, 0 }; int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize }; + cl_max_mds_easize, 0 }; int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int repbufcnt = 4, rc; void *eadata; ENTRY; - LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type); + LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); // LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu", // ldlm_it2str(it->it_op), it_name, it_inode->i_ino); if (it->it_op & IT_OPEN) { + CFS_LIST_HEAD(cancels); + int count = 0; + int mode; + it->it_create_mode |= S_IFREG; size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; /* As an optimization, we allocate an RPC request buffer for * at least a default-sized LOV EA even if we aren't sending * one. We grow the whole request to the next power-of-two @@ -286,27 +284,40 @@ int mdc_enqueue(struct obd_export *exp, min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc, obddev->u.cli.cl_max_mds_easize); - if (it->it_flags & O_JOIN_FILE) { - __u64 head_size = *(__u32*)cb_data; - __u32 tsize = *(__u32*)lmm; + /* If inode is known, cancel conflicting OPEN locks. */ + if (op_data->fid2.id) { + if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + mode = LCK_CW; +#ifdef FMODE_EXEC + else if (it->it_flags & FMODE_EXEC) + mode = LCK_PR; +#endif + else + mode = LCK_CR; + count = mdc_resource_get_unused(exp, &op_data->fid2, + &cancels, mode, + MDS_INODELOCK_OPEN); + } + /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) + mode = LCK_EX; + else + mode = LCK_CR; + count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + mode, MDS_INODELOCK_UPDATE); + if (it->it_flags & O_JOIN_FILE) { /* join is like an unlink of the tail */ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join); - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 7, size, NULL); - /* when joining file, cb_data and lmm args together - * indicate the head file size*/ - mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, - (head_size << 32) | tsize); - cb_data = NULL; - lmm = NULL; + req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, + count); + mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data, + (*(__u64 *)op_data->data)); } else { - req = ptlrpc_prep_req(class_exp2cliimp(exp), - LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 6, size, NULL); + req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, + count); } if (!req) @@ -322,16 +333,16 @@ int mdc_enqueue(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode, - 0, it->it_flags, lmm, lmmsize); + mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data, + it->it_create_mode, 0, it->it_flags, + lmm, lmmsize); repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; } else if (it->it_op & IT_UNLINK) { size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 5, size, NULL); + req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -341,7 +352,7 @@ int mdc_enqueue(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data); + mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data); repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { @@ -349,13 +360,12 @@ int mdc_enqueue(struct obd_export *exp, OBD_MD_FLACL | OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA; size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body); - size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1; + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; if (it->it_op & IT_GETATTR) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 5, size, NULL); + req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -366,13 +376,12 @@ int mdc_enqueue(struct obd_export *exp, /* pack the intended request */ mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, - it->it_flags, data); + it->it_flags, op_data); repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; } else if (it->it_op == IT_READDIR) { policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (!req) RETURN(-ENOMEM); @@ -390,9 +399,8 @@ int mdc_enqueue(struct obd_export *exp, * rpcs in flight counter */ mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy, - lock_mode, &flags, cb_blocking, cb_completion, - NULL, cb_data, NULL, 0, NULL, lockh, 0); + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL, + 0, NULL, lockh, 0); mdc_exit_request(&obddev->u.cli); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); @@ -407,7 +415,7 @@ int mdc_enqueue(struct obd_export *exp, /* This can go when we're sure that this can never happen */ LASSERT(rc != -ENOENT); if (rc == ELDLM_LOCK_ABORTED) { - lock_mode = 0; + einfo->ei_mode = 0; memset(lockh, 0, sizeof(*lockh)); rc = 0; } else if (rc != 0) { @@ -422,10 +430,10 @@ int mdc_enqueue(struct obd_export *exp, /* If the server gave us back a different lock mode, we should * fix up our variables. */ - if (lock->l_req_mode != lock_mode) { + if (lock->l_req_mode != einfo->ei_mode) { ldlm_lock_addref(lockh, lock->l_req_mode); - ldlm_lock_decref(lockh, lock_mode); - lock_mode = lock->l_req_mode; + ldlm_lock_decref(lockh, einfo->ei_mode); + einfo->ei_mode = lock->l_req_mode; } LDLM_LOCK_PUT(lock); } @@ -437,7 +445,7 @@ int mdc_enqueue(struct obd_export *exp, it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1; it->d.lustre.it_status = (int)lockrep->lock_policy_res2; - it->d.lustre.it_lock_mode = lock_mode; + it->d.lustre.it_lock_mode = einfo->ei_mode; it->d.lustre.it_data = req; if (it->d.lustre.it_status < 0 && req->rq_replay) @@ -624,11 +632,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, * this and use the request from revalidate. In this case, revalidate * never dropped its reference, so the refcounts are all OK */ if (!it_disposition(it, DISP_ENQ_COMPLETE)) { + struct ldlm_enqueue_info einfo = + { LDLM_IBITS, it_to_lock_mode(it), cb_blocking, + ldlm_completion_ast, NULL, NULL }; - rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it), - op_data, &lockh, lmm, lmmsize, - ldlm_completion_ast, cb_blocking, NULL, - extra_lock_flags); + rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, + lmm, lmmsize, extra_lock_flags); if (rc < 0) RETURN(rc); memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); @@ -687,6 +696,8 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, !it_open_error(DISP_OPEN_OPEN, it)) { it_set_disposition(it, DISP_ENQ_OPEN_REF); ptlrpc_request_addref(request); /* balanced in ll_file_open */ + /* BUG 11546 - eviction in the middle of open rpc processing */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); } if (it->it_op & IT_CREAT) {