From: pschwan Date: Wed, 17 Apr 2002 17:49:27 +0000 (+0000) Subject: The more controversial changes, although my tree finally works: X-Git-Tag: v1_7_100~5751 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=7ec93168ad1a12925ebc0c3f35c0b3dcdb755131;p=fs%2Flustre-release.git The more controversial changes, although my tree finally works: - if the enqueue_lock RPC doesn't return an immediately granted lock, go to sleep - when the callback RPC is received, wake that process up - LBUG if we try to free a lock that still has children - set the completion_ast at the end of local_lock_enqueue, to avoid doing a spurious RPC in the case of an immediately granted lock - removed ldlm_server_conn, in favour of using the MDS/OST connections that already exist. - don't do remote locking RPCs if the lock is managed locally - fixed some osc_setup error cases - create a namespace at mount time --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 1b34f76..9d8ee98 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -115,6 +115,7 @@ struct ldlm_lock { //void *l_event; //XXX cluster_host l_holder; __u32 l_version[RES_VERSION_SIZE]; + wait_queue_head_t l_waitq; }; typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new); @@ -211,7 +212,7 @@ ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh); void ldlm_lock_dump(struct ldlm_lock *lock); /* ldlm_test.c */ -int ldlm_test(struct obd_device *device); +int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn); /* resource.c */ struct ldlm_namespace *ldlm_namespace_find(__u32 id); @@ -230,8 +231,7 @@ void ldlm_resource_dump(struct ldlm_resource *res); /* ldlm_request.c */ int ldlm_cli_namespace_new(struct obd_device *, struct ptlrpc_client *, - struct ptlrpc_connection *, - __u32 ns_id, struct ptlrpc_request **); + struct ptlrpc_connection *, __u32 ns_id); int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer, __u32 ns_id, struct ldlm_handle *parent_lock_handle, @@ -240,6 +240,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer, struct ldlm_extent *req_ex, ldlm_mode_t mode, int *flags, + ldlm_lock_callback completion, + ldlm_lock_callback blocking, void *data, __u32 data_len, struct ldlm_handle *lockh, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 6bf38e8..b8208d4 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -12,17 +12,9 @@ */ #define EXPORT_SYMTAB - -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_LDLM -#include -#include - +#include #include extern kmem_cache_t *ldlm_lock_slab; @@ -53,6 +45,7 @@ static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b) return 0; } +/* Caller should do ldlm_resource_get() on this resource first. */ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, struct ldlm_resource *resource) { @@ -68,6 +61,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, memset(lock, 0, sizeof(*lock)); lock->l_resource = resource; INIT_LIST_HEAD(&lock->l_children); + init_waitqueue_head(&lock->l_waitq); if (parent != NULL) { lock->l_parent = parent; @@ -77,8 +71,13 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, return lock; } +/* Caller must do its own ldlm_resource_put() on lock->l_resource */ void ldlm_lock_free(struct ldlm_lock *lock) { + if (!list_empty(&lock->l_children)) { + CERROR("lock still has children!\n"); + LBUG(); + } kmem_cache_free(ldlm_lock_slab, lock); } @@ -132,8 +131,7 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) ldlm_error_t ldlm_local_lock_create(__u32 ns_id, struct ldlm_handle *parent_lock_handle, - __u64 *res_id, - __u32 type, + __u64 *res_id, __u32 type, struct ldlm_handle *lockh) { struct ldlm_namespace *ns; @@ -196,7 +194,6 @@ ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh, lock->l_req_mode = mode; lock->l_data = data; lock->l_data_len = data_len; - lock->l_completion_ast = completion; lock->l_blocking_ast = blocking; spin_lock(&lock->l_resource->lr_lock); @@ -226,6 +223,9 @@ ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh, ldlm_grant_lock(lock->l_resource, lock); EXIT; out: + /* Don't set 'completion_ast' until here so that if the lock is granted + * immediately we don't do an unnecessary completion call. */ + lock->l_completion_ast = completion; spin_unlock(&lock->l_resource->lr_lock); return ELDLM_OK; } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index dc56bcc..20456f6 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -83,7 +83,7 @@ static int _ldlm_enqueue(struct ptlrpc_request *req) dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_extent, &dlm_rep->flags, - NULL, //ldlm_cli_callback, + ldlm_cli_callback, ldlm_cli_callback, lustre_msg_buf(req->rq_reqmsg, 1), req->rq_reqmsg->buflens[1]); @@ -158,6 +158,14 @@ static int _ldlm_callback(struct ptlrpc_request *req) lock = ldlm_handle2object(&dlm_req->lock_handle1); ldlm_lock_dump(lock); + if (dlm_req->lock_handle2.addr) { + CERROR("Got blocked callback for lock %p.\n", lock); + /* FIXME: do something impressive. */ + } else { + CERROR("Got granted callback for lock %p.\n", lock); + lock->l_granted_mode = lock->l_req_mode; + wake_up(&lock->l_waitq); + } req->rq_status = 0; @@ -229,31 +237,26 @@ static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg, void *uarg) { struct obd_device *obddev = conn->oc_dev; + struct ptlrpc_connection *connection; int err; ENTRY; if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) { CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n", - _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd)); - EXIT; - return -EINVAL; - } - -#if 0 - /* XX phil -- put the peer back in */ - - ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl); - err = ptlrpc_connect_client("ldlm", &cl, NULL); -#endif - if (err) { - CERROR("cannot create client\n"); + _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd)); RETURN(-EINVAL); } + ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + obddev->u.ldlm.ldlm_client); + connection = ptlrpc_uuid_to_connection("ldlm"); + if (!connection) + CERROR("No LDLM UUID found: assuming ldlm is local.\n"); + switch (cmd) { case IOC_LDLM_TEST: { - err = ldlm_test(obddev); + err = ldlm_test(obddev, connection); CERROR("-- done err %d\n", err); GOTO(out, err); } @@ -262,6 +265,7 @@ static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg, } out: + ptlrpc_put_connection(connection); return err; } @@ -274,10 +278,9 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) INIT_LIST_HEAD(&ldlm_namespaces); ldlm_spinlock = SPIN_LOCK_UNLOCKED; - ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, - LDLM_REQUEST_PORTAL, - LDLM_REPLY_PORTAL, - "self", ldlm_handle); + ldlm->ldlm_service = + ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, + LDLM_REPLY_PORTAL, "self", ldlm_handle); if (!ldlm->ldlm_service) LBUG(); @@ -290,14 +293,8 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); if (ldlm->ldlm_client == NULL) LBUG(); - ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, ldlm->ldlm_client); - ldlm->ldlm_server_conn = ptlrpc_uuid_to_connection("ldlm"); - if (!ldlm->ldlm_server_conn) { - CERROR("cannot create client\n"); - LBUG(); - } MOD_INC_USE_COUNT; RETURN(0); @@ -385,7 +382,6 @@ static int ldlm_cleanup(struct obd_device *obddev) OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service)); - ptlrpc_put_connection(ldlm->ldlm_server_conn); if (ldlm_free_all(obddev)) { CERROR("ldlm_free_all could not complete.\n"); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 7c20bd9..f1293aa 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -10,11 +10,21 @@ */ #define EXPORT_SYMTAB - #define DEBUG_SUBSYSTEM S_LDLM #include +#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000)) + +static int is_local_conn(struct ptlrpc_connection *conn) +{ + ENTRY; + if (conn == NULL) + RETURN(1); + + RETURN(LOOPBACK(conn->c_peer.peer_nid)); +} + int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, __u32 ns_id, struct ldlm_handle *parent_lock_handle, @@ -23,6 +33,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct ldlm_extent *req_ex, ldlm_mode_t mode, int *flags, + ldlm_lock_callback completion, + ldlm_lock_callback blocking, void *data, __u32 data_len, struct ldlm_handle *lockh, @@ -43,8 +55,9 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (err != ELDLM_OK) RETURN(err); + lock = ldlm_handle2object(&local_lockh); /* Is this lock locally managed? */ - if (conn == NULL) + if (is_local_conn(conn)) GOTO(local, 0); req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs); @@ -76,11 +89,14 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); - if (rc != ELDLM_OK) + if (rc != ELDLM_OK) { + ldlm_resource_put(lock->l_resource); + ldlm_lock_free(lock); GOTO(out, rc); + } + lock->l_connection = conn; reply = lustre_msg_buf(req->rq_repmsg, 0); - lock = ldlm_handle2object(&local_lockh); memcpy(&lock->l_remote_handle, &reply->lock_handle, sizeof(lock->l_remote_handle)); *flags = reply->flags; @@ -92,24 +108,30 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, EXIT; local: - rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, NULL, - NULL, data, data_len); + rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, + completion, blocking, data, data_len); + if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | + LDLM_FL_BLOCK_CONV)) { + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + wait_event_interruptible(lock->l_waitq, lock->l_req_mode == + lock->l_granted_mode); + } out: *request = req; return rc; } int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, __u32 ns_id, - struct ptlrpc_request **request) + struct ptlrpc_connection *conn, __u32 ns_id) { struct ldlm_namespace *ns; struct ldlm_request *body; struct ptlrpc_request *req; int rc, size = sizeof(*body); - int err; + ENTRY; - if (conn == NULL) + if (is_local_conn(conn)) GOTO(local, 0); req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL); @@ -123,18 +145,18 @@ int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl, rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); if (rc) GOTO(out, rc); EXIT; local: - err = ldlm_namespace_new(obddev, ns_id, &ns); - if (err != ELDLM_OK) { + rc = ldlm_namespace_new(obddev, ns_id, &ns); + if (rc != ELDLM_OK) { /* XXX: It succeeded remotely but failed locally. What to do? */ - return err; + CERROR("Local ldlm_namespace_new failed.\n"); } out: - *request = req; return rc; } @@ -147,6 +169,7 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client; int rc, size[2] = {sizeof(*body), data_len}; char *bufs[2] = {NULL, data}; + ENTRY; req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size, bufs); @@ -157,8 +180,10 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, memcpy(&body->lock_handle1, &lock->l_remote_handle, sizeof(body->lock_handle1)); - if (new != NULL) + if (new != NULL) { ldlm_lock2desc(new, &body->lock_desc); + ldlm_object2handle(new, &body->lock_handle2); + } req->rq_replen = lustre_msg_size(0, NULL); @@ -180,10 +205,11 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh, struct ptlrpc_request *req; int rc, size[2] = {sizeof(*body), lock->l_data_len}; char *bufs[2] = {NULL, lock->l_data}; + ENTRY; lock = ldlm_handle2object(lockh); - if (lock->l_connection == NULL) + if (is_local_conn(lock->l_connection)) GOTO(local, 0); req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size, @@ -224,10 +250,11 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh, struct ptlrpc_request *req; int rc, size[2] = {sizeof(*body), lock->l_data_len}; char *bufs[2] = {NULL, lock->l_data}; + ENTRY; lock = ldlm_handle2object(lockh); - if (lock->l_connection == NULL) + if (is_local_conn(lock->l_connection)) GOTO(local, 0); req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size, diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 2ebe593..20b2301 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -10,17 +10,8 @@ */ #define EXPORT_SYMTAB - -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_LDLM -#include -#include - #include static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new, @@ -141,7 +132,8 @@ int ldlm_test_extents(struct obd_device *obddev) return 0; } -static int ldlm_test_network(struct obd_device *obddev) +static int ldlm_test_network(struct obd_device *obddev, + struct ptlrpc_connection *conn) { struct ldlm_obd *ldlm = &obddev->u.ldlm; struct ptlrpc_request *request; @@ -152,23 +144,22 @@ static int ldlm_test_network(struct obd_device *obddev) int flags = 0; ldlm_error_t err; - err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client, - ldlm->ldlm_server_conn, 3, &request); + err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client, conn, 3); ptlrpc_free_req(request); CERROR("ldlm_cli_namespace_new: %d\n", err); if (err != ELDLM_OK) GOTO(out, err); - err = ldlm_cli_enqueue(ldlm->ldlm_client, ldlm->ldlm_server_conn, 3, + err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3, NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags, - NULL, 0, &lockh1, &request); + NULL, NULL, NULL, 0, &lockh1, &request); ptlrpc_free_req(request); CERROR("ldlm_cli_enqueue: %d\n", err); flags = 0; - err = ldlm_cli_enqueue(ldlm->ldlm_client, ldlm->ldlm_server_conn, 3, + err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, 3, NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags, - NULL, 0, &lockh2, &request); + NULL, NULL, NULL, 0, &lockh2, &request); ptlrpc_free_req(request); CERROR("ldlm_cli_enqueue: %d\n", err); @@ -177,7 +168,7 @@ static int ldlm_test_network(struct obd_device *obddev) return err; } -int ldlm_test(struct obd_device *obddev) +int ldlm_test(struct obd_device *obddev, struct ptlrpc_connection *conn) { int rc; rc = ldlm_test_basics(obddev); @@ -188,6 +179,6 @@ int ldlm_test(struct obd_device *obddev) if (rc) RETURN(rc); - rc = ldlm_test_network(obddev); + rc = ldlm_test_network(obddev, conn); RETURN(rc); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 084b842..d89ce93 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -17,31 +17,11 @@ */ #define EXPORT_SYMTAB - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_OSC -#include -#include -#include +#include +#include +#include #include static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl, @@ -575,25 +555,48 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, offset, flags); } -/* mount the file system (secretly) */ static int osc_setup(struct obd_device *obddev, obd_count len, void *buf) { struct osc_obd *osc = &obddev->u.osc; + __u32 ns_id; + int rc; ENTRY; + osc->osc_conn = ptlrpc_uuid_to_connection("ost"); + if (!osc->osc_conn) + RETURN(-EINVAL); + OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client)); if (osc->osc_client == NULL) - RETURN(-ENOMEM); + GOTO(out_conn, rc = -ENOMEM); + + OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); + if (osc->osc_ldlm_client == NULL) + GOTO(out_client, rc = -ENOMEM); ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, osc->osc_client); - - osc->osc_conn = ptlrpc_uuid_to_connection("ost"); - if (!osc->osc_conn) - RETURN(-EINVAL); + ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + osc->osc_ldlm_client); + + get_random_bytes(&ns_id, sizeof(ns_id)); + rc = ldlm_cli_namespace_new(obddev, osc->osc_ldlm_client, osc->osc_conn, + ns_id); + if (rc) { + CERROR("Couldn't create new namespace %u: %d\n", ns_id, rc); + GOTO(out_ldlm_client, rc); + } MOD_INC_USE_COUNT; RETURN(0); + + out_ldlm_client: + OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); + out_client: + OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); + out_conn: + ptlrpc_put_connection(osc->osc_conn); + return rc; } static int osc_cleanup(struct obd_device * obddev) @@ -601,6 +604,7 @@ static int osc_cleanup(struct obd_device * obddev) struct osc_obd *osc = &obddev->u.osc; OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); + OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); ptlrpc_put_connection(osc->osc_conn); MOD_DEC_USE_COUNT;