Whamcloud - gitweb
Merge branch 'b1_8' of git.lustre.org:prime/lustre into b1_8
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
/* Per-request connect state: filled in by ptlrpc_connect_import() and read
 * back by ptlrpc_connect_interpret() when the connect RPC completes. */
struct ptlrpc_connect_async_args {
        /* imp_peer_committed_transno sampled before a reconnect was sent;
         * stays 0 for an initial connect */
         __u64 pcaa_peer_committed;
        /* non-zero when the import had no remote handle in use, i.e. this
         * is the first connect rather than a reconnect */
        int pcaa_initial_connect;
};
60
61 static void __import_set_state(struct obd_import *imp,
62                                enum lustre_imp_state state)
63 {
64         imp->imp_state = state;
65         imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66         imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67                 cfs_time_current_sec();
68         imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
69                 IMP_STATE_HIST_LEN;
70 }
71
/*
 * Change the state of \a imp to \a state and record the transition, unless
 * the import is already CLOSED: a CLOSED import should remain so.
 *
 * This macro takes no lock itself (IMPORT_SET_STATE is the variant that
 * wraps it in imp_lock).
 *
 * Each argument is evaluated exactly once and captured in a local, so
 * arbitrary expressions (including ones with side effects) are safe to pass.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        struct obd_import *__nolock_imp = (imp);                               \
        enum lustre_imp_state __nolock_state = (state);                        \
                                                                               \
        if (__nolock_imp->imp_state != LUSTRE_IMP_CLOSED) {                    \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      __nolock_imp, obd2cli_tgt(__nolock_imp->imp_obd),        \
                      ptlrpc_import_state_name(__nolock_imp->imp_state),       \
                      ptlrpc_import_state_name(__nolock_state));               \
               __import_set_state(__nolock_imp, __nolock_state);               \
        }                                                                      \
} while (0)
83
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock around the
 * state change.  Must not be used when the caller already holds imp_lock.
 *
 * \a imp is evaluated once and captured in a local so that the lock that is
 * taken is guaranteed to be the one that is released.
 */
#define IMPORT_SET_STATE(imp, state)                            \
do {                                                            \
        struct obd_import *__locked_imp = (imp);                \
                                                                \
        spin_lock(&__locked_imp->imp_lock);                     \
        IMPORT_SET_STATE_NOLOCK(__locked_imp, (state));         \
        spin_unlock(&__locked_imp->imp_lock);                   \
} while (0)
90
91
/* Forward declarations.  ptlrpc_connect_interpret() is defined further down
 * but is installed as the reply-interpret callback of the connect request
 * built in ptlrpc_connect_import(). */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void * data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
95
96 /* Only this function is allowed to change the import state when it is
97  * CLOSED. I would rather refcount the import and free it after
98  * disconnection like we do with exports. To do that, the client_obd
99  * will need to save the peer info somewhere other than in the import,
100  * though. */
101 int ptlrpc_init_import(struct obd_import *imp)
102 {
103         spin_lock(&imp->imp_lock);
104
105         imp->imp_generation++;
106         imp->imp_state =  LUSTRE_IMP_NEW;
107
108         spin_unlock(&imp->imp_lock);
109
110         return 0;
111 }
112 EXPORT_SYMBOL(ptlrpc_init_import);
113
#define UUID_STR "_UUID"
/*
 * Find the printable core of a UUID string without modifying it.
 *
 * \param uuid        NUL-terminated UUID string
 * \param prefix      optional leading text to skip; may be NULL.  It is
 *                    skipped only if \a uuid really starts with it.
 * \param uuid_start  out: points into \a uuid, past the prefix if skipped
 * \param uuid_len    out: length from *uuid_start, not counting a trailing
 *                    "_UUID" suffix if one is present
 *
 * The result is meant to be printed with "%.*s"; nothing is copied.
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        /* compile-time length of the suffix, excluding the NUL */
        const size_t suffix_len = sizeof(UUID_STR) - 1;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        uuid += prefix_len;
        }
        *uuid_start = uuid;

        /* Compare in size_t throughout; only the final store narrows. */
        len = strlen(uuid);
        if (len >= suffix_len &&
            strncmp(uuid + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_len = (int)len;
}
130
131 /* Returns true if import was FULL, false if import was already not
132  * connected.
133  * @imp - import to be disconnected
134  * @conn_cnt - connection count (epoch) of the request that timed out
135  *             and caused the disconnection.  In some cases, multiple
136  *             inflight requests can fail to a single target (e.g. OST
137  *             bulk requests) and if one has already caused a reconnection
138  *             (increasing the import->conn_cnt) the older failure should
139  *             not also cause a reconnection.  If zero it forces a reconnect.
140  */
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
142 {
143         int rc = 0;
144
145         spin_lock(&imp->imp_lock);
146
147         if (imp->imp_state == LUSTRE_IMP_FULL &&
148             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
149                 char *target_start;
150                 int   target_len;
151
152                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153                           &target_start, &target_len);
154                 if (imp->imp_replayable) {
155                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156                                "%s was lost; in progress operations using this "
157                                "service will wait for recovery to complete.\n",
158                                imp->imp_obd->obd_name, target_len, target_start,
159                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
160                 } else {
161                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162                                "%.*s via nid %s was lost; in progress "
163                                "operations using this service will fail.\n",
164                                imp->imp_obd->obd_name, target_len, target_start,
165                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
166                 }
167                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168                 spin_unlock(&imp->imp_lock);
169
170                 if (obd_dump_on_timeout)
171                         libcfs_debug_dumplog();
172
173                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
174                 rc = 1;
175         } else {
176                 spin_unlock(&imp->imp_lock);
177                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178                        imp->imp_client->cli_name, imp,
179                        (imp->imp_state == LUSTRE_IMP_FULL &&
180                         imp->imp_conn_cnt > conn_cnt) ?
181                        "reconnected" : "not connected", imp->imp_conn_cnt,
182                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
183         }
184
185         return rc;
186 }
187
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
190 {
191         ENTRY;
192         LASSERT_SPIN_LOCKED(&imp->imp_lock);
193
194         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195         imp->imp_invalid = 1;
196         imp->imp_generation++;
197         spin_unlock(&imp->imp_lock);
198
199         ptlrpc_abort_inflight(imp);
200         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
201 }
202
/*
 * This acts as a barrier; all existing requests are rejected, and
 * no new requests will be accepted until the import is valid again.
 *
 * Takes imp_lock here; the helper it calls is the one that releases it.
 */
