/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
SRPC_STATE_STOPPING,
} srpc_state_t;
-struct smoketest_rpc {
- cfs_spinlock_t rpc_glock; /* global lock */
- srpc_service_t *rpc_services[SRPC_SERVICE_MAX_ID + 1];
- lnet_handle_eq_t rpc_lnet_eq; /* _the_ LNet event queue */
- srpc_state_t rpc_state;
- srpc_counters_t rpc_counters;
- __u64 rpc_matchbits; /* matchbits counter */
+static struct smoketest_rpc {
+ spinlock_t rpc_glock; /* global lock */
+ srpc_service_t *rpc_services[SRPC_SERVICE_MAX_ID + 1];
+ lnet_handle_eq_t rpc_lnet_eq; /* _the_ LNet event queue */
+ srpc_state_t rpc_state;
+ srpc_counters_t rpc_counters;
+ __u64 rpc_matchbits; /* matchbits counter */
} srpc_data;
static inline int
/* Snapshot the global smoketest RPC counters into *cnt.
 * rpc_glock serializes against concurrent counter updates in the
 * event handler and srpc_set_counters(). */
void srpc_get_counters (srpc_counters_t *cnt)
{
	spin_lock(&srpc_data.rpc_glock);
	*cnt = srpc_data.rpc_counters;
	spin_unlock(&srpc_data.rpc_glock);
}
/* Overwrite the global smoketest RPC counters with *cnt (used to reset
 * statistics between test sessions).  Protected by rpc_glock. */
void srpc_set_counters (const srpc_counters_t *cnt)
{
	spin_lock(&srpc_data.rpc_glock);
	srpc_data.rpc_counters = *cnt;
	spin_unlock(&srpc_data.rpc_glock);
}
-void
-srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
+static int
+srpc_add_bulk_page(srpc_bulk_t *bk, struct page *pg, int i, int nob)
{
- LASSERT (i >= 0 && i < bk->bk_niov);
+ nob = min(nob, (int)PAGE_CACHE_SIZE);
+
+ LASSERT(nob > 0);
+ LASSERT(i >= 0 && i < bk->bk_niov);
#ifdef __KERNEL__
- bk->bk_iovs[i].kiov_offset = 0;
- bk->bk_iovs[i].kiov_page = pg;
- bk->bk_iovs[i].kiov_len = CFS_PAGE_SIZE;
+ bk->bk_iovs[i].kiov_offset = 0;
+ bk->bk_iovs[i].kiov_page = pg;
+ bk->bk_iovs[i].kiov_len = nob;
#else
- LASSERT (bk->bk_pages != NULL);
+ LASSERT(bk->bk_pages != NULL);
- bk->bk_pages[i] = pg;
- bk->bk_iovs[i].iov_len = CFS_PAGE_SIZE;
- bk->bk_iovs[i].iov_base = cfs_page_address(pg);
+ bk->bk_pages[i] = pg;
+ bk->bk_iovs[i].iov_len = nob;
+ bk->bk_iovs[i].iov_base = page_address(pg);
#endif
- return;
+ return nob;
}
void
srpc_free_bulk (srpc_bulk_t *bk)
{
int i;
- cfs_page_t *pg;
+ struct page *pg;
LASSERT (bk != NULL);
#ifndef __KERNEL__
#endif
if (pg == NULL) break;
- cfs_free_page(pg);
+ __free_page(pg);
}
#ifndef __KERNEL__
- LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
+ LIBCFS_FREE(bk->bk_pages, sizeof(struct page *) * bk->bk_niov);
#endif
LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
return;
}
srpc_bulk_t *
-srpc_alloc_bulk(int cpt, int npages, int sink)
+srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, int sink)
{
- srpc_bulk_t *bk;
- cfs_page_t **pages;
- int i;
+ srpc_bulk_t *bk;
+ int i;
- LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
+ LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);
LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
- offsetof(srpc_bulk_t, bk_iovs[npages]));
- if (bk == NULL) {
- CERROR ("Can't allocate descriptor for %d pages\n", npages);
- return NULL;
- }
+ offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
+ if (bk == NULL) {
+ CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
+ return NULL;
+ }
- memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
- bk->bk_sink = sink;
- bk->bk_niov = npages;
- bk->bk_len = npages * CFS_PAGE_SIZE;
+ memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
+ bk->bk_sink = sink;
+ bk->bk_len = bulk_len;
+ bk->bk_niov = bulk_npg;
#ifndef __KERNEL__
- LIBCFS_CPT_ALLOC(pages, lnet_cpt_table(), cpt,
- sizeof(cfs_page_t *) * npages);
- if (pages == NULL) {
- LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
- CERROR ("Can't allocate page array for %d pages\n", npages);
- return NULL;
- }
+ {
+ struct page **pages;
+
+ LIBCFS_CPT_ALLOC(pages, lnet_cpt_table(), cpt,
+ sizeof(struct page *) * bulk_npg);
+ if (pages == NULL) {
+ LIBCFS_FREE(bk, offsetof(srpc_bulk_t,
+ bk_iovs[bulk_npg]));
+ CERROR("Can't allocate page array for %d pages\n",
+ bulk_npg);
+ return NULL;
+ }
- memset(pages, 0, sizeof(cfs_page_t *) * npages);
- bk->bk_pages = pages;
-#else
- UNUSED (pages);
+ memset(pages, 0, sizeof(struct page *) * bulk_npg);
+ bk->bk_pages = pages;
+ }
#endif
- for (i = 0; i < npages; i++) {
- cfs_page_t *pg;
+ for (i = 0; i < bulk_npg; i++) {
+ struct page *pg;
+ int nob;
- pg = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, CFS_ALLOC_STD);
- if (pg == NULL) {
- CERROR ("Can't allocate page %d of %d\n", i, npages);
- srpc_free_bulk(bk);
- return NULL;
- }
+ pg = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, GFP_IOFS);
+ if (pg == NULL) {
+ CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
+ srpc_free_bulk(bk);
+ return NULL;
+ }
- srpc_add_bulk_page(bk, pg, i);
- }
+ nob = srpc_add_bulk_page(bk, pg, i, bulk_len);
+ bulk_len -= nob;
+ }
- return bk;
+ return bk;
}
/* Return the next unique match bits value for RPC buffers; the counter
 * lives in srpc_data and is serialized by rpc_glock. */
