Whamcloud - gitweb
mdt prototype: 0. add handlers for dlm requests; 1. add fid_build_res_name() and...
authornikita <nikita>
Thu, 6 Apr 2006 13:47:08 +0000 (13:47 +0000)
committernikita <nikita>
Thu, 6 Apr 2006 13:47:08 +0000 (13:47 +0000)
lustre/include/linux/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h

index 7d03b63..9d632eb 100644 (file)
@@ -444,6 +444,11 @@ int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 /* ldlm_extent.c */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
 
+struct ldlm_callback_suite {
+        ldlm_completion_callback lcs_completion;
+        ldlm_blocking_callback   lcs_blocking;
+        ldlm_glimpse_callback    lcs_glimpse;
+};
 
 /* ldlm_lockd.c */
 int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
@@ -452,7 +457,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data);
 int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback,
                         ldlm_blocking_callback, ldlm_glimpse_callback);
+int ldlm_handle_enqueue0(struct ptlrpc_request *req,
+                         struct ldlm_request *dlm_req,
+                         struct ldlm_callback_suite *cbs);
 int ldlm_handle_convert(struct ptlrpc_request *req);
+int ldlm_handle_convert0(struct ptlrpc_request *req,
+                         struct ldlm_request *dlm_req);
 int ldlm_handle_cancel(struct ptlrpc_request *req);
 int ldlm_del_waiting_lock(struct ldlm_lock *lock);
 int ldlm_get_ref(void);
index 25e042c..a6da2a9 100644 (file)
@@ -671,14 +671,12 @@ find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
  * Main server-side entry point into LDLM. This is called by ptlrpc service
  * threads to carry out client lock enqueueing requests.
  */
-int ldlm_handle_enqueue(struct ptlrpc_request *req,
-                        ldlm_completion_callback completion_callback,
-                        ldlm_blocking_callback blocking_callback,
-                        ldlm_glimpse_callback glimpse_callback)
+int ldlm_handle_enqueue0(struct ptlrpc_request *req,
+                         struct ldlm_request *dlm_req,
+                         struct ldlm_callback_suite *cbs)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
-        struct ldlm_request *dlm_req;
         int rc = 0, size[2] = {sizeof(*dlm_rep)};
         __u32 flags;
         ldlm_error_t err = ELDLM_OK;
@@ -688,14 +686,6 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-        dlm_req = lustre_swab_reqbuf (req, MDS_REQ_INTENT_LOCKREQ_OFF,
-                                      sizeof (*dlm_req),
-                                      lustre_swab_ldlm_request);
-        if (dlm_req == NULL) {
-                CERROR ("Can't unpack dlm_req\n");
-                GOTO(out, rc = -EFAULT);
-        }
-
         flags = dlm_req->lock_flags;
 
         LASSERT(req->rq_export);
@@ -759,8 +749,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                 dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
                                 dlm_req->lock_desc.l_req_mode,
-                                blocking_callback, completion_callback,
-                                glimpse_callback, NULL, 0);
+                                cbs->lcs_blocking, cbs->lcs_completion,
+                                cbs->lcs_glimpse, NULL, 0);
         if (!lock)
                 GOTO(out, rc = -ENOMEM);
 
@@ -918,20 +908,38 @@ existing_lock:
         return rc;
 }
 
-int ldlm_handle_convert(struct ptlrpc_request *req)
+int ldlm_handle_enqueue(struct ptlrpc_request *req,
+                        ldlm_completion_callback completion_callback,
+                        ldlm_blocking_callback blocking_callback,
+                        ldlm_glimpse_callback glimpse_callback)
 {
+        int rc;
         struct ldlm_request *dlm_req;
-        struct ldlm_reply *dlm_rep;
-        struct ldlm_lock *lock;
-        int rc, size = sizeof(*dlm_rep);
-        ENTRY;
+        struct ldlm_callback_suite cbs = {
+                .lcs_completion = completion_callback,
+                .lcs_blocking   = blocking_callback,
+                .lcs_glimpse    = glimpse_callback
+        };
 
-        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
-                                      lustre_swab_ldlm_request);
+
+        dlm_req = lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                     sizeof *dlm_req, lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
+                rc = ldlm_handle_enqueue0(req, dlm_req, &cbs);
+        } else {
                 CERROR ("Can't unpack dlm_req\n");
-                RETURN (-EFAULT);
+                rc = -EFAULT;
         }
