Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
/* Per-request arguments that ptlrpc_connect_import() stores in the connect
 * request's async-args area (see ptlrpc_req_async_args() there). */
56 struct ptlrpc_connect_async_args {
         /* imp_peer_committed_transno as sampled before a reconnect; stays 0
          * when the request is an initial connect */
57          __u64 pcaa_peer_committed;
         /* non-zero when the import had no remote handle in use at the time
          * the request was built, i.e. this is an initial connect */
58         int pcaa_initial_connect;
59 };
60
/* A CLOSED import should remain so.
 *
 * IMPORT_SET_STATE_NOLOCK() logs and applies a state transition unless the
 * import is already LUSTRE_IMP_CLOSED, in which case it does nothing.  It
 * does not take imp_lock itself; IMPORT_SET_STATE() below is the variant
 * that wraps the update in imp_lock.
 *
 * Every use of a macro parameter is parenthesized so that expressions such
 * as "p + 1" or "cond ? a : b" expand correctly.  The arguments are still
 * evaluated more than once, so do not pass expressions with side effects. */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)

/* Same as IMPORT_SET_STATE_NOLOCK(), but performed under imp_lock. */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while (0)
79
80
/* Forward declarations.  ptlrpc_connect_interpret() is defined later in this
 * file and is installed as the rq_interpret_reply callback of connect
 * requests built by ptlrpc_connect_import(). */
81 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
82                                     void * data, int rc);
83 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
84
85 /* Only this function is allowed to change the import state when it is
86  * CLOSED. I would rather refcount the import and free it after
87  * disconnection like we do with exports. To do that, the client_obd
88  * will need to save the peer info somewhere other than in the import,
89  * though. */
90 int ptlrpc_init_import(struct obd_import *imp)
91 {
92         spin_lock(&imp->imp_lock);
93
94         imp->imp_generation++;
95         imp->imp_state =  LUSTRE_IMP_NEW;
96
97         spin_unlock(&imp->imp_lock);
98
99         return 0;
100 }
101 EXPORT_SYMBOL(ptlrpc_init_import);
102
#define UUID_STR "_UUID"
/*
 * Find the printable core of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated string to examine
 * @prefix     - optional leading text to skip; may be NULL.  It is skipped
 *               only when @uuid actually starts with it.
 * @uuid_start - receives a pointer into @uuid just past the skipped prefix
 *               (or @uuid itself when nothing was skipped)
 * @uuid_len   - receives the number of characters from *@uuid_start up to,
 *               but not including, a trailing "_UUID" suffix if one is
 *               present; otherwise the full remaining length
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *start = uuid;
        int len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);
        *uuid_start = start;
        *uuid_len = len;

        /* Too short to carry the suffix at all. */
        if ((size_t)len < suffix_len)
                return;

        if (strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                *uuid_len = len - suffix_len;
}
119
120 /* Returns true if import was FULL, false if import was already not
121  * connected.
122  * @imp - import to be disconnected
123  * @conn_cnt - connection count (epoch) of the request that timed out
124  *             and caused the disconnection.  In some cases, multiple
125  *             inflight requests can fail to a single target (e.g. OST
126  *             bulk requests) and if one has already caused a reconnection
127  *             (increasing the import->conn_cnt) the older failure should
128  *             not also cause a reconnection.  If zero it forces a reconnect.
129  */
130 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
131 {
132         int rc = 0;
133
134         spin_lock(&imp->imp_lock);
135
136         if (imp->imp_state == LUSTRE_IMP_FULL &&
137             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
138                 char *target_start;
139                 int   target_len;
140
141                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
142                           &target_start, &target_len);
143                 if (imp->imp_replayable) {
144                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
145                                "%s was lost; in progress operations using this "
146                                "service will wait for recovery to complete.\n",
147                                imp->imp_obd->obd_name, target_len, target_start,
148                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
149                 } else {
150                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
151                                "%.*s via nid %s was lost; in progress "
152                                "operations using this service will fail.\n",
153                                imp->imp_obd->obd_name, target_len, target_start,
154                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
155                 }
156                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
157                 spin_unlock(&imp->imp_lock);
158
159                 if (obd_dump_on_timeout)
160                         libcfs_debug_dumplog();
161
162                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
163                 rc = 1;
164         } else {
165                 spin_unlock(&imp->imp_lock);
166                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
167                        imp->imp_client->cli_name, imp,
168                        (imp->imp_state == LUSTRE_IMP_FULL &&
169                         imp->imp_conn_cnt > conn_cnt) ?
170                        "reconnected" : "not connected", imp->imp_conn_cnt,
171                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
172         }
173
174         return rc;
175 }
176
177 /* Must be called with imp_lock held! */
178 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
179 {
180         ENTRY;
181         LASSERT_SPIN_LOCKED(&imp->imp_lock);
182
183         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
184         imp->imp_invalid = 1;
185         imp->imp_generation++;
186         spin_unlock(&imp->imp_lock);
187
188         ptlrpc_abort_inflight(imp);
189         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
190 }
191
192 /*
193  * This acts as a barrier; all existing requests are rejected, and
194  * no new requests will be accepted until the import is valid again.
195  */
196 void ptlrpc_deactivate_import(struct obd_import *imp)
197 {
198         spin_lock(&imp->imp_lock);
199         ptlrpc_deactivate_and_unlock_import(imp);
200 }
201
202 static unsigned int
203 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
204 {
205         long dl;
206
207         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
208               (req->rq_phase == RQ_PHASE_BULK) ||
209               (req->rq_phase == RQ_PHASE_NEW)))
210                 return 0;
211
212         if (req->rq_timedout)
213                 return 0;
214
215         if (req->rq_phase == RQ_PHASE_NEW)
216                 dl = req->rq_sent;
217         else
218                 dl = req->rq_deadline;
219
220         if (dl <= now)
221                 return 0;
222
223         return dl - now;
224 }
225
226 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
227 {
228         time_t now = cfs_time_current_sec();
229         struct list_head *tmp, *n;
230         struct ptlrpc_request *req;
231         unsigned int timeout = 0;
232
233         spin_lock(&imp->imp_lock);
234         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
235                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
236                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
237         }
238         spin_unlock(&imp->imp_lock);
239         return timeout;
240 }
241
242 /*
243  * This function will invalidate the import, if necessary, then block
244  * for all the RPC completions, and finally notify the obd to
245  * invalidate its state (ie cancel locks, clear pending requests,
246  * etc).
247  */
248 void ptlrpc_invalidate_import(struct obd_import *imp)
249 {
250         struct list_head *tmp, *n;
251         struct ptlrpc_request *req;
252         struct l_wait_info lwi;
253         unsigned int timeout;
254         int rc;
255
        /* Count this invalidation as in progress.  The counter is dropped
         * again at the end of the function, right before waiters on
         * imp_recovery_waitq are woken. */
256         atomic_inc(&imp->imp_inval_count);
257
258         /*
259          * If this is an invalid MGC connection, then don't bother
260          * waiting for imp_inflight to drop to 0.
261          */
262         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
263                 goto out;
264
        /* Deactivate unless the import is already invalid; when obd_no_recov
         * is set the deactivation is repeated even for an invalid import. */
265         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
266                 ptlrpc_deactivate_import(imp);
267
268         LASSERT(imp->imp_invalid);
269
270         /* Wait forever until inflight == 0. We really can't do it another
271          * way because in some cases we need to wait for very long reply
272          * unlink. We can't do anything before that because there is really
273          * no guarantee that some rdma transfer is not in progress right now. */
274         do {
275                 /* Take the longest remaining deadline among the requests on
276                  * the sending list and pad it by a third. Fall back to
277                  * obd_timeout only when that value is zero. */
278                 timeout = ptlrpc_inflight_timeout(imp);
279                 timeout += timeout / 3;
280
281                 if (timeout == 0)
282                         timeout = obd_timeout;
283
                /* NOTE(review): timeout is unsigned int but is printed
                 * with %d. */
284                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
285                        timeout);
286
287                 /* Wait for all requests to error out and call completion
288                  * callbacks. Cap it at obd_timeout -- these should all
289                  * have been locally cancelled by ptlrpc_abort_inflight. */
290                 lwi = LWI_TIMEOUT_INTERVAL(
291                         cfs_timeout_cap(cfs_time_seconds(timeout)),
292                         cfs_time_seconds(1), NULL, NULL);
293                 rc = l_wait_event(imp->imp_recovery_waitq,
294                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
295                 if (rc) {
296                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
297
298                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
299                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
300
                        /* Dump every request still queued on the sending
                         * and delayed lists; the lists are only read here,
                         * under imp_lock. */
301                         spin_lock(&imp->imp_lock);
302                         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
303                                 req = list_entry(tmp, struct ptlrpc_request,
304                                         rq_list);
305                                 DEBUG_REQ(D_ERROR, req,"still on sending list");
306                         }
307                         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
308                                 req = list_entry(tmp, struct ptlrpc_request,
309                                         rq_list);
310                                 DEBUG_REQ(D_ERROR, req,"still on delayed list");
311                         }
312
313                         if (atomic_read(&imp->imp_unregistering) == 0) {
314                                 /* We know that only "unregistering" rpcs may
315                                  * still survive in sending or delaying lists
316                                  * (They are waiting for long reply unlink in
317                                  * sluggish nets). Let's check this. If there
318                                  * is no unregistering and inflight != 0 this
319                                  * is bug. */
320                                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
321
322                                 /* Let's save one loop as soon as inflight have
323                                  * dropped to zero. No new inflights possible at
324                                  * this point. */
325                                 rc = 0;
326                         } else {
327                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
328                                        "Network is sluggish? Waiting them "
329                                        "to error out.\n", cli_tgt,
330                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
331                                        atomic_read(&imp->imp_unregistering));
332                         }
333                         spin_unlock(&imp->imp_lock);
334                 }
        /* rc stays non-zero (loop again) only while unregistering RPCs
         * remain after a failed wait. */
