RETURN(req);
}
+static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
+ struct lookup_intent *it,
+ struct md_op_data *unused)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct ptlrpc_request *req;
+ struct ldlm_intent *lit;
+ struct layout_intent *layout;
+ int rc;
+ ENTRY;
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+ &RQF_LDLM_INTENT_LAYOUT);
+ if (req == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(ERR_PTR(rc));
+ }
+
+ /* pack the intent */
+ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+ lit->opc = (__u64)it->it_op;
+
+ /* pack the layout intent request */
+ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
+ /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
+ * set for replication */
+ layout->li_opc = LAYOUT_INTENT_ACCESS;
+
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ obd->u.cli.cl_max_mds_easize);
+ ptlrpc_request_set_replen(req);
+ RETURN(req);
+}
+
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
struct ldlm_request *lockreq;
struct ldlm_reply *lockrep;
struct lustre_intent_data *intent = &it->d.lustre;
+ struct ldlm_lock *lock;
+ void *lvb_data = NULL;
+ int lvb_len = 0;
ENTRY;
LASSERT(rc >= 0);
memset(lockh, 0, sizeof(*lockh));
rc = 0;
} else { /* rc = 0 */
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
/* If the server gave us back a different lock mode, we should
* fix up our variables. */
mdc_set_open_replay_data(NULL, NULL, req);
}
- /* TODO: make sure LAYOUT lock must be granted along with EA */
-
if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
void *eadata;
- mdc_update_max_ea_from_body(exp, body);
+ mdc_update_max_ea_from_body(exp, body);
/*
* The eadata is opaque; just check that it is there.
if (eadata == NULL)
RETURN(-EPROTO);
+ /* save lvb data and length in case this is for layout
+ * lock */
+ lvb_data = eadata;
+ lvb_len = body->eadatasize;
+
/*
* We save the reply LOV EA in case we have to replay a
* create for recovery. If we didn't allocate a large
RETURN(-EPROTO);
}
} else if (it->it_op & IT_LAYOUT) {
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ /* maybe the lock was granted right away and layout
+ * is packed into RMF_DLM_LVB of req */
+ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
+ if (lvb_len > 0) {
+ lvb_data = req_capsule_server_sized_get(pill,
+ &RMF_DLM_LVB, lvb_len);
+ if (lvb_data == NULL)
+ RETURN(-EPROTO);
+ }
+ }
- if (lock != NULL && lock->l_lvb_data == NULL) {
- int lvb_len;
+ /* fill in stripe data for layout lock */
+ lock = ldlm_handle2lock(lockh);
+ if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
+ void *lmm;
- /* maybe the lock was granted right away and layout
- * is packed into RMF_DLM_LVB of req */
- lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
- RCL_SERVER);
- if (lvb_len > 0) {
- void *lvb;
- void *lmm;
+ LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
+ ldlm_it2str(it->it_op), lvb_len);
- lvb = req_capsule_server_sized_get(pill,
- &RMF_DLM_LVB, lvb_len);
- if (lvb == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-EPROTO);
- }
-
- OBD_ALLOC_LARGE(lmm, lvb_len);
- if (lmm == NULL) {
- LDLM_LOCK_PUT(lock);
- RETURN(-ENOMEM);
- }
- memcpy(lmm, lvb, lvb_len);
-
- /* install lvb_data */
- lock_res_and_lock(lock);
- LASSERT(lock->l_lvb_data == NULL);
- lock->l_lvb_data = lmm;
- lock->l_lvb_len = lvb_len;
- unlock_res_and_lock(lock);
- }
- }
- if (lock != NULL)
+ OBD_ALLOC_LARGE(lmm, lvb_len);
+ if (lmm == NULL) {
LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
+ memcpy(lmm, lvb_data, lvb_len);
+
+ /* install lvb_data */
+ lock_res_and_lock(lock);
+ if (lock->l_lvb_data == NULL) {
+ lock->l_lvb_data = lmm;
+ lock->l_lvb_len = lvb_len;
+ lmm = NULL;
+ }
+ unlock_res_and_lock(lock);
+ if (lmm != NULL)
+ OBD_FREE_LARGE(lmm, lvb_len);
}
+ if (lock != NULL)
+ LDLM_LOCK_PUT(lock);
RETURN(rc);
}
if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
RETURN(-EOPNOTSUPP);
- req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize);
+ req = mdc_intent_layout_pack(exp, it, op_data);
lvb_type = LVB_T_LAYOUT;
} else {
LBUG();
}
}
- rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
-
- RETURN(rc);
+ rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+ if (rc < 0) {
+ if (lustre_handle_is_used(lockh)) {
+ ldlm_lock_decref(lockh, einfo->ei_mode);
+ memset(lockh, 0, sizeof(*lockh));
+ }
+ ptlrpc_req_finished(req);
+ }
+ RETURN(rc);
}
static int mdc_finish_intent_lock(struct obd_export *exp,
lockh.cookie = 0;
if (fid_is_sane(&op_data->op_fid2) &&
- (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
+ (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
/* We could just return 1 immediately, but since we should only
* be called in revalidate_it if we already have a lock, let's
* verify that. */