*/
#define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_LDLM
-#include <linux/version.h>
#include <linux/module.h>
#include <linux/slab.h>
-#include <asm/unistd.h>
-
-#define DEBUG_SUBSYSTEM S_LDLM
-
#include <linux/lustre_dlm.h>
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
+extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
+extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-static int _ldlm_namespace_new(struct obd_device *obddev,
- struct ptlrpc_request *req)
-{
- struct ldlm_request *dlm_req;
- struct ldlm_namespace *ns;
- int rc;
- ldlm_error_t err;
- ENTRY;
-
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc) {
- CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
- }
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-
- err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
- &ns);
- req->rq_status = err;
-
- CERROR("err = %d\n", err);
-
- RETURN(0);
-}
-
/*
 * _ldlm_enqueue - server-side handler for an LDLM_ENQUEUE request.
 *
 * Creates a local lock from the lock descriptor in request buffer 0, copies
 * the request's lock_handle1 into the lock as its remote handle, enqueues the
 * lock, fills in the reply and sends that reply itself with ptlrpc_reply().
 *
 * Returns -ENOMEM when the reply message cannot be packed (no reply has been
 * sent in that case); otherwise returns 0 with the lock manager result stored
 * in req->rq_status.
 */
-static int _ldlm_enqueue(struct ptlrpc_request *req)
+static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
- int rc, size = sizeof(*dlm_rep);
+ int rc, size = sizeof(*dlm_rep), cookielen = 0;
+ __u32 flags;
ldlm_error_t err;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock = NULL;
+ ldlm_lock_callback callback;
+ struct lustre_handle lockh;
+ void *cookie = NULL;
ENTRY;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc) {
- CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
- }
- dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
/* The same callback is passed twice to ldlm_local_lock_enqueue() below. */
+ callback = ldlm_cli_callback;
- memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
- sizeof(dlm_rep->lock_extent));
- dlm_rep->flags = dlm_req->flags;
/* Choose the cookie handed to the enqueue: the request itself for intent
 * locks, the requested extent for extent locks, nothing otherwise. */
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
+ /* In this case, the reply buffer is allocated deep in
+ * local_lock_enqueue by the policy function. */
+ cookie = req;
+ cookielen = sizeof(*req);
+ } else {
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc) {
+ CERROR("out of memory\n");
+ RETURN(-ENOMEM);
+ }
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
+ cookie = &dlm_req->lock_desc.l_extent;
+ cookielen = sizeof(struct ldlm_extent);
+ }
+ }
- err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
+ err = ldlm_local_lock_create(obddev->obd_namespace,
&dlm_req->lock_handle2,
dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
- &dlm_rep->lock_handle);
+ dlm_req->lock_desc.l_req_mode,
+ NULL, 0, &lockh);
if (err != ELDLM_OK)
GOTO(out, err);
- err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
- dlm_req->lock_desc.l_req_mode,
- &dlm_rep->lock_extent,
- &dlm_rep->flags,
- ldlm_cli_callback,
- ldlm_cli_callback,
- lustre_msg_buf(req->rq_reqmsg, 1),
- req->rq_reqmsg->buflens[1]);
/* Record the request's lock_handle1 in the new lock as its remote handle. */
+ lock = lustre_handle2object(&lockh);
+ memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
+ sizeof(lock->l_remote_handle));
+ LDLM_DEBUG(lock, "server-side enqueue handler START");
+
/* flags is in/out: seeded from the request, copied into the reply below. */
+ flags = dlm_req->lock_flags;
+ err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
+ callback, callback);
if (err != ELDLM_OK)
GOTO(out, err);
- lock = ldlm_handle2object(&dlm_rep->lock_handle);
- memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
- sizeof(lock->l_remote_handle));
/* Enqueue succeeded: fill in the reply. For intent locks the reply buffer
 * is expected to exist by now (see the comment in the intent branch). */
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+ dlm_rep->lock_flags = flags;
+
+ memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+ memcpy(&dlm_rep->lock_extent, &lock->l_extent,
+ sizeof(lock->l_extent));
/* When LDLM_FL_LOCK_CHANGED is set, report the lock's current resource
 * name back in the reply. */
+ if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
+ memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
+ sizeof(dlm_rep->lock_resource_name));
+
lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
EXIT;
/* Reached on success and on failure: the reply is always sent from here,
 * carrying err in rq_status. */