void ptlrpc_deactivate_import(struct obd_import *imp)
{
        spin_lock(&imp->imp_lock);
        /* returns with imp_lock already dropped */
        ptlrpc_deactivate_and_unlock_import(imp);
}
212
213 static unsigned int
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
215 {
216         long dl;
217
218         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219               (req->rq_phase == RQ_PHASE_BULK) ||
220               (req->rq_phase == RQ_PHASE_NEW)))
221                 return 0;
222
223         if (req->rq_timedout)
224                 return 0;
225
226         if (req->rq_phase == RQ_PHASE_NEW)
227                 dl = req->rq_sent;
228         else
229                 dl = req->rq_deadline;
230
231         if (dl <= now)
232                 return 0;
233
234         return dl - now;
235 }
236
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
238 {
239         time_t now = cfs_time_current_sec();
240         struct list_head *tmp, *n;
241         struct ptlrpc_request *req;
242         unsigned int timeout = 0;
243
244         spin_lock(&imp->imp_lock);
245         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
248         }
249         spin_unlock(&imp->imp_lock);
250         return timeout;
251 }
252
253 /*
254  * This function will invalidate the import, if necessary, then block
255  * for all the RPC completions, and finally notify the obd to
256  * invalidate its state (ie cancel locks, clear pending requests,
257  * etc).
258  */
259 void ptlrpc_invalidate_import(struct obd_import *imp)
260 {
261         struct list_head *tmp, *n;
262         struct ptlrpc_request *req;
263         struct l_wait_info lwi;
264         unsigned int timeout;
265         int rc;
266
267         atomic_inc(&imp->imp_inval_count);
268
269         /*
270          * If this is an invalid MGC connection, then don't bother
271          * waiting for imp_inflight to drop to 0.
272          */
273         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
274                 goto out;
275
276         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277                 ptlrpc_deactivate_import(imp);
278
279         LASSERT(imp->imp_invalid);
280
281         /* Wait forever until inflight == 0. We really can't do it another
282          * way because in some cases we need to wait for very long reply
283          * unlink. We can't do anything before that because there is really
284          * no guarantee that some rdma transfer is not in progress right now. */
285         do {
286                 /* Calculate max timeout for waiting on rpcs to error
287                  * out. Use obd_timeout if calculated value is smaller
288                  * than it. */
289                 timeout = ptlrpc_inflight_timeout(imp);
290                 timeout += timeout / 3;
291
292                 if (timeout == 0)
293                         timeout = obd_timeout;
294
295                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
296                        timeout);
297
298                 /* Wait for all requests to error out and call completion
299                  * callbacks. Cap it at obd_timeout -- these should all
300                  * have been locally cancelled by ptlrpc_abort_inflight. */
301                 lwi = LWI_TIMEOUT_INTERVAL(
302                         cfs_timeout_cap(cfs_time_seconds(timeout)),
303                         cfs_time_seconds(1), NULL, NULL);
304                 rc = l_wait_event(imp->imp_recovery_waitq,
305                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
306                 if (rc) {
307                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
308
309                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
310                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
311
312                         spin_lock(&imp->imp_lock);
313                         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
314                                 req = list_entry(tmp, struct ptlrpc_request,
315                                         rq_list);
316                                 DEBUG_REQ(D_ERROR, req,"still on sending list");
317                         }
318                         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
319                                 req = list_entry(tmp, struct ptlrpc_request,
320                                         rq_list);
321                                 DEBUG_REQ(D_ERROR, req,"still on delayed list");
322                         }
323
324                         if (atomic_read(&imp->imp_unregistering) == 0) {
325                                 /* We know that only "unregistering" rpcs may
326                                  * still survive in sending or delaying lists
327                                  * (They are waiting for long reply unlink in
328                                  * sluggish nets). Let's check this. If there
329                                  * is no unregistering and inflight != 0 this
330                                  * is bug. */
331                                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
332
333                                 /* Let's save one loop as soon as inflight have
334                                  * dropped to zero. No new inflights possible at
335                                  * this point. */
336                                 rc = 0;
337                         } else {
338                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
339                                        "Network is sluggish? Waiting them "
340                                        "to error out.\n", cli_tgt,
341                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
342                                        atomic_read(&imp->imp_unregistering));
343                         }
344                         spin_unlock(&imp->imp_lock);
345                 }
346         } while (rc != 0);
347
348         /* Let's additionally check that no new rpcs added to import in
349          * "invalidate" state. */
350         LASSERT(atomic_read(&imp->imp_inflight) == 0);
351
352 out:
353         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
354
355         atomic_dec(&imp->imp_inval_count);
356         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
357 }
358
359 /* unset imp_invalid */
360 void ptlrpc_activate_import(struct obd_import *imp)
361 {
362         struct obd_device *obd = imp->imp_obd;
363
364         spin_lock(&imp->imp_lock);
365         imp->imp_invalid = 0;
366         spin_unlock(&imp->imp_lock);
367
368         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
369 }
370
371 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
372 {
373         ENTRY;
374
375         LASSERT(!imp->imp_dlm_fake);
376
377         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
378                 if (!imp->imp_replayable) {
379                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
380                                "auto-deactivating\n",
381                                obd2cli_tgt(imp->imp_obd),
382                                imp->imp_connection->c_remote_uuid.uuid,
383                                imp->imp_obd->obd_name);
384                         ptlrpc_deactivate_import(imp);
385                 }
386
387                 CDEBUG(D_HA, "%s: waking up pinger\n",
388                        obd2cli_tgt(imp->imp_obd));
389
390                 spin_lock(&imp->imp_lock);
391                 imp->imp_force_verify = 1;
392                 spin_unlock(&imp->imp_lock);
393
394                 ptlrpc_pinger_wake_up();
395         }
396         EXIT;
397 }
398
399 int ptlrpc_reconnect_import(struct obd_import *imp)
400 {
401
402         ptlrpc_set_import_discon(imp, 0);
403         /* Force a new connect attempt */
404         ptlrpc_invalidate_import(imp);
405         /* Do a fresh connect next time by zeroing the handle */
406         ptlrpc_disconnect_import(imp, 1);
407         /* Wait for all invalidate calls to finish */
408         if (atomic_read(&imp->imp_inval_count) > 0) {
409                 int rc;
410                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
411                 rc = l_wait_event(imp->imp_recovery_waitq,
412                                   (atomic_read(&imp->imp_inval_count) == 0),
413                                   &lwi);
414                 if (rc)
415                         CERROR("Interrupted, inval=%d\n",
416                                atomic_read(&imp->imp_inval_count));
417         }
418
419         /*
420          * Allow reconnect attempts. Note: Currently, the function is
421          * only called by MGC. So assume this is a recoverable import,
422          * and force import to be recoverable. fix this if you need to
423          */
424
425         imp->imp_obd->obd_no_recov = 0;
426         /* Remove 'invalid' flag */
427         ptlrpc_activate_import(imp);
428         /* Attempt a new connect */
429         ptlrpc_recover_import(imp, NULL);
430         return 0;
431 }
432
433 EXPORT_SYMBOL(ptlrpc_reconnect_import);
434
/*
 * Pick the connection (entry of imp_conn_list) to use for the next connect
 * attempt and install it as imp_connection / imp_conn_current, also
 * pointing the DLM export's exp_connection at it.
 *
 * Selection, in order of preference:
 *  1. the first entry not attempted within CONNECTION_SWITCH_MIN seconds
 *     that has never been tried, or was last tried no later than
 *     imp_last_success_conn;
 *  2. otherwise the least recently attempted non-throttled entry;
 *  3. the current connection, if nothing qualified or imp_force_reconnect
 *     is set.
 *
 * Takes and releases imp_lock itself.
 *
 * Returns 0 on success, -EINVAL if imp_conn_list is empty.
 */
