/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Isaac Huang <isaac@clusterfs.com>
 *
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <libcfs/kp30.h>
#include <libcfs/libcfs.h>
#include <lnet/lib-lnet.h>

#include "selftest.h"

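/* Module lifecycle: srpc_startup() advances through these states in
 * order, and srpc_shutdown() unwinds from whichever state was reached. */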
typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_WI_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
#define SRPC_PEER_CREDITS         16   /* >= most LNDs' default peer credits */

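/* All module-global state lives in the single static instance srpc_data
 * below; rpc_glock guards its fields. */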
struct smoketest_rpc {
        spinlock_t        rpc_glock;     /* global lock */
        srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        struct list_head *rpc_peers;     /* hash table of known peers */
        lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t      rpc_state;
        srpc_counters_t   rpc_counters;
        __u64             rpc_matchbits; /* matchbits counter */
} srpc_data;

/* forward refs */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

void
srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
{
        LASSERT (i >= 0 && i < bk->bk_niov);

#ifdef __KERNEL__
        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
#else
        LASSERT (bk->bk_pages != NULL);

        bk->bk_pages[i] = pg;
        bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
        bk->bk_iovs[i].iov_base = cfs_page_address(pg);
#endif
        return;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
        int         i;
        cfs_page_t *pg;

        LASSERT (bk != NULL);
#ifndef __KERNEL__
        LASSERT (bk->bk_pages != NULL);
#endif

        for (i = 0; i < bk->bk_niov; i++) {
#ifdef __KERNEL__
                pg = bk->bk_iovs[i].kiov_page;
#else
                pg = bk->bk_pages[i];
#endif
                if (pg == NULL) break;

                cfs_free_page(pg);
        }

#ifndef __KERNEL__
        LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
#endif
        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

srpc_bulk_t *
srpc_alloc_bulk (int npages, int sink)
{
        srpc_bulk_t  *bk;
        cfs_page_t  **pages;
        int           i;

        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);

        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
        if (bk == NULL) {
                CERROR ("Can't allocate descriptor for %d pages\n", npages);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
        bk->bk_sink = sink;
        bk->bk_niov = npages;
        bk->bk_len  = npages * CFS_PAGE_SIZE;
#ifndef __KERNEL__
        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
        if (pages == NULL) {
                LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                CERROR ("Can't allocate page array for %d pages\n", npages);
                return NULL;
        }

        memset(pages, 0, sizeof(cfs_page_t *) * npages);
        bk->bk_pages = pages;
#else
        UNUSED (pages);
#endif

        for (i = 0; i < npages; i++) {
                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);

                if (pg == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                srpc_add_bulk_page(bk, pg, i);
        }

        return bk;
}

static inline struct list_head *
srpc_nid2peerlist (lnet_nid_t nid)
{
        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;

        return &srpc_data.rpc_peers[hash];
}

static inline srpc_peer_t *
srpc_create_peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;

        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
        if (peer == NULL) {
                CERROR ("Failed to allocate peer structure for %s\n",
                        libcfs_nid2str(nid));
                return NULL;
        }

        memset(peer, 0, sizeof(srpc_peer_t));
        peer->stp_nid     = nid;
        peer->stp_credits = SRPC_PEER_CREDITS;

        spin_lock_init(&peer->stp_lock);
        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
        return peer;
}

srpc_peer_t *
srpc_find_peer_locked (lnet_nid_t nid)
{
        struct list_head *peer_list = srpc_nid2peerlist(nid);
        srpc_peer_t      *peer;

        LASSERT (nid != LNET_NID_ANY);

        list_for_each_entry (peer, peer_list, stp_list) {
                if (peer->stp_nid == nid)
                        return peer;
        }

        return NULL;
}

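/* Look up the peer for nid, creating it on first use.  The candidate
 * peer is allocated outside rpc_glock, then the table is re-checked
 * under the lock so a racing creator wins cleanly and the loser's
 * allocation is freed. */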
static srpc_peer_t *
srpc_nid2peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;
        srpc_peer_t *new_peer;

        spin_lock(&srpc_data.rpc_glock);
        peer = srpc_find_peer_locked(nid);
        spin_unlock(&srpc_data.rpc_glock);

        if (peer != NULL)
                return peer;

        new_peer = srpc_create_peer(nid);

        spin_lock(&srpc_data.rpc_glock);

        peer = srpc_find_peer_locked(nid);
        if (peer != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                if (new_peer != NULL)
                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));

                return peer;
        }

        if (new_peer == NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return NULL;
        }

        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
        spin_unlock(&srpc_data.rpc_glock);
        return new_peer;
}

static inline __u64
srpc_next_id (void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

void
srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                      srpc_service_t *sv, srpc_buffer_t *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_service  = sv;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
}

int
srpc_add_service (srpc_service_t *sv)
{
        int                id = sv->sv_id;
        int                i;
        srpc_server_rpc_t *rpc;

        LASSERT (sv->sv_concur > 0);
        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);

        spin_lock(&srpc_data.rpc_glock);

        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return -EBUSY;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        sv->sv_nprune       = 0;
        sv->sv_nposted_msg  = 0;
        sv->sv_shuttingdown = 0;
        spin_lock_init(&sv->sv_lock);
        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);

        sv->sv_ev.ev_data = sv;
        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;

        for (i = 0; i < sv->sv_concur; i++) {
                LIBCFS_ALLOC(rpc, sizeof(*rpc));
                if (rpc == NULL) goto enomem;

                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
                id, sv->sv_name, sv->sv_concur);
        return 0;

enomem:
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return -ENOMEM;
}

int
srpc_remove_service (srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

int
srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int              rc;
        lnet_md_t        md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0,
                          LNET_UNLINK, LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR ("LNetMEAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT (rc == 0);
                return -ENOMEM;
        }

        CDEBUG (D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}

int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int       rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDBind failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        /* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options:
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers). */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT ((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT (rc == 0);
        } else {
                CDEBUG (D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_active_rdma(portal, service, buf, len,
                                   LNET_MD_OP_PUT, peer,
                                   LNET_NID_ANY, mdh, ev);
        return rc;
}

int
srpc_post_passive_rqtbuf(int service, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int               rc;
        int               portal;
        lnet_process_id_t any = {.nid = LNET_NID_ANY,
                                 .pid = LNET_PID_ANY};

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_passive_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, any, mdh, ev);
        return rc;
}

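/* NB: called with sv->sv_lock held; the lock is dropped around the LNet
 * post below and re-acquired before returning. */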
int
srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        srpc_msg_t *msg = &buf->buf_msg;
        int         rc;

        LASSERT (!sv->sv_shuttingdown);

        buf->buf_mdh = LNET_INVALID_HANDLE;
        list_add(&buf->buf_list, &sv->sv_posted_msgq);
        sv->sv_nposted_msg++;
        spin_unlock(&sv->sv_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                      &buf->buf_mdh, &sv->sv_ev);

        /* At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler may have been called.  That is why
         * buf was added to sv_posted_msgq _before_ dropping sv_lock. */

        spin_lock(&sv->sv_lock);

        if (rc == 0) {
                if (sv->sv_shuttingdown) {
                        spin_unlock(&sv->sv_lock);

                        /* srpc_shutdown_service might have tried to unlink me
                         * when my buf_mdh was still invalid */
                        LNetMDUnlink(buf->buf_mdh);

                        spin_lock(&sv->sv_lock);
                }
                return 0;
        }

        sv->sv_nposted_msg--;
        if (sv->sv_shuttingdown) return rc;

        list_del(&buf->buf_list);

        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
        return rc;
}

int
srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
{
        int                rc;
        int                posted;
        srpc_buffer_t     *buf;

        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        for (posted = 0; posted < nbuffer; posted++) {
                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL) break;

                spin_lock(&sv->sv_lock);
                rc = srpc_service_post_buffer(sv, buf);
                spin_unlock(&sv->sv_lock);

                if (rc != 0) break;
        }

        return posted;
}

void
srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
{
        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_nprune >= 0);
        LASSERT (!sv->sv_shuttingdown);

        sv->sv_nprune += nbuffer;

        spin_unlock(&sv->sv_lock);
        return;
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */

        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
                CDEBUG (D_NET,
                        "waiting for %d posted buffers to unlink and "
                        "in-flight RPCs to die.\n",
                        sv->sv_nposted_msg);

                if (!list_empty(&sv->sv_active_rpcq)) {
                        rpc = list_entry(sv->sv_active_rpcq.next,
                                         srpc_server_rpc_t, srpc_list);
                        CDEBUG (D_NETERROR,
                                "Active RPC on shutdown: sv %s, peer %s, "
                                "wi %s scheduled %d running %d, "
                                "ev fired %d type %d status %d lnet %d\n",
                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                swi_state2str(rpc->srpc_wi.wi_state),
                                rpc->srpc_wi.wi_scheduled,
                                rpc->srpc_wi.wi_running,
                                rpc->srpc_ev.ev_fired,
                                rpc->srpc_ev.ev_type,
                                rpc->srpc_ev.ev_status,
                                rpc->srpc_ev.ev_lnet);
                }

                spin_unlock(&sv->sv_lock);
                return 0;
        }

        spin_unlock(&sv->sv_lock); /* no lock needed from now on */

        for (;;) {
                struct list_head *q;

                if (!list_empty(&sv->sv_posted_msgq))
                        q = &sv->sv_posted_msgq;
                else if (!list_empty(&sv->sv_blocked_msgq))
                        q = &sv->sv_blocked_msgq;
                else
                        break;

                buf = list_entry(q->next, srpc_buffer_t, buf_list);
                list_del(&buf->buf_list);

                LIBCFS_FREE(buf, sizeof(*buf));
        }

        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        return 1;
}

/* called with sv->sv_lock held */
void
srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        if (sv->sv_shuttingdown) goto free;

        if (sv->sv_nprune == 0) {
                if (srpc_service_post_buffer(sv, buf) != 0)
                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
                return;
        }

        sv->sv_nprune--;
free:
        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
}

void
srpc_shutdown_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
                sv->sv_id, sv->sv_name);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        /* schedule in-flight RPCs to notice the shutdown */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                swi_schedule_workitem(&rpc->srpc_wi);
        }

        spin_unlock(&sv->sv_lock);

        /* OK to traverse sv_posted_msgq without lock, since no one
         * touches sv_posted_msgq now */
        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
                LNetMDUnlink(buf->buf_mdh);

        return;
}

int
srpc_send_request (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
        srpc_bulk_t  *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0) return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
        srpc_event_t  *ev = &rpc->srpc_ev;
        srpc_bulk_t   *bk = rpc->srpc_bulk;
        __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int            rc;
        int            opt;

        LASSERT (bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* called with srpc_service_t::sv_lock held */
inline void
srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
{
        srpc_service_t *sv = rpc->srpc_service;

        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                swi_schedule_workitem(&rpc->srpc_wi);
        else    /* framework RPCs are handled one by one */
                swi_schedule_serial_workitem(&rpc->srpc_wi);

        return;
}

/* only called from srpc_handle_rpc */
void
srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
{
        srpc_service_t *sv = rpc->srpc_service;
        srpc_buffer_t  *buffer;

        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG (status == 0 ? D_NET : D_NETERROR,
                "Server RPC done: service %s, peer %s, status %s:%d\n",
                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                swi_state2str(rpc->srpc_wi.wi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done) (rpc);
        LASSERT (rpc->srpc_bulk == NULL);

        spin_lock(&sv->sv_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB might drop sv_lock in srpc_service_recycle_buffer, but
                 * sv won't go away because sv_active_rpcq is not empty */
                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */

        /*
         * No one can schedule me now since:
         * - I'm not on sv_active_rpcq.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (rpc->srpc_ev.ev_fired);
        swi_kill_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
                buffer = list_entry(sv->sv_blocked_msgq.next,
                                    srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, sv, buffer);
                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                srpc_schedule_server_rpc(rpc);
        } else {
                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        spin_unlock(&sv->sv_lock);
        return;
}

/* handles an incoming RPC */
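/* Work-item handler driving the server-side state machine:
 * SWI_STATE_NEWBORN (dispatch to the service handler, start bulk) ->
 * SWI_STATE_BULK_STARTED (bulk complete, send reply) ->
 * SWI_STATE_REPLY_SUBMITTED -> SWI_STATE_DONE.
 * Returns 1 when the RPC is finished, 0 to wait for more events. */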
int
srpc_handle_rpc (swi_workitem_t *wi)
{
        srpc_server_rpc_t *rpc = wi->wi_data;
        srpc_service_t    *sv = rpc->srpc_service;
        srpc_event_t      *ev = &rpc->srpc_ev;
        int                rc = 0;

        LASSERT (wi == &rpc->srpc_wi);

        spin_lock(&sv->sv_lock);

        if (sv->sv_shuttingdown) {
                spin_unlock(&sv->sv_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&sv->sv_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t           *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_version != SRPC_MSG_VERSION &&
                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                        CWARN ("Version mismatch: %u, %u expected, from %s\n",
                               msg->msg_version, SRPC_MSG_VERSION,
                               libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler) (rpc);
                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);
                }

                if (rc != 0) {
                        srpc_server_rpc_done(rpc, rc);
                        return 1;
                }

                wi->wi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT (ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
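        /* fall through: no bulk transfer, or it failed to start */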
        case SWI_STATE_BULK_STARTED:
                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                LASSERT (ev->ev_fired);

                wi->wi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

void
srpc_client_rpc_expired (void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
        return;
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0) return;

        CFS_INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          cfs_time_current_sec());
        stt_add_timer(timer);
        return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0) return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer)) return;

#ifdef __KERNEL__
        /* timer has fired; wait for srpc_client_rpc_expired() to finish */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                cfs_schedule();

                spin_lock(&rpc->crpc_lock);
        }
#else
        LBUG(); /* impossible in single-threaded runtime */
#endif
        return;
}

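/* Return credits to peer and drain its queues while credits last;
 * control RPCs (stp_ctl_rpcq) are always served before test RPCs
 * (stp_rpcq). */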
void
srpc_check_sends (srpc_peer_t *peer, int credits)
{
        struct list_head  *q;
        srpc_client_rpc_t *rpc;

        LASSERT (credits >= 0);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        spin_lock(&peer->stp_lock);
        peer->stp_credits += credits;

        while (peer->stp_credits) {
                if (!list_empty(&peer->stp_ctl_rpcq))
                        q = &peer->stp_ctl_rpcq;
                else if (!list_empty(&peer->stp_rpcq))
                        q = &peer->stp_rpcq;
                else
                        break;

                peer->stp_credits--;

                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
                list_del_init(&rpc->crpc_privl);
                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */

                swi_schedule_workitem(&rpc->crpc_wi);
        }

        spin_unlock(&peer->stp_lock);
        return;
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;
        srpc_peer_t    *peer = rpc->crpc_peer;

        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG ((status == 0) ? D_NET : D_NETERROR,
                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (!srpc_event_pending(rpc));
        swi_kill_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done) (rpc);

        if (peer != NULL)
                srpc_check_sends(peer, 1);
        return;
}

/* sends an outgoing RPC */
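/* Work-item handler driving the client-side state machine:
 * SWI_STATE_NEWBORN (post reply/bulk buffers, send request) ->
 * SWI_STATE_REQUEST_SUBMITTED -> SWI_STATE_REQUEST_SENT ->
 * SWI_STATE_REPLY_RECEIVED -> SWI_STATE_DONE.
 * Returns 1 when the RPC is finished, 0 to wait for more events. */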
int
srpc_send_rpc (swi_workitem_t *wi)
{
        int                rc = 0;
        srpc_client_rpc_t *rpc = wi->wi_data;
        srpc_msg_t        *reply = &rpc->crpc_replymsg;
        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;

        LASSERT (rpc != NULL);
        LASSERT (wi == &rpc->crpc_wi);

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN:
                LASSERT (!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired) break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired) break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0) break;

                if ((reply->msg_type != type &&
                     reply->msg_type != __swab32(type)) ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN ("Bad message from %s: type %u (%d expected),"
                               " magic %u (%d expected).\n",
                               libcfs_id2str(rpc->crpc_dest),
                               reply->msg_type, type,
                               reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN ("Remote error %d at %s, unlink bulk buffer in "
                               "case peer didn't initiate bulk transfer\n",
                               reply->msg_body.reply.status,
                               libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
        }
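        /* fall through: reply received, now check bulk completion */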
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->wi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}
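
/* Illustrative sketch only (not part of this file): a caller with no
 * bulk data would typically pair srpc_create_client_rpc() with
 * srpc_post_rpc(), which must run under crpc_lock (see below), e.g.
 *
 *      rpc = srpc_create_client_rpc(peer, service, 0, 0,
 *                                   my_done_cb, NULL, NULL);
 *      if (rpc != NULL) {
 *              spin_lock(&rpc->crpc_lock);
 *              srpc_post_rpc(rpc);
 *              spin_unlock(&rpc->crpc_lock);
 *      }
 *
 * my_done_cb is a hypothetical completion callback supplied by the
 * caller. */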

/* called with rpc->crpc_lock held */
static inline void
srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
{
        int service = rpc->crpc_service;

        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        rpc->crpc_peer = peer;

        spin_lock(&peer->stp_lock);

        /* Framework RPCs that alter session state shall take precedence
         * over test RPCs and framework query RPCs */
        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
            service != SRPC_SERVICE_DEBUG &&
            service != SRPC_SERVICE_QUERY_STAT)
                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
        else
                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);

        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
        spin_unlock(&peer->stp_lock);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
        srpc_peer_t *peer = rpc->crpc_peer;

        LASSERT (why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG (D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.wi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status  = why;

        if (peer != NULL) {
                spin_lock(&peer->stp_lock);

                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
                        list_del_init(&rpc->crpc_privl);
                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
                        rpc->crpc_peer = NULL;       /* no credit taken */
                }

                spin_unlock(&peer->stp_lock);
        }

        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
        srpc_peer_t *peer;

        LASSERT (!rpc->crpc_aborted);
        LASSERT (rpc->crpc_peer == NULL);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);

        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);

        peer = srpc_nid2peer(rpc->crpc_dest.nid);
        if (peer == NULL) {
                srpc_abort_rpc(rpc, -ENOMEM);
                return;
        }

        srpc_queue_rpc(peer, rpc);

        spin_unlock(&rpc->crpc_lock);
        srpc_check_sends(peer, 0);
        spin_lock(&rpc->crpc_lock);
        return;
}

int
srpc_send_reply (srpc_server_rpc_t *rpc)
{
        srpc_event_t   *ev = &rpc->srpc_ev;
        srpc_msg_t     *msg = &rpc->srpc_replymsg;
        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
        srpc_service_t *sv = rpc->srpc_service;
        __u64           rpyid;
        int             rc;

        LASSERT (buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&sv->sv_lock);

        if (!sv->sv_shuttingdown &&
            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(sv, buffer) != 0)
                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&sv->sv_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* when in kernel, always called with LNET_LOCK() held and in thread context */
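/* Dispatch an LNet event to the srpc_event_t stashed in md.user_ptr,
 * then advance the owning client or server RPC state machine. */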
void
srpc_lnet_ev_handler (lnet_event_t *ev)
{
        srpc_event_t      *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t     *buffer;
        srpc_service_t    *sv;
        srpc_msg_t        *msg;
        srpc_msg_type_t    type;

        LASSERT (!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                LBUG ();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
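        /* fall through: client-side events share one handler */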
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                LASSERT (rpcev == &crpc->crpc_reqstev ||
                         rpcev == &crpc->crpc_replyev ||
                         rpcev == &crpc->crpc_bulkev);

                spin_lock(&crpc->crpc_lock);

                LASSERT (rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                sv = rpcev->ev_data;

                LASSERT (rpcev == &sv->sv_ev);

                spin_lock(&sv->sv_lock);

                LASSERT (ev->unlinked);
                LASSERT (ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT (ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                sv->sv_nposted_msg--;
                LASSERT (sv->sv_nposted_msg >= 0);

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on sv->sv_posted_msgq since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&sv->sv_lock);
                        break;
                }

                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR ("Dropping RPC (%s) from %s: "
                                "status %d mlength %d type %u magic %u.\n",
                                sv->sv_name, libcfs_id2str(ev->initiator),
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);

                        /* NB might drop sv_lock in srpc_service_recycle_buffer,
                         * sv_nposted_msg++ as an implicit reference to prevent
                         * sv from disappearing under me */
                        sv->sv_nposted_msg++;
                        srpc_service_recycle_buffer(sv, buffer);
                        sv->sv_nposted_msg--;
                        spin_unlock(&sv->sv_lock);

                        if (ev->status == 0) { /* status != 0 counted already */
                                spin_lock(&srpc_data.rpc_glock);
                                srpc_data.rpc_counters.errors++;
                                spin_unlock(&srpc_data.rpc_glock);
                        }
                        break;
                }

                if (!list_empty(&sv->sv_free_rpcq)) {
                        srpc = list_entry(sv->sv_free_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, sv, buffer);
                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
                        srpc_schedule_server_rpc(srpc);
                } else {
                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
                }

                spin_unlock(&sv->sv_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT (ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (ev->type == LNET_EVENT_SEND &&
                    ev->status == 0 && !ev->unlinked)
                        break; /* wait for the final LNET_EVENT_REPLY */

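        /* fall through: bulk byte counters are updated below */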
        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
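        /* fall through: wake the server RPC that owns this event */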
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                sv = srpc->srpc_service;

                LASSERT (rpcev == &srpc->srpc_ev);

                spin_lock(&sv->sv_lock);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                srpc_schedule_server_rpc(srpc);
                spin_unlock(&sv->sv_lock);
                break;
        }

        return;
}

#ifndef __KERNEL__

int
srpc_check_event (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                        timeout * 1000, &ev, &i);
        if (rc == 0) return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* We can't afford to miss any events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        srpc_lnet_ev_handler(&ev);
        return 1;
}

#endif

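/* Bring the module up: seed the matchbits counter from the wall clock,
 * allocate the peer hash table, then initialize LNet, the event queue,
 * the work-item scheduler, and the timer thread, in that order. */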
int
srpc_startup (void)
{
        int i;
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        cfs_pause(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;
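        /* seconds in the top 16 bits; srpc_next_id() counts through the
         * low 48 */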

        srpc_data.rpc_state = SRPC_STATE_NONE;

        LIBCFS_ALLOC(srpc_data.rpc_peers,
                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        if (srpc_data.rpc_peers == NULL) {
                CERROR ("Failed to alloc peer hash.\n");
                return -ENOMEM;
        }

        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);

#ifdef __KERNEL__
        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
        rc = LNetNIInit(getpid());
#endif
        if (rc < 0) {
                CERROR ("LNetNIInit() has failed: %d\n", rc);
                LIBCFS_FREE(srpc_data.rpc_peers,
                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
#ifdef __KERNEL__
        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
#else
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
#endif
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT (rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = swi_startup();
        if (rc != 0)
                goto bail;

        srpc_data.rpc_state = SRPC_STATE_WI_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown (void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG ();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF (sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();

        case SRPC_STATE_WI_INIT:
                swi_shutdown();

        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT (rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT (rc == 0); /* the EQ should have no user by now */

        case SRPC_STATE_NI_INIT:
                LNetNIFini();
                break;
        }

        /* srpc_peer_t's are kept in hash until shutdown */
        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
                srpc_peer_t *peer;

                while (!list_empty(&srpc_data.rpc_peers[i])) {
                        peer = list_entry(srpc_data.rpc_peers[i].next,
                                          srpc_peer_t, stp_list);
                        list_del(&peer->stp_list);

                        LASSERT (list_empty(&peer->stp_rpcq));
                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
                        LASSERT (peer->stp_credits == SRPC_PEER_CREDITS);

                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
                }
        }

        LIBCFS_FREE(srpc_data.rpc_peers,
                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        return;
}