static inline __u64
srpc_next_id (void)
{
	__u64 id;

	spin_lock(&srpc_data.rpc_glock);
	id = srpc_data.rpc_matchbits++;
	spin_unlock(&srpc_data.rpc_glock);
	return id;
}
-void
+static void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
struct srpc_service_cd *scd,
struct srpc_buffer *buffer)
struct srpc_service_cd *scd;
struct srpc_server_rpc *rpc;
struct srpc_buffer *buf;
- cfs_list_t *q;
+ struct list_head *q;
int i;
if (svc->sv_cpt_data == NULL)
cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
while (1) {
- if (!cfs_list_empty(&scd->scd_buf_posted))
+ if (!list_empty(&scd->scd_buf_posted))
q = &scd->scd_buf_posted;
- else if (!cfs_list_empty(&scd->scd_buf_blocked))
+ else if (!list_empty(&scd->scd_buf_blocked))
q = &scd->scd_buf_blocked;
else
break;
- while (!cfs_list_empty(q)) {
- buf = cfs_list_entry(q->next,
+ while (!list_empty(q)) {
+ buf = list_entry(q->next,
struct srpc_buffer,
buf_list);
- cfs_list_del(&buf->buf_list);
+ list_del(&buf->buf_list);
LIBCFS_FREE(buf, sizeof(*buf));
}
}
- LASSERT(cfs_list_empty(&scd->scd_rpc_active));
+ LASSERT(list_empty(&scd->scd_rpc_active));
- while (!cfs_list_empty(&scd->scd_rpc_free)) {
- rpc = cfs_list_entry(scd->scd_rpc_free.next,
+ while (!list_empty(&scd->scd_rpc_free)) {
+ rpc = list_entry(scd->scd_rpc_free.next,
struct srpc_server_rpc,
srpc_list);
- cfs_list_del(&rpc->srpc_list);
+ list_del(&rpc->srpc_list);
LIBCFS_FREE(rpc, sizeof(*rpc));
}
}
cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
scd->scd_cpt = i;
scd->scd_svc = svc;
- cfs_spin_lock_init(&scd->scd_lock);
- CFS_INIT_LIST_HEAD(&scd->scd_rpc_free);
- CFS_INIT_LIST_HEAD(&scd->scd_rpc_active);
- CFS_INIT_LIST_HEAD(&scd->scd_buf_posted);
- CFS_INIT_LIST_HEAD(&scd->scd_buf_blocked);
+ spin_lock_init(&scd->scd_lock);
+ INIT_LIST_HEAD(&scd->scd_rpc_free);
+ INIT_LIST_HEAD(&scd->scd_rpc_active);
+ INIT_LIST_HEAD(&scd->scd_buf_posted);
+ INIT_LIST_HEAD(&scd->scd_buf_blocked);
scd->scd_ev.ev_data = scd;
scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
srpc_service_fini(svc);
return -ENOMEM;
}
- cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free);
+ list_add(&rpc->srpc_list, &scd->scd_rpc_free);
}
}
if (srpc_service_init(sv) != 0)
return -ENOMEM;
- cfs_spin_lock(&srpc_data.rpc_glock);
+ spin_lock(&srpc_data.rpc_glock);
LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);
if (srpc_data.rpc_services[id] != NULL) {
- cfs_spin_unlock(&srpc_data.rpc_glock);
+ spin_unlock(&srpc_data.rpc_glock);
goto failed;
}
srpc_data.rpc_services[id] = sv;
- cfs_spin_unlock(&srpc_data.rpc_glock);
+ spin_unlock(&srpc_data.rpc_glock);
CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
return 0;
/* Unregister service @sv from the global service table.
 * Returns 0 on success, -ENOENT if the slot is not occupied by @sv
 * (e.g. it was never added or already removed). */
