From: pschwan Date: Tue, 6 Aug 2002 02:02:13 +0000 (+0000) Subject: - Grammatical, LDLM updates to network.lyx X-Git-Tag: 0.5.3~14 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=26f846563602a3275472079c2a360a88ac4db514;p=fs%2Flustre-release.git - Grammatical, LDLM updates to network.lyx - dlm cleanups, bugfixes - there are now two totally separate DLM callback packets and functions - beginnings of ldlm /proc bits - the lock server can now change the lock mode, in addition to the resource - fix one part of the open() object creation race by adding a semaphore that's taken in ll_file_open - some cleanup in obdclass/proc_lustre.c; much more is needed --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index cf1e79b..f8b4c21 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -7,6 +7,7 @@ #ifdef __KERNEL__ +#include #include #include @@ -80,13 +81,15 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) */ struct ldlm_namespace { - char *ns_name; - struct ptlrpc_client ns_rpc_client; /* used for revocation callbacks */ - __u32 ns_client; /* is this a client-side lock tree? */ - struct list_head *ns_hash; /* hash table for ns */ - __u32 ns_refcount; /* count of resources in the hash */ - struct list_head ns_root_list; /* all root resources in ns */ - struct lustre_lock ns_lock; /* protects hash, refcount, list */ + char *ns_name; + struct ptlrpc_client ns_rpc_client;/* used for revocation callbacks */ + __u32 ns_client; /* is this a client-side lock tree? */ + struct list_head *ns_hash; /* hash table for ns */ + __u32 ns_refcount; /* count of resources in the hash */ + struct list_head ns_root_list; /* all root resources in ns */ + struct lustre_lock ns_lock; /* protects hash, refcount, list */ + struct list_head ns_list_chain; /* position in global NS list */ + struct proc_dir_entry *ns_proc_dir; }; /* @@ -101,9 +104,9 @@ struct ldlm_namespace { struct ldlm_lock; -typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh, - struct ldlm_lock_desc *new, void *data, - __u32 data_len); +typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + __u32 data_len); typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); @@ -120,7 +123,7 @@ struct ldlm_lock { ldlm_mode_t l_granted_mode; ldlm_completion_callback l_completion_ast; - ldlm_lock_callback l_blocking_ast; + ldlm_blocking_callback l_blocking_ast; struct ptlrpc_connection *l_connection; struct ptlrpc_client *l_client; @@ -160,10 +163,10 @@ extern ldlm_res_policy ldlm_res_policy_table []; struct ldlm_resource { struct ldlm_namespace *lr_namespace; struct list_head lr_hash; - struct list_head lr_rootlink; /* link all root resources in NS */ struct ldlm_resource *lr_parent; /* 0 for a root resource */ struct list_head lr_children; /* list head for child resources */ - struct list_head lr_childof; /* part of child list of parent */ + struct list_head lr_childof; /* part of ns_root_list if root res, + * part of lr_children if child */ struct list_head lr_granted; struct list_head lr_converting; @@ -275,7 +278,7 @@ ldlm_lock_create(struct ldlm_namespace *ns, ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie, int cookie_len, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback blocking); + ldlm_blocking_callback blocking); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int *flags); void ldlm_lock_cancel(struct ldlm_lock *lock); @@ -289,6 +292,8 @@ int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn); /* resource.c */ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local); int ldlm_namespace_free(struct ldlm_namespace *ns); +int ldlm_proc_setup(struct obd_device *obd); +void ldlm_proc_cleanup(struct obd_device *obd); /* resource.c - internal */ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, @@ -300,8 +305,10 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock); void ldlm_resource_unlink_lock(struct ldlm_lock *lock); void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc); -void ldlm_resource_dump(struct ldlm_resource *res); -int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]); +void ldlm_dump_all_namespaces(void); +void ldlm_namespace_dump(struct ldlm_namespace *); +void ldlm_resource_dump(struct ldlm_resource *); +int ldlm_lock_change_resource(struct ldlm_lock *, __u64 new_resid[3]); /* ldlm_request.c */ int ldlm_completion_ast(struct ldlm_lock *lock, int flags); @@ -315,7 +322,7 @@ int ldlm_cli_enqueue(struct lustre_handle *conn, ldlm_mode_t mode, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback callback, + ldlm_blocking_callback callback, void *data, __u32 data_len, struct lustre_handle *lockh); @@ -329,7 +336,7 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh, ldlm_mode_t mode, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback callback, + ldlm_blocking_callback callback, void *data, __u32 data_len, struct lustre_handle *lockh); @@ -338,6 +345,11 @@ int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags); int ldlm_cli_cancel(struct lustre_handle *lockh); +/* mds/handler.c */ +/* This has to be here because recurisve inclusion sucks. */ +int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, __u32 data_len); + #endif /* __KERNEL__ */ /* ioctls for trying requests */ @@ -345,6 +357,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh); #define IOC_LDLM_MIN_NR 40 #define IOC_LDLM_TEST _IOWR('f', 40, long) +#define IOC_LDLM_DUMP _IOWR('f', 41, long) #define IOC_LDLM_MAX_NR 41 #endif diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 4e14155..23f25c6 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -165,7 +165,6 @@ typedef uint32_t obd_count; #define OBD_FL_INLINEDATA (0x00000001) #define OBD_FL_OBDMDEXISTS (0x00000002) -#define OBD_FL_CREATEONOPEN (0x00000004) #define OBD_INLINESZ 60 @@ -422,7 +421,8 @@ struct lov_desc { #define LDLM_ENQUEUE 101 #define LDLM_CONVERT 102 #define LDLM_CANCEL 103 -#define LDLM_CALLBACK 104 +#define LDLM_BL_CALLBACK 104 +#define LDLM_CP_CALLBACK 105 #define RES_NAME_SIZE 3 #define RES_VERSION_SIZE 4 @@ -470,6 +470,7 @@ struct ldlm_request { struct ldlm_reply { __u32 lock_flags; __u64 lock_resource_name[RES_NAME_SIZE]; + __u32 lock_mode; struct lustre_handle lock_handle; struct ldlm_extent lock_extent; /* XXX make this policy 1 &2 */ __u64 lock_policy_res1; diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 14ff974..02b4c6e 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -42,14 +42,10 @@ struct ll_inode_md { #define LL_INLINESZ 60 struct ll_inode_info { - int lli_flags; - // struct obdo *lli_obdo; - struct lov_stripe_md *lli_smd; - // int lli_obdo_mdsz; - //void *lli_obdo_md; - char *lli_symlink_name; - char lli_inline[LL_INLINESZ]; - struct lustre_handle lli_intent_lock_handle; + struct lov_stripe_md *lli_smd; + char *lli_symlink_name; + struct lustre_handle lli_intent_lock_handle; + struct semaphore lli_open_sem; }; #define LL_SUPER_MAGIC 0x0BD00BD0 @@ -101,12 +97,6 @@ static inline struct ll_inode_info *ll_i2info(struct inode *inode) return (struct ll_inode_info *)&(inode->u.generic_ip); } -static inline int ll_has_inline(struct inode *inode) -{ - return (ll_i2info(inode)->lli_flags & OBD_FL_INLINEDATA); -} - - static inline struct lustre_handle *ll_i2obdconn(struct inode *inode) { return ll_s2obdconn(inode->i_sb); diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 34c90de..175c52b 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -134,8 +134,6 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct lustre_handle *lockh); struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); -int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, int data_len, struct ptlrpc_request **req); int mds_reint(int offset, struct ptlrpc_request *req); /* mdc/mdc_request.c */ diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index cc58695..d2553bbc 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -12,6 +12,7 @@ #include #include #include +#include #include diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 887cfb9..3c0a9e1 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -83,7 +83,8 @@ extern unsigned long obd_fail_loc; #define OBD_FAIL_LDLM_ENQUEUE 0x302 #define OBD_FAIL_LDLM_CONVERT 0x303 #define OBD_FAIL_LDLM_CANCEL 0x304 -#define OBD_FAIL_LDLM_CALLBACK 0x305 +#define OBD_FAIL_LDLM_BL_CALLBACK 0x305 +#define OBD_FAIL_LDLM_CP_CALLBACK 0x306 /* preparation for a more advanced failure testbed (not functional yet) */ #define OBD_FAIL_MASK_SYS 0x0000FF00 diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index c10d560..6eb1e60 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -21,7 +21,7 @@ /* lock types */ char *ldlm_lockname[] = { - [0] "--", + [0] "--", [LCK_EX] "EX", [LCK_PW] "PW", [LCK_PR] "PR", @@ -30,30 +30,30 @@ char *ldlm_lockname[] = { [LCK_NL] "NL" }; char *ldlm_typename[] = { - [LDLM_PLAIN] "PLN", - [LDLM_EXTENT] "EXT", + [LDLM_PLAIN] "PLN", + [LDLM_EXTENT] "EXT", [LDLM_MDSINTENT] "INT" }; char *ldlm_it2str(int it) { - switch (it) { + switch (it) { case IT_OPEN: return "open"; case IT_CREAT: return "creat"; - case (IT_OPEN|IT_CREAT): + case (IT_OPEN | IT_CREAT): return "open|creat"; case IT_MKDIR: return "mkdir"; case IT_LINK: - return "link"; + return "link"; case IT_SYMLINK: return "symlink"; case IT_UNLINK: return "unlink"; case IT_RMDIR: - return "rmdir"; + return "rmdir"; case IT_RENAME: return "rename"; case IT_RENAME2: @@ -71,7 +71,7 @@ char *ldlm_it2str(int it) case IT_LOOKUP: return "lookup"; default: - CERROR("Unknown intent %d\n", it); + CERROR("Unknown intent %d\n", it); return "UNKNOWN"; } } @@ -80,20 +80,20 @@ extern kmem_cache_t *ldlm_lock_slab; static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b); -ldlm_res_compat ldlm_res_compat_table [] = { +ldlm_res_compat ldlm_res_compat_table[] = { [LDLM_PLAIN] ldlm_plain_compat, [LDLM_EXTENT] ldlm_extent_compat, [LDLM_MDSINTENT] ldlm_plain_compat }; -ldlm_res_policy ldlm_res_policy_table [] = { +ldlm_res_policy ldlm_res_policy_table[] = { [LDLM_PLAIN] NULL, [LDLM_EXTENT] ldlm_extent_policy, [LDLM_MDSINTENT] NULL }; -void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie, - ldlm_mode_t mode, void *data)) +void ldlm_register_intent(int (*arg) (struct ldlm_lock * lock, void *req_cookie, + ldlm_mode_t mode, void *data)) { ldlm_res_policy_table[LDLM_MDSINTENT] = arg; } @@ -237,8 +237,16 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) ENTRY; l_lock(&ns->ns_lock); - type = lock->l_resource->lr_type; + if (memcmp(new_resid, lock->l_resource->lr_name, + sizeof(lock->l_resource->lr_name)) == 0) { + /* Nothing to do */ + l_unlock(&ns->ns_lock); + RETURN(0); + } + type = lock->l_resource->lr_type; + if (new_resid[0] == 0) + LBUG(); lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1); if (lock->l_resource == NULL) { LBUG(); @@ -266,7 +274,7 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh) { - lockh->addr = (__u64)(unsigned long)lock; + lockh->addr = (__u64) (unsigned long)lock; lockh->cookie = lock->l_random; } @@ -291,7 +299,7 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle) retval = LDLM_LOCK_GET(lock); EXIT; - out: + out: l_unlock(&lock->l_resource->lr_namespace->ns_lock); return retval; } @@ -337,7 +345,7 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock, w->w_lock = LDLM_LOCK_GET(lock); list_add(&w->w_list, lock->l_resource->lr_tmp); - out: + out: l_unlock(&lock->l_resource->lr_namespace->ns_lock); return; } @@ -384,8 +392,6 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) * run the callback. */ if (!lock->l_readers && !lock->l_writers && (lock->l_flags & LDLM_FL_CBPENDING)) { - struct lustre_handle lockh; - if (!lock->l_resource->lr_namespace->ns_client) { CERROR("LDLM_FL_CBPENDING set on non-local lock!\n"); LBUG(); @@ -394,21 +400,20 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) LDLM_DEBUG(lock, "final decref done on cbpending lock"); l_unlock(&lock->l_resource->lr_namespace->ns_lock); - ldlm_lock2handle(lock, &lockh); - /* FIXME: -1 is a really, really bad 'desc' */ - lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data, + /* FIXME: need a real 'desc' here */ + lock->l_blocking_ast(lock, NULL, lock->l_data, lock->l_data_len); } else l_unlock(&lock->l_resource->lr_namespace->ns_lock); - LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ - LDLM_LOCK_PUT(lock); /* matches the handle2lock above */ + LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ + LDLM_LOCK_PUT(lock); /* matches the handle2lock above */ EXIT; } static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs, - struct list_head *queue) + struct list_head *queue) { struct list_head *tmp, *pos; int rc = 1; @@ -482,7 +487,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock) EXIT; } -/* returns a referenced lock or NULL */ +/* returns a referenced lock or NULL */ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, struct ldlm_extent *extent) { @@ -517,7 +522,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, * Returns 1 if it finds an already-existing lock that is compatible; in this * case, lockh is filled in with a addref()ed lock */ -int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, +int ldlm_lock_match(struct ldlm_namespace *ns, __u64 * res_id, __u32 type, void *cookie, int cookielen, ldlm_mode_t mode, struct lustre_handle *lockh) { @@ -541,14 +546,14 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, GOTO(out, rc = 1); EXIT; - out: + out: ldlm_resource_put(res); l_unlock(&ns->ns_lock); if (lock) { ldlm_lock2handle(lock, lockh); if (lock->l_completion_ast) - lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); + lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); } if (rc) LDLM_DEBUG(lock, "matched"); @@ -560,10 +565,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, /* Returns a referenced, lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, - __u64 *res_id, __u32 type, - ldlm_mode_t mode, - void *data, - __u32 data_len) + __u64 * res_id, __u32 type, + ldlm_mode_t mode, void *data, __u32 data_len) { struct ldlm_resource *res, *parent_res = NULL; struct ldlm_lock *lock, *parent_lock; @@ -590,11 +593,11 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, } /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */ -ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, +ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock, void *cookie, int cookie_len, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback blocking) + ldlm_blocking_callback blocking) { struct ldlm_resource *res; int local; @@ -632,9 +635,12 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, GOTO(out, ELDLM_OK); } - /* If this is a local resource, put it on the appropriate list. */ - /* FIXME: don't like this: can we call ldlm_resource_unlink_lock? */ - list_del_init(&lock->l_res_link); + /* This distinction between local lock trees is very important; a client + * namespace only has information about locks taken by that client, and + * thus doesn't have enough information to decide for itself if it can + * be granted (below). In this case, we do exactly what the server + * tells us to do, as dictated by the 'flags' */ + ldlm_resource_unlink_lock(lock); if (local) { if (*flags & LDLM_FL_BLOCK_CONV) ldlm_resource_add_lock(res, res->lr_converting.prev, @@ -657,7 +663,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, *flags |= LDLM_FL_BLOCK_WAIT; GOTO(out, ELDLM_OK); } - if (!ldlm_lock_compat(lock,0)) { + if (!ldlm_lock_compat(lock, 0)) { ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_GRANTED; GOTO(out, ELDLM_OK); @@ -665,7 +671,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, ldlm_grant_lock(lock); EXIT; - out: + out: /* Don't set 'completion_ast' until here so that if the lock is granted * immediately we don't do an unnecessary completion call. */ lock->l_completion_ast = completion; @@ -704,12 +710,11 @@ void ldlm_run_ast_work(struct list_head *rpc_list) list_for_each_safe(tmp, pos, rpc_list) { struct ldlm_ast_work *w = list_entry(tmp, struct ldlm_ast_work, w_list); - struct lustre_handle lockh; - ldlm_lock2handle(w->w_lock, &lockh); if (w->w_blocking) rc = w->w_lock->l_blocking_ast - (&lockh, &w->w_desc, w->w_data, w->w_datalen); + (w->w_lock, &w->w_desc, w->w_data, + w->w_datalen); else rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags); if (rc) @@ -797,7 +802,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, granted = 1; /* FIXME: completion handling not with ns_lock held ! */ if (lock->l_completion_ast) - lock->l_completion_ast(lock, 0); + lock->l_completion_ast(lock, 0); } } else list_add(&lock->l_res_link, res->lr_converting.prev); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index bc659f9..a1cf9b4 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,7 +21,38 @@ extern kmem_cache_t *ldlm_lock_slab; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); -int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags) +static int ldlm_server_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, + void *data, __u32 data_len) +{ + struct ldlm_request *body; + struct ptlrpc_request *req; + struct ptlrpc_client *cl; + int rc = 0, size = sizeof(*body); + ENTRY; + + cl = &lock->l_resource->lr_namespace->ns_rpc_client; + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1, + &size, NULL); + if (!req) + RETURN(-ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + memcpy(&body->lock_desc, desc, sizeof(*desc)); + + LDLM_DEBUG(lock, "server preparing blocking AST"); + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + + RETURN(rc); +} + +static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags) { struct ldlm_request *body; struct ptlrpc_request *req; @@ -35,7 +66,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags) } cl = &lock->l_resource->lr_namespace->ns_rpc_client; - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1, &size, NULL); if (!req) RETURN(-ENOMEM); @@ -44,6 +75,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags) memcpy(&body->lock_handle1, &lock->l_remote_handle, sizeof(body->lock_handle1)); body->lock_flags = flags; + ldlm_lock2desc(lock, &body->lock_desc); LDLM_DEBUG(lock, "server preparing completion AST"); req->rq_replen = lustre_msg_size(0, NULL); @@ -101,7 +133,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req) flags = dlm_req->lock_flags; err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags, - ldlm_server_completion_ast, ldlm_server_ast); + ldlm_server_completion_ast, + ldlm_server_blocking_ast); if (err != ELDLM_OK) GOTO(out, err); @@ -112,9 +145,11 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req) if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) memcpy(&dlm_rep->lock_extent, &lock->l_extent, sizeof(lock->l_extent)); - if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) + if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) { memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name, sizeof(dlm_rep->lock_resource_name)); + dlm_rep->lock_mode = lock->l_req_mode; + } lock->l_connection = ptlrpc_connection_addref(req->rq_connection); EXIT; @@ -213,97 +248,104 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) RETURN(0); } -static int ldlm_handle_callback(struct ptlrpc_request *req) +static int ldlm_handle_bl_callback(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock_desc *descp = NULL; struct ldlm_lock *lock; - __u64 is_blocking_ast = 0; - int rc; + int rc, do_ast; ENTRY; rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc) { - CERROR("out of memory\n"); + if (rc) RETURN(-ENOMEM); - } + rc = ptlrpc_reply(req->rq_svc, req); + if (rc) + RETURN(rc); + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - /* We must send the reply first, so that the thread is free to handle - * any requests made in common_callback() */ + lock = ldlm_handle2lock(&dlm_req->lock_handle1); + if (!lock) { + CERROR("blocking callback on lock %Lx - lock disappeared\n", + dlm_req->lock_handle1.addr); + RETURN(0); + } + + LDLM_DEBUG(lock, "client blocking AST callback handler START"); + + l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock->l_flags |= LDLM_FL_CBPENDING; + do_ast = (!lock->l_readers && !lock->l_writers); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + + if (do_ast) { + LDLM_DEBUG(lock, "already unused, calling " + "callback (%p)", lock->l_blocking_ast); + if (lock->l_blocking_ast != NULL) { + lock->l_blocking_ast(lock, &dlm_req->lock_desc, + lock->l_data, lock->l_data_len); + } + } else + LDLM_DEBUG(lock, "Lock still has references, will be" + " cancelled later"); + + LDLM_DEBUG(lock, "client blocking callback handler END"); + LDLM_LOCK_PUT(lock); + RETURN(0); +} + +static int ldlm_handle_cp_callback(struct ptlrpc_request *req) +{ + struct list_head ast_list = LIST_HEAD_INIT(ast_list); + struct ldlm_request *dlm_req; + struct ldlm_lock *lock; + int rc; + ENTRY; + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(-ENOMEM); rc = ptlrpc_reply(req->rq_svc, req); - if (rc != 0) + if (rc) RETURN(rc); + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + lock = ldlm_handle2lock(&dlm_req->lock_handle1); if (!lock) { - CERROR("callback on lock %Lx - lock disappeared\n", + CERROR("completion callback on lock %Lx - lock disappeared\n", dlm_req->lock_handle1.addr); RETURN(0); } - /* check if this is a blocking AST */ - if (dlm_req->lock_desc.l_req_mode != - dlm_req->lock_desc.l_granted_mode) { - descp = &dlm_req->lock_desc; - is_blocking_ast = 1; - } + LDLM_DEBUG(lock, "client completion callback handler START"); - LDLM_DEBUG(lock, "client %s callback handler START", - is_blocking_ast ? "blocked" : "completion"); - - if (descp) { - int do_ast; - l_lock(&lock->l_resource->lr_namespace->ns_lock); - lock->l_flags |= LDLM_FL_CBPENDING; - do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - - if (do_ast) { - LDLM_DEBUG(lock, "already unused, calling " - "callback (%p)", lock->l_blocking_ast); - if (lock->l_blocking_ast != NULL) { - struct lustre_handle lockh; - ldlm_lock2handle(lock, &lockh); - lock->l_blocking_ast(&lockh, descp, - lock->l_data, - lock->l_data_len); - } - } else { - LDLM_DEBUG(lock, "Lock still has references, will be" - " cancelled later"); - } - LDLM_LOCK_PUT(lock); - } else { - struct list_head ast_list = LIST_HEAD_INIT(ast_list); - - l_lock(&lock->l_resource->lr_namespace->ns_lock); - lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; - - /* If we receive the completion AST before the actual enqueue - * returned, then we might need to switch resources. */ - ldlm_resource_unlink_lock(lock); - if (memcmp(dlm_req->lock_desc.l_resource.lr_name, - lock->l_resource->lr_name, - sizeof(__u64) * RES_NAME_SIZE) != 0) { - ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name); - LDLM_DEBUG(lock, "completion AST, new resource"); - } - lock->l_resource->lr_tmp = &ast_list; - ldlm_grant_lock(lock); - lock->l_resource->lr_tmp = NULL; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - LDLM_LOCK_PUT(lock); + l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; - ldlm_run_ast_work(&ast_list); + /* If we receive the completion AST before the actual enqueue returned, + * then we might need to switch resources. */ + ldlm_resource_unlink_lock(lock); + if (memcmp(dlm_req->lock_desc.l_resource.lr_name, + lock->l_resource->lr_name, + sizeof(__u64) * RES_NAME_SIZE) != 0) { + ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name); + LDLM_DEBUG(lock, "completion AST, new resource"); } + lock->l_resource->lr_tmp = &ast_list; + ldlm_grant_lock(lock); + lock->l_resource->lr_tmp = NULL; + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); + LDLM_LOCK_PUT(lock); - LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)", - is_blocking_ast ? "blocked" : "completion", lock); + ldlm_run_ast_work(&ast_list); + + LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", + lock); RETURN(0); } - static int ldlm_callback_handler(struct ptlrpc_request *req) { int rc; @@ -321,10 +363,15 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) GOTO(out, rc = -EINVAL); } switch (req->rq_reqmsg->opc) { - case LDLM_CALLBACK: - CDEBUG(D_INODE, "callback\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); - rc = ldlm_handle_callback(req); + case LDLM_BL_CALLBACK: + CDEBUG(D_INODE, "blocking ast\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); + rc = ldlm_handle_bl_callback(req); + break; + case LDLM_CP_CALLBACK: + CDEBUG(D_INODE, "completion ast\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0); + rc = ldlm_handle_cp_callback(req); break; default: @@ -345,7 +392,7 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, { struct obd_device *obddev = class_conn2obd(conn); struct ptlrpc_connection *connection; - int err; + int err = 0; ENTRY; if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR || @@ -365,11 +412,13 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, CERROR("No LDLM UUID found: assuming ldlm is local.\n"); switch (cmd) { - case IOC_LDLM_TEST: { + case IOC_LDLM_TEST: err = ldlm_test(obddev, connection); CERROR("-- done err %d\n", err); GOTO(out, err); - } + case IOC_LDLM_DUMP: + ldlm_dump_all_namespaces(); + GOTO(out, err); default: GOTO(out, err = -EINVAL); } @@ -387,17 +436,20 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) { struct ldlm_obd *ldlm = &obddev->u.ldlm; - int rc; - int i; + int rc, i; ENTRY; MOD_INC_USE_COUNT; - ldlm->ldlm_service = - ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, - LDLM_REPLY_PORTAL, "self", ldlm_callback_handler); + rc = ldlm_proc_setup(obddev); + if (rc != 0) + GOTO(out_dec, rc); + + ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, + LDLM_REPLY_PORTAL, "self", + ldlm_callback_handler); if (!ldlm->ldlm_service) { LBUG(); - GOTO(out_dec, rc = -ENOMEM); + GOTO(out_proc, rc = -ENOMEM); } for (i = 0; i < LDLM_NUM_THREADS; i++) { @@ -413,10 +465,12 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) RETURN(0); -out_thread: + out_thread: ptlrpc_stop_all_threads(ldlm->ldlm_service); ptlrpc_unregister_service(ldlm->ldlm_service); -out_dec: + out_proc: + ldlm_proc_cleanup(obddev); + out_dec: MOD_DEC_USE_COUNT; return rc; } @@ -428,6 +482,7 @@ static int ldlm_cleanup(struct obd_device *obddev) ptlrpc_stop_all_threads(ldlm->ldlm_service); ptlrpc_unregister_service(ldlm->ldlm_service); + ldlm_proc_cleanup(obddev); MOD_DEC_USE_COUNT; RETURN(0); @@ -441,7 +496,6 @@ struct obd_ops ldlm_obd_ops = { o_disconnect: class_disconnect }; - static int __init ldlm_init(void) { int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME); @@ -483,6 +537,7 @@ EXPORT_SYMBOL(ldlm_unregister_intent); EXPORT_SYMBOL(ldlm_lockname); EXPORT_SYMBOL(ldlm_typename); EXPORT_SYMBOL(ldlm_handle2lock); +EXPORT_SYMBOL(ldlm_lock2handle); EXPORT_SYMBOL(ldlm_lock_match); EXPORT_SYMBOL(ldlm_lock_addref); EXPORT_SYMBOL(ldlm_lock_decref); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 5f21751..2b83255 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -15,7 +15,6 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags) { - ENTRY; if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | @@ -26,19 +25,19 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags) " sleeping"); ldlm_lock_dump(lock); ldlm_reprocess_all(lock->l_resource); - wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); + wait_event(lock->l_waitq, (lock->l_req_mode == + lock->l_granted_mode)); LDLM_DEBUG(lock, "client-side enqueue waking up: granted"); - } else if (flags == LDLM_FL_WAIT_NOREPROC) { - wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); - } else if (flags == 0) { + } else if (flags == LDLM_FL_WAIT_NOREPROC) { + wait_event(lock->l_waitq, (lock->l_req_mode == + lock->l_granted_mode)); + } else if (flags == 0) { wake_up(&lock->l_waitq); - } RETURN(0); } - static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct lustre_handle *parent_lockh, __u64 *res_id, @@ -47,16 +46,16 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_mode_t mode, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback callback, + ldlm_blocking_callback blocking, void *data, __u32 data_len, struct lustre_handle *lockh) { - struct ldlm_lock *lock; - int err; + struct ldlm_lock *lock; + int err; - if (ns->ns_client) { - CERROR("Trying to cancel local lock\n"); + if (ns->ns_client) { + CERROR("Trying to cancel local lock\n"); LBUG(); } @@ -67,9 +66,10 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); - lock->l_connh = NULL; + lock->l_connh = NULL; - err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback); + err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, + blocking); if (err != ELDLM_OK) GOTO(out, err); @@ -78,10 +78,11 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, if ((*flags) & LDLM_FL_LOCK_CHANGED) memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id)); - LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock); + LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", + lock); if (lock->l_completion_ast) - lock->l_completion_ast(lock, *flags); + lock->l_completion_ast(lock, *flags); LDLM_DEBUG(lock, "client-side local enqueue END"); EXIT; @@ -91,8 +92,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, return err; } - -int ldlm_cli_enqueue(struct lustre_handle *connh, +int ldlm_cli_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -102,7 +102,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, ldlm_mode_t mode, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback callback, + ldlm_blocking_callback blocking, void *data, __u32 data_len, struct lustre_handle *lockh) @@ -114,10 +114,11 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, int rc, size = sizeof(*body), req_passed_in = 1; ENTRY; - if (connh == NULL) - return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, - type, cookie, cookielen, mode, - flags, completion, callback, data, data_len, lockh); + if (connh == NULL) + return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, + type, cookie, cookielen, mode, + flags, completion, blocking, data, + data_len, lockh); connection = client_conn2cli(connh)->cl_conn; *flags = 0; @@ -157,7 +158,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, size = sizeof(*reply); req->rq_replen = lustre_msg_size(1, &size); } - lock->l_connh = connh; + lock->l_connh = connh; lock->l_connection = ptlrpc_connection_addref(connection); lock->l_client = client_conn2cli(connh)->cl_client; @@ -191,27 +192,38 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, /* If enqueue returned a blocked lock but the completion handler has * already run, then it fixed up the resource and we don't need to do it * again. */ - if ((*flags) & LDLM_FL_LOCK_CHANGED && - lock->l_req_mode != lock->l_granted_mode) { - CDEBUG(D_INFO, "remote intent success, locking %ld instead of" - "%ld\n", (long)reply->lock_resource_name[0], - (long)lock->l_resource->lr_name[0]); - - ldlm_lock_change_resource(lock, reply->lock_resource_name); - if (lock->l_resource == NULL) { - LBUG(); - RETURN(-ENOMEM); + if ((*flags) & LDLM_FL_LOCK_CHANGED) { + int newmode = reply->lock_mode; + if (newmode && newmode != lock->l_req_mode) { + LDLM_DEBUG(lock, "server returned different mode %s", + ldlm_lockname[newmode]); + lock->l_req_mode = newmode; + } + + if (reply->lock_resource_name[0] != + lock->l_resource->lr_name[0]) { + CDEBUG(D_INFO, "remote intent success, locking %ld " + "instead of %ld\n", + (long)reply->lock_resource_name[0], + (long)lock->l_resource->lr_name[0]); + + ldlm_lock_change_resource(lock, + reply->lock_resource_name); + if (lock->l_resource == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + LDLM_DEBUG(lock, "client-side enqueue, new resource"); } - LDLM_DEBUG(lock, "client-side enqueue, new resource"); } if (!req_passed_in) ptlrpc_free_req(req); rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, - callback); + blocking); if (lock->l_completion_ast) - lock->l_completion_ast(lock, *flags); + lock->l_completion_ast(lock, *flags); LDLM_DEBUG(lock, "client-side enqueue END"); EXIT; @@ -221,7 +233,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, return rc; } -int ldlm_match_or_enqueue(struct lustre_handle *connh, +int ldlm_match_or_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -231,7 +243,7 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh, ldlm_mode_t mode, int *flags, ldlm_completion_callback completion, - ldlm_lock_callback callback, + ldlm_blocking_callback blocking, void *data, __u32 data_len, struct lustre_handle *lockh) @@ -242,8 +254,8 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh, if (rc == 0) { rc = ldlm_cli_enqueue(connh, req, ns, parent_lock_handle, res_id, type, cookie, - cookielen, mode, flags, completion, callback, data, - data_len, lockh); + cookielen, mode, flags, completion, + blocking, data, data_len, lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_enqueue: err: %d\n", rc); RETURN(rc); @@ -251,66 +263,17 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh, RETURN(0); } -int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) +static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, + int *flags) { - struct ldlm_lock *lock; - struct ldlm_request *body; - struct ptlrpc_request *req; - struct ptlrpc_client *cl; - int rc = 0, size = sizeof(*body); - ENTRY; - lock = ldlm_handle2lock(lockh); - if (lock == NULL) { - LBUG(); - RETURN(-EINVAL); - } - cl = &lock->l_resource->lr_namespace->ns_rpc_client; - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, - &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - - body = lustre_msg_buf(req->rq_reqmsg, 0); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); - - if (desc == NULL) { - CDEBUG(D_NET, "Sending granted AST\n"); - ldlm_lock2desc(lock, &body->lock_desc); - } else { - CDEBUG(D_NET, "Sending blocked AST\n"); - memcpy(&body->lock_desc, desc, sizeof(*desc)); - } - - LDLM_DEBUG(lock, "server preparing %s AST", - desc == 0 ? "completion" : "blocked"); - - req->rq_replen = lustre_msg_size(0, NULL); - - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - - EXIT; - out: - LDLM_LOCK_PUT(lock); - return rc; -} - - - -static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags) -{ - - if (lock->l_resource->lr_namespace->ns_client) { - CERROR("Trying to cancel local lock\n"); + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); LBUG(); } LDLM_DEBUG(lock, "client-side local convert"); - ldlm_lock_convert(lock, new_mode, flags); + ldlm_lock_convert(lock, new_mode, flags); ldlm_reprocess_all(lock->l_resource); LDLM_DEBUG(lock, "client-side local convert handler END"); @@ -335,7 +298,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) RETURN(-EINVAL); } *flags = 0; - connh = lock->l_connh; + connh = lock->l_connh; if (!connh) return ldlm_cli_convert_local(lock, new_mode, flags); @@ -368,7 +331,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) /* Go to sleep until the lock is granted. */ /* FIXME: or cancelled. */ if (lock->l_completion_ast) - lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); + lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); EXIT; out: LDLM_LOCK_PUT(lock); @@ -384,7 +347,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) int rc = 0, size = sizeof(*body); ENTRY; - lock = ldlm_handle2lock(lockh); + lock = ldlm_handle2lock(lockh); if (!lock) { /* It's possible that the decref that we did just before this * cancel was the last reader/writer, and caused a cancel before @@ -395,33 +358,34 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) RETURN(-EINVAL); } - if (lock->l_connh) { + if (lock->l_connh) { LDLM_DEBUG(lock, "client-side cancel"); - req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL); + req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, + NULL); if (!req) GOTO(out, rc = -ENOMEM); - + body = lustre_msg_buf(req->rq_reqmsg, 0); memcpy(&body->lock_handle1, &lock->l_remote_handle, sizeof(body->lock_handle1)); - + req->rq_replen = lustre_msg_size(0, NULL); - + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); ptlrpc_free_req(req); if (rc != ELDLM_OK) GOTO(out, rc); - + ldlm_lock_cancel(lock); - } else { + } else { LDLM_DEBUG(lock, "client-side local cancel"); - if (lock->l_resource->lr_namespace->ns_client) { - CERROR("Trying to cancel local lock\n"); + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); LBUG(); } - ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); LDLM_DEBUG(lock, "client-side local cancel handler END"); } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index fcfca26..fec3c0a 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -15,6 +15,30 @@ kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab; +spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED; +struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list); +static struct proc_dir_entry *ldlm_ns_proc_dir = NULL; + +int ldlm_proc_setup(struct obd_device *obd) +{ + ENTRY; + + if (obd->obd_proc_entry == NULL) + RETURN(-EINVAL); + + ldlm_ns_proc_dir = proc_mkdir("namespaces", obd->obd_proc_entry); + if (ldlm_ns_proc_dir == NULL) { + CERROR("Couldn't create /proc/lustre/ldlm/namespaces\n"); + RETURN(-EPERM); + } + RETURN(0); +} + +void ldlm_proc_cleanup(struct obd_device *obd) +{ + proc_lustre_remove_obd_entry("namespaces", obd); +} + struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) { struct ldlm_namespace *ns = NULL; @@ -50,6 +74,14 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash; bucket--) INIT_LIST_HEAD(bucket); + + spin_lock(&ldlm_namespace_lock); + list_add(&ns->ns_list_chain, &ldlm_namespace_list); + ns->ns_proc_dir = proc_mkdir(ns->ns_name, ldlm_ns_proc_dir); + if (ns->ns_proc_dir == NULL) + CERROR("Unable to create proc directory for namespace.\n"); + spin_unlock(&ldlm_namespace_lock); + RETURN(ns); out: @@ -104,6 +136,11 @@ int ldlm_namespace_free(struct ldlm_namespace *ns) if (!ns) RETURN(ELDLM_OK); + spin_lock(&ldlm_namespace_lock); + list_del(&ns->ns_list_chain); + remove_proc_entry(ns->ns_name, ldlm_ns_proc_dir); + spin_unlock(&ldlm_namespace_lock); + l_lock(&ns->ns_lock); for (i = 0; i < RES_HASH_SIZE; i++) { @@ -201,10 +238,9 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, bucket = ns->ns_hash + ldlm_hash_fn(parent, name); list_add(&res->lr_hash, bucket); - if (parent == NULL) { - res->lr_parent = res; - list_add(&res->lr_rootlink, &ns->ns_root_list); - } else { + if (parent == NULL) + list_add(&res->lr_childof, &ns->ns_root_list); + else { res->lr_parent = parent; list_add(&res->lr_childof, &parent->lr_children); } @@ -288,7 +324,6 @@ int ldlm_resource_put(struct ldlm_resource *res) ns->ns_refcount--; list_del(&res->lr_hash); - list_del(&res->lr_rootlink); list_del(&res->lr_childof); kmem_cache_free(ldlm_resource_slab, res); @@ -333,6 +368,40 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version)); } +void ldlm_dump_all_namespaces(void) +{ + struct list_head *tmp; + + spin_lock(&ldlm_namespace_lock); + + list_for_each(tmp, &ldlm_namespace_list) { + struct ldlm_namespace *ns; + ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain); + ldlm_namespace_dump(ns); + } + + spin_unlock(&ldlm_namespace_lock); +} + +void ldlm_namespace_dump(struct ldlm_namespace *ns) +{ + struct list_head *tmp; + + l_lock(&ns->ns_lock); + CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name, + ns->ns_refcount, ns->ns_client); + + list_for_each(tmp, &ns->ns_root_list) { + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_childof); + + /* Once we have resources with children, this should really dump + * them recursively. */ + ldlm_resource_dump(res); + } + l_unlock(&ns->ns_lock); +} + void ldlm_resource_dump(struct ldlm_resource *res) { struct list_head *tmp; diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 6b5daba..c0c8f13 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -27,21 +27,13 @@ static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED; static struct list_head ctl_threads; static int regression_running = 0; -static int ldlm_test_callback(struct lustre_handle *lockh, - struct ldlm_lock_desc *new, - void *data, __u32 data_len) +static int ldlm_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, + void *data, __u32 data_len) { - struct ldlm_lock *lock; ENTRY; - - lock = ldlm_handle2lock(lockh); - if (lock == NULL) { - CERROR("invalid handle in callback\n"); - RETURN(0); - } - - printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new); - return 0; + CERROR("ldlm_blocking_ast: lock=%p, new=%p\n", lock, new); + RETURN(0); } int ldlm_test_basics(struct obd_device *obddev) @@ -61,7 +53,7 @@ int ldlm_test_basics(struct obd_device *obddev) if (lock1 == NULL) LBUG(); err = ldlm_lock_enqueue(lock1, NULL, 0, &flags, - ldlm_completion_ast, ldlm_test_callback); + ldlm_completion_ast, ldlm_blocking_ast); if (err != ELDLM_OK) LBUG(); @@ -69,7 +61,7 @@ int ldlm_test_basics(struct obd_device *obddev) if (lock == NULL) LBUG(); err = ldlm_lock_enqueue(lock, NULL, 0, &flags, - ldlm_completion_ast, ldlm_test_callback); + ldlm_completion_ast, ldlm_blocking_ast); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_BLOCK_GRANTED)) @@ -169,9 +161,9 @@ static int ldlm_test_network(struct obd_device *obddev, /* FIXME: this needs a connh as 3rd paramter, before it will work */ - err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id, LDLM_EXTENT, - &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, NULL, 0, - &lockh1); + err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id, + LDLM_EXTENT, &ext, sizeof(ext), LCK_PR, &flags, + NULL, NULL, NULL, 0, &lockh1); CERROR("ldlm_cli_enqueue: %d\n", err); if (err == ELDLM_OK) ldlm_lock_decref(&lockh1, LCK_PR); @@ -221,8 +213,8 @@ static int ldlm_test_main(void *data) rc = ldlm_cli_enqueue(NULL, NULL, ns, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, ldlm_test_callback, NULL, 0, - &lockh); + &flags, ldlm_completion_ast, + ldlm_blocking_ast, NULL, 0, &lockh); if (rc < 0) { CERROR("ldlm_cli_enqueue: %d\n", rc); LBUG(); diff --git a/lustre/lib/l_net.c b/lustre/lib/l_net.c index 4c4a016..27cd3ef 100644 --- a/lustre/lib/l_net.c +++ b/lustre/lib/l_net.c @@ -38,7 +38,7 @@ #include #include -struct client_obd *client_conn2cli(struct lustre_handle *conn) +struct client_obd *client_conn2cli(struct lustre_handle *conn) { struct obd_export *export = class_conn2export(conn); if (!export) @@ -85,7 +85,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) mdc->cl_conn = ptlrpc_uuid_to_connection(server_uuid); if (!mdc->cl_conn) - RETURN(-ENOENT); + RETURN(-ENOENT); OBD_ALLOC(mdc->cl_client, sizeof(*mdc->cl_client)); if (mdc->cl_client == NULL) @@ -133,36 +133,37 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd) { struct client_obd *cli = &obd->u.cli; struct ptlrpc_request *request; - int rc, size[] = {sizeof(cli->cl_target_uuid), + int rc, size[] = {sizeof(cli->cl_target_uuid), sizeof(obd->obd_uuid) }; char *tmp[] = {cli->cl_target_uuid, obd->obd_uuid}; int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_CONNECT : MDS_CONNECT; ENTRY; - down(&cli->cl_sem); + down(&cli->cl_sem); MOD_INC_USE_COUNT; rc = class_connect(conn, obd); - if (!rc) + if (!rc) cli->cl_conn_count++; - else { + else { MOD_DEC_USE_COUNT; up(&cli->cl_sem); RETURN(rc); } - - if (cli->cl_conn_count > 1) { + + if (cli->cl_conn_count > 1) { up(&cli->cl_sem); RETURN(0); } - obd->obd_namespace = - ldlm_namespace_new("cli", LDLM_NAMESPACE_CLIENT); - if (obd->obd_namespace == NULL) { + obd->obd_namespace = ldlm_namespace_new(obd->obd_name, + LDLM_NAMESPACE_CLIENT); + if (obd->obd_namespace == NULL) { up(&cli->cl_sem); RETURN(-ENOMEM); } - request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 2, size, tmp); + request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 2, size, + tmp); if (!request) GOTO(out_disco, -ENOMEM); @@ -202,18 +203,18 @@ int client_obd_disconnect(struct lustre_handle *conn) ENTRY; down(&obd->u.cli.cl_sem); - if (!obd->u.cli.cl_conn_count) { - CERROR("disconnecting disconnected device (%s)\n", - obd->obd_name); + if (!obd->u.cli.cl_conn_count) { + CERROR("disconnecting disconnected device (%s)\n", + obd->obd_name); RETURN(0); } - obd->u.cli.cl_conn_count--; + obd->u.cli.cl_conn_count--; if (obd->u.cli.cl_conn_count) GOTO(class_only, 0); ldlm_namespace_free(obd->obd_namespace); - obd->obd_namespace = NULL; + obd->obd_namespace = NULL; request = ptlrpc_prep_req2(conn, rq_opc, 0, NULL, NULL); if (!request) RETURN(-ENOMEM); @@ -221,14 +222,14 @@ int client_obd_disconnect(struct lustre_handle *conn) request->rq_replen = lustre_msg_size(0, NULL); rc = ptlrpc_queue_wait(request); - if (rc) + if (rc) GOTO(out, rc); class_only: rc = class_disconnect(conn); if (!rc) MOD_DEC_USE_COUNT; out: - if (request) + if (request) ptlrpc_free_req(request); up(&obd->u.cli.cl_sem); RETURN(rc); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 22c198a..19ff845 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -48,23 +48,32 @@ static int ll_file_open(struct inode *inode, struct file *file) /* delayed create of object (intent created inode) */ /* XXX object needs to be cleaned up if mdc_open fails */ /* XXX error handling appropriate here? */ - if (lli->lli_smd == NULL || lli->lli_smd->lmd_object_id == 0) { + if (lli->lli_smd == NULL) { struct client_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb)); struct inode * inode = file->f_dentry->d_inode; - lli->lli_smd = NULL; - oa = obdo_alloc(); - if (!oa) { - RETURN(-ENOMEM); + down(&lli->lli_open_sem); + /* Check to see if we lost the race */ + if (lli->lli_smd == NULL) { + oa = obdo_alloc(); + if (!oa) { + up(&lli->lli_open_sem); + RETURN(-ENOMEM); + } + oa->o_mode = S_IFREG | 0600; + oa->o_easize = mdc->cl_max_mdsize; + oa->o_valid = OBD_MD_FLMODE | OBD_MD_FLEASIZE; + rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd); + if (rc) { + obdo_free(oa); + up(&lli->lli_open_sem); + RETURN(rc); + } + md = lli->lli_smd; } - oa->o_mode = S_IFREG | 0600; - oa->o_easize = mdc->cl_max_mdsize; - oa->o_valid = OBD_MD_FLMODE | OBD_MD_FLEASIZE; - rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd); - if (rc) - RETURN(rc); - md = lli->lli_smd; - lli->lli_flags &= ~OBD_FL_CREATEONOPEN; + if (lli->lli_smd && lli->lli_smd->lmd_object_id == 0) + LBUG(); + up(&lli->lli_open_sem); } fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 7a43976..5b1cf5f 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -169,14 +169,17 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, //else // GOTO(err, it->it_status); } else if (it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | - IT_RMDIR | IT_SETATTR | IT_LOOKUP | - IT_OPEN)) { + IT_RMDIR | IT_SETATTR | IT_LOOKUP)) { /* For remove/check, we want the lookup to succeed */ request = (struct ptlrpc_request *)it->it_data; if (it->it_status) GOTO(negative, NULL); //else // GOTO(err, it->it_status); + } else if (it->it_op == IT_OPEN) { + request = (struct ptlrpc_request *)it->it_data; + if (it->it_status && it->it_status != -EEXIST) + GOTO(negative, NULL); } else if (it->it_op == IT_RENAME2) { /* Set below to be a dentry from the IT_RENAME op */ inode = ((struct dentry *)(it->it_data))->d_inode; @@ -224,7 +227,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, negative: dentry->d_op = &ll_d_ops; d_add(dentry, inode); - if (it->it_op == IT_LOOKUP || (it->it_op == IT_OPEN && it->it_status)) { + if (it->it_op == IT_LOOKUP) { ll_intent_release(dentry); ptlrpc_free_req(request); } @@ -396,7 +399,6 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) if (dentry->d_it->it_disposition) { ii = ll_i2info(inode); - ii->lli_flags |= OBD_FL_CREATEONOPEN; memcpy(&ii->lli_intent_lock_handle, dentry->d_it->it_lock_handle, sizeof(struct lustre_handle)); diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 315c2d3..f49ae28 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -397,43 +397,47 @@ inline int ll_stripe_md_size(struct super_block *sb) return mdc->cl_max_mdsize; } -static void ll_to_inode(struct inode *dst, struct ll_inode_md *md) +static void ll_read_inode2(struct inode *inode, void *opaque) { + struct ll_inode_md *md = opaque; struct mds_body *body = md->body; - struct ll_inode_info *ii = ll_i2info(dst); + struct ll_inode_info *ii = ll_i2info(inode); + ENTRY; + + sema_init(&ii->lli_open_sem, 1); /* core attributes first */ if (body->valid & OBD_MD_FLID) - dst->i_ino = body->ino; + inode->i_ino = body->ino; if (body->valid & OBD_MD_FLATIME) - dst->i_atime = body->atime; + inode->i_atime = body->atime; if (body->valid & OBD_MD_FLMTIME) - dst->i_mtime = body->mtime; + inode->i_mtime = body->mtime; if (body->valid & OBD_MD_FLCTIME) - dst->i_ctime = body->ctime; + inode->i_ctime = body->ctime; if (body->valid & OBD_MD_FLSIZE) - dst->i_size = body->size; + inode->i_size = body->size; if (body->valid & OBD_MD_FLMODE) - dst->i_mode = body->mode; + inode->i_mode = body->mode; if (body->valid & OBD_MD_FLUID) - dst->i_uid = body->uid; + inode->i_uid = body->uid; if (body->valid & OBD_MD_FLGID) - dst->i_gid = body->gid; + inode->i_gid = body->gid; if (body->valid & OBD_MD_FLFLAGS) - dst->i_flags = body->flags; + inode->i_flags = body->flags; if (body->valid & OBD_MD_FLNLINK) - dst->i_nlink = body->nlink; + inode->i_nlink = body->nlink; if (body->valid & OBD_MD_FLGENER) - dst->i_generation = body->generation; + inode->i_generation = body->generation; if (body->valid & OBD_MD_FLRDEV) - dst->i_rdev = body->extra; + inode->i_rdev = body->extra; //if (body->valid & OBD_MD_FLEASIZE) if (md && md->md && md->md->lmd_stripe_count) { struct lov_stripe_md *smd = md->md; - int size = ll_stripe_md_size(dst->i_sb); + int size = ll_stripe_md_size(inode->i_sb); if (md->md->lmd_easize != size) { CERROR("Striping metadata size error %ld\n", - dst->i_ino); + inode->i_ino); LBUG(); } OBD_ALLOC(ii->lli_smd, size); @@ -443,14 +447,6 @@ static void ll_to_inode(struct inode *dst, struct ll_inode_md *md) } memcpy(ii->lli_smd, smd, size); } -} /* ll_to_inode */ - -static void ll_read_inode2(struct inode *inode, void *opaque) -{ - struct ll_inode_md *md = opaque; - - ENTRY; - ll_to_inode(inode, md); /* OIDEBUG(inode); */ @@ -468,8 +464,7 @@ static void ll_read_inode2(struct inode *inode, void *opaque) inode->i_op = &ll_fast_symlink_inode_operations; EXIT; } else { - init_special_inode(inode, inode->i_mode, - ((int *)ll_i2info(inode)->lli_inline)[0]); + init_special_inode(inode, inode->i_mode, inode->i_rdev); EXIT; } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index e6de13e..bf725c2 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -187,19 +187,14 @@ int mdc_getattr(struct lustre_handle *conn, return rc; } -static int mdc_lock_callback(struct lustre_handle *lockh, - struct ldlm_lock_desc *desc, void *data, - int data_len, struct ptlrpc_request **req) +static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, __u32 data_len) { int rc; struct inode *inode = data; + struct lustre_handle lockh; ENTRY; - if (desc == NULL) { - /* Completion AST. Do nothing. */ - RETURN(0); - } - if (data_len != sizeof(*inode)) { CERROR("data_len should be %d, but is %d\n", sizeof(*inode), data_len); @@ -215,7 +210,8 @@ static int mdc_lock_callback(struct lustre_handle *lockh, invalidate_inode_pages(inode); } - rc = ldlm_cli_cancel(lockh); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); if (rc < 0) { CERROR("ldlm_cli_cancel: %d\n", rc); LBUG(); @@ -340,10 +336,15 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, RETURN(-EINVAL); } #warning FIXME: the data here needs to be different if a lock was granted for a different inode - rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id, lock_type, - NULL, 0, lock_mode, &flags, ldlm_completion_ast, - (void *)mdc_lock_callback, data, datalen, lockh); - if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) { + rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id, + lock_type, NULL, 0, lock_mode, &flags, + ldlm_completion_ast, mdc_blocking_ast, data, + datalen, lockh); + if (rc == -ENOENT) { + /* This can go when we're sure that this can never happen */ + LBUG(); + } + if (rc == ELDLM_LOCK_ABORTED) { lock_mode = 0; memset(lockh, 0, sizeof(*lockh)); /* rc = 0 */ diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 1d66464..bdede83 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -94,9 +94,9 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, } /* 'dir' is a inode for which a lock has already been taken */ -struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir, - struct vfsmount **mnt, char *name, - int namelen, int lock_mode, +struct dentry *mds_name2locked_dentry(struct obd_device *obd, + struct dentry *dir, struct vfsmount **mnt, + char *name, int namelen, int lock_mode, struct lustre_handle *lockh, int dir_lock_mode) { @@ -124,8 +124,8 @@ struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir res_id[0] = dchild->d_inode->i_ino; rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, - 0, lockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, lockh); if (rc != ELDLM_OK) { l_dput(dchild); up(&dir->d_inode->i_sem); @@ -151,8 +151,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id[0] = de->d_inode->i_ino; rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, - 0, lockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, lockh); if (rc != ELDLM_OK) { l_dput(de); retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */ @@ -349,17 +349,14 @@ static int mds_getlovinfo(struct ptlrpc_request *req) RETURN(0); } -int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, int data_len, struct ptlrpc_request **reqp) +int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, __u32 data_len) { + struct lustre_handle lockh; ENTRY; - if (desc == NULL) { - /* Completion AST. Do nothing */ - RETURN(0); - } - - if (ldlm_cli_cancel(lockh) < 0) + ldlm_lock2handle(lock, &lockh); + if (ldlm_cli_cancel(&lockh) < 0) LBUG(); RETURN(0); } @@ -412,8 +409,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, (void *)mds_lock_callback, - NULL, 0, &lockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, &lockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); GOTO(out_create_de, rc = -EIO); @@ -892,14 +889,13 @@ int mds_handle(struct ptlrpc_request *req) if (rc) break; RETURN(0); - case LDLM_CALLBACK: + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: CDEBUG(D_INODE, "callback\n"); CERROR("callbacks should not happen on MDS\n"); LBUG(); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); break; - - default: rc = ptlrpc_error(req->rq_svc, req); RETURN(rc); @@ -1142,7 +1138,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, it->opc = NTOH__u64(it->opc); - LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); + LDLM_DEBUG(lock, "intent policy, opc: %s", + ldlm_it2str(it->opc)); /* prepare reply */ switch((long)it->opc) { @@ -1183,8 +1180,15 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, /* execute policy */ switch ((long)it->opc) { - case IT_CREAT: case IT_CREAT|IT_OPEN: + rc = mds_reint(2, req); + if (rc || (req->rq_status != 0 && + req->rq_status != -EEXIST)) { + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } + break; + case IT_CREAT: case IT_LINK: case IT_MKDIR: case IT_MKNOD: @@ -1259,7 +1263,7 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, extern int mds_iocontrol(long cmd, struct lustre_handle *conn, - int len, void *karg, void *uarg); + int len, void *karg, void *uarg); /* use obd ops to offer management infrastructure */ static struct obd_ops mds_obd_ops = { diff --git a/lustre/mds/mds_extN.c b/lustre/mds/mds_extN.c index f9cfb36..f9ee992 100644 --- a/lustre/mds/mds_extN.c +++ b/lustre/mds/mds_extN.c @@ -98,12 +98,10 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle, } static int mds_extN_set_md(struct inode *inode, void *handle, - struct lov_stripe_md *md) + struct lov_stripe_md *md) { int rc; - - lock_kernel(); down(&inode->i_sem); if (md == NULL) @@ -118,9 +116,11 @@ static int mds_extN_set_md(struct inode *inode, void *handle, up(&inode->i_sem); unlock_kernel(); - if (rc) - CERROR("error adding objectid %Ld to inode %ld\n", - (unsigned long long)md->lmd_object_id, inode->i_ino); + if (rc) { + CERROR("error adding objectid %Ld to inode %ld: %d\n", + (unsigned long long)md->lmd_object_id, inode->i_ino, rc); + LBUG(); + } return rc; } diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index e18be7f..c63ebc6 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -128,17 +128,17 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, } EXIT; - out_unlock: + out_unlock: unlock_kernel(); - out_setattr_de: + out_setattr_de: l_dput(de); - out_setattr: + out_setattr: req->rq_status = rc; - return(0); + return (0); } static int mds_reint_recreate(struct mds_update_record *rec, int offset, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { struct dentry *de = NULL; struct mds_obd *mds = mds_req2mds(req); @@ -177,10 +177,10 @@ static int mds_reint_recreate(struct mds_update_record *rec, int offset, LBUG(); } -out_create_dchild: + out_create_dchild: l_dput(dchild); up(&dir->i_sem); -out_create_de: + out_create_de: l_dput(de); req->rq_status = rc; return 0; @@ -262,7 +262,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, } switch (type) { - case S_IFREG: { + case S_IFREG:{ handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE); if (!handle) GOTO(out_create_dchild, PTR_ERR(handle)); @@ -270,7 +270,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, EXIT; break; } - case S_IFDIR: { + case S_IFDIR:{ handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR); if (!handle) GOTO(out_create_dchild, PTR_ERR(handle)); @@ -278,7 +278,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, EXIT; break; } - case S_IFLNK: { + case S_IFLNK:{ handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK); if (!handle) GOTO(out_create_dchild, PTR_ERR(handle)); @@ -289,7 +289,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, case S_IFCHR: case S_IFBLK: case S_IFIFO: - case S_IFSOCK: { + case S_IFSOCK:{ int rdev = rec->ur_rdev; handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD); if (!handle) @@ -299,7 +299,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, break; } default: - CERROR("bad file type %o for create of %s\n",type,rec->ur_name); + CERROR("bad file type %o for create of %s\n", type, + rec->ur_name); GOTO(out_create_dchild, rc = -EINVAL); } @@ -351,29 +352,29 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, body->valid = OBD_MD_FLID | OBD_MD_FLGENER; } EXIT; -out_create_commit: + out_create_commit: err = mds_fs_commit(mds, dir, handle); if (err) { CERROR("error on commit: err = %d\n", err); if (!rc) rc = err; } -out_create_dchild: + out_create_dchild: l_dput(dchild); ldlm_lock_decref(&lockh, lock_mode); -out_create_de: + out_create_de: up(&dir->i_sem); l_dput(de); -out_create: + out_create: req->rq_status = rc; return 0; -out_create_unlink: + out_create_unlink: /* Destroy the file we just created. This should not need extra * journal credits, as we have already modified all of the blocks * needed in order to create the file in the first place. */ - switch(type) { + switch (type) { case S_IFDIR: err = vfs_rmdir(dir, dchild); if (err) @@ -407,8 +408,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, /* a name was supplied by the client; fid1 is the directory */ lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; - de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, - &lockh); + de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh); if (IS_ERR(de)) { LBUG(); RETURN(PTR_ERR(de)); @@ -417,7 +417,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, name = lustre_msg_buf(req->rq_reqmsg, offset + 1); namelen = req->rq_reqmsg->buflens[offset + 1] - 1; dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen, - LCK_EX, &child_lockh, lock_mode); + LCK_EX, &child_lockh, lock_mode); if (IS_ERR(dchild) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) { LBUG(); @@ -431,10 +431,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, if (!inode) { CDEBUG(D_INODE, "child doesn't exist (dir %ld, name %s\n", dir->i_ino, rec->ur_name); - /* XXX should be out_unlink_cancel, or do we keep child_lockh?*/ + /* XXX should be out_unlink_cancel, or do we keep child_lockh? */ GOTO(out_unlink_dchild, rc = -ENOENT); } else if (offset) { - struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); + struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); mds_pack_inode2fid(&body->fid1, inode); mds_pack_inode2body(body, inode); } @@ -456,7 +456,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, md = lustre_msg_buf(req->rq_repmsg, 2); md->lmd_easize = mds->mds_max_mdsize; if ((rc = mds_fs_get_md(mds, inode, md)) < 0) { - CDEBUG(D_INFO, "No md for ino %ld: rc = %d\n", + CDEBUG(D_INFO, + "No md for ino %ld: rc = %d\n", inode->i_ino, rc); memset(md, 0, md->lmd_easize); } @@ -481,18 +482,18 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, EXIT; - out_unlink_cancel: + out_unlink_cancel: ldlm_lock_decref(&child_lockh, LCK_EX); err = ldlm_cli_cancel(&child_lockh); if (err < 0) { CERROR("failed to cancel child inode lock: err = %d\n", err); if (!rc) - rc = -ENOLCK; /*XXX translate LDLM lock error */ + rc = -ENOLCK; /*XXX translate LDLM lock error */ } -out_unlink_dchild: + out_unlink_dchild: l_dput(dchild); up(&dir->i_sem); -out_unlink: + out_unlink: ldlm_lock_decref(&lockh, lock_mode); l_dput(de); req->rq_status = rc; @@ -500,7 +501,7 @@ out_unlink: } static int mds_reint_link(struct mds_update_record *rec, int offset, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { struct dentry *de_src = NULL; struct dentry *de_tgt_dir = NULL; @@ -554,14 +555,14 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, } EXIT; -out_link_dchild: + out_link_dchild: l_dput(dchild); -out_link_de_tgt_dir: + out_link_de_tgt_dir: up(&de_tgt_dir->d_inode->i_sem); l_dput(de_tgt_dir); -out_link_de_src: + out_link_de_src: l_dput(de_src); -out_link: + out_link: req->rq_status = rc; return 0; } @@ -578,7 +579,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, struct lustre_handle tgtlockh, srclockh, oldhandle; int flags, lock_mode, rc = 0, err; void *handle; - __u64 res_id[3] = {0}; + __u64 res_id[3] = { 0 }; ENTRY; de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL); @@ -589,13 +590,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, res_id[0] = de_srcdir->d_inode->i_ino; rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, - NULL, 0, lock_mode, &srclockh); + NULL, 0, lock_mode, &srclockh); if (rc == 0) { LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, - 0, &srclockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, &srclockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); GOTO(out_rename_srcput, rc = -EIO); @@ -611,13 +612,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, res_id[0] = de_tgtdir->d_inode->i_ino; rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, - NULL, 0, lock_mode, &tgtlockh); + NULL, 0, lock_mode, &tgtlockh); if (rc == 0) { LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, - 0, &tgtlockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, &tgtlockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); GOTO(out_rename_tgtput, rc = -EIO); @@ -625,7 +626,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, } else ldlm_lock_dump((void *)(unsigned long)tgtlockh.addr); - double_lock(de_tgtdir, de_srcdir); + double_lock(de_tgtdir, de_srcdir); de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1); if (IS_ERR(de_old)) { @@ -663,21 +664,20 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, } EXIT; -out_rename_denew: + out_rename_denew: l_dput(de_new); -out_rename_deold: - if (!rc) { + out_rename_deold: + if (!rc) { res_id[0] = de_old->d_inode->i_ino; /* Take an exclusive lock on the resource that we're * about to free, to force everyone to drop their * locks. */ LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, - res_id, - LDLM_PLAIN, NULL, 0, LCK_EX, &flags, - ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, - &oldhandle); - if (rc) + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, + res_id, LDLM_PLAIN, NULL, 0, LCK_EX, + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, &oldhandle); + if (rc) CERROR("failed to get child inode lock (child ino %Ld, " "dir ino %ld)\n", res_id[0], de_old->d_inode->i_ino); @@ -685,37 +685,37 @@ out_rename_deold: l_dput(de_old); - if (!rc) { + if (!rc) { ldlm_lock_decref(&oldhandle, LCK_EX); rc = ldlm_cli_cancel(&oldhandle); if (rc < 0) CERROR("failed to cancel child inode lock ino " "%Ld: %d\n", res_id[0], rc); } - out_rename_tgtdir: + out_rename_tgtdir: double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem); ldlm_lock_decref(&tgtlockh, lock_mode); - out_rename_tgtput: + out_rename_tgtput: l_dput(de_tgtdir); - out_rename_srcdir: + out_rename_srcdir: ldlm_lock_decref(&srclockh, lock_mode); - out_rename_srcput: + out_rename_srcput: l_dput(de_srcdir); - out_rename: + out_rename: req->rq_status = rc; return 0; } -typedef int (*mds_reinter)(struct mds_update_record *, int offset, - struct ptlrpc_request *); +typedef int (*mds_reinter) (struct mds_update_record *, int offset, + struct ptlrpc_request *); -static mds_reinter reinters[REINT_MAX+1] = { - [REINT_SETATTR] mds_reint_setattr, - [REINT_CREATE] mds_reint_create, - [REINT_UNLINK] mds_reint_unlink, - [REINT_LINK] mds_reint_link, - [REINT_RENAME] mds_reint_rename, - [REINT_RECREATE] mds_reint_recreate, +static mds_reinter reinters[REINT_MAX + 1] = { + [REINT_SETATTR] mds_reint_setattr, + [REINT_CREATE] mds_reint_create, + [REINT_UNLINK] mds_reint_unlink, + [REINT_LINK] mds_reint_link, + [REINT_RENAME] mds_reint_rename, + [REINT_RECREATE] mds_reint_recreate, }; int mds_reint_rec(struct mds_update_record *rec, int offset, @@ -733,7 +733,7 @@ int mds_reint_rec(struct mds_update_record *rec, int offset, } push_ctxt(&saved, &mds->mds_ctxt); - rc = reinters[rec->ur_opcode](rec, offset, req); + rc = reinters[rec->ur_opcode] (rec, offset, req); pop_ctxt(&saved); return rc; diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 1dcf9b7..8b2a80f 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -301,8 +301,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, LBUG(); } memcpy(obd->obd_name, data->ioc_inlbuf2, len); - //obd->obd_proc_entry = - // proc_lustre_register_obd_device(obd); + obd->obd_proc_entry = + proc_lustre_register_obd_device(obd); } else { CERROR("WARNING: unnamed obd device\n"); obd->obd_proc_entry = NULL; @@ -348,8 +348,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, obd->obd_name = NULL; } - //if (obd->obd_proc_entry) - // proc_lustre_release_obd_device(obd); + if (obd->obd_proc_entry) + proc_lustre_release_obd_device(obd); obd->obd_flags &= ~OBD_ATTACHED; obd->obd_type->typ_refcnt--; diff --git a/lustre/obdclass/proc_lustre.c b/lustre/obdclass/proc_lustre.c index 34601a5..979467c 100644 --- a/lustre/obdclass/proc_lustre.c +++ b/lustre/obdclass/proc_lustre.c @@ -1,27 +1,42 @@ -/* proc_lustre.c manages /proc/lustre/obd. +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * proc_lustre.c manages /proc/lustre * * Copyright (c) 2001 Rumi Zahir * - * This code is issued under the GNU General Public License. - * See the file COPYING in this distribution + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. * - * OBD devices materialize in /proc as a directory: + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +/* OBD devices materialize in /proc as a directory: * /proc/lustre/obd/ - * when /dev/obd is opened. When the device is closed, the - * directory entry disappears. - * + * when /dev/obd is opened. When the device is closed, the + * directory entry disappears. + * * For each open OBD device, code in this file also creates a file - * named . "cat /proc/lustre/obd//status" gives + * named . "cat /proc/lustre/obd//status" gives * information about the OBD device's configuration. * The class driver manages the "status" entry. * * Other logical drivers can create their own entries. For example, * the obdtrace driver creates /proc/lustre/obd//stats entry. * - * This file defines three functions + * This file defines three functions * proc_lustre_register_obd_device() - called at device attach time * proc_lustre_release_obd_device() - called at detach - * proc_lustre_remove_obd_entry() + * proc_lustre_remove_obd_entry() * that dynamically create/delete /proc/lustre/obd entries: * * proc_lustre_register_obd_device() registers an obd device, @@ -33,7 +48,7 @@ * * proc_lustre_remove_obd_entry() removes a * /proc/lustre/obd// entry by name. This is the only - * function that is exported to other modules. + * function that is exported to other modules. */ #define EXPORT_SYMTAB @@ -49,124 +64,116 @@ #ifdef CONFIG_PROC_FS extern struct proc_dir_entry proc_root; -static struct proc_dir_entry *proc_lustre_dir_entry = 0; -static struct proc_dir_entry *proc_lustre_obd_dir_entry = 0; - +static struct proc_dir_entry *proc_lustre_dir_entry = NULL; +static struct proc_dir_entry *proc_lustre_obd_dir_entry = NULL; static int read_lustre_status(char *page, char **start, off_t offset, - int count, int *eof, void *data) + int count, int *eof, void *data) { - struct obd_device * obddev = (struct obd_device *) data; - int p; + struct obd_device * obddev = (struct obd_device *)data; + int p; + +#warning FIXME: This function is madness, completely unsafe, a disaster waiting to happen. - p = sprintf(&page[0], "device=%d\n", obddev->obd_minor); - p += sprintf(&page[0], "name=%s\n", MKSTR(obddev->obd_name)); - p += sprintf(&page[0], "uuid=%s\n", obddev->obd_uuid); + p = sprintf(&page[0], "device=%d\n", obddev->obd_minor); + p += sprintf(&page[p], "name=%s\n", MKSTR(obddev->obd_name)); + p += sprintf(&page[p], "uuid=%s\n", obddev->obd_uuid); p += sprintf(&page[p], "attached=1\n"); - p += sprintf(&page[0], "type=%s\n", MKSTR(obddev->obd_type->typ_name)); + p += sprintf(&page[p], "type=%s\n", MKSTR(obddev->obd_type->typ_name)); - if (obddev->obd_flags & OBD_SET_UP) { + if (obddev->obd_flags & OBD_SET_UP) p += sprintf(&page[p], "setup=1\n"); - } /* print exports */ { - struct list_head * lh; - struct obd_export * export=0; + struct list_head *lh; + struct obd_export *export = NULL; lh = &obddev->obd_exports; while ((lh = lh->next) != &obddev->obd_exports) { p += sprintf(&page[p], - ((export==0) ? ", connections(" : ",") ); + ((export == NULL) ? ", connections(" : ",") ); export = list_entry(lh, struct obd_export, exp_chain); p += sprintf(&page[p], "%p", export); } - if (export!=0) { /* there was at least one export */ + if (export != 0) { /* there was at least one export */ p += sprintf(&page[p], ")"); } } p += sprintf(&page[p], "\n"); - /* Compute eof and return value */ - if (offset + count >= p) { - *eof=1; - return (p - offset); - } - return count; + /* Compute eof and return value */ + if (offset + count >= p) { + *eof = 1; + return (p - offset); + } + return count; } -struct proc_dir_entry * -proc_lustre_register_obd_device(struct obd_device *obd) +struct proc_dir_entry *proc_lustre_register_obd_device(struct obd_device *obd) { - char obdname[32]; - struct proc_dir_entry *obd_dir; - struct proc_dir_entry *obd_status = 0; - - if (!proc_lustre_dir_entry) { - proc_lustre_dir_entry = proc_mkdir("lustre", &proc_root); - if (IS_ERR(proc_lustre_dir_entry)) - return 0; - - proc_lustre_obd_dir_entry = - proc_mkdir("devices", proc_lustre_dir_entry); - if (IS_ERR(proc_lustre_obd_dir_entry)) - return 0; - } - sprintf(obdname, "%s", obd->obd_name); - obd_dir = proc_mkdir(obdname, proc_lustre_obd_dir_entry); - - if (obd_dir) - obd_status = create_proc_entry("status", S_IRUSR | S_IFREG, + struct proc_dir_entry *obd_dir; + struct proc_dir_entry *obd_status = NULL; + + if (!proc_lustre_dir_entry) { + proc_lustre_dir_entry = proc_mkdir("lustre", &proc_root); + if (IS_ERR(proc_lustre_dir_entry)) + return 0; + + proc_lustre_obd_dir_entry = + proc_mkdir("devices", proc_lustre_dir_entry); + if (IS_ERR(proc_lustre_obd_dir_entry)) + return 0; + } + obd_dir = proc_mkdir(obd->obd_name, proc_lustre_obd_dir_entry); + + if (obd_dir) + obd_status = create_proc_entry("status", S_IRUSR | S_IFREG, obd_dir); - if (obd_status) { - obd_status->read_proc = read_lustre_status; - obd_status->data = (void*) obd; - } + if (obd_status) { + obd_status->read_proc = read_lustre_status; + obd_status->data = (void *)obd; + } - return obd_dir; + return obd_dir; } -void proc_lustre_remove_obd_entry(const char* name, struct obd_device *obd) +void proc_lustre_remove_obd_entry(const char *name, struct obd_device *obd) { - struct proc_dir_entry *obd_entry = 0; - struct proc_dir_entry *obd_dir = obd->obd_proc_entry; - - remove_proc_entry(name, obd_dir); - - while (obd_dir->subdir==0) { - /* if we removed last entry in this directory, - * then remove parent directory unless this - * is /proc itself - */ - if (obd_dir == &proc_root) - break; - - obd_entry = obd_dir; - obd_dir = obd_dir->parent; - - /* If /proc/lustre/obd/foo or /proc/lustre/obd or - * /proc/lustre is being removed, then reset - * internal variables - */ - - if (obd_entry == obd->obd_proc_entry) - obd->obd_proc_entry=0; /* /proc/lustre/obd/foo */ - else - if (obd_entry == proc_lustre_obd_dir_entry) - proc_lustre_obd_dir_entry=0; - else - if (obd_entry == proc_lustre_dir_entry) - proc_lustre_dir_entry=0; - - remove_proc_entry(obd_entry->name, obd_dir); - } + struct proc_dir_entry *obd_entry = NULL; + struct proc_dir_entry *obd_dir = obd->obd_proc_entry; + + remove_proc_entry(name, obd_dir); + + while (obd_dir->subdir == NULL) { + /* if we removed last entry in this directory, then + * remove parent directory unless this is /proc itself */ + if (obd_dir == &proc_root) + break; + + obd_entry = obd_dir; + obd_dir = obd_dir->parent; + + /* If /proc/lustre/obd/foo or /proc/lustre/obd or + * /proc/lustre is being removed, then reset internal + * variables */ + + if (obd_entry == obd->obd_proc_entry) + obd->obd_proc_entry = NULL; /* /proc/lustre/obd/foo */ + else if (obd_entry == proc_lustre_obd_dir_entry) + proc_lustre_obd_dir_entry = NULL; /* /proc/lustre/obd */ + else if (obd_entry == proc_lustre_dir_entry) + proc_lustre_dir_entry = NULL; /* /proc/lustre */ + + remove_proc_entry(obd_entry->name, obd_dir); + } } void proc_lustre_release_obd_device(struct obd_device *obd) { - proc_lustre_remove_obd_entry("status", obd); + proc_lustre_remove_obd_entry("status", obd); } @@ -174,23 +181,17 @@ void proc_lustre_release_obd_device(struct obd_device *obd) struct proc_dir_entry *proc_lustre_register_obd_device(struct obd_device *obd) { - return 0; + return 0; } -void proc_lustre_remove_obd_entry(const char* name, struct obd_device *obd) {} -void proc_lustre_release_obd_device(struct obd_device *obd) {} - -#endif /* CONFIG_PROC_FS */ - -EXPORT_SYMBOL(proc_lustre_remove_obd_entry); - - - - - - - - +void proc_lustre_remove_obd_entry(const char* name, struct obd_device *obd) +{ +} +void proc_lustre_release_obd_device(struct obd_device *obd) +{ +} +#endif /* CONFIG_PROC_FS */ +EXPORT_SYMBOL(proc_lustre_remove_obd_entry); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 22ba1f2..e2ceb0e 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -675,6 +675,11 @@ static int filter_destroy(struct lustre_handle *conn, struct obdo *oa, GOTO(out, rc = -ENOENT); inode = object_dentry->d_inode; + if (inode == NULL) { + CERROR("trying to destroy negative inode %Ld!\n", oa->o_id); + GOTO(out, rc = -ENOENT); + } + if (inode->i_nlink != 1) { CERROR("destroying inode with nlink = %d\n", inode->i_nlink); LBUG(); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 333ca79..90a2571 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -514,15 +514,13 @@ static int ost_handle(struct ptlrpc_request *req) if (rc) break; RETURN(0); - case LDLM_CALLBACK: + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: CDEBUG(D_INODE, "callback\n"); - CERROR("callbacks should not happen on MDS\n"); + CERROR("callbacks should not happen on OST\n"); LBUG(); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); break; - - - default: req->rq_status = -ENOTSUPP; rc = ptlrpc_error(req->rq_svc, req); diff --git a/lustre/tests/create.pl b/lustre/tests/create.pl index 71b4f95..952dac5 100644 --- a/lustre/tests/create.pl +++ b/lustre/tests/create.pl @@ -4,7 +4,7 @@ my $mtpt = shift || usage(); my $mount_count = shift || usage(); my $i = shift || usage(); my $files = 5; -my $mcreate = 1; # should we use mcreate or open? +my $mcreate = 0; # should we use mcreate or open? sub usage () { print "Usage: $0 \n"; diff --git a/lustre/utils/Makefile.am b/lustre/utils/Makefile.am index 097b4ab..0320ff2 100644 --- a/lustre/utils/Makefile.am +++ b/lustre/utils/Makefile.am @@ -5,10 +5,10 @@ CFLAGS:=-g -O2 -I. -I/usr/include/libxml2 -I/usr/include/glib-1.2 -I$(PORTALS)/i -I/usr/lib/glib/include -I$(srcdir)/../include -Wall KFLAGS:= CPPFLAGS = $(HAVE_LIBREADLINE) -#bdctl_LDADD := $(LIBREADLINE) -lxml2 # -lefence +obdctl_LDADD := $(LIBREADLINE) -lxml2 # -lefence lctl_LDADD := $(LIBREADLINE) -sbin_PROGRAMS = lctl # obdctl -#obdctl_SOURCES = parser.c obdctl.c parser.h +sbin_PROGRAMS = lctl obdctl +obdctl_SOURCES = parser.c obdctl.c parser.h lctl_SOURCES = parser.c network.c device.c debug.c lctl.c lctl.h parser.h include $(top_srcdir)/Rules diff --git a/lustre/utils/obdctl.c b/lustre/utils/obdctl.c index acd785f..6fbd1a7 100644 --- a/lustre/utils/obdctl.c +++ b/lustre/utils/obdctl.c @@ -50,7 +50,7 @@ #include #include -#include /* needed for PAGE_SIZE - rread*/ +#include /* needed for PAGE_SIZE - rread */ #define __KERNEL__ #include @@ -61,13 +61,13 @@ #define SHMEM_STATS 1 #if SHMEM_STATS -#include -#include +# include +# include -#define MAX_SHMEM_COUNT 1024 +# define MAX_SHMEM_COUNT 1024 static long long *shared_counters; -static long long counter_snapshot[2][MAX_SHMEM_COUNT]; -struct timeval prev_time; +static long long counter_snapshot[2][MAX_SHMEM_COUNT]; +struct timeval prev_time; #endif static int jt_newdev(int argc, char **argv); @@ -123,7 +123,7 @@ do { \ */ -char * obdo_print(struct obdo *obd) +char *obdo_print(struct obdo *obd) { char buf[1024]; @@ -141,10 +141,7 @@ char * obdo_print(struct obdo *obd) obd->o_mode, obd->o_uid, obd->o_gid, - obd->o_flags, - obd->o_obdflags, - obd->o_nlink, - obd->o_valid); + obd->o_flags, obd->o_obdflags, obd->o_nlink, obd->o_valid); return strdup(buf); } @@ -199,7 +196,7 @@ static int be_verbose(int verbose, struct timeval *next_time, } /* A negative verbosity means to print at most each X seconds */ - if (verbose < 0 && next_time != NULL && difftime(&now, next_time) >= 0){ + if (verbose < 0 && next_time != NULL && difftime(&now, next_time) >= 0) { next_time->tv_sec = now.tv_sec - verbose; next_time->tv_usec = now.tv_usec; if (next_num) @@ -219,7 +216,7 @@ static int get_verbose(const char *arg) else if (arg[0] == 's' || arg[0] == 'q') verbose = 0; else - verbose = (int) strtoul(arg, NULL, 0); + verbose = (int)strtoul(arg, NULL, 0); if (verbose < 0) printf("Print status every %d seconds\n", -verbose); @@ -235,12 +232,12 @@ static int do_disconnect(char *func, int verbose) { struct obd_ioctl_data data; - if (conn_addr == -1) - return 0; + if (conn_addr == -1) + return 0; IOCINIT(data); - rc = ioctl(fd, OBD_IOC_DISCONNECT , &data); + rc = ioctl(fd, OBD_IOC_DISCONNECT, &data); if (rc < 0) { fprintf(stderr, "error: %s: %x %s\n", cmdname(func), OBD_IOC_DISCONNECT, strerror(errno)); @@ -255,79 +252,75 @@ static int do_disconnect(char *func, int verbose) } #if SHMEM_STATS -static void -shmem_setup () +static void shmem_setup(void) { - int shmid = shmget (IPC_PRIVATE, sizeof (counter_snapshot[0]), 0600); + int shmid = shmget(IPC_PRIVATE, sizeof(counter_snapshot[0]), 0600); - if (shmid == -1) - { - fprintf (stderr, "Can't create shared memory counters: %s\n", strerror (errno)); + if (shmid == -1) { + fprintf(stderr, "Can't create shared memory counters: %s\n", + strerror(errno)); return; } - - shared_counters = (long long *)shmat (shmid, NULL, 0); - if (shared_counters == (long long *)(-1)) - { - fprintf (stderr, "Can't attach shared memory counters: %s\n", strerror (errno)); + shared_counters = (long long *)shmat(shmid, NULL, 0); + + if (shared_counters == (long long *)(-1)) { + fprintf(stderr, "Can't attach shared memory counters: %s\n", + strerror(errno)); shared_counters = NULL; return; } } -static inline void -shmem_reset () +static inline void shmem_reset(void) { if (shared_counters == NULL) return; - - memset (shared_counters, 0, sizeof (counter_snapshot[0])); - memset (counter_snapshot, 0, sizeof (counter_snapshot)); - gettimeofday (&prev_time, NULL); + + memset(shared_counters, 0, sizeof(counter_snapshot[0])); + memset(counter_snapshot, 0, sizeof(counter_snapshot)); + gettimeofday(&prev_time, NULL); } -static inline void -shmem_bump () +static inline void shmem_bump(void) { - if (shared_counters == NULL || - thread <= 0 || thread > MAX_SHMEM_COUNT) + if (shared_counters == NULL || thread <= 0 || thread > MAX_SHMEM_COUNT) return; - + shared_counters[thread - 1]++; } -static void -shmem_snap (int n) +static void shmem_snap(int n) { struct timeval this_time; - int non_zero = 0; - long long total = 0; - double secs; - int i; - + int non_zero = 0; + long long total = 0; + double secs; + int i; + if (shared_counters == NULL || n > MAX_SHMEM_COUNT) return; - memcpy (counter_snapshot[1], counter_snapshot[0], n * sizeof (counter_snapshot[0][0])); - memcpy (counter_snapshot[0], shared_counters, n * sizeof (counter_snapshot[0][0])); - gettimeofday (&this_time, NULL); - - for (i = 0; i < n; i++) - { - long long this_count = counter_snapshot[0][i] - counter_snapshot[1][i]; - - if (this_count != 0) - { + memcpy(counter_snapshot[1], counter_snapshot[0], + n * sizeof(counter_snapshot[0][0])); + memcpy(counter_snapshot[0], shared_counters, + n * sizeof(counter_snapshot[0][0])); + gettimeofday(&this_time, NULL); + + for (i = 0; i < n; i++) { + long long this_count = + counter_snapshot[0][i] - counter_snapshot[1][i]; + + if (this_count != 0) { non_zero++; total += this_count; } } - - secs = (this_time.tv_sec + this_time.tv_usec/1000000.0) - - (prev_time.tv_sec + prev_time.tv_usec/1000000.0); - - printf ("%d/%d Total: %f/second\n", non_zero, n, total / secs); + + secs = (this_time.tv_sec + this_time.tv_usec / 1000000.0) - + (prev_time.tv_sec + prev_time.tv_usec / 1000000.0); + + printf("%d/%d Total: %f/second\n", non_zero, n, total / secs); prev_time = this_time; } @@ -345,12 +338,13 @@ shmem_snap (int n) extern command_t cmdlist[]; -static int xml_command(char *cmd, ...) { +static int xml_command(char *cmd, ...) +{ va_list args; char *arg, *cmds[8]; int i = 1, j; - - cmds[0] = cmd; + + cmds[0] = cmd; va_start(args, cmd); while (((arg = va_arg(args, char *)) != NULL) && (i < 8)) { @@ -362,7 +356,7 @@ static int xml_command(char *cmd, ...) { printf("obdctl > "); for (j = 0; j < i; j++) - printf("%s ", cmds[j]); + printf("%s ", cmds[j]); printf("\n"); @@ -370,15 +364,16 @@ static int xml_command(char *cmd, ...) { } #if 0 -static network_t *xml_network(xmlDocPtr doc, xmlNodePtr root) { +static network_t *xml_network(xmlDocPtr doc, xmlNodePtr root) +{ xmlNodePtr cur = root->xmlChildrenNode; network_t *net; - - if ((net = (network_t *)calloc(1, sizeof(network_t))) == NULL) { + + if ((net = (network_t *) calloc(1, sizeof(network_t))) == NULL) { printf("error: unable to malloc network_t\n"); return NULL; } - + net->type = xmlGetProp(root, "type"); if (net->type == NULL) { printf("error: type attrib required (tcp, elan, myrinet)\n"); @@ -394,20 +389,21 @@ static network_t *xml_network(xmlDocPtr doc, xmlNodePtr root) { net->port = atoi(xmlNodeGetContent(cur)); cur = cur->next; - } + } if (net->server == NULL) { printf("error: tag required\n"); free(net); return NULL; } - + return net; } #endif -static int xml_mds(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_mds(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ xmlNodePtr cur = root->xmlChildrenNode; char *fstype = NULL, *device = NULL; int rc; @@ -419,7 +415,7 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root, if (!xmlStrcmp(cur->name, "device")) device = xmlNodeGetContent(cur); - + /* FIXME: Parse the network bits * if (!xmlStrcmp(cur->name, "network")) { * net = xml_network(doc, cur); @@ -429,7 +425,7 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root, * free(net); */ cur = cur->next; - } + } if ((fstype == NULL) || (device == NULL)) { printf("error: and tags required\n"); @@ -439,7 +435,8 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root, if ((rc = xml_command("newdev", NULL)) != 0) return rc; - if ((rc = xml_command("attach","mds",serv_name,serv_uuid,NULL)) != 0) + if ((rc = + xml_command("attach", "mds", serv_name, serv_uuid, NULL)) != 0) return rc; if ((rc = xml_command("setup", device, fstype, NULL)) != 0) @@ -447,9 +444,10 @@ static int xml_mds(xmlDocPtr doc, xmlNodePtr root, return 0; } - -static int xml_obd(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { + +static int xml_obd(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ char *obdtype, *format = NULL, *fstype = NULL, *device = NULL; xmlNodePtr cur = root->xmlChildrenNode; int rc; @@ -463,12 +461,12 @@ static int xml_obd(xmlDocPtr doc, xmlNodePtr root, if (!xmlStrcmp(cur->name, "device")) device = xmlNodeGetContent(cur); - + if (!xmlStrcmp(cur->name, "autoformat")) format = xmlNodeGetContent(cur); cur = cur->next; - } + } if ((obdtype == NULL) || (fstype == NULL) || (device == NULL)) { printf("error: 'type' attrib and " @@ -480,12 +478,12 @@ static int xml_obd(xmlDocPtr doc, xmlNodePtr root, * but is currently unsupported. You'll have to use the scripts * for now until support is added, or specify a real device. */ - + if ((rc = xml_command("newdev", NULL)) != 0) return rc; if ((rc = xml_command("attach", obdtype, - serv_name,serv_uuid, NULL)) != 0) + serv_name, serv_uuid, NULL)) != 0) return rc; if ((rc = xml_command("setup", device, fstype, NULL)) != 0) @@ -494,8 +492,9 @@ static int xml_obd(xmlDocPtr doc, xmlNodePtr root, return 0; } -static int xml_ost(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_ost(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ char *server_name = NULL, *server_uuid = NULL; char *failover_name = NULL, *failover_uuid = NULL; xmlNodePtr cur = root->xmlChildrenNode; @@ -517,17 +516,18 @@ static int xml_ost(xmlDocPtr doc, xmlNodePtr root, } cur = cur->next; - } + } if ((server_name == NULL) || (server_uuid == NULL)) { printf("error: atleast the tag is required\n"); return -1; } - + if ((rc = xml_command("newdev", NULL)) != 0) return rc; - if ((rc = xml_command("attach","ost",serv_name,serv_uuid,NULL)) != 0) + if ((rc = + xml_command("attach", "ost", serv_name, serv_uuid, NULL)) != 0) return rc; if ((rc = xml_command("setup", server_name, NULL)) != 0) @@ -536,8 +536,9 @@ static int xml_ost(xmlDocPtr doc, xmlNodePtr root, return 0; } -static int xml_osc(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_osc(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ char *ost_name = NULL, *ost_uuid = NULL; xmlNodePtr cur = root->xmlChildrenNode; int ost_num, rc = 0; @@ -551,7 +552,7 @@ static int xml_osc(xmlDocPtr doc, xmlNodePtr root, } cur = cur->next; - } + } if ((ost_name == NULL) || (ost_uuid == NULL)) { printf("error: atleast the tag is required\n"); @@ -561,7 +562,8 @@ static int xml_osc(xmlDocPtr doc, xmlNodePtr root, if ((rc = xml_command("newdev", NULL)) != 0) return rc; - if ((rc = xml_command("attach","osc",serv_name,serv_uuid,NULL)) != 0) + if ((rc = + xml_command("attach", "osc", serv_name, serv_uuid, NULL)) != 0) return rc; if ((rc = xml_command("setup", ost_uuid, "localhost", NULL)) != 0) @@ -570,8 +572,9 @@ static int xml_osc(xmlDocPtr doc, xmlNodePtr root, return 0; } -static int xml_mdc(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_mdc(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ char *mds_name = NULL, *mds_uuid = NULL; xmlNodePtr cur = root->xmlChildrenNode; int mds_num, rc = 0; @@ -585,46 +588,51 @@ static int xml_mdc(xmlDocPtr doc, xmlNodePtr root, } cur = cur->next; - } + } if ((mds_name == NULL) || (mds_uuid == NULL)) { printf("error: atleast the tag is required\n"); return -1; } - if ((rc = xml_command("newdev", NULL)) != 0) + if ((rc = xml_command("newdev", NULL)) != 0) return rc; - if ((rc = xml_command("attach","mdc",serv_name,serv_uuid,NULL)) != 0) + if ((rc = + xml_command("attach", "mdc", serv_name, serv_uuid, NULL)) != 0) return rc; if ((rc = xml_command("setup", mds_uuid, "localhost", NULL)) != 0) return rc; - + return 0; } -static int xml_lov(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_lov(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ printf("--- Setting up LOV ---\n"); return 0; } -static int xml_router(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_router(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ printf("--- Setting up ROUTER ---\n"); return 0; } -static int xml_ldlm(xmlDocPtr doc, xmlNodePtr root, - char *serv_name, char *serv_uuid) { +static int xml_ldlm(xmlDocPtr doc, xmlNodePtr root, + char *serv_name, char *serv_uuid) +{ int rc; printf("--- Setting up LDLM ---\n"); if ((rc = xml_command("newdev", NULL)) != 0) return rc; - if ((rc = xml_command("attach","ldlm",serv_name,serv_uuid,NULL)) != 0) + if ((rc = + xml_command("attach", "ldlm", serv_name, serv_uuid, NULL)) != 0) return rc; if ((rc = xml_command("setup", NULL)) != 0) @@ -633,8 +641,9 @@ static int xml_ldlm(xmlDocPtr doc, xmlNodePtr root, return 0; } -static int xml_service(xmlDocPtr doc, xmlNodePtr root, - int serv_num, char *serv_name, char *serv_uuid) { +static int xml_service(xmlDocPtr doc, xmlNodePtr root, + int serv_num, char *serv_name, char *serv_uuid) +{ xmlNodePtr cur = root; char *name, *uuid; @@ -642,8 +651,7 @@ static int xml_service(xmlDocPtr doc, xmlNodePtr root, name = xmlGetProp(cur, "name"); uuid = xmlGetProp(cur, "uuid"); - if (xmlStrcmp(name, serv_name) || - xmlStrcmp(uuid, serv_uuid)) { + if (xmlStrcmp(name, serv_name) || xmlStrcmp(uuid, serv_uuid)) { cur = cur->next; continue; } @@ -665,18 +673,19 @@ static int xml_service(xmlDocPtr doc, xmlNodePtr root, else if (!xmlStrcmp(cur->name, "ldlm")) return xml_ldlm(doc, cur, name, uuid); else - return -1; + return -1; cur = cur->next; } printf("error: No XML config branch for name=%s uuid=%s\n", - serv_name, serv_uuid); - return -1; + serv_name, serv_uuid); + return -1; } -static int xml_profile(xmlDocPtr doc, xmlNodePtr root, - int prof_num, char *prof_name, char *prof_uuid) { +static int xml_profile(xmlDocPtr doc, xmlNodePtr root, + int prof_num, char *prof_name, char *prof_uuid) +{ xmlNodePtr parent, cur = root; char *name, *uuid, *srv_name, *srv_uuid; int rc = 0, num; @@ -685,16 +694,15 @@ static int xml_profile(xmlDocPtr doc, xmlNodePtr root, name = xmlGetProp(cur, "name"); uuid = xmlGetProp(cur, "uuid"); - if (xmlStrcmp(cur->name, "profile") || - xmlStrcmp(name, prof_name) || - xmlStrcmp(uuid, prof_uuid)) { + if (xmlStrcmp(cur->name, "profile") || + xmlStrcmp(name, prof_name) || xmlStrcmp(uuid, prof_uuid)) { cur = cur->next; continue; } /* FIXME: Doesn't understand mountpoints yet * xml_mountpoint(doc, root, ...); - */ + */ /* Setup each service in turn * FIXME: Should be sorted by "num" attr, we shouldn't @@ -706,8 +714,10 @@ static int xml_profile(xmlDocPtr doc, xmlNodePtr root, if (!xmlStrcmp(cur->name, "service_id")) { num = atoi(xmlGetProp(cur, "num")); rc = xml_service(doc, root, num, - srv_name = xmlGetProp(cur, "name"), - srv_uuid = xmlGetProp(cur, "uuid")); + srv_name = + xmlGetProp(cur, "name"), + srv_uuid = + xmlGetProp(cur, "uuid")); if (rc != 0) { printf("error: service config\n"); return rc; @@ -720,14 +730,15 @@ static int xml_profile(xmlDocPtr doc, xmlNodePtr root, cur = parent->next; } - return rc; + return rc; } -static int xml_node(xmlDocPtr doc, xmlNodePtr root) { +static int xml_node(xmlDocPtr doc, xmlNodePtr root) +{ xmlNodePtr parent, cur = root; char *name, *uuid; int rc = 0, num; - + /* Walk the node tags looking for ours */ while (cur != NULL) { if (xmlStrcmp(cur->name, "node")) { @@ -781,7 +792,7 @@ static int do_xml(char *func, char *file) doc = xmlParseFile(file); if (doc == NULL) { fprintf(stderr, "error: Unable to parse XML\n"); - return -1; + return -1; } cur = xmlDocGetRootElement(doc); @@ -790,7 +801,7 @@ static int do_xml(char *func, char *file) xmlFreeDoc(doc); return -1; } - + if (xmlStrcmp(cur->name, (const xmlChar *)"lustre")) { fprintf(stderr, "error: Root node != \n"); xmlFreeDoc(doc); @@ -798,13 +809,13 @@ static int do_xml(char *func, char *file) } /* FIXME: Validate the XML against the DTD here */ - + /* FIXME: Merge all the text nodes under each branch and * prune empty nodes. Just to make the parsing more * tolerant, the exact location of nested tags isn't * critical for this. */ - + rc = xml_node(doc, cur->xmlChildrenNode); xmlFreeDoc(doc); @@ -827,7 +838,7 @@ static int do_device(char *func, int dev) return -2; } - return ioctl(fd, OBD_IOC_DEVICE , buf); + return ioctl(fd, OBD_IOC_DEVICE, buf); } static int jt_device(int argc, char **argv) @@ -861,13 +872,13 @@ static int jt_connect(int argc, char **argv) return -1; } - rc = ioctl(fd, OBD_IOC_CONNECT , &data); + rc = ioctl(fd, OBD_IOC_CONNECT, &data); if (rc < 0) fprintf(stderr, "error: %s: %x %s\n", cmdname(argv[0]), OBD_IOC_CONNECT, strerror(rc = errno)); else conn_addr = data.ioc_addr; - conn_cookie = data.ioc_cookie; + conn_cookie = data.ioc_cookie; return rc; } @@ -946,7 +957,7 @@ static int jt__threads(int argc, char **argv) argv[0], threads, argv[3], argv[4]); SHMEM_RESET(); - + for (i = 1, next_thread = verbose; i <= threads; i++) { rc = fork(); if (rc < 0) { @@ -963,22 +974,20 @@ static int jt__threads(int argc, char **argv) rc = 0; } - if (!thread) { /* parent process */ + if (!thread) { /* parent process */ int live_threads = threads; - - while (live_threads > 0) - { - int status; - pid_t ret; - - ret = waitpid (0, &status, verbose < 0 ? WNOHANG : 0); - if (ret == 0) - { + + while (live_threads > 0) { + int status; + pid_t ret; + + ret = waitpid(0, &status, verbose < 0 ? WNOHANG : 0); + if (ret == 0) { if (verbose >= 0) - abort (); + abort(); - sleep (-verbose); - SHMEM_SNAP (threads); + sleep(-verbose); + SHMEM_SNAP(threads); continue; } @@ -1026,10 +1035,10 @@ static int jt_detach(int argc, char **argv) return -2; } - rc = ioctl(fd, OBD_IOC_DETACH , buf); + rc = ioctl(fd, OBD_IOC_DETACH, buf); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), - strerror(rc=errno)); + strerror(rc = errno)); return rc; } @@ -1045,10 +1054,10 @@ static int jt_cleanup(int argc, char **argv) return -1; } - rc = ioctl(fd, OBD_IOC_CLEANUP , &data); + rc = ioctl(fd, OBD_IOC_CLEANUP, &data); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), - strerror(rc=errno)); + strerror(rc = errno)); return rc; } @@ -1067,10 +1076,10 @@ static int jt_newdev(int argc, char **argv) return -1; } - rc = ioctl(fd, OBD_IOC_NEWDEV , &data); + rc = ioctl(fd, OBD_IOC_NEWDEV, &data); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), - strerror(rc=errno)); + strerror(rc = errno)); else { printf("Current device set to %d\n", data.ioc_dev); } @@ -1098,10 +1107,10 @@ static int jt_list(int argc, char **argv) return -1; } - rc = ioctl(fd, OBD_IOC_LIST , data); + rc = ioctl(fd, OBD_IOC_LIST, data); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), - strerror(rc=errno)); + strerror(rc = errno)); else { printf("%s", data->ioc_bulk); } @@ -1121,7 +1130,7 @@ static int jt_attach(int argc, char **argv) return -1; } - data.ioc_inllen1 = strlen(argv[1]) + 1; + data.ioc_inllen1 = strlen(argv[1]) + 1; data.ioc_inlbuf1 = argv[1]; if (argc >= 3) { data.ioc_inllen2 = strlen(argv[2]) + 1; @@ -1138,7 +1147,7 @@ static int jt_attach(int argc, char **argv) return -2; } - rc = ioctl(fd, OBD_IOC_ATTACH , buf); + rc = ioctl(fd, OBD_IOC_ATTACH, buf); if (rc < 0) fprintf(stderr, "error: %s: %x %s\n", cmdname(argv[0]), OBD_IOC_ATTACH, strerror(rc = errno)); @@ -1158,7 +1167,7 @@ static int jt_attach(int argc, char **argv) return rc; } -#define N2D_OFF 0x100 /* So we can tell between error codes and devices */ +#define N2D_OFF 0x100 /* So we can tell between error codes and devices */ static int do_name2dev(char *func, char *name) { @@ -1176,7 +1185,7 @@ static int do_name2dev(char *func, char *name) fprintf(stderr, "error: %s: invalid ioctl\n", cmdname(func)); return -2; } - rc = ioctl(fd, OBD_IOC_NAME2DEV , buf); + rc = ioctl(fd, OBD_IOC_NAME2DEV, buf); if (rc < 0) { fprintf(stderr, "error: %s: %s - %s\n", cmdname(func), name, strerror(rc = errno)); @@ -1191,7 +1200,7 @@ static int do_name2dev(char *func, char *name) static int jt_name2dev(int argc, char **argv) { if (argc != 2) { - Parser_printhelp("name2dev"); + Parser_printhelp("name2dev"); return -1; } @@ -1211,7 +1220,7 @@ static int jt_setup(int argc, char **argv) IOCINIT(data); - if ( argc > 3) { + if (argc > 3) { fprintf(stderr, "usage: %s [device] [fstype]\n", cmdname(argv[0])); return -1; @@ -1231,7 +1240,7 @@ static int jt_setup(int argc, char **argv) data.ioc_inllen1 = strlen(argv[1]) + 1; data.ioc_inlbuf1 = argv[1]; } - if ( argc == 3 ) { + if (argc == 3) { data.ioc_inllen2 = strlen(argv[2]) + 1; data.ioc_inlbuf2 = argv[2]; } @@ -1240,7 +1249,7 @@ static int jt_setup(int argc, char **argv) fprintf(stderr, "error: %s: invalid ioctl\n", cmdname(argv[0])); return -2; } - rc = ioctl(fd, OBD_IOC_SETUP , buf); + rc = ioctl(fd, OBD_IOC_SETUP, buf); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), strerror(rc = errno)); @@ -1277,8 +1286,8 @@ static int jt_create(int argc, char **argv) gettimeofday(&next_time, NULL); next_time.tv_sec -= verbose; - for (i = 1, next_count = verbose; i <= count ; i++) { - rc = ioctl(fd, OBD_IOC_CREATE , &data); + for (i = 1, next_count = verbose; i <= count; i++) { + rc = ioctl(fd, OBD_IOC_CREATE, &data); SHMEM_BUMP(); if (rc < 0) { fprintf(stderr, "error: %s: #%d - %s\n", @@ -1306,7 +1315,7 @@ static int jt_setattr(int argc, char **argv) data.ioc_obdo1.o_mode = S_IFREG | strtoul(argv[2], NULL, 0); data.ioc_obdo1.o_valid = OBD_MD_FLMODE; - rc = ioctl(fd, OBD_IOC_SETATTR , &data); + rc = ioctl(fd, OBD_IOC_SETATTR, &data); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), strerror(rc = errno)); @@ -1325,9 +1334,9 @@ static int jt_destroy(int argc, char **argv) } data.ioc_obdo1.o_id = strtoul(argv[1], NULL, 0); - data.ioc_obdo1.o_mode = S_IFREG|0644; + data.ioc_obdo1.o_mode = S_IFREG | 0644; - rc = ioctl(fd, OBD_IOC_DESTROY , &data); + rc = ioctl(fd, OBD_IOC_DESTROY, &data); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), strerror(rc = errno)); @@ -1351,10 +1360,10 @@ static int jt_getattr(int argc, char **argv) data.ioc_obdo1.o_valid = 0xffffffff; printf("%s: object id %Ld\n", cmdname(argv[0]), data.ioc_obdo1.o_id); - rc = ioctl(fd, OBD_IOC_GETATTR , &data); + rc = ioctl(fd, OBD_IOC_GETATTR, &data); if (rc) { fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), - strerror(rc=errno)); + strerror(rc = errno)); } else { printf("%s: object id %Ld, mode %o\n", cmdname(argv[0]), data.ioc_obdo1.o_id, data.ioc_obdo1.o_mode); @@ -1370,7 +1379,8 @@ static int jt_test_getattr(int argc, char **argv) int verbose; if (argc != 2 && argc != 3) { - fprintf(stderr, "usage: %s count [verbose]\n",cmdname(argv[0])); + fprintf(stderr, "usage: %s count [verbose]\n", + cmdname(argv[0])); return -1; } @@ -1388,21 +1398,23 @@ static int jt_test_getattr(int argc, char **argv) next_time.tv_sec = start.tv_sec - verbose; next_time.tv_usec = start.tv_usec; if (verbose != 0) - printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]), - count, ctime(&start.tv_sec)); + printf("%s: getting %d attrs (testing only): %s", + cmdname(argv[0]), count, ctime(&start.tv_sec)); for (i = 1, next_count = verbose; i <= count; i++) { - rc = ioctl(fd, OBD_IOC_GETATTR , &data); + rc = ioctl(fd, OBD_IOC_GETATTR, &data); SHMEM_BUMP(); if (rc < 0) { fprintf(stderr, "error: %s: #%d - %s\n", cmdname(argv[0]), i, strerror(rc = errno)); break; } else { - if (be_verbose(verbose, &next_time, i,&next_count,count)) - printf("%s: got attr #%d\n", cmdname(argv[0]), i); - } - } + if (be_verbose + (verbose, &next_time, i, &next_count, count)) + printf("%s: got attr #%d\n", cmdname(argv[0]), + i); + } + } if (!rc) { struct timeval end; @@ -1518,7 +1530,8 @@ static int jt_test_brw(int argc, char **argv) cmdname(argv[0]), i, strerror(rc = errno), write ? "write" : "read"); break; - } else if (be_verbose(verbose, &next_time, i,&next_count,count)) + } else if (be_verbose + (verbose, &next_time, i, &next_count, count)) printf("%s: %s number %d\n", cmdname(argv[0]), write ? "write" : "read", i); } @@ -1536,8 +1549,9 @@ static int jt_test_brw(int argc, char **argv) --i; if (verbose != 0) printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s", - cmdname(argv[0]), write ? "wrote" : "read", obdos, - pages, i, diff, (double)obdos * i * pages / diff, + cmdname(argv[0]), write ? "wrote" : "read", + obdos, pages, i, diff, + (double)obdos * i * pages / diff, ctime(&end.tv_sec)); } return rc; @@ -1546,53 +1560,52 @@ static int jt_test_brw(int argc, char **argv) static int jt_lov_config(int argc, char **argv) { struct obd_ioctl_data data; - struct lov_desc desc; + struct lov_desc desc; uuid_t *uuidarray; int size, i; IOCINIT(data); - printf("WARNING: obdctl lovconfig NOT MAINTAINED\n"); + printf("WARNING: obdctl lovconfig NOT MAINTAINED\n"); return -1; - if (argc <= 5 ){ - Parser_printhelp("lovconfig"); + if (argc <= 5) { + Parser_printhelp("lovconfig"); return -1; } - if (strlen(argv[1]) > sizeof(uuid_t) - 1) { - fprintf(stderr, "lov_config: no %dB memory for uuid's\n", + if (strlen(argv[1]) > sizeof(uuid_t) - 1) { + fprintf(stderr, "lov_config: no %dB memory for uuid's\n", strlen(argv[1])); return -ENOMEM; } - - memset(&desc, 0, sizeof(desc)); - strcpy(desc.ld_uuid, argv[1]); - desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); - desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0); - desc.ld_pattern = strtoul(argv[4], NULL, 0); + + memset(&desc, 0, sizeof(desc)); + strcpy(desc.ld_uuid, argv[1]); + desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); + desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0); + desc.ld_pattern = strtoul(argv[4], NULL, 0); desc.ld_tgt_count = argc - 5; size = sizeof(uuid_t) * desc.ld_tgt_count; uuidarray = malloc(size); - if (!uuidarray) { - fprintf(stderr, "lov_config: no %dB memory for uuid's\n", - size); + if (!uuidarray) { + fprintf(stderr, "lov_config: no %dB memory for uuid's\n", size); return -ENOMEM; } - memset(uuidarray, 0, size); - for (i=5 ; i < argc ; i++) { - char *buf = (char *) (uuidarray + i -5 ); - if (strlen(argv[i]) >= sizeof(uuid_t)) { - fprintf(stderr, "lov_config: arg %d (%s) too long\n", + memset(uuidarray, 0, size); + for (i = 5; i < argc; i++) { + char *buf = (char *)(uuidarray + i - 5); + if (strlen(argv[i]) >= sizeof(uuid_t)) { + fprintf(stderr, "lov_config: arg %d (%s) too long\n", i, argv[i]); free(uuidarray); return -EINVAL; } - strcpy(buf, argv[i]); + strcpy(buf, argv[i]); } - data.ioc_inllen1 = sizeof(desc); + data.ioc_inllen1 = sizeof(desc); data.ioc_inlbuf1 = (char *)&desc; data.ioc_inllen2 = size; data.ioc_inlbuf2 = (char *)uuidarray; @@ -1602,15 +1615,14 @@ static int jt_lov_config(int argc, char **argv) return -EINVAL; } - rc = ioctl(fd, OBD_IOC_LOV_CONFIG , buf); + rc = ioctl(fd, OBD_IOC_LOV_CONFIG, buf); if (rc < 0) - fprintf(stderr, "lov_config: error: %s: %s\n", - cmdname(argv[0]),strerror(rc = errno)); + fprintf(stderr, "lov_config: error: %s: %s\n", + cmdname(argv[0]), strerror(rc = errno)); free(uuidarray); return rc; } - static int jt_test_ldlm(int argc, char **argv) { struct obd_ioctl_data data; @@ -1628,6 +1640,23 @@ static int jt_test_ldlm(int argc, char **argv) return rc; } +static int jt_dump_ldlm(int argc, char **argv) +{ + struct obd_ioctl_data data; + + IOCINIT(data); + if (argc != 1) { + fprintf(stderr, "usage: %s\n", cmdname(argv[0])); + return 1; + } + + rc = ioctl(fd, IOC_LDLM_DUMP, &data); + if (rc) + fprintf(stderr, "error: %s failed: %s\n", + cmdname(argv[0]), strerror(rc = errno)); + return rc; +} + static int jt_newconn(int argc, char **argv) { struct obd_ioctl_data data; @@ -1638,7 +1667,7 @@ static int jt_newconn(int argc, char **argv) return -1; } - rc = ioctl(fd, OBD_IOC_RECOVD_NEWCONN , &data); + rc = ioctl(fd, OBD_IOC_RECOVD_NEWCONN, &data); if (rc < 0) fprintf(stderr, "error: %s: %s\n", cmdname(argv[0]), strerror(rc = errno)); @@ -1658,15 +1687,16 @@ command_t cmdlist[] = { {"--xml", jt__xml, 0, "--xml "}, {"--device", jt__device, 0, "--device "}, {"--threads", jt__threads, 0, - "--threads "}, + "--threads "}, /* Device configuration commands */ {"lovconfig", jt_lov_config, 0, "configure lov data on MDS " - "[usage: lovconfig lov-uuid stripecount, stripesize, pattern, UUID1, [UUID2, ...]"}, + "[usage: lovconfig lov-uuid stripecount, stripesize, pattern, UUID1, [UUID2, ...]"}, {"list", jt_list, 0, "list the devices (no args)"}, {"newdev", jt_newdev, 0, "set device to a new unused obd (no args)"}, {"device", jt_device, 0, "set current device (args device_no name)"}, - {"name2dev", jt_name2dev, 0, "set device by name [usage: name2dev devname]"}, + {"name2dev", jt_name2dev, 0, + "set device by name [usage: name2dev devname]"}, {"attach", jt_attach, 0, "name the type of device (args: type data"}, {"setup", jt_setup, 0, "setup device (args: [data]"}, {"detach", jt_detach, 0, "detach the current device (arg: )"}, @@ -1675,7 +1705,7 @@ command_t cmdlist[] = { /* Session commands */ {"connect", jt_connect, 0, "connect - get a connection to device"}, {"disconnect", jt_disconnect, 0, - "disconnect - break connection to device"}, + "disconnect - break connection to device"}, /* Session operations */ {"create", jt_create, 0, "create [count [mode [verbose]]]"}, @@ -1686,12 +1716,13 @@ command_t cmdlist[] = { {"test_getattr", jt_test_getattr, 0, "test_getattr [verbose]"}, {"test_brw", jt_test_brw, 0, "test_brw [write [verbose]]"}, {"test_ldlm", jt_test_ldlm, 0, "test lock manager (no args)"}, + {"dump_ldlm", jt_dump_ldlm, 0, "dump all lock manager state (no args)"}, /* User interface commands */ {"help", Parser_help, 0, "help"}, {"exit", jt_quit, 0, "quit"}, {"quit", jt_quit, 0, "quit"}, - { 0, 0, 0, NULL } + {0, 0, 0, NULL} }; @@ -1700,9 +1731,8 @@ static void signal_server(int sig) if (sig == SIGINT) { do_disconnect("sigint", 1); exit(1); - } else { + } else fprintf(stderr, "%s: got signal %d\n", cmdname("sigint"), sig); - } } int main(int argc, char **argv) @@ -1714,9 +1744,9 @@ int main(int argc, char **argv) sigact.sa_flags = SA_RESTART; sigaction(SIGINT, &sigact, NULL); - setlinebuf (stdout); + setlinebuf(stdout); SHMEM_SETUP(); - + if (argc > 1) { rc = Parser_execarg(argc - 1, argv + 1, cmdlist); } else { @@ -1727,4 +1757,3 @@ int main(int argc, char **argv) do_disconnect(argv[0], 1); return rc; } -