335         } while (rc != 0);
336
337         /* Let's additionally check that no new rpcs added to import in
338          * "invalidate" state. */
339         LASSERT(atomic_read(&imp->imp_inflight) == 0);
340
341 out:
        /* Reached both after the wait loop and via the early "goto out". */
342         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
343
344         atomic_dec(&imp->imp_inval_count);
345         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
346 }
347
348 /* unset imp_invalid */
349 void ptlrpc_activate_import(struct obd_import *imp)
350 {
351         struct obd_device *obd = imp->imp_obd;
352
353         spin_lock(&imp->imp_lock);
354         imp->imp_invalid = 0;
355         spin_unlock(&imp->imp_lock);
356
357         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
358 }
359
360 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
361 {
362         ENTRY;
363
364         LASSERT(!imp->imp_dlm_fake);
365
366         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
367                 if (!imp->imp_replayable) {
368                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
369                                "auto-deactivating\n",
370                                obd2cli_tgt(imp->imp_obd),
371                                imp->imp_connection->c_remote_uuid.uuid,
372                                imp->imp_obd->obd_name);
373                         ptlrpc_deactivate_import(imp);
374                 }
375
376                 CDEBUG(D_HA, "%s: waking up pinger\n",
377                        obd2cli_tgt(imp->imp_obd));
378
379                 spin_lock(&imp->imp_lock);
380                 imp->imp_force_verify = 1;
381                 spin_unlock(&imp->imp_lock);
382
383                 ptlrpc_pinger_wake_up();
384         }
385         EXIT;
386 }
387
388 int ptlrpc_reconnect_import(struct obd_import *imp)
389 {
390
391         ptlrpc_set_import_discon(imp, 0);
392         /* Force a new connect attempt */
393         ptlrpc_invalidate_import(imp);
394         /* Do a fresh connect next time by zeroing the handle */
395         ptlrpc_disconnect_import(imp, 1);
396         /* Wait for all invalidate calls to finish */
397         if (atomic_read(&imp->imp_inval_count) > 0) {
398                 int rc;
399                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
400                 rc = l_wait_event(imp->imp_recovery_waitq,
401                                   (atomic_read(&imp->imp_inval_count) == 0),
402                                   &lwi);
403                 if (rc)
404                         CERROR("Interrupted, inval=%d\n",
405                                atomic_read(&imp->imp_inval_count));
406         }
407
408         /*
409          * Allow reconnect attempts. Note: Currently, the function is
410          * only called by MGC. So assume this is a recoverable import,
411          * and force import to be recoverable. fix this if you need to
412          */
413
414         imp->imp_obd->obd_no_recov = 0;
415         /* Remove 'invalid' flag */
416         ptlrpc_activate_import(imp);
417         /* Attempt a new connect */
418         ptlrpc_recover_import(imp, NULL);
419         return 0;
420 }
421
422 EXPORT_SYMBOL(ptlrpc_reconnect_import);
423
/*
 * Pick the connection the next connect attempt on @imp will use and
 * install it as imp_connection / imp_conn_current (and on the DLM export).
 * The whole selection runs under imp_lock.
 *
 * Returns 0 on success, -EINVAL when imp_conn_list is empty.
 */
