Whamcloud - gitweb
LU-5042 ldlm: delay filling resource's LVB upon replay 45/10845/23
authorBruno Faccini <bruno.faccini@intel.com>
Thu, 26 Jun 2014 09:03:52 +0000 (11:03 +0200)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 25 Aug 2014 04:22:54 +0000 (04:22 +0000)
This patch attempts to delay the unnecessary filling and resending of a
resource's LVB upon lock replay after a server reboot.
This should prevent recovery from taking a very long time when
replaying a huge number of locks, which was caused by all the associated
LVBs being read from disk. Now a resource's LVB is only read when it
needs to be sent to a new client.

Signed-off-by: Bruno Faccini <bruno.faccini@intel.com>
Change-Id: I20bd20bce328953c46accb4b41dcba776f3608a6
Reviewed-on: http://review.whamcloud.com/10845
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/ofd/ofd_io.c

index 2aa2e7d..7aab315 100644 (file)
@@ -286,6 +286,9 @@ typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock);
  * Currently LVBs are used by:
  *  - OSC-OST code to maintain current object size/times
  *  - layout lock code to return the layout when the layout lock is granted
+ *
+ * To ensure delayed LVB initialization, it is highly recommended to use the set
+ * of ldlm_[res_]lvbo_[init,update,fill]() functions.
  */
 struct ldlm_valblock_ops {
         int (*lvbo_init)(struct ldlm_resource *res);
@@ -956,6 +959,8 @@ struct ldlm_resource {
         */
        struct mutex            lr_lvb_mutex;
        int                     lr_lvb_len;
+       /** is lvb initialized ? */
+       bool                    lr_lvb_initialized;
        /** protected by lr_lock */
        void                    *lr_lvb_data;
 
@@ -1006,11 +1011,32 @@ ldlm_lock_to_ns_at(struct ldlm_lock *lock)
 static inline int ldlm_lvbo_init(struct ldlm_resource *res)
 {
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
+       int rc = 0;
 
-       if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_init != NULL)
-               return ns->ns_lvbo->lvbo_init(res);
+       if (ns->ns_lvbo == NULL || ns->ns_lvbo->lvbo_init == NULL ||
+           res->lr_lvb_initialized)
+               return 0;
 
-       return 0;
+       mutex_lock(&res->lr_lvb_mutex);
+       /* Did we lose the race? */
+       if (res->lr_lvb_initialized) {
+               mutex_unlock(&res->lr_lvb_mutex);
+               return 0;
+       }
+       rc = ns->ns_lvbo->lvbo_init(res);
+       if (rc < 0) {
+               CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n",
+                      rc);
+               if (res->lr_lvb_data != NULL) {
+                       OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+                       res->lr_lvb_data = NULL;
+               }
+               res->lr_lvb_len = rc;
+       } else {
+               res->lr_lvb_initialized = true;
+       }
+       mutex_unlock(&res->lr_lvb_mutex);
+       return rc;
 }
 
 static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
@@ -1026,9 +1052,17 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
 static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
 {
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+       int rc;
 
        if (ns->ns_lvbo != NULL) {
                LASSERT(ns->ns_lvbo->lvbo_fill != NULL);
+               /* init lvb now if not already */
+               rc = ldlm_lvbo_init(lock->l_resource);
+               if (rc < 0) {
+                       CERROR("lock %p: delayed lvb init failed (rc %d)",
+                              lock, rc);
+                       return rc;
+               }
                return ns->ns_lvbo->lvbo_fill(lock, buf, len);
        }
        return 0;
@@ -1245,6 +1279,15 @@ ldlm_handle2lock_long(const struct lustre_handle *h, __u64 flags)
 static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
                                        struct ptlrpc_request *r, int increase)
 {
+       int rc;
+
+       /* delayed lvb init may be required */
+       rc = ldlm_lvbo_init(res);
+       if (rc < 0) {
+               CERROR("delayed lvb init failed (rc %d)\n", rc);
+               return rc;
+       }
+
         if (ldlm_res_to_ns(res)->ns_lvbo &&
             ldlm_res_to_ns(res)->ns_lvbo->lvbo_update) {
                 return ldlm_res_to_ns(res)->ns_lvbo->lvbo_update(res, r,
index f56c776..fb7fc3b 100644 (file)
@@ -1173,6 +1173,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         struct ldlm_lock *lock = NULL;
         void *cookie = NULL;
         int rc = 0;
+       struct ldlm_resource *res = NULL;
         ENTRY;
 
         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
@@ -1268,6 +1269,21 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         lock->l_remote_handle = dlm_req->lock_handle[0];
         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
 
+       /* Initialize resource lvb but not for a lock being replayed since
+        * Client already got lvb sent in this case.
+        * This must occur early since some policy methods assume resource
+        * lvb is available (lr_lvb_data != NULL).
+        */
+       res = lock->l_resource;
+       if (!(flags & LDLM_FL_REPLAY)) {
+               /* non-replayed lock, delayed lvb init may need to be done */
+               rc = ldlm_lvbo_init(res);
+               if (rc < 0) {
+                       LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", rc);
+                       GOTO(out, rc);
+               }
+       }
+
         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
         /* Don't enqueue a lock onto the export if it is been disonnected
          * due to eviction (bug 3822) or server umount (bug 24324).
@@ -1416,7 +1432,9 @@ existing_lock:
                                         req, lock);
                                buflen = req_capsule_get_size(&req->rq_pill,
                                                &RMF_DLM_LVB, RCL_SERVER);
-                               if (buflen > 0) {
+                               /* non-replayed lock, delayed lvb init may
+                                * need to occur now */
+                               if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
                                        buflen = ldlm_lvbo_fill(lock, buf,
                                                                buflen);
                                        if (buflen >= 0)
@@ -1426,11 +1444,22 @@ existing_lock:
                                                        buflen, RCL_SERVER);
                                        else
                                                rc = buflen;
+                               } else if (flags & LDLM_FL_REPLAY) {
+                                       /* no LVB resend upon replay */
+                                       if (buflen > 0)
+                                               req_capsule_shrink(
+                                                       &req->rq_pill,
+                                                       &RMF_DLM_LVB,
+                                                       0, RCL_SERVER);
+                                       else
+                                               rc = buflen;
                                } else {
                                        rc = buflen;
                                }
                        }