+        return rc;
+}
+
+int ldlm_handle_convert0(struct ptlrpc_request *req,
+                         struct ldlm_request *dlm_req)
+{
+        struct ldlm_reply *dlm_rep;
+        struct ldlm_lock *lock;
+        int rc, size = sizeof(*dlm_rep);
+        ENTRY;
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
@@ -977,6 +985,22 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+int ldlm_handle_convert(struct ptlrpc_request *req)
+{
+        int rc;
+        struct ldlm_request *dlm_req;
+
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof *dlm_req,
+                                     lustre_swab_ldlm_request);
+        if (dlm_req != NULL) {
+                rc = ldlm_handle_convert0(req, dlm_req);
+        } else {
+                CERROR ("Can't unpack dlm_req\n");
+                rc = -EFAULT;
+        }
+        return rc;
+}
+
 int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
@@ -1758,8 +1782,10 @@ EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_enqueue0);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_handle_convert0);
 EXPORT_SYMBOL(ldlm_del_waiting_lock);
 EXPORT_SYMBOL(ldlm_get_ref);
 EXPORT_SYMBOL(ldlm_put_ref);
index 349737f..670c7aa 100644 (file)
@@ -188,12 +188,80 @@ static int mdt_handle_quotactl(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
+/*
+ * DLM handlers.
+ */
+
+static struct ldlm_callback_suite cbs = {
+        .lcs_completion = ldlm_server_completion_ast,
+        .lcs_blocking   = ldlm_server_blocking_ast,
+        .lcs_glimpse    = NULL
+};
+
+static int mdt_enqueue(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
+{
+        /*
+         * info->mti_dlm_req already contains swapped and (if necessary)
+         * converted dlm request.
+         */
+        LASSERT(info->mti_dlm_req);
+
+        info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
+        return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
+}
+
+static int mdt_convert(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
+{
+        LASSERT(info->mti_dlm_req);
+        return ldlm_handle_convert0(req, info->mti_dlm_req);
+}
+
+static int mdt_bl_callback(struct mdt_thread_info *info,
+                           struct ptlrpc_request *req, int offset)
+{
+        CERROR("bl callbacks should not happen on MDS\n");
+        LBUG();
+        return -EOPNOTSUPP;
+}
+
+static int mdt_cp_callback(struct mdt_thread_info *info,
+                           struct ptlrpc_request *req, int offset)
+{
+        CERROR("cp callbacks should not happen on MDS\n");
+        LBUG();
+        return -EOPNOTSUPP;
+}
+
+/*
+ * Build (DLM) resource name from fid.
+ */
+struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
+                                       struct ldlm_res_id *name)
+{
+        memset(name, 0, sizeof *name);
+        /* we use fid_num() whoch includes also object version instread of raw
+         * fid_oid(). */
+        name->name[0] = fid_seq(f);
+        name->name[1] = fid_num(f);
+        return name;
+}
+
+/*
+ * Return true if resource is for object identified by fid.
+ */
+int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
+{
+        return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
+}
+
 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
              struct lustre_handle *lh, ldlm_mode_t mode,
              ldlm_policy_data_t *policy)
 {
-        struct ldlm_res_id res_id = { .name = {0} };
+        struct ldlm_res_id res_id;
         int flags = 0, rc;
         ENTRY;
 
@@ -201,16 +269,11 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
         LASSERT(lh != NULL);
         LASSERT(f != NULL);
 
-        /* we use fid_num() whoch includes also object version instread of raw
-         * fid_oid(). */
-        res_id.name[0] = fid_seq(f);
-        res_id.name[1] = fid_num(f);
-
         /* FIXME: is that correct to have @flags=0 here? */
-        rc = ldlm_cli_enqueue(NULL, NULL, ns, res_id, LDLM_IBITS, policy,
-                              mode, &flags, ldlm_blocking_ast,
-                              ldlm_completion_ast, NULL, NULL,
-                              NULL, 0, NULL, lh);
+        rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
+                              LDLM_IBITS, policy, mode, &flags,
+                              ldlm_blocking_ast, ldlm_completion_ast, NULL,
+                              NULL, NULL, 0, NULL, lh);
         RETURN (rc == ELDLM_OK ? 0 : -EIO);
 }
 
