* associated with it. If an event handler exists, it will be run for each
* event that is deposited into the EQ.
*
- * In addition to the struct lnet_handle_eq, the LNet API defines two types
+ * In addition to the struct lnet_eq, the LNet API defines two types
* associated with events: The ::lnet_event_kind defines the kinds of events
* that can be stored in an EQ. The struct lnet_event defines a structure that
 * holds the information about an event.
 * releases these resources and frees the EQ. LNetEQPoll() can be used
* to test or wait on multiple EQs.
* @{ */
-int LNetEQAlloc(unsigned int count_in,
- lnet_eq_handler_t handler,
- struct lnet_handle_eq *handle_out);
+struct lnet_eq *
+LNetEQAlloc(unsigned int count_in,
+ lnet_eq_handler_t handler);
-int LNetEQFree(struct lnet_handle_eq eventq_in);
+int LNetEQFree(struct lnet_eq *eventq_in);
-int LNetEQPoll(struct lnet_handle_eq *eventqs_in,
- int neq_in,
- signed long timeout,
+int LNetEQPoll(struct lnet_eq **eventqs_in,
+ int neq_in,
+ signed long timeout,
struct lnet_event *event_out,
- int *which_eq_out);
+ int *which_eq_out);
/** @} lnet_eq */
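For orientation, here is a minimal caller-side sketch of the EQ lifecycle under
the new pointer-based signatures; my_eq_demo and my_handler are illustrative
names, not part of the patch:

static void my_handler(struct lnet_event *ev)
{
	/* inspect ev->type / ev->status as needed */
}

static int my_eq_demo(void)
{
	struct lnet_eq *eq;
	struct lnet_event ev;
	int which;
	int rc;

	/* was: rc = LNetEQAlloc(8, my_handler, &handle) */
	eq = LNetEQAlloc(8, my_handler);
	if (IS_ERR(eq))
		return PTR_ERR(eq);	/* -EINVAL or -ENOMEM */

	/* ... attach the EQ to an MD via md.eq_handle = eq ... */

	/* poll one (or more) EQs directly by pointer */
	rc = LNetEQPoll(&eq, 1, MAX_SCHEDULE_TIMEOUT, &ev, &which);

	return LNetEQFree(eq);	/* -EBUSY if an MD still references it */
}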
/** \defgroup lnet_data Data movement operations
}
static inline void
-lnet_eq2handle(struct lnet_handle_eq *handle, struct lnet_eq *eq)
-{
- if (eq == NULL) {
- LNetInvalidateEQHandle(handle);
- return;
- }
-
- handle->cookie = eq->eq_lh.lh_cookie;
-}
-
-static inline struct lnet_eq *
-lnet_handle2eq(struct lnet_handle_eq *handle)
-{
- /* ALWAYS called with resource lock held */
- struct lnet_libhandle *lh;
-
- lh = lnet_res_lh_lookup(&the_lnet.ln_eq_container, handle->cookie);
- if (lh == NULL)
- return NULL;
-
- return lh_entry(lh, struct lnet_eq, eq_lh);
-}
-
-static inline void
lnet_md2handle(struct lnet_handle_md *handle, struct lnet_libmd *md)
{
handle->cookie = md->md_lh.lh_cookie;
unsigned int len);
int lnet_send(lnet_nid_t nid, struct lnet_msg *msg, lnet_nid_t rtr_nid);
int lnet_send_ping(lnet_nid_t dest_nid, struct lnet_handle_md *mdh, int nnis,
- void *user_ptr, struct lnet_handle_eq eqh, bool recovery);
+ void *user_ptr, struct lnet_eq *eq, bool recovery);
void lnet_return_tx_credits_locked(struct lnet_msg *msg);
void lnet_return_rx_credits_locked(struct lnet_msg *msg);
void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp);
((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
struct lnet_eq {
- struct list_head eq_list;
- struct lnet_libhandle eq_lh;
unsigned long eq_enq_seq;
unsigned long eq_deq_seq;
unsigned int eq_size;
* ln_api_mutex.
*/
struct lnet_handle_md ln_ping_target_md;
- struct lnet_handle_eq ln_ping_target_eq;
+ struct lnet_eq *ln_ping_target_eq;
struct lnet_ping_buffer *ln_ping_target;
atomic_t ln_ping_target_seqno;
* buffer may linger a while after it has been unlinked, in
* which case the event handler cleans up.
*/
- struct lnet_handle_eq ln_push_target_eq;
+ struct lnet_eq *ln_push_target_eq;
struct lnet_handle_md ln_push_target_md;
struct lnet_ping_buffer *ln_push_target;
int ln_push_target_nnis;
/* discovery event queue handle */
- struct lnet_handle_eq ln_dc_eqh;
+ struct lnet_eq *ln_dc_eq;
/* discovery requests */
struct list_head ln_dc_request;
/* discovery working list */
*/
struct list_head **ln_mt_zombie_rstqs;
/* recovery eq handler */
- struct lnet_handle_eq ln_mt_eqh;
+ struct lnet_eq *ln_mt_eq;
/*
* Completed when the discovery and monitor threads can enter their
*/
#define LNET_WIRE_HANDLE_COOKIE_NONE (-1)
-struct lnet_handle_eq {
- __u64 cookie;
-};
-
-/**
- * Invalidate eq handle \a h.
- */
-static inline void LNetInvalidateEQHandle(struct lnet_handle_eq *h)
-{
- h->cookie = LNET_WIRE_HANDLE_COOKIE_NONE;
-}
-
-/**
- * Check whether eq handle \a h is invalid.
- *
- * \return 1 if handle is invalid, 0 if valid.
- */
-static inline int LNetEQHandleIsInvalid(struct lnet_handle_eq h)
-{
- return (LNET_WIRE_HANDLE_COOKIE_NONE == h.cookie);
-}
-
struct lnet_handle_md {
__u64 cookie;
};
void *user_ptr;
/**
* A handle for the event queue used to log the operations performed on
- * the memory region. If this argument is a NULL handle (i.e. nullified
- * by LNetInvalidateHandle()), operations performed on this memory
- * descriptor are not logged.
+ * the memory region. If this argument is NULL, operations
+ * performed on this memory descriptor are not logged.
*/
- struct lnet_handle_eq eq_handle;
+ struct lnet_eq *eq_handle;
/**
* The bulk MD handle which was registered to describe the buffers
* either to be used to transfer data to the peer or receive data
struct list_head *e = rec->rec_active.next;
list_del_init(e);
- if (rec->rec_type == LNET_COOKIE_TYPE_EQ) {
- lnet_eq_free(list_entry(e, struct lnet_eq, eq_list));
-
- } else if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
+ if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
lnet_md_free(list_entry(e, struct lnet_libmd, md_list));
} else { /* NB: Active MEs should be attached on portals */
INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
init_waitqueue_head(&the_lnet.ln_dc_waitq);
- LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+ the_lnet.ln_mt_eq = NULL;
init_completion(&the_lnet.ln_started);
rc = lnet_slab_setup();
the_lnet.ln_mt_zombie_rstqs = NULL;
}
- if (!LNetEQHandleIsInvalid(the_lnet.ln_mt_eqh)) {
- rc = LNetEQFree(the_lnet.ln_mt_eqh);
- LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+ if (the_lnet.ln_mt_eq) {
+ rc = LNetEQFree(the_lnet.ln_mt_eq);
+ the_lnet.ln_mt_eq = NULL;
LASSERT(rc == 0);
}
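Every teardown site in the patch repeats the same free-and-reset idiom, with a
plain NULL pointer now standing in for the invalidated handle. A hypothetical
helper (not introduced by the patch, which open-codes this at each site) would
capture the pattern:

static inline int lnet_eq_free_and_reset(struct lnet_eq **eqp)
{
	int rc = 0;

	if (*eqp) {
		rc = LNetEQFree(*eqp);
		/* NULL replaces LNetInvalidateEQHandle() */
		*eqp = NULL;
	}
	return rc;
}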
int rc, rc2;
if (set_eq) {
- rc = LNetEQAlloc(0, lnet_ping_target_event_handler,
- &the_lnet.ln_ping_target_eq);
- if (rc != 0) {
+ the_lnet.ln_ping_target_eq =
+ LNetEQAlloc(0, lnet_ping_target_event_handler);
+ if (IS_ERR(the_lnet.ln_ping_target_eq)) {
+ rc = PTR_ERR(the_lnet.ln_ping_target_eq);
CERROR("Can't allocate ping buffer EQ: %d\n", rc);
return rc;
}
if (the_lnet.ln_push_target)
return -EALREADY;
- rc = LNetEQAlloc(0, lnet_push_target_event_handler,
- &the_lnet.ln_push_target_eq);
- if (rc) {
+ the_lnet.ln_push_target_eq =
+ LNetEQAlloc(0, lnet_push_target_event_handler);
+ if (IS_ERR(the_lnet.ln_push_target_eq)) {
+ rc = PTR_ERR(the_lnet.ln_push_target_eq);
CERROR("Can't allocated push target EQ: %d\n", rc);
return rc;
}
if (rc) {
LNetEQFree(the_lnet.ln_push_target_eq);
- LNetInvalidateEQHandle(&the_lnet.ln_push_target_eq);
+ the_lnet.ln_push_target_eq = NULL;
}
return rc;
the_lnet.ln_push_target_nnis = 0;
LNetEQFree(the_lnet.ln_push_target_eq);
- LNetInvalidateEQHandle(&the_lnet.ln_push_target_eq);
+ the_lnet.ln_push_target_eq = NULL;
}
static int
lnet_ping_target_update(pbuf, ping_mdh);
- rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
- if (rc != 0) {
+ the_lnet.ln_mt_eq = LNetEQAlloc(0, lnet_mt_event_handler);
+ if (IS_ERR(the_lnet.ln_mt_eq)) {
+ rc = PTR_ERR(the_lnet.ln_mt_eq);
CERROR("Can't allocate monitor thread EQ: %d\n", rc);
goto err_stop_ping;
}
static int lnet_ping(struct lnet_process_id id, signed long timeout,
struct lnet_process_id __user *ids, int n_ids)
{
- struct lnet_handle_eq eqh;
+ struct lnet_eq *eq;
struct lnet_handle_md mdh;
struct lnet_event event;
struct lnet_md md = { NULL };
return -ENOMEM;
/* NB 2 events max (including any unlink event) */
- rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &eqh);
- if (rc != 0) {
+ eq = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE);
+ if (IS_ERR(eq)) {
+ rc = PTR_ERR(eq);
CERROR("Can't allocate EQ: %d\n", rc);
goto fail_ping_buffer_decref;
}
md.max_size = 0;
md.options = LNET_MD_TRUNCATE;
md.user_ptr = NULL;
- md.eq_handle = eqh;
+ md.eq_handle = eq;
rc = LNetMDBind(md, LNET_UNLINK, &mdh);
if (rc != 0) {
sigprocmask(SIG_SETMASK, &set, &blocked);
}
- rc2 = LNetEQPoll(&eqh, 1, timeout, &event, &which);
+ rc2 = LNetEQPoll(&eq, 1, timeout, &event, &which);
if (unlinked)
cfs_restore_sigs(blocked);
rc = pbuf->pb_info.pi_nnis;
fail_free_eq:
- rc2 = LNetEQFree(eqh);
+ rc2 = LNetEQFree(eq);
if (rc2 != 0)
CERROR("rc2 %d\n", rc2);
LASSERT(rc2 == 0);
* \param callback A handler function that runs when an event is deposited
* into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
* indicate that no event handler is desired.
- * \param handle On successful return, this location will hold a handle for
- * the newly created EQ.
*
- * \retval 0 On success.
+ * \retval eq On success, a pointer to the newly created EQ.
+ * On failure, an error code encoded with ERR_PTR() is returned:
 * \retval -EINVAL If a parameter is not valid.
* \retval -ENOMEM If memory for the EQ can't be allocated.
*
* \see lnet_eq_handler_t for the discussion on EQ handler semantics.
*/
-int
-LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
- struct lnet_handle_eq *handle)
+struct lnet_eq *
+LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback)
{
struct lnet_eq *eq;
 /* count can be 0 if the caller only needs the callback; this
 * eliminates the overhead of enqueuing events */
if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
- return -EINVAL;
+ return ERR_PTR(-EINVAL);
eq = lnet_eq_alloc();
if (eq == NULL)
- return -ENOMEM;
+ return ERR_PTR(-ENOMEM);
if (count != 0) {
LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events));
if (eq->eq_refs == NULL)
goto failed;
- /* MUST hold both exclusive lnet_res_lock */
- lnet_res_lock(LNET_LOCK_EX);
- /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
- * both EQ lookup and poll event with only lnet_eq_wait_lock */
- lnet_eq_wait_lock();
-
- lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh);
- list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active);
-
- lnet_eq_wait_unlock();
- lnet_res_unlock(LNET_LOCK_EX);
-
- lnet_eq2handle(handle, eq);
- return 0;
+ return eq;
failed:
if (eq->eq_events != NULL)
cfs_percpt_free(eq->eq_refs);
lnet_eq_free(eq);
- return -ENOMEM;
+ return ERR_PTR(-ENOMEM);
}
EXPORT_SYMBOL(LNetEQAlloc);
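One caveat with the new convention, illustrated in the hedged sketch below: on
failure LNetEQAlloc() returns an ERR_PTR() value, which is non-NULL, so a
long-lived field assigned directly (as the converted call sites do) transiently
holds a pointer that would satisfy a later "if (eq)" check. Allocating into a
local first sidesteps this; my_eq_setup is a hypothetical caller and my_handler
the illustrative handler from the earlier sketch:

static int my_eq_setup(struct lnet_eq **out)
{
	struct lnet_eq *eq;

	/* count 0: callback-only EQ, no event queueing overhead */
	eq = LNetEQAlloc(0, my_handler);
	if (IS_ERR(eq)) {
		CERROR("Can't allocate EQ: %ld\n", PTR_ERR(eq));
		return PTR_ERR(eq);
	}
	/* only publish the pointer on success, so the field is
	 * either NULL or a valid EQ, never an ERR_PTR() value */
	*out = eq;
	return 0;
}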
* Release the resources associated with an event queue if it's idle;
* otherwise do nothing and it's up to the user to try again.
*
- * \param eqh A handle for the event queue to be released.
+ * \param eq The event queue to be released.
*
* \retval 0 If the EQ is not in use and freed.
- * \retval -ENOENT If \a eqh does not point to a valid EQ.
* \retval -EBUSY If the EQ is still in use by some MDs.
*/
int
-LNetEQFree(struct lnet_handle_eq eqh)
+LNetEQFree(struct lnet_eq *eq)
{
- struct lnet_eq *eq;
struct lnet_event *events = NULL;
int **refs = NULL;
int *ref;
* both EQ lookup and poll event with only lnet_eq_wait_lock */
lnet_eq_wait_lock();
- eq = lnet_handle2eq(&eqh);
- if (eq == NULL) {
- rc = -ENOENT;
- goto out;
- }
-
cfs_percpt_for_each(ref, i, eq->eq_refs) {
LASSERT(*ref >= 0);
if (*ref == 0)
size = eq->eq_size;
refs = eq->eq_refs;
- lnet_res_lh_invalidate(&eq->eq_lh);
- list_del(&eq->eq_list);
lnet_eq_free(eq);
out:
lnet_eq_wait_unlock();
* LNetEQPoll() provides a timeout to allow applications to poll, block for a
* fixed period, or block indefinitely.
*
- * \param eventqs,neq An array of EQ handles, and size of the array.
+ * \param eventqs,neq An array of lnet_eq pointers, and the size of the array.
* \param timeout Time in jiffies to wait for an event to occur on
* one of the EQs. The constant MAX_SCHEDULE_TIMEOUT can be used to indicate an
* infinite timeout.
 * \retval -ENOENT If there's a NULL EQ in \a eventqs.
*/
int
-LNetEQPoll(struct lnet_handle_eq *eventqs, int neq, signed long timeout,
+LNetEQPoll(struct lnet_eq **eventqs, int neq, signed long timeout,
struct lnet_event *event, int *which)
{
int wait = 1;
for (;;) {
for (i = 0; i < neq; i++) {
- struct lnet_eq *eq = lnet_handle2eq(&eventqs[i]);
+ struct lnet_eq *eq = eventqs[i];
if (eq == NULL) {
lnet_eq_wait_unlock();
/* must be called with resource lock held */
static int
-lnet_md_link(struct lnet_libmd *md, struct lnet_handle_eq eq_handle, int cpt)
+lnet_md_link(struct lnet_libmd *md, struct lnet_eq *eq, int cpt)
{
struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
* maybe there we shouldn't even allow LNET_EQ_NONE!)
* LASSERT (eq == NULL);
*/
- if (!LNetEQHandleIsInvalid(eq_handle)) {
- md->md_eq = lnet_handle2eq(&eq_handle);
-
- if (md->md_eq == NULL)
- return -ENOENT;
-
+ if (eq) {
+ md->md_eq = eq;
(*md->md_eq->eq_refs[cpt])++;
}
umd->max_size = lmd->md_max_size;
umd->options = lmd->md_options;
umd->user_ptr = lmd->md_user_ptr;
- lnet_eq2handle(&umd->eq_handle, lmd->md_eq);
+ umd->eq_handle = lmd->md_eq;
}
static int
ev_info->mt_type = MT_TYPE_LOCAL_NI;
ev_info->mt_nid = nid;
rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
- ev_info, the_lnet.ln_mt_eqh, true);
+ ev_info, the_lnet.ln_mt_eq, true);
/* lookup the nid again */
lnet_net_lock(0);
ni = lnet_nid2ni_locked(nid, 0);
ev_info->mt_type = MT_TYPE_PEER_NI;
ev_info->mt_nid = nid;
rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
- ev_info, the_lnet.ln_mt_eqh, true);
+ ev_info, the_lnet.ln_mt_eq, true);
lnet_net_lock(0);
/*
* lnet_find_peer_ni_locked() grabs a refcount for
int
lnet_send_ping(lnet_nid_t dest_nid,
struct lnet_handle_md *mdh, int nnis,
- void *user_data, struct lnet_handle_eq eqh, bool recovery)
+ void *user_data, struct lnet_eq *eq, bool recovery)
{
struct lnet_md md = { NULL };
struct lnet_process_id id;
md.max_size = 0;
md.options = LNET_MD_TRUNCATE;
md.user_ptr = user_data;
- md.eq_handle = eqh;
+ md.eq_handle = eq;
rc = LNetMDBind(md, LNET_UNLINK, mdh);
if (rc) {
lnet_clean_local_ni_recoveryq();
lnet_clean_peer_ni_recoveryq();
lnet_clean_resendqs();
- LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+ the_lnet.ln_mt_eq = NULL;
return rc;
clean_queues:
lnet_rsp_tracker_clean();
nnis = max(lp->lp_data_nnis, LNET_INTERFACES_MIN);
rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
- the_lnet.ln_dc_eqh, false);
+ the_lnet.ln_dc_eq, false);
/*
* if LNetMDBind in lnet_send_ping fails we need to decrement the
md.threshold = 2; /* Put/Ack */
md.max_size = 0;
md.options = 0;
- md.eq_handle = the_lnet.ln_dc_eqh;
+ md.eq_handle = the_lnet.ln_dc_eq;
md.user_ptr = lp;
rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
}
lnet_net_unlock(LNET_LOCK_EX);
- LNetEQFree(the_lnet.ln_dc_eqh);
- LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+ LNetEQFree(the_lnet.ln_dc_eq);
+ the_lnet.ln_dc_eq = NULL;
the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
wake_up(&the_lnet.ln_dc_waitq);
int lnet_peer_discovery_start(void)
{
struct task_struct *task;
- int rc;
+ int rc = 0;
if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
return -EALREADY;
- rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
- if (rc != 0) {
+ the_lnet.ln_dc_eq = LNetEQAlloc(0, lnet_discovery_event_handler);
+ if (IS_ERR(the_lnet.ln_dc_eq)) {
+ rc = PTR_ERR(the_lnet.ln_dc_eq);
CERROR("Can't allocate discovery EQ: %d\n", rc);
return rc;
}
rc = PTR_ERR(task);
CERROR("Can't start peer discovery thread: %d\n", rc);
- LNetEQFree(the_lnet.ln_dc_eqh);
- LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+ LNetEQFree(the_lnet.ln_dc_eq);
+ the_lnet.ln_dc_eq = NULL;
the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
}
static struct smoketest_rpc {
spinlock_t rpc_glock; /* global lock */
struct srpc_service *rpc_services[SRPC_SERVICE_MAX_ID + 1];
- struct lnet_handle_eq rpc_lnet_eq; /* _the_ LNet event queue */
+ struct lnet_eq *rpc_lnet_eq; /* _the_ LNet event queue */
enum srpc_state rpc_state;
struct srpc_counters rpc_counters;
__u64 rpc_matchbits; /* matchbits counter */
srpc_data.rpc_state = SRPC_STATE_NI_INIT;
- LNetInvalidateEQHandle(&srpc_data.rpc_lnet_eq);
- rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
- if (rc != 0) {
+ srpc_data.rpc_lnet_eq = LNetEQAlloc(0, srpc_lnet_ev_handler);
+ if (IS_ERR(srpc_data.rpc_lnet_eq)) {
+ rc = PTR_ERR(srpc_data.rpc_lnet_eq);
CERROR("LNetEQAlloc() has failed: %d\n", rc);
goto bail;
}
/** @} nrs */
/* ptlrpc/events.c */
-extern struct lnet_handle_eq ptlrpc_eq_h;
+extern struct lnet_eq *ptlrpc_eq;
extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
struct lnet_process_id *peer, lnet_nid_t *self);
/**
#include <lustre_sec.h>
#include "ptlrpc_internal.h"
-struct lnet_handle_eq ptlrpc_eq_h;
+struct lnet_eq *ptlrpc_eq;
/*
* Client's outgoing request callback
* replies */
for (retries = 0;; retries++) {
- rc = LNetEQFree(ptlrpc_eq_h);
+ rc = LNetEQFree(ptlrpc_eq);
switch (rc) {
default:
LBUG();
* because we are guaranteed to get every event via callback,
 * so we just set EQ size to 0 to avoid overhead of serializing
* enqueue/dequeue operations in LNet. */
- rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
- if (rc == 0)
- return 0;
+ ptlrpc_eq = LNetEQAlloc(0, ptlrpc_master_callback);
+ if (!IS_ERR(ptlrpc_eq))
+ return 0;
- CERROR ("Failed to allocate event queue: %d\n", rc);
- LNetNIFini();
+ rc = PTR_ERR(ptlrpc_eq);
+ CERROR("Failed to allocate event queue: %d\n", rc);
+ LNetNIFini();
return rc;
}
-
int ptlrpc_init_portals(void)
{
int rc = ptlrpc_ni_init();
md.threshold = (ack == LNET_ACK_REQ) ? 2 : 1;
md.options = PTLRPC_MD_OPTIONS;
md.user_ptr = cbid;
- md.eq_handle = ptlrpc_eq_h;
+ md.eq_handle = ptlrpc_eq;
LNetInvalidateMDHandle(&md.bulk_handle);
if (bulk_cookie) {
desc->bd_failure = 0;
md.user_ptr = &desc->bd_cbid;
- md.eq_handle = ptlrpc_eq_h;
+ md.eq_handle = ptlrpc_eq;
md.threshold = 2; /* SENT and ACK/REPLY */
for (posted_md = 0; posted_md < total_md; mbits++) {
desc->bd_last_mbits = mbits;
desc->bd_md_count = total_md;
md.user_ptr = &desc->bd_cbid;
- md.eq_handle = ptlrpc_eq_h;
+ md.eq_handle = ptlrpc_eq;
md.threshold = 1; /* PUT or GET */
for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
LNET_MD_MANAGE_REMOTE |
 LNET_MD_TRUNCATE; /* allow an EOVERFLOW error */
- reply_md.user_ptr = &request->rq_reply_cbid;
- reply_md.eq_handle = ptlrpc_eq_h;
+ reply_md.user_ptr = &request->rq_reply_cbid;
+ reply_md.eq_handle = ptlrpc_eq;
/* We must see the unlink callback to set rq_reply_unlinked,
* so we can't auto-unlink */
return -ENOMEM;
}
- LASSERT(rqbd->rqbd_refcount == 0);
- rqbd->rqbd_refcount = 1;
+ LASSERT(rqbd->rqbd_refcount == 0);
+ rqbd->rqbd_refcount = 1;
- md.start = rqbd->rqbd_buffer;
- md.length = service->srv_buf_size;
- md.max_size = service->srv_max_req_size;
- md.threshold = LNET_MD_THRESH_INF;
- md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
- md.user_ptr = &rqbd->rqbd_cbid;
- md.eq_handle = ptlrpc_eq_h;
+ md.start = rqbd->rqbd_buffer;
+ md.length = service->srv_buf_size;
+ md.max_size = service->srv_max_req_size;
+ md.threshold = LNET_MD_THRESH_INF;
+ md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
+ md.user_ptr = &rqbd->rqbd_cbid;
+ md.eq_handle = ptlrpc_eq;
rc = LNetMDAttach(me, md, LNET_UNLINK, &rqbd->rqbd_md_h);
if (rc == 0)