case ${host_cpu} in
um )
AC_MSG_RESULT($host_cpu)
- KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common '
+ KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common '
KCPPFLAGS='-D__KERNEL__ -U__i386__ -Ui386 -DUM_FASTCALL -D__arch_um__ -DSUBARCH="i386" -DNESTING=0 -D_LARGEFILE64_SOURCE -Derrno=kernel_errno -DPATCHLEVEL=4 -DMODULE -I$(LINUX)/arch/um/include '
MOD_LINK=elf_i386
;;
demodir='$(docdir)/demo'
AC_SUBST(demodir)
-AM_CONFIG_HEADER(include/config.h)
+# not needed until the AC_CHECK_LIB(readline) above works
+# AM_CONFIG_HEADER(include/config.h)
AC_OUTPUT(Makefile lib/Makefile ldlm/Makefile \
obdecho/Makefile ptlrpc/Makefile \
#define LDLM_FL_CBPENDING (1 << 4)
#define LDLM_FL_AST_SENT (1 << 5)
#define LDLM_FL_DESTROYED (1 << 6)
+#define LDLM_FL_WAIT_NOREPROC (1 << 7)
#define L2B(c) (1 << c)
struct ldlm_lock_desc *new, void *data,
__u32 data_len);
+typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags);
+
struct ldlm_lock {
__u64 l_random;
int l_refc;
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
- ldlm_lock_callback l_completion_ast;
+ ldlm_completion_callback l_completion_ast;
ldlm_lock_callback l_blocking_ast;
struct ptlrpc_connection *l_connection;
struct ptlrpc_client *l_client;
+ struct lustre_handle *l_connh;
__u32 l_flags;
struct lustre_handle l_remote_handle;
void *l_data;
int w_blocking;
struct ldlm_lock_desc w_desc;
struct list_head w_list;
+ int w_flags;
void *w_data;
int w_datalen;
};
extern char *ldlm_lockname[];
extern char *ldlm_typename[];
+extern char *ldlm_it2str(int it);
#define LDLM_DEBUG(lock, format, a...) \
do { \
int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *);
+/* ldlm_lockd.c */
+int ldlm_handle_enqueue(struct ptlrpc_request *req);
+int ldlm_handle_convert(struct ptlrpc_request *req);
+int ldlm_handle_cancel(struct ptlrpc_request *req);
+
/* ldlm_lock.c */
+void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie,
+ ldlm_mode_t mode, void *data));
+void ldlm_unregister_intent(void);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
__u32 data_len);
ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie,
int cookie_len, int *flags,
- ldlm_lock_callback completion,
+ ldlm_completion_callback completion,
ldlm_lock_callback blocking);
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags);
int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]);
/* ldlm_request.c */
-int ldlm_cli_enqueue(struct ptlrpc_client *cl,
- struct ptlrpc_connection *peer,
- struct lustre_handle *connh,
+int ldlm_completion_ast(struct ldlm_lock *lock, int flags);
+int ldlm_cli_enqueue(struct lustre_handle *conn,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
void *cookie, int cookielen,
ldlm_mode_t mode,
int *flags,
+ ldlm_completion_callback completion,
ldlm_lock_callback callback,
void *data,
__u32 data_len,
struct lustre_handle *lockh);
-int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn,
- struct lustre_handle *connh,
+int ldlm_match_or_enqueue(struct lustre_handle *connh,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
void *cookie, int cookielen,
ldlm_mode_t mode,
int *flags,
+ ldlm_completion_callback completion,
ldlm_lock_callback callback,
void *data,
__u32 data_len,
struct lustre_handle *lockh);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
void *data, __u32 data_len);
-int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
- struct lustre_handle *connh, int new_mode, int *flags);
+int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags);
int ldlm_cli_cancel(struct lustre_handle *lockh);
#endif /* __KERNEL__ */
__u64 l_object_id;
};
+#define LOV_MAGIC 0x0BD00BD0
+
struct lov_stripe_md {
- __u64 lmd_magic;
- __u64 lmd_object_id; /* lov object id */
- __u64 lmd_stripe_count;
- __u32 lmd_size;
- __u32 lmd_stripe_size;
+ __u32 lmd_magic;
+ __u32 lmd_easize; /* packed size of extended */
+ __u64 lmd_object_id; /* lov object id */
+ __u64 lmd_stripe_offset; /* offset of the stripe */
+ __u64 lmd_stripe_size; /* size of the stripe */
+ __u32 lmd_stripe_count; /* how many objects are being striped */
__u32 lmd_stripe_pattern; /* per-lov object stripe pattern */
struct lov_object_id lmd_objects[0];
};
* LOV data structures
*/
-#define LOV_RAID0 0
+#define LOV_RAID0 0
+#define LOV_RAIDRR 1
+
struct lov_desc {
- __u32 ld_tgt_count; /* how many OBD's */
- __u32 ld_default_stripecount;
- __u32 ld_default_stripesize; /* in bytes */
- __u32 ld_pattern; /* RAID 0,1 etc */
+ __u32 ld_tgt_count; /* how many OBD's */
+ __u32 ld_default_stripe_count; /* how many objects are used */
+ __u64 ld_default_stripe_size; /* in bytes */
+ __u64 ld_default_stripe_offset; /* in bytes */
+ __u32 ld_pattern; /* RAID 0,1 etc */
uuid_t ld_uuid;
};
/*
* LDLM requests:
*/
-
-/* opcodes */
-#define LDLM_REPLY 0
-#define LDLM_ENQUEUE 1
-#define LDLM_CONVERT 2
-#define LDLM_CANCEL 3
-#define LDLM_CALLBACK 4
+/* opcodes -- MUST be distinct from OST/MDS opcodes */
+#define LDLM_ENQUEUE 101
+#define LDLM_CONVERT 102
+#define LDLM_CANCEL 103
+#define LDLM_CALLBACK 104
#define RES_NAME_SIZE 3
#define RES_VERSION_SIZE 4
#else
# include <asm/semaphore.h>
#endif
-
#include <linux/portals_lib.h>
#include <linux/lustre_idl.h>
#ifdef __KERNEL__
/* l_net.c */
struct ptlrpc_request;
+struct obd_device;
int target_handle_connect(struct ptlrpc_request *req);
int target_handle_disconnect(struct ptlrpc_request *req);
+int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd);
+int client_obd_disconnect(struct lustre_handle *conn);
+int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf);
+int client_obd_cleanup(struct obd_device * obddev);
+struct client_obd *client_conn2cli(struct lustre_handle *conn);
/* l_lock.c */
struct lustre_lock {
return &(ll_s2sbi(sb))->ll_osc_conn;
}
-static inline struct mdc_obd *sbi2mdc(struct ll_sb_info *sbi)
+static inline struct client_obd *sbi2mdc(struct ll_sb_info *sbi)
{
struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn);
if (obd == NULL)
LBUG();
- return &obd->u.mdc;
+ return &obd->u.cli;
}
static inline struct ll_sb_info *ll_i2sbi(struct inode *inode)
void mds_pack_inode2body(struct mds_body *body, struct inode *inode);
/* mds/handler.c */
-struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir,
struct vfsmount **mnt, char *name,
int namelen, int lock_mode,
struct lustre_handle *lockh,
int dir_lock_mode);
-struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
struct vfsmount **mnt, int lock_mode,
struct lustre_handle *lockh);
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
__u32 b_flags;
struct dentry *b_dentry;
int (*b_cb)(struct ptlrpc_bulk_page *);
-
- ptl_md_t b_md;
- ptl_handle_md_t b_md_h;
- ptl_handle_me_t b_me_h;
};
struct ptlrpc_bulk_desc {
wait_queue_head_t b_waitq;
struct list_head b_page_list;
__u32 b_page_count;
- atomic_t b_pages_remaining;
atomic_t b_refcount;
void *b_desc_private;
struct tq_struct b_queue;
+
+ ptl_md_t b_md;
+ ptl_handle_md_t b_md_h;
+ ptl_handle_me_t b_me_h;
+
+ struct iovec b_iov[16]; /* self-sized pre-allocated iov */
};
struct ptlrpc_thread {
req->rq_reqmsg->addr = h->addr;
req->rq_reqmsg->cookie = h->cookie;
}
-struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn,
- struct lustre_handle *handle,
+struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn,
int opcode, int count, int *lengths,
char **bufs);
struct mds_client_info;
struct mds_server_data;
-struct mdc_obd {
- struct ptlrpc_client *mdc_client;
- struct ptlrpc_client *mdc_ldlm_client;
- struct ptlrpc_connection *mdc_conn;
- int mdc_max_mdsize;
- __u8 mdc_target_uuid[37];
+struct client_obd {
+ struct ptlrpc_client *cl_client;
+ struct ptlrpc_client *cl_ldlm_client;
+ struct ptlrpc_connection *cl_conn;
+ struct lustre_handle cl_exporth;
+ struct semaphore cl_sem;
+ int cl_conn_count;
+ __u8 cl_target_uuid[37];
+ int cl_max_mdsize;
};
+#if 0
struct osc_obd {
struct ptlrpc_client *osc_client;
struct ptlrpc_client *osc_ldlm_client;
struct ptlrpc_connection *osc_conn;
__u8 osc_target_uuid[37];
};
+#endif
struct mds_obd {
- struct ldlm_namespace *mds_local_namespace;
struct ptlrpc_service *mds_service;
- struct ptlrpc_client *mds_ldlm_client; /* to be an LDLM client */
- struct ptlrpc_connection *mds_ldlm_conn; /* to be an LDLM client */
- struct lustre_handle mds_connh; /* to be one's on DLM client */
char *mds_fstype;
struct super_block *mds_sb;
struct ext2_obd ext2;
struct filter_obd filter;
struct mds_obd mds;
- struct mdc_obd mdc;
+ struct client_obd cli;
struct ost_obd ost;
- struct osc_obd osc;
+ // struct osc_obd osc;
struct ldlm_obd ldlm;
struct echo_obd echo;
struct recovd_obd recovd;
struct lov_stripe_md **ea);
int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *ea);
- int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa);
- int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa);
+ int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *ea);
+ int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *ea);
int (*o_open)(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *);
int (*o_close)(struct lustre_handle *conn, struct obdo *oa,
RETURN(rc);
}
-static inline int obd_getattr(struct lustre_handle *conn, struct obdo *obdo)
+static inline int obd_getattr(struct lustre_handle *conn,
+ struct obdo *obdo,
+ struct lov_stripe_md *ea)
{
int rc;
struct obd_export *export;
OBD_CHECK_SETUP(conn, export);
OBD_CHECK_OP(export->exp_obd,getattr);
- rc = OBP(export->exp_obd, getattr)(conn, obdo);
+ rc = OBP(export->exp_obd, getattr)(conn, obdo, ea);
RETURN(rc);
}
RETURN(rc);
}
-static inline int obd_setattr(struct lustre_handle *conn, struct obdo *obdo)
+static inline int obd_setattr(struct lustre_handle *conn,
+ struct obdo *obdo,
+ struct lov_stripe_md *ea)
{
int rc;
struct obd_export *export;
OBD_CHECK_SETUP(conn, export);
OBD_CHECK_OP(export->exp_obd,setattr);
- rc = OBP(export->exp_obd, setattr)(conn, obdo);
+ rc = OBP(export->exp_obd, setattr)(conn, obdo, ea);
RETURN(rc);
}
l_lock.c:
test -e l_lock.c || ln -sf $(top_srcdir)/lib/l_lock.c
+l_net.c:
+ test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c
-LINX=l_lock.c
+LINX=l_lock.c l_net.c
ldlm_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \
-ldlm_extent.c ldlm_request.c
+ldlm_extent.c ldlm_request.c l_net.c
include $(top_srcdir)/Rules
[LDLM_MDSINTENT] "INT"
};
+char *ldlm_it2str(int it)
+{
+ switch (it) {
+ case IT_OPEN:
+ return "open";
+ case IT_CREAT:
+ return "creat";
+ case (IT_OPEN|IT_CREAT):
+ return "open|creat";
+ case IT_MKDIR:
+ return "mkdir";
+ case IT_LINK:
+ return "link";
+ case IT_SYMLINK:
+ return "symlink";
+ case IT_UNLINK:
+ return "unlink";
+ case IT_RMDIR:
+ return "rmdir";
+ case IT_RENAME:
+ return "rename";
+ case IT_RENAME2:
+ return "rename2";
+ case IT_READDIR:
+ return "readdir";
+ case IT_GETATTR:
+ return "getattr";
+ case IT_SETATTR:
+ return "setattr";
+ case IT_READLINK:
+ return "readlink";
+ case IT_MKNOD:
+ return "mknod";
+ case IT_LOOKUP:
+ return "lookup";
+ default:
+ CERROR("Unknown intent %d\n", it);
+ return "UNKNOWN";
+ }
+}
+
extern kmem_cache_t *ldlm_lock_slab;
-int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
-int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
-static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
- ldlm_mode_t mode, void *data);
ldlm_res_compat ldlm_res_compat_table [] = {
[LDLM_PLAIN] ldlm_plain_compat,
ldlm_res_policy ldlm_res_policy_table [] = {
[LDLM_PLAIN] NULL,
[LDLM_EXTENT] ldlm_extent_policy,
- [LDLM_MDSINTENT] ldlm_intent_policy
+ [LDLM_MDSINTENT] NULL
};
+void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie,
+ ldlm_mode_t mode, void *data))
+{
+ ldlm_res_policy_table[LDLM_MDSINTENT] = arg;
+}
+
+void ldlm_unregister_intent()
+{
+ ldlm_res_policy_table[LDLM_MDSINTENT] = NULL;
+}
/*
* REFCOUNTED LOCK OBJECTS
-static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
- ldlm_mode_t mode, void *data)
-{
- struct ptlrpc_request *req = req_cookie;
- int rc = 0;
- ENTRY;
-
- if (!req_cookie)
- RETURN(0);
-
- if (req->rq_reqmsg->bufcount > 1) {
- /* an intent needs to be considered */
- struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
- struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
- struct mds_body *mds_rep;
- struct ldlm_reply *rep;
- __u64 new_resid[3] = {0, 0, 0}, old_res;
- int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
- sizeof(struct mds_body),
- mds->mds_max_mdsize};
-
- it->opc = NTOH__u64(it->opc);
-
- LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
-
- /* prepare reply */
- switch(it->opc) {
- case IT_GETATTR:
- /* Note that in the negative case you may be returning
- * a file and its obdo */
- case IT_CREAT:
- case IT_CREAT|IT_OPEN:
- case IT_LINK:
- case IT_LOOKUP:
- case IT_MKDIR:
- case IT_MKNOD:
- case IT_OPEN:
- case IT_READLINK:
- case IT_RENAME:
- case IT_RMDIR:
- case IT_SETATTR:
- case IT_SYMLINK:
- case IT_UNLINK:
- bufcount = 3;
- break;
- case IT_RENAME2:
- bufcount = 1;
- break;
- default:
- LBUG();
- }
-
- rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
- &req->rq_repmsg);
- if (rc) {
- rc = req->rq_status = -ENOMEM;
- RETURN(rc);
- }
-
- rep = lustre_msg_buf(req->rq_repmsg, 0);
- rep->lock_policy_res1 = 1;
-
- /* execute policy */
- switch (it->opc) {
- case IT_CREAT:
- case IT_CREAT|IT_OPEN:
- case IT_LINK:
- case IT_MKDIR:
- case IT_MKNOD:
- case IT_RENAME2:
- case IT_RMDIR:
- case IT_SYMLINK:
- case IT_UNLINK:
- rc = mds_reint_p(2, req);
- if (rc || req->rq_status != 0) {
- rep->lock_policy_res2 = req->rq_status;
- RETURN(ELDLM_LOCK_ABORTED);
- }
- break;
- case IT_GETATTR:
- case IT_LOOKUP:
- case IT_OPEN:
- case IT_READDIR:
- case IT_READLINK:
- case IT_RENAME:
- case IT_SETATTR:
- rc = mds_getattr_name_p(2, req);
- /* FIXME: we need to sit down and decide on who should
- * set req->rq_status, who should return negative and
- * positive return values, and what they all mean. */
- if (rc || req->rq_status != 0) {
- rep->lock_policy_res2 = req->rq_status;
- RETURN(ELDLM_LOCK_ABORTED);
- }
- break;
- case IT_READDIR|IT_OPEN:
- LBUG();
- break;
- default:
- CERROR("Unhandled intent\n");
- LBUG();
- }
-
- if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
- it->opc == IT_RENAME || it->opc == IT_RENAME2)
- RETURN(ELDLM_LOCK_ABORTED);
-
- rep->lock_policy_res2 = req->rq_status;
- mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
- new_resid[0] = NTOH__u32(mds_rep->ino);
- if (new_resid[0] == 0)
- LBUG();
- old_res = lock->l_resource->lr_name[0];
-
- CDEBUG(D_INFO, "remote intent: locking %d instead of"
- "%ld\n", mds_rep->ino, (long)old_res);
-
- ldlm_lock_change_resource(lock, new_resid);
- if (lock->l_resource == NULL) {
- LBUG();
- RETURN(-ENOMEM);
- }
- LDLM_DEBUG(lock, "intent policy, old res %ld",
- (long)old_res);
- RETURN(ELDLM_LOCK_CHANGED);
- } else {
- int size = sizeof(struct ldlm_reply);
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
- &req->rq_repmsg);
- if (rc) {
- CERROR("out of memory\n");
- LBUG();
- RETURN(-ENOMEM);
- }
- }
- RETURN(rc);
-}
static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
{
EXIT;
}
+/* returns a referenced lock or NULL */
static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
struct ldlm_extent *extent)
{
/* Must be called with no resource or lock locks held.
*
* Returns 1 if it finds an already-existing lock that is compatible; in this
- * case, lockh is filled in with a addref()ed lock */
+ * case, lockh is filled in with a addref()ed lock
+*/
int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
void *cookie, int cookielen, ldlm_mode_t mode,
struct lustre_handle *lockh)
if (lock) {
ldlm_lock2handle(lock, lockh);
- wait_event(lock->l_waitq,
- lock->l_req_mode == lock->l_granted_mode);
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
}
if (rc)
LDLM_DEBUG(lock, "matched");
ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
void *cookie, int cookie_len,
int *flags,
- ldlm_lock_callback completion,
+ ldlm_completion_callback completion,
ldlm_lock_callback blocking)
{
struct ldlm_resource *res;
rc = w->w_lock->l_blocking_ast
(&lockh, &w->w_desc, w->w_data, w->w_datalen);
else
- rc = w->w_lock->l_completion_ast
- (&lockh, NULL, w->w_data, w->w_datalen);
+ rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
if (rc)
CERROR("Failed AST - should clean & disconnect "
"client\n");
res->lr_tmp = NULL;
granted = 1;
/* FIXME: completion handling not with ns_lock held ! */
- wake_up(&lock->l_waitq);
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, 0);
}
} else
list_add(&lock->l_res_link, res->lr_converting.prev);
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-static int ldlm_handle_enqueue(struct ptlrpc_request *req)
+int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+{
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct ptlrpc_client *cl;
+ int rc = 0, size = sizeof(*body);
+ ENTRY;
+
+ if (lock == NULL) {
+ LBUG();
+ RETURN(-EINVAL);
+ }
+
+ cl = &lock->l_resource->lr_namespace->ns_rpc_client;
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+ &size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+ body->lock_flags = flags;
+
+ LDLM_DEBUG(lock, "server preparing completion AST");
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+ RETURN(rc);
+}
+
+int ldlm_handle_enqueue(struct ptlrpc_request *req)
{
struct obd_device *obddev = req->rq_export->exp_obd;
struct ldlm_reply *dlm_rep;
flags = dlm_req->lock_flags;
err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
- ldlm_server_ast, ldlm_server_ast);
+ ldlm_server_completion_ast, ldlm_server_ast);
if (err != ELDLM_OK)
GOTO(out, err);
return 0;
}
-static int ldlm_handle_convert(struct ptlrpc_request *req)
+int ldlm_handle_convert(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_reply *dlm_rep;
RETURN(0);
}
-static int ldlm_handle_cancel(struct ptlrpc_request *req)
+int ldlm_handle_cancel(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
if (!lock) {
+ LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
+ (void *)(unsigned long) dlm_req->lock_handle1.addr);
req->rq_status = ESTALE;
} else {
LDLM_DEBUG(lock, "server-side cancel handler START");
ldlm_reprocess_all(lock->l_resource);
LDLM_DEBUG(lock, "server-side cancel handler END");
LDLM_LOCK_PUT(lock);
- } else
- LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
- lock);
+ }
RETURN(0);
}
}
LDLM_LOCK_PUT(lock);
} else {
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ struct list_head ast_list = LIST_HEAD_INIT(ast_list);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
/* If we receive the completion AST before the actual enqueue
* returned, then we might need to switch resources. */
+ ldlm_resource_unlink_lock(lock);
if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
lock->l_resource->lr_name,
sizeof(__u64) * RES_NAME_SIZE) != 0) {
ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
LDLM_DEBUG(lock, "completion AST, new resource");
}
- lock->l_resource->lr_tmp = &rpc_list;
- ldlm_resource_unlink_lock(lock);
+ lock->l_resource->lr_tmp = &ast_list;
ldlm_grant_lock(lock);
- /* FIXME: we want any completion function, not just wake_up */
- wake_up(&lock->l_waitq);
lock->l_resource->lr_tmp = NULL;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_LOCK_PUT(lock);
- ldlm_run_ast_work(&rpc_list);
+ ldlm_run_ast_work(&ast_list);
}
LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
RETURN(0);
}
-static int ldlm_handle(struct ptlrpc_request *req)
+
+static int ldlm_callback_handler(struct ptlrpc_request *req)
{
int rc;
ENTRY;
req->rq_reqmsg->type);
GOTO(out, rc = -EINVAL);
}
-
- if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
- CERROR("No export handle for enqueue request.\n");
- GOTO(out, rc = -ENOTCONN);
- }
-
switch (req->rq_reqmsg->opc) {
- case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = ldlm_handle_enqueue(req);
- break;
-
- case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
- rc = ldlm_handle_convert(req);
- break;
-
- case LDLM_CANCEL:
- CDEBUG(D_INODE, "cancel\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
- rc = ldlm_handle_cancel(req);
- break;
-
case LDLM_CALLBACK:
CDEBUG(D_INODE, "callback\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
return 0;
}
+
static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
void *karg, void *uarg)
{
MOD_INC_USE_COUNT;
ldlm->ldlm_service =
ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
- LDLM_REPLY_PORTAL, "self", ldlm_handle);
+ LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
if (!ldlm->ldlm_service) {
LBUG();
GOTO(out_dec, rc = -ENOMEM);
}
- if (mds_reint_p == NULL)
- mds_reint_p = inter_module_get_request("mds_reint", "mds");
- if (IS_ERR(mds_reint_p)) {
- CERROR("MDSINTENT locks require the MDS module.\n");
- GOTO(out_dec, rc = -EINVAL);
- }
- if (mds_getattr_name_p == NULL)
- mds_getattr_name_p = inter_module_get_request
- ("mds_getattr_name", "mds");
- if (IS_ERR(mds_getattr_name_p)) {
- CERROR("MDSINTENT locks require the MDS module.\n");
- GOTO(out_dec, rc = -EINVAL);
- }
-
for (i = 0; i < LDLM_NUM_THREADS; i++) {
- rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
- "lustre_dlm");
+ char name[32];
+ sprintf(name, "lustre_dlm_%2d", i);
+ rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
if (rc) {
CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
LBUG();
ptlrpc_stop_all_threads(ldlm->ldlm_service);
ptlrpc_unregister_service(ldlm->ldlm_service);
out_dec:
- if (mds_reint_p != NULL)
- inter_module_put("mds_reint");
- if (mds_getattr_name_p != NULL)
- inter_module_put("mds_getattr_name");
MOD_DEC_USE_COUNT;
return rc;
}
ptlrpc_stop_all_threads(ldlm->ldlm_service);
ptlrpc_unregister_service(ldlm->ldlm_service);
- if (mds_reint_p != NULL)
- inter_module_put("mds_reint");
- if (mds_getattr_name_p != NULL)
- inter_module_put("mds_getattr_name");
-
MOD_DEC_USE_COUNT;
RETURN(0);
}
CERROR("couldn't free ldlm lock slab\n");
}
+EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_register_intent);
+EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_match_or_enqueue);
+EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_namespace_new);
#include <linux/lustre_dlm.h>
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
- struct lustre_handle *connh,
+int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
+{
+
+ ENTRY;
+
+ if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_BLOCK_CONV)) {
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+ " sleeping");
+ ldlm_lock_dump(lock);
+ ldlm_reprocess_all(lock->l_resource);
+ wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
+ LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
+ } else if (flags == LDLM_FL_WAIT_NOREPROC) {
+ wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
+ } else if (flags == 0) {
+ wake_up(&lock->l_waitq);
+
+ }
+
+ RETURN(0);
+}
+
+
+static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+ struct lustre_handle *parent_lockh,
+ __u64 *res_id,
+ __u32 type,
+ void *cookie, int cookielen,
+ ldlm_mode_t mode,
+ int *flags,
+ ldlm_completion_callback completion,
+ ldlm_lock_callback callback,
+ void *data,
+ __u32 data_len,
+ struct lustre_handle *lockh)
+{
+ struct ldlm_lock *lock;
+ int err;
+
+ if (ns->ns_client) {
+ CERROR("Trying to cancel local lock\n");
+ LBUG();
+ }
+
+ lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0);
+ if (!lock)
+ GOTO(out_nolock, err = -ENOMEM);
+ LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
+
+ ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock2handle(lock, lockh);
+ lock->l_connh = NULL;
+
+ err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback);
+ if (err != ELDLM_OK)
+ GOTO(out, err);
+
+ if (type == LDLM_EXTENT)
+ memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
+ if ((*flags) & LDLM_FL_LOCK_CHANGED)
+ memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
+
+ LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock);
+
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, *flags);
+
+ LDLM_DEBUG(lock, "client-side local enqueue END");
+ EXIT;
+ out:
+ LDLM_LOCK_PUT(lock);
+ out_nolock:
+ return err;
+}
+
+
+int ldlm_cli_enqueue(struct lustre_handle *connh,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
void *cookie, int cookielen,
ldlm_mode_t mode,
int *flags,
+ ldlm_completion_callback completion,
ldlm_lock_callback callback,
void *data,
__u32 data_len,
struct lustre_handle *lockh)
{
+ struct ptlrpc_connection *connection;
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
int rc, size = sizeof(*body), req_passed_in = 1;
ENTRY;
+ if (connh == NULL)
+ return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
+ type, cookie, cookielen, mode,
+ flags, completion, callback, data, data_len, lockh);
+ connection = client_conn2cli(connh)->cl_conn;
+
*flags = 0;
lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
data, data_len);
ldlm_lock2handle(lock, lockh);
if (req == NULL) {
- req = ptlrpc_prep_req2(cl, conn, connh,
- LDLM_ENQUEUE, 1, &size, NULL);
+ req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req_passed_in = 0;
size = sizeof(*reply);
req->rq_replen = lustre_msg_size(1, &size);
}
-
- lock->l_connection = ptlrpc_connection_addref(conn);
- lock->l_client = cl;
-
- /* I apologize in advance for what I am about to do.
- *
- * The MDS needs to send lock requests to itself, but it doesn't want to
- * go through the whole hassle of setting up proper connections. Soon,
- * these will just be function calls, but until I have the time, just
- * set this connection to level FULL so that they go through.
- *
- * Since the MDS is the only user of PLAIN locks right now, we can use
- * that to distinguish. Sorry. */
- if (type == LDLM_PLAIN)
- conn->c_level = LUSTRE_CONN_FULL;
+ lock->l_connh = connh;
+ lock->l_connection = ptlrpc_connection_addref(connection);
+ lock->l_client = client_conn2cli(connh)->cl_client;
rc = ptlrpc_queue_wait(req);
/* FIXME: status check here? */
if (!req_passed_in)
ptlrpc_free_req(req);
- rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
+ rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
callback);
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, *flags);
- if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV)) {
- /* Go to sleep until the lock is granted. */
- /* FIXME: or cancelled. */
- LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
- " sleeping");
- ldlm_lock_dump(lock);
-#warning ldlm needs to time out
- wait_event(lock->l_waitq,
- lock->l_req_mode == lock->l_granted_mode);
- LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
- }
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
out:
return rc;
}
-int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn,
- struct lustre_handle *connh,
+int ldlm_match_or_enqueue(struct lustre_handle *connh,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
void *cookie, int cookielen,
ldlm_mode_t mode,
int *flags,
+ ldlm_completion_callback completion,
ldlm_lock_callback callback,
void *data,
__u32 data_len,
ENTRY;
rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
if (rc == 0) {
- rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
+ rc = ldlm_cli_enqueue(connh, req, ns,
parent_lock_handle, res_id, type, cookie,
- cookielen, mode, flags, callback, data,
+ cookielen, mode, flags, completion, callback, data,
data_len, lockh);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_enqueue: err: %d\n", rc);
return rc;
}
-int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
- struct lustre_handle *connh,
- int new_mode, int *flags)
+
+
+static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags)
+{
+
+ if (lock->l_resource->lr_namespace->ns_client) {
+ CERROR("Trying to cancel local lock\n");
+ LBUG();
+ }
+ LDLM_DEBUG(lock, "client-side local convert");
+
+ ldlm_lock_convert(lock, new_mode, flags);
+ ldlm_reprocess_all(lock->l_resource);
+
+ LDLM_DEBUG(lock, "client-side local convert handler END");
+ LDLM_LOCK_PUT(lock);
+ RETURN(0);
+}
+
+int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
{
struct ldlm_request *body;
+ struct lustre_handle *connh;
struct ldlm_reply *reply;
struct ldlm_lock *lock;
struct ldlm_resource *res;
RETURN(-EINVAL);
}
*flags = 0;
+ connh = lock->l_connh;
+
+ if (!connh)
+ return ldlm_cli_convert_local(lock, new_mode, flags);
LDLM_DEBUG(lock, "client-side convert");
- req = ptlrpc_prep_req(cl, lock->l_connection,
- LDLM_CONVERT, 1, &size, NULL);
+ req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
ldlm_reprocess_all(res);
- if (lock->l_req_mode != lock->l_granted_mode) {
- /* Go to sleep until the lock is granted. */
- /* FIXME: or cancelled. */
- CDEBUG(D_NET, "convert returned a blocked lock, "
- "going to sleep.\n");
- wait_event(lock->l_waitq,
- lock->l_req_mode == lock->l_granted_mode);
- CDEBUG(D_NET, "waking up, the lock must be granted.\n");
- }
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ if (lock->l_completion_ast)
+ lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
EXIT;
out:
LDLM_LOCK_PUT(lock);
struct ptlrpc_request *req;
struct ldlm_lock *lock;
struct ldlm_request *body;
- int rc, size = sizeof(*body);
+ int rc = 0, size = sizeof(*body);
ENTRY;
lock = ldlm_handle2lock(lockh);
RETURN(-EINVAL);
}
- LDLM_DEBUG(lock, "client-side cancel");
- req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
- LDLM_CANCEL, 1, &size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- memcpy(&body->lock_handle1, &lock->l_remote_handle,
- sizeof(body->lock_handle1));
-
- req->rq_replen = lustre_msg_size(0, NULL);
-
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
- if (rc != ELDLM_OK)
- GOTO(out, rc);
+ if (lock->l_connh) {
+ LDLM_DEBUG(lock, "client-side cancel");
+ req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ ldlm_lock_cancel(lock);
+ } else {
+ LDLM_DEBUG(lock, "client-side local cancel");
+ if (lock->l_resource->lr_namespace->ns_client) {
+ CERROR("Trying to cancel local lock\n");
+ LBUG();
+ }
+ ldlm_lock_cancel(lock);
+ ldlm_reprocess_all(lock->l_resource);
+ LDLM_DEBUG(lock, "client-side local cancel handler END");
+ }
- ldlm_lock_cancel(lock);
EXIT;
out:
LDLM_LOCK_PUT(lock);
static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED;
static struct list_head ctl_threads;
static int regression_running = 0;
-static struct ptlrpc_client ctl_client;
-static struct ptlrpc_connection *ctl_conn;
static int ldlm_test_callback(struct lustre_handle *lockh,
struct ldlm_lock_desc *new,
if (lock1 == NULL)
LBUG();
err = ldlm_lock_enqueue(lock1, NULL, 0, &flags,
- ldlm_test_callback, ldlm_test_callback);
+ ldlm_completion_ast, ldlm_test_callback);
if (err != ELDLM_OK)
LBUG();
if (lock == NULL)
LBUG();
err = ldlm_lock_enqueue(lock, NULL, 0, &flags,
- ldlm_test_callback, ldlm_test_callback);
+ ldlm_completion_ast, ldlm_test_callback);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_BLOCK_GRANTED))
static int ldlm_test_network(struct obd_device *obddev,
struct ptlrpc_connection *conn)
{
- struct ldlm_obd *ldlm = &obddev->u.ldlm;
__u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
struct ldlm_extent ext = {4, 6};
/* FIXME: this needs a connh as 3rd paramter, before it will work */
- err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, NULL, NULL,
- obddev->obd_namespace, NULL, res_id, LDLM_EXTENT,
- &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, 0,
+ err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id, LDLM_EXTENT,
+ &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, NULL, 0,
&lockh1);
CERROR("ldlm_cli_enqueue: %d\n", err);
if (err == ELDLM_OK)
get_random_bytes(&random, sizeof(random));
lock_mode = random % LCK_NL + 1;
- rc = ldlm_cli_enqueue(&ctl_client, ctl_conn, NULL, NULL, ns, NULL,
+ rc = ldlm_cli_enqueue(NULL, NULL, ns, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, ldlm_test_callback, NULL, 0,
+ &flags, ldlm_completion_ast, ldlm_test_callback, NULL, 0,
&lockh);
if (rc < 0) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
#include <linux/lustre_net.h>
#include <linux/lustre_dlm.h>
+struct client_obd *client_conn2cli(struct lustre_handle *conn)
+{
+ struct obd_export *export = class_conn2export(conn);
+ if (!export)
+ LBUG();
+ return &export->exp_obd->u.cli;
+}
+extern struct recovd_obd *ptlrpc_connmgr;
+
+int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+ struct obd_ioctl_data* data = buf;
+ int rq_portal = (obddev->obd_type->typ_ops->o_getattr) ? OST_REQUEST_PORTAL : MDS_REQUEST_PORTAL;
+ int rp_portal = (obddev->obd_type->typ_ops->o_getattr) ? OSC_REPLY_PORTAL : MDC_REPLY_PORTAL;
+ struct client_obd *mdc = &obddev->u.cli;
+ char server_uuid[37];
+ int rc;
+ ENTRY;
+
+ if (data->ioc_inllen1 < 1) {
+ CERROR("requires a TARGET UUID\n");
+ RETURN(-EINVAL);
+ }
+
+ if (data->ioc_inllen1 > 37) {
+ CERROR("client UUID must be less than 38 characters\n");
+ RETURN(-EINVAL);
+ }
+
+ if (data->ioc_inllen2 < 1) {
+ CERROR("setup requires a SERVER UUID\n");
+ RETURN(-EINVAL);
+ }
+
+ if (data->ioc_inllen2 > 37) {
+ CERROR("target UUID must be less than 38 characters\n");
+ RETURN(-EINVAL);
+ }
+
+ sema_init(&mdc->cl_sem, 1);
+ mdc->cl_conn_count = 0;
+ memcpy(mdc->cl_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
+ memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
+ sizeof(server_uuid)));
+
+ mdc->cl_conn = ptlrpc_uuid_to_connection(server_uuid);
+ if (!mdc->cl_conn)
+ RETURN(-ENOENT);
+
+ OBD_ALLOC(mdc->cl_client, sizeof(*mdc->cl_client));
+ if (mdc->cl_client == NULL)
+ GOTO(out_conn, rc = -ENOMEM);
+
+ OBD_ALLOC(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client));
+ if (mdc->cl_ldlm_client == NULL)
+ GOTO(out_client, rc = -ENOMEM);
+
+ /* XXX get recovery hooked in here again */
+ //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
+
+ ptlrpc_init_client(NULL, NULL, rq_portal, rp_portal, mdc->cl_client);
+ ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ mdc->cl_ldlm_client);
+ mdc->cl_client->cli_name = "mdc";
+ mdc->cl_ldlm_client->cli_name = "ldlm";
+ mdc->cl_max_mdsize = sizeof(struct lov_stripe_md);
+
+ MOD_INC_USE_COUNT;
+ RETURN(0);
+
+ out_client:
+ OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client));
+ out_conn:
+ ptlrpc_put_connection(mdc->cl_conn);
+ return rc;
+}
+
+int client_obd_cleanup(struct obd_device * obddev)
+{
+ struct client_obd *mdc = &obddev->u.cli;
+
+ ptlrpc_cleanup_client(mdc->cl_client);
+ OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client));
+ ptlrpc_cleanup_client(mdc->cl_ldlm_client);
+ OBD_FREE(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client));
+ ptlrpc_put_connection(mdc->cl_conn);
+
+ MOD_DEC_USE_COUNT;
+ return 0;
+}
+
+int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd)
+{
+ struct client_obd *cli = &obd->u.cli;
+ struct ptlrpc_request *request;
+ int rc, size = sizeof(cli->cl_target_uuid);
+ char *tmp = cli->cl_target_uuid;
+ int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_CONNECT : MDS_CONNECT;
+
+ ENTRY;
+ down(&cli->cl_sem);
+ MOD_INC_USE_COUNT;
+ rc = class_connect(conn, obd);
+ if (!rc)
+ cli->cl_conn_count++;
+ else {
+ MOD_DEC_USE_COUNT;
+ up(&cli->cl_sem);
+ RETURN(rc);
+ }
+
+ if (cli->cl_conn_count > 1) {
+ up(&cli->cl_sem);
+ RETURN(0);
+ }
+
+ obd->obd_namespace =
+ ldlm_namespace_new("cli", LDLM_NAMESPACE_CLIENT);
+ if (obd->obd_namespace == NULL) {
+ up(&cli->cl_sem);
+ RETURN(-ENOMEM);
+ }
+
+ request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 1, &size, &tmp);
+ if (!request)
+ GOTO(out_disco, -ENOMEM);
+
+ request->rq_level = LUSTRE_CONN_NEW;
+ request->rq_replen = lustre_msg_size(0, NULL);
+ /* Sending our local connection info breaks for local connections
+ request->rq_reqmsg->addr = conn->addr;
+ request->rq_reqmsg->cookie = conn->cookie;
+ */
+
+ rc = ptlrpc_queue_wait(request);
+ rc = ptlrpc_check_status(request, rc);
+ if (rc)
+ GOTO(out, rc);
+
+ request->rq_connection->c_level = LUSTRE_CONN_FULL;
+ cli->cl_exporth = *(struct lustre_handle *)request->rq_repmsg;
+
+ EXIT;
+ out:
+ ptlrpc_free_req(request);
+ out_disco:
+ if (rc) {
+ class_disconnect(conn);
+ MOD_DEC_USE_COUNT;
+ }
+ up(&cli->cl_sem);
+ return rc;
+}
+
+int client_obd_disconnect(struct lustre_handle *conn)
+{
+ struct obd_device *obd = class_conn2obd(conn);
+ int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_DISCONNECT : MDS_DISCONNECT;
+ struct ptlrpc_request *request = NULL;
+ int rc;
+ ENTRY;
+
+ down(&obd->u.cli.cl_sem);
+ if (!obd->u.cli.cl_conn_count) {
+ CERROR("disconnecting disconnected device (%s)\n",
+ obd->obd_name);
+ RETURN(0);
+ }
+
+ obd->u.cli.cl_conn_count--;
+ if (obd->u.cli.cl_conn_count)
+ GOTO(class_only, 0);
+
+ ldlm_namespace_free(obd->obd_namespace);
+ obd->obd_namespace = NULL;
+ request = ptlrpc_prep_req2(conn, rq_opc, 0, NULL, NULL);
+ if (!request)
+ RETURN(-ENOMEM);
+
+ request->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(request);
+ if (rc)
+ GOTO(out, rc);
+ class_only:
+ rc = class_disconnect(conn);
+ if (!rc)
+ MOD_DEC_USE_COUNT;
+ out:
+ if (request)
+ ptlrpc_free_req(request);
+ up(&obd->u.cli.cl_sem);
+ RETURN(rc);
+}
+
int target_handle_connect(struct ptlrpc_request *req)
{
struct obd_device *target;
void lov_packdesc(struct lov_desc *ld)
{
ld->ld_tgt_count = HTON__u32(ld->ld_tgt_count);
- ld->ld_default_stripecount = HTON__u32(ld->ld_default_stripecount);
- ld->ld_default_stripesize = HTON__u32(ld->ld_default_stripesize);
+ ld->ld_default_stripe_count = HTON__u32(ld->ld_default_stripe_count);
+ ld->ld_default_stripe_size = HTON__u32(ld->ld_default_stripe_size);
ld->ld_pattern = HTON__u32(ld->ld_pattern);
}
void lov_unpackdesc(struct lov_desc *ld)
{
ld->ld_tgt_count = NTOH__u32(ld->ld_tgt_count);
- ld->ld_default_stripecount = HTON__u32(ld->ld_default_stripecount);
- ld->ld_default_stripesize = HTON__u32(ld->ld_default_stripesize);
+ ld->ld_default_stripe_count = HTON__u32(ld->ld_default_stripe_count);
+ ld->ld_default_stripe_size = HTON__u32(ld->ld_default_stripe_size);
ld->ld_pattern = HTON__u32(ld->ld_pattern);
}
/* XXX object needs to be cleaned up if mdc_open fails */
/* XXX error handling appropriate here? */
if (lli->lli_smd == NULL || lli->lli_smd->lmd_object_id == 0) {
- struct mdc_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb));
+ struct client_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb));
struct inode * inode = file->f_dentry->d_inode;
lli->lli_smd = NULL;
}
oa->o_valid = OBD_MD_FLMODE;
oa->o_mode = S_IFREG | 0600;
- oa->o_easize = mdc->mdc_max_mdsize;
+ oa->o_easize = mdc->cl_max_mdsize;
rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd);
if (rc)
RETURN(rc);
struct obdo oa;
struct inode *inode;
struct lov_stripe_md *smd;
- struct ll_inode_info *ii;
+ struct ll_inode_info *ii = NULL;
if (dentry->d_it->it_disposition == 0) {
memset(&oa, 0, sizeof(oa));
RETURN(rc);
out_destroy:
- oa.o_easize = ii->lli_smd->lmd_size;
+ oa.o_easize = ii->lli_smd->lmd_easize;
err = obd_destroy(ll_i2obdconn(dir), &oa, ii->lli_smd);
if (err)
CERROR("error destroying object %Ld in error path: err = %d\n",
int err;
struct ptlrpc_request *request;
- ptlrpc_readdress_connection(sbi2mdc(sbi)->mdc_conn, "mds");
+ ptlrpc_readdress_connection(sbi2mdc(sbi)->cl_conn, "mds");
- err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->mdc_conn);
+ err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->cl_conn);
if (err) {
CERROR("cannot connect to MDS: rc = %d\n", err);
- ptlrpc_put_connection(sbi2mdc(sbi)->mdc_conn);
+ ptlrpc_put_connection(sbi2mdc(sbi)->cl_conn);
GOTO(out_disc, err = -ENOTCONN);
}
- sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_CON;
+ sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_CON;
/* XXX: need to store the last_* values somewhere */
err = mdc_getstatus(&sbi->ll_mdc_conn,
CERROR("cannot mds_connect: rc = %d\n", err);
GOTO(out_disc, err = -ENOTCONN);
}
- sbi2mdc(sbi)->mdc_client->cli_last_rcvd = last_xid;
- sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_RECOVD;
+ sbi2mdc(sbi)->cl_client->cli_last_rcvd = last_xid;
+ sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD;
out_disc:
return err;
}
- sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_FULL;
+ sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL;
recovd_cli_fixed(cli);
/* Finally, continue what we delayed since recovery started */
CDEBUG(D_INFO, "calling punch for %ld (all bytes after %Ld)\n",
(long)oa.o_id, (unsigned long long)oa.o_size);
+
oa.o_size = inode->i_size;
oa.o_id = md->lmd_object_id;
- oa.o_valid = OBD_MD_FLSIZE | OBD_MD_FLID;
- /* truncate == punch from i_size onwards */
- err = obd_punch(ll_i2obdconn(inode), &oa, md, -1 - oa.o_size, oa.o_size);
+ oa.o_valid = OBD_MD_FLID;
+ /* truncate == punch to/from start from/to end:
+ set end to -1 for that. */
+ err = obd_punch(ll_i2obdconn(inode), &oa, md, oa.o_size,
+ 0xffffffffffffffff);
if (err)
CERROR("obd_truncate fails (%d)\n", err);
else
this_char != NULL;
this_char = strtok (NULL, ",")) {
CDEBUG(D_SUPER, "this_char %s\n", this_char);
- if ( (!*ost && (*ost = ll_read_opt("ost", this_char)))||
- (!*mds && (*mds = ll_read_opt("mds", this_char))) )
+ if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))||
+ (!*mds && (*mds = ll_read_opt("mdc", this_char))) )
continue;
}
EXIT;
struct inode *root = 0;
struct obd_device *obd;
struct ll_sb_info *sbi;
- char *ost = NULL;
- char *mds = NULL;
+ char *osc = NULL;
+ char *mdc = NULL;
int err;
struct ll_fid rootfid;
struct statfs sfs;
sb->u.generic_sbp = sbi;
- ll_options(data, &ost, &mds);
+ ll_options(data, &osc, &mdc);
- if (!ost) {
- CERROR("no ost\n");
+ if (!osc) {
+ CERROR("no osc\n");
GOTO(out_free, sb = NULL);
}
- if (!mds) {
- CERROR("no mds\n");
+ if (!mdc) {
+ CERROR("no mdc\n");
GOTO(out_free, sb = NULL);
}
- obd = class_uuid2obd(mds);
+ obd = class_uuid2obd(mdc);
if (!obd) {
- CERROR("MDS %s: not setup or attached\n", mds);
+ CERROR("MDC %s: not setup or attached\n", mdc);
GOTO(out_free, sb = NULL);
}
#if 0
- err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mds_conn);
+ err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mdc_conn);
if (err) {
- CERROR("cannot connect to MDS: rc = %d\n", err);
+ CERROR("cannot connect to MDC: rc = %d\n", err);
GOTO(out_rpc, sb = NULL);
}
#endif
err = obd_connect(&sbi->ll_mdc_conn, obd);
if (err) {
- CERROR("cannot connect to %s: rc = %d\n", mds, err);
+ CERROR("cannot connect to %s: rc = %d\n", mdc, err);
GOTO(out_free, sb = NULL);
}
- sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_FULL;
+ sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL;
- obd = class_uuid2obd(ost);
+ obd = class_uuid2obd(osc);
if (!obd) {
- CERROR("OST %s: not setup or attached\n", ost);
+ CERROR("OSC %s: not setup or attached\n", osc);
GOTO(out_mdc, sb = NULL);
}
err = obd_connect(&sbi->ll_osc_conn, obd);
if (err) {
- CERROR("cannot connect to %s: rc = %d\n", ost, err);
+ CERROR("cannot connect to %s: rc = %d\n", osc, err);
GOTO(out_mdc, sb = NULL);
}
err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
&last_rcvd, &last_xid, &request);
if (err) {
- CERROR("cannot mds_connect: rc = %d\n", err);
+ CERROR("cannot mdc_connect: rc = %d\n", err);
GOTO(out_request, sb = NULL);
}
CDEBUG(D_SUPER, "rootfid %Ld\n", (unsigned long long)rootfid.id);
ptlrpc_free_req(request);
out_dev:
- if (mds)
- OBD_FREE(mds, strlen(mds) + 1);
- if (ost)
- OBD_FREE(ost, strlen(ost) + 1);
+ if (mdc)
+ OBD_FREE(mdc, strlen(mdc) + 1);
+ if (osc)
+ OBD_FREE(osc, strlen(osc) + 1);
RETURN(sb);
struct lov_stripe_md *md = lli->lli_smd;
if (md) {
- OBD_FREE(md, md->lmd_size);
+ OBD_FREE(md, md->lmd_easize);
lli->lli_smd = NULL;
}
if (lli->lli_symlink_name) {
GOTO(out, -EINVAL);
oa.o_id = md->lmd_object_id;
- oa.o_easize = md->lmd_size;
+ oa.o_easize = md->lmd_easize;
if (oa.o_id == 0) {
CERROR("This really happens\n");
/* No obdo was ever created */
inline int ll_stripe_md_size(struct super_block *sb)
{
- struct mdc_obd *mdc = sbi2mdc(ll_s2sbi(sb));
- return mdc->mdc_max_mdsize;
+ struct client_obd *mdc = sbi2mdc(ll_s2sbi(sb));
+ return mdc->cl_max_mdsize;
}
static void ll_to_inode(struct inode *dst, struct ll_inode_md *md)
if (md && md->md && md->md->lmd_stripe_count) {
struct lov_stripe_md *smd = md->md;
int size = ll_stripe_md_size(dst->i_sb);
- if (md->md->lmd_size != size) {
+ if (md->md->lmd_easize != size) {
CERROR("Striping metadata size error %ld\n",
dst->i_ino);
LBUG();
*
* Copyright (C) 2002 Cluster File Systems, Inc.
* Author: Phil Schwan <phil@off.net>
+ * Peter Braam <braam@clusterfs.com>
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
extern struct obd_device obd_dev[MAX_OBD_DEVICES];
/* obd methods */
-
static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
{
struct ptlrpc_request *req;
struct lov_obd *lov = &obd->u.lov;
struct lustre_handle mdc_conn;
uuid_t *uuidarray;
- int rc;
+ int rc, rc2;
int i;
MOD_INC_USE_COUNT;
RETURN(rc);
}
+ /* retrieve LOV metadata from MDS */
rc = obd_connect(&mdc_conn, lov->mdcobd);
if (rc) {
CERROR("cannot connect to mdc: rc = %d\n", rc);
}
rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
- obd_disconnect(&mdc_conn);
-
- if (rc) {
- CERROR("cannot get lov info %d\n", rc);
- GOTO(out, rc);
- }
-
-
- if (lov->desc.ld_tgt_count > 1000) {
- CERROR("configuration error: target count > 1000 (%d)\n",
- lov->desc.ld_tgt_count);
- GOTO(out, rc = -EINVAL);
+ rc2 = obd_disconnect(&mdc_conn);
+ if (rc || rc2) {
+ CERROR("cannot get lov info or disconnect %d/%d\n", rc, rc2);
+ GOTO(out, (rc) ? rc : rc2 );
}
+ /* sanity... */
if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
CERROR("lov uuid %s not on mds device (%s)\n",
obd->obd_uuid, lov->desc.ld_uuid);
GOTO(out, rc = -EINVAL);
}
-
+ if (lov->desc.ld_tgt_count > 1000) {
+ CERROR("configuration error: target count > 1000 (%d)\n",
+ lov->desc.ld_tgt_count);
+ GOTO(out, rc = -EINVAL);
+ }
if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
sizeof(uuid_t) * lov->desc.ld_tgt_count) {
CERROR("invalid uuid array returned\n");
for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
if (!tgt) {
- CERROR("Target %s not configured\n", uuidarray[i]);
+ CERROR("Target %s not attached\n", uuidarray[i]);
GOTO(out_mem, rc = -EINVAL);
}
+ if (!(tgt->obd_flags & OBD_SET_UP)) {
+ CERROR("Target %s not set up\n", uuidarray[i]);
+ GOTO(out_mem, rc = -EINVAL);
+ }
rc = obd_connect(&lov->tgts[i].conn, tgt);
if (rc) {
CERROR("Target %s connect error %d\n",
out_mem:
if (rc) {
for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
- int rc2;
rc2 = obd_disconnect(&lov->tgts[i].conn);
if (rc2)
CERROR("BAD: Target %s disconnect error %d\n",
out:
if (rc)
class_disconnect(conn);
-
ptlrpc_free_req(req);
return rc;
}
lov->tgts = NULL;
out_local:
-
rc = class_disconnect(conn);
if (!rc)
MOD_DEC_USE_COUNT;
-
return rc;
}
RETURN(-EINVAL);
}
- /* FIXME: we should make a connection instead perhaps to avoid
- the mdc from walking away? The fs guarantees this. */
lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
if (!lov->mdcobd) {
CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
return size;
}
+/* the LOV counts on oa->o_id to be set as the LOV object id */
static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
{
int rc = 0, i;
CERROR("lov_create needs EA for striping information\n");
RETURN(-EINVAL);
}
-
if (!export)
RETURN(-EINVAL);
lov = &export->exp_obd->u.lov;
}
md = *ea;
- md->lmd_size = oa->o_easize;
+ md->lmd_easize = oa->o_easize;
md->lmd_object_id = oa->o_id;
if (!md->lmd_stripe_count) {
- md->lmd_stripe_count = lov->desc.ld_default_stripecount;
+ md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
}
for (i = 0; i < md->lmd_stripe_count; i++) {
}
static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
-struct lov_stripe_md *ea)
+ struct lov_stripe_md *md)
{
int rc = 0, i;
struct obdo tmp;
struct obd_export *export = class_conn2export(conn);
struct lov_obd *lov;
- struct lov_stripe_md *md;
ENTRY;
- if (!ea) {
+ if (!md) {
CERROR("LOV requires striping ea for desctruction\n");
RETURN(-EINVAL);
}
RETURN(-ENODEV);
lov = &export->exp_obd->u.lov;
- md = ea;
-
for (i = 0; i < md->lmd_stripe_count; i++) {
/* create data objects with "parent" OA */
memcpy(&tmp, oa, sizeof(tmp));
RETURN(rc);
}
-#if 0
-static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
+ }
+
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
- rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
+ rc = obd_getattr(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error getattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ /* XXX can do something more sophisticated here... */
+ if (i == 0 ) {
+ obd_id id = oa->o_id;
+ memcpy(oa, &tmp, sizeof(tmp));
+ oa->o_id = id;
+ }
+ }
RETURN(rc);
}
-static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc, retval, i;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+
+ rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error setattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
RETURN(rc);
}
-static int lov_open(struct lustre_handle *conn, struct obdo *oa)
+static int lov_open(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc, retval, i;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+
+ rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error getattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
RETURN(rc);
}
-static int lov_close(struct lustre_handle *conn, struct obdo *oa)
+
+static int lov_close(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
- int rc, retval, i;
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
ENTRY;
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
-
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
}
- RETURN(rc);
-}
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+ rc = obd_close(&lov->tgts[i].conn, &tmp, NULL);
+ if (!rc) {
+ CERROR("Error getattr object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
+ RETURN(rc);
+}
-/* FIXME: maybe we'll just make one node the authoritative attribute node, then
- * we can send this 'punch' to just the authoritative node and the nodes
- * that the punch will affect. */
-static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
- obd_size count, obd_off offset)
+/* compute offset in stripe i corresponds to offset "in" */
+__u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
{
- int rc, retval, i;
- ENTRY;
-
- if (!class_conn2export(conn))
- RETURN(-EINVAL);
+ __u32 ssz = md->lmd_stripe_size;
+ /* full stripes across all * stripe size */
+ __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz;
+ __u32 off = (__u32)in % (md->lmd_stripe_count * ssz);
- for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
- rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
- offset);
- if (i == 0)
- retval = rc;
- else if (retval != rc)
- CERROR("different results on multiple OBDs!\n");
+ if ( in == 0xffffffffffffffff ) {
+ return 0xffffffffffffffff;
}
- RETURN(rc);
+ if ( (i+1) * ssz < off )
+ out += ssz;
+ else if ( i * ssz > off )
+ out += 0;
+ else
+ out += (off - (i * ssz)) % ssz;
+
+ return (__u64) out;
}
+
struct lov_callback_data {
atomic_t count;
- wait_queue_head waitq;
+ wait_queue_head_t waitq;
};
static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGTERM) ||
sigismember(&(current->pending.signal), SIGINT)) {
- cb_data->flags |= PTL_RPC_FL_INTR;
+ // FIXME XXX what here
+ // cb_data->flags |= PTL_RPC_FL_INTR;
RETURN(1);
}
if (atomic_read(&cb_data->count) == 0)
RETURN(0);
}
-/* buffer must lie in user memory here */
+
+/* FIXME: maybe we'll just make one node the authoritative attribute node, then
+ * we can send this 'punch' to just the authoritative node and the nodes
+ * that the punch will affect. */
+static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md,
+ obd_off start, obd_off end)
+{
+ int rc = 0, i;
+ struct obdo tmp;
+ struct obd_export *export = class_conn2export(conn);
+ struct lov_obd *lov;
+ ENTRY;
+
+ if (!md) {
+ CERROR("LOV requires striping ea for desctruction\n");
+ RETURN(-EINVAL);
+ }
+
+ if (!export || !export->exp_obd)
+ RETURN(-ENODEV);
+
+ lov = &export->exp_obd->u.lov;
+ for (i = 0; i < md->lmd_stripe_count; i++) {
+ __u64 starti = lov_offset(md, start, i);
+ __u64 endi = lov_offset(md, end, i);
+
+ /* create data objects with "parent" OA */
+ memcpy(&tmp, oa, sizeof(tmp));
+ oa->o_id = md->lmd_objects[i].l_object_id;
+
+ rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL,
+ starti, endi);
+ if (!rc) {
+ CERROR("Error punch object %Ld on %d\n",
+ oa->o_id, i);
+ }
+ }
+ RETURN(rc);
+}
+
+
+#if 0
static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
struct obdo **oa,
obd_count *oa_bufs, struct page **buf,
o_disconnect: lov_disconnect,
o_create: lov_create,
o_destroy: lov_destroy,
-#if 0
o_getattr: lov_getattr,
o_setattr: lov_setattr,
o_open: lov_open,
o_close: lov_close,
+#if 0
o_brw: lov_pgcache_brw,
o_punch: lov_punch,
o_enqueue: lov_enqueue,
modulefs_DATA = mdc.o
EXTRA_PROGRAMS = mdc
-LINX=mds_updates.c ll_pack.c lov_pack.c
+LINX=l_net.c mds_updates.c ll_pack.c lov_pack.c
mdc_SOURCES = mdc_request.c mdc_reint.c $(LINX)
lov_pack.c:
test -e lov_pack.c || ln -sf $(top_srcdir)/lib/lov_pack.c .
ll_pack.c:
test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c .
+l_net.c:
+ test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c .
+
mds_updates.c:
test -e mds_updates.c || ln -sf $(top_srcdir)/lib/mds_updates.c .
ENTRY;
mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
+ req = ptlrpc_prep_req2(conn,
MDS_REINT, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
struct ptlrpc_request **request)
{
struct mds_rec_create *rec;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ptlrpc_request *req;
int rc, size[3] = {sizeof(struct mds_rec_create), namelen + 1, 0};
char *tmp, *bufs[3] = {NULL, NULL, NULL};
dir->i_ino, namelen, name);
LBUG();
}
- size[2] = smd->lmd_size;
+ size[2] = smd->lmd_easize;
bufs[2] = (char *)smd;
bufcount = 3;
} else if (S_ISLNK(mode)) {
bufcount = 3;
}
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_REINT, bufcount, size, bufs);
+ req = ptlrpc_prep_req2(conn, MDS_REINT, bufcount, size, bufs);
if (!req)
RETURN(-ENOMEM);
if (S_ISREG(mode)) {
tmp = lustre_msg_buf(req->rq_reqmsg, 2);
- memcpy(tmp, smd, smd->lmd_size);
+ memcpy(tmp, smd, smd->lmd_easize);
} else if (S_ISLNK(mode)) {
tmp = lustre_msg_buf(req->rq_reqmsg, 2);
LOGL0(tgt, tgtlen, tmp);
struct inode *dir, struct inode *child, const char *name,
int namelen, struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ptlrpc_request *req;
int rc, size[2] = {sizeof(struct mds_rec_unlink), namelen + 1};
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_REINT, 2, size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL);
if (!req)
RETURN(-ENOMEM);
struct dentry *src, struct inode *dir, const char *name,
int namelen, struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ptlrpc_request *req;
int rc, size[2] = {sizeof(struct mds_rec_link), namelen + 1};
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_REINT, 2, size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL);
if (!req)
RETURN(-ENOMEM);
int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ptlrpc_request *req;
int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1,
newlen + 1};
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_REINT, 3, size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_REINT, 3, size, NULL);
if (!req)
RETURN(-ENOMEM);
struct lustre_handle **rconn)
{
struct obd_export *export;
- struct mdc_obd *mdc;
+ struct client_obd *mdc;
export = class_conn2export(conn);
if (!export)
return -ENOTCONN;
- mdc = &export->exp_obd->u.mdc;
+ mdc = &export->exp_obd->u.cli;
- *cl = mdc->mdc_client;
- *connection = mdc->mdc_conn;
+ *cl = mdc->cl_client;
+ *connection = mdc->cl_conn;
*rconn = &export->exp_rconnh;
return 0;
}
-static int mdc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection,
- struct lustre_handle **rconn)
-{
- struct obd_export *export;
- struct mdc_obd *mdc;
-
- export = class_conn2export(conn);
- if (!export)
- return -ENOTCONN;
-
- mdc = &export->exp_obd->u.mdc;
-
- *cl = mdc->mdc_ldlm_client;
- *connection = mdc->mdc_conn;
- *rconn = &export->exp_rconnh;
-
- return 0;
-}
-
-
int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
__u64 *last_committed, __u64 *last_rcvd,
__u32 *last_xid, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct mds_body *body;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
int rc, size = sizeof(*body);
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_GETSTATUS, 1, &size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_GETSTATUS, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
struct ptlrpc_request *req;
struct mds_status_req *streq;
struct lov_obd *lov = &obd->u.lov;
- struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
+ struct client_obd *mdc = &lov->mdcobd->u.cli;
struct lov_desc *desc = &lov->desc;
int rc, size[2] = {sizeof(*streq)};
ENTRY;
- mdc_con2cl(mdc_connh, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_GETLOVINFO, 1, size, NULL);
+ req = ptlrpc_prep_req2(mdc_connh, MDS_GETLOVINFO, 1, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
*uuids = lustre_msg_buf(req->rq_repmsg, 1);
lov_unpackdesc(desc);
}
- mdc->mdc_max_mdsize = sizeof(*desc) +
+ mdc->cl_max_mdsize = sizeof(*desc) +
desc->ld_tgt_count * sizeof(uuid_t);
EXIT;
ENTRY;
mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_GETATTR, 1, size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
body->valid = valid;
if (S_ISREG(type)) {
- struct mdc_obd *mdc = &class_conn2obd(conn)->u.mdc;
+ struct client_obd *mdc = &class_conn2obd(conn)->u.cli;
bufcount = 2;
- size[1] = mdc->mdc_max_mdsize;
+ size[1] = mdc->cl_max_mdsize;
} else if (valid & OBD_MD_LINKNAME) {
bufcount = 2;
size[1] = ea_size;
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_conn2obd(conn);
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
__u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
int rc, flags;
int repsize[3] = {sizeof(struct ldlm_reply),
sizeof(struct mds_body),
- obddev->u.mdc.mdc_max_mdsize};
+ obddev->u.cli.cl_max_mdsize};
struct ldlm_reply *dlm_rep;
struct ldlm_intent *lit;
ENTRY;
- LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino);
+ LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op), dir->i_ino);
switch (it->it_op) {
case IT_MKDIR:
break;
}
- mdc_con2dlmcl(conn, &cl, &connection, &rconn);
if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
size[2] = sizeof(struct mds_rec_create);
size[3] = de->d_name.len + 1;
size[4] = tgtlen + 1;
- req = ptlrpc_prep_req2(cl, connection, rconn,
- LDLM_ENQUEUE, 5, size, NULL);
+ req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL);
if (!req)
RETURN(-ENOMEM);
size[2] = sizeof(struct mds_rec_rename);
size[3] = old_de->d_name.len + 1;
size[4] = de->d_name.len + 1;
- req = ptlrpc_prep_req2(cl, connection, rconn,
- LDLM_ENQUEUE, 5, size, NULL);
+ req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL);
if (!req)
RETURN(-ENOMEM);
} else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) {
size[2] = sizeof(struct mds_rec_unlink);
size[3] = de->d_name.len + 1;
- req = ptlrpc_prep_req2(cl, connection, rconn,
- LDLM_ENQUEUE, 4, size, NULL);
+ req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL);
if (!req)
RETURN(-ENOMEM);
size[2] = sizeof(struct mds_body);
size[3] = de->d_name.len + 1;
- req = ptlrpc_prep_req2(cl, connection, rconn,
- LDLM_ENQUEUE, 4, size, NULL);
+ req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL);
if (!req)
RETURN(-ENOMEM);
/* get ready for the reply */
req->rq_replen = lustre_msg_size(3, repsize);
} else if (it->it_op == IT_READDIR) {
- req = ptlrpc_prep_req2(cl, connection, rconn,
- LDLM_ENQUEUE, 1, size, NULL);
+ req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 1, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(-1);
}
#warning FIXME: the data here needs to be different if a lock was granted for a different inode
- rc = ldlm_cli_enqueue(cl, connection, rconn, req,
- obddev->obd_namespace, NULL, res_id, lock_type,
- NULL, 0, lock_mode, &flags,
+ rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id, lock_type,
+ NULL, 0, lock_mode, &flags, ldlm_completion_ast,
(void *)mdc_lock_callback, data, datalen, lockh);
if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) {
lock_mode = 0;
if (smd != NULL) {
bufcount = 2;
- size[1] = smd->lmd_size;
+ size[1] = smd->lmd_easize;
}
mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_OPEN, bufcount, size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_OPEN, bufcount, size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
body->extra = cookie;
if (smd != NULL)
- memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_size);
+ memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_easize);
req->rq_replen = lustre_msg_size(1, size);
int mdc_close(struct lustre_handle *conn,
obd_id ino, int type, __u64 fh, struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct mds_body *body;
int rc, size = sizeof(*body);
struct ptlrpc_request *req;
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_CLOSE, 1, &size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_CLOSE, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
char *addr, struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
+ struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
struct ptlrpc_request *req = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ptlrpc_bulk_page *bulk = NULL;
CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
- mdc_con2cl(conn, &cl, &connection, &rconn);
desc = ptlrpc_prep_bulk(connection);
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_READPAGE, 1, &size, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_READPAGE, 1, &size, NULL);
if (!req)
GOTO(out2, rc = -ENOMEM);
int mdc_statfs(struct lustre_handle *conn, struct statfs *sfs,
struct ptlrpc_request **request)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct obd_statfs *osfs;
struct ptlrpc_request *req;
int rc, size = sizeof(*osfs);
ENTRY;
- mdc_con2cl(conn, &cl, &connection, &rconn);
- req = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_STATFS, 0, NULL, NULL);
+ req = ptlrpc_prep_req2(conn, MDS_STATFS, 0, NULL, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(1, &size);
return rc;
}
-static int mdc_ioctl(long cmd, struct lustre_handle *conn, int len, void *karg,
- void *uarg)
-{
-#if 0
- /* FIXME XXX : This should use the new ioc_data to pass args in */
- int err = 0;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
- struct ptlrpc_request *request;
-
- ENTRY;
-
- if (_IOC_TYPE(cmd) != IOC_REQUEST_TYPE ||
- _IOC_NR(cmd) < IOC_REQUEST_MIN_NR ||
- _IOC_NR(cmd) > IOC_REQUEST_MAX_NR ) {
- CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
- _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
- RETURN(-EINVAL);
- }
-
- ptlrpc_init_client(NULL, NULL,
- MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
-
- mdc_con2cl(conn, &cl, &connection, &rconn);
- switch (cmd) {
- case IOC_REQUEST_GETATTR: {
- CERROR("-- getting attr for ino %lu\n", arg);
- err = mdc_getattr(&cl, connection, (obd_id)arg, S_IFDIR, ~0, 0,
- &request);
- CERROR("-- done err %d\n", err);
-
- GOTO(out, err);
- }
-
- case IOC_REQUEST_READPAGE: {
- char *buf;
- OBD_ALLOC(buf, PAGE_SIZE);
- if (!buf) {
- err = -ENOMEM;
- GOTO(out, err);
- }
- CERROR("-- readpage 0 for ino %lu\n", arg);
- err = mdc_readpage(&cl, connection, arg, S_IFDIR, 0, buf,
- &request);
- CERROR("-- done err %d\n", err);
- OBD_FREE(buf, PAGE_SIZE);
-
- GOTO(out, err);
- }
-
- case IOC_REQUEST_SETATTR: {
- struct inode inode;
- struct iattr iattr;
-
- inode.i_ino = arg;
- inode.i_generation = 0;
- iattr.ia_mode = 040777;
- iattr.ia_atime = 0;
- iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
-
- err = mdc_setattr(&cl, connection, &inode, &iattr, &request);
- CERROR("-- done err %d\n", err);
-
- GOTO(out, err);
- }
-
- case IOC_REQUEST_CREATE: {
- struct inode inode;
- struct iattr iattr;
-
- inode.i_ino = arg;
- inode.i_generation = 0;
- iattr.ia_mode = 040777;
- iattr.ia_atime = 0;
- iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
-
- err = mdc_create(&cl, connection, &inode,
- "foofile", strlen("foofile"),
- NULL, 0, 0100707, 47114711,
- 11, 47, 0, NULL, &request);
- CERROR("-- done err %d\n", err);
-
- GOTO(out, err);
- }
-
- case IOC_REQUEST_OPEN: {
- __u64 fh, ino;
- copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
- CERROR("-- opening ino %llu\n", (unsigned long long)ino);
- err = mdc_open(&cl, connection, ino, S_IFDIR, O_RDONLY, 4711,
- &fh, &request);
- copy_to_user((__u64 *)arg, &fh, sizeof(fh));
- CERROR("-- done err %d (fh=%Lu)\n", err,
- (unsigned long long)fh);
-
- GOTO(out, err);
- }
-
- case IOC_REQUEST_CLOSE: {
- CERROR("-- closing ino 2, filehandle %lu\n", arg);
- err = mdc_close(&cl, connection, 2, S_IFDIR, arg, &request);
- CERROR("-- done err %d\n", err);
-
- GOTO(out, err);
- }
-
- default:
- GOTO(out, err = -EINVAL);
- }
-
- out:
- ptlrpc_free_req(request);
- ptlrpc_put_connection(connection);
- ptlrpc_cleanup_client(&cl);
-
- RETURN(err);
-#endif
- return 0;
-}
-
-static int mdc_setup(struct obd_device *obddev, obd_count len, void *buf)
-{
- struct obd_ioctl_data* data = buf;
- struct mdc_obd *mdc = &obddev->u.mdc;
- char server_uuid[37];
- int rc;
- ENTRY;
-
- if (data->ioc_inllen1 < 1) {
- CERROR("osc setup requires a TARGET UUID\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen1 > 37) {
- CERROR("mdc UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen2 < 1) {
- CERROR("mdc setup requires a SERVER UUID\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen2 > 37) {
- CERROR("mdc UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- memcpy(mdc->mdc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
- memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
- sizeof(server_uuid)));
-
- mdc->mdc_conn = ptlrpc_uuid_to_connection(server_uuid);
- if (!mdc->mdc_conn)
- RETURN(-ENOENT);
-
- OBD_ALLOC(mdc->mdc_client, sizeof(*mdc->mdc_client));
- if (mdc->mdc_client == NULL)
- GOTO(out_conn, rc = -ENOMEM);
-
- OBD_ALLOC(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
- if (mdc->mdc_ldlm_client == NULL)
- GOTO(out_client, rc = -ENOMEM);
-
- ptlrpc_init_client(NULL, NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
- mdc->mdc_client);
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- mdc->mdc_ldlm_client);
- mdc->mdc_client->cli_name = "mdc";
- mdc->mdc_ldlm_client->cli_name = "ldlm";
- mdc->mdc_max_mdsize = sizeof(struct lov_stripe_md);
- /* XXX get recovery hooked in here again */
- //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
-
- ptlrpc_init_client(ptlrpc_connmgr, NULL,
- MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
- mdc->mdc_client);
-
- MOD_INC_USE_COUNT;
- RETURN(0);
-
- out_client:
- OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
- out_conn:
- ptlrpc_put_connection(mdc->mdc_conn);
- return rc;
-}
-
-static int mdc_cleanup(struct obd_device * obddev)
-{
- struct mdc_obd *mdc = &obddev->u.mdc;
-
- ptlrpc_cleanup_client(mdc->mdc_client);
- OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
- ptlrpc_cleanup_client(mdc->mdc_ldlm_client);
- OBD_FREE(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
- ptlrpc_put_connection(mdc->mdc_conn);
-
- MOD_DEC_USE_COUNT;
- return 0;
-}
-
-static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd)
-{
- struct mdc_obd *mdc = &obd->u.mdc;
- struct ptlrpc_request *request;
- int rc, size = sizeof(mdc->mdc_target_uuid);
- char *tmp = mdc->mdc_target_uuid;
-
- ENTRY;
-
- obd->obd_namespace =
- ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT);
- if (obd->obd_namespace == NULL)
- RETURN(-ENOMEM);
-
- MOD_INC_USE_COUNT;
- rc = class_connect(conn, obd);
- if (rc)
- RETURN(rc);
-
- request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
- MDS_CONNECT, 1, &size, &tmp);
- if (!request)
- GOTO(out_disco, -ENOMEM);
-
- request->rq_level = LUSTRE_CONN_NEW;
- request->rq_replen = lustre_msg_size(0, NULL);
- /* Sending our local connection info breaks for local connections
- request->rq_reqmsg->addr = conn->addr;
- request->rq_reqmsg->cookie = conn->cookie;
- */
-
- rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
- if (rc)
- GOTO(out, rc);
-
- class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
-
- EXIT;
- out:
- ptlrpc_free_req(request);
- out_disco:
- if (rc) {
- class_disconnect(conn);
- MOD_DEC_USE_COUNT;
- }
- return rc;
-}
-
-static int mdc_disconnect(struct lustre_handle *conn)
-{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
- struct obd_device *obd = class_conn2obd(conn);
- struct ptlrpc_request *request;
- int rc;
- ENTRY;
-
- ldlm_namespace_free(obd->obd_namespace);
- mdc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- MDS_DISCONNECT, 0, NULL, NULL);
- if (!request)
- RETURN(-ENOMEM);
-
- request->rq_replen = lustre_msg_size(0, NULL);
-
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out, rc);
- rc = class_disconnect(conn);
- if (!rc)
- MOD_DEC_USE_COUNT;
- out:
- ptlrpc_free_req(request);
- return rc;
-}
-
struct obd_ops mdc_obd_ops = {
- o_setup: mdc_setup,
- o_cleanup: mdc_cleanup,
- o_connect: mdc_connect,
- o_disconnect: mdc_disconnect,
- o_iocontrol: mdc_ioctl
+ o_setup: client_obd_setup,
+ o_cleanup: client_obd_cleanup,
+ o_connect: client_obd_connect,
+ o_disconnect: client_obd_disconnect,
};
static int __init ptlrpc_request_init(void)
* by Peter Braam <braam@clusterfs.com> &
* Andreas Dilger <braam@clusterfs.com>
*
- * This server is single threaded at present (but can easily be multi threaded)
- *
*/
#define EXPORT_SYMTAB
return &req->rq_export->exp_obd->u.mds;
}
+
/* Assumes caller has already pushed into the kernel filesystem context */
static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
__u64 offset)
}
/* 'dir' is a inode for which a lock has already been taken */
-struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir,
struct vfsmount **mnt, char *name,
int namelen, int lock_mode,
struct lustre_handle *lockh,
RETURN(dchild);
res_id[0] = dchild->d_inode->i_ino;
- rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, (void *)mds_lock_callback, NULL,
+ &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
0, lockh);
if (rc != ELDLM_OK) {
l_dput(dchild);
RETURN(dchild);
}
-struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
struct vfsmount **mnt, int lock_mode,
struct lustre_handle *lockh)
{
+ struct mds_obd *mds = &obd->u.mds;
struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
int flags, rc;
__u64 res_id[3] = {0};
RETURN(de);
res_id[0] = de->d_inode->i_ino;
- rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, (void *)mds_lock_callback, NULL,
+ &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
0, lockh);
if (rc != ELDLM_OK) {
l_dput(de);
RETURN(ERR_PTR(-ESTALE));
}
- /* now to find a dentry.
- * If possible, get a well-connected one
- */
+ /* now to find a dentry. If possible, get a well-connected one */
if (mnt)
*mnt = mds->mds_vfsmnt;
spin_lock(&dcache_lock);
static int mds_getattr_name(int offset, struct ptlrpc_request *req)
{
struct mds_obd *mds = mds_req2mds(req);
+ struct obd_device *obd = req->rq_export->exp_obd;
struct obd_run_ctxt saved;
struct mds_body *body;
struct dentry *de = NULL, *dchild = NULL;
lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
res_id[0] = dir->i_ino;
- rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+ rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &lockh);
if (rc == 0) {
LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
- rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, (void *)mds_lock_callback,
+ &flags, ldlm_completion_ast, (void *)mds_lock_callback,
NULL, 0, &lockh);
if (rc != ELDLM_OK) {
CERROR("lock enqueue: err: %d\n", rc);
if (S_ISREG(inode->i_mode)) {
struct lov_stripe_md *md;
md = lustre_msg_buf(req->rq_repmsg, offset + 1);
- md->lmd_size = mds->mds_max_mdsize;
+ md->lmd_easize = mds->mds_max_mdsize;
mds_fs_get_md(mds, inode, md);
}
/* now a normal case for intent locking */
void *handle;
struct lov_stripe_md *md;
struct inode *inode = de->d_inode;
- //struct iattr iattr;
int rc;
md = lustre_msg_buf(req->rq_reqmsg, 1);
- //iattr.ia_mode = inode->i_mode;
handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
if (!handle) {
/* XXX error handling */
rc = mds_fs_set_md(mds, inode, handle, md);
- // rc = mds_fs_setattr(mds, de, handle, &iattr);
if (!rc) {
struct obd_run_ctxt saved;
push_ctxt(&saved, &mds->mds_ctxt);
rc = mds_close(req);
break;
+ case LDLM_ENQUEUE:
+ CDEBUG(D_INODE, "enqueue\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+ rc = ldlm_handle_enqueue(req);
+ if (rc)
+ break;
+ RETURN(0);
+
+ case LDLM_CONVERT:
+ CDEBUG(D_INODE, "convert\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+ rc = ldlm_handle_convert(req);
+ if (rc)
+ break;
+ RETURN(0);
+
+ case LDLM_CANCEL:
+ CDEBUG(D_INODE, "cancel\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+ rc = ldlm_handle_cancel(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_CALLBACK:
+ CDEBUG(D_INODE, "callback\n");
+ CERROR("callbacks should not happen on MDS\n");
+ LBUG();
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+ break;
+
+
default:
rc = ptlrpc_error(req->rq_svc, req);
RETURN(rc);
return rc;
}
-
+#define MDS_NUM_THREADS 8
/* mount the file system (secretly) */
static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
{
+ int i;
struct obd_ioctl_data* data = buf;
- struct obd_export *export;
struct mds_obd *mds = &obddev->u.mds;
struct vfsmount *mnt;
int rc = 0;
GOTO(err_fs, rc = -EINVAL);
}
- rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
- if (rc) {
- CERROR("cannot start thread: rc = %d\n", rc);
- GOTO(err_svc, rc);
- }
-
rc = -ENOENT;
- mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self");
- if (!mds->mds_ldlm_conn) {
- mds_cleanup(obddev);
- GOTO(err_thread, rc);
- }
-
obddev->obd_namespace =
ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
if (obddev->obd_namespace == NULL) {
LBUG();
mds_cleanup(obddev);
- GOTO(err_thread, rc);
- }
-
- mds->mds_local_namespace =
- ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
- if (mds->mds_local_namespace == NULL) {
- LBUG();
- mds_cleanup(obddev);
- GOTO(err_thread, rc);
+ GOTO(err_svc, rc);
}
- OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
- if (mds->mds_ldlm_client == NULL) {
- LBUG();
- mds_cleanup(obddev);
- GOTO(err_thread, rc);
+ for (i = 0; i < MDS_NUM_THREADS; i++) {
+ char name[32];
+ sprintf(name, "lustre_MDS_%2d", i);
+ rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
+ if (rc) {
+ CERROR("cannot start MDS thread #%d: rc %d\n", i, rc);
+ LBUG();
+ GOTO(err_thread, rc);
+ }
}
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- mds->mds_ldlm_client);
- mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor;
- mds->mds_ldlm_client->cli_name = "mds ldlm";
rc = mds_recover(obddev);
if (rc)
GOTO(err_thread, rc);
- rc = class_connect(&mds->mds_connh, obddev);
- if (rc)
- GOTO(err_thread, rc);
- export = class_conn2export(&mds->mds_connh);
- if (!export)
- LBUG();
- export->exp_connection = mds->mds_ldlm_conn;
-
RETURN(0);
struct mds_obd *mds = &obddev->u.mds;
ENTRY;
- class_disconnect(&mds->mds_connh);
-
if ( !list_empty(&obddev->obd_exports) ) {
CERROR("still has exports!\n");
mds->mds_sb = 0;
kfree(mds->mds_fstype);
- ldlm_namespace_free(mds->mds_local_namespace);
ldlm_namespace_free(obddev->obd_namespace);
- if (mds->mds_ldlm_conn != NULL)
- ptlrpc_put_connection(mds->mds_ldlm_conn);
-
- OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
-
lock_kernel();
#ifdef CONFIG_DEV_RDONLY
dev_clear_rdonly(2);
RETURN(0);
}
+static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
+ ldlm_mode_t mode, void *data)
+{
+ struct ptlrpc_request *req = req_cookie;
+ int rc = 0;
+ ENTRY;
+
+ if (!req_cookie)
+ RETURN(0);
+
+ if (req->rq_reqmsg->bufcount > 1) {
+ /* an intent needs to be considered */
+ struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+ struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
+ struct mds_body *mds_rep;
+ struct ldlm_reply *rep;
+ __u64 new_resid[3] = {0, 0, 0}, old_res;
+ int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
+ sizeof(struct mds_body),
+ mds->mds_max_mdsize};
+
+ it->opc = NTOH__u64(it->opc);
+
+ LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
+
+ /* prepare reply */
+ switch((long)it->opc) {
+ case IT_GETATTR:
+ /* Note that in the negative case you may be returning
+ * a file and its obdo */
+ case IT_CREAT:
+ case IT_CREAT|IT_OPEN:
+ case IT_LINK:
+ case IT_LOOKUP:
+ case IT_MKDIR:
+ case IT_MKNOD:
+ case IT_OPEN:
+ case IT_READLINK:
+ case IT_RENAME:
+ case IT_RMDIR:
+ case IT_SETATTR:
+ case IT_SYMLINK:
+ case IT_UNLINK:
+ bufcount = 3;
+ break;
+ case IT_RENAME2:
+ bufcount = 1;
+ break;
+ default:
+ LBUG();
+ }
+
+ rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc) {
+ rc = req->rq_status = -ENOMEM;
+ RETURN(rc);
+ }
+
+ rep = lustre_msg_buf(req->rq_repmsg, 0);
+ rep->lock_policy_res1 = 1;
+
+ /* execute policy */
+ switch ((long)it->opc) {
+ case IT_CREAT:
+ case IT_CREAT|IT_OPEN:
+ case IT_LINK:
+ case IT_MKDIR:
+ case IT_MKNOD:
+ case IT_RENAME2:
+ case IT_RMDIR:
+ case IT_SYMLINK:
+ case IT_UNLINK:
+ rc = mds_reint(2, req);
+ if (rc || req->rq_status != 0) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ break;
+ case IT_GETATTR:
+ case IT_LOOKUP:
+ case IT_OPEN:
+ case IT_READDIR:
+ case IT_READLINK:
+ case IT_RENAME:
+ case IT_SETATTR:
+ rc = mds_getattr_name(2, req);
+ /* FIXME: we need to sit down and decide on who should
+ * set req->rq_status, who should return negative and
+ * positive return values, and what they all mean. */
+ if (rc || req->rq_status != 0) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ break;
+ case IT_READDIR|IT_OPEN:
+ LBUG();
+ break;
+ default:
+ CERROR("Unhandled intent\n");
+ LBUG();
+ }
+
+ if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
+ it->opc == IT_RENAME || it->opc == IT_RENAME2)
+ RETURN(ELDLM_LOCK_ABORTED);
+
+ rep->lock_policy_res2 = req->rq_status;
+ mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+ new_resid[0] = NTOH__u32(mds_rep->ino);
+ if (new_resid[0] == 0)
+ LBUG();
+ old_res = lock->l_resource->lr_name[0];
+
+ CDEBUG(D_INFO, "remote intent: locking %d instead of"
+ "%ld\n", mds_rep->ino, (long)old_res);
+
+ ldlm_lock_change_resource(lock, new_resid);
+ if (lock->l_resource == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ LDLM_DEBUG(lock, "intent policy, old res %ld",
+ (long)old_res);
+ RETURN(ELDLM_LOCK_CHANGED);
+ } else {
+ int size = sizeof(struct ldlm_reply);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc) {
+ CERROR("out of memory\n");
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ }
+ RETURN(rc);
+}
+
+
extern int mds_iocontrol(long cmd, struct lustre_handle *conn,
int len, void *karg, void *uarg);
static int __init mds_init(void)
{
- inter_module_register("mds_reint", THIS_MODULE, &mds_reint);
- inter_module_register("mds_getattr_name", THIS_MODULE,
- &mds_getattr_name);
class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
+ ldlm_register_intent(ldlm_intent_policy);
return 0;
}
static void __exit mds_exit(void)
{
- inter_module_unregister("mds_reint");
- inter_module_unregister("mds_getattr_name");
+ ldlm_unregister_intent();
class_unregister_type(LUSTRE_MDS_NAME);
}
md->lmd_magic = cpu_to_le32(XATTR_MDS_MO_MAGIC);
rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, md,
- md->lmd_size, XATTR_CREATE);
+ md->lmd_easize, XATTR_CREATE);
}
up(&inode->i_sem);
unlock_kernel();
static int mds_extN_get_md(struct inode *inode, struct lov_stripe_md *md)
{
int rc;
- int size = md->lmd_size;
+ int size = md->lmd_easize;
lock_kernel();
down(&inode->i_sem);
struct ptlrpc_request *req)
{
struct mds_obd *mds = mds_req2mds(req);
+ struct obd_device *obd = req->rq_export->exp_obd;
struct dentry *de;
void *handle;
struct lustre_handle child_lockh;
int namelen;
/* a name was supplied by the client; fid1 is the directory */
- dir = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, LCK_PR,
+ dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
&dir_lockh);
if (!dir || IS_ERR(dir)) {
l_dput(dir);
name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
- de = mds_name2locked_dentry(mds, dir, NULL, name, namelen,
+ de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
0, &child_lockh, LCK_PR);
l_dput(dir);
if (!de || IS_ERR(de)) {
{
struct dentry *de = NULL;
struct mds_obd *mds = mds_req2mds(req);
+ struct obd_device *obd = req->rq_export->exp_obd;
struct dentry *dchild = NULL;
struct inode *dir;
void *handle;
lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
res_id[0] = dir->i_ino;
- rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+ rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &lockh);
if (rc == 0) {
LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
- rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, (void *)mds_lock_callback, NULL,
+ &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
0, &lockh);
if (rc != ELDLM_OK) {
CERROR("lock enqueue: err: %d\n", rc);
/* If i_file_acl is set, this inode has an EA */
if (S_ISREG(inode->i_mode) && inode->u.ext3_i.i_file_acl) {
md = lustre_msg_buf(req->rq_repmsg, offset + 1);
- md->lmd_size = mds->mds_max_mdsize;
+ md->lmd_easize = mds->mds_max_mdsize;
mds_fs_get_md(mds, inode, md);
}
/* now a normal case for intent locking */
struct dentry *de = NULL;
struct dentry *dchild = NULL;
struct mds_obd *mds = mds_req2mds(req);
+ struct obd_device *obd = req->rq_export->exp_obd;
char *name;
int namelen;
struct inode *dir, *inode;
/* a name was supplied by the client; fid1 is the directory */
lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
- de = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, lock_mode,
+ de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode,
&lockh);
if (IS_ERR(de)) {
LBUG();
name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
-
- dchild = mds_name2locked_dentry(mds, de, NULL, name, namelen,
- LCK_EX, &child_lockh, lock_mode);
+ dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
+ LCK_EX, &child_lockh, lock_mode);
if (IS_ERR(dchild) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) {
LBUG();
if (rc < 0) {
CDEBUG(D_INFO, "No md for ino %ld err %d\n",
inode->i_ino, rc);
- memset(md, 0, md->lmd_size);
+ memset(md, 0, md->lmd_easize);
}
}
default:
static int mds_reint_rename(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req)
{
+ struct obd_device *obd = req->rq_export->exp_obd;
struct dentry *de_srcdir = NULL;
struct dentry *de_tgtdir = NULL;
struct dentry *de_old = NULL;
lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
res_id[0] = de_srcdir->d_inode->i_ino;
- rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+ rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &srclockh);
if (rc == 0) {
LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
- rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, (void *)mds_lock_callback, NULL,
+ &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
0, &srclockh);
if (rc != ELDLM_OK) {
CERROR("lock enqueue: err: %d\n", rc);
lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
res_id[0] = de_tgtdir->d_inode->i_ino;
- rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+ rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &tgtlockh);
if (rc == 0) {
LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
- rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
- &flags, (void *)mds_lock_callback, NULL,
+ &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
0, &tgtlockh);
if (rc != ELDLM_OK) {
CERROR("lock enqueue: err: %d\n", rc);
* about to free, to force everyone to drop their
* locks. */
LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]);
- rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
- (struct lustre_handle *)&mds->mds_connh,
- NULL, mds->mds_local_namespace, NULL,
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id,
LDLM_PLAIN, NULL, 0, LCK_EX, &flags,
- (void *)mds_lock_callback, NULL, 0,
+ ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0,
&oldhandle);
if (rc)
CERROR("failed to get child inode lock (child ino %Ld, "
case OBD_IOC_GETATTR: {
obd_data2conn(&conn, data);
- err = obd_getattr(&conn, &data->ioc_obdo1);
+ err = obd_getattr(&conn, &data->ioc_obdo1, NULL);
if (err)
GOTO(out, err);
case OBD_IOC_SETATTR: {
obd_data2conn(&conn, data);
- err = obd_setattr(&conn, &data->ioc_obdo1);
+ err = obd_setattr(&conn, &data->ioc_obdo1, NULL);
if (err)
GOTO(out, err);
* by Peter Braam <braam@clusterfs.com>
*/
-static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.18 2002/07/26 16:54:55 rread Exp $";
-#define OBDECHO_VERSION "$Revision: 1.18 $"
+static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.19 2002/07/31 20:49:37 braam Exp $";
+#define OBDECHO_VERSION "$Revision: 1.19 $"
#define EXPORT_SYMTAB
#include <linux/locks.h>
#include <linux/ext2_fs.h>
#include <linux/quotaops.h>
+#include <linux/proc_fs.h>
#include <linux/init.h>
#include <asm/unistd.h>
static obd_count GEN;
static long echo_pages = 0;
+static atomic_t echo_page_rws;
+static atomic_t echo_getattrs;
+
+#define ECHO_PROC_STAT "sys/obdecho"
+
+int
+echo_proc_read (char *page, char **start, off_t off, int count, int *eof, void *data)
+{
+ int len;
+ int attrs = atomic_read (&echo_getattrs);
+ int pages = atomic_read (&echo_page_rws);
+
+ *eof = 1;
+ if (off != 0)
+ return (0);
+
+ len = sprintf (page, "%d %d\n", pages, attrs);
+
+ *start = page;
+ return (len);
+}
+
+int
+echo_proc_write (struct file *file, const char *ubuffer, unsigned long count, void *data)
+{
+ /* Ignore what we've been asked to write, and just zero the stats counters */
+ atomic_set (&echo_page_rws, 0);
+ atomic_set (&echo_getattrs, 0);
+
+ return (count);
+}
+
+void
+echo_proc_init(void)
+{
+ struct proc_dir_entry *entry = create_proc_entry (ECHO_PROC_STAT, S_IFREG | S_IRUGO | S_IWUSR, NULL);
+
+ if (entry == NULL)
+ {
+ CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
+ return;
+ }
+
+ entry->data = NULL;
+ entry->read_proc = echo_proc_read;
+ entry->write_proc = echo_proc_write;
+}
+
+void
+echo_proc_fini(void)
+{
+ remove_proc_entry(ECHO_PROC_STAT, 0);
+}
+
static int echo_connect(struct lustre_handle *conn, struct obd_device *obd)
{
int rc;
return rc;
}
-static int echo_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
memcpy(oa, &OA, sizeof(*oa));
oa->o_mode = ++GEN;
+ atomic_inc (&echo_getattrs);
+
return 0;
}
GOTO(commitrw_cleanup, rc = -EFAULT);
}
+ atomic_inc (&echo_page_rws);
+
kunmap(page);
__free_pages(page, 0);
echo_pages--;
{
printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION " info@clusterfs.com\n");
+ echo_proc_init();
+
return class_register_type(&echo_obd_ops, OBD_ECHO_DEVICENAME);
}
static void __exit obdecho_exit(void)
{
+ echo_proc_fini ();
+
CERROR("%ld prep/commitrw pages leaked\n", echo_pages);
class_unregister_type(OBD_ECHO_DEVICENAME);
}
EXIT;
}
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int filter_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct obd_device *obddev = class_conn2obd(conn);
struct dentry *dentry;
RETURN(0);
}
-static int filter_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int filter_setattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct obd_run_ctxt saved;
struct obd_device *obd = class_conn2obd(conn);
/* NB count and offset are used for punch, but not truncate */
static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md,
- obd_size count, obd_off offset)
+ obd_off start, obd_off end)
{
int error;
ENTRY;
- CDEBUG(D_INODE, "calling truncate for object #%Ld, valid = %x, "
- "o_size = %Ld\n", oa->o_id, oa->o_valid, oa->o_size);
- error = filter_setattr(conn, oa);
+ if ( end != 0xffffffffffffffff ) {
+ CERROR("PUNCH not supported, only truncate works\n");
+ }
+ CDEBUG(D_INODE, "calling truncate for object #%Ld, valid = %x, "
+ "o_size = %Ld\n", oa->o_id, oa->o_valid, start);
+ oa->o_size = start;
+ error = filter_setattr(conn, oa, NULL);
RETURN(error);
}
modulefs_DATA = osc.o
EXTRA_PROGRAMS = osc
-LINX= obd_pack.c ll_pack.c
+LINX= obd_pack.c ll_pack.c l_net.c
osc_SOURCES = osc_request.c $(LINX)
obd_pack.c:
ll_pack.c:
test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c
+l_net.c:
+ test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c .
+
dist-hook:
list='$(LINX)'; for f in $$list; do rm -f $(distdir)/$$f; done
#include <linux/obd_lov.h>
#include <linux/init.h>
-static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection,
- struct lustre_handle **rconn)
-{
- struct obd_export *export = class_conn2export(conn);
- struct osc_obd *osc = &export->exp_obd->u.osc;
-
- *cl = osc->osc_client;
- *connection = osc->osc_conn;
- *rconn = &export->exp_rconnh;
-}
-
-static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
- struct ptlrpc_connection **connection,
- struct lustre_handle **rconn)
-{
- struct obd_export *export = class_conn2export(conn);
- struct osc_obd *osc = &export->exp_obd->u.osc;
-
- *cl = osc->osc_ldlm_client;
- *connection = osc->osc_conn;
- *rconn = &export->exp_rconnh;
-}
-
-static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
-{
- struct osc_obd *osc = &obd->u.osc;
- //struct obd_import *import;
- struct ptlrpc_request *request;
- char *tmp = osc->osc_target_uuid;
- int rc, size = sizeof(osc->osc_target_uuid);
- ENTRY;
-
- /* not used yet
- OBD_ALLOC(import, sizeof(*import));
- if (!import)
- RETURN(-ENOMEM);
- */
-
- MOD_INC_USE_COUNT;
- rc = class_connect(conn, obd);
- if (rc)
- RETURN(rc);
-
- request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
- OST_CONNECT, 1, &size, &tmp);
- if (!request)
- GOTO(out_disco, rc = -ENOMEM);
-
- request->rq_level = LUSTRE_CONN_NEW;
- request->rq_replen = lustre_msg_size(0, NULL);
- request->rq_reqmsg->addr = -1;
- /* Sending our local connection info breaks for local connections
- request->rq_reqmsg->addr = conn->addr;
- request->rq_reqmsg->cookie = conn->cookie;
- */
-
- rc = ptlrpc_queue_wait(request);
- rc = ptlrpc_check_status(request, rc);
- if (rc) {
- CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
- GOTO(out, rc);
- }
-
- /* XXX eventually maybe more refinement */
- osc->osc_conn->c_level = LUSTRE_CONN_FULL;
-
- class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
-
- EXIT;
- out:
- ptlrpc_free_req(request);
- out_disco:
- if (rc) {
- class_disconnect(conn);
- MOD_DEC_USE_COUNT;
- }
- return rc;
-}
-
-static int osc_disconnect(struct lustre_handle *conn)
-{
- struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
- int rc;
- ENTRY;
-
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_DISCONNECT, 0, NULL, NULL);
- if (!request)
- RETURN(-ENOMEM);
- request->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(request);
- if (rc)
- GOTO(out, rc);
- rc = class_disconnect(conn);
- if (!rc)
- MOD_DEC_USE_COUNT;
- out:
- ptlrpc_free_req(request);
- return rc;
-}
-
-static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_GETATTR, 1, &size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_OPEN, 1, &size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_CLOSE, 1, &size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
return rc;
}
-static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
+ struct lov_stripe_md *md)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_SETATTR, 1, &size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
struct lov_stripe_md **ea)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
OBD_ALLOC(*ea, oa->o_easize);
if (!*ea)
RETURN(-ENOMEM);
- (*ea)->lmd_size = oa->o_easize;
+ (*ea)->lmd_easize = oa->o_easize;
}
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_CREATE, 1, &size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
}
static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
- struct lov_stripe_md *md, obd_size count,
- obd_off offset)
+ struct lov_stripe_md *md, obd_size start,
+ obd_size end)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_PUNCH, 1, &size, NULL);
+
+ request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
memcpy(&body->oa, oa, sizeof(*oa));
- body->oa.o_blocks = count;
- body->oa.o_valid |= OBD_MD_FLBLOCKS;
+
+ /* overload the blocks and size fields in the oa with start/end */
+ body->oa.o_blocks = start;
+ body->oa.o_size = end;
+ body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
request->rq_replen = lustre_msg_size(1, &size);
struct lov_stripe_md *ea)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_DESTROY, 1, &size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
obd_size *count, obd_off *offset, obd_flag *flags,
brw_callback_t callback, void *data)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
+ struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
struct ptlrpc_request *request = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
int rc, size[3] = {sizeof(*body)};
void *iooptr, *nioptr;
int mapped = 0;
+ __u32 xid;
ENTRY;
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(struct niobuf_remote);
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_READ, 3, size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
ost_pack_ioo(&iooptr, md, page_count);
/* end almost identical to brw_write case */
+ spin_lock(&connection->c_lock);
+ xid = ++connection->c_xid_out; /* single xid for all pages */
+ spin_unlock(&connection->c_lock);
+
for (mapped = 0; mapped < page_count; mapped++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
GOTO(out_unmap, rc = -ENOMEM);
- spin_lock(&connection->c_lock);
- bulk->b_xid = ++connection->c_xid_out;
- spin_unlock(&connection->c_lock);
+ bulk->b_xid = xid; /* single xid for all pages */
bulk->b_buf = kmap(page_array[mapped]);
bulk->b_page = page_array[mapped];
obd_off *offset, obd_flag *flags,
brw_callback_t callback, void *data)
{
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
+ struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
struct ptlrpc_request *request = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(*remote);
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_WRITE, 3, size, NULL);
+ request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
offset, flags, callback, data);
}
-static int osc_enqueue(struct lustre_handle *conn,
+static int osc_enqueue(struct lustre_handle *connh,
struct lustre_handle *parent_lock, __u64 *res_id,
__u32 type, void *extentp, int extent_len, __u32 mode,
int *flags, void *callback, void *data, int datalen,
struct lustre_handle *lockh)
{
- struct obd_device *obddev = class_conn2obd(conn);
- struct ptlrpc_connection *connection;
- struct ptlrpc_client *cl;
- struct lustre_handle *rconn;
+ struct obd_device *obddev = class_conn2obd(connh);
struct ldlm_extent *extent = extentp;
int rc;
__u32 mode2;
extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
/* Next, search for already existing extent locks that will cover us */
- osc_con2dlmcl(conn, &cl, &connection, &rconn);
+ //osc_con2dlmcl(conn, &cl, &connection, &rconn);
rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
sizeof(extent), mode, lockh);
if (rc == 1) {
if (mode == LCK_PR)
return 0;
- rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
+ rc = ldlm_cli_convert(lockh, mode, &flags);
if (rc)
LBUG();
return rc;
}
- rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
+ rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
parent_lock, res_id, type, extent, sizeof(extent),
- mode, flags, callback, data, datalen, lockh);
+ mode, flags, ldlm_completion_ast, callback, data, datalen, lockh);
return rc;
}
RETURN(0);
}
-static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
-{
- struct obd_ioctl_data* data = buf;
- struct osc_obd *osc = &obddev->u.osc;
- char server_uuid[37];
- int rc;
- ENTRY;
-
- if (data->ioc_inllen1 < 1) {
- CERROR("osc setup requires a TARGET UUID\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen1 > 37) {
- CERROR("osc TARGET UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen2 < 1) {
- CERROR("osc setup requires a SERVER UUID\n");
- RETURN(-EINVAL);
- }
-
- if (data->ioc_inllen2 > 37) {
- CERROR("osc SERVER UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
- memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
- sizeof(server_uuid)));
-
- osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
- if (!osc->osc_conn)
- RETURN(-ENOENT);
-
- obddev->obd_namespace =
- ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
- if (obddev->obd_namespace == NULL)
- GOTO(out_conn, rc = -ENOMEM);
-
- OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
- if (osc->osc_client == NULL)
- GOTO(out_ns, rc = -ENOMEM);
-
- OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
- if (osc->osc_ldlm_client == NULL)
- GOTO(out_client, rc = -ENOMEM);
-
- ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- osc->osc_client);
- ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- osc->osc_ldlm_client);
- osc->osc_client->cli_name = "osc";
- osc->osc_ldlm_client->cli_name = "ldlm";
-
- MOD_INC_USE_COUNT;
- RETURN(0);
-
- out_client:
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- out_ns:
- ldlm_namespace_free(obddev->obd_namespace);
- out_conn:
- ptlrpc_put_connection(osc->osc_conn);
- return rc;
-}
-
-static int osc_cleanup(struct obd_device * obddev)
-{
- struct osc_obd *osc = &obddev->u.osc;
-
- ldlm_namespace_free(obddev->obd_namespace);
-
- ptlrpc_cleanup_client(osc->osc_client);
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- ptlrpc_cleanup_client(osc->osc_ldlm_client);
- OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
- ptlrpc_put_connection(osc->osc_conn);
-
- MOD_DEC_USE_COUNT;
- return 0;
-}
-
static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl;
- struct ptlrpc_connection *connection;
- struct lustre_handle *rconn;
struct obd_statfs *osfs;
int rc, size = sizeof(*osfs);
ENTRY;
- osc_con2cl(conn, &cl, &connection, &rconn);
- request = ptlrpc_prep_req2(cl, connection, rconn,
- OST_STATFS, 0, NULL, NULL);
+ request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
if (!request)
RETURN(-ENOMEM);
}
struct obd_ops osc_obd_ops = {
- o_setup: osc_setup,
- o_cleanup: osc_cleanup,
+ o_setup: client_obd_setup,
+ o_cleanup: client_obd_cleanup,
o_statfs: osc_statfs,
o_create: osc_create,
o_destroy: osc_destroy,
o_setattr: osc_setattr,
o_open: osc_open,
o_close: osc_close,
- o_connect: osc_connect,
- o_disconnect: osc_disconnect,
+ o_connect: client_obd_connect,
+ o_disconnect: client_obd_disconnect,
o_brw: osc_brw,
o_punch: osc_punch,
o_enqueue: osc_enqueue,
repbody = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_getattr(conn, &repbody->oa);
+ req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
RETURN(0);
}
repbody = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_setattr(conn, &repbody->oa);
+ req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
RETURN(0);
}
void *tmp1, *tmp2, *end2;
void *desc_priv = NULL;
int reply_sent = 0;
+ struct ptlrpc_service *srv;
+ __u32 xid;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
desc->b_desc_private = desc_priv;
memcpy(&(desc->b_conn), &conn, sizeof(conn));
+ srv = req->rq_obd->u.ost.ost_service;
+ spin_lock(&srv->srv_lock);
+ xid = srv->srv_xid++; /* single xid for all pages */
+ spin_unlock(&srv->srv_lock);
+
for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
- struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
struct ptlrpc_bulk_page *bulk;
bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
GOTO(fail_bulk, rc = -ENOMEM);
- spin_lock(&srv->srv_lock);
- bulk->b_xid = srv->srv_xid++;
- spin_unlock(&srv->srv_lock);
+ bulk->b_xid = xid; /* single xid for all pages */
bulk->b_buf = lnb->addr;
bulk->b_page = lnb->page;
OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
rc = ost_statfs(req);
break;
+ case LDLM_ENQUEUE:
+ CDEBUG(D_INODE, "enqueue\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+ rc = ldlm_handle_enqueue(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_CONVERT:
+ CDEBUG(D_INODE, "convert\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+ rc = ldlm_handle_convert(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_CANCEL:
+ CDEBUG(D_INODE, "cancel\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+ rc = ldlm_handle_cancel(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_CALLBACK:
+ CDEBUG(D_INODE, "callback\n");
+ CERROR("callbacks should not happen on MDS\n");
+ LBUG();
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+ break;
+
+
+
default:
req->rq_status = -ENOTSUPP;
rc = ptlrpc_error(req->rq_svc, req);
int i;
ENTRY;
- if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
- RETURN(-ENODEV);
+ if (data->ioc_inllen1 < 1) {
+ CERROR("requires a TARGET OBD UUID\n");
+ RETURN(-EINVAL);
+ }
+ if (data->ioc_inllen1 > 37) {
+ CERROR("OBD UUID must be less than 38 characters\n");
+ RETURN(-EINVAL);
+ }
MOD_INC_USE_COUNT;
- tgt = &obd_dev[data->ioc_dev];
- if (!(tgt->obd_flags & OBD_ATTACHED) ||
+ tgt = class_uuid2obd(data->ioc_inlbuf1);
+ if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
!(tgt->obd_flags & OBD_SET_UP)) {
CERROR("device not attached or not set up (%d)\n",
data->ioc_dev);
}
for (i = 0; i < OST_NUM_THREADS; i++) {
- err = ptlrpc_start_thread(obddev, ost->ost_service,
- "lustre_ost");
+ char name[32];
+ sprintf(name, "lustre_ost_%2d", i);
+ err = ptlrpc_start_thread(obddev, ost->ost_service, name);
if (err) {
CERROR("error starting thread #%d: rc %d\n", i, err);
GOTO(error_disc, err = -EINVAL);
#define DEBUG_SUBSYSTEM S_RPC
#include <linux/obd_support.h>
+#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
OBD_ALLOC(desc, sizeof(*desc));
if (desc != NULL) {
desc->b_connection = ptlrpc_connection_addref(conn);
- atomic_set(&desc->b_pages_remaining, 0);
atomic_set(&desc->b_refcount, 1);
init_waitqueue_head(&desc->b_waitq);
INIT_LIST_HEAD(&desc->b_page_list);
+ ptl_set_inv_handle(&desc->b_md_h);
+ ptl_set_inv_handle(&desc->b_me_h);
}
return desc;
OBD_ALLOC(bulk, sizeof(*bulk));
if (bulk != NULL) {
bulk->b_desc = desc;
- ptl_set_inv_handle(&bulk->b_md_h);
- ptl_set_inv_handle(&bulk->b_me_h);
list_add_tail(&bulk->b_link, &desc->b_page_list);
desc->b_page_count++;
- atomic_inc(&desc->b_pages_remaining);
}
return bulk;
}
RETURN(request);
}
-struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn,
- struct lustre_handle *handle,
+struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn,
int opcode, int count, int *lengths,
char **bufs)
{
+ struct client_obd *clobd;
struct ptlrpc_request *req;
- req = ptlrpc_prep_req(cl, conn, opcode, count, lengths, bufs);
- ptlrpc_hdl2req(req, handle);
+ struct obd_export *export;
+
+ export = class_conn2export(conn);
+ if (!export) {
+ LBUG();
+ CERROR("NOT connected\n");
+ return NULL;
+ }
+
+ clobd = &export->exp_obd->u.cli;
+ req = ptlrpc_prep_req(clobd->cl_client, clobd->cl_conn,
+ opcode, count, lengths, bufs);
+ ptlrpc_hdl2req(req, &clobd->cl_exporth);
return req;
}
{
ENTRY;
+ LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
+
if (ev->type != PTL_EVENT_SENT) {
// XXX make sure we understand all events, including ACK's
CERROR("Unknown event %d\n", ev->type);
{
ENTRY;
+ LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* replies always contiguous */
+
if (ev->type == PTL_EVENT_SENT) {
OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
} else {
struct ptlrpc_request *req = ev->mem_desc.user_ptr;
ENTRY;
+ LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* replies always contiguous */
+
if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) {
CERROR("Reply received for freed request! Probably a missing "
"ptlrpc_abort()\n");
{
struct ptlrpc_service *service = ev->mem_desc.user_ptr;
+ LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
+
if (ev->rlength != ev->mlength)
CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
ev->mlength, ev->rlength);
static int bulk_source_callback(ptl_event_t *ev)
{
- struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
- struct ptlrpc_bulk_desc *desc = bulk->b_desc;
+ struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_page *bulk;
+ struct list_head *tmp;
+ struct list_head *next;
ENTRY;
+ /* 1 fragment for each page always */
+ LASSERT (ev->mem_desc.niov == desc->b_page_count);
+
if (ev->type == PTL_EVENT_SENT) {
CDEBUG(D_NET, "got SENT event\n");
} else if (ev->type == PTL_EVENT_ACK) {
CDEBUG(D_NET, "got ACK event\n");
- if (bulk->b_cb != NULL)
- bulk->b_cb(bulk);
- if (atomic_dec_and_test(&desc->b_pages_remaining)) {
- desc->b_flags |= PTL_BULK_FL_SENT;
- wake_up(&desc->b_waitq);
- if (desc->b_cb != NULL)
- desc->b_cb(desc, desc->b_cb_data);
+
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+ if (bulk->b_cb != NULL)
+ bulk->b_cb(bulk);
}
+ desc->b_flags |= PTL_BULK_FL_SENT;
+ wake_up(&desc->b_waitq);
+ if (desc->b_cb != NULL)
+ desc->b_cb(desc, desc->b_cb_data);
} else {
CERROR("Unexpected event type!\n");
LBUG();
static int bulk_sink_callback(ptl_event_t *ev)
{
- struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
- struct ptlrpc_bulk_desc *desc = bulk->b_desc;
+ struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_page *bulk;
+ struct list_head *tmp;
+ struct list_head *next;
+ ptl_size_t total = 0;
ENTRY;
if (ev->type == PTL_EVENT_PUT) {
- if (bulk->b_buf != ev->mem_desc.start + ev->offset)
- CERROR("bulkbuf != mem_desc -- why?\n");
- if (bulk->b_cb != NULL)
- bulk->b_cb(bulk);
- if (atomic_dec_and_test(&desc->b_pages_remaining)) {
- desc->b_flags |= PTL_BULK_FL_RCVD;
- wake_up(&desc->b_waitq);
- if (desc->b_cb != NULL)
- desc->b_cb(desc, desc->b_cb_data);
+ /* put with zero offset */
+ LASSERT (ev->offset == 0);
+ /* used iovs */
+ LASSERT ((ev->mem_desc.options & PTL_MD_IOV) != 0);
+ /* 1 fragment for each page always */
+ LASSERT (ev->mem_desc.niov == desc->b_page_count);
+
+ list_for_each_safe (tmp, next, &desc->b_page_list) {
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+ total += bulk->b_buflen;
+
+ if (bulk->b_cb != NULL)
+ bulk->b_cb(bulk);
}
+
+ LASSERT (ev->mem_desc.length == total);
+
+ desc->b_flags |= PTL_BULK_FL_RCVD;
+ wake_up(&desc->b_waitq);
+ if (desc->b_cb != NULL)
+ desc->b_cb(desc, desc->b_cb_data);
} else {
CERROR("Unexpected event type!\n");
LBUG();
return rc;
}
+static inline struct iovec *
+ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
+{
+ struct iovec *iov;
+
+ if (desc->b_page_count <= sizeof (desc->b_iov)/sizeof (struct iovec))
+ return (desc->b_iov);
+
+ OBD_ALLOC (iov, desc->b_page_count * sizeof (struct iovec));
+ if (iov == NULL)
+ LBUG();
+
+ return (iov);
+}
+
+static inline void
+ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
+{
+ if (desc->b_page_count <= sizeof (desc->b_iov)/sizeof (struct iovec))
+ return;
+
+ OBD_FREE (iov, desc->b_page_count * sizeof (struct iovec));
+}
+
int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
{
int rc;
struct list_head *tmp, *next;
ptl_process_id_t remote_id;
+ __u32 xid = 0;
+ struct iovec *iov;
ENTRY;
+ iov = ptlrpc_get_bulk_iov (desc);
+ if (iov == NULL)
+ RETURN (-ENOMEM);
+
+ desc->b_md.start = iov;
+ desc->b_md.niov = 0;
+ desc->b_md.length = 0;
+ desc->b_md.eventq = bulk_source_eq;
+ desc->b_md.threshold = 2; /* SENT and ACK */
+ desc->b_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
+ desc->b_md.user_ptr = desc;
+
list_for_each_safe(tmp, next, &desc->b_page_list) {
struct ptlrpc_bulk_page *bulk;
bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- bulk->b_md.start = bulk->b_buf;
- bulk->b_md.length = bulk->b_buflen;
- bulk->b_md.eventq = bulk_source_eq;
- bulk->b_md.threshold = 2; /* SENT and ACK */
- bulk->b_md.options = PTL_MD_OP_PUT;
- bulk->b_md.user_ptr = bulk;
-
- rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md,
- &bulk->b_md_h);
- if (rc != 0) {
- CERROR("PtlMDBind failed: %d\n", rc);
- LBUG();
- RETURN(rc);
- }
-
- remote_id.nid = desc->b_connection->c_peer.peer_nid;
- remote_id.pid = 0;
-
- CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
- bulk->b_md.length, desc->b_portal, bulk->b_xid);
-
- rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id,
- desc->b_portal, 0, bulk->b_xid, 0, 0);
- if (rc != PTL_OK) {
- CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
- remote_id.nid, desc->b_portal, bulk->b_xid, rc);
- PtlMDUnlink(bulk->b_md_h);
- LBUG();
- RETURN(rc);
- }
+ LASSERT (desc->b_md.niov < desc->b_page_count);
+
+ if (desc->b_md.niov == 0)
+ xid = bulk->b_xid;
+ LASSERT (xid == bulk->b_xid); /* should all be the same */
+
+ iov[desc->b_md.niov].iov_base = bulk->b_buf;
+ iov[desc->b_md.niov].iov_len = bulk->b_buflen;
+ desc->b_md.niov++;
+ desc->b_md.length += bulk->b_buflen;
+ }
+
+ LASSERT (desc->b_md.niov == desc->b_page_count);
+ LASSERT (desc->b_md.niov != 0);
+
+ rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, desc->b_md,
+ &desc->b_md_h);
+
+ ptlrpc_put_bulk_iov (desc, iov); /* move down to reduce latency to send */
+
+ if (rc != PTL_OK) {
+ CERROR("PtlMDBind failed: %d\n", rc);
+ LBUG();
+ RETURN(rc);
+ }
+
+ remote_id.nid = desc->b_connection->c_peer.peer_nid;
+ remote_id.pid = 0;
+
+ CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid %Lx pid %d xid %d\n",
+ desc->b_md.niov, desc->b_md.length,
+ desc->b_portal, remote_id.nid, remote_id.pid, xid);
+
+ rc = PtlPut(desc->b_md_h, PTL_ACK_REQ, remote_id,
+ desc->b_portal, 0, xid, 0, 0);
+ if (rc != PTL_OK) {
+ CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
+ remote_id.nid, desc->b_portal, xid, rc);
+ PtlMDUnlink(desc->b_md_h);
+ LBUG();
+ RETURN(rc);
}
RETURN(0);
{
struct list_head *tmp, *next;
int rc;
+ __u32 xid = 0;
+ struct iovec *iov;
ENTRY;
+ iov = ptlrpc_get_bulk_iov (desc);
+ if (iov == NULL)
+ return (-ENOMEM);
+
+ desc->b_md.start = iov;
+ desc->b_md.niov = 0;
+ desc->b_md.length = 0;
+ desc->b_md.threshold = 1;
+ desc->b_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
+ desc->b_md.user_ptr = desc;
+ desc->b_md.eventq = bulk_sink_eq;
+
list_for_each_safe(tmp, next, &desc->b_page_list) {
struct ptlrpc_bulk_page *bulk;
bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
- desc->b_portal, local_id, bulk->b_xid, 0,
- PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h);
- if (rc != PTL_OK) {
- CERROR("PtlMEAttach failed: %d\n", rc);
- LBUG();
- GOTO(cleanup, rc);
- }
-
- bulk->b_md.start = bulk->b_buf;
- bulk->b_md.length = bulk->b_buflen;
- bulk->b_md.threshold = 1;
- bulk->b_md.options = PTL_MD_OP_PUT;
- bulk->b_md.user_ptr = bulk;
- bulk->b_md.eventq = bulk_sink_eq;
-
- rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK,
- &bulk->b_md_h);
- if (rc != PTL_OK) {
- CERROR("PtlMDAttach failed: %d\n", rc);
- LBUG();
- GOTO(cleanup, rc);
- }
-
- CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, "
- "portal %u\n", bulk->b_buflen, bulk->b_xid,
- desc->b_portal);
+ LASSERT (desc->b_md.niov < desc->b_page_count);
+
+ if (desc->b_md.niov == 0)
+ xid = bulk->b_xid;
+ LASSERT (xid == bulk->b_xid); /* should all be the same */
+
+ iov[desc->b_md.niov].iov_base = bulk->b_buf;
+ iov[desc->b_md.niov].iov_len = bulk->b_buflen;
+ desc->b_md.niov++;
+ desc->b_md.length += bulk->b_buflen;
+ }
+
+ LASSERT (desc->b_md.niov == desc->b_page_count);
+ LASSERT (desc->b_md.niov != 0);
+
+ rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
+ desc->b_portal, local_id, xid, 0,
+ PTL_UNLINK, PTL_INS_AFTER, &desc->b_me_h);
+
+ ptlrpc_put_bulk_iov (desc, iov);
+
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ LBUG();
+ GOTO(cleanup, rc);
}
+
+ rc = PtlMDAttach(desc->b_me_h, desc->b_md, PTL_UNLINK,
+ &desc->b_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ LBUG();
+ GOTO(cleanup, rc);
+ }
+
+ CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
+ "portal %u\n", desc->b_md.niov, desc->b_md.length,
+ xid, desc->b_portal);
RETURN(0);
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
- struct list_head *tmp, *next;
-
- list_for_each_safe(tmp, next, &desc->b_page_list) {
- struct ptlrpc_bulk_page *bulk;
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
-
- /* This should be safe: these handles are initialized to be
- * invalid in ptlrpc_prep_bulk_page() */
- PtlMDUnlink(bulk->b_md_h);
- PtlMEUnlink(bulk->b_me_h);
- }
+ /* This should be safe: these handles are initialized to be
+ * invalid in ptlrpc_prep_bulk() */
+ PtlMDUnlink(desc->b_md_h);
+ PtlMEUnlink(desc->b_me_h);
return 0;
}
/* FIXME: If we move to an event-driven model, we should put the request
* on the stack of mds_handle instead. */
+ LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0);
start = event->mem_desc.start;
memset(&request, 0, sizeof(request));
{
int index;
+ LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0);
+
for (index = 0; index < service->srv_ring_length; index++)
if (service->srv_buf[index] == ev->mem_desc.start)
break;
noinst_SCRIPTS = fs.sh intent-test.sh intent-test2.sh leak_finder.pl \
lldlm.sh llecho.sh llext3.sh llmodules.sh llmount-client.sh \
llmount-server.sh llmount.sh llmountcleanup.sh llrext3.sh \
- llrmount.sh llsimple.sh mdcreq.sh mdcreqcleanup.sh \
+ llrmount.sh llsimple.sh mdcreq.sh mdcreqcleanup.sh \
ostreq.sh runfailure-client-mds-recover.sh runfailure-mds \
runfailure-net runfailure-ost runiozone runregression-net.sh \
runtests runvmstat snaprun.sh
$OBDCTL <<- EOF || return $?
newdev
attach ost OSTDEV OSTUUID
- setup \$OBDDEV
+ setup OBDUUID
quit
EOF
}
fi
[ ! -d $MTPT ] && mkdir $MTPT
- echo mount -t lustre_lite -o ost=${THEOSC}-UUID,mds=${THEMDC}-UUID none $MTPT
- mount -t lustre_lite -o ost=${THEOSC}-UUID,mds=${THEMDC}-UUID none $MTPT
+ echo mount -t lustre_lite -o osc=${THEOSC}-UUID,mdc=${THEMDC}-UUID none $MTPT
+ mount -t lustre_lite -o osc=${THEOSC}-UUID,mdc=${THEMDC}-UUID none $MTPT
done
done
}
remount() {
umount $MTPT || exit -1
debugctl clear
- mount -t lustre_lite -o ost=OSCDEV-UUID,mds=MDCDEV-UUID none $MTPT
+ mount -t lustre_lite -o osc=OSCDEV-UUID,mdc=MDCDEV-UUID none $MTPT
}
# Test mkdir
-#!/bin/sh
+#!/bin/sh -vx
SRCDIR="`dirname $0`/"
[ -f $SRCDIR/common.sh ] || SRCDIR="/lib/lustre"
<lov name="lov1" uuid="lov1-UUID">
<mdc_ref uuidref="mdc1-UUID"/>
- <devices stripesize="65536" pattern="0">
+ <devices stripesize="65536" stripeoffset="0" pattern="0">
<osc_ref uuidref="osc1-UUID"/>
<osc_ref uuidref="osc2-UUID"/>
</devices>
int rc, size, i;
IOCINIT(data);
- if (argc <= 5)
+ if (argc <= 6)
return CMD_HELP;
if (strlen(argv[1]) > sizeof(uuid_t) - 1) {
memset(&desc, 0, sizeof(desc));
strcpy(desc.ld_uuid, argv[1]);
- desc.ld_default_stripecount = strtoul(argv[2], NULL, 0);
- desc.ld_default_stripesize = strtoul(argv[3], NULL, 0);
- desc.ld_pattern = strtoul(argv[4], NULL, 0);
- desc.ld_tgt_count = argc - 5;
+ desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0);
+ desc.ld_default_stripe_size = (__u64) strtoul(argv[3], NULL, 0);
+ desc.ld_default_stripe_offset = (__u64) strtoul(argv[3], NULL, 0);
+ desc.ld_pattern = strtoul(argv[5], NULL, 0);
+ desc.ld_tgt_count = argc - 6;
size = sizeof(uuid_t) * desc.ld_tgt_count;
return -ENOMEM;
}
memset(uuidarray, 0, size);
- for (i=5 ; i < argc ; i++) {
- char *buf = (char *) (uuidarray + i -5 );
+ for (i=6 ; i < argc ; i++) {
+ char *buf = (char *) (uuidarray + i -6 );
if (strlen(argv[i]) >= sizeof(uuid_t)) {
fprintf(stderr, "lov_config: arg %d (%s) too long\n",
i, argv[i]);
self.run(cmds)
# create an lov
- def lovconfig(self, uuid, mdcuuid, stripe_cnt, stripe_sz, pattern, devlist):
+ def lovconfig(self, uuid, mdcuuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist):
cmds = """
device $%s
probe
- lovconfig %s %d %d %s %s
- quit""" % (mdcuuid, uuid, stripe_cnt, stripe_sz, pattern, devlist)
+ lovconfig %s %d %d %d %s %s
+ quit""" % (mdcuuid, uuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist)
self.run(cmds)
# ============================================================
setup ="")
def prepare_lov(node):
- (name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname) = getLOVInfo(node)
- print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname
- lctl.lovconfig(uuid, mdsname, stripe_cnt, strip_sz, pattern, devlist)
+ (name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname) = getLOVInfo(node)
+ print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname
+ lctl.lovconfig(uuid, mdsname, stripe_cnt, strip_sz, stripe_off, pattern, devlist)
lctl.newdev(attach="lov %s %s" % (name, uuid),
setup ="%s" % (mdcuuid))
name, uuid, obd = getOSTInfo(ost)
print 'OST:', name, uuid, obd
lctl.newdev(attach="ost %s %s" % (name, uuid),
- setup ="$%s" % (obd))
+ setup ="%s" % (obd))
def prepare_mds(node):
(name, uuid, dev, size, fstype, format) = getMDSInfo(node)
print 'MDC:', name, uuid, mdsuuid, netuuid
net = lookup(node.parentNode, netuuid)
srvname, srvuuid, net, server, port = getNetworkInfo(net)
+ mds = lookup(node.parentNode, mdsuuid)
+ if mds == None:
+ panic(mdsuuid, "not found.")
lctl.connect(net, server, port, netuuid)
lctl.newdev(attach="mdc %s %s" % (name, uuid),
setup ="%s %s" %(mdsuuid, netuuid))
def prepare_mountpoint(node):
name, uuid, oscuuid, mdcuuid, mtpt = getMTPTInfo(node)
print 'MTPT:', name, uuid, oscuuid, mdcuuid, mtpt
- cmd = "mount -t lustre_lite -o ost=%s,mds=%s none %s" % \
+ cmd = "mount -t lustre_lite -o osc=%s,mdc=%s none %s" % \
(oscuuid, mdcuuid, mtpt)
run("mkdir", mtpt)
- run(cmd)
+ ret, val = run(cmd)
+ if ret:
+ print mtpt, "mount failed."
# ============================================================
# Functions to cleanup the various objects
print "cleanup failed: ", name
def cleanup_lov(node):
- (name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname) = getLOVInfo(node)
- print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname
+ (name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname) = getLOVInfo(node)
+ print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname
try:
lctl.cleanup(name, uuid)
except CommandError:
name, uuid = getNodeAttr(node)
devs = node.getElementsByTagName('devices')[0]
stripe_sz = int(devs.getAttribute('stripesize'))
+ stripe_off = int(devs.getAttribute('stripeoffset'))
pattern = int(devs.getAttribute('pattern'))
mdcref = node.getElementsByTagName('mdc_ref')[0]
mdcuuid = mdcref.getAttribute('uuidref')
if child.nodeName == 'osc_ref':
devlist = devlist + child.getAttribute('uuidref') + " "
strip_cnt = stripe_cnt + 1
- return (name, uuid, mdcuuid, stripe_cnt, stripe_sz, pattern, devlist, mdsname)
+ return (name, uuid, mdcuuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist, mdsname)
# extract device attributes for an obd
def getMDSInfo(node):
name, uuid = getNodeAttr(node)
ref = node.getElementsByTagName('obd_ref')[0]
uuid = ref.getAttribute('uuidref')
- obd = lookup(node.parentNode, uuid)
- if obd:
- obdname = getOBDInfo(obd)[0]
- else:
- obdname = "OBD NOT FOUND"
- return (name, uuid, obdname)
+ return (name, uuid, uuid)
# extract device attributes for an obd
def getOSCInfo(node):
#
# Load profile for
def doHost(lustreNode, hosts, cleanFlag):
+ node = None
for h in hosts:
node = getByName(lustreNode, 'node', h)
if node:
break
-
+
+ if not node:
+ print 'No host entry found.'
+ return
+
reflist = node.getElementsByTagName('profile_ref')
for r in reflist:
if cleanFlag:
usage()
if not options.has_key('hostname'):
+ options['hostname'] = []
ret, host = run('hostname')
if ret:
print "unable to determine hostname"
- else:
- options['hostname'] = [string.strip(host[0])]
+ elif len(host) > 0:
+ options['hostname'].append(string.strip(host[0]))
options['hostname'].append('localhost')
print "configuring for host: ", options['hostname']
doHost(dom.childNodes[0], options['hostname'], options.has_key('cleanup') )
<!ATTLIST lov %tag.attr;>\r
<!ELEMENT devices (osc_ref)+>\r
<!ATTLIST devices stripesize CDATA #REQUIRED\r
+ stripeoffset CDATA #REQUIRED\r
pattern CDATA #REQUIRED>\r
<!ELEMENT router (misc)*>\r
<!ATTLIST router %tag.attr;>\r
#include "parser.h"
#include <stdio.h>
+#define SHMEM_STATS 1
+#if SHMEM_STATS
+#include <sys/ipc.h>
+#include <sys/shm.h>
+
+#define MAX_SHMEM_COUNT 1024
+static long long *shared_counters;
+static long long counter_snapshot[2][MAX_SHMEM_COUNT];
+struct timeval prev_time;
+#endif
+
static int jt_newdev(int argc, char **argv);
static int jt_attach(int argc, char **argv);
static int jt_setup(int argc, char **argv);
return rc;
}
+#if SHMEM_STATS
+static void
+shmem_setup ()
+{
+ int shmid = shmget (IPC_PRIVATE, sizeof (counter_snapshot[0]), 0600);
+
+ if (shmid == -1)
+ {
+ fprintf (stderr, "Can't create shared memory counters: %s\n", strerror (errno));
+ return;
+ }
+
+ shared_counters = (long long *)shmat (shmid, NULL, 0);
+
+ if (shared_counters == (long long *)(-1))
+ {
+ fprintf (stderr, "Can't attach shared memory counters: %s\n", strerror (errno));
+ shared_counters = NULL;
+ return;
+ }
+}
+
+static inline void
+shmem_reset ()
+{
+ if (shared_counters == NULL)
+ return;
+
+ memset (shared_counters, 0, sizeof (counter_snapshot[0]));
+ memset (counter_snapshot, 0, sizeof (counter_snapshot));
+ gettimeofday (&prev_time, NULL);
+}
+
+static inline void
+shmem_bump ()
+{
+ if (shared_counters == NULL ||
+ thread <= 0 || thread > MAX_SHMEM_COUNT)
+ return;
+
+ shared_counters[thread - 1]++;
+}
+
+static void
+shmem_snap (int n)
+{
+ struct timeval this_time;
+ int non_zero = 0;
+ long long total = 0;
+ double secs;
+ int i;
+
+ if (shared_counters == NULL || n > MAX_SHMEM_COUNT)
+ return;
+
+ memcpy (counter_snapshot[1], counter_snapshot[0], n * sizeof (counter_snapshot[0][0]));
+ memcpy (counter_snapshot[0], shared_counters, n * sizeof (counter_snapshot[0][0]));
+ gettimeofday (&this_time, NULL);
+
+ for (i = 0; i < n; i++)
+ {
+ long long this_count = counter_snapshot[0][i] - counter_snapshot[1][i];
+
+ if (this_count != 0)
+ {
+ non_zero++;
+ total += this_count;
+ }
+ }
+
+ secs = (this_time.tv_sec + this_time.tv_usec/1000000.0) -
+ (prev_time.tv_sec + prev_time.tv_usec/1000000.0);
+
+ printf ("%d/%d Total: %f/second\n", non_zero, n, total / secs);
+
+ prev_time = this_time;
+}
+
+#define SHMEM_SETUP() shmem_setup()
+#define SHMEM_RESET() shmem_reset()
+#define SHMEM_BUMP() shmem_bump()
+#define SHMEM_SNAP(n) shmem_snap(n)
+#else
+#define SHMEM_SETUP()
+#define SHMEM_RESET()
+#define SHMEM_BUMP()
+#define SHMEM_SNAP(n)
+#endif
+
extern command_t cmdlist[];
static int xml_command(char *cmd, ...) {
{
int threads, next_thread;
int verbose;
- int i, j;
+ int i;
if (argc < 5) {
fprintf(stderr,
verbose = get_verbose(argv[2]);
- printf("%s: starting %d threads on device %s running %s\n",
- argv[0], threads, argv[3], argv[4]);
+ if (verbose != 0)
+ printf("%s: starting %d threads on device %s running %s\n",
+ argv[0], threads, argv[3], argv[4]);
+ SHMEM_RESET();
+
for (i = 1, next_thread = verbose; i <= threads; i++) {
rc = fork();
if (rc < 0) {
}
if (!thread) { /* parent process */
- if (!verbose)
- printf("%s: started %d threads\n\n", argv[0], i - 1);
- else
- printf("\n");
-
- for (j = 1; j < i; j++) {
- int status;
- int ret = wait(&status);
+ int live_threads = threads;
+
+ while (live_threads > 0)
+ {
+ int status;
+ pid_t ret;
+
+ ret = waitpid (0, &status, verbose < 0 ? WNOHANG : 0);
+ if (ret == 0)
+ {
+ if (verbose >= 0)
+ abort ();
+
+ sleep (-verbose);
+ SHMEM_SNAP (threads);
+ continue;
+ }
if (ret < 0) {
fprintf(stderr, "error: %s: wait - %s\n",
argv[0], ret, err);
if (!rc)
rc = err;
+
+ live_threads--;
}
}
}
for (i = 1, next_count = verbose; i <= count ; i++) {
rc = ioctl(fd, OBD_IOC_CREATE , &data);
+ SHMEM_BUMP();
if (rc < 0) {
fprintf(stderr, "error: %s: #%d - %s\n",
cmdname(argv[0]), i, strerror(rc = errno));
gettimeofday(&start, NULL);
next_time.tv_sec = start.tv_sec - verbose;
next_time.tv_usec = start.tv_usec;
- printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]),
- count, ctime(&start.tv_sec));
+ if (verbose != 0)
+ printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]),
+ count, ctime(&start.tv_sec));
for (i = 1, next_count = verbose; i <= count; i++) {
rc = ioctl(fd, OBD_IOC_GETATTR , &data);
+ SHMEM_BUMP();
if (rc < 0) {
fprintf(stderr, "error: %s: #%d - %s\n",
cmdname(argv[0]), i, strerror(rc = errno));
diff = difftime(&end, &start);
--i;
- printf("%s: %d attrs in %.4gs (%.4g attr/s): %s",
- cmdname(argv[0]), i, diff, (double)i / diff,
- ctime(&end.tv_sec));
+ if (verbose != 0)
+ printf("%s: %d attrs in %.4gs (%.4g attr/s): %s",
+ cmdname(argv[0]), i, diff, (double)i / diff,
+ ctime(&end.tv_sec));
}
return rc;
}
next_time.tv_sec = start.tv_sec - verbose;
next_time.tv_usec = start.tv_usec;
- printf("%s: %s %d (%dx%d pages) (testing only): %s",
- cmdname(argv[0]), write ? "writing" : "reading",
- count, obdos, pages, ctime(&start.tv_sec));
+ if (verbose != 0)
+ printf("%s: %s %d (%dx%d pages) (testing only): %s",
+ cmdname(argv[0]), write ? "writing" : "reading",
+ count, obdos, pages, ctime(&start.tv_sec));
/*
* We will put in the start time (and loop count inside the loop)
}
rc = ioctl(fd, rw, &data);
+ SHMEM_BUMP();
if (rc) {
fprintf(stderr, "error: %s: #%d - %s on %s\n",
cmdname(argv[0]), i, strerror(rc = errno),
diff = difftime(&end, &start);
--i;
- printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s",
- cmdname(argv[0]), write ? "wrote" : "read", obdos,
- pages, i, diff, (double)obdos * i * pages / diff,
- ctime(&end.tv_sec));
+ if (verbose != 0)
+ printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s",
+ cmdname(argv[0]), write ? "wrote" : "read", obdos,
+ pages, i, diff, (double)obdos * i * pages / diff,
+ ctime(&end.tv_sec));
}
return rc;
}
int size, i;
IOCINIT(data);
+ printf("WARNING: obdctl lovconfig NOT MAINTAINED\n");
+ return -1;
+
if (argc <= 5 ){
Parser_printhelp("lovconfig");
return -1;
memset(&desc, 0, sizeof(desc));
strcpy(desc.ld_uuid, argv[1]);
- desc.ld_default_stripecount = strtoul(argv[2], NULL, 0);
- desc.ld_default_stripesize = strtoul(argv[3], NULL, 0);
+ desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0);
+ desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0);
desc.ld_pattern = strtoul(argv[4], NULL, 0);
desc.ld_tgt_count = argc - 5;
sigact.sa_flags = SA_RESTART;
sigaction(SIGINT, &sigact, NULL);
-
+ setlinebuf (stdout);
+ SHMEM_SETUP();
+
if (argc > 1) {
rc = Parser_execarg(argc - 1, argv + 1, cmdlist);
} else {
#include <sys/param.h>
#include <assert.h>
-#include <config.h>
#ifdef HAVE_LIBREADLINE
#define READLINE_LIBRARY
#include <readline/readline.h>