From 627f3b67348ba134e5025acfb741134be6ad5d06 Mon Sep 17 00:00:00 2001 From: pschwan Date: Mon, 17 Jun 2002 19:53:26 +0000 Subject: [PATCH] Many dlm and intent lock fixes. - initial unlink support - a DLM trace mode for debugging - serious DLM concurrency bugfixes - removed vestigial ``local callback'' code from ldlm - the beginnings of multi-RPC infrastructure --- lustre/include/linux/lustre_dlm.h | 35 +++++++++++---- lustre/include/linux/lustre_mds.h | 2 +- lustre/include/linux/lustre_net.h | 1 + lustre/ldlm/ldlm_lock.c | 90 ++++++++++++++++++++++++++++++--------- lustre/ldlm/ldlm_lockd.c | 51 +++++++++------------- lustre/ldlm/ldlm_request.c | 32 ++++++++++---- lustre/ldlm/ldlm_resource.c | 32 +++++++++----- lustre/ldlm/ldlm_test.c | 7 +-- lustre/lib/mds_updates.c | 3 +- lustre/llite/dcache.c | 12 +++--- lustre/llite/file.c | 3 +- lustre/llite/namei.c | 26 ++++++++++- lustre/llite/super.c | 15 ++++--- lustre/mdc/mdc_request.c | 35 +++++++++++++-- lustre/mds/handler.c | 12 ++++-- lustre/mds/mds_extN.c | 13 +++--- lustre/mds/mds_reint.c | 72 ++++++++++++++++++++++++++++++- lustre/osc/osc_request.c | 3 +- lustre/ost/ost_handler.c | 3 +- lustre/ptlrpc/client.c | 12 ++++-- 20 files changed, 344 insertions(+), 115 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index eec8407..875be13 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -78,11 +78,12 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) */ struct ldlm_namespace { - struct ptlrpc_client ns_client; /* used for revocation callbacks */ - __u32 ns_local; /* is this a local lock tree? */ + char *ns_name; + struct ptlrpc_client ns_rpc_client; /* used for revocation callbacks */ + __u32 ns_client; /* is this a client-side lock tree? */ struct list_head *ns_hash; /* hash table for ns */ __u32 ns_refcount; /* count of resources in the hash */ - struct list_head ns_root_list; /* all root resources in ns */ + struct list_head ns_root_list; /* all root resources in ns */ spinlock_t ns_lock; /* protects hash, refcount, list */ }; @@ -99,7 +100,8 @@ struct ldlm_namespace { struct ldlm_lock; typedef int (*ldlm_lock_callback)(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len); + void *data, __u32 data_len, + struct ptlrpc_request **req); struct ldlm_lock { struct ldlm_resource *l_resource; @@ -167,6 +169,7 @@ struct ldlm_resource { __u32 lr_version[RES_VERSION_SIZE]; __u32 lr_refcount; spinlock_t lr_lock; /* protects lists, refcount */ + void *lr_tmp; }; static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res) @@ -176,6 +179,23 @@ static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res) extern struct obd_ops ldlm_obd_ops; +#define LDLM_DEBUG(lock, format, a...) \ +do { \ + CDEBUG(D_DLMTRACE, "### " format \ + " (%s: lock %p mode %d/%d on res %Lu (rc %d) " \ + " type %d remote %Lx)\n", ## a, \ + lock->l_resource->lr_namespace->ns_name, lock, \ + lock->l_granted_mode, lock->l_req_mode, \ + lock->l_resource->lr_name[0], \ + lock->l_resource->lr_refcount, \ + lock->l_resource->lr_type, \ + lock->l_remote_handle.addr); \ +} while (0) + +#define LDLM_DEBUG_NOLOCK(format, a...) \ + CDEBUG(D_DLMTRACE, "### " format "\n", ## a); + + /* ldlm_extent.c */ int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *); int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *); @@ -211,10 +231,10 @@ void ldlm_lock_dump(struct ldlm_lock *lock); int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_new(__u32 local); +struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local); int ldlm_namespace_free(struct ldlm_namespace *ns); -/* resourc.c - internal */ +/* resource.c - internal */ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, __u64 *name, __u32 type, int create); @@ -241,12 +261,11 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, __u32 data_len, struct lustre_handle *lockh); int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len); + void *data, __u32 data_len, struct ptlrpc_request **reqp); int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, int new_mode, int *flags); int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_lock *); - #endif /* __KERNEL__ */ /* ioctls for trying requests */ diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 52b034a..9238982 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -134,7 +134,7 @@ void mds_pack_inode2body(struct mds_body *body, struct inode *inode); /* mds/handler.c */ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, int data_len); + void *data, int data_len, struct ptlrpc_request **req); int mds_reint(int offset, struct ptlrpc_request *req); /* mdc/mdc_request.c */ diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 139330c..30224e2 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -141,6 +141,7 @@ struct ptlrpc_client { struct ptlrpc_request { int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */ struct list_head rq_list; + struct list_head rq_multi; struct obd_device *rq_obd; int rq_status; int rq_flags; diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 25d2214..3ff3c7e 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -55,13 +55,16 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, struct ldlm_reply *rep; struct ldlm_namespace *ns = lock->l_resource->lr_namespace; __u32 type = lock->l_resource->lr_type; - __u64 new_resid[3] = {0, 0, 0}; + __u64 new_resid[3] = {0, 0, 0}, old_res; int bufcount, rc, size[3] = {sizeof(struct ldlm_reply), sizeof(struct mds_body), sizeof(struct obdo)}; it->opc = NTOH__u64(it->opc); + LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc); + + /* prepare reply */ switch(it->opc) { case IT_GETATTR: /* Note that in the negative case you may be returning @@ -76,8 +79,12 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, case IT_RENAME: bufcount = 3; break; - default: + case IT_UNLINK: bufcount = 2; + size[1] = sizeof(struct obdo); + break; + default: + LBUG(); } rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen, @@ -89,6 +96,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, rep = lustre_msg_buf(req->rq_repmsg, 0); rep->lock_policy_res1 = 1; + + /* execute policy */ switch ( it->opc ) { case IT_CREAT: case IT_CREAT|IT_OPEN: @@ -97,6 +106,7 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, case IT_SYMLINK: case IT_MKNOD: case IT_LINK: + case IT_UNLINK: case IT_RENAME2: if (mds_reint_p == NULL) mds_reint_p = @@ -140,13 +150,16 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, LBUG(); } + if (it->opc == IT_UNLINK || it->opc == IT_RMDIR) + RETURN(ELDLM_LOCK_ABORTED); + mds_rep = lustre_msg_buf(req->rq_repmsg, 1); rep->lock_policy_res2 = req->rq_status; new_resid[0] = mds_rep->ino; + old_res = lock->l_resource->lr_name[0]; CDEBUG(D_INFO, "remote intent: locking %d instead of" - "%ld\n", mds_rep->ino, - (long)lock->l_resource->lr_name[0]); + "%ld\n", mds_rep->ino, (long)old_res); ldlm_resource_put(lock->l_resource); lock->l_resource = @@ -155,6 +168,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, LBUG(); RETURN(-ENOMEM); } + LDLM_DEBUG(lock, "intent policy, old res %ld", + (long)old_res); RETURN(ELDLM_LOCK_CHANGED); } else { int size = sizeof(struct ldlm_reply); @@ -247,21 +262,25 @@ void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode) spin_unlock(&lock->l_lock); } -void ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new) +int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new) { + struct ptlrpc_request *req = NULL; ENTRY; spin_lock(&lock->l_lock); if (lock->l_flags & LDLM_FL_AST_SENT) { - EXIT; - return; + RETURN(0); } lock->l_flags |= LDLM_FL_AST_SENT; - spin_unlock(&lock->l_lock); - lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len); - EXIT; + lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req); + spin_unlock(&lock->l_lock); + if (req != NULL) { + struct list_head *list = lock->l_resource->lr_tmp; + list_add(&req->rq_multi, list); + } + RETURN(1); } /* Args: unlocked lock */ @@ -280,7 +299,7 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode) if (!lock->l_readers && !lock->l_writers && lock->l_flags & LDLM_FL_DYING) { /* Read this lock its rights. */ - if (!lock->l_resource->lr_namespace->ns_local) { + if (!lock->l_resource->lr_namespace->ns_client) { CERROR("LDLM_FL_DYING set on non-local lock!\n"); LBUG(); } @@ -288,14 +307,16 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode) CDEBUG(D_INFO, "final decref done on dying lock, " "calling callback.\n"); spin_unlock(&lock->l_lock); + /* This function pointer is unfortunately overloaded. This + * call will not result in an RPC. */ lock->l_blocking_ast(lock, NULL, lock->l_data, - lock->l_data_len); + lock->l_data_len, NULL); } else spin_unlock(&lock->l_lock); EXIT; } -/* Args: locked lock */ +/* Args: unlocked lock */ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs, struct list_head *queue) { @@ -325,6 +346,10 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs, CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n"); if (send_cbs && child->l_blocking_ast != NULL) { CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n"); + /* It's very difficult to actually send the AST from + * here, because we'd have to drop the lock before going + * to sleep to wait for the reply. Instead we build the + * packet and send it later. */ ldlm_send_blocking_ast(child, lock); } } @@ -358,8 +383,8 @@ void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) res->lr_most_restr = lock->l_granted_mode; if (lock->l_completion_ast) - lock->l_completion_ast(lock, NULL, - lock->l_data, lock->l_data_len); + lock->l_completion_ast(lock, NULL, lock->l_data, + lock->l_data_len, NULL); EXIT; } @@ -477,7 +502,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh, lock = lustre_handle2object(lockh); res = lock->l_resource; - local = res->lr_namespace->ns_local; + local = res->lr_namespace->ns_client; spin_lock(&res->lr_lock); lock->l_blocking_ast = blocking; @@ -492,7 +517,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh, res = lock->l_resource; *flags |= LDLM_FL_LOCK_CHANGED; } - if (rc < 0) { + if (rc == ELDLM_LOCK_ABORTED) { /* Abort. */ ldlm_resource_put(lock->l_resource); ldlm_lock_free(lock); @@ -562,6 +587,8 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res, struct ldlm_lock *pending; pending = list_entry(tmp, struct ldlm_lock, l_res_link); + CDEBUG(D_INFO, "Reprocessing lock %p\n", pending); + /* the resource lock protects ldlm_lock_compat */ if (ldlm_lock_compat(pending, 1)) RETURN(1); @@ -579,15 +606,37 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res, /* Must be called with resource->lr_lock not taken. */ void ldlm_reprocess_all(struct ldlm_resource *res) { + struct list_head rpc_list, *tmp, *pos; + + INIT_LIST_HEAD(&rpc_list); + /* Local lock trees don't get reprocessed. */ - if (res->lr_namespace->ns_local) + if (res->lr_namespace->ns_client) return; spin_lock(&res->lr_lock); + res->lr_tmp = &rpc_list; + ldlm_reprocess_queue(res, &res->lr_converting); if (list_empty(&res->lr_converting)) ldlm_reprocess_queue(res, &res->lr_waiting); + + res->lr_tmp = NULL; spin_unlock(&res->lr_lock); + + list_for_each_safe(tmp, pos, &rpc_list) { + int rc; + struct ptlrpc_request *req = + list_entry(tmp, struct ptlrpc_request, rq_multi); + + CDEBUG(D_INFO, "Sending callback.\n"); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + if (rc) + CERROR("Callback send failed: %d\n", rc); + } } /* Must be called with lock and lock->l_resource unlocked */ @@ -632,7 +681,7 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh, list_del_init(&lock->l_res_link); /* If this is a local resource, put it on the appropriate list. */ - if (res->lr_namespace->ns_local) { + if (res->lr_namespace->ns_client) { if (*flags & LDLM_FL_BLOCK_CONV) ldlm_resource_add_lock(res, res->lr_converting.prev, lock); @@ -670,7 +719,8 @@ void ldlm_lock_dump(struct ldlm_lock *lock) CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver); CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent); - CDEBUG(D_OTHER, " Resource: %p\n", lock->l_resource); + CDEBUG(D_OTHER, " Resource: %p (%Ld)\n", lock->l_resource, + lock->l_resource->lr_name[0]); CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n", (int)lock->l_req_mode, (int)lock->l_granted_mode); CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n", diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index dcd6943..3e1cc75 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,21 +21,11 @@ extern kmem_cache_t *ldlm_lock_slab; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); -#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000)) - -static int is_local_conn(struct ptlrpc_connection *conn) -{ - ENTRY; - if (conn == NULL) - RETURN(1); - - RETURN(LOOPBACK(conn->c_peer.peer_nid)); -} - /* _ldlm_callback and local_callback setup the variables then call this common * code */ static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - ldlm_mode_t mode, void *data, __u32 data_len) + ldlm_mode_t mode, void *data, __u32 data_len, + struct ptlrpc_request **reqp) { ENTRY; @@ -66,7 +56,7 @@ static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new, "callback (%p).\n", lock->l_blocking_ast); if (lock->l_blocking_ast != NULL) lock->l_blocking_ast(lock, new, lock->l_data, - lock->l_data_len); + lock->l_data_len, reqp); } else { CDEBUG(D_INFO, "Lock still has references; lock will be" " cancelled later.\n"); @@ -75,17 +65,6 @@ static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new, RETURN(0); } -/* FIXME: I think that this is no longer necessary. */ -static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new, - void *data, __u32 data_len) -{ - struct ldlm_lock *lock; - /* the 'remote handle' is the lock in the FS's namespace */ - lock = lustre_handle2object(&l->l_remote_handle); - - return common_callback(lock, new, l->l_granted_mode, data, data_len); -} - static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req) { @@ -100,11 +79,7 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, void *cookie; ENTRY; - /* Is this lock managed locally? */ - if (is_local_conn(req->rq_connection)) - callback = local_callback; - else - callback = ldlm_cli_callback; + callback = ldlm_cli_callback; dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) { @@ -134,6 +109,9 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, if (err != ELDLM_OK) GOTO(out, err); + lock = lustre_handle2object(&lockh); + LDLM_DEBUG(lock, "server-side enqueue handler"); + flags = dlm_req->lock_flags; err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags, callback, callback); @@ -144,7 +122,6 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, dlm_rep->lock_flags = flags; memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh)); - lock = lustre_handle2object(&lockh); if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) memcpy(&dlm_rep->lock_extent, &lock->l_extent, sizeof(lock->l_extent)); @@ -174,6 +151,7 @@ static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req) struct ldlm_request *dlm_req; struct ldlm_reply *dlm_rep; struct ldlm_resource *res; + struct ldlm_lock *lock; int rc, size = sizeof(*dlm_rep); ENTRY; @@ -186,6 +164,9 @@ static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req) dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); dlm_rep->lock_flags = dlm_req->lock_flags; + lock = lustre_handle2object(&dlm_req->lock_handle1); + LDLM_DEBUG(lock, "server-side convert handler"); + res = ldlm_local_lock_convert(&dlm_req->lock_handle1, dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_flags); @@ -214,6 +195,7 @@ static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req) dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); lock = lustre_handle2object(&dlm_req->lock_handle1); + LDLM_DEBUG(lock, "server-side cancel handler"); res = ldlm_local_lock_cancel(lock); req->rq_status = 0; if (ptlrpc_reply(svc, req) != 0) @@ -249,8 +231,15 @@ static int _ldlm_callback(struct ptlrpc_service *svc, lock1 = lustre_handle2object(&dlm_req->lock_handle1); lock2 = lustre_handle2object(&dlm_req->lock_handle2); + LDLM_DEBUG(lock1, "client %s callback handler START", + lock2 == NULL ? "completion" : "blocked"); + common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL, - 0); + 0, NULL); + + LDLM_DEBUG_NOLOCK("client %s callback handler END", + lock2 == NULL ? "completion" : "blocked"); + RETURN(0); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 074d60e..5f566ad 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -40,6 +40,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, GOTO(out, rc); lock = lustre_handle2object(lockh); + LDLM_DEBUG(lock, "client-side enqueue START"); if (req == NULL) { req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL); @@ -80,6 +81,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, ldlm_resource_put(lock->l_resource); spin_unlock(&lock->l_resource->lr_lock); ldlm_lock_free(lock); + if (rc == ELDLM_LOCK_ABORTED) + rc = 0; GOTO(out, rc); } @@ -111,6 +114,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, LBUG(); RETURN(-ENOMEM); } + LDLM_DEBUG(lock, "client-side enqueue, new resource"); } if (!req_passed_in) @@ -119,6 +123,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback, callback); + LDLM_DEBUG(lock, "client-side enqueue END"); if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV)) { /* Go to sleep until the lock is granted. */ @@ -136,16 +141,17 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, } int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len) + void *data, __u32 data_len, struct ptlrpc_request **reqp) { struct ldlm_request *body; struct ptlrpc_request *req; - struct ptlrpc_client *cl = &lock->l_resource->lr_namespace->ns_client; + struct ptlrpc_client *cl = + &lock->l_resource->lr_namespace->ns_rpc_client; int rc, size = sizeof(*body); ENTRY; - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, &size, - NULL); + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, + &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -162,11 +168,18 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, ldlm_object2handle(new, &body->lock_handle2); } + LDLM_DEBUG(lock, "server preparing %s AST", + new == NULL ? "completion" : "blocked"); + req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); + if (reqp == NULL) { + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + } else { + *reqp = req; + } EXIT; out: @@ -187,6 +200,8 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, lock = lustre_handle2object(lockh); *flags = 0; + LDLM_DEBUG(lock, "client-side convert"); + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size, NULL); if (!req) @@ -228,12 +243,13 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock) { - struct ldlm_request *body; struct ptlrpc_request *req; + struct ldlm_request *body; struct ldlm_resource *res; int rc, size = sizeof(*body); ENTRY; + LDLM_DEBUG(lock, "client-side cancel"); req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size, NULL); if (!req) diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 99d00b8..67fdf88 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -15,7 +15,7 @@ kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab; -struct ldlm_namespace *ldlm_namespace_new(__u32 local) +struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) { struct ldlm_namespace *ns = NULL; struct list_head *bucket; @@ -32,14 +32,20 @@ struct ldlm_namespace *ldlm_namespace_new(__u32 local) GOTO(out, ns); } - ptlrpc_init_client(NULL, NULL, - LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - &ns->ns_client); + OBD_ALLOC(ns->ns_name, strlen(name) + 1); + if (!ns->ns_name) { + LBUG(); + GOTO(out, ns); + } + strcpy(ns->ns_name, name); + + ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + &ns->ns_rpc_client); INIT_LIST_HEAD(&ns->ns_root_list); ns->ns_lock = SPIN_LOCK_UNLOCKED; ns->ns_refcount = 0; - ns->ns_local = local; + ns->ns_client = client; for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash; bucket--) @@ -49,6 +55,8 @@ struct ldlm_namespace *ldlm_namespace_new(__u32 local) out: if (ns && ns->ns_hash) vfree(ns->ns_hash); + if (ns && ns->ns_name) + OBD_FREE(ns->ns_name, strlen(name) + 1); if (ns) OBD_FREE(ns, sizeof(*ns)); return NULL; @@ -57,7 +65,7 @@ struct ldlm_namespace *ldlm_namespace_new(__u32 local) static int cleanup_resource(struct ldlm_resource *res, struct list_head *q) { struct list_head *tmp, *pos; - int rc = 0, client = res->lr_namespace->ns_local; + int rc = 0, client = res->lr_namespace->ns_client; ENTRY; list_for_each_safe(tmp, pos, q) { @@ -117,7 +125,8 @@ int ldlm_namespace_free(struct ldlm_namespace *ns) } vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */); - ptlrpc_cleanup_client(&ns->ns_client); + ptlrpc_cleanup_client(&ns->ns_rpc_client); + OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1); OBD_FREE(ns, sizeof(*ns)); return ELDLM_OK; @@ -310,8 +319,10 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, /* Must be called with resource->lr_lock taken */ void ldlm_resource_del_lock(struct ldlm_lock *lock) { - list_del_init(&lock->l_res_link); - lock->l_resource->lr_refcount--; + if (!list_empty(&lock->l_res_link)) { + list_del_init(&lock->l_res_link); + lock->l_resource->lr_refcount--; + } } int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h) @@ -341,7 +352,8 @@ void ldlm_resource_dump(struct ldlm_resource *res) (unsigned long long)res->lr_name[2]); CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name); - CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace); + CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace, + res->lr_namespace->ns_name); CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root); CDEBUG(D_OTHER, "Granted locks:\n"); diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index c110553..554e203 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -14,7 +14,8 @@ #include static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len) + void *data, __u32 data_len, + struct ptlrpc_request **reqp) { printk("ldlm_test_callback: lock=%p, new=%p\n", lock, new); return 0; @@ -29,7 +30,7 @@ int ldlm_test_basics(struct obd_device *obddev) struct lustre_handle lockh_1, lockh_2; int flags; - ns = ldlm_namespace_new(LDLM_NAMESPACE_SERVER); + ns = ldlm_namespace_new("test_server", LDLM_NAMESPACE_SERVER); if (ns == NULL) LBUG(); @@ -75,7 +76,7 @@ int ldlm_test_extents(struct obd_device *obddev) ldlm_error_t err; int flags; - ns = ldlm_namespace_new(LDLM_NAMESPACE_SERVER); + ns = ldlm_namespace_new("test_server", LDLM_NAMESPACE_SERVER); if (ns == NULL) LBUG(); diff --git a/lustre/lib/mds_updates.c b/lustre/lib/mds_updates.c index 70e6a40..e7f192c 100644 --- a/lustre/lib/mds_updates.c +++ b/lustre/lib/mds_updates.c @@ -189,7 +189,8 @@ void mds_unlink_pack(struct ptlrpc_request *req, int offset, rec->ul_opcode = HTON__u32(REINT_UNLINK); ll_inode2fid(&rec->ul_fid1, inode); - ll_inode2fid(&rec->ul_fid2, child); + if (child) + ll_inode2fid(&rec->ul_fid2, child); tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1); LOGL0(name, namelen, tmp); diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index bbdd884..cbe154b 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -30,11 +30,13 @@ void ll_intent_release(struct dentry *de) return; } - handle = (struct lustre_handle *)de->d_it->it_lock_handle; - lock = lustre_handle2object(handle); - CDEBUG(D_INFO, "calling ldlm_lock_decref(%p, %d)\n", lock, - de->d_it->it_lock_mode); - ldlm_lock_decref(lock, de->d_it->it_lock_mode); + if (de->d_it->it_lock_mode) { + handle = (struct lustre_handle *)de->d_it->it_lock_handle; + lock = lustre_handle2object(handle); + CDEBUG(D_INFO, "calling ldlm_lock_decref(%p, %d)\n", lock, + de->d_it->it_lock_mode); + ldlm_lock_decref(lock, de->d_it->it_lock_mode); + } de->d_it = NULL; EXIT; } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index aa16333..1382b9c 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -196,7 +196,8 @@ static void ll_update_atime(struct inode *inode) } static int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len) + void *data, __u32 data_len, + struct ptlrpc_request **reqp) { struct inode *inode = lock->l_data; ENTRY; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 5d23512..1dba622 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -105,10 +105,11 @@ int ll_lock(struct inode *dir, struct dentry *dentry, err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, LCK_PW, dir, dentry, lockh, 0, NULL, 0, dir, sizeof(*dir)); - else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN)) + else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK)) err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, LCK_PR, dir, dentry, lockh, 0, NULL, 0, dir, sizeof(*dir)); + else LBUG(); @@ -152,7 +153,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, it->it_disposition && !it->it_status) GOTO(negative, NULL); - if ( (it->it_op & (IT_GETATTR)) && + if ( (it->it_op & (IT_GETATTR | IT_UNLINK)) && it->it_disposition && it->it_status) GOTO(negative, NULL); @@ -170,6 +171,21 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, RETURN(ERR_PTR(-abs(err))); } offset = 0; + } else if (it->it_op == IT_UNLINK) { + struct obdo *obdo; + request = (struct ptlrpc_request *)it->it_data; + obdo = lustre_msg_buf(request->rq_repmsg, 1); + inode = new_inode(dir->i_sb); + ll_i2info(inode)->lli_obdo = obdo_alloc(); + + /* XXX fix mem allocation error */ + memcpy(ll_i2info(inode)->lli_obdo, obdo, sizeof(*obdo)); + + if (!inode) + GOTO(out_req, -ENOMEM); + inode->i_mode= S_IFREG; + inode->i_nlink = 1; + GOTO(out_req, 0); } else { struct mds_body *body; @@ -196,6 +212,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, if (it->it_op & IT_RENAME) it->it_data = dentry; + out_req: ptlrpc_free_req(request); if (!inode) RETURN(ERR_PTR(-ENOMEM)); @@ -517,6 +534,11 @@ static int ll_unlink(struct inode * dir, struct dentry *dentry) struct page * page; int err = -ENOENT; + if (dentry->d_it && dentry->d_it->it_disposition) { + inode->i_nlink = 0; + GOTO(out, err=0); + } + de = ext2_find_entry (dir, dentry, &page); if (!de) goto out; diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 434176a..1f92883 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -257,19 +257,20 @@ static void ll_clear_inode(struct inode *inode) static void ll_delete_inode(struct inode *inode) { if (S_ISREG(inode->i_mode)) { - int err; - struct obdo *oa; + int err; + struct obdo *oa; oa = ll_i2info(inode)->lli_obdo; - if (!oa) { - CERROR("no memory\n"); - GOTO(out, -ENOMEM); - } + if (!oa) + GOTO(out, -EINVAL); + + if (oa->o_id == 0) + /* No obdo was ever created */ + GOTO(out, 0); err = obd_destroy(ll_i2obdconn(inode), oa); CDEBUG(D_INODE, "obd destroy of %Ld error %d\n", (unsigned long long)oa->o_id, err); - obdo_free(oa); } out: clear_inode(inode); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 9830405..924629e 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -125,7 +125,8 @@ int mdc_getattr(struct obd_conn *conn, } static int mdc_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, int data_len) + void *data, int data_len, + struct ptlrpc_request **req) { int rc; struct inode *inode = data; @@ -173,6 +174,10 @@ int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it, struct ldlm_intent *lit; ENTRY; +#warning FIXME: Andreas, the sgid directory stuff also goes here, but check again on mds + + LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino); + switch (it->it_op) { case IT_MKDIR: it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask; @@ -235,6 +240,24 @@ int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it, size[0] = sizeof(struct ldlm_reply); size[1] = sizeof(struct mds_body); req->rq_replen = lustre_msg_size(2, size); + } else if ( it->it_op == IT_UNLINK ) { + size[2] = sizeof(struct mds_rec_unlink); + size[3] = de->d_name.len + 1; + req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn, + LDLM_ENQUEUE, 4, size, NULL); + if (!req) + RETURN(-ENOMEM); + + /* pack the intent */ + lit = lustre_msg_buf(req->rq_reqmsg, 1); + lit->opc = NTOH__u64((__u64)it->it_op); + + /* pack the intended request */ + mds_unlink_pack(req, 2, dir, NULL, de->d_name.name, + de->d_name.len); + size[0] = sizeof(struct ldlm_reply); + size[1] = sizeof(struct obdo); + req->rq_replen = lustre_msg_size(2, size); } else if ( it->it_op == IT_GETATTR || it->it_op == IT_RENAME || it->it_op == IT_OPEN ) { size[2] = sizeof(struct mds_body); @@ -293,8 +316,10 @@ int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it, obddev->obd_namespace, NULL, res_id, lock_type, NULL, 0, lock_mode, &flags, (void *)mdc_lock_callback, data, datalen, lockh); - - if (rc != 0) { + if (rc == -ENOENT) { + lock_mode = 0; + memset(lockh, 0, sizeof(*lockh)); + } else if (rc != 0) { CERROR("ldlm_cli_enqueue: %d\n", rc); RETURN(rc); } @@ -316,6 +341,7 @@ int mdc_open(struct obd_conn *conn, ino_t ino, int type, int flags, struct mds_body *body; int rc, size[2] = {sizeof(*body)}, bufcount = 1; struct ptlrpc_request *req; + ENTRY; if (obdo != NULL) { bufcount = 2; @@ -686,7 +712,8 @@ static int mdc_connect(struct obd_conn *conn) ENTRY; - conn->oc_dev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT); + conn->oc_dev->obd_namespace = + ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT); if (conn->oc_dev->obd_namespace == NULL) RETURN(-ENOMEM); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 36c3448..4e1d8ec 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -260,7 +260,7 @@ static int mds_disconnect(struct mds_obd *mds, struct ptlrpc_request *req) } int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, int data_len) + void *data, int data_len, struct ptlrpc_request **reqp) { ENTRY; @@ -317,6 +317,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &lockh); if (rc == 0) { + LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, NULL, mds->mds_local_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, @@ -326,6 +327,9 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) CERROR("lock enqueue: err: %d\n", rc); GOTO(out_create_de, rc = -EIO); } + } else { + lock = lustre_handle2object(&lockh); + LDLM_DEBUG(lock, "matched"); } ldlm_lock_dump((void *)(unsigned long)lockh.addr); @@ -882,14 +886,16 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_thread, rc); } - obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER); + obddev->obd_namespace = + ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER); if (obddev->obd_namespace == NULL) { LBUG(); mds_cleanup(obddev); GOTO(err_thread, rc); } - mds->mds_local_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT); + mds->mds_local_namespace = + ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT); if (mds->mds_local_namespace == NULL) { LBUG(); mds_cleanup(obddev); diff --git a/lustre/mds/mds_extN.c b/lustre/mds/mds_extN.c index 30f4d51..55ce734 100644 --- a/lustre/mds/mds_extN.c +++ b/lustre/mds/mds_extN.c @@ -100,20 +100,23 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle, static int mds_extN_set_obdo(struct inode *inode, void *handle, struct obdo *obdo) { - struct mds_objid *data = (struct mds_objid *)obdo->o_inline; + struct mds_objid *data; int rc; - data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC); + lock_kernel(); down(&inode->i_sem); if (obdo == NULL) rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0); - else + else { + data = (struct mds_objid *)obdo->o_inline; + data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC); rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, XATTR_LUSTRE_MDS_OBJID, obdo->o_inline, OBD_INLINESZ, XATTR_CREATE); + } up(&inode->i_sem); unlock_kernel(); @@ -139,8 +142,8 @@ static int mds_extN_get_obdo(struct inode *inode, struct obdo *obdo) unlock_kernel(); if (rc < 0) { - CERROR("error getting EA %s from MDS inode %ld: rc = %d\n", - XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc); + CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: " + "rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc); obdo->o_id = 0; } else if (data->mo_magic != cpu_to_le64(XATTR_MDS_MO_MAGIC)) { CERROR("MDS object id %Ld has bad magic %Lx\n", diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 6b95585..b2ab3c3 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -220,6 +220,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &lockh); if (rc == 0) { + LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, NULL, mds->mds_local_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, @@ -229,6 +230,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, CERROR("lock enqueue: err: %d\n", rc); GOTO(out_create_de, rc = -EIO); } + } else { + lock = lustre_handle2object(&lockh); + LDLM_DEBUG(lock, "matched"); } ldlm_lock_dump((void *)(unsigned long)lockh.addr); @@ -245,7 +249,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, struct mds_body *body; struct obdo *obdo; struct inode *inode = dchild->d_inode; - CERROR("child exists (dir %ld, name %s, ino %ld)\n", + CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n", dir->i_ino, rec->ur_name, dchild->d_inode->i_ino); body = lustre_msg_buf(req->rq_repmsg, offset); @@ -373,7 +377,12 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, struct dentry *de = NULL; struct dentry *dchild = NULL; struct mds_obd *mds = &req->rq_obd->u.mds; + struct obdo *obdo; struct inode *dir, *inode; + int lock_mode, flags; + __u64 res_id[3] = {0}; + struct lustre_handle lockh; + struct ldlm_lock *lock; void *handle; int rc = 0; int err; @@ -384,8 +393,30 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, LBUG(); GOTO(out_unlink, rc = -ESTALE); } + dir = de->d_inode; CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino); + lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; + res_id[0] = dir->i_ino; + + rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, + NULL, 0, lock_mode, &lockh); + if (rc == 0) { + LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); + rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, + NULL, mds->mds_local_namespace, NULL, + res_id, LDLM_PLAIN, NULL, 0, lock_mode, + &flags, (void *)mds_lock_callback, NULL, + 0, &lockh); + if (rc != ELDLM_OK) { + CERROR("lock enqueue: err: %d\n", rc); + GOTO(out_unlink_de, rc = -EIO); + } + } else { + lock = lustre_handle2object(&lockh); + LDLM_DEBUG(lock, "matched"); + } + ldlm_lock_dump((void *)(unsigned long)lockh.addr); down(&dir->i_sem); dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1); @@ -403,6 +434,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, GOTO(out_unlink_dchild, rc = -ESTALE); } +#if 0 /* in intent case the client doesn't have the inode */ if (inode->i_ino != rec->ur_fid2->id) { CERROR("inode and FID ID do not match (%ld != %Ld)\n", inode->i_ino, rec->ur_fid2->id); @@ -415,6 +447,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, LBUG(); GOTO(out_unlink_dchild, rc = -ESTALE); } +#endif OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev); @@ -426,6 +459,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, rc = vfs_rmdir(dir, dchild); break; default: + if (offset) { + obdo = lustre_msg_buf(req->rq_repmsg, 1); + rc = mds_fs_get_obdo(mds, inode, obdo); + if (rc < 0) + CDEBUG(D_INFO, "No obdo for ino %ld err %d\n", + inode->i_ino, rc); + } + handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK); if (!handle) GOTO(out_unlink_dchild, rc = PTR_ERR(handle)); @@ -444,10 +485,39 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, EXIT; out_unlink_dchild: + res_id[0] = inode->i_ino; l_dput(dchild); out_unlink_de: up(&dir->i_sem); + lock = lustre_handle2object(&lockh); + ldlm_lock_decref(lock, lock_mode); + if (!rc) { + /* Take an exclusive lock on the resource that we're + * about to free, to force everyone to drop their + * locks. */ + LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]); + rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, + NULL, mds->mds_local_namespace, NULL, + res_id, + LDLM_PLAIN, NULL, 0, LCK_EX, &flags, + (void *)mds_lock_callback, NULL, 0, + &lockh); + if (rc) + CERROR("failed to get child inode lock (child ino %Ld, " + "dir ino %ld)\n", + res_id[0], de->d_inode->i_ino); + } + l_dput(de); + + if (!rc) { + lock = lustre_handle2object(&lockh); + ldlm_lock_decref(lock, LCK_EX); + if (ldlm_cli_cancel(lock->l_client, lock)) + CERROR("failed to get child inode lock ino %Ld\n", + res_id[0]); + } + out_unlink: req->rq_status = rc; return 0; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 7590bfd..82a28eb 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -755,7 +755,8 @@ static int osc_setup(struct obd_device *obddev, obd_count len, void *buf) if (!osc->osc_conn) RETURN(-ENOENT); - obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT); + obddev->obd_namespace = + ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT); if (obddev->obd_namespace == NULL) GOTO(out_conn, rc = -ENOMEM); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 79592f6..386cd7e 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -630,7 +630,8 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(error_dec, err = -EINVAL); } - obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER); + obddev->obd_namespace = + ldlm_namespace_new("ost", LDLM_NAMESPACE_SERVER); if (obddev->obd_namespace == NULL) LBUG(); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 01bee46..61ca919 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -92,6 +92,7 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn) OBD_ALLOC(bulk, sizeof(*bulk)); if (bulk != NULL) { bulk->b_connection = ptlrpc_connection_addref(conn); + atomic_set(&bulk->b_pages_remaining, 0); init_waitqueue_head(&bulk->b_waitq); INIT_LIST_HEAD(&bulk->b_page_list); } @@ -183,6 +184,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, request->rq_reqmsg->target_id = HTON__u32(cl->cli_target_devno); INIT_LIST_HEAD(&request->rq_list); + INIT_LIST_HEAD(&request->rq_multi); /* this will be dec()d once in req_finished, once in free_committed */ atomic_set(&request->rq_refcount, 2); @@ -232,7 +234,7 @@ void ptlrpc_free_req(struct ptlrpc_request *request) } ptlrpc_put_connection(request->rq_connection); - + list_del(&request->rq_multi); OBD_FREE(request, sizeof(*request)); EXIT; } @@ -314,8 +316,12 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err) } if (req->rq_repmsg->status != 0) { - CERROR("req->rq_repmsg->status is %d\n", - req->rq_repmsg->status); + if (req->rq_repmsg->status < 0) + CERROR("req->rq_repmsg->status is %d\n", + req->rq_repmsg->status); + else + CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", + req->rq_repmsg->status); /* XXX: translate this error from net to host */ RETURN(req->rq_repmsg->status); } -- 1.8.3.1