424 static int import_select_connection(struct obd_import *imp)
425 {
426         struct obd_import_conn *imp_conn = NULL, *conn;
427         struct obd_export *dlmexp;
        /* cleared as soon as a not-yet-tried (or fallback) connection is
         * chosen */
428         int tried_all = 1;
429         ENTRY;
430
431         spin_lock(&imp->imp_lock);
432
433         if (list_empty(&imp->imp_conn_list)) {
434                 CERROR("%s: no connections available\n",
435                         imp->imp_obd->obd_name);
436                 spin_unlock(&imp->imp_lock);
437                 RETURN(-EINVAL);
438         }
439
440         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
441                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
442                        imp->imp_obd->obd_name,
443                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
444                        conn->oic_last_attempt);
445
446                 /* Don't thrash connections */
447                 if (cfs_time_before_64(cfs_time_current_64(),
448                                      conn->oic_last_attempt +
449                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
450                         continue;
451                 }
452
453                 /* If we have not tried this connection since the
454                    last successful attempt, go with this one */
455                 if ((conn->oic_last_attempt == 0) ||
456                     cfs_time_beforeq_64(conn->oic_last_attempt,
457                                        imp->imp_last_success_conn)) {
458                         imp_conn = conn;
459                         tried_all = 0;
460                         break;
461                 }
462
463                 /* If all of the connections have already been tried
464                    since the last successful connection; just choose the
465                    least recently used */
466                 if (!imp_conn)
467                         imp_conn = conn;
468                 else if (cfs_time_before_64(conn->oic_last_attempt,
469                                             imp_conn->oic_last_attempt))
470                         imp_conn = conn;
471         }
472
473         /* if not found, simply choose the current one */
474         if (!imp_conn) {
475                 LASSERT(imp->imp_conn_current);
476                 imp_conn = imp->imp_conn_current;
477                 tried_all = 0;
478         }
479         LASSERT(imp_conn->oic_conn);
480
481         /* If we've tried everything, and we're back to the beginning of the
482            list, increase our timeout and try again. It will be reset when
483            we do finally connect. (FIXME: really we should wait for all network
484            state associated with the last connection attempt to drain before
485            trying to reconnect on it.) */
486         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
487             !imp->imp_recon_bk /* not retrying */) {
                /* The latency estimate is only raised while it is still
                 * below CONNECTION_SWITCH_MAX. */
488                 if (at_get(&imp->imp_at.iat_net_latency) <
489                     CONNECTION_SWITCH_MAX) {
490                         at_add(&imp->imp_at.iat_net_latency,
491                                at_get(&imp->imp_at.iat_net_latency) +
492                                CONNECTION_SWITCH_INC);
493                 }
494                 LASSERT(imp_conn->oic_last_attempt);
495                 CWARN("%s: tried all connections, increasing latency to %ds\n",
496                       imp->imp_obd->obd_name,
497                       at_get(&imp->imp_at.iat_net_latency));
498         }
499
        /* Stamp the chosen connection so the checks above see this attempt
         * next time. */
500         imp_conn->oic_last_attempt = cfs_time_current_64();
501
502         /* switch connection, don't mind if it's same as the current one */
503         if (imp->imp_connection)
504                 ptlrpc_connection_put(imp->imp_connection);
505         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
506
        /* Point the export looked up from imp_dlm_handle at the same
         * connection, then drop the reference taken by the lookup. */
507         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
508         LASSERT(dlmexp != NULL);
509         if (dlmexp->exp_connection)
510                 ptlrpc_connection_put(dlmexp->exp_connection);
511         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
512         class_export_put(dlmexp);
513
        /* The console message is printed only when replacing an existing
         * current connection, not when the first one is installed. */
514         if (imp->imp_conn_current != imp_conn) {
515                 if (imp->imp_conn_current)
516                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
517                                       imp->imp_obd->obd_name,
518                                       imp_conn->oic_uuid.uuid,
519                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
520                 imp->imp_conn_current = imp_conn;
521         }
522
523         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
524                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
525                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
526
527         spin_unlock(&imp->imp_lock);
528
529         RETURN(0);
530 }
531
532 /**
533  * must be called under imp lock
534  */
535 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
536 {
537         struct ptlrpc_request *req;
538         struct list_head *tmp;
539
540         if (list_empty(&imp->imp_replay_list))
541                 return 0;
542         tmp = imp->imp_replay_list.next;
543         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
544         *transno = req->rq_transno;
545         if (req->rq_transno == 0) {
546                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
547                 LBUG();
548         }
549
550         return 1;
551 }
552
/*
 * Build and queue an asynchronous connect request for @imp.
 *
 * @imp      - import to connect; must not be CLOSED
 * @new_uuid - optional UUID passed to import_set_conn_priority() before
 *             a connection is selected; may be NULL
 *
 * Returns 0 when the request was handed to ptlrpcd_add_req() or when the
 * import is already FULL, -EINVAL for a CLOSED import, -EALREADY when a
 * connect is already in progress, -ENOMEM if the request cannot be
 * allocated, or the error of a failing helper.  On any failure after the
 * state was moved to CONNECTING it is set back to DISCON.
 */
553 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
554 {
555         struct obd_device *obd = imp->imp_obd;
556         int set_transno = 0;
557         int initial_connect = 0;
558         int rc;
559         __u64 committed_before_reconnect = 0;
560         struct ptlrpc_request *request;
        /* Sizes and contents of the five request buffers handed to
         * ptlrpc_prep_req() below; tmp[0] is NULL (no initial data for
         * the ptlrpc_body slot). */
561         __u32 size[] = { sizeof(struct ptlrpc_body),
562                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
563                        sizeof(obd->obd_uuid),
564                        sizeof(imp->imp_dlm_handle),
565                        sizeof(imp->imp_connect_data) };
566         char *tmp[] = { NULL,
567                         obd2cli_tgt(imp->imp_obd),
568                         obd->obd_uuid.uuid,
569                         (char *)&imp->imp_dlm_handle,
570                         (char *)&imp->imp_connect_data };
571         struct ptlrpc_connect_async_args *aa;
572
573         ENTRY;
        /* Check the current state and move to CONNECTING under imp_lock. */
574         spin_lock(&imp->imp_lock);
575         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
576                 spin_unlock(&imp->imp_lock);
577                 CERROR("can't connect to a closed import\n");
578                 RETURN(-EINVAL);
579         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
580                 spin_unlock(&imp->imp_lock);
581                 CERROR("already connected\n");
582                 RETURN(0);
583         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
584                 spin_unlock(&imp->imp_lock);
585                 CERROR("already connecting\n");
586                 RETURN(-EALREADY);
587         }
588
589         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
590
        /* New connection epoch; see the conn_cnt argument of
         * ptlrpc_set_import_discon(). */