static int import_select_connection(struct obd_import *imp)
{
        struct obd_import_conn *imp_conn = NULL, *conn;
        struct obd_export *dlmexp;
        /* cleared whenever the choice is not "least recently used of all" */
        int tried_all = 1;
        ENTRY;

        spin_lock(&imp->imp_lock);

        if (list_empty(&imp->imp_conn_list)) {
                CERROR("%s: no connections available\n",
                        imp->imp_obd->obd_name);
                spin_unlock(&imp->imp_lock);
                RETURN(-EINVAL);
        }

        list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
                CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
                       imp->imp_obd->obd_name,
                       libcfs_nid2str(conn->oic_conn->c_peer.nid),
                       conn->oic_last_attempt);

                /* Don't thrash connections: skip anything attempted less
                 * than CONNECTION_SWITCH_MIN seconds ago */
                if (cfs_time_before_64(cfs_time_current_64(),
                                     conn->oic_last_attempt +
                                     cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
                        continue;
                }

                /* If we have not tried this connection since the
                   last successful attempt, go with this one */
                if ((conn->oic_last_attempt == 0) ||
                    cfs_time_beforeq_64(conn->oic_last_attempt,
                                       imp->imp_last_success_conn)) {
                        imp_conn = conn;
                        tried_all = 0;
                        break;
                }

                /* If all of the connections have already been tried
                   since the last successful connection; just choose the
                   least recently used */
                if (!imp_conn)
                        imp_conn = conn;
                else if (cfs_time_before_64(conn->oic_last_attempt,
                                            imp_conn->oic_last_attempt))
                        imp_conn = conn;
        }

        /* if not found, simply choose the current one; a forced reconnect
         * also overrides whatever the loop picked */
        if (!imp_conn || imp->imp_force_reconnect) {
                LASSERT(imp->imp_conn_current);
                imp_conn = imp->imp_conn_current;
                tried_all = 0;
        }
        LASSERT(imp_conn->oic_conn);

        /* If we've tried everything, and we're back to the beginning of the
           list, increase our timeout and try again. It will be reset when
           we do finally connect. (FIXME: really we should wait for all network
           state associated with the last connection attempt to drain before
           trying to reconnect on it.) */
        if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
            !imp->imp_recon_bk /* not retrying */) {
                /* grow the net latency estimate by CONNECTION_SWITCH_INC,
                 * never beyond CONNECTION_SWITCH_MAX */
                if (at_get(&imp->imp_at.iat_net_latency) <
                    CONNECTION_SWITCH_MAX) {
                        at_add(&imp->imp_at.iat_net_latency,
                               MIN(at_get(&imp->imp_at.iat_net_latency) +
                               CONNECTION_SWITCH_INC, CONNECTION_SWITCH_MAX));
                }
                LASSERT(imp_conn->oic_last_attempt);
                CWARN("%s: tried all connections, increasing latency to %ds\n",
                      imp->imp_obd->obd_name,
                      at_get(&imp->imp_at.iat_net_latency));
        }

        /* stamp the attempt time used by the throttling check above */
        imp_conn->oic_last_attempt = cfs_time_current_64();

        /* switch connection, don't mind if it's same as the current one:
         * the old reference is dropped and a new one taken */
        if (imp->imp_connection)
                ptlrpc_connection_put(imp->imp_connection);
        imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);

        /* do the same switch for the export found via imp_dlm_handle */
        dlmexp =  class_conn2export(&imp->imp_dlm_handle);
        LASSERT(dlmexp != NULL);
        if (dlmexp->exp_connection)
                ptlrpc_connection_put(dlmexp->exp_connection);
        dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
        class_export_put(dlmexp);

        if (imp->imp_conn_current != imp_conn) {
                /* only logged when replacing an existing current entry */
                if (imp->imp_conn_current)
                        CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
                               imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
                               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
                imp->imp_conn_current = imp_conn;
        }

        CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
               imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));

        spin_unlock(&imp->imp_lock);

        RETURN(0);
}
541
542 /**
543  * must be called under imp lock
544  */
545 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
546 {
547         struct ptlrpc_request *req;
548         struct list_head *tmp;
549
550         if (list_empty(&imp->imp_replay_list))
551                 return 0;
552         tmp = imp->imp_replay_list.next;
553         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
554         *transno = req->rq_transno;
555         if (req->rq_transno == 0) {
556                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
557                 LBUG();
558         }
559
560         return 1;
561 }
562
/*
 * Start an asynchronous (re)connect of \a imp.
 *
 * Moves the import to CONNECTING, selects a connection, builds the connect
 * request and queues it with ptlrpcd_add_req(); the reply is handled by
 * ptlrpc_connect_interpret().
 *
 * \param imp       import to connect
 * \param new_uuid  optional; when non-NULL it is converted to an obd_uuid
 *                  and passed to import_set_conn_priority() before a
 *                  connection is selected
 *
 * Returns:
 *   0          request queued, or the import was already FULL
 *   -EINVAL    the import is CLOSED
 *   -EALREADY  a connect is already in progress
 *   -ENOMEM    the request could not be allocated
 *   otherwise  the error returned by import_set_conn_priority(),
 *              import_select_connection() or obd_reconnect()
 *
 * Any failure after the state was set to CONNECTING puts the import back
 * into DISCON; the three early returns leave the state untouched.
 */