-                } else {
+               }
+
+               if (rc != 0) {
                         lock_res_and_lock(lock);
                         ldlm_resource_unlink_lock(lock);
                         ldlm_lock_destroy_nolock(lock);
index 5e9d83e..19a3595 100644 (file)
@@ -445,6 +445,12 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
        if (IS_ERR(lock))
                GOTO(out_nolock, err = PTR_ERR(lock));
 
+       err = ldlm_lvbo_init(lock->l_resource);
+       if (err < 0) {
+               LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
+               GOTO(out, err);
+       }
+
         ldlm_lock2handle(lock, lockh);
 
         /* NB: we don't have any lock now (lock_res_and_lock)
index 5f6a645..85584ab 100644 (file)
@@ -1054,10 +1054,10 @@ static struct ldlm_resource *ldlm_resource_new(void)
        spin_lock_init(&res->lr_lock);
        lu_ref_init(&res->lr_reference);
 
-       /* The creator of the resource must unlock the mutex after LVB
-        * initialization. */
+       /* Since LVB init can be delayed now, there is no longer any need to
+        * immediately acquire the mutex here. */
        mutex_init(&res->lr_lvb_mutex);
-       mutex_lock(&res->lr_lvb_mutex);
+       res->lr_lvb_initialized = false;
 
        return res;
 }
@@ -1077,7 +1077,6 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
        cfs_hash_bd_t           bd;
        __u64                   version;
        int                     ns_refcount = 0;
-       int                     rc;
 
         LASSERT(ns != NULL);
         LASSERT(parent == NULL);
@@ -1088,7 +1087,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
         hnode = cfs_hash_bd_lookup_locked(ns->ns_rs_hash, &bd, (void *)name);
         if (hnode != NULL) {
                 cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 0);
-               GOTO(lvbo_init, res);
+               GOTO(found, res);
        }
 
        version = cfs_hash_bd_version_get(&bd);
@@ -1117,22 +1116,9 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
                cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
                /* Clean lu_ref for failed resource. */
                lu_ref_fini(&res->lr_reference);
-               /* We have taken lr_lvb_mutex. Drop it. */
-               mutex_unlock(&res->lr_lvb_mutex);
                OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
-lvbo_init:
+found:
                res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
-               /* Synchronize with regard to resource creation. */
-               if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
-                       mutex_lock(&res->lr_lvb_mutex);
-                       mutex_unlock(&res->lr_lvb_mutex);
-               }
-
-               if (unlikely(res->lr_lvb_len < 0)) {
-                       rc = res->lr_lvb_len;
-                       ldlm_resource_putref(res);
-                       res = ERR_PTR(rc);
-               }
                return res;
        }
        /* We won! Let's add the resource. */
@@ -1141,26 +1127,8 @@ lvbo_init:
                ns_refcount = ldlm_namespace_get_return(ns);
 
         cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
-        if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
-                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
-                rc = ns->ns_lvbo->lvbo_init(res);
-               if (rc < 0) {
-                       CERROR("%s: lvbo_init failed for resource "LPX64":"
-                              LPX64": rc = %d\n", ns->ns_obd->obd_name,
-                              name->name[0], name->name[1], rc);
-                       if (res->lr_lvb_data) {
-                               OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
-                               res->lr_lvb_data = NULL;
-                       }
-                       res->lr_lvb_len = rc;
-                       mutex_unlock(&res->lr_lvb_mutex);
-                       ldlm_resource_putref(res);
-                       return ERR_PTR(rc);
-               }
-       }
 
-       /* We create resource with locked lr_lvb_mutex. */
-       mutex_unlock(&res->lr_lvb_mutex);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
 
        /* Let's see if we happened to be the very first resource in this
         * namespace. If so, and this is a client namespace, we need to move
index 2c83be3..d8bef11 100644 (file)
@@ -1017,7 +1017,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                        rs = ldlm_resource_get(ns, NULL, &info->fti_resid,
                                               LDLM_EXTENT, 0);
                        if (!IS_ERR(rs)) {
-                               ns->ns_lvbo->lvbo_update(rs, NULL, 1);
+                               ldlm_res_lvbo_update(rs, NULL, 1);
                                ldlm_resource_putref(rs);
                        }
                }