From 8739f13233e0827d7f4c8db9a8a539281b9bc9f8 Mon Sep 17 00:00:00 2001 From: Bruno Faccini Date: Thu, 26 Jun 2014 11:03:52 +0200 Subject: [PATCH] LU-5042 ldlm: delay filling resource's LVB upon replay This patch is an attempt to delay unnecessary filling+resend of resource's LVB upon replay after Server reboot. This should prevent recovery from taking a very long time when replaying a huge number of locks, due to all associated LVBs being read from disk. Now a resource's LVB is only read when it needs to be sent to a new Client. Signed-off-by: Bruno Faccini Change-Id: I20bd20bce328953c46accb4b41dcba776f3608a6 Reviewed-on: http://review.whamcloud.com/10845 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Jinshan Xiong Reviewed-by: Niu Yawei Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 49 ++++++++++++++++++++++++++++++++++++++++++--- lustre/ldlm/ldlm_lockd.c | 33 ++++++++++++++++++++++++++++-- lustre/ldlm/ldlm_request.c | 6 ++++++ lustre/ldlm/ldlm_resource.c | 44 ++++++---------------------------------- lustre/ofd/ofd_io.c | 2 +- 5 files changed, 90 insertions(+), 44 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 2aa2e7de..7aab315 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -286,6 +286,9 @@ typedef int (*ldlm_cancel_cbt)(struct ldlm_lock *lock); * Currently LVBs are used by: * - OSC-OST code to maintain current object size/times * - layout lock code to return the layout when the layout lock is granted + * + * To ensure delayed LVB initialization, it is highly recommended to use the set + * of ldlm_[res_]lvbo_[init,update,fill]() functions. */ struct ldlm_valblock_ops { int (*lvbo_init)(struct ldlm_resource *res); @@ -956,6 +959,8 @@ struct ldlm_resource { */ struct mutex lr_lvb_mutex; int lr_lvb_len; + /** is lvb initialized ? 
*/ + bool lr_lvb_initialized; /** protected by lr_lock */ void *lr_lvb_data; @@ -1006,11 +1011,32 @@ ldlm_lock_to_ns_at(struct ldlm_lock *lock) static inline int ldlm_lvbo_init(struct ldlm_resource *res) { struct ldlm_namespace *ns = ldlm_res_to_ns(res); + int rc = 0; - if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_init != NULL) - return ns->ns_lvbo->lvbo_init(res); + if (ns->ns_lvbo == NULL || ns->ns_lvbo->lvbo_init == NULL || + res->lr_lvb_initialized) + return 0; - return 0; + mutex_lock(&res->lr_lvb_mutex); + /* Did we lose the race? */ + if (res->lr_lvb_initialized) { + mutex_unlock(&res->lr_lvb_mutex); + return 0; + } + rc = ns->ns_lvbo->lvbo_init(res); + if (rc < 0) { + CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n", + rc); + if (res->lr_lvb_data != NULL) { + OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); + res->lr_lvb_data = NULL; + } + res->lr_lvb_len = rc; + } else { + res->lr_lvb_initialized = true; + } + mutex_unlock(&res->lr_lvb_mutex); + return rc; } static inline int ldlm_lvbo_size(struct ldlm_lock *lock) @@ -1026,9 +1052,17 @@ static inline int ldlm_lvbo_size(struct ldlm_lock *lock) static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len) { struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); + int rc; if (ns->ns_lvbo != NULL) { LASSERT(ns->ns_lvbo->lvbo_fill != NULL); + /* init lvb now if not already */ + rc = ldlm_lvbo_init(lock->l_resource); + if (rc < 0) { + CERROR("lock %p: delayed lvb init failed (rc %d)", + lock, rc); + return rc; + } return ns->ns_lvbo->lvbo_fill(lock, buf, len); } return 0; @@ -1245,6 +1279,15 @@ ldlm_handle2lock_long(const struct lustre_handle *h, __u64 flags) static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, struct ptlrpc_request *r, int increase) { + int rc; + + /* delayed lvb init may be required */ + rc = ldlm_lvbo_init(res); + if (rc < 0) { + CERROR("delayed lvb init failed (rc %d)\n", rc); + return rc; + } + if (ldlm_res_to_ns(res)->ns_lvbo && 
ldlm_res_to_ns(res)->ns_lvbo->lvbo_update) { return ldlm_res_to_ns(res)->ns_lvbo->lvbo_update(res, r, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index f56c776..fb7fc3b 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1173,6 +1173,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ldlm_lock *lock = NULL; void *cookie = NULL; int rc = 0; + struct ldlm_resource *res = NULL; ENTRY; LDLM_DEBUG_NOLOCK("server-side enqueue handler START"); @@ -1268,6 +1269,21 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, lock->l_remote_handle = dlm_req->lock_handle[0]; LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); + /* Initialize resource lvb but not for a lock being replayed since + * Client already got lvb sent in this case. + * This must occur early since some policy methods assume resource + * lvb is available (lr_lvb_data != NULL). + */ + res = lock->l_resource; + if (!(flags & LDLM_FL_REPLAY)) { + /* non-replayed lock, delayed lvb init may need to be done */ + rc = ldlm_lvbo_init(res); + if (rc < 0) { + LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", rc); + GOTO(out, rc); + } + } + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); /* Don't enqueue a lock onto the export if it is been disonnected * due to eviction (bug 3822) or server umount (bug 24324). 
@@ -1416,7 +1432,9 @@ existing_lock: req, lock); buflen = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER); - if (buflen > 0) { + /* non-replayed lock, delayed lvb init may + * need to occur now */ + if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) { buflen = ldlm_lvbo_fill(lock, buf, buflen); if (buflen >= 0) @@ -1426,11 +1444,22 @@ existing_lock: buflen, RCL_SERVER); else rc = buflen; + } else if (flags & LDLM_FL_REPLAY) { + /* no LVB resend upon replay */ + if (buflen > 0) + req_capsule_shrink( + &req->rq_pill, + &RMF_DLM_LVB, + 0, RCL_SERVER); + else + rc = buflen; } else { rc = buflen; } } - } else { + } + + if (rc != 0) { lock_res_and_lock(lock); ldlm_resource_unlink_lock(lock); ldlm_lock_destroy_nolock(lock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 5e9d83e..19a3595 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -445,6 +445,12 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, if (IS_ERR(lock)) GOTO(out_nolock, err = PTR_ERR(lock)); + err = ldlm_lvbo_init(lock->l_resource); + if (err < 0) { + LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err); + GOTO(out, err); + } + ldlm_lock2handle(lock, lockh); /* NB: we don't have any lock now (lock_res_and_lock) diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 5f6a645..85584ab 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -1054,10 +1054,10 @@ static struct ldlm_resource *ldlm_resource_new(void) spin_lock_init(&res->lr_lock); lu_ref_init(&res->lr_reference); - /* The creator of the resource must unlock the mutex after LVB - * initialization. */ + /* Since LVB init can be delayed now, there is no longer any need to + * immediately acquire the mutex here. 
*/ mutex_init(&res->lr_lvb_mutex); - mutex_lock(&res->lr_lvb_mutex); + res->lr_lvb_initialized = false; return res; } @@ -1077,7 +1077,6 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, cfs_hash_bd_t bd; __u64 version; int ns_refcount = 0; - int rc; LASSERT(ns != NULL); LASSERT(parent == NULL); @@ -1088,7 +1087,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, hnode = cfs_hash_bd_lookup_locked(ns->ns_rs_hash, &bd, (void *)name); if (hnode != NULL) { cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 0); - GOTO(lvbo_init, res); + GOTO(found, res); } version = cfs_hash_bd_version_get(&bd); @@ -1117,22 +1116,9 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1); /* Clean lu_ref for failed resource. */ lu_ref_fini(&res->lr_reference); - /* We have taken lr_lvb_mutex. Drop it. */ - mutex_unlock(&res->lr_lvb_mutex); OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); -lvbo_init: +found: res = hlist_entry(hnode, struct ldlm_resource, lr_hash); - /* Synchronize with regard to resource creation. */ - if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { - mutex_lock(&res->lr_lvb_mutex); - mutex_unlock(&res->lr_lvb_mutex); - } - - if (unlikely(res->lr_lvb_len < 0)) { - rc = res->lr_lvb_len; - ldlm_resource_putref(res); - res = ERR_PTR(rc); - } return res; } /* We won! Let's add the resource. 
*/ @@ -1141,26 +1127,8 @@ lvbo_init: ns_refcount = ldlm_namespace_get_return(ns); cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1); - if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2); - rc = ns->ns_lvbo->lvbo_init(res); - if (rc < 0) { - CERROR("%s: lvbo_init failed for resource "LPX64":" - LPX64": rc = %d\n", ns->ns_obd->obd_name, - name->name[0], name->name[1], rc); - if (res->lr_lvb_data) { - OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); - res->lr_lvb_data = NULL; - } - res->lr_lvb_len = rc; - mutex_unlock(&res->lr_lvb_mutex); - ldlm_resource_putref(res); - return ERR_PTR(rc); - } - } - /* We create resource with locked lr_lvb_mutex. */ - mutex_unlock(&res->lr_lvb_mutex); + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2); /* Let's see if we happened to be the very first resource in this * namespace. If so, and this is a client namespace, we need to move diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index 2c83be3..d8bef11 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -1017,7 +1017,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, rs = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); if (!IS_ERR(rs)) { - ns->ns_lvbo->lvbo_update(rs, NULL, 1); + ldlm_res_lvbo_update(rs, NULL, 1); ldlm_resource_putref(rs); } } -- 1.8.3.1