Whamcloud - gitweb
LU-1876 ldlm: extend ldlm_valblock_ops{}
authorJinshan Xiong <jinshan.xiong@whamcloud.com>
Mon, 17 Sep 2012 16:41:20 +0000 (09:41 -0700)
committerOleg Drokin <green@whamcloud.com>
Wed, 26 Sep 2012 03:08:31 +0000 (23:08 -0400)
To extend the usage of RQF_DLM_LVB for quota, layout lock and nano
seconds, two extra callbacks will be added:
 - lvbo_size: determine the size of lvb data
 - lvbo_fill: fill in RPC buffer by lvb data

Also framework of mdt_lvb is added for future extension.

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I2b3022525f8fbef72549a25766aa73d94b8b997c
Reviewed-on: http://review.whamcloud.com/3960
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lvb.c [new file with mode: 0644]
lustre/obdfilter/filter_lvb.c
lustre/ofd/ofd_lvb.c

index 480bced..27c7fc2 100644 (file)
@@ -380,6 +380,10 @@ struct ldlm_valblock_ops {
                            struct ptlrpc_request *r,
                            int increase);
         int (*lvbo_free)(struct ldlm_resource *res);
+       /* Return size of lvb data appropriate RPC size can be reserved */
+       int (*lvbo_size)(struct ldlm_lock *lock);
+       /* Called to fill in lvb data to RPC buffer @buf */
+       int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int buflen);
 };
 
 typedef enum {
@@ -898,6 +902,37 @@ ldlm_lock_to_ns_at(struct ldlm_lock *lock)
         return &lock->l_resource->lr_ns_bucket->nsb_at_estimate;
 }
 
+static inline int ldlm_lvbo_init(struct ldlm_resource *res)
+{
+       struct ldlm_namespace *ns = ldlm_res_to_ns(res);
+
+       if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_init != NULL)
+               return ns->ns_lvbo->lvbo_init(res);
+
+       return 0;
+}
+
+static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
+{
+       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+
+       if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_size != NULL)
+               return ns->ns_lvbo->lvbo_size(lock);
+
+       return 0;
+}
+
+static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
+{
+       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+
+       if (ns->ns_lvbo != NULL) {
+               LASSERT(ns->ns_lvbo->lvbo_fill != NULL);
+               return ns->ns_lvbo->lvbo_fill(lock, buf, len);
+       }
+       return 0;
+}
+
 struct ldlm_ast_work {
         struct ldlm_lock      *w_lock;
         int                    w_blocking;
index 5b56761..c62a1a2 100644 (file)
@@ -899,6 +899,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         long                    total_enqueue_wait;
         int                     instant_cancel = 0;
         int                     rc = 0;
+       int                     lvb_len;
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -912,11 +913,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        /* server namespace, doesn't need lock */
-        if (lock->l_resource->lr_lvb_len) {
+       /* server namespace, doesn't need lock */
+       lvb_len = ldlm_lvbo_size(lock);
+       if (lvb_len > 0)
                  req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
-                                      lock->l_resource->lr_lvb_len);
-        }
+                                      lvb_len);
 
         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
         if (rc) {
@@ -936,13 +937,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         body->lock_handle[0] = lock->l_remote_handle;
         body->lock_flags = flags;
         ldlm_lock2desc(lock, &body->lock_desc);
-        if (lock->l_resource->lr_lvb_len) {
-                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+       if (lvb_len > 0) {
+               void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
-                lock_res(lock->l_resource);
-                memcpy(lvb, lock->l_resource->lr_lvb_data,
-                       lock->l_resource->lr_lvb_len);
-                unlock_res(lock->l_resource);
+               lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+                                  lvb_len, RCL_CLIENT);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -1034,7 +1034,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 
         /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
-                             lock->l_resource->lr_lvb_len);
+                             ldlm_lvbo_size(lock));
         ptlrpc_request_set_replen(req);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
@@ -1233,11 +1233,8 @@ existing_lock:
                 /* based on the assumption that lvb size never changes during
                  * resource life time otherwise it need resource->lr_lock's
                  * protection */