591         imp->imp_conn_cnt++;
592         imp->imp_resend_replay = 0;
593
        /* No remote handle in use means this is the first connect;
         * otherwise remember what the peer had committed so far. */
594         if (!lustre_handle_is_used(&imp->imp_remote_handle))
595                 initial_connect = 1;
596         else
597                 committed_before_reconnect = imp->imp_peer_committed_transno;
598
        /* If there is something on the replay list, its first transno is
         * stored in the connect data and MSG_CONNECT_TRANSNO is set on the
         * request further down. */
599         set_transno = ptlrpc_first_transno(imp,
600                                            &imp->imp_connect_data.ocd_transno);
601
602         spin_unlock(&imp->imp_lock);
603
604         if (new_uuid) {
605                 struct obd_uuid uuid;
606
607                 obd_str2uuid(&uuid, new_uuid);
608                 rc = import_set_conn_priority(imp, &uuid);
609                 if (rc)
610                         GOTO(out, rc);
611         }
612
613         rc = import_select_connection(imp);
614         if (rc)
615                 GOTO(out, rc);
616
617         /* last in connection list */
618         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
619                 if (imp->imp_initial_recov_bk && initial_connect) {
620                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
621                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
622                         /* Don't retry if connect fails */
                        /* rc is reused here as the value (0) passed for
                         * KEY_INIT_RECOV; it is overwritten again below. */
623                         rc = 0;
624                         obd_set_info_async(obd->obd_self_export,
625                                            sizeof(KEY_INIT_RECOV),
626                                            KEY_INIT_RECOV,
627                                            sizeof(rc), &rc, NULL);
628                 }
629                 if (imp->imp_recon_bk) {
630                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
631                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
632                         spin_lock(&imp->imp_lock);
633                         imp->imp_last_recon = 1;
634                         spin_unlock(&imp->imp_lock);
635                 }
636         }
637
638         /* Reset connect flags to the originally requested flags, in case
639          * the server is updated on-the-fly we will get the new features. */
640         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
641         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
642
643         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
644                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
645         if (rc)
646                 GOTO(out, rc);
647
648         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
649                                   5, size, tmp);
650         if (!request)
651                 GOTO(out, rc = -ENOMEM);
652
653         /* Report the rpc service time to the server so that it knows how long
654          * to wait for clients to join recovery */
655         lustre_msg_set_service_time(request->rq_reqmsg,
656                                     at_timeout2est(request->rq_timeout));
657
658         /* The amount of time we give the server to process the connect req.
659          * import_select_connection will increase the net latency on
660          * repeated reconnect attempts to cover slow networks.
661          * We override/ignore the server rpc completion estimate here,
662          * which may be large if this is a reconnect attempt */
663         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
664         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
665
666 #ifndef __KERNEL__
667         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
668 #endif
669         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
670                 lustre_msg_add_op_flags(request->rq_reqmsg,
671                                         MSG_CONNECT_NEXT_VER);
672
        /* The connect request is never resent or delayed, and may only be
         * sent while the import is in the CONNECTING state. */
673         request->rq_no_resend = request->rq_no_delay = 1;
674         request->rq_send_state = LUSTRE_IMP_CONNECTING;
675         /* Allow a slightly larger reply for future growth compatibility */
676         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
677                               16 * sizeof(__u64);
678         ptlrpc_req_set_repsize(request, 2, size);
679         request->rq_interpret_reply = ptlrpc_connect_interpret;
680
        /* Stash the pre-connect state in the request's async args for
         * ptlrpc_connect_interpret(). */
681         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
682         aa = ptlrpc_req_async_args(request);
683         memset(aa, 0, sizeof *aa);
684
685         aa->pcaa_peer_committed = committed_before_reconnect;
686         aa->pcaa_initial_connect = initial_connect;
687         if (aa->pcaa_initial_connect) {
                /* imp_replayable is set to 1 here for an initial connect;
                 * the reply interpreter sets it again from the server's
                 * MSG_CONNECT_REPLAYABLE flag. */
688                 spin_lock(&imp->imp_lock);
689                 imp->imp_replayable = 1;
690                 spin_unlock(&imp->imp_lock);
691                 lustre_msg_add_op_flags(request->rq_reqmsg,
692                                         MSG_CONNECT_INITIAL);
693         }
694
695         if (set_transno)
696                 lustre_msg_add_op_flags(request->rq_reqmsg,
697                                         MSG_CONNECT_TRANSNO);
698
699         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
700                   aa->pcaa_initial_connect ? "initial " : "re",
701                   imp->imp_conn_cnt);
702         ptlrpcd_add_req(request);
703         rc = 0;
704 out:
        /* Any failure after the CONNECTING transition drops the import
         * back to DISCON. */
