From a07a72605bd79e48c40f06a0aa9c97e14d96e031 Mon Sep 17 00:00:00 2001 From: pschwan Date: Sun, 3 Mar 2002 02:29:47 +0000 Subject: [PATCH] Merged branch 'peter' with the tip. Pre-merge tag is 't_20020302_networking'. Avert your eyes. --- lustre/include/linux/lustre_idl.h | 5 +- lustre/include/linux/lustre_mds.h | 19 +- lustre/include/linux/lustre_net.h | 221 +++++++++----- lustre/include/linux/obd_class.h | 71 ++++- lustre/include/linux/obd_osc.h | 8 +- lustre/include/linux/obd_ost.h | 41 +-- lustre/include/linux/obd_support.h | 114 +------ lustre/lib/mds_pack.c | 1 + lustre/lib/obd_pack.c | 64 ++-- lustre/lib/page.c | 1 + lustre/llite/dir.c | 8 +- lustre/llite/namei.c | 1 + lustre/llite/rw.c | 3 +- lustre/llite/super.c | 16 +- lustre/mdc/mdc_reint.c | 1 + lustre/mdc/mdc_request.c | 99 +++--- lustre/mds/handler.c | 395 ++++-------------------- lustre/mds/mds_reint.c | 17 +- lustre/obdclass/class_obd.c | 75 ++--- lustre/obdclass/genops.c | 2 +- lustre/obdfs/rw.c | 14 +- lustre/obdfs/super.c | 1 - lustre/obdfs/symlink.c | 4 - lustre/osc/osc_request.c | 452 ++++++++++++++------------- lustre/ost/ost_handler.c | 591 ++++++++++++++---------------------- lustre/ptlrpc/Makefile.am | 2 +- lustre/ptlrpc/client.c | 115 +++++-- lustre/ptlrpc/events.c | 33 +- lustre/ptlrpc/niobuf.c | 248 +++++++++++---- lustre/ptlrpc/rpc.c | 606 +------------------------------------ lustre/ptlrpc/service.c | 33 +- lustre/tests/llmount.sh | 4 +- lustre/tests/ostreq.sh | 19 +- 33 files changed, 1253 insertions(+), 2031 deletions(-) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index c8d8969..5f09bf4 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -56,6 +56,7 @@ #define OST_CONNECT 7 #define OST_DISCONNECT 8 #define OST_PUNCH 9 +#define OST_BRW_COMPLETE 10 /* packet types */ #define OST_TYPE_REQ 1 @@ -64,14 +65,14 @@ struct ptlreq_hdr { __u32 opc; - __u64 seqno; + __u32 xid; __u32 status; __u32 type; }; struct ptlrep_hdr { __u32 opc; - __u64 seqno; + __u32 xid; __u32 status; __u32 type; }; diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index df6daf5..022875a 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -28,8 +28,8 @@ #include +#include #include -#include static inline void l_dput(struct dentry *de) { @@ -44,34 +44,21 @@ struct mds_run_ctxt { mm_segment_t fs; }; -#define MDS_STOPPING 1 -#define MDS_RUNNING 2 -#define MDS_STOPPED 4 #define LUSTRE_MDS_NAME "mds" struct mds_obd { + struct ptlrpc_service *mds_service; + char *mds_fstype; - struct task_struct *mds_thread; - __u32 mds_remote_nid; - wait_queue_head_t mds_waitq; - wait_queue_head_t mds_done_waitq; - struct timer_list *mds_timer; - int mds_interval; - int mds_flags; - struct list_head mds_reqs; struct super_block * mds_sb; struct vfsmount *mds_vfsmnt; struct mds_run_ctxt mds_ctxt; - spinlock_t mds_lock; - __u64 mds_lastino; struct file_operations *mds_fop; struct inode_operations *mds_iop; struct address_space_operations *mds_aops; - struct ptlrpc_service *mds_service; }; - struct mds_update_record { __u32 ur_reclen; __u32 ur_opcode; diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 0efec74..6e0a233 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -24,21 +24,31 @@ #define _LUSTRE_NET_H #include +#include +#include #include #include -/* FOO_REQUEST_PORTAL receives requests for the FOO subsystem. - * FOO_REPLY_PORTAL receives replies _from_ the FOO subsystem. */ +/* FOO_REQUEST_PORTAL is for incoming requests on the FOO + * FOO_REPLY_PORTAL is for incoming replies on the FOO + * FOO_BULK_PORTAL is for incoming bulk on the FOO + */ + #define OSC_REQUEST_PORTAL 1 #define OSC_REPLY_PORTAL 2 -#define MDS_REQUEST_PORTAL 3 -#define MDS_REPLY_PORTAL 4 -#define OST_REQUEST_PORTAL 5 -#define OST_REPLY_PORTAL 6 -#define MDC_BULK_PORTAL 7 -#define MDS_BULK_PORTAL 8 -#define OSC_BULK_PORTAL 9 -#define OST_BULK_PORTAL 10 +#define OSC_BULK_PORTAL 3 + +#define OST_REQUEST_PORTAL 4 +#define OST_REPLY_PORTAL 5 +#define OST_BULK_PORTAL 6 + +#define MDC_REQUEST_PORTAL 7 +#define MDC_REPLY_PORTAL 8 +#define MDC_BULK_PORTAL 9 + +#define MDS_REQUEST_PORTAL 10 +#define MDS_REPLY_PORTAL 11 +#define MDS_BULK_PORTAL 12 /* default rpc ring length */ #define RPC_RING_LENGTH 2 @@ -46,40 +56,52 @@ /* generic wrappable next */ #define NEXT_INDEX(index, max) (((index+1) >= max) ? 0 : (index+1)) +#define SVC_STOPPING 1 +#define SVC_RUNNING 2 +#define SVC_STOPPED 4 +#define SVC_KILLED 8 +#define SVC_EVENT 16 +#define SVC_LIST 32 +#define SVC_SIGNAL 64 -struct ptlrpc_service { - char *srv_buf[RPC_RING_LENGTH]; - __u32 srv_buf_size; - __u32 srv_me_active; - __u32 srv_me_tail; - __u32 srv_md_active; - __u32 srv_ring_length; - __u32 srv_portal; - __u32 srv_ref_count[RPC_RING_LENGTH]; +typedef int (*rep_unpack_t)(char *, int, struct ptlrep_hdr **, union ptl_rep *); +typedef int (*req_pack_t)(char *, int, char *, int, struct ptlreq_hdr **, + union ptl_req*, int *, char **); - struct lustre_peer srv_self; +typedef int (*req_unpack_t)(char *buf, int len, struct ptlreq_hdr **, + union ptl_req *); +typedef int (*rep_pack_t)(char *buf1, int len1, char *buf2, int len2, + struct ptlrep_hdr **, union ptl_rep*, + int *replen, char **repbuf); - /* FIXME: perhaps a list of EQs, if multiple NIs are used? */ - ptl_handle_eq_t srv_eq_h; +struct ptlrpc_client { + struct lustre_peer cli_server; + struct obd_device *cli_obd; + __u32 cli_request_portal; + __u32 cli_reply_portal; + rep_unpack_t cli_rep_unpack; + req_pack_t cli_req_pack; - ptl_handle_me_t srv_me_h[RPC_RING_LENGTH]; - ptl_process_id_t srv_id; - ptl_md_t srv_md[RPC_RING_LENGTH]; - ptl_handle_md_t srv_md_h[RPC_RING_LENGTH]; - wait_queue_head_t *srv_wait_queue; - int (*srv_req_unpack)(char *buf, int len, struct ptlreq_hdr **, - union ptl_req *); - int (*srv_rep_pack)(char *buf1, int len1, char *buf2, int len2, - struct ptlrep_hdr **, union ptl_rep*, - int *replen, char **repbuf); + spinlock_t cli_lock; + __u32 cli_xid; }; +/* These do double-duty in rq_type and rq_flags */ +#define PTL_RPC_INTR 1 +#define PTL_RPC_REQUEST 2 +#define PTL_RPC_REPLY 3 +#define PTL_RPC_BULK 4 +#define PTL_RPC_SENT 5 +#define PTL_BULK_SENT 6 +#define PTL_BULK_RCVD 6 + struct ptlrpc_request { - int rq_type; /* one of PTLRPC_REQUEST, PTLRPC_REPLY, PTLRPC_BULK */ + int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */ struct list_head rq_list; - struct mds_obd *rq_obd; - struct ost_obd *rq_ost; + struct obd_device *rq_obd; int rq_status; + int rq_flags; + __u32 rq_connid; __u32 rq_xid; char *rq_reqbuf; @@ -94,64 +116,123 @@ struct ptlrpc_request { char *rq_bulkbuf; int rq_bulklen; - int (*rq_bulk_cb)(struct ptlrpc_request *, void *); - void *rq_reply_handle; + void * rq_reply_handle; wait_queue_head_t rq_wait_for_rep; - wait_queue_head_t rq_wait_for_bulk; + /* incoming reply */ ptl_md_t rq_reply_md; ptl_handle_md_t rq_reply_md_h; ptl_handle_me_t rq_reply_me_h; + /* outgoing req/rep */ ptl_md_t rq_req_md; - ptl_md_t rq_bulk_md; - ptl_handle_md_t rq_bulk_md_h; - ptl_handle_me_t rq_bulk_me_h; + ptl_handle_md_t rq_req_md_h; + __u32 rq_reply_portal; __u32 rq_req_portal; - __u32 rq_bulk_portal; struct lustre_peer rq_peer; }; -struct ptlrpc_client { - struct lustre_peer cli_server; - __u32 cli_request_portal; - __u32 cli_reply_portal; - __u32 cli_xid; - int (*cli_rep_unpack)(char *buf, int len, struct ptlrep_hdr **, - union ptl_rep *); - int (*cli_req_pack)(char *buf1, int len1, char *buf2, int len2, - struct ptlreq_hdr **, union ptl_req*, - int *reqlen, char **reqbuf); - int (*cli_enqueue)(struct ptlrpc_request *req); +struct ptlrpc_bulk_desc { + int b_flags; + struct lustre_peer b_peer; + __u32 b_portal; + char *b_buf; + int b_buflen; + int (*b_cb)(struct ptlrpc_request *, void *); + __u32 b_xid; + + wait_queue_head_t b_waitq; + + ptl_md_t b_md; + ptl_handle_md_t b_md_h; + ptl_handle_me_t b_me_h; }; -/* rpc/rpc.c */ -#define PTLRPC_REQUEST 1 -#define PTLRPC_REPLY 2 -#define PTLRPC_BULK 3 +struct ptlrpc_service { + /* incoming request buffers */ + /* FIXME: perhaps a list of EQs, if multiple NIs are used? */ + char *srv_buf[RPC_RING_LENGTH]; + __u32 srv_buf_size; + __u32 srv_me_active; + __u32 srv_me_tail; + __u32 srv_md_active; + __u32 srv_ring_length; + __u32 srv_req_portal; + __u32 srv_rep_portal; + __u32 srv_ref_count[RPC_RING_LENGTH]; + ptl_handle_me_t srv_me_h[RPC_RING_LENGTH]; + ptl_process_id_t srv_id; + ptl_md_t srv_md[RPC_RING_LENGTH]; + ptl_handle_md_t srv_md_h[RPC_RING_LENGTH]; + __u32 srv_xid; -int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, - int portal); + /* event queue */ + ptl_handle_eq_t srv_eq_h; + + __u32 srv_flags; + struct lustre_peer srv_self; + struct task_struct *srv_thread; + wait_queue_head_t srv_waitq; + wait_queue_head_t srv_ctl_waitq; + int ost_flags; + + spinlock_t srv_lock; + struct list_head srv_reqs; + ptl_event_t srv_ev; + req_unpack_t srv_req_unpack; + rep_pack_t srv_rep_pack; + int (*srv_handler)(struct obd_device *obddev, + struct ptlrpc_service *svc, + struct ptlrpc_request *req); +}; + +typedef int (*svc_handler_t)(struct obd_device *obddev, + struct ptlrpc_service *svc, + struct ptlrpc_request *req); + + + +/* rpc/niobuf.c */ +int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *); +int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal); +int ptl_send_buf(struct ptlrpc_request *, struct lustre_peer *, int portal); +int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *); +int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req); +int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req); int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer); int ptl_received_rpc(struct ptlrpc_service *service); -int rpc_register_service(struct ptlrpc_service *service, char *uuid); -int rpc_unregister_service(struct ptlrpc_service *service); -int ptlrpc_queue_wait(struct ptlrpc_request *req, - struct ptlrpc_client *cl); + +/* rpc/client.c */ +int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, + req_pack_t req_pack, rep_unpack_t rep_unpack, + struct ptlrpc_client *cl); +int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req); +int ptlrpc_queue_req(struct ptlrpc_client *peer, struct ptlrpc_request *req); struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, int namelen, char *name, int tgtlen, char *tgt); void ptlrpc_free_req(struct ptlrpc_request *request); +struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *); + +/* rpc/service.c */ +struct ptlrpc_service * +ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, + req_unpack_t unpack, rep_pack_t pack, svc_handler_t); +void ptlrpc_stop_thread(struct ptlrpc_service *svc); +int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, + char *name); +int rpc_register_service(struct ptlrpc_service *service, char *uuid); +int rpc_unregister_service(struct ptlrpc_service *service); - -/* FIXME */ -#if 1 -# define LUSTRE_NAL "ksocknal" -#else -# define LUSTRE_NAL "kqswnal" -#endif +struct ptlrpc_svc_data { + char *name; + struct ptlrpc_service *svc; + struct obd_device *dev; +}; #endif diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 7aa1e69..f1629b3 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -37,6 +37,7 @@ #include #include +#include #include #endif @@ -65,6 +66,7 @@ typedef struct { #include #include #include +#include #include #include /* #include */ @@ -73,7 +75,7 @@ typedef struct { #include #ifdef __KERNEL__ -/* corresponds to one of the obdx */ +/* corresponds to one of the obd's */ struct obd_device { struct obd_type *obd_type; int obd_minor; @@ -86,6 +88,8 @@ struct obd_device { unsigned int obd_gen_last_id; unsigned long obd_gen_prealloc_quota; struct list_head obd_gen_clients; + struct list_head obd_req_list; + wait_queue_head_t obd_req_waitq; union { struct ext2_obd ext2; struct filter_obd filter; @@ -202,6 +206,24 @@ static inline int obd_check_conn(struct obd_conn *conn) #define OBT(dev) dev->obd_type->typ_ops #define OBP(dev,op) dev->obd_type->typ_ops->o_ ## op +#define OBD_CHECK_SETUP(conn) \ +do { \ + if (!(conn)) { \ + CERROR("NULL connection\n"); \ + return -EINVAL; \ + } \ + \ + if (!((conn)->oc_dev)) { \ + CERROR("NULL device\n"); \ + return -EINVAL; \ + } \ + \ + if ( !((conn)->oc_dev->obd_flags & OBD_SET_UP) ) { \ + CERROR("Device %d not setup\n", (conn)->oc_dev->obd_minor); \ + return -EINVAL; \ + } \ +} while (0) + #define OBD_CHECK_OP(conn,op) \ do { \ int rc = obd_check_conn(conn); \ @@ -220,6 +242,7 @@ static inline int obd_get_info(struct obd_conn *conn, obd_count keylen, void *ke obd_count *vallen, void **val) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,get_info); rc = OBP(conn->oc_dev, get_info)(conn, keylen, key, vallen, val); @@ -231,6 +254,7 @@ static inline int obd_set_info(struct obd_conn *conn, obd_count keylen, void *ke obd_count vallen, void *val) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,set_info); rc = OBP(conn->oc_dev, set_info)(conn, keylen, key, vallen, val); @@ -244,6 +268,7 @@ static inline int obd_cleanup(struct obd_device *obd) int rc; conn.oc_dev = obd; + OBD_CHECK_SETUP(&conn); OBD_CHECK_OP((&conn),cleanup); rc = OBP(conn.oc_dev, cleanup)(obd); @@ -254,6 +279,7 @@ static inline int obd_cleanup(struct obd_device *obd) static inline int obd_create(struct obd_conn *conn, struct obdo *obdo) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,create); rc = OBP(conn->oc_dev, create)(conn, obdo); @@ -264,6 +290,7 @@ static inline int obd_create(struct obd_conn *conn, struct obdo *obdo) static inline int obd_destroy(struct obd_conn *conn, struct obdo *obdo) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,destroy); rc = OBP(conn->oc_dev, destroy)(conn, obdo); @@ -274,6 +301,7 @@ static inline int obd_destroy(struct obd_conn *conn, struct obdo *obdo) static inline int obd_getattr(struct obd_conn *conn, struct obdo *obdo) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,getattr); rc = OBP(conn->oc_dev, getattr)(conn, obdo); @@ -284,6 +312,7 @@ static inline int obd_getattr(struct obd_conn *conn, struct obdo *obdo) static inline int obd_setattr(struct obd_conn *conn, struct obdo *obdo) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,setattr); rc = OBP(conn->oc_dev, setattr)(conn, obdo); @@ -294,6 +323,7 @@ static inline int obd_setattr(struct obd_conn *conn, struct obdo *obdo) static inline int obd_connect(struct obd_conn *conn) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,connect); rc = OBP(conn->oc_dev, connect)(conn); @@ -304,6 +334,7 @@ static inline int obd_connect(struct obd_conn *conn) static inline int obd_disconnect(struct obd_conn *conn) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,disconnect); rc = OBP(conn->oc_dev, disconnect)(conn); @@ -314,6 +345,7 @@ static inline int obd_disconnect(struct obd_conn *conn) static inline int obd_statfs(struct obd_conn *conn, struct statfs *buf) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,statfs); rc = OBP(conn->oc_dev, statfs)(conn, buf); @@ -324,6 +356,7 @@ static inline int obd_statfs(struct obd_conn *conn, struct statfs *buf) static inline int obd_punch(struct obd_conn *conn, struct obdo *tgt, obd_size count, obd_off offset) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,punch); rc = OBP(conn->oc_dev, punch)(conn, tgt, count, offset); @@ -336,6 +369,7 @@ static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa, obd_size *count, obd_off *offset, obd_flag *flags) { int rc; + OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn,brw); rc = OBP(conn->oc_dev, brw)(rw, conn, num_oa, oa, oa_bufs, buf, @@ -344,6 +378,34 @@ static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa, return rc; } +static inline int obd_preprw(int cmd, struct obd_conn *conn, + int objcount, struct obd_ioobj *obj, + int niocount, struct niobuf *nb, + struct niobuf *res) +{ + int rc; + OBD_CHECK_SETUP(conn); + OBD_CHECK_OP(conn, preprw); + + rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount, nb, res); + EXIT; + return rc; +} + +static inline int obd_commitrw(int cmd, struct obd_conn *conn, + int objcount, struct obd_ioobj *obj, + int niocount, struct niobuf *res) +{ + int rc; + OBD_CHECK_SETUP(conn); + OBD_CHECK_OP(conn, commitrw); + + rc = OBP(conn->oc_dev, commitrw)(cmd, conn, objcount, obj, niocount, res); + EXIT; + return rc; +} + + #endif /* @@ -681,11 +743,4 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, extern void obd_sysctl_init (void); extern void obd_sysctl_clean (void); -#define CHKCONN(conn) do { if (!gen_client(conn)) {\ - printk("%s %d invalid client %u\n", __FILE__, __LINE__, \ - conn->oc_id);\ - return -EINVAL; }} while (0) - - - #endif /* __LINUX_CLASS_OBD_H */ diff --git a/lustre/include/linux/obd_osc.h b/lustre/include/linux/obd_osc.h index f341737..cb3e7f8 100644 --- a/lustre/include/linux/obd_osc.h +++ b/lustre/include/linux/obd_osc.h @@ -30,12 +30,12 @@ #include #include -#define OST_EXIT 1 -#define LUSTRE_OST_NAME "ost" - struct osc_obd { struct obd_device *osc_tgt; - struct lustre_peer osc_peer; + struct ptlrpc_client osc_peer; }; +#define OST_EXIT 1 +#define LUSTRE_OST_NAME "ost" + #endif diff --git a/lustre/include/linux/obd_ost.h b/lustre/include/linux/obd_ost.h index a53bd37..0609a7f 100644 --- a/lustre/include/linux/obd_ost.h +++ b/lustre/include/linux/obd_ost.h @@ -29,47 +29,24 @@ #include -#define OST_EXIT 1 #define LUSTRE_OST_NAME "ost" #define LUSTRE_OSC_NAME "osc" struct ost_obd { - struct obd_device *ost_tgt; - struct obd_conn ost_conn; - struct task_struct *ost_thread; - wait_queue_head_t ost_waitq; - wait_queue_head_t ost_done_waitq; - int ost_flags; - spinlock_t ost_lock; - struct list_head ost_reqs; - struct ptlrpc_service *ost_service; -}; - -struct ost_request { - struct list_head rq_list; - struct ost_obd *rq_obd; - int rq_status; - char *rq_reqbuf; - int rq_reqlen; - struct ost_req_hdr *rq_reqhdr; - struct ost_req *rq_req; - - char *rq_repbuf; - int rq_replen; - struct ost_rep_hdr *rq_rephdr; - struct ost_rep *rq_rep; - - void *rq_reply_handle; - wait_queue_head_t rq_wait_for_rep; + struct obd_device *ost_tgt; + struct obd_conn ost_conn; }; /* ost/ost_pack.c */ -int ost_pack_req(char *buf1, int buflen1, char *buf2, int buflen2, struct ptlreq_hdr **hdr, struct ost_req **req, int *len, char **buf); -int ost_unpack_req(char *buf, int len, struct ptlreq_hdr **hdr, struct ost_req **req); -int ost_pack_rep(void *buf1, __u32 buflen1, void *buf2, __u32 buflen2, struct ptlrep_hdr **hdr, struct ost_rep **rep, int *len, char **buf); -int ost_unpack_rep(char *buf, int len, struct ptlrep_hdr **hdr, struct ost_rep **rep); +int ost_pack_req(char *buf1, int buflen1, char *buf2, int buflen2, struct ptlreq_hdr **hdr, union ptl_req *req, int *len, char **buf); +int ost_unpack_req(char *buf, int len, struct ptlreq_hdr **hdr, union ptl_req *req); +int ost_pack_rep(char *buf1, int buflen1, char *buf2, int buflen2, + struct ptlrep_hdr **hdr, union ptl_rep *r, + int *len, char **buf); +int ost_unpack_rep(char *buf, int len, struct ptlrep_hdr **hdr, union ptl_rep *rep); + void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len, __u32 flags); void ost_unpack_niobuf(void **tmp, struct niobuf **nbp); diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 4707963..4aeeaa5 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -11,122 +11,10 @@ #include #include -#define obd_unlock_page(page) \ -do { \ - if (PageLocked(page)) { \ - UnlockPage(page); \ - } else { \ - printk("file %s, line %d: expecting locked page\n", \ - __FILE__, __LINE__); \ - } \ -} while(0) - -/* - * Debug code - */ /* global variables */ extern int obd_debug_level; extern int obd_print_entry; -extern int obd_inodes; -extern int obd_pages; -extern long obd_memory; - -#define EXT2_OBD_DEBUG - -#ifdef EXT2_OBD_DEBUG -#define CMD(cmd) (( cmd == READ ) ? "read" : "write") - -/* Inode common information printed out (used by obdfs and ext2obd inodes) */ -#define ICDEBUG(inode) { \ - CDEBUG(D_INFO, "ino %ld, atm %ld, mtm %ld, ctm %ld, size %Ld, " \ - "blocks %ld\n", inode->i_ino, inode->i_atime, \ - inode->i_mtime, inode->i_ctime, inode->i_size, \ - inode->i_blocks); \ - CDEBUG(D_INFO, "mode %o, uid %d, gid %d, nlnk %d, count %d\n", \ - inode->i_mode, inode->i_uid, inode->i_gid, \ - inode->i_nlink, atomic_read(&inode->i_count)); \ -} - -/* Ext2 inode information */ -#define EXDEBUG(inode) { \ - ICDEBUG(inode); \ - CDEBUG(D_INFO, "ext2 blocks: %d %d %d %d %d %d %d %d\n", \ - inode->u.ext2_i.i_data[0], inode->u.ext2_i.i_data[1], \ - inode->u.ext2_i.i_data[2], inode->u.ext2_i.i_data[3], \ - inode->u.ext2_i.i_data[4], inode->u.ext2_i.i_data[5], \ - inode->u.ext2_i.i_data[6], inode->u.ext2_i.i_data[7]); \ -} - -/* OBDFS inode information */ -#define OIDEBUG(inode) { \ - ICDEBUG(inode); \ - CDEBUG(D_INFO,"oinfo: flags 0x%08x\n", obdfs_i2info(inode)->oi_flags);\ - /* obdfs_print_plist(inode); */ \ -} - -#define ODEBUG(obdo) { \ - CDEBUG(D_INFO, "id %ld, atm %ld, mtm %ld, ctm %ld, " \ - "size %ld, blocks %ld\n", \ - (long)(obdo)->o_id, (long)(obdo)->o_atime, \ - (long)(obdo)->o_mtime, (long)(obdo)->o_ctime, \ - (long)(obdo)->o_size, (long)(obdo)->o_blocks); \ - CDEBUG(D_INFO, " mode %o, uid %d, gid %d, flg 0x%0x, " \ - "obdflg 0x%0x, nlnk %d, valid 0x%0x\n", \ - (obdo)->o_mode, (obdo)->o_uid, (obdo)->o_gid, (obdo)->o_flags,\ - (obdo)->o_obdflags, (obdo)->o_nlink, (obdo)->o_valid); \ -} - -#define PDEBUG(page,msg) { \ - if (page){ \ - char *uptodate = (Page_Uptodate(page)) ? "upto" : "outof";\ - char *locked = (PageLocked(page)) ? "" : "un"; \ - char *buffer = page->buffers ? "buffer" : ""; \ - int count = page_count(page); \ - long index = page->index; \ - CDEBUG(D_CACHE, "%s: ** off %ld, %sdate, %slocked, flag %ld,"\ - " cnt %d page 0x%p pages %ld virt %lx %s**\n", \ - msg, index, uptodate, locked, page->flags, count,\ - page, page->mapping ? page->mapping->nrpages : -1,\ - page->virtual, buffer); \ - } else \ - CDEBUG(D_CACHE, "** %s: no page\n", msg); \ -} - -#if 0 -#define iget(sb, ino) obd_iget(sb, ino) -#define iput(sb, ino) obd_iput(sb, ino) - -static inline struct inode *obd_iget(struct super_block *sb, unsigned long ino) -{ - struct inode *inode; - - if ((inode = iget(sb, ino)) == NULL) - CDEBUG(D_INODE, "NULL in iget for %ld\n", ino); - else - obd_inodes++; - return inode; -} - -static inline void obd_iput(struct inode *inode) -{ - if (inode == NULL) - CDEBUG(D_INODE, "NULL in iput\n"); - else - obd_inodes--; -} -#endif - -#else /* EXT2_OBD_DEBUG */ - -#define CDEBUG(mask, format, a...) {} -#define ENTRY {} -#define EXIT {} -#define ODEBUG(obdo) {} -#define EXDEBUG(inode) {} -#define OIDEBUG(inode) {} -#define PDEBUG(page, cmd) {} - -#endif /* EXT2_OBD_DEBUG */ +extern unsigned long obd_memory; #define OBD_ALLOC(ptr, size) \ do { \ diff --git a/lustre/lib/mds_pack.c b/lustre/lib/mds_pack.c index 6a4204f..a92fd370 100644 --- a/lustre/lib/mds_pack.c +++ b/lustre/lib/mds_pack.c @@ -47,6 +47,7 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#include #include #include #include diff --git a/lustre/lib/obd_pack.c b/lustre/lib/obd_pack.c index f811437..1ca6133 100644 --- a/lustre/lib/obd_pack.c +++ b/lustre/lib/obd_pack.c @@ -53,13 +53,14 @@ #include int ost_pack_req(char *buf1, int buflen1, char *buf2, int buflen2, - struct ptlreq_hdr **hdr, struct ost_req **req, + struct ptlreq_hdr **hdr, union ptl_req *r, int *len, char **buf) { + struct ost_req *req; char *ptr; *len = sizeof(**hdr) + size_round(buflen1) + size_round(buflen2) + - sizeof(**req); + sizeof(*req); OBD_ALLOC(*buf, *len); if (!*buf) { @@ -69,18 +70,19 @@ int ost_pack_req(char *buf1, int buflen1, char *buf2, int buflen2, memset(*buf, 0, *len); *hdr = (struct ptlreq_hdr *)(*buf); - *req = (struct ost_req *)(*buf + sizeof(**hdr)); + req = (struct ost_req *)(*buf + sizeof(**hdr)); + r->ost = req; - ptr = *buf + sizeof(**hdr) + sizeof(**req); + ptr = *buf + sizeof(**hdr) + sizeof(*req); (*hdr)->type = OST_TYPE_REQ; - (*req)->buflen1 = NTOH__u32(buflen1); + req->buflen1 = NTOH__u32(buflen1); if (buf1) { LOGL(buf1, buflen1, ptr); } - (*req)->buflen2 = NTOH__u32(buflen2); + req->buflen2 = NTOH__u32(buflen2); if (buf2) { LOGL(buf2, buflen2, ptr); } @@ -88,22 +90,24 @@ int ost_pack_req(char *buf1, int buflen1, char *buf2, int buflen2, } int ost_unpack_req(char *buf, int len, - struct ptlreq_hdr **hdr, struct ost_req **req) + struct ptlreq_hdr **hdr, union ptl_req *r) { + struct ost_req *req; - if (len < sizeof(**hdr) + sizeof(**req)) { + if (len < sizeof(**hdr) + sizeof(*req)) { EXIT; return -EINVAL; } *hdr = (struct ptlreq_hdr *) (buf); - *req = (struct ost_req *) (buf + sizeof(**hdr)); + req = (struct ost_req *) (buf + sizeof(**hdr)); + r->ost = req; - (*req)->buflen1 = NTOH__u32((*req)->buflen1); - (*req)->buflen2 = NTOH__u32((*req)->buflen2); + req->buflen1 = NTOH__u32(req->buflen1); + req->buflen2 = NTOH__u32(req->buflen2); - if (len < sizeof(**hdr) + sizeof(**req) + - size_round((*req)->buflen1) + size_round((*req)->buflen2) ) { + if (len < sizeof(**hdr) + sizeof(*req) + + size_round(req->buflen1) + size_round(req->buflen2) ) { EXIT; return -EINVAL; } @@ -128,14 +132,15 @@ void *ost_req_buf2(struct ost_req *req) size_round(req->buflen1)); } -int ost_pack_rep(void *buf1, __u32 buflen1, void *buf2, __u32 buflen2, - struct ptlrep_hdr **hdr, struct ost_rep **rep, +int ost_pack_rep(char *buf1, int buflen1, char *buf2, int buflen2, + struct ptlrep_hdr **hdr, union ptl_rep *r, int *len, char **buf) { char *ptr; + struct ost_rep *rep; *len = sizeof(**hdr) + size_round(buflen1) + size_round(buflen2) + - sizeof(**rep); + sizeof(*rep); OBD_ALLOC(*buf, *len); if (!*buf) { @@ -145,15 +150,17 @@ int ost_pack_rep(void *buf1, __u32 buflen1, void *buf2, __u32 buflen2, memset(*buf, 0, *len); *hdr = (struct ptlrep_hdr *)(*buf); - *rep = (struct ost_rep *)(*buf + sizeof(**hdr)); - ptr = *buf + sizeof(**hdr) + sizeof(**rep); + rep = (struct ost_rep *)(*buf + sizeof(**hdr)); + r->ost = rep; - (*rep)->buflen1 = NTOH__u32(buflen1); + ptr = *buf + sizeof(**hdr) + sizeof(*rep); + + rep->buflen1 = NTOH__u32(buflen1); if (buf1) { LOGL(buf1, buflen1, ptr); } - (*rep)->buflen2 = NTOH__u32(buflen2); + rep->buflen2 = NTOH__u32(buflen2); if (buf2) { LOGL(buf2, buflen2, ptr); } @@ -162,21 +169,24 @@ int ost_pack_rep(void *buf1, __u32 buflen1, void *buf2, __u32 buflen2, int ost_unpack_rep(char *buf, int len, - struct ptlrep_hdr **hdr, struct ost_rep **rep) + struct ptlrep_hdr **hdr, union ptl_rep *r) { - if (len < sizeof(**hdr) + sizeof(**rep)) { + struct ost_rep *rep; + + if (len < sizeof(**hdr) + sizeof(*rep)) { EXIT; return -EINVAL; } *hdr = (struct ptlrep_hdr *) (buf); - *rep = (struct ost_rep *) (buf + sizeof(**hdr)); + rep = (struct ost_rep *) (buf + sizeof(**hdr)); + r->ost = rep; - (*rep)->buflen1 = NTOH__u32((*rep)->buflen1); - (*rep)->buflen2 = NTOH__u32((*rep)->buflen2); + rep->buflen1 = NTOH__u32(rep->buflen1); + rep->buflen2 = NTOH__u32(rep->buflen2); - if (len < sizeof(**hdr) + sizeof(**rep) + - size_round((*rep)->buflen1) + size_round((*rep)->buflen2) ) { + if (len < sizeof(**hdr) + sizeof(*rep) + + size_round(rep->buflen1) + size_round(rep->buflen2) ) { EXIT; return -EINVAL; } diff --git a/lustre/lib/page.c b/lustre/lib/page.c index 8f1437e..50b559b 100644 --- a/lustre/lib/page.c +++ b/lustre/lib/page.c @@ -43,6 +43,7 @@ #define DEBUG_SUBSYSTEM S_OST #include +#include #include #include #include diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 1cf2604..b8a55e7 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -35,6 +35,7 @@ #define DEBUG_SUBSYSTEM S_LLIGHT #include +#include #include #include #include @@ -63,10 +64,11 @@ static int ll_dir_readpage(struct file *file, struct page *page) ENTRY; - if ( ((inode->i_size + PAGE_CACHE_SIZE -1)>>PAGE_SHIFT) - <= page->index) { + if ( ((inode->i_size + PAGE_CACHE_SIZE -1)>>PAGE_SHIFT) + <= page->index) { memset(kmap(page), 0, PAGE_CACHE_SIZE); kunmap(page); + EXIT; goto readpage_out; } @@ -94,7 +96,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) SetPageUptodate(page); readpage_out: - obd_unlock_page(page); + UnlockPage(page); EXIT; return rc; } /* ll_dir_readpage */ diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 0d25f91..ced42c7 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -274,6 +274,7 @@ int ll_mdc_rename(struct inode *src, struct inode *tgt, * If the create succeeds, we fill in the inode information * with d_instantiate(). */ + static int ll_create (struct inode * dir, struct dentry * dentry, int mode) { int err; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 5380675..7d01995 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -27,6 +27,7 @@ #define DEBUG_SUBSYSTEM S_LLIGHT #include +#include #include #include #include @@ -214,7 +215,7 @@ int ll_readpage(struct file *file, struct page *page) readpage_out: SetPageUptodate(page); - obd_unlock_page(page); + UnlockPage(page); EXIT; return 0; } /* ll_readpage */ diff --git a/lustre/llite/super.c b/lustre/llite/super.c index f2b171c..8a3f636 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -39,7 +39,6 @@ extern struct address_space_operations ll_aops; extern struct address_space_operations ll_dir_aops; struct super_operations ll_super_operations; -long obd_memory = 0; static char *ll_read_opt(const char *opt, char *data) { @@ -140,9 +139,16 @@ static struct super_block * ll_read_super(struct super_block *sb, } connected = 1; - err = mdc_create_client("mds", &sbi->ll_mds_client); - if (err) { - CERROR("cannot find MDS\n"); + /* the first parameter should become an mds device no */ + err = ptlrpc_connect_client(-1, "mds", + MDS_REQUEST_PORTAL, + MDC_REPLY_PORTAL, + mds_pack_req, + mds_unpack_rep, + &sbi->ll_mds_client); + + if (err) { + CERROR("cannot find MDS\n"); sb = NULL; goto ERR; } @@ -218,7 +224,7 @@ static void ll_delete_inode(struct inode *inode) CERROR("no memory\n"); } - err = obd_destroy(IID(inode), oa); + err = obd_destroy(ll_i2obdconn(inode), oa); CDEBUG(D_INODE, "obd destroy of %Ld error %d\n", oa->o_id, err); obdo_free(oa); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 7eed74b..89ae7a1 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -45,6 +45,7 @@ #define DEBUG_SUBSYSTEM S_MDC #include +#include #include #include #include diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index b027372..91a7181 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -45,6 +45,7 @@ #define DEBUG_SUBSYSTEM S_MDC #include +#include #include #include #include @@ -54,13 +55,13 @@ extern int mds_queue_req(struct ptlrpc_request *); -int mdc_getattr(struct ptlrpc_client *peer, ino_t ino, int type, int valid, +int mdc_getattr(struct ptlrpc_client *cl, ino_t ino, int type, int valid, struct mds_rep **rep, struct ptlrep_hdr **hdr) { struct ptlrpc_request *request; int rc; - request = ptlrpc_prep_req(peer, MDS_GETATTR, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(cl, MDS_GETATTR, 0, NULL, 0, NULL); if (!request) { CERROR("llight request: cannot pack\n"); return -ENOMEM; @@ -72,7 +73,7 @@ int mdc_getattr(struct ptlrpc_client *peer, ino_t ino, int type, int valid, request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep); - rc = ptlrpc_queue_wait(request, peer); + rc = ptlrpc_queue_wait(cl, request); if (rc) { CERROR("llight request: error in handling %d\n", rc); goto out; @@ -92,13 +93,13 @@ int mdc_getattr(struct ptlrpc_client *peer, ino_t ino, int type, int valid, return rc; } -int mdc_open(struct ptlrpc_client *peer, ino_t ino, int type, int flags, +int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags, __u64 *fh, struct mds_rep **rep, struct ptlrep_hdr **hdr) { struct ptlrpc_request *request; int rc; - request = ptlrpc_prep_req(peer, MDS_OPEN, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(cl, MDS_OPEN, 0, NULL, 0, NULL); if (!request) { CERROR("llight request: cannot pack\n"); return -ENOMEM; @@ -109,7 +110,7 @@ int mdc_open(struct ptlrpc_client *peer, ino_t ino, int type, int flags, request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep); - rc = ptlrpc_queue_wait(request, peer); + rc = ptlrpc_queue_wait(cl, request); if (rc) { CERROR("llight request: error in handling %d\n", rc); goto out; @@ -129,13 +130,13 @@ int mdc_open(struct ptlrpc_client *peer, ino_t ino, int type, int flags, } -int mdc_close(struct ptlrpc_client *peer, ino_t ino, int type, __u64 fh, +int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh, struct mds_rep **rep, struct ptlrep_hdr **hdr) { struct ptlrpc_request *request; int rc; - request = ptlrpc_prep_req(peer, MDS_CLOSE, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(cl, MDS_CLOSE, 0, NULL, 0, NULL); if (!request) { CERROR("llight request: cannot pack\n"); return -ENOMEM; @@ -146,7 +147,7 @@ int mdc_close(struct ptlrpc_client *peer, ino_t ino, int type, __u64 fh, request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep); - rc = ptlrpc_queue_wait(request, peer); + rc = ptlrpc_queue_wait(cl, request); if (rc) { CERROR("llight request: error in handling %d\n", rc); goto out; @@ -164,10 +165,11 @@ int mdc_close(struct ptlrpc_client *peer, ino_t ino, int type, __u64 fh, return rc; } -int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset, +int mdc_readpage(struct ptlrpc_client *cl, ino_t ino, int type, __u64 offset, char *addr, struct mds_rep **rep, struct ptlrep_hdr **hdr) { struct ptlrpc_request *request; + struct ptlrpc_bulk_desc *bulk; struct niobuf niobuf; int rc; @@ -175,31 +177,48 @@ int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset, CDEBUG(D_INODE, "inode: %ld\n", ino); - request = ptlrpc_prep_req(peer, MDS_READPAGE, 0, NULL, + bulk = ptlrpc_prep_bulk(&cl->cli_server); + if (bulk == NULL) { + CERROR("%s: cannot init bulk desc\n", __FUNCTION__); + return -ENOMEM; + } + + request = ptlrpc_prep_req(cl, MDS_READPAGE, 0, NULL, sizeof(struct niobuf), (char *)&niobuf); if (!request) { - CERROR("mdc request: cannot pack\n"); + OBD_FREE(bulk, sizeof(*bulk)); + CERROR("%s: cannot pack\n", __FUNCTION__); return -ENOMEM; } + bulk->b_buflen = PAGE_SIZE; + bulk->b_buf = (void *)(long)niobuf.addr; + bulk->b_portal = MDS_BULK_PORTAL; + + spin_lock(&cl->cli_lock); + bulk->b_xid = cl->cli_xid++; + spin_unlock(&cl->cli_lock); + + rc = ptlrpc_wait_bulk(bulk); + if (rc) { + CERROR("%s: couldn't setup bulk sink: error %d.\n", + __FUNCTION__, rc); + goto out; + } + request->rq_req.mds->fid1.id = ino; request->rq_req.mds->fid1.f_type = type; request->rq_req.mds->size = offset; request->rq_req.mds->tgtlen = sizeof(niobuf); + request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep); - request->rq_bulklen = PAGE_SIZE; - request->rq_bulkbuf = (void *)(long)niobuf.addr; - request->rq_bulk_portal = MDS_BULK_PORTAL; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep); - - rc = ptlrpc_queue_wait(request, peer); + rc = ptlrpc_queue_wait(cl, request); if (rc) { CERROR("mdc request: error in handling %d\n", rc); goto out; } - CDEBUG(0, "mode: %o\n", request->rq_rep.mds->mode); + CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.mds->mode); if (rep) { *rep = request->rq_rep.mds; @@ -208,16 +227,17 @@ int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset, *hdr = request->rq_rephdr; } - out: + out: ptlrpc_free_req(request); + OBD_FREE(bulk, sizeof(*bulk)); return rc; } -int mdc_reint(struct ptlrpc_client *peer, struct ptlrpc_request *request) +int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request) { int rc; - rc = ptlrpc_queue_wait(request, peer); + rc = ptlrpc_queue_wait(cl, request); if (rc) { CERROR("mdc request: error in handling %d\n", rc); } @@ -225,30 +245,11 @@ int mdc_reint(struct ptlrpc_client *peer, struct ptlrpc_request *request) return rc; } -int mdc_create_client(char *uuid, struct ptlrpc_client *cl) -{ - int err; - - memset(cl, 0, sizeof(*cl)); - cl->cli_xid = 0; - cl->cli_rep_unpack = mds_unpack_rep; - cl->cli_req_pack = mds_pack_req; - err = kportal_uuid_to_peer("mds", &cl->cli_server); - if (err == 0) { - cl->cli_request_portal = MDS_REQUEST_PORTAL; - cl->cli_reply_portal = MDS_REPLY_PORTAL; - - } else { - cl->cli_enqueue = mds_queue_req; - } - return 0; -} - static int request_ioctl(struct inode *inode, struct file *file, unsigned int cmd, unsigned long arg) { int err; - struct ptlrpc_client peer; + struct ptlrpc_client cl; ENTRY; @@ -266,7 +267,8 @@ static int request_ioctl(struct inode *inode, struct file *file, return -EINVAL; } - err = mdc_create_client("mds", &peer); + /* XXX complete this to get debugging working again */ + err = -1; if (err) { CERROR("cannot create client"); return -EINVAL; @@ -276,7 +278,7 @@ static int request_ioctl(struct inode *inode, struct file *file, case IOC_REQUEST_GETATTR: { struct ptlrep_hdr *hdr = NULL; CERROR("-- getting attr for ino 2\n"); - err = mdc_getattr(&peer, 2, S_IFDIR, ~0, NULL, &hdr); + err = mdc_getattr(&cl, 2, S_IFDIR, ~0, NULL, &hdr); if (hdr) { /* FIXME: there must be a better way to get the size */ OBD_FREE(hdr, sizeof(struct ptlrep_hdr) + @@ -295,7 +297,7 @@ static int request_ioctl(struct inode *inode, struct file *file, break; } CERROR("-- readpage 0 for ino 2\n"); - err = mdc_readpage(&peer, 2, S_IFDIR, 0, buf, NULL, &hdr); + err = mdc_readpage(&cl, 2, S_IFDIR, 0, buf, NULL, &hdr); CERROR("-- done err %d\n", err); if (!err) { CERROR("-- status: %d\n", hdr->status); @@ -318,7 +320,7 @@ static int request_ioctl(struct inode *inode, struct file *file, iattr.ia_atime = 0; iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - err = mdc_setattr(&peer, &inode, &iattr, NULL, &hdr); + err = mdc_setattr(&cl, &inode, &iattr, NULL, &hdr); CERROR("-- done err %d\n", err); if (!err) { CERROR("-- status: %d\n", hdr->status); @@ -340,7 +342,7 @@ static int request_ioctl(struct inode *inode, struct file *file, iattr.ia_atime = 0; iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - err = mdc_create(&peer, &inode, + err = mdc_create(&cl, &inode, "foofile", strlen("foofile"), NULL, 0, 0100707, 47114711, 11, 47, 0, NULL, &hdr); @@ -392,7 +394,6 @@ MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre MDS Request Tester v1.0"); MODULE_LICENSE("GPL"); -EXPORT_SYMBOL(mdc_create_client); EXPORT_SYMBOL(mdc_create); EXPORT_SYMBOL(mdc_unlink); EXPORT_SYMBOL(mdc_rename); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d2b611e..a626e5b 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -31,6 +31,7 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#include #include #include #include @@ -38,43 +39,6 @@ #include #include -// XXX for testing -static struct mds_obd *MDS; - -// XXX make this networked! -static int mds_queue_req(struct ptlrpc_request *req) -{ - struct ptlrpc_request *srv_req; - - if (!MDS) { - EXIT; - return -1; - } - - OBD_ALLOC(srv_req, sizeof(*srv_req)); - if (!srv_req) { - EXIT; - return -ENOMEM; - } - - CDEBUG(0, "---> MDS at %d %p, incoming req %p, srv_req %p\n", - __LINE__, MDS, req, srv_req); - - memset(srv_req, 0, sizeof(*req)); - - /* move the request buffer */ - srv_req->rq_reqbuf = req->rq_reqbuf; - srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_obd = MDS; - - /* remember where it came from */ - srv_req->rq_reply_handle = req; - - list_add(&srv_req->rq_list, &MDS->mds_reqs); - wake_up(&MDS->mds_waitq); - return 0; -} - int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset, struct niobuf *dst) { @@ -91,12 +55,20 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, if (rc != PAGE_SIZE) return -EIO; } else { + struct ptlrpc_bulk_desc *bulk; char *buf; - DECLARE_WAITQUEUE(wait, current); + + bulk = ptlrpc_prep_bulk(&req->rq_peer); + if (bulk == NULL) + return -ENOMEM; + + bulk->b_xid = req->rq_xid; OBD_ALLOC(buf, PAGE_SIZE); - if (!buf) + if (!buf) { + OBD_FREE(bulk, sizeof(*bulk)); return -ENOMEM; + } set_fs(KERNEL_DS); rc = generic_file_read(file, buf, PAGE_SIZE, &offset); @@ -107,98 +79,26 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, return -EIO; } - req->rq_type = PTLRPC_BULK; - req->rq_bulkbuf = buf; - req->rq_bulklen = PAGE_SIZE; - - init_waitqueue_head(&req->rq_wait_for_bulk); - rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL); - add_wait_queue(&req->rq_wait_for_bulk, &wait); - /* The bulk callback will set rq->bulkbuf to NULL when it's - * been ACKed and it's finished using it. */ - while (req->rq_bulkbuf != NULL) { - set_current_state(TASK_INTERRUPTIBLE); + bulk->b_buf = buf; + bulk->b_buflen = PAGE_SIZE; - /* if this process really wants to die, let it go */ - if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGINT)) - break; - - schedule(); - } - remove_wait_queue(&req->rq_wait_for_bulk, &wait); - set_current_state(TASK_RUNNING); + rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL); + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); - if (req->rq_bulkbuf != NULL) { + if (bulk->b_flags == PTL_RPC_INTR) { EXIT; + /* FIXME: hey hey, we leak here. */ return -EINTR; } + OBD_FREE(bulk, sizeof(*bulk)); OBD_FREE(buf, PAGE_SIZE); - req->rq_bulklen = 0; /* FIXME: eek. */ } return 0; } -int mds_reply(struct ptlrpc_request *req) -{ - struct ptlrpc_request *clnt_req = req->rq_reply_handle; - - ENTRY; - - if (req->rq_obd->mds_service != NULL) { - /* This is a request that came from the network via portals. */ - - /* FIXME: we need to increment the count of handled events */ - req->rq_type = PTLRPC_REPLY; - ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL); - } else { - /* This is a local request that came from another thread. */ - - /* move the reply to the client */ - clnt_req->rq_replen = req->rq_replen; - clnt_req->rq_repbuf = req->rq_repbuf; - req->rq_repbuf = NULL; - req->rq_replen = 0; - - /* free the request buffer */ - OBD_FREE(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqbuf = NULL; - - /* wake up the client */ - wake_up_interruptible(&clnt_req->rq_wait_for_rep); - } - - EXIT; - return 0; -} - -int mds_error(struct ptlrpc_request *req) -{ - struct ptlrep_hdr *hdr; - - ENTRY; - - OBD_ALLOC(hdr, sizeof(*hdr)); - if (!hdr) { - EXIT; - return -ENOMEM; - } - - memset(hdr, 0, sizeof(*hdr)); - - hdr->seqno = req->rq_reqhdr->seqno; - hdr->status = req->rq_status; - hdr->type = MDS_TYPE_ERR; - - req->rq_repbuf = (char *)hdr; - req->rq_replen = sizeof(*hdr); - - EXIT; - return mds_reply(req); -} - struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt) { @@ -284,10 +184,10 @@ int mds_getattr(struct ptlrpc_request *req) return 0; } - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; + req->rq_rephdr->xid = req->rq_reqhdr->xid; rep = req->rq_rep.mds; - de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL); + de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL); if (IS_ERR(de)) { EXIT; req->rq_rephdr->status = -ENOENT; @@ -313,7 +213,6 @@ int mds_getattr(struct ptlrpc_request *req) int mds_open(struct ptlrpc_request *req) { struct dentry *de; - struct inode *inode; struct mds_rep *rep; struct file *file; struct vfsmount *mnt; @@ -329,10 +228,10 @@ int mds_open(struct ptlrpc_request *req) return 0; } - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; + req->rq_rephdr->xid = req->rq_reqhdr->xid; rep = req->rq_rep.mds; - de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt); + de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt); if (IS_ERR(de)) { EXIT; req->rq_rephdr->status = -ENOENT; @@ -346,7 +245,7 @@ int mds_open(struct ptlrpc_request *req) } rep->objid = (__u64) (unsigned long)file; - mds_get_objid(inode, &rep->objid); + //mds_get_objid(inode, &rep->objid); dput(de); return 0; } @@ -370,10 +269,10 @@ int mds_readpage(struct ptlrpc_request *req) return 0; } - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; + req->rq_rephdr->xid = req->rq_reqhdr->xid; rep = req->rq_rep.mds; - de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt); + de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt); if (IS_ERR(de)) { EXIT; req->rq_rephdr->status = PTR_ERR(de); @@ -421,8 +320,8 @@ int mds_reint(struct ptlrpc_request *req) return 0; } -//int mds_handle(struct mds_conn *conn, int len, char *buf) -int mds_handle(struct ptlrpc_request *req) +int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) { int rc; struct ptlreq_hdr *hdr; @@ -464,7 +363,7 @@ int mds_handle(struct ptlrpc_request *req) break; default: - return mds_error(req); + return ptlrpc_error(dev, svc, req); } out: @@ -474,192 +373,16 @@ out: } if( req->rq_status) { - mds_error(req); + ptlrpc_error(dev, svc, req); } else { CDEBUG(D_INODE, "sending reply\n"); - mds_reply(req); + ptlrpc_reply(dev, svc, req); } return 0; } -static void mds_timer_run(unsigned long __data) -{ - struct task_struct * p = (struct task_struct *) __data; - - wake_up_process(p); -} - -int mds_main(void *arg) -{ - struct mds_obd *mds = (struct mds_obd *) arg; - struct timer_list timer; - DECLARE_WAITQUEUE(wait, current); - - lock_kernel(); - daemonize(); - spin_lock_irq(¤t->sigmask_lock); - sigfillset(¤t->blocked); - recalc_sigpending(current); - spin_unlock_irq(¤t->sigmask_lock); - - sprintf(current->comm, "lustre_mds"); - - /* Set up an interval timer which can be used to trigger a - wakeup after the interval expires */ - init_timer(&timer); - timer.data = (unsigned long) current; - timer.function = mds_timer_run; - mds->mds_timer = &timer; - - /* Record that the thread is running */ - mds->mds_thread = current; - mds->mds_flags = MDS_RUNNING; - wake_up(&mds->mds_done_waitq); - - /* And now, wait forever for commit wakeup events. */ - while (1) { - int signal; - int rc; - - wake_up(&mds->mds_done_waitq); - CDEBUG(D_INODE, "mds_wakes pick up req here and continue\n"); - - if (mds->mds_service != NULL) { - ptl_event_t ev; - struct ptlrpc_request request; - struct ptlrpc_service *service; - - CDEBUG(D_IOCTL, "-- sleeping\n"); - signal = 0; - add_wait_queue(&mds->mds_waitq, &wait); - while (1) { - set_current_state(TASK_INTERRUPTIBLE); - rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev); - if (rc == PTL_OK || rc == PTL_EQ_DROPPED) - break; - CERROR("EQGet rc %d\n", rc); - if (mds->mds_flags & MDS_STOPPING) - break; - - /* if this process really wants to die, - * let it go */ - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) { - signal = 1; - break; - } - - schedule(); - } - remove_wait_queue(&mds->mds_waitq, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_IOCTL, "-- done\n"); - - if (signal == 1) { - /* We broke out because of a signal */ - EXIT; - break; - } - if (mds->mds_flags & MDS_STOPPING) { - break; - } - - service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; - - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - memset(&request, 0, sizeof(request)); - request.rq_reqbuf = ev.mem_desc.start + ev.offset; - request.rq_reqlen = ev.mem_desc.length; - request.rq_obd = MDS; - request.rq_xid = ev.match_bits; - CERROR("got req %d\n", request.rq_xid); - - request.rq_peer.peer_nid = ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = - mds->mds_service->srv_self.peer_ni; - rc = mds_handle(&request); - - /* Inform the rpc layer the event has been handled */ - ptl_received_rpc(service); - } else { - struct ptlrpc_request *request; - - CDEBUG(D_IOCTL, "-- sleeping\n"); - add_wait_queue(&mds->mds_waitq, &wait); - while (1) { - spin_lock(&mds->mds_lock); - if (!list_empty(&mds->mds_reqs)) - break; - - set_current_state(TASK_INTERRUPTIBLE); - - /* if this process really wants to die, - * let it go */ - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) - break; - - spin_unlock(&mds->mds_lock); - - schedule(); - } - remove_wait_queue(&mds->mds_waitq, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_IOCTL, "-- done\n"); - - if (list_empty(&mds->mds_reqs)) { - CDEBUG(D_INODE, "woke because of signal\n"); - spin_unlock(&mds->mds_lock); - } else { - request = list_entry(mds->mds_reqs.next, - struct ptlrpc_request, - rq_list); - list_del(&request->rq_list); - spin_unlock(&mds->mds_lock); - rc = mds_handle(request); - } - } - } - - del_timer_sync(mds->mds_timer); - - /* XXX maintain a list of all managed devices: cleanup here */ - - mds->mds_thread = NULL; - wake_up(&mds->mds_done_waitq); - CERROR("lustre_mds: exiting\n"); - return 0; -} - -static void mds_stop_srv_thread(struct mds_obd *mds) -{ - mds->mds_flags |= MDS_STOPPING; - - while (mds->mds_thread) { - wake_up(&mds->mds_waitq); - sleep_on(&mds->mds_done_waitq); - } -} - -static void mds_start_srv_thread(struct mds_obd *mds) -{ - init_waitqueue_head(&mds->mds_waitq); - init_waitqueue_head(&mds->mds_done_waitq); - kernel_thread(mds_main, (void *)mds, CLONE_VM | CLONE_FS | CLONE_FILES); - while (!mds->mds_thread) - sleep_on(&mds->mds_done_waitq); -} - /* mount the file system (secretly) */ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) @@ -668,11 +391,9 @@ static int mds_setup(struct obd_device *obddev, obd_count len, struct obd_ioctl_data* data = buf; struct mds_obd *mds = &obddev->u.mds; struct vfsmount *mnt; - struct lustre_peer peer; int err; ENTRY; - mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); err = PTR_ERR(mnt); if (IS_ERR(mnt)) { @@ -692,31 +413,22 @@ static int mds_setup(struct obd_device *obddev, obd_count len, mds->mds_ctxt.pwdmnt = mnt; mds->mds_ctxt.pwd = mnt->mnt_root; mds->mds_ctxt.fs = KERNEL_DS; - mds->mds_remote_nid = 0; - - INIT_LIST_HEAD(&mds->mds_reqs); - mds->mds_thread = NULL; - mds->mds_flags = 0; - mds->mds_interval = 3 * HZ; - MDS = mds; - spin_lock_init(&obddev->u.mds.mds_lock); - - err = kportal_uuid_to_peer("self", &peer); - if (err == 0) { - OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service)); - if (mds->mds_service == NULL) - return -ENOMEM; - mds->mds_service->srv_buf_size = 64 * 1024; - //mds->mds_service->srv_buf_size = 1024; - mds->mds_service->srv_portal = MDS_REQUEST_PORTAL; - memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer)); - mds->mds_service->srv_wait_queue = &mds->mds_waitq; + mds->mds_service = ptlrpc_init_svc( 64 * 1024, + MDS_REQUEST_PORTAL, + MDC_REPLY_PORTAL, + "self", + mds_unpack_req, + mds_pack_rep, + mds_handle); - rpc_register_service(mds->mds_service, "self"); - } + rpc_register_service(mds->mds_service, "self"); - mds_start_srv_thread(mds); + err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); + if (err) { + CERROR("cannot start thread\n"); + } + MOD_INC_USE_COUNT; EXIT; @@ -741,8 +453,14 @@ static int mds_cleanup(struct obd_device * obddev) return -EBUSY; } - MDS = NULL; - mds_stop_srv_thread(mds); + ptlrpc_stop_thread(mds->mds_service); + rpc_unregister_service(mds->mds_service); + + if (!list_empty(&mds->mds_service->srv_reqs)) { + // XXX reply with errors and clean up + CERROR("Request list not empty!\n"); + } + rpc_unregister_service(mds->mds_service); OBD_FREE(mds->mds_service, sizeof(*mds->mds_service)); @@ -752,11 +470,6 @@ static int mds_cleanup(struct obd_device * obddev) return 0; } - if (!list_empty(&mds->mds_reqs)) { - // XXX reply with errors and clean up - CDEBUG(D_INODE, "Request list not empty!\n"); - } - unlock_kernel(); mntput(mds->mds_vfsmnt); mds->mds_sb = 0; @@ -789,9 +502,5 @@ MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01"); MODULE_LICENSE("GPL"); - -// for testing (maybe this stays) -EXPORT_SYMBOL(mds_queue_req); - module_init(mds_init); module_exit(mds_exit); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 2ccd98c..76523e6 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -30,6 +30,7 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#include #include #include #include @@ -42,7 +43,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, struct ptlrpc_reques { struct dentry *de; - de = mds_fid2dentry(req->rq_obd, rec->ur_fid1, NULL); + de = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid1, NULL); if (IS_ERR(de)) { req->rq_rephdr->status = -ESTALE; return 0; @@ -98,7 +99,7 @@ static int mds_reint_create(struct mds_update_record *rec, int rc; ENTRY; - de = mds_fid2dentry(req->rq_obd, rec->ur_fid1, NULL); + de = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid1, NULL); if (IS_ERR(de)) { req->rq_rephdr->status = -ESTALE; EXIT; @@ -179,7 +180,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int rc; ENTRY; - de = mds_fid2dentry(req->rq_obd, rec->ur_fid1, NULL); + de = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid1, NULL); if (IS_ERR(de)) { req->rq_rephdr->status = -ESTALE; EXIT; @@ -235,13 +236,13 @@ static int mds_reint_link(struct mds_update_record *rec, ENTRY; rc = -ESTALE; - de_src = mds_fid2dentry(req->rq_obd, rec->ur_fid1, NULL); + de_src = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid1, NULL); if (IS_ERR(de_src)) { EXIT; goto out_link; } - de_tgt_dir = mds_fid2dentry(req->rq_obd, rec->ur_fid2, NULL); + de_tgt_dir = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid2, NULL); if (IS_ERR(de_tgt_dir)) { rc = -ESTALE; EXIT; @@ -285,13 +286,13 @@ static int mds_reint_rename(struct mds_update_record *rec, ENTRY; rc = -ESTALE; - de_srcdir = mds_fid2dentry(req->rq_obd, rec->ur_fid1, NULL); + de_srcdir = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid1, NULL); if (IS_ERR(de_srcdir)) { EXIT; goto out_rename; } - de_tgtdir = mds_fid2dentry(req->rq_obd, rec->ur_fid2, NULL); + de_tgtdir = mds_fid2dentry(&req->rq_obd->u.mds, rec->ur_fid2, NULL); if (IS_ERR(de_tgtdir)) { rc = -ESTALE; EXIT; @@ -351,7 +352,7 @@ int mds_reint_rec(struct mds_update_record *rec, struct ptlrpc_request *req) rc = req->rq_status = -ENOMEM; return rc; } - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; + req->rq_rephdr->xid = req->rq_reqhdr->xid; rc = reinters[rec->ur_opcode](rec, req); req->rq_status = rc; diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 2c3a10f..c5984f2 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -60,7 +60,7 @@ static int obd_init_magic; int obd_print_entry = 1; int obd_debug_level = ~0; -long obd_memory = 0; +unsigned long obd_memory = 0; struct obd_device obd_dev[MAX_OBD_DEVICES]; struct list_head obd_types; @@ -194,6 +194,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, obd->obd_type = type; obd->obd_multi_count = 0; INIT_LIST_HEAD(&obd->obd_gen_clients); + INIT_LIST_HEAD(&obd->obd_req_list); /* do the attach */ if ( OBT(obd) && OBP(obd, attach) ) { @@ -232,6 +233,11 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, obd->obd_minor); return -EBUSY; } + if ( !list_empty(&obd->obd_req_list) ) { + CERROR("OBD device %d has hanging requests\n", + obd->obd_minor); + return -EBUSY; + } if (obd->obd_proc_entry) proc_lustre_release_obd_device(obd); @@ -273,11 +279,6 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, case OBD_IOC_CLEANUP: { ENTRY; - if ( !(obd->obd_flags & OBD_SET_UP) ) { - EXIT; - return -EINVAL; - } - err = obd_cleanup(obd); if ( err ) { EXIT; @@ -432,7 +433,6 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, err = OBP(obd, read)(&conn, &rw_s->obdo, rw_s->buf, &rw_s->count, rw_s->offset); - ODEBUG(&rw_s->obdo); CDEBUG(D_INFO, "READ: conn %d, count %Ld, offset %Ld, '%s'\n", rw_s->conn_id, rw_s->count, rw_s->offset, rw_s->buf); if ( err ) { @@ -466,7 +466,6 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, err = OBP(obd, write)(&conn, &rw_s->obdo, rw_s->buf, &rw_s->count, rw_s->offset); - ODEBUG(&rw_s->obdo); if ( err ) { EXIT; return err; @@ -618,7 +617,6 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, rw_s->conn_id, rw_s->count, rw_s->offset); err = OBP(obd, punch)(&conn, &rw_s->obdo, rw_s->count, rw_s->offset); - ODEBUG(&rw_s->obdo); if ( err ) { EXIT; return err; @@ -769,7 +767,27 @@ static struct miscdevice obd_psdev = { &obd_psdev_fops }; -int init_obd(void) + +EXPORT_SYMBOL(obd_register_type); +EXPORT_SYMBOL(obd_unregister_type); + +EXPORT_SYMBOL(obd_print_entry); +EXPORT_SYMBOL(obd_debug_level); +EXPORT_SYMBOL(obd_dev); + +EXPORT_SYMBOL(gen_connect); +EXPORT_SYMBOL(gen_client); +EXPORT_SYMBOL(gen_cleanup); +EXPORT_SYMBOL(gen_disconnect); +EXPORT_SYMBOL(gen_copy_data); +EXPORT_SYMBOL(obdo_cachep); + +/* EXPORT_SYMBOL(gen_multi_attach); */ +EXPORT_SYMBOL(gen_multi_setup); +EXPORT_SYMBOL(gen_multi_cleanup); +EXPORT_SYMBOL(obd_memory); + +static int __init init_obdclass(void) { int err; int i; @@ -787,6 +805,8 @@ int init_obd(void) memset(&(obd_dev[i]), 0, sizeof(obd_dev[i])); obd_dev[i].obd_minor = i; INIT_LIST_HEAD(&obd_dev[i].obd_gen_clients); + INIT_LIST_HEAD(&obd_dev[i].obd_req_list); + init_waitqueue_head(&obd_dev[i].obd_req_waitq); } err = obd_init_obdo_cache(); @@ -797,32 +817,7 @@ int init_obd(void) return 0; } -EXPORT_SYMBOL(obd_register_type); -EXPORT_SYMBOL(obd_unregister_type); - -EXPORT_SYMBOL(obd_print_entry); -EXPORT_SYMBOL(obd_debug_level); -EXPORT_SYMBOL(obd_dev); - -EXPORT_SYMBOL(gen_connect); -EXPORT_SYMBOL(gen_client); -EXPORT_SYMBOL(gen_cleanup); -EXPORT_SYMBOL(gen_disconnect); -EXPORT_SYMBOL(gen_copy_data); -EXPORT_SYMBOL(obdo_cachep); - -/* EXPORT_SYMBOL(gen_multi_attach); */ -EXPORT_SYMBOL(gen_multi_setup); -EXPORT_SYMBOL(gen_multi_cleanup); -EXPORT_SYMBOL(obd_memory); - -#ifdef MODULE -int init_module(void) -{ - return init_obd(); -} - -void cleanup_module(void) +static void __exit cleanup_obdclass(void) { int i; ENTRY; @@ -844,4 +839,10 @@ void cleanup_module(void) obd_init_magic = 0; EXIT; } -#endif + +MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_DESCRIPTION("Lustre Class Driver v1.0"); +MODULE_LICENSE("GPL"); + +module_init(init_obdclass); +module_exit(cleanup_obdclass); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 8140b52..6acfa7a 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -300,7 +300,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, dst->o_size = src->o_size; dst->o_blocks = src->o_blocks; dst->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); - obd_unlock_page(page); + UnlockPage(page); __free_page(page); EXIT; diff --git a/lustre/obdfs/rw.c b/lustre/obdfs/rw.c index e884709..a02e7c6 100644 --- a/lustre/obdfs/rw.c +++ b/lustre/obdfs/rw.c @@ -278,7 +278,7 @@ int obdfs_readpage(struct file *file, struct page *page) readpage_out: SetPageUptodate(page); - obd_unlock_page(page); + UnlockPage(page); EXIT; return 0; } /* obdfs_readpage */ @@ -440,9 +440,7 @@ int obdfs_do_vec_wr(struct inode **inodes, obd_count num_io, --num_io; CDEBUG(D_INFO, "calling put_page for %p, index %ld\n", pages[num_io], pages[num_io]->index); - /* PDEBUG(pages[num_io], "do_vec_wr"); */ put_page(pages[num_io]); - /* PDEBUG(pages[num_io], "do_vec_wr"); */ } CDEBUG(D_INFO, "put_page done\n"); @@ -546,7 +544,6 @@ int obdfs_do_writepage(struct page *page, int sync) int err; ENTRY; - /* PDEBUG(page, "WRITEPAGE"); */ if ( sync ) err = obdfs_brw(OBD_BRW_WRITE, inode, page, 1); else { @@ -559,7 +556,6 @@ int obdfs_do_writepage(struct page *page, int sync) SetPageUptodate(page); set_page_clean(page); } - /* PDEBUG(page,"WRITEPAGE"); */ EXIT; return err; } /* obdfs_do_writepage */ @@ -663,12 +659,11 @@ struct page *obdfs_getpage(struct inode *inode, unsigned long offset, return NULL; } - /* PDEBUG(page, "GETPAGE: got page - before reading\n"); */ /* now check if the data in the page is up to date */ if ( Page_Uptodate(page)) { if (!locked) { if (PageLocked(page)) - obd_unlock_page(page); + UnlockPage(page); } else { CERROR("expecting locked page\n"); } @@ -680,15 +675,14 @@ struct page *obdfs_getpage(struct inode *inode, unsigned long offset, if ( err ) { SetPageError(page); - obd_unlock_page(page); + UnlockPage(page); EXIT; return page; } if ( !locked ) - obd_unlock_page(page); + UnlockPage(page); SetPageUptodate(page); - /* PDEBUG(page,"GETPAGE - after reading"); */ EXIT; return page; } /* obdfs_getpage */ diff --git a/lustre/obdfs/super.c b/lustre/obdfs/super.c index 19d50a2..14ce596 100644 --- a/lustre/obdfs/super.c +++ b/lustre/obdfs/super.c @@ -43,7 +43,6 @@ extern struct address_space_operations obdfs_aops; struct super_operations obdfs_super_operations; long obdfs_cache_count = 0; long obdfs_mutex_start = 0; -long obd_memory = 0; static char *obdfs_read_opt(const char *opt, char *data) { diff --git a/lustre/obdfs/symlink.c b/lustre/obdfs/symlink.c index 3ae8a4d..c5c851b 100644 --- a/lustre/obdfs/symlink.c +++ b/lustre/obdfs/symlink.c @@ -58,9 +58,7 @@ static int obdfs_readlink(struct dentry *dentry, char *buffer, int buflen) int res; ENTRY; - OIDEBUG(dentry->d_inode); page = obdfs_getpage(dentry->d_inode, 0, 0, 0); - /* PDEBUG(page, "readlink"); */ if (!page) { EXIT; return 0; @@ -78,9 +76,7 @@ static int obdfs_follow_link(struct dentry * dentry, int res; ENTRY; - OIDEBUG(dentry->d_inode); page = obdfs_getpage(dentry->d_inode, 0, 0, 0); - /* PDEBUG(page, "follow_link"); */ if (!page) { dput(nd->dentry); EXIT; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 1b9dd03..f6bd069 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1,4 +1,6 @@ -/* +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * * Copryright (C) 2001 Cluster File Systems, Inc. * * This code is issued under the GNU General Public License. @@ -42,125 +44,21 @@ #include #include -extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *); - -/* FIXME: this belongs in some sort of service struct */ -static int osc_xid = 1; - -struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, - int buflen2, char *buf2) -{ - struct ptlrpc_request *request; - int rc; - ENTRY; - - OBD_ALLOC(request, sizeof(*request)); - if (!request) { - CERROR("request allocation out of memory\n"); - return NULL; - } - - memset(request, 0, sizeof(*request)); - request->rq_xid = osc_xid++; - - rc = ost_pack_req(buf1, buflen1, buf2, buflen2, - &request->rq_reqhdr, &request->rq_req.ost, - &request->rq_reqlen, &request->rq_reqbuf); - if (rc) { - CERROR("llight request: cannot pack request %d\n", rc); - return NULL; - } - request->rq_reqhdr->opc = opcode; - - EXIT; - return request; -} - -/* XXX: unify with mdc_queue_wait */ -extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req) +struct ptlrpc_client *osc_con2cl(struct obd_conn *conn) { - struct obd_device *client = conn->oc_dev; - struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer; - int rc; - DECLARE_WAITQUEUE(wait, current); - - ENTRY; - - /* set the connection id */ - req->rq_req.ost->connid = conn->oc_id; - init_waitqueue_head(&req->rq_wait_for_rep); - - /* XXX fix the race here (wait_for_event?)*/ - if (peer == NULL) { - /* Local delivery */ - CDEBUG(D_INODE, "\n"); - rc = ost_queue_req(client, req); - } else { - /* Remote delivery via portals. */ - req->rq_req_portal = OST_REQUEST_PORTAL; - req->rq_reply_portal = OST_REPLY_PORTAL; - rc = ptl_send_rpc(req, peer); - } - if (rc) { - CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); - return -rc; - } - - CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", - &conn->oc_dev->u.osc.osc_tgt->u.ost, - conn->oc_id, req->rq_reqhdr->opc, req); - - /* wait for the reply */ - CDEBUG(D_INODE, "-- sleeping\n"); - add_wait_queue(&req->rq_wait_for_rep, &wait); - while (req->rq_repbuf == NULL) { - set_current_state(TASK_INTERRUPTIBLE); - - /* if this process really wants to die, let it go */ - if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGINT)) - break; - - schedule(); - } - remove_wait_queue(&req->rq_wait_for_rep, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_INODE, "-- done\n"); - - if (req->rq_repbuf == NULL) { - /* We broke out because of a signal */ - EXIT; - return -EINTR; - } - - rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, - &req->rq_rep.ost); - if (rc) { - CERROR("mds_unpack_rep failed: %d\n", rc); - return rc; - } + struct osc_obd *osc = &conn->oc_dev->u.osc; + return &osc->osc_peer; - if ( req->rq_rephdr->status == 0 ) - CDEBUG(D_INODE, "buf %p len %d status %d\n", - req->rq_repbuf, req->rq_replen, - req->rq_rephdr->status); - - EXIT; - return 0; -} - -static void osc_free_req(struct ptlrpc_request *request) -{ - OBD_FREE(request, sizeof(*request)); } static int osc_connect(struct obd_conn *conn) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; ENTRY; - request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; @@ -169,7 +67,7 @@ static int osc_connect(struct obd_conn *conn) request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; @@ -179,7 +77,7 @@ static int osc_connect(struct obd_conn *conn) conn->oc_id = request->rq_rep.ost->connid; out: - osc_free_req(request); + ptlrpc_free_req(request); EXIT; return rc; } @@ -187,25 +85,26 @@ static int osc_connect(struct obd_conn *conn) static int osc_disconnect(struct obd_conn *conn) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; ENTRY; - request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; } - + request->rq_req.ost->connid = conn->oc_id; request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; } out: - osc_free_req(request); + ptlrpc_free_req(request); EXIT; return rc; } @@ -214,9 +113,10 @@ static int osc_disconnect(struct obd_conn *conn) static int osc_getattr(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; - request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; @@ -227,7 +127,7 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa) request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; @@ -239,16 +139,17 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa) } out: - osc_free_req(request); + ptlrpc_free_req(request); return 0; } static int osc_setattr(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; - request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; @@ -258,26 +159,27 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa) request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; } out: - osc_free_req(request); + ptlrpc_free_req(request); return 0; } static int osc_create(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; if (!oa) { CERROR("oa NULL\n"); } - request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; @@ -288,7 +190,7 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa) request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; @@ -296,19 +198,21 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa) memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); out: - osc_free_req(request); + ptlrpc_free_req(request); return 0; } -static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset) +static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, + obd_off offset) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; if (!oa) { CERROR("oa NULL\n"); } - request = ost_prep_req(OST_PUNCH, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; @@ -321,7 +225,7 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; @@ -329,19 +233,20 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); out: - osc_free_req(request); + ptlrpc_free_req(request); return 0; } static int osc_destroy(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; + struct ptlrpc_client *peer = osc_con2cl(conn); int rc; if (!oa) { CERROR("oa NULL\n"); } - request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL); + request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; @@ -352,7 +257,7 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = osc_queue_wait(conn, request); + rc = ptlrpc_queue_wait(peer, request); if (rc) { EXIT; goto out; @@ -360,137 +265,216 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); out: - osc_free_req(request); + ptlrpc_free_req(request); return 0; } - -/* mount the file system (secretly) */ -static int osc_setup(struct obd_device *obddev, obd_count len, - void *buf) - -{ - struct obd_ioctl_data* data = buf; - struct osc_obd *osc = &obddev->u.osc; - ENTRY; - - if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) { - /* This is a local connection */ - osc->osc_tgt = &obd_dev[data->ioc_dev]; - - CERROR("OSC: tgt %d ost at %p\n", data->ioc_dev, - &osc->osc_tgt->u.ost); - if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || - ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){ - CERROR("device not attached or not set up (%d)\n", - data->ioc_dev); - EXIT; - return -EINVAL; - } - } else { - int err; - /* This is a remote connection using Portals */ - - /* XXX: this should become something like ioc_inlbuf1 */ - err = kportal_uuid_to_peer("ost", &osc->osc_peer); - if (err != 0) { - CERROR("Cannot find 'ost' peer.\n"); - EXIT; - return -EINVAL; - } - } - - MOD_INC_USE_COUNT; - EXIT; - return 0; -} - -int osc_sendpage(struct ptlrpc_request *req, struct niobuf *dst, - struct niobuf *src) +int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, + struct niobuf *dst, struct niobuf *src) { - if (req->rq_peer.peer_nid == 0) { + if (conn->oc_id != -1) { /* local sendpage */ memcpy((char *)(unsigned long)dst->addr, (char *)(unsigned long)src->addr, src->len); } else { + struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_bulk_desc *bulk; char *buf; int rc; + bulk = ptlrpc_prep_bulk(&cl->cli_server); + if (bulk == NULL) + return -ENOMEM; + + spin_lock(&cl->cli_lock); + bulk->b_xid = cl->cli_xid++; + spin_unlock(&cl->cli_lock); + OBD_ALLOC(buf, src->len); - if (!buf) + if (!buf) { + OBD_FREE(bulk, sizeof(*bulk)); return -ENOMEM; + } memcpy(buf, (char *)(unsigned long)src->addr, src->len); - req->rq_type = PTLRPC_BULK; - req->rq_bulkbuf = buf; - req->rq_bulklen = src->len; - rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL); - init_waitqueue_head(&req->rq_wait_for_bulk); - sleep_on(&req->rq_wait_for_bulk); + bulk->b_buf = buf; + bulk->b_buflen = src->len; + /* FIXME: maybe we should add an XID to struct niobuf? */ + bulk->b_xid = (__u32)(unsigned long)src->page; + + rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); + if (rc != 0) { + CERROR("send_bulk failed: %d\n", rc); + BUG(); + return rc; + } + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); + + if (bulk->b_flags == PTL_RPC_INTR) { + EXIT; + /* FIXME: hey hey, we leak here. */ + return -EINTR; + } + + OBD_FREE(bulk, sizeof(*bulk)); OBD_FREE(buf, src->len); - req->rq_bulklen = 0; /* FIXME: eek. */ } return 0; } - -int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, - struct obdo **oa, obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags) +int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, + obd_count *oa_bufs, struct page **buf, obd_size *count, + obd_off *offset, obd_flag *flags) { - struct ptlrpc_request *request; + struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_request *request; + int pages; int rc; struct obd_ioobj ioo; struct niobuf src; int size1, size2 = 0; void *ptr1, *ptr2; int i, j, n; + struct ptlrpc_bulk_desc **bulk; size1 = num_oa * sizeof(ioo); + pages = 0; for (i = 0; i < num_oa; i++) { size2 += oa_bufs[i] * sizeof(src); + pages += oa_bufs[i]; } - request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL); + /* We actually pack a _third_ buffer, with XIDs for bulk pages */ + size2 += pages * sizeof(__u32); + request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL); if (!request) { CERROR("cannot pack req!\n"); return -ENOMEM; } + request->rq_req.ost->cmd = OBD_BRW_READ; + + OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *)); + if (bulk == NULL) { + CERROR("cannot alloc bulk desc vector\n"); + return -ENOMEM; + } + memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *)); + + n = 0; + ptr1 = ost_req_buf1(request->rq_req.ost); + ptr2 = ost_req_buf2(request->rq_req.ost); + for (i = 0; i < num_oa; i++) { + ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); + for (j = 0; j < oa_bufs[i]; j++) { + bulk[n] = ptlrpc_prep_bulk(&cl->cli_server); + if (bulk[n] == NULL) { + CERROR("cannot alloc bulk desc\n"); + rc = -ENOMEM; + goto out; + } + + spin_lock(&cl->cli_lock); + bulk[n]->b_xid = cl->cli_xid++; + spin_unlock(&cl->cli_lock); + bulk[n]->b_buf = kmap(buf[n]); + bulk[n]->b_buflen = PAGE_SIZE; + bulk[n]->b_portal = OST_BULK_PORTAL; + ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n], + count[n], flags[n]); + n++; + } + } + + /* This is kinda silly--put the XIDs in the "third" buffer. */ + for (n = 0; n < pages; n++) { + *(__u32 *)ptr2 = bulk[n]->b_xid; + ptr2 = (char *)ptr2 + sizeof(__u32); + + rc = ptlrpc_wait_bulk(bulk[n]); + if (rc) + goto out; + } + + request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); + rc = ptlrpc_queue_wait(cl, request); + + out: + /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to + * abort those bulk listeners. */ + + if (request->rq_rephdr) + OBD_FREE(request->rq_rephdr, request->rq_replen); + n = 0; + for (i = 0; i < num_oa; i++) { + for (j = 0; j < oa_bufs[i]; j++) { + if (bulk[n] == NULL) + continue; + kunmap(bulk[n]->b_buf); + OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc)); + n++; + } + } + + OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *)); + ptlrpc_free_req(request); + return rc; +} + +int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, + obd_count *oa_bufs, struct page **buf, obd_size *count, + obd_off *offset, obd_flag *flags) +{ + struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_request *request; + struct obd_ioobj ioo; + struct niobuf src; + int pages, rc, i, j, n, size1, size2 = 0; + void *ptr1, *ptr2; + + size1 = num_oa * sizeof(ioo); + pages = 0; + for (i = 0; i < num_oa; i++) { + size2 += oa_bufs[i] * sizeof(src); + pages += oa_bufs[i]; + } + + request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL); + if (!request) { + CERROR("cannot pack req!\n"); + return -ENOMEM; + } + request->rq_req.ost->cmd = OBD_BRW_WRITE; n = 0; - request->rq_req.ost->cmd = rw; ptr1 = ost_req_buf1(request->rq_req.ost); ptr2 = ost_req_buf2(request->rq_req.ost); for (i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); for (j = 0; j < oa_bufs[i]; j++) { - ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n], - count[n], flags[n]); + ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n], + count[n], flags[n]); n++; } } - request->rq_bulk_portal = OST_BULK_PORTAL; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2; - - rc = osc_queue_wait(conn, request); + request->rq_replen = sizeof(struct ptlrep_hdr) + + sizeof(struct ost_rep) + pages * sizeof(struct niobuf); + rc = ptlrpc_queue_wait(cl, request); if (rc) { EXIT; goto out; } -#if 0 - ptr2 = ost_rep_buf2(request->rq_rep.ost); - if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { - CERROR("buffer length wrong\n"); - goto out; - } - - if (rw == OBD_BRW_READ) - goto out; + ptr2 = ost_rep_buf2(request->rq_rep.ost); + if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { + CERROR("buffer length wrong (%d vs. %d)\n", + request->rq_rep.ost->buflen2, n * sizeof(struct niobuf)); + EXIT; + goto out; + } for (i = 0; i < num_oa; i++) { for (j = 0; j < oa_bufs[i]; j++) { @@ -498,11 +482,22 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, src.addr = (__u64)(unsigned long)buf[n]; src.len = count[n]; ost_unpack_niobuf(&ptr2, &dst); - osc_sendpage(request, dst, &src); + osc_sendpage(conn, request, dst, &src); n++; } } -#endif + + /* Reuse the request structure for the completion request. */ + OBD_FREE(request->rq_rephdr, request->rq_replen); + request->rq_rephdr = NULL; + request->rq_repbuf = NULL; + request->rq_reqhdr->opc = OST_BRW_COMPLETE; + request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); + rc = ptlrpc_queue_wait(cl, request); + if (rc) { + EXIT; + goto out; + } out: if (request->rq_rephdr) @@ -515,10 +510,46 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, } } - osc_free_req(request); + ptlrpc_free_req(request); return 0; } +int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, + struct obdo **oa, obd_count *oa_bufs, struct page **buf, + obd_size *count, obd_off *offset, obd_flag *flags) +{ + if (rw == OBD_BRW_READ) { + return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count, + offset, flags); + } else { + return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count, + offset, flags); + } +} + +/* mount the file system (secretly) */ +static int osc_setup(struct obd_device *obddev, obd_count len, + void *buf) + +{ + struct osc_obd *osc = &obddev->u.osc; + struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf; + int rc; + int dev = data->ioc_dev; + ENTRY; + + rc = ptlrpc_connect_client(dev, "ost", + OST_REQUEST_PORTAL, + OSC_REPLY_PORTAL, + ost_pack_req, + ost_unpack_rep, + &osc->osc_peer); + + MOD_INC_USE_COUNT; + EXIT; + return rc; +} + static int osc_cleanup(struct obd_device * obddev) { MOD_DEC_USE_COUNT; @@ -555,4 +586,3 @@ MODULE_LICENSE("GPL"); module_init(osc_init); module_exit(osc_exit); - diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index e45c6f9..ca57958 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -51,100 +51,7 @@ #include #include -// for testing -static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req) -{ - struct ptlrpc_request *srv_req; - struct ost_obd *ost = &obddev->u.ost; - - if (!ost) { - EXIT; - return -1; - } - - OBD_ALLOC(srv_req, sizeof(*srv_req)); - if (!srv_req) { - EXIT; - return -ENOMEM; - } - - CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n", - __LINE__, ost, req, srv_req); - - memset(srv_req, 0, sizeof(*req)); - - /* move the request buffer */ - srv_req->rq_reqbuf = req->rq_reqbuf; - srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_ost = ost; - - /* remember where it came from */ - srv_req->rq_reply_handle = req; - - spin_lock(&ost->ost_lock); - list_add(&srv_req->rq_list, &ost->ost_reqs); - spin_unlock(&ost->ost_lock); - wake_up(&ost->ost_waitq); - return 0; -} - -int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req) -{ - struct ptlrpc_request *clnt_req = req->rq_reply_handle; - - ENTRY; - - if (req->rq_ost->ost_service != NULL) { - /* This is a request that came from the network via portals. */ - - /* FIXME: we need to increment the count of handled events */ - req->rq_type = PTLRPC_REPLY; - ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL); - } else { - /* This is a local request that came from another thread. */ - /* move the reply to the client */ - clnt_req->rq_replen = req->rq_replen; - clnt_req->rq_repbuf = req->rq_repbuf; - req->rq_repbuf = NULL; - req->rq_replen = 0; - - /* free the request buffer */ - OBD_FREE(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqbuf = NULL; - - /* wake up the client */ - wake_up_interruptible(&clnt_req->rq_wait_for_rep); - } - - EXIT; - return 0; -} - -int ost_error(struct obd_device *obddev, struct ptlrpc_request *req) -{ - struct ptlrep_hdr *hdr; - - ENTRY; - - OBD_ALLOC(hdr, sizeof(*hdr)); - if (!hdr) { - EXIT; - return -ENOMEM; - } - - memset(hdr, 0, sizeof(*hdr)); - - hdr->seqno = req->rq_reqhdr->seqno; - hdr->status = req->rq_status; - hdr->type = OST_TYPE_ERR; - - req->rq_repbuf = (char *)hdr; - req->rq_replen = sizeof(*hdr); - - EXIT; - return ost_reply(obddev, req); -} static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req) { @@ -156,15 +63,14 @@ static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); return rc; } - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy - (&conn, &req->rq_req.ost->oa); + req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); EXIT; return 0; @@ -180,7 +86,7 @@ static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); @@ -189,8 +95,7 @@ static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req) req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id; req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid; - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr - (&conn, &req->rq_rep.ost->oa); + req->rq_rep.ost->result = obd_getattr(&conn, &req->rq_rep.ost->oa); EXIT; return 0; @@ -206,17 +111,17 @@ static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); return rc; } - memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa)); + memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, + sizeof(req->rq_req.ost->oa)); - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create - (&conn, &req->rq_rep.ost->oa); + req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); EXIT; return 0; @@ -232,19 +137,19 @@ static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); return rc; } - memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa)); + memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, + sizeof(req->rq_req.ost->oa)); - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch - (&conn, &req->rq_rep.ost->oa, - req->rq_rep.ost->oa.o_size, - req->rq_rep.ost->oa.o_blocks); + req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, + req->rq_rep.ost->oa.o_size, + req->rq_rep.ost->oa.o_blocks); EXIT; return 0; @@ -261,7 +166,7 @@ static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); @@ -271,8 +176,7 @@ static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req) memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa)); - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr - (&conn, &req->rq_rep.ost->oa); + req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); EXIT; return 0; @@ -287,17 +191,16 @@ static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); return rc; } - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn); + req->rq_rep.ost->result = obd_connect(&conn); - CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf, - conn.oc_id); + CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id); req->rq_rep.ost->connid = conn.oc_id; EXIT; return 0; @@ -313,14 +216,14 @@ static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_dev = ost->ost_tgt; conn.oc_id = req->rq_req.ost->connid; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); return rc; } - - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn); + CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id); + req->rq_rep.ost->result = obd_disconnect(&conn); EXIT; return 0; @@ -340,11 +243,12 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) conn.oc_dev = ost->ost_tgt; ptr = ost_req_buf1(req->rq_req.ost); - req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info - (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); + req->rq_rep.ost->result = obd_get_info(&conn, + req->rq_req.ost->buflen1, ptr, + &vallen, &val); rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, - &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); + &req->rq_rep, &req->rq_replen, &req->rq_repbuf); if (rc) { CERROR("cannot pack reply\n"); return rc; @@ -356,6 +260,8 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) { + struct ptlrpc_bulk_desc **bulk_vec = NULL; + struct ptlrpc_bulk_desc *bulk = NULL; struct obd_conn conn; int rc; int i, j; @@ -377,79 +283,213 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) cmd = r->cmd; conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = req->rq_ost->ost_tgt; + conn.oc_dev = req->rq_obd->u.ost.ost_tgt; - rc = ost_pack_rep(NULL, niocount, NULL, 0, - &req->rq_rephdr, &req->rq_rep.ost, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - res = ost_rep_buf1(req->rq_rep.ost); - - for (i=0; i < objcount; i++) { + for (i = 0; i < objcount; i++) { ost_unpack_ioo((void *)&tmp1, &ioo); if (tmp2 + ioo->ioo_bufcnt > end2) { rc = -EFAULT; break; } - for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { + for (j = 0; j < ioo->ioo_bufcnt; j++) { ost_unpack_niobuf((void *)&tmp2, &nb); } } + rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb), + &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + CERROR("cannot pack reply\n"); + return rc; + } + OBD_ALLOC(res, sizeof(struct niobuf) * niocount); + if (res == NULL) { + EXIT; + return -ENOMEM; + } + /* The unpackers move tmp1 and tmp2, so reset them before using */ tmp1 = ost_req_buf1(r); tmp2 = ost_req_buf2(r); - req->rq_rep.ost->result = - req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw + req->rq_rep.ost->result = obd_preprw (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, niocount, (struct niobuf *)tmp2, (struct niobuf *)res); if (req->rq_rep.ost->result) { EXIT; - goto out; + goto out; } if (cmd == OBD_BRW_WRITE) { + /* Setup buffers for the incoming pages, then send the niobufs + * describing those buffers to the OSC. */ + OBD_ALLOC(bulk_vec, + niocount * sizeof(struct ptlrpc_bulk_desc *)); + if (bulk_vec == NULL) { + CERROR("cannot alloc bulk desc vector\n"); + return -ENOMEM; + } + memset(bulk_vec, 0, + niocount * sizeof(struct ptlrpc_bulk_desc *)); + for (i = 0; i < niocount; i++) { - src = &((struct niobuf *)tmp2)[i]; + struct ptlrpc_service *srv = + req->rq_obd->u.ost.ost_service; + + bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer); + if (bulk_vec[i] == NULL) { + CERROR("cannot alloc bulk desc\n"); + rc = -ENOMEM; + goto out; + } + + spin_lock(&srv->srv_lock); + bulk_vec[i]->b_xid = srv->srv_xid++; + spin_unlock(&srv->srv_lock); + dst = &((struct niobuf *)res)[i]; + /* FIXME: we overload ->page with the xid of this buffer + * for the benefit of the remote client */ + dst->page = + (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid); + + bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr; + bulk_vec[i]->b_buflen = PAGE_SIZE; + bulk_vec[i]->b_portal = OSC_BULK_PORTAL; + rc = ptlrpc_wait_bulk(bulk_vec[i]); + if (rc) + goto out; + +#if 0 + /* Local delivery */ + src = &((struct niobuf *)tmp2)[i]; memcpy((void *)(unsigned long)dst->addr, - (void *)(unsigned long)src->addr, - src->len); + (void *)(unsigned long)src->addr, src->len); +#endif } barrier(); - } else { + } else { for (i = 0; i < niocount; i++) { - dst = &((struct niobuf *)tmp2)[i]; - src = &((struct niobuf *)res)[i]; - memcpy((void *)(unsigned long)dst->addr, - (void *)(unsigned long)src->addr, - PAGE_SIZE); - } + struct ptlrpc_service *srv = + req->rq_obd->u.ost.ost_service; + + bulk = ptlrpc_prep_bulk(&req->rq_peer); + if (bulk == NULL) { + CERROR("cannot alloc bulk desc\n"); + rc = -ENOMEM; + goto out; + } + + spin_lock(&srv->srv_lock); + bulk->b_xid = srv->srv_xid++; + spin_unlock(&srv->srv_lock); + + src = &((struct niobuf *)tmp2)[i]; + + bulk->b_buf = (void *)(unsigned long)src->addr; + bulk->b_buflen = PAGE_SIZE; + rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL); + if (rc) { + EXIT; + goto out; + } + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); + + if (bulk->b_flags == PTL_RPC_INTR) { + EXIT; + goto out; + } + + OBD_FREE(bulk, sizeof(*bulk)); + } + +#if 0 + /* Local delivery */ + dst = &((struct niobuf *)tmp2)[i]; + memcpy((void *)(unsigned long)dst->addr, + (void *)(unsigned long)src->addr, PAGE_SIZE); +#endif barrier(); } - req->rq_rep.ost->result = - req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw - (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, - niocount, (struct niobuf *)res); - out: + if (bulk != NULL) + OBD_FREE(bulk, sizeof(*bulk)); + if (bulk_vec != NULL) { + for (i = 0; i < niocount; i++) { + if (bulk_vec[i] != NULL) + OBD_FREE(bulk_vec[i], sizeof(*bulk)); + } + OBD_FREE(bulk_vec, + niocount * sizeof(struct ptlrpc_bulk_desc *)); + } + EXIT; return 0; } -int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req) +int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req) +{ + struct obd_conn conn; + int rc, i, j, cmd; + int objcount, niocount; + char *tmp1, *tmp2, *end2; + struct niobuf *nb; + struct obd_ioobj *ioo; + struct ost_req *r = req->rq_req.ost; + + ENTRY; + + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + end2 = tmp2 + req->rq_req.ost->buflen2; + objcount = r->buflen1 / sizeof(*ioo); + niocount = r->buflen2 / sizeof(*nb); + cmd = r->cmd; + + conn.oc_id = req->rq_req.ost->connid; + conn.oc_dev = req->rq_obd->u.ost.ost_tgt; + + for (i = 0; i < objcount; i++) { + ost_unpack_ioo((void *)&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + rc = -EFAULT; + break; + } + for (j = 0; j < ioo->ioo_bufcnt; j++) { + ost_unpack_niobuf((void *)&tmp2, &nb); + } + } + + rc = ost_pack_rep(NULL, 0, NULL, 0, + &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + CERROR("cannot pack reply\n"); + return rc; + } + + /* The unpackers move tmp1 and tmp2, so reset them before using */ + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + req->rq_rep.ost->result = obd_commitrw + (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, + niocount, (struct niobuf *)tmp2); + + return 0; +} + +static int ost_handle(struct obd_device *obddev, + struct ptlrpc_service *svc, + struct ptlrpc_request *req) { int rc; struct ost_obd *ost = &obddev->u.ost; struct ptlreq_hdr *hdr; ENTRY; - CDEBUG(0, "req at %p\n", req); hdr = (struct ptlreq_hdr *)req->rq_reqbuf; if (NTOH__u32(hdr->type) != OST_TYPE_REQ) { @@ -460,7 +500,7 @@ int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req) } rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, - &req->rq_reqhdr, &req->rq_req.ost); + &req->rq_reqhdr, &req->rq_req); if (rc) { CERROR("lustre_ost: Invalid request\n"); EXIT; @@ -501,193 +541,32 @@ int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req) CDEBUG(D_INODE, "brw\n"); rc = ost_brw(ost, req); break; + case OST_BRW_COMPLETE: + CDEBUG(D_INODE, "brw_complete\n"); + rc = ost_brw_complete(ost, req); + break; case OST_PUNCH: CDEBUG(D_INODE, "punch\n"); rc = ost_punch(ost, req); break; default: req->rq_status = -ENOTSUPP; - return ost_error(obddev, req); + return ptlrpc_error(obddev, svc, req); } out: req->rq_status = rc; if (rc) { CERROR("ost: processing error %d\n", rc); - ost_error(obddev, req); + ptlrpc_error(obddev, svc, req); } else { CDEBUG(D_INODE, "sending reply\n"); - ost_reply(obddev, req); - } - - return 0; -} - -/* FIXME: Serious refactoring needed */ -int ost_main(void *arg) -{ - int signal; - struct obd_device *obddev = (struct obd_device *) arg; - struct ost_obd *ost = &obddev->u.ost; - DECLARE_WAITQUEUE(wait, current); - - ENTRY; - - lock_kernel(); - daemonize(); - spin_lock_irq(¤t->sigmask_lock); - sigfillset(¤t->blocked); - recalc_sigpending(current); - spin_unlock_irq(¤t->sigmask_lock); - - sprintf(current->comm, "lustre_ost"); - - /* Record that the thread is running */ - ost->ost_thread = current; - wake_up(&ost->ost_done_waitq); - - /* XXX maintain a list of all managed devices: insert here */ - - /* And now, wait forever for commit wakeup events. */ - while (1) { - int rc; - - if (ost->ost_service != NULL) { - ptl_event_t ev; - struct ptlrpc_request request; - struct ptlrpc_service *service; - - CDEBUG(D_IOCTL, "-- sleeping\n"); - signal = 0; - add_wait_queue(&ost->ost_waitq, &wait); - while (1) { - set_current_state(TASK_INTERRUPTIBLE); - rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev); - if (rc == PTL_OK || rc == PTL_EQ_DROPPED) - break; - if (ost->ost_flags & OST_EXIT) - break; - - - /* if this process really wants to die, - * let it go */ - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) { - signal = 1; - break; - } - - schedule(); - } - remove_wait_queue(&ost->ost_waitq, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_IOCTL, "-- done\n"); - - if (signal == 1) { - /* We broke out because of a signal */ - EXIT; - break; - } - if (ost->ost_flags & OST_EXIT) { - EXIT; - break; - } - - service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; - - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - memset(&request, 0, sizeof(request)); - request.rq_reqbuf = ev.mem_desc.start + ev.offset; - request.rq_reqlen = ev.mem_desc.length; - request.rq_ost = ost; - request.rq_xid = ev.match_bits; - - request.rq_peer.peer_nid = ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = - ost->ost_service->srv_self.peer_ni; - rc = ost_handle(obddev, &request); - - /* Inform the rpc layer the event has been handled */ - ptl_received_rpc(service); - } else { - struct ptlrpc_request *request; - - CDEBUG(D_IOCTL, "-- sleeping\n"); - add_wait_queue(&ost->ost_waitq, &wait); - while (1) { - spin_lock(&ost->ost_lock); - if (!list_empty(&ost->ost_reqs)) - break; - - set_current_state(TASK_INTERRUPTIBLE); - - /* if this process really wants to die, - * let it go */ - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) - break; - - spin_unlock(&ost->ost_lock); - - schedule(); - } - remove_wait_queue(&ost->ost_waitq, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_IOCTL, "-- done\n"); - - if (list_empty(&ost->ost_reqs)) { - CDEBUG(D_INODE, "woke because of signal\n"); - spin_unlock(&ost->ost_lock); - } else { - request = list_entry(ost->ost_reqs.next, - struct ptlrpc_request, - rq_list); - list_del(&request->rq_list); - spin_unlock(&ost->ost_lock); - rc = ost_handle(obddev, request); - } - } + ptlrpc_reply(obddev, svc, req); } - /* XXX maintain a list of all managed devices: cleanup here */ - - ost->ost_thread = NULL; - wake_up(&ost->ost_done_waitq); - CERROR("lustre_ost: exiting\n"); return 0; } -static void ost_stop_srv_thread(struct ost_obd *ost) -{ - ost->ost_flags |= OST_EXIT; - - while (ost->ost_thread) { - wake_up(&ost->ost_waitq); - sleep_on(&ost->ost_done_waitq); - } -} - -static void ost_start_srv_thread(struct obd_device *obd) -{ - struct ost_obd *ost = &obd->u.ost; - ENTRY; - - init_waitqueue_head(&ost->ost_waitq); - init_waitqueue_head(&ost->ost_done_waitq); - kernel_thread(ost_main, (void *)obd, - CLONE_VM | CLONE_FS | CLONE_FILES); - while (!ost->ost_thread) - sleep_on(&ost->ost_done_waitq); - EXIT; -} /* mount the file system (secretly) */ static int ost_setup(struct obd_device *obddev, obd_count len, @@ -697,7 +576,6 @@ static int ost_setup(struct obd_device *obddev, obd_count len, struct obd_ioctl_data* data = buf; struct ost_obd *ost = &obddev->u.ost; struct obd_device *tgt; - struct lustre_peer peer; int err; ENTRY; @@ -717,34 +595,32 @@ static int ost_setup(struct obd_device *obddev, obd_count len, } ost->ost_conn.oc_dev = tgt; - err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn); + err = obd_connect(&ost->ost_conn); if (err) { - CERROR("lustre ost: fail to connect to device %d\n", - data->ioc_dev); + CERROR("fail to connect to device %d\n", data->ioc_dev); return -EINVAL; } - INIT_LIST_HEAD(&ost->ost_reqs); - ost->ost_thread = NULL; - ost->ost_flags = 0; - - spin_lock_init(&obddev->u.ost.ost_lock); - - err = kportal_uuid_to_peer("self", &peer); - if (err == 0) { - OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service)); - if (ost->ost_service == NULL) - return -ENOMEM; - ost->ost_service->srv_buf_size = 64 * 1024; - ost->ost_service->srv_portal = OST_REQUEST_PORTAL; - memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer)); - ost->ost_service->srv_wait_queue = &ost->ost_waitq; - - rpc_register_service(ost->ost_service, "self"); - } - - ost_start_srv_thread(obddev); + ost->ost_service = ptlrpc_init_svc( 64 * 1024, + OST_REQUEST_PORTAL, + OSC_REPLY_PORTAL, + "self", + ost_unpack_req, + ost_pack_rep, + ost_handle); + if (!ost->ost_service) { + obd_disconnect(&ost->ost_conn); + return -EINVAL; + } + + rpc_register_service(ost->ost_service, "self"); + err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); + if (err) { + obd_disconnect(&ost->ost_conn); + return -EINVAL; + } + MOD_INC_USE_COUNT; EXIT; return 0; @@ -753,38 +629,30 @@ static int ost_setup(struct obd_device *obddev, obd_count len, static int ost_cleanup(struct obd_device * obddev) { struct ost_obd *ost = &obddev->u.ost; - struct obd_device *tgt; int err; ENTRY; - if ( !(obddev->obd_flags & OBD_SET_UP) ) { - EXIT; - return 0; - } - if ( !list_empty(&obddev->obd_gen_clients) ) { CERROR("still has clients!\n"); EXIT; return -EBUSY; } - ost_stop_srv_thread(ost); + ptlrpc_stop_thread(ost->ost_service); rpc_unregister_service(ost->ost_service); - OBD_FREE(ost->ost_service, sizeof(*ost->ost_service)); - if (!list_empty(&ost->ost_reqs)) { + if (!list_empty(&ost->ost_service->srv_reqs)) { // XXX reply with errors and clean up - CDEBUG(D_INODE, "Request list not empty!\n"); + CERROR("Request list not empty!\n"); } + OBD_FREE(ost->ost_service, sizeof(*ost->ost_service)); - tgt = ost->ost_tgt; - err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn); + err = obd_disconnect(&ost->ost_conn); if (err) { CERROR("lustre ost: fail to disconnect device\n"); return -EINVAL; } - MOD_DEC_USE_COUNT; EXIT; @@ -812,8 +680,5 @@ MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01"); MODULE_LICENSE("GPL"); -// for testing (maybe this stays) -EXPORT_SYMBOL(ost_queue_req); - module_init(ost_init); module_exit(ost_exit); diff --git a/lustre/ptlrpc/Makefile.am b/lustre/ptlrpc/Makefile.am index 25abfc8..d9ec67e 100644 --- a/lustre/ptlrpc/Makefile.am +++ b/lustre/ptlrpc/Makefile.am @@ -9,6 +9,6 @@ MODULE = ptlrpc modulefs_DATA = ptlrpc.o EXTRA_PROGRAMS = ptlrpc -ptlrpc_SOURCES = rpc.c +ptlrpc_SOURCES = rpc.c events.c service.c client.c niobuf.c include $(top_srcdir)/Rules diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index b758162..8fc5669 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -32,8 +32,7 @@ #include #include -int ptlrpc_enqueue(struct ptlrpc_client *peer, - struct ptlrpc_request *req) +int ptlrpc_enqueue(struct ptlrpc_client *peer, struct ptlrpc_request *req) { struct ptlrpc_request *srv_req; @@ -66,14 +65,20 @@ int ptlrpc_enqueue(struct ptlrpc_client *peer, return 0; } -int ptlrpc_connect_client(int dev, char *uuid, - struct ptlrpc_client *cl) +int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, + req_pack_t req_pack, rep_unpack_t rep_unpack, + struct ptlrpc_client *cl) { int err; memset(cl, 0, sizeof(*cl)); - cl->cli_xid = 0; + spin_lock_init(&cl->cli_lock); + cl->cli_xid = 1; cl->cli_obd = NULL; + cl->cli_request_portal = req_portal; + cl->cli_reply_portal = rep_portal; + cl->cli_rep_unpack = rep_unpack; + cl->cli_req_pack = req_pack; /* non networked client */ if (dev >= 0 && dev < MAX_OBD_DEVICES) { @@ -84,6 +89,10 @@ int ptlrpc_connect_client(int dev, char *uuid, CERROR("target device %d not att or setup\n", dev); return -EINVAL; } + if (strcmp(obd->obd_type->typ_name, "ost") && + strcmp(obd->obd_type->typ_name, "mds")) { + return -EINVAL; + } cl->cli_obd = &obd_dev[dev]; return 0; @@ -91,13 +100,27 @@ int ptlrpc_connect_client(int dev, char *uuid, /* networked */ err = kportal_uuid_to_peer(uuid, &cl->cli_server); - if (err == 0) { - CERROR("cannot find peer!"); + if (err != 0) { + CERROR("cannot find peer %s!", uuid); } return err; } +struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) +{ + struct ptlrpc_bulk_desc *bulk; + + OBD_ALLOC(bulk, sizeof(*bulk)); + if (bulk != NULL) { + memset(bulk, 0, sizeof(*bulk)); + memcpy(&bulk->b_peer, peer, sizeof(*peer)); + init_waitqueue_head(&bulk->b_waitq); + } + + return bulk; +} + struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, int namelen, char *name, int tgtlen, char *tgt) @@ -113,7 +136,10 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, } memset(request, 0, sizeof(*request)); + + spin_lock(&cl->cli_lock); request->rq_xid = cl->cli_xid++; + spin_unlock(&cl->cli_lock); rc = cl->cli_req_pack(name, namelen, tgt, tgtlen, &request->rq_reqhdr, &request->rq_req, @@ -123,7 +149,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, return NULL; } request->rq_reqhdr->opc = opcode; - request->rq_reqhdr->seqno = request->rq_xid; + request->rq_reqhdr->xid = request->rq_xid; EXIT; return request; @@ -134,11 +160,43 @@ void ptlrpc_free_req(struct ptlrpc_request *request) OBD_FREE(request, sizeof(*request)); } +static int ptlrpc_check_reply(struct ptlrpc_request *req) +{ + if (req->rq_repbuf != NULL) { + req->rq_flags = PTL_RPC_REPLY; + EXIT; + return 1; + } + + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) { + req->rq_flags = PTL_RPC_INTR; + EXIT; + return 1; + } + + return 0; +} + +/* Abort this request and cleanup any resources associated with it. */ +int ptlrpc_abort(struct ptlrpc_request *request) +{ + /* First remove the MD for the reply; in theory, this means + * that we can tear down the buffer safely. */ + PtlMEUnlink(request->rq_reply_me_h); + PtlMDUnlink(request->rq_reply_md_h); + OBD_FREE(request->rq_repbuf, request->rq_replen); + request->rq_repbuf = NULL; + request->rq_replen = 0; + + return 0; +} + int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) { int rc; - DECLARE_WAITQUEUE(wait, current); + ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); @@ -153,43 +211,38 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) rc = ptl_send_rpc(req, &cl->cli_server); } if (rc) { - CERROR("error %d, opcode %d\n", rc, - req->rq_reqhdr->opc); + CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); return -rc; } CDEBUG(0, "-- sleeping\n"); - add_wait_queue(&req->rq_wait_for_rep, &wait); - while (req->rq_repbuf == NULL) { - set_current_state(TASK_INTERRUPTIBLE); - - /* if this process really wants to die, let it go */ - if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGINT)) - break; - - schedule(); - } - remove_wait_queue(&req->rq_wait_for_rep, &wait); - set_current_state(TASK_RUNNING); + wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req)); CDEBUG(0, "-- done\n"); + + if (req->rq_flags == PTL_RPC_INTR) { + /* Clean up the dangling reply buffers */ + ptlrpc_abort(req); + EXIT; + return -EINTR; + } - if (req->rq_repbuf == NULL) { - /* We broke out because of a signal */ + if (req->rq_flags != PTL_RPC_REPLY) { + CERROR("Unknown reason for wakeup\n"); EXIT; return -EINTR; } - rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep); + rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, + &req->rq_rephdr, &req->rq_rep); if (rc) { CERROR("unpack_rep failed: %d\n", rc); return rc; } - CERROR("got rep %lld\n", req->rq_rephdr->seqno); + CERROR("got rep %d\n", req->rq_rephdr->xid); + if ( req->rq_rephdr->status == 0 ) - CDEBUG(0, "--> buf %p len %d status %d\n", - req->rq_repbuf, req->rq_replen, - req->rq_rephdr->status); + CDEBUG(0, "--> buf %p len %d status %d\n", req->rq_repbuf, + req->rq_replen, req->rq_rephdr->status); EXIT; return 0; diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index f6299f0..f3cfd1c 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -33,6 +33,7 @@ #include ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, bulk_source_eq, bulk_sink_eq; +static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL; /* * Free the packet when it has gone out @@ -131,7 +132,7 @@ int server_request_callback(ptl_event_t *ev, void *data) static int bulk_source_callback(ptl_event_t *ev, void *data) { - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr; ENTRY; @@ -139,7 +140,8 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) CDEBUG(D_NET, "got SENT event\n"); } else if (ev->type == PTL_EVENT_ACK) { CDEBUG(D_NET, "got ACK event\n"); - wake_up_interruptible(&rpc->rq_wait_for_bulk); + bulk->b_flags = PTL_BULK_SENT; + wake_up_interruptible(&bulk->b_waitq); } else { CERROR("Unexpected event type!\n"); BUG(); @@ -151,14 +153,15 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) static int bulk_sink_callback(ptl_event_t *ev, void *data) { - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr; ENTRY; if (ev->type == PTL_EVENT_PUT) { - if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset) + if (bulk->b_buf != ev->mem_desc.start + ev->offset) CERROR("bulkbuf != mem_desc -- why?\n"); - wake_up_interruptible(&rpc->rq_wait_for_bulk); + bulk->b_flags = PTL_BULK_RCVD; + wake_up_interruptible(&bulk->b_waitq); } else { CERROR("Unexpected event type!\n"); BUG(); @@ -171,15 +174,20 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data) int ptlrpc_init_portals(void) { int rc; - const ptl_handle_ni_t *nip; ptl_handle_ni_t ni; - nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL); - if (nip == NULL) { - CERROR("get_ni failed: is the NAL module loaded?\n"); + socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal"); + qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal"); + if (socknal_nip == NULL && qswnal_nip == NULL) { + CERROR("get_ni failed: is a NAL module loaded?\n"); return -EIO; } - ni = *nip; + + /* Use the qswnal if it's there */ + if (qswnal_nip != NULL) + ni = *qswnal_nip; + else + ni = *socknal_nip; rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq); if (rc != PTL_OK) @@ -207,5 +215,8 @@ void ptlrpc_exit_portals(void) PtlEQFree(bulk_source_eq); PtlEQFree(bulk_sink_eq); - inter_module_put(LUSTRE_NAL "_ni"); + if (qswnal_nip != NULL) + inter_module_put("kqswnal_ni"); + if (socknal_nip != NULL) + inter_module_put("ksocknal_ni"); } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index f019560..33a4900 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -1,4 +1,3 @@ - /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * @@ -34,29 +33,61 @@ #include extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq; +static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY}; + + +int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) +{ + if (bulk->b_flags == PTL_BULK_SENT) { + EXIT; + return 1; + } + + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) { + bulk->b_flags = PTL_RPC_INTR; + EXIT; + return 1; + } + + CERROR("no event yet\n"); + return 0; +} int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, - int portal, int is_request) + int portal) { int rc; ptl_process_id_t remote_id; ptl_handle_md_t md_h; + ptl_ack_req_t ack; - /* FIXME: This is bad. */ - if (request->rq_bulklen) { + switch (request->rq_type) { + case PTL_RPC_BULK: request->rq_req_md.start = request->rq_bulkbuf; request->rq_req_md.length = request->rq_bulklen; request->rq_req_md.eventq = bulk_source_eq; - } else if (is_request) { + request->rq_req_md.threshold = 2; /* SENT and ACK events */ + ack = PTL_ACK_REQ; + break; + case PTL_RPC_REQUEST: request->rq_req_md.start = request->rq_reqbuf; request->rq_req_md.length = request->rq_reqlen; request->rq_req_md.eventq = sent_pkt_eq; - } else { + request->rq_req_md.threshold = 1; + ack = PTL_NOACK_REQ; + break; + case PTL_RPC_REPLY: request->rq_req_md.start = request->rq_repbuf; request->rq_req_md.length = request->rq_replen; request->rq_req_md.eventq = sent_pkt_eq; + request->rq_req_md.threshold = 1; + ack = PTL_NOACK_REQ; + break; + default: + BUG(); + return -1; /* notreached */ } - request->rq_req_md.threshold = 1; request->rq_req_md.options = PTL_MD_OP_PUT; request->rq_req_md.user_ptr = request; @@ -71,13 +102,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, remote_id.nid = peer->peer_nid; remote_id.pid = 0; - if (request->rq_bulklen) { - rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, - request->rq_xid, 0, 0); - } else { - rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, - request->rq_xid, 0, 0); - } + CERROR("Sending %d bytes to portal %d, xid %d\n", + request->rq_req_md.length, portal, request->rq_xid); + + rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0); if (rc != PTL_OK) { BUG(); CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, @@ -88,9 +116,152 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, return rc; } +int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal) +{ + int rc; + ptl_process_id_t remote_id; + ptl_handle_md_t md_h; + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.eventq = bulk_source_eq; + bulk->b_md.threshold = 2; /* SENT and ACK events */ + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + + rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h); + if (rc != 0) { + BUG(); + CERROR("PtlMDBind failed: %d\n", rc); + return rc; + } + + remote_id.addr_kind = PTL_ADDR_NID; + remote_id.nid = bulk->b_peer.peer_nid; + remote_id.pid = 0; + + CERROR("Sending %d bytes to portal %d, xid %d\n", + bulk->b_md.length, portal, bulk->b_xid); + + rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0); + if (rc != PTL_OK) { + BUG(); + CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, + portal, bulk->b_xid, rc); + /* FIXME: tear down md */ + } + + return rc; +} + +int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk) +{ + int rc; + + ENTRY; + + rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id, + bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h); + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + BUG(); + EXIT; + goto cleanup1; + } + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.threshold = 1; + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + bulk->b_md.eventq = bulk_sink_eq; + + rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h); + if (rc != PTL_OK) { + CERROR("PtlMDAttach failed: %d\n", rc); + BUG(); + EXIT; + goto cleanup2; + } + + CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n", + bulk->b_buflen, bulk->b_xid, bulk->b_portal); + + cleanup2: + PtlMEUnlink(bulk->b_me_h); + cleanup1: + PtlMDUnlink(bulk->b_md_h); + + return rc; +} + +int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + struct ptlrpc_request *clnt_req = req->rq_reply_handle; + ENTRY; + + if (req->rq_reply_handle == NULL) { + /* This is a request that came from the network via portals. */ + + /* FIXME: we need to increment the count of handled events */ + req->rq_type = PTL_RPC_REPLY; + req->rq_reqhdr->xid = req->rq_reqhdr->xid; + ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal); + } else { + /* This is a local request that came from another thread. */ + + /* move the reply to the client */ + clnt_req->rq_replen = req->rq_replen; + clnt_req->rq_repbuf = req->rq_repbuf; + req->rq_repbuf = NULL; + req->rq_replen = 0; + + /* free the request buffer */ + OBD_FREE(req->rq_reqbuf, req->rq_reqlen); + req->rq_reqbuf = NULL; + + /* wake up the client */ + wake_up_interruptible(&clnt_req->rq_wait_for_rep); + } + + EXIT; + return 0; +} + +int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + struct ptlrep_hdr *hdr; + + ENTRY; + + OBD_ALLOC(hdr, sizeof(*hdr)); + if (!hdr) { + EXIT; + return -ENOMEM; + } + + memset(hdr, 0, sizeof(*hdr)); + + hdr->xid = req->rq_reqhdr->xid; + hdr->status = req->rq_status; + hdr->type = OST_TYPE_ERR; + + if (req->rq_repbuf) { + CERROR("req has repbuf\n"); + BUG(); + } + + req->rq_repbuf = (char *)hdr; + req->rq_replen = sizeof(*hdr); + + EXIT; + return ptlrpc_reply(obddev, svc, req); +} + int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) { - ptl_handle_me_t me_h, bulk_me_h; ptl_process_id_t local_id; int rc; char *repbuf; @@ -116,8 +287,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) local_id.rid = PTL_ID_ANY; //CERROR("sending req %d\n", request->rq_xid); - rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id, - request->rq_xid, 0, PTL_UNLINK, &me_h); + rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id, + request->rq_xid, 0, PTL_UNLINK, + &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); BUG(); @@ -125,6 +297,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) goto cleanup; } + request->rq_type = PTL_RPC_REQUEST; request->rq_reply_md.start = repbuf; request->rq_reply_md.length = request->rq_replen; request->rq_reply_md.threshold = 1; @@ -132,8 +305,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) request->rq_reply_md.user_ptr = request; request->rq_reply_md.eventq = rcvd_rep_eq; - rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK, - &request->rq_reply_md_h); + rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md, + PTL_UNLINK, &request->rq_reply_md_h); if (rc != PTL_OK) { CERROR("PtlMDAttach failed: %d\n", rc); BUG(); @@ -141,42 +314,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) goto cleanup2; } - if (request->rq_bulklen != 0) { - rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal, - local_id, request->rq_xid, 0, PTL_UNLINK, - &bulk_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup3; - } - - request->rq_bulk_md.start = request->rq_bulkbuf; - request->rq_bulk_md.length = request->rq_bulklen; - request->rq_bulk_md.threshold = 1; - request->rq_bulk_md.options = PTL_MD_OP_PUT; - request->rq_bulk_md.user_ptr = request; - request->rq_bulk_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK, - &request->rq_bulk_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup4; - } - } + CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n", + request->rq_replen, request->rq_xid, request->rq_reply_portal); - return ptl_send_buf(request, peer, request->rq_req_portal, 1); + return ptl_send_buf(request, peer, request->rq_req_portal); - cleanup4: - PtlMEUnlink(bulk_me_h); - cleanup3: - PtlMDUnlink(request->rq_reply_md_h); cleanup2: - PtlMEUnlink(me_h); + PtlMEUnlink(request->rq_reply_me_h); cleanup: OBD_FREE(repbuf, request->rq_replen); diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index d7f46a1..1a6d58d 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -29,618 +29,22 @@ #define DEBUG_SUBSYSTEM S_RPC #include +#include #include -static ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, - bulk_source_eq, bulk_sink_eq; +extern int ptlrpc_init_portals(void); +extern void ptlrpc_exit_portals(void); -struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, - int opcode, int namelen, char *name, - int tgtlen, char *tgt) -{ - struct ptlrpc_request *request; - int rc; - ENTRY; - - OBD_ALLOC(request, sizeof(*request)); - if (!request) { - CERROR("request allocation out of memory\n"); - return NULL; - } - - memset(request, 0, sizeof(*request)); - request->rq_xid = cl->cli_xid++; - - rc = cl->cli_req_pack(name, namelen, tgt, tgtlen, - &request->rq_reqhdr, &request->rq_req, - &request->rq_reqlen, &request->rq_reqbuf); - if (rc) { - CERROR("cannot pack request %d\n", rc); - return NULL; - } - request->rq_reqhdr->opc = opcode; - request->rq_reqhdr->seqno = request->rq_xid; - - EXIT; - return request; -} - -void ptlrpc_free_req(struct ptlrpc_request *request) -{ - OBD_FREE(request, sizeof(*request)); -} - -/* Abort this request and cleanup any resources associated with it. */ -int ptl_abort_rpc(struct ptlrpc_request *request) -{ - /* First remove the MD for the reply; in theory, this means - * that we can tear down the buffer safely. */ - PtlMEUnlink(request->rq_reply_me_h); - PtlMDUnlink(request->rq_reply_md_h); - - if (request->rq_bulklen != 0) { - PtlMEUnlink(request->rq_bulk_me_h); - PtlMDUnlink(request->rq_bulk_md_h); - } - - return 0; -} - -int ptlrpc_queue_wait(struct ptlrpc_request *req, struct ptlrpc_client *cl) -{ - int rc; - DECLARE_WAITQUEUE(wait, current); - - init_waitqueue_head(&req->rq_wait_for_rep); - - if (cl->cli_enqueue) { - /* Local delivery */ - ENTRY; - rc = cl->cli_enqueue(req); - } else { - /* Remote delivery via portals. */ - req->rq_req_portal = cl->cli_request_portal; - req->rq_reply_portal = cl->cli_reply_portal; - rc = ptl_send_rpc(req, &cl->cli_server); - } - if (rc) { - CERROR("error %d, opcode %d\n", rc, - req->rq_reqhdr->opc); - return -rc; - } - - CDEBUG(0, "-- sleeping\n"); - add_wait_queue(&req->rq_wait_for_rep, &wait); - while (req->rq_repbuf == NULL) { - set_current_state(TASK_INTERRUPTIBLE); - - /* if this process really wants to die, let it go */ - if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGINT)) - break; - - schedule(); - } - remove_wait_queue(&req->rq_wait_for_rep, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(0, "-- done\n"); - - if (req->rq_repbuf == NULL) { - /* We broke out because of a signal. Clean up the dangling - * reply buffers! */ - ptl_abort_rpc(req); - EXIT; - return -EINTR; - } - - rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, - &req->rq_rep); - if (rc) { - CERROR("unpack_rep failed: %d\n", rc); - return rc; - } - CERROR("got rep %lld\n", req->rq_rephdr->seqno); - if ( req->rq_rephdr->status == 0 ) - CDEBUG(0, "--> buf %p len %d status %d\n", - req->rq_repbuf, req->rq_replen, - req->rq_rephdr->status); - - EXIT; - return 0; -} -/* - * Free the packet when it has gone out - */ -static int sent_packet_callback(ptl_event_t *ev, void *data) -{ - ENTRY; - - if (ev->type == PTL_EVENT_SENT) { - OBD_FREE(ev->mem_desc.start, ev->mem_desc.length); - } else { - // XXX make sure we understand all events, including ACK's - CERROR("Unknown event %d\n", ev->type); - BUG(); - } - - EXIT; - return 1; -} - -/* - * Wake up the thread waiting for the reply once it comes in. - */ -static int rcvd_reply_callback(ptl_event_t *ev, void *data) -{ - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; - ENTRY; - - if (ev->type == PTL_EVENT_PUT) { - rpc->rq_repbuf = ev->mem_desc.start + ev->offset; - barrier(); - wake_up_interruptible(&rpc->rq_wait_for_rep); - } else { - // XXX make sure we understand all events, including ACK's - CERROR("Unknown event %d\n", ev->type); - BUG(); - } - - EXIT; - return 1; -} - -static int server_request_callback(ptl_event_t *ev, void *data) -{ - struct ptlrpc_service *service = data; - int rc; - - if (ev->rlength != ev->mlength) - CERROR("Warning: Possibly truncated rpc (%d/%d)\n", - ev->mlength, ev->rlength); - - /* The ME is unlinked when there is less than 1024 bytes free - * on its MD. This ensures we are always able to handle the rpc, - * although the 1024 value is a guess as to the size of a - * large rpc (the known safe margin should be determined). - * - * NOTE: The portals API by default unlinks all MD's associated - * with an ME when it's unlinked. For now, this behavior - * has been commented out of the portals library so the - * MD can be unlinked when its ref count drops to zero. - * A new MD and ME will then be created that use the same - * kmalloc()'ed memory and inserted at the ring tail. - */ - - service->srv_ref_count[service->srv_md_active]++; - - if (ev->offset >= (service->srv_buf_size - 1024)) { - CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_me_active); - - rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]); - service->srv_me_h[service->srv_me_active] = 0; - - if (rc != PTL_OK) { - CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc); - return rc; - } - - service->srv_me_active = NEXT_INDEX(service->srv_me_active, - service->srv_ring_length); - - if (service->srv_me_h[service->srv_me_active] == 0) - CERROR("All %d ring ME's are unlinked!\n", - service->srv_ring_length); - } - - if (ev->type == PTL_EVENT_PUT) { - wake_up(service->srv_wait_queue); - } else { - CERROR("Unexpected event type: %d\n", ev->type); - } - - return 0; -} - -static int bulk_source_callback(ptl_event_t *ev, void *data) -{ - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; - - ENTRY; - - if (ev->type == PTL_EVENT_SENT) { - CDEBUG(D_NET, "got SENT event\n"); - } else if (ev->type == PTL_EVENT_ACK) { - CDEBUG(D_NET, "got ACK event\n"); - rpc->rq_bulkbuf = NULL; - wake_up_interruptible(&rpc->rq_wait_for_bulk); - } else { - CERROR("Unexpected event type!\n"); - BUG(); - } - - EXIT; - return 1; -} - -static int bulk_sink_callback(ptl_event_t *ev, void *data) -{ - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; - - ENTRY; - - if (ev->type == PTL_EVENT_PUT) { - if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset) - CERROR("bulkbuf != mem_desc -- why?\n"); - //wake_up_interruptible(&rpc->rq_wait_for_bulk); - } else { - CERROR("Unexpected event type!\n"); - BUG(); - } - - EXIT; - return 1; -} - -int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, - int portal) -{ - int rc; - ptl_process_id_t remote_id; - ptl_handle_md_t md_h; - ptl_ack_req_t ack; - - switch (request->rq_type) { - case PTLRPC_BULK: - request->rq_req_md.start = request->rq_bulkbuf; - request->rq_req_md.length = request->rq_bulklen; - request->rq_req_md.eventq = bulk_source_eq; - request->rq_req_md.threshold = 2; /* SENT and ACK events */ - ack = PTL_ACK_REQ; - break; - case PTLRPC_REQUEST: - request->rq_req_md.start = request->rq_reqbuf; - request->rq_req_md.length = request->rq_reqlen; - request->rq_req_md.eventq = sent_pkt_eq; - request->rq_req_md.threshold = 1; - ack = PTL_NOACK_REQ; - break; - case PTLRPC_REPLY: - request->rq_req_md.start = request->rq_repbuf; - request->rq_req_md.length = request->rq_replen; - request->rq_req_md.eventq = sent_pkt_eq; - request->rq_req_md.threshold = 1; - ack = PTL_NOACK_REQ; - break; - default: - BUG(); - } - request->rq_req_md.options = PTL_MD_OP_PUT; - request->rq_req_md.user_ptr = request; - - rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h); - if (rc != 0) { - BUG(); - CERROR("PtlMDBind failed: %d\n", rc); - return rc; - } - - remote_id.addr_kind = PTL_ADDR_NID; - remote_id.nid = peer->peer_nid; - remote_id.pid = 0; - - CERROR("Sending %d bytes to portal %d, xid %d\n", - request->rq_req_md.length, portal, request->rq_xid); - - rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0); - if (rc != PTL_OK) { - BUG(); - CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, - portal, request->rq_xid, rc); - /* FIXME: tear down md */ - } - - return rc; -} - -int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) -{ - ptl_process_id_t local_id; - int rc; - char *repbuf; - - ENTRY; - - if (request->rq_replen == 0) { - CERROR("request->rq_replen is 0!\n"); - EXIT; - return -EINVAL; - } - - /* request->rq_repbuf is set only when the reply comes in, in - * client_packet_callback() */ - OBD_ALLOC(repbuf, request->rq_replen); - if (!repbuf) { - EXIT; - return -ENOMEM; - } - - local_id.addr_kind = PTL_ADDR_GID; - local_id.gid = PTL_ID_ANY; - local_id.rid = PTL_ID_ANY; - - CERROR("sending req %d\n", request->rq_xid); - rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id, - request->rq_xid, 0, PTL_UNLINK, - &request->rq_reply_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup; - } - - request->rq_type = PTLRPC_REQUEST; - request->rq_reply_md.start = repbuf; - request->rq_reply_md.length = request->rq_replen; - request->rq_reply_md.threshold = 1; - request->rq_reply_md.options = PTL_MD_OP_PUT; - request->rq_reply_md.user_ptr = request; - request->rq_reply_md.eventq = rcvd_rep_eq; - - rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md, - PTL_UNLINK, &request->rq_reply_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup2; - } - - if (request->rq_bulklen != 0) { - rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal, - local_id, request->rq_xid, 0, PTL_UNLINK, - &request->rq_bulk_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup3; - } - - request->rq_bulk_md.start = request->rq_bulkbuf; - request->rq_bulk_md.length = request->rq_bulklen; - request->rq_bulk_md.threshold = 1; - request->rq_bulk_md.options = PTL_MD_OP_PUT; - request->rq_bulk_md.user_ptr = request; - request->rq_bulk_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(request->rq_bulk_me_h, - request->rq_bulk_md, PTL_UNLINK, - &request->rq_bulk_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup4; - } - } - - return ptl_send_buf(request, peer, request->rq_req_portal); - - cleanup4: - PtlMEUnlink(request->rq_bulk_me_h); - cleanup3: - PtlMDUnlink(request->rq_reply_md_h); - cleanup2: - PtlMEUnlink(request->rq_reply_me_h); - cleanup: - OBD_FREE(repbuf, request->rq_replen); - - return rc; -} - -/* ptl_received_rpc() should be called by the sleeping process once - * it finishes processing an event. This ensures the ref count is - * decremented and that the rpc ring buffer cycles properly. - */ -int ptl_received_rpc(struct ptlrpc_service *service) { - int rc, index; - - index = service->srv_md_active; - CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index, - service->srv_ref_count[index]); - service->srv_ref_count[index]--; - - if ((service->srv_ref_count[index] <= 0) && - (service->srv_me_h[index] == 0)) { - - /* Replace the unlinked ME and MD */ - rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail], - service->srv_id, 0, ~0, PTL_RETAIN, - PTL_INS_AFTER, &(service->srv_me_h[index])); - CDEBUG(D_INFO, "Inserting new ME and MD in ring, rc %d\n", rc); - service->srv_me_tail = index; - service->srv_ref_count[index] = 0; - - if (rc != PTL_OK) { - CERROR("PtlMEInsert failed: %d\n", rc); - return rc; - } - - service->srv_md[index].start = service->srv_buf[index]; - service->srv_md[index].length = service->srv_buf_size; - service->srv_md[index].threshold = PTL_MD_THRESH_INF; - service->srv_md[index].options = PTL_MD_OP_PUT; - service->srv_md[index].user_ptr = service; - service->srv_md[index].eventq = service->srv_eq_h; - - rc = PtlMDAttach(service->srv_me_h[index], - service->srv_md[index], - PTL_RETAIN, &(service->srv_md_h[index])); - - CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc); - if (rc != PTL_OK) { - /* XXX cleanup */ - BUG(); - CERROR("PtlMDAttach failed: %d\n", rc); - return rc; - } - - service->srv_md_active = - NEXT_INDEX(index, service->srv_ring_length); - } - - return 0; -} - -int rpc_register_service(struct ptlrpc_service *service, char *uuid) -{ - struct lustre_peer peer; - int rc, i; - - rc = kportal_uuid_to_peer(uuid, &peer); - if (rc != 0) { - CERROR("Invalid uuid \"%s\"\n", uuid); - return -EINVAL; - } - - service->srv_ring_length = RPC_RING_LENGTH; - service->srv_me_active = 0; - service->srv_md_active = 0; - - service->srv_id.addr_kind = PTL_ADDR_GID; - service->srv_id.gid = PTL_ID_ANY; - service->srv_id.rid = PTL_ID_ANY; - - rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback, - service, &(service->srv_eq_h)); - - if (rc != PTL_OK) { - CERROR("PtlEQAlloc failed: %d\n", rc); - return rc; - } - - /* Attach the leading ME on which we build the ring */ - rc = PtlMEAttach(peer.peer_ni, service->srv_portal, - service->srv_id, 0, ~0, PTL_RETAIN, - &(service->srv_me_h[0])); - - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - return rc; - } - - for (i = 0; i < service->srv_ring_length; i++) { - OBD_ALLOC(service->srv_buf[i], service->srv_buf_size); - - if (service->srv_buf[i] == NULL) { - CERROR("no memory\n"); - return -ENOMEM; - } - - /* Insert additional ME's to the ring */ - if (i > 0) { - rc = PtlMEInsert(service->srv_me_h[i-1], - service->srv_id, 0, ~0, PTL_RETAIN, - PTL_INS_AFTER,&(service->srv_me_h[i])); - service->srv_me_tail = i; - - if (rc != PTL_OK) { - CERROR("PtlMEInsert failed: %d\n", rc); - return rc; - } - } - - service->srv_ref_count[i] = 0; - service->srv_md[i].start = service->srv_buf[i]; - service->srv_md[i].length = service->srv_buf_size; - service->srv_md[i].threshold = PTL_MD_THRESH_INF; - service->srv_md[i].options = PTL_MD_OP_PUT; - service->srv_md[i].user_ptr = service; - service->srv_md[i].eventq = service->srv_eq_h; - - rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i], - PTL_RETAIN, &(service->srv_md_h[i])); - - if (rc != PTL_OK) { - /* cleanup */ - CERROR("PtlMDAttach failed: %d\n", rc); - return rc; - } - } - - return 0; -} - -int rpc_unregister_service(struct ptlrpc_service *service) -{ - int rc, i; - - for (i = 0; i < service->srv_ring_length; i++) { - rc = PtlMDUnlink(service->srv_md_h[i]); - if (rc) - CERROR("PtlMDUnlink failed: %d\n", rc); - - rc = PtlMEUnlink(service->srv_me_h[i]); - if (rc) - CERROR("PtlMEUnlink failed: %d\n", rc); - - OBD_FREE(service->srv_buf[i], service->srv_buf_size); - } - - rc = PtlEQFree(service->srv_eq_h); - if (rc) - CERROR("PtlEQFree failed: %d\n", rc); - - return 0; -} - -static int req_init_portals(void) -{ - int rc; - const ptl_handle_ni_t *nip; - ptl_handle_ni_t ni; - - nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL); - if (nip == NULL) { - CERROR("get_ni failed: is the NAL module loaded?\n"); - return -EIO; - } - ni = *nip; - - rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq); - if (rc != PTL_OK) - CERROR("PtlEQAlloc failed: %d\n", rc); - - rc = PtlEQAlloc(ni, 128, rcvd_reply_callback, NULL, &rcvd_rep_eq); - if (rc != PTL_OK) - CERROR("PtlEQAlloc failed: %d\n", rc); - - rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq); - if (rc != PTL_OK) - CERROR("PtlEQAlloc failed: %d\n", rc); - - rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq); - if (rc != PTL_OK) - CERROR("PtlEQAlloc failed: %d\n", rc); - - return rc; -} static int __init ptlrpc_init(void) { - return req_init_portals(); + return ptlrpc_init_portals(); } static void __exit ptlrpc_exit(void) { - PtlEQFree(sent_pkt_eq); - PtlEQFree(rcvd_rep_eq); - PtlEQFree(bulk_source_eq); - PtlEQFree(bulk_sink_eq); - - inter_module_put(LUSTRE_NAL "_ni"); + ptlrpc_exit_portals(); return; } diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 89830f1..f560a41 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -37,10 +37,8 @@ extern int server_request_callback(ptl_event_t *ev, void *data); static int ptlrpc_check_event(struct ptlrpc_service *svc) { - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) { + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) { svc->srv_flags |= SVC_KILLED; EXIT; return 1; @@ -80,13 +78,9 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc) return 0; } -struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize, - int portal, - char *uuid, - req_unpack_t unpack, - rep_pack_t pack, - svc_handler_t handler - ) +struct ptlrpc_service * +ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, + req_unpack_t unpack, rep_pack_t pack, svc_handler_t handler) { int err; struct ptlrpc_service *svc; @@ -108,7 +102,8 @@ struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize, svc->srv_flags = 0; svc->srv_buf_size = bufsize; - svc->srv_portal = portal; + svc->srv_rep_portal = rep_portal; + svc->srv_req_portal = req_portal; svc->srv_req_unpack = unpack; svc->srv_rep_pack = pack; svc->srv_handler = handler; @@ -169,6 +164,7 @@ static int ptlrpc_main(void *arg) * we should put the request on the stack of * mds_handle instead. */ memset(&request, 0, sizeof(request)); + request.rq_obd = obddev; request.rq_reqbuf = svc->srv_ev.mem_desc.start + svc->srv_ev.offset; request.rq_reqlen = svc->srv_ev.mem_desc.length; request.rq_xid = svc->srv_ev.match_bits; @@ -203,7 +199,7 @@ static int ptlrpc_main(void *arg) svc->srv_thread = NULL; svc->srv_flags = SVC_STOPPED; wake_up(&svc->srv_ctl_waitq); - CERROR("svc %s: exiting\n", data->name); + CERROR("svc exiting process %d\n", current->pid); return 0; } @@ -212,7 +208,8 @@ void ptlrpc_stop_thread(struct ptlrpc_service *svc) svc->srv_flags = SVC_STOPPING; wake_up(&svc->srv_waitq); - wait_event_interruptible(svc->srv_ctl_waitq, (svc->srv_flags & SVC_STOPPED)); + wait_event_interruptible(svc->srv_ctl_waitq, + (svc->srv_flags & SVC_STOPPED)); } int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, @@ -270,7 +267,7 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid) } /* Attach the leading ME on which we build the ring */ - rc = PtlMEAttach(peer.peer_ni, service->srv_portal, + rc = PtlMEAttach(peer.peer_ni, service->srv_req_portal, service->srv_id, 0, ~0, PTL_RETAIN, &(service->srv_me_h[0])); @@ -333,8 +330,10 @@ int rpc_unregister_service(struct ptlrpc_service *service) rc = PtlMEUnlink(service->srv_me_h[i]); if (rc) CERROR("PtlMEUnlink failed: %d\n", rc); - - OBD_FREE(service->srv_buf[i], service->srv_buf_size); + + if (service->srv_buf[i] != NULL) + OBD_FREE(service->srv_buf[i], service->srv_buf_size); + service->srv_buf[i] = NULL; } rc = PtlEQFree(service->srv_eq_h); diff --git a/lustre/tests/llmount.sh b/lustre/tests/llmount.sh index bc888bb..b16d19d 100755 --- a/lustre/tests/llmount.sh +++ b/lustre/tests/llmount.sh @@ -57,9 +57,9 @@ attach ost setup 1 device 3 attach osc -setup +setup -1 quit EOF mkdir /mnt/obd -mount -t lustre_light -o device=3 none /mnt/obd +# mount -t lustre_light -o device=3 none /mnt/obd diff --git a/lustre/tests/ostreq.sh b/lustre/tests/ostreq.sh index 685488f..ab26b7c 100644 --- a/lustre/tests/ostreq.sh +++ b/lustre/tests/ostreq.sh @@ -12,20 +12,21 @@ insmod $R/usr/src/portals/linux/socknal/ksocknal.o || exit -1 $R/usr/src/portals/linux/utils/acceptor 1234 & +insmod $R/usr/src/obd/class/obdclass.o || exit -1 +insmod $R/usr/src/obd/rpc/ptlrpc.o || exit -1 +insmod $R/usr/src/obd/ext2obd/obdext2.o || exit -1 +insmod $R/usr/src/obd/ost/ost.o || exit -1 +insmod $R/usr/src/obd/osc/osc.o || exit -1 + $R/usr/src/portals/linux/utils/ptlctl < /proc/sys/obd/trace mknod /dev/obd c 10 241 +$R/usr/src/obd/utils/obdctl modules > $R/tmp/ogdb + $R/usr/src/obd/utils/obdctl <