Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
56 struct ptlrpc_connect_async_args {
57          __u64 pcaa_peer_committed;
58         int pcaa_initial_connect;
59 };
60
61 static void __import_set_state(struct obd_import *imp,
62                                enum lustre_imp_state state)
63 {
64         imp->imp_state = state;
65         imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66         imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67                 cfs_time_current_sec();
68         imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
69                 IMP_STATE_HIST_LEN;
70 }
71
/*
 * Change the import state without taking imp_lock.
 *
 * A CLOSED import should remain so: the transition is skipped entirely
 * when the import is already LUSTRE_IMP_CLOSED.  Otherwise the change is
 * traced at D_HA and applied through __import_set_state().
 *
 * Every use of a macro argument is parenthesized so that an expression
 * argument expands with the intended precedence.  Note that @imp is still
 * evaluated more than once, so it must not have side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               __import_set_state((imp), (state));                             \
        }                                                                      \
} while(0)
83
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock around the
 * state change.  The @imp argument is parenthesized wherever it is
 * dereferenced so that an expression argument expands correctly; it is
 * evaluated more than once and must not have side effects.
 */
#define IMPORT_SET_STATE(imp, state)            \
do {                                            \
        spin_lock(&(imp)->imp_lock);            \
        IMPORT_SET_STATE_NOLOCK(imp, state);    \
        spin_unlock(&(imp)->imp_lock);          \
} while(0)
90
91
/* Installed as rq_interpret_reply on connect requests by
 * ptlrpc_connect_import(); defined later in this file. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);

/* Forward declaration. */
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
95
96 /* Only this function is allowed to change the import state when it is
97  * CLOSED. I would rather refcount the import and free it after
98  * disconnection like we do with exports. To do that, the client_obd
99  * will need to save the peer info somewhere other than in the import,
100  * though. */
101 int ptlrpc_init_import(struct obd_import *imp)
102 {
103         spin_lock(&imp->imp_lock);
104
105         imp->imp_generation++;
106         imp->imp_state =  LUSTRE_IMP_NEW;
107
108         spin_unlock(&imp->imp_lock);
109
110         return 0;
111 }
112 EXPORT_SYMBOL(ptlrpc_init_import);
113
#define UUID_STR "_UUID"
/*
 * Locate the "bare" name inside a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated string to examine
 * @prefix     - optional leading text to skip; may be NULL
 * @uuid_start - out: @uuid itself, or @uuid advanced past @prefix when
 *               @uuid begins with @prefix
 * @uuid_len   - out: length of the string at *@uuid_start, not counting a
 *               trailing UUID_STR suffix if one is present
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const int suffix_len = strlen(UUID_STR);
        char *start = uuid;
        int len;

        /* skip the prefix only when the string really begins with it */
        if (prefix != NULL && strncmp(uuid, prefix, strlen(prefix)) == 0)
                start += strlen(prefix);

        len = strlen(start);

        /* drop a trailing UUID_STR, if the remainder is long enough */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