705         if (rc != 0) {
706                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
707         }
708
709         RETURN(rc);
710 }
711 EXPORT_SYMBOL(ptlrpc_connect_import);
712
713 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
714 {
715 #ifdef __KERNEL__
716         struct obd_import_conn *imp_conn;
717 #endif
718         int wake_pinger = 0;
719
720         ENTRY;
721
722         spin_lock(&imp->imp_lock);
723         if (list_empty(&imp->imp_conn_list))
724                 GOTO(unlock, 0);
725
726 #ifdef __KERNEL__
727         imp_conn = list_entry(imp->imp_conn_list.prev,
728                               struct obd_import_conn,
729                               oic_item);
730
731         /* XXX: When the failover node is the primary node, it is possible
732          * to have two identical connections in imp_conn_list. We must
733          * compare not conn's pointers but NIDs, otherwise we can defeat
734          * connection throttling. (See bug 14774.) */
735         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
736                                 imp_conn->oic_conn->c_peer.nid) {
737                 ptlrpc_ping_import_soon(imp);
738                 wake_pinger = 1;
739         }
740
741 #else
742         /* liblustre has no pinger thead, so we wakup pinger anyway */
743         wake_pinger = 1;
744 #endif
745  unlock:
746         spin_unlock(&imp->imp_lock);
747
748         if (wake_pinger)
749                 ptlrpc_pinger_wake_up();
750
751         EXIT;
752 }
753
754 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
755                                     void * data, int rc)
756 {
757         struct ptlrpc_connect_async_args *aa = data;
758         struct obd_import *imp = request->rq_import;
759         struct client_obd *cli = &imp->imp_obd->u.cli;
760         struct lustre_handle old_hdl;
761         __u64 old_connect_flags;
762         int msg_flags;
763         ENTRY;
764
765         spin_lock(&imp->imp_lock);
766         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
767                 spin_unlock(&imp->imp_lock);
768                 RETURN(0);
769         }
770         spin_unlock(&imp->imp_lock);
771
772         if (rc)
773                 GOTO(out, rc);
774
775         LASSERT(imp->imp_conn_current);
776
777         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
778
779         /* All imports are pingable */
780         spin_lock(&imp->imp_lock);
781         imp->imp_pingable = 1;
782
783         if (aa->pcaa_initial_connect) {
784                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
785                         imp->imp_replayable = 1;
786                         spin_unlock(&imp->imp_lock);
787                         CDEBUG(D_HA, "connected to replayable target: %s\n",
788                                obd2cli_tgt(imp->imp_obd));
789                 } else {
790                         imp->imp_replayable = 0;
791                         spin_unlock(&imp->imp_lock);
792                 }
793
794                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
795                      msg_flags & MSG_CONNECT_NEXT_VER) ||
796                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
797                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
798                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
799                                obd2cli_tgt(imp->imp_obd));
800                 } else {
801                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
802                                obd2cli_tgt(imp->imp_obd));
803                 }
804
805                 imp->imp_remote_handle =
806                                 *lustre_msg_get_handle(request->rq_repmsg);
807
808                 /* Initial connects are allowed for clients with non-random
809                  * uuids when servers are in recovery.  Simply signal the
810                  * servers replay is complete and wait in REPLAY_WAIT. */
811                 if (msg_flags & MSG_CONNECT_RECOVERING) {
812                         CDEBUG(D_HA, "connect to %s during recovery\n",
813                                obd2cli_tgt(imp->imp_obd));
814                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
815                 } else {
816                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
817                         ptlrpc_activate_import(imp);
818                 }
819                 GOTO(finish, rc = 0);
820         } else {
821                 spin_unlock(&imp->imp_lock);
822         }
823
824         /* Determine what recovery state to move the import to. */
825         if (MSG_CONNECT_RECONNECT & msg_flags) {
826                 memset(&old_hdl, 0, sizeof(old_hdl));
827                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
828                             sizeof (old_hdl))) {
829                         CERROR("%s@%s didn't like our handle "LPX64
830                                ", failed\n", obd2cli_tgt(imp->imp_obd),
831                                imp->imp_connection->c_remote_uuid.uuid,
832                                imp->imp_dlm_handle.cookie);
833                         GOTO(out, rc = -ENOTCONN);
834                 }
835
836                 if (memcmp(&imp->imp_remote_handle,
837                            lustre_msg_get_handle(request->rq_repmsg),
838                            sizeof(imp->imp_remote_handle))) {
839                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
840                                 D_HA : D_WARNING;
841
842                         /* Bug 16611/14775: if server handle have changed,
843                          * that means some sort of disconnection happened.
844                          * If the server is not in recovery, that also means it
845                          * already erased all of our state because of previous
846                          * eviction. If it is in recovery - we are safe to
847                          * participate since we can reestablish all of our state
848                          * with server again */
849                         CDEBUG(level,"%s@%s changed server handle from "
850                                      LPX64" to "LPX64"%s\n",
851                                      obd2cli_tgt(imp->imp_obd),
852                                      imp->imp_connection->c_remote_uuid.uuid,
853                                      imp->imp_remote_handle.cookie,
854                                      lustre_msg_get_handle(request->rq_repmsg)->
855                                                                         cookie,
856                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
857                                          " but is still in recovery" : "");
858
859                         imp->imp_remote_handle =
860                                      *lustre_msg_get_handle(request->rq_repmsg);
861
862                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
863                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
864                                 GOTO(finish, rc = 0);
865                         }
866
867                 } else {
868                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
869                                obd2cli_tgt(imp->imp_obd),
870                                imp->imp_connection->c_remote_uuid.uuid);
871                 }
872
873                 if (imp->imp_invalid) {
874                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
875                                "marking evicted\n", imp->imp_obd->obd_name);
876                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
877                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
878                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
879                                imp->imp_obd->obd_name,
880                                obd2cli_tgt(imp->imp_obd));
881
882                         spin_lock(&imp->imp_lock);
883                         imp->imp_resend_replay = 1;
884                         /* VBR: delayed connection */
885                         if (MSG_CONNECT_DELAYED & msg_flags) {
886                                 imp->imp_delayed_recovery = 1;
887                                 imp->imp_no_lock_replay = 1;
888                         }
889                         spin_unlock(&imp->imp_lock);
890
891                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
892                 } else {
893                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
894                 }
895         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
896                 LASSERT(imp->imp_replayable);
897                 imp->imp_remote_handle =
898                                 *lustre_msg_get_handle(request->rq_repmsg);
899                 imp->imp_last_replay_transno = 0;
900                 /* VBR: delayed connection */
901                 if (MSG_CONNECT_DELAYED & msg_flags) {
902                         spin_lock(&imp->imp_lock);
903                         imp->imp_delayed_recovery = 1;
904                         imp->imp_no_lock_replay = 1;
905                         spin_unlock(&imp->imp_lock);
906                 }
907                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
908         } else {
909                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
910                           "flags reconnect/recovering not set: %x)",msg_flags);
911                 imp->imp_remote_handle =
912                                 *lustre_msg_get_handle(request->rq_repmsg);
913                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
914         }
915
916         /* Sanity checks for a reconnected import. */
917         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
918                 CERROR("imp_replayable flag does not match server "
919                        "after reconnect. We should LBUG right here.\n");
920         }
921
922         if (lustre_msg_get_last_committed(request->rq_repmsg) <
923             aa->pcaa_peer_committed) {
924                 CERROR("%s went back in time (transno "LPD64
925                        " was previously committed, server now claims "LPD64
926                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
927                        "id=9646\n",
928                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
929                        lustre_msg_get_last_committed(request->rq_repmsg));
930         }
931
932 finish:
933         rc = ptlrpc_import_recovery_state_machine(imp);
934         if (rc != 0) {
935                 if (rc == -ENOTCONN) {
936                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
937                                "invalidating and reconnecting\n",
938                                obd2cli_tgt(imp->imp_obd),
939                                imp->imp_connection->c_remote_uuid.uuid);
940                         ptlrpc_connect_import(imp, NULL);
941                         RETURN(0);
942                 }
943         } else {
944                 struct obd_connect_data *ocd;
945                 struct obd_export *exp;
946
947                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
948                                          lustre_swab_connect);
949                 spin_lock(&imp->imp_lock);
950                 list_del(&imp->imp_conn_current->oic_item);
951                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
952                 imp->imp_last_success_conn =
953                         imp->imp_conn_current->oic_last_attempt;
954
955                 if (ocd == NULL) {
956                         spin_unlock(&imp->imp_lock);
957                         CERROR("Wrong connect data from server\n");
958                         rc = -EPROTO;
959                         GOTO(out, rc);
960                 }
961
962                 imp->imp_connect_data = *ocd;
963
964                 exp = class_conn2export(&imp->imp_dlm_handle);
965                 spin_unlock(&imp->imp_lock);
966
967                 /* check that server granted subset of flags we asked for. */
968                 LASSERTF((ocd->ocd_connect_flags &
969                           imp->imp_connect_flags_orig) ==
970                          ocd->ocd_connect_flags, LPX64" != "LPX64,
971                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
972
973                 if (!exp) {
974                         /* This could happen if export is cleaned during the
975                            connect attempt */
976                         CERROR("Missing export for %s\n",
977                                imp->imp_obd->obd_name);
978                         GOTO(out, rc = -ENODEV);
979                 }
980                 old_connect_flags = exp->exp_connect_flags;
981                 exp->exp_connect_flags = ocd->ocd_connect_flags;
982                 imp->imp_obd->obd_self_export->exp_connect_flags =
983                         ocd->ocd_connect_flags;
984                 class_export_put(exp);
985
986                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
987
988                 if (!ocd->ocd_ibits_known &&
989                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
990                         CERROR("Inodebits aware server returned zero compatible"
991                                " bits?\n");
992
993                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
994                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
995                                         LUSTRE_VERSION_OFFSET_WARN ||
996                      ocd->ocd_version < LUSTRE_VERSION_CODE -
997                                         LUSTRE_VERSION_OFFSET_WARN)) {
998                         /* Sigh, some compilers do not like #ifdef in the middle
999                            of macro arguments */
1000 #ifdef __KERNEL__
1001                         const char *older =
1002                                 "older.  Consider upgrading this client";
1003 #else
1004                         const char *older =
1005                                 "older.  Consider recompiling this application";
1006 #endif
1007                         const char *newer = "newer than client version";
1008
1009                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1010                                       "is much %s (%s)\n",
1011                                       obd2cli_tgt(imp->imp_obd),
1012                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1013                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1014                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1015                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1016                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
1017                                       newer : older, LUSTRE_VERSION_STRING);
1018                 }
1019
1020                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1021                         /* We sent to the server ocd_cksum_types with bits set
1022                          * for algorithms we understand. The server masked off
1023                          * the checksum types it doesn't support */
1024                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1025                                 LCONSOLE_WARN("The negotiation of the checksum "
1026                                               "alogrithm to use with server %s "
1027                                               "failed (%x/%x), disabling "
1028                                               "checksums\n",
1029                                               obd2cli_tgt(imp->imp_obd),
1030                                               ocd->ocd_cksum_types,
1031                                               OBD_CKSUM_ALL);
1032                                 cli->cl_checksum = 0;
1033                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1034                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1035                         } else {
1036                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1037
1038                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1039                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1040                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1041                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1042                                 else
1043                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1044                         }
1045                 } else {
1046                         /* The server does not support OBD_CONNECT_CKSUM.
1047                          * Enforce CRC32 for backward compatibility*/
1048                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1049                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1050                 }
1051
1052                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1053                         cli->cl_max_pages_per_rpc =
1054                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1055                 }
1056
1057                 /* Reset ns_connect_flags only for initial connect. It might be
1058                  * changed in while using FS and if we reset it in reconnect
1059                  * this leads to lossing user settings done before such as
1060                  * disable lru_resize, etc. */
1061                 if (old_connect_flags != exp->exp_connect_flags ||
1062                     aa->pcaa_initial_connect) {
1063                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1064                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1065                                ocd->ocd_connect_flags);
1066                         imp->imp_obd->obd_namespace->ns_connect_flags =
1067                                 ocd->ocd_connect_flags;
1068                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1069                                 ocd->ocd_connect_flags;
1070                 }
1071
1072                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1073                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1074                         /* We need a per-message support flag, because
1075                            a. we don't know if the incoming connect reply
1076                               supports AT or not (in reply_in_callback)
1077                               until we unpack it.
1078                            b. failovered server means export and flags are gone
1079                               (in ptlrpc_send_reply).
1080                            Can only be set when we know AT is supported at
1081                            both ends */
1082                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1083                 else
1084                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1085
1086                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1087                         (cli->cl_max_pages_per_rpc > 0));
1088         }
1089
1090  out:
1091         if (rc != 0) {
1092                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1093                 spin_lock(&imp->imp_lock);
1094                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1095                     (request->rq_import_generation == imp->imp_generation))
1096                         ptlrpc_deactivate_and_unlock_import(imp);
1097                 else
1098                         spin_unlock(&imp->imp_lock);
1099
1100                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1101                         /* Give up trying to reconnect */
1102                         imp->imp_obd->obd_no_recov = 1;
1103                         ptlrpc_deactivate_import(imp);
1104                 }
1105
1106                 if (rc == -EPROTO) {
1107                         struct obd_connect_data *ocd;
1108                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1109                                                  sizeof *ocd,
1110                                                  lustre_swab_connect);
1111                         if (ocd &&
1112                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1113                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1114                            /* Actually servers are only supposed to refuse
1115                               connection from liblustre clients, so we should
1116                               never see this from VFS context */
1117                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1118                                         "(%d.%d.%d.%d)"
1119                                         " refused connection from this client "
1120                                         "with an incompatible version (%s).  "
1121                                         "Client must be recompiled\n",
1122                                         obd2cli_tgt(imp->imp_obd),
1123                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1124                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1125                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1126                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1127                                         LUSTRE_VERSION_STRING);
1128                                 ptlrpc_deactivate_import(imp);
1129                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1130                         }
1131                         RETURN(-EPROTO);
1132                 }
1133
1134                 ptlrpc_maybe_ping_import_soon(imp);
1135
1136                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1137                        obd2cli_tgt(imp->imp_obd),
1138                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1139         }
1140
1141         spin_lock(&imp->imp_lock);
1142         imp->imp_last_recon = 0;
1143         spin_unlock(&imp->imp_lock);
1144
1145         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1146         RETURN(rc);
1147 }
1148
1149 static int completed_replay_interpret(struct ptlrpc_request *req,
1150                                       void * data, int rc)
1151 {
1152         ENTRY;
1153         atomic_dec(&req->rq_import->imp_replay_inflight);
1154         if (req->rq_status == 0 &&
1155             !req->rq_import->imp_vbr_failed) {
1156                 ptlrpc_import_recovery_state_machine(req->rq_import);
1157         } else {
1158                 if (req->rq_import->imp_vbr_failed) {
1159                         CDEBUG(D_WARNING,
1160                                "%s: version recovery fails, reconnecting\n",
1161                                req->rq_import->imp_obd->obd_name);
1162                         spin_lock(&req->rq_import->imp_lock);
1163                         req->rq_import->imp_vbr_failed = 0;
1164                         spin_unlock(&req->rq_import->imp_lock);
1165                 } else {
1166                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1167                                      "reconnecting\n",
1168                                req->rq_import->imp_obd->obd_name,
1169                                req->rq_status);
1170                 }
1171                 ptlrpc_connect_import(req->rq_import, NULL);
1172         }
1173         RETURN(0);
1174 }
1175
1176 static int signal_completed_replay(struct obd_import *imp)
1177 {
1178         struct ptlrpc_request *req;
1179         ENTRY;
1180
1181         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1182         atomic_inc(&imp->imp_replay_inflight);
1183
1184         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1185         if (!req) {
1186                 atomic_dec(&imp->imp_replay_inflight);
1187                 RETURN(-ENOMEM);
1188         }
1189
1190         ptlrpc_req_set_repsize(req, 1, NULL);
1191         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1192         lustre_msg_add_flags(req->rq_reqmsg,
1193                              MSG_LOCK_REPLAY_DONE |
1194                              MSG_REQ_REPLAY_DONE |
1195                              MSG_LAST_REPLAY);
1196
1197         if (imp->imp_delayed_recovery)
1198                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1199         req->rq_timeout *= 3;
1200         req->rq_interpret_reply = completed_replay_interpret;
1201
1202         ptlrpcd_add_req(req);
1203         RETURN(0);
1204 }
1205
#ifdef __KERNEL__
/*
 * Body of the "ll_imp_inval" kernel thread.
 *
 * data is the struct obd_import to invalidate.  After invalidation, unless
 * the import has been deactivated in the meantime, the import is moved to
 * LUSTRE_IMP_RECOVER and the recovery state machine is run.  The import
 * reference is released with class_import_put() before returning.
 * Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *imp = data;
        int deactivated;

        ENTRY;

        ptlrpc_daemonize("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
               imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(imp);

        /* is client_disconnect_export in flight ? */
        spin_lock(&imp->imp_lock);
        deactivated = imp->imp_deactive;
        spin_unlock(&imp->imp_lock);
        if (deactivated)
                GOTO(out, 0 );

        if (obd_dump_on_eviction) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(imp);

