/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/rpc.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"


typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_WI_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

struct smoketest_rpc {
        spinlock_t        rpc_glock;     /* global lock */
        srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t      rpc_state;
        srpc_counters_t   rpc_counters;
        __u64             rpc_matchbits; /* matchbits counter */
} srpc_data;

/* forward ref's */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

void
srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
{
        LASSERT (i >= 0 && i < bk->bk_niov);

#ifdef __KERNEL__
        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
#else
        LASSERT (bk->bk_pages != NULL);

        bk->bk_pages[i] = pg;
        bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
        bk->bk_iovs[i].iov_base = cfs_page_address(pg);
#endif
        return;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
        int         i;
        cfs_page_t *pg;

        LASSERT (bk != NULL);
#ifndef __KERNEL__
        LASSERT (bk->bk_pages != NULL);
#endif

        for (i = 0; i < bk->bk_niov; i++) {
#ifdef __KERNEL__
                pg = bk->bk_iovs[i].kiov_page;
#else
                pg = bk->bk_pages[i];
#endif
                if (pg == NULL) break;

                cfs_free_page(pg);
        }

#ifndef __KERNEL__
        LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
#endif
        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

srpc_bulk_t *
srpc_alloc_bulk (int npages, int sink)
{
        srpc_bulk_t  *bk;
        cfs_page_t  **pages;
        int           i;

        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);

        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
        if (bk == NULL) {
                CERROR ("Can't allocate descriptor for %d pages\n", npages);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
        bk->bk_sink = sink;
        bk->bk_niov = npages;
        bk->bk_len  = npages * CFS_PAGE_SIZE;
#ifndef __KERNEL__
        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
        if (pages == NULL) {
                LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                CERROR ("Can't allocate page array for %d pages\n", npages);
                return NULL;
        }

        memset(pages, 0, sizeof(cfs_page_t *) * npages);
        bk->bk_pages = pages;
#else
        UNUSED (pages);
#endif

        for (i = 0; i < npages; i++) {
                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);

                if (pg == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                srpc_add_bulk_page(bk, pg, i);
        }

        return bk;
}
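
/*
 * A minimal usage sketch for the two routines above (hypothetical
 * caller; a 2-page sink buffer):
 *
 *      srpc_bulk_t *bk = srpc_alloc_bulk(2, 1);
 *
 *      if (bk == NULL)
 *              return -ENOMEM;
 *      ... attach bk to an RPC and run the transfer ...
 *      srpc_free_bulk(bk);
 */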

static inline __u64
srpc_next_id (void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}
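
/*
 * How the matchbits are consumed (see srpc_prepare_reply() and
 * srpc_prepare_bulk() below): the client embeds a fresh id in each
 * request so the server can address its reply or bulk transfer to the
 * matching passive buffer:
 *
 *      *id = srpc_next_id();
 *      srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id, ...);
 *
 * srpc_startup() seeds the counter from the wall clock shifted left 48
 * bits, so ids stay unique across module restarts.
 */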

void
srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                      srpc_service_t *sv, srpc_buffer_t *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_service  = sv;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
}

int
srpc_add_service (srpc_service_t *sv)
{
        int                id = sv->sv_id;
        int                i;
        srpc_server_rpc_t *rpc;

        LASSERT (sv->sv_concur > 0);
        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);

        spin_lock(&srpc_data.rpc_glock);

        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return -EBUSY;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        sv->sv_nprune       = 0;
        sv->sv_nposted_msg  = 0;
        sv->sv_shuttingdown = 0;
        spin_lock_init(&sv->sv_lock);
        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);

        sv->sv_ev.ev_data = sv;
        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;

        for (i = 0; i < sv->sv_concur; i++) {
                LIBCFS_ALLOC(rpc, sizeof(*rpc));
                if (rpc == NULL) goto enomem;

                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
                id, sv->sv_name, sv->sv_concur);
        return 0;

enomem:
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return -ENOMEM;
}

int
srpc_remove_service (srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}
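
/*
 * RDMA posting helpers.  A "passive" post attaches a buffer to a match
 * entry and waits for the peer to PUT or GET it; an "active" post binds
 * a buffer and initiates the PUT or GET itself.  Both deliver their
 * completion events to srpc_data.rpc_lnet_eq, tagged with the
 * srpc_event_t passed in md.user_ptr.
 */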

int
srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int              rc;
        lnet_md_t        md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0,
                          LNET_UNLINK, LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR ("LNetMEAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT (rc == 0);
                return -ENOMEM;
        }

        CDEBUG (D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}

int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int       rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDBind failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        /* This is a bit of an abuse of the LNET_MD_OP_{PUT,GET} options:
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers); here they merely select whether to PUT or GET. */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT ((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT (rc == 0);
        } else {
                CDEBUG (D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}
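
/*
 * md.threshold above is 2 for an active GET because the bound MD sees
 * both a SEND and a REPLY event (the SRPC_BULK_GET_RPLD case of
 * srpc_lnet_ev_handler asserts exactly these types); an active PUT
 * posted with LNET_NOACK_REQ sees a single SEND event, hence 1.
 */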

int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_active_rdma(portal, service, buf, len,
                                   LNET_MD_OP_PUT, peer,
                                   LNET_NID_ANY, mdh, ev);
        return rc;
}

int
srpc_post_passive_rqtbuf(int service, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int               rc;
        int               portal;
        lnet_process_id_t any = {.nid = LNET_NID_ANY,
                                 .pid = LNET_PID_ANY};

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_passive_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, any, mdh, ev);
        return rc;
}

int
srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        srpc_msg_t *msg = &buf->buf_msg;
        int         rc;

        LASSERT (!sv->sv_shuttingdown);

        buf->buf_mdh = LNET_INVALID_HANDLE;
        list_add(&buf->buf_list, &sv->sv_posted_msgq);
        sv->sv_nposted_msg++;
        spin_unlock(&sv->sv_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                      &buf->buf_mdh, &sv->sv_ev);

        /* At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler may already have been called, which
         * is why buf had to be added to sv_posted_msgq _before_ sv_lock
         * was dropped above. */

        spin_lock(&sv->sv_lock);

        if (rc == 0) {
                if (sv->sv_shuttingdown) {
                        spin_unlock(&sv->sv_lock);

                        /* srpc_shutdown_service might have tried to unlink me
                         * when my buf_mdh was still invalid */
                        LNetMDUnlink(buf->buf_mdh);

                        spin_lock(&sv->sv_lock);
                }
                return 0;
        }

        sv->sv_nposted_msg--;
        if (sv->sv_shuttingdown) return rc;

        list_del(&buf->buf_list);

        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
        return rc;
}

int
srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
{
        int                rc;
        int                posted;
        srpc_buffer_t     *buf;

        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        for (posted = 0; posted < nbuffer; posted++) {
                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL) break;

                spin_lock(&sv->sv_lock);
                rc = srpc_service_post_buffer(sv, buf);
                spin_unlock(&sv->sv_lock);

                if (rc != 0) break;
        }

        return posted;
}
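
/*
 * Typical service bring-up as seen from the two routines above (a
 * hedged sketch: MY_SERVICE_ID and my_handler are hypothetical;
 * sv_handler is the callback invoked from srpc_handle_rpc below):
 *
 *      sv->sv_id      = MY_SERVICE_ID;
 *      sv->sv_name    = "my_service";
 *      sv->sv_concur  = 8;
 *      sv->sv_handler = my_handler;
 *      rc = srpc_add_service(sv);
 *      if (rc == 0)
 *              posted = srpc_service_add_buffers(sv, 8);
 */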

void
srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
{
        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_nprune >= 0);
        LASSERT (!sv->sv_shuttingdown);

        sv->sv_nprune += nbuffer;

        spin_unlock(&sv->sv_lock);
        return;
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */

        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
                CDEBUG (D_NET,
                        "waiting for %d posted buffers to unlink and "
                        "in-flight RPCs to die.\n",
                        sv->sv_nposted_msg);

                if (!list_empty(&sv->sv_active_rpcq)) {
                        rpc = list_entry(sv->sv_active_rpcq.next,
                                         srpc_server_rpc_t, srpc_list);
                        CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
                                "wi %s scheduled %d running %d, "
                                "ev fired %d type %d status %d lnet %d\n",
                                rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                swi_state2str(rpc->srpc_wi.wi_state),
                                rpc->srpc_wi.wi_scheduled,
                                rpc->srpc_wi.wi_running,
                                rpc->srpc_ev.ev_fired,
                                rpc->srpc_ev.ev_type,
                                rpc->srpc_ev.ev_status,
                                rpc->srpc_ev.ev_lnet);
                }

                spin_unlock(&sv->sv_lock);
                return 0;
        }

        spin_unlock(&sv->sv_lock); /* no lock needed from now on */

        for (;;) {
                struct list_head *q;

                if (!list_empty(&sv->sv_posted_msgq))
                        q = &sv->sv_posted_msgq;
                else if (!list_empty(&sv->sv_blocked_msgq))
                        q = &sv->sv_blocked_msgq;
                else
                        break;

                buf = list_entry(q->next, srpc_buffer_t, buf_list);
                list_del(&buf->buf_list);

                LIBCFS_FREE(buf, sizeof(*buf));
        }

        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        return 1;
}
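
/*
 * The shutdown protocol implied by the assertion above (a hypothetical
 * polling loop; the back-off is the caller's choice):
 *
 *      srpc_shutdown_service(sv);
 *      while (!srpc_finish_service(sv))
 *              wait_a_while();
 *      srpc_remove_service(sv);
 */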

/* called with sv->sv_lock held */
void
srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        if (sv->sv_shuttingdown) goto free;

        if (sv->sv_nprune == 0) {
                if (srpc_service_post_buffer(sv, buf) != 0)
                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
                return;
        }

        sv->sv_nprune--;
free:
        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
}

/* called with srpc_service_t::sv_lock held */
inline void
srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
{
        srpc_service_t *sv = rpc->srpc_service;

        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                swi_schedule_workitem(&rpc->srpc_wi);
        else    /* framework RPCs are handled one by one */
                swi_schedule_serial_workitem(&rpc->srpc_wi);

        return;
}

void
srpc_abort_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;

        spin_lock(&sv->sv_lock);

        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        /* schedule in-flight RPCs to notice the abort, NB:
         * racing with incoming RPCs; a complete fix should make test
         * RPCs carry the session ID in their headers */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                rpc->srpc_aborted = 1;
                srpc_schedule_server_rpc(rpc);
        }

        spin_unlock(&sv->sv_lock);
        return;
}

void
srpc_shutdown_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        /* schedule in-flight RPCs to notice the shutdown */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                srpc_schedule_server_rpc(rpc);
        }

        spin_unlock(&sv->sv_lock);

        /* OK to traverse sv_posted_msgq without lock, since no one
         * touches sv_posted_msgq now */
        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
                LNetMDUnlink(buf->buf_mdh);

        return;
}

int
srpc_send_request (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
        srpc_bulk_t  *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0) return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
        srpc_event_t  *ev = &rpc->srpc_ev;
        srpc_bulk_t   *bk = rpc->srpc_bulk;
        __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int            rc;
        int            opt;

        LASSERT (bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}
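
/*
 * srpc_prepare_bulk() and srpc_do_bulk() are mirror images: each side
 * derives opt from its own bk_sink flag.  A sink client posts a passive
 * PUT buffer and the (source) server actively PUTs into it; a source
 * client posts a passive GET buffer and the (sink) server actively GETs
 * from it.
 */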

/* only called from srpc_handle_rpc */
void
srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
{
        srpc_service_t *sv = rpc->srpc_service;
        srpc_buffer_t  *buffer;

        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG_LIMIT(status == 0 ? D_NET : D_NETERROR,
                     "Server RPC %p done: service %s, peer %s, status %s:%d\n",
                     rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                     swi_state2str(rpc->srpc_wi.wi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done) (rpc);
        LASSERT (rpc->srpc_bulk == NULL);

        spin_lock(&sv->sv_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB might drop sv_lock in srpc_service_recycle_buffer, but
                 * sv won't go away since sv_active_rpcq is not empty */
                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */

        /*
         * No one can schedule me now since:
         * - I'm not on sv_active_rpcq.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (rpc->srpc_ev.ev_fired);
        swi_kill_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
                buffer = list_entry(sv->sv_blocked_msgq.next,
                                    srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, sv, buffer);
                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                srpc_schedule_server_rpc(rpc);
        } else {
                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        spin_unlock(&sv->sv_lock);
        return;
}

/* handles an incoming RPC */
int
srpc_handle_rpc (swi_workitem_t *wi)
{
        srpc_server_rpc_t *rpc = wi->wi_data;
        srpc_service_t    *sv = rpc->srpc_service;
        srpc_event_t      *ev = &rpc->srpc_ev;
        int                rc = 0;

        LASSERT (wi == &rpc->srpc_wi);

        spin_lock(&sv->sv_lock);

        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
                spin_unlock(&sv->sv_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&sv->sv_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t           *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_magic == 0) {
                        /* moaned already in srpc_lnet_ev_handler */
                        rc = EBADMSG;
                } else if (msg->msg_version != SRPC_MSG_VERSION &&
                           msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                        CWARN ("Version mismatch: %u, %u expected, from %s\n",
                               msg->msg_version, SRPC_MSG_VERSION,
                               libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler) (rpc);
                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);
                }

                if (rc != 0) {
                        srpc_server_rpc_done(rpc, rc);
                        return 1;
                }

                wi->wi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT (ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
        case SWI_STATE_BULK_STARTED:
                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                if (!ev->ev_fired) {
                        CERROR("RPC %p: bulk %p, service %d\n",
                               rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id);
                        CERROR("Event: status %d, type %d, lnet %d\n",
                               ev->ev_status, ev->ev_type, ev->ev_lnet);
                        LASSERT (ev->ev_fired);
                }

                wi->wi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}
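
/*
 * Server-side state machine recap:
 *
 *      NEWBORN -> BULK_STARTED -> REPLY_SUBMITTED -> DONE
 *
 * Each LNet completion re-schedules the workitem, which re-enters
 * srpc_handle_rpc() at the state saved in wi->wi_state; the NEWBORN and
 * BULK_STARTED cases deliberately fall through when there is no event
 * to wait for.
 */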

void
srpc_client_rpc_expired (void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
        return;
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0) return;

        CFS_INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          cfs_time_current_sec());
        stt_add_timer(timer);
        return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU. */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0) return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer)) return;

#ifdef __KERNEL__
        /* timer detonated, wait for it to explode */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                cfs_schedule();

                spin_lock(&rpc->crpc_lock);
        }
#else
        LBUG(); /* impossible in single-threaded runtime */
#endif
        return;
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;

        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG_LIMIT((status == 0) ? D_NET : D_NETERROR,
                     "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                     rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                     swi_state2str(wi->wi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (!srpc_event_pending(rpc));
        swi_kill_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done) (rpc);
        return;
}

/* sends an outgoing RPC */
int
srpc_send_rpc (swi_workitem_t *wi)
{
        int                rc = 0;
        srpc_client_rpc_t *rpc = wi->wi_data;
        srpc_msg_t        *reply = &rpc->crpc_replymsg;
        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;

        LASSERT (rpc != NULL);
        LASSERT (wi == &rpc->crpc_wi);

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN:
                LASSERT (!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired) break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired) break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0) break;

                if ((reply->msg_type != type &&
                     reply->msg_type != __swab32(type)) ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN ("Bad message from %s: type %u (%d expected),"
                               " magic %u (%d expected).\n",
                               libcfs_id2str(rpc->crpc_dest),
                               reply->msg_type, type,
                               reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN ("Remote error %d at %s, unlinking bulk buffer "
                               "in case peer didn't initiate bulk transfer\n",
                               reply->msg_body.reply.status,
                               libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
        }
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->wi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}
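
/*
 * Client-side state machine recap:
 *
 *      NEWBORN -> REQUEST_SUBMITTED -> REQUEST_SENT
 *              -> REPLY_RECEIVED -> DONE
 *
 * The reply and bulk buffers are posted passively up front, then the
 * request is sent; events may fire in any order but are consumed in the
 * fixed order noted in the CAVEAT EMPTOR comment above.
 */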

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}
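
/*
 * Client usage sketch (MY_SERVICE_ID and the callbacks are
 * hypothetical; note that srpc_post_rpc below must be called with
 * crpc_lock held):
 *
 *      rpc = srpc_create_client_rpc(peer, MY_SERVICE_ID, 0, 0,
 *                                   my_done_cb, my_fini_cb, NULL);
 *      if (rpc == NULL)
 *              return -ENOMEM;
 *      spin_lock(&rpc->crpc_lock);
 *      srpc_post_rpc(rpc);
 *      spin_unlock(&rpc->crpc_lock);
 */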

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
        LASSERT (why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG (D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.wi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status  = why;
        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
        LASSERT (!rpc->crpc_aborted);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);

        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);
        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}


int
srpc_send_reply (srpc_server_rpc_t *rpc)
{
        srpc_event_t   *ev = &rpc->srpc_ev;
        srpc_msg_t     *msg = &rpc->srpc_replymsg;
        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
        srpc_service_t *sv = rpc->srpc_service;
        __u64           rpyid;
        int             rc;

        LASSERT (buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&sv->sv_lock);

        if (!sv->sv_shuttingdown &&
            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(sv, buffer) != 0)
                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&sv->sv_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* when in kernel always called with LNET_LOCK() held, and in thread context */
void
srpc_lnet_ev_handler (lnet_event_t *ev)
{
        srpc_event_t      *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t     *buffer;
        srpc_service_t    *sv;
        srpc_msg_t        *msg;
        srpc_msg_type_t    type;

        LASSERT (!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                CERROR("Unknown event: status %d, type %d, lnet %d\n",
                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                LBUG ();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                if (rpcev != &crpc->crpc_reqstev &&
                    rpcev != &crpc->crpc_replyev &&
                    rpcev != &crpc->crpc_bulkev) {
                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
                               rpcev, crpc, &crpc->crpc_reqstev,
                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
                        CERROR("Bad event: status %d, type %d, lnet %d\n",
                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                        LBUG ();
                }

                spin_lock(&crpc->crpc_lock);

                LASSERT (rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                sv = rpcev->ev_data;

                LASSERT (rpcev == &sv->sv_ev);

                spin_lock(&sv->sv_lock);

                LASSERT (ev->unlinked);
                LASSERT (ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT (ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                sv->sv_nposted_msg--;
                LASSERT (sv->sv_nposted_msg >= 0);

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on sv->sv_posted_msgq since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&sv->sv_lock);
                        break;
                }

                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR ("Dropping RPC (%s) from %s: "
                                "status %d mlength %d type %u magic %u.\n",
                                sv->sv_name, libcfs_id2str(ev->initiator),
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);

                        /* NB can't call srpc_service_recycle_buffer here since
                         * it may call LNetM[DE]Attach. The invalid magic tells
                         * srpc_handle_rpc to drop this RPC */
                        msg->msg_magic = 0;
                }

                if (!list_empty(&sv->sv_free_rpcq)) {
                        srpc = list_entry(sv->sv_free_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, sv, buffer);
                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
                        srpc_schedule_server_rpc(srpc);
                } else {
                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
                }

                spin_unlock(&sv->sv_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT (ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (!ev->unlinked)
                        break; /* wait for final event */

        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                sv = srpc->srpc_service;

                LASSERT (rpcev == &srpc->srpc_ev);

                spin_lock(&sv->sv_lock);

                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                srpc_schedule_server_rpc(srpc);

                spin_unlock(&sv->sv_lock);
                break;
        }

        return;
}

#ifndef __KERNEL__

int
srpc_check_event (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                        timeout * 1000, &ev, &i);
        if (rc == 0) return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* We can't afford to miss any events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        srpc_lnet_ev_handler(&ev);
        return 1;
}

#endif
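
/*
 * In userspace the EQ is allocated with LNET_EQ_HANDLER_NONE (see
 * srpc_startup below), so events are not dispatched by callback and
 * some outer loop must pump them, e.g. (hypothetical loop, polling
 * with a 1 second timeout):
 *
 *      while (!shutting_down)
 *              srpc_check_event(1);
 */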

int
srpc_startup (void)
{
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        cfs_pause(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

#ifdef __KERNEL__
        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
        if (the_lnet.ln_server_mode_flag)
                rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
        else
                rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
#endif
        if (rc < 0) {
                CERROR ("LNetNIInit() has failed: %d\n", rc);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
#ifdef __KERNEL__
        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
#else
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
#endif
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT (rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = swi_startup();
        if (rc != 0)
                goto bail;

        srpc_data.rpc_state = SRPC_STATE_WI_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown (void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG ();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF (sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();

        case SRPC_STATE_WI_INIT:
                swi_shutdown();

        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT (rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT (rc == 0); /* the EQ should have no user by now */

        case SRPC_STATE_NI_INIT:
                LNetNIFini();
                break;
        }

        return;
}