130
131 /* Returns true if import was FULL, false if import was already not
132  * connected.
133  * @imp - import to be disconnected
134  * @conn_cnt - connection count (epoch) of the request that timed out
135  *             and caused the disconnection.  In some cases, multiple
136  *             inflight requests can fail to a single target (e.g. OST
137  *             bulk requests) and if one has already caused a reconnection
138  *             (increasing the import->conn_cnt) the older failure should
139  *             not also cause a reconnection.  If zero it forces a reconnect.
140  */
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
142 {
143         int rc = 0;
144
145         spin_lock(&imp->imp_lock);
146
147         if (imp->imp_state == LUSTRE_IMP_FULL &&
148             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
149                 char *target_start;
150                 int   target_len;
151
152                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153                           &target_start, &target_len);
154                 if (imp->imp_replayable) {
155                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156                                "%s was lost; in progress operations using this "
157                                "service will wait for recovery to complete.\n",
158                                imp->imp_obd->obd_name, target_len, target_start,
159                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
160                 } else {
161                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162                                "%.*s via nid %s was lost; in progress "
163                                "operations using this service will fail.\n",
164                                imp->imp_obd->obd_name, target_len, target_start,
165                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
166                 }
167                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168                 spin_unlock(&imp->imp_lock);
169
170                 if (obd_dump_on_timeout)
171                         libcfs_debug_dumplog();
172
173                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
174                 rc = 1;
175         } else {
176                 spin_unlock(&imp->imp_lock);
177                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178                        imp->imp_client->cli_name, imp,
179                        (imp->imp_state == LUSTRE_IMP_FULL &&
180                         imp->imp_conn_cnt > conn_cnt) ?
181                        "reconnected" : "not connected", imp->imp_conn_cnt,
182                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
183         }
184
185         return rc;
186 }
187
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
190 {
191         ENTRY;
192         LASSERT_SPIN_LOCKED(&imp->imp_lock);
193
194         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195         imp->imp_invalid = 1;
196         imp->imp_generation++;
197         spin_unlock(&imp->imp_lock);
198
199         ptlrpc_abort_inflight(imp);
200         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
201 }
202
203 /*
204  * This acts as a barrier; all existing requests are rejected, and
205  * no new requests will be accepted until the import is valid again.
206  */
207 void ptlrpc_deactivate_import(struct obd_import *imp)
208 {
209         spin_lock(&imp->imp_lock);
210         ptlrpc_deactivate_and_unlock_import(imp);
211 }
212
213 static unsigned int
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
215 {
216         long dl;
217
218         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219               (req->rq_phase == RQ_PHASE_BULK) ||
220               (req->rq_phase == RQ_PHASE_NEW)))
221                 return 0;
222
223         if (req->rq_timedout)
224                 return 0;
225
226         if (req->rq_phase == RQ_PHASE_NEW)
227                 dl = req->rq_sent;
228         else
229                 dl = req->rq_deadline;
230
231         if (dl <= now)
232                 return 0;
233
234         return dl - now;
235 }
236
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
238 {
239         time_t now = cfs_time_current_sec();
240         struct list_head *tmp, *n;
241         struct ptlrpc_request *req;
242         unsigned int timeout = 0;
243
244         spin_lock(&imp->imp_lock);
245         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
248         }
249         spin_unlock(&imp->imp_lock);
250         return timeout;
251 }
252
253 /*
254  * This function will invalidate the import, if necessary, then block
255  * for all the RPC completions, and finally notify the obd to
256  * invalidate its state (ie cancel locks, clear pending requests,
257  * etc).
258  */
259 void ptlrpc_invalidate_import(struct obd_import *imp)
260 {
261         struct list_head *tmp, *n;
262         struct ptlrpc_request *req;
263         struct l_wait_info lwi;
264         unsigned int timeout;
265         int rc;
266
267         atomic_inc(&imp->imp_inval_count);
268
269         /*
270          * If this is an invalid MGC connection, then don't bother
271          * waiting for imp_inflight to drop to 0.
272          */
273         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
274                 goto out;
275
276         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277                 ptlrpc_deactivate_import(imp);
278
279         LASSERT(imp->imp_invalid);
280
281         /* Wait forever until inflight == 0. We really can't do it another
282          * way because in some cases we need to wait for very long reply
283          * unlink. We can't do anything before that because there is really
284          * no guarantee that some rdma transfer is not in progress right now. */
285         do {
286                 /* Calculate max timeout for waiting on rpcs to error
287                  * out. Use obd_timeout if calculated value is smaller
288                  * than it. */
289                 timeout = ptlrpc_inflight_timeout(imp);
290                 timeout += timeout / 3;
291
292                 if (timeout == 0)
293                         timeout = obd_timeout;
294
295                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
296                        timeout);
297
298                 /* Wait for all requests to error out and call completion
299                  * callbacks. Cap it at obd_timeout -- these should all
300                  * have been locally cancelled by ptlrpc_abort_inflight. */
301                 lwi = LWI_TIMEOUT_INTERVAL(
302                         cfs_timeout_cap(cfs_time_seconds(timeout)),
303                         cfs_time_seconds(1), NULL, NULL);
304                 rc = l_wait_event(imp->imp_recovery_waitq,
305                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
306                 if (rc) {
307                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
308
309                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
310                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
311
312                         spin_lock(&imp->imp_lock);
313                         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
314                                 req = list_entry(tmp, struct ptlrpc_request,
315                                         rq_list);
316                                 DEBUG_REQ(D_ERROR, req,"still on sending list");
317                         }
318                         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
319                                 req = list_entry(tmp, struct ptlrpc_request,
320                                         rq_list);
321                                 DEBUG_REQ(D_ERROR, req,"still on delayed list");
322                         }
323
324                         if (atomic_read(&imp->imp_unregistering) == 0) {
325                                 /* We know that only "unregistering" rpcs may
326                                  * still survive in sending or delaying lists
327                                  * (They are waiting for long reply unlink in
328                                  * sluggish nets). Let's check this. If there
329                                  * is no unregistering and inflight != 0 this
330                                  * is bug. */
331                                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
332
333                                 /* Let's save one loop as soon as inflight have
334                                  * dropped to zero. No new inflights possible at
335                                  * this point. */
336                                 rc = 0;
337                         } else {
338                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
339                                        "Network is sluggish? Waiting them "
340                                        "to error out.\n", cli_tgt,
341                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
342                                        atomic_read(&imp->imp_unregistering));
343                         }
344                         spin_unlock(&imp->imp_lock);
345                 }
346         } while (rc != 0);
347
348         /* Let's additionally check that no new rpcs added to import in
349          * "invalidate" state. */
350         LASSERT(atomic_read(&imp->imp_inflight) == 0);
351
352 out:
353         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
354
355         atomic_dec(&imp->imp_inval_count);
356         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
357 }
358
359 /* unset imp_invalid */
360 void ptlrpc_activate_import(struct obd_import *imp)
361 {
362         struct obd_device *obd = imp->imp_obd;
363
364         spin_lock(&imp->imp_lock);
365         imp->imp_invalid = 0;
366         spin_unlock(&imp->imp_lock);
367
368         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
369 }
370
371 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
372 {
373         ENTRY;
374
375         LASSERT(!imp->imp_dlm_fake);
376
377         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
378                 if (!imp->imp_replayable) {
379                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
380                                "auto-deactivating\n",
381                                obd2cli_tgt(imp->imp_obd),
382                                imp->imp_connection->c_remote_uuid.uuid,
383                                imp->imp_obd->obd_name);
384                         ptlrpc_deactivate_import(imp);
385                 }
386
387                 CDEBUG(D_HA, "%s: waking up pinger\n",
388                        obd2cli_tgt(imp->imp_obd));
389
390                 spin_lock(&imp->imp_lock);
391                 imp->imp_force_verify = 1;
392                 spin_unlock(&imp->imp_lock);
393
394                 ptlrpc_pinger_wake_up();
395         }
396         EXIT;
397 }
398
399 int ptlrpc_reconnect_import(struct obd_import *imp)
400 {
401
402         ptlrpc_set_import_discon(imp, 0);
403         /* Force a new connect attempt */
404         ptlrpc_invalidate_import(imp);
405         /* Do a fresh connect next time by zeroing the handle */
406         ptlrpc_disconnect_import(imp, 1);
407         /* Wait for all invalidate calls to finish */
408         if (atomic_read(&imp->imp_inval_count) > 0) {
409                 int rc;
410                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
411                 rc = l_wait_event(imp->imp_recovery_waitq,
412                                   (atomic_read(&imp->imp_inval_count) == 0),
413                                   &lwi);
414                 if (rc)
415                         CERROR("Interrupted, inval=%d\n",
416                                atomic_read(&imp->imp_inval_count));
417         }
418
419         /*
420          * Allow reconnect attempts. Note: Currently, the function is
421          * only called by MGC. So assume this is a recoverable import,
422          * and force import to be recoverable. fix this if you need to
423          */
424
425         imp->imp_obd->obd_no_recov = 0;
426         /* Remove 'invalid' flag */
427         ptlrpc_activate_import(imp);
428         /* Attempt a new connect */
429         ptlrpc_recover_import(imp, NULL);
430         return 0;
431 }
432
433 EXPORT_SYMBOL(ptlrpc_reconnect_import);
434
435 static int import_select_connection(struct obd_import *imp)
436 {
437         struct obd_import_conn *imp_conn = NULL, *conn;
438         struct obd_export *dlmexp;
439         int tried_all = 1;
440         ENTRY;
441
442         spin_lock(&imp->imp_lock);
443
444         if (list_empty(&imp->imp_conn_list)) {
445                 CERROR("%s: no connections available\n",
446                         imp->imp_obd->obd_name);
447                 spin_unlock(&imp->imp_lock);
448                 RETURN(-EINVAL);
449         }
450
451         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
452                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
453                        imp->imp_obd->obd_name,
454                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
455                        conn->oic_last_attempt);
456
457                 /* Don't thrash connections */
458                 if (cfs_time_before_64(cfs_time_current_64(),
459                                      conn->oic_last_attempt +
460                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
461                         continue;
462                 }
463
464                 /* If we have not tried this connection since the
465                    the last successful attempt, go with this one */
466                 if ((conn->oic_last_attempt == 0) ||
467                     cfs_time_beforeq_64(conn->oic_last_attempt,
468                                        imp->imp_last_success_conn)) {
469                         imp_conn = conn;
470                         tried_all = 0;
471                         break;
472                 }
473
474                 /* If all of the connections have already been tried
475                    since the last successful connection; just choose the
476                    least recently used */
477                 if (!imp_conn)
478                         imp_conn = conn;
479                 else if (cfs_time_before_64(conn->oic_last_attempt,
480                                             imp_conn->oic_last_attempt))
481                         imp_conn = conn;
482         }
483
484         /* if not found, simply choose the current one */
485         if (!imp_conn) {
486                 LASSERT(imp->imp_conn_current);
487                 imp_conn = imp->imp_conn_current;
488                 tried_all = 0;
489         }
490         LASSERT(imp_conn->oic_conn);
491
492         /* If we've tried everything, and we're back to the beginning of the
493            list, increase our timeout and try again. It will be reset when
494            we do finally connect. (FIXME: really we should wait for all network
495            state associated with the last connection attempt to drain before
496            trying to reconnect on it.) */
497         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
498             !imp->imp_recon_bk /* not retrying */) {
499                 if (at_get(&imp->imp_at.iat_net_latency) <
500                     CONNECTION_SWITCH_MAX) {
501                         at_add(&imp->imp_at.iat_net_latency,
502                                MIN(at_get(&imp->imp_at.iat_net_latency) +
503                                CONNECTION_SWITCH_INC, CONNECTION_SWITCH_MAX));
504                 }
505                 LASSERT(imp_conn->oic_last_attempt);
506                 CWARN("%s: tried all connections, increasing latency to %ds\n",
507                       imp->imp_obd->obd_name,
508                       at_get(&imp->imp_at.iat_net_latency));
509         }
510
511         imp_conn->oic_last_attempt = cfs_time_current_64();
512
513         /* switch connection, don't mind if it's same as the current one */
514         if (imp->imp_connection)
515                 ptlrpc_connection_put(imp->imp_connection);
516         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
517
518         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
519         LASSERT(dlmexp != NULL);
520         if (dlmexp->exp_connection)
521                 ptlrpc_connection_put(dlmexp->exp_connection);
522         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
523         class_export_put(dlmexp);
524
525         if (imp->imp_conn_current != imp_conn) {
526                 if (imp->imp_conn_current)
527                         CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
528                                imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
529                                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
530                 imp->imp_conn_current = imp_conn;
531         }
532
533         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
534                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
535                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
536
537         spin_unlock(&imp->imp_lock);
538
539         RETURN(0);
540 }
541
542 /**
543  * must be called under imp lock
544  */
545 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
546 {
547         struct ptlrpc_request *req;
548         struct list_head *tmp;
549
550         if (list_empty(&imp->imp_replay_list))
551                 return 0;
552         tmp = imp->imp_replay_list.next;
553         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
554         *transno = req->rq_transno;
555         if (req->rq_transno == 0) {
556                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
557                 LBUG();
558         }
559
560         return 1;
561 }
562
563 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
564 {
565         struct obd_device *obd = imp->imp_obd;
566         int set_transno = 0;
567         int initial_connect = 0;
568         int rc;
569         __u64 committed_before_reconnect = 0;
570         struct ptlrpc_request *request;
571         __u32 size[] = { sizeof(struct ptlrpc_body),
572                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
573                        sizeof(obd->obd_uuid),
574                        sizeof(imp->imp_dlm_handle),
575                        sizeof(imp->imp_connect_data) };
576         char *tmp[] = { NULL,
577                         obd2cli_tgt(imp->imp_obd),
578                         obd->obd_uuid.uuid,
579                         (char *)&imp->imp_dlm_handle,
580                         (char *)&imp->imp_connect_data };
581         struct ptlrpc_connect_async_args *aa;
582
583         ENTRY;
584         spin_lock(&imp->imp_lock);
585         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
586                 spin_unlock(&imp->imp_lock);
587                 CERROR("can't connect to a closed import\n");
588                 RETURN(-EINVAL);
589         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
590                 spin_unlock(&imp->imp_lock);
591                 CERROR("already connected\n");
592                 RETURN(0);
593         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
594                 spin_unlock(&imp->imp_lock);
595                 CERROR("already connecting\n");
596                 RETURN(-EALREADY);
597         }
598
599         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
600
601         imp->imp_conn_cnt++;
602         imp->imp_resend_replay = 0;
603
604         if (!lustre_handle_is_used(&imp->imp_remote_handle))
605                 initial_connect = 1;
606         else
607                 committed_before_reconnect = imp->imp_peer_committed_transno;
608
609         set_transno = ptlrpc_first_transno(imp,
610                                            &imp->imp_connect_data.ocd_transno);
611
612         spin_unlock(&imp->imp_lock);
613
614         if (new_uuid) {
615                 struct obd_uuid uuid;
616
617                 obd_str2uuid(&uuid, new_uuid);
618                 rc = import_set_conn_priority(imp, &uuid);
619                 if (rc)
620                         GOTO(out, rc);
621         }
622
623         rc = import_select_connection(imp);
624         if (rc)
625                 GOTO(out, rc);
626
627         /* last in connection list */
628         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
629                 if (imp->imp_initial_recov_bk && initial_connect) {
630                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
631                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
632                         /* Don't retry if connect fails */
633                         rc = 0;
634                         obd_set_info_async(obd->obd_self_export,
635                                            sizeof(KEY_INIT_RECOV),
636                                            KEY_INIT_RECOV,
637                                            sizeof(rc), &rc, NULL);
638                 }
639                 if (imp->imp_recon_bk) {
640                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
641                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
642                         spin_lock(&imp->imp_lock);
643                         imp->imp_last_recon = 1;
644                         spin_unlock(&imp->imp_lock);
645                 }
646         }
647
648         /* Reset connect flags to the originally requested flags, in case
649          * the server is updated on-the-fly we will get the new features. */
650         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
651         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
652
653         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
654                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
655         if (rc)
656                 GOTO(out, rc);
657
658         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
659                                   5, size, tmp);
660         if (!request)
661                 GOTO(out, rc = -ENOMEM);
662
663         /* Report the rpc service time to the server so that it knows how long
664          * to wait for clients to join recovery */
665         lustre_msg_set_service_time(request->rq_reqmsg,
666                                     at_timeout2est(request->rq_timeout));
667
668         /* The amount of time we give the server to process the connect req.
669          * import_select_connection will increase the net latency on
670          * repeated reconnect attempts to cover slow networks.
671          * We override/ignore the server rpc completion estimate here,
672          * which may be large if this is a reconnect attempt */
673         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
674         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
675
676 #ifndef __KERNEL__
677         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
678 #endif
679         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
680                 lustre_msg_add_op_flags(request->rq_reqmsg,
681                                         MSG_CONNECT_NEXT_VER);
682
683         request->rq_no_resend = request->rq_no_delay = 1;
684         request->rq_send_state = LUSTRE_IMP_CONNECTING;
685         /* Allow a slightly larger reply for future growth compatibility */
686         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
687                               16 * sizeof(__u64);
688         ptlrpc_req_set_repsize(request, 2, size);
689         request->rq_interpret_reply = ptlrpc_connect_interpret;
690
691         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
692         aa = ptlrpc_req_async_args(request);
693         memset(aa, 0, sizeof *aa);
694
695         aa->pcaa_peer_committed = committed_before_reconnect;
696         aa->pcaa_initial_connect = initial_connect;
697         if (aa->pcaa_initial_connect) {
698                 spin_lock(&imp->imp_lock);
699                 imp->imp_replayable = 1;
700                 spin_unlock(&imp->imp_lock);
701                 lustre_msg_add_op_flags(request->rq_reqmsg,
702                                         MSG_CONNECT_INITIAL);
703         }
704
705         if (set_transno)
706                 lustre_msg_add_op_flags(request->rq_reqmsg,
707                                         MSG_CONNECT_TRANSNO);
708
709         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
710                   aa->pcaa_initial_connect ? "initial " : "re",
711                   imp->imp_conn_cnt);
712         ptlrpcd_add_req(request);
713         rc = 0;
714 out:
715         if (rc != 0) {
716                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
717         }
718
719         RETURN(rc);
720 }
721 EXPORT_SYMBOL(ptlrpc_connect_import);
722
723 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
724 {
725 #ifdef __KERNEL__
726         struct obd_import_conn *imp_conn;
727 #endif
728         int wake_pinger = 0;
729
730         ENTRY;
731
732         spin_lock(&imp->imp_lock);
733         if (list_empty(&imp->imp_conn_list))
734                 GOTO(unlock, 0);
735
736 #ifdef __KERNEL__
737         imp_conn = list_entry(imp->imp_conn_list.prev,
738                               struct obd_import_conn,
739                               oic_item);
740
741         /* XXX: When the failover node is the primary node, it is possible
742          * to have two identical connections in imp_conn_list. We must
743          * compare not conn's pointers but NIDs, otherwise we can defeat
744          * connection throttling. (See bug 14774.) */
745         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
746                                 imp_conn->oic_conn->c_peer.nid) {
747                 ptlrpc_ping_import_soon(imp);
748                 wake_pinger = 1;
749         }
750
751 #else
752         /* liblustre has no pinger thead, so we wakup pinger anyway */
753         wake_pinger = 1;
754 #endif
755  unlock:
756         spin_unlock(&imp->imp_lock);
757
758         if (wake_pinger)
759                 ptlrpc_pinger_wake_up();
760
761         EXIT;
762 }
763
764 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
765                                     void * data, int rc)
766 {
767         struct ptlrpc_connect_async_args *aa = data;
768         struct obd_import *imp = request->rq_import;
769         struct client_obd *cli = &imp->imp_obd->u.cli;
770         struct lustre_handle old_hdl;
771         __u64 old_connect_flags;
772         int msg_flags;
773         ENTRY;
774
775         spin_lock(&imp->imp_lock);
776         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
777                 spin_unlock(&imp->imp_lock);
778                 RETURN(0);
779         }
780         spin_unlock(&imp->imp_lock);
781
782         if (rc)
783                 GOTO(out, rc);
784
785         LASSERT(imp->imp_conn_current);
786
787         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
788
789         /* All imports are pingable */
790         spin_lock(&imp->imp_lock);
791         imp->imp_pingable = 1;
792
793         if (aa->pcaa_initial_connect) {
794                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
795                         imp->imp_replayable = 1;
796                         spin_unlock(&imp->imp_lock);
797                         CDEBUG(D_HA, "connected to replayable target: %s\n",
798                                obd2cli_tgt(imp->imp_obd));
799                 } else {
800                         imp->imp_replayable = 0;
801                         spin_unlock(&imp->imp_lock);
802                 }
803
804                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
805                      msg_flags & MSG_CONNECT_NEXT_VER) ||
806                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
807                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
808                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
809                                obd2cli_tgt(imp->imp_obd));
810                 } else {
811                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
812                                obd2cli_tgt(imp->imp_obd));
813                 }
814
815                 imp->imp_remote_handle =
816                                 *lustre_msg_get_handle(request->rq_repmsg);
817
818                 /* Initial connects are allowed for clients with non-random
819                  * uuids when servers are in recovery.  Simply signal the
820                  * servers replay is complete and wait in REPLAY_WAIT. */
821                 if (msg_flags & MSG_CONNECT_RECOVERING) {
822                         CDEBUG(D_HA, "connect to %s during recovery\n",
823                                obd2cli_tgt(imp->imp_obd));
824                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
825                 } else {
826                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
827                         ptlrpc_activate_import(imp);
828                 }
829                 GOTO(finish, rc = 0);
830         } else {
831                 spin_unlock(&imp->imp_lock);
832         }
833
834         /* Determine what recovery state to move the import to. */
835         if (MSG_CONNECT_RECONNECT & msg_flags) {
836                 memset(&old_hdl, 0, sizeof(old_hdl));
837                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
838                             sizeof (old_hdl))) {
839                         CERROR("%s@%s didn't like our handle "LPX64
840                                ", failed\n", obd2cli_tgt(imp->imp_obd),
841                                imp->imp_connection->c_remote_uuid.uuid,
842                                imp->imp_dlm_handle.cookie);
843                         GOTO(out, rc = -ENOTCONN);
844                 }
845
846                 if (memcmp(&imp->imp_remote_handle,
847                            lustre_msg_get_handle(request->rq_repmsg),
848                            sizeof(imp->imp_remote_handle))) {
849                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
850                                 D_HA : D_WARNING;
851
852                         /* Bug 16611/14775: if server handle have changed,
853                          * that means some sort of disconnection happened.
854                          * If the server is not in recovery, that also means it
855                          * already erased all of our state because of previous
856                          * eviction. If it is in recovery - we are safe to
857                          * participate since we can reestablish all of our state
858                          * with server again */
859                         CDEBUG(level,"%s@%s changed server handle from "
860                                      LPX64" to "LPX64"%s\n",
861                                      obd2cli_tgt(imp->imp_obd),
862                                      imp->imp_connection->c_remote_uuid.uuid,
863                                      imp->imp_remote_handle.cookie,
864                                      lustre_msg_get_handle(request->rq_repmsg)->
865                                                                         cookie,
866                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
867                                          " but is still in recovery" : "");
868
869                         imp->imp_remote_handle =
870                                      *lustre_msg_get_handle(request->rq_repmsg);
871
872                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
873                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
874                                 GOTO(finish, rc = 0);
875                         }
876
877                 } else {
878                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
879                                obd2cli_tgt(imp->imp_obd),
880                                imp->imp_connection->c_remote_uuid.uuid);
881                 }
882
883                 if (imp->imp_invalid) {
884                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
885                                "marking evicted\n", imp->imp_obd->obd_name);
886                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
887                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
888                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
889                                imp->imp_obd->obd_name,
890                                obd2cli_tgt(imp->imp_obd));
891
892                         spin_lock(&imp->imp_lock);
893                         imp->imp_resend_replay = 1;
894                         /* VBR: delayed connection */
895                         if (MSG_CONNECT_DELAYED & msg_flags) {
896                                 imp->imp_delayed_recovery = 1;
897                                 imp->imp_no_lock_replay = 1;
898                         }
899                         spin_unlock(&imp->imp_lock);
900
901                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
902                 } else {
903                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
904                 }
905         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
906                 LASSERT(imp->imp_replayable);
907                 imp->imp_remote_handle =
908                                 *lustre_msg_get_handle(request->rq_repmsg);
909                 imp->imp_last_replay_transno = 0;
910                 /* VBR: delayed connection */
911                 if (MSG_CONNECT_DELAYED & msg_flags) {
912                         spin_lock(&imp->imp_lock);
913                         imp->imp_delayed_recovery = 1;
914                         imp->imp_no_lock_replay = 1;
915                         spin_unlock(&imp->imp_lock);
916                 }
917                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
918         } else {
919                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
920                           "flags reconnect/recovering not set: %x)",msg_flags);
921                 imp->imp_remote_handle =
922                                 *lustre_msg_get_handle(request->rq_repmsg);
923                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
924         }
925
926         /* Sanity checks for a reconnected import. */
927         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
928                 CERROR("imp_replayable flag does not match server "
929                        "after reconnect. We should LBUG right here.\n");
930         }
931
932         if (lustre_msg_get_last_committed(request->rq_repmsg) <
933             aa->pcaa_peer_committed) {
934                 CERROR("%s went back in time (transno "LPD64
935                        " was previously committed, server now claims "LPD64
936                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
937                        "id=9646\n",
938                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
939                        lustre_msg_get_last_committed(request->rq_repmsg));
940         }
941
942 finish:
943         rc = ptlrpc_import_recovery_state_machine(imp);
944         if (rc != 0) {
945                 if (rc == -ENOTCONN) {
946                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
947                                "invalidating and reconnecting\n",
948                                obd2cli_tgt(imp->imp_obd),
949                                imp->imp_connection->c_remote_uuid.uuid);
950                         ptlrpc_connect_import(imp, NULL);
951                         RETURN(0);
952                 }
953         } else {
954                 struct obd_connect_data *ocd;
955                 struct obd_export *exp;
956
957                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
958                                          lustre_swab_connect);
959                 spin_lock(&imp->imp_lock);
960                 list_del(&imp->imp_conn_current->oic_item);
961                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
962                 imp->imp_last_success_conn =
963                         imp->imp_conn_current->oic_last_attempt;
964
965                 if (ocd == NULL) {
966                         spin_unlock(&imp->imp_lock);
967                         CERROR("Wrong connect data from server\n");
968                         rc = -EPROTO;
969                         GOTO(out, rc);
970                 }
971
972                 imp->imp_connect_data = *ocd;
973
974                 exp = class_conn2export(&imp->imp_dlm_handle);
975                 spin_unlock(&imp->imp_lock);
976
977                 /* check that server granted subset of flags we asked for. */
978                 LASSERTF((ocd->ocd_connect_flags &
979                           imp->imp_connect_flags_orig) ==
980                          ocd->ocd_connect_flags, LPX64" != "LPX64,
981                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
982
983                 if (!exp) {
984                         /* This could happen if export is cleaned during the
985                            connect attempt */
986                         CERROR("Missing export for %s\n",
987                                imp->imp_obd->obd_name);
988                         GOTO(out, rc = -ENODEV);
989                 }
990                 old_connect_flags = exp->exp_connect_flags;
991                 exp->exp_connect_flags = ocd->ocd_connect_flags;
992                 imp->imp_obd->obd_self_export->exp_connect_flags =
993                         ocd->ocd_connect_flags;
994                 class_export_put(exp);
995
996                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
997
998                 if (!ocd->ocd_ibits_known &&
999                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
1000                         CERROR("Inodebits aware server returned zero compatible"
1001                                " bits?\n");
1002
1003                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1004                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
1005                                         LUSTRE_VERSION_OFFSET_WARN ||
1006                      ocd->ocd_version < LUSTRE_VERSION_CODE -
1007                                         LUSTRE_VERSION_OFFSET_WARN)) {
1008                         /* Sigh, some compilers do not like #ifdef in the middle
1009                            of macro arguments */
1010 #ifdef __KERNEL__
1011                         const char *older =
1012                                 "older.  Consider upgrading this client";
1013 #else
1014                         const char *older =
1015                                 "older.  Consider recompiling this application";
1016 #endif
1017                         const char *newer = "newer than client version";
1018
1019                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1020                                       "is much %s (%s)\n",
1021                                       obd2cli_tgt(imp->imp_obd),
1022                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1023                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1024                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1025                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1026                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
1027                                       newer : older, LUSTRE_VERSION_STRING);
1028                 }
1029
1030                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1031                         /* We sent to the server ocd_cksum_types with bits set
1032                          * for algorithms we understand. The server masked off
1033                          * the checksum types it doesn't support */
1034                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1035                                 LCONSOLE_WARN("The negotiation of the checksum "
1036                                               "alogrithm to use with server %s "
1037                                               "failed (%x/%x), disabling "
1038                                               "checksums\n",
1039                                               obd2cli_tgt(imp->imp_obd),
1040                                               ocd->ocd_cksum_types,
1041                                               OBD_CKSUM_ALL);
1042                                 cli->cl_checksum = 0;
1043                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1044                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1045                         } else {
1046                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1047
1048                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1049                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1050                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1051                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1052                                 else
1053                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1054                         }
1055                 } else {
1056                         /* The server does not support OBD_CONNECT_CKSUM.
1057                          * Enforce CRC32 for backward compatibility*/
1058                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1059                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1060                 }
1061
1062                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1063                         cli->cl_max_pages_per_rpc =
1064                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1065                 }
1066
1067                 /* Reset ns_connect_flags only for initial connect. It might be
1068                  * changed in while using FS and if we reset it in reconnect
1069                  * this leads to lossing user settings done before such as
1070                  * disable lru_resize, etc. */
1071                 if (old_connect_flags != exp->exp_connect_flags ||
1072                     aa->pcaa_initial_connect) {
1073                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1074                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1075                                ocd->ocd_connect_flags);
1076                         imp->imp_obd->obd_namespace->ns_connect_flags =
1077                                 ocd->ocd_connect_flags;
1078                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1079                                 ocd->ocd_connect_flags;
1080                 }
1081
1082                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1083                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1084                         /* We need a per-message support flag, because
1085                            a. we don't know if the incoming connect reply
1086                               supports AT or not (in reply_in_callback)
1087                               until we unpack it.
1088                            b. failovered server means export and flags are gone
1089                               (in ptlrpc_send_reply).
1090                            Can only be set when we know AT is supported at
1091                            both ends */
1092                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1093                 else
1094                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1095
1096                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1097                         (cli->cl_max_pages_per_rpc > 0));
1098         }
1099
1100  out:
1101         if (rc != 0) {
1102                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1103                 spin_lock(&imp->imp_lock);
1104                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1105                     (request->rq_import_generation == imp->imp_generation))
1106                         ptlrpc_deactivate_and_unlock_import(imp);
1107                 else
1108                         spin_unlock(&imp->imp_lock);
1109
1110                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1111                         /* Give up trying to reconnect */
1112                         imp->imp_obd->obd_no_recov = 1;
1113                         ptlrpc_deactivate_import(imp);
1114                 }
1115
1116                 if (rc == -EPROTO) {
1117                         struct obd_connect_data *ocd;
1118                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1119                                                  sizeof *ocd,
1120                                                  lustre_swab_connect);
1121                         if (ocd &&
1122                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1123                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1124                            /* Actually servers are only supposed to refuse
1125                               connection from liblustre clients, so we should
1126                               never see this from VFS context */
1127                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1128                                         "(%d.%d.%d.%d)"
1129                                         " refused connection from this client "
1130                                         "with an incompatible version (%s).  "
1131                                         "Client must be recompiled\n",
1132                                         obd2cli_tgt(imp->imp_obd),
1133                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1134                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1135                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1136                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1137                                         LUSTRE_VERSION_STRING);
1138                                 ptlrpc_deactivate_import(imp);
1139                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1140                         }
1141                         RETURN(-EPROTO);
1142                 }
1143
1144                 ptlrpc_maybe_ping_import_soon(imp);
1145
1146                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1147                        obd2cli_tgt(imp->imp_obd),
1148                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1149         }
1150
1151         spin_lock(&imp->imp_lock);
1152         imp->imp_last_recon = 0;
1153         spin_unlock(&imp->imp_lock);
1154
1155         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1156         RETURN(rc);
1157 }
1158
1159 static int completed_replay_interpret(struct ptlrpc_request *req,
1160                                       void * data, int rc)
1161 {
1162         ENTRY;
1163         atomic_dec(&req->rq_import->imp_replay_inflight);
1164         if (req->rq_status == 0 &&
1165             !req->rq_import->imp_vbr_failed) {
1166                 ptlrpc_import_recovery_state_machine(req->rq_import);
1167         } else {
1168                 if (req->rq_import->imp_vbr_failed) {
1169                         CDEBUG(D_WARNING,
1170                                "%s: version recovery fails, reconnecting\n",
1171                                req->rq_import->imp_obd->obd_name);
1172                         spin_lock(&req->rq_import->imp_lock);
1173                         req->rq_import->imp_vbr_failed = 0;
1174                         spin_unlock(&req->rq_import->imp_lock);
1175                 } else {
1176                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1177                                      "reconnecting\n",
1178                                req->rq_import->imp_obd->obd_name,
1179                                req->rq_status);
1180                 }
1181                 ptlrpc_connect_import(req->rq_import, NULL);
1182         }
1183         RETURN(0);
1184 }
1185
1186 static int signal_completed_replay(struct obd_import *imp)
1187 {
1188         struct ptlrpc_request *req;
1189         ENTRY;
1190
1191         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1192         atomic_inc(&imp->imp_replay_inflight);
1193
1194         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1195         if (!req) {
1196                 atomic_dec(&imp->imp_replay_inflight);
1197                 RETURN(-ENOMEM);
1198         }
1199
1200         ptlrpc_req_set_repsize(req, 1, NULL);
1201         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1202         lustre_msg_add_flags(req->rq_reqmsg,
1203                              MSG_LOCK_REPLAY_DONE |
1204                              MSG_REQ_REPLAY_DONE |
1205                              MSG_LAST_REPLAY);
1206
1207         if (imp->imp_delayed_recovery)
1208                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1209         req->rq_timeout *= 3;
1210         req->rq_interpret_reply = completed_replay_interpret;
1211
1212         ptlrpcd_add_req(req);
1213         RETURN(0);
1214 }
1215
1216 #ifdef __KERNEL__
1217 static int ptlrpc_invalidate_import_thread(void *data)
1218 {
1219         struct obd_import *imp = data;
1220
1221         ENTRY;
1222
1223         ptlrpc_daemonize("ll_imp_inval");
1224
1225         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1226                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1227                imp->imp_connection->c_remote_uuid.uuid);
1228
1229         ptlrpc_invalidate_import(imp);
1230
1231         if (obd_dump_on_eviction) {
1232                 CERROR("dump the log upon eviction\n");
1233                 libcfs_debug_dumplog();
1234         }
1235
1236         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1237         ptlrpc_import_recovery_state_machine(imp);
1238
1239         class_import_put(imp);
1240         RETURN(0);
1241 }
1242 #endif
1243
1244 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1245 {
1246         int rc = 0;
1247         int inflight;
1248         char *target_start;
1249         int target_len;
1250
1251         ENTRY;
1252         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1253                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1254                           &target_start, &target_len);
1255                 /* Don't care about MGC eviction */
1256                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1257                            LUSTRE_MGC_NAME) != 0) {
1258                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1259                                            "%.*s; in progress operations using "
1260                                            "this service will fail.\n",
1261                                            target_len, target_start);
1262                 }
1263                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1264                        obd2cli_tgt(imp->imp_obd),
1265                        imp->imp_connection->c_remote_uuid.uuid);
1266
1267 #ifdef __KERNEL__
1268                 /* bug 17802:  XXX client_disconnect_export vs connect request
1269                  * race. if client will evicted at this time, we start
1270                  * invalidate thread without referece to import and import can
1271                  * be freed at same time. */
1272                 class_import_get(imp);
1273                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1274                                    CLONE_VM | CLONE_FILES);
1275                 if (rc < 0) {
1276                         class_import_put(imp);
1277                         CERROR("error starting invalidate thread: %d\n", rc);
1278                 } else {
1279                         rc = 0;
1280                 }
1281                 RETURN(rc);
1282 #else
1283                 ptlrpc_invalidate_import(imp);
1284
1285                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1286 #endif
1287         }
1288
1289         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1290                 CDEBUG(D_HA, "replay requested by %s\n",
1291                        obd2cli_tgt(imp->imp_obd));
1292                 rc = ptlrpc_replay_next(imp, &inflight);
1293                 if (inflight == 0 &&
1294                     atomic_read(&imp->imp_replay_inflight) == 0) {
1295                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1296                         rc = ldlm_replay_locks(imp);
1297                         if (rc)
1298                                 GOTO(out, rc);
1299                 }
1300                 rc = 0;
1301         }
1302
1303         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1304                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1305                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1306                         rc = signal_completed_replay(imp);
1307                         if (rc)
1308                                 GOTO(out, rc);
1309                 }
1310
1311         }
1312
1313         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1314                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1315                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1316                 }
1317         }
1318
1319         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1320                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1321                        obd2cli_tgt(imp->imp_obd),
1322                        imp->imp_connection->c_remote_uuid.uuid);
1323
1324                 rc = ptlrpc_resend(imp);
1325                 if (rc)
1326                         GOTO(out, rc);
1327                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1328                 ptlrpc_activate_import(imp);
1329
1330                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1331                           &target_start, &target_len);
1332                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1333                               "using nid %s.\n", imp->imp_obd->obd_name,
1334                               target_len, target_start,
1335                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1336         }
1337
1338         if (imp->imp_state == LUSTRE_IMP_FULL) {
1339                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1340                 ptlrpc_wake_delayed(imp);
1341         }
1342
1343  out:
1344         RETURN(rc);
1345 }
1346
/*
 * Callback that does nothing and reports 0; passed as the timeout handler
 * to LWI_TIMEOUT_INTR() in ptlrpc_disconnect_import().
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1351
1352 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1353 {
1354         struct ptlrpc_request *req;
1355         int rq_opc, rc = 0;
1356         int nowait = imp->imp_obd->obd_force;
1357         ENTRY;
1358
1359         if (nowait)
1360                 GOTO(set_state, rc);
1361
1362         switch (imp->imp_connect_op) {
1363         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1364         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1365         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1366         default:
1367                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1368                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1369                 RETURN(-EINVAL);
1370         }
1371
1372         if (ptlrpc_import_in_recovery(imp)) {
1373                 struct l_wait_info lwi;
1374                 cfs_duration_t timeout;
1375
1376                 if (AT_OFF) {
1377                         timeout = cfs_time_seconds(obd_timeout);
1378                 } else {
1379                         int idx = import_at_get_index(imp,
1380                                 imp->imp_client->cli_request_portal);
1381                         timeout = cfs_time_seconds(
1382                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1383                 }
1384                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1385                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1386                 rc = l_wait_event(imp->imp_recovery_waitq,
1387                                   !ptlrpc_import_in_recovery(imp), &lwi);
1388         }
1389
1390         spin_lock(&imp->imp_lock);
1391         if (imp->imp_state != LUSTRE_IMP_FULL)
1392                 GOTO(out, 0);
1393
1394         spin_unlock(&imp->imp_lock);
1395
1396         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1397         if (req) {
1398                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1399                  * it fails.  We can get through the above with a down server
1400                  * if the client doesn't know the server is gone yet. */
1401                 req->rq_no_resend = 1;
1402
1403 #ifndef CRAY_XT3
1404                 /* We want client umounts to happen quickly, no matter the
1405                    server state... */
1406                 req->rq_timeout = min_t(int, req->rq_timeout,
1407                                         INITIAL_CONNECT_TIMEOUT);
1408 #else
1409                 /* ... but we always want liblustre clients to nicely
1410                    disconnect, so only use the adaptive value. */
1411                 if (AT_OFF)
1412                         req->rq_timeout = obd_timeout / 3;
1413 #endif
1414
1415                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1416                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1417                 ptlrpc_req_set_repsize(req, 1, NULL);
1418                 rc = ptlrpc_queue_wait(req);
1419                 ptlrpc_req_finished(req);
1420         }
1421
1422 set_state:
1423         spin_lock(&imp->imp_lock);
1424 out:
1425         if (noclose)
1426                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1427         else
1428                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1429         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1430         /* Try all connections in the future - bz 12758 */
1431         imp->imp_last_recon = 0;
1432         spin_unlock(&imp->imp_lock);
1433
1434         RETURN(rc);
1435 }
1436
1437 /* Sets maximal number of RPCs possible originating from other side of this
1438    import (server) to us and number of async RPC replies that we are not waiting
1439    for arriving */
1440 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1441 {
1442         LNetSetAsync(imp->imp_connection->c_peer, count);
1443 }
1444
1445 void ptlrpc_cleanup_imp(struct obd_import *imp)
1446 {
1447         ENTRY;
1448
1449         spin_lock(&imp->imp_lock);
1450         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1451         imp->imp_generation++;
1452         spin_unlock(&imp->imp_lock);
1453         ptlrpc_abort_inflight(imp);
1454
1455         EXIT;
1456 }
1457
/* Adaptive Timeout utils */

