#endif
#include <linux/obd_support.h>
+#include <linux/obd_class.h>
#include <linux/lustre_net.h>
return (msg->magic == __swab32(PTLRPC_MSG_MAGIC));
}
-static int lustre_pack_msg(int count, int *lens, char **bufs, int *len,
- struct lustre_msg **msg)
+static void
+lustre_init_msg (struct lustre_msg *msg, int count, int *lens, char **bufs)
{
char *ptr;
- struct lustre_msg *m;
- int size = 0, i;
-
- size = HDR_SIZE (count);
+ int i;
+
+ msg->magic = PTLRPC_MSG_MAGIC;
+ msg->version = PTLRPC_MSG_VERSION;
+ msg->bufcount = count;
for (i = 0; i < count; i++)
- size += size_round(lens[i]);
-
- *len = size;
+ msg->buflens[i] = lens[i];
- OBD_ALLOC(*msg, *len);
- if (!*msg)
- RETURN(-ENOMEM);
-
- m = *msg;
- m->magic = PTLRPC_MSG_MAGIC;
- m->version = PTLRPC_MSG_VERSION;
- m->bufcount = count;
- for (i = 0; i < count; i++)
- m->buflens[i] = lens[i];
+ if (bufs == NULL)
+ return;
- ptr = (char *)m + HDR_SIZE(count);
+ ptr = (char *)msg + HDR_SIZE(count);
for (i = 0; i < count; i++) {
- char *tmp = NULL;
- if (bufs)
- tmp = bufs[i];
+ char *tmp = bufs[i];
LOGL(tmp, lens[i], ptr);
-
}
+}
+
+int lustre_pack_request (struct ptlrpc_request *req,
+ int count, int *lens, char **bufs)
+{
+ ENTRY;
+
+ req->rq_reqlen = lustre_msg_size (count, lens);
+ OBD_ALLOC(req->rq_reqmsg, req->rq_reqlen);
+ if (req->rq_reqmsg == NULL)
+ RETURN(-ENOMEM);
- return 0;
+ lustre_init_msg (req->rq_reqmsg, count, lens, bufs);
+ RETURN (0);
}
-int lustre_pack_request(struct ptlrpc_request *req, int count, int *lens,
- char **bufs)
+#if RS_DEBUG
+LIST_HEAD(ptlrpc_rs_debug_lru);
+spinlock_t ptlrpc_rs_debug_lock = SPIN_LOCK_UNLOCKED;
+
+#define PTLRPC_RS_DEBUG_LRU_ADD(rs) \
+do { \
+ unsigned long __flags; \
+ \
+ spin_lock_irqsave(&ptlrpc_rs_debug_lock, __flags); \
+ list_add_tail(&(rs)->rs_debug_list, &ptlrpc_rs_debug_lru); \
+ spin_unlock_irqrestore(&ptlrpc_rs_debug_lock, __flags); \
+} while (0)
+
+#define PTLRPC_RS_DEBUG_LRU_DEL(rs) \
+do { \
+ unsigned long __flags; \
+ \
+ spin_lock_irqsave(&ptlrpc_rs_debug_lock, __flags); \
+ list_del(&(rs)->rs_debug_list); \
+ spin_unlock_irqrestore(&ptlrpc_rs_debug_lock, __flags); \
+} while (0)
+#else
+# define PTLRPC_RS_DEBUG_LRU_ADD(rs) do {} while(0)
+# define PTLRPC_RS_DEBUG_LRU_DEL(rs) do {} while(0)
+#endif
+
+int lustre_pack_reply (struct ptlrpc_request *req,
+ int count, int *lens, char **bufs)
{
- return lustre_pack_msg(count, lens, bufs, &req->rq_reqlen,
- &req->rq_reqmsg);
+ struct ptlrpc_reply_state *rs;
+ int msg_len;
+ int size;
+ ENTRY;
+
+ LASSERT (req->rq_reply_state == NULL);
+
+ msg_len = lustre_msg_size (count, lens);
+ size = offsetof (struct ptlrpc_reply_state, rs_msg) + msg_len;
+ OBD_ALLOC (rs, size);
+ if (rs == NULL)
+ RETURN (-ENOMEM);
+
+ rs->rs_cb_id.cbid_fn = reply_out_callback;
+ rs->rs_cb_id.cbid_arg = rs;
+ rs->rs_srv_ni = req->rq_rqbd->rqbd_srv_ni;
+ rs->rs_size = size;
+ INIT_LIST_HEAD(&rs->rs_exp_list);
+ INIT_LIST_HEAD(&rs->rs_obd_list);
+
+ req->rq_replen = msg_len;
+ req->rq_reply_state = rs;
+ req->rq_repmsg = &rs->rs_msg;
+ lustre_init_msg (&rs->rs_msg, count, lens, bufs);
+
+ PTLRPC_RS_DEBUG_LRU_ADD(rs);
+
+ RETURN (0);
}
-int lustre_pack_reply(struct ptlrpc_request *req, int count, int *lens,
- char **bufs)
+void lustre_free_reply_state (struct ptlrpc_reply_state *rs)
{
- return lustre_pack_msg(count, lens, bufs, &req->rq_replen,
- &req->rq_repmsg);
+ PTLRPC_RS_DEBUG_LRU_DEL(rs);
+
+ LASSERT (!rs->rs_difficult || rs->rs_handled);
+ LASSERT (!rs->rs_on_net);
+ LASSERT (!rs->rs_scheduled);
+ LASSERT (rs->rs_export == NULL);
+ LASSERT (rs->rs_nlocks == 0);
+ LASSERT (list_empty(&rs->rs_exp_list));
+ LASSERT (list_empty(&rs->rs_obd_list));
+
+ OBD_FREE (rs, rs->rs_size);
}
/* This returns the size of the buffer that is required to hold a lustre_msg