From 7088fa71183e5f376eb1b813327544e5d3cee465 Mon Sep 17 00:00:00 2001 From: pschwan Date: Sat, 13 Apr 2002 17:01:01 +0000 Subject: [PATCH] - added connection structure which encompasses some (but perhaps not enough yet) HA elements and a lustre_peer. see rpc/connection.c or lustre_net.h for the very-simple API - removed many duplicated structure entries from the RPC path - broke local_lock_enqueue into two functions; create the lock, do the remote operation, do the local operation. - the shadow local lock tree is implemented, but not tested. --- lustre/include/linux/lustre_dlm.h | 46 +++++++----- lustre/include/linux/lustre_idl.h | 4 + lustre/include/linux/lustre_lite.h | 4 +- lustre/include/linux/lustre_mds.h | 35 ++++----- lustre/include/linux/lustre_net.h | 64 +++++++++------- lustre/include/linux/obd.h | 7 +- lustre/ldlm/ldlm_lock.c | 101 +++++++++++++------------ lustre/ldlm/ldlm_lockd.c | 86 +++++++++++---------- lustre/ldlm/ldlm_request.c | 148 ++++++++++++++++++++++++++++++++----- lustre/ldlm/ldlm_resource.c | 15 ++-- lustre/ldlm/ldlm_test.c | 61 ++++++++------- lustre/llite/dir.c | 4 +- lustre/llite/file.c | 10 +-- lustre/llite/namei.c | 30 ++++---- lustre/llite/rw.c | 56 ++++++-------- lustre/llite/super.c | 10 +-- lustre/mdc/mdc_reint.c | 22 +++--- lustre/mdc/mdc_request.c | 62 ++++++++-------- lustre/mds/handler.c | 116 ++++++++++++----------------- lustre/mds/mds_reint.c | 3 +- lustre/osc/osc_request.c | 127 ++++++++++++++++--------------- lustre/ost/ost_handler.c | 45 ++++------- lustre/ptlrpc/Makefile.am | 3 +- lustre/ptlrpc/client.c | 91 +++++++++++------------ lustre/ptlrpc/connection.c | 128 ++++++++++++++++++++++++++++++++ lustre/ptlrpc/events.c | 6 +- lustre/ptlrpc/niobuf.c | 78 ++++++++++--------- lustre/ptlrpc/pack_generic.c | 15 ++-- lustre/ptlrpc/rpc.c | 9 +-- lustre/ptlrpc/service.c | 80 ++++++++++++-------- lustre/tests/llmount.sh | 1 + 31 files changed, 842 insertions(+), 625 deletions(-) create mode 100644 lustre/ptlrpc/connection.c diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 9fd38b5..1b34f76 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -107,7 +107,7 @@ struct ldlm_lock { ldlm_mode_t l_granted_mode; ldlm_lock_callback l_completion_ast; ldlm_lock_callback l_blocking_ast; - struct lustre_peer l_peer; + struct ptlrpc_connection *l_connection; void *l_data; __u32 l_data_len; struct ldlm_extent l_extent; @@ -169,14 +169,17 @@ static inline void ldlm_object2handle(void *object, struct ldlm_handle *handle) handle->addr = (__u64)(unsigned long)object; } -static inline void ldlm_lock(struct obd_device *obddev) +extern struct list_head ldlm_namespaces; +extern spinlock_t ldlm_spinlock; + +static inline void ldlm_lock(void) { - spin_lock(&obddev->u.ldlm.ldlm_lock); + spin_lock(&ldlm_spinlock); } -static inline void ldlm_unlock(struct obd_device *obddev) +static inline void ldlm_unlock(void) { - spin_unlock(&obddev->u.ldlm.ldlm_lock); + spin_unlock(&ldlm_spinlock); } extern struct obd_ops ldlm_obd_ops; @@ -189,31 +192,29 @@ int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *, /* ldlm_lock.c */ void ldlm_lock_free(struct ldlm_lock *lock); void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc); -ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, - __u32 ns_id, - struct ldlm_handle *parent_lock_handle, - __u64 *res_id, - __u32 type, - struct ldlm_extent *req_ex, +ldlm_error_t ldlm_local_lock_create(__u32 ns_id, + struct ldlm_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + struct ldlm_handle *lockh); +ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh, ldlm_mode_t mode, + struct ldlm_extent *req_ex, int *flags, ldlm_lock_callback completion, ldlm_lock_callback blocking, void *data, - __u32 data_len, - struct ldlm_handle *lockh); -ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev, - struct ldlm_handle *lockh, + __u32 data_len); +ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh, int new_mode, int *flags); -ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, - struct ldlm_handle *lockh); +ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh); void ldlm_lock_dump(struct ldlm_lock *lock); /* ldlm_test.c */ int ldlm_test(struct obd_device *device); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id); +struct ldlm_namespace *ldlm_namespace_find(__u32 id); ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id, struct ldlm_namespace **); int ldlm_namespace_free(struct ldlm_namespace *ns); @@ -228,9 +229,10 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc); void ldlm_resource_dump(struct ldlm_resource *res); /* ldlm_request.c */ -int ldlm_cli_namespace_new(struct ptlrpc_client *, struct lustre_peer *, +int ldlm_cli_namespace_new(struct obd_device *, struct ptlrpc_client *, + struct ptlrpc_connection *, __u32 ns_id, struct ptlrpc_request **); -int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, +int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer, __u32 ns_id, struct ldlm_handle *parent_lock_handle, __u64 *res_id, @@ -244,6 +246,10 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, struct ptlrpc_request **request); int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, void *data, __u32 data_len); +int ldlm_cli_convert(struct ptlrpc_client *, struct ldlm_handle *, + int new_mode, int *flags, struct ptlrpc_request **); +int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_handle *, + struct ptlrpc_request **); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 067a834..7dd153b 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -43,6 +43,9 @@ */ struct lustre_msg { + __u64 conn; + __u64 token; + __u32 opc; __u32 xid; __u32 status; @@ -327,6 +330,7 @@ struct ldlm_request { }; struct ldlm_reply { + __u32 flags; struct ldlm_handle lock_handle; struct ldlm_extent lock_extent; }; diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index df66e70..ba265b7 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -44,10 +44,10 @@ struct ll_sb_info { unsigned long ll_cache_count; struct semaphore ll_list_mutex; struct ptlrpc_client ll_mds_client; - struct lustre_peer ll_mds_peer; + struct ptlrpc_connection *ll_mds_conn; struct ptlrpc_client ll_ost_client; struct lustre_ha_mgr *ll_ha_mgr; - struct lustre_peer ll_ost_peer; + struct ptlrpc_connection *ll_ost_conn; }; diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 6696006..b9d08be 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -79,30 +79,31 @@ void mds_rename_pack(struct mds_rec_rename *, struct inode *srcdir, struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); /* llight/request.c */ -int mdc_getattr(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, +int mdc_getattr(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino, int type, int valid, struct ptlrpc_request **); -int mdc_setattr(struct ptlrpc_client *, struct lustre_peer *, struct inode *, - struct iattr *iattr, struct ptlrpc_request **); -int mdc_open(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, int type, - int flags, __u64 *fh, struct ptlrpc_request **req); -int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, - int type, __u64 fh, struct ptlrpc_request **req); -int mdc_readpage(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, +int mdc_setattr(struct ptlrpc_client *, struct ptlrpc_connection *, + struct inode *, struct iattr *iattr, struct ptlrpc_request **); +int mdc_open(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino, + int type, int flags, __u64 *fh, struct ptlrpc_request **req); +int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *peer, + ino_t ino, int type, __u64 fh, struct ptlrpc_request **req); +int mdc_readpage(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino, int type, __u64 offset, char *addr, struct ptlrpc_request **); -int mdc_create(struct ptlrpc_client *, struct lustre_peer *, +int mdc_create(struct ptlrpc_client *, struct ptlrpc_connection *, struct inode *dir, const char *name, int namelen, const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid, __u32 gid, __u64 time, struct ptlrpc_request **); -int mdc_unlink(struct ptlrpc_client *, struct lustre_peer *, struct inode *dir, - struct inode *child, const char *name, int namelen, +int mdc_unlink(struct ptlrpc_client *, struct ptlrpc_connection *, + struct inode *dir, struct inode *child, const char *name, + int namelen, struct ptlrpc_request **); +int mdc_link(struct ptlrpc_client *, struct ptlrpc_connection *, + struct dentry *src, struct inode *dir, const char *name, + int namelen, struct ptlrpc_request **); +int mdc_rename(struct ptlrpc_client *, struct ptlrpc_connection *, + struct inode *src, struct inode *tgt, const char *old, + int oldlen, const char *new, int newlen, struct ptlrpc_request **); -int mdc_link(struct ptlrpc_client *, struct lustre_peer *, struct dentry *src, - struct inode *dir, const char *name, int namelen, - struct ptlrpc_request **); -int mdc_rename(struct ptlrpc_client *, struct lustre_peer *, struct inode *src, - struct inode *tgt, const char *old, int oldlen, - const char *new, int newlen, struct ptlrpc_request **); int mdc_create_client(char *uuid, struct ptlrpc_client *cl); struct mds_fs_operations { diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 78bd73b..805bc0d 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -53,8 +53,6 @@ #define LDLM_REQUEST_PORTAL 13 #define LDLM_REPLY_PORTAL 14 -#define LDLM_CLI_REQUEST_PORTAL 15 -#define LDLM_CLI_REPLY_PORTAL 16 /* default rpc ring length */ #define RPC_RING_LENGTH 2 @@ -67,6 +65,22 @@ #define SVC_LIST 32 #define SVC_SIGNAL 64 +struct ptlrpc_connection { + struct list_head c_link; + struct lustre_peer c_peer; + + __u32 c_generation; /* changes upon new connection */ + __u32 c_epoch; /* changes when peer changes */ + __u32 c_bootcount; /* peer's boot count */ + + spinlock_t c_lock; + __u32 c_xid_in; + __u32 c_xid_out; + + atomic_t c_refcount; + __u64 c_token; +}; + struct ptlrpc_client { struct obd_device *cli_obd; struct list_head cli_sending_head; @@ -74,11 +88,6 @@ struct ptlrpc_client { __u32 cli_request_portal; __u32 cli_reply_portal; - spinlock_t cli_lock; - __u32 cli_xid; - __u32 cli_generation; /* changes upon new connection */ - __u32 cli_epoch; /* changes when peer changes */ - __u32 cli_bootcount; /* peer's boot count */ struct semaphore cli_rpc_sem; struct list_head cli_ha_item; struct lustre_ha_mgr *cli_ha_mgr; @@ -102,14 +111,11 @@ struct ptlrpc_request { int rq_status; int rq_flags; __u32 rq_connid; - __u32 rq_xid; int rq_reqlen; - char *rq_reqbuf; struct lustre_msg *rq_reqmsg; int rq_replen; - char *rq_repbuf; struct lustre_msg *rq_repmsg; char *rq_bulkbuf; @@ -128,16 +134,13 @@ struct ptlrpc_request { ptl_md_t rq_req_md; ptl_handle_md_t rq_req_md_h; - __u32 rq_reply_portal; - __u32 rq_req_portal; - - struct lustre_peer rq_peer; + struct ptlrpc_connection *rq_connection; struct ptlrpc_client *rq_client; }; struct ptlrpc_bulk_desc { int b_flags; - struct lustre_peer b_peer; + struct ptlrpc_connection *b_connection; __u32 b_portal; char *b_buf; int b_buflen; @@ -189,31 +192,35 @@ typedef int (*svc_handler_t)(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req); - +/* rpc/connection.c */ +struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer); +int ptlrpc_put_connection(struct ptlrpc_connection *c); +struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *); +void ptlrpc_init_connection(void); +void ptlrpc_cleanup_connection(void); /* rpc/niobuf.c */ int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *); int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal); -int ptl_send_buf(struct ptlrpc_request *, struct lustre_peer *, int portal); +int ptl_send_buf(struct ptlrpc_request *, struct ptlrpc_connection *, + int portal); int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *); int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk); int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req); int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req); -int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl); +int ptl_send_rpc(struct ptlrpc_request *request); void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i); /* rpc/client.c */ -void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_portal, - struct ptlrpc_client *cl); -int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl, - struct lustre_peer *peer); -int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req); -int ptlrpc_queue_req(struct ptlrpc_client *peer, struct ptlrpc_request *req); +void ptlrpc_init_client(struct lustre_ha_mgr *, int req_portal, int rep_portal, + struct ptlrpc_client *); +struct ptlrpc_connection *ptlrpc_connect_client(char *uuid); +int ptlrpc_queue_wait(struct ptlrpc_request *req); struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, - struct lustre_peer *peer, int opcode, + struct ptlrpc_connection *u, int opcode, int count, int *lengths, char **bufs); void ptlrpc_free_req(struct ptlrpc_request *request); -struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *); +struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *); int ptlrpc_check_status(struct ptlrpc_request *req, int err); /* rpc/service.c */ @@ -232,9 +239,10 @@ struct ptlrpc_svc_data { }; /* rpc/pack_generic.c */ -int lustre_pack_msg(int count, int *lens, char **bufs, int *len, char **buf); +int lustre_pack_msg(int count, int *lens, char **bufs, int *len, + struct lustre_msg **msg); int lustre_msg_size(int count, int *lengths); -int lustre_unpack_msg(char *buf, int len); +int lustre_unpack_msg(struct lustre_msg *m, int len); void *lustre_msg_buf(struct lustre_msg *m, int n); #endif diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 4d41e8a..53f7b33 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -80,10 +80,7 @@ struct mds_obd { struct ldlm_obd { struct ptlrpc_service *ldlm_service; struct ptlrpc_client *ldlm_client; - struct lustre_peer ldlm_server_peer; - - struct list_head ldlm_namespaces; - spinlock_t ldlm_lock; + struct ptlrpc_connection *ldlm_server_conn; }; struct echo_obd { @@ -128,7 +125,7 @@ struct ost_obd { struct osc_obd { struct obd_device *osc_tgt; struct ptlrpc_client *osc_client; - struct lustre_peer osc_peer; + struct ptlrpc_connection *osc_conn; }; /* corresponds to one of the obd's */ diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index bddb89df8..6bf38e8 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -54,8 +54,7 @@ static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b) } static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, - struct ldlm_resource *resource, - ldlm_mode_t mode) + struct ldlm_resource *resource) { struct ldlm_lock *lock; @@ -68,7 +67,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, memset(lock, 0, sizeof(*lock)); lock->l_resource = resource; - lock->l_req_mode = mode; INIT_LIST_HEAD(&lock->l_children); if (parent != NULL) { @@ -132,94 +130,103 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) lock->l_data, lock->l_data_len); } -/* XXX: Revisit the error handling; we do not, for example, do - * ldlm_resource_put()s in our error cases, and we probably leak any allocated - * memory. */ -ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, - __u32 ns_id, - struct ldlm_handle *parent_lock_handle, - __u64 *res_id, - __u32 type, - struct ldlm_extent *req_ex, - ldlm_mode_t mode, - int *flags, - ldlm_lock_callback completion, - ldlm_lock_callback blocking, - void *data, - __u32 data_len, - struct ldlm_handle *lockh) +ldlm_error_t ldlm_local_lock_create(__u32 ns_id, + struct ldlm_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + struct ldlm_handle *lockh) { struct ldlm_namespace *ns; - struct ldlm_resource *res, *parent_res; + struct ldlm_resource *res, *parent_res = NULL; struct ldlm_lock *lock, *parent_lock; - struct ldlm_extent new_ex; - int incompat = 0, rc; - ldlm_res_policy policy; - ENTRY; + ns = ldlm_namespace_find(ns_id); + if (ns == NULL || ns->ns_hash == NULL) + RETURN(-ELDLM_BAD_NAMESPACE); parent_lock = ldlm_handle2object(parent_lock_handle); if (parent_lock) parent_res = parent_lock->l_resource; - else - parent_res = NULL; - - ns = ldlm_namespace_find(obddev, ns_id); - if (ns == NULL || ns->ns_hash == NULL) - RETURN(-ELDLM_BAD_NAMESPACE); res = ldlm_resource_get(ns, parent_res, res_id, type, 1); if (res == NULL) RETURN(-ENOMEM); - lock = ldlm_lock_new(parent_lock, res, mode); + lock = ldlm_lock_new(parent_lock, res); if (lock == NULL) RETURN(-ENOMEM); - if ((policy = ldlm_res_policy_table[type])) { - rc = policy(res, req_ex, &new_ex, mode, NULL); + ldlm_object2handle(lock, lockh); + + return ELDLM_OK; +} + +/* XXX: Revisit the error handling; we do not, for example, do + * ldlm_resource_put()s in our error cases, and we probably leak any allocated + * memory. */ +ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh, + ldlm_mode_t mode, + struct ldlm_extent *req_ex, + int *flags, + ldlm_lock_callback completion, + ldlm_lock_callback blocking, + void *data, + __u32 data_len) +{ + struct ldlm_lock *lock; + struct ldlm_extent new_ex; + int incompat = 0, rc; + ldlm_res_policy policy; + ENTRY; + + lock = ldlm_handle2object(lockh); + if ((policy = ldlm_res_policy_table[lock->l_resource->lr_type])) { + rc = policy(lock->l_resource, req_ex, &new_ex, mode, NULL); if (rc == ELDLM_LOCK_CHANGED) { *flags |= LDLM_FL_LOCK_CHANGED; memcpy(req_ex, &new_ex, sizeof(new_ex)); } } - if ((type == LDLM_EXTENT && !req_ex) || - (type != LDLM_EXTENT && req_ex)) + if ((lock->l_resource->lr_type == LDLM_EXTENT && !req_ex) || + (lock->l_resource->lr_type != LDLM_EXTENT && req_ex)) LBUG(); if (req_ex) memcpy(&lock->l_extent, req_ex, sizeof(*req_ex)); + lock->l_req_mode = mode; lock->l_data = data; lock->l_data_len = data_len; lock->l_completion_ast = completion; lock->l_blocking_ast = blocking; - ldlm_object2handle(lock, lockh); - spin_lock(&res->lr_lock); + spin_lock(&lock->l_resource->lr_lock); /* FIXME: We may want to optimize by checking lr_most_restr */ - if (!list_empty(&res->lr_converting)) { - ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + if (!list_empty(&lock->l_resource->lr_converting)) { + ldlm_resource_add_lock(lock->l_resource, + lock->l_resource->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_CONV; GOTO(out, ELDLM_OK); } - if (!list_empty(&res->lr_waiting)) { - ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + if (!list_empty(&lock->l_resource->lr_waiting)) { + ldlm_resource_add_lock(lock->l_resource, + lock->l_resource->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_WAIT; GOTO(out, ELDLM_OK); } incompat = ldlm_lock_compat(lock); if (incompat) { - ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); + ldlm_resource_add_lock(lock->l_resource, + lock->l_resource->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_GRANTED; GOTO(out, ELDLM_OK); } - ldlm_grant_lock(res, lock); + ldlm_grant_lock(lock->l_resource, lock); EXIT; out: - spin_unlock(&res->lr_lock); + spin_unlock(&lock->l_resource->lr_lock); return ELDLM_OK; } @@ -251,8 +258,7 @@ static void ldlm_reprocess_all(struct ldlm_resource *res) ldlm_reprocess_queue(res, &res->lr_waiting); } -ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, - struct ldlm_handle *lockh) +ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh) { struct ldlm_lock *lock; struct ldlm_resource *res; @@ -271,8 +277,7 @@ ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, RETURN(ELDLM_OK); } -ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev, - struct ldlm_handle *lockh, +ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh, int new_mode, int *flags) { struct ldlm_lock *lock; diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 8662db0..8949e6c 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -32,8 +32,7 @@ static int _ldlm_namespace_new(struct obd_device *obddev, ldlm_error_t err; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); req->rq_status = -ENOMEM; @@ -56,10 +55,10 @@ static int _ldlm_enqueue(struct ptlrpc_request *req) struct ldlm_request *dlm_req; int rc, size = sizeof(*dlm_rep); ldlm_error_t err; + struct ldlm_lock *lock; ENTRY; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); req->rq_status = -ENOMEM; @@ -70,33 +69,37 @@ static int _ldlm_enqueue(struct ptlrpc_request *req) memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent, sizeof(dlm_rep->lock_extent)); + dlm_rep->flags = dlm_req->flags; + + err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id, + &dlm_req->lock_handle2, + dlm_req->lock_desc.l_resource.lr_name, + dlm_req->lock_desc.l_resource.lr_type, + &dlm_rep->lock_handle); + if (err != ELDLM_OK) + GOTO(out, err); - err = ldlm_local_lock_enqueue(req->rq_obd, - dlm_req->lock_desc.l_resource.lr_ns_id, - &dlm_req->lock_handle2, - dlm_req->lock_desc.l_resource.lr_name, - dlm_req->lock_desc.l_resource.lr_type, - &dlm_rep->lock_extent, + err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle, dlm_req->lock_desc.l_req_mode, - &dlm_req->flags, - //ldlm_cli_callback, - NULL, + &dlm_rep->lock_extent, + &dlm_rep->flags, + NULL, //ldlm_cli_callback, ldlm_cli_callback, lustre_msg_buf(req->rq_reqmsg, 1), - req->rq_reqmsg->buflens[1], - &dlm_rep->lock_handle); - if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) { - struct ldlm_lock *lock; - lock = ldlm_handle2object(&dlm_rep->lock_handle); - memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer)); - memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1, - sizeof(lock->l_remote_handle)); - } - req->rq_status = err; + req->rq_reqmsg->buflens[1]); + if (err != ELDLM_OK) + GOTO(out, err); + lock = ldlm_handle2object(&dlm_rep->lock_handle); + memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1, + sizeof(lock->l_remote_handle)); + lock->l_connection = ptlrpc_connection_addref(req->rq_connection); + EXIT; + out: + req->rq_status = err; CERROR("err = %d\n", err); - RETURN(0); + return 0; } static int _ldlm_convert(struct ptlrpc_request *req) @@ -105,8 +108,7 @@ static int _ldlm_convert(struct ptlrpc_request *req) int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); req->rq_status = -ENOMEM; @@ -115,7 +117,7 @@ static int _ldlm_convert(struct ptlrpc_request *req) dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); req->rq_status = - ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1, + ldlm_local_lock_convert(&dlm_req->lock_handle1, dlm_req->lock_desc.l_req_mode, &dlm_req->flags); RETURN(0); @@ -127,8 +129,7 @@ static int _ldlm_cancel(struct ptlrpc_request *req) int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); req->rq_status = -ENOMEM; @@ -136,8 +137,7 @@ static int _ldlm_cancel(struct ptlrpc_request *req) } dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - req->rq_status = - ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1); + req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1); RETURN(0); } @@ -148,8 +148,7 @@ static int _ldlm_callback(struct ptlrpc_request *req) int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("out of memory\n"); req->rq_status = -ENOMEM; @@ -171,8 +170,7 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc, int rc; ENTRY; - rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf; + rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); if (rc) { CERROR("lustre_ldlm: Invalid request\n"); GOTO(out, rc); @@ -273,8 +271,8 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) int err; ENTRY; - INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces); - obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED; + INIT_LIST_HEAD(&ldlm_namespaces); + ldlm_spinlock = SPIN_LOCK_UNLOCKED; ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, @@ -293,11 +291,10 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) if (ldlm->ldlm_client == NULL) LBUG(); - ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, ldlm->ldlm_client); - err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client, - &ldlm->ldlm_server_peer); - if (err) { + ldlm->ldlm_server_conn = ptlrpc_connect_client("ldlm"); + if (!ldlm->ldlm_server_conn) { CERROR("cannot create client\n"); LBUG(); } @@ -359,16 +356,16 @@ static int ldlm_free_all(struct obd_device *obddev) struct list_head *tmp, *pos; int rc = 0; - ldlm_lock(obddev); + ldlm_lock(); - list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { + list_for_each_safe(tmp, pos, &ldlm_namespaces) { struct ldlm_namespace *ns; ns = list_entry(tmp, struct ldlm_namespace, ns_link); rc |= do_free_namespace(ns); } - ldlm_unlock(obddev); + ldlm_unlock(); return rc; } @@ -388,6 +385,7 @@ static int ldlm_cleanup(struct obd_device *obddev) OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service)); + ptlrpc_put_connection(ldlm->ldlm_server_conn); if (ldlm_free_all(obddev)) { CERROR("ldlm_free_all could not complete.\n"); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 3e796ac..7c20bd9 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -15,7 +15,7 @@ #include -int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, +int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, __u32 ns_id, struct ldlm_handle *parent_lock_handle, __u64 *res_id, @@ -28,20 +28,26 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, struct ldlm_handle *lockh, struct ptlrpc_request **request) { + struct ldlm_handle local_lockh; + struct ldlm_lock *lock; struct ldlm_request *body; struct ldlm_reply *reply; - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; char *bufs[2] = {NULL, data}; int rc, size[2] = {sizeof(*body), data_len}; + ldlm_error_t err; + ENTRY; -#if 0 - ldlm_local_lock_enqueue(obddev, ns_id, parent_lock_handle, res_id, type, - req_ex, mode, flags); -#endif + err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id, + type, &local_lockh); + if (err != ELDLM_OK) + RETURN(err); - /* FIXME: if this is a local lock, stop here. */ + /* Is this lock locally managed? */ + if (conn == NULL) + GOTO(local, 0); - req = ptlrpc_prep_req(cl, peer, LDLM_ENQUEUE, 2, size, bufs); + req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs); if (!req) GOTO(out, rc = -ENOMEM); @@ -58,7 +64,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, sizeof(body->lock_desc.l_extent)); body->flags = *flags; - /* FIXME: lock_handle1 will be the shadow handle */ + memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1)); if (parent_lock_handle) memcpy(&body->lock_handle2, parent_lock_handle, @@ -68,31 +74,45 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, size[0] = sizeof(*reply); req->rq_replen = lustre_msg_size(1, size); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); if (rc != ELDLM_OK) GOTO(out, rc); reply = lustre_msg_buf(req->rq_repmsg, 0); - CERROR("remote handle: %p\n", - (void *)(unsigned long)reply->lock_handle.addr); + lock = ldlm_handle2object(&local_lockh); + memcpy(&lock->l_remote_handle, &reply->lock_handle, + sizeof(lock->l_remote_handle)); + *flags = reply->flags; + + CERROR("remote handle: %p, flags: %d\n", + (void *)(unsigned long)reply->lock_handle.addr, *flags); CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start, reply->lock_extent.end); EXIT; + local: + rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, NULL, + NULL, data, data_len); out: *request = req; return rc; } -int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer, - __u32 ns_id, struct ptlrpc_request **request) +int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl, + struct ptlrpc_connection *conn, __u32 ns_id, + struct ptlrpc_request **request) { + struct ldlm_namespace *ns; struct ldlm_request *body; struct ptlrpc_request *req; int rc, size = sizeof(*body); + int err; - req = ptlrpc_prep_req(cl, peer, LDLM_NAMESPACE_NEW, 1, &size, NULL); + if (conn == NULL) + GOTO(local, 0); + + req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -101,10 +121,18 @@ int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer, req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); + if (rc) + GOTO(out, rc); EXIT; + local: + err = ldlm_namespace_new(obddev, ns_id, &ns); + if (err != ELDLM_OK) { + /* XXX: It succeeded remotely but failed locally. What to do? */ + return err; + } out: *request = req; return rc; @@ -120,7 +148,8 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, int rc, size[2] = {sizeof(*body), data_len}; char *bufs[2] = {NULL, data}; - req = ptlrpc_prep_req(cl, &lock->l_peer, LDLM_CALLBACK, 2, size, bufs); + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size, + bufs); if (!req) GOTO(out, rc = -ENOMEM); @@ -133,7 +162,7 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); ptlrpc_free_req(req); @@ -141,3 +170,86 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, out: return rc; } + +int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh, + int new_mode, int *flags, struct ptlrpc_request **request) +{ + struct ldlm_request *body; + struct ldlm_reply *reply; + struct ldlm_lock *lock; + struct ptlrpc_request *req; + int rc, size[2] = {sizeof(*body), lock->l_data_len}; + char *bufs[2] = {NULL, lock->l_data}; + + lock = ldlm_handle2object(lockh); + + if (lock->l_connection == NULL) + GOTO(local, 0); + + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size, + bufs); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + + body->lock_desc.l_req_mode = new_mode; + body->flags = *flags; + + req->rq_replen = lustre_msg_size(1, size); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + if (rc != ELDLM_OK) + GOTO(out, rc); + + reply = lustre_msg_buf(req->rq_repmsg, 0); + *flags = reply->flags; + + EXIT; + local: + rc = ldlm_local_lock_convert(lockh, new_mode, flags); + out: + *request = req; + return rc; +} + +int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh, + struct ptlrpc_request **request) +{ + struct ldlm_request *body; + struct ldlm_lock *lock; + struct ptlrpc_request *req; + int rc, size[2] = {sizeof(*body), lock->l_data_len}; + char *bufs[2] = {NULL, lock->l_data}; + + lock = ldlm_handle2object(lockh); + + if (lock->l_connection == NULL) + GOTO(local, 0); + + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size, + bufs); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + if (rc != ELDLM_OK) + GOTO(out, rc); + + EXIT; + local: + rc = ldlm_local_lock_cancel(lockh); + out: + *request = req; + return rc; +} diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 88f82f2..157c89e 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -26,13 +26,16 @@ kmem_cache_t *ldlm_resource_slab; kmem_cache_t *ldlm_lock_slab; -struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id) +struct list_head ldlm_namespaces; +spinlock_t ldlm_spinlock; + +struct ldlm_namespace *ldlm_namespace_find(__u32 id) { struct list_head *tmp; struct ldlm_namespace *res; res = NULL; - list_for_each(tmp, &obddev->u.ldlm.ldlm_namespaces) { + list_for_each(tmp, &ldlm_namespaces) { struct ldlm_namespace *chk; chk = list_entry(tmp, struct ldlm_namespace, ns_link); @@ -45,7 +48,7 @@ struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id) return res; } -/* this must be called with ldlm_lock(obddev) held */ +/* this must be called with ldlm_lock() held */ static void res_hash_init(struct ldlm_namespace *ns) { struct list_head *res_hash; @@ -71,7 +74,7 @@ ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id, { struct ldlm_namespace *ns; - if (ldlm_namespace_find(obddev, id)) + if (ldlm_namespace_find(id)) return -ELDLM_NAMESPACE_EXISTS; OBD_ALLOC(ns, sizeof(*ns)); @@ -82,7 +85,7 @@ ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id, ns->ns_obddev = obddev; INIT_LIST_HEAD(&ns->ns_root_list); - list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces); + list_add(&ns->ns_link, &ldlm_namespaces); res_hash_init(ns); atomic_set(&ns->ns_refcount, 0); @@ -138,7 +141,7 @@ static struct ldlm_resource *ldlm_resource_new(void) return res; } -/* ldlm_lock(obddev) must be taken before calling resource_add */ +/* ldlm_lock() must be taken before calling resource_add */ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, __u64 *name, __u32 type) diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 39f4bd9..2ebe593 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -39,40 +39,37 @@ int ldlm_test_basics(struct obd_device *obddev) struct ldlm_handle lockh_1, lockh_2; int flags; - ldlm_lock(obddev); + ldlm_lock(); err = ldlm_namespace_new(obddev, 1, &ns); if (err != ELDLM_OK) LBUG(); - res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1); - if (res == NULL) - LBUG(); - - /* Get a couple of read locks */ - err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, - NULL, LCK_CR, &flags, NULL, - ldlm_test_callback, NULL, 0, &lockh_1); + err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_1); + err = ldlm_local_lock_enqueue(&lockh_1, LCK_CR, NULL, &flags, NULL, + ldlm_test_callback, NULL, 0); if (err != ELDLM_OK) LBUG(); - err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, - NULL, LCK_EX, &flags, NULL, - ldlm_test_callback, NULL, 0, &lockh_2); + err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_2); + err = ldlm_local_lock_enqueue(&lockh_2, LCK_EX, NULL, &flags, NULL, + ldlm_test_callback, NULL, 0); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_BLOCK_GRANTED)) LBUG(); + res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1); + if (res == NULL) + LBUG(); ldlm_resource_dump(res); - err = ldlm_local_lock_convert(obddev, &lockh_1, LCK_NL, &flags); + err = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags); if (err != ELDLM_OK) LBUG(); ldlm_resource_dump(res); - - ldlm_unlock(obddev); + ldlm_unlock(); return 0; } @@ -87,34 +84,34 @@ int ldlm_test_extents(struct obd_device *obddev) ldlm_error_t err; int flags; - ldlm_lock(obddev); + ldlm_lock(); err = ldlm_namespace_new(obddev, 2, &ns); if (err != ELDLM_OK) LBUG(); flags = 0; - err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT, - &ext1, LCK_PR, &flags, NULL, NULL, NULL, - 0, &ext1_h); + err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext1_h); + err = ldlm_local_lock_enqueue(&ext1_h, LCK_PR, &ext1, &flags, NULL, + NULL, NULL, 0); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_LOCK_CHANGED)) LBUG(); flags = 0; - err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT, - &ext2, LCK_PR, &flags, NULL, NULL, NULL, - 0, &ext2_h); + err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext2_h); + err = ldlm_local_lock_enqueue(&ext2_h, LCK_PR, &ext2, &flags, NULL, + NULL, NULL, 0); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_LOCK_CHANGED)) LBUG(); flags = 0; - err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT, - &ext3, LCK_EX, &flags, NULL, NULL, NULL, - 0, &ext3_h); + err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext3_h); + err = ldlm_local_lock_enqueue(&ext3_h, LCK_EX, &ext3, &flags, NULL, + NULL, NULL, 0); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_BLOCK_GRANTED)) @@ -124,12 +121,12 @@ int ldlm_test_extents(struct obd_device *obddev) /* Convert/cancel blocking locks */ flags = 0; - err = ldlm_local_lock_convert(obddev, &ext1_h, LCK_NL, &flags); + err = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags); if (err != ELDLM_OK) LBUG(); flags = 0; - err = ldlm_local_lock_cancel(obddev, &ext2_h); + err = ldlm_local_lock_cancel(&ext2_h); if (err != ELDLM_OK) LBUG(); @@ -139,7 +136,7 @@ int ldlm_test_extents(struct obd_device *obddev) LBUG(); ldlm_resource_dump(res); - ldlm_unlock(obddev); + ldlm_unlock(); return 0; } @@ -155,21 +152,21 @@ static int ldlm_test_network(struct obd_device *obddev) int flags = 0; ldlm_error_t err; - err = ldlm_cli_namespace_new(ldlm->ldlm_client, &ldlm->ldlm_server_peer, - 3, &request); + err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client, + ldlm->ldlm_server_conn, 3, &request); ptlrpc_free_req(request); CERROR("ldlm_cli_namespace_new: %d\n", err); if (err != ELDLM_OK) GOTO(out, err); - err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3, + err = ldlm_cli_enqueue(ldlm->ldlm_client, ldlm->ldlm_server_conn, 3, NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags, NULL, 0, &lockh1, &request); ptlrpc_free_req(request); CERROR("ldlm_cli_enqueue: %d\n", err); flags = 0; - err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3, + err = ldlm_cli_enqueue(ldlm->ldlm_client, ldlm->ldlm_server_conn, 3, NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags, NULL, 0, &lockh2, &request); ptlrpc_free_req(request); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 2a645fd..5413342 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -60,7 +60,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) char *buf; __u64 offset; int rc = 0; - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; ENTRY; @@ -80,7 +80,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) offset = page->index << PAGE_SHIFT; buf = kmap(page); - rc = mdc_readpage(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino, + rc = mdc_readpage(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino, S_IFDIR, offset, buf, &request); kunmap(page); ptlrpc_free_req(request); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 25f1444..8073b49 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -48,7 +48,7 @@ extern inline struct obdo * ll_oa_from_inode(struct inode *inode, int valid); static int ll_file_open(struct inode *inode, struct file *file) { int rc; - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; struct ll_file_data *fd; struct obdo *oa; struct ll_sb_info *sbi = ll_i2sbi(inode); @@ -62,7 +62,7 @@ static int ll_file_open(struct inode *inode, struct file *file) GOTO(out, rc = -ENOMEM); memset(fd, 0, sizeof(*fd)); - rc = mdc_open(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino, + rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino, S_IFREG, file->f_flags, &fd->fd_mdshandle, &req); if (!fd->fd_mdshandle) CERROR("mdc_open didn't assign fd_mdshandle\n"); @@ -101,7 +101,7 @@ static int ll_file_open(struct inode *inode, struct file *file) static int ll_file_release(struct inode *inode, struct file *file) { int rc; - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; struct ll_file_data *fd; struct obdo *oa; struct ll_sb_info *sbi = ll_i2sbi(inode); @@ -134,8 +134,8 @@ static int ll_file_release(struct inode *inode, struct file *file) rc = -EIO; } - rc = mdc_close(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino, - S_IFREG, fd->fd_mdshandle, &req); + rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino, + S_IFREG, fd->fd_mdshandle, &req); ptlrpc_free_req(req); if (rc) { if (rc > 0) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 5d6fb60..f5a1e33 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -92,7 +92,7 @@ static int ll_find_inode(struct inode *inode, unsigned long ino, void *opaque) static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) { - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; struct inode * inode = NULL; struct ll_sb_info *sbi = ll_i2sbi(dir); int err; @@ -101,19 +101,18 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) ENTRY; if (dentry->d_name.len > EXT2_NAME_LEN) - return ERR_PTR(-ENAMETOOLONG); + RETURN(ERR_PTR(-ENAMETOOLONG)); ino = ll_inode_by_name(dir, dentry, &type); if (!ino) - goto negative; + GOTO(negative, NULL); - err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, ino, type, + err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_mds_conn, ino, type, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request); if (err) { CERROR("failure %d inode %ld\n", err, ino); ptlrpc_free_req(request); - EXIT; - return ERR_PTR(-abs(err)); + RETURN(ERR_PTR(-abs(err))); } inode = iget4(dir->i_sb, ino, ll_find_inode, @@ -121,8 +120,9 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) ptlrpc_free_req(request); if (!inode) - return ERR_PTR(-ENOMEM); + RETURN(ERR_PTR(-ENOMEM)); + EXIT; negative: d_add(dentry, inode); return NULL; @@ -133,7 +133,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, int mode, __u64 id) { struct inode *inode; - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; struct mds_body *body; int err; time_t time = CURRENT_TIME; @@ -141,7 +141,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, ENTRY; - err = mdc_create(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, name, + err = mdc_create(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, name, namelen, tgt, tgtlen, mode, id, current->uid, current->gid, time, &request); if (err) { @@ -185,13 +185,13 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, int ll_mdc_unlink(struct inode *dir, struct inode *child, const char *name, int len) { - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; int err; struct ll_sb_info *sbi = ll_i2sbi(dir); ENTRY; - err = mdc_unlink(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, child, + err = mdc_unlink(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, child, name, len, &request); ptlrpc_free_req(request); @@ -202,13 +202,13 @@ int ll_mdc_unlink(struct inode *dir, struct inode *child, int ll_mdc_link(struct dentry *src, struct inode *dir, const char *name, int len) { - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; int err; struct ll_sb_info *sbi = ll_i2sbi(dir); ENTRY; - err = mdc_link(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, dir, name, + err = mdc_link(&sbi->ll_mds_client, sbi->ll_mds_conn, src, dir, name, len, &request); ptlrpc_free_req(request); @@ -219,13 +219,13 @@ int ll_mdc_link(struct dentry *src, struct inode *dir, int ll_mdc_rename(struct inode *src, struct inode *tgt, struct dentry *old, struct dentry *new) { - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; int err; struct ll_sb_info *sbi = ll_i2sbi(src); ENTRY; - err = mdc_rename(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, tgt, + err = mdc_rename(&sbi->ll_mds_client, sbi->ll_mds_conn, src, tgt, old->d_name.name, old->d_name.len, new->d_name.name, new->d_name.len, &request); ptlrpc_free_req(request); diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 0021f31..091887e 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -1,7 +1,9 @@ -/* - * Lustre Light I/O Page Cache +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Lite I/O Page Cache * - * Copyright (C) 2002, Cluster File Systems, Inc. + * Copyright (C) 2002 Cluster File Systems, Inc. */ #include @@ -26,14 +28,9 @@ #define DEBUG_SUBSYSTEM S_LLITE -#include -#include -#include -#include #include #include - #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,10)) /* * Add a page to the dirty page list. @@ -167,19 +164,17 @@ static int ll_brw(int rw, struct inode *inode, struct page *page, int create) obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; obd_flag flags = create ? OBD_BRW_CREATE : 0; int err; - ENTRY; oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD); - if (!oa) { - return -ENOMEM; - } + if (!oa) + RETURN(-ENOMEM); + err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo, - &page, &count, &offset, &flags); + &page, &count, &offset, &flags); obdo_free(oa); - EXIT; - return err; + RETURN(err); } /* ll_brw */ extern void set_page_clean(struct page *); @@ -191,7 +186,6 @@ static int ll_readpage(struct file *file, struct page *page) { struct inode *inode = page->mapping->host; int rc = 0; - ENTRY; if (!PageLocked(page)) @@ -201,14 +195,12 @@ static int ll_readpage(struct file *file, struct page *page) <= page->index) { memset(kmap(page), 0, PAGE_CACHE_SIZE); kunmap(page); - EXIT; - goto readpage_out; + GOTO(readpage_out, rc); } if (Page_Uptodate(page)) { CERROR("Explain this please?\n"); - EXIT; - goto readpage_out; + GOTO(readpage_out, rc); } rc = ll_brw(OBD_BRW_READ, inode, page, 0); @@ -223,7 +215,7 @@ static int ll_readpage(struct file *file, struct page *page) static int ll_prepare_write(struct file *file, struct page *page, unsigned from, - unsigned to) + unsigned to) { struct inode *inode = page->mapping->host; obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; @@ -235,15 +227,12 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from, if (!PageLocked(page)) LBUG(); - if (Page_Uptodate(page)) { - EXIT; - goto prepare_done; - } + if (Page_Uptodate(page)) + GOTO(prepare_done, rc); if (offset + from >= inode->i_size) { memset(addr, 0, PAGE_SIZE); - EXIT; - goto prepare_done; + GOTO(prepare_done, rc); } /* We're completely overwriting an existing page, so _don't_ set it up @@ -255,11 +244,11 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from, rc = ll_brw(OBD_BRW_READ, inode, page, 0); + EXIT; prepare_done: - if ( !rc ) + if (!rc) SetPageUptodate(page); - EXIT; return rc; } @@ -283,8 +272,7 @@ static int ll_writepage(struct page *page) CERROR("ll_brw failure %d\n", err); } UnlockPage(page); - EXIT; - return err; + RETURN(err); } /* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated @@ -335,8 +323,7 @@ static int ll_commit_write(struct file *file, struct page *page, } obdo_free(oa); - EXIT; - return err; + RETURN(err); } /* ll_commit_write */ void ll_truncate(struct inode *inode) @@ -360,7 +347,7 @@ void ll_truncate(struct inode *inode) CERROR("obd_truncate fails (%d)\n", err); } EXIT; - return; + return; } /* ll_truncate */ struct address_space_operations ll_aops = { @@ -371,4 +358,3 @@ struct address_space_operations ll_aops = { commit_write: ll_commit_write, bmap: NULL }; - diff --git a/lustre/llite/super.c b/lustre/llite/super.c index a897d38..0a28bd5 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -138,8 +138,7 @@ static struct super_block * ll_read_super(struct super_block *sb, /* the first parameter should become an mds device no */ ptlrpc_init_client(llite_ha_mgr, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &sbi->ll_mds_client); - err = ptlrpc_connect_client("mds", &sbi->ll_mds_client, - &sbi->ll_mds_peer); + sbi->ll_mds_conn = ptlrpc_connect_client("mds"); if (err) { CERROR("cannot find MDS\n"); GOTO(out_disc, sb = NULL); @@ -156,7 +155,7 @@ static struct super_block * ll_read_super(struct super_block *sb, sb->s_op = &ll_super_operations; /* make root inode */ - err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, + err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_mds_conn, sbi->ll_rootino, S_IFDIR, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request); if (err) { @@ -195,6 +194,7 @@ static void ll_put_super(struct super_block *sb) struct ll_sb_info *sbi = sb->u.generic_sbp; ENTRY; obd_disconnect(&sbi->ll_conn); + ptlrpc_put_connection(sbi->ll_mds_conn); OBD_FREE(sb->u.generic_sbp, sizeof(*sbi)); MOD_DEC_USE_COUNT; EXIT; @@ -256,7 +256,7 @@ out: int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) { - struct ptlrpc_request *request; + struct ptlrpc_request *request = NULL; struct ll_sb_info *sbi = ll_i2sbi(inode); int err; @@ -265,7 +265,7 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) /* change incore inode */ ll_attr2inode(inode, attr, do_trunc); - err = mdc_setattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode, attr, + err = mdc_setattr(&sbi->ll_mds_client, sbi->ll_mds_conn, inode, attr, &request); if (err) CERROR("mdc_setattr fails (%d)\n", err); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 58eb55a..7d38319 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -35,7 +35,7 @@ static int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request) { int rc; - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); rc = ptlrpc_check_status(request, rc); if (rc) CERROR("error in handling %d\n", rc); @@ -43,7 +43,7 @@ static int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request) return rc; } -int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer, +int mdc_setattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct inode *inode, struct iattr *iattr, struct ptlrpc_request **request) { @@ -52,7 +52,7 @@ int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer, int rc, size = sizeof(*rec); ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_REINT, 1, &size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_REINT, 1, &size, NULL); if (!req) RETURN(-ENOMEM); @@ -68,7 +68,7 @@ int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer, RETURN(rc); } -int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer, +int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct inode *dir, const char *name, int namelen, const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid, __u32 gid, __u64 time, struct ptlrpc_request **request) @@ -79,7 +79,7 @@ int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL); if (!req) RETURN(-ENOMEM); @@ -103,7 +103,7 @@ int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer, RETURN(rc); } -int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer, +int mdc_unlink(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct inode *dir, struct inode *child, const char *name, int namelen, struct ptlrpc_request **request) { @@ -113,7 +113,7 @@ int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_REINT, 2, size, NULL); if (!req) RETURN(-ENOMEM); @@ -132,7 +132,7 @@ int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer, RETURN(rc); } -int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer, +int mdc_link(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct dentry *src, struct inode *dir, const char *name, int namelen, struct ptlrpc_request **request) { @@ -142,7 +142,7 @@ int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_REINT, 2, size, NULL); if (!req) RETURN(-ENOMEM); @@ -161,7 +161,7 @@ int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer, RETURN(rc); } -int mdc_rename(struct ptlrpc_client *cl, struct lustre_peer *peer, +int mdc_rename(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct inode *src, struct inode *tgt, const char *old, int oldlen, const char *new, int newlen, struct ptlrpc_request **request) @@ -172,7 +172,7 @@ int mdc_rename(struct ptlrpc_client *cl, struct lustre_peer *peer, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL); if (!req) RETURN(-ENOMEM); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 55630b9..e16ce02 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -33,15 +33,15 @@ extern int mds_queue_req(struct ptlrpc_request *); -int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, - int type, int valid, struct ptlrpc_request **request) +int mdc_connect(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, + ino_t ino, int type, int valid, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mds_body *body; int rc, size = sizeof(*body); ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_GETATTR, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -51,7 +51,7 @@ int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, req->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); if (!rc) { @@ -67,15 +67,15 @@ int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, } -int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, - int type, int valid, struct ptlrpc_request **request) +int mdc_getattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, + ino_t ino, int type, int valid, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mds_body *body; int rc, size = sizeof(*body); ENTRY; - req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_GETATTR, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -85,7 +85,7 @@ int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, req->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); if (!rc) { @@ -100,14 +100,15 @@ int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, return rc; } -int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, - int type, int flags, __u64 *fh, struct ptlrpc_request **request) +int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, + ino_t ino, int type, int flags, __u64 *fh, + struct ptlrpc_request **request) { struct mds_body *body; int rc, size = sizeof(*body); struct ptlrpc_request *req; - req = ptlrpc_prep_req(cl, peer, MDS_OPEN, 1, &size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_OPEN, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -117,7 +118,7 @@ int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, req->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); if (!rc) { @@ -132,14 +133,14 @@ int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, return rc; } -int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, - int type, __u64 fh, struct ptlrpc_request **request) +int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, + ino_t ino, int type, __u64 fh, struct ptlrpc_request **request) { struct mds_body *body; int rc, size = sizeof(*body); struct ptlrpc_request *req; - req = ptlrpc_prep_req(cl, peer, MDS_CLOSE, 1, &size, NULL); + req = ptlrpc_prep_req(cl, conn, MDS_CLOSE, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -149,7 +150,7 @@ int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, req->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); EXIT; @@ -158,8 +159,8 @@ int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, return rc; } -int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, - int type, __u64 offset, char *addr, +int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, + ino_t ino, int type, __u64 offset, char *addr, struct ptlrpc_request **request) { struct ptlrpc_request *req = NULL; @@ -173,21 +174,21 @@ int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, CDEBUG(D_INODE, "inode: %ld\n", ino); - bulk = ptlrpc_prep_bulk(peer); + bulk = ptlrpc_prep_bulk(conn); if (bulk == NULL) { CERROR("%s: cannot init bulk desc\n", __FUNCTION__); rc = -ENOMEM; goto out; } - req = ptlrpc_prep_req(cl, peer, MDS_READPAGE, 2, size, bufs); + req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 2, size, bufs); if (!req) GOTO(out, rc = -ENOMEM); bulk->b_buflen = PAGE_SIZE; bulk->b_buf = (void *)(long)niobuf.addr; bulk->b_portal = MDS_BULK_PORTAL; - bulk->b_xid = req->rq_xid; + bulk->b_xid = req->rq_reqmsg->xid; rc = ptlrpc_register_bulk(bulk); if (rc) { @@ -202,7 +203,7 @@ int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, req->rq_replen = lustre_msg_size(1, size); - rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_queue_wait(req); if (rc) { CERROR("error in handling %d\n", rc); ptlrpc_abort_bulk(bulk); @@ -224,7 +225,7 @@ static int request_ioctl(struct inode *inode, struct file *file, { int err; struct ptlrpc_client cl; - struct lustre_peer peer; + struct ptlrpc_connection *conn; struct ptlrpc_request *request; ENTRY; @@ -241,7 +242,7 @@ static int request_ioctl(struct inode *inode, struct file *file, } ptlrpc_init_client(NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl); - err = ptlrpc_connect_client("mds", &cl, &peer); + conn = ptlrpc_connect_client("mds"); if (err) { CERROR("cannot create client\n"); RETURN(-EINVAL); @@ -250,7 +251,7 @@ static int request_ioctl(struct inode *inode, struct file *file, switch (cmd) { case IOC_REQUEST_GETATTR: { CERROR("-- getting attr for ino %lu\n", arg); - err = mdc_getattr(&cl, &peer, arg, S_IFDIR, ~0, &request); + err = mdc_getattr(&cl, conn, arg, S_IFDIR, ~0, &request); CERROR("-- done err %d\n", err); GOTO(out, err); @@ -264,7 +265,7 @@ static int request_ioctl(struct inode *inode, struct file *file, break; } CERROR("-- readpage 0 for ino %lu\n", arg); - err = mdc_readpage(&cl, &peer, arg, S_IFDIR, 0, buf, &request); + err = mdc_readpage(&cl, conn, arg, S_IFDIR, 0, buf, &request); CERROR("-- done err %d\n", err); OBD_FREE(buf, PAGE_SIZE); @@ -281,7 +282,7 @@ static int request_ioctl(struct inode *inode, struct file *file, iattr.ia_atime = 0; iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - err = mdc_setattr(&cl, &peer, &inode, &iattr, &request); + err = mdc_setattr(&cl, conn, &inode, &iattr, &request); CERROR("-- done err %d\n", err); GOTO(out, err); @@ -297,7 +298,7 @@ static int request_ioctl(struct inode *inode, struct file *file, iattr.ia_atime = 0; iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - err = mdc_create(&cl, &peer, &inode, + err = mdc_create(&cl, conn, &inode, "foofile", strlen("foofile"), NULL, 0, 0100707, 47114711, 11, 47, 0, &request); @@ -310,7 +311,7 @@ static int request_ioctl(struct inode *inode, struct file *file, __u64 fh, ino; copy_from_user(&ino, (__u64 *)arg, sizeof(ino)); CERROR("-- opening ino %llu\n", ino); - err = mdc_open(&cl, &peer, ino, S_IFDIR, O_RDONLY, &fh, + err = mdc_open(&cl, conn, ino, S_IFDIR, O_RDONLY, &fh, &request); copy_to_user((__u64 *)arg, &fh, sizeof(fh)); CERROR("-- done err %d (fh=%Lu)\n", err, fh); @@ -320,7 +321,7 @@ static int request_ioctl(struct inode *inode, struct file *file, case IOC_REQUEST_CLOSE: { CERROR("-- closing ino 2, filehandle %lu\n", arg); - err = mdc_close(&cl, &peer, 2, S_IFDIR, arg, &request); + err = mdc_close(&cl, conn, 2, S_IFDIR, arg, &request); CERROR("-- done err %d\n", err); GOTO(out, err); @@ -332,6 +333,7 @@ static int request_ioctl(struct inode *inode, struct file *file, out: ptlrpc_free_req(request); + ptlrpc_put_connection(conn); RETURN(err); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 14d670e..cc442ad 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -38,74 +38,57 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, { int rc = 0; mm_segment_t oldfs = get_fs(); + struct ptlrpc_bulk_desc *bulk; + char *buf; - if (req->rq_peer.peer_nid == 0) { - /* dst->addr is a user address, but in a different task! */ - char *buf = (char *)(long)dst->addr; - - set_fs(KERNEL_DS); - rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, - &offset); - set_fs(oldfs); - - if (rc != PAGE_SIZE) { - rc = -EIO; - GOTO(out, rc); - } - EXIT; - } else { - struct ptlrpc_bulk_desc *bulk; - char *buf; - - bulk = ptlrpc_prep_bulk(&req->rq_peer); - if (bulk == NULL) { - rc = -ENOMEM; - GOTO(out, rc); - } - - bulk->b_xid = req->rq_xid; + bulk = ptlrpc_prep_bulk(req->rq_connection); + if (bulk == NULL) { + rc = -ENOMEM; + GOTO(out, rc); + } - OBD_ALLOC(buf, PAGE_SIZE); - if (!buf) { - rc = -ENOMEM; - GOTO(cleanup_bulk, rc); - } + bulk->b_xid = req->rq_reqmsg->xid; - set_fs(KERNEL_DS); - rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, - &offset); - set_fs(oldfs); + OBD_ALLOC(buf, PAGE_SIZE); + if (!buf) { + rc = -ENOMEM; + GOTO(cleanup_bulk, rc); + } - if (rc != PAGE_SIZE) { - rc = -EIO; - GOTO(cleanup_buf, rc); - } + set_fs(KERNEL_DS); + rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, + &offset); + set_fs(oldfs); - bulk->b_buf = buf; - bulk->b_buflen = PAGE_SIZE; + if (rc != PAGE_SIZE) { + rc = -EIO; + GOTO(cleanup_buf, rc); + } - rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { - CERROR("obd_fail_loc=%x, fail operation rc=%d\n", - OBD_FAIL_MDS_SENDPAGE, rc); - PtlMDUnlink(bulk->b_md_h); - GOTO(cleanup_buf, rc); - } - wait_event_interruptible(bulk->b_waitq, - ptlrpc_check_bulk_sent(bulk)); + bulk->b_buf = buf; + bulk->b_buflen = PAGE_SIZE; - if (bulk->b_flags == PTL_RPC_INTR) { - rc = -EINTR; - GOTO(cleanup_buf, rc); - } + rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { + CERROR("obd_fail_loc=%x, fail operation rc=%d\n", + OBD_FAIL_MDS_SENDPAGE, rc); + PtlMDUnlink(bulk->b_md_h); + GOTO(cleanup_buf, rc); + } + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); - EXIT; - cleanup_buf: - OBD_FREE(buf, PAGE_SIZE); - cleanup_bulk: - OBD_FREE(bulk, sizeof(*bulk)); + if (bulk->b_flags == PTL_RPC_INTR) { + rc = -EINTR; + GOTO(cleanup_buf, rc); } -out: + + EXIT; + cleanup_buf: + OBD_FREE(buf, PAGE_SIZE); + cleanup_bulk: + OBD_FREE(bulk, sizeof(*bulk)); + out: return rc; } @@ -181,8 +164,7 @@ int mds_getattr(struct ptlrpc_request *req) int rc, size = sizeof(*body); ENTRY; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { CERROR("mds: out of memory\n"); req->rq_status = -ENOMEM; @@ -224,8 +206,7 @@ int mds_open(struct ptlrpc_request *req) int rc, size = sizeof(*body); ENTRY; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { CERROR("mds: out of memory\n"); req->rq_status = -ENOMEM; @@ -259,8 +240,7 @@ int mds_close(struct ptlrpc_request *req) int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { CERROR("mds: out of memory\n"); req->rq_status = -ENOMEM; @@ -292,8 +272,7 @@ int mds_readpage(struct ptlrpc_request *req) int rc, size = sizeof(*body); ENTRY; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) { CERROR("mds: out of memory\n"); req->rq_status = -ENOMEM; @@ -355,8 +334,7 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc, int rc; ENTRY; - rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf; + rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) { CERROR("lustre_mds: Invalid request\n"); GOTO(out, rc); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 1c800aa..9149be6 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -346,8 +346,7 @@ int mds_reint_rec(struct mds_update_record *rec, struct ptlrpc_request *req) RETURN(rc); } - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { CERROR("mds: out of memory\n"); rc = req->rq_status = -ENOMEM; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 4d2007a..3af50e9 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -45,30 +45,30 @@ #include static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl, - struct lustre_peer **peer) + struct ptlrpc_connection **connection) { struct osc_obd *osc = &conn->oc_dev->u.osc; *cl = osc->osc_client; - *peer = &osc->osc_peer; + *connection = osc->osc_conn; } static int osc_connect(struct obd_conn *conn) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_CONNECT, 0, NULL, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL); if (!request) RETURN(-ENOMEM); request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -86,13 +86,13 @@ static int osc_disconnect(struct obd_conn *conn) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_DISCONNECT, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -101,7 +101,7 @@ static int osc_disconnect(struct obd_conn *conn) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); GOTO(out, rc); out: ptlrpc_free_req(request); @@ -112,13 +112,13 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_GETATTR, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -129,7 +129,7 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -148,13 +148,13 @@ static int osc_open(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_OPEN, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -166,7 +166,7 @@ static int osc_open(struct obd_conn *conn, struct obdo *oa) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -185,13 +185,13 @@ static int osc_close(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_CLOSE, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -201,7 +201,7 @@ static int osc_close(struct obd_conn *conn, struct obdo *oa) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -220,13 +220,13 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_SETATTR, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -236,7 +236,7 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); GOTO(out, rc); out: @@ -248,7 +248,7 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -257,8 +257,8 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa) CERROR("oa NULL\n"); RETURN(-EINVAL); } - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_CREATE, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -269,7 +269,7 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -287,7 +287,7 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -296,8 +296,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, CERROR("oa NULL\n"); RETURN(-EINVAL); } - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_PUNCH, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -310,7 +310,7 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -327,7 +327,7 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -336,8 +336,8 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) CERROR("oa NULL\n"); RETURN(-EINVAL); } - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_DESTROY, 1, &size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -348,7 +348,7 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -365,9 +365,9 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct niobuf *dst, struct niobuf *src) { struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; - osc_con2cl(conn, &cl, &peer); + osc_con2cl(conn, &cl, &connection); if (cl->cli_obd) { /* local sendpage */ @@ -377,7 +377,7 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct ptlrpc_bulk_desc *bulk; int rc; - bulk = ptlrpc_prep_bulk(peer); + bulk = ptlrpc_prep_bulk(connection); if (bulk == NULL) return -ENOMEM; @@ -410,7 +410,7 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_off *offset, obd_flag *flags) { struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ptlrpc_request *request; struct ost_body *body; struct obd_ioobj ioo; @@ -430,8 +430,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, if (bulk == NULL) RETURN(-ENOMEM); - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL); if (!request) GOTO(out, rc = -ENOMEM); @@ -443,13 +443,13 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, for (pages = 0, i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); for (j = 0; j < oa_bufs[i]; j++, pages++) { - bulk[pages] = ptlrpc_prep_bulk(peer); + bulk[pages] = ptlrpc_prep_bulk(connection); if (bulk[pages] == NULL) GOTO(out, rc = -ENOMEM); - spin_lock(&cl->cli_lock); - bulk[pages]->b_xid = cl->cli_xid++; - spin_unlock(&cl->cli_lock); + spin_lock(&connection->c_lock); + bulk[pages]->b_xid = ++connection->c_xid_out; + spin_unlock(&connection->c_lock); bulk[pages]->b_buf = kmap(buf[pages]); bulk[pages]->b_buflen = PAGE_SIZE; @@ -465,7 +465,7 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, } request->rq_replen = lustre_msg_size(1, size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); GOTO(out, rc); out: @@ -491,7 +491,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_off *offset, obd_flag *flags) { struct ptlrpc_client *cl; - struct lustre_peer *peer; + struct ptlrpc_connection *connection; struct ptlrpc_request *request; struct obd_ioobj ioo; struct ost_body *body; @@ -510,8 +510,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, if (!src) RETURN(-ENOMEM); - osc_con2cl(conn, &cl, &peer); - request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL); + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL); if (!request) RETURN(-ENOMEM); body = lustre_msg_buf(request->rq_reqmsg, 0); @@ -531,7 +531,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, size[1] = pages * sizeof(struct niobuf); request->rq_replen = lustre_msg_size(2, size); - rc = ptlrpc_queue_wait(cl, request); + rc = ptlrpc_queue_wait(request); if (rc) GOTO(out, rc); @@ -576,13 +576,9 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, } /* mount the file system (secretly) */ -static int osc_setup(struct obd_device *obddev, obd_count len, - void *buf) - +static int osc_setup(struct obd_device *obddev, obd_count len, void *buf) { struct osc_obd *osc = &obddev->u.osc; - struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf; - int rc; ENTRY; OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client)); @@ -590,21 +586,22 @@ static int osc_setup(struct obd_device *obddev, obd_count len, RETURN(-ENOMEM); ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, - osc->osc_client); + osc->osc_client); - rc = ptlrpc_connect_client("ost", osc->osc_client, &osc->osc_peer); + osc->osc_conn = ptlrpc_connect_client("ost"); + if (!osc->osc_conn) + RETURN(-EINVAL); - if (rc == 0) - MOD_INC_USE_COUNT; - RETURN(rc); + MOD_INC_USE_COUNT; + RETURN(0); } static int osc_cleanup(struct obd_device * obddev) { struct osc_obd *osc = &obddev->u.osc; - if (osc->osc_client != NULL) - OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); + OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); + ptlrpc_put_connection(osc->osc_conn); MOD_DEC_USE_COUNT; return 0; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index f543343..47f2628 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -57,8 +57,7 @@ static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -77,8 +76,7 @@ static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -99,8 +97,7 @@ static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -121,8 +118,7 @@ static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -143,8 +139,7 @@ static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -165,8 +160,7 @@ static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -188,8 +182,7 @@ static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -208,14 +201,13 @@ static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); req->rq_status = obd_connect(&conn); - CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id); + CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id); body = lustre_msg_buf(req->rq_repmsg, 0); body->connid = conn.oc_id; RETURN(0); @@ -232,8 +224,7 @@ static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = body->connid; conn.oc_dev = ost->ost_tgt; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -261,8 +252,7 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr, &(size[1]), (void **)&(bufs[1])); - rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg); if (rc) CERROR("cannot pack reply\n"); @@ -302,8 +292,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) ost_unpack_niobuf(&tmp2, &nb); } - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); OBD_ALLOC(res, sizeof(*res) * niocount); @@ -320,7 +309,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) GOTO(out, 0); for (i = 0; i < niocount; i++) { - bulk = ptlrpc_prep_bulk(&req->rq_peer); + bulk = ptlrpc_prep_bulk(req->rq_connection); if (bulk == NULL) { CERROR("cannot alloc bulk desc\n"); GOTO(out, rc = -ENOMEM); @@ -427,8 +416,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) } size[1] = niocount * sizeof(*nb); - rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -447,7 +435,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) struct ptlrpc_bulk_desc *bulk; struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service; - bulk = ptlrpc_prep_bulk(&req->rq_peer); + bulk = ptlrpc_prep_bulk(req->rq_connection); if (bulk == NULL) GOTO(out, rc = -ENOMEM); @@ -492,8 +480,7 @@ static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc, struct ost_obd *ost = &obddev->u.ost; ENTRY; - rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf; + rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) { CERROR("lustre_mds: Invalid request\n"); GOTO(out, rc); diff --git a/lustre/ptlrpc/Makefile.am b/lustre/ptlrpc/Makefile.am index 7c47ff2..c4d4bab 100644 --- a/lustre/ptlrpc/Makefile.am +++ b/lustre/ptlrpc/Makefile.am @@ -9,6 +9,7 @@ MODULE = ptlrpc modulefs_DATA = ptlrpc.o EXTRA_PROGRAMS = ptlrpc -ptlrpc_SOURCES = rpc.c events.c service.c client.c niobuf.c pack_generic.c +ptlrpc_SOURCES = client.c connection.c events.c niobuf.c pack_generic.c rpc.c \ +service.c include $(top_srcdir)/Rules diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 5a6a5c1..e2bceff 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -22,15 +22,8 @@ #define EXPORT_SYMTAB -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_RPC -#include -#include #include void llite_ha_conn_manage(struct lustre_ha_mgr *mgr, struct ptlrpc_client *cli) @@ -54,18 +47,14 @@ void llite_ha_conn_fail(struct ptlrpc_client *cli) EXIT; } -void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_portal, - struct ptlrpc_client *cl) +void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, + int rep_portal, struct ptlrpc_client *cl) { memset(cl, 0, sizeof(*cl)); spin_lock_init(&cl->cli_lock); cl->cli_ha_mgr = mgr; if (mgr) llite_ha_conn_manage(mgr, cl); - cl->cli_xid = 1; - cl->cli_generation = 1; - cl->cli_epoch = 1; - cl->cli_bootcount = 0; cl->cli_obd = NULL; cl->cli_request_portal = req_portal; cl->cli_reply_portal = rep_portal; @@ -74,26 +63,32 @@ void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_porta sema_init(&cl->cli_rpc_sem, 32); } -int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl, - struct lustre_peer *peer) +struct ptlrpc_connection *ptlrpc_connect_client(char *uuid) { + struct ptlrpc_connection *c; + struct lustre_peer peer; int err; - cl->cli_epoch++; - err = kportal_uuid_to_peer(uuid, peer); - if (err != 0) + err = kportal_uuid_to_peer(uuid, &peer); + if (err != 0) { CERROR("cannot find peer %s!\n", uuid); + return NULL; + } - return err; + c = ptlrpc_get_connection(&peer); + if (c) + c->c_epoch++; + + return c; } -struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) +struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn) { struct ptlrpc_bulk_desc *bulk; OBD_ALLOC(bulk, sizeof(*bulk)); if (bulk != NULL) { - memcpy(&bulk->b_peer, peer, sizeof(*peer)); + bulk->b_connection = ptlrpc_connection_addref(conn); init_waitqueue_head(&bulk->b_waitq); } @@ -101,8 +96,9 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) } struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, - struct lustre_peer *peer, int opcode, - int count, int *lengths, char **bufs) + struct ptlrpc_connection *conn, + int opcode, int count, int *lengths, + char **bufs) { struct ptlrpc_request *request; int rc; @@ -114,26 +110,27 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, RETURN(NULL); } - spin_lock(&cl->cli_lock); - request->rq_xid = cl->cli_xid++; - spin_unlock(&cl->cli_lock); - rc = lustre_pack_msg(count, lengths, bufs, - &request->rq_reqlen, &request->rq_reqbuf); + &request->rq_reqlen, &request->rq_reqmsg); if (rc) { CERROR("cannot pack request %d\n", rc); RETURN(NULL); } + request->rq_time = CURRENT_TIME; request->rq_type = PTL_RPC_REQUEST; - memcpy(&request->rq_peer, peer, sizeof(*peer)); - request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf; + request->rq_connection = ptlrpc_connection_addref(conn); + + request->rq_reqmsg->conn = (__u64)(unsigned long)conn; + request->rq_reqmsg->token = conn->c_token; request->rq_reqmsg->opc = HTON__u32(opcode); - request->rq_reqmsg->xid = HTON__u32(request->rq_xid); request->rq_reqmsg->type = HTON__u32(request->rq_type); + + spin_lock(&conn->c_lock); + request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out); + spin_unlock(&c->c_lock); + request->rq_client = cl; - request->rq_req_portal = cl->cli_request_portal; - request->rq_reply_portal = cl->cli_reply_portal; RETURN(request); } @@ -143,8 +140,11 @@ void ptlrpc_free_req(struct ptlrpc_request *request) if (request == NULL) return; - if (request->rq_repbuf != NULL) - OBD_FREE(request->rq_repbuf, request->rq_replen); + if (request->rq_repmsg != NULL) + OBD_FREE(request->rq_repmsg, request->rq_replen); + + ptlrpc_put_connection(request->rq_connection); + OBD_FREE(request, sizeof(*request)); } @@ -153,7 +153,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) int rc = 0; schedule_timeout(3 * HZ); /* 3 second timeout */ - if (req->rq_repbuf != NULL) { + if (req->rq_repmsg != NULL) { req->rq_flags = PTL_RPC_REPLY; GOTO(out, rc = 1); } @@ -207,8 +207,8 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err) static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request) { - OBD_FREE(request->rq_reqbuf, request->rq_reqlen); - request->rq_reqbuf = NULL; + OBD_FREE(request->rq_reqmsg, request->rq_reqlen); + request->rq_reqmsg = NULL; request->rq_reqlen = 0; } @@ -219,23 +219,23 @@ static int ptlrpc_abort(struct ptlrpc_request *request) * that we can tear down the buffer safely. */ PtlMEUnlink(request->rq_reply_me_h); OBD_FREE(request->rq_reply_md.start, request->rq_replen); - request->rq_repbuf = NULL; + request->rq_repmsg = NULL; request->rq_replen = 0; return 0; } -int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) +int ptlrpc_queue_wait(struct ptlrpc_request *req) { int rc = 0; ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); - rc = ptl_send_rpc(req, cl); + rc = ptl_send_rpc(req); if (rc) { CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc); ptlrpc_cleanup_request_buf(req); - up(&cl->cli_rpc_sem); + up(&req->rq_client->cli_rpc_sem); RETURN(-rc); } @@ -243,7 +243,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req)); CDEBUG(D_OTHER, "-- done\n"); ptlrpc_cleanup_request_buf(req); - up(&cl->cli_rpc_sem); + up(&req->rq_client->cli_rpc_sem); if (req->rq_flags == PTL_RPC_INTR) { /* Clean up the dangling reply buffers */ ptlrpc_abort(req); @@ -257,8 +257,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) GOTO(out, rc = -EINTR); } - rc = lustre_unpack_msg(req->rq_repbuf, req->rq_replen); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen); if (rc) { CERROR("unpack_rep failed: %d\n", rc); GOTO(out, rc); @@ -266,7 +265,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid); if (req->rq_repmsg->status == 0) - CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repbuf, + CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg, req->rq_replen, req->rq_repmsg->status); EXIT; diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c new file mode 100644 index 0000000..30f7f87 --- /dev/null +++ b/lustre/ptlrpc/connection.c @@ -0,0 +1,128 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#define EXPORT_SYMTAB + +#define DEBUG_SUBSYSTEM S_RPC + +#include + +static spinlock_t conn_lock; +static struct list_head conn_list; +static struct list_head conn_unused_list; + +struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer) +{ + struct list_head *tmp, *pos; + struct ptlrpc_connection *c; + ENTRY; + + spin_lock(&conn_lock); + list_for_each(tmp, &conn_list) { + c = list_entry(tmp, struct ptlrpc_connection, c_link); + if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0) { + atomic_inc(&c->c_refcount); + GOTO(out, c); + } + } + + list_for_each_safe(tmp, pos, &conn_unused_list) { + c = list_entry(tmp, struct ptlrpc_connection, c_link); + if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0) { + atomic_inc(&c->c_refcount); + list_del(&c->c_link); + list_add(&c->c_link, &conn_list); + GOTO(out, c); + } + } + + /* FIXME: this should be a slab once we can validate slab addresses + * without OOPSing */ + OBD_ALLOC(c, sizeof(*c)); + if (c == NULL) + GOTO(out, c); + + c->c_xid_in = 1; + c->c_xid_out = 1; + c->c_generation = 1; + c->c_epoch = 1; + c->c_bootcount = 0; + atomic_set(&c->c_refcount, 1); + + memcpy(&c->c_peer, peer, sizeof(c->c_peer)); + list_add(&c->c_link, &conn_list); + + EXIT; + out: + spin_unlock(&conn_lock); + return c; +} + +int ptlrpc_put_connection(struct ptlrpc_connection *c) +{ + int rc = 0; + + if (atomic_dec_and_test(&c->c_refcount)) { + spin_lock(&conn_lock); + list_del(&c->c_link); + list_add(&c->c_link, &conn_unused_list); + spin_unlock(&conn_lock); + rc = 1; + } + + return rc; +} + +struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *c) +{ + atomic_inc(&c->c_refcount); + return c; +} + +void ptlrpc_init_connection(void) +{ + INIT_LIST_HEAD(&conn_list); + INIT_LIST_HEAD(&conn_unused_list); + conn_lock = SPIN_LOCK_UNLOCKED; +} + +void ptlrpc_cleanup_connection(void) +{ + struct list_head *tmp, *pos; + struct ptlrpc_connection *c; + + spin_lock(&conn_lock); + list_for_each_safe(tmp, pos, &conn_unused_list) { + c = list_entry(tmp, struct ptlrpc_connection, c_link); + list_del(&c->c_link); + OBD_FREE(c, sizeof(*c)); + } + list_for_each_safe(tmp, pos, &conn_list) { + c = list_entry(tmp, struct ptlrpc_connection, c_link); + CERROR("Connection %p has refcount %d at cleanup (nid=%lu)!\n", + c, atomic_read(&c->c_refcount), + (unsigned long)c->c_peer.peer_nid); + list_del(&c->c_link); + OBD_FREE(c, sizeof(*c)); + } + spin_unlock(&conn_lock); +} diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 5cf37fb..bf275b8 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -22,14 +22,10 @@ #define EXPORT_SYMTAB -#include #include -#include #define DEBUG_SUBSYSTEM S_RPC -#include -#include #include ptl_handle_eq_t request_out_eq, @@ -89,7 +85,7 @@ static int reply_in_callback(ptl_event_t *ev, void *data) ENTRY; if (ev->type == PTL_EVENT_PUT) { - rpc->rq_repbuf = ev->mem_desc.start + ev->offset; + rpc->rq_repmsg = ev->mem_desc.start + ev->offset; barrier(); wake_up_interruptible(&rpc->rq_wait_for_rep); } else { diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 90c7f51..67c7309 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -22,14 +22,8 @@ #define EXPORT_SYMTAB -#include -#include -#include - #define DEBUG_SUBSYSTEM S_RPC -#include -#include #include extern ptl_handle_eq_t request_out_eq, @@ -42,23 +36,22 @@ static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY}; int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) { - if (bulk->b_flags == PTL_BULK_SENT) { - EXIT; - return 1; - } + ENTRY; + + if (bulk->b_flags == PTL_BULK_SENT) + RETURN(1); if (sigismember(&(current->pending.signal), SIGKILL) || sigismember(&(current->pending.signal), SIGINT)) { bulk->b_flags = PTL_RPC_INTR; - EXIT; - return 1; + RETURN(1); } CDEBUG(D_NET, "no event yet\n"); - return 0; + RETURN(0); } -int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, +int ptl_send_buf(struct ptlrpc_request *request, struct ptlrpc_connection *conn, int portal) { int rc; @@ -77,14 +70,14 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, ack = PTL_ACK_REQ; break; case PTL_RPC_REQUEST: - request->rq_req_md.start = request->rq_reqbuf; + request->rq_req_md.start = request->rq_reqmsg; request->rq_req_md.length = request->rq_reqlen; request->rq_req_md.eventq = request_out_eq; request->rq_req_md.threshold = 1; ack = PTL_NOACK_REQ; break; case PTL_RPC_REPLY: - request->rq_req_md.start = request->rq_repbuf; + request->rq_req_md.start = request->rq_repmsg; request->rq_req_md.length = request->rq_replen; request->rq_req_md.eventq = reply_out_eq; request->rq_req_md.threshold = 1; @@ -97,7 +90,7 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, request->rq_req_md.options = PTL_MD_OP_PUT; request->rq_req_md.user_ptr = request; - rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h); + rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h); //CERROR("MDBind (outgoing req/rep/bulk): %Lu\n", (__u64)md_h); if (rc != 0) { CERROR("PtlMDBind failed: %d\n", rc); @@ -105,16 +98,17 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, return rc; } - remote_id.nid = peer->peer_nid; + remote_id.nid = conn->c_peer.peer_nid; remote_id.pid = 0; CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n", - request->rq_req_md.length, portal, request->rq_xid); + request->rq_req_md.length, portal, request->rq_reqmsg->xid); - rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0); + rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_reqmsg->xid, + 0, 0); if (rc != PTL_OK) { CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, - portal, request->rq_xid, rc); + portal, request->rq_reqmsg->xid, rc); PtlMDUnlink(md_h); } @@ -133,14 +127,15 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal) bulk->b_md.options = PTL_MD_OP_PUT; bulk->b_md.user_ptr = bulk; - rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &bulk->b_md_h); + rc = PtlMDBind(bulk->b_connection->c_peer.peer_ni, bulk->b_md, + &bulk->b_md_h); if (rc != 0) { CERROR("PtlMDBind failed: %d\n", rc); LBUG(); return rc; } - remote_id.nid = bulk->b_peer.peer_nid; + remote_id.nid = bulk->b_connection->c_peer.peer_nid; remote_id.pid = 0; CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n", @@ -161,11 +156,10 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal) int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk) { int rc; - ENTRY; - rc = PtlMEAttach(bulk->b_peer.peer_ni, bulk->b_portal, local_id, - bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER, + rc = PtlMEAttach(bulk->b_connection->c_peer.peer_ni, bulk->b_portal, + local_id, bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); @@ -213,10 +207,12 @@ int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req) { /* FIXME: we need to increment the count of handled events */ req->rq_type = PTL_RPC_REPLY; + req->rq_repmsg->conn = req->rq_reqmsg->conn; + req->rq_repmsg->token = req->rq_reqmsg->token; req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid); req->rq_repmsg->status = HTON__u32(req->rq_status); req->rq_reqmsg->type = HTON__u32(req->rq_type); - return ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal); + return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal); } int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req) @@ -224,13 +220,12 @@ int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req) int rc; ENTRY; - if (req->rq_repbuf) { - CERROR("req has repbuf\n"); + if (req->rq_repmsg) { + CERROR("req already has repmsg\n"); LBUG(); } - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); @@ -240,7 +235,7 @@ int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req) RETURN(rc); } -int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl) +int ptl_send_rpc(struct ptlrpc_request *request) { ptl_process_id_t local_id; int rc; @@ -259,7 +254,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl) RETURN(-EINVAL); } - /* request->rq_repbuf is set only when the reply comes in, in + /* request->rq_repmsg is set only when the reply comes in, in * client_packet_callback() */ OBD_ALLOC(repbuf, request->rq_replen); if (!repbuf) @@ -268,10 +263,11 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl) local_id.nid = PTL_ID_ANY; local_id.pid = PTL_ID_ANY; - down(&cl->cli_rpc_sem); + down(&request->rq_client->cli_rpc_sem); - rc = PtlMEAttach(request->rq_peer.peer_ni, request->rq_reply_portal, - local_id, request->rq_xid, 0, PTL_UNLINK, + rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni, + request->rq_client->cli_reply_portal, + local_id, request->rq_reqmsg->xid, 0, PTL_UNLINK, PTL_INS_AFTER, &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); @@ -296,17 +292,19 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl) } CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n", - request->rq_replen, request->rq_xid, request->rq_reply_portal); + request->rq_replen, request->rq_reqmsg->xid, + request->rq_client->cli_request_portal); - list_add(&request->rq_list, &cl->cli_sending_head); - rc = ptl_send_buf(request, &request->rq_peer, request->rq_req_portal); + list_add(&request->rq_list, &request->rq_client->cli_sending_head); + rc = ptl_send_buf(request, request->rq_connection, + request->rq_client->cli_request_portal); RETURN(rc); cleanup2: PtlMEUnlink(request->rq_reply_me_h); cleanup: OBD_FREE(repbuf, request->rq_replen); - up(&cl->cli_rpc_sem); + up(&request->rq_client->cli_rpc_sem); return rc; } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index ad66380..76140ed 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -26,10 +26,10 @@ #define DEBUG_SUBSYSTEM S_CLASS -#include #include -int lustre_pack_msg(int count, int *lens, char **bufs, int *len, char **buf) +int lustre_pack_msg(int count, int *lens, char **bufs, int *len, + struct lustre_msg **msg) { char *ptr; struct lustre_msg *m; @@ -40,16 +40,16 @@ int lustre_pack_msg(int count, int *lens, char **bufs, int *len, char **buf) *len = sizeof(*m) + count * sizeof(__u32) + size; - OBD_ALLOC(*buf, *len); - if (!*buf) + OBD_ALLOC(*msg, *len); + if (!*msg) RETURN(-ENOMEM); - m = (struct lustre_msg *)(*buf); + m = *msg; m->bufcount = HTON__u32(count); for (i = 0; i < count; i++) m->buflens[i] = HTON__u32(lens[i]); - ptr = *buf + sizeof(*m) + sizeof(__u32) * count; + ptr = (char *)m + sizeof(*m) + sizeof(__u32) * count; for (i = 0; i < count; i++) { char *tmp = NULL; if (bufs) @@ -74,9 +74,8 @@ int lustre_msg_size(int count, int *lengths) return size; } -int lustre_unpack_msg(char *buf, int len) +int lustre_unpack_msg(struct lustre_msg *m, int len) { - struct lustre_msg *m = (struct lustre_msg *)buf; int required_len, i; required_len = sizeof(*m); diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index 1a6d58d..72c28be 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -22,31 +22,26 @@ #define EXPORT_SYMTAB -#include #include -#include #define DEBUG_SUBSYSTEM S_RPC -#include -#include #include - extern int ptlrpc_init_portals(void); extern void ptlrpc_exit_portals(void); static int __init ptlrpc_init(void) { + ptlrpc_init_connection(); return ptlrpc_init_portals(); } static void __exit ptlrpc_exit(void) { ptlrpc_exit_portals(); - - return; + ptlrpc_cleanup_connection(); } MODULE_AUTHOR("Peter J. Braam "); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 0532837..031ae64 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -22,14 +22,8 @@ #define EXPORT_SYMTAB -#include -#include -#include - #define DEBUG_SUBSYSTEM S_RPC -#include -#include #include extern int request_in_callback(ptl_event_t *ev, void *data); @@ -152,6 +146,55 @@ err_free: return NULL; } +static int handle_incoming_request(struct obd_device *obddev, + struct ptlrpc_service *svc) +{ + struct ptlrpc_request request; + struct lustre_peer peer; + void *start; + int rc; + + /* FIXME: If we move to an event-driven model, we should put the request + * on the stack of mds_handle instead. */ + start = svc->srv_ev.mem_desc.start; + memset(&request, 0, sizeof(request)); + request.rq_obd = obddev; + request.rq_reqmsg = (svc->srv_ev.mem_desc.start + + svc->srv_ev.offset); + request.rq_reqlen = svc->srv_ev.mem_desc.length; + + if (request.rq_reqmsg->xid != svc->srv_ev.match_bits) + LBUG(); + + CDEBUG(D_NET, "got req %d\n", request.rq_reqmsg->xid); + + if (request.rq_reqmsg->conn) { + request.rq_connection = + (void *)(unsigned long)request.rq_reqmsg->conn; + if (request.rq_reqmsg->token != request.rq_connection->c_token) + LBUG(); + ptlrpc_connection_addref(request.rq_connection); + } else { + request.rq_connection = ptlrpc_get_connection(&peer); + if (!request.rq_connection) + LBUG(); + CERROR("Did not find valid/conn token pair.\n"); + } + + peer.peer_nid = svc->srv_ev.initiator.nid; + /* FIXME: this NI should be the incoming NI. + * We don't know how to find that from here. */ + peer.peer_ni = svc->srv_self.peer_ni; + + svc->srv_flags &= ~SVC_EVENT; + + spin_unlock(&svc->srv_lock); + rc = svc->srv_handler(obddev, svc, &request); + ptlrpc_put_connection(request.rq_connection); + ptl_handled_rpc(svc, start); + return rc; +} + static int ptlrpc_main(void *arg) { int rc; @@ -195,31 +238,8 @@ static int ptlrpc_main(void *arg) } if (svc->srv_flags & SVC_EVENT) { - struct ptlrpc_request request; - void *start; svc->srv_flags = SVC_RUNNING; - - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - start = svc->srv_ev.mem_desc.start; - memset(&request, 0, sizeof(request)); - request.rq_obd = obddev; - request.rq_reqbuf = (svc->srv_ev.mem_desc.start + - svc->srv_ev.offset); - request.rq_reqlen = svc->srv_ev.mem_desc.length; - request.rq_xid = svc->srv_ev.match_bits; - CDEBUG(D_NET, "got req %d\n", request.rq_xid); - - request.rq_peer.peer_nid = svc->srv_ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = svc->srv_self.peer_ni; - svc->srv_flags &= ~SVC_EVENT; - - spin_unlock(&svc->srv_lock); - rc = svc->srv_handler(obddev, svc, &request); - ptl_handled_rpc(svc, start); + rc = handle_incoming_request(obddev, svc); continue; } diff --git a/lustre/tests/llmount.sh b/lustre/tests/llmount.sh index 094abda..3454211 100755 --- a/lustre/tests/llmount.sh +++ b/lustre/tests/llmount.sh @@ -10,6 +10,7 @@ PORT=1234 setup_portals setup_lustre +read new_fs ext2 /tmp/ost 10000 OST=$LOOPDEV -- 1.8.3.1