-                if (lock->l_resource->lr_lvb_len) {
-                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
-                                             RCL_SERVER,
-                                             lock->l_resource->lr_lvb_len);
-                }
+               req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                                    RCL_SERVER, ldlm_lvbo_size(lock));
 
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
@@ -1342,21 +1339,22 @@ existing_lock:
                            "(err=%d, rc=%d)", err, rc);
 
                 if (rc == 0) {
-                        if (lock->l_resource->lr_lvb_len > 0) {
-                                /* MDT path won't handle lr_lvb_data, so
-                                 * lock/unlock better be contained in the
-                                 * if block */
-                                void *lvb;
-
-                                lvb = req_capsule_server_get(&req->rq_pill,
-                                                             &RMF_DLM_LVB);
-                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
-                                         req, lock);
-                                lock_res(lock->l_resource);
-                                memcpy(lvb, lock->l_resource->lr_lvb_data,
-                                       lock->l_resource->lr_lvb_len);
-                                unlock_res(lock->l_resource);
-                        }
+                       int lvb_len = ldlm_lvbo_size(lock);
+
+                       if (lvb_len > 0) {
+                               void *buf;
+                               int buflen;
+
+                               buf = req_capsule_server_get(&req->rq_pill,
+                                                            &RMF_DLM_LVB);
+                               LASSERTF(buf != NULL, "req %p, lock %p\n",
+                                        req, lock);
+                               buflen = req_capsule_get_size(&req->rq_pill,
+                                               &RMF_DLM_LVB, RCL_SERVER);
+                               buflen = ldlm_lvbo_fill(lock, buf, buflen);
+                               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+                                                  buflen, RCL_SERVER);
+                       }
                 } else {
                         lock_res_and_lock(lock);
                         ldlm_resource_unlink_lock(lock);
index e88367c..a1be690 100644 (file)
@@ -1,5 +1,6 @@
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o
+mdt-objs += mdt_lvb.o
 
 @INCLUDE_RULES@
index 71dce5b..b0c86fa 100644 (file)
@@ -5108,6 +5108,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (m->mdt_namespace == NULL)
                 GOTO(err_fini_seq, rc = -ENOMEM);
 
+       m->mdt_namespace->ns_lvbp = m;
+       m->mdt_namespace->ns_lvbo = &mdt_lvbo;
+
         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
         /* set obd_namespace for compatibility with old code */
         obd->obd_namespace = m->mdt_namespace;
index f1d60b8..93a5e75 100644 (file)
@@ -754,6 +754,9 @@ static inline ldlm_mode_t mdt_mdl_mode2dlm_mode(mdl_mode_t mode)
         return mdt_dlm_lock_modes[mode];
 }
 
+/* mdt_lvb.c */
+extern struct ldlm_valblock_ops mdt_lvbo;
+
 static inline struct lu_name *mdt_name(const struct lu_env *env,
                                        char *name, int namelen)
 {
diff --git a/lustre/mdt/mdt_lvb.c b/lustre/mdt/mdt_lvb.c
new file mode 100644 (file)
index 0000000..a61d46a
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * lustre/mdt/mdt_lvb.c
+ *
+ * Author: Jinshan Xiong <jinshan.xiong@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+/* Called with res->lr_lvb_sem held */
+static int mdt_lvbo_init(struct ldlm_resource *res)
+{
+       return 0;
+}
+
+static int mdt_lvbo_size(struct ldlm_lock *lock)
+{
+       return 0;
+}
+
+static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
+{
+       return 0;
+}
+
+struct ldlm_valblock_ops mdt_lvbo = {
+       lvbo_init:      mdt_lvbo_init,
+       lvbo_size:      mdt_lvbo_size,
+       lvbo_fill:      mdt_lvbo_fill
+};
index a548983..b488387 100644 (file)
@@ -265,8 +265,29 @@ out:
         return rc;
 }
 
+static int filter_lvbo_size(struct ldlm_lock *unused)
+{
+       return sizeof(struct ost_lvb);
+}
+
+static int filter_lvbo_fill(struct ldlm_lock *lock,
+                           void *buf, int buflen)
+{
+       struct ldlm_resource *res = lock->l_resource;
+
+       lock_res(res);
+       LASSERTF(buflen >= res->lr_lvb_len,
+                "actual %d, want %d\n", buflen, res->lr_lvb_len);
+       memcpy(buf, res->lr_lvb_data, res->lr_lvb_len);
+       unlock_res(res);
+
+       return res->lr_lvb_len;
+}
+
 struct ldlm_valblock_ops filter_lvbo = {
         lvbo_init: filter_lvbo_init,
         lvbo_update: filter_lvbo_update,
-        lvbo_free: filter_lvbo_free
+        lvbo_free: filter_lvbo_free,
+       lvbo_size: filter_lvbo_size,
+       lvbo_fill: filter_lvbo_fill
 };
index 208198c..ae80fdb 100644 (file)
@@ -243,8 +243,28 @@ out_unlock:
        return rc;
 }
 
+static int ofd_lvbo_size(struct ldlm_lock *unused)
+{
+       return sizeof(struct ost_lvb);
+}
+
+static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
+{
+       struct ldlm_resource *res = lock->l_resource;
+
+       lock_res(res);
+       LASSERTF(buflen >= res->lr_lvb_len,
+                "actual %d, want %d\n", buflen, res->lr_lvb_len);
+       memcpy(buf, res->lr_lvb_data, res->lr_lvb_len);
+       unlock_res(res);
+
+       return res->lr_lvb_len;
+}
+
 struct ldlm_valblock_ops ofd_lvbo = {
        lvbo_init:      ofd_lvbo_init,
        lvbo_update:    ofd_lvbo_update,
        lvbo_free:      ofd_lvbo_free,
+       lvbo_size:      ofd_lvbo_size,
+       lvbo_fill:      ofd_lvbo_fill
 };