* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
__u64 *bits)
{
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
+ struct inode *new_inode = data;
ENTRY;
if(bits)
LASSERT(lock != NULL);
lock_res_and_lock(lock);
#ifdef __KERNEL__
- if (lock->l_ast_data && lock->l_ast_data != data) {
- struct inode *new_inode = data;
- struct inode *old_inode = lock->l_ast_data;
- LASSERTF(old_inode->i_state & I_FREEING,
- "Found existing inode %p/%lu/%u state %lu in lock: "
- "setting data to %p/%lu/%u\n", old_inode,
- old_inode->i_ino, old_inode->i_generation,
- old_inode->i_state,
- new_inode, new_inode->i_ino, new_inode->i_generation);
- }
+ if (lock->l_resource->lr_lvb_inode &&
+ lock->l_resource->lr_lvb_inode != data) {
+ struct inode *old_inode = lock->l_resource->lr_lvb_inode;
+ LASSERTF(old_inode->i_state & I_FREEING,
+ "Found existing inode %p/%lu/%u state %lu in lock: "
+ "setting data to %p/%lu/%u\n", old_inode,
+ old_inode->i_ino, old_inode->i_generation,
+ old_inode->i_state,
+ new_inode, new_inode->i_ino, new_inode->i_generation);
+ }
#endif
- lock->l_ast_data = data;
+ lock->l_resource->lr_lvb_inode = new_inode;
if (bits)
*bits = lock->l_policy_data.l_inodebits.bits;
RETURN(rc);
}
-int mdc_change_cbdata(struct obd_export *exp,
- const struct lu_fid *fid,
- ldlm_iterator_t it, void *data)
+int mdc_null_inode(struct obd_export *exp,
+ const struct lu_fid *fid)
{
- struct ldlm_res_id res_id;
- ENTRY;
+ struct ldlm_res_id res_id;
+ struct ldlm_resource *res;
+ struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
+ ENTRY;
- fid_build_reg_res_name(fid, &res_id);
- ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
- &res_id, it, data);
+ LASSERTF(ns != NULL, "no namespace passed\n");
- EXIT;
- return 0;
+ fid_build_reg_res_name(fid, &res_id);
+
+ res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+ if(res == NULL)
+ RETURN(0);
+
+ lock_res(res);
+ res->lr_lvb_inode = NULL;
+ unlock_res(res);
+
+ ldlm_resource_putref(res);
+ RETURN(0);
}
/* find any ldlm lock of the inode in mdc
RETURN(req);
}
+static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct md_op_data *unused)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct ptlrpc_request *req;
+ struct ldlm_intent *lit;
+ struct layout_intent *layout;
+ int rc;
+ ENTRY;
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+ &RQF_LDLM_INTENT_LAYOUT);
+ if (req == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(ERR_PTR(rc));
+ }
+
+ /* pack the intent */
+ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+ lit->opc = (__u64)it->it_op;
+
+ /* pack the layout intent request */
+ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
+ /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
+ * set for replication */
+ layout->li_opc = LAYOUT_INTENT_ACCESS;
+
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ obd->u.cli.cl_max_mds_easize);
+ ptlrpc_request_set_replen(req);
+ RETURN(req);
+}
+
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
struct ldlm_request *lockreq;
struct ldlm_reply *lockrep;
struct lustre_intent_data *intent = &it->d.lustre;
+ struct ldlm_lock *lock;
+ void *lvb_data = NULL;
+ int lvb_len = 0;
ENTRY;
LASSERT(rc >= 0);
memset(lockh, 0, sizeof(*lockh));
rc = 0;
} else { /* rc = 0 */
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
/* If the server gave us back a different lock mode, we should
* fix up our variables. */
mdc_set_open_replay_data(NULL, NULL, req);
}
- /* TODO: make sure LAYOUT lock must be granted along with EA */
-
if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
void *eadata;
- mdc_update_max_ea_from_body(exp, body);
+ mdc_update_max_ea_from_body(exp, body);
/*
* The eadata is opaque; just check that it is there.
if (eadata == NULL)
RETURN(-EPROTO);
+ /* save lvb data and length in case this is for layout
+ * lock */
+ lvb_data = eadata;
+ lvb_len = body->eadatasize;
+
/*
* We save the reply LOV EA in case we have to replay a
* create for recovery. If we didn't allocate a large
RETURN(-EPROTO);
}
} else if (it->it_op & IT_LAYOUT) {
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ /* maybe the lock was granted right away and layout
+ * is packed into RMF_DLM_LVB of req */
+ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+ if (lvb_len > 0) {
+ lvb_data = req_capsule_server_sized_get(pill,
+ &RMF_DLM_LVB, lvb_len);
+ if (lvb_data == NULL)
+ RETURN(-EPROTO);
+ }
+ }
- if (lock != NULL && lock->l_lvb_data == NULL) {
- int lvb_len;
+ /* fill in stripe data for layout lock */
+ lock = ldlm_handle2lock(lockh);
+ if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
+ void *lmm;
- /* maybe the lock was granted right away and layout
- * is packed into RMF_DLM_LVB of req */
- lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
- RCL_SERVER);
- if (lvb_len > 0) {
- void *lvb;
- void *lmm;
+ LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
+ ldlm_it2str(it->it_op), lvb_len);
- lvb = req_capsule_server_sized_get(pill,
- &RMF_DLM_LVB, lvb_len);
- if (lvb == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-EPROTO);
- }
-
- OBD_ALLOC_LARGE(lmm, lvb_len);
- if (lmm == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-ENOMEM);
- }
- memcpy(lmm, lvb, lvb_len);
-
- /* install lvb_data */
- lock_res_and_lock(lock);
- LASSERT(lock->l_lvb_data == NULL);
- lock->l_lvb_data = lmm;
- lock->l_lvb_len = lvb_len;
- unlock_res_and_lock(lock);
- }
- }
- if (lock != NULL)
+ OBD_ALLOC_LARGE(lmm, lvb_len);
+ if (lmm == NULL) {
LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
+ memcpy(lmm, lvb_data, lvb_len);
+
+ /* install lvb_data */
+ lock_res_and_lock(lock);
+ if (lock->l_lvb_data == NULL) {
+ lock->l_lvb_data = lmm;
+ lock->l_lvb_len = lvb_len;
+ lmm = NULL;
+ }
+ unlock_res_and_lock(lock);
+ if (lmm != NULL)
+ OBD_FREE_LARGE(lmm, lvb_len);
}
+ if (lock != NULL)
+ LDLM_LOCK_PUT(lock);
RETURN(rc);
}
if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
RETURN(-EOPNOTSUPP);
- req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize);
+ req = mdc_intent_layout_pack(exp, it, op_data);
lvb_type = LVB_T_LAYOUT;
} else {
LBUG();
lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
LASSERT(lockrep != NULL);
+ lockrep->lock_policy_res2 =
+ ptlrpc_status_ntoh(lockrep->lock_policy_res2);
+
/* Retry the create infinitely when we get -EINPROGRESS from
* server. This is required by the new quota design. */
if (it && it->it_op & IT_CREAT &&
}
}
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
-
- RETURN(rc);
+ rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+ if (rc < 0) {
+ if (lustre_handle_is_used(lockh)) {
+ ldlm_lock_decref(lockh, einfo->ei_mode);
+ memset(lockh, 0, sizeof(*lockh));
+ }
+ ptlrpc_req_finished(req);
+ }
+ RETURN(rc);
}
static int mdc_finish_intent_lock(struct obd_export *exp,
struct mdt_body *mdt_body;
struct ldlm_lock *lock;
int rc;
-
+ ENTRY;
LASSERT(request != NULL);
LASSERT(request != LP_POISON);
lockh.cookie = 0;
if (fid_is_sane(&op_data->op_fid2) &&
- (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
+ (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
/* We could just return 1 immediately, but since we should only
* be called in revalidate_it if we already have a lock, let's
* verify that. */
struct lookup_intent *it;
struct lustre_handle *lockh;
struct obd_device *obddev;
+ struct ldlm_reply *lockrep;
__u64 flags = LDLM_FL_HAS_INTENT;
ENTRY;
GOTO(out, rc);
}
+ lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ LASSERT(lockrep != NULL);
+
+ lockrep->lock_policy_res2 =
+ ptlrpc_status_ntoh(lockrep->lock_policy_res2);
+
rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
if (rc)
GOTO(out, rc);