out:
req->rq_status = err;
- CERROR("err = %d\n", err);
+ CDEBUG(D_INFO, "err = %d\n", err);
+
+ if (ptlrpc_reply(svc, req))
+ LBUG();
+
/* lock is only dereferenced when err == 0; it is still NULL if the create
 * step failed. NOTE(review): after a failed enqueue the lock exists but is
 * not released here -- confirm ldlm_local_lock_enqueue() cleans it up. */
+ if (err)
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
+ else {
+ ldlm_reprocess_all(lock->l_resource);
+ LDLM_DEBUG(lock, "server-side enqueue handler END");
+ }
return 0;
}
+/*
+ * _ldlm_convert - server-side handler for an LDLM_CONVERT request.
+ *
+ * Packs a one-buffer reply, seeds the reply flags from the request, asks
+ * ldlm_local_lock_convert() to convert the lock named by lock_handle1 to the
+ * requested mode, sends the reply itself and then reprocesses the resource
+ * returned by the conversion.
+ *
+ * Returns -ENOMEM when the reply cannot be packed (no reply sent), otherwise
+ * 0 with req->rq_status set to 0.
+ */
-static int _ldlm_convert(struct ptlrpc_request *req)
+static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- int rc;
+ struct ldlm_reply *dlm_rep;
+ struct ldlm_resource *res;
+ struct ldlm_lock *lock;
+ int rc, size = sizeof(*dlm_rep);
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+ dlm_rep->lock_flags = dlm_req->lock_flags;
+
+ lock = lustre_handle2object(&dlm_req->lock_handle1);
+ LDLM_DEBUG(lock, "server-side convert handler START");
+
+ res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
+ dlm_req->lock_desc.l_req_mode,
+ &dlm_rep->lock_flags);
+ req->rq_status = 0;
+ if (ptlrpc_reply(svc, req) != 0)
+ LBUG();
+
+ /* Only reprocess when the conversion handed back a resource, matching
+ * the guard used by the cancel handler. */
+ if (res != NULL)
+ ldlm_reprocess_all(res);
+ LDLM_DEBUG(lock, "server-side convert handler END");
- req->rq_status =
- ldlm_local_lock_convert(&dlm_req->lock_handle1,
- dlm_req->lock_desc.l_req_mode,
- &dlm_req->flags);
RETURN(0);
}
/*
 * _ldlm_cancel - server-side handler for an LDLM_CANCEL request.
 *
 * Packs an empty reply, looks up the lock named by lock_handle1, cancels it,
 * sends the reply itself and, if the cancel returned a resource, reprocesses
 * that resource.
 *
 * Returns -ENOMEM when the reply cannot be packed (no reply sent), otherwise
 * 0 with req->rq_status set to 0.
 */
-static int _ldlm_cancel(struct ptlrpc_request *req)
+static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res;
int rc;
ENTRY;
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
/* NOTE(review): lock is used without a NULL check; confirm what
 * lustre_handle2object() returns for a handle that no longer resolves. */
+ lock = lustre_handle2object(&dlm_req->lock_handle1);
+ LDLM_DEBUG(lock, "server-side cancel handler START");
+ res = ldlm_local_lock_cancel(lock);
+ req->rq_status = 0;
+ if (ptlrpc_reply(svc, req) != 0)
+ LBUG();
+
/* The reply has already gone out; reprocessing happens afterwards. */
+ if (res != NULL)
+ ldlm_reprocess_all(res);
/* The NOLOCK variant is used here: lock is not referenced after the cancel. */
+ LDLM_DEBUG_NOLOCK("server-side cancel handler END");
+
RETURN(0);
}
/*
 * _ldlm_callback - handler for an LDLM_CALLBACK request (logged as the
 * "client ... callback handler").
 *
 * The reply is sent first, then the callback is processed. lock_handle1
 * names the lock the callback is about; lock_handle2 resolves to `new`:
 *   - new == NULL: completion AST -- the lock is granted, after being moved
 *     to another resource if the descriptor names a different one;
 *   - new != NULL: blocking AST -- the lock is flagged LDLM_FL_DYING and, if
 *     it has no readers or writers, its l_blocking_ast is invoked.
 *
 * Returns -ENOMEM when the reply cannot be packed, the ptlrpc_reply() result
 * when sending the reply fails, and 0 otherwise.
 */
