From c0ab3403465d75c80c5486347741172dca9e0f2f Mon Sep 17 00:00:00 2001 From: braam Date: Wed, 31 Jul 2002 20:49:39 +0000 Subject: [PATCH] - move the peter branch changes to the head --- lustre/archdep.m4 | 2 +- lustre/configure.in | 3 +- lustre/include/linux/lustre_dlm.h | 32 ++- lustre/include/linux/lustre_idl.h | 38 ++-- lustre/include/linux/lustre_lib.h | 7 +- lustre/include/linux/lustre_lite.h | 4 +- lustre/include/linux/lustre_mds.h | 4 +- lustre/include/linux/lustre_net.h | 15 +- lustre/include/linux/obd.h | 31 +-- lustre/include/linux/obd_class.h | 12 +- lustre/ldlm/Makefile.am | 6 +- lustre/ldlm/ldlm_lock.c | 210 ++++++------------- lustre/ldlm/ldlm_lockd.c | 123 ++++++------ lustre/ldlm/ldlm_request.c | 229 ++++++++++++++------- lustre/ldlm/ldlm_test.c | 16 +- lustre/lib/l_net.c | 195 ++++++++++++++++++ lustre/lib/lov_pack.c | 8 +- lustre/llite/file.c | 4 +- lustre/llite/namei.c | 4 +- lustre/llite/recover.c | 14 +- lustre/llite/rw.c | 9 +- lustre/llite/super.c | 56 +++--- lustre/lov/lov_obd.c | 284 +++++++++++++++++--------- lustre/mdc/Makefile.am | 5 +- lustre/mdc/mdc_reint.c | 34 +--- lustre/mdc/mdc_request.c | 402 +++---------------------------------- lustre/mds/handler.c | 275 ++++++++++++++++++------- lustre/mds/mds_extN.c | 4 +- lustre/mds/mds_reint.c | 49 ++--- lustre/obdclass/class_obd.c | 4 +- lustre/obdecho/echo.c | 70 ++++++- lustre/obdfilter/filter.c | 18 +- lustre/osc/Makefile.am | 5 +- lustre/osc/osc_request.c | 314 ++++------------------------- lustre/ost/ost_handler.c | 65 ++++-- lustre/ptlrpc/client.c | 27 ++- lustre/ptlrpc/events.c | 71 +++++-- lustre/ptlrpc/niobuf.c | 202 ++++++++++++------- lustre/ptlrpc/service.c | 3 + lustre/tests/Makefile.am | 2 +- lustre/tests/common.sh | 6 +- lustre/tests/intent-test.sh | 2 +- lustre/tests/llsetup.sh | 2 +- lustre/tests/lov.xml | 2 +- lustre/utils/device.c | 15 +- lustre/utils/lconf | 49 +++-- lustre/utils/lustre.dtd | 1 + lustre/utils/obdctl.c | 178 +++++++++++++--- lustre/utils/parser.c | 1 - 49 files changed, 1657 insertions(+), 1455 deletions(-) diff --git a/lustre/archdep.m4 b/lustre/archdep.m4 index eae4bf0..f298ede 100644 --- a/lustre/archdep.m4 +++ b/lustre/archdep.m4 @@ -15,7 +15,7 @@ AC_MSG_CHECKING(setting make flags system architecture: ) case ${host_cpu} in um ) AC_MSG_RESULT($host_cpu) - KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common ' + KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common ' KCPPFLAGS='-D__KERNEL__ -U__i386__ -Ui386 -DUM_FASTCALL -D__arch_um__ -DSUBARCH="i386" -DNESTING=0 -D_LARGEFILE64_SOURCE -Derrno=kernel_errno -DPATCHLEVEL=4 -DMODULE -I$(LINUX)/arch/um/include ' MOD_LINK=elf_i386 ;; diff --git a/lustre/configure.in b/lustre/configure.in index 453465a..eea6d94 100644 --- a/lustre/configure.in +++ b/lustre/configure.in @@ -100,7 +100,8 @@ AC_SUBST(docdir) demodir='$(docdir)/demo' AC_SUBST(demodir) -AM_CONFIG_HEADER(include/config.h) +# not needed until the AC_CHECK_LIB(readline) above works +# AM_CONFIG_HEADER(include/config.h) AC_OUTPUT(Makefile lib/Makefile ldlm/Makefile \ obdecho/Makefile ptlrpc/Makefile \ diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index f4d7e3b..cf1e79b 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -33,6 +33,7 @@ typedef enum { #define LDLM_FL_CBPENDING (1 << 4) #define LDLM_FL_AST_SENT (1 << 5) #define LDLM_FL_DESTROYED (1 << 6) +#define LDLM_FL_WAIT_NOREPROC (1 << 7) #define L2B(c) (1 << c) @@ -104,6 +105,8 @@ typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); +typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); + struct ldlm_lock { __u64 l_random; int l_refc; @@ -116,11 +119,12 @@ struct ldlm_lock { ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; - ldlm_lock_callback l_completion_ast; + ldlm_completion_callback l_completion_ast; ldlm_lock_callback l_blocking_ast; struct ptlrpc_connection *l_connection; struct ptlrpc_client *l_client; + struct lustre_handle *l_connh; __u32 l_flags; struct lustre_handle l_remote_handle; void *l_data; @@ -178,6 +182,7 @@ struct ldlm_ast_work { int w_blocking; struct ldlm_lock_desc w_desc; struct list_head w_list; + int w_flags; void *w_data; int w_datalen; }; @@ -192,6 +197,7 @@ extern struct obd_ops ldlm_obd_ops; extern char *ldlm_lockname[]; extern char *ldlm_typename[]; +extern char *ldlm_it2str(int it); #define LDLM_DEBUG(lock, format, a...) \ do { \ @@ -225,7 +231,15 @@ do { \ int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *); int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *); +/* ldlm_lockd.c */ +int ldlm_handle_enqueue(struct ptlrpc_request *req); +int ldlm_handle_convert(struct ptlrpc_request *req); +int ldlm_handle_cancel(struct ptlrpc_request *req); + /* ldlm_lock.c */ +void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie, + ldlm_mode_t mode, void *data)); +void ldlm_unregister_intent(void); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); @@ -260,7 +274,7 @@ ldlm_lock_create(struct ldlm_namespace *ns, __u32 data_len); ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie, int cookie_len, int *flags, - ldlm_lock_callback completion, + ldlm_completion_callback completion, ldlm_lock_callback blocking); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int *flags); @@ -290,9 +304,8 @@ void ldlm_resource_dump(struct ldlm_resource *res); int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]); /* ldlm_request.c */ -int ldlm_cli_enqueue(struct ptlrpc_client *cl, - struct ptlrpc_connection *peer, - struct lustre_handle *connh, +int ldlm_completion_ast(struct ldlm_lock *lock, int flags); +int ldlm_cli_enqueue(struct lustre_handle *conn, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -301,13 +314,12 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, + ldlm_completion_callback completion, ldlm_lock_callback callback, void *data, __u32 data_len, struct lustre_handle *lockh); -int ldlm_match_or_enqueue(struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, - struct lustre_handle *connh, +int ldlm_match_or_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -316,14 +328,14 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, + ldlm_completion_callback completion, ldlm_lock_callback callback, void *data, __u32 data_len, struct lustre_handle *lockh); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); -int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, - struct lustre_handle *connh, int new_mode, int *flags); +int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags); int ldlm_cli_cancel(struct lustre_handle *lockh); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 1814325..5fa091e 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -196,12 +196,15 @@ struct lov_object_id { /* per-child structure */ __u64 l_object_id; }; +#define LOV_MAGIC 0x0BD00BD0 + struct lov_stripe_md { - __u64 lmd_magic; - __u64 lmd_object_id; /* lov object id */ - __u64 lmd_stripe_count; - __u32 lmd_size; - __u32 lmd_stripe_size; + __u32 lmd_magic; + __u32 lmd_easize; /* packed size of extended */ + __u64 lmd_object_id; /* lov object id */ + __u64 lmd_stripe_offset; /* offset of the stripe */ + __u64 lmd_stripe_size; /* size of the stripe */ + __u32 lmd_stripe_count; /* how many objects are being striped */ __u32 lmd_stripe_pattern; /* per-lov object stripe pattern */ struct lov_object_id lmd_objects[0]; }; @@ -400,25 +403,26 @@ struct mds_rec_rename { * LOV data structures */ -#define LOV_RAID0 0 +#define LOV_RAID0 0 +#define LOV_RAIDRR 1 + struct lov_desc { - __u32 ld_tgt_count; /* how many OBD's */ - __u32 ld_default_stripecount; - __u32 ld_default_stripesize; /* in bytes */ - __u32 ld_pattern; /* RAID 0,1 etc */ + __u32 ld_tgt_count; /* how many OBD's */ + __u32 ld_default_stripe_count; /* how many objects are used */ + __u64 ld_default_stripe_size; /* in bytes */ + __u64 ld_default_stripe_offset; /* in bytes */ + __u32 ld_pattern; /* RAID 0,1 etc */ uuid_t ld_uuid; }; /* * LDLM requests: */ - -/* opcodes */ -#define LDLM_REPLY 0 -#define LDLM_ENQUEUE 1 -#define LDLM_CONVERT 2 -#define LDLM_CANCEL 3 -#define LDLM_CALLBACK 4 +/* opcodes -- MUST be distinct from OST/MDS opcodes */ +#define LDLM_ENQUEUE 101 +#define LDLM_CONVERT 102 +#define LDLM_CANCEL 103 +#define LDLM_CALLBACK 104 #define RES_NAME_SIZE 3 #define RES_VERSION_SIZE 4 diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 3bb73a0..c58c1e9 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -30,15 +30,20 @@ #else # include #endif - #include #include #ifdef __KERNEL__ /* l_net.c */ struct ptlrpc_request; +struct obd_device; int target_handle_connect(struct ptlrpc_request *req); int target_handle_disconnect(struct ptlrpc_request *req); +int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd); +int client_obd_disconnect(struct lustre_handle *conn); +int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf); +int client_obd_cleanup(struct obd_device * obddev); +struct client_obd *client_conn2cli(struct lustre_handle *conn); /* l_lock.c */ struct lustre_lock { diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 41c9093..d74f319 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -82,12 +82,12 @@ static inline struct lustre_handle *ll_s2obdconn(struct super_block *sb) return &(ll_s2sbi(sb))->ll_osc_conn; } -static inline struct mdc_obd *sbi2mdc(struct ll_sb_info *sbi) +static inline struct client_obd *sbi2mdc(struct ll_sb_info *sbi) { struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn); if (obd == NULL) LBUG(); - return &obd->u.mdc; + return &obd->u.cli; } static inline struct ll_sb_info *ll_i2sbi(struct inode *inode) diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 3ede83d..a834dd3 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -126,12 +126,12 @@ void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode); void mds_pack_inode2body(struct mds_body *body, struct inode *inode); /* mds/handler.c */ -struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, +struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir, struct vfsmount **mnt, char *name, int namelen, int lock_mode, struct lustre_handle *lockh, int dir_lock_mode); -struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, +struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh); struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index db810a1..fffe5a4 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -144,10 +144,6 @@ struct ptlrpc_bulk_page { __u32 b_flags; struct dentry *b_dentry; int (*b_cb)(struct ptlrpc_bulk_page *); - - ptl_md_t b_md; - ptl_handle_md_t b_md_h; - ptl_handle_me_t b_me_h; }; struct ptlrpc_bulk_desc { @@ -162,10 +158,15 @@ struct ptlrpc_bulk_desc { wait_queue_head_t b_waitq; struct list_head b_page_list; __u32 b_page_count; - atomic_t b_pages_remaining; atomic_t b_refcount; void *b_desc_private; struct tq_struct b_queue; + + ptl_md_t b_md; + ptl_handle_md_t b_md_h; + ptl_handle_me_t b_me_h; + + struct iovec b_iov[16]; /* self-sized pre-allocated iov */ }; struct ptlrpc_thread { @@ -209,9 +210,7 @@ static inline void ptlrpc_hdl2req(struct ptlrpc_request *req, struct lustre_hand req->rq_reqmsg->addr = h->addr; req->rq_reqmsg->cookie = h->cookie; } -struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, - struct lustre_handle *handle, +struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn, int opcode, int count, int *lengths, char **bufs); diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index ac4a75e..9eebcd3 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -58,27 +58,28 @@ struct filter_obd { struct mds_client_info; struct mds_server_data; -struct mdc_obd { - struct ptlrpc_client *mdc_client; - struct ptlrpc_client *mdc_ldlm_client; - struct ptlrpc_connection *mdc_conn; - int mdc_max_mdsize; - __u8 mdc_target_uuid[37]; +struct client_obd { + struct ptlrpc_client *cl_client; + struct ptlrpc_client *cl_ldlm_client; + struct ptlrpc_connection *cl_conn; + struct lustre_handle cl_exporth; + struct semaphore cl_sem; + int cl_conn_count; + __u8 cl_target_uuid[37]; + int cl_max_mdsize; }; +#if 0 struct osc_obd { struct ptlrpc_client *osc_client; struct ptlrpc_client *osc_ldlm_client; struct ptlrpc_connection *osc_conn; __u8 osc_target_uuid[37]; }; +#endif struct mds_obd { - struct ldlm_namespace *mds_local_namespace; struct ptlrpc_service *mds_service; - struct ptlrpc_client *mds_ldlm_client; /* to be an LDLM client */ - struct ptlrpc_connection *mds_ldlm_conn; /* to be an LDLM client */ - struct lustre_handle mds_connh; /* to be one's on DLM client */ char *mds_fstype; struct super_block *mds_sb; @@ -195,9 +196,9 @@ struct obd_device { struct ext2_obd ext2; struct filter_obd filter; struct mds_obd mds; - struct mdc_obd mdc; + struct client_obd cli; struct ost_obd ost; - struct osc_obd osc; + // struct osc_obd osc; struct ldlm_obd ldlm; struct echo_obd echo; struct recovd_obd recovd; @@ -233,8 +234,10 @@ struct obd_ops { struct lov_stripe_md **ea); int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *ea); - int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa); - int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa); + int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *ea); + int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *ea); int (*o_open)(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *); int (*o_close)(struct lustre_handle *conn, struct obdo *oa, diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 906e42c..927385c 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -238,14 +238,16 @@ static inline int obd_destroy(struct lustre_handle *conn, struct obdo *obdo, str RETURN(rc); } -static inline int obd_getattr(struct lustre_handle *conn, struct obdo *obdo) +static inline int obd_getattr(struct lustre_handle *conn, + struct obdo *obdo, + struct lov_stripe_md *ea) { int rc; struct obd_export *export; OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,getattr); - rc = OBP(export->exp_obd, getattr)(conn, obdo); + rc = OBP(export->exp_obd, getattr)(conn, obdo, ea); RETURN(rc); } @@ -271,14 +273,16 @@ static inline int obd_open(struct lustre_handle *conn, struct obdo *obdo, RETURN(rc); } -static inline int obd_setattr(struct lustre_handle *conn, struct obdo *obdo) +static inline int obd_setattr(struct lustre_handle *conn, + struct obdo *obdo, + struct lov_stripe_md *ea) { int rc; struct obd_export *export; OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,setattr); - rc = OBP(export->exp_obd, setattr)(conn, obdo); + rc = OBP(export->exp_obd, setattr)(conn, obdo, ea); RETURN(rc); } diff --git a/lustre/ldlm/Makefile.am b/lustre/ldlm/Makefile.am index 606c1dd..d3ef194 100644 --- a/lustre/ldlm/Makefile.am +++ b/lustre/ldlm/Makefile.am @@ -10,11 +10,13 @@ EXTRA_PROGRAMS = ldlm l_lock.c: test -e l_lock.c || ln -sf $(top_srcdir)/lib/l_lock.c +l_net.c: + test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c -LINX=l_lock.c +LINX=l_lock.c l_net.c ldlm_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \ -ldlm_extent.c ldlm_request.c +ldlm_extent.c ldlm_request.c l_net.c include $(top_srcdir)/Rules diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 75c3cf3..c10d560 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -35,13 +35,50 @@ char *ldlm_typename[] = { [LDLM_MDSINTENT] "INT" }; +char *ldlm_it2str(int it) +{ + switch (it) { + case IT_OPEN: + return "open"; + case IT_CREAT: + return "creat"; + case (IT_OPEN|IT_CREAT): + return "open|creat"; + case IT_MKDIR: + return "mkdir"; + case IT_LINK: + return "link"; + case IT_SYMLINK: + return "symlink"; + case IT_UNLINK: + return "unlink"; + case IT_RMDIR: + return "rmdir"; + case IT_RENAME: + return "rename"; + case IT_RENAME2: + return "rename2"; + case IT_READDIR: + return "readdir"; + case IT_GETATTR: + return "getattr"; + case IT_SETATTR: + return "setattr"; + case IT_READLINK: + return "readlink"; + case IT_MKNOD: + return "mknod"; + case IT_LOOKUP: + return "lookup"; + default: + CERROR("Unknown intent %d\n", it); + return "UNKNOWN"; + } +} + extern kmem_cache_t *ldlm_lock_slab; -int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL; -int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL; static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b); -static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, - ldlm_mode_t mode, void *data); ldlm_res_compat ldlm_res_compat_table [] = { [LDLM_PLAIN] ldlm_plain_compat, @@ -52,9 +89,19 @@ ldlm_res_compat ldlm_res_compat_table [] = { ldlm_res_policy ldlm_res_policy_table [] = { [LDLM_PLAIN] NULL, [LDLM_EXTENT] ldlm_extent_policy, - [LDLM_MDSINTENT] ldlm_intent_policy + [LDLM_MDSINTENT] NULL }; +void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie, + ldlm_mode_t mode, void *data)) +{ + ldlm_res_policy_table[LDLM_MDSINTENT] = arg; +} + +void ldlm_unregister_intent() +{ + ldlm_res_policy_table[LDLM_MDSINTENT] = NULL; +} /* * REFCOUNTED LOCK OBJECTS @@ -251,143 +298,6 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle) -static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, - ldlm_mode_t mode, void *data) -{ - struct ptlrpc_request *req = req_cookie; - int rc = 0; - ENTRY; - - if (!req_cookie) - RETURN(0); - - if (req->rq_reqmsg->bufcount > 1) { - /* an intent needs to be considered */ - struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1); - struct mds_obd *mds= &req->rq_export->exp_obd->u.mds; - struct mds_body *mds_rep; - struct ldlm_reply *rep; - __u64 new_resid[3] = {0, 0, 0}, old_res; - int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply), - sizeof(struct mds_body), - mds->mds_max_mdsize}; - - it->opc = NTOH__u64(it->opc); - - LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc); - - /* prepare reply */ - switch(it->opc) { - case IT_GETATTR: - /* Note that in the negative case you may be returning - * a file and its obdo */ - case IT_CREAT: - case IT_CREAT|IT_OPEN: - case IT_LINK: - case IT_LOOKUP: - case IT_MKDIR: - case IT_MKNOD: - case IT_OPEN: - case IT_READLINK: - case IT_RENAME: - case IT_RMDIR: - case IT_SETATTR: - case IT_SYMLINK: - case IT_UNLINK: - bufcount = 3; - break; - case IT_RENAME2: - bufcount = 1; - break; - default: - LBUG(); - } - - rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen, - &req->rq_repmsg); - if (rc) { - rc = req->rq_status = -ENOMEM; - RETURN(rc); - } - - rep = lustre_msg_buf(req->rq_repmsg, 0); - rep->lock_policy_res1 = 1; - - /* execute policy */ - switch (it->opc) { - case IT_CREAT: - case IT_CREAT|IT_OPEN: - case IT_LINK: - case IT_MKDIR: - case IT_MKNOD: - case IT_RENAME2: - case IT_RMDIR: - case IT_SYMLINK: - case IT_UNLINK: - rc = mds_reint_p(2, req); - if (rc || req->rq_status != 0) { - rep->lock_policy_res2 = req->rq_status; - RETURN(ELDLM_LOCK_ABORTED); - } - break; - case IT_GETATTR: - case IT_LOOKUP: - case IT_OPEN: - case IT_READDIR: - case IT_READLINK: - case IT_RENAME: - case IT_SETATTR: - rc = mds_getattr_name_p(2, req); - /* FIXME: we need to sit down and decide on who should - * set req->rq_status, who should return negative and - * positive return values, and what they all mean. */ - if (rc || req->rq_status != 0) { - rep->lock_policy_res2 = req->rq_status; - RETURN(ELDLM_LOCK_ABORTED); - } - break; - case IT_READDIR|IT_OPEN: - LBUG(); - break; - default: - CERROR("Unhandled intent\n"); - LBUG(); - } - - if (it->opc == IT_UNLINK || it->opc == IT_RMDIR || - it->opc == IT_RENAME || it->opc == IT_RENAME2) - RETURN(ELDLM_LOCK_ABORTED); - - rep->lock_policy_res2 = req->rq_status; - mds_rep = lustre_msg_buf(req->rq_repmsg, 1); - new_resid[0] = NTOH__u32(mds_rep->ino); - if (new_resid[0] == 0) - LBUG(); - old_res = lock->l_resource->lr_name[0]; - - CDEBUG(D_INFO, "remote intent: locking %d instead of" - "%ld\n", mds_rep->ino, (long)old_res); - - ldlm_lock_change_resource(lock, new_resid); - if (lock->l_resource == NULL) { - LBUG(); - RETURN(-ENOMEM); - } - LDLM_DEBUG(lock, "intent policy, old res %ld", - (long)old_res); - RETURN(ELDLM_LOCK_CHANGED); - } else { - int size = sizeof(struct ldlm_reply); - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, - &req->rq_repmsg); - if (rc) { - CERROR("out of memory\n"); - LBUG(); - RETURN(-ENOMEM); - } - } - RETURN(rc); -} static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b) { @@ -572,6 +482,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock) EXIT; } +/* returns a referenced lock or NULL */ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, struct ldlm_extent *extent) { @@ -604,7 +515,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, /* Must be called with no resource or lock locks held. * * Returns 1 if it finds an already-existing lock that is compatible; in this - * case, lockh is filled in with a addref()ed lock */ + * case, lockh is filled in with a addref()ed lock +*/ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, void *cookie, int cookielen, ldlm_mode_t mode, struct lustre_handle *lockh) @@ -635,8 +547,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, if (lock) { ldlm_lock2handle(lock, lockh); - wait_event(lock->l_waitq, - lock->l_req_mode == lock->l_granted_mode); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); } if (rc) LDLM_DEBUG(lock, "matched"); @@ -681,7 +593,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie, int cookie_len, int *flags, - ldlm_lock_callback completion, + ldlm_completion_callback completion, ldlm_lock_callback blocking) { struct ldlm_resource *res; @@ -799,8 +711,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list) rc = w->w_lock->l_blocking_ast (&lockh, &w->w_desc, w->w_data, w->w_datalen); else - rc = w->w_lock->l_completion_ast - (&lockh, NULL, w->w_data, w->w_datalen); + rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags); if (rc) CERROR("Failed AST - should clean & disconnect " "client\n"); @@ -885,7 +796,8 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, res->lr_tmp = NULL; granted = 1; /* FIXME: completion handling not with ns_lock held ! */ - wake_up(&lock->l_waitq); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, 0); } } else list_add(&lock->l_res_link, res->lr_converting.prev); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 36eb21b..bc659f9 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,7 +21,40 @@ extern kmem_cache_t *ldlm_lock_slab; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); -static int ldlm_handle_enqueue(struct ptlrpc_request *req) +int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags) +{ + struct ldlm_request *body; + struct ptlrpc_request *req; + struct ptlrpc_client *cl; + int rc = 0, size = sizeof(*body); + ENTRY; + + if (lock == NULL) { + LBUG(); + RETURN(-EINVAL); + } + + cl = &lock->l_resource->lr_namespace->ns_rpc_client; + req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, + &size, NULL); + if (!req) + RETURN(-ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + body->lock_flags = flags; + + LDLM_DEBUG(lock, "server preparing completion AST"); + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + RETURN(rc); +} + +int ldlm_handle_enqueue(struct ptlrpc_request *req) { struct obd_device *obddev = req->rq_export->exp_obd; struct ldlm_reply *dlm_rep; @@ -68,7 +101,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req) flags = dlm_req->lock_flags; err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags, - ldlm_server_ast, ldlm_server_ast); + ldlm_server_completion_ast, ldlm_server_ast); if (err != ELDLM_OK) GOTO(out, err); @@ -104,7 +137,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req) return 0; } -static int ldlm_handle_convert(struct ptlrpc_request *req) +int ldlm_handle_convert(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_reply *dlm_rep; @@ -143,7 +176,7 @@ static int ldlm_handle_convert(struct ptlrpc_request *req) RETURN(0); } -static int ldlm_handle_cancel(struct ptlrpc_request *req) +int ldlm_handle_cancel(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_lock *lock; @@ -159,6 +192,8 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req) lock = ldlm_handle2lock(&dlm_req->lock_handle1); if (!lock) { + LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)", + (void *)(unsigned long) dlm_req->lock_handle1.addr); req->rq_status = ESTALE; } else { LDLM_DEBUG(lock, "server-side cancel handler START"); @@ -173,9 +208,7 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req) ldlm_reprocess_all(lock->l_resource); LDLM_DEBUG(lock, "server-side cancel handler END"); LDLM_LOCK_PUT(lock); - } else - LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)", - lock); + } RETURN(0); } @@ -242,29 +275,27 @@ static int ldlm_handle_callback(struct ptlrpc_request *req) } LDLM_LOCK_PUT(lock); } else { - struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + struct list_head ast_list = LIST_HEAD_INIT(ast_list); l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; /* If we receive the completion AST before the actual enqueue * returned, then we might need to switch resources. */ + ldlm_resource_unlink_lock(lock); if (memcmp(dlm_req->lock_desc.l_resource.lr_name, lock->l_resource->lr_name, sizeof(__u64) * RES_NAME_SIZE) != 0) { ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name); LDLM_DEBUG(lock, "completion AST, new resource"); } - lock->l_resource->lr_tmp = &rpc_list; - ldlm_resource_unlink_lock(lock); + lock->l_resource->lr_tmp = &ast_list; ldlm_grant_lock(lock); - /* FIXME: we want any completion function, not just wake_up */ - wake_up(&lock->l_waitq); lock->l_resource->lr_tmp = NULL; l_unlock(&lock->l_resource->lr_namespace->ns_lock); LDLM_LOCK_PUT(lock); - ldlm_run_ast_work(&rpc_list); + ldlm_run_ast_work(&ast_list); } LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)", @@ -272,7 +303,8 @@ static int ldlm_handle_callback(struct ptlrpc_request *req) RETURN(0); } -static int ldlm_handle(struct ptlrpc_request *req) + +static int ldlm_callback_handler(struct ptlrpc_request *req) { int rc; ENTRY; @@ -288,31 +320,7 @@ static int ldlm_handle(struct ptlrpc_request *req) req->rq_reqmsg->type); GOTO(out, rc = -EINVAL); } - - if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) { - CERROR("No export handle for enqueue request.\n"); - GOTO(out, rc = -ENOTCONN); - } - switch (req->rq_reqmsg->opc) { - case LDLM_ENQUEUE: - CDEBUG(D_INODE, "enqueue\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); - rc = ldlm_handle_enqueue(req); - break; - - case LDLM_CONVERT: - CDEBUG(D_INODE, "convert\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); - rc = ldlm_handle_convert(req); - break; - - case LDLM_CANCEL: - CDEBUG(D_INODE, "cancel\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); - rc = ldlm_handle_cancel(req); - break; - case LDLM_CALLBACK: CDEBUG(D_INODE, "callback\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); @@ -331,6 +339,7 @@ out: return 0; } + static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg, void *uarg) { @@ -385,29 +394,16 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) MOD_INC_USE_COUNT; ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, - LDLM_REPLY_PORTAL, "self", ldlm_handle); + LDLM_REPLY_PORTAL, "self", ldlm_callback_handler); if (!ldlm->ldlm_service) { LBUG(); GOTO(out_dec, rc = -ENOMEM); } - if (mds_reint_p == NULL) - mds_reint_p = inter_module_get_request("mds_reint", "mds"); - if (IS_ERR(mds_reint_p)) { - CERROR("MDSINTENT locks require the MDS module.\n"); - GOTO(out_dec, rc = -EINVAL); - } - if (mds_getattr_name_p == NULL) - mds_getattr_name_p = inter_module_get_request - ("mds_getattr_name", "mds"); - if (IS_ERR(mds_getattr_name_p)) { - CERROR("MDSINTENT locks require the MDS module.\n"); - GOTO(out_dec, rc = -EINVAL); - } - for (i = 0; i < LDLM_NUM_THREADS; i++) { - rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, - "lustre_dlm"); + char name[32]; + sprintf(name, "lustre_dlm_%2d", i); + rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name); if (rc) { CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc); LBUG(); @@ -421,10 +417,6 @@ out_thread: ptlrpc_stop_all_threads(ldlm->ldlm_service); ptlrpc_unregister_service(ldlm->ldlm_service); out_dec: - if (mds_reint_p != NULL) - inter_module_put("mds_reint"); - if (mds_getattr_name_p != NULL) - inter_module_put("mds_getattr_name"); MOD_DEC_USE_COUNT; return rc; } @@ -437,11 +429,6 @@ static int ldlm_cleanup(struct obd_device *obddev) ptlrpc_stop_all_threads(ldlm->ldlm_service); ptlrpc_unregister_service(ldlm->ldlm_service); - if (mds_reint_p != NULL) - inter_module_put("mds_reint"); - if (mds_getattr_name_p != NULL) - inter_module_put("mds_getattr_name"); - MOD_DEC_USE_COUNT; RETURN(0); } @@ -487,16 +474,24 @@ static void __exit ldlm_exit(void) CERROR("couldn't free ldlm lock slab\n"); } +EXPORT_SYMBOL(ldlm_completion_ast); +EXPORT_SYMBOL(ldlm_handle_enqueue); +EXPORT_SYMBOL(ldlm_handle_cancel); +EXPORT_SYMBOL(ldlm_handle_convert); +EXPORT_SYMBOL(ldlm_register_intent); +EXPORT_SYMBOL(ldlm_unregister_intent); EXPORT_SYMBOL(ldlm_lockname); EXPORT_SYMBOL(ldlm_typename); EXPORT_SYMBOL(ldlm_handle2lock); EXPORT_SYMBOL(ldlm_lock_match); EXPORT_SYMBOL(ldlm_lock_addref); EXPORT_SYMBOL(ldlm_lock_decref); +EXPORT_SYMBOL(ldlm_lock_change_resource); EXPORT_SYMBOL(ldlm_cli_convert); EXPORT_SYMBOL(ldlm_cli_enqueue); EXPORT_SYMBOL(ldlm_cli_cancel); EXPORT_SYMBOL(ldlm_match_or_enqueue); +EXPORT_SYMBOL(ldlm_it2str); EXPORT_SYMBOL(ldlm_test); EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_namespace_new); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 6ca04c0..5f21751 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -13,8 +13,86 @@ #include -int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, - struct lustre_handle *connh, +int ldlm_completion_ast(struct ldlm_lock *lock, int flags) +{ + + ENTRY; + + if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | + LDLM_FL_BLOCK_CONV)) { + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock," + " sleeping"); + ldlm_lock_dump(lock); + ldlm_reprocess_all(lock->l_resource); + wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); + LDLM_DEBUG(lock, "client-side enqueue waking up: granted"); + } else if (flags == LDLM_FL_WAIT_NOREPROC) { + wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); + } else if (flags == 0) { + wake_up(&lock->l_waitq); + + } + + RETURN(0); +} + + +static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, + struct lustre_handle *parent_lockh, + __u64 *res_id, + __u32 type, + void *cookie, int cookielen, + ldlm_mode_t mode, + int *flags, + ldlm_completion_callback completion, + ldlm_lock_callback callback, + void *data, + __u32 data_len, + struct lustre_handle *lockh) +{ + struct ldlm_lock *lock; + int err; + + if (ns->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + + lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0); + if (!lock) + GOTO(out_nolock, err = -ENOMEM); + LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); + + ldlm_lock_addref_internal(lock, mode); + ldlm_lock2handle(lock, lockh); + lock->l_connh = NULL; + + err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback); + if (err != ELDLM_OK) + GOTO(out, err); + + if (type == LDLM_EXTENT) + memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent)); + if ((*flags) & LDLM_FL_LOCK_CHANGED) + memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id)); + + LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock); + + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); + + LDLM_DEBUG(lock, "client-side local enqueue END"); + EXIT; + out: + LDLM_LOCK_PUT(lock); + out_nolock: + return err; +} + + +int ldlm_cli_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -23,17 +101,25 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, + ldlm_completion_callback completion, ldlm_lock_callback callback, void *data, __u32 data_len, struct lustre_handle *lockh) { + struct ptlrpc_connection *connection; struct ldlm_lock *lock; struct ldlm_request *body; struct ldlm_reply *reply; int rc, size = sizeof(*body), req_passed_in = 1; ENTRY; + if (connh == NULL) + return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, + type, cookie, cookielen, mode, + flags, completion, callback, data, data_len, lockh); + connection = client_conn2cli(connh)->cl_conn; + *flags = 0; lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode, data, data_len); @@ -45,8 +131,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, ldlm_lock2handle(lock, lockh); if (req == NULL) { - req = ptlrpc_prep_req2(cl, conn, connh, - LDLM_ENQUEUE, 1, &size, NULL); + req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); req_passed_in = 0; @@ -72,21 +157,9 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, size = sizeof(*reply); req->rq_replen = lustre_msg_size(1, &size); } - - lock->l_connection = ptlrpc_connection_addref(conn); - lock->l_client = cl; - - /* I apologize in advance for what I am about to do. - * - * The MDS needs to send lock requests to itself, but it doesn't want to - * go through the whole hassle of setting up proper connections. Soon, - * these will just be function calls, but until I have the time, just - * set this connection to level FULL so that they go through. - * - * Since the MDS is the only user of PLAIN locks right now, we can use - * that to distinguish. Sorry. */ - if (type == LDLM_PLAIN) - conn->c_level = LUSTRE_CONN_FULL; + lock->l_connh = connh; + lock->l_connection = ptlrpc_connection_addref(connection); + lock->l_client = client_conn2cli(connh)->cl_client; rc = ptlrpc_queue_wait(req); /* FIXME: status check here? */ @@ -135,21 +208,11 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (!req_passed_in) ptlrpc_free_req(req); - rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback, + rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); - if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | - LDLM_FL_BLOCK_CONV)) { - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock," - " sleeping"); - ldlm_lock_dump(lock); -#warning ldlm needs to time out - wait_event(lock->l_waitq, - lock->l_req_mode == lock->l_granted_mode); - LDLM_DEBUG(lock, "client-side enqueue waking up: granted"); - } LDLM_DEBUG(lock, "client-side enqueue END"); EXIT; out: @@ -158,9 +221,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, return rc; } -int ldlm_match_or_enqueue(struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, - struct lustre_handle *connh, +int ldlm_match_or_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -169,6 +230,7 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, + ldlm_completion_callback completion, ldlm_lock_callback callback, void *data, __u32 data_len, @@ -178,9 +240,9 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl, ENTRY; rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh); if (rc == 0) { - rc = ldlm_cli_enqueue(cl, conn, connh, req, ns, + rc = ldlm_cli_enqueue(connh, req, ns, parent_lock_handle, res_id, type, cookie, - cookielen, mode, flags, callback, data, + cookielen, mode, flags, completion, callback, data, data_len, lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_enqueue: err: %d\n", rc); @@ -237,11 +299,29 @@ int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, return rc; } -int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, - struct lustre_handle *connh, - int new_mode, int *flags) + + +static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags) +{ + + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + LDLM_DEBUG(lock, "client-side local convert"); + + ldlm_lock_convert(lock, new_mode, flags); + ldlm_reprocess_all(lock->l_resource); + + LDLM_DEBUG(lock, "client-side local convert handler END"); + LDLM_LOCK_PUT(lock); + RETURN(0); +} + +int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) { struct ldlm_request *body; + struct lustre_handle *connh; struct ldlm_reply *reply; struct ldlm_lock *lock; struct ldlm_resource *res; @@ -255,11 +335,14 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, RETURN(-EINVAL); } *flags = 0; + connh = lock->l_connh; + + if (!connh) + return ldlm_cli_convert_local(lock, new_mode, flags); LDLM_DEBUG(lock, "client-side convert"); - req = ptlrpc_prep_req(cl, lock->l_connection, - LDLM_CONVERT, 1, &size, NULL); + req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -282,15 +365,10 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); if (res != NULL) ldlm_reprocess_all(res); - if (lock->l_req_mode != lock->l_granted_mode) { - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - CDEBUG(D_NET, "convert returned a blocked lock, " - "going to sleep.\n"); - wait_event(lock->l_waitq, - lock->l_req_mode == lock->l_granted_mode); - CDEBUG(D_NET, "waking up, the lock must be granted.\n"); - } + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + if (lock->l_completion_ast) + lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); EXIT; out: LDLM_LOCK_PUT(lock); @@ -303,7 +381,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) struct ptlrpc_request *req; struct ldlm_lock *lock; struct ldlm_request *body; - int rc, size = sizeof(*body); + int rc = 0, size = sizeof(*body); ENTRY; lock = ldlm_handle2lock(lockh); @@ -317,25 +395,36 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) RETURN(-EINVAL); } - LDLM_DEBUG(lock, "client-side cancel"); - req = ptlrpc_prep_req(lock->l_client, lock->l_connection, - LDLM_CANCEL, 1, &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - - body = lustre_msg_buf(req->rq_reqmsg, 0); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); - - req->rq_replen = lustre_msg_size(0, NULL); - - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - if (rc != ELDLM_OK) - GOTO(out, rc); + if (lock->l_connh) { + LDLM_DEBUG(lock, "client-side cancel"); + req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + if (rc != ELDLM_OK) + GOTO(out, rc); + + ldlm_lock_cancel(lock); + } else { + LDLM_DEBUG(lock, "client-side local cancel"); + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "client-side local cancel handler END"); + } - ldlm_lock_cancel(lock); EXIT; out: LDLM_LOCK_PUT(lock); diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 8cd970a..6b5daba 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -26,8 +26,6 @@ struct ldlm_test_thread { static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED; static struct list_head ctl_threads; static int regression_running = 0; -static struct ptlrpc_client ctl_client; -static struct ptlrpc_connection *ctl_conn; static int ldlm_test_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *new, @@ -63,7 +61,7 @@ int ldlm_test_basics(struct obd_device *obddev) if (lock1 == NULL) LBUG(); err = ldlm_lock_enqueue(lock1, NULL, 0, &flags, - ldlm_test_callback, ldlm_test_callback); + ldlm_completion_ast, ldlm_test_callback); if (err != ELDLM_OK) LBUG(); @@ -71,7 +69,7 @@ int ldlm_test_basics(struct obd_device *obddev) if (lock == NULL) LBUG(); err = ldlm_lock_enqueue(lock, NULL, 0, &flags, - ldlm_test_callback, ldlm_test_callback); + ldlm_completion_ast, ldlm_test_callback); if (err != ELDLM_OK) LBUG(); if (!(flags & LDLM_FL_BLOCK_GRANTED)) @@ -162,7 +160,6 @@ int ldlm_test_extents(struct obd_device *obddev) static int ldlm_test_network(struct obd_device *obddev, struct ptlrpc_connection *conn) { - struct ldlm_obd *ldlm = &obddev->u.ldlm; __u64 res_id[RES_NAME_SIZE] = {1, 2, 3}; struct ldlm_extent ext = {4, 6}; @@ -172,9 +169,8 @@ static int ldlm_test_network(struct obd_device *obddev, /* FIXME: this needs a connh as 3rd paramter, before it will work */ - err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, NULL, NULL, - obddev->obd_namespace, NULL, res_id, LDLM_EXTENT, - &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, 0, + err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id, LDLM_EXTENT, + &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, NULL, 0, &lockh1); CERROR("ldlm_cli_enqueue: %d\n", err); if (err == ELDLM_OK) @@ -223,9 +219,9 @@ static int ldlm_test_main(void *data) get_random_bytes(&random, sizeof(random)); lock_mode = random % LCK_NL + 1; - rc = ldlm_cli_enqueue(&ctl_client, ctl_conn, NULL, NULL, ns, NULL, + rc = ldlm_cli_enqueue(NULL, NULL, ns, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, ldlm_test_callback, NULL, 0, + &flags, ldlm_completion_ast, ldlm_test_callback, NULL, 0, &lockh); if (rc < 0) { CERROR("ldlm_cli_enqueue: %d\n", rc); diff --git a/lustre/lib/l_net.c b/lustre/lib/l_net.c index f08a626..b7ed75c 100644 --- a/lustre/lib/l_net.c +++ b/lustre/lib/l_net.c @@ -38,6 +38,201 @@ #include #include +struct client_obd *client_conn2cli(struct lustre_handle *conn) +{ + struct obd_export *export = class_conn2export(conn); + if (!export) + LBUG(); + return &export->exp_obd->u.cli; +} +extern struct recovd_obd *ptlrpc_connmgr; + +int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) +{ + struct obd_ioctl_data* data = buf; + int rq_portal = (obddev->obd_type->typ_ops->o_getattr) ? OST_REQUEST_PORTAL : MDS_REQUEST_PORTAL; + int rp_portal = (obddev->obd_type->typ_ops->o_getattr) ? OSC_REPLY_PORTAL : MDC_REPLY_PORTAL; + struct client_obd *mdc = &obddev->u.cli; + char server_uuid[37]; + int rc; + ENTRY; + + if (data->ioc_inllen1 < 1) { + CERROR("requires a TARGET UUID\n"); + RETURN(-EINVAL); + } + + if (data->ioc_inllen1 > 37) { + CERROR("client UUID must be less than 38 characters\n"); + RETURN(-EINVAL); + } + + if (data->ioc_inllen2 < 1) { + CERROR("setup requires a SERVER UUID\n"); + RETURN(-EINVAL); + } + + if (data->ioc_inllen2 > 37) { + CERROR("target UUID must be less than 38 characters\n"); + RETURN(-EINVAL); + } + + sema_init(&mdc->cl_sem, 1); + mdc->cl_conn_count = 0; + memcpy(mdc->cl_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1); + memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2, + sizeof(server_uuid))); + + mdc->cl_conn = ptlrpc_uuid_to_connection(server_uuid); + if (!mdc->cl_conn) + RETURN(-ENOENT); + + OBD_ALLOC(mdc->cl_client, sizeof(*mdc->cl_client)); + if (mdc->cl_client == NULL) + GOTO(out_conn, rc = -ENOMEM); + + OBD_ALLOC(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client)); + if (mdc->cl_ldlm_client == NULL) + GOTO(out_client, rc = -ENOMEM); + + /* XXX get recovery hooked in here again */ + //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,... + + ptlrpc_init_client(NULL, NULL, rq_portal, rp_portal, mdc->cl_client); + ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + mdc->cl_ldlm_client); + mdc->cl_client->cli_name = "mdc"; + mdc->cl_ldlm_client->cli_name = "ldlm"; + mdc->cl_max_mdsize = sizeof(struct lov_stripe_md); + + MOD_INC_USE_COUNT; + RETURN(0); + + out_client: + OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client)); + out_conn: + ptlrpc_put_connection(mdc->cl_conn); + return rc; +} + +int client_obd_cleanup(struct obd_device * obddev) +{ + struct client_obd *mdc = &obddev->u.cli; + + ptlrpc_cleanup_client(mdc->cl_client); + OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client)); + ptlrpc_cleanup_client(mdc->cl_ldlm_client); + OBD_FREE(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client)); + ptlrpc_put_connection(mdc->cl_conn); + + MOD_DEC_USE_COUNT; + return 0; +} + +int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd) +{ + struct client_obd *cli = &obd->u.cli; + struct ptlrpc_request *request; + int rc, size = sizeof(cli->cl_target_uuid); + char *tmp = cli->cl_target_uuid; + int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_CONNECT : MDS_CONNECT; + + ENTRY; + down(&cli->cl_sem); + MOD_INC_USE_COUNT; + rc = class_connect(conn, obd); + if (!rc) + cli->cl_conn_count++; + else { + MOD_DEC_USE_COUNT; + up(&cli->cl_sem); + RETURN(rc); + } + + if (cli->cl_conn_count > 1) { + up(&cli->cl_sem); + RETURN(0); + } + + obd->obd_namespace = + ldlm_namespace_new("cli", LDLM_NAMESPACE_CLIENT); + if (obd->obd_namespace == NULL) { + up(&cli->cl_sem); + RETURN(-ENOMEM); + } + + request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 1, &size, &tmp); + if (!request) + GOTO(out_disco, -ENOMEM); + + request->rq_level = LUSTRE_CONN_NEW; + request->rq_replen = lustre_msg_size(0, NULL); + /* Sending our local connection info breaks for local connections + request->rq_reqmsg->addr = conn->addr; + request->rq_reqmsg->cookie = conn->cookie; + */ + + rc = ptlrpc_queue_wait(request); + rc = ptlrpc_check_status(request, rc); + if (rc) + GOTO(out, rc); + + request->rq_connection->c_level = LUSTRE_CONN_FULL; + cli->cl_exporth = *(struct lustre_handle *)request->rq_repmsg; + + EXIT; + out: + ptlrpc_free_req(request); + out_disco: + if (rc) { + class_disconnect(conn); + MOD_DEC_USE_COUNT; + } + up(&cli->cl_sem); + return rc; +} + +int client_obd_disconnect(struct lustre_handle *conn) +{ + struct obd_device *obd = class_conn2obd(conn); + int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_DISCONNECT : MDS_DISCONNECT; + struct ptlrpc_request *request = NULL; + int rc; + ENTRY; + + down(&obd->u.cli.cl_sem); + if (!obd->u.cli.cl_conn_count) { + CERROR("disconnecting disconnected device (%s)\n", + obd->obd_name); + RETURN(0); + } + + obd->u.cli.cl_conn_count--; + if (obd->u.cli.cl_conn_count) + GOTO(class_only, 0); + + ldlm_namespace_free(obd->obd_namespace); + obd->obd_namespace = NULL; + request = ptlrpc_prep_req2(conn, rq_opc, 0, NULL, NULL); + if (!request) + RETURN(-ENOMEM); + + request->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + class_only: + rc = class_disconnect(conn); + if (!rc) + MOD_DEC_USE_COUNT; + out: + if (request) + ptlrpc_free_req(request); + up(&obd->u.cli.cl_sem); + RETURN(rc); +} + int target_handle_connect(struct ptlrpc_request *req) { struct obd_device *target; diff --git a/lustre/lib/lov_pack.c b/lustre/lib/lov_pack.c index ad51941..d8b9b3e 100644 --- a/lustre/lib/lov_pack.c +++ b/lustre/lib/lov_pack.c @@ -30,15 +30,15 @@ void lov_packdesc(struct lov_desc *ld) { ld->ld_tgt_count = HTON__u32(ld->ld_tgt_count); - ld->ld_default_stripecount = HTON__u32(ld->ld_default_stripecount); - ld->ld_default_stripesize = HTON__u32(ld->ld_default_stripesize); + ld->ld_default_stripe_count = HTON__u32(ld->ld_default_stripe_count); + ld->ld_default_stripe_size = HTON__u32(ld->ld_default_stripe_size); ld->ld_pattern = HTON__u32(ld->ld_pattern); } void lov_unpackdesc(struct lov_desc *ld) { ld->ld_tgt_count = NTOH__u32(ld->ld_tgt_count); - ld->ld_default_stripecount = HTON__u32(ld->ld_default_stripecount); - ld->ld_default_stripesize = HTON__u32(ld->ld_default_stripesize); + ld->ld_default_stripe_count = HTON__u32(ld->ld_default_stripe_count); + ld->ld_default_stripe_size = HTON__u32(ld->ld_default_stripe_size); ld->ld_pattern = HTON__u32(ld->ld_pattern); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index cca46cc..41ff0b8 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -49,7 +49,7 @@ static int ll_file_open(struct inode *inode, struct file *file) /* XXX object needs to be cleaned up if mdc_open fails */ /* XXX error handling appropriate here? */ if (lli->lli_smd == NULL || lli->lli_smd->lmd_object_id == 0) { - struct mdc_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb)); + struct client_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb)); struct inode * inode = file->f_dentry->d_inode; lli->lli_smd = NULL; @@ -59,7 +59,7 @@ static int ll_file_open(struct inode *inode, struct file *file) } oa->o_valid = OBD_MD_FLMODE; oa->o_mode = S_IFREG | 0600; - oa->o_easize = mdc->mdc_max_mdsize; + oa->o_easize = mdc->cl_max_mdsize; rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd); if (rc) RETURN(rc); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 1ed4375..e8a8ee9 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -356,7 +356,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) struct obdo oa; struct inode *inode; struct lov_stripe_md *smd; - struct ll_inode_info *ii; + struct ll_inode_info *ii = NULL; if (dentry->d_it->it_disposition == 0) { memset(&oa, 0, sizeof(oa)); @@ -395,7 +395,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) RETURN(rc); out_destroy: - oa.o_easize = ii->lli_smd->lmd_size; + oa.o_easize = ii->lli_smd->lmd_easize; err = obd_destroy(ll_i2obdconn(dir), &oa, ii->lli_smd); if (err) CERROR("error destroying object %Ld in error path: err = %d\n", diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c index d642e3d..122ff36 100644 --- a/lustre/llite/recover.c +++ b/lustre/llite/recover.c @@ -31,15 +31,15 @@ static int ll_reconnect(struct ll_sb_info *sbi) int err; struct ptlrpc_request *request; - ptlrpc_readdress_connection(sbi2mdc(sbi)->mdc_conn, "mds"); + ptlrpc_readdress_connection(sbi2mdc(sbi)->cl_conn, "mds"); - err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->mdc_conn); + err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->cl_conn); if (err) { CERROR("cannot connect to MDS: rc = %d\n", err); - ptlrpc_put_connection(sbi2mdc(sbi)->mdc_conn); + ptlrpc_put_connection(sbi2mdc(sbi)->cl_conn); GOTO(out_disc, err = -ENOTCONN); } - sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_CON; + sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_CON; /* XXX: need to store the last_* values somewhere */ err = mdc_getstatus(&sbi->ll_mdc_conn, @@ -51,8 +51,8 @@ static int ll_reconnect(struct ll_sb_info *sbi) CERROR("cannot mds_connect: rc = %d\n", err); GOTO(out_disc, err = -ENOTCONN); } - sbi2mdc(sbi)->mdc_client->cli_last_rcvd = last_xid; - sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_RECOVD; + sbi2mdc(sbi)->cl_client->cli_last_rcvd = last_xid; + sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD; out_disc: return err; @@ -126,7 +126,7 @@ int ll_recover(struct ptlrpc_client *cli) } - sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_FULL; + sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL; recovd_cli_fixed(cli); /* Finally, continue what we delayed since recovery started */ diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 32b45de..5ce6087 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -200,11 +200,14 @@ void ll_truncate(struct inode *inode) CDEBUG(D_INFO, "calling punch for %ld (all bytes after %Ld)\n", (long)oa.o_id, (unsigned long long)oa.o_size); + oa.o_size = inode->i_size; oa.o_id = md->lmd_object_id; - oa.o_valid = OBD_MD_FLSIZE | OBD_MD_FLID; - /* truncate == punch from i_size onwards */ - err = obd_punch(ll_i2obdconn(inode), &oa, md, -1 - oa.o_size, oa.o_size); + oa.o_valid = OBD_MD_FLID; + /* truncate == punch to/from start from/to end: + set end to -1 for that. */ + err = obd_punch(ll_i2obdconn(inode), &oa, md, oa.o_size, + 0xffffffffffffffff); if (err) CERROR("obd_truncate fails (%d)\n", err); else diff --git a/lustre/llite/super.c b/lustre/llite/super.c index bb32159..4e22cdc 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -64,8 +64,8 @@ static void ll_options(char *options, char **ost, char **mds) this_char != NULL; this_char = strtok (NULL, ",")) { CDEBUG(D_SUPER, "this_char %s\n", this_char); - if ( (!*ost && (*ost = ll_read_opt("ost", this_char)))|| - (!*mds && (*mds = ll_read_opt("mds", this_char))) ) + if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))|| + (!*mds && (*mds = ll_read_opt("mdc", this_char))) ) continue; } EXIT; @@ -81,8 +81,8 @@ static struct super_block * ll_read_super(struct super_block *sb, struct inode *root = 0; struct obd_device *obd; struct ll_sb_info *sbi; - char *ost = NULL; - char *mds = NULL; + char *osc = NULL; + char *mdc = NULL; int err; struct ll_fid rootfid; struct statfs sfs; @@ -102,47 +102,47 @@ static struct super_block * ll_read_super(struct super_block *sb, sb->u.generic_sbp = sbi; - ll_options(data, &ost, &mds); + ll_options(data, &osc, &mdc); - if (!ost) { - CERROR("no ost\n"); + if (!osc) { + CERROR("no osc\n"); GOTO(out_free, sb = NULL); } - if (!mds) { - CERROR("no mds\n"); + if (!mdc) { + CERROR("no mdc\n"); GOTO(out_free, sb = NULL); } - obd = class_uuid2obd(mds); + obd = class_uuid2obd(mdc); if (!obd) { - CERROR("MDS %s: not setup or attached\n", mds); + CERROR("MDC %s: not setup or attached\n", mdc); GOTO(out_free, sb = NULL); } #if 0 - err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mds_conn); + err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mdc_conn); if (err) { - CERROR("cannot connect to MDS: rc = %d\n", err); + CERROR("cannot connect to MDC: rc = %d\n", err); GOTO(out_rpc, sb = NULL); } #endif err = obd_connect(&sbi->ll_mdc_conn, obd); if (err) { - CERROR("cannot connect to %s: rc = %d\n", mds, err); + CERROR("cannot connect to %s: rc = %d\n", mdc, err); GOTO(out_free, sb = NULL); } - sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_FULL; + sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL; - obd = class_uuid2obd(ost); + obd = class_uuid2obd(osc); if (!obd) { - CERROR("OST %s: not setup or attached\n", ost); + CERROR("OSC %s: not setup or attached\n", osc); GOTO(out_mdc, sb = NULL); } err = obd_connect(&sbi->ll_osc_conn, obd); if (err) { - CERROR("cannot connect to %s: rc = %d\n", ost, err); + CERROR("cannot connect to %s: rc = %d\n", osc, err); GOTO(out_mdc, sb = NULL); } @@ -150,7 +150,7 @@ static struct super_block * ll_read_super(struct super_block *sb, err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed, &last_rcvd, &last_xid, &request); if (err) { - CERROR("cannot mds_connect: rc = %d\n", err); + CERROR("cannot mdc_connect: rc = %d\n", err); GOTO(out_request, sb = NULL); } CDEBUG(D_SUPER, "rootfid %Ld\n", (unsigned long long)rootfid.id); @@ -199,10 +199,10 @@ static struct super_block * ll_read_super(struct super_block *sb, ptlrpc_free_req(request); out_dev: - if (mds) - OBD_FREE(mds, strlen(mds) + 1); - if (ost) - OBD_FREE(ost, strlen(ost) + 1); + if (mdc) + OBD_FREE(mdc, strlen(mdc) + 1); + if (osc) + OBD_FREE(osc, strlen(osc) + 1); RETURN(sb); @@ -240,7 +240,7 @@ static void ll_clear_inode(struct inode *inode) struct lov_stripe_md *md = lli->lli_smd; if (md) { - OBD_FREE(md, md->lmd_size); + OBD_FREE(md, md->lmd_easize); lli->lli_smd = NULL; } if (lli->lli_symlink_name) { @@ -262,7 +262,7 @@ static void ll_delete_inode(struct inode *inode) GOTO(out, -EINVAL); oa.o_id = md->lmd_object_id; - oa.o_easize = md->lmd_size; + oa.o_easize = md->lmd_easize; if (oa.o_id == 0) { CERROR("This really happens\n"); /* No obdo was ever created */ @@ -382,8 +382,8 @@ out: inline int ll_stripe_md_size(struct super_block *sb) { - struct mdc_obd *mdc = sbi2mdc(ll_s2sbi(sb)); - return mdc->mdc_max_mdsize; + struct client_obd *mdc = sbi2mdc(ll_s2sbi(sb)); + return mdc->cl_max_mdsize; } static void ll_to_inode(struct inode *dst, struct ll_inode_md *md) @@ -419,7 +419,7 @@ static void ll_to_inode(struct inode *dst, struct ll_inode_md *md) if (md && md->md && md->md->lmd_stripe_count) { struct lov_stripe_md *smd = md->md; int size = ll_stripe_md_size(dst->i_sb); - if (md->md->lmd_size != size) { + if (md->md->lmd_easize != size) { CERROR("Striping metadata size error %ld\n", dst->i_ino); LBUG(); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 2cd84ab..a6cae4b 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -5,6 +5,7 @@ * * Copyright (C) 2002 Cluster File Systems, Inc. * Author: Phil Schwan + * Peter Braam * * This code is issued under the GNU General Public License. * See the file COPYING in this distribution @@ -27,14 +28,13 @@ extern struct obd_device obd_dev[MAX_OBD_DEVICES]; /* obd methods */ - static int lov_connect(struct lustre_handle *conn, struct obd_device *obd) { struct ptlrpc_request *req; struct lov_obd *lov = &obd->u.lov; struct lustre_handle mdc_conn; uuid_t *uuidarray; - int rc; + int rc, rc2; int i; MOD_INC_USE_COUNT; @@ -44,6 +44,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd) RETURN(rc); } + /* retrieve LOV metadata from MDS */ rc = obd_connect(&mdc_conn, lov->mdcobd); if (rc) { CERROR("cannot connect to mdc: rc = %d\n", rc); @@ -51,26 +52,23 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd) } rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req); - obd_disconnect(&mdc_conn); - - if (rc) { - CERROR("cannot get lov info %d\n", rc); - GOTO(out, rc); - } - - - if (lov->desc.ld_tgt_count > 1000) { - CERROR("configuration error: target count > 1000 (%d)\n", - lov->desc.ld_tgt_count); - GOTO(out, rc = -EINVAL); + rc2 = obd_disconnect(&mdc_conn); + if (rc || rc2) { + CERROR("cannot get lov info or disconnect %d/%d\n", rc, rc2); + GOTO(out, (rc) ? rc : rc2 ); } + /* sanity... */ if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) { CERROR("lov uuid %s not on mds device (%s)\n", obd->obd_uuid, lov->desc.ld_uuid); GOTO(out, rc = -EINVAL); } - + if (lov->desc.ld_tgt_count > 1000) { + CERROR("configuration error: target count > 1000 (%d)\n", + lov->desc.ld_tgt_count); + GOTO(out, rc = -EINVAL); + } if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] < sizeof(uuid_t) * lov->desc.ld_tgt_count) { CERROR("invalid uuid array returned\n"); @@ -91,9 +89,13 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd) for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { struct obd_device *tgt = class_uuid2obd(uuidarray[i]); if (!tgt) { - CERROR("Target %s not configured\n", uuidarray[i]); + CERROR("Target %s not attached\n", uuidarray[i]); GOTO(out_mem, rc = -EINVAL); } + if (!(tgt->obd_flags & OBD_SET_UP)) { + CERROR("Target %s not set up\n", uuidarray[i]); + GOTO(out_mem, rc = -EINVAL); + } rc = obd_connect(&lov->tgts[i].conn, tgt); if (rc) { CERROR("Target %s connect error %d\n", @@ -105,7 +107,6 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd) out_mem: if (rc) { for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { - int rc2; rc2 = obd_disconnect(&lov->tgts[i].conn); if (rc2) CERROR("BAD: Target %s disconnect error %d\n", @@ -116,7 +117,6 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd) out: if (rc) class_disconnect(conn); - ptlrpc_free_req(req); return rc; } @@ -144,11 +144,9 @@ static int lov_disconnect(struct lustre_handle *conn) lov->tgts = NULL; out_local: - rc = class_disconnect(conn); if (!rc) MOD_DEC_USE_COUNT; - return rc; } @@ -169,8 +167,6 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) RETURN(-EINVAL); } - /* FIXME: we should make a connection instead perhaps to avoid - the mdc from walking away? The fs guarantees this. */ lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); if (!lov->mdcobd) { CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, @@ -191,6 +187,7 @@ static inline int lov_stripe_md_size(struct obd_device *obd) return size; } +/* the LOV counts on oa->o_id to be set as the LOV object id */ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea) { int rc = 0, i; @@ -204,7 +201,6 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st CERROR("lov_create needs EA for striping information\n"); RETURN(-EINVAL); } - if (!export) RETURN(-EINVAL); lov = &export->exp_obd->u.lov; @@ -217,10 +213,10 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st } md = *ea; - md->lmd_size = oa->o_easize; + md->lmd_easize = oa->o_easize; md->lmd_object_id = oa->o_id; if (!md->lmd_stripe_count) { - md->lmd_stripe_count = lov->desc.ld_default_stripecount; + md->lmd_stripe_count = lov->desc.ld_default_stripe_count; } for (i = 0; i < md->lmd_stripe_count; i++) { @@ -252,16 +248,15 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st } static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, -struct lov_stripe_md *ea) + struct lov_stripe_md *md) { int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; - struct lov_stripe_md *md; ENTRY; - if (!ea) { + if (!md) { CERROR("LOV requires striping ea for desctruction\n"); RETURN(-EINVAL); } @@ -270,8 +265,6 @@ struct lov_stripe_md *ea) RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - md = ea; - for (i = 0; i < md->lmd_stripe_count; i++) { /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); @@ -285,105 +278,167 @@ struct lov_stripe_md *ea) RETURN(rc); } -#if 0 -static int lov_getattr(struct lustre_handle *conn, struct obdo *oa) +static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { - int rc; + int rc = 0, i; + struct obdo tmp; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; ENTRY; - if (!class_conn2export(conn)) - RETURN(-EINVAL); + if (!md) { + CERROR("LOV requires striping ea for desctruction\n"); + RETURN(-EINVAL); + } + + if (!export || !export->exp_obd) + RETURN(-ENODEV); + + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + /* create data objects with "parent" OA */ + memcpy(&tmp, oa, sizeof(tmp)); + oa->o_id = md->lmd_objects[i].l_object_id; - rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa); + rc = obd_getattr(&lov->tgts[i].conn, &tmp, NULL); + if (!rc) { + CERROR("Error getattr object %Ld on %d\n", + oa->o_id, i); + } + /* XXX can do something more sophisticated here... */ + if (i == 0 ) { + obd_id id = oa->o_id; + memcpy(oa, &tmp, sizeof(tmp)); + oa->o_id = id; + } + } RETURN(rc); } -static int lov_setattr(struct lustre_handle *conn, struct obdo *oa) +static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { - int rc, retval, i; + int rc = 0, i; + struct obdo tmp; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; ENTRY; - if (!class_conn2export(conn)) - RETURN(-EINVAL); - - for (i = 0; i < conn->oc_dev->obd_multi_count; i++) { - rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa); - if (i == 0) - retval = rc; - else if (retval != rc) - CERROR("different results on multiple OBDs!\n"); + if (!md) { + CERROR("LOV requires striping ea for desctruction\n"); + RETURN(-EINVAL); } + if (!export || !export->exp_obd) + RETURN(-ENODEV); + + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + /* create data objects with "parent" OA */ + memcpy(&tmp, oa, sizeof(tmp)); + oa->o_id = md->lmd_objects[i].l_object_id; + + rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL); + if (!rc) { + CERROR("Error setattr object %Ld on %d\n", + oa->o_id, i); + } + } RETURN(rc); } -static int lov_open(struct lustre_handle *conn, struct obdo *oa) +static int lov_open(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { - int rc, retval, i; + int rc = 0, i; + struct obdo tmp; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; ENTRY; - if (!class_conn2export(conn)) - RETURN(-EINVAL); - - for (i = 0; i < conn->oc_dev->obd_multi_count; i++) { - rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa); - if (i == 0) - retval = rc; - else if (retval != rc) - CERROR("different results on multiple OBDs!\n"); + if (!md) { + CERROR("LOV requires striping ea for desctruction\n"); + RETURN(-EINVAL); } + if (!export || !export->exp_obd) + RETURN(-ENODEV); + + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + /* create data objects with "parent" OA */ + memcpy(&tmp, oa, sizeof(tmp)); + oa->o_id = md->lmd_objects[i].l_object_id; + + rc = obd_open(&lov->tgts[i].conn, &tmp, NULL); + if (!rc) { + CERROR("Error getattr object %Ld on %d\n", + oa->o_id, i); + } + } RETURN(rc); } -static int lov_close(struct lustre_handle *conn, struct obdo *oa) + +static int lov_close(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { - int rc, retval, i; + int rc = 0, i; + struct obdo tmp; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; ENTRY; - if (!class_conn2export(conn)) - RETURN(-EINVAL); - - for (i = 0; i < conn->oc_dev->obd_multi_count; i++) { - rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa); - if (i == 0) - retval = rc; - else if (retval != rc) - CERROR("different results on multiple OBDs!\n"); + if (!md) { + CERROR("LOV requires striping ea for desctruction\n"); + RETURN(-EINVAL); } - RETURN(rc); -} + if (!export || !export->exp_obd) + RETURN(-ENODEV); + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + /* create data objects with "parent" OA */ + memcpy(&tmp, oa, sizeof(tmp)); + oa->o_id = md->lmd_objects[i].l_object_id; + rc = obd_close(&lov->tgts[i].conn, &tmp, NULL); + if (!rc) { + CERROR("Error getattr object %Ld on %d\n", + oa->o_id, i); + } + } + RETURN(rc); +} -/* FIXME: maybe we'll just make one node the authoritative attribute node, then - * we can send this 'punch' to just the authoritative node and the nodes - * that the punch will affect. */ -static int lov_punch(struct lustre_handle *conn, struct obdo *oa, - obd_size count, obd_off offset) +/* compute offset in stripe i corresponds to offset "in" */ +__u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i) { - int rc, retval, i; - ENTRY; - - if (!class_conn2export(conn)) - RETURN(-EINVAL); + __u32 ssz = md->lmd_stripe_size; + /* full stripes across all * stripe size */ + __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz; + __u32 off = (__u32)in % (md->lmd_stripe_count * ssz); - for (i = 0; i < conn->oc_dev->obd_multi_count; i++) { - rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count, - offset); - if (i == 0) - retval = rc; - else if (retval != rc) - CERROR("different results on multiple OBDs!\n"); + if ( in == 0xffffffffffffffff ) { + return 0xffffffffffffffff; } - RETURN(rc); + if ( (i+1) * ssz < off ) + out += ssz; + else if ( i * ssz > off ) + out += 0; + else + out += (off - (i * ssz)) % ssz; + + return (__u64) out; } + struct lov_callback_data { atomic_t count; - wait_queue_head waitq; + wait_queue_head_t waitq; }; static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data) @@ -400,7 +455,8 @@ static int lov_read_check_status(struct lov_callback_data *cb_data) if (sigismember(&(current->pending.signal), SIGKILL) || sigismember(&(current->pending.signal), SIGTERM) || sigismember(&(current->pending.signal), SIGINT)) { - cb_data->flags |= PTL_RPC_FL_INTR; + // FIXME XXX what here + // cb_data->flags |= PTL_RPC_FL_INTR; RETURN(1); } if (atomic_read(&cb_data->count) == 0) @@ -408,7 +464,49 @@ static int lov_read_check_status(struct lov_callback_data *cb_data) RETURN(0); } -/* buffer must lie in user memory here */ + +/* FIXME: maybe we'll just make one node the authoritative attribute node, then + * we can send this 'punch' to just the authoritative node and the nodes + * that the punch will affect. */ +static int lov_punch(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md, + obd_off start, obd_off end) +{ + int rc = 0, i; + struct obdo tmp; + struct obd_export *export = class_conn2export(conn); + struct lov_obd *lov; + ENTRY; + + if (!md) { + CERROR("LOV requires striping ea for desctruction\n"); + RETURN(-EINVAL); + } + + if (!export || !export->exp_obd) + RETURN(-ENODEV); + + lov = &export->exp_obd->u.lov; + for (i = 0; i < md->lmd_stripe_count; i++) { + __u64 starti = lov_offset(md, start, i); + __u64 endi = lov_offset(md, end, i); + + /* create data objects with "parent" OA */ + memcpy(&tmp, oa, sizeof(tmp)); + oa->o_id = md->lmd_objects[i].l_object_id; + + rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL, + starti, endi); + if (!rc) { + CERROR("Error punch object %Ld on %d\n", + oa->o_id, i); + } + } + RETURN(rc); +} + + +#if 0 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, @@ -568,11 +666,11 @@ struct obd_ops lov_obd_ops = { o_disconnect: lov_disconnect, o_create: lov_create, o_destroy: lov_destroy, -#if 0 o_getattr: lov_getattr, o_setattr: lov_setattr, o_open: lov_open, o_close: lov_close, +#if 0 o_brw: lov_pgcache_brw, o_punch: lov_punch, o_enqueue: lov_enqueue, diff --git a/lustre/mdc/Makefile.am b/lustre/mdc/Makefile.am index 1b5d201..ab5fa58 100644 --- a/lustre/mdc/Makefile.am +++ b/lustre/mdc/Makefile.am @@ -9,7 +9,7 @@ MODULE = mdc modulefs_DATA = mdc.o EXTRA_PROGRAMS = mdc -LINX=mds_updates.c ll_pack.c lov_pack.c +LINX=l_net.c mds_updates.c ll_pack.c lov_pack.c mdc_SOURCES = mdc_request.c mdc_reint.c $(LINX) lov_pack.c: test -e lov_pack.c || ln -sf $(top_srcdir)/lib/lov_pack.c . @@ -17,6 +17,9 @@ lov_pack.c: ll_pack.c: test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c . +l_net.c: + test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c . + mds_updates.c: test -e mds_updates.c || ln -sf $(top_srcdir)/lib/mds_updates.c . diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index c735ece..621fb35 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -58,7 +58,7 @@ int mdc_setattr(struct lustre_handle *conn, ENTRY; mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, + req = ptlrpc_prep_req2(conn, MDS_REINT, 1, &size, NULL); if (!req) RETURN(-ENOMEM); @@ -83,9 +83,6 @@ int mdc_create(struct lustre_handle *conn, struct ptlrpc_request **request) { struct mds_rec_create *rec; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ptlrpc_request *req; int rc, size[3] = {sizeof(struct mds_rec_create), namelen + 1, 0}; char *tmp, *bufs[3] = {NULL, NULL, NULL}; @@ -98,7 +95,7 @@ int mdc_create(struct lustre_handle *conn, dir->i_ino, namelen, name); LBUG(); } - size[2] = smd->lmd_size; + size[2] = smd->lmd_easize; bufs[2] = (char *)smd; bufcount = 3; } else if (S_ISLNK(mode)) { @@ -106,9 +103,7 @@ int mdc_create(struct lustre_handle *conn, bufcount = 3; } - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_REINT, bufcount, size, bufs); + req = ptlrpc_prep_req2(conn, MDS_REINT, bufcount, size, bufs); if (!req) RETURN(-ENOMEM); @@ -119,7 +114,7 @@ int mdc_create(struct lustre_handle *conn, if (S_ISREG(mode)) { tmp = lustre_msg_buf(req->rq_reqmsg, 2); - memcpy(tmp, smd, smd->lmd_size); + memcpy(tmp, smd, smd->lmd_easize); } else if (S_ISLNK(mode)) { tmp = lustre_msg_buf(req->rq_reqmsg, 2); LOGL0(tgt, tgtlen, tmp); @@ -149,16 +144,11 @@ int mdc_unlink(struct lustre_handle *conn, struct inode *dir, struct inode *child, const char *name, int namelen, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ptlrpc_request *req; int rc, size[2] = {sizeof(struct mds_rec_unlink), namelen + 1}; ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_REINT, 2, size, NULL); + req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL); if (!req) RETURN(-ENOMEM); @@ -179,16 +169,11 @@ int mdc_link(struct lustre_handle *conn, struct dentry *src, struct inode *dir, const char *name, int namelen, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ptlrpc_request *req; int rc, size[2] = {sizeof(struct mds_rec_link), namelen + 1}; ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_REINT, 2, size, NULL); + req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL); if (!req) RETURN(-ENOMEM); @@ -210,17 +195,12 @@ int mdc_rename(struct lustre_handle *conn, int oldlen, const char *new, int newlen, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ptlrpc_request *req; int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1, newlen + 1}; ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_REINT, 3, size, NULL); + req = ptlrpc_prep_req2(conn, MDS_REINT, 3, size, NULL); if (!req) RETURN(-ENOMEM); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 4116df1..b81e961 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -39,57 +39,31 @@ int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl, struct lustre_handle **rconn) { struct obd_export *export; - struct mdc_obd *mdc; + struct client_obd *mdc; export = class_conn2export(conn); if (!export) return -ENOTCONN; - mdc = &export->exp_obd->u.mdc; + mdc = &export->exp_obd->u.cli; - *cl = mdc->mdc_client; - *connection = mdc->mdc_conn; + *cl = mdc->cl_client; + *connection = mdc->cl_conn; *rconn = &export->exp_rconnh; return 0; } -static int mdc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl, - struct ptlrpc_connection **connection, - struct lustre_handle **rconn) -{ - struct obd_export *export; - struct mdc_obd *mdc; - - export = class_conn2export(conn); - if (!export) - return -ENOTCONN; - - mdc = &export->exp_obd->u.mdc; - - *cl = mdc->mdc_ldlm_client; - *connection = mdc->mdc_conn; - *rconn = &export->exp_rconnh; - - return 0; -} - - int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid, __u64 *last_committed, __u64 *last_rcvd, __u32 *last_xid, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mds_body *body; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; int rc, size = sizeof(*body); ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_GETSTATUS, 1, &size, NULL); + req = ptlrpc_prep_req2(conn, MDS_GETSTATUS, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -129,17 +103,12 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh, struct ptlrpc_request *req; struct mds_status_req *streq; struct lov_obd *lov = &obd->u.lov; - struct mdc_obd *mdc = &lov->mdcobd->u.mdc; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; + struct client_obd *mdc = &lov->mdcobd->u.cli; struct lov_desc *desc = &lov->desc; int rc, size[2] = {sizeof(*streq)}; ENTRY; - mdc_con2cl(mdc_connh, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_GETLOVINFO, 1, size, NULL); + req = ptlrpc_prep_req2(mdc_connh, MDS_GETLOVINFO, 1, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -162,7 +131,7 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh, *uuids = lustre_msg_buf(req->rq_repmsg, 1); lov_unpackdesc(desc); } - mdc->mdc_max_mdsize = sizeof(*desc) + + mdc->cl_max_mdsize = sizeof(*desc) + desc->ld_tgt_count * sizeof(uuid_t); EXIT; @@ -184,8 +153,7 @@ int mdc_getattr(struct lustre_handle *conn, ENTRY; mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_GETATTR, 1, size, NULL); + req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -194,9 +162,9 @@ int mdc_getattr(struct lustre_handle *conn, body->valid = valid; if (S_ISREG(type)) { - struct mdc_obd *mdc = &class_conn2obd(conn)->u.mdc; + struct client_obd *mdc = &class_conn2obd(conn)->u.cli; bufcount = 2; - size[1] = mdc->mdc_max_mdsize; + size[1] = mdc->cl_max_mdsize; } else if (valid & OBD_MD_LINKNAME) { bufcount = 2; size[1] = ea_size; @@ -260,20 +228,17 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, { struct ptlrpc_request *req; struct obd_device *obddev = class_conn2obd(conn); - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; __u64 res_id[RES_NAME_SIZE] = {dir->i_ino}; int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)}; int rc, flags; int repsize[3] = {sizeof(struct ldlm_reply), sizeof(struct mds_body), - obddev->u.mdc.mdc_max_mdsize}; + obddev->u.cli.cl_max_mdsize}; struct ldlm_reply *dlm_rep; struct ldlm_intent *lit; ENTRY; - LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino); + LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op), dir->i_ino); switch (it->it_op) { case IT_MKDIR: @@ -290,13 +255,11 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, break; } - mdc_con2dlmcl(conn, &cl, &connection, &rconn); if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) { size[2] = sizeof(struct mds_rec_create); size[3] = de->d_name.len + 1; size[4] = tgtlen + 1; - req = ptlrpc_prep_req2(cl, connection, rconn, - LDLM_ENQUEUE, 5, size, NULL); + req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL); if (!req) RETURN(-ENOMEM); @@ -315,8 +278,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, size[2] = sizeof(struct mds_rec_rename); size[3] = old_de->d_name.len + 1; size[4] = de->d_name.len + 1; - req = ptlrpc_prep_req2(cl, connection, rconn, - LDLM_ENQUEUE, 5, size, NULL); + req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL); if (!req) RETURN(-ENOMEM); @@ -332,8 +294,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, } else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) { size[2] = sizeof(struct mds_rec_unlink); size[3] = de->d_name.len + 1; - req = ptlrpc_prep_req2(cl, connection, rconn, - LDLM_ENQUEUE, 4, size, NULL); + req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL); if (!req) RETURN(-ENOMEM); @@ -352,8 +313,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, size[2] = sizeof(struct mds_body); size[3] = de->d_name.len + 1; - req = ptlrpc_prep_req2(cl, connection, rconn, - LDLM_ENQUEUE, 4, size, NULL); + req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL); if (!req) RETURN(-ENOMEM); @@ -367,8 +327,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, /* get ready for the reply */ req->rq_replen = lustre_msg_size(3, repsize); } else if (it->it_op == IT_READDIR) { - req = ptlrpc_prep_req2(cl, connection, rconn, - LDLM_ENQUEUE, 1, size, NULL); + req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 1, size, NULL); if (!req) RETURN(-ENOMEM); @@ -379,9 +338,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, RETURN(-1); } #warning FIXME: the data here needs to be different if a lock was granted for a different inode - rc = ldlm_cli_enqueue(cl, connection, rconn, req, - obddev->obd_namespace, NULL, res_id, lock_type, - NULL, 0, lock_mode, &flags, + rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id, lock_type, + NULL, 0, lock_mode, &flags, ldlm_completion_ast, (void *)mdc_lock_callback, data, datalen, lockh); if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) { lock_mode = 0; @@ -414,12 +372,11 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, if (smd != NULL) { bufcount = 2; - size[1] = smd->lmd_size; + size[1] = smd->lmd_easize; } mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_OPEN, bufcount, size, NULL); + req = ptlrpc_prep_req2(conn, MDS_OPEN, bufcount, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -431,7 +388,7 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, body->extra = cookie; if (smd != NULL) - memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_size); + memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_easize); req->rq_replen = lustre_msg_size(1, size); @@ -452,16 +409,11 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, int mdc_close(struct lustre_handle *conn, obd_id ino, int type, __u64 fh, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct mds_body *body; int rc, size = sizeof(*body); struct ptlrpc_request *req; - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_CLOSE, 1, &size, NULL); + req = ptlrpc_prep_req2(conn, MDS_CLOSE, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -483,9 +435,7 @@ int mdc_close(struct lustre_handle *conn, int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset, char *addr, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; + struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; struct ptlrpc_request *req = NULL; struct ptlrpc_bulk_desc *desc = NULL; struct ptlrpc_bulk_page *bulk = NULL; @@ -495,13 +445,11 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset, CDEBUG(D_INODE, "inode: %ld\n", (long)ino); - mdc_con2cl(conn, &cl, &connection, &rconn); desc = ptlrpc_prep_bulk(connection); if (desc == NULL) GOTO(out, rc = -ENOMEM); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_READPAGE, 1, &size, NULL); + req = ptlrpc_prep_req2(conn, MDS_READPAGE, 1, &size, NULL); if (!req) GOTO(out2, rc = -ENOMEM); @@ -544,17 +492,12 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset, int mdc_statfs(struct lustre_handle *conn, struct statfs *sfs, struct ptlrpc_request **request) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct obd_statfs *osfs; struct ptlrpc_request *req; int rc, size = sizeof(*osfs); ENTRY; - mdc_con2cl(conn, &cl, &connection, &rconn); - req = ptlrpc_prep_req2(cl, connection, rconn, - MDS_STATFS, 0, NULL, NULL); + req = ptlrpc_prep_req2(conn, MDS_STATFS, 0, NULL, NULL); if (!req) GOTO(out, rc = -ENOMEM); req->rq_replen = lustre_msg_size(1, &size); @@ -575,294 +518,11 @@ out: return rc; } -static int mdc_ioctl(long cmd, struct lustre_handle *conn, int len, void *karg, - void *uarg) -{ -#if 0 - /* FIXME XXX : This should use the new ioc_data to pass args in */ - int err = 0; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; - struct ptlrpc_request *request; - - ENTRY; - - if (_IOC_TYPE(cmd) != IOC_REQUEST_TYPE || - _IOC_NR(cmd) < IOC_REQUEST_MIN_NR || - _IOC_NR(cmd) > IOC_REQUEST_MAX_NR ) { - CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n", - _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd)); - RETURN(-EINVAL); - } - - ptlrpc_init_client(NULL, NULL, - MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl); - - mdc_con2cl(conn, &cl, &connection, &rconn); - switch (cmd) { - case IOC_REQUEST_GETATTR: { - CERROR("-- getting attr for ino %lu\n", arg); - err = mdc_getattr(&cl, connection, (obd_id)arg, S_IFDIR, ~0, 0, - &request); - CERROR("-- done err %d\n", err); - - GOTO(out, err); - } - - case IOC_REQUEST_READPAGE: { - char *buf; - OBD_ALLOC(buf, PAGE_SIZE); - if (!buf) { - err = -ENOMEM; - GOTO(out, err); - } - CERROR("-- readpage 0 for ino %lu\n", arg); - err = mdc_readpage(&cl, connection, arg, S_IFDIR, 0, buf, - &request); - CERROR("-- done err %d\n", err); - OBD_FREE(buf, PAGE_SIZE); - - GOTO(out, err); - } - - case IOC_REQUEST_SETATTR: { - struct inode inode; - struct iattr iattr; - - inode.i_ino = arg; - inode.i_generation = 0; - iattr.ia_mode = 040777; - iattr.ia_atime = 0; - iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - - err = mdc_setattr(&cl, connection, &inode, &iattr, &request); - CERROR("-- done err %d\n", err); - - GOTO(out, err); - } - - case IOC_REQUEST_CREATE: { - struct inode inode; - struct iattr iattr; - - inode.i_ino = arg; - inode.i_generation = 0; - iattr.ia_mode = 040777; - iattr.ia_atime = 0; - iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - - err = mdc_create(&cl, connection, &inode, - "foofile", strlen("foofile"), - NULL, 0, 0100707, 47114711, - 11, 47, 0, NULL, &request); - CERROR("-- done err %d\n", err); - - GOTO(out, err); - } - - case IOC_REQUEST_OPEN: { - __u64 fh, ino; - copy_from_user(&ino, (__u64 *)arg, sizeof(ino)); - CERROR("-- opening ino %llu\n", (unsigned long long)ino); - err = mdc_open(&cl, connection, ino, S_IFDIR, O_RDONLY, 4711, - &fh, &request); - copy_to_user((__u64 *)arg, &fh, sizeof(fh)); - CERROR("-- done err %d (fh=%Lu)\n", err, - (unsigned long long)fh); - - GOTO(out, err); - } - - case IOC_REQUEST_CLOSE: { - CERROR("-- closing ino 2, filehandle %lu\n", arg); - err = mdc_close(&cl, connection, 2, S_IFDIR, arg, &request); - CERROR("-- done err %d\n", err); - - GOTO(out, err); - } - - default: - GOTO(out, err = -EINVAL); - } - - out: - ptlrpc_free_req(request); - ptlrpc_put_connection(connection); - ptlrpc_cleanup_client(&cl); - - RETURN(err); -#endif - return 0; -} - -static int mdc_setup(struct obd_device *obddev, obd_count len, void *buf) -{ - struct obd_ioctl_data* data = buf; - struct mdc_obd *mdc = &obddev->u.mdc; - char server_uuid[37]; - int rc; - ENTRY; - - if (data->ioc_inllen1 < 1) { - CERROR("osc setup requires a TARGET UUID\n"); - RETURN(-EINVAL); - } - - if (data->ioc_inllen1 > 37) { - CERROR("mdc UUID must be less than 38 characters\n"); - RETURN(-EINVAL); - } - - if (data->ioc_inllen2 < 1) { - CERROR("mdc setup requires a SERVER UUID\n"); - RETURN(-EINVAL); - } - - if (data->ioc_inllen2 > 37) { - CERROR("mdc UUID must be less than 38 characters\n"); - RETURN(-EINVAL); - } - - memcpy(mdc->mdc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1); - memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2, - sizeof(server_uuid))); - - mdc->mdc_conn = ptlrpc_uuid_to_connection(server_uuid); - if (!mdc->mdc_conn) - RETURN(-ENOENT); - - OBD_ALLOC(mdc->mdc_client, sizeof(*mdc->mdc_client)); - if (mdc->mdc_client == NULL) - GOTO(out_conn, rc = -ENOMEM); - - OBD_ALLOC(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client)); - if (mdc->mdc_ldlm_client == NULL) - GOTO(out_client, rc = -ENOMEM); - - ptlrpc_init_client(NULL, NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, - mdc->mdc_client); - ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - mdc->mdc_ldlm_client); - mdc->mdc_client->cli_name = "mdc"; - mdc->mdc_ldlm_client->cli_name = "ldlm"; - mdc->mdc_max_mdsize = sizeof(struct lov_stripe_md); - /* XXX get recovery hooked in here again */ - //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,... - - ptlrpc_init_client(ptlrpc_connmgr, NULL, - MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, - mdc->mdc_client); - - MOD_INC_USE_COUNT; - RETURN(0); - - out_client: - OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client)); - out_conn: - ptlrpc_put_connection(mdc->mdc_conn); - return rc; -} - -static int mdc_cleanup(struct obd_device * obddev) -{ - struct mdc_obd *mdc = &obddev->u.mdc; - - ptlrpc_cleanup_client(mdc->mdc_client); - OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client)); - ptlrpc_cleanup_client(mdc->mdc_ldlm_client); - OBD_FREE(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client)); - ptlrpc_put_connection(mdc->mdc_conn); - - MOD_DEC_USE_COUNT; - return 0; -} - -static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd) -{ - struct mdc_obd *mdc = &obd->u.mdc; - struct ptlrpc_request *request; - int rc, size = sizeof(mdc->mdc_target_uuid); - char *tmp = mdc->mdc_target_uuid; - - ENTRY; - - obd->obd_namespace = - ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT); - if (obd->obd_namespace == NULL) - RETURN(-ENOMEM); - - MOD_INC_USE_COUNT; - rc = class_connect(conn, obd); - if (rc) - RETURN(rc); - - request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn, - MDS_CONNECT, 1, &size, &tmp); - if (!request) - GOTO(out_disco, -ENOMEM); - - request->rq_level = LUSTRE_CONN_NEW; - request->rq_replen = lustre_msg_size(0, NULL); - /* Sending our local connection info breaks for local connections - request->rq_reqmsg->addr = conn->addr; - request->rq_reqmsg->cookie = conn->cookie; - */ - - rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); - if (rc) - GOTO(out, rc); - - class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg); - - EXIT; - out: - ptlrpc_free_req(request); - out_disco: - if (rc) { - class_disconnect(conn); - MOD_DEC_USE_COUNT; - } - return rc; -} - -static int mdc_disconnect(struct lustre_handle *conn) -{ - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; - struct obd_device *obd = class_conn2obd(conn); - struct ptlrpc_request *request; - int rc; - ENTRY; - - ldlm_namespace_free(obd->obd_namespace); - mdc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - MDS_DISCONNECT, 0, NULL, NULL); - if (!request) - RETURN(-ENOMEM); - - request->rq_replen = lustre_msg_size(0, NULL); - - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); - rc = class_disconnect(conn); - if (!rc) - MOD_DEC_USE_COUNT; - out: - ptlrpc_free_req(request); - return rc; -} - struct obd_ops mdc_obd_ops = { - o_setup: mdc_setup, - o_cleanup: mdc_cleanup, - o_connect: mdc_connect, - o_disconnect: mdc_disconnect, - o_iocontrol: mdc_ioctl + o_setup: client_obd_setup, + o_cleanup: client_obd_cleanup, + o_connect: client_obd_connect, + o_disconnect: client_obd_disconnect, }; static int __init ptlrpc_request_init(void) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 48667cc..dfc469b 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -13,8 +13,6 @@ * by Peter Braam & * Andreas Dilger * - * This server is single threaded at present (but can easily be multi threaded) - * */ #define EXPORT_SYMTAB @@ -37,6 +35,7 @@ inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) return &req->rq_export->exp_obd->u.mds; } + /* Assumes caller has already pushed into the kernel filesystem context */ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset) @@ -95,7 +94,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, } /* 'dir' is a inode for which a lock has already been taken */ -struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, +struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir, struct vfsmount **mnt, char *name, int namelen, int lock_mode, struct lustre_handle *lockh, @@ -123,11 +122,9 @@ struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, RETURN(dchild); res_id[0] = dchild->d_inode->i_ino; - rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, + &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, lockh); if (rc != ELDLM_OK) { l_dput(dchild); @@ -138,10 +135,11 @@ struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, RETURN(dchild); } -struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, +struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh) { + struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; int flags, rc; __u64 res_id[3] = {0}; @@ -151,11 +149,9 @@ struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, RETURN(de); res_id[0] = de->d_inode->i_ino; - rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, + &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, lockh); if (rc != ELDLM_OK) { l_dput(de); @@ -197,9 +193,7 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, RETURN(ERR_PTR(-ESTALE)); } - /* now to find a dentry. - * If possible, get a well-connected one - */ + /* now to find a dentry. If possible, get a well-connected one */ if (mnt) *mnt = mds->mds_vfsmnt; spin_lock(&dcache_lock); @@ -374,6 +368,7 @@ int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, static int mds_getattr_name(int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); + struct obd_device *obd = req->rq_export->exp_obd; struct obd_run_ctxt saved; struct mds_body *body; struct dentry *de = NULL, *dchild = NULL; @@ -412,15 +407,13 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; res_id[0] = dir->i_ino; - rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, + rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &lockh); if (rc == 0) { LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, + &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, &lockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); @@ -450,7 +443,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) if (S_ISREG(inode->i_mode)) { struct lov_stripe_md *md; md = lustre_msg_buf(req->rq_repmsg, offset + 1); - md->lmd_size = mds->mds_max_mdsize; + md->lmd_easize = mds->mds_max_mdsize; mds_fs_get_md(mds, inode, md); } /* now a normal case for intent locking */ @@ -634,11 +627,9 @@ static int mds_open(struct ptlrpc_request *req) void *handle; struct lov_stripe_md *md; struct inode *inode = de->d_inode; - //struct iattr iattr; int rc; md = lustre_msg_buf(req->rq_reqmsg, 1); - //iattr.ia_mode = inode->i_mode; handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR); if (!handle) { @@ -648,7 +639,6 @@ static int mds_open(struct ptlrpc_request *req) /* XXX error handling */ rc = mds_fs_set_md(mds, inode, handle, md); - // rc = mds_fs_setattr(mds, de, handle, &iattr); if (!rc) { struct obd_run_ctxt saved; push_ctxt(&saved, &mds->mds_ctxt); @@ -881,6 +871,37 @@ int mds_handle(struct ptlrpc_request *req) rc = mds_close(req); break; + case LDLM_ENQUEUE: + CDEBUG(D_INODE, "enqueue\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); + rc = ldlm_handle_enqueue(req); + if (rc) + break; + RETURN(0); + + case LDLM_CONVERT: + CDEBUG(D_INODE, "convert\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); + rc = ldlm_handle_convert(req); + if (rc) + break; + RETURN(0); + + case LDLM_CANCEL: + CDEBUG(D_INODE, "cancel\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); + rc = ldlm_handle_cancel(req); + if (rc) + break; + RETURN(0); + case LDLM_CALLBACK: + CDEBUG(D_INODE, "callback\n"); + CERROR("callbacks should not happen on MDS\n"); + LBUG(); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); + break; + + default: rc = ptlrpc_error(req->rq_svc, req); RETURN(rc); @@ -963,12 +984,12 @@ static int mds_recover(struct obd_device *obddev) return rc; } - +#define MDS_NUM_THREADS 8 /* mount the file system (secretly) */ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) { + int i; struct obd_ioctl_data* data = buf; - struct obd_export *export; struct mds_obd *mds = &obddev->u.mds; struct vfsmount *mnt; int rc = 0; @@ -1008,58 +1029,30 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_fs, rc = -EINVAL); } - rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); - if (rc) { - CERROR("cannot start thread: rc = %d\n", rc); - GOTO(err_svc, rc); - } - rc = -ENOENT; - mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self"); - if (!mds->mds_ldlm_conn) { - mds_cleanup(obddev); - GOTO(err_thread, rc); - } - obddev->obd_namespace = ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER); if (obddev->obd_namespace == NULL) { LBUG(); mds_cleanup(obddev); - GOTO(err_thread, rc); - } - - mds->mds_local_namespace = - ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT); - if (mds->mds_local_namespace == NULL) { - LBUG(); - mds_cleanup(obddev); - GOTO(err_thread, rc); + GOTO(err_svc, rc); } - OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client)); - if (mds->mds_ldlm_client == NULL) { - LBUG(); - mds_cleanup(obddev); - GOTO(err_thread, rc); + for (i = 0; i < MDS_NUM_THREADS; i++) { + char name[32]; + sprintf(name, "lustre_MDS_%2d", i); + rc = ptlrpc_start_thread(obddev, mds->mds_service, name); + if (rc) { + CERROR("cannot start MDS thread #%d: rc %d\n", i, rc); + LBUG(); + GOTO(err_thread, rc); + } } - ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - mds->mds_ldlm_client); - mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor; - mds->mds_ldlm_client->cli_name = "mds ldlm"; rc = mds_recover(obddev); if (rc) GOTO(err_thread, rc); - rc = class_connect(&mds->mds_connh, obddev); - if (rc) - GOTO(err_thread, rc); - export = class_conn2export(&mds->mds_connh); - if (!export) - LBUG(); - export->exp_connection = mds->mds_ldlm_conn; - RETURN(0); @@ -1088,8 +1081,6 @@ static int mds_cleanup(struct obd_device * obddev) struct mds_obd *mds = &obddev->u.mds; ENTRY; - class_disconnect(&mds->mds_connh); - if ( !list_empty(&obddev->obd_exports) ) { CERROR("still has exports!\n"); @@ -1118,14 +1109,8 @@ static int mds_cleanup(struct obd_device * obddev) mds->mds_sb = 0; kfree(mds->mds_fstype); - ldlm_namespace_free(mds->mds_local_namespace); ldlm_namespace_free(obddev->obd_namespace); - if (mds->mds_ldlm_conn != NULL) - ptlrpc_put_connection(mds->mds_ldlm_conn); - - OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client)); - lock_kernel(); #ifdef CONFIG_DEV_RDONLY dev_clear_rdonly(2); @@ -1136,6 +1121,145 @@ static int mds_cleanup(struct obd_device * obddev) RETURN(0); } +static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, + ldlm_mode_t mode, void *data) +{ + struct ptlrpc_request *req = req_cookie; + int rc = 0; + ENTRY; + + if (!req_cookie) + RETURN(0); + + if (req->rq_reqmsg->bufcount > 1) { + /* an intent needs to be considered */ + struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1); + struct mds_obd *mds= &req->rq_export->exp_obd->u.mds; + struct mds_body *mds_rep; + struct ldlm_reply *rep; + __u64 new_resid[3] = {0, 0, 0}, old_res; + int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply), + sizeof(struct mds_body), + mds->mds_max_mdsize}; + + it->opc = NTOH__u64(it->opc); + + LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); + + /* prepare reply */ + switch((long)it->opc) { + case IT_GETATTR: + /* Note that in the negative case you may be returning + * a file and its obdo */ + case IT_CREAT: + case IT_CREAT|IT_OPEN: + case IT_LINK: + case IT_LOOKUP: + case IT_MKDIR: + case IT_MKNOD: + case IT_OPEN: + case IT_READLINK: + case IT_RENAME: + case IT_RMDIR: + case IT_SETATTR: + case IT_SYMLINK: + case IT_UNLINK: + bufcount = 3; + break; + case IT_RENAME2: + bufcount = 1; + break; + default: + LBUG(); + } + + rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc) { + rc = req->rq_status = -ENOMEM; + RETURN(rc); + } + + rep = lustre_msg_buf(req->rq_repmsg, 0); + rep->lock_policy_res1 = 1; + + /* execute policy */ + switch ((long)it->opc) { + case IT_CREAT: + case IT_CREAT|IT_OPEN: + case IT_LINK: + case IT_MKDIR: + case IT_MKNOD: + case IT_RENAME2: + case IT_RMDIR: + case IT_SYMLINK: + case IT_UNLINK: + rc = mds_reint(2, req); + if (rc || req->rq_status != 0) { + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } + break; + case IT_GETATTR: + case IT_LOOKUP: + case IT_OPEN: + case IT_READDIR: + case IT_READLINK: + case IT_RENAME: + case IT_SETATTR: + rc = mds_getattr_name(2, req); + /* FIXME: we need to sit down and decide on who should + * set req->rq_status, who should return negative and + * positive return values, and what they all mean. */ + if (rc || req->rq_status != 0) { + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } + break; + case IT_READDIR|IT_OPEN: + LBUG(); + break; + default: + CERROR("Unhandled intent\n"); + LBUG(); + } + + if (it->opc == IT_UNLINK || it->opc == IT_RMDIR || + it->opc == IT_RENAME || it->opc == IT_RENAME2) + RETURN(ELDLM_LOCK_ABORTED); + + rep->lock_policy_res2 = req->rq_status; + mds_rep = lustre_msg_buf(req->rq_repmsg, 1); + new_resid[0] = NTOH__u32(mds_rep->ino); + if (new_resid[0] == 0) + LBUG(); + old_res = lock->l_resource->lr_name[0]; + + CDEBUG(D_INFO, "remote intent: locking %d instead of" + "%ld\n", mds_rep->ino, (long)old_res); + + ldlm_lock_change_resource(lock, new_resid); + if (lock->l_resource == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + LDLM_DEBUG(lock, "intent policy, old res %ld", + (long)old_res); + RETURN(ELDLM_LOCK_CHANGED); + } else { + int size = sizeof(struct ldlm_reply); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc) { + CERROR("out of memory\n"); + LBUG(); + RETURN(-ENOMEM); + } + } + RETURN(rc); +} + + extern int mds_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg, void *uarg); @@ -1150,17 +1274,14 @@ static struct obd_ops mds_obd_ops = { static int __init mds_init(void) { - inter_module_register("mds_reint", THIS_MODULE, &mds_reint); - inter_module_register("mds_getattr_name", THIS_MODULE, - &mds_getattr_name); class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME); + ldlm_register_intent(ldlm_intent_policy); return 0; } static void __exit mds_exit(void) { - inter_module_unregister("mds_reint"); - inter_module_unregister("mds_getattr_name"); + ldlm_unregister_intent(); class_unregister_type(LUSTRE_MDS_NAME); } diff --git a/lustre/mds/mds_extN.c b/lustre/mds/mds_extN.c index fcf9faf..f9cfb36 100644 --- a/lustre/mds/mds_extN.c +++ b/lustre/mds/mds_extN.c @@ -113,7 +113,7 @@ static int mds_extN_set_md(struct inode *inode, void *handle, md->lmd_magic = cpu_to_le32(XATTR_MDS_MO_MAGIC); rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, XATTR_LUSTRE_MDS_OBJID, md, - md->lmd_size, XATTR_CREATE); + md->lmd_easize, XATTR_CREATE); } up(&inode->i_sem); unlock_kernel(); @@ -127,7 +127,7 @@ static int mds_extN_set_md(struct inode *inode, void *handle, static int mds_extN_get_md(struct inode *inode, struct lov_stripe_md *md) { int rc; - int size = md->lmd_size; + int size = md->lmd_easize; lock_kernel(); down(&inode->i_sem); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index edbe1b6..f35a9bf 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -101,6 +101,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); + struct obd_device *obd = req->rq_export->exp_obd; struct dentry *de; void *handle; struct lustre_handle child_lockh; @@ -113,7 +114,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, int namelen; /* a name was supplied by the client; fid1 is the directory */ - dir = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, LCK_PR, + dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR, &dir_lockh); if (!dir || IS_ERR(dir)) { l_dput(dir); @@ -123,7 +124,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, name = lustre_msg_buf(req->rq_reqmsg, offset + 1); namelen = req->rq_reqmsg->buflens[offset + 1] - 1; - de = mds_name2locked_dentry(mds, dir, NULL, name, namelen, + de = mds_name2locked_dentry(obd, dir, NULL, name, namelen, 0, &child_lockh, LCK_PR); l_dput(dir); if (!de || IS_ERR(de)) { @@ -222,6 +223,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, { struct dentry *de = NULL; struct mds_obd *mds = mds_req2mds(req); + struct obd_device *obd = req->rq_export->exp_obd; struct dentry *dchild = NULL; struct inode *dir; void *handle; @@ -249,15 +251,13 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; res_id[0] = dir->i_ino; - rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, + rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &lockh); if (rc == 0) { LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, + &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, &lockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); @@ -289,7 +289,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, /* If i_file_acl is set, this inode has an EA */ if (S_ISREG(inode->i_mode) && inode->u.ext3_i.i_file_acl) { md = lustre_msg_buf(req->rq_repmsg, offset + 1); - md->lmd_size = mds->mds_max_mdsize; + md->lmd_easize = mds->mds_max_mdsize; mds_fs_get_md(mds, inode, md); } /* now a normal case for intent locking */ @@ -435,6 +435,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, struct dentry *de = NULL; struct dentry *dchild = NULL; struct mds_obd *mds = mds_req2mds(req); + struct obd_device *obd = req->rq_export->exp_obd; char *name; int namelen; struct inode *dir, *inode; @@ -447,7 +448,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, /* a name was supplied by the client; fid1 is the directory */ lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; - de = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, lock_mode, + de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode, &lockh); if (IS_ERR(de)) { LBUG(); @@ -457,9 +458,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, name = lustre_msg_buf(req->rq_reqmsg, offset + 1); namelen = req->rq_reqmsg->buflens[offset + 1] - 1; - - dchild = mds_name2locked_dentry(mds, de, NULL, name, namelen, - LCK_EX, &child_lockh, lock_mode); + dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen, + LCK_EX, &child_lockh, lock_mode); if (IS_ERR(dchild) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) { LBUG(); @@ -498,7 +498,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, if (rc < 0) { CDEBUG(D_INFO, "No md for ino %ld err %d\n", inode->i_ino, rc); - memset(md, 0, md->lmd_size); + memset(md, 0, md->lmd_easize); } } default: @@ -608,6 +608,7 @@ out_link: static int mds_reint_rename(struct mds_update_record *rec, int offset, struct ptlrpc_request *req) { + struct obd_device *obd = req->rq_export->exp_obd; struct dentry *de_srcdir = NULL; struct dentry *de_tgtdir = NULL; struct dentry *de_old = NULL; @@ -626,15 +627,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; res_id[0] = de_srcdir->d_inode->i_ino; - rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, + rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &srclockh); if (rc == 0) { LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, + &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, &srclockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); @@ -650,15 +649,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; res_id[0] = de_tgtdir->d_inode->i_ino; - rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, + rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &tgtlockh); if (rc == 0) { LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, + &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, &tgtlockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); @@ -714,12 +711,10 @@ out_rename_deold: * about to free, to force everyone to drop their * locks. */ LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, LCK_EX, &flags, - (void *)mds_lock_callback, NULL, 0, + ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, &oldhandle); if (rc) CERROR("failed to get child inode lock (child ino %Ld, " diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 73eb518..41d4793 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -440,7 +440,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, case OBD_IOC_GETATTR: { obd_data2conn(&conn, data); - err = obd_getattr(&conn, &data->ioc_obdo1); + err = obd_getattr(&conn, &data->ioc_obdo1, NULL); if (err) GOTO(out, err); @@ -450,7 +450,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, case OBD_IOC_SETATTR: { obd_data2conn(&conn, data); - err = obd_setattr(&conn, &data->ioc_obdo1); + err = obd_setattr(&conn, &data->ioc_obdo1, NULL); if (err) GOTO(out, err); diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 0cd3708..3c0c33f 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -11,8 +11,8 @@ * by Peter Braam */ -static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.18 2002/07/26 16:54:55 rread Exp $"; -#define OBDECHO_VERSION "$Revision: 1.18 $" +static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.19 2002/07/31 20:49:37 braam Exp $"; +#define OBDECHO_VERSION "$Revision: 1.19 $" #define EXPORT_SYMTAB @@ -23,6 +23,7 @@ static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.18 2002/07/26 16:5 #include #include #include +#include #include #include @@ -37,6 +38,60 @@ static struct obdo OA; static obd_count GEN; static long echo_pages = 0; +static atomic_t echo_page_rws; +static atomic_t echo_getattrs; + +#define ECHO_PROC_STAT "sys/obdecho" + +int +echo_proc_read (char *page, char **start, off_t off, int count, int *eof, void *data) +{ + int len; + int attrs = atomic_read (&echo_getattrs); + int pages = atomic_read (&echo_page_rws); + + *eof = 1; + if (off != 0) + return (0); + + len = sprintf (page, "%d %d\n", pages, attrs); + + *start = page; + return (len); +} + +int +echo_proc_write (struct file *file, const char *ubuffer, unsigned long count, void *data) +{ + /* Ignore what we've been asked to write, and just zero the stats counters */ + atomic_set (&echo_page_rws, 0); + atomic_set (&echo_getattrs, 0); + + return (count); +} + +void +echo_proc_init(void) +{ + struct proc_dir_entry *entry = create_proc_entry (ECHO_PROC_STAT, S_IFREG | S_IRUGO | S_IWUSR, NULL); + + if (entry == NULL) + { + CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT); + return; + } + + entry->data = NULL; + entry->read_proc = echo_proc_read; + entry->write_proc = echo_proc_write; +} + +void +echo_proc_fini(void) +{ + remove_proc_entry(ECHO_PROC_STAT, 0); +} + static int echo_connect(struct lustre_handle *conn, struct obd_device *obd) { int rc; @@ -61,11 +116,14 @@ static int echo_disconnect(struct lustre_handle *conn) return rc; } -static int echo_getattr(struct lustre_handle *conn, struct obdo *oa) +static int echo_getattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { memcpy(oa, &OA, sizeof(*oa)); oa->o_mode = ++GEN; + atomic_inc (&echo_getattrs); + return 0; } @@ -160,6 +218,8 @@ int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount, GOTO(commitrw_cleanup, rc = -EFAULT); } + atomic_inc (&echo_page_rws); + kunmap(page); __free_pages(page, 0); echo_pages--; @@ -193,11 +253,15 @@ static int __init obdecho_init(void) { printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION " info@clusterfs.com\n"); + echo_proc_init(); + return class_register_type(&echo_obd_ops, OBD_ECHO_DEVICENAME); } static void __exit obdecho_exit(void) { + echo_proc_fini (); + CERROR("%ld prep/commitrw pages leaked\n", echo_pages); class_unregister_type(OBD_ECHO_DEVICENAME); } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 4e1ec72..22ba1f2 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -490,7 +490,8 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode) EXIT; } -static int filter_getattr(struct lustre_handle *conn, struct obdo *oa) +static int filter_getattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { struct obd_device *obddev = class_conn2obd(conn); struct dentry *dentry; @@ -514,7 +515,8 @@ static int filter_getattr(struct lustre_handle *conn, struct obdo *oa) RETURN(0); } -static int filter_setattr(struct lustre_handle *conn, struct obdo *oa) +static int filter_setattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { struct obd_run_ctxt saved; struct obd_device *obd = class_conn2obd(conn); @@ -698,15 +700,19 @@ out: /* NB count and offset are used for punch, but not truncate */ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md, - obd_size count, obd_off offset) + obd_off start, obd_off end) { int error; ENTRY; - CDEBUG(D_INODE, "calling truncate for object #%Ld, valid = %x, " - "o_size = %Ld\n", oa->o_id, oa->o_valid, oa->o_size); - error = filter_setattr(conn, oa); + if ( end != 0xffffffffffffffff ) { + CERROR("PUNCH not supported, only truncate works\n"); + } + CDEBUG(D_INODE, "calling truncate for object #%Ld, valid = %x, " + "o_size = %Ld\n", oa->o_id, oa->o_valid, start); + oa->o_size = start; + error = filter_setattr(conn, oa, NULL); RETURN(error); } diff --git a/lustre/osc/Makefile.am b/lustre/osc/Makefile.am index 45253da..17276bc 100644 --- a/lustre/osc/Makefile.am +++ b/lustre/osc/Makefile.am @@ -9,7 +9,7 @@ MODULE = osc modulefs_DATA = osc.o EXTRA_PROGRAMS = osc -LINX= obd_pack.c ll_pack.c +LINX= obd_pack.c ll_pack.c l_net.c osc_SOURCES = osc_request.c $(LINX) obd_pack.c: @@ -18,6 +18,9 @@ obd_pack.c: ll_pack.c: test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c +l_net.c: + test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c . + dist-hook: list='$(LINX)'; for f in $$list; do rm -f $(distdir)/$$f; done diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 96dedd4..006b745 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -26,127 +26,17 @@ #include #include -static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl, - struct ptlrpc_connection **connection, - struct lustre_handle **rconn) -{ - struct obd_export *export = class_conn2export(conn); - struct osc_obd *osc = &export->exp_obd->u.osc; - - *cl = osc->osc_client; - *connection = osc->osc_conn; - *rconn = &export->exp_rconnh; -} - -static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl, - struct ptlrpc_connection **connection, - struct lustre_handle **rconn) -{ - struct obd_export *export = class_conn2export(conn); - struct osc_obd *osc = &export->exp_obd->u.osc; - - *cl = osc->osc_ldlm_client; - *connection = osc->osc_conn; - *rconn = &export->exp_rconnh; -} - -static int osc_connect(struct lustre_handle *conn, struct obd_device *obd) -{ - struct osc_obd *osc = &obd->u.osc; - //struct obd_import *import; - struct ptlrpc_request *request; - char *tmp = osc->osc_target_uuid; - int rc, size = sizeof(osc->osc_target_uuid); - ENTRY; - - /* not used yet - OBD_ALLOC(import, sizeof(*import)); - if (!import) - RETURN(-ENOMEM); - */ - - MOD_INC_USE_COUNT; - rc = class_connect(conn, obd); - if (rc) - RETURN(rc); - - request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, - OST_CONNECT, 1, &size, &tmp); - if (!request) - GOTO(out_disco, rc = -ENOMEM); - - request->rq_level = LUSTRE_CONN_NEW; - request->rq_replen = lustre_msg_size(0, NULL); - request->rq_reqmsg->addr = -1; - /* Sending our local connection info breaks for local connections - request->rq_reqmsg->addr = conn->addr; - request->rq_reqmsg->cookie = conn->cookie; - */ - - rc = ptlrpc_queue_wait(request); - rc = ptlrpc_check_status(request, rc); - if (rc) { - CERROR("%s failed: rc = %d\n", __FUNCTION__, rc); - GOTO(out, rc); - } - - /* XXX eventually maybe more refinement */ - osc->osc_conn->c_level = LUSTRE_CONN_FULL; - - class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg); - - EXIT; - out: - ptlrpc_free_req(request); - out_disco: - if (rc) { - class_disconnect(conn); - MOD_DEC_USE_COUNT; - } - return rc; -} - -static int osc_disconnect(struct lustre_handle *conn) -{ - struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; - int rc; - ENTRY; - - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_DISCONNECT, 0, NULL, NULL); - if (!request) - RETURN(-ENOMEM); - request->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); - rc = class_disconnect(conn); - if (!rc) - MOD_DEC_USE_COUNT; - out: - ptlrpc_free_req(request); - return rc; -} - -static int osc_getattr(struct lustre_handle *conn, struct obdo *oa) +static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_GETATTR, 1, &size, NULL); + request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -178,16 +68,11 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_OPEN, 1, &size, NULL); + request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -217,16 +102,11 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_CLOSE, 1, &size, NULL); + request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -254,19 +134,15 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa, return rc; } -static int osc_setattr(struct lustre_handle *conn, struct obdo *oa) +static int osc_setattr(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md *md) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_SETATTR, 1, &size, NULL); + request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -288,9 +164,6 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -308,12 +181,10 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa, OBD_ALLOC(*ea, oa->o_easize); if (!*ea) RETURN(-ENOMEM); - (*ea)->lmd_size = oa->o_easize; + (*ea)->lmd_easize = oa->o_easize; } - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_CREATE, 1, &size, NULL); + request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -339,13 +210,10 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa, } static int osc_punch(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md, obd_size count, - obd_off offset) + struct lov_stripe_md *md, obd_size start, + obd_size end) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -354,16 +222,18 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa, CERROR("oa NULL\n"); RETURN(-EINVAL); } - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_PUNCH, 1, &size, NULL); + + request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL); if (!request) RETURN(-ENOMEM); body = lustre_msg_buf(request->rq_reqmsg, 0); memcpy(&body->oa, oa, sizeof(*oa)); - body->oa.o_blocks = count; - body->oa.o_valid |= OBD_MD_FLBLOCKS; + + /* overload the blocks and size fields in the oa with start/end */ + body->oa.o_blocks = start; + body->oa.o_size = end; + body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE; request->rq_replen = lustre_msg_size(1, &size); @@ -385,9 +255,6 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *ea) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -396,9 +263,7 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa, CERROR("oa NULL\n"); RETURN(-EINVAL); } - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_DESTROY, 1, &size, NULL); + request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -476,9 +341,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_size *count, obd_off *offset, obd_flag *flags, brw_callback_t callback, void *data) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; + struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; struct ptlrpc_request *request = NULL; struct ptlrpc_bulk_desc *desc = NULL; struct ost_body *body; @@ -486,14 +349,13 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, int rc, size[3] = {sizeof(*body)}; void *iooptr, *nioptr; int mapped = 0; + __u32 xid; ENTRY; size[1] = sizeof(struct obd_ioobj); size[2] = page_count * sizeof(struct niobuf_remote); - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_READ, 3, size, NULL); + request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL); if (!request) RETURN(-ENOMEM); @@ -517,14 +379,16 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, ost_pack_ioo(&iooptr, md, page_count); /* end almost identical to brw_write case */ + spin_lock(&connection->c_lock); + xid = ++connection->c_xid_out; /* single xid for all pages */ + spin_unlock(&connection->c_lock); + for (mapped = 0; mapped < page_count; mapped++) { struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) GOTO(out_unmap, rc = -ENOMEM); - spin_lock(&connection->c_lock); - bulk->b_xid = ++connection->c_xid_out; - spin_unlock(&connection->c_lock); + bulk->b_xid = xid; /* single xid for all pages */ bulk->b_buf = kmap(page_array[mapped]); bulk->b_page = page_array[mapped]; @@ -603,9 +467,7 @@ static int osc_brw_write(struct lustre_handle *conn, obd_off *offset, obd_flag *flags, brw_callback_t callback, void *data) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; + struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; struct ptlrpc_request *request = NULL; struct ptlrpc_bulk_desc *desc = NULL; struct ost_body *body; @@ -620,9 +482,7 @@ static int osc_brw_write(struct lustre_handle *conn, size[1] = sizeof(struct obd_ioobj); size[2] = page_count * sizeof(*remote); - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_WRITE, 3, size, NULL); + request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL); if (!request) RETURN(-ENOMEM); @@ -754,16 +614,13 @@ static int osc_brw(int cmd, struct lustre_handle *conn, offset, flags, callback, data); } -static int osc_enqueue(struct lustre_handle *conn, +static int osc_enqueue(struct lustre_handle *connh, struct lustre_handle *parent_lock, __u64 *res_id, __u32 type, void *extentp, int extent_len, __u32 mode, int *flags, void *callback, void *data, int datalen, struct lustre_handle *lockh) { - struct obd_device *obddev = class_conn2obd(conn); - struct ptlrpc_connection *connection; - struct ptlrpc_client *cl; - struct lustre_handle *rconn; + struct obd_device *obddev = class_conn2obd(connh); struct ldlm_extent *extent = extentp; int rc; __u32 mode2; @@ -774,7 +631,7 @@ static int osc_enqueue(struct lustre_handle *conn, extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ - osc_con2dlmcl(conn, &cl, &connection, &rconn); + //osc_con2dlmcl(conn, &cl, &connection, &rconn); rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent, sizeof(extent), mode, lockh); if (rc == 1) { @@ -804,16 +661,16 @@ static int osc_enqueue(struct lustre_handle *conn, if (mode == LCK_PR) return 0; - rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags); + rc = ldlm_cli_convert(lockh, mode, &flags); if (rc) LBUG(); return rc; } - rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace, + rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace, parent_lock, res_id, type, extent, sizeof(extent), - mode, flags, callback, data, datalen, lockh); + mode, flags, ldlm_completion_ast, callback, data, datalen, lockh); return rc; } @@ -827,103 +684,14 @@ static int osc_cancel(struct lustre_handle *oconn, __u32 mode, RETURN(0); } -static int osc_setup(struct obd_device *obddev, obd_count len, void *buf) -{ - struct obd_ioctl_data* data = buf; - struct osc_obd *osc = &obddev->u.osc; - char server_uuid[37]; - int rc; - ENTRY; - - if (data->ioc_inllen1 < 1) { - CERROR("osc setup requires a TARGET UUID\n"); - RETURN(-EINVAL); - } - - if (data->ioc_inllen1 > 37) { - CERROR("osc TARGET UUID must be less than 38 characters\n"); - RETURN(-EINVAL); - } - - if (data->ioc_inllen2 < 1) { - CERROR("osc setup requires a SERVER UUID\n"); - RETURN(-EINVAL); - } - - if (data->ioc_inllen2 > 37) { - CERROR("osc SERVER UUID must be less than 38 characters\n"); - RETURN(-EINVAL); - } - - memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1); - memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2, - sizeof(server_uuid))); - - osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid); - if (!osc->osc_conn) - RETURN(-ENOENT); - - obddev->obd_namespace = - ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT); - if (obddev->obd_namespace == NULL) - GOTO(out_conn, rc = -ENOMEM); - - OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client)); - if (osc->osc_client == NULL) - GOTO(out_ns, rc = -ENOMEM); - - OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); - if (osc->osc_ldlm_client == NULL) - GOTO(out_client, rc = -ENOMEM); - - ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, - osc->osc_client); - ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - osc->osc_ldlm_client); - osc->osc_client->cli_name = "osc"; - osc->osc_ldlm_client->cli_name = "ldlm"; - - MOD_INC_USE_COUNT; - RETURN(0); - - out_client: - OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); - out_ns: - ldlm_namespace_free(obddev->obd_namespace); - out_conn: - ptlrpc_put_connection(osc->osc_conn); - return rc; -} - -static int osc_cleanup(struct obd_device * obddev) -{ - struct osc_obd *osc = &obddev->u.osc; - - ldlm_namespace_free(obddev->obd_namespace); - - ptlrpc_cleanup_client(osc->osc_client); - OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); - ptlrpc_cleanup_client(osc->osc_ldlm_client); - OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); - ptlrpc_put_connection(osc->osc_conn); - - MOD_DEC_USE_COUNT; - return 0; -} - static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs) { struct ptlrpc_request *request; - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct lustre_handle *rconn; struct obd_statfs *osfs; int rc, size = sizeof(*osfs); ENTRY; - osc_con2cl(conn, &cl, &connection, &rconn); - request = ptlrpc_prep_req2(cl, connection, rconn, - OST_STATFS, 0, NULL, NULL); + request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL); if (!request) RETURN(-ENOMEM); @@ -946,8 +714,8 @@ static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs) } struct obd_ops osc_obd_ops = { - o_setup: osc_setup, - o_cleanup: osc_cleanup, + o_setup: client_obd_setup, + o_cleanup: client_obd_cleanup, o_statfs: osc_statfs, o_create: osc_create, o_destroy: osc_destroy, @@ -955,8 +723,8 @@ struct obd_ops osc_obd_ops = { o_setattr: osc_setattr, o_open: osc_open, o_close: osc_close, - o_connect: osc_connect, - o_disconnect: osc_disconnect, + o_connect: client_obd_connect, + o_disconnect: client_obd_disconnect, o_brw: osc_brw, o_punch: osc_punch, o_enqueue: osc_enqueue, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index d86e05f..333ca79 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -71,7 +71,7 @@ static int ost_getattr(struct ptlrpc_request *req) repbody = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); - req->rq_status = obd_getattr(conn, &repbody->oa); + req->rq_status = obd_getattr(conn, &repbody->oa, NULL); RETURN(0); } @@ -192,7 +192,7 @@ static int ost_setattr(struct ptlrpc_request *req) repbody = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); - req->rq_status = obd_setattr(conn, &repbody->oa); + req->rq_status = obd_setattr(conn, &repbody->oa, NULL); RETURN(0); } @@ -297,6 +297,8 @@ static int ost_brw_write(struct ptlrpc_request *req) void *tmp1, *tmp2, *end2; void *desc_priv = NULL; int reply_sent = 0; + struct ptlrpc_service *srv; + __u32 xid; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); @@ -343,17 +345,19 @@ static int ost_brw_write(struct ptlrpc_request *req) desc->b_desc_private = desc_priv; memcpy(&(desc->b_conn), &conn, sizeof(conn)); + srv = req->rq_obd->u.ost.ost_service; + spin_lock(&srv->srv_lock); + xid = srv->srv_xid++; /* single xid for all pages */ + spin_unlock(&srv->srv_lock); + for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) { - struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service; struct ptlrpc_bulk_page *bulk; bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) GOTO(fail_bulk, rc = -ENOMEM); - spin_lock(&srv->srv_lock); - bulk->b_xid = srv->srv_xid++; - spin_unlock(&srv->srv_lock); + bulk->b_xid = xid; /* single xid for all pages */ bulk->b_buf = lnb->addr; bulk->b_page = lnb->page; @@ -489,6 +493,36 @@ static int ost_handle(struct ptlrpc_request *req) OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0); rc = ost_statfs(req); break; + case LDLM_ENQUEUE: + CDEBUG(D_INODE, "enqueue\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); + rc = ldlm_handle_enqueue(req); + if (rc) + break; + RETURN(0); + case LDLM_CONVERT: + CDEBUG(D_INODE, "convert\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); + rc = ldlm_handle_convert(req); + if (rc) + break; + RETURN(0); + case LDLM_CANCEL: + CDEBUG(D_INODE, "cancel\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); + rc = ldlm_handle_cancel(req); + if (rc) + break; + RETURN(0); + case LDLM_CALLBACK: + CDEBUG(D_INODE, "callback\n"); + CERROR("callbacks should not happen on MDS\n"); + LBUG(); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); + break; + + + default: req->rq_status = -ENOTSUPP; rc = ptlrpc_error(req->rq_svc, req); @@ -526,12 +560,18 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) int i; ENTRY; - if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES) - RETURN(-ENODEV); + if (data->ioc_inllen1 < 1) { + CERROR("requires a TARGET OBD UUID\n"); + RETURN(-EINVAL); + } + if (data->ioc_inllen1 > 37) { + CERROR("OBD UUID must be less than 38 characters\n"); + RETURN(-EINVAL); + } MOD_INC_USE_COUNT; - tgt = &obd_dev[data->ioc_dev]; - if (!(tgt->obd_flags & OBD_ATTACHED) || + tgt = class_uuid2obd(data->ioc_inlbuf1); + if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) || !(tgt->obd_flags & OBD_SET_UP)) { CERROR("device not attached or not set up (%d)\n", data->ioc_dev); @@ -552,8 +592,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) } for (i = 0; i < OST_NUM_THREADS; i++) { - err = ptlrpc_start_thread(obddev, ost->ost_service, - "lustre_ost"); + char name[32]; + sprintf(name, "lustre_ost_%2d", i); + err = ptlrpc_start_thread(obddev, ost->ost_service, name); if (err) { CERROR("error starting thread #%d: rc %d\n", i, err); GOTO(error_disc, err = -EINVAL); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index b135afe..983b071 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -23,6 +23,7 @@ #define DEBUG_SUBSYSTEM S_RPC #include +#include #include #include @@ -94,10 +95,11 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn) OBD_ALLOC(desc, sizeof(*desc)); if (desc != NULL) { desc->b_connection = ptlrpc_connection_addref(conn); - atomic_set(&desc->b_pages_remaining, 0); atomic_set(&desc->b_refcount, 1); init_waitqueue_head(&desc->b_waitq); INIT_LIST_HEAD(&desc->b_page_list); + ptl_set_inv_handle(&desc->b_md_h); + ptl_set_inv_handle(&desc->b_me_h); } return desc; @@ -110,11 +112,8 @@ struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc) OBD_ALLOC(bulk, sizeof(*bulk)); if (bulk != NULL) { bulk->b_desc = desc; - ptl_set_inv_handle(&bulk->b_md_h); - ptl_set_inv_handle(&bulk->b_me_h); list_add_tail(&bulk->b_link, &desc->b_page_list); desc->b_page_count++; - atomic_inc(&desc->b_pages_remaining); } return bulk; } @@ -198,15 +197,25 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, RETURN(request); } -struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, - struct lustre_handle *handle, +struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn, int opcode, int count, int *lengths, char **bufs) { + struct client_obd *clobd; struct ptlrpc_request *req; - req = ptlrpc_prep_req(cl, conn, opcode, count, lengths, bufs); - ptlrpc_hdl2req(req, handle); + struct obd_export *export; + + export = class_conn2export(conn); + if (!export) { + LBUG(); + CERROR("NOT connected\n"); + return NULL; + } + + clobd = &export->exp_obd->u.cli; + req = ptlrpc_prep_req(clobd->cl_client, clobd->cl_conn, + opcode, count, lengths, bufs); + ptlrpc_hdl2req(req, &clobd->cl_exporth); return req; } diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 3bf879f..4cc7a74 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -37,6 +37,8 @@ static int request_out_callback(ptl_event_t *ev) { ENTRY; + LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */ + if (ev->type != PTL_EVENT_SENT) { // XXX make sure we understand all events, including ACK's CERROR("Unknown event %d\n", ev->type); @@ -54,6 +56,8 @@ static int reply_out_callback(ptl_event_t *ev) { ENTRY; + LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* replies always contiguous */ + if (ev->type == PTL_EVENT_SENT) { OBD_FREE(ev->mem_desc.start, ev->mem_desc.length); } else { @@ -73,6 +77,8 @@ static int reply_in_callback(ptl_event_t *ev) struct ptlrpc_request *req = ev->mem_desc.user_ptr; ENTRY; + LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* replies always contiguous */ + if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) { CERROR("Reply received for freed request! Probably a missing " "ptlrpc_abort()\n"); @@ -101,6 +107,8 @@ int request_in_callback(ptl_event_t *ev) { struct ptlrpc_service *service = ev->mem_desc.user_ptr; + LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */ + if (ev->rlength != ev->mlength) CERROR("Warning: Possibly truncated rpc (%d/%d)\n", ev->mlength, ev->rlength); @@ -115,22 +123,30 @@ int request_in_callback(ptl_event_t *ev) static int bulk_source_callback(ptl_event_t *ev) { - struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr; - struct ptlrpc_bulk_desc *desc = bulk->b_desc; + struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_page *bulk; + struct list_head *tmp; + struct list_head *next; ENTRY; + /* 1 fragment for each page always */ + LASSERT (ev->mem_desc.niov == desc->b_page_count); + if (ev->type == PTL_EVENT_SENT) { CDEBUG(D_NET, "got SENT event\n"); } else if (ev->type == PTL_EVENT_ACK) { CDEBUG(D_NET, "got ACK event\n"); - if (bulk->b_cb != NULL) - bulk->b_cb(bulk); - if (atomic_dec_and_test(&desc->b_pages_remaining)) { - desc->b_flags |= PTL_BULK_FL_SENT; - wake_up(&desc->b_waitq); - if (desc->b_cb != NULL) - desc->b_cb(desc, desc->b_cb_data); + + list_for_each_safe(tmp, next, &desc->b_page_list) { + bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); + + if (bulk->b_cb != NULL) + bulk->b_cb(bulk); } + desc->b_flags |= PTL_BULK_FL_SENT; + wake_up(&desc->b_waitq); + if (desc->b_cb != NULL) + desc->b_cb(desc, desc->b_cb_data); } else { CERROR("Unexpected event type!\n"); LBUG(); @@ -141,21 +157,36 @@ static int bulk_source_callback(ptl_event_t *ev) static int bulk_sink_callback(ptl_event_t *ev) { - struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr; - struct ptlrpc_bulk_desc *desc = bulk->b_desc; + struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_page *bulk; + struct list_head *tmp; + struct list_head *next; + ptl_size_t total = 0; ENTRY; if (ev->type == PTL_EVENT_PUT) { - if (bulk->b_buf != ev->mem_desc.start + ev->offset) - CERROR("bulkbuf != mem_desc -- why?\n"); - if (bulk->b_cb != NULL) - bulk->b_cb(bulk); - if (atomic_dec_and_test(&desc->b_pages_remaining)) { - desc->b_flags |= PTL_BULK_FL_RCVD; - wake_up(&desc->b_waitq); - if (desc->b_cb != NULL) - desc->b_cb(desc, desc->b_cb_data); + /* put with zero offset */ + LASSERT (ev->offset == 0); + /* used iovs */ + LASSERT ((ev->mem_desc.options & PTL_MD_IOV) != 0); + /* 1 fragment for each page always */ + LASSERT (ev->mem_desc.niov == desc->b_page_count); + + list_for_each_safe (tmp, next, &desc->b_page_list) { + bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); + + total += bulk->b_buflen; + + if (bulk->b_cb != NULL) + bulk->b_cb(bulk); } + + LASSERT (ev->mem_desc.length == total); + + desc->b_flags |= PTL_BULK_FL_RCVD; + wake_up(&desc->b_waitq); + if (desc->b_cb != NULL) + desc->b_cb(desc, desc->b_cb_data); } else { CERROR("Unexpected event type!\n"); LBUG(); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 408939c..0f7c955 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -119,47 +119,96 @@ static int ptl_send_buf(struct ptlrpc_request *request, return rc; } +static inline struct iovec * +ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc) +{ + struct iovec *iov; + + if (desc->b_page_count <= sizeof (desc->b_iov)/sizeof (struct iovec)) + return (desc->b_iov); + + OBD_ALLOC (iov, desc->b_page_count * sizeof (struct iovec)); + if (iov == NULL) + LBUG(); + + return (iov); +} + +static inline void +ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov) +{ + if (desc->b_page_count <= sizeof (desc->b_iov)/sizeof (struct iovec)) + return; + + OBD_FREE (iov, desc->b_page_count * sizeof (struct iovec)); +} + int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc) { int rc; struct list_head *tmp, *next; ptl_process_id_t remote_id; + __u32 xid = 0; + struct iovec *iov; ENTRY; + iov = ptlrpc_get_bulk_iov (desc); + if (iov == NULL) + RETURN (-ENOMEM); + + desc->b_md.start = iov; + desc->b_md.niov = 0; + desc->b_md.length = 0; + desc->b_md.eventq = bulk_source_eq; + desc->b_md.threshold = 2; /* SENT and ACK */ + desc->b_md.options = PTL_MD_OP_PUT | PTL_MD_IOV; + desc->b_md.user_ptr = desc; + list_for_each_safe(tmp, next, &desc->b_page_list) { struct ptlrpc_bulk_page *bulk; bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - bulk->b_md.start = bulk->b_buf; - bulk->b_md.length = bulk->b_buflen; - bulk->b_md.eventq = bulk_source_eq; - bulk->b_md.threshold = 2; /* SENT and ACK */ - bulk->b_md.options = PTL_MD_OP_PUT; - bulk->b_md.user_ptr = bulk; - - rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md, - &bulk->b_md_h); - if (rc != 0) { - CERROR("PtlMDBind failed: %d\n", rc); - LBUG(); - RETURN(rc); - } - - remote_id.nid = desc->b_connection->c_peer.peer_nid; - remote_id.pid = 0; - - CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n", - bulk->b_md.length, desc->b_portal, bulk->b_xid); - - rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id, - desc->b_portal, 0, bulk->b_xid, 0, 0); - if (rc != PTL_OK) { - CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", - remote_id.nid, desc->b_portal, bulk->b_xid, rc); - PtlMDUnlink(bulk->b_md_h); - LBUG(); - RETURN(rc); - } + LASSERT (desc->b_md.niov < desc->b_page_count); + + if (desc->b_md.niov == 0) + xid = bulk->b_xid; + LASSERT (xid == bulk->b_xid); /* should all be the same */ + + iov[desc->b_md.niov].iov_base = bulk->b_buf; + iov[desc->b_md.niov].iov_len = bulk->b_buflen; + desc->b_md.niov++; + desc->b_md.length += bulk->b_buflen; + } + + LASSERT (desc->b_md.niov == desc->b_page_count); + LASSERT (desc->b_md.niov != 0); + + rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, desc->b_md, + &desc->b_md_h); + + ptlrpc_put_bulk_iov (desc, iov); /* move down to reduce latency to send */ + + if (rc != PTL_OK) { + CERROR("PtlMDBind failed: %d\n", rc); + LBUG(); + RETURN(rc); + } + + remote_id.nid = desc->b_connection->c_peer.peer_nid; + remote_id.pid = 0; + + CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid %Lx pid %d xid %d\n", + desc->b_md.niov, desc->b_md.length, + desc->b_portal, remote_id.nid, remote_id.pid, xid); + + rc = PtlPut(desc->b_md_h, PTL_ACK_REQ, remote_id, + desc->b_portal, 0, xid, 0, 0); + if (rc != PTL_OK) { + CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", + remote_id.nid, desc->b_portal, xid, rc); + PtlMDUnlink(desc->b_md_h); + LBUG(); + RETURN(rc); } RETURN(0); @@ -169,40 +218,64 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc) { struct list_head *tmp, *next; int rc; + __u32 xid = 0; + struct iovec *iov; ENTRY; + iov = ptlrpc_get_bulk_iov (desc); + if (iov == NULL) + return (-ENOMEM); + + desc->b_md.start = iov; + desc->b_md.niov = 0; + desc->b_md.length = 0; + desc->b_md.threshold = 1; + desc->b_md.options = PTL_MD_OP_PUT | PTL_MD_IOV; + desc->b_md.user_ptr = desc; + desc->b_md.eventq = bulk_sink_eq; + list_for_each_safe(tmp, next, &desc->b_page_list) { struct ptlrpc_bulk_page *bulk; bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni, - desc->b_portal, local_id, bulk->b_xid, 0, - PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - LBUG(); - GOTO(cleanup, rc); - } - - bulk->b_md.start = bulk->b_buf; - bulk->b_md.length = bulk->b_buflen; - bulk->b_md.threshold = 1; - bulk->b_md.options = PTL_MD_OP_PUT; - bulk->b_md.user_ptr = bulk; - bulk->b_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, - &bulk->b_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - LBUG(); - GOTO(cleanup, rc); - } - - CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, " - "portal %u\n", bulk->b_buflen, bulk->b_xid, - desc->b_portal); + LASSERT (desc->b_md.niov < desc->b_page_count); + + if (desc->b_md.niov == 0) + xid = bulk->b_xid; + LASSERT (xid == bulk->b_xid); /* should all be the same */ + + iov[desc->b_md.niov].iov_base = bulk->b_buf; + iov[desc->b_md.niov].iov_len = bulk->b_buflen; + desc->b_md.niov++; + desc->b_md.length += bulk->b_buflen; + } + + LASSERT (desc->b_md.niov == desc->b_page_count); + LASSERT (desc->b_md.niov != 0); + + rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni, + desc->b_portal, local_id, xid, 0, + PTL_UNLINK, PTL_INS_AFTER, &desc->b_me_h); + + ptlrpc_put_bulk_iov (desc, iov); + + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + LBUG(); + GOTO(cleanup, rc); } + + rc = PtlMDAttach(desc->b_me_h, desc->b_md, PTL_UNLINK, + &desc->b_md_h); + if (rc != PTL_OK) { + CERROR("PtlMDAttach failed: %d\n", rc); + LBUG(); + GOTO(cleanup, rc); + } + + CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, " + "portal %u\n", desc->b_md.niov, desc->b_md.length, + xid, desc->b_portal); RETURN(0); @@ -214,17 +287,10 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc) int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc) { - struct list_head *tmp, *next; - - list_for_each_safe(tmp, next, &desc->b_page_list) { - struct ptlrpc_bulk_page *bulk; - bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - - /* This should be safe: these handles are initialized to be - * invalid in ptlrpc_prep_bulk_page() */ - PtlMDUnlink(bulk->b_md_h); - PtlMEUnlink(bulk->b_me_h); - } + /* This should be safe: these handles are initialized to be + * invalid in ptlrpc_prep_bulk() */ + PtlMDUnlink(desc->b_md_h); + PtlMEUnlink(desc->b_me_h); return 0; } diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 6aa5f65..36e4b7c 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -138,6 +138,7 @@ static int handle_incoming_request(struct obd_device *obddev, /* FIXME: If we move to an event-driven model, we should put the request * on the stack of mds_handle instead. */ + LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0); start = event->mem_desc.start; memset(&request, 0, sizeof(request)); @@ -199,6 +200,8 @@ void ptlrpc_rotate_reqbufs(struct ptlrpc_service *service, { int index; + LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); + for (index = 0; index < service->srv_ring_length; index++) if (service->srv_buf[index] == ev->mem_desc.start) break; diff --git a/lustre/tests/Makefile.am b/lustre/tests/Makefile.am index f50b8ef..39a89c1 100644 --- a/lustre/tests/Makefile.am +++ b/lustre/tests/Makefile.am @@ -17,7 +17,7 @@ noinst_DATA = ext2_10000.gz ext2_25000.gz ext3_10000.gz lov.xml noinst_SCRIPTS = fs.sh intent-test.sh intent-test2.sh leak_finder.pl \ lldlm.sh llecho.sh llext3.sh llmodules.sh llmount-client.sh \ llmount-server.sh llmount.sh llmountcleanup.sh llrext3.sh \ - llrmount.sh llsimple.sh mdcreq.sh mdcreqcleanup.sh \ + llrmount.sh llsimple.sh mdcreq.sh mdcreqcleanup.sh \ ostreq.sh runfailure-client-mds-recover.sh runfailure-mds \ runfailure-net runfailure-ost runiozone runregression-net.sh \ runtests runvmstat snaprun.sh diff --git a/lustre/tests/common.sh b/lustre/tests/common.sh index 2801464..87078a8 100644 --- a/lustre/tests/common.sh +++ b/lustre/tests/common.sh @@ -398,7 +398,7 @@ setup_ost() { $OBDCTL <<- EOF || return $? newdev attach ost OSTDEV OSTUUID - setup \$OBDDEV + setup OBDUUID quit EOF } @@ -482,8 +482,8 @@ setup_mount() { fi [ ! -d $MTPT ] && mkdir $MTPT - echo mount -t lustre_lite -o ost=${THEOSC}-UUID,mds=${THEMDC}-UUID none $MTPT - mount -t lustre_lite -o ost=${THEOSC}-UUID,mds=${THEMDC}-UUID none $MTPT + echo mount -t lustre_lite -o osc=${THEOSC}-UUID,mdc=${THEMDC}-UUID none $MTPT + mount -t lustre_lite -o osc=${THEOSC}-UUID,mdc=${THEMDC}-UUID none $MTPT done done } diff --git a/lustre/tests/intent-test.sh b/lustre/tests/intent-test.sh index 3e6cfbb..9113f17 100755 --- a/lustre/tests/intent-test.sh +++ b/lustre/tests/intent-test.sh @@ -5,7 +5,7 @@ MTPT=/mnt/lustre remount() { umount $MTPT || exit -1 debugctl clear - mount -t lustre_lite -o ost=OSCDEV-UUID,mds=MDCDEV-UUID none $MTPT + mount -t lustre_lite -o osc=OSCDEV-UUID,mdc=MDCDEV-UUID none $MTPT } # Test mkdir diff --git a/lustre/tests/llsetup.sh b/lustre/tests/llsetup.sh index 3df8172..4828f26 100644 --- a/lustre/tests/llsetup.sh +++ b/lustre/tests/llsetup.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/sh -vx SRCDIR="`dirname $0`/" [ -f $SRCDIR/common.sh ] || SRCDIR="/lib/lustre" diff --git a/lustre/tests/lov.xml b/lustre/tests/lov.xml index 0af48e8..f636d69 100644 --- a/lustre/tests/lov.xml +++ b/lustre/tests/lov.xml @@ -29,7 +29,7 @@ - + diff --git a/lustre/utils/device.c b/lustre/utils/device.c index d5e61ba..24e05b3 100644 --- a/lustre/utils/device.c +++ b/lustre/utils/device.c @@ -501,7 +501,7 @@ int jt_dev_lov_config(int argc, char **argv) int rc, size, i; IOCINIT(data); - if (argc <= 5) + if (argc <= 6) return CMD_HELP; if (strlen(argv[1]) > sizeof(uuid_t) - 1) { @@ -512,10 +512,11 @@ int jt_dev_lov_config(int argc, char **argv) memset(&desc, 0, sizeof(desc)); strcpy(desc.ld_uuid, argv[1]); - desc.ld_default_stripecount = strtoul(argv[2], NULL, 0); - desc.ld_default_stripesize = strtoul(argv[3], NULL, 0); - desc.ld_pattern = strtoul(argv[4], NULL, 0); - desc.ld_tgt_count = argc - 5; + desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); + desc.ld_default_stripe_size = (__u64) strtoul(argv[3], NULL, 0); + desc.ld_default_stripe_offset = (__u64) strtoul(argv[3], NULL, 0); + desc.ld_pattern = strtoul(argv[5], NULL, 0); + desc.ld_tgt_count = argc - 6; size = sizeof(uuid_t) * desc.ld_tgt_count; @@ -526,8 +527,8 @@ int jt_dev_lov_config(int argc, char **argv) return -ENOMEM; } memset(uuidarray, 0, size); - for (i=5 ; i < argc ; i++) { - char *buf = (char *) (uuidarray + i -5 ); + for (i=6 ; i < argc ; i++) { + char *buf = (char *) (uuidarray + i -6 ); if (strlen(argv[i]) >= sizeof(uuid_t)) { fprintf(stderr, "lov_config: arg %d (%s) too long\n", i, argv[i]); diff --git a/lustre/utils/lconf b/lustre/utils/lconf index a5c93fd..b5b536a 100755 --- a/lustre/utils/lconf +++ b/lustre/utils/lconf @@ -187,12 +187,12 @@ class LCTLInterface: self.run(cmds) # create an lov - def lovconfig(self, uuid, mdcuuid, stripe_cnt, stripe_sz, pattern, devlist): + def lovconfig(self, uuid, mdcuuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist): cmds = """ device $%s probe - lovconfig %s %d %d %s %s - quit""" % (mdcuuid, uuid, stripe_cnt, stripe_sz, pattern, devlist) + lovconfig %s %d %d %d %s %s + quit""" % (mdcuuid, uuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist) self.run(cmds) # ============================================================ @@ -329,9 +329,9 @@ def prepare_ldlm(node): setup ="") def prepare_lov(node): - (name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname) = getLOVInfo(node) - print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname - lctl.lovconfig(uuid, mdsname, stripe_cnt, strip_sz, pattern, devlist) + (name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname) = getLOVInfo(node) + print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname + lctl.lovconfig(uuid, mdsname, stripe_cnt, strip_sz, stripe_off, pattern, devlist) lctl.newdev(attach="lov %s %s" % (name, uuid), setup ="%s" % (mdcuuid)) @@ -362,7 +362,7 @@ def prepare_ost(ost): name, uuid, obd = getOSTInfo(ost) print 'OST:', name, uuid, obd lctl.newdev(attach="ost %s %s" % (name, uuid), - setup ="$%s" % (obd)) + setup ="%s" % (obd)) def prepare_mds(node): (name, uuid, dev, size, fstype, format) = getMDSInfo(node) @@ -386,6 +386,9 @@ def prepare_mdc(node): print 'MDC:', name, uuid, mdsuuid, netuuid net = lookup(node.parentNode, netuuid) srvname, srvuuid, net, server, port = getNetworkInfo(net) + mds = lookup(node.parentNode, mdsuuid) + if mds == None: + panic(mdsuuid, "not found.") lctl.connect(net, server, port, netuuid) lctl.newdev(attach="mdc %s %s" % (name, uuid), setup ="%s %s" %(mdsuuid, netuuid)) @@ -393,10 +396,12 @@ def prepare_mdc(node): def prepare_mountpoint(node): name, uuid, oscuuid, mdcuuid, mtpt = getMTPTInfo(node) print 'MTPT:', name, uuid, oscuuid, mdcuuid, mtpt - cmd = "mount -t lustre_lite -o ost=%s,mds=%s none %s" % \ + cmd = "mount -t lustre_lite -o osc=%s,mdc=%s none %s" % \ (oscuuid, mdcuuid, mtpt) run("mkdir", mtpt) - run(cmd) + ret, val = run(cmd) + if ret: + print mtpt, "mount failed." # ============================================================ # Functions to cleanup the various objects @@ -409,8 +414,8 @@ def cleanup_ldlm(node): print "cleanup failed: ", name def cleanup_lov(node): - (name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname) = getLOVInfo(node) - print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname + (name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname) = getLOVInfo(node) + print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname try: lctl.cleanup(name, uuid) except CommandError: @@ -524,6 +529,7 @@ def getLOVInfo(node): name, uuid = getNodeAttr(node) devs = node.getElementsByTagName('devices')[0] stripe_sz = int(devs.getAttribute('stripesize')) + stripe_off = int(devs.getAttribute('stripeoffset')) pattern = int(devs.getAttribute('pattern')) mdcref = node.getElementsByTagName('mdc_ref')[0] mdcuuid = mdcref.getAttribute('uuidref') @@ -538,7 +544,7 @@ def getLOVInfo(node): if child.nodeName == 'osc_ref': devlist = devlist + child.getAttribute('uuidref') + " " strip_cnt = stripe_cnt + 1 - return (name, uuid, mdcuuid, stripe_cnt, stripe_sz, pattern, devlist, mdsname) + return (name, uuid, mdcuuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist, mdsname) # extract device attributes for an obd def getMDSInfo(node): @@ -563,12 +569,7 @@ def getOSTInfo(node): name, uuid = getNodeAttr(node) ref = node.getElementsByTagName('obd_ref')[0] uuid = ref.getAttribute('uuidref') - obd = lookup(node.parentNode, uuid) - if obd: - obdname = getOBDInfo(obd)[0] - else: - obdname = "OBD NOT FOUND" - return (name, uuid, obdname) + return (name, uuid, uuid) # extract device attributes for an obd def getOSCInfo(node): @@ -758,11 +759,16 @@ def cleanupProfile(lustreNode, profileNode): # # Load profile for def doHost(lustreNode, hosts, cleanFlag): + node = None for h in hosts: node = getByName(lustreNode, 'node', h) if node: break - + + if not node: + print 'No host entry found.' + return + reflist = node.getElementsByTagName('profile_ref') for r in reflist: if cleanFlag: @@ -837,11 +843,12 @@ def main(): usage() if not options.has_key('hostname'): + options['hostname'] = [] ret, host = run('hostname') if ret: print "unable to determine hostname" - else: - options['hostname'] = [string.strip(host[0])] + elif len(host) > 0: + options['hostname'].append(string.strip(host[0])) options['hostname'].append('localhost') print "configuring for host: ", options['hostname'] doHost(dom.childNodes[0], options['hostname'], options.has_key('cleanup') ) diff --git a/lustre/utils/lustre.dtd b/lustre/utils/lustre.dtd index 3b2a6db..49f3faa 100644 --- a/lustre/utils/lustre.dtd +++ b/lustre/utils/lustre.dtd @@ -38,6 +38,7 @@ diff --git a/lustre/utils/obdctl.c b/lustre/utils/obdctl.c index f347c79..acd785f 100644 --- a/lustre/utils/obdctl.c +++ b/lustre/utils/obdctl.c @@ -59,6 +59,17 @@ #include "parser.h" #include +#define SHMEM_STATS 1 +#if SHMEM_STATS +#include +#include + +#define MAX_SHMEM_COUNT 1024 +static long long *shared_counters; +static long long counter_snapshot[2][MAX_SHMEM_COUNT]; +struct timeval prev_time; +#endif + static int jt_newdev(int argc, char **argv); static int jt_attach(int argc, char **argv); static int jt_setup(int argc, char **argv); @@ -243,6 +254,95 @@ static int do_disconnect(char *func, int verbose) return rc; } +#if SHMEM_STATS +static void +shmem_setup () +{ + int shmid = shmget (IPC_PRIVATE, sizeof (counter_snapshot[0]), 0600); + + if (shmid == -1) + { + fprintf (stderr, "Can't create shared memory counters: %s\n", strerror (errno)); + return; + } + + shared_counters = (long long *)shmat (shmid, NULL, 0); + + if (shared_counters == (long long *)(-1)) + { + fprintf (stderr, "Can't attach shared memory counters: %s\n", strerror (errno)); + shared_counters = NULL; + return; + } +} + +static inline void +shmem_reset () +{ + if (shared_counters == NULL) + return; + + memset (shared_counters, 0, sizeof (counter_snapshot[0])); + memset (counter_snapshot, 0, sizeof (counter_snapshot)); + gettimeofday (&prev_time, NULL); +} + +static inline void +shmem_bump () +{ + if (shared_counters == NULL || + thread <= 0 || thread > MAX_SHMEM_COUNT) + return; + + shared_counters[thread - 1]++; +} + +static void +shmem_snap (int n) +{ + struct timeval this_time; + int non_zero = 0; + long long total = 0; + double secs; + int i; + + if (shared_counters == NULL || n > MAX_SHMEM_COUNT) + return; + + memcpy (counter_snapshot[1], counter_snapshot[0], n * sizeof (counter_snapshot[0][0])); + memcpy (counter_snapshot[0], shared_counters, n * sizeof (counter_snapshot[0][0])); + gettimeofday (&this_time, NULL); + + for (i = 0; i < n; i++) + { + long long this_count = counter_snapshot[0][i] - counter_snapshot[1][i]; + + if (this_count != 0) + { + non_zero++; + total += this_count; + } + } + + secs = (this_time.tv_sec + this_time.tv_usec/1000000.0) - + (prev_time.tv_sec + prev_time.tv_usec/1000000.0); + + printf ("%d/%d Total: %f/second\n", non_zero, n, total / secs); + + prev_time = this_time; +} + +#define SHMEM_SETUP() shmem_setup() +#define SHMEM_RESET() shmem_reset() +#define SHMEM_BUMP() shmem_bump() +#define SHMEM_SNAP(n) shmem_snap(n) +#else +#define SHMEM_SETUP() +#define SHMEM_RESET() +#define SHMEM_BUMP() +#define SHMEM_SNAP(n) +#endif + extern command_t cmdlist[]; static int xml_command(char *cmd, ...) { @@ -828,7 +928,7 @@ static int jt__threads(int argc, char **argv) { int threads, next_thread; int verbose; - int i, j; + int i; if (argc < 5) { fprintf(stderr, @@ -841,9 +941,12 @@ static int jt__threads(int argc, char **argv) verbose = get_verbose(argv[2]); - printf("%s: starting %d threads on device %s running %s\n", - argv[0], threads, argv[3], argv[4]); + if (verbose != 0) + printf("%s: starting %d threads on device %s running %s\n", + argv[0], threads, argv[3], argv[4]); + SHMEM_RESET(); + for (i = 1, next_thread = verbose; i <= threads; i++) { rc = fork(); if (rc < 0) { @@ -861,14 +964,23 @@ static int jt__threads(int argc, char **argv) } if (!thread) { /* parent process */ - if (!verbose) - printf("%s: started %d threads\n\n", argv[0], i - 1); - else - printf("\n"); - - for (j = 1; j < i; j++) { - int status; - int ret = wait(&status); + int live_threads = threads; + + while (live_threads > 0) + { + int status; + pid_t ret; + + ret = waitpid (0, &status, verbose < 0 ? WNOHANG : 0); + if (ret == 0) + { + if (verbose >= 0) + abort (); + + sleep (-verbose); + SHMEM_SNAP (threads); + continue; + } if (ret < 0) { fprintf(stderr, "error: %s: wait - %s\n", @@ -889,6 +1001,8 @@ static int jt__threads(int argc, char **argv) argv[0], ret, err); if (!rc) rc = err; + + live_threads--; } } } @@ -1165,6 +1279,7 @@ static int jt_create(int argc, char **argv) for (i = 1, next_count = verbose; i <= count ; i++) { rc = ioctl(fd, OBD_IOC_CREATE , &data); + SHMEM_BUMP(); if (rc < 0) { fprintf(stderr, "error: %s: #%d - %s\n", cmdname(argv[0]), i, strerror(rc = errno)); @@ -1272,11 +1387,13 @@ static int jt_test_getattr(int argc, char **argv) gettimeofday(&start, NULL); next_time.tv_sec = start.tv_sec - verbose; next_time.tv_usec = start.tv_usec; - printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]), - count, ctime(&start.tv_sec)); + if (verbose != 0) + printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]), + count, ctime(&start.tv_sec)); for (i = 1, next_count = verbose; i <= count; i++) { rc = ioctl(fd, OBD_IOC_GETATTR , &data); + SHMEM_BUMP(); if (rc < 0) { fprintf(stderr, "error: %s: #%d - %s\n", cmdname(argv[0]), i, strerror(rc = errno)); @@ -1296,9 +1413,10 @@ static int jt_test_getattr(int argc, char **argv) diff = difftime(&end, &start); --i; - printf("%s: %d attrs in %.4gs (%.4g attr/s): %s", - cmdname(argv[0]), i, diff, (double)i / diff, - ctime(&end.tv_sec)); + if (verbose != 0) + printf("%s: %d attrs in %.4gs (%.4g attr/s): %s", + cmdname(argv[0]), i, diff, (double)i / diff, + ctime(&end.tv_sec)); } return rc; } @@ -1366,9 +1484,10 @@ static int jt_test_brw(int argc, char **argv) next_time.tv_sec = start.tv_sec - verbose; next_time.tv_usec = start.tv_usec; - printf("%s: %s %d (%dx%d pages) (testing only): %s", - cmdname(argv[0]), write ? "writing" : "reading", - count, obdos, pages, ctime(&start.tv_sec)); + if (verbose != 0) + printf("%s: %s %d (%dx%d pages) (testing only): %s", + cmdname(argv[0]), write ? "writing" : "reading", + count, obdos, pages, ctime(&start.tv_sec)); /* * We will put in the start time (and loop count inside the loop) @@ -1393,6 +1512,7 @@ static int jt_test_brw(int argc, char **argv) } rc = ioctl(fd, rw, &data); + SHMEM_BUMP(); if (rc) { fprintf(stderr, "error: %s: #%d - %s on %s\n", cmdname(argv[0]), i, strerror(rc = errno), @@ -1414,10 +1534,11 @@ static int jt_test_brw(int argc, char **argv) diff = difftime(&end, &start); --i; - printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s", - cmdname(argv[0]), write ? "wrote" : "read", obdos, - pages, i, diff, (double)obdos * i * pages / diff, - ctime(&end.tv_sec)); + if (verbose != 0) + printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s", + cmdname(argv[0]), write ? "wrote" : "read", obdos, + pages, i, diff, (double)obdos * i * pages / diff, + ctime(&end.tv_sec)); } return rc; } @@ -1430,6 +1551,9 @@ static int jt_lov_config(int argc, char **argv) int size, i; IOCINIT(data); + printf("WARNING: obdctl lovconfig NOT MAINTAINED\n"); + return -1; + if (argc <= 5 ){ Parser_printhelp("lovconfig"); return -1; @@ -1443,8 +1567,8 @@ static int jt_lov_config(int argc, char **argv) memset(&desc, 0, sizeof(desc)); strcpy(desc.ld_uuid, argv[1]); - desc.ld_default_stripecount = strtoul(argv[2], NULL, 0); - desc.ld_default_stripesize = strtoul(argv[3], NULL, 0); + desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); + desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0); desc.ld_pattern = strtoul(argv[4], NULL, 0); desc.ld_tgt_count = argc - 5; @@ -1590,7 +1714,9 @@ int main(int argc, char **argv) sigact.sa_flags = SA_RESTART; sigaction(SIGINT, &sigact, NULL); - + setlinebuf (stdout); + SHMEM_SETUP(); + if (argc > 1) { rc = Parser_execarg(argc - 1, argv + 1, cmdlist); } else { diff --git a/lustre/utils/parser.c b/lustre/utils/parser.c index 0118e68..0ecfd42d 100644 --- a/lustre/utils/parser.c +++ b/lustre/utils/parser.c @@ -28,7 +28,6 @@ #include #include -#include #ifdef HAVE_LIBREADLINE #define READLINE_LIBRARY #include -- 1.8.3.1