Fix compiler warnings.
[fs/lustre-release.git] / lustre / ptlrpc / client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  *
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 #include <errno.h>
29 #include <signal.h>
30 #include <liblustre.h>
31 #endif
32
33 #include <obd_support.h>
34 #include <obd_class.h>
35 #include <lustre_lib.h>
36 #include <lustre_ha.h>
37 #include <lustre_import.h>
38 #include <lustre_req_layout.h>
39
40 #include "ptlrpc_internal.h"
41
42 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
43                         struct ptlrpc_client *cl)
44 {
45         cl->cli_request_portal = req_portal;
46         cl->cli_reply_portal   = rep_portal;
47         cl->cli_name           = name;
48 }
49
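/*
 * Resolve a server UUID to an LNet peer/self NID pair and return a
 * (refcounted) connection to that peer, remembering the UUID in
 * c_remote_uuid.  Returns NULL if the peer cannot be found.
 */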
50 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
51 {
52         struct ptlrpc_connection *c;
53         lnet_nid_t                self;
54         lnet_process_id_t         peer;
55         int                       err;
56
57         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
58         if (err != 0) {
59                 CERROR("cannot find peer %s!\n", uuid->uuid);
60                 return NULL;
61         }
62
63         c = ptlrpc_get_connection(peer, self, uuid);
64         if (c) {
65                 memcpy(c->c_remote_uuid.uuid,
66                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
67         }
68
69         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
70
71         return c;
72 }
73
74 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
75                                  struct obd_uuid *uuid)
76 {
77         lnet_nid_t        self;
78         lnet_process_id_t peer;
79         int               err;
80
81         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
82         if (err != 0) {
83                 CERROR("cannot find peer %s!\n", uuid->uuid);
84                 return;
85         }
86
87         conn->c_peer = peer;
88         conn->c_self = self;
89         return;
90 }
91
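/*
 * Allocate a bulk descriptor with room for npages iov entries.  The
 * variable-sized bd_iov[] array lives in the same allocation (hence the
 * offsetof() sizing), and ptlrpc_free_bulk() frees it with the same size.
 */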
92 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
93 {
94         struct ptlrpc_bulk_desc *desc;
95
96         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
97         if (!desc)
98                 return NULL;
99
100         spin_lock_init(&desc->bd_lock);
101         cfs_waitq_init(&desc->bd_waitq);
102         desc->bd_max_iov = npages;
103         desc->bd_iov_count = 0;
104         desc->bd_md_h = LNET_INVALID_HANDLE;
105         desc->bd_portal = portal;
106         desc->bd_type = type;
107
108         return desc;
109 }
110
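/*
 * Prepare a client-side (import) bulk descriptor for a request.  The type is
 * BULK_PUT_SINK when the peer will PUT data into our pages (typically a read)
 * or BULK_GET_SOURCE when the peer will GET data from them (typically a
 * write).  The request takes ownership of the descriptor via rq_bulk.
 */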
111 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
112                                                int npages, int type, int portal)
113 {
114         struct obd_import *imp = req->rq_import;
115         struct ptlrpc_bulk_desc *desc;
116
117         ENTRY;
118         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
119         desc = new_bulk(npages, type, portal);
120         if (desc == NULL)
121                 RETURN(NULL);
122
123         desc->bd_import_generation = req->rq_import_generation;
124         desc->bd_import = class_import_get(imp);
125         desc->bd_req = req;
126
127         desc->bd_cbid.cbid_fn  = client_bulk_callback;
128         desc->bd_cbid.cbid_arg = desc;
129
130         /* This makes req own desc; the descriptor is freed when req is freed */
131         req->rq_bulk = desc;
132
133         return desc;
134 }
135
136 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
137                                               int npages, int type, int portal)
138 {
139         struct obd_export *exp = req->rq_export;
140         struct ptlrpc_bulk_desc *desc;
141
142         ENTRY;
143         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
144
145         desc = new_bulk(npages, type, portal);
146         if (desc == NULL)
147                 RETURN(NULL);
148
149         desc->bd_export = class_export_get(exp);
150         desc->bd_req = req;
151
152         desc->bd_cbid.cbid_fn  = server_bulk_callback;
153         desc->bd_cbid.cbid_arg = desc;
154
155         /* NB we don't assign rq_bulk here; server-side requests are
156          * re-used, and the handler frees the bulk desc explicitly. */
157
158         return desc;
159 }
160
161 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
162                            cfs_page_t *page, int pageoffset, int len)
163 {
164         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
165         LASSERT(page != NULL);
166         LASSERT(pageoffset >= 0);
167         LASSERT(len > 0);
168         LASSERT(pageoffset + len <= CFS_PAGE_SIZE);
169
170         desc->bd_nob += len;
171
172         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
173 }
174
175 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
176 {
177         ENTRY;
178
179         LASSERT(desc != NULL);
180         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
181         LASSERT(!desc->bd_network_rw);         /* network has handed it off */
182         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
183
184         sptlrpc_enc_pool_put_pages(desc);
185
186         if (desc->bd_export)
187                 class_export_put(desc->bd_export);
188         else
189                 class_import_put(desc->bd_import);
190
191         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
192                                 bd_iov[desc->bd_max_iov]));
193         EXIT;
194 }
195
196 /* Set the server time limit (rq_timeout) for this req */
197 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
198 {
199         __u32 serv_est;
200         int idx;
201         struct imp_at *at;
202
203         LASSERT(req->rq_import);
204
205         if (AT_OFF) {
206                 /* non-AT settings */
207                 req->rq_timeout = req->rq_import->imp_server_timeout ?
208                         obd_timeout / 2 : obd_timeout;
209                 lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
210                 return;
211         }
212
213         at = &req->rq_import->imp_at;
214         idx = import_at_get_index(req->rq_import,
215                                   req->rq_request_portal);
216         serv_est = at_get(&at->iat_service_estimate[idx]);
217         /* add an arbitrary margin: 125% of the estimate, plus 5 seconds */
218         req->rq_timeout = serv_est + (serv_est >> 2) + 5;
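        /* e.g. a 20 second service estimate gives 20 + (20 >> 2) + 5 = 30s */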
219         /* We could get even fancier here, using history to predict increased
220            loading... */
221
222         /* Let the server know what this RPC timeout is by putting it in the
223            reqmsg. */
224         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
225 }
226
227 /* Adjust max service estimate based on server value */
228 static void ptlrpc_at_adj_service(struct ptlrpc_request *req)
229 {
230         int idx;
231         unsigned int serv_est, oldse;
232         struct imp_at *at = &req->rq_import->imp_at;
233
234         LASSERT(req->rq_import);
235
236         /* the service estimate is returned in the repmsg timeout field;
237            it may be 0 on error */
238         serv_est = lustre_msg_get_timeout(req->rq_repmsg);
239
240         idx = import_at_get_index(req->rq_import, req->rq_request_portal);
241         /* max service estimates are tracked on the server side,
242            so just keep minimal history here */
243         oldse = at_add(&at->iat_service_estimate[idx], serv_est);
244         if (oldse != 0)
245                 CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d "
246                        "has changed from %d to %d\n",
247                        req->rq_import->imp_obd->obd_name,req->rq_request_portal,
248                        oldse, at_get(&at->iat_service_estimate[idx]));
249 }
250
251 /* Expected network latency per remote node (secs) */
252 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
253 {
254         return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
255 }
256
257 /* Adjust expected network latency */
258 static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req)
259 {
260         unsigned int st, nl, oldnl;
261         struct imp_at *at = &req->rq_import->imp_at;
262         time_t now = cfs_time_current_sec();
263
264         LASSERT(req->rq_import);
265
266         st = lustre_msg_get_service_time(req->rq_repmsg);
267
268         /* Network latency is total time less server processing time */
269         nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/;
270         if (st > now - req->rq_sent + 2 /* rounding */)
271                 CERROR("Reported service time %u > total measured time %ld\n",
272                        st, now - req->rq_sent);
273
274         oldnl = at_add(&at->iat_net_latency, nl);
275         if (oldnl != 0)
276                 CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) "
277                        "has changed from %d to %d\n",
278                        req->rq_import->imp_obd->obd_name,
279                        obd_uuid2str(
280                                &req->rq_import->imp_connection->c_remote_uuid),
281                        oldnl, at_get(&at->iat_net_latency));
282 }
283
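/*
 * Swab and sanity-check the reply buffer, then unpack the embedded
 * ptlrpc_body; any failure is reported to the caller as -EPROTO.
 */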
284 static int unpack_reply(struct ptlrpc_request *req)
285 {
286         int rc;
287
288         /* Clear reply swab mask; we may have already swabbed an early reply */
289         req->rq_rep_swab_mask = 0;
290
291         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
292         if (rc) {
293                 DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
294                 return(-EPROTO);
295         }
296
297         rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
298         if (rc) {
299                 DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
300                 return(-EPROTO);
301         }
302         return 0;
303 }
304
305 /*
306  * Handle an early reply message, called with the rq_lock held.
307  * If anything goes wrong just ignore it - same as if it never happened
308  */
309 static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
310         time_t          olddl;
311         int             rc;
312         ENTRY;
313
314         req->rq_early = 0;
315         spin_unlock(&req->rq_lock);
316
317         rc = sptlrpc_cli_unwrap_early_reply(req);
318         if (rc)
319                 GOTO(out, rc);
320
321         rc = unpack_reply(req);
322         if (rc)
323                 GOTO(out_cleanup, rc);
324
325         /* Expecting to increase the service time estimate here */
326         ptlrpc_at_adj_service(req);
327         ptlrpc_at_adj_net_latency(req);
328
329         /* Adjust the local timeout for this req */
330         ptlrpc_at_set_req_timeout(req);
331
332         olddl = req->rq_deadline;
333         /* server assumes it now has rq_timeout from when it sent the
334            early reply, so client should give it at least that long. */
335         req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
336                     ptlrpc_at_get_net_latency(req);
337
338         DEBUG_REQ(D_ADAPTTO, req,
339                   "Early reply #%d, new deadline in %lds (%+lds)",
340                   req->rq_early_count, req->rq_deadline -
341                   cfs_time_current_sec(), req->rq_deadline - olddl);
342
343 out_cleanup:
344         sptlrpc_cli_finish_early_reply(req);
345 out:
346         spin_lock(&req->rq_lock);
347         RETURN(rc);
348 }
349
350 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
351 {
352         struct list_head *l, *tmp;
353         struct ptlrpc_request *req;
354
355         if (!pool)
356                 return;
357
358         list_for_each_safe(l, tmp, &pool->prp_req_list) {
359                 req = list_entry(l, struct ptlrpc_request, rq_list);
360                 list_del(&req->rq_list);
361                 LASSERT(req->rq_reqbuf);
362                 LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
363                 OBD_FREE(req->rq_reqbuf, pool->prp_rq_size);
364                 OBD_FREE(req, sizeof(*req));
365         }
366         OBD_FREE(pool, sizeof(*pool));
367 }
368
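/*
 * Pre-allocate num_rq requests (and their message buffers) into the pool.
 * The buffer size is prp_rq_size plus the maximum security payload, rounded
 * up to a power of two; the pool lock is dropped around each allocation and
 * the loop simply stops early if memory runs out.
 */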
369 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
370 {
371         int i;
372         int size = 1;
373
374         while (size < pool->prp_rq_size + SPTLRPC_MAX_PAYLOAD)
375                 size <<= 1;
376
377         LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
378                  "Trying to change pool size with nonempty pool "
379                  "from %d to %d bytes\n", pool->prp_rq_size, size);
380
381         spin_lock(&pool->prp_lock);
382         pool->prp_rq_size = size;
383         for (i = 0; i < num_rq; i++) {
384                 struct ptlrpc_request *req;
385                 struct lustre_msg *msg;
386
387                 spin_unlock(&pool->prp_lock);
388                 OBD_ALLOC(req, sizeof(struct ptlrpc_request));
389                 if (!req)
390                         return;
391                 OBD_ALLOC_GFP(msg, size, CFS_ALLOC_STD);
392                 if (!msg) {
393                         OBD_FREE(req, sizeof(struct ptlrpc_request));
394                         return;
395                 }
396                 req->rq_reqbuf = msg;
397                 req->rq_reqbuf_len = size;
398                 req->rq_pool = pool;
399                 spin_lock(&pool->prp_lock);
400                 list_add_tail(&req->rq_list, &pool->prp_req_list);
401         }
402         spin_unlock(&pool->prp_lock);
403         return;
404 }
405
406 struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
407                                                 void (*populate_pool)(struct ptlrpc_request_pool *, int))
408 {
409         struct ptlrpc_request_pool *pool;
410
411         OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
412         if (!pool)
413                 return NULL;
414
415         /* The buffer size is rounded up to the next power of two (see
416            ptlrpc_add_rqs_to_pool), as the kernel would do this internally anyway */
417
418         spin_lock_init(&pool->prp_lock);
419         CFS_INIT_LIST_HEAD(&pool->prp_req_list);
420         pool->prp_rq_size = msgsize;
421         pool->prp_populate = populate_pool;
422
423         populate_pool(pool, num_rq);
424
425         if (list_empty(&pool->prp_req_list)) {
426                 /* have not allocated a single request for the pool */
427                 OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
428                 pool = NULL;
429         }
430         return pool;
431 }
432
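/*
 * Take a pre-allocated request from the pool, if one is available, and reset
 * it to a pristine state while keeping its message buffer.  Returns NULL when
 * the pool is empty; the caller is expected to retry once in-flight requests
 * complete (see the comment in the body).
 */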
433 static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
434 {
435         struct ptlrpc_request *request;
436         struct lustre_msg *reqbuf;
437
438         if (!pool)
439                 return NULL;
440
441         spin_lock(&pool->prp_lock);
442
443         /* See if we have anything in the pool and bail out if not.  In the
444          * writeout path, where this matters, bailing out is safe: nothing is
445          * lost in this case, and when some in-flight requests complete, this
446          * code will be called again. */
447         if (unlikely(list_empty(&pool->prp_req_list))) {
448                 spin_unlock(&pool->prp_lock);
449                 return NULL;
450         }
451
452         request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
453                              rq_list);
454         list_del(&request->rq_list);
455         spin_unlock(&pool->prp_lock);
456
457         LASSERT(request->rq_reqbuf);
458         LASSERT(request->rq_pool);
459
460         reqbuf = request->rq_reqbuf;
461         memset(request, 0, sizeof(*request));
462         request->rq_reqbuf = reqbuf;
463         request->rq_reqbuf_len = pool->prp_rq_size;
464         request->rq_pool = pool;
465         return request;
466 }
467
468 static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
469 {
470         struct ptlrpc_request_pool *pool = request->rq_pool;
471
472         spin_lock(&pool->prp_lock);
473         LASSERT(list_empty(&request->rq_list));
474         list_add_tail(&request->rq_list, &pool->prp_req_list);
475         spin_unlock(&pool->prp_lock);
476 }
477
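/*
 * Common request initialisation: attach (or look up) a security context, set
 * the RPC flavor, pack the request buffers, and initialise the callbacks,
 * portals, list heads, xid and adaptive timeout.  On failure the import
 * reference held by the request is dropped before returning.
 */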
478 static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
479                                       __u32 version, int opcode,
480                                       int count, __u32 *lengths, char **bufs,
481                                       struct ptlrpc_cli_ctx *ctx)
482 {
483         struct obd_import  *imp = request->rq_import;
484         int                 rc;
485         ENTRY;
486
487         if (unlikely(ctx))
488                 request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
489         else {
490                 rc = sptlrpc_req_get_ctx(request);
491                 if (rc)
492                         GOTO(out_free, rc);
493         }
494
495         sptlrpc_req_set_flavor(request, opcode);
496
497         rc = lustre_pack_request(request, imp->imp_msg_magic, count,
498                                  lengths, bufs);
499         if (rc) {
500                 LASSERT(!request->rq_pool);
501                 GOTO(out_ctx, rc);
502         }
503
504         lustre_msg_add_version(request->rq_reqmsg, version);
505         request->rq_send_state = LUSTRE_IMP_FULL;
506         request->rq_type = PTL_RPC_MSG_REQUEST;
507         request->rq_export = NULL;
508
509         request->rq_req_cbid.cbid_fn  = request_out_callback;
510         request->rq_req_cbid.cbid_arg = request;
511
512         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
513         request->rq_reply_cbid.cbid_arg = request;
514
515         request->rq_phase = RQ_PHASE_NEW;
516
517         request->rq_request_portal = imp->imp_client->cli_request_portal;
518         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
519
520         ptlrpc_at_set_req_timeout(request);
521
522         spin_lock_init(&request->rq_lock);
523         CFS_INIT_LIST_HEAD(&request->rq_list);
524         CFS_INIT_LIST_HEAD(&request->rq_timed_list);
525         CFS_INIT_LIST_HEAD(&request->rq_replay_list);
526         CFS_INIT_LIST_HEAD(&request->rq_mod_list);
527         CFS_INIT_LIST_HEAD(&request->rq_ctx_chain);
528         CFS_INIT_LIST_HEAD(&request->rq_set_chain);
529         CFS_INIT_LIST_HEAD(&request->rq_history_list);
530         cfs_waitq_init(&request->rq_reply_waitq);
531         request->rq_xid = ptlrpc_next_xid();
532         atomic_set(&request->rq_refcount, 1);
533
534         lustre_msg_set_opc(request->rq_reqmsg, opcode);
535
536         RETURN(0);
537 out_ctx:
538         sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
539 out_free:
540         class_import_put(imp);
541         return rc;
542 }
543
544 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
545                              __u32 version, int opcode, char **bufs,
546                              struct ptlrpc_cli_ctx *ctx)
547 {
548         int count;
549
550         count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
551         return __ptlrpc_request_bufs_pack(request, version, opcode, count,
552                                           request->rq_pill.rc_area[RCL_CLIENT],
553                                           bufs, ctx);
554 }
555 EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
556
557 int ptlrpc_request_pack(struct ptlrpc_request *request,
558                         __u32 version, int opcode) 
559 {
560         return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
561 }
562
563 static inline
564 struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
565                                               struct ptlrpc_request_pool *pool)
566 {
567         struct ptlrpc_request *request = NULL;
568
569         if (pool)
570                 request = ptlrpc_prep_req_from_pool(pool);
571
572         if (!request)
573                 OBD_ALLOC_PTR(request);
574
575         if (request) {
576                 LASSERT((unsigned long)imp > 0x1000);
577                 LASSERT(imp != LP_POISON);
578                 LASSERT((unsigned long)imp->imp_client > 0x1000);
579                 LASSERT(imp->imp_client != LP_POISON);
580
581                 request->rq_import = class_import_get(imp);
582         } else {
583                 CERROR("request allocation out of memory\n");
584         }
585
586         return request;
587 }
588
589 static struct ptlrpc_request *
590 ptlrpc_request_alloc_internal(struct obd_import *imp,
591                               struct ptlrpc_request_pool * pool,
592                               const struct req_format *format)
593 {
594         struct ptlrpc_request *request;
595
596         request = __ptlrpc_request_alloc(imp, pool);
597         if (request == NULL)
598                 return NULL;
599
600         req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
601         req_capsule_set(&request->rq_pill, format);
602         return request;
603 }
604
605 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
606                                             const struct req_format *format)
607 {
608         return ptlrpc_request_alloc_internal(imp, NULL, format);
609 }
610
611 struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
612                                             struct ptlrpc_request_pool * pool,
613                                             const struct req_format *format)
614 {
615         return ptlrpc_request_alloc_internal(imp, pool, format);
616 }
617
618 void ptlrpc_request_free(struct ptlrpc_request *request)
619 {
620         if (request->rq_pool)
621                 __ptlrpc_free_req_to_pool(request);
622         else
623                 OBD_FREE_PTR(request);
624 }
625
626 struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
627                                                 const struct req_format *format,
628                                                 __u32 version, int opcode)
629 {
630         struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
631         int                    rc;
632
633         if (req) {
634                 rc = ptlrpc_request_pack(req, version, opcode);
635                 if (rc) {
636                         ptlrpc_request_free(req);
637                         req = NULL;
638                 }
639         }
640         return req;
641 }
642
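/*
 * Typical client-side usage of the helpers above (a rough sketch; the format,
 * version and opcode names are illustrative placeholders, not definitions
 * from this file):
 *
 *      req = ptlrpc_request_alloc_pack(imp, &RQF_SOME_FORMAT,
 *                                      LUSTRE_OBD_VERSION, SOME_OPC);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      rc = ptlrpc_queue_wait(req);    - send and wait (defined elsewhere)
 *      ptlrpc_req_finished(req);       - drop the caller's reference
 */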
643 struct ptlrpc_request *
644 ptlrpc_prep_req_pool(struct obd_import *imp,
645                      __u32 version, int opcode,
646                      int count, __u32 *lengths, char **bufs,
647                      struct ptlrpc_request_pool *pool)
648 {
649         struct ptlrpc_request *request;
650         int                    rc;
651
652         request = __ptlrpc_request_alloc(imp, pool);
653         if (!request)
654                 return NULL;
655
656         rc = __ptlrpc_request_bufs_pack(request, version, opcode, count,
657                                         lengths, bufs, NULL);
658         if (rc) {
659                 ptlrpc_request_free(request);
660                 request = NULL;
661         }
662         return request;
663 }
664
665 struct ptlrpc_request *
666 ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
667                 __u32 *lengths, char **bufs)
668 {
669         return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
670                                     NULL);
671 }
672
673 struct ptlrpc_request_set *ptlrpc_prep_set(void)
674 {
675         struct ptlrpc_request_set *set;
676
677         ENTRY;
678         OBD_ALLOC(set, sizeof *set);
679         if (!set)
680                 RETURN(NULL);
681         CFS_INIT_LIST_HEAD(&set->set_requests);
682         cfs_waitq_init(&set->set_waitq);
683         set->set_remaining = 0;
684         spin_lock_init(&set->set_new_req_lock);
685         CFS_INIT_LIST_HEAD(&set->set_new_requests);
686         CFS_INIT_LIST_HEAD(&set->set_cblist);
687         
688         RETURN(set);
689 }
690
691 /* Finish with this set; opposite of prep_set. */
692 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
693 {
694         struct list_head *tmp;
695         struct list_head *next;
696         int               expected_phase;
697         int               n = 0;
698         ENTRY;
699
700         /* Requests on the set should either all be completed, or all be new */
701         expected_phase = (set->set_remaining == 0) ?
702                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
703         list_for_each (tmp, &set->set_requests) {
704                 struct ptlrpc_request *req =
705                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
706
707                 LASSERT(req->rq_phase == expected_phase);
708                 n++;
709         }
710
711         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
712
713         list_for_each_safe(tmp, next, &set->set_requests) {
714                 struct ptlrpc_request *req =
715                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
716                 list_del_init(&req->rq_set_chain);
717
718                 LASSERT(req->rq_phase == expected_phase);
719
720                 if (req->rq_phase == RQ_PHASE_NEW) {
721
722                         if (req->rq_interpret_reply != NULL) {
723                                 int (*interpreter)(struct ptlrpc_request *,
724                                                    void *, int) =
725                                         req->rq_interpret_reply;
726
727                                 /* higher level (i.e. LOV) failed;
728                                  * let the sub reqs clean up */
729                                 req->rq_status = -EBADR;
730                                 interpreter(req, &req->rq_async_args,
731                                             req->rq_status);
732                         }
733                         set->set_remaining--;
734                 }
735
736                 req->rq_set = NULL;
737                 ptlrpc_req_finished (req);
738         }
739
740         LASSERT(set->set_remaining == 0);
741
742         OBD_FREE(set, sizeof(*set));
743         EXIT;
744 }
745
746 int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
747                       set_interpreter_func fn, void *data)
748 {
749         struct ptlrpc_set_cbdata *cbdata;
750
751         OBD_ALLOC_PTR(cbdata);
752         if (cbdata == NULL)
753                 RETURN(-ENOMEM);
754
755         cbdata->psc_interpret = fn;
756         cbdata->psc_data = data;
757         list_add_tail(&cbdata->psc_item, &set->set_cblist);
758
759         RETURN(0);
760 }
761
762 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
763                         struct ptlrpc_request *req)
764 {
765         /* The set takes over the caller's request reference */
766         list_add_tail(&req->rq_set_chain, &set->set_requests);
767         req->rq_set = set;
768         set->set_remaining++;
769
770         atomic_inc(&req->rq_import->imp_inflight);
771 }
772
773 /* Locked so that many callers can add things; the context that owns the set
774  * is supposed to notice these and move them into the set proper. */
775 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
776                             struct ptlrpc_request *req)
777 {
778         spin_lock(&set->set_new_req_lock);
779         /* The set takes over the caller's request reference */
780         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
781         req->rq_set = set;
782         spin_unlock(&set->set_new_req_lock);
783 }
784
785 /*
786  * Based on the current state of the import, determine if the request
787  * can be sent, is an error, or should be delayed.
788  *
789  * Returns true if this request should be delayed. If false and *status is
790  * non-zero, then the request cannot be sent and *status is the error code.
791  * If false and *status is 0, then the request can be sent.
792  *
793  * The imp->imp_lock must be held.
794  */
795 static int ptlrpc_import_delay_req(struct obd_import *imp,
796                                    struct ptlrpc_request *req, int *status)
797 {
798         int delay = 0;
799         ENTRY;
800
801         LASSERT (status != NULL);
802         *status = 0;
803
804         if (req->rq_ctx_init || req->rq_ctx_fini) {
805                 /* always allow ctx init/fini RPCs to go through */
806         } else if (imp->imp_state == LUSTRE_IMP_NEW) {
807                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
808                 *status = -EIO;
809                 LBUG();
810         } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
811                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
812                 *status = -EIO;
813         } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
814                    imp->imp_state == LUSTRE_IMP_CONNECTING) {
815                 /* allow CONNECT even if import is invalid */
816                 if (atomic_read(&imp->imp_inval_count) != 0) {
817                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
818                         *status = -EIO;
819                 }
820         } else if ((imp->imp_invalid && (!imp->imp_recon_bk)) ||
821                                          imp->imp_obd->obd_no_recov) {
822                 /* If the import has been invalidated (such as by an OST
823                  * failure), and if the import (e.g. the MGC) has tried all of
824                  * its connection list (bug 13464), the request must fail with
825                  * -ESHUTDOWN.  This indicates the request should be discarded;
826                  * an -EIO may instead result in a resend of the request. */
827                 if (!imp->imp_deactive)
828                           DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
829                 *status = -ESHUTDOWN; /* bz 12940 */
830         } else if (req->rq_import_generation != imp->imp_generation) {
831                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
832                 *status = -EIO;
833         } else if (req->rq_send_state != imp->imp_state) {
834                 /* invalidate in progress - any new requests should be dropped */
835                 if (atomic_read(&imp->imp_inval_count) != 0) {
836                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
837                         *status = -EIO;
838                 } else if (imp->imp_dlm_fake || req->rq_no_delay) {
839                         *status = -EWOULDBLOCK;
840                 } else {
841                         delay = 1;
842                 }
843         }
844
845         RETURN(delay);
846 }
847
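/*
 * Poll the state of a single request, serialising with the network callback
 * via rq_lock.  Returns nonzero when the waiter should wake up (a reply has
 * arrived, or the request was flagged with an error, resend or restart); an
 * early reply is consumed here and 0 is returned so the caller keeps waiting.
 */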
848 static int ptlrpc_check_reply(struct ptlrpc_request *req)
849 {
850         int rc = 0;
851         ENTRY;
852
853         /* serialise with network callback */
854         spin_lock(&req->rq_lock);
855
856         if (req->rq_replied)
857                 GOTO(out, rc = 1);
858
859         if (req->rq_net_err && !req->rq_timedout) {
860                 spin_unlock(&req->rq_lock);
861                 rc = ptlrpc_expire_one_request(req);
862                 spin_lock(&req->rq_lock);
863                 GOTO(out, rc);
864         }
865
866         if (req->rq_err)
867                 GOTO(out, rc = 1);
868
869         if (req->rq_resend)
870                 GOTO(out, rc = 1);
871
872         if (req->rq_restart)
873                 GOTO(out, rc = 1);
874
875         if (req->rq_early) {
876                 ptlrpc_at_recv_early_reply(req);
877                 GOTO(out, rc = 0); /* keep waiting */
878         }
879
880         EXIT;
881  out:
882         spin_unlock(&req->rq_lock);
883         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
884         return rc;
885 }
886
887 static int ptlrpc_check_status(struct ptlrpc_request *req)
888 {
889         int err;
890         ENTRY;
891
892         err = lustre_msg_get_status(req->rq_repmsg);
893         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
894                 struct obd_import *imp = req->rq_import;
895                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
896                 LCONSOLE_ERROR_MSG(0x011,"an error occurred while communicating"
897                                 " with %s. The %s operation failed with %d\n",
898                                 libcfs_nid2str(imp->imp_connection->c_peer.nid),
899                                 ll_opcode2str(opc), err);
900                 RETURN(err < 0 ? err : -EINVAL);
901         }
902
903         if (err < 0) {
904                 DEBUG_REQ(D_INFO, req, "status is %d", err);
905         } else if (err > 0) {
906                 /* XXX: translate this error from net to host */
907                 DEBUG_REQ(D_INFO, req, "status is %d", err);
908         }
909
910         RETURN(err);
911 }
912
913 /**
914  * Callback function called when client receives RPC reply for \a req.
915  */
916 static int after_reply(struct ptlrpc_request *req)
917 {
918         struct obd_import *imp = req->rq_import;
919         struct obd_device *obd = req->rq_import->imp_obd;
920         int rc;
921         struct timeval work_start;
922         long timediff;
923         ENTRY;
924
925         LASSERT(!req->rq_receiving_reply);
926         LASSERT(obd);
927         LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
928
929         /*
930          * NB Until this point, the whole of the incoming message,
931          * including buflens, status etc is in the sender's byte order. 
932          */
933
934         rc = sptlrpc_cli_unwrap_reply(req);
935         if (rc) {
936                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
937                 RETURN(rc);
938         }
939
940         /*
941          * The security layer unwrap might ask us to resend this request.
942          */
943         if (req->rq_resend)
944                 RETURN(0);
945
946         rc = unpack_reply(req);
947         if (rc)
948                 RETURN(rc);
949
950         do_gettimeofday(&work_start);
951         timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL);
952         if (obd->obd_svc_stats != NULL)
953                 lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
954                                     timediff);
955
956         if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
957             lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
958                 DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
959                           lustre_msg_get_type(req->rq_repmsg));
960                 RETURN(-EPROTO);
961         }
962
963         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
964         ptlrpc_at_adj_service(req);
965         ptlrpc_at_adj_net_latency(req);
966
967         rc = ptlrpc_check_status(req);
968         imp->imp_connect_error = rc;
969
970         if (rc) {
971                 /*
972                  * Either we've been evicted, or the server has failed for
973                  * some reason. Try to reconnect, and if that fails, punt to
974                  * the upcall. 
975                  */
976                 if (ll_rpc_recoverable_error(rc)) {
977                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
978                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
979                                 RETURN(rc);
980                         }
981                         ptlrpc_request_handle_notconn(req);
982                         RETURN(rc);
983                 }
984         } else {
985                 /*
986                  * Check whether the server sent an SLV (server lock volume).
987                  * Do this only for RPCs with rc == 0.
988                  */
989                 ldlm_cli_update_pool(req);
990         }
991
992         /*
993          * Store transno in reqmsg for replay. 
994          */
995         req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
996         lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
997
998         if (req->rq_import->imp_replayable) {
999                 spin_lock(&imp->imp_lock);
1000                 /*
1001                  * No point in adding already-committed requests to the replay
1002                  * list, we will just remove them immediately. b=9829 
1003                  */
1004                 if (req->rq_transno != 0 && 
1005                     (req->rq_transno > 
1006                      lustre_msg_get_last_committed(req->rq_repmsg) ||
1007                      req->rq_replay))
1008                         ptlrpc_retain_replayable_request(req, imp);
1009                 else if (req->rq_commit_cb != NULL) {
1010                         spin_unlock(&imp->imp_lock);
1011                         req->rq_commit_cb(req);
1012                         spin_lock(&imp->imp_lock);
1013                 }
1014
1015                 /*
1016                  * Replay-enabled imports return commit-status information. 
1017                  */
1018                 if (lustre_msg_get_last_committed(req->rq_repmsg)) {
1019                         imp->imp_peer_committed_transno =
1020                                 lustre_msg_get_last_committed(req->rq_repmsg);
1021                 }
1022                 ptlrpc_free_committed(imp);
1023                 spin_unlock(&imp->imp_lock);
1024         }
1025
1026         RETURN(rc);
1027 }
1028
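/*
 * Push a request in RQ_PHASE_NEW onto the wire: ask ptlrpc_import_delay_req()
 * whether the import can accept it (parking it on imp_delayed_list if not),
 * refresh the security context, and hand it to ptl_send_rpc().  A send
 * failure sets rq_net_err and the normal expiry path takes over.
 */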
1029 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1030 {
1031         struct obd_import     *imp;
1032         int rc;
1033         ENTRY;
1034
1035         LASSERT(req->rq_phase == RQ_PHASE_NEW);
1036         if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()))
1037                 RETURN (0);
1038         
1039         req->rq_phase = RQ_PHASE_RPC;
1040
1041         imp = req->rq_import;
1042         spin_lock(&imp->imp_lock);
1043
1044         req->rq_import_generation = imp->imp_generation;
1045
1046         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1047                 spin_lock (&req->rq_lock);
1048                 req->rq_waiting = 1;
1049                 spin_unlock (&req->rq_lock);
1050
1051                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
1052                           "(%s != %s)",
1053                           lustre_msg_get_status(req->rq_reqmsg) ,
1054                           ptlrpc_import_state_name(req->rq_send_state),
1055                           ptlrpc_import_state_name(imp->imp_state));
1056                 LASSERT(list_empty (&req->rq_list));
1057
1058                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1059                 spin_unlock(&imp->imp_lock);
1060                 RETURN(0);
1061         }
1062
1063         if (rc != 0) {
1064                 spin_unlock(&imp->imp_lock);
1065                 req->rq_status = rc;
1066                 req->rq_phase = RQ_PHASE_INTERPRET;
1067                 RETURN(rc);
1068         }
1069
1070         /* XXX this is the same as ptlrpc_queue_wait */
1071         LASSERT(list_empty(&req->rq_list));
1072         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1073         spin_unlock(&imp->imp_lock);
1074
1075         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
1076
1077         rc = sptlrpc_req_refresh_ctx(req, -1);
1078         if (rc) {
1079                 if (req->rq_err) {
1080                         req->rq_status = rc;
1081                         RETURN(1);
1082                 } else {
1083                         /* here begins timeout counting */
1084                         req->rq_sent = cfs_time_current_sec();
1085                         req->rq_wait_ctx = 1;
1086                         RETURN(0);
1087                 }
1088         }
1089
1090         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
1091                " %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
1092                imp->imp_obd->obd_uuid.uuid,
1093                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1094                libcfs_nid2str(imp->imp_connection->c_peer.nid),
1095                lustre_msg_get_opc(req->rq_reqmsg));
1096
1097         rc = ptl_send_rpc(req, 0);
1098         if (rc) {
1099                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1100                 req->rq_net_err = 1;
1101                 RETURN(rc);
1102         }
1103         RETURN(0);
1104 }
1105
1106 /* Send any unsent RPCs in @set; returns TRUE when all requests have completed or the timeout needs recalculating */
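/*
 * Each request in the set moves through RQ_PHASE_NEW -> RQ_PHASE_RPC ->
 * (RQ_PHASE_BULK) -> RQ_PHASE_INTERPRET -> RQ_PHASE_COMPLETE as this function
 * is called repeatedly; set_remaining counts the requests not yet complete.
 */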
1107 int ptlrpc_check_set(struct ptlrpc_request_set *set)
1108 {
1109         struct list_head *tmp;
1110         int force_timer_recalc = 0;
1111         ENTRY;
1112
1113         if (set->set_remaining == 0)
1114                 RETURN(1);
1115
1116         list_for_each(tmp, &set->set_requests) {
1117                 struct ptlrpc_request *req =
1118                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1119                 struct obd_import *imp = req->rq_import;
1120                 int rc = 0;
1121
1122                 if (req->rq_phase == RQ_PHASE_NEW &&
1123                     ptlrpc_send_new_req(req)) {
1124                         force_timer_recalc = 1;
1125                 }
1126                 /* delayed send - skip */
1127                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1128                         continue;
1129
1130                 if (!(req->rq_phase == RQ_PHASE_RPC ||
1131                       req->rq_phase == RQ_PHASE_BULK ||
1132                       req->rq_phase == RQ_PHASE_INTERPRET ||
1133                       req->rq_phase == RQ_PHASE_COMPLETE)) {
1134                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1135                         LBUG();
1136                 }
1137
1138                 if (req->rq_phase == RQ_PHASE_COMPLETE)
1139                         continue;
1140
1141                 if (req->rq_phase == RQ_PHASE_INTERPRET)
1142                         GOTO(interpret, req->rq_status);
1143
1144                 if (req->rq_net_err && !req->rq_timedout)
1145                         ptlrpc_expire_one_request(req);
1146
1147                 if (req->rq_err) {
1148                         ptlrpc_unregister_reply(req);
1149                         req->rq_replied = 0;
1150                         if (req->rq_status == 0)
1151                                 req->rq_status = -EIO;
1152                         req->rq_phase = RQ_PHASE_INTERPRET;
1153
1154                         spin_lock(&imp->imp_lock);
1155                         list_del_init(&req->rq_list);
1156                         spin_unlock(&imp->imp_lock);
1157
1158                         GOTO(interpret, req->rq_status);
1159                 }
1160
1161                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
1162                  * will only be set after rq_timedout, but the oig waiting
1163                  * path sets rq_intr irrespective of whether ptlrpcd has
1164                  * seen a timeout.  Our policy is to only interpret
1165                  * interrupted RPCs after they have timed out. */
1166                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1167                                      req->rq_wait_ctx)) {
1168                         /* NB could be on delayed list */
1169                         ptlrpc_unregister_reply(req);
1170                         req->rq_status = -EINTR;
1171                         req->rq_phase = RQ_PHASE_INTERPRET;
1172
1173                         spin_lock(&imp->imp_lock);
1174                         list_del_init(&req->rq_list);
1175                         spin_unlock(&imp->imp_lock);
1176
1177                         GOTO(interpret, req->rq_status);
1178                 }
1179
1180                 if (req->rq_phase == RQ_PHASE_RPC) {
1181                         if (req->rq_timedout || req->rq_resend ||
1182                             req->rq_waiting || req->rq_wait_ctx) {
1183                                 int status;
1184
1185                                 /* rq_wait_ctx is only touched in ptlrpcd,
1186                                  * no lock needed here.
1187                                  */
1188                                 if (req->rq_wait_ctx)
1189                                         goto check_ctx;
1190
1191                                 ptlrpc_unregister_reply(req);
1192
1193                                 spin_lock(&imp->imp_lock);
1194
1195                                 if (ptlrpc_import_delay_req(imp, req, &status)){
1196                                         spin_unlock(&imp->imp_lock);
1197                                         continue;
1198                                 }
1199
1200                                 list_del_init(&req->rq_list);
1201                                 if (status != 0)  {
1202                                         req->rq_status = status;
1203                                         req->rq_phase = RQ_PHASE_INTERPRET;
1204                                         spin_unlock(&imp->imp_lock);
1205                                         GOTO(interpret, req->rq_status);
1206                                 }
1207                                 if (req->rq_no_resend && !req->rq_wait_ctx) {
1208                                         req->rq_status = -ENOTCONN;
1209                                         req->rq_phase = RQ_PHASE_INTERPRET;
1210                                         spin_unlock(&imp->imp_lock);
1211                                         GOTO(interpret, req->rq_status);
1212                                 }
1213                                 list_add_tail(&req->rq_list,
1214                                               &imp->imp_sending_list);
1215
1216                                 spin_unlock(&imp->imp_lock);
1217
1218                                 req->rq_waiting = 0;
1219                                 if (req->rq_resend) {
1220                                         lustre_msg_add_flags(req->rq_reqmsg,
1221                                                              MSG_RESENT);
1222                                         if (req->rq_bulk) {
1223                                                 __u64 old_xid = req->rq_xid;
1224
1225                                                 ptlrpc_unregister_bulk (req);
1226
1227                                                 /* ensure previous bulk fails */
1228                                                 req->rq_xid = ptlrpc_next_xid();
1229                                                 CDEBUG(D_HA, "resend bulk "
1230                                                        "old x"LPU64
1231                                                        " new x"LPU64"\n",
1232                                                        old_xid, req->rq_xid);
1233                                         }
1234                                 }
1235 check_ctx:
1236                                 status = sptlrpc_req_refresh_ctx(req, -1);
1237                                 if (status) {
1238                                         if (req->rq_err) {
1239                                                 req->rq_status = status;
1240                                                 force_timer_recalc = 1;
1241                                         }
1242                                         if (!req->rq_wait_ctx) {
1243                                                 /* begins timeout counting */
1244                                                 req->rq_sent = cfs_time_current_sec();
1245                                                 req->rq_wait_ctx = 1;
1246                                         }
1247                                         continue;
1248                                 } else {
1249                                         req->rq_sent = 0;
1250                                         req->rq_wait_ctx = 0;
1251                                 }
1252
1253                                 rc = ptl_send_rpc(req, 0);
1254                                 if (rc) {
1255                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
1256                                                   rc);
1257                                         force_timer_recalc = 1;
1258                                         req->rq_net_err = 1;
1259                                 }
1260                                 /* need to reset the timeout */
1261                                 force_timer_recalc = 1;
1262                         }
1263
1264                         spin_lock(&req->rq_lock);
1265
1266                         if (req->rq_early) {
1267                                 ptlrpc_at_recv_early_reply(req);
1268                                 spin_unlock(&req->rq_lock);
1269                                 continue;
1270                         }
1271
1272                         /* Still waiting for a reply? */
1273                         if (req->rq_receiving_reply) {
1274                                 spin_unlock(&req->rq_lock);
1275                                 continue;
1276                         }
1277
1278                         /* Did we actually receive a reply? */
1279                         if (!req->rq_replied) {
1280                                 spin_unlock(&req->rq_lock);
1281                                 continue;
1282                         }
1283
1284                         spin_unlock(&req->rq_lock);
1285
1286                         spin_lock(&imp->imp_lock);
1287                         list_del_init(&req->rq_list);
1288                         spin_unlock(&imp->imp_lock);
1289
1290                         req->rq_status = after_reply(req);
1291                         if (req->rq_resend) {
1292                                 /* Add this req to the delayed list so
1293                                    it can be errored if the import is
1294                                    evicted after recovery. */
1295                                 spin_lock(&imp->imp_lock);
1296                                 list_add_tail(&req->rq_list,
1297                                               &imp->imp_delayed_list);
1298                                 spin_unlock(&imp->imp_lock);
1299                                 continue;
1300                         }
1301
1302                         /* If there is no bulk associated with this request,
1303                          * then we're done and should let the interpreter
1304                          * process the reply.  Similarly if the RPC returned
1305                          * an error, and therefore the bulk will never arrive.
1306                          */
1307                         if (req->rq_bulk == NULL || req->rq_status != 0) {
1308                                 req->rq_phase = RQ_PHASE_INTERPRET;
1309                                 GOTO(interpret, req->rq_status);
1310                         }
1311
1312                         req->rq_phase = RQ_PHASE_BULK;
1313                 }
1314
1315                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
1316                 if (ptlrpc_bulk_active(req->rq_bulk))
1317                         continue;
1318
1319                 if (!req->rq_bulk->bd_success) {
1320                         /* The RPC reply arrived OK, but the bulk screwed
1321                          * up!  Dead weird, since the server told us the RPC
1322                          * was good after getting the REPLY for its GET or
1323                          * the ACK for its PUT. */
1324                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1325                         LBUG();
1326                 }
1327
1328                 req->rq_phase = RQ_PHASE_INTERPRET;
1329
1330         interpret:
1331                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1332                 LASSERT(!req->rq_receiving_reply);
1333
1334                 ptlrpc_unregister_reply(req);
1335                 if (req->rq_bulk != NULL)
1336                         ptlrpc_unregister_bulk (req);
1337
1338                 if (req->rq_interpret_reply != NULL) {
1339                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
1340                                 req->rq_interpret_reply;
1341                         req->rq_status = interpreter(req, &req->rq_async_args,
1342                                                      req->rq_status);
1343                 }
1344                 req->rq_phase = RQ_PHASE_COMPLETE;
1345
1346                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
1347                        "opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
1348                        imp->imp_obd->obd_uuid.uuid,
1349                        lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1350                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1351                        lustre_msg_get_opc(req->rq_reqmsg));
1352
1353                 atomic_dec(&imp->imp_inflight);
1354                 set->set_remaining--;
1355                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1356         }
1357
1358         /* If we hit an error, we want to recover promptly. */
1359         RETURN(set->set_remaining == 0 || force_timer_recalc);
1360 }
1361
1362 /* Return 1 if we should give up, else 0 */
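/*
 * Marks the request timed out and unregisters its reply and bulk buffers.
 * Requests that cannot or should not be retried (fake DLM imports, no-resend
 * requests, sends made outside LUSTRE_IMP_FULL) tell the waiter to give up;
 * in the recoverable cases ptlrpc_fail_import() kicks off reconnection so the
 * request can be resent once the import is back.
 */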
1363 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
1364 {
1365         struct obd_import *imp = req->rq_import;
1366         int rc = 0;
1367         ENTRY;
1368
1369         DEBUG_REQ(D_ERROR|D_NETERROR, req,
1370                   "%s (sent at %lu, "CFS_DURATION_T"s ago)",
1371                   req->rq_net_err ? "network error" : "timeout",
1372                   (long)req->rq_sent, cfs_time_current_sec() - req->rq_sent);
1373
1374         if (imp) {
1375                 LCONSOLE_WARN("Request x"LPU64" sent from %s to NID %s %lus ago"
1376                               " has timed out (limit %lus).\n", req->rq_xid,
1377                               req->rq_import->imp_obd->obd_name,
1378                               libcfs_nid2str(imp->imp_connection->c_peer.nid),
1379                               cfs_time_current_sec() - req->rq_sent,
1380                               req->rq_deadline - req->rq_sent);
1381         }
1382
1383         if (imp != NULL && obd_debug_peer_on_timeout)
1384                 LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
1385
1386         spin_lock(&req->rq_lock);
1387         req->rq_timedout = 1;
1388         req->rq_wait_ctx = 0;
1389         spin_unlock(&req->rq_lock);
1390
1391         ptlrpc_unregister_reply (req);
1392
1393         if (obd_dump_on_timeout)
1394                 libcfs_debug_dumplog();
1395
1396         if (req->rq_bulk != NULL)
1397                 ptlrpc_unregister_bulk (req);
1398
1399         if (imp == NULL) {
1400                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1401                 RETURN(1);
1402         }
1403
1404         /* The DLM server doesn't want recovery run on its imports. */
1405         if (imp->imp_dlm_fake)
1406                 RETURN(1);
1407
1408         /* If this request is for recovery or other primordial tasks,
1409          * then error it out here. */
1410         if (req->rq_ctx_init || req->rq_ctx_fini ||
1411             req->rq_send_state != LUSTRE_IMP_FULL ||
1412             imp->imp_obd->obd_no_recov) {
1413                 DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1414                           ptlrpc_import_state_name(req->rq_send_state),
1415                           ptlrpc_import_state_name(imp->imp_state));
1416                 spin_lock(&req->rq_lock);
1417                 req->rq_status = -ETIMEDOUT;
1418                 req->rq_err = 1;
1419                 spin_unlock(&req->rq_lock);
1420                 RETURN(1);
1421         }
1422
1423         /* If a request can't be resent, we can't wait for an answer after
1424            the timeout. */
1425         if (req->rq_no_resend) {
1426                 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1427                 rc = 1;
1428         }
1429
1430         ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1431
1432         RETURN(rc);
1433 }
1434
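/* Timeout callback handed to l_wait_event() by ptlrpc_set_wait(): expire
 * every in-flight request in the set whose deadline has passed, and return 1
 * so the waiter always wakes up to recalculate its timeout. */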
1435 int ptlrpc_expired_set(void *data)
1436 {
1437         struct ptlrpc_request_set *set = data;
1438         struct list_head          *tmp;
1439         time_t                     now = cfs_time_current_sec();
1440         ENTRY;
1441
1442         LASSERT(set != NULL);
1443
1444         /* A timeout expired; see which reqs it applies to... */
1445         list_for_each (tmp, &set->set_requests) {
1446                 struct ptlrpc_request *req =
1447                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1448
1449                 /* request in-flight? */
1450                 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting &&
1451                        !req->rq_resend) ||
1452                       (req->rq_phase == RQ_PHASE_BULK)))
1453                         continue;
1454
1455                 if (req->rq_timedout ||           /* already dealt with */
1456                     req->rq_deadline > now)       /* not expired */
1457                         continue;
1458
1459                 /* deal with this guy */
1460                 ptlrpc_expire_one_request (req);
1461         }
1462
1463         /* When waiting for a whole set, we always want to break out of the
1464          * sleep so we can recalculate the timeout, or enable interrupts
1465          * iff everyone's timed out.
1466          */
1467         RETURN(1);
1468 }
1469
1470 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
1471 {
1472         spin_lock(&req->rq_lock);
1473         req->rq_intr = 1;
1474         spin_unlock(&req->rq_lock);
1475 }
1476
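/* Interrupt callback handed to l_wait_event() by ptlrpc_set_wait(): flag
 * every request still in the RPC phase as interrupted so the next check of
 * the set can complete it. */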
1477 void ptlrpc_interrupted_set(void *data)
1478 {
1479         struct ptlrpc_request_set *set = data;
1480         struct list_head *tmp;
1481
1482         LASSERT(set != NULL);
1483         CERROR("INTERRUPTED SET %p\n", set);
1484
1485         list_for_each(tmp, &set->set_requests) {
1486                 struct ptlrpc_request *req =
1487                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1488
1489                 if (req->rq_phase != RQ_PHASE_RPC)
1490                         continue;
1491
1492                 ptlrpc_mark_interrupted(req);
1493         }
1494 }
1495
1496 /* get the smallest timeout in the set; this does NOT set a timeout. */
1497 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
1498 {
1499         struct list_head      *tmp;
1500         time_t                 now = cfs_time_current_sec();
1501         int                    timeout = 0;
1502         struct ptlrpc_request *req;
1503         int                    deadline;
1504         ENTRY;
1505
1506         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1507
1508         list_for_each(tmp, &set->set_requests) {
1509                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1510
1511                 /* request in-flight? */
1512                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
1513                       (req->rq_phase == RQ_PHASE_BULK) ||
1514                       (req->rq_phase == RQ_PHASE_NEW)))
1515                         continue;
1516
1517                 if (req->rq_timedout)   /* already timed out */
1518                         continue;
1519
1520                 if (req->rq_phase == RQ_PHASE_NEW)
1521                         deadline = req->rq_sent;
1522                 else
1523                         deadline = req->rq_sent + req->rq_timeout;
1524
1525                 if (deadline <= now)    /* actually expired already */
1526                         timeout = 1;    /* ASAP */
1527                 else if (timeout == 0 || timeout > deadline - now)
1528                         timeout = deadline - now;
1529         }
1530         RETURN(timeout);
1531 }
1532
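/* Send all the new requests in a set and sleep until every one of them has
 * completed (or been interrupted or timed out), then return the aggregate
 * status via the set interpreter or the registered completion callbacks.
 *
 * A minimal caller sketch, assuming the usual set helpers exported by this
 * subsystem (ptlrpc_prep_set(), ptlrpc_set_add_req(), ptlrpc_set_destroy()):
 *
 *      set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *      ptlrpc_set_add_req(set, req);
 *      rc = ptlrpc_set_wait(set);
 *      ptlrpc_set_destroy(set);
 */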
1533 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
1534 {
1535         struct list_head      *tmp;
1536         struct ptlrpc_request *req;
1537         struct l_wait_info     lwi;
1538         int                    rc, timeout;
1539         ENTRY;
1540
1541         if (list_empty(&set->set_requests))
1542                 RETURN(0);
1543
1544         list_for_each(tmp, &set->set_requests) {
1545                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1546                 if (req->rq_phase == RQ_PHASE_NEW)
1547                         (void)ptlrpc_send_new_req(req);
1548         }
1549
1550         do {
1551                 timeout = ptlrpc_set_next_timeout(set);
1552
1553                 /* wait until all complete, interrupted, or an in-flight
1554                  * req times out */
1555                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
1556                        set, timeout);
1557                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
1558                                        ptlrpc_expired_set,
1559                                        ptlrpc_interrupted_set, set);
1560                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
1561
1562                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
1563
1564                 /* -EINTR => all requests have been flagged rq_intr so next
1565                  * check completes.
1566                  * -ETIMEDOUT => someone timed out.  When all reqs have
1567                  * timed out, signals are enabled allowing completion with
1568                  * EINTR.
1569                  * I don't really care if we go once more round the loop in
1570                  * the error cases -eeb. */
1571         } while (rc != 0 || set->set_remaining != 0);
1572
1573         LASSERT(set->set_remaining == 0);
1574
1575         rc = 0;
1576         list_for_each(tmp, &set->set_requests) {
1577                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1578
1579                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
1580                 if (req->rq_status != 0)
1581                         rc = req->rq_status;
1582         }
1583
1584         if (set->set_interpret != NULL) {
1585                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1586                         set->set_interpret;
1587                 rc = interpreter (set, set->set_arg, rc);
1588         } else {
1589                 struct ptlrpc_set_cbdata *cbdata, *n;
1590                 int err;
1591
1592                 list_for_each_entry_safe(cbdata, n,
1593                                          &set->set_cblist, psc_item) {
1594                         list_del_init(&cbdata->psc_item);
1595                         err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
1596                         if (err && !rc)
1597                                 rc = err;
1598                         OBD_FREE_PTR(cbdata);
1599                 }
1600         }
1601
1602         RETURN(rc);
1603 }
1604
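/* Release everything still attached to a request (request/reply buffers,
 * bulk descriptor, import and export references, security context) and free
 * the request itself, either back to its pool or to the allocator.  'locked'
 * says whether the caller already holds rq_import->imp_lock. */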
1605 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1606 {
1607         ENTRY;
1608         if (request == NULL) {
1609                 EXIT;
1610                 return;
1611         }
1612
1613         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1614         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1615         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1616         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1617         LASSERT(request->rq_cli_ctx);
1618
1619         req_capsule_fini(&request->rq_pill);
1620
1621         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1622          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1623         if (request->rq_import != NULL) {
1624                 if (!locked)
1625                         spin_lock(&request->rq_import->imp_lock);
1626                 list_del_init(&request->rq_mod_list);
1627                 list_del_init(&request->rq_replay_list);
1628                 if (!locked)
1629                         spin_unlock(&request->rq_import->imp_lock);
1630         }
1631         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1632
1633         if (atomic_read(&request->rq_refcount) != 0) {
1634                 DEBUG_REQ(D_ERROR, request,
1635                           "freeing request with nonzero refcount");
1636                 LBUG();
1637         }
1638
1639         if (request->rq_repbuf != NULL)
1640                 sptlrpc_cli_free_repbuf(request);
1641         if (request->rq_export != NULL) {
1642                 class_export_put(request->rq_export);
1643                 request->rq_export = NULL;
1644         }
1645         if (request->rq_import != NULL) {
1646                 class_import_put(request->rq_import);
1647                 request->rq_import = NULL;
1648         }
1649         if (request->rq_bulk != NULL)
1650                 ptlrpc_free_bulk(request->rq_bulk);
1651
1652         if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
1653                 sptlrpc_cli_free_reqbuf(request);
1654
1655         sptlrpc_req_put_ctx(request, !locked);
1656
1657         if (request->rq_pool)
1658                 __ptlrpc_free_req_to_pool(request);
1659         else
1660                 OBD_FREE(request, sizeof(*request));
1661         EXIT;
1662 }
1663
1664 void ptlrpc_free_req(struct ptlrpc_request *request)
1665 {
1666         __ptlrpc_free_req(request, 0);
1667 }
1668
1669 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1670 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1671 {
1672         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1673         (void)__ptlrpc_req_finished(request, 1);
1674 }
1675
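/* Drop one reference on a request, freeing it when the refcount reaches
 * zero.  Returns 1 if the request is gone (freed, NULL or poisoned), 0 if
 * references remain. */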
1676 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1677 {
1678         ENTRY;
1679         if (request == NULL)
1680                 RETURN(1);
1681
1682         if (request == LP_POISON ||
1683             request->rq_reqmsg == LP_POISON) {
1684                 CERROR("dereferencing freed request (bug 575)\n");
1685                 LBUG();
1686                 RETURN(1);
1687         }
1688
1689         DEBUG_REQ(D_INFO, request, "refcount now %u",
1690                   atomic_read(&request->rq_refcount) - 1);
1691
1692         if (atomic_dec_and_test(&request->rq_refcount)) {
1693                 __ptlrpc_free_req(request, locked);
1694                 RETURN(1);
1695         }
1696
1697         RETURN(0);
1698 }
1699
1700 void ptlrpc_req_finished(struct ptlrpc_request *request)
1701 {
1702         __ptlrpc_req_finished(request, 0);
1703 }
1704
1705 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
1706 {
1707         return request->rq_xid;
1708 }
1709 EXPORT_SYMBOL(ptlrpc_req_xid);
1710
1711 /* Disengage the client's reply buffer from the network
1712  * NB does _NOT_ unregister any client-side bulk.
1713  * IDEMPOTENT, but _not_ safe against concurrent callers.
1714  * The request owner (i.e. the thread doing the I/O) must call...
1715  */
1716 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1717 {
1718         int                rc;
1719         cfs_waitq_t       *wq;
1720         struct l_wait_info lwi;
1721
1722         LASSERT(!in_interrupt ());             /* might sleep */
1723         if (!ptlrpc_client_recv_or_unlink(request))
1724                 /* Nothing left to do */
1725                 return;
1726
1727         LNetMDUnlink (request->rq_reply_md_h);
1728
1729         /* We have to l_wait_event() whatever the result, to give liblustre
1730          * a chance to run reply_in_callback(), and to make sure we've
1731          * unlinked before returning a req to the pool */
1732
1733         if (request->rq_set != NULL)
1734                 wq = &request->rq_set->set_waitq;
1735         else
1736                 wq = &request->rq_reply_waitq;
1737
1738         for (;;) {
1739                 /* Network access will complete in finite time but the HUGE
1740                  * timeout lets us CWARN for visibility of sluggish NALs */
1741                 lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
1742                 rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
1743                                    &lwi);
1744                 if (rc == 0)
1745                         return;
1746
1747                 LASSERT (rc == -ETIMEDOUT);
1748                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
1749                           "rvcng=%d unlnk=%d", request->rq_receiving_reply,
1750                           request->rq_must_unlink);
1751         }
1752 }
1753
1754 /* caller must hold imp->imp_lock */
1755 void ptlrpc_free_committed(struct obd_import *imp)
1756 {
1757         struct list_head *tmp, *saved;
1758         struct ptlrpc_request *req;
1759         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1760         ENTRY;
1761
1762         LASSERT(imp != NULL);
1763
1764         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1765
1767         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
1768             imp->imp_generation == imp->imp_last_generation_checked) {
1769                 CDEBUG(D_RPCTRACE, "%s: skip recheck: last_committed "LPU64"\n",
1770                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1771                 EXIT;
1772                 return;
1773         }
1774
1775         CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
1776                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
1777                imp->imp_generation);
1778         imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
1779         imp->imp_last_generation_checked = imp->imp_generation;
1780
1781         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1782                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1783
1784                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1785                 LASSERT(req != last_req);
1786                 last_req = req;
1787
1788                 if (req->rq_import_generation < imp->imp_generation) {
1789                         DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
1790                         GOTO(free_req, 0);
1791                 }
1792
1793                 if (req->rq_replay) {
1794                         DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
1795                         continue;
1796                 }
1797
1798                 /* not yet committed */
1799                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1800                         DEBUG_REQ(D_RPCTRACE, req, "stopping search");
1801                         break;
1802                 }
1803
1804                 DEBUG_REQ(D_RPCTRACE, req, "commit (last_committed "LPU64")",
1805                           imp->imp_peer_committed_transno);
1806 free_req:
1807                 spin_lock(&req->rq_lock);
1808                 req->rq_replay = 0;
1809                 spin_unlock(&req->rq_lock);
1810                 if (req->rq_commit_cb != NULL)
1811                         req->rq_commit_cb(req);
1812                 list_del_init(&req->rq_replay_list);
1813                 __ptlrpc_req_finished(req, 1);
1814         }
1815
1816         EXIT;
1817         return;
1818 }
1819
1820 void ptlrpc_cleanup_client(struct obd_import *imp)
1821 {
1822         ENTRY;
1823         EXIT;
1824         return;
1825 }
1826
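/* Flag a request for resend.  If a bulk is attached, move the request to a
 * fresh xid first so any bulk registered under the old xid can never
 * match. */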
1827 void ptlrpc_resend_req(struct ptlrpc_request *req)
1828 {
1829         DEBUG_REQ(D_HA, req, "going to resend");
1830         lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
1831         req->rq_status = -EAGAIN;
1832
1833         spin_lock(&req->rq_lock);
1834         req->rq_resend = 1;
1835         req->rq_net_err = 0;
1836         req->rq_timedout = 0;
1837         if (req->rq_bulk) {
1838                 __u64 old_xid = req->rq_xid;
1839
1840                 /* ensure previous bulk fails */
1841                 req->rq_xid = ptlrpc_next_xid();
1842                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1843                        old_xid, req->rq_xid);
1844         }
1845         ptlrpc_wake_client_req(req);
1846         spin_unlock(&req->rq_lock);
1847 }
1848
1849 /* XXX: this function and rq_status are currently unused */
1850 void ptlrpc_restart_req(struct ptlrpc_request *req)
1851 {
1852         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1853         req->rq_status = -ERESTARTSYS;
1854
1855         spin_lock(&req->rq_lock);
1856         req->rq_restart = 1;
1857         req->rq_timedout = 0;
1858         ptlrpc_wake_client_req(req);
1859         spin_unlock(&req->rq_lock);
1860 }
1861
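/* Timeout callback used by ptlrpc_queue_wait(): ignore the expiry while
 * timeouts are suspended or while an early reply has pushed the deadline
 * out, otherwise expire this one request. */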
1862 static int expired_request(void *data)
1863 {
1864         struct ptlrpc_request *req = data;
1865         ENTRY;
1866
1867         /* some failure can suspend regular timeouts */
1868         if (ptlrpc_check_suspend())
1869                 RETURN(1);
1870
1871         /* deadline may have changed with an early reply */
1872         if (req->rq_deadline > cfs_time_current_sec())
1873                 RETURN(1);
1874
1875         RETURN(ptlrpc_expire_one_request(req));
1876 }
1877
1878 static void interrupted_request(void *data)
1879 {
1880         struct ptlrpc_request *req = data;
1881         DEBUG_REQ(D_HA, req, "request interrupted");
1882         spin_lock(&req->rq_lock);
1883         req->rq_intr = 1;
1884         spin_unlock(&req->rq_lock);
1885 }
1886
1887 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1888 {
1889         ENTRY;
1890         atomic_inc(&req->rq_refcount);
1891         RETURN(req);
1892 }
1893
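/* Add a replayable request to the import's replay list, which is kept sorted
 * by transno, with the xid as a secondary key for duplicate transnos. */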
1894 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1895                                       struct obd_import *imp)
1896 {
1897         struct list_head *tmp;
1898
1899         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1900
1901         /* clear this for new requests that were resent as well
1902            as resent replayed requests. */
1903         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1904
1905         /* don't re-add requests that have been replayed */
1906         if (!list_empty(&req->rq_replay_list))
1907                 return;
1908
1909         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
1910
1911         LASSERT(imp->imp_replayable);
1912         /* Balanced in ptlrpc_free_committed, usually. */
1913         ptlrpc_request_addref(req);
1914         list_for_each_prev(tmp, &imp->imp_replay_list) {
1915                 struct ptlrpc_request *iter =
1916                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1917
1918                 /* We may have duplicate transnos if we create and then
1919                  * open a file, or for closes retained to match creating
1920                  * opens, so use req->rq_xid as a secondary key.
1921                  * (See bugs 684, 685, and 428.)
1922                  * XXX no longer needed, but all opens need transnos!
1923                  */
1924                 if (iter->rq_transno > req->rq_transno)
1925                         continue;
1926
1927                 if (iter->rq_transno == req->rq_transno) {
1928                         LASSERT(iter->rq_xid != req->rq_xid);
1929                         if (iter->rq_xid > req->rq_xid)
1930                                 continue;
1931                 }
1932
1933                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1934                 return;
1935         }
1936
1937         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1938 }
1939
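/* Send a single request synchronously, outside of any request set: wait for
 * the import to reach the right state, (re)send the request, sleep until the
 * reply and any bulk transfer complete, and return the final status. */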
1940 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1941 {
1942         int rc = 0;
1943         int brc;
1944         struct l_wait_info lwi;
1945         struct obd_import *imp = req->rq_import;
1946         cfs_duration_t timeout = CFS_TICK;
1947         long timeoutl;
1948         ENTRY;
1949
1950         LASSERT(req->rq_set == NULL);
1951         LASSERT(!req->rq_receiving_reply);
1952         atomic_inc(&imp->imp_inflight);
1953
1954         /* for distributed debugging */
1955         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
1956         LASSERT(imp->imp_obd != NULL);
1957         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
1958                "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
1959                imp->imp_obd->obd_uuid.uuid,
1960                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1961                libcfs_nid2str(imp->imp_connection->c_peer.nid),
1962                lustre_msg_get_opc(req->rq_reqmsg));
1963
1964         /* Mark phase here for a little debug help */
1965         req->rq_phase = RQ_PHASE_RPC;
1966
1967         spin_lock(&imp->imp_lock);
1968         req->rq_import_generation = imp->imp_generation;
1969 restart:
1970         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1971                 list_del(&req->rq_list);
1972
1973                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1974                 spin_unlock(&imp->imp_lock);
1975
1976                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1977                           cfs_curproc_comm(),
1978                           ptlrpc_import_state_name(req->rq_send_state),
1979                           ptlrpc_import_state_name(imp->imp_state));
1980                 lwi = LWI_INTR(interrupted_request, req);
1981                 rc = l_wait_event(req->rq_reply_waitq,
1982                                   (req->rq_send_state == imp->imp_state ||
1983                                    req->rq_err || req->rq_intr),
1984                                   &lwi);
1985                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
1986                           cfs_curproc_comm(),
1987                           ptlrpc_import_state_name(imp->imp_state),
1988                           ptlrpc_import_state_name(req->rq_send_state),
1989                           req->rq_err, req->rq_intr);
1990
1991                 spin_lock(&imp->imp_lock);
1992                 list_del_init(&req->rq_list);
1993
1994                 if (req->rq_err) {
1995                         /* rq_status was set locally */
1996                         rc = -EIO;
1997                 }
1998                 else if (req->rq_intr) {
1999                         rc = -EINTR;
2000                 }
2001                 else if (req->rq_no_resend) {
2002                         spin_unlock(&imp->imp_lock);
2003                         GOTO(out, rc = -ETIMEDOUT);
2004                 }
2005                 else {
2006                         GOTO(restart, rc);
2007                 }
2008         }
2009
2010         if (rc != 0) {
2011                 list_del_init(&req->rq_list);
2012                 spin_unlock(&imp->imp_lock);
2013                 req->rq_status = rc; // XXX this ok?
2014                 GOTO(out, rc);
2015         }
2016
2017         if (req->rq_resend) {
2018                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
2019
2020                 if (req->rq_bulk != NULL) {
2021                         ptlrpc_unregister_bulk (req);
2022
2023                         /* bulk requests are supposed to be
2024                          * idempotent, so we are free to bump the xid
2025                          * here, which we need to do before
2026                          * registering the bulk again (bug 6371).
2027                          * print the old xid first for sanity.
2028                          */
2029                         DEBUG_REQ(D_HA, req, "bumping xid for bulk: ");
2030                         req->rq_xid = ptlrpc_next_xid();
2031                 }
2032
2033                 DEBUG_REQ(D_HA, req, "resending: ");
2034         }
2035
2036         /* XXX this is the same as ptlrpc_set_wait */
2037         LASSERT(list_empty(&req->rq_list));
2038         list_add_tail(&req->rq_list, &imp->imp_sending_list);
2039         spin_unlock(&imp->imp_lock);
2040
2041         rc = sptlrpc_req_refresh_ctx(req, 0);
2042         if (rc) {
2043                 if (req->rq_err) {
2044                         /* we got fatal ctx refresh error, directly jump out
2045                          * thus we can pass back the actual error code.
2046                          */
2047                         spin_lock(&imp->imp_lock);
2048                         list_del_init(&req->rq_list);
2049                         spin_unlock(&imp->imp_lock);
2050
2051                         CERROR("Failed to refresh ctx of req %p: %d\n", req, rc);
2052                         GOTO(out, rc);
2053                 }
2054                 /* simulate that we got an error while sending the RPC */
2055                 goto after_send;
2056         }
2057
2058         rc = ptl_send_rpc(req, 0);
2059         if (rc)
2060                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
2061
2062 repeat:
2063         timeoutl = req->rq_deadline - cfs_time_current_sec();
2064         timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
2065                 cfs_time_seconds(timeoutl);
2066         DEBUG_REQ(D_NET, req,
2067                   "-- sleeping for "CFS_DURATION_T" ticks", timeout);
2068         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
2069                                req);
2070         rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
2071         if (rc == -ETIMEDOUT && ((req->rq_deadline > cfs_time_current_sec()) ||
2072                                  ptlrpc_check_and_wait_suspend(req)))
2073                 goto repeat;
2074
2075 after_send:
2076         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
2077                "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
2078                imp->imp_obd->obd_uuid.uuid,
2079                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
2080                libcfs_nid2str(imp->imp_connection->c_peer.nid),
2081                lustre_msg_get_opc(req->rq_reqmsg));
2082
2083         spin_lock(&imp->imp_lock);
2084         list_del_init(&req->rq_list);
2085         spin_unlock(&imp->imp_lock);
2086
2087         /* If the reply was received normally, this just grabs the spinlock
2088          * (ensuring the reply callback has returned), sees that
2089          * req->rq_receiving_reply is clear and returns. */
2090         ptlrpc_unregister_reply (req);
2091
2093         if (req->rq_err) {
2094                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
2095                           rc, req->rq_status);
2096                 GOTO(out, rc = -EIO);
2097         }
2098
2099         if (req->rq_intr) {
2100                 /* Should only be interrupted if we timed out. */
2101                 if (!req->rq_timedout)
2102                         DEBUG_REQ(D_ERROR, req,
2103                                   "rq_intr set but rq_timedout not");
2104                 GOTO(out, rc = -EINTR);
2105         }
2106
2107         /* Resend if we need to */
2108         if (req->rq_resend) {
2109                 /* ...unless we were specifically told otherwise. */
2110                 if (req->rq_no_resend)
2111                         GOTO(out, rc = -ETIMEDOUT);
2112                 spin_lock(&imp->imp_lock);
2113                 goto restart;
2114         }
2115
2116         if (req->rq_timedout) {                 /* non-recoverable timeout */
2117                 GOTO(out, rc = -ETIMEDOUT);
2118         }
2119
2120         if (!req->rq_replied) {
2121                 /* How can this be? -eeb */
2122                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
2123                 LBUG();
2124                 GOTO(out, rc = req->rq_status);
2125         }
2126
2127         rc = after_reply(req);
2128         /* NB may return +ve success rc */
2129         if (req->rq_resend) {
2130                 spin_lock(&imp->imp_lock);
2131                 goto restart;
2132         }
2133
2134  out:
2135         if (req->rq_bulk != NULL) {
2136                 if (rc >= 0) {
2137                         /* success so far.  Note that anything going wrong
2138                          * with bulk now is EXTREMELY strange, since the
2139                          * server must have believed that the bulk
2140                          * transferred OK before she replied with success to
2141                          * me. */
2142                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
2143                         brc = l_wait_event(req->rq_reply_waitq,
2144                                            !ptlrpc_bulk_active(req->rq_bulk),
2145                                            &lwi);
2146                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
2147                         if (brc != 0) {
2148                                 LASSERT(brc == -ETIMEDOUT);
2149                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
2150                                 rc = brc;
2151                         } else if (!req->rq_bulk->bd_success) {
2152                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
2153                                 rc = -EIO;
2154                         }
2155                 }
2156                 if (rc < 0)
2157                         ptlrpc_unregister_bulk (req);
2158         }
2159
2160         LASSERT(!req->rq_receiving_reply);
2161         req->rq_phase = RQ_PHASE_INTERPRET;
2162
2163         atomic_dec(&imp->imp_inflight);
2164         cfs_waitq_signal(&imp->imp_recovery_waitq);
2165         RETURN(rc);
2166 }
2167
2168 struct ptlrpc_replay_async_args {
2169         int praa_old_state;
2170         int praa_old_status;
2171 };
2172
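/* Reply interpreter for replayed requests: sanity-check the replay reply,
 * restore the saved send state and status, record the replayed transno and
 * push the import recovery state machine along. */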
2173 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
2174                                     void * data, int rc)
2175 {
2176         struct ptlrpc_replay_async_args *aa = data;
2177         struct obd_import *imp = req->rq_import;
2178
2179         ENTRY;
2180         atomic_dec(&imp->imp_replay_inflight);
2181
2182         if (!req->rq_replied) {
2183                 CERROR("request replay timed out, restarting recovery\n");
2184                 GOTO(out, rc = -ETIMEDOUT);
2185         }
2186
2187         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
2188             (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
2189              lustre_msg_get_status(req->rq_repmsg) == -ENODEV))
2190                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
2191
2192         /* The transno had better not change over replay. */
2193         LASSERT(lustre_msg_get_transno(req->rq_reqmsg) ==
2194                 lustre_msg_get_transno(req->rq_repmsg));
2195
2196         DEBUG_REQ(D_HA, req, "got rep");
2197
2198         /* let the callback do fixups, possibly including changes to the request itself */
2199         if (req->rq_replay_cb)
2200                 req->rq_replay_cb(req);
2201
2202         if (req->rq_replied &&
2203             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
2204                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
2205                           lustre_msg_get_status(req->rq_repmsg),
2206                           aa->praa_old_status);
2207         } else {
2208                 /* Put it back for re-replay. */
2209                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2210         }
2211
2212         /*
2213          * Errors during replay can set transno to 0, but
2214          * imp_last_replay_transno shouldn't be set to 0 anyway
2215          */
2216         if (req->rq_transno > 0) {
2217                 spin_lock(&imp->imp_lock);
2218                 LASSERT(req->rq_transno <= imp->imp_last_replay_transno);
2219                 imp->imp_last_replay_transno = req->rq_transno;
2220                 spin_unlock(&imp->imp_lock);
2221         } else
2222                 CERROR("Transno is 0 during replay!\n");
2223         /* continue with recovery */
2224         rc = ptlrpc_import_recovery_state_machine(imp);
2225  out:
2226         req->rq_send_state = aa->praa_old_state;
2227
2228         if (rc != 0)
2229                 /* this replay failed, so restart recovery */
2230                 ptlrpc_connect_import(imp, NULL);
2231
2232         RETURN(rc);
2233 }
2234
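/* Queue a request for replay through ptlrpcd, saving the current send state
 * and reply status so ptlrpc_replay_interpret() can restore them when the
 * replay completes. */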
2235 int ptlrpc_replay_req(struct ptlrpc_request *req)
2236 {
2237         struct ptlrpc_replay_async_args *aa;
2238         ENTRY;
2239
2240         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
2241         /* Not handling automatic bulk replay yet (or ever?) */
2242         LASSERT(req->rq_bulk == NULL);
2243
2244         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
2245         aa = ptlrpc_req_async_args(req);
2246         memset(aa, 0, sizeof *aa);
2247
2248         /* Prepare request to be resent with ptlrpcd */
2249         aa->praa_old_state = req->rq_send_state;
2250         req->rq_send_state = LUSTRE_IMP_REPLAY;
2251         req->rq_phase = RQ_PHASE_NEW;
2252         if (req->rq_repmsg)
2253                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2254         req->rq_status = 0;
2255         req->rq_interpret_reply = ptlrpc_replay_interpret;
2256         /* Readjust the timeout for current conditions */
2257         ptlrpc_at_set_req_timeout(req);
2258
2259         DEBUG_REQ(D_HA, req, "REPLAY");
2260
2261         atomic_inc(&req->rq_import->imp_replay_inflight);
2262         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2263
2264         ptlrpcd_add_req(req);
2265         RETURN(0);
2266 }
2267
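/* Error out every request on the import's sending and delayed lists whose
 * generation predates the current import generation, waking their waiters,
 * then give ptlrpc_free_committed() a last chance to reap the replay
 * list. */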
2268 void ptlrpc_abort_inflight(struct obd_import *imp)
2269 {
2270         struct list_head *tmp, *n;
2271         ENTRY;
2272
2273         /* Make sure that no new requests get processed for this import.
2274          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2275          * this flag and then putting requests on sending_list or delayed_list.
2276          */
2277         spin_lock(&imp->imp_lock);
2278
2279         /* XXX locking?  Maybe we should remove each request with the list
2280          * locked?  Also, how do we know if the requests on the list are
2281          * being freed at this time?
2282          */
2283         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2284                 struct ptlrpc_request *req =
2285                         list_entry(tmp, struct ptlrpc_request, rq_list);
2286
2287                 DEBUG_REQ(D_RPCTRACE, req, "inflight");
2288
2289                 spin_lock (&req->rq_lock);
2290                 if (req->rq_import_generation < imp->imp_generation) {
2291                         req->rq_err = 1;
2292                         req->rq_status = -EINTR;
2293                         ptlrpc_wake_client_req(req);
2294                 }
2295                 spin_unlock (&req->rq_lock);
2296         }
2297
2298         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2299                 struct ptlrpc_request *req =
2300                         list_entry(tmp, struct ptlrpc_request, rq_list);
2301
2302                 DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2303
2304                 spin_lock (&req->rq_lock);
2305                 if (req->rq_import_generation < imp->imp_generation) {
2306                         req->rq_err = 1;
2307                         req->rq_status = -EINTR;
2308                         ptlrpc_wake_client_req(req);
2309                 }
2310                 spin_unlock (&req->rq_lock);
2311         }
2312
2313         /* Last chance to free reqs left on the replay list, but we
2314          * will still leak reqs that haven't committed.  */
2315         if (imp->imp_replayable)
2316                 ptlrpc_free_committed(imp);
2317
2318         spin_unlock(&imp->imp_lock);
2319
2320         EXIT;
2321 }
2322
2323 static __u64 ptlrpc_last_xid = 0;
2324 spinlock_t ptlrpc_last_xid_lock;
2325
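/* XIDs come from a single locked 64-bit counter: ptlrpc_next_xid() hands out
 * the next value, ptlrpc_sample_next_xid() only peeks at what it would be. */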
2326 __u64 ptlrpc_next_xid(void)
2327 {
2328         __u64 tmp;
2329         spin_lock(&ptlrpc_last_xid_lock);
2330         tmp = ++ptlrpc_last_xid;
2331         spin_unlock(&ptlrpc_last_xid_lock);
2332         return tmp;
2333 }
2334
2335 __u64 ptlrpc_sample_next_xid(void)
2336 {
2337         __u64 tmp;
2338         spin_lock(&ptlrpc_last_xid_lock);
2339         tmp = ptlrpc_last_xid + 1;
2340         spin_unlock(&ptlrpc_last_xid_lock);
2341         return tmp;
2342 }
2343 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
2344