a peer, so the peer is passed in and managed separately.
- All of the DLM calls are hooked up to the RPC system now
- Some unmaintained local processing code removed
- Added ldlm network-related bits to the test scripts
#ifndef _LUSTRE_DLM_H__
#define _LUSTRE_DLM_H__
-#include <linux/kp30.h>
-#include <linux/list.h>
+#ifdef __KERNEL__
#include <linux/obd_class.h>
-
-#ifdef __KERNEL__
+#include <linux/lustre_net.h>
#define OBD_LDLM_DEVICENAME "ldlm"
-typedef int cluster_host;
-typedef int cluster_pid;
+typedef int cluster_host;
+typedef int cluster_pid;
typedef enum {
ELDLM_OK = 0,
- ELDLM_BLOCK_GRANTED = 600,
- ELDLM_BLOCK_CONV,
- ELDLM_BLOCK_WAIT,
- ELDLM_BAD_NAMESPACE,
- ELDLM_LOCK_CHANGED
+
+ ELDLM_LOCK_CHANGED = 300,
+
+ ELDLM_NAMESPACE_EXISTS = 400,
+ ELDLM_BAD_NAMESPACE = 401
} ldlm_error_t;
#define LDLM_FL_LOCK_CHANGED (1 << 0)
-#define LDLM_FL_COMPLETION_AST (1 << 1)
-#define LDLM_FL_BLOCKING_AST (1 << 2)
+#define LDLM_FL_BLOCK_GRANTED (1 << 1)
+#define LDLM_FL_BLOCK_CONV (1 << 2)
+#define LDLM_FL_BLOCK_WAIT (1 << 3)
#define L2B(c) (1 << c)
ldlm_mode_t l_granted_mode;
ldlm_lock_callback l_completion_ast;
ldlm_lock_callback l_blocking_ast;
- struct lustre_peer *l_peer;
+ struct lustre_peer l_peer;
void *l_data;
__u32 l_data_len;
struct ldlm_extent l_extent;
+ struct ldlm_handle l_remote_handle;
//void *l_event;
//XXX cluster_host l_holder;
__u32 l_version[RES_VERSION_SIZE];
/* ldlm_lock.c */
void ldlm_lock_free(struct ldlm_lock *lock);
+void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
__u32 ns_id,
struct ldlm_handle *parent_lock_handle,
/* resource.c */
struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id);
-struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 id);
+ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id,
+ struct ldlm_namespace **);
int ldlm_namespace_free(struct ldlm_namespace *ns);
-void ldlm_resource_dump(struct ldlm_resource *res);
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
__u64 *name, __u32 type, int create);
void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
struct ldlm_lock *lock);
void ldlm_resource_del_lock(struct ldlm_lock *lock);
+void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc);
+void ldlm_resource_dump(struct ldlm_resource *res);
+
+/* ldlm_request.c */
+int ldlm_cli_namespace_new(struct ptlrpc_client *, struct lustre_peer *,
+ __u32 ns_id, struct ptlrpc_request **);
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ __u32 ns_id,
+ struct ldlm_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ struct ldlm_extent *req_ex,
+ ldlm_mode_t mode,
+ int *flags,
+ void *data,
+ __u32 data_len,
+ struct ldlm_handle *lockh,
+ struct ptlrpc_request **request);
+int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+ void *data, __u32 data_len);
+
#endif /* __KERNEL__ */
* MDS REQ RECORDS
*/
+/* opcodes */
#define MDS_GETATTR 1
#define MDS_OPEN 2
#define MDS_CLOSE 3
*/
/* opcodes */
-#define LDLM_ENQUEUE 1
-#define LDLM_CONVERT 2
-#define LDLM_CANCEL 3
-#define LDLM_CALLBACK 4
+#define LDLM_NAMESPACE_NEW 1
+#define LDLM_ENQUEUE 2
+#define LDLM_CONVERT 3
+#define LDLM_CANCEL 4
+#define LDLM_CALLBACK 5
#define RES_NAME_SIZE 3
#define RES_VERSION_SIZE 4
__u64 end;
};
+struct ldlm_resource_desc {
+ __u32 lr_ns_id;
+ __u32 lr_type;
+ __u64 lr_name[RES_NAME_SIZE];
+ __u64 lr_version[RES_VERSION_SIZE];
+};
+
+struct ldlm_lock_desc {
+ struct ldlm_resource_desc l_resource;
+ ldlm_mode_t l_req_mode;
+ ldlm_mode_t l_granted_mode;
+ struct ldlm_extent l_extent;
+ __u32 l_version[RES_VERSION_SIZE];
+};
+
struct ldlm_request {
- __u32 ns_id;
- __u64 res_id[RES_NAME_SIZE];
- __u32 res_type;
__u32 flags;
- struct ldlm_extent lock_extent;
- struct ldlm_handle parent_res_handle;
- struct ldlm_handle parent_lock_handle;
- ldlm_mode_t mode;
+ struct ldlm_lock_desc lock_desc;
+ struct ldlm_handle lock_handle1;
+ struct ldlm_handle lock_handle2;
};
struct ldlm_reply {
struct ldlm_handle lock_handle;
+ struct ldlm_extent lock_extent;
};
/*
unsigned long ll_cache_count;
struct semaphore ll_list_mutex;
struct ptlrpc_client ll_mds_client;
+ struct lustre_peer ll_mds_peer;
struct ptlrpc_client ll_ost_client;
+ struct lustre_peer ll_ost_peer;
};
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
/* llight/request.c */
-int mdc_getattr(struct ptlrpc_client *peer, ino_t ino, int type, int valid,
- struct ptlrpc_request **);
-int mdc_setattr(struct ptlrpc_client *peer, struct inode *inode,
+int mdc_getattr(struct ptlrpc_client *, struct lustre_peer *, ino_t ino,
+ int type, int valid, struct ptlrpc_request **);
+int mdc_setattr(struct ptlrpc_client *, struct lustre_peer *, struct inode *,
struct iattr *iattr, struct ptlrpc_request **);
-int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags,
- __u64 *fh, struct ptlrpc_request **req);
-int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh,
- struct ptlrpc_request **req);
-int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset,
- char *addr, struct ptlrpc_request **);
-int mdc_create(struct ptlrpc_client *peer,
+int mdc_open(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, int type,
+ int flags, __u64 *fh, struct ptlrpc_request **req);
+int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+ int type, __u64 fh, struct ptlrpc_request **req);
+int mdc_readpage(struct ptlrpc_client *, struct lustre_peer *, ino_t ino,
+ int type, __u64 offset, char *addr, struct ptlrpc_request **);
+int mdc_create(struct ptlrpc_client *, struct lustre_peer *,
struct inode *dir, const char *name, int namelen,
const char *tgt, int tgtlen,
int mode, __u64 id, __u32 uid, __u32 gid, __u64 time,
struct ptlrpc_request **);
-int mdc_unlink(struct ptlrpc_client *peer, struct inode *dir,
+int mdc_unlink(struct ptlrpc_client *, struct lustre_peer *, struct inode *dir,
struct inode *child, const char *name, int namelen,
struct ptlrpc_request **);
-int mdc_link(struct ptlrpc_client *peer, struct dentry *src,
+int mdc_link(struct ptlrpc_client *, struct lustre_peer *, struct dentry *src,
struct inode *dir, const char *name, int namelen,
struct ptlrpc_request **);
-int mdc_rename(struct ptlrpc_client *peer, struct inode *src,
+int mdc_rename(struct ptlrpc_client *, struct lustre_peer *, struct inode *src,
struct inode *tgt, const char *old, int oldlen,
- const char *new, int newlen,
- struct ptlrpc_request **);
+ const char *new, int newlen, struct ptlrpc_request **);
int mdc_create_client(char *uuid, struct ptlrpc_client *cl);
struct mds_fs_operations {
#define MDS_REPLY_PORTAL 11
#define MDS_BULK_PORTAL 12
-#define LDLM_REQUEST_PORTAL 13
-#define LDLM_REPLY_PORTAL 14
+#define LDLM_REQUEST_PORTAL 13
+#define LDLM_REPLY_PORTAL 14
+#define LDLM_CLI_REQUEST_PORTAL 15
+#define LDLM_CLI_REPLY_PORTAL 16
/* default rpc ring length */
#define RPC_RING_LENGTH 2
#define SVC_STOPPING 1
-#define SVC_RUNNING 2
-#define SVC_STOPPED 4
-#define SVC_KILLED 8
-#define SVC_EVENT 16
-#define SVC_LIST 32
-#define SVC_SIGNAL 64
+#define SVC_RUNNING 2
+#define SVC_STOPPED 4
+#define SVC_KILLED 8
+#define SVC_EVENT 16
+#define SVC_LIST 32
+#define SVC_SIGNAL 64
struct ptlrpc_client {
- struct lustre_peer cli_server;
struct obd_device *cli_obd;
struct list_head cli_sending_head;
struct list_head cli_sent_head;
int ptl_send_buf(struct ptlrpc_request *, struct lustre_peer *, int portal);
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
-int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req);
-int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req);
+int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
+int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req);
int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl);
void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
- struct ptlrpc_client *cl);
-int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl);
+ struct ptlrpc_client *cl);
+int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
+ struct lustre_peer *peer);
int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req);
int ptlrpc_queue_req(struct ptlrpc_client *peer, struct ptlrpc_request *req);
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+ struct lustre_peer *peer, int opcode,
int count, int *lengths, char **bufs);
void ptlrpc_free_req(struct ptlrpc_request *request);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *);
struct ldlm_obd {
struct ptlrpc_service *ldlm_service;
+ struct ptlrpc_client *ldlm_client;
+ struct lustre_peer ldlm_server_peer;
struct list_head ldlm_namespaces;
spinlock_t ldlm_lock;
struct osc_obd {
struct obd_device *osc_tgt;
struct ptlrpc_client *osc_client;
+ struct lustre_peer osc_peer;
};
/* corresponds to one of the obd's */
#define __LINUX_CLASS_OBD_H
#ifndef __KERNEL__
-#include <stdint.h>
-#define __KERNEL__
-#include <linux/list.h>
-#undef __KERNEL__
+# include <stdint.h>
+# define __KERNEL__
+# include <linux/list.h>
+# undef __KERNEL__
#else
#include <asm/segment.h>
#include <asm/uaccess.h>
#define OBD_FAIL_OST_PUNCH_NET 0x20b
#define OBB_FAIL_LDLM 0x300
-#define OBD_FAIL_LDLM_ENQUEUE 0x301
-#define OBD_FAIL_LDLM_CONVERT 0x302
-#define OBD_FAIL_LDLM_CANCEL 0x303
-#define OBD_FAIL_LDLM_CALLBACK 0x304
+#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
+#define OBD_FAIL_LDLM_ENQUEUE 0x302
+#define OBD_FAIL_LDLM_CONVERT 0x303
+#define OBD_FAIL_LDLM_CANCEL 0x304
+#define OBD_FAIL_LDLM_CALLBACK 0x305
/* preparation for a more advanced failure testbed (not functional yet) */
#define OBD_FAIL_MASK_SYS 0x0000FF00
} \
} while(0)
+#include <linux/types.h>
#include <linux/blkdev.h>
static inline void OBD_FAIL_WRITE(int id, kdev_t dev)
EXTRA_PROGRAMS = ldlm
ldlm_SOURCES = ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \
-ldlm_extent.c
+ldlm_extent.c ldlm_request.c
include $(top_srcdir)/Rules
kmem_cache_free(ldlm_lock_slab, lock);
}
+void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
+{
+ ldlm_res2desc(lock->l_resource, &desc->l_resource);
+ desc->l_req_mode = lock->l_req_mode;
+ desc->l_granted_mode = lock->l_granted_mode;
+ memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
+ memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
+}
+
static int ldlm_lock_compat(struct ldlm_lock *lock)
{
struct list_head *tmp;
res->lr_most_restr = lock->l_granted_mode;
if (lock->l_completion_ast)
- lock->l_completion_ast(lock, NULL, NULL, 0);
+ lock->l_completion_ast(lock, NULL,
+ lock->l_data, lock->l_data_len);
}
/* XXX: Revisit the error handling; we do not, for example, do
memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
lock->l_data = data;
lock->l_data_len = data_len;
- if ((*flags) & LDLM_FL_COMPLETION_AST)
- lock->l_completion_ast = completion;
- if ((*flags) & LDLM_FL_BLOCKING_AST)
- lock->l_blocking_ast = blocking;
+ lock->l_completion_ast = completion;
+ lock->l_blocking_ast = blocking;
ldlm_object2handle(lock, lockh);
spin_lock(&res->lr_lock);
if (!list_empty(&res->lr_converting)) {
ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
- GOTO(out, rc = -ELDLM_BLOCK_CONV);
+ *flags |= LDLM_FL_BLOCK_CONV;
+ GOTO(out, ELDLM_OK);
}
if (!list_empty(&res->lr_waiting)) {
ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
- GOTO(out, rc = -ELDLM_BLOCK_WAIT);
+ *flags |= LDLM_FL_BLOCK_WAIT;
+ GOTO(out, ELDLM_OK);
}
incompat = ldlm_lock_compat(lock);
if (incompat) {
ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
- GOTO(out, rc = -ELDLM_BLOCK_GRANTED);
+ *flags |= LDLM_FL_BLOCK_GRANTED;
+ GOTO(out, ELDLM_OK);
}
ldlm_grant_lock(res, lock);
- GOTO(out, rc = ELDLM_OK);
-
+ EXIT;
out:
spin_unlock(&res->lr_lock);
- return rc;
+ return ELDLM_OK;
}
static int ldlm_reprocess_queue(struct ldlm_resource *res,
#define DEBUG_SUBSYSTEM S_LDLM
-#include <linux/obd_class.h>
#include <linux/lustre_dlm.h>
-#include <linux/lustre_net.h>
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
-static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len)
+static int _ldlm_namespace_new(struct obd_device *obddev,
+ struct ptlrpc_request *req)
{
- LBUG();
- return 0;
+ struct ldlm_request *dlm_req;
+ struct ldlm_namespace *ns;
+ int rc;
+ ldlm_error_t err;
+ ENTRY;
+
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+ req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ if (rc) {
+ CERROR("out of memory\n");
+ req->rq_status = -ENOMEM;
+ RETURN(0);
+ }
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+ err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
+ &ns);
+ req->rq_status = err;
+
+ CERROR("err = %d\n", err);
+
+ RETURN(0);
}
-static int ldlm_enqueue(struct ptlrpc_request *req)
+static int _ldlm_enqueue(struct ptlrpc_request *req)
{
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
+ int rc, size = sizeof(*dlm_rep);
ldlm_error_t err;
- int rc, bufsize = sizeof(*dlm_rep);
-
- rc = lustre_pack_msg(1, &bufsize, NULL, &req->rq_replen,
- &req->rq_repbuf);
+ ENTRY;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
+ req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
if (rc) {
CERROR("out of memory\n");
req->rq_status = -ENOMEM;
RETURN(0);
}
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
+ sizeof(dlm_rep->lock_extent));
+
err = ldlm_local_lock_enqueue(req->rq_obd,
- dlm_req->ns_id,
- &dlm_req->parent_lock_handle,
- dlm_req->res_id,
- dlm_req->res_type,
- &dlm_req->lock_extent,
- dlm_req->mode,
+ dlm_req->lock_desc.l_resource.lr_ns_id,
+ &dlm_req->lock_handle2,
+ dlm_req->lock_desc.l_resource.lr_name,
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_rep->lock_extent,
+ dlm_req->lock_desc.l_req_mode,
&dlm_req->flags,
- ldlm_client_callback,
- ldlm_client_callback,
+ //ldlm_cli_callback,
+ NULL,
+ ldlm_cli_callback,
lustre_msg_buf(req->rq_reqmsg, 1),
req->rq_reqmsg->buflens[1],
&dlm_rep->lock_handle);
+ if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) {
+ struct ldlm_lock *lock;
+ lock = ldlm_handle2object(&dlm_rep->lock_handle);
+ memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer));
+ memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
+ sizeof(lock->l_remote_handle));
+ }
req->rq_status = err;
- CERROR("local_lock_enqueue: %d\n", err);
+ CERROR("err = %d\n", err);
+
+ RETURN(0);
+}
+
+static int _ldlm_convert(struct ptlrpc_request *req)
+{
+ struct ldlm_request *dlm_req;
+ int rc;
+ ENTRY;
+
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+ req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ if (rc) {
+ CERROR("out of memory\n");
+ req->rq_status = -ENOMEM;
+ RETURN(0);
+ }
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+ req->rq_status =
+ ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1,
+ dlm_req->lock_desc.l_req_mode,
+ &dlm_req->flags);
+ RETURN(0);
+}
+
+static int _ldlm_cancel(struct ptlrpc_request *req)
+{
+ struct ldlm_request *dlm_req;
+ int rc;
+ ENTRY;
+
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+ req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ if (rc) {
+ CERROR("out of memory\n");
+ req->rq_status = -ENOMEM;
+ RETURN(0);
+ }
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+ req->rq_status =
+ ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1);
+ RETURN(0);
+}
+
+static int _ldlm_callback(struct ptlrpc_request *req)
+{
+ struct ldlm_request *dlm_req;
+ struct ldlm_lock *lock;
+ int rc;
+ ENTRY;
+
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+ req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ if (rc) {
+ CERROR("out of memory\n");
+ req->rq_status = -ENOMEM;
+ RETURN(0);
+ }
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+ lock = ldlm_handle2object(&dlm_req->lock_handle1);
+ ldlm_lock_dump(lock);
+
+ req->rq_status = 0;
RETURN(0);
}
}
switch (req->rq_reqmsg->opc) {
+ case LDLM_NAMESPACE_NEW:
+ CDEBUG(D_INODE, "namespace_new\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
+ rc = _ldlm_namespace_new(dev, req);
+ break;
+
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = ldlm_enqueue(req);
+ rc = _ldlm_enqueue(req);
break;
-#if 0
+
case LDLM_CONVERT:
CDEBUG(D_INODE, "convert\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
- rc = ldlm_convert(req);
+ rc = _ldlm_convert(req);
break;
case LDLM_CANCEL:
CDEBUG(D_INODE, "cancel\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
- rc = ldlm_cancel(req);
+ rc = _ldlm_cancel(req);
break;
case LDLM_CALLBACK:
CDEBUG(D_INODE, "callback\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
- rc = ldlm_callback(req);
+ rc = _ldlm_callback(req);
break;
-#endif
default:
- rc = ptlrpc_error(dev, svc, req);
+ rc = ptlrpc_error(svc, req);
RETURN(rc);
}
- EXIT;
out:
- if (rc) {
- CERROR("no header\n");
- return 0;
- }
-
- if( req->rq_status) {
- ptlrpc_error(dev, svc, req);
- } else {
- CDEBUG(D_NET, "sending reply\n");
- ptlrpc_reply(dev, svc, req);
- }
-
- return 0;
+ if (rc)
+ RETURN(ptlrpc_error(svc, req));
+ else
+ RETURN(ptlrpc_reply(svc, req));
}
static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
void *uarg)
{
struct obd_device *obddev = conn->oc_dev;
- struct ptlrpc_client cl;
int err;
-
ENTRY;
- if ( _IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
- _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
- _IOC_NR(cmd) > IOC_LDLM_MAX_NR ) {
+ if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
+ _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
- _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
- EXIT;
- return -EINVAL;
- }
-
- ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl);
- err = ptlrpc_connect_client(-1, "ldlm", &cl);
- if (err) {
- CERROR("cannot create client\n");
+ _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
RETURN(-EINVAL);
}
case IOC_LDLM_TEST: {
err = ldlm_test(obddev);
CERROR("-- done err %d\n", err);
- EXIT;
- break;
+ GOTO(out, err);
}
default:
- err = -EINVAL;
- EXIT;
- break;
+ GOTO(out, err = -EINVAL);
}
+ out:
return err;
}
LBUG();
err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
- if (err)
+ if (err) {
CERROR("cannot start thread\n");
+ LBUG();
+ }
+
+ OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
+ if (ldlm->ldlm_client == NULL)
+ LBUG();
+
+ ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ ldlm->ldlm_client);
+ err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client,
+ &ldlm->ldlm_server_peer);
+ if (err) {
+ CERROR("cannot create client\n");
+ LBUG();
+ }
MOD_INC_USE_COUNT;
RETURN(0);
ldlm_lock(obddev);
- list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
+ list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
struct ldlm_namespace *ns;
ns = list_entry(tmp, struct ldlm_namespace, ns_link);
CERROR("Request list not empty!\n");
}
+ OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
if (ldlm_free_all(obddev)) {
MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
-MODULE_LICENSE("GPL");
+MODULE_LICENSE("GPL");
module_init(ldlm_init);
module_exit(ldlm_exit);
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ *
+ * by Cluster File Systems, Inc.
+ */
+
+#define EXPORT_SYMTAB
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#include <linux/lustre_dlm.h>
+
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ __u32 ns_id,
+ struct ldlm_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ struct ldlm_extent *req_ex,
+ ldlm_mode_t mode,
+ int *flags,
+ void *data,
+ __u32 data_len,
+ struct ldlm_handle *lockh,
+ struct ptlrpc_request **request)
+{
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ struct ptlrpc_request *req;
+ char *bufs[2] = {NULL, data};
+ int rc, size[2] = {sizeof(*body), data_len};
+
+#if 0
+ ldlm_local_lock_enqueue(obddev, ns_id, parent_lock_handle, res_id, type,
+ req_ex, mode, flags);
+#endif
+
+ /* FIXME: if this is a local lock, stop here. */
+
+ req = ptlrpc_prep_req(cl, peer, LDLM_ENQUEUE, 2, size, bufs);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ /* Dump all of this data into the request buffer */
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body->lock_desc.l_resource.lr_ns_id = ns_id;
+ body->lock_desc.l_resource.lr_type = type;
+ memcpy(body->lock_desc.l_resource.lr_name, res_id,
+ sizeof(body->lock_desc.l_resource.lr_name));
+
+ body->lock_desc.l_req_mode = mode;
+ if (req_ex)
+ memcpy(&body->lock_desc.l_extent, req_ex,
+ sizeof(body->lock_desc.l_extent));
+ body->flags = *flags;
+
+ /* FIXME: lock_handle1 will be the shadow handle */
+
+ if (parent_lock_handle)
+ memcpy(&body->lock_handle2, parent_lock_handle,
+ sizeof(body->lock_handle2));
+
+ /* Continue as normal. */
+ size[0] = sizeof(*reply);
+ req->rq_replen = lustre_msg_size(1, size);
+
+ rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_check_status(req, rc);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0);
+ CERROR("remote handle: %p\n",
+ (void *)(unsigned long)reply->lock_handle.addr);
+ CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
+ reply->lock_extent.end);
+
+ EXIT;
+ out:
+ *request = req;
+ return rc;
+}
+
+int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ __u32 ns_id, struct ptlrpc_request **request)
+{
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ int rc, size = sizeof(*body);
+
+ req = ptlrpc_prep_req(cl, peer, LDLM_NAMESPACE_NEW, 1, &size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body->lock_desc.l_resource.lr_ns_id = ns_id;
+
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_check_status(req, rc);
+
+ EXIT;
+ out:
+ *request = req;
+ return rc;
+}
+
+int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+ void *data, __u32 data_len)
+{
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
+ struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
+ int rc, size[2] = {sizeof(*body), data_len};
+ char *bufs[2] = {NULL, data};
+
+ req = ptlrpc_prep_req(cl, &lock->l_peer, LDLM_CALLBACK, 2, size, bufs);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+
+ if (new != NULL)
+ ldlm_lock2desc(new, &body->lock_desc);
+
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+
+ EXIT;
+ out:
+ return rc;
+}
ns->ns_hash = res_hash;
}
-struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id)
+ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id,
+ struct ldlm_namespace **ns_out)
{
struct ldlm_namespace *ns;
if (ldlm_namespace_find(obddev, id))
- LBUG();
+ return -ELDLM_NAMESPACE_EXISTS;
OBD_ALLOC(ns, sizeof(*ns));
if (!ns)
LBUG();
ns->ns_id = id;
+ ns->ns_obddev = obddev;
INIT_LIST_HEAD(&ns->ns_root_list);
list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces);
res_hash_init(ns);
atomic_set(&ns->ns_refcount, 0);
- return ns;
+ *ns_out = ns;
+ return ELDLM_OK;
}
int ldlm_namespace_free(struct ldlm_namespace *ns)
return 0;
}
+void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
+{
+ desc->lr_ns_id = res->lr_namespace->ns_id;
+ desc->lr_type = res->lr_type;
+ memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name));
+ memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
+}
+
void ldlm_resource_dump(struct ldlm_resource *res)
{
struct list_head *tmp;
ldlm_lock(obddev);
- ns = ldlm_namespace_new(obddev, 1);
- if (ns == NULL)
+ err = ldlm_namespace_new(obddev, 1, &ns);
+ if (err != ELDLM_OK)
LBUG();
res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1);
LBUG();
/* Get a couple of read locks */
- flags = LDLM_FL_BLOCKING_AST;
err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
NULL, LCK_CR, &flags, NULL,
ldlm_test_callback, NULL, 0, &lockh_1);
err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
NULL, LCK_EX, &flags, NULL,
ldlm_test_callback, NULL, 0, &lockh_2);
- if (err != -ELDLM_BLOCK_GRANTED)
+ if (err != ELDLM_OK)
+ LBUG();
+ if (!(flags & LDLM_FL_BLOCK_GRANTED))
LBUG();
ldlm_resource_dump(res);
ldlm_lock(obddev);
- ns = ldlm_namespace_new(obddev, 2);
- if (ns == NULL)
+ err = ldlm_namespace_new(obddev, 2, &ns);
+ if (err != ELDLM_OK)
LBUG();
flags = 0;
err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT,
&ext3, LCK_EX, &flags, NULL, NULL, NULL,
0, &ext3_h);
- if (err != -ELDLM_BLOCK_GRANTED)
+ if (err != ELDLM_OK)
+ LBUG();
+ if (!(flags & LDLM_FL_BLOCK_GRANTED))
LBUG();
if (flags & LDLM_FL_LOCK_CHANGED)
LBUG();
return 0;
}
+static int ldlm_test_network(struct obd_device *obddev)
+{
+ struct ldlm_obd *ldlm = &obddev->u.ldlm;
+ struct ptlrpc_request *request;
+
+ __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
+ struct ldlm_extent ext = {4, 6};
+ struct ldlm_handle lockh1, lockh2;
+ int flags = 0;
+ ldlm_error_t err;
+
+ err = ldlm_cli_namespace_new(ldlm->ldlm_client, &ldlm->ldlm_server_peer,
+ 3, &request);
+ ptlrpc_free_req(request);
+ CERROR("ldlm_cli_namespace_new: %d\n", err);
+ if (err != ELDLM_OK)
+ GOTO(out, err);
+
+ err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3,
+ NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags,
+ NULL, 0, &lockh1, &request);
+ ptlrpc_free_req(request);
+ CERROR("ldlm_cli_enqueue: %d\n", err);
+
+ flags = 0;
+ err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3,
+ NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags,
+ NULL, 0, &lockh2, &request);
+ ptlrpc_free_req(request);
+ CERROR("ldlm_cli_enqueue: %d\n", err);
+
+ EXIT;
+ out:
+ return err;
+}
+
int ldlm_test(struct obd_device *obddev)
{
- int rc;
+ int rc;
rc = ldlm_test_basics(obddev);
- if (rc)
+ if (rc)
RETURN(rc);
rc = ldlm_test_extents(obddev);
- RETURN(rc);
+ if (rc)
+ RETURN(rc);
+
+ rc = ldlm_test_network(obddev);
+ RETURN(rc);
}
offset = page->index << PAGE_SHIFT;
buf = kmap(page);
- rc = mdc_readpage(&sbi->ll_mds_client, inode->i_ino, S_IFDIR, offset,
- buf, &request);
+ rc = mdc_readpage(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+ S_IFDIR, offset, buf, &request);
kunmap(page);
ptlrpc_free_req(request);
EXIT;
GOTO(out, rc = -ENOMEM);
memset(fd, 0, sizeof(*fd));
- rc = mdc_open(&sbi->ll_mds_client, inode->i_ino, S_IFREG,
- file->f_flags, &fd->fd_mdshandle, &req);
+ rc = mdc_open(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+ S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
if (!fd->fd_mdshandle)
CERROR("mdc_open didn't assign fd_mdshandle\n");
rc = -EIO;
}
- rc = mdc_close(&sbi->ll_mds_client, inode->i_ino, S_IFREG,
- fd->fd_mdshandle, &req);
+ rc = mdc_close(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+ S_IFREG, fd->fd_mdshandle, &req);
ptlrpc_free_req(req);
if (rc) {
if (rc > 0)
if (!ino)
goto negative;
- err = mdc_getattr(&sbi->ll_mds_client, ino, type,
+ err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, ino, type,
OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request);
if (err) {
CERROR("failure %d inode %ld\n", err, ino);
ENTRY;
- err = mdc_create(&sbi->ll_mds_client, dir, name, namelen, tgt, tgtlen,
- mode, id, current->uid, current->gid, time, &request);
+ err = mdc_create(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, name,
+ namelen, tgt, tgtlen, mode, id, current->uid,
+ current->gid, time, &request);
if (err) {
inode = ERR_PTR(err);
GOTO(out, err);
ENTRY;
- err = mdc_unlink(&sbi->ll_mds_client, dir, child, name, len, &request);
+ err = mdc_unlink(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, child,
+ name, len, &request);
ptlrpc_free_req(request);
EXIT;
ENTRY;
- err = mdc_link(&sbi->ll_mds_client, src, dir, name, len, &request);
+ err = mdc_link(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, dir, name,
+ len, &request);
ptlrpc_free_req(request);
EXIT;
ENTRY;
- err = mdc_rename(&sbi->ll_mds_client, src, tgt,
+ err = mdc_rename(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, tgt,
old->d_name.name, old->d_name.len,
new->d_name.name, new->d_name.len, &request);
ptlrpc_free_req(request);
/* the first parameter should become an mds device no */
ptlrpc_init_client(-1, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
&sbi->ll_mds_client);
- err = ptlrpc_connect_client(-1, "mds", &sbi->ll_mds_client);
+ err = ptlrpc_connect_client("mds", &sbi->ll_mds_client,
+ &sbi->ll_mds_peer);
if (err) {
CERROR("cannot find MDS\n");
GOTO(out_disc, sb = NULL);
sb->s_op = &ll_super_operations;
/* make root inode */
- err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_rootino, S_IFDIR,
+ err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer,
+ sbi->ll_rootino, S_IFDIR,
OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request);
if (err) {
CERROR("mdc_getattr failed for root %d\n", err);
/* change incore inode */
ll_attr2inode(inode, attr, do_trunc);
- err = mdc_setattr(&sbi->ll_mds_client, inode, attr, &request);
+ err = mdc_setattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode, attr,
+ &request);
if (err)
CERROR("mdc_setattr fails (%d)\n", err);
return rc;
}
-int mdc_setattr(struct ptlrpc_client *cl, struct inode *inode,
- struct iattr *iattr, struct ptlrpc_request **request)
+int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ struct inode *inode, struct iattr *iattr,
+ struct ptlrpc_request **request)
{
struct mds_rec_setattr *rec;
struct ptlrpc_request *req;
int rc, size = sizeof(*rec);
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_REINT, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_REINT, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name,
- int namelen, const char *tgt, int tgtlen, int mode, __u64 id,
- __u32 uid, __u32 gid, __u64 time,
- struct ptlrpc_request **request)
+int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ struct inode *dir, const char *name, int namelen,
+ const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid,
+ __u32 gid, __u64 time, struct ptlrpc_request **request)
{
struct mds_rec_create *rec;
struct ptlrpc_request *req;
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_REINT, 3, size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_unlink(struct ptlrpc_client *cl, struct inode *dir,
- struct inode *child, const char *name, int namelen,
- struct ptlrpc_request **request)
+int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ struct inode *dir, struct inode *child, const char *name,
+ int namelen, struct ptlrpc_request **request)
{
struct mds_rec_unlink *rec;
struct ptlrpc_request *req;
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_REINT, 2, size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir,
- const char *name, int namelen, struct ptlrpc_request **request)
+int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ struct dentry *src, struct inode *dir, const char *name,
+ int namelen, struct ptlrpc_request **request)
{
struct mds_rec_link *rec;
struct ptlrpc_request *req;
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_REINT, 2, size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_rename(struct ptlrpc_client *cl, struct inode *src, struct inode *tgt,
- const char *old, int oldlen, const char *new, int newlen,
+int mdc_rename(struct ptlrpc_client *cl, struct lustre_peer *peer,
+ struct inode *src, struct inode *tgt, const char *old,
+ int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
{
struct mds_rec_rename *rec;
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_REINT, 3, size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL);
if (!req)
RETURN(-ENOMEM);
#define EXPORT_SYMTAB
-#include <linux/config.h>
#include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/mm.h>
-#include <linux/string.h>
-#include <linux/stat.h>
-#include <linux/errno.h>
-#include <linux/locks.h>
-#include <linux/unistd.h>
-
-#include <asm/system.h>
-#include <asm/uaccess.h>
-#include <linux/module.h>
-
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <asm/uaccess.h>
-#include <asm/segment.h>
#include <linux/miscdevice.h>
#define DEBUG_SUBSYSTEM S_MDC
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
#include <linux/lustre_mds.h>
#define REQUEST_MINOR 244
extern int mds_queue_req(struct ptlrpc_request *);
-int mdc_connect(struct ptlrpc_client *cl, ino_t ino, int type, int valid,
- struct ptlrpc_request **request)
+int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+ int type, int valid, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct mds_body *body;
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_GETATTR, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
}
-int mdc_getattr(struct ptlrpc_client *cl, ino_t ino, int type, int valid,
- struct ptlrpc_request **request)
+int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+ int type, int valid, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct mds_body *body;
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(cl, MDS_GETATTR, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
return rc;
}
-int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags,
- __u64 *fh, struct ptlrpc_request **request)
+int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+ int type, int flags, __u64 *fh, struct ptlrpc_request **request)
{
struct mds_body *body;
int rc, size = sizeof(*body);
struct ptlrpc_request *req;
- req = ptlrpc_prep_req(cl, MDS_OPEN, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_OPEN, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
return rc;
}
-int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh,
- struct ptlrpc_request **request)
+int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+ int type, __u64 fh, struct ptlrpc_request **request)
{
struct mds_body *body;
int rc, size = sizeof(*body);
struct ptlrpc_request *req;
- req = ptlrpc_prep_req(cl, MDS_CLOSE, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, peer, MDS_CLOSE, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
return rc;
}
-int mdc_readpage(struct ptlrpc_client *cl, ino_t ino, int type, __u64 offset,
- char *addr, struct ptlrpc_request **request)
+int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+ int type, __u64 offset, char *addr,
+ struct ptlrpc_request **request)
{
struct ptlrpc_request *req = NULL;
struct ptlrpc_bulk_desc *bulk = NULL;
CDEBUG(D_INODE, "inode: %ld\n", ino);
- bulk = ptlrpc_prep_bulk(&cl->cli_server);
+ bulk = ptlrpc_prep_bulk(peer);
if (bulk == NULL) {
CERROR("%s: cannot init bulk desc\n", __FUNCTION__);
rc = -ENOMEM;
goto out;
}
- req = ptlrpc_prep_req(cl, MDS_READPAGE, 2, size, bufs);
+ req = ptlrpc_prep_req(cl, peer, MDS_READPAGE, 2, size, bufs);
if (!req)
GOTO(out, rc = -ENOMEM);
{
int err;
struct ptlrpc_client cl;
+ struct lustre_peer peer;
struct ptlrpc_request *request;
ENTRY;
}
ptlrpc_init_client(-1, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
- err = ptlrpc_connect_client(-1, "mds", &cl);
+ err = ptlrpc_connect_client("mds", &cl, &peer);
if (err) {
CERROR("cannot create client\n");
RETURN(-EINVAL);
switch (cmd) {
case IOC_REQUEST_GETATTR: {
CERROR("-- getting attr for ino %lu\n", arg);
- err = mdc_getattr(&cl, arg, S_IFDIR, ~0, &request);
+ err = mdc_getattr(&cl, &peer, arg, S_IFDIR, ~0, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
break;
}
CERROR("-- readpage 0 for ino %lu\n", arg);
- err = mdc_readpage(&cl, arg, S_IFDIR, 0, buf, &request);
+ err = mdc_readpage(&cl, &peer, arg, S_IFDIR, 0, buf, &request);
CERROR("-- done err %d\n", err);
OBD_FREE(buf, PAGE_SIZE);
iattr.ia_atime = 0;
iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
- err = mdc_setattr(&cl, &inode, &iattr, &request);
+ err = mdc_setattr(&cl, &peer, &inode, &iattr, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
iattr.ia_atime = 0;
iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
- err = mdc_create(&cl, &inode,
+ err = mdc_create(&cl, &peer, &inode,
"foofile", strlen("foofile"),
NULL, 0, 0100707, 47114711,
11, 47, 0, &request);
__u64 fh, ino;
copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
CERROR("-- opening ino %llu\n", ino);
- err = mdc_open(&cl, ino, S_IFDIR, O_RDONLY, &fh, &request);
+ err = mdc_open(&cl, &peer, ino, S_IFDIR, O_RDONLY, &fh,
+ &request);
copy_to_user((__u64 *)arg, &fh, sizeof(fh));
CERROR("-- done err %d (fh=%Lu)\n", err, fh);
case IOC_REQUEST_CLOSE: {
CERROR("-- closing ino 2, filehandle %lu\n", arg);
- err = mdc_close(&cl, 2, S_IFDIR, arg, &request);
+ err = mdc_close(&cl, &peer, 2, S_IFDIR, arg, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
break;
default:
- rc = ptlrpc_error(dev, svc, req);
+ rc = ptlrpc_error(svc, req);
RETURN(rc);
}
EXIT;
out:
if (rc) {
- ptlrpc_error(dev, svc, req);
+ ptlrpc_error(svc, req);
} else {
CDEBUG(D_NET, "sending reply\n");
- ptlrpc_reply(dev, svc, req);
+ ptlrpc_reply(svc, req);
}
return 0;
#include <linux/lustre_net.h>
#include <linux/obd_ost.h>
-struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
+static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
+ struct lustre_peer **peer)
{
struct osc_obd *osc = &conn->oc_dev->u.osc;
- return osc->osc_client;
+ *cl = osc->osc_client;
+ *peer = &osc->osc_peer;
}
static int osc_connect(struct obd_conn *conn)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(cl, OST_CONNECT, 0, NULL, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_CONNECT, 0, NULL, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_disconnect(struct obd_conn *conn)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(cl, OST_DISCONNECT, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_DISCONNECT, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(cl, OST_GETATTR, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_open(struct obd_conn *conn, struct obdo *oa)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(cl, OST_OPEN, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_OPEN, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_close(struct obd_conn *conn, struct obdo *oa)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(cl, OST_CLOSE, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_CLOSE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(cl, OST_SETATTR, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_SETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_create(struct obd_conn *conn, struct obdo *oa)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(cl, OST_CREATE, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_CREATE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
obd_off offset)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(cl, OST_PUNCH, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_PUNCH, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
{
struct ptlrpc_request *request;
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- request = ptlrpc_prep_req(cl, OST_DESTROY, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_DESTROY, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
struct niobuf *dst, struct niobuf *src)
{
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
+
+ osc_con2cl(conn, &cl, &peer);
if (cl->cli_obd) {
/* local sendpage */
struct ptlrpc_bulk_desc *bulk;
int rc;
- bulk = ptlrpc_prep_bulk(&cl->cli_server);
+ bulk = ptlrpc_prep_bulk(peer);
if (bulk == NULL)
return -ENOMEM;
obd_count *oa_bufs, struct page **buf, obd_size *count,
obd_off *offset, obd_flag *flags)
{
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ptlrpc_request *request;
struct ost_body *body;
struct obd_ioobj ioo;
if (bulk == NULL)
RETURN(-ENOMEM);
- request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
if (!request)
GOTO(out, rc = -ENOMEM);
for (pages = 0, i = 0; i < num_oa; i++) {
ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
for (j = 0; j < oa_bufs[i]; j++, pages++) {
- bulk[pages] = ptlrpc_prep_bulk(&cl->cli_server);
+ bulk[pages] = ptlrpc_prep_bulk(peer);
if (bulk[pages] == NULL)
GOTO(out, rc = -ENOMEM);
obd_count *oa_bufs, struct page **buf, obd_size *count,
obd_off *offset, obd_flag *flags)
{
- struct ptlrpc_client *cl = osc_con2cl(conn);
+ struct ptlrpc_client *cl;
+ struct lustre_peer *peer;
struct ptlrpc_request *request;
struct obd_ioobj ioo;
struct ost_body *body;
if (!src)
RETURN(-ENOMEM);
- request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
+ osc_con2cl(conn, &cl, &peer);
+ request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
osc->osc_client);
- rc = ptlrpc_connect_client(dev, "ost", osc->osc_client);
+ rc = ptlrpc_connect_client("ost", osc->osc_client, &osc->osc_peer);
if (rc == 0)
MOD_INC_USE_COUNT;
break;
default:
req->rq_status = -ENOTSUPP;
- rc = ptlrpc_error(obddev, svc, req);
+ rc = ptlrpc_error(svc, req);
RETURN(rc);
}
//req->rq_status = rc;
if (rc) {
CERROR("ost: processing error %d\n", rc);
- ptlrpc_error(obddev, svc, req);
+ ptlrpc_error(svc, req);
} else {
CDEBUG(D_INODE, "sending reply\n");
- ptlrpc_reply(obddev, svc, req);
+ ptlrpc_reply(svc, req);
}
return 0;
#include <linux/lustre_net.h>
void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
- struct ptlrpc_client *cl)
+ struct ptlrpc_client *cl)
{
memset(cl, 0, sizeof(*cl));
spin_lock_init(&cl->cli_lock);
sema_init(&cl->cli_rpc_sem, 32);
}
-int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl)
+int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
+ struct lustre_peer *peer)
{
int err;
cl->cli_epoch++;
- err = kportal_uuid_to_peer(uuid, &cl->cli_server);
+ err = kportal_uuid_to_peer(uuid, peer);
if (err != 0)
CERROR("cannot find peer %s!\n", uuid);
return bulk;
}
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+ struct lustre_peer *peer, int opcode,
int count, int *lengths, char **bufs)
{
struct ptlrpc_request *request;
RETURN(NULL);
}
request->rq_type = PTL_RPC_REQUEST;
+ memcpy(&request->rq_peer, peer, sizeof(*peer));
request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf;
request->rq_reqmsg->opc = HTON__u32(opcode);
request->rq_reqmsg->xid = HTON__u32(request->rq_xid);
return rc;
}
-int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
- struct ptlrpc_request *clnt_req = req->rq_reply_handle;
- ENTRY;
-
- if (req->rq_reply_handle == NULL) {
- /* This is a request that came from the network via portals. */
-
- /* FIXME: we need to increment the count of handled events */
- req->rq_type = PTL_RPC_REPLY;
- req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid);
- req->rq_repmsg->status = HTON__u32(req->rq_status);
- req->rq_reqmsg->type = HTON__u32(req->rq_type);
- ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
- } else {
- /* This is a local request that came from another thread. */
-
- /* move the reply to the client */
- clnt_req->rq_replen = req->rq_replen;
- clnt_req->rq_repbuf = req->rq_repbuf;
- req->rq_repbuf = NULL;
- req->rq_replen = 0;
-
- /* free the request buffer */
- OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
- req->rq_reqbuf = NULL;
-
- /* wake up the client */
- wake_up_interruptible(&clnt_req->rq_wait_for_rep);
- }
-
- RETURN(0);
+ /* FIXME: we need to increment the count of handled events */
+ req->rq_type = PTL_RPC_REPLY;
+ req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid);
+ req->rq_repmsg->status = HTON__u32(req->rq_status);
+ req->rq_reqmsg->type = HTON__u32(req->rq_type);
+ return ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
}
-int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
int rc;
ENTRY;
req->rq_repmsg->type = HTON__u32(PTL_RPC_ERR);
- rc = ptlrpc_reply(obddev, svc, req);
+ rc = ptlrpc_reply(svc, req);
RETURN(rc);
}
down(&cl->cli_rpc_sem);
- rc = PtlMEAttach(cl->cli_server.peer_ni, request->rq_reply_portal,
+ rc = PtlMEAttach(request->rq_peer.peer_ni, request->rq_reply_portal,
local_id, request->rq_xid, 0, PTL_UNLINK,
PTL_INS_AFTER, &request->rq_reply_me_h);
if (rc != PTL_OK) {
request->rq_replen, request->rq_xid, request->rq_reply_portal);
list_add(&request->rq_list, &cl->cli_sending_head);
- rc = ptl_send_buf(request, &cl->cli_server, request->rq_req_portal);
+ rc = ptl_send_buf(request, &request->rq_peer, request->rq_req_portal);
RETURN(rc);
cleanup2:
add_uuid self
add_uuid mds
add_uuid ost
+ add_uuid ldlm
quit
EOF
}
insmod $LUSTRE/ldlm/ldlm.o || exit -1
list_mods
+ echo "Press Enter to continue"
+ read
}
del_uuid self
del_uuid mds
del_uuid ost
+del_uuid ldlm
quit
EOF