out:
        /* Release the import reference on every exit path. */
        class_import_put(imp);
        RETURN(0);
}
#endif
1242
1243 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1244 {
1245         int rc = 0;
1246         int inflight;
1247         char *target_start;
1248         int target_len;
1249
1250         ENTRY;
1251         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1252                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1253                           &target_start, &target_len);
1254                 /* Don't care about MGC eviction */
1255                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1256                            LUSTRE_MGC_NAME) != 0) {
1257                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1258                                            "%.*s; in progress operations using "
1259                                            "this service will fail.\n",
1260                                            target_len, target_start);
1261                 }
1262                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1263                        obd2cli_tgt(imp->imp_obd),
1264                        imp->imp_connection->c_remote_uuid.uuid);
1265
1266 #ifdef __KERNEL__
1267                 /* bug 17802:  XXX client_disconnect_export vs connect request
1268                  * race. if client will evicted at this time, we start
1269                  * invalidate thread without referece to import and import can
1270                  * be freed at same time. */
1271                 class_import_get(imp);
1272                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1273                                    CLONE_VM | CLONE_FILES);
1274                 if (rc < 0) {
1275                         class_import_put(imp);
1276                         CERROR("error starting invalidate thread: %d\n", rc);
1277                 } else {
1278                         rc = 0;
1279                 }
1280                 RETURN(rc);
1281 #else
1282                 ptlrpc_invalidate_import(imp);
1283
1284                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1285 #endif
1286         }
1287
1288         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1289                 CDEBUG(D_HA, "replay requested by %s\n",
1290                        obd2cli_tgt(imp->imp_obd));
1291                 rc = ptlrpc_replay_next(imp, &inflight);
1292                 if (inflight == 0 &&
1293                     atomic_read(&imp->imp_replay_inflight) == 0) {
1294                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1295                         rc = ldlm_replay_locks(imp);
1296                         if (rc)
1297                                 GOTO(out, rc);
1298                 }
1299                 rc = 0;
1300         }
1301
1302         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1303                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1304                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1305                         rc = signal_completed_replay(imp);
1306                         if (rc)
1307                                 GOTO(out, rc);
1308                 }
1309
1310         }
1311
1312         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1313                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1314                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1315                 }
1316         }
1317
1318         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1319                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1320                        obd2cli_tgt(imp->imp_obd),
1321                        imp->imp_connection->c_remote_uuid.uuid);
1322
1323                 rc = ptlrpc_resend(imp);
1324                 if (rc)
1325                         GOTO(out, rc);
1326                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1327                 ptlrpc_activate_import(imp);
1328
1329                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1330                           &target_start, &target_len);
1331                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1332                               "using nid %s.\n", imp->imp_obd->obd_name,
1333                               target_len, target_start,
1334                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1335         }
1336
1337         if (imp->imp_state == LUSTRE_IMP_FULL) {
1338                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1339                 ptlrpc_wake_delayed(imp);
1340         }
1341
1342  out:
1343         RETURN(rc);
1344 }
1345
/*
 * Timeout callback that does nothing and returns 0.  It is passed to
 * LWI_TIMEOUT_INTR() by ptlrpc_disconnect_import().
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1350
1351 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1352 {
1353         struct ptlrpc_request *req;
1354         int rq_opc, rc = 0;
1355         int nowait = imp->imp_obd->obd_force;
1356         ENTRY;
1357
1358         if (nowait)
1359                 GOTO(set_state, rc);
1360
1361         switch (imp->imp_connect_op) {
1362         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1363         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1364         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1365         default:
1366                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1367                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1368                 RETURN(-EINVAL);
1369         }
1370
1371         if (ptlrpc_import_in_recovery(imp)) {
1372                 struct l_wait_info lwi;
1373                 cfs_duration_t timeout;
1374
1375                 if (AT_OFF) {
1376                         timeout = cfs_time_seconds(obd_timeout);
1377                 } else {
1378                         int idx = import_at_get_index(imp,
1379                                 imp->imp_client->cli_request_portal);
1380                         timeout = cfs_time_seconds(
1381                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1382                 }
1383                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1384                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1385                 rc = l_wait_event(imp->imp_recovery_waitq,
1386                                   !ptlrpc_import_in_recovery(imp), &lwi);
1387         }
1388
1389         spin_lock(&imp->imp_lock);
1390         if (imp->imp_state != LUSTRE_IMP_FULL)
1391                 GOTO(out, 0);
1392
1393         spin_unlock(&imp->imp_lock);
1394
1395         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1396         if (req) {
1397                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1398                  * it fails.  We can get through the above with a down server
1399                  * if the client doesn't know the server is gone yet. */
1400                 req->rq_no_resend = 1;
1401
1402 #ifndef CRAY_XT3
1403                 /* We want client umounts to happen quickly, no matter the
1404                    server state... */
1405                 req->rq_timeout = min_t(int, req->rq_timeout,
1406                                         INITIAL_CONNECT_TIMEOUT);
1407 #else
1408                 /* ... but we always want liblustre clients to nicely
1409                    disconnect, so only use the adaptive value. */
1410                 if (AT_OFF)
1411                         req->rq_timeout = obd_timeout / 3;
1412 #endif
1413
1414                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1415                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1416                 ptlrpc_req_set_repsize(req, 1, NULL);
1417                 rc = ptlrpc_queue_wait(req);
1418                 ptlrpc_req_finished(req);
1419         }
1420
1421 set_state:
1422         spin_lock(&imp->imp_lock);
1423 out:
1424         if (noclose)
1425                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1426         else
1427                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1428         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1429         /* Try all connections in the future - bz 12758 */
1430         imp->imp_last_recon = 0;
1431         spin_unlock(&imp->imp_lock);
1432
1433         RETURN(rc);
1434 }
1435
1436 /* Sets maximal number of RPCs possible originating from other side of this
1437    import (server) to us and number of async RPC replies that we are not waiting
1438    for arriving */
1439 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1440 {
1441         LNetSetAsync(imp->imp_connection->c_peer, count);
1442 }
1443
1444
1445 /* Adaptive Timeout utils */
1446 extern unsigned int at_min, at_max, at_history;
1447
1448 /* Bin into timeslices using AT_BINS bins.
1449    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1450    but still smoothing out a return to normalcy from a slow response.
1451    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1452 int at_add(struct adaptive_timeout *at, unsigned int val)
1453 {
1454         unsigned int old = at->at_current;
1455         time_t now = cfs_time_current_sec();
1456         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1457
1458         LASSERT(at);
1459         CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1460                val, at, now - at->at_binstart, at->at_current,
1461                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1462
1463         if (val == 0)
1464                 /* 0's don't count, because we never want our timeout to
1465                    drop to 0, and because 0 could mean an error */
1466                 return 0;
1467
1468         spin_lock(&at->at_lock);
1469
1470         if (unlikely(at->at_binstart == 0)) {
1471                 /* Special case to remove default from history */
1472                 at->at_current = val;
1473                 at->at_worst_ever = val;
1474                 at->at_worst_time = now;
1475                 at->at_hist[0] = val;
1476                 at->at_binstart = now;
1477         } else if (now - at->at_binstart < binlimit ) {
1478                 /* in bin 0 */
1479                 at->at_hist[0] = max(val, at->at_hist[0]);
1480                 at->at_current = max(val, at->at_current);
1481         } else {
1482                 int i, shift;
1483                 unsigned int maxv = val;
1484                 /* move bins over */
1485                 shift = (now - at->at_binstart) / binlimit;
1486                 LASSERT(shift > 0);
1487                 for(i = AT_BINS - 1; i >= 0; i--) {
1488                         if (i >= shift) {
1489                                 at->at_hist[i] = at->at_hist[i - shift];
1490                                 maxv = max(maxv, at->at_hist[i]);
1491                         } else {
1492                                 at->at_hist[i] = 0;
1493                         }
1494                 }
1495                 at->at_hist[0] = val;
1496                 at->at_current = maxv;
1497                 at->at_binstart += shift * binlimit;
1498         }
1499
1500         if (at->at_current > at->at_worst_ever) {
1501                 at->at_worst_ever = at->at_current;
1502                 at->at_worst_time = now;
1503         }
1504
1505         if (at->at_flags & AT_FLG_NOHIST)
1506                 /* Only keep last reported val; keeping the rest of the history
1507                    for proc only */
1508                 at->at_current = val;
1509
1510         if (at_max > 0)
1511                 at->at_current =  min(at->at_current, at_max);
1512         at->at_current =  max(at->at_current, at_min);
1513
1514         if (at->at_current != old)
1515                 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1516                        "(val=%u) hist %u %u %u %u\n", at,
1517                        old, at->at_current, at->at_current - old, val,
1518                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1519                        at->at_hist[3]);
1520
1521         /* if we changed, report the old value */
1522         old = (at->at_current != old) ? old : 0;
1523
1524         spin_unlock(&at->at_lock);
1525         return old;
1526 }
1527
1528 /* Find the imp_at index for a given portal; assign if space available */
1529 int import_at_get_index(struct obd_import *imp, int portal)
1530 {
1531         struct imp_at *at = &imp->imp_at;
1532         int i;
1533
1534         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1535                 if (at->iat_portal[i] == portal)
1536                         return i;
1537                 if (at->iat_portal[i] == 0)
1538                         /* unused */
1539                         break;
1540         }
1541
1542         /* Not found in list, add it under a lock */
1543         spin_lock(&imp->imp_lock);
1544
1545         /* Check unused under lock */
1546         for (; i < IMP_AT_MAX_PORTALS; i++) {
1547                 if (at->iat_portal[i] == portal)
1548                         goto out;
1549                 if (at->iat_portal[i] == 0)
1550                         /* unused */
1551                         break;
1552         }
1553
1554         /* Not enough portals? */
1555         LASSERT(i < IMP_AT_MAX_PORTALS);
1556
1557         at->iat_portal[i] = portal;
1558 out:
1559         spin_unlock(&imp->imp_lock);
1560         return i;
1561 }