lnet/selftest/rpc.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Isaac Huang <isaac@clusterfs.com>
 *
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <libcfs/kp30.h>
#include <libcfs/libcfs.h>
#include <lnet/lib-lnet.h>

#include "selftest.h"


typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_WI_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
#define SRPC_PEER_CREDITS         16   /* >= most LNDs' default peer credits */

struct smoketest_rpc {
        spinlock_t        rpc_glock;     /* global lock */
        srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        struct list_head *rpc_peers;     /* hash table of known peers */
        lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t      rpc_state;
        srpc_counters_t   rpc_counters;
        __u64             rpc_matchbits; /* matchbits counter */
} srpc_data;

/* forward refs */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

void
srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
{
        LASSERT (i >= 0 && i < bk->bk_niov);

#ifdef __KERNEL__
        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
#else
        LASSERT (bk->bk_pages != NULL);

        bk->bk_pages[i] = pg;
        bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
        bk->bk_iovs[i].iov_base = cfs_page_address(pg);
#endif
        return;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
        int         i;
        cfs_page_t *pg;

        LASSERT (bk != NULL);
#ifndef __KERNEL__
        LASSERT (bk->bk_pages != NULL);
#endif

        for (i = 0; i < bk->bk_niov; i++) {
#ifdef __KERNEL__
                pg = bk->bk_iovs[i].kiov_page;
#else
                pg = bk->bk_pages[i];
#endif
                if (pg == NULL) break;

                cfs_free_page(pg);
        }

#ifndef __KERNEL__
        LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
#endif
        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

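/* Allocate a bulk descriptor covering npages whole pages; sink != 0 means
 * the pages are a data sink (they receive the bulk transfer). Pages
 * allocated before a failure are released via srpc_free_bulk(). */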
srpc_bulk_t *
srpc_alloc_bulk (int npages, int sink)
{
        srpc_bulk_t  *bk;
        cfs_page_t  **pages;
        int           i;

        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);

        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
        if (bk == NULL) {
                CERROR ("Can't allocate descriptor for %d pages\n", npages);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
        bk->bk_sink = sink;
        bk->bk_niov = npages;
        bk->bk_len  = npages * CFS_PAGE_SIZE;
#ifndef __KERNEL__
        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
        if (pages == NULL) {
                LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                CERROR ("Can't allocate page array for %d pages\n", npages);
                return NULL;
        }

        memset(pages, 0, sizeof(cfs_page_t *) * npages);
        bk->bk_pages = pages;
#else
        UNUSED (pages);
#endif

        for (i = 0; i < npages; i++) {
                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);

                if (pg == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                srpc_add_bulk_page(bk, pg, i);
        }

        return bk;
}

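/* Known peers live in a simple chained hash: a NID (truncated to 32 bits)
 * is mapped to one of the SRPC_PEER_HASH_SIZE (101) list heads in
 * srpc_data.rpc_peers by taking it modulo the table size. */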
static inline struct list_head *
srpc_nid2peerlist (lnet_nid_t nid)
{
        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;

        return &srpc_data.rpc_peers[hash];
}

static inline srpc_peer_t *
srpc_create_peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;

        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
        if (peer == NULL) {
                CERROR ("Failed to allocate peer structure for %s\n",
                        libcfs_nid2str(nid));
                return NULL;
        }

        memset(peer, 0, sizeof(srpc_peer_t));
        peer->stp_nid     = nid;
        peer->stp_credits = SRPC_PEER_CREDITS;

        spin_lock_init(&peer->stp_lock);
        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
        return peer;
}

srpc_peer_t *
srpc_find_peer_locked (lnet_nid_t nid)
{
        struct list_head *peer_list = srpc_nid2peerlist(nid);
        srpc_peer_t      *peer;

        LASSERT (nid != LNET_NID_ANY);

        list_for_each_entry (peer, peer_list, stp_list) {
                if (peer->stp_nid == nid)
                        return peer;
        }

        return NULL;
}

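/* Look up the peer for a NID, creating it on first use. The new peer is
 * allocated outside rpc_glock, then the table is rechecked under the
 * lock: if another thread raced us and inserted the peer first, the
 * spare allocation is freed and the existing peer returned. */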
static srpc_peer_t *
srpc_nid2peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;
        srpc_peer_t *new_peer;

        spin_lock(&srpc_data.rpc_glock);
        peer = srpc_find_peer_locked(nid);
        spin_unlock(&srpc_data.rpc_glock);

        if (peer != NULL)
                return peer;

        new_peer = srpc_create_peer(nid);

        spin_lock(&srpc_data.rpc_glock);

        peer = srpc_find_peer_locked(nid);
        if (peer != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                if (new_peer != NULL)
                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));

                return peer;
        }

        if (new_peer == NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return NULL;
        }

        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
        spin_unlock(&srpc_data.rpc_glock);
        return new_peer;
}

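/* Match bits come from a single global counter, seeded at startup from
 * the wall clock (seconds << 48, see srpc_startup) so that values are
 * unlikely to repeat across restarts. */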
static inline __u64
srpc_next_id (void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

void
srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                      srpc_service_t *sv, srpc_buffer_t *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_service  = sv;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
}

int
srpc_add_service (srpc_service_t *sv)
{
        int                id = sv->sv_id;
        int                i;
        srpc_server_rpc_t *rpc;

        LASSERT (sv->sv_concur > 0);
        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);

        spin_lock(&srpc_data.rpc_glock);

        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return -EBUSY;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        sv->sv_nprune       = 0;
        sv->sv_nposted_msg  = 0;
        sv->sv_shuttingdown = 0;
        spin_lock_init(&sv->sv_lock);
        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);

        sv->sv_ev.ev_data = sv;
        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;

        for (i = 0; i < sv->sv_concur; i++) {
                LIBCFS_ALLOC(rpc, sizeof(*rpc));
                if (rpc == NULL) goto enomem;

                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
                id, sv->sv_name, sv->sv_concur);
        return 0;

enomem:
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return -ENOMEM;
}

int
srpc_remove_service (srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

int
srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int              rc;
        lnet_md_t        md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0,
                          LNET_UNLINK, LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR ("LNetMEAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT (rc == 0);
                return -ENOMEM;
        }

        CDEBUG (D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}

int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int       rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDBind failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        /* This is a mild abuse of the LNET_MD_OP_{PUT,GET} options: they're
         * only meaningful for MDs attached to an ME (i.e. passive buffers),
         * so here they merely select which operation to initiate. */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT ((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT (rc == 0);
        } else {
                CDEBUG (D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_active_rdma(portal, service, buf, len,
                                   LNET_MD_OP_PUT, peer,
                                   LNET_NID_ANY, mdh, ev);
        return rc;
}

int
srpc_post_passive_rqtbuf(int service, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int               rc;
        int               portal;
        lnet_process_id_t any = {.nid = LNET_NID_ANY,
                                 .pid = LNET_PID_ANY};

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_passive_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, any, mdh, ev);
        return rc;
}

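/* Post a request buffer on service sv. Called with sv->sv_lock held; the
 * lock is dropped around the LNet call and retaken before returning. The
 * buffer must be on sv_posted_msgq (and counted in sv_nposted_msg) before
 * the unlock, because a request may arrive and fire the event handler at
 * any moment once the buffer is posted. */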
int
srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        srpc_msg_t *msg = &buf->buf_msg;
        int         rc;

        LASSERT (!sv->sv_shuttingdown);

        buf->buf_mdh = LNET_INVALID_HANDLE;
        list_add(&buf->buf_list, &sv->sv_posted_msgq);
        sv->sv_nposted_msg++;
        spin_unlock(&sv->sv_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                      &buf->buf_mdh, &sv->sv_ev);

        /* At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler may have been called. That is why
         * buf was added to sv_posted_msgq _before_ dropping sv_lock. */

        spin_lock(&sv->sv_lock);

        if (rc == 0) {
                if (sv->sv_shuttingdown) {
                        spin_unlock(&sv->sv_lock);

                        /* srpc_shutdown_service might have tried to unlink me
                         * when my buf_mdh was still invalid */
                        LNetMDUnlink(buf->buf_mdh);

                        spin_lock(&sv->sv_lock);
                }
                return 0;
        }

        sv->sv_nposted_msg--;
        if (sv->sv_shuttingdown) return rc;

        list_del(&buf->buf_list);

        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
        return rc;
}

int
srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
{
        int                rc;
        int                posted;
        srpc_buffer_t     *buf;

        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        for (posted = 0; posted < nbuffer; posted++) {
                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL) break;

                spin_lock(&sv->sv_lock);
                rc = srpc_service_post_buffer(sv, buf);
                spin_unlock(&sv->sv_lock);

                if (rc != 0) break;
        }

        return posted;
}

void
srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
{
        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_nprune >= 0);
        LASSERT (!sv->sv_shuttingdown);

        sv->sv_nprune += nbuffer;

        spin_unlock(&sv->sv_lock);
        return;
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */

        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
                CDEBUG (D_NET,
                        "waiting for %d posted buffers to unlink and "
                        "in-flight RPCs to die.\n",
                        sv->sv_nposted_msg);

                if (!list_empty(&sv->sv_active_rpcq)) {
                        rpc = list_entry(sv->sv_active_rpcq.next,
                                         srpc_server_rpc_t, srpc_list);
                        CDEBUG (D_NETERROR,
                                "Active RPC on shutdown: sv %s, peer %s, "
                                "wi %s scheduled %d running %d, "
                                "ev fired %d type %d status %d lnet %d\n",
                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                swi_state2str(rpc->srpc_wi.wi_state),
                                rpc->srpc_wi.wi_scheduled,
                                rpc->srpc_wi.wi_running,
                                rpc->srpc_ev.ev_fired,
                                rpc->srpc_ev.ev_type,
                                rpc->srpc_ev.ev_status,
                                rpc->srpc_ev.ev_lnet);
                }

                spin_unlock(&sv->sv_lock);
                return 0;
        }

        spin_unlock(&sv->sv_lock); /* no lock needed from now on */

        for (;;) {
                struct list_head *q;

                if (!list_empty(&sv->sv_posted_msgq))
                        q = &sv->sv_posted_msgq;
                else if (!list_empty(&sv->sv_blocked_msgq))
                        q = &sv->sv_blocked_msgq;
                else
                        break;

                buf = list_entry(q->next, srpc_buffer_t, buf_list);
                list_del(&buf->buf_list);

                LIBCFS_FREE(buf, sizeof(*buf));
        }

        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        return 1;
}

/* called with sv->sv_lock held */
void
srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        if (sv->sv_shuttingdown)
                goto free;

        if (sv->sv_nprune == 0) {
                if (srpc_service_post_buffer(sv, buf) != 0)
                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
                return;
        }

        sv->sv_nprune--;
free:
        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
}

void
srpc_shutdown_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
                sv->sv_id, sv->sv_name);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        /* schedule in-flight RPCs to notice the shutdown */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                swi_schedule_workitem(&rpc->srpc_wi);
        }

        spin_unlock(&sv->sv_lock);

        /* OK to traverse sv_posted_msgq without lock, since no one
         * touches sv_posted_msgq now */
        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
                LNetMDUnlink(buf->buf_mdh);

        return;
}

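/* A client RPC posts up to three MDs before its request goes out: a
 * passive reply buffer (srpc_prepare_reply), an optional passive bulk
 * buffer (srpc_prepare_bulk), and the active request itself
 * (srpc_send_request). The reply and bulk match bits come from
 * srpc_next_id() and travel inside the request message, which is how the
 * server reaches back into the client's buffers. */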
int
srpc_send_request (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
        srpc_bulk_t  *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0) return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
        srpc_event_t  *ev = &rpc->srpc_ev;
        srpc_bulk_t   *bk = rpc->srpc_bulk;
        __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int            rc;
        int            opt;

        LASSERT (bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* called with srpc_service_t::sv_lock held */
inline void
srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
{
        srpc_service_t *sv = rpc->srpc_service;

        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                swi_schedule_workitem(&rpc->srpc_wi);
        else    /* framework RPCs are handled one by one */
                swi_schedule_serial_workitem(&rpc->srpc_wi);

        return;
}

/* only called from srpc_handle_rpc */
void
srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
{
        srpc_service_t *sv = rpc->srpc_service;
        srpc_buffer_t  *buffer;

        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG (status == 0 ? D_NET : D_NETERROR,
                "Server RPC done: service %s, peer %s, status %s:%d\n",
                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                swi_state2str(rpc->srpc_wi.wi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done) (rpc);
        LASSERT (rpc->srpc_bulk == NULL);

        spin_lock(&sv->sv_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB might drop sv_lock in srpc_service_recycle_buffer, but
                 * sv won't go away because sv_active_rpcq is not empty */
                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */

        /*
         * No one can schedule me now since:
         * - I'm not on sv_active_rpcq.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (rpc->srpc_ev.ev_fired);
        swi_kill_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
                buffer = list_entry(sv->sv_blocked_msgq.next,
                                    srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, sv, buffer);
                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                srpc_schedule_server_rpc(rpc);
        } else {
                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        spin_unlock(&sv->sv_lock);
        return;
}

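/* Server-side RPC state machine, driven by srpc_handle_rpc:
 *
 *   SWI_STATE_NEWBORN          request received, service handler run
 *   SWI_STATE_BULK_STARTED     optional bulk transfer posted/completed
 *   SWI_STATE_REPLY_SUBMITTED  reply PUT posted
 *   SWI_STATE_DONE             reply sent, RPC recycled
 *
 * Each case falls through to the next when its event has already fired. */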
/* handles an incoming RPC */
int
srpc_handle_rpc (swi_workitem_t *wi)
{
        srpc_server_rpc_t *rpc = wi->wi_data;
        srpc_service_t    *sv = rpc->srpc_service;
        srpc_event_t      *ev = &rpc->srpc_ev;
        int                rc = 0;

        LASSERT (wi == &rpc->srpc_wi);

        spin_lock(&sv->sv_lock);

        if (sv->sv_shuttingdown) {
                spin_unlock(&sv->sv_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&sv->sv_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t           *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_version != SRPC_MSG_VERSION &&
                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                        CWARN ("Version mismatch: %u, %u expected, from %s\n",
                               msg->msg_version, SRPC_MSG_VERSION,
                               libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler) (rpc);
                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);
                }

                if (rc != 0) {
                        srpc_server_rpc_done(rpc, rc);
                        return 1;
                }

                wi->wi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT (ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
        /* fall through: no bulk, or bulk failed to post */
        case SWI_STATE_BULK_STARTED:
                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                LASSERT (ev->ev_fired);

                wi->wi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

void
srpc_client_rpc_expired (void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
        return;
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0) return;

        CFS_INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          cfs_time_current_sec());
        stt_add_timer(timer);
        return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU. */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0) return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer)) return;

#ifdef __KERNEL__
        /* timer already fired: wait for the handler to finish (it resets
         * crpc_timeout to 0), dropping crpc_lock so that it can run */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                cfs_schedule();

                spin_lock(&rpc->crpc_lock);
        }
#else
        LBUG(); /* impossible in single-threaded runtime */
#endif
        return;
}

void
srpc_check_sends (srpc_peer_t *peer, int credits)
{
        struct list_head  *q;
        srpc_client_rpc_t *rpc;

        LASSERT (credits >= 0);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        spin_lock(&peer->stp_lock);
        peer->stp_credits += credits;

        while (peer->stp_credits) {
                if (!list_empty(&peer->stp_ctl_rpcq))
                        q = &peer->stp_ctl_rpcq;
                else if (!list_empty(&peer->stp_rpcq))
                        q = &peer->stp_rpcq;
                else
                        break;

                peer->stp_credits--;

                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
                list_del_init(&rpc->crpc_privl);
                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */

                swi_schedule_workitem(&rpc->crpc_wi);
        }

        spin_unlock(&peer->stp_lock);
        return;
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;
        srpc_peer_t    *peer = rpc->crpc_peer;

        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG ((status == 0) ? D_NET : D_NETERROR,
                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (!srpc_event_pending(rpc));
        swi_kill_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done) (rpc);

        if (peer != NULL)
                srpc_check_sends(peer, 1);
        return;
}

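/* Client-side RPC state machine, driven by srpc_send_rpc:
 *
 *   SWI_STATE_NEWBORN            reply/bulk buffers posted, request sent
 *   SWI_STATE_REQUEST_SUBMITTED  waiting for the request send event
 *   SWI_STATE_REQUEST_SENT       waiting for the reply
 *   SWI_STATE_REPLY_RECEIVED     waiting for optional bulk completion
 *   SWI_STATE_DONE               all events fired, completion callback run
 *
 * Events may arrive out of order; they are processed strictly in request,
 * reply, bulk order. */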
/* sends an outgoing RPC */
int
srpc_send_rpc (swi_workitem_t *wi)
{
        int                rc = 0;
        srpc_client_rpc_t *rpc = wi->wi_data;
        srpc_msg_t        *reply = &rpc->crpc_replymsg;
        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;

        LASSERT (rpc != NULL);
        LASSERT (wi == &rpc->crpc_wi);

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN:
                LASSERT (!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired) break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired) break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0) break;

                if ((reply->msg_type != type &&
                     reply->msg_type != __swab32(type)) ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN ("Bad message from %s: type %u (%d expected),"
                               " magic %u (%d expected).\n",
                               libcfs_id2str(rpc->crpc_dest),
                               reply->msg_type, type,
                               reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN ("Remote error %d at %s, unlink bulk buffer in "
                               "case peer didn't initiate bulk transfer\n",
                               reply->msg_body.reply.status,
                               libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
        }
        /* perhaps more events, fall thru */
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->wi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}

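/* A minimal usage sketch for callers of this API (my_done and
 * MY_SERVICE_ID are hypothetical names; error handling elided):
 *
 *      srpc_client_rpc_t *rpc;
 *
 *      rpc = srpc_create_client_rpc(peer, MY_SERVICE_ID,
 *                                   0, 0,            // no bulk pages
 *                                   my_done, NULL, NULL);
 *      if (rpc != NULL) {
 *              spin_lock(&rpc->crpc_lock);
 *              srpc_post_rpc(rpc);   // requires crpc_lock held
 *              spin_unlock(&rpc->crpc_lock);
 *      }
 *
 * my_done() runs once, when the RPC completes or is aborted. */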
/* called with rpc->crpc_lock held */
static inline void
srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
{
        int service = rpc->crpc_service;

        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        rpc->crpc_peer = peer;

        spin_lock(&peer->stp_lock);

        /* Framework RPCs that alter session state shall take precedence
         * over test RPCs and framework query RPCs */
        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
            service != SRPC_SERVICE_DEBUG &&
            service != SRPC_SERVICE_QUERY_STAT)
                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
        else
                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);

        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
        spin_unlock(&peer->stp_lock);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
        srpc_peer_t *peer = rpc->crpc_peer;

        LASSERT (why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG (D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.wi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status  = why;

        if (peer != NULL) {
                spin_lock(&peer->stp_lock);

                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
                        list_del_init(&rpc->crpc_privl);
                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
                        rpc->crpc_peer = NULL;       /* no credit taken */
                }

                spin_unlock(&peer->stp_lock);
        }

        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
        srpc_peer_t *peer;

        LASSERT (!rpc->crpc_aborted);
        LASSERT (rpc->crpc_peer == NULL);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);

        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);

        peer = srpc_nid2peer(rpc->crpc_dest.nid);
        if (peer == NULL) {
                srpc_abort_rpc(rpc, -ENOMEM);
                return;
        }

        srpc_queue_rpc(peer, rpc);

        spin_unlock(&rpc->crpc_lock);
        srpc_check_sends(peer, 0);
        spin_lock(&rpc->crpc_lock);
        return;
}

int
srpc_send_reply (srpc_server_rpc_t *rpc)
{
        srpc_event_t   *ev = &rpc->srpc_ev;
        srpc_msg_t     *msg = &rpc->srpc_replymsg;
        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
        srpc_service_t *sv = rpc->srpc_service;
        __u64           rpyid;
        int             rc;

        LASSERT (buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&sv->sv_lock);

        if (!sv->sv_shuttingdown &&
            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(sv, buffer) != 0)
                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&sv->sv_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

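/* Demultiplex a raw LNet event back to the srpc_event_t stored in the
 * MD's user_ptr, then wake the owning client RPC, server RPC, or service.
 * Note the deliberate fall-throughs below: SRPC_REQUEST_SENT shares the
 * client path with SRPC_REPLY_RCVD/SRPC_BULK_REQ_RCVD, and the bulk cases
 * share the server path with SRPC_REPLY_SENT. */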
/* always called with LNET_LOCK() held, and in thread context */
void
srpc_lnet_ev_handler (lnet_event_t *ev)
{
        srpc_event_t      *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t     *buffer;
        srpc_service_t    *sv;
        srpc_msg_t        *msg;
        srpc_msg_type_t    type;

        LASSERT (!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                LBUG ();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fall through */
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                LASSERT (rpcev == &crpc->crpc_reqstev ||
                         rpcev == &crpc->crpc_replyev ||
                         rpcev == &crpc->crpc_bulkev);

                spin_lock(&crpc->crpc_lock);

                LASSERT (rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                sv = rpcev->ev_data;

                LASSERT (rpcev == &sv->sv_ev);

                spin_lock(&sv->sv_lock);

                LASSERT (ev->unlinked);
                LASSERT (ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT (ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                sv->sv_nposted_msg--;
                LASSERT (sv->sv_nposted_msg >= 0);

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on sv->sv_posted_msgq since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&sv->sv_lock);
                        break;
                }

                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR ("Dropping RPC (%s) from %s: "
                                "status %d mlength %d type %u magic %u.\n",
                                sv->sv_name, libcfs_id2str(ev->initiator),
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);

                        /* NB might drop sv_lock in srpc_service_recycle_buffer,
                         * sv_nposted_msg++ as an implicit reference to prevent
                         * sv from disappearing under me */
                        sv->sv_nposted_msg++;
                        srpc_service_recycle_buffer(sv, buffer);
                        sv->sv_nposted_msg--;
                        spin_unlock(&sv->sv_lock);

                        if (ev->status == 0) { /* status!=0 counted already */
                                spin_lock(&srpc_data.rpc_glock);
                                srpc_data.rpc_counters.errors++;
                                spin_unlock(&srpc_data.rpc_glock);
                        }
                        break;
                }

                if (!list_empty(&sv->sv_free_rpcq)) {
                        srpc = list_entry(sv->sv_free_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, sv, buffer);
                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
                        srpc_schedule_server_rpc(srpc);
                } else {
                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
                }

                spin_unlock(&sv->sv_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT (ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (ev->type == LNET_EVENT_SEND &&
                    ev->status == 0 && !ev->unlinked)
                        break; /* wait for the final LNET_EVENT_REPLY */
                /* fall through */
        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fall through */
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                sv = srpc->srpc_service;

                LASSERT (rpcev == &srpc->srpc_ev);

                spin_lock(&sv->sv_lock);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                srpc_schedule_server_rpc(srpc);
                spin_unlock(&sv->sv_lock);
                break;
        }

        return;
}

#ifndef __KERNEL__

int
srpc_check_event (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                        timeout * 1000, &ev, &i);
        if (rc == 0)
                return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* We can't afford to miss any events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        srpc_lnet_ev_handler(&ev);
        return 1;
}

#endif

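/* Module lifecycle. srpc_startup walks the states
 * NONE -> NI_INIT -> EQ_INIT -> WI_INIT -> RUNNING, and srpc_shutdown
 * unwinds from whatever state was reached by falling through the switch
 * below, so a partially failed startup is torn down correctly. */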
int
srpc_startup (void)
{
        int i;
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        cfs_pause(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        LIBCFS_ALLOC(srpc_data.rpc_peers,
                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        if (srpc_data.rpc_peers == NULL) {
                CERROR ("Failed to alloc peer hash.\n");
                return -ENOMEM;
        }

        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);

#ifdef __KERNEL__
        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
        rc = LNetNIInit(getpid());
#endif
        if (rc < 0) {
                CERROR ("LNetNIInit() has failed: %d\n", rc);
                LIBCFS_FREE(srpc_data.rpc_peers,
                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
#ifdef __KERNEL__
        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
#else
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
#endif
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT (rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = swi_startup();
        if (rc != 0)
                goto bail;

        srpc_data.rpc_state = SRPC_STATE_WI_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown (void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG ();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF (sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();
                /* fall through */
        case SRPC_STATE_WI_INIT:
                swi_shutdown();
                /* fall through */
        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT (rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT (rc == 0); /* the EQ should have no user by now */
                /* fall through */
        case SRPC_STATE_NI_INIT:
                LNetNIFini();
                break;
        }

        /* srpc_peer_t's are kept in hash until shutdown */
        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
                srpc_peer_t *peer;

                while (!list_empty(&srpc_data.rpc_peers[i])) {
                        peer = list_entry(srpc_data.rpc_peers[i].next,
                                          srpc_peer_t, stp_list);
                        list_del(&peer->stp_list);

                        LASSERT (list_empty(&peer->stp_rpcq));
                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
                        LASSERT (peer->stp_credits == SRPC_PEER_CREDITS);

                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
                }
        }

        LIBCFS_FREE(srpc_data.rpc_peers,
                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        return;
}