/* Tunables defined outside this file.  at_add() clamps at_current between
 * at_min and at_max (at_max == 0 disables the upper clamp) and derives the
 * width of one history bin from at_history / AT_BINS. */
extern unsigned int at_min;
extern unsigned int at_max;
extern unsigned int at_history;
1460
1461 /* Bin into timeslices using AT_BINS bins.
1462    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1463    but still smoothing out a return to normalcy from a slow response.
1464    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1465 int at_add(struct adaptive_timeout *at, unsigned int val)
1466 {
1467         unsigned int old = at->at_current;
1468         time_t now = cfs_time_current_sec();
1469         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1470
1471         LASSERT(at);
1472         CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1473                val, at, now - at->at_binstart, at->at_current,
1474                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1475
1476         if (val == 0)
1477                 /* 0's don't count, because we never want our timeout to
1478                    drop to 0, and because 0 could mean an error */
1479                 return 0;
1480
1481         spin_lock(&at->at_lock);
1482
1483         if (unlikely(at->at_binstart == 0)) {
1484                 /* Special case to remove default from history */
1485                 at->at_current = val;
1486                 at->at_worst_ever = val;
1487                 at->at_worst_time = now;
1488                 at->at_hist[0] = val;
1489                 at->at_binstart = now;
1490         } else if (now - at->at_binstart < binlimit ) {
1491                 /* in bin 0 */
1492                 at->at_hist[0] = max(val, at->at_hist[0]);
1493                 at->at_current = max(val, at->at_current);
1494         } else {
1495                 int i, shift;
1496                 unsigned int maxv = val;
1497                 /* move bins over */
1498                 shift = (now - at->at_binstart) / binlimit;
1499                 LASSERT(shift > 0);
1500                 for(i = AT_BINS - 1; i >= 0; i--) {
1501                         if (i >= shift) {
1502                                 at->at_hist[i] = at->at_hist[i - shift];
1503                                 maxv = max(maxv, at->at_hist[i]);
1504                         } else {
1505                                 at->at_hist[i] = 0;
1506                         }
1507                 }
1508                 at->at_hist[0] = val;
1509                 at->at_current = maxv;
1510                 at->at_binstart += shift * binlimit;
1511         }
1512
1513         if (at->at_current > at->at_worst_ever) {
1514                 at->at_worst_ever = at->at_current;
1515                 at->at_worst_time = now;
1516         }
1517
1518         if (at->at_flags & AT_FLG_NOHIST)
1519                 /* Only keep last reported val; keeping the rest of the history
1520                    for proc only */
1521                 at->at_current = val;
1522
1523         if (at_max > 0)
1524                 at->at_current =  min(at->at_current, at_max);
1525         at->at_current =  max(at->at_current, at_min);
1526
1527         if (at->at_current != old)
1528                 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1529                        "(val=%u) hist %u %u %u %u\n", at,
1530                        old, at->at_current, at->at_current - old, val,
1531                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1532                        at->at_hist[3]);
1533
1534         /* if we changed, report the old value */
1535         old = (at->at_current != old) ? old : 0;
1536
1537         spin_unlock(&at->at_lock);
1538         return old;
1539 }
1540
1541 /* Find the imp_at index for a given portal; assign if space available */
1542 int import_at_get_index(struct obd_import *imp, int portal)
1543 {
1544         struct imp_at *at = &imp->imp_at;
1545         int i;
1546
1547         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1548                 if (at->iat_portal[i] == portal)
1549                         return i;
1550                 if (at->iat_portal[i] == 0)
1551                         /* unused */
1552                         break;
1553         }
1554
1555         /* Not found in list, add it under a lock */
1556         spin_lock(&imp->imp_lock);
1557
1558         /* Check unused under lock */
1559         for (; i < IMP_AT_MAX_PORTALS; i++) {
1560                 if (at->iat_portal[i] == portal)
1561                         goto out;
1562                 if (at->iat_portal[i] == 0)
1563                         /* unused */
1564                         break;
1565         }
1566
1567         /* Not enough portals? */
1568         LASSERT(i < IMP_AT_MAX_PORTALS);
1569
1570         at->iat_portal[i] = portal;
1571 out:
1572         spin_unlock(&imp->imp_lock);
1573         return i;
1574 }