ldlm_mode_t l_granted_mode;
ldlm_lock_callback l_completion_ast;
ldlm_lock_callback l_blocking_ast;
- struct lustre_peer l_peer;
+ struct ptlrpc_connection *l_connection;
void *l_data;
__u32 l_data_len;
struct ldlm_extent l_extent;
handle->addr = (__u64)(unsigned long)object;
}
-static inline void ldlm_lock(struct obd_device *obddev)
+extern struct list_head ldlm_namespaces;
+extern spinlock_t ldlm_spinlock;
+
+static inline void ldlm_lock(void)
{
- spin_lock(&obddev->u.ldlm.ldlm_lock);
+ spin_lock(&ldlm_spinlock);
}
-static inline void ldlm_unlock(struct obd_device *obddev)
+static inline void ldlm_unlock(void)
{
- spin_unlock(&obddev->u.ldlm.ldlm_lock);
+ spin_unlock(&ldlm_spinlock);
}
extern struct obd_ops ldlm_obd_ops;
/* ldlm_lock.c */
void ldlm_lock_free(struct ldlm_lock *lock);
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
-ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
- __u32 ns_id,
- struct ldlm_handle *parent_lock_handle,
- __u64 *res_id,
- __u32 type,
- struct ldlm_extent *req_ex,
+ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
+ struct ldlm_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ struct ldlm_handle *lockh);
+ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
ldlm_mode_t mode,
+ struct ldlm_extent *req_ex,
int *flags,
ldlm_lock_callback completion,
ldlm_lock_callback blocking,
void *data,
- __u32 data_len,
- struct ldlm_handle *lockh);
-ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
- struct ldlm_handle *lockh,
+ __u32 data_len);
+ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
int new_mode, int *flags);
-ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
- struct ldlm_handle *lockh);
+ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh);
void ldlm_lock_dump(struct ldlm_lock *lock);
/* ldlm_test.c */
int ldlm_test(struct obd_device *device);
/* resource.c */
-struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id);
+struct ldlm_namespace *ldlm_namespace_find(__u32 id);
ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id,
struct ldlm_namespace **);
int ldlm_namespace_free(struct ldlm_namespace *ns);
void ldlm_resource_dump(struct ldlm_resource *res);
/* ldlm_request.c */
-int ldlm_cli_namespace_new(struct ptlrpc_client *, struct lustre_peer *,
+int ldlm_cli_namespace_new(struct obd_device *, struct ptlrpc_client *,
+ struct ptlrpc_connection *,
__u32 ns_id, struct ptlrpc_request **);
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *peer,
__u32 ns_id,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
struct ptlrpc_request **request);
int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
void *data, __u32 data_len);
+int ldlm_cli_convert(struct ptlrpc_client *, struct ldlm_handle *,
+ int new_mode, int *flags, struct ptlrpc_request **);
+int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_handle *,
+ struct ptlrpc_request **);
#endif /* __KERNEL__ */
*/
struct lustre_msg {
+ __u64 conn;
+ __u64 token;
+
__u32 opc;
__u32 xid;
__u32 status;
};
struct ldlm_reply {
+ __u32 flags;
struct ldlm_handle lock_handle;
struct ldlm_extent lock_extent;
};
unsigned long ll_cache_count;
struct semaphore ll_list_mutex;
struct ptlrpc_client ll_mds_client;
- struct lustre_peer ll_mds_peer;
+ struct ptlrpc_connection *ll_mds_conn;
struct ptlrpc_client ll_ost_client;
struct lustre_ha_mgr *ll_ha_mgr;
- struct lustre_peer ll_ost_peer;
+ struct ptlrpc_connection *ll_ost_conn;
};
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
/* llight/request.c */
-int mdc_getattr(struct ptlrpc_client *, struct lustre_peer *, ino_t ino,
+int mdc_getattr(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
int type, int valid, struct ptlrpc_request **);
-int mdc_setattr(struct ptlrpc_client *, struct lustre_peer *, struct inode *,
- struct iattr *iattr, struct ptlrpc_request **);
-int mdc_open(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, int type,
- int flags, __u64 *fh, struct ptlrpc_request **req);
-int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
- int type, __u64 fh, struct ptlrpc_request **req);
-int mdc_readpage(struct ptlrpc_client *, struct lustre_peer *, ino_t ino,
+int mdc_setattr(struct ptlrpc_client *, struct ptlrpc_connection *,
+ struct inode *, struct iattr *iattr, struct ptlrpc_request **);
+int mdc_open(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
+ int type, int flags, __u64 *fh, struct ptlrpc_request **req);
+int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *peer,
+ ino_t ino, int type, __u64 fh, struct ptlrpc_request **req);
+int mdc_readpage(struct ptlrpc_client *, struct ptlrpc_connection *, ino_t ino,
int type, __u64 offset, char *addr, struct ptlrpc_request **);
-int mdc_create(struct ptlrpc_client *, struct lustre_peer *,
+int mdc_create(struct ptlrpc_client *, struct ptlrpc_connection *,
struct inode *dir, const char *name, int namelen,
const char *tgt, int tgtlen,
int mode, __u64 id, __u32 uid, __u32 gid, __u64 time,
struct ptlrpc_request **);
-int mdc_unlink(struct ptlrpc_client *, struct lustre_peer *, struct inode *dir,
- struct inode *child, const char *name, int namelen,
+int mdc_unlink(struct ptlrpc_client *, struct ptlrpc_connection *,
+ struct inode *dir, struct inode *child, const char *name,
+ int namelen, struct ptlrpc_request **);
+int mdc_link(struct ptlrpc_client *, struct ptlrpc_connection *,
+ struct dentry *src, struct inode *dir, const char *name,
+ int namelen, struct ptlrpc_request **);
+int mdc_rename(struct ptlrpc_client *, struct ptlrpc_connection *,
+ struct inode *src, struct inode *tgt, const char *old,
+ int oldlen, const char *new, int newlen,
struct ptlrpc_request **);
-int mdc_link(struct ptlrpc_client *, struct lustre_peer *, struct dentry *src,
- struct inode *dir, const char *name, int namelen,
- struct ptlrpc_request **);
-int mdc_rename(struct ptlrpc_client *, struct lustre_peer *, struct inode *src,
- struct inode *tgt, const char *old, int oldlen,
- const char *new, int newlen, struct ptlrpc_request **);
int mdc_create_client(char *uuid, struct ptlrpc_client *cl);
struct mds_fs_operations {
#define LDLM_REQUEST_PORTAL 13
#define LDLM_REPLY_PORTAL 14
-#define LDLM_CLI_REQUEST_PORTAL 15
-#define LDLM_CLI_REPLY_PORTAL 16
/* default rpc ring length */
#define RPC_RING_LENGTH 2
#define SVC_LIST 32
#define SVC_SIGNAL 64
+struct ptlrpc_connection {
+ struct list_head c_link;
+ struct lustre_peer c_peer;
+
+ __u32 c_generation; /* changes upon new connection */
+ __u32 c_epoch; /* changes when peer changes */
+ __u32 c_bootcount; /* peer's boot count */
+
+ spinlock_t c_lock;
+ __u32 c_xid_in;
+ __u32 c_xid_out;
+
+ atomic_t c_refcount;
+ __u64 c_token;
+};
+
struct ptlrpc_client {
struct obd_device *cli_obd;
struct list_head cli_sending_head;
__u32 cli_request_portal;
__u32 cli_reply_portal;
- spinlock_t cli_lock;
- __u32 cli_xid;
- __u32 cli_generation; /* changes upon new connection */
- __u32 cli_epoch; /* changes when peer changes */
- __u32 cli_bootcount; /* peer's boot count */
struct semaphore cli_rpc_sem;
struct list_head cli_ha_item;
struct lustre_ha_mgr *cli_ha_mgr;
int rq_status;
int rq_flags;
__u32 rq_connid;
- __u32 rq_xid;
int rq_reqlen;
- char *rq_reqbuf;
struct lustre_msg *rq_reqmsg;
int rq_replen;
- char *rq_repbuf;
struct lustre_msg *rq_repmsg;
char *rq_bulkbuf;
ptl_md_t rq_req_md;
ptl_handle_md_t rq_req_md_h;
- __u32 rq_reply_portal;
- __u32 rq_req_portal;
-
- struct lustre_peer rq_peer;
+ struct ptlrpc_connection *rq_connection;
struct ptlrpc_client *rq_client;
};
struct ptlrpc_bulk_desc {
int b_flags;
- struct lustre_peer b_peer;
+ struct ptlrpc_connection *b_connection;
__u32 b_portal;
char *b_buf;
int b_buflen;
struct ptlrpc_service *svc,
struct ptlrpc_request *req);
-
+/* rpc/connection.c */
+struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer);
+int ptlrpc_put_connection(struct ptlrpc_connection *c);
+struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
+void ptlrpc_init_connection(void);
+void ptlrpc_cleanup_connection(void);
/* rpc/niobuf.c */
int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *);
int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal);
-int ptl_send_buf(struct ptlrpc_request *, struct lustre_peer *, int portal);
+int ptl_send_buf(struct ptlrpc_request *, struct ptlrpc_connection *,
+ int portal);
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req);
-int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl);
+int ptl_send_rpc(struct ptlrpc_request *request);
void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
-void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_portal,
- struct ptlrpc_client *cl);
-int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
- struct lustre_peer *peer);
-int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req);
-int ptlrpc_queue_req(struct ptlrpc_client *peer, struct ptlrpc_request *req);
+void ptlrpc_init_client(struct lustre_ha_mgr *, int req_portal, int rep_portal,
+ struct ptlrpc_client *);
+struct ptlrpc_connection *ptlrpc_connect_client(char *uuid);
+int ptlrpc_queue_wait(struct ptlrpc_request *req);
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
- struct lustre_peer *peer, int opcode,
+ struct ptlrpc_connection *u, int opcode,
int count, int *lengths, char **bufs);
void ptlrpc_free_req(struct ptlrpc_request *request);
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *);
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *);
int ptlrpc_check_status(struct ptlrpc_request *req, int err);
/* rpc/service.c */
};
/* rpc/pack_generic.c */
-int lustre_pack_msg(int count, int *lens, char **bufs, int *len, char **buf);
+int lustre_pack_msg(int count, int *lens, char **bufs, int *len,
+ struct lustre_msg **msg);
int lustre_msg_size(int count, int *lengths);
-int lustre_unpack_msg(char *buf, int len);
+int lustre_unpack_msg(struct lustre_msg *m, int len);
void *lustre_msg_buf(struct lustre_msg *m, int n);
#endif
struct ldlm_obd {
struct ptlrpc_service *ldlm_service;
struct ptlrpc_client *ldlm_client;
- struct lustre_peer ldlm_server_peer;
-
- struct list_head ldlm_namespaces;
- spinlock_t ldlm_lock;
+ struct ptlrpc_connection *ldlm_server_conn;
};
struct echo_obd {
struct osc_obd {
struct obd_device *osc_tgt;
struct ptlrpc_client *osc_client;
- struct lustre_peer osc_peer;
+ struct ptlrpc_connection *osc_conn;
};
/* corresponds to one of the obd's */
}
static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
- struct ldlm_resource *resource,
- ldlm_mode_t mode)
+ struct ldlm_resource *resource)
{
struct ldlm_lock *lock;
memset(lock, 0, sizeof(*lock));
lock->l_resource = resource;
- lock->l_req_mode = mode;
INIT_LIST_HEAD(&lock->l_children);
if (parent != NULL) {
lock->l_data, lock->l_data_len);
}
-/* XXX: Revisit the error handling; we do not, for example, do
- * ldlm_resource_put()s in our error cases, and we probably leak any allocated
- * memory. */
-ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
- __u32 ns_id,
- struct ldlm_handle *parent_lock_handle,
- __u64 *res_id,
- __u32 type,
- struct ldlm_extent *req_ex,
- ldlm_mode_t mode,
- int *flags,
- ldlm_lock_callback completion,
- ldlm_lock_callback blocking,
- void *data,
- __u32 data_len,
- struct ldlm_handle *lockh)
+ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
+ struct ldlm_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ struct ldlm_handle *lockh)
{
struct ldlm_namespace *ns;
- struct ldlm_resource *res, *parent_res;
+ struct ldlm_resource *res, *parent_res = NULL;
struct ldlm_lock *lock, *parent_lock;
- struct ldlm_extent new_ex;
- int incompat = 0, rc;
- ldlm_res_policy policy;
- ENTRY;
+ ns = ldlm_namespace_find(ns_id);
+ if (ns == NULL || ns->ns_hash == NULL)
+ RETURN(-ELDLM_BAD_NAMESPACE);
parent_lock = ldlm_handle2object(parent_lock_handle);
if (parent_lock)
parent_res = parent_lock->l_resource;
- else
- parent_res = NULL;
-
- ns = ldlm_namespace_find(obddev, ns_id);
- if (ns == NULL || ns->ns_hash == NULL)
- RETURN(-ELDLM_BAD_NAMESPACE);
res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
if (res == NULL)
RETURN(-ENOMEM);
- lock = ldlm_lock_new(parent_lock, res, mode);
+ lock = ldlm_lock_new(parent_lock, res);
if (lock == NULL)
RETURN(-ENOMEM);
- if ((policy = ldlm_res_policy_table[type])) {
- rc = policy(res, req_ex, &new_ex, mode, NULL);
+ ldlm_object2handle(lock, lockh);
+
+ return ELDLM_OK;
+}
+
+/* XXX: Revisit the error handling; we do not, for example, do
+ * ldlm_resource_put()s in our error cases, and we probably leak any allocated
+ * memory. */
+ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
+ ldlm_mode_t mode,
+ struct ldlm_extent *req_ex,
+ int *flags,
+ ldlm_lock_callback completion,
+ ldlm_lock_callback blocking,
+ void *data,
+ __u32 data_len)
+{
+ struct ldlm_lock *lock;
+ struct ldlm_extent new_ex;
+ int incompat = 0, rc;
+ ldlm_res_policy policy;
+ ENTRY;
+
+ lock = ldlm_handle2object(lockh);
+ if ((policy = ldlm_res_policy_table[lock->l_resource->lr_type])) {
+ rc = policy(lock->l_resource, req_ex, &new_ex, mode, NULL);
if (rc == ELDLM_LOCK_CHANGED) {
*flags |= LDLM_FL_LOCK_CHANGED;
memcpy(req_ex, &new_ex, sizeof(new_ex));
}
}
- if ((type == LDLM_EXTENT && !req_ex) ||
- (type != LDLM_EXTENT && req_ex))
+ if ((lock->l_resource->lr_type == LDLM_EXTENT && !req_ex) ||
+ (lock->l_resource->lr_type != LDLM_EXTENT && req_ex))
LBUG();
if (req_ex)
memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
+ lock->l_req_mode = mode;
lock->l_data = data;
lock->l_data_len = data_len;
lock->l_completion_ast = completion;
lock->l_blocking_ast = blocking;
- ldlm_object2handle(lock, lockh);
- spin_lock(&res->lr_lock);
+ spin_lock(&lock->l_resource->lr_lock);
/* FIXME: We may want to optimize by checking lr_most_restr */
- if (!list_empty(&res->lr_converting)) {
- ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ if (!list_empty(&lock->l_resource->lr_converting)) {
+ ldlm_resource_add_lock(lock->l_resource,
+ lock->l_resource->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_CONV;
GOTO(out, ELDLM_OK);
}
- if (!list_empty(&res->lr_waiting)) {
- ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ if (!list_empty(&lock->l_resource->lr_waiting)) {
+ ldlm_resource_add_lock(lock->l_resource,
+ lock->l_resource->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_WAIT;
GOTO(out, ELDLM_OK);
}
incompat = ldlm_lock_compat(lock);
if (incompat) {
- ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ ldlm_resource_add_lock(lock->l_resource,
+ lock->l_resource->lr_waiting.prev, lock);
*flags |= LDLM_FL_BLOCK_GRANTED;
GOTO(out, ELDLM_OK);
}
- ldlm_grant_lock(res, lock);
+ ldlm_grant_lock(lock->l_resource, lock);
EXIT;
out:
- spin_unlock(&res->lr_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
return ELDLM_OK;
}
ldlm_reprocess_queue(res, &res->lr_waiting);
}
-ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
- struct ldlm_handle *lockh)
+ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh)
{
struct ldlm_lock *lock;
struct ldlm_resource *res;
RETURN(ELDLM_OK);
}
-ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
- struct ldlm_handle *lockh,
+ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
int new_mode, int *flags)
{
struct ldlm_lock *lock;
ldlm_error_t err;
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
req->rq_status = -ENOMEM;
struct ldlm_request *dlm_req;
int rc, size = sizeof(*dlm_rep);
ldlm_error_t err;
+ struct ldlm_lock *lock;
ENTRY;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
req->rq_status = -ENOMEM;
memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
sizeof(dlm_rep->lock_extent));
+ dlm_rep->flags = dlm_req->flags;
+
+ err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
+ &dlm_req->lock_handle2,
+ dlm_req->lock_desc.l_resource.lr_name,
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_rep->lock_handle);
+ if (err != ELDLM_OK)
+ GOTO(out, err);
- err = ldlm_local_lock_enqueue(req->rq_obd,
- dlm_req->lock_desc.l_resource.lr_ns_id,
- &dlm_req->lock_handle2,
- dlm_req->lock_desc.l_resource.lr_name,
- dlm_req->lock_desc.l_resource.lr_type,
- &dlm_rep->lock_extent,
+ err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
dlm_req->lock_desc.l_req_mode,
- &dlm_req->flags,
- //ldlm_cli_callback,
- NULL,
+ &dlm_rep->lock_extent,
+ &dlm_rep->flags,
+ NULL, //ldlm_cli_callback,
ldlm_cli_callback,
lustre_msg_buf(req->rq_reqmsg, 1),
- req->rq_reqmsg->buflens[1],
- &dlm_rep->lock_handle);
- if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) {
- struct ldlm_lock *lock;
- lock = ldlm_handle2object(&dlm_rep->lock_handle);
- memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer));
- memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
- sizeof(lock->l_remote_handle));
- }
- req->rq_status = err;
+ req->rq_reqmsg->buflens[1]);
+ if (err != ELDLM_OK)
+ GOTO(out, err);
+ lock = ldlm_handle2object(&dlm_rep->lock_handle);
+ memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
+ sizeof(lock->l_remote_handle));
+ lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
+ EXIT;
+ out:
+ req->rq_status = err;
CERROR("err = %d\n", err);
- RETURN(0);
+ return 0;
}
static int _ldlm_convert(struct ptlrpc_request *req)
int rc;
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
req->rq_status = -ENOMEM;
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
req->rq_status =
- ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1,
+ ldlm_local_lock_convert(&dlm_req->lock_handle1,
dlm_req->lock_desc.l_req_mode,
&dlm_req->flags);
RETURN(0);
int rc;
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
req->rq_status = -ENOMEM;
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- req->rq_status =
- ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1);
+ req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
RETURN(0);
}
int rc;
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
req->rq_status = -ENOMEM;
int rc;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
- req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
+ rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
if (rc) {
CERROR("lustre_ldlm: Invalid request\n");
GOTO(out, rc);
int err;
ENTRY;
- INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
- obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
+ INIT_LIST_HEAD(&ldlm_namespaces);
+ ldlm_spinlock = SPIN_LOCK_UNLOCKED;
ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
LDLM_REQUEST_PORTAL,
if (ldlm->ldlm_client == NULL)
LBUG();
- ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
ldlm->ldlm_client);
- err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client,
- &ldlm->ldlm_server_peer);
- if (err) {
+ ldlm->ldlm_server_conn = ptlrpc_connect_client("ldlm");
+ if (!ldlm->ldlm_server_conn) {
CERROR("cannot create client\n");
LBUG();
}
struct list_head *tmp, *pos;
int rc = 0;
- ldlm_lock(obddev);
+ ldlm_lock();
- list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
+ list_for_each_safe(tmp, pos, &ldlm_namespaces) {
struct ldlm_namespace *ns;
ns = list_entry(tmp, struct ldlm_namespace, ns_link);
rc |= do_free_namespace(ns);
}
- ldlm_unlock(obddev);
+ ldlm_unlock();
return rc;
}
OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+ ptlrpc_put_connection(ldlm->ldlm_server_conn);
if (ldlm_free_all(obddev)) {
CERROR("ldlm_free_all could not complete.\n");
#include <linux/lustre_dlm.h>
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
__u32 ns_id,
struct ldlm_handle *parent_lock_handle,
__u64 *res_id,
struct ldlm_handle *lockh,
struct ptlrpc_request **request)
{
+ struct ldlm_handle local_lockh;
+ struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
char *bufs[2] = {NULL, data};
int rc, size[2] = {sizeof(*body), data_len};
+ ldlm_error_t err;
+ ENTRY;
-#if 0
- ldlm_local_lock_enqueue(obddev, ns_id, parent_lock_handle, res_id, type,
- req_ex, mode, flags);
-#endif
+ err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
+ type, &local_lockh);
+ if (err != ELDLM_OK)
+ RETURN(err);
- /* FIXME: if this is a local lock, stop here. */
+ /* Is this lock locally managed? */
+ if (conn == NULL)
+ GOTO(local, 0);
- req = ptlrpc_prep_req(cl, peer, LDLM_ENQUEUE, 2, size, bufs);
+ req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
if (!req)
GOTO(out, rc = -ENOMEM);
sizeof(body->lock_desc.l_extent));
body->flags = *flags;
- /* FIXME: lock_handle1 will be the shadow handle */
+ memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
if (parent_lock_handle)
memcpy(&body->lock_handle2, parent_lock_handle,
size[0] = sizeof(*reply);
req->rq_replen = lustre_msg_size(1, size);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
if (rc != ELDLM_OK)
GOTO(out, rc);
reply = lustre_msg_buf(req->rq_repmsg, 0);
- CERROR("remote handle: %p\n",
- (void *)(unsigned long)reply->lock_handle.addr);
+ lock = ldlm_handle2object(&local_lockh);
+ memcpy(&lock->l_remote_handle, &reply->lock_handle,
+ sizeof(lock->l_remote_handle));
+ *flags = reply->flags;
+
+ CERROR("remote handle: %p, flags: %d\n",
+ (void *)(unsigned long)reply->lock_handle.addr, *flags);
CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
reply->lock_extent.end);
EXIT;
+ local:
+ rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, NULL,
+ NULL, data, data_len);
out:
*request = req;
return rc;
}
-int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer,
- __u32 ns_id, struct ptlrpc_request **request)
+int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
+ struct ptlrpc_connection *conn, __u32 ns_id,
+ struct ptlrpc_request **request)
{
+ struct ldlm_namespace *ns;
struct ldlm_request *body;
struct ptlrpc_request *req;
int rc, size = sizeof(*body);
+ int err;
- req = ptlrpc_prep_req(cl, peer, LDLM_NAMESPACE_NEW, 1, &size, NULL);
+ if (conn == NULL)
+ GOTO(local, 0);
+
+ req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
+ if (rc)
+ GOTO(out, rc);
EXIT;
+ local:
+ err = ldlm_namespace_new(obddev, ns_id, &ns);
+ if (err != ELDLM_OK) {
+ /* XXX: It succeeded remotely but failed locally. What to do? */
+ return err;
+ }
out:
*request = req;
return rc;
int rc, size[2] = {sizeof(*body), data_len};
char *bufs[2] = {NULL, data};
- req = ptlrpc_prep_req(cl, &lock->l_peer, LDLM_CALLBACK, 2, size, bufs);
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
+ bufs);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
ptlrpc_free_req(req);
out:
return rc;
}
+
+int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
+ int new_mode, int *flags, struct ptlrpc_request **request)
+{
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ struct ldlm_lock *lock;
+ struct ptlrpc_request *req;
+ int rc, size[2] = {sizeof(*body), lock->l_data_len};
+ char *bufs[2] = {NULL, lock->l_data};
+
+ lock = ldlm_handle2object(lockh);
+
+ if (lock->l_connection == NULL)
+ GOTO(local, 0);
+
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
+ bufs);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+
+ body->lock_desc.l_req_mode = new_mode;
+ body->flags = *flags;
+
+ req->rq_replen = lustre_msg_size(1, size);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0);
+ *flags = reply->flags;
+
+ EXIT;
+ local:
+ rc = ldlm_local_lock_convert(lockh, new_mode, flags);
+ out:
+ *request = req;
+ return rc;
+}
+
+int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
+ struct ptlrpc_request **request)
+{
+ struct ldlm_request *body;
+ struct ldlm_lock *lock;
+ struct ptlrpc_request *req;
+ int rc, size[2] = {sizeof(*body), lock->l_data_len};
+ char *bufs[2] = {NULL, lock->l_data};
+
+ lock = ldlm_handle2object(lockh);
+
+ if (lock->l_connection == NULL)
+ GOTO(local, 0);
+
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
+ bufs);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ EXIT;
+ local:
+ rc = ldlm_local_lock_cancel(lockh);
+ out:
+ *request = req;
+ return rc;
+}
kmem_cache_t *ldlm_resource_slab;
kmem_cache_t *ldlm_lock_slab;
-struct ldlm_namespace *ldlm_namespace_find(struct obd_device *obddev, __u32 id)
+struct list_head ldlm_namespaces;
+spinlock_t ldlm_spinlock;
+
+struct ldlm_namespace *ldlm_namespace_find(__u32 id)
{
struct list_head *tmp;
struct ldlm_namespace *res;
res = NULL;
- list_for_each(tmp, &obddev->u.ldlm.ldlm_namespaces) {
+ list_for_each(tmp, &ldlm_namespaces) {
struct ldlm_namespace *chk;
chk = list_entry(tmp, struct ldlm_namespace, ns_link);
return res;
}
-/* this must be called with ldlm_lock(obddev) held */
+/* this must be called with ldlm_lock() held */
static void res_hash_init(struct ldlm_namespace *ns)
{
struct list_head *res_hash;
{
struct ldlm_namespace *ns;
- if (ldlm_namespace_find(obddev, id))
+ if (ldlm_namespace_find(id))
return -ELDLM_NAMESPACE_EXISTS;
OBD_ALLOC(ns, sizeof(*ns));
ns->ns_obddev = obddev;
INIT_LIST_HEAD(&ns->ns_root_list);
- list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces);
+ list_add(&ns->ns_link, &ldlm_namespaces);
res_hash_init(ns);
atomic_set(&ns->ns_refcount, 0);
return res;
}
-/* ldlm_lock(obddev) must be taken before calling resource_add */
+/* ldlm_lock() must be taken before calling resource_add */
static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
__u64 *name, __u32 type)
struct ldlm_handle lockh_1, lockh_2;
int flags;
- ldlm_lock(obddev);
+ ldlm_lock();
err = ldlm_namespace_new(obddev, 1, &ns);
if (err != ELDLM_OK)
LBUG();
- res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1);
- if (res == NULL)
- LBUG();
-
- /* Get a couple of read locks */
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
- NULL, LCK_CR, &flags, NULL,
- ldlm_test_callback, NULL, 0, &lockh_1);
+ err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_1);
+ err = ldlm_local_lock_enqueue(&lockh_1, LCK_CR, NULL, &flags, NULL,
+ ldlm_test_callback, NULL, 0);
if (err != ELDLM_OK)
LBUG();
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
- NULL, LCK_EX, &flags, NULL,
- ldlm_test_callback, NULL, 0, &lockh_2);
+ err = ldlm_local_lock_create(1, NULL, res_id, LDLM_PLAIN, &lockh_2);
+ err = ldlm_local_lock_enqueue(&lockh_2, LCK_EX, NULL, &flags, NULL,
+ ldlm_test_callback, NULL, 0);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_BLOCK_GRANTED))
LBUG();
+ res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1);
+ if (res == NULL)
+ LBUG();
ldlm_resource_dump(res);
- err = ldlm_local_lock_convert(obddev, &lockh_1, LCK_NL, &flags);
+ err = ldlm_local_lock_convert(&lockh_1, LCK_NL, &flags);
if (err != ELDLM_OK)
LBUG();
ldlm_resource_dump(res);
-
- ldlm_unlock(obddev);
+ ldlm_unlock();
return 0;
}
ldlm_error_t err;
int flags;
- ldlm_lock(obddev);
+ ldlm_lock();
err = ldlm_namespace_new(obddev, 2, &ns);
if (err != ELDLM_OK)
LBUG();
flags = 0;
- err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT,
- &ext1, LCK_PR, &flags, NULL, NULL, NULL,
- 0, &ext1_h);
+ err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext1_h);
+ err = ldlm_local_lock_enqueue(&ext1_h, LCK_PR, &ext1, &flags, NULL,
+ NULL, NULL, 0);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_LOCK_CHANGED))
LBUG();
flags = 0;
- err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT,
- &ext2, LCK_PR, &flags, NULL, NULL, NULL,
- 0, &ext2_h);
+ err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext2_h);
+ err = ldlm_local_lock_enqueue(&ext2_h, LCK_PR, &ext2, &flags, NULL,
+ NULL, NULL, 0);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_LOCK_CHANGED))
LBUG();
flags = 0;
- err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT,
- &ext3, LCK_EX, &flags, NULL, NULL, NULL,
- 0, &ext3_h);
+ err = ldlm_local_lock_create(2, NULL, res_id, LDLM_EXTENT, &ext3_h);
+ err = ldlm_local_lock_enqueue(&ext3_h, LCK_EX, &ext3, &flags, NULL,
+ NULL, NULL, 0);
if (err != ELDLM_OK)
LBUG();
if (!(flags & LDLM_FL_BLOCK_GRANTED))
/* Convert/cancel blocking locks */
flags = 0;
- err = ldlm_local_lock_convert(obddev, &ext1_h, LCK_NL, &flags);
+ err = ldlm_local_lock_convert(&ext1_h, LCK_NL, &flags);
if (err != ELDLM_OK)
LBUG();
flags = 0;
- err = ldlm_local_lock_cancel(obddev, &ext2_h);
+ err = ldlm_local_lock_cancel(&ext2_h);
if (err != ELDLM_OK)
LBUG();
LBUG();
ldlm_resource_dump(res);
- ldlm_unlock(obddev);
+ ldlm_unlock();
return 0;
}
int flags = 0;
ldlm_error_t err;
- err = ldlm_cli_namespace_new(ldlm->ldlm_client, &ldlm->ldlm_server_peer,
- 3, &request);
+ err = ldlm_cli_namespace_new(obddev, ldlm->ldlm_client,
+ ldlm->ldlm_server_conn, 3, &request);
ptlrpc_free_req(request);
CERROR("ldlm_cli_namespace_new: %d\n", err);
if (err != ELDLM_OK)
GOTO(out, err);
- err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3,
+ err = ldlm_cli_enqueue(ldlm->ldlm_client, ldlm->ldlm_server_conn, 3,
NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags,
NULL, 0, &lockh1, &request);
ptlrpc_free_req(request);
CERROR("ldlm_cli_enqueue: %d\n", err);
flags = 0;
- err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3,
+ err = ldlm_cli_enqueue(ldlm->ldlm_client, ldlm->ldlm_server_conn, 3,
NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags,
NULL, 0, &lockh2, &request);
ptlrpc_free_req(request);
char *buf;
__u64 offset;
int rc = 0;
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
ENTRY;
offset = page->index << PAGE_SHIFT;
buf = kmap(page);
- rc = mdc_readpage(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+ rc = mdc_readpage(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
S_IFDIR, offset, buf, &request);
kunmap(page);
ptlrpc_free_req(request);
static int ll_file_open(struct inode *inode, struct file *file)
{
int rc;
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
struct ll_file_data *fd;
struct obdo *oa;
struct ll_sb_info *sbi = ll_i2sbi(inode);
GOTO(out, rc = -ENOMEM);
memset(fd, 0, sizeof(*fd));
- rc = mdc_open(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+ rc = mdc_open(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
S_IFREG, file->f_flags, &fd->fd_mdshandle, &req);
if (!fd->fd_mdshandle)
CERROR("mdc_open didn't assign fd_mdshandle\n");
static int ll_file_release(struct inode *inode, struct file *file)
{
int rc;
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
struct ll_file_data *fd;
struct obdo *oa;
struct ll_sb_info *sbi = ll_i2sbi(inode);
rc = -EIO;
}
- rc = mdc_close(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
- S_IFREG, fd->fd_mdshandle, &req);
+ rc = mdc_close(&sbi->ll_mds_client, sbi->ll_mds_conn, inode->i_ino,
+ S_IFREG, fd->fd_mdshandle, &req);
ptlrpc_free_req(req);
if (rc) {
if (rc > 0)
static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
{
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
struct inode * inode = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
int err;
ENTRY;
if (dentry->d_name.len > EXT2_NAME_LEN)
- return ERR_PTR(-ENAMETOOLONG);
+ RETURN(ERR_PTR(-ENAMETOOLONG));
ino = ll_inode_by_name(dir, dentry, &type);
if (!ino)
- goto negative;
+ GOTO(negative, NULL);
- err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, ino, type,
+ err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_mds_conn, ino, type,
OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request);
if (err) {
CERROR("failure %d inode %ld\n", err, ino);
ptlrpc_free_req(request);
- EXIT;
- return ERR_PTR(-abs(err));
+ RETURN(ERR_PTR(-abs(err)));
}
inode = iget4(dir->i_sb, ino, ll_find_inode,
ptlrpc_free_req(request);
if (!inode)
- return ERR_PTR(-ENOMEM);
+ RETURN(ERR_PTR(-ENOMEM));
+ EXIT;
negative:
d_add(dentry, inode);
return NULL;
int mode, __u64 id)
{
struct inode *inode;
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
struct mds_body *body;
int err;
time_t time = CURRENT_TIME;
ENTRY;
- err = mdc_create(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, name,
+ err = mdc_create(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, name,
namelen, tgt, tgtlen, mode, id, current->uid,
current->gid, time, &request);
if (err) {
int ll_mdc_unlink(struct inode *dir, struct inode *child,
const char *name, int len)
{
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
int err;
struct ll_sb_info *sbi = ll_i2sbi(dir);
ENTRY;
- err = mdc_unlink(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, child,
+ err = mdc_unlink(&sbi->ll_mds_client, sbi->ll_mds_conn, dir, child,
name, len, &request);
ptlrpc_free_req(request);
int ll_mdc_link(struct dentry *src, struct inode *dir,
const char *name, int len)
{
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
int err;
struct ll_sb_info *sbi = ll_i2sbi(dir);
ENTRY;
- err = mdc_link(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, dir, name,
+ err = mdc_link(&sbi->ll_mds_client, sbi->ll_mds_conn, src, dir, name,
len, &request);
ptlrpc_free_req(request);
int ll_mdc_rename(struct inode *src, struct inode *tgt,
struct dentry *old, struct dentry *new)
{
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
int err;
struct ll_sb_info *sbi = ll_i2sbi(src);
ENTRY;
- err = mdc_rename(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, tgt,
+ err = mdc_rename(&sbi->ll_mds_client, sbi->ll_mds_conn, src, tgt,
old->d_name.name, old->d_name.len,
new->d_name.name, new->d_name.len, &request);
ptlrpc_free_req(request);
-/*
- * Lustre Light I/O Page Cache
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Lustre Lite I/O Page Cache
*
- * Copyright (C) 2002, Cluster File Systems, Inc.
+ * Copyright (C) 2002 Cluster File Systems, Inc.
*/
#include <linux/config.h>
#define DEBUG_SUBSYSTEM S_LLITE
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
#include <linux/lustre_mds.h>
#include <linux/lustre_lite.h>
-
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,10))
/*
* Add a page to the dirty page list.
obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
obd_flag flags = create ? OBD_BRW_CREATE : 0;
int err;
-
ENTRY;
oa = ll_oa_from_inode(inode, OBD_MD_FLNOTOBD);
- if (!oa) {
- return -ENOMEM;
- }
+ if (!oa)
+ RETURN(-ENOMEM);
+
err = obd_brw(rw, ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
- &page, &count, &offset, &flags);
+ &page, &count, &offset, &flags);
obdo_free(oa);
- EXIT;
- return err;
+ RETURN(err);
} /* ll_brw */
extern void set_page_clean(struct page *);
{
struct inode *inode = page->mapping->host;
int rc = 0;
-
ENTRY;
if (!PageLocked(page))
<= page->index) {
memset(kmap(page), 0, PAGE_CACHE_SIZE);
kunmap(page);
- EXIT;
- goto readpage_out;
+ GOTO(readpage_out, rc);
}
if (Page_Uptodate(page)) {
CERROR("Explain this please?\n");
- EXIT;
- goto readpage_out;
+ GOTO(readpage_out, rc);
}
rc = ll_brw(OBD_BRW_READ, inode, page, 0);
static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
- unsigned to)
+ unsigned to)
{
struct inode *inode = page->mapping->host;
obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
if (!PageLocked(page))
LBUG();
- if (Page_Uptodate(page)) {
- EXIT;
- goto prepare_done;
- }
+ if (Page_Uptodate(page))
+ GOTO(prepare_done, rc);
if (offset + from >= inode->i_size) {
memset(addr, 0, PAGE_SIZE);
- EXIT;
- goto prepare_done;
+ GOTO(prepare_done, rc);
}
/* We're completely overwriting an existing page, so _don't_ set it up
rc = ll_brw(OBD_BRW_READ, inode, page, 0);
+ EXIT;
prepare_done:
- if ( !rc )
+ if (!rc)
SetPageUptodate(page);
- EXIT;
return rc;
}
CERROR("ll_brw failure %d\n", err);
}
UnlockPage(page);
- EXIT;
- return err;
+ RETURN(err);
}
/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
}
obdo_free(oa);
- EXIT;
- return err;
+ RETURN(err);
} /* ll_commit_write */
void ll_truncate(struct inode *inode)
CERROR("obd_truncate fails (%d)\n", err);
}
EXIT;
- return;
+ return;
} /* ll_truncate */
struct address_space_operations ll_aops = {
commit_write: ll_commit_write,
bmap: NULL
};
-
/* the first parameter should become an mds device no */
ptlrpc_init_client(llite_ha_mgr, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
&sbi->ll_mds_client);
- err = ptlrpc_connect_client("mds", &sbi->ll_mds_client,
- &sbi->ll_mds_peer);
+ sbi->ll_mds_conn = ptlrpc_connect_client("mds");
if (err) {
CERROR("cannot find MDS\n");
GOTO(out_disc, sb = NULL);
sb->s_op = &ll_super_operations;
/* make root inode */
- err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer,
+ err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_mds_conn,
sbi->ll_rootino, S_IFDIR,
OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request);
if (err) {
struct ll_sb_info *sbi = sb->u.generic_sbp;
ENTRY;
obd_disconnect(&sbi->ll_conn);
+ ptlrpc_put_connection(sbi->ll_mds_conn);
OBD_FREE(sb->u.generic_sbp, sizeof(*sbi));
MOD_DEC_USE_COUNT;
EXIT;
int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
{
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request = NULL;
struct ll_sb_info *sbi = ll_i2sbi(inode);
int err;
/* change incore inode */
ll_attr2inode(inode, attr, do_trunc);
- err = mdc_setattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode, attr,
+ err = mdc_setattr(&sbi->ll_mds_client, sbi->ll_mds_conn, inode, attr,
&request);
if (err)
CERROR("mdc_setattr fails (%d)\n", err);
{
int rc;
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
CERROR("error in handling %d\n", rc);
return rc;
}
-int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int mdc_setattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
struct inode *inode, struct iattr *iattr,
struct ptlrpc_request **request)
{
int rc, size = sizeof(*rec);
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_REINT, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_REINT, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
struct inode *dir, const char *name, int namelen,
const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid,
__u32 gid, __u64 time, struct ptlrpc_request **request)
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int mdc_unlink(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
struct inode *dir, struct inode *child, const char *name,
int namelen, struct ptlrpc_request **request)
{
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_REINT, 2, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int mdc_link(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
struct dentry *src, struct inode *dir, const char *name,
int namelen, struct ptlrpc_request **request)
{
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_REINT, 2, size, NULL);
if (!req)
RETURN(-ENOMEM);
RETURN(rc);
}
-int mdc_rename(struct ptlrpc_client *cl, struct lustre_peer *peer,
+int mdc_rename(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
struct inode *src, struct inode *tgt, const char *old,
int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
char *tmp;
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_REINT, 3, size, NULL);
if (!req)
RETURN(-ENOMEM);
extern int mds_queue_req(struct ptlrpc_request *);
-int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
- int type, int valid, struct ptlrpc_request **request)
+int mdc_connect(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ ino_t ino, int type, int valid, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct mds_body *body;
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_GETATTR, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
if (!rc) {
}
-int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
- int type, int valid, struct ptlrpc_request **request)
+int mdc_getattr(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ ino_t ino, int type, int valid, struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
struct mds_body *body;
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_GETATTR, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
if (!rc) {
return rc;
}
-int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
- int type, int flags, __u64 *fh, struct ptlrpc_request **request)
+int mdc_open(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ ino_t ino, int type, int flags, __u64 *fh,
+ struct ptlrpc_request **request)
{
struct mds_body *body;
int rc, size = sizeof(*body);
struct ptlrpc_request *req;
- req = ptlrpc_prep_req(cl, peer, MDS_OPEN, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_OPEN, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
if (!rc) {
return rc;
}
-int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
- int type, __u64 fh, struct ptlrpc_request **request)
+int mdc_close(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ ino_t ino, int type, __u64 fh, struct ptlrpc_request **request)
{
struct mds_body *body;
int rc, size = sizeof(*body);
struct ptlrpc_request *req;
- req = ptlrpc_prep_req(cl, peer, MDS_CLOSE, 1, &size, NULL);
+ req = ptlrpc_prep_req(cl, conn, MDS_CLOSE, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
req->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
EXIT;
return rc;
}
-int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
- int type, __u64 offset, char *addr,
+int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ ino_t ino, int type, __u64 offset, char *addr,
struct ptlrpc_request **request)
{
struct ptlrpc_request *req = NULL;
CDEBUG(D_INODE, "inode: %ld\n", ino);
- bulk = ptlrpc_prep_bulk(peer);
+ bulk = ptlrpc_prep_bulk(conn);
if (bulk == NULL) {
CERROR("%s: cannot init bulk desc\n", __FUNCTION__);
rc = -ENOMEM;
goto out;
}
- req = ptlrpc_prep_req(cl, peer, MDS_READPAGE, 2, size, bufs);
+ req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 2, size, bufs);
if (!req)
GOTO(out, rc = -ENOMEM);
bulk->b_buflen = PAGE_SIZE;
bulk->b_buf = (void *)(long)niobuf.addr;
bulk->b_portal = MDS_BULK_PORTAL;
- bulk->b_xid = req->rq_xid;
+ bulk->b_xid = req->rq_reqmsg->xid;
rc = ptlrpc_register_bulk(bulk);
if (rc) {
req->rq_replen = lustre_msg_size(1, size);
- rc = ptlrpc_queue_wait(cl, req);
+ rc = ptlrpc_queue_wait(req);
if (rc) {
CERROR("error in handling %d\n", rc);
ptlrpc_abort_bulk(bulk);
{
int err;
struct ptlrpc_client cl;
- struct lustre_peer peer;
+ struct ptlrpc_connection *conn;
struct ptlrpc_request *request;
ENTRY;
}
ptlrpc_init_client(NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
- err = ptlrpc_connect_client("mds", &cl, &peer);
+ conn = ptlrpc_connect_client("mds");
if (err) {
CERROR("cannot create client\n");
RETURN(-EINVAL);
switch (cmd) {
case IOC_REQUEST_GETATTR: {
CERROR("-- getting attr for ino %lu\n", arg);
- err = mdc_getattr(&cl, &peer, arg, S_IFDIR, ~0, &request);
+ err = mdc_getattr(&cl, conn, arg, S_IFDIR, ~0, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
break;
}
CERROR("-- readpage 0 for ino %lu\n", arg);
- err = mdc_readpage(&cl, &peer, arg, S_IFDIR, 0, buf, &request);
+ err = mdc_readpage(&cl, conn, arg, S_IFDIR, 0, buf, &request);
CERROR("-- done err %d\n", err);
OBD_FREE(buf, PAGE_SIZE);
iattr.ia_atime = 0;
iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
- err = mdc_setattr(&cl, &peer, &inode, &iattr, &request);
+ err = mdc_setattr(&cl, conn, &inode, &iattr, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
iattr.ia_atime = 0;
iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
- err = mdc_create(&cl, &peer, &inode,
+ err = mdc_create(&cl, conn, &inode,
"foofile", strlen("foofile"),
NULL, 0, 0100707, 47114711,
11, 47, 0, &request);
__u64 fh, ino;
copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
CERROR("-- opening ino %llu\n", ino);
- err = mdc_open(&cl, &peer, ino, S_IFDIR, O_RDONLY, &fh,
+ err = mdc_open(&cl, conn, ino, S_IFDIR, O_RDONLY, &fh,
&request);
copy_to_user((__u64 *)arg, &fh, sizeof(fh));
CERROR("-- done err %d (fh=%Lu)\n", err, fh);
case IOC_REQUEST_CLOSE: {
CERROR("-- closing ino 2, filehandle %lu\n", arg);
- err = mdc_close(&cl, &peer, 2, S_IFDIR, arg, &request);
+ err = mdc_close(&cl, conn, 2, S_IFDIR, arg, &request);
CERROR("-- done err %d\n", err);
GOTO(out, err);
out:
ptlrpc_free_req(request);
+ ptlrpc_put_connection(conn);
RETURN(err);
}
{
int rc = 0;
mm_segment_t oldfs = get_fs();
+ struct ptlrpc_bulk_desc *bulk;
+ char *buf;
- if (req->rq_peer.peer_nid == 0) {
- /* dst->addr is a user address, but in a different task! */
- char *buf = (char *)(long)dst->addr;
-
- set_fs(KERNEL_DS);
- rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
- &offset);
- set_fs(oldfs);
-
- if (rc != PAGE_SIZE) {
- rc = -EIO;
- GOTO(out, rc);
- }
- EXIT;
- } else {
- struct ptlrpc_bulk_desc *bulk;
- char *buf;
-
- bulk = ptlrpc_prep_bulk(&req->rq_peer);
- if (bulk == NULL) {
- rc = -ENOMEM;
- GOTO(out, rc);
- }
-
- bulk->b_xid = req->rq_xid;
+ bulk = ptlrpc_prep_bulk(req->rq_connection);
+ if (bulk == NULL) {
+ rc = -ENOMEM;
+ GOTO(out, rc);
+ }
- OBD_ALLOC(buf, PAGE_SIZE);
- if (!buf) {
- rc = -ENOMEM;
- GOTO(cleanup_bulk, rc);
- }
+ bulk->b_xid = req->rq_reqmsg->xid;
- set_fs(KERNEL_DS);
- rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
- &offset);
- set_fs(oldfs);
+ OBD_ALLOC(buf, PAGE_SIZE);
+ if (!buf) {
+ rc = -ENOMEM;
+ GOTO(cleanup_bulk, rc);
+ }
- if (rc != PAGE_SIZE) {
- rc = -EIO;
- GOTO(cleanup_buf, rc);
- }
+ set_fs(KERNEL_DS);
+ rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
+ &offset);
+ set_fs(oldfs);
- bulk->b_buf = buf;
- bulk->b_buflen = PAGE_SIZE;
+ if (rc != PAGE_SIZE) {
+ rc = -EIO;
+ GOTO(cleanup_buf, rc);
+ }
- rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
- CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
- OBD_FAIL_MDS_SENDPAGE, rc);
- PtlMDUnlink(bulk->b_md_h);
- GOTO(cleanup_buf, rc);
- }
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
+ bulk->b_buf = buf;
+ bulk->b_buflen = PAGE_SIZE;
- if (bulk->b_flags == PTL_RPC_INTR) {
- rc = -EINTR;
- GOTO(cleanup_buf, rc);
- }
+ rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+ CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
+ OBD_FAIL_MDS_SENDPAGE, rc);
+ PtlMDUnlink(bulk->b_md_h);
+ GOTO(cleanup_buf, rc);
+ }
+ wait_event_interruptible(bulk->b_waitq,
+ ptlrpc_check_bulk_sent(bulk));
- EXIT;
- cleanup_buf:
- OBD_FREE(buf, PAGE_SIZE);
- cleanup_bulk:
- OBD_FREE(bulk, sizeof(*bulk));
+ if (bulk->b_flags == PTL_RPC_INTR) {
+ rc = -EINTR;
+ GOTO(cleanup_buf, rc);
}
-out:
+
+ EXIT;
+ cleanup_buf:
+ OBD_FREE(buf, PAGE_SIZE);
+ cleanup_bulk:
+ OBD_FREE(bulk, sizeof(*bulk));
+ out:
return rc;
}
int rc, size = sizeof(*body);
ENTRY;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
CERROR("mds: out of memory\n");
req->rq_status = -ENOMEM;
int rc, size = sizeof(*body);
ENTRY;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
CERROR("mds: out of memory\n");
req->rq_status = -ENOMEM;
int rc;
ENTRY;
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
CERROR("mds: out of memory\n");
req->rq_status = -ENOMEM;
int rc, size = sizeof(*body);
ENTRY;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
CERROR("mds: out of memory\n");
req->rq_status = -ENOMEM;
int rc;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
- req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
+ rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
CERROR("lustre_mds: Invalid request\n");
GOTO(out, rc);
RETURN(rc);
}
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("mds: out of memory\n");
rc = req->rq_status = -ENOMEM;
#include <linux/obd_ost.h>
static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
- struct lustre_peer **peer)
+ struct ptlrpc_connection **connection)
{
struct osc_obd *osc = &conn->oc_dev->u.osc;
*cl = osc->osc_client;
- *peer = &osc->osc_peer;
+ *connection = osc->osc_conn;
}
static int osc_connect(struct obd_conn *conn)
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_CONNECT, 0, NULL, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_DISCONNECT, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
GOTO(out, rc);
out:
ptlrpc_free_req(request);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_GETATTR, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_OPEN, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_CLOSE, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_SETATTR, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
GOTO(out, rc);
out:
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_CREATE, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_PUNCH, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
{
struct ptlrpc_request *request;
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_DESTROY, 1, &size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
struct niobuf *dst, struct niobuf *src)
{
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
- osc_con2cl(conn, &cl, &peer);
+ osc_con2cl(conn, &cl, &connection);
if (cl->cli_obd) {
/* local sendpage */
struct ptlrpc_bulk_desc *bulk;
int rc;
- bulk = ptlrpc_prep_bulk(peer);
+ bulk = ptlrpc_prep_bulk(connection);
if (bulk == NULL)
return -ENOMEM;
obd_off *offset, obd_flag *flags)
{
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ptlrpc_request *request;
struct ost_body *body;
struct obd_ioobj ioo;
if (bulk == NULL)
RETURN(-ENOMEM);
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
if (!request)
GOTO(out, rc = -ENOMEM);
for (pages = 0, i = 0; i < num_oa; i++) {
ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
for (j = 0; j < oa_bufs[i]; j++, pages++) {
- bulk[pages] = ptlrpc_prep_bulk(peer);
+ bulk[pages] = ptlrpc_prep_bulk(connection);
if (bulk[pages] == NULL)
GOTO(out, rc = -ENOMEM);
- spin_lock(&cl->cli_lock);
- bulk[pages]->b_xid = cl->cli_xid++;
- spin_unlock(&cl->cli_lock);
+ spin_lock(&connection->c_lock);
+ bulk[pages]->b_xid = ++connection->c_xid_out;
+ spin_unlock(&connection->c_lock);
bulk[pages]->b_buf = kmap(buf[pages]);
bulk[pages]->b_buflen = PAGE_SIZE;
}
request->rq_replen = lustre_msg_size(1, size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
GOTO(out, rc);
out:
obd_off *offset, obd_flag *flags)
{
struct ptlrpc_client *cl;
- struct lustre_peer *peer;
+ struct ptlrpc_connection *connection;
struct ptlrpc_request *request;
struct obd_ioobj ioo;
struct ost_body *body;
if (!src)
RETURN(-ENOMEM);
- osc_con2cl(conn, &cl, &peer);
- request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
if (!request)
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
size[1] = pages * sizeof(struct niobuf);
request->rq_replen = lustre_msg_size(2, size);
- rc = ptlrpc_queue_wait(cl, request);
+ rc = ptlrpc_queue_wait(request);
if (rc)
GOTO(out, rc);
}
/* mount the file system (secretly) */
-static int osc_setup(struct obd_device *obddev, obd_count len,
- void *buf)
-
+static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct osc_obd *osc = &obddev->u.osc;
- struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
- int rc;
ENTRY;
OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
RETURN(-ENOMEM);
ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- osc->osc_client);
+ osc->osc_client);
- rc = ptlrpc_connect_client("ost", osc->osc_client, &osc->osc_peer);
+ osc->osc_conn = ptlrpc_connect_client("ost");
+ if (!osc->osc_conn)
+ RETURN(-EINVAL);
- if (rc == 0)
- MOD_INC_USE_COUNT;
- RETURN(rc);
+ MOD_INC_USE_COUNT;
+ RETURN(0);
}
static int osc_cleanup(struct obd_device * obddev)
{
struct osc_obd *osc = &obddev->u.osc;
- if (osc->osc_client != NULL)
- OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+ OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+ ptlrpc_put_connection(osc->osc_conn);
MOD_DEC_USE_COUNT;
return 0;
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
req->rq_status = obd_connect(&conn);
- CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
+ CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
body = lustre_msg_buf(req->rq_repmsg, 0);
body->connid = conn.oc_id;
RETURN(0);
conn.oc_id = body->connid;
conn.oc_dev = ost->ost_tgt;
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
&(size[1]), (void **)&(bufs[1]));
- rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
if (rc)
CERROR("cannot pack reply\n");
ost_unpack_niobuf(&tmp2, &nb);
}
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
OBD_ALLOC(res, sizeof(*res) * niocount);
GOTO(out, 0);
for (i = 0; i < niocount; i++) {
- bulk = ptlrpc_prep_bulk(&req->rq_peer);
+ bulk = ptlrpc_prep_bulk(req->rq_connection);
if (bulk == NULL) {
CERROR("cannot alloc bulk desc\n");
GOTO(out, rc = -ENOMEM);
}
size[1] = niocount * sizeof(*nb);
- rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
struct ptlrpc_bulk_desc *bulk;
struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
- bulk = ptlrpc_prep_bulk(&req->rq_peer);
+ bulk = ptlrpc_prep_bulk(req->rq_connection);
if (bulk == NULL)
GOTO(out, rc = -ENOMEM);
struct ost_obd *ost = &obddev->u.ost;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
- req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
+ rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
CERROR("lustre_mds: Invalid request\n");
GOTO(out, rc);
modulefs_DATA = ptlrpc.o
EXTRA_PROGRAMS = ptlrpc
-ptlrpc_SOURCES = rpc.c events.c service.c client.c niobuf.c pack_generic.c
+ptlrpc_SOURCES = client.c connection.c events.c niobuf.c pack_generic.c rpc.c \
+service.c
include $(top_srcdir)/Rules
#define EXPORT_SYMTAB
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/list.h>
-
#define DEBUG_SUBSYSTEM S_RPC
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
#include <linux/lustre_net.h>
void llite_ha_conn_manage(struct lustre_ha_mgr *mgr, struct ptlrpc_client *cli)
EXIT;
}
-void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_portal,
- struct ptlrpc_client *cl)
+void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal,
+ int rep_portal, struct ptlrpc_client *cl)
{
memset(cl, 0, sizeof(*cl));
spin_lock_init(&cl->cli_lock);
cl->cli_ha_mgr = mgr;
if (mgr)
llite_ha_conn_manage(mgr, cl);
- cl->cli_xid = 1;
- cl->cli_generation = 1;
- cl->cli_epoch = 1;
- cl->cli_bootcount = 0;
cl->cli_obd = NULL;
cl->cli_request_portal = req_portal;
cl->cli_reply_portal = rep_portal;
sema_init(&cl->cli_rpc_sem, 32);
}
-int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
- struct lustre_peer *peer)
+struct ptlrpc_connection *ptlrpc_connect_client(char *uuid)
{
+ struct ptlrpc_connection *c;
+ struct lustre_peer peer;
int err;
- cl->cli_epoch++;
- err = kportal_uuid_to_peer(uuid, peer);
- if (err != 0)
+ err = kportal_uuid_to_peer(uuid, &peer);
+ if (err != 0) {
CERROR("cannot find peer %s!\n", uuid);
+ return NULL;
+ }
- return err;
+ c = ptlrpc_get_connection(&peer);
+ if (c)
+ c->c_epoch++;
+
+ return c;
}
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
{
struct ptlrpc_bulk_desc *bulk;
OBD_ALLOC(bulk, sizeof(*bulk));
if (bulk != NULL) {
- memcpy(&bulk->b_peer, peer, sizeof(*peer));
+ bulk->b_connection = ptlrpc_connection_addref(conn);
init_waitqueue_head(&bulk->b_waitq);
}
}
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
- struct lustre_peer *peer, int opcode,
- int count, int *lengths, char **bufs)
+ struct ptlrpc_connection *conn,
+ int opcode, int count, int *lengths,
+ char **bufs)
{
struct ptlrpc_request *request;
int rc;
RETURN(NULL);
}
- spin_lock(&cl->cli_lock);
- request->rq_xid = cl->cli_xid++;
- spin_unlock(&cl->cli_lock);
-
rc = lustre_pack_msg(count, lengths, bufs,
- &request->rq_reqlen, &request->rq_reqbuf);
+ &request->rq_reqlen, &request->rq_reqmsg);
if (rc) {
CERROR("cannot pack request %d\n", rc);
RETURN(NULL);
}
+
request->rq_time = CURRENT_TIME;
request->rq_type = PTL_RPC_REQUEST;
- memcpy(&request->rq_peer, peer, sizeof(*peer));
- request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf;
+ request->rq_connection = ptlrpc_connection_addref(conn);
+
+ request->rq_reqmsg->conn = (__u64)(unsigned long)conn;
+ request->rq_reqmsg->token = conn->c_token;
request->rq_reqmsg->opc = HTON__u32(opcode);
- request->rq_reqmsg->xid = HTON__u32(request->rq_xid);
request->rq_reqmsg->type = HTON__u32(request->rq_type);
+
+ spin_lock(&conn->c_lock);
+ request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
+ spin_unlock(&c->c_lock);
+
request->rq_client = cl;
- request->rq_req_portal = cl->cli_request_portal;
- request->rq_reply_portal = cl->cli_reply_portal;
RETURN(request);
}
if (request == NULL)
return;
- if (request->rq_repbuf != NULL)
- OBD_FREE(request->rq_repbuf, request->rq_replen);
+ if (request->rq_repmsg != NULL)
+ OBD_FREE(request->rq_repmsg, request->rq_replen);
+
+ ptlrpc_put_connection(request->rq_connection);
+
OBD_FREE(request, sizeof(*request));
}
int rc = 0;
schedule_timeout(3 * HZ); /* 3 second timeout */
- if (req->rq_repbuf != NULL) {
+ if (req->rq_repmsg != NULL) {
req->rq_flags = PTL_RPC_REPLY;
GOTO(out, rc = 1);
}
static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
{
- OBD_FREE(request->rq_reqbuf, request->rq_reqlen);
- request->rq_reqbuf = NULL;
+ OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+ request->rq_reqmsg = NULL;
request->rq_reqlen = 0;
}
* that we can tear down the buffer safely. */
PtlMEUnlink(request->rq_reply_me_h);
OBD_FREE(request->rq_reply_md.start, request->rq_replen);
- request->rq_repbuf = NULL;
+ request->rq_repmsg = NULL;
request->rq_replen = 0;
return 0;
}
-int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
+int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
int rc = 0;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
- rc = ptl_send_rpc(req, cl);
+ rc = ptl_send_rpc(req);
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
ptlrpc_cleanup_request_buf(req);
- up(&cl->cli_rpc_sem);
+ up(&req->rq_client->cli_rpc_sem);
RETURN(-rc);
}
wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(D_OTHER, "-- done\n");
ptlrpc_cleanup_request_buf(req);
- up(&cl->cli_rpc_sem);
+ up(&req->rq_client->cli_rpc_sem);
if (req->rq_flags == PTL_RPC_INTR) {
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
GOTO(out, rc = -EINTR);
}
- rc = lustre_unpack_msg(req->rq_repbuf, req->rq_replen);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
GOTO(out, rc);
CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
if (req->rq_repmsg->status == 0)
- CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repbuf,
+ CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
req->rq_replen, req->rq_repmsg->status);
EXIT;
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/lustre_net.h>
+
+static spinlock_t conn_lock;
+static struct list_head conn_list;
+static struct list_head conn_unused_list;
+
+struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer)
+{
+ struct list_head *tmp, *pos;
+ struct ptlrpc_connection *c;
+ ENTRY;
+
+ spin_lock(&conn_lock);
+ list_for_each(tmp, &conn_list) {
+ c = list_entry(tmp, struct ptlrpc_connection, c_link);
+ if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0) {
+ atomic_inc(&c->c_refcount);
+ GOTO(out, c);
+ }
+ }
+
+ list_for_each_safe(tmp, pos, &conn_unused_list) {
+ c = list_entry(tmp, struct ptlrpc_connection, c_link);
+ if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0) {
+ atomic_inc(&c->c_refcount);
+ list_del(&c->c_link);
+ list_add(&c->c_link, &conn_list);
+ GOTO(out, c);
+ }
+ }
+
+ /* FIXME: this should be a slab once we can validate slab addresses
+ * without OOPSing */
+ OBD_ALLOC(c, sizeof(*c));
+ if (c == NULL)
+ GOTO(out, c);
+
+ c->c_xid_in = 1;
+ c->c_xid_out = 1;
+ c->c_generation = 1;
+ c->c_epoch = 1;
+ c->c_bootcount = 0;
+ atomic_set(&c->c_refcount, 1);
+
+ memcpy(&c->c_peer, peer, sizeof(c->c_peer));
+ list_add(&c->c_link, &conn_list);
+
+ EXIT;
+ out:
+ spin_unlock(&conn_lock);
+ return c;
+}
+
+int ptlrpc_put_connection(struct ptlrpc_connection *c)
+{
+ int rc = 0;
+
+ if (atomic_dec_and_test(&c->c_refcount)) {
+ spin_lock(&conn_lock);
+ list_del(&c->c_link);
+ list_add(&c->c_link, &conn_unused_list);
+ spin_unlock(&conn_lock);
+ rc = 1;
+ }
+
+ return rc;
+}
+
+struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *c)
+{
+ atomic_inc(&c->c_refcount);
+ return c;
+}
+
+void ptlrpc_init_connection(void)
+{
+ INIT_LIST_HEAD(&conn_list);
+ INIT_LIST_HEAD(&conn_unused_list);
+ conn_lock = SPIN_LOCK_UNLOCKED;
+}
+
+void ptlrpc_cleanup_connection(void)
+{
+ struct list_head *tmp, *pos;
+ struct ptlrpc_connection *c;
+
+ spin_lock(&conn_lock);
+ list_for_each_safe(tmp, pos, &conn_unused_list) {
+ c = list_entry(tmp, struct ptlrpc_connection, c_link);
+ list_del(&c->c_link);
+ OBD_FREE(c, sizeof(*c));
+ }
+ list_for_each_safe(tmp, pos, &conn_list) {
+ c = list_entry(tmp, struct ptlrpc_connection, c_link);
+ CERROR("Connection %p has refcount %d at cleanup (nid=%lu)!\n",
+ c, atomic_read(&c->c_refcount),
+ (unsigned long)c->c_peer.peer_nid);
+ list_del(&c->c_link);
+ OBD_FREE(c, sizeof(*c));
+ }
+ spin_unlock(&conn_lock);
+}
#define EXPORT_SYMTAB
-#include <linux/config.h>
#include <linux/module.h>
-#include <linux/kernel.h>
#define DEBUG_SUBSYSTEM S_RPC
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
#include <linux/lustre_net.h>
ptl_handle_eq_t request_out_eq,
ENTRY;
if (ev->type == PTL_EVENT_PUT) {
- rpc->rq_repbuf = ev->mem_desc.start + ev->offset;
+ rpc->rq_repmsg = ev->mem_desc.start + ev->offset;
barrier();
wake_up_interruptible(&rpc->rq_wait_for_rep);
} else {
#define EXPORT_SYMTAB
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-
#define DEBUG_SUBSYSTEM S_RPC
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
#include <linux/lustre_net.h>
extern ptl_handle_eq_t request_out_eq,
int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
{
- if (bulk->b_flags == PTL_BULK_SENT) {
- EXIT;
- return 1;
- }
+ ENTRY;
+
+ if (bulk->b_flags == PTL_BULK_SENT)
+ RETURN(1);
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGINT)) {
bulk->b_flags = PTL_RPC_INTR;
- EXIT;
- return 1;
+ RETURN(1);
}
CDEBUG(D_NET, "no event yet\n");
- return 0;
+ RETURN(0);
}
-int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
+int ptl_send_buf(struct ptlrpc_request *request, struct ptlrpc_connection *conn,
int portal)
{
int rc;
ack = PTL_ACK_REQ;
break;
case PTL_RPC_REQUEST:
- request->rq_req_md.start = request->rq_reqbuf;
+ request->rq_req_md.start = request->rq_reqmsg;
request->rq_req_md.length = request->rq_reqlen;
request->rq_req_md.eventq = request_out_eq;
request->rq_req_md.threshold = 1;
ack = PTL_NOACK_REQ;
break;
case PTL_RPC_REPLY:
- request->rq_req_md.start = request->rq_repbuf;
+ request->rq_req_md.start = request->rq_repmsg;
request->rq_req_md.length = request->rq_replen;
request->rq_req_md.eventq = reply_out_eq;
request->rq_req_md.threshold = 1;
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
- rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
+ rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h);
//CERROR("MDBind (outgoing req/rep/bulk): %Lu\n", (__u64)md_h);
if (rc != 0) {
CERROR("PtlMDBind failed: %d\n", rc);
return rc;
}
- remote_id.nid = peer->peer_nid;
+ remote_id.nid = conn->c_peer.peer_nid;
remote_id.pid = 0;
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
- request->rq_req_md.length, portal, request->rq_xid);
+ request->rq_req_md.length, portal, request->rq_reqmsg->xid);
- rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0);
+ rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_reqmsg->xid,
+ 0, 0);
if (rc != PTL_OK) {
CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
- portal, request->rq_xid, rc);
+ portal, request->rq_reqmsg->xid, rc);
PtlMDUnlink(md_h);
}
bulk->b_md.options = PTL_MD_OP_PUT;
bulk->b_md.user_ptr = bulk;
- rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &bulk->b_md_h);
+ rc = PtlMDBind(bulk->b_connection->c_peer.peer_ni, bulk->b_md,
+ &bulk->b_md_h);
if (rc != 0) {
CERROR("PtlMDBind failed: %d\n", rc);
LBUG();
return rc;
}
- remote_id.nid = bulk->b_peer.peer_nid;
+ remote_id.nid = bulk->b_connection->c_peer.peer_nid;
remote_id.pid = 0;
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk)
{
int rc;
-
ENTRY;
- rc = PtlMEAttach(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
- bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
+ rc = PtlMEAttach(bulk->b_connection->c_peer.peer_ni, bulk->b_portal,
+ local_id, bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
&bulk->b_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
{
/* FIXME: we need to increment the count of handled events */
req->rq_type = PTL_RPC_REPLY;
+ req->rq_repmsg->conn = req->rq_reqmsg->conn;
+ req->rq_repmsg->token = req->rq_reqmsg->token;
req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid);
req->rq_repmsg->status = HTON__u32(req->rq_status);
req->rq_reqmsg->type = HTON__u32(req->rq_type);
- return ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
+ return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal);
}
int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
int rc;
ENTRY;
- if (req->rq_repbuf) {
- CERROR("req has repbuf\n");
+ if (req->rq_repmsg) {
+ CERROR("req already has repmsg\n");
LBUG();
}
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
- req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
RETURN(rc);
}
-int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl)
+int ptl_send_rpc(struct ptlrpc_request *request)
{
ptl_process_id_t local_id;
int rc;
RETURN(-EINVAL);
}
- /* request->rq_repbuf is set only when the reply comes in, in
+ /* request->rq_repmsg is set only when the reply comes in, in
* client_packet_callback() */
OBD_ALLOC(repbuf, request->rq_replen);
if (!repbuf)
local_id.nid = PTL_ID_ANY;
local_id.pid = PTL_ID_ANY;
- down(&cl->cli_rpc_sem);
+ down(&request->rq_client->cli_rpc_sem);
- rc = PtlMEAttach(request->rq_peer.peer_ni, request->rq_reply_portal,
- local_id, request->rq_xid, 0, PTL_UNLINK,
+ rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
+ request->rq_client->cli_reply_portal,
+ local_id, request->rq_reqmsg->xid, 0, PTL_UNLINK,
PTL_INS_AFTER, &request->rq_reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
}
CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
- request->rq_replen, request->rq_xid, request->rq_reply_portal);
+ request->rq_replen, request->rq_reqmsg->xid,
+ request->rq_client->cli_request_portal);
- list_add(&request->rq_list, &cl->cli_sending_head);
- rc = ptl_send_buf(request, &request->rq_peer, request->rq_req_portal);
+ list_add(&request->rq_list, &request->rq_client->cli_sending_head);
+ rc = ptl_send_buf(request, request->rq_connection,
+ request->rq_client->cli_request_portal);
RETURN(rc);
cleanup2:
PtlMEUnlink(request->rq_reply_me_h);
cleanup:
OBD_FREE(repbuf, request->rq_replen);
- up(&cl->cli_rpc_sem);
+ up(&request->rq_client->cli_rpc_sem);
return rc;
}
#define DEBUG_SUBSYSTEM S_CLASS
-#include <linux/obd_class.h>
#include <linux/lustre_net.h>
-int lustre_pack_msg(int count, int *lens, char **bufs, int *len, char **buf)
+int lustre_pack_msg(int count, int *lens, char **bufs, int *len,
+ struct lustre_msg **msg)
{
char *ptr;
struct lustre_msg *m;
*len = sizeof(*m) + count * sizeof(__u32) + size;
- OBD_ALLOC(*buf, *len);
- if (!*buf)
+ OBD_ALLOC(*msg, *len);
+ if (!*msg)
RETURN(-ENOMEM);
- m = (struct lustre_msg *)(*buf);
+ m = *msg;
m->bufcount = HTON__u32(count);
for (i = 0; i < count; i++)
m->buflens[i] = HTON__u32(lens[i]);
- ptr = *buf + sizeof(*m) + sizeof(__u32) * count;
+ ptr = (char *)m + sizeof(*m) + sizeof(__u32) * count;
for (i = 0; i < count; i++) {
char *tmp = NULL;
if (bufs)
return size;
}
-int lustre_unpack_msg(char *buf, int len)
+int lustre_unpack_msg(struct lustre_msg *m, int len)
{
- struct lustre_msg *m = (struct lustre_msg *)buf;
int required_len, i;
required_len = sizeof(*m);
#define EXPORT_SYMTAB
-#include <linux/config.h>
#include <linux/module.h>
-#include <linux/kernel.h>
#define DEBUG_SUBSYSTEM S_RPC
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
#include <linux/lustre_net.h>
-
extern int ptlrpc_init_portals(void);
extern void ptlrpc_exit_portals(void);
static int __init ptlrpc_init(void)
{
+ ptlrpc_init_connection();
return ptlrpc_init_portals();
}
static void __exit ptlrpc_exit(void)
{
ptlrpc_exit_portals();
-
- return;
+ ptlrpc_cleanup_connection();
}
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
#define EXPORT_SYMTAB
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-
#define DEBUG_SUBSYSTEM S_RPC
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
#include <linux/lustre_net.h>
extern int request_in_callback(ptl_event_t *ev, void *data);
return NULL;
}
+static int handle_incoming_request(struct obd_device *obddev,
+ struct ptlrpc_service *svc)
+{
+ struct ptlrpc_request request;
+ struct lustre_peer peer;
+ void *start;
+ int rc;
+
+ /* FIXME: If we move to an event-driven model, we should put the request
+ * on the stack of mds_handle instead. */
+ start = svc->srv_ev.mem_desc.start;
+ memset(&request, 0, sizeof(request));
+ request.rq_obd = obddev;
+ request.rq_reqmsg = (svc->srv_ev.mem_desc.start +
+ svc->srv_ev.offset);
+ request.rq_reqlen = svc->srv_ev.mem_desc.length;
+
+ if (request.rq_reqmsg->xid != svc->srv_ev.match_bits)
+ LBUG();
+
+ CDEBUG(D_NET, "got req %d\n", request.rq_reqmsg->xid);
+
+ if (request.rq_reqmsg->conn) {
+ request.rq_connection =
+ (void *)(unsigned long)request.rq_reqmsg->conn;
+ if (request.rq_reqmsg->token != request.rq_connection->c_token)
+ LBUG();
+ ptlrpc_connection_addref(request.rq_connection);
+ } else {
+ request.rq_connection = ptlrpc_get_connection(&peer);
+ if (!request.rq_connection)
+ LBUG();
+ CERROR("Did not find valid/conn token pair.\n");
+ }
+
+ peer.peer_nid = svc->srv_ev.initiator.nid;
+ /* FIXME: this NI should be the incoming NI.
+ * We don't know how to find that from here. */
+ peer.peer_ni = svc->srv_self.peer_ni;
+
+ svc->srv_flags &= ~SVC_EVENT;
+
+ spin_unlock(&svc->srv_lock);
+ rc = svc->srv_handler(obddev, svc, &request);
+ ptlrpc_put_connection(request.rq_connection);
+ ptl_handled_rpc(svc, start);
+ return rc;
+}
+
static int ptlrpc_main(void *arg)
{
int rc;
}
if (svc->srv_flags & SVC_EVENT) {
- struct ptlrpc_request request;
- void *start;
svc->srv_flags = SVC_RUNNING;
-
- /* FIXME: If we move to an event-driven model,
- * we should put the request on the stack of
- * mds_handle instead. */
- start = svc->srv_ev.mem_desc.start;
- memset(&request, 0, sizeof(request));
- request.rq_obd = obddev;
- request.rq_reqbuf = (svc->srv_ev.mem_desc.start +
- svc->srv_ev.offset);
- request.rq_reqlen = svc->srv_ev.mem_desc.length;
- request.rq_xid = svc->srv_ev.match_bits;
- CDEBUG(D_NET, "got req %d\n", request.rq_xid);
-
- request.rq_peer.peer_nid = svc->srv_ev.initiator.nid;
- /* FIXME: this NI should be the incoming NI.
- * We don't know how to find that from here. */
- request.rq_peer.peer_ni = svc->srv_self.peer_ni;
- svc->srv_flags &= ~SVC_EVENT;
-
- spin_unlock(&svc->srv_lock);
- rc = svc->srv_handler(obddev, svc, &request);
- ptl_handled_rpc(svc, start);
+ rc = handle_incoming_request(obddev, svc);
continue;
}
setup_portals
setup_lustre
+read
new_fs ext2 /tmp/ost 10000
OST=$LOOPDEV