RETURN(result);
}
-/*Please move these function from mds to mdt*/
+/*Please move these functions from mds to mdt*/
int intent_disposition(struct ldlm_reply *rep, int flag)
{
if (!rep)
rep->lock_policy_res1 |= flag;
}
-static void fixup_handle_for_resent_req(struct mdt_thread_info *info,
- struct ptlrpc_request *req,
- int offset,
- struct ldlm_lock *new_lock,
- struct ldlm_lock **old_lock,
- struct mdt_lock_handle *lockh)
-{
- struct obd_export *exp = req->rq_export;
- struct mdt_device * mdt = info->mti_mdt;
- struct ldlm_request *dlmreq =
- lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
- struct lustre_handle remote_hdl = dlmreq->lock_handle1;
- struct list_head *iter;
-
- if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
- return;
-
- l_lock(&mdt->mdt_namespace->ns_lock);
- list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
- struct ldlm_lock *lock;
- lock = list_entry(iter, struct ldlm_lock, l_export_chain);
- if (lock == new_lock)
- continue;
- if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
- lockh->mlh_lh.cookie = lock->l_handle.h_cookie;
- LDLM_DEBUG(lock, "restoring lock cookie");
- DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
- lockh->mlh_lh.cookie);
- if (old_lock)
- *old_lock = LDLM_LOCK_GET(lock);
- l_unlock(&mdt->mdt_namespace->ns_lock);
- return;
- }
- }
- l_unlock(&mdt->mdt_namespace->ns_lock);
-
- /* If the xid matches, then we know this is a resent request,
- * and allow it. (It's probably an OPEN, for which we don't
- * send a lock */
- if (req->rq_xid ==
- le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
- return;
-
- /* This remote handle isn't enqueued, so we never received or
- * processed this request. Clear MSG_RESENT, because it can
- * be handled like any normal request now. */
-
- lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
-
- DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
- remote_hdl.cookie);
-}
static int mdt_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
struct ldlm_reply *rep;
struct mdt_lock_handle lockh = { {0} };
struct ldlm_lock *new_lock = NULL;
- int getattr_part = MDS_INODELOCK_UPDATE;
int offset = MDS_REQ_INTENT_REC_OFF;
int rc;
struct mdt_thread_info *info;
LASSERT(req != NULL);
- /* We already got it in mdt_handle. But we have to do it again*/
info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
LASSERT(info != NULL);
- mdt_thread_info_init(info);
if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
info->mti_rep_buf_size[0] = sizeof(struct ldlm_reply);
rc = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
LASSERT(rc == 0);
- mdt_thread_info_fini(info);
RETURN(0);
}
lustre_swab_ldlm_intent);
if (it == NULL) {
CERROR("Intent missing\n");
- mdt_thread_info_fini(info);
RETURN(req->rq_status = -EFAULT);
}
info->mti_rep_buf_size[1] = sizeof(struct mdt_body);
info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/
- if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
- (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))){
- /* we should never allow OBD_CONNECT_ACL if not configured */
- info->mti_rep_buf_size[info->mti_rep_buf_nr++] =
- LUSTRE_POSIX_ACL_MAX_SIZE;
- }
- else if (it->opc & IT_UNLINK){
- info->mti_rep_buf_size[info->mti_rep_buf_nr++] =
- sizeof(struct llog_cookie);
- /*FIXME:See mds*/
- }
rc = lustre_pack_reply(req, info->mti_rep_buf_nr,
info->mti_rep_buf_size, NULL);
if (rc){
- mdt_thread_info_fini(info);
RETURN(req->rq_status = rc);
}
/* execute policy */
switch ((long)it->opc) {
- case IT_OPEN:
- case IT_CREAT|IT_OPEN:
- fixup_handle_for_resent_req(info, req,
- MDS_REQ_INTENT_LOCKREQ_OFF,
- lock, NULL, &lockh);
- /* XXX swab here to assert that an mds_open reint
- * packet is following */
+ case IT_CREAT:
rep->lock_policy_res2 = mdt_reint_internal(info, req,
offset, &lockh);
-#if 0
- /* We abort the lock if the lookup was negative and
- * we did not make it to the OPEN portion */
- if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
- RETURN(ELDLM_LOCK_ABORTED);
- if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
- !intent_disposition(rep, DISP_OPEN_OPEN))
- RETURN(ELDLM_LOCK_ABORTED);
-#endif
- break;
- case IT_LOOKUP:
- getattr_part = MDS_INODELOCK_LOOKUP;
- case IT_GETATTR:
- getattr_part |= MDS_INODELOCK_LOOKUP;
- case IT_READDIR:
-#if 0
- fixup_handle_for_resent_req(info, req,
- MDS_REQ_INTENT_LOCKREQ_OFF,
- lock, &new_lock, &lockh);
-
- /* INODEBITS_INTEROP: if this lock was converted from a
- * plain lock (client does not support inodebits), then
- * child lock must be taken with both lookup and update
- * bits set for all operations.
- */
- if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
- getattr_part = MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_UPDATE;
-
- rep->lock_policy_res2 = mds_getattr_name(offset, req,
- getattr_part, &lockh);
- /* FIXME: LDLM can set req->rq_status. MDS sets
- policy_res{1,2} with disposition and status.
- - replay: returns 0 & req->status is old status
- - otherwise: returns req->status */
- if (intent_disposition(rep, DISP_LOOKUP_NEG))
- rep->lock_policy_res2 = 0;
- if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
- rep->lock_policy_res2)
- RETURN(ELDLM_LOCK_ABORTED);
- if (req->rq_status != 0) {
- LBUG();
- rep->lock_policy_res2 = req->rq_status;
- RETURN(ELDLM_LOCK_ABORTED);
- }
-#endif
- RETURN(ELDLM_LOCK_ABORTED);
break;
default:
- CERROR("Unhandled intent "LPD64"\n", it->opc);
- LBUG();
+ CERROR("Unhandled intent till now "LPD64"\n", it->opc);
+ RETURN(ELDLM_LOCK_ABORTED);
+ /*LBUG();*/
}
/* By this point, whatever function we called above must have either
if (new_lock == NULL)
new_lock = ldlm_handle2lock(&lockh.mlh_lh);
if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
- mdt_thread_info_fini(info);
- RETURN(0);
+ RETURN(ELDLM_OK);
}
LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
*lockp = new_lock;
- if (new_lock->l_export == req->rq_export) {
- /* Already gave this to the client, which means that we
- * reconstructed a reply. */
- LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
- MSG_RESENT);
- mdt_thread_info_fini(info);
- RETURN(ELDLM_LOCK_REPLACED);
- }
-
/* Fixup the lock to be given to the client */
l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
new_lock->l_readers = 0;
new_lock->l_blocking_ast = lock->l_blocking_ast;
new_lock->l_completion_ast = lock->l_completion_ast;
- memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
- sizeof(lock->l_remote_handle));
+ new_lock->l_remote_handle = lock->l_remote_handle;
new_lock->l_flags &= ~LDLM_FL_LOCAL;
LDLM_LOCK_PUT(new_lock);
l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
- mdt_thread_info_fini(info);
RETURN(ELDLM_LOCK_REPLACED);
}