RETURN(result);
}
-static int mdt_getattr_name(struct mdt_thread_info *info)
+/* @ Huang Hua
+ * UPDATE lock should be taken against parent, and be release before exit;
+ * child_bits lock should be taken against child, and be returned back:
+ * (1)normal request will release child lock;
+ * (2)intent request will grant it to client.
+ */
+static int mdt_getattr_name_internal(struct mdt_thread_info *info,
+ int child_bits)
{
- struct md_object *next = mdt_object_child(info->mti_object);
+ struct mdt_object *parent = info->mti_object;
struct mdt_object *child;
+ struct md_object *next = mdt_object_child(info->mti_object);
struct mdt_body *body;
const char *name;
int result;
+ struct mdt_lock_handle *lhp;
+ struct mdt_lock_handle *lhc;
+ struct ldlm_reply *ldlm_rep;
+
+ ENTRY;
+
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ lhp->mlh_mode = LCK_CR;
+ lhc = &info->mti_lh[MDT_LH_CHILD];
+ lhc->mlh_mode = LCK_CR;
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
LASSERT(info->mti_object != NULL);
- ENTRY;
+ intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
if (name == NULL)
RETURN(-EFAULT);
+ /*step 1: lock parent */
+ result = mdt_object_lock(info->mti_mdt->mdt_namespace, parent,
+ lhp, MDS_INODELOCK_UPDATE);
+ if (result != 0)
+ RETURN(result);
+
+ /*step 2: lookup child's fid by name */
result = mdo_lookup(info->mti_ctxt, next, name, &body->fid1);
- if (result == 0) {
- child = mdt_object_find(info->mti_ctxt, info->mti_mdt,
- &body->fid1);
- if (!IS_ERR(child)) {
- result = mo_attr_get(info->mti_ctxt, next,
- &info->mti_attr);
- if (result == 0) {
- mdt_pack_attr2body(body, &info->mti_attr);
- body->valid |= OBD_MD_FLID;
- }
- mdt_object_put(info->mti_ctxt, child);
- } else
- result = PTR_ERR(child);
+ if (result != 0)
+ GOTO(out_parent, result);
+
+ /*step 3: find the child object by fid */
+ child = mdt_object_find(info->mti_ctxt, info->mti_mdt, &body->fid1);
+ if (IS_ERR(child))
+ GOTO(out_parent, result = PTR_ERR(child));
+
+ if (!lu_object_exists(info->mti_ctxt, &child->mot_obj.mo_lu)) {
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
+ GOTO(out_child, result = -ENOENT);
+ } else {
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
}
+
+ /*step 4: lock child: this lock is returned back to caller
+ * if successfully get attr.
+ */
+ result = mdt_object_lock(info->mti_mdt->mdt_namespace, child,
+ lhc, child_bits);
+ if (result != 0)
+ GOTO(out_child, result);
+
+ /* finally, we can get attr for child. */
+ result = mo_attr_get(info->mti_ctxt, next, &info->mti_attr);
+ if (result == 0) {
+ mdt_pack_attr2body(body, &info->mti_attr);
+ body->fid1 = *mdt_object_fid(child);
+ body->valid |= OBD_MD_FLID;
+ } else
+ mdt_object_unlock(info->mti_mdt->mdt_namespace, child, lhc);
+out_child:
+ mdt_object_put(info->mti_ctxt, child);
+out_parent:
+ mdt_object_unlock(info->mti_mdt->mdt_namespace, parent, lhp);
RETURN(result);
}
+/* see the above function */
+static int mdt_getattr_name(struct mdt_thread_info *info)
+{
+ struct mdt_lock_handle *lhc;
+ int rc;
+
+ ENTRY;
+ lhc = &info->mti_lh[MDT_LH_CHILD];
+ lhc->mlh_mode = LCK_CR;
+
+ rc = mdt_getattr_name_internal(info, MDS_INODELOCK_UPDATE);
+ if (rc == 0 && lustre_handle_is_used(&lhc->mlh_lh))
+ ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
+ RETURN(rc);
+}
+
static struct lu_device_operations mdt_lu_ops;
static int lu_device_is_mdt(struct lu_device *d)
};
static int mdt_intent_getattr(enum mdt_it_code opcode,
- struct mdt_thread_info *info);
+ struct mdt_thread_info *info,
+ struct ldlm_lock **,
+ int);
static int mdt_intent_reint(enum mdt_it_code opcode,
- struct mdt_thread_info *info);
+ struct mdt_thread_info *info,
+ struct ldlm_lock **,
+ int);
static struct mdt_it_flavor {
const struct req_format *it_fmt;
__u32 it_flags;
int (*it_act)(enum mdt_it_code ,
- struct mdt_thread_info *);
+ struct mdt_thread_info *,
+ struct ldlm_lock **,
+ int);
long it_reint;
} mdt_it_flavor[] = {
[MDT_IT_OPEN] = {
};
static int mdt_intent_getattr(enum mdt_it_code opcode,
- struct mdt_thread_info *info)
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ int flags)
{
- int getattr_part = MDS_INODELOCK_UPDATE;
+ int child_bits;
+ struct ldlm_lock *old_lock = *lockp;
+ struct ldlm_lock *new_lock = NULL;
+ struct mdt_lock_handle *lhc;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct ldlm_reply *ldlm_rep;
+
+ ENTRY;
switch (opcode) {
case MDT_IT_LOOKUP:
- getattr_part = MDS_INODELOCK_LOOKUP;
+ child_bits = MDS_INODELOCK_LOOKUP;
break;
case MDT_IT_GETATTR:
- getattr_part |= MDS_INODELOCK_LOOKUP;
+ child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
break;
default:
+ CERROR("Unhandled till now");
+ RETURN(-EINVAL);
break;
}
- /*FIXME do something on the above flag */
+
+ ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+ /* lock on child object is returned @info->mti_lh[MDT_LH_CHILD] */
+ ldlm_rep->lock_policy_res2 = mdt_getattr_name_internal(info, child_bits);
+
+ if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG))
+ ldlm_rep->lock_policy_res2 = 0;
+ if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
+ ldlm_rep->lock_policy_res2) {
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+
+ lhc = &info->mti_lh[MDT_LH_CHILD];
+ new_lock = ldlm_handle2lock(&lhc->mlh_lh);
+ if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
+ RETURN(0);
+
+ *lockp = new_lock;
+
+ /* Fixup the lock to be given to the client */
+ l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+ new_lock->l_readers = 0;
+ new_lock->l_writers = 0;
+
+ new_lock->l_export = class_export_get(req->rq_export);
+ list_add(&new_lock->l_export_chain,
+ &new_lock->l_export->exp_ldlm_data.led_held_locks);
+
+ new_lock->l_blocking_ast = old_lock->l_blocking_ast;
+ new_lock->l_completion_ast = old_lock->l_completion_ast;
- return mdt_getattr_name(info);
+ new_lock->l_remote_handle = old_lock->l_remote_handle;
+
+ new_lock->l_flags &= ~LDLM_FL_LOCAL;
+
+ LDLM_LOCK_PUT(old_lock);
+ l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+
+ RETURN(ELDLM_LOCK_REPLACED);
}
static int mdt_intent_reint(enum mdt_it_code opcode,
- struct mdt_thread_info *info)
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ int flags)
{
long opc;
- int rc;
-
+ struct ldlm_reply *rep;
+
static const struct req_format *intent_fmts[REINT_MAX] = {
[REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
[REINT_OPEN] = &RQF_LDLM_INTENT_OPEN
};
ENTRY;
+ rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ if (rep == NULL)
+ RETURN(-EFAULT);
opc = mdt_reint_opcode(info, intent_fmts);
- if (opc >= 0) {
- if (mdt_it_flavor[opcode].it_reint == opc)
- rc = mdt_reint_internal(info, opc);
- else {
- CERROR("Reint code %ld doesn't match intent: %d\n",
- opc, opcode);
- rc = -EPROTO;
- }
- } else
- rc = opc;
- RETURN(rc);
+ if (opc < 0)
+ RETURN(opc);
+
+ if (mdt_it_flavor[opcode].it_reint != opc) {
+ CERROR("Reint code %ld doesn't match intent: %d\n",
+ opc, opcode);
+ RETURN(-EPROTO);
+ }
+ rep->lock_policy_res2 = mdt_reint_internal(info, opc);
+ intent_set_disposition(rep, DISP_IT_EXECD);
+
+ RETURN(ELDLM_LOCK_ABORTED);
}
static int mdt_intent_code(long itcode)
return result;
}
-static int mdt_intent_opc(long itopc, struct mdt_thread_info *info)
+static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
+ struct ldlm_lock **lockp, int flags)
{
int opc;
int rc;
ENTRY;
opc = mdt_intent_code(itopc);
- if (opc >= 0) {
- pill = &info->mti_pill;
- flv = &mdt_it_flavor[opc];
-
- if (flv->it_fmt != NULL)
- req_capsule_extend(pill, flv->it_fmt);
-
- rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
- if (rc == 0) {
- struct ldlm_reply *rep;
-
- rep = req_capsule_server_get(pill, &RMF_DLM_REP);
- if (rep != NULL) {
- /* execute policy */
- /*XXX LASSERT( flv->it_act) */
- if (flv->it_act) {
- rc = flv->it_act(opc, info);
- rep->lock_policy_res2 = rc;
- intent_set_disposition(rep,
- DISP_IT_EXECD);
- rc = 0;
- } else
- rc = -EOPNOTSUPP;
- } else
- rc = -EFAULT;
- }
- } else
- rc = -EINVAL;
+ if (opc < 0)
+ RETURN(-EINVAL);
+
+ pill = &info->mti_pill;
+ flv = &mdt_it_flavor[opc];
+
+ if (flv->it_fmt != NULL)
+ req_capsule_extend(pill, flv->it_fmt);
+
+ rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
+ if (rc)
+ RETURN(rc);
+
+
+ /* execute policy */
+ /*XXX LASSERT( flv->it_act) */
+ if (flv->it_act) {
+ rc = flv->it_act(opc, info, lockp, flags);
+ } else
+ rc = -EOPNOTSUPP;
RETURN(rc);
}
LDLM_DEBUG(lock, "intent policy opc: %s",
ldlm_it2str(it->opc));
- rc = mdt_intent_opc(it->opc, info);
+ rc = mdt_intent_opc(it->opc, info, lockp, flags);
if (rc == 0)
rc = ELDLM_OK;
} else