int
srpc_remove_service (srpc_service_t *sv)
{
	int id = sv->sv_id;

	spin_lock(&srpc_data.rpc_glock);

	if (srpc_data.rpc_services[id] != sv) {
		spin_unlock(&srpc_data.rpc_glock);
		return -ENOENT;
	}

	srpc_data.rpc_services[id] = NULL;
	spin_unlock(&srpc_data.rpc_glock);
	return 0;
}
-int
+static int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
int len, int options, lnet_process_id_t peer,
lnet_handle_md_t *mdh, srpc_event_t *ev)
return 0;
}
-int
+static int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
int options, lnet_process_id_t peer, lnet_nid_t self,
lnet_handle_md_t *mdh, srpc_event_t *ev)
return 0;
}
-int
+static int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
LNET_NID_ANY, mdh, ev);
}
-int
+static int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
lnet_handle_md_t *mdh, srpc_event_t *ev)
{
LNET_MD_OP_PUT, any, mdh, ev);
}
-int
+static int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
+__must_hold(&scd->scd_lock)
{
struct srpc_service *sv = scd->scd_svc;
struct srpc_msg *msg = &buf->buf_msg;
int rc;
LNetInvalidateHandle(&buf->buf_mdh);
- cfs_list_add(&buf->buf_list, &scd->scd_buf_posted);
+ list_add(&buf->buf_list, &scd->scd_buf_posted);
scd->scd_buf_nposted++;
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
rc = srpc_post_passive_rqtbuf(sv->sv_id,
!srpc_serv_is_framework(sv),
* msg and its event handler has been called. So we must add
* buf to scd_buf_posted _before_ dropping scd_lock */
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
if (rc == 0) {
if (!sv->sv_shuttingdown)
return 0;
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
/* srpc_shutdown_service might have tried to unlink me
* when my buf_mdh was still invalid */
LNetMDUnlink(buf->buf_mdh);
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
return 0;
}
if (sv->sv_shuttingdown)
return rc; /* don't allow to change scd_buf_posted */
- cfs_list_del(&buf->buf_list);
- cfs_spin_unlock(&scd->scd_lock);
+ list_del(&buf->buf_list);
+ spin_unlock(&scd->scd_lock);
LIBCFS_FREE(buf, sizeof(*buf));
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
return rc;
}
/* it's called by workitem scheduler threads, these threads
* should have been set CPT affinity, so buffers will be posted
* on CPT local list of Portal */
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
while (scd->scd_buf_adjust > 0 &&
!scd->scd_svc->sv_shuttingdown) {
scd->scd_buf_adjust--; /* consume it */
scd->scd_buf_posting++;
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
LIBCFS_ALLOC(buf, sizeof(*buf));
if (buf == NULL) {
CERROR("Failed to add new buf to service: %s\n",
scd->scd_svc->sv_name);
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
rc = -ENOMEM;
break;
}
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
if (scd->scd_svc->sv_shuttingdown) {
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
LIBCFS_FREE(buf, sizeof(*buf));
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
rc = -ESHUTDOWN;
break;
}
scd->scd_buf_posting--;
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
return 0;
}
LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
scd->scd_buf_err = 0;
scd->scd_buf_err_stamp = 0;
scd->scd_buf_adjust = nbuffer;
/* start to post buffers */
swi_schedule_workitem(&scd->scd_buf_wi);
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
/* framework service only post buffer for one partition */
if (srpc_serv_is_framework(sv))
}
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
/*
* NB: srpc_service_add_buffers() can be called inside
* thread context of lst_sched_serial, and we don't normally
if (scd->scd_buf_err != 0 && rc == 0)
rc = scd->scd_buf_err;
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
}
return rc;
LASSERT(!sv->sv_shuttingdown);
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
num = scd->scd_buf_total + scd->scd_buf_posting;
scd->scd_buf_adjust -= min(nbuffer, num);
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
}
}
LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- cfs_spin_lock(&scd->scd_lock);
- if (!swi_deschedule_workitem(&scd->scd_buf_wi))
+ spin_lock(&scd->scd_lock);
+ if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
+ spin_unlock(&scd->scd_lock);
return 0;
+ }
if (scd->scd_buf_nposted > 0) {
- CDEBUG(D_NET, "waiting for %d posted buffers to unlink",
- scd->scd_buf_nposted);
- cfs_spin_unlock(&scd->scd_lock);
+ CDEBUG(D_NET, "waiting for %d posted buffers to "
+ "unlink\n", scd->scd_buf_nposted);
+ spin_unlock(&scd->scd_lock);
return 0;
}
- if (cfs_list_empty(&scd->scd_rpc_active)) {
- cfs_spin_unlock(&scd->scd_lock);
+ if (list_empty(&scd->scd_rpc_active)) {
+ spin_unlock(&scd->scd_lock);
continue;
}
- rpc = cfs_list_entry(scd->scd_rpc_active.next,
+ rpc = list_entry(scd->scd_rpc_active.next,
struct srpc_server_rpc, srpc_list);
CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
"wi %s scheduled %d running %d, "
rpc->srpc_wi.swi_workitem.wi_running,
rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
return 0;
}
}
/* called with sv->sv_lock held */
-void
+static void
srpc_service_recycle_buffer(struct srpc_service_cd *scd, srpc_buffer_t *buf)
+__must_hold(&scd->scd_lock)
{
if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
if (srpc_service_post_buffer(scd, buf) != 0) {
}
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
LIBCFS_FREE(buf, sizeof(*buf));
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
}
void
sv->sv_id, sv->sv_name);
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
/* schedule in-flight RPCs to notice the abort, NB:
* racing with incoming RPCs; complete fix should make test
* RPCs carry session ID in its headers */
- cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
+ list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
rpc->srpc_aborted = 1;
swi_schedule_workitem(&rpc->srpc_wi);
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
}
}
sv->sv_id, sv->sv_name);
cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
/* schedule in-flight RPCs to notice the shutdown */
- cfs_list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
+ list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
swi_schedule_workitem(&rpc->srpc_wi);
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
/* OK to traverse scd_buf_posted without lock, since no one
* touches scd_buf_posted now */
- cfs_list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
+ list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
LNetMDUnlink(buf->buf_mdh);
}
}
-int
+static int
srpc_send_request (srpc_client_rpc_t *rpc)
{
srpc_event_t *ev = &rpc->crpc_reqstev;
return rc;
}
-int
+static int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
srpc_event_t *ev = &rpc->crpc_replyev;
return rc;
}
-int
+static int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
srpc_bulk_t *bk = &rpc->crpc_bulk;
return rc;
}
-int
+static int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
srpc_event_t *ev = &rpc->srpc_ev;
}
/* only called from srpc_handle_rpc */
-void
+static void
srpc_server_rpc_done(srpc_server_rpc_t *rpc, int status)
{
struct srpc_service_cd *scd = rpc->srpc_scd;
swi_state2str(rpc->srpc_wi.swi_state), status);
if (status != 0) {
- cfs_spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_dropped++;
- cfs_spin_unlock(&srpc_data.rpc_glock);
- }
+ spin_lock(&srpc_data.rpc_glock);
+ srpc_data.rpc_counters.rpcs_dropped++;
+ spin_unlock(&srpc_data.rpc_glock);
+ }
- if (rpc->srpc_done != NULL)
- (*rpc->srpc_done) (rpc);
- LASSERT (rpc->srpc_bulk == NULL);
+ if (rpc->srpc_done != NULL)
+ (*rpc->srpc_done) (rpc);
+ LASSERT(rpc->srpc_bulk == NULL);
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
if (rpc->srpc_reqstbuf != NULL) {
/* NB might drop sv_lock in srpc_service_recycle_buffer, but
rpc->srpc_reqstbuf = NULL;
}
- cfs_list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */
+ list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */
/*
* No one can schedule me now since:
LASSERT(rpc->srpc_ev.ev_fired);
swi_exit_workitem(&rpc->srpc_wi);
- if (!sv->sv_shuttingdown && !cfs_list_empty(&scd->scd_buf_blocked)) {
- buffer = cfs_list_entry(scd->scd_buf_blocked.next,
+ if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
+ buffer = list_entry(scd->scd_buf_blocked.next,
srpc_buffer_t, buf_list);
- cfs_list_del(&buffer->buf_list);
+ list_del(&buffer->buf_list);
srpc_init_server_rpc(rpc, scd, buffer);
- cfs_list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
+ list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
swi_schedule_workitem(&rpc->srpc_wi);
} else {
- cfs_list_add(&rpc->srpc_list, &scd->scd_rpc_free);
+ list_add(&rpc->srpc_list, &scd->scd_rpc_free);
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
return;
}
LASSERT(wi == &rpc->srpc_wi);
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
if (sv->sv_shuttingdown || rpc->srpc_aborted) {
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
if (rpc->srpc_bulk != NULL)
LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
return 0;
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
switch (wi->swi_state) {
default:
if (msg->msg_magic == 0) {
/* moaned already in srpc_lnet_ev_handler */
- rc = EBADMSG;
- } else if (msg->msg_version != SRPC_MSG_VERSION &&
- msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
- CWARN ("Version mismatch: %u, %u expected, from %s\n",
- msg->msg_version, SRPC_MSG_VERSION,
- libcfs_id2str(rpc->srpc_peer));
- reply->status = EPROTO;
- } else {
- reply->status = 0;
- rc = (*sv->sv_handler) (rpc);
- LASSERT (reply->status == 0 || !rpc->srpc_bulk);
- }
+ srpc_server_rpc_done(rpc, EBADMSG);
+ return 1;
+ }
- if (rc != 0) {
- srpc_server_rpc_done(rpc, rc);
- return 1;
+ srpc_unpack_msg_hdr(msg);
+ if (msg->msg_version != SRPC_MSG_VERSION) {
+ CWARN("Version mismatch: %u, %u expected, from %s\n",
+ msg->msg_version, SRPC_MSG_VERSION,
+ libcfs_id2str(rpc->srpc_peer));
+ reply->status = EPROTO;
+ /* drop through and send reply */
+ } else {
+ reply->status = 0;
+ rc = (*sv->sv_handler)(rpc);
+ LASSERT(reply->status == 0 || !rpc->srpc_bulk);
+ if (rc != 0) {
+ srpc_server_rpc_done(rpc, rc);
+ return 1;
+ }
}
wi->swi_state = SWI_STATE_BULK_STARTED;
return 0;
}
-void
+static void
srpc_client_rpc_expired (void *data)
{
srpc_client_rpc_t *rpc = data;
rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
rpc->crpc_timeout);
- cfs_spin_lock(&rpc->crpc_lock);
+ spin_lock(&rpc->crpc_lock);
- rpc->crpc_timeout = 0;
- srpc_abort_rpc(rpc, -ETIMEDOUT);
+ rpc->crpc_timeout = 0;
+ srpc_abort_rpc(rpc, -ETIMEDOUT);
- cfs_spin_unlock(&rpc->crpc_lock);
+ spin_unlock(&rpc->crpc_lock);
- cfs_spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_expired++;
- cfs_spin_unlock(&srpc_data.rpc_glock);
- return;
+ spin_lock(&srpc_data.rpc_glock);
+ srpc_data.rpc_counters.rpcs_expired++;
+ spin_unlock(&srpc_data.rpc_glock);
}
-inline void
-srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
+static void
+srpc_add_client_rpc_timer(srpc_client_rpc_t *rpc)
{
- stt_timer_t *timer = &rpc->crpc_timer;
+ stt_timer_t *timer = &rpc->crpc_timer;
- if (rpc->crpc_timeout == 0) return;
+ if (rpc->crpc_timeout == 0)
+ return;
- CFS_INIT_LIST_HEAD(&timer->stt_list);
- timer->stt_data = rpc;
- timer->stt_func = srpc_client_rpc_expired;
- timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
- cfs_time_current_sec());
- stt_add_timer(timer);
- return;
+ INIT_LIST_HEAD(&timer->stt_list);
+ timer->stt_data = rpc;
+ timer->stt_func = srpc_client_rpc_expired;
+ timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
+ cfs_time_current_sec());
+ stt_add_timer(timer);
+ return;
}
/*
*
* Upon exit the RPC expiry timer is not queued and the handler is not
* running on any CPU. */
-void
+static void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
- /* timer not planted or already exploded */
- if (rpc->crpc_timeout == 0) return;
+ /* timer not planted or already exploded */
+ if (rpc->crpc_timeout == 0)
+ return;
- /* timer sucessfully defused */
- if (stt_del_timer(&rpc->crpc_timer)) return;
+ /* timer sucessfully defused */
+ if (stt_del_timer(&rpc->crpc_timer))
+ return;
#ifdef __KERNEL__
- /* timer detonated, wait for it to explode */
- while (rpc->crpc_timeout != 0) {
- cfs_spin_unlock(&rpc->crpc_lock);
+ /* timer detonated, wait for it to explode */
+ while (rpc->crpc_timeout != 0) {
+ spin_unlock(&rpc->crpc_lock);
- cfs_schedule();
+ schedule();
- cfs_spin_lock(&rpc->crpc_lock);
- }
+ spin_lock(&rpc->crpc_lock);
+ }
#else
- LBUG(); /* impossible in single-threaded runtime */
+ LBUG(); /* impossible in single-threaded runtime */
#endif
- return;
}
-void
+static void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
- swi_workitem_t *wi = &rpc->crpc_wi;
+ swi_workitem_t *wi = &rpc->crpc_wi;
- LASSERT (status != 0 || wi->swi_state == SWI_STATE_DONE);
+ LASSERT(status != 0 || wi->swi_state == SWI_STATE_DONE);
- cfs_spin_lock(&rpc->crpc_lock);
+ spin_lock(&rpc->crpc_lock);
rpc->crpc_closed = 1;
if (rpc->crpc_status == 0)
LASSERT (!srpc_event_pending(rpc));
swi_exit_workitem(wi);
- cfs_spin_unlock(&rpc->crpc_lock);
+ spin_unlock(&rpc->crpc_lock);
(*rpc->crpc_done)(rpc);
return;
srpc_send_rpc (swi_workitem_t *wi)
{
int rc = 0;
- srpc_client_rpc_t *rpc = wi->swi_workitem.wi_data;
- srpc_msg_t *reply = &rpc->crpc_replymsg;
- int do_bulk = rpc->crpc_bulk.bk_niov > 0;
+ srpc_client_rpc_t *rpc;
+ srpc_msg_t *reply;
+ int do_bulk;
+
+ LASSERT(wi != NULL);
+
+ rpc = wi->swi_workitem.wi_data;
LASSERT (rpc != NULL);
LASSERT (wi == &rpc->crpc_wi);
- cfs_spin_lock(&rpc->crpc_lock);
+ reply = &rpc->crpc_replymsg;
+ do_bulk = rpc->crpc_bulk.bk_niov > 0;
- if (rpc->crpc_aborted) {
- cfs_spin_unlock(&rpc->crpc_lock);
- goto abort;
- }
+ spin_lock(&rpc->crpc_lock);
+
+ if (rpc->crpc_aborted) {
+ spin_unlock(&rpc->crpc_lock);
+ goto abort;
+ }
- cfs_spin_unlock(&rpc->crpc_lock);
+ spin_unlock(&rpc->crpc_lock);
switch (wi->swi_state) {
default:
rc = rpc->crpc_replyev.ev_status;
if (rc != 0) break;
- if ((reply->msg_type != type &&
- reply->msg_type != __swab32(type)) ||
- (reply->msg_magic != SRPC_MSG_MAGIC &&
- reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
+ srpc_unpack_msg_hdr(reply);
+ if (reply->msg_type != type ||
+ (reply->msg_magic != SRPC_MSG_MAGIC &&
+ reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
CWARN ("Bad message from %s: type %u (%d expected),"
" magic %u (%d expected).\n",
libcfs_id2str(rpc->crpc_dest),
return 1;
}
- if (rc != 0) {
- cfs_spin_lock(&rpc->crpc_lock);
- srpc_abort_rpc(rpc, rc);
- cfs_spin_unlock(&rpc->crpc_lock);
- }
+ if (rc != 0) {
+ spin_lock(&rpc->crpc_lock);
+ srpc_abort_rpc(rpc, rc);
+ spin_unlock(&rpc->crpc_lock);
+ }
abort:
if (rpc->crpc_aborted) {
{
LASSERT (!rpc->crpc_aborted);
LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
- LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);
CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
LASSERT(buffer != NULL);
rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
/* Repost buffer before replying since test client
rpc->srpc_reqstbuf = NULL;
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
ev->ev_fired = 0;
ev->ev_data = rpc;
}
/* when in kernel always called with LNET_LOCK() held, and in thread context */
-void
+static void
srpc_lnet_ev_handler(lnet_event_t *ev)
{
struct srpc_service_cd *scd;
- srpc_event_t *rpcev = ev->md.user_ptr;
- srpc_client_rpc_t *crpc;
- srpc_server_rpc_t *srpc;
- srpc_buffer_t *buffer;
- srpc_service_t *sv;
- srpc_msg_t *msg;
- srpc_msg_type_t type;
-
- LASSERT (!cfs_in_interrupt());
-
- if (ev->status != 0) {
- cfs_spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.errors++;
- cfs_spin_unlock(&srpc_data.rpc_glock);
- }
+ srpc_event_t *rpcev = ev->md.user_ptr;
+ srpc_client_rpc_t *crpc;
+ srpc_server_rpc_t *srpc;
+ srpc_buffer_t *buffer;
+ srpc_service_t *sv;
+ srpc_msg_t *msg;
+ srpc_msg_type_t type;
+
+ LASSERT (!in_interrupt());
+
+ if (ev->status != 0) {
+ spin_lock(&srpc_data.rpc_glock);
+ srpc_data.rpc_counters.errors++;
+ CERROR("ev->status = %d, ev->type = %d, errors = %u, "
+ "rpcs_sent = %u, rpcs_rcvd = %u, rpcs_dropped = %u, "
+ "rpcs_expired = %u\n",
+ ev->status, ev->type, srpc_data.rpc_counters.errors,
+ srpc_data.rpc_counters.rpcs_sent,
+ srpc_data.rpc_counters.rpcs_rcvd,
+ srpc_data.rpc_counters.rpcs_dropped,
+ srpc_data.rpc_counters.rpcs_expired);
+ spin_unlock(&srpc_data.rpc_glock);
+ }
rpcev->ev_lnet = ev->type;
LBUG ();
case SRPC_REQUEST_SENT:
if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
- cfs_spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_sent++;
- cfs_spin_unlock(&srpc_data.rpc_glock);
+ spin_lock(&srpc_data.rpc_glock);
+ srpc_data.rpc_counters.rpcs_sent++;
+ spin_unlock(&srpc_data.rpc_glock);
}
case SRPC_REPLY_RCVD:
case SRPC_BULK_REQ_RCVD:
LBUG ();
}
- cfs_spin_lock(&crpc->crpc_lock);
+ spin_lock(&crpc->crpc_lock);
- LASSERT (rpcev->ev_fired == 0);
- rpcev->ev_fired = 1;
- rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
- -EINTR : ev->status;
- swi_schedule_workitem(&crpc->crpc_wi);
+ LASSERT(rpcev->ev_fired == 0);
+ rpcev->ev_fired = 1;
+ rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
+ -EINTR : ev->status;
+ swi_schedule_workitem(&crpc->crpc_wi);
- cfs_spin_unlock(&crpc->crpc_lock);
- break;
+ spin_unlock(&crpc->crpc_lock);
+ break;
- case SRPC_REQUEST_RCVD:
+ case SRPC_REQUEST_RCVD:
scd = rpcev->ev_data;
sv = scd->scd_svc;
LASSERT(rpcev == &scd->scd_ev);
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
LASSERT (ev->unlinked);
LASSERT (ev->type == LNET_EVENT_PUT ||
if (sv->sv_shuttingdown) {
/* Leave buffer on scd->scd_buf_nposted since
* srpc_finish_service needs to traverse it. */
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
break;
}
swi_schedule_workitem(&scd->scd_buf_wi);
}
- cfs_list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
- msg = &buffer->buf_msg;
- type = srpc_service2request(sv->sv_id);
+ list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
+ msg = &buffer->buf_msg;
+ type = srpc_service2request(sv->sv_id);
if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
(msg->msg_type != type &&
msg->msg_magic = 0;
}
- if (!cfs_list_empty(&scd->scd_rpc_free)) {
- srpc = cfs_list_entry(scd->scd_rpc_free.next,
- struct srpc_server_rpc,
- srpc_list);
- cfs_list_del(&srpc->srpc_list);
+ if (!list_empty(&scd->scd_rpc_free)) {
+ srpc = list_entry(scd->scd_rpc_free.next,
+ struct srpc_server_rpc,
+ srpc_list);
+ list_del(&srpc->srpc_list);
srpc_init_server_rpc(srpc, scd, buffer);
- cfs_list_add_tail(&srpc->srpc_list,
- &scd->scd_rpc_active);
+ list_add_tail(&srpc->srpc_list,
+ &scd->scd_rpc_active);
swi_schedule_workitem(&srpc->srpc_wi);
} else {
- cfs_list_add_tail(&buffer->buf_list,
- &scd->scd_buf_blocked);
+ list_add_tail(&buffer->buf_list,
+ &scd->scd_buf_blocked);
}
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
- cfs_spin_lock(&srpc_data.rpc_glock);
- srpc_data.rpc_counters.rpcs_rcvd++;
- cfs_spin_unlock(&srpc_data.rpc_glock);
+ spin_lock(&srpc_data.rpc_glock);
+ srpc_data.rpc_counters.rpcs_rcvd++;
+ spin_unlock(&srpc_data.rpc_glock);
break;
case SRPC_BULK_GET_RPLD:
case SRPC_BULK_PUT_SENT:
if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
- cfs_spin_lock(&srpc_data.rpc_glock);
+ spin_lock(&srpc_data.rpc_glock);
- if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
- srpc_data.rpc_counters.bulk_get += ev->mlength;
- else
- srpc_data.rpc_counters.bulk_put += ev->mlength;
+ if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
+ srpc_data.rpc_counters.bulk_get += ev->mlength;
+ else
+ srpc_data.rpc_counters.bulk_put += ev->mlength;
- cfs_spin_unlock(&srpc_data.rpc_glock);
- }
- case SRPC_REPLY_SENT:
+ spin_unlock(&srpc_data.rpc_glock);
+ }
+ case SRPC_REPLY_SENT:
srpc = rpcev->ev_data;
scd = srpc->srpc_scd;
LASSERT(rpcev == &srpc->srpc_ev);
- cfs_spin_lock(&scd->scd_lock);
+ spin_lock(&scd->scd_lock);
rpcev->ev_fired = 1;
rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
-EINTR : ev->status;
swi_schedule_workitem(&srpc->srpc_wi);
- cfs_spin_unlock(&scd->scd_lock);
+ spin_unlock(&scd->scd_lock);
break;
}
}
int
srpc_startup (void)
{
- int rc;
+ int rc;
- memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
- cfs_spin_lock_init(&srpc_data.rpc_glock);
+ memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
+ spin_lock_init(&srpc_data.rpc_glock);
/* 1 second pause to avoid timestamp reuse */
cfs_pause(cfs_time_seconds(1));
srpc_data.rpc_state = SRPC_STATE_NONE;
#ifdef __KERNEL__
- rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
+ rc = LNetNIInit(LNET_PID_LUSTRE);
#else
if (the_lnet.ln_server_mode_flag)
- rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
+ rc = LNetNIInit(LNET_PID_LUSTRE);
else
rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
#endif
default:
LBUG ();
case SRPC_STATE_RUNNING:
- cfs_spin_lock(&srpc_data.rpc_glock);
+ spin_lock(&srpc_data.rpc_glock);
for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
srpc_service_t *sv = srpc_data.rpc_services[i];
i, sv->sv_name);
}
- cfs_spin_unlock(&srpc_data.rpc_glock);
+ spin_unlock(&srpc_data.rpc_glock);
stt_shutdown();