-static int _ldlm_callback(struct ptlrpc_request *req)
+static int _ldlm_callback(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock, *new;
int rc;
ENTRY;
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
- req->rq_status = -ENOMEM;
- RETURN(0);
+ RETURN(-ENOMEM);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- lock = ldlm_handle2object(&dlm_req->lock_handle1);
- ldlm_lock_dump(lock);
- if (dlm_req->lock_handle2.addr) {
- CERROR("Got blocked callback for lock %p.\n", lock);
- /* FIXME: do something impressive. */
- } else {
- CERROR("Got granted callback for lock %p.\n", lock);
- lock->l_granted_mode = lock->l_req_mode;
+ /* We must send the reply first, so that the thread is free to handle
+ * any requests made in common_callback() */
+ rc = ptlrpc_reply(svc, req);
+ if (rc != 0)
+ RETURN(rc);
+
/* NOTE(review): lock is dereferenced below without a NULL check, while
 * new is explicitly allowed to be NULL -- confirm lock_handle1 always
 * resolves here. */
+ lock = lustre_handle2object(&dlm_req->lock_handle1);
+ new = lustre_handle2object(&dlm_req->lock_handle2);
+
+ LDLM_DEBUG(lock, "client %s callback handler START",
+ new == NULL ? "completion" : "blocked");
+
/* Lock order used here: the resource's lr_lock first, then the lock's
 * own l_lock. */
+ spin_lock(&lock->l_resource->lr_lock);
+ spin_lock(&lock->l_lock);
+ if (!new) {
+ CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
+ lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+ /* If we receive the completion AST before the actual enqueue
+ * returned, then we might need to switch resources. */
+ if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+ lock->l_resource->lr_name,
+ sizeof(__u64) * RES_NAME_SIZE) != 0) {
+ struct ldlm_namespace *ns =
+ lock->l_resource->lr_namespace;
+ int type = lock->l_resource->lr_type;
+
/* The old resource's lr_lock is released only when the put returns 0.
 * NOTE(review): presumably a non-zero return means the resource was
 * freed together with its lock -- confirm. */
+ if (!ldlm_resource_put(lock->l_resource))
+ spin_unlock(&lock->l_resource->lr_lock);
+
+ lock->l_resource = ldlm_resource_get(ns, NULL, dlm_req->lock_desc.l_resource.lr_name, type, 1);
+ if (lock->l_resource == NULL) {
/* NOTE(review): if LBUG() returns, this path leaves with lock->l_lock
 * still held. */
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ spin_lock(&lock->l_resource->lr_lock);
+ LDLM_DEBUG(lock, "completion AST, new resource");
+ }
+
+ /* FIXME: the API is flawed if I have to do these refcount
+ * acrobatics (along with the _put() below). */
+ lock->l_resource->lr_refcount++;
+
+ /* _del_lock is safe for half-created locks that are not yet on
+ * a list. */
+ ldlm_resource_del_lock(lock);
+ ldlm_grant_lock(lock->l_resource, lock);
+
+ ldlm_resource_put(lock->l_resource);
+
wake_up(&lock->l_waitq);
+ spin_unlock(&lock->l_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
+ } else {
+ CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
+ lock->l_flags |= LDLM_FL_DYING;
+ spin_unlock(&lock->l_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
/* NOTE(review): l_readers and l_writers are read after both spinlocks
 * were dropped -- confirm this cannot race with a concurrent addref. */
+ if (!lock->l_readers && !lock->l_writers) {
+ CDEBUG(D_INFO, "Lock already unused, calling "
+ "callback (%p).\n", lock->l_blocking_ast);
+ if (lock->l_blocking_ast != NULL)
+ lock->l_blocking_ast(lock, new, lock->l_data,
+ lock->l_data_len, NULL);
+ } else {
+ CDEBUG(D_INFO, "Lock still has references; lock will be"
+ " cancelled later.\n");
+ }
}
- req->rq_status = 0;
+ LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
+ new == NULL ? "completion" : "blocked", lock);
RETURN(0);
}
+/*
+ * lustre_handle - request dispatcher registered with the LDLM service.
+ *
+ * Validates the incoming message, resolves the target OBD device from the
+ * request's target_id and dispatches on the opcode. Each opcode handler sends
+ * its own reply; this function only answers through ptlrpc_error() when rc is
+ * non-zero.
+ */
-static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
+static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
struct ptlrpc_request *req)
{
- int rc;
+ struct obd_device *req_dev;
+ int id, rc;
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
GOTO(out, rc);
}
- if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
+ if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
CERROR("lustre_ldlm: wrong packet type sent %d\n",
req->rq_reqmsg->type);
GOTO(out, rc = -EINVAL);
}
- switch (req->rq_reqmsg->opc) {
- case LDLM_NAMESPACE_NEW:
- CDEBUG(D_INODE, "namespace_new\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
- rc = _ldlm_namespace_new(dev, req);
- break;
+ /* target_id comes off the wire and indexes obd_dev[] directly, so the
+ * upper bound must be exclusive: id == MAX_OBD_DEVICES would be one
+ * past the last slot (assuming obd_dev[] has MAX_OBD_DEVICES entries). */
+ id = req->rq_reqmsg->target_id;
+ if (id < 0 || id >= MAX_OBD_DEVICES)
+ GOTO(out, rc = -ENODEV);
+ req_dev = req->rq_obd = &obd_dev[id];
+ switch (req->rq_reqmsg->opc) {
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = _ldlm_enqueue(req);
+ rc = _ldlm_enqueue(req_dev, svc, req);
break;
case LDLM_CONVERT:
CDEBUG(D_INODE, "convert\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
- rc = _ldlm_convert(req);
+ rc = _ldlm_convert(svc, req);
break;
case LDLM_CANCEL:
CDEBUG(D_INODE, "cancel\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
- rc = _ldlm_cancel(req);
+ rc = _ldlm_cancel(svc, req);
break;
case LDLM_CALLBACK:
CDEBUG(D_INODE, "callback\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
- rc = _ldlm_callback(req);
+ rc = _ldlm_callback(svc, req);
break;
default:
RETURN(rc);
}
+ EXIT;
out:
if (rc)
RETURN(ptlrpc_error(svc, req));
- else
- RETURN(ptlrpc_reply(svc, req));
+ return 0;
}
+/*
+ * ldlm_iocontrol - ioctl entry point for the LDLM device.
+ *
+ * Rejects commands outside the LDLM ioctl type/number range with -EINVAL,
+ * then allocates a temporary ptlrpc client for the duration of the call and
+ * looks up the "ldlm" connection. The connection (if any) is released and the
+ * client is freed again at the out: label.
+ *
+ * Returns -ENOMEM when the temporary client cannot be allocated.
+ */
-static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
+static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
void *uarg)
{
struct obd_device *obddev = conn->oc_dev;
if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
_IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
- CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
+ CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
_IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
RETURN(-EINVAL);
}
- ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ OBD_ALLOC(obddev->u.ldlm.ldlm_client,
+ sizeof(*obddev->u.ldlm.ldlm_client));
+ /* Nothing else has been acquired yet, so bail out directly instead of
+ * handing a NULL client to ptlrpc_init_client(). */
+ if (obddev->u.ldlm.ldlm_client == NULL) {
+ CERROR("out of memory\n");
+ RETURN(-ENOMEM);
+ }
+ ptlrpc_init_client(NULL, NULL,
+ LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
obddev->u.ldlm.ldlm_client);
connection = ptlrpc_uuid_to_connection("ldlm");
if (!connection)
}
out:
- ptlrpc_put_connection(connection);
+ if (connection)
+ ptlrpc_put_connection(connection);
+ OBD_FREE(obddev->u.ldlm.ldlm_client,
+ sizeof(*obddev->u.ldlm.ldlm_client));
return err;
}
/*
 * ldlm_setup - bring up the LDLM service for an OBD device.
 *
 * Takes a module reference, creates the ptlrpc service on the LDLM
 * request/reply portals with lustre_handle as its handler, and starts
 * LDLM_NUM_THREADS service threads. On a thread-start failure all threads
 * are stopped and the service is unregistered; every failure path drops the
 * module reference again.
 *
 * Returns 0 on success, -ENOMEM if the service cannot be created, or the
 * ptlrpc_start_thread() error.
 *
 * Reading note: the '-' lines for cleanup_resource(), do_free_namespace()
 * and ldlm_free_all() below are removals that the diff interleaves with the
 * new ldlm_setup() body; they are not part of this function.
 */
-static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
+#define LDLM_NUM_THREADS 8
+
+static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
- int err;
+ int rc;
+ int i;
ENTRY;
- ldlm_spinlock = SPIN_LOCK_UNLOCKED;
-
/* The module reference is now taken up front and dropped at out_dec on
 * failure. */
+ MOD_INC_USE_COUNT;
ldlm->ldlm_service =
ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
- LDLM_REPLY_PORTAL, "self", ldlm_handle);
- if (!ldlm->ldlm_service)
- LBUG();
-
- err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
- if (err) {
- CERROR("cannot start thread\n");
+ LDLM_REPLY_PORTAL, "self", lustre_handle);
+ if (!ldlm->ldlm_service) {
LBUG();
+ GOTO(out_dec, rc = -ENOMEM);
}
- OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
- if (ldlm->ldlm_client == NULL)
- LBUG();
- ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- ldlm->ldlm_client);
-
- MOD_INC_USE_COUNT;
- RETURN(0);
-}
-
-static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
-{
- struct list_head *tmp, *pos;
- int rc = 0;
-
- list_for_each_safe(tmp, pos, q) {
- struct ldlm_lock *lock;
-
/* Every service thread is started under the same name, "lustre_dlm". */
+ for (i = 0; i < LDLM_NUM_THREADS; i++) {
+ rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
+ "lustre_dlm");
+ /* XXX We could just continue if we had started at least
+ * a few threads here.
+ */
if (rc) {
- /* Res was already cleaned up. */
+ CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
LBUG();
+ GOTO(out_thread, rc);
}
-
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
- ldlm_resource_del_lock(lock);
- ldlm_lock_free(lock);
- rc = ldlm_resource_put(res);
- }
-
- return rc;
-}
-
-static int do_free_namespace(struct ldlm_namespace *ns)
-{
- struct list_head *tmp, *pos;
- int i, rc;
-
- for (i = 0; i < RES_HASH_SIZE; i++) {
- list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
- struct ldlm_resource *res;
- res = list_entry(tmp, struct ldlm_resource, lr_hash);
- list_del_init(&res->lr_hash);
-
- rc = cleanup_resource(res, &res->lr_granted);
- if (!rc)
- rc = cleanup_resource(res, &res->lr_converting);
- if (!rc)
- rc = cleanup_resource(res, &res->lr_waiting);
-
- while (rc == 0)
- rc = ldlm_resource_put(res);
- }
- }
-
- return ldlm_namespace_free(ns);
-}
-
-static int ldlm_free_all(struct obd_device *obddev)
-{
- struct list_head *tmp, *pos;
- int rc = 0;
-
- ldlm_lock();
-
- list_for_each_safe(tmp, pos, &ldlm_namespaces) {
- struct ldlm_namespace *ns;
- ns = list_entry(tmp, struct ldlm_namespace, ns_link);
-
- rc |= do_free_namespace(ns);
}
- ldlm_unlock();
+ RETURN(0);
/* Error unwinding, in reverse order of acquisition. */
+out_thread:
+ ptlrpc_stop_all_threads(ldlm->ldlm_service);
+ ptlrpc_unregister_service(ldlm->ldlm_service);
+out_dec:
+ MOD_DEC_USE_COUNT;
return rc;
}
struct ldlm_obd *ldlm = &obddev->u.ldlm;
ENTRY;
- ptlrpc_stop_thread(ldlm->ldlm_service);
- rpc_unregister_service(ldlm->ldlm_service);
-
- if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
- // XXX reply with errors and clean up
- CERROR("Request list not empty!\n");
- }
-
- OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
- OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+ ptlrpc_stop_all_threads(ldlm->ldlm_service);
+ ptlrpc_unregister_service(ldlm->ldlm_service);
- if (ldlm_free_all(obddev)) {
- CERROR("ldlm_free_all could not complete.\n");
- RETURN(-1);
- }
+ if (mds_reint_p != NULL)
+ inter_module_put("mds_reint");
+ if (mds_getattr_name_p != NULL)
+ inter_module_put("mds_getattr_name");
MOD_DEC_USE_COUNT;
RETURN(0);
/*
 * ldlm_exit - module unload hook.
 *
 * Unregisters the LDLM OBD type and destroys the resource and lock slab
 * caches. The patched version checks kmem_cache_destroy()'s return value and
 * logs an error when it is non-zero.
 */
static void __exit ldlm_exit(void)
{
obd_unregister_type(OBD_LDLM_DEVICENAME);
- kmem_cache_destroy(ldlm_resource_slab);
- kmem_cache_destroy(ldlm_lock_slab);
+ if (kmem_cache_destroy(ldlm_resource_slab) != 0)
+ CERROR("couldn't free ldlm resource slab\n");
+ if (kmem_cache_destroy(ldlm_lock_slab) != 0)
+ CERROR("couldn't free ldlm lock slab\n");
}
+EXPORT_SYMBOL(ldlm_local_lock_match);
+EXPORT_SYMBOL(ldlm_lock_addref);
+EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_cli_convert);
+EXPORT_SYMBOL(ldlm_cli_enqueue);
+EXPORT_SYMBOL(ldlm_cli_cancel);
+EXPORT_SYMBOL(lustre_handle2object);
+EXPORT_SYMBOL(ldlm_test);
+EXPORT_SYMBOL(ldlm_lock_dump);
+EXPORT_SYMBOL(ldlm_namespace_new);
+EXPORT_SYMBOL(ldlm_namespace_free);
+
MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
MODULE_LICENSE("GPL");