@@ -227,8 +290,7 @@ void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
                 LBUG();
         }
 
-        LASSERT(fid_seq(f) == lock->l_resource->lr_name.name[0] &&
-                fid_num(f) == lock->l_resource->lr_name.name[1]);
+        LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
 
         ldlm_lock_decref(lh, mode);
         EXIT;
@@ -325,7 +387,12 @@ enum mdt_handler_flags {
         /*
          * struct mdt_body is passed in the 0-th incoming buffer.
          */
-        HABEO_CORPUS = (1 << 0)
+        HABEO_CORPUS = (1 << 0),
+        /*
+         * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
+         * incoming buffer.
+         */
+        HABEO_CLAVIS   = (1 << 1)
 };
 
 struct mdt_opc_slice {
@@ -360,6 +427,19 @@ static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
 }
 
+static int mdt_lock_resname_compat(struct mdt_device *m,
+                                   struct ldlm_request *req)
+{
+        /* XXX something... later. */
+        return 0;
+}
+
+static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
+{
+        /* XXX something... later. */
+        return 0;
+}
+
 /*
  * Invoke handler for this request opc. Also do necessary preprocessing
  * (according to handler ->mh_flags), and post-processing (setting of
@@ -371,6 +451,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 {
         int result;
         int off;
+        int lock_conv;
 
         ENTRY;
 
@@ -384,19 +465,40 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
 
         off = MDS_REQ_REC_OFF + shift;
+        lock_conv =
+                h->mh_flags & HABEO_CLAVIS &&
+                info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
+
         result = 0;
         if (h->mh_flags & HABEO_CORPUS) {
-                info->mti_body = lustre_swab_reqbuf(req, off,
-                                                    sizeof *info->mti_body,
-                                                    lustre_swab_mdt_body);
-                if (info->mti_body == NULL) {
+                struct mdt_body *body;
+
+                body = info->mti_body =
+                        lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
+                                           lustre_swab_mdt_body);
+                if (body != NULL) {
+                        info->mti_object = mdt_object_find(info->mti_mdt,
+                                                           &body->fid1);
+                        if (IS_ERR(info->mti_object))
+                                result = PTR_ERR(info->mti_object);
+                } else {
                         CERROR("Can't unpack body\n");
-                        result = req->rq_status = -EFAULT;
+                        result = -EFAULT;
+                }
+        } else if (lock_conv) {
+                struct ldlm_request *dlm;
+
+                LASSERT(shift == 0);
+                dlm = info->mti_dlm_req =
+                        lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                           sizeof *dlm,
+                                           lustre_swab_ldlm_request);
+                if (dlm != NULL)
+                        result = mdt_lock_resname_compat(info->mti_mdt, dlm);
+                else {
+                        CERROR("Can't unpack dlm request\n");
+                        result = -EFAULT;
                 }
-                info->mti_object = mdt_object_find(info->mti_mdt,
-                                                   &info->mti_body->fid1);
-                if (IS_ERR(info->mti_object))
-                        result = PTR_ERR(info->mti_object);
         }
         if (result == 0)
                 /*
@@ -414,6 +516,14 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         LASSERT(current->journal_info == NULL);
 
+        if (lock_conv) {
+                struct ldlm_reply *rep;
+
+                rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
+                if (rep != NULL)
+                        result = mdt_lock_reply_compat(info->mti_mdt, rep);
+        }
+
         /* If we're DISCONNECTing, the mds_export_data is already freed */
         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