int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
{
        struct obd_device *obd = imp->imp_obd;
        int set_transno = 0;
        int initial_connect = 0;
        int rc;
        __u64 committed_before_reconnect = 0;
        struct ptlrpc_request *request;
        /* request buffer sizes; size[] and tmp[] are parallel arrays */
        __u32 size[] = { sizeof(struct ptlrpc_body),
                       sizeof(imp->imp_obd->u.cli.cl_target_uuid),
                       sizeof(obd->obd_uuid),
                       sizeof(imp->imp_dlm_handle),
                       sizeof(imp->imp_connect_data) };
        /* request buffer contents; slot 0 is left NULL */
        char *tmp[] = { NULL,
                        obd2cli_tgt(imp->imp_obd),
                        obd->obd_uuid.uuid,
                        (char *)&imp->imp_dlm_handle,
                        (char *)&imp->imp_connect_data };
        struct ptlrpc_connect_async_args *aa;

        ENTRY;
        spin_lock(&imp->imp_lock);
        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                spin_unlock(&imp->imp_lock);
                CERROR("can't connect to a closed import\n");
                RETURN(-EINVAL);
        } else if (imp->imp_state == LUSTRE_IMP_FULL) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connected\n");
                RETURN(0);
        } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connecting\n");
                RETURN(-EALREADY);
        }

        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);

        /* new connection epoch, see ptlrpc_set_import_discon() */
        imp->imp_conn_cnt++;
        imp->imp_resend_replay = 0;

        /* no remote handle in use yet means this is the first connect;
         * otherwise remember what the peer had committed before */
        if (!lustre_handle_is_used(&imp->imp_remote_handle))
                initial_connect = 1;
        else
                committed_before_reconnect = imp->imp_peer_committed_transno;

        /* stores the first replay transno, if any, into the connect data */
        set_transno = ptlrpc_first_transno(imp,
                                           &imp->imp_connect_data.ocd_transno);

        spin_unlock(&imp->imp_lock);

        if (new_uuid) {
                struct obd_uuid uuid;

                obd_str2uuid(&uuid, new_uuid);
                rc = import_set_conn_priority(imp, &uuid);
                if (rc)
                        GOTO(out, rc);
        }

        rc = import_select_connection(imp);
        if (rc)
                GOTO(out, rc);

        /* last in connection list */
        if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
                if (imp->imp_initial_recov_bk && initial_connect) {
                        CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
                               imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
                        /* Don't retry if connect fails: rc is reused here
                         * as the value (0) passed for KEY_INIT_RECOV */
                        rc = 0;
                        obd_set_info_async(obd->obd_self_export,
                                           sizeof(KEY_INIT_RECOV),
                                           KEY_INIT_RECOV,
                                           sizeof(rc), &rc, NULL);
                }
                if (imp->imp_recon_bk) {
                        CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
                               imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
                        spin_lock(&imp->imp_lock);
                        imp->imp_last_recon = 1;
                        spin_unlock(&imp->imp_lock);
                }
        }

        /* Reset connect flags to the originally requested flags, in case
         * the server is updated on-the-fly we will get the new features. */
        imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;

        rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
                           &obd->obd_uuid, &imp->imp_connect_data, NULL);
        if (rc)
                GOTO(out, rc);

        request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
                                  5, size, tmp);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        /* Report the rpc service time to the server so that it knows how long
         * to wait for clients to join recovery */
        lustre_msg_set_service_time(request->rq_reqmsg,
                                    at_timeout2est(request->rq_timeout));

        /* The amount of time we give the server to process the connect req.
         * import_select_connection will increase the net latency on
         * repeated reconnect attempts to cover slow networks.
         * We override/ignore the server rpc completion estimate here,
         * which may be large if this is a reconnect attempt */
        request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
        lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);

#ifndef __KERNEL__
        lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
#endif
        if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_NEXT_VER);

        request->rq_no_resend = request->rq_no_delay = 1;
        request->rq_send_state = LUSTRE_IMP_CONNECTING;
        /* Allow a slightly larger reply for future growth compatibility;
         * size[] is reused here to describe the reply buffers */
        size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
                              16 * sizeof(__u64);
        ptlrpc_req_set_repsize(request, 2, size);
        request->rq_interpret_reply = ptlrpc_connect_interpret;

        /* the per-request args must fit in the request's async-args area */
        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = ptlrpc_req_async_args(request);
        memset(aa, 0, sizeof *aa);

        aa->pcaa_peer_committed = committed_before_reconnect;
        aa->pcaa_initial_connect = initial_connect;
        if (aa->pcaa_initial_connect) {
                /* set to replayable up front for the initial connect;
                 * ptlrpc_connect_interpret() sets imp_replayable again from
                 * the reply's op flags */
                spin_lock(&imp->imp_lock);
                imp->imp_replayable = 1;
                spin_unlock(&imp->imp_lock);
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_INITIAL);
        }

        if (set_transno)
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_TRANSNO);

        DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
                  aa->pcaa_initial_connect ? "initial " : "re",
                  imp->imp_conn_cnt);
        ptlrpcd_add_req(request);
        rc = 0;
out:
        /* on any failure past the state change, fall back to DISCON */
        if (rc != 0) {
                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
        }

        RETURN(rc);
}
EXPORT_SYMBOL(ptlrpc_connect_import);
722
723 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
724 {
725 #ifdef __KERNEL__
726         struct obd_import_conn *imp_conn;
727 #endif
728         int wake_pinger = 0;
729
730         ENTRY;
731
732         spin_lock(&imp->imp_lock);
733         if (list_empty(&imp->imp_conn_list))
734                 GOTO(unlock, 0);
735
736 #ifdef __KERNEL__
737         imp_conn = list_entry(imp->imp_conn_list.prev,
738                               struct obd_import_conn,
739                               oic_item);
740
741         /* XXX: When the failover node is the primary node, it is possible
742          * to have two identical connections in imp_conn_list. We must
743          * compare not conn's pointers but NIDs, otherwise we can defeat
744          * connection throttling. (See bug 14774.) */
745         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
746                                 imp_conn->oic_conn->c_peer.nid) {
747                 ptlrpc_ping_import_soon(imp);
748                 wake_pinger = 1;
749         }
750
751 #else
752         /* liblustre has no pinger thead, so we wakup pinger anyway */
753         wake_pinger = 1;
754 #endif
755  unlock:
756         spin_unlock(&imp->imp_lock);
757
758         if (wake_pinger)
759                 ptlrpc_pinger_wake_up();
760
761         EXIT;
762 }
763
/*
 * Tell whether a connect failure code means "target busy, try the same
 * connection again".
 *
 * Returns 1 for -EBUSY and -EAGAIN, 0 for every other value.
 */
