/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * lnet/selftest/rpc.c
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Isaac Huang <isaac@clusterfs.com>
 *
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"


#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */

typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_WI_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

struct smoketest_rpc {
        spinlock_t        rpc_glock;     /* global lock */
        srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        struct list_head *rpc_peers;     /* hash table of known peers */
        lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t      rpc_state;
        srpc_counters_t   rpc_counters;
        __u64             rpc_matchbits; /* matchbits counter */
} srpc_data;

static int srpc_peer_credits = 16;
CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
                "# in-flight RPCs per peer (16 by default)");

/* forward references */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

void
srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
{
        LASSERT (i >= 0 && i < bk->bk_niov);

#ifdef __KERNEL__
        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
#else
        LASSERT (bk->bk_pages != NULL);

        bk->bk_pages[i] = pg;
        bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
        bk->bk_iovs[i].iov_base = cfs_page_address(pg);
#endif
        return;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
        int         i;
        cfs_page_t *pg;

        LASSERT (bk != NULL);
#ifndef __KERNEL__
        LASSERT (bk->bk_pages != NULL);
#endif

        for (i = 0; i < bk->bk_niov; i++) {
#ifdef __KERNEL__
                pg = bk->bk_iovs[i].kiov_page;
#else
                pg = bk->bk_pages[i];
#endif
                if (pg == NULL) break;

                cfs_free_page(pg);
        }

#ifndef __KERNEL__
        LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
#endif
        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

srpc_bulk_t *
srpc_alloc_bulk (int npages, int sink)
{
        srpc_bulk_t  *bk;
        cfs_page_t  **pages;
        int           i;

        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);

        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
        if (bk == NULL) {
                CERROR ("Can't allocate descriptor for %d pages\n", npages);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
        bk->bk_sink = sink;
        bk->bk_niov = npages;
        bk->bk_len  = npages * CFS_PAGE_SIZE;
#ifndef __KERNEL__
        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
        if (pages == NULL) {
                LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                CERROR ("Can't allocate page array for %d pages\n", npages);
                return NULL;
        }

        memset(pages, 0, sizeof(cfs_page_t *) * npages);
        bk->bk_pages = pages;
#else
        UNUSED (pages);
#endif

        for (i = 0; i < npages; i++) {
                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);

                if (pg == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                srpc_add_bulk_page(bk, pg, i);
        }

        return bk;
}
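
/*
 * A minimal usage sketch (illustrative only, not called anywhere in
 * this file): a caller that needs an n-page data sink would pair the
 * two functions above like
 *
 *         srpc_bulk_t *bk = srpc_alloc_bulk(n, 1);
 *         if (bk == NULL)
 *                 return -ENOMEM;
 *         ... describe bk->bk_iovs[] in an RDMA request ...
 *         srpc_free_bulk(bk);
 *
 * Note srpc_free_bulk() tolerates a partially populated descriptor
 * (it stops at the first NULL page), which is why srpc_alloc_bulk()
 * may safely call it on its own failure path.
 */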
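/* Hash a NID to one of the SRPC_PEER_HASH_SIZE peer lists. The table
 * size is a prime (101), so reducing the NID modulo the size spreads
 * peers across buckets even when NIDs share a common stride. */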
static inline struct list_head *
srpc_nid2peerlist (lnet_nid_t nid)
{
        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;

        return &srpc_data.rpc_peers[hash];
}

static inline srpc_peer_t *
srpc_create_peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;

        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
        if (peer == NULL) {
                CERROR ("Failed to allocate peer structure for %s\n",
                        libcfs_nid2str(nid));
                return NULL;
        }

        memset(peer, 0, sizeof(srpc_peer_t));
        peer->stp_nid     = nid;
        peer->stp_credits = srpc_peer_credits;

        spin_lock_init(&peer->stp_lock);
        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
        return peer;
}

srpc_peer_t *
srpc_find_peer_locked (lnet_nid_t nid)
{
        struct list_head *peer_list = srpc_nid2peerlist(nid);
        srpc_peer_t      *peer;

        LASSERT (nid != LNET_NID_ANY);

        list_for_each_entry (peer, peer_list, stp_list) {
                if (peer->stp_nid == nid)
                        return peer;
        }

        return NULL;
}

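/* Look up the peer for a NID, creating it on first use. Note the
 * optimistic pattern: search under rpc_glock, allocate outside the
 * lock if nothing was found, then re-check under the lock and discard
 * the new peer if another thread won the race. Peers are never freed
 * until srpc_shutdown(), so returned pointers stay valid. */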
static srpc_peer_t *
srpc_nid2peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;
        srpc_peer_t *new_peer;

        spin_lock(&srpc_data.rpc_glock);
        peer = srpc_find_peer_locked(nid);
        spin_unlock(&srpc_data.rpc_glock);

        if (peer != NULL)
                return peer;

        new_peer = srpc_create_peer(nid);

        spin_lock(&srpc_data.rpc_glock);

        peer = srpc_find_peer_locked(nid);
        if (peer != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                if (new_peer != NULL)
                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));

                return peer;
        }

        if (new_peer == NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return NULL;
        }

        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
        spin_unlock(&srpc_data.rpc_glock);
        return new_peer;
}

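/* Return the next RPC matchbits value. The counter is seeded in
 * srpc_startup() with the current time shifted into the high bits
 * (after a one-second pause), so values stay unique across quick
 * restarts of the selftest module. */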
static inline __u64
srpc_next_id (void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

void
srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                      srpc_service_t *sv, srpc_buffer_t *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_service  = sv;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
}

int
srpc_add_service (srpc_service_t *sv)
{
        int                id = sv->sv_id;
        int                i;
        srpc_server_rpc_t *rpc;

        LASSERT (sv->sv_concur > 0);
        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);

        spin_lock(&srpc_data.rpc_glock);

        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return -EBUSY;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        sv->sv_nprune       = 0;
        sv->sv_nposted_msg  = 0;
        sv->sv_shuttingdown = 0;
        spin_lock_init(&sv->sv_lock);
        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);

        sv->sv_ev.ev_data = sv;
        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;

        for (i = 0; i < sv->sv_concur; i++) {
                LIBCFS_ALLOC(rpc, sizeof(*rpc));
                if (rpc == NULL) goto enomem;

                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
                id, sv->sv_name, sv->sv_concur);
        return 0;

enomem:
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return -ENOMEM;
}

int
srpc_remove_service (srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

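/* Post a passive buffer: attach an ME for (peer, matchbits) on the
 * given portal, then attach an MD over buf. Both use LNET_UNLINK, and
 * md.threshold is 1, so they auto-unlink once the single expected
 * operation completes; on MD attach failure the ME is unlinked again
 * to avoid leaking it. */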
int
srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int              rc;
        lnet_md_t        md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0,
                          LNET_UNLINK, LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR ("LNetMEAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT (rc == 0);
                return -ENOMEM;
        }

        CDEBUG (D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}

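/* Post an active RDMA: bind a free-floating MD over buf and initiate
 * a PUT or GET to the peer. A GET needs threshold 2 because it fires
 * two events (SEND and REPLY) on the MD, while a PUT fires one. Note
 * that a failed LNetPut/LNetGet still returns 0 here: the MD is
 * unlinked and the unlink event completes the operation with failure. */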
int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int       rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDBind failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        /* This is kind of an abuse of the LNET_MD_OP_{PUT,GET} options:
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers). */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT ((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT (rc == 0);
        } else {
                CDEBUG (D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_active_rdma(portal, service, buf, len,
                                   LNET_MD_OP_PUT, peer,
                                   LNET_NID_ANY, mdh, ev);
        return rc;
}

int
srpc_post_passive_rqtbuf(int service, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int               rc;
        int               portal;
        lnet_process_id_t any = {.nid = LNET_NID_ANY,
                                 .pid = LNET_PID_ANY};

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_passive_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, any, mdh, ev);
        return rc;
}

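/* Post one request buffer on a service. Called with sv->sv_lock held;
 * the lock is dropped around the LNet calls and re-acquired before
 * returning, so callers must not assume service state is unchanged
 * across this call. */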
int
srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        srpc_msg_t *msg = &buf->buf_msg;
        int         rc;

        LASSERT (!sv->sv_shuttingdown);

        buf->buf_mdh = LNET_INVALID_HANDLE;
        list_add(&buf->buf_list, &sv->sv_posted_msgq);
        sv->sv_nposted_msg++;
        spin_unlock(&sv->sv_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                      &buf->buf_mdh, &sv->sv_ev);

        /* At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler has been called. So we must add
         * buf to sv_posted_msgq _before_ dropping sv_lock */

        spin_lock(&sv->sv_lock);

        if (rc == 0) {
                if (sv->sv_shuttingdown) {
                        spin_unlock(&sv->sv_lock);

                        /* srpc_shutdown_service might have tried to unlink me
                         * when my buf_mdh was still invalid */
                        LNetMDUnlink(buf->buf_mdh);

                        spin_lock(&sv->sv_lock);
                }
                return 0;
        }

        sv->sv_nposted_msg--;
        if (sv->sv_shuttingdown) return rc;

        list_del(&buf->buf_list);

        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
        return rc;
}

int
srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
{
        int                rc;
        int                posted;
        srpc_buffer_t     *buf;

        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        for (posted = 0; posted < nbuffer; posted++) {
                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL) break;

                spin_lock(&sv->sv_lock);
                rc = srpc_service_post_buffer(sv, buf);
                spin_unlock(&sv->sv_lock);

                if (rc != 0) break;
        }

        return posted;
}

void
srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
{
        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_nprune >= 0);
        LASSERT (!sv->sv_shuttingdown);

        sv->sv_nprune += nbuffer;

        spin_unlock(&sv->sv_lock);
        return;
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */

        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
                CDEBUG (D_NET,
                        "waiting for %d posted buffers to unlink and "
                        "in-flight RPCs to die.\n",
                        sv->sv_nposted_msg);

                if (!list_empty(&sv->sv_active_rpcq)) {
                        rpc = list_entry(sv->sv_active_rpcq.next,
                                         srpc_server_rpc_t, srpc_list);
                        CDEBUG (D_NETERROR,
                                "Active RPC on shutdown: sv %s, peer %s, "
                                "wi %s scheduled %d running %d, "
                                "ev fired %d type %d status %d lnet %d\n",
                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                swi_state2str(rpc->srpc_wi.wi_state),
                                rpc->srpc_wi.wi_scheduled,
                                rpc->srpc_wi.wi_running,
                                rpc->srpc_ev.ev_fired,
                                rpc->srpc_ev.ev_type,
                                rpc->srpc_ev.ev_status,
                                rpc->srpc_ev.ev_lnet);
                }

                spin_unlock(&sv->sv_lock);
                return 0;
        }

        spin_unlock(&sv->sv_lock); /* no lock needed from now on */

        for (;;) {
                struct list_head *q;

                if (!list_empty(&sv->sv_posted_msgq))
                        q = &sv->sv_posted_msgq;
                else if (!list_empty(&sv->sv_blocked_msgq))
                        q = &sv->sv_blocked_msgq;
                else
                        break;

                buf = list_entry(q->next, srpc_buffer_t, buf_list);
                list_del(&buf->buf_list);

                LIBCFS_FREE(buf, sizeof(*buf));
        }

        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        return 1;
}

/* called with sv->sv_lock held */
void
srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        if (sv->sv_shuttingdown) goto free;

        if (sv->sv_nprune == 0) {
                if (srpc_service_post_buffer(sv, buf) != 0)
                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
                return;
        }

        sv->sv_nprune--;
free:
        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
}

void
srpc_shutdown_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
                sv->sv_id, sv->sv_name);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        /* schedule in-flight RPCs to notice the shutdown */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                swi_schedule_workitem(&rpc->srpc_wi);
        }

        spin_unlock(&sv->sv_lock);

        /* OK to traverse sv_posted_msgq without lock, since no one
         * touches sv_posted_msgq now */
        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
                LNetMDUnlink(buf->buf_mdh);

        return;
}

int
srpc_send_request (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
        srpc_bulk_t  *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0) return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
        srpc_event_t  *ev = &rpc->srpc_ev;
        srpc_bulk_t   *bk = rpc->srpc_bulk;
        __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int            rc;
        int            opt;

        LASSERT (bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* called with srpc_service_t::sv_lock held */
inline void
srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
{
        srpc_service_t *sv = rpc->srpc_service;

        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                swi_schedule_workitem(&rpc->srpc_wi);
        else    /* framework RPCs are handled one by one */
                swi_schedule_serial_workitem(&rpc->srpc_wi);

        return;
}

/* only called from srpc_handle_rpc */
void
srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
{
        srpc_service_t *sv = rpc->srpc_service;
        srpc_buffer_t  *buffer;

        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG (status == 0 ? D_NET : D_NETERROR,
                "Server RPC done: service %s, peer %s, status %s:%d\n",
                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                swi_state2str(rpc->srpc_wi.wi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done) (rpc);
        LASSERT (rpc->srpc_bulk == NULL);

        spin_lock(&sv->sv_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB might drop sv_lock in srpc_service_recycle_buffer, but
                 * sv won't go away since sv_active_rpcq is not empty */
                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */

        /*
         * No one can schedule me now since:
         * - I'm not on sv_active_rpcq.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (rpc->srpc_ev.ev_fired);
        swi_kill_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
                buffer = list_entry(sv->sv_blocked_msgq.next,
                                    srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, sv, buffer);
                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                srpc_schedule_server_rpc(rpc);
        } else {
                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        spin_unlock(&sv->sv_lock);
        return;
}

/* handles an incoming RPC */
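/* The server-side state machine: SWI_STATE_NEWBORN (run the service
 * handler, start bulk transfer if the handler attached one) ->
 * SWI_STATE_BULK_STARTED (bulk finished or absent, submit the reply)
 * -> SWI_STATE_REPLY_SUBMITTED -> SWI_STATE_DONE. The switch below
 * deliberately falls through from one case to the next whenever a
 * state can complete without waiting for another event. */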
int
srpc_handle_rpc (swi_workitem_t *wi)
{
        srpc_server_rpc_t *rpc = wi->wi_data;
        srpc_service_t    *sv = rpc->srpc_service;
        srpc_event_t      *ev = &rpc->srpc_ev;
        int                rc = 0;

        LASSERT (wi == &rpc->srpc_wi);

        spin_lock(&sv->sv_lock);

        if (sv->sv_shuttingdown) {
                spin_unlock(&sv->sv_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&sv->sv_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t           *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_version != SRPC_MSG_VERSION &&
                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                        CWARN ("Version mismatch: %u, %u expected, from %s\n",
                               msg->msg_version, SRPC_MSG_VERSION,
                               libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler) (rpc);
                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);
                }

                if (rc != 0) {
                        srpc_server_rpc_done(rpc, rc);
                        return 1;
                }

                wi->wi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT (ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
        case SWI_STATE_BULK_STARTED:
                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                LASSERT (ev->ev_fired);

                wi->wi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

void
srpc_client_rpc_expired (void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
        return;
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0) return;

        CFS_INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          cfs_time_current_sec());
        stt_add_timer(timer);
        return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU. */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0) return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer)) return;

#ifdef __KERNEL__
        /* timer detonated, wait for it to explode */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                cfs_schedule();

                spin_lock(&rpc->crpc_lock);
        }
#else
        LBUG(); /* impossible in single-threaded runtime */
#endif
        return;
}

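/* Return 'credits' to a peer and launch queued RPCs while credits
 * last. Control RPCs on stp_ctl_rpcq are always drained before test
 * RPCs on stp_rpcq, so session-altering framework traffic cannot be
 * starved by bulk test load. */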
void
srpc_check_sends (srpc_peer_t *peer, int credits)
{
        struct list_head  *q;
        srpc_client_rpc_t *rpc;

        LASSERT (credits >= 0);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        spin_lock(&peer->stp_lock);
        peer->stp_credits += credits;

        while (peer->stp_credits) {
                if (!list_empty(&peer->stp_ctl_rpcq))
                        q = &peer->stp_ctl_rpcq;
                else if (!list_empty(&peer->stp_rpcq))
                        q = &peer->stp_rpcq;
                else
                        break;

                peer->stp_credits--;

                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
                list_del_init(&rpc->crpc_privl);
                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */

                swi_schedule_workitem(&rpc->crpc_wi);
        }

        spin_unlock(&peer->stp_lock);
        return;
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;
        srpc_peer_t    *peer = rpc->crpc_peer;

        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG ((status == 0) ? D_NET : D_NETERROR,
                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (!srpc_event_pending(rpc));
        swi_kill_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done) (rpc);

        if (peer != NULL)
                srpc_check_sends(peer, 1);
        return;
}

/* sends an outgoing RPC */
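/* The client-side state machine: SWI_STATE_NEWBORN (post reply and
 * bulk buffers, send the request) -> SWI_STATE_REQUEST_SUBMITTED ->
 * SWI_STATE_REQUEST_SENT -> SWI_STATE_REPLY_RECEIVED ->
 * SWI_STATE_DONE. Events may arrive in any order, so each state only
 * advances once its own event has fired; on any error the RPC is
 * aborted and all of its MDs unlinked. */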
int
srpc_send_rpc (swi_workitem_t *wi)
{
        int                rc = 0;
        srpc_client_rpc_t *rpc = wi->wi_data;
        srpc_msg_t        *reply = &rpc->crpc_replymsg;
        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;

        LASSERT (rpc != NULL);
        LASSERT (wi == &rpc->crpc_wi);

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN:
                LASSERT (!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired) break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events; fall through */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired) break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0) break;

                if ((reply->msg_type != type &&
                     reply->msg_type != __swab32(type)) ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN ("Bad message from %s: type %u (%d expected),"
                               " magic %u (%d expected).\n",
                               libcfs_id2str(rpc->crpc_dest),
                               reply->msg_type, type,
                               reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN ("Remote error %d at %s, unlink bulk buffer in "
                               "case peer didn't initiate bulk transfer\n",
                               reply->msg_body.reply.status,
                               libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
        }
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->wi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}

/* called with rpc->crpc_lock held */
static inline void
srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
{
        int service = rpc->crpc_service;

        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        rpc->crpc_peer = peer;

        spin_lock(&peer->stp_lock);

        /* Framework RPCs that alter session state shall take precedence
         * over test RPCs and framework query RPCs */
        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
            service != SRPC_SERVICE_DEBUG &&
            service != SRPC_SERVICE_QUERY_STAT)
                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
        else
                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);

        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
        spin_unlock(&peer->stp_lock);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
        srpc_peer_t *peer = rpc->crpc_peer;

        LASSERT (why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG (D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.wi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status  = why;

        if (peer != NULL) {
                spin_lock(&peer->stp_lock);

                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
                        list_del_init(&rpc->crpc_privl);
                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
                        rpc->crpc_peer = NULL;       /* no credit taken */
                }

                spin_unlock(&peer->stp_lock);
        }

        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
        srpc_peer_t *peer;

        LASSERT (!rpc->crpc_aborted);
        LASSERT (rpc->crpc_peer == NULL);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);

        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);

        peer = srpc_nid2peer(rpc->crpc_dest.nid);
        if (peer == NULL) {
                srpc_abort_rpc(rpc, -ENOMEM);
                return;
        }

        srpc_queue_rpc(peer, rpc);

        spin_unlock(&rpc->crpc_lock);
        srpc_check_sends(peer, 0);
        spin_lock(&rpc->crpc_lock);
        return;
}


int
srpc_send_reply (srpc_server_rpc_t *rpc)
{
        srpc_event_t   *ev = &rpc->srpc_ev;
        srpc_msg_t     *msg = &rpc->srpc_replymsg;
        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
        srpc_service_t *sv = rpc->srpc_service;
        __u64           rpyid;
        int             rc;

        LASSERT (buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&sv->sv_lock);

        if (!sv->sv_shuttingdown &&
            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(sv, buffer) != 0)
                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&sv->sv_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* when in kernel always called with LNET_LOCK() held, and in thread context */
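/* Dispatch on rpcev->ev_type, the selftest-level event stashed in the
 * MD's user_ptr, rather than on the raw LNet event type: client-side
 * events wake the client workitem; SRPC_REQUEST_RCVD turns a posted
 * buffer into a new server RPC (or parks it on sv_blocked_msgq when
 * no free RPC is available); server-side bulk and reply events simply
 * reschedule the server workitem. */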
void
srpc_lnet_ev_handler (lnet_event_t *ev)
{
        srpc_event_t      *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t     *buffer;
        srpc_service_t    *sv;
        srpc_msg_t        *msg;
        srpc_msg_type_t    type;

        LASSERT (!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                LBUG ();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                LASSERT (rpcev == &crpc->crpc_reqstev ||
                         rpcev == &crpc->crpc_replyev ||
                         rpcev == &crpc->crpc_bulkev);

                spin_lock(&crpc->crpc_lock);

                LASSERT (rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                sv = rpcev->ev_data;

                LASSERT (rpcev == &sv->sv_ev);

                spin_lock(&sv->sv_lock);

                LASSERT (ev->unlinked);
                LASSERT (ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT (ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                sv->sv_nposted_msg--;
                LASSERT (sv->sv_nposted_msg >= 0);

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on sv->sv_posted_msgq since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&sv->sv_lock);
                        break;
                }

                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR ("Dropping RPC (%s) from %s: "
                                "status %d mlength %d type %u magic %u.\n",
                                sv->sv_name, libcfs_id2str(ev->initiator),
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);

                        /* NB might drop sv_lock in srpc_service_recycle_buffer,
                         * sv_nposted_msg++ as an implicit reference to prevent
                         * sv from disappearing under me */
                        sv->sv_nposted_msg++;
                        srpc_service_recycle_buffer(sv, buffer);
                        sv->sv_nposted_msg--;
                        spin_unlock(&sv->sv_lock);

                        if (ev->status == 0) { /* status!=0 counted already */
                                spin_lock(&srpc_data.rpc_glock);
                                srpc_data.rpc_counters.errors++;
                                spin_unlock(&srpc_data.rpc_glock);
                        }
                        break;
                }

                if (!list_empty(&sv->sv_free_rpcq)) {
                        srpc = list_entry(sv->sv_free_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, sv, buffer);
                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
                        srpc_schedule_server_rpc(srpc);
                } else {
                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
                }

                spin_unlock(&sv->sv_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT (ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (ev->type == LNET_EVENT_SEND &&
                    ev->status == 0 && !ev->unlinked)
                        break; /* wait for the final LNET_EVENT_REPLY */

        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                sv = srpc->srpc_service;

                LASSERT (rpcev == &srpc->srpc_ev);

                spin_lock(&sv->sv_lock);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                srpc_schedule_server_rpc(srpc);
                spin_unlock(&sv->sv_lock);
                break;
        }

        return;
}

#ifndef __KERNEL__

int
srpc_check_event (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                        timeout * 1000, &ev, &i);
        if (rc == 0) return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* We can't afford to miss any events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        srpc_lnet_ev_handler(&ev);
        return 1;
}

#endif

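/* Bring the RPC engine up, advancing srpc_data.rpc_state through
 * NONE -> NI_INIT -> EQ_INIT -> WI_INIT -> RUNNING. On failure
 * srpc_shutdown() is invoked with the recorded state and its switch
 * falls through to tear down exactly the stages completed so far. */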
int
srpc_startup (void)
{
        int i;
        int rc;

#ifndef __KERNEL__
        char *s;

        s = getenv("SRPC_PEER_CREDITS");
        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
#endif

        if (srpc_peer_credits <= 0) {
                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
                return -EINVAL;
        }

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        cfs_pause(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        LIBCFS_ALLOC(srpc_data.rpc_peers,
                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        if (srpc_data.rpc_peers == NULL) {
                CERROR ("Failed to alloc peer hash.\n");
                return -ENOMEM;
        }

        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);

#ifdef __KERNEL__
        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
        if (the_lnet.ln_server_mode_flag)
                rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
        else
                rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
#endif
        if (rc < 0) {
                CERROR ("LNetNIInit() has failed: %d\n", rc);
                LIBCFS_FREE(srpc_data.rpc_peers,
                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
#ifdef __KERNEL__
        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
#else
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
#endif
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT (rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = swi_startup();
        if (rc != 0)
                goto bail;

        srpc_data.rpc_state = SRPC_STATE_WI_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown (void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG ();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF (sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();

        case SRPC_STATE_WI_INIT:
                swi_shutdown();

        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT (rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT (rc == 0); /* the EQ should have no user by now */

        case SRPC_STATE_NI_INIT:
                LNetNIFini();
                break;
        }

        /* srpc_peer_t's are kept in hash until shutdown */
        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
                srpc_peer_t *peer;

                while (!list_empty(&srpc_data.rpc_peers[i])) {
                        peer = list_entry(srpc_data.rpc_peers[i].next,
                                          srpc_peer_t, stp_list);
                        list_del(&peer->stp_list);

                        LASSERT (list_empty(&peer->stp_rpcq));
                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
                        LASSERT (peer->stp_credits == srpc_peer_credits);

                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
                }
        }

        LIBCFS_FREE(srpc_data.rpc_peers,
                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        return;
}