struct ptlrpc_request *r,
int increase);
int (*lvbo_free)(struct ldlm_resource *res);
+ /* Return size of lvb data appropriate RPC size can be reserved */
+ int (*lvbo_size)(struct ldlm_lock *lock);
+ /* Called to fill in lvb data to RPC buffer @buf */
+ int (*lvbo_fill)(struct ldlm_lock *lock, void *buf, int buflen);
};
typedef enum {
return &lock->l_resource->lr_ns_bucket->nsb_at_estimate;
}
+static inline int ldlm_lvbo_init(struct ldlm_resource *res)
+{
+ struct ldlm_namespace *ns = ldlm_res_to_ns(res);
+
+ if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_init != NULL)
+ return ns->ns_lvbo->lvbo_init(res);
+
+ return 0;
+}
+
+static inline int ldlm_lvbo_size(struct ldlm_lock *lock)
+{
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+
+ if (ns->ns_lvbo != NULL && ns->ns_lvbo->lvbo_size != NULL)
+ return ns->ns_lvbo->lvbo_size(lock);
+
+ return 0;
+}
+
+static inline int ldlm_lvbo_fill(struct ldlm_lock *lock, void *buf, int len)
+{
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+
+ if (ns->ns_lvbo != NULL) {
+ LASSERT(ns->ns_lvbo->lvbo_fill != NULL);
+ return ns->ns_lvbo->lvbo_fill(lock, buf, len);
+ }
+ return 0;
+}
+
struct ldlm_ast_work {
struct ldlm_lock *w_lock;
int w_blocking;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
+ int lvb_len;
ENTRY;
LASSERT(lock != NULL);
if (req == NULL)
RETURN(-ENOMEM);
- /* server namespace, doesn't need lock */
- if (lock->l_resource->lr_lvb_len) {
+ /* server namespace, doesn't need lock */
+ lvb_len = ldlm_lvbo_size(lock);
+ if (lvb_len > 0)
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
- lock->l_resource->lr_lvb_len);
- }
+ lvb_len);
rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
if (rc) {
body->lock_handle[0] = lock->l_remote_handle;
body->lock_flags = flags;
ldlm_lock2desc(lock, &body->lock_desc);
- if (lock->l_resource->lr_lvb_len) {
- void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+ if (lvb_len > 0) {
+ void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
- lock_res(lock->l_resource);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- unlock_res(lock->l_resource);
+ lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+ lvb_len, RCL_CLIENT);
}
LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
/* server namespace, doesn't need lock */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
- lock->l_resource->lr_lvb_len);
+ ldlm_lvbo_size(lock));
ptlrpc_request_set_replen(req);
req->rq_send_state = LUSTRE_IMP_FULL;
/* based on the assumption that lvb size never changes during
* resource life time otherwise it need resource->lr_lock's
* protection */
- if (lock->l_resource->lr_lvb_len) {
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
- RCL_SERVER,
- lock->l_resource->lr_lvb_len);
- }
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER, ldlm_lvbo_size(lock));
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
"(err=%d, rc=%d)", err, rc);
if (rc == 0) {
- if (lock->l_resource->lr_lvb_len > 0) {
- /* MDT path won't handle lr_lvb_data, so
- * lock/unlock better be contained in the
- * if block */
- void *lvb;
-
- lvb = req_capsule_server_get(&req->rq_pill,
- &RMF_DLM_LVB);
- LASSERTF(lvb != NULL, "req %p, lock %p\n",
- req, lock);
- lock_res(lock->l_resource);
- memcpy(lvb, lock->l_resource->lr_lvb_data,
- lock->l_resource->lr_lvb_len);
- unlock_res(lock->l_resource);
- }
+ int lvb_len = ldlm_lvbo_size(lock);
+
+ if (lvb_len > 0) {
+ void *buf;
+ int buflen;
+
+ buf = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_LVB);
+ LASSERTF(buf != NULL, "req %p, lock %p\n",
+ req, lock);
+ buflen = req_capsule_get_size(&req->rq_pill,
+ &RMF_DLM_LVB, RCL_SERVER);
+ buflen = ldlm_lvbo_fill(lock, buf, buflen);
+ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+ buflen, RCL_SERVER);
+ }
} else {
lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
MODULES := mdt
mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o
+mdt-objs += mdt_lvb.o
@INCLUDE_RULES@
if (m->mdt_namespace == NULL)
GOTO(err_fini_seq, rc = -ENOMEM);
+ m->mdt_namespace->ns_lvbp = m;
+ m->mdt_namespace->ns_lvbo = &mdt_lvbo;
+
ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
/* set obd_namespace for compatibility with old code */
obd->obd_namespace = m->mdt_namespace;
return mdt_dlm_lock_modes[mode];
}
+/* mdt_lvb.c */
+extern struct ldlm_valblock_ops mdt_lvbo;
+
static inline struct lu_name *mdt_name(const struct lu_env *env,
char *name, int namelen)
{
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * lustre/mdt/mdt_lvb.c
+ *
+ * Author: Jinshan Xiong <jinshan.xiong@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+/* Called with res->lr_lvb_sem held */
+static int mdt_lvbo_init(struct ldlm_resource *res)
+{
+ return 0;
+}
+
+static int mdt_lvbo_size(struct ldlm_lock *lock)
+{
+ return 0;
+}
+
+static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
+{
+ return 0;
+}
+
+struct ldlm_valblock_ops mdt_lvbo = {
+ lvbo_init: mdt_lvbo_init,
+ lvbo_size: mdt_lvbo_size,
+ lvbo_fill: mdt_lvbo_fill
+};
return rc;
}
+static int filter_lvbo_size(struct ldlm_lock *unused)
+{
+ return sizeof(struct ost_lvb);
+}
+
+static int filter_lvbo_fill(struct ldlm_lock *lock,
+ void *buf, int buflen)
+{
+ struct ldlm_resource *res = lock->l_resource;
+
+ lock_res(res);
+ LASSERTF(buflen >= res->lr_lvb_len,
+ "actual %d, want %d\n", buflen, res->lr_lvb_len);
+ memcpy(buf, res->lr_lvb_data, res->lr_lvb_len);
+ unlock_res(res);
+
+ return res->lr_lvb_len;
+}
+
struct ldlm_valblock_ops filter_lvbo = {
lvbo_init: filter_lvbo_init,
lvbo_update: filter_lvbo_update,
- lvbo_free: filter_lvbo_free
+ lvbo_free: filter_lvbo_free,
+ lvbo_size: filter_lvbo_size,
+ lvbo_fill: filter_lvbo_fill
};
return rc;
}
+static int ofd_lvbo_size(struct ldlm_lock *unused)
+{
+ return sizeof(struct ost_lvb);
+}
+
+static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
+{
+ struct ldlm_resource *res = lock->l_resource;
+
+ lock_res(res);
+ LASSERTF(buflen >= res->lr_lvb_len,
+ "actual %d, want %d\n", buflen, res->lr_lvb_len);
+ memcpy(buf, res->lr_lvb_data, res->lr_lvb_len);
+ unlock_res(res);
+
+ return res->lr_lvb_len;
+}
+
struct ldlm_valblock_ops ofd_lvbo = {
lvbo_init: ofd_lvbo_init,
lvbo_update: ofd_lvbo_update,
lvbo_free: ofd_lvbo_free,
+ lvbo_size: ofd_lvbo_size,
+ lvbo_fill: ofd_lvbo_fill
};