static int ptlrpc_busy_reconnect(int rc)
{
        switch (rc) {
        case -EBUSY:
        case -EAGAIN:
                return 1;
        default:
                return 0;
        }
}
768
769 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
770                                     void * data, int rc)
771 {
772         struct ptlrpc_connect_async_args *aa = data;
773         struct obd_import *imp = request->rq_import;
774         struct client_obd *cli = &imp->imp_obd->u.cli;
775         struct lustre_handle old_hdl;
776         __u64 old_connect_flags;
777         int msg_flags;
778         ENTRY;
779
780         spin_lock(&imp->imp_lock);
781         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
782                 spin_unlock(&imp->imp_lock);
783                 RETURN(0);
784         }
785
786         if (rc) {
787                 /* if this reconnect to busy export - not need select new target
788                  * for connecting*/
789                 imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
790                 spin_unlock(&imp->imp_lock);
791                 GOTO(out, rc);
792         }
793
794         LASSERT(imp->imp_conn_current);
795
796         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
797
798         /* All imports are pingable */
799         imp->imp_pingable = 1;
800         imp->imp_force_reconnect = 0;
801
802         if (aa->pcaa_initial_connect) {
803                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
804                         imp->imp_replayable = 1;
805                         spin_unlock(&imp->imp_lock);
806                         CDEBUG(D_HA, "connected to replayable target: %s\n",
807                                obd2cli_tgt(imp->imp_obd));
808                 } else {
809                         imp->imp_replayable = 0;
810                         spin_unlock(&imp->imp_lock);
811                 }
812
813                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
814                      msg_flags & MSG_CONNECT_NEXT_VER) ||
815                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
816                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
817                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
818                                obd2cli_tgt(imp->imp_obd));
819                 } else {
820                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
821                                obd2cli_tgt(imp->imp_obd));
822                 }
823
824                 imp->imp_remote_handle =
825                                 *lustre_msg_get_handle(request->rq_repmsg);
826
827                 /* Initial connects are allowed for clients with non-random
828                  * uuids when servers are in recovery.  Simply signal the
829                  * servers replay is complete and wait in REPLAY_WAIT. */
830                 if (msg_flags & MSG_CONNECT_RECOVERING) {
831                         CDEBUG(D_HA, "connect to %s during recovery\n",
832                                obd2cli_tgt(imp->imp_obd));
833                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
834                 } else {
835                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
836                         ptlrpc_activate_import(imp);
837                 }
838                 GOTO(finish, rc = 0);
839         } else {
840                 spin_unlock(&imp->imp_lock);
841         }
842
843         /* Determine what recovery state to move the import to. */
844         if (MSG_CONNECT_RECONNECT & msg_flags) {
845                 memset(&old_hdl, 0, sizeof(old_hdl));
846                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
847                             sizeof (old_hdl))) {
848                         CERROR("%s@%s didn't like our handle "LPX64
849                                ", failed\n", obd2cli_tgt(imp->imp_obd),
850                                imp->imp_connection->c_remote_uuid.uuid,
851                                imp->imp_dlm_handle.cookie);
852                         GOTO(out, rc = -ENOTCONN);
853                 }
854
855                 if (memcmp(&imp->imp_remote_handle,
856                            lustre_msg_get_handle(request->rq_repmsg),
857                            sizeof(imp->imp_remote_handle))) {
858                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
859                                 D_HA : D_WARNING;
860
861                         /* Bug 16611/14775: if server handle have changed,
862                          * that means some sort of disconnection happened.
863                          * If the server is not in recovery, that also means it
864                          * already erased all of our state because of previous
865                          * eviction. If it is in recovery - we are safe to
866                          * participate since we can reestablish all of our state
867                          * with server again */
868                         CDEBUG(level,"%s@%s changed server handle from "
869                                      LPX64" to "LPX64"%s\n",
870                                      obd2cli_tgt(imp->imp_obd),
871                                      imp->imp_connection->c_remote_uuid.uuid,
872                                      imp->imp_remote_handle.cookie,
873                                      lustre_msg_get_handle(request->rq_repmsg)->
874                                                                         cookie,
875                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
876                                          " but is still in recovery" : "");
877
878                         imp->imp_remote_handle =
879                                      *lustre_msg_get_handle(request->rq_repmsg);
880
881                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
882                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
883                                 GOTO(finish, rc = 0);
884                         }
885
886                 } else {
887                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
888                                obd2cli_tgt(imp->imp_obd),
889                                imp->imp_connection->c_remote_uuid.uuid);
890                 }
891
892                 if (imp->imp_invalid) {
893                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
894                                "marking evicted\n", imp->imp_obd->obd_name);
895                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
896                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
897                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
898                                imp->imp_obd->obd_name,
899                                obd2cli_tgt(imp->imp_obd));
900
901                         spin_lock(&imp->imp_lock);
902                         imp->imp_resend_replay = 1;
903                         /* VBR: delayed connection */
904                         if (MSG_CONNECT_DELAYED & msg_flags) {
905                                 imp->imp_delayed_recovery = 1;
906                                 imp->imp_no_lock_replay = 1;
907                         }
908                         spin_unlock(&imp->imp_lock);
909
910                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
911                 } else {
912                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
913                 }
914         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
915                 LASSERT(imp->imp_replayable);
916                 imp->imp_remote_handle =
917                                 *lustre_msg_get_handle(request->rq_repmsg);
918                 imp->imp_last_replay_transno = 0;
919                 /* VBR: delayed connection */
920                 if (MSG_CONNECT_DELAYED & msg_flags) {
921                         spin_lock(&imp->imp_lock);
922                         imp->imp_delayed_recovery = 1;
923                         imp->imp_no_lock_replay = 1;
924                         spin_unlock(&imp->imp_lock);
925                 }
926                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
927         } else {
928                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
929                           "flags reconnect/recovering not set: %x)",msg_flags);
930                 imp->imp_remote_handle =
931                                 *lustre_msg_get_handle(request->rq_repmsg);
932                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
933         }
934
935         /* Sanity checks for a reconnected import. */
936         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
937                 CERROR("imp_replayable flag does not match server "
938                        "after reconnect. We should LBUG right here.\n");
939         }
940
941         if (lustre_msg_get_last_committed(request->rq_repmsg) <
942             aa->pcaa_peer_committed) {
943                 CERROR("%s went back in time (transno "LPD64
944                        " was previously committed, server now claims "LPD64
945                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
946                        "id=9646\n",
947                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
948                        lustre_msg_get_last_committed(request->rq_repmsg));
949         }
950
951 finish:
952         rc = ptlrpc_import_recovery_state_machine(imp);
953         if (rc != 0) {
954                 if (rc == -ENOTCONN) {
955                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
956                                "invalidating and reconnecting\n",
957                                obd2cli_tgt(imp->imp_obd),
958                                imp->imp_connection->c_remote_uuid.uuid);
959                         ptlrpc_connect_import(imp, NULL);
960                         RETURN(0);
961                 }
962         } else {
963                 struct obd_connect_data *ocd;
964                 struct obd_export *exp;
965
966                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
967                                          lustre_swab_connect);
968                 spin_lock(&imp->imp_lock);
969                 list_del(&imp->imp_conn_current->oic_item);
970                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
971                 imp->imp_last_success_conn =
972                         imp->imp_conn_current->oic_last_attempt;
973
974                 if (ocd == NULL) {
975                         spin_unlock(&imp->imp_lock);
976                         CERROR("Wrong connect data from server\n");
977                         rc = -EPROTO;
978                         GOTO(out, rc);
979                 }
980
981                 imp->imp_connect_data = *ocd;
982
983                 exp = class_conn2export(&imp->imp_dlm_handle);
984                 spin_unlock(&imp->imp_lock);
985
986                 /* check that server granted subset of flags we asked for. */
987                 LASSERTF((ocd->ocd_connect_flags &
988                           imp->imp_connect_flags_orig) ==
989                          ocd->ocd_connect_flags, LPX64" != "LPX64,
990                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
991
992                 if (!exp) {
993                         /* This could happen if export is cleaned during the
994                            connect attempt */
995                         CERROR("Missing export for %s\n",
996                                imp->imp_obd->obd_name);
997                         GOTO(out, rc = -ENODEV);
998                 }
999                 old_connect_flags = exp->exp_connect_flags;
1000                 exp->exp_connect_flags = ocd->ocd_connect_flags;
1001                 imp->imp_obd->obd_self_export->exp_connect_flags =
1002                         ocd->ocd_connect_flags;
1003                 class_export_put(exp);
1004
1005                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
1006
1007                 if (!ocd->ocd_ibits_known &&
1008                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
1009                         CERROR("Inodebits aware server returned zero compatible"
1010                                " bits?\n");
1011
1012                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1013                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
1014                                         LUSTRE_VERSION_OFFSET_WARN ||
1015                      ocd->ocd_version < LUSTRE_VERSION_CODE -
1016                                         LUSTRE_VERSION_OFFSET_WARN)) {
1017                         /* Sigh, some compilers do not like #ifdef in the middle
1018                            of macro arguments */
1019 #ifdef __KERNEL__
1020                         const char *older =
1021                                 "older.  Consider upgrading this client";
1022 #else
1023                         const char *older =
1024                                 "older.  Consider recompiling this application";
1025 #endif
1026                         const char *newer = "newer than client version";
1027
1028                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1029                                       "is much %s (%s)\n",
1030                                       obd2cli_tgt(imp->imp_obd),
1031                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1032                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1033                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1034                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1035                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
1036                                       newer : older, LUSTRE_VERSION_STRING);
1037                 }
1038
1039                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1040                         /* We sent to the server ocd_cksum_types with bits set
1041                          * for algorithms we understand. The server masked off
1042                          * the checksum types it doesn't support */
1043                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1044                                 LCONSOLE_WARN("The negotiation of the checksum "
1045                                               "alogrithm to use with server %s "
1046                                               "failed (%x/%x), disabling "
1047                                               "checksums\n",
1048                                               obd2cli_tgt(imp->imp_obd),
1049                                               ocd->ocd_cksum_types,
1050                                               OBD_CKSUM_ALL);
1051                                 cli->cl_checksum = 0;
1052                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1053                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1054                         } else {
1055                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1056
1057                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1058                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1059                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1060                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1061                                 else
1062                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1063                         }
1064                 } else {
1065                         /* The server does not support OBD_CONNECT_CKSUM.
1066                          * Enforce CRC32 for backward compatibility*/
1067                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1068                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1069                 }
1070
1071                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1072                         cli->cl_max_pages_per_rpc =
1073                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1074                 }
1075
1076                 /* Reset ns_connect_flags only for initial connect. It might be
1077                  * changed in while using FS and if we reset it in reconnect
1078                  * this leads to lossing user settings done before such as
1079                  * disable lru_resize, etc. */
1080                 if (old_connect_flags != exp->exp_connect_flags ||
1081                     aa->pcaa_initial_connect) {
1082                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1083                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1084                                ocd->ocd_connect_flags);
1085                         imp->imp_obd->obd_namespace->ns_connect_flags =
1086                                 ocd->ocd_connect_flags;
1087                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1088                                 ocd->ocd_connect_flags;
1089                 }
1090
1091                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1092                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1093                         /* We need a per-message support flag, because
1094                            a. we don't know if the incoming connect reply
1095                               supports AT or not (in reply_in_callback)
1096                               until we unpack it.
1097                            b. failovered server means export and flags are gone
1098                               (in ptlrpc_send_reply).
1099                            Can only be set when we know AT is supported at
1100                            both ends */
1101                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1102                 else
1103                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1104
1105                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1106                         (cli->cl_max_pages_per_rpc > 0));
1107         }
1108
1109  out:
1110         if (rc != 0) {
1111                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1112                 spin_lock(&imp->imp_lock);
1113                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1114                     (request->rq_import_generation == imp->imp_generation))
1115                         ptlrpc_deactivate_and_unlock_import(imp);
1116                 else
1117                         spin_unlock(&imp->imp_lock);
1118
1119                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1120                         /* Give up trying to reconnect */
1121                         imp->imp_obd->obd_no_recov = 1;
1122                         ptlrpc_deactivate_import(imp);
1123                 }
1124
1125                 if (rc == -EPROTO) {
1126                         struct obd_connect_data *ocd;
1127                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1128                                                  sizeof *ocd,
1129                                                  lustre_swab_connect);
1130                         if (ocd &&
1131                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1132                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1133                            /* Actually servers are only supposed to refuse
1134                               connection from liblustre clients, so we should
1135                               never see this from VFS context */
1136                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1137                                         "(%d.%d.%d.%d)"
1138                                         " refused connection from this client "
1139                                         "with an incompatible version (%s).  "
1140                                         "Client must be recompiled\n",
1141                                         obd2cli_tgt(imp->imp_obd),
1142                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1143                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1144                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1145                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1146                                         LUSTRE_VERSION_STRING);
1147                                 ptlrpc_deactivate_import(imp);
1148                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1149                         }
1150                         RETURN(-EPROTO);
1151                 }
1152
1153                 ptlrpc_maybe_ping_import_soon(imp);
1154
1155                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1156                        obd2cli_tgt(imp->imp_obd),
1157                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1158         }
1159
1160         spin_lock(&imp->imp_lock);
1161         imp->imp_last_recon = 0;
1162         spin_unlock(&imp->imp_lock);
1163
1164         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1165         RETURN(rc);
1166 }
1167
1168 static int completed_replay_interpret(struct ptlrpc_request *req,
1169                                       void * data, int rc)
1170 {
1171         ENTRY;
1172         atomic_dec(&req->rq_import->imp_replay_inflight);
1173         if (req->rq_status == 0 &&
1174             !req->rq_import->imp_vbr_failed) {
1175                 ptlrpc_import_recovery_state_machine(req->rq_import);
1176         } else {
1177                 if (req->rq_import->imp_vbr_failed) {
1178                         CDEBUG(D_WARNING,
1179                                "%s: version recovery fails, reconnecting\n",
1180                                req->rq_import->imp_obd->obd_name);
1181                         spin_lock(&req->rq_import->imp_lock);
1182                         req->rq_import->imp_vbr_failed = 0;
1183                         spin_unlock(&req->rq_import->imp_lock);
1184                 } else {
1185                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1186                                      "reconnecting\n",
1187                                req->rq_import->imp_obd->obd_name,
1188                                req->rq_status);
1189                 }
1190                 ptlrpc_connect_import(req->rq_import, NULL);
1191         }
1192         RETURN(0);
1193 }
1194
1195 static int signal_completed_replay(struct obd_import *imp)
1196 {
1197         struct ptlrpc_request *req;
1198         ENTRY;
1199
1200         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1201         atomic_inc(&imp->imp_replay_inflight);
1202
1203         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1204         if (!req) {
1205                 atomic_dec(&imp->imp_replay_inflight);
1206                 RETURN(-ENOMEM);
1207         }
1208
1209         ptlrpc_req_set_repsize(req, 1, NULL);
1210         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1211         lustre_msg_add_flags(req->rq_reqmsg,
1212                              MSG_LOCK_REPLAY_DONE |
1213                              MSG_REQ_REPLAY_DONE |
1214                              MSG_LAST_REPLAY);
1215
1216         if (imp->imp_delayed_recovery)
1217                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1218         req->rq_interpret_reply = completed_replay_interpret;
1219
1220         if (AT_OFF)
1221                 req->rq_timeout *= 3;
1222
1223         ptlrpcd_add_req(req);
1224         RETURN(0);
1225 }
1226
#ifdef __KERNEL__
/*
 * Body of the kernel thread started by
 * ptlrpc_import_recovery_state_machine() when an import is evicted.
 *
 * data is the struct obd_import to invalidate.  The caller took a
 * reference on it before starting the thread; that reference is released
 * here once the import has been invalidated, moved to
 * LUSTRE_IMP_RECOVER and run through the recovery state machine again.
 *
 * Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *evicted = data;

        ENTRY;

        cfs_daemonize_ctxt("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               evicted->imp_obd->obd_name, obd2cli_tgt(evicted->imp_obd),
               evicted->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(evicted);

        /* Optionally dump the debug log, controlled by obd_dump_on_eviction. */
        if (obd_dump_on_eviction) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        IMPORT_SET_STATE(evicted, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(evicted);

        /* Release the reference taken before this thread was started. */
        class_import_put(evicted);
        RETURN(0);
}
#endif
1254
1255 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1256 {
1257         int rc = 0;
1258         int inflight;
1259         char *target_start;
1260         int target_len;
1261
1262         ENTRY;
1263         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1264                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1265                           &target_start, &target_len);
1266                 /* Don't care about MGC eviction */
1267                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1268                            LUSTRE_MGC_NAME) != 0) {
1269                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1270                                            "%.*s; in progress operations using "
1271                                            "this service will fail.\n",
1272                                            target_len, target_start);
1273                 }
1274                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1275                        obd2cli_tgt(imp->imp_obd),
1276                        imp->imp_connection->c_remote_uuid.uuid);
1277
1278 #ifdef __KERNEL__
1279                 /* bug 17802:  XXX client_disconnect_export vs connect request
1280                  * race. if client will evicted at this time, we start
1281                  * invalidate thread without referece to import and import can
1282                  * be freed at same time. */
1283                 class_import_get(imp);
1284                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1285                                    CLONE_VM | CLONE_FILES);
1286                 if (rc < 0) {
1287                         class_import_put(imp);
1288                         CERROR("error starting invalidate thread: %d\n", rc);
1289                 } else {
1290                         rc = 0;
1291                 }
1292                 RETURN(rc);
1293 #else
1294                 ptlrpc_invalidate_import(imp);
1295
1296                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1297 #endif
1298         }
1299
1300         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1301                 CDEBUG(D_HA, "replay requested by %s\n",
1302                        obd2cli_tgt(imp->imp_obd));
1303                 rc = ptlrpc_replay_next(imp, &inflight);
1304                 if (inflight == 0 &&
1305                     atomic_read(&imp->imp_replay_inflight) == 0) {
1306                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1307                         rc = ldlm_replay_locks(imp);
1308                         if (rc)
1309                                 GOTO(out, rc);
1310                 }
1311                 rc = 0;
1312         }
1313
1314         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1315                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1316                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1317                         rc = signal_completed_replay(imp);
1318                         if (rc)
1319                                 GOTO(out, rc);
1320                 }
1321
1322         }
1323
1324         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1325                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1326                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1327                 }
1328         }
1329
1330         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1331                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1332                        obd2cli_tgt(imp->imp_obd),
1333                        imp->imp_connection->c_remote_uuid.uuid);
1334
1335                 rc = ptlrpc_resend(imp);
1336                 if (rc)
1337                         GOTO(out, rc);
1338                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1339                 ptlrpc_activate_import(imp);
1340
1341                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1342                           &target_start, &target_len);
1343                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1344                               "using nid %s.\n", imp->imp_obd->obd_name,
1345                               target_len, target_start,
1346                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1347         }
1348
1349         if (imp->imp_state == LUSTRE_IMP_FULL) {
1350                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1351                 ptlrpc_wake_delayed(imp);
1352         }
1353
1354  out:
1355         RETURN(rc);
1356 }
1357
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  It ignores its argument and always
 * returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1362
