LU-16741 ptlrpc: rename ptlrpc_req_finished
[fs/lustre-release.git] lustre/ptlrpc/import.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ptlrpc/import.c
32  *
33  * Author: Mike Shaver <shaver@clusterfs.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_RPC
37
38 #include <linux/fs_struct.h>
39 #include <linux/kthread.h>
40 #include <linux/delay.h>
41 #include <obd_support.h>
42 #include <lustre_ha.h>
43 #include <lustre_net.h>
44 #include <lustre_import.h>
45 #include <lustre_export.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49
50 #include "ptlrpc_internal.h"
51
52 struct ptlrpc_connect_async_args {
53          __u64 pcaa_peer_committed;
54         int pcaa_initial_connect;
55 };
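/*
 * Editorial note (not part of the original source): these per-request
 * connect arguments ride inside the request itself and are read back in
 * the connect interpret callback.  A minimal sketch of the pattern used
 * later in this file (ptlrpc_connect_import_locked() fills the args,
 * ptlrpc_connect_interpret() consumes them):
 *
 *	struct ptlrpc_connect_async_args *aa;
 *
 *	aa = ptlrpc_req_async_args(aa, request);
 *	aa->pcaa_peer_committed = committed_before_reconnect;
 *	aa->pcaa_initial_connect = initial_connect;
 *	request->rq_interpret_reply = ptlrpc_connect_interpret;
 *	ptlrpcd_add_req(request);
 */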
56
57 /**
58  * Helper function: updates the current state of import \a imp to the
59  * provided \a state value.
60  */
61 static void import_set_state_nolock(struct obd_import *imp,
62                                     enum lustre_imp_state state)
63 {
64         switch (state) {
65         case LUSTRE_IMP_CLOSED:
66         case LUSTRE_IMP_NEW:
67         case LUSTRE_IMP_DISCON:
68         case LUSTRE_IMP_CONNECTING:
69                 break;
70         case LUSTRE_IMP_REPLAY_WAIT:
71                 imp->imp_replay_state = LUSTRE_IMP_REPLAY_LOCKS;
72                 break;
73         default:
74                 imp->imp_replay_state = LUSTRE_IMP_REPLAY;
75                 break;
76         }
77
78         /* A CLOSED import should remain so. */
79         if (imp->imp_state == LUSTRE_IMP_CLOSED)
80                 return;
81
82         if (imp->imp_state != LUSTRE_IMP_NEW) {
83                 CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",
84                        imp, obd2cli_tgt(imp->imp_obd),
85                        ptlrpc_import_state_name(imp->imp_state),
86                        ptlrpc_import_state_name(state));
87         }
88
89         imp->imp_state = state;
90         imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
91         imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
92                 ktime_get_real_seconds();
93         imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
94                 IMP_STATE_HIST_LEN;
95 }
96
97 static void import_set_state(struct obd_import *imp,
98                              enum lustre_imp_state new_state)
99 {
100         spin_lock(&imp->imp_lock);
101         import_set_state_nolock(imp, new_state);
102         spin_unlock(&imp->imp_lock);
103 }
104
105 void ptlrpc_import_enter_resend(struct obd_import *imp)
106 {
107         import_set_state(imp, LUSTRE_IMP_RECOVER);
108 }
109 EXPORT_SYMBOL(ptlrpc_import_enter_resend);
110
111
112 static int ptlrpc_connect_interpret(const struct lu_env *env,
113                                     struct ptlrpc_request *request,
114                                     void *args, int rc);
115 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
116
117 /* Only this function is allowed to change the import state when it is
118  * CLOSED. I would rather refcount the import and free it after
119  * disconnection like we do with exports. To do that, the client_obd
120  * will need to save the peer info somewhere other than in the import,
121  * though.
122  */
123 int ptlrpc_init_import(struct obd_import *imp)
124 {
125         spin_lock(&imp->imp_lock);
126
127         imp->imp_generation++;
128         imp->imp_state =  LUSTRE_IMP_NEW;
129
130         spin_unlock(&imp->imp_lock);
131
132         return 0;
133 }
134 EXPORT_SYMBOL(ptlrpc_init_import);
135
136 #define UUID_STR "_UUID"
137 void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
138 {
139         *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
140                 ? uuid : uuid + strlen(prefix);
141
142         *uuid_len = strlen(*uuid_start);
143
144         if (*uuid_len < strlen(UUID_STR))
145                 return;
146
147         if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
148                     UUID_STR, strlen(UUID_STR)))
149                 *uuid_len -= strlen(UUID_STR);
150 }
151 EXPORT_SYMBOL(deuuidify);
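/*
 * Editorial example (not part of the original source), showing what
 * deuuidify() produces for a typical target UUID:
 *
 *	char *start;
 *	int len;
 *
 *	deuuidify("lustre-OST0000_UUID", NULL, &start, &len);
 *	-> start points at "lustre-OST0000_UUID", len == 14 ("lustre-OST0000")
 *
 *	deuuidify("lustre-OST0000_UUID", "lustre-", &start, &len);
 *	-> start points at "OST0000_UUID", len == 7 ("OST0000")
 */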
152
153 /* Must be called with imp_lock held! */
154 static void ptlrpc_deactivate_import_nolock(struct obd_import *imp)
155 {
156         ENTRY;
157
158         assert_spin_locked(&imp->imp_lock);
159         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
160         imp->imp_invalid = 1;
161         imp->imp_generation++;
162
163         ptlrpc_abort_inflight(imp);
164
165         EXIT;
166 }
167
168 /**
169  * Returns true if import was FULL, false if import was already not
170  * connected.
171  * @imp - import to be disconnected
172  * @conn_cnt - connection count (epoch) of the request that timed out
173  *             and caused the disconnection.  In some cases, multiple
174  *             inflight requests can fail to a single target (e.g. OST
175  *             bulk requests) and if one has already caused a reconnection
176  *             (increasing the import's conn_cnt), the older failure should
177  *             not also cause a reconnection.  A value of zero forces a reconnect.
178  * @invalid - set import invalid flag
179  */
180 int ptlrpc_set_import_discon(struct obd_import *imp,
181                              __u32 conn_cnt, bool invalid)
182 {
183         int rc = 0;
184
185         spin_lock(&imp->imp_lock);
186
187         if (imp->imp_state == LUSTRE_IMP_FULL &&
188             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
189                 char *target_start;
190                 int   target_len;
191                 bool  inact = false;
192
193                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
194                           &target_start, &target_len);
195
196                 import_set_state_nolock(imp, LUSTRE_IMP_DISCON);
197                 if (imp->imp_replayable) {
198                         LCONSOLE_WARN("%s: Connection to %.*s (at %s) was lost; in progress operations using this service will wait for recovery to complete\n",
199                                imp->imp_obd->obd_name, target_len, target_start,
200                                obd_import_nid2str(imp));
201                 } else {
202                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to %.*s (at %s) was lost; in progress operations using this service will fail\n",
203                                imp->imp_obd->obd_name, target_len, target_start,
204                                obd_import_nid2str(imp));
205                         if (invalid) {
206                                 CDEBUG(D_HA,
207                                        "import %s@%s for %s not replayable, auto-deactivating\n",
208                                        obd2cli_tgt(imp->imp_obd),
209                                        imp->imp_connection->c_remote_uuid.uuid,
210                                        imp->imp_obd->obd_name);
211                                 ptlrpc_deactivate_import_nolock(imp);
212                                 inact = true;
213                         }
214                 }
215                 spin_unlock(&imp->imp_lock);
216
217                 if (obd_dump_on_timeout)
218                         libcfs_debug_dumplog();
219
220                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
221
222                 if (inact)
223                         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
224                 rc = 1;
225         } else {
226                 spin_unlock(&imp->imp_lock);
227                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
228                        imp->imp_client->cli_name, imp,
229                        (imp->imp_state == LUSTRE_IMP_FULL &&
230                         imp->imp_conn_cnt > conn_cnt) ?
231                        "reconnected" : "not connected", imp->imp_conn_cnt,
232                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
233         }
234
235         return rc;
236 }
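/*
 * Editorial example (not part of the original source): suppose two bulk
 * RPCs are sent while imp_conn_cnt == 5 and both time out.  The first
 * failure calls ptlrpc_set_import_discon(imp, 5, ...) while the import is
 * still FULL, so the import is disconnected and the subsequent reconnect
 * bumps imp_conn_cnt to 6.  When the second, stale failure later calls
 * ptlrpc_set_import_discon(imp, 5, ...), the import is no longer FULL with
 * a matching conn_cnt, so no extra disconnection is triggered and 0 is
 * returned.
 */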
237
238 /*
239  * This acts as a barrier; all existing requests are rejected, and
240  * no new requests will be accepted until the import is valid again.
241  */
242 void ptlrpc_deactivate_import(struct obd_import *imp)
243 {
244         spin_lock(&imp->imp_lock);
245         ptlrpc_deactivate_import_nolock(imp);
246         spin_unlock(&imp->imp_lock);
247
248         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
249 }
250 EXPORT_SYMBOL(ptlrpc_deactivate_import);
251
252 static time64_t ptlrpc_inflight_deadline(struct ptlrpc_request *req,
253                                          time64_t now)
254 {
255         time64_t dl;
256
257         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
258               (req->rq_phase == RQ_PHASE_BULK) ||
259               (req->rq_phase == RQ_PHASE_NEW)))
260                 return 0;
261
262         if (req->rq_timedout)
263                 return 0;
264
265         if (req->rq_phase == RQ_PHASE_NEW)
266                 dl = req->rq_sent;
267         else
268                 dl = req->rq_deadline;
269
270         if (dl <= now)
271                 return 0;
272
273         return dl - now;
274 }
275
276 static time64_t ptlrpc_inflight_timeout(struct obd_import *imp)
277 {
278         time64_t now = ktime_get_real_seconds();
279         struct ptlrpc_request *req;
280         time64_t timeout = 0;
281
282         spin_lock(&imp->imp_lock);
283         list_for_each_entry(req, &imp->imp_sending_list, rq_list)
284                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
285         spin_unlock(&imp->imp_lock);
286         return timeout;
287 }
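/*
 * Editorial example (not part of the original source): if the furthest
 * inflight deadline found above is 30 seconds away,
 * ptlrpc_invalidate_import() below waits 30 + 30/3 = 40 seconds per loop
 * iteration for the requests to error out; if no deadline is found (0),
 * it falls back to waiting obd_timeout seconds.
 */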
288
289 /**
290  * This function will invalidate the import, if necessary, then block
291  * for all the RPC completions, and finally notify the obd to
292  * invalidate its state (i.e. cancel locks, clear pending requests,
293  * etc).
294  */
295 void ptlrpc_invalidate_import(struct obd_import *imp)
296 {
297         struct ptlrpc_request *req;
298         time64_t timeout;
299         int rc;
300
301         atomic_inc(&imp->imp_inval_count);
302
303         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
304                 ptlrpc_deactivate_import(imp);
305
306         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CONNECT_RACE)) {
307                 CFS_RACE(OBD_FAIL_PTLRPC_CONNECT_RACE);
308                 msleep(10 * MSEC_PER_SEC);
309         }
310         CFS_FAIL_TIMEOUT(OBD_FAIL_MGS_CONNECT_NET, 3 * cfs_fail_val / 2);
311         LASSERT(imp->imp_invalid);
312
313         /* Wait forever until inflight == 0. We can't do it any other way
314          * because in some cases we need to wait for a very long reply
315          * unlink. We can't do anything before that completes, because there
316          * is no guarantee that some rdma transfer is not in progress right now.
317          */
318         do {
319                 long timeout_jiffies;
320
321                 /* Calculate the max timeout for waiting on rpcs to error
322                  * out. Fall back to obd_timeout if no inflight deadline
323                  * was found.
324                  */
325                 if (!CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
326                         timeout = ptlrpc_inflight_timeout(imp);
327                         timeout += div_u64(timeout, 3);
328
329                         if (timeout == 0)
330                                 timeout = obd_timeout;
331                 } else {
332                         /* decrease the interval to make the race more likely */
333                         timeout = 1;
334                 }
335
336                 CDEBUG(D_RPCTRACE, "Sleeping %llds for inflight to error out\n",
337                        timeout);
338
339                 /* Wait for all requests to error out and call completion
340                  * callbacks. Cap it at obd_timeout -- these should all
341                  * have been locally cancelled by ptlrpc_abort_inflight.
342                  */
343                 timeout_jiffies = max_t(long, cfs_time_seconds(timeout), 1);
344                 rc = wait_event_idle_timeout(
345                                     imp->imp_recovery_waitq,
346                                     (atomic_read(&imp->imp_inflight) == 0),
347                                     timeout_jiffies);
348
349                 if (rc == 0) {
350                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
351
352                         CERROR("%s: timeout waiting for callback (%d != 0)\n",
353                                cli_tgt, atomic_read(&imp->imp_inflight));
354
355                         spin_lock(&imp->imp_lock);
356                         if (atomic_read(&imp->imp_inflight) == 0) {
357                                 int count = atomic_read(&imp->imp_unregistering);
358
359                                 /* We know that "unregistering" rpcs can only
360                                  * survive in the sending or delayed lists (they
361                                  * may be waiting for a long reply unlink on
362                                  * sluggish nets). Check this. If there are no
363                                  * inflight rpcs and unregistering != 0, it is a bug.
364                                  */
365                                 LASSERTF(count == 0, "Some RPCs are still unregistering: %d\n",
366                                          count);
367
368                                 /* Let's save one loop as soon as inflight has
369                                  * dropped to zero. No new inflight rpcs are
370                                  * possible at this point.
371                                  */
372                                 rc = 1;
373                         } else {
374                                 list_for_each_entry(req, &imp->imp_sending_list,
375                                                     rq_list) {
376                                         DEBUG_REQ(D_ERROR, req,
377                                                   "still on sending list");
378                                 }
379                                 list_for_each_entry(req, &imp->imp_delayed_list,
380                                                     rq_list) {
381                                         DEBUG_REQ(D_ERROR, req,
382                                                   "still on delayed list");
383                                 }
384
385                                 CERROR("%s: Unregistering RPCs found (%d). Network is sluggish? Waiting for them to error out.\n",
386                                        cli_tgt,
387                                        atomic_read(&imp->imp_unregistering));
388                         }
389                         spin_unlock(&imp->imp_lock);
390                 }
391         } while (rc == 0);
392
393         /*
394          * Let's additionally check that no new rpcs were added to the
395          * import while it was in the "invalidate" state.
396          */
397         LASSERT(atomic_read(&imp->imp_inflight) == 0);
398         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
399         sptlrpc_import_flush_all_ctx(imp);
400
401         atomic_dec(&imp->imp_inval_count);
402         wake_up(&imp->imp_recovery_waitq);
403 }
404 EXPORT_SYMBOL(ptlrpc_invalidate_import);
405
406 /* unset imp_invalid */
407 void ptlrpc_activate_import(struct obd_import *imp, bool set_state_full)
408 {
409         struct obd_device *obd = imp->imp_obd;
410
411         spin_lock(&imp->imp_lock);
412         if (imp->imp_deactive != 0) {
413                 LASSERT(imp->imp_state != LUSTRE_IMP_FULL);
414                 if (imp->imp_state != LUSTRE_IMP_DISCON)
415                         import_set_state_nolock(imp, LUSTRE_IMP_DISCON);
416                 spin_unlock(&imp->imp_lock);
417                 return;
418         }
419         if (set_state_full)
420                 import_set_state_nolock(imp, LUSTRE_IMP_FULL);
421
422         imp->imp_invalid = 0;
423
424         spin_unlock(&imp->imp_lock);
425         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
426 }
427 EXPORT_SYMBOL(ptlrpc_activate_import);
428
429 void ptlrpc_pinger_force(struct obd_import *imp)
430 {
431         CDEBUG(D_HA, "%s: waking up pinger s:%s\n", obd2cli_tgt(imp->imp_obd),
432                ptlrpc_import_state_name(imp->imp_state));
433
434         spin_lock(&imp->imp_lock);
435         imp->imp_force_verify = 1;
436         spin_unlock(&imp->imp_lock);
437
438         if (imp->imp_state != LUSTRE_IMP_CONNECTING)
439                 ptlrpc_pinger_wake_up();
440 }
441 EXPORT_SYMBOL(ptlrpc_pinger_force);
442
443 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
444 {
445         ENTRY;
446
447         LASSERT(!imp->imp_dlm_fake);
448
449         if (ptlrpc_set_import_discon(imp, conn_cnt, true))
450                 ptlrpc_pinger_force(imp);
451
452         EXIT;
453 }
454
455 int ptlrpc_reconnect_import(struct obd_import *imp)
456 {
457         int rc = 0;
458
459         ENTRY;
460
461         ptlrpc_set_import_discon(imp, 0, true);
462         /* Force a new connect attempt */
463         ptlrpc_invalidate_import(imp);
464         /* Wait for all invalidate calls to finish */
465         if (atomic_read(&imp->imp_inval_count) > 0) {
466                 int rc;
467
468                 rc = l_wait_event_abortable(
469                         imp->imp_recovery_waitq,
470                         (atomic_read(&imp->imp_inval_count) == 0));
471                 if (rc)
472                         CERROR("Interrupted, inval=%d\n",
473                                atomic_read(&imp->imp_inval_count));
474         }
475
476         /* Allow reconnect attempts */
477         imp->imp_obd->obd_no_recov = 0;
478         imp->imp_remote_handle.cookie = 0;
479         /* Attempt a new connect */
480         rc = ptlrpc_recover_import(imp, NULL, 0);
481
482         RETURN(rc);
483 }
484 EXPORT_SYMBOL(ptlrpc_reconnect_import);
485
486 /**
487  * Change the connection on import \a imp to another one (if more than one is
488  * present). We typically choose the connection that we have not tried to
489  * connect to for the longest time.
490  */
491 static int import_select_connection(struct obd_import *imp)
492 {
493         struct obd_import_conn *imp_conn = NULL, *conn;
494         struct obd_export *dlmexp;
495         char *target_start;
496         int target_len, tried_all = 1;
497         int rc = 0;
498
499         ENTRY;
500
501         spin_lock(&imp->imp_lock);
502
503         if (list_empty(&imp->imp_conn_list)) {
504                 rc = -EINVAL;
505                 CERROR("%s: no connections available: rc = %d\n",
506                        imp->imp_obd->obd_name, rc);
507                 GOTO(out_unlock, rc);
508         }
509
510         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
511                 CDEBUG(D_HA, "%s: connect to NID %s last attempt %lld\n",
512                        imp->imp_obd->obd_name,
513                        libcfs_nidstr(&conn->oic_conn->c_peer.nid),
514                        conn->oic_last_attempt);
515
516                 /* If we have not tried this connection since
517                  * the last successful attempt, go with this one
518                  */
519                 if ((conn->oic_last_attempt == 0) ||
520                     conn->oic_last_attempt <= imp->imp_last_success_conn) {
521                         imp_conn = conn;
522                         tried_all = 0;
523                         break;
524                 }
525
526                 /* If all of the connections have already been tried
527                  * since the last successful connection, just choose the
528                  * least recently used one.
529                  */
530                 if (!imp_conn)
531                         imp_conn = conn;
532                 else if (imp_conn->oic_last_attempt > conn->oic_last_attempt)
533                         imp_conn = conn;
534         }
535
536         /* if not found, simply choose the current one */
537         if (!imp_conn || imp->imp_force_reconnect) {
538                 LASSERT(imp->imp_conn_current);
539                 imp_conn = imp->imp_conn_current;
540                 tried_all = 0;
541         }
542         LASSERT(imp_conn->oic_conn);
543
544         /* If we've tried everything, and we're back to the beginning of the
545          * list, increase our timeout and try again. It will be reset when
546          * we do finally connect. (FIXME: really we should wait for all network
547          * state associated with the last connection attempt to drain before
548          * trying to reconnect on it.)
549          */
550         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item)) {
551                 struct adaptive_timeout *at = &imp->imp_at.iat_net_latency;
552                 timeout_t timeout = obd_at_get(imp->imp_obd, at);
553
554                 if (timeout < CONNECTION_SWITCH_MAX) {
555                         obd_at_measure(imp->imp_obd, at,
556                                        timeout + CONNECTION_SWITCH_INC);
557                         if (timeout > CONNECTION_SWITCH_MAX)
558                                 at_reset(at, CONNECTION_SWITCH_MAX);
559                 }
560                 LASSERT(imp_conn->oic_last_attempt);
561                 CDEBUG(D_HA,
562                        "%s: tried all connections, increasing latency to %ds\n",
563                        imp->imp_obd->obd_name, timeout);
564         }
565
566         imp_conn->oic_last_attempt = ktime_get_seconds();
567
568         /* switch connection, don't mind if it's same as the current one */
569         ptlrpc_connection_put(imp->imp_connection);
570         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
571
572         dlmexp = class_conn2export(&imp->imp_dlm_handle);
573         if (!dlmexp)
574                 GOTO(out_unlock, rc = -EINVAL);
575         ptlrpc_connection_put(dlmexp->exp_connection);
576         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
577         class_export_put(dlmexp);
578
579         if (imp->imp_conn_current != imp_conn) {
580                 if (imp->imp_conn_current) {
581                         deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
582                                   &target_start, &target_len);
583
584                         CDEBUG(D_HA, "%s: Connection changing to %.*s (at %s)\n",
585                                imp->imp_obd->obd_name,
586                                target_len, target_start,
587                                libcfs_nidstr(&imp_conn->oic_conn->c_peer.nid));
588                 }
589
590                 imp->imp_conn_current = imp_conn;
591         }
592
593         /* The below message is checked in conf-sanity.sh test_35[ab] */
594         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
595                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
596                libcfs_nidstr(&imp_conn->oic_conn->c_peer.nid));
597
598 out_unlock:
599         spin_unlock(&imp->imp_lock);
600         RETURN(rc);
601 }
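/*
 * Editorial example (not part of the original source): with failover NIDs
 * A, B and C on imp_conn_list, where A has already been attempted since the
 * last successful connect but B and C have not, the loop above picks B (the
 * first entry not tried since the last success).  Once every entry has been
 * attempted since the last success, the least recently tried one is used
 * and, when the scan wraps back to the head of the list, iat_net_latency is
 * stretched (up to CONNECTION_SWITCH_MAX) to slow down further retries.
 */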
602
603 /*
604  * must be called under imp_lock
605  */
606 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
607 {
608         struct ptlrpc_request *req;
609
610         /* The requests in committed_list always have smaller transnos than
611          * the requests in replay_list
612          */
613         if (!list_empty(&imp->imp_committed_list)) {
614                 req = list_first_entry(&imp->imp_committed_list,
615                                        struct ptlrpc_request, rq_replay_list);
616                 *transno = req->rq_transno;
617                 if (req->rq_transno == 0) {
618                         DEBUG_REQ(D_ERROR, req,
619                                   "zero transno in committed_list");
620                         LBUG();
621                 }
622                 return 1;
623         }
624         if (!list_empty(&imp->imp_replay_list)) {
625                 req = list_first_entry(&imp->imp_replay_list,
626                                        struct ptlrpc_request, rq_replay_list);
627                 *transno = req->rq_transno;
628                 if (req->rq_transno == 0) {
629                         DEBUG_REQ(D_ERROR, req, "zero transno in replay_list");
630                         LBUG();
631                 }
632                 return 1;
633         }
634         return 0;
635 }
636
637 int ptlrpc_connect_import(struct obd_import *imp)
638 {
639         spin_lock(&imp->imp_lock);
640         return ptlrpc_connect_import_locked(imp);
641 }
642
643 /**
644  * Attempt to (re)connect import \a imp. This includes all preparations,
645  * initializing the CONNECT RPC request and passing it to ptlrpcd for
646  * actual sending.
647  *
648  * Assumes imp->imp_lock is held, and releases it.
649  *
650  * Returns 0 on success or an error code.
651  */
652 int ptlrpc_connect_import_locked(struct obd_import *imp)
653 {
654         struct obd_device *obd = imp->imp_obd;
655         int initial_connect = 0;
656         int set_transno = 0;
657         __u64 committed_before_reconnect = 0;
658         struct ptlrpc_request *request;
659         struct sptlrpc_sepol *sepol;
660         struct obd_connect_data ocd;
661         char *bufs[] = { NULL,
662                          obd2cli_tgt(imp->imp_obd),
663                          obd->obd_uuid.uuid,
664                          (char *)&imp->imp_dlm_handle,
665                          (char *)&ocd,
666                          NULL };
667         struct ptlrpc_connect_async_args *aa;
668         int rc;
669
670         ENTRY;
671
672         assert_spin_locked(&imp->imp_lock);
673
674         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
675                 spin_unlock(&imp->imp_lock);
676                 CERROR("can't connect to a closed import\n");
677                 RETURN(-EINVAL);
678         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
679                 spin_unlock(&imp->imp_lock);
680                 CERROR("already connected\n");
681                 RETURN(0);
682         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING ||
683                    imp->imp_state == LUSTRE_IMP_EVICTED ||
684                    imp->imp_connected) {
685                 spin_unlock(&imp->imp_lock);
686                 CERROR("already connecting\n");
687                 RETURN(-EALREADY);
688         }
689
690         import_set_state_nolock(imp, LUSTRE_IMP_CONNECTING);
691
692         imp->imp_conn_cnt++;
693         imp->imp_resend_replay = 0;
694
695         if (!lustre_handle_is_used(&imp->imp_remote_handle))
696                 initial_connect = 1;
697         else
698                 committed_before_reconnect = imp->imp_peer_committed_transno;
699
700         set_transno = ptlrpc_first_transno(imp,
701                                            &imp->imp_connect_data.ocd_transno);
702         spin_unlock(&imp->imp_lock);
703
704         rc = import_select_connection(imp);
705         if (rc)
706                 GOTO(out, rc);
707
708         rc = sptlrpc_import_sec_adapt(imp, NULL, NULL);
709         if (rc)
710                 GOTO(out, rc);
711
712         /* Reset connect flags to the originally requested flags so that, in
713          * case the server was updated on-the-fly, we will get the new features.
714          */
715         ocd = imp->imp_connect_data;
716         ocd.ocd_connect_flags = imp->imp_connect_flags_orig;
717         ocd.ocd_connect_flags2 = imp->imp_connect_flags2_orig;
718         /* Reset ocd_version each time so the server knows the exact versions */
719         ocd.ocd_version = LUSTRE_VERSION_CODE;
720         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
721         imp->imp_msghdr_flags &= ~MSGHDR_CKSUM_INCOMPAT18;
722
723         rc = obd_reconnect(NULL, imp->imp_obd->obd_self_export, obd,
724                            &obd->obd_uuid, &ocd, NULL);
725         if (rc)
726                 GOTO(out, rc);
727
728         request = ptlrpc_request_alloc(imp, &RQF_MDS_CONNECT);
729         if (request == NULL)
730                 GOTO(out, rc = -ENOMEM);
731
732         /* get SELinux policy info if any */
733         sepol = sptlrpc_sepol_get(request);
734         if (IS_ERR(sepol)) {
735                 ptlrpc_request_free(request);
736                 GOTO(out, rc = PTR_ERR(sepol));
737         }
738
739         bufs[5] = sepol->ssp_sepol;
740
741         req_capsule_set_size(&request->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
742                              sptlrpc_sepol_size(sepol));
743
744         rc = ptlrpc_request_bufs_pack(request, LUSTRE_OBD_VERSION,
745                                       imp->imp_connect_op, bufs, NULL);
746
747         sptlrpc_sepol_put(sepol);
748         if (rc) {
749                 ptlrpc_request_free(request);
750                 GOTO(out, rc);
751         }
752
753         /* Report the rpc service time to the server so that it knows how long
754          * to wait for clients to join recovery
755          */
756         lustre_msg_set_service_timeout(request->rq_reqmsg,
757                                        at_timeout2est(request->rq_timeout));
758
759         /* The amount of time we give the server to process the connect req.
760          * import_select_connection will increase the net latency on
761          * repeated reconnect attempts to cover slow networks.
762          * We override/ignore the server rpc completion estimate here,
763          * which may be large if this is a reconnect attempt
764          */
765         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
766         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
767
768         request->rq_no_resend = request->rq_no_delay = 1;
769         request->rq_send_state = LUSTRE_IMP_CONNECTING;
770         /* Allow a slightly larger reply for future growth compatibility */
771         req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,
772                              sizeof(struct obd_connect_data)+16*sizeof(__u64));
773         ptlrpc_request_set_replen(request);
774         request->rq_interpret_reply = ptlrpc_connect_interpret;
775
776         aa = ptlrpc_req_async_args(aa, request);
777         memset(aa, 0, sizeof(*aa));
778
779         aa->pcaa_peer_committed = committed_before_reconnect;
780         aa->pcaa_initial_connect = initial_connect;
781
782         if (aa->pcaa_initial_connect) {
783                 spin_lock(&imp->imp_lock);
784                 imp->imp_replayable = 1;
785                 spin_unlock(&imp->imp_lock);
786                 lustre_msg_add_op_flags(request->rq_reqmsg,
787                                         MSG_CONNECT_INITIAL);
788         }
789
790         if (set_transno)
791                 lustre_msg_add_op_flags(request->rq_reqmsg,
792                                         MSG_CONNECT_TRANSNO);
793
794         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)",
795                   request->rq_timeout);
796         ptlrpcd_add_req(request);
797         rc = 0;
798 out:
799         if (rc != 0)
800                 import_set_state(imp, LUSTRE_IMP_DISCON);
801
802         RETURN(rc);
803 }
804 EXPORT_SYMBOL(ptlrpc_connect_import);
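/*
 * Editorial note (not part of the original source): the connect request
 * packed above carries, in order, the target UUID, the client OBD UUID,
 * the client DLM connection handle, the obd_connect_data holding the
 * originally requested feature flags, and optionally a SELinux policy
 * string.  The reply is handled asynchronously by ptlrpc_connect_interpret()
 * after ptlrpcd sends the RPC.
 */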
805
806 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
807 {
808         int force_verify;
809
810         spin_lock(&imp->imp_lock);
811         force_verify = imp->imp_force_verify != 0;
812         spin_unlock(&imp->imp_lock);
813
814         if (force_verify)
815                 ptlrpc_pinger_wake_up();
816 }
817
818 static int ptlrpc_busy_reconnect(int rc)
819 {
820         return (rc == -EBUSY) || (rc == -EAGAIN);
821 }
822
823 static int ptlrpc_connect_set_flags(struct obd_import *imp,
824                                     struct obd_connect_data *ocd,
825                                     __u64 old_connect_flags,
826                                     struct obd_export *exp, int init_connect)
827 {
828         static bool warned;
829         struct client_obd *cli = &imp->imp_obd->u.cli;
830
831         spin_lock(&imp->imp_lock);
832         list_move(&imp->imp_conn_current->oic_item,
833                   &imp->imp_conn_list);
834         imp->imp_last_success_conn =
835                 imp->imp_conn_current->oic_last_attempt;
836
837         spin_unlock(&imp->imp_lock);
838
839         if (!warned && (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
840             (ocd->ocd_version > LUSTRE_VERSION_CODE +
841                                 LUSTRE_VERSION_OFFSET_WARN ||
842              ocd->ocd_version < LUSTRE_VERSION_CODE -
843                                 LUSTRE_VERSION_OFFSET_WARN)) {
844                 /* some compilers do not like #ifdef in the middle of a macro argument */
845                 const char *older = "older than client. Consider upgrading server";
846                 const char *newer = "newer than client. Consider upgrading client";
849
850                 LCONSOLE_WARN("Client version (%s). Server %s version (%d.%d.%d.%d) is much %s\n",
851                               LUSTRE_VERSION_STRING,
852                               obd2cli_tgt(imp->imp_obd),
853                               OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
854                               OBD_OCD_VERSION_MINOR(ocd->ocd_version),
855                               OBD_OCD_VERSION_PATCH(ocd->ocd_version),
856                               OBD_OCD_VERSION_FIX(ocd->ocd_version),
857                               ocd->ocd_version > LUSTRE_VERSION_CODE ?
858                               newer : older);
859                 warned = true;
860         }
861
862         if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
863                 /* We sent to the server ocd_cksum_types with bits set
864                  * for algorithms we understand. The server masked off
865                  * the checksum types it doesn't support
866                  */
867                 if ((ocd->ocd_cksum_types &
868                      obd_cksum_types_supported_client()) == 0) {
869                         LCONSOLE_ERROR("The negotiation of the checksum algorithm to use with server %s failed (%x/%x)\n",
870                                        obd2cli_tgt(imp->imp_obd),
871                                        ocd->ocd_cksum_types,
872                                        obd_cksum_types_supported_client());
873                         return -EPROTO;
874                 }
875                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
876         } else {
877                 /* The server does not support OBD_CONNECT_CKSUM.
878                  * Enforce ADLER for backward compatibility
879                  */
880                 cli->cl_supp_cksum_types = OBD_CKSUM_ADLER;
881         }
882         cli->cl_cksum_type = obd_cksum_type_select(imp->imp_obd->obd_name,
883                                                   cli->cl_supp_cksum_types,
884                                                   cli->cl_preferred_cksum_type);
885
886         if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE)
887                 cli->cl_max_pages_per_rpc =
888                         min(ocd->ocd_brw_size >> PAGE_SHIFT,
889                             cli->cl_max_pages_per_rpc);
890         else if (imp->imp_connect_op == MDS_CONNECT ||
891                  imp->imp_connect_op == MGS_CONNECT)
892                 cli->cl_max_pages_per_rpc = 1;
893
894         LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
895                 (cli->cl_max_pages_per_rpc > 0));
896
897         client_adjust_max_dirty(cli);
898
899         /* Update client max modify RPCs in flight with value returned
900          * by the server
901          */
902         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
903                 cli->cl_max_mod_rpcs_in_flight = min(
904                                         cli->cl_max_mod_rpcs_in_flight,
905                                         ocd->ocd_maxmodrpcs);
906         else
907                 cli->cl_max_mod_rpcs_in_flight = 1;
908
909         /* Reset ns_connect_flags only for the initial connect. The flags might
910          * be changed while the FS is in use, and resetting them on reconnect
911          * would lose user settings made earlier, such as disabling lru_resize.
912          */
913         if (old_connect_flags != exp_connect_flags(exp) || init_connect) {
914                 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
915                 __u64 changed_flags;
916
917                 changed_flags =
918                         ns->ns_connect_flags ^ ns->ns_orig_connect_flags;
919                 CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server flags: %#llx\n",
920                              imp->imp_obd->obd_name,
921                              ocd->ocd_connect_flags);
922                 ns->ns_connect_flags = (ns->ns_connect_flags & changed_flags) |
923                                       (ocd->ocd_connect_flags & ~changed_flags);
924                 ns->ns_orig_connect_flags = ocd->ocd_connect_flags;
925         }
926
927         if (ocd->ocd_connect_flags & OBD_CONNECT_AT)
928                 /* We need a per-message support flag, because
929                  * a. we don't know if the incoming connect reply
930                  *    supports AT or not (in reply_in_callback)
931                  *    until we unpack it.
932                  * b. a failed-over server means the export and flags are gone
933                  *    (in ptlrpc_send_reply).
934                  *    The flag can only be set when we know AT is supported
935                  *    at both ends.
936                  */
937                 imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
938         else
939                 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
940
941         imp->imp_msghdr_flags |= MSGHDR_CKSUM_INCOMPAT18;
942
943         return 0;
944 }
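/*
 * Editorial example (not part of the original source): if the client
 * advertises OBD_CKSUM_ADLER | OBD_CKSUM_CRC32C in ocd_cksum_types and the
 * server replies with only OBD_CKSUM_ADLER set, cl_supp_cksum_types becomes
 * OBD_CKSUM_ADLER and obd_cksum_type_select() must pick ADLER regardless of
 * cl_preferred_cksum_type.  Had the reply shared no bits with
 * obd_cksum_types_supported_client(), the connect would have failed with
 * -EPROTO above.
 */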
945
946 /**
947  * Add all replay requests back to the unreplied list before starting replay,
948  * so that we can make sure the known replied XID only ever increases,
949  * even when replaying requests.
950  */
951 static void ptlrpc_prepare_replay(struct obd_import *imp)
952 {
953         struct ptlrpc_request *req;
954
955         if (imp->imp_state != LUSTRE_IMP_REPLAY ||
956             imp->imp_resend_replay)
957                 return;
958
959         /* If the server was restarted during replay, the requests may
960          * have already been added to the unreplied list in a former replay.
961          */
962         spin_lock(&imp->imp_lock);
963
964         list_for_each_entry(req, &imp->imp_committed_list, rq_replay_list) {
965                 if (list_empty(&req->rq_unreplied_list))
966                         ptlrpc_add_unreplied(req);
967         }
968
969         list_for_each_entry(req, &imp->imp_replay_list, rq_replay_list) {
970                 if (list_empty(&req->rq_unreplied_list))
971                         ptlrpc_add_unreplied(req);
972         }
973
974         imp->imp_known_replied_xid = ptlrpc_known_replied_xid(imp);
975         spin_unlock(&imp->imp_lock);
976 }
977
978 /**
979  * interpret_reply callback for connect RPCs.
980  * Looks into the returned status of the connect operation and decides
981  * what to do with the import - i.e. enter recovery, promote it to
982  * full state for normal operations, or disconnect it due to an error.
983  */
984 static int ptlrpc_connect_interpret(const struct lu_env *env,
985                                     struct ptlrpc_request *request,
986                                     void *data, int rc)
987 {
988         struct ptlrpc_connect_async_args *aa = data;
989         struct obd_import *imp = request->rq_import;
990         struct lustre_handle old_hdl;
991         __u64 old_connect_flags;
992         timeout_t service_timeout;
993         int msg_flags;
994         struct obd_connect_data *ocd;
995         struct obd_export *exp = NULL;
996         int ret;
997
998         ENTRY;
999
1000         spin_lock(&imp->imp_lock);
1001         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1002                 imp->imp_connect_tried = 1;
1003                 spin_unlock(&imp->imp_lock);
1004                 RETURN(0);
1005         }
1006
1007         imp->imp_connect_error = rc;
1008         if (rc) {
1009                 struct ptlrpc_request *free_req;
1010                 struct ptlrpc_request *tmp;
1011
1012                 /* the connect failed: error out delayed requests that cannot be resent */
1013                 list_for_each_entry_safe(free_req, tmp, &imp->imp_delayed_list,
1014                                          rq_list) {
1015                         spin_lock(&free_req->rq_lock);
1016                         if (free_req->rq_no_resend) {
1017                                 free_req->rq_err = 1;
1018                                 free_req->rq_status = -EIO;
1019                                 ptlrpc_client_wake_req(free_req);
1020                         }
1021                         spin_unlock(&free_req->rq_lock);
1022                 }
1023
1024                 /* if this is a reconnect to a busy export, there is no need to
1025                  * select a new target for connecting
1026                  */
1027                 imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
1028                 spin_unlock(&imp->imp_lock);
1029                 GOTO(out, rc);
1030         }
1031
1032         /* LU-7558: indicate that we are interpreting the connect reply;
1033          * ptlrpc_connect_import() will not try to reconnect until the
1034          * interpret finishes.
1035          */
1036         imp->imp_connected = 1;
1037         spin_unlock(&imp->imp_lock);
1038
1039         LASSERT(imp->imp_conn_current);
1040
1041         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
1042
1043         ret = req_capsule_get_size(&request->rq_pill, &RMF_CONNECT_DATA,
1044                                    RCL_SERVER);
1045         /* the obd_connect_data replied by the server is always bigger */
1046         ocd = req_capsule_server_sized_get(&request->rq_pill,
1047                                            &RMF_CONNECT_DATA, ret);
1048
1049         if (ocd == NULL) {
1050                 CERROR("%s: no connect data from server\n",
1051                        imp->imp_obd->obd_name);
1052                 rc = -EPROTO;
1053                 GOTO(out, rc);
1054         }
1055
1056         spin_lock(&imp->imp_lock);
1057
1058         /* All imports are pingable */
1059         imp->imp_pingable = 1;
1060         imp->imp_force_reconnect = 0;
1061         imp->imp_force_verify = 0;
1062         imp->imp_setup_time = ktime_get_seconds();
1063
1064         imp->imp_connect_data = *ocd;
1065
1066         CDEBUG(D_HA, "%s: connect to target with instance %u\n",
1067                imp->imp_obd->obd_name, ocd->ocd_instance);
1068         exp = class_conn2export(&imp->imp_dlm_handle);
1069
1070         spin_unlock(&imp->imp_lock);
1071
1072         if (!exp) {
1073                 /* Could happen if export is cleaned during connect attempt */
1074                 CERROR("%s: missing export after connect\n",
1075                        imp->imp_obd->obd_name);
1076                 GOTO(out, rc = -ENODEV);
1077         }
1078
1079         /* check that the server granted a subset of the flags we asked for. */
1080         if ((ocd->ocd_connect_flags & imp->imp_connect_flags_orig) !=
1081             ocd->ocd_connect_flags) {
1082                 CERROR("%s: Server didn't grant requested subset of flags: asked=%#llx granted=%#llx\n",
1083                        imp->imp_obd->obd_name, imp->imp_connect_flags_orig,
1084                        ocd->ocd_connect_flags);
1085                 GOTO(out, rc = -EPROTO);
1086         }
1087
1088         if ((ocd->ocd_connect_flags2 & imp->imp_connect_flags2_orig) !=
1089             ocd->ocd_connect_flags2) {
1090                 CERROR("%s: Server didn't grant requested subset of flags2: asked=%#llx granted=%#llx\n",
1091                        imp->imp_obd->obd_name, imp->imp_connect_flags2_orig,
1092                        ocd->ocd_connect_flags2);
1093                 GOTO(out, rc = -EPROTO);
1094         }
1095
1096         if (!(imp->imp_connect_flags_orig & OBD_CONNECT_LIGHTWEIGHT) &&
1097             (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) &&
1098             (imp->imp_connect_flags_orig & OBD_CONNECT_FID) &&
1099             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION)) {
1100                 __u32 major = OBD_OCD_VERSION_MAJOR(ocd->ocd_version);
1101                 __u32 minor = OBD_OCD_VERSION_MINOR(ocd->ocd_version);
1102                 __u32 patch = OBD_OCD_VERSION_PATCH(ocd->ocd_version);
1103
1104                 /* We do not support MDT-MDT interoperation between MDTs of
1105                  * different versions because of protocol changes.
1106                  */
1107                 if (unlikely(major != LUSTRE_MAJOR ||
1108                              minor != LUSTRE_MINOR ||
1109                              abs(patch - LUSTRE_PATCH) > 3)) {
1110                         LCONSOLE_WARN("%s: import %p (%u.%u.%u.%u) tried the connection to different version MDT (%d.%d.%d.%d) %s\n",
1111                                       imp->imp_obd->obd_name, imp, LUSTRE_MAJOR,
1112                                       LUSTRE_MINOR, LUSTRE_PATCH, LUSTRE_FIX,
1113                                       major, minor, patch,
1114                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1115                                       imp->imp_connection->c_remote_uuid.uuid);
1116
1117                         GOTO(out, rc = -EPROTO);
1118                 }
1119         }
1120
1121         old_connect_flags = exp_connect_flags(exp);
1122         exp->exp_connect_data = *ocd;
1123         imp->imp_obd->obd_self_export->exp_connect_data = *ocd;
1124
1125         /* The net statistics after (re-)connect are not valid anymore,
1126          * because they may reflect different routing, etc.
1127          */
1128         service_timeout = lustre_msg_get_service_timeout(request->rq_repmsg);
1129         at_reinit(&imp->imp_at.iat_net_latency, 0, 0);
1130         ptlrpc_at_adj_net_latency(request, service_timeout);
1131
1132         /* Import flags should be updated before waking import at FULL state */
1133         rc = ptlrpc_connect_set_flags(imp, ocd, old_connect_flags, exp,
1134                                       aa->pcaa_initial_connect);
1135         class_export_put(exp);
1136         exp = NULL;
1137
1138         if (rc != 0)
1139                 GOTO(out, rc);
1140
1141         obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
1142
1143         if (aa->pcaa_initial_connect) {
1144                 spin_lock(&imp->imp_lock);
1145                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
1146                         imp->imp_replayable = 1;
1147                         CDEBUG(D_HA, "connected to replayable target: %s\n",
1148                                obd2cli_tgt(imp->imp_obd));
1149                 } else {
1150                         imp->imp_replayable = 0;
1151                 }
1152
1153                 /* if applicable, adjust imp->imp_msg_magic here
1154                  * according to reply flags
1155                  */
1156
1157                 imp->imp_remote_handle =
1158                         *lustre_msg_get_handle(request->rq_repmsg);
1159
1160                 imp->imp_no_cached_data = 1;
1161
1162                 /* Initial connects are allowed for clients with non-random
1163                  * uuids when servers are in recovery.  Simply signal the
1164                  * servers replay is complete and wait in REPLAY_WAIT.
1165                  */
1166                 if (msg_flags & MSG_CONNECT_RECOVERING) {
1167                         CDEBUG(D_HA, "connect to %s during recovery\n",
1168                                obd2cli_tgt(imp->imp_obd));
1169                         import_set_state_nolock(imp, LUSTRE_IMP_REPLAY_LOCKS);
1170                         spin_unlock(&imp->imp_lock);
1171                 } else {
1172                         spin_unlock(&imp->imp_lock);
1173                         ptlrpc_activate_import(imp, true);
1174                 }
1175
1176                 GOTO(finish, rc = 0);
1177         }
1178
1179         /* Determine what recovery state to move the import to. */
1180         if (MSG_CONNECT_RECONNECT & msg_flags) {
1181                 memset(&old_hdl, 0, sizeof(old_hdl));
1182                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
1183                             sizeof(old_hdl))) {
1184                         LCONSOLE_WARN("Reconnect to %s (at %s) failed due to bad handle %#llx\n",
1185                                       obd2cli_tgt(imp->imp_obd),
1186                                       imp->imp_connection->c_remote_uuid.uuid,
1187                                       imp->imp_dlm_handle.cookie);
1188                         GOTO(out, rc = -ENOTCONN);
1189                 }
1190
1191                 if (memcmp(&imp->imp_remote_handle,
1192                            lustre_msg_get_handle(request->rq_repmsg),
1193                            sizeof(imp->imp_remote_handle))) {
1194                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
1195                                 D_HA : D_WARNING;
1196
1197                         /* Bug 16611/14775: if the server handle has changed,
1198                          * that means some sort of disconnection happened.
1199                          * If the server is not in recovery, that also means it
1200                          * already erased all of our state because of previous
1201                          * eviction. If it is in recovery - we are safe to
1202                          * participate since we can reestablish all of our state
1203                          * with the server again.
1204                          */
1205                         if ((MSG_CONNECT_RECOVERING & msg_flags)) {
1206                                 CDEBUG_LIMIT(level,
1207                                        "%s@%s changed server handle from %#llx to %#llx but is still in recovery\n",
1208                                        obd2cli_tgt(imp->imp_obd),
1209                                        imp->imp_connection->c_remote_uuid.uuid,
1210                                        imp->imp_remote_handle.cookie,
1211                                        lustre_msg_get_handle(
1212                                                request->rq_repmsg)->cookie);
1213                         } else {
1214                                 LCONSOLE_WARN("Evicted from %s (at %s) after server handle changed from %#llx to %#llx\n",
1215                                               obd2cli_tgt(imp->imp_obd),
1216                                               imp->imp_connection->c_remote_uuid.uuid,
1217                                               imp->imp_remote_handle.cookie,
1218                                               lustre_msg_get_handle(
1219                                                    request->rq_repmsg)->cookie);
1220                         }
1221
1222                         imp->imp_remote_handle =
1223                                 *lustre_msg_get_handle(request->rq_repmsg);
1224
1225                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
1226                                 import_set_state(imp, LUSTRE_IMP_EVICTED);
1227                                 GOTO(finish, rc = 0);
1228                         }
1229                 } else {
1230                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
1231                                obd2cli_tgt(imp->imp_obd),
1232                                imp->imp_connection->c_remote_uuid.uuid);
1233                 }
1234
1235                 if (imp->imp_invalid) {
1236                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
1237                                "marking evicted\n", imp->imp_obd->obd_name);
1238                         import_set_state(imp, LUSTRE_IMP_EVICTED);
1239                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
1240                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
1241                                imp->imp_obd->obd_name,
1242                                obd2cli_tgt(imp->imp_obd));
1243
1244                         spin_lock(&imp->imp_lock);
1245                         imp->imp_resend_replay = 1;
1246                         spin_unlock(&imp->imp_lock);
1247
1248                         import_set_state(imp, imp->imp_replay_state);
1249                 } else {
1250                         import_set_state(imp, LUSTRE_IMP_RECOVER);
1251                 }
1252         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
1253                 LASSERT(imp->imp_replayable);
1254                 imp->imp_remote_handle =
1255                         *lustre_msg_get_handle(request->rq_repmsg);
1256                 imp->imp_last_replay_transno = 0;
1257                 imp->imp_replay_cursor = &imp->imp_committed_list;
1258                 import_set_state(imp, LUSTRE_IMP_REPLAY);
1259         } else if ((ocd->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0 &&
1260                    !imp->imp_invalid) {
1261
1262                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
1263                 /* The below message is checked in recovery-small.sh test_106 */
1264                 DEBUG_REQ(D_HA, request, "%s: lwp recover",
1265                           imp->imp_obd->obd_name);
1266                 imp->imp_remote_handle =
1267                         *lustre_msg_get_handle(request->rq_repmsg);
1268                 import_set_state(imp, LUSTRE_IMP_RECOVER);
1269         } else {
1270                 imp->imp_remote_handle =
1271                         *lustre_msg_get_handle(request->rq_repmsg);
1272                 if (!imp->imp_no_cached_data) {
1273                         DEBUG_REQ(D_HA, request,
1274                                   "%s: evicting (reconnect/recover flags not set: %x)",
1275                                   imp->imp_obd->obd_name, msg_flags);
1276                         import_set_state(imp, LUSTRE_IMP_EVICTED);
1277                 } else {
1278                         ptlrpc_activate_import(imp, true);
1279                 }
1280         }
1281
1282         /* Sanity checks for a reconnected import. */
1283         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE))
1284                 CERROR("imp_replayable flag does not match server after reconnect. We should LBUG right here.\n");
1285
1286         if (lustre_msg_get_last_committed(request->rq_repmsg) > 0 &&
1287             lustre_msg_get_last_committed(request->rq_repmsg) <
1288             aa->pcaa_peer_committed) {
1289                 static bool printed;
1290
1291                 /* The below message is checked in recovery-small.sh test_54 */
1292                 CERROR("%s: went back in time (transno %lld was previously committed, server now claims %lld)!\n",
1293                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
1294                        lustre_msg_get_last_committed(request->rq_repmsg));
1295                 if (!printed) {
1296                         CERROR("For further information, see http://doc.lustre.org/lustre_manual.xhtml#went_back_in_time\n");
1297                         printed = true;
1298                 }
1299         }
1300
1301 finish:
1302         ptlrpc_prepare_replay(imp);
1303         rc = ptlrpc_import_recovery_state_machine(imp);
1304         if (rc == -ENOTCONN) {
1305                 CDEBUG(D_HA,
1306                        "evicted/aborted by %s@%s during recovery; invalidating and reconnecting\n",
1307                        obd2cli_tgt(imp->imp_obd),
1308                        imp->imp_connection->c_remote_uuid.uuid);
1309                 ptlrpc_connect_import(imp);
1310                 spin_lock(&imp->imp_lock);
1311                 imp->imp_connected = 0;
1312                 imp->imp_connect_tried = 1;
1313                 spin_unlock(&imp->imp_lock);
1314                 RETURN(0);
1315         }
1316
1317 out:
1318         if (exp != NULL)
1319                 class_export_put(exp);
1320
1321         spin_lock(&imp->imp_lock);
1322         imp->imp_connected = 0;
1323         imp->imp_connect_tried = 1;
1324
1325         if (rc != 0) {
1326                 bool inact = false;
1327                 time64_t now = ktime_get_seconds();
1328                 time64_t next_connect;
1329
1330                 import_set_state_nolock(imp, LUSTRE_IMP_DISCON);
1331                 if (rc == -EACCES || rc == -EROFS) {
1332                         /*
1333                          * Give up trying to reconnect:
1334                          * EACCES means the client has no permission to connect,
1335                          * EROFS means the client must mount read-only.
1336                          */
1337                         imp->imp_obd->obd_no_recov = 1;
1338                         ptlrpc_deactivate_import_nolock(imp);
1339                         inact = true;
1340                 } else if (rc == -EPROTO) {
1341                         struct obd_connect_data *ocd;
1342
1343                         /* reply message might not be ready */
1344                         if (request->rq_repmsg == NULL) {
1345                                 spin_unlock(&imp->imp_lock);
1346                                 RETURN(-EPROTO);
1347                         }
1348
1349                         ocd = req_capsule_server_get(&request->rq_pill,
1350                                                      &RMF_CONNECT_DATA);
1351                         /* Servers are not supposed to refuse connections from
1352                          * clients based on version, only connection feature
1353                          * flags.  We should never see this from llite, but it
1354                          * may be useful for debugging in the future.
1355                          */
1356                         if (ocd &&
1357                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1358                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1359                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version (%d.%d.%d.%d) refused connection from this client with an incompatible version (%s). Client must be recompiled\n",
1360                                                    obd2cli_tgt(imp->imp_obd),
1361                                                    OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1362                                                    OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1363                                                    OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1364                                                    OBD_OCD_VERSION_FIX(ocd->ocd_version),
1365                                                    LUSTRE_VERSION_STRING);
1366                                 ptlrpc_deactivate_import_nolock(imp);
1367                                 import_set_state_nolock(imp, LUSTRE_IMP_CLOSED);
1368                                 inact = true;
1369                         }
1370                 } else if (rc == -ENODEV || rc == -ETIMEDOUT) {
1371                         /* ENODEV means there is no service; force reconnection to a
1372                          * failover peer if an attempt predates what ptlrpc_next_reconnect
1373                          * would allow. ETIMEDOUT may be set on a network error without
1374                          * the request deadline having been reached.
1375                          */
1376                         struct obd_import_conn *conn;
1377                         time64_t reconnect_time;
1378
1379                         /* Same as ptlrpc_next_reconnect, but in the past */
1380                         reconnect_time = now - INITIAL_CONNECT_TIMEOUT;
1381                         list_for_each_entry(conn, &imp->imp_conn_list,
1382                                             oic_item) {
1383                                 if (conn->oic_last_attempt <= reconnect_time) {
1384                                         imp->imp_force_verify = 1;
1385                                         break;
1386                                 }
1387                         }
1388                 }
1389
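                /* Editorial note: next_connect estimates when the next connect
                 * attempt is due: the last attempt on the current connection
                 * plus the time window this connect request was allowed
                 * (rq_deadline - rq_sent).  It is used below to decide whether
                 * the pinger needs to be rescheduled.
                 */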
1390                 next_connect = imp->imp_conn_current->oic_last_attempt +
1391                                (request->rq_deadline - request->rq_sent);
1392                 spin_unlock(&imp->imp_lock);
1393
1394                 if (inact)
1395                         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
1396
1397                 if (rc == -EPROTO)
1398                         RETURN(rc);
1399
1400                 /* Adjust imp_next_ping to the request deadline + 1 and wake
1401                  * the pinger if the import missed a ping while CONNECTING or
1402                  * the next ping is scheduled far beyond the request deadline.
1403                  * This can happen when the connection was initiated outside
1404                  * the pinger, e.g. by ptlrpc_set_import_discon().
1405                  */
1406                 if (!imp->imp_force_verify && (imp->imp_next_ping <= now ||
1407                     imp->imp_next_ping > next_connect)) {
1408                         imp->imp_next_ping = max(now, next_connect) + 1;
1409                         ptlrpc_pinger_wake_up();
1410                 }
1411
1412                 ptlrpc_maybe_ping_import_soon(imp);
1413
1414                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1415                        obd2cli_tgt(imp->imp_obd),
1416                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1417         } else {
1418                 spin_unlock(&imp->imp_lock);
1419         }
1420
1421         wake_up(&imp->imp_recovery_waitq);
1422         RETURN(rc);
1423 }
1424
1425 /**
1426  * interpret callback for "completed replay" RPCs.
1427  * \see signal_completed_replay
1428  */
1429 static int completed_replay_interpret(const struct lu_env *env,
1430                                       struct ptlrpc_request *req,
1431                                       void *args, int rc)
1432 {
1433         ENTRY;
1434         atomic_dec(&req->rq_import->imp_replay_inflight);
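        /* Editorial note: imp_vbr_failed is set when Version Based Recovery
         * (VBR) fails for this import; in that case the state machine is not
         * advanced and the import reconnects instead, as the branches below show.
         */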
1435         if (req->rq_status == 0 && !req->rq_import->imp_vbr_failed) {
1436                 ptlrpc_import_recovery_state_machine(req->rq_import);
1437         } else {
1438                 if (req->rq_import->imp_vbr_failed) {
1439                         CDEBUG(D_WARNING,
1440                                "%s: version recovery fails, reconnecting\n",
1441                                req->rq_import->imp_obd->obd_name);
1442                 } else {
1443                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, reconnecting\n",
1444                                req->rq_import->imp_obd->obd_name,
1445                                req->rq_status);
1446                 }
1447                 ptlrpc_connect_import(req->rq_import);
1448         }
1449
1450         RETURN(0);
1451 }
1452
1453 /**
1454  * Let the server know that we have no requests to replay anymore.
1455  * This is achieved by just sending a PING request.
1456  */
1457 static int signal_completed_replay(struct obd_import *imp)
1458 {
1459         struct ptlrpc_request *req;
1460
1461         ENTRY;
1462
1463         if (unlikely(CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_FINISH_REPLAY)))
1464                 RETURN(0);
1465
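        /* Editorial note: atomic_add_unless(&imp->imp_replay_inflight, 1, 1)
         * refuses to bump the counter when it is already 1, which prevents
         * queueing a second "completed replay" PING while one is still in
         * flight; in that case we just return 0 here.
         */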
1466         if (!atomic_add_unless(&imp->imp_replay_inflight, 1, 1))
1467                 RETURN(0);
1468
1469         req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING, LUSTRE_OBD_VERSION,
1470                                         OBD_PING);
1471         if (req == NULL) {
1472                 atomic_dec(&imp->imp_replay_inflight);
1473                 RETURN(-ENOMEM);
1474         }
1475
1476         ptlrpc_request_set_replen(req);
1477         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1478         lustre_msg_add_flags(req->rq_reqmsg,
1479                              MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE);
1480         if (obd_at_off(imp->imp_obd))
1481                 req->rq_timeout *= 3;
1482         req->rq_interpret_reply = completed_replay_interpret;
1483
1484         ptlrpcd_add_req(req);
1485         RETURN(0);
1486 }
1487
1488 /**
1489  * In kernel code all import invalidation happens in its own
1490  * separate thread, so that any application that happened to encounter
1491  * the problem can still be killed or otherwise continue.
1492  */
1493 static int ptlrpc_invalidate_import_thread(void *data)
1494 {
1495         struct obd_import *imp = data;
1496
1497         ENTRY;
1498         unshare_fs_struct();
1499         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1500                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1501                imp->imp_connection->c_remote_uuid.uuid);
1502
1503         if (do_dump_on_eviction(imp->imp_obd)) {
1504                 CERROR("dump the log upon eviction\n");
1505                 libcfs_debug_dumplog();
1506         }
1507
1508         ptlrpc_invalidate_import(imp);
1509         import_set_state(imp, LUSTRE_IMP_RECOVER);
1510         ptlrpc_import_recovery_state_machine(imp);
1511
1512         class_import_put(imp);
1513         RETURN(0);
1514 }
1515
1516 /**
1517  * This is the state machine for client-side recovery on import.
1518  *
1519  * Typically there are two possible paths. If we reach a server that is not
1520  * in recovery, we just enter the IMP_EVICTED state, invalidate our import
1521  * state and reconnect from scratch.
1522  * If we reach a server that is in recovery, we enter the IMP_REPLAY import
1523  * state. We go through our list of requests to replay and send them to the
1524  * server one by one.
1525  * After sending all requests from the list we change the import state to
1526  * IMP_REPLAY_LOCKS and re-request all the locks we believe we have from the
1527  * server, plus any locks we don't yet have, and wait for the server to
1528  * grant them.
1529  * After that we send a special "replay completed" request and change the
1530  * import state to IMP_REPLAY_WAIT.
1531  * Upon receiving the reply to that "replay completed" RPC we enter the
1532  * IMP_RECOVER state and resend all requests from the sending list.
1533  * After that we promote the import to the FULL state and send all delayed
1534  * requests; the import is then fully operational.
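 *
 * State flow (editorial summary of the code below):
 *   EVICTED -> invalidate thread -> RECOVER;
 *   REPLAY -> REPLAY_LOCKS -> REPLAY_WAIT -> RECOVER -> FULL.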
1535  */
1536 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1537 {
1538         int rc = 0;
1539         int inflight;
1540         char *target_start;
1541         int target_len;
1542
1543         ENTRY;
1544         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1545                 struct task_struct *task;
1546                 u64 connect_flags;
1547
1548                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1549                           &target_start, &target_len);
1550                 connect_flags = imp->imp_connect_data.ocd_connect_flags;
1551                 /* Don't care about MGC eviction */
1552                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1553                            LUSTRE_MGC_NAME) != 0 &&
1554                     (connect_flags & OBD_CONNECT_LIGHTWEIGHT) == 0) {
1555                         LCONSOLE_ERROR_MSG(0x167, "%s: This client was evicted by %.*s; in progress operations using this service will fail.\n",
1556                                            imp->imp_obd->obd_name, target_len,
1557                                            target_start);
1558                         LASSERTF(!obd_lbug_on_eviction, "LBUG upon eviction\n");
1559                 }
1560                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1561                        obd2cli_tgt(imp->imp_obd),
1562                        imp->imp_connection->c_remote_uuid.uuid);
1563                 /* reset vbr_failed flag upon eviction */
1564                 spin_lock(&imp->imp_lock);
1565                 imp->imp_vbr_failed = 0;
1566                 spin_unlock(&imp->imp_lock);
1567
1568                 /* bug 17802:  XXX client_disconnect_export vs connect request
1569                  * race. If the client is evicted at this time then we would
1570                  * start the invalidate thread without a reference to the
1571                  * import, and the import could be freed at the same time.
1572                  */
1573                 class_import_get(imp);
1574                 task = kthread_run(ptlrpc_invalidate_import_thread, imp,
1575                                    "ll_imp_inval");
1576                 if (IS_ERR(task)) {
1577                         class_import_put(imp);
1578                         rc = PTR_ERR(task);
1579                         CERROR("%s: can't start invalidate thread: rc = %d\n",
1580                                imp->imp_obd->obd_name, rc);
1581                 } else {
1582                         rc = 0;
1583                 }
1584                 RETURN(rc);
1585         }
1586
1587         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1588                 CDEBUG(D_HA, "replay requested by %s\n",
1589                        obd2cli_tgt(imp->imp_obd));
1590                 rc = ptlrpc_replay_next(imp, &inflight);
1591                 if (inflight == 0 &&
1592                     atomic_read(&imp->imp_replay_inflight) == 0) {
1593                         import_set_state(imp, LUSTRE_IMP_REPLAY_LOCKS);
1594                         rc = ldlm_replay_locks(imp);
1595                         if (rc)
1596                                 GOTO(out, rc);
1597                 }
1598                 rc = 0;
1599         }
1600
1601         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1602                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1603                         import_set_state(imp, LUSTRE_IMP_REPLAY_WAIT);
1604                         rc = signal_completed_replay(imp);
1605                         if (rc)
1606                                 GOTO(out, rc);
1607                 }
1608         }
1609
1610         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1611                 if (atomic_read(&imp->imp_replay_inflight) == 0)
1612                         import_set_state(imp, LUSTRE_IMP_RECOVER);
1613         }
1614
1615         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1616                 struct ptlrpc_connection *conn = imp->imp_connection;
1617
1618                 rc = ptlrpc_resend(imp);
1619                 if (rc)
1620                         GOTO(out, rc);
1621                 ptlrpc_activate_import(imp, true);
1622
1623                 /* Reverse imports are flagged with dlm_fake == 1.  They do
1624                  * not do recovery and their connections are not "restored".
1625                  */
1626                 if (!imp->imp_dlm_fake)
1627                         CDEBUG_LIMIT(imp->imp_was_idle ?
1628                                         imp->imp_idle_debug : D_CONSOLE,
1629                                      "%s: Connection restored to %s (at %s)\n",
1630                                      imp->imp_obd->obd_name,
1631                                      obd_uuid2str(&conn->c_remote_uuid),
1632                                      obd_import_nid2str(imp));
1633                 spin_lock(&imp->imp_lock);
1634                 imp->imp_was_idle = 0;
1635                 spin_unlock(&imp->imp_lock);
1636         }
1637
1638         if (imp->imp_state == LUSTRE_IMP_FULL) {
1639                 wake_up(&imp->imp_recovery_waitq);
1640                 ptlrpc_wake_delayed(imp);
1641         }
1642
1643 out:
1644         RETURN(rc);
1645 }
1646
1647 static struct ptlrpc_request *ptlrpc_disconnect_prep_req(struct obd_import *imp)
1648 {
1649         struct ptlrpc_request *req;
1650         int rq_opc, rc = 0;
1651
1652         ENTRY;
1653
1654         switch (imp->imp_connect_op) {
1655         case OST_CONNECT:
1656                 rq_opc = OST_DISCONNECT;
1657                 break;
1658         case MDS_CONNECT:
1659                 rq_opc = MDS_DISCONNECT;
1660                 break;
1661         case MGS_CONNECT:
1662                 rq_opc = MGS_DISCONNECT;
1663                 break;
1664         default:
1665                 rc = -EINVAL;
1666                 CERROR("%s: don't know how to disconnect from %s (connect_op %d): rc = %d\n",
1667                        imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1668                        imp->imp_connect_op, rc);
1669                 RETURN(ERR_PTR(rc));
1670         }
1671
1672         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_DISCONNECT,
1673                                         LUSTRE_OBD_VERSION, rq_opc);
1674         if (req == NULL)
1675                 RETURN(ERR_PTR(-ENOMEM));
1676
1677         /* We are disconnecting; do not retry the DISCONNECT RPC if it
1678          * fails.  We can get through the above with a down server if
1679          * the client doesn't know the server is gone yet.
1680          */
1681         req->rq_no_resend = 1;
1682
1683         /* We want client umounts to happen quickly, no matter server state */
1684         req->rq_timeout = min_t(timeout_t, req->rq_timeout,
1685                                 INITIAL_CONNECT_TIMEOUT);
1686
1687         req->rq_send_state = LUSTRE_IMP_CONNECTING;
1688         ptlrpc_request_set_replen(req);
1689
1690         RETURN(req);
1691 }
1692
1693 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1694 {
1695         struct ptlrpc_request *req;
1696         int rc = 0;
1697
1698         ENTRY;
1699
1700         if (imp->imp_obd->obd_force)
1701                 GOTO(set_state, rc);
1702
1703         /* the import has probably already been disconnected after going idle */
1704         spin_lock(&imp->imp_lock);
1705         if (imp->imp_state == LUSTRE_IMP_IDLE)
1706                 GOTO(out, rc);
1707         spin_unlock(&imp->imp_lock);
1708
1709         if (ptlrpc_import_in_recovery(imp)) {
1710                 long timeout_jiffies;
1711                 time64_t timeout;
1712
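                /* Editorial note: bound how long we wait for recovery to finish
                 * before sending DISCONNECT.  With adaptive timeouts disabled we
                 * fall back to obd_timeout (halved when imp_server_timeout is
                 * set); otherwise we use the current AT service estimate for
                 * this import's request portal.
                 */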
1713                 if (obd_at_off(imp->imp_obd)) {
1714                         if (imp->imp_server_timeout)
1715                                 timeout = obd_timeout >> 1;
1716                         else
1717                                 timeout = obd_timeout;
1718                 } else {
1719                         u32 req_portal;
1720                         int idx;
1721
1722                         req_portal = imp->imp_client->cli_request_portal;
1723                         idx = import_at_get_index(imp, req_portal);
1724                         timeout = obd_at_get(imp->imp_obd,
1725                                         &imp->imp_at.iat_service_estimate[idx]);
1726                 }
1727
1728                 timeout_jiffies = cfs_time_seconds(timeout);
1729                 if (wait_event_idle_timeout(imp->imp_recovery_waitq,
1730                                             !ptlrpc_import_in_recovery(imp),
1731                                             timeout_jiffies) == 0 &&
1732                     l_wait_event_abortable(imp->imp_recovery_waitq,
1733                                            !ptlrpc_import_in_recovery(imp)) < 0)
1734                         rc = -EINTR;
1735         }
1736
1737         req = ptlrpc_disconnect_prep_req(imp);
1738         if (IS_ERR(req))
1739                 GOTO(set_state, rc = PTR_ERR(req));
1740
1741         spin_lock(&imp->imp_lock);
1742         if (imp->imp_state != LUSTRE_IMP_FULL) {
1743                 ptlrpc_req_put_with_imp_lock(req);
1744                 GOTO(out, rc);
1745         }
1746         import_set_state_nolock(imp, LUSTRE_IMP_CONNECTING);
1747         spin_unlock(&imp->imp_lock);
1748
1749         rc = ptlrpc_queue_wait(req);
1750         ptlrpc_req_finished(req);
1751
1752 set_state:
1753         spin_lock(&imp->imp_lock);
1754 out:
1755         if (noclose)
1756                 import_set_state_nolock(imp, LUSTRE_IMP_DISCON);
1757         else
1758                 import_set_state_nolock(imp, LUSTRE_IMP_CLOSED);
1759         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1760         spin_unlock(&imp->imp_lock);
1761
1762         obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
1763         if (!noclose)
1764                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
1765
1766         if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ESHUTDOWN)
1767                 rc = 0;
1768         RETURN(rc);
1769 }
1770 EXPORT_SYMBOL(ptlrpc_disconnect_import);
1771
1772 static void ptlrpc_reset_reqs_generation(struct obd_import *imp)
1773 {
1774         struct ptlrpc_request *old, *tmp;
1775
1776         /* Tag all resendable requests generated before the disconnection.
1777          * Note that this code is part of the disconnect-at-idle path only.
1778          */
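        /* Editorial note: a request from the previous generation is bumped to
         * the current generation if the import was only just initiated
         * (imp_initiated_at == imp_generation) or the request is resendable
         * (!rq_no_resend), so it can be sent again after the idle reconnect.
         */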
1779         list_for_each_entry_safe(old, tmp, &imp->imp_delayed_list,
1780                         rq_list) {
1781                 spin_lock(&old->rq_lock);
1782                 if (old->rq_import_generation == imp->imp_generation - 1 &&
1783                     ((imp->imp_initiated_at == imp->imp_generation) ||
1784                      !old->rq_no_resend))
1785                         old->rq_import_generation = imp->imp_generation;
1786                 spin_unlock(&old->rq_lock);
1787         }
1788 }
1789
1790 static int ptlrpc_disconnect_idle_interpret(const struct lu_env *env,
1791                                             struct ptlrpc_request *req,
1792                                             void *args, int rc)
1793 {
1794         struct obd_import *imp = req->rq_import;
1795         int connect = 0;
1796
1797         DEBUG_REQ(D_HA, req, "inflight=%d, refcount=%d: rc = %d",
1798                   atomic_read(&imp->imp_inflight),
1799                   refcount_read(&imp->imp_refcount), rc);
1800
1801         spin_lock(&imp->imp_lock);
1802         /* The DISCONNECT reply can be late while another connection has
1803          * just been initiated, so we have to abort the disconnection.
1804          */
1805         if (req->rq_import_generation == imp->imp_generation &&
1806             imp->imp_state != LUSTRE_IMP_CLOSED) {
1807                 LASSERTF(imp->imp_state == LUSTRE_IMP_CONNECTING,
1808                          "%s\n", ptlrpc_import_state_name(imp->imp_state));
1809                 memset(&imp->imp_remote_handle, 0,
1810                        sizeof(imp->imp_remote_handle));
1811                 /* take our DISCONNECT into account */
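                /* Editorial note: imp_reqs > 1 means other requests arrived
                 * while our DISCONNECT was in flight, so instead of going idle
                 * we bump the generation and reconnect right away; otherwise
                 * the import quietly enters the IDLE state.
                 */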
1812                 if (atomic_read(&imp->imp_reqs) > 1) {
1813                         imp->imp_generation++;
1814                         imp->imp_initiated_at = imp->imp_generation;
1815                         import_set_state_nolock(imp, LUSTRE_IMP_NEW);
1816                         ptlrpc_reset_reqs_generation(imp);
1817                         connect = 1;
1818                 } else {
1819                         /* do not expose transient IDLE state */
1820                         import_set_state_nolock(imp, LUSTRE_IMP_IDLE);
1821                 }
1822         }
1823
1824         if (connect) {
1825                 rc = ptlrpc_connect_import_locked(imp);
1826                 if (rc >= 0)
1827                         ptlrpc_pinger_add_import(imp);
1828         } else {
1829                 spin_unlock(&imp->imp_lock);
1830         }
1831
1832         return 0;
1833 }
1834
1835 static bool ptlrpc_can_idle(struct obd_import *imp)
1836 {
1837         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1838
1839         /* one request is accounted for: the DISCONNECT RPC itself */
1840         if (atomic_read(&imp->imp_reqs) > 1)
1841                 return false;
1842
1843         /* any lock holds a resource and therefore increases ns_bref */
1844         if (ns && atomic_read(&ns->ns_bref) > 0)
1845                 return false;
1846
1847         return true;
1848 }
1849
1850 int ptlrpc_disconnect_and_idle_import(struct obd_import *imp)
1851 {
1852         struct ptlrpc_request *req;
1853
1854         ENTRY;
1855
1856         if (imp->imp_obd->obd_force)
1857                 RETURN(0);
1858
1859         if (ptlrpc_import_in_recovery(imp))
1860                 RETURN(0);
1861
1862         req = ptlrpc_disconnect_prep_req(imp);
1863         if (IS_ERR(req))
1864                 RETURN(PTR_ERR(req));
1865
1866         req->rq_interpret_reply = ptlrpc_disconnect_idle_interpret;
1867
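        /* Editorial note: test-only fault injection.  When OBD_FAIL_PTLRPC_IDLE_RACE
         * is set, only the device whose target index is 0 enters the CFS_RACE()
         * window, so recovery tests can exercise the DISCONNECT vs. new-request
         * race handled in ptlrpc_disconnect_idle_interpret() above.
         */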
1868         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
1869                 __u32 idx;
1870
1871                 server_name2index(imp->imp_obd->obd_name, &idx, NULL);
1872                 if (idx == 0)
1873                         CFS_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
1874         }
1875
1876         spin_lock(&imp->imp_lock);
1877         if (imp->imp_state != LUSTRE_IMP_FULL || !ptlrpc_can_idle(imp)) {
1878                 ptlrpc_req_put_with_imp_lock(req);
1879                 spin_unlock(&imp->imp_lock);
1880                 RETURN(0);
1881         }
1882         import_set_state_nolock(imp, LUSTRE_IMP_CONNECTING);
1883         /* don't make noise at reconnection */
1884         imp->imp_was_idle = 1;
1885         spin_unlock(&imp->imp_lock);
1886
1887         CDEBUG_LIMIT(imp->imp_idle_debug, "%s: disconnect after %llus idle\n",
1888                      imp->imp_obd->obd_name,
1889                      ktime_get_real_seconds() - imp->imp_last_reply_time);
1890
1891         ptlrpcd_add_req(req);
1892
1893         RETURN(1);
1894 }
1895 EXPORT_SYMBOL(ptlrpc_disconnect_and_idle_import);
1896
1897 void ptlrpc_cleanup_imp(struct obd_import *imp)
1898 {
1899         ENTRY;
1900
1901         spin_lock(&imp->imp_lock);
1902
1903         import_set_state_nolock(imp, LUSTRE_IMP_CLOSED);
1904         imp->imp_generation++;
1905         ptlrpc_abort_inflight(imp);
1906
1907         spin_unlock(&imp->imp_lock);
1908
1909         EXIT;
1910 }
1911
1912 /* Adaptive Timeout utils */
1913
1914 /* Update at_current_timeout with the specified value (bounded by at_min and
1915  * at_max), as well as the AT history "bins".
1916  *  - Bin into timeslices using AT_BINS bins.
1917  *  - This gives a maximum over the last at_history seconds without storing
1918  *    every sample, while still smoothing the recovery from a slow response.
1919  *  - (E.g. remember the maximum latency in each minute of the last 4 minutes.)
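 *  - Illustrative example (editorial): with at_history = 600s and AT_BINS = 4,
 *    each bin covers 150s; a 45s measurement lands in bin 0 and raises
 *    at_current_timeout to at least 45, clamped to [at_min, at_max] below.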
1920  */
1921 timeout_t obd_at_measure(struct obd_device *obd, struct adaptive_timeout *at,
1922                             timeout_t timeout)
1923 {
1924         unsigned int l_at_min = obd_get_at_min(obd);
1925         unsigned int l_at_max = obd_get_at_max(obd);
1926         timeout_t old_timeout = at->at_current_timeout;
1927         time64_t now = ktime_get_real_seconds();
1928         long binlimit = max_t(long, obd_get_at_history(obd) / AT_BINS, 1);
1929
1930         LASSERT(at);
1931         CDEBUG(D_OTHER, "add %u to %p time=%lld v=%u (%u %u %u %u)\n",
1932                timeout, at, now - at->at_binstart, at->at_current_timeout,
1933                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1934
1935         if (timeout <= 0)
1936                 /* Negative timeouts and 0's don't count, because we never
1937                  * want our timeout to drop to 0 or below, and because 0 could
1938                  * mean an error
1939                  */
1940                 return 0;
1941
1942         spin_lock(&at->at_lock);
1943
1944         if (unlikely(at->at_binstart == 0)) {
1945                 /* Special case to remove default from history */
1946                 at->at_current_timeout = timeout;
1947                 at->at_worst_timeout_ever = timeout;
1948                 at->at_worst_timestamp = now;
1949                 at->at_hist[0] = timeout;
1950                 at->at_binstart = now;
1951         } else if (now - at->at_binstart < binlimit) {
1952                 /* in bin 0 */
1953                 at->at_hist[0] = max_t(timeout_t, timeout, at->at_hist[0]);
1954                 at->at_current_timeout = max_t(timeout_t, timeout,
1955                                                at->at_current_timeout);
1956         } else {
1957                 int i, shift;
1958                 timeout_t maxv = timeout;
1959
1960                 /* move bins over */
1961                 shift = (u32)(now - at->at_binstart) / binlimit;
1962                 LASSERT(shift > 0);
1963                 for (i = AT_BINS - 1; i >= 0; i--) {
1964                         if (i >= shift) {
1965                                 at->at_hist[i] = at->at_hist[i - shift];
1966                                 maxv = max_t(timeout_t, maxv, at->at_hist[i]);
1967                         } else {
1968                                 at->at_hist[i] = 0;
1969                         }
1970                 }
1971                 at->at_hist[0] = timeout;
1972                 at->at_current_timeout = maxv;
1973                 at->at_binstart += shift * binlimit;
1974         }
1975
1976         if (at->at_current_timeout > at->at_worst_timeout_ever) {
1977                 at->at_worst_timeout_ever = at->at_current_timeout;
1978                 at->at_worst_timestamp = now;
1979         }
1980
1981         if (at->at_flags & AT_FLG_NOHIST)
1982                 /* Only keep the last reported value; the rest of the
1983                  * history is kept for debugfs only
1984                  */
1985                 at->at_current_timeout = timeout;
1986
1987         if (l_at_max > 0)
1988                 at->at_current_timeout = min_t(timeout_t,
1989                                                at->at_current_timeout,
1990                                                l_at_max);
1991         at->at_current_timeout = max_t(timeout_t, at->at_current_timeout,
1992                                        l_at_min);
1993         if (at->at_current_timeout != old_timeout)
1994                 CDEBUG(D_OTHER,
1995                        "AT %p change: old=%u new=%u delta=%d (val=%d) hist %u %u %u %u\n",
1996                        at, old_timeout, at->at_current_timeout,
1997                        at->at_current_timeout - old_timeout, timeout,
1998                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1999                        at->at_hist[3]);
2000
2001         /* if the timeout changed, return the old value; otherwise return 0 */
2002         old_timeout = (at->at_current_timeout != old_timeout) ? old_timeout : 0;
2003
2004         spin_unlock(&at->at_lock);
2005         return old_timeout;
2006 }
2007
2008 /* Find the imp_at index for a given portal; assign if space available */
2009 int import_at_get_index(struct obd_import *imp, int portal)
2010 {
2011         struct imp_at *at = &imp->imp_at;
2012         int i;
2013
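        /* Editorial note: the first scan below is lockless; if the portal is
         * not found, the function re-scans and assigns a free slot under
         * imp_lock, so a reader racing with an assignment at worst falls
         * through to the locked path.  Usage example from this file:
         * ptlrpc_disconnect_import() looks up the index for
         * imp->imp_client->cli_request_portal and then reads
         * imp->imp_at.iat_service_estimate[idx] via obd_at_get().
         */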
2014         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
2015                 if (at->iat_portal[i] == portal)
2016                         return i;
2017                 if (at->iat_portal[i] == 0)
2018                         /* unused */
2019                         break;
2020         }
2021
2022         /* Not found in list, add it under a lock */
2023         spin_lock(&imp->imp_lock);
2024
2025         /* Check unused under lock */
2026         for (; i < IMP_AT_MAX_PORTALS; i++) {
2027                 if (at->iat_portal[i] == portal)
2028                         goto out;
2029                 if (at->iat_portal[i] == 0)
2030                         /* unused */
2031                         break;
2032         }
2033
2034         /* Not enough portals? */
2035         LASSERT(i < IMP_AT_MAX_PORTALS);
2036
2037         at->iat_portal[i] = portal;
2038 out:
2039         spin_unlock(&imp->imp_lock);
2040         return i;
2041 }