@@ -559,8 +669,8 @@ static int mdt_filter_recovery_request(struct ptlrpc_request *req,
 
 /*
  * Handle recovery. Return:
- *       +ve: continue request processing;
- *       -ve: abort immediately with given (negated) error code;
+ *        +1: continue request processing;
+ *       -ve: abort immediately with the given error code;
  *         0: send reply with error code in req->rq_status;
  */
 static int mdt_recovery(struct ptlrpc_request *req)
@@ -642,8 +752,9 @@ static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
 
 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
 {
-        struct lustre_msg *msg;
-        int                result;
+        struct mdt_handler *h;
+        struct lustre_msg  *msg;
+        int                 result;
 
         ENTRY;
 
@@ -653,27 +764,25 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
 
         msg = req->rq_reqmsg;
         result = mds_msg_check_version(msg);
-        if (result != 0) {
-                CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
-                RETURN(result);
-        }
-
-        result = mdt_recovery(req);
-        if (result > 0) {
-                struct mdt_handler *h;
-
-                h = mdt_handler_find(msg->opc);
-                if (h != NULL)
-                        result = mdt_req_handle(info, h, req, 0);
-                else {
-                        req->rq_status = -ENOTSUPP;
-                        result = ptlrpc_error(req);
-                        RETURN(result);
+        if (result == 0) {
+                result = mdt_recovery(req);
+                switch (result) {
+                case +1:
+                        h = mdt_handler_find(msg->opc);
+                        if (h != NULL)
+                                result = mdt_req_handle(info, h, req, 0);
+                        else {
+                                req->rq_status = -ENOTSUPP;
+                                result = ptlrpc_error(req);
+                                break;
+                        }
+                        /* fall through */
+                case 0:
+                        result = mdt_reply(req, info);
                 }
-        } else if (result < 0)
-                RETURN(result);
-
-        RETURN(mdt_reply(req, info));
+        } else
+                CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
+        RETURN(result);
 }
 
 static struct mdt_device *mdt_dev(struct lu_device *d)
@@ -1009,16 +1118,17 @@ static void __exit mdt_mod_exit(void)
 }
 
 
-#define DEF_HNDL(prefix, base, flags, opc, fn)                        \
-[prefix ## _ ## opc - prefix ## _ ## base] = {                        \
-        .mh_name    = #opc,                                        \
-        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## _NET,        \
-        .mh_opc     = prefix ## _  ## opc,                        \
-        .mh_flags   = flags,                                        \
-        .mh_act     = fn                                        \
+#define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
+[prefix ## _ ## opc - prefix ## _ ## base] = {                          \
+        .mh_name    = #opc,                                             \
+        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
+        .mh_opc     = prefix ## _  ## opc,                              \
+        .mh_flags   = flags,                                            \
+        .mh_act     = fn                                                \
 }
 
-#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
+#define DEF_MDT_HNDL(flags, name, fn)                   \
+        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
 
 static struct mdt_handler mdt_mds_ops[] = {
         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
@@ -1037,13 +1147,20 @@ static struct mdt_handler mdt_mds_ops[] = {
         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
         DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
-        DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl),
+        DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
 };
 
 static struct mdt_handler mdt_obd_ops[] = {
 };
 
+#define DEF_DLM_HNDL(flags, name, fn)                   \
+        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
+
 static struct mdt_handler mdt_dlm_ops[] = {
+        DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
+        DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
+        DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
+        DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
 };
 
 static struct mdt_handler mdt_llog_ops[] = {
index d57e315..cd4ae32 100644 (file)
@@ -84,6 +84,19 @@ struct mdt_device {
         struct ptlrpc_client       mdt_ldlm_client;
         /* underlying device */
         struct md_device          *mdt_child;
+        /*
+         * Device flags, taken from enum mdt_flags. No locking (so far) is
+         * necessary.
+         */
+        unsigned long              mdt_flags;
+};
+
+enum mdt_flags {
+        /*
+         * This mdt works with legacy clients with different resource name
+         * encoding (pre-fid, etc.).
+         */
+        MDT_CL_COMPAT_RESNAME = 1 << 0,
 };
 
 struct md_object {
@@ -161,6 +174,10 @@ struct mdt_thread_info {
          */
         struct mdt_body       *mti_body;
         /*
+         * Lock request for "habeo clavis" operations.
+         */
+        struct ldlm_request   *mti_dlm_req;
+        /*
          * Host object. This is released at the end of mdt_handler().
          */
         struct mdt_object     *mti_object;