* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
SRPC_STATE_STOPPING,
} srpc_state_t;
-struct smoketest_rpc {
+static struct smoketest_rpc {
spinlock_t rpc_glock; /* global lock */
srpc_service_t *rpc_services[SRPC_SERVICE_MAX_ID + 1];
lnet_handle_eq_t rpc_lnet_eq; /* _the_ LNet event queue */
spin_unlock(&srpc_data.rpc_glock);
}
-int
-srpc_add_bulk_page(srpc_bulk_t *bk, cfs_page_t *pg, int i, int nob)
+static int
+srpc_add_bulk_page(srpc_bulk_t *bk, struct page *pg, int i, int nob)
{
- nob = min(nob, (int)CFS_PAGE_SIZE);
+ nob = min(nob, (int)PAGE_CACHE_SIZE);
LASSERT(nob > 0);
LASSERT(i >= 0 && i < bk->bk_niov);
-#ifdef __KERNEL__
bk->bk_iovs[i].kiov_offset = 0;
bk->bk_iovs[i].kiov_page = pg;
bk->bk_iovs[i].kiov_len = nob;
-#else
- LASSERT(bk->bk_pages != NULL);
-
- bk->bk_pages[i] = pg;
- bk->bk_iovs[i].iov_len = nob;
- bk->bk_iovs[i].iov_base = cfs_page_address(pg);
-#endif
return nob;
}
srpc_free_bulk (srpc_bulk_t *bk)
{
int i;
- cfs_page_t *pg;
+ struct page *pg;
LASSERT (bk != NULL);
-#ifndef __KERNEL__
- LASSERT (bk->bk_pages != NULL);
-#endif
for (i = 0; i < bk->bk_niov; i++) {
-#ifdef __KERNEL__
pg = bk->bk_iovs[i].kiov_page;
-#else
- pg = bk->bk_pages[i];
-#endif
if (pg == NULL) break;
- cfs_free_page(pg);
+ __free_page(pg);
}
-#ifndef __KERNEL__
- LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
-#endif
LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
return;
}
srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, int sink)
{
srpc_bulk_t *bk;
- cfs_page_t **pages;
int i;
LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);
bk->bk_sink = sink;
bk->bk_len = bulk_len;
bk->bk_niov = bulk_npg;
-#ifndef __KERNEL__
- LIBCFS_CPT_ALLOC(pages, lnet_cpt_table(), cpt,
- sizeof(cfs_page_t *) * bulk_npg);
- if (pages == NULL) {
- LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
- CERROR("Can't allocate page array for %d pages\n", bulk_npg);
- return NULL;
- }
-
- memset(pages, 0, sizeof(cfs_page_t *) * bulk_npg);
- bk->bk_pages = pages;
-#else
- UNUSED(pages);
-#endif
for (i = 0; i < bulk_npg; i++) {
- cfs_page_t *pg;
+ struct page *pg;
int nob;
- pg = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, CFS_ALLOC_STD);
+ pg = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, GFP_IOFS);
if (pg == NULL) {
CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
srpc_free_bulk(bk);
return id;
}
-void
+static void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
struct srpc_service_cd *scd,
struct srpc_buffer *buffer)
struct srpc_service_cd *scd;
struct srpc_server_rpc *rpc;
struct srpc_buffer *buf;
- cfs_list_t *q;
+ struct list_head *q;
int i;
if (svc->sv_cpt_data == NULL)
cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
while (1) {
- if (!cfs_list_empty(&scd->scd_buf_posted))
+ if (!list_empty(&scd->scd_buf_posted))
q = &scd->scd_buf_posted;
- else if (!cfs_list_empty(&scd->scd_buf_blocked))
+ else if (!list_empty(&scd->scd_buf_blocked))
q = &scd->scd_buf_blocked;
else
break;
- while (!cfs_list_empty(q)) {
- buf = cfs_list_entry(q->next,
+ while (!list_empty(q)) {
+ buf = list_entry(q->next,
struct srpc_buffer,
buf_list);
- cfs_list_del(&buf->buf_list);
+ list_del(&buf->buf_list);
LIBCFS_FREE(buf, sizeof(*buf));
}
}
- LASSERT(cfs_list_empty(&scd->scd_rpc_active));
+ LASSERT(list_empty(&scd->scd_rpc_active));
- while (!cfs_list_empty(&scd->scd_rpc_free)) {
- rpc = cfs_list_entry(scd->scd_rpc_free.next,
+ while (!list_empty(&scd->scd_rpc_free)) {
+ rpc = list_entry(scd->scd_rpc_free.next,
struct srpc_server_rpc,
srpc_list);
- cfs_list_del(&rpc->srpc_list);
+ list_del(&rpc->srpc_list);
LIBCFS_FREE(rpc, sizeof(*rpc));
}
}
scd->scd_cpt = i;
scd->scd_svc = svc;
spin_lock_init(&scd->scd_lock);
- CFS_INIT_LIST_HEAD(&scd->scd_rpc_free);
- CFS_INIT_LIST_HEAD(&scd->scd_rpc_active);
- CFS_INIT_LIST_HEAD(&scd->scd_buf_posted);
- CFS_INIT_LIST_HEAD(&scd->scd_buf_blocked);
+ INIT_LIST_HEAD(&scd->scd_rpc_free);
+ INIT_LIST_HEAD(&scd->scd_rpc_active);
+ INIT_LIST_HEAD(&scd->scd_buf_posted);
+ INIT_LIST_HEAD(&scd->scd_buf_blocked);
scd->scd_ev.ev_data = scd;
scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
srpc_service_fini(svc);
return -ENOMEM;
}
- cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free);
+ list_add(&rpc->srpc_list, &scd->scd_rpc_free);
}
}
return 0;
}
-int
+static int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
int len, int options, lnet_process_id_t peer,
lnet_handle_md_t *mdh, srpc_event_t *ev)
return 0;
}
-int
+static int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
int options, lnet_process_id_t peer, lnet_nid_t self,
lnet_handle_md_t *mdh, srpc_event_t *ev)
return 0;
}
-int
-srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
- int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
-{
- return srpc_post_active_rdma(srpc_serv_portal(service), service,
- buf, len, LNET_MD_OP_PUT, peer,
- LNET_NID_ANY, mdh, ev);
-}
-
-int
+static int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
lnet_handle_md_t *mdh, srpc_event_t *ev)
{
LNET_MD_OP_PUT, any, mdh, ev);
}
-int
+static int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
+__must_hold(&scd->scd_lock)
{
struct srpc_service *sv = scd->scd_svc;
struct srpc_msg *msg = &buf->buf_msg;
int rc;
LNetInvalidateHandle(&buf->buf_mdh);
- cfs_list_add(&buf->buf_list, &scd->scd_buf_posted);
+ list_add(&buf->buf_list, &scd->scd_buf_posted);
scd->scd_buf_nposted++;
spin_unlock(&scd->scd_lock);
if (sv->sv_shuttingdown)
return rc; /* don't allow to change scd_buf_posted */
- cfs_list_del(&buf->buf_list);
+ list_del(&buf->buf_list);
spin_unlock(&scd->scd_lock);
LIBCFS_FREE(buf, sizeof(*buf));
}
if (scd->scd_buf_nposted > 0) {
- CDEBUG(D_NET, "waiting for %d posted buffers to unlink",
- scd->scd_buf_nposted);
+ CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
+ scd->scd_buf_nposted);
spin_unlock(&scd->scd_lock);
return 0;
}
- if (cfs_list_empty(&scd->scd_rpc_active)) {
+ if (list_empty(&scd->scd_rpc_active)) {
spin_unlock(&scd->scd_lock);
continue;
}
- rpc = cfs_list_entry(scd->scd_rpc_active.next,
+ rpc = list_entry(scd->scd_rpc_active.next,
struct srpc_server_rpc, srpc_list);
CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
"wi %s scheduled %d running %d, "
}
/* called with sv->sv_lock held */
-void
+static void
srpc_service_recycle_buffer(struct srpc_service_cd *scd, srpc_buffer_t *buf)
+__must_hold(&scd->scd_lock)
{
if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
if (srpc_service_post_buffer(scd, buf) != 0) {
/* schedule in-flight RPCs to notice the abort, NB:
* racing with incoming RPCs; complete fix should make test
* RPCs carry session ID in its headers */
- cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
+ list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
rpc->srpc_aborted = 1;
swi_schedule_workitem(&rpc->srpc_wi);
}
spin_lock(&scd->scd_lock);
/* schedule in-flight RPCs to notice the shutdown */
- cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
+ list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
swi_schedule_workitem(&rpc->srpc_wi);
spin_unlock(&scd->scd_lock);
/* OK to traverse scd_buf_posted without lock, since no one
* touches scd_buf_posted now */
- cfs_list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
+ list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
LNetMDUnlink(buf->buf_mdh);
}
}
-int
+static int
srpc_send_request (srpc_client_rpc_t *rpc)
{
srpc_event_t *ev = &rpc->crpc_reqstev;
ev->ev_data = rpc;
ev->ev_type = SRPC_REQUEST_SENT;
- rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
- &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
- &rpc->crpc_reqstmdh, ev);
+ rc = srpc_post_active_rdma(srpc_serv_portal(rpc->crpc_service),
+ rpc->crpc_service, &rpc->crpc_reqstmsg,
+ sizeof(srpc_msg_t), LNET_MD_OP_PUT,
+ rpc->crpc_dest, LNET_NID_ANY,
+ &rpc->crpc_reqstmdh, ev);
if (rc != 0) {
LASSERT (rc == -ENOMEM);
ev->ev_fired = 1; /* no more event expected */
return rc;
}
-int
+static int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
srpc_event_t *ev = &rpc->crpc_replyev;
return rc;
}
-int
+static int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
srpc_bulk_t *bk = &rpc->crpc_bulk;
if (bk->bk_niov == 0) return 0; /* nothing to do */
opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
-#ifdef __KERNEL__
opt |= LNET_MD_KIOV;
-#else
- opt |= LNET_MD_IOVEC;
-#endif
ev->ev_fired = 0;
ev->ev_data = rpc;
return rc;
}
-int
+static int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
srpc_event_t *ev = &rpc->srpc_ev;
LASSERT (bk != NULL);
opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
-#ifdef __KERNEL__
opt |= LNET_MD_KIOV;
-#else
- opt |= LNET_MD_IOVEC;
-#endif
ev->ev_fired = 0;
ev->ev_data = rpc;
}
/* only called from srpc_handle_rpc */
-void
+static void
srpc_server_rpc_done(srpc_server_rpc_t *rpc, int status)
{
struct srpc_service_cd *scd = rpc->srpc_scd;
rpc->srpc_reqstbuf = NULL;
}
- cfs_list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */
+ list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */
/*
* No one can schedule me now since:
LASSERT(rpc->srpc_ev.ev_fired);
swi_exit_workitem(&rpc->srpc_wi);
- if (!sv->sv_shuttingdown && !cfs_list_empty(&scd->scd_buf_blocked)) {
- buffer = cfs_list_entry(scd->scd_buf_blocked.next,
+ if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
+ buffer = list_entry(scd->scd_buf_blocked.next,
srpc_buffer_t, buf_list);
- cfs_list_del(&buffer->buf_list);
+ list_del(&buffer->buf_list);
srpc_init_server_rpc(rpc, scd, buffer);
- cfs_list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
+ list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
swi_schedule_workitem(&rpc->srpc_wi);
} else {
- cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free);
+ list_add(&rpc->srpc_list, &scd->scd_rpc_free);
}
spin_unlock(&scd->scd_lock);
return 0;
}
-void
+static void
srpc_client_rpc_expired (void *data)
{
srpc_client_rpc_t *rpc = data;
spin_unlock(&srpc_data.rpc_glock);
}
-inline void
-srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
+static void
+srpc_add_client_rpc_timer(srpc_client_rpc_t *rpc)
{
- stt_timer_t *timer = &rpc->crpc_timer;
+ stt_timer_t *timer = &rpc->crpc_timer;
- if (rpc->crpc_timeout == 0) return;
+ if (rpc->crpc_timeout == 0)
+ return;
- CFS_INIT_LIST_HEAD(&timer->stt_list);
- timer->stt_data = rpc;
- timer->stt_func = srpc_client_rpc_expired;
- timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
- cfs_time_current_sec());
- stt_add_timer(timer);
- return;
+ INIT_LIST_HEAD(&timer->stt_list);
+ timer->stt_data = rpc;
+ timer->stt_func = srpc_client_rpc_expired;
+ timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
+ cfs_time_current_sec());
+ stt_add_timer(timer);
+ return;
}
/*
*
* Upon exit the RPC expiry timer is not queued and the handler is not
* running on any CPU. */
-void
+static void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
/* timer not planted or already exploded */
if (rpc->crpc_timeout == 0)
return;
- /* timer sucessfully defused */
+ /* timer successfully defused */
if (stt_del_timer(&rpc->crpc_timer))
return;
-#ifdef __KERNEL__
/* timer detonated, wait for it to explode */
while (rpc->crpc_timeout != 0) {
spin_unlock(&rpc->crpc_lock);
- cfs_schedule();
+ schedule();
spin_lock(&rpc->crpc_lock);
}
-#else
- LBUG(); /* impossible in single-threaded runtime */
-#endif
}
-void
+static void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
swi_workitem_t *wi = &rpc->crpc_wi;
}
/* when in kernel always called with LNET_LOCK() held, and in thread context */
-void
+static void
srpc_lnet_ev_handler(lnet_event_t *ev)
{
struct srpc_service_cd *scd;
- srpc_event_t *rpcev = ev->md.user_ptr;
- srpc_client_rpc_t *crpc;
- srpc_server_rpc_t *srpc;
- srpc_buffer_t *buffer;
- srpc_service_t *sv;
- srpc_msg_t *msg;
- srpc_msg_type_t type;
+ srpc_event_t *rpcev = ev->md.user_ptr;
+ srpc_client_rpc_t *crpc;
+ srpc_server_rpc_t *srpc;
+ srpc_buffer_t *buffer;
+ srpc_service_t *sv;
+ srpc_msg_t *msg;
+ srpc_msg_type_t type;
- LASSERT (!cfs_in_interrupt());
+ LASSERT (!in_interrupt());
+
+ if (ev->status != 0) {
+ __u32 errors;
- if (ev->status != 0) {
spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.errors++;
+ if (ev->status != -ECANCELED) /* cancellation is not an error */
+ srpc_data.rpc_counters.errors++;
+ errors = srpc_data.rpc_counters.errors;
spin_unlock(&srpc_data.rpc_glock);
- }
+
+ CNETERR("LNet event status %d type %d, RPC errors %u\n",
+ ev->status, ev->type, errors);
+ }
rpcev->ev_lnet = ev->type;
swi_schedule_workitem(&scd->scd_buf_wi);
}
- cfs_list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
- msg = &buffer->buf_msg;
- type = srpc_service2request(sv->sv_id);
+ list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
+ msg = &buffer->buf_msg;
+ type = srpc_service2request(sv->sv_id);
if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
(msg->msg_type != type &&
msg->msg_magic = 0;
}
- if (!cfs_list_empty(&scd->scd_rpc_free)) {
- srpc = cfs_list_entry(scd->scd_rpc_free.next,
- struct srpc_server_rpc,
- srpc_list);
- cfs_list_del(&srpc->srpc_list);
+ if (!list_empty(&scd->scd_rpc_free)) {
+ srpc = list_entry(scd->scd_rpc_free.next,
+ struct srpc_server_rpc,
+ srpc_list);
+ list_del(&srpc->srpc_list);
srpc_init_server_rpc(srpc, scd, buffer);
- cfs_list_add_tail(&srpc->srpc_list,
- &scd->scd_rpc_active);
+ list_add_tail(&srpc->srpc_list,
+ &scd->scd_rpc_active);
swi_schedule_workitem(&srpc->srpc_wi);
} else {
- cfs_list_add_tail(&buffer->buf_list,
- &scd->scd_buf_blocked);
+ list_add_tail(&buffer->buf_list,
+ &scd->scd_buf_blocked);
}
spin_unlock(&scd->scd_lock);
}
}
-#ifndef __KERNEL__
-
-int
-srpc_check_event (int timeout)
-{
- lnet_event_t ev;
- int rc;
- int i;
-
- rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
- timeout * 1000, &ev, &i);
- if (rc == 0) return 0;
-
- LASSERT (rc == -EOVERFLOW || rc == 1);
-
- /* We can't affort to miss any events... */
- if (rc == -EOVERFLOW) {
- CERROR ("Dropped an event!!!\n");
- abort();
- }
-
- srpc_lnet_ev_handler(&ev);
- return 1;
-}
-
-#endif
int
srpc_startup (void)
memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
spin_lock_init(&srpc_data.rpc_glock);
- /* 1 second pause to avoid timestamp reuse */
- cfs_pause(cfs_time_seconds(1));
- srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;
+ /* 1 second pause to avoid timestamp reuse */
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ schedule_timeout(cfs_time_seconds(1));
+ srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;
srpc_data.rpc_state = SRPC_STATE_NONE;
-#ifdef __KERNEL__
- rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
-#else
- if (the_lnet.ln_server_mode_flag)
- rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
- else
- rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
-#endif
+ rc = LNetNIInit(LNET_PID_LUSTRE);
if (rc < 0) {
CERROR ("LNetNIInit() has failed: %d\n", rc);
return rc;
srpc_data.rpc_state = SRPC_STATE_NI_INIT;
LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
-#ifdef __KERNEL__
rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
-#else
- rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
-#endif
if (rc != 0) {
CERROR("LNetEQAlloc() has failed: %d\n", rc);
goto bail;