1363 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1364 {
1365         struct ptlrpc_request *req;
1366         int rq_opc, rc = 0;
1367         int nowait = imp->imp_obd->obd_force;
1368         ENTRY;
1369
1370         if (nowait)
1371                 GOTO(set_state, rc);
1372
1373         switch (imp->imp_connect_op) {
1374         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1375         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1376         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1377         default:
1378                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1379                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1380                 RETURN(-EINVAL);
1381         }
1382
1383         if (ptlrpc_import_in_recovery(imp)) {
1384                 struct l_wait_info lwi;
1385                 cfs_duration_t timeout;
1386
1387                 if (AT_OFF) {
1388                         timeout = cfs_time_seconds(obd_timeout);
1389                 } else {
1390                         int idx = import_at_get_index(imp,
1391                                 imp->imp_client->cli_request_portal);
1392                         timeout = cfs_time_seconds(
1393                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1394                 }
1395                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1396                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1397                 rc = l_wait_event(imp->imp_recovery_waitq,
1398                                   !ptlrpc_import_in_recovery(imp), &lwi);
1399         }
1400
1401         spin_lock(&imp->imp_lock);
1402         if (imp->imp_state != LUSTRE_IMP_FULL)
1403                 GOTO(out, 0);
1404
1405         spin_unlock(&imp->imp_lock);
1406
1407         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1408         if (req) {
1409                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1410                  * it fails.  We can get through the above with a down server
1411                  * if the client doesn't know the server is gone yet. */
1412                 req->rq_no_resend = 1;
1413
1414 #ifndef CRAY_XT3
1415                 /* We want client umounts to happen quickly, no matter the
1416                    server state... */
1417                 req->rq_timeout = min_t(int, req->rq_timeout,
1418                                         INITIAL_CONNECT_TIMEOUT);
1419 #else
1420                 /* ... but we always want liblustre clients to nicely
1421                    disconnect, so only use the adaptive value. */
1422                 if (AT_OFF)
1423                         req->rq_timeout = obd_timeout / 3;
1424 #endif
1425
1426                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1427                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1428                 ptlrpc_req_set_repsize(req, 1, NULL);
1429                 rc = ptlrpc_queue_wait(req);
1430                 ptlrpc_req_finished(req);
1431         }
1432
1433 set_state:
1434         spin_lock(&imp->imp_lock);
1435 out:
1436         if (noclose)
1437                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1438         else
1439                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1440         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1441         /* Try all connections in the future - bz 12758 */
1442         imp->imp_last_recon = 0;
1443         spin_unlock(&imp->imp_lock);
1444
1445         RETURN(rc);
1446 }
1447
1448 /* Sets maximal number of RPCs possible originating from other side of this
1449    import (server) to us and number of async RPC replies that we are not waiting
1450    for arriving */
1451 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1452 {
1453         LNetSetAsync(imp->imp_connection->c_peer, count);
1454 }
1455
1456 void ptlrpc_cleanup_imp(struct obd_import *imp)
1457 {
1458         ENTRY;
1459
1460         spin_lock(&imp->imp_lock);
1461         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1462         imp->imp_generation++;
1463         spin_unlock(&imp->imp_lock);
1464         ptlrpc_abort_inflight(imp);
1465
1466         EXIT;
1467 }
1468
1469 /* Adaptive Timeout utils */
1470 extern unsigned int at_min, at_max, at_history;
1471
1472 /* Bin into timeslices using AT_BINS bins.
1473    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1474    but still smoothing out a return to normalcy from a slow response.
1475    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1476 int at_add(struct adaptive_timeout *at, unsigned int val)
1477 {
1478         unsigned int old = at->at_current;
1479         time_t now = cfs_time_current_sec();
1480         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1481
1482         LASSERT(at);
1483         CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1484                val, at, now - at->at_binstart, at->at_current,
1485                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1486
1487         if (val == 0)
1488                 /* 0's don't count, because we never want our timeout to
1489                    drop to 0, and because 0 could mean an error */
1490                 return 0;
1491
1492         spin_lock(&at->at_lock);
1493
1494         if (unlikely(at->at_binstart == 0)) {
1495                 /* Special case to remove default from history */
1496                 at->at_current = val;
1497                 at->at_worst_ever = val;
1498                 at->at_worst_time = now;
1499                 at->at_hist[0] = val;
1500                 at->at_binstart = now;
1501         } else if (now - at->at_binstart < binlimit ) {
1502                 /* in bin 0 */
1503                 at->at_hist[0] = max(val, at->at_hist[0]);
1504                 at->at_current = max(val, at->at_current);
1505         } else {
1506                 int i, shift;
1507                 unsigned int maxv = val;
1508                 /* move bins over */
1509                 shift = (now - at->at_binstart) / binlimit;
1510                 LASSERT(shift > 0);
1511                 for(i = AT_BINS - 1; i >= 0; i--) {
1512                         if (i >= shift) {
1513                                 at->at_hist[i] = at->at_hist[i - shift];
1514                                 maxv = max(maxv, at->at_hist[i]);
1515                         } else {
1516                                 at->at_hist[i] = 0;
1517                         }
1518                 }
1519                 at->at_hist[0] = val;
1520                 at->at_current = maxv;
1521                 at->at_binstart += shift * binlimit;
1522         }
1523
1524         if (at->at_current > at->at_worst_ever) {
1525                 at->at_worst_ever = at->at_current;
1526                 at->at_worst_time = now;
1527         }
1528
1529         if (at->at_flags & AT_FLG_NOHIST)
1530                 /* Only keep last reported val; keeping the rest of the history
1531                    for proc only */
1532                 at->at_current = val;
1533
1534         if (at_max > 0)
1535                 at->at_current =  min(at->at_current, at_max);
1536         at->at_current =  max(at->at_current, at_min);
1537
1538         if (at->at_current != old)
1539                 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1540                        "(val=%u) hist %u %u %u %u\n", at,
1541                        old, at->at_current, at->at_current - old, val,
1542                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1543                        at->at_hist[3]);
1544
1545         /* if we changed, report the old value */
1546         old = (at->at_current != old) ? old : 0;
1547
1548         spin_unlock(&at->at_lock);
1549         return old;
1550 }
1551
1552 /* Find the imp_at index for a given portal; assign if space available */
1553 int import_at_get_index(struct obd_import *imp, int portal)
1554 {
1555         struct imp_at *at = &imp->imp_at;
1556         int i;
1557
1558         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1559                 if (at->iat_portal[i] == portal)
1560                         return i;
1561                 if (at->iat_portal[i] == 0)
1562                         /* unused */
1563                         break;
1564         }
1565
1566         /* Not found in list, add it under a lock */
1567         spin_lock(&imp->imp_lock);
1568
1569         /* Check unused under lock */
1570         for (; i < IMP_AT_MAX_PORTALS; i++) {
1571                 if (at->iat_portal[i] == portal)
1572                         goto out;
1573                 if (at->iat_portal[i] == 0)
1574                         /* unused */
1575                         break;
1576         }
1577
1578         /* Not enough portals? */
1579         LASSERT(i < IMP_AT_MAX_PORTALS);
1580
1581         at->iat_portal[i] = portal;
1582 out:
1583         spin_unlock(&imp->imp_lock);
1584         return i;
1585 }