* Currently LVBs are used by:
* - OSC-OST code to maintain current object size/times
* - layout lock code to return the layout when the layout lock is granted
+ *
+ * To ensure delayed LVB initialization, it is highly recommended to use the set
+ * of ldlm_[res_]lvbo_[init,update,fill]() functions.
*/
struct ldlm_valblock_ops {
int (*lvbo_init)(struct ldlm_resource *res);
*/
struct mutex lr_lvb_mutex;
int lr_lvb_len;
+ /** is lvb initialized ? */
+ bool lr_lvb_initialized;
/** protected by lr_lock */
void *lr_lvb_data;
static inline int ldlm_lvbo_init(struct ldlm_resource *res)
{
struct ldlm_namespace *ns = ldlm_res_to_ns(res);
+ int rc = 0;
- if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_init != NULL)
- return ns->ns_lvbo->lvbo_init(res);
+ if (ns->ns_lvbo == NULL || ns->ns_lvbo->lvbo_init == NULL ||
+ res->lr_lvb_initialized)
+ return 0;
- return 0;
+ mutex_lock(&res->lr_lvb_mutex);
+ /* Did we lose the race? */
+ if (res->lr_lvb_initialized) {
+ mutex_unlock(&res->lr_lvb_mutex);
+ return 0;
+ }
+ rc = ns->ns_lvbo->lvbo_init(res);
+ if (rc < 0) {
+ CDEBUG(D_DLMTRACE, "lvbo_init failed for resource : rc = %d\n",
+ rc);
+ if (res->lr_lvb_data != NULL) {
+ OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+ res->lr_lvb_data = NULL;
+ }
+ res->lr_lvb_len = rc;
+ } else {
+ res->lr_lvb_initialized = true;
+ }
+ mutex_unlock(&res->lr_lvb_mutex);
+ return rc;
}
static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+ int rc;
if (ns->ns_lvbo != NULL) {
LASSERT(ns->ns_lvbo->lvbo_fill != NULL);
+ /* init lvb now if not already */
+ rc = ldlm_lvbo_init(lock->l_resource);
+ if (rc < 0) {
+ CERROR("lock %p: delayed lvb init failed (rc %d)",
+ lock, rc);
+ return rc;
+ }
return ns->ns_lvbo->lvbo_fill(lock, buf, len);
}
return 0;
static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
struct ptlrpc_request *r, int increase)
{
+ int rc;
+
+ /* delayed lvb init may be required */
+ rc = ldlm_lvbo_init(res);
+ if (rc < 0) {
+ CERROR("delayed lvb init failed (rc %d)\n", rc);
+ return rc;
+ }
+
if (ldlm_res_to_ns(res)->ns_lvbo &&
ldlm_res_to_ns(res)->ns_lvbo->lvbo_update) {
return ldlm_res_to_ns(res)->ns_lvbo->lvbo_update(res, r,
struct ldlm_lock *lock = NULL;
void *cookie = NULL;
int rc = 0;
+ struct ldlm_resource *res = NULL;
ENTRY;
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
lock->l_remote_handle = dlm_req->lock_handle[0];
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
+ /* Initialize resource lvb but not for a lock being replayed since
+ * Client already got lvb sent in this case.
+ * This must occur early since some policy methods assume resource
+ * lvb is available (lr_lvb_data != NULL).
+ */
+ res = lock->l_resource;
+ if (!(flags & LDLM_FL_REPLAY)) {
+ /* non-replayed lock, delayed lvb init may need to be done */
+ rc = ldlm_lvbo_init(res);
+ if (rc < 0) {
+ LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", rc);
+ GOTO(out, rc);
+ }
+ }
+
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
/* Don't enqueue a lock onto the export if it is been disonnected
* due to eviction (bug 3822) or server umount (bug 24324).
req, lock);
buflen = req_capsule_get_size(&req->rq_pill,
&RMF_DLM_LVB, RCL_SERVER);
- if (buflen > 0) {
+ /* non-replayed lock, delayed lvb init may
+ * need to be occur now */
+ if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
buflen = ldlm_lvbo_fill(lock, buf,
buflen);
if (buflen >= 0)
buflen, RCL_SERVER);
else
rc = buflen;
+ } else if (flags & LDLM_FL_REPLAY) {
+ /* no LVB resend upon replay */
+ if (buflen > 0)
+ req_capsule_shrink(
+ &req->rq_pill,
+ &RMF_DLM_LVB,
+ 0, RCL_SERVER);
+ else
+ rc = buflen;
} else {
rc = buflen;
}
}
- } else {
+ }
+
+ if (rc != 0) {
lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
if (IS_ERR(lock))
GOTO(out_nolock, err = PTR_ERR(lock));
+ err = ldlm_lvbo_init(lock->l_resource);
+ if (err < 0) {
+ LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
+ GOTO(out, err);
+ }
+
ldlm_lock2handle(lock, lockh);
/* NB: we don't have any lock now (lock_res_and_lock)
spin_lock_init(&res->lr_lock);
lu_ref_init(&res->lr_reference);
- /* The creator of the resource must unlock the mutex after LVB
- * initialization. */
+ /* Since LVB init can be delayed now, there is no longer need to
+ * immediatelly acquire mutex here. */
mutex_init(&res->lr_lvb_mutex);
- mutex_lock(&res->lr_lvb_mutex);
+ res->lr_lvb_initialized = false;
return res;
}
cfs_hash_bd_t bd;
__u64 version;
int ns_refcount = 0;
- int rc;
LASSERT(ns != NULL);
LASSERT(parent == NULL);
hnode = cfs_hash_bd_lookup_locked(ns->ns_rs_hash, &bd, (void *)name);
if (hnode != NULL) {
cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 0);
- GOTO(lvbo_init, res);
+ GOTO(found, res);
}
version = cfs_hash_bd_version_get(&bd);
cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
/* Clean lu_ref for failed resource. */
lu_ref_fini(&res->lr_reference);
- /* We have taken lr_lvb_mutex. Drop it. */
- mutex_unlock(&res->lr_lvb_mutex);
OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
-lvbo_init:
+found:
res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
- /* Synchronize with regard to resource creation. */
- if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
- mutex_lock(&res->lr_lvb_mutex);
- mutex_unlock(&res->lr_lvb_mutex);
- }
-
- if (unlikely(res->lr_lvb_len < 0)) {
- rc = res->lr_lvb_len;
- ldlm_resource_putref(res);
- res = ERR_PTR(rc);
- }
return res;
}
/* We won! Let's add the resource. */
ns_refcount = ldlm_namespace_get_return(ns);
cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
- if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
- rc = ns->ns_lvbo->lvbo_init(res);
- if (rc < 0) {
- CERROR("%s: lvbo_init failed for resource "LPX64":"
- LPX64": rc = %d\n", ns->ns_obd->obd_name,
- name->name[0], name->name[1], rc);
- if (res->lr_lvb_data) {
- OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
- res->lr_lvb_data = NULL;
- }
- res->lr_lvb_len = rc;
- mutex_unlock(&res->lr_lvb_mutex);
- ldlm_resource_putref(res);
- return ERR_PTR(rc);
- }
- }
- /* We create resource with locked lr_lvb_mutex. */
- mutex_unlock(&res->lr_lvb_mutex);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
/* Let's see if we happened to be the very first resource in this
* namespace. If so, and this is a client namespace, we need to move
rs = ldlm_resource_get(ns, NULL, &info->fti_resid,
LDLM_EXTENT, 0);
if (!IS_ERR(rs)) {
- ns->ns_lvbo->lvbo_update(rs, NULL, 1);
+ ldlm_res_lvbo_update(rs, NULL, 1);
ldlm_resource_putref(rs);
}
}