/* lu2dt_dev() */
#include <linux/dt_object.h>
+/*LUSTRE_POSIX_ACL_MAX_SIZE*/
+#include <linux/lustre_acl.h>
+
/* struct mds_client_data */
#include "../mds/mds_internal.h"
return -EOPNOTSUPP;
}
+int mdt_reint_internal(struct mdt_thread_info *info,
+ struct ptlrpc_request *req,
+ int offset,
+ struct lustre_handle *lockh)
+{
+ struct mdt_reint_record *rec; /* 116 bytes on the stack? no sir! */
+ int rc;
+
+ OBD_ALLOC(rec, sizeof(*rec));
+ if (rec == NULL)
+ RETURN(-ENOMEM);
+
+ rc = mdt_reint_unpack(info, req, offset, rec);
+ if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
+ CERROR("invalid record\n");
+ GOTO(out, rc = -EINVAL);
+ }
+
+ rc = mdt_reint_rec(info, rec, offset, req, lockh);
+out:
+ OBD_FREE(rec, sizeof(*rec));
+ RETURN(rc);
+}
+
static int mdt_reint(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
int size[] = { sizeof(struct mdt_body), sizeof(struct lov_mds_md), /*FIXME*/
sizeof(struct llog_cookie)};
int bufcount,rc;
- struct mdt_reint_record *rec; /* 116 bytes on the stack? no sir! */
ENTRY;
rc = lustre_pack_reply(req, bufcount, size, NULL);
if (rc)
RETURN (rc);
-
- OBD_ALLOC(rec, sizeof(*rec));
- if (rec == NULL)
- RETURN(-ENOMEM);
-
- rc = mdt_reint_unpack(info, req, offset, rec);
- if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
- CERROR("invalid record\n");
- GOTO(out, req->rq_status = -EINVAL);
- }
-
- rc = mdt_reint_rec(info, rec, offset, req, NULL);
-out:
- OBD_FREE(rec, sizeof(*rec));
+ rc = mdt_reint_internal(info, req, offset, NULL);
RETURN(rc);
}
RETURN(result);
}
+/*Please move these function from mds to mdt*/
+int intent_disposition(struct ldlm_reply *rep, int flag)
+{
+ if (!rep)
+ return 0;
+ return (rep->lock_policy_res1 & flag);
+}
+
+void intent_set_disposition(struct ldlm_reply *rep, int flag)
+{
+ if (!rep)
+ return;
+ rep->lock_policy_res1 |= flag;
+}
+
+static void fixup_handle_for_resent_req(struct mdt_thread_info *info,
+ struct ptlrpc_request *req,
+ int offset,
+ struct ldlm_lock *new_lock,
+ struct ldlm_lock **old_lock,
+ struct lustre_handle *lockh)
+{
+ struct obd_export *exp = req->rq_export;
+ struct mdt_device * mdt = info->mti_mdt;
+ struct ldlm_request *dlmreq =
+ lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
+ struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+ struct list_head *iter;
+
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+ return;
+
+ l_lock(&mdt->mdt_namespace->ns_lock);
+ list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
+ struct ldlm_lock *lock;
+ lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+ if (lock == new_lock)
+ continue;
+ if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+ lockh->cookie = lock->l_handle.h_cookie;
+ LDLM_DEBUG(lock, "restoring lock cookie");
+ DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+ lockh->cookie);
+ if (old_lock)
+ *old_lock = LDLM_LOCK_GET(lock);
+ l_unlock(&mdt->mdt_namespace->ns_lock);
+ return;
+ }
+ }
+ l_unlock(&mdt->mdt_namespace->ns_lock);
+
+ /* If the xid matches, then we know this is a resent request,
+ * and allow it. (It's probably an OPEN, for which we don't
+ * send a lock */
+ if (req->rq_xid ==
+ le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
+ return;
+
+ /* This remote handle isn't enqueued, so we never received or
+ * processed this request. Clear MSG_RESENT, because it can
+ * be handled like any normal request now. */
+
+ lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+
+ DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+ remote_hdl.cookie);
+}
+
static int mdt_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
ldlm_mode_t mode, int flags, void *data)
{
+ struct ptlrpc_request *req = req_cookie;
+ struct ldlm_lock *lock = *lockp;
+ struct ldlm_intent *it;
+ struct ldlm_reply *rep;
+ struct lustre_handle lockh = { 0 };
+ struct ldlm_lock *new_lock = NULL;
+ int getattr_part = MDS_INODELOCK_UPDATE;
+ int repsize[4] = {sizeof(*rep),
+ sizeof(struct mdt_body),
+ sizeof(struct lov_mds_md)};/*FIXME:See mds*/
+ int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF;
+ int rc;
+ struct mdt_thread_info *info = NULL;
+ /*FIXME:How to get this pointer?
+ from the request? or passed in by @data*/
+
+
ENTRY;
- RETURN(ELDLM_LOCK_ABORTED);
+
+ LASSERT(req != NULL);
+
+ if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
+ /* No intent was provided */
+ int size = sizeof(struct ldlm_reply);
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ LASSERT(rc == 0);
+ RETURN(0);
+ }
+
+ it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
+ lustre_swab_ldlm_intent);
+ if (it == NULL) {
+ CERROR("Intent missing\n");
+ RETURN(req->rq_status = -EFAULT);
+ }
+
+ LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
+
+ if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
+ (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
+ /* we should never allow OBD_CONNECT_ACL if not configured */
+ repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
+ else if (it->opc & IT_UNLINK)
+ repsize[repbufcnt++] = sizeof(struct llog_cookie); /*FIXME:See mds*/
+
+ rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
+ if (rc)
+ RETURN(req->rq_status = rc);
+
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+ intent_set_disposition(rep, DISP_IT_EXECD);
+
+
+ /* execute policy */
+ switch ((long)it->opc) {
+ case IT_OPEN:
+ case IT_CREAT|IT_OPEN:
+ fixup_handle_for_resent_req(info,req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ lock, NULL, &lockh);
+ /* XXX swab here to assert that an mds_open reint
+ * packet is following */
+ rep->lock_policy_res2 = mdt_reint_internal(info, req,
+ offset, &lockh);
+#if 0
+ /* We abort the lock if the lookup was negative and
+ * we did not make it to the OPEN portion */
+ if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
+ RETURN(ELDLM_LOCK_ABORTED);
+ if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
+ !intent_disposition(rep, DISP_OPEN_OPEN))
+ RETURN(ELDLM_LOCK_ABORTED);
+#endif
+ break;
+ case IT_LOOKUP:
+ getattr_part = MDS_INODELOCK_LOOKUP;
+ case IT_GETATTR:
+ getattr_part |= MDS_INODELOCK_LOOKUP;
+ case IT_READDIR:
+#if 0
+ fixup_handle_for_resent_req(info, req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ lock, &new_lock, &lockh);
+
+ /* INODEBITS_INTEROP: if this lock was converted from a
+ * plain lock (client does not support inodebits), then
+ * child lock must be taken with both lookup and update
+ * bits set for all operations.
+ */
+ if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
+ getattr_part = MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE;
+
+ rep->lock_policy_res2 = mds_getattr_name(offset, req,
+ getattr_part, &lockh);
+ /* FIXME: LDLM can set req->rq_status. MDS sets
+ policy_res{1,2} with disposition and status.
+ - replay: returns 0 & req->status is old status
+ - otherwise: returns req->status */
+ if (intent_disposition(rep, DISP_LOOKUP_NEG))
+ rep->lock_policy_res2 = 0;
+ if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
+ rep->lock_policy_res2)
+ RETURN(ELDLM_LOCK_ABORTED);
+ if (req->rq_status != 0) {
+ LBUG();
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+#endif
+ RETURN(ELDLM_LOCK_ABORTED);
+ break;
+ default:
+ CERROR("Unhandled intent "LPD64"\n", it->opc);
+ LBUG();
+ }
+
+ /* By this point, whatever function we called above must have either
+ * filled in 'lockh', been an intent replay, or returned an error. We
+ * want to allow replayed RPCs to not get a lock, since we would just
+ * drop it below anyways because lock replay is done separately by the
+ * client afterwards. For regular RPCs we want to give the new lock to
+ * the client instead of whatever lock it was about to get. */
+ if (new_lock == NULL)
+ new_lock = ldlm_handle2lock(&lockh);
+ if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
+ RETURN(0);
+
+ LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
+ it->opc, lockh.cookie);
+
+ /* If we've already given this lock to a client once, then we should
+ * have no readers or writers. Otherwise, we should have one reader
+ * _or_ writer ref (which will be zeroed below) before returning the
+ * lock to a client. */
+ if (new_lock->l_export == req->rq_export) {
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+ } else {
+ LASSERT(new_lock->l_export == NULL);
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+ }
+
+ *lockp = new_lock;
+
+ if (new_lock->l_export == req->rq_export) {
+ /* Already gave this to the client, which means that we
+ * reconstructed a reply. */
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+ MSG_RESENT);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
+
+ /* Fixup the lock to be given to the client */
+ l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+ new_lock->l_readers = 0;
+ new_lock->l_writers = 0;
+
+ new_lock->l_export = class_export_get(req->rq_export);
+ list_add(&new_lock->l_export_chain,
+ &new_lock->l_export->exp_ldlm_data.led_held_locks);
+
+ new_lock->l_blocking_ast = lock->l_blocking_ast;
+ new_lock->l_completion_ast = lock->l_completion_ast;
+
+ memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
+ sizeof(lock->l_remote_handle));
+
+ new_lock->l_flags &= ~LDLM_FL_LOCAL;
+
+ LDLM_LOCK_PUT(new_lock);
+ l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+
+ RETURN(ELDLM_LOCK_REPLACED);
}
static int mdt_config(struct lu_context *ctx, struct mdt_device *m,